This is an automated email from the ASF dual-hosted git repository.

lianetm pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 84bc0c26ee0 KAFKA-18224: Explicit group protocol setting in streams resetter (#18172)
84bc0c26ee0 is described below

commit 84bc0c26ee096a44730bbcae9d996ad6987541c2
Author: Lianet Magrans <[email protected]>
AuthorDate: Fri Dec 13 14:31:50 2024 -0500

    KAFKA-18224: Explicit group protocol setting in streams resetter (#18172)
    
    Reviewers: Matthias J. Sax <[email protected]>
---
 .../integration/utils/IntegrationTestUtils.java    |  5 +-
 .../org/apache/kafka/tools/StreamsResetter.java    |  9 ++++
 .../kafka/tools/AbstractResetIntegrationTest.java  |  1 -
 .../apache/kafka/tools/ResetIntegrationTest.java   | 54 ++++++++++++++++++++++
 4 files changed, 67 insertions(+), 2 deletions(-)

diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
index 6fbdfeaf79b..a7b3f838f2a 100644
--- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
+++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
@@ -22,6 +22,7 @@ import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.GroupProtocol;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
@@ -1189,7 +1190,8 @@ public class IntegrationTestUtils {
     /**
      * Sets up a {@link KafkaConsumer} from a copy of the given configuration 
that has
      * {@link ConsumerConfig#AUTO_OFFSET_RESET_CONFIG} set to "earliest" and 
{@link ConsumerConfig#ENABLE_AUTO_COMMIT_CONFIG}
-     * set to "true" to prevent missing events as well as repeat consumption.
+     * set to "true" to prevent missing events as well as repeat consumption. 
This also sets
+     * {@link ConsumerConfig#GROUP_PROTOCOL_CONFIG} to "classic".
      * @param consumerConfig Consumer configuration
      * @return Consumer
      */
@@ -1198,6 +1200,7 @@ public class IntegrationTestUtils {
         filtered.putAll(consumerConfig);
         filtered.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
"earliest");
         filtered.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
+        filtered.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, 
GroupProtocol.CLASSIC.name);
         return new KafkaConsumer<>(filtered);
     }
 
diff --git a/tools/src/main/java/org/apache/kafka/tools/StreamsResetter.java b/tools/src/main/java/org/apache/kafka/tools/StreamsResetter.java
index a100110517e..4a115f5b641 100644
--- a/tools/src/main/java/org/apache/kafka/tools/StreamsResetter.java
+++ b/tools/src/main/java/org/apache/kafka/tools/StreamsResetter.java
@@ -24,6 +24,7 @@ import org.apache.kafka.clients.admin.MemberDescription;
 import org.apache.kafka.clients.admin.RemoveMembersFromConsumerGroupOptions;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.GroupProtocol;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
 import org.apache.kafka.common.KafkaFuture;
@@ -60,6 +61,8 @@ import joptsimple.OptionException;
 import joptsimple.OptionSpec;
 import joptsimple.OptionSpecBuilder;
 
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_PROTOCOL_CONFIG;
+
 
 /**
  * {@link StreamsResetter} resets the processing state of a Kafka Streams 
application so that, for example,
@@ -152,6 +155,12 @@ public class StreamsResetter {
                 }
 
                 final HashMap<Object, Object> consumerConfig = new 
HashMap<>(config);
+                if (consumerConfig.containsKey(GROUP_PROTOCOL_CONFIG) &&
+                    
!consumerConfig.get(GROUP_PROTOCOL_CONFIG).toString().equalsIgnoreCase(GroupProtocol.CLASSIC.name())
+                ) {
+                    System.out.println("WARNING: provided group protocol will 
be ignored. Using supported " + GroupProtocol.CLASSIC.name() + " protocol 
instead");
+                }
+                consumerConfig.put(GROUP_PROTOCOL_CONFIG, 
GroupProtocol.CLASSIC.name());
                 consumerConfig.putAll(properties);
                 int exitCode = 
maybeResetInputAndSeekToEndIntermediateTopicOffsets(consumerConfig, options);
                 exitCode |= maybeDeleteInternalTopics(adminClient, options);
diff --git a/tools/src/test/java/org/apache/kafka/tools/AbstractResetIntegrationTest.java b/tools/src/test/java/org/apache/kafka/tools/AbstractResetIntegrationTest.java
index b0b99d05638..dbfba1ad241 100644
--- a/tools/src/test/java/org/apache/kafka/tools/AbstractResetIntegrationTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/AbstractResetIntegrationTest.java
@@ -408,7 +408,6 @@ public abstract class AbstractResetIntegrationTest {
         final Properties cleanUpConfig = new Properties();
         cleanUpConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
         cleanUpConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 
Integer.toString(CLEANUP_CONSUMER_TIMEOUT));
-        cleanUpConfig.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, 
GroupProtocol.CLASSIC.name);
 
         return new StreamsResetter().execute(parameters, cleanUpConfig) == 0;
     }
diff --git a/tools/src/test/java/org/apache/kafka/tools/ResetIntegrationTest.java b/tools/src/test/java/org/apache/kafka/tools/ResetIntegrationTest.java
index fc2e8ffcfdf..b8b9f312e8c 100644
--- a/tools/src/test/java/org/apache/kafka/tools/ResetIntegrationTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/ResetIntegrationTest.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.tools;
 
 import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.GroupProtocol;
 import org.apache.kafka.network.SocketServerConfigs;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
@@ -136,6 +137,59 @@ public class ResetIntegrationTest extends AbstractResetIntegrationTest {
         assertEquals(1, exitCode);
     }
 
+    @Test
+    public void shouldDefaultToClassicGroupProtocol(final TestInfo testInfo) {
+        final String appID = safeUniqueTestName(testInfo);
+        final String[] parameters = new String[] {
+            "--application-id", appID,
+            "--bootstrap-server", cluster.bootstrapServers(),
+            "--input-topics", INPUT_TOPIC
+        };
+        final Properties cleanUpConfig = new Properties();
+
+        // Set properties that are only allowed under the CLASSIC group 
protocol.
+        cleanUpConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
+        cleanUpConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 
Integer.toString(CLEANUP_CONSUMER_TIMEOUT));
+        final int exitCode = new StreamsResetter().execute(parameters, 
cleanUpConfig);
+        assertEquals(0, exitCode, "Resetter should use the CLASSIC group 
protocol");
+    }
+
+    @Test
+    public void shouldAllowGroupProtocolClassic(final TestInfo testInfo) {
+        final String appID = safeUniqueTestName(testInfo);
+        final String[] parameters = new String[] {
+            "--application-id", appID,
+            "--bootstrap-server", cluster.bootstrapServers(),
+            "--input-topics", INPUT_TOPIC
+        };
+        final Properties cleanUpConfig = new Properties();
+
+        // Protocol config CLASSIC not needed but allowed.
+        cleanUpConfig.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, 
GroupProtocol.CLASSIC.name());
+        cleanUpConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
+        cleanUpConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 
Integer.toString(CLEANUP_CONSUMER_TIMEOUT));
+        int exitCode = new StreamsResetter().execute(parameters, 
cleanUpConfig);
+        assertEquals(0, exitCode, "Resetter should allow setting group 
protocol to CLASSIC");
+    }
+
+    @Test
+    public void shouldOverwriteGroupProtocolOtherThanClassic(final TestInfo 
testInfo) {
+        final String appID = safeUniqueTestName(testInfo);
+        final String[] parameters = new String[] {
+            "--application-id", appID,
+            "--bootstrap-server", cluster.bootstrapServers(),
+            "--input-topics", INPUT_TOPIC
+        };
+        final Properties cleanUpConfig = new Properties();
+
+        // Protocol config other than CLASSIC allowed but overwritten to 
CLASSIC.
+        cleanUpConfig.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, 
GroupProtocol.CONSUMER.name());
+        cleanUpConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
+        cleanUpConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 
Integer.toString(CLEANUP_CONSUMER_TIMEOUT));
+        int exitCode = new StreamsResetter().execute(parameters, 
cleanUpConfig);
+        assertEquals(0, exitCode, "Resetter should overwrite the group 
protocol to CLASSIC");
+    }
+
     @Test
     public void shouldNotAllowToResetWhenIntermediateTopicAbsent(final 
TestInfo testInfo) {
         final String appID = safeUniqueTestName(testInfo);

Reply via email to