This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d2ad418cfd0 KAFKA-18156 VerifiableConsumer should ignore 
"--session-timeout" when using CONSUMER protocol (#18036)
d2ad418cfd0 is described below

commit d2ad418cfd05278e7fbff9a6db7088c68050e1f9
Author: Kuan-Po Tseng <[email protected]>
AuthorDate: Wed Dec 11 21:12:46 2024 +0800

    KAFKA-18156 VerifiableConsumer should ignore "--session-timeout" when using 
CONSUMER protocol (#18036)
    
    Reviewers: TaiJuWu <[email protected]>, Chia-Ping Tsai <[email protected]>
---
 .../apache/kafka/clients/CommonClientConfigs.java   |  3 ++-
 tests/kafkatest/services/verifiable_client.py       |  2 +-
 tests/kafkatest/services/verifiable_consumer.py     | 14 +++++++-------
 .../client/consumer_protocol_migration_test.py      |  2 +-
 tests/kafkatest/tests/client/consumer_test.py       | 21 +++++----------------
 tests/kafkatest/tests/client/pluggable_test.py      |  2 +-
 tests/kafkatest/tests/verifiable_consumer_test.py   | 11 +++++------
 .../org/apache/kafka/tools/VerifiableConsumer.java  | 11 +++++++----
 8 files changed, 29 insertions(+), 37 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java 
b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
index de01396c8c7..aa3b5c9d628 100644
--- a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
+++ b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
@@ -206,7 +206,8 @@ public class CommonClientConfigs {
                                                         + "to the broker. If 
no heartbeats are received by the broker before the expiration of this session 
timeout, "
                                                         + "then the broker 
will remove this client from the group and initiate a rebalance. Note that the 
value "
                                                         + "must be in the 
allowable range as configured in the broker configuration by 
<code>group.min.session.timeout.ms</code> "
-                                                        + "and 
<code>group.max.session.timeout.ms</code>.";
+                                                        + "and 
<code>group.max.session.timeout.ms</code>. Note that this configuration is not 
supported when <code>group.protocol</code> "
+                                                        + "is set to 
\"consumer\".";
 
     public static final String HEARTBEAT_INTERVAL_MS_CONFIG = 
"heartbeat.interval.ms";
     public static final String HEARTBEAT_INTERVAL_MS_DOC = "The expected time 
between heartbeats to the consumer "
diff --git a/tests/kafkatest/services/verifiable_client.py 
b/tests/kafkatest/services/verifiable_client.py
index 4971136a64e..4a3ea5e17da 100644
--- a/tests/kafkatest/services/verifiable_client.py
+++ b/tests/kafkatest/services/verifiable_client.py
@@ -70,7 +70,7 @@ Command line arguments:
  * `--group-id <group-id>`
  * `--topic <topic>`
  * `--broker-list <brokers>`
- * `--session-timeout <n>`
+ * `--session-timeout <n>` - note that this configuration is not supported 
when group protocol is consumer
  * `--enable-autocommit`
  * `--max-messages <n>`
  * `--assignment-strategy <s>`
diff --git a/tests/kafkatest/services/verifiable_consumer.py 
b/tests/kafkatest/services/verifiable_consumer.py
index 7e81ca1f7ce..1dc1e4bd4ec 100644
--- a/tests/kafkatest/services/verifiable_consumer.py
+++ b/tests/kafkatest/services/verifiable_consumer.py
@@ -231,7 +231,7 @@ class VerifiableConsumer(KafkaPathResolverMixin, 
VerifiableClientMixin, Backgrou
         }
 
     def __init__(self, context, num_nodes, kafka, topic, group_id,
-                 static_membership=False, max_messages=-1, 
session_timeout_sec=30, enable_autocommit=False,
+                 static_membership=False, max_messages=-1, 
session_timeout_sec=0, enable_autocommit=False,
                  assignment_strategy=None, group_protocol=None, 
group_remote_assignor=None,
                  version=DEV_BRANCH, stop_timeout_sec=30, log_level="INFO", 
jaas_override_variables=None,
                  on_record_consumed=None, reset_policy="earliest", 
verify_offsets=True):
@@ -251,8 +251,6 @@ class VerifiableConsumer(KafkaPathResolverMixin, 
VerifiableClientMixin, Backgrou
         self.session_timeout_sec = session_timeout_sec
         self.enable_autocommit = enable_autocommit
         self.assignment_strategy = assignment_strategy
-        self.group_protocol = group_protocol
-        self.group_remote_assignor = group_remote_assignor
         self.prop_file = ""
         self.stop_timeout_sec = stop_timeout_sec
         self.on_record_consumed = on_record_consumed
@@ -417,10 +415,12 @@ class VerifiableConsumer(KafkaPathResolverMixin, 
VerifiableClientMixin, Backgrou
         else:
             cmd += " --bootstrap-server %s" % 
self.kafka.bootstrap_servers(self.security_config.security_protocol)
 
-        cmd += " --reset-policy %s --group-id %s --topic %s --session-timeout 
%s" % \
-               (self.reset_policy, self.group_id, self.topic,
-                self.session_timeout_sec*1000)
-               
+        cmd += " --reset-policy %s --group-id %s --topic %s" % \
+                (self.reset_policy, self.group_id, self.topic)
+
+        if self.session_timeout_sec > 0:
+            cmd += " --session-timeout %s" % self.session_timeout_sec*1000
+
         if self.max_messages > 0:
             cmd += " --max-messages %s" % str(self.max_messages)
 
diff --git a/tests/kafkatest/tests/client/consumer_protocol_migration_test.py 
b/tests/kafkatest/tests/client/consumer_protocol_migration_test.py
index 07f501fe0c6..a03228b617a 100644
--- a/tests/kafkatest/tests/client/consumer_protocol_migration_test.py
+++ b/tests/kafkatest/tests/client/consumer_protocol_migration_test.py
@@ -77,7 +77,7 @@ class ConsumerProtocolMigrationTest(VerifiableConsumerTest):
             consumer.stop_node(node, clean_shutdown)
 
             wait_until(lambda: len(consumer.dead_nodes()) == 1,
-                       timeout_sec=self.session_timeout_sec+5,
+                       timeout_sec=60,
                        err_msg="Timed out waiting for the consumer to 
shutdown")
 
             consumer.start_node(node)
diff --git a/tests/kafkatest/tests/client/consumer_test.py 
b/tests/kafkatest/tests/client/consumer_test.py
index 4bd680dd2a0..a96d90bded8 100644
--- a/tests/kafkatest/tests/client/consumer_test.py
+++ b/tests/kafkatest/tests/client/consumer_test.py
@@ -39,7 +39,7 @@ class OffsetValidationTest(VerifiableConsumerTest):
                 consumer.stop_node(node, clean_shutdown)
 
                 wait_until(lambda: len(consumer.dead_nodes()) == 1,
-                           timeout_sec=self.session_timeout_sec+5,
+                           timeout_sec=60,
                            err_msg="Timed out waiting for the consumer to 
shutdown")
 
                 consumer.start_node(node)
@@ -101,14 +101,6 @@ class OffsetValidationTest(VerifiableConsumerTest):
         partition = TopicPartition(self.TOPIC, 0)
 
         producer = self.setup_producer(self.TOPIC)
-        # The consumers' session timeouts must exceed the time it takes for a 
broker to roll.  Consumers are likely
-        # to see cluster metadata consisting of just a single alive broker in 
the case where the cluster has just 2
-        # brokers and the cluster is rolling (which is what is happening 
here).  When the consumer sees a single alive
-        # broker, and then that broker rolls, the consumer will be unable to 
connect to the cluster until that broker
-        # completes its roll.  In the meantime, the consumer group will move 
to the group coordinator on the other
-        # broker, and that coordinator will fail the consumer and trigger a 
group rebalance if its session times out.
-        # This test is asserting that no rebalances occur, so we increase the 
session timeout for this to be the case.
-        self.session_timeout_sec = 30
         consumer = self.setup_consumer(self.TOPIC, 
group_protocol=group_protocol)
 
         producer.start()
@@ -229,7 +221,6 @@ class OffsetValidationTest(VerifiableConsumerTest):
         producer.start()
         self.await_produced_messages(producer)
 
-        self.session_timeout_sec = 60
         consumer = self.setup_consumer(self.TOPIC, 
static_membership=static_membership, group_protocol=group_protocol, 
                                        
assignment_strategy="org.apache.kafka.clients.consumer.RangeAssignor")
 
@@ -295,7 +286,6 @@ class OffsetValidationTest(VerifiableConsumerTest):
         producer = self.setup_producer(self.TOPIC)
         producer.start()
         self.await_produced_messages(producer)
-        self.session_timeout_sec = 60
         consumer = self.setup_consumer(self.TOPIC, static_membership=True, 
group_protocol=group_protocol)
         consumer.start()
         self.await_all_members(consumer)
@@ -340,7 +330,6 @@ class OffsetValidationTest(VerifiableConsumerTest):
         producer.start()
         self.await_produced_messages(producer)
 
-        self.session_timeout_sec = 60
         consumer = self.setup_consumer(self.TOPIC, static_membership=True, 
group_protocol=group_protocol)
 
         self.num_consumers = num_conflict_consumers
@@ -372,7 +361,7 @@ class OffsetValidationTest(VerifiableConsumerTest):
                 # Stop existing nodes, so conflicting ones should be able to 
join.
                 consumer.stop_all()
                 wait_until(lambda: len(consumer.dead_nodes()) == 
len(consumer.nodes),
-                           timeout_sec=self.session_timeout_sec+5,
+                           timeout_sec=60,
                            err_msg="Timed out waiting for the consumer to 
shutdown")
                 conflict_consumer.start()
                 self.await_members(conflict_consumer, num_conflict_consumers)
@@ -383,13 +372,13 @@ class OffsetValidationTest(VerifiableConsumerTest):
             conflict_consumer.start()
 
             wait_until(lambda: len(consumer.joined_nodes()) + 
len(conflict_consumer.joined_nodes()) == len(consumer.nodes),
-                       timeout_sec=self.session_timeout_sec*2,
+                       timeout_sec=60,
                        err_msg="Timed out waiting for consumers to join, 
expected total %d joined, but only see %d joined from "
                                "normal consumer group and %d from conflict 
consumer group" % \
                                (len(consumer.nodes), 
len(consumer.joined_nodes()), len(conflict_consumer.joined_nodes()))
                        )
             wait_until(lambda: len(consumer.dead_nodes()) + 
len(conflict_consumer.dead_nodes()) == len(conflict_consumer.nodes),
-                       timeout_sec=self.session_timeout_sec*2,
+                       timeout_sec=60,
                        err_msg="Timed out waiting for fenced consumers to die, 
expected total %d dead, but only see %d dead in "
                                "normal consumer group and %d dead in conflict 
consumer group" % \
                                (len(conflict_consumer.nodes), 
len(consumer.dead_nodes()), len(conflict_consumer.dead_nodes()))
@@ -427,7 +416,7 @@ class OffsetValidationTest(VerifiableConsumerTest):
         # stop the partition owner and await its shutdown
         consumer.kill_node(partition_owner, clean_shutdown=clean_shutdown)
         wait_until(lambda: len(consumer.joined_nodes()) == (self.num_consumers 
- 1) and consumer.owner(partition) is not None,
-                   timeout_sec=self.session_timeout_sec*2+5,
+                   timeout_sec=60,
                    err_msg="Timed out waiting for consumer to close")
 
         # ensure that the remaining consumer does some work after rebalancing
diff --git a/tests/kafkatest/tests/client/pluggable_test.py 
b/tests/kafkatest/tests/client/pluggable_test.py
index b2f726e0163..8f74ec1c8a5 100644
--- a/tests/kafkatest/tests/client/pluggable_test.py
+++ b/tests/kafkatest/tests/client/pluggable_test.py
@@ -52,5 +52,5 @@ class PluggableConsumerTest(VerifiableConsumerTest):
 
         self.logger.debug("Waiting for %d nodes to stop" % len(consumer.nodes))
         wait_until(lambda: len(consumer.dead_nodes()) == len(consumer.nodes),
-                   timeout_sec=self.session_timeout_sec+5,
+                   timeout_sec=60,
                    err_msg="Timed out waiting for consumers to shutdown")
diff --git a/tests/kafkatest/tests/verifiable_consumer_test.py 
b/tests/kafkatest/tests/verifiable_consumer_test.py
index 08da754732d..5353a6e82af 100644
--- a/tests/kafkatest/tests/verifiable_consumer_test.py
+++ b/tests/kafkatest/tests/verifiable_consumer_test.py
@@ -24,13 +24,12 @@ class VerifiableConsumerTest(KafkaTest):
     PRODUCER_REQUEST_TIMEOUT_SEC = 30
 
     def __init__(self, test_context, num_consumers=1, num_producers=0,
-                 group_id="test_group_id", session_timeout_sec=10, **kwargs):
+                 group_id="test_group_id", **kwargs):
         super(VerifiableConsumerTest, self).__init__(test_context, **kwargs)
         self.num_consumers = num_consumers
         self.num_producers = num_producers
         self.group_id = group_id
-        self.session_timeout_sec = session_timeout_sec
-        self.consumption_timeout_sec = max(self.PRODUCER_REQUEST_TIMEOUT_SEC + 
5, 2 * session_timeout_sec)
+        self.consumption_timeout_sec = self.PRODUCER_REQUEST_TIMEOUT_SEC + 5
 
     def _all_partitions(self, topic, num_partitions):
         partitions = set()
@@ -56,7 +55,7 @@ class VerifiableConsumerTest(KafkaTest):
     def setup_consumer(self, topic, static_membership=False, 
enable_autocommit=False,
                        
assignment_strategy="org.apache.kafka.clients.consumer.RangeAssignor", 
group_remote_assignor="range", **kwargs):
         return VerifiableConsumer(self.test_context, self.num_consumers, 
self.kafka,
-                                  topic, self.group_id, 
static_membership=static_membership, 
session_timeout_sec=self.session_timeout_sec,
+                                  topic, self.group_id, 
static_membership=static_membership,
                                   assignment_strategy=assignment_strategy, 
enable_autocommit=enable_autocommit,
                                   group_remote_assignor=group_remote_assignor,
                                   log_level="TRACE", **kwargs)
@@ -81,9 +80,9 @@ class VerifiableConsumerTest(KafkaTest):
     def await_members(self, consumer, num_consumers):
         # Wait until all members have joined the group
         wait_until(lambda: len(consumer.joined_nodes()) == num_consumers,
-                   timeout_sec=self.session_timeout_sec*2,
+                   timeout_sec=60,
                    err_msg="Consumers failed to join in a reasonable amount of 
time")
-        
+
     def await_all_members(self, consumer):
         self.await_members(consumer, self.num_consumers)
 
diff --git a/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java 
b/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java
index 0436e0d85e8..825e5ed2c6e 100644
--- a/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java
+++ b/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java
@@ -574,11 +574,10 @@ public class VerifiableConsumer implements Closeable, 
OffsetCommitCallback, Cons
         parser.addArgument("--session-timeout")
                 .action(store())
                 .required(false)
-                .setDefault(30000)
                 .type(Integer.class)
                 .metavar("TIMEOUT_MS")
                 .dest("sessionTimeout")
-                .help("Set the consumer's session timeout");
+                .help("Set the consumer's session timeout, note that this 
configuration is not supported when group protocol is consumer");
 
         parser.addArgument("--verbose")
                 .action(storeTrue())
@@ -649,10 +648,15 @@ public class VerifiableConsumer implements Closeable, 
OffsetCommitCallback, Cons
             if (groupRemoteAssignor != null)
                 consumerProps.put(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG, 
groupRemoteAssignor);
         } else {
-            // This means we're using the old consumer group protocol.
+            // This means we're using the CLASSIC consumer group protocol.
             
consumerProps.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, 
res.getString("assignmentStrategy"));
         }
 
+        Integer sessionTimeout = res.getInt("sessionTimeout");
+        if (sessionTimeout != null) {
+            consumerProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 
Integer.toString(sessionTimeout));
+        }
+
         consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, 
res.getString("groupId"));
 
         String groupInstanceId = res.getString("groupInstanceId");
@@ -664,7 +668,6 @@ public class VerifiableConsumer implements Closeable, 
OffsetCommitCallback, Cons
 
         consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, 
useAutoCommit);
         consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
res.getString("resetPolicy"));
-        consumerProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 
Integer.toString(res.getInt("sessionTimeout")));
 
         StringDeserializer deserializer = new StringDeserializer();
         KafkaConsumer<String, String> consumer = new 
KafkaConsumer<>(consumerProps, deserializer, deserializer);

Reply via email to