This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.5 by this push:
     new f210b7a  KAFKA-9261; Client should handle unavailable leader metadata (#7770)
f210b7a is described below

commit f210b7abbafd82c4b8491169bbb76458b85d1591
Author: Jason Gustafson <[email protected]>
AuthorDate: Wed Feb 5 09:13:11 2020 -0800

    KAFKA-9261; Client should handle unavailable leader metadata (#7770)
    
    The client caches metadata fetched from Metadata requests. Previously, each 
metadata response overwrote all of the metadata from the previous one, so we 
could rely on the expectation that the broker only returned the leaderId for a 
partition if it had connection information available. This behavior changed 
with KIP-320 since having the leader epoch allows the client to filter out 
partition metadata which is known to be stale. However, because of this, we can 
no longer rely on the requ [...]
    
    Fixing this issue was unfortunately not straightforward because the cache 
was built to maintain references to broker metadata through the `Node` object 
at the partition level. In order to keep the state consistent, each `Node` 
reference would need to be updated based on the new broker metadata. Instead of 
doing that, this patch changes the cache so that its structure more closely 
follows the Metadata response schema. Broker node information is maintained at the 
top level in a single c [...]
    
    Note that one of the side benefits of the refactor here is that we 
virtually eliminate one of the hotspots in Metadata request handling in 
`MetadataCache.getEndpoints` (which was renamed to `maybeFilterAliveReplicas`). 
The only reason this was expensive was that we had to build a new collection 
for the `Node` representations of each of the replica lists. This information 
was doomed to just get discarded on serialization, so the whole effort was 
wasteful. Now, we work with the lower [...]
    
    Reviewers: Rajini Sivaram <[email protected]>, Ismael Juma 
<[email protected]>
---
 .../java/org/apache/kafka/clients/Metadata.java    | 113 +++++++------
 .../org/apache/kafka/clients/MetadataCache.java    | 105 ++++--------
 .../org/apache/kafka/clients/NetworkClient.java    |   2 +-
 .../admin/internals/MetadataOperationContext.java  |   4 +-
 .../kafka/clients/consumer/KafkaConsumer.java      |   4 +-
 .../kafka/clients/consumer/MockConsumer.java       |   5 +-
 .../kafka/clients/consumer/OffsetAndMetadata.java  |   4 +-
 .../consumer/internals/ConsumerCoordinator.java    |   6 +-
 .../kafka/clients/consumer/internals/Fetcher.java  |  55 +++---
 .../consumer/internals/SubscriptionState.java      |   8 +-
 .../main/java/org/apache/kafka/common/Cluster.java |  21 +--
 .../kafka/common/requests/MetadataResponse.java    | 184 ++++++++++-----------
 .../apache/kafka/common/requests/RequestUtils.java |   9 +-
 .../apache/kafka/clients/MetadataCacheTest.java    |  85 ++++++++++
 .../org/apache/kafka/clients/MetadataTest.java     | 115 ++++++++++---
 .../kafka/clients/admin/KafkaAdminClientTest.java  |  51 +++---
 .../internals/ConsumerCoordinatorTest.java         |   5 +-
 .../consumer/internals/ConsumerMetadataTest.java   |   3 +-
 .../clients/consumer/internals/FetcherTest.java    |  37 +++--
 .../internals/OffsetForLeaderEpochClientTest.java  |   8 +-
 .../consumer/internals/SubscriptionStateTest.java  |  47 +++---
 .../kafka/common/requests/RequestResponseTest.java |  26 +--
 .../test/java/org/apache/kafka/test/TestUtils.java |  15 +-
 core/src/main/scala/kafka/server/KafkaApis.scala   |   8 +-
 .../main/scala/kafka/server/MetadataCache.scala    |  89 ++++++----
 .../scala/unit/kafka/admin/AddPartitionsTest.scala |  23 ++-
 .../admin/TopicCommandWithAdminClientTest.scala    |   2 +-
 .../server/AbstractCreateTopicsRequestTest.scala   |   4 +-
 .../unit/kafka/server/MetadataCacheTest.scala      |  33 ++--
 .../unit/kafka/server/MetadataRequestTest.scala    |  43 +++--
 30 files changed, 635 insertions(+), 479 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java 
b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
index 82c1b07..72f2d04 100644
--- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
@@ -19,14 +19,12 @@ package org.apache.kafka.clients;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Node;
-import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.InvalidMetadataException;
 import org.apache.kafka.common.errors.InvalidTopicException;
 import org.apache.kafka.common.errors.TopicAuthorizationException;
 import org.apache.kafka.common.internals.ClusterResourceListeners;
 import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.common.requests.MetadataRequest;
 import org.apache.kafka.common.requests.MetadataResponse;
 import org.apache.kafka.common.utils.LogContext;
@@ -43,10 +41,11 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
-import java.util.function.Consumer;
 import java.util.function.Predicate;
 import java.util.function.Supplier;
 
+import static 
org.apache.kafka.common.record.RecordBatch.NO_PARTITION_LEADER_EPOCH;
+
 /**
  * A class encapsulating some of the logic around metadata.
  * <p>
@@ -147,11 +146,16 @@ public class Metadata implements Closeable {
     }
 
     /**
-     * Request an update for the partition metadata iff the given leader epoch 
is at newer than the last seen leader epoch
+     * Request an update for the partition metadata iff the given leader epoch 
is newer than the last seen leader epoch
      */
     public synchronized boolean updateLastSeenEpochIfNewer(TopicPartition 
topicPartition, int leaderEpoch) {
         Objects.requireNonNull(topicPartition, "TopicPartition cannot be 
null");
-        return updateLastSeenEpoch(topicPartition, leaderEpoch, oldEpoch -> 
leaderEpoch > oldEpoch, true);
+        if (leaderEpoch < 0)
+            throw new IllegalArgumentException("Invalid leader epoch " + 
leaderEpoch + " (must be non-negative)");
+
+        boolean updated = updateLastSeenEpoch(topicPartition, leaderEpoch, 
oldEpoch -> leaderEpoch > oldEpoch);
+        this.needUpdate = this.needUpdate || updated;
+        return updated;
     }
 
 
@@ -165,21 +169,16 @@ public class Metadata implements Closeable {
      * @param topicPartition       topic+partition to update the epoch for
      * @param epoch                the new epoch
      * @param epochTest            a predicate to determine if the old epoch 
should be replaced
-     * @param setRequestUpdateFlag sets the "needUpdate" flag to true if the 
epoch is updated
      * @return true if the epoch was updated, false otherwise
      */
     private synchronized boolean updateLastSeenEpoch(TopicPartition 
topicPartition,
                                                      int epoch,
-                                                     Predicate<Integer> 
epochTest,
-                                                     boolean 
setRequestUpdateFlag) {
+                                                     Predicate<Integer> 
epochTest) {
         Integer oldEpoch = lastSeenLeaderEpochs.get(topicPartition);
         log.trace("Determining if we should replace existing epoch {} with new 
epoch {}", oldEpoch, epoch);
         if (oldEpoch == null || epochTest.test(oldEpoch)) {
             log.debug("Updating last seen epoch from {} to {} for partition 
{}", oldEpoch, epoch, topicPartition);
             lastSeenLeaderEpochs.put(topicPartition, epoch);
-            if (setRequestUpdateFlag) {
-                this.needUpdate = true;
-            }
             return true;
         } else {
             log.debug("Not replacing existing epoch {} with new epoch {} for 
partition {}", oldEpoch, epoch, topicPartition);
@@ -199,16 +198,29 @@ public class Metadata implements Closeable {
     /**
      * Return the cached partition info if it exists and a newer leader epoch 
isn't known about.
      */
-    public synchronized Optional<MetadataCache.PartitionInfoAndEpoch> 
partitionInfoIfCurrent(TopicPartition topicPartition) {
+    synchronized Optional<MetadataResponse.PartitionMetadata> 
partitionMetadataIfCurrent(TopicPartition topicPartition) {
         Integer epoch = lastSeenLeaderEpochs.get(topicPartition);
+        Optional<MetadataResponse.PartitionMetadata> partitionMetadata = 
cache.partitionMetadata(topicPartition);
         if (epoch == null) {
             // old cluster format (no epochs)
-            return cache.getPartitionInfo(topicPartition);
+            return partitionMetadata;
         } else {
-            return cache.getPartitionInfoHavingEpoch(topicPartition, epoch);
+            return partitionMetadata.filter(metadata ->
+                    
metadata.leaderEpoch.orElse(NO_PARTITION_LEADER_EPOCH).equals(epoch));
         }
     }
 
+    public synchronized LeaderAndEpoch currentLeader(TopicPartition 
topicPartition) {
+        Optional<MetadataResponse.PartitionMetadata> maybeMetadata = 
partitionMetadataIfCurrent(topicPartition);
+        if (!maybeMetadata.isPresent())
+            return new LeaderAndEpoch(Optional.empty(), 
Optional.ofNullable(lastSeenLeaderEpochs.get(topicPartition)));
+
+        MetadataResponse.PartitionMetadata partitionMetadata = 
maybeMetadata.get();
+        Optional<Integer> leaderEpochOpt = partitionMetadata.leaderEpoch;
+        Optional<Node> leaderNodeOpt = 
partitionMetadata.leaderId.flatMap(cache::nodeById);
+        return new LeaderAndEpoch(leaderNodeOpt, leaderEpochOpt);
+    }
+
     public synchronized void bootstrap(List<InetSocketAddress> addresses) {
         this.needUpdate = true;
         this.updateVersion += 1;
@@ -245,7 +257,7 @@ public class Metadata implements Closeable {
         this.lastSuccessfulRefreshMs = now;
         this.updateVersion += 1;
 
-        String previousClusterId = 
cache.cluster().clusterResource().clusterId();
+        String previousClusterId = cache.clusterResource().clusterId();
 
         this.cache = handleMetadataResponse(response, topic -> 
retainTopic(topic.topic(), topic.isInternal(), now));
 
@@ -254,11 +266,11 @@ public class Metadata implements Closeable {
 
         this.lastSeenLeaderEpochs.keySet().removeIf(tp -> 
!retainTopic(tp.topic(), false, now));
 
-        String newClusterId = cache.cluster().clusterResource().clusterId();
+        String newClusterId = cache.clusterResource().clusterId();
         if (!Objects.equals(previousClusterId, newClusterId)) {
             log.info("Cluster ID: {}", newClusterId);
         }
-        clusterResourceListeners.onUpdate(cache.cluster().clusterResource());
+        clusterResourceListeners.onUpdate(cache.clusterResource());
 
         log.debug("Updated cluster metadata updateVersion {} to {}", 
this.updateVersion, this.cache);
     }
@@ -289,7 +301,7 @@ public class Metadata implements Closeable {
     private MetadataCache handleMetadataResponse(MetadataResponse 
metadataResponse,
                                                  
Predicate<MetadataResponse.TopicMetadata> topicsToRetain) {
         Set<String> internalTopics = new HashSet<>();
-        List<MetadataCache.PartitionInfoAndEpoch> partitions = new 
ArrayList<>();
+        List<MetadataResponse.PartitionMetadata> partitions = new 
ArrayList<>();
         for (MetadataResponse.TopicMetadata metadata : 
metadataResponse.topicMetadata()) {
             if (!topicsToRetain.test(metadata))
                 continue;
@@ -300,12 +312,12 @@ public class Metadata implements Closeable {
                 for (MetadataResponse.PartitionMetadata partitionMetadata : 
metadata.partitionMetadata()) {
                     // Even if the partition's metadata includes an error, we 
need to handle
                     // the update to catch new epochs
-                    updatePartitionInfo(metadata.topic(), partitionMetadata,
-                            metadataResponse.hasReliableLeaderEpochs(), 
partitions::add);
+                    updateLatestMetadata(partitionMetadata, 
metadataResponse.hasReliableLeaderEpochs())
+                        .ifPresent(partitions::add);
 
-                    if (partitionMetadata.error().exception() instanceof 
InvalidMetadataException) {
+                    if (partitionMetadata.error.exception() instanceof 
InvalidMetadataException) {
                         log.debug("Requesting metadata update for partition {} 
due to error {}",
-                                new TopicPartition(metadata.topic(), 
partitionMetadata.partition()), partitionMetadata.error());
+                                partitionMetadata.topicPartition, 
partitionMetadata.error);
                         requestUpdate();
                     }
                 }
@@ -315,37 +327,36 @@ public class Metadata implements Closeable {
             }
         }
 
-        return new MetadataCache(metadataResponse.clusterId(), new 
ArrayList<>(metadataResponse.brokers()), partitions,
+        return new MetadataCache(metadataResponse.clusterId(),
+                metadataResponse.brokersById(),
+                partitions,
                 
metadataResponse.topicsByError(Errors.TOPIC_AUTHORIZATION_FAILED),
                 metadataResponse.topicsByError(Errors.INVALID_TOPIC_EXCEPTION),
-                internalTopics, metadataResponse.controller());
+                internalTopics,
+                metadataResponse.controller());
     }
 
     /**
-     * Compute the correct PartitionInfo to cache for a topic+partition and 
pass to the given consumer.
+     * Compute the latest partition metadata to cache given ordering by leader 
epochs (if both
+     * available and reliable).
      */
-    private void updatePartitionInfo(String topic,
-                                     MetadataResponse.PartitionMetadata 
partitionMetadata,
-                                     boolean hasReliableLeaderEpoch,
-                                     
Consumer<MetadataCache.PartitionInfoAndEpoch> partitionInfoConsumer) {
-        TopicPartition tp = new TopicPartition(topic, 
partitionMetadata.partition());
-
-        if (hasReliableLeaderEpoch && 
partitionMetadata.leaderEpoch().isPresent()) {
-            int newEpoch = partitionMetadata.leaderEpoch().get();
+    private Optional<MetadataResponse.PartitionMetadata> updateLatestMetadata(
+            MetadataResponse.PartitionMetadata partitionMetadata,
+            boolean hasReliableLeaderEpoch) {
+        TopicPartition tp = partitionMetadata.topicPartition;
+        if (hasReliableLeaderEpoch && 
partitionMetadata.leaderEpoch.isPresent()) {
+            int newEpoch = partitionMetadata.leaderEpoch.get();
             // If the received leader epoch is at least the same as the 
previous one, update the metadata
-            if (updateLastSeenEpoch(tp, newEpoch, oldEpoch -> newEpoch >= 
oldEpoch, false)) {
-                PartitionInfo info = 
MetadataResponse.partitionMetaToInfo(topic, partitionMetadata);
-                partitionInfoConsumer.accept(new 
MetadataCache.PartitionInfoAndEpoch(info, newEpoch));
+            if (updateLastSeenEpoch(tp, newEpoch, oldEpoch -> newEpoch >= 
oldEpoch)) {
+                return Optional.of(partitionMetadata);
             } else {
                 // Otherwise ignore the new metadata and use the previously 
cached info
-                cache.getPartitionInfo(tp).ifPresent(partitionInfoConsumer);
+                return cache.partitionMetadata(tp);
             }
         } else {
             // Handle old cluster formats as well as error responses where 
leader and epoch are missing
             lastSeenLeaderEpochs.remove(tp);
-            PartitionInfo info = MetadataResponse.partitionMetaToInfo(topic, 
partitionMetadata);
-            partitionInfoConsumer.accept(new 
MetadataCache.PartitionInfoAndEpoch(info,
-                    RecordBatch.NO_PARTITION_LEADER_EPOCH));
+            return Optional.of(partitionMetadata.withoutLeaderEpoch());
         }
     }
 
@@ -491,23 +502,19 @@ public class Metadata implements Closeable {
         }
     }
 
-    public synchronized LeaderAndEpoch leaderAndEpoch(TopicPartition tp) {
-        return partitionInfoIfCurrent(tp)
-                .map(infoAndEpoch -> {
-                    Node leader = infoAndEpoch.partitionInfo().leader();
-                    return new LeaderAndEpoch(leader == null ? Node.noNode() : 
leader, Optional.of(infoAndEpoch.epoch()));
-                })
-                .orElse(new LeaderAndEpoch(Node.noNode(), 
lastSeenLeaderEpoch(tp)));
-    }
-
+    /**
+     * Represents current leader state known in metadata. It is possible that 
we know the leader, but not the
+     * epoch if the metadata is received from a broker which does not support 
a sufficient Metadata API version.
+     * It is also possible that we know of the leader epoch, but not the 
leader when it is derived
+     * from an external source (e.g. a committed offset).
+     */
     public static class LeaderAndEpoch {
+        private static final LeaderAndEpoch NO_LEADER_OR_EPOCH = new 
LeaderAndEpoch(Optional.empty(), Optional.empty());
 
-        public static final LeaderAndEpoch NO_LEADER_OR_EPOCH = new 
LeaderAndEpoch(Node.noNode(), Optional.empty());
-
-        public final Node leader;
+        public final Optional<Node> leader;
         public final Optional<Integer> epoch;
 
-        public LeaderAndEpoch(Node leader, Optional<Integer> epoch) {
+        public LeaderAndEpoch(Optional<Node> leader, Optional<Integer> epoch) {
             this.leader = Objects.requireNonNull(leader);
             this.epoch = Objects.requireNonNull(epoch);
         }
diff --git a/clients/src/main/java/org/apache/kafka/clients/MetadataCache.java 
b/clients/src/main/java/org/apache/kafka/clients/MetadataCache.java
index e58da12..3230087 100644
--- a/clients/src/main/java/org/apache/kafka/clients/MetadataCache.java
+++ b/clients/src/main/java/org/apache/kafka/clients/MetadataCache.java
@@ -17,18 +17,18 @@
 package org.apache.kafka.clients;
 
 import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.ClusterResource;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.requests.MetadataResponse;
 
 import java.net.InetSocketAddress;
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
@@ -39,18 +39,18 @@ import java.util.stream.Collectors;
  */
 public class MetadataCache {
     private final String clusterId;
-    private final List<Node> nodes;
+    private final Map<Integer, Node> nodes;
     private final Set<String> unauthorizedTopics;
     private final Set<String> invalidTopics;
     private final Set<String> internalTopics;
     private final Node controller;
-    private final Map<TopicPartition, PartitionInfoAndEpoch> 
metadataByPartition;
+    private final Map<TopicPartition, MetadataResponse.PartitionMetadata> 
metadataByPartition;
 
     private Cluster clusterInstance;
 
     MetadataCache(String clusterId,
-                  List<Node> nodes,
-                  Collection<PartitionInfoAndEpoch> partitions,
+                  Map<Integer, Node> nodes,
+                  Collection<MetadataResponse.PartitionMetadata> partitions,
                   Set<String> unauthorizedTopics,
                   Set<String> invalidTopics,
                   Set<String> internalTopics,
@@ -58,14 +58,14 @@ public class MetadataCache {
         this(clusterId, nodes, partitions, unauthorizedTopics, invalidTopics, 
internalTopics, controller, null);
     }
 
-    MetadataCache(String clusterId,
-                  List<Node> nodes,
-                  Collection<PartitionInfoAndEpoch> partitions,
-                  Set<String> unauthorizedTopics,
-                  Set<String> invalidTopics,
-                  Set<String> internalTopics,
-                  Node controller,
-                  Cluster clusterInstance) {
+    private MetadataCache(String clusterId,
+                          Map<Integer, Node> nodes,
+                          Collection<MetadataResponse.PartitionMetadata> 
partitions,
+                          Set<String> unauthorizedTopics,
+                          Set<String> invalidTopics,
+                          Set<String> internalTopics,
+                          Node controller,
+                          Cluster clusterInstance) {
         this.clusterId = clusterId;
         this.nodes = nodes;
         this.unauthorizedTopics = unauthorizedTopics;
@@ -74,8 +74,8 @@ public class MetadataCache {
         this.controller = controller;
 
         this.metadataByPartition = new HashMap<>(partitions.size());
-        for (PartitionInfoAndEpoch p : partitions) {
-            this.metadataByPartition.put(new 
TopicPartition(p.partitionInfo().topic(), p.partitionInfo().partition()), p);
+        for (MetadataResponse.PartitionMetadata p : partitions) {
+            this.metadataByPartition.put(p.topicPartition, p);
         }
 
         if (clusterInstance == null) {
@@ -85,16 +85,12 @@ public class MetadataCache {
         }
     }
 
-    /**
-     * Return the cached PartitionInfo iff it was for the given epoch
-     */
-    Optional<PartitionInfoAndEpoch> getPartitionInfoHavingEpoch(TopicPartition 
topicPartition, int epoch) {
-        PartitionInfoAndEpoch infoAndEpoch = 
metadataByPartition.get(topicPartition);
-        return Optional.ofNullable(infoAndEpoch).filter(infoEpoch -> 
infoEpoch.epoch() == epoch);
+    Optional<MetadataResponse.PartitionMetadata> 
partitionMetadata(TopicPartition topicPartition) {
+        return Optional.ofNullable(metadataByPartition.get(topicPartition));
     }
 
-    Optional<PartitionInfoAndEpoch> getPartitionInfo(TopicPartition 
topicPartition) {
-        return Optional.ofNullable(metadataByPartition.get(topicPartition));
+    Optional<Node> nodeById(int id) {
+        return Optional.ofNullable(nodes.get(id));
     }
 
     Cluster cluster() {
@@ -105,25 +101,33 @@ public class MetadataCache {
         }
     }
 
+    ClusterResource clusterResource() {
+        return new ClusterResource(clusterId);
+    }
+
     private void computeClusterView() {
         List<PartitionInfo> partitionInfos = metadataByPartition.values()
                 .stream()
-                .map(PartitionInfoAndEpoch::partitionInfo)
+                .map(metadata -> MetadataResponse.toPartitionInfo(metadata, 
nodes))
                 .collect(Collectors.toList());
-        this.clusterInstance = new Cluster(clusterId, nodes, partitionInfos, 
unauthorizedTopics, invalidTopics, internalTopics, controller);
+        this.clusterInstance = new Cluster(clusterId, nodes.values(), 
partitionInfos, unauthorizedTopics,
+                invalidTopics, internalTopics, controller);
     }
 
     static MetadataCache bootstrap(List<InetSocketAddress> addresses) {
-        List<Node> nodes = new ArrayList<>();
+        Map<Integer, Node> nodes = new HashMap<>();
         int nodeId = -1;
-        for (InetSocketAddress address : addresses)
-            nodes.add(new Node(nodeId--, address.getHostString(), 
address.getPort()));
+        for (InetSocketAddress address : addresses) {
+            nodes.put(nodeId, new Node(nodeId, address.getHostString(), 
address.getPort()));
+            nodeId--;
+        }
         return new MetadataCache(null, nodes, Collections.emptyList(),
-                Collections.emptySet(), Collections.emptySet(), 
Collections.emptySet(), null, Cluster.bootstrap(addresses));
+                Collections.emptySet(), Collections.emptySet(), 
Collections.emptySet(),
+                null, Cluster.bootstrap(addresses));
     }
 
     static MetadataCache empty() {
-        return new MetadataCache(null, Collections.emptyList(), 
Collections.emptyList(),
+        return new MetadataCache(null, Collections.emptyMap(), 
Collections.emptyList(),
                 Collections.emptySet(), Collections.emptySet(), 
Collections.emptySet(), null, Cluster.empty());
     }
 
@@ -137,43 +141,4 @@ public class MetadataCache {
                 '}';
     }
 
-    public static class PartitionInfoAndEpoch {
-        private final PartitionInfo partitionInfo;
-        private final int epoch;
-
-        PartitionInfoAndEpoch(PartitionInfo partitionInfo, int epoch) {
-            this.partitionInfo = partitionInfo;
-            this.epoch = epoch;
-        }
-
-        public PartitionInfo partitionInfo() {
-            return partitionInfo;
-        }
-
-        public int epoch() {
-            return epoch;
-        }
-
-        @Override
-        public boolean equals(Object o) {
-            if (this == o) return true;
-            if (o == null || getClass() != o.getClass()) return false;
-            PartitionInfoAndEpoch that = (PartitionInfoAndEpoch) o;
-            return epoch == that.epoch &&
-                    Objects.equals(partitionInfo, that.partitionInfo);
-        }
-
-        @Override
-        public int hashCode() {
-            return Objects.hash(partitionInfo, epoch);
-        }
-
-        @Override
-        public String toString() {
-            return "PartitionInfoAndEpoch{" +
-                    "partitionInfo=" + partitionInfo +
-                    ", epoch=" + epoch +
-                    '}';
-        }
-    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index 61e8cd1..3060a50 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -1055,7 +1055,7 @@ public class NetworkClient implements KafkaClient {
             // This could be a transient issue if listeners were added 
dynamically to brokers.
             List<TopicPartition> missingListenerPartitions = 
response.topicMetadata().stream().flatMap(topicMetadata ->
                 topicMetadata.partitionMetadata().stream()
-                    .filter(partitionMetadata -> partitionMetadata.error() == 
Errors.LISTENER_NOT_FOUND)
+                    .filter(partitionMetadata -> partitionMetadata.error == 
Errors.LISTENER_NOT_FOUND)
                     .map(partitionMetadata -> new 
TopicPartition(topicMetadata.topic(), partitionMetadata.partition())))
                 .collect(Collectors.toList());
             if (!missingListenerPartitions.isEmpty()) {
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/MetadataOperationContext.java
 
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/MetadataOperationContext.java
index e6f4054..c05e5cf 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/MetadataOperationContext.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/MetadataOperationContext.java
@@ -83,8 +83,8 @@ public final class MetadataOperationContext<T, O extends 
AbstractOptions<O>> {
     public static void handleMetadataErrors(MetadataResponse response) {
         for (TopicMetadata tm : response.topicMetadata()) {
             for (PartitionMetadata pm : tm.partitionMetadata()) {
-                if (shouldRefreshMetadata(pm.error())) {
-                    throw pm.error().exception();
+                if (shouldRefreshMetadata(pm.error)) {
+                    throw pm.error.exception();
                 }
             }
         }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 2052af9..20da83f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -1604,7 +1604,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, 
V> {
             SubscriptionState.FetchPosition newPosition = new 
SubscriptionState.FetchPosition(
                     offset,
                     Optional.empty(), // This will ensure we skip validation
-                    this.metadata.leaderAndEpoch(partition));
+                    this.metadata.currentLeader(partition));
             this.subscriptions.seekUnvalidated(partition, newPosition);
         } finally {
             release();
@@ -1635,7 +1635,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, 
V> {
             } else {
                 log.info("Seeking to offset {} for partition {}", offset, 
partition);
             }
-            Metadata.LeaderAndEpoch currentLeaderAndEpoch = 
this.metadata.leaderAndEpoch(partition);
+            Metadata.LeaderAndEpoch currentLeaderAndEpoch = 
this.metadata.currentLeader(partition);
             SubscriptionState.FetchPosition newPosition = new 
SubscriptionState.FetchPosition(
                     offsetAndMetadata.offset(),
                     offsetAndMetadata.leaderEpoch(),
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
index bd8b93c..67b4e9f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
@@ -22,7 +22,6 @@ import 
org.apache.kafka.clients.consumer.internals.SubscriptionState;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
-import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.WakeupException;
@@ -37,6 +36,7 @@ import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
@@ -202,8 +202,9 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
 
                     if (assignment().contains(entry.getKey()) && rec.offset() 
>= position) {
                         results.computeIfAbsent(entry.getKey(), partition -> 
new ArrayList<>()).add(rec);
+                        Metadata.LeaderAndEpoch leaderAndEpoch = new 
Metadata.LeaderAndEpoch(Optional.empty(), rec.leaderEpoch());
                         SubscriptionState.FetchPosition newPosition = new 
SubscriptionState.FetchPosition(
-                                rec.offset() + 1, rec.leaderEpoch(), new 
Metadata.LeaderAndEpoch(Node.noNode(), rec.leaderEpoch()));
+                                rec.offset() + 1, rec.leaderEpoch(), 
leaderAndEpoch);
                         subscriptions.position(entry.getKey(), newPosition);
                     }
                 }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetAndMetadata.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetAndMetadata.java
index aa91e50..d6b3b94 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetAndMetadata.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetAndMetadata.java
@@ -94,7 +94,9 @@ public class OffsetAndMetadata implements Serializable {
      * @return the leader epoch or empty if not known
      */
     public Optional<Integer> leaderEpoch() {
-        return Optional.ofNullable(leaderEpoch);
+        if (leaderEpoch == null || leaderEpoch < 0)
+            return Optional.empty();
+        return Optional.of(leaderEpoch);
     }
 
     @Override
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 6610bfb02..94209d2 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -766,10 +766,10 @@ public final class ConsumerCoordinator extends 
AbstractCoordinator {
                 // it's possible that the partition is no longer assigned when 
the response is received,
                 // so we need to ignore seeking if that's the case
                 if (this.subscriptions.isAssigned(tp)) {
-                    final ConsumerMetadata.LeaderAndEpoch leaderAndEpoch = 
metadata.leaderAndEpoch(tp);
+                    final ConsumerMetadata.LeaderAndEpoch leaderAndEpoch = 
metadata.currentLeader(tp);
                     final SubscriptionState.FetchPosition position = new 
SubscriptionState.FetchPosition(
-                        offsetAndMetadata.offset(), 
offsetAndMetadata.leaderEpoch(),
-                        leaderAndEpoch);
+                            offsetAndMetadata.offset(), 
offsetAndMetadata.leaderEpoch(),
+                            leaderAndEpoch);
 
                     this.subscriptions.seekUnvalidated(tp, position);
 
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 32d3976..7890c9a 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -19,7 +19,7 @@ package org.apache.kafka.clients.consumer.internals;
 import org.apache.kafka.clients.ApiVersions;
 import org.apache.kafka.clients.ClientResponse;
 import org.apache.kafka.clients.FetchSessionHandler;
-import org.apache.kafka.clients.MetadataCache;
+import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.NodeApiVersions;
 import org.apache.kafka.clients.ApiVersion;
 import org.apache.kafka.clients.StaleMetadataException;
@@ -485,7 +485,7 @@ public class Fetcher<K, V> implements Closeable {
 
         // Validate each partition against the current leader and epoch
         subscriptions.assignedPartitions().forEach(topicPartition -> {
-            ConsumerMetadata.LeaderAndEpoch leaderAndEpoch = 
metadata.leaderAndEpoch(topicPartition);
+            ConsumerMetadata.LeaderAndEpoch leaderAndEpoch = 
metadata.currentLeader(topicPartition);
             
subscriptions.maybeValidatePositionForCurrentLeader(topicPartition, 
leaderAndEpoch);
         });
 
@@ -716,7 +716,7 @@ public class Fetcher<K, V> implements Closeable {
 
     private void resetOffsetIfNeeded(TopicPartition partition, 
OffsetResetStrategy requestedResetStrategy, ListOffsetData offsetData) {
         SubscriptionState.FetchPosition position = new 
SubscriptionState.FetchPosition(
-                offsetData.offset, offsetData.leaderEpoch, 
metadata.leaderAndEpoch(partition));
+                offsetData.offset, offsetData.leaderEpoch, 
metadata.currentLeader(partition));
         offsetData.leaderEpoch.ifPresent(epoch -> 
metadata.updateLastSeenEpochIfNewer(partition, epoch));
         subscriptions.maybeSeekUnvalidated(partition, position.offset, 
requestedResetStrategy);
     }
@@ -904,27 +904,26 @@ public class Fetcher<K, V> implements Closeable {
         for (Map.Entry<TopicPartition, Long> entry: 
timestampsToSearch.entrySet()) {
             TopicPartition tp  = entry.getKey();
             Long offset = entry.getValue();
-            Optional<MetadataCache.PartitionInfoAndEpoch> currentInfo = 
metadata.partitionInfoIfCurrent(tp);
-            if (!currentInfo.isPresent()) {
+            Metadata.LeaderAndEpoch leaderAndEpoch = 
metadata.currentLeader(tp);
+
+            if (!leaderAndEpoch.leader.isPresent()) {
                 log.debug("Leader for partition {} is unknown for fetching 
offset {}", tp, offset);
                 metadata.requestUpdate();
                 partitionsToRetry.add(tp);
-            } else if (currentInfo.get().partitionInfo().leader() == null) {
-                log.debug("Leader for partition {} is unavailable for fetching 
offset {}", tp, offset);
-                metadata.requestUpdate();
-                partitionsToRetry.add(tp);
-            } else if 
(client.isUnavailable(currentInfo.get().partitionInfo().leader())) {
-                
client.maybeThrowAuthFailure(currentInfo.get().partitionInfo().leader());
-
-                // The connection has failed and we need to await the blackout 
period before we can
-                // try again. No need to request a metadata update since the 
disconnect will have
-                // done so already.
-                log.debug("Leader {} for partition {} is unavailable for 
fetching offset until reconnect backoff expires",
-                        currentInfo.get().partitionInfo().leader(), tp);
-                partitionsToRetry.add(tp);
             } else {
-                partitionDataMap.put(tp,
-                        new ListOffsetRequest.PartitionData(offset, 
Optional.of(currentInfo.get().epoch())));
+                Node leader = leaderAndEpoch.leader.get();
+                if (client.isUnavailable(leader)) {
+                    client.maybeThrowAuthFailure(leader);
+
+                    // The connection has failed and we need to await the 
blackout period before we can
+                    // try again. No need to request a metadata update since 
the disconnect will have
+                    // done so already.
+                    log.debug("Leader {} for partition {} is unavailable for 
fetching offset until reconnect backoff expires",
+                            leader, tp);
+                    partitionsToRetry.add(tp);
+                } else {
+                    partitionDataMap.put(tp, new 
ListOffsetRequest.PartitionData(offset, leaderAndEpoch.epoch));
+                }
             }
         }
         return regroupPartitionMapByNode(partitionDataMap);
@@ -1100,18 +1099,21 @@ public class Fetcher<K, V> implements Closeable {
 
         // Ensure the position has an up-to-date leader
         subscriptions.assignedPartitions().forEach(
-            tp -> subscriptions.maybeValidatePositionForCurrentLeader(tp, 
metadata.leaderAndEpoch(tp)));
+            tp -> subscriptions.maybeValidatePositionForCurrentLeader(tp, 
metadata.currentLeader(tp)));
 
         long currentTimeMs = time.milliseconds();
 
         for (TopicPartition partition : fetchablePartitions()) {
             // Use the preferred read replica if set, or the position's leader
             SubscriptionState.FetchPosition position = 
this.subscriptions.position(partition);
-            Node node = selectReadReplica(partition, 
position.currentLeader.leader, currentTimeMs);
-
-            if (node == null || node.isEmpty()) {
+            Optional<Node> leaderOpt = position.currentLeader.leader;
+            if (!leaderOpt.isPresent()) {
                 metadata.requestUpdate();
-            } else if (client.isUnavailable(node)) {
+                continue;
+            }
+
+            Node node = selectReadReplica(partition, leaderOpt.get(), 
currentTimeMs);
+            if (client.isUnavailable(node)) {
                 client.maybeThrowAuthFailure(node);
 
                 // If we try to send during the reconnect blackout window, 
then the request is just
@@ -1153,7 +1155,8 @@ public class Fetcher<K, V> implements Closeable {
             Map<TopicPartition, SubscriptionState.FetchPosition> partitionMap) 
{
         return partitionMap.entrySet()
                 .stream()
-                .collect(Collectors.groupingBy(entry -> 
entry.getValue().currentLeader.leader,
+                .filter(entry -> 
entry.getValue().currentLeader.leader.isPresent())
+                .collect(Collectors.groupingBy(entry -> 
entry.getValue().currentLeader.leader.get(),
                         Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue)));
     }
 
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
index ec7c376..9200eb8 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -16,20 +16,19 @@
  */
 package org.apache.kafka.clients.consumer.internals;
 
-import java.util.ArrayList;
 import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.common.IsolationLevel;
-import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.internals.PartitionStates;
 import org.apache.kafka.common.requests.EpochEndOffset;
 import org.apache.kafka.common.utils.LogContext;
 import org.slf4j.Logger;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
@@ -750,8 +749,7 @@ public class SubscriptionState {
                 return false;
             }
 
-            if 
(currentLeaderAndEpoch.equals(Metadata.LeaderAndEpoch.noLeaderOrEpoch())) {
-                // Ignore empty LeaderAndEpochs
+            if (!currentLeaderAndEpoch.leader.isPresent() && 
!currentLeaderAndEpoch.epoch.isPresent()) {
                 return false;
             }
 
@@ -985,7 +983,7 @@ public class SubscriptionState {
         final Metadata.LeaderAndEpoch currentLeader;
 
         FetchPosition(long offset) {
-            this(offset, Optional.empty(), new 
Metadata.LeaderAndEpoch(Node.noNode(), Optional.empty()));
+            this(offset, Optional.empty(), 
Metadata.LeaderAndEpoch.noLeaderOrEpoch());
         }
 
         public FetchPosition(long offset, Optional<Integer> offsetEpoch, 
Metadata.LeaderAndEpoch currentLeader) {
diff --git a/clients/src/main/java/org/apache/kafka/common/Cluster.java 
b/clients/src/main/java/org/apache/kafka/common/Cluster.java
index c765cdc..a6bf763 100644
--- a/clients/src/main/java/org/apache/kafka/common/Cluster.java
+++ b/clients/src/main/java/org/apache/kafka/common/Cluster.java
@@ -122,18 +122,15 @@ public final class Cluster {
         Map<String, List<PartitionInfo>> tmpPartitionsByTopic = new 
HashMap<>();
         for (PartitionInfo p : partitions) {
             tmpPartitionsByTopicPartition.put(new TopicPartition(p.topic(), 
p.partition()), p);
-            List<PartitionInfo> partitionsForTopic = 
tmpPartitionsByTopic.get(p.topic());
-            if (partitionsForTopic == null) {
-                partitionsForTopic = new ArrayList<>();
-                tmpPartitionsByTopic.put(p.topic(), partitionsForTopic);
-            }
-            partitionsForTopic.add(p);
-            if (p.leader() != null) {
-                // The broker guarantees that if a partition has a non-null 
leader, it is one of the brokers returned
-                // in the metadata response
-                List<PartitionInfo> partitionsForNode = 
Objects.requireNonNull(tmpPartitionsByNode.get(p.leader().id()));
-                partitionsForNode.add(p);
-            }
+            tmpPartitionsByTopic.computeIfAbsent(p.topic(), topic -> new 
ArrayList<>()).add(p);
+
+            // The leader may not be known
+            if (p.leader() == null || p.leader().isEmpty())
+                continue;
+
+            // If it is known, its node information should be available
+            List<PartitionInfo> partitionsForNode = 
Objects.requireNonNull(tmpPartitionsByNode.get(p.leader().id()));
+            partitionsForNode.add(p);
         }
 
         // Update the values of `tmpPartitionsByNode` to contain unmodifiable 
lists
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java 
b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
index e4e09a5..4fee563 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
@@ -19,10 +19,11 @@ package org.apache.kafka.common.requests;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.message.MetadataResponseData;
-import 
org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic;
-import 
org.apache.kafka.common.message.MetadataResponseData.MetadataResponsePartition;
 import 
org.apache.kafka.common.message.MetadataResponseData.MetadataResponseBroker;
+import 
org.apache.kafka.common.message.MetadataResponseData.MetadataResponsePartition;
+import 
org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.types.Struct;
@@ -40,6 +41,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 
 /**
@@ -55,7 +57,7 @@ import java.util.stream.Collectors;
  */
 public class MetadataResponse extends AbstractResponse {
     public static final int NO_CONTROLLER_ID = -1;
-
+    public static final int NO_LEADER_ID = -1;
     public static final int AUTHORIZED_OPERATIONS_OMITTED = Integer.MIN_VALUE;
 
     private final MetadataResponseData data;
@@ -131,12 +133,13 @@ public class MetadataResponse extends AbstractResponse {
     public Cluster cluster() {
         Set<String> internalTopics = new HashSet<>();
         List<PartitionInfo> partitions = new ArrayList<>();
+
         for (TopicMetadata metadata : topicMetadata()) {
             if (metadata.error == Errors.NONE) {
                 if (metadata.isInternal)
                     internalTopics.add(metadata.topic);
                 for (PartitionMetadata partitionMetadata : 
metadata.partitionMetadata) {
-                    partitions.add(partitionMetaToInfo(metadata.topic, 
partitionMetadata));
+                    partitions.add(toPartitionInfo(partitionMetadata, 
holder().brokers));
                 }
             }
         }
@@ -144,6 +147,24 @@ public class MetadataResponse extends AbstractResponse {
                 topicsByError(Errors.INVALID_TOPIC_EXCEPTION), internalTopics, 
controller());
     }
 
+    public static PartitionInfo toPartitionInfo(PartitionMetadata metadata, 
Map<Integer, Node> nodesById) {
+        return new PartitionInfo(metadata.topic(),
+                metadata.partition(),
+                metadata.leaderId.map(nodesById::get).orElse(null),
+                convertToNodeArray(metadata.replicaIds, nodesById),
+                convertToNodeArray(metadata.inSyncReplicaIds, nodesById),
+                convertToNodeArray(metadata.offlineReplicaIds, nodesById));
+    }
+
+    private static Node[] convertToNodeArray(List<Integer> replicaIds, 
Map<Integer, Node> nodesById) {
+        return replicaIds.stream().map(replicaId -> {
+            Node node = nodesById.get(replicaId);
+            if (node == null)
+                return new Node(replicaId, "", -1);
+            return node;
+        }).toArray(Node[]::new);
+    }
+
     /**
      * Returns a 32-bit bitfield to represent authorized operations for this 
topic.
      */
@@ -162,19 +183,6 @@ public class MetadataResponse extends AbstractResponse {
         return data.clusterAuthorizedOperations();
     }
 
-    /**
-     * Transform a topic and PartitionMetadata into PartitionInfo.
-     */
-    public static PartitionInfo partitionMetaToInfo(String topic, 
PartitionMetadata partitionMetadata) {
-        return new PartitionInfo(
-                topic,
-                partitionMetadata.partition(),
-                partitionMetadata.leader(),
-                partitionMetadata.replicas().toArray(new Node[0]),
-                partitionMetadata.isr().toArray(new Node[0]),
-                partitionMetadata.offlineReplicas().toArray(new Node[0]));
-    }
-
     private Holder holder() {
         if (holder == null) {
             synchronized (data) {
@@ -190,6 +198,10 @@ public class MetadataResponse extends AbstractResponse {
      * @return the brokers
      */
     public Collection<Node> brokers() {
+        return holder().brokers.values();
+    }
+
+    public Map<Integer, Node> brokersById() {
         return holder().brokers;
     }
 
@@ -314,93 +326,78 @@ public class MetadataResponse extends AbstractResponse {
 
     // This is used to describe per-partition state in the MetadataResponse
     public static class PartitionMetadata {
-        private final Errors error;
-        private final int partition;
-        private final Node leader;
-        private final Optional<Integer> leaderEpoch;
-        private final List<Node> replicas;
-        private final List<Node> isr;
-        private final List<Node> offlineReplicas;
+        public final TopicPartition topicPartition;
+        public final Errors error;
+        public final Optional<Integer> leaderId;
+        public final Optional<Integer> leaderEpoch;
+        public final List<Integer> replicaIds;
+        public final List<Integer> inSyncReplicaIds;
+        public final List<Integer> offlineReplicaIds;
 
         public PartitionMetadata(Errors error,
-                                 int partition,
-                                 Node leader,
+                                 TopicPartition topicPartition,
+                                 Optional<Integer> leaderId,
                                  Optional<Integer> leaderEpoch,
-                                 List<Node> replicas,
-                                 List<Node> isr,
-                                 List<Node> offlineReplicas) {
+                                 List<Integer> replicaIds,
+                                 List<Integer> inSyncReplicaIds,
+                                 List<Integer> offlineReplicaIds) {
             this.error = error;
-            this.partition = partition;
-            this.leader = leader;
+            this.topicPartition = topicPartition;
+            this.leaderId = leaderId;
             this.leaderEpoch = leaderEpoch;
-            this.replicas = replicas;
-            this.isr = isr;
-            this.offlineReplicas = offlineReplicas;
-        }
-
-        public Errors error() {
-            return error;
+            this.replicaIds = replicaIds;
+            this.inSyncReplicaIds = inSyncReplicaIds;
+            this.offlineReplicaIds = offlineReplicaIds;
         }
 
         public int partition() {
-            return partition;
-        }
-
-        public int leaderId() {
-            return leader == null ? -1 : leader.id();
-        }
-
-        public Optional<Integer> leaderEpoch() {
-            return leaderEpoch;
-        }
-
-        public Node leader() {
-            return leader;
+            return topicPartition.partition();
         }
 
-        public List<Node> replicas() {
-            return replicas;
-        }
-
-        public List<Node> isr() {
-            return isr;
+        public String topic() {
+            return topicPartition.topic();
         }
 
-        public List<Node> offlineReplicas() {
-            return offlineReplicas;
+        public PartitionMetadata withoutLeaderEpoch() {
+            return new PartitionMetadata(error,
+                    topicPartition,
+                    leaderId,
+                    Optional.empty(),
+                    replicaIds,
+                    inSyncReplicaIds,
+                    offlineReplicaIds);
         }
 
         @Override
         public String toString() {
-            return "(type=PartitionMetadata" +
+            return "PartitionMetadata(" +
                     ", error=" + error +
-                    ", partition=" + partition +
-                    ", leader=" + leader +
+                    ", partition=" + topicPartition +
+                    ", leader=" + leaderId +
                     ", leaderEpoch=" + leaderEpoch +
-                    ", replicas=" + Utils.join(replicas, ",") +
-                    ", isr=" + Utils.join(isr, ",") +
-                    ", offlineReplicas=" + Utils.join(offlineReplicas, ",") + 
')';
+                    ", replicas=" + Utils.join(replicaIds, ",") +
+                    ", isr=" + Utils.join(inSyncReplicaIds, ",") +
+                    ", offlineReplicas=" + Utils.join(offlineReplicaIds, ",") 
+ ')';
         }
     }
 
     private static class Holder {
-        private final Collection<Node> brokers;
+        private final Map<Integer, Node> brokers;
         private final Node controller;
         private final Collection<TopicMetadata> topicMetadata;
 
         Holder(MetadataResponseData data) {
-            this.brokers = 
Collections.unmodifiableCollection(createBrokers(data));
-            Map<Integer, Node> brokerMap = 
brokers.stream().collect(Collectors.toMap(Node::id, b -> b));
-            this.topicMetadata = createTopicMetadata(data, brokerMap);
-            this.controller = brokerMap.get(data.controllerId());
+            this.brokers = Collections.unmodifiableMap(createBrokers(data));
+            this.topicMetadata = createTopicMetadata(data);
+            this.controller = brokers.get(data.controllerId());
         }
 
-        private Collection<Node> createBrokers(MetadataResponseData data) {
-            return data.brokers().valuesList().stream().map(b ->
-                    new Node(b.nodeId(), b.host(), b.port(), 
b.rack())).collect(Collectors.toList());
+        private Map<Integer, Node> createBrokers(MetadataResponseData data) {
+            return data.brokers().valuesList().stream().map(b -> new 
Node(b.nodeId(), b.host(), b.port(), b.rack()))
+                    .collect(Collectors.toMap(Node::id, Function.identity()));
         }
 
-        private Collection<TopicMetadata> 
createTopicMetadata(MetadataResponseData data, Map<Integer, Node> brokerMap) {
+        private Collection<TopicMetadata> 
createTopicMetadata(MetadataResponseData data) {
             List<TopicMetadata> topicMetadataList = new ArrayList<>();
             for (MetadataResponseTopic topicMetadata : data.topics()) {
                 Errors topicError = Errors.forCode(topicMetadata.errorCode());
@@ -411,14 +408,15 @@ public class MetadataResponse extends AbstractResponse {
                 for (MetadataResponsePartition partitionMetadata : 
topicMetadata.partitions()) {
                     Errors partitionError = 
Errors.forCode(partitionMetadata.errorCode());
                     int partitionIndex = partitionMetadata.partitionIndex();
-                    int leader = partitionMetadata.leaderId();
+
+                    int leaderId = partitionMetadata.leaderId();
+                    Optional<Integer> leaderIdOpt = leaderId < 0 ? 
Optional.empty() : Optional.of(leaderId);
+
                     Optional<Integer> leaderEpoch = 
RequestUtils.getLeaderEpoch(partitionMetadata.leaderEpoch());
-                    Node leaderNode = leader == -1 ? null : 
brokerMap.get(leader);
-                    List<Node> replicaNodes = convertToNodes(brokerMap, 
partitionMetadata.replicaNodes());
-                    List<Node> isrNodes = convertToNodes(brokerMap, 
partitionMetadata.isrNodes());
-                    List<Node> offlineNodes = convertToNodes(brokerMap, 
partitionMetadata.offlineReplicas());
-                    partitionMetadataList.add(new 
PartitionMetadata(partitionError, partitionIndex, leaderNode, leaderEpoch,
-                            replicaNodes, isrNodes, offlineNodes));
+                    TopicPartition topicPartition = new TopicPartition(topic, 
partitionIndex);
+                    partitionMetadataList.add(new 
PartitionMetadata(partitionError, topicPartition, leaderIdOpt,
+                            leaderEpoch, partitionMetadata.replicaNodes(), 
partitionMetadata.isrNodes(),
+                            partitionMetadata.offlineReplicas()));
                 }
 
                 topicMetadataList.add(new TopicMetadata(topicError, topic, 
isInternal, partitionMetadataList,
@@ -427,18 +425,6 @@ public class MetadataResponse extends AbstractResponse {
             return topicMetadataList;
         }
 
-        private List<Node> convertToNodes(Map<Integer, Node> brokers, 
List<Integer> brokerIds) {
-            List<Node> nodes = new ArrayList<>(brokerIds.size());
-            for (Integer brokerId : brokerIds) {
-                Node node = brokers.get(brokerId);
-                if (node == null)
-                    nodes.add(new Node(brokerId, "", -1));
-                else
-                    nodes.add(node);
-            }
-            return nodes;
-        }
-
     }
 
     public static MetadataResponse prepareResponse(int throttleTimeMs, 
Collection<Node> brokers, String clusterId,
@@ -469,12 +455,12 @@ public class MetadataResponse extends AbstractResponse {
             for (PartitionMetadata partitionMetadata : 
topicMetadata.partitionMetadata) {
                 metadataResponseTopic.partitions().add(new 
MetadataResponsePartition()
                     .setErrorCode(partitionMetadata.error.code())
-                    .setPartitionIndex(partitionMetadata.partition)
-                    .setLeaderId(partitionMetadata.leader == null ? -1 : 
partitionMetadata.leader.id())
-                    
.setLeaderEpoch(partitionMetadata.leaderEpoch().orElse(RecordBatch.NO_PARTITION_LEADER_EPOCH))
-                    
.setReplicaNodes(partitionMetadata.replicas.stream().map(Node::id).collect(Collectors.toList()))
-                    
.setIsrNodes(partitionMetadata.isr.stream().map(Node::id).collect(Collectors.toList()))
-                    
.setOfflineReplicas(partitionMetadata.offlineReplicas.stream().map(Node::id).collect(Collectors.toList())));
+                    .setPartitionIndex(partitionMetadata.partition())
+                    
.setLeaderId(partitionMetadata.leaderId.orElse(NO_LEADER_ID))
+                    
.setLeaderEpoch(partitionMetadata.leaderEpoch.orElse(RecordBatch.NO_PARTITION_LEADER_EPOCH))
+                    .setReplicaNodes(partitionMetadata.replicaIds)
+                    .setIsrNodes(partitionMetadata.inSyncReplicaIds)
+                    .setOfflineReplicas(partitionMetadata.offlineReplicaIds));
             }
             responseData.topics().add(metadataResponseTopic);
         });
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/RequestUtils.java 
b/clients/src/main/java/org/apache/kafka/common/requests/RequestUtils.java
index 2b1f04c..be72edb 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/RequestUtils.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/RequestUtils.java
@@ -17,8 +17,8 @@
 package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.protocol.types.Field;
-import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.record.RecordBatch;
 
 import java.nio.ByteBuffer;
 import java.util.Optional;
@@ -33,15 +33,12 @@ public final class RequestUtils {
 
     static Optional<Integer> getLeaderEpoch(Struct struct, Field.Int32 
leaderEpochField) {
         int leaderEpoch = struct.getOrElse(leaderEpochField, 
RecordBatch.NO_PARTITION_LEADER_EPOCH);
-        Optional<Integer> leaderEpochOpt = leaderEpoch == 
RecordBatch.NO_PARTITION_LEADER_EPOCH ?
-                Optional.empty() : Optional.of(leaderEpoch);
-        return leaderEpochOpt;
+        return getLeaderEpoch(leaderEpoch);
     }
 
     static Optional<Integer> getLeaderEpoch(int leaderEpoch) {
-        Optional<Integer> leaderEpochOpt = leaderEpoch == 
RecordBatch.NO_PARTITION_LEADER_EPOCH ?
+        return leaderEpoch == RecordBatch.NO_PARTITION_LEADER_EPOCH ?
             Optional.empty() : Optional.of(leaderEpoch);
-        return leaderEpochOpt;
     }
 
     public static ByteBuffer serialize(Struct headerStruct, Struct bodyStruct) 
{
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/MetadataCacheTest.java 
b/clients/src/test/java/org/apache/kafka/clients/MetadataCacheTest.java
new file mode 100644
index 0000000..afdf89c
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/clients/MetadataCacheTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients;
+
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.MetadataResponse;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+public class MetadataCacheTest {
+
+    @Test
+    public void testMissingLeaderEndpoint() {
+        // Although the broker attempts to ensure leader information is 
available, the
+        // client metadata cache may retain partition metadata across multiple 
responses.
+        // For example, separate responses may contain conflicting leader 
epochs for
+        // separate partitions and the client will always retain the highest.
+
+        TopicPartition topicPartition = new TopicPartition("topic", 0);
+
+        MetadataResponse.PartitionMetadata partitionMetadata = new 
MetadataResponse.PartitionMetadata(
+                Errors.NONE,
+                topicPartition,
+                Optional.of(5),
+                Optional.of(10),
+                Arrays.asList(5, 6, 7),
+                Arrays.asList(5, 6, 7),
+                Collections.emptyList());
+
+        Map<Integer, Node> nodesById = new HashMap<>();
+        nodesById.put(6, new Node(6, "localhost", 2077));
+        nodesById.put(7, new Node(7, "localhost", 2078));
+        nodesById.put(8, new Node(8, "localhost", 2079));
+
+        MetadataCache cache = new MetadataCache("clusterId",
+                nodesById,
+                Collections.singleton(partitionMetadata),
+                Collections.emptySet(),
+                Collections.emptySet(),
+                Collections.emptySet(),
+                null);
+
+        Cluster cluster = cache.cluster();
+        assertNull(cluster.leaderFor(topicPartition));
+
+        PartitionInfo partitionInfo = cluster.partition(topicPartition);
+        Map<Integer, Node> replicas = Arrays.stream(partitionInfo.replicas())
+                .collect(Collectors.toMap(Node::id, Function.identity()));
+        assertNull(partitionInfo.leader());
+        assertEquals(3, replicas.size());
+        assertTrue(replicas.get(5).isEmpty());
+        assertEquals(nodesById.get(6), replicas.get(6));
+        assertEquals(nodesById.get(7), replicas.get(7));
+    }
+
+}
diff --git a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java 
b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
index 7067e88..3dc4c9a 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
@@ -46,7 +46,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.stream.Collectors;
 
 import static org.apache.kafka.test.TestUtils.assertOptional;
 import static org.junit.Assert.assertEquals;
@@ -196,9 +195,9 @@ public class MetadataTest {
             MetadataResponse response = new MetadataResponse(struct, version);
             assertFalse(response.hasReliableLeaderEpochs());
             metadata.update(response, 100);
-            assertTrue(metadata.partitionInfoIfCurrent(tp).isPresent());
-            MetadataCache.PartitionInfoAndEpoch info = 
metadata.partitionInfoIfCurrent(tp).get();
-            assertEquals(-1, info.epoch());
+            assertTrue(metadata.partitionMetadataIfCurrent(tp).isPresent());
+            MetadataResponse.PartitionMetadata metadata = 
this.metadata.partitionMetadataIfCurrent(tp).get();
+            assertEquals(Optional.empty(), metadata.leaderEpoch);
         }
 
         for (short version = 9; version <= ApiKeys.METADATA.latestVersion(); 
version++) {
@@ -206,9 +205,9 @@ public class MetadataTest {
             MetadataResponse response = new MetadataResponse(struct, version);
             assertTrue(response.hasReliableLeaderEpochs());
             metadata.update(response, 100);
-            assertTrue(metadata.partitionInfoIfCurrent(tp).isPresent());
-            MetadataCache.PartitionInfoAndEpoch info = 
metadata.partitionInfoIfCurrent(tp).get();
-            assertEquals(10, info.epoch());
+            assertTrue(metadata.partitionMetadataIfCurrent(tp).isPresent());
+            MetadataResponse.PartitionMetadata info = 
metadata.partitionMetadataIfCurrent(tp).get();
+            assertEquals(Optional.of(10), info.leaderEpoch);
         }
     }
 
@@ -255,13 +254,11 @@ public class MetadataTest {
         metadata.update(new MetadataResponse(data), 101);
         assertEquals(Optional.of(10), metadata.lastSeenLeaderEpoch(tp));
 
-        assertTrue(metadata.partitionInfoIfCurrent(tp).isPresent());
-        MetadataCache.PartitionInfoAndEpoch info = 
metadata.partitionInfoIfCurrent(tp).get();
+        assertTrue(metadata.partitionMetadataIfCurrent(tp).isPresent());
+        MetadataResponse.PartitionMetadata metadata = 
this.metadata.partitionMetadataIfCurrent(tp).get();
 
-        List<Integer> cachedIsr = 
Arrays.stream(info.partitionInfo().inSyncReplicas())
-                .map(Node::id).collect(Collectors.toList());
-        assertEquals(Arrays.asList(1, 2, 3), cachedIsr);
-        assertEquals(10, info.epoch());
+        assertEquals(Arrays.asList(1, 2, 3), metadata.inSyncReplicaIds);
+        assertEquals(Optional.of(10), metadata.leaderEpoch);
     }
 
     @Test
@@ -419,14 +416,14 @@ public class MetadataTest {
         // Cache of partition stays, but current partition info is not 
available since it's stale
         assertNotNull(metadata.fetch().partition(tp));
         
assertEquals(metadata.fetch().partitionCountForTopic("topic-1").longValue(), 5);
-        assertFalse(metadata.partitionInfoIfCurrent(tp).isPresent());
+        assertFalse(metadata.partitionMetadataIfCurrent(tp).isPresent());
         assertEquals(metadata.lastSeenLeaderEpoch(tp).get().longValue(), 101);
 
         // Metadata with older epoch is rejected, metadata state is unchanged
         metadata.update(metadataResponse, 20L);
         assertNotNull(metadata.fetch().partition(tp));
         
assertEquals(metadata.fetch().partitionCountForTopic("topic-1").longValue(), 5);
-        assertFalse(metadata.partitionInfoIfCurrent(tp).isPresent());
+        assertFalse(metadata.partitionMetadataIfCurrent(tp).isPresent());
         assertEquals(metadata.lastSeenLeaderEpoch(tp).get().longValue(), 101);
 
         // Metadata with equal or newer epoch is accepted
@@ -434,7 +431,7 @@ public class MetadataTest {
         metadata.update(metadataResponse, 30L);
         assertNotNull(metadata.fetch().partition(tp));
         
assertEquals(metadata.fetch().partitionCountForTopic("topic-1").longValue(), 5);
-        assertTrue(metadata.partitionInfoIfCurrent(tp).isPresent());
+        assertTrue(metadata.partitionMetadataIfCurrent(tp).isPresent());
         assertEquals(metadata.lastSeenLeaderEpoch(tp).get().longValue(), 101);
     }
 
@@ -450,9 +447,9 @@ public class MetadataTest {
         assertFalse(metadata.lastSeenLeaderEpoch(tp).isPresent());
 
         // still works
-        assertTrue(metadata.partitionInfoIfCurrent(tp).isPresent());
-        
assertEquals(metadata.partitionInfoIfCurrent(tp).get().partitionInfo().partition(),
 0);
-        
assertEquals(metadata.partitionInfoIfCurrent(tp).get().partitionInfo().leader().id(),
 0);
+        assertTrue(metadata.partitionMetadataIfCurrent(tp).isPresent());
+        assertEquals(0, 
metadata.partitionMetadataIfCurrent(tp).get().partition());
+        assertEquals(Optional.of(0), 
metadata.partitionMetadataIfCurrent(tp).get().leaderId);
     }
 
     @Test
@@ -611,8 +608,9 @@ public class MetadataTest {
 
         MetadataResponse metadataResponse = 
TestUtils.metadataUpdateWith("dummy", 2, Collections.emptyMap(), 
partitionCounts, _tp -> 99,
             (error, partition, leader, leaderEpoch, replicas, isr, 
offlineReplicas) ->
-                new MetadataResponse.PartitionMetadata(error, partition, 
node0, leaderEpoch,
-                    Collections.singletonList(node0), Collections.emptyList(), 
Collections.singletonList(node1)));
+                new MetadataResponse.PartitionMetadata(error, partition, 
Optional.of(node0.id()), leaderEpoch,
+                    Collections.singletonList(node0.id()), 
Collections.emptyList(),
+                        Collections.singletonList(node1.id())));
         metadata.update(emptyMetadataResponse(), 0L);
         metadata.update(metadataResponse, 10L);
 
@@ -623,4 +621,79 @@ public class MetadataTest {
         assertEquals(metadata.fetch().nodeById(0).id(), 0);
         assertEquals(metadata.fetch().nodeById(1).id(), 1);
     }
+
+    @Test
+    public void testLeaderMetadataInconsistentWithBrokerMetadata() {
+        // Tests a reordering scenario which can lead to inconsistent leader 
state.
+        // A partition initially has one broker offline. That broker comes 
online and
+        // is elected leader. The client sees these two events in the opposite 
order.
+
+        TopicPartition tp = new TopicPartition("topic", 0);
+
+        Node node0 = new Node(0, "localhost", 9092);
+        Node node1 = new Node(1, "localhost", 9093);
+        Node node2 = new Node(2, "localhost", 9094);
+
+        // The first metadata response received from the broker (epoch=10)
+        MetadataResponsePartition firstPartitionMetadata = new 
MetadataResponsePartition()
+                .setPartitionIndex(tp.partition())
+                .setErrorCode(Errors.NONE.code())
+                .setLeaderEpoch(10)
+                .setLeaderId(0)
+                .setReplicaNodes(Arrays.asList(0, 1, 2))
+                .setIsrNodes(Arrays.asList(0, 1, 2))
+                .setOfflineReplicas(Collections.emptyList());
+
+        // The second metadata response received is stale (epoch=8)
+        MetadataResponsePartition secondPartitionMetadata = new 
MetadataResponsePartition()
+                .setPartitionIndex(tp.partition())
+                .setErrorCode(Errors.NONE.code())
+                .setLeaderEpoch(8)
+                .setLeaderId(1)
+                .setReplicaNodes(Arrays.asList(0, 1, 2))
+                .setIsrNodes(Arrays.asList(1, 2))
+                .setOfflineReplicas(Collections.singletonList(0));
+
+        metadata.update(new MetadataResponse(new MetadataResponseData()
+                        .setTopics(buildTopicCollection(tp.topic(), 
firstPartitionMetadata))
+                        .setBrokers(buildBrokerCollection(Arrays.asList(node0, 
node1, node2)))),
+                10L);
+
+        metadata.update(new MetadataResponse(new MetadataResponseData()
+                        .setTopics(buildTopicCollection(tp.topic(), 
secondPartitionMetadata))
+                        .setBrokers(buildBrokerCollection(Arrays.asList(node1, 
node2)))),
+                20L);
+
+        assertNull(metadata.fetch().leaderFor(tp));
+        assertEquals(Optional.of(10), metadata.lastSeenLeaderEpoch(tp));
+        assertFalse(metadata.currentLeader(tp).leader.isPresent());
+    }
+
+    private MetadataResponseTopicCollection buildTopicCollection(String topic, 
MetadataResponsePartition partitionMetadata) {
+        MetadataResponseTopic topicMetadata = new MetadataResponseTopic()
+                .setErrorCode(Errors.NONE.code())
+                .setName(topic)
+                .setIsInternal(false);
+
+        
topicMetadata.setPartitions(Collections.singletonList(partitionMetadata));
+
+        MetadataResponseTopicCollection topics = new 
MetadataResponseTopicCollection();
+        topics.add(topicMetadata);
+        return topics;
+    }
+
+    private MetadataResponseBrokerCollection buildBrokerCollection(List<Node> 
nodes) {
+        MetadataResponseBrokerCollection brokers = new 
MetadataResponseBrokerCollection();
+        for (Node node : nodes) {
+            MetadataResponseData.MetadataResponseBroker broker = new 
MetadataResponseData.MetadataResponseBroker()
+                    .setNodeId(node.id())
+                    .setHost(node.host())
+                    .setPort(node.port())
+                    .setRack(node.rack());
+            brokers.add(broker);
+        }
+        return brokers;
+    }
+
+
 }
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index c869aaa..cba9b48 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -347,12 +347,12 @@ public class KafkaAdminClientTest {
             List<PartitionMetadata> pms = new ArrayList<>();
             for (PartitionInfo pInfo : 
cluster.availablePartitionsForTopic(topic)) {
                 PartitionMetadata pm = new PartitionMetadata(error,
-                        pInfo.partition(),
-                        pInfo.leader(),
+                        new TopicPartition(topic, pInfo.partition()),
+                        Optional.of(pInfo.leader().id()),
                         Optional.of(234),
-                        Arrays.asList(pInfo.replicas()),
-                        Arrays.asList(pInfo.inSyncReplicas()),
-                        Arrays.asList(pInfo.offlineReplicas()));
+                        
Arrays.stream(pInfo.replicas()).map(Node::id).collect(Collectors.toList()),
+                        
Arrays.stream(pInfo.inSyncReplicas()).map(Node::id).collect(Collectors.toList()),
+                        
Arrays.stream(pInfo.offlineReplicas()).map(Node::id).collect(Collectors.toList()));
                 pms.add(pm);
             }
             TopicMetadata tm = new TopicMetadata(error, topic, false, pms);
@@ -614,8 +614,8 @@ public class KafkaAdminClientTest {
             // Then we respond to the DescribeTopic request
             Node leader = initializedCluster.nodes().get(0);
             MetadataResponse.PartitionMetadata partitionMetadata = new 
MetadataResponse.PartitionMetadata(
-                    Errors.NONE, 0, leader, Optional.of(10), 
singletonList(leader),
-                    singletonList(leader), singletonList(leader));
+                    Errors.NONE, new TopicPartition(topic, 0), 
Optional.of(leader.id()), Optional.of(10),
+                    singletonList(leader.id()), singletonList(leader.id()), 
singletonList(leader.id()));
             
env.kafkaClient().prepareResponse(MetadataResponse.prepareResponse(initializedCluster.nodes(),
                     initializedCluster.clusterResource().clusterId(), 1,
                     singletonList(new 
MetadataResponse.TopicMetadata(Errors.NONE, topic, false,
@@ -960,11 +960,12 @@ public class KafkaAdminClientTest {
             List<Node> nodes = env.cluster().nodes();
 
             List<MetadataResponse.PartitionMetadata> partitionMetadata = new 
ArrayList<>();
-            partitionMetadata.add(new 
MetadataResponse.PartitionMetadata(Errors.NONE, tp0.partition(), nodes.get(0),
-                    Optional.of(5), singletonList(nodes.get(0)), 
singletonList(nodes.get(0)),
-                    Collections.emptyList()));
-            partitionMetadata.add(new 
MetadataResponse.PartitionMetadata(Errors.NONE, tp1.partition(), nodes.get(1),
-                    Optional.of(5), singletonList(nodes.get(1)), 
singletonList(nodes.get(1)), Collections.emptyList()));
+            partitionMetadata.add(new 
MetadataResponse.PartitionMetadata(Errors.NONE, tp0,
+                    Optional.of(nodes.get(0).id()), Optional.of(5), 
singletonList(nodes.get(0).id()),
+                    singletonList(nodes.get(0).id()), 
Collections.emptyList()));
+            partitionMetadata.add(new 
MetadataResponse.PartitionMetadata(Errors.NONE, tp1,
+                    Optional.of(nodes.get(1).id()), Optional.of(5), 
singletonList(nodes.get(1).id()),
+                    singletonList(nodes.get(1).id()), 
Collections.emptyList()));
 
             List<MetadataResponse.TopicMetadata> topicMetadata = new 
ArrayList<>();
             topicMetadata.add(new MetadataResponse.TopicMetadata(Errors.NONE, 
topic, false, partitionMetadata));
@@ -1024,17 +1025,21 @@ public class KafkaAdminClientTest {
 
             List<MetadataResponse.TopicMetadata> t = new ArrayList<>();
             List<MetadataResponse.PartitionMetadata> p = new ArrayList<>();
-            p.add(new MetadataResponse.PartitionMetadata(Errors.NONE, 0, 
nodes.get(0), Optional.of(5),
-                    singletonList(nodes.get(0)), singletonList(nodes.get(0)), 
Collections.emptyList()));
-            p.add(new MetadataResponse.PartitionMetadata(Errors.NONE, 1, 
nodes.get(0), Optional.of(5),
-                    singletonList(nodes.get(0)), singletonList(nodes.get(0)), 
Collections.emptyList()));
-            p.add(new 
MetadataResponse.PartitionMetadata(Errors.LEADER_NOT_AVAILABLE, 2, null,
-                    Optional.empty(), singletonList(nodes.get(0)), 
singletonList(nodes.get(0)),
-                    Collections.emptyList()));
-            p.add(new MetadataResponse.PartitionMetadata(Errors.NONE, 3, 
nodes.get(0), Optional.of(5),
-                    singletonList(nodes.get(0)), singletonList(nodes.get(0)), 
Collections.emptyList()));
-            p.add(new MetadataResponse.PartitionMetadata(Errors.NONE, 4, 
nodes.get(0), Optional.of(5),
-                    singletonList(nodes.get(0)), singletonList(nodes.get(0)), 
Collections.emptyList()));
+            p.add(new MetadataResponse.PartitionMetadata(Errors.NONE, 
myTopicPartition0,
+                    Optional.of(nodes.get(0).id()), Optional.of(5), 
singletonList(nodes.get(0).id()),
+                    singletonList(nodes.get(0).id()), 
Collections.emptyList()));
+            p.add(new MetadataResponse.PartitionMetadata(Errors.NONE, 
myTopicPartition1,
+                    Optional.of(nodes.get(0).id()), Optional.of(5), 
singletonList(nodes.get(0).id()),
+                    singletonList(nodes.get(0).id()), 
Collections.emptyList()));
+            p.add(new 
MetadataResponse.PartitionMetadata(Errors.LEADER_NOT_AVAILABLE, 
myTopicPartition2,
+                    Optional.empty(), Optional.empty(), 
singletonList(nodes.get(0).id()),
+                    singletonList(nodes.get(0).id()), 
Collections.emptyList()));
+            p.add(new MetadataResponse.PartitionMetadata(Errors.NONE, 
myTopicPartition3,
+                    Optional.of(nodes.get(0).id()), Optional.of(5), 
singletonList(nodes.get(0).id()),
+                    singletonList(nodes.get(0).id()), 
Collections.emptyList()));
+            p.add(new MetadataResponse.PartitionMetadata(Errors.NONE, 
myTopicPartition4,
+                    Optional.of(nodes.get(0).id()), Optional.of(5), 
singletonList(nodes.get(0).id()),
+                    singletonList(nodes.get(0).id()), 
Collections.emptyList()));
 
             t.add(new MetadataResponse.TopicMetadata(Errors.NONE, "my_topic", 
false, p));
 
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index b0b6fb8..e6483f5 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -1391,8 +1391,9 @@ public class ConsumerCoordinatorTest {
             subscriptions.subscribe(Pattern.compile(".*"), rebalanceListener);
             Node node = new Node(0, "localhost", 9999);
             MetadataResponse.PartitionMetadata partitionMetadata =
-                new MetadataResponse.PartitionMetadata(Errors.NONE, 0, node, 
Optional.empty(),
-                    singletonList(node), singletonList(node), 
singletonList(node));
+                new MetadataResponse.PartitionMetadata(Errors.NONE, new 
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0),
+                        Optional.of(node.id()), Optional.empty(), 
singletonList(node.id()), singletonList(node.id()),
+                        singletonList(node.id()));
             MetadataResponse.TopicMetadata topicMetadata = new 
MetadataResponse.TopicMetadata(Errors.NONE,
                 Topic.GROUP_METADATA_TOPIC_NAME, true, 
singletonList(partitionMetadata));
 
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadataTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadataTest.java
index b373192..b456e6c 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadataTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadataTest.java
@@ -160,7 +160,8 @@ public class ConsumerMetadataTest {
 
     private MetadataResponse.TopicMetadata topicMetadata(String topic, boolean 
isInternal) {
         MetadataResponse.PartitionMetadata partitionMetadata = new 
MetadataResponse.PartitionMetadata(Errors.NONE,
-                0, node, Optional.of(5), singletonList(node), 
singletonList(node), singletonList(node));
+                new TopicPartition(topic, 0), Optional.of(node.id()), 
Optional.of(5),
+                singletonList(node.id()), singletonList(node.id()), 
singletonList(node.id()));
         return new MetadataResponse.TopicMetadata(Errors.NONE, topic, 
isInternal, singletonList(partitionMetadata));
     }
 
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 2e45935..843e1bf 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -1946,14 +1946,14 @@ public class FetcherTest {
             List<MetadataResponse.PartitionMetadata> altPartitions = new 
ArrayList<>();
             for (MetadataResponse.PartitionMetadata p : partitions) {
                 altPartitions.add(new MetadataResponse.PartitionMetadata(
-                    p.error(),
-                    p.partition(),
-                    null, //no leader
+                    p.error,
+                    p.topicPartition,
+                    Optional.empty(), //no leader
                     Optional.empty(),
-                    p.replicas(),
-                    p.isr(),
-                    p.offlineReplicas())
-                );
+                    p.replicaIds,
+                    p.inSyncReplicaIds,
+                    p.offlineReplicaIds
+                ));
             }
             MetadataResponse.TopicMetadata alteredTopic = new 
MetadataResponse.TopicMetadata(
                 item.error(),
@@ -3023,8 +3023,8 @@ public class FetcherTest {
 
         List<ConsumerRecord<byte[], byte[]>> records;
         assignFromUser(new HashSet<>(Arrays.asList(tp0, tp1)));
-        subscriptions.seekValidated(tp0, new 
SubscriptionState.FetchPosition(0, Optional.empty(), 
metadata.leaderAndEpoch(tp0)));
-        subscriptions.seekValidated(tp1, new 
SubscriptionState.FetchPosition(1, Optional.empty(), 
metadata.leaderAndEpoch(tp1)));
+        subscriptions.seekValidated(tp0, new 
SubscriptionState.FetchPosition(0, Optional.empty(), 
metadata.currentLeader(tp0)));
+        subscriptions.seekValidated(tp1, new 
SubscriptionState.FetchPosition(1, Optional.empty(), 
metadata.currentLeader(tp1)));
 
         // Fetch some records and establish an incremental fetch session.
         LinkedHashMap<TopicPartition, 
FetchResponse.PartitionData<MemoryRecords>> partitions1 = new LinkedHashMap<>();
@@ -3503,7 +3503,7 @@ public class FetcherTest {
 
         // Seek with a position and leader+epoch
         Metadata.LeaderAndEpoch leaderAndEpoch = new Metadata.LeaderAndEpoch(
-                metadata.leaderAndEpoch(tp0).leader, Optional.of(epochOne));
+                metadata.currentLeader(tp0).leader, Optional.of(epochOne));
         subscriptions.seekUnvalidated(tp0, new 
SubscriptionState.FetchPosition(20L, Optional.of(epochOne), leaderAndEpoch));
         assertFalse(client.isConnected(node.idString()));
         assertTrue(subscriptions.awaitingValidation(tp0));
@@ -3552,7 +3552,7 @@ public class FetcherTest {
 
         // Seek with a position and leader+epoch
         Metadata.LeaderAndEpoch leaderAndEpoch = new Metadata.LeaderAndEpoch(
-                metadata.leaderAndEpoch(tp0).leader, Optional.of(epochOne));
+                metadata.currentLeader(tp0).leader, Optional.of(epochOne));
         subscriptions.seekUnvalidated(tp0, new 
SubscriptionState.FetchPosition(0, Optional.of(epochOne), leaderAndEpoch));
 
         // Update metadata to epoch=2, enter validation
@@ -3580,7 +3580,7 @@ public class FetcherTest {
         Node node = metadata.fetch().nodes().get(0);
         apiVersions.update(node.idString(), NodeApiVersions.create());
 
-        Metadata.LeaderAndEpoch leaderAndEpoch = new 
Metadata.LeaderAndEpoch(metadata.leaderAndEpoch(tp0).leader, 
Optional.of(epochOne));
+        Metadata.LeaderAndEpoch leaderAndEpoch = new 
Metadata.LeaderAndEpoch(metadata.currentLeader(tp0).leader, 
Optional.of(epochOne));
         subscriptions.seekUnvalidated(tp0, new 
SubscriptionState.FetchPosition(0, Optional.of(epochOne), leaderAndEpoch));
 
         fetcher.validateOffsetsIfNeeded();
@@ -3623,7 +3623,7 @@ public class FetcherTest {
         apiVersions.update(node.idString(), NodeApiVersions.create());
 
         // Seek with a position and leader+epoch
-        Metadata.LeaderAndEpoch leaderAndEpoch = new 
Metadata.LeaderAndEpoch(metadata.leaderAndEpoch(tp0).leader, 
Optional.of(epochOne));
+        Metadata.LeaderAndEpoch leaderAndEpoch = new 
Metadata.LeaderAndEpoch(metadata.currentLeader(tp0).leader, 
Optional.of(epochOne));
         subscriptions.seekValidated(tp0, new 
SubscriptionState.FetchPosition(0, Optional.of(epochOne), leaderAndEpoch));
 
         // Update metadata to epoch=2, enter validation
@@ -3693,7 +3693,7 @@ public class FetcherTest {
         apiVersions.update(node.idString(), NodeApiVersions.create());
 
         // Seek
-        Metadata.LeaderAndEpoch leaderAndEpoch = new 
Metadata.LeaderAndEpoch(metadata.leaderAndEpoch(tp0).leader, Optional.of(1));
+        Metadata.LeaderAndEpoch leaderAndEpoch = new 
Metadata.LeaderAndEpoch(metadata.currentLeader(tp0).leader, Optional.of(1));
         subscriptions.seekValidated(tp0, new 
SubscriptionState.FetchPosition(0, Optional.of(1), leaderAndEpoch));
 
         // Check for truncation, this should cause tp0 to go into validation
@@ -3817,19 +3817,22 @@ public class FetcherTest {
         client.prepareResponse(fullFetchResponse(tp0, buildRecords(1L, 1, 1), 
Errors.NONE, 100L, 0));
         consumerClient.poll(time.timer(0));
         fetchedRecords();
-        Node node = fetcher.selectReadReplica(tp0, 
subscriptions.position(tp0).currentLeader.leader, time.milliseconds());
+
+        Metadata.LeaderAndEpoch leaderAndEpoch = 
subscriptions.position(tp0).currentLeader;
+        assertTrue(leaderAndEpoch.leader.isPresent());
+        Node readReplica = fetcher.selectReadReplica(tp0, 
leaderAndEpoch.leader.get(), time.milliseconds());
 
         AtomicBoolean wokenUp = new AtomicBoolean(false);
         client.setWakeupHook(() -> {
             if (!wokenUp.getAndSet(true)) {
-                consumerClient.disconnectAsync(node);
+                consumerClient.disconnectAsync(readReplica);
                 consumerClient.poll(time.timer(0));
             }
         });
 
         assertEquals(1, fetcher.sendFetches());
 
-        consumerClient.disconnectAsync(node);
+        consumerClient.disconnectAsync(readReplica);
         consumerClient.poll(time.timer(0));
 
         assertEquals(1, fetcher.sendFetches());
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetForLeaderEpochClientTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetForLeaderEpochClientTest.java
index 55b8754..b3339ac 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetForLeaderEpochClientTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetForLeaderEpochClientTest.java
@@ -69,7 +69,7 @@ public class OffsetForLeaderEpochClientTest {
     public void testUnexpectedEmptyResponse() {
         Map<TopicPartition, SubscriptionState.FetchPosition> positionMap = new 
HashMap<>();
         positionMap.put(tp0, new SubscriptionState.FetchPosition(0, 
Optional.of(1),
-                new Metadata.LeaderAndEpoch(Node.noNode(), Optional.of(1))));
+                new Metadata.LeaderAndEpoch(Optional.empty(), 
Optional.of(1))));
 
         OffsetsForLeaderEpochClient offsetClient = newOffsetClient();
         RequestFuture<OffsetsForLeaderEpochClient.OffsetForEpochResult> future 
=
@@ -88,7 +88,7 @@ public class OffsetForLeaderEpochClientTest {
     public void testOkResponse() {
         Map<TopicPartition, SubscriptionState.FetchPosition> positionMap = new 
HashMap<>();
         positionMap.put(tp0, new SubscriptionState.FetchPosition(0, 
Optional.of(1),
-                new Metadata.LeaderAndEpoch(Node.noNode(), Optional.of(1))));
+                new Metadata.LeaderAndEpoch(Optional.empty(), 
Optional.of(1))));
 
         OffsetsForLeaderEpochClient offsetClient = newOffsetClient();
         RequestFuture<OffsetsForLeaderEpochClient.OffsetForEpochResult> future 
=
@@ -111,7 +111,7 @@ public class OffsetForLeaderEpochClientTest {
     public void testUnauthorizedTopic() {
         Map<TopicPartition, SubscriptionState.FetchPosition> positionMap = new 
HashMap<>();
         positionMap.put(tp0, new SubscriptionState.FetchPosition(0, 
Optional.of(1),
-                new Metadata.LeaderAndEpoch(Node.noNode(), Optional.of(1))));
+                new Metadata.LeaderAndEpoch(Optional.empty(), 
Optional.of(1))));
 
         OffsetsForLeaderEpochClient offsetClient = newOffsetClient();
         RequestFuture<OffsetsForLeaderEpochClient.OffsetForEpochResult> future 
=
@@ -131,7 +131,7 @@ public class OffsetForLeaderEpochClientTest {
     public void testRetriableError() {
         Map<TopicPartition, SubscriptionState.FetchPosition> positionMap = new 
HashMap<>();
         positionMap.put(tp0, new SubscriptionState.FetchPosition(0, 
Optional.of(1),
-                new Metadata.LeaderAndEpoch(Node.noNode(), Optional.of(1))));
+                new Metadata.LeaderAndEpoch(Optional.empty(), 
Optional.of(1))));
 
         OffsetsForLeaderEpochClient offsetClient = newOffsetClient();
         RequestFuture<OffsetsForLeaderEpochClient.OffsetForEpochResult> future 
=
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
index 6e1e8a3..74048ad 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
@@ -51,7 +51,7 @@ public class SubscriptionStateTest {
     private final TopicPartition tp1 = new TopicPartition(topic, 1);
     private final TopicPartition t1p0 = new TopicPartition(topic1, 0);
     private final MockRebalanceListener rebalanceListener = new 
MockRebalanceListener();
-    private final Metadata.LeaderAndEpoch leaderAndEpoch = new 
Metadata.LeaderAndEpoch(Node.noNode(), Optional.empty());
+    private final Metadata.LeaderAndEpoch leaderAndEpoch = 
Metadata.LeaderAndEpoch.noLeaderOrEpoch();
 
     @Test
     public void partitionAssignment() {
@@ -363,15 +363,17 @@ public class SubscriptionStateTest {
 
         // Seek with no offset epoch requires no validation no matter what the 
current leader is
         state.seekUnvalidated(tp0, new SubscriptionState.FetchPosition(0L, 
Optional.empty(),
-                new Metadata.LeaderAndEpoch(broker1, Optional.of(5))));
+                new Metadata.LeaderAndEpoch(Optional.of(broker1), 
Optional.of(5))));
         assertTrue(state.hasValidPosition(tp0));
         assertFalse(state.awaitingValidation(tp0));
 
-        assertFalse(state.maybeValidatePositionForCurrentLeader(tp0, new 
Metadata.LeaderAndEpoch(broker1, Optional.empty())));
+        assertFalse(state.maybeValidatePositionForCurrentLeader(tp0, new 
Metadata.LeaderAndEpoch(
+                Optional.of(broker1), Optional.empty())));
         assertTrue(state.hasValidPosition(tp0));
         assertFalse(state.awaitingValidation(tp0));
 
-        assertFalse(state.maybeValidatePositionForCurrentLeader(tp0, new 
Metadata.LeaderAndEpoch(broker1, Optional.of(10))));
+        assertFalse(state.maybeValidatePositionForCurrentLeader(tp0, new 
Metadata.LeaderAndEpoch(
+                Optional.of(broker1), Optional.of(10))));
         assertTrue(state.hasValidPosition(tp0));
         assertFalse(state.awaitingValidation(tp0));
     }
@@ -383,12 +385,12 @@ public class SubscriptionStateTest {
 
         // Seek with no offset epoch requires no validation no matter what the 
current leader is
         state.seekUnvalidated(tp0, new SubscriptionState.FetchPosition(0L, 
Optional.of(2),
-                new Metadata.LeaderAndEpoch(broker1, Optional.of(5))));
+                new Metadata.LeaderAndEpoch(Optional.of(broker1), 
Optional.of(5))));
         assertFalse(state.hasValidPosition(tp0));
         assertTrue(state.awaitingValidation(tp0));
 
         state.seekUnvalidated(tp0, new SubscriptionState.FetchPosition(0L, 
Optional.empty(),
-                new Metadata.LeaderAndEpoch(broker1, Optional.of(5))));
+                new Metadata.LeaderAndEpoch(Optional.of(broker1), 
Optional.of(5))));
         assertTrue(state.hasValidPosition(tp0));
         assertFalse(state.awaitingValidation(tp0));
     }
@@ -399,22 +401,25 @@ public class SubscriptionStateTest {
         state.assignFromUser(Collections.singleton(tp0));
 
         state.seekUnvalidated(tp0, new SubscriptionState.FetchPosition(0L, 
Optional.of(2),
-                new Metadata.LeaderAndEpoch(broker1, Optional.of(5))));
+                new Metadata.LeaderAndEpoch(Optional.of(broker1), 
Optional.of(5))));
         assertFalse(state.hasValidPosition(tp0));
         assertTrue(state.awaitingValidation(tp0));
 
         // Update using the current leader and epoch
-        assertTrue(state.maybeValidatePositionForCurrentLeader(tp0, new 
Metadata.LeaderAndEpoch(broker1, Optional.of(5))));
+        assertTrue(state.maybeValidatePositionForCurrentLeader(tp0, new 
Metadata.LeaderAndEpoch(
+                Optional.of(broker1), Optional.of(5))));
         assertFalse(state.hasValidPosition(tp0));
         assertTrue(state.awaitingValidation(tp0));
 
         // Update with a newer leader and epoch
-        assertTrue(state.maybeValidatePositionForCurrentLeader(tp0, new 
Metadata.LeaderAndEpoch(broker1, Optional.of(15))));
+        assertTrue(state.maybeValidatePositionForCurrentLeader(tp0, new 
Metadata.LeaderAndEpoch(
+                Optional.of(broker1), Optional.of(15))));
         assertFalse(state.hasValidPosition(tp0));
         assertTrue(state.awaitingValidation(tp0));
 
         // If the updated leader has no epoch information, then skip 
validation and begin fetching
-        assertFalse(state.maybeValidatePositionForCurrentLeader(tp0, new 
Metadata.LeaderAndEpoch(broker1, Optional.empty())));
+        assertFalse(state.maybeValidatePositionForCurrentLeader(tp0, new 
Metadata.LeaderAndEpoch(
+                Optional.of(broker1), Optional.empty())));
         assertTrue(state.hasValidPosition(tp0));
         assertFalse(state.awaitingValidation(tp0));
     }
@@ -425,13 +430,13 @@ public class SubscriptionStateTest {
         state.assignFromUser(Collections.singleton(tp0));
 
         state.seekUnvalidated(tp0, new SubscriptionState.FetchPosition(10L, 
Optional.of(5),
-                new Metadata.LeaderAndEpoch(broker1, Optional.of(10))));
+                new Metadata.LeaderAndEpoch(Optional.of(broker1), 
Optional.of(10))));
         assertFalse(state.hasValidPosition(tp0));
         assertTrue(state.awaitingValidation(tp0));
         assertEquals(10L, state.position(tp0).offset);
 
         state.seekValidated(tp0, new SubscriptionState.FetchPosition(8L, 
Optional.of(4),
-                new Metadata.LeaderAndEpoch(broker1, Optional.of(10))));
+                new Metadata.LeaderAndEpoch(Optional.of(broker1), 
Optional.of(10))));
         assertTrue(state.hasValidPosition(tp0));
         assertFalse(state.awaitingValidation(tp0));
         assertEquals(8L, state.position(tp0).offset);
@@ -443,7 +448,7 @@ public class SubscriptionStateTest {
         state.assignFromUser(Collections.singleton(tp0));
 
         state.seekUnvalidated(tp0, new SubscriptionState.FetchPosition(10L, 
Optional.of(5),
-                new Metadata.LeaderAndEpoch(broker1, Optional.of(10))));
+                new Metadata.LeaderAndEpoch(Optional.of(broker1), 
Optional.of(10))));
         assertFalse(state.hasValidPosition(tp0));
         assertTrue(state.awaitingValidation(tp0));
         assertEquals(10L, state.position(tp0).offset);
@@ -460,7 +465,7 @@ public class SubscriptionStateTest {
         state.assignFromUser(Collections.singleton(tp0));
 
         state.seekUnvalidated(tp0, new SubscriptionState.FetchPosition(10L, 
Optional.of(5),
-                new Metadata.LeaderAndEpoch(broker1, Optional.of(10))));
+                new Metadata.LeaderAndEpoch(Optional.of(broker1), 
Optional.of(10))));
         assertTrue(state.awaitingValidation(tp0));
 
         state.requestOffsetReset(tp0, OffsetResetStrategy.EARLIEST);
@@ -478,7 +483,7 @@ public class SubscriptionStateTest {
         int initialOffsetEpoch = 5;
 
         SubscriptionState.FetchPosition initialPosition = new 
SubscriptionState.FetchPosition(initialOffset,
-                Optional.of(initialOffsetEpoch), new 
Metadata.LeaderAndEpoch(broker1, Optional.of(currentEpoch)));
+                Optional.of(initialOffsetEpoch), new 
Metadata.LeaderAndEpoch(Optional.of(broker1), Optional.of(currentEpoch)));
         state.seekUnvalidated(tp0, initialPosition);
         assertTrue(state.awaitingValidation(tp0));
 
@@ -501,12 +506,12 @@ public class SubscriptionStateTest {
         int updateOffsetEpoch = 8;
 
         SubscriptionState.FetchPosition initialPosition = new 
SubscriptionState.FetchPosition(initialOffset,
-                Optional.of(initialOffsetEpoch), new 
Metadata.LeaderAndEpoch(broker1, Optional.of(currentEpoch)));
+                Optional.of(initialOffsetEpoch), new 
Metadata.LeaderAndEpoch(Optional.of(broker1), Optional.of(currentEpoch)));
         state.seekUnvalidated(tp0, initialPosition);
         assertTrue(state.awaitingValidation(tp0));
 
         SubscriptionState.FetchPosition updatePosition = new 
SubscriptionState.FetchPosition(updateOffset,
-                Optional.of(updateOffsetEpoch), new 
Metadata.LeaderAndEpoch(broker1, Optional.of(currentEpoch)));
+                Optional.of(updateOffsetEpoch), new 
Metadata.LeaderAndEpoch(Optional.of(broker1), Optional.of(currentEpoch)));
         state.seekUnvalidated(tp0, updatePosition);
 
         Optional<OffsetAndMetadata> divergentOffsetMetadataOpt = 
state.maybeCompleteValidation(tp0, initialPosition,
@@ -526,7 +531,7 @@ public class SubscriptionStateTest {
         int initialOffsetEpoch = 5;
 
         SubscriptionState.FetchPosition initialPosition = new 
SubscriptionState.FetchPosition(initialOffset,
-                Optional.of(initialOffsetEpoch), new 
Metadata.LeaderAndEpoch(broker1, Optional.of(currentEpoch)));
+                Optional.of(initialOffsetEpoch), new 
Metadata.LeaderAndEpoch(Optional.of(broker1), Optional.of(currentEpoch)));
         state.seekUnvalidated(tp0, initialPosition);
         assertTrue(state.awaitingValidation(tp0));
 
@@ -551,7 +556,7 @@ public class SubscriptionStateTest {
         int divergentOffsetEpoch = 7;
 
         SubscriptionState.FetchPosition initialPosition = new 
SubscriptionState.FetchPosition(initialOffset,
-                Optional.of(initialOffsetEpoch), new 
Metadata.LeaderAndEpoch(broker1, Optional.of(currentEpoch)));
+                Optional.of(initialOffsetEpoch), new 
Metadata.LeaderAndEpoch(Optional.of(broker1), Optional.of(currentEpoch)));
         state.seekUnvalidated(tp0, initialPosition);
         assertTrue(state.awaitingValidation(tp0));
 
@@ -561,7 +566,7 @@ public class SubscriptionStateTest {
         assertFalse(state.awaitingValidation(tp0));
 
         SubscriptionState.FetchPosition updatedPosition = new 
SubscriptionState.FetchPosition(divergentOffset,
-                Optional.of(divergentOffsetEpoch), new 
Metadata.LeaderAndEpoch(broker1, Optional.of(currentEpoch)));
+                Optional.of(divergentOffsetEpoch), new 
Metadata.LeaderAndEpoch(Optional.of(broker1), Optional.of(currentEpoch)));
         assertEquals(updatedPosition, state.position(tp0));
     }
 
@@ -578,7 +583,7 @@ public class SubscriptionStateTest {
         int divergentOffsetEpoch = 7;
 
         SubscriptionState.FetchPosition initialPosition = new 
SubscriptionState.FetchPosition(initialOffset,
-                Optional.of(initialOffsetEpoch), new 
Metadata.LeaderAndEpoch(broker1, Optional.of(currentEpoch)));
+                Optional.of(initialOffsetEpoch), new 
Metadata.LeaderAndEpoch(Optional.of(broker1), Optional.of(currentEpoch)));
         state.seekUnvalidated(tp0, initialPosition);
         assertTrue(state.awaitingValidation(tp0));
 
diff --git 
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
 
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 8330e6a..487eabf 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -169,6 +169,8 @@ import java.util.Optional;
 import java.util.Set;
 
 import static java.util.Arrays.asList;
+import static java.util.Collections.emptyList;
+import static java.util.Collections.singletonList;
 import static org.apache.kafka.common.protocol.ApiKeys.FETCH;
 import static 
org.apache.kafka.common.requests.FetchMetadata.INVALID_SESSION_ID;
 import static org.apache.kafka.test.TestUtils.toBuffer;
@@ -637,7 +639,7 @@ public class RequestResponseTest {
         responseData.put(new TopicPartition("bar", 1), new 
FetchResponse.PartitionData<>(Errors.NONE, 900000,
                 5, FetchResponse.INVALID_LOG_START_OFFSET, Optional.empty(), 
null, records));
         responseData.put(new TopicPartition("foo", 0), new 
FetchResponse.PartitionData<>(Errors.NONE, 70000,
-                6, FetchResponse.INVALID_LOG_START_OFFSET, Optional.empty(), 
Collections.emptyList(), records));
+                6, FetchResponse.INVALID_LOG_START_OFFSET, Optional.empty(), 
emptyList(), records));
 
         FetchResponse<MemoryRecords> response = new 
FetchResponse<>(Errors.NONE, responseData, 10, INVALID_SESSION_ID);
         FetchResponse deserialized = 
FetchResponse.parse(toBuffer(response.toStruct((short) 4)), (short) 4);
@@ -1186,19 +1188,21 @@ public class RequestResponseTest {
 
     private MetadataResponse createMetadataResponse() {
         Node node = new Node(1, "host1", 1001);
-        List<Node> replicas = asList(node);
-        List<Node> isr = asList(node);
-        List<Node> offlineReplicas = asList();
+        List<Integer> replicas = singletonList(node.id());
+        List<Integer> isr = singletonList(node.id());
+        List<Integer> offlineReplicas = emptyList();
 
         List<MetadataResponse.TopicMetadata> allTopicMetadata = new 
ArrayList<>();
         allTopicMetadata.add(new MetadataResponse.TopicMetadata(Errors.NONE, 
"__consumer_offsets", true,
-                asList(new MetadataResponse.PartitionMetadata(Errors.NONE, 1, 
node,
-                        Optional.of(5), replicas, isr, offlineReplicas))));
+                asList(new MetadataResponse.PartitionMetadata(Errors.NONE,
+                        new TopicPartition("__consumer_offsets", 1),
+                        Optional.of(node.id()), Optional.of(5), replicas, isr, 
offlineReplicas))));
         allTopicMetadata.add(new 
MetadataResponse.TopicMetadata(Errors.LEADER_NOT_AVAILABLE, "topic2", false,
-                Collections.emptyList()));
+                emptyList()));
         allTopicMetadata.add(new MetadataResponse.TopicMetadata(Errors.NONE, 
"topic3", false,
-            asList(new 
MetadataResponse.PartitionMetadata(Errors.LEADER_NOT_AVAILABLE, 0, null,
-                Optional.empty(), replicas, isr, offlineReplicas))));
+            asList(new 
MetadataResponse.PartitionMetadata(Errors.LEADER_NOT_AVAILABLE,
+                    new TopicPartition("topic3", 0), Optional.empty(),
+                    Optional.empty(), replicas, isr, offlineReplicas))));
 
         return MetadataResponse.prepareResponse(asList(node), null, 
MetadataResponse.NO_CONTROLLER_ID, allTopicMetadata);
     }
@@ -1808,13 +1812,13 @@ public class RequestResponseTest {
         Map<ConfigResource, Collection<String>> resources = new HashMap<>();
         resources.put(new ConfigResource(ConfigResource.Type.BROKER, "0"), 
asList("foo", "bar"));
         resources.put(new ConfigResource(ConfigResource.Type.TOPIC, "topic"), 
null);
-        resources.put(new ConfigResource(ConfigResource.Type.TOPIC, "topic 
a"), Collections.emptyList());
+        resources.put(new ConfigResource(ConfigResource.Type.TOPIC, "topic 
a"), emptyList());
         return new DescribeConfigsRequest.Builder(resources).build((short) 
version);
     }
 
     private DescribeConfigsResponse createDescribeConfigsResponse() {
         Map<ConfigResource, DescribeConfigsResponse.Config> configs = new 
HashMap<>();
-        List<DescribeConfigsResponse.ConfigSynonym> synonyms = 
Collections.emptyList();
+        List<DescribeConfigsResponse.ConfigSynonym> synonyms = emptyList();
         List<DescribeConfigsResponse.ConfigEntry> configEntries = asList(
                 new DescribeConfigsResponse.ConfigEntry("config_name", 
"config_value",
                         
DescribeConfigsResponse.ConfigSource.DYNAMIC_BROKER_CONFIG, true, false, 
synonyms),
diff --git a/clients/src/test/java/org/apache/kafka/test/TestUtils.java 
b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
index 5782b6d..ad2ad99 100644
--- a/clients/src/test/java/org/apache/kafka/test/TestUtils.java
+++ b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
@@ -159,9 +159,10 @@ public class TestUtils {
             for (int i = 0; i < numPartitions; i++) {
                 TopicPartition tp = new TopicPartition(topic, i);
                 Node leader = nodes.get(i % nodes.size());
-                List<Node> replicas = Collections.singletonList(leader);
+                List<Integer> replicaIds = 
Collections.singletonList(leader.id());
                 partitionMetadata.add(partitionSupplier.supply(
-                        Errors.NONE, i, leader, 
Optional.ofNullable(epochSupplier.apply(tp)), replicas, replicas, replicas));
+                        Errors.NONE, tp, Optional.of(leader.id()), 
Optional.ofNullable(epochSupplier.apply(tp)),
+                        replicaIds, replicaIds, replicaIds));
             }
 
             topicMetadata.add(new MetadataResponse.TopicMetadata(Errors.NONE, 
topic,
@@ -180,12 +181,12 @@ public class TestUtils {
     @FunctionalInterface
     public interface PartitionMetadataSupplier {
         MetadataResponse.PartitionMetadata supply(Errors error,
-                              int partition,
-                              Node leader,
+                              TopicPartition partition,
+                              Optional<Integer> leaderId,
                               Optional<Integer> leaderEpoch,
-                              List<Node> replicas,
-                              List<Node> isr,
-                              List<Node> offlineReplicas);
+                              List<Integer> replicas,
+                              List<Integer> isr,
+                              List<Integer> offlineReplicas);
     }
 
     public static Cluster clusterWith(final int nodes, final String topic, 
final int partitions) {
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala 
b/core/src/main/scala/kafka/server/KafkaApis.scala
index e3adb04..8a14cd7 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -1290,11 +1290,13 @@ class KafkaApis(val requestChannel: RequestChannel,
         } else {
           val coordinatorEndpoint = topicMetadata.partitionMetadata.asScala
             .find(_.partition == partition)
-            .map(_.leader)
-            .flatMap(p => Option(p))
+            .filter(_.leaderId.isPresent)
+            .flatMap(metadata => 
metadataCache.getAliveBroker(metadata.leaderId.get))
+            .flatMap(_.getNode(request.context.listenerName))
+            .filterNot(_.isEmpty)
 
           coordinatorEndpoint match {
-            case Some(endpoint) if !endpoint.isEmpty =>
+            case Some(endpoint) =>
               createFindCoordinatorResponse(Errors.NONE, endpoint)
             case _ =>
               createFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, 
Node.noNode)
diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala 
b/core/src/main/scala/kafka/server/MetadataCache.scala
index 1ad0e41..10c25b5 100755
--- a/core/src/main/scala/kafka/server/MetadataCache.scala
+++ b/core/src/main/scala/kafka/server/MetadataCache.scala
@@ -17,6 +17,7 @@
 
 package kafka.server
 
+import java.util
 import java.util.{Collections, Optional}
 import java.util.concurrent.locks.ReentrantReadWriteLock
 
@@ -55,18 +56,22 @@ class MetadataCache(brokerId: Int) extends Logging {
 
   // This method is the main hotspot when it comes to the performance of 
metadata requests,
   // we should be careful about adding additional logic here. Relatedly, 
`brokers` is
-  // `Iterable[Integer]` instead of `Iterable[Int]` to avoid a collection copy.
+  // `List[Integer]` instead of `List[Int]` to avoid a collection copy.
   // filterUnavailableEndpoints exists to support v0 MetadataResponses
-  private def getEndpoints(snapshot: MetadataSnapshot, brokers: 
Iterable[java.lang.Integer], listenerName: ListenerName, 
filterUnavailableEndpoints: Boolean): Seq[Node] = {
-    val result = new 
mutable.ArrayBuffer[Node](math.min(snapshot.aliveBrokers.size, brokers.size))
-    brokers.foreach { brokerId =>
-      val endpoint = getAliveEndpoint(snapshot, brokerId, listenerName) match {
-        case None => if (!filterUnavailableEndpoints) Some(new Node(brokerId, 
"", -1)) else None
-        case Some(node) => Some(node)
+  private def maybeFilterAliveReplicas(snapshot: MetadataSnapshot,
+                                       brokers: java.util.List[Integer],
+                                       listenerName: ListenerName,
+                                       filterUnavailableEndpoints: Boolean): 
java.util.List[Integer] = {
+    if (!filterUnavailableEndpoints) {
+      brokers
+    } else {
+      val res = new 
util.ArrayList[Integer](math.min(snapshot.aliveBrokers.size, brokers.size))
+      for (brokerId <- brokers.asScala) {
+        if (hasAliveEndpoint(snapshot, brokerId, listenerName))
+          res.add(brokerId)
       }
-      endpoint.foreach(result +=)
+      res
     }
-    result
   }
 
   // errorUnavailableEndpoints exists to support v0 MetadataResponses
@@ -80,53 +85,71 @@ class MetadataCache(brokerId: Int) extends Logging {
         val leaderBrokerId = partitionState.leader
         val leaderEpoch = partitionState.leaderEpoch
         val maybeLeader = getAliveEndpoint(snapshot, leaderBrokerId, 
listenerName)
-        val replicas = partitionState.replicas.asScala
-        val replicaInfo = getEndpoints(snapshot, replicas, listenerName, 
errorUnavailableEndpoints)
-        val offlineReplicaInfo = getEndpoints(snapshot, 
partitionState.offlineReplicas.asScala, listenerName, errorUnavailableEndpoints)
 
-        val isr = partitionState.isr.asScala
-        val isrInfo = getEndpoints(snapshot, isr, listenerName, 
errorUnavailableEndpoints)
+        val replicas = partitionState.replicas
+        val filteredReplicas = maybeFilterAliveReplicas(snapshot, replicas, 
listenerName, errorUnavailableEndpoints)
+
+        val isr = partitionState.isr
+        val filteredIsr = maybeFilterAliveReplicas(snapshot, isr, 
listenerName, errorUnavailableEndpoints)
+
+        val offlineReplicas = partitionState.offlineReplicas
+
         maybeLeader match {
           case None =>
             val error = if (!snapshot.aliveBrokers.contains(brokerId)) { // we 
are already holding the read lock
               debug(s"Error while fetching metadata for $topicPartition: 
leader not available")
               Errors.LEADER_NOT_AVAILABLE
             } else {
-              debug(s"Error while fetching metadata for $topicPartition: 
listener $listenerName not found on leader $leaderBrokerId")
+              debug(s"Error while fetching metadata for $topicPartition: 
listener $listenerName " +
+                s"not found on leader $leaderBrokerId")
               if (errorUnavailableListeners) Errors.LISTENER_NOT_FOUND else 
Errors.LEADER_NOT_AVAILABLE
             }
-            new MetadataResponse.PartitionMetadata(error, partitionId.toInt, 
Node.noNode(),
-              Optional.empty(), replicaInfo.asJava, isrInfo.asJava,
-              offlineReplicaInfo.asJava)
+            new MetadataResponse.PartitionMetadata(error, topicPartition, 
Optional.empty(),
+              Optional.of(leaderEpoch), filteredReplicas, filteredIsr, 
offlineReplicas)
 
           case Some(leader) =>
-            if (replicaInfo.size < replicas.size) {
+            if (filteredReplicas.size < replicas.size) {
               debug(s"Error while fetching metadata for $topicPartition: 
replica information not available for " +
-                s"following brokers 
${replicas.filterNot(replicaInfo.map(_.id).contains).mkString(",")}")
-
-              new 
MetadataResponse.PartitionMetadata(Errors.REPLICA_NOT_AVAILABLE, 
partitionId.toInt, leader,
-                Optional.empty(), replicaInfo.asJava, isrInfo.asJava, 
offlineReplicaInfo.asJava)
-            } else if (isrInfo.size < isr.size) {
+                s"following brokers 
${replicas.asScala.filterNot(filteredReplicas.contains).mkString(",")}")
+              new 
MetadataResponse.PartitionMetadata(Errors.REPLICA_NOT_AVAILABLE, topicPartition,
+                Optional.of(leader.id), Optional.of(leaderEpoch), 
filteredReplicas, filteredIsr, offlineReplicas)
+            } else if (filteredIsr.size < isr.size) {
               debug(s"Error while fetching metadata for $topicPartition: in 
sync replica information not available for " +
-                s"following brokers 
${isr.filterNot(isrInfo.map(_.id).contains).mkString(",")}")
-              new 
MetadataResponse.PartitionMetadata(Errors.REPLICA_NOT_AVAILABLE, 
partitionId.toInt, leader,
-                Optional.empty(), replicaInfo.asJava, isrInfo.asJava, 
offlineReplicaInfo.asJava)
+                s"following brokers 
${isr.asScala.filterNot(filteredIsr.contains).mkString(",")}")
+              new 
MetadataResponse.PartitionMetadata(Errors.REPLICA_NOT_AVAILABLE, topicPartition,
+                Optional.of(leader.id), Optional.of(leaderEpoch), 
filteredReplicas, filteredIsr, offlineReplicas)
             } else {
-              new MetadataResponse.PartitionMetadata(Errors.NONE, 
partitionId.toInt, leader, Optional.of(leaderEpoch),
-                replicaInfo.asJava, isrInfo.asJava, offlineReplicaInfo.asJava)
+              new MetadataResponse.PartitionMetadata(Errors.NONE, 
topicPartition,
+                Optional.of(leader.id), Optional.of(leaderEpoch), 
filteredReplicas, filteredIsr, offlineReplicas)
             }
         }
       }
     }
   }
 
-  private def getAliveEndpoint(snapshot: MetadataSnapshot, brokerId: Int, 
listenerName: ListenerName): Option[Node] =
-    // Returns None if broker is not alive or if the broker does not have a 
listener named `listenerName`.
-    // Since listeners can be added dynamically, a broker with a missing 
listener could be a transient error.
+  /**
+   * Check whether a broker is alive and has a registered listener matching 
the provided name.
+   * This method was added to avoid unnecessary allocations in 
[[maybeFilterAliveReplicas]], which is
+   * a hotspot in metadata handling.
+   */
+  private def hasAliveEndpoint(snapshot: MetadataSnapshot, brokerId: Int, 
listenerName: ListenerName): Boolean = {
+    snapshot.aliveNodes.get(brokerId).exists(_.contains(listenerName))
+  }
+
+  /**
+   * Get the endpoint matching the provided listener if the broker is alive. 
Note that listeners can
+   * be added dynamically, so a broker with a missing listener could be a 
transient error.
+   *
+   * @return None if broker is not alive or if the broker does not have a 
listener named `listenerName`.
+   */
+  private def getAliveEndpoint(snapshot: MetadataSnapshot, brokerId: Int, 
listenerName: ListenerName): Option[Node] = {
     snapshot.aliveNodes.get(brokerId).flatMap(_.get(listenerName))
+  }
 
   // errorUnavailableEndpoints exists to support v0 MetadataResponses
-  def getTopicMetadata(topics: Set[String], listenerName: ListenerName, 
errorUnavailableEndpoints: Boolean = false,
+  def getTopicMetadata(topics: Set[String],
+                       listenerName: ListenerName,
+                       errorUnavailableEndpoints: Boolean = false,
                        errorUnavailableListeners: Boolean = false): 
Seq[MetadataResponse.TopicMetadata] = {
     val snapshot = metadataSnapshot
     topics.toSeq.flatMap { topic =>
diff --git a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala 
b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
index efd721d..cdb4c0a 100755
--- a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
@@ -17,6 +17,8 @@
 
 package kafka.admin
 
+import java.util.Optional
+
 import kafka.controller.ReplicaAssignment
 import kafka.server.BaseRequestTest
 import kafka.utils.TestUtils
@@ -101,9 +103,14 @@ class AddPartitionsTest extends BaseRequestTest {
     assertEquals(partitions.size, 3)
     assertEquals(1, partitions(1).partition)
     assertEquals(2, partitions(2).partition)
-    val replicas = partitions(1).replicas
-    assertEquals(replicas.size, 2)
-    assertTrue(replicas.contains(partitions(1).leader))
+
+    for (partition <- partitions) {
+      val replicas = partition.replicaIds
+      assertEquals(2, replicas.size)
+      assertTrue(partition.leaderId.isPresent)
+      val leaderId = partition.leaderId.get
+      assertTrue(replicas.contains(leaderId))
+    }
   }
 
   @Test
@@ -131,10 +138,9 @@ class AddPartitionsTest extends BaseRequestTest {
     assertEquals(0, partitionMetadata(0).partition)
     assertEquals(1, partitionMetadata(1).partition)
     assertEquals(2, partitionMetadata(2).partition)
-    val replicas = partitionMetadata(1).replicas
+    val replicas = partitionMetadata(1).replicaIds
     assertEquals(2, replicas.size)
-    assertTrue(replicas.asScala.head.id == 0 || replicas.asScala.head.id == 1)
-    assertTrue(replicas.asScala(1).id == 0 || replicas.asScala(1).id == 1)
+    assertEquals(Set(0, 1), replicas.asScala.toSet)
   }
 
   @Test
@@ -185,9 +191,8 @@ class AddPartitionsTest extends BaseRequestTest {
     assertTrue(s"Partition $partitionId should exist", partitionOpt.isDefined)
     val partition = partitionOpt.get
 
-    assertNotNull("Partition leader should exist", partition.leader)
-    assertEquals("Partition leader id should match", expectedLeaderId, 
partition.leaderId)
-    assertEquals("Replica set should match", expectedReplicas, 
partition.replicas.asScala.map(_.id).toSet)
+    assertEquals("Partition leader id should match", 
Optional.of(expectedLeaderId), partition.leaderId)
+    assertEquals("Replica set should match", expectedReplicas, 
partition.replicaIds.asScala.toSet)
   }
 
 }
diff --git 
a/core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala 
b/core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala
index b110139..5281d80 100644
--- a/core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala
@@ -584,7 +584,7 @@ class TopicCommandWithAdminClientTest extends 
KafkaServerTestHarness with Loggin
               testPartitionMetadata match {
                 case None => fail(s"Partition metadata is not found in 
metadata cache")
                 case Some(metadata) => {
-                  result && metadata.error() == Errors.LEADER_NOT_AVAILABLE
+                  result && metadata.error == Errors.LEADER_NOT_AVAILABLE
                 }
               }
             }
diff --git 
a/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala
index 5f1f671..8ca2c70 100644
--- 
a/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala
+++ 
b/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala
@@ -131,9 +131,9 @@ abstract class AbstractCreateTopicsRequestTest extends 
BaseRequestTest {
 
           if (replication == -1) {
             assertEquals("The topic should have the default replication 
factor",
-              configs.head.defaultReplicationFactor, 
metadataForTopic.partitionMetadata.asScala.head.replicas.size)
+              configs.head.defaultReplicationFactor, 
metadataForTopic.partitionMetadata.asScala.head.replicaIds.size)
           } else {
-            assertEquals("The topic should have the correct replication 
factor", replication, 
metadataForTopic.partitionMetadata.asScala.head.replicas.size)
+            assertEquals("The topic should have the correct replication 
factor", replication, 
metadataForTopic.partitionMetadata.asScala.head.replicaIds.size)
           }
         }
       }
diff --git a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala 
b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
index a42a86a..60cff13 100644
--- a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
+++ b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
@@ -128,16 +128,12 @@ class MetadataCacheTest {
         partitionMetadatas.zipWithIndex.foreach { case (partitionMetadata, 
partitionId) =>
           assertEquals(Errors.NONE, partitionMetadata.error)
           assertEquals(partitionId, partitionMetadata.partition)
-          val leader = partitionMetadata.leader
           val partitionState = topicPartitionStates.find(_.partitionIndex == 
partitionId).getOrElse(
             Assertions.fail(s"Unable to find partition state for partition 
$partitionId"))
-          assertEquals(partitionState.leader, leader.id)
+          assertEquals(Optional.of(partitionState.leader), 
partitionMetadata.leaderId)
           assertEquals(Optional.of(partitionState.leaderEpoch), 
partitionMetadata.leaderEpoch)
-          assertEquals(partitionState.isr, 
partitionMetadata.isr.asScala.map(_.id).asJava)
-          assertEquals(partitionState.replicas, 
partitionMetadata.replicas.asScala.map(_.id).asJava)
-          val endpoint = 
endpoints(partitionMetadata.leader.id).find(_.listener == 
listenerName.value).get
-          assertEquals(endpoint.host, leader.host)
-          assertEquals(endpoint.port, leader.port)
+          assertEquals(partitionState.isr, partitionMetadata.inSyncReplicaIds)
+          assertEquals(partitionState.replicas, partitionMetadata.replicaIds)
         }
       }
 
@@ -267,9 +263,8 @@ class MetadataCacheTest {
     val partitionMetadata = partitionMetadatas.get(0)
     assertEquals(0, partitionMetadata.partition)
     assertEquals(expectedError, partitionMetadata.error)
-    assertFalse(partitionMetadata.isr.isEmpty)
-    assertEquals(1, partitionMetadata.replicas.size)
-    assertEquals(0, partitionMetadata.replicas.get(0).id)
+    assertFalse(partitionMetadata.inSyncReplicaIds.isEmpty)
+    assertEquals(List(0), partitionMetadata.replicaIds.asScala)
   }
 
   @Test
@@ -326,8 +321,8 @@ class MetadataCacheTest {
     val partitionMetadata = partitionMetadatas.get(0)
     assertEquals(0, partitionMetadata.partition)
     assertEquals(Errors.NONE, partitionMetadata.error)
-    assertEquals(Set(0, 1), partitionMetadata.replicas.asScala.map(_.id).toSet)
-    assertEquals(Set(0), partitionMetadata.isr.asScala.map(_.id).toSet)
+    assertEquals(Set(0, 1), partitionMetadata.replicaIds.asScala.toSet)
+    assertEquals(Set(0), partitionMetadata.inSyncReplicaIds.asScala.toSet)
 
     // Validate errorUnavailableEndpoints = true
     val topicMetadatasWithError = cache.getTopicMetadata(Set(topic), 
listenerName, errorUnavailableEndpoints = true)
@@ -342,8 +337,8 @@ class MetadataCacheTest {
     val partitionMetadataWithError = partitionMetadatasWithError.get(0)
     assertEquals(0, partitionMetadataWithError.partition)
     assertEquals(Errors.REPLICA_NOT_AVAILABLE, 
partitionMetadataWithError.error)
-    assertEquals(Set(0), 
partitionMetadataWithError.replicas.asScala.map(_.id).toSet)
-    assertEquals(Set(0), 
partitionMetadataWithError.isr.asScala.map(_.id).toSet)
+    assertEquals(Set(0), partitionMetadataWithError.replicaIds.asScala.toSet)
+    assertEquals(Set(0), 
partitionMetadataWithError.inSyncReplicaIds.asScala.toSet)
   }
 
   @Test
@@ -400,8 +395,8 @@ class MetadataCacheTest {
     val partitionMetadata = partitionMetadatas.get(0)
     assertEquals(0, partitionMetadata.partition)
     assertEquals(Errors.NONE, partitionMetadata.error)
-    assertEquals(Set(0), partitionMetadata.replicas.asScala.map(_.id).toSet)
-    assertEquals(Set(0, 1), partitionMetadata.isr.asScala.map(_.id).toSet)
+    assertEquals(Set(0), partitionMetadata.replicaIds.asScala.toSet)
+    assertEquals(Set(0, 1), partitionMetadata.inSyncReplicaIds.asScala.toSet)
 
     // Validate errorUnavailableEndpoints = true
     val topicMetadatasWithError = cache.getTopicMetadata(Set(topic), 
listenerName, errorUnavailableEndpoints = true)
@@ -416,8 +411,8 @@ class MetadataCacheTest {
     val partitionMetadataWithError = partitionMetadatasWithError.get(0)
     assertEquals(0, partitionMetadataWithError.partition)
     assertEquals(Errors.REPLICA_NOT_AVAILABLE, 
partitionMetadataWithError.error)
-    assertEquals(Set(0), 
partitionMetadataWithError.replicas.asScala.map(_.id).toSet)
-    assertEquals(Set(0), 
partitionMetadataWithError.isr.asScala.map(_.id).toSet)
+    assertEquals(Set(0), partitionMetadataWithError.replicaIds.asScala.toSet)
+    assertEquals(Set(0), 
partitionMetadataWithError.inSyncReplicaIds.asScala.toSet)
   }
 
   @Test
@@ -455,7 +450,7 @@ class MetadataCacheTest {
     val topicMetadata = cache.getTopicMetadata(Set(topic), 
ListenerName.forSecurityProtocol(SecurityProtocol.SSL))
     assertEquals(1, topicMetadata.size)
     assertEquals(1, topicMetadata.head.partitionMetadata.size)
-    assertEquals(-1, topicMetadata.head.partitionMetadata.get(0).leaderId)
+    assertEquals(Optional.empty, 
topicMetadata.head.partitionMetadata.get(0).leaderId)
   }
 
   @Test
diff --git a/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
index aa6a53f..599ae17 100644
--- a/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
@@ -17,11 +17,10 @@
 
 package kafka.server
 
-import java.util.Properties
+import java.util.{Optional, Properties}
 
 import kafka.network.SocketServer
 import kafka.utils.TestUtils
-import org.apache.kafka.common.Node
 import org.apache.kafka.common.errors.UnsupportedVersionException
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.message.MetadataRequestData
@@ -202,8 +201,9 @@ class MetadataRequestTest extends BaseRequestTest {
     assertEquals(1, topicMetadata1.partitionMetadata.size)
     val partitionMetadata = topicMetadata1.partitionMetadata.asScala.head
     assertEquals(0, partitionMetadata.partition)
-    assertEquals(2, partitionMetadata.replicas.size)
-    assertNotNull(partitionMetadata.leader)
+    assertEquals(2, partitionMetadata.replicaIds.size)
+    assertTrue(partitionMetadata.leaderId.isPresent)
+    assertTrue(partitionMetadata.leaderId.get >= 0)
   }
 
   @Test
@@ -243,9 +243,9 @@ class MetadataRequestTest extends BaseRequestTest {
       assertEquals(Set(0, 1), 
topicMetadata.partitionMetadata.asScala.map(_.partition).toSet)
       topicMetadata.partitionMetadata.asScala.foreach { partitionMetadata =>
         val assignment = replicaAssignment(partitionMetadata.partition)
-        assertEquals(assignment, partitionMetadata.replicas.asScala.map(_.id))
-        assertEquals(assignment, partitionMetadata.isr.asScala.map(_.id))
-        assertEquals(assignment.head, partitionMetadata.leader.id)
+        assertEquals(assignment, partitionMetadata.replicaIds.asScala)
+        assertEquals(assignment, partitionMetadata.inSyncReplicaIds.asScala)
+        assertEquals(Optional.of(assignment.head), partitionMetadata.leaderId)
       }
     }
   }
@@ -268,23 +268,22 @@ class MetadataRequestTest extends BaseRequestTest {
     createTopic(replicaDownTopic, 1, replicaCount)
 
     // Kill a replica node that is not the leader
-    val metadataResponse = sendMetadataRequest(new 
MetadataRequest.Builder(List(replicaDownTopic).asJava, true, 1.toShort).build())
+    val metadataResponse = sendMetadataRequest(new 
MetadataRequest.Builder(List(replicaDownTopic).asJava, true).build())
     val partitionMetadata = 
metadataResponse.topicMetadata.asScala.head.partitionMetadata.asScala.head
     val downNode = servers.find { server =>
       val serverId = server.dataPlaneRequestProcessor.brokerId
-      val leaderId = partitionMetadata.leader.id
-      val replicaIds = partitionMetadata.replicas.asScala.map(_.id)
+      val leaderId = partitionMetadata.leaderId
+      val replicaIds = partitionMetadata.replicaIds.asScala
       serverId != leaderId && replicaIds.contains(serverId)
     }.get
     downNode.shutdown()
 
     TestUtils.waitUntilTrue(() => {
-      val response = sendMetadataRequest(new 
MetadataRequest.Builder(List(replicaDownTopic).asJava, true, 1.toShort).build())
-      val metadata = 
response.topicMetadata.asScala.head.partitionMetadata.asScala.head
-      val replica = metadata.replicas.asScala.find(_.id == 
downNode.dataPlaneRequestProcessor.brokerId).get
-      replica.host == "" & replica.port == -1
+      val response = sendMetadataRequest(new 
MetadataRequest.Builder(List(replicaDownTopic).asJava, true).build())
+      !response.brokers.asScala.exists(_.id == 
downNode.dataPlaneRequestProcessor.brokerId)
     }, "Replica was not found down", 5000)
 
+
     // Validate version 0 still filters unavailable replicas and contains error
     val v0MetadataResponse = sendMetadataRequest(new 
MetadataRequest(requestData(List(replicaDownTopic), true), 0.toShort))
     val v0BrokerIds = v0MetadataResponse.brokers().asScala.map(_.id).toSeq
@@ -293,37 +292,35 @@ class MetadataRequestTest extends BaseRequestTest {
     assertTrue("Response should have one topic", 
v0MetadataResponse.topicMetadata.size == 1)
     val v0PartitionMetadata = 
v0MetadataResponse.topicMetadata.asScala.head.partitionMetadata.asScala.head
     assertTrue("PartitionMetadata should have an error", 
v0PartitionMetadata.error == Errors.REPLICA_NOT_AVAILABLE)
-    assertTrue(s"Response should have ${replicaCount - 1} replicas", 
v0PartitionMetadata.replicas.size == replicaCount - 1)
+    assertTrue(s"Response should have ${replicaCount - 1} replicas", 
v0PartitionMetadata.replicaIds.size == replicaCount - 1)
 
     // Validate version 1 returns unavailable replicas with no error
-    val v1MetadataResponse = sendMetadataRequest(new 
MetadataRequest.Builder(List(replicaDownTopic).asJava, true, 1.toShort).build())
+    val v1MetadataResponse = sendMetadataRequest(new 
MetadataRequest.Builder(List(replicaDownTopic).asJava, true).build(1))
     val v1BrokerIds = v1MetadataResponse.brokers().asScala.map(_.id).toSeq
     assertTrue("Response should have no errors", 
v1MetadataResponse.errors.isEmpty)
     assertFalse(s"The downed broker should not be in the brokers list", 
v1BrokerIds.contains(downNode))
     assertEquals("Response should have one topic", 1, 
v1MetadataResponse.topicMetadata.size)
     val v1PartitionMetadata = 
v1MetadataResponse.topicMetadata.asScala.head.partitionMetadata.asScala.head
     assertEquals("PartitionMetadata should have no errors", Errors.NONE, 
v1PartitionMetadata.error)
-    assertEquals(s"Response should have $replicaCount replicas", replicaCount, 
v1PartitionMetadata.replicas.size)
+    assertEquals(s"Response should have $replicaCount replicas", replicaCount, 
v1PartitionMetadata.replicaIds.size)
   }
 
   @Test
   def testIsrAfterBrokerShutDownAndJoinsBack(): Unit = {
     def checkIsr(servers: Seq[KafkaServer], topic: String): Unit = {
       val activeBrokers = servers.filter(_.brokerState.currentState != 
NotRunning.state)
-      val expectedIsr = activeBrokers.map { broker =>
-        new Node(broker.config.brokerId, "localhost", 
TestUtils.boundPort(broker), broker.config.rack.orNull)
-      }.sortBy(_.id)
+      val expectedIsr = activeBrokers.map(_.config.brokerId).toSet
 
       // Assert that topic metadata at new brokers is updated correctly
       activeBrokers.foreach { broker =>
-        var actualIsr: Seq[Node] = Seq.empty
+        var actualIsr = Set.empty[Int]
         TestUtils.waitUntilTrue(() => {
           val metadataResponse = sendMetadataRequest(new 
MetadataRequest.Builder(Seq(topic).asJava, false).build,
             Some(brokerSocketServer(broker.config.brokerId)))
           val firstPartitionMetadata = 
metadataResponse.topicMetadata.asScala.headOption.flatMap(_.partitionMetadata.asScala.headOption)
           actualIsr = firstPartitionMetadata.map { partitionMetadata =>
-            partitionMetadata.isr.asScala.sortBy(_.id)
-          }.getOrElse(Seq.empty)
+            partitionMetadata.inSyncReplicaIds.asScala.map(Int.unbox).toSet
+          }.getOrElse(Set.empty)
           expectedIsr == actualIsr
         }, s"Topic metadata not updated correctly in broker $broker\n" +
           s"Expected ISR: $expectedIsr \n" +

Reply via email to