This is an automated email from the ASF dual-hosted git repository.
mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new a2653281c2d KAFKA-18837: Ensure controller quorum timeouts and
backoffs are at least 0 (#18998)
a2653281c2d is described below
commit a2653281c2d8b74bae138aa915d239fe5a0ecf99
Author: TaiJuWu <[email protected]>
AuthorDate: Wed Mar 12 23:43:38 2025 +0800
KAFKA-18837: Ensure controller quorum timeouts and backoffs are at least 0
(#18998)
Add validators to the QuorumConfig's configuration definitions.
Reviewers: Mickael Maison <[email protected]>, Ken Huang
<[email protected]>, TengYao Chi <[email protected]>
---
.../java/org/apache/kafka/raft/QuorumConfig.java | 13 ++---
.../org/apache/kafka/raft/QuorumConfigTest.java | 55 ++++++++++++++++++++++
2 files changed, 62 insertions(+), 6 deletions(-)
diff --git a/raft/src/main/java/org/apache/kafka/raft/QuorumConfig.java b/raft/src/main/java/org/apache/kafka/raft/QuorumConfig.java
index 1a7fff83ee6..0dece254184 100644
--- a/raft/src/main/java/org/apache/kafka/raft/QuorumConfig.java
+++ b/raft/src/main/java/org/apache/kafka/raft/QuorumConfig.java
@@ -35,6 +35,7 @@ import java.util.stream.Collectors;
import static org.apache.kafka.common.config.ConfigDef.Importance.HIGH;
import static org.apache.kafka.common.config.ConfigDef.Importance.LOW;
import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM;
+import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
import static org.apache.kafka.common.config.ConfigDef.Type.INT;
import static org.apache.kafka.common.config.ConfigDef.Type.LIST;
@@ -102,12 +103,12 @@ public class QuorumConfig {
public static final ConfigDef CONFIG_DEF = new ConfigDef()
.define(QUORUM_VOTERS_CONFIG, LIST, DEFAULT_QUORUM_VOTERS, new ControllerQuorumVotersValidator(), HIGH, QUORUM_VOTERS_DOC)
.define(QUORUM_BOOTSTRAP_SERVERS_CONFIG, LIST, DEFAULT_QUORUM_BOOTSTRAP_SERVERS, new ControllerQuorumBootstrapServersValidator(), HIGH, QUORUM_BOOTSTRAP_SERVERS_DOC)
- .define(QUORUM_ELECTION_TIMEOUT_MS_CONFIG, INT, DEFAULT_QUORUM_ELECTION_TIMEOUT_MS, null, HIGH, QUORUM_ELECTION_TIMEOUT_MS_DOC)
- .define(QUORUM_FETCH_TIMEOUT_MS_CONFIG, INT, DEFAULT_QUORUM_FETCH_TIMEOUT_MS, null, HIGH, QUORUM_FETCH_TIMEOUT_MS_DOC)
- .define(QUORUM_ELECTION_BACKOFF_MAX_MS_CONFIG, INT, DEFAULT_QUORUM_ELECTION_BACKOFF_MAX_MS, null, HIGH, QUORUM_ELECTION_BACKOFF_MAX_MS_DOC)
- .define(QUORUM_LINGER_MS_CONFIG, INT, DEFAULT_QUORUM_LINGER_MS, null, MEDIUM, QUORUM_LINGER_MS_DOC)
- .define(QUORUM_REQUEST_TIMEOUT_MS_CONFIG, INT, DEFAULT_QUORUM_REQUEST_TIMEOUT_MS, null, MEDIUM, QUORUM_REQUEST_TIMEOUT_MS_DOC)
- .define(QUORUM_RETRY_BACKOFF_MS_CONFIG, INT, DEFAULT_QUORUM_RETRY_BACKOFF_MS, null, LOW, QUORUM_RETRY_BACKOFF_MS_DOC);
+ .define(QUORUM_ELECTION_TIMEOUT_MS_CONFIG, INT, DEFAULT_QUORUM_ELECTION_TIMEOUT_MS, atLeast(0), HIGH, QUORUM_ELECTION_TIMEOUT_MS_DOC)
+ .define(QUORUM_FETCH_TIMEOUT_MS_CONFIG, INT, DEFAULT_QUORUM_FETCH_TIMEOUT_MS, atLeast(0), HIGH, QUORUM_FETCH_TIMEOUT_MS_DOC)
+ .define(QUORUM_ELECTION_BACKOFF_MAX_MS_CONFIG, INT, DEFAULT_QUORUM_ELECTION_BACKOFF_MAX_MS, atLeast(0), HIGH, QUORUM_ELECTION_BACKOFF_MAX_MS_DOC)
+ .define(QUORUM_LINGER_MS_CONFIG, INT, DEFAULT_QUORUM_LINGER_MS, atLeast(0), MEDIUM, QUORUM_LINGER_MS_DOC)
+ .define(QUORUM_REQUEST_TIMEOUT_MS_CONFIG, INT, DEFAULT_QUORUM_REQUEST_TIMEOUT_MS, atLeast(0), MEDIUM, QUORUM_REQUEST_TIMEOUT_MS_DOC)
+ .define(QUORUM_RETRY_BACKOFF_MS_CONFIG, INT, DEFAULT_QUORUM_RETRY_BACKOFF_MS, atLeast(0), LOW, QUORUM_RETRY_BACKOFF_MS_DOC);
private final List<String> voters;
private final List<String> bootstrapServers;
diff --git a/raft/src/test/java/org/apache/kafka/raft/QuorumConfigTest.java b/raft/src/test/java/org/apache/kafka/raft/QuorumConfigTest.java
new file mode 100644
index 00000000000..2197f3766c2
--- /dev/null
+++ b/raft/src/test/java/org/apache/kafka/raft/QuorumConfigTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft;
+
+import org.apache.kafka.common.config.ConfigException;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class QuorumConfigTest {
+ @Test
+ public void testIllegalConfig() {
+ assertInvalidConfig(Map.of(QuorumConfig.QUORUM_ELECTION_TIMEOUT_MS_CONFIG, "-1"));
+ assertInvalidConfig(Map.of(QuorumConfig.QUORUM_FETCH_TIMEOUT_MS_CONFIG, "-1"));
+ assertInvalidConfig(Map.of(QuorumConfig.QUORUM_ELECTION_BACKOFF_MAX_MS_CONFIG, "-1"));
+ assertInvalidConfig(Map.of(QuorumConfig.QUORUM_LINGER_MS_CONFIG, "-1"));
+ assertInvalidConfig(Map.of(QuorumConfig.QUORUM_REQUEST_TIMEOUT_MS_CONFIG, "-1"));
+ assertInvalidConfig(Map.of(QuorumConfig.QUORUM_RETRY_BACKOFF_MS_CONFIG, "-1"));
+ }
+
+ private void assertInvalidConfig(Map<String, Object> overrideConfig) {
+ Map<String, Object> props = new HashMap<>();
+ props.put(QuorumConfig.QUORUM_VOTERS_CONFIG, "1@localhost:9092");
+ props.put(QuorumConfig.QUORUM_BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+ props.put(QuorumConfig.QUORUM_ELECTION_TIMEOUT_MS_CONFIG, "10");
+ props.put(QuorumConfig.QUORUM_FETCH_TIMEOUT_MS_CONFIG, "10");
+ props.put(QuorumConfig.QUORUM_ELECTION_BACKOFF_MAX_MS_CONFIG, "10");
+ props.put(QuorumConfig.QUORUM_LINGER_MS_CONFIG, "10");
+ props.put(QuorumConfig.QUORUM_REQUEST_TIMEOUT_MS_CONFIG, "10");
+ props.put(QuorumConfig.QUORUM_RETRY_BACKOFF_MS_CONFIG, "10");
+
+ props.putAll(overrideConfig);
+
+ assertThrows(ConfigException.class, () -> QuorumConfig.CONFIG_DEF.parse(props));
+ }
+
+}