This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 2637d12e2f4 KAFKA-16908 Refactor `QuorumConfig` with `AbstractConfig` (#17231)
2637d12e2f4 is described below
commit 2637d12e2f4cbd8d46c1ed108a9a3b4e12c0c924
Author: xijiu <[email protected]>
AuthorDate: Thu Sep 26 04:06:42 2024 +0800
KAFKA-16908 Refactor `QuorumConfig` with `AbstractConfig` (#17231)
Reviewers: Chia-Ping Tsai <[email protected]>
---
core/src/main/scala/kafka/raft/RaftManager.scala | 4 +--
core/src/main/scala/kafka/server/KafkaConfig.scala | 19 ++++-------
.../main/scala/kafka/server/KafkaRaftServer.scala | 4 +--
core/src/main/scala/kafka/server/KafkaServer.scala | 4 +--
.../main/scala/kafka/tools/TestRaftServer.scala | 4 +--
.../scala/unit/kafka/raft/RaftManagerTest.scala | 4 +--
.../scala/unit/kafka/server/KafkaConfigTest.scala | 4 +--
.../java/org/apache/kafka/raft/QuorumConfig.java | 38 +++++++++-------------
.../apache/kafka/raft/RaftClientTestContext.java | 17 +++++-----
.../apache/kafka/raft/RaftEventSimulationTest.java | 17 +++++-----
10 files changed, 52 insertions(+), 63 deletions(-)
diff --git a/core/src/main/scala/kafka/raft/RaftManager.scala
b/core/src/main/scala/kafka/raft/RaftManager.scala
index d248443e611..bfde57c6706 100644
--- a/core/src/main/scala/kafka/raft/RaftManager.scala
+++ b/core/src/main/scala/kafka/raft/RaftManager.scala
@@ -247,7 +247,7 @@ class KafkaRaftManager[T](
private def buildNetworkChannel(): KafkaNetworkChannel = {
val (listenerName, netClient) = buildNetworkClient()
- new KafkaNetworkChannel(time, listenerName, netClient,
config.quorumRequestTimeoutMs, threadNamePrefix)
+ new KafkaNetworkChannel(time, listenerName, netClient,
config.quorumConfig.requestTimeoutMs, threadNamePrefix)
}
private def createDataDir(): File = {
@@ -313,7 +313,7 @@ class KafkaRaftManager[T](
reconnectBackoffMsMs,
Selectable.USE_DEFAULT_BUFFER_SIZE,
config.socketReceiveBufferBytes,
- config.quorumRequestTimeoutMs,
+ config.quorumConfig.requestTimeoutMs,
config.connectionSetupTimeoutMs,
config.connectionSetupTimeoutMaxMs,
time,
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala
b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 454308990c8..237da5c8e3b 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -231,6 +231,9 @@ class KafkaConfig private(doLog: Boolean, val props:
util.Map[_, _])
private val _remoteLogManagerConfig = new RemoteLogManagerConfig(this)
def remoteLogManagerConfig = _remoteLogManagerConfig
+ private val _quorumConfig = new QuorumConfig(this)
+ def quorumConfig: QuorumConfig = _quorumConfig
+
private val _groupCoordinatorConfig = new GroupCoordinatorConfig(this)
def groupCoordinatorConfig: GroupCoordinatorConfig = _groupCoordinatorConfig
@@ -664,16 +667,6 @@ class KafkaConfig private(doLog: Boolean, val props:
util.Map[_, _])
def lz4CompressionLevel = getInt(ServerConfigs.COMPRESSION_LZ4_LEVEL_CONFIG)
def zstdCompressionLevel =
getInt(ServerConfigs.COMPRESSION_ZSTD_LEVEL_CONFIG)
- /** ********* Raft Quorum Configuration *********/
- val quorumVoters = getList(QuorumConfig.QUORUM_VOTERS_CONFIG)
- val quorumBootstrapServers =
getList(QuorumConfig.QUORUM_BOOTSTRAP_SERVERS_CONFIG)
- val quorumElectionTimeoutMs =
getInt(QuorumConfig.QUORUM_ELECTION_TIMEOUT_MS_CONFIG)
- val quorumFetchTimeoutMs =
getInt(QuorumConfig.QUORUM_FETCH_TIMEOUT_MS_CONFIG)
- val quorumElectionBackoffMs =
getInt(QuorumConfig.QUORUM_ELECTION_BACKOFF_MAX_MS_CONFIG)
- val quorumLingerMs = getInt(QuorumConfig.QUORUM_LINGER_MS_CONFIG)
- val quorumRequestTimeoutMs =
getInt(QuorumConfig.QUORUM_REQUEST_TIMEOUT_MS_CONFIG)
- val quorumRetryBackoffMs =
getInt(QuorumConfig.QUORUM_RETRY_BACKOFF_MS_CONFIG)
-
/** Internal Configurations **/
val unstableApiVersionsEnabled =
getBoolean(ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG)
val unstableFeatureVersionsEnabled =
getBoolean(ServerConfigs.UNSTABLE_FEATURE_VERSIONS_ENABLE_CONFIG)
@@ -878,9 +871,9 @@ class KafkaConfig private(doLog: Boolean, val props:
util.Map[_, _])
val advertisedBrokerListenerNames =
effectiveAdvertisedBrokerListeners.map(_.listenerName).toSet
// validate KRaft-related configs
- val voterIds = QuorumConfig.parseVoterIds(quorumVoters)
+ val voterIds = QuorumConfig.parseVoterIds(quorumConfig.voters)
def validateQuorumVotersAndQuorumBootstrapServerForKRaft(): Unit = {
- if (voterIds.isEmpty && quorumBootstrapServers.isEmpty) {
+ if (voterIds.isEmpty && quorumConfig.bootstrapServers.isEmpty) {
throw new ConfigException(
s"""If using ${KRaftConfigs.PROCESS_ROLES_CONFIG}, either
${QuorumConfig.QUORUM_BOOTSTRAP_SERVERS_CONFIG} must
|contain the set of bootstrap controllers or
${QuorumConfig.QUORUM_VOTERS_CONFIG} must contain a parseable
@@ -889,7 +882,7 @@ class KafkaConfig private(doLog: Boolean, val props:
util.Map[_, _])
}
}
def validateQuorumVotersAndQuorumBootstrapServerForMigration(): Unit = {
- if (voterIds.isEmpty && quorumBootstrapServers.isEmpty) {
+ if (voterIds.isEmpty && quorumConfig.bootstrapServers.isEmpty) {
throw new ConfigException(
s"""If using ${KRaftConfigs.MIGRATION_ENABLED_CONFIG}, either
${QuorumConfig.QUORUM_BOOTSTRAP_SERVERS_CONFIG} must
|contain the set of bootstrap controllers or
${QuorumConfig.QUORUM_VOTERS_CONFIG} must contain a parseable
diff --git a/core/src/main/scala/kafka/server/KafkaRaftServer.scala
b/core/src/main/scala/kafka/server/KafkaRaftServer.scala
index dbc6f763eb9..7f56600f8b0 100644
--- a/core/src/main/scala/kafka/server/KafkaRaftServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaRaftServer.scala
@@ -70,8 +70,8 @@ class KafkaRaftServer(
metaPropsEnsemble,
time,
metrics,
-
CompletableFuture.completedFuture(QuorumConfig.parseVoterConnections(config.quorumVoters)),
- QuorumConfig.parseBootstrapServers(config.quorumBootstrapServers),
+
CompletableFuture.completedFuture(QuorumConfig.parseVoterConnections(config.quorumConfig.voters)),
+ QuorumConfig.parseBootstrapServers(config.quorumConfig.bootstrapServers),
new StandardFaultHandlerFactory(),
)
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala
b/core/src/main/scala/kafka/server/KafkaServer.scala
index 0a9edc32c6b..1ebf6c7e899 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -429,7 +429,7 @@ class KafkaServer(
logger.info("Successfully deleted local metadata log. It will be
re-created.")
// If the ZK broker is in migration mode, start up a RaftManager to
learn about the new KRaft controller
- val quorumVoters =
QuorumConfig.parseVoterConnections(config.quorumVoters)
+ val quorumVoters =
QuorumConfig.parseVoterConnections(config.quorumConfig.voters)
raftManager = new KafkaRaftManager[ApiMessageAndVersion](
metaPropsEnsemble.clusterId().get(),
config,
@@ -442,7 +442,7 @@ class KafkaServer(
metrics,
threadNamePrefix,
CompletableFuture.completedFuture(quorumVoters),
- QuorumConfig.parseBootstrapServers(config.quorumBootstrapServers),
+
QuorumConfig.parseBootstrapServers(config.quorumConfig.bootstrapServers),
// Endpoint information is only needed for KRaft controllers
(voters). ZK brokers
// (observers) can never be KRaft controllers
Endpoints.empty(),
diff --git a/core/src/main/scala/kafka/tools/TestRaftServer.scala
b/core/src/main/scala/kafka/tools/TestRaftServer.scala
index 070a09f52d6..5cc4c249eb3 100644
--- a/core/src/main/scala/kafka/tools/TestRaftServer.scala
+++ b/core/src/main/scala/kafka/tools/TestRaftServer.scala
@@ -104,8 +104,8 @@ class TestRaftServer(
time,
metrics,
Some(threadNamePrefix),
-
CompletableFuture.completedFuture(QuorumConfig.parseVoterConnections(config.quorumVoters)),
- QuorumConfig.parseBootstrapServers(config.quorumBootstrapServers),
+
CompletableFuture.completedFuture(QuorumConfig.parseVoterConnections(config.quorumConfig.voters)),
+ QuorumConfig.parseBootstrapServers(config.quorumConfig.bootstrapServers),
endpoints,
new ProcessTerminatingFaultHandler.Builder().build()
)
diff --git a/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala
b/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala
index 13a75007417..09b35318818 100644
--- a/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala
@@ -128,8 +128,8 @@ class RaftManagerTest {
Time.SYSTEM,
new Metrics(Time.SYSTEM),
Option.empty,
-
CompletableFuture.completedFuture(QuorumConfig.parseVoterConnections(config.quorumVoters)),
- QuorumConfig.parseBootstrapServers(config.quorumBootstrapServers),
+
CompletableFuture.completedFuture(QuorumConfig.parseVoterConnections(config.quorumConfig.voters)),
+ QuorumConfig.parseBootstrapServers(config.quorumConfig.bootstrapServers),
endpoints,
mock(classOf[FaultHandler])
)
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 66550cf140b..c2fddfc33a5 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -1462,7 +1462,7 @@ class KafkaConfigTest {
private def assertValidQuorumVoters(expectedVoters: util.Map[Integer,
InetSocketAddress], value: String): Unit = {
val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)
props.setProperty(QuorumConfig.QUORUM_VOTERS_CONFIG, value)
- val addresses =
QuorumConfig.parseVoterConnections(KafkaConfig.fromProps(props).quorumVoters)
+ val addresses =
QuorumConfig.parseVoterConnections(KafkaConfig.fromProps(props).quorumConfig.voters)
assertEquals(expectedVoters, addresses)
}
@@ -1477,7 +1477,7 @@ class KafkaConfigTest {
props.setProperty(QuorumConfig.QUORUM_BOOTSTRAP_SERVERS_CONFIG,
"kafka1:9092,kafka2:9092")
val addresses = QuorumConfig.parseBootstrapServers(
- KafkaConfig.fromProps(props).quorumBootstrapServers
+ KafkaConfig.fromProps(props).quorumConfig.bootstrapServers
)
assertEquals(expected, addresses)
diff --git a/raft/src/main/java/org/apache/kafka/raft/QuorumConfig.java
b/raft/src/main/java/org/apache/kafka/raft/QuorumConfig.java
index 05b27ada396..1a7fff83ee6 100644
--- a/raft/src/main/java/org/apache/kafka/raft/QuorumConfig.java
+++ b/raft/src/main/java/org/apache/kafka/raft/QuorumConfig.java
@@ -109,6 +109,8 @@ public class QuorumConfig {
.define(QUORUM_REQUEST_TIMEOUT_MS_CONFIG, INT,
DEFAULT_QUORUM_REQUEST_TIMEOUT_MS, null, MEDIUM, QUORUM_REQUEST_TIMEOUT_MS_DOC)
.define(QUORUM_RETRY_BACKOFF_MS_CONFIG, INT,
DEFAULT_QUORUM_RETRY_BACKOFF_MS, null, LOW, QUORUM_RETRY_BACKOFF_MS_DOC);
+ private final List<String> voters;
+ private final List<String> bootstrapServers;
private final int requestTimeoutMs;
private final int retryBackoffMs;
private final int electionTimeoutMs;
@@ -117,30 +119,22 @@ public class QuorumConfig {
private final int appendLingerMs;
public QuorumConfig(AbstractConfig abstractConfig) {
- this(
- abstractConfig.getInt(QUORUM_REQUEST_TIMEOUT_MS_CONFIG),
- abstractConfig.getInt(QUORUM_RETRY_BACKOFF_MS_CONFIG),
- abstractConfig.getInt(QUORUM_ELECTION_TIMEOUT_MS_CONFIG),
- abstractConfig.getInt(QUORUM_ELECTION_BACKOFF_MAX_MS_CONFIG),
- abstractConfig.getInt(QUORUM_FETCH_TIMEOUT_MS_CONFIG),
- abstractConfig.getInt(QUORUM_LINGER_MS_CONFIG)
- );
+ this.voters = abstractConfig.getList(QUORUM_VOTERS_CONFIG);
+ this.bootstrapServers =
abstractConfig.getList(QUORUM_BOOTSTRAP_SERVERS_CONFIG);
+ this.requestTimeoutMs =
abstractConfig.getInt(QUORUM_REQUEST_TIMEOUT_MS_CONFIG);
+ this.retryBackoffMs =
abstractConfig.getInt(QUORUM_RETRY_BACKOFF_MS_CONFIG);
+ this.electionTimeoutMs =
abstractConfig.getInt(QUORUM_ELECTION_TIMEOUT_MS_CONFIG);
+ this.electionBackoffMaxMs =
abstractConfig.getInt(QUORUM_ELECTION_BACKOFF_MAX_MS_CONFIG);
+ this.fetchTimeoutMs =
abstractConfig.getInt(QUORUM_FETCH_TIMEOUT_MS_CONFIG);
+ this.appendLingerMs = abstractConfig.getInt(QUORUM_LINGER_MS_CONFIG);
}
- public QuorumConfig(
- int requestTimeoutMs,
- int retryBackoffMs,
- int electionTimeoutMs,
- int electionBackoffMaxMs,
- int fetchTimeoutMs,
- int appendLingerMs
- ) {
- this.requestTimeoutMs = requestTimeoutMs;
- this.retryBackoffMs = retryBackoffMs;
- this.electionTimeoutMs = electionTimeoutMs;
- this.electionBackoffMaxMs = electionBackoffMaxMs;
- this.fetchTimeoutMs = fetchTimeoutMs;
- this.appendLingerMs = appendLingerMs;
+ public List<String> voters() {
+ return voters;
+ }
+
+ public List<String> bootstrapServers() {
+ return bootstrapServers;
}
public int requestTimeoutMs() {
diff --git
a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
index 0f1cfd6f3c4..f9f1c9e0f95 100644
--- a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
+++ b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.compress.Compression;
+import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.feature.SupportedVersionRange;
import org.apache.kafka.common.memory.MemoryPool;
import org.apache.kafka.common.message.AddRaftVoterRequestData;
@@ -383,14 +384,14 @@ public final class RaftClientTestContext {
Endpoints.empty() :
this.localListeners;
- QuorumConfig quorumConfig = new QuorumConfig(
- requestTimeoutMs,
- RETRY_BACKOFF_MS,
- electionTimeoutMs,
- ELECTION_BACKOFF_MAX_MS,
- FETCH_TIMEOUT_MS,
- appendLingerMs
- );
+ Map<String, Integer> configMap = new HashMap<>();
+ configMap.put(QuorumConfig.QUORUM_REQUEST_TIMEOUT_MS_CONFIG,
requestTimeoutMs);
+ configMap.put(QuorumConfig.QUORUM_RETRY_BACKOFF_MS_CONFIG,
RETRY_BACKOFF_MS);
+ configMap.put(QuorumConfig.QUORUM_ELECTION_TIMEOUT_MS_CONFIG,
electionTimeoutMs);
+ configMap.put(QuorumConfig.QUORUM_ELECTION_BACKOFF_MAX_MS_CONFIG,
ELECTION_BACKOFF_MAX_MS);
+ configMap.put(QuorumConfig.QUORUM_FETCH_TIMEOUT_MS_CONFIG,
FETCH_TIMEOUT_MS);
+ configMap.put(QuorumConfig.QUORUM_LINGER_MS_CONFIG,
appendLingerMs);
+ QuorumConfig quorumConfig = new QuorumConfig(new
AbstractConfig(QuorumConfig.CONFIG_DEF, configMap));
List<InetSocketAddress> computedBootstrapServers =
bootstrapServers.orElseGet(() -> {
if (isStartingVotersStatic) {
diff --git
a/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java
b/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java
index cd8743635ca..7ecd4dc4e22 100644
--- a/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.raft;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.memory.MemoryPool;
import org.apache.kafka.common.message.ApiMessageType;
import org.apache.kafka.common.metrics.Metrics;
@@ -763,14 +764,14 @@ public class RaftEventSimulationTest {
.stream()
.collect(Collectors.toMap(Node::id, Cluster::nodeAddress));
- QuorumConfig quorumConfig = new QuorumConfig(
- REQUEST_TIMEOUT_MS,
- RETRY_BACKOFF_MS,
- ELECTION_TIMEOUT_MS,
- ELECTION_JITTER_MS,
- FETCH_TIMEOUT_MS,
- LINGER_MS
- );
+ Map<String, Integer> configMap = new HashMap<>();
+ configMap.put(QuorumConfig.QUORUM_REQUEST_TIMEOUT_MS_CONFIG,
REQUEST_TIMEOUT_MS);
+ configMap.put(QuorumConfig.QUORUM_RETRY_BACKOFF_MS_CONFIG,
RETRY_BACKOFF_MS);
+ configMap.put(QuorumConfig.QUORUM_ELECTION_TIMEOUT_MS_CONFIG,
ELECTION_TIMEOUT_MS);
+ configMap.put(QuorumConfig.QUORUM_ELECTION_BACKOFF_MAX_MS_CONFIG,
ELECTION_JITTER_MS);
+ configMap.put(QuorumConfig.QUORUM_FETCH_TIMEOUT_MS_CONFIG,
FETCH_TIMEOUT_MS);
+ configMap.put(QuorumConfig.QUORUM_LINGER_MS_CONFIG, LINGER_MS);
+ QuorumConfig quorumConfig = new QuorumConfig(new
AbstractConfig(QuorumConfig.CONFIG_DEF, configMap));
Metrics metrics = new Metrics(time);
persistentState.log.reopen();