This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 6acc220e03a KAFKA-15773 Group protocol configuration should be validated (#16543)
6acc220e03a is described below
commit 6acc220e03a3d38d1ceb26926f072db56809ef1b
Author: PoAn Yang <[email protected]>
AuthorDate: Thu Jul 18 18:31:36 2024 +0800
KAFKA-15773 Group protocol configuration should be validated (#16543)
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../org/apache/kafka/clients/consumer/ConsumerConfig.java | 7 +++++++
.../org/apache/kafka/clients/consumer/ConsumerConfigTest.java | 11 +++++++++++
.../clients/consumer/internals/AsyncKafkaConsumerTest.java | 9 ++++-----
.../kafka/tools/consumer/group/DescribeConsumerGroupTest.java | 2 +-
4 files changed, 23 insertions(+), 6 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index c4c10c404b4..c3a0bf57900 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -674,6 +674,7 @@ public class ConsumerConfig extends AbstractConfig {
Map<String, Object> refinedConfigs =
CommonClientConfigs.postProcessReconnectBackoffConfigs(this, parsedValues);
maybeOverrideClientId(refinedConfigs);
maybeOverrideEnableAutoCommit(refinedConfigs);
+ checkGroupRemoteAssignor();
return refinedConfigs;
}
@@ -720,6 +721,12 @@ public class ConsumerConfig extends AbstractConfig {
}
}
+ private void checkGroupRemoteAssignor() {
+ if
(getString(GROUP_PROTOCOL_CONFIG).equalsIgnoreCase(GroupProtocol.CLASSIC.name())
&& getString(GROUP_REMOTE_ASSIGNOR_CONFIG) != null &&
!getString(GROUP_REMOTE_ASSIGNOR_CONFIG).isEmpty()) {
+ throw new ConfigException(GROUP_REMOTE_ASSIGNOR_CONFIG + " cannot
be set when " + GROUP_PROTOCOL_CONFIG + "=" + GroupProtocol.CLASSIC.name());
+ }
+ }
+
public ConsumerConfig(Properties props) {
super(CONFIG, props);
}
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerConfigTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerConfigTest.java
index e4cc699484e..99c45f05c15 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerConfigTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerConfigTest.java
@@ -193,6 +193,17 @@ public class ConsumerConfigTest {
assertEquals(remoteAssignorName,
consumerConfig.getString(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG));
}
+ @Test
+ public void testRemoteAssignorWithClassicGroupProtocol() {
+ String remoteAssignorName = "SomeAssignor";
+ final Map<String, Object> configs = new HashMap<>();
+ configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
keyDeserializerClass);
+ configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
valueDeserializerClass);
+ configs.put(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG,
remoteAssignorName);
+ ConfigException exception = assertThrows(ConfigException.class, () ->
new ConsumerConfig(configs));
+
assertTrue(exception.getMessage().contains(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG
+ " cannot be set when " + ConsumerConfig.GROUP_PROTOCOL_CONFIG + "=" +
GroupProtocol.CLASSIC.name()));
+ }
+
@Test
public void testDefaultMetadataRecoveryStrategy() {
Map<String, Object> configs = new HashMap<>();
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
index 7ef0b28108e..f1d227fae4e 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
@@ -55,6 +55,7 @@ import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.FencedInstanceIdException;
import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.InterruptException;
@@ -1758,6 +1759,7 @@ public class AsyncKafkaConsumerTest {
public void testGroupRemoteAssignorUnusedIfGroupIdUndefined() {
final Properties props = requiredConsumerConfig();
props.put(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG, "someAssignor");
+ props.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG,
GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT));
final ConsumerConfig config = new ConsumerConfig(props);
consumer = newConsumer(config);
@@ -1765,15 +1767,12 @@ public class AsyncKafkaConsumerTest {
}
@Test
- public void testGroupRemoteAssignorUnusedInGenericProtocol() {
+ public void testGroupRemoteAssignorInClassicProtocol() {
final Properties props = requiredConsumerConfig();
props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumerGroupA");
props.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG,
GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT));
props.put(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG, "someAssignor");
- final ConsumerConfig config = new ConsumerConfig(props);
- consumer = newConsumer(config);
-
-
assertTrue(config.unused().contains(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG));
+ assertThrows(ConfigException.class, () -> new ConsumerConfig(props));
}
@Test
diff --git a/tools/src/test/java/org/apache/kafka/tools/consumer/group/DescribeConsumerGroupTest.java b/tools/src/test/java/org/apache/kafka/tools/consumer/group/DescribeConsumerGroupTest.java
index 852a6434a1c..9883dc471eb 100644
--- a/tools/src/test/java/org/apache/kafka/tools/consumer/group/DescribeConsumerGroupTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/consumer/group/DescribeConsumerGroupTest.java
@@ -338,7 +338,7 @@ public class DescribeConsumerGroupTest {
createTopic(topic);
// run one consumer in the group consuming from a single-partition topic
- try (AutoCloseable protocolConsumerGroupExecutor =
consumerGroupClosable(groupProtocol, group, topic,
Collections.singletonMap(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG, "range"));
+ try (AutoCloseable protocolConsumerGroupExecutor =
consumerGroupClosable(groupProtocol, group, topic,
Collections.singletonMap(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG,
groupProtocol == GroupProtocol.CONSUMER ? "range" : ""));
ConsumerGroupCommand.ConsumerGroupService service =
consumerGroupService(new String[]{"--bootstrap-server",
clusterInstance.bootstrapServers(), "--describe", "--group", group})
) {
TestUtils.waitForCondition(() -> {