This is an automated email from the ASF dual-hosted git repository.

kamalcph pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 139e5b15a1c KAFKA-17928: Make remote log manager thread-pool configs 
dynamic (#17859)
139e5b15a1c is described below

commit 139e5b15a1ca1c369b7f42842da8c69caa10d09f
Author: Kamal Chandraprakash <[email protected]>
AuthorDate: Sat Dec 14 13:14:05 2024 +0530

    KAFKA-17928: Make remote log manager thread-pool configs dynamic (#17859)
    
    - Disallow configuring -1 for copier and expiration thread pools dynamically
    
    Co-authored-by: Peter Lee <[email protected]>
    
    Reviewers: Peter Lee <[email protected]>, Satish Duggana 
<[email protected]>
---
 .../org/apache/kafka/common/utils/ThreadUtils.java | 31 +++++++-
 .../java/kafka/log/remote/RemoteLogManager.java    | 88 +++++++++++-----------
 .../scala/kafka/server/DynamicBrokerConfig.scala   | 42 +++++++++--
 .../kafka/server/DynamicBrokerConfigTest.scala     | 75 ++++++++++++++++++
 .../scala/unit/kafka/server/KafkaConfigTest.scala  |  4 +-
 .../log/remote/storage/RemoteLogManagerConfig.java | 11 +--
 .../internals/log/RemoteStorageThreadPool.java     | 30 +++-----
 7 files changed, 199 insertions(+), 82 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/common/utils/ThreadUtils.java 
b/clients/src/main/java/org/apache/kafka/common/utils/ThreadUtils.java
index 51cfe74fcde..a47e9ddb36a 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/ThreadUtils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/ThreadUtils.java
@@ -25,12 +25,15 @@ import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
+import static java.lang.Thread.UncaughtExceptionHandler;
+
 /**
  * Utilities for working with threads.
  */
 public class ThreadUtils {
 
     private static final Logger log = 
LoggerFactory.getLogger(ThreadUtils.class);
+
     /**
      * Create a new ThreadFactory.
      *
@@ -42,6 +45,22 @@ public class ThreadUtils {
      */
     public static ThreadFactory createThreadFactory(final String pattern,
                                                     final boolean daemon) {
+        return createThreadFactory(pattern, daemon, null);
+    }
+
+    /**
+     * Create a new ThreadFactory.
+     *
+     * @param pattern       The pattern to use.  If this contains %d, it will 
be
+     *                      replaced with a thread number.  It should not 
contain more
+     *                      than one %d.
+     * @param daemon        True if we want daemon threads.
+     * @param ueh           thread's uncaught exception handler.
+     * @return              The new ThreadFactory.
+     */
+    public static ThreadFactory createThreadFactory(final String pattern,
+                                                    final boolean daemon,
+                                                    final 
UncaughtExceptionHandler ueh) {
         return new ThreadFactory() {
             private final AtomicLong threadEpoch = new AtomicLong(0);
 
@@ -55,6 +74,9 @@ public class ThreadUtils {
                 }
                 Thread thread = new Thread(r, threadName);
                 thread.setDaemon(daemon);
+                if (ueh != null) {
+                    thread.setUncaughtExceptionHandler(ueh);
+                }
                 return thread;
             }
         };
@@ -64,12 +86,15 @@ public class ThreadUtils {
      * Shuts down an executor service in two phases, first by calling shutdown 
to reject incoming tasks,
      * and then calling shutdownNow, if necessary, to cancel any lingering 
tasks.
      * After the timeout/on interrupt, the service is forcefully closed.
+     * This pattern of shutting down thread pool is adopted from here:
+     * <a 
href="https://docs.oracle.com/en/java/javase/17/docs/api/java.base/java/util/concurrent/ExecutorService.html";>ExecutorService</a>
      * @param executorService The service to shut down.
-     * @param timeout The timeout of the shutdown.
-     * @param timeUnit The time unit of the shutdown timeout.
+     * @param timeout         The timeout of the shutdown.
+     * @param timeUnit        The time unit of the shutdown timeout.
      */
     public static void shutdownExecutorServiceQuietly(ExecutorService 
executorService,
-                                                      long timeout, TimeUnit 
timeUnit) {
+                                                      long timeout,
+                                                      TimeUnit timeUnit) {
         if (executorService == null) {
             return;
         }
diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java 
b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
index b2b1ab856c0..03c2ceb2125 100644
--- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java
+++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
@@ -41,8 +41,8 @@ import org.apache.kafka.common.requests.FetchRequest;
 import org.apache.kafka.common.utils.BufferSupplier;
 import org.apache.kafka.common.utils.ChildFirstClassLoader;
 import org.apache.kafka.common.utils.CloseableIterator;
-import org.apache.kafka.common.utils.KafkaThread;
 import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.ThreadUtils;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.server.common.CheckpointFile;
@@ -130,7 +130,6 @@ import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.BiConsumer;
@@ -162,7 +161,7 @@ import static 
org.apache.kafka.server.log.remote.storage.RemoteStorageMetrics.RE
 public class RemoteLogManager implements Closeable {
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(RemoteLogManager.class);
-    private static final String REMOTE_LOG_READER_THREAD_NAME_PREFIX = 
"remote-log-reader";
+    private static final String REMOTE_LOG_READER_THREAD_NAME_PATTERN = 
"remote-log-reader-%d";
     private final RemoteLogManagerConfig rlmConfig;
     private final int brokerId;
     private final String logDir;
@@ -255,18 +254,18 @@ public class RemoteLogManager implements Closeable {
         indexCache = new 
RemoteIndexCache(rlmConfig.remoteLogIndexFileCacheTotalSizeBytes(), 
remoteLogStorageManager, logDir);
         delayInMs = rlmConfig.remoteLogManagerTaskIntervalMs();
         rlmCopyThreadPool = new 
RLMScheduledThreadPool(rlmConfig.remoteLogManagerCopierThreadPoolSize(),
-            "RLMCopyThreadPool", "kafka-rlm-copy-thread-pool-");
+            "RLMCopyThreadPool", "kafka-rlm-copy-thread-pool-%d");
         rlmExpirationThreadPool = new 
RLMScheduledThreadPool(rlmConfig.remoteLogManagerExpirationThreadPoolSize(),
-            "RLMExpirationThreadPool", "kafka-rlm-expiration-thread-pool-");
+            "RLMExpirationThreadPool", "kafka-rlm-expiration-thread-pool-%d");
         followerThreadPool = new 
RLMScheduledThreadPool(rlmConfig.remoteLogManagerThreadPoolSize(),
-            "RLMFollowerScheduledThreadPool", 
"kafka-rlm-follower-thread-pool-");
+            "RLMFollowerScheduledThreadPool", 
"kafka-rlm-follower-thread-pool-%d");
 
         
metricsGroup.newGauge(REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT_METRIC, 
rlmCopyThreadPool::getIdlePercent);
         remoteReadTimer = 
metricsGroup.newTimer(REMOTE_LOG_READER_FETCH_RATE_AND_TIME_METRIC,
                 TimeUnit.MILLISECONDS, TimeUnit.SECONDS);
 
         remoteStorageReaderThreadPool = new RemoteStorageThreadPool(
-                REMOTE_LOG_READER_THREAD_NAME_PREFIX,
+                REMOTE_LOG_READER_THREAD_NAME_PATTERN,
                 rlmConfig.remoteLogReaderThreads(),
                 rlmConfig.remoteLogReaderMaxPendingTasks()
         );
@@ -290,6 +289,24 @@ public class RemoteLogManager implements Closeable {
         rlmFetchQuotaManager.updateQuota(new Quota(quota, true));
     }
 
+    public void resizeCopierThreadPool(int newSize) {
+        int currentSize = rlmCopyThreadPool.getCorePoolSize();
+        LOGGER.info("Updating remote copy thread pool size from {} to {}", 
currentSize, newSize);
+        rlmCopyThreadPool.setCorePoolSize(newSize);
+    }
+
+    public void resizeExpirationThreadPool(int newSize) {
+        int currentSize = rlmExpirationThreadPool.getCorePoolSize();
+        LOGGER.info("Updating remote expiration thread pool size from {} to 
{}", currentSize, newSize);
+        rlmExpirationThreadPool.setCorePoolSize(newSize);
+    }
+
+    public void resizeReaderThreadPool(int newSize) {
+        int currentSize = remoteStorageReaderThreadPool.getCorePoolSize();
+        LOGGER.info("Updating remote reader thread pool size from {} to {}", 
currentSize, newSize);
+        remoteStorageReaderThreadPool.setCorePoolSize(newSize);
+    }
+
     private void removeMetrics() {
         
metricsGroup.removeMetric(REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT_METRIC);
         
metricsGroup.removeMetric(REMOTE_LOG_READER_FETCH_RATE_AND_TIME_METRIC);
@@ -2077,28 +2094,10 @@ public class RemoteLogManager implements Closeable {
         }
     }
 
-    private static void shutdownAndAwaitTermination(ExecutorService pool, 
String poolName, long timeout, TimeUnit timeUnit) {
-        // This pattern of shutting down thread pool is adopted from here: 
https://docs.oracle.com/en/java/javase/17/docs/api/java.base/java/util/concurrent/ExecutorService.html
-        LOGGER.info("Shutting down of thread pool {} is started", poolName);
-        pool.shutdown(); // Disable new tasks from being submitted
-        try {
-            // Wait a while for existing tasks to terminate
-            if (!pool.awaitTermination(timeout, timeUnit)) {
-                LOGGER.info("Shutting down of thread pool {} could not be 
completed. It will retry cancelling the tasks using shutdownNow.", poolName);
-                pool.shutdownNow(); // Cancel currently executing tasks
-                // Wait a while for tasks to respond to being cancelled
-                if (!pool.awaitTermination(timeout, timeUnit))
-                    LOGGER.warn("Shutting down of thread pool {} could not be 
completed even after retrying cancellation of the tasks using shutdownNow.", 
poolName);
-            }
-        } catch (InterruptedException ex) {
-            // (Re-)Cancel if current thread also interrupted
-            LOGGER.warn("Encountered InterruptedException while shutting down 
thread pool {}. It will retry cancelling the tasks using shutdownNow.", 
poolName);
-            pool.shutdownNow();
-            // Preserve interrupt status
-            Thread.currentThread().interrupt();
-        }
-
-        LOGGER.info("Shutting down of thread pool {} is completed", poolName);
+    private static void shutdownAndAwaitTermination(ExecutorService executor, 
String poolName, long timeout, TimeUnit timeUnit) {
+        LOGGER.info("Shutting down {} executor", poolName);
+        ThreadUtils.shutdownExecutorServiceQuietly(executor, timeout, 
timeUnit);
+        LOGGER.info("{} executor shutdown completed", poolName);
     }
 
     //Visible for testing
@@ -2152,31 +2151,32 @@ public class RemoteLogManager implements Closeable {
     static class RLMScheduledThreadPool {
 
         private static final Logger LOGGER = 
LoggerFactory.getLogger(RLMScheduledThreadPool.class);
-        private final int poolSize;
         private final String threadPoolName;
-        private final String threadNamePrefix;
+        private final String threadNamePattern;
         private final ScheduledThreadPoolExecutor scheduledThreadPool;
 
-        public RLMScheduledThreadPool(int poolSize, String threadPoolName, 
String threadNamePrefix) {
-            this.poolSize = poolSize;
+        public RLMScheduledThreadPool(int poolSize, String threadPoolName, 
String threadNamePattern) {
             this.threadPoolName = threadPoolName;
-            this.threadNamePrefix = threadNamePrefix;
-            scheduledThreadPool = createPool();
+            this.threadNamePattern = threadNamePattern;
+            scheduledThreadPool = createPool(poolSize);
+        }
+
+        public void setCorePoolSize(int newSize) {
+            scheduledThreadPool.setCorePoolSize(newSize);
+        }
+
+        public int getCorePoolSize() {
+            return scheduledThreadPool.getCorePoolSize();
         }
 
-        private ScheduledThreadPoolExecutor createPool() {
+        private ScheduledThreadPoolExecutor createPool(int poolSize) {
+            ThreadFactory threadFactory = 
ThreadUtils.createThreadFactory(threadNamePattern, true,
+                    (t, e) -> LOGGER.error("Uncaught exception in thread 
'{}':", t.getName(), e));
             ScheduledThreadPoolExecutor threadPool = new 
ScheduledThreadPoolExecutor(poolSize);
             threadPool.setRemoveOnCancelPolicy(true);
             
threadPool.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
             
threadPool.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
-            threadPool.setThreadFactory(new ThreadFactory() {
-                private final AtomicInteger sequence = new AtomicInteger();
-
-                public Thread newThread(Runnable r) {
-                    return KafkaThread.daemon(threadNamePrefix + 
sequence.incrementAndGet(), r);
-                }
-            });
-
+            threadPool.setThreadFactory(threadFactory);
             return threadPool;
         }
 
diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala 
b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index 9f69a44c919..02fbf7eb2f0 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -1167,6 +1167,22 @@ class DynamicRemoteLogConfig(server: KafkaBroker) 
extends BrokerReconfigurable w
           throw new ConfigException(s"$errorMsg, value should be at least 1")
         }
       }
+
+      if (RemoteLogManagerConfig.REMOTE_LOG_READER_THREADS_PROP.equals(k) ||
+          
RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP.equals(k)
 ||
+          
RemoteLogManagerConfig.REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_PROP.equals(k))
 {
+        val newValue = v.asInstanceOf[Int]
+        val oldValue = server.config.getInt(k)
+        if (newValue != oldValue) {
+          val errorMsg = s"Dynamic thread count update validation failed for 
$k=$v"
+          if (newValue <= 0)
+            throw new ConfigException(s"$errorMsg, value should be at least 1")
+          if (newValue < oldValue / 2)
+            throw new ConfigException(s"$errorMsg, value should be at least 
half the current value $oldValue")
+          if (newValue > oldValue * 2)
+            throw new ConfigException(s"$errorMsg, value should not be greater 
than double the current value $oldValue")
+        }
+      }
     }
   }
 
@@ -1176,29 +1192,40 @@ class DynamicRemoteLogConfig(server: KafkaBroker) 
extends BrokerReconfigurable w
 
     def isChangedLongValue(k : String): Boolean = oldLongValue(k) != 
newLongValue(k)
 
-    val remoteLogManager = server.remoteLogManagerOpt
-    if (remoteLogManager.nonEmpty) {
+    if (server.remoteLogManagerOpt.nonEmpty) {
+      val remoteLogManager = server.remoteLogManagerOpt.get
       if 
(isChangedLongValue(RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP))
 {
         val oldValue = 
oldLongValue(RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP)
         val newValue = 
newLongValue(RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP)
-        remoteLogManager.get.resizeCacheSize(newValue)
+        remoteLogManager.resizeCacheSize(newValue)
         info(s"Dynamic remote log manager config: 
${RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP} 
updated, " +
           s"old value: $oldValue, new value: $newValue")
       }
       if 
(isChangedLongValue(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP))
 {
         val oldValue = 
oldLongValue(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP)
         val newValue = 
newLongValue(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP)
-        remoteLogManager.get.updateCopyQuota(newValue)
+        remoteLogManager.updateCopyQuota(newValue)
         info(s"Dynamic remote log manager config: 
${RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP} 
updated, " +
           s"old value: $oldValue, new value: $newValue")
       }
       if 
(isChangedLongValue(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP))
 {
         val oldValue = 
oldLongValue(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP)
         val newValue = 
newLongValue(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP)
-        remoteLogManager.get.updateFetchQuota(newValue)
+        remoteLogManager.updateFetchQuota(newValue)
         info(s"Dynamic remote log manager config: 
${RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP} 
updated, " +
           s"old value: $oldValue, new value: $newValue")
       }
+
+      val newRLMConfig = newConfig.remoteLogManagerConfig
+      val oldRLMConfig = oldConfig.remoteLogManagerConfig
+      if (newRLMConfig.remoteLogManagerCopierThreadPoolSize() != 
oldRLMConfig.remoteLogManagerCopierThreadPoolSize())
+        
remoteLogManager.resizeCopierThreadPool(newRLMConfig.remoteLogManagerCopierThreadPoolSize())
+
+      if (newRLMConfig.remoteLogManagerExpirationThreadPoolSize() != 
oldRLMConfig.remoteLogManagerExpirationThreadPoolSize())
+        
remoteLogManager.resizeExpirationThreadPool(newRLMConfig.remoteLogManagerExpirationThreadPoolSize())
+
+      if (newRLMConfig.remoteLogReaderThreads() != 
oldRLMConfig.remoteLogReaderThreads())
+        
remoteLogManager.resizeReaderThreadPool(newRLMConfig.remoteLogReaderThreads())
     }
   }
 
@@ -1219,6 +1246,9 @@ object DynamicRemoteLogConfig {
     RemoteLogManagerConfig.REMOTE_FETCH_MAX_WAIT_MS_PROP,
     RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP,
     RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP,
-    RemoteLogManagerConfig.REMOTE_LIST_OFFSETS_REQUEST_TIMEOUT_MS_PROP
+    RemoteLogManagerConfig.REMOTE_LIST_OFFSETS_REQUEST_TIMEOUT_MS_PROP,
+    RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP,
+    RemoteLogManagerConfig.REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_PROP,
+    RemoteLogManagerConfig.REMOTE_LOG_READER_THREADS_PROP
   )
 }
diff --git 
a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala 
b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
index 768be4786bd..4ee1a108d1d 100755
--- a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
@@ -202,6 +202,81 @@ class DynamicBrokerConfigTest {
     )
   }
 
+  @Test
+  def testUpdateRemoteLogManagerDynamicThreadPool(): Unit = {
+    val origProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, 
port = 8181)
+    val config = KafkaConfig(origProps)
+    
assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE,
 config.remoteLogManagerConfig.remoteLogManagerCopierThreadPoolSize())
+    
assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE,
 config.remoteLogManagerConfig.remoteLogManagerExpirationThreadPoolSize())
+    assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_READER_THREADS, 
config.remoteLogManagerConfig.remoteLogReaderThreads())
+
+    val serverMock = mock(classOf[KafkaBroker])
+    val remoteLogManager = mock(classOf[RemoteLogManager])
+    when(serverMock.config).thenReturn(config)
+    when(serverMock.remoteLogManagerOpt).thenReturn(Some(remoteLogManager))
+
+    config.dynamicConfig.initialize(None, None)
+    config.dynamicConfig.addBrokerReconfigurable(new 
DynamicRemoteLogConfig(serverMock))
+
+    // Test dynamic update with valid values
+    val props = new Properties()
+    
props.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP,
 "8")
+    config.dynamicConfig.validate(props, perBrokerConfig = true)
+    config.dynamicConfig.updateDefaultConfig(props)
+    assertEquals(8, 
config.remoteLogManagerConfig.remoteLogManagerCopierThreadPoolSize())
+    verify(remoteLogManager).resizeCopierThreadPool(8)
+
+    
props.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_PROP,
 "7")
+    config.dynamicConfig.validate(props, perBrokerConfig = false)
+    config.dynamicConfig.updateDefaultConfig(props)
+    assertEquals(7, 
config.remoteLogManagerConfig.remoteLogManagerExpirationThreadPoolSize())
+    verify(remoteLogManager).resizeExpirationThreadPool(7)
+
+    props.put(RemoteLogManagerConfig.REMOTE_LOG_READER_THREADS_PROP, "6")
+    config.dynamicConfig.validate(props, perBrokerConfig = true)
+    config.dynamicConfig.updateDefaultConfig(props)
+    assertEquals(6, config.remoteLogManagerConfig.remoteLogReaderThreads())
+    verify(remoteLogManager).resizeReaderThreadPool(6)
+    props.clear()
+    verifyNoMoreInteractions(remoteLogManager)
+  }
+
+  @Test
+  def testRemoteLogDynamicThreadPoolWithInvalidValues(): Unit = {
+    val origProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, 
port = 8181)
+    val config = KafkaConfig(origProps)
+
+    val serverMock = mock(classOf[KafkaBroker])
+    val remoteLogManager = mock(classOf[RemoteLogManager])
+    when(serverMock.config).thenReturn(config)
+    when(serverMock.remoteLogManagerOpt).thenReturn(Some(remoteLogManager))
+
+    config.dynamicConfig.initialize(None, None)
+    config.dynamicConfig.addBrokerReconfigurable(new 
DynamicRemoteLogConfig(serverMock))
+
+    // Test dynamic update with invalid values
+    val props = new Properties()
+    
props.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP,
 "0")
+    val err = assertThrows(classOf[ConfigException], () => 
config.dynamicConfig.validate(props, perBrokerConfig = true))
+    assertTrue(err.getMessage.contains("Value must be at least 1"))
+
+    val props1 = new Properties()
+    
props1.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_PROP,
 "-1")
+    val err1 = assertThrows(classOf[ConfigException], () => 
config.dynamicConfig.validate(props1, perBrokerConfig = false))
+    assertTrue(err1.getMessage.contains("Value must be at least 1"))
+
+    val props2 = new Properties()
+    props2.put(RemoteLogManagerConfig.REMOTE_LOG_READER_THREADS_PROP, "2")
+    val err2 = assertThrows(classOf[ConfigException], () => 
config.dynamicConfig.validate(props2, perBrokerConfig = false))
+    assertTrue(err2.getMessage.contains("value should be at least half the 
current value"))
+
+    val props3 = new Properties()
+    props3.put(RemoteLogManagerConfig.REMOTE_LOG_READER_THREADS_PROP, "-1")
+    val err3 = assertThrows(classOf[ConfigException], () => 
config.dynamicConfig.validate(props, perBrokerConfig = true))
+    assertTrue(err3.getMessage.contains("Value must be at least 1"))
+    verifyNoMoreInteractions(remoteLogManager)
+  }
+
   @nowarn("cat=deprecation")
   @Test
   def testConfigUpdateWithSomeInvalidConfigs(): Unit = {
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index a349564b951..755c83f082e 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -1102,8 +1102,8 @@ class KafkaConfigTest {
         case 
RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP => // 
ignore string
         case 
RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP => 
assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
         case RemoteLogManagerConfig.REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_PROP 
=> assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
-        case 
RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP => 
assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -2)
-        case 
RemoteLogManagerConfig.REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_PROP => 
assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -2)
+        case 
RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP => 
assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1, -2)
+        case 
RemoteLogManagerConfig.REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_PROP => 
assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1, -2)
         case RemoteLogManagerConfig.REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP 
=> assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
         case 
RemoteLogManagerConfig.REMOTE_LOG_MANAGER_TASK_RETRY_BACK_OFF_MS_PROP => 
assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
         case 
RemoteLogManagerConfig.REMOTE_LOG_MANAGER_TASK_RETRY_BACK_OFF_MAX_MS_PROP => 
assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
diff --git 
a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java
 
b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java
index 1be0f0e1890..d6cf56eff44 100644
--- 
a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java
+++ 
b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java
@@ -93,21 +93,18 @@ public final class RemoteLogManagerConfig {
     public static final long 
DEFAULT_REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES = 1024 * 1024 * 1024L;
 
     public static final String REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_PROP = 
"remote.log.manager.thread.pool.size";
-    public static final String REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_DOC = 
"Deprecated. Size of the thread pool used in scheduling tasks to copy " +
-            "segments, fetch remote log indexes and clean up remote log 
segments.";
+    public static final String REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_DOC = "Size 
of the thread pool used in scheduling follower tasks to read " +
+            "the highest-uploaded remote-offset for follower partitions.";
     public static final int DEFAULT_REMOTE_LOG_MANAGER_THREAD_POOL_SIZE = 2;
 
-    private static final String REMOTE_LOG_MANAGER_THREAD_POOL_FALLBACK = "The 
default value of -1 means that this will be set to the configured value of " +
-        REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_PROP + ", if available; otherwise, 
it defaults to " + DEFAULT_REMOTE_LOG_MANAGER_THREAD_POOL_SIZE + ".";
-
     public static final String REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP 
= "remote.log.manager.copier.thread.pool.size";
     public static final String REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_DOC 
= "Size of the thread pool used in scheduling tasks " +
-            "to copy segments. " + REMOTE_LOG_MANAGER_THREAD_POOL_FALLBACK;
+            "to copy segments.";
     public static final int DEFAULT_REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE 
= 10;
 
     public static final String 
REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_PROP = 
"remote.log.manager.expiration.thread.pool.size";
     public static final String 
REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_DOC = "Size of the thread pool 
used in scheduling tasks " +
-            "to clean up remote log segments. " + 
REMOTE_LOG_MANAGER_THREAD_POOL_FALLBACK;
+            "to clean up the expired remote log segments.";
     public static final int 
DEFAULT_REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE = 10;
     
     public static final String REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP = 
"remote.log.manager.task.interval.ms";
diff --git 
a/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteStorageThreadPool.java
 
b/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteStorageThreadPool.java
index 849ac556155..9c6b9f644e4 100644
--- 
a/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteStorageThreadPool.java
+++ 
b/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteStorageThreadPool.java
@@ -18,16 +18,15 @@ package org.apache.kafka.storage.internals.log;
 
 import org.apache.kafka.common.internals.FatalExitError;
 import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.ThreadUtils;
 import org.apache.kafka.server.metrics.KafkaMetricsGroup;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import static 
org.apache.kafka.server.log.remote.storage.RemoteStorageMetrics.REMOTE_LOG_READER_AVG_IDLE_PERCENT_METRIC;
 import static 
org.apache.kafka.server.log.remote.storage.RemoteStorageMetrics.REMOTE_LOG_READER_TASK_QUEUE_SIZE_METRIC;
@@ -37,11 +36,17 @@ public final class RemoteStorageThreadPool extends 
ThreadPoolExecutor {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(RemoteStorageThreadPool.class);
     private final KafkaMetricsGroup metricsGroup = new 
KafkaMetricsGroup(this.getClass());
 
-    public RemoteStorageThreadPool(String threadNamePrefix,
+    public RemoteStorageThreadPool(String threadNamePattern,
                                    int numThreads,
                                    int maxPendingTasks) {
-        super(numThreads, numThreads, 0L, TimeUnit.MILLISECONDS, new 
LinkedBlockingQueue<>(maxPendingTasks),
-                new RemoteStorageThreadFactory(threadNamePrefix));
+        super(numThreads,
+                numThreads,
+                0L,
+                TimeUnit.MILLISECONDS,
+                new LinkedBlockingQueue<>(maxPendingTasks),
+                ThreadUtils.createThreadFactory(threadNamePattern, false,
+                        (t, e) -> LOGGER.error("Uncaught exception in thread 
'{}':", t.getName(), e))
+        );
         
metricsGroup.newGauge(REMOTE_LOG_READER_TASK_QUEUE_SIZE_METRIC.getName(),
                 () -> getQueue().size());
         
metricsGroup.newGauge(REMOTE_LOG_READER_AVG_IDLE_PERCENT_METRIC.getName(),
@@ -61,21 +66,6 @@ public final class RemoteStorageThreadPool extends 
ThreadPoolExecutor {
         }
     }
 
-    private static class RemoteStorageThreadFactory implements ThreadFactory {
-        private final String namePrefix;
-        private final AtomicInteger threadNumber = new AtomicInteger(0);
-
-        RemoteStorageThreadFactory(String namePrefix) {
-            this.namePrefix = namePrefix;
-        }
-
-        @Override
-        public Thread newThread(Runnable r) {
-            return new Thread(r, namePrefix + threadNumber.getAndIncrement());
-        }
-
-    }
-
     public void removeMetrics() {
         REMOTE_STORAGE_THREAD_POOL_METRICS.forEach(metricsGroup::removeMetric);
     }

Reply via email to