This is an automated email from the ASF dual-hosted git repository.

junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a7e865c0a75 KAFKA-15115 - KAFKA-15163; Reset/Validate positions 
implementation & API integration (#14346)
a7e865c0a75 is described below

commit a7e865c0a756504cc7ae6f4eb0772cadd3333c53
Author: Lianet Magrans <[email protected]>
AuthorDate: Wed Sep 13 10:49:41 2023 -0400

    KAFKA-15115 - KAFKA-15163; Reset/Validate positions implementation & API 
integration (#14346)
    
    Implementation of the required functionality for resetting and validating 
positions in the new async consumer.
    
    This PR includes:
    1. New async application events ResetPositionsApplicationEvent and 
ValidatePositionsApplicationEvent, both handled by the same 
OfffsetsRequestManager.
    2. Integration of the reset/validate functionality in the new async 
consumer, to update fetch positions using the partitions offsets.
    3. Minor refactoring to extract functionality that is reused from both 
consumer implementations (moving logic without changes from OffsetFetcher into 
OffsetFetchUtils, and from OffsetsForLeaderEpochClient into 
OffsetsForLeaderEpochUtils)
    
    Reviewers: Philip Nee <[email protected]>, Kirk True 
<[email protected]>, Jun Rao<[email protected]>
---
 .../clients/consumer/LogTruncationException.java   |   5 +
 .../internals/DefaultBackgroundThread.java         |  10 +-
 .../clients/consumer/internals/OffsetFetcher.java  |  60 +---
 .../consumer/internals/OffsetFetcherUtils.java     |  88 +++++-
 .../internals/OffsetsForLeaderEpochClient.java     | 104 +------
 ...Client.java => OffsetsForLeaderEpochUtils.java} |  79 +++---
 .../consumer/internals/OffsetsRequestManager.java  | 224 ++++++++++++++-
 .../consumer/internals/PrototypeAsyncConsumer.java |  34 +++
 .../internals/events/ApplicationEvent.java         |   2 +-
 .../events/ApplicationEventProcessor.java          |  14 +
 .../events/CompletableApplicationEvent.java        |   2 +-
 .../events/ListOffsetsApplicationEvent.java        |   2 +-
 .../events/OffsetFetchApplicationEvent.java        |   2 +-
 ...nt.java => ResetPositionsApplicationEvent.java} |  22 +-
 ...java => ValidatePositionsApplicationEvent.java} |  22 +-
 .../internals/DefaultBackgroundThreadTest.java     |  87 ++++++
 .../internals/OffsetForLeaderEpochClientTest.java  |  18 +-
 .../internals/OffsetsRequestManagerTest.java       | 315 +++++++++++++++++++--
 18 files changed, 808 insertions(+), 282 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/LogTruncationException.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/LogTruncationException.java
index 336eed4a3b4..f13f61e156d 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/LogTruncationException.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/LogTruncationException.java
@@ -33,6 +33,11 @@ public class LogTruncationException extends 
OffsetOutOfRangeException {
 
     private final Map<TopicPartition, OffsetAndMetadata> divergentOffsets;
 
+    public LogTruncationException(Map<TopicPartition, Long> fetchOffsets,
+                                  Map<TopicPartition, OffsetAndMetadata> 
divergentOffsets) {
+        this("Truncated partitions detected with divergent offsets " + 
divergentOffsets, fetchOffsets, divergentOffsets);
+    }
+
     public LogTruncationException(String message,
                                   Map<TopicPartition, Long> fetchOffsets,
                                   Map<TopicPartition, OffsetAndMetadata> 
divergentOffsets) {
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java
index de3ff49d328..1e2c8a7935f 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java
@@ -48,9 +48,15 @@ import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.configur
  * Background thread runnable that consumes {@code ApplicationEvent} and
  * produces {@code BackgroundEvent}. It uses an event loop to consume and
  * produce events, and poll the network client to handle network IO.
- * <p>
+ * <p/>
  * It holds a reference to the {@link SubscriptionState}, which is
  * initialized by the polling thread.
+ * <p/>
+ * For processing application events that have been submitted to the
+ * {@link #applicationEventQueue}, this relies on an {@link 
ApplicationEventProcessor}. Processing includes generating requests and
+ * handling responses with the appropriate {@link RequestManager}. The network 
operations for
+ * actually sending the requests is delegated to the {@link 
NetworkClientDelegate}
+ * </li>
  */
 public class DefaultBackgroundThread extends KafkaThread {
     private static final long MAX_POLL_TIMEOUT_MS = 5000;
@@ -148,6 +154,7 @@ public class DefaultBackgroundThread extends KafkaThread {
             this.groupState = new GroupState(rebalanceConfig);
             long retryBackoffMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
             long retryBackoffMaxMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
+            final int requestTimeoutMs = 
config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
 
             OffsetsRequestManager offsetsRequestManager =
                     new OffsetsRequestManager(
@@ -156,6 +163,7 @@ public class DefaultBackgroundThread extends KafkaThread {
                             configuredIsolationLevel(config),
                             time,
                             retryBackoffMs,
+                            requestTimeoutMs,
                             apiVersions,
                             logContext);
             CoordinatorRequestManager coordinatorRequestManager = null;
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java
index 234919eaf65..33a05bdcf89 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java
@@ -21,12 +21,10 @@ import org.apache.kafka.clients.ClientResponse;
 import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.NodeApiVersions;
 import org.apache.kafka.clients.StaleMetadataException;
-import org.apache.kafka.clients.consumer.LogTruncationException;
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
 import 
org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils.ListOffsetData;
 import 
org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils.ListOffsetResult;
-import 
org.apache.kafka.clients.consumer.internals.OffsetsForLeaderEpochClient.OffsetForEpochResult;
+import 
org.apache.kafka.clients.consumer.internals.OffsetsForLeaderEpochUtils.OffsetForEpochResult;
 import 
org.apache.kafka.clients.consumer.internals.SubscriptionState.FetchPosition;
 import org.apache.kafka.common.IsolationLevel;
 import org.apache.kafka.common.Node;
@@ -41,13 +39,10 @@ import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Timer;
 import org.slf4j.Logger;
 
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Function;
@@ -70,7 +65,6 @@ public class OffsetFetcher {
     private final SubscriptionState subscriptions;
     private final ConsumerNetworkClient client;
     private final Time time;
-    private final long retryBackoffMs;
     private final long requestTimeoutMs;
     private final IsolationLevel isolationLevel;
     private final OffsetsForLeaderEpochClient offsetsForLeaderEpochClient;
@@ -91,7 +85,6 @@ public class OffsetFetcher {
         this.client = client;
         this.metadata = metadata;
         this.subscriptions = subscriptions;
-        this.retryBackoffMs = retryBackoffMs;
         this.requestTimeoutMs = requestTimeoutMs;
         this.isolationLevel = isolationLevel;
         this.apiVersions = apiVersions;
@@ -227,16 +220,12 @@ public class OffsetFetcher {
             future.addListener(new RequestFutureListener<ListOffsetResult>() {
                 @Override
                 public void onSuccess(ListOffsetResult result) {
-                    
offsetFetcherUtils.onSuccessfulRequestForResettingPositions(
-                            resetTimestamps,
-                            result);
+                    
offsetFetcherUtils.onSuccessfulResponseForResettingPositions(resetTimestamps, 
result);
                 }
 
                 @Override
                 public void onFailure(RuntimeException e) {
-                    offsetFetcherUtils.onFailedRequestForResettingPositions(
-                            resetTimestamps,
-                            e);
+                    
offsetFetcherUtils.onFailedResponseForResettingPositions(resetTimestamps, e);
                 }
             });
         }
@@ -284,55 +273,18 @@ public class OffsetFetcher {
             future.addListener(new 
RequestFutureListener<OffsetForEpochResult>() {
                 @Override
                 public void onSuccess(OffsetForEpochResult offsetsResult) {
-                    List<SubscriptionState.LogTruncation> truncations = new 
ArrayList<>();
-                    if (!offsetsResult.partitionsToRetry().isEmpty()) {
-                        
subscriptions.setNextAllowedRetry(offsetsResult.partitionsToRetry(), 
time.milliseconds() + retryBackoffMs);
-                        metadata.requestUpdate(false);
-                    }
-
-                    // For each OffsetsForLeader response, check if the 
end-offset is lower than our current offset
-                    // for the partition. If so, it means we have experienced 
log truncation and need to reposition
-                    // that partition's offset.
-                    //
-                    // In addition, check whether the returned offset and 
epoch are valid. If not, then we should reset
-                    // its offset if reset policy is configured, or throw out 
of range exception.
-                    offsetsResult.endOffsets().forEach((topicPartition, 
respEndOffset) -> {
-                        FetchPosition requestPosition = 
fetchPositions.get(topicPartition);
-                        Optional<SubscriptionState.LogTruncation> 
truncationOpt =
-                                
subscriptions.maybeCompleteValidation(topicPartition, requestPosition, 
respEndOffset);
-                        truncationOpt.ifPresent(truncations::add);
-                    });
-
-                    if (!truncations.isEmpty()) {
-                        
offsetFetcherUtils.maybeSetOffsetForLeaderException(buildLogTruncationException(truncations));
-                    }
+                    
offsetFetcherUtils.onSuccessfulResponseForValidatingPositions(fetchPositions,
+                            offsetsResult);
                 }
 
                 @Override
                 public void onFailure(RuntimeException e) {
-                    subscriptions.requestFailed(fetchPositions.keySet(), 
time.milliseconds() + retryBackoffMs);
-                    metadata.requestUpdate(false);
-
-                    if (!(e instanceof RetriableException)) {
-                        offsetFetcherUtils.maybeSetOffsetForLeaderException(e);
-                    }
+                    
offsetFetcherUtils.onFailedResponseForValidatingPositions(fetchPositions, e);
                 }
             });
         });
     }
 
-    private LogTruncationException 
buildLogTruncationException(List<SubscriptionState.LogTruncation> truncations) {
-        Map<TopicPartition, OffsetAndMetadata> divergentOffsets = new 
HashMap<>();
-        Map<TopicPartition, Long> truncatedFetchOffsets = new HashMap<>();
-        for (SubscriptionState.LogTruncation truncation : truncations) {
-            truncation.divergentOffsetOpt.ifPresent(divergentOffset ->
-                    divergentOffsets.put(truncation.topicPartition, 
divergentOffset));
-            truncatedFetchOffsets.put(truncation.topicPartition, 
truncation.fetchPosition.offset);
-        }
-        return new LogTruncationException("Detected truncated partitions: " + 
truncations,
-                truncatedFetchOffsets, divergentOffsets);
-    }
-
     /**
      * Search the offsets by target times for the specified partitions.
      *
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherUtils.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherUtils.java
index e8550bec645..b7fdefeb0d1 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherUtils.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherUtils.java
@@ -18,6 +18,8 @@ package org.apache.kafka.clients.consumer.internals;
 
 import org.apache.kafka.clients.ApiVersions;
 import org.apache.kafka.clients.NodeApiVersions;
+import org.apache.kafka.clients.consumer.LogTruncationException;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.common.IsolationLevel;
@@ -37,9 +39,11 @@ import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.slf4j.Logger;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
@@ -59,8 +63,19 @@ class OffsetFetcherUtils {
     private final ApiVersions apiVersions;
     private final Logger log;
 
-    private final AtomicReference<RuntimeException> 
cachedOffsetForLeaderException = new AtomicReference<>();
-    private final AtomicReference<RuntimeException> cachedListOffsetsException 
= new AtomicReference<>();
+    /**
+     * Exception that occurred while validating positions, that will be 
propagated on the next
+     * call to validate positions. This could be an error received in the
+     * OffsetsForLeaderEpoch response, or a LogTruncationException detected 
when using a
+     * successful response to validate the positions. It will be cleared when 
thrown.
+     */
+    private final AtomicReference<RuntimeException> 
cachedValidatePositionsException = new AtomicReference<>();
+    /**
+     * Exception that occurred while resetting positions, that will be 
propagated on the next
+     * call to reset positions. This will have the error received in the 
response to the
+     * ListOffsets request. It will be cleared when thrown on the next call to 
reset.
+     */
+    private final AtomicReference<RuntimeException> 
cachedResetPositionsException = new AtomicReference<>();
     private final AtomicInteger metadataUpdateVersion = new AtomicInteger(-1);
 
     OffsetFetcherUtils(LogContext logContext,
@@ -171,7 +186,7 @@ class OffsetFetcherUtils {
     }
 
     Map<TopicPartition, SubscriptionState.FetchPosition> 
getPartitionsToValidate() {
-        RuntimeException exception = 
cachedOffsetForLeaderException.getAndSet(null);
+        RuntimeException exception = 
cachedValidatePositionsException.getAndSet(null);
         if (exception != null)
             throw exception;
 
@@ -187,9 +202,9 @@ class OffsetFetcherUtils {
                 .collect(Collectors.toMap(Function.identity(), 
subscriptionState::position));
     }
 
-    void maybeSetOffsetForLeaderException(RuntimeException e) {
-        if (!cachedOffsetForLeaderException.compareAndSet(null, e)) {
-            log.error("Discarding error in OffsetsForLeaderEpoch because 
another error is pending", e);
+    void maybeSetValidatePositionsException(RuntimeException e) {
+        if (!cachedValidatePositionsException.compareAndSet(null, e)) {
+            log.error("Discarding error validating positions because another 
error is pending", e);
         }
     }
 
@@ -209,7 +224,7 @@ class OffsetFetcherUtils {
 
     Map<TopicPartition, Long> getOffsetResetTimestamp() {
         // Raise exception from previous offset fetch if there is one
-        RuntimeException exception = 
cachedListOffsetsException.getAndSet(null);
+        RuntimeException exception = 
cachedResetPositionsException.getAndSet(null);
         if (exception != null)
             throw exception;
 
@@ -285,7 +300,7 @@ class OffsetFetcherUtils {
             return null;
     }
 
-    void onSuccessfulRequestForResettingPositions(
+    void onSuccessfulResponseForResettingPositions(
             final Map<TopicPartition, 
ListOffsetsRequestData.ListOffsetsPartition> resetTimestamps,
             final ListOffsetResult result) {
         if (!result.partitionsToRetry.isEmpty()) {
@@ -304,15 +319,66 @@ class OffsetFetcherUtils {
         }
     }
 
-    void onFailedRequestForResettingPositions(
+    void onFailedResponseForResettingPositions(
             final Map<TopicPartition, 
ListOffsetsRequestData.ListOffsetsPartition> resetTimestamps,
             final RuntimeException error) {
         subscriptionState.requestFailed(resetTimestamps.keySet(), 
time.milliseconds() + retryBackoffMs);
         metadata.requestUpdate(false);
 
-        if (!(error instanceof RetriableException) && 
!cachedListOffsetsException.compareAndSet(null,
+        if (!(error instanceof RetriableException) && 
!cachedResetPositionsException.compareAndSet(null,
                 error))
-            log.error("Discarding error in ListOffsetResponse because another 
error is pending", error);
+            log.error("Discarding error resetting positions because another 
error is pending",
+                    error);
+    }
+
+
+    void onSuccessfulResponseForValidatingPositions(
+            final Map<TopicPartition, SubscriptionState.FetchPosition> 
fetchPositions,
+            final OffsetsForLeaderEpochUtils.OffsetForEpochResult 
offsetsResult) {
+        List<SubscriptionState.LogTruncation> truncations = new ArrayList<>();
+        if (!offsetsResult.partitionsToRetry().isEmpty()) {
+            
subscriptionState.setNextAllowedRetry(offsetsResult.partitionsToRetry(),
+                    time.milliseconds() + retryBackoffMs);
+            metadata.requestUpdate(false);
+        }
+
+        // For each OffsetsForLeader response, check if the end-offset is 
lower than our current offset
+        // for the partition. If so, it means we have experienced log 
truncation and need to reposition
+        // that partition's offset.
+        // In addition, check whether the returned offset and epoch are valid. 
If not, then we should reset
+        // its offset if reset policy is configured, or throw out of range 
exception.
+        offsetsResult.endOffsets().forEach((topicPartition, respEndOffset) -> {
+            SubscriptionState.FetchPosition requestPosition = 
fetchPositions.get(topicPartition);
+            Optional<SubscriptionState.LogTruncation> truncationOpt =
+                    subscriptionState.maybeCompleteValidation(topicPartition, 
requestPosition,
+                            respEndOffset);
+            truncationOpt.ifPresent(truncations::add);
+        });
+
+        if (!truncations.isEmpty()) {
+            
maybeSetValidatePositionsException(buildLogTruncationException(truncations));
+        }
+    }
+
+    void onFailedResponseForValidatingPositions(final Map<TopicPartition, 
SubscriptionState.FetchPosition> fetchPositions,
+                                                final RuntimeException error) {
+        subscriptionState.requestFailed(fetchPositions.keySet(), 
time.milliseconds() + retryBackoffMs);
+        metadata.requestUpdate(false);
+
+        if (!(error instanceof RetriableException)) {
+            maybeSetValidatePositionsException(error);
+        }
+    }
+
+    private LogTruncationException 
buildLogTruncationException(List<SubscriptionState.LogTruncation> truncations) {
+        Map<TopicPartition, OffsetAndMetadata> divergentOffsets = new 
HashMap<>();
+        Map<TopicPartition, Long> truncatedFetchOffsets = new HashMap<>();
+        for (SubscriptionState.LogTruncation truncation : truncations) {
+            truncation.divergentOffsetOpt.ifPresent(divergentOffset ->
+                    divergentOffsets.put(truncation.topicPartition, 
divergentOffset));
+            truncatedFetchOffsets.put(truncation.topicPartition, 
truncation.fetchPosition.offset);
+        }
+        return new LogTruncationException(truncatedFetchOffsets, 
divergentOffsets);
     }
 
     // Visible for testing
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsForLeaderEpochClient.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsForLeaderEpochClient.java
index 57650f52fe3..6bfdce392da 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsForLeaderEpochClient.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsForLeaderEpochClient.java
@@ -18,23 +18,12 @@ package org.apache.kafka.clients.consumer.internals;
 
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.TopicAuthorizationException;
-import 
org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderPartition;
-import 
org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderTopic;
-import 
org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderTopicCollection;
-import 
org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset;
-import 
org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.OffsetForLeaderTopicResult;
-import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.common.requests.AbstractRequest;
 import org.apache.kafka.common.requests.OffsetsForLeaderEpochRequest;
 import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse;
 import org.apache.kafka.common.utils.LogContext;
 
-import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Map;
-import java.util.Set;
 
 /**
  * Convenience class for making asynchronous requests to the 
OffsetsForLeaderEpoch API
@@ -43,7 +32,7 @@ public class OffsetsForLeaderEpochClient extends AsyncClient<
         Map<TopicPartition, SubscriptionState.FetchPosition>,
         OffsetsForLeaderEpochRequest,
         OffsetsForLeaderEpochResponse,
-        OffsetsForLeaderEpochClient.OffsetForEpochResult> {
+        OffsetsForLeaderEpochUtils.OffsetForEpochResult> {
 
     OffsetsForLeaderEpochClient(ConsumerNetworkClient client, LogContext 
logContext) {
         super(client, logContext);
@@ -52,98 +41,15 @@ public class OffsetsForLeaderEpochClient extends 
AsyncClient<
     @Override
     protected AbstractRequest.Builder<OffsetsForLeaderEpochRequest> 
prepareRequest(
             Node node, Map<TopicPartition, SubscriptionState.FetchPosition> 
requestData) {
-        OffsetForLeaderTopicCollection topics = new 
OffsetForLeaderTopicCollection(requestData.size());
-        requestData.forEach((topicPartition, fetchPosition) ->
-            fetchPosition.offsetEpoch.ifPresent(fetchEpoch -> {
-                OffsetForLeaderTopic topic = 
topics.find(topicPartition.topic());
-                if (topic == null) {
-                    topic = new 
OffsetForLeaderTopic().setTopic(topicPartition.topic());
-                    topics.add(topic);
-                }
-                topic.partitions().add(new OffsetForLeaderPartition()
-                    .setPartition(topicPartition.partition())
-                    .setLeaderEpoch(fetchEpoch)
-                    .setCurrentLeaderEpoch(fetchPosition.currentLeader.epoch
-                        .orElse(RecordBatch.NO_PARTITION_LEADER_EPOCH))
-                );
-            })
-        );
-        return OffsetsForLeaderEpochRequest.Builder.forConsumer(topics);
+        return OffsetsForLeaderEpochUtils.prepareRequest(requestData);
     }
 
     @Override
-    protected OffsetForEpochResult handleResponse(
+    protected OffsetsForLeaderEpochUtils.OffsetForEpochResult handleResponse(
             Node node,
             Map<TopicPartition, SubscriptionState.FetchPosition> requestData,
             OffsetsForLeaderEpochResponse response) {
 
-        Set<TopicPartition> partitionsToRetry = new 
HashSet<>(requestData.keySet());
-        Set<String> unauthorizedTopics = new HashSet<>();
-        Map<TopicPartition, EpochEndOffset> endOffsets = new HashMap<>();
-
-        for (OffsetForLeaderTopicResult topic : response.data().topics()) {
-            for (EpochEndOffset partition : topic.partitions()) {
-                TopicPartition topicPartition = new 
TopicPartition(topic.topic(), partition.partition());
-
-                if (!requestData.containsKey(topicPartition)) {
-                    logger().warn("Received unrequested topic or partition {} 
from response, ignoring.", topicPartition);
-                    continue;
-                }
-
-                Errors error = Errors.forCode(partition.errorCode());
-                switch (error) {
-                    case NONE:
-                        logger().debug("Handling OffsetsForLeaderEpoch 
response for {}. Got offset {} for epoch {}.",
-                            topicPartition, partition.endOffset(), 
partition.leaderEpoch());
-                        endOffsets.put(topicPartition, partition);
-                        partitionsToRetry.remove(topicPartition);
-                        break;
-                    case NOT_LEADER_OR_FOLLOWER:
-                    case REPLICA_NOT_AVAILABLE:
-                    case KAFKA_STORAGE_ERROR:
-                    case OFFSET_NOT_AVAILABLE:
-                    case LEADER_NOT_AVAILABLE:
-                    case FENCED_LEADER_EPOCH:
-                    case UNKNOWN_LEADER_EPOCH:
-                        logger().debug("Attempt to fetch offsets for partition 
{} failed due to {}, retrying.",
-                            topicPartition, error);
-                        break;
-                    case UNKNOWN_TOPIC_OR_PARTITION:
-                        logger().warn("Received unknown topic or partition 
error in OffsetsForLeaderEpoch request for partition {}.",
-                            topicPartition);
-                        break;
-                    case TOPIC_AUTHORIZATION_FAILED:
-                        unauthorizedTopics.add(topicPartition.topic());
-                        partitionsToRetry.remove(topicPartition);
-                        break;
-                    default:
-                        logger().warn("Attempt to fetch offsets for partition 
{} failed due to: {}, retrying.",
-                            topicPartition, error.message());
-                }
-            }
-        }
-
-        if (!unauthorizedTopics.isEmpty())
-            throw new TopicAuthorizationException(unauthorizedTopics);
-        else
-            return new OffsetForEpochResult(endOffsets, partitionsToRetry);
-    }
-
-    public static class OffsetForEpochResult {
-        private final Map<TopicPartition, EpochEndOffset> endOffsets;
-        private final Set<TopicPartition> partitionsToRetry;
-
-        OffsetForEpochResult(Map<TopicPartition, EpochEndOffset> endOffsets, 
Set<TopicPartition> partitionsNeedingRetry) {
-            this.endOffsets = endOffsets;
-            this.partitionsToRetry = partitionsNeedingRetry;
-        }
-
-        public Map<TopicPartition, EpochEndOffset> endOffsets() {
-            return endOffsets;
-        }
-
-        public Set<TopicPartition> partitionsToRetry() {
-            return partitionsToRetry;
-        }
+        return OffsetsForLeaderEpochUtils.handleResponse(requestData, 
response);
     }
-}
+}
\ No newline at end of file
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsForLeaderEpochClient.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsForLeaderEpochUtils.java
similarity index 64%
copy from 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsForLeaderEpochClient.java
copy to 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsForLeaderEpochUtils.java
index 57650f52fe3..4970446a3e5 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsForLeaderEpochClient.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsForLeaderEpochUtils.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.clients.consumer.internals;
 
-import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.TopicAuthorizationException;
 import 
org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderPartition;
@@ -29,7 +28,8 @@ import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.common.requests.AbstractRequest;
 import org.apache.kafka.common.requests.OffsetsForLeaderEpochRequest;
 import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse;
-import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.HashMap;
 import java.util.HashSet;
@@ -37,43 +37,36 @@ import java.util.Map;
 import java.util.Set;
 
 /**
- * Convenience class for making asynchronous requests to the 
OffsetsForLeaderEpoch API
+ * Utility methods for preparing requests to the OffsetsForLeaderEpoch API and 
handling responses.
  */
-public class OffsetsForLeaderEpochClient extends AsyncClient<
-        Map<TopicPartition, SubscriptionState.FetchPosition>,
-        OffsetsForLeaderEpochRequest,
-        OffsetsForLeaderEpochResponse,
-        OffsetsForLeaderEpochClient.OffsetForEpochResult> {
-
-    OffsetsForLeaderEpochClient(ConsumerNetworkClient client, LogContext 
logContext) {
-        super(client, logContext);
-    }
+public final class OffsetsForLeaderEpochUtils {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(OffsetsForLeaderEpochUtils.class);
+
+    private OffsetsForLeaderEpochUtils(){}
 
-    @Override
-    protected AbstractRequest.Builder<OffsetsForLeaderEpochRequest> 
prepareRequest(
-            Node node, Map<TopicPartition, SubscriptionState.FetchPosition> 
requestData) {
+    static AbstractRequest.Builder<OffsetsForLeaderEpochRequest> 
prepareRequest(
+            Map<TopicPartition, SubscriptionState.FetchPosition> requestData) {
         OffsetForLeaderTopicCollection topics = new 
OffsetForLeaderTopicCollection(requestData.size());
         requestData.forEach((topicPartition, fetchPosition) ->
-            fetchPosition.offsetEpoch.ifPresent(fetchEpoch -> {
-                OffsetForLeaderTopic topic = 
topics.find(topicPartition.topic());
-                if (topic == null) {
-                    topic = new 
OffsetForLeaderTopic().setTopic(topicPartition.topic());
-                    topics.add(topic);
-                }
-                topic.partitions().add(new OffsetForLeaderPartition()
-                    .setPartition(topicPartition.partition())
-                    .setLeaderEpoch(fetchEpoch)
-                    .setCurrentLeaderEpoch(fetchPosition.currentLeader.epoch
-                        .orElse(RecordBatch.NO_PARTITION_LEADER_EPOCH))
-                );
-            })
+                fetchPosition.offsetEpoch.ifPresent(fetchEpoch -> {
+                    OffsetForLeaderTopic topic = 
topics.find(topicPartition.topic());
+                    if (topic == null) {
+                        topic = new 
OffsetForLeaderTopic().setTopic(topicPartition.topic());
+                        topics.add(topic);
+                    }
+                    topic.partitions().add(new OffsetForLeaderPartition()
+                            .setPartition(topicPartition.partition())
+                            .setLeaderEpoch(fetchEpoch)
+                            
.setCurrentLeaderEpoch(fetchPosition.currentLeader.epoch
+                                    
.orElse(RecordBatch.NO_PARTITION_LEADER_EPOCH))
+                    );
+                })
         );
         return OffsetsForLeaderEpochRequest.Builder.forConsumer(topics);
     }
 
-    @Override
-    protected OffsetForEpochResult handleResponse(
-            Node node,
+    public static OffsetForEpochResult handleResponse(
             Map<TopicPartition, SubscriptionState.FetchPosition> requestData,
             OffsetsForLeaderEpochResponse response) {
 
@@ -86,15 +79,15 @@ public class OffsetsForLeaderEpochClient extends 
AsyncClient<
                 TopicPartition topicPartition = new 
TopicPartition(topic.topic(), partition.partition());
 
                 if (!requestData.containsKey(topicPartition)) {
-                    logger().warn("Received unrequested topic or partition {} 
from response, ignoring.", topicPartition);
+                    LOG.warn("Received unrequested topic or partition {} from 
response, ignoring.", topicPartition);
                     continue;
                 }
 
                 Errors error = Errors.forCode(partition.errorCode());
                 switch (error) {
                     case NONE:
-                        logger().debug("Handling OffsetsForLeaderEpoch 
response for {}. Got offset {} for epoch {}.",
-                            topicPartition, partition.endOffset(), 
partition.leaderEpoch());
+                        LOG.debug("Handling OffsetsForLeaderEpoch response for 
{}. Got offset {} for epoch {}.",
+                                topicPartition, partition.endOffset(), 
partition.leaderEpoch());
                         endOffsets.put(topicPartition, partition);
                         partitionsToRetry.remove(topicPartition);
                         break;
@@ -105,31 +98,31 @@ public class OffsetsForLeaderEpochClient extends 
AsyncClient<
                     case LEADER_NOT_AVAILABLE:
                     case FENCED_LEADER_EPOCH:
                     case UNKNOWN_LEADER_EPOCH:
-                        logger().debug("Attempt to fetch offsets for partition 
{} failed due to {}, retrying.",
-                            topicPartition, error);
+                        LOG.debug("Attempt to fetch offsets for partition {} 
failed due to {}, retrying.",
+                                topicPartition, error);
                         break;
                     case UNKNOWN_TOPIC_OR_PARTITION:
-                        logger().warn("Received unknown topic or partition 
error in OffsetsForLeaderEpoch request for partition {}.",
-                            topicPartition);
+                        LOG.warn("Received unknown topic or partition error in 
OffsetsForLeaderEpoch request for partition {}.",
+                                topicPartition);
                         break;
                     case TOPIC_AUTHORIZATION_FAILED:
                         unauthorizedTopics.add(topicPartition.topic());
                         partitionsToRetry.remove(topicPartition);
                         break;
                     default:
-                        logger().warn("Attempt to fetch offsets for partition 
{} failed due to: {}, retrying.",
-                            topicPartition, error.message());
+                        LOG.warn("Attempt to fetch offsets for partition {} 
failed due to: {}, retrying.",
+                                topicPartition, error.message());
                 }
             }
         }
 
         if (!unauthorizedTopics.isEmpty())
             throw new TopicAuthorizationException(unauthorizedTopics);
-        else
-            return new OffsetForEpochResult(endOffsets, partitionsToRetry);
+
+        return new OffsetForEpochResult(endOffsets, partitionsToRetry);
     }
 
-    public static class OffsetForEpochResult {
+    static class OffsetForEpochResult {
         private final Map<TopicPartition, EpochEndOffset> endOffsets;
         private final Set<TopicPartition> partitionsToRetry;
 
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java
index a0e3b1013b7..10977b7f78e 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java
@@ -18,7 +18,9 @@ package org.apache.kafka.clients.consumer.internals;
 
 import org.apache.kafka.clients.ApiVersions;
 import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.NodeApiVersions;
 import org.apache.kafka.clients.StaleMetadataException;
+import org.apache.kafka.clients.consumer.LogTruncationException;
 import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
 import 
org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils.ListOffsetData;
 import 
org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils.ListOffsetResult;
@@ -28,8 +30,11 @@ import org.apache.kafka.common.IsolationLevel;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.message.ListOffsetsRequestData;
+import org.apache.kafka.common.requests.AbstractRequest;
 import org.apache.kafka.common.requests.ListOffsetsRequest;
 import org.apache.kafka.common.requests.ListOffsetsResponse;
+import org.apache.kafka.common.requests.OffsetsForLeaderEpochRequest;
+import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.slf4j.Logger;
@@ -48,6 +53,8 @@ import java.util.function.BiConsumer;
 import java.util.stream.Collectors;
 
 import static java.util.Objects.requireNonNull;
+import static 
org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils.hasUsableOffsetForLeaderEpochVersion;
+import static 
org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils.regroupFetchPositionsByLeader;
 
 /**
  * Manager responsible for building the following requests to retrieve 
partition offsets, and
@@ -68,15 +75,20 @@ public class OffsetsRequestManager implements 
RequestManager, ClusterResourceLis
     private final IsolationLevel isolationLevel;
     private final Logger log;
     private final OffsetFetcherUtils offsetFetcherUtils;
+    private final SubscriptionState subscriptionState;
 
     private final Set<ListOffsetsRequestState> requestsToRetry;
     private final List<NetworkClientDelegate.UnsentRequest> requestsToSend;
+    private final long requestTimeoutMs;
+    private final Time time;
+    private final ApiVersions apiVersions;
 
     public OffsetsRequestManager(final SubscriptionState subscriptionState,
                                  final ConsumerMetadata metadata,
                                  final IsolationLevel isolationLevel,
                                  final Time time,
                                  final long retryBackoffMs,
+                                 final long requestTimeoutMs,
                                  final ApiVersions apiVersions,
                                  final LogContext logContext) {
         requireNonNull(subscriptionState);
@@ -91,6 +103,10 @@ public class OffsetsRequestManager implements 
RequestManager, ClusterResourceLis
         this.log = logContext.logger(getClass());
         this.requestsToRetry = new HashSet<>();
         this.requestsToSend = new ArrayList<>();
+        this.subscriptionState = subscriptionState;
+        this.time = time;
+        this.requestTimeoutMs = requestTimeoutMs;
+        this.apiVersions = apiVersions;
         this.offsetFetcherUtils = new OffsetFetcherUtils(logContext, metadata, 
subscriptionState,
                 time, retryBackoffMs, apiVersions);
         // Register the cluster metadata update callback. Note this only 
relies on the
@@ -154,6 +170,52 @@ public class OffsetsRequestManager implements 
RequestManager, ClusterResourceLis
                 
OffsetFetcherUtils.buildOffsetsForTimesResult(timestampsToSearch, 
result.fetchedOffsets));
     }
 
+    /**
+     * Reset offsets for all assigned partitions that require it. Offsets will 
be reset
+     * with timestamps according to the reset strategy defined for each 
partition. This will
+     * generate ListOffsets requests for the partitions and timestamps, and 
enqueue them to be sent
+     * on the next call to {@link #poll(long)}.
+     *
+     * <p/>
+     *
+     * When a response is received, positions are updated in-memory, on the 
subscription state. If
+     * an error is received in the response, it will be saved to be thrown on 
the next call to
+     * this function (ex. {@link 
org.apache.kafka.common.errors.TopicAuthorizationException})
+     */
+    public void resetPositionsIfNeeded() {
+        Map<TopicPartition, Long> offsetResetTimestamps = 
offsetFetcherUtils.getOffsetResetTimestamp();
+
+        if (offsetResetTimestamps.isEmpty())
+            return;
+
+        List<NetworkClientDelegate.UnsentRequest> unsentRequests =
+                
buildListOffsetsRequestsAndResetPositions(offsetResetTimestamps);
+        requestsToSend.addAll(unsentRequests);
+    }
+
+    /**
+     * Validate positions for all assigned partitions for which a leader 
change has been detected.
+     * This will generate OffsetsForLeaderEpoch requests for the partitions, 
with the known offset
+     * epoch and current leader epoch. It will enqueue the generated requests, 
to be sent on the
+     * next call to {@link #poll(long)}.
+     *
+     * <p/>
+     *
+     * When a response is received, positions are validated and, if a log 
truncation is
+     * detected, a {@link LogTruncationException} will be saved in memory, to 
be thrown on the
+     * next call to this function.
+     */
+    public void validatePositionsIfNeeded() {
+        Map<TopicPartition, SubscriptionState.FetchPosition> 
partitionsToValidate =
+                offsetFetcherUtils.getPartitionsToValidate();
+        if (partitionsToValidate.isEmpty()) {
+            return;
+        }
+        List<NetworkClientDelegate.UnsentRequest> unsentRequests =
+                
buildOffsetsForLeaderEpochRequestsAndValidatePositions(partitionsToValidate);
+        requestsToSend.addAll(unsentRequests);
+    }
+
     /**
      * Generate requests for partitions with known leaders. Update the 
listOffsetsRequestState by adding
      * partitions with unknown leader to the 
listOffsetsRequestState.remainingToSearch
@@ -205,14 +267,14 @@ public class OffsetsRequestManager implements 
RequestManager, ClusterResourceLis
             final boolean requireTimestamps,
             final ListOffsetsRequestState listOffsetsRequestState) {
         log.debug("Building ListOffsets request for partitions {}", 
timestampsToSearch);
-        Map<Node, Map<TopicPartition, 
ListOffsetsRequestData.ListOffsetsPartition>> partitionResetTimestampsByNode =
+        Map<Node, Map<TopicPartition, 
ListOffsetsRequestData.ListOffsetsPartition>> timestampsToSearchByNode =
                 groupListOffsetRequests(timestampsToSearch, 
Optional.of(listOffsetsRequestState));
-        if (partitionResetTimestampsByNode.isEmpty()) {
+        if (timestampsToSearchByNode.isEmpty()) {
             throw new StaleMetadataException();
         }
 
         final List<NetworkClientDelegate.UnsentRequest> unsentRequests = new 
ArrayList<>();
-        MultiNodeRequest multiNodeRequest = new 
MultiNodeRequest(partitionResetTimestampsByNode.size());
+        MultiNodeRequest multiNodeRequest = new 
MultiNodeRequest(timestampsToSearchByNode.size());
         multiNodeRequest.onComplete((multiNodeResult, error) -> {
             // Done sending request to a set of known leaders
             if (error == null) {
@@ -235,7 +297,7 @@ public class OffsetsRequestManager implements 
RequestManager, ClusterResourceLis
             }
         });
 
-        for (Map.Entry<Node, Map<TopicPartition, 
ListOffsetsRequestData.ListOffsetsPartition>> entry : 
partitionResetTimestampsByNode.entrySet()) {
+        for (Map.Entry<Node, Map<TopicPartition, 
ListOffsetsRequestData.ListOffsetsPartition>> entry : 
timestampsToSearchByNode.entrySet()) {
             Node node = entry.getKey();
 
             CompletableFuture<ListOffsetResult> partialResult = 
buildListOffsetRequestToNode(
@@ -268,7 +330,7 @@ public class OffsetsRequestManager implements 
RequestManager, ClusterResourceLis
                 .forConsumer(requireTimestamps, isolationLevel, false)
                 
.setTargetTimes(ListOffsetsRequest.toListOffsetsTopics(targetTimes));
 
-        log.debug("Creating ListOffsetRequest {} for broker {} to reset 
positions", builder,
+        log.debug("Creating ListOffset request {} for broker {} to reset 
positions", builder,
                 node);
 
         NetworkClientDelegate.UnsentRequest unsentRequest = new 
NetworkClientDelegate.UnsentRequest(
@@ -278,7 +340,7 @@ public class OffsetsRequestManager implements 
RequestManager, ClusterResourceLis
         CompletableFuture<ListOffsetResult> result = new CompletableFuture<>();
         unsentRequest.future().whenComplete((response, error) -> {
             if (error != null) {
-                log.debug("Sending ListOffsetRequest {} to broker {} failed",
+                log.debug("Sending ListOffset request {} to broker {} failed",
                         builder,
                         node,
                         error);
@@ -298,6 +360,154 @@ public class OffsetsRequestManager implements 
RequestManager, ClusterResourceLis
         return result;
     }
 
+    /**
+     * Make asynchronous ListOffsets request to fetch offsets by target times 
for the specified
+     * partitions.
+     * Use the retrieved offsets to reset positions in the subscription state.
+     *
+     * @param timestampsToSearch the mapping between partitions and target time
+     * @return A list of
+     * {@link 
org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.UnsentRequest}
+     * that can be polled to obtain the corresponding timestamps and offsets.
+     */
+    private List<NetworkClientDelegate.UnsentRequest> 
buildListOffsetsRequestsAndResetPositions(
+            final Map<TopicPartition, Long> timestampsToSearch) {
+        Map<Node, Map<TopicPartition, 
ListOffsetsRequestData.ListOffsetsPartition>> timestampsToSearchByNode =
+                groupListOffsetRequests(timestampsToSearch, Optional.empty());
+
+        final List<NetworkClientDelegate.UnsentRequest> unsentRequests = new 
ArrayList<>();
+
+        timestampsToSearchByNode.forEach((node, resetTimestamps) -> {
+            subscriptionState.setNextAllowedRetry(resetTimestamps.keySet(),
+                    time.milliseconds() + requestTimeoutMs);
+
+            CompletableFuture<ListOffsetResult> partialResult = 
buildListOffsetRequestToNode(
+                    node,
+                    resetTimestamps,
+                    false,
+                    unsentRequests);
+
+            partialResult.whenComplete((result, error) -> {
+                if (error == null) {
+                    
offsetFetcherUtils.onSuccessfulResponseForResettingPositions(resetTimestamps,
+                            result);
+                } else {
+                    RuntimeException e;
+                    if (error instanceof RuntimeException) {
+                        e = (RuntimeException) error;
+                    } else {
+                        e = new RuntimeException("Unexpected failure in 
ListOffsets request for " +
+                                "resetting positions", error);
+                    }
+                    
offsetFetcherUtils.onFailedResponseForResettingPositions(resetTimestamps, e);
+                }
+            });
+        });
+        return unsentRequests;
+    }
+
+    /**
+     * For each partition that needs validation, make an asynchronous request 
to get the end-offsets
+     * for the partition with the epoch less than or equal to the epoch the 
partition last saw.
+     * <p/>
+     * Requests are grouped by Node for efficiency.
+     */
+    private List<NetworkClientDelegate.UnsentRequest> 
buildOffsetsForLeaderEpochRequestsAndValidatePositions(
+            Map<TopicPartition, SubscriptionState.FetchPosition> 
partitionsToValidate) {
+
+        final Map<Node, Map<TopicPartition, SubscriptionState.FetchPosition>> 
regrouped =
+                regroupFetchPositionsByLeader(partitionsToValidate);
+
+        long nextResetTimeMs = time.milliseconds() + requestTimeoutMs;
+        final List<NetworkClientDelegate.UnsentRequest> unsentRequests = new 
ArrayList<>();
+        regrouped.forEach((node, fetchPositions) -> {
+
+            if (node.isEmpty()) {
+                metadata.requestUpdate(true);
+                return;
+            }
+
+            NodeApiVersions nodeApiVersions = apiVersions.get(node.idString());
+            if (nodeApiVersions == null) {
+                return;
+            }
+
+            if (!hasUsableOffsetForLeaderEpochVersion(nodeApiVersions)) {
+                log.debug("Skipping validation of fetch offsets for partitions 
{} since the broker does not " +
+                                "support the required protocol version 
(introduced in Kafka 2.3)",
+                        fetchPositions.keySet());
+                for (TopicPartition partition : fetchPositions.keySet()) {
+                    subscriptionState.completeValidation(partition);
+                }
+                return;
+            }
+
+            subscriptionState.setNextAllowedRetry(fetchPositions.keySet(), 
nextResetTimeMs);
+
+            CompletableFuture<OffsetsForLeaderEpochUtils.OffsetForEpochResult> 
partialResult =
+                    buildOffsetsForLeaderEpochRequestToNode(node, 
fetchPositions, unsentRequests);
+
+            partialResult.whenComplete((offsetsResult, error) -> {
+                if (error == null) {
+                    
offsetFetcherUtils.onSuccessfulResponseForValidatingPositions(fetchPositions,
+                            offsetsResult);
+                } else {
+                    RuntimeException e;
+                    if (error instanceof RuntimeException) {
+                        e = (RuntimeException) error;
+                    } else {
+                        e = new RuntimeException("Unexpected failure in 
OffsetsForLeaderEpoch " +
+                                "request for validating positions", error);
+                    }
+                    
offsetFetcherUtils.onFailedResponseForValidatingPositions(fetchPositions, e);
+                }
+            });
+
+        });
+
+        return unsentRequests;
+    }
+
+    /**
+     * Build OffsetsForLeaderEpoch request to send to a specific broker for 
the partitions and
+     * positions to fetch. This also adds the request to the list of 
unsentRequests.
+     **/
+    private CompletableFuture<OffsetsForLeaderEpochUtils.OffsetForEpochResult> 
buildOffsetsForLeaderEpochRequestToNode(
+            final Node node,
+            final Map<TopicPartition, SubscriptionState.FetchPosition> 
fetchPositions,
+            List<NetworkClientDelegate.UnsentRequest> unsentRequests) {
+        AbstractRequest.Builder<OffsetsForLeaderEpochRequest> builder =
+                OffsetsForLeaderEpochUtils.prepareRequest(fetchPositions);
+
+        log.debug("Creating OffsetsForLeaderEpoch request request {} to broker 
{}", builder, node);
+
+        NetworkClientDelegate.UnsentRequest unsentRequest = new 
NetworkClientDelegate.UnsentRequest(
+                builder,
+                Optional.ofNullable(node));
+        unsentRequests.add(unsentRequest);
+        CompletableFuture<OffsetsForLeaderEpochUtils.OffsetForEpochResult> 
result = new CompletableFuture<>();
+        unsentRequest.future().whenComplete((response, error) -> {
+            if (error != null) {
+                log.debug("Sending OffsetsForLeaderEpoch request {} to broker 
{} failed",
+                        builder,
+                        node,
+                        error);
+                result.completeExceptionally(error);
+            } else {
+                OffsetsForLeaderEpochResponse offsetsForLeaderEpochResponse = 
(OffsetsForLeaderEpochResponse) response.responseBody();
+                log.trace("Received OffsetsForLeaderEpoch response {} from 
broker {}", offsetsForLeaderEpochResponse, node);
+                try {
+                    OffsetsForLeaderEpochUtils.OffsetForEpochResult 
listOffsetResult =
+                            
OffsetsForLeaderEpochUtils.handleResponse(fetchPositions, 
offsetsForLeaderEpochResponse);
+                    result.complete(listOffsetResult);
+                } catch (RuntimeException e) {
+                    result.completeExceptionally(e);
+                }
+            }
+        });
+        return result;
+    }
+
     private static class ListOffsetsRequestState {
 
         private final Map<TopicPartition, Long> timestampsToSearch;
@@ -383,7 +593,7 @@ public class OffsetsRequestManager implements 
RequestManager, ClusterResourceLis
 
             if (!leaderAndEpoch.leader.isPresent()) {
                 log.debug("Leader for partition {} is unknown for fetching 
offset {}", tp, offset);
-                metadata.requestUpdate(false);
+                metadata.requestUpdate(true);
                 listOffsetsRequestState.ifPresent(offsetsRequestState -> 
offsetsRequestState.remainingToSearch.put(tp, offset));
             } else {
                 int currentLeaderEpoch = 
leaderAndEpoch.epoch.orElse(ListOffsetsResponse.UNKNOWN_EPOCH);
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java
index 88f3e239376..f6b8ede5745 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java
@@ -25,6 +25,7 @@ import 
org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
 import org.apache.kafka.clients.consumer.ConsumerInterceptor;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
 import org.apache.kafka.clients.consumer.OffsetCommitCallback;
@@ -35,6 +36,8 @@ import 
org.apache.kafka.clients.consumer.internals.events.EventHandler;
 import 
org.apache.kafka.clients.consumer.internals.events.ListOffsetsApplicationEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.NewTopicsMetadataUpdateRequestEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.OffsetFetchApplicationEvent;
+import 
org.apache.kafka.clients.consumer.internals.events.ResetPositionsApplicationEvent;
+import 
org.apache.kafka.clients.consumer.internals.events.ValidatePositionsApplicationEvent;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
@@ -193,6 +196,9 @@ public class PrototypeAsyncConsumer<K, V> implements 
Consumer<K, V> {
                     // be processed in the collectFetches().
                     backgroundEvent.ifPresent(event -> processEvent(event, 
timeout));
                 }
+
+                updateFetchPositionsIfNeeded();
+
                 // The idea here is to have the background thread sending 
fetches autonomously, and the fetcher
                 // uses the poll loop to retrieve successful fetchResponse and 
process them on the polling thread.
                 final Fetch<K, V> fetch = collectFetches();
@@ -209,6 +215,34 @@ public class PrototypeAsyncConsumer<K, V> implements 
Consumer<K, V> {
         return ConsumerRecords.empty();
     }
 
+    /**
+     * Set the fetch position to the committed position (if there is one) or 
reset it using the
+     * offset reset policy the user has configured (if partitions require 
reset)
+     *
+     * @return true if the operation completed without timing out
+     * @throws org.apache.kafka.common.errors.AuthenticationException if 
authentication fails. See the exception for more details
+     * @throws NoOffsetForPartitionException                          If no 
offset is stored for a given partition and no offset reset policy is
+     *                                                                defined
+     */
+    private boolean updateFetchPositionsIfNeeded() {
+        // If any partitions have been truncated due to a leader change, we 
need to validate the offsets
+        ValidatePositionsApplicationEvent validatePositionsEvent = new 
ValidatePositionsApplicationEvent();
+        eventHandler.add(validatePositionsEvent);
+
+        // TODO: integrate logic for refreshing committed offsets if available
+
+        // If there are partitions still needing a position and a reset policy 
is defined,
+        // request reset using the default policy. If no reset strategy is 
defined and there
+        // are partitions with a missing position, then we will raise a 
NoOffsetForPartitionException exception.
+        subscriptions.resetInitializingPositions();
+
+        // Finally send an asynchronous request to look up and update the 
positions of any
+        // partitions which are awaiting reset.
+        ResetPositionsApplicationEvent resetPositionsEvent = new 
ResetPositionsApplicationEvent();
+        eventHandler.add(resetPositionsEvent);
+        return true;
+    }
+
     /**
      * Commit offsets returned on the last {@link #poll(Duration) poll()} for 
all the subscribed list of topics and
      * partitions.
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
index e0fe1e0f306..eb1bffaf81d 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
@@ -33,6 +33,6 @@ abstract public class ApplicationEvent {
 
     public enum Type {
         NOOP, COMMIT, POLL, FETCH_COMMITTED_OFFSET, METADATA_UPDATE, 
ASSIGNMENT_CHANGE,
-        LIST_OFFSETS,
+        LIST_OFFSETS, RESET_POSITIONS, VALIDATE_POSITIONS,
     }
 }
\ No newline at end of file
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
index ab3291c47de..2cfbc2d04e4 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
@@ -62,6 +62,10 @@ public class ApplicationEventProcessor {
                 return process((AssignmentChangeApplicationEvent) event);
             case LIST_OFFSETS:
                 return process((ListOffsetsApplicationEvent) event);
+            case RESET_POSITIONS:
+                return processResetPositionsEvent();
+            case VALIDATE_POSITIONS:
+                return processValidatePositionsEvent();
         }
         return false;
     }
@@ -140,4 +144,14 @@ public class ApplicationEventProcessor {
         event.chain(future);
         return true;
     }
+
+    private boolean processResetPositionsEvent() {
+        requestManagers.offsetsRequestManager.resetPositionsIfNeeded();
+        return true;
+    }
+
+    private boolean processValidatePositionsEvent() {
+        requestManagers.offsetsRequestManager.validatePositionsIfNeeded();
+        return true;
+    }
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableApplicationEvent.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableApplicationEvent.java
index 5deecc4150b..3bd862861a4 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableApplicationEvent.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableApplicationEvent.java
@@ -91,7 +91,7 @@ public abstract class CompletableApplicationEvent<T> extends 
ApplicationEvent {
 
     @Override
     public String toString() {
-        return "CompletableApplicationEvent{" +
+        return getClass().getSimpleName() + "{" +
                 "future=" + future +
                 ", type=" + type +
                 '}';
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ListOffsetsApplicationEvent.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ListOffsetsApplicationEvent.java
index a6d48cdcae0..91b032c3478 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ListOffsetsApplicationEvent.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ListOffsetsApplicationEvent.java
@@ -85,7 +85,7 @@ public class ListOffsetsApplicationEvent extends 
CompletableApplicationEvent<Map
 
     @Override
     public String toString() {
-        return "ListOffsetsApplicationEvent {" +
+        return getClass().getSimpleName() + " {" +
                 "timestampsToSearch=" + timestampsToSearch + ", " +
                 "requireTimestamps=" + requireTimestamps + '}';
     }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/OffsetFetchApplicationEvent.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/OffsetFetchApplicationEvent.java
index 7fa9dd1c432..e53248425d9 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/OffsetFetchApplicationEvent.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/OffsetFetchApplicationEvent.java
@@ -55,7 +55,7 @@ public class OffsetFetchApplicationEvent extends 
CompletableApplicationEvent<Map
 
     @Override
     public String toString() {
-        return "OffsetFetchApplicationEvent{" +
+        return getClass().getSimpleName() + "{" +
                 "partitions=" + partitions +
                 ", future=" + future +
                 ", type=" + type +
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ResetPositionsApplicationEvent.java
similarity index 65%
copy from 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
copy to 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ResetPositionsApplicationEvent.java
index e0fe1e0f306..5d9b07f9de0 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ResetPositionsApplicationEvent.java
@@ -14,25 +14,17 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.kafka.clients.consumer.internals.events;
 
 /**
- * This is the abstract definition of the events created by the KafkaConsumer 
API
+ * Event for resetting offsets for all assigned partitions that require it. 
This is an
+ * asynchronous event that generates ListOffsets requests, and completes by 
updating in-memory
+ * positions when responses are received.
  */
-abstract public class ApplicationEvent {
-    public final Type type;
-
-    protected ApplicationEvent(Type type) {
-        this.type = type;
-    }
-
-    @Override
-    public String toString() {
-        return type + " ApplicationEvent";
-    }
+public class ResetPositionsApplicationEvent extends 
CompletableApplicationEvent<Void> {
 
-    public enum Type {
-        NOOP, COMMIT, POLL, FETCH_COMMITTED_OFFSET, METADATA_UPDATE, 
ASSIGNMENT_CHANGE,
-        LIST_OFFSETS,
+    public ResetPositionsApplicationEvent() {
+        super(Type.RESET_POSITIONS);
     }
 }
\ No newline at end of file
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ValidatePositionsApplicationEvent.java
similarity index 65%
copy from 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
copy to 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ValidatePositionsApplicationEvent.java
index e0fe1e0f306..3b093e0b683 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ValidatePositionsApplicationEvent.java
@@ -14,25 +14,17 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.kafka.clients.consumer.internals.events;
 
 /**
- * This is the abstract definition of the events created by the KafkaConsumer 
API
+ * Event for validating offsets for all assigned partitions for which a leader 
change has been
+ * detected. This is an asynchronous event that generates OffsetForLeaderEpoch 
requests, and
+ * completes by validating in-memory positions against the offsets received in 
the responses.
  */
-abstract public class ApplicationEvent {
-    public final Type type;
-
-    protected ApplicationEvent(Type type) {
-        this.type = type;
-    }
-
-    @Override
-    public String toString() {
-        return type + " ApplicationEvent";
-    }
+public class ValidatePositionsApplicationEvent extends 
CompletableApplicationEvent<Void> {
 
-    public enum Type {
-        NOOP, COMMIT, POLL, FETCH_COMMITTED_OFFSET, METADATA_UPDATE, 
ASSIGNMENT_CHANGE,
-        LIST_OFFSETS,
+    public ValidatePositionsApplicationEvent() {
+        super(Type.VALIDATE_POSITIONS);
     }
 }
\ No newline at end of file
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThreadTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThreadTest.java
index 112d8bacfb2..b5a1ced617a 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThreadTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThreadTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.clients.consumer.internals;
 
 import org.apache.kafka.clients.GroupRebalanceConfig;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.LogTruncationException;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor;
@@ -27,7 +28,10 @@ import 
org.apache.kafka.clients.consumer.internals.events.CommitApplicationEvent
 import 
org.apache.kafka.clients.consumer.internals.events.ListOffsetsApplicationEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.NewTopicsMetadataUpdateRequestEvent;
 import org.apache.kafka.clients.consumer.internals.events.NoopApplicationEvent;
+import 
org.apache.kafka.clients.consumer.internals.events.ResetPositionsApplicationEvent;
+import 
org.apache.kafka.clients.consumer.internals.events.ValidatePositionsApplicationEvent;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.TopicAuthorizationException;
 import org.apache.kafka.common.message.FindCoordinatorRequestData;
 import org.apache.kafka.common.requests.FindCoordinatorRequest;
 import org.apache.kafka.common.serialization.StringDeserializer;
@@ -53,9 +57,11 @@ import static 
org.apache.kafka.clients.consumer.ConsumerConfig.RETRY_BACKOFF_MS_
 import static 
org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
@@ -181,6 +187,87 @@ public class DefaultBackgroundThreadTest {
         assertTrue(applicationEventsQueue.isEmpty());
         backgroundThread.close();
     }
+
+    @Test
+    public void testResetPositionsEventIsProcessed() {
+        
when(coordinatorManager.poll(anyLong())).thenReturn(mockPollCoordinatorResult());
+        when(commitManager.poll(anyLong())).thenReturn(mockPollCommitResult());
+        
when(offsetsRequestManager.poll(anyLong())).thenReturn(emptyPollOffsetsRequestResult());
+        this.applicationEventsQueue = new LinkedBlockingQueue<>();
+        this.backgroundEventsQueue = new LinkedBlockingQueue<>();
+        DefaultBackgroundThread backgroundThread = mockBackgroundThread();
+        ResetPositionsApplicationEvent e = new 
ResetPositionsApplicationEvent();
+        this.applicationEventsQueue.add(e);
+        backgroundThread.runOnce();
+        
verify(applicationEventProcessor).process(any(ResetPositionsApplicationEvent.class));
+        assertTrue(applicationEventsQueue.isEmpty());
+        backgroundThread.close();
+    }
+
+    @Test
+    public void testResetPositionsProcessFailure() {
+        
when(coordinatorManager.poll(anyLong())).thenReturn(mockPollCoordinatorResult());
+        when(commitManager.poll(anyLong())).thenReturn(mockPollCommitResult());
+        
when(offsetsRequestManager.poll(anyLong())).thenReturn(emptyPollOffsetsRequestResult());
+        this.applicationEventsQueue = new LinkedBlockingQueue<>();
+        this.backgroundEventsQueue = new LinkedBlockingQueue<>();
+        applicationEventProcessor = spy(new ApplicationEventProcessor(
+                this.backgroundEventsQueue,
+                mockRequestManagers(),
+                metadata));
+        DefaultBackgroundThread backgroundThread = mockBackgroundThread();
+
+        TopicAuthorizationException authException = new 
TopicAuthorizationException("Topic authorization failed");
+        
doThrow(authException).when(offsetsRequestManager).resetPositionsIfNeeded();
+
+        ResetPositionsApplicationEvent event = new 
ResetPositionsApplicationEvent();
+        this.applicationEventsQueue.add(event);
+        assertThrows(TopicAuthorizationException.class, 
backgroundThread::runOnce);
+
+        
verify(applicationEventProcessor).process(any(ResetPositionsApplicationEvent.class));
+        backgroundThread.close();
+    }
+
+    @Test
+    public void testValidatePositionsEventIsProcessed() {
+        
when(coordinatorManager.poll(anyLong())).thenReturn(mockPollCoordinatorResult());
+        when(commitManager.poll(anyLong())).thenReturn(mockPollCommitResult());
+        
when(offsetsRequestManager.poll(anyLong())).thenReturn(emptyPollOffsetsRequestResult());
+        this.applicationEventsQueue = new LinkedBlockingQueue<>();
+        this.backgroundEventsQueue = new LinkedBlockingQueue<>();
+        DefaultBackgroundThread backgroundThread = mockBackgroundThread();
+        ValidatePositionsApplicationEvent e = new 
ValidatePositionsApplicationEvent();
+        this.applicationEventsQueue.add(e);
+        backgroundThread.runOnce();
+        
verify(applicationEventProcessor).process(any(ValidatePositionsApplicationEvent.class));
+        assertTrue(applicationEventsQueue.isEmpty());
+        backgroundThread.close();
+    }
+
+    @Test
+    public void testValidatePositionsProcessFailure() {
+        
when(coordinatorManager.poll(anyLong())).thenReturn(mockPollCoordinatorResult());
+        when(commitManager.poll(anyLong())).thenReturn(mockPollCommitResult());
+        
when(offsetsRequestManager.poll(anyLong())).thenReturn(emptyPollOffsetsRequestResult());
+        this.applicationEventsQueue = new LinkedBlockingQueue<>();
+        this.backgroundEventsQueue = new LinkedBlockingQueue<>();
+        applicationEventProcessor = spy(new ApplicationEventProcessor(
+                this.backgroundEventsQueue,
+                mockRequestManagers(),
+                metadata));
+        DefaultBackgroundThread backgroundThread = mockBackgroundThread();
+
+        LogTruncationException logTruncationException = new 
LogTruncationException(Collections.emptyMap(), Collections.emptyMap());
+        
doThrow(logTruncationException).when(offsetsRequestManager).validatePositionsIfNeeded();
+
+        ValidatePositionsApplicationEvent event = new 
ValidatePositionsApplicationEvent();
+        this.applicationEventsQueue.add(event);
+        assertThrows(LogTruncationException.class, backgroundThread::runOnce);
+
+        
verify(applicationEventProcessor).process(any(ValidatePositionsApplicationEvent.class));
+        backgroundThread.close();
+    }
+
     @Test
     public void testAssignmentChangeEvent() {
         this.applicationEventsQueue = new LinkedBlockingQueue<>();
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetForLeaderEpochClientTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetForLeaderEpochClientTest.java
index 43c4130a485..57c61babd11 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetForLeaderEpochClientTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetForLeaderEpochClientTest.java
@@ -55,7 +55,7 @@ public class OffsetForLeaderEpochClientTest {
     @Test
     public void testEmptyResponse() {
         OffsetsForLeaderEpochClient offsetClient = newOffsetClient();
-        RequestFuture<OffsetsForLeaderEpochClient.OffsetForEpochResult> future 
=
+        RequestFuture<OffsetsForLeaderEpochUtils.OffsetForEpochResult> future =
                 offsetClient.sendAsyncRequest(Node.noNode(), 
Collections.emptyMap());
 
         OffsetsForLeaderEpochResponse resp = new OffsetsForLeaderEpochResponse(
@@ -63,7 +63,7 @@ public class OffsetForLeaderEpochClientTest {
         client.prepareResponse(resp);
         consumerClient.pollNoWakeup();
 
-        OffsetsForLeaderEpochClient.OffsetForEpochResult result = 
future.value();
+        OffsetsForLeaderEpochUtils.OffsetForEpochResult result = 
future.value();
         assertTrue(result.partitionsToRetry().isEmpty());
         assertTrue(result.endOffsets().isEmpty());
     }
@@ -75,7 +75,7 @@ public class OffsetForLeaderEpochClientTest {
                 new Metadata.LeaderAndEpoch(Optional.empty(), 
Optional.of(1))));
 
         OffsetsForLeaderEpochClient offsetClient = newOffsetClient();
-        RequestFuture<OffsetsForLeaderEpochClient.OffsetForEpochResult> future 
=
+        RequestFuture<OffsetsForLeaderEpochUtils.OffsetForEpochResult> future =
                 offsetClient.sendAsyncRequest(Node.noNode(), positionMap);
 
         OffsetsForLeaderEpochResponse resp = new OffsetsForLeaderEpochResponse(
@@ -83,7 +83,7 @@ public class OffsetForLeaderEpochClientTest {
         client.prepareResponse(resp);
         consumerClient.pollNoWakeup();
 
-        OffsetsForLeaderEpochClient.OffsetForEpochResult result = 
future.value();
+        OffsetsForLeaderEpochUtils.OffsetForEpochResult result = 
future.value();
         assertFalse(result.partitionsToRetry().isEmpty());
         assertTrue(result.endOffsets().isEmpty());
     }
@@ -95,14 +95,14 @@ public class OffsetForLeaderEpochClientTest {
                 new Metadata.LeaderAndEpoch(Optional.empty(), 
Optional.of(1))));
 
         OffsetsForLeaderEpochClient offsetClient = newOffsetClient();
-        RequestFuture<OffsetsForLeaderEpochClient.OffsetForEpochResult> future 
=
+        RequestFuture<OffsetsForLeaderEpochUtils.OffsetForEpochResult> future =
                 offsetClient.sendAsyncRequest(Node.noNode(), positionMap);
 
         client.prepareResponse(prepareOffsetForLeaderEpochResponse(
             tp0, Errors.NONE, 1, 10L));
         consumerClient.pollNoWakeup();
 
-        OffsetsForLeaderEpochClient.OffsetForEpochResult result = 
future.value();
+        OffsetsForLeaderEpochUtils.OffsetForEpochResult result = 
future.value();
         assertTrue(result.partitionsToRetry().isEmpty());
         assertTrue(result.endOffsets().containsKey(tp0));
         assertEquals(result.endOffsets().get(tp0).errorCode(), 
Errors.NONE.code());
@@ -117,7 +117,7 @@ public class OffsetForLeaderEpochClientTest {
                 new Metadata.LeaderAndEpoch(Optional.empty(), 
Optional.of(1))));
 
         OffsetsForLeaderEpochClient offsetClient = newOffsetClient();
-        RequestFuture<OffsetsForLeaderEpochClient.OffsetForEpochResult> future 
=
+        RequestFuture<OffsetsForLeaderEpochUtils.OffsetForEpochResult> future =
                 offsetClient.sendAsyncRequest(Node.noNode(), positionMap);
 
         client.prepareResponse(prepareOffsetForLeaderEpochResponse(
@@ -136,7 +136,7 @@ public class OffsetForLeaderEpochClientTest {
                 new Metadata.LeaderAndEpoch(Optional.empty(), 
Optional.of(1))));
 
         OffsetsForLeaderEpochClient offsetClient = newOffsetClient();
-        RequestFuture<OffsetsForLeaderEpochClient.OffsetForEpochResult> future 
=
+        RequestFuture<OffsetsForLeaderEpochUtils.OffsetForEpochResult> future =
                 offsetClient.sendAsyncRequest(Node.noNode(), positionMap);
 
         client.prepareResponse(prepareOffsetForLeaderEpochResponse(
@@ -144,7 +144,7 @@ public class OffsetForLeaderEpochClientTest {
         consumerClient.pollNoWakeup();
 
         assertFalse(future.failed());
-        OffsetsForLeaderEpochClient.OffsetForEpochResult result = 
future.value();
+        OffsetsForLeaderEpochUtils.OffsetForEpochResult result = 
future.value();
         assertTrue(result.partitionsToRetry().contains(tp0));
         assertFalse(result.endOffsets().containsKey(tp0));
     }
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManagerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManagerTest.java
index 720fc1fec93..da762edf31c 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManagerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManagerTest.java
@@ -19,7 +19,9 @@ package org.apache.kafka.clients.consumer.internals;
 import org.apache.kafka.clients.ApiVersions;
 import org.apache.kafka.clients.ClientResponse;
 import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.NodeApiVersions;
 import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.ClusterResource;
 import org.apache.kafka.common.IsolationLevel;
@@ -29,11 +31,14 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.AuthenticationException;
 import org.apache.kafka.common.errors.TopicAuthorizationException;
 import org.apache.kafka.common.message.ListOffsetsResponseData;
+import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.AbstractRequest;
 import org.apache.kafka.common.requests.ListOffsetsRequest;
 import org.apache.kafka.common.requests.ListOffsetsResponse;
+import org.apache.kafka.common.requests.OffsetsForLeaderEpochRequest;
+import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse;
 import org.apache.kafka.common.requests.RequestHeader;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
@@ -64,7 +69,10 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -75,6 +83,7 @@ public class OffsetsRequestManagerTest {
     private ConsumerMetadata metadata;
     private SubscriptionState subscriptionState;
     private MockTime time;
+    private ApiVersions apiVersions;
     private static final String TEST_TOPIC = "t1";
     private static final TopicPartition TEST_PARTITION_1 = new 
TopicPartition(TEST_TOPIC, 1);
     private static final TopicPartition TEST_PARTITION_2 = new 
TopicPartition(TEST_TOPIC, 2);
@@ -82,15 +91,16 @@ public class OffsetsRequestManagerTest {
     private static final Node LEADER_2 = new Node(0, "host2", 9092);
     private static final IsolationLevel DEFAULT_ISOLATION_LEVEL = 
IsolationLevel.READ_COMMITTED;
     private static final int RETRY_BACKOFF_MS = 500;
+    private static final int REQUEST_TIMEOUT_MS = 500;
 
     @BeforeEach
     public void setup() {
         metadata = mock(ConsumerMetadata.class);
         subscriptionState = mock(SubscriptionState.class);
         this.time = new MockTime(0);
-        ApiVersions apiVersions = mock(ApiVersions.class);
+        apiVersions = mock(ApiVersions.class);
         requestManager = new OffsetsRequestManager(subscriptionState, metadata,
-                DEFAULT_ISOLATION_LEVEL, time, RETRY_BACKOFF_MS,
+                DEFAULT_ISOLATION_LEVEL, time, RETRY_BACKOFF_MS, 
REQUEST_TIMEOUT_MS,
                 apiVersions, new LogContext());
     }
 
@@ -99,7 +109,7 @@ public class OffsetsRequestManagerTest {
         Map<TopicPartition, Long> timestampsToSearch = 
Collections.singletonMap(TEST_PARTITION_1,
                 ListOffsetsRequest.EARLIEST_TIMESTAMP);
 
-        expectSuccessfulRequest(Collections.singletonMap(TEST_PARTITION_1, 
LEADER_1));
+        mockSuccessfulRequest(Collections.singletonMap(TEST_PARTITION_1, 
LEADER_1));
         CompletableFuture<Map<TopicPartition, OffsetAndTimestamp>> result = 
requestManager.fetchOffsets(
                 timestampsToSearch,
                 false);
@@ -116,11 +126,12 @@ public class OffsetsRequestManagerTest {
                 ListOffsetsRequest.EARLIEST_TIMESTAMP);
 
         // Building list offsets request fails with unknown leader
-        expectFailedRequest_MissingLeader();
+        mockFailedRequest_MissingLeader();
         CompletableFuture<Map<TopicPartition, OffsetAndTimestamp>> 
fetchOffsetsFuture =
                 requestManager.fetchOffsets(timestampsToSearch, false);
         assertEquals(0, requestManager.requestsToSend());
         assertEquals(1, requestManager.requestsToRetry());
+        verify(metadata).requestUpdate(true);
         NetworkClientDelegate.PollResult res = 
requestManager.poll(time.milliseconds());
         assertEquals(0, res.unsentRequests.size());
         // Metadata update not happening within the time boundaries of the 
request future, so
@@ -139,7 +150,7 @@ public class OffsetsRequestManagerTest {
         Map<TopicPartition, Node> partitionLeaders = new HashMap<>();
         partitionLeaders.put(TEST_PARTITION_1, LEADER_1);
         partitionLeaders.put(TEST_PARTITION_2, LEADER_1);
-        expectSuccessfulRequest(partitionLeaders);
+        mockSuccessfulRequest(partitionLeaders);
         CompletableFuture<Map<TopicPartition, OffsetAndTimestamp>> result = 
requestManager.fetchOffsets(
                 timestampsToSearch,
                 false);
@@ -176,7 +187,7 @@ public class OffsetsRequestManagerTest {
         Map<TopicPartition, Long> timestampsToSearch = 
Collections.singletonMap(TEST_PARTITION_1,
                 ListOffsetsRequest.EARLIEST_TIMESTAMP);
 
-        expectSuccessfulRequest(Collections.singletonMap(TEST_PARTITION_1, 
LEADER_1));
+        mockSuccessfulRequest(Collections.singletonMap(TEST_PARTITION_1, 
LEADER_1));
         CompletableFuture<Map<TopicPartition, OffsetAndTimestamp>> result = 
requestManager.fetchOffsets(
                 timestampsToSearch,
                 false);
@@ -204,19 +215,21 @@ public class OffsetsRequestManagerTest {
                 ListOffsetsRequest.EARLIEST_TIMESTAMP);
 
         // Building list offsets request fails with unknown leader
-        expectFailedRequest_MissingLeader();
+        mockFailedRequest_MissingLeader();
         CompletableFuture<Map<TopicPartition, OffsetAndTimestamp>> 
fetchOffsetsFuture =
                 requestManager.fetchOffsets(timestampsToSearch,
                         false);
         assertEquals(0, requestManager.requestsToSend());
         assertEquals(1, requestManager.requestsToRetry());
+        verify(metadata).requestUpdate(true);
+
         NetworkClientDelegate.PollResult res = 
requestManager.poll(time.milliseconds());
         assertEquals(0, res.unsentRequests.size());
         assertFalse(fetchOffsetsFuture.isDone());
 
         // Cluster metadata update. Previously failed attempt to build the 
request should be retried
         // and succeed
-        expectSuccessfulRequest(Collections.singletonMap(TEST_PARTITION_1, 
LEADER_1));
+        mockSuccessfulRequest(Collections.singletonMap(TEST_PARTITION_1, 
LEADER_1));
         requestManager.onUpdate(new ClusterResource(""));
         assertEquals(1, requestManager.requestsToSend());
 
@@ -232,7 +245,7 @@ public class OffsetsRequestManagerTest {
                 ListOffsetsRequest.EARLIEST_TIMESTAMP);
 
         // List offsets request successfully built
-        expectSuccessfulRequest(Collections.singletonMap(TEST_PARTITION_1, 
LEADER_1));
+        mockSuccessfulRequest(Collections.singletonMap(TEST_PARTITION_1, 
LEADER_1));
         CompletableFuture<Map<TopicPartition, OffsetAndTimestamp>> 
fetchOffsetsFuture = requestManager.fetchOffsets(
                 timestampsToSearch,
                 false);
@@ -255,7 +268,7 @@ public class OffsetsRequestManagerTest {
         assertEquals(0, requestManager.requestsToSend());
 
         // Cluster metadata update. Failed requests should be retried and 
succeed
-        expectSuccessfulRequest(Collections.singletonMap(TEST_PARTITION_1, 
LEADER_1));
+        mockSuccessfulRequest(Collections.singletonMap(TEST_PARTITION_1, 
LEADER_1));
         requestManager.onUpdate(new ClusterResource(""));
         assertEquals(1, requestManager.requestsToSend());
 
@@ -280,7 +293,7 @@ public class OffsetsRequestManagerTest {
                 ListOffsetsRequest.EARLIEST_TIMESTAMP);
 
         // List offsets request successfully built
-        expectSuccessfulRequest(Collections.singletonMap(TEST_PARTITION_1, 
LEADER_1));
+        mockSuccessfulRequest(Collections.singletonMap(TEST_PARTITION_1, 
LEADER_1));
         CompletableFuture<Map<TopicPartition, OffsetAndTimestamp>> 
fetchOffsetsFuture = requestManager.fetchOffsets(
                 timestampsToSearch,
                 false);
@@ -317,7 +330,7 @@ public class OffsetsRequestManagerTest {
         Map<TopicPartition, Node> partitionLeaders = new HashMap<>();
         partitionLeaders.put(TEST_PARTITION_1, LEADER_1);
         partitionLeaders.put(TEST_PARTITION_2, LEADER_2);
-        expectSuccessfulRequest(partitionLeaders);
+        mockSuccessfulRequest(partitionLeaders);
         CompletableFuture<Map<TopicPartition, OffsetAndTimestamp>> 
fetchOffsetsFuture = requestManager.fetchOffsets(
                 timestampsToSearch,
                 false);
@@ -347,7 +360,7 @@ public class OffsetsRequestManagerTest {
         assertEquals(0, requestManager.requestsToSend());
 
         // Cluster metadata update. Failed requests should be retried
-        expectSuccessfulRequest(partitionLeaders);
+        mockSuccessfulRequest(partitionLeaders);
         requestManager.onUpdate(new ClusterResource(""));
         assertEquals(1, requestManager.requestsToSend());
 
@@ -370,7 +383,7 @@ public class OffsetsRequestManagerTest {
                 ListOffsetsRequest.EARLIEST_TIMESTAMP);
 
         // List offsets request successfully built
-        expectSuccessfulRequest(Collections.singletonMap(TEST_PARTITION_1, 
LEADER_1));
+        mockSuccessfulRequest(Collections.singletonMap(TEST_PARTITION_1, 
LEADER_1));
         CompletableFuture<Map<TopicPartition, OffsetAndTimestamp>> 
fetchOffsetsFuture =
                 requestManager.fetchOffsets(
                         timestampsToSearch,
@@ -399,7 +412,7 @@ public class OffsetsRequestManagerTest {
                 ListOffsetsRequest.EARLIEST_TIMESTAMP);
 
         // List offsets request successfully built
-        expectSuccessfulRequest(Collections.singletonMap(TEST_PARTITION_1, 
LEADER_1));
+        mockSuccessfulRequest(Collections.singletonMap(TEST_PARTITION_1, 
LEADER_1));
         CompletableFuture<Map<TopicPartition, OffsetAndTimestamp>> 
fetchOffsetsFuture =
                 requestManager.fetchOffsets(
                         timestampsToSearch,
@@ -431,7 +444,7 @@ public class OffsetsRequestManagerTest {
                 ListOffsetsRequest.EARLIEST_TIMESTAMP);
 
         // List offsets request successfully built
-        expectSuccessfulRequest(Collections.singletonMap(TEST_PARTITION_1, 
LEADER_1));
+        mockSuccessfulRequest(Collections.singletonMap(TEST_PARTITION_1, 
LEADER_1));
         CompletableFuture<Map<TopicPartition, OffsetAndTimestamp>> 
fetchOffsetsFuture =
                 requestManager.fetchOffsets(
                         timestampsToSearch,
@@ -455,6 +468,192 @@ public class OffsetsRequestManagerTest {
         assertEquals(0, requestManager.requestsToSend());
     }
 
+    @Test
+    public void testResetPositionsSendNoRequestIfNoPartitionsNeedingReset() {
+        
when(subscriptionState.partitionsNeedingReset(time.milliseconds())).thenReturn(Collections.emptySet());
+        requestManager.resetPositionsIfNeeded();
+        assertEquals(0, requestManager.requestsToSend());
+    }
+
+    @Test
+    public void testResetPositionsMissingLeader() {
+        mockFailedRequest_MissingLeader();
+        
when(subscriptionState.partitionsNeedingReset(time.milliseconds())).thenReturn(Collections.singleton(TEST_PARTITION_1));
+        
when(subscriptionState.resetStrategy(any())).thenReturn(OffsetResetStrategy.EARLIEST);
+        requestManager.resetPositionsIfNeeded();
+        verify(metadata).requestUpdate(true);
+        assertEquals(0, requestManager.requestsToSend());
+    }
+
+    @Test
+    public void testResetPositionsSuccess_NoLeaderEpochInResponse() {
+        
testResetPositionsSuccessWithLeaderEpoch(Metadata.LeaderAndEpoch.noLeaderOrEpoch());
+        verify(metadata, never()).updateLastSeenEpochIfNewer(any(), anyInt());
+    }
+
+    @Test
+    public void testResetPositionsSuccess_LeaderEpochInResponse() {
+        Metadata.LeaderAndEpoch leaderAndEpoch = new 
Metadata.LeaderAndEpoch(Optional.of(LEADER_1),
+                Optional.of(5));
+        testResetPositionsSuccessWithLeaderEpoch(leaderAndEpoch);
+        verify(metadata).updateLastSeenEpochIfNewer(TEST_PARTITION_1, 
leaderAndEpoch.epoch.get());
+    }
+
+    @Test
+    public void testResetPositionsThrowsPreviousException() {
+        
when(subscriptionState.partitionsNeedingReset(time.milliseconds())).thenReturn(Collections.singleton(TEST_PARTITION_1));
+        
when(subscriptionState.resetStrategy(any())).thenReturn(OffsetResetStrategy.EARLIEST);
+        mockSuccessfulRequest(Collections.singletonMap(TEST_PARTITION_1, 
LEADER_1));
+
+        requestManager.resetPositionsIfNeeded();
+
+        // Reset positions response with TopicAuthorizationException
+        NetworkClientDelegate.PollResult res = 
requestManager.poll(time.milliseconds());
+        NetworkClientDelegate.UnsentRequest unsentRequest = 
res.unsentRequests.get(0);
+        ClientResponse clientResponse = buildClientResponseWithErrors(
+                unsentRequest, Collections.singletonMap(TEST_PARTITION_1, 
Errors.TOPIC_AUTHORIZATION_FAILED));
+        clientResponse.onComplete();
+
+        assertTrue(unsentRequest.future().isDone());
+        assertFalse(unsentRequest.future().isCompletedExceptionally());
+
+        verify(subscriptionState).requestFailed(any(), anyLong());
+        verify(metadata).requestUpdate(false);
+
+        // Following resetPositions should raise the previous exception 
without performing any
+        // request
+        assertThrows(TopicAuthorizationException.class,
+                () -> requestManager.resetPositionsIfNeeded());
+        assertEquals(0, requestManager.requestsToSend());
+    }
+
+    @Test
+    public void testValidatePositionsSuccess() {
+        int currentOffset = 5;
+        int expectedEndOffset = 100;
+        Metadata.LeaderAndEpoch leaderAndEpoch = new 
Metadata.LeaderAndEpoch(Optional.of(LEADER_1),
+                Optional.of(3));
+        TopicPartition tp = TEST_PARTITION_1;
+        SubscriptionState.FetchPosition position = new 
SubscriptionState.FetchPosition(currentOffset,
+                Optional.of(10), leaderAndEpoch);
+
+        mockSuccessfulBuildRequestForValidatingPositions(position, LEADER_1);
+
+        requestManager.validatePositionsIfNeeded();
+        assertEquals(1, requestManager.requestsToSend(), "Invalid request 
count");
+
+        verify(subscriptionState).setNextAllowedRetry(any(), anyLong());
+
+        // Validate positions response with end offsets
+        when(metadata.currentLeader(tp)).thenReturn(testLeaderEpoch(LEADER_1, 
leaderAndEpoch.epoch));
+        NetworkClientDelegate.PollResult pollResult = 
requestManager.poll(time.milliseconds());
+        NetworkClientDelegate.UnsentRequest unsentRequest = 
pollResult.unsentRequests.get(0);
+        ClientResponse clientResponse = 
buildOffsetsForLeaderEpochResponse(unsentRequest,
+                Collections.singletonList(tp), expectedEndOffset);
+        clientResponse.onComplete();
+        assertTrue(unsentRequest.future().isDone());
+        assertFalse(unsentRequest.future().isCompletedExceptionally());
+        verify(subscriptionState).maybeCompleteValidation(any(), any(), any());
+    }
+
+    @Test
+    public void testValidatePositionsMissingLeader() {
+        Metadata.LeaderAndEpoch leaderAndEpoch = new 
Metadata.LeaderAndEpoch(Optional.of(Node.noNode()),
+                Optional.of(5));
+        SubscriptionState.FetchPosition position = new 
SubscriptionState.FetchPosition(5L,
+                Optional.of(10), leaderAndEpoch);
+        
when(subscriptionState.partitionsNeedingValidation(time.milliseconds())).thenReturn(Collections.singleton(TEST_PARTITION_1));
+        when(subscriptionState.position(any())).thenReturn(position, position);
+        NodeApiVersions nodeApiVersions = NodeApiVersions.create();
+        when(apiVersions.get(LEADER_1.idString())).thenReturn(nodeApiVersions);
+        requestManager.validatePositionsIfNeeded();
+        verify(metadata).requestUpdate(true);
+        assertEquals(0, requestManager.requestsToSend());
+    }
+
+    @Test
+    public void testValidatePositionsFailureWithUnrecoverableAuthException() {
+        Metadata.LeaderAndEpoch leaderAndEpoch = new 
Metadata.LeaderAndEpoch(Optional.of(LEADER_1),
+                Optional.of(5));
+        SubscriptionState.FetchPosition position = new 
SubscriptionState.FetchPosition(5L,
+                Optional.of(10), leaderAndEpoch);
+        mockSuccessfulBuildRequestForValidatingPositions(position, LEADER_1);
+
+        requestManager.validatePositionsIfNeeded();
+
+        // Validate positions response with TopicAuthorizationException
+        NetworkClientDelegate.PollResult res = 
requestManager.poll(time.milliseconds());
+        NetworkClientDelegate.UnsentRequest unsentRequest = 
res.unsentRequests.get(0);
+        ClientResponse clientResponse =
+                buildOffsetsForLeaderEpochResponseWithErrors(unsentRequest, 
Collections.singletonMap(TEST_PARTITION_1, Errors.TOPIC_AUTHORIZATION_FAILED));
+        clientResponse.onComplete();
+
+        assertTrue(unsentRequest.future().isDone());
+        assertFalse(unsentRequest.future().isCompletedExceptionally());
+
+        // Following validatePositions should raise the previous exception 
without performing any
+        // request
+        assertThrows(TopicAuthorizationException.class, () -> 
requestManager.validatePositionsIfNeeded());
+        assertEquals(0, requestManager.requestsToSend());
+    }
+
+    @Test
+    public void 
testValidatePositionsAbortIfNoApiVersionsToCheckAgainstThenRecovers() {
+        int currentOffset = 5;
+        Metadata.LeaderAndEpoch leaderAndEpoch = new 
Metadata.LeaderAndEpoch(Optional.of(LEADER_1),
+                Optional.of(3));
+        SubscriptionState.FetchPosition position = new 
SubscriptionState.FetchPosition(currentOffset,
+                Optional.of(10), leaderAndEpoch);
+
+        
when(subscriptionState.partitionsNeedingValidation(time.milliseconds())).thenReturn(Collections.singleton(TEST_PARTITION_1));
+        when(subscriptionState.position(any())).thenReturn(position, position);
+
+        // No api version info initially available
+        when(apiVersions.get(LEADER_1.idString())).thenReturn(null);
+        requestManager.validatePositionsIfNeeded();
+        assertEquals(0, requestManager.requestsToSend(), "Invalid request 
count");
+        verify(subscriptionState, 
never()).completeValidation(TEST_PARTITION_1);
+        verify(subscriptionState, never()).setNextAllowedRetry(any(), 
anyLong());
+
+        // Api version updated, next validate positions should successfully 
build the request
+        
when(apiVersions.get(LEADER_1.idString())).thenReturn(NodeApiVersions.create());
+        
when(subscriptionState.partitionsNeedingValidation(time.milliseconds())).thenReturn(Collections.singleton(TEST_PARTITION_1));
+        when(subscriptionState.position(any())).thenReturn(position, position);
+        requestManager.validatePositionsIfNeeded();
+        assertEquals(1, requestManager.requestsToSend(), "Invalid request 
count");
+    }
+
+    private void 
mockSuccessfulBuildRequestForValidatingPositions(SubscriptionState.FetchPosition
 position, Node leader) {
+        
when(subscriptionState.partitionsNeedingValidation(time.milliseconds())).thenReturn(Collections.singleton(TEST_PARTITION_1));
+        when(subscriptionState.position(any())).thenReturn(position, position);
+        NodeApiVersions nodeApiVersions = NodeApiVersions.create();
+        when(apiVersions.get(leader.idString())).thenReturn(nodeApiVersions);
+    }
+
+    private void 
testResetPositionsSuccessWithLeaderEpoch(Metadata.LeaderAndEpoch 
leaderAndEpoch) {
+        TopicPartition tp = TEST_PARTITION_1;
+        Node leader = LEADER_1;
+        OffsetResetStrategy strategy = OffsetResetStrategy.EARLIEST;
+        long offset = 5L;
+        Map<TopicPartition, OffsetAndTimestamp> expectedOffsets = 
Collections.singletonMap(tp,
+                new OffsetAndTimestamp(offset, 1L, leaderAndEpoch.epoch));
+        
when(subscriptionState.partitionsNeedingReset(time.milliseconds())).thenReturn(Collections.singleton(tp));
+        when(subscriptionState.resetStrategy(any())).thenReturn(strategy);
+        mockSuccessfulRequest(Collections.singletonMap(tp, leader));
+
+        requestManager.resetPositionsIfNeeded();
+        assertEquals(1, requestManager.requestsToSend());
+
+        // Reset positions response with offsets
+        when(metadata.currentLeader(tp)).thenReturn(testLeaderEpoch(leader, 
leaderAndEpoch.epoch));
+        NetworkClientDelegate.PollResult pollResult = 
requestManager.poll(time.milliseconds());
+        NetworkClientDelegate.UnsentRequest unsentRequest = 
pollResult.unsentRequests.get(0);
+        ClientResponse clientResponse = buildClientResponse(unsentRequest, 
expectedOffsets);
+        clientResponse.onComplete();
+        assertTrue(unsentRequest.future().isDone());
+        assertFalse(unsentRequest.future().isCompletedExceptionally());
+    }
+
     private ListOffsetsResponseData.ListOffsetsTopicResponse 
mockUnknownOffsetResponse(
             TopicPartition tp) {
         return new ListOffsetsResponseData.ListOffsetsTopicResponse()
@@ -488,21 +687,21 @@ public class OffsetsRequestManagerTest {
         NetworkClientDelegate.PollResult retriedPoll = 
requestManager.poll(time.milliseconds());
         verifySuccessfulPollAwaitingResponse(retriedPoll);
         NetworkClientDelegate.UnsentRequest unsentRequest = 
retriedPoll.unsentRequests.get(0);
-        ClientResponse clientResponse = buildClientResponse(unsentRequest,
-                expectedResult);
+        ClientResponse clientResponse = buildClientResponse(unsentRequest, 
expectedResult);
         clientResponse.onComplete();
         verifyRequestSuccessfullyCompleted(actualResult, expectedResult);
     }
 
-    private void expectSuccessfulRequest(Map<TopicPartition, Node> 
partitionLeaders) {
+    private void mockSuccessfulRequest(Map<TopicPartition, Node> 
partitionLeaders) {
         partitionLeaders.forEach((tp, broker) -> {
-            
when(metadata.currentLeader(tp)).thenReturn(testLeaderEpoch(broker));
+            when(metadata.currentLeader(tp)).thenReturn(testLeaderEpoch(broker,
+                    Metadata.LeaderAndEpoch.noLeaderOrEpoch().epoch));
             when(subscriptionState.isAssigned(tp)).thenReturn(true);
         });
         
when(metadata.fetch()).thenReturn(testClusterMetadata(partitionLeaders));
     }
 
-    private void expectFailedRequest_MissingLeader() {
+    private void mockFailedRequest_MissingLeader() {
         when(metadata.currentLeader(any(TopicPartition.class))).thenReturn(
                 new Metadata.LeaderAndEpoch(Optional.empty(), Optional.of(1)));
         
when(subscriptionState.isAssigned(any(TopicPartition.class))).thenReturn(true);
@@ -563,9 +762,8 @@ public class OffsetsRequestManagerTest {
         assertEquals(expectedFailure, failure.getCause().getClass());
     }
 
-    private Metadata.LeaderAndEpoch testLeaderEpoch(Node leader) {
-        return new Metadata.LeaderAndEpoch(Optional.of(leader),
-                Optional.of(1));
+    private Metadata.LeaderAndEpoch testLeaderEpoch(Node leader, 
Optional<Integer> epoch) {
+        return new Metadata.LeaderAndEpoch(Optional.of(leader), epoch);
     }
 
     private Cluster testClusterMetadata(Map<TopicPartition, Node> 
partitionLeaders) {
@@ -597,6 +795,75 @@ public class OffsetsRequestManagerTest {
         return buildClientResponse(request, topicResponses, false, null);
     }
 
+    private ClientResponse buildOffsetsForLeaderEpochResponse(
+            final NetworkClientDelegate.UnsentRequest request,
+            final List<TopicPartition> partitions,
+            final int endOffset) {
+
+        AbstractRequest abstractRequest = request.requestBuilder().build();
+        assertTrue(abstractRequest instanceof OffsetsForLeaderEpochRequest);
+        OffsetsForLeaderEpochRequest offsetsForLeaderEpochRequest = 
(OffsetsForLeaderEpochRequest) abstractRequest;
+        OffsetForLeaderEpochResponseData data = new 
OffsetForLeaderEpochResponseData();
+        partitions.forEach(tp -> {
+            OffsetForLeaderEpochResponseData.OffsetForLeaderTopicResult topic 
= data.topics().find(tp.topic());
+            if (topic == null) {
+                topic = new 
OffsetForLeaderEpochResponseData.OffsetForLeaderTopicResult().setTopic(tp.topic());
+                data.topics().add(topic);
+            }
+            topic.partitions().add(new 
OffsetForLeaderEpochResponseData.EpochEndOffset()
+                    .setPartition(tp.partition())
+                    .setErrorCode(Errors.NONE.code())
+                    .setLeaderEpoch(3)
+                    .setEndOffset(endOffset));
+        });
+
+        OffsetsForLeaderEpochResponse response = new 
OffsetsForLeaderEpochResponse(data);
+        return new ClientResponse(
+                new RequestHeader(ApiKeys.OFFSET_FOR_LEADER_EPOCH, 
offsetsForLeaderEpochRequest.version(), "", 1),
+                request.callback(),
+                "-1",
+                time.milliseconds(),
+                time.milliseconds(),
+                false,
+                null,
+                null,
+                response
+        );
+    }
+
+    private ClientResponse buildOffsetsForLeaderEpochResponseWithErrors(
+            final NetworkClientDelegate.UnsentRequest request,
+            final Map<TopicPartition, Errors> partitionErrors) {
+
+        AbstractRequest abstractRequest = request.requestBuilder().build();
+        assertTrue(abstractRequest instanceof OffsetsForLeaderEpochRequest);
+        OffsetsForLeaderEpochRequest offsetsForLeaderEpochRequest = 
(OffsetsForLeaderEpochRequest) abstractRequest;
+        OffsetForLeaderEpochResponseData data = new 
OffsetForLeaderEpochResponseData();
+        partitionErrors.keySet().forEach(tp -> {
+            OffsetForLeaderEpochResponseData.OffsetForLeaderTopicResult topic 
= data.topics().find(tp.topic());
+            if (topic == null) {
+                topic = new 
OffsetForLeaderEpochResponseData.OffsetForLeaderTopicResult().setTopic(tp.topic());
+                data.topics().add(topic);
+            }
+            topic.partitions().add(new 
OffsetForLeaderEpochResponseData.EpochEndOffset()
+                    .setPartition(tp.partition())
+                    .setErrorCode(partitionErrors.get(tp).code()));
+        });
+
+        OffsetsForLeaderEpochResponse response = new 
OffsetsForLeaderEpochResponse(data);
+        return new ClientResponse(
+                new RequestHeader(ApiKeys.OFFSET_FOR_LEADER_EPOCH, 
offsetsForLeaderEpochRequest.version(), "", 1),
+                request.callback(),
+                "-1",
+                time.milliseconds(),
+                time.milliseconds(),
+                false,
+                null,
+                null,
+                response
+        );
+    }
+
     private ClientResponse buildClientResponse(
             final NetworkClientDelegate.UnsentRequest request,
             final List<ListOffsetsResponseData.ListOffsetsTopicResponse> 
topicResponses) {

Reply via email to