This is an automated email from the ASF dual-hosted git repository.
schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
      new 64cb839041f KAFKA-17853: Fix termination issue in ConsoleConsumer and ConsoleShareConsumer (#19886)
64cb839041f is described below
commit 64cb839041facd4beddbb95656d0185807de8f3d
Author: Shivsundar R <[email protected]>
AuthorDate: Thu Nov 20 06:02:29 2025 -0500
KAFKA-17853: Fix termination issue in ConsoleConsumer and ConsoleShareConsumer (#19886)
https://issues.apache.org/jira/browse/KAFKA-17853
- There is an issue with the console share consumer where, if the broker
is unavailable, the consumer does not shut down immediately even after
force-terminating it with ctrl-c. It takes around 30 seconds to close
once the broker shuts down.
- The console consumer, on the other hand, was believed to shut down
immediately when ctrl-c is pressed. On reproducing the issue with a local
Kafka server, I observed that the issue was present in both the console
consumer and the console share consumer.
Issue:
- The client debug logs showed that the issue was related to the network
thread sending repeated `FindCoordinator` requests until the timer
expired. This was happening in both the console-consumer and the
console-share-consumer.
- Debug logs showed that when the broker is shut down, the heartbeat
fails with a `DisconnectException` (which is retriable). This triggers a
`FindCoordinator` request on the network thread, which retries until the
default timeout expires.
- This request is sent even before we trigger a close on the consumer,
so once we press ctrl-c, although `ConsumerNetworkThread::close()`
is triggered, it waits for the default timeout until all the requests
are sent out for a graceful shutdown.
This PR fixes the issue by adding a check in `NetworkClientDelegate`
to remove any pending unsent requests (those with empty node values) during
close. This avoids the unnecessary retries, so the consumers shut
down immediately upon termination.
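The check can be illustrated with a standalone sketch. Note this is a simplified, hypothetical model, not the actual Kafka internals: `UnsentRequest` and `pruneOnClose` below stand in for `NetworkClientDelegate.UnsentRequest` and the new branch in `checkDisconnects`. The idea is that on close, any unsent request that never resolved a target node is removed from the queue and failed immediately instead of being retried until the request timeout expires.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;

// Hypothetical, simplified sketch of the pruning this PR adds; names are
// illustrative, not the real Kafka classes.
public class UnsentRequestPruneSketch {

    // Stand-in for an unsent request that may not yet have a target node
    // (node == Optional.empty() while the coordinator is unknown).
    static final class UnsentRequest {
        final String name;
        final Optional<String> node;
        UnsentRequest(String name, Optional<String> node) {
            this.name = name;
            this.node = node;
        }
    }

    // On close, requests without a resolved node are removed and failed
    // immediately instead of being retried until the timeout expires.
    static List<String> pruneOnClose(List<UnsentRequest> unsent, boolean onClose) {
        List<String> failed = new ArrayList<>();
        Iterator<UnsentRequest> iter = unsent.iterator();
        while (iter.hasNext()) {
            UnsentRequest u = iter.next();
            if (u.node.isEmpty() && onClose) {
                iter.remove();       // drop the pending retry
                failed.add(u.name);  // real code: handler.onFailure(NETWORK_EXCEPTION)
            }
        }
        return failed;
    }

    public static void main(String[] args) {
        List<UnsentRequest> unsent = new ArrayList<>();
        unsent.add(new UnsentRequest("FindCoordinator", Optional.empty()));
        unsent.add(new UnsentRequest("Fetch", Optional.of("node-1")));

        System.out.println(pruneOnClose(unsent, true)); // [FindCoordinator]
        System.out.println(unsent.size());              // 1: the addressed request is kept
    }
}
```

In the actual change, the removed request's handler is failed with `Errors.NETWORK_EXCEPTION.exception()`, which is why the debug logs show a `NetworkException` right after the "Removing unsent request" message.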
Share consumer shutting down after the fix:
```
[2025-06-03 16:23:42,175] DEBUG [ShareConsumer clientId=console-share-consumer, groupId=console-share-consumer] Removing unsent request UnsentRequest{requestBuilder=FindCoordinatorRequestData(key='console-share-consumer', keyType=0, coordinatorKeys=[]), handler=org.apache.kafka.clients.consumer.internals.NetworkClientDelegate$FutureCompletionHandler@2b351de8, node=Optional.empty, remainingMs=28565} because the client is closing (org.apache.kafka.clients.consumer.internals.NetworkClientDelegate)
[2025-06-03 16:23:42,175] DEBUG [ShareConsumer clientId=console-share-consumer, groupId=console-share-consumer] FindCoordinator request failed due to retriable exception (org.apache.kafka.clients.consumer.internals.CoordinatorRequestManager)
org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
[2025-06-03 16:23:42,176] DEBUG [ShareConsumer clientId=console-share-consumer, groupId=console-share-consumer] Closing RequestManagers (org.apache.kafka.clients.consumer.internals.RequestManagers)
[2025-06-03 16:23:42,177] DEBUG [ShareConsumer clientId=console-share-consumer, groupId=console-share-consumer] RequestManagers has been closed (org.apache.kafka.clients.consumer.internals.RequestManagers)
[2025-06-03 16:23:42,179] DEBUG [ShareConsumer clientId=console-share-consumer, groupId=console-share-consumer] Closed the consumer network thread (org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread)
[2025-06-03 16:23:42,181] DEBUG [ShareConsumer clientId=console-share-consumer, groupId=console-share-consumer] Kafka share consumer has been closed (org.apache.kafka.clients.consumer.internals.ShareConsumerImpl)
Processed a total of 0 messages
```
Regular consumer shutting down after the fix:
```
[2025-06-03 16:24:27,196] DEBUG [Consumer clientId=console-consumer, groupId=console-consumer-5671] Removing unsent request UnsentRequest{requestBuilder=FindCoordinatorRequestData(key='console-consumer-5671', keyType=0, coordinatorKeys=[]), handler=org.apache.kafka.clients.consumer.internals.NetworkClientDelegate$FutureCompletionHandler@3770591b, node=Optional.empty, remainingMs=29160} because the client is closing (org.apache.kafka.clients.consumer.internals.NetworkClientDelegate)
[2025-06-03 16:24:27,196] DEBUG [Consumer clientId=console-consumer, groupId=console-consumer-5671] FindCoordinator request failed due to retriable exception (org.apache.kafka.clients.consumer.internals.CoordinatorRequestManager)
org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
[2025-06-03 16:24:27,197] DEBUG [Consumer clientId=console-consumer, groupId=console-consumer-5671] Closing RequestManagers (org.apache.kafka.clients.consumer.internals.RequestManagers)
[2025-06-03 16:24:27,197] DEBUG [Consumer clientId=console-consumer, groupId=console-consumer-5671] Removing test-topic-23-0 from buffered fetch data as it is not in the set of partitions to retain ([]) (org.apache.kafka.clients.consumer.internals.FetchBuffer)
[2025-06-03 16:24:27,197] DEBUG [Consumer clientId=console-consumer, groupId=console-consumer-5671] RequestManagers has been closed (org.apache.kafka.clients.consumer.internals.RequestManagers)
[2025-06-03 16:24:27,200] DEBUG [Consumer clientId=console-consumer, groupId=console-consumer-5671] Closed the consumer network thread (org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread)
[2025-06-03 16:24:27,202] DEBUG [Consumer clientId=console-consumer, groupId=console-consumer-5671] Kafka consumer has been closed (org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer)
Processed a total of 0 messages
```
Reviewers: Lianet Magrans <[email protected]>, Kirk True <[email protected]>, Andrew Schofield <[email protected]>
---
.../clients/consumer/PlaintextConsumerTest.java | 59 ++++++++++++++++++++++
.../kafka/clients/consumer/ShareConsumerTest.java | 23 +++++++++
.../consumer/internals/ConsumerNetworkThread.java | 2 +-
.../consumer/internals/NetworkClientDelegate.java | 24 +++++++--
.../internals/ShareConsumeRequestManager.java | 3 ++
.../internals/ConsumerNetworkThreadTest.java | 3 +-
.../internals/FetchRequestManagerTest.java | 2 +-
.../internals/NetworkClientDelegateTest.java | 46 +++++++++++++++++
.../internals/ShareConsumeRequestManagerTest.java | 43 ++++++++++++----
9 files changed, 190 insertions(+), 15 deletions(-)
diff --git a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerTest.java b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerTest.java
index bd92f0c5685..37b5e809d21 100644
--- a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerTest.java
+++ b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerTest.java
@@ -87,6 +87,7 @@ import static org.apache.kafka.clients.CommonClientConfigs.METADATA_MAX_AGE_CONF
 import static org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG;
 import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
 import static org.apache.kafka.clients.consumer.ConsumerConfig.CLIENT_ID_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG;
 import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG;
 import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_INSTANCE_ID_CONFIG;
 import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_PROTOCOL_CONFIG;
@@ -109,6 +110,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTimeoutPreemptively;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
@@ -181,6 +183,63 @@ public class PlaintextConsumerTest {
         testCoordinatorFailover(cluster, config);
     }
+    @ClusterTest(
+        brokers = 1,
+        serverProperties = {
+            @ClusterConfigProperty(key = OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
+            @ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1"),
+            @ClusterConfigProperty(key = "transaction.state.log.replication.factor", value = "1"),
+            @ClusterConfigProperty(key = "transaction.state.log.min.isr", value = "1")
+        }
+    )
+    public void testClassicConsumerCloseOnBrokerShutdown() {
+        Map<String, Object> config = Map.of(
+            GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT)
+        );
+        testConsumerCloseOnBrokerShutdown(config);
+    }
+
+    @ClusterTest(
+        brokers = 1,
+        serverProperties = {
+            @ClusterConfigProperty(key = OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
+            @ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1"),
+            @ClusterConfigProperty(key = "transaction.state.log.replication.factor", value = "1"),
+            @ClusterConfigProperty(key = "transaction.state.log.min.isr", value = "1")
+        }
+    )
+    public void testAsyncConsumerCloseOnBrokerShutdown() {
+        Map<String, Object> config = Map.of(
+            GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT),
+            ENABLE_AUTO_COMMIT_CONFIG, false
+        );
+        // Disabling auto commit so that commitSync() does not block the close timeout.
+        testConsumerCloseOnBrokerShutdown(config);
+    }
+
+    private void testConsumerCloseOnBrokerShutdown(Map<String, Object> consumerConfig) {
+        try (Consumer<byte[], byte[]> consumer = cluster.consumer(consumerConfig)) {
+            consumer.subscribe(List.of(TOPIC));
+
+            // Force consumer to discover coordinator by doing a poll.
+            // This ensures coordinator is discovered before we shutdown the broker.
+            consumer.poll(Duration.ofMillis(100));
+
+            // Now shutdown broker.
+            assertEquals(1, cluster.brokers().size());
+            KafkaBroker broker = cluster.brokers().get(0);
+            cluster.shutdownBroker(0);
+            broker.awaitShutdown();
+
+            // Do another poll to force the consumer to retry finding the coordinator.
+            consumer.poll(Duration.ofMillis(100));
+
+            // Close should not hang waiting for retries when the broker is already down.
+            assertTimeoutPreemptively(Duration.ofSeconds(5), () -> consumer.close(),
+                "Consumer close should not wait for full timeout when broker is already shutdown");
+        }
+    }
+
     @ClusterTest
     public void testClassicConsumerHeaders() throws Exception {
         testHeaders(Map.of(
diff --git a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
index 47b35492d35..3437624c499 100644
--- a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
+++ b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
@@ -113,6 +113,7 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTimeoutPreemptively;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
@@ -1252,6 +1253,28 @@ public class ShareConsumerTest {
         }
     }
+    @ClusterTest
+    public void testConsumerCloseOnBrokerShutdown() {
+        alterShareAutoOffsetReset("group1", "earliest");
+        ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer("group1");
+        shareConsumer.subscribe(Set.of(tp.topic()));
+
+        // To ensure coordinator discovery is complete before shutting down the broker.
+        shareConsumer.poll(Duration.ofMillis(100));
+
+        // Shutdown the broker.
+        assertEquals(1, cluster.brokers().size());
+        KafkaBroker broker = cluster.brokers().get(0);
+        cluster.shutdownBroker(0);
+
+        broker.awaitShutdown();
+
+        // Assert that close completes in less than 5 seconds, not the full 30-second timeout.
+        assertTimeoutPreemptively(Duration.ofSeconds(5), () -> {
+            shareConsumer.close();
+        }, "Consumer close should not wait for full timeout when broker is already shutdown");
+    }
+
     @ClusterTest
     public void testMultipleConsumersInGroupSequentialConsumption() {
         alterShareAutoOffsetReset("group1", "earliest");
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java
index 47a7627d294..4e944b2a4e6 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java
@@ -395,7 +395,7 @@ public class ConsumerNetworkThread extends KafkaThread implements Closeable {
             return;
         do {
-            networkClientDelegate.poll(timer.remainingMs(), timer.currentTimeMs());
+            networkClientDelegate.poll(timer.remainingMs(), timer.currentTimeMs(), true);
             timer.update();
         } while (timer.notExpired() && networkClientDelegate.hasAnyPendingRequests());
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java
index 762718b2035..42c1c73acb5 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java
@@ -34,6 +34,7 @@ import org.apache.kafka.common.errors.DisconnectException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.AbstractRequest;
 import org.apache.kafka.common.telemetry.internals.ClientTelemetrySender;
 import org.apache.kafka.common.utils.LogContext;
@@ -137,13 +138,25 @@ public class NetworkClientDelegate implements AutoCloseable {
     }
     /**
-     * Returns the responses of the sent requests. This method will try to send the unsent requests, poll for responses,
+     * This method will try to send the unsent requests, poll for responses,
      * and check the disconnected nodes.
      *
      * @param timeoutMs timeout time
      * @param currentTimeMs current time
      */
     public void poll(final long timeoutMs, final long currentTimeMs) {
+        poll(timeoutMs, currentTimeMs, false);
+    }
+
+    /**
+     * This method will try to send the unsent requests, poll for responses,
+     * and check the disconnected nodes.
+     *
+     * @param timeoutMs timeout time
+     * @param currentTimeMs current time
+     * @param onClose True when the network thread is closing.
+     */
+    public void poll(final long timeoutMs, final long currentTimeMs, boolean onClose) {
         trySend(currentTimeMs);
         long pollTimeoutMs = timeoutMs;
@@ -152,7 +165,7 @@ public class NetworkClientDelegate implements AutoCloseable {
         }
         this.client.poll(pollTimeoutMs, currentTimeMs);
         maybePropagateMetadataError();
-        checkDisconnects(currentTimeMs);
+        checkDisconnects(currentTimeMs, onClose);
         asyncConsumerMetrics.recordUnsentRequestsQueueSize(unsentRequests.size(), currentTimeMs);
     }
@@ -219,7 +232,7 @@ public class NetworkClientDelegate implements AutoCloseable {
         return true;
     }
-    protected void checkDisconnects(final long currentTimeMs) {
+    protected void checkDisconnects(final long currentTimeMs, boolean onClose) {
         // Check the connection of the unsent request. Disconnect the disconnected node if it is unable to be connected.
         Iterator<UnsentRequest> iter = unsentRequests.iterator();
         while (iter.hasNext()) {
@@ -229,6 +242,11 @@ public class NetworkClientDelegate implements AutoCloseable {
                 asyncConsumerMetrics.recordUnsentRequestsQueueTime(time.milliseconds() - u.enqueueTimeMs());
                 AuthenticationException authenticationException = client.authenticationException(u.node.get());
                 u.handler.onFailure(currentTimeMs, authenticationException);
+            } else if (u.node.isEmpty() && onClose) {
+                log.debug("Removing unsent request {} because the client is closing", u);
+                iter.remove();
+                asyncConsumerMetrics.recordUnsentRequestsQueueTime(time.milliseconds() - u.enqueueTimeMs());
+                u.handler.onFailure(currentTimeMs, Errors.NETWORK_EXCEPTION.exception());
             }
         }
     }
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
index db68877556b..c727cfad625 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
@@ -136,6 +136,9 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
     @Override
     public PollResult poll(long currentTimeMs) {
         if (memberId == null) {
+            if (closing && !closeFuture.isDone()) {
+                closeFuture.complete(null);
+            }
             return PollResult.EMPTY;
         }
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java
index 88004ebbcd7..142f3f9bf8f 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java
@@ -35,6 +35,7 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.MethodSource;
 import org.junit.jupiter.params.provider.ValueSource;
+import org.mockito.ArgumentMatchers;
 import java.time.Duration;
 import java.util.LinkedList;
@@ -203,7 +204,7 @@ public class ConsumerNetworkThreadTest {
     public void testSendUnsentRequests() {
         when(networkClientDelegate.hasAnyPendingRequests()).thenReturn(true).thenReturn(true).thenReturn(false);
         consumerNetworkThread.cleanup();
-        verify(networkClientDelegate, times(2)).poll(anyLong(), anyLong());
+        verify(networkClientDelegate, times(2)).poll(anyLong(), anyLong(), ArgumentMatchers.booleanThat(onClose -> onClose));
     }
     @ParameterizedTest
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java
index 23a63985883..f8e53148b01 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java
@@ -4266,7 +4266,7 @@ public class FetchRequestManagerTest {
     }
     @Override
-    protected void checkDisconnects(final long currentTimeMs) {
+    protected void checkDisconnects(final long currentTimeMs, boolean onClose) {
         // any disconnects affecting requests that have already been transmitted will be handled
         // by NetworkClient, so we just need to check whether connections for any of the unsent
         // requests have been disconnected; if they have, then we complete the corresponding future
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegateTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegateTest.java
index 0347423137b..9ff52599b57 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegateTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegateTest.java
@@ -27,6 +27,7 @@ import org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetrics;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.errors.AuthenticationException;
 import org.apache.kafka.common.errors.DisconnectException;
+import org.apache.kafka.common.errors.NetworkException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.message.FindCoordinatorRequestData;
 import org.apache.kafka.common.metrics.Metrics;
@@ -282,6 +283,51 @@ public class NetworkClientDelegateTest {
         }
     }
+    @Test
+    public void testPollWithOnClose() throws Exception {
+        try (NetworkClientDelegate ncd = newNetworkClientDelegate(false)) {
+            NetworkClientDelegate.UnsentRequest unsentRequest = newUnsentFindCoordinatorRequest();
+            ncd.add(unsentRequest);
+
+            // First poll without onClose
+            ncd.poll(0, time.milliseconds());
+            assertTrue(ncd.hasAnyPendingRequests());
+
+            // Poll with onClose=true
+            ncd.poll(0, time.milliseconds(), true);
+            assertTrue(ncd.hasAnyPendingRequests());
+
+            // Complete the request
+            client.respond(FindCoordinatorResponse.prepareResponse(Errors.NONE, GROUP_ID, mockNode()));
+            ncd.poll(0, time.milliseconds(), true);
+            assertFalse(ncd.hasAnyPendingRequests());
+        }
+    }
+
+    @Test
+    public void testCheckDisconnectsWithOnClose() throws Exception {
+        try (NetworkClientDelegate ncd = newNetworkClientDelegate(false)) {
+            NetworkClientDelegate.UnsentRequest unsentRequest = newUnsentFindCoordinatorRequest();
+            ncd.add(unsentRequest);
+
+            // Mark node as disconnected
+            Node node = mockNode();
+            client.setUnreachable(node, REQUEST_TIMEOUT_MS);
+
+            // Poll with onClose=false (default)
+            ncd.poll(0, time.milliseconds());
+            assertTrue(ncd.hasAnyPendingRequests());
+
+            // Poll with onClose=true
+            ncd.poll(0, time.milliseconds(), true);
+
+            // Verify the request is absent since we're removing unsent requests on close.
+            assertFalse(ncd.hasAnyPendingRequests());
+            assertTrue(unsentRequest.future().isDone());
+            TestUtils.assertFutureThrows(NetworkException.class, unsentRequest.future());
+        }
+    }
+
     public NetworkClientDelegate newNetworkClientDelegate(boolean notifyMetadataErrorsViaErrorQueue) {
         return newNetworkClientDelegate(notifyMetadataErrorsViaErrorQueue, mock(AsyncConsumerMetrics.class));
     }
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
index dc05257bd6e..e024818c427 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
@@ -402,7 +402,7 @@ public class ShareConsumeRequestManagerTest {
         // Remaining acknowledgements sent with close().
         Acknowledgements acknowledgements2 = getAcknowledgements(2, AcknowledgeType.ACCEPT, AcknowledgeType.REJECT);
-        shareConsumeRequestManager.acknowledgeOnClose(Map.of(tip0, new NodeAcknowledgements(0, acknowledgements2)),
+        CompletableFuture<Void> closeFuture = shareConsumeRequestManager.acknowledgeOnClose(Map.of(tip0, new NodeAcknowledgements(0, acknowledgements2)),
                 calculateDeadlineMs(time.timer(100)));
         assertEquals(1, shareConsumeRequestManager.sendAcknowledgements());
@@ -416,6 +416,25 @@ public class ShareConsumeRequestManagerTest {
         // Verifying that all 3 offsets were acknowledged as part of the final ShareAcknowledge on close.
         assertEquals(mergedAcks.getAcknowledgementsTypeMap(), completedAcknowledgements.get(0).get(tip0).getAcknowledgementsTypeMap());
         assertTrue(shareConsumeRequestManager.hasCompletedFetches());
+
+        // Polling once more to complete the closeFuture.
+        shareConsumeRequestManager.sendFetches();
+        assertTrue(closeFuture.isDone());
+    }
+
+    @Test
+    public void testCloseFutureCompletedWhenMemberIdIsNull() {
+        buildRequestManager(new MetricConfig(), new ByteArrayDeserializer(), new ByteArrayDeserializer(), null, ShareAcquireMode.BATCH_OPTIMIZED);
+        assignFromSubscribed(Collections.singleton(tp0));
+
+        CompletableFuture<Void> closeFuture = shareConsumeRequestManager.acknowledgeOnClose(Map.of(),
+                calculateDeadlineMs(time.timer(100)));
+
+        assertFalse(closeFuture.isDone());
+
+        // The subsequent poll should complete the closeFuture as the memberId is null.
+        shareConsumeRequestManager.sendFetches();
+        assertTrue(closeFuture.isDone());
     }
     @Test
@@ -2466,7 +2485,7 @@ public class ShareConsumeRequestManagerTest {
             .setErrorCode(Errors.NONE.code()));
         client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, partitionData, List.of(), 0), nodeId1);
         partitionData = buildPartitionDataMap(tip0, records, ShareCompletedFetchTest.acquiredRecords(1L, 1), Errors.NONE, Errors.NONE);
-
+
         client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, partitionData, List.of(), 0), nodeId0);
         networkClientDelegate.poll(time.timer(0));
         assertTrue(shareConsumeRequestManager.hasCompletedFetches());
@@ -2731,17 +2750,18 @@ public class ShareConsumeRequestManagerTest {
     private <K, V> void buildRequestManager(Deserializer<K> keyDeserializer,
                                             Deserializer<V> valueDeserializer,
                                             ShareAcquireMode shareAcquireMode) {
-        buildRequestManager(new MetricConfig(), keyDeserializer, valueDeserializer, shareAcquireMode);
+        buildRequestManager(new MetricConfig(), keyDeserializer, valueDeserializer, Uuid.randomUuid().toString(), shareAcquireMode);
     }
     private <K, V> void buildRequestManager(MetricConfig metricConfig,
                                             Deserializer<K> keyDeserializer,
                                             Deserializer<V> valueDeserializer,
+                                            String memberId,
                                             ShareAcquireMode shareAcquireMode) {
         LogContext logContext = new LogContext();
         SubscriptionState subscriptionState = new SubscriptionState(logContext, AutoOffsetResetStrategy.EARLIEST);
         buildRequestManager(metricConfig, keyDeserializer, valueDeserializer,
-            subscriptionState, logContext, shareAcquireMode);
+            subscriptionState, logContext, memberId, shareAcquireMode);
     }
     private <K, V> void buildRequestManager(MetricConfig metricConfig,
@@ -2749,7 +2769,8 @@ public class ShareConsumeRequestManagerTest {
                                             Deserializer<V> valueDeserializer,
                                             SubscriptionState subscriptionState,
                                             LogContext logContext,
-                                            ShareAcquireMode shareAcquireMode) {
+                                            String memberId,
+                                            ShareAcquireMode shareAcquireMode) {
         buildDependencies(metricConfig, subscriptionState, logContext);
         Deserializers<K, V> deserializers = new Deserializers<>(keyDeserializer, valueDeserializer, metrics);
         int maxWaitMs = 0;
@@ -2781,7 +2802,8 @@ public class ShareConsumeRequestManagerTest {
                 new ShareFetchBuffer(logContext),
                 acknowledgementEventHandler,
                 metricsManager,
-                shareFetchCollector));
+                shareFetchCollector,
+                memberId));
     }
     private void buildDependencies(MetricConfig metricConfig,
@@ -2820,11 +2842,14 @@ public class ShareConsumeRequestManagerTest {
                                        ShareFetchBuffer shareFetchBuffer,
                                        ShareAcknowledgementEventHandler acknowledgementEventHandler,
                                        ShareFetchMetricsManager metricsManager,
-                                       ShareFetchCollector<K, V> fetchCollector) {
+                                       ShareFetchCollector<K, V> fetchCollector,
+                                       String memberId) {
             super(time, logContext, groupId, metadata, subscriptions, shareFetchConfig, shareFetchBuffer,
                 acknowledgementEventHandler, metricsManager, retryBackoffMs, 1000);
             this.shareFetchCollector = fetchCollector;
-            onMemberEpochUpdated(Optional.empty(), Uuid.randomUuid().toString());
+            if (memberId != null) {
+                onMemberEpochUpdated(Optional.empty(), memberId);
+            }
         }
     private ShareFetch<K, V> collectFetch() {
@@ -2914,7 +2939,7 @@ public class ShareConsumeRequestManagerTest {
     }
     @Override
-    protected void checkDisconnects(final long currentTimeMs) {
+    protected void checkDisconnects(final long currentTimeMs, boolean onClose) {
         // any disconnects affecting requests that have already been transmitted will be handled
         // by NetworkClient, so we just need to check whether connections for any of the unsent
         // requests have been disconnected; if they have, then we complete the corresponding future