This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e2847e8  KAFKA-8365; Consumer and protocol support for follower 
fetching (#6731)
e2847e8 is described below

commit e2847e8603fe19a87ff03584fb38954e4bd3a59e
Author: David Arthur <mum...@gmail.com>
AuthorDate: Sat May 18 01:45:46 2019 -0400

    KAFKA-8365; Consumer and protocol support for follower fetching (#6731)
    
    This patch includes API changes for follower fetching per 
[KIP-392](https://cwiki.apache.org/confluence/display/KAFKA/KIP-392%3A+Allow+consumers+to+fetch+from+closest+replica)
 as well as the consumer implementation. After this patch, consumers will 
continue to fetch only from the leader, since the broker implementation to 
select an alternate read replica is not included here.
    
    Adds a new `client.rack` consumer configuration property which allows the 
consumer to indicate its rack. This is just an arbitrary string indicating 
some relative location; it doesn't have to represent a physical rack. The 
naming is kept consistent with the broker property (`broker.rack`).
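    
    A minimal sketch of opting in (the bootstrap servers, group id, and rack
    value below are illustrative, not part of this patch):
    
        import java.util.Properties;
        import org.apache.kafka.clients.consumer.ConsumerConfig;
        import org.apache.kafka.clients.consumer.KafkaConsumer;
        import org.apache.kafka.common.serialization.StringDeserializer;
    
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // The new property; any string works, e.g. an availability zone name. Defaults to "".
        props.put(ConsumerConfig.CLIENT_RACK_CONFIG, "rack-a");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);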
    
    FetchRequest now includes a `rack_id` field which can optionally be set 
by the consumer. FetchResponse includes an optional `preferred_read_replica` 
field for each partition in the response. OffsetForLeaderEpochRequest also 
adds a new `replica_id` field, similar to the same field in FetchRequest.
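    
    As a rough sketch of the wire change (these are internal client classes,
    and the values are placeholders), the fetch request builder now carries
    the rack id:
    
        // forConsumer(maxWaitMs, minBytes, fetchData); rackId defaults to ""
        FetchRequest.Builder builder = FetchRequest.Builder
                .forConsumer(500, 1, new LinkedHashMap<>())
                .rackId("rack-a");  // serialized as rack_id in v11+ requests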
    
    When the consumer sees a `preferred_read_replica` in a fetch response, it 
will use the Node with that ID for the next fetch.
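    
    Simplified from the new Fetcher#selectReadReplica in the diff below (the
    real code also clears an offline preferred replica), the per-partition
    selection amounts to:
    
        Optional<Integer> replicaId = subscriptions.preferredReadReplica(partition, currentTimeMs);
        Node node = replicaId
                .flatMap(id -> metadata.fetch().nodeIfOnline(partition, id))
                .orElse(leaderReplica);  // the leader if unset, expired, or not online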
    
    Reviewers: Jason Gustafson <ja...@confluent.io>
---
 .../apache/kafka/clients/CommonClientConfigs.java  |   3 +
 .../java/org/apache/kafka/clients/Metadata.java    |   4 +
 .../kafka/clients/consumer/ConsumerConfig.java     |  10 ++
 .../kafka/clients/consumer/KafkaConsumer.java      |   1 +
 .../kafka/clients/consumer/internals/Fetcher.java  |  81 +++++++++++---
 .../consumer/internals/SubscriptionState.java      |  66 +++++++++++-
 .../main/java/org/apache/kafka/common/Cluster.java |  17 +++
 .../apache/kafka/common/requests/FetchRequest.java |  36 ++++++-
 .../kafka/common/requests/FetchResponse.java       |  57 +++++++++-
 .../requests/OffsetsForLeaderEpochRequest.java     |  32 +++++-
 .../requests/OffsetsForLeaderEpochResponse.java    |   5 +-
 .../resources/common/message/FetchRequest.json     |   6 +-
 .../resources/common/message/FetchResponse.json    |   4 +-
 .../message/OffsetForLeaderEpochRequest.json       |   5 +-
 .../message/OffsetForLeaderEpochResponse.json      |   2 +-
 .../org/apache/kafka/clients/MetadataTest.java     |  23 ++++
 .../kafka/clients/consumer/KafkaConsumerTest.java  |   1 +
 .../clients/consumer/internals/FetcherTest.java    | 116 +++++++++++++++++++--
 .../consumer/internals/SubscriptionStateTest.java  |  31 ++++++
 .../kafka/common/requests/RequestResponseTest.java |  16 +--
 core/src/main/scala/kafka/api/ApiVersion.scala     |  11 +-
 .../scala/kafka/server/ReplicaFetcherThread.scala  |   6 +-
 .../kafka/server/ReplicaFetcherThreadTest.scala    |   2 +-
 23 files changed, 484 insertions(+), 51 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java 
b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
index 491b5de..49465dc 100644
--- a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
+++ b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
@@ -60,6 +60,9 @@ public class CommonClientConfigs {
     public static final String CLIENT_ID_CONFIG = "client.id";
     public static final String CLIENT_ID_DOC = "An id string to pass to the 
server when making requests. The purpose of this is to be able to track the 
source of requests beyond just ip/port by allowing a logical application name 
to be included in server-side request logging.";
 
+    public static final String CLIENT_RACK_CONFIG = "client.rack";
+    public static final String CLIENT_RACK_DOC = "A rack identifier for this 
client. This can be any string value which indicates where this client is 
physically located. It corresponds with the broker config 'broker.rack'";
+
     public static final String RECONNECT_BACKOFF_MS_CONFIG = 
"reconnect.backoff.ms";
     public static final String RECONNECT_BACKOFF_MS_DOC = "The base amount of 
time to wait before attempting to reconnect to a given host. This avoids 
repeatedly connecting to a host in a tight loop. This backoff applies to all 
connection attempts by the client to a broker.";
 
diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java 
b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
index ae75045..f991fa6 100644
--- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
@@ -129,6 +129,10 @@ public class Metadata implements Closeable {
         return Math.max(timeToExpire, timeToAllowUpdate(nowMs));
     }
 
+    public long metadataExpireMs() {
+        return this.metadataExpireMs;
+    }
+
     /**
      * Request an update of the current cluster metadata info, return the 
current updateVersion before the update
      */
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index ba1928e..c80b71f 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -177,6 +177,11 @@ public class ConsumerConfig extends AbstractConfig {
     public static final String CLIENT_ID_CONFIG = 
CommonClientConfigs.CLIENT_ID_CONFIG;
 
     /**
+     * <code>client.rack</code>
+     */
+    public static final String CLIENT_RACK_CONFIG = 
CommonClientConfigs.CLIENT_RACK_CONFIG;
+
+    /**
      * <code>reconnect.backoff.ms</code>
      */
     public static final String RECONNECT_BACKOFF_MS_CONFIG = 
CommonClientConfigs.RECONNECT_BACKOFF_MS_CONFIG;
@@ -328,6 +333,11 @@ public class ConsumerConfig extends AbstractConfig {
                                         "",
                                         Importance.LOW,
                                         CommonClientConfigs.CLIENT_ID_DOC)
+                                .define(CLIENT_RACK_CONFIG,
+                                        Type.STRING,
+                                        "",
+                                        Importance.LOW,
+                                        CommonClientConfigs.CLIENT_RACK_DOC)
                                 .define(MAX_PARTITION_FETCH_BYTES_CONFIG,
                                         Type.INT,
                                         DEFAULT_MAX_PARTITION_FETCH_BYTES,
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 3bfd5ac..07be128 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -799,6 +799,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
                     
config.getInt(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG),
                     config.getInt(ConsumerConfig.MAX_POLL_RECORDS_CONFIG),
                     config.getBoolean(ConsumerConfig.CHECK_CRCS_CONFIG),
+                    config.getString(ConsumerConfig.CLIENT_RACK_CONFIG),
                     this.keyDeserializer,
                     this.valueDeserializer,
                     this.metadata,
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 870a8b7..4ea9b0b 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -126,6 +126,7 @@ public class Fetcher<K, V> implements Closeable {
     private final long requestTimeoutMs;
     private final int maxPollRecords;
     private final boolean checkCrcs;
+    private final String clientRackId;
     private final ConsumerMetadata metadata;
     private final FetchManagerMetrics sensors;
     private final SubscriptionState subscriptions;
@@ -149,6 +150,7 @@ public class Fetcher<K, V> implements Closeable {
                    int fetchSize,
                    int maxPollRecords,
                    boolean checkCrcs,
+                   String clientRackId,
                    Deserializer<K> keyDeserializer,
                    Deserializer<V> valueDeserializer,
                    ConsumerMetadata metadata,
@@ -171,6 +173,7 @@ public class Fetcher<K, V> implements Closeable {
         this.fetchSize = fetchSize;
         this.maxPollRecords = maxPollRecords;
         this.checkCrcs = checkCrcs;
+        this.clientRackId = clientRackId;
         this.keyDeserializer = keyDeserializer;
         this.valueDeserializer = valueDeserializer;
         this.completedFetches = new ConcurrentLinkedQueue<>();
@@ -223,7 +226,9 @@ public class Fetcher<K, V> implements Closeable {
                     .isolationLevel(isolationLevel)
                     .setMaxBytes(this.maxBytes)
                     .metadata(data.metadata())
-                    .toForget(data.toForget());
+                    .toForget(data.toForget())
+                    .rackId(clientRackId);
+
             if (log.isDebugEnabled()) {
                 log.debug("Sending {} {} to broker {}", isolationLevel, 
data.toString(), fetchTarget);
             }
@@ -1005,6 +1010,26 @@ public class Fetcher<K, V> implements Closeable {
     }
 
     /**
+     * Determine which replica to read from.
+     */
+    Node selectReadReplica(TopicPartition partition, Node leaderReplica, long 
currentTimeMs) {
+        Optional<Integer> nodeId = 
subscriptions.preferredReadReplica(partition, currentTimeMs);
+        if (nodeId.isPresent()) {
+            Optional<Node> node = nodeId.flatMap(id -> 
metadata.fetch().nodeIfOnline(partition, id));
+            if (node.isPresent()) {
+                return node.get();
+            } else {
+                log.trace("Not fetching from {} for partition {} since it is 
marked offline or is missing from our metadata," +
+                          " using the leader instead.", nodeId, partition);
+                subscriptions.clearPreferredReadReplica(partition);
+                return leaderReplica;
+            }
+        } else {
+            return leaderReplica;
+        }
+    }
+
+    /**
      * Create fetch requests for all nodes for which we have assigned 
partitions
      * that have no existing requests in flight.
      */
@@ -1015,10 +1040,12 @@ public class Fetcher<K, V> implements Closeable {
         subscriptions.assignedPartitions().forEach(
             tp -> subscriptions.maybeValidatePosition(tp, 
metadata.leaderAndEpoch(tp)));
 
+        long currentTimeMs = time.milliseconds();
+
         for (TopicPartition partition : fetchablePartitions()) {
+            // Use the preferred read replica if set, or the position's leader
             SubscriptionState.FetchPosition position = 
this.subscriptions.position(partition);
-            Metadata.LeaderAndEpoch leaderAndEpoch = position.currentLeader;
-            Node node = leaderAndEpoch.leader;
+            Node node = selectReadReplica(partition, 
position.currentLeader.leader, currentTimeMs);
 
             if (node == null || node.isEmpty()) {
                 metadata.requestUpdate();
@@ -1032,23 +1059,23 @@ public class Fetcher<K, V> implements Closeable {
                 log.trace("Skipping fetch for partition {} because there is an 
in-flight request to {}", partition, node);
             } else {
                 // if there is a leader and no in-flight requests, issue a new 
fetch
-                FetchSessionHandler.Builder builder = 
fetchable.get(leaderAndEpoch.leader);
+                FetchSessionHandler.Builder builder = fetchable.get(node);
                 if (builder == null) {
-                    int id = leaderAndEpoch.leader.id();
+                    int id = node.id();
                     FetchSessionHandler handler = sessionHandler(id);
                     if (handler == null) {
                         handler = new FetchSessionHandler(logContext, id);
                         sessionHandlers.put(id, handler);
                     }
                     builder = handler.newBuilder();
-                    fetchable.put(leaderAndEpoch.leader, builder);
+                    fetchable.put(node, builder);
                 }
 
                 builder.add(partition, new 
FetchRequest.PartitionData(position.offset,
-                        FetchRequest.INVALID_LOG_START_OFFSET, this.fetchSize, 
leaderAndEpoch.epoch));
+                        FetchRequest.INVALID_LOG_START_OFFSET, this.fetchSize, 
position.currentLeader.epoch));
 
                 log.debug("Added {} fetch request for partition {} at position 
{} to node {}", isolationLevel,
-                    partition, position, leaderAndEpoch.leader);
+                    partition, position, node);
             }
         }
 
@@ -1136,24 +1163,42 @@ public class Fetcher<K, V> implements Closeable {
                     log.trace("Updating last stable offset for partition {} to 
{}", tp, partition.lastStableOffset);
                     subscriptions.updateLastStableOffset(tp, 
partition.lastStableOffset);
                 }
+
+                if (partition.preferredReadReplica.isPresent()) {
+                    
subscriptions.updatePreferredReadReplica(partitionRecords.partition, 
partition.preferredReadReplica.get(), () -> {
+                        long expireTimeMs = time.milliseconds() + 
metadata.metadataExpireMs();
+                        log.debug("Updating preferred read replica for 
partition {} to {}, set to expire at {}",
+                                tp, partition.preferredReadReplica.get(), 
expireTimeMs);
+                        return expireTimeMs;
+                    });
+                }
+
             } else if (error == Errors.NOT_LEADER_FOR_PARTITION ||
                        error == Errors.REPLICA_NOT_AVAILABLE ||
                        error == Errors.KAFKA_STORAGE_ERROR ||
-                       error == Errors.FENCED_LEADER_EPOCH) {
+                       error == Errors.FENCED_LEADER_EPOCH ||
+                       error == Errors.OFFSET_NOT_AVAILABLE) {
                 log.debug("Error in fetch for partition {}: {}", tp, 
error.exceptionName());
                 this.metadata.requestUpdate();
             } else if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
                 log.warn("Received unknown topic or partition error in fetch 
for partition {}", tp);
                 this.metadata.requestUpdate();
             } else if (error == Errors.OFFSET_OUT_OF_RANGE) {
-                if (fetchOffset != subscriptions.position(tp).offset) {
-                    log.debug("Discarding stale fetch response for partition 
{} since the fetched offset {} " +
-                            "does not match the current offset {}", tp, 
fetchOffset, subscriptions.position(tp));
-                } else if (subscriptions.hasDefaultOffsetResetPolicy()) {
-                    log.info("Fetch offset {} is out of range for partition 
{}, resetting offset", fetchOffset, tp);
-                    subscriptions.requestOffsetReset(tp);
+                Optional<Integer> clearedReplicaId = 
subscriptions.clearPreferredReadReplica(tp);
+                if (!clearedReplicaId.isPresent()) {
+                    // If there's no preferred replica to clear, we're 
fetching from the leader so handle this error normally
+                    if (fetchOffset != subscriptions.position(tp).offset) {
+                        log.debug("Discarding stale fetch response for 
partition {} since the fetched offset {} " +
+                                "does not match the current offset {}", tp, 
fetchOffset, subscriptions.position(tp));
+                    } else if (subscriptions.hasDefaultOffsetResetPolicy()) {
+                        log.info("Fetch offset {} is out of range for 
partition {}, resetting offset", fetchOffset, tp);
+                        subscriptions.requestOffsetReset(tp);
+                    } else {
+                        throw new 
OffsetOutOfRangeException(Collections.singletonMap(tp, fetchOffset));
+                    }
                 } else {
-                    throw new 
OffsetOutOfRangeException(Collections.singletonMap(tp, fetchOffset));
+                    log.debug("Unset the preferred read replica {} for 
partition {} since we got {} when fetching {}",
+                            clearedReplicaId.get(), tp, error, fetchOffset);
                 }
             } else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) {
                 //we log the actual partition and not just the topic to help 
with ACL propagation issues in large clusters
@@ -1304,6 +1349,10 @@ public class Fetcher<K, V> implements Closeable {
             }
         }
 
+        private Optional<Integer> preferredReadReplica() {
+            return completedFetch.partitionData.preferredReadReplica;
+        }
+
         private void maybeEnsureValid(RecordBatch batch) {
             if (checkCrcs && currentBatch.magic() >= 
RecordBatch.MAGIC_VALUE_V2) {
                 try {
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
index 3909421..87d1a35 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -40,6 +40,7 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.Predicate;
+import java.util.function.Supplier;
 import java.util.regex.Pattern;
 import java.util.stream.Collector;
 import java.util.stream.Collectors;
@@ -418,6 +419,39 @@ public class SubscriptionState {
         assignedState(tp).lastStableOffset = lastStableOffset;
     }
 
+    /**
+     * Set the preferred read replica with a lease timeout. After this time, 
the replica will no longer be valid and
+     * {@link #preferredReadReplica(TopicPartition, long)} will return an 
empty result.
+     *
+     * @param tp The topic partition
+     * @param preferredReadReplicaId The preferred read replica
+     * @param timeMs A supplier of the time at which this preferred replica is no longer valid
+     */
+    public void updatePreferredReadReplica(TopicPartition tp, int 
preferredReadReplicaId, Supplier<Long> timeMs) {
+        assignedState(tp).updatePreferredReadReplica(preferredReadReplicaId, 
timeMs);
+    }
+
+    /**
+     * Get the preferred read replica
+     *
+     * @param tp The topic partition
+     * @param timeMs The current time
+     * @return Returns the current preferred read replica, if it has been set 
and if it has not expired.
+     */
+    public Optional<Integer> preferredReadReplica(TopicPartition tp, long 
timeMs) {
+        return assignedState(tp).preferredReadReplica(timeMs);
+    }
+
+    /**
+     * Unset the preferred read replica. This causes the fetcher to go back to 
the leader for fetches.
+     *
+     * @param tp The topic partition
+     * @return the replica id that was cleared, if one was set; empty otherwise.
+     */
+    public Optional<Integer> clearPreferredReadReplica(TopicPartition tp) {
+        return assignedState(tp).clearPreferredReadReplica();
+    }
+
     public Map<TopicPartition, OffsetAndMetadata> allConsumed() {
         Map<TopicPartition, OffsetAndMetadata> allConsumed = new HashMap<>();
         assignment.stream().forEach(state -> {
@@ -553,7 +587,8 @@ public class SubscriptionState {
         private boolean paused;  // whether this partition has been paused by 
the user
         private OffsetResetStrategy resetStrategy;  // the strategy to use if 
the offset needs resetting
         private Long nextRetryTimeMs;
-
+        private Integer preferredReadReplica;
+        private Long preferredReadReplicaExpireTimeMs;
 
         TopicPartitionState() {
             this.paused = false;
@@ -564,6 +599,7 @@ public class SubscriptionState {
             this.lastStableOffset = null;
             this.resetStrategy = null;
             this.nextRetryTimeMs = null;
+            this.preferredReadReplica = null;
         }
 
         private void transitionState(FetchState newState, Runnable 
runIfTransitioned) {
@@ -574,6 +610,33 @@ public class SubscriptionState {
             }
         }
 
+        private Optional<Integer> preferredReadReplica(long timeMs) {
+            if (preferredReadReplicaExpireTimeMs != null && timeMs > 
preferredReadReplicaExpireTimeMs) {
+                preferredReadReplica = null;
+                return Optional.empty();
+            } else {
+                return Optional.ofNullable(preferredReadReplica);
+            }
+        }
+
+        private void updatePreferredReadReplica(int preferredReadReplica, 
Supplier<Long> timeMs) {
+            if (this.preferredReadReplica == null || preferredReadReplica != 
this.preferredReadReplica) {
+                this.preferredReadReplica = preferredReadReplica;
+                this.preferredReadReplicaExpireTimeMs = timeMs.get();
+            }
+        }
+
+        private Optional<Integer> clearPreferredReadReplica() {
+            if (preferredReadReplica != null) {
+                int removedReplicaId = this.preferredReadReplica;
+                this.preferredReadReplica = null;
+                this.preferredReadReplicaExpireTimeMs = null;
+                return Optional.of(removedReplicaId);
+            } else {
+                return Optional.empty();
+            }
+        }
+
         private void reset(OffsetResetStrategy strategy) {
             transitionState(FetchStates.AWAIT_RESET, () -> {
                 this.resetStrategy = strategy;
@@ -594,6 +657,7 @@ public class SubscriptionState {
             if (position != null && 
!position.safeToFetchFrom(currentLeaderAndEpoch)) {
                 FetchPosition newPosition = new FetchPosition(position.offset, 
position.offsetEpoch, currentLeaderAndEpoch);
                 validatePosition(newPosition);
+                preferredReadReplica = null;
             }
             return this.fetchState.equals(FetchStates.AWAIT_VALIDATION);
         }
diff --git a/clients/src/main/java/org/apache/kafka/common/Cluster.java 
b/clients/src/main/java/org/apache/kafka/common/Cluster.java
index 753b7f9..0b01d22 100644
--- a/clients/src/main/java/org/apache/kafka/common/Cluster.java
+++ b/clients/src/main/java/org/apache/kafka/common/Cluster.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.utils.Utils;
 
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -27,6 +28,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.Set;
 
 /**
@@ -184,6 +186,21 @@ public final class Cluster {
     }
 
     /**
+     * Get the node by node id if the replica for the given partition is online
+     * @param partition the topic partition to check
+     * @param id the id of the node to look up
+     * @return the node, if it is online and a replica for the given partition; empty otherwise
+     */
+    public Optional<Node> nodeIfOnline(TopicPartition partition, int id) {
+        Node node = nodeById(id);
+        if (node != null && 
!Arrays.asList(partition(partition).offlineReplicas()).contains(node)) {
+            return Optional.of(node);
+        } else {
+            return Optional.empty();
+        }
+    }
+
+    /**
      * Get the current leader for the given topic-partition
      * @param topicPartition The topic and partition we want to know the 
leader for
      * @return The node that is the leader for this topic-partition, or null 
if there is currently no leader
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java 
b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
index b3443a1..485b102 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
@@ -63,6 +63,7 @@ public class FetchRequest extends AbstractRequest {
                     "consumers to discard ABORTED transactional records");
     private static final Field.Int32 SESSION_ID = new 
Field.Int32("session_id", "The fetch session ID");
     private static final Field.Int32 SESSION_EPOCH = new 
Field.Int32("session_epoch", "The fetch session epoch");
+    private static final Field.Str RACK_ID = new Field.Str("rack_id", "The 
consumer's rack id");
 
     // topic level fields
     private static final Field.ComplexArray PARTITIONS = new 
Field.ComplexArray("partitions",
@@ -194,10 +195,22 @@ public class FetchRequest extends AbstractRequest {
     // V10 bumped up to indicate ZStandard capability. (see KIP-110)
     private static final Schema FETCH_REQUEST_V10 = FETCH_REQUEST_V9;
 
+    private static final Schema FETCH_REQUEST_V11 = new Schema(
+            REPLICA_ID,
+            MAX_WAIT_TIME,
+            MIN_BYTES,
+            MAX_BYTES,
+            ISOLATION_LEVEL,
+            SESSION_ID,
+            SESSION_EPOCH,
+            FETCH_REQUEST_TOPIC_V9,
+            FORGOTTEN_TOPIC_DATA_V7,
+            RACK_ID);
+
     public static Schema[] schemaVersions() {
         return new Schema[]{FETCH_REQUEST_V0, FETCH_REQUEST_V1, 
FETCH_REQUEST_V2, FETCH_REQUEST_V3, FETCH_REQUEST_V4,
             FETCH_REQUEST_V5, FETCH_REQUEST_V6, FETCH_REQUEST_V7, 
FETCH_REQUEST_V8, FETCH_REQUEST_V9,
-            FETCH_REQUEST_V10};
+            FETCH_REQUEST_V10, FETCH_REQUEST_V11};
     }
 
     // default values for older versions where a request level limit did not 
exist
@@ -217,6 +230,7 @@ public class FetchRequest extends AbstractRequest {
 
     private final List<TopicPartition> toForget;
     private final FetchMetadata metadata;
+    private final String rackId;
 
     public static final class PartitionData {
         public final long fetchOffset;
@@ -290,6 +304,7 @@ public class FetchRequest extends AbstractRequest {
         private int maxBytes = DEFAULT_RESPONSE_MAX_BYTES;
         private FetchMetadata metadata = FetchMetadata.LEGACY;
         private List<TopicPartition> toForget = Collections.emptyList();
+        private String rackId = "";
 
         public static Builder forConsumer(int maxWait, int minBytes, 
Map<TopicPartition, PartitionData> fetchData) {
             return new Builder(ApiKeys.FETCH.oldestVersion(), 
ApiKeys.FETCH.latestVersion(),
@@ -320,6 +335,11 @@ public class FetchRequest extends AbstractRequest {
             return this;
         }
 
+        public Builder rackId(String rackId) {
+            this.rackId = rackId;
+            return this;
+        }
+
         public Map<TopicPartition, PartitionData> fetchData() {
             return this.fetchData;
         }
@@ -345,7 +365,7 @@ public class FetchRequest extends AbstractRequest {
             }
 
             return new FetchRequest(version, replicaId, maxWait, minBytes, 
maxBytes, fetchData,
-                isolationLevel, toForget, metadata);
+                isolationLevel, toForget, metadata, rackId);
         }
 
         @Override
@@ -360,6 +380,7 @@ public class FetchRequest extends AbstractRequest {
                     append(", isolationLevel=").append(isolationLevel).
                     append(", toForget=").append(Utils.join(toForget, ", ")).
                     append(", metadata=").append(metadata).
+                    append(", rackId=").append(rackId).
                     append(")");
             return bld.toString();
         }
@@ -367,7 +388,7 @@ public class FetchRequest extends AbstractRequest {
 
     private FetchRequest(short version, int replicaId, int maxWait, int 
minBytes, int maxBytes,
                          Map<TopicPartition, PartitionData> fetchData, 
IsolationLevel isolationLevel,
-                         List<TopicPartition> toForget, FetchMetadata 
metadata) {
+                         List<TopicPartition> toForget, FetchMetadata 
metadata, String rackId) {
         super(ApiKeys.FETCH, version);
         this.replicaId = replicaId;
         this.maxWait = maxWait;
@@ -377,6 +398,7 @@ public class FetchRequest extends AbstractRequest {
         this.isolationLevel = isolationLevel;
         this.toForget = toForget;
         this.metadata = metadata;
+        this.rackId = rackId;
     }
 
     public FetchRequest(Struct struct, short version) {
@@ -421,6 +443,7 @@ public class FetchRequest extends AbstractRequest {
                 fetchData.put(new TopicPartition(topic, partition), 
partitionData);
             }
         }
+        rackId = struct.getOrElse(RACK_ID, "");
     }
 
     @Override
@@ -436,7 +459,7 @@ public class FetchRequest extends AbstractRequest {
         for (Map.Entry<TopicPartition, PartitionData> entry : 
fetchData.entrySet()) {
             FetchResponse.PartitionData<MemoryRecords> partitionResponse = new 
FetchResponse.PartitionData<>(error,
                 FetchResponse.INVALID_HIGHWATERMARK, 
FetchResponse.INVALID_LAST_STABLE_OFFSET,
-                FetchResponse.INVALID_LOG_START_OFFSET, null, 
MemoryRecords.EMPTY);
+                FetchResponse.INVALID_LOG_START_OFFSET, null, null, 
MemoryRecords.EMPTY);
             responseData.put(entry.getKey(), partitionResponse);
         }
         return new FetchResponse<>(error, responseData, throttleTimeMs, 
metadata.sessionId());
@@ -478,6 +501,10 @@ public class FetchRequest extends AbstractRequest {
         return metadata;
     }
 
+    public String rackId() {
+        return rackId;
+    }
+
     public static FetchRequest parse(ByteBuffer buffer, short version) {
         return new FetchRequest(ApiKeys.FETCH.parseRequest(version, buffer), 
version);
     }
@@ -530,6 +557,7 @@ public class FetchRequest extends AbstractRequest {
             }
             struct.set(FORGOTTEN_TOPICS, toForgetStructs.toArray());
         }
+        struct.setIfExists(RACK_ID, rackId);
         return struct;
     }
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java 
b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
index 3191f42..e857b5b 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
@@ -38,6 +38,7 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.Queue;
 
 import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
@@ -78,6 +79,8 @@ public class FetchResponse<T extends BaseRecords> extends 
AbstractResponse {
             "Last committed offset.");
     private static final Field.Int64 LOG_START_OFFSET = new 
Field.Int64("log_start_offset",
             "Earliest available offset.");
+    private static final Field.Int32 PREFERRED_READ_REPLICA = new 
Field.Int32("preferred_read_replica",
+            "The ID of the replica that the consumer should prefer.");
 
     private static final String PARTITION_HEADER_KEY_NAME = "partition_header";
     private static final String ABORTED_TRANSACTIONS_KEY_NAME = 
"aborted_transactions";
@@ -140,6 +143,15 @@ public class FetchResponse<T extends BaseRecords> extends 
AbstractResponse {
             LOG_START_OFFSET,
             new Field(ABORTED_TRANSACTIONS_KEY_NAME, 
ArrayOf.nullable(FETCH_RESPONSE_ABORTED_TRANSACTION_V4)));
 
+    private static final Schema FETCH_RESPONSE_PARTITION_HEADER_V6 = new 
Schema(
+            PARTITION_ID,
+            ERROR_CODE,
+            HIGH_WATERMARK,
+            LAST_STABLE_OFFSET,
+            LOG_START_OFFSET,
+            new Field(ABORTED_TRANSACTIONS_KEY_NAME, 
ArrayOf.nullable(FETCH_RESPONSE_ABORTED_TRANSACTION_V4)),
+            PREFERRED_READ_REPLICA);
+
     private static final Schema FETCH_RESPONSE_PARTITION_V4 = new Schema(
             new Field(PARTITION_HEADER_KEY_NAME, 
FETCH_RESPONSE_PARTITION_HEADER_V4),
             new Field(RECORD_SET_KEY_NAME, RECORDS));
@@ -148,6 +160,10 @@ public class FetchResponse<T extends BaseRecords> extends 
AbstractResponse {
             new Field(PARTITION_HEADER_KEY_NAME, 
FETCH_RESPONSE_PARTITION_HEADER_V5),
             new Field(RECORD_SET_KEY_NAME, RECORDS));
 
+    private static final Schema FETCH_RESPONSE_PARTITION_V6 = new Schema(
+            new Field(PARTITION_HEADER_KEY_NAME, 
FETCH_RESPONSE_PARTITION_HEADER_V6),
+            new Field(RECORD_SET_KEY_NAME, RECORDS));
+
     private static final Schema FETCH_RESPONSE_TOPIC_V4 = new Schema(
             TOPIC_NAME,
             new Field(PARTITIONS_KEY_NAME, new 
ArrayOf(FETCH_RESPONSE_PARTITION_V4)));
@@ -156,6 +172,10 @@ public class FetchResponse<T extends BaseRecords> extends 
AbstractResponse {
             TOPIC_NAME,
             new Field(PARTITIONS_KEY_NAME, new 
ArrayOf(FETCH_RESPONSE_PARTITION_V5)));
 
+    private static final Schema FETCH_RESPONSE_TOPIC_V6 = new Schema(
+            TOPIC_NAME,
+            new Field(PARTITIONS_KEY_NAME, new 
ArrayOf(FETCH_RESPONSE_PARTITION_V6)));
+
     private static final Schema FETCH_RESPONSE_V4 = new Schema(
             THROTTLE_TIME_MS,
             new Field(RESPONSES_KEY_NAME, new 
ArrayOf(FETCH_RESPONSE_TOPIC_V4)));
@@ -186,15 +206,24 @@ public class FetchResponse<T extends BaseRecords> extends 
AbstractResponse {
     // V10 bumped up to indicate ZStandard capability. (see KIP-110)
     private static final Schema FETCH_RESPONSE_V10 = FETCH_RESPONSE_V9;
 
+    private static final Schema FETCH_RESPONSE_V11 = new Schema(
+            THROTTLE_TIME_MS,
+            ERROR_CODE,
+            SESSION_ID,
+            new Field(RESPONSES_KEY_NAME, new 
ArrayOf(FETCH_RESPONSE_TOPIC_V6)));
+
+
     public static Schema[] schemaVersions() {
         return new Schema[] {FETCH_RESPONSE_V0, FETCH_RESPONSE_V1, 
FETCH_RESPONSE_V2,
             FETCH_RESPONSE_V3, FETCH_RESPONSE_V4, FETCH_RESPONSE_V5, 
FETCH_RESPONSE_V6,
-            FETCH_RESPONSE_V7, FETCH_RESPONSE_V8, FETCH_RESPONSE_V9, 
FETCH_RESPONSE_V10};
+            FETCH_RESPONSE_V7, FETCH_RESPONSE_V8, FETCH_RESPONSE_V9, 
FETCH_RESPONSE_V10,
+            FETCH_RESPONSE_V11};
     }
 
     public static final long INVALID_HIGHWATERMARK = -1L;
     public static final long INVALID_LAST_STABLE_OFFSET = -1L;
     public static final long INVALID_LOG_START_OFFSET = -1L;
+    public static final int UNSPECIFIED_PREFERRED_REPLICA = -1;
 
     private final int throttleTimeMs;
     private final Errors error;
@@ -240,6 +269,7 @@ public class FetchResponse<T extends BaseRecords> extends 
AbstractResponse {
         public final long highWatermark;
         public final long lastStableOffset;
         public final long logStartOffset;
+        public final Optional<Integer> preferredReadReplica;
         public final List<AbortedTransaction> abortedTransactions;
         public final T records;
 
@@ -247,12 +277,29 @@ public class FetchResponse<T extends BaseRecords> extends 
AbstractResponse {
                              long highWatermark,
                              long lastStableOffset,
                              long logStartOffset,
+                             Integer preferredReadReplica,
+                             List<AbortedTransaction> abortedTransactions,
+                             T records) {
+            this.error = error;
+            this.highWatermark = highWatermark;
+            this.lastStableOffset = lastStableOffset;
+            this.logStartOffset = logStartOffset;
+            this.preferredReadReplica = 
Optional.ofNullable(preferredReadReplica);
+            this.abortedTransactions = abortedTransactions;
+            this.records = records;
+        }
+
+        public PartitionData(Errors error,
+                             long highWatermark,
+                             long lastStableOffset,
+                             long logStartOffset,
                              List<AbortedTransaction> abortedTransactions,
                              T records) {
             this.error = error;
             this.highWatermark = highWatermark;
             this.lastStableOffset = lastStableOffset;
             this.logStartOffset = logStartOffset;
+            this.preferredReadReplica = Optional.empty();
             this.abortedTransactions = abortedTransactions;
             this.records = records;
         }
@@ -270,6 +317,7 @@ public class FetchResponse<T extends BaseRecords> extends 
AbstractResponse {
                     highWatermark == that.highWatermark &&
                     lastStableOffset == that.lastStableOffset &&
                     logStartOffset == that.logStartOffset &&
+                    Objects.equals(preferredReadReplica, 
that.preferredReadReplica) &&
                     Objects.equals(abortedTransactions, 
that.abortedTransactions) &&
                     Objects.equals(records, that.records);
         }
@@ -280,6 +328,7 @@ public class FetchResponse<T extends BaseRecords> extends 
AbstractResponse {
             result = 31 * result + Long.hashCode(highWatermark);
             result = 31 * result + Long.hashCode(lastStableOffset);
             result = 31 * result + Long.hashCode(logStartOffset);
+            result = 31 * result + (preferredReadReplica != null ? 
preferredReadReplica.hashCode() : 0);
             result = 31 * result + (abortedTransactions != null ? 
abortedTransactions.hashCode() : 0);
             result = 31 * result + (records != null ? records.hashCode() : 0);
             return result;
@@ -291,6 +340,7 @@ public class FetchResponse<T extends BaseRecords> extends 
AbstractResponse {
                     ", highWaterMark=" + highWatermark +
                     ", lastStableOffset = " + lastStableOffset +
                     ", logStartOffset = " + logStartOffset +
+                    ", preferredReadReplica = " + 
preferredReadReplica.map(Object::toString).orElse("absent") +
                     ", abortedTransactions = " + abortedTransactions +
                     ", recordsSizeInBytes=" + records.sizeInBytes() + ")";
         }
@@ -329,6 +379,7 @@ public class FetchResponse<T extends BaseRecords> extends 
AbstractResponse {
                 long highWatermark = 
partitionResponseHeader.get(HIGH_WATERMARK);
                 long lastStableOffset = 
partitionResponseHeader.getOrElse(LAST_STABLE_OFFSET, 
INVALID_LAST_STABLE_OFFSET);
                 long logStartOffset = 
partitionResponseHeader.getOrElse(LOG_START_OFFSET, INVALID_LOG_START_OFFSET);
+                int preferredReadReplica = 
partitionResponseHeader.getOrElse(PREFERRED_READ_REPLICA, 
UNSPECIFIED_PREFERRED_REPLICA);
 
                 BaseRecords baseRecords = 
partitionResponse.getRecords(RECORD_SET_KEY_NAME);
                 if (!(baseRecords instanceof MemoryRecords))
@@ -350,7 +401,8 @@ public class FetchResponse<T extends BaseRecords> extends 
AbstractResponse {
                 }
 
                 PartitionData<MemoryRecords> partitionData = new 
PartitionData<>(error, highWatermark, lastStableOffset,
-                        logStartOffset, abortedTransactions, records);
+                        logStartOffset, preferredReadReplica == 
UNSPECIFIED_PREFERRED_REPLICA ? null : preferredReadReplica,
+                        abortedTransactions, records);
                 responseData.put(new TopicPartition(topic, partition), 
partitionData);
             }
         }
@@ -513,6 +565,7 @@ public class FetchResponse<T extends BaseRecords> extends 
AbstractResponse {
                     }
                 }
                 partitionDataHeader.setIfExists(LOG_START_OFFSET, 
fetchPartitionData.logStartOffset);
+                partitionDataHeader.setIfExists(PREFERRED_READ_REPLICA, 
fetchPartitionData.preferredReadReplica.orElse(-1));
                 partitionData.set(PARTITION_HEADER_KEY_NAME, 
partitionDataHeader);
                 partitionData.set(RECORD_SET_KEY_NAME, 
fetchPartitionData.records);
                 partitionArray.add(partitionData);
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java
index 75788a0..5052b0e 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java
@@ -36,6 +36,15 @@ import static 
org.apache.kafka.common.protocol.CommonFields.PARTITION_ID;
 import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME;
 
 public class OffsetsForLeaderEpochRequest extends AbstractRequest {
+    private static final Field.Int32 REPLICA_ID = new Field.Int32("replica_id",
+            "Broker id of the follower. For normal consumers, use -1.");
+
+    /**
+     * Sentinel replica_id value to indicate a regular consumer rather than 
another broker
+     */
+    public static final int CONSUMER_REPLICA_ID = -1;
+
+
     private static final Field.ComplexArray TOPICS = new 
Field.ComplexArray("topics",
             "An array of topics to get epochs for");
     private static final Field.ComplexArray PARTITIONS = new 
Field.ComplexArray("partitions",
@@ -67,28 +76,40 @@ public class OffsetsForLeaderEpochRequest extends 
AbstractRequest {
     private static final Schema OFFSET_FOR_LEADER_EPOCH_REQUEST_V2 = new 
Schema(
             TOPICS_V2);
 
+    private static final Schema OFFSET_FOR_LEADER_EPOCH_REQUEST_V3 = new 
Schema(
+            REPLICA_ID,
+            TOPICS_V2);
+
     public static Schema[] schemaVersions() {
         return new Schema[]{OFFSET_FOR_LEADER_EPOCH_REQUEST_V0, 
OFFSET_FOR_LEADER_EPOCH_REQUEST_V1,
-            OFFSET_FOR_LEADER_EPOCH_REQUEST_V2};
+            OFFSET_FOR_LEADER_EPOCH_REQUEST_V2, 
OFFSET_FOR_LEADER_EPOCH_REQUEST_V3};
     }
 
-    private Map<TopicPartition, PartitionData> epochsByPartition;
+    private final Map<TopicPartition, PartitionData> epochsByPartition;
+
+    private final int replicaId;
 
     public Map<TopicPartition, PartitionData> epochsByTopicPartition() {
         return epochsByPartition;
     }
 
+    public int replicaId() {
+        return replicaId;
+    }
+
     public static class Builder extends 
AbstractRequest.Builder<OffsetsForLeaderEpochRequest> {
         private final Map<TopicPartition, PartitionData> epochsByPartition;
+        private final int replicaId;
 
         public Builder(short version, Map<TopicPartition, PartitionData> 
epochsByPartition) {
             super(ApiKeys.OFFSET_FOR_LEADER_EPOCH, version);
             this.epochsByPartition = epochsByPartition;
+            this.replicaId = CONSUMER_REPLICA_ID;
         }
 
         @Override
         public OffsetsForLeaderEpochRequest build(short version) {
-            return new OffsetsForLeaderEpochRequest(epochsByPartition, 
version);
+            return new OffsetsForLeaderEpochRequest(epochsByPartition, 
replicaId, version);
         }
 
         public static OffsetsForLeaderEpochRequest parse(ByteBuffer buffer, 
short version) {
@@ -105,13 +126,15 @@ public class OffsetsForLeaderEpochRequest extends 
AbstractRequest {
         }
     }
 
-    public OffsetsForLeaderEpochRequest(Map<TopicPartition, PartitionData> 
epochsByPartition, short version) {
+    public OffsetsForLeaderEpochRequest(Map<TopicPartition, PartitionData> 
epochsByPartition, int replicaId, short version) {
         super(ApiKeys.OFFSET_FOR_LEADER_EPOCH, version);
         this.epochsByPartition = epochsByPartition;
+        this.replicaId = replicaId;
     }
 
     public OffsetsForLeaderEpochRequest(Struct struct, short version) {
         super(ApiKeys.OFFSET_FOR_LEADER_EPOCH, version);
+        replicaId = struct.getOrElse(REPLICA_ID, CONSUMER_REPLICA_ID);
         epochsByPartition = new HashMap<>();
         for (Object topicAndEpochsObj : struct.get(TOPICS)) {
             Struct topicAndEpochs = (Struct) topicAndEpochsObj;
@@ -134,6 +157,7 @@ public class OffsetsForLeaderEpochRequest extends 
AbstractRequest {
     @Override
     protected Struct toStruct() {
         Struct requestStruct = new 
Struct(ApiKeys.OFFSET_FOR_LEADER_EPOCH.requestSchema(version()));
+        requestStruct.set(REPLICA_ID, replicaId);
 
         Map<String, Map<Integer, PartitionData>> topicsToPartitionEpochs = 
CollectionUtils.groupPartitionDataByTopic(epochsByPartition);
 
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochResponse.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochResponse.java
index d5d1265..3fe3cdc 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochResponse.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochResponse.java
@@ -85,9 +85,12 @@ public class OffsetsForLeaderEpochResponse extends 
AbstractResponse {
             THROTTLE_TIME_MS,
             TOPICS_V1);
 
+    private static final Schema OFFSET_FOR_LEADER_EPOCH_RESPONSE_V3 = 
OFFSET_FOR_LEADER_EPOCH_RESPONSE_V2;
+
+
     public static Schema[] schemaVersions() {
         return new Schema[]{OFFSET_FOR_LEADER_EPOCH_RESPONSE_V0, 
OFFSET_FOR_LEADER_EPOCH_RESPONSE_V1,
-            OFFSET_FOR_LEADER_EPOCH_RESPONSE_V2};
+            OFFSET_FOR_LEADER_EPOCH_RESPONSE_V2, 
OFFSET_FOR_LEADER_EPOCH_RESPONSE_V3};
     }
 
     private final int throttleTimeMs;
diff --git a/clients/src/main/resources/common/message/FetchRequest.json 
b/clients/src/main/resources/common/message/FetchRequest.json
index 24c974d..5b834b8 100644
--- a/clients/src/main/resources/common/message/FetchRequest.json
+++ b/clients/src/main/resources/common/message/FetchRequest.json
@@ -44,7 +44,7 @@
   // Version 10 indicates that we can use the ZStd compression algorithm, as
   // described in KIP-110.
   //
-  "validVersions": "0-10",
+  "validVersions": "0-11",
   "fields": [
     { "name": "ReplicaId", "type": "int32", "versions": "0+",
       "about": "The broker ID of the follower, of -1 if this request is from a 
consumer." },
@@ -84,6 +84,8 @@
         "about": "The partition name." },
       { "name": "ForgottenPartitionIndexes", "type": "[]int32", "versions": 
"7+",
         "about": "The partitions indexes to forget." }
-    ]}
+    ]},
+    { "name": "RackId", "type":  "string", "versions": "11+", "default": "", 
"ignorable": true,
+      "about": "Rack ID of the consumer making this request"}
   ]
 }
diff --git a/clients/src/main/resources/common/message/FetchResponse.json 
b/clients/src/main/resources/common/message/FetchResponse.json
index 5ebc97c..f6cc582 100644
--- a/clients/src/main/resources/common/message/FetchResponse.json
+++ b/clients/src/main/resources/common/message/FetchResponse.json
@@ -38,7 +38,7 @@
   // Version 10 indicates that the response data can use the ZStd compression
   // algorithm, as described in KIP-110.
   //
-  "validVersions": "0-10",
+  "validVersions": "0-11",
   "fields": [
     { "name": "ThrottleTimeMs", "type": "int32", "versions": "1+", 
"ignorable": true,
       "about": "The duration in milliseconds for which the request was 
throttled due to a quota violation, or zero if the request did not violate any 
quota." },
@@ -69,6 +69,8 @@
           { "name": "FirstOffset", "type": "int64", "versions": "4+",
             "about": "The first offset in the aborted transaction." }
         ]},
+        { "name": "PreferredReadReplica", "type": "int32", "versions": "11+", 
"ignorable": true,
+          "about": "The preferred read replica for the consumer to use on its 
next fetch request"},
         { "name": "Records", "type": "bytes", "versions": "0+", 
"nullableVersions": "0+",
           "about": "The record data." }
       ]}
diff --git 
a/clients/src/main/resources/common/message/OffsetForLeaderEpochRequest.json 
b/clients/src/main/resources/common/message/OffsetForLeaderEpochRequest.json
index 4104938..84585ca 100644
--- a/clients/src/main/resources/common/message/OffsetForLeaderEpochRequest.json
+++ b/clients/src/main/resources/common/message/OffsetForLeaderEpochRequest.json
@@ -20,8 +20,11 @@
   // Version 1 is the same as version 0.
   //
   // Version 2 adds the current leader epoch to support fencing.
-  "validVersions": "0-2",
+  // Version 3 adds ReplicaId
+  "validVersions": "0-3",
   "fields": [
+    { "name": "ReplicaId", "type": "int32", "versions": "3+",
+      "about": "The broker ID of the follower, of -1 if this request is from a 
consumer." },
     { "name": "Topics", "type": "[]OffsetForLeaderTopic", "versions": "0+",
       "about": "Each topic to get offsets for.", "fields": [
       { "name": "Name", "type": "string", "versions": "0+", "entityType": 
"topicName",
diff --git 
a/clients/src/main/resources/common/message/OffsetForLeaderEpochResponse.json 
b/clients/src/main/resources/common/message/OffsetForLeaderEpochResponse.json
index 8e93422..1694606 100644
--- 
a/clients/src/main/resources/common/message/OffsetForLeaderEpochResponse.json
+++ 
b/clients/src/main/resources/common/message/OffsetForLeaderEpochResponse.json
@@ -19,7 +19,7 @@
   "name": "OffsetForLeaderEpochResponse",
   // Version 1 added the leader epoch to the response.
   // Version 2 added the throttle time.
-  "validVersions": "0-2",
+  "validVersions": "0-3",
   "fields": [
     { "name": "ThrottleTimeMs", "type": "int32", "versions": "2+", 
"ignorable": true,
       "about": "The duration in milliseconds for which the request was 
throttled due to a quota violation, or zero if the request did not violate any 
quota." },
diff --git a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java 
b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
index 0e3d191..a0d0819 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.clients;
 
 import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.InvalidTopicException;
 import org.apache.kafka.common.errors.TopicAuthorizationException;
@@ -36,6 +37,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
+import static org.apache.kafka.test.TestUtils.assertOptional;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
@@ -437,4 +439,25 @@ public class MetadataTest {
         assertNull(metadata.getAndClearMetadataException());
     }
 
+    @Test
+    public void testNodeIfOffline() {
+        Map<String, Integer> partitionCounts = new HashMap<>();
+        partitionCounts.put("topic-1", 1);
+        Node node0 = new Node(0, "localhost", 9092);
+        Node node1 = new Node(1, "localhost", 9093);
+
+        MetadataResponse metadataResponse = 
TestUtils.metadataUpdateWith("dummy", 2, Collections.emptyMap(), 
partitionCounts, _tp -> 99,
+            (error, partition, leader, leaderEpoch, replicas, isr, 
offlineReplicas) ->
+                new MetadataResponse.PartitionMetadata(error, partition, 
node0, leaderEpoch,
+                    Collections.singletonList(node0), Collections.emptyList(), 
Collections.singletonList(node1)));
+        metadata.update(emptyMetadataResponse(), 0L);
+        metadata.update(metadataResponse, 10L);
+
+        TopicPartition tp = new TopicPartition("topic-1", 0);
+
+        assertOptional(metadata.fetch().nodeIfOnline(tp, 0), node -> 
assertEquals(node.id(), 0));
+        assertFalse(metadata.fetch().nodeIfOnline(tp, 1).isPresent());
+        assertEquals(metadata.fetch().nodeById(0).id(), 0);
+        assertEquals(metadata.fetch().nodeById(1).id(), 1);
+    }
 }
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 606b711..9012ea2 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -1913,6 +1913,7 @@ public class KafkaConsumerTest {
                 fetchSize,
                 maxPollRecords,
                 checkCrcs,
+                "",
                 keyDeserializer,
                 valueDeserializer,
                 metadata,
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 30c9a04..1dfdf23 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -78,6 +78,7 @@ import org.apache.kafka.common.requests.MetadataResponse;
 import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse;
 import org.apache.kafka.common.requests.ResponseHeader;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.BytesDeserializer;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.utils.ByteBufferOutputStream;
@@ -96,6 +97,7 @@ import java.io.DataOutputStream;
 import java.lang.reflect.Field;
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -1171,7 +1173,7 @@ public class FetcherTest {
         assertEquals(1, fetcher.sendFetches());
         Map<TopicPartition, FetchResponse.PartitionData<MemoryRecords>> 
partitions = new HashMap<>();
         partitions.put(tp0, new FetchResponse.PartitionData<>(Errors.NONE, 100,
-                FetchResponse.INVALID_LAST_STABLE_OFFSET, 
FetchResponse.INVALID_LOG_START_OFFSET, null, records));
+                FetchResponse.INVALID_LAST_STABLE_OFFSET, 
FetchResponse.INVALID_LOG_START_OFFSET, null, null, records));
         client.prepareResponse(fullFetchResponse(tp0, this.records, 
Errors.NONE, 100L, 0));
         consumerClient.poll(time.timer(0));
 
@@ -1182,7 +1184,7 @@ public class FetcherTest {
         assertEquals(1, fetcher.sendFetches());
         partitions = new HashMap<>();
         partitions.put(tp1, new 
FetchResponse.PartitionData<>(Errors.OFFSET_OUT_OF_RANGE, 100,
-                FetchResponse.INVALID_LAST_STABLE_OFFSET, 
FetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY));
+                FetchResponse.INVALID_LAST_STABLE_OFFSET, 
FetchResponse.INVALID_LOG_START_OFFSET, null, null, MemoryRecords.EMPTY));
         client.prepareResponse(new FetchResponse<>(Errors.NONE, new 
LinkedHashMap<>(partitions), 0, INVALID_SESSION_ID));
         consumerClient.poll(time.timer(0));
         assertEquals(1, fetcher.fetchedRecords().get(tp0).size());
@@ -1767,6 +1769,7 @@ public class FetcherTest {
         for (int i = 1; i <= 3; i++) {
             int throttleTimeMs = 100 * i;
             FetchRequest.Builder builder = 
FetchRequest.Builder.forConsumer(100, 100, new LinkedHashMap<>());
+            builder.rackId("");
             ClientRequest request = client.newClientRequest(node.idString(), 
builder, time.milliseconds(), true);
             client.send(request, time.milliseconds());
             client.poll(1, time.milliseconds());
@@ -2812,7 +2815,7 @@ public class FetcherTest {
         for (int i = 0; i < numPartitions; i++)
             topicPartitions.add(new TopicPartition(topicName, i));
 
-        buildDependencies(new MetricConfig(), OffsetResetStrategy.EARLIEST);
+        buildDependencies(new MetricConfig(), OffsetResetStrategy.EARLIEST, 
Long.MAX_VALUE);
 
         fetcher = new Fetcher<byte[], byte[]>(
                 new LogContext(),
@@ -2823,6 +2826,7 @@ public class FetcherTest {
                 fetchSize,
                 2 * numPartitions,
                 true,
+                "",
                 new ByteArrayDeserializer(),
                 new ByteArrayDeserializer(),
                 metadata,
@@ -3263,6 +3267,86 @@ public class FetcherTest {
         assertOptional(subscriptions.position(tp0).offsetEpoch, value -> assertEquals(value.intValue(), 1));
     }
 
+    @Test
+    public void testPreferredReadReplica() {
+        buildFetcher(new MetricConfig(), OffsetResetStrategy.EARLIEST, new BytesDeserializer(), new BytesDeserializer(),
+                Integer.MAX_VALUE, IsolationLevel.READ_COMMITTED, Duration.ofMinutes(5).toMillis());
+
+        subscriptions.assignFromUser(singleton(tp0));
+        client.updateMetadata(TestUtils.metadataUpdateWith(2, singletonMap(topicName, 4)));
+        subscriptions.seek(tp0, 0);
+
+        // No preferred replica before the first fetch response
+        Node selected = fetcher.selectReadReplica(tp0, Node.noNode(), time.milliseconds());
+        assertEquals(selected.id(), -1);
+
+        assertEquals(1, fetcher.sendFetches());
+        assertFalse(fetcher.hasCompletedFetches());
+
+        // Set preferred read replica to node=1
+        client.prepareResponse(fullFetchResponse(tp0, this.records, Errors.NONE, 100L,
+                FetchResponse.INVALID_LAST_STABLE_OFFSET, 0, 1));
+        consumerClient.poll(time.timer(0));
+        assertTrue(fetcher.hasCompletedFetches());
+
+        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecords = fetchedRecords();
+        assertTrue(partitionRecords.containsKey(tp0));
+
+        // Verify the preferred read replica is now node 1
+        selected = fetcher.selectReadReplica(tp0, Node.noNode(), time.milliseconds());
+        assertEquals(selected.id(), 1);
+
+
+        assertEquals(1, fetcher.sendFetches());
+        assertFalse(fetcher.hasCompletedFetches());
+
+        // Set preferred read replica to node=2, which isn't in our metadata; the consumer should revert to the leader
+        client.prepareResponse(fullFetchResponse(tp0, this.records, Errors.NONE, 100L,
+                FetchResponse.INVALID_LAST_STABLE_OFFSET, 0, 2));
+        consumerClient.poll(time.timer(0));
+        assertTrue(fetcher.hasCompletedFetches());
+        fetchedRecords();
+        selected = fetcher.selectReadReplica(tp0, Node.noNode(), time.milliseconds());
+        assertEquals(selected.id(), -1);
+    }
+
+    @Test
+    public void testPreferredReadReplicaOffsetError() {
+        buildFetcher(new MetricConfig(), OffsetResetStrategy.EARLIEST, new BytesDeserializer(), new BytesDeserializer(),
+                Integer.MAX_VALUE, IsolationLevel.READ_COMMITTED, Duration.ofMinutes(5).toMillis());
+
+        subscriptions.assignFromUser(singleton(tp0));
+        client.updateMetadata(TestUtils.metadataUpdateWith(2, singletonMap(topicName, 4)));
+        subscriptions.seek(tp0, 0);
+
+        assertEquals(1, fetcher.sendFetches());
+        assertFalse(fetcher.hasCompletedFetches());
+
+        client.prepareResponse(fullFetchResponse(tp0, this.records, Errors.NONE, 100L,
+                FetchResponse.INVALID_LAST_STABLE_OFFSET, 0, 1));
+        consumerClient.poll(time.timer(0));
+        assertTrue(fetcher.hasCompletedFetches());
+
+        fetchedRecords();
+
+        Node selected = fetcher.selectReadReplica(tp0, Node.noNode(), time.milliseconds());
+        assertEquals(selected.id(), 1);
+
+        // Returning an error should unset the preferred read replica
+        assertEquals(1, fetcher.sendFetches());
+        assertFalse(fetcher.hasCompletedFetches());
+
+        client.prepareResponse(fullFetchResponse(tp0, this.records, Errors.OFFSET_OUT_OF_RANGE, 100L,
+                FetchResponse.INVALID_LAST_STABLE_OFFSET, 0, null));
+        consumerClient.poll(time.timer(0));
+        assertTrue(fetcher.hasCompletedFetches());
+
+        fetchedRecords();
+
+        selected = fetcher.selectReadReplica(tp0, Node.noNode(), time.milliseconds());
+        assertEquals(selected.id(), -1);
+    }
+
     private MockClient.RequestMatcher listOffsetRequestMatcher(final long timestamp) {
         // matches any list offset request with the provided timestamp
         return new MockClient.RequestMatcher() {
@@ -3308,6 +3392,14 @@ public class FetcherTest {
         return new FetchResponse<>(Errors.NONE, new LinkedHashMap<>(partitions), throttleTime, INVALID_SESSION_ID);
     }
 
+    private FetchResponse<MemoryRecords> fullFetchResponse(TopicPartition tp, MemoryRecords records, Errors error, long hw,
+                                                           long lastStableOffset, int throttleTime, Integer preferredReplicaId) {
+        Map<TopicPartition, FetchResponse.PartitionData<MemoryRecords>> partitions = Collections.singletonMap(tp,
+                new FetchResponse.PartitionData<>(error, hw, lastStableOffset, 0L,
+                        preferredReplicaId, null, records));
+        return new FetchResponse<>(Errors.NONE, new LinkedHashMap<>(partitions), throttleTime, INVALID_SESSION_ID);
+    }
+
     private FetchResponse<MemoryRecords> fetchResponse(TopicPartition tp, MemoryRecords records, Errors error, long hw,
                                         long lastStableOffset, long logStartOffset, int throttleTime) {
         Map<TopicPartition, FetchResponse.PartitionData<MemoryRecords>> partitions = Collections.singletonMap(tp,
@@ -3369,7 +3461,17 @@ public class FetcherTest {
                                      Deserializer<V> valueDeserializer,
                                      int maxPollRecords,
                                      IsolationLevel isolationLevel) {
-        buildDependencies(metricConfig, offsetResetStrategy);
+        buildFetcher(metricConfig, offsetResetStrategy, keyDeserializer, valueDeserializer, maxPollRecords, isolationLevel, Long.MAX_VALUE);
+    }
+
+    private <K, V> void buildFetcher(MetricConfig metricConfig,
+                                     OffsetResetStrategy offsetResetStrategy,
+                                     Deserializer<K> keyDeserializer,
+                                     Deserializer<V> valueDeserializer,
+                                     int maxPollRecords,
+                                     IsolationLevel isolationLevel,
+                                     long metadataExpireMs) {
+        buildDependencies(metricConfig, offsetResetStrategy, metadataExpireMs);
         fetcher = new Fetcher<>(
                 new LogContext(),
                 consumerClient,
@@ -3379,6 +3481,7 @@ public class FetcherTest {
                 fetchSize,
                 maxPollRecords,
                 true, // check crc
+                "",
                 keyDeserializer,
                 valueDeserializer,
                 metadata,
@@ -3391,11 +3494,12 @@ public class FetcherTest {
                 isolationLevel);
     }
 
-    private void buildDependencies(MetricConfig metricConfig, OffsetResetStrategy offsetResetStrategy) {
+
+    private void buildDependencies(MetricConfig metricConfig, OffsetResetStrategy offsetResetStrategy, long metadataExpireMs) {
         LogContext logContext = new LogContext();
         time = new MockTime(1);
         subscriptions = new SubscriptionState(logContext, offsetResetStrategy);
-        metadata = new ConsumerMetadata(0, Long.MAX_VALUE, false, false,
+        metadata = new ConsumerMetadata(0, metadataExpireMs, false, false,
                 subscriptions, logContext, new ClusterResourceListeners());
         client = new MockClient(time, metadata);
         metrics = new Metrics(metricConfig, time);
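
For context on what the Fetcher changes above enable: once a broker-side replica selector ships, a consumer opts in to rack-aware fetching purely through configuration. A minimal sketch, assuming the ConsumerConfig.CLIENT_RACK_CONFIG constant added by this patch; the bootstrap address, group id, rack string, and topic name are placeholders:

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;

    public class RackAwareConsumerSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "rack-aware-group");        // placeholder
            // Arbitrary location string; the naming mirrors the broker.rack property.
            props.put(ConsumerConfig.CLIENT_RACK_CONFIG, "us-east-1a");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("events")); // placeholder topic
                // Fetches go to the leader until a response carries a preferred_read_replica.
                ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(1));
                records.forEach(r -> System.out.println(r.offset()));
            }
        }
    }
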
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
index 701866d..528c2b9 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
@@ -23,6 +23,7 @@ import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.test.TestUtils;
 import org.junit.Test;
 
 import java.util.Arrays;
@@ -309,6 +310,36 @@ public class SubscriptionStateTest {
         assertEquals(0, state.numAssignedPartitions());
     }
 
+    @Test
+    public void testPreferredReadReplicaLease() {
+        state.assignFromUser(Collections.singleton(tp0));
+
+        // Default state
+        assertFalse(state.preferredReadReplica(tp0, 0L).isPresent());
+
+        // Set the preferred replica with lease
+        state.updatePreferredReadReplica(tp0, 42, () -> 10L);
+        TestUtils.assertOptional(state.preferredReadReplica(tp0, 9L), value -> assertEquals(value.intValue(), 42));
+        TestUtils.assertOptional(state.preferredReadReplica(tp0, 10L), value -> assertEquals(value.intValue(), 42));
+        assertFalse(state.preferredReadReplica(tp0, 11L).isPresent());
+
+        // Unset the preferred replica
+        state.clearPreferredReadReplica(tp0);
+        assertFalse(state.preferredReadReplica(tp0, 9L).isPresent());
+        assertFalse(state.preferredReadReplica(tp0, 11L).isPresent());
+
+        // Set to new preferred replica with lease
+        state.updatePreferredReadReplica(tp0, 43, () -> 20L);
+        TestUtils.assertOptional(state.preferredReadReplica(tp0, 11L), value -> assertEquals(value.intValue(), 43));
+        TestUtils.assertOptional(state.preferredReadReplica(tp0, 20L), value -> assertEquals(value.intValue(), 43));
+        assertFalse(state.preferredReadReplica(tp0, 21L).isPresent());
+
+        // Set to new preferred replica without clearing first
+        state.updatePreferredReadReplica(tp0, 44, () -> 30L);
+        TestUtils.assertOptional(state.preferredReadReplica(tp0, 30L), value -> assertEquals(value.intValue(), 44));
+        assertFalse(state.preferredReadReplica(tp0, 31L).isPresent());
+    }
+
     private static class MockRebalanceListener implements ConsumerRebalanceListener {
         public Collection<TopicPartition> revoked;
         public Collection<TopicPartition> assigned;
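
The lease semantics asserted in testPreferredReadReplicaLease are deliberately simple: the preferred replica is honored up to and including its expiry timestamp, and dropped after expiry, on clear, or when overwritten. A self-contained sketch of that check, with illustrative names (the shipped logic lives in SubscriptionState, not in this class):

    import java.util.Optional;
    import java.util.function.LongSupplier;

    // Illustrative stand-in for the per-partition lease kept by SubscriptionState.
    final class PreferredReadReplicaLease {
        private Integer nodeId;     // preferred replica id, or null when unset
        private long expireTimeMs;  // last millisecond at which the lease is honored

        void update(int nodeId, LongSupplier expireTimeMs) {
            this.nodeId = nodeId;
            this.expireTimeMs = expireTimeMs.getAsLong();
        }

        void clear() {
            this.nodeId = null;
        }

        // Matches the test: valid at nowMs == expireTimeMs, empty at expireTimeMs + 1.
        Optional<Integer> get(long nowMs) {
            if (nodeId == null || nowMs > expireTimeMs)
                return Optional.empty();
            return Optional.of(nodeId);
        }
    }
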
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index d9a9ed1..85ea49c 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -559,7 +559,7 @@ public class RequestResponseTest {
         MemoryRecords records = MemoryRecords.readableRecords(ByteBuffer.allocate(10));
         responseData.put(new TopicPartition("test", 0), new FetchResponse.PartitionData<>(
                 Errors.NONE, 1000000, FetchResponse.INVALID_LAST_STABLE_OFFSET,
-                0L, null, records));
+                0L, null, null, records));
 
         FetchResponse<MemoryRecords> v0Response = new FetchResponse<>(Errors.NONE, responseData, 0, INVALID_SESSION_ID);
         FetchResponse<MemoryRecords> v1Response = new FetchResponse<>(Errors.NONE, responseData, 10, INVALID_SESSION_ID);
@@ -583,11 +583,11 @@ public class RequestResponseTest {
                 new FetchResponse.AbortedTransaction(15, 50)
         );
         responseData.put(new TopicPartition("bar", 0), new 
FetchResponse.PartitionData<>(Errors.NONE, 100000,
-                FetchResponse.INVALID_LAST_STABLE_OFFSET, 
FetchResponse.INVALID_LOG_START_OFFSET, abortedTransactions, records));
+                FetchResponse.INVALID_LAST_STABLE_OFFSET, 
FetchResponse.INVALID_LOG_START_OFFSET, null, abortedTransactions, records));
         responseData.put(new TopicPartition("bar", 1), new 
FetchResponse.PartitionData<>(Errors.NONE, 900000,
-                5, FetchResponse.INVALID_LOG_START_OFFSET, null, records));
+                5, FetchResponse.INVALID_LOG_START_OFFSET, null, null, 
records));
         responseData.put(new TopicPartition("foo", 0), new 
FetchResponse.PartitionData<>(Errors.NONE, 70000,
-                6, FetchResponse.INVALID_LOG_START_OFFSET, 
Collections.emptyList(), records));
+                6, FetchResponse.INVALID_LOG_START_OFFSET, null, 
Collections.emptyList(), records));
 
         FetchResponse<MemoryRecords> response = new FetchResponse<>(Errors.NONE, responseData, 10, INVALID_SESSION_ID);
         FetchResponse deserialized = FetchResponse.parse(toBuffer(response.toStruct((short) 4)), (short) 4);
@@ -751,11 +751,11 @@ public class RequestResponseTest {
         LinkedHashMap<TopicPartition, FetchResponse.PartitionData<MemoryRecords>> responseData = new LinkedHashMap<>();
         MemoryRecords records = MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord("blah".getBytes()));
         responseData.put(new TopicPartition("test", 0), new FetchResponse.PartitionData<>(Errors.NONE,
-            1000000, FetchResponse.INVALID_LAST_STABLE_OFFSET, 0L, null, records));
+            1000000, FetchResponse.INVALID_LAST_STABLE_OFFSET, 0L, null, null, records));
         List<FetchResponse.AbortedTransaction> abortedTransactions = Collections.singletonList(
             new FetchResponse.AbortedTransaction(234L, 999L));
         responseData.put(new TopicPartition("test", 1), new 
FetchResponse.PartitionData<>(Errors.NONE,
-            1000000, FetchResponse.INVALID_LAST_STABLE_OFFSET, 0L, 
abortedTransactions, MemoryRecords.EMPTY));
+            1000000, FetchResponse.INVALID_LAST_STABLE_OFFSET, 0L, null, 
abortedTransactions, MemoryRecords.EMPTY));
         return new FetchResponse<>(Errors.NONE, responseData, 25, sessionId);
     }
 
@@ -763,12 +763,12 @@ public class RequestResponseTest {
         LinkedHashMap<TopicPartition, FetchResponse.PartitionData<MemoryRecords>> responseData = new LinkedHashMap<>();
         MemoryRecords records = MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord("blah".getBytes()));
         responseData.put(new TopicPartition("test", 0), new FetchResponse.PartitionData<>(Errors.NONE,
-                1000000, FetchResponse.INVALID_LAST_STABLE_OFFSET, 0L, null, records));
+                1000000, FetchResponse.INVALID_LAST_STABLE_OFFSET, 0L, null, null, records));
 
         List<FetchResponse.AbortedTransaction> abortedTransactions = Collections.singletonList(
                 new FetchResponse.AbortedTransaction(234L, 999L));
         responseData.put(new TopicPartition("test", 1), new FetchResponse.PartitionData<>(Errors.NONE,
-                1000000, FetchResponse.INVALID_LAST_STABLE_OFFSET, 0L, abortedTransactions, MemoryRecords.EMPTY));
+                1000000, FetchResponse.INVALID_LAST_STABLE_OFFSET, 0L, null, abortedTransactions, MemoryRecords.EMPTY));
 
         return new FetchResponse<>(Errors.NONE, responseData, 25, INVALID_SESSION_ID);
     }
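
The churn above is mechanical: FetchResponse.PartitionData gained a nullable preferredReadReplica argument between logStartOffset and abortedTransactions, so every existing call site passes one more null. Building an entry that actually steers a consumer to broker 1 would look like this (values arbitrary, for illustration only):

    import org.apache.kafka.common.protocol.Errors;
    import org.apache.kafka.common.record.MemoryRecords;
    import org.apache.kafka.common.requests.FetchResponse;

    public class PreferredReplicaPartitionDataSketch {
        public static void main(String[] args) {
            // Constructor order used throughout these tests: error, highWatermark,
            // lastStableOffset, logStartOffset, preferredReadReplica (nullable),
            // abortedTransactions (nullable), records.
            FetchResponse.PartitionData<MemoryRecords> data = new FetchResponse.PartitionData<>(
                    Errors.NONE,
                    1000000L,                                 // high watermark
                    FetchResponse.INVALID_LAST_STABLE_OFFSET, // no transactional markers
                    0L,                                       // log start offset
                    1,                                        // preferred read replica: broker id 1
                    null,                                     // no aborted transactions
                    MemoryRecords.EMPTY);
            System.out.println(data);
        }
    }
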
diff --git a/core/src/main/scala/kafka/api/ApiVersion.scala b/core/src/main/scala/kafka/api/ApiVersion.scala
index d8d1edb..8e8ca0b 100644
--- a/core/src/main/scala/kafka/api/ApiVersion.scala
+++ b/core/src/main/scala/kafka/api/ApiVersion.scala
@@ -88,7 +88,9 @@ object ApiVersion {
     // New error code for ListOffsets when a new leader is lagging behind former HW (KIP-207)
     KAFKA_2_2_IV1,
     // Introduced static membership.
-    KAFKA_2_3_IV0
+    KAFKA_2_3_IV0,
+    // Add rack_id to FetchRequest, preferred_read_replica to FetchResponse, and replica_id to OffsetsForLeaderEpochRequest
+    KAFKA_2_3_IV1
   )
 
   // Map keys are the union of the short and full versions
@@ -307,6 +309,13 @@ case object KAFKA_2_3_IV0 extends DefaultApiVersion {
   val id: Int = 22
 }
 
+case object KAFKA_2_3_IV1 extends DefaultApiVersion {
+  val shortVersion: String = "2.3"
+  val subVersion = "IV1"
+  val recordVersion = RecordVersion.V2
+  val id: Int = 23
+}
+
 object ApiVersionValidator extends Validator {
 
   override def ensureValid(name: String, value: Any): Unit = {
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index 82f51e6..8e92c2b 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -64,7 +64,8 @@ class ReplicaFetcherThread(name: String,
 
   // Visible for testing
   private[server] val fetchRequestVersion: Short =
-    if (brokerConfig.interBrokerProtocolVersion >= KAFKA_2_1_IV2) 10
+    if (brokerConfig.interBrokerProtocolVersion >= KAFKA_2_3_IV1) 11
+    else if (brokerConfig.interBrokerProtocolVersion >= KAFKA_2_1_IV2) 10
     else if (brokerConfig.interBrokerProtocolVersion >= KAFKA_2_0_IV1) 8
     else if (brokerConfig.interBrokerProtocolVersion >= KAFKA_1_1_IV0) 7
     else if (brokerConfig.interBrokerProtocolVersion >= KAFKA_0_11_0_IV1) 5
@@ -76,7 +77,8 @@ class ReplicaFetcherThread(name: String,
 
   // Visible for testing
   private[server] val offsetForLeaderEpochRequestVersion: Short =
-    if (brokerConfig.interBrokerProtocolVersion >= KAFKA_2_1_IV1) 2
+    if (brokerConfig.interBrokerProtocolVersion >= KAFKA_2_3_IV1) 3
+    else if (brokerConfig.interBrokerProtocolVersion >= KAFKA_2_1_IV1) 2
     else if (brokerConfig.interBrokerProtocolVersion >= KAFKA_2_0_IV0) 1
     else 0
 
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
index 3d7e86a..d6ebdd6 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
@@ -412,7 +412,7 @@ class ReplicaFetcherThreadTest {
     assertEquals(2, mockNetwork.epochFetchCount)
     assertEquals(1, mockNetwork.fetchCount)
     assertEquals("OffsetsForLeaderEpochRequest version.",
-      2, mockNetwork.lastUsedOffsetForLeaderEpochVersion)
+      3, mockNetwork.lastUsedOffsetForLeaderEpochVersion)
 
     //Loop 3 we should not fetch epochs
     thread.doWork()
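
Taken together, the client-side behavior these changes test reduces to: honor an unexpired preferred_read_replica whose node is still in the local metadata, otherwise fall back to the leader and clear the stale preference. A simplified sketch; the shipped logic lives in Fetcher.selectReadReplica, and the collaborator interface here is an approximation, not the real SubscriptionState API:

    import java.util.Optional;
    import org.apache.kafka.common.Cluster;
    import org.apache.kafka.common.Node;
    import org.apache.kafka.common.TopicPartition;

    public class SelectReadReplicaSketch {
        // Simplified stand-in for the SubscriptionState methods exercised above.
        interface Subscriptions {
            Optional<Integer> preferredReadReplica(TopicPartition tp, long nowMs);
            void clearPreferredReadReplica(TopicPartition tp);
        }

        static Node selectReadReplica(Subscriptions subscriptions, Cluster cluster,
                                      TopicPartition tp, Node leader, long nowMs) {
            Optional<Integer> nodeId = subscriptions.preferredReadReplica(tp, nowMs);
            if (nodeId.isPresent()) {
                Node node = cluster.nodeById(nodeId.get());
                if (node != null)
                    return node; // fetch from the preferred follower
                // Preferred replica unknown to our metadata (node=2 in the test
                // above): drop the preference and revert to the leader.
                subscriptions.clearPreferredReadReplica(tp);
            }
            return leader;
        }
    }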