This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 79b5f7f1ce2 KAFKA-14617: Add ReplicaState to FetchRequest (KIP-903) (#13323)
79b5f7f1ce2 is described below
commit 79b5f7f1ce2791fcebf70a447e5530c60b768702
Author: Calvin Liu <[email protected]>
AuthorDate: Thu Mar 16 06:04:34 2023 -0700
KAFKA-14617: Add ReplicaState to FetchRequest (KIP-903) (#13323)
This patch is the first part of KIP-903. It updates the FetchRequest to
include the new tagged ReplicaState field which replaces the now deprecated
ReplicaId field. The FetchRequest version is bumped to version 15 and the
MetadataVersion to 3.5-IV1.
Reviewers: David Jacot <[email protected]>
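For illustration only (not part of the commit): a minimal sketch of the new builder surface introduced below, assuming fetchData is an already-populated Map<TopicPartition, FetchRequest.PartitionData>.

    // Sketch: v15+ carries the id/epoch pair in the tagged ReplicaState field;
    // older versions fall back to the deprecated top-level ReplicaId field, so
    // the epoch is not transmitted and reads back as the default -1.
    FetchRequest v15 = FetchRequest.Builder
        .forReplica((short) 15, /* replicaId */ 1, /* replicaEpoch */ 5L, 500, 1, fetchData)
        .build((short) 15);
    // v15.replicaId() == 1 and v15.replicaEpoch() == 5L

    FetchRequest v14 = FetchRequest.Builder
        .forReplica((short) 14, 1, 5L, 500, 1, fetchData)
        .build((short) 14);
    // v14.replicaId() == 1 but v14.replicaEpoch() == -1L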
---
.../apache/kafka/common/requests/FetchRequest.java | 55 +++++++++++++++++++---
.../resources/common/message/FetchRequest.json | 13 ++++-
.../resources/common/message/FetchResponse.json | 4 +-
.../kafka/common/requests/FetchRequestTest.java | 41 ++++++++++++++--
.../server/builders/ReplicaManagerBuilder.java | 9 +++-
.../scala/kafka/raft/KafkaNetworkChannel.scala | 6 +--
.../src/main/scala/kafka/server/BrokerServer.scala | 3 +-
core/src/main/scala/kafka/server/KafkaApis.scala | 1 +
core/src/main/scala/kafka/server/KafkaServer.scala | 3 +-
.../scala/kafka/server/LocalLeaderEndPoint.scala | 3 +-
.../scala/kafka/server/RemoteLeaderEndPoint.scala | 5 +-
.../scala/kafka/server/ReplicaFetcherManager.scala | 5 +-
.../main/scala/kafka/server/ReplicaManager.scala | 3 +-
.../kafka/tools/ReplicaVerificationTool.scala | 2 +-
.../java/kafka/test/ClusterTestExtensionsTest.java | 2 +-
.../java/kafka/test/annotation/ClusterTest.java | 2 +-
.../kafka/api/AuthorizerIntegrationTest.scala | 2 +-
.../kafka/server/DelayedFetchTest.scala | 1 +
.../server/MetadataVersionIntegrationTest.scala | 2 +-
.../kafka/server/RemoteLeaderEndPointTest.scala | 39 ++++++++++++---
.../unit/kafka/admin/FeatureCommandTest.scala | 10 ++--
.../unit/kafka/cluster/PartitionLockTest.scala | 1 +
.../scala/unit/kafka/cluster/PartitionTest.scala | 3 ++
.../unit/kafka/raft/KafkaNetworkChannelTest.scala | 29 +++++++++++-
.../kafka/server/AbstractFetcherThreadTest.scala | 2 +-
.../FetchRequestDownConversionConfigTest.scala | 2 +-
.../scala/unit/kafka/server/FetchRequestTest.scala | 18 +++----
.../scala/unit/kafka/server/FetchSessionTest.scala | 4 +-
.../scala/unit/kafka/server/KafkaApisTest.scala | 7 +--
.../server/ReplicaAlterLogDirsThreadTest.scala | 1 +
.../kafka/server/ReplicaFetcherThreadTest.scala | 9 ++--
.../server/ReplicaManagerConcurrencyTest.scala | 1 +
.../kafka/server/ReplicaManagerQuotasTest.scala | 2 +
.../unit/kafka/server/ReplicaManagerTest.scala | 5 +-
.../kafka/jmh/common/FetchRequestBenchmark.java | 4 +-
.../jmh/fetcher/ReplicaFetcherThreadBenchmark.java | 3 +-
.../kafka/controller/QuorumControllerTest.java | 8 ++--
.../org/apache/kafka/raft/KafkaRaftClient.java | 12 +++--
.../org/apache/kafka/raft/KafkaRaftClientTest.java | 32 +++++++++++++
.../apache/kafka/raft/RaftClientTestContext.java | 4 +-
.../kafka/server/common/MetadataVersion.java | 9 +++-
.../kafka/server/common/MetadataVersionTest.java | 3 ++
.../kafka/storage/internals/log/FetchParams.java | 6 +++
43 files changed, 294 insertions(+), 82 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
index aaad3b89104..3a286225a65 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.FetchRequestData;
import org.apache.kafka.common.message.FetchRequestData.ForgottenTopic;
+import org.apache.kafka.common.message.FetchRequestData.ReplicaState;
import org.apache.kafka.common.message.FetchResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
@@ -130,10 +131,34 @@ public class FetchRequest extends AbstractRequest {
}
}
+ // It is only used by KafkaRaftClient for downgrading the FetchRequest.
+ public static class SimpleBuilder extends AbstractRequest.Builder<FetchRequest> {
+ private final FetchRequestData fetchRequestData;
+
+ public SimpleBuilder(FetchRequestData fetchRequestData) {
+ super(ApiKeys.FETCH);
+ this.fetchRequestData = fetchRequestData;
+ }
+
+ @Override
+ public FetchRequest build(short version) {
+ if (fetchRequestData.replicaId() >= 0) {
+ throw new IllegalStateException("The replica id should be placed in the replicaState of a fetchRequestData");
+ }
+
+ if (version < 15) {
+ fetchRequestData.setReplicaId(fetchRequestData.replicaState().replicaId());
+ fetchRequestData.setReplicaState(new ReplicaState());
+ }
+ return new FetchRequest(fetchRequestData, version);
+ }
+ }
+
public static class Builder extends AbstractRequest.Builder<FetchRequest> {
private final int maxWait;
private final int minBytes;
private final int replicaId;
+ private final long replicaEpoch;
private final Map<TopicPartition, PartitionData> toFetch;
private IsolationLevel isolationLevel = IsolationLevel.READ_UNCOMMITTED;
private int maxBytes = DEFAULT_RESPONSE_MAX_BYTES;
@@ -144,18 +169,19 @@ public class FetchRequest extends AbstractRequest {
public static Builder forConsumer(short maxVersion, int maxWait, int minBytes, Map<TopicPartition, PartitionData> fetchData) {
return new Builder(ApiKeys.FETCH.oldestVersion(), maxVersion,
- CONSUMER_REPLICA_ID, maxWait, minBytes, fetchData);
+ CONSUMER_REPLICA_ID, -1, maxWait, minBytes, fetchData);
}
- public static Builder forReplica(short allowedVersion, int replicaId, int maxWait, int minBytes,
+ public static Builder forReplica(short allowedVersion, int replicaId, long replicaEpoch, int maxWait, int minBytes,
Map<TopicPartition, PartitionData> fetchData) {
- return new Builder(allowedVersion, allowedVersion, replicaId, maxWait, minBytes, fetchData);
+ return new Builder(allowedVersion, allowedVersion, replicaId, replicaEpoch, maxWait, minBytes, fetchData);
}
- public Builder(short minVersion, short maxVersion, int replicaId, int maxWait, int minBytes,
+ public Builder(short minVersion, short maxVersion, int replicaId, long replicaEpoch, int maxWait, int minBytes,
Map<TopicPartition, PartitionData> fetchData) {
super(ApiKeys.FETCH, minVersion, maxVersion);
this.replicaId = replicaId;
+ this.replicaEpoch = replicaEpoch;
this.maxWait = maxWait;
this.minBytes = minBytes;
this.toFetch = fetchData;
@@ -228,12 +254,18 @@ public class FetchRequest extends AbstractRequest {
}
FetchRequestData fetchRequestData = new FetchRequestData();
- fetchRequestData.setReplicaId(replicaId);
fetchRequestData.setMaxWaitMs(maxWait);
fetchRequestData.setMinBytes(minBytes);
fetchRequestData.setMaxBytes(maxBytes);
fetchRequestData.setIsolationLevel(isolationLevel.id());
fetchRequestData.setForgottenTopicsData(new ArrayList<>());
+ if (version < 15) {
+ fetchRequestData.setReplicaId(replicaId);
+ } else {
+ fetchRequestData.setReplicaState(new ReplicaState()
+ .setReplicaId(replicaId)
+ .setReplicaEpoch(replicaEpoch));
+ }
Map<String, FetchRequestData.ForgottenTopic> forgottenTopicMap = new LinkedHashMap<>();
addToForgottenTopicMap(removed, forgottenTopicMap);
@@ -302,6 +334,10 @@ public class FetchRequest extends AbstractRequest {
}
}
+ public static int replicaId(FetchRequestData fetchRequestData) {
+ return fetchRequestData.replicaId() != -1 ? fetchRequestData.replicaId() : fetchRequestData.replicaState().replicaId();
+ }
+
public FetchRequest(FetchRequestData fetchRequestData, short version) {
super(ApiKeys.FETCH, version);
this.data = fetchRequestData;
@@ -338,7 +374,14 @@ public class FetchRequest extends AbstractRequest {
}
public int replicaId() {
- return data.replicaId();
+ if (version() < 15) {
+ return data.replicaId();
+ }
+ return data.replicaState().replicaId();
+ }
+
+ public long replicaEpoch() {
+ return data.replicaState().replicaEpoch();
}
public int maxWait() {
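For illustration only (not part of the commit): the SimpleBuilder above is the downgrade path used by the raft layer, and the new unit test further down in this diff exercises it. A hedged sketch of its behavior:

    // The raft client always populates ReplicaState; when an old version is
    // negotiated, SimpleBuilder moves the id back into the deprecated field
    // and clears ReplicaState. For version >= 15 the data passes through.
    FetchRequestData data = new FetchRequestData()
        .setReplicaState(new FetchRequestData.ReplicaState().setReplicaId(1));
    FetchRequestData downgraded = new FetchRequest.SimpleBuilder(data).build((short) 14).data();
    // downgraded.replicaId() == 1, downgraded.replicaState().replicaId() == -1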
diff --git a/clients/src/main/resources/common/message/FetchRequest.json b/clients/src/main/resources/common/message/FetchRequest.json
index 903ed48e185..3ac9dccd0ab 100644
--- a/clients/src/main/resources/common/message/FetchRequest.json
+++ b/clients/src/main/resources/common/message/FetchRequest.json
@@ -50,14 +50,23 @@
// Version 13 replaces topic names with topic IDs (KIP-516). May return UNKNOWN_TOPIC_ID error code.
//
// Version 14 is the same as version 13 but it also receives a new error called OffsetMovedToTieredStorageException(KIP-405)
- "validVersions": "0-14",
+ //
+ // Version 15 adds the ReplicaState, which includes the new ReplicaEpoch field and the ReplicaId. It also
+ // deprecates the old ReplicaId field and sets its default value to -1. (KIP-903)
+ "validVersions": "0-15",
"flexibleVersions": "12+",
"fields": [
{ "name": "ClusterId", "type": "string", "versions": "12+",
"nullableVersions": "12+", "default": "null",
"taggedVersions": "12+", "tag": 0, "ignorable": true,
"about": "The clusterId if known. This is used to validate metadata
fetches prior to broker registration." },
- { "name": "ReplicaId", "type": "int32", "versions": "0+", "entityType":
"brokerId",
+ { "name": "ReplicaId", "type": "int32", "versions": "0-14", "default":
"-1", "entityType": "brokerId",
"about": "The broker ID of the follower, of -1 if this request is from a
consumer." },
+ { "name": "ReplicaState", "type": "ReplicaState", "taggedVersions":"15+",
"tag": 1, "fields": [
+ { "name": "ReplicaId", "type": "int32", "versions": "15+", "default":
"-1", "entityType": "brokerId",
+ "about": "The replica ID of the follower, or -1 if this request is
from a consumer." },
+ { "name": "ReplicaEpoch", "type": "int64", "versions": "15+", "default":
"-1",
+ "about": "The epoch of this follower, or -1 if not available." }
+ ]},
{ "name": "MaxWaitMs", "type": "int32", "versions": "0+",
"about": "The maximum time in milliseconds to wait for the response." },
{ "name": "MinBytes", "type": "int32", "versions": "0+",
diff --git a/clients/src/main/resources/common/message/FetchResponse.json b/clients/src/main/resources/common/message/FetchResponse.json
index 7d144f01831..366e702cfbd 100644
--- a/clients/src/main/resources/common/message/FetchResponse.json
+++ b/clients/src/main/resources/common/message/FetchResponse.json
@@ -43,7 +43,9 @@
// Version 13 replaces the topic name field with topic ID (KIP-516).
//
// Version 14 is the same as version 13 but it also receives a new error called OffsetMovedToTieredStorageException (KIP-405)
- "validVersions": "0-14",
+ //
+ // Version 15 is the same as version 14 (KIP-903).
+ "validVersions": "0-15",
"flexibleVersions": "12+",
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "1+",
"ignorable": true,
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/FetchRequestTest.java b/clients/src/test/java/org/apache/kafka/common/requests/FetchRequestTest.java
index a567d43dc63..702341fee83 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/FetchRequestTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/FetchRequestTest.java
@@ -19,7 +19,9 @@ package org.apache.kafka.common.requests;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.FetchRequestData;
import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
@@ -35,6 +37,7 @@ import java.util.stream.Stream;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
public class FetchRequestTest {
@@ -54,7 +57,7 @@ public class FetchRequestTest {
List<TopicIdPartition> toReplace = Collections.singletonList(tp);
FetchRequest fetchRequest = FetchRequest.Builder
- .forReplica(version, 0, 1, 1, partitionData)
+ .forReplica(version, 0, 1, 1, 1, partitionData)
.removed(Collections.emptyList())
.replaced(toReplace)
.metadata(FetchMetadata.newIncremental(123)).build(version);
@@ -97,11 +100,14 @@ public class FetchRequestTest {
boolean fetchRequestUsesTopicIds = version >= 13;
FetchRequest fetchRequest = FetchRequest.parse(FetchRequest.Builder
- .forReplica(version, 0, 1, 1, partitionData)
+ .forReplica(version, 0, 1, 1, 1, partitionData)
.removed(Collections.emptyList())
.replaced(Collections.emptyList())
.metadata(FetchMetadata.newIncremental(123)).build(version).serialize(), version);
+ if (version >= 15) {
+ assertEquals(1, fetchRequest.data().replicaState().replicaEpoch());
+ }
// For versions < 13, we will be provided a topic name and a zero UUID in FetchRequestData.
// Versions 13+ will contain a valid topic ID but an empty topic name.
List<TopicIdPartition> expectedData = new LinkedList<>();
@@ -156,7 +162,7 @@ public class FetchRequestTest {
boolean fetchRequestUsesTopicIds = version >= 13;
FetchRequest fetchRequest = FetchRequest.parse(FetchRequest.Builder
- .forReplica(version, 0, 1, 1, Collections.emptyMap())
+ .forReplica(version, 0, 1, 1, 1, Collections.emptyMap())
.removed(toForgetTopics)
.replaced(Collections.emptyList())
.metadata(FetchMetadata.newIncremental(123)).build(version).serialize(), version);
@@ -198,6 +204,35 @@ public class FetchRequestTest {
}
}
+ @ParameterizedTest
+ @ApiKeyVersionsSource(apiKey = ApiKeys.FETCH)
+ public void testFetchRequestSimpleBuilderReplicaStateDowngrade(short version) {
+ FetchRequestData fetchRequestData = new FetchRequestData();
+ fetchRequestData.setReplicaState(new FetchRequestData.ReplicaState().setReplicaId(1));
+ FetchRequest.SimpleBuilder builder = new FetchRequest.SimpleBuilder(fetchRequestData);
+ fetchRequestData = builder.build(version).data();
+
+ assertEquals(1, FetchRequest.replicaId(fetchRequestData));
+
+ if (version < 15) {
+ assertEquals(1, fetchRequestData.replicaId());
+ assertEquals(-1, fetchRequestData.replicaState().replicaId());
+ } else {
+ assertEquals(-1, fetchRequestData.replicaId());
+ assertEquals(1, fetchRequestData.replicaState().replicaId());
+ }
+ }
+
+ @ParameterizedTest
+ @ApiKeyVersionsSource(apiKey = ApiKeys.FETCH)
+ public void testFetchRequestSimpleBuilderReplicaIdNotSupported(short version) {
+ FetchRequestData fetchRequestData = new FetchRequestData().setReplicaId(1);
+ FetchRequest.SimpleBuilder builder = new FetchRequest.SimpleBuilder(fetchRequestData);
+ assertThrows(IllegalStateException.class, () -> {
+ builder.build(version);
+ });
+ }
+
@Test
public void testPartitionDataEquals() {
assertEquals(new FetchRequest.PartitionData(Uuid.ZERO_UUID, 300, 0L, 300, Optional.of(300)),
diff --git a/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java b/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java
index 2921818e59d..d9d7c1d82c4 100644
--- a/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java
+++ b/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java
@@ -61,6 +61,7 @@ public class ReplicaManagerBuilder {
private Optional<DelayedOperationPurgatory<DelayedDeleteRecords>> delayedDeleteRecordsPurgatory = Optional.empty();
private Optional<DelayedOperationPurgatory<DelayedElectLeader>> delayedElectLeaderPurgatory = Optional.empty();
private Optional<String> threadNamePrefix = Optional.empty();
+ private Long brokerEpoch = -1L;
public ReplicaManagerBuilder setConfig(KafkaConfig config) {
this.config = config;
@@ -152,6 +153,11 @@ public class ReplicaManagerBuilder {
return this;
}
+ public ReplicaManagerBuilder setBrokerEpoch(long brokerEpoch) {
+ this.brokerEpoch = brokerEpoch;
+ return this;
+ }
+
public ReplicaManager build() {
if (config == null) config = new KafkaConfig(Collections.emptyMap());
if (metrics == null) metrics = new Metrics();
@@ -176,6 +182,7 @@ public class ReplicaManagerBuilder {
OptionConverters.toScala(delayedFetchPurgatory),
OptionConverters.toScala(delayedDeleteRecordsPurgatory),
OptionConverters.toScala(delayedElectLeaderPurgatory),
- OptionConverters.toScala(threadNamePrefix));
+ OptionConverters.toScala(threadNamePrefix),
+ () -> brokerEpoch);
}
}
diff --git a/core/src/main/scala/kafka/raft/KafkaNetworkChannel.scala b/core/src/main/scala/kafka/raft/KafkaNetworkChannel.scala
index c44d57102c5..0d86e257932 100644
--- a/core/src/main/scala/kafka/raft/KafkaNetworkChannel.scala
+++ b/core/src/main/scala/kafka/raft/KafkaNetworkChannel.scala
@@ -42,11 +42,7 @@ object KafkaNetworkChannel {
case endEpochRequest: EndQuorumEpochRequestData =>
new EndQuorumEpochRequest.Builder(endEpochRequest)
case fetchRequest: FetchRequestData =>
- // Since we already have the request, we go through a simplified builder
- new AbstractRequest.Builder[FetchRequest](ApiKeys.FETCH) {
- override def build(version: Short): FetchRequest = new FetchRequest(fetchRequest, version)
- override def toString: String = fetchRequest.toString
- }
+ new FetchRequest.SimpleBuilder(fetchRequest)
case fetchSnapshotRequest: FetchSnapshotRequestData =>
new FetchSnapshotRequest.Builder(fetchSnapshotRequest)
case _ =>
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala
index b73eaa12fee..02ff680614c 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -261,7 +261,8 @@ class BrokerServer(
brokerTopicStats = brokerTopicStats,
isShuttingDown = isShuttingDown,
zkClient = None,
- threadNamePrefix = threadNamePrefix)
+ threadNamePrefix = threadNamePrefix,
+ brokerEpochSupplier = () => lifecycleManager.brokerEpoch)
/* start token manager */
if (config.tokenAuthEnabled) {
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index bde98b484f0..bc55a7f9ccf 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -978,6 +978,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val params = new FetchParams(
versionId,
fetchRequest.replicaId,
+ fetchRequest.replicaEpoch,
fetchRequest.maxWait,
fetchMinBytes,
fetchMaxBytes,
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index fe560b9910f..9fed5955a7d 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -623,7 +623,8 @@ class KafkaServer(
brokerTopicStats = brokerTopicStats,
isShuttingDown = isShuttingDown,
zkClient = Some(zkClient),
- threadNamePrefix = threadNamePrefix)
+ threadNamePrefix = threadNamePrefix,
+ brokerEpochSupplier = brokerEpochSupplier)
}
private def initZkClient(time: Time): Unit = {
diff --git a/core/src/main/scala/kafka/server/LocalLeaderEndPoint.scala b/core/src/main/scala/kafka/server/LocalLeaderEndPoint.scala
index 587b6a449ed..cb42a612d19 100644
--- a/core/src/main/scala/kafka/server/LocalLeaderEndPoint.scala
+++ b/core/src/main/scala/kafka/server/LocalLeaderEndPoint.scala
@@ -94,6 +94,7 @@ class LocalLeaderEndPoint(sourceBroker: BrokerEndPoint,
val fetchParams = new FetchParams(
request.version,
FetchRequest.FUTURE_LOCAL_REPLICA_ID,
+ -1,
0L, // timeout is 0 so that the callback will be executed immediately
request.minBytes,
request.maxBytes,
@@ -226,7 +227,7 @@ class LocalLeaderEndPoint(sourceBroker: BrokerEndPoint,
ApiKeys.FETCH.latestVersion
// Set maxWait and minBytes to 0 because the response should return immediately if
// the future log has caught up with the current log of the partition
- val requestBuilder = FetchRequest.Builder.forReplica(version, replicaId, 0, 0, requestMap).setMaxBytes(maxBytes)
+ val requestBuilder = FetchRequest.Builder.forReplica(version, replicaId, -1, 0, 0, requestMap).setMaxBytes(maxBytes)
Some(ReplicaFetch(requestMap, requestBuilder))
}
diff --git a/core/src/main/scala/kafka/server/RemoteLeaderEndPoint.scala b/core/src/main/scala/kafka/server/RemoteLeaderEndPoint.scala
index ed8e775f330..0f363fba8fa 100644
--- a/core/src/main/scala/kafka/server/RemoteLeaderEndPoint.scala
+++ b/core/src/main/scala/kafka/server/RemoteLeaderEndPoint.scala
@@ -56,7 +56,8 @@ class RemoteLeaderEndPoint(logPrefix: String,
brokerConfig: KafkaConfig,
replicaManager: ReplicaManager,
quota: ReplicaQuota,
- metadataVersionSupplier: () => MetadataVersion) extends LeaderEndPoint with Logging {
+ metadataVersionSupplier: () => MetadataVersion,
+ brokerEpochSupplier: () => Long) extends LeaderEndPoint with Logging {
this.logIdent = logPrefix
@@ -217,7 +218,7 @@ class RemoteLeaderEndPoint(logPrefix: String,
metadataVersion.fetchRequestVersion
}
val requestBuilder = FetchRequest.Builder
- .forReplica(version, brokerConfig.brokerId, maxWait, minBytes, fetchData.toSend)
+ .forReplica(version, brokerConfig.brokerId, brokerEpochSupplier(), maxWait, minBytes, fetchData.toSend)
.setMaxBytes(maxBytes)
.removed(fetchData.toForget)
.replaced(fetchData.toReplace)
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala b/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala
index 33af5836cd1..9abd2895c4e 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala
@@ -29,7 +29,8 @@ class ReplicaFetcherManager(brokerConfig: KafkaConfig,
time: Time,
threadNamePrefix: Option[String] = None,
quotaManager: ReplicationQuotaManager,
- metadataVersionSupplier: () => MetadataVersion)
+ metadataVersionSupplier: () => MetadataVersion,
+ brokerEpochSupplier: () => Long)
extends AbstractFetcherManager[ReplicaFetcherThread](
name = "ReplicaFetcherManager on broker " + brokerConfig.brokerId,
clientId = "Replica",
@@ -44,7 +45,7 @@ class ReplicaFetcherManager(brokerConfig: KafkaConfig,
s"broker-${brokerConfig.brokerId}-fetcher-$fetcherId", logContext)
val fetchSessionHandler = new FetchSessionHandler(logContext, sourceBroker.id)
val leader = new RemoteLeaderEndPoint(logContext.logPrefix, endpoint, fetchSessionHandler, brokerConfig,
- replicaManager, quotaManager, metadataVersionSupplier)
+ replicaManager, quotaManager, metadataVersionSupplier, brokerEpochSupplier)
new ReplicaFetcherThread(threadName, leader, brokerConfig, failedPartitions, replicaManager,
quotaManager, logContext.logPrefix, metadataVersionSupplier)
}
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 1fd21104208..c3530fd9a55 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -194,6 +194,7 @@ class ReplicaManager(val config: KafkaConfig,
delayedDeleteRecordsPurgatoryParam: Option[DelayedOperationPurgatory[DelayedDeleteRecords]] = None,
delayedElectLeaderPurgatoryParam: Option[DelayedOperationPurgatory[DelayedElectLeader]] = None,
threadNamePrefix: Option[String] = None,
+ brokerEpochSupplier: () => Long = () => -1
) extends Logging {
private val metricsGroup = new KafkaMetricsGroup(this.getClass)
@@ -1965,7 +1966,7 @@ class ReplicaManager(val config: KafkaConfig,
}
protected def createReplicaFetcherManager(metrics: Metrics, time: Time, threadNamePrefix: Option[String], quotaManager: ReplicationQuotaManager) = {
- new ReplicaFetcherManager(config, this, metrics, time, threadNamePrefix, quotaManager, () => metadataCache.metadataVersion())
+ new ReplicaFetcherManager(config, this, metrics, time, threadNamePrefix, quotaManager, () => metadataCache.metadataVersion(), brokerEpochSupplier)
}
protected def createReplicaAlterLogDirsManager(quotaManager: ReplicationQuotaManager, brokerTopicStats: BrokerTopicStats) = {
diff --git a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
index 19c351ad8b6..649fed0ae26 100644
--- a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
+++ b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
@@ -413,7 +413,7 @@ private class ReplicaFetcher(name: String, sourceBroker: Node, topicPartitions:
0L, fetchSize, Optional.empty()))
val fetchRequestBuilder = FetchRequest.Builder.
- forReplica(ApiKeys.FETCH.latestVersion, FetchRequest.DEBUGGING_CONSUMER_ID, maxWait, minBytes, requestMap)
+ forReplica(ApiKeys.FETCH.latestVersion, FetchRequest.DEBUGGING_CONSUMER_ID, -1, maxWait, minBytes, requestMap)
debug("Issuing fetch request ")
diff --git a/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java b/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java
index cb260383294..98116dda9fc 100644
--- a/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java
+++ b/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java
@@ -117,6 +117,6 @@ public class ClusterTestExtensionsTest {
@ClusterTest
public void testDefaults(ClusterConfig config) {
- Assertions.assertEquals(MetadataVersion.IBP_3_5_IV0, config.metadataVersion());
+ Assertions.assertEquals(MetadataVersion.IBP_3_5_IV1, config.metadataVersion());
}
}
diff --git a/core/src/test/java/kafka/test/annotation/ClusterTest.java b/core/src/test/java/kafka/test/annotation/ClusterTest.java
index 2b8fa96e56e..813b6d65758 100644
--- a/core/src/test/java/kafka/test/annotation/ClusterTest.java
+++ b/core/src/test/java/kafka/test/annotation/ClusterTest.java
@@ -41,6 +41,6 @@ public @interface ClusterTest {
String name() default "";
SecurityProtocol securityProtocol() default SecurityProtocol.PLAINTEXT;
String listener() default "";
- MetadataVersion metadataVersion() default MetadataVersion.IBP_3_5_IV0;
+ MetadataVersion metadataVersion() default MetadataVersion.IBP_3_5_IV1;
ClusterConfigProperty[] serverProperties() default {};
}
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index e8f2ea88c49..681067834f0 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -385,7 +385,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
partitionMap.put(tp, new requests.FetchRequest.PartitionData(getTopicIds().getOrElse(tp.topic, Uuid.ZERO_UUID),
0, 0, 100, Optional.of(27)))
val version = ApiKeys.FETCH.latestVersion
- requests.FetchRequest.Builder.forReplica(version, 5000, 100, Int.MaxValue, partitionMap).build()
+ requests.FetchRequest.Builder.forReplica(version, 5000, -1, 100, Int.MaxValue, partitionMap).build()
}
private def createListOffsetsRequest = {
diff --git a/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala b/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala
index 074ae92a056..1710d713d0f 100644
--- a/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala
@@ -173,6 +173,7 @@ class DelayedFetchTest {
new FetchParams(
ApiKeys.FETCH.latestVersion,
replicaId,
+ 1,
maxWaitMs,
1,
maxBytes,
diff --git a/core/src/test/scala/integration/kafka/server/MetadataVersionIntegrationTest.scala b/core/src/test/scala/integration/kafka/server/MetadataVersionIntegrationTest.scala
index 07be85b1e6f..54bc462e973 100644
--- a/core/src/test/scala/integration/kafka/server/MetadataVersionIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/MetadataVersionIntegrationTest.scala
@@ -47,7 +47,7 @@ class MetadataVersionIntegrationTest {
assertEquals(ff.maxVersionLevel(), clusterInstance.config().metadataVersion().featureLevel())
// Update to new version
- val updateVersion = MetadataVersion.IBP_3_5_IV0.featureLevel.shortValue
+ val updateVersion = MetadataVersion.IBP_3_5_IV1.featureLevel.shortValue
val updateResult = admin.updateFeatures(
Map("metadata.version" -> new FeatureUpdate(updateVersion,
UpgradeType.UPGRADE)).asJava, new UpdateFeaturesOptions())
updateResult.all().get()
diff --git a/core/src/test/scala/kafka/server/RemoteLeaderEndPointTest.scala b/core/src/test/scala/kafka/server/RemoteLeaderEndPointTest.scala
index 5232f60b53d..cac71c2adea 100644
--- a/core/src/test/scala/kafka/server/RemoteLeaderEndPointTest.scala
+++ b/core/src/test/scala/kafka/server/RemoteLeaderEndPointTest.scala
@@ -18,22 +18,27 @@
package kafka.server
import kafka.cluster.BrokerEndPoint
+import kafka.log.UnifiedLog
+import kafka.server.AbstractFetcherThread.ResultWithPartitions
import kafka.server.epoch.util.MockBlockingSender
import kafka.utils.{MockTime, TestUtils}
import org.apache.kafka.clients.FetchSessionHandler
import org.apache.kafka.common.errors.{FencedLeaderEpochException, UnknownLeaderEpochException}
import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset
+import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource
import org.apache.kafka.common.utils.LogContext
-import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.{TopicPartition, Uuid}
import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsPartitionResponse
import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderPartition
-import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.server.common.{OffsetAndEpoch, MetadataVersion}
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
+import org.apache.kafka.server.common.{MetadataVersion, OffsetAndEpoch}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{BeforeEach, Test}
-import org.mockito.Mockito.mock
+import org.junit.jupiter.params.ParameterizedTest
+import org.mockito.Mockito.{mock, when}
import java.util
+import scala.collection.Map
import scala.jdk.CollectionConverters._
class RemoteLeaderEndPointTest {
@@ -43,8 +48,10 @@ class RemoteLeaderEndPointTest {
val logStartOffset = 20
val localLogStartOffset = 100
val logEndOffset = 300
+ val replicaManager: ReplicaManager = mock(classOf[ReplicaManager])
var blockingSend: MockBlockingSender = _
var endPoint: LeaderEndPoint = _
+ var currentBrokerEpoch = 1L
@BeforeEach
def setUp(): Unit = {
@@ -54,11 +61,10 @@ class RemoteLeaderEndPointTest {
val props = TestUtils.createBrokerConfig(sourceBroker.id, TestUtils.MockZkConnect, port = sourceBroker.port)
val fetchSessionHandler = new FetchSessionHandler(new LogContext(logPrefix), sourceBroker.id)
val config = KafkaConfig.fromProps(props)
- val replicaManager: ReplicaManager = mock(classOf[ReplicaManager])
blockingSend = new MockBlockingSender(offsets = new util.HashMap[TopicPartition, EpochEndOffset](),
sourceBroker = sourceBroker, time = time)
endPoint = new RemoteLeaderEndPoint(logPrefix, blockingSend, fetchSessionHandler,
- config, replicaManager, QuotaFactory.UnboundedQuota, () => MetadataVersion.MINIMUM_KRAFT_VERSION)
+ config, replicaManager, QuotaFactory.UnboundedQuota, () => MetadataVersion.MINIMUM_KRAFT_VERSION, () => currentBrokerEpoch)
}
@Test
@@ -116,4 +122,25 @@ class RemoteLeaderEndPointTest {
assertThrows(classOf[UnknownLeaderEpochException], () => endPoint.fetchEarliestOffset(topicPartition, currentLeaderEpoch + 1))
assertThrows(classOf[UnknownLeaderEpochException], () => endPoint.fetchLatestOffset(topicPartition, currentLeaderEpoch + 1))
}
+
+ @ParameterizedTest
+ @ApiKeyVersionsSource(apiKey = ApiKeys.FETCH)
+ def testBrokerEpochSupplier(version: Short): Unit = {
+ val tp = new TopicPartition("topic1", 0)
+ val topicId1 = Uuid.randomUuid()
+ val log = mock(classOf[UnifiedLog])
+ val partitionMap = Map(
+ tp -> PartitionFetchState(Some(topicId1), 150, None, 0, None, state = Fetching, lastFetchedEpoch = None))
+ when(replicaManager.localLogOrException(tp)).thenReturn(log)
+ when(log.logStartOffset).thenReturn(1)
+
+ val ResultWithPartitions(fetchRequestOpt, partitionsWithError) = endPoint.buildFetch(partitionMap)
+ assertTrue(partitionsWithError.isEmpty)
+ assertEquals(if (version < 15) -1L else 1L, fetchRequestOpt.get.fetchRequest.build(version).replicaEpoch)
+
+ currentBrokerEpoch = 2L
+ val ResultWithPartitions(newFetchRequestOpt, newPartitionsWithError) = endPoint.buildFetch(partitionMap)
+ assertTrue(newPartitionsWithError.isEmpty)
+ assertEquals(if (version < 15) -1L else 2L, newFetchRequestOpt.get.fetchRequest.build(version).replicaEpoch)
+ }
}
\ No newline at end of file
diff --git a/core/src/test/scala/unit/kafka/admin/FeatureCommandTest.scala b/core/src/test/scala/unit/kafka/admin/FeatureCommandTest.scala
index 81508e64860..48337d75720 100644
--- a/core/src/test/scala/unit/kafka/admin/FeatureCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/FeatureCommandTest.scala
@@ -26,7 +26,7 @@ import org.apache.kafka.clients.admin.FeatureUpdate.UpgradeType.{SAFE_DOWNGRADE,
import org.apache.kafka.clients.admin.MockAdminClient
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.server.common.MetadataVersion
-import org.apache.kafka.server.common.MetadataVersion.{IBP_3_3_IV0, IBP_3_3_IV1, IBP_3_3_IV2, IBP_3_3_IV3, IBP_3_5_IV0}
+import org.apache.kafka.server.common.MetadataVersion.{IBP_3_3_IV0, IBP_3_3_IV1, IBP_3_3_IV2, IBP_3_3_IV3, IBP_3_5_IV1}
import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows}
import org.junit.jupiter.api.Test
import org.junit.jupiter.params.ParameterizedTest
@@ -84,7 +84,7 @@ class FeatureCommandTest extends IntegrationTestHarness {
Array("--bootstrap-server", bootstrapServers(), "describe"), env.out))
assertEquals(String.format(
"Feature: metadata.version\tSupportedMinVersion: 3.0-IV1\t" +
- "SupportedMaxVersion: 3.5-IV0\tFinalizedVersionLevel: 3.3-IV1\t"),
+ "SupportedMaxVersion: 3.5-IV1\tFinalizedVersionLevel: 3.3-IV1\t"),
env.outputWithoutEpoch())
}
}
@@ -145,7 +145,7 @@ class FeatureCommandTest extends IntegrationTestHarness {
assertEquals(1, FeatureCommand.mainNoExit(Array("--bootstrap-server", bootstrapServers(),
"disable", "--feature", "metadata.version"), env.out))
assertEquals("Could not disable metadata.version. Invalid update version
0 for feature " +
- "metadata.version. Local controller 1000 only supports versions 1-9",
env.outputWithoutEpoch())
+ "metadata.version. Local controller 1000 only supports versions 1-10",
env.outputWithoutEpoch())
}
TestUtils.resource(FeatureCommandTestEnv()) { env =>
assertEquals(1, FeatureCommand.mainNoExit(Array("--bootstrap-server", bootstrapServers(),
@@ -173,8 +173,8 @@ class FeatureCommandUnitTest {
@Test
def testMetadataVersionsToString(): Unit = {
- assertEquals("3.3-IV0, 3.3-IV1, 3.3-IV2, 3.3-IV3, 3.4-IV0, 3.5-IV0",
- FeatureCommand.metadataVersionsToString(IBP_3_3_IV0, IBP_3_5_IV0))
+ assertEquals("3.3-IV0, 3.3-IV1, 3.3-IV2, 3.3-IV3, 3.4-IV0, 3.5-IV0,
3.5-IV1",
+ FeatureCommand.metadataVersionsToString(IBP_3_3_IV0, IBP_3_5_IV1))
}
@Test
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
index 0d8cc47ce1f..3de0f4f8751 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
@@ -390,6 +390,7 @@ class PartitionLockTest extends Logging {
val fetchParams = new FetchParams(
ApiKeys.FETCH.latestVersion,
followerId,
+ 1,
0L,
1,
maxBytes,
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
index 31396f46051..b01ee76210a 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -108,6 +108,7 @@ object PartitionTest {
def followerFetchParams(
replicaId: Int,
+ replicaEpoch: Long = 1L,
maxWaitMs: Long = 0L,
minBytes: Int = 1,
maxBytes: Int = Int.MaxValue
@@ -115,6 +116,7 @@ object PartitionTest {
new FetchParams(
ApiKeys.FETCH.latestVersion,
replicaId,
+ replicaEpoch,
maxWaitMs,
minBytes,
maxBytes,
@@ -133,6 +135,7 @@ object PartitionTest {
new FetchParams(
ApiKeys.FETCH.latestVersion,
FetchRequest.CONSUMER_REPLICA_ID,
+ -1,
maxWaitMs,
minBytes,
maxBytes,
diff --git a/core/src/test/scala/unit/kafka/raft/KafkaNetworkChannelTest.scala b/core/src/test/scala/unit/kafka/raft/KafkaNetworkChannelTest.scala
index eaa21dd8173..af230f66553 100644
--- a/core/src/test/scala/unit/kafka/raft/KafkaNetworkChannelTest.scala
+++ b/core/src/test/scala/unit/kafka/raft/KafkaNetworkChannelTest.scala
@@ -21,15 +21,18 @@ import java.util
import java.util.Collections
import org.apache.kafka.clients.MockClient.MockMetadataUpdater
import org.apache.kafka.clients.{MockClient, NodeApiVersions}
+import org.apache.kafka.common.message.FetchRequestData.ReplicaState
import org.apache.kafka.common.message.{BeginQuorumEpochResponseData, EndQuorumEpochResponseData, FetchResponseData, VoteResponseData}
import org.apache.kafka.common.protocol.{ApiKeys, ApiMessage, Errors}
-import org.apache.kafka.common.requests.{AbstractResponse, ApiVersionsResponse, BeginQuorumEpochRequest, BeginQuorumEpochResponse, EndQuorumEpochRequest, EndQuorumEpochResponse, FetchResponse, VoteRequest, VoteResponse}
+import org.apache.kafka.common.requests.{AbstractResponse, ApiVersionsResponse, BeginQuorumEpochRequest, BeginQuorumEpochResponse, EndQuorumEpochRequest, EndQuorumEpochResponse, FetchRequest, FetchResponse, VoteRequest, VoteResponse}
+import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource
import org.apache.kafka.common.utils.{MockTime, Time}
import org.apache.kafka.common.{Node, TopicPartition, Uuid}
import org.apache.kafka.raft.RaftConfig.InetAddressSpec
import org.apache.kafka.raft.{RaftRequest, RaftUtil}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{BeforeEach, Test}
+import org.junit.jupiter.params.ParameterizedTest
import scala.jdk.CollectionConverters._
@@ -159,6 +162,28 @@ class KafkaNetworkChannelTest {
}
}
+ @ParameterizedTest
+ @ApiKeyVersionsSource(apiKey = ApiKeys.FETCH)
+ def testFetchRequestDowngrade(version: Short): Unit = {
+ val destinationId = 2
+ val destinationNode = new Node(destinationId, "127.0.0.1", 9092)
+ channel.updateEndpoint(destinationId, new InetAddressSpec(
+ new InetSocketAddress(destinationNode.host, destinationNode.port)))
+ sendTestRequest(ApiKeys.FETCH, destinationId)
+ channel.pollOnce()
+
+ assertEquals(1, client.requests.size)
+ val request = client.requests.peek.requestBuilder.build(version)
+
+ if (version < 15) {
+ assertTrue(request.asInstanceOf[FetchRequest].data.replicaId == 1)
+ assertTrue(request.asInstanceOf[FetchRequest].data.replicaState.replicaId == -1)
+ } else {
+ assertTrue(request.asInstanceOf[FetchRequest].data.replicaId == -1)
+ assertTrue(request.asInstanceOf[FetchRequest].data.replicaState.replicaId == 1)
+ }
+ }
+
private def sendTestRequest(
apiKey: ApiKeys,
destinationId: Int,
@@ -216,7 +241,7 @@ class KafkaNetworkChannelTest {
.setFetchOffset(333)
.setLastFetchedEpoch(5)
})
- request.setReplicaId(1)
+ request.setReplicaState(new ReplicaState().setReplicaId(1))
case _ =>
throw new AssertionError(s"Unexpected api $key")
diff --git a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
index e959251b6ea..2af7f105244 100644
--- a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
@@ -1371,7 +1371,7 @@ class AbstractFetcherThreadTest {
1024 * 1024, Optional.of[Integer](state.currentLeaderEpoch), lastFetchedEpoch))
}
}
- val fetchRequest = FetchRequest.Builder.forReplica(version, replicaId, 0, 1, fetchData.asJava)
+ val fetchRequest = FetchRequest.Builder.forReplica(version, replicaId, 1, 0, 1, fetchData.asJava)
val fetchRequestOpt =
if (fetchData.isEmpty)
None
diff --git a/core/src/test/scala/unit/kafka/server/FetchRequestDownConversionConfigTest.scala b/core/src/test/scala/unit/kafka/server/FetchRequestDownConversionConfigTest.scala
index d01228c1d13..b713e576109 100644
--- a/core/src/test/scala/unit/kafka/server/FetchRequestDownConversionConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/FetchRequestDownConversionConfigTest.scala
@@ -257,7 +257,7 @@ class FetchRequestDownConversionConfigTest extends BaseRequestTest {
val partitionMap = createPartitionMap(1024, partitions, topicIdMap)
val fetchRequest = replicaIdOpt.map { replicaId =>
- FetchRequest.Builder.forReplica(fetchVersion, replicaId, Int.MaxValue, 0, partitionMap)
+ FetchRequest.Builder.forReplica(fetchVersion, replicaId, -1, Int.MaxValue, 0, partitionMap)
.build(fetchVersion)
}.getOrElse {
FetchRequest.Builder.forConsumer(fetchVersion, Int.MaxValue, 0, partitionMap)
diff --git a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
index 9104d2840de..0034e429b92 100644
--- a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
@@ -627,7 +627,7 @@ class FetchRequestTest extends BaseFetchRequestTest {
producer.close()
// fetch request with version below v10: UNSUPPORTED_COMPRESSION_TYPE error occurs
- val req0 = new FetchRequest.Builder(0, 9, -1, Int.MaxValue, 0,
+ val req0 = new FetchRequest.Builder(0, 9, -1, -1, Int.MaxValue, 0,
createPartitionMap(300, Seq(topicPartition), Map.empty))
.setMaxBytes(800).build()
@@ -636,7 +636,7 @@ class FetchRequestTest extends BaseFetchRequestTest {
assertEquals(Errors.UNSUPPORTED_COMPRESSION_TYPE.code, data0.errorCode)
// fetch request with version 10: works fine!
- val req1= new FetchRequest.Builder(0, 10, -1, Int.MaxValue, 0,
+ val req1= new FetchRequest.Builder(0, 10, -1, -1, Int.MaxValue, 0,
createPartitionMap(300, Seq(topicPartition), Map.empty))
.setMaxBytes(800).build()
val res1 = sendFetchRequest(leaderId, req1)
@@ -644,7 +644,7 @@ class FetchRequestTest extends BaseFetchRequestTest {
assertEquals(Errors.NONE.code, data1.errorCode)
assertEquals(3, records(data1).size)
- val req2 = new FetchRequest.Builder(ApiKeys.FETCH.latestVersion(), ApiKeys.FETCH.latestVersion(), -1, Int.MaxValue, 0,
+ val req2 = new FetchRequest.Builder(ApiKeys.FETCH.latestVersion(), ApiKeys.FETCH.latestVersion(), -1, -1, Int.MaxValue, 0,
createPartitionMap(300, Seq(topicPartition), Map.empty))
.setMaxBytes(800).build()
val res2 = sendFetchRequest(leaderId, req2)
@@ -683,7 +683,7 @@ class FetchRequestTest extends BaseFetchRequestTest {
// fetch request with fetch version v1 (magic 0):
// gzip compressed record is returned with down-conversion.
// zstd compressed record raises UNSUPPORTED_COMPRESSION_TYPE error.
- val req0 = new FetchRequest.Builder(0, 1, -1, Int.MaxValue, 0,
+ val req0 = new FetchRequest.Builder(0, 1, -1, -1, Int.MaxValue, 0,
createPartitionMap(300, Seq(topicPartition), Map.empty))
.setMaxBytes(800)
.build()
@@ -693,7 +693,7 @@ class FetchRequestTest extends BaseFetchRequestTest {
assertEquals(Errors.NONE.code, data0.errorCode)
assertEquals(1, records(data0).size)
- val req1 = new FetchRequest.Builder(0, 1, -1, Int.MaxValue, 0,
+ val req1 = new FetchRequest.Builder(0, 1, -1, -1, Int.MaxValue, 0,
createPartitionMap(300, Seq(topicPartition), Map(topicPartition -> 1L)))
.setMaxBytes(800).build()
@@ -704,7 +704,7 @@ class FetchRequestTest extends BaseFetchRequestTest {
// fetch request with fetch version v3 (magic 1):
// gzip compressed record is returned with down-conversion.
// zstd compressed record raises UNSUPPORTED_COMPRESSION_TYPE error.
- val req2 = new FetchRequest.Builder(2, 3, -1, Int.MaxValue, 0,
+ val req2 = new FetchRequest.Builder(2, 3, -1, -1, Int.MaxValue, 0,
createPartitionMap(300, Seq(topicPartition), Map.empty))
.setMaxBytes(800).build()
@@ -713,7 +713,7 @@ class FetchRequestTest extends BaseFetchRequestTest {
assertEquals(Errors.NONE.code, data2.errorCode)
assertEquals(1, records(data2).size)
- val req3 = new FetchRequest.Builder(0, 1, -1, Int.MaxValue, 0,
+ val req3 = new FetchRequest.Builder(0, 1, -1, -1, Int.MaxValue, 0,
createPartitionMap(300, Seq(topicPartition), Map(topicPartition -> 1L)))
.setMaxBytes(800).build()
@@ -722,7 +722,7 @@ class FetchRequestTest extends BaseFetchRequestTest {
assertEquals(Errors.UNSUPPORTED_COMPRESSION_TYPE.code, data3.errorCode)
// fetch request with version 10: works fine!
- val req4 = new FetchRequest.Builder(0, 10, -1, Int.MaxValue, 0,
+ val req4 = new FetchRequest.Builder(0, 10, -1, -1, Int.MaxValue, 0,
createPartitionMap(300, Seq(topicPartition), Map.empty))
.setMaxBytes(800).build()
val res4 = sendFetchRequest(leaderId, req4)
@@ -730,7 +730,7 @@ class FetchRequestTest extends BaseFetchRequestTest {
assertEquals(Errors.NONE.code, data4.errorCode)
assertEquals(3, records(data4).size)
- val req5 = new FetchRequest.Builder(0, ApiKeys.FETCH.latestVersion(), -1, Int.MaxValue, 0,
+ val req5 = new FetchRequest.Builder(0, ApiKeys.FETCH.latestVersion(), -1, -1, Int.MaxValue, 0,
createPartitionMap(300, Seq(topicPartition), Map.empty))
.setMaxBytes(800).build()
val res5 = sendFetchRequest(leaderId, req5)
diff --git a/core/src/test/scala/unit/kafka/server/FetchSessionTest.scala b/core/src/test/scala/unit/kafka/server/FetchSessionTest.scala
index 538a061eea3..2275673aba3 100755
--- a/core/src/test/scala/unit/kafka/server/FetchSessionTest.scala
+++ b/core/src/test/scala/unit/kafka/server/FetchSessionTest.scala
@@ -140,14 +140,14 @@ class FetchSessionTest {
toForget: util.List[TopicIdPartition], isFromFollower: Boolean,
version: Short = ApiKeys.FETCH.latestVersion): FetchRequest = {
new FetchRequest.Builder(version, version, if (isFromFollower) 1 else FetchRequest.CONSUMER_REPLICA_ID,
- 0, 0, fetchData).metadata(metadata).removed(toForget).build
+ if (isFromFollower) 1 else -1, 0, 0, fetchData).metadata(metadata).removed(toForget).build
}
def createRequestWithoutTopicIds(metadata: JFetchMetadata,
fetchData: util.Map[TopicPartition, FetchRequest.PartitionData],
toForget: util.List[TopicIdPartition], isFromFollower: Boolean): FetchRequest = {
new FetchRequest.Builder(12, 12, if (isFromFollower) 1 else FetchRequest.CONSUMER_REPLICA_ID,
- 0, 0, fetchData).metadata(metadata).removed(toForget).build
+ if (isFromFollower) 1 else -1, 0, 0, fetchData).metadata(metadata).removed(toForget).build
}
@Test
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 63fd69b61b3..48303b7a52d 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -3366,7 +3366,7 @@ class KafkaApisTest {
when(clientQuotaManager.maybeRecordAndGetThrottleTimeMs(
any[RequestChannel.Request](), anyDouble, anyLong)).thenReturn(0)
- val fetchRequest = new FetchRequest.Builder(9, 9, -1, 100, 0, fetchDataBuilder)
+ val fetchRequest = new FetchRequest.Builder(9, 9, -1, -1, 100, 0, fetchDataBuilder)
.build()
val request = buildRequest(fetchRequest)
@@ -3421,8 +3421,9 @@ class KafkaApisTest {
any[RequestChannel.Request](), anyDouble, anyLong)).thenReturn(0)
// If replicaId is -1 we will build a consumer request. Any non-negative replicaId will build a follower request.
+ val replicaEpoch = if (replicaId < 0) -1 else 1
val fetchRequest = new FetchRequest.Builder(ApiKeys.FETCH.latestVersion, ApiKeys.FETCH.latestVersion,
- replicaId, 100, 0, fetchDataBuilder).metadata(fetchMetadata).build()
+ replicaId, replicaEpoch, 100, 0, fetchDataBuilder).metadata(fetchMetadata).build()
val request = buildRequest(fetchRequest)
createKafkaApis().handleFetchRequest(request)
@@ -4528,7 +4529,7 @@ class KafkaApisTest {
val fetchDataBuilder = Collections.singletonMap(tp0, new FetchRequest.PartitionData(Uuid.ZERO_UUID, 0, 0, Int.MaxValue, Optional.of(leaderEpoch)))
val fetchData = Collections.singletonMap(tidp0, new FetchRequest.PartitionData(Uuid.ZERO_UUID, 0, 0, Int.MaxValue, Optional.of(leaderEpoch)))
val fetchFromFollower = buildRequest(new FetchRequest.Builder(
- ApiKeys.FETCH.oldestVersion(), ApiKeys.FETCH.latestVersion(), 1, 1000, 0, fetchDataBuilder).build())
+ ApiKeys.FETCH.oldestVersion(), ApiKeys.FETCH.latestVersion(), 1, 1, 1000, 0, fetchDataBuilder).build())
val records = MemoryRecords.withRecords(CompressionType.NONE,
new SimpleRecord(1000, "foo".getBytes(StandardCharsets.UTF_8)))
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
index 582cd47ee73..0cc27854e50 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
@@ -279,6 +279,7 @@ class ReplicaAlterLogDirsThreadTest {
val expectedFetchParams = new FetchParams(
ApiKeys.FETCH.latestVersion,
FetchRequest.FUTURE_LOCAL_REPLICA_ID,
+ -1,
0L,
0,
config.replicaFetchResponseMaxBytes,
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
index 500bc23447f..bc3ea086f0c 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
@@ -107,7 +107,7 @@ class ReplicaFetcherThreadTest {
val logContext = new LogContext(s"[ReplicaFetcher replicaId=${brokerConfig.brokerId}, leaderId=${leaderEndpointBlockingSend.brokerEndPoint().id}, fetcherId=$fetcherId] ")
val fetchSessionHandler = new FetchSessionHandler(logContext, leaderEndpointBlockingSend.brokerEndPoint().id)
val leader = new RemoteLeaderEndPoint(logContext.logPrefix, leaderEndpointBlockingSend, fetchSessionHandler,
- brokerConfig, replicaMgr, quota, () => brokerConfig.interBrokerProtocolVersion)
+ brokerConfig, replicaMgr, quota, () => brokerConfig.interBrokerProtocolVersion, () => 1)
new ReplicaFetcherThread(name,
leader,
brokerConfig,
@@ -587,7 +587,7 @@ class ReplicaFetcherThreadTest {
val logContext = new LogContext(s"[ReplicaFetcher replicaId=${config.brokerId}, leaderId=${brokerEndPoint.id}, fetcherId=0] ")
val fetchSessionHandler = new FetchSessionHandler(logContext, brokerEndPoint.id)
val leader = new RemoteLeaderEndPoint(logContext.logPrefix, mockNetwork, fetchSessionHandler, config,
- replicaManager, quota, () => config.interBrokerProtocolVersion)
+ replicaManager, quota, () => config.interBrokerProtocolVersion, () => 1)
val thread = new ReplicaFetcherThread("bob", leader, config, failedPartitions,
replicaManager, quota, logContext.logPrefix, () => config.interBrokerProtocolVersion) {
override def processPartitionData(topicPartition: TopicPartition, fetchOffset: Long, partitionData: FetchData): Option[LogAppendInfo] = None
@@ -701,7 +701,8 @@ class ReplicaFetcherThreadTest {
config,
replicaManager,
quota,
- () => config.interBrokerProtocolVersion
+ () => config.interBrokerProtocolVersion,
+ () => 1
)
val thread = new ReplicaFetcherThread(
@@ -1125,7 +1126,7 @@ class ReplicaFetcherThreadTest {
val logContext = new LogContext(s"[ReplicaFetcher replicaId=${config.brokerId}, leaderId=${brokerEndPoint.id}, fetcherId=0] ")
val fetchSessionHandler = new FetchSessionHandler(logContext, brokerEndPoint.id)
val leader = new RemoteLeaderEndPoint(logContext.logPrefix, mockBlockingSend, fetchSessionHandler, config,
- replicaManager, replicaQuota, () => config.interBrokerProtocolVersion)
+ replicaManager, replicaQuota, () => config.interBrokerProtocolVersion, () => 1)
val thread = new ReplicaFetcherThread("bob",
leader,
config,
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala
index 35209bfe877..5ffdf41878d 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala
@@ -231,6 +231,7 @@ class ReplicaManagerConcurrencyTest {
val fetchParams = new FetchParams(
ApiKeys.FETCH.latestVersion,
replicaId,
+ 1,
random.nextInt(100),
1,
1024 * 1024,
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
index 9de737be511..ce727f83c74 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
@@ -174,6 +174,7 @@ class ReplicaManagerQuotasTest {
val fetchParams = new FetchParams(
ApiKeys.FETCH.latestVersion,
1,
+ 1,
600,
1,
1000,
@@ -225,6 +226,7 @@ class ReplicaManagerQuotasTest {
val fetchParams = new FetchParams(
ApiKeys.FETCH.latestVersion,
FetchRequest.CONSUMER_REPLICA_ID,
+ -1,
600L,
1,
1000,
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 047c5a5ac84..0ef3127fe1a 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -2217,14 +2217,14 @@ class ReplicaManagerTest {
time: Time,
threadNamePrefix: Option[String],
replicationQuotaManager: ReplicationQuotaManager): ReplicaFetcherManager = {
- new ReplicaFetcherManager(config, this, metrics, time, threadNamePrefix, replicationQuotaManager, () => metadataCache.metadataVersion()) {
+ new ReplicaFetcherManager(config, this, metrics, time, threadNamePrefix, replicationQuotaManager, () => metadataCache.metadataVersion(), () => 1) {
override def createFetcherThread(fetcherId: Int, sourceBroker: BrokerEndPoint): ReplicaFetcherThread = {
val logContext = new LogContext(s"[ReplicaFetcher replicaId=${config.brokerId}, leaderId=${sourceBroker.id}, " +
s"fetcherId=$fetcherId] ")
val fetchSessionHandler = new FetchSessionHandler(logContext, sourceBroker.id)
val leader = new RemoteLeaderEndPoint(logContext.logPrefix, blockingSend, fetchSessionHandler, config,
- replicaManager, quotaManager.follower, () => config.interBrokerProtocolVersion)
+ replicaManager, quotaManager.follower, () => config.interBrokerProtocolVersion, () => 1)
new ReplicaFetcherThread(s"ReplicaFetcherThread-$fetcherId", leader, config, failedPartitions, replicaManager,
quotaManager.follower, logContext.logPrefix, () => config.interBrokerProtocolVersion) {
override def doWork(): Unit = {
@@ -2415,6 +2415,7 @@ class ReplicaManagerTest {
val params = new FetchParams(
requestVersion,
replicaId,
+ 1,
maxWaitMs,
minBytes,
maxBytes,
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/common/FetchRequestBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/common/FetchRequestBenchmark.java
index a428f9152e4..f7321b5bc65 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/common/FetchRequestBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/common/FetchRequestBenchmark.java
@@ -90,7 +90,7 @@ public class FetchRequestBenchmark {
this.header = new RequestHeader(ApiKeys.FETCH, ApiKeys.FETCH.latestVersion(), "jmh-benchmark", 100);
this.consumerRequest = FetchRequest.Builder.forConsumer(ApiKeys.FETCH.latestVersion(), 0, 0, fetchData)
.build(ApiKeys.FETCH.latestVersion());
- this.replicaRequest = FetchRequest.Builder.forReplica(ApiKeys.FETCH.latestVersion(), 1, 0, 0, fetchData)
+ this.replicaRequest = FetchRequest.Builder.forReplica(ApiKeys.FETCH.latestVersion(), 1, 1, 0, 0, fetchData)
.build(ApiKeys.FETCH.latestVersion());
this.requestBuffer = this.consumerRequest.serialize();
@@ -111,7 +111,7 @@ public class FetchRequestBenchmark {
@Benchmark
public int testFetchRequestForReplica() {
FetchRequest fetchRequest = FetchRequest.Builder.forReplica(
- ApiKeys.FETCH.latestVersion(), 1, 0, 0, fetchData)
+ ApiKeys.FETCH.latestVersion(), 1, 1, 0, 0, fetchData)
.build(ApiKeys.FETCH.latestVersion());
return fetchRequest.fetchData(topicNames).size();
}
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
index e37a197655c..8f764957fcf 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
@@ -314,7 +314,8 @@ public class ReplicaFetcherThreadBenchmark {
config,
replicaManager,
replicaQuota,
- config::interBrokerProtocolVersion
+ config::interBrokerProtocolVersion,
+ () -> -1
) {
@Override
public OffsetAndEpoch fetchEarliestOffset(TopicPartition topicPartition, int currentLeaderEpoch) {
diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
index f85b1f921da..4e3c35dde2a 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
@@ -165,7 +165,7 @@ public class QuorumControllerTest {
) {
controlEnv.activeController().registerBroker(ANONYMOUS_CONTEXT,
new BrokerRegistrationRequestData().
-                   setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_5_IV0)).
+                   setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_5_IV1)).
setBrokerId(0).
setClusterId(logEnv.clusterId())).get();
testConfigurationOperations(controlEnv.activeController());
@@ -206,7 +206,7 @@ public class QuorumControllerTest {
) {
controlEnv.activeController().registerBroker(ANONYMOUS_CONTEXT,
new BrokerRegistrationRequestData().
-                   setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_5_IV0)).
+                   setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_5_IV1)).
setBrokerId(0).
setClusterId(logEnv.clusterId())).get();
            testDelayedConfigurationOperations(logEnv, controlEnv.activeController());
@@ -536,7 +536,7 @@ public class QuorumControllerTest {
setBrokerId(0).
setClusterId(active.clusterId()).
setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwBA")).
-               setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_5_IV0)).
+               setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_5_IV1)).
setListeners(listeners));
assertEquals(2L, reply.get().epoch());
CreateTopicsRequestData createTopicsRequestData =
@@ -957,7 +957,7 @@ public class QuorumControllerTest {
.setBrokerId(brokerId)
.setRack(null)
.setClusterId(controller.clusterId())
-               .setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_5_IV0))
+               .setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_5_IV1))
                .setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwB" + brokerId))
.setListeners(
new ListenerCollection(
diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
index 393909b390c..da8cb2a4bbe 100644
--- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
+++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
@@ -49,6 +49,7 @@ import org.apache.kafka.common.requests.DescribeQuorumResponse;
import org.apache.kafka.common.requests.EndQuorumEpochRequest;
import org.apache.kafka.common.requests.EndQuorumEpochResponse;
import org.apache.kafka.common.requests.FetchResponse;
+import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.requests.FetchSnapshotRequest;
import org.apache.kafka.common.requests.FetchSnapshotResponse;
import org.apache.kafka.common.requests.VoteRequest;
@@ -958,7 +959,8 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
Errors.INVALID_REQUEST, Optional.empty()));
}
-        FetchResponseData response = tryCompleteFetchRequest(request.replicaId(), fetchPartition, currentTimeMs);
+        int replicaId = FetchRequest.replicaId(request);
+        FetchResponseData response = tryCompleteFetchRequest(replicaId, fetchPartition, currentTimeMs);
FetchResponseData.PartitionData partitionResponse =
response.responses().get(0).partitions().get(0);
@@ -983,16 +985,16 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
Errors error = Errors.forException(cause);
if (error != Errors.REQUEST_TIMED_OUT) {
                logger.debug("Failed to handle fetch from {} at {} due to {}",
-                   request.replicaId(), fetchPartition.fetchOffset(), error);
+                   replicaId, fetchPartition.fetchOffset(), error);
return buildEmptyFetchResponse(error, Optional.empty());
}
}
// FIXME: `completionTimeMs`, which can be null
            logger.trace("Completing delayed fetch from {} starting at offset {} at {}",
-               request.replicaId(), fetchPartition.fetchOffset(), completionTimeMs);
+               replicaId, fetchPartition.fetchOffset(), completionTimeMs);

-           return tryCompleteFetchRequest(request.replicaId(), fetchPartition, time.milliseconds());
+           return tryCompleteFetchRequest(replicaId, fetchPartition, time.milliseconds());
});
}
@@ -1791,7 +1793,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
.setMaxBytes(MAX_FETCH_SIZE_BYTES)
.setMaxWaitMs(fetchMaxWaitMs)
.setClusterId(clusterId)
- .setReplicaId(quorum.localIdOrSentinel());
+           .setReplicaState(new FetchRequestData.ReplicaState().setReplicaId(quorum.localIdOrSentinel()));
}
private long maybeSendAnyVoterFetch(long currentTimeMs) {
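The new FetchRequest.replicaId(FetchRequestData) helper used above presumably resolves the id from whichever field the sender populated; a sketch of that resolution logic, assuming -1 is the default for both the deprecated ReplicaId field and ReplicaState.ReplicaId:

    import org.apache.kafka.common.message.FetchRequestData;

    public class ReplicaIdSketch {
        // Old senders fill the deprecated top-level ReplicaId; v15 senders fill
        // ReplicaState.ReplicaId and leave the old field at its default (-1).
        static int replicaId(FetchRequestData data) {
            return data.replicaId() != -1 ? data.replicaId() : data.replicaState().replicaId();
        }

        public static void main(String[] args) {
            FetchRequestData oldStyle = new FetchRequestData().setReplicaId(1);
            FetchRequestData newStyle = new FetchRequestData()
                .setReplicaState(new FetchRequestData.ReplicaState().setReplicaId(1));
            System.out.println(replicaId(oldStyle)); // 1
            System.out.println(replicaId(newStyle)); // 1
        }
    }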
diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
index b843b8a09b8..7cbeb11f4f7 100644
--- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
@@ -25,8 +25,10 @@ import org.apache.kafka.common.message.DescribeQuorumResponseData.ReplicaState;
import org.apache.kafka.common.message.EndQuorumEpochResponseData;
import org.apache.kafka.common.message.FetchResponseData;
import org.apache.kafka.common.message.VoteResponseData;
+import org.apache.kafka.common.message.FetchRequestData;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MutableRecordBatch;
@@ -35,11 +37,14 @@ import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.Records;
import org.apache.kafka.common.requests.DescribeQuorumRequest;
import org.apache.kafka.common.requests.EndQuorumEpochResponse;
+import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource;
import org.apache.kafka.raft.errors.BufferAllocationException;
import org.apache.kafka.raft.errors.NotLeaderException;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
import org.mockito.Mockito;
import java.io.IOException;
@@ -1436,6 +1441,33 @@ public class KafkaRaftClientTest {
        context.assertSentFetchPartitionResponse(Errors.INVALID_REQUEST, epoch, OptionalInt.of(localId));
}
+    // This test mainly focuses on whether the leader state is correctly updated under different fetch versions.
+    @ParameterizedTest
+    @ApiKeyVersionsSource(apiKey = ApiKeys.FETCH)
+    public void testLeaderStateUpdateWithDifferentFetchRequestVersions(short version) throws Exception {
+        int localId = 0;
+        int otherNodeId = 1;
+        int epoch = 5;
+        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
+
+        RaftClientTestContext context = RaftClientTestContext.initializeAsLeader(localId, voters, epoch);
+
+        // First poll has no high watermark advance.
+        context.client.poll();
+        assertEquals(OptionalLong.empty(), context.client.highWatermark());
+        assertEquals(1L, context.log.endOffset().offset);
+
+        // Now we will advance the high watermark with a follower fetch request.
+        FetchRequestData fetchRequestData = context.fetchRequest(epoch, otherNodeId, 1L, epoch, 0);
+        FetchRequestData request = new FetchRequest.SimpleBuilder(fetchRequestData).build(version).data();
+        assertEquals((version < 15) ? 1 : -1, fetchRequestData.replicaId());
+        assertEquals((version < 15) ? -1 : 1, fetchRequestData.replicaState().replicaId());
+        context.deliverRequest(request);
+        context.pollUntilResponse();
+        context.assertSentFetchPartitionResponse(Errors.NONE, epoch, OptionalInt.of(localId));
+        assertEquals(OptionalLong.of(1L), context.client.highWatermark());
+    }
+
@Test
public void testFetchRequestClusterIdValidation() throws Exception {
int localId = 0;
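A condensed sketch (not part of the patch) mirroring what the parameterized test above asserts: building new-style FetchRequestData at a pre-15 version moves the replica id back into the deprecated field. This assumes SimpleBuilder down-converts the data in place, as the test's assertions on fetchRequestData imply:

    import org.apache.kafka.common.message.FetchRequestData;
    import org.apache.kafka.common.requests.FetchRequest;

    public class DownConversionSketch {
        public static void main(String[] args) {
            // New-style data: replica id carried in the tagged ReplicaState field.
            FetchRequestData data = new FetchRequestData()
                .setReplicaState(new FetchRequestData.ReplicaState().setReplicaId(1));

            // Building at version 14 relocates the id into the old field.
            FetchRequestData old = new FetchRequest.SimpleBuilder(data).build((short) 14).data();
            System.out.println(old.replicaId());                // 1
            System.out.println(old.replicaState().replicaId()); // -1
        }
    }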
diff --git a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
index b79eddf0029..50647f8df3a 100644
--- a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
+++ b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
@@ -984,7 +984,7 @@ public final class RaftClientTestContext {
assertEquals(epoch, fetchPartition.currentLeaderEpoch());
assertEquals(fetchOffset, fetchPartition.fetchOffset());
assertEquals(lastFetchedEpoch, fetchPartition.lastFetchedEpoch());
- assertEquals(localId.orElse(-1), request.replicaId());
+ assertEquals(localId.orElse(-1), request.replicaState().replicaId());
// Assert that voters have flushed up to the fetch offset
if (localId.isPresent() && voters.contains(localId.getAsInt())) {
@@ -1036,7 +1036,7 @@ public final class RaftClientTestContext {
return request
.setMaxWaitMs(maxWaitTimeMs)
.setClusterId(clusterId)
- .setReplicaId(replicaId);
+           .setReplicaState(new FetchRequestData.ReplicaState().setReplicaId(replicaId));
}
FetchResponseData fetchResponse(
diff --git a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
index 67d878bc60e..b0593f40d07 100644
--- a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
+++ b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
@@ -168,7 +168,10 @@ public enum MetadataVersion {
IBP_3_4_IV0(8, "3.4", "IV0", true),
// Support for tiered storage (KIP-405) and SCRAM
- IBP_3_5_IV0(9, "3.5", "IV0", false);
+ IBP_3_5_IV0(9, "3.5", "IV0", false),
+
+ // Adds replica epoch to Fetch request (KIP-903).
+ IBP_3_5_IV1(10, "3.5", "IV1", false);
// NOTE: update the default version in @ClusterTest annotation to point to the latest version
public static final String FEATURE_NAME = "metadata.version";
@@ -292,7 +295,9 @@ public enum MetadataVersion {
}
public short fetchRequestVersion() {
- if (this.isAtLeast(IBP_3_5_IV0)) {
+ if (this.isAtLeast(IBP_3_5_IV1)) {
+ return 15;
+ } else if (this.isAtLeast(IBP_3_5_IV0)) {
return 14;
} else if (this.isAtLeast(IBP_3_1_IV0)) {
return 13;
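The version gate above can be exercised directly; a tiny sketch (not part of the patch) using only the enum constants and method shown in this diff:

    import org.apache.kafka.server.common.MetadataVersion;

    public class FetchVersionGateSketch {
        public static void main(String[] args) {
            // IBP_3_5_IV1 is the first metadata version that negotiates Fetch v15.
            System.out.println(MetadataVersion.IBP_3_5_IV0.fetchRequestVersion()); // 14
            System.out.println(MetadataVersion.IBP_3_5_IV1.fetchRequestVersion()); // 15
        }
    }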
diff --git a/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java b/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java
index 885596a9c20..75bbad2aff5 100644
--- a/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java
+++ b/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java
@@ -158,6 +158,7 @@ class MetadataVersionTest {
        assertEquals(IBP_3_4_IV0, MetadataVersion.fromVersionString("3.4-IV0"));
        assertEquals(IBP_3_5_IV0, MetadataVersion.fromVersionString("3.5-IV0"));
+       assertEquals(IBP_3_5_IV1, MetadataVersion.fromVersionString("3.5-IV1"));
}
@Test
@@ -207,6 +208,7 @@ class MetadataVersionTest {
assertEquals("3.3", IBP_3_3_IV3.shortVersion());
assertEquals("3.4", IBP_3_4_IV0.shortVersion());
assertEquals("3.5", IBP_3_5_IV0.shortVersion());
+ assertEquals("3.5", IBP_3_5_IV1.shortVersion());
}
@Test
@@ -245,6 +247,7 @@ class MetadataVersionTest {
assertEquals("3.3-IV3", IBP_3_3_IV3.version());
assertEquals("3.4-IV0", IBP_3_4_IV0.version());
assertEquals("3.5-IV0", IBP_3_5_IV0.version());
+ assertEquals("3.5-IV1", IBP_3_5_IV1.version());
}
@Test
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/FetchParams.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/FetchParams.java
index ff2308a83c4..700a8e40d5f 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/log/FetchParams.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/FetchParams.java
@@ -25,6 +25,7 @@ import java.util.Optional;
public class FetchParams {
public final short requestVersion;
public final int replicaId;
+ public final long replicaEpoch;
public final long maxWaitMs;
public final int minBytes;
public final int maxBytes;
@@ -33,6 +34,7 @@ public class FetchParams {
public FetchParams(short requestVersion,
int replicaId,
+ long replicaEpoch,
long maxWaitMs,
int minBytes,
int maxBytes,
@@ -42,6 +44,7 @@ public class FetchParams {
Objects.requireNonNull(clientMetadata);
this.requestVersion = requestVersion;
this.replicaId = replicaId;
+ this.replicaEpoch = replicaEpoch;
this.maxWaitMs = maxWaitMs;
this.minBytes = minBytes;
this.maxBytes = maxBytes;
@@ -72,6 +75,7 @@ public class FetchParams {
FetchParams that = (FetchParams) o;
return requestVersion == that.requestVersion
&& replicaId == that.replicaId
+ && replicaEpoch == that.replicaEpoch
&& maxWaitMs == that.maxWaitMs
&& minBytes == that.minBytes
&& maxBytes == that.maxBytes
@@ -83,6 +87,7 @@ public class FetchParams {
public int hashCode() {
int result = requestVersion;
result = 31 * result + replicaId;
+ result = 31 * result + (int) replicaEpoch;
        result = 31 * result + Long.hashCode(maxWaitMs);
result = 31 * result + minBytes;
result = 31 * result + maxBytes;
@@ -96,6 +101,7 @@ public class FetchParams {
return "FetchParams(" +
"requestVersion=" + requestVersion +
", replicaId=" + replicaId +
+ ", replicaEpoch=" + replicaEpoch +
", maxWaitMs=" + maxWaitMs +
", minBytes=" + minBytes +
", maxBytes=" + maxBytes +