This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b015a83f6d2 KAFKA-17017 AsyncKafkaConsumer#unsubscribe clean the assigned partitions (#16449)
b015a83f6d2 is described below

commit b015a83f6d286965af999c39c0902584fb6fb9de
Author: PoAn Yang <[email protected]>
AuthorDate: Wed Jul 17 04:18:33 2024 +0800

    KAFKA-17017 AsyncKafkaConsumer#unsubscribe clean the assigned partitions (#16449)
    
    Reviewers: Lianet Magrans <[email protected]>, Chia-Ping Tsai <[email protected]>
---
 .../consumer/internals/AsyncKafkaConsumer.java     | 24 ++++----
 .../consumer/internals/MembershipManagerImpl.java  |  1 +
 .../consumer/internals/ShareConsumerImpl.java      |  2 +
 .../events/ApplicationEventProcessor.java          | 25 ++++++---
 .../kafka/clients/consumer/KafkaConsumerTest.java  |  2 +-
 .../consumer/internals/AsyncKafkaConsumerTest.java |  9 +++
 .../consumer/internals/ConsumerTestBuilder.java    |  3 +-
 .../internals/MembershipManagerImplTest.java       | 19 +++++++
 .../internals/ShareConsumerTestBuilder.java        |  3 +-
 .../events/ApplicationEventProcessorTest.java      | 64 +++++++++++++---------
 .../kafka/api/PlaintextConsumerTest.scala          |  4 ++
 11 files changed, 105 insertions(+), 51 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
index 056c837f80d..cca3fbc44b8 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
@@ -354,6 +354,7 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
             );
             final Supplier<ApplicationEventProcessor> 
applicationEventProcessorSupplier = 
ApplicationEventProcessor.supplier(logContext,
                     metadata,
+                    subscriptions,
                     requestManagersSupplier);
             this.applicationEventHandler = 
applicationEventHandlerFactory.build(
                     logContext,
@@ -531,6 +532,7 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
         Supplier<ApplicationEventProcessor> applicationEventProcessorSupplier 
= ApplicationEventProcessor.supplier(
                 logContext,
                 metadata,
+                subscriptions,
                 requestManagersSupplier
         );
         this.applicationEventHandler = new ApplicationEventHandler(logContext,
@@ -1476,21 +1478,19 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
         acquireAndEnsureOpen();
         try {
             fetchBuffer.retainAll(Collections.emptySet());
-            if (groupMetadata.get().isPresent()) {
-                Timer timer = time.timer(Long.MAX_VALUE);
-                UnsubscribeEvent unsubscribeEvent = new 
UnsubscribeEvent(calculateDeadlineMs(timer));
-                applicationEventHandler.add(unsubscribeEvent);
-                log.info("Unsubscribing all topics or patterns and assigned 
partitions {}",
+            Timer timer = time.timer(Long.MAX_VALUE);
+            UnsubscribeEvent unsubscribeEvent = new 
UnsubscribeEvent(calculateDeadlineMs(timer));
+            applicationEventHandler.add(unsubscribeEvent);
+            log.info("Unsubscribing all topics or patterns and assigned 
partitions {}",
                     subscriptions.assignedPartitions());
 
-                try {
-                    processBackgroundEvents(unsubscribeEvent.future(), timer);
-                    log.info("Unsubscribed all topics or patterns and assigned 
partitions");
-                } catch (TimeoutException e) {
-                    log.error("Failed while waiting for the unsubscribe event 
to complete");
-                }
-                resetGroupMetadata();
+            try {
+                processBackgroundEvents(unsubscribeEvent.future(), timer);
+                log.info("Unsubscribed all topics or patterns and assigned 
partitions");
+            } catch (TimeoutException e) {
+                log.error("Failed while waiting for the unsubscribe event to 
complete");
             }
+            resetGroupMetadata();
         } catch (Exception e) {
             log.error("Unsubscribe failed", e);
             throw e;
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java
index 9f0dfda6a2f..702460e3493 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java
@@ -663,6 +663,7 @@ public class MembershipManagerImpl implements 
MembershipManager {
                 clearAssignment();
                 transitionTo(MemberState.UNSUBSCRIBED);
             }
+            subscriptions.unsubscribe();
             return CompletableFuture.completedFuture(null);
         }
 
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
index f11ac6b336e..9977d61fe14 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
@@ -298,6 +298,7 @@ public class ShareConsumerImpl<K, V> implements 
ShareConsumerDelegate<K, V> {
             final Supplier<ApplicationEventProcessor> 
applicationEventProcessorSupplier = ApplicationEventProcessor.supplier(
                     logContext,
                     metadata,
+                    subscriptions,
                     requestManagersSupplier
             );
 
@@ -398,6 +399,7 @@ public class ShareConsumerImpl<K, V> implements 
ShareConsumerDelegate<K, V> {
         final Supplier<ApplicationEventProcessor> 
applicationEventProcessorSupplier = ApplicationEventProcessor.supplier(
                 logContext,
                 metadata,
+                subscriptions,
                 requestManagersSupplier
         );
 
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
index 5c4649841fd..e0c6911f679 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
@@ -27,6 +27,7 @@ import 
org.apache.kafka.clients.consumer.internals.OffsetAndTimestampInternal;
 import org.apache.kafka.clients.consumer.internals.RequestManagers;
 import org.apache.kafka.clients.consumer.internals.ShareConsumeRequestManager;
 import org.apache.kafka.clients.consumer.internals.ShareMembershipManager;
+import org.apache.kafka.clients.consumer.internals.SubscriptionState;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicIdPartition;
@@ -49,14 +50,17 @@ public class ApplicationEventProcessor implements 
EventProcessor<ApplicationEven
 
     private final Logger log;
     private final ConsumerMetadata metadata;
+    private final SubscriptionState subscriptions;
     private final RequestManagers requestManagers;
 
     public ApplicationEventProcessor(final LogContext logContext,
                                      final RequestManagers requestManagers,
-                                     final ConsumerMetadata metadata) {
+                                     final ConsumerMetadata metadata,
+                                     final SubscriptionState subscriptions) {
         this.log = logContext.logger(ApplicationEventProcessor.class);
         this.requestManagers = requestManagers;
         this.metadata = metadata;
+        this.subscriptions = subscriptions;
     }
 
     @SuppressWarnings({"CyclomaticComplexity"})
@@ -241,14 +245,15 @@ public class ApplicationEventProcessor implements 
EventProcessor<ApplicationEven
      *              the group is sent out.
      */
     private void process(final UnsubscribeEvent event) {
-        if (!requestManagers.heartbeatRequestManager.isPresent()) {
-            KafkaException error = new KafkaException("Group membership 
manager not present when processing an unsubscribe event");
-            event.future().completeExceptionally(error);
-            return;
+        if (requestManagers.heartbeatRequestManager.isPresent()) {
+            MembershipManager membershipManager = 
requestManagers.heartbeatRequestManager.get().membershipManager();
+            CompletableFuture<Void> future = membershipManager.leaveGroup();
+            future.whenComplete(complete(event.future()));
+        } else {
+            // If the consumer is not using the group management capabilities, 
we still need to clear all assignments it may have.
+            subscriptions.unsubscribe();
+            event.future().complete(null);
         }
-        MembershipManager membershipManager = 
requestManagers.heartbeatRequestManager.get().membershipManager();
-        CompletableFuture<Void> future = membershipManager.leaveGroup();
-        future.whenComplete(complete(event.future()));
     }
 
     private void process(final ResetPositionsEvent event) {
@@ -393,6 +398,7 @@ public class ApplicationEventProcessor implements 
EventProcessor<ApplicationEven
      */
     public static Supplier<ApplicationEventProcessor> supplier(final 
LogContext logContext,
                                                                final 
ConsumerMetadata metadata,
+                                                               final 
SubscriptionState subscriptions,
                                                                final 
Supplier<RequestManagers> requestManagersSupplier) {
         return new CachedSupplier<ApplicationEventProcessor>() {
             @Override
@@ -401,7 +407,8 @@ public class ApplicationEventProcessor implements 
EventProcessor<ApplicationEven
                 return new ApplicationEventProcessor(
                         logContext,
                         requestManagers,
-                        metadata
+                        metadata,
+                        subscriptions
                 );
             }
         };
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 5510e9922da..d8de05a839f 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -439,7 +439,7 @@ public class KafkaConsumerTest {
     }
 
     @ParameterizedTest
-    @EnumSource(value = GroupProtocol.class, names = "CLASSIC")
+    @EnumSource(GroupProtocol.class)
     public void testSubscription(GroupProtocol groupProtocol) {
         consumer = newConsumer(groupProtocol, groupId);
 
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
index 192f2655a65..7ef0b28108e 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
@@ -2077,6 +2077,15 @@ public class AsyncKafkaConsumerTest {
         verify(backgroundEventReaper).reap(time.milliseconds());
     }
 
+    @Test
+    public void testUnsubscribeWithoutGroupId() {
+        consumer = newConsumerWithoutGroupId();
+
+        completeUnsubscribeApplicationEventSuccessfully();
+        consumer.unsubscribe();
+        
verify(applicationEventHandler).add(ArgumentMatchers.isA(UnsubscribeEvent.class));
+    }
+
     private void verifyUnsubscribeEvent(SubscriptionState subscriptions) {
         // Check that an unsubscribe event was generated, and that the 
consumer waited for it to
         // complete processing background events.
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerTestBuilder.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerTestBuilder.java
index 86cb44d7485..0e6ff3ebd9e 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerTestBuilder.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerTestBuilder.java
@@ -271,7 +271,8 @@ public class ConsumerTestBuilder implements Closeable {
         this.applicationEventProcessor = spy(new ApplicationEventProcessor(
                 logContext,
                 requestManagers,
-                metadata
+                metadata,
+                subscriptions
             )
         );
 
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java
index c00488ef40d..5f828bcd3dd 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java
@@ -180,6 +180,7 @@ public class MembershipManagerImplTest {
         receiveEmptyAssignment(manager);
         mockLeaveGroup();
         manager.leaveGroup();
+        verify(subscriptionState).unsubscribe();
         assertEquals(MemberState.LEAVING, manager.state());
         manager.onHeartbeatRequestSent();
         assertEquals(MemberState.UNSUBSCRIBED, manager.state());
@@ -235,6 +236,9 @@ public class MembershipManagerImplTest {
         membershipManager.transitionToFatal();
         assertEquals(MemberState.FATAL, membershipManager.state());
         verify(subscriptionState).assignFromSubscribed(Collections.emptySet());
+
+        membershipManager.leaveGroup();
+        verify(subscriptionState).unsubscribe();
     }
 
     @Test
@@ -286,6 +290,7 @@ public class MembershipManagerImplTest {
 
         mockLeaveGroup();
         membershipManager.leaveGroup();
+        verify(subscriptionState).unsubscribe();
         assertEquals(MemberState.LEAVING, membershipManager.state());
         verify(listener).onMemberEpochUpdated(Optional.empty(), 
Optional.empty());
     }
@@ -394,6 +399,7 @@ public class MembershipManagerImplTest {
         // Start leaving group.
         mockLeaveGroup();
         membershipManager.leaveGroup();
+        verify(subscriptionState).unsubscribe();
         assertEquals(MemberState.LEAVING, membershipManager.state());
 
         // Get fenced while leaving. Member should not trigger any callback or 
try to
@@ -420,6 +426,7 @@ public class MembershipManagerImplTest {
         MembershipManagerImpl membershipManager = 
createMemberInStableState("instance1");
         mockLeaveGroup();
         membershipManager.leaveGroup();
+        verify(subscriptionState).unsubscribe();
         assertEquals(MemberState.LEAVING, membershipManager.state());
         
assertEquals(ConsumerGroupHeartbeatRequest.LEAVE_GROUP_STATIC_MEMBER_EPOCH,
                 membershipManager.memberEpoch());
@@ -428,6 +435,7 @@ public class MembershipManagerImplTest {
         membershipManager = createMemberInStableState(null);
         mockLeaveGroup();
         membershipManager.leaveGroup();
+        verify(subscriptionState).unsubscribe();
         assertEquals(MemberState.LEAVING, membershipManager.state());
         assertEquals(ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH,
                 membershipManager.memberEpoch());
@@ -883,6 +891,7 @@ public class MembershipManagerImplTest {
         mockLeaveGroup();
 
         CompletableFuture<Void> leaveResult = membershipManager.leaveGroup();
+        verify(subscriptionState).unsubscribe();
 
         
membershipManager.onHeartbeatSuccess(createConsumerGroupHeartbeatResponse(createAssignment(true)).data());
 
@@ -962,6 +971,7 @@ public class MembershipManagerImplTest {
         // callbacks complete and the heartbeat is sent out.
         mockLeaveGroup();
         CompletableFuture<Void> leaveResult1 = membershipManager.leaveGroup();
+        verify(subscriptionState).unsubscribe();
         assertFalse(leaveResult1.isDone());
         assertEquals(MemberState.LEAVING, membershipManager.state());
         verify(subscriptionState).assignFromSubscribed(Collections.emptySet());
@@ -972,6 +982,7 @@ public class MembershipManagerImplTest {
         // leave operation completes.
         mockLeaveGroup();
         CompletableFuture<Void> leaveResult2 = membershipManager.leaveGroup();
+        verify(subscriptionState, never()).unsubscribe();
         verify(subscriptionState, never()).rebalanceListener();
         assertFalse(leaveResult2.isDone());
 
@@ -993,6 +1004,7 @@ public class MembershipManagerImplTest {
         // Leave group triggered and completed
         mockLeaveGroup();
         CompletableFuture<Void> leaveResult1 = membershipManager.leaveGroup();
+        verify(subscriptionState).unsubscribe();
         assertEquals(MemberState.LEAVING, membershipManager.state());
         membershipManager.onHeartbeatRequestSent();
         assertEquals(MemberState.UNSUBSCRIBED, membershipManager.state());
@@ -1005,6 +1017,7 @@ public class MembershipManagerImplTest {
         // no assignment updated)
         mockLeaveGroup();
         CompletableFuture<Void> leaveResult2 = membershipManager.leaveGroup();
+        verify(subscriptionState).unsubscribe();
         assertTrue(leaveResult2.isDone());
         assertFalse(leaveResult2.isCompletedExceptionally());
         assertEquals(MemberState.UNSUBSCRIBED, membershipManager.state());
@@ -1021,6 +1034,7 @@ public class MembershipManagerImplTest {
 
         mockLeaveGroup();
         membershipManager.leaveGroup();
+        verify(subscriptionState).unsubscribe();
         assertEquals(MemberState.UNSUBSCRIBED, membershipManager.state());
         verify(subscriptionState).assignFromSubscribed(Collections.emptySet());
 
@@ -1035,6 +1049,7 @@ public class MembershipManagerImplTest {
 
         mockLeaveGroup();
         CompletableFuture<Void> leaveResult1 = membershipManager.leaveGroup();
+        verify(subscriptionState).unsubscribe();
         assertTrue(leaveResult1.isDone());
         assertEquals(MemberState.STALE, membershipManager.state());
     }
@@ -1076,6 +1091,7 @@ public class MembershipManagerImplTest {
         // Start leaving group.
         mockLeaveGroup();
         membershipManager.leaveGroup();
+        verify(subscriptionState).unsubscribe();
         assertEquals(MemberState.LEAVING, membershipManager.state());
 
         // Get fatal failure while waiting to send the heartbeat to leave. 
Member should
@@ -1095,6 +1111,7 @@ public class MembershipManagerImplTest {
         // Start leaving group.
         mockLeaveGroup();
         membershipManager.leaveGroup();
+        verify(subscriptionState).unsubscribe();
         assertEquals(MemberState.LEAVING, membershipManager.state());
 
         // Last heartbeat sent.
@@ -2121,6 +2138,7 @@ public class MembershipManagerImplTest {
         assertTrue(membershipManager.currentAssignment().isNone());
         assertTrue(membershipManager.topicsAwaitingReconciliation().isEmpty());
         assertEquals(LEAVE_GROUP_MEMBER_EPOCH, 
membershipManager.memberEpoch());
+        verify(subscriptionState, never()).unsubscribe();
     }
 
     @Test
@@ -2665,6 +2683,7 @@ public class MembershipManagerImplTest {
         mockLeaveGroup();
 
         CompletableFuture<Void> leaveResult = membershipManager.leaveGroup();
+        verify(subscriptionState).unsubscribe();
 
         assertEquals(MemberState.LEAVING, membershipManager.state());
         assertFalse(leaveResult.isDone(), "Leave group result should not 
complete until the " +
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerTestBuilder.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerTestBuilder.java
index 25c9f5a4a88..ecdd3297de1 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerTestBuilder.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerTestBuilder.java
@@ -213,7 +213,8 @@ public class ShareConsumerTestBuilder implements Closeable {
         this.applicationEventProcessor = spy(new ApplicationEventProcessor(
                         logContext,
                         requestManagers,
-                        metadata
+                        metadata,
+                        subscriptions
                 )
         );
     }
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java
index 9edfab9a0fb..bb0719080c3 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java
@@ -25,62 +25,72 @@ import 
org.apache.kafka.clients.consumer.internals.MembershipManager;
 import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate;
 import org.apache.kafka.clients.consumer.internals.OffsetsRequestManager;
 import org.apache.kafka.clients.consumer.internals.RequestManagers;
+import org.apache.kafka.clients.consumer.internals.SubscriptionState;
 import org.apache.kafka.clients.consumer.internals.TopicMetadataRequestManager;
 import org.apache.kafka.common.utils.LogContext;
 
-import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
 
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 public class ApplicationEventProcessorTest {
-    private final ConsumerMetadata metadata = mock(ConsumerMetadata.class);
+    private final CommitRequestManager commitRequestManager = 
mock(CommitRequestManager.class);
+    private final HeartbeatRequestManager heartbeatRequestManager = 
mock(HeartbeatRequestManager.class);
+    private final MembershipManager membershipManager = 
mock(MembershipManager.class);
+    private final SubscriptionState subscriptionState = 
mock(SubscriptionState.class);
     private ApplicationEventProcessor processor;
-    private CommitRequestManager commitRequestManager;
-    private HeartbeatRequestManager heartbeatRequestManager;
-    private MembershipManager membershipManager;
 
-    @BeforeEach
-    public void setup() {
-        LogContext logContext = new LogContext();
-        OffsetsRequestManager offsetsRequestManager = 
mock(OffsetsRequestManager.class);
-        TopicMetadataRequestManager topicMetadataRequestManager = 
mock(TopicMetadataRequestManager.class);
-        FetchRequestManager fetchRequestManager = 
mock(FetchRequestManager.class);
-        CoordinatorRequestManager coordinatorRequestManager = 
mock(CoordinatorRequestManager.class);
-        commitRequestManager = mock(CommitRequestManager.class);
-        heartbeatRequestManager = mock(HeartbeatRequestManager.class);
-        membershipManager = mock(MembershipManager.class);
+    private void setupProcessor(boolean withGroupId) {
         RequestManagers requestManagers = new RequestManagers(
-            logContext,
-            offsetsRequestManager,
-            topicMetadataRequestManager,
-            fetchRequestManager,
-            Optional.of(coordinatorRequestManager),
-            Optional.of(commitRequestManager),
-            Optional.of(heartbeatRequestManager),
-            Optional.of(membershipManager)
-        );
+                new LogContext(),
+                mock(OffsetsRequestManager.class),
+                mock(TopicMetadataRequestManager.class),
+                mock(FetchRequestManager.class),
+                withGroupId ? 
Optional.of(mock(CoordinatorRequestManager.class)) : Optional.empty(),
+                withGroupId ? Optional.of(commitRequestManager) : 
Optional.empty(),
+                withGroupId ? Optional.of(heartbeatRequestManager) : 
Optional.empty(),
+                withGroupId ? Optional.of(membershipManager) : 
Optional.empty());
         processor = new ApplicationEventProcessor(
-            new LogContext(),
-            requestManagers,
-            metadata
+                new LogContext(),
+                requestManagers,
+                mock(ConsumerMetadata.class),
+                subscriptionState
         );
     }
 
     @Test
     public void testPrepClosingCommitEvents() {
+        setupProcessor(true);
         List<NetworkClientDelegate.UnsentRequest> results = 
mockCommitResults();
         doReturn(new NetworkClientDelegate.PollResult(100, 
results)).when(commitRequestManager).pollOnClose();
         processor.process(new CommitOnCloseEvent());
         verify(commitRequestManager).signalClose();
     }
 
+    @Test
+    public void testProcessUnsubscribeEventWithGroupId() {
+        setupProcessor(true);
+        
when(heartbeatRequestManager.membershipManager()).thenReturn(membershipManager);
+        
when(membershipManager.leaveGroup()).thenReturn(CompletableFuture.completedFuture(null));
+        processor.process(new UnsubscribeEvent(0));
+        verify(membershipManager).leaveGroup();
+    }
+
+    @Test
+    public void testProcessUnsubscribeEventWithoutGroupId() {
+        setupProcessor(false);
+        processor.process(new UnsubscribeEvent(0));
+        verify(subscriptionState).unsubscribe();
+    }
+
     private List<NetworkClientDelegate.UnsentRequest> mockCommitResults() {
         return 
Collections.singletonList(mock(NetworkClientDelegate.UnsentRequest.class));
     }
diff --git 
a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala 
b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
index b4617af1503..0fbee055025 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
@@ -693,6 +693,10 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     consumer2.unsubscribe()
     consumer3.unsubscribe()
 
+    assertTrue(consumer1.assignment().isEmpty)
+    assertTrue(consumer2.assignment().isEmpty)
+    assertTrue(consumer3.assignment().isEmpty)
+
     consumer1.close()
     consumer2.close()
     consumer3.close()

Reply via email to