This is an automated email from the ASF dual-hosted git repository.
lianetm pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 51bb0367945 KAFKA-19589: Check for ability to skip position validation
in application thread when collecting buffered data (#20324)
51bb0367945 is described below
commit 51bb03679456fb55ef0db38fd3ce8d6ac44f3b35
Author: Kirk True <[email protected]>
AuthorDate: Wed Nov 26 12:18:19 2025 -0800
KAFKA-19589: Check for ability to skip position validation in application
thread when collecting buffered data (#20324)
Introduces `PositionsValidator`, which queries the assignment data in
`SubscriptionState` from the application thread, allowing
`AsyncKafkaConsumer` to potentially avoid waiting for position validation.
The impetus for this change is the observation of these two points:
1. Waiting for the completion of
`OffsetsRequestManager.updateFetchPositions()` on the application thread
means either busy-waiting or blocking, either of which adds significant
(~60%) CPU load to the `AsyncKafkaConsumer` compared to the
`ClassicKafkaConsumer`.
2. Testing shows that in more than 99.99% of the calls to
`OffsetsRequestManager.updateFetchPositions()`, the partitions
are already up to date and there is no need to fetch offsets.
This patch allows the check for stable partitions to be made in the
application thread, resulting in far less waiting in the critical path
of `AsyncKafkaConsumer.poll()`.
Reviewers: Lianet Magrans <[email protected]>, Andrew Schofield
<[email protected]>
---
.../consumer/internals/AsyncKafkaConsumer.java | 17 ++-
.../clients/consumer/internals/OffsetFetcher.java | 2 +-
.../consumer/internals/OffsetFetcherUtils.java | 59 +++-----
.../consumer/internals/OffsetsRequestManager.java | 15 +-
.../consumer/internals/PositionsValidator.java | 151 +++++++++++++++++++++
.../consumer/internals/RequestManagers.java | 4 +-
.../consumer/internals/SubscriptionState.java | 22 ++-
.../events/ApplicationEventProcessor.java | 4 +-
.../events/CheckAndUpdatePositionsEvent.java | 8 +-
.../consumer/internals/AsyncKafkaConsumerTest.java | 7 +-
.../internals/OffsetsRequestManagerTest.java | 24 ++--
.../consumer/internals/RequestManagersTest.java | 36 +++--
.../events/ApplicationEventProcessorTest.java | 4 +-
13 files changed, 267 insertions(+), 86 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
index 8b70cc778fd..fee082e2481 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
@@ -325,6 +325,7 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
// Init value is needed to avoid NPE in case of exception raised in the
constructor
private Optional<ClientTelemetryReporter> clientTelemetryReporter =
Optional.empty();
+ private final PositionsValidator positionsValidator;
private AsyncPollEvent inflightPoll;
private final WakeupTrigger wakeupTrigger = new WakeupTrigger();
private final OffsetCommitCallbackInvoker offsetCommitCallbackInvoker;
@@ -429,6 +430,7 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
// This FetchBuffer is shared between the application and network
threads.
this.fetchBuffer = new FetchBuffer(logContext);
+ this.positionsValidator = new PositionsValidator(logContext, time,
subscriptions, metadata);
final Supplier<NetworkClientDelegate>
networkClientDelegateSupplier = NetworkClientDelegate.supplier(time,
logContext,
metadata,
@@ -458,7 +460,8 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
metrics,
offsetCommitCallbackInvoker,
memberStateListener,
- streamsRebalanceData
+ streamsRebalanceData,
+ positionsValidator
);
final Supplier<ApplicationEventProcessor>
applicationEventProcessorSupplier =
ApplicationEventProcessor.supplier(logContext,
metadata,
@@ -533,7 +536,8 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
int requestTimeoutMs,
int defaultApiTimeoutMs,
String groupId,
- boolean autoCommitEnabled) {
+ boolean autoCommitEnabled,
+ PositionsValidator positionsValidator) {
this.log = logContext.logger(getClass());
this.subscriptions = subscriptions;
this.clientId = clientId;
@@ -565,6 +569,7 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
time,
asyncConsumerMetrics
);
+ this.positionsValidator = positionsValidator;
}
AsyncKafkaConsumer(LogContext logContext,
@@ -624,6 +629,7 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
new RebalanceCallbackMetricsManager(metrics)
);
ApiVersions apiVersions = new ApiVersions();
+ this.positionsValidator = new PositionsValidator(logContext, time,
subscriptions, metadata);
Supplier<NetworkClientDelegate> networkClientDelegateSupplier =
NetworkClientDelegate.supplier(
time,
config,
@@ -651,7 +657,8 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
metrics,
offsetCommitCallbackInvoker,
memberStateListener,
- Optional.empty()
+ Optional.empty(),
+ positionsValidator
);
Supplier<ApplicationEventProcessor> applicationEventProcessorSupplier
= ApplicationEventProcessor.supplier(
logContext,
@@ -1930,6 +1937,10 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
// thread has not completed that stage for the inflight event, don't
attempt to collect data from the fetch
// buffer. If the inflight event was nulled out by
checkInflightPoll(), that implies that it is safe to
// attempt to collect data from the fetch buffer.
+ if (positionsValidator.canSkipUpdateFetchPositions()) {
+ return fetchCollector.collectFetch(fetchBuffer);
+ }
+
if (inflightPoll != null &&
!inflightPoll.isValidatePositionsComplete()) {
return Fetch.empty();
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java
index bb01510e906..f9cca5c1339 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java
@@ -115,7 +115,7 @@ public class OffsetFetcher {
*/
public void validatePositionsIfNeeded() {
Map<TopicPartition, SubscriptionState.FetchPosition>
partitionsToValidate =
- offsetFetcherUtils.getPartitionsToValidate();
+ offsetFetcherUtils.refreshAndGetPartitionsToValidate();
validatePositionsAsync(partitionsToValidate);
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherUtils.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherUtils.java
index 0b7813eaad6..a92237d0fc9 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherUtils.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherUtils.java
@@ -47,10 +47,8 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
-import java.util.function.Function;
import java.util.stream.Collectors;
/**
@@ -62,22 +60,15 @@ class OffsetFetcherUtils {
private final Time time;
private final long retryBackoffMs;
private final ApiVersions apiVersions;
+ private final PositionsValidator positionsValidator;
private final Logger log;
- /**
- * Exception that occurred while validating positions, that will be
propagated on the next
- * call to validate positions. This could be an error received in the
- * OffsetsForLeaderEpoch response, or a LogTruncationException detected
when using a
- * successful response to validate the positions. It will be cleared when
thrown.
- */
- private final AtomicReference<RuntimeException>
cachedValidatePositionsException = new AtomicReference<>();
/**
* Exception that occurred while resetting positions, that will be
propagated on the next
* call to reset positions. This will have the error received in the
response to the
* ListOffsets request. It will be cleared when thrown on the next call to
reset.
*/
private final AtomicReference<RuntimeException>
cachedResetPositionsException = new AtomicReference<>();
- private final AtomicInteger metadataUpdateVersion = new AtomicInteger(-1);
OffsetFetcherUtils(LogContext logContext,
ConsumerMetadata metadata,
@@ -85,12 +76,25 @@ class OffsetFetcherUtils {
Time time,
long retryBackoffMs,
ApiVersions apiVersions) {
+ this(logContext, metadata, subscriptionState,
+ time, retryBackoffMs, apiVersions,
+ new PositionsValidator(logContext, time, subscriptionState,
metadata));
+ }
+
+ OffsetFetcherUtils(LogContext logContext,
+ ConsumerMetadata metadata,
+ SubscriptionState subscriptionState,
+ Time time,
+ long retryBackoffMs,
+ ApiVersions apiVersions,
+ PositionsValidator positionsValidator) {
this.log = logContext.logger(getClass());
this.metadata = metadata;
this.subscriptionState = subscriptionState;
this.time = time;
this.retryBackoffMs = retryBackoffMs;
this.apiVersions = apiVersions;
+ this.positionsValidator = positionsValidator;
}
/**
@@ -168,27 +172,8 @@ class OffsetFetcherUtils {
Collectors.toMap(Map.Entry::getKey,
Map.Entry::getValue)));
}
- Map<TopicPartition, SubscriptionState.FetchPosition>
getPartitionsToValidate() {
- RuntimeException exception =
cachedValidatePositionsException.getAndSet(null);
- if (exception != null)
- throw exception;
-
- // Validate each partition against the current leader and epoch
- // If we see a new metadata version, check all partitions
- validatePositionsOnMetadataChange();
-
- // Collect positions needing validation, with backoff
- return subscriptionState
- .partitionsNeedingValidation(time.milliseconds())
- .stream()
- .filter(tp -> subscriptionState.position(tp) != null)
- .collect(Collectors.toMap(Function.identity(),
subscriptionState::position));
- }
-
- void maybeSetValidatePositionsException(RuntimeException e) {
- if (!cachedValidatePositionsException.compareAndSet(null, e)) {
- log.error("Discarding error validating positions because another
error is pending", e);
- }
+ Map<TopicPartition, SubscriptionState.FetchPosition>
refreshAndGetPartitionsToValidate() {
+ return
positionsValidator.refreshAndGetPartitionsToValidate(apiVersions);
}
/**
@@ -196,13 +181,7 @@ class OffsetFetcherUtils {
* we should check that all the assignments have a valid position.
*/
void validatePositionsOnMetadataChange() {
- int newMetadataUpdateVersion = metadata.updateVersion();
- if (metadataUpdateVersion.getAndSet(newMetadataUpdateVersion) !=
newMetadataUpdateVersion) {
- subscriptionState.assignedPartitions().forEach(topicPartition -> {
- ConsumerMetadata.LeaderAndEpoch leaderAndEpoch =
metadata.currentLeader(topicPartition);
-
subscriptionState.maybeValidatePositionForCurrentLeader(apiVersions,
topicPartition, leaderAndEpoch);
- });
- }
+ positionsValidator.validatePositionsOnMetadataChange(apiVersions);
}
/**
@@ -357,7 +336,7 @@ class OffsetFetcherUtils {
});
if (!truncations.isEmpty()) {
-
maybeSetValidatePositionsException(buildLogTruncationException(truncations));
+
positionsValidator.maybeSetError(buildLogTruncationException(truncations));
}
}
@@ -367,7 +346,7 @@ class OffsetFetcherUtils {
metadata.requestUpdate(false);
if (!(error instanceof RetriableException)) {
- maybeSetValidatePositionsException(error);
+ positionsValidator.maybeSetError(error);
}
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java
index 4c8d10ad323..080dfa5cd96 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java
@@ -119,6 +119,7 @@ public final class OffsetsRequestManager implements
RequestManager, ClusterResou
final ApiVersions apiVersions,
final NetworkClientDelegate
networkClientDelegate,
final CommitRequestManager
commitRequestManager,
+ final PositionsValidator positionsValidator,
final LogContext logContext) {
requireNonNull(subscriptionState);
requireNonNull(metadata);
@@ -140,7 +141,7 @@ public final class OffsetsRequestManager implements
RequestManager, ClusterResou
this.apiVersions = apiVersions;
this.networkClientDelegate = networkClientDelegate;
this.offsetFetcherUtils = new OffsetFetcherUtils(logContext, metadata,
subscriptionState,
- time, retryBackoffMs, apiVersions);
+ time, retryBackoffMs, apiVersions, positionsValidator);
// Register the cluster metadata update callback. Note this only
relies on the
// requestsToRetry initialized above, and won't be invoked until all
managers are
// initialized and the network thread started.
@@ -231,8 +232,8 @@ public final class OffsetsRequestManager implements
RequestManager, ClusterResou
* on {@link SubscriptionState#hasAllFetchPositions()}). It will complete
immediately, with true, if all positions
* are already available. If some positions are missing, the future will
complete once the offsets are retrieved and positions are updated.
*/
- public CompletableFuture<Boolean> updateFetchPositions(long deadlineMs) {
- CompletableFuture<Boolean> result = new CompletableFuture<>();
+ public CompletableFuture<Void> updateFetchPositions(long deadlineMs) {
+ CompletableFuture<Void> result = new CompletableFuture<>();
try {
if (maybeCompleteWithPreviousException(result)) {
@@ -243,7 +244,7 @@ public final class OffsetsRequestManager implements
RequestManager, ClusterResou
if (subscriptionState.hasAllFetchPositions()) {
// All positions are already available
- result.complete(true);
+ result.complete(null);
return result;
}
@@ -252,7 +253,7 @@ public final class OffsetsRequestManager implements
RequestManager, ClusterResou
if (error != null) {
result.completeExceptionally(error);
} else {
- result.complete(subscriptionState.hasAllFetchPositions());
+ result.complete(null);
}
});
@@ -262,7 +263,7 @@ public final class OffsetsRequestManager implements
RequestManager, ClusterResou
return result;
}
- private boolean
maybeCompleteWithPreviousException(CompletableFuture<Boolean> result) {
+ private boolean maybeCompleteWithPreviousException(CompletableFuture<Void>
result) {
Throwable cachedException =
cachedUpdatePositionsException.getAndSet(null);
if (cachedException != null) {
result.completeExceptionally(cachedException);
@@ -501,7 +502,7 @@ public final class OffsetsRequestManager implements
RequestManager, ClusterResou
* next call to this function.
*/
void validatePositionsIfNeeded() {
- Map<TopicPartition, SubscriptionState.FetchPosition>
partitionsToValidate = offsetFetcherUtils.getPartitionsToValidate();
+ Map<TopicPartition, SubscriptionState.FetchPosition>
partitionsToValidate = offsetFetcherUtils.refreshAndGetPartitionsToValidate();
if (partitionsToValidate.isEmpty()) {
return;
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PositionsValidator.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PositionsValidator.java
new file mode 100644
index 00000000000..a714157400d
--- /dev/null
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PositionsValidator.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.internals.events.AsyncPollEvent;
+import
org.apache.kafka.clients.consumer.internals.events.CheckAndUpdatePositionsEvent;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+
+import org.slf4j.Logger;
+
+import java.time.Duration;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * As named, this class validates positions in the {@link SubscriptionState}
based on current {@link ConsumerMetadata}
+ * version. It maintains just enough shared state to determine when it can
avoid costly inter-thread communication
+ * in the {@link Consumer#poll(Duration)} method.
+ *
+ * <p/>
+ *
+ * Callers from the application thread should not mutate any of the state
contained within this class.
+ * It should be considered as <em>read-only</em>, and only the background
thread should mutate the state.
+ */
+public class PositionsValidator {
+
+ private final Logger log;
+ private final Time time;
+ private final ConsumerMetadata metadata;
+ private final SubscriptionState subscriptions;
+
+ /**
+ * Exception that occurred while validating positions, that will be
propagated on the next
+ * call to validate positions. This could be an error received in the
+ * OffsetsForLeaderEpoch response, or a LogTruncationException detected
when using a
+ * successful response to validate the positions. It will be cleared when
thrown.
+ */
+ private final AtomicReference<RuntimeException>
cachedValidatePositionsException = new AtomicReference<>();
+
+ private final AtomicInteger metadataUpdateVersion = new AtomicInteger(-1);
+
+ public PositionsValidator(LogContext logContext,
+ Time time,
+ SubscriptionState subscriptions,
+ ConsumerMetadata metadata) {
+ this.log = requireNonNull(logContext).logger(getClass());
+ this.time = requireNonNull(time);
+ this.metadata = requireNonNull(metadata);
+ this.subscriptions = requireNonNull(subscriptions);
+ }
+
+ /**
+ * This method is called by the background thread in response to {@link
AsyncPollEvent} and
+ * {@link CheckAndUpdatePositionsEvent}.
+ */
+ Map<TopicPartition, SubscriptionState.FetchPosition>
refreshAndGetPartitionsToValidate(ApiVersions apiVersions) {
+ maybeThrowError();
+
+ // Validate each partition against the current leader and epoch
+ // If we see a new metadata version, check all partitions
+ validatePositionsOnMetadataChange(apiVersions);
+
+ // Collect positions needing validation, with backoff
+ return subscriptions.partitionsNeedingValidation(time.milliseconds());
+ }
+
+ /**
+ * If we have seen new metadata (as tracked by {@link
org.apache.kafka.clients.Metadata#updateVersion()}), then
+ * we should check that all the assignments have a valid position.
+ */
+ void validatePositionsOnMetadataChange(ApiVersions apiVersions) {
+ int newMetadataUpdateVersion = metadata.updateVersion();
+ if (metadataUpdateVersion.getAndSet(newMetadataUpdateVersion) !=
newMetadataUpdateVersion) {
+ subscriptions.assignedPartitions().forEach(topicPartition -> {
+ ConsumerMetadata.LeaderAndEpoch leaderAndEpoch =
metadata.currentLeader(topicPartition);
+
subscriptions.maybeValidatePositionForCurrentLeader(apiVersions,
topicPartition, leaderAndEpoch);
+ });
+ }
+ }
+
+ void maybeSetError(RuntimeException e) {
+ if (!cachedValidatePositionsException.compareAndSet(null, e)) {
+ log.error("Discarding error validating positions because another
error is pending", e);
+ }
+ }
+
+ void maybeThrowError() {
+ RuntimeException exception =
cachedValidatePositionsException.getAndSet(null);
+ if (exception != null)
+ throw exception;
+ }
+
+ /**
+ * This method is used by {@code AsyncKafkaConsumer} to determine if it
can skip the step of validating
+ * positions as this is in the critical path for the {@link
Consumer#poll(Duration)}. If the application thread
+ * can safely and accurately determine that it doesn't need to perform the
+ * {@link OffsetsRequestManager#updateFetchPositions(long)} call, a big
performance savings can be realized.
+ *
+ * <p/>
+ *
+ * <ol>
+ * <li>
+ * Checks for previous errors from validation, and throws the
error if present
+ * </li>
+ * <li>
+ * Checks that the current {@link
ConsumerMetadata#updateVersion()} matches its current cached
+ * value to ensure that it is not stale
+ * </li>
+ * <li>
+ * Checks that all positions are in the {@link
SubscriptionState.FetchStates#FETCHING} state
+ * ({@link SubscriptionState#hasAllFetchPositions()})
+ * </li>
+ * </ol>
+ *
+ * If any checks fail, this method will return {@code false}, otherwise,
it will return {@code true}, which
+ * signals to the application thread that the position validation step can
be skipped.
+ *
+ * @return true if all checks pass, false if any checks fail
+ */
+ boolean canSkipUpdateFetchPositions() {
+ maybeThrowError();
+
+ if (metadataUpdateVersion.get() != metadata.updateVersion()) {
+ return false;
+ }
+
+ // If there are no partitions in the AWAIT_RESET, AWAIT_VALIDATION, or
INITIALIZING states, it's ok to skip.
+ return subscriptions.hasAllFetchPositions();
+ }
+}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
index e9585d2d806..fe61d5c53ba 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
@@ -168,7 +168,8 @@ public class RequestManagers implements Closeable {
final Metrics metrics,
final
OffsetCommitCallbackInvoker offsetCommitCallbackInvoker,
final MemberStateListener
applicationThreadMemberStateListener,
- final
Optional<StreamsRebalanceData> streamsRebalanceData
+ final
Optional<StreamsRebalanceData> streamsRebalanceData,
+ final PositionsValidator
positionsValidator
) {
return new CachedSupplier<>() {
@Override
@@ -293,6 +294,7 @@ public class RequestManagers implements Closeable {
apiVersions,
networkClientDelegate,
commitRequestManager,
+ positionsValidator,
logContext);
return new RequestManagers(
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
index 4629dd0934a..38ed6c668d9 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -879,8 +879,26 @@ public class SubscriptionState {
return collectPartitions(state -> state.awaitingReset() &&
!state.awaitingRetryBackoff(nowMs));
}
- public synchronized Set<TopicPartition> partitionsNeedingValidation(long
nowMs) {
- return collectPartitions(state -> state.awaitingValidation() &&
!state.awaitingRetryBackoff(nowMs));
+ public synchronized Map<TopicPartition, FetchPosition>
partitionsNeedingValidation(long nowMs) {
+ Map<TopicPartition, FetchPosition> result = new HashMap<>();
+
+ assignment.forEach((tp, tps) -> {
+ if (tps.awaitingValidation() && !tps.awaitingRetryBackoff(nowMs)
&& tps.position != null) {
+ result.put(tp, tps.position);
+ }
+ });
+
+ return result;
+ }
+
+ public synchronized boolean hasPartitionsNeedingValidation(long nowMs) {
+ for (TopicPartitionState tps : assignment.partitionStateValues()) {
+ if (tps.awaitingValidation() && !tps.awaitingRetryBackoff(nowMs)
&& tps.position != null) {
+ return true;
+ }
+ }
+
+ return false;
}
public synchronized boolean isAssigned(TopicPartition tp) {
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
index 568518d4c48..daa9fe6a3a9 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
@@ -442,7 +442,7 @@ public class ApplicationEventProcessor implements
EventProcessor<ApplicationEven
* them to update positions in the subscription state.
*/
private void process(final CheckAndUpdatePositionsEvent event) {
- CompletableFuture<Boolean> future =
requestManagers.offsetsRequestManager.updateFetchPositions(event.deadlineMs());
+ CompletableFuture<Void> future =
requestManagers.offsetsRequestManager.updateFetchPositions(event.deadlineMs());
future.whenComplete(complete(event.future()));
}
@@ -735,7 +735,7 @@ public class ApplicationEventProcessor implements
EventProcessor<ApplicationEven
});
}
- CompletableFuture<Boolean> updatePositionsFuture =
requestManagers.offsetsRequestManager.updateFetchPositions(event.deadlineMs());
+ CompletableFuture<Void> updatePositionsFuture =
requestManagers.offsetsRequestManager.updateFetchPositions(event.deadlineMs());
event.markValidatePositionsComplete();
updatePositionsFuture.whenComplete((__, updatePositionsError) -> {
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CheckAndUpdatePositionsEvent.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CheckAndUpdatePositionsEvent.java
index 4fd834eaf09..978e3cebf24 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CheckAndUpdatePositionsEvent.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CheckAndUpdatePositionsEvent.java
@@ -18,6 +18,7 @@
package org.apache.kafka.clients.consumer.internals.events;
import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.internals.OffsetsRequestManager;
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.kafka.common.TopicPartition;
@@ -28,10 +29,11 @@ import java.time.Duration;
* offsets and update positions when it gets them. This will first attempt to
use the committed offsets if available. If
* no committed offsets available, it will use the partition offsets retrieved
from the leader.
* <p/>
- * The event completes with a boolean indicating if all assigned partitions
have valid fetch positions
- * (based on {@link SubscriptionState#hasAllFetchPositions()}).
+ * The event completes when {@link OffsetsRequestManager} has completed its
attempt to update the positions. There
+ * is no guarantee that {@link SubscriptionState#hasAllFetchPositions()} will
return {@code true} just because the
+ * event has completed.
*/
-public class CheckAndUpdatePositionsEvent extends
CompletableApplicationEvent<Boolean> implements MetadataErrorNotifiableEvent {
+public class CheckAndUpdatePositionsEvent extends
CompletableApplicationEvent<Void> implements MetadataErrorNotifiableEvent {
public CheckAndUpdatePositionsEvent(long deadlineMs) {
super(Type.CHECK_AND_UPDATE_POSITIONS, deadlineMs);
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
index e00eb268ae5..40cc0e8df83 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
@@ -272,7 +272,8 @@ public class AsyncKafkaConsumerTest {
requestTimeoutMs,
defaultApiTimeoutMs,
"group-id",
- false);
+ false,
+ new PositionsValidator(new LogContext(), time, subscriptions,
metadata));
}
@Test
@@ -1317,6 +1318,7 @@ public class AsyncKafkaConsumerTest {
any(),
any(),
applicationThreadMemberStateListener.capture(),
+ any(),
any()
));
return applicationThreadMemberStateListener.getValue();
@@ -1389,7 +1391,8 @@ public class AsyncKafkaConsumerTest {
any(),
any(),
any(),
- streamRebalanceData.capture()
+ streamRebalanceData.capture(),
+ any()
));
return streamRebalanceData.getValue();
}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManagerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManagerTest.java
index ed96b817900..2251c0f138a 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManagerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManagerTest.java
@@ -118,6 +118,7 @@ public class OffsetsRequestManagerTest {
apiVersions,
mock(NetworkClientDelegate.class),
commitRequestManager,
+ new PositionsValidator(logContext, time, subscriptionState,
metadata),
logContext
);
}
@@ -602,7 +603,7 @@ public class OffsetsRequestManagerTest {
Optional.of(5));
SubscriptionState.FetchPosition position = new
SubscriptionState.FetchPosition(5L,
Optional.of(10), leaderAndEpoch);
-
when(subscriptionState.partitionsNeedingValidation(time.milliseconds())).thenReturn(Collections.singleton(TEST_PARTITION_1));
+
when(subscriptionState.partitionsNeedingValidation(time.milliseconds())).thenReturn(Map.of(TEST_PARTITION_1,
position));
when(subscriptionState.position(any())).thenReturn(position, position);
NodeApiVersions nodeApiVersions = NodeApiVersions.create();
when(apiVersions.get(LEADER_1.idString())).thenReturn(nodeApiVersions);
@@ -645,7 +646,7 @@ public class OffsetsRequestManagerTest {
SubscriptionState.FetchPosition position = new
SubscriptionState.FetchPosition(currentOffset,
Optional.of(10), leaderAndEpoch);
-
when(subscriptionState.partitionsNeedingValidation(time.milliseconds())).thenReturn(Collections.singleton(TEST_PARTITION_1));
+
when(subscriptionState.partitionsNeedingValidation(time.milliseconds())).thenReturn(Map.of(TEST_PARTITION_1,
position));
when(subscriptionState.position(any())).thenReturn(position, position);
// No api version info initially available
@@ -657,7 +658,7 @@ public class OffsetsRequestManagerTest {
// Api version updated, next validate positions should successfully
build the request
when(apiVersions.get(LEADER_1.idString())).thenReturn(NodeApiVersions.create());
-
when(subscriptionState.partitionsNeedingValidation(time.milliseconds())).thenReturn(Collections.singleton(TEST_PARTITION_1));
+
when(subscriptionState.partitionsNeedingValidation(time.milliseconds())).thenReturn(Map.of(TEST_PARTITION_1,
position));
when(subscriptionState.position(any())).thenReturn(position, position);
requestManager.validatePositionsIfNeeded();
assertEquals(1, requestManager.requestsToSend(), "Invalid request
count");
@@ -676,7 +677,7 @@ public class OffsetsRequestManagerTest {
// Call to updateFetchPositions. Should send an OffsetFetch request
and use the response to set positions
CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> fetchResult
= new CompletableFuture<>();
when(commitRequestManager.fetchOffsets(initPartitions1,
internalFetchCommittedTimeout)).thenReturn(fetchResult);
- CompletableFuture<Boolean> updatePositions1 =
requestManager.updateFetchPositions(time.milliseconds());
+ CompletableFuture<Void> updatePositions1 =
requestManager.updateFetchPositions(time.milliseconds());
assertFalse(updatePositions1.isDone(), "Update positions should wait
for the OffsetFetch request");
verify(commitRequestManager).fetchOffsets(initPartitions1,
internalFetchCommittedTimeout);
@@ -705,13 +706,13 @@ public class OffsetsRequestManagerTest {
// call to updateFetchPositions. Should send an OffsetFetch request
CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> fetchResult
= new CompletableFuture<>();
when(commitRequestManager.fetchOffsets(initPartitions1,
internalFetchCommittedTimeout)).thenReturn(fetchResult);
- CompletableFuture<Boolean> updatePositions1 =
requestManager.updateFetchPositions(time.milliseconds());
+ CompletableFuture<Void> updatePositions1 =
requestManager.updateFetchPositions(time.milliseconds());
assertFalse(updatePositions1.isDone(), "Update positions should wait for the OffsetFetch request");
verify(commitRequestManager).fetchOffsets(initPartitions1,
internalFetchCommittedTimeout);
clearInvocations(commitRequestManager);
// Call to updateFetchPositions again with the same set of initializing partitions should reuse request
- CompletableFuture<Boolean> updatePositions2 =
requestManager.updateFetchPositions(time.milliseconds());
+ CompletableFuture<Void> updatePositions2 =
requestManager.updateFetchPositions(time.milliseconds());
verify(commitRequestManager, never()).fetchOffsets(initPartitions1,
internalFetchCommittedTimeout);
// Receive response with committed offsets, should complete both calls
@@ -738,7 +739,7 @@ public class OffsetsRequestManagerTest {
// call to updateFetchPositions will trigger an OffsetFetch request for tp1 (won't complete just yet)
CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> fetchResult
= new CompletableFuture<>();
when(commitRequestManager.fetchOffsets(initPartitions1,
internalFetchCommittedTimeout)).thenReturn(fetchResult);
- CompletableFuture<Boolean> updatePositions1 =
requestManager.updateFetchPositions(time.milliseconds());
+ CompletableFuture<Void> updatePositions1 =
requestManager.updateFetchPositions(time.milliseconds());
assertFalse(updatePositions1.isDone());
verify(commitRequestManager).fetchOffsets(initPartitions1,
internalFetchCommittedTimeout);
clearInvocations(commitRequestManager);
@@ -766,7 +767,7 @@ public class OffsetsRequestManagerTest {
// call to updateFetchPositions will trigger an OffsetFetch request for tp1 (won't complete just yet)
CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> fetchResult
= new CompletableFuture<>();
when(commitRequestManager.fetchOffsets(initPartitions1,
internalFetchCommittedTimeout)).thenReturn(fetchResult);
- CompletableFuture<Boolean> updatePositions1 =
requestManager.updateFetchPositions(time.milliseconds());
+ CompletableFuture<Void> updatePositions1 =
requestManager.updateFetchPositions(time.milliseconds());
assertFalse(updatePositions1.isDone());
verify(commitRequestManager).fetchOffsets(initPartitions1,
internalFetchCommittedTimeout);
clearInvocations(commitRequestManager);
@@ -807,6 +808,7 @@ public class OffsetsRequestManagerTest {
apiVersions,
mock(NetworkClientDelegate.class),
commitRequestManager,
+ new PositionsValidator(new LogContext(), time,
subscriptionState, metadata),
new LogContext()
);
@@ -826,7 +828,7 @@ public class OffsetsRequestManagerTest {
private void mockAssignedPartitionsMissingPositions(Set<TopicPartition>
assignedPartitions,
Set<TopicPartition>
initializingPartitions,
Metadata.LeaderAndEpoch leaderAndEpoch) {
-
when(subscriptionState.partitionsNeedingValidation(anyLong())).thenReturn(Collections.emptySet());
+
when(subscriptionState.partitionsNeedingValidation(anyLong())).thenReturn(Map.of());
assignedPartitions.forEach(tp -> {
when(subscriptionState.isAssigned(tp)).thenReturn(true);
when(metadata.currentLeader(tp)).thenReturn(leaderAndEpoch);
@@ -837,8 +839,8 @@ public class OffsetsRequestManagerTest {
}
private void
mockSuccessfulBuildRequestForValidatingPositions(SubscriptionState.FetchPosition
position, Node leader) {
-
when(subscriptionState.partitionsNeedingValidation(time.milliseconds())).thenReturn(Collections.singleton(TEST_PARTITION_1));
- when(subscriptionState.position(any())).thenReturn(position, position);
+
when(subscriptionState.partitionsNeedingValidation(time.milliseconds())).thenReturn(Map.of(TEST_PARTITION_1,
position));
+ when(subscriptionState.positionOrNull(any())).thenReturn(position,
position);
NodeApiVersions nodeApiVersions = NodeApiVersions.create();
when(apiVersions.get(leader.idString())).thenReturn(nodeApiVersions);
}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/RequestManagersTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/RequestManagersTest.java
index 67628c51340..1b869160f08 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/RequestManagersTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/RequestManagersTest.java
@@ -50,23 +50,29 @@ public class RequestManagersTest {
config,
GroupRebalanceConfig.ProtocolType.CONSUMER
);
+ LogContext logContext = new LogContext();
+ MockTime time = new MockTime();
+ ConsumerMetadata metadata = mock(ConsumerMetadata.class);
+ SubscriptionState subscriptions = mock(SubscriptionState.class);
+ ApiVersions apiVersions = mock(ApiVersions.class);
final RequestManagers requestManagers = RequestManagers.supplier(
- new MockTime(),
- new LogContext(),
+ time,
+ logContext,
mock(BackgroundEventHandler.class),
- mock(ConsumerMetadata.class),
- mock(SubscriptionState.class),
+ metadata,
+ subscriptions,
mock(FetchBuffer.class),
config,
groupRebalanceConfig,
- mock(ApiVersions.class),
+ apiVersions,
mock(FetchMetricsManager.class),
() -> mock(NetworkClientDelegate.class),
Optional.empty(),
new Metrics(),
mock(OffsetCommitCallbackInvoker.class),
listener,
- Optional.empty()
+ Optional.empty(),
+ new PositionsValidator(logContext, time, subscriptions, metadata)
).get();
assertTrue(requestManagers.consumerMembershipManager.isPresent());
assertTrue(requestManagers.streamsMembershipManager.isEmpty());
@@ -90,23 +96,29 @@ public class RequestManagersTest {
config,
GroupRebalanceConfig.ProtocolType.CONSUMER
);
+ LogContext logContext = new LogContext();
+ MockTime time = new MockTime();
+ ConsumerMetadata metadata = mock(ConsumerMetadata.class);
+ SubscriptionState subscriptions = mock(SubscriptionState.class);
+ ApiVersions apiVersions = mock(ApiVersions.class);
final RequestManagers requestManagers = RequestManagers.supplier(
- new MockTime(),
- new LogContext(),
+ time,
+ logContext,
mock(BackgroundEventHandler.class),
- mock(ConsumerMetadata.class),
- mock(SubscriptionState.class),
+ metadata,
+ subscriptions,
mock(FetchBuffer.class),
config,
groupRebalanceConfig,
- mock(ApiVersions.class),
+ apiVersions,
mock(FetchMetricsManager.class),
() -> mock(NetworkClientDelegate.class),
Optional.empty(),
new Metrics(),
mock(OffsetCommitCallbackInvoker.class),
listener,
- Optional.of(new StreamsRebalanceData(UUID.randomUUID(),
Optional.empty(), Map.of(), Map.of()))
+ Optional.of(new StreamsRebalanceData(UUID.randomUUID(),
Optional.empty(), Map.of(), Map.of())),
+ new PositionsValidator(logContext, time, subscriptions, metadata)
).get();
assertTrue(requestManagers.streamsMembershipManager.isPresent());
assertTrue(requestManagers.streamsGroupHeartbeatRequestManager.isPresent());
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java
index 61b53f9c19d..f07a9da5ab3 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java
@@ -272,7 +272,7 @@ public class ApplicationEventProcessorTest {
setupProcessor(true);
when(heartbeatRequestManager.membershipManager()).thenReturn(membershipManager);
-
when(offsetsRequestManager.updateFetchPositions(event.deadlineMs())).thenReturn(CompletableFuture.completedFuture(true));
+
when(offsetsRequestManager.updateFetchPositions(event.deadlineMs())).thenReturn(CompletableFuture.completedFuture(null));
when(fetchRequestManager.createFetchRequests()).thenReturn(CompletableFuture.completedFuture(null));
processor.process(event);
assertTrue(event.isComplete());
@@ -693,7 +693,7 @@ public class ApplicationEventProcessorTest {
when(cluster.topics()).thenReturn(Set.of(topic));
when(heartbeatRequestManager.membershipManager()).thenReturn(membershipManager);
-
when(offsetsRequestManager.updateFetchPositions(anyLong())).thenReturn(CompletableFuture.completedFuture(true));
+
when(offsetsRequestManager.updateFetchPositions(anyLong())).thenReturn(CompletableFuture.completedFuture(null));
setupProcessor(true);
processor.process(new AsyncPollEvent(110, 100));