This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 6acc220e03a KAFKA-15773 Group protocol configuration should be validated (#16543)
6acc220e03a is described below

commit 6acc220e03a3d38d1ceb26926f072db56809ef1b
Author: PoAn Yang <[email protected]>
AuthorDate: Thu Jul 18 18:31:36 2024 +0800

    KAFKA-15773 Group protocol configuration should be validated (#16543)
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .../org/apache/kafka/clients/consumer/ConsumerConfig.java     |  7 +++++++
 .../org/apache/kafka/clients/consumer/ConsumerConfigTest.java | 11 +++++++++++
 .../clients/consumer/internals/AsyncKafkaConsumerTest.java    |  9 ++++-----
 .../kafka/tools/consumer/group/DescribeConsumerGroupTest.java |  2 +-
 4 files changed, 23 insertions(+), 6 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index c4c10c404b4..c3a0bf57900 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -674,6 +674,7 @@ public class ConsumerConfig extends AbstractConfig {
         Map<String, Object> refinedConfigs = 
CommonClientConfigs.postProcessReconnectBackoffConfigs(this, parsedValues);
         maybeOverrideClientId(refinedConfigs);
         maybeOverrideEnableAutoCommit(refinedConfigs);
+        checkGroupRemoteAssignor();
         return refinedConfigs;
     }
 
@@ -720,6 +721,12 @@ public class ConsumerConfig extends AbstractConfig {
         }
     }
 
+    private void checkGroupRemoteAssignor() {
+        if 
(getString(GROUP_PROTOCOL_CONFIG).equalsIgnoreCase(GroupProtocol.CLASSIC.name())
 && getString(GROUP_REMOTE_ASSIGNOR_CONFIG) != null && 
!getString(GROUP_REMOTE_ASSIGNOR_CONFIG).isEmpty()) {
+            throw new ConfigException(GROUP_REMOTE_ASSIGNOR_CONFIG + " cannot 
be set when " + GROUP_PROTOCOL_CONFIG + "=" + GroupProtocol.CLASSIC.name());
+        }
+    }
+
     public ConsumerConfig(Properties props) {
         super(CONFIG, props);
     }
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerConfigTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerConfigTest.java
index e4cc699484e..99c45f05c15 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerConfigTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerConfigTest.java
@@ -193,6 +193,17 @@ public class ConsumerConfigTest {
         assertEquals(remoteAssignorName, 
consumerConfig.getString(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG));
     }
 
+    @Test
+    public void testRemoteAssignorWithClassicGroupProtocol() {
+        String remoteAssignorName = "SomeAssignor";
+        final Map<String, Object> configs = new HashMap<>();
+        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
keyDeserializerClass);
+        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
valueDeserializerClass);
+        configs.put(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG, 
remoteAssignorName);
+        ConfigException exception = assertThrows(ConfigException.class, () -> 
new ConsumerConfig(configs));
+        
assertTrue(exception.getMessage().contains(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG
 + " cannot be set when " + ConsumerConfig.GROUP_PROTOCOL_CONFIG + "=" + 
GroupProtocol.CLASSIC.name()));
+    }
+
     @Test
     public void testDefaultMetadataRecoveryStrategy() {
         Map<String, Object> configs = new HashMap<>();
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
index 7ef0b28108e..f1d227fae4e 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
@@ -55,6 +55,7 @@ import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.errors.FencedInstanceIdException;
 import org.apache.kafka.common.errors.GroupAuthorizationException;
 import org.apache.kafka.common.errors.InterruptException;
@@ -1758,6 +1759,7 @@ public class AsyncKafkaConsumerTest {
     public void testGroupRemoteAssignorUnusedIfGroupIdUndefined() {
         final Properties props = requiredConsumerConfig();
         props.put(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG, "someAssignor");
+        props.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, 
GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT));
         final ConsumerConfig config = new ConsumerConfig(props);
         consumer = newConsumer(config);
 
@@ -1765,15 +1767,12 @@ public class AsyncKafkaConsumerTest {
     }
 
     @Test
-    public void testGroupRemoteAssignorUnusedInGenericProtocol() {
+    public void testGroupRemoteAssignorInClassicProtocol() {
         final Properties props = requiredConsumerConfig();
         props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumerGroupA");
         props.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, 
GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT));
         props.put(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG, "someAssignor");
-        final ConsumerConfig config = new ConsumerConfig(props);
-        consumer = newConsumer(config);
-
-        
assertTrue(config.unused().contains(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG));
+        assertThrows(ConfigException.class, () -> new ConsumerConfig(props));
     }
 
     @Test
diff --git a/tools/src/test/java/org/apache/kafka/tools/consumer/group/DescribeConsumerGroupTest.java b/tools/src/test/java/org/apache/kafka/tools/consumer/group/DescribeConsumerGroupTest.java
index 852a6434a1c..9883dc471eb 100644
--- a/tools/src/test/java/org/apache/kafka/tools/consumer/group/DescribeConsumerGroupTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/consumer/group/DescribeConsumerGroupTest.java
@@ -338,7 +338,7 @@ public class DescribeConsumerGroupTest {
             createTopic(topic);
 
             // run one consumer in the group consuming from a single-partition 
topic
-            try (AutoCloseable protocolConsumerGroupExecutor = 
consumerGroupClosable(groupProtocol, group, topic, 
Collections.singletonMap(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG, "range"));
+            try (AutoCloseable protocolConsumerGroupExecutor = 
consumerGroupClosable(groupProtocol, group, topic, 
Collections.singletonMap(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG, 
groupProtocol == GroupProtocol.CONSUMER ? "range" : ""));
                  ConsumerGroupCommand.ConsumerGroupService service = 
consumerGroupService(new String[]{"--bootstrap-server", 
clusterInstance.bootstrapServers(), "--describe", "--group", group})
             ) {
                 TestUtils.waitForCondition(() -> {

Reply via email to