This is an automated email from the ASF dual-hosted git repository.

junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 4a61b48d3dc KAFKA-14966; [2/N] Extract OffsetFetcher reusable logic 
(#13898)
4a61b48d3dc is described below

commit 4a61b48d3dca81e28b57f10af6052f36c50a05e3
Author: Lianet Magrans <[email protected]>
AuthorDate: Wed Jul 5 20:20:49 2023 -0400

    KAFKA-14966; [2/N] Extract OffsetFetcher reusable logic (#13898)
    
    This is a follow up on the initial OffsetFetcher refactoring to extract 
reusable logic, needed for the new consumer implementation (initial refactoring 
merged with PR-13815).
    
    Similar to the initial refactoring, this PR brings no changes to the 
existing logic, just extracting functions or pieces of logic.
    
    There were no individual tests for the extracted functions, so no tests 
were migrated.
    
    Reviewers: Jun Rao <[email protected]>
---
 .../clients/consumer/internals/OffsetFetcher.java  | 85 ++++-----------------
 .../consumer/internals/OffsetFetcherUtils.java     | 86 +++++++++++++++++++++-
 .../consumer/internals/SubscriptionState.java      |  2 +-
 3 files changed, 98 insertions(+), 75 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java
index a97a63a47e3..b7acb89d32c 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java
@@ -24,7 +24,6 @@ import org.apache.kafka.clients.StaleMetadataException;
 import org.apache.kafka.clients.consumer.LogTruncationException;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
-import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import 
org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils.ListOffsetData;
 import 
org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils.ListOffsetResult;
 import 
org.apache.kafka.clients.consumer.internals.OffsetsForLeaderEpochClient.OffsetForEpochResult;
@@ -34,12 +33,9 @@ import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.RetriableException;
 import org.apache.kafka.common.errors.TimeoutException;
-import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersion;
 import 
org.apache.kafka.common.message.ListOffsetsRequestData.ListOffsetsPartition;
-import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.requests.ListOffsetsRequest;
 import org.apache.kafka.common.requests.ListOffsetsResponse;
-import org.apache.kafka.common.requests.OffsetsForLeaderEpochRequest;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Timer;
@@ -54,10 +50,14 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
+import static 
org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils.buildOffsetsForTimesResult;
+import static 
org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils.hasUsableOffsetForLeaderEpochVersion;
+import static 
org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils.regroupFetchPositionsByLeader;
+import static 
org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils.topicsForPartitions;
+
 /**
  * {@link OffsetFetcher} is responsible for fetching the {@link 
OffsetAndTimestamp offsets} for
  * a given set of {@link TopicPartition topic and partition pairs} and for 
validation and resetting of positions,
@@ -73,7 +73,6 @@ public class OffsetFetcher {
     private final long retryBackoffMs;
     private final long requestTimeoutMs;
     private final IsolationLevel isolationLevel;
-    private final AtomicReference<RuntimeException> cachedListOffsetsException 
= new AtomicReference<>();
     private final OffsetsForLeaderEpochClient offsetsForLeaderEpochClient;
     private final ApiVersions apiVersions;
     private final OffsetFetcherUtils offsetFetcherUtils;
@@ -98,16 +97,7 @@ public class OffsetFetcher {
         this.apiVersions = apiVersions;
         this.offsetsForLeaderEpochClient = new 
OffsetsForLeaderEpochClient(client, logContext);
         this.offsetFetcherUtils = new OffsetFetcherUtils(logContext, metadata, 
subscriptions,
-                time, apiVersions);
-    }
-
-    private OffsetResetStrategy timestampToOffsetResetStrategy(long timestamp) 
{
-        if (timestamp == ListOffsetsRequest.EARLIEST_TIMESTAMP)
-            return OffsetResetStrategy.EARLIEST;
-        else if (timestamp == ListOffsetsRequest.LATEST_TIMESTAMP)
-            return OffsetResetStrategy.LATEST;
-        else
-            return null;
+                time, retryBackoffMs, apiVersions);
     }
 
     /**
@@ -117,11 +107,6 @@ public class OffsetFetcher {
      *                                                                         
and one or more partitions aren't awaiting a seekToBeginning() or seekToEnd().
      */
     public void resetPositionsIfNeeded() {
-        // Raise exception from previous offset fetch if there is one
-        RuntimeException exception = 
cachedListOffsetsException.getAndSet(null);
-        if (exception != null)
-            throw exception;
-
         Map<TopicPartition, Long> offsetResetTimestamps = 
offsetFetcherUtils.getOffsetResetTimestamp();
 
         if (offsetResetTimestamps.isEmpty())
@@ -142,13 +127,13 @@ public class OffsetFetcher {
 
     public Map<TopicPartition, OffsetAndTimestamp> 
offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch,
                                                                    Timer 
timer) {
-        
metadata.addTransientTopics(offsetFetcherUtils.topicsForPartitions(timestampsToSearch.keySet()));
+        
metadata.addTransientTopics(topicsForPartitions(timestampsToSearch.keySet()));
 
         try {
             Map<TopicPartition, ListOffsetData> fetchedOffsets = 
fetchOffsetsByTimes(timestampsToSearch,
                     timer, true).fetchedOffsets;
 
-            return 
offsetFetcherUtils.buildOffsetsForTimesResult(timestampsToSearch, 
fetchedOffsets);
+            return buildOffsetsForTimesResult(timestampsToSearch, 
fetchedOffsets);
         } finally {
             metadata.clearTransientTopics();
         }
@@ -215,7 +200,7 @@ public class OffsetFetcher {
     private Map<TopicPartition, Long> 
beginningOrEndOffset(Collection<TopicPartition> partitions,
                                                            long timestamp,
                                                            Timer timer) {
-        
metadata.addTransientTopics(offsetFetcherUtils.topicsForPartitions(partitions));
+        metadata.addTransientTopics(topicsForPartitions(partitions));
         try {
             Map<TopicPartition, Long> timestampsToSearch = partitions.stream()
                     .distinct()
@@ -230,16 +215,6 @@ public class OffsetFetcher {
         }
     }
 
-    // Visible for testing
-    void resetPositionIfNeeded(TopicPartition partition, OffsetResetStrategy 
requestedResetStrategy, ListOffsetData offsetData) {
-        FetchPosition position = new FetchPosition(
-                offsetData.offset,
-                Optional.empty(), // This will ensure we skip validation
-                metadata.currentLeader(partition));
-        offsetData.leaderEpoch.ifPresent(epoch -> 
metadata.updateLastSeenEpochIfNewer(partition, epoch));
-        subscriptions.maybeSeekUnvalidated(partition, position, 
requestedResetStrategy);
-    }
-
     private void resetPositionsAsync(Map<TopicPartition, Long> 
partitionResetTimestamps) {
         Map<Node, Map<TopicPartition, ListOffsetsPartition>> 
timestampsToSearchByNode =
                 groupListOffsetRequests(partitionResetTimestamps, new 
HashSet<>());
@@ -252,39 +227,17 @@ public class OffsetFetcher {
             future.addListener(new RequestFutureListener<ListOffsetResult>() {
                 @Override
                 public void onSuccess(ListOffsetResult result) {
-                    if (!result.partitionsToRetry.isEmpty()) {
-                        subscriptions.requestFailed(result.partitionsToRetry, 
time.milliseconds() + retryBackoffMs);
-                        metadata.requestUpdate();
-                    }
-
-                    for (Map.Entry<TopicPartition, ListOffsetData> 
fetchedOffset : result.fetchedOffsets.entrySet()) {
-                        TopicPartition partition = fetchedOffset.getKey();
-                        ListOffsetData offsetData = fetchedOffset.getValue();
-                        ListOffsetsPartition requestedReset = 
resetTimestamps.get(partition);
-                        resetPositionIfNeeded(partition, 
timestampToOffsetResetStrategy(requestedReset.timestamp()), offsetData);
-                    }
+                    
offsetFetcherUtils.onSuccessfulRequestForResettingPositions(resetTimestamps, 
result);
                 }
 
                 @Override
                 public void onFailure(RuntimeException e) {
-                    subscriptions.requestFailed(resetTimestamps.keySet(), 
time.milliseconds() + retryBackoffMs);
-                    metadata.requestUpdate();
-
-                    if (!(e instanceof RetriableException) && 
!cachedListOffsetsException.compareAndSet(null, e))
-                        log.error("Discarding error in ListOffsetResponse 
because another error is pending", e);
+                    
offsetFetcherUtils.onFailedRequestForResettingPositions(resetTimestamps, e);
                 }
             });
         }
     }
 
-    static boolean hasUsableOffsetForLeaderEpochVersion(NodeApiVersions 
nodeApiVersions) {
-        ApiVersion apiVersion = 
nodeApiVersions.apiVersion(ApiKeys.OFFSET_FOR_LEADER_EPOCH);
-        if (apiVersion == null)
-            return false;
-
-        return 
OffsetsForLeaderEpochRequest.supportsTopicPermission(apiVersion.maxVersion());
-    }
-
     /**
      * For each partition which needs validation, make an asynchronous request 
to get the end-offsets for the partition
      * with the epoch less than or equal to the epoch the partition last saw.
@@ -294,8 +247,7 @@ public class OffsetFetcher {
      * Requests are grouped by Node for efficiency.
      */
     private void validatePositionsAsync(Map<TopicPartition, FetchPosition> 
partitionsToValidate) {
-        final Map<Node, Map<TopicPartition, FetchPosition>> regrouped =
-                regroupFetchPositionsByLeader(partitionsToValidate);
+        final Map<Node, Map<TopicPartition, FetchPosition>> regrouped = 
regroupFetchPositionsByLeader(partitionsToValidate);
 
         long nextResetTimeMs = time.milliseconds() + requestTimeoutMs;
         regrouped.forEach((node, fetchPositions) -> {
@@ -518,7 +470,6 @@ public class OffsetFetcher {
         }
     }
 
-
     /**
      * If we have seen new metadata (as tracked by {@link 
org.apache.kafka.clients.Metadata#updateVersion()}), then
      * we should check that all the assignments have a valid position.
@@ -526,14 +477,4 @@ public class OffsetFetcher {
     public void validatePositionsOnMetadataChange() {
         offsetFetcherUtils.validatePositionsOnMetadataChange();
     }
-
-    private Map<Node, Map<TopicPartition, FetchPosition>> 
regroupFetchPositionsByLeader(
-            Map<TopicPartition, FetchPosition> partitionMap) {
-        return partitionMap.entrySet()
-                .stream()
-                .filter(entry -> 
entry.getValue().currentLeader.leader.isPresent())
-                .collect(Collectors.groupingBy(entry -> 
entry.getValue().currentLeader.leader.get(),
-                        Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue)));
-    }
-
-}
+}
\ No newline at end of file
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherUtils.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherUtils.java
index c4c16d5eaf6..c2b3847d5a5 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherUtils.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherUtils.java
@@ -17,16 +17,22 @@
 package org.apache.kafka.clients.consumer.internals;
 
 import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.NodeApiVersions;
 import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.common.IsolationLevel;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.RetriableException;
 import org.apache.kafka.common.errors.TopicAuthorizationException;
+import org.apache.kafka.common.message.ApiVersionsResponseData;
+import org.apache.kafka.common.message.ListOffsetsRequestData;
 import org.apache.kafka.common.message.ListOffsetsResponseData;
+import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.ListOffsetsRequest;
 import org.apache.kafka.common.requests.ListOffsetsResponse;
+import org.apache.kafka.common.requests.OffsetsForLeaderEpochRequest;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.slf4j.Logger;
@@ -49,21 +55,25 @@ class OffsetFetcherUtils {
     private final ConsumerMetadata metadata;
     private final SubscriptionState subscriptionState;
     private final Time time;
+    private final long retryBackoffMs;
     private final ApiVersions apiVersions;
     private final Logger log;
 
     private final AtomicReference<RuntimeException> 
cachedOffsetForLeaderException = new AtomicReference<>();
+    private final AtomicReference<RuntimeException> cachedListOffsetsException 
= new AtomicReference<>();
     private final AtomicInteger metadataUpdateVersion = new AtomicInteger(-1);
 
     OffsetFetcherUtils(LogContext logContext,
                        ConsumerMetadata metadata,
                        SubscriptionState subscriptionState,
                        Time time,
+                       long retryBackoffMs,
                        ApiVersions apiVersions) {
         this.log = logContext.logger(getClass());
         this.metadata = metadata;
         this.subscriptionState = subscriptionState;
         this.time = time;
+        this.retryBackoffMs = retryBackoffMs;
         this.apiVersions = apiVersions;
     }
 
@@ -198,6 +208,11 @@ class OffsetFetcherUtils {
     }
 
     Map<TopicPartition, Long> getOffsetResetTimestamp() {
+        // Raise exception from previous offset fetch if there is one
+        RuntimeException exception = 
cachedListOffsetsException.getAndSet(null);
+        if (exception != null)
+            throw exception;
+
         Set<TopicPartition> partitions = 
subscriptionState.partitionsNeedingReset(time.milliseconds());
         final Map<TopicPartition, Long> offsetResetTimestamps = new 
HashMap<>();
         for (final TopicPartition partition : partitions) {
@@ -209,7 +224,7 @@ class OffsetFetcherUtils {
         return offsetResetTimestamps;
     }
 
-    Map<TopicPartition, OffsetAndTimestamp> buildOffsetsForTimesResult(final 
Map<TopicPartition, Long> timestampsToSearch,
+    static Map<TopicPartition, OffsetAndTimestamp> 
buildOffsetsForTimesResult(final Map<TopicPartition, Long> timestampsToSearch,
                                                                        final 
Map<TopicPartition, ListOffsetData> fetchedOffsets) {
         HashMap<TopicPartition, OffsetAndTimestamp> offsetsByTimes = new 
HashMap<>(timestampsToSearch.size());
         for (Map.Entry<TopicPartition, Long> entry : 
timestampsToSearch.entrySet())
@@ -235,7 +250,7 @@ class OffsetFetcherUtils {
             return null;
     }
 
-    Set<String> topicsForPartitions(Collection<TopicPartition> partitions) {
+    static Set<String> topicsForPartitions(Collection<TopicPartition> 
partitions) {
         return 
partitions.stream().map(TopicPartition::topic).collect(Collectors.toSet());
     }
 
@@ -261,6 +276,73 @@ class OffsetFetcherUtils {
         }
     }
 
+    static OffsetResetStrategy timestampToOffsetResetStrategy(long timestamp) {
+        if (timestamp == ListOffsetsRequest.EARLIEST_TIMESTAMP)
+            return OffsetResetStrategy.EARLIEST;
+        else if (timestamp == ListOffsetsRequest.LATEST_TIMESTAMP)
+            return OffsetResetStrategy.LATEST;
+        else
+            return null;
+    }
+
+    void onSuccessfulRequestForResettingPositions(
+            final Map<TopicPartition, 
ListOffsetsRequestData.ListOffsetsPartition> resetTimestamps,
+            final ListOffsetResult result) {
+        if (!result.partitionsToRetry.isEmpty()) {
+            subscriptionState.requestFailed(result.partitionsToRetry, 
time.milliseconds() + retryBackoffMs);
+            metadata.requestUpdate();
+        }
+
+        for (Map.Entry<TopicPartition, ListOffsetData> fetchedOffset : 
result.fetchedOffsets.entrySet()) {
+            TopicPartition partition = fetchedOffset.getKey();
+            ListOffsetData offsetData = fetchedOffset.getValue();
+            ListOffsetsRequestData.ListOffsetsPartition requestedReset = 
resetTimestamps.get(partition);
+            resetPositionIfNeeded(
+                    partition,
+                    timestampToOffsetResetStrategy(requestedReset.timestamp()),
+                    offsetData);
+        }
+    }
+
+    void onFailedRequestForResettingPositions(
+            final Map<TopicPartition, 
ListOffsetsRequestData.ListOffsetsPartition> resetTimestamps,
+            final RuntimeException error) {
+        subscriptionState.requestFailed(resetTimestamps.keySet(), 
time.milliseconds() + retryBackoffMs);
+        metadata.requestUpdate();
+
+        if (!(error instanceof RetriableException) && 
!cachedListOffsetsException.compareAndSet(null,
+                error))
+            log.error("Discarding error in ListOffsetResponse because another 
error is pending", error);
+    }
+
+    // Visible for testing
+    void resetPositionIfNeeded(TopicPartition partition, OffsetResetStrategy 
requestedResetStrategy,
+                               ListOffsetData offsetData) {
+        SubscriptionState.FetchPosition position = new 
SubscriptionState.FetchPosition(
+                offsetData.offset,
+                Optional.empty(), // This will ensure we skip validation
+                metadata.currentLeader(partition));
+        offsetData.leaderEpoch.ifPresent(epoch -> 
metadata.updateLastSeenEpochIfNewer(partition, epoch));
+        subscriptionState.maybeSeekUnvalidated(partition, position, 
requestedResetStrategy);
+    }
+
+    static Map<Node, Map<TopicPartition, SubscriptionState.FetchPosition>> 
regroupFetchPositionsByLeader(
+            Map<TopicPartition, SubscriptionState.FetchPosition> partitionMap) 
{
+        return partitionMap.entrySet()
+                .stream()
+                .filter(entry -> 
entry.getValue().currentLeader.leader.isPresent())
+                .collect(Collectors.groupingBy(entry -> 
entry.getValue().currentLeader.leader.get(),
+                        Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue)));
+    }
+
+    static boolean hasUsableOffsetForLeaderEpochVersion(NodeApiVersions 
nodeApiVersions) {
+        ApiVersionsResponseData.ApiVersion apiVersion = 
nodeApiVersions.apiVersion(ApiKeys.OFFSET_FOR_LEADER_EPOCH);
+        if (apiVersion == null)
+            return false;
+
+        return 
OffsetsForLeaderEpochRequest.supportsTopicPermission(apiVersion.maxVersion());
+    }
+
     static class ListOffsetResult {
         final Map<TopicPartition, OffsetFetcherUtils.ListOffsetData> 
fetchedOffsets;
         final Set<TopicPartition> partitionsToRetry;
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
index ca1a5fa4c51..fdf89944d68 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -47,7 +47,7 @@ import java.util.function.LongSupplier;
 import java.util.function.Predicate;
 import java.util.regex.Pattern;
 
-import static 
org.apache.kafka.clients.consumer.internals.OffsetFetcher.hasUsableOffsetForLeaderEpochVersion;
+import static 
org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils.hasUsableOffsetForLeaderEpochVersion;
 import static 
org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.UNDEFINED_EPOCH;
 import static 
org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.UNDEFINED_EPOCH_OFFSET;
 

Reply via email to