This is an automated email from the ASF dual-hosted git repository.

lianetm pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 51bb0367945 KAFKA-19589: Check for ability to skip position validation 
in application thread when collecting buffered data (#20324)
51bb0367945 is described below

commit 51bb03679456fb55ef0db38fd3ce8d6ac44f3b35
Author: Kirk True <[email protected]>
AuthorDate: Wed Nov 26 12:18:19 2025 -0800

    KAFKA-19589: Check for ability to skip position validation in application 
thread when collecting buffered data (#20324)
    
    Introduces `PositionsValidator`, which queries the assignment data from
    the `SubscriptionState` on the application thread, allowing
    `AsyncKafkaConsumer` to potentially avoid the need to wait.
    
    The impetus for this change is the observation of these two points:
    
    1. Waiting for the completion of
    `OffsetsRequestManager.updateFetchPositions()` on the application thread
    means either busy-waiting or blocking, either of which adds significant
    (~60%) CPU load to the `AsyncKafkaConsumer` compared to the
    `ClassicKafkaConsumer`.
    2. In testing, data shows that 99.99+% of the time that
    `OffsetsRequestManager.updateFetchPositions()` is called, the partitions
    are up-to-date and there is no need to fetch offsets.
    
    This patch allows the check for stable partitions to be made in the
    application thread, resulting in far less waiting in the critical path
    of `AsyncKafkaConsumer.poll()`.
    
    Reviewers: Lianet Magrans <[email protected]>, Andrew Schofield
     <[email protected]>
---
 .../consumer/internals/AsyncKafkaConsumer.java     |  17 ++-
 .../clients/consumer/internals/OffsetFetcher.java  |   2 +-
 .../consumer/internals/OffsetFetcherUtils.java     |  59 +++-----
 .../consumer/internals/OffsetsRequestManager.java  |  15 +-
 .../consumer/internals/PositionsValidator.java     | 151 +++++++++++++++++++++
 .../consumer/internals/RequestManagers.java        |   4 +-
 .../consumer/internals/SubscriptionState.java      |  22 ++-
 .../events/ApplicationEventProcessor.java          |   4 +-
 .../events/CheckAndUpdatePositionsEvent.java       |   8 +-
 .../consumer/internals/AsyncKafkaConsumerTest.java |   7 +-
 .../internals/OffsetsRequestManagerTest.java       |  24 ++--
 .../consumer/internals/RequestManagersTest.java    |  36 +++--
 .../events/ApplicationEventProcessorTest.java      |   4 +-
 13 files changed, 267 insertions(+), 86 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
index 8b70cc778fd..fee082e2481 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
@@ -325,6 +325,7 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
     // Init value is needed to avoid NPE in case of exception raised in the 
constructor
     private Optional<ClientTelemetryReporter> clientTelemetryReporter = 
Optional.empty();
 
+    private final PositionsValidator positionsValidator;
     private AsyncPollEvent inflightPoll;
     private final WakeupTrigger wakeupTrigger = new WakeupTrigger();
     private final OffsetCommitCallbackInvoker offsetCommitCallbackInvoker;
@@ -429,6 +430,7 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
 
             // This FetchBuffer is shared between the application and network 
threads.
             this.fetchBuffer = new FetchBuffer(logContext);
+            this.positionsValidator = new PositionsValidator(logContext, time, 
subscriptions, metadata);
             final Supplier<NetworkClientDelegate> 
networkClientDelegateSupplier = NetworkClientDelegate.supplier(time,
                     logContext,
                     metadata,
@@ -458,7 +460,8 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
                     metrics,
                     offsetCommitCallbackInvoker,
                     memberStateListener,
-                    streamsRebalanceData
+                    streamsRebalanceData,
+                    positionsValidator
             );
             final Supplier<ApplicationEventProcessor> 
applicationEventProcessorSupplier = 
ApplicationEventProcessor.supplier(logContext,
                     metadata,
@@ -533,7 +536,8 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
                        int requestTimeoutMs,
                        int defaultApiTimeoutMs,
                        String groupId,
-                       boolean autoCommitEnabled) {
+                       boolean autoCommitEnabled,
+                       PositionsValidator positionsValidator) {
         this.log = logContext.logger(getClass());
         this.subscriptions = subscriptions;
         this.clientId = clientId;
@@ -565,6 +569,7 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
             time,
             asyncConsumerMetrics
         );
+        this.positionsValidator = positionsValidator;
     }
 
     AsyncKafkaConsumer(LogContext logContext,
@@ -624,6 +629,7 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
             new RebalanceCallbackMetricsManager(metrics)
         );
         ApiVersions apiVersions = new ApiVersions();
+        this.positionsValidator = new PositionsValidator(logContext, time, 
subscriptions, metadata);
         Supplier<NetworkClientDelegate> networkClientDelegateSupplier = 
NetworkClientDelegate.supplier(
             time,
             config,
@@ -651,7 +657,8 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
             metrics,
             offsetCommitCallbackInvoker,
             memberStateListener,
-            Optional.empty()
+            Optional.empty(),
+            positionsValidator
         );
         Supplier<ApplicationEventProcessor> applicationEventProcessorSupplier 
= ApplicationEventProcessor.supplier(
                 logContext,
@@ -1930,6 +1937,10 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
         // thread has not completed that stage for the inflight event, don't 
attempt to collect data from the fetch
         // buffer. If the inflight event was nulled out by 
checkInflightPoll(), that implies that it is safe to
         // attempt to collect data from the fetch buffer.
+        if (positionsValidator.canSkipUpdateFetchPositions()) {
+            return fetchCollector.collectFetch(fetchBuffer);
+        }
+
         if (inflightPoll != null && 
!inflightPoll.isValidatePositionsComplete()) {
             return Fetch.empty();
         }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java
index bb01510e906..f9cca5c1339 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java
@@ -115,7 +115,7 @@ public class OffsetFetcher {
      */
     public void validatePositionsIfNeeded() {
         Map<TopicPartition, SubscriptionState.FetchPosition> 
partitionsToValidate =
-                offsetFetcherUtils.getPartitionsToValidate();
+                offsetFetcherUtils.refreshAndGetPartitionsToValidate();
 
         validatePositionsAsync(partitionsToValidate);
     }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherUtils.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherUtils.java
index 0b7813eaad6..a92237d0fc9 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherUtils.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherUtils.java
@@ -47,10 +47,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiFunction;
-import java.util.function.Function;
 import java.util.stream.Collectors;
 
 /**
@@ -62,22 +60,15 @@ class OffsetFetcherUtils {
     private final Time time;
     private final long retryBackoffMs;
     private final ApiVersions apiVersions;
+    private final PositionsValidator positionsValidator;
     private final Logger log;
 
-    /**
-     * Exception that occurred while validating positions, that will be 
propagated on the next
-     * call to validate positions. This could be an error received in the
-     * OffsetsForLeaderEpoch response, or a LogTruncationException detected 
when using a
-     * successful response to validate the positions. It will be cleared when 
thrown.
-     */
-    private final AtomicReference<RuntimeException> 
cachedValidatePositionsException = new AtomicReference<>();
     /**
      * Exception that occurred while resetting positions, that will be 
propagated on the next
      * call to reset positions. This will have the error received in the 
response to the
      * ListOffsets request. It will be cleared when thrown on the next call to 
reset.
      */
     private final AtomicReference<RuntimeException> 
cachedResetPositionsException = new AtomicReference<>();
-    private final AtomicInteger metadataUpdateVersion = new AtomicInteger(-1);
 
     OffsetFetcherUtils(LogContext logContext,
                        ConsumerMetadata metadata,
@@ -85,12 +76,25 @@ class OffsetFetcherUtils {
                        Time time,
                        long retryBackoffMs,
                        ApiVersions apiVersions) {
+        this(logContext, metadata, subscriptionState,
+            time, retryBackoffMs, apiVersions,
+            new PositionsValidator(logContext, time, subscriptionState, 
metadata));
+    }
+
+    OffsetFetcherUtils(LogContext logContext,
+                       ConsumerMetadata metadata,
+                       SubscriptionState subscriptionState,
+                       Time time,
+                       long retryBackoffMs,
+                       ApiVersions apiVersions,
+                       PositionsValidator positionsValidator) {
         this.log = logContext.logger(getClass());
         this.metadata = metadata;
         this.subscriptionState = subscriptionState;
         this.time = time;
         this.retryBackoffMs = retryBackoffMs;
         this.apiVersions = apiVersions;
+        this.positionsValidator = positionsValidator;
     }
 
     /**
@@ -168,27 +172,8 @@ class OffsetFetcherUtils {
                         Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue)));
     }
 
-    Map<TopicPartition, SubscriptionState.FetchPosition> 
getPartitionsToValidate() {
-        RuntimeException exception = 
cachedValidatePositionsException.getAndSet(null);
-        if (exception != null)
-            throw exception;
-
-        // Validate each partition against the current leader and epoch
-        // If we see a new metadata version, check all partitions
-        validatePositionsOnMetadataChange();
-
-        // Collect positions needing validation, with backoff
-        return subscriptionState
-                .partitionsNeedingValidation(time.milliseconds())
-                .stream()
-                .filter(tp -> subscriptionState.position(tp) != null)
-                .collect(Collectors.toMap(Function.identity(), 
subscriptionState::position));
-    }
-
-    void maybeSetValidatePositionsException(RuntimeException e) {
-        if (!cachedValidatePositionsException.compareAndSet(null, e)) {
-            log.error("Discarding error validating positions because another 
error is pending", e);
-        }
+    Map<TopicPartition, SubscriptionState.FetchPosition> 
refreshAndGetPartitionsToValidate() {
+        return 
positionsValidator.refreshAndGetPartitionsToValidate(apiVersions);
     }
 
     /**
@@ -196,13 +181,7 @@ class OffsetFetcherUtils {
      * we should check that all the assignments have a valid position.
      */
     void validatePositionsOnMetadataChange() {
-        int newMetadataUpdateVersion = metadata.updateVersion();
-        if (metadataUpdateVersion.getAndSet(newMetadataUpdateVersion) != 
newMetadataUpdateVersion) {
-            subscriptionState.assignedPartitions().forEach(topicPartition -> {
-                ConsumerMetadata.LeaderAndEpoch leaderAndEpoch = 
metadata.currentLeader(topicPartition);
-                
subscriptionState.maybeValidatePositionForCurrentLeader(apiVersions, 
topicPartition, leaderAndEpoch);
-            });
-        }
+        positionsValidator.validatePositionsOnMetadataChange(apiVersions);
     }
 
     /**
@@ -357,7 +336,7 @@ class OffsetFetcherUtils {
         });
 
         if (!truncations.isEmpty()) {
-            
maybeSetValidatePositionsException(buildLogTruncationException(truncations));
+            
positionsValidator.maybeSetError(buildLogTruncationException(truncations));
         }
     }
 
@@ -367,7 +346,7 @@ class OffsetFetcherUtils {
         metadata.requestUpdate(false);
 
         if (!(error instanceof RetriableException)) {
-            maybeSetValidatePositionsException(error);
+            positionsValidator.maybeSetError(error);
         }
     }
 
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java
index 4c8d10ad323..080dfa5cd96 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java
@@ -119,6 +119,7 @@ public final class OffsetsRequestManager implements 
RequestManager, ClusterResou
                                  final ApiVersions apiVersions,
                                  final NetworkClientDelegate 
networkClientDelegate,
                                  final CommitRequestManager 
commitRequestManager,
+                                 final PositionsValidator positionsValidator,
                                  final LogContext logContext) {
         requireNonNull(subscriptionState);
         requireNonNull(metadata);
@@ -140,7 +141,7 @@ public final class OffsetsRequestManager implements 
RequestManager, ClusterResou
         this.apiVersions = apiVersions;
         this.networkClientDelegate = networkClientDelegate;
         this.offsetFetcherUtils = new OffsetFetcherUtils(logContext, metadata, 
subscriptionState,
-                time, retryBackoffMs, apiVersions);
+                time, retryBackoffMs, apiVersions, positionsValidator);
         // Register the cluster metadata update callback. Note this only 
relies on the
         // requestsToRetry initialized above, and won't be invoked until all 
managers are
         // initialized and the network thread started.
@@ -231,8 +232,8 @@ public final class OffsetsRequestManager implements 
RequestManager, ClusterResou
      * on {@link SubscriptionState#hasAllFetchPositions()}). It will complete 
immediately, with true, if all positions
      * are already available. If some positions are missing, the future will 
complete once the offsets are retrieved and positions are updated.
      */
-    public CompletableFuture<Boolean> updateFetchPositions(long deadlineMs) {
-        CompletableFuture<Boolean> result = new CompletableFuture<>();
+    public CompletableFuture<Void> updateFetchPositions(long deadlineMs) {
+        CompletableFuture<Void> result = new CompletableFuture<>();
 
         try {
             if (maybeCompleteWithPreviousException(result)) {
@@ -243,7 +244,7 @@ public final class OffsetsRequestManager implements 
RequestManager, ClusterResou
 
             if (subscriptionState.hasAllFetchPositions()) {
                 // All positions are already available
-                result.complete(true);
+                result.complete(null);
                 return result;
             }
 
@@ -252,7 +253,7 @@ public final class OffsetsRequestManager implements 
RequestManager, ClusterResou
                 if (error != null) {
                     result.completeExceptionally(error);
                 } else {
-                    result.complete(subscriptionState.hasAllFetchPositions());
+                    result.complete(null);
                 }
             });
 
@@ -262,7 +263,7 @@ public final class OffsetsRequestManager implements 
RequestManager, ClusterResou
         return result;
     }
 
-    private boolean 
maybeCompleteWithPreviousException(CompletableFuture<Boolean> result) {
+    private boolean maybeCompleteWithPreviousException(CompletableFuture<Void> 
result) {
         Throwable cachedException = 
cachedUpdatePositionsException.getAndSet(null);
         if (cachedException != null) {
             result.completeExceptionally(cachedException);
@@ -501,7 +502,7 @@ public final class OffsetsRequestManager implements 
RequestManager, ClusterResou
      * next call to this function.
      */
     void validatePositionsIfNeeded() {
-        Map<TopicPartition, SubscriptionState.FetchPosition> 
partitionsToValidate = offsetFetcherUtils.getPartitionsToValidate();
+        Map<TopicPartition, SubscriptionState.FetchPosition> 
partitionsToValidate = offsetFetcherUtils.refreshAndGetPartitionsToValidate();
         if (partitionsToValidate.isEmpty()) {
             return;
         }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PositionsValidator.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PositionsValidator.java
new file mode 100644
index 00000000000..a714157400d
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PositionsValidator.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.internals.events.AsyncPollEvent;
+import 
org.apache.kafka.clients.consumer.internals.events.CheckAndUpdatePositionsEvent;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+
+import org.slf4j.Logger;
+
+import java.time.Duration;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * As named, this class validates positions in the {@link SubscriptionState} 
based on current {@link ConsumerMetadata}
+ * version. It maintains just enough shared state to determine when it can 
avoid costly inter-thread communication
+ * in the {@link Consumer#poll(Duration)} method.
+ *
+ * <p/>
+ *
+ * Callers from the application thread should not mutate any of the state 
contained within this class.
+ * It should be considered as <em>read-only</em>, and only the background 
thread should mutate the state.
+ */
+public class PositionsValidator {
+
+    private final Logger log;
+    private final Time time;
+    private final ConsumerMetadata metadata;
+    private final SubscriptionState subscriptions;
+
+    /**
+     * Exception that occurred while validating positions, that will be 
propagated on the next
+     * call to validate positions. This could be an error received in the
+     * OffsetsForLeaderEpoch response, or a LogTruncationException detected 
when using a
+     * successful response to validate the positions. It will be cleared when 
thrown.
+     */
+    private final AtomicReference<RuntimeException> 
cachedValidatePositionsException = new AtomicReference<>();
+
+    private final AtomicInteger metadataUpdateVersion = new AtomicInteger(-1);
+
+    public PositionsValidator(LogContext logContext,
+                              Time time,
+                              SubscriptionState subscriptions,
+                              ConsumerMetadata metadata) {
+        this.log = requireNonNull(logContext).logger(getClass());
+        this.time = requireNonNull(time);
+        this.metadata = requireNonNull(metadata);
+        this.subscriptions = requireNonNull(subscriptions);
+    }
+
+    /**
+     * This method is called by the background thread in response to {@link 
AsyncPollEvent} and
+     * {@link CheckAndUpdatePositionsEvent}.
+     */
+    Map<TopicPartition, SubscriptionState.FetchPosition> 
refreshAndGetPartitionsToValidate(ApiVersions apiVersions) {
+        maybeThrowError();
+
+        // Validate each partition against the current leader and epoch
+        // If we see a new metadata version, check all partitions
+        validatePositionsOnMetadataChange(apiVersions);
+
+        // Collect positions needing validation, with backoff
+        return subscriptions.partitionsNeedingValidation(time.milliseconds());
+    }
+
+    /**
+     * If we have seen new metadata (as tracked by {@link 
org.apache.kafka.clients.Metadata#updateVersion()}), then
+     * we should check that all the assignments have a valid position.
+     */
+    void validatePositionsOnMetadataChange(ApiVersions apiVersions) {
+        int newMetadataUpdateVersion = metadata.updateVersion();
+        if (metadataUpdateVersion.getAndSet(newMetadataUpdateVersion) != 
newMetadataUpdateVersion) {
+            subscriptions.assignedPartitions().forEach(topicPartition -> {
+                ConsumerMetadata.LeaderAndEpoch leaderAndEpoch = 
metadata.currentLeader(topicPartition);
+                
subscriptions.maybeValidatePositionForCurrentLeader(apiVersions, 
topicPartition, leaderAndEpoch);
+            });
+        }
+    }
+
+    void maybeSetError(RuntimeException e) {
+        if (!cachedValidatePositionsException.compareAndSet(null, e)) {
+            log.error("Discarding error validating positions because another 
error is pending", e);
+        }
+    }
+
+    void maybeThrowError() {
+        RuntimeException exception = 
cachedValidatePositionsException.getAndSet(null);
+        if (exception != null)
+            throw exception;
+    }
+
+    /**
+     * This method is used by {@code AsyncKafkaConsumer} to determine if it 
can skip the step of validating
+     * positions as this is in the critical path for the {@link 
Consumer#poll(Duration)}. If the application thread
+     * can safely and accurately determine that it doesn't need to perform the
+     * {@link OffsetsRequestManager#updateFetchPositions(long)} call, a big 
performance savings can be realized.
+     *
+     * <p/>
+     *
+     * <ol>
+     *     <li>
+     *         Checks for previous errors from validation, and throws the 
error if present
+     *     </li>
+     *     <li>
+     *         Checks that the current {@link 
ConsumerMetadata#updateVersion()} matches its current cached
+     *         value to ensure that it is not stale
+     *     </li>
+     *     <li>
+     *         Checks that all positions are in the {@link 
SubscriptionState.FetchStates#FETCHING} state
+     *         ({@link SubscriptionState#hasAllFetchPositions()})
+     *     </li>
+     * </ol>
+     *
+     * If any checks fail, this method will return {@code false}, otherwise, 
it will return {@code true}, which
+     * signals to the application thread that the position validation step can 
be skipped.
+     *
+     * @return true if all checks pass, false if any checks fail
+     */
+    boolean canSkipUpdateFetchPositions() {
+        maybeThrowError();
+
+        if (metadataUpdateVersion.get() != metadata.updateVersion()) {
+            return false;
+        }
+
+        // If there are no partitions in the AWAIT_RESET, AWAIT_VALIDATION, or 
INITIALIZING states, it's ok to skip.
+        return subscriptions.hasAllFetchPositions();
+    }
+}
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
index e9585d2d806..fe61d5c53ba 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
@@ -168,7 +168,8 @@ public class RequestManagers implements Closeable {
                                                      final Metrics metrics,
                                                      final 
OffsetCommitCallbackInvoker offsetCommitCallbackInvoker,
                                                      final MemberStateListener 
applicationThreadMemberStateListener,
-                                                     final 
Optional<StreamsRebalanceData> streamsRebalanceData
+                                                     final 
Optional<StreamsRebalanceData> streamsRebalanceData,
+                                                     final PositionsValidator 
positionsValidator
     ) {
         return new CachedSupplier<>() {
             @Override
@@ -293,6 +294,7 @@ public class RequestManagers implements Closeable {
                     apiVersions,
                     networkClientDelegate,
                     commitRequestManager,
+                    positionsValidator,
                     logContext);
 
                 return new RequestManagers(
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
index 4629dd0934a..38ed6c668d9 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -879,8 +879,26 @@ public class SubscriptionState {
         return collectPartitions(state -> state.awaitingReset() && 
!state.awaitingRetryBackoff(nowMs));
     }
 
-    public synchronized Set<TopicPartition> partitionsNeedingValidation(long 
nowMs) {
-        return collectPartitions(state -> state.awaitingValidation() && 
!state.awaitingRetryBackoff(nowMs));
+    public synchronized Map<TopicPartition, FetchPosition> 
partitionsNeedingValidation(long nowMs) {
+        Map<TopicPartition, FetchPosition> result = new HashMap<>();
+
+        assignment.forEach((tp, tps) -> {
+            if (tps.awaitingValidation() && !tps.awaitingRetryBackoff(nowMs) 
&& tps.position != null) {
+                result.put(tp, tps.position);
+            }
+        });
+
+        return result;
+    }
+
+    public synchronized boolean hasPartitionsNeedingValidation(long nowMs) {
+        for (TopicPartitionState tps  : assignment.partitionStateValues()) {
+            if (tps.awaitingValidation() && !tps.awaitingRetryBackoff(nowMs) 
&& tps.position != null) {
+                return true;
+            }
+        }
+
+        return false;
     }
 
     public synchronized boolean isAssigned(TopicPartition tp) {
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
index 568518d4c48..daa9fe6a3a9 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
@@ -442,7 +442,7 @@ public class ApplicationEventProcessor implements 
EventProcessor<ApplicationEven
      * them to update positions in the subscription state.
      */
     private void process(final CheckAndUpdatePositionsEvent event) {
-        CompletableFuture<Boolean> future = 
requestManagers.offsetsRequestManager.updateFetchPositions(event.deadlineMs());
+        CompletableFuture<Void> future = 
requestManagers.offsetsRequestManager.updateFetchPositions(event.deadlineMs());
         future.whenComplete(complete(event.future()));
     }
 
@@ -735,7 +735,7 @@ public class ApplicationEventProcessor implements 
EventProcessor<ApplicationEven
             });
         }
 
-        CompletableFuture<Boolean> updatePositionsFuture = 
requestManagers.offsetsRequestManager.updateFetchPositions(event.deadlineMs());
+        CompletableFuture<Void> updatePositionsFuture = 
requestManagers.offsetsRequestManager.updateFetchPositions(event.deadlineMs());
         event.markValidatePositionsComplete();
 
         updatePositionsFuture.whenComplete((__, updatePositionsError) -> {
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CheckAndUpdatePositionsEvent.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CheckAndUpdatePositionsEvent.java
index 4fd834eaf09..978e3cebf24 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CheckAndUpdatePositionsEvent.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CheckAndUpdatePositionsEvent.java
@@ -18,6 +18,7 @@
 package org.apache.kafka.clients.consumer.internals.events;
 
 import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.internals.OffsetsRequestManager;
 import org.apache.kafka.clients.consumer.internals.SubscriptionState;
 import org.apache.kafka.common.TopicPartition;
 
@@ -28,10 +29,11 @@ import java.time.Duration;
  * offsets and update positions when it gets them. This will first attempt to 
use the committed offsets if available. If
  * no committed offsets available, it will use the partition offsets retrieved 
from the leader.
  * <p/>
- * The event completes with a boolean indicating if all assigned partitions 
have valid fetch positions
- * (based on {@link SubscriptionState#hasAllFetchPositions()}).
+ * The event completes when {@link OffsetsRequestManager} has completed its 
attempt to update the positions. There
+ * is no guarantee that {@link SubscriptionState#hasAllFetchPositions()} will 
return {@code true} just because the
+ * event has completed.
  */
-public class CheckAndUpdatePositionsEvent extends 
CompletableApplicationEvent<Boolean> implements MetadataErrorNotifiableEvent {
+public class CheckAndUpdatePositionsEvent extends 
CompletableApplicationEvent<Void> implements MetadataErrorNotifiableEvent {
 
     public CheckAndUpdatePositionsEvent(long deadlineMs) {
         super(Type.CHECK_AND_UPDATE_POSITIONS, deadlineMs);
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
index e00eb268ae5..40cc0e8df83 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
@@ -272,7 +272,8 @@ public class AsyncKafkaConsumerTest {
             requestTimeoutMs,
             defaultApiTimeoutMs,
             "group-id",
-            false);
+            false,
+            new PositionsValidator(new LogContext(), time, subscriptions, metadata));
     }
 
     @Test
@@ -1317,6 +1318,7 @@ public class AsyncKafkaConsumerTest {
             any(),
             any(),
             applicationThreadMemberStateListener.capture(),
+            any(),
             any()
         ));
         return applicationThreadMemberStateListener.getValue();
@@ -1389,7 +1391,8 @@ public class AsyncKafkaConsumerTest {
             any(),
             any(),
             any(),
-            streamRebalanceData.capture()
+            streamRebalanceData.capture(),
+            any()
         ));
         return streamRebalanceData.getValue();
     }
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManagerTest.java
index ed96b817900..2251c0f138a 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManagerTest.java
@@ -118,6 +118,7 @@ public class OffsetsRequestManagerTest {
                 apiVersions,
                 mock(NetworkClientDelegate.class),
                 commitRequestManager,
+                new PositionsValidator(logContext, time, subscriptionState, metadata),
                 logContext
         );
     }
@@ -602,7 +603,7 @@ public class OffsetsRequestManagerTest {
                 Optional.of(5));
         SubscriptionState.FetchPosition position = new SubscriptionState.FetchPosition(5L,
                 Optional.of(10), leaderAndEpoch);
-        when(subscriptionState.partitionsNeedingValidation(time.milliseconds())).thenReturn(Collections.singleton(TEST_PARTITION_1));
+        when(subscriptionState.partitionsNeedingValidation(time.milliseconds())).thenReturn(Map.of(TEST_PARTITION_1, position));
         when(subscriptionState.position(any())).thenReturn(position, position);
         NodeApiVersions nodeApiVersions = NodeApiVersions.create();
         when(apiVersions.get(LEADER_1.idString())).thenReturn(nodeApiVersions);
@@ -645,7 +646,7 @@ public class OffsetsRequestManagerTest {
         SubscriptionState.FetchPosition position = new SubscriptionState.FetchPosition(currentOffset,
                 Optional.of(10), leaderAndEpoch);
 
-        when(subscriptionState.partitionsNeedingValidation(time.milliseconds())).thenReturn(Collections.singleton(TEST_PARTITION_1));
+        when(subscriptionState.partitionsNeedingValidation(time.milliseconds())).thenReturn(Map.of(TEST_PARTITION_1, position));
         when(subscriptionState.position(any())).thenReturn(position, position);
 
         // No api version info initially available
@@ -657,7 +658,7 @@ public class OffsetsRequestManagerTest {
 
         // Api version updated, next validate positions should successfully build the request
         when(apiVersions.get(LEADER_1.idString())).thenReturn(NodeApiVersions.create());
-        when(subscriptionState.partitionsNeedingValidation(time.milliseconds())).thenReturn(Collections.singleton(TEST_PARTITION_1));
+        when(subscriptionState.partitionsNeedingValidation(time.milliseconds())).thenReturn(Map.of(TEST_PARTITION_1, position));
         when(subscriptionState.position(any())).thenReturn(position, position);
         requestManager.validatePositionsIfNeeded();
         assertEquals(1, requestManager.requestsToSend(), "Invalid request count");
@@ -676,7 +677,7 @@ public class OffsetsRequestManagerTest {
         // Call to updateFetchPositions. Should send an OffsetFetch request and use the response to set positions
         CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> fetchResult = new CompletableFuture<>();
         when(commitRequestManager.fetchOffsets(initPartitions1, internalFetchCommittedTimeout)).thenReturn(fetchResult);
-        CompletableFuture<Boolean> updatePositions1 = requestManager.updateFetchPositions(time.milliseconds());
+        CompletableFuture<Void> updatePositions1 = requestManager.updateFetchPositions(time.milliseconds());
         assertFalse(updatePositions1.isDone(), "Update positions should wait for the OffsetFetch request");
         verify(commitRequestManager).fetchOffsets(initPartitions1, internalFetchCommittedTimeout);
 
@@ -705,13 +706,13 @@ public class OffsetsRequestManagerTest {
         // call to updateFetchPositions. Should send an OffsetFetch request
         CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> fetchResult = new CompletableFuture<>();
         when(commitRequestManager.fetchOffsets(initPartitions1, internalFetchCommittedTimeout)).thenReturn(fetchResult);
-        CompletableFuture<Boolean> updatePositions1 = requestManager.updateFetchPositions(time.milliseconds());
+        CompletableFuture<Void> updatePositions1 = requestManager.updateFetchPositions(time.milliseconds());
         assertFalse(updatePositions1.isDone(), "Update positions should wait for the OffsetFetch request");
         verify(commitRequestManager).fetchOffsets(initPartitions1, internalFetchCommittedTimeout);
         clearInvocations(commitRequestManager);
 
         // Call to updateFetchPositions again with the same set of initializing partitions should reuse request
-        CompletableFuture<Boolean> updatePositions2 = requestManager.updateFetchPositions(time.milliseconds());
+        CompletableFuture<Void> updatePositions2 = requestManager.updateFetchPositions(time.milliseconds());
         verify(commitRequestManager, never()).fetchOffsets(initPartitions1, internalFetchCommittedTimeout);
 
         // Receive response with committed offsets, should complete both calls
@@ -738,7 +739,7 @@ public class OffsetsRequestManagerTest {
         // call to updateFetchPositions will trigger an OffsetFetch request for tp1 (won't complete just yet)
         CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> fetchResult = new CompletableFuture<>();
         when(commitRequestManager.fetchOffsets(initPartitions1, internalFetchCommittedTimeout)).thenReturn(fetchResult);
-        CompletableFuture<Boolean> updatePositions1 = requestManager.updateFetchPositions(time.milliseconds());
+        CompletableFuture<Void> updatePositions1 = requestManager.updateFetchPositions(time.milliseconds());
         assertFalse(updatePositions1.isDone());
         verify(commitRequestManager).fetchOffsets(initPartitions1, internalFetchCommittedTimeout);
         clearInvocations(commitRequestManager);
@@ -766,7 +767,7 @@ public class OffsetsRequestManagerTest {
         // call to updateFetchPositions will trigger an OffsetFetch request for tp1 (won't complete just yet)
         CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> fetchResult = new CompletableFuture<>();
         when(commitRequestManager.fetchOffsets(initPartitions1, internalFetchCommittedTimeout)).thenReturn(fetchResult);
-        CompletableFuture<Boolean> updatePositions1 = requestManager.updateFetchPositions(time.milliseconds());
+        CompletableFuture<Void> updatePositions1 = requestManager.updateFetchPositions(time.milliseconds());
         assertFalse(updatePositions1.isDone());
         verify(commitRequestManager).fetchOffsets(initPartitions1, internalFetchCommittedTimeout);
         clearInvocations(commitRequestManager);
@@ -807,6 +808,7 @@ public class OffsetsRequestManagerTest {
                 apiVersions,
                 mock(NetworkClientDelegate.class),
                 commitRequestManager,
+                new PositionsValidator(new LogContext(), time, subscriptionState, metadata),
                 new LogContext()
         );
 
@@ -826,7 +828,7 @@ public class OffsetsRequestManagerTest {
     private void mockAssignedPartitionsMissingPositions(Set<TopicPartition> assignedPartitions,
                                                         Set<TopicPartition> initializingPartitions,
                                                         Metadata.LeaderAndEpoch leaderAndEpoch) {
-        when(subscriptionState.partitionsNeedingValidation(anyLong())).thenReturn(Collections.emptySet());
+        when(subscriptionState.partitionsNeedingValidation(anyLong())).thenReturn(Map.of());
         assignedPartitions.forEach(tp -> {
             when(subscriptionState.isAssigned(tp)).thenReturn(true);
             when(metadata.currentLeader(tp)).thenReturn(leaderAndEpoch);
@@ -837,8 +839,8 @@ public class OffsetsRequestManagerTest {
     }
 
     private void mockSuccessfulBuildRequestForValidatingPositions(SubscriptionState.FetchPosition position, Node leader) {
-        when(subscriptionState.partitionsNeedingValidation(time.milliseconds())).thenReturn(Collections.singleton(TEST_PARTITION_1));
-        when(subscriptionState.position(any())).thenReturn(position, position);
+        when(subscriptionState.partitionsNeedingValidation(time.milliseconds())).thenReturn(Map.of(TEST_PARTITION_1, position));
+        when(subscriptionState.positionOrNull(any())).thenReturn(position, position);
         NodeApiVersions nodeApiVersions = NodeApiVersions.create();
         when(apiVersions.get(leader.idString())).thenReturn(nodeApiVersions);
     }
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/RequestManagersTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/RequestManagersTest.java
index 67628c51340..1b869160f08 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/RequestManagersTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/RequestManagersTest.java
@@ -50,23 +50,29 @@ public class RequestManagersTest {
             config,
             GroupRebalanceConfig.ProtocolType.CONSUMER
         );
+        LogContext logContext = new LogContext();
+        MockTime time = new MockTime();
+        ConsumerMetadata metadata = mock(ConsumerMetadata.class);
+        SubscriptionState subscriptions = mock(SubscriptionState.class);
+        ApiVersions apiVersions = mock(ApiVersions.class);
         final RequestManagers requestManagers = RequestManagers.supplier(
-            new MockTime(),
-            new LogContext(),
+            time,
+            logContext,
             mock(BackgroundEventHandler.class),
-            mock(ConsumerMetadata.class),
-            mock(SubscriptionState.class),
+            metadata,
+            subscriptions,
             mock(FetchBuffer.class),
             config,
             groupRebalanceConfig,
-            mock(ApiVersions.class),
+            apiVersions,
             mock(FetchMetricsManager.class),
             () -> mock(NetworkClientDelegate.class),
             Optional.empty(),
             new Metrics(),
             mock(OffsetCommitCallbackInvoker.class),
             listener,
-            Optional.empty()
+            Optional.empty(),
+            new PositionsValidator(logContext, time, subscriptions, metadata)
         ).get();
         assertTrue(requestManagers.consumerMembershipManager.isPresent());
         assertTrue(requestManagers.streamsMembershipManager.isEmpty());
@@ -90,23 +96,29 @@ public class RequestManagersTest {
             config,
             GroupRebalanceConfig.ProtocolType.CONSUMER
         );
+        LogContext logContext = new LogContext();
+        MockTime time = new MockTime();
+        ConsumerMetadata metadata = mock(ConsumerMetadata.class);
+        SubscriptionState subscriptions = mock(SubscriptionState.class);
+        ApiVersions apiVersions = mock(ApiVersions.class);
         final RequestManagers requestManagers = RequestManagers.supplier(
-            new MockTime(),
-            new LogContext(),
+            time,
+            logContext,
             mock(BackgroundEventHandler.class),
-            mock(ConsumerMetadata.class),
-            mock(SubscriptionState.class),
+            metadata,
+            subscriptions,
             mock(FetchBuffer.class),
             config,
             groupRebalanceConfig,
-            mock(ApiVersions.class),
+            apiVersions,
             mock(FetchMetricsManager.class),
             () -> mock(NetworkClientDelegate.class),
             Optional.empty(),
             new Metrics(),
             mock(OffsetCommitCallbackInvoker.class),
             listener,
-            Optional.of(new StreamsRebalanceData(UUID.randomUUID(), Optional.empty(), Map.of(), Map.of()))
+            Optional.of(new StreamsRebalanceData(UUID.randomUUID(), Optional.empty(), Map.of(), Map.of())),
+            new PositionsValidator(logContext, time, subscriptions, metadata)
         ).get();
         assertTrue(requestManagers.streamsMembershipManager.isPresent());
         assertTrue(requestManagers.streamsGroupHeartbeatRequestManager.isPresent());
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java
index 61b53f9c19d..f07a9da5ab3 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java
@@ -272,7 +272,7 @@ public class ApplicationEventProcessorTest {
 
         setupProcessor(true);
         when(heartbeatRequestManager.membershipManager()).thenReturn(membershipManager);
-        when(offsetsRequestManager.updateFetchPositions(event.deadlineMs())).thenReturn(CompletableFuture.completedFuture(true));
+        when(offsetsRequestManager.updateFetchPositions(event.deadlineMs())).thenReturn(CompletableFuture.completedFuture(null));
         when(fetchRequestManager.createFetchRequests()).thenReturn(CompletableFuture.completedFuture(null));
         processor.process(event);
         assertTrue(event.isComplete());
@@ -693,7 +693,7 @@ public class ApplicationEventProcessorTest {
         when(cluster.topics()).thenReturn(Set.of(topic));
 
         when(heartbeatRequestManager.membershipManager()).thenReturn(membershipManager);
-        when(offsetsRequestManager.updateFetchPositions(anyLong())).thenReturn(CompletableFuture.completedFuture(true));
+        when(offsetsRequestManager.updateFetchPositions(anyLong())).thenReturn(CompletableFuture.completedFuture(null));
 
         setupProcessor(true);
         processor.process(new AsyncPollEvent(110, 100));


Reply via email to