This is an automated email from the ASF dual-hosted git repository.
lianetm pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 84bc0c26ee0 KAFKA-18224: Explicit group protocol setting in streams resetter (#18172)
84bc0c26ee0 is described below
commit 84bc0c26ee096a44730bbcae9d996ad6987541c2
Author: Lianet Magrans <[email protected]>
AuthorDate: Fri Dec 13 14:31:50 2024 -0500
KAFKA-18224: Explicit group protocol setting in streams resetter (#18172)
Reviewers: Matthias J. Sax <[email protected]>
---
.../integration/utils/IntegrationTestUtils.java | 5 +-
.../org/apache/kafka/tools/StreamsResetter.java | 9 ++++
.../kafka/tools/AbstractResetIntegrationTest.java | 1 -
.../apache/kafka/tools/ResetIntegrationTest.java | 54 ++++++++++++++++++++++
4 files changed, 67 insertions(+), 2 deletions(-)
diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
index 6fbdfeaf79b..a7b3f838f2a 100644
--- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
+++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
@@ -22,6 +22,7 @@ import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.GroupProtocol;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
@@ -1189,7 +1190,8 @@ public class IntegrationTestUtils {
/**
* Sets up a {@link KafkaConsumer} from a copy of the given configuration
that has
* {@link ConsumerConfig#AUTO_OFFSET_RESET_CONFIG} set to "earliest" and
{@link ConsumerConfig#ENABLE_AUTO_COMMIT_CONFIG}
- * set to "true" to prevent missing events as well as repeat consumption.
+ * set to "true" to prevent missing events as well as repeat consumption.
This also sets
+ * {@link ConsumerConfig#GROUP_PROTOCOL_CONFIG} to "classic".
* @param consumerConfig Consumer configuration
* @return Consumer
*/
@@ -1198,6 +1200,7 @@ public class IntegrationTestUtils {
filtered.putAll(consumerConfig);
filtered.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
"earliest");
filtered.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
+ filtered.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG,
GroupProtocol.CLASSIC.name);
return new KafkaConsumer<>(filtered);
}
diff --git a/tools/src/main/java/org/apache/kafka/tools/StreamsResetter.java b/tools/src/main/java/org/apache/kafka/tools/StreamsResetter.java
index a100110517e..4a115f5b641 100644
--- a/tools/src/main/java/org/apache/kafka/tools/StreamsResetter.java
+++ b/tools/src/main/java/org/apache/kafka/tools/StreamsResetter.java
@@ -24,6 +24,7 @@ import org.apache.kafka.clients.admin.MemberDescription;
import org.apache.kafka.clients.admin.RemoveMembersFromConsumerGroupOptions;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.GroupProtocol;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.KafkaFuture;
@@ -60,6 +61,8 @@ import joptsimple.OptionException;
import joptsimple.OptionSpec;
import joptsimple.OptionSpecBuilder;
+import static
org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_PROTOCOL_CONFIG;
+
/**
* {@link StreamsResetter} resets the processing state of a Kafka Streams
application so that, for example,
@@ -152,6 +155,12 @@ public class StreamsResetter {
}
final HashMap<Object, Object> consumerConfig = new
HashMap<>(config);
+ if (consumerConfig.containsKey(GROUP_PROTOCOL_CONFIG) &&
+
!consumerConfig.get(GROUP_PROTOCOL_CONFIG).toString().equalsIgnoreCase(GroupProtocol.CLASSIC.name())
+ ) {
+ System.out.println("WARNING: provided group protocol will
be ignored. Using supported " + GroupProtocol.CLASSIC.name() + " protocol
instead");
+ }
+ consumerConfig.put(GROUP_PROTOCOL_CONFIG,
GroupProtocol.CLASSIC.name());
consumerConfig.putAll(properties);
int exitCode =
maybeResetInputAndSeekToEndIntermediateTopicOffsets(consumerConfig, options);
exitCode |= maybeDeleteInternalTopics(adminClient, options);
diff --git a/tools/src/test/java/org/apache/kafka/tools/AbstractResetIntegrationTest.java b/tools/src/test/java/org/apache/kafka/tools/AbstractResetIntegrationTest.java
index b0b99d05638..dbfba1ad241 100644
--- a/tools/src/test/java/org/apache/kafka/tools/AbstractResetIntegrationTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/AbstractResetIntegrationTest.java
@@ -408,7 +408,6 @@ public abstract class AbstractResetIntegrationTest {
final Properties cleanUpConfig = new Properties();
cleanUpConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
cleanUpConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,
Integer.toString(CLEANUP_CONSUMER_TIMEOUT));
- cleanUpConfig.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG,
GroupProtocol.CLASSIC.name);
return new StreamsResetter().execute(parameters, cleanUpConfig) == 0;
}
diff --git a/tools/src/test/java/org/apache/kafka/tools/ResetIntegrationTest.java b/tools/src/test/java/org/apache/kafka/tools/ResetIntegrationTest.java
index fc2e8ffcfdf..b8b9f312e8c 100644
--- a/tools/src/test/java/org/apache/kafka/tools/ResetIntegrationTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/ResetIntegrationTest.java
@@ -17,6 +17,7 @@
package org.apache.kafka.tools;
import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.GroupProtocol;
import org.apache.kafka.network.SocketServerConfigs;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
@@ -136,6 +137,59 @@ public class ResetIntegrationTest extends
AbstractResetIntegrationTest {
assertEquals(1, exitCode);
}
+ @Test
+ public void shouldDefaultToClassicGroupProtocol(final TestInfo testInfo) {
+ final String appID = safeUniqueTestName(testInfo);
+ final String[] parameters = new String[] {
+ "--application-id", appID,
+ "--bootstrap-server", cluster.bootstrapServers(),
+ "--input-topics", INPUT_TOPIC
+ };
+ final Properties cleanUpConfig = new Properties();
+
+ // Set properties that are only allowed under the CLASSIC group
protocol.
+ cleanUpConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
+ cleanUpConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,
Integer.toString(CLEANUP_CONSUMER_TIMEOUT));
+ final int exitCode = new StreamsResetter().execute(parameters,
cleanUpConfig);
+ assertEquals(0, exitCode, "Resetter should use the CLASSIC group
protocol");
+ }
+
+ @Test
+ public void shouldAllowGroupProtocolClassic(final TestInfo testInfo) {
+ final String appID = safeUniqueTestName(testInfo);
+ final String[] parameters = new String[] {
+ "--application-id", appID,
+ "--bootstrap-server", cluster.bootstrapServers(),
+ "--input-topics", INPUT_TOPIC
+ };
+ final Properties cleanUpConfig = new Properties();
+
+ // Protocol config CLASSIC not needed but allowed.
+ cleanUpConfig.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG,
GroupProtocol.CLASSIC.name());
+ cleanUpConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
+ cleanUpConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,
Integer.toString(CLEANUP_CONSUMER_TIMEOUT));
+ int exitCode = new StreamsResetter().execute(parameters,
cleanUpConfig);
+ assertEquals(0, exitCode, "Resetter should allow setting group
protocol to CLASSIC");
+ }
+
+ @Test
+ public void shouldOverwriteGroupProtocolOtherThanClassic(final TestInfo
testInfo) {
+ final String appID = safeUniqueTestName(testInfo);
+ final String[] parameters = new String[] {
+ "--application-id", appID,
+ "--bootstrap-server", cluster.bootstrapServers(),
+ "--input-topics", INPUT_TOPIC
+ };
+ final Properties cleanUpConfig = new Properties();
+
+ // Protocol config other than CLASSIC allowed but overwritten to
CLASSIC.
+ cleanUpConfig.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG,
GroupProtocol.CONSUMER.name());
+ cleanUpConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
+ cleanUpConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,
Integer.toString(CLEANUP_CONSUMER_TIMEOUT));
+ int exitCode = new StreamsResetter().execute(parameters,
cleanUpConfig);
+ assertEquals(0, exitCode, "Resetter should overwrite the group
protocol to CLASSIC");
+ }
+
@Test
public void shouldNotAllowToResetWhenIntermediateTopicAbsent(final
TestInfo testInfo) {
final String appID = safeUniqueTestName(testInfo);