This is an automated email from the ASF dual-hosted git repository.
ableegoldman pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 247c271 MINOR: retry when deleting offsets for named topologies
(#11604)
247c271 is described below
commit 247c271353752a588162983a1a6f7eb96cf9870f
Author: Walker Carlson <[email protected]>
AuthorDate: Thu Dec 16 21:39:55 2021 -0600
MINOR: retry when deleting offsets for named topologies (#11604)
When this was written, I didn't expect deleteOffsetsResult to be set if an
exception was thrown. But it is, so to retry we need to reset it to null.
Change the remove-topology path of KafkaStreamsNamedTopologyWrapper so that,
when resetting offsets, it retries upon GroupSubscribedToTopicException and
swallows/completes upon GroupIdNotFoundException.
Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>
---
.../internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java | 6 ++++++
1 file changed, 6 insertions(+)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java
index 0dd7eca..b332f94 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java
@@ -20,6 +20,7 @@ import
org.apache.kafka.clients.admin.DeleteConsumerGroupOffsetsResult;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.annotation.InterfaceStability.Unstable;
+import org.apache.kafka.common.errors.GroupIdNotFoundException;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.errors.GroupSubscribedToTopicException;
import org.apache.kafka.common.internals.KafkaFutureImpl;
@@ -221,9 +222,14 @@ public class KafkaStreamsNamedTopologyWrapper extends
KafkaStreams {
.getMessage()
.equals("Deleting offsets of a topic is
forbidden while the consumer group is actively subscribed to it.")) {
ex.printStackTrace();
+ } else if (ex.getCause() != null &&
+ ex.getCause() instanceof
GroupIdNotFoundException) {
+ log.debug("The offsets have been reset by
another client or the group has been deleted, no need to retry further.");
+ break;
} else {
future.completeExceptionally(ex);
}
+ deleteOffsetsResult = null;
}
try {
Thread.sleep(100);