This is an automated email from the ASF dual-hosted git repository.
junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new a7e865c0a75 KAFKA-15115 - KAFKA-15163; Reset/Validate positions
implementation & API integration (#14346)
a7e865c0a75 is described below
commit a7e865c0a756504cc7ae6f4eb0772cadd3333c53
Author: Lianet Magrans <[email protected]>
AuthorDate: Wed Sep 13 10:49:41 2023 -0400
KAFKA-15115 - KAFKA-15163; Reset/Validate positions implementation & API
integration (#14346)
Implementation of the required functionality for resetting and validating
positions in the new async consumer.
This PR includes:
1. New async application events ResetPositionsApplicationEvent and
ValidatePositionsApplicationEvent, both handled by the same
OfffsetsRequestManager.
2. Integration of the reset/validate functionality in the new async
consumer, to update fetch positions using the partitions offsets.
3. Minor refactoring to extract functionality that is reused from both
consumer implementations (moving logic without changes from OffsetFetcher into
OffsetFetchUtils, and from OffsetsForLeaderEpochClient into
OffsetsForLeaderEpochUtils)
Reviewers: Philip Nee <[email protected]>, Kirk True
<[email protected]>, Jun Rao<[email protected]>
---
.../clients/consumer/LogTruncationException.java | 5 +
.../internals/DefaultBackgroundThread.java | 10 +-
.../clients/consumer/internals/OffsetFetcher.java | 60 +---
.../consumer/internals/OffsetFetcherUtils.java | 88 +++++-
.../internals/OffsetsForLeaderEpochClient.java | 104 +------
...Client.java => OffsetsForLeaderEpochUtils.java} | 79 +++---
.../consumer/internals/OffsetsRequestManager.java | 224 ++++++++++++++-
.../consumer/internals/PrototypeAsyncConsumer.java | 34 +++
.../internals/events/ApplicationEvent.java | 2 +-
.../events/ApplicationEventProcessor.java | 14 +
.../events/CompletableApplicationEvent.java | 2 +-
.../events/ListOffsetsApplicationEvent.java | 2 +-
.../events/OffsetFetchApplicationEvent.java | 2 +-
...nt.java => ResetPositionsApplicationEvent.java} | 22 +-
...java => ValidatePositionsApplicationEvent.java} | 22 +-
.../internals/DefaultBackgroundThreadTest.java | 87 ++++++
.../internals/OffsetForLeaderEpochClientTest.java | 18 +-
.../internals/OffsetsRequestManagerTest.java | 315 +++++++++++++++++++--
18 files changed, 808 insertions(+), 282 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/LogTruncationException.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/LogTruncationException.java
index 336eed4a3b4..f13f61e156d 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/LogTruncationException.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/LogTruncationException.java
@@ -33,6 +33,11 @@ public class LogTruncationException extends
OffsetOutOfRangeException {
private final Map<TopicPartition, OffsetAndMetadata> divergentOffsets;
+ public LogTruncationException(Map<TopicPartition, Long> fetchOffsets,
+ Map<TopicPartition, OffsetAndMetadata>
divergentOffsets) {
+ this("Truncated partitions detected with divergent offsets " +
divergentOffsets, fetchOffsets, divergentOffsets);
+ }
+
public LogTruncationException(String message,
Map<TopicPartition, Long> fetchOffsets,
Map<TopicPartition, OffsetAndMetadata>
divergentOffsets) {
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java
index de3ff49d328..1e2c8a7935f 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java
@@ -48,9 +48,15 @@ import static
org.apache.kafka.clients.consumer.internals.ConsumerUtils.configur
* Background thread runnable that consumes {@code ApplicationEvent} and
* produces {@code BackgroundEvent}. It uses an event loop to consume and
* produce events, and poll the network client to handle network IO.
- * <p>
+ * <p/>
* It holds a reference to the {@link SubscriptionState}, which is
* initialized by the polling thread.
+ * <p/>
+ * For processing application events that have been submitted to the
+ * {@link #applicationEventQueue}, this relies on an {@link
ApplicationEventProcessor}. Processing includes generating requests and
+ * handling responses with the appropriate {@link RequestManager}. The network
operations for
+ * actually sending the requests is delegated to the {@link
NetworkClientDelegate}
+ * </li>
*/
public class DefaultBackgroundThread extends KafkaThread {
private static final long MAX_POLL_TIMEOUT_MS = 5000;
@@ -148,6 +154,7 @@ public class DefaultBackgroundThread extends KafkaThread {
this.groupState = new GroupState(rebalanceConfig);
long retryBackoffMs =
config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
long retryBackoffMaxMs =
config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
+ final int requestTimeoutMs =
config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
OffsetsRequestManager offsetsRequestManager =
new OffsetsRequestManager(
@@ -156,6 +163,7 @@ public class DefaultBackgroundThread extends KafkaThread {
configuredIsolationLevel(config),
time,
retryBackoffMs,
+ requestTimeoutMs,
apiVersions,
logContext);
CoordinatorRequestManager coordinatorRequestManager = null;
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java
index 234919eaf65..33a05bdcf89 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java
@@ -21,12 +21,10 @@ import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.NodeApiVersions;
import org.apache.kafka.clients.StaleMetadataException;
-import org.apache.kafka.clients.consumer.LogTruncationException;
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import
org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils.ListOffsetData;
import
org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils.ListOffsetResult;
-import
org.apache.kafka.clients.consumer.internals.OffsetsForLeaderEpochClient.OffsetForEpochResult;
+import
org.apache.kafka.clients.consumer.internals.OffsetsForLeaderEpochUtils.OffsetForEpochResult;
import
org.apache.kafka.clients.consumer.internals.SubscriptionState.FetchPosition;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.Node;
@@ -41,13 +39,10 @@ import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
import org.slf4j.Logger;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.List;
import java.util.Map;
-import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
@@ -70,7 +65,6 @@ public class OffsetFetcher {
private final SubscriptionState subscriptions;
private final ConsumerNetworkClient client;
private final Time time;
- private final long retryBackoffMs;
private final long requestTimeoutMs;
private final IsolationLevel isolationLevel;
private final OffsetsForLeaderEpochClient offsetsForLeaderEpochClient;
@@ -91,7 +85,6 @@ public class OffsetFetcher {
this.client = client;
this.metadata = metadata;
this.subscriptions = subscriptions;
- this.retryBackoffMs = retryBackoffMs;
this.requestTimeoutMs = requestTimeoutMs;
this.isolationLevel = isolationLevel;
this.apiVersions = apiVersions;
@@ -227,16 +220,12 @@ public class OffsetFetcher {
future.addListener(new RequestFutureListener<ListOffsetResult>() {
@Override
public void onSuccess(ListOffsetResult result) {
-
offsetFetcherUtils.onSuccessfulRequestForResettingPositions(
- resetTimestamps,
- result);
+
offsetFetcherUtils.onSuccessfulResponseForResettingPositions(resetTimestamps,
result);
}
@Override
public void onFailure(RuntimeException e) {
- offsetFetcherUtils.onFailedRequestForResettingPositions(
- resetTimestamps,
- e);
+
offsetFetcherUtils.onFailedResponseForResettingPositions(resetTimestamps, e);
}
});
}
@@ -284,55 +273,18 @@ public class OffsetFetcher {
future.addListener(new
RequestFutureListener<OffsetForEpochResult>() {
@Override
public void onSuccess(OffsetForEpochResult offsetsResult) {
- List<SubscriptionState.LogTruncation> truncations = new
ArrayList<>();
- if (!offsetsResult.partitionsToRetry().isEmpty()) {
-
subscriptions.setNextAllowedRetry(offsetsResult.partitionsToRetry(),
time.milliseconds() + retryBackoffMs);
- metadata.requestUpdate(false);
- }
-
- // For each OffsetsForLeader response, check if the
end-offset is lower than our current offset
- // for the partition. If so, it means we have experienced
log truncation and need to reposition
- // that partition's offset.
- //
- // In addition, check whether the returned offset and
epoch are valid. If not, then we should reset
- // its offset if reset policy is configured, or throw out
of range exception.
- offsetsResult.endOffsets().forEach((topicPartition,
respEndOffset) -> {
- FetchPosition requestPosition =
fetchPositions.get(topicPartition);
- Optional<SubscriptionState.LogTruncation>
truncationOpt =
-
subscriptions.maybeCompleteValidation(topicPartition, requestPosition,
respEndOffset);
- truncationOpt.ifPresent(truncations::add);
- });
-
- if (!truncations.isEmpty()) {
-
offsetFetcherUtils.maybeSetOffsetForLeaderException(buildLogTruncationException(truncations));
- }
+
offsetFetcherUtils.onSuccessfulResponseForValidatingPositions(fetchPositions,
+ offsetsResult);
}
@Override
public void onFailure(RuntimeException e) {
- subscriptions.requestFailed(fetchPositions.keySet(),
time.milliseconds() + retryBackoffMs);
- metadata.requestUpdate(false);
-
- if (!(e instanceof RetriableException)) {
- offsetFetcherUtils.maybeSetOffsetForLeaderException(e);
- }
+
offsetFetcherUtils.onFailedResponseForValidatingPositions(fetchPositions, e);
}
});
});
}
- private LogTruncationException
buildLogTruncationException(List<SubscriptionState.LogTruncation> truncations) {
- Map<TopicPartition, OffsetAndMetadata> divergentOffsets = new
HashMap<>();
- Map<TopicPartition, Long> truncatedFetchOffsets = new HashMap<>();
- for (SubscriptionState.LogTruncation truncation : truncations) {
- truncation.divergentOffsetOpt.ifPresent(divergentOffset ->
- divergentOffsets.put(truncation.topicPartition,
divergentOffset));
- truncatedFetchOffsets.put(truncation.topicPartition,
truncation.fetchPosition.offset);
- }
- return new LogTruncationException("Detected truncated partitions: " +
truncations,
- truncatedFetchOffsets, divergentOffsets);
- }
-
/**
* Search the offsets by target times for the specified partitions.
*
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherUtils.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherUtils.java
index e8550bec645..b7fdefeb0d1 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherUtils.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherUtils.java
@@ -18,6 +18,8 @@ package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.NodeApiVersions;
+import org.apache.kafka.clients.consumer.LogTruncationException;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.IsolationLevel;
@@ -37,9 +39,11 @@ import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
@@ -59,8 +63,19 @@ class OffsetFetcherUtils {
private final ApiVersions apiVersions;
private final Logger log;
- private final AtomicReference<RuntimeException>
cachedOffsetForLeaderException = new AtomicReference<>();
- private final AtomicReference<RuntimeException> cachedListOffsetsException
= new AtomicReference<>();
+ /**
+ * Exception that occurred while validating positions, that will be
propagated on the next
+ * call to validate positions. This could be an error received in the
+ * OffsetsForLeaderEpoch response, or a LogTruncationException detected
when using a
+ * successful response to validate the positions. It will be cleared when
thrown.
+ */
+ private final AtomicReference<RuntimeException>
cachedValidatePositionsException = new AtomicReference<>();
+ /**
+ * Exception that occurred while resetting positions, that will be
propagated on the next
+ * call to reset positions. This will have the error received in the
response to the
+ * ListOffsets request. It will be cleared when thrown on the next call to
reset.
+ */
+ private final AtomicReference<RuntimeException>
cachedResetPositionsException = new AtomicReference<>();
private final AtomicInteger metadataUpdateVersion = new AtomicInteger(-1);
OffsetFetcherUtils(LogContext logContext,
@@ -171,7 +186,7 @@ class OffsetFetcherUtils {
}
Map<TopicPartition, SubscriptionState.FetchPosition>
getPartitionsToValidate() {
- RuntimeException exception =
cachedOffsetForLeaderException.getAndSet(null);
+ RuntimeException exception =
cachedValidatePositionsException.getAndSet(null);
if (exception != null)
throw exception;
@@ -187,9 +202,9 @@ class OffsetFetcherUtils {
.collect(Collectors.toMap(Function.identity(),
subscriptionState::position));
}
- void maybeSetOffsetForLeaderException(RuntimeException e) {
- if (!cachedOffsetForLeaderException.compareAndSet(null, e)) {
- log.error("Discarding error in OffsetsForLeaderEpoch because
another error is pending", e);
+ void maybeSetValidatePositionsException(RuntimeException e) {
+ if (!cachedValidatePositionsException.compareAndSet(null, e)) {
+ log.error("Discarding error validating positions because another
error is pending", e);
}
}
@@ -209,7 +224,7 @@ class OffsetFetcherUtils {
Map<TopicPartition, Long> getOffsetResetTimestamp() {
// Raise exception from previous offset fetch if there is one
- RuntimeException exception =
cachedListOffsetsException.getAndSet(null);
+ RuntimeException exception =
cachedResetPositionsException.getAndSet(null);
if (exception != null)
throw exception;
@@ -285,7 +300,7 @@ class OffsetFetcherUtils {
return null;
}
- void onSuccessfulRequestForResettingPositions(
+ void onSuccessfulResponseForResettingPositions(
final Map<TopicPartition,
ListOffsetsRequestData.ListOffsetsPartition> resetTimestamps,
final ListOffsetResult result) {
if (!result.partitionsToRetry.isEmpty()) {
@@ -304,15 +319,66 @@ class OffsetFetcherUtils {
}
}
- void onFailedRequestForResettingPositions(
+ void onFailedResponseForResettingPositions(
final Map<TopicPartition,
ListOffsetsRequestData.ListOffsetsPartition> resetTimestamps,
final RuntimeException error) {
subscriptionState.requestFailed(resetTimestamps.keySet(),
time.milliseconds() + retryBackoffMs);
metadata.requestUpdate(false);
- if (!(error instanceof RetriableException) &&
!cachedListOffsetsException.compareAndSet(null,
+ if (!(error instanceof RetriableException) &&
!cachedResetPositionsException.compareAndSet(null,
error))
- log.error("Discarding error in ListOffsetResponse because another
error is pending", error);
+ log.error("Discarding error resetting positions because another
error is pending",
+ error);
+ }
+
+
+ void onSuccessfulResponseForValidatingPositions(
+ final Map<TopicPartition, SubscriptionState.FetchPosition>
fetchPositions,
+ final OffsetsForLeaderEpochUtils.OffsetForEpochResult
offsetsResult) {
+ List<SubscriptionState.LogTruncation> truncations = new ArrayList<>();
+ if (!offsetsResult.partitionsToRetry().isEmpty()) {
+
subscriptionState.setNextAllowedRetry(offsetsResult.partitionsToRetry(),
+ time.milliseconds() + retryBackoffMs);
+ metadata.requestUpdate(false);
+ }
+
+ // For each OffsetsForLeader response, check if the end-offset is
lower than our current offset
+ // for the partition. If so, it means we have experienced log
truncation and need to reposition
+ // that partition's offset.
+ // In addition, check whether the returned offset and epoch are valid.
If not, then we should reset
+ // its offset if reset policy is configured, or throw out of range
exception.
+ offsetsResult.endOffsets().forEach((topicPartition, respEndOffset) -> {
+ SubscriptionState.FetchPosition requestPosition =
fetchPositions.get(topicPartition);
+ Optional<SubscriptionState.LogTruncation> truncationOpt =
+ subscriptionState.maybeCompleteValidation(topicPartition,
requestPosition,
+ respEndOffset);
+ truncationOpt.ifPresent(truncations::add);
+ });
+
+ if (!truncations.isEmpty()) {
+
maybeSetValidatePositionsException(buildLogTruncationException(truncations));
+ }
+ }
+
+ void onFailedResponseForValidatingPositions(final Map<TopicPartition,
SubscriptionState.FetchPosition> fetchPositions,
+ final RuntimeException error) {
+ subscriptionState.requestFailed(fetchPositions.keySet(),
time.milliseconds() + retryBackoffMs);
+ metadata.requestUpdate(false);
+
+ if (!(error instanceof RetriableException)) {
+ maybeSetValidatePositionsException(error);
+ }
+ }
+
+ private LogTruncationException
buildLogTruncationException(List<SubscriptionState.LogTruncation> truncations) {
+ Map<TopicPartition, OffsetAndMetadata> divergentOffsets = new
HashMap<>();
+ Map<TopicPartition, Long> truncatedFetchOffsets = new HashMap<>();
+ for (SubscriptionState.LogTruncation truncation : truncations) {
+ truncation.divergentOffsetOpt.ifPresent(divergentOffset ->
+ divergentOffsets.put(truncation.topicPartition,
divergentOffset));
+ truncatedFetchOffsets.put(truncation.topicPartition,
truncation.fetchPosition.offset);
+ }
+ return new LogTruncationException(truncatedFetchOffsets,
divergentOffsets);
}
// Visible for testing
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsForLeaderEpochClient.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsForLeaderEpochClient.java
index 57650f52fe3..6bfdce392da 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsForLeaderEpochClient.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsForLeaderEpochClient.java
@@ -18,23 +18,12 @@ package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.TopicAuthorizationException;
-import
org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderPartition;
-import
org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderTopic;
-import
org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderTopicCollection;
-import
org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset;
-import
org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.OffsetForLeaderTopicResult;
-import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.OffsetsForLeaderEpochRequest;
import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse;
import org.apache.kafka.common.utils.LogContext;
-import java.util.HashMap;
-import java.util.HashSet;
import java.util.Map;
-import java.util.Set;
/**
* Convenience class for making asynchronous requests to the
OffsetsForLeaderEpoch API
@@ -43,7 +32,7 @@ public class OffsetsForLeaderEpochClient extends AsyncClient<
Map<TopicPartition, SubscriptionState.FetchPosition>,
OffsetsForLeaderEpochRequest,
OffsetsForLeaderEpochResponse,
- OffsetsForLeaderEpochClient.OffsetForEpochResult> {
+ OffsetsForLeaderEpochUtils.OffsetForEpochResult> {
OffsetsForLeaderEpochClient(ConsumerNetworkClient client, LogContext
logContext) {
super(client, logContext);
@@ -52,98 +41,15 @@ public class OffsetsForLeaderEpochClient extends
AsyncClient<
@Override
protected AbstractRequest.Builder<OffsetsForLeaderEpochRequest>
prepareRequest(
Node node, Map<TopicPartition, SubscriptionState.FetchPosition>
requestData) {
- OffsetForLeaderTopicCollection topics = new
OffsetForLeaderTopicCollection(requestData.size());
- requestData.forEach((topicPartition, fetchPosition) ->
- fetchPosition.offsetEpoch.ifPresent(fetchEpoch -> {
- OffsetForLeaderTopic topic =
topics.find(topicPartition.topic());
- if (topic == null) {
- topic = new
OffsetForLeaderTopic().setTopic(topicPartition.topic());
- topics.add(topic);
- }
- topic.partitions().add(new OffsetForLeaderPartition()
- .setPartition(topicPartition.partition())
- .setLeaderEpoch(fetchEpoch)
- .setCurrentLeaderEpoch(fetchPosition.currentLeader.epoch
- .orElse(RecordBatch.NO_PARTITION_LEADER_EPOCH))
- );
- })
- );
- return OffsetsForLeaderEpochRequest.Builder.forConsumer(topics);
+ return OffsetsForLeaderEpochUtils.prepareRequest(requestData);
}
@Override
- protected OffsetForEpochResult handleResponse(
+ protected OffsetsForLeaderEpochUtils.OffsetForEpochResult handleResponse(
Node node,
Map<TopicPartition, SubscriptionState.FetchPosition> requestData,
OffsetsForLeaderEpochResponse response) {
- Set<TopicPartition> partitionsToRetry = new
HashSet<>(requestData.keySet());
- Set<String> unauthorizedTopics = new HashSet<>();
- Map<TopicPartition, EpochEndOffset> endOffsets = new HashMap<>();
-
- for (OffsetForLeaderTopicResult topic : response.data().topics()) {
- for (EpochEndOffset partition : topic.partitions()) {
- TopicPartition topicPartition = new
TopicPartition(topic.topic(), partition.partition());
-
- if (!requestData.containsKey(topicPartition)) {
- logger().warn("Received unrequested topic or partition {}
from response, ignoring.", topicPartition);
- continue;
- }
-
- Errors error = Errors.forCode(partition.errorCode());
- switch (error) {
- case NONE:
- logger().debug("Handling OffsetsForLeaderEpoch
response for {}. Got offset {} for epoch {}.",
- topicPartition, partition.endOffset(),
partition.leaderEpoch());
- endOffsets.put(topicPartition, partition);
- partitionsToRetry.remove(topicPartition);
- break;
- case NOT_LEADER_OR_FOLLOWER:
- case REPLICA_NOT_AVAILABLE:
- case KAFKA_STORAGE_ERROR:
- case OFFSET_NOT_AVAILABLE:
- case LEADER_NOT_AVAILABLE:
- case FENCED_LEADER_EPOCH:
- case UNKNOWN_LEADER_EPOCH:
- logger().debug("Attempt to fetch offsets for partition
{} failed due to {}, retrying.",
- topicPartition, error);
- break;
- case UNKNOWN_TOPIC_OR_PARTITION:
- logger().warn("Received unknown topic or partition
error in OffsetsForLeaderEpoch request for partition {}.",
- topicPartition);
- break;
- case TOPIC_AUTHORIZATION_FAILED:
- unauthorizedTopics.add(topicPartition.topic());
- partitionsToRetry.remove(topicPartition);
- break;
- default:
- logger().warn("Attempt to fetch offsets for partition
{} failed due to: {}, retrying.",
- topicPartition, error.message());
- }
- }
- }
-
- if (!unauthorizedTopics.isEmpty())
- throw new TopicAuthorizationException(unauthorizedTopics);
- else
- return new OffsetForEpochResult(endOffsets, partitionsToRetry);
- }
-
- public static class OffsetForEpochResult {
- private final Map<TopicPartition, EpochEndOffset> endOffsets;
- private final Set<TopicPartition> partitionsToRetry;
-
- OffsetForEpochResult(Map<TopicPartition, EpochEndOffset> endOffsets,
Set<TopicPartition> partitionsNeedingRetry) {
- this.endOffsets = endOffsets;
- this.partitionsToRetry = partitionsNeedingRetry;
- }
-
- public Map<TopicPartition, EpochEndOffset> endOffsets() {
- return endOffsets;
- }
-
- public Set<TopicPartition> partitionsToRetry() {
- return partitionsToRetry;
- }
+ return OffsetsForLeaderEpochUtils.handleResponse(requestData,
response);
}
-}
+}
\ No newline at end of file
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsForLeaderEpochClient.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsForLeaderEpochUtils.java
similarity index 64%
copy from
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsForLeaderEpochClient.java
copy to
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsForLeaderEpochUtils.java
index 57650f52fe3..4970446a3e5 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsForLeaderEpochClient.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsForLeaderEpochUtils.java
@@ -16,7 +16,6 @@
*/
package org.apache.kafka.clients.consumer.internals;
-import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import
org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderPartition;
@@ -29,7 +28,8 @@ import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.OffsetsForLeaderEpochRequest;
import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse;
-import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.HashSet;
@@ -37,43 +37,36 @@ import java.util.Map;
import java.util.Set;
/**
- * Convenience class for making asynchronous requests to the
OffsetsForLeaderEpoch API
+ * Utility methods for preparing requests to the OffsetsForLeaderEpoch API and
handling responses.
*/
-public class OffsetsForLeaderEpochClient extends AsyncClient<
- Map<TopicPartition, SubscriptionState.FetchPosition>,
- OffsetsForLeaderEpochRequest,
- OffsetsForLeaderEpochResponse,
- OffsetsForLeaderEpochClient.OffsetForEpochResult> {
-
- OffsetsForLeaderEpochClient(ConsumerNetworkClient client, LogContext
logContext) {
- super(client, logContext);
- }
+public final class OffsetsForLeaderEpochUtils {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(OffsetsForLeaderEpochUtils.class);
+
+ private OffsetsForLeaderEpochUtils(){}
- @Override
- protected AbstractRequest.Builder<OffsetsForLeaderEpochRequest>
prepareRequest(
- Node node, Map<TopicPartition, SubscriptionState.FetchPosition>
requestData) {
+ static AbstractRequest.Builder<OffsetsForLeaderEpochRequest>
prepareRequest(
+ Map<TopicPartition, SubscriptionState.FetchPosition> requestData) {
OffsetForLeaderTopicCollection topics = new
OffsetForLeaderTopicCollection(requestData.size());
requestData.forEach((topicPartition, fetchPosition) ->
- fetchPosition.offsetEpoch.ifPresent(fetchEpoch -> {
- OffsetForLeaderTopic topic =
topics.find(topicPartition.topic());
- if (topic == null) {
- topic = new
OffsetForLeaderTopic().setTopic(topicPartition.topic());
- topics.add(topic);
- }
- topic.partitions().add(new OffsetForLeaderPartition()
- .setPartition(topicPartition.partition())
- .setLeaderEpoch(fetchEpoch)
- .setCurrentLeaderEpoch(fetchPosition.currentLeader.epoch
- .orElse(RecordBatch.NO_PARTITION_LEADER_EPOCH))
- );
- })
+ fetchPosition.offsetEpoch.ifPresent(fetchEpoch -> {
+ OffsetForLeaderTopic topic =
topics.find(topicPartition.topic());
+ if (topic == null) {
+ topic = new
OffsetForLeaderTopic().setTopic(topicPartition.topic());
+ topics.add(topic);
+ }
+ topic.partitions().add(new OffsetForLeaderPartition()
+ .setPartition(topicPartition.partition())
+ .setLeaderEpoch(fetchEpoch)
+
.setCurrentLeaderEpoch(fetchPosition.currentLeader.epoch
+
.orElse(RecordBatch.NO_PARTITION_LEADER_EPOCH))
+ );
+ })
);
return OffsetsForLeaderEpochRequest.Builder.forConsumer(topics);
}
- @Override
- protected OffsetForEpochResult handleResponse(
- Node node,
+ public static OffsetForEpochResult handleResponse(
Map<TopicPartition, SubscriptionState.FetchPosition> requestData,
OffsetsForLeaderEpochResponse response) {
@@ -86,15 +79,15 @@ public class OffsetsForLeaderEpochClient extends
AsyncClient<
TopicPartition topicPartition = new
TopicPartition(topic.topic(), partition.partition());
if (!requestData.containsKey(topicPartition)) {
- logger().warn("Received unrequested topic or partition {}
from response, ignoring.", topicPartition);
+ LOG.warn("Received unrequested topic or partition {} from
response, ignoring.", topicPartition);
continue;
}
Errors error = Errors.forCode(partition.errorCode());
switch (error) {
case NONE:
- logger().debug("Handling OffsetsForLeaderEpoch
response for {}. Got offset {} for epoch {}.",
- topicPartition, partition.endOffset(),
partition.leaderEpoch());
+ LOG.debug("Handling OffsetsForLeaderEpoch response for
{}. Got offset {} for epoch {}.",
+ topicPartition, partition.endOffset(),
partition.leaderEpoch());
endOffsets.put(topicPartition, partition);
partitionsToRetry.remove(topicPartition);
break;
@@ -105,31 +98,31 @@ public class OffsetsForLeaderEpochClient extends
AsyncClient<
case LEADER_NOT_AVAILABLE:
case FENCED_LEADER_EPOCH:
case UNKNOWN_LEADER_EPOCH:
- logger().debug("Attempt to fetch offsets for partition
{} failed due to {}, retrying.",
- topicPartition, error);
+ LOG.debug("Attempt to fetch offsets for partition {}
failed due to {}, retrying.",
+ topicPartition, error);
break;
case UNKNOWN_TOPIC_OR_PARTITION:
- logger().warn("Received unknown topic or partition
error in OffsetsForLeaderEpoch request for partition {}.",
- topicPartition);
+ LOG.warn("Received unknown topic or partition error in
OffsetsForLeaderEpoch request for partition {}.",
+ topicPartition);
break;
case TOPIC_AUTHORIZATION_FAILED:
unauthorizedTopics.add(topicPartition.topic());
partitionsToRetry.remove(topicPartition);
break;
default:
- logger().warn("Attempt to fetch offsets for partition
{} failed due to: {}, retrying.",
- topicPartition, error.message());
+ LOG.warn("Attempt to fetch offsets for partition {}
failed due to: {}, retrying.",
+ topicPartition, error.message());
}
}
}
if (!unauthorizedTopics.isEmpty())
throw new TopicAuthorizationException(unauthorizedTopics);
- else
- return new OffsetForEpochResult(endOffsets, partitionsToRetry);
+
+ return new OffsetForEpochResult(endOffsets, partitionsToRetry);
}
- public static class OffsetForEpochResult {
+ static class OffsetForEpochResult {
private final Map<TopicPartition, EpochEndOffset> endOffsets;
private final Set<TopicPartition> partitionsToRetry;
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java
index a0e3b1013b7..10977b7f78e 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java
@@ -18,7 +18,9 @@ package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.NodeApiVersions;
import org.apache.kafka.clients.StaleMetadataException;
+import org.apache.kafka.clients.consumer.LogTruncationException;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import
org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils.ListOffsetData;
import
org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils.ListOffsetResult;
@@ -28,8 +30,11 @@ import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.message.ListOffsetsRequestData;
+import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.ListOffsetsRequest;
import org.apache.kafka.common.requests.ListOffsetsResponse;
+import org.apache.kafka.common.requests.OffsetsForLeaderEpochRequest;
+import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
@@ -48,6 +53,8 @@ import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import static java.util.Objects.requireNonNull;
+import static
org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils.hasUsableOffsetForLeaderEpochVersion;
+import static
org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils.regroupFetchPositionsByLeader;
/**
* Manager responsible for building the following requests to retrieve
partition offsets, and
@@ -68,15 +75,20 @@ public class OffsetsRequestManager implements
RequestManager, ClusterResourceLis
private final IsolationLevel isolationLevel;
private final Logger log;
private final OffsetFetcherUtils offsetFetcherUtils;
+ private final SubscriptionState subscriptionState;
private final Set<ListOffsetsRequestState> requestsToRetry;
private final List<NetworkClientDelegate.UnsentRequest> requestsToSend;
+ private final long requestTimeoutMs;
+ private final Time time;
+ private final ApiVersions apiVersions;
public OffsetsRequestManager(final SubscriptionState subscriptionState,
final ConsumerMetadata metadata,
final IsolationLevel isolationLevel,
final Time time,
final long retryBackoffMs,
+ final long requestTimeoutMs,
final ApiVersions apiVersions,
final LogContext logContext) {
requireNonNull(subscriptionState);
@@ -91,6 +103,10 @@ public class OffsetsRequestManager implements
RequestManager, ClusterResourceLis
this.log = logContext.logger(getClass());
this.requestsToRetry = new HashSet<>();
this.requestsToSend = new ArrayList<>();
+ this.subscriptionState = subscriptionState;
+ this.time = time;
+ this.requestTimeoutMs = requestTimeoutMs;
+ this.apiVersions = apiVersions;
this.offsetFetcherUtils = new OffsetFetcherUtils(logContext, metadata,
subscriptionState,
time, retryBackoffMs, apiVersions);
// Register the cluster metadata update callback. Note this only
relies on the
@@ -154,6 +170,52 @@ public class OffsetsRequestManager implements
RequestManager, ClusterResourceLis
OffsetFetcherUtils.buildOffsetsForTimesResult(timestampsToSearch,
result.fetchedOffsets));
}
+ /**
+ * Reset offsets for all assigned partitions that require it. Offsets will
be reset
+ * with timestamps according to the reset strategy defined for each
partition. This will
+ * generate ListOffsets requests for the partitions and timestamps, and
enqueue them to be sent
+ * on the next call to {@link #poll(long)}.
+ *
+ * <p/>
+ *
+ * When a response is received, positions are updated in-memory, on the
subscription state. If
+ * an error is received in the response, it will be saved to be thrown on
the next call to
+ * this function (ex. {@link
org.apache.kafka.common.errors.TopicAuthorizationException})
+ */
+ public void resetPositionsIfNeeded() {
+ Map<TopicPartition, Long> offsetResetTimestamps =
offsetFetcherUtils.getOffsetResetTimestamp();
+
+ if (offsetResetTimestamps.isEmpty())
+ return;
+
+ List<NetworkClientDelegate.UnsentRequest> unsentRequests =
+
buildListOffsetsRequestsAndResetPositions(offsetResetTimestamps);
+ requestsToSend.addAll(unsentRequests);
+ }
+
+ /**
+ * Validate positions for all assigned partitions for which a leader
change has been detected.
+ * This will generate OffsetsForLeaderEpoch requests for the partitions,
with the known offset
+ * epoch and current leader epoch. It will enqueue the generated requests,
to be sent on the
+ * next call to {@link #poll(long)}.
+ *
+ * <p/>
+ *
+ * When a response is received, positions are validated and, if a log
truncation is
+ * detected, a {@link LogTruncationException} will be saved in memory, to
be thrown on the
+ * next call to this function.
+ */
+ public void validatePositionsIfNeeded() {
+ Map<TopicPartition, SubscriptionState.FetchPosition>
partitionsToValidate =
+ offsetFetcherUtils.getPartitionsToValidate();
+ if (partitionsToValidate.isEmpty()) {
+ return;
+ }
+ List<NetworkClientDelegate.UnsentRequest> unsentRequests =
+
buildOffsetsForLeaderEpochRequestsAndValidatePositions(partitionsToValidate);
+ requestsToSend.addAll(unsentRequests);
+ }
+
/**
* Generate requests for partitions with known leaders. Update the
listOffsetsRequestState by adding
* partitions with unknown leader to the
listOffsetsRequestState.remainingToSearch
@@ -205,14 +267,14 @@ public class OffsetsRequestManager implements
RequestManager, ClusterResourceLis
final boolean requireTimestamps,
final ListOffsetsRequestState listOffsetsRequestState) {
log.debug("Building ListOffsets request for partitions {}",
timestampsToSearch);
- Map<Node, Map<TopicPartition,
ListOffsetsRequestData.ListOffsetsPartition>> partitionResetTimestampsByNode =
+ Map<Node, Map<TopicPartition,
ListOffsetsRequestData.ListOffsetsPartition>> timestampsToSearchByNode =
groupListOffsetRequests(timestampsToSearch,
Optional.of(listOffsetsRequestState));
- if (partitionResetTimestampsByNode.isEmpty()) {
+ if (timestampsToSearchByNode.isEmpty()) {
throw new StaleMetadataException();
}
final List<NetworkClientDelegate.UnsentRequest> unsentRequests = new
ArrayList<>();
- MultiNodeRequest multiNodeRequest = new
MultiNodeRequest(partitionResetTimestampsByNode.size());
+ MultiNodeRequest multiNodeRequest = new
MultiNodeRequest(timestampsToSearchByNode.size());
multiNodeRequest.onComplete((multiNodeResult, error) -> {
// Done sending request to a set of known leaders
if (error == null) {
@@ -235,7 +297,7 @@ public class OffsetsRequestManager implements
RequestManager, ClusterResourceLis
}
});
- for (Map.Entry<Node, Map<TopicPartition,
ListOffsetsRequestData.ListOffsetsPartition>> entry :
partitionResetTimestampsByNode.entrySet()) {
+ for (Map.Entry<Node, Map<TopicPartition,
ListOffsetsRequestData.ListOffsetsPartition>> entry :
timestampsToSearchByNode.entrySet()) {
Node node = entry.getKey();
CompletableFuture<ListOffsetResult> partialResult =
buildListOffsetRequestToNode(
@@ -268,7 +330,7 @@ public class OffsetsRequestManager implements
RequestManager, ClusterResourceLis
.forConsumer(requireTimestamps, isolationLevel, false)
.setTargetTimes(ListOffsetsRequest.toListOffsetsTopics(targetTimes));
- log.debug("Creating ListOffsetRequest {} for broker {} to reset
positions", builder,
+ log.debug("Creating ListOffset request {} for broker {} to reset
positions", builder,
node);
NetworkClientDelegate.UnsentRequest unsentRequest = new
NetworkClientDelegate.UnsentRequest(
@@ -278,7 +340,7 @@ public class OffsetsRequestManager implements
RequestManager, ClusterResourceLis
CompletableFuture<ListOffsetResult> result = new CompletableFuture<>();
unsentRequest.future().whenComplete((response, error) -> {
if (error != null) {
- log.debug("Sending ListOffsetRequest {} to broker {} failed",
+ log.debug("Sending ListOffset request {} to broker {} failed",
builder,
node,
error);
@@ -298,6 +360,154 @@ public class OffsetsRequestManager implements
RequestManager, ClusterResourceLis
return result;
}
+ /**
+ * Make asynchronous ListOffsets request to fetch offsets by target times
for the specified
+ * partitions.
+ * Use the retrieved offsets to reset positions in the subscription state.
+ *
+ * @param timestampsToSearch the mapping between partitions and target time
+ * @return A list of
+ * {@link
org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.UnsentRequest}
+ * that can be polled to obtain the corresponding timestamps and offsets.
+ */
+ private List<NetworkClientDelegate.UnsentRequest>
buildListOffsetsRequestsAndResetPositions(
+ final Map<TopicPartition, Long> timestampsToSearch) {
+ Map<Node, Map<TopicPartition,
ListOffsetsRequestData.ListOffsetsPartition>> timestampsToSearchByNode =
+ groupListOffsetRequests(timestampsToSearch, Optional.empty());
+
+ final List<NetworkClientDelegate.UnsentRequest> unsentRequests = new
ArrayList<>();
+
+ timestampsToSearchByNode.forEach((node, resetTimestamps) -> {
+ subscriptionState.setNextAllowedRetry(resetTimestamps.keySet(),
+ time.milliseconds() + requestTimeoutMs);
+
+ CompletableFuture<ListOffsetResult> partialResult =
buildListOffsetRequestToNode(
+ node,
+ resetTimestamps,
+ false,
+ unsentRequests);
+
+ partialResult.whenComplete((result, error) -> {
+ if (error == null) {
+
offsetFetcherUtils.onSuccessfulResponseForResettingPositions(resetTimestamps,
+ result);
+ } else {
+ RuntimeException e;
+ if (error instanceof RuntimeException) {
+ e = (RuntimeException) error;
+ } else {
+ e = new RuntimeException("Unexpected failure in
ListOffsets request for " +
+ "resetting positions", error);
+ }
+
offsetFetcherUtils.onFailedResponseForResettingPositions(resetTimestamps, e);
+ }
+ });
+ });
+ return unsentRequests;
+ }
+
+ /**
+ * For each partition that needs validation, make an asynchronous request
to get the end-offsets
+ * for the partition with the epoch less than or equal to the epoch the
partition last saw.
+ * <p/>
+ * Requests are grouped by Node for efficiency.
+ */
+ private List<NetworkClientDelegate.UnsentRequest>
buildOffsetsForLeaderEpochRequestsAndValidatePositions(
+ Map<TopicPartition, SubscriptionState.FetchPosition>
partitionsToValidate) {
+
+ final Map<Node, Map<TopicPartition, SubscriptionState.FetchPosition>>
regrouped =
+ regroupFetchPositionsByLeader(partitionsToValidate);
+
+ long nextResetTimeMs = time.milliseconds() + requestTimeoutMs;
+ final List<NetworkClientDelegate.UnsentRequest> unsentRequests = new
ArrayList<>();
+ regrouped.forEach((node, fetchPositions) -> {
+
+ if (node.isEmpty()) {
+ metadata.requestUpdate(true);
+ return;
+ }
+
+ NodeApiVersions nodeApiVersions = apiVersions.get(node.idString());
+ if (nodeApiVersions == null) {
+ return;
+ }
+
+ if (!hasUsableOffsetForLeaderEpochVersion(nodeApiVersions)) {
+ log.debug("Skipping validation of fetch offsets for partitions
{} since the broker does not " +
+ "support the required protocol version
(introduced in Kafka 2.3)",
+ fetchPositions.keySet());
+ for (TopicPartition partition : fetchPositions.keySet()) {
+ subscriptionState.completeValidation(partition);
+ }
+ return;
+ }
+
+ subscriptionState.setNextAllowedRetry(fetchPositions.keySet(),
nextResetTimeMs);
+
+ CompletableFuture<OffsetsForLeaderEpochUtils.OffsetForEpochResult>
partialResult =
+ buildOffsetsForLeaderEpochRequestToNode(node,
fetchPositions, unsentRequests);
+
+ partialResult.whenComplete((offsetsResult, error) -> {
+ if (error == null) {
+
offsetFetcherUtils.onSuccessfulResponseForValidatingPositions(fetchPositions,
+ offsetsResult);
+ } else {
+ RuntimeException e;
+ if (error instanceof RuntimeException) {
+ e = (RuntimeException) error;
+ } else {
+ e = new RuntimeException("Unexpected failure in
OffsetsForLeaderEpoch " +
+ "request for validating positions", error);
+ }
+
offsetFetcherUtils.onFailedResponseForValidatingPositions(fetchPositions, e);
+ }
+ });
+
+ });
+
+ return unsentRequests;
+ }
+
+ /**
+ * Build OffsetsForLeaderEpoch request to send to a specific broker for
the partitions and
+ * positions to fetch. This also adds the request to the list of
unsentRequests.
+ **/
+ private CompletableFuture<OffsetsForLeaderEpochUtils.OffsetForEpochResult>
buildOffsetsForLeaderEpochRequestToNode(
+ final Node node,
+ final Map<TopicPartition, SubscriptionState.FetchPosition>
fetchPositions,
+ List<NetworkClientDelegate.UnsentRequest> unsentRequests) {
+ AbstractRequest.Builder<OffsetsForLeaderEpochRequest> builder =
+ OffsetsForLeaderEpochUtils.prepareRequest(fetchPositions);
+
+ log.debug("Creating OffsetsForLeaderEpoch request request {} to broker
{}", builder, node);
+
+ NetworkClientDelegate.UnsentRequest unsentRequest = new
NetworkClientDelegate.UnsentRequest(
+ builder,
+ Optional.ofNullable(node));
+ unsentRequests.add(unsentRequest);
+ CompletableFuture<OffsetsForLeaderEpochUtils.OffsetForEpochResult>
result = new CompletableFuture<>();
+ unsentRequest.future().whenComplete((response, error) -> {
+ if (error != null) {
+ log.debug("Sending OffsetsForLeaderEpoch request {} to broker
{} failed",
+ builder,
+ node,
+ error);
+ result.completeExceptionally(error);
+ } else {
+ OffsetsForLeaderEpochResponse offsetsForLeaderEpochResponse =
(OffsetsForLeaderEpochResponse) response.responseBody();
+ log.trace("Received OffsetsForLeaderEpoch response {} from
broker {}", offsetsForLeaderEpochResponse, node);
+ try {
+ OffsetsForLeaderEpochUtils.OffsetForEpochResult
listOffsetResult =
+
OffsetsForLeaderEpochUtils.handleResponse(fetchPositions,
offsetsForLeaderEpochResponse);
+ result.complete(listOffsetResult);
+ } catch (RuntimeException e) {
+ result.completeExceptionally(e);
+ }
+ }
+ });
+ return result;
+ }
+
private static class ListOffsetsRequestState {
private final Map<TopicPartition, Long> timestampsToSearch;
@@ -383,7 +593,7 @@ public class OffsetsRequestManager implements
RequestManager, ClusterResourceLis
if (!leaderAndEpoch.leader.isPresent()) {
log.debug("Leader for partition {} is unknown for fetching
offset {}", tp, offset);
- metadata.requestUpdate(false);
+ metadata.requestUpdate(true);
listOffsetsRequestState.ifPresent(offsetsRequestState ->
offsetsRequestState.remainingToSearch.put(tp, offset));
} else {
int currentLeaderEpoch =
leaderAndEpoch.epoch.orElse(ListOffsetsResponse.UNKNOWN_EPOCH);
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java
index 88f3e239376..f6b8ede5745 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java
@@ -25,6 +25,7 @@ import
org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
@@ -35,6 +36,8 @@ import
org.apache.kafka.clients.consumer.internals.events.EventHandler;
import
org.apache.kafka.clients.consumer.internals.events.ListOffsetsApplicationEvent;
import
org.apache.kafka.clients.consumer.internals.events.NewTopicsMetadataUpdateRequestEvent;
import
org.apache.kafka.clients.consumer.internals.events.OffsetFetchApplicationEvent;
+import
org.apache.kafka.clients.consumer.internals.events.ResetPositionsApplicationEvent;
+import
org.apache.kafka.clients.consumer.internals.events.ValidatePositionsApplicationEvent;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
@@ -193,6 +196,9 @@ public class PrototypeAsyncConsumer<K, V> implements
Consumer<K, V> {
// be processed in the collectFetches().
backgroundEvent.ifPresent(event -> processEvent(event,
timeout));
}
+
+ updateFetchPositionsIfNeeded();
+
// The idea here is to have the background thread sending
fetches autonomously, and the fetcher
// uses the poll loop to retrieve successful fetchResponse and
process them on the polling thread.
final Fetch<K, V> fetch = collectFetches();
@@ -209,6 +215,34 @@ public class PrototypeAsyncConsumer<K, V> implements
Consumer<K, V> {
return ConsumerRecords.empty();
}
+ /**
+ * Set the fetch position to the committed position (if there is one) or
reset it using the
+ * offset reset policy the user has configured (if partitions require
reset)
+ *
+ * @return true if the operation completed without timing out
+ * @throws org.apache.kafka.common.errors.AuthenticationException if
authentication fails. See the exception for more details
+ * @throws NoOffsetForPartitionException If no
offset is stored for a given partition and no offset reset policy is
+ * defined
+ */
+ private boolean updateFetchPositionsIfNeeded() {
+ // If any partitions have been truncated due to a leader change, we
need to validate the offsets
+ ValidatePositionsApplicationEvent validatePositionsEvent = new
ValidatePositionsApplicationEvent();
+ eventHandler.add(validatePositionsEvent);
+
+ // TODO: integrate logic for refreshing committed offsets if available
+
+ // If there are partitions still needing a position and a reset policy
is defined,
+ // request reset using the default policy. If no reset strategy is
defined and there
+ // are partitions with a missing position, then we will raise a
NoOffsetForPartitionException exception.
+ subscriptions.resetInitializingPositions();
+
+ // Finally send an asynchronous request to look up and update the
positions of any
+ // partitions which are awaiting reset.
+ ResetPositionsApplicationEvent resetPositionsEvent = new
ResetPositionsApplicationEvent();
+ eventHandler.add(resetPositionsEvent);
+ return true;
+ }
+
/**
* Commit offsets returned on the last {@link #poll(Duration) poll()} for
all the subscribed list of topics and
* partitions.
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
index e0fe1e0f306..eb1bffaf81d 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
@@ -33,6 +33,6 @@ abstract public class ApplicationEvent {
public enum Type {
NOOP, COMMIT, POLL, FETCH_COMMITTED_OFFSET, METADATA_UPDATE,
ASSIGNMENT_CHANGE,
- LIST_OFFSETS,
+ LIST_OFFSETS, RESET_POSITIONS, VALIDATE_POSITIONS,
}
}
\ No newline at end of file
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
index ab3291c47de..2cfbc2d04e4 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
@@ -62,6 +62,10 @@ public class ApplicationEventProcessor {
return process((AssignmentChangeApplicationEvent) event);
case LIST_OFFSETS:
return process((ListOffsetsApplicationEvent) event);
+ case RESET_POSITIONS:
+ return processResetPositionsEvent();
+ case VALIDATE_POSITIONS:
+ return processValidatePositionsEvent();
}
return false;
}
@@ -140,4 +144,14 @@ public class ApplicationEventProcessor {
event.chain(future);
return true;
}
+
+ private boolean processResetPositionsEvent() {
+ requestManagers.offsetsRequestManager.resetPositionsIfNeeded();
+ return true;
+ }
+
+ private boolean processValidatePositionsEvent() {
+ requestManagers.offsetsRequestManager.validatePositionsIfNeeded();
+ return true;
+ }
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableApplicationEvent.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableApplicationEvent.java
index 5deecc4150b..3bd862861a4 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableApplicationEvent.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableApplicationEvent.java
@@ -91,7 +91,7 @@ public abstract class CompletableApplicationEvent<T> extends
ApplicationEvent {
@Override
public String toString() {
- return "CompletableApplicationEvent{" +
+ return getClass().getSimpleName() + "{" +
"future=" + future +
", type=" + type +
'}';
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ListOffsetsApplicationEvent.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ListOffsetsApplicationEvent.java
index a6d48cdcae0..91b032c3478 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ListOffsetsApplicationEvent.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ListOffsetsApplicationEvent.java
@@ -85,7 +85,7 @@ public class ListOffsetsApplicationEvent extends
CompletableApplicationEvent<Map
@Override
public String toString() {
- return "ListOffsetsApplicationEvent {" +
+ return getClass().getSimpleName() + " {" +
"timestampsToSearch=" + timestampsToSearch + ", " +
"requireTimestamps=" + requireTimestamps + '}';
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/OffsetFetchApplicationEvent.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/OffsetFetchApplicationEvent.java
index 7fa9dd1c432..e53248425d9 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/OffsetFetchApplicationEvent.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/OffsetFetchApplicationEvent.java
@@ -55,7 +55,7 @@ public class OffsetFetchApplicationEvent extends
CompletableApplicationEvent<Map
@Override
public String toString() {
- return "OffsetFetchApplicationEvent{" +
+ return getClass().getSimpleName() + "{" +
"partitions=" + partitions +
", future=" + future +
", type=" + type +
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ResetPositionsApplicationEvent.java
similarity index 65%
copy from
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
copy to
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ResetPositionsApplicationEvent.java
index e0fe1e0f306..5d9b07f9de0 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ResetPositionsApplicationEvent.java
@@ -14,25 +14,17 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.kafka.clients.consumer.internals.events;
/**
- * This is the abstract definition of the events created by the KafkaConsumer
API
+ * Event for resetting offsets for all assigned partitions that require it.
This is an
+ * asynchronous event that generates ListOffsets requests, and completes by
updating in-memory
+ * positions when responses are received.
*/
-abstract public class ApplicationEvent {
- public final Type type;
-
- protected ApplicationEvent(Type type) {
- this.type = type;
- }
-
- @Override
- public String toString() {
- return type + " ApplicationEvent";
- }
+public class ResetPositionsApplicationEvent extends
CompletableApplicationEvent<Void> {
- public enum Type {
- NOOP, COMMIT, POLL, FETCH_COMMITTED_OFFSET, METADATA_UPDATE,
ASSIGNMENT_CHANGE,
- LIST_OFFSETS,
+ public ResetPositionsApplicationEvent() {
+ super(Type.RESET_POSITIONS);
}
}
\ No newline at end of file
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ValidatePositionsApplicationEvent.java
similarity index 65%
copy from
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
copy to
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ValidatePositionsApplicationEvent.java
index e0fe1e0f306..3b093e0b683 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ValidatePositionsApplicationEvent.java
@@ -14,25 +14,17 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.kafka.clients.consumer.internals.events;
/**
- * This is the abstract definition of the events created by the KafkaConsumer
API
+ * Event for validating offsets for all assigned partitions for which a leader
change has been
+ * detected. This is an asynchronous event that generates OffsetForLeaderEpoch
requests, and
+ * completes by validating in-memory positions against the offsets received in
the responses.
*/
-abstract public class ApplicationEvent {
- public final Type type;
-
- protected ApplicationEvent(Type type) {
- this.type = type;
- }
-
- @Override
- public String toString() {
- return type + " ApplicationEvent";
- }
+public class ValidatePositionsApplicationEvent extends
CompletableApplicationEvent<Void> {
- public enum Type {
- NOOP, COMMIT, POLL, FETCH_COMMITTED_OFFSET, METADATA_UPDATE,
ASSIGNMENT_CHANGE,
- LIST_OFFSETS,
+ public ValidatePositionsApplicationEvent() {
+ super(Type.VALIDATE_POSITIONS);
}
}
\ No newline at end of file
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThreadTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThreadTest.java
index 112d8bacfb2..b5a1ced617a 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThreadTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThreadTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.clients.GroupRebalanceConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.LogTruncationException;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
import
org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor;
@@ -27,7 +28,10 @@ import
org.apache.kafka.clients.consumer.internals.events.CommitApplicationEvent
import
org.apache.kafka.clients.consumer.internals.events.ListOffsetsApplicationEvent;
import
org.apache.kafka.clients.consumer.internals.events.NewTopicsMetadataUpdateRequestEvent;
import org.apache.kafka.clients.consumer.internals.events.NoopApplicationEvent;
+import
org.apache.kafka.clients.consumer.internals.events.ResetPositionsApplicationEvent;
+import
org.apache.kafka.clients.consumer.internals.events.ValidatePositionsApplicationEvent;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.message.FindCoordinatorRequestData;
import org.apache.kafka.common.requests.FindCoordinatorRequest;
import org.apache.kafka.common.serialization.StringDeserializer;
@@ -53,9 +57,11 @@ import static
org.apache.kafka.clients.consumer.ConsumerConfig.RETRY_BACKOFF_MS_
import static
org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
@@ -181,6 +187,87 @@ public class DefaultBackgroundThreadTest {
assertTrue(applicationEventsQueue.isEmpty());
backgroundThread.close();
}
+
+ @Test
+ public void testResetPositionsEventIsProcessed() {
+
when(coordinatorManager.poll(anyLong())).thenReturn(mockPollCoordinatorResult());
+ when(commitManager.poll(anyLong())).thenReturn(mockPollCommitResult());
+
when(offsetsRequestManager.poll(anyLong())).thenReturn(emptyPollOffsetsRequestResult());
+ this.applicationEventsQueue = new LinkedBlockingQueue<>();
+ this.backgroundEventsQueue = new LinkedBlockingQueue<>();
+ DefaultBackgroundThread backgroundThread = mockBackgroundThread();
+ ResetPositionsApplicationEvent e = new
ResetPositionsApplicationEvent();
+ this.applicationEventsQueue.add(e);
+ backgroundThread.runOnce();
+
verify(applicationEventProcessor).process(any(ResetPositionsApplicationEvent.class));
+ assertTrue(applicationEventsQueue.isEmpty());
+ backgroundThread.close();
+ }
+
+ @Test
+ public void testResetPositionsProcessFailure() {
+
when(coordinatorManager.poll(anyLong())).thenReturn(mockPollCoordinatorResult());
+ when(commitManager.poll(anyLong())).thenReturn(mockPollCommitResult());
+
when(offsetsRequestManager.poll(anyLong())).thenReturn(emptyPollOffsetsRequestResult());
+ this.applicationEventsQueue = new LinkedBlockingQueue<>();
+ this.backgroundEventsQueue = new LinkedBlockingQueue<>();
+ applicationEventProcessor = spy(new ApplicationEventProcessor(
+ this.backgroundEventsQueue,
+ mockRequestManagers(),
+ metadata));
+ DefaultBackgroundThread backgroundThread = mockBackgroundThread();
+
+ TopicAuthorizationException authException = new
TopicAuthorizationException("Topic authorization failed");
+
doThrow(authException).when(offsetsRequestManager).resetPositionsIfNeeded();
+
+ ResetPositionsApplicationEvent event = new
ResetPositionsApplicationEvent();
+ this.applicationEventsQueue.add(event);
+ assertThrows(TopicAuthorizationException.class,
backgroundThread::runOnce);
+
+
verify(applicationEventProcessor).process(any(ResetPositionsApplicationEvent.class));
+ backgroundThread.close();
+ }
+
+ @Test
+ public void testValidatePositionsEventIsProcessed() {
+
when(coordinatorManager.poll(anyLong())).thenReturn(mockPollCoordinatorResult());
+ when(commitManager.poll(anyLong())).thenReturn(mockPollCommitResult());
+
when(offsetsRequestManager.poll(anyLong())).thenReturn(emptyPollOffsetsRequestResult());
+ this.applicationEventsQueue = new LinkedBlockingQueue<>();
+ this.backgroundEventsQueue = new LinkedBlockingQueue<>();
+ DefaultBackgroundThread backgroundThread = mockBackgroundThread();
+ ValidatePositionsApplicationEvent e = new
ValidatePositionsApplicationEvent();
+ this.applicationEventsQueue.add(e);
+ backgroundThread.runOnce();
+
verify(applicationEventProcessor).process(any(ValidatePositionsApplicationEvent.class));
+ assertTrue(applicationEventsQueue.isEmpty());
+ backgroundThread.close();
+ }
+
+ @Test
+ public void testValidatePositionsProcessFailure() {
+
when(coordinatorManager.poll(anyLong())).thenReturn(mockPollCoordinatorResult());
+ when(commitManager.poll(anyLong())).thenReturn(mockPollCommitResult());
+
when(offsetsRequestManager.poll(anyLong())).thenReturn(emptyPollOffsetsRequestResult());
+ this.applicationEventsQueue = new LinkedBlockingQueue<>();
+ this.backgroundEventsQueue = new LinkedBlockingQueue<>();
+ applicationEventProcessor = spy(new ApplicationEventProcessor(
+ this.backgroundEventsQueue,
+ mockRequestManagers(),
+ metadata));
+ DefaultBackgroundThread backgroundThread = mockBackgroundThread();
+
+ LogTruncationException logTruncationException = new
LogTruncationException(Collections.emptyMap(), Collections.emptyMap());
+
doThrow(logTruncationException).when(offsetsRequestManager).validatePositionsIfNeeded();
+
+ ValidatePositionsApplicationEvent event = new
ValidatePositionsApplicationEvent();
+ this.applicationEventsQueue.add(event);
+ assertThrows(LogTruncationException.class, backgroundThread::runOnce);
+
+
verify(applicationEventProcessor).process(any(ValidatePositionsApplicationEvent.class));
+ backgroundThread.close();
+ }
+
@Test
public void testAssignmentChangeEvent() {
this.applicationEventsQueue = new LinkedBlockingQueue<>();
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetForLeaderEpochClientTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetForLeaderEpochClientTest.java
index 43c4130a485..57c61babd11 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetForLeaderEpochClientTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetForLeaderEpochClientTest.java
@@ -55,7 +55,7 @@ public class OffsetForLeaderEpochClientTest {
@Test
public void testEmptyResponse() {
OffsetsForLeaderEpochClient offsetClient = newOffsetClient();
- RequestFuture<OffsetsForLeaderEpochClient.OffsetForEpochResult> future
=
+ RequestFuture<OffsetsForLeaderEpochUtils.OffsetForEpochResult> future =
offsetClient.sendAsyncRequest(Node.noNode(),
Collections.emptyMap());
OffsetsForLeaderEpochResponse resp = new OffsetsForLeaderEpochResponse(
@@ -63,7 +63,7 @@ public class OffsetForLeaderEpochClientTest {
client.prepareResponse(resp);
consumerClient.pollNoWakeup();
- OffsetsForLeaderEpochClient.OffsetForEpochResult result =
future.value();
+ OffsetsForLeaderEpochUtils.OffsetForEpochResult result =
future.value();
assertTrue(result.partitionsToRetry().isEmpty());
assertTrue(result.endOffsets().isEmpty());
}
@@ -75,7 +75,7 @@ public class OffsetForLeaderEpochClientTest {
new Metadata.LeaderAndEpoch(Optional.empty(),
Optional.of(1))));
OffsetsForLeaderEpochClient offsetClient = newOffsetClient();
- RequestFuture<OffsetsForLeaderEpochClient.OffsetForEpochResult> future
=
+ RequestFuture<OffsetsForLeaderEpochUtils.OffsetForEpochResult> future =
offsetClient.sendAsyncRequest(Node.noNode(), positionMap);
OffsetsForLeaderEpochResponse resp = new OffsetsForLeaderEpochResponse(
@@ -83,7 +83,7 @@ public class OffsetForLeaderEpochClientTest {
client.prepareResponse(resp);
consumerClient.pollNoWakeup();
- OffsetsForLeaderEpochClient.OffsetForEpochResult result =
future.value();
+ OffsetsForLeaderEpochUtils.OffsetForEpochResult result =
future.value();
assertFalse(result.partitionsToRetry().isEmpty());
assertTrue(result.endOffsets().isEmpty());
}
@@ -95,14 +95,14 @@ public class OffsetForLeaderEpochClientTest {
new Metadata.LeaderAndEpoch(Optional.empty(),
Optional.of(1))));
OffsetsForLeaderEpochClient offsetClient = newOffsetClient();
- RequestFuture<OffsetsForLeaderEpochClient.OffsetForEpochResult> future
=
+ RequestFuture<OffsetsForLeaderEpochUtils.OffsetForEpochResult> future =
offsetClient.sendAsyncRequest(Node.noNode(), positionMap);
client.prepareResponse(prepareOffsetForLeaderEpochResponse(
tp0, Errors.NONE, 1, 10L));
consumerClient.pollNoWakeup();
- OffsetsForLeaderEpochClient.OffsetForEpochResult result =
future.value();
+ OffsetsForLeaderEpochUtils.OffsetForEpochResult result =
future.value();
assertTrue(result.partitionsToRetry().isEmpty());
assertTrue(result.endOffsets().containsKey(tp0));
assertEquals(result.endOffsets().get(tp0).errorCode(),
Errors.NONE.code());
@@ -117,7 +117,7 @@ public class OffsetForLeaderEpochClientTest {
new Metadata.LeaderAndEpoch(Optional.empty(),
Optional.of(1))));
OffsetsForLeaderEpochClient offsetClient = newOffsetClient();
- RequestFuture<OffsetsForLeaderEpochClient.OffsetForEpochResult> future
=
+ RequestFuture<OffsetsForLeaderEpochUtils.OffsetForEpochResult> future =
offsetClient.sendAsyncRequest(Node.noNode(), positionMap);
client.prepareResponse(prepareOffsetForLeaderEpochResponse(
@@ -136,7 +136,7 @@ public class OffsetForLeaderEpochClientTest {
new Metadata.LeaderAndEpoch(Optional.empty(),
Optional.of(1))));
OffsetsForLeaderEpochClient offsetClient = newOffsetClient();
- RequestFuture<OffsetsForLeaderEpochClient.OffsetForEpochResult> future
=
+ RequestFuture<OffsetsForLeaderEpochUtils.OffsetForEpochResult> future =
offsetClient.sendAsyncRequest(Node.noNode(), positionMap);
client.prepareResponse(prepareOffsetForLeaderEpochResponse(
@@ -144,7 +144,7 @@ public class OffsetForLeaderEpochClientTest {
consumerClient.pollNoWakeup();
assertFalse(future.failed());
- OffsetsForLeaderEpochClient.OffsetForEpochResult result =
future.value();
+ OffsetsForLeaderEpochUtils.OffsetForEpochResult result =
future.value();
assertTrue(result.partitionsToRetry().contains(tp0));
assertFalse(result.endOffsets().containsKey(tp0));
}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManagerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManagerTest.java
index 720fc1fec93..da762edf31c 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManagerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManagerTest.java
@@ -19,7 +19,9 @@ package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.NodeApiVersions;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.ClusterResource;
import org.apache.kafka.common.IsolationLevel;
@@ -29,11 +31,14 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.message.ListOffsetsResponseData;
+import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.ListOffsetsRequest;
import org.apache.kafka.common.requests.ListOffsetsResponse;
+import org.apache.kafka.common.requests.OffsetsForLeaderEpochRequest;
+import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse;
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
@@ -64,7 +69,10 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -75,6 +83,7 @@ public class OffsetsRequestManagerTest {
private ConsumerMetadata metadata;
private SubscriptionState subscriptionState;
private MockTime time;
+ private ApiVersions apiVersions;
private static final String TEST_TOPIC = "t1";
private static final TopicPartition TEST_PARTITION_1 = new
TopicPartition(TEST_TOPIC, 1);
private static final TopicPartition TEST_PARTITION_2 = new
TopicPartition(TEST_TOPIC, 2);
@@ -82,15 +91,16 @@ public class OffsetsRequestManagerTest {
private static final Node LEADER_2 = new Node(0, "host2", 9092);
private static final IsolationLevel DEFAULT_ISOLATION_LEVEL =
IsolationLevel.READ_COMMITTED;
private static final int RETRY_BACKOFF_MS = 500;
+ private static final int REQUEST_TIMEOUT_MS = 500;
@BeforeEach
public void setup() {
metadata = mock(ConsumerMetadata.class);
subscriptionState = mock(SubscriptionState.class);
this.time = new MockTime(0);
- ApiVersions apiVersions = mock(ApiVersions.class);
+ apiVersions = mock(ApiVersions.class);
requestManager = new OffsetsRequestManager(subscriptionState, metadata,
- DEFAULT_ISOLATION_LEVEL, time, RETRY_BACKOFF_MS,
+ DEFAULT_ISOLATION_LEVEL, time, RETRY_BACKOFF_MS,
REQUEST_TIMEOUT_MS,
apiVersions, new LogContext());
}
@@ -99,7 +109,7 @@ public class OffsetsRequestManagerTest {
Map<TopicPartition, Long> timestampsToSearch =
Collections.singletonMap(TEST_PARTITION_1,
ListOffsetsRequest.EARLIEST_TIMESTAMP);
- expectSuccessfulRequest(Collections.singletonMap(TEST_PARTITION_1,
LEADER_1));
+ mockSuccessfulRequest(Collections.singletonMap(TEST_PARTITION_1,
LEADER_1));
CompletableFuture<Map<TopicPartition, OffsetAndTimestamp>> result =
requestManager.fetchOffsets(
timestampsToSearch,
false);
@@ -116,11 +126,12 @@ public class OffsetsRequestManagerTest {
ListOffsetsRequest.EARLIEST_TIMESTAMP);
// Building list offsets request fails with unknown leader
- expectFailedRequest_MissingLeader();
+ mockFailedRequest_MissingLeader();
CompletableFuture<Map<TopicPartition, OffsetAndTimestamp>>
fetchOffsetsFuture =
requestManager.fetchOffsets(timestampsToSearch, false);
assertEquals(0, requestManager.requestsToSend());
assertEquals(1, requestManager.requestsToRetry());
+ verify(metadata).requestUpdate(true);
NetworkClientDelegate.PollResult res =
requestManager.poll(time.milliseconds());
assertEquals(0, res.unsentRequests.size());
// Metadata update not happening within the time boundaries of the
request future, so
@@ -139,7 +150,7 @@ public class OffsetsRequestManagerTest {
Map<TopicPartition, Node> partitionLeaders = new HashMap<>();
partitionLeaders.put(TEST_PARTITION_1, LEADER_1);
partitionLeaders.put(TEST_PARTITION_2, LEADER_1);
- expectSuccessfulRequest(partitionLeaders);
+ mockSuccessfulRequest(partitionLeaders);
CompletableFuture<Map<TopicPartition, OffsetAndTimestamp>> result =
requestManager.fetchOffsets(
timestampsToSearch,
false);
@@ -176,7 +187,7 @@ public class OffsetsRequestManagerTest {
Map<TopicPartition, Long> timestampsToSearch =
Collections.singletonMap(TEST_PARTITION_1,
ListOffsetsRequest.EARLIEST_TIMESTAMP);
- expectSuccessfulRequest(Collections.singletonMap(TEST_PARTITION_1,
LEADER_1));
+ mockSuccessfulRequest(Collections.singletonMap(TEST_PARTITION_1,
LEADER_1));
CompletableFuture<Map<TopicPartition, OffsetAndTimestamp>> result =
requestManager.fetchOffsets(
timestampsToSearch,
false);
@@ -204,19 +215,21 @@ public class OffsetsRequestManagerTest {
ListOffsetsRequest.EARLIEST_TIMESTAMP);
// Building list offsets request fails with unknown leader
- expectFailedRequest_MissingLeader();
+ mockFailedRequest_MissingLeader();
CompletableFuture<Map<TopicPartition, OffsetAndTimestamp>>
fetchOffsetsFuture =
requestManager.fetchOffsets(timestampsToSearch,
false);
assertEquals(0, requestManager.requestsToSend());
assertEquals(1, requestManager.requestsToRetry());
+ verify(metadata).requestUpdate(true);
+
NetworkClientDelegate.PollResult res =
requestManager.poll(time.milliseconds());
assertEquals(0, res.unsentRequests.size());
assertFalse(fetchOffsetsFuture.isDone());
// Cluster metadata update. Previously failed attempt to build the
request should be retried
// and succeed
- expectSuccessfulRequest(Collections.singletonMap(TEST_PARTITION_1,
LEADER_1));
+ mockSuccessfulRequest(Collections.singletonMap(TEST_PARTITION_1,
LEADER_1));
requestManager.onUpdate(new ClusterResource(""));
assertEquals(1, requestManager.requestsToSend());
@@ -232,7 +245,7 @@ public class OffsetsRequestManagerTest {
ListOffsetsRequest.EARLIEST_TIMESTAMP);
// List offsets request successfully built
- expectSuccessfulRequest(Collections.singletonMap(TEST_PARTITION_1,
LEADER_1));
+ mockSuccessfulRequest(Collections.singletonMap(TEST_PARTITION_1,
LEADER_1));
CompletableFuture<Map<TopicPartition, OffsetAndTimestamp>>
fetchOffsetsFuture = requestManager.fetchOffsets(
timestampsToSearch,
false);
@@ -255,7 +268,7 @@ public class OffsetsRequestManagerTest {
assertEquals(0, requestManager.requestsToSend());
// Cluster metadata update. Failed requests should be retried and
succeed
- expectSuccessfulRequest(Collections.singletonMap(TEST_PARTITION_1,
LEADER_1));
+ mockSuccessfulRequest(Collections.singletonMap(TEST_PARTITION_1,
LEADER_1));
requestManager.onUpdate(new ClusterResource(""));
assertEquals(1, requestManager.requestsToSend());
@@ -280,7 +293,7 @@ public class OffsetsRequestManagerTest {
ListOffsetsRequest.EARLIEST_TIMESTAMP);
// List offsets request successfully built
- expectSuccessfulRequest(Collections.singletonMap(TEST_PARTITION_1,
LEADER_1));
+ mockSuccessfulRequest(Collections.singletonMap(TEST_PARTITION_1,
LEADER_1));
CompletableFuture<Map<TopicPartition, OffsetAndTimestamp>>
fetchOffsetsFuture = requestManager.fetchOffsets(
timestampsToSearch,
false);
@@ -317,7 +330,7 @@ public class OffsetsRequestManagerTest {
Map<TopicPartition, Node> partitionLeaders = new HashMap<>();
partitionLeaders.put(TEST_PARTITION_1, LEADER_1);
partitionLeaders.put(TEST_PARTITION_2, LEADER_2);
- expectSuccessfulRequest(partitionLeaders);
+ mockSuccessfulRequest(partitionLeaders);
CompletableFuture<Map<TopicPartition, OffsetAndTimestamp>>
fetchOffsetsFuture = requestManager.fetchOffsets(
timestampsToSearch,
false);
@@ -347,7 +360,7 @@ public class OffsetsRequestManagerTest {
assertEquals(0, requestManager.requestsToSend());
// Cluster metadata update. Failed requests should be retried
- expectSuccessfulRequest(partitionLeaders);
+ mockSuccessfulRequest(partitionLeaders);
requestManager.onUpdate(new ClusterResource(""));
assertEquals(1, requestManager.requestsToSend());
@@ -370,7 +383,7 @@ public class OffsetsRequestManagerTest {
ListOffsetsRequest.EARLIEST_TIMESTAMP);
// List offsets request successfully built
- expectSuccessfulRequest(Collections.singletonMap(TEST_PARTITION_1,
LEADER_1));
+ mockSuccessfulRequest(Collections.singletonMap(TEST_PARTITION_1,
LEADER_1));
CompletableFuture<Map<TopicPartition, OffsetAndTimestamp>>
fetchOffsetsFuture =
requestManager.fetchOffsets(
timestampsToSearch,
@@ -399,7 +412,7 @@ public class OffsetsRequestManagerTest {
ListOffsetsRequest.EARLIEST_TIMESTAMP);
// List offsets request successfully built
- expectSuccessfulRequest(Collections.singletonMap(TEST_PARTITION_1,
LEADER_1));
+ mockSuccessfulRequest(Collections.singletonMap(TEST_PARTITION_1,
LEADER_1));
CompletableFuture<Map<TopicPartition, OffsetAndTimestamp>>
fetchOffsetsFuture =
requestManager.fetchOffsets(
timestampsToSearch,
@@ -431,7 +444,7 @@ public class OffsetsRequestManagerTest {
ListOffsetsRequest.EARLIEST_TIMESTAMP);
// List offsets request successfully built
- expectSuccessfulRequest(Collections.singletonMap(TEST_PARTITION_1,
LEADER_1));
+ mockSuccessfulRequest(Collections.singletonMap(TEST_PARTITION_1,
LEADER_1));
CompletableFuture<Map<TopicPartition, OffsetAndTimestamp>>
fetchOffsetsFuture =
requestManager.fetchOffsets(
timestampsToSearch,
@@ -455,6 +468,192 @@ public class OffsetsRequestManagerTest {
assertEquals(0, requestManager.requestsToSend());
}
+ @Test
+ public void testResetPositionsSendNoRequestIfNoPartitionsNeedingReset() {
+
when(subscriptionState.partitionsNeedingReset(time.milliseconds())).thenReturn(Collections.emptySet());
+ requestManager.resetPositionsIfNeeded();
+ assertEquals(0, requestManager.requestsToSend());
+ }
+
+ @Test
+ public void testResetPositionsMissingLeader() {
+ mockFailedRequest_MissingLeader();
+
when(subscriptionState.partitionsNeedingReset(time.milliseconds())).thenReturn(Collections.singleton(TEST_PARTITION_1));
+
when(subscriptionState.resetStrategy(any())).thenReturn(OffsetResetStrategy.EARLIEST);
+ requestManager.resetPositionsIfNeeded();
+ verify(metadata).requestUpdate(true);
+ assertEquals(0, requestManager.requestsToSend());
+ }
+
+ @Test
+ public void testResetPositionsSuccess_NoLeaderEpochInResponse() {
+
testResetPositionsSuccessWithLeaderEpoch(Metadata.LeaderAndEpoch.noLeaderOrEpoch());
+ verify(metadata, never()).updateLastSeenEpochIfNewer(any(), anyInt());
+ }
+
+ @Test
+ public void testResetPositionsSuccess_LeaderEpochInResponse() {
+ Metadata.LeaderAndEpoch leaderAndEpoch = new
Metadata.LeaderAndEpoch(Optional.of(LEADER_1),
+ Optional.of(5));
+ testResetPositionsSuccessWithLeaderEpoch(leaderAndEpoch);
+ verify(metadata).updateLastSeenEpochIfNewer(TEST_PARTITION_1,
leaderAndEpoch.epoch.get());
+ }
+
+ @Test
+ public void testResetPositionsThrowsPreviousException() {
+
when(subscriptionState.partitionsNeedingReset(time.milliseconds())).thenReturn(Collections.singleton(TEST_PARTITION_1));
+
when(subscriptionState.resetStrategy(any())).thenReturn(OffsetResetStrategy.EARLIEST);
+ mockSuccessfulRequest(Collections.singletonMap(TEST_PARTITION_1,
LEADER_1));
+
+ requestManager.resetPositionsIfNeeded();
+
+ // Reset positions response with TopicAuthorizationException
+ NetworkClientDelegate.PollResult res =
requestManager.poll(time.milliseconds());
+ NetworkClientDelegate.UnsentRequest unsentRequest =
res.unsentRequests.get(0);
+ ClientResponse clientResponse = buildClientResponseWithErrors(
+ unsentRequest, Collections.singletonMap(TEST_PARTITION_1,
Errors.TOPIC_AUTHORIZATION_FAILED));
+ clientResponse.onComplete();
+
+ assertTrue(unsentRequest.future().isDone());
+ assertFalse(unsentRequest.future().isCompletedExceptionally());
+
+ verify(subscriptionState).requestFailed(any(), anyLong());
+ verify(metadata).requestUpdate(false);
+
+ // Following resetPositions should raise the previous exception
without performing any
+ // request
+ assertThrows(TopicAuthorizationException.class,
+ () -> requestManager.resetPositionsIfNeeded());
+ assertEquals(0, requestManager.requestsToSend());
+ }
+
+ @Test
+ public void testValidatePositionsSuccess() {
+ int currentOffset = 5;
+ int expectedEndOffset = 100;
+ Metadata.LeaderAndEpoch leaderAndEpoch = new
Metadata.LeaderAndEpoch(Optional.of(LEADER_1),
+ Optional.of(3));
+ TopicPartition tp = TEST_PARTITION_1;
+ SubscriptionState.FetchPosition position = new
SubscriptionState.FetchPosition(currentOffset,
+ Optional.of(10), leaderAndEpoch);
+
+ mockSuccessfulBuildRequestForValidatingPositions(position, LEADER_1);
+
+ requestManager.validatePositionsIfNeeded();
+ assertEquals(1, requestManager.requestsToSend(), "Invalid request
count");
+
+ verify(subscriptionState).setNextAllowedRetry(any(), anyLong());
+
+ // Validate positions response with end offsets
+ when(metadata.currentLeader(tp)).thenReturn(testLeaderEpoch(LEADER_1,
leaderAndEpoch.epoch));
+ NetworkClientDelegate.PollResult pollResult =
requestManager.poll(time.milliseconds());
+ NetworkClientDelegate.UnsentRequest unsentRequest =
pollResult.unsentRequests.get(0);
+ ClientResponse clientResponse =
buildOffsetsForLeaderEpochResponse(unsentRequest,
+ Collections.singletonList(tp), expectedEndOffset);
+ clientResponse.onComplete();
+ assertTrue(unsentRequest.future().isDone());
+ assertFalse(unsentRequest.future().isCompletedExceptionally());
+ verify(subscriptionState).maybeCompleteValidation(any(), any(), any());
+ }
+
+ @Test
+ public void testValidatePositionsMissingLeader() {
+ Metadata.LeaderAndEpoch leaderAndEpoch = new
Metadata.LeaderAndEpoch(Optional.of(Node.noNode()),
+ Optional.of(5));
+ SubscriptionState.FetchPosition position = new
SubscriptionState.FetchPosition(5L,
+ Optional.of(10), leaderAndEpoch);
+
when(subscriptionState.partitionsNeedingValidation(time.milliseconds())).thenReturn(Collections.singleton(TEST_PARTITION_1));
+ when(subscriptionState.position(any())).thenReturn(position, position);
+ NodeApiVersions nodeApiVersions = NodeApiVersions.create();
+ when(apiVersions.get(LEADER_1.idString())).thenReturn(nodeApiVersions);
+ requestManager.validatePositionsIfNeeded();
+ verify(metadata).requestUpdate(true);
+ assertEquals(0, requestManager.requestsToSend());
+ }
+
+ @Test
+ public void testValidatePositionsFailureWithUnrecoverableAuthException() {
+ Metadata.LeaderAndEpoch leaderAndEpoch = new
Metadata.LeaderAndEpoch(Optional.of(LEADER_1),
+ Optional.of(5));
+ SubscriptionState.FetchPosition position = new
SubscriptionState.FetchPosition(5L,
+ Optional.of(10), leaderAndEpoch);
+ mockSuccessfulBuildRequestForValidatingPositions(position, LEADER_1);
+
+ requestManager.validatePositionsIfNeeded();
+
+ // Validate positions response with TopicAuthorizationException
+ NetworkClientDelegate.PollResult res =
requestManager.poll(time.milliseconds());
+ NetworkClientDelegate.UnsentRequest unsentRequest =
res.unsentRequests.get(0);
+ ClientResponse clientResponse =
+ buildOffsetsForLeaderEpochResponseWithErrors(unsentRequest,
Collections.singletonMap(TEST_PARTITION_1, Errors.TOPIC_AUTHORIZATION_FAILED));
+ clientResponse.onComplete();
+
+ assertTrue(unsentRequest.future().isDone());
+ assertFalse(unsentRequest.future().isCompletedExceptionally());
+
+ // Following validatePositions should raise the previous exception
without performing any
+ // request
+ assertThrows(TopicAuthorizationException.class, () ->
requestManager.validatePositionsIfNeeded());
+ assertEquals(0, requestManager.requestsToSend());
+ }
+
+ @Test
+ public void
testValidatePositionsAbortIfNoApiVersionsToCheckAgainstThenRecovers() {
+ int currentOffset = 5;
+ Metadata.LeaderAndEpoch leaderAndEpoch = new
Metadata.LeaderAndEpoch(Optional.of(LEADER_1),
+ Optional.of(3));
+ SubscriptionState.FetchPosition position = new
SubscriptionState.FetchPosition(currentOffset,
+ Optional.of(10), leaderAndEpoch);
+
+
when(subscriptionState.partitionsNeedingValidation(time.milliseconds())).thenReturn(Collections.singleton(TEST_PARTITION_1));
+ when(subscriptionState.position(any())).thenReturn(position, position);
+
+ // No api version info initially available
+ when(apiVersions.get(LEADER_1.idString())).thenReturn(null);
+ requestManager.validatePositionsIfNeeded();
+ assertEquals(0, requestManager.requestsToSend(), "Invalid request
count");
+ verify(subscriptionState,
never()).completeValidation(TEST_PARTITION_1);
+ verify(subscriptionState, never()).setNextAllowedRetry(any(),
anyLong());
+
+ // Api version updated, next validate positions should successfully
build the request
+
when(apiVersions.get(LEADER_1.idString())).thenReturn(NodeApiVersions.create());
+
when(subscriptionState.partitionsNeedingValidation(time.milliseconds())).thenReturn(Collections.singleton(TEST_PARTITION_1));
+ when(subscriptionState.position(any())).thenReturn(position, position);
+ requestManager.validatePositionsIfNeeded();
+ assertEquals(1, requestManager.requestsToSend(), "Invalid request
count");
+ }
+
+ private void
mockSuccessfulBuildRequestForValidatingPositions(SubscriptionState.FetchPosition
position, Node leader) {
+
when(subscriptionState.partitionsNeedingValidation(time.milliseconds())).thenReturn(Collections.singleton(TEST_PARTITION_1));
+ when(subscriptionState.position(any())).thenReturn(position, position);
+ NodeApiVersions nodeApiVersions = NodeApiVersions.create();
+ when(apiVersions.get(leader.idString())).thenReturn(nodeApiVersions);
+ }
+
+ private void
testResetPositionsSuccessWithLeaderEpoch(Metadata.LeaderAndEpoch
leaderAndEpoch) {
+ TopicPartition tp = TEST_PARTITION_1;
+ Node leader = LEADER_1;
+ OffsetResetStrategy strategy = OffsetResetStrategy.EARLIEST;
+ long offset = 5L;
+ Map<TopicPartition, OffsetAndTimestamp> expectedOffsets =
Collections.singletonMap(tp,
+ new OffsetAndTimestamp(offset, 1L, leaderAndEpoch.epoch));
+
when(subscriptionState.partitionsNeedingReset(time.milliseconds())).thenReturn(Collections.singleton(tp));
+ when(subscriptionState.resetStrategy(any())).thenReturn(strategy);
+ mockSuccessfulRequest(Collections.singletonMap(tp, leader));
+
+ requestManager.resetPositionsIfNeeded();
+ assertEquals(1, requestManager.requestsToSend());
+
+ // Reset positions response with offsets
+ when(metadata.currentLeader(tp)).thenReturn(testLeaderEpoch(leader,
leaderAndEpoch.epoch));
+ NetworkClientDelegate.PollResult pollResult =
requestManager.poll(time.milliseconds());
+ NetworkClientDelegate.UnsentRequest unsentRequest =
pollResult.unsentRequests.get(0);
+ ClientResponse clientResponse = buildClientResponse(unsentRequest,
expectedOffsets);
+ clientResponse.onComplete();
+ assertTrue(unsentRequest.future().isDone());
+ assertFalse(unsentRequest.future().isCompletedExceptionally());
+ }
+
private ListOffsetsResponseData.ListOffsetsTopicResponse
mockUnknownOffsetResponse(
TopicPartition tp) {
return new ListOffsetsResponseData.ListOffsetsTopicResponse()
@@ -488,21 +687,21 @@ public class OffsetsRequestManagerTest {
NetworkClientDelegate.PollResult retriedPoll =
requestManager.poll(time.milliseconds());
verifySuccessfulPollAwaitingResponse(retriedPoll);
NetworkClientDelegate.UnsentRequest unsentRequest =
retriedPoll.unsentRequests.get(0);
- ClientResponse clientResponse = buildClientResponse(unsentRequest,
- expectedResult);
+ ClientResponse clientResponse = buildClientResponse(unsentRequest,
expectedResult);
clientResponse.onComplete();
verifyRequestSuccessfullyCompleted(actualResult, expectedResult);
}
- private void expectSuccessfulRequest(Map<TopicPartition, Node>
partitionLeaders) {
+ private void mockSuccessfulRequest(Map<TopicPartition, Node>
partitionLeaders) {
partitionLeaders.forEach((tp, broker) -> {
-
when(metadata.currentLeader(tp)).thenReturn(testLeaderEpoch(broker));
+ when(metadata.currentLeader(tp)).thenReturn(testLeaderEpoch(broker,
+ Metadata.LeaderAndEpoch.noLeaderOrEpoch().epoch));
when(subscriptionState.isAssigned(tp)).thenReturn(true);
});
when(metadata.fetch()).thenReturn(testClusterMetadata(partitionLeaders));
}
- private void expectFailedRequest_MissingLeader() {
+ private void mockFailedRequest_MissingLeader() {
when(metadata.currentLeader(any(TopicPartition.class))).thenReturn(
new Metadata.LeaderAndEpoch(Optional.empty(), Optional.of(1)));
when(subscriptionState.isAssigned(any(TopicPartition.class))).thenReturn(true);
@@ -563,9 +762,8 @@ public class OffsetsRequestManagerTest {
assertEquals(expectedFailure, failure.getCause().getClass());
}
- private Metadata.LeaderAndEpoch testLeaderEpoch(Node leader) {
- return new Metadata.LeaderAndEpoch(Optional.of(leader),
- Optional.of(1));
+ private Metadata.LeaderAndEpoch testLeaderEpoch(Node leader,
Optional<Integer> epoch) {
+ return new Metadata.LeaderAndEpoch(Optional.of(leader), epoch);
}
private Cluster testClusterMetadata(Map<TopicPartition, Node>
partitionLeaders) {
@@ -597,6 +795,75 @@ public class OffsetsRequestManagerTest {
return buildClientResponse(request, topicResponses, false, null);
}
+ private ClientResponse buildOffsetsForLeaderEpochResponse(
+ final NetworkClientDelegate.UnsentRequest request,
+ final List<TopicPartition> partitions,
+ final int endOffset) {
+
+ AbstractRequest abstractRequest = request.requestBuilder().build();
+ assertTrue(abstractRequest instanceof OffsetsForLeaderEpochRequest);
+ OffsetsForLeaderEpochRequest offsetsForLeaderEpochRequest =
(OffsetsForLeaderEpochRequest) abstractRequest;
+ OffsetForLeaderEpochResponseData data = new
OffsetForLeaderEpochResponseData();
+ partitions.forEach(tp -> {
+ OffsetForLeaderEpochResponseData.OffsetForLeaderTopicResult topic
= data.topics().find(tp.topic());
+ if (topic == null) {
+ topic = new
OffsetForLeaderEpochResponseData.OffsetForLeaderTopicResult().setTopic(tp.topic());
+ data.topics().add(topic);
+ }
+ topic.partitions().add(new
OffsetForLeaderEpochResponseData.EpochEndOffset()
+ .setPartition(tp.partition())
+ .setErrorCode(Errors.NONE.code())
+ .setLeaderEpoch(3)
+ .setEndOffset(endOffset));
+ });
+
+ OffsetsForLeaderEpochResponse response = new
OffsetsForLeaderEpochResponse(data);
+ return new ClientResponse(
+ new RequestHeader(ApiKeys.OFFSET_FOR_LEADER_EPOCH,
offsetsForLeaderEpochRequest.version(), "", 1),
+ request.callback(),
+ "-1",
+ time.milliseconds(),
+ time.milliseconds(),
+ false,
+ null,
+ null,
+ response
+ );
+ }
+
+ private ClientResponse buildOffsetsForLeaderEpochResponseWithErrors(
+ final NetworkClientDelegate.UnsentRequest request,
+ final Map<TopicPartition, Errors> partitionErrors) {
+
+ AbstractRequest abstractRequest = request.requestBuilder().build();
+ assertTrue(abstractRequest instanceof OffsetsForLeaderEpochRequest);
+ OffsetsForLeaderEpochRequest offsetsForLeaderEpochRequest =
(OffsetsForLeaderEpochRequest) abstractRequest;
+ OffsetForLeaderEpochResponseData data = new
OffsetForLeaderEpochResponseData();
+ partitionErrors.keySet().forEach(tp -> {
+ OffsetForLeaderEpochResponseData.OffsetForLeaderTopicResult topic
= data.topics().find(tp.topic());
+ if (topic == null) {
+ topic = new
OffsetForLeaderEpochResponseData.OffsetForLeaderTopicResult().setTopic(tp.topic());
+ data.topics().add(topic);
+ }
+ topic.partitions().add(new
OffsetForLeaderEpochResponseData.EpochEndOffset()
+ .setPartition(tp.partition())
+ .setErrorCode(partitionErrors.get(tp).code()));
+ });
+
+ OffsetsForLeaderEpochResponse response = new
OffsetsForLeaderEpochResponse(data);
+ return new ClientResponse(
+ new RequestHeader(ApiKeys.OFFSET_FOR_LEADER_EPOCH,
offsetsForLeaderEpochRequest.version(), "", 1),
+ request.callback(),
+ "-1",
+ time.milliseconds(),
+ time.milliseconds(),
+ false,
+ null,
+ null,
+ response
+ );
+ }
+
private ClientResponse buildClientResponse(
final NetworkClientDelegate.UnsentRequest request,
final List<ListOffsetsResponseData.ListOffsetsTopicResponse>
topicResponses) {