This is an automated email from the ASF dual-hosted git repository.
lianetm pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new b213c64f97f KAFKA-17480: New consumer commit all consumed should
retrieve offsets in background thread (#17150)
b213c64f97f is described below
commit b213c64f97fa6b4ab3b79b828e37095603841d6f
Author: PoAn Yang <[email protected]>
AuthorDate: Thu Nov 7 22:45:44 2024 +0800
KAFKA-17480: New consumer commit all consumed should retrieve offsets in
background thread (#17150)
Reviewers: Lianet Magrans <[email protected]>, Kirk True
<[email protected]>, TengYao Chi <[email protected]>
---
.../consumer/internals/AsyncKafkaConsumer.java | 59 +++----
.../consumer/internals/CommitRequestManager.java | 72 +++++---
.../consumer/internals/RequestManagers.java | 3 +-
.../events/ApplicationEventProcessor.java | 33 ++--
.../internals/events/AsyncCommitEvent.java | 4 +-
.../consumer/internals/events/CommitEvent.java | 19 ++-
.../consumer/internals/events/SyncCommitEvent.java | 5 +-
.../consumer/internals/AsyncKafkaConsumerTest.java | 163 ++++++------------
.../internals/CommitRequestManagerTest.java | 182 ++++++++++++++++++---
.../events/ApplicationEventProcessorTest.java | 115 ++++++++++++-
10 files changed, 434 insertions(+), 221 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
index c1e84a4eae5..7fda9a20c05 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
@@ -249,7 +249,7 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
private final OffsetCommitCallbackInvoker offsetCommitCallbackInvoker;
// Last triggered async commit future. Used to wait until all previous
async commits are completed.
// We only need to keep track of the last one, since they are guaranteed
to complete in order.
- private CompletableFuture<Void> lastPendingAsyncCommit = null;
+ private CompletableFuture<Map<TopicPartition, OffsetAndMetadata>>
lastPendingAsyncCommit = null;
// currentThread holds the threadId of the current thread accessing the
AsyncKafkaConsumer
// and is used to prevent multithreaded access
@@ -752,43 +752,43 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
@Override
public void commitAsync(OffsetCommitCallback callback) {
- commitAsync(subscriptions.allConsumed(), callback);
+ commitAsync(Optional.empty(), callback);
}
@Override
public void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets,
OffsetCommitCallback callback) {
+ commitAsync(Optional.of(offsets), callback);
+ }
+
+ private void commitAsync(Optional<Map<TopicPartition, OffsetAndMetadata>>
offsets, OffsetCommitCallback callback) {
acquireAndEnsureOpen();
try {
AsyncCommitEvent asyncCommitEvent = new AsyncCommitEvent(offsets);
- lastPendingAsyncCommit = commit(asyncCommitEvent).whenComplete((r,
t) -> {
+ lastPendingAsyncCommit =
commit(asyncCommitEvent).whenComplete((committedOffsets, throwable) -> {
- if (t == null) {
-
offsetCommitCallbackInvoker.enqueueInterceptorInvocation(offsets);
+ if (throwable == null) {
+
offsetCommitCallbackInvoker.enqueueInterceptorInvocation(committedOffsets);
}
if (callback == null) {
- if (t != null) {
- log.error("Offset commit with offsets {} failed",
offsets, t);
+ if (throwable != null) {
+ log.error("Offset commit with offsets {} failed",
committedOffsets, throwable);
}
return;
}
-
offsetCommitCallbackInvoker.enqueueUserCallbackInvocation(callback, offsets,
(Exception) t);
+
offsetCommitCallbackInvoker.enqueueUserCallbackInvocation(callback,
committedOffsets, (Exception) throwable);
});
} finally {
release();
}
}
- private CompletableFuture<Void> commit(final CommitEvent commitEvent) {
+ private CompletableFuture<Map<TopicPartition, OffsetAndMetadata>>
commit(final CommitEvent commitEvent) {
maybeThrowInvalidGroupIdException();
offsetCommitCallbackInvoker.executeCallbacks();
- Map<TopicPartition, OffsetAndMetadata> offsets = commitEvent.offsets();
- log.debug("Committing offsets: {}", offsets);
- offsets.forEach(this::updateLastSeenEpochIfNewer);
-
- if (offsets.isEmpty()) {
+ if (commitEvent.offsets().isPresent() &&
commitEvent.offsets().get().isEmpty()) {
return CompletableFuture.completedFuture(null);
}
@@ -828,7 +828,6 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
} else {
log.info("Seeking to offset {} for partition {}", offset,
partition);
}
- updateLastSeenEpochIfNewer(partition, offsetAndMetadata);
Timer timer = time.timer(defaultApiTimeoutMs);
SeekUnvalidatedEvent seekUnvalidatedEventEvent = new
SeekUnvalidatedEvent(
@@ -914,9 +913,7 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
calculateDeadlineMs(time, timeout));
wakeupTrigger.setActiveTask(event.future());
try {
- final Map<TopicPartition, OffsetAndMetadata> committedOffsets
= applicationEventHandler.addAndGet(event);
- committedOffsets.forEach(this::updateLastSeenEpochIfNewer);
- return committedOffsets;
+ return applicationEventHandler.addAndGet(event);
} catch (TimeoutException e) {
throw new TimeoutException("Timeout of " + timeout.toMillis()
+ "ms expired before the last " +
"committed offset for partitions " + partitions + " could
be determined. Try tuning " +
@@ -1294,13 +1291,12 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
// Visible for testing
void commitSyncAllConsumed(final Timer timer) {
- Map<TopicPartition, OffsetAndMetadata> allConsumed =
subscriptions.allConsumed();
- log.debug("Sending synchronous auto-commit of offsets {} on closing",
allConsumed);
+ log.debug("Sending synchronous auto-commit on closing");
try {
- commitSync(allConsumed, Duration.ofMillis(timer.remainingMs()));
+ commitSync(Duration.ofMillis(timer.remainingMs()));
} catch (Exception e) {
// consistent with async auto-commit failures, we do not propagate
the exception
- log.warn("Synchronous auto-commit of offsets {} failed: {}",
allConsumed, e.getMessage());
+ log.warn("Synchronous auto-commit failed", e);
}
timer.update();
}
@@ -1318,28 +1314,32 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
*/
@Override
public void commitSync(final Duration timeout) {
- commitSync(subscriptions.allConsumed(), timeout);
+ commitSync(Optional.empty(), timeout);
}
@Override
public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets) {
- commitSync(offsets, Duration.ofMillis(defaultApiTimeoutMs));
+ commitSync(Optional.of(offsets),
Duration.ofMillis(defaultApiTimeoutMs));
}
@Override
public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets,
Duration timeout) {
+ commitSync(Optional.of(offsets), timeout);
+ }
+
+ private void commitSync(Optional<Map<TopicPartition, OffsetAndMetadata>>
offsets, Duration timeout) {
acquireAndEnsureOpen();
long commitStart = time.nanoseconds();
try {
SyncCommitEvent syncCommitEvent = new SyncCommitEvent(offsets,
calculateDeadlineMs(time, timeout));
- CompletableFuture<Void> commitFuture = commit(syncCommitEvent);
+ CompletableFuture<Map<TopicPartition, OffsetAndMetadata>>
commitFuture = commit(syncCommitEvent);
Timer requestTimer = time.timer(timeout.toMillis());
awaitPendingAsyncCommitsAndExecuteCommitCallbacks(requestTimer,
true);
wakeupTrigger.setActiveTask(commitFuture);
- ConsumerUtils.getResult(commitFuture, requestTimer);
- interceptors.onCommit(offsets);
+ Map<TopicPartition, OffsetAndMetadata> committedOffsets =
ConsumerUtils.getResult(commitFuture, requestTimer);
+ interceptors.onCommit(committedOffsets);
} finally {
wakeupTrigger.clearTask();
kafkaConsumerMetrics.recordCommitSync(time.nanoseconds() -
commitStart);
@@ -1588,11 +1588,6 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
return groupMetadata.get().isPresent();
}
- private void updateLastSeenEpochIfNewer(TopicPartition topicPartition,
OffsetAndMetadata offsetAndMetadata) {
- if (offsetAndMetadata != null)
- offsetAndMetadata.leaderEpoch().ifPresent(epoch ->
metadata.updateLastSeenEpochIfNewer(topicPartition, epoch));
- }
-
/**
* This method signals the background thread to {@link
CreateFetchRequestsEvent create fetch requests}.
*
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
index dfa2520a02b..a5cb4753b38 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
@@ -72,6 +72,7 @@ import static
org.apache.kafka.common.protocol.Errors.COORDINATOR_LOAD_IN_PROGRE
public class CommitRequestManager implements RequestManager,
MemberStateListener {
private final Time time;
private final SubscriptionState subscriptions;
+ private final ConsumerMetadata metadata;
private final LogContext logContext;
private final Logger log;
private final Optional<AutoCommitState> autoCommitState;
@@ -102,15 +103,16 @@ public class CommitRequestManager implements
RequestManager, MemberStateListener
private final MemberInfo memberInfo;
public CommitRequestManager(
- final Time time,
- final LogContext logContext,
- final SubscriptionState subscriptions,
- final ConsumerConfig config,
- final CoordinatorRequestManager coordinatorRequestManager,
- final OffsetCommitCallbackInvoker offsetCommitCallbackInvoker,
- final String groupId,
- final Optional<String> groupInstanceId,
- final Metrics metrics) {
+ final Time time,
+ final LogContext logContext,
+ final SubscriptionState subscriptions,
+ final ConsumerConfig config,
+ final CoordinatorRequestManager coordinatorRequestManager,
+ final OffsetCommitCallbackInvoker offsetCommitCallbackInvoker,
+ final String groupId,
+ final Optional<String> groupInstanceId,
+ final Metrics metrics,
+ final ConsumerMetadata metadata) {
this(time,
logContext,
subscriptions,
@@ -122,7 +124,8 @@ public class CommitRequestManager implements
RequestManager, MemberStateListener
config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG),
config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG),
OptionalDouble.empty(),
- metrics);
+ metrics,
+ metadata);
}
// Visible for testing
@@ -138,7 +141,8 @@ public class CommitRequestManager implements
RequestManager, MemberStateListener
final long retryBackoffMs,
final long retryBackoffMaxMs,
final OptionalDouble jitter,
- final Metrics metrics) {
+ final Metrics metrics,
+ final ConsumerMetadata metadata) {
Objects.requireNonNull(coordinatorRequestManager, "Coordinator is
needed upon committing offsets");
this.time = time;
this.logContext = logContext;
@@ -155,6 +159,7 @@ public class CommitRequestManager implements
RequestManager, MemberStateListener
this.groupId = groupId;
this.groupInstanceId = groupInstanceId;
this.subscriptions = subscriptions;
+ this.metadata = metadata;
this.retryBackoffMs = retryBackoffMs;
this.retryBackoffMaxMs = retryBackoffMaxMs;
this.jitter = jitter;
@@ -381,20 +386,22 @@ public class CommitRequestManager implements
RequestManager, MemberStateListener
* exceptionally depending on the response. If the request fails with a
retriable error, the
* future will be completed with a {@link RetriableCommitFailedException}.
*/
- public CompletableFuture<Void> commitAsync(final Map<TopicPartition,
OffsetAndMetadata> offsets) {
- if (offsets.isEmpty()) {
+ public CompletableFuture<Map<TopicPartition, OffsetAndMetadata>>
commitAsync(final Optional<Map<TopicPartition, OffsetAndMetadata>> offsets) {
+ Map<TopicPartition, OffsetAndMetadata> commitOffsets =
offsets.orElseGet(subscriptions::allConsumed);
+ if (commitOffsets.isEmpty()) {
log.debug("Skipping commit of empty offsets");
- return CompletableFuture.completedFuture(null);
+ return CompletableFuture.completedFuture(Map.of());
}
- OffsetCommitRequestState commitRequest =
createOffsetCommitRequest(offsets, Long.MAX_VALUE);
+ maybeUpdateLastSeenEpochIfNewer(commitOffsets);
+ OffsetCommitRequestState commitRequest =
createOffsetCommitRequest(commitOffsets, Long.MAX_VALUE);
pendingRequests.addOffsetCommitRequest(commitRequest);
- CompletableFuture<Void> asyncCommitResult = new CompletableFuture<>();
+ CompletableFuture<Map<TopicPartition, OffsetAndMetadata>>
asyncCommitResult = new CompletableFuture<>();
commitRequest.future.whenComplete((committedOffsets, error) -> {
if (error != null) {
asyncCommitResult.completeExceptionally(commitAsyncExceptionForError(error));
} else {
- asyncCommitResult.complete(null);
+ asyncCommitResult.complete(commitOffsets);
}
});
return asyncCommitResult;
@@ -403,15 +410,20 @@ public class CommitRequestManager implements
RequestManager, MemberStateListener
/**
* Commit offsets, retrying on expected retriable errors while the retry
timeout hasn't expired.
*
- * @param offsets Offsets to commit
- * @param deadlineMs Time until which the request will be
retried if it fails with
- * an expected retriable error.
+ * @param offsets Offsets to commit
+ * @param deadlineMs Time until which the request will be retried if it
fails with
+ * an expected retriable error.
* @return Future that will complete when a successful response
*/
- public CompletableFuture<Void> commitSync(final Map<TopicPartition,
OffsetAndMetadata> offsets,
- final long deadlineMs) {
- CompletableFuture<Void> result = new CompletableFuture<>();
- OffsetCommitRequestState requestState =
createOffsetCommitRequest(offsets, deadlineMs);
+ public CompletableFuture<Map<TopicPartition, OffsetAndMetadata>>
commitSync(final Optional<Map<TopicPartition, OffsetAndMetadata>> offsets,
+
final long deadlineMs) {
+ Map<TopicPartition, OffsetAndMetadata> commitOffsets =
offsets.orElseGet(subscriptions::allConsumed);
+ if (commitOffsets.isEmpty()) {
+ return CompletableFuture.completedFuture(Map.of());
+ }
+ maybeUpdateLastSeenEpochIfNewer(commitOffsets);
+ CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> result = new
CompletableFuture<>();
+ OffsetCommitRequestState requestState =
createOffsetCommitRequest(commitOffsets, deadlineMs);
commitSyncWithRetries(requestState, result);
return result;
}
@@ -439,14 +451,14 @@ public class CommitRequestManager implements
RequestManager, MemberStateListener
}
private void commitSyncWithRetries(OffsetCommitRequestState requestAttempt,
- CompletableFuture<Void> result) {
+ CompletableFuture<Map<TopicPartition,
OffsetAndMetadata>> result) {
pendingRequests.addOffsetCommitRequest(requestAttempt);
// Retry the same commit request while it fails with
RetriableException and the retry
// timeout hasn't expired.
requestAttempt.future.whenComplete((res, error) -> {
if (error == null) {
- result.complete(null);
+ result.complete(requestAttempt.offsets);
} else {
if (error instanceof RetriableException) {
if (requestAttempt.isExpired()) {
@@ -531,6 +543,7 @@ public class CommitRequestManager implements
RequestManager, MemberStateListener
"outbound buffer:" + fetchRequest);
}
if (error == null) {
+ maybeUpdateLastSeenEpochIfNewer(res);
result.complete(res);
} else {
if (error instanceof RetriableException ||
isStaleEpochErrorAndValidEpochAvailable(error)) {
@@ -615,6 +628,13 @@ public class CommitRequestManager implements
RequestManager, MemberStateListener
return new NetworkClientDelegate.PollResult(Long.MAX_VALUE, requests);
}
+ private void maybeUpdateLastSeenEpochIfNewer(final Map<TopicPartition,
OffsetAndMetadata> offsets) {
+ offsets.forEach((topicPartition, offsetAndMetadata) -> {
+ if (offsetAndMetadata != null)
+ offsetAndMetadata.leaderEpoch().ifPresent(epoch ->
metadata.updateLastSeenEpochIfNewer(topicPartition, epoch));
+ });
+ }
+
class OffsetCommitRequestState extends RetriableRequestState {
private Map<TopicPartition, OffsetAndMetadata> offsets;
private final String groupId;
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
index 7339a49ed6d..7f682c81fc4 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
@@ -205,7 +205,8 @@ public class RequestManagers implements Closeable {
offsetCommitCallbackInvoker,
groupRebalanceConfig.groupId,
groupRebalanceConfig.groupInstanceId,
- metrics);
+ metrics,
+ metadata);
membershipManager = new ConsumerMembershipManager(
groupRebalanceConfig.groupId,
groupRebalanceConfig.groupInstanceId,
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
index 2879e471c7a..642e5e0cca2 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
@@ -198,29 +198,41 @@ public class ApplicationEventProcessor implements
EventProcessor<ApplicationEven
}
private void process(final AsyncCommitEvent event) {
- if (!requestManagers.commitRequestManager.isPresent()) {
+ if (requestManagers.commitRequestManager.isEmpty()) {
+ event.future().completeExceptionally(new KafkaException("Unable to
async commit " +
+ "offset because the CommitRequestManager is not available.
Check if group.id was set correctly"));
return;
}
- CommitRequestManager manager =
requestManagers.commitRequestManager.get();
- CompletableFuture<Void> future = manager.commitAsync(event.offsets());
- future.whenComplete(complete(event.future()));
+ try {
+ CommitRequestManager manager =
requestManagers.commitRequestManager.get();
+ CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future =
manager.commitAsync(event.offsets());
+ future.whenComplete(complete(event.future()));
+ } catch (Exception e) {
+ event.future().completeExceptionally(e);
+ }
}
private void process(final SyncCommitEvent event) {
- if (!requestManagers.commitRequestManager.isPresent()) {
+ if (requestManagers.commitRequestManager.isEmpty()) {
+ event.future().completeExceptionally(new KafkaException("Unable to
sync commit " +
+ "offset because the CommitRequestManager is not available.
Check if group.id was set correctly"));
return;
}
- CommitRequestManager manager =
requestManagers.commitRequestManager.get();
- CompletableFuture<Void> future = manager.commitSync(event.offsets(),
event.deadlineMs());
- future.whenComplete(complete(event.future()));
+ try {
+ CommitRequestManager manager =
requestManagers.commitRequestManager.get();
+ CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future =
manager.commitSync(event.offsets(), event.deadlineMs());
+ future.whenComplete(complete(event.future()));
+ } catch (Exception e) {
+ event.future().completeExceptionally(e);
+ }
}
private void process(final FetchCommittedOffsetsEvent event) {
- if (!requestManagers.commitRequestManager.isPresent()) {
+ if (requestManagers.commitRequestManager.isEmpty()) {
event.future().completeExceptionally(new KafkaException("Unable to
fetch committed " +
- "offset because the CommittedRequestManager is not
available. Check if group.id was set correctly"));
+ "offset because the CommitRequestManager is not available.
Check if group.id was set correctly"));
return;
}
CommitRequestManager manager =
requestManagers.commitRequestManager.get();
@@ -523,6 +535,7 @@ public class ApplicationEventProcessor implements
EventProcessor<ApplicationEven
private void process(final SeekUnvalidatedEvent event) {
try {
+ event.offsetEpoch().ifPresent(epoch ->
metadata.updateLastSeenEpochIfNewer(event.partition(), epoch));
SubscriptionState.FetchPosition newPosition = new
SubscriptionState.FetchPosition(
event.offset(),
event.offsetEpoch(),
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AsyncCommitEvent.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AsyncCommitEvent.java
index c36f0534b36..238aa1979d0 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AsyncCommitEvent.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AsyncCommitEvent.java
@@ -20,13 +20,15 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import java.util.Map;
+import java.util.Optional;
/**
* Event to commit offsets without waiting for a response, so the request
won't be retried.
+ * If no offsets are provided, this event will commit all consumed offsets.
*/
public class AsyncCommitEvent extends CommitEvent {
- public AsyncCommitEvent(final Map<TopicPartition, OffsetAndMetadata>
offsets) {
+ public AsyncCommitEvent(final Optional<Map<TopicPartition,
OffsetAndMetadata>> offsets) {
super(Type.COMMIT_ASYNC, offsets, Long.MAX_VALUE);
}
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CommitEvent.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CommitEvent.java
index dc863b0ee65..9c94d3b9d3b 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CommitEvent.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CommitEvent.java
@@ -21,15 +21,16 @@ import org.apache.kafka.common.TopicPartition;
import java.util.Collections;
import java.util.Map;
+import java.util.Optional;
-public abstract class CommitEvent extends CompletableApplicationEvent<Void> {
+public abstract class CommitEvent extends
CompletableApplicationEvent<Map<TopicPartition, OffsetAndMetadata>> {
/**
* Offsets to commit per partition.
*/
- private final Map<TopicPartition, OffsetAndMetadata> offsets;
+ private final Optional<Map<TopicPartition, OffsetAndMetadata>> offsets;
- protected CommitEvent(final Type type, final Map<TopicPartition,
OffsetAndMetadata> offsets, final long deadlineMs) {
+ protected CommitEvent(final Type type, final Optional<Map<TopicPartition,
OffsetAndMetadata>> offsets, final long deadlineMs) {
super(type, deadlineMs);
this.offsets = validate(offsets);
}
@@ -38,17 +39,21 @@ public abstract class CommitEvent extends
CompletableApplicationEvent<Void> {
* Validates the offsets are not negative and then returns the given
offset map as
* {@link Collections#unmodifiableMap(Map) as unmodifiable}.
*/
- private static Map<TopicPartition, OffsetAndMetadata> validate(final
Map<TopicPartition, OffsetAndMetadata> offsets) {
- for (OffsetAndMetadata offsetAndMetadata : offsets.values()) {
+ private static Optional<Map<TopicPartition, OffsetAndMetadata>>
validate(final Optional<Map<TopicPartition, OffsetAndMetadata>> offsets) {
+ if (!offsets.isPresent()) {
+ return Optional.empty();
+ }
+
+ for (OffsetAndMetadata offsetAndMetadata : offsets.get().values()) {
if (offsetAndMetadata.offset() < 0) {
throw new IllegalArgumentException("Invalid offset: " +
offsetAndMetadata.offset());
}
}
- return Collections.unmodifiableMap(offsets);
+ return Optional.of(Collections.unmodifiableMap(offsets.get()));
}
- public Map<TopicPartition, OffsetAndMetadata> offsets() {
+ public Optional<Map<TopicPartition, OffsetAndMetadata>> offsets() {
return offsets;
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/SyncCommitEvent.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/SyncCommitEvent.java
index 7dc7a023a8d..5b02af2c0c7 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/SyncCommitEvent.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/SyncCommitEvent.java
@@ -20,14 +20,15 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import java.util.Map;
+import java.util.Optional;
/**
* Event to commit offsets waiting for a response and retrying on expected
retriable errors until
- * the timer expires.
+ * the timer expires. If no offsets are provided, this event will commit all
consumed offsets.
*/
public class SyncCommitEvent extends CommitEvent {
- public SyncCommitEvent(final Map<TopicPartition, OffsetAndMetadata>
offsets, final long deadlineMs) {
+ public SyncCommitEvent(final Optional<Map<TopicPartition,
OffsetAndMetadata>> offsets, final long deadlineMs) {
super(Type.COMMIT_SYNC, offsets, deadlineMs);
}
}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
index 380b7082b97..54a41587b06 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
@@ -54,7 +54,6 @@ import
org.apache.kafka.clients.consumer.internals.events.TopicSubscriptionChang
import org.apache.kafka.clients.consumer.internals.events.UnsubscribeEvent;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Metric;
-import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.GroupAuthorizationException;
@@ -185,8 +184,22 @@ public class AsyncKafkaConsumerTest {
}
private AsyncKafkaConsumer<String, String> newConsumer(Properties props) {
+ // disable auto-commit by default, so we don't need to handle
SyncCommitEvent for each case
+ if (!props.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) {
+ props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
+ }
final ConsumerConfig config = new ConsumerConfig(props);
- return newConsumer(config);
+ return new AsyncKafkaConsumer<>(
+ config,
+ new StringDeserializer(),
+ new StringDeserializer(),
+ time,
+ (a, b, c, d, e, f, g) -> applicationEventHandler,
+ a -> backgroundEventReaper,
+ (a, b, c, d, e, f, g) -> fetchCollector,
+ (a, b, c, d) -> metadata,
+ backgroundEventQueue
+ );
}
private AsyncKafkaConsumer<String, String> newConsumer(ConsumerConfig
config) {
@@ -209,10 +222,10 @@ public class AsyncKafkaConsumerTest {
ConsumerRebalanceListenerInvoker rebalanceListenerInvoker,
SubscriptionState subscriptions,
String groupId,
- String clientId) {
+ String clientId,
+ boolean autoCommitEnabled) {
long retryBackoffMs = 100L;
int defaultApiTimeoutMs = 1000;
- boolean autoCommitEnabled = true;
return new AsyncKafkaConsumer<>(
new LogContext(),
clientId,
@@ -281,8 +294,10 @@ public class AsyncKafkaConsumerTest {
final ArgumentCaptor<AsyncCommitEvent> commitEventCaptor =
ArgumentCaptor.forClass(AsyncCommitEvent.class);
verify(applicationEventHandler).add(commitEventCaptor.capture());
final AsyncCommitEvent commitEvent = commitEventCaptor.getValue();
- assertEquals(offsets, commitEvent.offsets());
- assertDoesNotThrow(() -> commitEvent.future().complete(null));
+ assertTrue(commitEvent.offsets().isPresent());
+ assertEquals(offsets, commitEvent.offsets().get());
+
+ commitEvent.future().complete(offsets);
assertDoesNotThrow(() -> consumer.commitAsync(offsets, null));
// Clean-up. Close the consumer here as we know it will cause a
TimeoutException to be thrown.
@@ -347,25 +362,6 @@ public class AsyncKafkaConsumerTest {
assertTrue((double) metric.metricValue() > 0);
}
- @Test
- public void testCommittedLeaderEpochUpdate() {
- consumer = newConsumer();
- final TopicPartition t0 = new TopicPartition("t0", 2);
- final TopicPartition t1 = new TopicPartition("t0", 3);
- final TopicPartition t2 = new TopicPartition("t0", 4);
- HashMap<TopicPartition, OffsetAndMetadata> topicPartitionOffsets = new
HashMap<>();
- topicPartitionOffsets.put(t0, new OffsetAndMetadata(10L,
Optional.of(2), ""));
- topicPartitionOffsets.put(t1, null);
- topicPartitionOffsets.put(t2, new OffsetAndMetadata(20L,
Optional.of(3), ""));
-
completeFetchedCommittedOffsetApplicationEventSuccessfully(topicPartitionOffsets);
-
- assertDoesNotThrow(() ->
consumer.committed(topicPartitionOffsets.keySet(), Duration.ofMillis(1000)));
-
- verify(metadata).updateLastSeenEpochIfNewer(t0, 2);
- verify(metadata).updateLastSeenEpochIfNewer(t2, 3);
-
verify(applicationEventHandler).addAndGet(ArgumentMatchers.isA(FetchCommittedOffsetsEvent.class));
- }
-
@Test
public void testCommittedExceptionThrown() {
consumer = newConsumer();
@@ -388,7 +384,6 @@ public class AsyncKafkaConsumerTest {
final TopicPartition tp = new TopicPartition(topicName, partition);
doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(true);
- completeCommitSyncApplicationEventSuccessfully();
doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
completeAssignmentChangeEventSuccessfully();
consumer.assign(singleton(tp));
@@ -410,7 +405,6 @@ public class AsyncKafkaConsumerTest {
return Fetch.empty();
}).doAnswer(invocation ->
Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(true);
- completeCommitSyncApplicationEventSuccessfully();
doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
completeAssignmentChangeEventSuccessfully();
consumer.assign(singleton(tp));
@@ -434,7 +428,6 @@ public class AsyncKafkaConsumerTest {
return Fetch.forPartition(tp, records, true, new
OffsetAndMetadata(4, Optional.of(0), ""));
}).when(fetchCollector).collectFetch(Mockito.any(FetchBuffer.class));
when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(true);
- completeCommitSyncApplicationEventSuccessfully();
doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
completeAssignmentChangeEventSuccessfully();
consumer.assign(singleton(tp));
@@ -492,7 +485,6 @@ public class AsyncKafkaConsumerTest {
doReturn(Fetch.forPartition(tp, records, true, new
OffsetAndMetadata(4, Optional.of(0), "")))
.when(fetchCollector).collectFetch(any(FetchBuffer.class));
when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(true);
- completeCommitSyncApplicationEventSuccessfully();
doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
completeAssignmentChangeEventSuccessfully();
consumer.assign(singleton(tp));
@@ -527,71 +519,6 @@ public class AsyncKafkaConsumerTest {
assertThrows(callbackException.getClass(), () ->
consumer.commitSync());
}
- @Test
- public void testCommitSyncLeaderEpochUpdate() {
- consumer = newConsumer();
- final TopicPartition t0 = new TopicPartition("t0", 2);
- final TopicPartition t1 = new TopicPartition("t0", 3);
- HashMap<TopicPartition, OffsetAndMetadata> topicPartitionOffsets = new
HashMap<>();
- topicPartitionOffsets.put(t0, new OffsetAndMetadata(10L,
Optional.of(2), ""));
- topicPartitionOffsets.put(t1, new OffsetAndMetadata(20L,
Optional.of(1), ""));
- completeCommitSyncApplicationEventSuccessfully();
-
- completeAssignmentChangeEventSuccessfully();
- consumer.assign(Arrays.asList(t0, t1));
-
- assertDoesNotThrow(() -> consumer.commitSync(topicPartitionOffsets));
-
- verify(metadata).updateLastSeenEpochIfNewer(t0, 2);
- verify(metadata).updateLastSeenEpochIfNewer(t1, 1);
-
verify(applicationEventHandler).add(ArgumentMatchers.isA(SyncCommitEvent.class));
- }
-
- @Test
- public void testCommitAsyncLeaderEpochUpdate() {
- SubscriptionState subscriptions = new SubscriptionState(new
LogContext(), OffsetResetStrategy.NONE);
- consumer = newConsumer(
- mock(FetchBuffer.class),
- new ConsumerInterceptors<>(Collections.emptyList()),
- mock(ConsumerRebalanceListenerInvoker.class),
- subscriptions,
- "group-id",
- "client-id");
- completeCommitSyncApplicationEventSuccessfully();
- final TopicPartition t0 = new TopicPartition("t0", 2);
- final TopicPartition t1 = new TopicPartition("t0", 3);
- HashMap<TopicPartition, OffsetAndMetadata> topicPartitionOffsets = new
HashMap<>();
- topicPartitionOffsets.put(t0, new OffsetAndMetadata(10L,
Optional.of(2), ""));
- topicPartitionOffsets.put(t1, new OffsetAndMetadata(20L,
Optional.of(1), ""));
- when(metadata.currentLeader(t0)).thenReturn(
- new LeaderAndEpoch(Optional.of(
- new Node(1, "host", 9000)), Optional.of(1)));
- when(metadata.currentLeader(t1)).thenReturn(
- new LeaderAndEpoch(Optional.of(
- new Node(1, "host", 9000)), Optional.of(1)));
- completeAssignmentChangeEventSuccessfully();
- consumer.assign(Arrays.asList(t0, t1));
- completeSeekUnvalidatedEventSuccessfully();
- consumer.seek(t0, 10);
- consumer.seek(t1, 20);
-
- MockCommitCallback callback = new MockCommitCallback();
- assertDoesNotThrow(() -> consumer.commitAsync(topicPartitionOffsets,
callback));
-
- verify(metadata).updateLastSeenEpochIfNewer(t0, 2);
- verify(metadata).updateLastSeenEpochIfNewer(t1, 1);
-
verify(applicationEventHandler).add(ArgumentMatchers.isA(AsyncCommitEvent.class));
-
- // Clean-Up. Close the consumer here as we know it will cause a
TimeoutException to be thrown.
- // If we get an error *other* than the TimeoutException, we'll fail
the test.
- try {
- Exception e = assertThrows(KafkaException.class, () ->
consumer.close(Duration.ZERO));
- assertInstanceOf(TimeoutException.class, e.getCause());
- } finally {
- consumer = null;
- }
- }
-
@Test
public void testCommitSyncAwaitsCommitAsyncCompletionWithEmptyOffsets() {
final TopicPartition tp = new TopicPartition("foo", 0);
@@ -722,7 +649,8 @@ public class AsyncKafkaConsumerTest {
mock(ConsumerRebalanceListenerInvoker.class),
subscriptions,
"group-id",
- "client-id"));
+ "client-id",
+ false));
completeUnsubscribeApplicationEventSuccessfully();
consumer.close(Duration.ZERO);
verifyUnsubscribeEvent(subscriptions);
@@ -739,7 +667,8 @@ public class AsyncKafkaConsumerTest {
mock(ConsumerRebalanceListenerInvoker.class),
subscriptions,
"group-id",
- "client-id"));
+ "client-id",
+ false));
doThrow(new
KafkaException()).when(consumer).processBackgroundEvents(any(), any(), any());
assertThrows(KafkaException.class, () ->
consumer.close(Duration.ZERO));
verifyUnsubscribeEvent(subscriptions);
@@ -748,8 +677,7 @@ public class AsyncKafkaConsumerTest {
}
@Test
- public void testAutoCommitSyncEnabled() {
- completeCommitSyncApplicationEventSuccessfully();
+ public void testCommitSyncAllConsumed() {
SubscriptionState subscriptions = new SubscriptionState(new
LogContext(), OffsetResetStrategy.NONE);
consumer = newConsumer(
mock(FetchBuffer.class),
@@ -757,14 +685,19 @@ public class AsyncKafkaConsumerTest {
mock(ConsumerRebalanceListenerInvoker.class),
subscriptions,
"group-id",
- "client-id");
+ "client-id",
+ false);
completeTopicSubscriptionChangeEventSuccessfully();
consumer.subscribe(singleton("topic"),
mock(ConsumerRebalanceListener.class));
subscriptions.assignFromSubscribed(singleton(new
TopicPartition("topic", 0)));
completeSeekUnvalidatedEventSuccessfully();
subscriptions.seek(new TopicPartition("topic", 0), 100);
consumer.commitSyncAllConsumed(time.timer(100));
- verify(applicationEventHandler).add(any(SyncCommitEvent.class));
+
+ ArgumentCaptor<SyncCommitEvent> eventCaptor =
ArgumentCaptor.forClass(SyncCommitEvent.class);
+ verify(applicationEventHandler).add(eventCaptor.capture());
+ SyncCommitEvent capturedEvent = eventCaptor.getValue();
+ assertFalse(capturedEvent.offsets().isPresent(), "Expected empty
optional offsets");
}
@Test
@@ -776,12 +709,15 @@ public class AsyncKafkaConsumerTest {
mock(ConsumerRebalanceListenerInvoker.class),
subscriptions,
"group-id",
- "client-id");
+ "client-id",
+ false);
completeTopicSubscriptionChangeEventSuccessfully();
consumer.subscribe(singleton("topic"),
mock(ConsumerRebalanceListener.class));
subscriptions.assignFromSubscribed(singleton(new
TopicPartition("topic", 0)));
completeSeekUnvalidatedEventSuccessfully();
subscriptions.seek(new TopicPartition("topic", 0), 100);
+ completeUnsubscribeApplicationEventSuccessfully();
+ consumer.close();
verify(applicationEventHandler,
never()).add(any(SyncCommitEvent.class));
}
@@ -1035,7 +971,9 @@ public class AsyncKafkaConsumerTest {
@Test
public void testNoWakeupInCloseCommit() {
TopicPartition tp = new TopicPartition("topic1", 0);
- consumer = newConsumer();
+ Properties props = requiredConsumerConfigAndGroupId("consumer-group");
+ props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
+ consumer = newConsumer(props);
completeAssignmentChangeEventSuccessfully();
consumer.assign(Collections.singleton(tp));
doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
@@ -1280,8 +1218,7 @@ public class AsyncKafkaConsumerTest {
@Test
public void testGroupMetadataAfterCreationWithGroupIdIsNotNull() {
final String groupId = "consumerGroupA";
- final ConsumerConfig config = new
ConsumerConfig(requiredConsumerConfigAndGroupId(groupId));
- consumer = newConsumer(config);
+ consumer = newConsumer(requiredConsumerConfigAndGroupId(groupId));
final ConsumerGroupMetadata groupMetadata = consumer.groupMetadata();
@@ -1297,8 +1234,7 @@ public class AsyncKafkaConsumerTest {
final String groupInstanceId = "groupInstanceId1";
final Properties props = requiredConsumerConfigAndGroupId(groupId);
props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, groupInstanceId);
- final ConsumerConfig config = new ConsumerConfig(props);
- consumer = newConsumer(config);
+ consumer = newConsumer(props);
final ConsumerGroupMetadata groupMetadata = consumer.groupMetadata();
@@ -1333,9 +1269,8 @@ public class AsyncKafkaConsumerTest {
@Test
public void testGroupMetadataUpdate() {
final String groupId = "consumerGroupA";
- final ConsumerConfig config = new
ConsumerConfig(requiredConsumerConfigAndGroupId(groupId));
try (final MockedStatic<RequestManagers> requestManagers =
mockStatic(RequestManagers.class)) {
- consumer = newConsumer(config);
+ consumer = newConsumer(requiredConsumerConfigAndGroupId(groupId));
final ConsumerGroupMetadata oldGroupMetadata =
consumer.groupMetadata();
final MemberStateListener groupMetadataUpdateListener =
captureGroupMetadataUpdateListener(requestManagers);
final int expectedMemberEpoch = 42;
@@ -1355,9 +1290,8 @@ public class AsyncKafkaConsumerTest {
@Test
public void testGroupMetadataIsResetAfterUnsubscribe() {
final String groupId = "consumerGroupA";
- final ConsumerConfig config = new
ConsumerConfig(requiredConsumerConfigAndGroupId(groupId));
try (final MockedStatic<RequestManagers> requestManagers =
mockStatic(RequestManagers.class)) {
- consumer = newConsumer(config);
+ consumer = newConsumer(requiredConsumerConfigAndGroupId(groupId));
final MemberStateListener groupMetadataUpdateListener =
captureGroupMetadataUpdateListener(requestManagers);
consumer.subscribe(singletonList("topic"));
final int memberEpoch = 42;
@@ -1479,8 +1413,7 @@ public class AsyncKafkaConsumerTest {
@Test
public void testBackgroundError() {
final String groupId = "consumerGroupA";
- final ConsumerConfig config = new
ConsumerConfig(requiredConsumerConfigAndGroupId(groupId));
- consumer = newConsumer(config);
+ consumer = newConsumer(requiredConsumerConfigAndGroupId(groupId));
final KafkaException expectedException = new KafkaException("Nobody
expects the Spanish Inquisition");
final ErrorEvent errorEvent = new ErrorEvent(expectedException);
@@ -1495,8 +1428,7 @@ public class AsyncKafkaConsumerTest {
@Test
public void testMultipleBackgroundErrors() {
final String groupId = "consumerGroupA";
- final ConsumerConfig config = new
ConsumerConfig(requiredConsumerConfigAndGroupId(groupId));
- consumer = newConsumer(config);
+ consumer = newConsumer(requiredConsumerConfigAndGroupId(groupId));
final KafkaException expectedException1 = new KafkaException("Nobody
expects the Spanish Inquisition");
final ErrorEvent errorEvent1 = new ErrorEvent(expectedException1);
@@ -1589,7 +1521,8 @@ public class AsyncKafkaConsumerTest {
mock(ConsumerRebalanceListenerInvoker.class),
subscriptions,
"group-id",
- "client-id");
+ "client-id",
+ false);
final TopicPartition tp = new TopicPartition("topic", 0);
final List<ConsumerRecord<String, String>> records = singletonList(
new ConsumerRecord<>("topic", 0, 2, "key1", "value1"));
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java
index 9ded26b9e71..0ecd99afbd4 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java
@@ -87,6 +87,7 @@ import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.clearInvocations;
+import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
@@ -103,6 +104,7 @@ public class CommitRequestManagerTest {
private static final String DEFAULT_GROUP_INSTANCE_ID =
"group-instance-id";
private final Node mockedNode = new Node(1, "host1", 9092);
private SubscriptionState subscriptionState;
+ private ConsumerMetadata metadata;
private LogContext logContext;
private MockTime time;
private CoordinatorRequestManager coordinatorRequestManager;
@@ -118,6 +120,7 @@ public class CommitRequestManagerTest {
this.logContext = new LogContext();
this.time = new MockTime(0);
this.subscriptionState = new SubscriptionState(new LogContext(),
OffsetResetStrategy.EARLIEST);
+ this.metadata = mock(ConsumerMetadata.class);
this.coordinatorRequestManager = mock(CoordinatorRequestManager.class);
this.offsetCommitCallbackInvoker =
mock(OffsetCommitCallbackInvoker.class);
this.props = new Properties();
@@ -142,7 +145,8 @@ public class CommitRequestManagerTest {
retryBackoffMs,
retryBackoffMaxMs,
OptionalDouble.of(0),
- metrics);
+ metrics,
+ metadata);
commitRequestManager.onMemberEpochUpdated(Optional.of(1),
Uuid.randomUuid().toString());
Set<TopicPartition> requestedPartitions = Collections.singleton(new
TopicPartition("topic-1", 1));
@@ -175,7 +179,7 @@ public class CommitRequestManagerTest {
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
offsets.put(new TopicPartition("t1", 0), new OffsetAndMetadata(0));
- commitRequestManager.commitAsync(offsets);
+ commitRequestManager.commitAsync(Optional.of(offsets));
assertPoll(false, 0, commitRequestManager);
}
@@ -186,7 +190,7 @@ public class CommitRequestManagerTest {
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
offsets.put(new TopicPartition("t1", 0), new OffsetAndMetadata(0));
- commitRequestManager.commitAsync(offsets);
+ commitRequestManager.commitAsync(Optional.of(offsets));
assertPoll(false, 0, commitRequestManager);
assertPoll(true, 1, commitRequestManager);
}
@@ -198,7 +202,7 @@ public class CommitRequestManagerTest {
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
offsets.put(new TopicPartition("t1", 0), new OffsetAndMetadata(0));
- commitRequestManager.commitAsync(offsets);
+ commitRequestManager.commitAsync(Optional.of(offsets));
assertPoll(1, commitRequestManager);
}
@@ -239,9 +243,9 @@ public class CommitRequestManagerTest {
// Add the requests to the CommitRequestManager and store their futures
long deadlineMs = time.milliseconds() + defaultApiTimeoutMs;
- commitManager.commitSync(offsets1, deadlineMs);
+ commitManager.commitSync(Optional.of(offsets1), deadlineMs);
commitManager.fetchOffsets(Collections.singleton(new
TopicPartition("test", 0)), deadlineMs);
- commitManager.commitSync(offsets2, deadlineMs);
+ commitManager.commitSync(Optional.of(offsets2), deadlineMs);
commitManager.fetchOffsets(Collections.singleton(new
TopicPartition("test", 1)), deadlineMs);
// Poll the CommitRequestManager and verify that the
inflightOffsetFetches size is correct
@@ -274,13 +278,146 @@ public class CommitRequestManagerTest {
Map<TopicPartition, OffsetAndMetadata> offsets =
Collections.singletonMap(
new TopicPartition("topic", 1),
new OffsetAndMetadata(0));
- commitRequestManager.commitAsync(offsets);
+ commitRequestManager.commitAsync(Optional.of(offsets));
assertEquals(1,
commitRequestManager.unsentOffsetCommitRequests().size());
assertEquals(1,
commitRequestManager.poll(time.milliseconds()).unsentRequests.size());
assertTrue(commitRequestManager.unsentOffsetCommitRequests().isEmpty());
assertEmptyPendingRequests(commitRequestManager);
}
+ @Test
+ public void testCommitSync() {
+ subscriptionState = mock(SubscriptionState.class);
+
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode));
+ TopicPartition tp = new TopicPartition("topic", 1);
+ OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(0,
Optional.of(1), "");
+ Map<TopicPartition, OffsetAndMetadata> offsets = Map.of(tp,
offsetAndMetadata);
+
+ CommitRequestManager commitRequestManager = create(false, 100);
+ CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future =
commitRequestManager.commitSync(
+ Optional.of(offsets), time.milliseconds() + defaultApiTimeoutMs);
+ assertEquals(1,
commitRequestManager.unsentOffsetCommitRequests().size());
+ List<NetworkClientDelegate.FutureCompletionHandler> pollResults =
assertPoll(1, commitRequestManager);
+ pollResults.forEach(v -> v.onComplete(mockOffsetCommitResponse(
+ "topic",
+ 1,
+ (short) 1,
+ Errors.NONE)));
+
+ verify(subscriptionState, never()).allConsumed();
+ verify(metadata).updateLastSeenEpochIfNewer(tp, 1);
+ Map<TopicPartition, OffsetAndMetadata> commitOffsets =
assertDoesNotThrow(() -> future.get());
+ assertTrue(future.isDone());
+ assertEquals(offsets, commitOffsets);
+ }
+
+ @Test
+ public void testCommitSyncWithEmptyOffsets() {
+ subscriptionState = mock(SubscriptionState.class);
+
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode));
+ TopicPartition tp = new TopicPartition("topic", 1);
+ OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(0,
Optional.of(1), "");
+ Map<TopicPartition, OffsetAndMetadata> offsets = Map.of(tp,
offsetAndMetadata);
+ doReturn(offsets).when(subscriptionState).allConsumed();
+
+ CommitRequestManager commitRequestManager = create(false, 100);
+ CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future =
commitRequestManager.commitSync(
+ Optional.empty(), time.milliseconds() + defaultApiTimeoutMs);
+ assertEquals(1,
commitRequestManager.unsentOffsetCommitRequests().size());
+ List<NetworkClientDelegate.FutureCompletionHandler> pollResults =
assertPoll(1, commitRequestManager);
+ pollResults.forEach(v -> v.onComplete(mockOffsetCommitResponse(
+ "topic",
+ 1,
+ (short) 1,
+ Errors.NONE)));
+
+ verify(subscriptionState).allConsumed();
+ verify(metadata).updateLastSeenEpochIfNewer(tp, 1);
+ Map<TopicPartition, OffsetAndMetadata> commitOffsets =
assertDoesNotThrow(() -> future.get());
+ assertTrue(future.isDone());
+ assertEquals(offsets, commitOffsets);
+ }
+
+ @Test
+ public void testCommitSyncWithEmptyAllConsumedOffsets() {
+ subscriptionState = mock(SubscriptionState.class);
+ doReturn(Map.of()).when(subscriptionState).allConsumed();
+
+ CommitRequestManager commitRequestManager = create(true, 100);
+ CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future =
commitRequestManager.commitSync(
+ Optional.empty(), time.milliseconds() + defaultApiTimeoutMs);
+
+ verify(subscriptionState).allConsumed();
+ Map<TopicPartition, OffsetAndMetadata> commitOffsets =
assertDoesNotThrow(() -> future.get());
+ assertTrue(future.isDone());
+ assertTrue(commitOffsets.isEmpty());
+ }
+
+ @Test
+ public void testCommitAsync() {
+ subscriptionState = mock(SubscriptionState.class);
+
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode));
+ TopicPartition tp = new TopicPartition("topic", 1);
+ OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(0,
Optional.of(1), "");
+ Map<TopicPartition, OffsetAndMetadata> offsets = Map.of(tp,
offsetAndMetadata);
+
+ CommitRequestManager commitRequestManager = create(true, 100);
+ CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future =
commitRequestManager.commitAsync(Optional.of(offsets));
+ assertEquals(1,
commitRequestManager.unsentOffsetCommitRequests().size());
+ List<NetworkClientDelegate.FutureCompletionHandler> pollResults =
assertPoll(1, commitRequestManager);
+ pollResults.forEach(v -> v.onComplete(mockOffsetCommitResponse(
+ "topic",
+ 1,
+ (short) 1,
+ Errors.NONE)));
+
+ verify(subscriptionState, never()).allConsumed();
+ verify(metadata).updateLastSeenEpochIfNewer(tp, 1);
+ assertTrue(future.isDone());
+ Map<TopicPartition, OffsetAndMetadata> commitOffsets =
assertDoesNotThrow(() -> future.get());
+ assertEquals(offsets, commitOffsets);
+ }
+
+ @Test
+ public void testCommitAsyncWithEmptyOffsets() {
+ subscriptionState = mock(SubscriptionState.class);
+
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode));
+ TopicPartition tp = new TopicPartition("topic", 1);
+ OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(0,
Optional.of(1), "");
+ Map<TopicPartition, OffsetAndMetadata> offsets = Map.of(tp,
offsetAndMetadata);
+ doReturn(offsets).when(subscriptionState).allConsumed();
+
+ CommitRequestManager commitRequestManager = create(true, 100);
+ CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future =
commitRequestManager.commitAsync(Optional.empty());
+ assertEquals(1,
commitRequestManager.unsentOffsetCommitRequests().size());
+ List<NetworkClientDelegate.FutureCompletionHandler> pollResults =
assertPoll(1, commitRequestManager);
+ pollResults.forEach(v -> v.onComplete(mockOffsetCommitResponse(
+ "topic",
+ 1,
+ (short) 1,
+ Errors.NONE)));
+
+ verify(subscriptionState).allConsumed();
+ verify(metadata).updateLastSeenEpochIfNewer(tp, 1);
+ assertTrue(future.isDone());
+ Map<TopicPartition, OffsetAndMetadata> commitOffsets =
assertDoesNotThrow(() -> future.get());
+ assertEquals(offsets, commitOffsets);
+ }
+
+ @Test
+ public void testCommitAsyncWithEmptyAllConsumedOffsets() {
+ subscriptionState = mock(SubscriptionState.class);
+ doReturn(Map.of()).when(subscriptionState).allConsumed();
+
+ CommitRequestManager commitRequestManager = create(true, 100);
+ CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future =
commitRequestManager.commitAsync(Optional.empty());
+
+ verify(subscriptionState).allConsumed();
+ assertTrue(future.isDone());
+ Map<TopicPartition, OffsetAndMetadata> commitOffsets =
assertDoesNotThrow(() -> future.get());
+ assertTrue(commitOffsets.isEmpty());
+ }
+
// This is the case of the async auto commit sent on calls to assign
(async commit that
// should not be retried).
@Test
@@ -333,7 +470,7 @@ public class CommitRequestManagerTest {
new TopicPartition("topic", 1),
new OffsetAndMetadata(0));
long deadlineMs = time.milliseconds() + defaultApiTimeoutMs;
- CompletableFuture<Void> commitResult =
commitRequestManager.commitSync(offsets, deadlineMs);
+ CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> commitResult
= commitRequestManager.commitSync(Optional.of(offsets), deadlineMs);
sendAndVerifyOffsetCommitRequestFailedAndMaybeRetried(commitRequestManager,
error, commitResult);
// We expect that request should have been retried on this sync commit.
@@ -359,7 +496,7 @@ public class CommitRequestManagerTest {
new TopicPartition("topic", 1),
new OffsetAndMetadata(0));
long deadlineMs = time.milliseconds() + defaultApiTimeoutMs;
- CompletableFuture<Void> commitResult =
commitRequestManager.commitSync(offsets, deadlineMs);
+ CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> commitResult
= commitRequestManager.commitSync(Optional.of(offsets), deadlineMs);
completeOffsetCommitRequestWithError(commitRequestManager,
Errors.UNKNOWN_MEMBER_ID);
NetworkClientDelegate.PollResult res =
commitRequestManager.poll(time.milliseconds());
@@ -378,8 +515,8 @@ public class CommitRequestManagerTest {
new OffsetAndMetadata(0));
// Send commit request expected to be retried on retriable errors
- CompletableFuture<Void> commitResult = commitRequestManager.commitSync(
- offsets, time.milliseconds() + defaultApiTimeoutMs);
+ CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> commitResult
= commitRequestManager.commitSync(
+ Optional.of(offsets), time.milliseconds() + defaultApiTimeoutMs);
completeOffsetCommitRequestWithError(commitRequestManager,
Errors.STALE_MEMBER_EPOCH);
NetworkClientDelegate.PollResult res =
commitRequestManager.poll(time.milliseconds());
assertEquals(0, res.unsentRequests.size());
@@ -431,7 +568,7 @@ public class CommitRequestManagerTest {
new OffsetAndMetadata(0));
// Async commit that won't be retried.
- CompletableFuture<Void> commitResult =
commitRequestManager.commitAsync(offsets);
+ CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> commitResult
= commitRequestManager.commitAsync(Optional.of(offsets));
NetworkClientDelegate.PollResult res =
commitRequestManager.poll(time.milliseconds());
assertEquals(1, res.unsentRequests.size());
@@ -781,7 +918,7 @@ public class CommitRequestManagerTest {
new OffsetAndMetadata(0));
// Send async commit (not expected to be retried).
- CompletableFuture<Void> commitResult =
commitRequestManager.commitAsync(offsets);
+ CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> commitResult
= commitRequestManager.commitAsync(Optional.of(offsets));
completeOffsetCommitRequestWithError(commitRequestManager, error);
NetworkClientDelegate.PollResult res =
commitRequestManager.poll(time.milliseconds());
assertEquals(0, res.unsentRequests.size());
@@ -806,7 +943,7 @@ public class CommitRequestManagerTest {
// Send sync offset commit request that fails with retriable error.
long deadlineMs = time.milliseconds() + retryBackoffMs * 2;
- CompletableFuture<Void> commitResult =
commitRequestManager.commitSync(offsets, deadlineMs);
+ CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> commitResult
= commitRequestManager.commitSync(Optional.of(offsets), deadlineMs);
completeOffsetCommitRequestWithError(commitRequestManager,
Errors.REQUEST_TIMED_OUT);
// Request retried after backoff, and fails with retriable again.
Should not complete yet
@@ -839,7 +976,7 @@ public class CommitRequestManagerTest {
// Send offset commit request that fails with retriable error.
long deadlineMs = time.milliseconds() + retryBackoffMs * 2;
- CompletableFuture<Void> commitResult =
commitRequestManager.commitSync(offsets, deadlineMs);
+ CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> commitResult
= commitRequestManager.commitSync(Optional.of(offsets), deadlineMs);
completeOffsetCommitRequestWithError(commitRequestManager, error);
// Sleep to expire the request timeout. Request should fail on the
next poll with a
@@ -869,7 +1006,7 @@ public class CommitRequestManagerTest {
// Send async commit request that fails with retriable error (not
expected to be retried).
Errors retriableError = Errors.COORDINATOR_NOT_AVAILABLE;
- CompletableFuture<Void> commitResult =
commitRequestManager.commitAsync(offsets);
+ CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> commitResult
= commitRequestManager.commitAsync(Optional.of(offsets));
completeOffsetCommitRequestWithError(commitRequestManager,
retriableError);
NetworkClientDelegate.PollResult res =
commitRequestManager.poll(time.milliseconds());
assertEquals(0, res.unsentRequests.size());
@@ -894,7 +1031,7 @@ public class CommitRequestManagerTest {
offsets.put(new TopicPartition("t1", 1), new OffsetAndMetadata(2));
offsets.put(new TopicPartition("t1", 2), new OffsetAndMetadata(3));
- commitRequestManager.commitSync(offsets, time.milliseconds() +
defaultApiTimeoutMs);
+ commitRequestManager.commitSync(Optional.of(offsets),
time.milliseconds() + defaultApiTimeoutMs);
NetworkClientDelegate.PollResult res =
commitRequestManager.poll(time.milliseconds());
assertEquals(1, res.unsentRequests.size());
@@ -919,7 +1056,7 @@ public class CommitRequestManagerTest {
new OffsetAndMetadata(0));
long deadlineMs = time.milliseconds() + defaultApiTimeoutMs;
- commitRequestManager.commitSync(offsets, deadlineMs);
+ commitRequestManager.commitSync(Optional.of(offsets), deadlineMs);
NetworkClientDelegate.PollResult res =
commitRequestManager.poll(time.milliseconds());
assertEquals(1, res.unsentRequests.size());
res.unsentRequests.get(0).handler().onFailure(time.milliseconds(), new
TimeoutException());
@@ -1175,7 +1312,7 @@ public class CommitRequestManagerTest {
new OffsetAndMetadata(0));
long commitCreationTimeMs = time.milliseconds();
- commitRequestManager.commitAsync(offsets);
+ commitRequestManager.commitAsync(Optional.of(offsets));
NetworkClientDelegate.PollResult res =
commitRequestManager.poll(time.milliseconds());
assertEquals(1, res.unsentRequests.size());
@@ -1338,7 +1475,7 @@ public class CommitRequestManagerTest {
Map<TopicPartition, OffsetAndMetadata> offsets =
Collections.singletonMap(new TopicPartition("topic", 1),
new OffsetAndMetadata(0));
- commitRequestManager.commitAsync(offsets);
+ commitRequestManager.commitAsync(Optional.of(offsets));
commitRequestManager.signalClose();
NetworkClientDelegate.PollResult res =
commitRequestManager.poll(time.milliseconds());
assertEquals(1, res.unsentRequests.size());
@@ -1384,7 +1521,7 @@ public class CommitRequestManagerTest {
private void sendAndVerifyOffsetCommitRequestFailedAndMaybeRetried(
final CommitRequestManager commitRequestManager,
final Errors error,
- final CompletableFuture<Void> commitResult) {
+ final CompletableFuture<Map<TopicPartition, OffsetAndMetadata>>
commitResult) {
completeOffsetCommitRequestWithError(commitRequestManager, error);
NetworkClientDelegate.PollResult res =
commitRequestManager.poll(time.milliseconds());
assertEquals(0, res.unsentRequests.size());
@@ -1438,7 +1575,8 @@ public class CommitRequestManagerTest {
retryBackoffMs,
retryBackoffMaxMs,
OptionalDouble.of(0),
- metrics));
+ metrics,
+ metadata));
}
private ClientResponse buildOffsetFetchClientResponse(
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java
index f1c05d2e6b9..a16f9612c74 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.clients.consumer.internals.events;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.consumer.internals.CommitRequestManager;
import
org.apache.kafka.clients.consumer.internals.ConsumerHeartbeatRequestManager;
@@ -32,6 +33,7 @@ import
org.apache.kafka.clients.consumer.internals.RequestManagers;
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.kafka.clients.consumer.internals.TopicMetadataRequestManager;
import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
@@ -45,7 +47,6 @@ import org.junit.jupiter.params.provider.ValueSource;
import java.util.Collection;
import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -56,6 +57,7 @@ import java.util.regex.Pattern;
import java.util.stream.Stream;
import static
org.apache.kafka.clients.consumer.internals.events.CompletableEvent.calculateDeadlineMs;
+import static org.apache.kafka.test.TestUtils.assertFutureThrows;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
@@ -72,6 +74,7 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+@SuppressWarnings("ClassDataAbstractionCoupling")
public class ApplicationEventProcessorTest {
private final Time time = new MockTime();
private final CommitRequestManager commitRequestManager =
mock(CommitRequestManager.class);
@@ -137,8 +140,6 @@ public class ApplicationEventProcessorTest {
return Stream.of(
Arguments.of(new PollEvent(100)),
Arguments.of(new
CreateFetchRequestsEvent(calculateDeadlineMs(12345, 100))),
- Arguments.of(new AsyncCommitEvent(new HashMap<>())),
- Arguments.of(new SyncCommitEvent(new HashMap<>(), 500)),
Arguments.of(new CheckAndUpdatePositionsEvent(500)),
Arguments.of(new TopicMetadataEvent("topic", Long.MAX_VALUE)),
Arguments.of(new AssignmentChangeEvent(12345, 12345,
Collections.emptyList())));
@@ -202,14 +203,16 @@ public class ApplicationEventProcessorTest {
@Test
public void testSeekUnvalidatedEvent() {
TopicPartition tp = new TopicPartition("topic", 0);
+ Optional<Integer> offsetEpoch = Optional.of(1);
SubscriptionState.FetchPosition position = new
SubscriptionState.FetchPosition(
- 0, Optional.empty(),
Metadata.LeaderAndEpoch.noLeaderOrEpoch());
- SeekUnvalidatedEvent event = new SeekUnvalidatedEvent(12345, tp, 0,
Optional.empty());
+ 0, offsetEpoch, Metadata.LeaderAndEpoch.noLeaderOrEpoch());
+ SeekUnvalidatedEvent event = new SeekUnvalidatedEvent(12345, tp, 0,
offsetEpoch);
setupProcessor(false);
doReturn(Metadata.LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(tp);
doNothing().when(subscriptionState).seekUnvalidated(eq(tp), any());
processor.process(event);
+ verify(metadata).updateLastSeenEpochIfNewer(tp, offsetEpoch.get());
verify(metadata).currentLeader(tp);
verify(subscriptionState).seekUnvalidated(tp, position);
assertDoesNotThrow(() -> event.future().get());
@@ -262,6 +265,27 @@ public class ApplicationEventProcessorTest {
assertDoesNotThrow(() -> event.future().get());
}
+ @Test
+ public void testFetchCommittedOffsetsEvent() {
+ TopicPartition tp0 = new TopicPartition("topic", 0);
+ TopicPartition tp1 = new TopicPartition("topic", 1);
+ TopicPartition tp2 = new TopicPartition("topic", 2);
+ Set<TopicPartition> partitions = Set.of(tp0, tp1, tp2);
+ Map<TopicPartition, OffsetAndMetadata> topicPartitionOffsets = Map.of(
+ tp0, new OffsetAndMetadata(10L, Optional.of(2), ""),
+ tp1, new OffsetAndMetadata(15L, Optional.empty(), ""),
+ tp2, new OffsetAndMetadata(20L, Optional.of(3), "")
+ );
+ FetchCommittedOffsetsEvent event = new FetchCommittedOffsetsEvent(partitions, 12345);
+
+ setupProcessor(true);
+ when(commitRequestManager.fetchOffsets(partitions, 12345)).thenReturn(CompletableFuture.completedFuture(topicPartitionOffsets));
+ processor.process(event);
+
+ verify(commitRequestManager).fetchOffsets(partitions, 12345);
+ assertEquals(topicPartitionOffsets, assertDoesNotThrow(() -> event.future().get()));
+ }
+
@Test
public void testTopicSubscriptionChangeEventWithIllegalSubscriptionState() {
subscriptionState = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
@@ -360,6 +384,87 @@ public class ApplicationEventProcessorTest {
assertDoesNotThrow(() -> event2.future().get());
}
+ @ParameterizedTest
+ @MethodSource("offsetsGenerator")
+ public void testSyncCommitEvent(Optional<Map<TopicPartition, OffsetAndMetadata>> offsets) {
+ SyncCommitEvent event = new SyncCommitEvent(offsets, 12345);
+
+ setupProcessor(true);
+ doReturn(CompletableFuture.completedFuture(offsets.orElse(Map.of()))).when(commitRequestManager).commitSync(offsets, 12345);
+
+ processor.process(event);
+ verify(commitRequestManager).commitSync(offsets, 12345);
+ Map<TopicPartition, OffsetAndMetadata> committedOffsets = assertDoesNotThrow(() -> event.future().get());
+ assertEquals(offsets.orElse(Map.of()), committedOffsets);
+ }
+
+ @Test
+ public void testSyncCommitEventWithoutCommitRequestManager() {
+ SyncCommitEvent event = new SyncCommitEvent(Optional.empty(), 12345);
+
+ setupProcessor(false);
+ processor.process(event);
+ assertFutureThrows(event.future(), KafkaException.class);
+ }
+
+ @Test
+ public void testSyncCommitEventWithException() {
+ SyncCommitEvent event = new SyncCommitEvent(Optional.empty(), 12345);
+
+ setupProcessor(true);
+ CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future = new CompletableFuture<>();
+ future.completeExceptionally(new IllegalStateException());
+ doReturn(future).when(commitRequestManager).commitSync(any(), anyLong());
+ processor.process(event);
+
+ verify(commitRequestManager).commitSync(Optional.empty(), 12345);
+ assertFutureThrows(event.future(), IllegalStateException.class);
+ }
+
+ @ParameterizedTest
+ @MethodSource("offsetsGenerator")
+ public void testAsyncCommitEventWithOffsets(Optional<Map<TopicPartition, OffsetAndMetadata>> offsets) {
+ AsyncCommitEvent event = new AsyncCommitEvent(offsets);
+
+ setupProcessor(true);
+ doReturn(CompletableFuture.completedFuture(offsets.orElse(Map.of()))).when(commitRequestManager).commitAsync(offsets);
+
+ processor.process(event);
+ verify(commitRequestManager).commitAsync(offsets);
+ Map<TopicPartition, OffsetAndMetadata> committedOffsets = assertDoesNotThrow(() -> event.future().get());
+ assertEquals(offsets.orElse(Map.of()), committedOffsets);
+ }
+
+ @Test
+ public void testAsyncCommitEventWithoutCommitRequestManager() {
+ AsyncCommitEvent event = new AsyncCommitEvent(Optional.empty());
+
+ setupProcessor(false);
+ processor.process(event);
+ assertFutureThrows(event.future(), KafkaException.class);
+ }
+
+ @Test
+ public void testAsyncCommitEventWithException() {
+ AsyncCommitEvent event = new AsyncCommitEvent(Optional.empty());
+
+ setupProcessor(true);
+ CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future = new CompletableFuture<>();
+ future.completeExceptionally(new IllegalStateException());
+ doReturn(future).when(commitRequestManager).commitAsync(any());
+ processor.process(event);
+
+ verify(commitRequestManager).commitAsync(Optional.empty());
+ assertFutureThrows(event.future(), IllegalStateException.class);
+ }
+
+ private static Stream<Arguments> offsetsGenerator() {
+ return Stream.of(
+ Arguments.of(Optional.empty()),
+ Arguments.of(Optional.of(Map.of(new TopicPartition("topic", 0), new OffsetAndMetadata(10, Optional.of(1), ""))))
+ );
+ }
+
private List<NetworkClientDelegate.UnsentRequest> mockCommitResults() {
return Collections.singletonList(mock(NetworkClientDelegate.UnsentRequest.class));
}