This is an automated email from the ASF dual-hosted git repository.
schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 403f6c1caa4 KAFKA-19840: Handle exceptions in acknowledgement callback
(#20880)
403f6c1caa4 is described below
commit 403f6c1caa4aa63c14c8da32ad98eff3a58d5853
Author: Andrew Schofield <[email protected]>
AuthorDate: Fri Nov 14 19:03:36 2025 +0000
KAFKA-19840: Handle exceptions in acknowledgement callback (#20880)
Catch exceptions from the acknowledgement commit callback. Previously,
the exceptions were thrown by the methods of the KafkaShareConsumer
interface, but this was not described in the KIP and did not make sense
in practice.
Reviewers: Apoorv Mittal <[email protected]>, ShivsundarR
<[email protected]>
---
.../kafka/clients/consumer/ShareConsumerTest.java | 28 ++--
.../kafka/clients/consumer/KafkaShareConsumer.java | 2 +-
.../AcknowledgementCommitCallbackHandler.java | 13 +-
.../consumer/internals/RequestManagers.java | 4 +-
.../internals/ShareConsumeRequestManager.java | 74 +++-------
.../consumer/internals/ShareConsumerImpl.java | 161 ++++++++++++---------
.../consumer/internals/events/BackgroundEvent.java | 2 -
...ckEvent.java => ShareAcknowledgementEvent.java} | 21 ++-
.../events/ShareAcknowledgementEventHandler.java | 60 ++++++++
.../ShareRenewAcknowledgementsCompleteEvent.java | 42 ------
.../internals/ShareConsumeRequestManagerTest.java | 30 ++--
.../consumer/internals/ShareConsumerImplTest.java | 15 +-
12 files changed, 236 insertions(+), 216 deletions(-)
diff --git
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
index 4cc6c195eac..da253fa1c42 100644
---
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
+++
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
@@ -58,16 +58,15 @@ import org.apache.kafka.common.test.ClusterInstance;
import org.apache.kafka.common.test.api.ClusterConfigProperty;
import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.test.api.ClusterTestDefaults;
-import org.apache.kafka.common.test.api.Flaky;
import org.apache.kafka.common.test.api.Type;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.coordinator.group.GroupConfig;
import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig;
import org.apache.kafka.server.share.SharePartitionKey;
+import org.apache.kafka.test.NoRetryException;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Timeout;
@@ -1621,10 +1620,9 @@ public class ShareConsumerTest {
}
/**
- * Test to verify that the acknowledgement commit callback can throw an
exception, and it is propagated
+ * Test to verify that the acknowledgement commit callback can throw an
exception, and it is not propagated
* to the caller of poll().
*/
- @Flaky("KAFKA-19840") // https://issues.apache.org/jira/browse/KAFKA-19840
@ClusterTest
public void testAcknowledgementCommitCallbackThrowsException() throws
InterruptedException {
alterShareAutoOffsetReset("group1", "earliest");
@@ -1635,28 +1633,35 @@ public class ShareConsumerTest {
producer.send(record);
producer.flush();
- shareConsumer.setAcknowledgementCommitCallback(new
TestableAcknowledgementCommitCallbackThrows<>());
+ AtomicBoolean callbackCalled = new AtomicBoolean(false);
+ shareConsumer.setAcknowledgementCommitCallback(new
TestableAcknowledgementCommitCallbackThrows(callbackCalled));
shareConsumer.subscribe(Set.of(tp.topic()));
TestUtils.waitForCondition(() ->
shareConsumer.poll(Duration.ofMillis(2000)).count() == 1,
DEFAULT_MAX_WAIT_MS, 100L, () -> "Failed to consume records
for share consumer");
- AtomicBoolean exceptionThrown = new AtomicBoolean(false);
TestUtils.waitForCondition(() -> {
try {
shareConsumer.poll(Duration.ofMillis(500));
- } catch
(org.apache.kafka.common.errors.OutOfOrderSequenceException e) {
- exceptionThrown.set(true);
+ } catch (Exception e) {
+ throw new NoRetryException(e);
}
- return exceptionThrown.get();
- }, DEFAULT_MAX_WAIT_MS, 100L, () -> "Failed to receive expected
exception");
+ return callbackCalled.get();
+ }, DEFAULT_MAX_WAIT_MS, 100L, () -> "Received unexpected exception
or callback not called");
verifyShareGroupStateTopicRecordsProduced();
}
}
- private static class TestableAcknowledgementCommitCallbackThrows<K, V>
implements AcknowledgementCommitCallback {
+ private static class TestableAcknowledgementCommitCallbackThrows
implements AcknowledgementCommitCallback {
+ private final AtomicBoolean callbackCalled;
+
+ public TestableAcknowledgementCommitCallbackThrows(AtomicBoolean
callbackCalled) {
+ this.callbackCalled = callbackCalled;
+ }
+
@Override
public void onComplete(Map<TopicIdPartition, Set<Long>> offsetsMap,
Exception exception) {
+ callbackCalled.set(true);
throw new
org.apache.kafka.common.errors.OutOfOrderSequenceException("Exception thrown in
TestableAcknowledgementCommitCallbackThrows.onComplete");
}
}
@@ -2339,7 +2344,6 @@ public class ShareConsumerTest {
verifyShareGroupStateTopicRecordsProduced();
}
- @Disabled("KAFKA-19840") //
https://issues.apache.org/jira/browse/KAFKA-19840
@ClusterTest(
brokers = 1,
serverProperties = {
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaShareConsumer.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaShareConsumer.java
index 7bab150f17f..0d6b08910d2 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaShareConsumer.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaShareConsumer.java
@@ -555,7 +555,7 @@ public class KafkaShareConsumer<K, V> implements
ShareConsumer<K, V> {
* the acknowledgements to commit have been indicated using {@link
#acknowledge(ConsumerRecord)} or
* {@link #acknowledge(ConsumerRecord, AcknowledgeType)}. If the consumer
is using implicit acknowledgement,
* all the records returned by the latest call to {@link #poll(Duration)}
are acknowledged.
-
+ *
* <p>
* This is a synchronous commit and will block until either the commit
succeeds, an unrecoverable error is
* encountered (in which case it is thrown to the caller), or the timeout
expires.
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AcknowledgementCommitCallbackHandler.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AcknowledgementCommitCallbackHandler.java
index b746e1a2135..2f4d7b4c2be 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AcknowledgementCommitCallbackHandler.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AcknowledgementCommitCallbackHandler.java
@@ -23,8 +23,6 @@ import org.apache.kafka.common.TopicIdPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -44,23 +42,18 @@ public class AcknowledgementCommitCallbackHandler {
}
void onComplete(List<Map<TopicIdPartition, Acknowledgements>>
acknowledgementsMapList) {
- final ArrayList<Throwable> exceptions = new ArrayList<>();
acknowledgementsMapList.forEach(acknowledgementsMap ->
acknowledgementsMap.forEach((partition, acknowledgements) -> {
KafkaException exception =
acknowledgements.getAcknowledgeException();
Set<Long> offsets =
acknowledgements.getAcknowledgementsTypeMap().keySet();
- Set<Long> offsetsCopy = Collections.unmodifiableSet(offsets);
+ Set<Long> offsetsCopy = Set.copyOf(offsets);
enteredCallback = true;
try {
-
acknowledgementCommitCallback.onComplete(Collections.singletonMap(partition,
offsetsCopy), exception);
- } catch (Throwable e) {
+ acknowledgementCommitCallback.onComplete(Map.of(partition,
offsetsCopy), exception);
+ } catch (Exception e) {
LOG.error("Exception thrown by acknowledgement commit
callback", e);
- exceptions.add(e);
} finally {
enteredCallback = false;
}
}));
- if (!exceptions.isEmpty()) {
- throw ConsumerUtils.maybeWrapAsKafkaException(exceptions.get(0),
"Exception thrown by acknowledgement commit callback");
- }
}
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
index 7f7dbe2fa5e..e9585d2d806 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
@@ -20,6 +20,7 @@ import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.GroupRebalanceConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import
org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
+import
org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgementEventHandler;
import org.apache.kafka.common.internals.IdempotentCloser;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.telemetry.internals.ClientTelemetryProvider;
@@ -317,6 +318,7 @@ public class RequestManagers implements Closeable {
@SuppressWarnings({"checkstyle:ParameterNumber"})
public static Supplier<RequestManagers> supplier(final Time time,
final LogContext
logContext,
+ final
ShareAcknowledgementEventHandler shareAcknowledgementEventHandler,
final
BackgroundEventHandler backgroundEventHandler,
final
ShareConsumerMetadata metadata,
final SubscriptionState
subscriptions,
@@ -371,7 +373,7 @@ public class RequestManagers implements Closeable {
subscriptions,
shareFetchConfig,
fetchBuffer,
- backgroundEventHandler,
+ shareAcknowledgementEventHandler,
shareFetchMetricsManager,
retryBackoffMs,
retryBackoffMaxMs);
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
index eff925d342e..332492245e7 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
@@ -20,9 +20,8 @@ import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.Metadata;
import
org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult;
import
org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.UnsentRequest;
-import
org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
-import
org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgementCommitCallbackEvent;
-import
org.apache.kafka.clients.consumer.internals.events.ShareRenewAcknowledgementsCompleteEvent;
+import
org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgementEvent;
+import
org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgementEventHandler;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
@@ -83,7 +82,7 @@ public class ShareConsumeRequestManager implements
RequestManager, MemberStateLi
private final SubscriptionState subscriptions;
private final ShareFetchConfig shareFetchConfig;
protected final ShareFetchBuffer shareFetchBuffer;
- private final BackgroundEventHandler backgroundEventHandler;
+ private final ShareAcknowledgementEventHandler acknowledgeEventHandler;
private final Map<Integer, ShareSessionHandler> sessionHandlers;
private final Set<Integer> nodesWithPendingRequests;
private final ShareFetchMetricsManager metricsManager;
@@ -108,7 +107,7 @@ public class ShareConsumeRequestManager implements
RequestManager, MemberStateLi
final SubscriptionState subscriptions,
final ShareFetchConfig shareFetchConfig,
final ShareFetchBuffer shareFetchBuffer,
- final BackgroundEventHandler
backgroundEventHandler,
+ final ShareAcknowledgementEventHandler
acknowledgeEventHandler,
final ShareFetchMetricsManager metricsManager,
final long retryBackoffMs,
final long retryBackoffMaxMs) {
@@ -120,7 +119,7 @@ public class ShareConsumeRequestManager implements
RequestManager, MemberStateLi
this.subscriptions = subscriptions;
this.shareFetchConfig = shareFetchConfig;
this.shareFetchBuffer = shareFetchBuffer;
- this.backgroundEventHandler = backgroundEventHandler;
+ this.acknowledgeEventHandler = acknowledgeEventHandler;
this.metricsManager = metricsManager;
this.retryBackoffMs = retryBackoffMs;
this.retryBackoffMaxMs = retryBackoffMaxMs;
@@ -228,8 +227,7 @@ public class ShareConsumeRequestManager implements
RequestManager, MemberStateLi
} else {
log.debug("Leader for the partition is down or
has changed, failing Acknowledgements for partition {}", tip);
acks.complete(Errors.NOT_LEADER_OR_FOLLOWER.exception());
-
sendShareRenewAcknowledgementsCompleteEvent(Map.of(tip, acks));
-
maybeSendShareAcknowledgeCommitCallbackEvent(Map.of(tip, acks));
+ maybeSendShareAcknowledgementEvent(Map.of(tip,
acks), true);
}
});
@@ -276,8 +274,7 @@ public class ShareConsumeRequestManager implements
RequestManager, MemberStateLi
// Failing the acknowledgements as we cannot have piggybacked
acknowledgements in the initial ShareFetchRequest.
log.debug("Cannot send acknowledgements on initial epoch for
ShareSession for partition {}", tip);
acknowledgements.complete(Errors.INVALID_SHARE_SESSION_EPOCH.exception());
- sendShareRenewAcknowledgementsCompleteEvent(Map.of(tip,
acknowledgements));
- maybeSendShareAcknowledgeCommitCallbackEvent(Map.of(tip,
acknowledgements));
+ maybeSendShareAcknowledgementEvent(Map.of(tip, acknowledgements),
true);
return false;
} else {
metricsManager.recordAcknowledgementSent(acknowledgements.size());
@@ -385,18 +382,13 @@ public class ShareConsumeRequestManager implements
RequestManager, MemberStateLi
this.isAcknowledgementCommitCallbackRegistered =
isAcknowledgementCommitCallbackRegistered;
}
- private void
maybeSendShareAcknowledgeCommitCallbackEvent(Map<TopicIdPartition,
Acknowledgements> acknowledgementsMap) {
- if (isAcknowledgementCommitCallbackRegistered) {
- ShareAcknowledgementCommitCallbackEvent event = new
ShareAcknowledgementCommitCallbackEvent(acknowledgementsMap);
- backgroundEventHandler.add(event);
+ private void maybeSendShareAcknowledgementEvent(Map<TopicIdPartition,
Acknowledgements> acknowledgementsMap, boolean checkForRenewAcknowledgements) {
+ if (isAcknowledgementCommitCallbackRegistered ||
checkForRenewAcknowledgements) {
+ ShareAcknowledgementEvent event = new
ShareAcknowledgementEvent(acknowledgementsMap, checkForRenewAcknowledgements);
+ acknowledgeEventHandler.add(event);
}
}
- private void
sendShareRenewAcknowledgementsCompleteEvent(Map<TopicIdPartition,
Acknowledgements> acknowledgementsMap) {
- ShareRenewAcknowledgementsCompleteEvent event = new
ShareRenewAcknowledgementsCompleteEvent(acknowledgementsMap);
- backgroundEventHandler.add(event);
- }
-
/**
*
* @param acknowledgeRequestState Contains the acknowledgements to be sent.
@@ -544,8 +536,7 @@ public class ShareConsumeRequestManager implements
RequestManager, MemberStateLi
resultCount.incrementAndGet();
} else {
nodeAcknowledgements.acknowledgements().complete(Errors.NOT_LEADER_OR_FOLLOWER.exception());
-
sendShareRenewAcknowledgementsCompleteEvent(Map.of(tip,
nodeAcknowledgements.acknowledgements()));
-
maybeSendShareAcknowledgeCommitCallbackEvent(Map.of(tip,
nodeAcknowledgements.acknowledgements()));
+ maybeSendShareAcknowledgementEvent(Map.of(tip,
nodeAcknowledgements.acknowledgements()), true);
}
}
}
@@ -564,7 +555,6 @@ public class ShareConsumeRequestManager implements
RequestManager, MemberStateLi
));
}
}
-
});
resultHandler.completeIfEmpty();
@@ -621,8 +611,7 @@ public class ShareConsumeRequestManager implements
RequestManager, MemberStateLi
}
} else {
nodeAcknowledgements.acknowledgements().complete(Errors.NOT_LEADER_OR_FOLLOWER.exception());
-
sendShareRenewAcknowledgementsCompleteEvent(Map.of(tip,
nodeAcknowledgements.acknowledgements()));
-
maybeSendShareAcknowledgeCommitCallbackEvent(Map.of(tip,
nodeAcknowledgements.acknowledgements()));
+ maybeSendShareAcknowledgementEvent(Map.of(tip,
nodeAcknowledgements.acknowledgements()), true);
}
}
}
@@ -660,8 +649,7 @@ public class ShareConsumeRequestManager implements
RequestManager, MemberStateLi
}
} else {
nodeAcks.acknowledgements().complete(Errors.NOT_LEADER_OR_FOLLOWER.exception());
- sendShareRenewAcknowledgementsCompleteEvent(Map.of(tip,
nodeAcks.acknowledgements()));
- maybeSendShareAcknowledgeCommitCallbackEvent(Map.of(tip,
nodeAcks.acknowledgements()));
+ maybeSendShareAcknowledgementEvent(Map.of(tip,
nodeAcks.acknowledgements()), true);
}
});
@@ -680,8 +668,7 @@ public class ShareConsumeRequestManager implements
RequestManager, MemberStateLi
}
} else {
acks.complete(Errors.NOT_LEADER_OR_FOLLOWER.exception());
-
sendShareRenewAcknowledgementsCompleteEvent(Map.of(tip, acks));
-
maybeSendShareAcknowledgeCommitCallbackEvent(Map.of(tip, acks));
+ maybeSendShareAcknowledgementEvent(Map.of(tip,
acks), true);
}
});
}
@@ -770,17 +757,13 @@ public class ShareConsumeRequestManager implements
RequestManager, MemberStateLi
metadata.requestUpdate(false);
}
// Complete any in-flight acknowledgements with the error code
from the response.
- Map<TopicIdPartition, Acknowledgements>
nodeAcknowledgementsInFlight =
fetchAcknowledgementsInFlight.get(fetchTarget.id());
+ Map<TopicIdPartition, Acknowledgements>
nodeAcknowledgementsInFlight =
fetchAcknowledgementsInFlight.remove(fetchTarget.id());
if (nodeAcknowledgementsInFlight != null) {
nodeAcknowledgementsInFlight.forEach((tip, acks) -> {
acks.complete(Errors.forCode(response.error().code()).exception());
metricsManager.recordFailedAcknowledgements(acks.size());
});
- if (requestData.isRenewAck()) {
-
sendShareRenewAcknowledgementsCompleteEvent(nodeAcknowledgementsInFlight);
- }
-
maybeSendShareAcknowledgeCommitCallbackEvent(nodeAcknowledgementsInFlight);
- nodeAcknowledgementsInFlight.clear();
+
maybeSendShareAcknowledgementEvent(nodeAcknowledgementsInFlight,
requestData.isRenewAck());
}
return;
}
@@ -818,10 +801,7 @@ public class ShareConsumeRequestManager implements
RequestManager, MemberStateLi
acks.complete(Errors.forCode(partitionData.acknowledgeErrorCode())
.exception(partitionData.acknowledgeErrorMessage()));
Map<TopicIdPartition, Acknowledgements> acksMap =
Map.of(tip, acks);
- if (requestData.isRenewAck()) {
-
sendShareRenewAcknowledgementsCompleteEvent(acksMap);
- }
- maybeSendShareAcknowledgeCommitCallbackEvent(acksMap);
+ maybeSendShareAcknowledgementEvent(acksMap,
requestData.isRenewAck());
}
}
@@ -858,8 +838,7 @@ public class ShareConsumeRequestManager implements
RequestManager, MemberStateLi
if (fetchAcknowledgementsInFlight.get(fetchTarget.id()) != null) {
fetchAcknowledgementsInFlight.remove(fetchTarget.id()).forEach((partition,
acknowledgements) -> {
acknowledgements.complete(new
InvalidRecordStateException(INVALID_RESPONSE));
-
sendShareRenewAcknowledgementsCompleteEvent(Map.of(partition,
acknowledgements));
-
maybeSendShareAcknowledgeCommitCallbackEvent(Map.of(partition,
acknowledgements));
+ maybeSendShareAcknowledgementEvent(Map.of(partition,
acknowledgements), true);
});
}
@@ -906,10 +885,7 @@ public class ShareConsumeRequestManager implements
RequestManager, MemberStateLi
acks.complete(Errors.UNKNOWN_SERVER_ERROR.exception());
}
Map<TopicIdPartition, Acknowledgements> acksMap =
Map.of(tip, acks);
- if (requestData.isRenewAck()) {
-
sendShareRenewAcknowledgementsCompleteEvent(nodeAcknowledgementsInFlight);
- }
- maybeSendShareAcknowledgeCommitCallbackEvent(acksMap);
+ maybeSendShareAcknowledgementEvent(acksMap,
requestData.isRenewAck());
}
}
}));
@@ -1439,20 +1415,14 @@ public class ShareConsumeRequestManager implements
RequestManager, MemberStateLi
public void complete(TopicIdPartition partition, Acknowledgements
acknowledgements, AcknowledgeRequestType type, boolean isRenewAck) {
if (type.equals(AcknowledgeRequestType.COMMIT_ASYNC)) {
if (acknowledgements != null) {
- if (isRenewAck) {
-
sendShareRenewAcknowledgementsCompleteEvent(Map.of(partition,
acknowledgements));
- }
-
maybeSendShareAcknowledgeCommitCallbackEvent(Map.of(partition,
acknowledgements));
+ maybeSendShareAcknowledgementEvent(Map.of(partition,
acknowledgements), isRenewAck);
}
} else {
if (acknowledgements != null) {
result.put(partition, acknowledgements);
}
if (remainingResults != null &&
remainingResults.decrementAndGet() == 0) {
- if (isRenewAck) {
- sendShareRenewAcknowledgementsCompleteEvent(result);
- }
- maybeSendShareAcknowledgeCommitCallbackEvent(result);
+ maybeSendShareAcknowledgementEvent(result, isRenewAck);
future.ifPresent(future -> future.complete(result));
}
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
index a2274b8e9cd..079a6280055 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
@@ -24,7 +24,6 @@ import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.consumer.AcknowledgeType;
import org.apache.kafka.clients.consumer.AcknowledgementCommitCallback;
import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ShareConsumer;
@@ -41,11 +40,11 @@ import
org.apache.kafka.clients.consumer.internals.events.EventProcessor;
import
org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgeAsyncEvent;
import
org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgeOnCloseEvent;
import
org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgeSyncEvent;
-import
org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgementCommitCallbackEvent;
import
org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgementCommitCallbackRegistrationEvent;
+import
org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgementEvent;
+import
org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgementEventHandler;
import org.apache.kafka.clients.consumer.internals.events.ShareFetchEvent;
import org.apache.kafka.clients.consumer.internals.events.SharePollEvent;
-import
org.apache.kafka.clients.consumer.internals.events.ShareRenewAcknowledgementsCompleteEvent;
import
org.apache.kafka.clients.consumer.internals.events.ShareSubscriptionChangeEvent;
import
org.apache.kafka.clients.consumer.internals.events.ShareUnsubscribeEvent;
import
org.apache.kafka.clients.consumer.internals.events.StopFindCoordinatorOnCloseEvent;
@@ -121,16 +120,34 @@ public class ShareConsumerImpl<K, V> implements
ShareConsumerDelegate<K, V> {
private static final long NO_CURRENT_THREAD = -1L;
+ /**
+ * An {@link
org.apache.kafka.clients.consumer.internals.events.EventProcessor} that is
created and executes in the
+ * application thread for the purpose of processing {@link
ShareAcknowledgementEvent share acknowledgement events}
+ * generated by the {@link ConsumerNetworkThread network thread}. These
are distinct from background events because
+ * they do not carry exception information which needs to be thrown in the
application thread. As a result, they
+ * are held on a separate queue. Share acknowledgement events are used
when there is an acknowledgement commit
+ * callback registered, and when renew acknowledgements are being used.
+ */
+ private class ShareAcknowledgementEventProcessor implements
EventProcessor<ShareAcknowledgementEvent> {
+
+ public ShareAcknowledgementEventProcessor() {}
+
+ @Override
+ public void process(final ShareAcknowledgementEvent event) {
+ if (acknowledgementCommitCallbackHandler != null) {
+ completedAcknowledgements.add(event.acknowledgementsMap());
+ }
+ if (event.checkForRenewAcknowledgements()) {
+ currentFetch.renew(event.acknowledgementsMap());
+ }
+ }
+ }
+
/**
* An {@link
org.apache.kafka.clients.consumer.internals.events.EventProcessor} that is
created and executes in the
* application thread for the purpose of processing {@link BackgroundEvent
background events} generated by the
- * {@link ConsumerNetworkThread network thread}.
- * Those events are generally of two types:
- *
- * <ul>
- * <li>Errors that occur in the network thread that need to be
propagated to the application thread</li>
- * <li>{@link ConsumerRebalanceListener} callbacks that are to be
executed on the application thread</li>
- * </ul>
+ * {@link ConsumerNetworkThread network thread}. These are errors that
occur in the network thread that need to be
+ * propagated to the application thread.
*/
private class BackgroundEventProcessor implements
EventProcessor<BackgroundEvent> {
@@ -138,37 +155,16 @@ public class ShareConsumerImpl<K, V> implements
ShareConsumerDelegate<K, V> {
@Override
public void process(final BackgroundEvent event) {
- switch (event.type()) {
- case ERROR:
- process((ErrorEvent) event);
- break;
-
- case SHARE_ACKNOWLEDGEMENT_COMMIT_CALLBACK:
- process((ShareAcknowledgementCommitCallbackEvent) event);
- break;
-
- case SHARE_RENEW_ACKNOWLEDGEMENTS_COMPLETE:
- process((ShareRenewAcknowledgementsCompleteEvent) event);
- break;
-
- default:
- throw new IllegalArgumentException("Background event type
" + event.type() + " was not expected");
+ if (event.type() == BackgroundEvent.Type.ERROR) {
+ process((ErrorEvent) event);
+ } else {
+ throw new IllegalArgumentException("Background event type " +
event.type() + " was not expected");
}
}
private void process(final ErrorEvent event) {
throw event.error();
}
-
- private void process(final ShareAcknowledgementCommitCallbackEvent
event) {
- if (acknowledgementCommitCallbackHandler != null) {
- completedAcknowledgements.add(event.acknowledgementsMap());
- }
- }
-
- private void process(final ShareRenewAcknowledgementsCompleteEvent
event) {
- currentFetch.renew(event.acknowledgementsMap());
- }
}
private final ApplicationEventHandler applicationEventHandler;
@@ -178,6 +174,9 @@ public class ShareConsumerImpl<K, V> implements
ShareConsumerDelegate<K, V> {
private Logger log;
private final String clientId;
private final String groupId;
+ private final ShareAcknowledgementEventHandler acknowledgementEventHandler;
+ private final BlockingQueue<ShareAcknowledgementEvent>
acknowledgementEventQueue;
+ private final ShareAcknowledgementEventProcessor
acknowledgementEventProcessor;
private final BlockingQueue<BackgroundEvent> backgroundEventQueue;
private final BackgroundEventHandler backgroundEventHandler;
private final BackgroundEventProcessor backgroundEventProcessor;
@@ -225,6 +224,7 @@ public class ShareConsumerImpl<K, V> implements
ShareConsumerDelegate<K, V> {
ApplicationEventHandler::new,
CompletableEventReaper::new,
ShareFetchCollector::new,
+ new LinkedBlockingQueue<>(),
new LinkedBlockingQueue<>()
);
}
@@ -237,6 +237,7 @@ public class ShareConsumerImpl<K, V> implements
ShareConsumerDelegate<K, V> {
final ApplicationEventHandlerFactory
applicationEventHandlerFactory,
final AsyncKafkaConsumer.CompletableEventReaperFactory
backgroundEventReaperFactory,
final ShareFetchCollectorFactory<K, V>
fetchCollectorFactory,
+ final LinkedBlockingQueue<ShareAcknowledgementEvent>
acknowledgementEventQueue,
final LinkedBlockingQueue<BackgroundEvent>
backgroundEventQueue) {
try {
GroupRebalanceConfig groupRebalanceConfig = new
GroupRebalanceConfig(
@@ -247,6 +248,7 @@ public class ShareConsumerImpl<K, V> implements
ShareConsumerDelegate<K, V> {
this.groupId = config.getString(ConsumerConfig.GROUP_ID_CONFIG);
maybeThrowInvalidGroupIdException();
LogContext logContext = createLogContext(clientId, groupId);
+ this.acknowledgementEventQueue = acknowledgementEventQueue;
this.backgroundEventQueue = backgroundEventQueue;
this.log = logContext.logger(getClass());
@@ -274,6 +276,7 @@ public class ShareConsumerImpl<K, V> implements
ShareConsumerDelegate<K, V> {
ShareFetchMetricsManager shareFetchMetricsManager =
createShareFetchMetricsManager(metrics);
ApiVersions apiVersions = new ApiVersions();
final BlockingQueue<ApplicationEvent> applicationEventQueue = new
LinkedBlockingQueue<>();
+ this.acknowledgementEventHandler = new
ShareAcknowledgementEventHandler(acknowledgementEventQueue);
this.backgroundEventHandler = new BackgroundEventHandler(
backgroundEventQueue, time, asyncConsumerMetrics);
@@ -297,6 +300,7 @@ public class ShareConsumerImpl<K, V> implements
ShareConsumerDelegate<K, V> {
final Supplier<RequestManagers> requestManagersSupplier =
RequestManagers.supplier(
time,
logContext,
+ acknowledgementEventHandler,
backgroundEventHandler,
metadata,
subscriptions,
@@ -324,6 +328,7 @@ public class ShareConsumerImpl<K, V> implements
ShareConsumerDelegate<K, V> {
requestManagersSupplier,
asyncConsumerMetrics);
+ this.acknowledgementEventProcessor = new
ShareAcknowledgementEventProcessor();
this.backgroundEventProcessor = new BackgroundEventProcessor();
this.backgroundEventReaper =
backgroundEventReaperFactory.build(logContext);
@@ -389,6 +394,8 @@ public class ShareConsumerImpl<K, V> implements
ShareConsumerDelegate<K, V> {
this.asyncConsumerMetrics = new AsyncConsumerMetrics(metrics,
CONSUMER_SHARE_METRIC_GROUP);
final BlockingQueue<ApplicationEvent> applicationEventQueue = new
LinkedBlockingQueue<>();
+ this.acknowledgementEventQueue = new LinkedBlockingQueue<>();
+ this.acknowledgementEventHandler = new
ShareAcknowledgementEventHandler(acknowledgementEventQueue);
this.backgroundEventQueue = new LinkedBlockingQueue<>();
this.backgroundEventHandler = new BackgroundEventHandler(
backgroundEventQueue, time, asyncConsumerMetrics);
@@ -402,6 +409,7 @@ public class ShareConsumerImpl<K, V> implements
ShareConsumerDelegate<K, V> {
final Supplier<RequestManagers> requestManagersSupplier =
RequestManagers.supplier(
time,
logContext,
+ acknowledgementEventHandler,
backgroundEventHandler,
metadata,
subscriptions,
@@ -430,6 +438,7 @@ public class ShareConsumerImpl<K, V> implements
ShareConsumerDelegate<K, V> {
requestManagersSupplier,
asyncConsumerMetrics);
+ this.acknowledgementEventProcessor = new
ShareAcknowledgementEventProcessor();
this.backgroundEventProcessor = new BackgroundEventProcessor();
this.backgroundEventReaper = new CompletableEventReaper(logContext);
@@ -447,6 +456,7 @@ public class ShareConsumerImpl<K, V> implements
ShareConsumerDelegate<K, V> {
final ShareFetchCollector<K, V> fetchCollector,
final Time time,
final ApplicationEventHandler applicationEventHandler,
+ final BlockingQueue<ShareAcknowledgementEvent>
acknowledgementEventQueue,
final BlockingQueue<BackgroundEvent>
backgroundEventQueue,
final CompletableEventReaper backgroundEventReaper,
final Metrics metrics,
@@ -463,6 +473,8 @@ public class ShareConsumerImpl<K, V> implements
ShareConsumerDelegate<K, V> {
this.fetchBuffer = fetchBuffer;
this.fetchCollector = fetchCollector;
this.time = time;
+ this.acknowledgementEventQueue = acknowledgementEventQueue;
+ this.acknowledgementEventProcessor = new
ShareAcknowledgementEventProcessor();
this.backgroundEventQueue = backgroundEventQueue;
this.backgroundEventProcessor = new BackgroundEventProcessor();
this.backgroundEventReaper = backgroundEventReaper;
@@ -478,6 +490,7 @@ public class ShareConsumerImpl<K, V> implements
ShareConsumerDelegate<K, V> {
this.clientTelemetryReporter = Optional.empty();
this.completedAcknowledgements = Collections.emptyList();
this.asyncConsumerMetrics = new AsyncConsumerMetrics(metrics,
CONSUMER_SHARE_METRIC_GROUP);
+ this.acknowledgementEventHandler = new
ShareAcknowledgementEventHandler(acknowledgementEventQueue);
this.backgroundEventHandler = new BackgroundEventHandler(
backgroundEventQueue, time, asyncConsumerMetrics);
}
@@ -579,8 +592,11 @@ public class ShareConsumerImpl<K, V> implements
ShareConsumerDelegate<K, V> {
acquireAndEnsureOpen();
try {
+ // Throw any errors notified by the background thread
+ processBackgroundEvents();
+
// Handle any completed acknowledgements for which we already have
the responses
- handleCompletedAcknowledgements(false);
+ handleCompletedAcknowledgements();
// If using implicit acknowledgement, acknowledge the previously
fetched records
acknowledgeBatchIfImplicitAcknowledgement();
@@ -600,8 +616,6 @@ public class ShareConsumerImpl<K, V> implements
ShareConsumerDelegate<K, V> {
// Make sure the network thread can tell the application is
actively polling
applicationEventHandler.add(new
SharePollEvent(timer.currentTimeMs()));
- processBackgroundEvents();
-
// We must not allow wake-ups between polling for fetches and
returning the records.
// A wake-up between returned fetches and returning records
would lead to never
// returning the records in the fetches. Thus, we trigger a
possible wake-up before we poll fetches.
@@ -610,14 +624,18 @@ public class ShareConsumerImpl<K, V> implements
ShareConsumerDelegate<K, V> {
final ShareFetch<K, V> fetch = pollForFetches(timer);
if (!fetch.isEmpty()) {
currentFetch = fetch;
+ handleCompletedAcknowledgements();
return new ConsumerRecords<>(fetch.records(), Map.of());
}
+ // Throw any errors notified by the background thread
+ processBackgroundEvents();
metadata.maybeThrowAnyException();
// We will wait for retryBackoffMs
} while (timer.notExpired());
+ handleCompletedAcknowledgements();
return ConsumerRecords.empty();
} catch (ShareFetchException e) {
currentFetch = (ShareFetch<K, V>) e.shareFetch();
@@ -625,14 +643,6 @@ public class ShareConsumerImpl<K, V> implements
ShareConsumerDelegate<K, V> {
} finally {
kafkaShareConsumerMetrics.recordPollEnd(timer.currentTimeMs());
- // Handle any acknowledgements which completed while we were
waiting, but do not throw
- // the exception because the fetched records would then not be
returned to the caller
- try {
- handleCompletedAcknowledgements(false);
- } catch (Throwable t) {
- log.warn("Exception thrown in acknowledgement commit
callback", t);
- }
-
release();
}
}
@@ -776,7 +786,7 @@ public class ShareConsumerImpl<K, V> implements
ShareConsumerDelegate<K, V> {
acquireAndEnsureOpen();
try {
// Handle any completed acknowledgements for which we already have
the responses
- handleCompletedAcknowledgements(false);
+ handleCompletedAcknowledgements();
// If using implicit acknowledgement, acknowledge the previously
fetched records
acknowledgeBatchIfImplicitAcknowledgement();
@@ -803,7 +813,7 @@ public class ShareConsumerImpl<K, V> implements
ShareConsumerDelegate<K, V> {
});
// Handle any acknowledgements which completed while we
were waiting
- handleCompletedAcknowledgements(false);
+ handleCompletedAcknowledgements();
return result;
} finally {
@@ -823,7 +833,7 @@ public class ShareConsumerImpl<K, V> implements
ShareConsumerDelegate<K, V> {
acquireAndEnsureOpen();
try {
// Handle any completed acknowledgements for which we already have
the responses
- handleCompletedAcknowledgements(false);
+ handleCompletedAcknowledgements();
// If using implicit acknowledgement, acknowledge the previously
fetched records
acknowledgeBatchIfImplicitAcknowledgement();
@@ -963,7 +973,9 @@ public class ShareConsumerImpl<K, V> implements
ShareConsumerDelegate<K, V> {
swallow(log, Level.ERROR, "Failed to stop finding coordinator",
this::stopFindCoordinatorOnClose, firstException);
swallow(log, Level.ERROR, "Failed invoking acknowledgement commit
callback",
- () -> handleCompletedAcknowledgements(true), firstException);
+ this::handleCompletedAcknowledgements, firstException);
+ swallow(log, Level.ERROR, "Failed processing background events",
+ this::processBackgroundEventsOnClose, firstException);
if (applicationEventHandler != null)
closeQuietly(() ->
applicationEventHandler.close(Duration.ofMillis(closeTimer.remainingMs())),
"Failed shutting down network thread", firstException);
closeTimer.update();
@@ -1101,18 +1113,13 @@ public class ShareConsumerImpl<K, V> implements
ShareConsumerDelegate<K, V> {
* Handles any completed acknowledgements. If there is an acknowledgement
commit callback registered,
* call it. Otherwise, discard the information about completed
acknowledgements because the application
* is not interested.
- * <p>
- * If the acknowledgement commit callback throws an exception, this method
will throw an exception.
*/
- private void handleCompletedAcknowledgements(boolean onClose) {
- if (backgroundEventQueue == null || backgroundEventReaper == null ||
backgroundEventProcessor == null) {
+ private void handleCompletedAcknowledgements() {
+ if (acknowledgementEventQueue == null || acknowledgementEventHandler
== null) {
return;
}
- // If the user gets any fatal errors, they will get these exceptions
in the background queue.
- // While closing, we ignore these exceptions so that the consumers
close successfully.
- processBackgroundEvents(onClose ? e -> (e instanceof
GroupAuthorizationException
- || e instanceof TopicAuthorizationException
- || e instanceof InvalidTopicException) : e -> false);
+
+ processAcknowledgementEvents();
if (!completedAcknowledgements.isEmpty()) {
try {
@@ -1171,20 +1178,44 @@ public class ShareConsumerImpl<K, V> implements
ShareConsumerDelegate<K, V> {
return ShareAcknowledgementMode.fromString(s);
}
- private void processBackgroundEvents(final Predicate<Exception>
ignoreErrorEventException) {
+ /**
+ * Process acknowledgement events, if any, that were produced by the
{@link ConsumerNetworkThread network thread}.
+ */
+ void processAcknowledgementEvents() {
+ List<ShareAcknowledgementEvent> events =
acknowledgementEventHandler.drainEvents();
+ if (!events.isEmpty()) {
+ for (ShareAcknowledgementEvent event : events) {
+ try {
+ acknowledgementEventProcessor.process(event);
+ } catch (Exception e) {
+ log.warn("An error occurred when processing the
acknowledgement event: {}", e.getMessage(), e);
+ }
+ }
+ }
+ }
+
+ /**
+ * Process background events on close. Except some expected exceptions
which might occur
+ * during close, exceptions encountered are thrown.
+ */
+ void processBackgroundEventsOnClose() {
+ if (backgroundEventQueue == null || backgroundEventHandler == null) {
+ return;
+ }
+
try {
processBackgroundEvents();
} catch (Exception e) {
- if (!ignoreErrorEventException.test(e))
+ if (!(e instanceof GroupAuthorizationException || e instanceof
TopicAuthorizationException || e instanceof InvalidTopicException))
throw e;
}
}
/**
- * Process the events—if any—that were produced by the {@link
ConsumerNetworkThread network thread}.
- * It is possible that {@link ErrorEvent an error}
- * could occur when processing the events. In such cases, the processor
will take a reference to the first
- * error, continue to process the remaining events, and then throw the
first error that occurred.
+ * Process background events, if any, that were produced by the {@link
ConsumerNetworkThread network thread}.
+ * It is possible that {@link ErrorEvent an error} could occur when
processing the events. In such
+ * cases, the processor will take a reference to the first error, continue
to process the remaining
+ * events, and then throw the first error that occurred.
*
* Visible for testing.
*/
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEvent.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEvent.java
index 9f84df9414d..dc33a134bcf 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEvent.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEvent.java
@@ -28,8 +28,6 @@ public abstract class BackgroundEvent {
public enum Type {
ERROR,
CONSUMER_REBALANCE_LISTENER_CALLBACK_NEEDED,
- SHARE_ACKNOWLEDGEMENT_COMMIT_CALLBACK,
- SHARE_RENEW_ACKNOWLEDGEMENTS_COMPLETE,
STREAMS_ON_TASKS_ASSIGNED_CALLBACK_NEEDED,
STREAMS_ON_TASKS_REVOKED_CALLBACK_NEEDED,
STREAMS_ON_ALL_TASKS_LOST_CALLBACK_NEEDED
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ShareAcknowledgementCommitCallbackEvent.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ShareAcknowledgementEvent.java
similarity index 62%
rename from
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ShareAcknowledgementCommitCallbackEvent.java
rename to
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ShareAcknowledgementEvent.java
index 3f52f853208..9dfc06b1f42 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ShareAcknowledgementCommitCallbackEvent.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ShareAcknowledgementEvent.java
@@ -17,26 +17,31 @@
package org.apache.kafka.clients.consumer.internals.events;
import org.apache.kafka.clients.consumer.internals.Acknowledgements;
+import org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread;
import org.apache.kafka.common.TopicIdPartition;
-import java.util.HashMap;
import java.util.Map;
-public class ShareAcknowledgementCommitCallbackEvent extends BackgroundEvent {
+/**
+ * This is the class of events created by the {@link ConsumerNetworkThread
network thread} to indicate completion
+ * of acknowledgements.
+ */
+public class ShareAcknowledgementEvent {
private final Map<TopicIdPartition, Acknowledgements> acknowledgementsMap;
+ private final boolean checkForRenewAcknowledgements;
- public ShareAcknowledgementCommitCallbackEvent(Map<TopicIdPartition,
Acknowledgements> acknowledgementsMap) {
- super(Type.SHARE_ACKNOWLEDGEMENT_COMMIT_CALLBACK);
- this.acknowledgementsMap = new HashMap<>(acknowledgementsMap);
+ public ShareAcknowledgementEvent(Map<TopicIdPartition, Acknowledgements>
acknowledgementsMap,
+ boolean checkForRenewAcknowledgements) {
+ this.acknowledgementsMap = acknowledgementsMap;
+ this.checkForRenewAcknowledgements = checkForRenewAcknowledgements;
}
public Map<TopicIdPartition, Acknowledgements> acknowledgementsMap() {
return acknowledgementsMap;
}
- @Override
- public String toStringBase() {
- return super.toStringBase() + ", acknowledgementsMap=" +
acknowledgementsMap;
+ public boolean checkForRenewAcknowledgements() {
+ return checkForRenewAcknowledgements;
}
}
\ No newline at end of file
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ShareAcknowledgementEventHandler.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ShareAcknowledgementEventHandler.java
new file mode 100644
index 00000000000..c1c2d40e00b
--- /dev/null
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ShareAcknowledgementEventHandler.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals.events;
+
+import org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * An event handler that receives {@link BackgroundEvent background events}
from the
+ * {@link ConsumerNetworkThread network thread} which are then made available
to the application thread
+ * via an {@link EventProcessor}.
+ */
+
+public class ShareAcknowledgementEventHandler {
+
+ private final BlockingQueue<ShareAcknowledgementEvent> eventQueue;
+
+ public ShareAcknowledgementEventHandler(final
BlockingQueue<ShareAcknowledgementEvent> eventQueue) {
+ this.eventQueue = eventQueue;
+ }
+
+ /**
+ * Add a {@link ShareAcknowledgementEvent} to the handler.
+ *
+ * @param event A {@link ShareAcknowledgementEvent} created by the {@link
ConsumerNetworkThread network thread}
+ */
+ public void add(ShareAcknowledgementEvent event) {
+ Objects.requireNonNull(event, "ShareAcknowledgementCompleteEvent
provided to add must be non-null");
+ eventQueue.add(event);
+ }
+
+ /**
+ * Drain all the {@link ShareAcknowledgementEvent events} from the handler.
+ *
+ * @return A list of {@link ShareAcknowledgementEvent events} that were
drained
+ */
+ public List<ShareAcknowledgementEvent> drainEvents() {
+ List<ShareAcknowledgementEvent> events = new ArrayList<>();
+ eventQueue.drainTo(events);
+ return events;
+ }
+}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ShareRenewAcknowledgementsCompleteEvent.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ShareRenewAcknowledgementsCompleteEvent.java
deleted file mode 100644
index 1151ac04a52..00000000000
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ShareRenewAcknowledgementsCompleteEvent.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.clients.consumer.internals.events;
-
-import org.apache.kafka.clients.consumer.internals.Acknowledgements;
-import org.apache.kafka.common.TopicIdPartition;
-
-import java.util.HashMap;
-import java.util.Map;
-
-public class ShareRenewAcknowledgementsCompleteEvent extends BackgroundEvent {
-
- private final Map<TopicIdPartition, Acknowledgements> acknowledgementsMap;
-
- public ShareRenewAcknowledgementsCompleteEvent(Map<TopicIdPartition,
Acknowledgements> acknowledgementsMap) {
- super(Type.SHARE_RENEW_ACKNOWLEDGEMENTS_COMPLETE);
- this.acknowledgementsMap = new HashMap<>(acknowledgementsMap);
- }
-
- public Map<TopicIdPartition, Acknowledgements> acknowledgementsMap() {
- return acknowledgementsMap;
- }
-
- @Override
- public String toStringBase() {
- return super.toStringBase() + ", acknowledgementsMap=" +
acknowledgementsMap;
- }
-}
\ No newline at end of file
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
index d2abe110ceb..f199999a8f0 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
@@ -25,10 +25,9 @@ import org.apache.kafka.clients.consumer.AcknowledgeType;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ShareAcquireMode;
-import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
import
org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
-import
org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgementCommitCallbackEvent;
-import
org.apache.kafka.clients.consumer.internals.events.ShareRenewAcknowledgementsCompleteEvent;
+import
org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgementEvent;
+import
org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgementEventHandler;
import
org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetrics;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.IsolationLevel;
@@ -2763,7 +2762,7 @@ public class ShareConsumeRequestManagerTest {
subscriptions,
shareFetchConfig,
deserializers);
- BackgroundEventHandler backgroundEventHandler = new
TestableBackgroundEventHandler(time, completedAcknowledgements, renewedRecords);
+ ShareAcknowledgementEventHandler acknowledgementEventHandler = new
TestableShareAcknowledgementEventHandler(completedAcknowledgements,
renewedRecords);
shareConsumeRequestManager = spy(new
TestableShareConsumeRequestManager<>(
logContext,
groupId,
@@ -2771,7 +2770,7 @@ public class ShareConsumeRequestManagerTest {
subscriptionState,
shareFetchConfig,
new ShareFetchBuffer(logContext),
- backgroundEventHandler,
+ acknowledgementEventHandler,
metricsManager,
shareFetchCollector));
}
@@ -2810,11 +2809,11 @@ public class ShareConsumeRequestManagerTest {
SubscriptionState
subscriptions,
ShareFetchConfig
shareFetchConfig,
ShareFetchBuffer
shareFetchBuffer,
- BackgroundEventHandler
backgroundEventHandler,
+
ShareAcknowledgementEventHandler acknowledgementEventHandler,
ShareFetchMetricsManager
metricsManager,
ShareFetchCollector<K, V>
fetchCollector) {
super(time, logContext, groupId, metadata, subscriptions,
shareFetchConfig, shareFetchBuffer,
- backgroundEventHandler, metricsManager, retryBackoffMs,
1000);
+ acknowledgementEventHandler, metricsManager, retryBackoffMs,
1000);
this.shareFetchCollector = fetchCollector;
onMemberEpochUpdated(Optional.empty(),
Uuid.randomUuid().toString());
}
@@ -2955,23 +2954,20 @@ public class ShareConsumeRequestManagerTest {
}
}
- private static class TestableBackgroundEventHandler extends
BackgroundEventHandler {
+ private static class TestableShareAcknowledgementEventHandler extends
ShareAcknowledgementEventHandler {
List<Map<TopicIdPartition, Acknowledgements>>
completedAcknowledgements;
Set<Long> renewedRecords;
- public TestableBackgroundEventHandler(Time time,
List<Map<TopicIdPartition, Acknowledgements>> completedAcknowledgements,
Set<Long> renewedRecords) {
- super(new LinkedBlockingQueue<>(), time,
mock(AsyncConsumerMetrics.class));
+ public
TestableShareAcknowledgementEventHandler(List<Map<TopicIdPartition,
Acknowledgements>> completedAcknowledgements, Set<Long> renewedRecords) {
+ super(new LinkedBlockingQueue<>());
this.completedAcknowledgements = completedAcknowledgements;
this.renewedRecords = renewedRecords;
}
- public void add(BackgroundEvent event) {
- if (event.type() ==
BackgroundEvent.Type.SHARE_ACKNOWLEDGEMENT_COMMIT_CALLBACK) {
- ShareAcknowledgementCommitCallbackEvent
shareAcknowledgementCommitCallbackEvent =
(ShareAcknowledgementCommitCallbackEvent) event;
-
completedAcknowledgements.add(shareAcknowledgementCommitCallbackEvent.acknowledgementsMap());
- } else if (event.type() ==
BackgroundEvent.Type.SHARE_RENEW_ACKNOWLEDGEMENTS_COMPLETE) {
- ShareRenewAcknowledgementsCompleteEvent
shareRenewAcknowledgementsCompleteEvent =
(ShareRenewAcknowledgementsCompleteEvent) event;
-
shareRenewAcknowledgementsCompleteEvent.acknowledgementsMap().values().forEach(acks
->
+ public void add(ShareAcknowledgementEvent event) {
+ completedAcknowledgements.add(event.acknowledgementsMap());
+ if (event.checkForRenewAcknowledgements()) {
+ event.acknowledgementsMap().values().forEach(acks ->
acks.getAcknowledgementsTypeMap().forEach((offset,
ackType) -> renewedRecords.add(offset)));
}
}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java
index 6e819a6888a..489b0d78200 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java
@@ -27,11 +27,10 @@ import
org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper
import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
import
org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgeAsyncEvent;
import
org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgeOnCloseEvent;
-import
org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgementCommitCallbackEvent;
import
org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgementCommitCallbackRegistrationEvent;
+import
org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgementEvent;
import org.apache.kafka.clients.consumer.internals.events.ShareFetchEvent;
import org.apache.kafka.clients.consumer.internals.events.SharePollEvent;
-import
org.apache.kafka.clients.consumer.internals.events.ShareRenewAcknowledgementsCompleteEvent;
import
org.apache.kafka.clients.consumer.internals.events.ShareSubscriptionChangeEvent;
import
org.apache.kafka.clients.consumer.internals.events.ShareUnsubscribeEvent;
import
org.apache.kafka.clients.consumer.internals.events.StopFindCoordinatorOnCloseEvent;
@@ -41,6 +40,7 @@ import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.InvalidGroupIdException;
+import org.apache.kafka.common.errors.InvalidRecordStateException;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
@@ -107,6 +107,7 @@ public class ShareConsumerImplTest {
private final ShareFetchCollector<String, String> fetchCollector =
mock(ShareFetchCollector.class);
private final ShareConsumerMetadata metadata =
mock(ShareConsumerMetadata.class);
private final ApplicationEventHandler applicationEventHandler =
mock(ApplicationEventHandler.class);
+ private final LinkedBlockingQueue<ShareAcknowledgementEvent>
acknowledgementEventQueue = new LinkedBlockingQueue<>();
private final LinkedBlockingQueue<BackgroundEvent> backgroundEventQueue =
new LinkedBlockingQueue<>();
private final CompletableEventReaper backgroundEventReaper =
mock(CompletableEventReaper.class);
@@ -147,6 +148,7 @@ public class ShareConsumerImplTest {
(a, b, c, d, e, f, g, h) -> applicationEventHandler,
a -> backgroundEventReaper,
(a, b, c, d, e) -> fetchCollector,
+ acknowledgementEventQueue,
backgroundEventQueue
);
}
@@ -181,6 +183,7 @@ public class ShareConsumerImplTest {
fetchCollector,
time,
applicationEventHandler,
+ acknowledgementEventQueue,
backgroundEventQueue,
backgroundEventReaper,
new Metrics(),
@@ -517,8 +520,8 @@ public class ShareConsumerImplTest {
Acknowledgements acks = Acknowledgements.empty();
acks.add(0, AcknowledgeType.RENEW);
acks.complete(null);
- ShareRenewAcknowledgementsCompleteEvent e = new
ShareRenewAcknowledgementsCompleteEvent(Map.of(tip, acks));
- backgroundEventQueue.add(e);
+ ShareAcknowledgementEvent e = new
ShareAcknowledgementEvent(Map.of(tip, acks), true);
+ acknowledgementEventQueue.add(e);
records = consumer.poll(Duration.ofMillis(100));
assertEquals(1, records.count(), "Should have received 1 record");
@@ -898,13 +901,13 @@ public class ShareConsumerImplTest {
Metrics metrics = consumer.metricsRegistry();
AsyncConsumerMetrics asyncConsumerMetrics =
consumer.asyncConsumerMetrics();
- ShareAcknowledgementCommitCallbackEvent event = new
ShareAcknowledgementCommitCallbackEvent(Map.of());
+ ErrorEvent event = new ErrorEvent(new InvalidRecordStateException("The
record is in the wrong state"));
backgroundEventQueue.add(event);
asyncConsumerMetrics.recordBackgroundEventQueueSize(1);
assertEquals(1, (double)
metrics.metric(metrics.metricName("background-event-queue-size",
CONSUMER_SHARE_METRIC_GROUP)).metricValue());
- consumer.processBackgroundEvents();
+ assertThrows(InvalidRecordStateException.class, () ->
consumer.processBackgroundEvents());
assertEquals(0, (double)
metrics.metric(metrics.metricName("background-event-queue-size",
CONSUMER_SHARE_METRIC_GROUP)).metricValue());
}