This is an automated email from the ASF dual-hosted git repository.

lianetm pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b213c64f97f KAFKA-17480: New consumer commit all consumed should retrieve offsets in background thread (#17150)
b213c64f97f is described below

commit b213c64f97fa6b4ab3b79b828e37095603841d6f
Author: PoAn Yang <[email protected]>
AuthorDate: Thu Nov 7 22:45:44 2024 +0800

    KAFKA-17480: New consumer commit all consumed should retrieve offsets in background thread (#17150)
    
    Reviewers: Lianet Magrans <[email protected]>, Kirk True <[email protected]>, TengYao Chi <[email protected]>
---
 .../consumer/internals/AsyncKafkaConsumer.java     |  59 +++----
 .../consumer/internals/CommitRequestManager.java   |  72 +++++---
 .../consumer/internals/RequestManagers.java        |   3 +-
 .../events/ApplicationEventProcessor.java          |  33 ++--
 .../internals/events/AsyncCommitEvent.java         |   4 +-
 .../consumer/internals/events/CommitEvent.java     |  19 ++-
 .../consumer/internals/events/SyncCommitEvent.java |   5 +-
 .../consumer/internals/AsyncKafkaConsumerTest.java | 163 ++++++------------
 .../internals/CommitRequestManagerTest.java        | 182 ++++++++++++++++++---
 .../events/ApplicationEventProcessorTest.java      | 115 ++++++++++++-
 10 files changed, 434 insertions(+), 221 deletions(-)

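For orientation before the per-file hunks: the caller-visible effect is that the no-argument commitSync()/commitAsync() overloads no longer snapshot subscriptions.allConsumed() on the application thread; the background thread resolves the offsets when it processes the commit event, and callbacks and interceptors receive the offsets that were actually committed. A minimal sketch of the affected public API, assuming a hypothetical broker at localhost:9092 and a hypothetical topic "demo" (neither is part of this commit):

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    import java.time.Duration;
    import java.util.List;
    import java.util.Properties;

    public class CommitAllConsumedSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");
            props.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, "consumer"); // selects the new AsyncKafkaConsumer
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(List.of("demo"));
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                System.out.println("polled " + records.count() + " records");

                // No offsets passed: with this patch the background thread computes
                // allConsumed() at commit time instead of the application thread doing it here.
                consumer.commitSync();

                // Async flavor: the callback receives the offsets actually committed.
                consumer.commitAsync((offsets, exception) -> {
                    if (exception != null) {
                        System.err.println("commit failed: " + exception.getMessage());
                    } else {
                        System.out.println("committed: " + offsets);
                    }
                });
            }
        }
    }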
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
index c1e84a4eae5..7fda9a20c05 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
@@ -249,7 +249,7 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
     private final OffsetCommitCallbackInvoker offsetCommitCallbackInvoker;
     // Last triggered async commit future. Used to wait until all previous async commits are completed.
     // We only need to keep track of the last one, since they are guaranteed to complete in order.
-    private CompletableFuture<Void> lastPendingAsyncCommit = null;
+    private CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> lastPendingAsyncCommit = null;
 
     // currentThread holds the threadId of the current thread accessing the AsyncKafkaConsumer
     // and is used to prevent multithreaded access
@@ -752,43 +752,43 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
 
     @Override
     public void commitAsync(OffsetCommitCallback callback) {
-        commitAsync(subscriptions.allConsumed(), callback);
+        commitAsync(Optional.empty(), callback);
     }
 
     @Override
     public void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) {
+        commitAsync(Optional.of(offsets), callback);
+    }
+
+    private void commitAsync(Optional<Map<TopicPartition, OffsetAndMetadata>> offsets, OffsetCommitCallback callback) {
         acquireAndEnsureOpen();
         try {
             AsyncCommitEvent asyncCommitEvent = new AsyncCommitEvent(offsets);
-            lastPendingAsyncCommit = commit(asyncCommitEvent).whenComplete((r, t) -> {
+            lastPendingAsyncCommit = commit(asyncCommitEvent).whenComplete((committedOffsets, throwable) -> {
 
-                if (t == null) {
-                    offsetCommitCallbackInvoker.enqueueInterceptorInvocation(offsets);
+                if (throwable == null) {
+                    offsetCommitCallbackInvoker.enqueueInterceptorInvocation(committedOffsets);
                 }
 
                 if (callback == null) {
-                    if (t != null) {
-                        log.error("Offset commit with offsets {} failed", offsets, t);
+                    if (throwable != null) {
+                        log.error("Offset commit with offsets {} failed", committedOffsets, throwable);
                     }
                     return;
                 }
 
-                offsetCommitCallbackInvoker.enqueueUserCallbackInvocation(callback, offsets, (Exception) t);
+                offsetCommitCallbackInvoker.enqueueUserCallbackInvocation(callback, committedOffsets, (Exception) throwable);
             });
         } finally {
             release();
         }
     }
 
-    private CompletableFuture<Void> commit(final CommitEvent commitEvent) {
+    private CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> commit(final CommitEvent commitEvent) {
         maybeThrowInvalidGroupIdException();
         offsetCommitCallbackInvoker.executeCallbacks();
 
-        Map<TopicPartition, OffsetAndMetadata> offsets = commitEvent.offsets();
-        log.debug("Committing offsets: {}", offsets);
-        offsets.forEach(this::updateLastSeenEpochIfNewer);
-
-        if (offsets.isEmpty()) {
+        if (commitEvent.offsets().isPresent() && commitEvent.offsets().get().isEmpty()) {
             return CompletableFuture.completedFuture(null);
         }
 
@@ -828,7 +828,6 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
             } else {
                 log.info("Seeking to offset {} for partition {}", offset, partition);
             }
-            updateLastSeenEpochIfNewer(partition, offsetAndMetadata);
 
             Timer timer = time.timer(defaultApiTimeoutMs);
             SeekUnvalidatedEvent seekUnvalidatedEventEvent = new SeekUnvalidatedEvent(
@@ -914,9 +913,7 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
                 calculateDeadlineMs(time, timeout));
             wakeupTrigger.setActiveTask(event.future());
             try {
-                final Map<TopicPartition, OffsetAndMetadata> committedOffsets = applicationEventHandler.addAndGet(event);
-                committedOffsets.forEach(this::updateLastSeenEpochIfNewer);
-                return committedOffsets;
+                return applicationEventHandler.addAndGet(event);
             } catch (TimeoutException e) {
                 throw new TimeoutException("Timeout of " + timeout.toMillis() + "ms expired before the last " +
                     "committed offset for partitions " + partitions + " could be determined. Try tuning " +
@@ -1294,13 +1291,12 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
 
     // Visible for testing
     void commitSyncAllConsumed(final Timer timer) {
-        Map<TopicPartition, OffsetAndMetadata> allConsumed = subscriptions.allConsumed();
-        log.debug("Sending synchronous auto-commit of offsets {} on closing", allConsumed);
+        log.debug("Sending synchronous auto-commit on closing");
         try {
-            commitSync(allConsumed, Duration.ofMillis(timer.remainingMs()));
+            commitSync(Duration.ofMillis(timer.remainingMs()));
         } catch (Exception e) {
             // consistent with async auto-commit failures, we do not propagate the exception
-            log.warn("Synchronous auto-commit of offsets {} failed: {}", allConsumed, e.getMessage());
+            log.warn("Synchronous auto-commit failed", e);
         }
         timer.update();
     }
@@ -1318,28 +1314,32 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
      */
     @Override
     public void commitSync(final Duration timeout) {
-        commitSync(subscriptions.allConsumed(), timeout);
+        commitSync(Optional.empty(), timeout);
     }
 
     @Override
     public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets) {
-        commitSync(offsets, Duration.ofMillis(defaultApiTimeoutMs));
+        commitSync(Optional.of(offsets), Duration.ofMillis(defaultApiTimeoutMs));
     }
 
     @Override
     public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets, Duration timeout) {
+        commitSync(Optional.of(offsets), timeout);
+    }
+
+    private void commitSync(Optional<Map<TopicPartition, OffsetAndMetadata>> offsets, Duration timeout) {
         acquireAndEnsureOpen();
         long commitStart = time.nanoseconds();
         try {
             SyncCommitEvent syncCommitEvent = new SyncCommitEvent(offsets, calculateDeadlineMs(time, timeout));
-            CompletableFuture<Void> commitFuture = commit(syncCommitEvent);
+            CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> commitFuture = commit(syncCommitEvent);
 
             Timer requestTimer = time.timer(timeout.toMillis());
             awaitPendingAsyncCommitsAndExecuteCommitCallbacks(requestTimer, true);
 
             wakeupTrigger.setActiveTask(commitFuture);
-            ConsumerUtils.getResult(commitFuture, requestTimer);
-            interceptors.onCommit(offsets);
+            Map<TopicPartition, OffsetAndMetadata> committedOffsets = ConsumerUtils.getResult(commitFuture, requestTimer);
+            interceptors.onCommit(committedOffsets);
         } finally {
             wakeupTrigger.clearTask();
             kafkaConsumerMetrics.recordCommitSync(time.nanoseconds() - commitStart);
@@ -1588,11 +1588,6 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
         return groupMetadata.get().isPresent();
     }
 
-    private void updateLastSeenEpochIfNewer(TopicPartition topicPartition, OffsetAndMetadata offsetAndMetadata) {
-        if (offsetAndMetadata != null)
-            offsetAndMetadata.leaderEpoch().ifPresent(epoch -> metadata.updateLastSeenEpochIfNewer(topicPartition, epoch));
-    }
-
     /**
      * This method signals the background thread to {@link CreateFetchRequestsEvent create fetch requests}.
     *
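
Taken together, the AsyncKafkaConsumer changes above reduce to one pattern: Optional.empty() means "commit whatever has been consumed", and only an explicitly supplied empty map short-circuits. A self-contained sketch of that control flow with simplified stand-in types (String keys and Long offsets instead of TopicPartition/OffsetAndMetadata), not the real consumer code:

    import java.util.Map;
    import java.util.Optional;
    import java.util.concurrent.CompletableFuture;
    import java.util.function.Supplier;

    class CommitPathSketch {
        // allConsumed stands in for subscriptions.allConsumed(), evaluated lazily
        // on the background thread rather than eagerly on the caller thread.
        static CompletableFuture<Map<String, Long>> commit(Optional<Map<String, Long>> offsets,
                                                           Supplier<Map<String, Long>> allConsumed) {
            // Mirrors commit(CommitEvent): an explicitly supplied empty map is a no-op.
            if (offsets.isPresent() && offsets.get().isEmpty()) {
                return CompletableFuture.completedFuture(Map.of());
            }
            // Mirrors CommitRequestManager: an empty Optional resolves at send time.
            Map<String, Long> toCommit = offsets.orElseGet(allConsumed);
            return CompletableFuture.completedFuture(toCommit); // network round trip elided
        }

        public static void main(String[] args) {
            Supplier<Map<String, Long>> consumedNow = () -> Map.of("demo-0", 42L);
            System.out.println(commit(Optional.empty(), consumedNow).join());      // {demo-0=42}
            System.out.println(commit(Optional.of(Map.of()), consumedNow).join()); // {}
        }
    }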
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
index dfa2520a02b..a5cb4753b38 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
@@ -72,6 +72,7 @@ import static org.apache.kafka.common.protocol.Errors.COORDINATOR_LOAD_IN_PROGRE
 public class CommitRequestManager implements RequestManager, MemberStateListener {
     private final Time time;
     private final SubscriptionState subscriptions;
+    private final ConsumerMetadata metadata;
     private final LogContext logContext;
     private final Logger log;
     private final Optional<AutoCommitState> autoCommitState;
@@ -102,15 +103,16 @@ public class CommitRequestManager implements RequestManager, MemberStateListener
     private final MemberInfo memberInfo;
 
     public CommitRequestManager(
-            final Time time,
-            final LogContext logContext,
-            final SubscriptionState subscriptions,
-            final ConsumerConfig config,
-            final CoordinatorRequestManager coordinatorRequestManager,
-            final OffsetCommitCallbackInvoker offsetCommitCallbackInvoker,
-            final String groupId,
-            final Optional<String> groupInstanceId,
-            final Metrics metrics) {
+        final Time time,
+        final LogContext logContext,
+        final SubscriptionState subscriptions,
+        final ConsumerConfig config,
+        final CoordinatorRequestManager coordinatorRequestManager,
+        final OffsetCommitCallbackInvoker offsetCommitCallbackInvoker,
+        final String groupId,
+        final Optional<String> groupInstanceId,
+        final Metrics metrics,
+        final ConsumerMetadata metadata) {
         this(time,
             logContext,
             subscriptions,
@@ -122,7 +124,8 @@ public class CommitRequestManager implements RequestManager, MemberStateListener
             config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG),
             config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG),
             OptionalDouble.empty(),
-            metrics);
+            metrics,
+            metadata);
     }
 
     // Visible for testing
@@ -138,7 +141,8 @@ public class CommitRequestManager implements RequestManager, MemberStateListener
         final long retryBackoffMs,
         final long retryBackoffMaxMs,
         final OptionalDouble jitter,
-        final Metrics metrics) {
+        final Metrics metrics,
+        final ConsumerMetadata metadata) {
         Objects.requireNonNull(coordinatorRequestManager, "Coordinator is needed upon committing offsets");
         this.time = time;
         this.logContext = logContext;
@@ -155,6 +159,7 @@ public class CommitRequestManager implements RequestManager, MemberStateListener
         this.groupId = groupId;
         this.groupInstanceId = groupInstanceId;
         this.subscriptions = subscriptions;
+        this.metadata = metadata;
         this.retryBackoffMs = retryBackoffMs;
         this.retryBackoffMaxMs = retryBackoffMaxMs;
         this.jitter = jitter;
@@ -381,20 +386,22 @@ public class CommitRequestManager implements RequestManager, MemberStateListener
      * exceptionally depending on the response. If the request fails with a retriable error, the
      * future will be completed with a {@link RetriableCommitFailedException}.
      */
-    public CompletableFuture<Void> commitAsync(final Map<TopicPartition, OffsetAndMetadata> offsets) {
-        if (offsets.isEmpty()) {
+    public CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> commitAsync(final Optional<Map<TopicPartition, OffsetAndMetadata>> offsets) {
+        Map<TopicPartition, OffsetAndMetadata> commitOffsets = offsets.orElseGet(subscriptions::allConsumed);
+        if (commitOffsets.isEmpty()) {
             log.debug("Skipping commit of empty offsets");
-            return CompletableFuture.completedFuture(null);
+            return CompletableFuture.completedFuture(Map.of());
         }
-        OffsetCommitRequestState commitRequest = createOffsetCommitRequest(offsets, Long.MAX_VALUE);
+        maybeUpdateLastSeenEpochIfNewer(commitOffsets);
+        OffsetCommitRequestState commitRequest = createOffsetCommitRequest(commitOffsets, Long.MAX_VALUE);
         pendingRequests.addOffsetCommitRequest(commitRequest);
 
-        CompletableFuture<Void> asyncCommitResult = new CompletableFuture<>();
+        CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> asyncCommitResult = new CompletableFuture<>();
         commitRequest.future.whenComplete((committedOffsets, error) -> {
             if (error != null) {
                 asyncCommitResult.completeExceptionally(commitAsyncExceptionForError(error));
             } else {
-                asyncCommitResult.complete(null);
+                asyncCommitResult.complete(commitOffsets);
             }
         });
         return asyncCommitResult;
@@ -403,15 +410,20 @@ public class CommitRequestManager implements RequestManager, MemberStateListener
     /**
      * Commit offsets, retrying on expected retriable errors while the retry timeout hasn't expired.
      *
-     * @param offsets               Offsets to commit
-     * @param deadlineMs            Time until which the request will be retried if it fails with
-     *                              an expected retriable error.
+     * @param offsets    Offsets to commit
+     * @param deadlineMs Time until which the request will be retried if it fails with
+     *                   an expected retriable error.
      * @return Future that will complete when a successful response
     */
-    public CompletableFuture<Void> commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets,
-                                              final long deadlineMs) {
-        CompletableFuture<Void> result = new CompletableFuture<>();
-        OffsetCommitRequestState requestState = createOffsetCommitRequest(offsets, deadlineMs);
+    public CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> commitSync(final Optional<Map<TopicPartition, OffsetAndMetadata>> offsets,
+                                                                                final long deadlineMs) {
+        Map<TopicPartition, OffsetAndMetadata> commitOffsets = offsets.orElseGet(subscriptions::allConsumed);
+        if (commitOffsets.isEmpty()) {
+            return CompletableFuture.completedFuture(Map.of());
+        }
+        maybeUpdateLastSeenEpochIfNewer(commitOffsets);
+        CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> result = new CompletableFuture<>();
+        OffsetCommitRequestState requestState = createOffsetCommitRequest(commitOffsets, deadlineMs);
         commitSyncWithRetries(requestState, result);
         return result;
     }
@@ -439,14 +451,14 @@ public class CommitRequestManager implements RequestManager, MemberStateListener
     }
 
     private void commitSyncWithRetries(OffsetCommitRequestState requestAttempt,
-                                       CompletableFuture<Void> result) {
+                                       CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> result) {
         pendingRequests.addOffsetCommitRequest(requestAttempt);
 
         // Retry the same commit request while it fails with RetriableException and the retry
         // timeout hasn't expired.
         requestAttempt.future.whenComplete((res, error) -> {
             if (error == null) {
-                result.complete(null);
+                result.complete(requestAttempt.offsets);
             } else {
                 if (error instanceof RetriableException) {
                     if (requestAttempt.isExpired()) {
@@ -531,6 +543,7 @@ public class CommitRequestManager implements RequestManager, MemberStateListener
                     "outbound buffer:" + fetchRequest);
             }
             if (error == null) {
+                maybeUpdateLastSeenEpochIfNewer(res);
                 result.complete(res);
             } else {
                 if (error instanceof RetriableException || isStaleEpochErrorAndValidEpochAvailable(error)) {
@@ -615,6 +628,13 @@ public class CommitRequestManager implements RequestManager, MemberStateListener
         return new NetworkClientDelegate.PollResult(Long.MAX_VALUE, requests);
     }
 
+    private void maybeUpdateLastSeenEpochIfNewer(final Map<TopicPartition, OffsetAndMetadata> offsets) {
+        offsets.forEach((topicPartition, offsetAndMetadata) -> {
+            if (offsetAndMetadata != null)
+                offsetAndMetadata.leaderEpoch().ifPresent(epoch -> metadata.updateLastSeenEpochIfNewer(topicPartition, epoch));
+        });
+    }
+
     class OffsetCommitRequestState extends RetriableRequestState {
         private Map<TopicPartition, OffsetAndMetadata> offsets;
         private final String groupId;
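
The new maybeUpdateLastSeenEpochIfNewer helper centralizes what the consumer previously did on the application thread: fold leader epochs from committed or fetched offsets back into the metadata, keeping only newer epochs. An illustrative stand-alone sketch of that merge semantics, with stand-in types and hypothetical partition names (the real code uses ConsumerMetadata and OffsetAndMetadata):

    import java.util.Map;
    import java.util.Optional;
    import java.util.concurrent.ConcurrentHashMap;

    class EpochFoldSketch {
        // Stand-in for ConsumerMetadata.updateLastSeenEpochIfNewer: keep the max epoch per partition.
        static final Map<String, Integer> lastSeenEpoch = new ConcurrentHashMap<>();

        record Offset(long offset, Optional<Integer> leaderEpoch) {}

        static void maybeUpdateLastSeenEpochIfNewer(Map<String, Offset> offsets) {
            offsets.forEach((partition, meta) -> {
                if (meta != null)
                    meta.leaderEpoch().ifPresent(epoch ->
                        lastSeenEpoch.merge(partition, epoch, Math::max));
            });
        }

        public static void main(String[] args) {
            maybeUpdateLastSeenEpochIfNewer(Map.of("demo-0", new Offset(42L, Optional.of(7))));
            maybeUpdateLastSeenEpochIfNewer(Map.of("demo-0", new Offset(43L, Optional.of(5))));
            System.out.println(lastSeenEpoch); // {demo-0=7} -- the older epoch 5 did not win
        }
    }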
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
index 7339a49ed6d..7f682c81fc4 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
@@ -205,7 +205,8 @@ public class RequestManagers implements Closeable {
                             offsetCommitCallbackInvoker,
                             groupRebalanceConfig.groupId,
                             groupRebalanceConfig.groupInstanceId,
-                            metrics);
+                            metrics,
+                            metadata);
                     membershipManager = new ConsumerMembershipManager(
                             groupRebalanceConfig.groupId,
                             groupRebalanceConfig.groupInstanceId,
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
index 2879e471c7a..642e5e0cca2 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
@@ -198,29 +198,41 @@ public class ApplicationEventProcessor implements EventProcessor<ApplicationEven
     }
 
     private void process(final AsyncCommitEvent event) {
-        if (!requestManagers.commitRequestManager.isPresent()) {
+        if (requestManagers.commitRequestManager.isEmpty()) {
+            event.future().completeExceptionally(new KafkaException("Unable to async commit " +
+                "offset because the CommitRequestManager is not available. Check if group.id was set correctly"));
             return;
         }
 
-        CommitRequestManager manager = requestManagers.commitRequestManager.get();
-        CompletableFuture<Void> future = manager.commitAsync(event.offsets());
-        future.whenComplete(complete(event.future()));
+        try {
+            CommitRequestManager manager = requestManagers.commitRequestManager.get();
+            CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future = manager.commitAsync(event.offsets());
+            future.whenComplete(complete(event.future()));
+        } catch (Exception e) {
+            event.future().completeExceptionally(e);
+        }
     }
 
     private void process(final SyncCommitEvent event) {
-        if (!requestManagers.commitRequestManager.isPresent()) {
+        if (requestManagers.commitRequestManager.isEmpty()) {
+            event.future().completeExceptionally(new KafkaException("Unable to sync commit " +
+                "offset because the CommitRequestManager is not available. Check if group.id was set correctly"));
             return;
         }
 
-        CommitRequestManager manager = requestManagers.commitRequestManager.get();
-        CompletableFuture<Void> future = manager.commitSync(event.offsets(), event.deadlineMs());
-        future.whenComplete(complete(event.future()));
+        try {
+            CommitRequestManager manager = requestManagers.commitRequestManager.get();
+            CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future = manager.commitSync(event.offsets(), event.deadlineMs());
+            future.whenComplete(complete(event.future()));
+        } catch (Exception e) {
+            event.future().completeExceptionally(e);
+        }
     }
 
     private void process(final FetchCommittedOffsetsEvent event) {
-        if (!requestManagers.commitRequestManager.isPresent()) {
+        if (requestManagers.commitRequestManager.isEmpty()) {
             event.future().completeExceptionally(new KafkaException("Unable to fetch committed " +
-                    "offset because the CommittedRequestManager is not available. Check if group.id was set correctly"));
+                    "offset because the CommitRequestManager is not available. Check if group.id was set correctly"));
             return;
         }
         CommitRequestManager manager = requestManagers.commitRequestManager.get();
@@ -523,6 +535,7 @@ public class ApplicationEventProcessor implements EventProcessor<ApplicationEven
 
     private void process(final SeekUnvalidatedEvent event) {
         try {
+            event.offsetEpoch().ifPresent(epoch -> metadata.updateLastSeenEpochIfNewer(event.partition(), epoch));
             SubscriptionState.FetchPosition newPosition = new SubscriptionState.FetchPosition(
                     event.offset(),
                     event.offsetEpoch(),
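
The processor changes share one failure-propagation pattern: instead of returning silently when the CommitRequestManager is absent (or when building the request throws), the event's future is completed exceptionally so the application thread observes the error rather than blocking until timeout. A self-contained sketch of that pattern, with a plain RuntimeException standing in for KafkaException and hypothetical stand-in types:

    import java.util.Map;
    import java.util.Optional;
    import java.util.concurrent.CompletableFuture;

    class EventProcessorSketch {
        interface CommitManager {
            CompletableFuture<Map<String, Long>> commit(Optional<Map<String, Long>> offsets);
        }

        static CompletableFuture<Map<String, Long>> process(Optional<CommitManager> manager,
                                                            Optional<Map<String, Long>> offsets) {
            CompletableFuture<Map<String, Long>> eventFuture = new CompletableFuture<>();
            if (manager.isEmpty()) {
                // Previously: silent return, leaving the caller waiting until its timeout.
                eventFuture.completeExceptionally(
                    new RuntimeException("CommitRequestManager is not available. Check if group.id was set correctly"));
                return eventFuture;
            }
            try {
                manager.get().commit(offsets).whenComplete((result, error) -> {
                    if (error != null) eventFuture.completeExceptionally(error);
                    else eventFuture.complete(result);
                });
            } catch (Exception e) {
                // e.g. validation errors thrown while building the request
                eventFuture.completeExceptionally(e);
            }
            return eventFuture;
        }
    }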
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AsyncCommitEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AsyncCommitEvent.java
index c36f0534b36..238aa1979d0 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AsyncCommitEvent.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AsyncCommitEvent.java
@@ -20,13 +20,15 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
 
 import java.util.Map;
+import java.util.Optional;
 
 /**
  * Event to commit offsets without waiting for a response, so the request won't be retried.
+ * If no offsets are provided, this event will commit all consumed offsets.
  */
 public class AsyncCommitEvent extends CommitEvent {
 
-    public AsyncCommitEvent(final Map<TopicPartition, OffsetAndMetadata> offsets) {
+    public AsyncCommitEvent(final Optional<Map<TopicPartition, OffsetAndMetadata>> offsets) {
         super(Type.COMMIT_ASYNC, offsets, Long.MAX_VALUE);
     }
 }
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CommitEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CommitEvent.java
index dc863b0ee65..9c94d3b9d3b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CommitEvent.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CommitEvent.java
@@ -21,15 +21,16 @@ import org.apache.kafka.common.TopicPartition;
 
 import java.util.Collections;
 import java.util.Map;
+import java.util.Optional;
 
-public abstract class CommitEvent extends CompletableApplicationEvent<Void> {
+public abstract class CommitEvent extends CompletableApplicationEvent<Map<TopicPartition, OffsetAndMetadata>> {
 
     /**
      * Offsets to commit per partition.
      */
-    private final Map<TopicPartition, OffsetAndMetadata> offsets;
+    private final Optional<Map<TopicPartition, OffsetAndMetadata>> offsets;
 
-    protected CommitEvent(final Type type, final Map<TopicPartition, OffsetAndMetadata> offsets, final long deadlineMs) {
+    protected CommitEvent(final Type type, final Optional<Map<TopicPartition, OffsetAndMetadata>> offsets, final long deadlineMs) {
         super(type, deadlineMs);
         this.offsets = validate(offsets);
     }
@@ -38,17 +39,21 @@ public abstract class CommitEvent extends CompletableApplicationEvent<Void> {
      * Validates the offsets are not negative and then returns the given offset map as
      * {@link Collections#unmodifiableMap(Map) as unmodifiable}.
      */
-    private static Map<TopicPartition, OffsetAndMetadata> validate(final Map<TopicPartition, OffsetAndMetadata> offsets) {
-        for (OffsetAndMetadata offsetAndMetadata : offsets.values()) {
+    private static Optional<Map<TopicPartition, OffsetAndMetadata>> validate(final Optional<Map<TopicPartition, OffsetAndMetadata>> offsets) {
+        if (!offsets.isPresent()) {
+            return Optional.empty();
+        }
+
+        for (OffsetAndMetadata offsetAndMetadata : offsets.get().values()) {
             if (offsetAndMetadata.offset() < 0) {
                 throw new IllegalArgumentException("Invalid offset: " + offsetAndMetadata.offset());
             }
         }
 
-        return Collections.unmodifiableMap(offsets);
+        return Optional.of(Collections.unmodifiableMap(offsets.get()));
     }
 
-    public Map<TopicPartition, OffsetAndMetadata> offsets() {
+    public Optional<Map<TopicPartition, OffsetAndMetadata>> offsets() {
         return offsets;
     }
 
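CommitEvent.validate() now passes an empty Optional through untouched (the "all consumed" case cannot be validated until the background thread resolves it), while a present map is still checked for negative offsets and wrapped unmodifiable. A sketch of that contract using Long offsets in place of OffsetAndMetadata:

    import java.util.Collections;
    import java.util.Map;
    import java.util.Optional;

    class CommitEventValidateSketch {
        static Optional<Map<String, Long>> validate(Optional<Map<String, Long>> offsets) {
            if (!offsets.isPresent()) {
                return Optional.empty(); // "all consumed" request: nothing to validate yet
            }
            for (Long offset : offsets.get().values()) {
                if (offset < 0) {
                    throw new IllegalArgumentException("Invalid offset: " + offset);
                }
            }
            return Optional.of(Collections.unmodifiableMap(offsets.get()));
        }

        public static void main(String[] args) {
            System.out.println(validate(Optional.empty()));                  // Optional.empty
            System.out.println(validate(Optional.of(Map.of("demo-0", 1L)))); // Optional[{demo-0=1}]
            try {
                validate(Optional.of(Map.of("demo-0", -1L)));
            } catch (IllegalArgumentException e) {
                System.out.println(e.getMessage()); // Invalid offset: -1
            }
        }
    }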
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/SyncCommitEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/SyncCommitEvent.java
index 7dc7a023a8d..5b02af2c0c7 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/SyncCommitEvent.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/SyncCommitEvent.java
@@ -20,14 +20,15 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
 
 import java.util.Map;
+import java.util.Optional;
 
 /**
  * Event to commit offsets waiting for a response and retrying on expected retriable errors until
- * the timer expires.
+ * the timer expires. If no offsets are provided, this event will commit all consumed offsets.
  */
 public class SyncCommitEvent extends CommitEvent {
 
-    public SyncCommitEvent(final Map<TopicPartition, OffsetAndMetadata> offsets, final long deadlineMs) {
+    public SyncCommitEvent(final Optional<Map<TopicPartition, OffsetAndMetadata>> offsets, final long deadlineMs) {
         super(Type.COMMIT_SYNC, offsets, deadlineMs);
     }
 }
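
The two event flavors differ only in deadline semantics: AsyncCommitEvent passes Long.MAX_VALUE (sent once, never retried), while SyncCommitEvent carries a real deadline and is retried on retriable errors until it expires. A hypothetical stand-in sketch of that distinction (not the actual event classes):

    import java.util.Map;
    import java.util.Optional;

    class CommitEventSketch {
        // Stand-ins mirroring CommitEvent/AsyncCommitEvent/SyncCommitEvent.
        abstract static class Event {
            final Optional<Map<String, Long>> offsets;
            final long deadlineMs;
            Event(Optional<Map<String, Long>> offsets, long deadlineMs) {
                this.offsets = offsets;
                this.deadlineMs = deadlineMs;
            }
        }
        static class AsyncCommit extends Event {
            AsyncCommit(Optional<Map<String, Long>> offsets) {
                super(offsets, Long.MAX_VALUE); // effectively "no deadline": fire once, never retry
            }
        }
        static class SyncCommit extends Event {
            SyncCommit(Optional<Map<String, Long>> offsets, long deadlineMs) {
                super(offsets, deadlineMs); // retried on retriable errors until the deadline
            }
        }

        public static void main(String[] args) {
            // Optional.empty() on either flavor now means "commit all consumed offsets",
            // resolved by the background thread.
            Event async = new AsyncCommit(Optional.empty());
            Event sync = new SyncCommit(Optional.of(Map.of("demo-0", 42L)), System.currentTimeMillis() + 30_000);
            System.out.println(async.deadlineMs + " vs " + sync.deadlineMs);
        }
    }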
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
index 380b7082b97..54a41587b06 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
@@ -54,7 +54,6 @@ import org.apache.kafka.clients.consumer.internals.events.TopicSubscriptionChang
 import org.apache.kafka.clients.consumer.internals.events.UnsubscribeEvent;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Metric;
-import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.errors.GroupAuthorizationException;
@@ -185,8 +184,22 @@ public class AsyncKafkaConsumerTest {
     }
 
     private AsyncKafkaConsumer<String, String> newConsumer(Properties props) {
+        // disable auto-commit by default, so we don't need to handle SyncCommitEvent for each case
+        if (!props.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) {
+            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
+        }
         final ConsumerConfig config = new ConsumerConfig(props);
-        return newConsumer(config);
+        return new AsyncKafkaConsumer<>(
+            config,
+            new StringDeserializer(),
+            new StringDeserializer(),
+            time,
+            (a, b, c, d, e, f, g) -> applicationEventHandler,
+            a -> backgroundEventReaper,
+            (a, b, c, d, e, f, g) -> fetchCollector,
+            (a, b, c, d) -> metadata,
+            backgroundEventQueue
+        );
     }
 
     private AsyncKafkaConsumer<String, String> newConsumer(ConsumerConfig config) {
@@ -209,10 +222,10 @@ public class AsyncKafkaConsumerTest {
         ConsumerRebalanceListenerInvoker rebalanceListenerInvoker,
         SubscriptionState subscriptions,
         String groupId,
-        String clientId) {
+        String clientId,
+        boolean autoCommitEnabled) {
         long retryBackoffMs = 100L;
         int defaultApiTimeoutMs = 1000;
-        boolean autoCommitEnabled = true;
         return new AsyncKafkaConsumer<>(
             new LogContext(),
             clientId,
@@ -281,8 +294,10 @@ public class AsyncKafkaConsumerTest {
         final ArgumentCaptor<AsyncCommitEvent> commitEventCaptor = ArgumentCaptor.forClass(AsyncCommitEvent.class);
         verify(applicationEventHandler).add(commitEventCaptor.capture());
         final AsyncCommitEvent commitEvent = commitEventCaptor.getValue();
-        assertEquals(offsets, commitEvent.offsets());
-        assertDoesNotThrow(() -> commitEvent.future().complete(null));
+        assertTrue(commitEvent.offsets().isPresent());
+        assertEquals(offsets, commitEvent.offsets().get());
+
+        commitEvent.future().complete(offsets);
         assertDoesNotThrow(() -> consumer.commitAsync(offsets, null));
 
         // Clean-up. Close the consumer here as we know it will cause a TimeoutException to be thrown.
@@ -347,25 +362,6 @@ public class AsyncKafkaConsumerTest {
         assertTrue((double) metric.metricValue() > 0);
     }
 
-    @Test
-    public void testCommittedLeaderEpochUpdate() {
-        consumer = newConsumer();
-        final TopicPartition t0 = new TopicPartition("t0", 2);
-        final TopicPartition t1 = new TopicPartition("t0", 3);
-        final TopicPartition t2 = new TopicPartition("t0", 4);
-        HashMap<TopicPartition, OffsetAndMetadata> topicPartitionOffsets = new HashMap<>();
-        topicPartitionOffsets.put(t0, new OffsetAndMetadata(10L, Optional.of(2), ""));
-        topicPartitionOffsets.put(t1, null);
-        topicPartitionOffsets.put(t2, new OffsetAndMetadata(20L, Optional.of(3), ""));
-        completeFetchedCommittedOffsetApplicationEventSuccessfully(topicPartitionOffsets);
-
-        assertDoesNotThrow(() -> consumer.committed(topicPartitionOffsets.keySet(), Duration.ofMillis(1000)));
-
-        verify(metadata).updateLastSeenEpochIfNewer(t0, 2);
-        verify(metadata).updateLastSeenEpochIfNewer(t2, 3);
-        verify(applicationEventHandler).addAndGet(ArgumentMatchers.isA(FetchCommittedOffsetsEvent.class));
-    }
-
     @Test
     public void testCommittedExceptionThrown() {
         consumer = newConsumer();
@@ -388,7 +384,6 @@ public class AsyncKafkaConsumerTest {
         final TopicPartition tp = new TopicPartition(topicName, partition);
         doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
         when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(true);
-        completeCommitSyncApplicationEventSuccessfully();
         doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
         completeAssignmentChangeEventSuccessfully();
         consumer.assign(singleton(tp));
@@ -410,7 +405,6 @@ public class AsyncKafkaConsumerTest {
             return Fetch.empty();
         }).doAnswer(invocation -> Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
         when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(true);
-        completeCommitSyncApplicationEventSuccessfully();
         doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
         completeAssignmentChangeEventSuccessfully();
         consumer.assign(singleton(tp));
@@ -434,7 +428,6 @@ public class AsyncKafkaConsumerTest {
             return Fetch.forPartition(tp, records, true, new OffsetAndMetadata(4, Optional.of(0), ""));
         }).when(fetchCollector).collectFetch(Mockito.any(FetchBuffer.class));
         when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(true);
-        completeCommitSyncApplicationEventSuccessfully();
         doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
         completeAssignmentChangeEventSuccessfully();
         consumer.assign(singleton(tp));
@@ -492,7 +485,6 @@ public class AsyncKafkaConsumerTest {
         doReturn(Fetch.forPartition(tp, records, true, new OffsetAndMetadata(4, Optional.of(0), "")))
             .when(fetchCollector).collectFetch(any(FetchBuffer.class));
         when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(true);
-        completeCommitSyncApplicationEventSuccessfully();
         doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
         completeAssignmentChangeEventSuccessfully();
         consumer.assign(singleton(tp));
@@ -527,71 +519,6 @@ public class AsyncKafkaConsumerTest {
         assertThrows(callbackException.getClass(), () -> consumer.commitSync());
     }
 
-    @Test
-    public void testCommitSyncLeaderEpochUpdate() {
-        consumer = newConsumer();
-        final TopicPartition t0 = new TopicPartition("t0", 2);
-        final TopicPartition t1 = new TopicPartition("t0", 3);
-        HashMap<TopicPartition, OffsetAndMetadata> topicPartitionOffsets = new HashMap<>();
-        topicPartitionOffsets.put(t0, new OffsetAndMetadata(10L, Optional.of(2), ""));
-        topicPartitionOffsets.put(t1, new OffsetAndMetadata(20L, Optional.of(1), ""));
-        completeCommitSyncApplicationEventSuccessfully();
-
-        completeAssignmentChangeEventSuccessfully();
-        consumer.assign(Arrays.asList(t0, t1));
-
-        assertDoesNotThrow(() -> consumer.commitSync(topicPartitionOffsets));
-
-        verify(metadata).updateLastSeenEpochIfNewer(t0, 2);
-        verify(metadata).updateLastSeenEpochIfNewer(t1, 1);
-        verify(applicationEventHandler).add(ArgumentMatchers.isA(SyncCommitEvent.class));
-    }
-
-    @Test
-    public void testCommitAsyncLeaderEpochUpdate() {
-        SubscriptionState subscriptions = new SubscriptionState(new LogContext(), OffsetResetStrategy.NONE);
-        consumer = newConsumer(
-            mock(FetchBuffer.class),
-            new ConsumerInterceptors<>(Collections.emptyList()),
-            mock(ConsumerRebalanceListenerInvoker.class),
-            subscriptions,
-            "group-id",
-            "client-id");
-        completeCommitSyncApplicationEventSuccessfully();
-        final TopicPartition t0 = new TopicPartition("t0", 2);
-        final TopicPartition t1 = new TopicPartition("t0", 3);
-        HashMap<TopicPartition, OffsetAndMetadata> topicPartitionOffsets = new HashMap<>();
-        topicPartitionOffsets.put(t0, new OffsetAndMetadata(10L, Optional.of(2), ""));
-        topicPartitionOffsets.put(t1, new OffsetAndMetadata(20L, Optional.of(1), ""));
-        when(metadata.currentLeader(t0)).thenReturn(
-            new LeaderAndEpoch(Optional.of(
-                new Node(1, "host", 9000)), Optional.of(1)));
-        when(metadata.currentLeader(t1)).thenReturn(
-            new LeaderAndEpoch(Optional.of(
-                new Node(1, "host", 9000)), Optional.of(1)));
-        completeAssignmentChangeEventSuccessfully();
-        consumer.assign(Arrays.asList(t0, t1));
-        completeSeekUnvalidatedEventSuccessfully();
-        consumer.seek(t0, 10);
-        consumer.seek(t1, 20);
-
-        MockCommitCallback callback = new MockCommitCallback();
-        assertDoesNotThrow(() -> consumer.commitAsync(topicPartitionOffsets, callback));
-
-        verify(metadata).updateLastSeenEpochIfNewer(t0, 2);
-        verify(metadata).updateLastSeenEpochIfNewer(t1, 1);
-        verify(applicationEventHandler).add(ArgumentMatchers.isA(AsyncCommitEvent.class));
-
-        // Clean-Up. Close the consumer here as we know it will cause a TimeoutException to be thrown.
-        // If we get an error *other* than the TimeoutException, we'll fail the test.
-        try {
-            Exception e = assertThrows(KafkaException.class, () -> consumer.close(Duration.ZERO));
-            assertInstanceOf(TimeoutException.class, e.getCause());
-        } finally {
-            consumer = null;
-        }
-    }
-
     @Test
     public void testCommitSyncAwaitsCommitAsyncCompletionWithEmptyOffsets() {
         final TopicPartition tp = new TopicPartition("foo", 0);
@@ -722,7 +649,8 @@ public class AsyncKafkaConsumerTest {
             mock(ConsumerRebalanceListenerInvoker.class),
             subscriptions,
             "group-id",
-            "client-id"));
+            "client-id",
+            false));
         completeUnsubscribeApplicationEventSuccessfully();
         consumer.close(Duration.ZERO);
         verifyUnsubscribeEvent(subscriptions);
@@ -739,7 +667,8 @@ public class AsyncKafkaConsumerTest {
             mock(ConsumerRebalanceListenerInvoker.class),
             subscriptions,
             "group-id",
-            "client-id"));
+            "client-id",
+            false));
         doThrow(new KafkaException()).when(consumer).processBackgroundEvents(any(), any(), any());
         assertThrows(KafkaException.class, () -> consumer.close(Duration.ZERO));
         verifyUnsubscribeEvent(subscriptions);
@@ -748,8 +677,7 @@ public class AsyncKafkaConsumerTest {
     }
 
     @Test
-    public void testAutoCommitSyncEnabled() {
-        completeCommitSyncApplicationEventSuccessfully();
+    public void testCommitSyncAllConsumed() {
         SubscriptionState subscriptions = new SubscriptionState(new LogContext(), OffsetResetStrategy.NONE);
         consumer = newConsumer(
             mock(FetchBuffer.class),
@@ -757,14 +685,19 @@ public class AsyncKafkaConsumerTest {
             mock(ConsumerRebalanceListenerInvoker.class),
             subscriptions,
             "group-id",
-            "client-id");
+            "client-id",
+            false);
         completeTopicSubscriptionChangeEventSuccessfully();
         consumer.subscribe(singleton("topic"), mock(ConsumerRebalanceListener.class));
         subscriptions.assignFromSubscribed(singleton(new TopicPartition("topic", 0)));
         completeSeekUnvalidatedEventSuccessfully();
         subscriptions.seek(new TopicPartition("topic", 0), 100);
         consumer.commitSyncAllConsumed(time.timer(100));
-        verify(applicationEventHandler).add(any(SyncCommitEvent.class));
+
+        ArgumentCaptor<SyncCommitEvent> eventCaptor = ArgumentCaptor.forClass(SyncCommitEvent.class);
+        verify(applicationEventHandler).add(eventCaptor.capture());
+        SyncCommitEvent capturedEvent = eventCaptor.getValue();
+        assertFalse(capturedEvent.offsets().isPresent(), "Expected empty optional offsets");
     }
 
     @Test
@@ -776,12 +709,15 @@ public class AsyncKafkaConsumerTest {
             mock(ConsumerRebalanceListenerInvoker.class),
             subscriptions,
             "group-id",
-            "client-id");
+            "client-id",
+            false);
         completeTopicSubscriptionChangeEventSuccessfully();
         consumer.subscribe(singleton("topic"), mock(ConsumerRebalanceListener.class));
         subscriptions.assignFromSubscribed(singleton(new TopicPartition("topic", 0)));
         completeSeekUnvalidatedEventSuccessfully();
         subscriptions.seek(new TopicPartition("topic", 0), 100);
+        completeUnsubscribeApplicationEventSuccessfully();
+        consumer.close();
         verify(applicationEventHandler, never()).add(any(SyncCommitEvent.class));
     }
 
@@ -1035,7 +971,9 @@ public class AsyncKafkaConsumerTest {
     @Test
     public void testNoWakeupInCloseCommit() {
         TopicPartition tp = new TopicPartition("topic1", 0);
-        consumer = newConsumer();
+        Properties props = requiredConsumerConfigAndGroupId("consumer-group");
+        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
+        consumer = newConsumer(props);
         completeAssignmentChangeEventSuccessfully();
         consumer.assign(Collections.singleton(tp));
         doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
@@ -1280,8 +1218,7 @@ public class AsyncKafkaConsumerTest {
     @Test
     public void testGroupMetadataAfterCreationWithGroupIdIsNotNull() {
         final String groupId = "consumerGroupA";
-        final ConsumerConfig config = new ConsumerConfig(requiredConsumerConfigAndGroupId(groupId));
-        consumer = newConsumer(config);
+        consumer = newConsumer(requiredConsumerConfigAndGroupId(groupId));
 
         final ConsumerGroupMetadata groupMetadata = consumer.groupMetadata();
 
@@ -1297,8 +1234,7 @@ public class AsyncKafkaConsumerTest {
         final String groupInstanceId = "groupInstanceId1";
         final Properties props = requiredConsumerConfigAndGroupId(groupId);
         props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, groupInstanceId);
-        final ConsumerConfig config = new ConsumerConfig(props);
-        consumer = newConsumer(config);
+        consumer = newConsumer(props);
 
         final ConsumerGroupMetadata groupMetadata = consumer.groupMetadata();
 
@@ -1333,9 +1269,8 @@ public class AsyncKafkaConsumerTest {
     @Test
     public void testGroupMetadataUpdate() {
         final String groupId = "consumerGroupA";
-        final ConsumerConfig config = new ConsumerConfig(requiredConsumerConfigAndGroupId(groupId));
         try (final MockedStatic<RequestManagers> requestManagers = mockStatic(RequestManagers.class)) {
-            consumer = newConsumer(config);
+            consumer = newConsumer(requiredConsumerConfigAndGroupId(groupId));
             final ConsumerGroupMetadata oldGroupMetadata = consumer.groupMetadata();
             final MemberStateListener groupMetadataUpdateListener = captureGroupMetadataUpdateListener(requestManagers);
             final int expectedMemberEpoch = 42;
@@ -1355,9 +1290,8 @@ public class AsyncKafkaConsumerTest {
     @Test
     public void testGroupMetadataIsResetAfterUnsubscribe() {
         final String groupId = "consumerGroupA";
-        final ConsumerConfig config = new ConsumerConfig(requiredConsumerConfigAndGroupId(groupId));
         try (final MockedStatic<RequestManagers> requestManagers = mockStatic(RequestManagers.class)) {
-            consumer = newConsumer(config);
+            consumer = newConsumer(requiredConsumerConfigAndGroupId(groupId));
             final MemberStateListener groupMetadataUpdateListener = captureGroupMetadataUpdateListener(requestManagers);
             consumer.subscribe(singletonList("topic"));
             final int memberEpoch = 42;
@@ -1479,8 +1413,7 @@ public class AsyncKafkaConsumerTest {
     @Test
     public void testBackgroundError() {
         final String groupId = "consumerGroupA";
-        final ConsumerConfig config = new ConsumerConfig(requiredConsumerConfigAndGroupId(groupId));
-        consumer = newConsumer(config);
+        consumer = newConsumer(requiredConsumerConfigAndGroupId(groupId));
 
         final KafkaException expectedException = new KafkaException("Nobody expects the Spanish Inquisition");
         final ErrorEvent errorEvent = new ErrorEvent(expectedException);
@@ -1495,8 +1428,7 @@ public class AsyncKafkaConsumerTest {
     @Test
     public void testMultipleBackgroundErrors() {
         final String groupId = "consumerGroupA";
-        final ConsumerConfig config = new ConsumerConfig(requiredConsumerConfigAndGroupId(groupId));
-        consumer = newConsumer(config);
+        consumer = newConsumer(requiredConsumerConfigAndGroupId(groupId));
 
         final KafkaException expectedException1 = new KafkaException("Nobody expects the Spanish Inquisition");
         final ErrorEvent errorEvent1 = new ErrorEvent(expectedException1);
@@ -1589,7 +1521,8 @@ public class AsyncKafkaConsumerTest {
                 mock(ConsumerRebalanceListenerInvoker.class),
                 subscriptions,
                 "group-id",
-                "client-id");
+                "client-id",
+                false);
         final TopicPartition tp = new TopicPartition("topic", 0);
         final List<ConsumerRecord<String, String>> records = singletonList(
                 new ConsumerRecord<>("topic", 0, 2, "key1", "value1"));
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java
index 9ded26b9e71..0ecd99afbd4 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java
@@ -87,6 +87,7 @@ import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.clearInvocations;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.spy;
@@ -103,6 +104,7 @@ public class CommitRequestManagerTest {
     private static final String DEFAULT_GROUP_INSTANCE_ID = "group-instance-id";
     private final Node mockedNode = new Node(1, "host1", 9092);
     private SubscriptionState subscriptionState;
+    private ConsumerMetadata metadata;
     private LogContext logContext;
     private MockTime time;
     private CoordinatorRequestManager coordinatorRequestManager;
@@ -118,6 +120,7 @@ public class CommitRequestManagerTest {
         this.logContext = new LogContext();
         this.time = new MockTime(0);
         this.subscriptionState = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
+        this.metadata = mock(ConsumerMetadata.class);
         this.coordinatorRequestManager = mock(CoordinatorRequestManager.class);
         this.offsetCommitCallbackInvoker = mock(OffsetCommitCallbackInvoker.class);
         this.props = new Properties();
@@ -142,7 +145,8 @@ public class CommitRequestManagerTest {
                 retryBackoffMs,
                 retryBackoffMaxMs,
                 OptionalDouble.of(0),
-                metrics);
+                metrics,
+                metadata);
 
         commitRequestManager.onMemberEpochUpdated(Optional.of(1), Uuid.randomUuid().toString());
         Set<TopicPartition> requestedPartitions = Collections.singleton(new TopicPartition("topic-1", 1));
@@ -175,7 +179,7 @@ public class CommitRequestManagerTest {
 
         Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
         offsets.put(new TopicPartition("t1", 0), new OffsetAndMetadata(0));
-        commitRequestManager.commitAsync(offsets);
+        commitRequestManager.commitAsync(Optional.of(offsets));
         assertPoll(false, 0, commitRequestManager);
     }
 
@@ -186,7 +190,7 @@ public class CommitRequestManagerTest {
 
         Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
         offsets.put(new TopicPartition("t1", 0), new OffsetAndMetadata(0));
-        commitRequestManager.commitAsync(offsets);
+        commitRequestManager.commitAsync(Optional.of(offsets));
         assertPoll(false, 0, commitRequestManager);
         assertPoll(true, 1, commitRequestManager);
     }
@@ -198,7 +202,7 @@ public class CommitRequestManagerTest {
 
         Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
         offsets.put(new TopicPartition("t1", 0), new OffsetAndMetadata(0));
-        commitRequestManager.commitAsync(offsets);
+        commitRequestManager.commitAsync(Optional.of(offsets));
         assertPoll(1, commitRequestManager);
     }
 
@@ -239,9 +243,9 @@ public class CommitRequestManagerTest {
 
         // Add the requests to the CommitRequestManager and store their futures
         long deadlineMs = time.milliseconds() + defaultApiTimeoutMs;
-        commitManager.commitSync(offsets1, deadlineMs);
+        commitManager.commitSync(Optional.of(offsets1), deadlineMs);
         commitManager.fetchOffsets(Collections.singleton(new TopicPartition("test", 0)), deadlineMs);
-        commitManager.commitSync(offsets2, deadlineMs);
+        commitManager.commitSync(Optional.of(offsets2), deadlineMs);
         commitManager.fetchOffsets(Collections.singleton(new TopicPartition("test", 1)), deadlineMs);
 
         // Poll the CommitRequestManager and verify that the inflightOffsetFetches size is correct
@@ -274,13 +278,146 @@ public class CommitRequestManagerTest {
         Map<TopicPartition, OffsetAndMetadata> offsets = Collections.singletonMap(
             new TopicPartition("topic", 1),
             new OffsetAndMetadata(0));
-        commitRequestManager.commitAsync(offsets);
+        commitRequestManager.commitAsync(Optional.of(offsets));
         assertEquals(1, commitRequestManager.unsentOffsetCommitRequests().size());
         assertEquals(1, commitRequestManager.poll(time.milliseconds()).unsentRequests.size());
         assertTrue(commitRequestManager.unsentOffsetCommitRequests().isEmpty());
         assertEmptyPendingRequests(commitRequestManager);
     }
 
+    @Test
+    public void testCommitSync() {
+        subscriptionState = mock(SubscriptionState.class);
+        
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode));
+        TopicPartition tp = new TopicPartition("topic", 1);
+        OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(0, 
Optional.of(1), "");
+        Map<TopicPartition, OffsetAndMetadata> offsets = Map.of(tp, 
offsetAndMetadata);
+
+        CommitRequestManager commitRequestManager = create(false, 100);
+        CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future = 
commitRequestManager.commitSync(
+            Optional.of(offsets), time.milliseconds() + defaultApiTimeoutMs);
+        assertEquals(1, 
commitRequestManager.unsentOffsetCommitRequests().size());
+        List<NetworkClientDelegate.FutureCompletionHandler> pollResults = 
assertPoll(1, commitRequestManager);
+        pollResults.forEach(v -> v.onComplete(mockOffsetCommitResponse(
+            "topic",
+            1,
+            (short) 1,
+            Errors.NONE)));
+
+        verify(subscriptionState, never()).allConsumed();
+        verify(metadata).updateLastSeenEpochIfNewer(tp, 1);
+        Map<TopicPartition, OffsetAndMetadata> commitOffsets = 
assertDoesNotThrow(() -> future.get());
+        assertTrue(future.isDone());
+        assertEquals(offsets, commitOffsets);
+    }
+
+    @Test
+    public void testCommitSyncWithEmptyOffsets() {
+        subscriptionState = mock(SubscriptionState.class);
+        
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode));
+        TopicPartition tp = new TopicPartition("topic", 1);
+        OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(0, 
Optional.of(1), "");
+        Map<TopicPartition, OffsetAndMetadata> offsets = Map.of(tp, 
offsetAndMetadata);
+        doReturn(offsets).when(subscriptionState).allConsumed();
+
+        CommitRequestManager commitRequestManager = create(false, 100);
+        CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future = 
commitRequestManager.commitSync(
+            Optional.empty(), time.milliseconds() + defaultApiTimeoutMs);
+        assertEquals(1, 
commitRequestManager.unsentOffsetCommitRequests().size());
+        List<NetworkClientDelegate.FutureCompletionHandler> pollResults = 
assertPoll(1, commitRequestManager);
+        pollResults.forEach(v -> v.onComplete(mockOffsetCommitResponse(
+            "topic",
+            1,
+            (short) 1,
+            Errors.NONE)));
+
+        verify(subscriptionState).allConsumed();
+        verify(metadata).updateLastSeenEpochIfNewer(tp, 1);
+        Map<TopicPartition, OffsetAndMetadata> commitOffsets = 
assertDoesNotThrow(() -> future.get());
+        assertTrue(future.isDone());
+        assertEquals(offsets, commitOffsets);
+    }
+
+    @Test
+    public void testCommitSyncWithEmptyAllConsumedOffsets() {
+        subscriptionState = mock(SubscriptionState.class);
+        doReturn(Map.of()).when(subscriptionState).allConsumed();
+
+        CommitRequestManager commitRequestManager = create(true, 100);
+        CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future = 
commitRequestManager.commitSync(
+            Optional.empty(), time.milliseconds() + defaultApiTimeoutMs);
+
+        verify(subscriptionState).allConsumed();
+        Map<TopicPartition, OffsetAndMetadata> commitOffsets = 
assertDoesNotThrow(() -> future.get());
+        assertTrue(future.isDone());
+        assertTrue(commitOffsets.isEmpty());
+    }
+
+    @Test
+    public void testCommitAsync() {
+        subscriptionState = mock(SubscriptionState.class);
+        
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode));
+        TopicPartition tp = new TopicPartition("topic", 1);
+        OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(0, 
Optional.of(1), "");
+        Map<TopicPartition, OffsetAndMetadata> offsets = Map.of(tp, 
offsetAndMetadata);
+
+        CommitRequestManager commitRequestManager = create(true, 100);
+        CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future = 
commitRequestManager.commitAsync(Optional.of(offsets));
+        assertEquals(1, 
commitRequestManager.unsentOffsetCommitRequests().size());
+        List<NetworkClientDelegate.FutureCompletionHandler> pollResults = 
assertPoll(1, commitRequestManager);
+        pollResults.forEach(v -> v.onComplete(mockOffsetCommitResponse(
+            "topic",
+            1,
+            (short) 1,
+            Errors.NONE)));
+
+        verify(subscriptionState, never()).allConsumed();
+        verify(metadata).updateLastSeenEpochIfNewer(tp, 1);
+        assertTrue(future.isDone());
+        Map<TopicPartition, OffsetAndMetadata> commitOffsets = 
assertDoesNotThrow(() -> future.get());
+        assertEquals(offsets, commitOffsets);
+    }
+
+    @Test
+    public void testCommitAsyncWithEmptyOffsets() {
+        subscriptionState = mock(SubscriptionState.class);
+        when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode));
+        TopicPartition tp = new TopicPartition("topic", 1);
+        OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(0, Optional.of(1), "");
+        Map<TopicPartition, OffsetAndMetadata> offsets = Map.of(tp, offsetAndMetadata);
+        doReturn(offsets).when(subscriptionState).allConsumed();
+
+        CommitRequestManager commitRequestManager = create(true, 100);
+        CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future = commitRequestManager.commitAsync(Optional.empty());
+        assertEquals(1, commitRequestManager.unsentOffsetCommitRequests().size());
+        List<NetworkClientDelegate.FutureCompletionHandler> pollResults = assertPoll(1, commitRequestManager);
+        pollResults.forEach(v -> v.onComplete(mockOffsetCommitResponse(
+            "topic",
+            1,
+            (short) 1,
+            Errors.NONE)));
+
+        verify(subscriptionState).allConsumed();
+        verify(metadata).updateLastSeenEpochIfNewer(tp, 1);
+        assertTrue(future.isDone());
+        Map<TopicPartition, OffsetAndMetadata> commitOffsets = assertDoesNotThrow(() -> future.get());
+        assertEquals(offsets, commitOffsets);
+    }
+
+    @Test
+    public void testCommitAsyncWithEmptyAllConsumedOffsets() {
+        subscriptionState = mock(SubscriptionState.class);
+        doReturn(Map.of()).when(subscriptionState).allConsumed();
+
+        CommitRequestManager commitRequestManager = create(true, 100);
+        CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future = commitRequestManager.commitAsync(Optional.empty());
+
+        verify(subscriptionState).allConsumed();
+        assertTrue(future.isDone());
+        Map<TopicPartition, OffsetAndMetadata> commitOffsets = assertDoesNotThrow(() -> future.get());
+        assertTrue(commitOffsets.isEmpty());
+    }
+
     // This is the case of the async auto commit sent on calls to assign (async commit that
     // should not be retried).
     @Test
@@ -333,7 +470,7 @@ public class CommitRequestManagerTest {
             new TopicPartition("topic", 1),
             new OffsetAndMetadata(0));
         long deadlineMs = time.milliseconds() + defaultApiTimeoutMs;
-        CompletableFuture<Void> commitResult = commitRequestManager.commitSync(offsets, deadlineMs);
+        CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> commitResult = commitRequestManager.commitSync(Optional.of(offsets), deadlineMs);
         sendAndVerifyOffsetCommitRequestFailedAndMaybeRetried(commitRequestManager, error, commitResult);
 
         // We expect that request should have been retried on this sync commit.
@@ -359,7 +496,7 @@ public class CommitRequestManagerTest {
             new TopicPartition("topic", 1),
             new OffsetAndMetadata(0));
         long deadlineMs = time.milliseconds() + defaultApiTimeoutMs;
-        CompletableFuture<Void> commitResult = commitRequestManager.commitSync(offsets, deadlineMs);
+        CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> commitResult = commitRequestManager.commitSync(Optional.of(offsets), deadlineMs);

         completeOffsetCommitRequestWithError(commitRequestManager, Errors.UNKNOWN_MEMBER_ID);
         NetworkClientDelegate.PollResult res = commitRequestManager.poll(time.milliseconds());
@@ -378,8 +515,8 @@ public class CommitRequestManagerTest {
             new OffsetAndMetadata(0));
 
         // Send commit request expected to be retried on retriable errors
-        CompletableFuture<Void> commitResult = commitRequestManager.commitSync(
-            offsets, time.milliseconds() + defaultApiTimeoutMs);
+        CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> commitResult = commitRequestManager.commitSync(
+            Optional.of(offsets), time.milliseconds() + defaultApiTimeoutMs);
         completeOffsetCommitRequestWithError(commitRequestManager, Errors.STALE_MEMBER_EPOCH);
         NetworkClientDelegate.PollResult res = commitRequestManager.poll(time.milliseconds());
         assertEquals(0, res.unsentRequests.size());
@@ -431,7 +568,7 @@ public class CommitRequestManagerTest {
             new OffsetAndMetadata(0));
 
         // Async commit that won't be retried.
-        CompletableFuture<Void> commitResult = commitRequestManager.commitAsync(offsets);
+        CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> commitResult = commitRequestManager.commitAsync(Optional.of(offsets));

         NetworkClientDelegate.PollResult res = commitRequestManager.poll(time.milliseconds());
         assertEquals(1, res.unsentRequests.size());
@@ -781,7 +918,7 @@ public class CommitRequestManagerTest {
             new OffsetAndMetadata(0));
 
         // Send async commit (not expected to be retried).
-        CompletableFuture<Void> commitResult = commitRequestManager.commitAsync(offsets);
+        CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> commitResult = commitRequestManager.commitAsync(Optional.of(offsets));
         completeOffsetCommitRequestWithError(commitRequestManager, error);
         NetworkClientDelegate.PollResult res = commitRequestManager.poll(time.milliseconds());
         assertEquals(0, res.unsentRequests.size());
@@ -806,7 +943,7 @@ public class CommitRequestManagerTest {
 
         // Send sync offset commit request that fails with retriable error.
         long deadlineMs = time.milliseconds() + retryBackoffMs * 2;
-        CompletableFuture<Void> commitResult = commitRequestManager.commitSync(offsets, deadlineMs);
+        CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> commitResult = commitRequestManager.commitSync(Optional.of(offsets), deadlineMs);
         completeOffsetCommitRequestWithError(commitRequestManager, Errors.REQUEST_TIMED_OUT);
 
         // Request retried after backoff, and fails with retriable again. Should not complete yet
@@ -839,7 +976,7 @@ public class CommitRequestManagerTest {
 
         // Send offset commit request that fails with retriable error.
         long deadlineMs = time.milliseconds() + retryBackoffMs * 2;
-        CompletableFuture<Void> commitResult = commitRequestManager.commitSync(offsets, deadlineMs);
+        CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> commitResult = commitRequestManager.commitSync(Optional.of(offsets), deadlineMs);
         completeOffsetCommitRequestWithError(commitRequestManager, error);

         // Sleep to expire the request timeout. Request should fail on the next poll with a
@@ -869,7 +1006,7 @@ public class CommitRequestManagerTest {
 
         // Send async commit request that fails with retriable error (not expected to be retried).
         Errors retriableError = Errors.COORDINATOR_NOT_AVAILABLE;
-        CompletableFuture<Void> commitResult = commitRequestManager.commitAsync(offsets);
+        CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> commitResult = commitRequestManager.commitAsync(Optional.of(offsets));
         completeOffsetCommitRequestWithError(commitRequestManager, retriableError);
         NetworkClientDelegate.PollResult res = commitRequestManager.poll(time.milliseconds());
         assertEquals(0, res.unsentRequests.size());
@@ -894,7 +1031,7 @@ public class CommitRequestManagerTest {
         offsets.put(new TopicPartition("t1", 1), new OffsetAndMetadata(2));
         offsets.put(new TopicPartition("t1", 2), new OffsetAndMetadata(3));
 
-        commitRequestManager.commitSync(offsets, time.milliseconds() + defaultApiTimeoutMs);
+        commitRequestManager.commitSync(Optional.of(offsets), time.milliseconds() + defaultApiTimeoutMs);
         NetworkClientDelegate.PollResult res = commitRequestManager.poll(time.milliseconds());
         assertEquals(1, res.unsentRequests.size());
 
@@ -919,7 +1056,7 @@ public class CommitRequestManagerTest {
             new OffsetAndMetadata(0));
 
         long deadlineMs = time.milliseconds() + defaultApiTimeoutMs;
-        commitRequestManager.commitSync(offsets, deadlineMs);
+        commitRequestManager.commitSync(Optional.of(offsets), deadlineMs);
         NetworkClientDelegate.PollResult res = commitRequestManager.poll(time.milliseconds());
         assertEquals(1, res.unsentRequests.size());
         res.unsentRequests.get(0).handler().onFailure(time.milliseconds(), new TimeoutException());
@@ -1175,7 +1312,7 @@ public class CommitRequestManagerTest {
                 new OffsetAndMetadata(0));
 
         long commitCreationTimeMs = time.milliseconds();
-        commitRequestManager.commitAsync(offsets);
+        commitRequestManager.commitAsync(Optional.of(offsets));
 
         NetworkClientDelegate.PollResult res = commitRequestManager.poll(time.milliseconds());
         assertEquals(1, res.unsentRequests.size());
@@ -1338,7 +1475,7 @@ public class CommitRequestManagerTest {
         Map<TopicPartition, OffsetAndMetadata> offsets = Collections.singletonMap(new TopicPartition("topic", 1),
             new OffsetAndMetadata(0));
 
-        commitRequestManager.commitAsync(offsets);
+        commitRequestManager.commitAsync(Optional.of(offsets));
         commitRequestManager.signalClose();
         NetworkClientDelegate.PollResult res = commitRequestManager.poll(time.milliseconds());
         assertEquals(1, res.unsentRequests.size());
@@ -1384,7 +1521,7 @@ public class CommitRequestManagerTest {
     private void sendAndVerifyOffsetCommitRequestFailedAndMaybeRetried(
         final CommitRequestManager commitRequestManager,
         final Errors error,
-        final CompletableFuture<Void> commitResult) {
+        final CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> commitResult) {
         completeOffsetCommitRequestWithError(commitRequestManager, error);
         NetworkClientDelegate.PollResult res = commitRequestManager.poll(time.milliseconds());
         assertEquals(0, res.unsentRequests.size());
@@ -1438,7 +1575,8 @@ public class CommitRequestManagerTest {
                 retryBackoffMs,
                 retryBackoffMaxMs,
                 OptionalDouble.of(0),
-                metrics));
+                metrics,
+                metadata));
     }
 
     private ClientResponse buildOffsetFetchClientResponse(
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java
index f1c05d2e6b9..a16f9612c74 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.clients.consumer.internals.events;
 
 import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.clients.consumer.internals.CommitRequestManager;
 import org.apache.kafka.clients.consumer.internals.ConsumerHeartbeatRequestManager;
@@ -32,6 +33,7 @@ import org.apache.kafka.clients.consumer.internals.RequestManagers;
 import org.apache.kafka.clients.consumer.internals.SubscriptionState;
 import org.apache.kafka.clients.consumer.internals.TopicMetadataRequestManager;
 import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
@@ -45,7 +47,6 @@ import org.junit.jupiter.params.provider.ValueSource;
 
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -56,6 +57,7 @@ import java.util.regex.Pattern;
 import java.util.stream.Stream;
 
 import static org.apache.kafka.clients.consumer.internals.events.CompletableEvent.calculateDeadlineMs;
+import static org.apache.kafka.test.TestUtils.assertFutureThrows;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
@@ -72,6 +74,7 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+@SuppressWarnings("ClassDataAbstractionCoupling")
 public class ApplicationEventProcessorTest {
     private final Time time = new MockTime();
     private final CommitRequestManager commitRequestManager = mock(CommitRequestManager.class);
@@ -137,8 +140,6 @@ public class ApplicationEventProcessorTest {
         return Stream.of(
                 Arguments.of(new PollEvent(100)),
                 Arguments.of(new CreateFetchRequestsEvent(calculateDeadlineMs(12345, 100))),
-                Arguments.of(new AsyncCommitEvent(new HashMap<>())),
-                Arguments.of(new SyncCommitEvent(new HashMap<>(), 500)),
                 Arguments.of(new CheckAndUpdatePositionsEvent(500)),
                 Arguments.of(new TopicMetadataEvent("topic", Long.MAX_VALUE)),
                 Arguments.of(new AssignmentChangeEvent(12345, 12345, Collections.emptyList())));
@@ -202,14 +203,16 @@ public class ApplicationEventProcessorTest {
     @Test
     public void testSeekUnvalidatedEvent() {
         TopicPartition tp = new TopicPartition("topic", 0);
+        Optional<Integer> offsetEpoch = Optional.of(1);
         SubscriptionState.FetchPosition position = new SubscriptionState.FetchPosition(
-                0, Optional.empty(), Metadata.LeaderAndEpoch.noLeaderOrEpoch());
-        SeekUnvalidatedEvent event = new SeekUnvalidatedEvent(12345, tp, 0, Optional.empty());
+                0, offsetEpoch, Metadata.LeaderAndEpoch.noLeaderOrEpoch());
+        SeekUnvalidatedEvent event = new SeekUnvalidatedEvent(12345, tp, 0, offsetEpoch);
 
         setupProcessor(false);
         doReturn(Metadata.LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(tp);
         doNothing().when(subscriptionState).seekUnvalidated(eq(tp), any());
         processor.process(event);
+        verify(metadata).updateLastSeenEpochIfNewer(tp, offsetEpoch.get());
         verify(metadata).currentLeader(tp);
         verify(subscriptionState).seekUnvalidated(tp, position);
         assertDoesNotThrow(() -> event.future().get());
@@ -262,6 +265,27 @@ public class ApplicationEventProcessorTest {
         assertDoesNotThrow(() -> event.future().get());
     }
 
+    @Test
+    public void testFetchCommittedOffsetsEvent() {
+        TopicPartition tp0 = new TopicPartition("topic", 0);
+        TopicPartition tp1 = new TopicPartition("topic", 1);
+        TopicPartition tp2 = new TopicPartition("topic", 2);
+        Set<TopicPartition> partitions = Set.of(tp0, tp1, tp2);
+        Map<TopicPartition, OffsetAndMetadata> topicPartitionOffsets = Map.of(
+            tp0, new OffsetAndMetadata(10L, Optional.of(2), ""),
+            tp1, new OffsetAndMetadata(15L, Optional.empty(), ""),
+            tp2, new OffsetAndMetadata(20L, Optional.of(3), "")
+        );
+        FetchCommittedOffsetsEvent event = new FetchCommittedOffsetsEvent(partitions, 12345);
+
+        setupProcessor(true);
+        when(commitRequestManager.fetchOffsets(partitions, 12345)).thenReturn(CompletableFuture.completedFuture(topicPartitionOffsets));
+        processor.process(event);
+
+        verify(commitRequestManager).fetchOffsets(partitions, 12345);
+        assertEquals(topicPartitionOffsets, assertDoesNotThrow(() -> event.future().get()));
+    }
+
     @Test
     public void testTopicSubscriptionChangeEventWithIllegalSubscriptionState() {
         subscriptionState = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
@@ -360,6 +384,87 @@ public class ApplicationEventProcessorTest {
         assertDoesNotThrow(() -> event2.future().get());
     }
 
+    @ParameterizedTest
+    @MethodSource("offsetsGenerator")
+    public void testSyncCommitEvent(Optional<Map<TopicPartition, OffsetAndMetadata>> offsets) {
+        SyncCommitEvent event = new SyncCommitEvent(offsets, 12345);
+
+        setupProcessor(true);
+        doReturn(CompletableFuture.completedFuture(offsets.orElse(Map.of()))).when(commitRequestManager).commitSync(offsets, 12345);
+
+        processor.process(event);
+        verify(commitRequestManager).commitSync(offsets, 12345);
+        Map<TopicPartition, OffsetAndMetadata> committedOffsets = assertDoesNotThrow(() -> event.future().get());
+        assertEquals(offsets.orElse(Map.of()), committedOffsets);
+    }
+
+    @Test
+    public void testSyncCommitEventWithoutCommitRequestManager() {
+        SyncCommitEvent event = new SyncCommitEvent(Optional.empty(), 12345);
+
+        setupProcessor(false);
+        processor.process(event);
+        assertFutureThrows(event.future(), KafkaException.class);
+    }
+
+    @Test
+    public void testSyncCommitEventWithException() {
+        SyncCommitEvent event = new SyncCommitEvent(Optional.empty(), 12345);
+
+        setupProcessor(true);
+        CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future = new CompletableFuture<>();
+        future.completeExceptionally(new IllegalStateException());
+        doReturn(future).when(commitRequestManager).commitSync(any(), anyLong());
+        processor.process(event);
+
+        verify(commitRequestManager).commitSync(Optional.empty(), 12345);
+        assertFutureThrows(event.future(), IllegalStateException.class);
+    }
+
+    @ParameterizedTest
+    @MethodSource("offsetsGenerator")
+    public void testAsyncCommitEventWithOffsets(Optional<Map<TopicPartition, OffsetAndMetadata>> offsets) {
+        AsyncCommitEvent event = new AsyncCommitEvent(offsets);
+
+        setupProcessor(true);
+        doReturn(CompletableFuture.completedFuture(offsets.orElse(Map.of()))).when(commitRequestManager).commitAsync(offsets);
+
+        processor.process(event);
+        verify(commitRequestManager).commitAsync(offsets);
+        Map<TopicPartition, OffsetAndMetadata> committedOffsets = assertDoesNotThrow(() -> event.future().get());
+        assertEquals(offsets.orElse(Map.of()), committedOffsets);
+    }
+
+    @Test
+    public void testAsyncCommitEventWithoutCommitRequestManager() {
+        AsyncCommitEvent event = new AsyncCommitEvent(Optional.empty());
+
+        setupProcessor(false);
+        processor.process(event);
+        assertFutureThrows(event.future(), KafkaException.class);
+    }
+
+    @Test
+    public void testAsyncCommitEventWithException() {
+        AsyncCommitEvent event = new AsyncCommitEvent(Optional.empty());
+
+        setupProcessor(true);
+        CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future = new CompletableFuture<>();
+        future.completeExceptionally(new IllegalStateException());
+        doReturn(future).when(commitRequestManager).commitAsync(any());
+        processor.process(event);
+
+        verify(commitRequestManager).commitAsync(Optional.empty());
+        assertFutureThrows(event.future(), IllegalStateException.class);
+    }
+
+    private static Stream<Arguments> offsetsGenerator() {
+        return Stream.of(
+            Arguments.of(Optional.empty()),
+            Arguments.of(Optional.of(Map.of(new TopicPartition("topic", 0), new OffsetAndMetadata(10, Optional.of(1), ""))))
+        );
+    }
+
     private List<NetworkClientDelegate.UnsentRequest> mockCommitResults() {
         return Collections.singletonList(mock(NetworkClientDelegate.UnsentRequest.class));
     }
