This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 2637d12e2f4 KAFKA-16908 Refactor `QuorumConfig` with `AbstractConfig` (#17231)
2637d12e2f4 is described below

commit 2637d12e2f4cbd8d46c1ed108a9a3b4e12c0c924
Author: xijiu <[email protected]>
AuthorDate: Thu Sep 26 04:06:42 2024 +0800

    KAFKA-16908 Refactor `QuorumConfig` with `AbstractConfig` (#17231)
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 core/src/main/scala/kafka/raft/RaftManager.scala   |  4 +--
 core/src/main/scala/kafka/server/KafkaConfig.scala | 19 ++++-------
 .../main/scala/kafka/server/KafkaRaftServer.scala  |  4 +--
 core/src/main/scala/kafka/server/KafkaServer.scala |  4 +--
 .../main/scala/kafka/tools/TestRaftServer.scala    |  4 +--
 .../scala/unit/kafka/raft/RaftManagerTest.scala    |  4 +--
 .../scala/unit/kafka/server/KafkaConfigTest.scala  |  4 +--
 .../java/org/apache/kafka/raft/QuorumConfig.java   | 38 +++++++++-------------
 .../apache/kafka/raft/RaftClientTestContext.java   | 17 +++++-----
 .../apache/kafka/raft/RaftEventSimulationTest.java | 17 +++++-----
 10 files changed, 52 insertions(+), 63 deletions(-)

diff --git a/core/src/main/scala/kafka/raft/RaftManager.scala b/core/src/main/scala/kafka/raft/RaftManager.scala
index d248443e611..bfde57c6706 100644
--- a/core/src/main/scala/kafka/raft/RaftManager.scala
+++ b/core/src/main/scala/kafka/raft/RaftManager.scala
@@ -247,7 +247,7 @@ class KafkaRaftManager[T](
 
   private def buildNetworkChannel(): KafkaNetworkChannel = {
     val (listenerName, netClient) = buildNetworkClient()
-    new KafkaNetworkChannel(time, listenerName, netClient, 
config.quorumRequestTimeoutMs, threadNamePrefix)
+    new KafkaNetworkChannel(time, listenerName, netClient, 
config.quorumConfig.requestTimeoutMs, threadNamePrefix)
   }
 
   private def createDataDir(): File = {
@@ -313,7 +313,7 @@ class KafkaRaftManager[T](
       reconnectBackoffMsMs,
       Selectable.USE_DEFAULT_BUFFER_SIZE,
       config.socketReceiveBufferBytes,
-      config.quorumRequestTimeoutMs,
+      config.quorumConfig.requestTimeoutMs,
       config.connectionSetupTimeoutMs,
       config.connectionSetupTimeoutMaxMs,
       time,
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 454308990c8..237da5c8e3b 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -231,6 +231,9 @@ class KafkaConfig private(doLog: Boolean, val props: 
util.Map[_, _])
   private val _remoteLogManagerConfig = new RemoteLogManagerConfig(this)
   def remoteLogManagerConfig = _remoteLogManagerConfig
 
+  private val _quorumConfig = new QuorumConfig(this)
+  def quorumConfig: QuorumConfig = _quorumConfig
+
   private val _groupCoordinatorConfig = new GroupCoordinatorConfig(this)
 
   def groupCoordinatorConfig: GroupCoordinatorConfig = _groupCoordinatorConfig
@@ -664,16 +667,6 @@ class KafkaConfig private(doLog: Boolean, val props: 
util.Map[_, _])
   def lz4CompressionLevel = getInt(ServerConfigs.COMPRESSION_LZ4_LEVEL_CONFIG)
   def zstdCompressionLevel = 
getInt(ServerConfigs.COMPRESSION_ZSTD_LEVEL_CONFIG)
 
-  /** ********* Raft Quorum Configuration *********/
-  val quorumVoters = getList(QuorumConfig.QUORUM_VOTERS_CONFIG)
-  val quorumBootstrapServers = 
getList(QuorumConfig.QUORUM_BOOTSTRAP_SERVERS_CONFIG)
-  val quorumElectionTimeoutMs = 
getInt(QuorumConfig.QUORUM_ELECTION_TIMEOUT_MS_CONFIG)
-  val quorumFetchTimeoutMs = 
getInt(QuorumConfig.QUORUM_FETCH_TIMEOUT_MS_CONFIG)
-  val quorumElectionBackoffMs = 
getInt(QuorumConfig.QUORUM_ELECTION_BACKOFF_MAX_MS_CONFIG)
-  val quorumLingerMs = getInt(QuorumConfig.QUORUM_LINGER_MS_CONFIG)
-  val quorumRequestTimeoutMs = 
getInt(QuorumConfig.QUORUM_REQUEST_TIMEOUT_MS_CONFIG)
-  val quorumRetryBackoffMs = 
getInt(QuorumConfig.QUORUM_RETRY_BACKOFF_MS_CONFIG)
-
   /** Internal Configurations **/
   val unstableApiVersionsEnabled = 
getBoolean(ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG)
   val unstableFeatureVersionsEnabled = 
getBoolean(ServerConfigs.UNSTABLE_FEATURE_VERSIONS_ENABLE_CONFIG)
@@ -878,9 +871,9 @@ class KafkaConfig private(doLog: Boolean, val props: 
util.Map[_, _])
     val advertisedBrokerListenerNames = 
effectiveAdvertisedBrokerListeners.map(_.listenerName).toSet
 
     // validate KRaft-related configs
-    val voterIds = QuorumConfig.parseVoterIds(quorumVoters)
+    val voterIds = QuorumConfig.parseVoterIds(quorumConfig.voters)
     def validateQuorumVotersAndQuorumBootstrapServerForKRaft(): Unit = {
-      if (voterIds.isEmpty && quorumBootstrapServers.isEmpty) {
+      if (voterIds.isEmpty && quorumConfig.bootstrapServers.isEmpty) {
         throw new ConfigException(
           s"""If using ${KRaftConfigs.PROCESS_ROLES_CONFIG}, either 
${QuorumConfig.QUORUM_BOOTSTRAP_SERVERS_CONFIG} must
           |contain the set of bootstrap controllers or 
${QuorumConfig.QUORUM_VOTERS_CONFIG} must contain a parseable
@@ -889,7 +882,7 @@ class KafkaConfig private(doLog: Boolean, val props: 
util.Map[_, _])
       }
     }
     def validateQuorumVotersAndQuorumBootstrapServerForMigration(): Unit = {
-      if (voterIds.isEmpty && quorumBootstrapServers.isEmpty) {
+      if (voterIds.isEmpty && quorumConfig.bootstrapServers.isEmpty) {
         throw new ConfigException(
           s"""If using ${KRaftConfigs.MIGRATION_ENABLED_CONFIG}, either 
${QuorumConfig.QUORUM_BOOTSTRAP_SERVERS_CONFIG} must
           |contain the set of bootstrap controllers or 
${QuorumConfig.QUORUM_VOTERS_CONFIG} must contain a parseable
diff --git a/core/src/main/scala/kafka/server/KafkaRaftServer.scala b/core/src/main/scala/kafka/server/KafkaRaftServer.scala
index dbc6f763eb9..7f56600f8b0 100644
--- a/core/src/main/scala/kafka/server/KafkaRaftServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaRaftServer.scala
@@ -70,8 +70,8 @@ class KafkaRaftServer(
     metaPropsEnsemble,
     time,
     metrics,
-    
CompletableFuture.completedFuture(QuorumConfig.parseVoterConnections(config.quorumVoters)),
-    QuorumConfig.parseBootstrapServers(config.quorumBootstrapServers),
+    
CompletableFuture.completedFuture(QuorumConfig.parseVoterConnections(config.quorumConfig.voters)),
+    QuorumConfig.parseBootstrapServers(config.quorumConfig.bootstrapServers),
     new StandardFaultHandlerFactory(),
   )
 
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 0a9edc32c6b..1ebf6c7e899 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -429,7 +429,7 @@ class KafkaServer(
           logger.info("Successfully deleted local metadata log. It will be 
re-created.")
 
           // If the ZK broker is in migration mode, start up a RaftManager to 
learn about the new KRaft controller
-          val quorumVoters = 
QuorumConfig.parseVoterConnections(config.quorumVoters)
+          val quorumVoters = 
QuorumConfig.parseVoterConnections(config.quorumConfig.voters)
           raftManager = new KafkaRaftManager[ApiMessageAndVersion](
             metaPropsEnsemble.clusterId().get(),
             config,
@@ -442,7 +442,7 @@ class KafkaServer(
             metrics,
             threadNamePrefix,
             CompletableFuture.completedFuture(quorumVoters),
-            QuorumConfig.parseBootstrapServers(config.quorumBootstrapServers),
+            
QuorumConfig.parseBootstrapServers(config.quorumConfig.bootstrapServers),
             // Endpoint information is only needed for KRaft controllers 
(voters). ZK brokers
             // (observers) can never be KRaft controllers
             Endpoints.empty(),
diff --git a/core/src/main/scala/kafka/tools/TestRaftServer.scala b/core/src/main/scala/kafka/tools/TestRaftServer.scala
index 070a09f52d6..5cc4c249eb3 100644
--- a/core/src/main/scala/kafka/tools/TestRaftServer.scala
+++ b/core/src/main/scala/kafka/tools/TestRaftServer.scala
@@ -104,8 +104,8 @@ class TestRaftServer(
       time,
       metrics,
       Some(threadNamePrefix),
-      
CompletableFuture.completedFuture(QuorumConfig.parseVoterConnections(config.quorumVoters)),
-      QuorumConfig.parseBootstrapServers(config.quorumBootstrapServers),
+      
CompletableFuture.completedFuture(QuorumConfig.parseVoterConnections(config.quorumConfig.voters)),
+      QuorumConfig.parseBootstrapServers(config.quorumConfig.bootstrapServers),
       endpoints,
       new ProcessTerminatingFaultHandler.Builder().build()
     )
diff --git a/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala b/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala
index 13a75007417..09b35318818 100644
--- a/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala
@@ -128,8 +128,8 @@ class RaftManagerTest {
       Time.SYSTEM,
       new Metrics(Time.SYSTEM),
       Option.empty,
-      
CompletableFuture.completedFuture(QuorumConfig.parseVoterConnections(config.quorumVoters)),
-      QuorumConfig.parseBootstrapServers(config.quorumBootstrapServers),
+      
CompletableFuture.completedFuture(QuorumConfig.parseVoterConnections(config.quorumConfig.voters)),
+      QuorumConfig.parseBootstrapServers(config.quorumConfig.bootstrapServers),
       endpoints,
       mock(classOf[FaultHandler])
     )
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 66550cf140b..c2fddfc33a5 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -1462,7 +1462,7 @@ class KafkaConfigTest {
   private def assertValidQuorumVoters(expectedVoters: util.Map[Integer, 
InetSocketAddress], value: String): Unit = {
     val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)
     props.setProperty(QuorumConfig.QUORUM_VOTERS_CONFIG, value)
-    val addresses = 
QuorumConfig.parseVoterConnections(KafkaConfig.fromProps(props).quorumVoters)
+    val addresses = 
QuorumConfig.parseVoterConnections(KafkaConfig.fromProps(props).quorumConfig.voters)
     assertEquals(expectedVoters, addresses)
   }
 
@@ -1477,7 +1477,7 @@ class KafkaConfigTest {
     props.setProperty(QuorumConfig.QUORUM_BOOTSTRAP_SERVERS_CONFIG, 
"kafka1:9092,kafka2:9092")
 
     val addresses = QuorumConfig.parseBootstrapServers(
-      KafkaConfig.fromProps(props).quorumBootstrapServers
+      KafkaConfig.fromProps(props).quorumConfig.bootstrapServers
     )
 
     assertEquals(expected, addresses)
diff --git a/raft/src/main/java/org/apache/kafka/raft/QuorumConfig.java b/raft/src/main/java/org/apache/kafka/raft/QuorumConfig.java
index 05b27ada396..1a7fff83ee6 100644
--- a/raft/src/main/java/org/apache/kafka/raft/QuorumConfig.java
+++ b/raft/src/main/java/org/apache/kafka/raft/QuorumConfig.java
@@ -109,6 +109,8 @@ public class QuorumConfig {
             .define(QUORUM_REQUEST_TIMEOUT_MS_CONFIG, INT, 
DEFAULT_QUORUM_REQUEST_TIMEOUT_MS, null, MEDIUM, QUORUM_REQUEST_TIMEOUT_MS_DOC)
             .define(QUORUM_RETRY_BACKOFF_MS_CONFIG, INT, 
DEFAULT_QUORUM_RETRY_BACKOFF_MS, null, LOW, QUORUM_RETRY_BACKOFF_MS_DOC);
 
+    private final List<String> voters;
+    private final List<String> bootstrapServers;
     private final int requestTimeoutMs;
     private final int retryBackoffMs;
     private final int electionTimeoutMs;
@@ -117,30 +119,22 @@ public class QuorumConfig {
     private final int appendLingerMs;
 
     public QuorumConfig(AbstractConfig abstractConfig) {
-        this(
-            abstractConfig.getInt(QUORUM_REQUEST_TIMEOUT_MS_CONFIG),
-            abstractConfig.getInt(QUORUM_RETRY_BACKOFF_MS_CONFIG),
-            abstractConfig.getInt(QUORUM_ELECTION_TIMEOUT_MS_CONFIG),
-            abstractConfig.getInt(QUORUM_ELECTION_BACKOFF_MAX_MS_CONFIG),
-            abstractConfig.getInt(QUORUM_FETCH_TIMEOUT_MS_CONFIG),
-            abstractConfig.getInt(QUORUM_LINGER_MS_CONFIG)
-        );
+        this.voters = abstractConfig.getList(QUORUM_VOTERS_CONFIG);
+        this.bootstrapServers = 
abstractConfig.getList(QUORUM_BOOTSTRAP_SERVERS_CONFIG);
+        this.requestTimeoutMs = 
abstractConfig.getInt(QUORUM_REQUEST_TIMEOUT_MS_CONFIG);
+        this.retryBackoffMs = 
abstractConfig.getInt(QUORUM_RETRY_BACKOFF_MS_CONFIG);
+        this.electionTimeoutMs = 
abstractConfig.getInt(QUORUM_ELECTION_TIMEOUT_MS_CONFIG);
+        this.electionBackoffMaxMs = 
abstractConfig.getInt(QUORUM_ELECTION_BACKOFF_MAX_MS_CONFIG);
+        this.fetchTimeoutMs = 
abstractConfig.getInt(QUORUM_FETCH_TIMEOUT_MS_CONFIG);
+        this.appendLingerMs = abstractConfig.getInt(QUORUM_LINGER_MS_CONFIG);
     }
 
-    public QuorumConfig(
-        int requestTimeoutMs,
-        int retryBackoffMs,
-        int electionTimeoutMs,
-        int electionBackoffMaxMs,
-        int fetchTimeoutMs,
-        int appendLingerMs
-    ) {
-        this.requestTimeoutMs = requestTimeoutMs;
-        this.retryBackoffMs = retryBackoffMs;
-        this.electionTimeoutMs = electionTimeoutMs;
-        this.electionBackoffMaxMs = electionBackoffMaxMs;
-        this.fetchTimeoutMs = fetchTimeoutMs;
-        this.appendLingerMs = appendLingerMs;
+    public List<String> voters() {
+        return voters;
+    }
+
+    public List<String> bootstrapServers() {
+        return bootstrapServers;
     }
 
     public int requestTimeoutMs() {
diff --git a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
index 0f1cfd6f3c4..f9f1c9e0f95 100644
--- a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
+++ b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.compress.Compression;
+import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.feature.SupportedVersionRange;
 import org.apache.kafka.common.memory.MemoryPool;
 import org.apache.kafka.common.message.AddRaftVoterRequestData;
@@ -383,14 +384,14 @@ public final class RaftClientTestContext {
                     Endpoints.empty() :
                 this.localListeners;
 
-            QuorumConfig quorumConfig = new QuorumConfig(
-                requestTimeoutMs,
-                RETRY_BACKOFF_MS,
-                electionTimeoutMs,
-                ELECTION_BACKOFF_MAX_MS,
-                FETCH_TIMEOUT_MS,
-                appendLingerMs
-            );
+            Map<String, Integer> configMap = new HashMap<>();
+            configMap.put(QuorumConfig.QUORUM_REQUEST_TIMEOUT_MS_CONFIG, 
requestTimeoutMs);
+            configMap.put(QuorumConfig.QUORUM_RETRY_BACKOFF_MS_CONFIG, 
RETRY_BACKOFF_MS);
+            configMap.put(QuorumConfig.QUORUM_ELECTION_TIMEOUT_MS_CONFIG, 
electionTimeoutMs);
+            configMap.put(QuorumConfig.QUORUM_ELECTION_BACKOFF_MAX_MS_CONFIG, 
ELECTION_BACKOFF_MAX_MS);
+            configMap.put(QuorumConfig.QUORUM_FETCH_TIMEOUT_MS_CONFIG, 
FETCH_TIMEOUT_MS);
+            configMap.put(QuorumConfig.QUORUM_LINGER_MS_CONFIG, 
appendLingerMs);
+            QuorumConfig quorumConfig = new QuorumConfig(new 
AbstractConfig(QuorumConfig.CONFIG_DEF, configMap));
 
             List<InetSocketAddress> computedBootstrapServers = 
bootstrapServers.orElseGet(() -> {
                 if (isStartingVotersStatic) {
diff --git a/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java b/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java
index cd8743635ca..7ecd4dc4e22 100644
--- a/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.raft;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.memory.MemoryPool;
 import org.apache.kafka.common.message.ApiMessageType;
 import org.apache.kafka.common.metrics.Metrics;
@@ -763,14 +764,14 @@ public class RaftEventSimulationTest {
                 .stream()
                 .collect(Collectors.toMap(Node::id, Cluster::nodeAddress));
 
-            QuorumConfig quorumConfig = new QuorumConfig(
-                REQUEST_TIMEOUT_MS,
-                RETRY_BACKOFF_MS,
-                ELECTION_TIMEOUT_MS,
-                ELECTION_JITTER_MS,
-                FETCH_TIMEOUT_MS,
-                LINGER_MS
-            );
+            Map<String, Integer> configMap = new HashMap<>();
+            configMap.put(QuorumConfig.QUORUM_REQUEST_TIMEOUT_MS_CONFIG, 
REQUEST_TIMEOUT_MS);
+            configMap.put(QuorumConfig.QUORUM_RETRY_BACKOFF_MS_CONFIG, 
RETRY_BACKOFF_MS);
+            configMap.put(QuorumConfig.QUORUM_ELECTION_TIMEOUT_MS_CONFIG, 
ELECTION_TIMEOUT_MS);
+            configMap.put(QuorumConfig.QUORUM_ELECTION_BACKOFF_MAX_MS_CONFIG, 
ELECTION_JITTER_MS);
+            configMap.put(QuorumConfig.QUORUM_FETCH_TIMEOUT_MS_CONFIG, 
FETCH_TIMEOUT_MS);
+            configMap.put(QuorumConfig.QUORUM_LINGER_MS_CONFIG, LINGER_MS);
+            QuorumConfig quorumConfig = new QuorumConfig(new 
AbstractConfig(QuorumConfig.CONFIG_DEF, configMap));
             Metrics metrics = new Metrics(time);
 
             persistentState.log.reopen();

Reply via email to