This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 88b9ff30adf KAFKA-15859 Introduce remote.list.offsets.request.timeout.ms dynamic config (#17045)
88b9ff30adf is described below
commit 88b9ff30adf19ba43072a8b26ede07aa4bfa727d
Author: Kamal Chandraprakash <[email protected]>
AuthorDate: Tue Sep 3 20:36:01 2024 +0530
KAFKA-15859 Introduce remote.list.offsets.request.timeout.ms dynamic config (#17045)
Reviewers: Chia-Ping Tsai <[email protected]>
---
checkstyle/suppressions.xml | 1 +
.../scala/kafka/server/DynamicBrokerConfig.scala | 3 +-
.../kafka/server/DynamicBrokerConfigTest.scala | 35 ++++++++++++++++++++++
.../log/remote/storage/RemoteLogManagerConfig.java | 16 +++++++++-
4 files changed, 53 insertions(+), 2 deletions(-)
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index a0cd6750b56..331ecebf655 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -39,6 +39,7 @@
<suppress checks="NPathComplexity"
files="(ClusterTestExtensions|KafkaApisBuilder|SharePartition).java"/>
<suppress
checks="NPathComplexity|ClassFanOutComplexity|ClassDataAbstractionCoupling|JavaNCSS"
files="(RemoteLogManager|RemoteLogManagerTest).java"/>
<suppress checks="MethodLength" files="RemoteLogManager.java"/>
+ <suppress checks="MethodLength" files="RemoteLogManagerConfig.java"/>
<suppress checks="ClassFanOutComplexity"
files="RemoteLogManagerTest.java"/>
<suppress checks="MethodLength"
files="(KafkaClusterTestKit).java"/>
diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index a1d03cf9bce..dbae5d28373 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -1215,6 +1215,7 @@ object DynamicRemoteLogConfig {
RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP,
RemoteLogManagerConfig.REMOTE_FETCH_MAX_WAIT_MS_PROP,
RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP,
- RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP
+ RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP,
+ RemoteLogManagerConfig.REMOTE_LIST_OFFSETS_REQUEST_TIMEOUT_MS_PROP
)
}
diff --git a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
index e83f5a41a3a..716a361f722 100755
--- a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
@@ -826,6 +826,41 @@ class DynamicBrokerConfigTest {
}
}
+ @Test
+ def testDynamicRemoteListOffsetsRequestTimeoutMsConfig(): Unit = {
+ val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
+ val config = KafkaConfig(props)
+ val kafkaBroker = mock(classOf[KafkaBroker])
+ when(kafkaBroker.config).thenReturn(config)
+ when(kafkaBroker.remoteLogManagerOpt).thenReturn(None)
+ assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LIST_OFFSETS_REQUEST_TIMEOUT_MS,
+ config.remoteLogManagerConfig.remoteListOffsetsRequestTimeoutMs)
+
+ val dynamicRemoteLogConfig = new DynamicRemoteLogConfig(kafkaBroker)
+ config.dynamicConfig.initialize(None, None)
+ config.dynamicConfig.addBrokerReconfigurable(dynamicRemoteLogConfig)
+
+ val newProps = new Properties()
+ newProps.put(RemoteLogManagerConfig.REMOTE_LIST_OFFSETS_REQUEST_TIMEOUT_MS_PROP, "60000")
+ // update default config
+ config.dynamicConfig.validate(newProps, perBrokerConfig = false)
+ config.dynamicConfig.updateDefaultConfig(newProps)
+ assertEquals(60000L, config.remoteLogManagerConfig.remoteListOffsetsRequestTimeoutMs)
+
+ // update per broker config
+ newProps.put(RemoteLogManagerConfig.REMOTE_LIST_OFFSETS_REQUEST_TIMEOUT_MS_PROP, "10000")
+ config.dynamicConfig.validate(newProps, perBrokerConfig = true)
+ config.dynamicConfig.updateBrokerConfig(0, newProps)
+ assertEquals(10000L, config.remoteLogManagerConfig.remoteListOffsetsRequestTimeoutMs)
+
+ // invalid values
+ for (timeoutMs <- Seq(-1, 0)) {
+ newProps.put(RemoteLogManagerConfig.REMOTE_LIST_OFFSETS_REQUEST_TIMEOUT_MS_PROP, timeoutMs.toString)
+ assertThrows(classOf[ConfigException], () => config.dynamicConfig.validate(newProps, perBrokerConfig = true))
+ assertThrows(classOf[ConfigException], () => config.dynamicConfig.validate(newProps, perBrokerConfig = false))
+ }
+ }
+
@Test
def testUpdateDynamicRemoteLogManagerConfig(): Unit = {
val origProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java
index 9f73af4a7c9..02f3d3a2860 100644
--- a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java
+++ b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java
@@ -186,6 +186,10 @@ public final class RemoteLogManagerConfig {
public static final String REMOTE_FETCH_MAX_WAIT_MS_DOC = "The maximum amount of time the server will wait before answering the remote fetch request";
public static final int DEFAULT_REMOTE_FETCH_MAX_WAIT_MS = 500;
+ public static final String REMOTE_LIST_OFFSETS_REQUEST_TIMEOUT_MS_PROP = "remote.list.offsets.request.timeout.ms";
+ public static final String REMOTE_LIST_OFFSETS_REQUEST_TIMEOUT_MS_DOC = "The maximum amount of time the server will wait for the remote list offsets request to complete.";
+ public static final long DEFAULT_REMOTE_LIST_OFFSETS_REQUEST_TIMEOUT_MS = 30000L;
+
private final AbstractConfig config;
public static ConfigDef configDef() {
@@ -353,7 +357,13 @@ public final class RemoteLogManagerConfig {
DEFAULT_REMOTE_FETCH_MAX_WAIT_MS,
atLeast(1),
MEDIUM,
- REMOTE_FETCH_MAX_WAIT_MS_DOC);
+ REMOTE_FETCH_MAX_WAIT_MS_DOC)
+ .define(REMOTE_LIST_OFFSETS_REQUEST_TIMEOUT_MS_PROP,
+ LONG,
+ DEFAULT_REMOTE_LIST_OFFSETS_REQUEST_TIMEOUT_MS,
+ atLeast(1),
+ MEDIUM,
+ REMOTE_LIST_OFFSETS_REQUEST_TIMEOUT_MS_DOC);
}
public RemoteLogManagerConfig(AbstractConfig config) {
@@ -485,6 +495,10 @@ public final class RemoteLogManagerConfig {
return config.getLong(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_MS_PROP);
}
+ public long remoteListOffsetsRequestTimeoutMs() {
+ return config.getLong(REMOTE_LIST_OFFSETS_REQUEST_TIMEOUT_MS_PROP);
+ }
+
public static void main(String[] args) {
System.out.println(configDef().toHtml(4, config -> "remote_log_manager_" + config));
}