This is an automated email from the ASF dual-hosted git repository.

lianetm pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 7a64623e408 Set protocol for streams tests (#18160)
7a64623e408 is described below

commit 7a64623e4081583031707be715756b668c9ea7cd
Author: Lianet Magrans <[email protected]>
AuthorDate: Thu Dec 12 13:33:43 2024 -0500

    Set protocol for streams tests (#18160)
    
    Reviewers: Bill Bejeck <[email protected]>
---
 .../kafka/streams/integration/utils/EmbeddedKafkaCluster.java      | 7 ++++++-
 .../java/org/apache/kafka/tools/AbstractResetIntegrationTest.java  | 3 +++
 2 files changed, 9 insertions(+), 1 deletion(-)

diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
index dfdbc567d0b..aad73d2d89a 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
@@ -25,6 +25,7 @@ import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.GroupProtocol;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
@@ -71,6 +72,7 @@ import static 
org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET
 import static 
org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
 import static 
org.apache.kafka.clients.consumer.ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG;
 import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_PROTOCOL_CONFIG;
 import static 
org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
 import static 
org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
 import static org.apache.kafka.common.utils.Utils.mkProperties;
@@ -170,7 +172,10 @@ public class EmbeddedKafkaCluster {
     public void verifyClusterReadiness() {
         final UUID uuid = UUID.randomUUID();
         final String consumerGroupId = "group-warmup-" + uuid;
-        final Map<String, Object> consumerConfig = 
Collections.singletonMap(GROUP_ID_CONFIG, consumerGroupId);
+        final Map<String, Object> consumerConfig = Map.of(
+            GROUP_ID_CONFIG, consumerGroupId,
+            GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name()
+        );
         final String topic = "topic-warmup-" + uuid;
 
         createTopic(topic);
diff --git 
a/tools/src/test/java/org/apache/kafka/tools/AbstractResetIntegrationTest.java 
b/tools/src/test/java/org/apache/kafka/tools/AbstractResetIntegrationTest.java
index 9d0f030b5b1..b0b99d05638 100644
--- 
a/tools/src/test/java/org/apache/kafka/tools/AbstractResetIntegrationTest.java
+++ 
b/tools/src/test/java/org/apache/kafka/tools/AbstractResetIntegrationTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.tools;
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.GroupProtocol;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.config.SslConfigs;
 import org.apache.kafka.common.config.types.Password;
@@ -127,6 +128,7 @@ public abstract class AbstractResetIntegrationTest {
         resultConsumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
"earliest");
         resultConsumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
LongDeserializer.class);
         
resultConsumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
LongDeserializer.class);
+        resultConsumerConfig.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, 
GroupProtocol.CLASSIC.name());
         resultConsumerConfig.putAll(commonClientConfig);
 
         streamsConfig = new Properties();
@@ -406,6 +408,7 @@ public abstract class AbstractResetIntegrationTest {
         final Properties cleanUpConfig = new Properties();
         cleanUpConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
         cleanUpConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 
Integer.toString(CLEANUP_CONSUMER_TIMEOUT));
+        cleanUpConfig.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, 
GroupProtocol.CLASSIC.name());
 
         return new StreamsResetter().execute(parameters, cleanUpConfig) == 0;
     }

Reply via email to