This is an automated email from the ASF dual-hosted git repository.
lianetm pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.0 by this push:
new 35a372c71db KAFKA-18641: AsyncKafkaConsumer could lose records with
auto offset commit (#18737)
35a372c71db is described below
commit 35a372c71db1dbe793460e211e6897953cec97d4
Author: TengYao Chi <[email protected]>
AuthorDate: Fri Feb 21 01:11:01 2025 +0800
KAFKA-18641: AsyncKafkaConsumer could lose records with auto offset commit
(#18737)
Reviewers: Lianet Magrans <[email protected]>, Jun Rao
<[email protected]>, Kirk True <[email protected]>
---
checkstyle/suppressions.xml | 2 +-
.../internals/AbstractMembershipManager.java | 25 +++-
.../consumer/internals/AsyncKafkaConsumer.java | 14 +-
.../consumer/internals/CommitRequestManager.java | 68 +++++----
.../internals/ConsumerMembershipManager.java | 17 ++-
.../consumer/internals/RequestManagers.java | 3 +-
.../consumer/internals/ShareMembershipManager.java | 3 +-
.../events/ApplicationEventProcessor.java | 25 +++-
.../consumer/internals/events/CommitEvent.java | 16 +++
.../consumer/internals/events/PollEvent.java | 24 ++++
.../consumer/internals/AsyncKafkaConsumerTest.java | 45 +++++-
.../internals/CommitRequestManagerTest.java | 159 +++++++--------------
.../internals/ConsumerMembershipManagerTest.java | 117 +++++++--------
.../events/ApplicationEventProcessorTest.java | 82 +++++++----
14 files changed, 357 insertions(+), 243 deletions(-)
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 7dd8fa7a366..e48b71c4507 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -105,7 +105,7 @@
files="(AbstractRequest|AbstractResponse|KerberosLogin|WorkerSinkTaskTest|TransactionManagerTest|SenderTest|KafkaAdminClient|ConsumerCoordinatorTest|KafkaAdminClientTest).java"/>
<suppress checks="NPathComplexity"
-
files="(ConsumerCoordinator|BufferPool|MetricName|Node|ConfigDef|RecordBatch|SslFactory|SslTransportLayer|MetadataResponse|KerberosLogin|Selector|Sender|Serdes|TokenInformation|Agent|PluginUtils|MiniTrogdorCluster|TasksRequest|KafkaProducer|AbstractStickyAssignor|Authorizer|FetchSessionHandler|RecordAccumulator|Shell).java"/>
+
files="(AbstractMembershipManager|ConsumerCoordinator|BufferPool|MetricName|Node|ConfigDef|RecordBatch|SslFactory|SslTransportLayer|MetadataResponse|KerberosLogin|Selector|Sender|Serdes|TokenInformation|Agent|PluginUtils|MiniTrogdorCluster|TasksRequest|KafkaProducer|AbstractStickyAssignor|Authorizer|FetchSessionHandler|RecordAccumulator|Shell).java"/>
<suppress checks="(JavaNCSS|CyclomaticComplexity|MethodLength)"
files="CoordinatorClient.java"/>
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java
index 90c2b3a647d..71b61a26d3f 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java
@@ -144,7 +144,7 @@ public abstract class AbstractMembershipManager<R extends
AbstractResponse> impl
/**
* If there is a reconciliation running (triggering commit, callbacks) for
the
- * assignmentReadyToReconcile. This will be true if {@link
#maybeReconcile()} has been triggered
+ * assignmentReadyToReconcile. This will be true if {@link
#maybeReconcile(boolean)} has been triggered
* after receiving a heartbeat response, or a metadata update.
*/
private boolean reconciliationInProgress;
@@ -199,12 +199,15 @@ public abstract class AbstractMembershipManager<R extends
AbstractResponse> impl
*/
private boolean isPollTimerExpired;
+ private final boolean autoCommitEnabled;
+
AbstractMembershipManager(String groupId,
SubscriptionState subscriptions,
ConsumerMetadata metadata,
Logger log,
Time time,
- RebalanceMetricsManager metricsManager) {
+ RebalanceMetricsManager metricsManager,
+ boolean autoCommitEnabled) {
this.groupId = groupId;
this.state = MemberState.UNSUBSCRIBED;
this.subscriptions = subscriptions;
@@ -216,6 +219,7 @@ public abstract class AbstractMembershipManager<R extends
AbstractResponse> impl
this.stateUpdatesListeners = new ArrayList<>();
this.time = time;
this.metricsManager = metricsManager;
+ this.autoCommitEnabled = autoCommitEnabled;
}
/**
@@ -791,8 +795,16 @@ public abstract class AbstractMembershipManager<R extends
AbstractResponse> impl
* - Another reconciliation is already in progress.
* - There are topics that haven't been added to the current assignment
yet, but all their topic IDs
* are missing from the target assignment.
+ *
+ * @param canCommit Controls whether reconciliation can proceed when
auto-commit is enabled.
+ * Set to true only when the current offset positions are
safe to commit.
+ * If false and auto-commit enabled, the reconciliation
will be skipped.
*/
- void maybeReconcile() {
+ public void maybeReconcile(boolean canCommit) {
+ if (state != MemberState.RECONCILING) {
+ return;
+ }
+
if (targetAssignmentReconciled()) {
log.trace("Ignoring reconciliation attempt. Target assignment is
equal to the " +
"current assignment.");
@@ -818,6 +830,7 @@ public abstract class AbstractMembershipManager<R extends
AbstractResponse> impl
return;
}
+ if (autoCommitEnabled && !canCommit) return;
markReconciliationInProgress();
// Keep copy of assigned TopicPartitions created from the
TopicIdPartitions that are
@@ -1347,7 +1360,7 @@ public abstract class AbstractMembershipManager<R extends
AbstractResponse> impl
/**
* @return If there is a reconciliation in process now. Note that
reconciliation is triggered
- * by a call to {@link #maybeReconcile()}. Visible for testing.
+ * by a call to {@link #maybeReconcile(boolean)}. Visible for testing.
*/
boolean reconciliationInProgress() {
return reconciliationInProgress;
@@ -1383,9 +1396,7 @@ public abstract class AbstractMembershipManager<R extends
AbstractResponse> impl
* time-sensitive operations should be performed
*/
public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
- if (state == MemberState.RECONCILING) {
- maybeReconcile();
- }
+ maybeReconcile(false);
return NetworkClientDelegate.PollResult.EMPTY;
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
index 3f30acc932c..e19c37c954f 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
@@ -748,9 +748,14 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
}
do {
-
+ PollEvent event = new PollEvent(timer.currentTimeMs());
// Make sure to let the background thread know that we are
still polling.
- applicationEventHandler.add(new
PollEvent(timer.currentTimeMs()));
+ // This will trigger async auto-commits of consumed positions
when hitting
+ // the interval time or reconciling new assignments
+ applicationEventHandler.add(event);
+ // Wait for reconciliation and auto-commit to be triggered, to
ensure all commit requests
+ // retrieve the positions to commit before proceeding with
fetching new records
+ ConsumerUtils.getResult(event.reconcileAndAutoCommit(),
defaultApiTimeoutMs.toMillis());
// We must not allow wake-ups between polling for fetches and
returning the records.
// If the polled fetches are not empty the consumed position
has already been updated in the polling
@@ -818,7 +823,6 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
try {
AsyncCommitEvent asyncCommitEvent = new AsyncCommitEvent(offsets);
lastPendingAsyncCommit =
commit(asyncCommitEvent).whenComplete((committedOffsets, throwable) -> {
-
if (throwable == null) {
offsetCommitCallbackInvoker.enqueueInterceptorInvocation(committedOffsets);
}
@@ -846,6 +850,10 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
}
applicationEventHandler.add(commitEvent);
+
+ // This blocks until the background thread retrieves allConsumed
positions to commit if none were explicitly specified.
+ // This operation will ensure that the offsets to commit are not
affected by fetches which may start after this
+ ConsumerUtils.getResult(commitEvent.offsetsReady(),
defaultApiTimeoutMs.toMillis());
return commitEvent.future();
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
index 1d3503886a9..284707a812b 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
@@ -171,8 +171,7 @@ public class CommitRequestManager implements
RequestManager, MemberStateListener
}
/**
- * Poll for the {@link OffsetFetchRequest} and {@link OffsetCommitRequest}
request if there's any. The function will
- * also try to autocommit the offsets, if feature is enabled.
+ * Poll for the {@link OffsetFetchRequest} and {@link OffsetCommitRequest}
request if there's any.
*/
@Override
public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
@@ -186,7 +185,6 @@ public class CommitRequestManager implements
RequestManager, MemberStateListener
return drainPendingOffsetCommitRequests();
}
- maybeAutoCommitAsync();
if (!pendingRequests.hasUnsentRequests())
return EMPTY;
@@ -264,7 +262,7 @@ public class CommitRequestManager implements
RequestManager, MemberStateListener
* In that case, the next auto-commit request will be sent on the next
call to poll, after a
* response for the in-flight is received.
*/
- public void maybeAutoCommitAsync() {
+ private void maybeAutoCommitAsync() {
if (autoCommitEnabled() && autoCommitState.get().shouldAutoCommit()) {
OffsetCommitRequestState requestState = createOffsetCommitRequest(
subscriptions.allConsumed(),
@@ -298,7 +296,7 @@ public class CommitRequestManager implements
RequestManager, MemberStateListener
/**
* Commit consumed offsets if auto-commit is enabled, regardless of the
auto-commit interval.
- * This is used for committing offsets before revoking partitions. This
will retry committing
+ * This is used for committing offsets before rebalance. This will retry
committing
* the latest offsets until the request succeeds, fails with a fatal
error, or the timeout
* expires. Note that:
* <ul>
@@ -306,18 +304,18 @@ public class CommitRequestManager implements
RequestManager, MemberStateListener
* including the member ID and latest member epoch received from the
broker.</li>
* <li>Considers {@link Errors#UNKNOWN_TOPIC_OR_PARTITION} as a fatal
error, and will not
* retry it although the error extends RetriableException. The reason
is that if a topic
- * or partition is deleted, revocation would not finish in time since
the auto commit would keep retrying.</li>
+ * or partition is deleted, rebalance would not finish in time since
the auto commit would keep retrying.</li>
* </ul>
*
* Also note that this will generate a commit request even if there is
another one in-flight,
* generated by the auto-commit on the interval logic, to ensure that the
latest offsets are
- * committed before revoking partitions.
+ * committed before rebalance.
*
* @return Future that will complete when the offsets are successfully
committed. It will
* complete exceptionally if the commit fails with a non-retriable error,
or if the retry
* timeout expires.
*/
- public CompletableFuture<Void> maybeAutoCommitSyncBeforeRevocation(final
long deadlineMs) {
+ public CompletableFuture<Void> maybeAutoCommitSyncBeforeRebalance(final
long deadlineMs) {
if (!autoCommitEnabled()) {
return CompletableFuture.completedFuture(null);
}
@@ -325,12 +323,12 @@ public class CommitRequestManager implements
RequestManager, MemberStateListener
CompletableFuture<Void> result = new CompletableFuture<>();
OffsetCommitRequestState requestState =
createOffsetCommitRequest(subscriptions.allConsumed(), deadlineMs);
- autoCommitSyncBeforeRevocationWithRetries(requestState, result);
+ autoCommitSyncBeforeRebalanceWithRetries(requestState, result);
return result;
}
- private void
autoCommitSyncBeforeRevocationWithRetries(OffsetCommitRequestState
requestAttempt,
-
CompletableFuture<Void> result) {
+ private void
autoCommitSyncBeforeRebalanceWithRetries(OffsetCommitRequestState
requestAttempt,
+
CompletableFuture<Void> result) {
CompletableFuture<Map<TopicPartition, OffsetAndMetadata>>
commitAttempt = requestAutoCommit(requestAttempt);
commitAttempt.whenComplete((committedOffsets, error) -> {
if (error == null) {
@@ -338,10 +336,10 @@ public class CommitRequestManager implements
RequestManager, MemberStateListener
} else {
if (error instanceof RetriableException ||
isStaleEpochErrorAndValidEpochAvailable(error)) {
if (requestAttempt.isExpired()) {
- log.debug("Auto-commit sync before revocation timed
out and won't be retried anymore");
+ log.debug("Auto-commit sync before rebalance timed out
and won't be retried anymore");
result.completeExceptionally(maybeWrapAsTimeoutException(error));
} else if (error instanceof
UnknownTopicOrPartitionException) {
- log.debug("Auto-commit sync before revocation failed
because topic or partition were deleted");
+ log.debug("Auto-commit sync before rebalance failed
because topic or partition were deleted");
result.completeExceptionally(error);
} else {
// Make sure the auto-commit is retried with the
latest offsets
@@ -350,10 +348,10 @@ public class CommitRequestManager implements
RequestManager, MemberStateListener
error.getMessage());
requestAttempt.offsets = subscriptions.allConsumed();
requestAttempt.resetFuture();
-
autoCommitSyncBeforeRevocationWithRetries(requestAttempt, result);
+
autoCommitSyncBeforeRebalanceWithRetries(requestAttempt, result);
}
} else {
- log.debug("Auto-commit sync before revocation failed with
non-retriable error", error);
+ log.debug("Auto-commit sync before rebalance failed with
non-retriable error", error);
result.completeExceptionally(error);
}
}
@@ -388,14 +386,13 @@ public class CommitRequestManager implements
RequestManager, MemberStateListener
* exceptionally depending on the response. If the request fails with a
retriable error, the
* future will be completed with a {@link RetriableCommitFailedException}.
*/
- public CompletableFuture<Map<TopicPartition, OffsetAndMetadata>>
commitAsync(final Optional<Map<TopicPartition, OffsetAndMetadata>> offsets) {
- Map<TopicPartition, OffsetAndMetadata> commitOffsets =
offsets.orElseGet(subscriptions::allConsumed);
- if (commitOffsets.isEmpty()) {
+ public CompletableFuture<Map<TopicPartition, OffsetAndMetadata>>
commitAsync(final Map<TopicPartition, OffsetAndMetadata> offsets) {
+ if (offsets.isEmpty()) {
log.debug("Skipping commit of empty offsets");
return CompletableFuture.completedFuture(Map.of());
}
- maybeUpdateLastSeenEpochIfNewer(commitOffsets);
- OffsetCommitRequestState commitRequest =
createOffsetCommitRequest(commitOffsets, Long.MAX_VALUE);
+ maybeUpdateLastSeenEpochIfNewer(offsets);
+ OffsetCommitRequestState commitRequest =
createOffsetCommitRequest(offsets, Long.MAX_VALUE);
pendingRequests.addOffsetCommitRequest(commitRequest);
CompletableFuture<Map<TopicPartition, OffsetAndMetadata>>
asyncCommitResult = new CompletableFuture<>();
@@ -403,7 +400,7 @@ public class CommitRequestManager implements
RequestManager, MemberStateListener
if (error != null) {
asyncCommitResult.completeExceptionally(commitAsyncExceptionForError(error));
} else {
- asyncCommitResult.complete(commitOffsets);
+ asyncCommitResult.complete(offsets);
}
});
return asyncCommitResult;
@@ -417,15 +414,14 @@ public class CommitRequestManager implements
RequestManager, MemberStateListener
* an expected retriable error.
* @return Future that will complete when a successful response
*/
- public CompletableFuture<Map<TopicPartition, OffsetAndMetadata>>
commitSync(final Optional<Map<TopicPartition, OffsetAndMetadata>> offsets,
+ public CompletableFuture<Map<TopicPartition, OffsetAndMetadata>>
commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets,
final long deadlineMs) {
- Map<TopicPartition, OffsetAndMetadata> commitOffsets =
offsets.orElseGet(subscriptions::allConsumed);
- if (commitOffsets.isEmpty()) {
+ if (offsets.isEmpty()) {
return CompletableFuture.completedFuture(Map.of());
}
- maybeUpdateLastSeenEpochIfNewer(commitOffsets);
+ maybeUpdateLastSeenEpochIfNewer(offsets);
CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> result = new
CompletableFuture<>();
- OffsetCommitRequestState requestState =
createOffsetCommitRequest(commitOffsets, deadlineMs);
+ OffsetCommitRequestState requestState =
createOffsetCommitRequest(offsets, deadlineMs);
commitSyncWithRetries(requestState, result);
return result;
}
@@ -566,7 +562,7 @@ public class CommitRequestManager implements
RequestManager, MemberStateListener
return error instanceof StaleMemberEpochException &&
memberInfo.memberEpoch.isPresent();
}
- public void updateAutoCommitTimer(final long currentTimeMs) {
+ private void updateAutoCommitTimer(final long currentTimeMs) {
this.autoCommitState.ifPresent(t -> t.updateTimer(currentTimeMs));
}
@@ -637,6 +633,24 @@ public class CommitRequestManager implements
RequestManager, MemberStateListener
});
}
+ /**
+ * This is a non-blocking method to update timer and trigger async
auto-commit.
+ * <p>
+ * This method performs two main tasks:
+ * <ol>
+ * <li>Updates the internal timer with the current time.</li>
+ * <li>Initiate an asynchronous auto-commit operation for all consumed
positions if needed.</li>
+ * </ol>
+ *
+ * @param currentTimeMs the current timestamp in millisecond
+ * @see CommitRequestManager#updateAutoCommitTimer(long)
+ * @see CommitRequestManager#maybeAutoCommitAsync()
+ */
+ public void updateTimerAndMaybeCommit(final long currentTimeMs) {
+ updateAutoCommitTimer(currentTimeMs);
+ maybeAutoCommitAsync();
+ }
+
class OffsetCommitRequestState extends RetriableRequestState {
private Map<TopicPartition, OffsetAndMetadata> offsets;
private final String groupId;
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManager.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManager.java
index f77295e8aef..57d6c21e48b 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManager.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManager.java
@@ -123,8 +123,7 @@ public class ConsumerMembershipManager extends
AbstractMembershipManager<Consume
private final Optional<String> serverAssignor;
/**
- * Manager to perform commit requests needed before revoking partitions
(if auto-commit is
- * enabled)
+ * Manager to perform commit requests needed before rebalance (if
auto-commit is enabled)
*/
private final CommitRequestManager commitRequestManager;
@@ -145,7 +144,8 @@ public class ConsumerMembershipManager extends
AbstractMembershipManager<Consume
LogContext logContext,
BackgroundEventHandler
backgroundEventHandler,
Time time,
- Metrics metrics) {
+ Metrics metrics,
+ boolean autoCommitEnabled) {
this(groupId,
groupInstanceId,
rebalanceTimeoutMs,
@@ -156,7 +156,8 @@ public class ConsumerMembershipManager extends
AbstractMembershipManager<Consume
logContext,
backgroundEventHandler,
time,
- new ConsumerRebalanceMetricsManager(metrics));
+ new ConsumerRebalanceMetricsManager(metrics),
+ autoCommitEnabled);
}
// Visible for testing
@@ -170,13 +171,15 @@ public class ConsumerMembershipManager extends
AbstractMembershipManager<Consume
LogContext logContext,
BackgroundEventHandler backgroundEventHandler,
Time time,
- RebalanceMetricsManager metricsManager) {
+ RebalanceMetricsManager metricsManager,
+ boolean autoCommitEnabled) {
super(groupId,
subscriptions,
metadata,
logContext.logger(ConsumerMembershipManager.class),
time,
- metricsManager);
+ metricsManager,
+ autoCommitEnabled);
this.groupInstanceId = groupInstanceId;
this.rebalanceTimeoutMs = rebalanceTimeoutMs;
this.serverAssignor = serverAssignor;
@@ -252,7 +255,7 @@ public class ConsumerMembershipManager extends
AbstractMembershipManager<Consume
// best effort to commit the offsets in the case where the epoch might
have changed while
// the current reconciliation is in process. Note this is using the
rebalance timeout as
// it is the limit enforced by the broker to complete the
reconciliation process.
- return
commitRequestManager.maybeAutoCommitSyncBeforeRevocation(getDeadlineMsForTimeout(rebalanceTimeoutMs));
+ return
commitRequestManager.maybeAutoCommitSyncBeforeRebalance(getDeadlineMsForTimeout(rebalanceTimeoutMs));
}
/**
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
index 960567ac96b..f7446b7ad3c 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
@@ -217,7 +217,8 @@ public class RequestManagers implements Closeable {
logContext,
backgroundEventHandler,
time,
- metrics);
+ metrics,
+
config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG));
// Update the group member ID label in the client
telemetry reporter.
// According to KIP-1082, the consumer will generate the
member ID as the incarnation ID of the process.
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareMembershipManager.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareMembershipManager.java
index 3e3ea246748..d7944466130 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareMembershipManager.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareMembershipManager.java
@@ -106,7 +106,8 @@ public class ShareMembershipManager extends
AbstractMembershipManager<ShareGroup
metadata,
logContext.logger(ShareMembershipManager.class),
time,
- metricsManager);
+ metricsManager,
+ false);
this.rackId = rackId;
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
index 69cc0072a39..08661ef7581 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
@@ -206,13 +206,25 @@ public class ApplicationEventProcessor implements
EventProcessor<ApplicationEven
}
private void process(final PollEvent event) {
+ // Trigger a reconciliation that can safely commit offsets if needed
to rebalance,
+ // as we're processing before any new fetching starts in the app thread
+
requestManagers.consumerMembershipManager.ifPresent(consumerMembershipManager ->
+ consumerMembershipManager.maybeReconcile(true));
if (requestManagers.commitRequestManager.isPresent()) {
- requestManagers.commitRequestManager.ifPresent(m ->
m.updateAutoCommitTimer(event.pollTimeMs()));
+ CommitRequestManager commitRequestManager =
requestManagers.commitRequestManager.get();
+ commitRequestManager.updateTimerAndMaybeCommit(event.pollTimeMs());
+ // all commit request generation points have been passed,
+ // so it's safe to notify the app thread could proceed and start
fetching
+ event.markReconcileAndAutoCommitComplete();
requestManagers.consumerHeartbeatRequestManager.ifPresent(hrm -> {
hrm.membershipManager().onConsumerPoll();
hrm.resetPollTimer(event.pollTimeMs());
});
} else {
+ // safe to unblock - no auto-commit risk here:
+ // 1. commitRequestManager is not present
+ // 2. shareConsumer has no auto-commit mechanism
+ event.markReconcileAndAutoCommitComplete();
requestManagers.shareHeartbeatRequestManager.ifPresent(hrm -> {
hrm.membershipManager().onConsumerPoll();
hrm.resetPollTimer(event.pollTimeMs());
@@ -234,7 +246,9 @@ public class ApplicationEventProcessor implements
EventProcessor<ApplicationEven
try {
CommitRequestManager manager =
requestManagers.commitRequestManager.get();
- CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future =
manager.commitAsync(event.offsets());
+ Map<TopicPartition, OffsetAndMetadata> offsets =
event.offsets().orElseGet(subscriptions::allConsumed);
+ event.markOffsetsReady();
+ CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future =
manager.commitAsync(offsets);
future.whenComplete(complete(event.future()));
} catch (Exception e) {
event.future().completeExceptionally(e);
@@ -250,7 +264,9 @@ public class ApplicationEventProcessor implements
EventProcessor<ApplicationEven
try {
CommitRequestManager manager =
requestManagers.commitRequestManager.get();
- CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future =
manager.commitSync(event.offsets(), event.deadlineMs());
+ Map<TopicPartition, OffsetAndMetadata> offsets =
event.offsets().orElseGet(subscriptions::allConsumed);
+ event.markOffsetsReady();
+ CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future =
manager.commitSync(offsets, event.deadlineMs());
future.whenComplete(complete(event.future()));
} catch (Exception e) {
event.future().completeExceptionally(e);
@@ -275,8 +291,7 @@ public class ApplicationEventProcessor implements
EventProcessor<ApplicationEven
private void process(final AssignmentChangeEvent event) {
if (requestManagers.commitRequestManager.isPresent()) {
CommitRequestManager manager =
requestManagers.commitRequestManager.get();
- manager.updateAutoCommitTimer(event.currentTimeMs());
- manager.maybeAutoCommitAsync();
+ manager.updateTimerAndMaybeCommit(event.currentTimeMs());
}
log.info("Assigned to partition(s): {}",
event.partitions().stream().map(TopicPartition::toString).collect(Collectors.joining(",
")));
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CommitEvent.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CommitEvent.java
index a05ce638ece..9afe3a383bb 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CommitEvent.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CommitEvent.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.TopicPartition;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
public abstract class CommitEvent extends
CompletableApplicationEvent<Map<TopicPartition, OffsetAndMetadata>> {
@@ -30,6 +31,13 @@ public abstract class CommitEvent extends
CompletableApplicationEvent<Map<TopicP
*/
private final Optional<Map<TopicPartition, OffsetAndMetadata>> offsets;
+ /**
+ * Future that completes when allConsumed offsets have been calculated.
+ * The app thread waits for this future before returning control to ensure
+ * the offsets to be committed are up-to-date.
+ */
+ protected final CompletableFuture<Void> offsetsReady = new
CompletableFuture<>();
+
protected CommitEvent(final Type type, final Optional<Map<TopicPartition,
OffsetAndMetadata>> offsets, final long deadlineMs) {
super(type, deadlineMs);
this.offsets = validate(offsets);
@@ -57,6 +65,14 @@ public abstract class CommitEvent extends
CompletableApplicationEvent<Map<TopicP
return offsets;
}
+ public CompletableFuture<Void> offsetsReady() {
+ return offsetsReady;
+ }
+
+ public void markOffsetsReady() {
+ offsetsReady.complete(null);
+ }
+
@Override
protected String toStringBase() {
return super.toStringBase() + ", offsets=" + offsets;
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/PollEvent.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/PollEvent.java
index 96614c06e9b..37df5d9ddc2 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/PollEvent.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/PollEvent.java
@@ -16,10 +16,26 @@
*/
package org.apache.kafka.clients.consumer.internals.events;
+import java.util.concurrent.CompletableFuture;
+
public class PollEvent extends ApplicationEvent {
private final long pollTimeMs;
+ /**
+ * A future that represents the completion of reconciliation and
auto-commit
+ * processing.
+ * This future is completed when all commit request generation points have
+ * been passed, including:
+ * <ul>
+ * <li>auto-commit on rebalance</li>
+ * <li>auto-commit on the interval</li>
+ * </ul>
+ * Once completed, it signals that it's safe for the consumer to proceed
with
+ * fetching new records.
+ */
+ private final CompletableFuture<Void> reconcileAndAutoCommit = new
CompletableFuture<>();
+
public PollEvent(final long pollTimeMs) {
super(Type.POLL);
this.pollTimeMs = pollTimeMs;
@@ -29,6 +45,14 @@ public class PollEvent extends ApplicationEvent {
return pollTimeMs;
}
+ public CompletableFuture<Void> reconcileAndAutoCommit() {
+ return reconcileAndAutoCommit;
+ }
+
+ public void markReconcileAndAutoCommitComplete() {
+ reconcileAndAutoCommit.complete(null);
+ }
+
@Override
public String toStringBase() {
return super.toStringBase() + ", pollTimeMs=" + pollTimeMs;
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
index 58ec7a334dc..cb03d585ba6 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
@@ -37,6 +37,7 @@ import
org.apache.kafka.clients.consumer.internals.events.AssignmentChangeEvent;
import org.apache.kafka.clients.consumer.internals.events.AsyncCommitEvent;
import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
import
org.apache.kafka.clients.consumer.internals.events.CheckAndUpdatePositionsEvent;
+import org.apache.kafka.clients.consumer.internals.events.CommitEvent;
import org.apache.kafka.clients.consumer.internals.events.CommitOnCloseEvent;
import
org.apache.kafka.clients.consumer.internals.events.CompletableApplicationEvent;
import
org.apache.kafka.clients.consumer.internals.events.CompletableBackgroundEvent;
@@ -293,6 +294,7 @@ public class AsyncKafkaConsumerTest {
offsets.put(t0, new OffsetAndMetadata(10L));
offsets.put(t1, new OffsetAndMetadata(20L));
+ markOffsetsReadyForCommitEvent();
consumer.commitAsync(offsets, null);
final ArgumentCaptor<AsyncCommitEvent> commitEventCaptor =
ArgumentCaptor.forClass(AsyncCommitEvent.class);
@@ -394,6 +396,7 @@ public class AsyncKafkaConsumerTest {
consumer.wakeup();
+ markReconcileAndAutoCommitCompleteForPollEvent();
assertThrows(WakeupException.class, () ->
consumer.poll(Duration.ZERO));
assertDoesNotThrow(() -> consumer.poll(Duration.ZERO));
}
@@ -413,6 +416,7 @@ public class AsyncKafkaConsumerTest {
completeAssignmentChangeEventSuccessfully();
consumer.assign(singleton(tp));
+ markReconcileAndAutoCommitCompleteForPollEvent();
assertThrows(WakeupException.class, () ->
consumer.poll(Duration.ofMinutes(1)));
assertDoesNotThrow(() -> consumer.poll(Duration.ZERO));
}
@@ -436,6 +440,7 @@ public class AsyncKafkaConsumerTest {
completeAssignmentChangeEventSuccessfully();
consumer.assign(singleton(tp));
+ markReconcileAndAutoCommitCompleteForPollEvent();
// since wakeup() is called when the non-empty fetch is returned the
wakeup should be ignored
assertDoesNotThrow(() -> consumer.poll(Duration.ofMinutes(1)));
// the previously ignored wake-up should not be ignored in the next
call
@@ -472,6 +477,7 @@ public class AsyncKafkaConsumerTest {
completeTopicSubscriptionChangeEventSuccessfully();
consumer.subscribe(Collections.singletonList(topicName), listener);
+ markReconcileAndAutoCommitCompleteForPollEvent();
consumer.poll(Duration.ZERO);
assertTrue(callbackExecuted.get());
}
@@ -493,6 +499,7 @@ public class AsyncKafkaConsumerTest {
completeAssignmentChangeEventSuccessfully();
consumer.assign(singleton(tp));
+ markReconcileAndAutoCommitCompleteForPollEvent();
consumer.poll(Duration.ZERO);
assertDoesNotThrow(() -> consumer.poll(Duration.ZERO));
@@ -584,6 +591,7 @@ public class AsyncKafkaConsumerTest {
consumer.assign(Collections.singleton(tp));
completeSeekUnvalidatedEventSuccessfully();
consumer.seek(tp, 20);
+ markOffsetsReadyForCommitEvent();
consumer.commitAsync();
CompletableApplicationEvent<Void> event = getLastEnqueuedEvent();
@@ -617,6 +625,7 @@ public class AsyncKafkaConsumerTest {
completeAssignmentChangeEventSuccessfully();
consumer.assign(Collections.singleton(new TopicPartition("foo", 0)));
assertDoesNotThrow(() -> consumer.commitAsync(new HashMap<>(),
callback));
+ markReconcileAndAutoCommitCompleteForPollEvent();
assertMockCommitCallbackInvoked(() -> consumer.poll(Duration.ZERO),
callback,
null);
@@ -732,6 +741,7 @@ public class AsyncKafkaConsumerTest {
subscriptions.assignFromSubscribed(singleton(new
TopicPartition("topic", 0)));
completeSeekUnvalidatedEventSuccessfully();
subscriptions.seek(new TopicPartition("topic", 0), 100);
+ markOffsetsReadyForCommitEvent();
consumer.commitSyncAllConsumed(time.timer(100));
ArgumentCaptor<SyncCommitEvent> eventCaptor =
ArgumentCaptor.forClass(SyncCommitEvent.class);
@@ -1026,6 +1036,7 @@ public class AsyncKafkaConsumerTest {
ApplicationEvent event = invocation.getArgument(0);
if (event instanceof SyncCommitEvent) {
capturedEvent.set((SyncCommitEvent) event);
+ ((SyncCommitEvent) event).markOffsetsReady();
}
return null;
}).when(applicationEventHandler).add(any());
@@ -1050,7 +1061,9 @@ public class AsyncKafkaConsumerTest {
completeSeekUnvalidatedEventSuccessfully();
consumer.seek(tp, 20);
+ markOffsetsReadyForCommitEvent();
consumer.commitAsync();
+
Exception e = assertThrows(KafkaException.class, () ->
consumer.close(Duration.ofMillis(10)));
assertInstanceOf(TimeoutException.class, e.getCause());
}
@@ -1391,6 +1404,7 @@ public class AsyncKafkaConsumerTest {
backgroundEventQueue.add(e);
}
+ markReconcileAndAutoCommitCompleteForPollEvent();
// This will trigger the background event queue to process our
background event message.
// If any error is happening inside the rebalance callbacks, we expect
the first exception to be thrown from poll.
if (expectedException.isPresent()) {
@@ -1460,6 +1474,7 @@ public class AsyncKafkaConsumerTest {
backgroundEventQueue.add(errorEvent);
completeAssignmentChangeEventSuccessfully();
consumer.assign(singletonList(new TopicPartition("topic", 0)));
+ markReconcileAndAutoCommitCompleteForPollEvent();
final KafkaException exception = assertThrows(KafkaException.class, ()
-> consumer.poll(Duration.ZERO));
assertEquals(expectedException.getMessage(), exception.getMessage());
@@ -1478,6 +1493,7 @@ public class AsyncKafkaConsumerTest {
backgroundEventQueue.add(errorEvent2);
completeAssignmentChangeEventSuccessfully();
consumer.assign(singletonList(new TopicPartition("topic", 0)));
+ markReconcileAndAutoCommitCompleteForPollEvent();
final KafkaException exception = assertThrows(KafkaException.class, ()
-> consumer.poll(Duration.ZERO));
assertEquals(expectedException1.getMessage(), exception.getMessage());
@@ -1510,6 +1526,7 @@ public class AsyncKafkaConsumerTest {
props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumerGroupA");
props.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG,
GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT));
props.put(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG, "someAssignor");
+ props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
final ConsumerConfig config = new ConsumerConfig(props);
consumer = newConsumer(config);
@@ -1533,6 +1550,7 @@ public class AsyncKafkaConsumerTest {
final Properties props =
requiredConsumerConfigAndGroupId("consumerGroupA");
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 10000);
props.put(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED, true);
+ props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
final ConsumerConfig config = new ConsumerConfig(props);
consumer = newConsumer(config);
@@ -1561,6 +1579,7 @@ public class AsyncKafkaConsumerTest {
completeTopicSubscriptionChangeEventSuccessfully();
consumer.subscribe(singletonList("topic1"));
+ markReconcileAndAutoCommitCompleteForPollEvent();
consumer.poll(Duration.ofMillis(100));
verify(applicationEventHandler).add(any(PollEvent.class));
verify(applicationEventHandler).add(any(CreateFetchRequestsEvent.class));
@@ -1579,7 +1598,7 @@ public class AsyncKafkaConsumerTest {
completeAssignmentChangeEventSuccessfully();
consumer.assign(singleton(new TopicPartition("t1", 1)));
-
+ markReconcileAndAutoCommitCompleteForPollEvent();
consumer.poll(Duration.ZERO);
verify(applicationEventHandler, atLeast(1))
@@ -1616,6 +1635,7 @@ public class AsyncKafkaConsumerTest {
).when(fetchCollector).collectFetch(any(FetchBuffer.class));
when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(true);
+ markReconcileAndAutoCommitCompleteForPollEvent();
// And then poll for up to 10000ms, which should return 2 records
without timing out
ConsumerRecords<?, ?> returnedRecords =
consumer.poll(Duration.ofMillis(10000));
assertEquals(2, returnedRecords.count());
@@ -1719,6 +1739,7 @@ public class AsyncKafkaConsumerTest {
// interrupt the thread and call poll
try {
Thread.currentThread().interrupt();
+ markReconcileAndAutoCommitCompleteForPollEvent();
assertThrows(InterruptException.class, () ->
consumer.poll(Duration.ZERO));
} finally {
// clear interrupted state again since this thread may be reused
by JUnit
@@ -1750,6 +1771,7 @@ public class AsyncKafkaConsumerTest {
completeTopicSubscriptionChangeEventSuccessfully();
consumer.subscribe(Collections.singletonList("topic"));
when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(true);
+ markReconcileAndAutoCommitCompleteForPollEvent();
consumer.poll(Duration.ZERO);
verify(backgroundEventReaper).reap(time.milliseconds());
}
@@ -1812,6 +1834,7 @@ public class AsyncKafkaConsumerTest {
completeUnsubscribeApplicationEventSuccessfully();
consumer.assign(singleton(new TopicPartition("topic1", 0)));
+ markReconcileAndAutoCommitCompleteForPollEvent();
consumer.poll(Duration.ZERO);
verify(applicationEventHandler,
never()).addAndGet(any(UpdatePatternSubscriptionEvent.class));
@@ -1974,6 +1997,7 @@ public class AsyncKafkaConsumerTest {
private void completeCommitAsyncApplicationEventExceptionally(Exception
ex) {
doAnswer(invocation -> {
AsyncCommitEvent event = invocation.getArgument(0);
+ event.markOffsetsReady();
event.future().completeExceptionally(ex);
return null;
}).when(applicationEventHandler).add(ArgumentMatchers.isA(AsyncCommitEvent.class));
@@ -1982,6 +2006,7 @@ public class AsyncKafkaConsumerTest {
private void completeCommitSyncApplicationEventExceptionally(Exception ex)
{
doAnswer(invocation -> {
SyncCommitEvent event = invocation.getArgument(0);
+ event.markOffsetsReady();
event.future().completeExceptionally(ex);
return null;
}).when(applicationEventHandler).add(ArgumentMatchers.isA(SyncCommitEvent.class));
@@ -1994,6 +2019,7 @@ public class AsyncKafkaConsumerTest {
private void completeCommitAsyncApplicationEventSuccessfully() {
doAnswer(invocation -> {
AsyncCommitEvent event = invocation.getArgument(0);
+ event.markOffsetsReady();
event.future().complete(null);
return null;
}).when(applicationEventHandler).add(ArgumentMatchers.isA(AsyncCommitEvent.class));
@@ -2002,6 +2028,7 @@ public class AsyncKafkaConsumerTest {
private void completeCommitSyncApplicationEventSuccessfully() {
doAnswer(invocation -> {
SyncCommitEvent event = invocation.getArgument(0);
+ event.markOffsetsReady();
event.future().complete(null);
return null;
}).when(applicationEventHandler).add(ArgumentMatchers.isA(SyncCommitEvent.class));
@@ -2089,4 +2116,20 @@ public class AsyncKafkaConsumerTest {
// Invokes callback
consumer.commitAsync();
}
+
+ private void markOffsetsReadyForCommitEvent() {
+ doAnswer(invocation -> {
+ CommitEvent event = invocation.getArgument(0);
+ event.markOffsetsReady();
+ return null;
+
}).when(applicationEventHandler).add(ArgumentMatchers.isA(CommitEvent.class));
+ }
+
+ private void markReconcileAndAutoCommitCompleteForPollEvent() {
+ doAnswer(invocation -> {
+ PollEvent event = invocation.getArgument(0);
+ event.markReconcileAndAutoCommitComplete();
+ return null;
+
}).when(applicationEventHandler).add(ArgumentMatchers.isA(PollEvent.class));
+ }
}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java
index 32b8aea5b22..f44b285136a 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java
@@ -84,10 +84,10 @@ import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.clearInvocations;
-import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
@@ -179,7 +179,7 @@ public class CommitRequestManagerTest {
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
offsets.put(new TopicPartition("t1", 0), new OffsetAndMetadata(0));
- commitRequestManager.commitAsync(Optional.of(offsets));
+ commitRequestManager.commitAsync(offsets);
assertPoll(false, 0, commitRequestManager);
}
@@ -190,7 +190,7 @@ public class CommitRequestManagerTest {
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
offsets.put(new TopicPartition("t1", 0), new OffsetAndMetadata(0));
- commitRequestManager.commitAsync(Optional.of(offsets));
+ commitRequestManager.commitAsync(offsets);
assertPoll(false, 0, commitRequestManager);
assertPoll(true, 1, commitRequestManager);
}
@@ -202,7 +202,7 @@ public class CommitRequestManagerTest {
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
offsets.put(new TopicPartition("t1", 0), new OffsetAndMetadata(0));
- commitRequestManager.commitAsync(Optional.of(offsets));
+ commitRequestManager.commitAsync(offsets);
assertPoll(1, commitRequestManager);
}
@@ -214,9 +214,9 @@ public class CommitRequestManagerTest {
CommitRequestManager commitRequestManager = create(true, 100);
assertPoll(0, commitRequestManager);
- commitRequestManager.updateAutoCommitTimer(time.milliseconds());
+ commitRequestManager.updateTimerAndMaybeCommit(time.milliseconds());
time.sleep(100);
- commitRequestManager.updateAutoCommitTimer(time.milliseconds());
+ commitRequestManager.updateTimerAndMaybeCommit(time.milliseconds());
List<NetworkClientDelegate.FutureCompletionHandler> pollResults =
assertPoll(1, commitRequestManager);
pollResults.forEach(v -> v.onComplete(mockOffsetCommitResponse(
"t1",
@@ -243,9 +243,9 @@ public class CommitRequestManagerTest {
// Add the requests to the CommitRequestManager and store their futures
long deadlineMs = time.milliseconds() + defaultApiTimeoutMs;
- commitManager.commitSync(Optional.of(offsets1), deadlineMs);
+ commitManager.commitSync(offsets1, deadlineMs);
commitManager.fetchOffsets(Collections.singleton(new
TopicPartition("test", 0)), deadlineMs);
- commitManager.commitSync(Optional.of(offsets2), deadlineMs);
+ commitManager.commitSync(offsets2, deadlineMs);
commitManager.fetchOffsets(Collections.singleton(new
TopicPartition("test", 1)), deadlineMs);
// Poll the CommitRequestManager and verify that the
inflightOffsetFetches size is correct
@@ -278,7 +278,7 @@ public class CommitRequestManagerTest {
Map<TopicPartition, OffsetAndMetadata> offsets =
Collections.singletonMap(
new TopicPartition("topic", 1),
new OffsetAndMetadata(0));
- commitRequestManager.commitAsync(Optional.of(offsets));
+ commitRequestManager.commitAsync(offsets);
assertEquals(1,
commitRequestManager.unsentOffsetCommitRequests().size());
assertEquals(1,
commitRequestManager.poll(time.milliseconds()).unsentRequests.size());
assertTrue(commitRequestManager.unsentOffsetCommitRequests().isEmpty());
@@ -295,7 +295,7 @@ public class CommitRequestManagerTest {
CommitRequestManager commitRequestManager = create(false, 100);
CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future =
commitRequestManager.commitSync(
- Optional.of(offsets), time.milliseconds() + defaultApiTimeoutMs);
+ offsets, time.milliseconds() + defaultApiTimeoutMs);
assertEquals(1,
commitRequestManager.unsentOffsetCommitRequests().size());
List<NetworkClientDelegate.FutureCompletionHandler> pollResults =
assertPoll(1, commitRequestManager);
pollResults.forEach(v -> v.onComplete(mockOffsetCommitResponse(
@@ -314,43 +314,17 @@ public class CommitRequestManagerTest {
@Test
public void testCommitSyncWithEmptyOffsets() {
subscriptionState = mock(SubscriptionState.class);
-
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode));
- TopicPartition tp = new TopicPartition("topic", 1);
- OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(0,
Optional.of(1), "");
- Map<TopicPartition, OffsetAndMetadata> offsets = Map.of(tp,
offsetAndMetadata);
- doReturn(offsets).when(subscriptionState).allConsumed();
CommitRequestManager commitRequestManager = create(false, 100);
CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future =
commitRequestManager.commitSync(
- Optional.empty(), time.milliseconds() + defaultApiTimeoutMs);
- assertEquals(1,
commitRequestManager.unsentOffsetCommitRequests().size());
- List<NetworkClientDelegate.FutureCompletionHandler> pollResults =
assertPoll(1, commitRequestManager);
- pollResults.forEach(v -> v.onComplete(mockOffsetCommitResponse(
- "topic",
- 1,
- (short) 1,
- Errors.NONE)));
-
- verify(subscriptionState).allConsumed();
- verify(metadata).updateLastSeenEpochIfNewer(tp, 1);
- Map<TopicPartition, OffsetAndMetadata> commitOffsets =
assertDoesNotThrow(() -> future.get());
+ Collections.emptyMap(), time.milliseconds() + defaultApiTimeoutMs);
assertTrue(future.isDone());
- assertEquals(offsets, commitOffsets);
- }
-
- @Test
- public void testCommitSyncWithEmptyAllConsumedOffsets() {
- subscriptionState = mock(SubscriptionState.class);
- doReturn(Map.of()).when(subscriptionState).allConsumed();
-
- CommitRequestManager commitRequestManager = create(true, 100);
- CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future =
commitRequestManager.commitSync(
- Optional.empty(), time.milliseconds() + defaultApiTimeoutMs);
+ assertEquals(0,
commitRequestManager.unsentOffsetCommitRequests().size());
+ assertPoll(0, commitRequestManager);
- verify(subscriptionState).allConsumed();
+ verify(metadata, never()).updateLastSeenEpochIfNewer(any(), anyInt());
Map<TopicPartition, OffsetAndMetadata> commitOffsets =
assertDoesNotThrow(() -> future.get());
- assertTrue(future.isDone());
- assertTrue(commitOffsets.isEmpty());
+ assertEquals(Collections.emptyMap(), commitOffsets);
}
@Test
@@ -362,7 +336,7 @@ public class CommitRequestManagerTest {
Map<TopicPartition, OffsetAndMetadata> offsets = Map.of(tp,
offsetAndMetadata);
CommitRequestManager commitRequestManager = create(true, 100);
- CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future =
commitRequestManager.commitAsync(Optional.of(offsets));
+ CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future =
commitRequestManager.commitAsync(offsets);
assertEquals(1,
commitRequestManager.unsentOffsetCommitRequests().size());
List<NetworkClientDelegate.FutureCompletionHandler> pollResults =
assertPoll(1, commitRequestManager);
pollResults.forEach(v -> v.onComplete(mockOffsetCommitResponse(
@@ -381,39 +355,12 @@ public class CommitRequestManagerTest {
@Test
public void testCommitAsyncWithEmptyOffsets() {
subscriptionState = mock(SubscriptionState.class);
-
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode));
- TopicPartition tp = new TopicPartition("topic", 1);
- OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(0,
Optional.of(1), "");
- Map<TopicPartition, OffsetAndMetadata> offsets = Map.of(tp,
offsetAndMetadata);
- doReturn(offsets).when(subscriptionState).allConsumed();
-
- CommitRequestManager commitRequestManager = create(true, 100);
- CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future =
commitRequestManager.commitAsync(Optional.empty());
- assertEquals(1,
commitRequestManager.unsentOffsetCommitRequests().size());
- List<NetworkClientDelegate.FutureCompletionHandler> pollResults =
assertPoll(1, commitRequestManager);
- pollResults.forEach(v -> v.onComplete(mockOffsetCommitResponse(
- "topic",
- 1,
- (short) 1,
- Errors.NONE)));
-
- verify(subscriptionState).allConsumed();
- verify(metadata).updateLastSeenEpochIfNewer(tp, 1);
- assertTrue(future.isDone());
- Map<TopicPartition, OffsetAndMetadata> commitOffsets =
assertDoesNotThrow(() -> future.get());
- assertEquals(offsets, commitOffsets);
- }
-
- @Test
- public void testCommitAsyncWithEmptyAllConsumedOffsets() {
- subscriptionState = mock(SubscriptionState.class);
- doReturn(Map.of()).when(subscriptionState).allConsumed();
CommitRequestManager commitRequestManager = create(true, 100);
- CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future =
commitRequestManager.commitAsync(Optional.empty());
+ CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future =
commitRequestManager.commitAsync(Collections.emptyMap());
- verify(subscriptionState).allConsumed();
assertTrue(future.isDone());
+ assertPoll(0, commitRequestManager);
Map<TopicPartition, OffsetAndMetadata> commitOffsets =
assertDoesNotThrow(() -> future.get());
assertTrue(commitOffsets.isEmpty());
}
@@ -428,7 +375,7 @@ public class CommitRequestManagerTest {
subscriptionState.assignFromUser(Collections.singleton(tp));
subscriptionState.seek(tp, 100);
time.sleep(commitInterval);
- commitRequestManager.updateAutoCommitTimer(time.milliseconds());
+ commitRequestManager.updateTimerAndMaybeCommit(time.milliseconds());
List<NetworkClientDelegate.FutureCompletionHandler> futures =
assertPoll(1, commitRequestManager);
// Complete the autocommit request exceptionally. It should fail right
away, without retry.
futures.get(0).onComplete(mockOffsetCommitResponse(
@@ -441,14 +388,14 @@ public class CommitRequestManagerTest {
// (making sure we wait for the backoff, to check that the failed
request is not being
// retried).
time.sleep(retryBackoffMs);
- commitRequestManager.updateAutoCommitTimer(time.milliseconds());
+ commitRequestManager.updateTimerAndMaybeCommit(time.milliseconds());
assertPoll(0, commitRequestManager);
- commitRequestManager.updateAutoCommitTimer(time.milliseconds());
+ commitRequestManager.updateTimerAndMaybeCommit(time.milliseconds());
// Only when polling after the auto-commit interval, a new auto-commit
request should be
// generated.
time.sleep(commitInterval);
- commitRequestManager.updateAutoCommitTimer(time.milliseconds());
+ commitRequestManager.updateTimerAndMaybeCommit(time.milliseconds());
futures = assertPoll(1, commitRequestManager);
assertEmptyPendingRequests(commitRequestManager);
futures.get(0).onComplete(mockOffsetCommitResponse(
@@ -470,7 +417,7 @@ public class CommitRequestManagerTest {
new TopicPartition("topic", 1),
new OffsetAndMetadata(0));
long deadlineMs = time.milliseconds() + defaultApiTimeoutMs;
- CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> commitResult
= commitRequestManager.commitSync(Optional.of(offsets), deadlineMs);
+ CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> commitResult
= commitRequestManager.commitSync(offsets, deadlineMs);
sendAndVerifyOffsetCommitRequestFailedAndMaybeRetried(commitRequestManager,
error, commitResult);
// We expect that request should have been retried on this sync commit.
@@ -496,7 +443,7 @@ public class CommitRequestManagerTest {
new TopicPartition("topic", 1),
new OffsetAndMetadata(0));
long deadlineMs = time.milliseconds() + defaultApiTimeoutMs;
- CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> commitResult
= commitRequestManager.commitSync(Optional.of(offsets), deadlineMs);
+ CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> commitResult
= commitRequestManager.commitSync(offsets, deadlineMs);
completeOffsetCommitRequestWithError(commitRequestManager,
Errors.UNKNOWN_MEMBER_ID);
NetworkClientDelegate.PollResult res =
commitRequestManager.poll(time.milliseconds());
@@ -516,7 +463,7 @@ public class CommitRequestManagerTest {
// Send commit request expected to be retried on retriable errors
CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> commitResult
= commitRequestManager.commitSync(
- Optional.of(offsets), time.milliseconds() + defaultApiTimeoutMs);
+ offsets, time.milliseconds() + defaultApiTimeoutMs);
completeOffsetCommitRequestWithError(commitRequestManager,
Errors.STALE_MEMBER_EPOCH);
NetworkClientDelegate.PollResult res =
commitRequestManager.poll(time.milliseconds());
assertEquals(0, res.unsentRequests.size());
@@ -535,7 +482,6 @@ public class CommitRequestManagerTest {
public void
testAutoCommitAsyncFailsWithStaleMemberEpochContinuesToCommitOnTheInterval() {
CommitRequestManager commitRequestManager = create(true, 100);
time.sleep(100);
- commitRequestManager.updateAutoCommitTimer(time.milliseconds());
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode));
TopicPartition t1p = new TopicPartition("topic1", 0);
subscriptionState.assignFromUser(singleton(t1p));
@@ -543,7 +489,7 @@ public class CommitRequestManagerTest {
// Async commit on the interval fails with fatal stale epoch and just
resets the timer to
// the interval
- commitRequestManager.maybeAutoCommitAsync();
+ commitRequestManager.updateTimerAndMaybeCommit(time.milliseconds());
completeOffsetCommitRequestWithError(commitRequestManager,
Errors.STALE_MEMBER_EPOCH);
verify(commitRequestManager).resetAutoCommitTimer();
@@ -552,7 +498,7 @@ public class CommitRequestManagerTest {
assertEquals(0, res.unsentRequests.size(), "No request should be
generated until the " +
"interval expires");
time.sleep(100);
- commitRequestManager.updateAutoCommitTimer(time.milliseconds());
+ commitRequestManager.updateTimerAndMaybeCommit(time.milliseconds());
res = commitRequestManager.poll(time.milliseconds());
assertEquals(1, res.unsentRequests.size());
@@ -568,7 +514,7 @@ public class CommitRequestManagerTest {
new OffsetAndMetadata(0));
// Async commit that won't be retried.
- CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> commitResult
= commitRequestManager.commitAsync(Optional.of(offsets));
+ CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> commitResult
= commitRequestManager.commitAsync(offsets);
NetworkClientDelegate.PollResult res =
commitRequestManager.poll(time.milliseconds());
assertEquals(1, res.unsentRequests.size());
@@ -590,11 +536,11 @@ public class CommitRequestManagerTest {
CommitRequestManager commitRequestManager = create(true, 100);
time.sleep(100);
- commitRequestManager.updateAutoCommitTimer(time.milliseconds());
+ commitRequestManager.updateTimerAndMaybeCommit(time.milliseconds());
List<NetworkClientDelegate.FutureCompletionHandler> futures =
assertPoll(1, commitRequestManager);
time.sleep(100);
- commitRequestManager.updateAutoCommitTimer(time.milliseconds());
+ commitRequestManager.updateTimerAndMaybeCommit(time.milliseconds());
// We want to make sure we don't resend autocommit if the previous
request has not been
// completed, even if the interval expired
assertPoll(0, commitRequestManager);
@@ -602,6 +548,7 @@ public class CommitRequestManagerTest {
// complete the unsent request and re-poll
futures.get(0).onComplete(buildOffsetCommitClientResponse(new
OffsetCommitResponse(0, new HashMap<>())));
+ commitRequestManager.updateTimerAndMaybeCommit(time.milliseconds());
assertPoll(1, commitRequestManager);
}
@@ -615,7 +562,7 @@ public class CommitRequestManagerTest {
// Send auto-commit request on the interval.
CommitRequestManager commitRequestManager = create(true, 100);
time.sleep(100);
- commitRequestManager.updateAutoCommitTimer(time.milliseconds());
+ commitRequestManager.updateTimerAndMaybeCommit(time.milliseconds());
NetworkClientDelegate.PollResult res =
commitRequestManager.poll(time.milliseconds());
assertEquals(1, res.unsentRequests.size());
NetworkClientDelegate.FutureCompletionHandler autoCommitOnInterval =
@@ -624,7 +571,7 @@ public class CommitRequestManagerTest {
// Another auto-commit request should be sent if a revocation happens,
even if an
// auto-commit on the interval is in-flight.
CompletableFuture<Void> autoCommitBeforeRevocation =
- commitRequestManager.maybeAutoCommitSyncBeforeRevocation(200);
+ commitRequestManager.maybeAutoCommitSyncBeforeRebalance(200);
assertEquals(1,
commitRequestManager.pendingRequests.unsentOffsetCommits.size());
// Receive response for initial auto-commit on interval
@@ -641,7 +588,7 @@ public class CommitRequestManagerTest {
CommitRequestManager commitRequestManager = create(true, 100);
time.sleep(100);
- commitRequestManager.updateAutoCommitTimer(time.milliseconds());
+ commitRequestManager.updateTimerAndMaybeCommit(time.milliseconds());
List<NetworkClientDelegate.FutureCompletionHandler> futures =
assertPoll(1, commitRequestManager);
// complete the unsent request to trigger interceptor
@@ -659,7 +606,7 @@ public class CommitRequestManagerTest {
CommitRequestManager commitRequestManager = create(true, 100);
time.sleep(100);
- commitRequestManager.updateAutoCommitTimer(time.milliseconds());
+ commitRequestManager.updateTimerAndMaybeCommit(time.milliseconds());
List<NetworkClientDelegate.FutureCompletionHandler> futures =
assertPoll(1, commitRequestManager);
// complete the unsent request to trigger interceptor
@@ -673,8 +620,7 @@ public class CommitRequestManagerTest {
public void testAutoCommitEmptyOffsetsDoesNotGenerateRequest() {
CommitRequestManager commitRequestManager = create(true, 100);
time.sleep(100);
- commitRequestManager.updateAutoCommitTimer(time.milliseconds());
- commitRequestManager.maybeAutoCommitAsync();
+ commitRequestManager.updateTimerAndMaybeCommit(time.milliseconds());
assertTrue(commitRequestManager.pendingRequests.unsentOffsetCommits.isEmpty());
verify(commitRequestManager).resetAutoCommitTimer();
}
@@ -687,15 +633,13 @@ public class CommitRequestManagerTest {
// Auto-commit of empty offsets
time.sleep(100);
- commitRequestManager.updateAutoCommitTimer(time.milliseconds());
- commitRequestManager.maybeAutoCommitAsync();
+ commitRequestManager.updateTimerAndMaybeCommit(time.milliseconds());
// Next auto-commit consumed offsets (not empty). Should generate a
request, ensuring
// that the previous auto-commit of empty did not leave the inflight
request flag on
subscriptionState.seek(t1p, 100);
time.sleep(100);
- commitRequestManager.updateAutoCommitTimer(time.milliseconds());
- commitRequestManager.maybeAutoCommitAsync();
+ commitRequestManager.updateTimerAndMaybeCommit(time.milliseconds());
assertEquals(1,
commitRequestManager.pendingRequests.unsentOffsetCommits.size());
verify(commitRequestManager, times(2)).resetAutoCommitTimer();
@@ -711,8 +655,7 @@ public class CommitRequestManagerTest {
// Send auto-commit request that will remain in-flight without a
response
time.sleep(100);
- commitRequestManager.updateAutoCommitTimer(time.milliseconds());
- commitRequestManager.maybeAutoCommitAsync();
+ commitRequestManager.updateTimerAndMaybeCommit(time.milliseconds());
List<NetworkClientDelegate.FutureCompletionHandler> futures =
assertPoll(1, commitRequestManager);
assertEquals(1, futures.size());
NetworkClientDelegate.FutureCompletionHandler inflightCommitResult =
futures.get(0);
@@ -723,8 +666,7 @@ public class CommitRequestManagerTest {
// should not be reset either, to ensure that the next auto-commit is
sent out as soon as
// the inflight receives a response.
time.sleep(100);
- commitRequestManager.updateAutoCommitTimer(time.milliseconds());
- commitRequestManager.maybeAutoCommitAsync();
+ commitRequestManager.updateTimerAndMaybeCommit(time.milliseconds());
assertPoll(0, commitRequestManager);
verify(commitRequestManager, never()).resetAutoCommitTimer();
@@ -732,6 +674,7 @@ public class CommitRequestManagerTest {
// polling the manager.
inflightCommitResult.onComplete(
mockOffsetCommitResponse(t1p.topic(), t1p.partition(), (short) 1,
Errors.NONE));
+ commitRequestManager.updateTimerAndMaybeCommit(time.milliseconds());
assertPoll(1, commitRequestManager);
}
@@ -918,7 +861,7 @@ public class CommitRequestManagerTest {
new OffsetAndMetadata(0));
// Send async commit (not expected to be retried).
- CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> commitResult
= commitRequestManager.commitAsync(Optional.of(offsets));
+ CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> commitResult
= commitRequestManager.commitAsync(offsets);
completeOffsetCommitRequestWithError(commitRequestManager, error);
NetworkClientDelegate.PollResult res =
commitRequestManager.poll(time.milliseconds());
assertEquals(0, res.unsentRequests.size());
@@ -943,7 +886,7 @@ public class CommitRequestManagerTest {
// Send sync offset commit request that fails with retriable error.
long deadlineMs = time.milliseconds() + retryBackoffMs * 2;
- CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> commitResult
= commitRequestManager.commitSync(Optional.of(offsets), deadlineMs);
+ CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> commitResult
= commitRequestManager.commitSync(offsets, deadlineMs);
completeOffsetCommitRequestWithError(commitRequestManager,
Errors.REQUEST_TIMED_OUT);
// Request retried after backoff, and fails with retriable again.
Should not complete yet
@@ -976,7 +919,7 @@ public class CommitRequestManagerTest {
// Send offset commit request that fails with retriable error.
long deadlineMs = time.milliseconds() + retryBackoffMs * 2;
- CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> commitResult
= commitRequestManager.commitSync(Optional.of(offsets), deadlineMs);
+ CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> commitResult
= commitRequestManager.commitSync(offsets, deadlineMs);
completeOffsetCommitRequestWithError(commitRequestManager, error);
// Sleep to expire the request timeout. Request should fail on the
next poll with a
@@ -1006,7 +949,7 @@ public class CommitRequestManagerTest {
// Send async commit request that fails with retriable error (not
expected to be retried).
Errors retriableError = Errors.COORDINATOR_NOT_AVAILABLE;
- CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> commitResult
= commitRequestManager.commitAsync(Optional.of(offsets));
+ CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> commitResult
= commitRequestManager.commitAsync(offsets);
completeOffsetCommitRequestWithError(commitRequestManager,
retriableError);
NetworkClientDelegate.PollResult res =
commitRequestManager.poll(time.milliseconds());
assertEquals(0, res.unsentRequests.size());
@@ -1031,7 +974,7 @@ public class CommitRequestManagerTest {
offsets.put(new TopicPartition("t1", 1), new OffsetAndMetadata(2));
offsets.put(new TopicPartition("t1", 2), new OffsetAndMetadata(3));
- commitRequestManager.commitSync(Optional.of(offsets),
time.milliseconds() + defaultApiTimeoutMs);
+ commitRequestManager.commitSync(offsets, time.milliseconds() +
defaultApiTimeoutMs);
NetworkClientDelegate.PollResult res =
commitRequestManager.poll(time.milliseconds());
assertEquals(1, res.unsentRequests.size());
@@ -1056,7 +999,7 @@ public class CommitRequestManagerTest {
new OffsetAndMetadata(0));
long deadlineMs = time.milliseconds() + defaultApiTimeoutMs;
- commitRequestManager.commitSync(Optional.of(offsets), deadlineMs);
+ commitRequestManager.commitSync(offsets, deadlineMs);
NetworkClientDelegate.PollResult res =
commitRequestManager.poll(time.milliseconds());
assertEquals(1, res.unsentRequests.size());
res.unsentRequests.get(0).handler().onFailure(time.milliseconds(), new
TimeoutException());
@@ -1212,7 +1155,7 @@ public class CommitRequestManagerTest {
long deadlineMs = time.milliseconds() + retryBackoffMs * 2;
// Send commit request expected to be retried on STALE_MEMBER_EPOCH
error while it does not expire
- commitRequestManager.maybeAutoCommitSyncBeforeRevocation(deadlineMs);
+ commitRequestManager.maybeAutoCommitSyncBeforeRebalance(deadlineMs);
int newEpoch = 8;
String memberId = "member1";
@@ -1266,7 +1209,7 @@ public class CommitRequestManagerTest {
// Send auto commit to revoke partitions, expected to be retried on
STALE_MEMBER_EPOCH
// with the latest epochs received (using long deadline to avoid
expiring the request
// while retrying with the new epochs)
-
commitRequestManager.maybeAutoCommitSyncBeforeRevocation(Long.MAX_VALUE);
+
commitRequestManager.maybeAutoCommitSyncBeforeRebalance(Long.MAX_VALUE);
int initialEpoch = 1;
String memberId = "member1";
@@ -1312,7 +1255,7 @@ public class CommitRequestManagerTest {
new OffsetAndMetadata(0));
long commitCreationTimeMs = time.milliseconds();
- commitRequestManager.commitAsync(Optional.of(offsets));
+ commitRequestManager.commitAsync(offsets);
NetworkClientDelegate.PollResult res =
commitRequestManager.poll(time.milliseconds());
assertEquals(1, res.unsentRequests.size());
@@ -1475,7 +1418,7 @@ public class CommitRequestManagerTest {
Map<TopicPartition, OffsetAndMetadata> offsets =
Collections.singletonMap(new TopicPartition("topic", 1),
new OffsetAndMetadata(0));
- commitRequestManager.commitAsync(Optional.of(offsets));
+ commitRequestManager.commitAsync(offsets);
commitRequestManager.signalClose();
NetworkClientDelegate.PollResult res =
commitRequestManager.poll(time.milliseconds());
assertEquals(1, res.unsentRequests.size());
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java
index d42d81d7ce4..3a93c25072d 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java
@@ -126,7 +126,7 @@ public class ConsumerMembershipManagerTest {
metrics = new Metrics(time);
rebalanceMetricsManager = new ConsumerRebalanceMetricsManager(metrics);
-
when(commitRequestManager.maybeAutoCommitSyncBeforeRevocation(anyLong())).thenReturn(CompletableFuture.completedFuture(null));
+
when(commitRequestManager.maybeAutoCommitSyncBeforeRebalance(anyLong())).thenReturn(CompletableFuture.completedFuture(null));
}
private ConsumerMembershipManager createMembershipManagerJoiningGroup() {
@@ -143,7 +143,7 @@ public class ConsumerMembershipManagerTest {
ConsumerMembershipManager manager = spy(new ConsumerMembershipManager(
GROUP_ID, Optional.ofNullable(groupInstanceId), REBALANCE_TIMEOUT,
Optional.empty(),
subscriptionState, commitRequestManager, metadata, LOG_CONTEXT,
- backgroundEventHandler, time, rebalanceMetricsManager));
+ backgroundEventHandler, time, rebalanceMetricsManager, true));
assertMemberIdIsGenerated(manager.memberId());
return manager;
}
@@ -153,7 +153,7 @@ public class ConsumerMembershipManagerTest {
ConsumerMembershipManager manager = spy(new ConsumerMembershipManager(
GROUP_ID, Optional.ofNullable(groupInstanceId),
REBALANCE_TIMEOUT,
Optional.ofNullable(serverAssignor), subscriptionState,
commitRequestManager,
- metadata, LOG_CONTEXT, backgroundEventHandler, time,
rebalanceMetricsManager));
+ metadata, LOG_CONTEXT, backgroundEventHandler, time,
rebalanceMetricsManager, true));
assertMemberIdIsGenerated(manager.memberId());
manager.transitionToJoining();
return manager;
@@ -232,7 +232,7 @@ public class ConsumerMembershipManagerTest {
ConsumerMembershipManager membershipManager = new
ConsumerMembershipManager(
GROUP_ID, Optional.empty(), REBALANCE_TIMEOUT,
Optional.empty(),
subscriptionState, commitRequestManager, metadata, LOG_CONTEXT,
- backgroundEventHandler, time, rebalanceMetricsManager);
+ backgroundEventHandler, time, rebalanceMetricsManager, true);
assertEquals(MemberState.UNSUBSCRIBED, membershipManager.state());
membershipManager.transitionToJoining();
@@ -309,7 +309,7 @@ public class ConsumerMembershipManagerTest {
membershipManager.memberId());
when(subscriptionState.hasAutoAssignedPartitions()).thenReturn(true);
membershipManager.onHeartbeatSuccess(heartbeatResponse);
- membershipManager.poll(time.milliseconds());
+ membershipManager.maybeReconcile(true);
membershipManager.onHeartbeatRequestGenerated();
assertEquals(MemberState.STABLE, membershipManager.state());
assertEquals(MEMBER_EPOCH, membershipManager.memberEpoch());
@@ -608,8 +608,8 @@ public class ConsumerMembershipManagerTest {
membershipManager.onHeartbeatSuccess(createConsumerGroupHeartbeatResponse(assignment2,
membershipManager.memberId()));
assertEquals(MemberState.RECONCILING, membershipManager.state());
CompletableFuture<Void> commitResult = new CompletableFuture<>();
-
when(commitRequestManager.maybeAutoCommitSyncBeforeRevocation(anyLong())).thenReturn(commitResult);
- membershipManager.poll(time.milliseconds());
+
when(commitRequestManager.maybeAutoCommitSyncBeforeRebalance(anyLong())).thenReturn(commitResult);
+ membershipManager.maybeReconcile(false);
// Get fenced, commit completes
membershipManager.transitionToFenced();
@@ -627,7 +627,7 @@ public class ConsumerMembershipManagerTest {
// We have to reconcile & ack the assignment again
membershipManager.onHeartbeatSuccess(createConsumerGroupHeartbeatResponse(assignment1,
membershipManager.memberId()));
assertEquals(MemberState.RECONCILING, membershipManager.state());
- membershipManager.poll(time.milliseconds());
+ membershipManager.maybeReconcile(true);
assertEquals(MemberState.ACKNOWLEDGING, membershipManager.state());
membershipManager.onHeartbeatRequestGenerated();
assertEquals(MemberState.STABLE, membershipManager.state());
@@ -663,7 +663,7 @@ public class ConsumerMembershipManagerTest {
// stay in RECONCILING state, since an unresolved topic is assigned
membershipManager.onHeartbeatSuccess(createConsumerGroupHeartbeatResponse(assignment1,
membershipManager.memberId()));
assertEquals(MemberState.RECONCILING, membershipManager.state());
- membershipManager.poll(time.milliseconds());
+ membershipManager.maybeReconcile(true);
verifyReconciliationTriggeredAndCompleted(membershipManager,
Collections.singletonList(new TopicIdPartition(topic1, new
TopicPartition("topic1", 0)))
);
@@ -679,7 +679,7 @@ public class ConsumerMembershipManagerTest {
// Receive original assignment again - full reconciliation not
triggered but assignment is acked again
membershipManager.onHeartbeatSuccess(createConsumerGroupHeartbeatResponse(assignment1,
membershipManager.memberId()));
assertEquals(MemberState.RECONCILING, membershipManager.state());
- membershipManager.poll(time.milliseconds());
+ membershipManager.maybeReconcile(false);
assertEquals(MemberState.ACKNOWLEDGING, membershipManager.state());
verifyReconciliationNotTriggered(membershipManager);
assertEquals(Collections.singletonMap(topic1, mkSortedSet(0)),
membershipManager.currentAssignment().partitions);
@@ -751,8 +751,7 @@ public class ConsumerMembershipManagerTest {
assertEquals(MemberState.RECONCILING, membershipManager.state());
assertEquals(topic2Assignment,
membershipManager.topicPartitionsAwaitingReconciliation());
- // Next reconciliation triggered in poll
- membershipManager.poll(time.milliseconds());
+ membershipManager.maybeReconcile(true);
assertEquals(Collections.emptySet(),
membershipManager.topicsAwaitingReconciliation());
verify(subscriptionState).assignFromSubscribedAwaitingCallback(topicPartitions(topic2Assignment,
topic2Metadata), topicPartitions(topic2Assignment, topic2Metadata));
@@ -797,7 +796,7 @@ public class ConsumerMembershipManagerTest {
);
receiveAssignment(newAssignment, membershipManager);
- membershipManager.poll(time.milliseconds());
+ membershipManager.maybeReconcile(false);
verifyReconciliationNotTriggered(membershipManager);
assertEquals(MemberState.RECONCILING, membershipManager.state());
@@ -819,8 +818,7 @@ public class ConsumerMembershipManagerTest {
assertEquals(MemberState.RECONCILING, membershipManager.state());
clearInvocations(membershipManager, commitRequestManager);
- // Next poll should trigger final reconciliation
- membershipManager.poll(time.milliseconds());
+ membershipManager.maybeReconcile(true);
verifyReconciliationTriggeredAndCompleted(membershipManager,
Arrays.asList(topicId1Partition0, topicId2Partition0));
}
@@ -858,7 +856,7 @@ public class ConsumerMembershipManagerTest {
);
receiveAssignment(newAssignment, membershipManager);
- membershipManager.poll(time.milliseconds());
+ membershipManager.maybeReconcile(false);
// No full reconciliation triggered, but assignment needs to be
acknowledged.
assertEquals(MemberState.ACKNOWLEDGING, membershipManager.state());
@@ -881,7 +879,7 @@ public class ConsumerMembershipManagerTest {
);
when(metadata.topicNames()).thenReturn(fullTopicMetadata);
- membershipManager.poll(time.milliseconds());
+ membershipManager.maybeReconcile(true);
verifyReconciliationTriggeredAndCompleted(membershipManager,
Arrays.asList(topicId1Partition0, topicId2Partition0));
}
@@ -973,7 +971,7 @@ public class ConsumerMembershipManagerTest {
receiveAssignment(topicId, Arrays.asList(0, 1), membershipManager);
verifyReconciliationNotTriggered(membershipManager);
- membershipManager.poll(time.milliseconds());
+ membershipManager.maybeReconcile(true);
List<TopicIdPartition> assignedPartitions = Arrays.asList(
new TopicIdPartition(topicId, new TopicPartition(topicName, 0)),
@@ -1207,7 +1205,7 @@ public class ConsumerMembershipManagerTest {
receiveAssignment(topicId, Collections.singletonList(0),
membershipManager);
verifyReconciliationNotTriggered(membershipManager);
- membershipManager.poll(time.milliseconds());
+ membershipManager.maybeReconcile(true);
Set<TopicPartition> expectedAssignment = Collections.singleton(new
TopicPartition(topicName, 0));
assertEquals(MemberState.ACKNOWLEDGING, membershipManager.state());
@@ -1249,7 +1247,7 @@ public class ConsumerMembershipManagerTest {
verifyReconciliationNotTriggered(membershipManager);
- membershipManager.poll(time.milliseconds());
+ membershipManager.maybeReconcile(true);
verifyReconciliationTriggeredAndCompleted(membershipManager,
Collections.emptyList());
@@ -1273,7 +1271,7 @@ public class ConsumerMembershipManagerTest {
receiveAssignment(topicId, Collections.singletonList(0),
membershipManager);
verifyReconciliationNotTriggered(membershipManager);
- membershipManager.poll(time.milliseconds());
+ membershipManager.maybeReconcile(false);
membershipManager.onHeartbeatRequestGenerated();
assertEquals(MemberState.RECONCILING, membershipManager.state());
@@ -1301,7 +1299,7 @@ public class ConsumerMembershipManagerTest {
when(subscriptionState.hasAutoAssignedPartitions()).thenReturn(true);
when(subscriptionState.rebalanceListener()).thenReturn(Optional.empty());
- membershipManager.poll(time.milliseconds());
+ membershipManager.maybeReconcile(true);
// Assignment should have been reconciled.
Set<TopicPartition> expectedAssignment = Collections.singleton(new
TopicPartition(topicName, 1));
@@ -1363,7 +1361,7 @@ public class ConsumerMembershipManagerTest {
receiveAssignment(topicId, Arrays.asList(0, 1), membershipManager);
verifyReconciliationNotTriggered(membershipManager);
- membershipManager.poll(time.milliseconds());
+ membershipManager.maybeReconcile(true);
List<TopicIdPartition> assignedPartitions = topicIdPartitions(topicId,
topicName, 0, 1);
verifyReconciliationTriggeredAndCompleted(membershipManager,
assignedPartitions);
@@ -1382,7 +1380,7 @@ public class ConsumerMembershipManagerTest {
receiveAssignment(topicId, Arrays.asList(0, 1, 2), membershipManager);
verifyReconciliationNotTriggered(membershipManager);
- membershipManager.poll(time.milliseconds());
+ membershipManager.maybeReconcile(true);
List<TopicIdPartition> assignedPartitions = new ArrayList<>();
assignedPartitions.add(ownedPartition);
@@ -1403,7 +1401,7 @@ public class ConsumerMembershipManagerTest {
receiveAssignment(topicId, Arrays.asList(0, 1), membershipManager);
verifyReconciliationNotTriggered(membershipManager);
- membershipManager.poll(time.milliseconds());
+ membershipManager.maybeReconcile(true);
verifyReconciliationTriggeredAndCompleted(membershipManager,
expectedAssignmentReconciled);
assertEquals(MemberState.ACKNOWLEDGING, membershipManager.state());
@@ -1436,7 +1434,7 @@ public class ConsumerMembershipManagerTest {
receiveEmptyAssignment(membershipManager);
verifyReconciliationNotTriggered(membershipManager);
- membershipManager.poll(time.milliseconds());
+ membershipManager.maybeReconcile(true);
testRevocationOfAllPartitionsCompleted(membershipManager);
verify(subscriptionState, times(2)).markPendingRevocation(Set.of(new
TopicPartition("topic1", 0)));
@@ -1452,8 +1450,8 @@ public class ConsumerMembershipManagerTest {
receiveEmptyAssignment(membershipManager);
verifyReconciliationNotTriggered(membershipManager);
-
when(commitRequestManager.maybeAutoCommitSyncBeforeRevocation(anyLong())).thenReturn(commitResult);
- membershipManager.poll(time.milliseconds());
+
when(commitRequestManager.maybeAutoCommitSyncBeforeRebalance(anyLong())).thenReturn(commitResult);
+ membershipManager.maybeReconcile(true);
// Member stays in RECONCILING while the commit request hasn't
completed.
assertEquals(MemberState.RECONCILING, membershipManager.state());
@@ -1465,7 +1463,7 @@ public class ConsumerMembershipManagerTest {
commitResult.complete(null);
InOrder inOrder = inOrder(subscriptionState, commitRequestManager);
inOrder.verify(subscriptionState).markPendingRevocation(Set.of(new
TopicPartition("topic1", 0)));
-
inOrder.verify(commitRequestManager).maybeAutoCommitSyncBeforeRevocation(anyLong());
+
inOrder.verify(commitRequestManager).maybeAutoCommitSyncBeforeRebalance(anyLong());
inOrder.verify(subscriptionState).markPendingRevocation(Set.of(new
TopicPartition("topic1", 0)));
testRevocationOfAllPartitionsCompleted(membershipManager);
@@ -1481,7 +1479,7 @@ public class ConsumerMembershipManagerTest {
receiveEmptyAssignment(membershipManager);
verifyReconciliationNotTriggered(membershipManager);
- membershipManager.poll(time.milliseconds());
+ membershipManager.maybeReconcile(true);
// Member stays in RECONCILING while the commit request hasn't
completed.
assertEquals(MemberState.RECONCILING, membershipManager.state());
@@ -1512,7 +1510,7 @@ public class ConsumerMembershipManagerTest {
receiveAssignment(topicId, Arrays.asList(1, 2), membershipManager);
verifyReconciliationNotTriggered(membershipManager);
- membershipManager.poll(time.milliseconds());
+ membershipManager.maybeReconcile(true);
TreeSet<TopicPartition> expectedSet = new
TreeSet<>(TOPIC_PARTITION_COMPARATOR);
expectedSet.add(new TopicPartition(topicName, 1));
@@ -1546,8 +1544,7 @@ public class ConsumerMembershipManagerTest {
String topicName = "topic1";
mockTopicNameInMetadataCache(Collections.singletonMap(topicId,
topicName), true);
- // When the next poll is run, the member should re-trigger
reconciliation
- membershipManager.poll(time.milliseconds());
+ membershipManager.maybeReconcile(true);
List<TopicIdPartition> expectedAssignmentReconciled =
topicIdPartitions(topicId, topicName, 0, 1);
verifyReconciliationTriggeredAndCompleted(membershipManager,
expectedAssignmentReconciled);
assertEquals(MemberState.ACKNOWLEDGING, membershipManager.state());
@@ -1576,7 +1573,7 @@ public class ConsumerMembershipManagerTest {
// Next poll is run, but metadata still without the unresolved topic
in it. Should keep
// the unresolved and request update again.
when(metadata.topicNames()).thenReturn(Collections.emptyMap());
- membershipManager.poll(time.milliseconds());
+ membershipManager.maybeReconcile(false);
verifyReconciliationNotTriggered(membershipManager);
assertEquals(Collections.singleton(topicId),
membershipManager.topicsAwaitingReconciliation());
verify(metadata, times(2)).requestUpdate(anyBoolean());
@@ -1594,7 +1591,7 @@ public class ConsumerMembershipManagerTest {
receiveAssignment(topicId, Arrays.asList(0, 1), membershipManager);
verifyReconciliationNotTriggered(membershipManager);
- membershipManager.poll(time.milliseconds());
+ membershipManager.maybeReconcile(true);
verify(subscriptionState).markPendingRevocation(Set.of());
// Member should complete reconciliation
@@ -1618,7 +1615,7 @@ public class ConsumerMembershipManagerTest {
// Revoke one of the 2 partitions
receiveAssignment(topicId, Collections.singletonList(1),
membershipManager);
- membershipManager.poll(time.milliseconds());
+ membershipManager.maybeReconcile(true);
verify(subscriptionState, times(2)).markPendingRevocation(Set.of(new
TopicPartition(topicName, 0)));
// Revocation should complete without requesting any metadata update
given that the topic
@@ -1671,7 +1668,7 @@ public class ConsumerMembershipManagerTest {
receiveAssignment(topicId, Arrays.asList(0, 1), membershipManager);
verifyReconciliationNotTriggered(membershipManager);
- membershipManager.poll(time.milliseconds());
+ membershipManager.maybeReconcile(true);
assertEquals(MemberState.RECONCILING, membershipManager.state());
assertTrue(membershipManager.reconciliationInProgress());
@@ -1704,7 +1701,7 @@ public class ConsumerMembershipManagerTest {
// Step 5: receive an empty assignment, which means we should call
revoke
when(subscriptionState.assignedPartitions()).thenReturn(topicPartitions(topicName,
0, 1));
receiveEmptyAssignment(membershipManager);
- membershipManager.poll(time.milliseconds());
+ membershipManager.maybeReconcile(true);
assertEquals(MemberState.RECONCILING, membershipManager.state());
assertTrue(membershipManager.reconciliationInProgress());
@@ -1767,7 +1764,7 @@ public class ConsumerMembershipManagerTest {
receiveEmptyAssignment(membershipManager);
verifyReconciliationNotTriggered(membershipManager);
- membershipManager.poll(time.milliseconds());
+ membershipManager.maybeReconcile(true);
assertEquals(MemberState.RECONCILING, membershipManager.state());
assertEquals(topicIdPartitionsMap(topicId, 0),
membershipManager.currentAssignment().partitions);
@@ -1826,7 +1823,7 @@ public class ConsumerMembershipManagerTest {
receiveEmptyAssignment(membershipManager);
verifyReconciliationNotTriggered(membershipManager);
- membershipManager.poll(time.milliseconds());
+ membershipManager.maybeReconcile(true);
assertEquals(MemberState.RECONCILING, membershipManager.state());
assertEquals(topicIdPartitionsMap(topicId, 0),
membershipManager.currentAssignment().partitions);
@@ -1882,7 +1879,7 @@ public class ConsumerMembershipManagerTest {
mockPartitionOwnedAndNewPartitionAdded(topicName, partitionOwned,
partitionAdded,
new CounterConsumerRebalanceListener(), membershipManager);
- membershipManager.poll(time.milliseconds());
+ membershipManager.maybeReconcile(true);
verify(subscriptionState).assignFromSubscribedAwaitingCallback(assignedPartitions,
addedPartitions);
@@ -1914,7 +1911,7 @@ public class ConsumerMembershipManagerTest {
mockPartitionOwnedAndNewPartitionAdded(topicName, partitionOwned,
partitionAdded,
listener, membershipManager);
- membershipManager.poll(time.milliseconds());
+ membershipManager.maybeReconcile(true);
verify(subscriptionState).assignFromSubscribedAwaitingCallback(assignedPartitions,
addedPartitions);
@@ -2226,7 +2223,7 @@ public class ConsumerMembershipManagerTest {
receiveEmptyAssignment(membershipManager);
verifyReconciliationNotTriggered(membershipManager);
- membershipManager.poll(time.milliseconds());
+ membershipManager.maybeReconcile(true);
membershipManager.onHeartbeatRequestGenerated();
assertEquals(MemberState.STABLE, membershipManager.state());
@@ -2248,7 +2245,7 @@ public class ConsumerMembershipManagerTest {
assertEquals(0, listener.lostCount());
verifyReconciliationNotTriggered(membershipManager);
- membershipManager.poll(time.milliseconds());
+ membershipManager.maybeReconcile(true);
performCallback(
membershipManager,
invoker,
@@ -2285,7 +2282,7 @@ public class ConsumerMembershipManagerTest {
long reconciliationDurationMs = 1234;
time.sleep(reconciliationDurationMs);
- membershipManager.poll(time.milliseconds());
+ membershipManager.maybeReconcile(true);
// Complete commit request to complete the callback invocation
commitResult.complete(null);
@@ -2317,7 +2314,7 @@ public class ConsumerMembershipManagerTest {
when(metadata.topicNames()).thenReturn(Collections.singletonMap(topicId,
topicName));
receiveAssignment(topicId, Arrays.asList(0, 1), membershipManager);
- membershipManager.poll(time.milliseconds());
+ membershipManager.maybeReconcile(true);
// assign partitions
performCallback(
@@ -2338,7 +2335,7 @@ public class ConsumerMembershipManagerTest {
when(subscriptionState.assignedPartitions()).thenReturn(topicPartitions(topicName,
0, 1));
receiveAssignment(topicId, Collections.singletonList(2),
membershipManager);
- membershipManager.poll(time.milliseconds());
+ membershipManager.maybeReconcile(true);
performCallback(
membershipManager,
@@ -2396,6 +2393,14 @@ public class ConsumerMembershipManagerTest {
assertEquals(-1d, getMetricValue(metrics,
rebalanceMetricsManager.lastRebalanceSecondsAgo));
}
+ @Test
+ public void testPollMustCallsMaybeReconcileWithFalse() {
+ ConsumerMembershipManager membershipManager =
createMemberInStableState();
+ membershipManager.poll(time.milliseconds());
+ verify(membershipManager).maybeReconcile(false);
+ verifyReconciliationNotTriggered(membershipManager);
+ }
+
private Object getMetricValue(Metrics metrics, MetricName name) {
return metrics.metrics().get(name).metricValue();
}
@@ -2409,7 +2414,7 @@ public class ConsumerMembershipManagerTest {
receiveAssignment(topicId, partitions, membershipManager);
verifyReconciliationNotTriggered(membershipManager);
- membershipManager.poll(time.milliseconds());
+ membershipManager.maybeReconcile(true);
List<TopicIdPartition> assignedPartitions =
partitions.stream().map(tp -> new TopicIdPartition(topicId,
@@ -2424,7 +2429,7 @@ public class ConsumerMembershipManagerTest {
receiveEmptyAssignment(membershipManager);
verifyReconciliationNotTriggered(membershipManager);
- membershipManager.poll(time.milliseconds());
+ membershipManager.maybeReconcile(true);
verifyReconciliationTriggered(membershipManager);
clearInvocations(membershipManager);
@@ -2443,7 +2448,7 @@ public class ConsumerMembershipManagerTest {
receiveAssignment(topicId, partitions, membershipManager);
verifyReconciliationNotTriggered(membershipManager);
- membershipManager.poll(time.milliseconds());
+ membershipManager.maybeReconcile(true);
verifyReconciliationTriggered(membershipManager);
clearInvocations(membershipManager);
@@ -2464,7 +2469,7 @@ public class ConsumerMembershipManagerTest {
when(metadata.topicNames()).thenReturn(Collections.singletonMap(topicId,
topicName));
receiveAssignment(topicId, partitions, membershipManager);
- membershipManager.poll(time.milliseconds());
+ membershipManager.maybeReconcile(true);
verifyReconciliationTriggered(membershipManager);
clearInvocations(membershipManager);
assertEquals(MemberState.RECONCILING, membershipManager.state());
@@ -2489,7 +2494,7 @@ public class ConsumerMembershipManagerTest {
when(metadata.topicNames()).thenReturn(Collections.singletonMap(topicId,
topicName));
receiveAssignment(topicId, Collections.singletonList(newPartition),
membershipManager);
- membershipManager.poll(time.milliseconds());
+ membershipManager.maybeReconcile(true);
verifyReconciliationTriggered(membershipManager);
clearInvocations(membershipManager);
assertEquals(MemberState.RECONCILING, membershipManager.state());
@@ -2552,7 +2557,7 @@ public class ConsumerMembershipManagerTest {
if (withAutoCommit) {
when(commitRequestManager.autoCommitEnabled()).thenReturn(true);
CompletableFuture<Void> commitResult = new CompletableFuture<>();
-
when(commitRequestManager.maybeAutoCommitSyncBeforeRevocation(anyLong())).thenReturn(commitResult);
+
when(commitRequestManager.maybeAutoCommitSyncBeforeRebalance(anyLong())).thenReturn(commitResult);
return commitResult;
} else {
return CompletableFuture.completedFuture(null);
@@ -2634,7 +2639,7 @@ public class ConsumerMembershipManagerTest {
membershipManager.onHeartbeatSuccess(heartbeatResponse);
if (triggerReconciliation) {
- membershipManager.poll(time.milliseconds());
+ membershipManager.maybeReconcile(true);
verify(subscriptionState).assignFromSubscribedAwaitingCallback(anyCollection(),
anyCollection());
} else {
verify(subscriptionState,
never()).assignFromSubscribed(anyCollection());
@@ -2655,7 +2660,7 @@ public class ConsumerMembershipManagerTest {
when(subscriptionState.rebalanceListener()).thenReturn(Optional.empty());
membershipManager.onHeartbeatSuccess(heartbeatResponse);
assertEquals(MemberState.RECONCILING, membershipManager.state());
- membershipManager.poll(time.milliseconds());
+ membershipManager.maybeReconcile(true);
assertEquals(MemberState.ACKNOWLEDGING, membershipManager.state());
membershipManager.onHeartbeatRequestGenerated();
assertEquals(MemberState.STABLE, membershipManager.state());
@@ -2722,7 +2727,7 @@ public class ConsumerMembershipManagerTest {
// Stale reconciliation should have been aborted and a new one should
be triggered on the next poll.
assertFalse(membershipManager.reconciliationInProgress());
clearInvocations(membershipManager);
- membershipManager.poll(time.milliseconds());
+ membershipManager.maybeReconcile(true);
verify(membershipManager).markReconciliationInProgress();
}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java
index 911c028f728..3d55b30052b 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java
@@ -63,6 +63,7 @@ import static
org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.eq;
@@ -168,11 +169,9 @@ public class ApplicationEventProcessorTest {
doReturn(true).when(subscriptionState).assignFromUser(Collections.singleton(tp));
processor.process(event);
if (withGroupId) {
- verify(commitRequestManager).updateAutoCommitTimer(currentTimeMs);
- verify(commitRequestManager).maybeAutoCommitAsync();
+
verify(commitRequestManager).updateTimerAndMaybeCommit(currentTimeMs);
} else {
- verify(commitRequestManager,
never()).updateAutoCommitTimer(currentTimeMs);
- verify(commitRequestManager, never()).maybeAutoCommitAsync();
+ verify(commitRequestManager,
never()).updateTimerAndMaybeCommit(currentTimeMs);
}
verify(metadata).requestUpdateForNewTopics();
verify(subscriptionState).assignFromUser(Collections.singleton(tp));
@@ -241,7 +240,8 @@ public class ApplicationEventProcessorTest {
setupProcessor(true);
when(heartbeatRequestManager.membershipManager()).thenReturn(membershipManager);
processor.process(event);
- verify(commitRequestManager).updateAutoCommitTimer(12345);
+ assertTrue(event.reconcileAndAutoCommit().isDone());
+ verify(commitRequestManager).updateTimerAndMaybeCommit(12345);
verify(membershipManager).onConsumerPoll();
verify(heartbeatRequestManager).resetPollTimer(12345);
}
@@ -442,18 +442,35 @@ public class ApplicationEventProcessorTest {
assertEquals(mixedSubscriptionError, thrown);
}
- @ParameterizedTest
- @MethodSource("offsetsGenerator")
- public void testSyncCommitEvent(Optional<Map<TopicPartition,
OffsetAndMetadata>> offsets) {
- SyncCommitEvent event = new SyncCommitEvent(offsets, 12345);
+ @Test
+ public void testSyncCommitEventWithEmptyOffsets() {
+ Map<TopicPartition, OffsetAndMetadata> allConsumed =
+ Map.of(new TopicPartition("topic", 0), new OffsetAndMetadata(10,
Optional.of(1), ""));
+ SyncCommitEvent event = new SyncCommitEvent(Optional.empty(), 12345);
+ setupProcessor(true);
+ doReturn(allConsumed).when(subscriptionState).allConsumed();
+
doReturn(CompletableFuture.completedFuture(allConsumed)).when(commitRequestManager).commitSync(allConsumed,
12345);
+ processor.process(event);
+ verify(commitRequestManager).commitSync(allConsumed, 12345);
+ assertTrue(event.offsetsReady.isDone());
+ Map<TopicPartition, OffsetAndMetadata> committedOffsets =
assertDoesNotThrow(() -> event.future().get());
+ assertEquals(allConsumed, committedOffsets);
+ }
+
+ @Test
+ public void testSyncCommitEvent() {
+ Map<TopicPartition, OffsetAndMetadata> offsets =
+ Map.of(new TopicPartition("topic", 0), new OffsetAndMetadata(10,
Optional.of(1), ""));
+ SyncCommitEvent event = new SyncCommitEvent(Optional.of(offsets),
12345);
setupProcessor(true);
-
doReturn(CompletableFuture.completedFuture(offsets.orElse(Map.of()))).when(commitRequestManager).commitSync(offsets,
12345);
+
doReturn(CompletableFuture.completedFuture(offsets)).when(commitRequestManager).commitSync(offsets,
12345);
processor.process(event);
verify(commitRequestManager).commitSync(offsets, 12345);
+ assertTrue(event.offsetsReady.isDone());
Map<TopicPartition, OffsetAndMetadata> committedOffsets =
assertDoesNotThrow(() -> event.future().get());
- assertEquals(offsets.orElse(Map.of()), committedOffsets);
+ assertEquals(offsets, committedOffsets);
}
@Test
@@ -475,22 +492,40 @@ public class ApplicationEventProcessorTest {
doReturn(future).when(commitRequestManager).commitSync(any(),
anyLong());
processor.process(event);
- verify(commitRequestManager).commitSync(Optional.empty(), 12345);
+ verify(commitRequestManager).commitSync(Collections.emptyMap(), 12345);
+ assertTrue(event.offsetsReady.isDone());
assertFutureThrows(event.future(), IllegalStateException.class);
}
- @ParameterizedTest
- @MethodSource("offsetsGenerator")
- public void testAsyncCommitEventWithOffsets(Optional<Map<TopicPartition,
OffsetAndMetadata>> offsets) {
- AsyncCommitEvent event = new AsyncCommitEvent(offsets);
+ @Test
+ public void testAsyncCommitEventWithEmptyOffsets() {
+ Map<TopicPartition, OffsetAndMetadata> allConsumed =
+ Map.of(new TopicPartition("topic", 0), new OffsetAndMetadata(10,
Optional.of(1), ""));
+ AsyncCommitEvent event = new AsyncCommitEvent(Optional.empty());
+ setupProcessor(true);
+
doReturn(CompletableFuture.completedFuture(allConsumed)).when(commitRequestManager).commitAsync(allConsumed);
+ doReturn(allConsumed).when(subscriptionState).allConsumed();
+
+ processor.process(event);
+ verify(commitRequestManager).commitAsync(allConsumed);
+ assertTrue(event.offsetsReady.isDone());
+ Map<TopicPartition, OffsetAndMetadata> committedOffsets =
assertDoesNotThrow(() -> event.future().get());
+ assertEquals(allConsumed, committedOffsets);
+ }
+ @Test
+ public void testAsyncCommitEvent() {
+ Map<TopicPartition, OffsetAndMetadata> offsets =
+ Map.of(new TopicPartition("topic", 0), new OffsetAndMetadata(10,
Optional.of(1), ""));
+ AsyncCommitEvent event = new AsyncCommitEvent(Optional.of(offsets));
setupProcessor(true);
-
doReturn(CompletableFuture.completedFuture(offsets.orElse(Map.of()))).when(commitRequestManager).commitAsync(offsets);
+
doReturn(CompletableFuture.completedFuture(offsets)).when(commitRequestManager).commitAsync(offsets);
processor.process(event);
verify(commitRequestManager).commitAsync(offsets);
+ assertTrue(event.offsetsReady.isDone());
Map<TopicPartition, OffsetAndMetadata> committedOffsets =
assertDoesNotThrow(() -> event.future().get());
- assertEquals(offsets.orElse(Map.of()), committedOffsets);
+ assertEquals(offsets, committedOffsets);
}
@Test
@@ -507,22 +542,17 @@ public class ApplicationEventProcessorTest {
AsyncCommitEvent event = new AsyncCommitEvent(Optional.empty());
setupProcessor(true);
+ doReturn(Collections.emptyMap()).when(subscriptionState).allConsumed();
CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future = new
CompletableFuture<>();
future.completeExceptionally(new IllegalStateException());
doReturn(future).when(commitRequestManager).commitAsync(any());
processor.process(event);
- verify(commitRequestManager).commitAsync(Optional.empty());
+ verify(commitRequestManager).commitAsync(Collections.emptyMap());
+ assertTrue(event.offsetsReady.isDone());
assertFutureThrows(event.future(), IllegalStateException.class);
}
- private static Stream<Arguments> offsetsGenerator() {
- return Stream.of(
- Arguments.of(Optional.empty()),
- Arguments.of(Optional.of(Map.of(new TopicPartition("topic", 0),
new OffsetAndMetadata(10, Optional.of(1), ""))))
- );
- }
-
private List<NetworkClientDelegate.UnsentRequest> mockCommitResults() {
return
Collections.singletonList(mock(NetworkClientDelegate.UnsentRequest.class));
}