This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 79b5f7f1ce2 KAFKA-14617: Add ReplicaState to FetchRequest (KIP-903) 
(#13323)
79b5f7f1ce2 is described below

commit 79b5f7f1ce2791fcebf70a447e5530c60b768702
Author: Calvin Liu <[email protected]>
AuthorDate: Thu Mar 16 06:04:34 2023 -0700

    KAFKA-14617: Add ReplicaState to FetchRequest (KIP-903) (#13323)
    
    This patch is the first part of KIP-903. It updates the FetchRequest to 
include the new tagged ReplicaState field, which replaces the now-deprecated 
ReplicaId field. The FetchRequest version is bumped to 15 and the 
MetadataVersion to 3.5-IV1.
    
    Reviewers: David Jacot <[email protected]>
---
 .../apache/kafka/common/requests/FetchRequest.java | 55 +++++++++++++++++++---
 .../resources/common/message/FetchRequest.json     | 13 ++++-
 .../resources/common/message/FetchResponse.json    |  4 +-
 .../kafka/common/requests/FetchRequestTest.java    | 41 ++++++++++++++--
 .../server/builders/ReplicaManagerBuilder.java     |  9 +++-
 .../scala/kafka/raft/KafkaNetworkChannel.scala     |  6 +--
 .../src/main/scala/kafka/server/BrokerServer.scala |  3 +-
 core/src/main/scala/kafka/server/KafkaApis.scala   |  1 +
 core/src/main/scala/kafka/server/KafkaServer.scala |  3 +-
 .../scala/kafka/server/LocalLeaderEndPoint.scala   |  3 +-
 .../scala/kafka/server/RemoteLeaderEndPoint.scala  |  5 +-
 .../scala/kafka/server/ReplicaFetcherManager.scala |  5 +-
 .../main/scala/kafka/server/ReplicaManager.scala   |  3 +-
 .../kafka/tools/ReplicaVerificationTool.scala      |  2 +-
 .../java/kafka/test/ClusterTestExtensionsTest.java |  2 +-
 .../java/kafka/test/annotation/ClusterTest.java    |  2 +-
 .../kafka/api/AuthorizerIntegrationTest.scala      |  2 +-
 .../kafka/server/DelayedFetchTest.scala            |  1 +
 .../server/MetadataVersionIntegrationTest.scala    |  2 +-
 .../kafka/server/RemoteLeaderEndPointTest.scala    | 39 ++++++++++++---
 .../unit/kafka/admin/FeatureCommandTest.scala      | 10 ++--
 .../unit/kafka/cluster/PartitionLockTest.scala     |  1 +
 .../scala/unit/kafka/cluster/PartitionTest.scala   |  3 ++
 .../unit/kafka/raft/KafkaNetworkChannelTest.scala  | 29 +++++++++++-
 .../kafka/server/AbstractFetcherThreadTest.scala   |  2 +-
 .../FetchRequestDownConversionConfigTest.scala     |  2 +-
 .../scala/unit/kafka/server/FetchRequestTest.scala | 18 +++----
 .../scala/unit/kafka/server/FetchSessionTest.scala |  4 +-
 .../scala/unit/kafka/server/KafkaApisTest.scala    |  7 +--
 .../server/ReplicaAlterLogDirsThreadTest.scala     |  1 +
 .../kafka/server/ReplicaFetcherThreadTest.scala    |  9 ++--
 .../server/ReplicaManagerConcurrencyTest.scala     |  1 +
 .../kafka/server/ReplicaManagerQuotasTest.scala    |  2 +
 .../unit/kafka/server/ReplicaManagerTest.scala     |  5 +-
 .../kafka/jmh/common/FetchRequestBenchmark.java    |  4 +-
 .../jmh/fetcher/ReplicaFetcherThreadBenchmark.java |  3 +-
 .../kafka/controller/QuorumControllerTest.java     |  8 ++--
 .../org/apache/kafka/raft/KafkaRaftClient.java     | 12 +++--
 .../org/apache/kafka/raft/KafkaRaftClientTest.java | 32 +++++++++++++
 .../apache/kafka/raft/RaftClientTestContext.java   |  4 +-
 .../kafka/server/common/MetadataVersion.java       |  9 +++-
 .../kafka/server/common/MetadataVersionTest.java   |  3 ++
 .../kafka/storage/internals/log/FetchParams.java   |  6 +++
 43 files changed, 294 insertions(+), 82 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java 
b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
index aaad3b89104..3a286225a65 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.message.FetchRequestData;
 import org.apache.kafka.common.message.FetchRequestData.ForgottenTopic;
+import org.apache.kafka.common.message.FetchRequestData.ReplicaState;
 import org.apache.kafka.common.message.FetchResponseData;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.ByteBufferAccessor;
@@ -130,10 +131,34 @@ public class FetchRequest extends AbstractRequest {
         }
     }
 
+    // Used only by KafkaRaftClient: for versions below 15, moves the replica id from ReplicaState into ReplicaId.
+    public static class SimpleBuilder extends 
AbstractRequest.Builder<FetchRequest> {
+        private final FetchRequestData fetchRequestData;
+
+        public SimpleBuilder(FetchRequestData fetchRequestData) {
+            super(ApiKeys.FETCH);
+            this.fetchRequestData = fetchRequestData;
+        }
+
+        @Override
+        public FetchRequest build(short version) {
+            if (fetchRequestData.replicaId() >= 0) {
+                throw new IllegalStateException("The replica id should be 
placed in the replicaState of a fetchRequestData");
+            }
+
+            if (version < 15) {
+                
fetchRequestData.setReplicaId(fetchRequestData.replicaState().replicaId());
+                fetchRequestData.setReplicaState(new ReplicaState());
+            }
+            return new FetchRequest(fetchRequestData, version);
+        }
+    }
+
     public static class Builder extends AbstractRequest.Builder<FetchRequest> {
         private final int maxWait;
         private final int minBytes;
         private final int replicaId;
+        private final long replicaEpoch;
         private final Map<TopicPartition, PartitionData> toFetch;
         private IsolationLevel isolationLevel = 
IsolationLevel.READ_UNCOMMITTED;
         private int maxBytes = DEFAULT_RESPONSE_MAX_BYTES;
@@ -144,18 +169,19 @@ public class FetchRequest extends AbstractRequest {
 
         public static Builder forConsumer(short maxVersion, int maxWait, int 
minBytes, Map<TopicPartition, PartitionData> fetchData) {
             return new Builder(ApiKeys.FETCH.oldestVersion(), maxVersion,
-                CONSUMER_REPLICA_ID, maxWait, minBytes, fetchData);
+                CONSUMER_REPLICA_ID,  -1, maxWait, minBytes, fetchData);
         }
 
-        public static Builder forReplica(short allowedVersion, int replicaId, 
int maxWait, int minBytes,
+        public static Builder forReplica(short allowedVersion, int replicaId, 
long replicaEpoch, int maxWait, int minBytes,
                                          Map<TopicPartition, PartitionData> 
fetchData) {
-            return new Builder(allowedVersion, allowedVersion, replicaId, 
maxWait, minBytes, fetchData);
+            return new Builder(allowedVersion, allowedVersion, replicaId, 
replicaEpoch, maxWait, minBytes, fetchData);
         }
 
-        public Builder(short minVersion, short maxVersion, int replicaId, int 
maxWait, int minBytes,
+        public Builder(short minVersion, short maxVersion, int replicaId, long 
replicaEpoch, int maxWait, int minBytes,
                        Map<TopicPartition, PartitionData> fetchData) {
             super(ApiKeys.FETCH, minVersion, maxVersion);
             this.replicaId = replicaId;
+            this.replicaEpoch = replicaEpoch;
             this.maxWait = maxWait;
             this.minBytes = minBytes;
             this.toFetch = fetchData;
@@ -228,12 +254,18 @@ public class FetchRequest extends AbstractRequest {
             }
 
             FetchRequestData fetchRequestData = new FetchRequestData();
-            fetchRequestData.setReplicaId(replicaId);
             fetchRequestData.setMaxWaitMs(maxWait);
             fetchRequestData.setMinBytes(minBytes);
             fetchRequestData.setMaxBytes(maxBytes);
             fetchRequestData.setIsolationLevel(isolationLevel.id());
             fetchRequestData.setForgottenTopicsData(new ArrayList<>());
+            if (version < 15) {
+                fetchRequestData.setReplicaId(replicaId);
+            } else {
+                fetchRequestData.setReplicaState(new ReplicaState()
+                    .setReplicaId(replicaId)
+                    .setReplicaEpoch(replicaEpoch));
+            }
 
             Map<String, FetchRequestData.ForgottenTopic> forgottenTopicMap = 
new LinkedHashMap<>();
             addToForgottenTopicMap(removed, forgottenTopicMap);
@@ -302,6 +334,10 @@ public class FetchRequest extends AbstractRequest {
         }
     }
 
+    public static int replicaId(FetchRequestData fetchRequestData) {
+        return fetchRequestData.replicaId() != -1 ? 
fetchRequestData.replicaId() : fetchRequestData.replicaState().replicaId();
+    }
+
     public FetchRequest(FetchRequestData fetchRequestData, short version) {
         super(ApiKeys.FETCH, version);
         this.data = fetchRequestData;
@@ -338,7 +374,14 @@ public class FetchRequest extends AbstractRequest {
     }
 
     public int replicaId() {
-        return data.replicaId();
+        if (version() < 15) {
+            return data.replicaId();
+        }
+        return data.replicaState().replicaId();
+    }
+
+    public long replicaEpoch() {
+        return data.replicaState().replicaEpoch();
     }
 
     public int maxWait() {
diff --git a/clients/src/main/resources/common/message/FetchRequest.json 
b/clients/src/main/resources/common/message/FetchRequest.json
index 903ed48e185..3ac9dccd0ab 100644
--- a/clients/src/main/resources/common/message/FetchRequest.json
+++ b/clients/src/main/resources/common/message/FetchRequest.json
@@ -50,14 +50,23 @@
   // Version 13 replaces topic names with topic IDs (KIP-516). May return 
UNKNOWN_TOPIC_ID error code.
   //
   // Version 14 is the same as version 13 but it also receives a new error 
called OffsetMovedToTieredStorageException(KIP-405)
-  "validVersions": "0-14",
+  //
+  // Version 15 adds the ReplicaState field, which contains the new 
ReplicaEpoch field and the ReplicaId. It also
+  // deprecates the old ReplicaId field and sets its default value to -1 
(KIP-903).
+  "validVersions": "0-15",
   "flexibleVersions": "12+",
   "fields": [
     { "name": "ClusterId", "type": "string", "versions": "12+", 
"nullableVersions": "12+", "default": "null",
       "taggedVersions": "12+", "tag": 0, "ignorable": true,
       "about": "The clusterId if known. This is used to validate metadata 
fetches prior to broker registration." },
-    { "name": "ReplicaId", "type": "int32", "versions": "0+", "entityType": 
"brokerId",
+    { "name": "ReplicaId", "type": "int32", "versions": "0-14", "default": 
"-1", "entityType": "brokerId",
       "about": "The broker ID of the follower, of -1 if this request is from a 
consumer." },
+    { "name": "ReplicaState", "type": "ReplicaState", "taggedVersions":"15+", 
"tag": 1, "fields": [
+      { "name": "ReplicaId", "type": "int32", "versions": "15+", "default": 
"-1", "entityType": "brokerId",
+        "about": "The replica ID of the follower, or -1 if this request is 
from a consumer." },
+      { "name": "ReplicaEpoch", "type": "int64", "versions": "15+", "default": 
"-1",
+        "about": "The epoch of this follower, or -1 if not available." }
+    ]},
     { "name": "MaxWaitMs", "type": "int32", "versions": "0+",
       "about": "The maximum time in milliseconds to wait for the response." },
     { "name": "MinBytes", "type": "int32", "versions": "0+",
diff --git a/clients/src/main/resources/common/message/FetchResponse.json 
b/clients/src/main/resources/common/message/FetchResponse.json
index 7d144f01831..366e702cfbd 100644
--- a/clients/src/main/resources/common/message/FetchResponse.json
+++ b/clients/src/main/resources/common/message/FetchResponse.json
@@ -43,7 +43,9 @@
   // Version 13 replaces the topic name field with topic ID (KIP-516).
   //
   // Version 14 is the same as version 13 but it also receives a new error 
called OffsetMovedToTieredStorageException (KIP-405)
-  "validVersions": "0-14",
+  //
+  // Version 15 is the same as version 14 (KIP-903).
+  "validVersions": "0-15",
   "flexibleVersions": "12+",
   "fields": [
     { "name": "ThrottleTimeMs", "type": "int32", "versions": "1+", 
"ignorable": true,
diff --git 
a/clients/src/test/java/org/apache/kafka/common/requests/FetchRequestTest.java 
b/clients/src/test/java/org/apache/kafka/common/requests/FetchRequestTest.java
index a567d43dc63..702341fee83 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/requests/FetchRequestTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/requests/FetchRequestTest.java
@@ -19,7 +19,9 @@ package org.apache.kafka.common.requests;
 import org.apache.kafka.common.TopicIdPartition;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.FetchRequestData;
 import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
@@ -35,6 +37,7 @@ import java.util.stream.Stream;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 
 public class FetchRequestTest {
 
@@ -54,7 +57,7 @@ public class FetchRequestTest {
         List<TopicIdPartition> toReplace = Collections.singletonList(tp);
 
         FetchRequest fetchRequest = FetchRequest.Builder
-                .forReplica(version, 0, 1, 1, partitionData)
+                .forReplica(version, 0, 1, 1, 1, partitionData)
                 .removed(Collections.emptyList())
                 .replaced(toReplace)
                 .metadata(FetchMetadata.newIncremental(123)).build(version);
@@ -97,11 +100,14 @@ public class FetchRequestTest {
         boolean fetchRequestUsesTopicIds = version >= 13;
 
         FetchRequest fetchRequest = FetchRequest.parse(FetchRequest.Builder
-                .forReplica(version, 0, 1, 1, partitionData)
+                .forReplica(version, 0, 1, 1, 1, partitionData)
                 .removed(Collections.emptyList())
                 .replaced(Collections.emptyList())
                 
.metadata(FetchMetadata.newIncremental(123)).build(version).serialize(), 
version);
 
+        if (version >= 15) {
+            assertEquals(1, fetchRequest.data().replicaState().replicaEpoch());
+        }
         // For versions < 13, we will be provided a topic name and a zero UUID 
in FetchRequestData.
         // Versions 13+ will contain a valid topic ID but an empty topic name.
         List<TopicIdPartition> expectedData = new LinkedList<>();
@@ -156,7 +162,7 @@ public class FetchRequestTest {
             boolean fetchRequestUsesTopicIds = version >= 13;
 
             FetchRequest fetchRequest = FetchRequest.parse(FetchRequest.Builder
-                    .forReplica(version, 0, 1, 1, Collections.emptyMap())
+                    .forReplica(version, 0, 1, 1, 1, Collections.emptyMap())
                     .removed(toForgetTopics)
                     .replaced(Collections.emptyList())
                     
.metadata(FetchMetadata.newIncremental(123)).build(version).serialize(), 
version);
@@ -198,6 +204,35 @@ public class FetchRequestTest {
         }
     }
 
+    @ParameterizedTest
+    @ApiKeyVersionsSource(apiKey = ApiKeys.FETCH)
+    public void testFetchRequestSimpleBuilderReplicaStateDowngrade(short 
version) {
+        FetchRequestData fetchRequestData = new FetchRequestData();
+        fetchRequestData.setReplicaState(new 
FetchRequestData.ReplicaState().setReplicaId(1));
+        FetchRequest.SimpleBuilder builder = new 
FetchRequest.SimpleBuilder(fetchRequestData);
+        fetchRequestData = builder.build(version).data();
+
+        assertEquals(1, FetchRequest.replicaId(fetchRequestData));
+
+        if (version < 15) {
+            assertEquals(1, fetchRequestData.replicaId());
+            assertEquals(-1, fetchRequestData.replicaState().replicaId());
+        } else {
+            assertEquals(-1, fetchRequestData.replicaId());
+            assertEquals(1, fetchRequestData.replicaState().replicaId());
+        }
+    }
+
+    @ParameterizedTest
+    @ApiKeyVersionsSource(apiKey = ApiKeys.FETCH)
+    public void testFetchRequestSimpleBuilderReplicaIdNotSupported(short 
version) {
+        FetchRequestData fetchRequestData = new 
FetchRequestData().setReplicaId(1);
+        FetchRequest.SimpleBuilder builder = new 
FetchRequest.SimpleBuilder(fetchRequestData);
+        assertThrows(IllegalStateException.class, () -> {
+            builder.build(version);
+        });
+    }
+
     @Test
     public void testPartitionDataEquals() {
         assertEquals(new FetchRequest.PartitionData(Uuid.ZERO_UUID, 300, 0L, 
300, Optional.of(300)),
diff --git 
a/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java 
b/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java
index 2921818e59d..d9d7c1d82c4 100644
--- a/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java
+++ b/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java
@@ -61,6 +61,7 @@ public class ReplicaManagerBuilder {
     private Optional<DelayedOperationPurgatory<DelayedDeleteRecords>> 
delayedDeleteRecordsPurgatory = Optional.empty();
     private Optional<DelayedOperationPurgatory<DelayedElectLeader>> 
delayedElectLeaderPurgatory = Optional.empty();
     private Optional<String> threadNamePrefix = Optional.empty();
+    private Long brokerEpoch = -1L;
 
     public ReplicaManagerBuilder setConfig(KafkaConfig config) {
         this.config = config;
@@ -152,6 +153,11 @@ public class ReplicaManagerBuilder {
         return this;
     }
 
+    public ReplicaManagerBuilder setBrokerEpoch(long brokerEpoch) {
+        this.brokerEpoch = brokerEpoch;
+        return this;
+    }
+
     public ReplicaManager build() {
         if (config == null) config = new KafkaConfig(Collections.emptyMap());
         if (metrics == null) metrics = new Metrics();
@@ -176,6 +182,7 @@ public class ReplicaManagerBuilder {
                              OptionConverters.toScala(delayedFetchPurgatory),
                              
OptionConverters.toScala(delayedDeleteRecordsPurgatory),
                              
OptionConverters.toScala(delayedElectLeaderPurgatory),
-                             OptionConverters.toScala(threadNamePrefix));
+                             OptionConverters.toScala(threadNamePrefix),
+                             () -> brokerEpoch);
     }
 }
diff --git a/core/src/main/scala/kafka/raft/KafkaNetworkChannel.scala 
b/core/src/main/scala/kafka/raft/KafkaNetworkChannel.scala
index c44d57102c5..0d86e257932 100644
--- a/core/src/main/scala/kafka/raft/KafkaNetworkChannel.scala
+++ b/core/src/main/scala/kafka/raft/KafkaNetworkChannel.scala
@@ -42,11 +42,7 @@ object KafkaNetworkChannel {
       case endEpochRequest: EndQuorumEpochRequestData =>
         new EndQuorumEpochRequest.Builder(endEpochRequest)
       case fetchRequest: FetchRequestData =>
-        // Since we already have the request, we go through a simplified 
builder
-        new AbstractRequest.Builder[FetchRequest](ApiKeys.FETCH) {
-          override def build(version: Short): FetchRequest = new 
FetchRequest(fetchRequest, version)
-          override def toString: String = fetchRequest.toString
-        }
+        new FetchRequest.SimpleBuilder(fetchRequest)
       case fetchSnapshotRequest: FetchSnapshotRequestData =>
         new FetchSnapshotRequest.Builder(fetchSnapshotRequest)
       case _ =>
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala 
b/core/src/main/scala/kafka/server/BrokerServer.scala
index b73eaa12fee..02ff680614c 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -261,7 +261,8 @@ class BrokerServer(
         brokerTopicStats = brokerTopicStats,
         isShuttingDown = isShuttingDown,
         zkClient = None,
-        threadNamePrefix = threadNamePrefix)
+        threadNamePrefix = threadNamePrefix,
+        brokerEpochSupplier = () => lifecycleManager.brokerEpoch)
 
       /* start token manager */
       if (config.tokenAuthEnabled) {
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala 
b/core/src/main/scala/kafka/server/KafkaApis.scala
index bde98b484f0..bc55a7f9ccf 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -978,6 +978,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       val params = new FetchParams(
         versionId,
         fetchRequest.replicaId,
+        fetchRequest.replicaEpoch,
         fetchRequest.maxWait,
         fetchMinBytes,
         fetchMaxBytes,
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala 
b/core/src/main/scala/kafka/server/KafkaServer.scala
index fe560b9910f..9fed5955a7d 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -623,7 +623,8 @@ class KafkaServer(
       brokerTopicStats = brokerTopicStats,
       isShuttingDown = isShuttingDown,
       zkClient = Some(zkClient),
-      threadNamePrefix = threadNamePrefix)
+      threadNamePrefix = threadNamePrefix,
+      brokerEpochSupplier = brokerEpochSupplier)
   }
 
   private def initZkClient(time: Time): Unit = {
diff --git a/core/src/main/scala/kafka/server/LocalLeaderEndPoint.scala 
b/core/src/main/scala/kafka/server/LocalLeaderEndPoint.scala
index 587b6a449ed..cb42a612d19 100644
--- a/core/src/main/scala/kafka/server/LocalLeaderEndPoint.scala
+++ b/core/src/main/scala/kafka/server/LocalLeaderEndPoint.scala
@@ -94,6 +94,7 @@ class LocalLeaderEndPoint(sourceBroker: BrokerEndPoint,
     val fetchParams = new FetchParams(
       request.version,
       FetchRequest.FUTURE_LOCAL_REPLICA_ID,
+      -1,
       0L, // timeout is 0 so that the callback will be executed immediately
       request.minBytes,
       request.maxBytes,
@@ -226,7 +227,7 @@ class LocalLeaderEndPoint(sourceBroker: BrokerEndPoint,
         ApiKeys.FETCH.latestVersion
       // Set maxWait and minBytes to 0 because the response should return 
immediately if
       // the future log has caught up with the current log of the partition
-      val requestBuilder = FetchRequest.Builder.forReplica(version, replicaId, 
0, 0, requestMap).setMaxBytes(maxBytes)
+      val requestBuilder = FetchRequest.Builder.forReplica(version, replicaId, 
-1, 0, 0, requestMap).setMaxBytes(maxBytes)
       Some(ReplicaFetch(requestMap, requestBuilder))
     }
 
diff --git a/core/src/main/scala/kafka/server/RemoteLeaderEndPoint.scala 
b/core/src/main/scala/kafka/server/RemoteLeaderEndPoint.scala
index ed8e775f330..0f363fba8fa 100644
--- a/core/src/main/scala/kafka/server/RemoteLeaderEndPoint.scala
+++ b/core/src/main/scala/kafka/server/RemoteLeaderEndPoint.scala
@@ -56,7 +56,8 @@ class RemoteLeaderEndPoint(logPrefix: String,
                            brokerConfig: KafkaConfig,
                            replicaManager: ReplicaManager,
                            quota: ReplicaQuota,
-                           metadataVersionSupplier: () => MetadataVersion) 
extends LeaderEndPoint with Logging {
+                           metadataVersionSupplier: () => MetadataVersion,
+                           brokerEpochSupplier: () => Long) extends 
LeaderEndPoint with Logging {
 
   this.logIdent = logPrefix
 
@@ -217,7 +218,7 @@ class RemoteLeaderEndPoint(logPrefix: String,
         metadataVersion.fetchRequestVersion
       }
       val requestBuilder = FetchRequest.Builder
-        .forReplica(version, brokerConfig.brokerId, maxWait, minBytes, 
fetchData.toSend)
+        .forReplica(version, brokerConfig.brokerId, brokerEpochSupplier(), 
maxWait, minBytes, fetchData.toSend)
         .setMaxBytes(maxBytes)
         .removed(fetchData.toForget)
         .replaced(fetchData.toReplace)
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala 
b/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala
index 33af5836cd1..9abd2895c4e 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala
@@ -29,7 +29,8 @@ class ReplicaFetcherManager(brokerConfig: KafkaConfig,
                             time: Time,
                             threadNamePrefix: Option[String] = None,
                             quotaManager: ReplicationQuotaManager,
-                            metadataVersionSupplier: () => MetadataVersion)
+                            metadataVersionSupplier: () => MetadataVersion,
+                            brokerEpochSupplier: () => Long)
       extends AbstractFetcherManager[ReplicaFetcherThread](
         name = "ReplicaFetcherManager on broker " + brokerConfig.brokerId,
         clientId = "Replica",
@@ -44,7 +45,7 @@ class ReplicaFetcherManager(brokerConfig: KafkaConfig,
       s"broker-${brokerConfig.brokerId}-fetcher-$fetcherId", logContext)
     val fetchSessionHandler = new FetchSessionHandler(logContext, 
sourceBroker.id)
     val leader = new RemoteLeaderEndPoint(logContext.logPrefix, endpoint, 
fetchSessionHandler, brokerConfig,
-      replicaManager, quotaManager, metadataVersionSupplier)
+      replicaManager, quotaManager, metadataVersionSupplier, 
brokerEpochSupplier)
     new ReplicaFetcherThread(threadName, leader, brokerConfig, 
failedPartitions, replicaManager,
       quotaManager, logContext.logPrefix, metadataVersionSupplier)
   }
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala 
b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 1fd21104208..c3530fd9a55 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -194,6 +194,7 @@ class ReplicaManager(val config: KafkaConfig,
                      delayedDeleteRecordsPurgatoryParam: 
Option[DelayedOperationPurgatory[DelayedDeleteRecords]] = None,
                      delayedElectLeaderPurgatoryParam: 
Option[DelayedOperationPurgatory[DelayedElectLeader]] = None,
                      threadNamePrefix: Option[String] = None,
+                     brokerEpochSupplier: () => Long = () => -1
                      ) extends Logging {
   private val metricsGroup = new KafkaMetricsGroup(this.getClass)
 
@@ -1965,7 +1966,7 @@ class ReplicaManager(val config: KafkaConfig,
   }
 
   protected def createReplicaFetcherManager(metrics: Metrics, time: Time, 
threadNamePrefix: Option[String], quotaManager: ReplicationQuotaManager) = {
-    new ReplicaFetcherManager(config, this, metrics, time, threadNamePrefix, 
quotaManager, () => metadataCache.metadataVersion())
+    new ReplicaFetcherManager(config, this, metrics, time, threadNamePrefix, 
quotaManager, () => metadataCache.metadataVersion(), brokerEpochSupplier)
   }
 
   protected def createReplicaAlterLogDirsManager(quotaManager: 
ReplicationQuotaManager, brokerTopicStats: BrokerTopicStats) = {
diff --git a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala 
b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
index 19c351ad8b6..649fed0ae26 100644
--- a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
+++ b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
@@ -413,7 +413,7 @@ private class ReplicaFetcher(name: String, sourceBroker: 
Node, topicPartitions:
         0L, fetchSize, Optional.empty()))
 
     val fetchRequestBuilder = FetchRequest.Builder.
-      forReplica(ApiKeys.FETCH.latestVersion, 
FetchRequest.DEBUGGING_CONSUMER_ID, maxWait, minBytes, requestMap)
+      forReplica(ApiKeys.FETCH.latestVersion, 
FetchRequest.DEBUGGING_CONSUMER_ID, -1, maxWait, minBytes, requestMap)
 
     debug("Issuing fetch request ")
 
diff --git a/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java 
b/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java
index cb260383294..98116dda9fc 100644
--- a/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java
+++ b/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java
@@ -117,6 +117,6 @@ public class ClusterTestExtensionsTest {
 
     @ClusterTest
     public void testDefaults(ClusterConfig config) {
-        Assertions.assertEquals(MetadataVersion.IBP_3_5_IV0, 
config.metadataVersion());
+        Assertions.assertEquals(MetadataVersion.IBP_3_5_IV1, 
config.metadataVersion());
     }
 }
diff --git a/core/src/test/java/kafka/test/annotation/ClusterTest.java 
b/core/src/test/java/kafka/test/annotation/ClusterTest.java
index 2b8fa96e56e..813b6d65758 100644
--- a/core/src/test/java/kafka/test/annotation/ClusterTest.java
+++ b/core/src/test/java/kafka/test/annotation/ClusterTest.java
@@ -41,6 +41,6 @@ public @interface ClusterTest {
     String name() default "";
     SecurityProtocol securityProtocol() default SecurityProtocol.PLAINTEXT;
     String listener() default "";
-    MetadataVersion metadataVersion() default MetadataVersion.IBP_3_5_IV0;
+    MetadataVersion metadataVersion() default MetadataVersion.IBP_3_5_IV1;
     ClusterConfigProperty[] serverProperties() default {};
 }
diff --git 
a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala 
b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index e8f2ea88c49..681067834f0 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -385,7 +385,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     partitionMap.put(tp, new 
requests.FetchRequest.PartitionData(getTopicIds().getOrElse(tp.topic, 
Uuid.ZERO_UUID),
       0, 0, 100, Optional.of(27)))
     val version = ApiKeys.FETCH.latestVersion
-    requests.FetchRequest.Builder.forReplica(version, 5000, 100, Int.MaxValue, 
partitionMap).build()
+    requests.FetchRequest.Builder.forReplica(version, 5000, -1, 100, 
Int.MaxValue, partitionMap).build()
   }
 
   private def createListOffsetsRequest = {
diff --git 
a/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala 
b/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala
index 074ae92a056..1710d713d0f 100644
--- a/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala
@@ -173,6 +173,7 @@ class DelayedFetchTest {
     new FetchParams(
       ApiKeys.FETCH.latestVersion,
       replicaId,
+      1,
       maxWaitMs,
       1,
       maxBytes,
diff --git 
a/core/src/test/scala/integration/kafka/server/MetadataVersionIntegrationTest.scala
 
b/core/src/test/scala/integration/kafka/server/MetadataVersionIntegrationTest.scala
index 07be85b1e6f..54bc462e973 100644
--- 
a/core/src/test/scala/integration/kafka/server/MetadataVersionIntegrationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/server/MetadataVersionIntegrationTest.scala
@@ -47,7 +47,7 @@ class MetadataVersionIntegrationTest {
     assertEquals(ff.maxVersionLevel(), 
clusterInstance.config().metadataVersion().featureLevel())
 
     // Update to new version
-    val updateVersion = MetadataVersion.IBP_3_5_IV0.featureLevel.shortValue
+    val updateVersion = MetadataVersion.IBP_3_5_IV1.featureLevel.shortValue
     val updateResult = admin.updateFeatures(
       Map("metadata.version" -> new FeatureUpdate(updateVersion, 
UpgradeType.UPGRADE)).asJava, new UpdateFeaturesOptions())
     updateResult.all().get()
diff --git a/core/src/test/scala/kafka/server/RemoteLeaderEndPointTest.scala 
b/core/src/test/scala/kafka/server/RemoteLeaderEndPointTest.scala
index 5232f60b53d..cac71c2adea 100644
--- a/core/src/test/scala/kafka/server/RemoteLeaderEndPointTest.scala
+++ b/core/src/test/scala/kafka/server/RemoteLeaderEndPointTest.scala
@@ -18,22 +18,27 @@
 package kafka.server
 
 import kafka.cluster.BrokerEndPoint
+import kafka.log.UnifiedLog
+import kafka.server.AbstractFetcherThread.ResultWithPartitions
 import kafka.server.epoch.util.MockBlockingSender
 import kafka.utils.{MockTime, TestUtils}
 import org.apache.kafka.clients.FetchSessionHandler
 import org.apache.kafka.common.errors.{FencedLeaderEpochException, 
UnknownLeaderEpochException}
 import 
org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset
+import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource
 import org.apache.kafka.common.utils.LogContext
-import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.{TopicPartition, Uuid}
 import 
org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsPartitionResponse
 import 
org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderPartition
-import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.server.common.{OffsetAndEpoch, MetadataVersion}
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
+import org.apache.kafka.server.common.{MetadataVersion, OffsetAndEpoch}
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{BeforeEach, Test}
-import org.mockito.Mockito.mock
+import org.junit.jupiter.params.ParameterizedTest
+import org.mockito.Mockito.{mock, when}
 
 import java.util
+import scala.collection.Map
 import scala.jdk.CollectionConverters._
 
 class RemoteLeaderEndPointTest {
@@ -43,8 +48,10 @@ class RemoteLeaderEndPointTest {
     val logStartOffset = 20
     val localLogStartOffset = 100
     val logEndOffset = 300
+    val replicaManager: ReplicaManager = mock(classOf[ReplicaManager])
     var blockingSend: MockBlockingSender = _
     var endPoint: LeaderEndPoint = _
+    var currentBrokerEpoch = 1L
 
     @BeforeEach
     def setUp(): Unit = {
@@ -54,11 +61,10 @@ class RemoteLeaderEndPointTest {
         val props = TestUtils.createBrokerConfig(sourceBroker.id, 
TestUtils.MockZkConnect, port = sourceBroker.port)
         val fetchSessionHandler = new FetchSessionHandler(new 
LogContext(logPrefix), sourceBroker.id)
         val config = KafkaConfig.fromProps(props)
-        val replicaManager: ReplicaManager = mock(classOf[ReplicaManager])
         blockingSend = new MockBlockingSender(offsets = new 
util.HashMap[TopicPartition, EpochEndOffset](),
             sourceBroker = sourceBroker, time = time)
         endPoint = new RemoteLeaderEndPoint(logPrefix, blockingSend, 
fetchSessionHandler,
-            config, replicaManager, QuotaFactory.UnboundedQuota, () => 
MetadataVersion.MINIMUM_KRAFT_VERSION)
+            config, replicaManager, QuotaFactory.UnboundedQuota, () => 
MetadataVersion.MINIMUM_KRAFT_VERSION, () => currentBrokerEpoch)
     }
 
     @Test
@@ -116,4 +122,25 @@ class RemoteLeaderEndPointTest {
         assertThrows(classOf[UnknownLeaderEpochException], () => 
endPoint.fetchEarliestOffset(topicPartition, currentLeaderEpoch + 1))
         assertThrows(classOf[UnknownLeaderEpochException], () => 
endPoint.fetchLatestOffset(topicPartition, currentLeaderEpoch + 1))
     }
+
+    @ParameterizedTest
+    @ApiKeyVersionsSource(apiKey = ApiKeys.FETCH)
+    def testBrokerEpochSupplier(version: Short): Unit = {
+        val tp = new TopicPartition("topic1", 0)
+        val topicId1 = Uuid.randomUuid()
+        val log = mock(classOf[UnifiedLog])
+        val partitionMap = Map(
+            tp -> PartitionFetchState(Some(topicId1), 150, None, 0, None, 
state = Fetching, lastFetchedEpoch = None))
+        when(replicaManager.localLogOrException(tp)).thenReturn(log)
+        when(log.logStartOffset).thenReturn(1)
+
+        val ResultWithPartitions(fetchRequestOpt, partitionsWithError) = 
endPoint.buildFetch(partitionMap)
+        assertTrue(partitionsWithError.isEmpty)
+        assertEquals(if (version < 15) -1L else 1L, 
fetchRequestOpt.get.fetchRequest.build(version).replicaEpoch)
+
+        currentBrokerEpoch = 2L
+        val ResultWithPartitions(newFetchRequestOpt, newPartitionsWithError) = 
endPoint.buildFetch(partitionMap)
+        assertTrue(newPartitionsWithError.isEmpty)
+        assertEquals(if (version < 15) -1L else 2L, 
newFetchRequestOpt.get.fetchRequest.build(version).replicaEpoch)
+    }
 }
\ No newline at end of file
diff --git a/core/src/test/scala/unit/kafka/admin/FeatureCommandTest.scala 
b/core/src/test/scala/unit/kafka/admin/FeatureCommandTest.scala
index 81508e64860..48337d75720 100644
--- a/core/src/test/scala/unit/kafka/admin/FeatureCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/FeatureCommandTest.scala
@@ -26,7 +26,7 @@ import 
org.apache.kafka.clients.admin.FeatureUpdate.UpgradeType.{SAFE_DOWNGRADE,
 import org.apache.kafka.clients.admin.MockAdminClient
 import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.server.common.MetadataVersion
-import org.apache.kafka.server.common.MetadataVersion.{IBP_3_3_IV0, 
IBP_3_3_IV1, IBP_3_3_IV2, IBP_3_3_IV3, IBP_3_5_IV0}
+import org.apache.kafka.server.common.MetadataVersion.{IBP_3_3_IV0, 
IBP_3_3_IV1, IBP_3_3_IV2, IBP_3_3_IV3, IBP_3_5_IV1}
 import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows}
 import org.junit.jupiter.api.Test
 import org.junit.jupiter.params.ParameterizedTest
@@ -84,7 +84,7 @@ class FeatureCommandTest extends IntegrationTestHarness {
         Array("--bootstrap-server", bootstrapServers(), "describe"), env.out))
       assertEquals(String.format(
         "Feature: metadata.version\tSupportedMinVersion: 3.0-IV1\t" +
-          "SupportedMaxVersion: 3.5-IV0\tFinalizedVersionLevel: 3.3-IV1\t"),
+          "SupportedMaxVersion: 3.5-IV1\tFinalizedVersionLevel: 3.3-IV1\t"),
             env.outputWithoutEpoch())
     }
   }
@@ -145,7 +145,7 @@ class FeatureCommandTest extends IntegrationTestHarness {
       assertEquals(1, FeatureCommand.mainNoExit(Array("--bootstrap-server", 
bootstrapServers(),
         "disable", "--feature", "metadata.version"), env.out))
       assertEquals("Could not disable metadata.version. Invalid update version 
0 for feature " +
-        "metadata.version. Local controller 1000 only supports versions 1-9", 
env.outputWithoutEpoch())
+        "metadata.version. Local controller 1000 only supports versions 1-10", 
env.outputWithoutEpoch())
     }
     TestUtils.resource(FeatureCommandTestEnv()) { env =>
       assertEquals(1, FeatureCommand.mainNoExit(Array("--bootstrap-server", 
bootstrapServers(),
@@ -173,8 +173,8 @@ class FeatureCommandUnitTest {
 
   @Test
   def testMetadataVersionsToString(): Unit = {
-    assertEquals("3.3-IV0, 3.3-IV1, 3.3-IV2, 3.3-IV3, 3.4-IV0, 3.5-IV0",
-      FeatureCommand.metadataVersionsToString(IBP_3_3_IV0, IBP_3_5_IV0))
+    assertEquals("3.3-IV0, 3.3-IV1, 3.3-IV2, 3.3-IV3, 3.4-IV0, 3.5-IV0, 
3.5-IV1",
+      FeatureCommand.metadataVersionsToString(IBP_3_3_IV0, IBP_3_5_IV1))
   }
 
   @Test
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala 
b/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
index 0d8cc47ce1f..3de0f4f8751 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
@@ -390,6 +390,7 @@ class PartitionLockTest extends Logging {
       val fetchParams = new FetchParams(
         ApiKeys.FETCH.latestVersion,
         followerId,
+        1,
         0L,
         1,
         maxBytes,
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala 
b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
index 31396f46051..b01ee76210a 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -108,6 +108,7 @@ object PartitionTest {
 
   def followerFetchParams(
     replicaId: Int,
+    replicaEpoch: Long = 1L,
     maxWaitMs: Long = 0L,
     minBytes: Int = 1,
     maxBytes: Int = Int.MaxValue
@@ -115,6 +116,7 @@ object PartitionTest {
     new FetchParams(
       ApiKeys.FETCH.latestVersion,
       replicaId,
+      replicaEpoch,
       maxWaitMs,
       minBytes,
       maxBytes,
@@ -133,6 +135,7 @@ object PartitionTest {
     new FetchParams(
       ApiKeys.FETCH.latestVersion,
       FetchRequest.CONSUMER_REPLICA_ID,
+      -1,
       maxWaitMs,
       minBytes,
       maxBytes,
diff --git a/core/src/test/scala/unit/kafka/raft/KafkaNetworkChannelTest.scala 
b/core/src/test/scala/unit/kafka/raft/KafkaNetworkChannelTest.scala
index eaa21dd8173..af230f66553 100644
--- a/core/src/test/scala/unit/kafka/raft/KafkaNetworkChannelTest.scala
+++ b/core/src/test/scala/unit/kafka/raft/KafkaNetworkChannelTest.scala
@@ -21,15 +21,18 @@ import java.util
 import java.util.Collections
 import org.apache.kafka.clients.MockClient.MockMetadataUpdater
 import org.apache.kafka.clients.{MockClient, NodeApiVersions}
+import org.apache.kafka.common.message.FetchRequestData.ReplicaState
 import org.apache.kafka.common.message.{BeginQuorumEpochResponseData, 
EndQuorumEpochResponseData, FetchResponseData, VoteResponseData}
 import org.apache.kafka.common.protocol.{ApiKeys, ApiMessage, Errors}
-import org.apache.kafka.common.requests.{AbstractResponse, 
ApiVersionsResponse, BeginQuorumEpochRequest, BeginQuorumEpochResponse, 
EndQuorumEpochRequest, EndQuorumEpochResponse, FetchResponse, VoteRequest, 
VoteResponse}
+import org.apache.kafka.common.requests.{AbstractResponse, 
ApiVersionsResponse, BeginQuorumEpochRequest, BeginQuorumEpochResponse, 
EndQuorumEpochRequest, EndQuorumEpochResponse, FetchRequest, FetchResponse, 
VoteRequest, VoteResponse}
+import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource
 import org.apache.kafka.common.utils.{MockTime, Time}
 import org.apache.kafka.common.{Node, TopicPartition, Uuid}
 import org.apache.kafka.raft.RaftConfig.InetAddressSpec
 import org.apache.kafka.raft.{RaftRequest, RaftUtil}
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{BeforeEach, Test}
+import org.junit.jupiter.params.ParameterizedTest
 
 import scala.jdk.CollectionConverters._
 
@@ -159,6 +162,28 @@ class KafkaNetworkChannelTest {
     }
   }
 
+  @ParameterizedTest
+  @ApiKeyVersionsSource(apiKey = ApiKeys.FETCH)
+  def testFetchRequestDowngrade(version: Short): Unit = {
+    val destinationId = 2
+    val destinationNode = new Node(destinationId, "127.0.0.1", 9092)
+    channel.updateEndpoint(destinationId, new InetAddressSpec(
+      new InetSocketAddress(destinationNode.host, destinationNode.port)))
+    sendTestRequest(ApiKeys.FETCH, destinationId)
+    channel.pollOnce()
+
+    assertEquals(1, client.requests.size)
+    val request = client.requests.peek.requestBuilder.build(version)
+
+    if (version < 15) {
+      assertTrue(request.asInstanceOf[FetchRequest].data.replicaId == 1)
+      
assertTrue(request.asInstanceOf[FetchRequest].data.replicaState.replicaId == -1)
+    } else {
+      assertTrue(request.asInstanceOf[FetchRequest].data.replicaId == -1)
+      
assertTrue(request.asInstanceOf[FetchRequest].data.replicaState.replicaId == 1)
+    }
+  }
+
   private def sendTestRequest(
     apiKey: ApiKeys,
     destinationId: Int,
@@ -216,7 +241,7 @@ class KafkaNetworkChannelTest {
             .setFetchOffset(333)
             .setLastFetchedEpoch(5)
         })
-        request.setReplicaId(1)
+        request.setReplicaState(new ReplicaState().setReplicaId(1))
 
       case _ =>
         throw new AssertionError(s"Unexpected api $key")
diff --git 
a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala 
b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
index e959251b6ea..2af7f105244 100644
--- a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
@@ -1371,7 +1371,7 @@ class AbstractFetcherThreadTest {
               1024 * 1024, Optional.of[Integer](state.currentLeaderEpoch), 
lastFetchedEpoch))
         }
       }
-      val fetchRequest = FetchRequest.Builder.forReplica(version, replicaId, 
0, 1, fetchData.asJava)
+      val fetchRequest = FetchRequest.Builder.forReplica(version, replicaId, 
1, 0, 1, fetchData.asJava)
       val fetchRequestOpt =
         if (fetchData.isEmpty)
           None
diff --git 
a/core/src/test/scala/unit/kafka/server/FetchRequestDownConversionConfigTest.scala
 
b/core/src/test/scala/unit/kafka/server/FetchRequestDownConversionConfigTest.scala
index d01228c1d13..b713e576109 100644
--- 
a/core/src/test/scala/unit/kafka/server/FetchRequestDownConversionConfigTest.scala
+++ 
b/core/src/test/scala/unit/kafka/server/FetchRequestDownConversionConfigTest.scala
@@ -257,7 +257,7 @@ class FetchRequestDownConversionConfigTest extends 
BaseRequestTest {
     val partitionMap = createPartitionMap(1024, partitions, topicIdMap)
 
     val fetchRequest = replicaIdOpt.map { replicaId =>
-      FetchRequest.Builder.forReplica(fetchVersion, replicaId, Int.MaxValue, 
0, partitionMap)
+      FetchRequest.Builder.forReplica(fetchVersion, replicaId, -1, 
Int.MaxValue, 0, partitionMap)
         .build(fetchVersion)
     }.getOrElse {
       FetchRequest.Builder.forConsumer(fetchVersion, Int.MaxValue, 0, 
partitionMap)
diff --git a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
index 9104d2840de..0034e429b92 100644
--- a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
@@ -627,7 +627,7 @@ class FetchRequestTest extends BaseFetchRequestTest {
     producer.close()
 
     // fetch request with version below v10: UNSUPPORTED_COMPRESSION_TYPE 
error occurs
-    val req0 = new FetchRequest.Builder(0, 9, -1, Int.MaxValue, 0,
+    val req0 = new FetchRequest.Builder(0, 9, -1, -1, Int.MaxValue, 0,
       createPartitionMap(300, Seq(topicPartition), Map.empty))
       .setMaxBytes(800).build()
 
@@ -636,7 +636,7 @@ class FetchRequestTest extends BaseFetchRequestTest {
     assertEquals(Errors.UNSUPPORTED_COMPRESSION_TYPE.code, data0.errorCode)
 
     // fetch request with version 10: works fine!
-    val req1= new FetchRequest.Builder(0, 10, -1, Int.MaxValue, 0,
+    val req1= new FetchRequest.Builder(0, 10, -1, -1, Int.MaxValue, 0,
       createPartitionMap(300, Seq(topicPartition), Map.empty))
       .setMaxBytes(800).build()
     val res1 = sendFetchRequest(leaderId, req1)
@@ -644,7 +644,7 @@ class FetchRequestTest extends BaseFetchRequestTest {
     assertEquals(Errors.NONE.code, data1.errorCode)
     assertEquals(3, records(data1).size)
 
-    val req2 = new FetchRequest.Builder(ApiKeys.FETCH.latestVersion(), 
ApiKeys.FETCH.latestVersion(), -1, Int.MaxValue, 0,
+    val req2 = new FetchRequest.Builder(ApiKeys.FETCH.latestVersion(), 
ApiKeys.FETCH.latestVersion(), -1, -1, Int.MaxValue, 0,
       createPartitionMap(300, Seq(topicPartition), Map.empty))
       .setMaxBytes(800).build()
     val res2 = sendFetchRequest(leaderId, req2)
@@ -683,7 +683,7 @@ class FetchRequestTest extends BaseFetchRequestTest {
     // fetch request with fetch version v1 (magic 0):
     // gzip compressed record is returned with down-conversion.
     // zstd compressed record raises UNSUPPORTED_COMPRESSION_TYPE error.
-    val req0 = new FetchRequest.Builder(0, 1, -1, Int.MaxValue, 0,
+    val req0 = new FetchRequest.Builder(0, 1, -1, -1, Int.MaxValue, 0,
       createPartitionMap(300, Seq(topicPartition), Map.empty))
       .setMaxBytes(800)
       .build()
@@ -693,7 +693,7 @@ class FetchRequestTest extends BaseFetchRequestTest {
     assertEquals(Errors.NONE.code, data0.errorCode)
     assertEquals(1, records(data0).size)
 
-    val req1 = new FetchRequest.Builder(0, 1, -1, Int.MaxValue, 0,
+    val req1 = new FetchRequest.Builder(0, 1, -1, -1, Int.MaxValue, 0,
       createPartitionMap(300, Seq(topicPartition), Map(topicPartition -> 1L)))
       .setMaxBytes(800).build()
 
@@ -704,7 +704,7 @@ class FetchRequestTest extends BaseFetchRequestTest {
     // fetch request with fetch version v3 (magic 1):
     // gzip compressed record is returned with down-conversion.
     // zstd compressed record raises UNSUPPORTED_COMPRESSION_TYPE error.
-    val req2 = new FetchRequest.Builder(2, 3, -1, Int.MaxValue, 0,
+    val req2 = new FetchRequest.Builder(2, 3, -1, -1, Int.MaxValue, 0,
       createPartitionMap(300, Seq(topicPartition), Map.empty))
       .setMaxBytes(800).build()
 
@@ -713,7 +713,7 @@ class FetchRequestTest extends BaseFetchRequestTest {
     assertEquals(Errors.NONE.code, data2.errorCode)
     assertEquals(1, records(data2).size)
 
-    val req3 = new FetchRequest.Builder(0, 1, -1, Int.MaxValue, 0,
+    val req3 = new FetchRequest.Builder(0, 1, -1, -1, Int.MaxValue, 0,
       createPartitionMap(300, Seq(topicPartition), Map(topicPartition -> 1L)))
       .setMaxBytes(800).build()
 
@@ -722,7 +722,7 @@ class FetchRequestTest extends BaseFetchRequestTest {
     assertEquals(Errors.UNSUPPORTED_COMPRESSION_TYPE.code, data3.errorCode)
 
     // fetch request with version 10: works fine!
-    val req4 = new FetchRequest.Builder(0, 10, -1, Int.MaxValue, 0,
+    val req4 = new FetchRequest.Builder(0, 10, -1, -1, Int.MaxValue, 0,
       createPartitionMap(300, Seq(topicPartition), Map.empty))
       .setMaxBytes(800).build()
     val res4 = sendFetchRequest(leaderId, req4)
@@ -730,7 +730,7 @@ class FetchRequestTest extends BaseFetchRequestTest {
     assertEquals(Errors.NONE.code, data4.errorCode)
     assertEquals(3, records(data4).size)
 
-    val req5 = new FetchRequest.Builder(0, ApiKeys.FETCH.latestVersion(), -1, 
Int.MaxValue, 0,
+    val req5 = new FetchRequest.Builder(0, ApiKeys.FETCH.latestVersion(), -1, 
-1, Int.MaxValue, 0,
       createPartitionMap(300, Seq(topicPartition), Map.empty))
       .setMaxBytes(800).build()
     val res5 = sendFetchRequest(leaderId, req5)
diff --git a/core/src/test/scala/unit/kafka/server/FetchSessionTest.scala 
b/core/src/test/scala/unit/kafka/server/FetchSessionTest.scala
index 538a061eea3..2275673aba3 100755
--- a/core/src/test/scala/unit/kafka/server/FetchSessionTest.scala
+++ b/core/src/test/scala/unit/kafka/server/FetchSessionTest.scala
@@ -140,14 +140,14 @@ class FetchSessionTest {
                     toForget: util.List[TopicIdPartition], isFromFollower: 
Boolean,
                     version: Short = ApiKeys.FETCH.latestVersion): 
FetchRequest = {
     new FetchRequest.Builder(version, version, if (isFromFollower) 1 else 
FetchRequest.CONSUMER_REPLICA_ID,
-      0, 0, fetchData).metadata(metadata).removed(toForget).build
+      if (isFromFollower) 1 else -1, 0, 0, 
fetchData).metadata(metadata).removed(toForget).build
   }
 
   def createRequestWithoutTopicIds(metadata: JFetchMetadata,
                     fetchData: util.Map[TopicPartition, 
FetchRequest.PartitionData],
                     toForget: util.List[TopicIdPartition], isFromFollower: 
Boolean): FetchRequest = {
     new FetchRequest.Builder(12, 12, if (isFromFollower) 1 else 
FetchRequest.CONSUMER_REPLICA_ID,
-      0, 0, fetchData).metadata(metadata).removed(toForget).build
+      if (isFromFollower) 1 else -1, 0, 0, 
fetchData).metadata(metadata).removed(toForget).build
   }
 
   @Test
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 63fd69b61b3..48303b7a52d 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -3366,7 +3366,7 @@ class KafkaApisTest {
     when(clientQuotaManager.maybeRecordAndGetThrottleTimeMs(
       any[RequestChannel.Request](), anyDouble, anyLong)).thenReturn(0)
 
-    val fetchRequest = new FetchRequest.Builder(9, 9, -1, 100, 0, 
fetchDataBuilder)
+    val fetchRequest = new FetchRequest.Builder(9, 9, -1, -1, 100, 0, 
fetchDataBuilder)
       .build()
     val request = buildRequest(fetchRequest)
 
@@ -3421,8 +3421,9 @@ class KafkaApisTest {
       any[RequestChannel.Request](), anyDouble, anyLong)).thenReturn(0)
 
     // If replicaId is -1 we will build a consumer request. Any non-negative 
replicaId will build a follower request.
+    val replicaEpoch = if (replicaId < 0) -1 else 1
     val fetchRequest = new FetchRequest.Builder(ApiKeys.FETCH.latestVersion, 
ApiKeys.FETCH.latestVersion,
-      replicaId, 100, 0, fetchDataBuilder).metadata(fetchMetadata).build()
+      replicaId, replicaEpoch, 100, 0, 
fetchDataBuilder).metadata(fetchMetadata).build()
     val request = buildRequest(fetchRequest)
 
     createKafkaApis().handleFetchRequest(request)
@@ -4528,7 +4529,7 @@ class KafkaApisTest {
     val fetchDataBuilder = Collections.singletonMap(tp0, new 
FetchRequest.PartitionData(Uuid.ZERO_UUID, 0, 0, Int.MaxValue, 
Optional.of(leaderEpoch)))
     val fetchData = Collections.singletonMap(tidp0, new 
FetchRequest.PartitionData(Uuid.ZERO_UUID, 0, 0, Int.MaxValue, 
Optional.of(leaderEpoch)))
     val fetchFromFollower = buildRequest(new FetchRequest.Builder(
-      ApiKeys.FETCH.oldestVersion(), ApiKeys.FETCH.latestVersion(), 1, 1000, 
0, fetchDataBuilder).build())
+      ApiKeys.FETCH.oldestVersion(), ApiKeys.FETCH.latestVersion(), 1, 1, 
1000, 0, fetchDataBuilder).build())
 
     val records = MemoryRecords.withRecords(CompressionType.NONE,
       new SimpleRecord(1000, "foo".getBytes(StandardCharsets.UTF_8)))
diff --git 
a/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala 
b/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
index 582cd47ee73..0cc27854e50 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
@@ -279,6 +279,7 @@ class ReplicaAlterLogDirsThreadTest {
     val expectedFetchParams = new FetchParams(
       ApiKeys.FETCH.latestVersion,
       FetchRequest.FUTURE_LOCAL_REPLICA_ID,
+      -1,
       0L,
       0,
       config.replicaFetchResponseMaxBytes,
diff --git 
a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala 
b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
index 500bc23447f..bc3ea086f0c 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
@@ -107,7 +107,7 @@ class ReplicaFetcherThreadTest {
     val logContext = new LogContext(s"[ReplicaFetcher 
replicaId=${brokerConfig.brokerId}, 
leaderId=${leaderEndpointBlockingSend.brokerEndPoint().id}, 
fetcherId=$fetcherId] ")
     val fetchSessionHandler = new FetchSessionHandler(logContext, 
leaderEndpointBlockingSend.brokerEndPoint().id)
     val leader = new RemoteLeaderEndPoint(logContext.logPrefix, 
leaderEndpointBlockingSend, fetchSessionHandler,
-      brokerConfig, replicaMgr, quota, () => 
brokerConfig.interBrokerProtocolVersion)
+      brokerConfig, replicaMgr, quota, () => 
brokerConfig.interBrokerProtocolVersion, () => 1)
     new ReplicaFetcherThread(name,
       leader,
       brokerConfig,
@@ -587,7 +587,7 @@ class ReplicaFetcherThreadTest {
     val logContext = new LogContext(s"[ReplicaFetcher 
replicaId=${config.brokerId}, leaderId=${brokerEndPoint.id}, fetcherId=0] ")
     val fetchSessionHandler = new FetchSessionHandler(logContext, 
brokerEndPoint.id)
     val leader = new RemoteLeaderEndPoint(logContext.logPrefix, mockNetwork, 
fetchSessionHandler, config,
-      replicaManager, quota, () => config.interBrokerProtocolVersion)
+      replicaManager, quota, () => config.interBrokerProtocolVersion, () => 1)
     val thread = new ReplicaFetcherThread("bob", leader, config, 
failedPartitions,
       replicaManager, quota, logContext.logPrefix, () => 
config.interBrokerProtocolVersion) {
       override def processPartitionData(topicPartition: TopicPartition, 
fetchOffset: Long, partitionData: FetchData): Option[LogAppendInfo] = None
@@ -701,7 +701,8 @@ class ReplicaFetcherThreadTest {
       config,
       replicaManager,
       quota,
-      () => config.interBrokerProtocolVersion
+      () => config.interBrokerProtocolVersion,
+      () => 1
     )
 
     val thread = new ReplicaFetcherThread(
@@ -1125,7 +1126,7 @@ class ReplicaFetcherThreadTest {
     val logContext = new LogContext(s"[ReplicaFetcher 
replicaId=${config.brokerId}, leaderId=${brokerEndPoint.id}, fetcherId=0] ")
     val fetchSessionHandler = new FetchSessionHandler(logContext, 
brokerEndPoint.id)
     val leader = new RemoteLeaderEndPoint(logContext.logPrefix, 
mockBlockingSend, fetchSessionHandler, config,
-      replicaManager, replicaQuota, () => config.interBrokerProtocolVersion)
+      replicaManager, replicaQuota, () => config.interBrokerProtocolVersion, 
() => 1)
     val thread = new ReplicaFetcherThread("bob",
       leader,
       config,
diff --git 
a/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala 
b/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala
index 35209bfe877..5ffdf41878d 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala
@@ -231,6 +231,7 @@ class ReplicaManagerConcurrencyTest {
       val fetchParams = new FetchParams(
         ApiKeys.FETCH.latestVersion,
         replicaId,
+        1,
         random.nextInt(100),
         1,
         1024 * 1024,
diff --git 
a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala 
b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
index 9de737be511..ce727f83c74 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
@@ -174,6 +174,7 @@ class ReplicaManagerQuotasTest {
       val fetchParams = new FetchParams(
         ApiKeys.FETCH.latestVersion,
         1,
+        1,
         600,
         1,
         1000,
@@ -225,6 +226,7 @@ class ReplicaManagerQuotasTest {
       val fetchParams = new FetchParams(
         ApiKeys.FETCH.latestVersion,
         FetchRequest.CONSUMER_REPLICA_ID,
+        -1,
         600L,
         1,
         1000,
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala 
b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 047c5a5ac84..0ef3127fe1a 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -2217,14 +2217,14 @@ class ReplicaManagerTest {
                                                          time: Time,
                                                          threadNamePrefix: 
Option[String],
                                                          
replicationQuotaManager: ReplicationQuotaManager): ReplicaFetcherManager = {
-        new ReplicaFetcherManager(config, this, metrics, time, 
threadNamePrefix, replicationQuotaManager, () => 
metadataCache.metadataVersion()) {
+        new ReplicaFetcherManager(config, this, metrics, time, 
threadNamePrefix, replicationQuotaManager, () => 
metadataCache.metadataVersion(), () => 1) {
 
           override def createFetcherThread(fetcherId: Int, sourceBroker: 
BrokerEndPoint): ReplicaFetcherThread = {
             val logContext = new LogContext(s"[ReplicaFetcher 
replicaId=${config.brokerId}, leaderId=${sourceBroker.id}, " +
               s"fetcherId=$fetcherId] ")
             val fetchSessionHandler = new FetchSessionHandler(logContext, 
sourceBroker.id)
             val leader = new RemoteLeaderEndPoint(logContext.logPrefix, 
blockingSend, fetchSessionHandler, config,
-              replicaManager, quotaManager.follower, () => 
config.interBrokerProtocolVersion)
+              replicaManager, quotaManager.follower, () => 
config.interBrokerProtocolVersion, () => 1)
             new ReplicaFetcherThread(s"ReplicaFetcherThread-$fetcherId", 
leader, config, failedPartitions, replicaManager,
               quotaManager.follower, logContext.logPrefix, () => 
config.interBrokerProtocolVersion) {
               override def doWork(): Unit = {
@@ -2415,6 +2415,7 @@ class ReplicaManagerTest {
     val params = new FetchParams(
       requestVersion,
       replicaId,
+      1,
       maxWaitMs,
       minBytes,
       maxBytes,
diff --git 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/common/FetchRequestBenchmark.java
 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/common/FetchRequestBenchmark.java
index a428f9152e4..f7321b5bc65 100644
--- 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/common/FetchRequestBenchmark.java
+++ 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/common/FetchRequestBenchmark.java
@@ -90,7 +90,7 @@ public class FetchRequestBenchmark {
         this.header = new RequestHeader(ApiKeys.FETCH, 
ApiKeys.FETCH.latestVersion(), "jmh-benchmark", 100);
         this.consumerRequest = 
FetchRequest.Builder.forConsumer(ApiKeys.FETCH.latestVersion(), 0, 0, fetchData)
             .build(ApiKeys.FETCH.latestVersion());
-        this.replicaRequest = 
FetchRequest.Builder.forReplica(ApiKeys.FETCH.latestVersion(), 1, 0, 0, 
fetchData)
+        this.replicaRequest = 
FetchRequest.Builder.forReplica(ApiKeys.FETCH.latestVersion(), 1, 1, 0, 0, 
fetchData)
             .build(ApiKeys.FETCH.latestVersion());
         this.requestBuffer = this.consumerRequest.serialize();
 
@@ -111,7 +111,7 @@ public class FetchRequestBenchmark {
     @Benchmark
     public int testFetchRequestForReplica() {
         FetchRequest fetchRequest = FetchRequest.Builder.forReplica(
-            ApiKeys.FETCH.latestVersion(), 1, 0, 0, fetchData)
+            ApiKeys.FETCH.latestVersion(), 1, 1, 0, 0, fetchData)
                 .build(ApiKeys.FETCH.latestVersion());
         return fetchRequest.fetchData(topicNames).size();
     }
diff --git 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
index e37a197655c..8f764957fcf 100644
--- 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
+++ 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
@@ -314,7 +314,8 @@ public class ReplicaFetcherThreadBenchmark {
                             config,
                             replicaManager,
                             replicaQuota,
-                            config::interBrokerProtocolVersion
+                            config::interBrokerProtocolVersion,
+                            () -> -1
                     ) {
                         @Override
                         public OffsetAndEpoch 
fetchEarliestOffset(TopicPartition topicPartition, int currentLeaderEpoch) {
diff --git 
a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java 
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
index f85b1f921da..4e3c35dde2a 100644
--- 
a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
@@ -165,7 +165,7 @@ public class QuorumControllerTest {
         ) {
             controlEnv.activeController().registerBroker(ANONYMOUS_CONTEXT,
                 new BrokerRegistrationRequestData().
-                setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, 
MetadataVersion.IBP_3_5_IV0)).
+                setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, 
MetadataVersion.IBP_3_5_IV1)).
                 setBrokerId(0).
                 setClusterId(logEnv.clusterId())).get();
             testConfigurationOperations(controlEnv.activeController());
@@ -206,7 +206,7 @@ public class QuorumControllerTest {
         ) {
             controlEnv.activeController().registerBroker(ANONYMOUS_CONTEXT,
                 new BrokerRegistrationRequestData().
-                    setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, 
MetadataVersion.IBP_3_5_IV0)).
+                    setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, 
MetadataVersion.IBP_3_5_IV1)).
                     setBrokerId(0).
                     setClusterId(logEnv.clusterId())).get();
             testDelayedConfigurationOperations(logEnv, 
controlEnv.activeController());
@@ -536,7 +536,7 @@ public class QuorumControllerTest {
                     setBrokerId(0).
                     setClusterId(active.clusterId()).
                     
setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwBA")).
-                    setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, 
MetadataVersion.IBP_3_5_IV0)).
+                    setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, 
MetadataVersion.IBP_3_5_IV1)).
                     setListeners(listeners));
             assertEquals(2L, reply.get().epoch());
             CreateTopicsRequestData createTopicsRequestData =
@@ -957,7 +957,7 @@ public class QuorumControllerTest {
                     .setBrokerId(brokerId)
                     .setRack(null)
                     .setClusterId(controller.clusterId())
-                    .setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, 
MetadataVersion.IBP_3_5_IV0))
+                    .setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, 
MetadataVersion.IBP_3_5_IV1))
                     .setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwB" 
+ brokerId))
                     .setListeners(
                         new ListenerCollection(
diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java 
b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
index 393909b390c..da8cb2a4bbe 100644
--- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
+++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
@@ -49,6 +49,7 @@ import 
org.apache.kafka.common.requests.DescribeQuorumResponse;
 import org.apache.kafka.common.requests.EndQuorumEpochRequest;
 import org.apache.kafka.common.requests.EndQuorumEpochResponse;
 import org.apache.kafka.common.requests.FetchResponse;
+import org.apache.kafka.common.requests.FetchRequest;
 import org.apache.kafka.common.requests.FetchSnapshotRequest;
 import org.apache.kafka.common.requests.FetchSnapshotResponse;
 import org.apache.kafka.common.requests.VoteRequest;
@@ -958,7 +959,8 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
                 Errors.INVALID_REQUEST, Optional.empty()));
         }
 
-        FetchResponseData response = 
tryCompleteFetchRequest(request.replicaId(), fetchPartition, currentTimeMs);
+        int replicaId = FetchRequest.replicaId(request);
+        FetchResponseData response = tryCompleteFetchRequest(replicaId, 
fetchPartition, currentTimeMs);
         FetchResponseData.PartitionData partitionResponse =
             response.responses().get(0).partitions().get(0);
 
@@ -983,16 +985,16 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
                 Errors error = Errors.forException(cause);
                 if (error != Errors.REQUEST_TIMED_OUT) {
                     logger.debug("Failed to handle fetch from {} at {} due to 
{}",
-                        request.replicaId(), fetchPartition.fetchOffset(), 
error);
+                        replicaId, fetchPartition.fetchOffset(), error);
                     return buildEmptyFetchResponse(error, Optional.empty());
                 }
             }
 
             // FIXME: `completionTimeMs`, which can be null
             logger.trace("Completing delayed fetch from {} starting at offset 
{} at {}",
-                request.replicaId(), fetchPartition.fetchOffset(), 
completionTimeMs);
+                replicaId, fetchPartition.fetchOffset(), completionTimeMs);
 
-            return tryCompleteFetchRequest(request.replicaId(), 
fetchPartition, time.milliseconds());
+            return tryCompleteFetchRequest(replicaId, fetchPartition, 
time.milliseconds());
         });
     }
 
@@ -1791,7 +1793,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
             .setMaxBytes(MAX_FETCH_SIZE_BYTES)
             .setMaxWaitMs(fetchMaxWaitMs)
             .setClusterId(clusterId)
-            .setReplicaId(quorum.localIdOrSentinel());
+            .setReplicaState(new 
FetchRequestData.ReplicaState().setReplicaId(quorum.localIdOrSentinel()));
     }
 
     private long maybeSendAnyVoterFetch(long currentTimeMs) {
diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java 
b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
index b843b8a09b8..7cbeb11f4f7 100644
--- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
@@ -25,8 +25,10 @@ import 
org.apache.kafka.common.message.DescribeQuorumResponseData.ReplicaState;
 import org.apache.kafka.common.message.EndQuorumEpochResponseData;
 import org.apache.kafka.common.message.FetchResponseData;
 import org.apache.kafka.common.message.VoteResponseData;
+import org.apache.kafka.common.message.FetchRequestData;
 import org.apache.kafka.common.metrics.KafkaMetric;
 import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.record.MemoryRecords;
 import org.apache.kafka.common.record.MutableRecordBatch;
@@ -35,11 +37,14 @@ import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.common.record.Records;
 import org.apache.kafka.common.requests.DescribeQuorumRequest;
 import org.apache.kafka.common.requests.EndQuorumEpochResponse;
+import org.apache.kafka.common.requests.FetchRequest;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource;
 import org.apache.kafka.raft.errors.BufferAllocationException;
 import org.apache.kafka.raft.errors.NotLeaderException;
 import org.apache.kafka.test.TestUtils;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
 import org.mockito.Mockito;
 
 import java.io.IOException;
@@ -1436,6 +1441,33 @@ public class KafkaRaftClientTest {
         context.assertSentFetchPartitionResponse(Errors.INVALID_REQUEST, 
epoch, OptionalInt.of(localId));
     }
 
+    // This test mainly focuses on whether the leader state is correctly 
updated under different fetch versions.
+    @ParameterizedTest
+    @ApiKeyVersionsSource(apiKey = ApiKeys.FETCH)
+    public void testLeaderStateUpdateWithDifferentFetchRequestVersions(short 
version) throws Exception {
+        int localId = 0;
+        int otherNodeId = 1;
+        int epoch = 5;
+        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
+
+        RaftClientTestContext context = 
RaftClientTestContext.initializeAsLeader(localId, voters, epoch);
+
+        // First poll has no high watermark advance.
+        context.client.poll();
+        assertEquals(OptionalLong.empty(), context.client.highWatermark());
+        assertEquals(1L, context.log.endOffset().offset);
+
+        // Now we will advance the high watermark with a follower fetch 
request.
+        FetchRequestData fetchRequestData = context.fetchRequest(epoch, 
otherNodeId, 1L, epoch, 0);
+        FetchRequestData request = new 
FetchRequest.SimpleBuilder(fetchRequestData).build(version).data();
+        assertEquals((version < 15) ? 1 : -1, fetchRequestData.replicaId());
+        assertEquals((version < 15) ? -1 : 1, 
fetchRequestData.replicaState().replicaId());
+        context.deliverRequest(request);
+        context.pollUntilResponse();
+        context.assertSentFetchPartitionResponse(Errors.NONE, epoch, 
OptionalInt.of(localId));
+        assertEquals(OptionalLong.of(1L), context.client.highWatermark());
+    }
+
     @Test
     public void testFetchRequestClusterIdValidation() throws Exception {
         int localId = 0;
diff --git 
a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java 
b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
index b79eddf0029..50647f8df3a 100644
--- a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
+++ b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
@@ -984,7 +984,7 @@ public final class RaftClientTestContext {
         assertEquals(epoch, fetchPartition.currentLeaderEpoch());
         assertEquals(fetchOffset, fetchPartition.fetchOffset());
         assertEquals(lastFetchedEpoch, fetchPartition.lastFetchedEpoch());
-        assertEquals(localId.orElse(-1), request.replicaId());
+        assertEquals(localId.orElse(-1), request.replicaState().replicaId());
 
         // Assert that voters have flushed up to the fetch offset
         if (localId.isPresent() && voters.contains(localId.getAsInt())) {
@@ -1036,7 +1036,7 @@ public final class RaftClientTestContext {
         return request
             .setMaxWaitMs(maxWaitTimeMs)
             .setClusterId(clusterId)
-            .setReplicaId(replicaId);
+            .setReplicaState(new 
FetchRequestData.ReplicaState().setReplicaId(replicaId));
     }
 
     FetchResponseData fetchResponse(
diff --git 
a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
 
b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
index 67d878bc60e..b0593f40d07 100644
--- 
a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
+++ 
b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
@@ -168,7 +168,10 @@ public enum MetadataVersion {
     IBP_3_4_IV0(8, "3.4", "IV0", true),
 
     // Support for tiered storage (KIP-405) and SCRAM
-    IBP_3_5_IV0(9, "3.5", "IV0", false);
+    IBP_3_5_IV0(9, "3.5", "IV0", false),
+
+    // Adds replica epoch to Fetch request (KIP-903).
+    IBP_3_5_IV1(10, "3.5", "IV1", false);
 
     // NOTE: update the default version in @ClusterTest annotation to point to 
the latest version
     public static final String FEATURE_NAME = "metadata.version";
@@ -292,7 +295,9 @@ public enum MetadataVersion {
     }
 
     public short fetchRequestVersion() {
-        if (this.isAtLeast(IBP_3_5_IV0)) {
+        if (this.isAtLeast(IBP_3_5_IV1)) {
+            return 15;
+        } else if (this.isAtLeast(IBP_3_5_IV0)) {
             return 14;
         } else if (this.isAtLeast(IBP_3_1_IV0)) {
             return 13;
diff --git 
a/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java
 
b/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java
index 885596a9c20..75bbad2aff5 100644
--- 
a/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java
+++ 
b/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java
@@ -158,6 +158,7 @@ class MetadataVersionTest {
         assertEquals(IBP_3_4_IV0, 
MetadataVersion.fromVersionString("3.4-IV0"));
 
         assertEquals(IBP_3_5_IV0, 
MetadataVersion.fromVersionString("3.5-IV0"));
+        assertEquals(IBP_3_5_IV1, 
MetadataVersion.fromVersionString("3.5-IV1"));
     }
 
     @Test
@@ -207,6 +208,7 @@ class MetadataVersionTest {
         assertEquals("3.3", IBP_3_3_IV3.shortVersion());
         assertEquals("3.4", IBP_3_4_IV0.shortVersion());
         assertEquals("3.5", IBP_3_5_IV0.shortVersion());
+        assertEquals("3.5", IBP_3_5_IV1.shortVersion());
     }
 
     @Test
@@ -245,6 +247,7 @@ class MetadataVersionTest {
         assertEquals("3.3-IV3", IBP_3_3_IV3.version());
         assertEquals("3.4-IV0", IBP_3_4_IV0.version());
         assertEquals("3.5-IV0", IBP_3_5_IV0.version());
+        assertEquals("3.5-IV1", IBP_3_5_IV1.version());
     }
 
     @Test
diff --git 
a/storage/src/main/java/org/apache/kafka/storage/internals/log/FetchParams.java 
b/storage/src/main/java/org/apache/kafka/storage/internals/log/FetchParams.java
index ff2308a83c4..700a8e40d5f 100644
--- 
a/storage/src/main/java/org/apache/kafka/storage/internals/log/FetchParams.java
+++ 
b/storage/src/main/java/org/apache/kafka/storage/internals/log/FetchParams.java
@@ -25,6 +25,7 @@ import java.util.Optional;
 public class FetchParams {
     public final short requestVersion;
     public final int replicaId;
+    public final long replicaEpoch;
     public final long maxWaitMs;
     public final int minBytes;
     public final int maxBytes;
@@ -33,6 +34,7 @@ public class FetchParams {
 
     public FetchParams(short requestVersion,
                        int replicaId,
+                       long replicaEpoch,
                        long maxWaitMs,
                        int minBytes,
                        int maxBytes,
@@ -42,6 +44,7 @@ public class FetchParams {
         Objects.requireNonNull(clientMetadata);
         this.requestVersion = requestVersion;
         this.replicaId = replicaId;
+        this.replicaEpoch = replicaEpoch;
         this.maxWaitMs = maxWaitMs;
         this.minBytes = minBytes;
         this.maxBytes = maxBytes;
@@ -72,6 +75,7 @@ public class FetchParams {
         FetchParams that = (FetchParams) o;
         return requestVersion == that.requestVersion
                 && replicaId == that.replicaId
+                && replicaEpoch == that.replicaEpoch
                 && maxWaitMs == that.maxWaitMs
                 && minBytes == that.minBytes
                 && maxBytes == that.maxBytes
@@ -83,6 +87,7 @@ public class FetchParams {
     public int hashCode() {
         int result = requestVersion;
         result = 31 * result + replicaId;
+        result = 31 * result + (int) replicaEpoch;
         result = 31 * result + Long.hashCode(32);
         result = 31 * result + minBytes;
         result = 31 * result + maxBytes;
@@ -96,6 +101,7 @@ public class FetchParams {
         return "FetchParams(" +
                 "requestVersion=" + requestVersion +
                 ", replicaId=" + replicaId +
+                ", replicaEpoch=" + replicaEpoch +
                 ", maxWaitMs=" + maxWaitMs +
                 ", minBytes=" + minBytes +
                 ", maxBytes=" + maxBytes +

Reply via email to