This is an automated email from the ASF dual-hosted git repository.
lianetm pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 7a64623e408 Set protocol for streams tests (#18160)
7a64623e408 is described below
commit 7a64623e4081583031707be715756b668c9ea7cd
Author: Lianet Magrans <[email protected]>
AuthorDate: Thu Dec 12 13:33:43 2024 -0500
Set protocol for streams tests (#18160)
Reviewers: Bill Bejeck <[email protected]>
---
.../kafka/streams/integration/utils/EmbeddedKafkaCluster.java | 7 ++++++-
.../java/org/apache/kafka/tools/AbstractResetIntegrationTest.java | 3 +++
2 files changed, 9 insertions(+), 1 deletion(-)
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
index dfdbc567d0b..aad73d2d89a 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
@@ -25,6 +25,7 @@ import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.GroupProtocol;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
@@ -71,6 +72,7 @@ import static
org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET
import static
org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
import static
org.apache.kafka.clients.consumer.ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG;
+import static
org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_PROTOCOL_CONFIG;
import static
org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
import static
org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
import static org.apache.kafka.common.utils.Utils.mkProperties;
@@ -170,7 +172,10 @@ public class EmbeddedKafkaCluster {
public void verifyClusterReadiness() {
final UUID uuid = UUID.randomUUID();
final String consumerGroupId = "group-warmup-" + uuid;
- final Map<String, Object> consumerConfig =
Collections.singletonMap(GROUP_ID_CONFIG, consumerGroupId);
+ final Map<String, Object> consumerConfig = Map.of(
+ GROUP_ID_CONFIG, consumerGroupId,
+ GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name()
+ );
final String topic = "topic-warmup-" + uuid;
createTopic(topic);
diff --git
a/tools/src/test/java/org/apache/kafka/tools/AbstractResetIntegrationTest.java
b/tools/src/test/java/org/apache/kafka/tools/AbstractResetIntegrationTest.java
index 9d0f030b5b1..b0b99d05638 100644
---
a/tools/src/test/java/org/apache/kafka/tools/AbstractResetIntegrationTest.java
+++
b/tools/src/test/java/org/apache/kafka/tools/AbstractResetIntegrationTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.tools;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.GroupProtocol;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.config.types.Password;
@@ -127,6 +128,7 @@ public abstract class AbstractResetIntegrationTest {
resultConsumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
"earliest");
resultConsumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
LongDeserializer.class);
resultConsumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
LongDeserializer.class);
+ resultConsumerConfig.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG,
GroupProtocol.CLASSIC.name());
resultConsumerConfig.putAll(commonClientConfig);
streamsConfig = new Properties();
@@ -406,6 +408,7 @@ public abstract class AbstractResetIntegrationTest {
final Properties cleanUpConfig = new Properties();
cleanUpConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
cleanUpConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,
Integer.toString(CLEANUP_CONSUMER_TIMEOUT));
+ cleanUpConfig.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG,
GroupProtocol.CLASSIC.name);
return new StreamsResetter().execute(parameters, cleanUpConfig) == 0;
}