This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new b015a83f6d2 KAFKA-17017 AsyncKafkaConsumer#unsubscribe clean the
assigned partitions (#16449)
b015a83f6d2 is described below
commit b015a83f6d286965af999c39c0902584fb6fb9de
Author: PoAn Yang <[email protected]>
AuthorDate: Wed Jul 17 04:18:33 2024 +0800
KAFKA-17017 AsyncKafkaConsumer#unsubscribe clean the assigned partitions
(#16449)
Reviewers: Lianet Magrans <[email protected]>, Chia-Ping Tsai
<[email protected]>
---
.../consumer/internals/AsyncKafkaConsumer.java | 24 ++++----
.../consumer/internals/MembershipManagerImpl.java | 1 +
.../consumer/internals/ShareConsumerImpl.java | 2 +
.../events/ApplicationEventProcessor.java | 25 ++++++---
.../kafka/clients/consumer/KafkaConsumerTest.java | 2 +-
.../consumer/internals/AsyncKafkaConsumerTest.java | 9 +++
.../consumer/internals/ConsumerTestBuilder.java | 3 +-
.../internals/MembershipManagerImplTest.java | 19 +++++++
.../internals/ShareConsumerTestBuilder.java | 3 +-
.../events/ApplicationEventProcessorTest.java | 64 +++++++++++++---------
.../kafka/api/PlaintextConsumerTest.scala | 4 ++
11 files changed, 105 insertions(+), 51 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
index 056c837f80d..cca3fbc44b8 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
@@ -354,6 +354,7 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
);
final Supplier<ApplicationEventProcessor>
applicationEventProcessorSupplier =
ApplicationEventProcessor.supplier(logContext,
metadata,
+ subscriptions,
requestManagersSupplier);
this.applicationEventHandler =
applicationEventHandlerFactory.build(
logContext,
@@ -531,6 +532,7 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
Supplier<ApplicationEventProcessor> applicationEventProcessorSupplier
= ApplicationEventProcessor.supplier(
logContext,
metadata,
+ subscriptions,
requestManagersSupplier
);
this.applicationEventHandler = new ApplicationEventHandler(logContext,
@@ -1476,21 +1478,19 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
acquireAndEnsureOpen();
try {
fetchBuffer.retainAll(Collections.emptySet());
- if (groupMetadata.get().isPresent()) {
- Timer timer = time.timer(Long.MAX_VALUE);
- UnsubscribeEvent unsubscribeEvent = new
UnsubscribeEvent(calculateDeadlineMs(timer));
- applicationEventHandler.add(unsubscribeEvent);
- log.info("Unsubscribing all topics or patterns and assigned
partitions {}",
+ Timer timer = time.timer(Long.MAX_VALUE);
+ UnsubscribeEvent unsubscribeEvent = new
UnsubscribeEvent(calculateDeadlineMs(timer));
+ applicationEventHandler.add(unsubscribeEvent);
+ log.info("Unsubscribing all topics or patterns and assigned
partitions {}",
subscriptions.assignedPartitions());
- try {
- processBackgroundEvents(unsubscribeEvent.future(), timer);
- log.info("Unsubscribed all topics or patterns and assigned
partitions");
- } catch (TimeoutException e) {
- log.error("Failed while waiting for the unsubscribe event
to complete");
- }
- resetGroupMetadata();
+ try {
+ processBackgroundEvents(unsubscribeEvent.future(), timer);
+ log.info("Unsubscribed all topics or patterns and assigned
partitions");
+ } catch (TimeoutException e) {
+ log.error("Failed while waiting for the unsubscribe event to
complete");
}
+ resetGroupMetadata();
} catch (Exception e) {
log.error("Unsubscribe failed", e);
throw e;
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java
index 9f0dfda6a2f..702460e3493 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java
@@ -663,6 +663,7 @@ public class MembershipManagerImpl implements
MembershipManager {
clearAssignment();
transitionTo(MemberState.UNSUBSCRIBED);
}
+ subscriptions.unsubscribe();
return CompletableFuture.completedFuture(null);
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
index f11ac6b336e..9977d61fe14 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
@@ -298,6 +298,7 @@ public class ShareConsumerImpl<K, V> implements
ShareConsumerDelegate<K, V> {
final Supplier<ApplicationEventProcessor>
applicationEventProcessorSupplier = ApplicationEventProcessor.supplier(
logContext,
metadata,
+ subscriptions,
requestManagersSupplier
);
@@ -398,6 +399,7 @@ public class ShareConsumerImpl<K, V> implements
ShareConsumerDelegate<K, V> {
final Supplier<ApplicationEventProcessor>
applicationEventProcessorSupplier = ApplicationEventProcessor.supplier(
logContext,
metadata,
+ subscriptions,
requestManagersSupplier
);
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
index 5c4649841fd..e0c6911f679 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
@@ -27,6 +27,7 @@ import
org.apache.kafka.clients.consumer.internals.OffsetAndTimestampInternal;
import org.apache.kafka.clients.consumer.internals.RequestManagers;
import org.apache.kafka.clients.consumer.internals.ShareConsumeRequestManager;
import org.apache.kafka.clients.consumer.internals.ShareMembershipManager;
+import org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicIdPartition;
@@ -49,14 +50,17 @@ public class ApplicationEventProcessor implements
EventProcessor<ApplicationEven
private final Logger log;
private final ConsumerMetadata metadata;
+ private final SubscriptionState subscriptions;
private final RequestManagers requestManagers;
public ApplicationEventProcessor(final LogContext logContext,
final RequestManagers requestManagers,
- final ConsumerMetadata metadata) {
+ final ConsumerMetadata metadata,
+ final SubscriptionState subscriptions) {
this.log = logContext.logger(ApplicationEventProcessor.class);
this.requestManagers = requestManagers;
this.metadata = metadata;
+ this.subscriptions = subscriptions;
}
@SuppressWarnings({"CyclomaticComplexity"})
@@ -241,14 +245,15 @@ public class ApplicationEventProcessor implements
EventProcessor<ApplicationEven
* the group is sent out.
*/
private void process(final UnsubscribeEvent event) {
- if (!requestManagers.heartbeatRequestManager.isPresent()) {
- KafkaException error = new KafkaException("Group membership
manager not present when processing an unsubscribe event");
- event.future().completeExceptionally(error);
- return;
+ if (requestManagers.heartbeatRequestManager.isPresent()) {
+ MembershipManager membershipManager =
requestManagers.heartbeatRequestManager.get().membershipManager();
+ CompletableFuture<Void> future = membershipManager.leaveGroup();
+ future.whenComplete(complete(event.future()));
+ } else {
+ // If the consumer is not using the group management capabilities,
we still need to clear all assignments it may have.
+ subscriptions.unsubscribe();
+ event.future().complete(null);
}
- MembershipManager membershipManager =
requestManagers.heartbeatRequestManager.get().membershipManager();
- CompletableFuture<Void> future = membershipManager.leaveGroup();
- future.whenComplete(complete(event.future()));
}
private void process(final ResetPositionsEvent event) {
@@ -393,6 +398,7 @@ public class ApplicationEventProcessor implements
EventProcessor<ApplicationEven
*/
public static Supplier<ApplicationEventProcessor> supplier(final
LogContext logContext,
final
ConsumerMetadata metadata,
+ final
SubscriptionState subscriptions,
final
Supplier<RequestManagers> requestManagersSupplier) {
return new CachedSupplier<ApplicationEventProcessor>() {
@Override
@@ -401,7 +407,8 @@ public class ApplicationEventProcessor implements
EventProcessor<ApplicationEven
return new ApplicationEventProcessor(
logContext,
requestManagers,
- metadata
+ metadata,
+ subscriptions
);
}
};
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 5510e9922da..d8de05a839f 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -439,7 +439,7 @@ public class KafkaConsumerTest {
}
@ParameterizedTest
- @EnumSource(value = GroupProtocol.class, names = "CLASSIC")
+ @EnumSource(GroupProtocol.class)
public void testSubscription(GroupProtocol groupProtocol) {
consumer = newConsumer(groupProtocol, groupId);
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
index 192f2655a65..7ef0b28108e 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
@@ -2077,6 +2077,15 @@ public class AsyncKafkaConsumerTest {
verify(backgroundEventReaper).reap(time.milliseconds());
}
+ @Test
+ public void testUnsubscribeWithoutGroupId() {
+ consumer = newConsumerWithoutGroupId();
+
+ completeUnsubscribeApplicationEventSuccessfully();
+ consumer.unsubscribe();
+
verify(applicationEventHandler).add(ArgumentMatchers.isA(UnsubscribeEvent.class));
+ }
+
private void verifyUnsubscribeEvent(SubscriptionState subscriptions) {
// Check that an unsubscribe event was generated, and that the
consumer waited for it to
// complete processing background events.
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerTestBuilder.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerTestBuilder.java
index 86cb44d7485..0e6ff3ebd9e 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerTestBuilder.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerTestBuilder.java
@@ -271,7 +271,8 @@ public class ConsumerTestBuilder implements Closeable {
this.applicationEventProcessor = spy(new ApplicationEventProcessor(
logContext,
requestManagers,
- metadata
+ metadata,
+ subscriptions
)
);
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java
index c00488ef40d..5f828bcd3dd 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java
@@ -180,6 +180,7 @@ public class MembershipManagerImplTest {
receiveEmptyAssignment(manager);
mockLeaveGroup();
manager.leaveGroup();
+ verify(subscriptionState).unsubscribe();
assertEquals(MemberState.LEAVING, manager.state());
manager.onHeartbeatRequestSent();
assertEquals(MemberState.UNSUBSCRIBED, manager.state());
@@ -235,6 +236,9 @@ public class MembershipManagerImplTest {
membershipManager.transitionToFatal();
assertEquals(MemberState.FATAL, membershipManager.state());
verify(subscriptionState).assignFromSubscribed(Collections.emptySet());
+
+ membershipManager.leaveGroup();
+ verify(subscriptionState).unsubscribe();
}
@Test
@@ -286,6 +290,7 @@ public class MembershipManagerImplTest {
mockLeaveGroup();
membershipManager.leaveGroup();
+ verify(subscriptionState).unsubscribe();
assertEquals(MemberState.LEAVING, membershipManager.state());
verify(listener).onMemberEpochUpdated(Optional.empty(),
Optional.empty());
}
@@ -394,6 +399,7 @@ public class MembershipManagerImplTest {
// Start leaving group.
mockLeaveGroup();
membershipManager.leaveGroup();
+ verify(subscriptionState).unsubscribe();
assertEquals(MemberState.LEAVING, membershipManager.state());
// Get fenced while leaving. Member should not trigger any callback or
try to
@@ -420,6 +426,7 @@ public class MembershipManagerImplTest {
MembershipManagerImpl membershipManager =
createMemberInStableState("instance1");
mockLeaveGroup();
membershipManager.leaveGroup();
+ verify(subscriptionState).unsubscribe();
assertEquals(MemberState.LEAVING, membershipManager.state());
assertEquals(ConsumerGroupHeartbeatRequest.LEAVE_GROUP_STATIC_MEMBER_EPOCH,
membershipManager.memberEpoch());
@@ -428,6 +435,7 @@ public class MembershipManagerImplTest {
membershipManager = createMemberInStableState(null);
mockLeaveGroup();
membershipManager.leaveGroup();
+ verify(subscriptionState).unsubscribe();
assertEquals(MemberState.LEAVING, membershipManager.state());
assertEquals(ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH,
membershipManager.memberEpoch());
@@ -883,6 +891,7 @@ public class MembershipManagerImplTest {
mockLeaveGroup();
CompletableFuture<Void> leaveResult = membershipManager.leaveGroup();
+ verify(subscriptionState).unsubscribe();
membershipManager.onHeartbeatSuccess(createConsumerGroupHeartbeatResponse(createAssignment(true)).data());
@@ -962,6 +971,7 @@ public class MembershipManagerImplTest {
// callbacks complete and the heartbeat is sent out.
mockLeaveGroup();
CompletableFuture<Void> leaveResult1 = membershipManager.leaveGroup();
+ verify(subscriptionState).unsubscribe();
assertFalse(leaveResult1.isDone());
assertEquals(MemberState.LEAVING, membershipManager.state());
verify(subscriptionState).assignFromSubscribed(Collections.emptySet());
@@ -972,6 +982,7 @@ public class MembershipManagerImplTest {
// leave operation completes.
mockLeaveGroup();
CompletableFuture<Void> leaveResult2 = membershipManager.leaveGroup();
+ verify(subscriptionState, never()).unsubscribe();
verify(subscriptionState, never()).rebalanceListener();
assertFalse(leaveResult2.isDone());
@@ -993,6 +1004,7 @@ public class MembershipManagerImplTest {
// Leave group triggered and completed
mockLeaveGroup();
CompletableFuture<Void> leaveResult1 = membershipManager.leaveGroup();
+ verify(subscriptionState).unsubscribe();
assertEquals(MemberState.LEAVING, membershipManager.state());
membershipManager.onHeartbeatRequestSent();
assertEquals(MemberState.UNSUBSCRIBED, membershipManager.state());
@@ -1005,6 +1017,7 @@ public class MembershipManagerImplTest {
// no assignment updated)
mockLeaveGroup();
CompletableFuture<Void> leaveResult2 = membershipManager.leaveGroup();
+ verify(subscriptionState).unsubscribe();
assertTrue(leaveResult2.isDone());
assertFalse(leaveResult2.isCompletedExceptionally());
assertEquals(MemberState.UNSUBSCRIBED, membershipManager.state());
@@ -1021,6 +1034,7 @@ public class MembershipManagerImplTest {
mockLeaveGroup();
membershipManager.leaveGroup();
+ verify(subscriptionState).unsubscribe();
assertEquals(MemberState.UNSUBSCRIBED, membershipManager.state());
verify(subscriptionState).assignFromSubscribed(Collections.emptySet());
@@ -1035,6 +1049,7 @@ public class MembershipManagerImplTest {
mockLeaveGroup();
CompletableFuture<Void> leaveResult1 = membershipManager.leaveGroup();
+ verify(subscriptionState).unsubscribe();
assertTrue(leaveResult1.isDone());
assertEquals(MemberState.STALE, membershipManager.state());
}
@@ -1076,6 +1091,7 @@ public class MembershipManagerImplTest {
// Start leaving group.
mockLeaveGroup();
membershipManager.leaveGroup();
+ verify(subscriptionState).unsubscribe();
assertEquals(MemberState.LEAVING, membershipManager.state());
// Get fatal failure while waiting to send the heartbeat to leave.
Member should
@@ -1095,6 +1111,7 @@ public class MembershipManagerImplTest {
// Start leaving group.
mockLeaveGroup();
membershipManager.leaveGroup();
+ verify(subscriptionState).unsubscribe();
assertEquals(MemberState.LEAVING, membershipManager.state());
// Last heartbeat sent.
@@ -2121,6 +2138,7 @@ public class MembershipManagerImplTest {
assertTrue(membershipManager.currentAssignment().isNone());
assertTrue(membershipManager.topicsAwaitingReconciliation().isEmpty());
assertEquals(LEAVE_GROUP_MEMBER_EPOCH,
membershipManager.memberEpoch());
+ verify(subscriptionState, never()).unsubscribe();
}
@Test
@@ -2665,6 +2683,7 @@ public class MembershipManagerImplTest {
mockLeaveGroup();
CompletableFuture<Void> leaveResult = membershipManager.leaveGroup();
+ verify(subscriptionState).unsubscribe();
assertEquals(MemberState.LEAVING, membershipManager.state());
assertFalse(leaveResult.isDone(), "Leave group result should not
complete until the " +
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerTestBuilder.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerTestBuilder.java
index 25c9f5a4a88..ecdd3297de1 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerTestBuilder.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerTestBuilder.java
@@ -213,7 +213,8 @@ public class ShareConsumerTestBuilder implements Closeable {
this.applicationEventProcessor = spy(new ApplicationEventProcessor(
logContext,
requestManagers,
- metadata
+ metadata,
+ subscriptions
)
);
}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java
index 9edfab9a0fb..bb0719080c3 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java
@@ -25,62 +25,72 @@ import
org.apache.kafka.clients.consumer.internals.MembershipManager;
import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate;
import org.apache.kafka.clients.consumer.internals.OffsetsRequestManager;
import org.apache.kafka.clients.consumer.internals.RequestManagers;
+import org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.kafka.clients.consumer.internals.TopicMetadataRequestManager;
import org.apache.kafka.common.utils.LogContext;
-import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
public class ApplicationEventProcessorTest {
- private final ConsumerMetadata metadata = mock(ConsumerMetadata.class);
+ private final CommitRequestManager commitRequestManager =
mock(CommitRequestManager.class);
+ private final HeartbeatRequestManager heartbeatRequestManager =
mock(HeartbeatRequestManager.class);
+ private final MembershipManager membershipManager =
mock(MembershipManager.class);
+ private final SubscriptionState subscriptionState =
mock(SubscriptionState.class);
private ApplicationEventProcessor processor;
- private CommitRequestManager commitRequestManager;
- private HeartbeatRequestManager heartbeatRequestManager;
- private MembershipManager membershipManager;
- @BeforeEach
- public void setup() {
- LogContext logContext = new LogContext();
- OffsetsRequestManager offsetsRequestManager =
mock(OffsetsRequestManager.class);
- TopicMetadataRequestManager topicMetadataRequestManager =
mock(TopicMetadataRequestManager.class);
- FetchRequestManager fetchRequestManager =
mock(FetchRequestManager.class);
- CoordinatorRequestManager coordinatorRequestManager =
mock(CoordinatorRequestManager.class);
- commitRequestManager = mock(CommitRequestManager.class);
- heartbeatRequestManager = mock(HeartbeatRequestManager.class);
- membershipManager = mock(MembershipManager.class);
+ private void setupProcessor(boolean withGroupId) {
RequestManagers requestManagers = new RequestManagers(
- logContext,
- offsetsRequestManager,
- topicMetadataRequestManager,
- fetchRequestManager,
- Optional.of(coordinatorRequestManager),
- Optional.of(commitRequestManager),
- Optional.of(heartbeatRequestManager),
- Optional.of(membershipManager)
- );
+ new LogContext(),
+ mock(OffsetsRequestManager.class),
+ mock(TopicMetadataRequestManager.class),
+ mock(FetchRequestManager.class),
+ withGroupId ?
Optional.of(mock(CoordinatorRequestManager.class)) : Optional.empty(),
+ withGroupId ? Optional.of(commitRequestManager) :
Optional.empty(),
+ withGroupId ? Optional.of(heartbeatRequestManager) :
Optional.empty(),
+ withGroupId ? Optional.of(membershipManager) :
Optional.empty());
processor = new ApplicationEventProcessor(
- new LogContext(),
- requestManagers,
- metadata
+ new LogContext(),
+ requestManagers,
+ mock(ConsumerMetadata.class),
+ subscriptionState
);
}
@Test
public void testPrepClosingCommitEvents() {
+ setupProcessor(true);
List<NetworkClientDelegate.UnsentRequest> results =
mockCommitResults();
doReturn(new NetworkClientDelegate.PollResult(100,
results)).when(commitRequestManager).pollOnClose();
processor.process(new CommitOnCloseEvent());
verify(commitRequestManager).signalClose();
}
+ @Test
+ public void testProcessUnsubscribeEventWithGroupId() {
+ setupProcessor(true);
+
when(heartbeatRequestManager.membershipManager()).thenReturn(membershipManager);
+
when(membershipManager.leaveGroup()).thenReturn(CompletableFuture.completedFuture(null));
+ processor.process(new UnsubscribeEvent(0));
+ verify(membershipManager).leaveGroup();
+ }
+
+ @Test
+ public void testProcessUnsubscribeEventWithoutGroupId() {
+ setupProcessor(false);
+ processor.process(new UnsubscribeEvent(0));
+ verify(subscriptionState).unsubscribe();
+ }
+
private List<NetworkClientDelegate.UnsentRequest> mockCommitResults() {
return
Collections.singletonList(mock(NetworkClientDelegate.UnsentRequest.class));
}
diff --git
a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
index b4617af1503..0fbee055025 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
@@ -693,6 +693,10 @@ class PlaintextConsumerTest extends BaseConsumerTest {
consumer2.unsubscribe()
consumer3.unsubscribe()
+ assertTrue(consumer1.assignment().isEmpty)
+ assertTrue(consumer2.assignment().isEmpty)
+ assertTrue(consumer3.assignment().isEmpty)
+
consumer1.close()
consumer2.close()
consumer3.close()