This is an automated email from the ASF dual-hosted git repository. jgus pushed a commit to branch 2.5 in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 3ee844c91f0a961b0b53bec59604da7f16604241 Author: Dezhi “Andy” Fang <andyfang...@gmail.com> AuthorDate: Wed Apr 8 23:46:45 2020 -0700 KAFKA-9583; Use topic-partitions grouped by node to send OffsetsForLeaderEpoch requests (#8077) In `validateOffsetsAsync` in the consumer, we group the requests by leader node for efficiency. The list of topic-partitions is grouped from `partitionsToValidate` (all partitions) to `node` => `fetchPostitions` (partitions by node). However, when actually sending the request with `OffsetsForLeaderEpochClient`, we use `partitionsToValidate`, which is the list of all topic-partitions passed into `validateOffsetsAsync`. This results in extra partitions being included in the request se [...] This PR fixes the issue by using `fetchPostitions`, which is the proper list of partitions that we should send in the request. Additionally, a small typo in the API name in `OffsetsForLeaderEpochClient` is corrected (it originally referenced `ListOffset` as the API name). Reviewers: David Arthur <mum...@gmail.com>, Jason Gustafson <ja...@confluent.io> --- .../main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java | 2 +- .../kafka/clients/consumer/internals/OffsetsForLeaderEpochClient.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java index 3537f60..b359a24 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java @@ -796,7 +796,7 @@ public class Fetcher<K, V> implements Closeable { subscriptions.setNextAllowedRetry(fetchPostitions.keySet(), time.milliseconds() + requestTimeoutMs); - RequestFuture<OffsetsForLeaderEpochClient.OffsetForEpochResult> future = offsetsForLeaderEpochClient.sendAsyncRequest(node, partitionsToValidate); + 
RequestFuture<OffsetsForLeaderEpochClient.OffsetForEpochResult> future = offsetsForLeaderEpochClient.sendAsyncRequest(node, fetchPostitions); future.addListener(new RequestFutureListener<OffsetsForLeaderEpochClient.OffsetForEpochResult>() { @Override public void onSuccess(OffsetsForLeaderEpochClient.OffsetForEpochResult offsetsResult) { diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsForLeaderEpochClient.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsForLeaderEpochClient.java index 480d0ea..7e372c7 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsForLeaderEpochClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsForLeaderEpochClient.java @@ -96,7 +96,7 @@ public class OffsetsForLeaderEpochClient extends AsyncClient< partitionsToRetry.add(topicPartition); break; case UNKNOWN_TOPIC_OR_PARTITION: - logger().warn("Received unknown topic or partition error in ListOffset request for partition {}", + logger().warn("Received unknown topic or partition error in OffsetsForLeaderEpoch request for partition {}", topicPartition); partitionsToRetry.add(topicPartition); break;