This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 88b9ff30adf KAFKA-15859 Introduce remote.list.offsets.request.timeout.ms dynamic config (#17045)
88b9ff30adf is described below

commit 88b9ff30adf19ba43072a8b26ede07aa4bfa727d
Author: Kamal Chandraprakash <[email protected]>
AuthorDate: Tue Sep 3 20:36:01 2024 +0530

    KAFKA-15859 Introduce remote.list.offsets.request.timeout.ms dynamic config (#17045)
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 checkstyle/suppressions.xml                        |  1 +
 .../scala/kafka/server/DynamicBrokerConfig.scala   |  3 +-
 .../kafka/server/DynamicBrokerConfigTest.scala     | 35 ++++++++++++++++++++++
 .../log/remote/storage/RemoteLogManagerConfig.java | 16 +++++++++-
 4 files changed, 53 insertions(+), 2 deletions(-)

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index a0cd6750b56..331ecebf655 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -39,6 +39,7 @@
     <suppress checks="NPathComplexity" files="(ClusterTestExtensions|KafkaApisBuilder|SharePartition).java"/>
     <suppress checks="NPathComplexity|ClassFanOutComplexity|ClassDataAbstractionCoupling|JavaNCSS" files="(RemoteLogManager|RemoteLogManagerTest).java"/>
     <suppress checks="MethodLength" files="RemoteLogManager.java"/>
+    <suppress checks="MethodLength" files="RemoteLogManagerConfig.java"/>
     <suppress checks="ClassFanOutComplexity" files="RemoteLogManagerTest.java"/>
     <suppress checks="MethodLength"
               files="(KafkaClusterTestKit).java"/>
diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index a1d03cf9bce..dbae5d28373 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -1215,6 +1215,7 @@ object DynamicRemoteLogConfig {
     RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP,
     RemoteLogManagerConfig.REMOTE_FETCH_MAX_WAIT_MS_PROP,
     RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP,
-    RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP
+    RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP,
+    RemoteLogManagerConfig.REMOTE_LIST_OFFSETS_REQUEST_TIMEOUT_MS_PROP
   )
 }
diff --git a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
index e83f5a41a3a..716a361f722 100755
--- a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
@@ -826,6 +826,41 @@ class DynamicBrokerConfigTest {
     }
   }
 
+  @Test
+  def testDynamicRemoteListOffsetsRequestTimeoutMsConfig(): Unit = {
+    val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
+    val config = KafkaConfig(props)
+    val kafkaBroker = mock(classOf[KafkaBroker])
+    when(kafkaBroker.config).thenReturn(config)
+    when(kafkaBroker.remoteLogManagerOpt).thenReturn(None)
+    assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LIST_OFFSETS_REQUEST_TIMEOUT_MS,
+      config.remoteLogManagerConfig.remoteListOffsetsRequestTimeoutMs)
+
+    val dynamicRemoteLogConfig = new DynamicRemoteLogConfig(kafkaBroker)
+    config.dynamicConfig.initialize(None, None)
+    config.dynamicConfig.addBrokerReconfigurable(dynamicRemoteLogConfig)
+
+    val newProps = new Properties()
+    newProps.put(RemoteLogManagerConfig.REMOTE_LIST_OFFSETS_REQUEST_TIMEOUT_MS_PROP, "60000")
+    // update default config
+    config.dynamicConfig.validate(newProps, perBrokerConfig = false)
+    config.dynamicConfig.updateDefaultConfig(newProps)
+    assertEquals(60000L, config.remoteLogManagerConfig.remoteListOffsetsRequestTimeoutMs)
+
+    // update per broker config
+    newProps.put(RemoteLogManagerConfig.REMOTE_LIST_OFFSETS_REQUEST_TIMEOUT_MS_PROP, "10000")
+    config.dynamicConfig.validate(newProps, perBrokerConfig = true)
+    config.dynamicConfig.updateBrokerConfig(0, newProps)
+    assertEquals(10000L, config.remoteLogManagerConfig.remoteListOffsetsRequestTimeoutMs)
+
+    // invalid values
+    for (timeoutMs <- Seq(-1, 0)) {
+      newProps.put(RemoteLogManagerConfig.REMOTE_LIST_OFFSETS_REQUEST_TIMEOUT_MS_PROP, timeoutMs.toString)
+      assertThrows(classOf[ConfigException], () => config.dynamicConfig.validate(newProps, perBrokerConfig = true))
+      assertThrows(classOf[ConfigException], () => config.dynamicConfig.validate(newProps, perBrokerConfig = false))
+    }
+  }
+
   @Test
   def testUpdateDynamicRemoteLogManagerConfig(): Unit = {
     val origProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java
index 9f73af4a7c9..02f3d3a2860 100644
--- a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java
+++ b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java
@@ -186,6 +186,10 @@ public final class RemoteLogManagerConfig {
     public static final String REMOTE_FETCH_MAX_WAIT_MS_DOC = "The maximum amount of time the server will wait before answering the remote fetch request";
     public static final int DEFAULT_REMOTE_FETCH_MAX_WAIT_MS = 500;
 
+    public static final String REMOTE_LIST_OFFSETS_REQUEST_TIMEOUT_MS_PROP = "remote.list.offsets.request.timeout.ms";
+    public static final String REMOTE_LIST_OFFSETS_REQUEST_TIMEOUT_MS_DOC = "The maximum amount of time the server will wait for the remote list offsets request to complete.";
+    public static final long DEFAULT_REMOTE_LIST_OFFSETS_REQUEST_TIMEOUT_MS = 30000L;
+
     private final AbstractConfig config;
 
     public static ConfigDef configDef() {
@@ -353,7 +357,13 @@ public final class RemoteLogManagerConfig {
                         DEFAULT_REMOTE_FETCH_MAX_WAIT_MS,
                         atLeast(1),
                         MEDIUM,
-                        REMOTE_FETCH_MAX_WAIT_MS_DOC);
+                        REMOTE_FETCH_MAX_WAIT_MS_DOC)
+                .define(REMOTE_LIST_OFFSETS_REQUEST_TIMEOUT_MS_PROP,
+                        LONG,
+                        DEFAULT_REMOTE_LIST_OFFSETS_REQUEST_TIMEOUT_MS,
+                        atLeast(1),
+                        MEDIUM,
+                        REMOTE_LIST_OFFSETS_REQUEST_TIMEOUT_MS_DOC);
     }
     
     public RemoteLogManagerConfig(AbstractConfig config) {
@@ -485,6 +495,10 @@ public final class RemoteLogManagerConfig {
         return config.getLong(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_MS_PROP);
     }
 
+    public long remoteListOffsetsRequestTimeoutMs() {
+        return config.getLong(REMOTE_LIST_OFFSETS_REQUEST_TIMEOUT_MS_PROP);
+    }
+
     public static void main(String[] args) {
         System.out.println(configDef().toHtml(4, config -> "remote_log_manager_" + config));
     }

Reply via email to