This is an automated email from the ASF dual-hosted git repository.

lianetm pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.0 by this push:
     new 35a372c71db KAFKA-18641: AsyncKafkaConsumer could lose records with 
auto offset commit (#18737)
35a372c71db is described below

commit 35a372c71db1dbe793460e211e6897953cec97d4
Author: TengYao Chi <[email protected]>
AuthorDate: Fri Feb 21 01:11:01 2025 +0800

    KAFKA-18641: AsyncKafkaConsumer could lose records with auto offset commit 
(#18737)
    
    Reviewers: Lianet Magrans <[email protected]>, Jun Rao 
<[email protected]>, Kirk True <[email protected]>
---
 checkstyle/suppressions.xml                        |   2 +-
 .../internals/AbstractMembershipManager.java       |  25 +++-
 .../consumer/internals/AsyncKafkaConsumer.java     |  14 +-
 .../consumer/internals/CommitRequestManager.java   |  68 +++++----
 .../internals/ConsumerMembershipManager.java       |  17 ++-
 .../consumer/internals/RequestManagers.java        |   3 +-
 .../consumer/internals/ShareMembershipManager.java |   3 +-
 .../events/ApplicationEventProcessor.java          |  25 +++-
 .../consumer/internals/events/CommitEvent.java     |  16 +++
 .../consumer/internals/events/PollEvent.java       |  24 ++++
 .../consumer/internals/AsyncKafkaConsumerTest.java |  45 +++++-
 .../internals/CommitRequestManagerTest.java        | 159 +++++++--------------
 .../internals/ConsumerMembershipManagerTest.java   | 117 +++++++--------
 .../events/ApplicationEventProcessorTest.java      |  82 +++++++----
 14 files changed, 357 insertions(+), 243 deletions(-)

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 7dd8fa7a366..e48b71c4507 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -105,7 +105,7 @@
               
files="(AbstractRequest|AbstractResponse|KerberosLogin|WorkerSinkTaskTest|TransactionManagerTest|SenderTest|KafkaAdminClient|ConsumerCoordinatorTest|KafkaAdminClientTest).java"/>
 
     <suppress checks="NPathComplexity"
-              
files="(ConsumerCoordinator|BufferPool|MetricName|Node|ConfigDef|RecordBatch|SslFactory|SslTransportLayer|MetadataResponse|KerberosLogin|Selector|Sender|Serdes|TokenInformation|Agent|PluginUtils|MiniTrogdorCluster|TasksRequest|KafkaProducer|AbstractStickyAssignor|Authorizer|FetchSessionHandler|RecordAccumulator|Shell).java"/>
+              
files="(AbstractMembershipManager|ConsumerCoordinator|BufferPool|MetricName|Node|ConfigDef|RecordBatch|SslFactory|SslTransportLayer|MetadataResponse|KerberosLogin|Selector|Sender|Serdes|TokenInformation|Agent|PluginUtils|MiniTrogdorCluster|TasksRequest|KafkaProducer|AbstractStickyAssignor|Authorizer|FetchSessionHandler|RecordAccumulator|Shell).java"/>
 
     <suppress checks="(JavaNCSS|CyclomaticComplexity|MethodLength)"
               files="CoordinatorClient.java"/>
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java
index 90c2b3a647d..71b61a26d3f 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java
@@ -144,7 +144,7 @@ public abstract class AbstractMembershipManager<R extends 
AbstractResponse> impl
 
     /**
      * If there is a reconciliation running (triggering commit, callbacks) for 
the
-     * assignmentReadyToReconcile. This will be true if {@link 
#maybeReconcile()} has been triggered
+     * assignmentReadyToReconcile. This will be true if {@link 
#maybeReconcile(boolean)} has been triggered
      * after receiving a heartbeat response, or a metadata update.
      */
     private boolean reconciliationInProgress;
@@ -199,12 +199,15 @@ public abstract class AbstractMembershipManager<R extends 
AbstractResponse> impl
      */
     private boolean isPollTimerExpired;
 
+    private final boolean autoCommitEnabled;
+
     AbstractMembershipManager(String groupId,
                               SubscriptionState subscriptions,
                               ConsumerMetadata metadata,
                               Logger log,
                               Time time,
-                              RebalanceMetricsManager metricsManager) {
+                              RebalanceMetricsManager metricsManager,
+                              boolean autoCommitEnabled) {
         this.groupId = groupId;
         this.state = MemberState.UNSUBSCRIBED;
         this.subscriptions = subscriptions;
@@ -216,6 +219,7 @@ public abstract class AbstractMembershipManager<R extends 
AbstractResponse> impl
         this.stateUpdatesListeners = new ArrayList<>();
         this.time = time;
         this.metricsManager = metricsManager;
+        this.autoCommitEnabled = autoCommitEnabled;
     }
 
     /**
@@ -791,8 +795,16 @@ public abstract class AbstractMembershipManager<R extends 
AbstractResponse> impl
      *  - Another reconciliation is already in progress.
      *  - There are topics that haven't been added to the current assignment 
yet, but all their topic IDs
      *    are missing from the target assignment.
+     *
+     * @param canCommit Controls whether reconciliation can proceed when 
auto-commit is enabled.
+     *                  Set to true only when the current offset positions are 
safe to commit.
+     *                  If false and auto-commit enabled, the reconciliation 
will be skipped.
      */
-    void maybeReconcile() {
+    public void maybeReconcile(boolean canCommit) {
+        if (state != MemberState.RECONCILING) {
+            return;
+        }
+
         if (targetAssignmentReconciled()) {
             log.trace("Ignoring reconciliation attempt. Target assignment is 
equal to the " +
                     "current assignment.");
@@ -818,6 +830,7 @@ public abstract class AbstractMembershipManager<R extends 
AbstractResponse> impl
             return;
         }
 
+        if (autoCommitEnabled && !canCommit) return;
         markReconciliationInProgress();
 
         // Keep copy of assigned TopicPartitions created from the 
TopicIdPartitions that are
@@ -1347,7 +1360,7 @@ public abstract class AbstractMembershipManager<R extends 
AbstractResponse> impl
 
     /**
      * @return If there is a reconciliation in process now. Note that 
reconciliation is triggered
-     * by a call to {@link #maybeReconcile()}. Visible for testing.
+     * by a call to {@link #maybeReconcile(boolean)}. Visible for testing.
      */
     boolean reconciliationInProgress() {
         return reconciliationInProgress;
@@ -1383,9 +1396,7 @@ public abstract class AbstractMembershipManager<R extends 
AbstractResponse> impl
      *                      time-sensitive operations should be performed
      */
     public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
-        if (state == MemberState.RECONCILING) {
-            maybeReconcile();
-        }
+        maybeReconcile(false);
         return NetworkClientDelegate.PollResult.EMPTY;
     }
 
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
index 3f30acc932c..e19c37c954f 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
@@ -748,9 +748,14 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
             }
 
             do {
-
+                PollEvent event = new PollEvent(timer.currentTimeMs());
                 // Make sure to let the background thread know that we are 
still polling.
-                applicationEventHandler.add(new 
PollEvent(timer.currentTimeMs()));
+                // This will trigger async auto-commits of consumed positions 
when hitting
+                // the interval time or reconciling new assignments
+                applicationEventHandler.add(event);
+                // Wait for reconciliation and auto-commit to be triggered, to 
ensure all commit requests
+                // retrieve the positions to commit before proceeding with 
fetching new records
+                ConsumerUtils.getResult(event.reconcileAndAutoCommit(), 
defaultApiTimeoutMs.toMillis());
 
                 // We must not allow wake-ups between polling for fetches and 
returning the records.
                 // If the polled fetches are not empty the consumed position 
has already been updated in the polling
@@ -818,7 +823,6 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
         try {
             AsyncCommitEvent asyncCommitEvent = new AsyncCommitEvent(offsets);
             lastPendingAsyncCommit = 
commit(asyncCommitEvent).whenComplete((committedOffsets, throwable) -> {
-
                 if (throwable == null) {
                     
offsetCommitCallbackInvoker.enqueueInterceptorInvocation(committedOffsets);
                 }
@@ -846,6 +850,10 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
         }
 
         applicationEventHandler.add(commitEvent);
+
+        // This blocks until the background thread retrieves allConsumed 
positions to commit if none were explicitly specified.
+        // This operation will ensure that the offsets to commit are not 
affected by fetches which may start after this
+        ConsumerUtils.getResult(commitEvent.offsetsReady(), 
defaultApiTimeoutMs.toMillis());
         return commitEvent.future();
     }
 
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
index 1d3503886a9..284707a812b 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
@@ -171,8 +171,7 @@ public class CommitRequestManager implements 
RequestManager, MemberStateListener
     }
 
     /**
-     * Poll for the {@link OffsetFetchRequest} and {@link OffsetCommitRequest} 
request if there's any. The function will
-     * also try to autocommit the offsets, if feature is enabled.
+     * Poll for the {@link OffsetFetchRequest} and {@link OffsetCommitRequest} 
request if there's any.
      */
     @Override
     public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
@@ -186,7 +185,6 @@ public class CommitRequestManager implements 
RequestManager, MemberStateListener
             return drainPendingOffsetCommitRequests();
         }
 
-        maybeAutoCommitAsync();
         if (!pendingRequests.hasUnsentRequests())
             return EMPTY;
 
@@ -264,7 +262,7 @@ public class CommitRequestManager implements 
RequestManager, MemberStateListener
      * In that case, the next auto-commit request will be sent on the next 
call to poll, after a
      * response for the in-flight is received.
      */
-    public void maybeAutoCommitAsync() {
+    private void maybeAutoCommitAsync() {
         if (autoCommitEnabled() && autoCommitState.get().shouldAutoCommit()) {
             OffsetCommitRequestState requestState = createOffsetCommitRequest(
                 subscriptions.allConsumed(),
@@ -298,7 +296,7 @@ public class CommitRequestManager implements 
RequestManager, MemberStateListener
 
     /**
      * Commit consumed offsets if auto-commit is enabled, regardless of the 
auto-commit interval.
-     * This is used for committing offsets before revoking partitions. This 
will retry committing
+     * This is used for committing offsets before rebalance. This will retry 
committing
      * the latest offsets until the request succeeds, fails with a fatal 
error, or the timeout
      * expires. Note that:
      * <ul>
@@ -306,18 +304,18 @@ public class CommitRequestManager implements 
RequestManager, MemberStateListener
      *     including the member ID and latest member epoch received from the 
broker.</li>
      *     <li>Considers {@link Errors#UNKNOWN_TOPIC_OR_PARTITION} as a fatal 
error, and will not
      *     retry it although the error extends RetriableException. The reason 
is that if a topic
-     *     or partition is deleted, revocation would not finish in time since 
the auto commit would keep retrying.</li>
+     *     or partition is deleted, rebalance would not finish in time since 
the auto commit would keep retrying.</li>
      * </ul>
      *
      * Also note that this will generate a commit request even if there is 
another one in-flight,
      * generated by the auto-commit on the interval logic, to ensure that the 
latest offsets are
-     * committed before revoking partitions.
+     * committed before rebalance.
      *
      * @return Future that will complete when the offsets are successfully 
committed. It will
      * complete exceptionally if the commit fails with a non-retriable error, 
or if the retry
      * timeout expires.
      */
-    public CompletableFuture<Void> maybeAutoCommitSyncBeforeRevocation(final 
long deadlineMs) {
+    public CompletableFuture<Void> maybeAutoCommitSyncBeforeRebalance(final 
long deadlineMs) {
         if (!autoCommitEnabled()) {
             return CompletableFuture.completedFuture(null);
         }
@@ -325,12 +323,12 @@ public class CommitRequestManager implements 
RequestManager, MemberStateListener
         CompletableFuture<Void> result = new CompletableFuture<>();
         OffsetCommitRequestState requestState =
             createOffsetCommitRequest(subscriptions.allConsumed(), deadlineMs);
-        autoCommitSyncBeforeRevocationWithRetries(requestState, result);
+        autoCommitSyncBeforeRebalanceWithRetries(requestState, result);
         return result;
     }
 
-    private void 
autoCommitSyncBeforeRevocationWithRetries(OffsetCommitRequestState 
requestAttempt,
-                                                           
CompletableFuture<Void> result) {
+    private void 
autoCommitSyncBeforeRebalanceWithRetries(OffsetCommitRequestState 
requestAttempt,
+                                                          
CompletableFuture<Void> result) {
         CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> 
commitAttempt = requestAutoCommit(requestAttempt);
         commitAttempt.whenComplete((committedOffsets, error) -> {
             if (error == null) {
@@ -338,10 +336,10 @@ public class CommitRequestManager implements 
RequestManager, MemberStateListener
             } else {
                 if (error instanceof RetriableException || 
isStaleEpochErrorAndValidEpochAvailable(error)) {
                     if (requestAttempt.isExpired()) {
-                        log.debug("Auto-commit sync before revocation timed 
out and won't be retried anymore");
+                        log.debug("Auto-commit sync before rebalance timed out 
and won't be retried anymore");
                         
result.completeExceptionally(maybeWrapAsTimeoutException(error));
                     } else if (error instanceof 
UnknownTopicOrPartitionException) {
-                        log.debug("Auto-commit sync before revocation failed 
because topic or partition were deleted");
+                        log.debug("Auto-commit sync before rebalance failed 
because topic or partition were deleted");
                         result.completeExceptionally(error);
                     } else {
                         // Make sure the auto-commit is retried with the 
latest offsets
@@ -350,10 +348,10 @@ public class CommitRequestManager implements 
RequestManager, MemberStateListener
                             error.getMessage());
                         requestAttempt.offsets = subscriptions.allConsumed();
                         requestAttempt.resetFuture();
-                        
autoCommitSyncBeforeRevocationWithRetries(requestAttempt, result);
+                        
autoCommitSyncBeforeRebalanceWithRetries(requestAttempt, result);
                     }
                 } else {
-                    log.debug("Auto-commit sync before revocation failed with 
non-retriable error", error);
+                    log.debug("Auto-commit sync before rebalance failed with 
non-retriable error", error);
                     result.completeExceptionally(error);
                 }
             }
@@ -388,14 +386,13 @@ public class CommitRequestManager implements 
RequestManager, MemberStateListener
      * exceptionally depending on the response. If the request fails with a 
retriable error, the
      * future will be completed with a {@link RetriableCommitFailedException}.
      */
-    public CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> 
commitAsync(final Optional<Map<TopicPartition, OffsetAndMetadata>> offsets) {
-        Map<TopicPartition, OffsetAndMetadata> commitOffsets = 
offsets.orElseGet(subscriptions::allConsumed);
-        if (commitOffsets.isEmpty()) {
+    public CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> 
commitAsync(final Map<TopicPartition, OffsetAndMetadata> offsets) {
+        if (offsets.isEmpty()) {
             log.debug("Skipping commit of empty offsets");
             return CompletableFuture.completedFuture(Map.of());
         }
-        maybeUpdateLastSeenEpochIfNewer(commitOffsets);
-        OffsetCommitRequestState commitRequest = 
createOffsetCommitRequest(commitOffsets, Long.MAX_VALUE);
+        maybeUpdateLastSeenEpochIfNewer(offsets);
+        OffsetCommitRequestState commitRequest = 
createOffsetCommitRequest(offsets, Long.MAX_VALUE);
         pendingRequests.addOffsetCommitRequest(commitRequest);
 
         CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> 
asyncCommitResult = new CompletableFuture<>();
@@ -403,7 +400,7 @@ public class CommitRequestManager implements 
RequestManager, MemberStateListener
             if (error != null) {
                 
asyncCommitResult.completeExceptionally(commitAsyncExceptionForError(error));
             } else {
-                asyncCommitResult.complete(commitOffsets);
+                asyncCommitResult.complete(offsets);
             }
         });
         return asyncCommitResult;
@@ -417,15 +414,14 @@ public class CommitRequestManager implements 
RequestManager, MemberStateListener
      *                   an expected retriable error.
      * @return Future that will complete when a successful response
      */
-    public CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> 
commitSync(final Optional<Map<TopicPartition, OffsetAndMetadata>> offsets,
+    public CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> 
commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets,
                                                                                
 final long deadlineMs) {
-        Map<TopicPartition, OffsetAndMetadata> commitOffsets = 
offsets.orElseGet(subscriptions::allConsumed);
-        if (commitOffsets.isEmpty()) {
+        if (offsets.isEmpty()) {
             return CompletableFuture.completedFuture(Map.of());
         }
-        maybeUpdateLastSeenEpochIfNewer(commitOffsets);
+        maybeUpdateLastSeenEpochIfNewer(offsets);
         CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> result = new 
CompletableFuture<>();
-        OffsetCommitRequestState requestState = 
createOffsetCommitRequest(commitOffsets, deadlineMs);
+        OffsetCommitRequestState requestState = 
createOffsetCommitRequest(offsets, deadlineMs);
         commitSyncWithRetries(requestState, result);
         return result;
     }
@@ -566,7 +562,7 @@ public class CommitRequestManager implements 
RequestManager, MemberStateListener
         return error instanceof StaleMemberEpochException && 
memberInfo.memberEpoch.isPresent();
     }
 
-    public void updateAutoCommitTimer(final long currentTimeMs) {
+    private void updateAutoCommitTimer(final long currentTimeMs) {
         this.autoCommitState.ifPresent(t -> t.updateTimer(currentTimeMs));
     }
 
@@ -637,6 +633,24 @@ public class CommitRequestManager implements 
RequestManager, MemberStateListener
         });
     }
 
+    /**
+     * This is a non-blocking method to update timer and trigger async 
auto-commit.
+     * <p>
+     * This method performs two main tasks:
+     * <ol>
+     * <li>Updates the internal timer with the current time.</li>
+     * <li>Initiate an asynchronous auto-commit operation for all consumed 
positions if needed.</li>
+     * </ol>
+     *
+     * @param currentTimeMs the current timestamp in millisecond
+     * @see CommitRequestManager#updateAutoCommitTimer(long)
+     * @see CommitRequestManager#maybeAutoCommitAsync()
+     */
+    public void updateTimerAndMaybeCommit(final long currentTimeMs) {
+        updateAutoCommitTimer(currentTimeMs);
+        maybeAutoCommitAsync();
+    }
+
     class OffsetCommitRequestState extends RetriableRequestState {
         private Map<TopicPartition, OffsetAndMetadata> offsets;
         private final String groupId;
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManager.java
index f77295e8aef..57d6c21e48b 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManager.java
@@ -123,8 +123,7 @@ public class ConsumerMembershipManager extends 
AbstractMembershipManager<Consume
     private final Optional<String> serverAssignor;
 
     /**
-     * Manager to perform commit requests needed before revoking partitions 
(if auto-commit is
-     * enabled)
+     * Manager to perform commit requests needed before rebalance (if 
auto-commit is enabled)
      */
     private final CommitRequestManager commitRequestManager;
 
@@ -145,7 +144,8 @@ public class ConsumerMembershipManager extends 
AbstractMembershipManager<Consume
                                      LogContext logContext,
                                      BackgroundEventHandler 
backgroundEventHandler,
                                      Time time,
-                                     Metrics metrics) {
+                                     Metrics metrics,
+                                     boolean autoCommitEnabled) {
         this(groupId,
             groupInstanceId,
             rebalanceTimeoutMs,
@@ -156,7 +156,8 @@ public class ConsumerMembershipManager extends 
AbstractMembershipManager<Consume
             logContext,
             backgroundEventHandler,
             time,
-            new ConsumerRebalanceMetricsManager(metrics));
+            new ConsumerRebalanceMetricsManager(metrics),
+            autoCommitEnabled);
     }
 
     // Visible for testing
@@ -170,13 +171,15 @@ public class ConsumerMembershipManager extends 
AbstractMembershipManager<Consume
                               LogContext logContext,
                               BackgroundEventHandler backgroundEventHandler,
                               Time time,
-                              RebalanceMetricsManager metricsManager) {
+                              RebalanceMetricsManager metricsManager,
+                              boolean autoCommitEnabled) {
         super(groupId,
             subscriptions,
             metadata,
             logContext.logger(ConsumerMembershipManager.class),
             time,
-            metricsManager);
+            metricsManager,
+            autoCommitEnabled);
         this.groupInstanceId = groupInstanceId;
         this.rebalanceTimeoutMs = rebalanceTimeoutMs;
         this.serverAssignor = serverAssignor;
@@ -252,7 +255,7 @@ public class ConsumerMembershipManager extends 
AbstractMembershipManager<Consume
         // best effort to commit the offsets in the case where the epoch might 
have changed while
         // the current reconciliation is in process. Note this is using the 
rebalance timeout as
         // it is the limit enforced by the broker to complete the 
reconciliation process.
-        return 
commitRequestManager.maybeAutoCommitSyncBeforeRevocation(getDeadlineMsForTimeout(rebalanceTimeoutMs));
+        return 
commitRequestManager.maybeAutoCommitSyncBeforeRebalance(getDeadlineMsForTimeout(rebalanceTimeoutMs));
     }
 
     /**
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
index 960567ac96b..f7446b7ad3c 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
@@ -217,7 +217,8 @@ public class RequestManagers implements Closeable {
                             logContext,
                             backgroundEventHandler,
                             time,
-                            metrics);
+                            metrics,
+                            
config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG));
 
                     // Update the group member ID label in the client 
telemetry reporter.
                     // According to KIP-1082, the consumer will generate the 
member ID as the incarnation ID of the process.
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareMembershipManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareMembershipManager.java
index 3e3ea246748..d7944466130 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareMembershipManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareMembershipManager.java
@@ -106,7 +106,8 @@ public class ShareMembershipManager extends 
AbstractMembershipManager<ShareGroup
                 metadata,
                 logContext.logger(ShareMembershipManager.class),
                 time,
-                metricsManager);
+                metricsManager,
+                false);
         this.rackId = rackId;
     }
 
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
index 69cc0072a39..08661ef7581 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
@@ -206,13 +206,25 @@ public class ApplicationEventProcessor implements 
EventProcessor<ApplicationEven
     }
 
     private void process(final PollEvent event) {
+        // Trigger a reconciliation that can safely commit offsets if needed 
to rebalance,
+        // as we're processing before any new fetching starts in the app thread
+        
requestManagers.consumerMembershipManager.ifPresent(consumerMembershipManager ->
+            consumerMembershipManager.maybeReconcile(true));
         if (requestManagers.commitRequestManager.isPresent()) {
-            requestManagers.commitRequestManager.ifPresent(m -> 
m.updateAutoCommitTimer(event.pollTimeMs()));
+            CommitRequestManager commitRequestManager = 
requestManagers.commitRequestManager.get();
+            commitRequestManager.updateTimerAndMaybeCommit(event.pollTimeMs());
+            // all commit request generation points have been passed,
+            // so it's safe to notify the app thread could proceed and start 
fetching
+            event.markReconcileAndAutoCommitComplete();
             requestManagers.consumerHeartbeatRequestManager.ifPresent(hrm -> {
                 hrm.membershipManager().onConsumerPoll();
                 hrm.resetPollTimer(event.pollTimeMs());
             });
         } else {
+            // safe to unblock - no auto-commit risk here:
+            // 1. commitRequestManager is not present
+            // 2. shareConsumer has no auto-commit mechanism
+            event.markReconcileAndAutoCommitComplete();
             requestManagers.shareHeartbeatRequestManager.ifPresent(hrm -> {
                 hrm.membershipManager().onConsumerPoll();
                 hrm.resetPollTimer(event.pollTimeMs());
@@ -234,7 +246,9 @@ public class ApplicationEventProcessor implements 
EventProcessor<ApplicationEven
 
         try {
             CommitRequestManager manager = 
requestManagers.commitRequestManager.get();
-            CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future = 
manager.commitAsync(event.offsets());
+            Map<TopicPartition, OffsetAndMetadata> offsets = 
event.offsets().orElseGet(subscriptions::allConsumed);
+            event.markOffsetsReady();
+            CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future = 
manager.commitAsync(offsets);
             future.whenComplete(complete(event.future()));
         } catch (Exception e) {
             event.future().completeExceptionally(e);
@@ -250,7 +264,9 @@ public class ApplicationEventProcessor implements 
EventProcessor<ApplicationEven
 
         try {
             CommitRequestManager manager = 
requestManagers.commitRequestManager.get();
-            CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future = 
manager.commitSync(event.offsets(), event.deadlineMs());
+            Map<TopicPartition, OffsetAndMetadata> offsets = 
event.offsets().orElseGet(subscriptions::allConsumed);
+            event.markOffsetsReady();
+            CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future = 
manager.commitSync(offsets, event.deadlineMs());
             future.whenComplete(complete(event.future()));
         } catch (Exception e) {
             event.future().completeExceptionally(e);
@@ -275,8 +291,7 @@ public class ApplicationEventProcessor implements 
EventProcessor<ApplicationEven
     private void process(final AssignmentChangeEvent event) {
         if (requestManagers.commitRequestManager.isPresent()) {
             CommitRequestManager manager = 
requestManagers.commitRequestManager.get();
-            manager.updateAutoCommitTimer(event.currentTimeMs());
-            manager.maybeAutoCommitAsync();
+            manager.updateTimerAndMaybeCommit(event.currentTimeMs());
         }
 
         log.info("Assigned to partition(s): {}", 
event.partitions().stream().map(TopicPartition::toString).collect(Collectors.joining(",
 ")));
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CommitEvent.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CommitEvent.java
index a05ce638ece..9afe3a383bb 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CommitEvent.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CommitEvent.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.TopicPartition;
 import java.util.Collections;
 import java.util.Map;
 import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
 
 public abstract class CommitEvent extends 
CompletableApplicationEvent<Map<TopicPartition, OffsetAndMetadata>> {
 
@@ -30,6 +31,13 @@ public abstract class CommitEvent extends 
CompletableApplicationEvent<Map<TopicP
      */
     private final Optional<Map<TopicPartition, OffsetAndMetadata>> offsets;
 
+    /**
+     * Future that completes when allConsumed offsets have been calculated.
+     * The app thread waits for this future before returning control to ensure
+     * the offsets to be committed are up-to-date.
+     */
+    protected final CompletableFuture<Void> offsetsReady = new 
CompletableFuture<>();
+
     protected CommitEvent(final Type type, final Optional<Map<TopicPartition, 
OffsetAndMetadata>> offsets, final long deadlineMs) {
         super(type, deadlineMs);
         this.offsets = validate(offsets);
@@ -57,6 +65,14 @@ public abstract class CommitEvent extends 
CompletableApplicationEvent<Map<TopicP
         return offsets;
     }
 
+    public CompletableFuture<Void> offsetsReady() {
+        return offsetsReady;
+    }
+
+    public void markOffsetsReady() {
+        offsetsReady.complete(null);
+    }
+
     @Override
     protected String toStringBase() {
         return super.toStringBase() + ", offsets=" + offsets;
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/PollEvent.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/PollEvent.java
index 96614c06e9b..37df5d9ddc2 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/PollEvent.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/PollEvent.java
@@ -16,10 +16,26 @@
  */
 package org.apache.kafka.clients.consumer.internals.events;
 
+import java.util.concurrent.CompletableFuture;
+
 public class PollEvent extends ApplicationEvent {
 
     private final long pollTimeMs;
 
+    /**
+     * A future that represents the completion of reconciliation and 
auto-commit
+     * processing.
+     * This future is completed when all commit request generation points have
+     * been passed, including:
+     * <ul>
+     *   <li>auto-commit on rebalance</li>
+     *   <li>auto-commit on the interval</li>
+     * </ul>
+     * Once completed, it signals that it's safe for the consumer to proceed 
with
+     * fetching new records.
+     */
+    private final CompletableFuture<Void> reconcileAndAutoCommit = new 
CompletableFuture<>();
+
     public PollEvent(final long pollTimeMs) {
         super(Type.POLL);
         this.pollTimeMs = pollTimeMs;
@@ -29,6 +45,14 @@ public class PollEvent extends ApplicationEvent {
         return pollTimeMs;
     }
 
+    public CompletableFuture<Void> reconcileAndAutoCommit() {
+        return reconcileAndAutoCommit;
+    }
+
+    public void markReconcileAndAutoCommitComplete() {
+        reconcileAndAutoCommit.complete(null);
+    }
+
     @Override
     public String toStringBase() {
         return super.toStringBase() + ", pollTimeMs=" + pollTimeMs;
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
index 58ec7a334dc..cb03d585ba6 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
@@ -37,6 +37,7 @@ import 
org.apache.kafka.clients.consumer.internals.events.AssignmentChangeEvent;
 import org.apache.kafka.clients.consumer.internals.events.AsyncCommitEvent;
 import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.CheckAndUpdatePositionsEvent;
+import org.apache.kafka.clients.consumer.internals.events.CommitEvent;
 import org.apache.kafka.clients.consumer.internals.events.CommitOnCloseEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.CompletableApplicationEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.CompletableBackgroundEvent;
@@ -293,6 +294,7 @@ public class AsyncKafkaConsumerTest {
         offsets.put(t0, new OffsetAndMetadata(10L));
         offsets.put(t1, new OffsetAndMetadata(20L));
 
+        markOffsetsReadyForCommitEvent();
         consumer.commitAsync(offsets, null);
 
         final ArgumentCaptor<AsyncCommitEvent> commitEventCaptor = 
ArgumentCaptor.forClass(AsyncCommitEvent.class);
@@ -394,6 +396,7 @@ public class AsyncKafkaConsumerTest {
 
         consumer.wakeup();
 
+        markReconcileAndAutoCommitCompleteForPollEvent();
         assertThrows(WakeupException.class, () -> 
consumer.poll(Duration.ZERO));
         assertDoesNotThrow(() -> consumer.poll(Duration.ZERO));
     }
@@ -413,6 +416,7 @@ public class AsyncKafkaConsumerTest {
         completeAssignmentChangeEventSuccessfully();
         consumer.assign(singleton(tp));
 
+        markReconcileAndAutoCommitCompleteForPollEvent();
         assertThrows(WakeupException.class, () -> 
consumer.poll(Duration.ofMinutes(1)));
         assertDoesNotThrow(() -> consumer.poll(Duration.ZERO));
     }
@@ -436,6 +440,7 @@ public class AsyncKafkaConsumerTest {
         completeAssignmentChangeEventSuccessfully();
         consumer.assign(singleton(tp));
 
+        markReconcileAndAutoCommitCompleteForPollEvent();
         // since wakeup() is called when the non-empty fetch is returned the 
wakeup should be ignored
         assertDoesNotThrow(() -> consumer.poll(Duration.ofMinutes(1)));
         // the previously ignored wake-up should not be ignored in the next 
call
@@ -472,6 +477,7 @@ public class AsyncKafkaConsumerTest {
 
         completeTopicSubscriptionChangeEventSuccessfully();
         consumer.subscribe(Collections.singletonList(topicName), listener);
+        markReconcileAndAutoCommitCompleteForPollEvent();
         consumer.poll(Duration.ZERO);
         assertTrue(callbackExecuted.get());
     }
@@ -493,6 +499,7 @@ public class AsyncKafkaConsumerTest {
         completeAssignmentChangeEventSuccessfully();
         consumer.assign(singleton(tp));
 
+        markReconcileAndAutoCommitCompleteForPollEvent();
         consumer.poll(Duration.ZERO);
 
         assertDoesNotThrow(() -> consumer.poll(Duration.ZERO));
@@ -584,6 +591,7 @@ public class AsyncKafkaConsumerTest {
         consumer.assign(Collections.singleton(tp));
         completeSeekUnvalidatedEventSuccessfully();
         consumer.seek(tp, 20);
+        markOffsetsReadyForCommitEvent();
         consumer.commitAsync();
 
         CompletableApplicationEvent<Void> event = getLastEnqueuedEvent();
@@ -617,6 +625,7 @@ public class AsyncKafkaConsumerTest {
         completeAssignmentChangeEventSuccessfully();
         consumer.assign(Collections.singleton(new TopicPartition("foo", 0)));
         assertDoesNotThrow(() -> consumer.commitAsync(new HashMap<>(), 
callback));
+        markReconcileAndAutoCommitCompleteForPollEvent();
         assertMockCommitCallbackInvoked(() -> consumer.poll(Duration.ZERO),
             callback,
             null);
@@ -732,6 +741,7 @@ public class AsyncKafkaConsumerTest {
         subscriptions.assignFromSubscribed(singleton(new 
TopicPartition("topic", 0)));
         completeSeekUnvalidatedEventSuccessfully();
         subscriptions.seek(new TopicPartition("topic", 0), 100);
+        markOffsetsReadyForCommitEvent();
         consumer.commitSyncAllConsumed(time.timer(100));
 
         ArgumentCaptor<SyncCommitEvent> eventCaptor = 
ArgumentCaptor.forClass(SyncCommitEvent.class);
@@ -1026,6 +1036,7 @@ public class AsyncKafkaConsumerTest {
             ApplicationEvent event = invocation.getArgument(0);
             if (event instanceof SyncCommitEvent) {
                 capturedEvent.set((SyncCommitEvent) event);
+                ((SyncCommitEvent) event).markOffsetsReady();
             }
             return null;
         }).when(applicationEventHandler).add(any());
@@ -1050,7 +1061,9 @@ public class AsyncKafkaConsumerTest {
         completeSeekUnvalidatedEventSuccessfully();
         consumer.seek(tp, 20);
 
+        markOffsetsReadyForCommitEvent();
         consumer.commitAsync();
+
         Exception e = assertThrows(KafkaException.class, () -> 
consumer.close(Duration.ofMillis(10)));
         assertInstanceOf(TimeoutException.class, e.getCause());
     }
@@ -1391,6 +1404,7 @@ public class AsyncKafkaConsumerTest {
             backgroundEventQueue.add(e);
         }
 
+        markReconcileAndAutoCommitCompleteForPollEvent();
         // This will trigger the background event queue to process our 
background event message.
         // If any error is happening inside the rebalance callbacks, we expect 
the first exception to be thrown from poll.
         if (expectedException.isPresent()) {
@@ -1460,6 +1474,7 @@ public class AsyncKafkaConsumerTest {
         backgroundEventQueue.add(errorEvent);
         completeAssignmentChangeEventSuccessfully();
         consumer.assign(singletonList(new TopicPartition("topic", 0)));
+        markReconcileAndAutoCommitCompleteForPollEvent();
         final KafkaException exception = assertThrows(KafkaException.class, () 
-> consumer.poll(Duration.ZERO));
 
         assertEquals(expectedException.getMessage(), exception.getMessage());
@@ -1478,6 +1493,7 @@ public class AsyncKafkaConsumerTest {
         backgroundEventQueue.add(errorEvent2);
         completeAssignmentChangeEventSuccessfully();
         consumer.assign(singletonList(new TopicPartition("topic", 0)));
+        markReconcileAndAutoCommitCompleteForPollEvent();
         final KafkaException exception = assertThrows(KafkaException.class, () 
-> consumer.poll(Duration.ZERO));
 
         assertEquals(expectedException1.getMessage(), exception.getMessage());
@@ -1510,6 +1526,7 @@ public class AsyncKafkaConsumerTest {
         props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumerGroupA");
         props.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, 
GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT));
         props.put(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG, "someAssignor");
+        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
         final ConsumerConfig config = new ConsumerConfig(props);
         consumer = newConsumer(config);
 
@@ -1533,6 +1550,7 @@ public class AsyncKafkaConsumerTest {
         final Properties props = 
requiredConsumerConfigAndGroupId("consumerGroupA");
         props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 10000);
         props.put(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED, true);
+        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
         final ConsumerConfig config = new ConsumerConfig(props);
         consumer = newConsumer(config);
 
@@ -1561,6 +1579,7 @@ public class AsyncKafkaConsumerTest {
 
         completeTopicSubscriptionChangeEventSuccessfully();
         consumer.subscribe(singletonList("topic1"));
+        markReconcileAndAutoCommitCompleteForPollEvent();
         consumer.poll(Duration.ofMillis(100));
         verify(applicationEventHandler).add(any(PollEvent.class));
         
verify(applicationEventHandler).add(any(CreateFetchRequestsEvent.class));
@@ -1579,7 +1598,7 @@ public class AsyncKafkaConsumerTest {
 
         completeAssignmentChangeEventSuccessfully();
         consumer.assign(singleton(new TopicPartition("t1", 1)));
-
+        markReconcileAndAutoCommitCompleteForPollEvent();
         consumer.poll(Duration.ZERO);
 
         verify(applicationEventHandler, atLeast(1))
@@ -1616,6 +1635,7 @@ public class AsyncKafkaConsumerTest {
         ).when(fetchCollector).collectFetch(any(FetchBuffer.class));
         
when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(true);
 
+        markReconcileAndAutoCommitCompleteForPollEvent();
         // And then poll for up to 10000ms, which should return 2 records 
without timing out
         ConsumerRecords<?, ?> returnedRecords = 
consumer.poll(Duration.ofMillis(10000));
         assertEquals(2, returnedRecords.count());
@@ -1719,6 +1739,7 @@ public class AsyncKafkaConsumerTest {
         // interrupt the thread and call poll
         try {
             Thread.currentThread().interrupt();
+            markReconcileAndAutoCommitCompleteForPollEvent();
             assertThrows(InterruptException.class, () -> 
consumer.poll(Duration.ZERO));
         } finally {
             // clear interrupted state again since this thread may be reused 
by JUnit
@@ -1750,6 +1771,7 @@ public class AsyncKafkaConsumerTest {
         completeTopicSubscriptionChangeEventSuccessfully();
         consumer.subscribe(Collections.singletonList("topic"));
         
when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(true);
+        markReconcileAndAutoCommitCompleteForPollEvent();
         consumer.poll(Duration.ZERO);
         verify(backgroundEventReaper).reap(time.milliseconds());
     }
@@ -1812,6 +1834,7 @@ public class AsyncKafkaConsumerTest {
         completeUnsubscribeApplicationEventSuccessfully();
 
         consumer.assign(singleton(new TopicPartition("topic1", 0)));
+        markReconcileAndAutoCommitCompleteForPollEvent();
         consumer.poll(Duration.ZERO);
         verify(applicationEventHandler, 
never()).addAndGet(any(UpdatePatternSubscriptionEvent.class));
 
@@ -1974,6 +1997,7 @@ public class AsyncKafkaConsumerTest {
     private void completeCommitAsyncApplicationEventExceptionally(Exception 
ex) {
         doAnswer(invocation -> {
             AsyncCommitEvent event = invocation.getArgument(0);
+            event.markOffsetsReady();
             event.future().completeExceptionally(ex);
             return null;
         
}).when(applicationEventHandler).add(ArgumentMatchers.isA(AsyncCommitEvent.class));
@@ -1982,6 +2006,7 @@ public class AsyncKafkaConsumerTest {
     private void completeCommitSyncApplicationEventExceptionally(Exception ex) 
{
         doAnswer(invocation -> {
             SyncCommitEvent event = invocation.getArgument(0);
+            event.markOffsetsReady();
             event.future().completeExceptionally(ex);
             return null;
         
}).when(applicationEventHandler).add(ArgumentMatchers.isA(SyncCommitEvent.class));
@@ -1994,6 +2019,7 @@ public class AsyncKafkaConsumerTest {
     private void completeCommitAsyncApplicationEventSuccessfully() {
         doAnswer(invocation -> {
             AsyncCommitEvent event = invocation.getArgument(0);
+            event.markOffsetsReady();
             event.future().complete(null);
             return null;
         
}).when(applicationEventHandler).add(ArgumentMatchers.isA(AsyncCommitEvent.class));
@@ -2002,6 +2028,7 @@ public class AsyncKafkaConsumerTest {
     private void completeCommitSyncApplicationEventSuccessfully() {
         doAnswer(invocation -> {
             SyncCommitEvent event = invocation.getArgument(0);
+            event.markOffsetsReady();
             event.future().complete(null);
             return null;
         
}).when(applicationEventHandler).add(ArgumentMatchers.isA(SyncCommitEvent.class));
@@ -2089,4 +2116,20 @@ public class AsyncKafkaConsumerTest {
         // Invokes callback
         consumer.commitAsync();
     }
+
+    private void markOffsetsReadyForCommitEvent() {
+        doAnswer(invocation -> {
+            CommitEvent event = invocation.getArgument(0);
+            event.markOffsetsReady();
+            return null;
+        
}).when(applicationEventHandler).add(ArgumentMatchers.isA(CommitEvent.class));
+    }
+
+    private void markReconcileAndAutoCommitCompleteForPollEvent() {
+        doAnswer(invocation -> {
+            PollEvent event = invocation.getArgument(0);
+            event.markReconcileAndAutoCommitComplete();
+            return null;
+        
}).when(applicationEventHandler).add(ArgumentMatchers.isA(PollEvent.class));
+    }
 }
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java
index 32b8aea5b22..f44b285136a 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java
@@ -84,10 +84,10 @@ import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.clearInvocations;
-import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.spy;
@@ -179,7 +179,7 @@ public class CommitRequestManagerTest {
 
         Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
         offsets.put(new TopicPartition("t1", 0), new OffsetAndMetadata(0));
-        commitRequestManager.commitAsync(Optional.of(offsets));
+        commitRequestManager.commitAsync(offsets);
         assertPoll(false, 0, commitRequestManager);
     }
 
@@ -190,7 +190,7 @@ public class CommitRequestManagerTest {
 
         Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
         offsets.put(new TopicPartition("t1", 0), new OffsetAndMetadata(0));
-        commitRequestManager.commitAsync(Optional.of(offsets));
+        commitRequestManager.commitAsync(offsets);
         assertPoll(false, 0, commitRequestManager);
         assertPoll(true, 1, commitRequestManager);
     }
@@ -202,7 +202,7 @@ public class CommitRequestManagerTest {
 
         Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
         offsets.put(new TopicPartition("t1", 0), new OffsetAndMetadata(0));
-        commitRequestManager.commitAsync(Optional.of(offsets));
+        commitRequestManager.commitAsync(offsets);
         assertPoll(1, commitRequestManager);
     }
 
@@ -214,9 +214,9 @@ public class CommitRequestManagerTest {
         CommitRequestManager commitRequestManager = create(true, 100);
         assertPoll(0, commitRequestManager);
 
-        commitRequestManager.updateAutoCommitTimer(time.milliseconds());
+        commitRequestManager.updateTimerAndMaybeCommit(time.milliseconds());
         time.sleep(100);
-        commitRequestManager.updateAutoCommitTimer(time.milliseconds());
+        commitRequestManager.updateTimerAndMaybeCommit(time.milliseconds());
         List<NetworkClientDelegate.FutureCompletionHandler> pollResults = 
assertPoll(1, commitRequestManager);
         pollResults.forEach(v -> v.onComplete(mockOffsetCommitResponse(
                 "t1",
@@ -243,9 +243,9 @@ public class CommitRequestManagerTest {
 
         // Add the requests to the CommitRequestManager and store their futures
         long deadlineMs = time.milliseconds() + defaultApiTimeoutMs;
-        commitManager.commitSync(Optional.of(offsets1), deadlineMs);
+        commitManager.commitSync(offsets1, deadlineMs);
         commitManager.fetchOffsets(Collections.singleton(new 
TopicPartition("test", 0)), deadlineMs);
-        commitManager.commitSync(Optional.of(offsets2), deadlineMs);
+        commitManager.commitSync(offsets2, deadlineMs);
         commitManager.fetchOffsets(Collections.singleton(new 
TopicPartition("test", 1)), deadlineMs);
 
         // Poll the CommitRequestManager and verify that the 
inflightOffsetFetches size is correct
@@ -278,7 +278,7 @@ public class CommitRequestManagerTest {
         Map<TopicPartition, OffsetAndMetadata> offsets = 
Collections.singletonMap(
             new TopicPartition("topic", 1),
             new OffsetAndMetadata(0));
-        commitRequestManager.commitAsync(Optional.of(offsets));
+        commitRequestManager.commitAsync(offsets);
         assertEquals(1, 
commitRequestManager.unsentOffsetCommitRequests().size());
         assertEquals(1, 
commitRequestManager.poll(time.milliseconds()).unsentRequests.size());
         
assertTrue(commitRequestManager.unsentOffsetCommitRequests().isEmpty());
@@ -295,7 +295,7 @@ public class CommitRequestManagerTest {
 
         CommitRequestManager commitRequestManager = create(false, 100);
         CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future = 
commitRequestManager.commitSync(
-            Optional.of(offsets), time.milliseconds() + defaultApiTimeoutMs);
+            offsets, time.milliseconds() + defaultApiTimeoutMs);
         assertEquals(1, 
commitRequestManager.unsentOffsetCommitRequests().size());
         List<NetworkClientDelegate.FutureCompletionHandler> pollResults = 
assertPoll(1, commitRequestManager);
         pollResults.forEach(v -> v.onComplete(mockOffsetCommitResponse(
@@ -314,43 +314,17 @@ public class CommitRequestManagerTest {
     @Test
     public void testCommitSyncWithEmptyOffsets() {
         subscriptionState = mock(SubscriptionState.class);
-        
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode));
-        TopicPartition tp = new TopicPartition("topic", 1);
-        OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(0, 
Optional.of(1), "");
-        Map<TopicPartition, OffsetAndMetadata> offsets = Map.of(tp, 
offsetAndMetadata);
-        doReturn(offsets).when(subscriptionState).allConsumed();
 
         CommitRequestManager commitRequestManager = create(false, 100);
         CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future = 
commitRequestManager.commitSync(
-            Optional.empty(), time.milliseconds() + defaultApiTimeoutMs);
-        assertEquals(1, 
commitRequestManager.unsentOffsetCommitRequests().size());
-        List<NetworkClientDelegate.FutureCompletionHandler> pollResults = 
assertPoll(1, commitRequestManager);
-        pollResults.forEach(v -> v.onComplete(mockOffsetCommitResponse(
-            "topic",
-            1,
-            (short) 1,
-            Errors.NONE)));
-
-        verify(subscriptionState).allConsumed();
-        verify(metadata).updateLastSeenEpochIfNewer(tp, 1);
-        Map<TopicPartition, OffsetAndMetadata> commitOffsets = 
assertDoesNotThrow(() -> future.get());
+            Collections.emptyMap(), time.milliseconds() + defaultApiTimeoutMs);
         assertTrue(future.isDone());
-        assertEquals(offsets, commitOffsets);
-    }
-
-    @Test
-    public void testCommitSyncWithEmptyAllConsumedOffsets() {
-        subscriptionState = mock(SubscriptionState.class);
-        doReturn(Map.of()).when(subscriptionState).allConsumed();
-
-        CommitRequestManager commitRequestManager = create(true, 100);
-        CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future = 
commitRequestManager.commitSync(
-            Optional.empty(), time.milliseconds() + defaultApiTimeoutMs);
+        assertEquals(0, 
commitRequestManager.unsentOffsetCommitRequests().size());
+        assertPoll(0, commitRequestManager);
 
-        verify(subscriptionState).allConsumed();
+        verify(metadata, never()).updateLastSeenEpochIfNewer(any(), anyInt());
         Map<TopicPartition, OffsetAndMetadata> commitOffsets = 
assertDoesNotThrow(() -> future.get());
-        assertTrue(future.isDone());
-        assertTrue(commitOffsets.isEmpty());
+        assertEquals(Collections.emptyMap(), commitOffsets);
     }
 
     @Test
@@ -362,7 +336,7 @@ public class CommitRequestManagerTest {
         Map<TopicPartition, OffsetAndMetadata> offsets = Map.of(tp, 
offsetAndMetadata);
 
         CommitRequestManager commitRequestManager = create(true, 100);
-        CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future = 
commitRequestManager.commitAsync(Optional.of(offsets));
+        CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future = 
commitRequestManager.commitAsync(offsets);
         assertEquals(1, 
commitRequestManager.unsentOffsetCommitRequests().size());
         List<NetworkClientDelegate.FutureCompletionHandler> pollResults = 
assertPoll(1, commitRequestManager);
         pollResults.forEach(v -> v.onComplete(mockOffsetCommitResponse(
@@ -381,39 +355,12 @@ public class CommitRequestManagerTest {
     @Test
     public void testCommitAsyncWithEmptyOffsets() {
         subscriptionState = mock(SubscriptionState.class);
-        
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode));
-        TopicPartition tp = new TopicPartition("topic", 1);
-        OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(0, 
Optional.of(1), "");
-        Map<TopicPartition, OffsetAndMetadata> offsets = Map.of(tp, 
offsetAndMetadata);
-        doReturn(offsets).when(subscriptionState).allConsumed();
-
-        CommitRequestManager commitRequestManager = create(true, 100);
-        CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future = 
commitRequestManager.commitAsync(Optional.empty());
-        assertEquals(1, 
commitRequestManager.unsentOffsetCommitRequests().size());
-        List<NetworkClientDelegate.FutureCompletionHandler> pollResults = 
assertPoll(1, commitRequestManager);
-        pollResults.forEach(v -> v.onComplete(mockOffsetCommitResponse(
-            "topic",
-            1,
-            (short) 1,
-            Errors.NONE)));
-
-        verify(subscriptionState).allConsumed();
-        verify(metadata).updateLastSeenEpochIfNewer(tp, 1);
-        assertTrue(future.isDone());
-        Map<TopicPartition, OffsetAndMetadata> commitOffsets = 
assertDoesNotThrow(() -> future.get());
-        assertEquals(offsets, commitOffsets);
-    }
-
-    @Test
-    public void testCommitAsyncWithEmptyAllConsumedOffsets() {
-        subscriptionState = mock(SubscriptionState.class);
-        doReturn(Map.of()).when(subscriptionState).allConsumed();
 
         CommitRequestManager commitRequestManager = create(true, 100);
-        CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future = 
commitRequestManager.commitAsync(Optional.empty());
+        CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future = 
commitRequestManager.commitAsync(Collections.emptyMap());
 
-        verify(subscriptionState).allConsumed();
         assertTrue(future.isDone());
+        assertPoll(0, commitRequestManager);
         Map<TopicPartition, OffsetAndMetadata> commitOffsets = 
assertDoesNotThrow(() -> future.get());
         assertTrue(commitOffsets.isEmpty());
     }
@@ -428,7 +375,7 @@ public class CommitRequestManagerTest {
         subscriptionState.assignFromUser(Collections.singleton(tp));
         subscriptionState.seek(tp, 100);
         time.sleep(commitInterval);
-        commitRequestManager.updateAutoCommitTimer(time.milliseconds());
+        commitRequestManager.updateTimerAndMaybeCommit(time.milliseconds());
         List<NetworkClientDelegate.FutureCompletionHandler> futures = 
assertPoll(1, commitRequestManager);
         // Complete the autocommit request exceptionally. It should fail right 
away, without retry.
         futures.get(0).onComplete(mockOffsetCommitResponse(
@@ -441,14 +388,14 @@ public class CommitRequestManagerTest {
         // (making sure we wait for the backoff, to check that the failed 
request is not being
         // retried).
         time.sleep(retryBackoffMs);
-        commitRequestManager.updateAutoCommitTimer(time.milliseconds());
+        commitRequestManager.updateTimerAndMaybeCommit(time.milliseconds());
         assertPoll(0, commitRequestManager);
-        commitRequestManager.updateAutoCommitTimer(time.milliseconds());
+        commitRequestManager.updateTimerAndMaybeCommit(time.milliseconds());
 
         // Only when polling after the auto-commit interval, a new auto-commit 
request should be
         // generated.
         time.sleep(commitInterval);
-        commitRequestManager.updateAutoCommitTimer(time.milliseconds());
+        commitRequestManager.updateTimerAndMaybeCommit(time.milliseconds());
         futures = assertPoll(1, commitRequestManager);
         assertEmptyPendingRequests(commitRequestManager);
         futures.get(0).onComplete(mockOffsetCommitResponse(
@@ -470,7 +417,7 @@ public class CommitRequestManagerTest {
             new TopicPartition("topic", 1),
             new OffsetAndMetadata(0));
         long deadlineMs = time.milliseconds() + defaultApiTimeoutMs;
-        CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> commitResult 
= commitRequestManager.commitSync(Optional.of(offsets), deadlineMs);
+        CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> commitResult 
= commitRequestManager.commitSync(offsets, deadlineMs);
         
sendAndVerifyOffsetCommitRequestFailedAndMaybeRetried(commitRequestManager, 
error, commitResult);
 
         // We expect that request should have been retried on this sync commit.
@@ -496,7 +443,7 @@ public class CommitRequestManagerTest {
             new TopicPartition("topic", 1),
             new OffsetAndMetadata(0));
         long deadlineMs = time.milliseconds() + defaultApiTimeoutMs;
-        CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> commitResult 
= commitRequestManager.commitSync(Optional.of(offsets), deadlineMs);
+        CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> commitResult 
= commitRequestManager.commitSync(offsets, deadlineMs);
 
         completeOffsetCommitRequestWithError(commitRequestManager, 
Errors.UNKNOWN_MEMBER_ID);
         NetworkClientDelegate.PollResult res = 
commitRequestManager.poll(time.milliseconds());
@@ -516,7 +463,7 @@ public class CommitRequestManagerTest {
 
         // Send commit request expected to be retried on retriable errors
         CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> commitResult 
= commitRequestManager.commitSync(
-            Optional.of(offsets), time.milliseconds() + defaultApiTimeoutMs);
+            offsets, time.milliseconds() + defaultApiTimeoutMs);
         completeOffsetCommitRequestWithError(commitRequestManager, 
Errors.STALE_MEMBER_EPOCH);
         NetworkClientDelegate.PollResult res = 
commitRequestManager.poll(time.milliseconds());
         assertEquals(0, res.unsentRequests.size());
@@ -535,7 +482,6 @@ public class CommitRequestManagerTest {
     public void 
testAutoCommitAsyncFailsWithStaleMemberEpochContinuesToCommitOnTheInterval() {
         CommitRequestManager commitRequestManager = create(true, 100);
         time.sleep(100);
-        commitRequestManager.updateAutoCommitTimer(time.milliseconds());
         
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode));
         TopicPartition t1p = new TopicPartition("topic1", 0);
         subscriptionState.assignFromUser(singleton(t1p));
@@ -543,7 +489,7 @@ public class CommitRequestManagerTest {
 
         // Async commit on the interval fails with fatal stale epoch and just 
resets the timer to
         // the interval
-        commitRequestManager.maybeAutoCommitAsync();
+        commitRequestManager.updateTimerAndMaybeCommit(time.milliseconds());
         completeOffsetCommitRequestWithError(commitRequestManager, 
Errors.STALE_MEMBER_EPOCH);
         verify(commitRequestManager).resetAutoCommitTimer();
 
@@ -552,7 +498,7 @@ public class CommitRequestManagerTest {
         assertEquals(0, res.unsentRequests.size(), "No request should be 
generated until the " +
             "interval expires");
         time.sleep(100);
-        commitRequestManager.updateAutoCommitTimer(time.milliseconds());
+        commitRequestManager.updateTimerAndMaybeCommit(time.milliseconds());
         res = commitRequestManager.poll(time.milliseconds());
         assertEquals(1, res.unsentRequests.size());
 
@@ -568,7 +514,7 @@ public class CommitRequestManagerTest {
             new OffsetAndMetadata(0));
 
         // Async commit that won't be retried.
-        CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> commitResult 
= commitRequestManager.commitAsync(Optional.of(offsets));
+        CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> commitResult 
= commitRequestManager.commitAsync(offsets);
 
         NetworkClientDelegate.PollResult res = 
commitRequestManager.poll(time.milliseconds());
         assertEquals(1, res.unsentRequests.size());
@@ -590,11 +536,11 @@ public class CommitRequestManagerTest {
 
         CommitRequestManager commitRequestManager = create(true, 100);
         time.sleep(100);
-        commitRequestManager.updateAutoCommitTimer(time.milliseconds());
+        commitRequestManager.updateTimerAndMaybeCommit(time.milliseconds());
         List<NetworkClientDelegate.FutureCompletionHandler> futures = 
assertPoll(1, commitRequestManager);
 
         time.sleep(100);
-        commitRequestManager.updateAutoCommitTimer(time.milliseconds());
+        commitRequestManager.updateTimerAndMaybeCommit(time.milliseconds());
         // We want to make sure we don't resend autocommit if the previous 
request has not been
         // completed, even if the interval expired
         assertPoll(0, commitRequestManager);
@@ -602,6 +548,7 @@ public class CommitRequestManagerTest {
 
         // complete the unsent request and re-poll
         futures.get(0).onComplete(buildOffsetCommitClientResponse(new 
OffsetCommitResponse(0, new HashMap<>())));
+        commitRequestManager.updateTimerAndMaybeCommit(time.milliseconds());
         assertPoll(1, commitRequestManager);
     }
 
@@ -615,7 +562,7 @@ public class CommitRequestManagerTest {
         // Send auto-commit request on the interval.
         CommitRequestManager commitRequestManager = create(true, 100);
         time.sleep(100);
-        commitRequestManager.updateAutoCommitTimer(time.milliseconds());
+        commitRequestManager.updateTimerAndMaybeCommit(time.milliseconds());
         NetworkClientDelegate.PollResult res = 
commitRequestManager.poll(time.milliseconds());
         assertEquals(1, res.unsentRequests.size());
         NetworkClientDelegate.FutureCompletionHandler autoCommitOnInterval =
@@ -624,7 +571,7 @@ public class CommitRequestManagerTest {
         // Another auto-commit request should be sent if a revocation happens, 
even if an
         // auto-commit on the interval is in-flight.
         CompletableFuture<Void> autoCommitBeforeRevocation =
-            commitRequestManager.maybeAutoCommitSyncBeforeRevocation(200);
+            commitRequestManager.maybeAutoCommitSyncBeforeRebalance(200);
         assertEquals(1, 
commitRequestManager.pendingRequests.unsentOffsetCommits.size());
 
         // Receive response for initial auto-commit on interval
@@ -641,7 +588,7 @@ public class CommitRequestManagerTest {
 
         CommitRequestManager commitRequestManager = create(true, 100);
         time.sleep(100);
-        commitRequestManager.updateAutoCommitTimer(time.milliseconds());
+        commitRequestManager.updateTimerAndMaybeCommit(time.milliseconds());
         List<NetworkClientDelegate.FutureCompletionHandler> futures = 
assertPoll(1, commitRequestManager);
 
         // complete the unsent request to trigger interceptor
@@ -659,7 +606,7 @@ public class CommitRequestManagerTest {
 
         CommitRequestManager commitRequestManager = create(true, 100);
         time.sleep(100);
-        commitRequestManager.updateAutoCommitTimer(time.milliseconds());
+        commitRequestManager.updateTimerAndMaybeCommit(time.milliseconds());
         List<NetworkClientDelegate.FutureCompletionHandler> futures = 
assertPoll(1, commitRequestManager);
 
         // complete the unsent request to trigger interceptor
@@ -673,8 +620,7 @@ public class CommitRequestManagerTest {
     public void testAutoCommitEmptyOffsetsDoesNotGenerateRequest() {
         CommitRequestManager commitRequestManager = create(true, 100);
         time.sleep(100);
-        commitRequestManager.updateAutoCommitTimer(time.milliseconds());
-        commitRequestManager.maybeAutoCommitAsync();
+        commitRequestManager.updateTimerAndMaybeCommit(time.milliseconds());
         
assertTrue(commitRequestManager.pendingRequests.unsentOffsetCommits.isEmpty());
         verify(commitRequestManager).resetAutoCommitTimer();
     }
@@ -687,15 +633,13 @@ public class CommitRequestManagerTest {
 
         // Auto-commit of empty offsets
         time.sleep(100);
-        commitRequestManager.updateAutoCommitTimer(time.milliseconds());
-        commitRequestManager.maybeAutoCommitAsync();
+        commitRequestManager.updateTimerAndMaybeCommit(time.milliseconds());
 
         // Next auto-commit consumed offsets (not empty). Should generate a 
request, ensuring
         // that the previous auto-commit of empty did not leave the inflight 
request flag on
         subscriptionState.seek(t1p, 100);
         time.sleep(100);
-        commitRequestManager.updateAutoCommitTimer(time.milliseconds());
-        commitRequestManager.maybeAutoCommitAsync();
+        commitRequestManager.updateTimerAndMaybeCommit(time.milliseconds());
         assertEquals(1, 
commitRequestManager.pendingRequests.unsentOffsetCommits.size());
 
         verify(commitRequestManager, times(2)).resetAutoCommitTimer();
@@ -711,8 +655,7 @@ public class CommitRequestManagerTest {
 
         // Send auto-commit request that will remain in-flight without a 
response
         time.sleep(100);
-        commitRequestManager.updateAutoCommitTimer(time.milliseconds());
-        commitRequestManager.maybeAutoCommitAsync();
+        commitRequestManager.updateTimerAndMaybeCommit(time.milliseconds());
         List<NetworkClientDelegate.FutureCompletionHandler> futures = 
assertPoll(1, commitRequestManager);
         assertEquals(1, futures.size());
         NetworkClientDelegate.FutureCompletionHandler inflightCommitResult = 
futures.get(0);
@@ -723,8 +666,7 @@ public class CommitRequestManagerTest {
         // should not be reset either, to ensure that the next auto-commit is 
sent out as soon as
         // the inflight receives a response.
         time.sleep(100);
-        commitRequestManager.updateAutoCommitTimer(time.milliseconds());
-        commitRequestManager.maybeAutoCommitAsync();
+        commitRequestManager.updateTimerAndMaybeCommit(time.milliseconds());
         assertPoll(0, commitRequestManager);
         verify(commitRequestManager, never()).resetAutoCommitTimer();
 
@@ -732,6 +674,7 @@ public class CommitRequestManagerTest {
         // polling the manager.
         inflightCommitResult.onComplete(
             mockOffsetCommitResponse(t1p.topic(), t1p.partition(), (short) 1, 
Errors.NONE));
+        commitRequestManager.updateTimerAndMaybeCommit(time.milliseconds());
         assertPoll(1, commitRequestManager);
     }
 
@@ -918,7 +861,7 @@ public class CommitRequestManagerTest {
             new OffsetAndMetadata(0));
 
         // Send async commit (not expected to be retried).
-        CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> commitResult 
= commitRequestManager.commitAsync(Optional.of(offsets));
+        CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> commitResult 
= commitRequestManager.commitAsync(offsets);
         completeOffsetCommitRequestWithError(commitRequestManager, error);
         NetworkClientDelegate.PollResult res = 
commitRequestManager.poll(time.milliseconds());
         assertEquals(0, res.unsentRequests.size());
@@ -943,7 +886,7 @@ public class CommitRequestManagerTest {
 
         // Send sync offset commit request that fails with retriable error.
         long deadlineMs = time.milliseconds() + retryBackoffMs * 2;
-        CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> commitResult 
= commitRequestManager.commitSync(Optional.of(offsets), deadlineMs);
+        CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> commitResult 
= commitRequestManager.commitSync(offsets, deadlineMs);
         completeOffsetCommitRequestWithError(commitRequestManager, 
Errors.REQUEST_TIMED_OUT);
 
         // Request retried after backoff, and fails with retriable again. 
Should not complete yet
@@ -976,7 +919,7 @@ public class CommitRequestManagerTest {
 
         // Send offset commit request that fails with retriable error.
         long deadlineMs = time.milliseconds() + retryBackoffMs * 2;
-        CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> commitResult 
= commitRequestManager.commitSync(Optional.of(offsets), deadlineMs);
+        CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> commitResult 
= commitRequestManager.commitSync(offsets, deadlineMs);
         completeOffsetCommitRequestWithError(commitRequestManager, error);
 
         // Sleep to expire the request timeout. Request should fail on the 
next poll with a
@@ -1006,7 +949,7 @@ public class CommitRequestManagerTest {
 
         // Send async commit request that fails with retriable error (not 
expected to be retried).
         Errors retriableError = Errors.COORDINATOR_NOT_AVAILABLE;
-        CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> commitResult 
= commitRequestManager.commitAsync(Optional.of(offsets));
+        CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> commitResult 
= commitRequestManager.commitAsync(offsets);
         completeOffsetCommitRequestWithError(commitRequestManager, 
retriableError);
         NetworkClientDelegate.PollResult res = 
commitRequestManager.poll(time.milliseconds());
         assertEquals(0, res.unsentRequests.size());
@@ -1031,7 +974,7 @@ public class CommitRequestManagerTest {
         offsets.put(new TopicPartition("t1", 1), new OffsetAndMetadata(2));
         offsets.put(new TopicPartition("t1", 2), new OffsetAndMetadata(3));
 
-        commitRequestManager.commitSync(Optional.of(offsets), 
time.milliseconds() + defaultApiTimeoutMs);
+        commitRequestManager.commitSync(offsets, time.milliseconds() + 
defaultApiTimeoutMs);
         NetworkClientDelegate.PollResult res = 
commitRequestManager.poll(time.milliseconds());
         assertEquals(1, res.unsentRequests.size());
 
@@ -1056,7 +999,7 @@ public class CommitRequestManagerTest {
             new OffsetAndMetadata(0));
 
         long deadlineMs = time.milliseconds() + defaultApiTimeoutMs;
-        commitRequestManager.commitSync(Optional.of(offsets), deadlineMs);
+        commitRequestManager.commitSync(offsets, deadlineMs);
         NetworkClientDelegate.PollResult res = 
commitRequestManager.poll(time.milliseconds());
         assertEquals(1, res.unsentRequests.size());
         res.unsentRequests.get(0).handler().onFailure(time.milliseconds(), new 
TimeoutException());
@@ -1212,7 +1155,7 @@ public class CommitRequestManagerTest {
         long deadlineMs = time.milliseconds() + retryBackoffMs * 2;
 
         // Send commit request expected to be retried on STALE_MEMBER_EPOCH 
error while it does not expire
-        commitRequestManager.maybeAutoCommitSyncBeforeRevocation(deadlineMs);
+        commitRequestManager.maybeAutoCommitSyncBeforeRebalance(deadlineMs);
 
         int newEpoch = 8;
         String memberId = "member1";
@@ -1266,7 +1209,7 @@ public class CommitRequestManagerTest {
         // Send auto commit to revoke partitions, expected to be retried on 
STALE_MEMBER_EPOCH
         // with the latest epochs received (using long deadline to avoid 
expiring the request
         // while retrying with the new epochs)
-        
commitRequestManager.maybeAutoCommitSyncBeforeRevocation(Long.MAX_VALUE);
+        
commitRequestManager.maybeAutoCommitSyncBeforeRebalance(Long.MAX_VALUE);
 
         int initialEpoch = 1;
         String memberId = "member1";
@@ -1312,7 +1255,7 @@ public class CommitRequestManagerTest {
                 new OffsetAndMetadata(0));
 
         long commitCreationTimeMs = time.milliseconds();
-        commitRequestManager.commitAsync(Optional.of(offsets));
+        commitRequestManager.commitAsync(offsets);
 
         NetworkClientDelegate.PollResult res = 
commitRequestManager.poll(time.milliseconds());
         assertEquals(1, res.unsentRequests.size());
@@ -1475,7 +1418,7 @@ public class CommitRequestManagerTest {
         Map<TopicPartition, OffsetAndMetadata> offsets = 
Collections.singletonMap(new TopicPartition("topic", 1),
             new OffsetAndMetadata(0));
 
-        commitRequestManager.commitAsync(Optional.of(offsets));
+        commitRequestManager.commitAsync(offsets);
         commitRequestManager.signalClose();
         NetworkClientDelegate.PollResult res = 
commitRequestManager.poll(time.milliseconds());
         assertEquals(1, res.unsentRequests.size());
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java
index d42d81d7ce4..3a93c25072d 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java
@@ -126,7 +126,7 @@ public class ConsumerMembershipManagerTest {
         metrics = new Metrics(time);
         rebalanceMetricsManager = new ConsumerRebalanceMetricsManager(metrics);
 
-        
when(commitRequestManager.maybeAutoCommitSyncBeforeRevocation(anyLong())).thenReturn(CompletableFuture.completedFuture(null));
+        
when(commitRequestManager.maybeAutoCommitSyncBeforeRebalance(anyLong())).thenReturn(CompletableFuture.completedFuture(null));
     }
 
     private ConsumerMembershipManager createMembershipManagerJoiningGroup() {
@@ -143,7 +143,7 @@ public class ConsumerMembershipManagerTest {
         ConsumerMembershipManager manager = spy(new ConsumerMembershipManager(
             GROUP_ID, Optional.ofNullable(groupInstanceId), REBALANCE_TIMEOUT, 
Optional.empty(),
             subscriptionState, commitRequestManager, metadata, LOG_CONTEXT,
-            backgroundEventHandler, time, rebalanceMetricsManager));
+            backgroundEventHandler, time, rebalanceMetricsManager, true));
         assertMemberIdIsGenerated(manager.memberId());
         return manager;
     }
@@ -153,7 +153,7 @@ public class ConsumerMembershipManagerTest {
         ConsumerMembershipManager manager = spy(new ConsumerMembershipManager(
                 GROUP_ID, Optional.ofNullable(groupInstanceId), 
REBALANCE_TIMEOUT,
                 Optional.ofNullable(serverAssignor), subscriptionState, 
commitRequestManager,
-                metadata, LOG_CONTEXT, backgroundEventHandler, time, 
rebalanceMetricsManager));
+                metadata, LOG_CONTEXT, backgroundEventHandler, time, 
rebalanceMetricsManager, true));
         assertMemberIdIsGenerated(manager.memberId());
         manager.transitionToJoining();
         return manager;
@@ -232,7 +232,7 @@ public class ConsumerMembershipManagerTest {
         ConsumerMembershipManager membershipManager = new 
ConsumerMembershipManager(
                 GROUP_ID, Optional.empty(), REBALANCE_TIMEOUT, 
Optional.empty(),
                 subscriptionState, commitRequestManager, metadata, LOG_CONTEXT,
-            backgroundEventHandler, time, rebalanceMetricsManager);
+            backgroundEventHandler, time, rebalanceMetricsManager, true);
         assertEquals(MemberState.UNSUBSCRIBED, membershipManager.state());
         membershipManager.transitionToJoining();
 
@@ -309,7 +309,7 @@ public class ConsumerMembershipManagerTest {
             membershipManager.memberId());
         when(subscriptionState.hasAutoAssignedPartitions()).thenReturn(true);
         membershipManager.onHeartbeatSuccess(heartbeatResponse);
-        membershipManager.poll(time.milliseconds());
+        membershipManager.maybeReconcile(true);
         membershipManager.onHeartbeatRequestGenerated();
         assertEquals(MemberState.STABLE, membershipManager.state());
         assertEquals(MEMBER_EPOCH, membershipManager.memberEpoch());
@@ -608,8 +608,8 @@ public class ConsumerMembershipManagerTest {
         
membershipManager.onHeartbeatSuccess(createConsumerGroupHeartbeatResponse(assignment2,
 membershipManager.memberId()));
         assertEquals(MemberState.RECONCILING, membershipManager.state());
         CompletableFuture<Void> commitResult = new CompletableFuture<>();
-        
when(commitRequestManager.maybeAutoCommitSyncBeforeRevocation(anyLong())).thenReturn(commitResult);
-        membershipManager.poll(time.milliseconds());
+        
when(commitRequestManager.maybeAutoCommitSyncBeforeRebalance(anyLong())).thenReturn(commitResult);
+        membershipManager.maybeReconcile(false);
 
         // Get fenced, commit completes
         membershipManager.transitionToFenced();
@@ -627,7 +627,7 @@ public class ConsumerMembershipManagerTest {
         // We have to reconcile & ack the assignment again
         
membershipManager.onHeartbeatSuccess(createConsumerGroupHeartbeatResponse(assignment1,
 membershipManager.memberId()));
         assertEquals(MemberState.RECONCILING, membershipManager.state());
-        membershipManager.poll(time.milliseconds());
+        membershipManager.maybeReconcile(true);
         assertEquals(MemberState.ACKNOWLEDGING, membershipManager.state());
         membershipManager.onHeartbeatRequestGenerated();
         assertEquals(MemberState.STABLE, membershipManager.state());
@@ -663,7 +663,7 @@ public class ConsumerMembershipManagerTest {
         // stay in RECONCILING state, since an unresolved topic is assigned
         
membershipManager.onHeartbeatSuccess(createConsumerGroupHeartbeatResponse(assignment1,
 membershipManager.memberId()));
         assertEquals(MemberState.RECONCILING, membershipManager.state());
-        membershipManager.poll(time.milliseconds());
+        membershipManager.maybeReconcile(true);
         verifyReconciliationTriggeredAndCompleted(membershipManager,
             Collections.singletonList(new TopicIdPartition(topic1, new 
TopicPartition("topic1", 0)))
         );
@@ -679,7 +679,7 @@ public class ConsumerMembershipManagerTest {
         // Receive original assignment again - full reconciliation not 
triggered but assignment is acked again
         
membershipManager.onHeartbeatSuccess(createConsumerGroupHeartbeatResponse(assignment1,
 membershipManager.memberId()));
         assertEquals(MemberState.RECONCILING, membershipManager.state());
-        membershipManager.poll(time.milliseconds());
+        membershipManager.maybeReconcile(false);
         assertEquals(MemberState.ACKNOWLEDGING, membershipManager.state());
         verifyReconciliationNotTriggered(membershipManager);
         assertEquals(Collections.singletonMap(topic1, mkSortedSet(0)), 
membershipManager.currentAssignment().partitions);
@@ -751,8 +751,7 @@ public class ConsumerMembershipManagerTest {
         assertEquals(MemberState.RECONCILING, membershipManager.state());
         assertEquals(topic2Assignment, 
membershipManager.topicPartitionsAwaitingReconciliation());
 
-        // Next reconciliation triggered in poll
-        membershipManager.poll(time.milliseconds());
+        membershipManager.maybeReconcile(true);
 
         assertEquals(Collections.emptySet(), 
membershipManager.topicsAwaitingReconciliation());
         
verify(subscriptionState).assignFromSubscribedAwaitingCallback(topicPartitions(topic2Assignment,
 topic2Metadata), topicPartitions(topic2Assignment, topic2Metadata));
@@ -797,7 +796,7 @@ public class ConsumerMembershipManagerTest {
             );
 
         receiveAssignment(newAssignment, membershipManager);
-        membershipManager.poll(time.milliseconds());
+        membershipManager.maybeReconcile(false);
 
         verifyReconciliationNotTriggered(membershipManager);
         assertEquals(MemberState.RECONCILING, membershipManager.state());
@@ -819,8 +818,7 @@ public class ConsumerMembershipManagerTest {
         assertEquals(MemberState.RECONCILING, membershipManager.state());
         clearInvocations(membershipManager, commitRequestManager);
 
-        // Next poll should trigger final reconciliation
-        membershipManager.poll(time.milliseconds());
+        membershipManager.maybeReconcile(true);
 
         verifyReconciliationTriggeredAndCompleted(membershipManager, 
Arrays.asList(topicId1Partition0, topicId2Partition0));
     }
@@ -858,7 +856,7 @@ public class ConsumerMembershipManagerTest {
             );
 
         receiveAssignment(newAssignment, membershipManager);
-        membershipManager.poll(time.milliseconds());
+        membershipManager.maybeReconcile(false);
 
         // No full reconciliation triggered, but assignment needs to be 
acknowledged.
         assertEquals(MemberState.ACKNOWLEDGING, membershipManager.state());
@@ -881,7 +879,7 @@ public class ConsumerMembershipManagerTest {
         );
         when(metadata.topicNames()).thenReturn(fullTopicMetadata);
 
-        membershipManager.poll(time.milliseconds());
+        membershipManager.maybeReconcile(true);
 
         verifyReconciliationTriggeredAndCompleted(membershipManager, 
Arrays.asList(topicId1Partition0, topicId2Partition0));
     }
@@ -973,7 +971,7 @@ public class ConsumerMembershipManagerTest {
         receiveAssignment(topicId, Arrays.asList(0, 1), membershipManager);
 
         verifyReconciliationNotTriggered(membershipManager);
-        membershipManager.poll(time.milliseconds());
+        membershipManager.maybeReconcile(true);
 
         List<TopicIdPartition> assignedPartitions = Arrays.asList(
             new TopicIdPartition(topicId, new TopicPartition(topicName, 0)),
@@ -1207,7 +1205,7 @@ public class ConsumerMembershipManagerTest {
         receiveAssignment(topicId, Collections.singletonList(0), 
membershipManager);
 
         verifyReconciliationNotTriggered(membershipManager);
-        membershipManager.poll(time.milliseconds());
+        membershipManager.maybeReconcile(true);
 
         Set<TopicPartition> expectedAssignment = Collections.singleton(new 
TopicPartition(topicName, 0));
         assertEquals(MemberState.ACKNOWLEDGING, membershipManager.state());
@@ -1249,7 +1247,7 @@ public class ConsumerMembershipManagerTest {
 
         verifyReconciliationNotTriggered(membershipManager);
 
-        membershipManager.poll(time.milliseconds());
+        membershipManager.maybeReconcile(true);
 
         verifyReconciliationTriggeredAndCompleted(membershipManager, 
Collections.emptyList());
 
@@ -1273,7 +1271,7 @@ public class ConsumerMembershipManagerTest {
         receiveAssignment(topicId, Collections.singletonList(0), 
membershipManager);
 
         verifyReconciliationNotTriggered(membershipManager);
-        membershipManager.poll(time.milliseconds());
+        membershipManager.maybeReconcile(false);
         membershipManager.onHeartbeatRequestGenerated();
 
         assertEquals(MemberState.RECONCILING, membershipManager.state());
@@ -1301,7 +1299,7 @@ public class ConsumerMembershipManagerTest {
         when(subscriptionState.hasAutoAssignedPartitions()).thenReturn(true);
         
when(subscriptionState.rebalanceListener()).thenReturn(Optional.empty());
 
-        membershipManager.poll(time.milliseconds());
+        membershipManager.maybeReconcile(true);
 
         // Assignment should have been reconciled.
         Set<TopicPartition> expectedAssignment = Collections.singleton(new 
TopicPartition(topicName, 1));
@@ -1363,7 +1361,7 @@ public class ConsumerMembershipManagerTest {
         receiveAssignment(topicId, Arrays.asList(0, 1), membershipManager);
 
         verifyReconciliationNotTriggered(membershipManager);
-        membershipManager.poll(time.milliseconds());
+        membershipManager.maybeReconcile(true);
 
         List<TopicIdPartition> assignedPartitions = topicIdPartitions(topicId, 
topicName, 0, 1);
         verifyReconciliationTriggeredAndCompleted(membershipManager, 
assignedPartitions);
@@ -1382,7 +1380,7 @@ public class ConsumerMembershipManagerTest {
         receiveAssignment(topicId, Arrays.asList(0, 1, 2), membershipManager);
 
         verifyReconciliationNotTriggered(membershipManager);
-        membershipManager.poll(time.milliseconds());
+        membershipManager.maybeReconcile(true);
 
         List<TopicIdPartition> assignedPartitions = new ArrayList<>();
         assignedPartitions.add(ownedPartition);
@@ -1403,7 +1401,7 @@ public class ConsumerMembershipManagerTest {
         receiveAssignment(topicId, Arrays.asList(0, 1), membershipManager);
 
         verifyReconciliationNotTriggered(membershipManager);
-        membershipManager.poll(time.milliseconds());
+        membershipManager.maybeReconcile(true);
 
         verifyReconciliationTriggeredAndCompleted(membershipManager, 
expectedAssignmentReconciled);
         assertEquals(MemberState.ACKNOWLEDGING, membershipManager.state());
@@ -1436,7 +1434,7 @@ public class ConsumerMembershipManagerTest {
         receiveEmptyAssignment(membershipManager);
 
         verifyReconciliationNotTriggered(membershipManager);
-        membershipManager.poll(time.milliseconds());
+        membershipManager.maybeReconcile(true);
 
         testRevocationOfAllPartitionsCompleted(membershipManager);
         verify(subscriptionState, times(2)).markPendingRevocation(Set.of(new 
TopicPartition("topic1", 0)));
@@ -1452,8 +1450,8 @@ public class ConsumerMembershipManagerTest {
         receiveEmptyAssignment(membershipManager);
 
         verifyReconciliationNotTriggered(membershipManager);
-        
when(commitRequestManager.maybeAutoCommitSyncBeforeRevocation(anyLong())).thenReturn(commitResult);
-        membershipManager.poll(time.milliseconds());
+        
when(commitRequestManager.maybeAutoCommitSyncBeforeRebalance(anyLong())).thenReturn(commitResult);
+        membershipManager.maybeReconcile(true);
 
         // Member stays in RECONCILING while the commit request hasn't 
completed.
         assertEquals(MemberState.RECONCILING, membershipManager.state());
@@ -1465,7 +1463,7 @@ public class ConsumerMembershipManagerTest {
         commitResult.complete(null);
         InOrder inOrder = inOrder(subscriptionState, commitRequestManager);
         inOrder.verify(subscriptionState).markPendingRevocation(Set.of(new 
TopicPartition("topic1", 0)));
-        
inOrder.verify(commitRequestManager).maybeAutoCommitSyncBeforeRevocation(anyLong());
+        
inOrder.verify(commitRequestManager).maybeAutoCommitSyncBeforeRebalance(anyLong());
         inOrder.verify(subscriptionState).markPendingRevocation(Set.of(new 
TopicPartition("topic1", 0)));
 
         testRevocationOfAllPartitionsCompleted(membershipManager);
@@ -1481,7 +1479,7 @@ public class ConsumerMembershipManagerTest {
         receiveEmptyAssignment(membershipManager);
 
         verifyReconciliationNotTriggered(membershipManager);
-        membershipManager.poll(time.milliseconds());
+        membershipManager.maybeReconcile(true);
 
         // Member stays in RECONCILING while the commit request hasn't 
completed.
         assertEquals(MemberState.RECONCILING, membershipManager.state());
@@ -1512,7 +1510,7 @@ public class ConsumerMembershipManagerTest {
         receiveAssignment(topicId, Arrays.asList(1, 2), membershipManager);
 
         verifyReconciliationNotTriggered(membershipManager);
-        membershipManager.poll(time.milliseconds());
+        membershipManager.maybeReconcile(true);
 
         TreeSet<TopicPartition> expectedSet = new 
TreeSet<>(TOPIC_PARTITION_COMPARATOR);
         expectedSet.add(new TopicPartition(topicName, 1));
@@ -1546,8 +1544,7 @@ public class ConsumerMembershipManagerTest {
         String topicName = "topic1";
         mockTopicNameInMetadataCache(Collections.singletonMap(topicId, 
topicName), true);
 
-        // When the next poll is run, the member should re-trigger 
reconciliation
-        membershipManager.poll(time.milliseconds());
+        membershipManager.maybeReconcile(true);
         List<TopicIdPartition> expectedAssignmentReconciled = 
topicIdPartitions(topicId, topicName, 0, 1);
         verifyReconciliationTriggeredAndCompleted(membershipManager, 
expectedAssignmentReconciled);
         assertEquals(MemberState.ACKNOWLEDGING, membershipManager.state());
@@ -1576,7 +1573,7 @@ public class ConsumerMembershipManagerTest {
         // Next poll is run, but metadata still without the unresolved topic 
in it. Should keep
         // the unresolved and request update again.
         when(metadata.topicNames()).thenReturn(Collections.emptyMap());
-        membershipManager.poll(time.milliseconds());
+        membershipManager.maybeReconcile(false);
         verifyReconciliationNotTriggered(membershipManager);
         assertEquals(Collections.singleton(topicId), 
membershipManager.topicsAwaitingReconciliation());
         verify(metadata, times(2)).requestUpdate(anyBoolean());
@@ -1594,7 +1591,7 @@ public class ConsumerMembershipManagerTest {
         receiveAssignment(topicId, Arrays.asList(0, 1), membershipManager);
 
         verifyReconciliationNotTriggered(membershipManager);
-        membershipManager.poll(time.milliseconds());
+        membershipManager.maybeReconcile(true);
         verify(subscriptionState).markPendingRevocation(Set.of());
 
         // Member should complete reconciliation
@@ -1618,7 +1615,7 @@ public class ConsumerMembershipManagerTest {
         // Revoke one of the 2 partitions
         receiveAssignment(topicId, Collections.singletonList(1), 
membershipManager);
 
-        membershipManager.poll(time.milliseconds());
+        membershipManager.maybeReconcile(true);
         verify(subscriptionState, times(2)).markPendingRevocation(Set.of(new 
TopicPartition(topicName, 0)));
 
         // Revocation should complete without requesting any metadata update 
given that the topic
@@ -1671,7 +1668,7 @@ public class ConsumerMembershipManagerTest {
         receiveAssignment(topicId, Arrays.asList(0, 1), membershipManager);
 
         verifyReconciliationNotTriggered(membershipManager);
-        membershipManager.poll(time.milliseconds());
+        membershipManager.maybeReconcile(true);
 
         assertEquals(MemberState.RECONCILING, membershipManager.state());
         assertTrue(membershipManager.reconciliationInProgress());
@@ -1704,7 +1701,7 @@ public class ConsumerMembershipManagerTest {
         // Step 5: receive an empty assignment, which means we should call 
revoke
         
when(subscriptionState.assignedPartitions()).thenReturn(topicPartitions(topicName,
 0, 1));
         receiveEmptyAssignment(membershipManager);
-        membershipManager.poll(time.milliseconds());
+        membershipManager.maybeReconcile(true);
 
         assertEquals(MemberState.RECONCILING, membershipManager.state());
         assertTrue(membershipManager.reconciliationInProgress());
@@ -1767,7 +1764,7 @@ public class ConsumerMembershipManagerTest {
         receiveEmptyAssignment(membershipManager);
 
         verifyReconciliationNotTriggered(membershipManager);
-        membershipManager.poll(time.milliseconds());
+        membershipManager.maybeReconcile(true);
 
         assertEquals(MemberState.RECONCILING, membershipManager.state());
         assertEquals(topicIdPartitionsMap(topicId, 0), 
membershipManager.currentAssignment().partitions);
@@ -1826,7 +1823,7 @@ public class ConsumerMembershipManagerTest {
         receiveEmptyAssignment(membershipManager);
 
         verifyReconciliationNotTriggered(membershipManager);
-        membershipManager.poll(time.milliseconds());
+        membershipManager.maybeReconcile(true);
 
         assertEquals(MemberState.RECONCILING, membershipManager.state());
         assertEquals(topicIdPartitionsMap(topicId, 0), 
membershipManager.currentAssignment().partitions);
@@ -1882,7 +1879,7 @@ public class ConsumerMembershipManagerTest {
         mockPartitionOwnedAndNewPartitionAdded(topicName, partitionOwned, 
partitionAdded,
             new CounterConsumerRebalanceListener(), membershipManager);
 
-        membershipManager.poll(time.milliseconds());
+        membershipManager.maybeReconcile(true);
 
         
verify(subscriptionState).assignFromSubscribedAwaitingCallback(assignedPartitions,
 addedPartitions);
 
@@ -1914,7 +1911,7 @@ public class ConsumerMembershipManagerTest {
         mockPartitionOwnedAndNewPartitionAdded(topicName, partitionOwned, 
partitionAdded,
             listener, membershipManager);
 
-        membershipManager.poll(time.milliseconds());
+        membershipManager.maybeReconcile(true);
 
         
verify(subscriptionState).assignFromSubscribedAwaitingCallback(assignedPartitions,
 addedPartitions);
 
@@ -2226,7 +2223,7 @@ public class ConsumerMembershipManagerTest {
         receiveEmptyAssignment(membershipManager);
 
         verifyReconciliationNotTriggered(membershipManager);
-        membershipManager.poll(time.milliseconds());
+        membershipManager.maybeReconcile(true);
         membershipManager.onHeartbeatRequestGenerated();
 
         assertEquals(MemberState.STABLE, membershipManager.state());
@@ -2248,7 +2245,7 @@ public class ConsumerMembershipManagerTest {
         assertEquals(0, listener.lostCount());
 
         verifyReconciliationNotTriggered(membershipManager);
-        membershipManager.poll(time.milliseconds());
+        membershipManager.maybeReconcile(true);
         performCallback(
             membershipManager,
             invoker,
@@ -2285,7 +2282,7 @@ public class ConsumerMembershipManagerTest {
         long reconciliationDurationMs = 1234;
         time.sleep(reconciliationDurationMs);
 
-        membershipManager.poll(time.milliseconds());
+        membershipManager.maybeReconcile(true);
         // Complete commit request to complete the callback invocation
         commitResult.complete(null);
 
@@ -2317,7 +2314,7 @@ public class ConsumerMembershipManagerTest {
         
when(metadata.topicNames()).thenReturn(Collections.singletonMap(topicId, 
topicName));
         receiveAssignment(topicId, Arrays.asList(0, 1), membershipManager);
 
-        membershipManager.poll(time.milliseconds());
+        membershipManager.maybeReconcile(true);
 
         // assign partitions
         performCallback(
@@ -2338,7 +2335,7 @@ public class ConsumerMembershipManagerTest {
         
when(subscriptionState.assignedPartitions()).thenReturn(topicPartitions(topicName,
 0, 1));
         receiveAssignment(topicId, Collections.singletonList(2), 
membershipManager);
 
-        membershipManager.poll(time.milliseconds());
+        membershipManager.maybeReconcile(true);
 
         performCallback(
             membershipManager,
@@ -2396,6 +2393,14 @@ public class ConsumerMembershipManagerTest {
         assertEquals(-1d, getMetricValue(metrics, 
rebalanceMetricsManager.lastRebalanceSecondsAgo));
     }
 
+    @Test
+    public void testPollMustCallsMaybeReconcileWithFalse() {
+        ConsumerMembershipManager membershipManager = 
createMemberInStableState();
+        membershipManager.poll(time.milliseconds());
+        verify(membershipManager).maybeReconcile(false);
+        verifyReconciliationNotTriggered(membershipManager);
+    }
+
     private Object getMetricValue(Metrics metrics, MetricName name) {
         return metrics.metrics().get(name).metricValue();
     }
@@ -2409,7 +2414,7 @@ public class ConsumerMembershipManagerTest {
         receiveAssignment(topicId, partitions, membershipManager);
 
         verifyReconciliationNotTriggered(membershipManager);
-        membershipManager.poll(time.milliseconds());
+        membershipManager.maybeReconcile(true);
 
         List<TopicIdPartition> assignedPartitions =
             partitions.stream().map(tp -> new TopicIdPartition(topicId,
@@ -2424,7 +2429,7 @@ public class ConsumerMembershipManagerTest {
         receiveEmptyAssignment(membershipManager);
 
         verifyReconciliationNotTriggered(membershipManager);
-        membershipManager.poll(time.milliseconds());
+        membershipManager.maybeReconcile(true);
 
         verifyReconciliationTriggered(membershipManager);
         clearInvocations(membershipManager);
@@ -2443,7 +2448,7 @@ public class ConsumerMembershipManagerTest {
         receiveAssignment(topicId, partitions, membershipManager);
 
         verifyReconciliationNotTriggered(membershipManager);
-        membershipManager.poll(time.milliseconds());
+        membershipManager.maybeReconcile(true);
 
         verifyReconciliationTriggered(membershipManager);
         clearInvocations(membershipManager);
@@ -2464,7 +2469,7 @@ public class ConsumerMembershipManagerTest {
 
         
when(metadata.topicNames()).thenReturn(Collections.singletonMap(topicId, 
topicName));
         receiveAssignment(topicId, partitions, membershipManager);
-        membershipManager.poll(time.milliseconds());
+        membershipManager.maybeReconcile(true);
         verifyReconciliationTriggered(membershipManager);
         clearInvocations(membershipManager);
         assertEquals(MemberState.RECONCILING, membershipManager.state());
@@ -2489,7 +2494,7 @@ public class ConsumerMembershipManagerTest {
 
         
when(metadata.topicNames()).thenReturn(Collections.singletonMap(topicId, 
topicName));
         receiveAssignment(topicId, Collections.singletonList(newPartition), 
membershipManager);
-        membershipManager.poll(time.milliseconds());
+        membershipManager.maybeReconcile(true);
         verifyReconciliationTriggered(membershipManager);
         clearInvocations(membershipManager);
         assertEquals(MemberState.RECONCILING, membershipManager.state());
@@ -2552,7 +2557,7 @@ public class ConsumerMembershipManagerTest {
         if (withAutoCommit) {
             when(commitRequestManager.autoCommitEnabled()).thenReturn(true);
             CompletableFuture<Void> commitResult = new CompletableFuture<>();
-            
when(commitRequestManager.maybeAutoCommitSyncBeforeRevocation(anyLong())).thenReturn(commitResult);
+            
when(commitRequestManager.maybeAutoCommitSyncBeforeRebalance(anyLong())).thenReturn(commitResult);
             return commitResult;
         } else {
             return CompletableFuture.completedFuture(null);
@@ -2634,7 +2639,7 @@ public class ConsumerMembershipManagerTest {
         membershipManager.onHeartbeatSuccess(heartbeatResponse);
 
         if (triggerReconciliation) {
-            membershipManager.poll(time.milliseconds());
+            membershipManager.maybeReconcile(true);
             
verify(subscriptionState).assignFromSubscribedAwaitingCallback(anyCollection(), 
anyCollection());
         } else {
             verify(subscriptionState, 
never()).assignFromSubscribed(anyCollection());
@@ -2655,7 +2660,7 @@ public class ConsumerMembershipManagerTest {
         
when(subscriptionState.rebalanceListener()).thenReturn(Optional.empty());
         membershipManager.onHeartbeatSuccess(heartbeatResponse);
         assertEquals(MemberState.RECONCILING, membershipManager.state());
-        membershipManager.poll(time.milliseconds());
+        membershipManager.maybeReconcile(true);
         assertEquals(MemberState.ACKNOWLEDGING, membershipManager.state());
         membershipManager.onHeartbeatRequestGenerated();
         assertEquals(MemberState.STABLE, membershipManager.state());
@@ -2722,7 +2727,7 @@ public class ConsumerMembershipManagerTest {
         // Stale reconciliation should have been aborted and a new one should 
be triggered on the next poll.
         assertFalse(membershipManager.reconciliationInProgress());
         clearInvocations(membershipManager);
-        membershipManager.poll(time.milliseconds());
+        membershipManager.maybeReconcile(true);
         verify(membershipManager).markReconciliationInProgress();
     }
 
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java
index 911c028f728..3d55b30052b 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java
@@ -63,6 +63,7 @@ import static 
org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.eq;
@@ -168,11 +169,9 @@ public class ApplicationEventProcessorTest {
         
doReturn(true).when(subscriptionState).assignFromUser(Collections.singleton(tp));
         processor.process(event);
         if (withGroupId) {
-            verify(commitRequestManager).updateAutoCommitTimer(currentTimeMs);
-            verify(commitRequestManager).maybeAutoCommitAsync();
+            
verify(commitRequestManager).updateTimerAndMaybeCommit(currentTimeMs);
         } else {
-            verify(commitRequestManager, 
never()).updateAutoCommitTimer(currentTimeMs);
-            verify(commitRequestManager, never()).maybeAutoCommitAsync();
+            verify(commitRequestManager, 
never()).updateTimerAndMaybeCommit(currentTimeMs);
         }
         verify(metadata).requestUpdateForNewTopics();
         verify(subscriptionState).assignFromUser(Collections.singleton(tp));
@@ -241,7 +240,8 @@ public class ApplicationEventProcessorTest {
         setupProcessor(true);
         
when(heartbeatRequestManager.membershipManager()).thenReturn(membershipManager);
         processor.process(event);
-        verify(commitRequestManager).updateAutoCommitTimer(12345);
+        assertTrue(event.reconcileAndAutoCommit().isDone());
+        verify(commitRequestManager).updateTimerAndMaybeCommit(12345);
         verify(membershipManager).onConsumerPoll();
         verify(heartbeatRequestManager).resetPollTimer(12345);
     }
@@ -442,18 +442,35 @@ public class ApplicationEventProcessorTest {
         assertEquals(mixedSubscriptionError, thrown);
     }
 
-    @ParameterizedTest
-    @MethodSource("offsetsGenerator")
-    public void testSyncCommitEvent(Optional<Map<TopicPartition, 
OffsetAndMetadata>> offsets) {
-        SyncCommitEvent event = new SyncCommitEvent(offsets, 12345);
+    @Test
+    public void testSyncCommitEventWithEmptyOffsets() {
+        Map<TopicPartition, OffsetAndMetadata> allConsumed =
+            Map.of(new TopicPartition("topic", 0), new OffsetAndMetadata(10, 
Optional.of(1), ""));
+        SyncCommitEvent event = new SyncCommitEvent(Optional.empty(), 12345);
+        setupProcessor(true);
+        doReturn(allConsumed).when(subscriptionState).allConsumed();
+        
doReturn(CompletableFuture.completedFuture(allConsumed)).when(commitRequestManager).commitSync(allConsumed,
 12345);
 
+        processor.process(event);
+        verify(commitRequestManager).commitSync(allConsumed, 12345);
+        assertTrue(event.offsetsReady.isDone());
+        Map<TopicPartition, OffsetAndMetadata> committedOffsets = 
assertDoesNotThrow(() -> event.future().get());
+        assertEquals(allConsumed, committedOffsets);
+    }
+
+    @Test
+    public void testSyncCommitEvent() {
+        Map<TopicPartition, OffsetAndMetadata> offsets =
+            Map.of(new TopicPartition("topic", 0), new OffsetAndMetadata(10, 
Optional.of(1), ""));
+        SyncCommitEvent event = new SyncCommitEvent(Optional.of(offsets), 
12345);
         setupProcessor(true);
-        
doReturn(CompletableFuture.completedFuture(offsets.orElse(Map.of()))).when(commitRequestManager).commitSync(offsets,
 12345);
+        
doReturn(CompletableFuture.completedFuture(offsets)).when(commitRequestManager).commitSync(offsets,
 12345);
 
         processor.process(event);
         verify(commitRequestManager).commitSync(offsets, 12345);
+        assertTrue(event.offsetsReady.isDone());
         Map<TopicPartition, OffsetAndMetadata> committedOffsets = 
assertDoesNotThrow(() -> event.future().get());
-        assertEquals(offsets.orElse(Map.of()), committedOffsets);
+        assertEquals(offsets, committedOffsets);
     }
 
     @Test
@@ -475,22 +492,40 @@ public class ApplicationEventProcessorTest {
         doReturn(future).when(commitRequestManager).commitSync(any(), 
anyLong());
         processor.process(event);
 
-        verify(commitRequestManager).commitSync(Optional.empty(), 12345);
+        verify(commitRequestManager).commitSync(Collections.emptyMap(), 12345);
+        assertTrue(event.offsetsReady.isDone());
         assertFutureThrows(event.future(), IllegalStateException.class);
     }
 
-    @ParameterizedTest
-    @MethodSource("offsetsGenerator")
-    public void testAsyncCommitEventWithOffsets(Optional<Map<TopicPartition, 
OffsetAndMetadata>> offsets) {
-        AsyncCommitEvent event = new AsyncCommitEvent(offsets);
+    @Test
+    public void testAsyncCommitEventWithEmptyOffsets() {
+        Map<TopicPartition, OffsetAndMetadata> allConsumed =
+            Map.of(new TopicPartition("topic", 0), new OffsetAndMetadata(10, 
Optional.of(1), ""));
+        AsyncCommitEvent event = new AsyncCommitEvent(Optional.empty());
+        setupProcessor(true);
+        
doReturn(CompletableFuture.completedFuture(allConsumed)).when(commitRequestManager).commitAsync(allConsumed);
+        doReturn(allConsumed).when(subscriptionState).allConsumed();
+
+        processor.process(event);
+        verify(commitRequestManager).commitAsync(allConsumed);
+        assertTrue(event.offsetsReady.isDone());
+        Map<TopicPartition, OffsetAndMetadata> committedOffsets = 
assertDoesNotThrow(() -> event.future().get());
+        assertEquals(allConsumed, committedOffsets);
+    }
 
+    @Test
+    public void testAsyncCommitEvent() {
+        Map<TopicPartition, OffsetAndMetadata> offsets =
+            Map.of(new TopicPartition("topic", 0), new OffsetAndMetadata(10, 
Optional.of(1), ""));
+        AsyncCommitEvent event = new AsyncCommitEvent(Optional.of(offsets));
         setupProcessor(true);
-        
doReturn(CompletableFuture.completedFuture(offsets.orElse(Map.of()))).when(commitRequestManager).commitAsync(offsets);
+        
doReturn(CompletableFuture.completedFuture(offsets)).when(commitRequestManager).commitAsync(offsets);
 
         processor.process(event);
         verify(commitRequestManager).commitAsync(offsets);
+        assertTrue(event.offsetsReady.isDone());
         Map<TopicPartition, OffsetAndMetadata> committedOffsets = 
assertDoesNotThrow(() -> event.future().get());
-        assertEquals(offsets.orElse(Map.of()), committedOffsets);
+        assertEquals(offsets, committedOffsets);
     }
 
     @Test
@@ -507,22 +542,17 @@ public class ApplicationEventProcessorTest {
         AsyncCommitEvent event = new AsyncCommitEvent(Optional.empty());
 
         setupProcessor(true);
+        doReturn(Collections.emptyMap()).when(subscriptionState).allConsumed();
         CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future = new 
CompletableFuture<>();
         future.completeExceptionally(new IllegalStateException());
         doReturn(future).when(commitRequestManager).commitAsync(any());
         processor.process(event);
 
-        verify(commitRequestManager).commitAsync(Optional.empty());
+        verify(commitRequestManager).commitAsync(Collections.emptyMap());
+        assertTrue(event.offsetsReady.isDone());
         assertFutureThrows(event.future(), IllegalStateException.class);
     }
 
-    private static Stream<Arguments> offsetsGenerator() {
-        return Stream.of(
-            Arguments.of(Optional.empty()),
-            Arguments.of(Optional.of(Map.of(new TopicPartition("topic", 0), 
new OffsetAndMetadata(10, Optional.of(1), ""))))
-        );
-    }
-
     private List<NetworkClientDelegate.UnsentRequest> mockCommitResults() {
         return 
Collections.singletonList(mock(NetworkClientDelegate.UnsentRequest.class));
     }

Reply via email to