This is an automated email from the ASF dual-hosted git repository.
kamalcph pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 12d8a1bbf8e KAFKA-19237: Add dynamic config
remote.log.manager.follower.thread.pool.size (#19809)
12d8a1bbf8e is described below
commit 12d8a1bbf8e9f6db733f7902e2527861aee2a51d
Author: Kuan-Po Tseng <[email protected]>
AuthorDate: Fri Jun 13 12:03:45 2025 +0800
KAFKA-19237: Add dynamic config
remote.log.manager.follower.thread.pool.size (#19809)
Deprecate the `remote.log.manager.thread.pool.size` configuration and
introduce a new dynamic configuration:
`remote.log.manager.follower.thread.pool.size`.
Reviewers: Kamal Chandraprakash <[email protected]>, Luke Chen
<[email protected]>
---
.../scala/kafka/server/DynamicBrokerConfig.scala | 18 +++++++++--
.../kafka/server/DynamicBrokerConfigTest.scala | 35 ++++++++++++++++++++
.../scala/unit/kafka/server/KafkaConfigTest.scala | 1 +
docs/configuration.html | 1 +
.../log/remote/storage/RemoteLogManager.java | 13 +++++++-
.../log/remote/storage/RemoteLogManagerConfig.java | 37 +++++++++++++++++++---
.../remote/storage/RemoteLogManagerConfigTest.java | 23 +++++++++++++-
.../log/remote/storage/RemoteLogManagerTest.java | 9 ++++++
8 files changed, 129 insertions(+), 8 deletions(-)
diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index bdd9c94a319..74ae428deae 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -1029,9 +1029,19 @@ class DynamicRemoteLogConfig(server: KafkaBroker)
extends BrokerReconfigurable w
if (RemoteLogManagerConfig.REMOTE_LOG_READER_THREADS_PROP.equals(k) ||
RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP.equals(k)
||
-
RemoteLogManagerConfig.REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_PROP.equals(k))
{
+
RemoteLogManagerConfig.REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_PROP.equals(k)
||
+
RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FOLLOWER_THREAD_POOL_SIZE_PROP.equals(k))
{
val newValue = v.asInstanceOf[Int]
- val oldValue = server.config.getInt(k)
+ val oldValue: Int = {
+ // This logic preserves backward compatibility in scenarios where
+ // `remote.log.manager.thread.pool.size` is configured in config
file,
+ // but `remote.log.manager.follower.thread.pool.size` is set
dynamically.
+ // This can be removed once `remote.log.manager.thread.pool.size` is
removed.
+ if
(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FOLLOWER_THREAD_POOL_SIZE_PROP.equals(k))
+
server.config.remoteLogManagerConfig.remoteLogManagerFollowerThreadPoolSize()
+ else
+ server.config.getInt(k)
+ }
if (newValue != oldValue) {
val errorMsg = s"Dynamic thread count update validation failed for
$k=$v"
if (newValue <= 0)
@@ -1083,6 +1093,9 @@ class DynamicRemoteLogConfig(server: KafkaBroker) extends
BrokerReconfigurable w
if (newRLMConfig.remoteLogManagerExpirationThreadPoolSize() !=
oldRLMConfig.remoteLogManagerExpirationThreadPoolSize())
remoteLogManager.resizeExpirationThreadPool(newRLMConfig.remoteLogManagerExpirationThreadPoolSize())
+ if (newRLMConfig.remoteLogManagerFollowerThreadPoolSize() !=
oldRLMConfig.remoteLogManagerFollowerThreadPoolSize())
+
remoteLogManager.resizeFollowerThreadPool(newRLMConfig.remoteLogManagerFollowerThreadPoolSize())
+
if (newRLMConfig.remoteLogReaderThreads() !=
oldRLMConfig.remoteLogReaderThreads())
remoteLogManager.resizeReaderThreadPool(newRLMConfig.remoteLogReaderThreads())
}
@@ -1108,6 +1121,7 @@ object DynamicRemoteLogConfig {
RemoteLogManagerConfig.REMOTE_LIST_OFFSETS_REQUEST_TIMEOUT_MS_PROP,
RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP,
RemoteLogManagerConfig.REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_PROP,
+ RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FOLLOWER_THREAD_POOL_SIZE_PROP,
RemoteLogManagerConfig.REMOTE_LOG_READER_THREADS_PROP
)
}
diff --git
a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
index c20cd042fc5..141b5138c07 100755
--- a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
@@ -175,6 +175,7 @@ class DynamicBrokerConfigTest {
assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE,
config.remoteLogManagerConfig.remoteLogManagerCopierThreadPoolSize())
assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE,
config.remoteLogManagerConfig.remoteLogManagerExpirationThreadPoolSize())
assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_READER_THREADS,
config.remoteLogManagerConfig.remoteLogReaderThreads())
+
assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_FOLLOWER_THREAD_POOL_SIZE,
config.remoteLogManagerConfig.remoteLogManagerFollowerThreadPoolSize())
val serverMock = mock(classOf[KafkaBroker])
val remoteLogManager = mock(classOf[RemoteLogManager])
@@ -203,6 +204,13 @@ class DynamicBrokerConfigTest {
config.dynamicConfig.updateDefaultConfig(props)
assertEquals(6, config.remoteLogManagerConfig.remoteLogReaderThreads())
verify(remoteLogManager).resizeReaderThreadPool(6)
+
+
props.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FOLLOWER_THREAD_POOL_SIZE_PROP,
"3")
+ config.dynamicConfig.validate(props, perBrokerConfig = false)
+ config.dynamicConfig.updateDefaultConfig(props)
+ assertEquals(3,
config.remoteLogManagerConfig.remoteLogManagerFollowerThreadPoolSize())
+ verify(remoteLogManager).resizeFollowerThreadPool(3)
+
props.clear()
verifyNoMoreInteractions(remoteLogManager)
}
@@ -241,6 +249,33 @@ class DynamicBrokerConfigTest {
val err3 = assertThrows(classOf[ConfigException], () =>
config.dynamicConfig.validate(props, perBrokerConfig = true))
assertTrue(err3.getMessage.contains("Value must be at least 1"))
verifyNoMoreInteractions(remoteLogManager)
+
+ val props4 = new Properties()
+
props4.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FOLLOWER_THREAD_POOL_SIZE_PROP,
"10")
+ val err4 = assertThrows(classOf[ConfigException], () =>
config.dynamicConfig.validate(props4, perBrokerConfig = false))
+ assertTrue(err4.getMessage.contains("value should not be greater than
double the current value"))
+ verifyNoMoreInteractions(remoteLogManager)
+ }
+
+ @Test
+ def testDynamicRemoteLogManagerFollowerThreadPoolSizeConfig(): Unit = {
+ val origProps = TestUtils.createBrokerConfig(0, port = 9092)
+
origProps.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_PROP,
"10")
+ val config = KafkaConfig(origProps)
+
+ val serverMock = mock(classOf[KafkaBroker])
+ val remoteLogManager = mock(classOf[RemoteLogManager])
+ when(serverMock.config).thenReturn(config)
+ when(serverMock.remoteLogManagerOpt).thenReturn(Some(remoteLogManager))
+
+ config.dynamicConfig.initialize(None)
+ config.dynamicConfig.addBrokerReconfigurable(new
DynamicRemoteLogConfig(serverMock))
+
+ val props = new Properties()
+
props.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FOLLOWER_THREAD_POOL_SIZE_PROP,
"2")
+ val err = assertThrows(classOf[ConfigException], () =>
config.dynamicConfig.validate(props, perBrokerConfig = false))
+ assertTrue(err.getMessage.contains("value should be at least half the
current value"))
+ verifyNoMoreInteractions(remoteLogManager)
}
@Test
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index d16201462dd..3b38dd592ee 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -1010,6 +1010,7 @@ class KafkaConfigTest {
case
RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP => //
ignore string
case
RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP =>
assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
case RemoteLogManagerConfig.REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_PROP
=> assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
+ case
RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FOLLOWER_THREAD_POOL_SIZE_PROP =>
assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
case
RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP =>
assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1, -2)
case
RemoteLogManagerConfig.REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_PROP =>
assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1, -2)
case RemoteLogManagerConfig.REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP
=> assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
diff --git a/docs/configuration.html b/docs/configuration.html
index c922d92f105..f6dcde9a106 100644
--- a/docs/configuration.html
+++ b/docs/configuration.html
@@ -156,6 +156,7 @@
<li><code>remote.log.reader.threads</code></li>
<li><code>remote.log.manager.copier.thread.pool.size</code></li>
<li><code>remote.log.manager.expiration.thread.pool.size</code></li>
+ <li><code>remote.log.manager.follower.thread.pool.size</code></li>
</ul>
<h5>Updating ConnectionQuota Configs</h5>
diff --git
a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java
b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java
index ddb584bee2e..0970f6f0a34 100644
---
a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java
+++
b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java
@@ -246,7 +246,7 @@ public class RemoteLogManager implements Closeable,
AsyncOffsetReader {
"RLMCopyThreadPool", "kafka-rlm-copy-thread-pool-%d");
rlmExpirationThreadPool = new
RLMScheduledThreadPool(rlmConfig.remoteLogManagerExpirationThreadPoolSize(),
"RLMExpirationThreadPool", "kafka-rlm-expiration-thread-pool-%d");
- followerThreadPool = new
RLMScheduledThreadPool(rlmConfig.remoteLogManagerThreadPoolSize(),
+ followerThreadPool = new
RLMScheduledThreadPool(rlmConfig.remoteLogManagerFollowerThreadPoolSize(),
"RLMFollowerScheduledThreadPool",
"kafka-rlm-follower-thread-pool-%d");
metricsGroup.newGauge(REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT_METRIC,
rlmCopyThreadPool::getIdlePercent);
@@ -290,6 +290,12 @@ public class RemoteLogManager implements Closeable,
AsyncOffsetReader {
rlmExpirationThreadPool.setCorePoolSize(newSize);
}
+ public void resizeFollowerThreadPool(int newSize) {
+ int currentSize = followerThreadPool.getCorePoolSize();
+ LOGGER.info("Updating remote follower thread pool size from {} to {}",
currentSize, newSize);
+ followerThreadPool.setCorePoolSize(newSize);
+ }
+
public void resizeReaderThreadPool(int newSize) {
int currentSize = remoteStorageReaderThreadPool.getCorePoolSize();
int currentMaximumSize =
remoteStorageReaderThreadPool.getMaximumPoolSize();
@@ -314,6 +320,11 @@ public class RemoteLogManager implements Closeable,
AsyncOffsetReader {
return remoteStorageReaderThreadPool.getCorePoolSize();
}
+ // Visible for testing
+ int followerThreadPoolSize() {
+ return followerThreadPool.getCorePoolSize();
+ }
+
/**
* Returns the timeout for the RLM Tasks to wait for the quota to be
available
*/
diff --git
a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java
b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java
index 15002d12205..8e0acbb16ee 100644
---
a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java
+++
b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java
@@ -92,10 +92,19 @@ public final class RemoteLogManagerConfig {
"from remote storage in the local storage.";
public static final long
DEFAULT_REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES = 1024 * 1024 * 1024L;
+ public static final String
REMOTE_LOG_MANAGER_FOLLOWER_THREAD_POOL_SIZE_PROP =
"remote.log.manager.follower.thread.pool.size";
+ public static final String
REMOTE_LOG_MANAGER_FOLLOWER_THREAD_POOL_SIZE_DOC = "Size of the thread pool
used in scheduling follower tasks to read " +
+ "the highest-uploaded remote-offset for follower partitions.";
+ public static final int
DEFAULT_REMOTE_LOG_MANAGER_FOLLOWER_THREAD_POOL_SIZE = 2;
+
+ @Deprecated(since = "4.2", forRemoval = true)
public static final String REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_PROP =
"remote.log.manager.thread.pool.size";
+ @Deprecated(since = "4.2", forRemoval = true)
public static final String REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_DOC = "Size
of the thread pool used in scheduling follower tasks to read " +
- "the highest-uploaded remote-offset for follower partitions.";
- public static final int DEFAULT_REMOTE_LOG_MANAGER_THREAD_POOL_SIZE = 2;
+ "the highest-uploaded remote-offset for follower partitions. This
config is deprecated since 4.2, please use <code>" +
+ REMOTE_LOG_MANAGER_FOLLOWER_THREAD_POOL_SIZE_PROP + "</code>
instead.";
+ @Deprecated(since = "4.2", forRemoval = true)
+ public static final int DEFAULT_REMOTE_LOG_MANAGER_THREAD_POOL_SIZE =
DEFAULT_REMOTE_LOG_MANAGER_FOLLOWER_THREAD_POOL_SIZE;
public static final String REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP
= "remote.log.manager.copier.thread.pool.size";
public static final String REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_DOC
= "Size of the thread pool used in scheduling tasks " +
@@ -106,7 +115,7 @@ public final class RemoteLogManagerConfig {
public static final String
REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_DOC = "Size of the thread pool
used in scheduling tasks " +
"to clean up the expired remote log segments.";
public static final int
DEFAULT_REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE = 10;
-
+
public static final String REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP =
"remote.log.manager.task.interval.ms";
public static final String REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_DOC =
"Interval at which remote log manager runs the scheduled tasks like copy " +
"segments, and clean up remote log segments.";
@@ -270,6 +279,12 @@ public final class RemoteLogManagerConfig {
atLeast(1),
MEDIUM,
REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_DOC)
+ .define(REMOTE_LOG_MANAGER_FOLLOWER_THREAD_POOL_SIZE_PROP,
+ INT,
+ DEFAULT_REMOTE_LOG_MANAGER_FOLLOWER_THREAD_POOL_SIZE,
+ atLeast(1),
+ MEDIUM,
+ REMOTE_LOG_MANAGER_FOLLOWER_THREAD_POOL_SIZE_DOC)
.define(REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP,
LONG,
DEFAULT_REMOTE_LOG_MANAGER_TASK_INTERVAL_MS,
@@ -391,8 +406,14 @@ public final class RemoteLogManagerConfig {
return config.getString(REMOTE_LOG_METADATA_MANAGER_CLASS_PATH_PROP);
}
+
+ /**
+ * @deprecated since 4.2, please use {@link
#remoteLogManagerFollowerThreadPoolSize()} instead.
+ * @return the value of the remote log manager follower thread pool size.
+ */
+ @Deprecated(since = "4.2", forRemoval = true)
public int remoteLogManagerThreadPoolSize() {
- return config.getInt(REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_PROP);
+ return remoteLogManagerFollowerThreadPoolSize();
}
public int remoteLogManagerCopierThreadPoolSize() {
@@ -403,6 +424,14 @@ public final class RemoteLogManagerConfig {
return
config.getInt(REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_PROP);
}
+ public int remoteLogManagerFollowerThreadPoolSize() {
+ if
(config.originals().containsKey(REMOTE_LOG_MANAGER_FOLLOWER_THREAD_POOL_SIZE_PROP))
{
+ return
config.getInt(REMOTE_LOG_MANAGER_FOLLOWER_THREAD_POOL_SIZE_PROP);
+ } else {
+ return config.getInt(REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_PROP);
+ }
+ }
+
public long remoteLogManagerTaskIntervalMs() {
return config.getLong(REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP);
}
diff --git
a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfigTest.java
b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfigTest.java
index a83951a2196..70ca4333cce 100644
---
a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfigTest.java
+++
b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfigTest.java
@@ -27,6 +27,7 @@ import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
+@SuppressWarnings("removal")
public class RemoteLogManagerConfigTest {
@Test
public void testValidConfigs() {
@@ -62,16 +63,34 @@ public class RemoteLogManagerConfigTest {
assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_THREAD_POOL_SIZE,
remoteLogManagerConfigEmptyConfig.remoteLogManagerThreadPoolSize());
assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE,
remoteLogManagerConfigEmptyConfig.remoteLogManagerCopierThreadPoolSize());
assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE,
remoteLogManagerConfigEmptyConfig.remoteLogManagerExpirationThreadPoolSize());
+
assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_FOLLOWER_THREAD_POOL_SIZE,
remoteLogManagerConfigEmptyConfig.remoteLogManagerFollowerThreadPoolSize());
}
@Test
public void testValidateEmptyStringConfig() {
- // Test with a empty string props should throw ConfigException
+ // Test with an empty string props should throw ConfigException
Map<String, Object> emptyStringProps =
Map.of(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP,
"");
assertThrows(ConfigException.class, () ->
new RLMTestConfig(emptyStringProps).remoteLogManagerConfig());
}
+ @Test
+ public void testRemoteLogManagerFollowerThreadPoolSize() {
+ Map<String, Object> props = Map.of(
+
RemoteLogManagerConfig.REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_PROP, 1,
+
RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FOLLOWER_THREAD_POOL_SIZE_PROP, 2);
+ RemoteLogManagerConfig rlmConfig = new
RLMTestConfig(props).remoteLogManagerConfig();
+ assertEquals(2, rlmConfig.remoteLogManagerFollowerThreadPoolSize());
+
+ props =
Map.of(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_PROP, 1);
+ rlmConfig = new RLMTestConfig(props).remoteLogManagerConfig();
+ assertEquals(1, rlmConfig.remoteLogManagerFollowerThreadPoolSize());
+
+ props =
Map.of(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FOLLOWER_THREAD_POOL_SIZE_PROP,
2);
+ rlmConfig = new RLMTestConfig(props).remoteLogManagerConfig();
+ assertEquals(2, rlmConfig.remoteLogManagerFollowerThreadPoolSize());
+ }
+
private Map<String, Object> getRLMProps(String rsmPrefix, String
rlmmPrefix) {
Map<String, Object> props = new HashMap<>();
props.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, true);
@@ -89,6 +108,8 @@ public class RemoteLogManagerConfigTest {
1024 * 1024L);
props.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_PROP,
1);
+
props.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FOLLOWER_THREAD_POOL_SIZE_PROP,
+ 1);
props.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP,
1);
props.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_PROP,
diff --git
a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerTest.java
b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerTest.java
index c79e7c7f806..e82536d7233 100644
---
a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerTest.java
+++
b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerTest.java
@@ -3762,6 +3762,15 @@ public class RemoteLogManagerTest {
assertEquals(12, remoteLogManager.readerThreadPoolSize());
}
+ @Test
+ void testUpdateRemoteStorageFollowerThreads() {
+ assertEquals(2, remoteLogManager.followerThreadPoolSize());
+ remoteLogManager.resizeFollowerThreadPool(6);
+ assertEquals(6, remoteLogManager.followerThreadPoolSize());
+ remoteLogManager.resizeFollowerThreadPool(4);
+ assertEquals(4, remoteLogManager.followerThreadPoolSize());
+ }
+
private void appendRecordsToFile(File file, int nRecords, int
nRecordsPerBatch) throws IOException {
byte magic = RecordBatch.CURRENT_MAGIC_VALUE;
Compression compression = Compression.NONE;