This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.3 in repository https://gitbox.apache.org/repos/asf/kafka.git
commit d05b47112ad81bf37cff078f403544cfd65a5221 Author: David Arthur <[email protected]> AuthorDate: Tue May 21 18:50:21 2019 -0400 MINOR: Set `replicaId` for OffsetsForLeaderEpoch from followers (#6775) Reviewers: Jason Gustafson <[email protected]> --- .../internals/OffsetsForLeaderEpochClient.java | 2 +- .../org/apache/kafka/common/requests/FetchRequest.java | 2 +- .../apache/kafka/common/requests/FetchResponse.java | 14 ++++++++------ .../common/requests/OffsetsForLeaderEpochRequest.java | 13 +++++++++++-- .../kafka/clients/consumer/internals/FetcherTest.java | 14 +++++++------- .../kafka/common/requests/RequestResponseTest.java | 18 +++++++++--------- .../main/scala/kafka/server/ReplicaFetcherThread.scala | 2 +- .../kafka/api/AuthorizerIntegrationTest.scala | 2 +- .../server/OffsetsForLeaderEpochRequestTest.scala | 4 ++-- .../scala/unit/kafka/server/RequestQuotaTest.scala | 2 +- .../server/epoch/LeaderEpochIntegrationTest.scala | 2 +- 11 files changed, 43 insertions(+), 32 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsForLeaderEpochClient.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsForLeaderEpochClient.java index 9ffedd1..d7b02a7 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsForLeaderEpochClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsForLeaderEpochClient.java @@ -53,7 +53,7 @@ public class OffsetsForLeaderEpochClient extends AsyncClient< fetchEpoch -> partitionData.put(topicPartition, new OffsetsForLeaderEpochRequest.PartitionData(fetchPosition.currentLeader.epoch, fetchEpoch)))); - return new OffsetsForLeaderEpochRequest.Builder(ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion(), partitionData); + return OffsetsForLeaderEpochRequest.Builder.forConsumer(ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion(), partitionData); } @Override diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java index 485b102..da09df3 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java @@ -459,7 +459,7 @@ public class FetchRequest extends AbstractRequest { for (Map.Entry<TopicPartition, PartitionData> entry : fetchData.entrySet()) { FetchResponse.PartitionData<MemoryRecords> partitionResponse = new FetchResponse.PartitionData<>(error, FetchResponse.INVALID_HIGHWATERMARK, FetchResponse.INVALID_LAST_STABLE_OFFSET, - FetchResponse.INVALID_LOG_START_OFFSET, null, null, MemoryRecords.EMPTY); + FetchResponse.INVALID_LOG_START_OFFSET, Optional.empty(), null, MemoryRecords.EMPTY); responseData.put(entry.getKey(), partitionResponse); } return new FetchResponse<>(error, responseData, throttleTimeMs, metadata.sessionId()); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java index e857b5b..942b0d6 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java @@ -40,6 +40,7 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.Queue; +import java.util.function.Predicate; import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE; import static org.apache.kafka.common.protocol.CommonFields.PARTITION_ID; @@ -223,7 +224,7 @@ public class FetchResponse<T extends BaseRecords> extends AbstractResponse { public static final long INVALID_HIGHWATERMARK = -1L; public static final long INVALID_LAST_STABLE_OFFSET = -1L; public static final long INVALID_LOG_START_OFFSET = -1L; - public static final int UNSPECIFIED_PREFERRED_REPLICA = -1; + public static final int 
INVALID_PREFERRED_REPLICA_ID = -1; private final int throttleTimeMs; private final Errors error; @@ -277,14 +278,14 @@ public class FetchResponse<T extends BaseRecords> extends AbstractResponse { long highWatermark, long lastStableOffset, long logStartOffset, - Integer preferredReadReplica, + Optional<Integer> preferredReadReplica, List<AbortedTransaction> abortedTransactions, T records) { this.error = error; this.highWatermark = highWatermark; this.lastStableOffset = lastStableOffset; this.logStartOffset = logStartOffset; - this.preferredReadReplica = Optional.ofNullable(preferredReadReplica); + this.preferredReadReplica = preferredReadReplica; this.abortedTransactions = abortedTransactions; this.records = records; } @@ -379,7 +380,9 @@ public class FetchResponse<T extends BaseRecords> extends AbstractResponse { long highWatermark = partitionResponseHeader.get(HIGH_WATERMARK); long lastStableOffset = partitionResponseHeader.getOrElse(LAST_STABLE_OFFSET, INVALID_LAST_STABLE_OFFSET); long logStartOffset = partitionResponseHeader.getOrElse(LOG_START_OFFSET, INVALID_LOG_START_OFFSET); - int preferredReadReplica = partitionResponseHeader.getOrElse(PREFERRED_READ_REPLICA, UNSPECIFIED_PREFERRED_REPLICA); + Optional<Integer> preferredReadReplica = Optional.of( + partitionResponseHeader.getOrElse(PREFERRED_READ_REPLICA, INVALID_PREFERRED_REPLICA_ID) + ).filter(Predicate.isEqual(INVALID_PREFERRED_REPLICA_ID).negate()); BaseRecords baseRecords = partitionResponse.getRecords(RECORD_SET_KEY_NAME); if (!(baseRecords instanceof MemoryRecords)) @@ -401,8 +404,7 @@ public class FetchResponse<T extends BaseRecords> extends AbstractResponse { } PartitionData<MemoryRecords> partitionData = new PartitionData<>(error, highWatermark, lastStableOffset, - logStartOffset, preferredReadReplica == UNSPECIFIED_PREFERRED_REPLICA ? 
null : preferredReadReplica, - abortedTransactions, records); + logStartOffset, preferredReadReplica, abortedTransactions, records); responseData.put(new TopicPartition(topic, partition), partitionData); } } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java index 5052b0e..6599a70 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java @@ -101,10 +101,19 @@ public class OffsetsForLeaderEpochRequest extends AbstractRequest { private final Map<TopicPartition, PartitionData> epochsByPartition; private final int replicaId; - public Builder(short version, Map<TopicPartition, PartitionData> epochsByPartition) { + Builder(short version, Map<TopicPartition, PartitionData> epochsByPartition, int replicaId) { super(ApiKeys.OFFSET_FOR_LEADER_EPOCH, version); this.epochsByPartition = epochsByPartition; - this.replicaId = CONSUMER_REPLICA_ID; + this.replicaId = replicaId; + } + + public static Builder forConsumer(short version, Map<TopicPartition, PartitionData> epochsByPartition) { + return new Builder(version, epochsByPartition, CONSUMER_REPLICA_ID); + } + + public static Builder forFollower(short version, Map<TopicPartition, PartitionData> epochsByPartition, int replicaId) { + return new Builder(version, epochsByPartition, replicaId); + } @Override diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java index 2f40ffc..0e2662a 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java @@ -1173,7 +1173,7 @@ public class FetcherTest { assertEquals(1, 
fetcher.sendFetches()); Map<TopicPartition, FetchResponse.PartitionData<MemoryRecords>> partitions = new HashMap<>(); partitions.put(tp0, new FetchResponse.PartitionData<>(Errors.NONE, 100, - FetchResponse.INVALID_LAST_STABLE_OFFSET, FetchResponse.INVALID_LOG_START_OFFSET, null, null, records)); + FetchResponse.INVALID_LAST_STABLE_OFFSET, FetchResponse.INVALID_LOG_START_OFFSET, Optional.empty(), null, records)); client.prepareResponse(fullFetchResponse(tp0, this.records, Errors.NONE, 100L, 0)); consumerClient.poll(time.timer(0)); @@ -1184,7 +1184,7 @@ public class FetcherTest { assertEquals(1, fetcher.sendFetches()); partitions = new HashMap<>(); partitions.put(tp1, new FetchResponse.PartitionData<>(Errors.OFFSET_OUT_OF_RANGE, 100, - FetchResponse.INVALID_LAST_STABLE_OFFSET, FetchResponse.INVALID_LOG_START_OFFSET, null, null, MemoryRecords.EMPTY)); + FetchResponse.INVALID_LAST_STABLE_OFFSET, FetchResponse.INVALID_LOG_START_OFFSET, Optional.empty(), null, MemoryRecords.EMPTY)); client.prepareResponse(new FetchResponse<>(Errors.NONE, new LinkedHashMap<>(partitions), 0, INVALID_SESSION_ID)); consumerClient.poll(time.timer(0)); assertEquals(1, fetcher.fetchedRecords().get(tp0).size()); @@ -3341,7 +3341,7 @@ public class FetcherTest { // Set preferred read replica to node=1 client.prepareResponse(fullFetchResponse(tp0, this.records, Errors.NONE, 100L, - FetchResponse.INVALID_LAST_STABLE_OFFSET, 0, 1)); + FetchResponse.INVALID_LAST_STABLE_OFFSET, 0, Optional.of(1))); consumerClient.poll(time.timer(0)); assertTrue(fetcher.hasCompletedFetches()); @@ -3358,7 +3358,7 @@ public class FetcherTest { // Set preferred read replica to node=2, which isn't in our metadata, should revert to leader client.prepareResponse(fullFetchResponse(tp0, this.records, Errors.NONE, 100L, - FetchResponse.INVALID_LAST_STABLE_OFFSET, 0, 2)); + FetchResponse.INVALID_LAST_STABLE_OFFSET, 0, Optional.of(2))); consumerClient.poll(time.timer(0)); assertTrue(fetcher.hasCompletedFetches()); 
fetchedRecords(); @@ -3379,7 +3379,7 @@ public class FetcherTest { assertFalse(fetcher.hasCompletedFetches()); client.prepareResponse(fullFetchResponse(tp0, this.records, Errors.NONE, 100L, - FetchResponse.INVALID_LAST_STABLE_OFFSET, 0, 1)); + FetchResponse.INVALID_LAST_STABLE_OFFSET, 0, Optional.of(1))); consumerClient.poll(time.timer(0)); assertTrue(fetcher.hasCompletedFetches()); @@ -3393,7 +3393,7 @@ public class FetcherTest { assertFalse(fetcher.hasCompletedFetches()); client.prepareResponse(fullFetchResponse(tp0, this.records, Errors.OFFSET_OUT_OF_RANGE, 100L, - FetchResponse.INVALID_LAST_STABLE_OFFSET, 0, null)); + FetchResponse.INVALID_LAST_STABLE_OFFSET, 0, Optional.empty())); consumerClient.poll(time.timer(0)); assertTrue(fetcher.hasCompletedFetches()); @@ -3449,7 +3449,7 @@ public class FetcherTest { } private FetchResponse<MemoryRecords> fullFetchResponse(TopicPartition tp, MemoryRecords records, Errors error, long hw, - long lastStableOffset, int throttleTime, Integer preferredReplicaId) { + long lastStableOffset, int throttleTime, Optional<Integer> preferredReplicaId) { Map<TopicPartition, FetchResponse.PartitionData<MemoryRecords>> partitions = Collections.singletonMap(tp, new FetchResponse.PartitionData<>(error, hw, lastStableOffset, 0L, preferredReplicaId, null, records)); diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index 1cef864..d7e9223 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -559,7 +559,7 @@ public class RequestResponseTest { MemoryRecords records = MemoryRecords.readableRecords(ByteBuffer.allocate(10)); responseData.put(new TopicPartition("test", 0), new FetchResponse.PartitionData<>( Errors.NONE, 1000000, FetchResponse.INVALID_LAST_STABLE_OFFSET, - 0L, null, null, records)); 
+ 0L, Optional.empty(), null, records)); FetchResponse<MemoryRecords> v0Response = new FetchResponse<>(Errors.NONE, responseData, 0, INVALID_SESSION_ID); FetchResponse<MemoryRecords> v1Response = new FetchResponse<>(Errors.NONE, responseData, 10, INVALID_SESSION_ID); @@ -583,11 +583,11 @@ public class RequestResponseTest { new FetchResponse.AbortedTransaction(15, 50) ); responseData.put(new TopicPartition("bar", 0), new FetchResponse.PartitionData<>(Errors.NONE, 100000, - FetchResponse.INVALID_LAST_STABLE_OFFSET, FetchResponse.INVALID_LOG_START_OFFSET, null, abortedTransactions, records)); + FetchResponse.INVALID_LAST_STABLE_OFFSET, FetchResponse.INVALID_LOG_START_OFFSET, Optional.empty(), abortedTransactions, records)); responseData.put(new TopicPartition("bar", 1), new FetchResponse.PartitionData<>(Errors.NONE, 900000, - 5, FetchResponse.INVALID_LOG_START_OFFSET, null, null, records)); + 5, FetchResponse.INVALID_LOG_START_OFFSET, Optional.empty(), null, records)); responseData.put(new TopicPartition("foo", 0), new FetchResponse.PartitionData<>(Errors.NONE, 70000, - 6, FetchResponse.INVALID_LOG_START_OFFSET, null, Collections.emptyList(), records)); + 6, FetchResponse.INVALID_LOG_START_OFFSET, Optional.empty(), Collections.emptyList(), records)); FetchResponse<MemoryRecords> response = new FetchResponse<>(Errors.NONE, responseData, 10, INVALID_SESSION_ID); FetchResponse deserialized = FetchResponse.parse(toBuffer(response.toStruct((short) 4)), (short) 4); @@ -751,11 +751,11 @@ public class RequestResponseTest { LinkedHashMap<TopicPartition, FetchResponse.PartitionData<MemoryRecords>> responseData = new LinkedHashMap<>(); MemoryRecords records = MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord("blah".getBytes())); responseData.put(new TopicPartition("test", 0), new FetchResponse.PartitionData<>(Errors.NONE, - 1000000, FetchResponse.INVALID_LAST_STABLE_OFFSET, 0L, null, null, records)); + 1000000, FetchResponse.INVALID_LAST_STABLE_OFFSET, 0L, 
Optional.empty(), null, records)); List<FetchResponse.AbortedTransaction> abortedTransactions = Collections.singletonList( new FetchResponse.AbortedTransaction(234L, 999L)); responseData.put(new TopicPartition("test", 1), new FetchResponse.PartitionData<>(Errors.NONE, - 1000000, FetchResponse.INVALID_LAST_STABLE_OFFSET, 0L, null, abortedTransactions, MemoryRecords.EMPTY)); + 1000000, FetchResponse.INVALID_LAST_STABLE_OFFSET, 0L, Optional.empty(), abortedTransactions, MemoryRecords.EMPTY)); return new FetchResponse<>(Errors.NONE, responseData, 25, sessionId); } @@ -763,12 +763,12 @@ public class RequestResponseTest { LinkedHashMap<TopicPartition, FetchResponse.PartitionData<MemoryRecords>> responseData = new LinkedHashMap<>(); MemoryRecords records = MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord("blah".getBytes())); responseData.put(new TopicPartition("test", 0), new FetchResponse.PartitionData<>(Errors.NONE, - 1000000, FetchResponse.INVALID_LAST_STABLE_OFFSET, 0L, null, null, records)); + 1000000, FetchResponse.INVALID_LAST_STABLE_OFFSET, 0L, Optional.empty(), null, records)); List<FetchResponse.AbortedTransaction> abortedTransactions = Collections.singletonList( new FetchResponse.AbortedTransaction(234L, 999L)); responseData.put(new TopicPartition("test", 1), new FetchResponse.PartitionData<>(Errors.NONE, - 1000000, FetchResponse.INVALID_LAST_STABLE_OFFSET, 0L, null, abortedTransactions, MemoryRecords.EMPTY)); + 1000000, FetchResponse.INVALID_LAST_STABLE_OFFSET, 0L, Optional.empty(), abortedTransactions, MemoryRecords.EMPTY)); return new FetchResponse<>(Errors.NONE, responseData, 25, INVALID_SESSION_ID); } @@ -1260,7 +1260,7 @@ public class RequestResponseTest { epochs.put(new TopicPartition("topic2", 2), new OffsetsForLeaderEpochRequest.PartitionData(Optional.empty(), 3)); - return new OffsetsForLeaderEpochRequest.Builder(ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion(), epochs).build(); + return 
OffsetsForLeaderEpochRequest.Builder.forConsumer(ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion(), epochs).build(); } private OffsetsForLeaderEpochResponse createLeaderEpochResponse() { diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala index 8e92c2b..ab5be6e 100644 --- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala +++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala @@ -303,7 +303,7 @@ class ReplicaFetcherThread(name: String, return Map.empty } - val epochRequest = new OffsetsForLeaderEpochRequest.Builder(offsetForLeaderEpochRequestVersion, partitions.asJava) + val epochRequest = OffsetsForLeaderEpochRequest.Builder.forFollower(offsetForLeaderEpochRequestVersion, partitions.asJava, brokerConfig.brokerId) debug(s"Sending offset for leader epoch request $epochRequest") try { diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala index b465773..a577b2e 100644 --- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala @@ -295,7 +295,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { private def offsetsForLeaderEpochRequest: OffsetsForLeaderEpochRequest = { val epochs = Map(tp -> new OffsetsForLeaderEpochRequest.PartitionData(Optional.of(27), 7)) - new OffsetsForLeaderEpochRequest.Builder(ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion, epochs.asJava).build() + OffsetsForLeaderEpochRequest.Builder.forConsumer(ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion, epochs.asJava).build() } private def createOffsetFetchRequest = { diff --git a/core/src/test/scala/unit/kafka/server/OffsetsForLeaderEpochRequestTest.scala b/core/src/test/scala/unit/kafka/server/OffsetsForLeaderEpochRequestTest.scala index 2cdd2e8..4d1416c 100644 --- 
a/core/src/test/scala/unit/kafka/server/OffsetsForLeaderEpochRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/OffsetsForLeaderEpochRequestTest.scala @@ -35,7 +35,7 @@ class OffsetsForLeaderEpochRequestTest extends BaseRequestTest { val partition = new TopicPartition(topic, 0) val epochs = Map(partition -> new OffsetsForLeaderEpochRequest.PartitionData(Optional.empty[Integer], 0)).asJava - val request = new OffsetsForLeaderEpochRequest.Builder(ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion, epochs).build() + val request = OffsetsForLeaderEpochRequest.Builder.forConsumer(ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion, epochs).build() // Unknown topic val randomBrokerId = servers.head.config.brokerId @@ -61,7 +61,7 @@ class OffsetsForLeaderEpochRequestTest extends BaseRequestTest { def assertResponseErrorForEpoch(error: Errors, brokerId: Int, currentLeaderEpoch: Optional[Integer]): Unit = { val epochs = Map(topicPartition -> new OffsetsForLeaderEpochRequest.PartitionData( currentLeaderEpoch, 0)).asJava - val request = new OffsetsForLeaderEpochRequest.Builder(ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion, epochs) + val request = OffsetsForLeaderEpochRequest.Builder.forConsumer(ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion, epochs) .build() assertResponseError(error, brokerId, request) } diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala index ddcee12..047188f 100644 --- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala +++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala @@ -362,7 +362,7 @@ class RequestQuotaTest extends BaseRequestTest { new InitProducerIdRequest.Builder(requestData) case ApiKeys.OFFSET_FOR_LEADER_EPOCH => - new OffsetsForLeaderEpochRequest.Builder(ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion, + OffsetsForLeaderEpochRequest.Builder.forConsumer(ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion, Map(tp -> new 
OffsetsForLeaderEpochRequest.PartitionData(Optional.of(15), 0)).asJava) case ApiKeys.ADD_PARTITIONS_TO_TXN => diff --git a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala index cb8996c..8a6dcba 100644 --- a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala +++ b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala @@ -279,7 +279,7 @@ class LeaderEpochIntegrationTest extends ZooKeeperTestHarness with Logging { def leaderOffsetsFor(partitions: Map[TopicPartition, Int]): Map[TopicPartition, EpochEndOffset] = { val partitionData = partitions.mapValues( new OffsetsForLeaderEpochRequest.PartitionData(Optional.empty(), _)) - val request = new OffsetsForLeaderEpochRequest.Builder(ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion, + val request = OffsetsForLeaderEpochRequest.Builder.forConsumer(ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion, partitionData.asJava) val response = sender.sendRequest(request) response.responseBody.asInstanceOf[OffsetsForLeaderEpochResponse].responses.asScala
