This is an automated email from the ASF dual-hosted git repository.

lianetm pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ee3cea05aac KAFKA-16816: Remove unneeded FencedInstanceId support on 
commit path for new consumer (#17559)
ee3cea05aac is described below

commit ee3cea05aac766c60cc858c33c7cd489ddcbdf9f
Author: TaiJuWu <[email protected]>
AuthorDate: Tue Nov 5 21:33:23 2024 +0800

    KAFKA-16816: Remove unneeded FencedInstanceId support on commit path for 
new consumer (#17559)
    
    Reviewers: Lianet Magrans <[email protected]>
---
 .../kafka/clients/consumer/KafkaConsumer.java      | 21 ++++--
 .../consumer/internals/AsyncKafkaConsumer.java     | 26 -------
 .../consumer/internals/CommitRequestManager.java   |  5 --
 .../consumer/internals/AsyncKafkaConsumerTest.java | 87 ----------------------
 .../internals/CommitRequestManagerTest.java        | 25 -------
 5 files changed, 14 insertions(+), 150 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 1b12ae3d4b4..619659f3274 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -873,7 +873,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      *             is too large or if the topic does not exist).
      * @throws org.apache.kafka.common.errors.TimeoutException if the timeout 
specified by {@code default.api.timeout.ms} expires
      *            before successful completion of the offset commit
-     * @throws org.apache.kafka.common.errors.FencedInstanceIdException if 
this consumer instance gets fenced by broker.
+     * @throws org.apache.kafka.common.errors.FencedInstanceIdException if 
this consumer is using the classic group protocol
+     *            and this instance gets fenced by broker.
      */
     @Override
     public void commitSync() {
@@ -916,7 +917,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      *             is too large or if the topic does not exist).
      * @throws org.apache.kafka.common.errors.TimeoutException if the timeout 
expires before successful completion
      *            of the offset commit
-     * @throws org.apache.kafka.common.errors.FencedInstanceIdException if 
this consumer instance gets fenced by broker.
+     * @throws org.apache.kafka.common.errors.FencedInstanceIdException if 
this consumer is using the classic group protocol
+     *            and this instance gets fenced by broker.
      */
     @Override
     public void commitSync(Duration timeout) {
@@ -964,7 +966,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      *             is too large or if the topic does not exist).
      * @throws org.apache.kafka.common.errors.TimeoutException if the timeout 
expires before successful completion
      *            of the offset commit
-     * @throws org.apache.kafka.common.errors.FencedInstanceIdException if 
this consumer instance gets fenced by broker.
+     * @throws org.apache.kafka.common.errors.FencedInstanceIdException if 
this consumer is using the classic group protocol
+     *            and this instance gets fenced by broker.
      */
     @Override
     public void commitSync(final Map<TopicPartition, OffsetAndMetadata> 
offsets) {
@@ -1012,7 +1015,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, 
V> {
      *             is too large or if the topic does not exist).
      * @throws org.apache.kafka.common.errors.TimeoutException if the timeout 
expires before successful completion
      *            of the offset commit
-     * @throws org.apache.kafka.common.errors.FencedInstanceIdException if 
this consumer instance gets fenced by broker.
+     * @throws org.apache.kafka.common.errors.FencedInstanceIdException if 
this consumer is using the classic group protocol
+     *            and this instance gets fenced by broker.
      */
     @Override
     public void commitSync(final Map<TopicPartition, OffsetAndMetadata> 
offsets, final Duration timeout) {
@@ -1022,7 +1026,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, 
V> {
     /**
      * Commit offsets returned on the last {@link #poll(Duration)} for all the 
subscribed list of topics and partition.
      * Same as {@link #commitAsync(OffsetCommitCallback) commitAsync(null)}
-     * @throws org.apache.kafka.common.errors.FencedInstanceIdException if 
this consumer instance gets fenced by broker.
+     * @throws org.apache.kafka.common.errors.FencedInstanceIdException if 
this consumer is using the classic group protocol
+     *            and this instance gets fenced by broker.
      */
     @Override
     public void commitAsync() {
@@ -1045,7 +1050,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, 
V> {
      * (and variants) returns.
      *
      * @param callback Callback to invoke when the commit completes
-     * @throws org.apache.kafka.common.errors.FencedInstanceIdException if 
this consumer instance gets fenced by broker.
+     * @throws org.apache.kafka.common.errors.FencedInstanceIdException if 
this consumer is using the classic group protocol
+     *             and this instance gets fenced by broker.
      */
     @Override
     public void commitAsync(OffsetCommitCallback callback) {
@@ -1072,7 +1078,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, 
V> {
      * @param offsets A map of offsets by partition with associate metadata. 
This map will be copied internally, so it
      *                is safe to mutate the map after returning.
      * @param callback Callback to invoke when the commit completes
-     * @throws org.apache.kafka.common.errors.FencedInstanceIdException if 
this consumer instance gets fenced by broker.
+     * @throws org.apache.kafka.common.errors.FencedInstanceIdException if 
this consumer is using the classic group protocol
+     *             and this instance gets fenced by broker.
      */
     @Override
     public void commitAsync(final Map<TopicPartition, OffsetAndMetadata> 
offsets, OffsetCommitCallback callback) {
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
index fccd69c86b8..f4787580361 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
@@ -75,7 +75,6 @@ import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.errors.AuthenticationException;
-import org.apache.kafka.common.errors.FencedInstanceIdException;
 import org.apache.kafka.common.errors.InterruptException;
 import org.apache.kafka.common.errors.InvalidGroupIdException;
 import org.apache.kafka.common.errors.InvalidTopicException;
@@ -117,7 +116,6 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
@@ -249,7 +247,6 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
     private boolean cachedSubscriptionHasAllFetchPositions;
     private final WakeupTrigger wakeupTrigger = new WakeupTrigger();
     private final OffsetCommitCallbackInvoker offsetCommitCallbackInvoker;
-    private final AtomicBoolean asyncCommitFenced;
     // Last triggered async commit future. Used to wait until all previous 
async commits are completed.
     // We only need to keep track of the last one, since they are guaranteed 
to complete in order.
     private CompletableFuture<Void> lastPendingAsyncCommit = null;
@@ -336,7 +333,6 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
                     
clientTelemetryReporter.map(ClientTelemetryReporter::telemetrySender).orElse(null),
                     backgroundEventHandler);
             this.offsetCommitCallbackInvoker = new 
OffsetCommitCallbackInvoker(interceptors);
-            this.asyncCommitFenced = new AtomicBoolean(false);
             this.groupMetadata.set(initializeGroupMetadata(config, 
groupRebalanceConfig));
             final Supplier<RequestManagers> requestManagersSupplier = 
RequestManagers.supplier(time,
                     logContext,
@@ -448,7 +444,6 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
         this.clientTelemetryReporter = Optional.empty();
         this.autoCommitEnabled = autoCommitEnabled;
         this.offsetCommitCallbackInvoker = new 
OffsetCommitCallbackInvoker(interceptors);
-        this.asyncCommitFenced = new AtomicBoolean(false);
     }
 
     AsyncKafkaConsumer(LogContext logContext,
@@ -511,7 +506,6 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
             backgroundEventHandler
         );
         this.offsetCommitCallbackInvoker = new 
OffsetCommitCallbackInvoker(interceptors);
-        this.asyncCommitFenced = new AtomicBoolean(false);
         Supplier<RequestManagers> requestManagersSupplier = 
RequestManagers.supplier(
             time,
             logContext,
@@ -766,10 +760,6 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
                     
offsetCommitCallbackInvoker.enqueueInterceptorInvocation(offsets);
                 }
 
-                if (t instanceof FencedInstanceIdException) {
-                    asyncCommitFenced.set(true);
-                }
-
                 if (callback == null) {
                     if (t != null) {
                         log.error("Offset commit with offsets {} failed", 
offsets, t);
@@ -786,7 +776,6 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
 
     private CompletableFuture<Void> commit(final CommitEvent commitEvent) {
         maybeThrowInvalidGroupIdException();
-        maybeThrowFencedInstanceException();
         offsetCommitCallbackInvoker.executeCallbacks();
 
         Map<TopicPartition, OffsetAndMetadata> offsets = commitEvent.offsets();
@@ -1657,7 +1646,6 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
 
     @Override
     public boolean updateAssignmentMetadataIfNeeded(Timer timer) {
-        maybeThrowFencedInstanceException();
         offsetCommitCallbackInvoker.executeCallbacks();
         try {
             applicationEventHandler.addAndGet(new 
UpdatePatternSubscriptionEvent(calculateDeadlineMs(timer)));
@@ -1940,20 +1928,6 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
         return kafkaConsumerMetrics;
     }
 
-    private void maybeThrowFencedInstanceException() {
-        if (asyncCommitFenced.get()) {
-            String groupInstanceId = "unknown";
-            if (!groupMetadata.get().isPresent()) {
-                log.error("No group metadata found although a group ID was 
provided. This is a bug!");
-            } else if 
(!groupMetadata.get().get().groupInstanceId().isPresent()) {
-                log.error("No group instance ID found although the consumer is 
fenced. This is a bug!");
-            } else {
-                groupInstanceId = 
groupMetadata.get().get().groupInstanceId().get();
-            }
-            throw new FencedInstanceIdException("Get fenced exception for 
group.instance.id " + groupInstanceId);
-        }
-    }
-
     // Visible for testing
     SubscriptionState subscriptions() {
         return subscriptions;
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
index 5a9e0455d29..dfa2520a02b 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
@@ -735,11 +735,6 @@ public class CommitRequestManager implements 
RequestManager, MemberStateListener
                         
coordinatorRequestManager.markCoordinatorUnknown(error.message(), 
currentTimeMs);
                         future.completeExceptionally(error.exception());
                         return;
-                    } else if (error == Errors.FENCED_INSTANCE_ID) {
-                        String fencedError = "OffsetCommit failed due to group 
instance id fenced: " + groupInstanceId;
-                        log.error(fencedError);
-                        future.completeExceptionally(new 
CommitFailedException(fencedError));
-                        return;
                     } else if (error == Errors.OFFSET_METADATA_TOO_LARGE ||
                         error == Errors.INVALID_COMMIT_OFFSET_SIZE) {
                         future.completeExceptionally(error.exception());
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
index f9db6ba05da..380b7082b97 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
@@ -57,7 +57,6 @@ import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.config.ConfigException;
-import org.apache.kafka.common.errors.FencedInstanceIdException;
 import org.apache.kafka.common.errors.GroupAuthorizationException;
 import org.apache.kafka.common.errors.InterruptException;
 import org.apache.kafka.common.errors.InvalidGroupIdException;
@@ -334,23 +333,6 @@ public class AsyncKafkaConsumerTest {
                 new GroupAuthorizationException("Group authorization 
exception"));
     }
 
-    @Test
-    public void testCommitAsyncWithFencedException() {
-        consumer = newConsumer();
-        completeCommitSyncApplicationEventSuccessfully();
-        final Map<TopicPartition, OffsetAndMetadata> offsets = 
mockTopicPartitionOffset();
-        MockCommitCallback callback = new MockCommitCallback();
-
-        assertDoesNotThrow(() -> consumer.commitAsync(offsets, callback));
-
-        final ArgumentCaptor<AsyncCommitEvent> commitEventCaptor = 
ArgumentCaptor.forClass(AsyncCommitEvent.class);
-        verify(applicationEventHandler).add(commitEventCaptor.capture());
-        final AsyncCommitEvent commitEvent = commitEventCaptor.getValue();
-        
commitEvent.future().completeExceptionally(Errors.FENCED_INSTANCE_ID.exception());
-
-        assertThrows(Errors.FENCED_INSTANCE_ID.exception().getClass(), () -> 
consumer.commitAsync());
-    }
-
     @Test
     public void testCommitted() {
         time = new MockTime(1);
@@ -610,52 +592,6 @@ public class AsyncKafkaConsumerTest {
         }
     }
 
-    @Test
-    public void testCommitAsyncTriggersFencedExceptionFromCommitAsync() {
-        final String groupId = "consumerGroupA";
-        final String groupInstanceId = "groupInstanceId1";
-        final Properties props = requiredConsumerConfigAndGroupId(groupId);
-        props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, groupInstanceId);
-        final ConsumerConfig config = new ConsumerConfig(props);
-        consumer = newConsumer(config);
-        
completeCommitAsyncApplicationEventExceptionally(Errors.FENCED_INSTANCE_ID.exception());
-        
doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
-        
doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
-        final TopicPartition tp = new TopicPartition("foo", 0);
-        completeAssignmentChangeEventSuccessfully();
-        consumer.assign(Collections.singleton(tp));
-        completeSeekUnvalidatedEventSuccessfully();
-        consumer.seek(tp, 20);
-
-        assertDoesNotThrow(() -> consumer.commitAsync());
-
-        Exception e = assertThrows(FencedInstanceIdException.class, () -> 
consumer.commitAsync());
-        assertEquals("Get fenced exception for group.instance.id 
groupInstanceId1", e.getMessage());
-    }
-
-    @Test
-    public void testCommitSyncTriggersFencedExceptionFromCommitAsync() {
-        final String groupId = "consumerGroupA";
-        final String groupInstanceId = "groupInstanceId1";
-        final Properties props = requiredConsumerConfigAndGroupId(groupId);
-        props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, groupInstanceId);
-        final ConsumerConfig config = new ConsumerConfig(props);
-        consumer = newConsumer(config);
-        
completeCommitAsyncApplicationEventExceptionally(Errors.FENCED_INSTANCE_ID.exception());
-        
doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
-        
doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
-        final TopicPartition tp = new TopicPartition("foo", 0);
-        completeAssignmentChangeEventSuccessfully();
-        consumer.assign(Collections.singleton(tp));
-        completeSeekUnvalidatedEventSuccessfully();
-        consumer.seek(tp, 20);
-
-        assertDoesNotThrow(() -> consumer.commitAsync());
-
-        Exception e =  assertThrows(FencedInstanceIdException.class, () -> 
consumer.commitSync());
-        assertEquals("Get fenced exception for group.instance.id 
groupInstanceId1", e.getMessage());
-    }
-
     @Test
     public void testCommitSyncAwaitsCommitAsyncCompletionWithEmptyOffsets() {
         final TopicPartition tp = new TopicPartition("foo", 0);
@@ -739,29 +675,6 @@ public class AsyncKafkaConsumerTest {
         return allValues.get(allValues.size() - 1);
     }
 
-    @Test
-    public void testPollTriggersFencedExceptionFromCommitAsync() {
-        final String groupId = "consumerGroupA";
-        final String groupInstanceId = "groupInstanceId1";
-        final Properties props = requiredConsumerConfigAndGroupId(groupId);
-        props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, groupInstanceId);
-        final ConsumerConfig config = new ConsumerConfig(props);
-        consumer = newConsumer(config);
-        
completeCommitAsyncApplicationEventExceptionally(Errors.FENCED_INSTANCE_ID.exception());
-        
doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
-        
doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
-        final TopicPartition tp = new TopicPartition("foo", 0);
-        completeAssignmentChangeEventSuccessfully();
-        consumer.assign(Collections.singleton(tp));
-        completeSeekUnvalidatedEventSuccessfully();
-        consumer.seek(tp, 20);
-
-        assertDoesNotThrow(() -> consumer.commitAsync());
-
-        Exception e = assertThrows(FencedInstanceIdException.class, () -> 
consumer.poll(Duration.ZERO));
-        assertEquals("Get fenced exception for group.instance.id 
groupInstanceId1", e.getMessage());
-    }
-
     @Test
     public void testEnsurePollExecutedCommitAsyncCallbacks() {
         consumer = newConsumer();
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java
index 1a805e642fc..9ded26b9e71 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java
@@ -340,27 +340,8 @@ public class CommitRequestManagerTest {
         assertExceptionHandling(commitRequestManager, error, true);
     }
 
-    @ParameterizedTest
-    @MethodSource("commitSyncExpectedExceptions")
-    public void testCommitSyncFailsWithExpectedException(Errors commitError,
-                                                         Class<? extends 
Exception> expectedException) {
-        CommitRequestManager commitRequestManager = create(false, 100);
-        
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode));
-
-        Map<TopicPartition, OffsetAndMetadata> offsets = 
Collections.singletonMap(
-            new TopicPartition("topic", 1),
-            new OffsetAndMetadata(0));
-
-        // Send sync offset commit that fails and verify it propagates the 
expected exception.
-        long deadlineMs = time.milliseconds() + retryBackoffMs;
-        CompletableFuture<Void> commitResult = 
commitRequestManager.commitSync(offsets, deadlineMs);
-        completeOffsetCommitRequestWithError(commitRequestManager, 
commitError);
-        assertFutureThrows(commitResult, expectedException);
-    }
-
     private static Stream<Arguments> commitSyncExpectedExceptions() {
         return Stream.of(
-            Arguments.of(Errors.FENCED_INSTANCE_ID, 
CommitFailedException.class),
             Arguments.of(Errors.UNKNOWN_MEMBER_ID, 
CommitFailedException.class),
             Arguments.of(Errors.OFFSET_METADATA_TOO_LARGE, 
Errors.OFFSET_METADATA_TOO_LARGE.exception().getClass()),
             Arguments.of(Errors.INVALID_COMMIT_OFFSET_SIZE, 
Errors.INVALID_COMMIT_OFFSET_SIZE.exception().getClass()),
@@ -985,10 +966,6 @@ public class CommitRequestManagerTest {
             case INVALID_COMMIT_OFFSET_SIZE:
                 assertPollDoesNotReturn(commitRequestManager, Long.MAX_VALUE);
                 break;
-            case FENCED_INSTANCE_ID:
-                // This is a fatal failure, so we should not retry
-                assertPollDoesNotReturn(commitRequestManager, Long.MAX_VALUE);
-                break;
             default:
                 if (errors.exception() instanceof RetriableException && 
requestShouldBeRetried) {
                     assertRetryBackOff(commitRequestManager, remainBackoffMs);
@@ -1279,7 +1256,6 @@ public class CommitRequestManagerTest {
             Arguments.of(Errors.UNKNOWN_TOPIC_OR_PARTITION),
             Arguments.of(Errors.COORDINATOR_NOT_AVAILABLE),
             Arguments.of(Errors.REQUEST_TIMED_OUT),
-            Arguments.of(Errors.FENCED_INSTANCE_ID),
             Arguments.of(Errors.TOPIC_AUTHORIZATION_FAILED),
             Arguments.of(Errors.STALE_MEMBER_EPOCH),
             Arguments.of(Errors.UNKNOWN_MEMBER_ID));
@@ -1299,7 +1275,6 @@ public class CommitRequestManagerTest {
             Arguments.of(Errors.UNKNOWN_TOPIC_OR_PARTITION),
             Arguments.of(Errors.COORDINATOR_NOT_AVAILABLE),
             Arguments.of(Errors.REQUEST_TIMED_OUT),
-            Arguments.of(Errors.FENCED_INSTANCE_ID),
             Arguments.of(Errors.TOPIC_AUTHORIZATION_FAILED),
             Arguments.of(Errors.UNKNOWN_MEMBER_ID),
             // Adding STALE_MEMBER_EPOCH as non-retriable here because it is 
only retried if a new

Reply via email to