This is an automated email from the ASF dual-hosted git repository.

lianetm pushed a commit to branch 3.7
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.7 by this push:
     new c7c3e609c00 Back-port KAFKA-16230 to 3.7 branch (#16951)
c7c3e609c00 is described below

commit c7c3e609c009b05c29a1032a21103411c5ea1e10
Author: Kirk True <[email protected]>
AuthorDate: Tue Sep 3 12:34:35 2024 -0700

    Back-port KAFKA-16230 to 3.7 branch (#16951)
    
    Reviewers: Andrew Schofield <[email protected]>, Lianet Magrans 
<[email protected]>
---
 tests/kafkatest/services/verifiable_consumer.py    | 25 ++++++++++++---
 .../org/apache/kafka/tools/VerifiableConsumer.java | 37 ++++++++++++++++++++--
 2 files changed, 56 insertions(+), 6 deletions(-)

diff --git a/tests/kafkatest/services/verifiable_consumer.py 
b/tests/kafkatest/services/verifiable_consumer.py
index 93d9446fb9b..e1155c16aae 100644
--- a/tests/kafkatest/services/verifiable_consumer.py
+++ b/tests/kafkatest/services/verifiable_consumer.py
@@ -21,7 +21,7 @@ from ducktape.services.background_thread import 
BackgroundThreadService
 from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
 from kafkatest.services.kafka import TopicPartition
 from kafkatest.services.verifiable_client import VerifiableClientMixin
-from kafkatest.version import DEV_BRANCH, V_2_3_0, V_2_3_1, V_0_10_0_0
+from kafkatest.version import DEV_BRANCH, V_2_3_0, V_2_3_1, V_3_7_0, V_0_10_0_0
 
 
 class ConsumerState:
@@ -167,7 +167,7 @@ class VerifiableConsumer(KafkaPathResolverMixin, 
VerifiableClientMixin, Backgrou
 
     def __init__(self, context, num_nodes, kafka, topic, group_id,
                  static_membership=False, max_messages=-1, 
session_timeout_sec=30, enable_autocommit=False,
-                 assignment_strategy=None,
+                 assignment_strategy=None, group_protocol=None, 
group_remote_assignor=None,
                  version=DEV_BRANCH, stop_timeout_sec=30, log_level="INFO", 
jaas_override_variables=None,
                  on_record_consumed=None, reset_policy="earliest", 
verify_offsets=True):
         """
@@ -184,6 +184,8 @@ class VerifiableConsumer(KafkaPathResolverMixin, 
VerifiableClientMixin, Backgrou
         self.session_timeout_sec = session_timeout_sec
         self.enable_autocommit = enable_autocommit
         self.assignment_strategy = assignment_strategy
+        self.group_protocol = group_protocol
+        self.group_remote_assignor = group_remote_assignor
         self.prop_file = ""
         self.stop_timeout_sec = stop_timeout_sec
         self.on_record_consumed = on_record_consumed
@@ -306,8 +308,20 @@ class VerifiableConsumer(KafkaPathResolverMixin, 
VerifiableClientMixin, Backgrou
             # if `None` is passed as the argument value
             cmd += " --group-instance-id None"
 
-        if self.assignment_strategy:
-            cmd += " --assignment-strategy %s" % self.assignment_strategy
+        # 3.7.0 includes support for KIP-848 which introduced a new 
implementation of the consumer group protocol.
+        # The two implementations use slightly different configuration, hence 
these arguments are conditional.
+        #
+        # See the Java class/method VerifiableConsumer.createFromArgs() for 
how the command line arguments are
+        # parsed and used as configuration in the runner.
+        if node.version >= V_3_7_0 and 
self.is_consumer_group_protocol_enabled():
+            cmd += " --group-protocol %s" % self.group_protocol
+
+            if self.group_remote_assignor:
+                cmd += " --group-remote-assignor %s" % 
self.group_remote_assignor
+        else:
+            # Either we're an older consumer version or we're using the old 
consumer group protocol.
+            if self.assignment_strategy:
+                cmd += " --assignment-strategy %s" % self.assignment_strategy
 
         if self.enable_autocommit:
             cmd += " --enable-autocommit "
@@ -416,3 +430,6 @@ class VerifiableConsumer(KafkaPathResolverMixin, 
VerifiableClientMixin, Backgrou
         with self.lock:
             return [handler.node for handler in self.event_handlers.values()
                     if handler.state != ConsumerState.Dead]
+
+    def is_consumer_group_protocol_enabled(self):
+        return self.group_protocol and self.group_protocol.upper() == 
"CONSUMER"
diff --git a/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java 
b/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java
index e930580d3da..cf36294d66d 100644
--- a/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java
+++ b/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java
@@ -34,6 +34,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.GroupProtocol;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.OffsetCommitCallback;
@@ -529,6 +530,24 @@ public class VerifiableConsumer implements Closeable, 
OffsetCommitCallback, Cons
                 .metavar("TOPIC")
                 .help("Consumes messages from this topic.");
 
+        parser.addArgument("--group-protocol")
+                .action(store())
+                .required(false)
+                .type(String.class)
+                .setDefault(ConsumerConfig.DEFAULT_GROUP_PROTOCOL)
+                .metavar("GROUP_PROTOCOL")
+                .dest("groupProtocol")
+                .help(String.format("Group protocol (must be one of %s)", 
Utils.join(GroupProtocol.values(), ", ")));
+
+        parser.addArgument("--group-remote-assignor")
+                .action(store())
+                .required(false)
+                .type(String.class)
+                .setDefault(ConsumerConfig.DEFAULT_GROUP_REMOTE_ASSIGNOR)
+                .metavar("GROUP_REMOTE_ASSIGNOR")
+                .dest("groupRemoteAssignor")
+                .help(String.format("Group remote assignor; only used if the 
group protocol is %s", GroupProtocol.CONSUMER.name()));
+
         parser.addArgument("--group-id")
                 .action(store())
                 .required(true)
@@ -590,7 +609,7 @@ public class VerifiableConsumer implements Closeable, 
OffsetCommitCallback, Cons
                 .setDefault(RangeAssignor.class.getName())
                 .type(String.class)
                 .dest("assignmentStrategy")
-                .help("Set assignment strategy (e.g. " + 
RoundRobinAssignor.class.getName() + ")");
+                .help(String.format("Set assignment strategy (e.g. %s); only 
used if the group protocol is %s", RoundRobinAssignor.class.getName(), 
GroupProtocol.CLASSIC.name()));
 
         parser.addArgument("--consumer.config")
                 .action(store())
@@ -618,6 +637,21 @@ public class VerifiableConsumer implements Closeable, 
OffsetCommitCallback, Cons
             }
         }
 
+        String groupProtocol = res.getString("groupProtocol");
+
+        // 3.7.0 includes support for KIP-848 which introduced a new 
implementation of the consumer group protocol.
+        // The two implementations use slightly different configuration, hence 
these arguments are conditional.
+        //
+        // See the Python class/method VerifiableConsumer.start_cmd() in 
verifiable_consumer.py for how the
+        // command line arguments are passed in by the system test framework.
+        if (groupProtocol.equalsIgnoreCase(GroupProtocol.CONSUMER.name())) {
+            consumerProps.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, 
groupProtocol);
+            consumerProps.put(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG, 
res.getString("groupRemoteAssignor"));
+        } else {
+            // This means we're using the old consumer group protocol.
+            
consumerProps.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, 
res.getString("assignmentStrategy"));
+        }
+
         consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, 
res.getString("groupId"));
 
         String groupInstanceId = res.getString("groupInstanceId");
@@ -640,7 +674,6 @@ public class VerifiableConsumer implements Closeable, 
OffsetCommitCallback, Cons
         consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, 
useAutoCommit);
         consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
res.getString("resetPolicy"));
         consumerProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 
Integer.toString(res.getInt("sessionTimeout")));
-        consumerProps.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, 
res.getString("assignmentStrategy"));
 
         StringDeserializer deserializer = new StringDeserializer();
         KafkaConsumer<String, String> consumer = new 
KafkaConsumer<>(consumerProps, deserializer, deserializer);

Reply via email to