This is an automated email from the ASF dual-hosted git repository.

kamalcph pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 12d8a1bbf8e KAFKA-19237: Add dynamic config 
remote.log.manager.follower.thread.pool.size (#19809)
12d8a1bbf8e is described below

commit 12d8a1bbf8e9f6db733f7902e2527861aee2a51d
Author: Kuan-Po Tseng <[email protected]>
AuthorDate: Fri Jun 13 12:03:45 2025 +0800

    KAFKA-19237: Add dynamic config 
remote.log.manager.follower.thread.pool.size (#19809)
    
    Deprecate the `remote.log.manager.thread.pool.size` configuration and 
introduce a new dynamic configuration:
    `remote.log.manager.follower.thread.pool.size`.
    
    Reviewers: Kamal Chandraprakash <[email protected]>, Luke Chen 
<[email protected]>
---
 .../scala/kafka/server/DynamicBrokerConfig.scala   | 18 +++++++++--
 .../kafka/server/DynamicBrokerConfigTest.scala     | 35 ++++++++++++++++++++
 .../scala/unit/kafka/server/KafkaConfigTest.scala  |  1 +
 docs/configuration.html                            |  1 +
 .../log/remote/storage/RemoteLogManager.java       | 13 +++++++-
 .../log/remote/storage/RemoteLogManagerConfig.java | 37 +++++++++++++++++++---
 .../remote/storage/RemoteLogManagerConfigTest.java | 23 +++++++++++++-
 .../log/remote/storage/RemoteLogManagerTest.java   |  9 ++++++
 8 files changed, 129 insertions(+), 8 deletions(-)

diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala 
b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index bdd9c94a319..74ae428deae 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -1029,9 +1029,19 @@ class DynamicRemoteLogConfig(server: KafkaBroker) 
extends BrokerReconfigurable w
 
       if (RemoteLogManagerConfig.REMOTE_LOG_READER_THREADS_PROP.equals(k) ||
           
RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP.equals(k)
 ||
-          
RemoteLogManagerConfig.REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_PROP.equals(k))
 {
+          
RemoteLogManagerConfig.REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_PROP.equals(k)
 ||
+          
RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FOLLOWER_THREAD_POOL_SIZE_PROP.equals(k))
 {
         val newValue = v.asInstanceOf[Int]
-        val oldValue = server.config.getInt(k)
+        val oldValue: Int = {
+          // This logic preserves backward compatibility in scenarios where
+          // `remote.log.manager.thread.pool.size` is configured in config 
file,
+          // but `remote.log.manager.follower.thread.pool.size` is set 
dynamically.
+          // This can be removed once `remote.log.manager.thread.pool.size` is 
removed.
+          if 
(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FOLLOWER_THREAD_POOL_SIZE_PROP.equals(k))
+            
server.config.remoteLogManagerConfig.remoteLogManagerFollowerThreadPoolSize()
+          else
+            server.config.getInt(k)
+        }
         if (newValue != oldValue) {
           val errorMsg = s"Dynamic thread count update validation failed for 
$k=$v"
           if (newValue <= 0)
@@ -1083,6 +1093,9 @@ class DynamicRemoteLogConfig(server: KafkaBroker) extends 
BrokerReconfigurable w
       if (newRLMConfig.remoteLogManagerExpirationThreadPoolSize() != 
oldRLMConfig.remoteLogManagerExpirationThreadPoolSize())
         
remoteLogManager.resizeExpirationThreadPool(newRLMConfig.remoteLogManagerExpirationThreadPoolSize())
 
+      if (newRLMConfig.remoteLogManagerFollowerThreadPoolSize() != 
oldRLMConfig.remoteLogManagerFollowerThreadPoolSize())
+        
remoteLogManager.resizeFollowerThreadPool(newRLMConfig.remoteLogManagerFollowerThreadPoolSize())
+
       if (newRLMConfig.remoteLogReaderThreads() != 
oldRLMConfig.remoteLogReaderThreads())
         
remoteLogManager.resizeReaderThreadPool(newRLMConfig.remoteLogReaderThreads())
     }
@@ -1108,6 +1121,7 @@ object DynamicRemoteLogConfig {
     RemoteLogManagerConfig.REMOTE_LIST_OFFSETS_REQUEST_TIMEOUT_MS_PROP,
     RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP,
     RemoteLogManagerConfig.REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_PROP,
+    RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FOLLOWER_THREAD_POOL_SIZE_PROP,
     RemoteLogManagerConfig.REMOTE_LOG_READER_THREADS_PROP
   )
 }
diff --git 
a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala 
b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
index c20cd042fc5..141b5138c07 100755
--- a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
@@ -175,6 +175,7 @@ class DynamicBrokerConfigTest {
     
assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE,
 config.remoteLogManagerConfig.remoteLogManagerCopierThreadPoolSize())
     
assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE,
 config.remoteLogManagerConfig.remoteLogManagerExpirationThreadPoolSize())
     assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_READER_THREADS, 
config.remoteLogManagerConfig.remoteLogReaderThreads())
+    
assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_FOLLOWER_THREAD_POOL_SIZE,
 config.remoteLogManagerConfig.remoteLogManagerFollowerThreadPoolSize())
 
     val serverMock = mock(classOf[KafkaBroker])
     val remoteLogManager = mock(classOf[RemoteLogManager])
@@ -203,6 +204,13 @@ class DynamicBrokerConfigTest {
     config.dynamicConfig.updateDefaultConfig(props)
     assertEquals(6, config.remoteLogManagerConfig.remoteLogReaderThreads())
     verify(remoteLogManager).resizeReaderThreadPool(6)
+
+    
props.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FOLLOWER_THREAD_POOL_SIZE_PROP,
 "3")
+    config.dynamicConfig.validate(props, perBrokerConfig = false)
+    config.dynamicConfig.updateDefaultConfig(props)
+    assertEquals(3, 
config.remoteLogManagerConfig.remoteLogManagerFollowerThreadPoolSize())
+    verify(remoteLogManager).resizeFollowerThreadPool(3)
+
     props.clear()
     verifyNoMoreInteractions(remoteLogManager)
   }
@@ -241,6 +249,33 @@ class DynamicBrokerConfigTest {
     val err3 = assertThrows(classOf[ConfigException], () => 
config.dynamicConfig.validate(props, perBrokerConfig = true))
     assertTrue(err3.getMessage.contains("Value must be at least 1"))
     verifyNoMoreInteractions(remoteLogManager)
+
+    val props4 = new Properties()
+    
props4.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FOLLOWER_THREAD_POOL_SIZE_PROP,
 "10")
+    val err4 = assertThrows(classOf[ConfigException], () => 
config.dynamicConfig.validate(props4, perBrokerConfig = false))
+    assertTrue(err4.getMessage.contains("value should not be greater than 
double the current value"))
+    verifyNoMoreInteractions(remoteLogManager)
+  }
+
+  @Test
+  def testDynamicRemoteLogManagerFollowerThreadPoolSizeConfig(): Unit = {
+    val origProps = TestUtils.createBrokerConfig(0, port = 9092)
+    
origProps.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_PROP, 
"10")
+    val config = KafkaConfig(origProps)
+
+    val serverMock = mock(classOf[KafkaBroker])
+    val remoteLogManager = mock(classOf[RemoteLogManager])
+    when(serverMock.config).thenReturn(config)
+    when(serverMock.remoteLogManagerOpt).thenReturn(Some(remoteLogManager))
+
+    config.dynamicConfig.initialize(None)
+    config.dynamicConfig.addBrokerReconfigurable(new 
DynamicRemoteLogConfig(serverMock))
+
+    val props = new Properties()
+    
props.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FOLLOWER_THREAD_POOL_SIZE_PROP,
 "2")
+    val err = assertThrows(classOf[ConfigException], () => 
config.dynamicConfig.validate(props, perBrokerConfig = false))
+    assertTrue(err.getMessage.contains("value should be at least half the 
current value"))
+    verifyNoMoreInteractions(remoteLogManager)
   }
 
   @Test
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index d16201462dd..3b38dd592ee 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -1010,6 +1010,7 @@ class KafkaConfigTest {
         case 
RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP => // 
ignore string
         case 
RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP => 
assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
         case RemoteLogManagerConfig.REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_PROP 
=> assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
+        case 
RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FOLLOWER_THREAD_POOL_SIZE_PROP => 
assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
         case 
RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP => 
assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1, -2)
         case 
RemoteLogManagerConfig.REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_PROP => 
assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1, -2)
         case RemoteLogManagerConfig.REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP 
=> assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
diff --git a/docs/configuration.html b/docs/configuration.html
index c922d92f105..f6dcde9a106 100644
--- a/docs/configuration.html
+++ b/docs/configuration.html
@@ -156,6 +156,7 @@
     <li><code>remote.log.reader.threads</code></li>
     <li><code>remote.log.manager.copier.thread.pool.size</code></li>
     <li><code>remote.log.manager.expiration.thread.pool.size</code></li>
+    <li><code>remote.log.manager.follower.thread.pool.size</code></li>
   </ul>
 
   <h5>Updating ConnectionQuota Configs</h5>
diff --git 
a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java
 
b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java
index ddb584bee2e..0970f6f0a34 100644
--- 
a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java
+++ 
b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java
@@ -246,7 +246,7 @@ public class RemoteLogManager implements Closeable, 
AsyncOffsetReader {
             "RLMCopyThreadPool", "kafka-rlm-copy-thread-pool-%d");
         rlmExpirationThreadPool = new 
RLMScheduledThreadPool(rlmConfig.remoteLogManagerExpirationThreadPoolSize(),
             "RLMExpirationThreadPool", "kafka-rlm-expiration-thread-pool-%d");
-        followerThreadPool = new 
RLMScheduledThreadPool(rlmConfig.remoteLogManagerThreadPoolSize(),
+        followerThreadPool = new 
RLMScheduledThreadPool(rlmConfig.remoteLogManagerFollowerThreadPoolSize(),
             "RLMFollowerScheduledThreadPool", 
"kafka-rlm-follower-thread-pool-%d");
 
         
metricsGroup.newGauge(REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT_METRIC, 
rlmCopyThreadPool::getIdlePercent);
@@ -290,6 +290,12 @@ public class RemoteLogManager implements Closeable, 
AsyncOffsetReader {
         rlmExpirationThreadPool.setCorePoolSize(newSize);
     }
 
+    public void resizeFollowerThreadPool(int newSize) {
+        int currentSize = followerThreadPool.getCorePoolSize();
+        LOGGER.info("Updating remote follower thread pool size from {} to {}", 
currentSize, newSize);
+        followerThreadPool.setCorePoolSize(newSize);
+    }
+
     public void resizeReaderThreadPool(int newSize) {
         int currentSize = remoteStorageReaderThreadPool.getCorePoolSize();
         int currentMaximumSize = 
remoteStorageReaderThreadPool.getMaximumPoolSize();
@@ -314,6 +320,11 @@ public class RemoteLogManager implements Closeable, 
AsyncOffsetReader {
         return remoteStorageReaderThreadPool.getCorePoolSize();
     }
 
+    // Visible for testing
+    int followerThreadPoolSize() {
+        return followerThreadPool.getCorePoolSize();
+    }
+
     /**
      * Returns the timeout for the RLM Tasks to wait for the quota to be 
available
      */
diff --git 
a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java
 
b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java
index 15002d12205..8e0acbb16ee 100644
--- 
a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java
+++ 
b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java
@@ -92,10 +92,19 @@ public final class RemoteLogManagerConfig {
             "from remote storage in the local storage.";
     public static final long 
DEFAULT_REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES = 1024 * 1024 * 1024L;
 
+    public static final String 
REMOTE_LOG_MANAGER_FOLLOWER_THREAD_POOL_SIZE_PROP = 
"remote.log.manager.follower.thread.pool.size";
+    public static final String 
REMOTE_LOG_MANAGER_FOLLOWER_THREAD_POOL_SIZE_DOC = "Size of the thread pool 
used in scheduling follower tasks to read " +
+            "the highest-uploaded remote-offset for follower partitions.";
+    public static final int 
DEFAULT_REMOTE_LOG_MANAGER_FOLLOWER_THREAD_POOL_SIZE = 2;
+
+    @Deprecated(since = "4.2", forRemoval = true)
     public static final String REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_PROP = 
"remote.log.manager.thread.pool.size";
+    @Deprecated(since = "4.2", forRemoval = true)
     public static final String REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_DOC = "Size 
of the thread pool used in scheduling follower tasks to read " +
-            "the highest-uploaded remote-offset for follower partitions.";
-    public static final int DEFAULT_REMOTE_LOG_MANAGER_THREAD_POOL_SIZE = 2;
+            "the highest-uploaded remote-offset for follower partitions. This 
config is deprecated since 4.2, please use <code>" +
+            REMOTE_LOG_MANAGER_FOLLOWER_THREAD_POOL_SIZE_PROP + "</code> 
instead.";
+    @Deprecated(since = "4.2", forRemoval = true)
+    public static final int DEFAULT_REMOTE_LOG_MANAGER_THREAD_POOL_SIZE = 
DEFAULT_REMOTE_LOG_MANAGER_FOLLOWER_THREAD_POOL_SIZE;
 
     public static final String REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP 
= "remote.log.manager.copier.thread.pool.size";
     public static final String REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_DOC 
= "Size of the thread pool used in scheduling tasks " +
@@ -106,7 +115,7 @@ public final class RemoteLogManagerConfig {
     public static final String 
REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_DOC = "Size of the thread pool 
used in scheduling tasks " +
             "to clean up the expired remote log segments.";
     public static final int 
DEFAULT_REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE = 10;
-    
+
     public static final String REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP = 
"remote.log.manager.task.interval.ms";
     public static final String REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_DOC = 
"Interval at which remote log manager runs the scheduled tasks like copy " +
             "segments, and clean up remote log segments.";
@@ -270,6 +279,12 @@ public final class RemoteLogManagerConfig {
                         atLeast(1),
                         MEDIUM,
                         REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_DOC)
+                .define(REMOTE_LOG_MANAGER_FOLLOWER_THREAD_POOL_SIZE_PROP,
+                        INT,
+                        DEFAULT_REMOTE_LOG_MANAGER_FOLLOWER_THREAD_POOL_SIZE,
+                        atLeast(1),
+                        MEDIUM,
+                        REMOTE_LOG_MANAGER_FOLLOWER_THREAD_POOL_SIZE_DOC)
                 .define(REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP,
                         LONG,
                         DEFAULT_REMOTE_LOG_MANAGER_TASK_INTERVAL_MS,
@@ -391,8 +406,14 @@ public final class RemoteLogManagerConfig {
         return config.getString(REMOTE_LOG_METADATA_MANAGER_CLASS_PATH_PROP);
     }
 
+
+    /**
+     * @deprecated since 4.2, please use {@link 
#remoteLogManagerFollowerThreadPoolSize()} instead.
+     * @return the value of the remote log manager follower thread pool size.
+     */
+    @Deprecated(since = "4.2", forRemoval = true)
     public int remoteLogManagerThreadPoolSize() {
-        return config.getInt(REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_PROP);
+        return remoteLogManagerFollowerThreadPoolSize();
     }
 
     public int remoteLogManagerCopierThreadPoolSize() {
@@ -403,6 +424,14 @@ public final class RemoteLogManagerConfig {
         return 
config.getInt(REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_PROP);
     }
 
+    public int remoteLogManagerFollowerThreadPoolSize() {
+        if 
(config.originals().containsKey(REMOTE_LOG_MANAGER_FOLLOWER_THREAD_POOL_SIZE_PROP))
 {
+            return 
config.getInt(REMOTE_LOG_MANAGER_FOLLOWER_THREAD_POOL_SIZE_PROP);
+        } else {
+            return config.getInt(REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_PROP);
+        }
+    }
+
     public long remoteLogManagerTaskIntervalMs() {
         return config.getLong(REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP);
     }
diff --git 
a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfigTest.java
 
b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfigTest.java
index a83951a2196..70ca4333cce 100644
--- 
a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfigTest.java
+++ 
b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfigTest.java
@@ -27,6 +27,7 @@ import java.util.Map;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
+@SuppressWarnings("removal")
 public class RemoteLogManagerConfigTest {
     @Test
     public void testValidConfigs() {
@@ -62,16 +63,34 @@ public class RemoteLogManagerConfigTest {
         
assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_THREAD_POOL_SIZE,
 remoteLogManagerConfigEmptyConfig.remoteLogManagerThreadPoolSize());
         
assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE,
 remoteLogManagerConfigEmptyConfig.remoteLogManagerCopierThreadPoolSize());
         
assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE,
 remoteLogManagerConfigEmptyConfig.remoteLogManagerExpirationThreadPoolSize());
+        
assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_FOLLOWER_THREAD_POOL_SIZE,
 remoteLogManagerConfigEmptyConfig.remoteLogManagerFollowerThreadPoolSize());
     }
 
     @Test
     public void testValidateEmptyStringConfig() {
-        // Test with a empty string props should throw ConfigException
+        // Test with an empty string props should throw ConfigException
         Map<String, Object> emptyStringProps = 
Map.of(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP, 
"");
         assertThrows(ConfigException.class, () ->
                 new RLMTestConfig(emptyStringProps).remoteLogManagerConfig());
     }
 
+    @Test
+    public void testRemoteLogManagerFollowerThreadPoolSize() {
+        Map<String, Object> props = Map.of(
+                
RemoteLogManagerConfig.REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_PROP, 1,
+                
RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FOLLOWER_THREAD_POOL_SIZE_PROP, 2);
+        RemoteLogManagerConfig rlmConfig = new 
RLMTestConfig(props).remoteLogManagerConfig();
+        assertEquals(2, rlmConfig.remoteLogManagerFollowerThreadPoolSize());
+
+        props = 
Map.of(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_PROP, 1);
+        rlmConfig = new RLMTestConfig(props).remoteLogManagerConfig();
+        assertEquals(1, rlmConfig.remoteLogManagerFollowerThreadPoolSize());
+
+        props = 
Map.of(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FOLLOWER_THREAD_POOL_SIZE_PROP,
 2);
+        rlmConfig = new RLMTestConfig(props).remoteLogManagerConfig();
+        assertEquals(2, rlmConfig.remoteLogManagerFollowerThreadPoolSize());
+    }
+
     private Map<String, Object> getRLMProps(String rsmPrefix, String 
rlmmPrefix) {
         Map<String, Object> props = new HashMap<>();
         
props.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, true);
@@ -89,6 +108,8 @@ public class RemoteLogManagerConfigTest {
                 1024 * 1024L);
         
props.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_PROP,
                 1);
+        
props.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FOLLOWER_THREAD_POOL_SIZE_PROP,
+                1);
         
props.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP,
                 1);
         
props.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_PROP,
diff --git 
a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerTest.java
 
b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerTest.java
index c79e7c7f806..e82536d7233 100644
--- 
a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerTest.java
+++ 
b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerTest.java
@@ -3762,6 +3762,15 @@ public class RemoteLogManagerTest {
         assertEquals(12, remoteLogManager.readerThreadPoolSize());
     }
 
+    @Test
+    void testUpdateRemoteStorageFollowerThreads() {
+        assertEquals(2, remoteLogManager.followerThreadPoolSize());
+        remoteLogManager.resizeFollowerThreadPool(6);
+        assertEquals(6, remoteLogManager.followerThreadPoolSize());
+        remoteLogManager.resizeFollowerThreadPool(4);
+        assertEquals(4, remoteLogManager.followerThreadPoolSize());
+    }
+
     private void appendRecordsToFile(File file, int nRecords, int 
nRecordsPerBatch) throws IOException {
         byte magic = RecordBatch.CURRENT_MAGIC_VALUE;
         Compression compression = Compression.NONE;

Reply via email to