This is an automated email from the ASF dual-hosted git repository.

lianetm pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b6b2c9ebc45 KAFKA-16985: Ensure consumer attempts to send leave 
request on close even if interrupted (#16686)
b6b2c9ebc45 is described below

commit b6b2c9ebc45bd60572c24355886620dbdc406ce9
Author: Kirk True <[email protected]>
AuthorDate: Wed Nov 13 11:26:40 2024 -0800

    KAFKA-16985: Ensure consumer attempts to send leave request on close even 
if interrupted (#16686)
    
    Reviewers: Andrew Schofield <[email protected]>, Chia-Ping Tsai 
<[email protected]>, Lianet Magrans <[email protected]>, Philip Nee 
<[email protected]>
---
 .../internals/AbstractMembershipManager.java       |  96 ++++++++---
 .../consumer/internals/AsyncKafkaConsumer.java     | 179 +++++++++++++++++----
 .../consumer/internals/MemberStateListener.java    |  15 +-
 .../consumer/internals/SubscriptionState.java      |   2 +-
 .../internals/events/ApplicationEvent.java         |   2 +-
 .../events/ApplicationEventProcessor.java          |  13 ++
 .../internals/events/LeaveGroupOnCloseEvent.java   |  37 +++++
 .../consumer/internals/AsyncKafkaConsumerTest.java |  83 ++++++----
 .../kafka/api/PlaintextConsumerTest.scala          |  49 +++++-
 9 files changed, 385 insertions(+), 91 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java
index 2c4867f2f13..82b4e567d34 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java
@@ -30,7 +30,6 @@ import org.slf4j.Logger;
 
 import java.time.Duration;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -130,7 +129,8 @@ public abstract class AbstractMembershipManager<R extends 
AbstractResponse> impl
      * requests in cases where a currently assigned topic is in the target 
assignment (new
      * partition assigned, or revoked), but it is not present the Metadata 
cache at that moment.
      * The cache is cleared when the subscription changes ({@link 
#transitionToJoining()}, the
-     * member fails ({@link #transitionToFatal()} or leaves the group ({@link 
#leaveGroup()}).
+     * member fails ({@link #transitionToFatal()}) or leaves the group
+     * ({@link #leaveGroup()}/{@link #leaveGroupOnClose()}).
      */
     private final Map<Uuid, String> assignedTopicNamesCache;
 
@@ -157,9 +157,9 @@ public abstract class AbstractMembershipManager<R extends 
AbstractResponse> impl
     private boolean rejoinedWhileReconciliationInProgress;
 
     /**
-     * If the member is currently leaving the group after a call to {@link 
#leaveGroup()}}, this
-     * will have a future that will complete when the ongoing leave operation 
completes
-     * (callbacks executed and heartbeat request to leave is sent out). This 
will be empty is the
+     * If the member is currently leaving the group after a call to {@link 
#leaveGroup()} or
+     * {@link #leaveGroupOnClose()}, this will have a future that will 
complete when the ongoing leave operation
+     * completes (callbacks executed and heartbeat request to leave is sent 
out). This will be empty if the
      * member is not leaving.
      */
     private Optional<CompletableFuture<Void>> leaveGroupInProgress = 
Optional.empty();
@@ -481,6 +481,7 @@ public abstract class AbstractMembershipManager<R extends 
AbstractResponse> impl
     private void clearAssignment() {
         if (subscriptions.hasAutoAssignedPartitions()) {
             subscriptions.assignFromSubscribed(Collections.emptySet());
+            notifyAssignmentChange(Collections.emptySet());
         }
         currentAssignment = LocalAssignment.NONE;
         clearPendingAssignmentsAndLocalNamesCache();
@@ -496,8 +497,9 @@ public abstract class AbstractMembershipManager<R extends 
AbstractResponse> impl
      */
     private void 
updateSubscriptionAwaitingCallback(SortedSet<TopicIdPartition> 
assignedPartitions,
                                                     SortedSet<TopicPartition> 
addedPartitions) {
-        Collection<TopicPartition> assignedTopicPartitions = 
toTopicPartitionSet(assignedPartitions);
+        Set<TopicPartition> assignedTopicPartitions = 
toTopicPartitionSet(assignedPartitions);
         
subscriptions.assignFromSubscribedAwaitingCallback(assignedTopicPartitions, 
addedPartitions);
+        notifyAssignmentChange(assignedTopicPartitions);
     }
 
     /**
@@ -523,18 +525,45 @@ public abstract class AbstractMembershipManager<R extends 
AbstractResponse> impl
     /**
      * Transition to {@link MemberState#PREPARE_LEAVING} to release the 
assignment. Once completed,
      * transition to {@link MemberState#LEAVING} to send the heartbeat request 
and leave the group.
-     * This is expected to be invoked when the user calls the unsubscribe API.
+     * This is expected to be invoked when the user calls the {@link 
Consumer#close()} API.
+     *
+     * @return Future that will complete when the heartbeat to leave the group 
has been sent out.
+     */
+    public CompletableFuture<Void> leaveGroupOnClose() {
+        return leaveGroup(false);
+    }
+
+    /**
+     * Transition to {@link MemberState#PREPARE_LEAVING} to release the 
assignment. Once completed,
+     * transition to {@link MemberState#LEAVING} to send the heartbeat request 
and leave the group.
+     * This is expected to be invoked when the user calls the {@link 
Consumer#unsubscribe()} API.
      *
      * @return Future that will complete when the callback execution completes 
and the heartbeat
      * to leave the group has been sent out.
      */
     public CompletableFuture<Void> leaveGroup() {
+        return leaveGroup(true);
+    }
+
+    /**
+     * Transition to {@link MemberState#PREPARE_LEAVING} to release the 
assignment. Once completed,
+     * transition to {@link MemberState#LEAVING} to send the heartbeat request 
and leave the group.
+     * This is expected to be invoked when the user calls the unsubscribe API 
or is closing the consumer.
+     *
+     * @param runCallbacks {@code true} to insert the step to execute the 
{@link ConsumerRebalanceListener} callback,
+     *                     {@code false} to skip
+     *
+     * @return Future that will complete when the callback execution completes 
and the heartbeat
+     * to leave the group has been sent out.
+     */
+    protected CompletableFuture<Void> leaveGroup(boolean runCallbacks) {
         if (isNotInGroup()) {
             if (state == MemberState.FENCED) {
                 clearAssignment();
                 transitionTo(MemberState.UNSUBSCRIBED);
             }
             subscriptions.unsubscribe();
+            notifyAssignmentChange(Collections.emptySet());
             return CompletableFuture.completedFuture(null);
         }
 
@@ -549,31 +578,39 @@ public abstract class AbstractMembershipManager<R extends 
AbstractResponse> impl
         CompletableFuture<Void> leaveResult = new CompletableFuture<>();
         leaveGroupInProgress = Optional.of(leaveResult);
 
-        CompletableFuture<Void> callbackResult = signalMemberLeavingGroup();
-        callbackResult.whenComplete((result, error) -> {
-            if (error != null) {
-                log.error("Member {} callback to release assignment failed. It 
will proceed " +
-                    "to clear its assignment and send a leave group 
heartbeat", memberId, error);
-            } else {
-                log.info("Member {} completed callback to release assignment. 
It will proceed " +
-                    "to clear its assignment and send a leave group 
heartbeat", memberId);
-            }
-
-            // Clear the subscription, no matter if the callback execution 
failed or succeeded.
-            subscriptions.unsubscribe();
-            clearAssignment();
+        if (runCallbacks) {
+            CompletableFuture<Void> callbackResult = 
signalMemberLeavingGroup();
+            callbackResult.whenComplete((result, error) -> {
+                if (error != null) {
+                    log.error("Member {} callback to release assignment 
failed. It will proceed " +
+                        "to clear its assignment and send a leave group 
heartbeat", memberId, error);
+                } else {
+                    log.info("Member {} completed callback to release 
assignment. It will proceed " +
+                        "to clear its assignment and send a leave group 
heartbeat", memberId);
+                }
 
-            // Transition to ensure that a heartbeat request is sent out to 
effectively leave the
-            // group (even in the case where the member had no assignment to 
release or when the
-            // callback execution failed.)
-            transitionToSendingLeaveGroup(false);
-        });
+                // Clear the assignment, no matter if the callback execution 
failed or succeeded.
+                clearAssignmentAndLeaveGroup();
+            });
+        } else {
+            clearAssignmentAndLeaveGroup();
+        }
 
         // Return future to indicate that the leave group is done when the 
callbacks
         // complete, and the transition to send the heartbeat has been made.
         return leaveResult;
     }
 
+    private void clearAssignmentAndLeaveGroup() {
+        subscriptions.unsubscribe();
+        clearAssignment();
+
+        // Transition to ensure that a heartbeat request is sent out to 
effectively leave the
+        // group (even in the case where the member had no assignment to 
release or when the
+        // callback execution failed.)
+        transitionToSendingLeaveGroup(false);
+    }
+
     /**
      * Reset member epoch to the value required for the leave the group 
heartbeat request, and
      * transition to the {@link MemberState#LEAVING} state so that a heartbeat 
request is sent
@@ -616,6 +653,15 @@ public abstract class AbstractMembershipManager<R extends 
AbstractResponse> impl
         stateUpdatesListeners.forEach(stateListener -> 
stateListener.onMemberEpochUpdated(epoch, memberId));
     }
 
+    /**
+     * Invokes the {@link MemberStateListener#onGroupAssignmentUpdated(Set)} 
callback for each listener when the
+     * set of assigned partitions changes. This includes on assignment 
changes, unsubscribe, and when leaving
+     * the group.
+     */
+    void notifyAssignmentChange(Set<TopicPartition> partitions) {
+        stateUpdatesListeners.forEach(stateListener -> 
stateListener.onGroupAssignmentUpdated(partitions));
+    }
+
     /**
      * @return True if the member should send heartbeat to the coordinator 
without waiting for
      * the interval.
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
index 7fda9a20c05..fcf688b7f8e 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
@@ -54,6 +54,7 @@ import 
org.apache.kafka.clients.consumer.internals.events.CreateFetchRequestsEve
 import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
 import org.apache.kafka.clients.consumer.internals.events.EventProcessor;
 import 
org.apache.kafka.clients.consumer.internals.events.FetchCommittedOffsetsEvent;
+import 
org.apache.kafka.clients.consumer.internals.events.LeaveGroupOnCloseEvent;
 import org.apache.kafka.clients.consumer.internals.events.ListOffsetsEvent;
 import org.apache.kafka.clients.consumer.internals.events.PollEvent;
 import org.apache.kafka.clients.consumer.internals.events.ResetOffsetEvent;
@@ -112,6 +113,7 @@ import java.util.Optional;
 import java.util.OptionalLong;
 import java.util.Set;
 import java.util.SortedSet;
+import java.util.TreeSet;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Future;
@@ -126,6 +128,7 @@ import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
 import static java.util.Objects.requireNonNull;
+import static 
org.apache.kafka.clients.consumer.internals.AbstractMembershipManager.TOPIC_PARTITION_COMPARATOR;
 import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_JMX_PREFIX;
 import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_METRIC_GROUP_PREFIX;
 import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.DEFAULT_CLOSE_TIMEOUT_MS;
@@ -170,12 +173,6 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
      */
     private class BackgroundEventProcessor implements 
EventProcessor<BackgroundEvent> {
 
-        private final ConsumerRebalanceListenerInvoker 
rebalanceListenerInvoker;
-
-        public BackgroundEventProcessor(final ConsumerRebalanceListenerInvoker 
rebalanceListenerInvoker) {
-            this.rebalanceListenerInvoker = rebalanceListenerInvoker;
-        }
-
         @Override
         public void process(final BackgroundEvent event) {
             switch (event.type()) {
@@ -234,6 +231,12 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
     private final IsolationLevel isolationLevel;
 
     private final SubscriptionState subscriptions;
+
+    /**
+     * This is a snapshot of the partitions assigned to this consumer. 
HOWEVER, this is only populated and used in
+     * the case where this consumer is in a consumer group. Self-assigned 
partitions do not appear here.
+     */
+    private final AtomicReference<Set<TopicPartition>> groupAssignmentSnapshot 
= new AtomicReference<>(Collections.emptySet());
     private final ConsumerMetadata metadata;
     private final Metrics metrics;
     private final long retryBackoffMs;
@@ -247,6 +250,7 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
     private boolean cachedSubscriptionHasAllFetchPositions;
     private final WakeupTrigger wakeupTrigger = new WakeupTrigger();
     private final OffsetCommitCallbackInvoker offsetCommitCallbackInvoker;
+    private final ConsumerRebalanceListenerInvoker rebalanceListenerInvoker;
     // Last triggered async commit future. Used to wait until all previous 
async commits are completed.
     // We only need to keep track of the last one, since they are guaranteed 
to complete in order.
     private CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> 
lastPendingAsyncCommit = null;
@@ -256,6 +260,18 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
     private final AtomicLong currentThread = new AtomicLong(NO_CURRENT_THREAD);
     private final AtomicInteger refCount = new AtomicInteger(0);
 
+    private final MemberStateListener memberStateListener = new 
MemberStateListener() {
+        @Override
+        public void onMemberEpochUpdated(Optional<Integer> memberEpoch, String 
memberId) {
+            updateGroupMetadata(memberEpoch, memberId);
+        }
+
+        @Override
+        public void onGroupAssignmentUpdated(Set<TopicPartition> partitions) {
+            setGroupAssignmentSnapshot(partitions);
+        }
+    };
+
     AsyncKafkaConsumer(final ConsumerConfig config,
                        final Deserializer<K> keyDeserializer,
                        final Deserializer<V> valueDeserializer) {
@@ -348,7 +364,7 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
                     clientTelemetryReporter,
                     metrics,
                     offsetCommitCallbackInvoker,
-                    this::updateGroupMetadata
+                    memberStateListener
             );
             final Supplier<ApplicationEventProcessor> 
applicationEventProcessorSupplier = 
ApplicationEventProcessor.supplier(logContext,
                     metadata,
@@ -363,15 +379,13 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
                     networkClientDelegateSupplier,
                     requestManagersSupplier);
 
-            ConsumerRebalanceListenerInvoker rebalanceListenerInvoker = new 
ConsumerRebalanceListenerInvoker(
+            this.rebalanceListenerInvoker = new 
ConsumerRebalanceListenerInvoker(
                     logContext,
                     subscriptions,
                     time,
                     new RebalanceCallbackMetricsManager(metrics)
             );
-            this.backgroundEventProcessor = new BackgroundEventProcessor(
-                    rebalanceListenerInvoker
-            );
+            this.backgroundEventProcessor = new BackgroundEventProcessor();
             this.backgroundEventReaper = 
backgroundEventReaperFactory.build(logContext);
 
             // The FetchCollector is only used on the application thread.
@@ -431,7 +445,8 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
         this.interceptors = Objects.requireNonNull(interceptors);
         this.time = time;
         this.backgroundEventQueue = backgroundEventQueue;
-        this.backgroundEventProcessor = new 
BackgroundEventProcessor(rebalanceListenerInvoker);
+        this.rebalanceListenerInvoker = rebalanceListenerInvoker;
+        this.backgroundEventProcessor = new BackgroundEventProcessor();
         this.backgroundEventReaper = backgroundEventReaper;
         this.metrics = metrics;
         this.groupMetadata.set(initializeGroupMetadata(groupId, 
Optional.empty()));
@@ -490,7 +505,7 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
         BlockingQueue<ApplicationEvent> applicationEventQueue = new 
LinkedBlockingQueue<>();
         this.backgroundEventQueue = new LinkedBlockingQueue<>();
         BackgroundEventHandler backgroundEventHandler = new 
BackgroundEventHandler(backgroundEventQueue);
-        ConsumerRebalanceListenerInvoker rebalanceListenerInvoker = new 
ConsumerRebalanceListenerInvoker(
+        this.rebalanceListenerInvoker = new ConsumerRebalanceListenerInvoker(
             logContext,
             subscriptions,
             time,
@@ -521,7 +536,7 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
             clientTelemetryReporter,
             metrics,
             offsetCommitCallbackInvoker,
-            this::updateGroupMetadata
+            memberStateListener
         );
         Supplier<ApplicationEventProcessor> applicationEventProcessorSupplier 
= ApplicationEventProcessor.supplier(
                 logContext,
@@ -536,7 +551,7 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
                 applicationEventProcessorSupplier,
                 networkClientDelegateSupplier,
                 requestManagersSupplier);
-        this.backgroundEventProcessor = new 
BackgroundEventProcessor(rebalanceListenerInvoker);
+        this.backgroundEventProcessor = new BackgroundEventProcessor();
         this.backgroundEventReaper = new CompletableEventReaper(logContext);
     }
 
@@ -639,6 +654,10 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
         );
     }
 
+    void setGroupAssignmentSnapshot(final Set<TopicPartition> partitions) {
+        groupAssignmentSnapshot.set(Collections.unmodifiableSet(partitions));
+    }
+
     @Override
     public void registerMetricForSubscription(KafkaMetric metric) {
         if (clientTelemetryReporter.isPresent()) {
@@ -1216,6 +1235,68 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
         }
     }
 
+    /**
+     * Please keep these tenets in mind for the implementation of the {@link 
AsyncKafkaConsumer}’s
+     * {@link #close(Duration)} method. In the future, these tenets may be 
made officially part of the top-level
+     * {@link KafkaConsumer#close(Duration)} API, but for now they remain here.
+     *
+     * <ol>
+     *     <li>
+     *         The execution of the {@link ConsumerRebalanceListener} callback 
(if applicable) must be performed on
+     *         the application thread to ensure it does not interfere with the 
network I/O on the background thread.
+     *     </li>
+     *     <li>
+     *         The {@link ConsumerRebalanceListener} callback execution must 
complete before an attempt to leave
+     *         the consumer group is performed. In this context, “complete” 
does not necessarily imply
+     *         <em>success</em>; execution is “complete” even if the execution 
<em>fails</em> with an error.
+     *     </li>
+     *     <li>
+     *         Any error thrown during the {@link ConsumerRebalanceListener} 
callback execution will be caught to
+     *         ensure it does not prevent execution of the remaining {@link 
#close()} logic.
+     *     </li>
+     *     <li>
+     *         The application thread will be blocked during the entire 
duration of the execution of the
+     *         {@link ConsumerRebalanceListener}. The consumer does not employ 
a mechanism to short-circuit the
+     *         callback execution, so execution is not bound by the timeout in 
{@link #close(Duration)}.
+     *     </li>
+     *     <li>
+     *         A given {@link ConsumerRebalanceListener} implementation may be 
affected by the application thread's
+     *         interrupt state. If the callback implementation performs any 
blocking operations, it may result in
+     *         an error. An implementation may choose to preemptively check 
the thread's interrupt flag via
+     *         {@link Thread#isInterrupted()} or {@link 
Thread#interrupted()} and alter its behavior.
+     *     </li>
+     *     <li>
+     *         If the application thread was interrupted <em>prior</em> to the 
execution of the
+     *         {@link ConsumerRebalanceListener} callback, the thread's 
interrupt state will be preserved for the
+     *         {@link ConsumerRebalanceListener} execution.
+     *     </li>
+     *     <li>
+     *         If the application thread was interrupted <em>prior</em> to the 
execution of the
+     *         {@link ConsumerRebalanceListener} callback <em>but</em> the 
callback cleared out the interrupt state,
+     *         the {@link #close()} method will not make any effort to restore 
the application thread's interrupt
+     *         state for the remainder of the execution of {@link #close()}.
+     *     </li>
+     *     <li>
+     *         Leaving the consumer group is achieved by issuing a ‘leave 
group’ network request. The consumer will
+     *         attempt to leave the group on a “best-effort” basis. There is no 
stated guarantee that the consumer will
+     *         have successfully left the group before the {@link #close()} 
method completes processing.
+     *     </li>
+     *     <li>
+     *         The consumer will attempt to leave the group regardless of the 
timeout elapsing or the application
+     *         thread receiving an {@link InterruptException} or {@link 
InterruptedException}.
+     *     </li>
+     *     <li>
+     *         The application thread will wait for confirmation that the 
consumer left the group until one of the
+     *         following occurs:
+     *
+     *         <ol>
+     *             <li>Confirmation that the ‘leave group’ response was 
received from the group coordinator</li>
+     *             <li>The timeout provided by the user elapses</li>
+     *             <li>An {@link InterruptException} or {@link 
InterruptedException} is thrown</li>
+     *         </ol>
+     *     </li>
+     * </ol>
+     */
     private void close(Duration timeout, boolean swallowException) {
         log.trace("Closing the Kafka consumer");
         AtomicReference<Throwable> firstException = new AtomicReference<>();
@@ -1227,9 +1308,15 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
         
clientTelemetryReporter.ifPresent(ClientTelemetryReporter::initiateClose);
         closeTimer.update();
         // Prepare shutting down the network thread
-        swallow(log, Level.ERROR, "Failed to release assignment before closing 
consumer",
-            () -> releaseAssignmentAndLeaveGroup(closeTimer), firstException);
-        swallow(log, Level.ERROR, "Failed invoking asynchronous commit 
callback.",
+        // Prior to closing the network thread, we need to make sure the 
following operations happen in the right
+        // sequence...
+        swallow(log, Level.ERROR, "Failed to auto-commit offsets",
+            () -> autoCommitOnClose(closeTimer), firstException);
+        swallow(log, Level.ERROR, "Failed to release group assignment",
+            () -> runRebalanceCallbacksOnClose(closeTimer), firstException);
+        swallow(log, Level.ERROR, "Failed to leave group while closing 
consumer",
+            () -> leaveGroupOnClose(closeTimer), firstException);
+        swallow(log, Level.ERROR, "Failed invoking asynchronous commit 
callbacks while closing consumer",
             () -> 
awaitPendingAsyncCommitsAndExecuteCommitCallbacks(closeTimer, false), 
firstException);
         if (applicationEventHandler != null)
             closeQuietly(() -> 
applicationEventHandler.close(Duration.ofMillis(closeTimer.remainingMs())), 
"Failed shutting down network thread", firstException);
@@ -1257,13 +1344,7 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
         }
     }
 
-    /**
-     * Prior to closing the network thread, we need to make sure the following 
operations happen in the right sequence:
-     * 1. autocommit offsets
-     * 2. release assignment. This is done via a background unsubscribe event 
that will
-     * trigger the callbacks, clear the assignment on the subscription state 
and send the leave group request to the broker
-     */
-    private void releaseAssignmentAndLeaveGroup(final Timer timer) {
+    private void autoCommitOnClose(final Timer timer) {
         if (!groupMetadata.get().isPresent())
             return;
 
@@ -1271,18 +1352,48 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
             commitSyncAllConsumed(timer);
 
         applicationEventHandler.add(new CommitOnCloseEvent());
+    }
+
+    private void runRebalanceCallbacksOnClose(final Timer timer) {
+        if (groupMetadata.get().isEmpty())
+            return;
+
+        int memberEpoch = groupMetadata.get().get().generationId();
+
+        Set<TopicPartition> assignedPartitions = groupAssignmentSnapshot.get();
+
+        if (assignedPartitions.isEmpty())
+            // Nothing to revoke.
+            return;
+
+        SortedSet<TopicPartition> droppedPartitions = new 
TreeSet<>(TOPIC_PARTITION_COMPARATOR);
+        droppedPartitions.addAll(assignedPartitions);
+
+        try {
+            final Exception error;
+
+            if (memberEpoch > 0)
+                error = 
rebalanceListenerInvoker.invokePartitionsRevoked(droppedPartitions);
+            else
+                error = 
rebalanceListenerInvoker.invokePartitionsLost(droppedPartitions);
+
+            if (error != null)
+                throw ConsumerUtils.maybeWrapAsKafkaException(error);
+        } finally {
+            timer.update();
+        }
+    }
+
+    private void leaveGroupOnClose(final Timer timer) {
+        if (!groupMetadata.get().isPresent())
+            return;
 
-        log.info("Releasing assignment and leaving group before closing 
consumer");
-        UnsubscribeEvent unsubscribeEvent = new 
UnsubscribeEvent(calculateDeadlineMs(timer));
-        applicationEventHandler.add(unsubscribeEvent);
+        log.debug("Leaving the consumer group during consumer close");
         try {
-            // If users subscribe to an invalid topic name, they will get 
InvalidTopicException in error events,
-            // because network thread keeps trying to send MetadataRequest in 
the background.
-            // Ignore it to avoid unsubscribe failed.
-            processBackgroundEvents(unsubscribeEvent.future(), timer, e -> e 
instanceof InvalidTopicException);
-            log.info("Completed releasing assignment and sending leave group 
to close consumer");
+            applicationEventHandler.addAndGet(new 
LeaveGroupOnCloseEvent(calculateDeadlineMs(timer)));
+            log.info("Completed leaving the group");
         } catch (TimeoutException e) {
-            log.warn("Consumer triggered an unsubscribe event to leave the 
group but couldn't " +
+            log.warn("Consumer attempted to leave the group but couldn't " +
                 "complete it within {} ms. It will proceed to close.", 
timer.timeoutMs());
         } finally {
             timer.update();
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MemberStateListener.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MemberStateListener.java
index 8b977eb5c35..98b6271fcc0 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MemberStateListener.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MemberStateListener.java
@@ -17,10 +17,13 @@
 
 package org.apache.kafka.clients.consumer.internals;
 
+import org.apache.kafka.common.TopicPartition;
+
 import java.util.Optional;
+import java.util.Set;
 
 /**
- * Listener for getting notified of member epoch changes.
+ * Listener for getting notified of membership state changes.
  */
 public interface MemberStateListener {
 
@@ -34,4 +37,14 @@ public interface MemberStateListener {
      * @param memberId    Current member ID. It won't change until the process 
is terminated.
      */
     void onMemberEpochUpdated(Optional<Integer> memberEpoch, String memberId);
+
+    /**
+     * This callback is invoked when a group member's assigned set of 
partitions changes. Assignments can change via
+     * group coordinator partition assignment changes, unsubscribing, and when 
leaving the group.
+     *
+     * @param partitions New assignment, can be empty, but not {@code null}
+     */
+    default void onGroupAssignmentUpdated(Set<TopicPartition> partitions) {
+
+    }
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
index 310c7a3b8b1..8871694db45 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -100,7 +100,7 @@ public class SubscriptionState {
     private final OffsetResetStrategy defaultResetStrategy;
 
     /* User-provided listener to be invoked when assignment changes */
-    private Optional<ConsumerRebalanceListener> rebalanceListener;
+    private Optional<ConsumerRebalanceListener> rebalanceListener = 
Optional.empty();
 
     private int assignmentId = 0;
 
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
index e11e702388c..9bd229a3fe8 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
@@ -33,7 +33,7 @@ public abstract class ApplicationEvent {
         LIST_OFFSETS, CHECK_AND_UPDATE_POSITIONS, RESET_OFFSET, 
TOPIC_METADATA, ALL_TOPICS_METADATA,
         TOPIC_SUBSCRIPTION_CHANGE, TOPIC_PATTERN_SUBSCRIPTION_CHANGE, 
UPDATE_SUBSCRIPTION_METADATA,
         UNSUBSCRIBE, CONSUMER_REBALANCE_LISTENER_CALLBACK_COMPLETED,
-        COMMIT_ON_CLOSE, CREATE_FETCH_REQUESTS,
+        COMMIT_ON_CLOSE, CREATE_FETCH_REQUESTS, LEAVE_GROUP_ON_CLOSE,
         SHARE_FETCH, SHARE_ACKNOWLEDGE_ASYNC, SHARE_ACKNOWLEDGE_SYNC,
         SHARE_SUBSCRIPTION_CHANGE, SHARE_UNSUBSCRIBE,
         SHARE_ACKNOWLEDGE_ON_CLOSE,
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
index 642e5e0cca2..7bd2d1f28b7 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
@@ -136,6 +136,10 @@ public class ApplicationEventProcessor implements EventProcessor<ApplicationEven
                 process((CommitOnCloseEvent) event);
                 return;
 
+            case LEAVE_GROUP_ON_CLOSE:
+                process((LeaveGroupOnCloseEvent) event);
+                return;
+
             case CREATE_FETCH_REQUESTS:
                 process((CreateFetchRequestsEvent) event);
                 return;
@@ -394,6 +398,15 @@ public class ApplicationEventProcessor implements EventProcessor<ApplicationEven
         requestManagers.commitRequestManager.get().signalClose();
     }
 
+    private void process(final LeaveGroupOnCloseEvent event) {
+        if (!requestManagers.consumerMembershipManager.isPresent())
+            return;
+
+        log.debug("Signal the ConsumerMembershipManager to leave the consumer 
group since the consumer is closing");
+        CompletableFuture<Void> future = 
requestManagers.consumerMembershipManager.get().leaveGroupOnClose();
+        future.whenComplete(complete(event.future()));
+    }
+
     /**
      * Process event that tells the share consume request manager to fetch 
more records.
      */
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/LeaveGroupOnCloseEvent.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/LeaveGroupOnCloseEvent.java
new file mode 100644
index 00000000000..4afc00390d4
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/LeaveGroupOnCloseEvent.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals.events;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.internals.ConsumerMembershipManager;
+import org.apache.kafka.clients.consumer.internals.ConsumerUtils;
+
+import java.time.Duration;
+
+/**
+ * When the user calls {@link Consumer#close()}, this event is sent to signal the {@link ConsumerMembershipManager}
+ * to perform the necessary steps to leave the consumer group cleanly, if possible. The event's timeout is based on
+ * either the user-provided value to {@link Consumer#close(Duration)} or
+ * {@link ConsumerUtils#DEFAULT_CLOSE_TIMEOUT_MS} if {@link Consumer#close()} was called. The event is considered
+ * complete when the membership manager receives the heartbeat response that it has left the group.
+ */
+public class LeaveGroupOnCloseEvent extends CompletableApplicationEvent<Void> {
+
+    public LeaveGroupOnCloseEvent(final long deadlineMs) {
+        super(Type.LEAVE_GROUP_ON_CLOSE, deadlineMs);
+    }
+}
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
index 54a41587b06..eaf974b91c4 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
@@ -44,6 +44,7 @@ import 
org.apache.kafka.clients.consumer.internals.events.CreateFetchRequestsEve
 import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
 import org.apache.kafka.clients.consumer.internals.events.EventProcessor;
 import 
org.apache.kafka.clients.consumer.internals.events.FetchCommittedOffsetsEvent;
+import 
org.apache.kafka.clients.consumer.internals.events.LeaveGroupOnCloseEvent;
 import org.apache.kafka.clients.consumer.internals.events.ListOffsetsEvent;
 import org.apache.kafka.clients.consumer.internals.events.PollEvent;
 import org.apache.kafka.clients.consumer.internals.events.ResetOffsetEvent;
@@ -80,6 +81,7 @@ import org.junit.jupiter.api.function.Executable;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
+import org.junit.jupiter.params.provider.ValueSource;
 import org.mockito.ArgumentCaptor;
 import org.mockito.ArgumentMatchers;
 import org.mockito.MockedStatic;
@@ -636,12 +638,13 @@ public class AsyncKafkaConsumerTest {
         completeUnsubscribeApplicationEventSuccessfully();
         doReturn(null).when(applicationEventHandler).addAndGet(any());
         consumer.close();
-        verify(applicationEventHandler).add(any(UnsubscribeEvent.class));
         verify(applicationEventHandler).add(any(CommitOnCloseEvent.class));
+        
verify(applicationEventHandler).addAndGet(any(LeaveGroupOnCloseEvent.class));
     }
 
-    @Test
-    public void testUnsubscribeOnClose() {
+    @ParameterizedTest
+    @ValueSource(longs = {0, ConsumerUtils.DEFAULT_CLOSE_TIMEOUT_MS})
+    public void testCloseLeavesGroup(long timeoutMs) {
         SubscriptionState subscriptions = mock(SubscriptionState.class);
         consumer = spy(newConsumer(
             mock(FetchBuffer.class),
@@ -651,29 +654,64 @@ public class AsyncKafkaConsumerTest {
             "group-id",
             "client-id",
             false));
-        completeUnsubscribeApplicationEventSuccessfully();
-        consumer.close(Duration.ZERO);
-        verifyUnsubscribeEvent(subscriptions);
+        consumer.close(Duration.ofMillis(timeoutMs));
+        
verify(applicationEventHandler).addAndGet(any(LeaveGroupOnCloseEvent.class));
     }
 
     @Test
-    public void testFailedPartitionRevocationOnClose() {
+    public void testCloseLeavesGroupDespiteOnPartitionsLostError() {
         // If rebalance listener failed to execute during close, we still send 
the leave group,
         // and proceed with closing the consumer.
+        Throwable rootError = new KafkaException("Intentional error");
+        Set<TopicPartition> partitions = singleton(new 
TopicPartition("topic1", 0));
         SubscriptionState subscriptions = mock(SubscriptionState.class);
+        when(subscriptions.assignedPartitions()).thenReturn(partitions);
+        ConsumerRebalanceListenerInvoker invoker = 
mock(ConsumerRebalanceListenerInvoker.class);
+        doAnswer(invocation -> 
rootError).when(invoker).invokePartitionsLost(any(SortedSet.class));
+
         consumer = spy(newConsumer(
             mock(FetchBuffer.class),
             new ConsumerInterceptors<>(Collections.emptyList()),
+            invoker,
+            subscriptions,
+            "group-id",
+            "client-id",
+            false));
+        consumer.setGroupAssignmentSnapshot(partitions);
+
+        Throwable t = assertThrows(KafkaException.class, () -> 
consumer.close(Duration.ZERO));
+        assertNotNull(t.getCause());
+        assertEquals(rootError, t.getCause());
+
+        
verify(applicationEventHandler).addAndGet(any(LeaveGroupOnCloseEvent.class));
+    }
+
+    @ParameterizedTest
+    @ValueSource(longs = {0, ConsumerUtils.DEFAULT_CLOSE_TIMEOUT_MS})
+    public void testCloseLeavesGroupDespiteInterrupt(long timeoutMs) {
+        Set<TopicPartition> partitions = singleton(new 
TopicPartition("topic1", 0));
+        SubscriptionState subscriptions = mock(SubscriptionState.class);
+        when(subscriptions.assignedPartitions()).thenReturn(partitions);
+        
when(applicationEventHandler.addAndGet(any(CompletableApplicationEvent.class))).thenThrow(InterruptException.class);
+        consumer = spy(newConsumer(
+            mock(FetchBuffer.class),
+            mock(ConsumerInterceptors.class),
             mock(ConsumerRebalanceListenerInvoker.class),
             subscriptions,
             "group-id",
             "client-id",
             false));
-        doThrow(new 
KafkaException()).when(consumer).processBackgroundEvents(any(), any(), any());
-        assertThrows(KafkaException.class, () -> 
consumer.close(Duration.ZERO));
-        verifyUnsubscribeEvent(subscriptions);
-        // Close operation should carry on even if the unsubscribe fails
-        verify(applicationEventHandler).close(any(Duration.class));
+
+        Duration timeout = Duration.ofMillis(timeoutMs);
+
+        try {
+            assertThrows(InterruptException.class, () -> 
consumer.close(timeout));
+        } finally {
+            Thread.interrupted();
+        }
+
+        verify(applicationEventHandler).add(any(CommitOnCloseEvent.class));
+        
verify(applicationEventHandler).addAndGet(any(LeaveGroupOnCloseEvent.class));
     }
 
     @Test
@@ -1577,9 +1615,11 @@ public class AsyncKafkaConsumerTest {
         final OffsetAndMetadata nextOffsetAndMetadata = new 
OffsetAndMetadata(4, Optional.of(0), "");
 
         // On the first iteration, return no data; on the second, return two 
records
+        Set<TopicPartition> partitions = singleton(tp);
         doAnswer(invocation -> {
             // Mock the subscription being assigned as the first fetch is 
collected
-            
consumer.subscriptions().assignFromSubscribed(Collections.singleton(tp));
+            consumer.subscriptions().assignFromSubscribed(partitions);
+            consumer.setGroupAssignmentSnapshot(partitions);
             return Fetch.empty();
         }).doAnswer(invocation ->
             Fetch.forPartition(tp, records, true, nextOffsetAndMetadata)
@@ -1593,7 +1633,7 @@ public class AsyncKafkaConsumerTest {
         assertEquals(Optional.of(0), 
returnedRecords.nextOffsets().get(tp).leaderEpoch());
 
         assertEquals(singleton(topicName), consumer.subscription());
-        assertEquals(singleton(tp), consumer.assignment());
+        assertEquals(partitions, consumer.assignment());
     }
 
     /**
@@ -1771,18 +1811,6 @@ public class AsyncKafkaConsumerTest {
         assertEquals(OffsetResetStrategy.LATEST, 
resetOffsetEvent.offsetResetStrategy());
     }
 
-    private void verifyUnsubscribeEvent(SubscriptionState subscriptions) {
-        // Check that an unsubscribe event was generated, and that the 
consumer waited for it to
-        // complete processing background events.
-        verify(applicationEventHandler).add(any(UnsubscribeEvent.class));
-        verify(consumer).processBackgroundEvents(any(), any(), any());
-
-        // The consumer should not clear the assignment in the app thread. The 
unsubscribe
-        // event is the one responsible for updating the assignment in the 
background when it
-        // completes.
-        verify(subscriptions, never()).assignFromSubscribed(any());
-    }
-
     private Map<TopicPartition, OffsetAndMetadata> mockTopicPartitionOffset() {
         final TopicPartition t0 = new TopicPartition("t0", 2);
         final TopicPartition t1 = new TopicPartition("t0", 3);
@@ -1875,7 +1903,8 @@ public class AsyncKafkaConsumerTest {
     private void completeAssignmentChangeEventSuccessfully() {
         doAnswer(invocation -> {
             AssignmentChangeEvent event = invocation.getArgument(0);
-            consumer.subscriptions().assignFromUser(new 
HashSet<>(event.partitions()));
+            HashSet<TopicPartition> partitions = new 
HashSet<>(event.partitions());
+            consumer.subscriptions().assignFromUser(partitions);
             event.future().complete(null);
             return null;
         
}).when(applicationEventHandler).addAndGet(ArgumentMatchers.isA(AssignmentChangeEvent.class));
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
index 6ce0f6c00de..cbf763ec581 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
@@ -24,7 +24,7 @@ import org.apache.kafka.clients.admin.{NewPartitions, NewTopic}
 import org.apache.kafka.clients.consumer._
 import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
 import org.apache.kafka.common.config.TopicConfig
-import org.apache.kafka.common.errors.{InvalidGroupIdException, 
InvalidTopicException, TimeoutException, WakeupException}
+import org.apache.kafka.common.errors.{InterruptException, 
InvalidGroupIdException, InvalidTopicException, TimeoutException, 
WakeupException}
 import org.apache.kafka.common.record.{CompressionType, TimestampType}
 import org.apache.kafka.common.serialization._
 import org.apache.kafka.common.{MetricName, TopicPartition}
@@ -35,7 +35,7 @@ import org.junit.jupiter.api.Timeout
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.MethodSource
 
-import java.util.concurrent.{CompletableFuture, TimeUnit}
+import java.util.concurrent.{CompletableFuture, ExecutionException, TimeUnit}
 import scala.jdk.CollectionConverters._
 
 @Timeout(600)
@@ -824,4 +824,49 @@ class PlaintextConsumerTest extends BaseConsumerTest {
 
     assertThrows(classOf[WakeupException], () => 
consumer.position(topicPartition, Duration.ofSeconds(100)))
   }
+
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testCloseLeavesGroupOnInterrupt(quorum: String, groupProtocol: String): 
Unit = {
+    val adminClient = createAdminClient()
+    val consumer = createConsumer()
+    val listener = new TestConsumerReassignmentListener()
+    consumer.subscribe(List(topic).asJava, listener)
+    awaitRebalance(consumer, listener)
+
+    assertEquals(1, listener.callsToAssigned)
+    assertEquals(0, listener.callsToRevoked)
+
+    try {
+      Thread.currentThread().interrupt()
+      assertThrows(classOf[InterruptException], () => consumer.close())
+    } finally {
+      // Clear the interrupted flag so we don't create problems for subsequent 
tests.
+      Thread.interrupted()
+    }
+
+    assertEquals(1, listener.callsToAssigned)
+    assertEquals(1, listener.callsToRevoked)
+
+    val config = new ConsumerConfig(consumerConfig)
+
+    // Set the wait timeout to be only *half* the configured session timeout. This way we can make sure that the
+    // consumer explicitly left the group as opposed to being kicked out by the broker.
+    val leaveGroupTimeoutMs = 
config.getInt(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG) / 2
+
+    TestUtils.waitUntilTrue(
+      () => {
+        try {
+          val groupId = config.getString(ConsumerConfig.GROUP_ID_CONFIG)
+          val groupDescription = adminClient.describeConsumerGroups 
(Collections.singletonList (groupId) ).describedGroups.get (groupId).get
+          groupDescription.members.isEmpty
+        } catch {
+          case _: ExecutionException | _: InterruptedException =>
+            false
+        }
+      },
+      msg=s"Consumer did not leave the consumer group within 
$leaveGroupTimeoutMs ms of close",
+      waitTimeMs=leaveGroupTimeoutMs
+    )
+  }
 }


Reply via email to