This is an automated email from the ASF dual-hosted git repository.
kamalcph pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 139e5b15a1c KAFKA-17928: Make remote log manager thread-pool configs
dynamic (#17859)
139e5b15a1c is described below
commit 139e5b15a1ca1c369b7f42842da8c69caa10d09f
Author: Kamal Chandraprakash <[email protected]>
AuthorDate: Sat Dec 14 13:14:05 2024 +0530
KAFKA-17928: Make remote log manager thread-pool configs dynamic (#17859)
- Disallow configuring -1 for copier and expiration thread pools dynamically
Co-authored-by: Peter Lee <[email protected]>
Reviewers: Peter Lee <[email protected]>, Satish Duggana
<[email protected]>
---
.../org/apache/kafka/common/utils/ThreadUtils.java | 31 +++++++-
.../java/kafka/log/remote/RemoteLogManager.java | 88 +++++++++++-----------
.../scala/kafka/server/DynamicBrokerConfig.scala | 42 +++++++++--
.../kafka/server/DynamicBrokerConfigTest.scala | 75 ++++++++++++++++++
.../scala/unit/kafka/server/KafkaConfigTest.scala | 4 +-
.../log/remote/storage/RemoteLogManagerConfig.java | 11 +--
.../internals/log/RemoteStorageThreadPool.java | 30 +++-----
7 files changed, 199 insertions(+), 82 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/common/utils/ThreadUtils.java
b/clients/src/main/java/org/apache/kafka/common/utils/ThreadUtils.java
index 51cfe74fcde..a47e9ddb36a 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/ThreadUtils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/ThreadUtils.java
@@ -25,12 +25,15 @@ import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import static java.lang.Thread.UncaughtExceptionHandler;
+
/**
* Utilities for working with threads.
*/
public class ThreadUtils {
private static final Logger log =
LoggerFactory.getLogger(ThreadUtils.class);
+
/**
* Create a new ThreadFactory.
*
@@ -42,6 +45,22 @@ public class ThreadUtils {
*/
public static ThreadFactory createThreadFactory(final String pattern,
final boolean daemon) {
+ return createThreadFactory(pattern, daemon, null);
+ }
+
+ /**
+ * Create a new ThreadFactory.
+ *
+ * @param pattern The pattern to use. If this contains %d, it will
be
+ * replaced with a thread number. It should not
contain more
+ * than one %d.
+ * @param daemon True if we want daemon threads.
+ * @param ueh thread's uncaught exception handler.
+ * @return The new ThreadFactory.
+ */
+ public static ThreadFactory createThreadFactory(final String pattern,
+ final boolean daemon,
+ final
UncaughtExceptionHandler ueh) {
return new ThreadFactory() {
private final AtomicLong threadEpoch = new AtomicLong(0);
@@ -55,6 +74,9 @@ public class ThreadUtils {
}
Thread thread = new Thread(r, threadName);
thread.setDaemon(daemon);
+ if (ueh != null) {
+ thread.setUncaughtExceptionHandler(ueh);
+ }
return thread;
}
};
@@ -64,12 +86,15 @@ public class ThreadUtils {
* Shuts down an executor service in two phases, first by calling shutdown
to reject incoming tasks,
* and then calling shutdownNow, if necessary, to cancel any lingering
tasks.
* After the timeout/on interrupt, the service is forcefully closed.
+ * This pattern of shutting down a thread pool is adopted from:
+ * <a
href="https://docs.oracle.com/en/java/javase/17/docs/api/java.base/java/util/concurrent/ExecutorService.html">ExecutorService</a>
* @param executorService The service to shut down.
- * @param timeout The timeout of the shutdown.
- * @param timeUnit The time unit of the shutdown timeout.
+ * @param timeout The timeout of the shutdown.
+ * @param timeUnit The time unit of the shutdown timeout.
*/
public static void shutdownExecutorServiceQuietly(ExecutorService
executorService,
- long timeout, TimeUnit
timeUnit) {
+ long timeout,
+ TimeUnit timeUnit) {
if (executorService == null) {
return;
}
diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java
b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
index b2b1ab856c0..03c2ceb2125 100644
--- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java
+++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
@@ -41,8 +41,8 @@ import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.utils.BufferSupplier;
import org.apache.kafka.common.utils.ChildFirstClassLoader;
import org.apache.kafka.common.utils.CloseableIterator;
-import org.apache.kafka.common.utils.KafkaThread;
import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.ThreadUtils;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.common.CheckpointFile;
@@ -130,7 +130,6 @@ import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiConsumer;
@@ -162,7 +161,7 @@ import static
org.apache.kafka.server.log.remote.storage.RemoteStorageMetrics.RE
public class RemoteLogManager implements Closeable {
private static final Logger LOGGER =
LoggerFactory.getLogger(RemoteLogManager.class);
- private static final String REMOTE_LOG_READER_THREAD_NAME_PREFIX =
"remote-log-reader";
+ private static final String REMOTE_LOG_READER_THREAD_NAME_PATTERN =
"remote-log-reader-%d";
private final RemoteLogManagerConfig rlmConfig;
private final int brokerId;
private final String logDir;
@@ -255,18 +254,18 @@ public class RemoteLogManager implements Closeable {
indexCache = new
RemoteIndexCache(rlmConfig.remoteLogIndexFileCacheTotalSizeBytes(),
remoteLogStorageManager, logDir);
delayInMs = rlmConfig.remoteLogManagerTaskIntervalMs();
rlmCopyThreadPool = new
RLMScheduledThreadPool(rlmConfig.remoteLogManagerCopierThreadPoolSize(),
- "RLMCopyThreadPool", "kafka-rlm-copy-thread-pool-");
+ "RLMCopyThreadPool", "kafka-rlm-copy-thread-pool-%d");
rlmExpirationThreadPool = new
RLMScheduledThreadPool(rlmConfig.remoteLogManagerExpirationThreadPoolSize(),
- "RLMExpirationThreadPool", "kafka-rlm-expiration-thread-pool-");
+ "RLMExpirationThreadPool", "kafka-rlm-expiration-thread-pool-%d");
followerThreadPool = new
RLMScheduledThreadPool(rlmConfig.remoteLogManagerThreadPoolSize(),
- "RLMFollowerScheduledThreadPool",
"kafka-rlm-follower-thread-pool-");
+ "RLMFollowerScheduledThreadPool",
"kafka-rlm-follower-thread-pool-%d");
metricsGroup.newGauge(REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT_METRIC,
rlmCopyThreadPool::getIdlePercent);
remoteReadTimer =
metricsGroup.newTimer(REMOTE_LOG_READER_FETCH_RATE_AND_TIME_METRIC,
TimeUnit.MILLISECONDS, TimeUnit.SECONDS);
remoteStorageReaderThreadPool = new RemoteStorageThreadPool(
- REMOTE_LOG_READER_THREAD_NAME_PREFIX,
+ REMOTE_LOG_READER_THREAD_NAME_PATTERN,
rlmConfig.remoteLogReaderThreads(),
rlmConfig.remoteLogReaderMaxPendingTasks()
);
@@ -290,6 +289,24 @@ public class RemoteLogManager implements Closeable {
rlmFetchQuotaManager.updateQuota(new Quota(quota, true));
}
+ public void resizeCopierThreadPool(int newSize) {
+ int currentSize = rlmCopyThreadPool.getCorePoolSize();
+ LOGGER.info("Updating remote copy thread pool size from {} to {}",
currentSize, newSize);
+ rlmCopyThreadPool.setCorePoolSize(newSize);
+ }
+
+ public void resizeExpirationThreadPool(int newSize) {
+ int currentSize = rlmExpirationThreadPool.getCorePoolSize();
+ LOGGER.info("Updating remote expiration thread pool size from {} to
{}", currentSize, newSize);
+ rlmExpirationThreadPool.setCorePoolSize(newSize);
+ }
+
+ public void resizeReaderThreadPool(int newSize) {
+ int currentSize = remoteStorageReaderThreadPool.getCorePoolSize();
+ LOGGER.info("Updating remote reader thread pool size from {} to {}",
currentSize, newSize);
+ remoteStorageReaderThreadPool.setCorePoolSize(newSize);
+ }
+
private void removeMetrics() {
metricsGroup.removeMetric(REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT_METRIC);
metricsGroup.removeMetric(REMOTE_LOG_READER_FETCH_RATE_AND_TIME_METRIC);
@@ -2077,28 +2094,10 @@ public class RemoteLogManager implements Closeable {
}
}
- private static void shutdownAndAwaitTermination(ExecutorService pool,
String poolName, long timeout, TimeUnit timeUnit) {
- // This pattern of shutting down thread pool is adopted from here:
https://docs.oracle.com/en/java/javase/17/docs/api/java.base/java/util/concurrent/ExecutorService.html
- LOGGER.info("Shutting down of thread pool {} is started", poolName);
- pool.shutdown(); // Disable new tasks from being submitted
- try {
- // Wait a while for existing tasks to terminate
- if (!pool.awaitTermination(timeout, timeUnit)) {
- LOGGER.info("Shutting down of thread pool {} could not be
completed. It will retry cancelling the tasks using shutdownNow.", poolName);
- pool.shutdownNow(); // Cancel currently executing tasks
- // Wait a while for tasks to respond to being cancelled
- if (!pool.awaitTermination(timeout, timeUnit))
- LOGGER.warn("Shutting down of thread pool {} could not be
completed even after retrying cancellation of the tasks using shutdownNow.",
poolName);
- }
- } catch (InterruptedException ex) {
- // (Re-)Cancel if current thread also interrupted
- LOGGER.warn("Encountered InterruptedException while shutting down
thread pool {}. It will retry cancelling the tasks using shutdownNow.",
poolName);
- pool.shutdownNow();
- // Preserve interrupt status
- Thread.currentThread().interrupt();
- }
-
- LOGGER.info("Shutting down of thread pool {} is completed", poolName);
+ private static void shutdownAndAwaitTermination(ExecutorService executor,
String poolName, long timeout, TimeUnit timeUnit) {
+ LOGGER.info("Shutting down {} executor", poolName);
+ ThreadUtils.shutdownExecutorServiceQuietly(executor, timeout,
timeUnit);
+ LOGGER.info("{} executor shutdown completed", poolName);
}
//Visible for testing
@@ -2152,31 +2151,32 @@ public class RemoteLogManager implements Closeable {
static class RLMScheduledThreadPool {
private static final Logger LOGGER =
LoggerFactory.getLogger(RLMScheduledThreadPool.class);
- private final int poolSize;
private final String threadPoolName;
- private final String threadNamePrefix;
+ private final String threadNamePattern;
private final ScheduledThreadPoolExecutor scheduledThreadPool;
- public RLMScheduledThreadPool(int poolSize, String threadPoolName,
String threadNamePrefix) {
- this.poolSize = poolSize;
+ public RLMScheduledThreadPool(int poolSize, String threadPoolName,
String threadNamePattern) {
this.threadPoolName = threadPoolName;
- this.threadNamePrefix = threadNamePrefix;
- scheduledThreadPool = createPool();
+ this.threadNamePattern = threadNamePattern;
+ scheduledThreadPool = createPool(poolSize);
+ }
+
+ public void setCorePoolSize(int newSize) {
+ scheduledThreadPool.setCorePoolSize(newSize);
+ }
+
+ public int getCorePoolSize() {
+ return scheduledThreadPool.getCorePoolSize();
}
- private ScheduledThreadPoolExecutor createPool() {
+ private ScheduledThreadPoolExecutor createPool(int poolSize) {
+ ThreadFactory threadFactory =
ThreadUtils.createThreadFactory(threadNamePattern, true,
+ (t, e) -> LOGGER.error("Uncaught exception in thread
'{}':", t.getName(), e));
ScheduledThreadPoolExecutor threadPool = new
ScheduledThreadPoolExecutor(poolSize);
threadPool.setRemoveOnCancelPolicy(true);
threadPool.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
threadPool.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
- threadPool.setThreadFactory(new ThreadFactory() {
- private final AtomicInteger sequence = new AtomicInteger();
-
- public Thread newThread(Runnable r) {
- return KafkaThread.daemon(threadNamePrefix +
sequence.incrementAndGet(), r);
- }
- });
-
+ threadPool.setThreadFactory(threadFactory);
return threadPool;
}
diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index 9f69a44c919..02fbf7eb2f0 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -1167,6 +1167,22 @@ class DynamicRemoteLogConfig(server: KafkaBroker)
extends BrokerReconfigurable w
throw new ConfigException(s"$errorMsg, value should be at least 1")
}
}
+
+ if (RemoteLogManagerConfig.REMOTE_LOG_READER_THREADS_PROP.equals(k) ||
+
RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP.equals(k)
||
+
RemoteLogManagerConfig.REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_PROP.equals(k))
{
+ val newValue = v.asInstanceOf[Int]
+ val oldValue = server.config.getInt(k)
+ if (newValue != oldValue) {
+ val errorMsg = s"Dynamic thread count update validation failed for
$k=$v"
+ if (newValue <= 0)
+ throw new ConfigException(s"$errorMsg, value should be at least 1")
+ if (newValue < oldValue / 2)
+ throw new ConfigException(s"$errorMsg, value should be at least
half the current value $oldValue")
+ if (newValue > oldValue * 2)
+ throw new ConfigException(s"$errorMsg, value should not be greater
than double the current value $oldValue")
+ }
+ }
}
}
@@ -1176,29 +1192,40 @@ class DynamicRemoteLogConfig(server: KafkaBroker)
extends BrokerReconfigurable w
def isChangedLongValue(k : String): Boolean = oldLongValue(k) !=
newLongValue(k)
- val remoteLogManager = server.remoteLogManagerOpt
- if (remoteLogManager.nonEmpty) {
+ if (server.remoteLogManagerOpt.nonEmpty) {
+ val remoteLogManager = server.remoteLogManagerOpt.get
if
(isChangedLongValue(RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP))
{
val oldValue =
oldLongValue(RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP)
val newValue =
newLongValue(RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP)
- remoteLogManager.get.resizeCacheSize(newValue)
+ remoteLogManager.resizeCacheSize(newValue)
info(s"Dynamic remote log manager config:
${RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP}
updated, " +
s"old value: $oldValue, new value: $newValue")
}
if
(isChangedLongValue(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP))
{
val oldValue =
oldLongValue(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP)
val newValue =
newLongValue(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP)
- remoteLogManager.get.updateCopyQuota(newValue)
+ remoteLogManager.updateCopyQuota(newValue)
info(s"Dynamic remote log manager config:
${RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP}
updated, " +
s"old value: $oldValue, new value: $newValue")
}
if
(isChangedLongValue(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP))
{
val oldValue =
oldLongValue(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP)
val newValue =
newLongValue(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP)
- remoteLogManager.get.updateFetchQuota(newValue)
+ remoteLogManager.updateFetchQuota(newValue)
info(s"Dynamic remote log manager config:
${RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP}
updated, " +
s"old value: $oldValue, new value: $newValue")
}
+
+ val newRLMConfig = newConfig.remoteLogManagerConfig
+ val oldRLMConfig = oldConfig.remoteLogManagerConfig
+ if (newRLMConfig.remoteLogManagerCopierThreadPoolSize() !=
oldRLMConfig.remoteLogManagerCopierThreadPoolSize())
+
remoteLogManager.resizeCopierThreadPool(newRLMConfig.remoteLogManagerCopierThreadPoolSize())
+
+ if (newRLMConfig.remoteLogManagerExpirationThreadPoolSize() !=
oldRLMConfig.remoteLogManagerExpirationThreadPoolSize())
+
remoteLogManager.resizeExpirationThreadPool(newRLMConfig.remoteLogManagerExpirationThreadPoolSize())
+
+ if (newRLMConfig.remoteLogReaderThreads() !=
oldRLMConfig.remoteLogReaderThreads())
+
remoteLogManager.resizeReaderThreadPool(newRLMConfig.remoteLogReaderThreads())
}
}
@@ -1219,6 +1246,9 @@ object DynamicRemoteLogConfig {
RemoteLogManagerConfig.REMOTE_FETCH_MAX_WAIT_MS_PROP,
RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP,
RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP,
- RemoteLogManagerConfig.REMOTE_LIST_OFFSETS_REQUEST_TIMEOUT_MS_PROP
+ RemoteLogManagerConfig.REMOTE_LIST_OFFSETS_REQUEST_TIMEOUT_MS_PROP,
+ RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP,
+ RemoteLogManagerConfig.REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_PROP,
+ RemoteLogManagerConfig.REMOTE_LOG_READER_THREADS_PROP
)
}
diff --git
a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
index 768be4786bd..4ee1a108d1d 100755
--- a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
@@ -202,6 +202,81 @@ class DynamicBrokerConfigTest {
)
}
+ @Test
+ def testUpdateRemoteLogManagerDynamicThreadPool(): Unit = {
+ val origProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect,
port = 8181)
+ val config = KafkaConfig(origProps)
+
assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE,
config.remoteLogManagerConfig.remoteLogManagerCopierThreadPoolSize())
+
assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE,
config.remoteLogManagerConfig.remoteLogManagerExpirationThreadPoolSize())
+ assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_READER_THREADS,
config.remoteLogManagerConfig.remoteLogReaderThreads())
+
+ val serverMock = mock(classOf[KafkaBroker])
+ val remoteLogManager = mock(classOf[RemoteLogManager])
+ when(serverMock.config).thenReturn(config)
+ when(serverMock.remoteLogManagerOpt).thenReturn(Some(remoteLogManager))
+
+ config.dynamicConfig.initialize(None, None)
+ config.dynamicConfig.addBrokerReconfigurable(new
DynamicRemoteLogConfig(serverMock))
+
+ // Test dynamic update with valid values
+ val props = new Properties()
+
props.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP,
"8")
+ config.dynamicConfig.validate(props, perBrokerConfig = true)
+ config.dynamicConfig.updateDefaultConfig(props)
+ assertEquals(8,
config.remoteLogManagerConfig.remoteLogManagerCopierThreadPoolSize())
+ verify(remoteLogManager).resizeCopierThreadPool(8)
+
+
props.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_PROP,
"7")
+ config.dynamicConfig.validate(props, perBrokerConfig = false)
+ config.dynamicConfig.updateDefaultConfig(props)
+ assertEquals(7,
config.remoteLogManagerConfig.remoteLogManagerExpirationThreadPoolSize())
+ verify(remoteLogManager).resizeExpirationThreadPool(7)
+
+ props.put(RemoteLogManagerConfig.REMOTE_LOG_READER_THREADS_PROP, "6")
+ config.dynamicConfig.validate(props, perBrokerConfig = true)
+ config.dynamicConfig.updateDefaultConfig(props)
+ assertEquals(6, config.remoteLogManagerConfig.remoteLogReaderThreads())
+ verify(remoteLogManager).resizeReaderThreadPool(6)
+ props.clear()
+ verifyNoMoreInteractions(remoteLogManager)
+ }
+
+ @Test
+ def testRemoteLogDynamicThreadPoolWithInvalidValues(): Unit = {
+ val origProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect,
port = 8181)
+ val config = KafkaConfig(origProps)
+
+ val serverMock = mock(classOf[KafkaBroker])
+ val remoteLogManager = mock(classOf[RemoteLogManager])
+ when(serverMock.config).thenReturn(config)
+ when(serverMock.remoteLogManagerOpt).thenReturn(Some(remoteLogManager))
+
+ config.dynamicConfig.initialize(None, None)
+ config.dynamicConfig.addBrokerReconfigurable(new
DynamicRemoteLogConfig(serverMock))
+
+ // Test dynamic update with invalid values
+ val props = new Properties()
+
props.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP,
"0")
+ val err = assertThrows(classOf[ConfigException], () =>
config.dynamicConfig.validate(props, perBrokerConfig = true))
+ assertTrue(err.getMessage.contains("Value must be at least 1"))
+
+ val props1 = new Properties()
+
props1.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_PROP,
"-1")
+ val err1 = assertThrows(classOf[ConfigException], () =>
config.dynamicConfig.validate(props1, perBrokerConfig = false))
+ assertTrue(err1.getMessage.contains("Value must be at least 1"))
+
+ val props2 = new Properties()
+ props2.put(RemoteLogManagerConfig.REMOTE_LOG_READER_THREADS_PROP, "2")
+ val err2 = assertThrows(classOf[ConfigException], () =>
config.dynamicConfig.validate(props2, perBrokerConfig = false))
+ assertTrue(err2.getMessage.contains("value should be at least half the
current value"))
+
+ val props3 = new Properties()
+ props3.put(RemoteLogManagerConfig.REMOTE_LOG_READER_THREADS_PROP, "-1")
+ val err3 = assertThrows(classOf[ConfigException], () =>
config.dynamicConfig.validate(props, perBrokerConfig = true))
+ assertTrue(err3.getMessage.contains("Value must be at least 1"))
+ verifyNoMoreInteractions(remoteLogManager)
+ }
+
@nowarn("cat=deprecation")
@Test
def testConfigUpdateWithSomeInvalidConfigs(): Unit = {
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index a349564b951..755c83f082e 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -1102,8 +1102,8 @@ class KafkaConfigTest {
case
RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP => //
ignore string
case
RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP =>
assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
case RemoteLogManagerConfig.REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_PROP
=> assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
- case
RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP =>
assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -2)
- case
RemoteLogManagerConfig.REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_PROP =>
assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -2)
+ case
RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP =>
assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1, -2)
+ case
RemoteLogManagerConfig.REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_PROP =>
assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1, -2)
case RemoteLogManagerConfig.REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP
=> assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
case
RemoteLogManagerConfig.REMOTE_LOG_MANAGER_TASK_RETRY_BACK_OFF_MS_PROP =>
assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
case
RemoteLogManagerConfig.REMOTE_LOG_MANAGER_TASK_RETRY_BACK_OFF_MAX_MS_PROP =>
assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
diff --git
a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java
b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java
index 1be0f0e1890..d6cf56eff44 100644
---
a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java
+++
b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java
@@ -93,21 +93,18 @@ public final class RemoteLogManagerConfig {
public static final long
DEFAULT_REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES = 1024 * 1024 * 1024L;
public static final String REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_PROP =
"remote.log.manager.thread.pool.size";
- public static final String REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_DOC =
"Deprecated. Size of the thread pool used in scheduling tasks to copy " +
- "segments, fetch remote log indexes and clean up remote log
segments.";
+ public static final String REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_DOC = "Size
of the thread pool used in scheduling follower tasks to read " +
+ "the highest-uploaded remote-offset for follower partitions.";
public static final int DEFAULT_REMOTE_LOG_MANAGER_THREAD_POOL_SIZE = 2;
- private static final String REMOTE_LOG_MANAGER_THREAD_POOL_FALLBACK = "The
default value of -1 means that this will be set to the configured value of " +
- REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_PROP + ", if available; otherwise,
it defaults to " + DEFAULT_REMOTE_LOG_MANAGER_THREAD_POOL_SIZE + ".";
-
public static final String REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP
= "remote.log.manager.copier.thread.pool.size";
public static final String REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_DOC
= "Size of the thread pool used in scheduling tasks " +
- "to copy segments. " + REMOTE_LOG_MANAGER_THREAD_POOL_FALLBACK;
+ "to copy segments.";
public static final int DEFAULT_REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE
= 10;
public static final String
REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_PROP =
"remote.log.manager.expiration.thread.pool.size";
public static final String
REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_DOC = "Size of the thread pool
used in scheduling tasks " +
- "to clean up remote log segments. " +
REMOTE_LOG_MANAGER_THREAD_POOL_FALLBACK;
+ "to clean up the expired remote log segments.";
public static final int
DEFAULT_REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE = 10;
public static final String REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP =
"remote.log.manager.task.interval.ms";
diff --git
a/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteStorageThreadPool.java
b/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteStorageThreadPool.java
index 849ac556155..9c6b9f644e4 100644
---
a/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteStorageThreadPool.java
+++
b/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteStorageThreadPool.java
@@ -18,16 +18,15 @@ package org.apache.kafka.storage.internals.log;
import org.apache.kafka.common.internals.FatalExitError;
import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.ThreadUtils;
import org.apache.kafka.server.metrics.KafkaMetricsGroup;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
import static
org.apache.kafka.server.log.remote.storage.RemoteStorageMetrics.REMOTE_LOG_READER_AVG_IDLE_PERCENT_METRIC;
import static
org.apache.kafka.server.log.remote.storage.RemoteStorageMetrics.REMOTE_LOG_READER_TASK_QUEUE_SIZE_METRIC;
@@ -37,11 +36,17 @@ public final class RemoteStorageThreadPool extends
ThreadPoolExecutor {
private static final Logger LOGGER =
LoggerFactory.getLogger(RemoteStorageThreadPool.class);
private final KafkaMetricsGroup metricsGroup = new
KafkaMetricsGroup(this.getClass());
- public RemoteStorageThreadPool(String threadNamePrefix,
+ public RemoteStorageThreadPool(String threadNamePattern,
int numThreads,
int maxPendingTasks) {
- super(numThreads, numThreads, 0L, TimeUnit.MILLISECONDS, new
LinkedBlockingQueue<>(maxPendingTasks),
- new RemoteStorageThreadFactory(threadNamePrefix));
+ super(numThreads,
+ numThreads,
+ 0L,
+ TimeUnit.MILLISECONDS,
+ new LinkedBlockingQueue<>(maxPendingTasks),
+ ThreadUtils.createThreadFactory(threadNamePattern, false,
+ (t, e) -> LOGGER.error("Uncaught exception in thread
'{}':", t.getName(), e))
+ );
metricsGroup.newGauge(REMOTE_LOG_READER_TASK_QUEUE_SIZE_METRIC.getName(),
() -> getQueue().size());
metricsGroup.newGauge(REMOTE_LOG_READER_AVG_IDLE_PERCENT_METRIC.getName(),
@@ -61,21 +66,6 @@ public final class RemoteStorageThreadPool extends
ThreadPoolExecutor {
}
}
- private static class RemoteStorageThreadFactory implements ThreadFactory {
- private final String namePrefix;
- private final AtomicInteger threadNumber = new AtomicInteger(0);
-
- RemoteStorageThreadFactory(String namePrefix) {
- this.namePrefix = namePrefix;
- }
-
- @Override
- public Thread newThread(Runnable r) {
- return new Thread(r, namePrefix + threadNumber.getAndIncrement());
- }
-
- }
-
public void removeMetrics() {
REMOTE_STORAGE_THREAD_POOL_METRICS.forEach(metricsGroup::removeMetric);
}