This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new bacb45e MINOR: Set `replicaId` for OffsetsForLeaderEpoch from followers (#6775)
bacb45e is described below
commit bacb45e044aae61fd373f6535f8073263c972370
Author: David Arthur <[email protected]>
AuthorDate: Tue May 21 18:50:21 2019 -0400
MINOR: Set `replicaId` for OffsetsForLeaderEpoch from followers (#6775)
Reviewers: Jason Gustafson <[email protected]>
---
.../internals/OffsetsForLeaderEpochClient.java | 2 +-
.../org/apache/kafka/common/requests/FetchRequest.java | 2 +-
.../apache/kafka/common/requests/FetchResponse.java | 14 ++++++++------
.../common/requests/OffsetsForLeaderEpochRequest.java | 13 +++++++++++--
.../kafka/clients/consumer/internals/FetcherTest.java | 14 +++++++-------
.../kafka/common/requests/RequestResponseTest.java | 18 +++++++++---------
.../main/scala/kafka/server/ReplicaFetcherThread.scala | 2 +-
.../kafka/api/AuthorizerIntegrationTest.scala | 2 +-
.../server/OffsetsForLeaderEpochRequestTest.scala | 4 ++--
.../scala/unit/kafka/server/RequestQuotaTest.scala | 2 +-
.../server/epoch/LeaderEpochIntegrationTest.scala | 2 +-
11 files changed, 43 insertions(+), 32 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsForLeaderEpochClient.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsForLeaderEpochClient.java
index 9ffedd1..d7b02a7 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsForLeaderEpochClient.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsForLeaderEpochClient.java
@@ -53,7 +53,7 @@ public class OffsetsForLeaderEpochClient extends AsyncClient<
fetchEpoch -> partitionData.put(topicPartition,
new
OffsetsForLeaderEpochRequest.PartitionData(fetchPosition.currentLeader.epoch,
fetchEpoch))));
- return new
OffsetsForLeaderEpochRequest.Builder(ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion(),
partitionData);
+ return
OffsetsForLeaderEpochRequest.Builder.forConsumer(ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion(),
partitionData);
}
@Override
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
index 485b102..da09df3 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
@@ -459,7 +459,7 @@ public class FetchRequest extends AbstractRequest {
for (Map.Entry<TopicPartition, PartitionData> entry :
fetchData.entrySet()) {
FetchResponse.PartitionData<MemoryRecords> partitionResponse = new
FetchResponse.PartitionData<>(error,
FetchResponse.INVALID_HIGHWATERMARK,
FetchResponse.INVALID_LAST_STABLE_OFFSET,
- FetchResponse.INVALID_LOG_START_OFFSET, null, null,
MemoryRecords.EMPTY);
+ FetchResponse.INVALID_LOG_START_OFFSET, Optional.empty(),
null, MemoryRecords.EMPTY);
responseData.put(entry.getKey(), partitionResponse);
}
return new FetchResponse<>(error, responseData, throttleTimeMs,
metadata.sessionId());
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
index e857b5b..942b0d6 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
@@ -40,6 +40,7 @@ import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Queue;
+import java.util.function.Predicate;
import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
import static org.apache.kafka.common.protocol.CommonFields.PARTITION_ID;
@@ -223,7 +224,7 @@ public class FetchResponse<T extends BaseRecords> extends
AbstractResponse {
public static final long INVALID_HIGHWATERMARK = -1L;
public static final long INVALID_LAST_STABLE_OFFSET = -1L;
public static final long INVALID_LOG_START_OFFSET = -1L;
- public static final int UNSPECIFIED_PREFERRED_REPLICA = -1;
+ public static final int INVALID_PREFERRED_REPLICA_ID = -1;
private final int throttleTimeMs;
private final Errors error;
@@ -277,14 +278,14 @@ public class FetchResponse<T extends BaseRecords> extends
AbstractResponse {
long highWatermark,
long lastStableOffset,
long logStartOffset,
- Integer preferredReadReplica,
+ Optional<Integer> preferredReadReplica,
List<AbortedTransaction> abortedTransactions,
T records) {
this.error = error;
this.highWatermark = highWatermark;
this.lastStableOffset = lastStableOffset;
this.logStartOffset = logStartOffset;
- this.preferredReadReplica =
Optional.ofNullable(preferredReadReplica);
+ this.preferredReadReplica = preferredReadReplica;
this.abortedTransactions = abortedTransactions;
this.records = records;
}
@@ -379,7 +380,9 @@ public class FetchResponse<T extends BaseRecords> extends
AbstractResponse {
long highWatermark =
partitionResponseHeader.get(HIGH_WATERMARK);
long lastStableOffset =
partitionResponseHeader.getOrElse(LAST_STABLE_OFFSET,
INVALID_LAST_STABLE_OFFSET);
long logStartOffset =
partitionResponseHeader.getOrElse(LOG_START_OFFSET, INVALID_LOG_START_OFFSET);
- int preferredReadReplica =
partitionResponseHeader.getOrElse(PREFERRED_READ_REPLICA,
UNSPECIFIED_PREFERRED_REPLICA);
+ Optional<Integer> preferredReadReplica = Optional.of(
+ partitionResponseHeader.getOrElse(PREFERRED_READ_REPLICA,
INVALID_PREFERRED_REPLICA_ID)
+
).filter(Predicate.isEqual(INVALID_PREFERRED_REPLICA_ID).negate());
BaseRecords baseRecords =
partitionResponse.getRecords(RECORD_SET_KEY_NAME);
if (!(baseRecords instanceof MemoryRecords))
@@ -401,8 +404,7 @@ public class FetchResponse<T extends BaseRecords> extends
AbstractResponse {
}
PartitionData<MemoryRecords> partitionData = new
PartitionData<>(error, highWatermark, lastStableOffset,
- logStartOffset, preferredReadReplica ==
UNSPECIFIED_PREFERRED_REPLICA ? null : preferredReadReplica,
- abortedTransactions, records);
+ logStartOffset, preferredReadReplica,
abortedTransactions, records);
responseData.put(new TopicPartition(topic, partition),
partitionData);
}
}
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java
b/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java
index 5052b0e..6599a70 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java
@@ -101,10 +101,19 @@ public class OffsetsForLeaderEpochRequest extends
AbstractRequest {
private final Map<TopicPartition, PartitionData> epochsByPartition;
private final int replicaId;
- public Builder(short version, Map<TopicPartition, PartitionData>
epochsByPartition) {
+ Builder(short version, Map<TopicPartition, PartitionData>
epochsByPartition, int replicaId) {
super(ApiKeys.OFFSET_FOR_LEADER_EPOCH, version);
this.epochsByPartition = epochsByPartition;
- this.replicaId = CONSUMER_REPLICA_ID;
+ this.replicaId = replicaId;
+ }
+
+ public static Builder forConsumer(short version, Map<TopicPartition,
PartitionData> epochsByPartition) {
+ return new Builder(version, epochsByPartition,
CONSUMER_REPLICA_ID);
+ }
+
+ public static Builder forFollower(short version, Map<TopicPartition,
PartitionData> epochsByPartition, int replicaId) {
+ return new Builder(version, epochsByPartition, replicaId);
+
}
@Override
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 2f40ffc..0e2662a 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -1173,7 +1173,7 @@ public class FetcherTest {
assertEquals(1, fetcher.sendFetches());
Map<TopicPartition, FetchResponse.PartitionData<MemoryRecords>>
partitions = new HashMap<>();
partitions.put(tp0, new FetchResponse.PartitionData<>(Errors.NONE, 100,
- FetchResponse.INVALID_LAST_STABLE_OFFSET,
FetchResponse.INVALID_LOG_START_OFFSET, null, null, records));
+ FetchResponse.INVALID_LAST_STABLE_OFFSET,
FetchResponse.INVALID_LOG_START_OFFSET, Optional.empty(), null, records));
client.prepareResponse(fullFetchResponse(tp0, this.records,
Errors.NONE, 100L, 0));
consumerClient.poll(time.timer(0));
@@ -1184,7 +1184,7 @@ public class FetcherTest {
assertEquals(1, fetcher.sendFetches());
partitions = new HashMap<>();
partitions.put(tp1, new
FetchResponse.PartitionData<>(Errors.OFFSET_OUT_OF_RANGE, 100,
- FetchResponse.INVALID_LAST_STABLE_OFFSET,
FetchResponse.INVALID_LOG_START_OFFSET, null, null, MemoryRecords.EMPTY));
+ FetchResponse.INVALID_LAST_STABLE_OFFSET,
FetchResponse.INVALID_LOG_START_OFFSET, Optional.empty(), null,
MemoryRecords.EMPTY));
client.prepareResponse(new FetchResponse<>(Errors.NONE, new
LinkedHashMap<>(partitions), 0, INVALID_SESSION_ID));
consumerClient.poll(time.timer(0));
assertEquals(1, fetcher.fetchedRecords().get(tp0).size());
@@ -3341,7 +3341,7 @@ public class FetcherTest {
// Set preferred read replica to node=1
client.prepareResponse(fullFetchResponse(tp0, this.records,
Errors.NONE, 100L,
- FetchResponse.INVALID_LAST_STABLE_OFFSET, 0, 1));
+ FetchResponse.INVALID_LAST_STABLE_OFFSET, 0, Optional.of(1)));
consumerClient.poll(time.timer(0));
assertTrue(fetcher.hasCompletedFetches());
@@ -3358,7 +3358,7 @@ public class FetcherTest {
// Set preferred read replica to node=2, which isn't in our metadata,
should revert to leader
client.prepareResponse(fullFetchResponse(tp0, this.records,
Errors.NONE, 100L,
- FetchResponse.INVALID_LAST_STABLE_OFFSET, 0, 2));
+ FetchResponse.INVALID_LAST_STABLE_OFFSET, 0, Optional.of(2)));
consumerClient.poll(time.timer(0));
assertTrue(fetcher.hasCompletedFetches());
fetchedRecords();
@@ -3379,7 +3379,7 @@ public class FetcherTest {
assertFalse(fetcher.hasCompletedFetches());
client.prepareResponse(fullFetchResponse(tp0, this.records,
Errors.NONE, 100L,
- FetchResponse.INVALID_LAST_STABLE_OFFSET, 0, 1));
+ FetchResponse.INVALID_LAST_STABLE_OFFSET, 0, Optional.of(1)));
consumerClient.poll(time.timer(0));
assertTrue(fetcher.hasCompletedFetches());
@@ -3393,7 +3393,7 @@ public class FetcherTest {
assertFalse(fetcher.hasCompletedFetches());
client.prepareResponse(fullFetchResponse(tp0, this.records,
Errors.OFFSET_OUT_OF_RANGE, 100L,
- FetchResponse.INVALID_LAST_STABLE_OFFSET, 0, null));
+ FetchResponse.INVALID_LAST_STABLE_OFFSET, 0,
Optional.empty()));
consumerClient.poll(time.timer(0));
assertTrue(fetcher.hasCompletedFetches());
@@ -3449,7 +3449,7 @@ public class FetcherTest {
}
private FetchResponse<MemoryRecords> fullFetchResponse(TopicPartition tp,
MemoryRecords records, Errors error, long hw,
- long
lastStableOffset, int throttleTime, Integer preferredReplicaId) {
+ long
lastStableOffset, int throttleTime, Optional<Integer> preferredReplicaId) {
Map<TopicPartition, FetchResponse.PartitionData<MemoryRecords>>
partitions = Collections.singletonMap(tp,
new FetchResponse.PartitionData<>(error, hw, lastStableOffset,
0L,
preferredReplicaId, null, records));
diff --git
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 1cef864..d7e9223 100644
---
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -559,7 +559,7 @@ public class RequestResponseTest {
MemoryRecords records =
MemoryRecords.readableRecords(ByteBuffer.allocate(10));
responseData.put(new TopicPartition("test", 0), new
FetchResponse.PartitionData<>(
Errors.NONE, 1000000, FetchResponse.INVALID_LAST_STABLE_OFFSET,
- 0L, null, null, records));
+ 0L, Optional.empty(), null, records));
FetchResponse<MemoryRecords> v0Response = new
FetchResponse<>(Errors.NONE, responseData, 0, INVALID_SESSION_ID);
FetchResponse<MemoryRecords> v1Response = new
FetchResponse<>(Errors.NONE, responseData, 10, INVALID_SESSION_ID);
@@ -583,11 +583,11 @@ public class RequestResponseTest {
new FetchResponse.AbortedTransaction(15, 50)
);
responseData.put(new TopicPartition("bar", 0), new
FetchResponse.PartitionData<>(Errors.NONE, 100000,
- FetchResponse.INVALID_LAST_STABLE_OFFSET,
FetchResponse.INVALID_LOG_START_OFFSET, null, abortedTransactions, records));
+ FetchResponse.INVALID_LAST_STABLE_OFFSET,
FetchResponse.INVALID_LOG_START_OFFSET, Optional.empty(), abortedTransactions,
records));
responseData.put(new TopicPartition("bar", 1), new
FetchResponse.PartitionData<>(Errors.NONE, 900000,
- 5, FetchResponse.INVALID_LOG_START_OFFSET, null, null,
records));
+ 5, FetchResponse.INVALID_LOG_START_OFFSET, Optional.empty(),
null, records));
responseData.put(new TopicPartition("foo", 0), new
FetchResponse.PartitionData<>(Errors.NONE, 70000,
- 6, FetchResponse.INVALID_LOG_START_OFFSET, null,
Collections.emptyList(), records));
+ 6, FetchResponse.INVALID_LOG_START_OFFSET, Optional.empty(),
Collections.emptyList(), records));
FetchResponse<MemoryRecords> response = new
FetchResponse<>(Errors.NONE, responseData, 10, INVALID_SESSION_ID);
FetchResponse deserialized =
FetchResponse.parse(toBuffer(response.toStruct((short) 4)), (short) 4);
@@ -751,11 +751,11 @@ public class RequestResponseTest {
LinkedHashMap<TopicPartition,
FetchResponse.PartitionData<MemoryRecords>> responseData = new
LinkedHashMap<>();
MemoryRecords records =
MemoryRecords.withRecords(CompressionType.NONE, new
SimpleRecord("blah".getBytes()));
responseData.put(new TopicPartition("test", 0), new
FetchResponse.PartitionData<>(Errors.NONE,
- 1000000, FetchResponse.INVALID_LAST_STABLE_OFFSET, 0L, null, null,
records));
+ 1000000, FetchResponse.INVALID_LAST_STABLE_OFFSET, 0L,
Optional.empty(), null, records));
List<FetchResponse.AbortedTransaction> abortedTransactions =
Collections.singletonList(
new FetchResponse.AbortedTransaction(234L, 999L));
responseData.put(new TopicPartition("test", 1), new
FetchResponse.PartitionData<>(Errors.NONE,
- 1000000, FetchResponse.INVALID_LAST_STABLE_OFFSET, 0L, null,
abortedTransactions, MemoryRecords.EMPTY));
+ 1000000, FetchResponse.INVALID_LAST_STABLE_OFFSET, 0L,
Optional.empty(), abortedTransactions, MemoryRecords.EMPTY));
return new FetchResponse<>(Errors.NONE, responseData, 25, sessionId);
}
@@ -763,12 +763,12 @@ public class RequestResponseTest {
LinkedHashMap<TopicPartition,
FetchResponse.PartitionData<MemoryRecords>> responseData = new
LinkedHashMap<>();
MemoryRecords records =
MemoryRecords.withRecords(CompressionType.NONE, new
SimpleRecord("blah".getBytes()));
responseData.put(new TopicPartition("test", 0), new
FetchResponse.PartitionData<>(Errors.NONE,
- 1000000, FetchResponse.INVALID_LAST_STABLE_OFFSET, 0L, null,
null, records));
+ 1000000, FetchResponse.INVALID_LAST_STABLE_OFFSET, 0L,
Optional.empty(), null, records));
List<FetchResponse.AbortedTransaction> abortedTransactions =
Collections.singletonList(
new FetchResponse.AbortedTransaction(234L, 999L));
responseData.put(new TopicPartition("test", 1), new
FetchResponse.PartitionData<>(Errors.NONE,
- 1000000, FetchResponse.INVALID_LAST_STABLE_OFFSET, 0L, null,
abortedTransactions, MemoryRecords.EMPTY));
+ 1000000, FetchResponse.INVALID_LAST_STABLE_OFFSET, 0L,
Optional.empty(), abortedTransactions, MemoryRecords.EMPTY));
return new FetchResponse<>(Errors.NONE, responseData, 25,
INVALID_SESSION_ID);
}
@@ -1260,7 +1260,7 @@ public class RequestResponseTest {
epochs.put(new TopicPartition("topic2", 2),
new
OffsetsForLeaderEpochRequest.PartitionData(Optional.empty(), 3));
- return new
OffsetsForLeaderEpochRequest.Builder(ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion(),
epochs).build();
+ return
OffsetsForLeaderEpochRequest.Builder.forConsumer(ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion(),
epochs).build();
}
private OffsetsForLeaderEpochResponse createLeaderEpochResponse() {
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index 8e92c2b..ab5be6e 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -303,7 +303,7 @@ class ReplicaFetcherThread(name: String,
return Map.empty
}
- val epochRequest = new
OffsetsForLeaderEpochRequest.Builder(offsetForLeaderEpochRequestVersion,
partitions.asJava)
+ val epochRequest =
OffsetsForLeaderEpochRequest.Builder.forFollower(offsetForLeaderEpochRequestVersion,
partitions.asJava, brokerConfig.brokerId)
debug(s"Sending offset for leader epoch request $epochRequest")
try {
diff --git
a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index b465773..a577b2e 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -295,7 +295,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
private def offsetsForLeaderEpochRequest: OffsetsForLeaderEpochRequest = {
val epochs = Map(tp -> new
OffsetsForLeaderEpochRequest.PartitionData(Optional.of(27), 7))
- new
OffsetsForLeaderEpochRequest.Builder(ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion,
epochs.asJava).build()
+
OffsetsForLeaderEpochRequest.Builder.forConsumer(ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion,
epochs.asJava).build()
}
private def createOffsetFetchRequest = {
diff --git
a/core/src/test/scala/unit/kafka/server/OffsetsForLeaderEpochRequestTest.scala
b/core/src/test/scala/unit/kafka/server/OffsetsForLeaderEpochRequestTest.scala
index 2cdd2e8..4d1416c 100644
---
a/core/src/test/scala/unit/kafka/server/OffsetsForLeaderEpochRequestTest.scala
+++
b/core/src/test/scala/unit/kafka/server/OffsetsForLeaderEpochRequestTest.scala
@@ -35,7 +35,7 @@ class OffsetsForLeaderEpochRequestTest extends
BaseRequestTest {
val partition = new TopicPartition(topic, 0)
val epochs = Map(partition -> new
OffsetsForLeaderEpochRequest.PartitionData(Optional.empty[Integer], 0)).asJava
- val request = new
OffsetsForLeaderEpochRequest.Builder(ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion,
epochs).build()
+ val request =
OffsetsForLeaderEpochRequest.Builder.forConsumer(ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion,
epochs).build()
// Unknown topic
val randomBrokerId = servers.head.config.brokerId
@@ -61,7 +61,7 @@ class OffsetsForLeaderEpochRequestTest extends
BaseRequestTest {
def assertResponseErrorForEpoch(error: Errors, brokerId: Int,
currentLeaderEpoch: Optional[Integer]): Unit = {
val epochs = Map(topicPartition -> new
OffsetsForLeaderEpochRequest.PartitionData(
currentLeaderEpoch, 0)).asJava
- val request = new
OffsetsForLeaderEpochRequest.Builder(ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion,
epochs)
+ val request =
OffsetsForLeaderEpochRequest.Builder.forConsumer(ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion,
epochs)
.build()
assertResponseError(error, brokerId, request)
}
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index ddcee12..047188f 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -362,7 +362,7 @@ class RequestQuotaTest extends BaseRequestTest {
new InitProducerIdRequest.Builder(requestData)
case ApiKeys.OFFSET_FOR_LEADER_EPOCH =>
- new
OffsetsForLeaderEpochRequest.Builder(ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion,
+
OffsetsForLeaderEpochRequest.Builder.forConsumer(ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion,
Map(tp -> new
OffsetsForLeaderEpochRequest.PartitionData(Optional.of(15), 0)).asJava)
case ApiKeys.ADD_PARTITIONS_TO_TXN =>
diff --git
a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
index cb8996c..8a6dcba 100644
---
a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
+++
b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
@@ -279,7 +279,7 @@ class LeaderEpochIntegrationTest extends
ZooKeeperTestHarness with Logging {
def leaderOffsetsFor(partitions: Map[TopicPartition, Int]):
Map[TopicPartition, EpochEndOffset] = {
val partitionData = partitions.mapValues(
new OffsetsForLeaderEpochRequest.PartitionData(Optional.empty(), _))
- val request = new
OffsetsForLeaderEpochRequest.Builder(ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion,
+ val request =
OffsetsForLeaderEpochRequest.Builder.forConsumer(ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion,
partitionData.asJava)
val response = sender.sendRequest(request)
response.responseBody.asInstanceOf[OffsetsForLeaderEpochResponse].responses.asScala