dajac commented on code in PR #14642:
URL: https://github.com/apache/kafka/pull/14642#discussion_r1376703936


##########
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java:
##########
@@ -105,6 +105,24 @@ public class ConsumerConfig extends AbstractConfig {
     public static final String HEARTBEAT_INTERVAL_MS_CONFIG = 
CommonClientConfigs.HEARTBEAT_INTERVAL_MS_CONFIG;
     private static final String HEARTBEAT_INTERVAL_MS_DOC = 
CommonClientConfigs.HEARTBEAT_INTERVAL_MS_DOC;
 
+    /**
+     * <code>group.protocol</code>
+     */
+    public static final String GROUP_PROTOCOL_CONFIG = "group.protocol";
+    public static final String DEFAULT_GROUP_PROTOCOL = 
GroupProtocol.GENERIC.name().toLowerCase(Locale.ROOT);
+    public static final String GROUP_PROTOCOL_DOC = "The group protocol 
consumer should use.  We currently " +

Review Comment:
   nit: Let's remove the double spaces after the periods (use a single space between sentences).



##########
clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerConfigTest.java:
##########
@@ -158,4 +158,27 @@ public void testCaseInsensitiveSecurityProtocol() {
         final ConsumerConfig consumerConfig = new ConsumerConfig(configs);
         assertEquals(saslSslLowerCase, 
consumerConfig.originals().get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
     }
+
+    @Test
+    public void testDefaultConsumerGroupConfig() {
+        final Map<String, Object> configs = new HashMap<>();
+        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
keyDeserializerClass);
+        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
valueDeserializerClass);
+        final ConsumerConfig consumerConfig = new ConsumerConfig(configs);
+        assertEquals("generic", 
consumerConfig.getString(ConsumerConfig.GROUP_PROTOCOL_CONFIG));
+        assertEquals(null, 
consumerConfig.getString(ConsumerConfig.REMOTE_ASSIGNOR_CONFIG));
+    }
+
+    @Test
+    public void testValidConsumerGroupConfig() {
+        String remoteAssignorName = 
"org.apache.kafka.clients.group.someAssignor";
+        final Map<String, Object> configs = new HashMap<>();
+        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
keyDeserializerClass);
+        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
valueDeserializerClass);
+        configs.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, "consumer");
+        configs.put(ConsumerConfig.REMOTE_ASSIGNOR_CONFIG, 
"org.apache.kafka.clients.group.someAssignor");

Review Comment:
   nit: I suppose that we could reuse `remoteAssignorName` here?



##########
clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerConfigTest.java:
##########
@@ -158,4 +158,27 @@ public void testCaseInsensitiveSecurityProtocol() {
         final ConsumerConfig consumerConfig = new ConsumerConfig(configs);
         assertEquals(saslSslLowerCase, 
consumerConfig.originals().get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
     }
+
+    @Test
+    public void testDefaultConsumerGroupConfig() {
+        final Map<String, Object> configs = new HashMap<>();
+        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
keyDeserializerClass);
+        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
valueDeserializerClass);
+        final ConsumerConfig consumerConfig = new ConsumerConfig(configs);
+        assertEquals("generic", 
consumerConfig.getString(ConsumerConfig.GROUP_PROTOCOL_CONFIG));
+        assertEquals(null, 
consumerConfig.getString(ConsumerConfig.REMOTE_ASSIGNOR_CONFIG));
+    }
+
+    @Test
+    public void testValidConsumerGroupConfig() {
+        String remoteAssignorName = 
"org.apache.kafka.clients.group.someAssignor";
+        final Map<String, Object> configs = new HashMap<>();
+        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
keyDeserializerClass);
+        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
valueDeserializerClass);
+        configs.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, "consumer");
+        configs.put(ConsumerConfig.REMOTE_ASSIGNOR_CONFIG, 
"org.apache.kafka.clients.group.someAssignor");
+        final ConsumerConfig consumerConfig = new ConsumerConfig(configs);
+        assertEquals("consumer", 
consumerConfig.getString(ConsumerConfig.GROUP_PROTOCOL_CONFIG));
+        assertEquals(remoteAssignorName, 
consumerConfig.getString(ConsumerConfig.REMOTE_ASSIGNOR_CONFIG));
+    }

Review Comment:
   Should we add a test to ensure that `GROUP_PROTOCOL_CONFIG` accepts only
`consumer` and `generic`, and rejects any other value?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to