This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 1dd1e7f  KAFKA-10545: Create topic IDs and propagate to brokers (#9626)
1dd1e7f is described below

commit 1dd1e7f945d7a8c1dc177223cd88800680f1ff46
Author: Justine Olshan <jols...@confluent.io>
AuthorDate: Fri Dec 18 17:19:50 2020 -0500

    KAFKA-10545: Create topic IDs and propagate to brokers (#9626)
    
    This change propagates topic IDs to brokers in the LeaderAndIsrRequest. It
    also removes the topic name from the LeaderAndIsrResponse, reorganizes the
    response to be grouped by topic, and includes the topic ID.
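
    As a rough sketch (editor's illustration, not part of the commit): a version 5
    request can be built through the Builder's new topicIds parameter. The class
    name, topic name, and IDs below are hypothetical; topics missing from the map
    fall back to Uuid.ZERO_UUID.

        import java.util.Collections;
        import java.util.Map;
        import org.apache.kafka.common.Uuid;
        import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState;
        import org.apache.kafka.common.requests.LeaderAndIsrRequest;

        public class LeaderAndIsrV5Example {
            public static void main(String[] args) {
                // Map each topic name to its ID; attached to the per-topic states for v2+.
                Map<String, Uuid> topicIds = Collections.singletonMap("topic", Uuid.randomUuid());
                LeaderAndIsrRequest request = new LeaderAndIsrRequest.Builder(
                        (short) 5, /* controllerId */ 0, /* controllerEpoch */ 0, /* brokerEpoch */ 0,
                        Collections.singletonList(new LeaderAndIsrPartitionState()
                                .setTopicName("topic")
                                .setPartitionIndex(0)),
                        topicIds, // new parameter: topic name -> topic ID
                        Collections.emptySet()).build();
                // The new accessor exposes the IDs carried by the request.
                System.out.println(request.topicIds());
            }
        }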
    
    In addition, the topic ID is persisted for each replica in the Log as well
    as in a partition.metadata file on disk. This file is read on startup, and
    if a topic ID exists there, it is reloaded into the Log.
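
    For reference, the partition metadata file written by the new
    PartitionMetadataFile class is a small two-line text file named
    partition.metadata in the partition's log directory. A sketch of its
    contents (the topic ID shown is a made-up example):

        version: 0
        topic_id: lmBmhIYjTPejqHz4q6hVFA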
    
    Reviewers: David Jacot <dja...@confluent.io>, dengziming <dengziming1...@gmail.com>,
    Nikhil Bhatia <rite2nik...@gmail.com>, Rajini Sivaram <rajinisiva...@googlemail.com>
---
 checkstyle/suppressions.xml                        |   2 +-
 .../kafka/common/requests/LeaderAndIsrRequest.java |  57 ++++++--
 .../common/requests/LeaderAndIsrResponse.java      |  58 +++++++--
 .../common/message/LeaderAndIsrRequest.json        |  12 +-
 .../common/message/LeaderAndIsrResponse.json       |  24 +++-
 .../common/requests/LeaderAndIsrRequestTest.java   |  38 +++++-
 .../common/requests/LeaderAndIsrResponseTest.java  | 114 ++++++++++++----
 .../kafka/common/requests/RequestResponseTest.java |  60 ++++++---
 core/src/main/scala/kafka/api/ApiVersion.scala     |   2 +-
 .../controller/ControllerChannelManager.scala      |  10 +-
 .../scala/kafka/controller/KafkaController.scala   |   7 +-
 core/src/main/scala/kafka/log/Log.scala            |  21 ++-
 .../scala/kafka/server/PartitionMetadataFile.scala | 144 +++++++++++++++++++++
 .../main/scala/kafka/server/ReplicaManager.scala   |  63 +++++++--
 core/src/main/scala/kafka/zk/KafkaZkClient.scala   |   2 +-
 .../kafka/api/AuthorizerIntegrationTest.scala      |   9 +-
 .../controller/ControllerChannelManagerTest.scala  |  46 +++++--
 .../test/scala/unit/kafka/log/LogManagerTest.scala |   8 +-
 core/src/test/scala/unit/kafka/log/LogTest.scala   |  46 ++++++-
 .../kafka/server/BrokerEpochIntegrationTest.scala  |   5 +-
 .../scala/unit/kafka/server/KafkaApisTest.scala    |   5 +-
 .../unit/kafka/server/LeaderElectionTest.scala     |   7 +-
 .../unit/kafka/server/ReplicaManagerTest.scala     | 136 ++++++++++++++++++-
 .../scala/unit/kafka/server/RequestQuotaTest.scala |   2 +
 .../unit/kafka/server/ServerShutdownTest.scala     |   5 +-
 .../apache/kafka/message/MessageDataGenerator.java |   1 -
 26 files changed, 760 insertions(+), 124 deletions(-)

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index b44e713..8539034 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -100,7 +100,7 @@
               files="RequestResponseTest.java|FetcherTest.java|KafkaAdminClientTest.java"/>
 
     <suppress checks="NPathComplexity"
-              files="MemoryRecordsTest|MetricsTest|TestSslUtils|AclAuthorizerBenchmark"/>
+              files="MemoryRecordsTest|MetricsTest|RequestResponseTest|TestSslUtils|AclAuthorizerBenchmark"/>
 
     <suppress checks="(WhitespaceAround|LocalVariableName|ImportControl|AvoidStarImport)"
               files="Murmur3Test.java"/>
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
index 833e025..939212a 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
@@ -17,11 +17,13 @@
 package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.Node;
+import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.message.LeaderAndIsrRequestData;
 import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrLiveLeader;
 import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrTopicState;
 import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState;
 import org.apache.kafka.common.message.LeaderAndIsrResponseData;
+import org.apache.kafka.common.message.LeaderAndIsrResponseData.LeaderAndIsrTopicError;
 import org.apache.kafka.common.message.LeaderAndIsrResponseData.LeaderAndIsrPartitionError;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.ByteBufferAccessor;
@@ -43,12 +45,15 @@ public class LeaderAndIsrRequest extends AbstractControlRequest {
     public static class Builder extends AbstractControlRequest.Builder<LeaderAndIsrRequest> {
 
         private final List<LeaderAndIsrPartitionState> partitionStates;
+        private final Map<String, Uuid> topicIds;
         private final Collection<Node> liveLeaders;
 
         public Builder(short version, int controllerId, int controllerEpoch, long brokerEpoch,
-                       List<LeaderAndIsrPartitionState> partitionStates, Collection<Node> liveLeaders) {
+                       List<LeaderAndIsrPartitionState> partitionStates, Map<String, Uuid> topicIds,
+                       Collection<Node> liveLeaders) {
             super(ApiKeys.LEADER_AND_ISR, version, controllerId, controllerEpoch, brokerEpoch);
             this.partitionStates = partitionStates;
+            this.topicIds = topicIds;
             this.liveLeaders = liveLeaders;
         }
 
@@ -67,7 +72,7 @@ public class LeaderAndIsrRequest extends AbstractControlRequest {
                 .setLiveLeaders(leaders);
 
             if (version >= 2) {
-                Map<String, LeaderAndIsrTopicState> topicStatesMap = groupByTopic(partitionStates);
+                Map<String, LeaderAndIsrTopicState> topicStatesMap = groupByTopic(partitionStates, topicIds);
                 data.setTopicStates(new ArrayList<>(topicStatesMap.values()));
             } else {
                 data.setUngroupedPartitionStates(partitionStates);
@@ -76,13 +81,14 @@ public class LeaderAndIsrRequest extends AbstractControlRequest {
             return new LeaderAndIsrRequest(data, version);
         }
 
-        private static Map<String, LeaderAndIsrTopicState> groupByTopic(List<LeaderAndIsrPartitionState> partitionStates) {
+        private static Map<String, LeaderAndIsrTopicState> groupByTopic(List<LeaderAndIsrPartitionState> partitionStates, Map<String, Uuid> topicIds) {
             Map<String, LeaderAndIsrTopicState> topicStates = new HashMap<>();
             // We don't null out the topic name in LeaderAndIsrRequestPartition since it's ignored by
             // the generated code if version >= 2
             for (LeaderAndIsrPartitionState partition : partitionStates) {
-                LeaderAndIsrTopicState topicState = topicStates.computeIfAbsent(partition.topicName(),
-                    t -> new LeaderAndIsrTopicState().setTopicName(partition.topicName()));
+                LeaderAndIsrTopicState topicState = topicStates.computeIfAbsent(partition.topicName(), t -> new LeaderAndIsrTopicState()
+                                .setTopicName(partition.topicName())
+                                .setTopicId(topicIds.getOrDefault(partition.topicName(), Uuid.ZERO_UUID)));
                 topicState.partitionStates().add(partition);
             }
             return topicStates;
@@ -96,6 +102,7 @@ public class LeaderAndIsrRequest extends AbstractControlRequest {
                 .append(", controllerEpoch=").append(controllerEpoch)
                 .append(", brokerEpoch=").append(brokerEpoch)
                 .append(", partitionStates=").append(partitionStates)
+                .append(", topicIds=").append(topicIds)
                 .append(", liveLeaders=(").append(Utils.join(liveLeaders, ", 
")).append(")")
                 .append(")");
             return bld.toString();
@@ -129,15 +136,34 @@ public class LeaderAndIsrRequest extends AbstractControlRequest {
         Errors error = Errors.forException(e);
         responseData.setErrorCode(error.code());
 
-        List<LeaderAndIsrPartitionError> partitions = new ArrayList<>();
-        for (LeaderAndIsrPartitionState partition : partitionStates()) {
-            partitions.add(new LeaderAndIsrPartitionError()
-                .setTopicName(partition.topicName())
-                .setPartitionIndex(partition.partitionIndex())
-                .setErrorCode(error.code()));
+        if (version() < 5) {
+            List<LeaderAndIsrPartitionError> partitions = new ArrayList<>();
+            for (LeaderAndIsrPartitionState partition : partitionStates()) {
+                partitions.add(new LeaderAndIsrPartitionError()
+                        .setTopicName(partition.topicName())
+                        .setPartitionIndex(partition.partitionIndex())
+                        .setErrorCode(error.code()));
+            }
+            responseData.setPartitionErrors(partitions);
+            return new LeaderAndIsrResponse(responseData, version());
+        }
+
+        List<LeaderAndIsrTopicError> topics = new ArrayList<>(data.topicStates().size());
+        Map<String, Uuid> topicIds = topicIds();
+        for (LeaderAndIsrTopicState topicState : data.topicStates()) {
+            LeaderAndIsrTopicError topicError = new LeaderAndIsrTopicError();
+            topicError.setTopicId(topicIds.get(topicState.topicName()));
+            List<LeaderAndIsrPartitionError> partitions = new ArrayList<>(topicState.partitionStates().size());
+            for (LeaderAndIsrPartitionState partition : topicState.partitionStates()) {
+                partitions.add(new LeaderAndIsrPartitionError()
+                        .setPartitionIndex(partition.partitionIndex())
+                        .setErrorCode(error.code()));
+            }
+            topicError.setPartitionErrors(partitions);
+            topics.add(topicError);
         }
-        responseData.setPartitionErrors(partitions);
-        return new LeaderAndIsrResponse(responseData);
+        responseData.setTopics(topics);
+        return new LeaderAndIsrResponse(responseData, version());
     }
 
     @Override
@@ -162,6 +188,11 @@ public class LeaderAndIsrRequest extends AbstractControlRequest {
         return data.ungroupedPartitionStates();
     }
 
+    public Map<String, Uuid> topicIds() {
+        return data.topicStates().stream()
+                .collect(Collectors.toMap(LeaderAndIsrTopicState::topicName, LeaderAndIsrTopicState::topicId));
+    }
+
     public List<LeaderAndIsrLiveLeader> liveLeaders() {
         return Collections.unmodifiableList(data.liveLeaders());
     }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrResponse.java
index 974dde8..60ab3d5 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrResponse.java
@@ -16,15 +16,20 @@
  */
 package org.apache.kafka.common.requests;
 
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.message.LeaderAndIsrResponseData;
+import org.apache.kafka.common.message.LeaderAndIsrResponseData.LeaderAndIsrTopicError;
 import org.apache.kafka.common.message.LeaderAndIsrResponseData.LeaderAndIsrPartitionError;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.ByteBufferAccessor;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.utils.FlattenedIterator;
 
 import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.List;
+import java.util.HashMap;
 import java.util.Map;
 
 public class LeaderAndIsrResponse extends AbstractResponse {
@@ -36,14 +41,24 @@ public class LeaderAndIsrResponse extends AbstractResponse {
      * STALE_BROKER_EPOCH (77)
      */
     private final LeaderAndIsrResponseData data;
+    private short version;
 
-    public LeaderAndIsrResponse(LeaderAndIsrResponseData data) {
+    public LeaderAndIsrResponse(LeaderAndIsrResponseData data, short version) {
         super(ApiKeys.LEADER_AND_ISR);
         this.data = data;
+        this.version = version;
     }
 
-    public List<LeaderAndIsrPartitionError> partitions() {
-        return data.partitionErrors();
+    public List<LeaderAndIsrTopicError> topics() {
+        return this.data.topics();
+    }
+
+    public Iterable<LeaderAndIsrPartitionError> partitions() {
+        if (version < 5) {
+            return data.partitionErrors();
+        }
+        return () -> new FlattenedIterator<>(data.topics().iterator(),
+            topic -> topic.partitionErrors().iterator());
     }
 
     public Errors error() {
@@ -53,22 +68,49 @@ public class LeaderAndIsrResponse extends AbstractResponse {
     @Override
     public Map<Errors, Integer> errorCounts() {
         Errors error = error();
-        if (error != Errors.NONE)
+        if (error != Errors.NONE) {
             // Minor optimization since the top-level error applies to all partitions
-            return Collections.singletonMap(error, data.partitionErrors().size() + 1);
-        Map<Errors, Integer> errors = errorCounts(data.partitionErrors().stream().map(l -> Errors.forCode(l.errorCode())));
-        // Top level error
+            if (version < 5)
+                return Collections.singletonMap(error, data.partitionErrors().size() + 1);
+            return Collections.singletonMap(error,
+                    data.topics().stream().mapToInt(t -> t.partitionErrors().size()).sum() + 1);
+        }
+        Map<Errors, Integer> errors;
+        if (version < 5)
+            errors = errorCounts(data.partitionErrors().stream().map(l -> Errors.forCode(l.errorCode())));
+        else
+            errors = errorCounts(data.topics().stream().flatMap(t -> t.partitionErrors().stream()).map(l ->
+                Errors.forCode(l.errorCode())));
         updateErrorCounts(errors, Errors.NONE);
         return errors;
     }
 
+    public Map<TopicPartition, Errors> partitionErrors(Map<Uuid, String> topicNames) {
+        Map<TopicPartition, Errors> errors = new HashMap<>();
+        if (version < 5) {
+            data.partitionErrors().forEach(partition ->
+                    errors.put(new TopicPartition(partition.topicName(), partition.partitionIndex()),
+                            Errors.forCode(partition.errorCode())));
+        } else {
+            for (LeaderAndIsrTopicError topic : data.topics()) {
+                String topicName = topicNames.get(topic.topicId());
+                if (topicName != null) {
+                    topic.partitionErrors().forEach(partition ->
+                            errors.put(new TopicPartition(topicName, partition.partitionIndex()),
+                                    Errors.forCode(partition.errorCode())));
+                }
+            }
+        }
+        return errors;
+    }
+
     @Override
     public int throttleTimeMs() {
         return DEFAULT_THROTTLE_TIME;
     }
 
     public static LeaderAndIsrResponse parse(ByteBuffer buffer, short version) {
-        return new LeaderAndIsrResponse(new LeaderAndIsrResponseData(new ByteBufferAccessor(buffer), version));
+        return new LeaderAndIsrResponse(new LeaderAndIsrResponseData(new ByteBufferAccessor(buffer), version), version);
     }
 
     @Override
diff --git a/clients/src/main/resources/common/message/LeaderAndIsrRequest.json b/clients/src/main/resources/common/message/LeaderAndIsrRequest.json
index 8529688..129b7f7 100644
--- a/clients/src/main/resources/common/message/LeaderAndIsrRequest.json
+++ b/clients/src/main/resources/common/message/LeaderAndIsrRequest.json
@@ -21,8 +21,12 @@
   //
   // Version 2 adds broker epoch and reorganizes the partitions by topic.
   //
-  // Version 3 adds AddingReplicas and RemovingReplicas
-  "validVersions": "0-4",
+  // Version 3 adds AddingReplicas and RemovingReplicas.
+  //
+  // Version 4 is the first flexible version.
+  //
+  // Version 5 adds Topic ID and Type to the TopicStates, as described in KIP-516.
+  "validVersions": "0-5",
   "flexibleVersions": "4+",
   "fields": [
     { "name": "ControllerId", "type": "int32", "versions": "0+", "entityType": 
"brokerId",
@@ -31,6 +35,8 @@
       "about": "The current controller epoch." },
     { "name": "BrokerEpoch", "type": "int64", "versions": "2+", "ignorable": 
true, "default": "-1",
       "about": "The current broker epoch." },
+    { "name": "Type", "type": "int8", "versions": "5+",
+      "about": "The type that indicates whether all topics are included in the 
request"},
     { "name": "UngroupedPartitionStates", "type": 
"[]LeaderAndIsrPartitionState", "versions": "0-1",
       "about": "The state of each partition, in a v0 or v1 message." },
     // In v0 or v1 requests, each partition is listed alongside its topic name.
@@ -40,6 +46,8 @@
       "about": "Each topic.", "fields": [
       { "name": "TopicName", "type": "string", "versions": "2+", "entityType": 
"topicName",
         "about": "The topic name." },
+      { "name": "TopicId", "type": "uuid", "versions": "5+", "ignorable": true,
+        "about": "The unique topic ID." },
       { "name": "PartitionStates", "type": "[]LeaderAndIsrPartitionState", 
"versions": "2+",
         "about": "The state of each partition" }
     ]},
diff --git a/clients/src/main/resources/common/message/LeaderAndIsrResponse.json b/clients/src/main/resources/common/message/LeaderAndIsrResponse.json
index 10c3cd9..dc5879b 100644
--- a/clients/src/main/resources/common/message/LeaderAndIsrResponse.json
+++ b/clients/src/main/resources/common/message/LeaderAndIsrResponse.json
@@ -22,15 +22,29 @@
   // Version 2 is the same as version 1.
   //
   // Version 3 is the same as version 2.
-  "validVersions": "0-4",
+  //
+  // Version 4 is the first flexible version.
+  //
+  // Version 5 removes TopicName and replaces it with TopicId and reorganizes 
+  // the partitions by topic, as described by KIP-516.
+  "validVersions": "0-5",
   "flexibleVersions": "4+",
   "fields": [
     { "name": "ErrorCode", "type": "int16", "versions": "0+",
       "about": "The error code, or 0 if there was no error." },
-    { "name": "PartitionErrors", "type": "[]LeaderAndIsrPartitionError", 
"versions": "0+",
-      "about": "Each partition.", "fields": [
-      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": 
"topicName",
-        "about": "The topic name." },
+    { "name": "PartitionErrors", "type": "[]LeaderAndIsrPartitionError", 
"versions": "0-4",
+      "about": "Each partition in v0 to v4 message."},
+    { "name":  "Topics", "type": "[]LeaderAndIsrTopicError", "versions": "5+",
+      "about": "Each topic", "fields": [
+      { "name": "TopicId", "type": "uuid", "versions": "5+", "about": "The 
unique topic ID" },
+      { "name": "PartitionErrors", "type": "[]LeaderAndIsrPartitionError", 
"versions": "5+",
+        "about": "Each partition."}
+      ]}
+    ],
+    "commonStructs": [
+    { "name": "LeaderAndIsrPartitionError", "versions": "0+", "fields": [
+      { "name": "TopicName", "type": "string", "versions": "0-4", 
"entityType": "topicName", "ignorable": true,
+        "about": "The topic name."},
       { "name": "PartitionIndex", "type": "int32", "versions": "0+",
         "about": "The partition index." },
       { "name": "ErrorCode", "type": "int16", "versions": "0+",
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/LeaderAndIsrRequestTest.java b/clients/src/test/java/org/apache/kafka/common/requests/LeaderAndIsrRequestTest.java
index 939514e..c45682f 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/LeaderAndIsrRequestTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/LeaderAndIsrRequestTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.errors.ClusterAuthorizationException;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.message.LeaderAndIsrRequestData;
@@ -31,8 +32,10 @@ import org.junit.Test;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
@@ -50,19 +53,25 @@ public class LeaderAndIsrRequestTest {
     public void testUnsupportedVersion() {
         LeaderAndIsrRequest.Builder builder = new LeaderAndIsrRequest.Builder(
                 (short) (LEADER_AND_ISR.latestVersion() + 1), 0, 0, 0,
-                Collections.emptyList(), Collections.emptySet());
+                Collections.emptyList(), Collections.emptyMap(), Collections.emptySet());
         assertThrows(UnsupportedVersionException.class, builder::build);
     }
 
     @Test
     public void testGetErrorResponse() {
+        Uuid id = Uuid.randomUuid();
        for (short version = LEADER_AND_ISR.oldestVersion(); version < LEADER_AND_ISR.latestVersion(); version++) {
            LeaderAndIsrRequest.Builder builder = new LeaderAndIsrRequest.Builder(version, 0, 0, 0,
-                    Collections.emptyList(), Collections.emptySet());
+                    Collections.emptyList(), Collections.singletonMap("topic", id), Collections.emptySet());
             LeaderAndIsrRequest request = builder.build();
             LeaderAndIsrResponse response = request.getErrorResponse(0,
                     new ClusterAuthorizationException("Not authorized"));
            assertEquals(Errors.CLUSTER_AUTHORIZATION_FAILED, response.error());
+            if (version < 5) {
+                assertEquals(0, response.topics().size());
+            } else {
+                assertEquals(id, response.topics().get(0).topicId());
+            }
         }
     }
 
@@ -115,8 +124,13 @@ public class LeaderAndIsrRequestTest {
                 new Node(0, "host0", 9090),
                 new Node(1, "host1", 9091)
             );
+
+            Map<String, Uuid> topicIds = new HashMap<>();
+            topicIds.put("topic0", Uuid.randomUuid());
+            topicIds.put("topic1", Uuid.randomUuid());
+
            LeaderAndIsrRequest request = new LeaderAndIsrRequest.Builder(version, 1, 2, 3, partitionStates,
-                liveNodes).build();
+                topicIds, liveNodes).build();
 
            List<LeaderAndIsrLiveLeader> liveLeaders = liveNodes.stream().map(n -> new LeaderAndIsrLiveLeader()
                 .setBrokerId(n.id())
@@ -140,7 +154,21 @@ public class LeaderAndIsrRequestTest {
                     .setRemovingReplicas(emptyList());
             }
 
+            // Prior to version 2, there were no TopicStates, so a map of Topic Ids from a list of
+            // TopicStates is an empty map.
+            if (version < 2) {
+                topicIds = new HashMap<>();
+            }
+
+            //  In versions 2-4 there are TopicStates, but no topicIds, so deserialized requests will have
+            //  Zero Uuids in place.
+            if (version > 1 && version < 5) {
+                topicIds.put("topic0", Uuid.ZERO_UUID);
+                topicIds.put("topic1", Uuid.ZERO_UUID);
+            }
+
            assertEquals(new HashSet<>(partitionStates), iterableToSet(deserializedRequest.partitionStates()));
+            assertEquals(topicIds, deserializedRequest.topicIds());
             assertEquals(liveLeaders, deserializedRequest.liveLeaders());
             assertEquals(1, request.controllerId());
             assertEquals(2, request.controllerEpoch());
@@ -152,13 +180,15 @@ public class LeaderAndIsrRequestTest {
     public void testTopicPartitionGroupingSizeReduction() {
        Set<TopicPartition> tps = TestUtils.generateRandomTopicPartitions(10, 10);
         List<LeaderAndIsrPartitionState> partitionStates = new ArrayList<>();
+        Map<String, Uuid> topicIds = new HashMap<>();
         for (TopicPartition tp : tps) {
             partitionStates.add(new LeaderAndIsrPartitionState()
                 .setTopicName(tp.topic())
                 .setPartitionIndex(tp.partition()));
+            topicIds.put(tp.topic(), Uuid.randomUuid());
         }
        LeaderAndIsrRequest.Builder builder = new LeaderAndIsrRequest.Builder((short) 2, 0, 0, 0,
-            partitionStates, Collections.emptySet());
+            partitionStates, topicIds, Collections.emptySet());
 
         LeaderAndIsrRequest v2 = builder.build((short) 2);
         LeaderAndIsrRequest v1 = builder.build((short) 1);
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/LeaderAndIsrResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/LeaderAndIsrResponseTest.java
index fbd7d48..9940e55 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/LeaderAndIsrResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/LeaderAndIsrResponseTest.java
@@ -16,8 +16,10 @@
  */
 package org.apache.kafka.common.requests;
 
+import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState;
 import org.apache.kafka.common.message.LeaderAndIsrResponseData;
+import org.apache.kafka.common.message.LeaderAndIsrResponseData.LeaderAndIsrTopicError;
 import org.apache.kafka.common.message.LeaderAndIsrResponseData.LeaderAndIsrPartitionError;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
@@ -29,6 +31,7 @@ import java.util.List;
 import java.util.Map;
 
 import static java.util.Arrays.asList;
+import static org.apache.kafka.common.protocol.ApiKeys.LEADER_AND_ISR;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
@@ -57,46 +60,87 @@ public class LeaderAndIsrResponseTest {
             .setZkVersion(20)
             .setReplicas(Collections.singletonList(10))
             .setIsNew(false));
+        Map<String, Uuid> topicIds = Collections.singletonMap("foo", 
Uuid.randomUuid());
+
        LeaderAndIsrRequest request = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion(),
-                15, 20, 0, partitionStates, Collections.emptySet()).build();
+                15, 20, 0, partitionStates, topicIds, Collections.emptySet()).build();
        LeaderAndIsrResponse response = request.getErrorResponse(0, Errors.CLUSTER_AUTHORIZATION_FAILED.exception());
        assertEquals(Collections.singletonMap(Errors.CLUSTER_AUTHORIZATION_FAILED, 3), response.errorCounts());
     }
 
     @Test
     public void testErrorCountsWithTopLevelError() {
-        List<LeaderAndIsrPartitionError> partitions = createPartitions("foo",
-            asList(Errors.NONE, Errors.NOT_LEADER_OR_FOLLOWER));
-        LeaderAndIsrResponse response = new LeaderAndIsrResponse(new 
LeaderAndIsrResponseData()
-            .setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code())
-            .setPartitionErrors(partitions));
-        assertEquals(Collections.singletonMap(Errors.UNKNOWN_SERVER_ERROR, 3), 
response.errorCounts());
+        for (short version = LEADER_AND_ISR.oldestVersion(); version < 
LEADER_AND_ISR.latestVersion(); version++) {
+            LeaderAndIsrResponse response;
+            if (version < 5) {
+                List<LeaderAndIsrPartitionError> partitions = 
createPartitions("foo",
+                        asList(Errors.NONE, Errors.NOT_LEADER_OR_FOLLOWER));
+                response = new LeaderAndIsrResponse(new 
LeaderAndIsrResponseData()
+                        .setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code())
+                        .setPartitionErrors(partitions), version);
+            } else {
+                Uuid id = Uuid.randomUuid();
+                List<LeaderAndIsrTopicError> topics = createTopic(id, 
asList(Errors.NONE, Errors.NOT_LEADER_OR_FOLLOWER));
+                response = new LeaderAndIsrResponse(new 
LeaderAndIsrResponseData()
+                        .setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code())
+                        .setTopics(topics), version); 
+            }
+            assertEquals(Collections.singletonMap(Errors.UNKNOWN_SERVER_ERROR, 
3), response.errorCounts());
+        }
     }
 
     @Test
     public void testErrorCountsNoTopLevelError() {
-        List<LeaderAndIsrPartitionError> partitions = createPartitions("foo",
-            asList(Errors.NONE, Errors.CLUSTER_AUTHORIZATION_FAILED));
-        LeaderAndIsrResponse response = new LeaderAndIsrResponse(new 
LeaderAndIsrResponseData()
-            .setErrorCode(Errors.NONE.code())
-            .setPartitionErrors(partitions));
-        Map<Errors, Integer> errorCounts = response.errorCounts();
-        assertEquals(2, errorCounts.size());
-        assertEquals(2, errorCounts.get(Errors.NONE).intValue());
-        assertEquals(1, 
errorCounts.get(Errors.CLUSTER_AUTHORIZATION_FAILED).intValue());
+        for (short version = LEADER_AND_ISR.oldestVersion(); version < 
LEADER_AND_ISR.latestVersion(); version++) {
+            LeaderAndIsrResponse response;
+            if (version < 5) {
+                List<LeaderAndIsrPartitionError> partitions = 
createPartitions("foo",
+                        asList(Errors.NONE, 
Errors.CLUSTER_AUTHORIZATION_FAILED));
+                response = new LeaderAndIsrResponse(new 
LeaderAndIsrResponseData()
+                        .setErrorCode(Errors.NONE.code())
+                        .setPartitionErrors(partitions), version);
+            } else {
+                Uuid id = Uuid.randomUuid();
+                List<LeaderAndIsrTopicError> topics = createTopic(id, 
asList(Errors.NONE, Errors.CLUSTER_AUTHORIZATION_FAILED));
+                response = new LeaderAndIsrResponse(new 
LeaderAndIsrResponseData()
+                        .setErrorCode(Errors.NONE.code())
+                        .setTopics(topics), version);
+            }
+            Map<Errors, Integer> errorCounts = response.errorCounts();
+            assertEquals(2, errorCounts.size());
+            assertEquals(2, errorCounts.get(Errors.NONE).intValue());
+            assertEquals(1, 
errorCounts.get(Errors.CLUSTER_AUTHORIZATION_FAILED).intValue());
+        }
     }
 
     @Test
     public void testToString() {
-        List<LeaderAndIsrPartitionError> partitions = createPartitions("foo",
-            asList(Errors.NONE, Errors.CLUSTER_AUTHORIZATION_FAILED));
-        LeaderAndIsrResponse response = new LeaderAndIsrResponse(new 
LeaderAndIsrResponseData()
-            .setErrorCode(Errors.NONE.code())
-            .setPartitionErrors(partitions));
-        String responseStr = response.toString();
-        
assertTrue(responseStr.contains(LeaderAndIsrResponse.class.getSimpleName()));
-        assertTrue(responseStr.contains(partitions.toString()));
-        assertTrue(responseStr.contains("errorCode=" + Errors.NONE.code()));
+        for (short version = LEADER_AND_ISR.oldestVersion(); version < 
LEADER_AND_ISR.latestVersion(); version++) {
+            LeaderAndIsrResponse response;
+            if (version < 5) {
+                List<LeaderAndIsrPartitionError> partitions = 
createPartitions("foo",
+                        asList(Errors.NONE, 
Errors.CLUSTER_AUTHORIZATION_FAILED));
+                response = new LeaderAndIsrResponse(new 
LeaderAndIsrResponseData()
+                        .setErrorCode(Errors.NONE.code())
+                        .setPartitionErrors(partitions), version);
+                String responseStr = response.toString();
+                
assertTrue(responseStr.contains(LeaderAndIsrResponse.class.getSimpleName()));
+                assertTrue(responseStr.contains(partitions.toString()));
+                assertTrue(responseStr.contains("errorCode=" + 
Errors.NONE.code()));
+
+            } else {
+                Uuid id = Uuid.randomUuid();
+                List<LeaderAndIsrTopicError> topics = createTopic(id, 
asList(Errors.NONE, Errors.CLUSTER_AUTHORIZATION_FAILED));
+                response = new LeaderAndIsrResponse(new 
LeaderAndIsrResponseData()
+                        .setErrorCode(Errors.NONE.code())
+                        .setTopics(topics), version);
+                String responseStr = response.toString();
+                
assertTrue(responseStr.contains(LeaderAndIsrResponse.class.getSimpleName()));
+                assertTrue(responseStr.contains(topics.toString()));
+                assertTrue(responseStr.contains(id.toString()));
+                assertTrue(responseStr.contains("errorCode=" + 
Errors.NONE.code()));
+            }
+        }
     }
 
    private List<LeaderAndIsrPartitionError> createPartitions(String topicName, List<Errors> errors) {
@@ -104,11 +148,27 @@ public class LeaderAndIsrResponseTest {
         int partitionIndex = 0;
         for (Errors error : errors) {
             partitions.add(new LeaderAndIsrPartitionError()
-                .setTopicName(topicName)
+                    .setTopicName(topicName)
+                    .setPartitionIndex(partitionIndex++)
+                    .setErrorCode(error.code()));
+        }
+        return partitions;
+    }
+
+    private List<LeaderAndIsrTopicError> createTopic(Uuid id, List<Errors> errors) {
+        List<LeaderAndIsrTopicError> topics = new ArrayList<>();
+        LeaderAndIsrTopicError topic = new LeaderAndIsrTopicError();
+        topic.setTopicId(id);
+        List<LeaderAndIsrPartitionError> partitions = new ArrayList<>();
+        int partitionIndex = 0;
+        for (Errors error : errors) {
+            partitions.add(new LeaderAndIsrPartitionError()
                 .setPartitionIndex(partitionIndex++)
                 .setErrorCode(error.code()));
         }
-        return partitions;
+        topic.setPartitionErrors(partitions);
+        topics.add(topic);
+        return topics;
     }
 
 }
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 8f9a4dc..fdf541c 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.ElectionType;
 import org.apache.kafka.common.IsolationLevel;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.acl.AccessControlEntry;
 import org.apache.kafka.common.acl.AccessControlEntryFilter;
 import org.apache.kafka.common.acl.AclBinding;
@@ -320,13 +321,12 @@ public class RequestResponseTest {
             checkResponse(createStopReplicaResponse(), v, true);
         }
 
-        checkRequest(createLeaderAndIsrRequest(0), true);
-        checkErrorResponse(createLeaderAndIsrRequest(0), unknownServerException, false);
-        checkRequest(createLeaderAndIsrRequest(1), true);
-        checkErrorResponse(createLeaderAndIsrRequest(1), unknownServerException, false);
-        checkRequest(createLeaderAndIsrRequest(2), true);
-        checkErrorResponse(createLeaderAndIsrRequest(2), unknownServerException, false);
-        checkResponse(createLeaderAndIsrResponse(), 0, true);
+        for (int v = ApiKeys.LEADER_AND_ISR.oldestVersion(); v <= ApiKeys.LEADER_AND_ISR.latestVersion(); v++) {
+            checkRequest(createLeaderAndIsrRequest(v), true);
+            checkErrorResponse(createLeaderAndIsrRequest(v), unknownServerException, false);
+            checkResponse(createLeaderAndIsrResponse(v), v, true);
+        }
+
         checkRequest(createSaslHandshakeRequest(), true);
         checkErrorResponse(createSaslHandshakeRequest(), unknownServerException, true);
         checkResponse(createSaslHandshakeResponse(), 0, true);
@@ -1550,18 +1550,37 @@ public class RequestResponseTest {
                 new Node(0, "test0", 1223),
                 new Node(1, "test1", 1223)
         );
-        return new LeaderAndIsrRequest.Builder((short) version, 1, 10, 0, partitionStates, leaders).build();
+
+        Map<String, Uuid> topicIds = new HashMap<>();
+        topicIds.put("topic5", Uuid.randomUuid());
+        topicIds.put("topic20", Uuid.randomUuid());
+
+        return new LeaderAndIsrRequest.Builder((short) version, 1, 10, 0,
+                partitionStates, topicIds, leaders).build();
     }
 
-    private LeaderAndIsrResponse createLeaderAndIsrResponse() {
-        List<LeaderAndIsrResponseData.LeaderAndIsrPartitionError> partitions = new ArrayList<>();
-        partitions.add(new LeaderAndIsrResponseData.LeaderAndIsrPartitionError()
-            .setTopicName("test")
-            .setPartitionIndex(0)
-            .setErrorCode(Errors.NONE.code()));
-        return new LeaderAndIsrResponse(new LeaderAndIsrResponseData()
-            .setErrorCode(Errors.NONE.code())
-            .setPartitionErrors(partitions));
+    private LeaderAndIsrResponse createLeaderAndIsrResponse(int version) {
+        if (version < 5) {
+            List<LeaderAndIsrResponseData.LeaderAndIsrPartitionError> partitions = new ArrayList<>();
+            partitions.add(new LeaderAndIsrResponseData.LeaderAndIsrPartitionError()
+                    .setTopicName("test")
+                    .setPartitionIndex(0)
+                    .setErrorCode(Errors.NONE.code()));
+            return new LeaderAndIsrResponse(new LeaderAndIsrResponseData()
+                    .setErrorCode(Errors.NONE.code())
+                    .setPartitionErrors(partitions), (short) version);
+        } else {
+            List<LeaderAndIsrResponseData.LeaderAndIsrPartitionError> partition = Collections.singletonList(
+                    new LeaderAndIsrResponseData.LeaderAndIsrPartitionError()
+                    .setPartitionIndex(0)
+                    .setErrorCode(Errors.NONE.code()));
+            List<LeaderAndIsrResponseData.LeaderAndIsrTopicError> topics = new ArrayList<>();
+            topics.add(new LeaderAndIsrResponseData.LeaderAndIsrTopicError()
+                    .setTopicId(Uuid.randomUuid())
+                    .setPartitionErrors(partition));
+            return new LeaderAndIsrResponse(new LeaderAndIsrResponseData()
+                    .setTopics(topics), (short) version);
+        }
     }
 
    private UpdateMetadataRequest createUpdateMetadataRequest(int version, String rack) {
@@ -1600,6 +1619,10 @@ public class RequestResponseTest {
                 .setReplicas(replicas)
                 .setOfflineReplicas(offlineReplicas));
 
+        Map<String, Uuid> topicIds = new HashMap<>();
+        topicIds.put("topic5", Uuid.randomUuid());
+        topicIds.put("topic20", Uuid.randomUuid());
+
         SecurityProtocol plaintext = SecurityProtocol.PLAINTEXT;
         List<UpdateMetadataEndpoint> endpoints1 = new ArrayList<>();
         endpoints1.add(new UpdateMetadataEndpoint()
@@ -2541,7 +2564,8 @@ public class RequestResponseTest {
        assertEquals(Integer.valueOf(1), createHeartBeatResponse().errorCounts().get(Errors.NONE));
        assertEquals(Integer.valueOf(1), createIncrementalAlterConfigsResponse().errorCounts().get(Errors.NONE));
        assertEquals(Integer.valueOf(1), createJoinGroupResponse(JOIN_GROUP.latestVersion()).errorCounts().get(Errors.NONE));
-        assertEquals(Integer.valueOf(2), createLeaderAndIsrResponse().errorCounts().get(Errors.NONE));
+        assertEquals(Integer.valueOf(2), createLeaderAndIsrResponse(4).errorCounts().get(Errors.NONE));
+        assertEquals(Integer.valueOf(2), createLeaderAndIsrResponse(5).errorCounts().get(Errors.NONE));
        assertEquals(Integer.valueOf(3), createLeaderEpochResponse().errorCounts().get(Errors.NONE));
        assertEquals(Integer.valueOf(1), createLeaveGroupResponse().errorCounts().get(Errors.NONE));
        assertEquals(Integer.valueOf(1), createListGroupsResponse(LIST_GROUPS.latestVersion()).errorCounts().get(Errors.NONE));
diff --git a/core/src/main/scala/kafka/api/ApiVersion.scala b/core/src/main/scala/kafka/api/ApiVersion.scala
index a019032..c859f8d 100644
--- a/core/src/main/scala/kafka/api/ApiVersion.scala
+++ b/core/src/main/scala/kafka/api/ApiVersion.scala
@@ -111,7 +111,7 @@ object ApiVersion {
     KAFKA_2_7_IV2,
    // Flexible versioning on ListOffsets, WriteTxnMarkers and OffsetsForLeaderEpoch.
    KAFKA_2_8_IV0,
-  // Add topicId to MetadataUpdateRequest
+    // Introduced topic IDs to LeaderAndIsr and UpdateMetadata requests/responses (KIP-516)
     KAFKA_2_8_IV1
   )
 
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index b716552..21a445b 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -455,7 +455,8 @@ abstract class AbstractControllerBrokerRequestBatch(config: KafkaConfig,
 
   private def sendLeaderAndIsrRequest(controllerEpoch: Int, stateChangeLog: StateChangeLogger): Unit = {
     val leaderAndIsrRequestVersion: Short =
-      if (config.interBrokerProtocolVersion >= KAFKA_2_4_IV1) 4
+      if (config.interBrokerProtocolVersion >= KAFKA_2_8_IV1) 5
+      else if (config.interBrokerProtocolVersion >= KAFKA_2_4_IV1) 4
       else if (config.interBrokerProtocolVersion >= KAFKA_2_4_IV0) 3
       else if (config.interBrokerProtocolVersion >= KAFKA_2_2_IV0) 2
       else if (config.interBrokerProtocolVersion >= KAFKA_1_0_IV0) 1
@@ -482,8 +483,13 @@ abstract class AbstractControllerBrokerRequestBatch(config: KafkaConfig,
           _.node(config.interBrokerListenerName)
         }
         val brokerEpoch = controllerContext.liveBrokerIdAndEpochs(broker)
+        val topicIds = leaderAndIsrPartitionStates.keys
+          .map(_.topic)
+          .toSet[String]
+          .map(topic => (topic, controllerContext.topicIds(topic)))
+          .toMap
        val leaderAndIsrRequestBuilder = new LeaderAndIsrRequest.Builder(leaderAndIsrRequestVersion, controllerId,
-          controllerEpoch, brokerEpoch, leaderAndIsrPartitionStates.values.toBuffer.asJava, leaders.asJava)
+          controllerEpoch, brokerEpoch, leaderAndIsrPartitionStates.values.toBuffer.asJava, topicIds.asJava, leaders.asJava)
        sendRequest(broker, leaderAndIsrRequestBuilder, (r: AbstractResponse) => {
           val leaderAndIsrResponse = r.asInstanceOf[LeaderAndIsrResponse]
           sendEvent(LeaderAndIsrResponseReceived(leaderAndIsrResponse, broker))
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index fe14d42..b382fd9 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -1378,11 +1378,10 @@ class KafkaController(val config: KafkaConfig,
     val offlineReplicas = new ArrayBuffer[TopicPartition]()
     val onlineReplicas = new ArrayBuffer[TopicPartition]()
 
-    leaderAndIsrResponse.partitions.forEach { partition =>
-      val tp = new TopicPartition(partition.topicName, partition.partitionIndex)
-      if (partition.errorCode == Errors.KAFKA_STORAGE_ERROR.code)
+    leaderAndIsrResponse.partitionErrors(controllerContext.topicNames.asJava).forEach { case (tp, error) =>
+      if (error.code() == Errors.KAFKA_STORAGE_ERROR.code)
         offlineReplicas += tp
-      else if (partition.errorCode == Errors.NONE.code)
+      else if (error.code() == Errors.NONE.code)
         onlineReplicas += tp
     }
 
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 438f234..47d6b92 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -32,7 +32,7 @@ import kafka.message.{BrokerCompressionCodec, CompressionCodec, NoCompressionCod
 import kafka.metrics.KafkaMetricsGroup
 import kafka.server.checkpoints.LeaderEpochCheckpointFile
 import kafka.server.epoch.LeaderEpochFileCache
-import kafka.server.{BrokerTopicStats, FetchDataInfo, FetchHighWatermark, FetchIsolation, FetchLogEnd, FetchTxnCommitted, LogDirFailureChannel, LogOffsetMetadata, OffsetAndEpoch}
+import kafka.server.{BrokerTopicStats, FetchDataInfo, FetchHighWatermark, FetchIsolation, FetchLogEnd, FetchTxnCommitted, LogDirFailureChannel, LogOffsetMetadata, OffsetAndEpoch, PartitionMetadataFile}
 import kafka.utils._
 import org.apache.kafka.common.errors._
 import org.apache.kafka.common.message.FetchResponseData
@@ -43,7 +43,7 @@ import org.apache.kafka.common.requests.ListOffsetsRequest
 import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.UNDEFINED_EPOCH_OFFSET
 import org.apache.kafka.common.requests.ProduceResponse.RecordError
 import org.apache.kafka.common.utils.{Time, Utils}
-import org.apache.kafka.common.{InvalidRecordException, KafkaException, TopicPartition}
+import org.apache.kafka.common.{InvalidRecordException, KafkaException, TopicPartition, Uuid}
 
 import scala.jdk.CollectionConverters._
 import scala.collection.mutable.{ArrayBuffer, ListBuffer}
@@ -296,11 +296,16 @@ class Log(@volatile private var _dir: File,
   // Visible for testing
   @volatile var leaderEpochCache: Option[LeaderEpochFileCache] = None
 
+  @volatile var partitionMetadataFile : Option[PartitionMetadataFile] = None
+
+  @volatile var topicId : Uuid = Uuid.ZERO_UUID
+
   locally {
     // create the log directory if it doesn't exist
     Files.createDirectories(dir.toPath)
 
     initializeLeaderEpochCache()
+    initializePartitionMetadata()
 
     val nextOffset = loadSegments()
 
@@ -324,6 +329,12 @@ class Log(@volatile private var _dir: File,
     // deletion.
     producerStateManager.removeStraySnapshots(segments.values().asScala.map(_.baseOffset).toSeq)
     loadProducerState(logEndOffset, reloadFromCleanShutdown = hadCleanShutdown)
+
+    // Recover topic ID if present
+    partitionMetadataFile.foreach { file =>
+      if (!file.isEmpty())
+        topicId = file.read().topicId
+    }
   }
 
   def dir: File = _dir
@@ -536,6 +547,11 @@ class Log(@volatile private var _dir: File,
 
   private def recordVersion: RecordVersion = config.messageFormatVersion.recordVersion
 
+  private def initializePartitionMetadata(): Unit = lock synchronized {
+    val partitionMetadata = PartitionMetadataFile.newFile(dir)
+    partitionMetadataFile = Some(new PartitionMetadataFile(partitionMetadata, logDirFailureChannel))
+  }
+
   private def initializeLeaderEpochCache(): Unit = lock synchronized {
     val leaderEpochFile = LeaderEpochCheckpointFile.newFile(dir)
 
@@ -1003,6 +1019,7 @@ class Log(@volatile private var _dir: File,
           // re-initialize leader epoch cache so that LeaderEpochCheckpointFile.checkpoint can correctly reference
           // the checkpoint file in renamed log directory
           initializeLeaderEpochCache()
+          initializePartitionMetadata()
         }
       }
     }
diff --git a/core/src/main/scala/kafka/server/PartitionMetadataFile.scala b/core/src/main/scala/kafka/server/PartitionMetadataFile.scala
new file mode 100644
index 0000000..1adcbc3
--- /dev/null
+++ b/core/src/main/scala/kafka/server/PartitionMetadataFile.scala
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.io.{BufferedReader, BufferedWriter, File, FileOutputStream, IOException, OutputStreamWriter}
+import java.nio.charset.StandardCharsets
+import java.nio.file.{FileAlreadyExistsException, Files, Paths}
+import java.util.regex.Pattern
+
+import kafka.utils.Logging
+import org.apache.kafka.common.Uuid
+import org.apache.kafka.common.errors.KafkaStorageException
+import org.apache.kafka.common.utils.Utils
+
+
+
+object PartitionMetadataFile {
+  private val PartitionMetadataFilename = "partition.metadata"
+  private val WhiteSpacesPattern = Pattern.compile(":\\s+")
+  private val CurrentVersion = 0
+
+  def newFile(dir: File): File = new File(dir, PartitionMetadataFilename)
+
+  object PartitionMetadataFileFormatter {
+    def toFile(data: PartitionMetadata): String = {
+      s"version: ${data.version}\ntopic_id: ${data.topicId}"
+    }
+
+  }
+
+  class PartitionMetadataReadBuffer[T](location: String,
+                                       reader: BufferedReader,
+                                       version: Int) extends Logging {
+    def read(): PartitionMetadata = {
+      def malformedLineException(line: String) =
+        new IOException(s"Malformed line in checkpoint file ($location): 
'$line'")
+
+      var line: String = null
+      var metadataTopicId: Uuid = null
+      try {
+        line = reader.readLine()
+        WhiteSpacesPattern.split(line) match {
+          case Array(_, version) =>
+            if (version.toInt == CurrentVersion) {
+              line = reader.readLine()
+              WhiteSpacesPattern.split(line) match {
+                case Array(_, topicId) => metadataTopicId = Uuid.fromString(topicId)
+                case _ => throw malformedLineException(line)
+              }
+              if (metadataTopicId.equals(Uuid.ZERO_UUID)) {
+                throw new IOException(s"Invalid topic ID in partition metadata 
file ($location)")
+              }
+              new PartitionMetadata(CurrentVersion, metadataTopicId)
+            } else {
+              throw new IOException(s"Unrecognized version of partition 
metadata file ($location): " + version)
+            }
+          case _ => throw malformedLineException(line)
+        }
+      } catch {
+        case _: NumberFormatException => throw malformedLineException(line)
+      }
+    }
+  }
+
+}
+
+class PartitionMetadata(val version: Int, val topicId: Uuid)
+
+
+class PartitionMetadataFile(val file: File,
+                            logDirFailureChannel: LogDirFailureChannel) extends Logging {
+  import kafka.server.PartitionMetadataFile.{CurrentVersion, PartitionMetadataFileFormatter, PartitionMetadataReadBuffer}
+
+  private val path = file.toPath.toAbsolutePath
+  private val tempPath = Paths.get(path.toString + ".tmp")
+  private val lock = new Object()
+  private val logDir = file.getParentFile.getParent
+
+
+  try Files.createFile(file.toPath) // create the file if it doesn't exist
+  catch { case _: FileAlreadyExistsException => }
+
+  def write(topicId: Uuid): Unit = {
+    lock synchronized {
+      try {
+        // write to temp file and then swap with the existing file
+        val fileOutputStream = new FileOutputStream(tempPath.toFile)
+        val writer = new BufferedWriter(new OutputStreamWriter(fileOutputStream, StandardCharsets.UTF_8))
+        try {
+          writer.write(PartitionMetadataFileFormatter.toFile(new PartitionMetadata(CurrentVersion, topicId)))
+          writer.flush()
+          fileOutputStream.getFD().sync()
+        } finally {
+          writer.close()
+        }
+
+        Utils.atomicMoveWithFallback(tempPath, path)
+      } catch {
+        case e: IOException =>
+          val msg = s"Error while writing to partition metadata file 
${file.getAbsolutePath}"
+          logDirFailureChannel.maybeAddOfflineLogDir(logDir, msg, e)
+          throw new KafkaStorageException(msg, e)
+      }
+    }
+  }
+
+  def read(): PartitionMetadata = {
+    lock synchronized {
+      try {
+        val reader = Files.newBufferedReader(path)
+        try {
+          val partitionBuffer = new PartitionMetadataReadBuffer(file.getAbsolutePath, reader, CurrentVersion)
+          partitionBuffer.read()
+        } finally {
+          reader.close()
+        }
+      } catch {
+        case e: IOException =>
+          val msg = s"Error while reading partition metadata file 
${file.getAbsolutePath}"
+          logDirFailureChannel.maybeAddOfflineLogDir(logDir, msg, e)
+          throw new KafkaStorageException(msg, e)
+      }
+    }
+  }
+
+  def isEmpty(): Boolean = {
+    file.length() == 0
+  }
+}
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index ea44021..a3934a5 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -36,12 +36,13 @@ import kafka.server.checkpoints.{LazyOffsetCheckpoints, OffsetCheckpointFile, Of
 import kafka.utils._
 import kafka.utils.Implicits._
 import kafka.zk.KafkaZkClient
-import org.apache.kafka.common.{ElectionType, IsolationLevel, Node, TopicPartition}
+import org.apache.kafka.common.{ElectionType, IsolationLevel, Node, TopicPartition, Uuid}
 import org.apache.kafka.common.errors._
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState
 import org.apache.kafka.common.message.DeleteRecordsResponseData.DeleteRecordsPartitionResult
 import org.apache.kafka.common.message.{DescribeLogDirsResponseData, FetchResponseData, LeaderAndIsrResponseData}
+import org.apache.kafka.common.message.LeaderAndIsrResponseData.LeaderAndIsrTopicError
 import org.apache.kafka.common.message.LeaderAndIsrResponseData.LeaderAndIsrPartitionError
 import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.{OffsetForLeaderTopic}
 import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.{OffsetForLeaderTopicResult, EpochEndOffset}
@@ -1331,6 +1332,7 @@ class ReplicaManager(val config: KafkaConfig,
             s"correlation id $correlationId from controller $controllerId " +
             s"epoch ${leaderAndIsrRequest.controllerEpoch}")
         }
+      val topicIds = leaderAndIsrRequest.topicIds()
 
       val response = {
         if (leaderAndIsrRequest.controllerEpoch < controllerEpoch) {
@@ -1437,6 +1439,24 @@ class ReplicaManager(val config: KafkaConfig,
            */
             if (localLog(topicPartition).isEmpty)
               markPartitionOffline(topicPartition)
+            else {
+              val id = topicIds.get(topicPartition.topic())
+              // Ensure we have not received a request from an older protocol
+              if (id != null && !id.equals(Uuid.ZERO_UUID)) {
+                val log = localLog(topicPartition).get
+                // Check if topic ID is in memory, if not, it must be new to the broker and does not have a metadata file.
+                // This is because if the broker previously wrote it to file, it would be recovered on restart after failure.
+                if (log.topicId.equals(Uuid.ZERO_UUID)) {
+                  log.partitionMetadataFile.get.write(id)
+                  log.topicId = id
+                  // Warn if the topic ID in the request does not match the log.
+                } else if (!log.topicId.equals(id)) {
+                  stateChangeLogger.warn(s"Topic Id in memory: ${log.topicId.toString} does not" +
+                    s" match the topic Id provided in the request: " +
+                    s"${id.toString}.")
+                }
+              }
+            }
           }
 
           // we initialize highwatermark thread after the first leaderisrrequest. This ensures that all the partitions
@@ -1448,15 +1468,38 @@ class ReplicaManager(val config: KafkaConfig,
           replicaFetcherManager.shutdownIdleFetcherThreads()
           replicaAlterLogDirsManager.shutdownIdleFetcherThreads()
           onLeadershipChange(partitionsBecomeLeader, partitionsBecomeFollower)
-          val responsePartitions = responseMap.iterator.map { case (tp, error) =>
-            new LeaderAndIsrPartitionError()
-              .setTopicName(tp.topic)
-              .setPartitionIndex(tp.partition)
-              .setErrorCode(error.code)
-          }.toBuffer
-          new LeaderAndIsrResponse(new LeaderAndIsrResponseData()
-            .setErrorCode(Errors.NONE.code)
-            .setPartitionErrors(responsePartitions.asJava))
+          if (leaderAndIsrRequest.version() < 5) {
+            val responsePartitions = responseMap.iterator.map { case (tp, error) =>
+              new LeaderAndIsrPartitionError()
+                .setTopicName(tp.topic)
+                .setPartitionIndex(tp.partition)
+                .setErrorCode(error.code)
+            }.toBuffer
+            new LeaderAndIsrResponse(new LeaderAndIsrResponseData()
+              .setErrorCode(Errors.NONE.code)
+              .setPartitionErrors(responsePartitions.asJava), leaderAndIsrRequest.version())
+          } else {
+            val topics = new mutable.HashMap[String, List[LeaderAndIsrPartitionError]]
+            responseMap.asJava.forEach { case (tp, error) =>
+              if (!topics.contains(tp.topic)) {
+                topics.put(tp.topic, List(new LeaderAndIsrPartitionError()
+                  .setPartitionIndex(tp.partition)
+                  .setErrorCode(error.code)))
+              } else {
+                topics.put(tp.topic, new LeaderAndIsrPartitionError()
+                  .setPartitionIndex(tp.partition)
+                  .setErrorCode(error.code) :: topics(tp.topic))
+              }
+            }
+            val topicErrors = topics.iterator.map { case (topic, partitionError) =>
+              new LeaderAndIsrTopicError()
+                .setTopicId(topicIds.get(topic))
+                .setPartitionErrors(partitionError.asJava)
+            }.toBuffer
+            new LeaderAndIsrResponse(new LeaderAndIsrResponseData()
+              .setErrorCode(Errors.NONE.code)
+              .setTopics(topicErrors.asJava), leaderAndIsrRequest.version())
+          }
         }
       }
       val endMs = time.milliseconds()
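
The version gate above is the heart of the response change: pre-v5
responses keep the flat partition-error list keyed by topic name, while
v5+ groups errors per topic and keys them by topic ID. A sketch of the
same bucketing using the patch's types; the standalone object and its
inputs are illustrative, not the broker's wiring.

    import org.apache.kafka.common.{TopicPartition, Uuid}
    import org.apache.kafka.common.message.LeaderAndIsrResponseData
    import org.apache.kafka.common.message.LeaderAndIsrResponseData.{LeaderAndIsrPartitionError, LeaderAndIsrTopicError}
    import org.apache.kafka.common.protocol.Errors
    import scala.jdk.CollectionConverters._

    object LeaderAndIsrResponseSketch {
      // Flat partition errors below v5; topic-grouped errors keyed by ID from v5 on.
      def buildData(version: Short,
                    responseMap: Map[TopicPartition, Errors],
                    topicIds: java.util.Map[String, Uuid]): LeaderAndIsrResponseData = {
        if (version < 5) {
          val partitionErrors = responseMap.map { case (tp, error) =>
            new LeaderAndIsrPartitionError()
              .setTopicName(tp.topic)
              .setPartitionIndex(tp.partition)
              .setErrorCode(error.code)
          }.toList
          new LeaderAndIsrResponseData()
            .setErrorCode(Errors.NONE.code)
            .setPartitionErrors(partitionErrors.asJava)
        } else {
          val topicErrors = responseMap.groupBy(_._1.topic).map { case (topic, errors) =>
            new LeaderAndIsrTopicError()
              .setTopicId(topicIds.get(topic))
              .setPartitionErrors(errors.map { case (tp, error) =>
                new LeaderAndIsrPartitionError()
                  .setPartitionIndex(tp.partition)
                  .setErrorCode(error.code)
              }.toList.asJava)
          }.toList
          new LeaderAndIsrResponseData()
            .setErrorCode(Errors.NONE.code)
            .setTopics(topicErrors.asJava)
        }
      }
    }

Here groupBy expresses the per-topic bucketing that the hunk builds by
hand with a mutable HashMap and list prepends.
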
diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
index c8122d7..7f84dab 100644
--- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala
+++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
@@ -507,7 +507,7 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
   /**
    * Sets the topic znode with the given assignment.
    * @param topic the topic whose assignment is being set.
-   * @param topicId optional topic ID if the topic has one
+   * @param topicId unique topic ID for the topic
    * @param assignment the partition to replica mapping to set for the given topic
    * @param expectedControllerEpochZkVersion expected controller epoch zkVersion.
    * @return SetDataResponse
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 11faf35..0ca1e17 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -56,7 +56,7 @@ import org.apache.kafka.common.resource.PatternType.{LITERAL, PREFIXED}
 import org.apache.kafka.common.resource.ResourceType._
 import org.apache.kafka.common.resource.{PatternType, Resource, ResourcePattern, ResourcePatternFilter, ResourceType}
 import org.apache.kafka.common.security.auth.{AuthenticationContext, KafkaPrincipal, KafkaPrincipalBuilder, SecurityProtocol}
-import org.apache.kafka.common.{ElectionType, IsolationLevel, Node, TopicPartition, requests}
+import org.apache.kafka.common.{ElectionType, IsolationLevel, Node, TopicPartition, requests, Uuid}
 import org.apache.kafka.test.{TestUtils => JTestUtils}
 import org.junit.Assert._
 import org.junit.{After, Before, Test}
@@ -99,6 +99,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 
   val brokerId: Integer = 0
   val topic = "topic"
+  val topicId = Uuid.randomUuid()
   val topicPattern = "topic.*"
   val transactionalId = "transactional.id"
   val producerId = 83392L
@@ -106,6 +107,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
   val correlationId = 0
   val clientId = "client-Id"
   val tp = new TopicPartition(topic, part)
+  val topicIds = Collections.singletonMap(topic, topicId)
+  val topicNames = Collections.singletonMap(topicId, topic)
   val logDir = "logDir"
   val group = "my-group"
   val protocolType = "consumer"
@@ -181,7 +184,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     ApiKeys.LEAVE_GROUP -> ((resp: LeaveGroupResponse) => resp.error),
     ApiKeys.DELETE_GROUPS -> ((resp: DeleteGroupsResponse) => resp.get(group)),
     ApiKeys.LEADER_AND_ISR -> ((resp: requests.LeaderAndIsrResponse) => Errors.forCode(
-      resp.partitions.asScala.find(p => p.topicName == tp.topic && p.partitionIndex == tp.partition).get.errorCode)),
+      resp.topics.asScala.find(t => topicNames.get(t.topicId) == tp.topic).get.partitionErrors.asScala.find(
+        p => p.partitionIndex == tp.partition).get.errorCode)),
     ApiKeys.STOP_REPLICA -> ((resp: requests.StopReplicaResponse) => Errors.forCode(
       resp.partitionErrors.asScala.find(pe => pe.topicName == tp.topic && pe.partitionIndex == tp.partition).get.errorCode)),
     ApiKeys.CONTROLLED_SHUTDOWN -> ((resp: requests.ControlledShutdownResponse) => resp.error),
@@ -474,6 +478,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
         .setZkVersion(2)
         .setReplicas(Seq(brokerId).asJava)
         .setIsNew(false)).asJava,
+      topicIds,
       Set(new Node(brokerId, "localhost", 0)).asJava).build()
   }
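
A sketch of the lookup the LEADER_AND_ISR entry above now performs:
v5+ responses key errors by topic ID, so the test keeps a reverse
topicNames map (ID -> name) to match by name. The helper object and
its bare .get error handling are illustrative.

    import org.apache.kafka.common.Uuid
    import org.apache.kafka.common.protocol.Errors
    import org.apache.kafka.common.requests.LeaderAndIsrResponse
    import scala.jdk.CollectionConverters._

    object FindPartitionError {
      // Resolve one partition's error from a topic-ID-keyed response.
      def errorFor(resp: LeaderAndIsrResponse, topicNames: java.util.Map[Uuid, String],
                   topic: String, partition: Int): Errors = {
        val topicError = resp.topics.asScala.find(t => topicNames.get(t.topicId) == topic).get
        val partitionError = topicError.partitionErrors.asScala.find(_.partitionIndex == partition).get
        Errors.forCode(partitionError.errorCode)
      }
    }
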
 
diff --git a/core/src/test/scala/unit/kafka/controller/ControllerChannelManagerTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerChannelManagerTest.scala
index 837ac9e..7a4d0ba 100644
--- a/core/src/test/scala/unit/kafka/controller/ControllerChannelManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/ControllerChannelManagerTest.scala
@@ -30,6 +30,7 @@ import org.apache.kafka.common.message.StopReplicaResponseData.StopReplicaPartit
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.requests.{AbstractControlRequest, AbstractResponse, LeaderAndIsrRequest, LeaderAndIsrResponse, StopReplicaRequest, StopReplicaResponse, UpdateMetadataRequest, UpdateMetadataResponse}
+import org.apache.kafka.common.message.LeaderAndIsrResponseData.LeaderAndIsrTopicError
 import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.junit.Assert._
 import org.junit.Test
@@ -72,6 +73,8 @@ class ControllerChannelManagerTest {
     assertEquals(1, updateMetadataRequests.size)
 
     val leaderAndIsrRequest = leaderAndIsrRequests.head
+    val topicIds = leaderAndIsrRequest.topicIds()
+    val topicNames = topicIds.asScala.map { case (k, v) => (v, k) }
     assertEquals(controllerId, leaderAndIsrRequest.controllerId)
     assertEquals(controllerEpoch, leaderAndIsrRequest.controllerEpoch)
     assertEquals(partitions.keySet,
@@ -87,7 +90,10 @@ class ControllerChannelManagerTest {
     val LeaderAndIsrResponseReceived(leaderAndIsrResponse, brokerId) = batch.sentEvents.head
     assertEquals(2, brokerId)
     assertEquals(partitions.keySet,
-      leaderAndIsrResponse.partitions.asScala.map(p => new TopicPartition(p.topicName, p.partitionIndex)).toSet)
+      leaderAndIsrResponse.topics.asScala.flatMap(t => t.partitionErrors.asScala.map(p =>
+        new TopicPartition(topicNames(t.topicId), p.partitionIndex))).toSet)
+    leaderAndIsrResponse.topics.forEach(topic =>
+      assertEquals(topicIds.get(topicNames.get(topic.topicId).get), topic.topicId))
   }
 
   @Test
@@ -157,7 +163,8 @@ class ControllerChannelManagerTest {
 
     for (apiVersion <- ApiVersion.allVersions) {
       val leaderAndIsrRequestVersion: Short =
-        if (apiVersion >= KAFKA_2_4_IV1) 4
+        if (apiVersion >= KAFKA_2_8_IV1) 5
+        else if (apiVersion >= KAFKA_2_4_IV1) 4
         else if (apiVersion >= KAFKA_2_4_IV0) 3
         else if (apiVersion >= KAFKA_2_2_IV0) 2
         else if (apiVersion >= KAFKA_1_0_IV0) 1
@@ -187,6 +194,21 @@ class ControllerChannelManagerTest {
     assertEquals(1, leaderAndIsrRequests.size)
     assertEquals(s"IBP $interBrokerProtocolVersion should use version 
$expectedLeaderAndIsrVersion",
       expectedLeaderAndIsrVersion, leaderAndIsrRequests.head.version)
+    
+    val request = leaderAndIsrRequests.head
+    val byteBuffer = request.serialize
+    val deserializedRequest = LeaderAndIsrRequest.parse(byteBuffer, 
expectedLeaderAndIsrVersion)
+    
+    if (interBrokerProtocolVersion >= KAFKA_2_8_IV1) {
+      assertTrue(!request.topicIds().get("foo").equals(Uuid.ZERO_UUID))
+      
assertTrue(!deserializedRequest.topicIds().get("foo").equals(Uuid.ZERO_UUID))
+    } else if (interBrokerProtocolVersion >= KAFKA_2_2_IV0) {
+      assertTrue(!request.topicIds().get("foo").equals(Uuid.ZERO_UUID))
+      
assertTrue(deserializedRequest.topicIds().get("foo").equals(Uuid.ZERO_UUID))
+    } else {
+      assertTrue(request.topicIds().get("foo") == null)
+      assertTrue(deserializedRequest.topicIds().get("foo") == null)
+    }
   }
 
   @Test
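
Per the assertions above, the builder retains whatever topic IDs it was
given, but only a v5+ serialization carries them on the wire: re-parsing
a v2-v4 request yields Uuid.ZERO_UUID, and v0-v1 predate the map
entirely. A one-method sketch of that round trip; the object name is
illustrative.

    import org.apache.kafka.common.requests.LeaderAndIsrRequest

    object TopicIdRoundTrip {
      // Serialize and re-parse at a given version; topic IDs survive only at v5+.
      def roundTrip(request: LeaderAndIsrRequest, version: Short): LeaderAndIsrRequest =
        LeaderAndIsrRequest.parse(request.serialize, version)
    }
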
@@ -827,15 +849,18 @@ class ControllerChannelManagerTest {
   private def applyLeaderAndIsrResponseCallbacks(error: Errors, sentRequests: List[SentRequest]): Unit = {
     sentRequests.filter(_.request.apiKey == ApiKeys.LEADER_AND_ISR).filter(_.responseCallback != null).foreach { sentRequest =>
       val leaderAndIsrRequest = sentRequest.request.build().asInstanceOf[LeaderAndIsrRequest]
-      val partitionErrors = leaderAndIsrRequest.partitionStates.asScala.map(p =>
-        new LeaderAndIsrPartitionError()
-          .setTopicName(p.topicName)
-          .setPartitionIndex(p.partitionIndex)
-          .setErrorCode(error.code))
+      val topicIds = leaderAndIsrRequest.topicIds
+      val topicErrors = leaderAndIsrRequest.data.topicStates.asScala.map(t =>
+        new LeaderAndIsrTopicError()
+          .setTopicId(topicIds.get(t.topicName))
+          .setPartitionErrors(t.partitionStates.asScala.map(p =>
+            new LeaderAndIsrPartitionError()
+              .setPartitionIndex(p.partitionIndex)
+              .setErrorCode(error.code)).asJava))
       val leaderAndIsrResponse = new LeaderAndIsrResponse(
         new LeaderAndIsrResponseData()
           .setErrorCode(error.code)
-          .setPartitionErrors(partitionErrors.toBuffer.asJava))
+          .setTopics(topicErrors.toBuffer.asJava), leaderAndIsrRequest.version())
       sentRequest.responseCallback(leaderAndIsrResponse)
     }
   }
@@ -871,6 +896,11 @@ class ControllerChannelManagerTest {
     }.toMap
 
     context.setLiveBrokers(brokerEpochs)
+    context.setAllTopics(topics)
+
+    for (topic <- topics) {
+      context.addTopicId(topic, Uuid.randomUuid())
+    }
 
     // Simple round-robin replica assignment
     var leaderIndex = 0
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index a13bedc..031000d 100755
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -228,8 +228,8 @@ class LogManagerTest {
       s.lazyTimeIndex.get
     })
 
-    // there should be a log file, two indexes, one producer snapshot, and the leader epoch checkpoint
-    assertEquals("Files should have been deleted", log.numberOfSegments * 4 + 1, log.dir.list.length)
+    // there should be a log file, two indexes, one producer snapshot, partition metadata, and the leader epoch checkpoint
+    assertEquals("Files should have been deleted", log.numberOfSegments * 4 + 2, log.dir.list.length)
     assertEquals("Should get empty fetch off new log.", 0, readLog(log, offset + 1).records.sizeInBytes)
 
     try {
@@ -278,8 +278,8 @@ class LogManagerTest {
     time.sleep(log.config.fileDeleteDelayMs + 1)
 
     // there should be a log file, two indexes (the txn index is created lazily),
-    // and a producer snapshot file per segment, and the leader epoch checkpoint.
-    assertEquals("Files should have been deleted", log.numberOfSegments * 4 + 1, log.dir.list.length)
+    // and a producer snapshot file per segment, and the leader epoch checkpoint and partition metadata file.
+    assertEquals("Files should have been deleted", log.numberOfSegments * 4 + 2, log.dir.list.length)
     assertEquals("Should get empty fetch off new log.", 0, readLog(log, offset + 1).records.sizeInBytes)
     try {
       readLog(log, 0)
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index ce52c6b..b107c21 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -30,9 +30,9 @@ import kafka.log.Log.DeleteDirSuffix
 import kafka.metrics.KafkaYammerMetrics
 import kafka.server.checkpoints.LeaderEpochCheckpointFile
 import kafka.server.epoch.{EpochEntry, LeaderEpochFileCache}
-import kafka.server.{BrokerState, BrokerTopicStats, FetchDataInfo, FetchHighWatermark, FetchIsolation, FetchLogEnd, FetchTxnCommitted, KafkaConfig, LogDirFailureChannel, LogOffsetMetadata}
+import kafka.server.{BrokerState, BrokerTopicStats, FetchDataInfo, FetchHighWatermark, FetchIsolation, FetchLogEnd, FetchTxnCommitted, KafkaConfig, LogDirFailureChannel, LogOffsetMetadata, PartitionMetadataFile}
 import kafka.utils._
-import org.apache.kafka.common.{InvalidRecordException, KafkaException, TopicPartition}
+import org.apache.kafka.common.{InvalidRecordException, KafkaException, TopicPartition, Uuid}
 import org.apache.kafka.common.errors._
 import org.apache.kafka.common.record.FileRecords.TimestampAndOffset
 import org.apache.kafka.common.record.MemoryRecords.RecordFilter
@@ -2372,6 +2372,21 @@ class LogTest {
     log.close()
   }
 
+  @Test
+  def testLogRecoversTopicId(): Unit = {
+    val logConfig = LogTest.createLogConfig()
+    var log = createLog(logDir, logConfig)
+
+    val topicId = Uuid.randomUuid()
+    log.partitionMetadataFile.get.write(topicId)
+    log.close()
+
+    // test recovery case
+    log = createLog(logDir, logConfig)
+    assertEquals(topicId, log.topicId)
+    log.close()
+  }
+
   /**
    * Test building the time index on the follower by setting assignOffsets to false.
    */
@@ -2907,6 +2922,33 @@ class LogTest {
   }
 
   @Test
+  def testTopicIdTransfersAfterDirectoryRename(): Unit = {
+    val logConfig = LogTest.createLogConfig(segmentBytes = 1000, indexIntervalBytes = 1, maxMessageBytes = 64 * 1024)
+    val log = createLog(logDir, logConfig)
+
+    // Write a topic ID to the partition metadata file to ensure it is transferred correctly.
+    val id = Uuid.randomUuid()
+    log.topicId = id
+    log.partitionMetadataFile.get.write(id)
+
+    log.appendAsLeader(TestUtils.records(List(new SimpleRecord("foo".getBytes()))), leaderEpoch = 5)
+    assertEquals(Some(5), log.latestEpoch)
+
+    // Ensure that after a directory rename, the partition metadata file is written to the right location.
+    val tp = Log.parseTopicPartitionName(log.dir)
+    log.renameDir(Log.logDeleteDirName(tp))
+    log.appendAsLeader(TestUtils.records(List(new SimpleRecord("foo".getBytes()))), leaderEpoch = 10)
+    assertEquals(Some(10), log.latestEpoch)
+    assertTrue(PartitionMetadataFile.newFile(log.dir).exists())
+    assertFalse(PartitionMetadataFile.newFile(this.logDir).exists())
+
+    // Check the topic ID remains in memory and was copied correctly.
+    assertEquals(id, log.topicId)
+    assertFalse(log.partitionMetadataFile.isEmpty)
+    assertEquals(id, log.partitionMetadataFile.get.read().topicId)
+  }
+
+  @Test
   def testLeaderEpochCacheClearedAfterDowngradeInAppendedMessages(): Unit = {
    val logConfig = LogTest.createLogConfig(segmentBytes = 1000, indexIntervalBytes = 1, maxMessageBytes = 64 * 1024)
     val log = createLog(logDir, logConfig)
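
The recovery path exercised by testLogRecoversTopicId above reduces to:
on log load, adopt the persisted topic ID if the partition metadata
file has contents, otherwise stay on Uuid.ZERO_UUID until the
controller assigns one via a v5+ LeaderAndIsr request. A sketch under
that assumption; loadTopicId is a hypothetical helper, not the patch's
code.

    import org.apache.kafka.common.Uuid
    import kafka.server.PartitionMetadataFile

    object TopicIdRecoverySketch {
      // Reuse the persisted topic ID when the metadata file has contents;
      // otherwise fall back to ZERO_UUID until one is assigned.
      def loadTopicId(metadataFile: PartitionMetadataFile): Uuid =
        if (metadataFile.isEmpty()) Uuid.ZERO_UUID
        else metadataFile.read().topicId
    }
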
diff --git a/core/src/test/scala/unit/kafka/server/BrokerEpochIntegrationTest.scala b/core/src/test/scala/unit/kafka/server/BrokerEpochIntegrationTest.scala
index be8766c..e733909 100755
--- a/core/src/test/scala/unit/kafka/server/BrokerEpochIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/BrokerEpochIntegrationTest.scala
@@ -25,7 +25,7 @@ import kafka.controller.{ControllerChannelManager, ControllerContext, StateChang
 import kafka.utils.TestUtils
 import kafka.utils.TestUtils.createTopic
 import kafka.zk.ZooKeeperTestHarness
-import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.{TopicPartition, Uuid}
 import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState
 import org.apache.kafka.common.message.StopReplicaRequestData.{StopReplicaPartitionState, StopReplicaTopicState}
 import org.apache.kafka.common.message.UpdateMetadataRequestData.{UpdateMetadataBroker, UpdateMetadataEndpoint, UpdateMetadataPartitionState}
@@ -112,6 +112,7 @@ class BrokerEpochIntegrationTest extends ZooKeeperTestHarness {
 
   private def testControlRequestWithBrokerEpoch(epochInRequestDiffFromCurrentEpoch: Long): Unit = {
     val tp = new TopicPartition("new-topic", 0)
+    val topicIds = Collections.singletonMap("new-topic", Uuid.randomUuid)
 
     // create topic with 1 partition, 2 replicas, one on each broker
     createTopic(zkClient, tp.topic(), partitionReplicaAssignment = Map(0 -> Seq(brokerId1, brokerId2)), servers = servers)
@@ -155,7 +156,7 @@ class BrokerEpochIntegrationTest extends ZooKeeperTestHarness {
         val requestBuilder = new LeaderAndIsrRequest.Builder(
           ApiKeys.LEADER_AND_ISR.latestVersion, controllerId, controllerEpoch,
           epochInRequest,
-          partitionStates.asJava, nodes.toSet.asJava)
+          partitionStates.asJava, topicIds, nodes.toSet.asJava)
 
         if (epochInRequestDiffFromCurrentEpoch < 0) {
           // stale broker epoch in LEADER_AND_ISR
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index e37d93b..87a870a 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -65,7 +65,7 @@ import org.apache.kafka.common.requests.{FetchMetadata => JFetchMetadata, _}
 import org.apache.kafka.common.resource.{PatternType, Resource, ResourcePattern, ResourceType}
 import org.apache.kafka.common.security.auth.{KafkaPrincipal, KafkaPrincipalSerde, SecurityProtocol}
 import org.apache.kafka.common.utils.ProducerIdAndEpoch
-import org.apache.kafka.common.{IsolationLevel, Node, TopicPartition}
+import org.apache.kafka.common.{IsolationLevel, Node, TopicPartition, Uuid}
 import org.apache.kafka.server.authorizer.{Action, AuthorizationResult, Authorizer}
 import org.easymock.EasyMock._
 import org.easymock.{Capture, EasyMock, IAnswer, IArgumentMatcher}
@@ -2681,12 +2681,13 @@ class KafkaApisTest {
       controllerEpoch,
       brokerEpochInRequest,
       partitionStates,
+      Collections.singletonMap("topicW", Uuid.randomUuid()),
       asList(new Node(0, "host0", 9090), new Node(1, "host1", 9091))
     ).build()
     val request = buildRequest(leaderAndIsrRequest)
     val response = new LeaderAndIsrResponse(new LeaderAndIsrResponseData()
       .setErrorCode(Errors.NONE.code)
-      .setPartitionErrors(asList()))
+      .setPartitionErrors(asList()), leaderAndIsrRequest.version())
 
     EasyMock.expect(controller.brokerEpoch).andStubReturn(currentBrokerEpoch)
     EasyMock.expect(replicaManager.becomeLeaderOrFollower(
diff --git a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
index a3eb5d7..fa0b940 100755
--- a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
@@ -17,7 +17,9 @@
 
 package kafka.server
 
-import org.apache.kafka.common.TopicPartition
+import java.util.Collections
+
+import org.apache.kafka.common.{TopicPartition, Uuid}
 
 import scala.jdk.CollectionConverters._
 import kafka.api.LeaderAndIsr
@@ -155,7 +157,8 @@ class LeaderElectionTest extends ZooKeeperTestHarness {
       )
       val requestBuilder = new LeaderAndIsrRequest.Builder(
         ApiKeys.LEADER_AND_ISR.latestVersion, controllerId, staleControllerEpoch,
-        servers(brokerId2).kafkaController.brokerEpoch, partitionStates.asJava, nodes.toSet.asJava)
+        servers(brokerId2).kafkaController.brokerEpoch, partitionStates.asJava,
+        Collections.singletonMap(topic, Uuid.randomUuid()), nodes.toSet.asJava)
 
       controllerChannelManager.sendRequest(brokerId2, requestBuilder, staleControllerEpochCallback)
       TestUtils.waitUntilTrue(() => staleControllerEpochDetected, "Controller epoch should be stale")
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index f73223e..80fdc60 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -21,7 +21,7 @@ import java.io.File
 import java.net.InetAddress
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}
 import java.util.concurrent.{CountDownLatch, TimeUnit}
-import java.util.{Optional, Properties}
+import java.util.{Collections, Optional, Properties}
 
 import kafka.api._
 import kafka.log.{AppendOrigin, Log, LogConfig, LogManager, ProducerStateManager}
@@ -50,7 +50,7 @@ import org.apache.kafka.common.requests._
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.utils.Time
 import org.apache.kafka.common.utils.Utils
-import org.apache.kafka.common.{IsolationLevel, Node, TopicPartition}
+import org.apache.kafka.common.{IsolationLevel, Node, TopicPartition, Uuid}
 import org.easymock.EasyMock
 import org.junit.Assert._
 import org.junit.{After, Before, Test}
@@ -174,6 +174,7 @@ class ReplicaManagerTest {
 
     try {
       val brokerList = Seq[Integer](0, 1).asJava
+      val topicIds = Collections.singletonMap(topic, Uuid.randomUuid())
 
       val partition = rm.createPartition(new TopicPartition(topic, 0))
       partition.createLogIfNotExists(isNew = false, isFutureReplica = false,
@@ -190,6 +191,7 @@ class ReplicaManagerTest {
           .setZkVersion(0)
           .setReplicas(brokerList)
           .setIsNew(false)).asJava,
+        topicIds,
         Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
       rm.becomeLeaderOrFollower(0, leaderAndIsrRequest1, (_, _) => ())
       rm.getPartitionOrException(new TopicPartition(topic, 0))
@@ -212,6 +214,7 @@ class ReplicaManagerTest {
           .setZkVersion(0)
           .setReplicas(brokerList)
           .setIsNew(false)).asJava,
+        topicIds,
         Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
       rm.becomeLeaderOrFollower(1, leaderAndIsrRequest2, (_, _) => ())
 
@@ -236,6 +239,7 @@ class ReplicaManagerTest {
       replicaManager.createPartition(topicPartition)
         .createLogIfNotExists(isNew = false, isFutureReplica = false,
           new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints))
+      val topicIds = Collections.singletonMap(topic, Uuid.randomUuid())
 
       def leaderAndIsrRequest(epoch: Int): LeaderAndIsrRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
         Seq(new LeaderAndIsrPartitionState()
@@ -248,6 +252,7 @@ class ReplicaManagerTest {
           .setZkVersion(0)
           .setReplicas(brokerList)
           .setIsNew(true)).asJava,
+        topicIds,
         Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
 
       replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(0), (_, _) => ())
@@ -307,6 +312,7 @@ class ReplicaManagerTest {
           .setZkVersion(0)
           .setReplicas(brokerList)
           .setIsNew(true)).asJava,
+        Collections.singletonMap(topic, Uuid.randomUuid()),
         Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
       replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest1, (_, _) => ())
       replicaManager.getPartitionOrException(new TopicPartition(topic, 0))
@@ -367,6 +373,7 @@ class ReplicaManagerTest {
           .setZkVersion(0)
           .setReplicas(brokerList)
           .setIsNew(true)).asJava,
+        Collections.singletonMap(topic, Uuid.randomUuid()),
         Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
       replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest1, (_, _) => ())
       replicaManager.getPartitionOrException(new TopicPartition(topic, 0))
@@ -473,6 +480,7 @@ class ReplicaManagerTest {
           .setZkVersion(0)
           .setReplicas(brokerList)
           .setIsNew(true)).asJava,
+        Collections.singletonMap(topic, Uuid.randomUuid()),
         Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
       replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest1, (_, _) => ())
       replicaManager.getPartitionOrException(new TopicPartition(topic, 0))
@@ -549,6 +557,7 @@ class ReplicaManagerTest {
           .setZkVersion(0)
           .setReplicas(brokerList)
           .setIsNew(false)).asJava,
+        Collections.singletonMap(topic, Uuid.randomUuid()),
       Set(new Node(0, "host1", 0), new Node(1, "host2", 1), new Node(2, "host2", 2)).asJava).build()
       rm.becomeLeaderOrFollower(0, leaderAndIsrRequest1, (_, _) => ())
       rm.getPartitionOrException(new TopicPartition(topic, 0))
@@ -605,6 +614,7 @@ class ReplicaManagerTest {
         .setIsNew(true)
       val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
         Seq(leaderAndIsrPartitionState).asJava,
+        Collections.singletonMap(topic, Uuid.randomUuid()),
         Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
       val leaderAndIsrResponse = replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest, (_, _) => ())
       assertEquals(Errors.NONE, leaderAndIsrResponse.error)
@@ -696,6 +706,7 @@ class ReplicaManagerTest {
       replicaManager.createPartition(tp1).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints)
       val partition0Replicas = Seq[Integer](0, 1).asJava
       val partition1Replicas = Seq[Integer](0, 2).asJava
+      val topicIds = Map(tp0.topic -> Uuid.randomUuid(), tp1.topic -> Uuid.randomUuid()).asJava
       val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
         Seq(
           new LeaderAndIsrPartitionState()
@@ -719,6 +730,7 @@ class ReplicaManagerTest {
             .setReplicas(partition1Replicas)
             .setIsNew(true)
         ).asJava,
+        topicIds,
         Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
       replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest, (_, _) => ())
 
@@ -832,6 +844,7 @@ class ReplicaManagerTest {
     val leaderAndIsrRequest0 = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion,
       controllerId, controllerEpoch, brokerEpoch,
       Seq(leaderAndIsrPartitionState(tp, leaderEpoch, leaderBrokerId, aliveBrokerIds)).asJava,
+      Collections.singletonMap(topic, Uuid.randomUuid()),
       Set(new Node(followerBrokerId, "host1", 0),
         new Node(leaderBrokerId, "host2", 1)).asJava).build()
     replicaManager.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest0,
@@ -908,6 +921,7 @@ class ReplicaManagerTest {
         .setZkVersion(0)
         .setReplicas(brokerList)
         .setIsNew(false)).asJava,
+      Collections.singletonMap(topic, Uuid.randomUuid()),
       Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
     replicaManager.becomeLeaderOrFollower(1, leaderAndIsrRequest2, (_, _) => ())
 
@@ -957,6 +971,7 @@ class ReplicaManagerTest {
         .setZkVersion(0)
         .setReplicas(brokerList)
         .setIsNew(false)).asJava,
+      Collections.singletonMap(topic, Uuid.randomUuid()),
       Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
     replicaManager.becomeLeaderOrFollower(1, leaderAndIsrRequest2, (_, _) => ())
 
@@ -1007,6 +1022,7 @@ class ReplicaManagerTest {
         .setZkVersion(0)
         .setReplicas(brokerList)
         .setIsNew(false)).asJava,
+      Collections.singletonMap(topic, Uuid.randomUuid()),
       Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
     replicaManager.becomeLeaderOrFollower(1, leaderAndIsrRequest2, (_, _) => ())
 
@@ -1088,6 +1104,7 @@ class ReplicaManagerTest {
         .setZkVersion(0)
         .setReplicas(partition0Replicas)
         .setIsNew(true)).asJava,
+      Collections.singletonMap(tp0.topic, Uuid.randomUuid()),
       Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
     replicaManager.becomeLeaderOrFollower(0, becomeFollowerRequest, (_, _) => ())
 
@@ -1128,6 +1145,7 @@ class ReplicaManagerTest {
         .setZkVersion(0)
         .setReplicas(partition0Replicas)
         .setIsNew(true)).asJava,
+      Collections.singletonMap(tp0.topic, Uuid.randomUuid()),
       Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
     replicaManager.becomeLeaderOrFollower(1, becomeLeaderRequest, (_, _) => ())
 
@@ -1161,6 +1179,7 @@ class ReplicaManagerTest {
     val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
     replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints)
     val partition0Replicas = Seq[Integer](0, 1).asJava
+    val topicIds = Collections.singletonMap(tp0.topic, Uuid.randomUuid())
 
     val becomeLeaderRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
       Seq(new LeaderAndIsrPartitionState()
@@ -1173,6 +1192,7 @@ class ReplicaManagerTest {
         .setZkVersion(0)
         .setReplicas(partition0Replicas)
         .setIsNew(true)).asJava,
+      topicIds,
       Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
     replicaManager.becomeLeaderOrFollower(1, becomeLeaderRequest, (_, _) => ())
 
@@ -1193,6 +1213,7 @@ class ReplicaManagerTest {
         .setZkVersion(0)
         .setReplicas(partition0Replicas)
         .setIsNew(true)).asJava,
+      topicIds,
       Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
     replicaManager.becomeLeaderOrFollower(0, becomeFollowerRequest, (_, _) => ())
 
@@ -1209,6 +1230,7 @@ class ReplicaManagerTest {
     val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
     replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints)
     val partition0Replicas = Seq[Integer](0, 1).asJava
+    val topicIds = Collections.singletonMap(tp0.topic, Uuid.randomUuid())
 
     val becomeLeaderRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
       Seq(new LeaderAndIsrPartitionState()
@@ -1221,6 +1243,7 @@ class ReplicaManagerTest {
         .setZkVersion(0)
         .setReplicas(partition0Replicas)
         .setIsNew(true)).asJava,
+      topicIds,
       Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
     replicaManager.becomeLeaderOrFollower(1, becomeLeaderRequest, (_, _) => ())
 
@@ -1242,6 +1265,7 @@ class ReplicaManagerTest {
         .setZkVersion(0)
         .setReplicas(partition0Replicas)
         .setIsNew(true)).asJava,
+      topicIds,
       Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
     replicaManager.becomeLeaderOrFollower(0, becomeFollowerRequest, (_, _) => ())
 
@@ -1269,6 +1293,7 @@ class ReplicaManagerTest {
         .setZkVersion(0)
         .setReplicas(partition0Replicas)
         .setIsNew(true)).asJava,
+      Collections.singletonMap(tp0.topic, Uuid.randomUuid()),
       Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
     replicaManager.becomeLeaderOrFollower(1, becomeLeaderRequest, (_, _) => ())
 
@@ -1311,6 +1336,7 @@ class ReplicaManagerTest {
         .setZkVersion(0)
         .setReplicas(partition0Replicas)
         .setIsNew(true)).asJava,
+      Collections.singletonMap(tp0.topic, Uuid.randomUuid()),
       Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
     replicaManager.becomeLeaderOrFollower(1, becomeLeaderRequest, (_, _) => ())
 
@@ -1354,6 +1380,7 @@ class ReplicaManagerTest {
         .setZkVersion(0)
         .setReplicas(partition0Replicas)
         .setIsNew(true)).asJava,
+      Collections.singletonMap(tp0.topic, Uuid.randomUuid()),
       Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
     replicaManager.becomeLeaderOrFollower(1, becomeLeaderRequest, (_, _) => ())
 
@@ -1732,6 +1759,7 @@ class ReplicaManagerTest {
       val tp1 = new TopicPartition(topic, 1)
       val partition0Replicas = Seq[Integer](0, 1).asJava
       val partition1Replicas = Seq[Integer](1, 0).asJava
+      val topicIds = Map(tp0.topic -> Uuid.randomUuid(), tp1.topic -> Uuid.randomUuid()).asJava
 
       val leaderAndIsrRequest1 = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion,
         controllerId, 0, brokerEpoch,
@@ -1757,6 +1785,7 @@ class ReplicaManagerTest {
             .setReplicas(partition1Replicas)
             .setIsNew(true)
         ).asJava,
+        topicIds,
         Set(new Node(0, "host0", 0), new Node(1, "host1", 1)).asJava).build()
 
       rm0.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest1, (_, _) => ())
@@ -1787,6 +1816,7 @@ class ReplicaManagerTest {
             .setReplicas(partition1Replicas)
             .setIsNew(true)
         ).asJava,
+        topicIds,
         Set(new Node(0, "host0", 0), new Node(1, "host1", 1)).asJava).build()
 
       rm0.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest2, (_, _) => ())
@@ -1821,6 +1851,7 @@ class ReplicaManagerTest {
       val tp1 = new TopicPartition(topic, 1)
       val partition0Replicas = Seq[Integer](1, 0).asJava
       val partition1Replicas = Seq[Integer](1, 0).asJava
+      val topicIds = Map(tp0.topic -> Uuid.randomUuid(), tp1.topic -> Uuid.randomUuid()).asJava
 
       val leaderAndIsrRequest1 = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion,
         controllerId, 0, brokerEpoch,
@@ -1846,6 +1877,7 @@ class ReplicaManagerTest {
             .setReplicas(partition1Replicas)
             .setIsNew(true)
         ).asJava,
+        topicIds,
         Set(new Node(0, "host0", 0), new Node(1, "host1", 1)).asJava).build()
 
       rm0.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest1, (_, _) => ())
@@ -1876,6 +1908,7 @@ class ReplicaManagerTest {
             .setReplicas(partition1Replicas)
             .setIsNew(true)
         ).asJava,
+        topicIds,
         Set(new Node(0, "host0", 0), new Node(1, "host1", 1)).asJava).build()
 
       rm0.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest2, (_, _) => ())
@@ -1935,6 +1968,7 @@ class ReplicaManagerTest {
 
     val becomeLeaderRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 10, brokerEpoch,
       Seq(leaderAndIsrPartitionState(tp0, 1, 0, Seq(0, 1), true)).asJava,
+      Collections.singletonMap(topic, Uuid.randomUuid()),
       Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava
     ).build()
 
@@ -1961,6 +1995,7 @@ class ReplicaManagerTest {
 
     val becomeLeaderRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
       Seq(leaderAndIsrPartitionState(tp0, 1, 0, Seq(0, 1), true)).asJava,
+      Collections.singletonMap(topic, Uuid.randomUuid()),
       Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava
     ).build()
 
@@ -2110,6 +2145,7 @@ class ReplicaManagerTest {
 
     val becomeLeaderRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
       Seq(leaderAndIsrPartitionState(tp0, 1, 0, Seq(0, 1), true)).asJava,
+      Collections.singletonMap(tp0.topic(), Uuid.randomUuid()),
       Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava
     ).build()
 
@@ -2176,4 +2212,100 @@ class ReplicaManagerTest {
       replicaManager.shutdown(false)
     }
   }
+
+  @Test
+  def testPartitionMetadataFile(): Unit = {
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time))
+    try {
+      val brokerList = Seq[Integer](0, 1).asJava
+      val topicPartition = new TopicPartition(topic, 0)
+      replicaManager.createPartition(topicPartition)
+        .createLogIfNotExists(isNew = false, isFutureReplica = false,
+          new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints))
+      val topicIds = Collections.singletonMap(topic, Uuid.randomUuid())
+
+      def leaderAndIsrRequest(epoch: Int): LeaderAndIsrRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
+        Seq(new LeaderAndIsrPartitionState()
+          .setTopicName(topic)
+          .setPartitionIndex(0)
+          .setControllerEpoch(0)
+          .setLeader(0)
+          .setLeaderEpoch(epoch)
+          .setIsr(brokerList)
+          .setZkVersion(0)
+          .setReplicas(brokerList)
+          .setIsNew(true)).asJava,
+        topicIds,
+        Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
+
+      replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(0), (_, _) => ())
+      assertFalse(replicaManager.localLog(topicPartition).isEmpty)
+      val id = topicIds.get(topicPartition.topic())
+      val log = replicaManager.localLog(topicPartition).get
+      assertFalse(log.partitionMetadataFile.isEmpty)
+      assertFalse(log.partitionMetadataFile.get.isEmpty())
+      val partitionMetadata = log.partitionMetadataFile.get.read()
+
+      // Current version of PartitionMetadataFile is 0.
+      assertEquals(0, partitionMetadata.version)
+      assertEquals(id, partitionMetadata.topicId)
+    } finally replicaManager.shutdown(checkpointHW = false)
+  }
+
+  @Test
+  def testPartitionMetadataFileNotCreated(): Unit = {
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time))
+    try {
+      val brokerList = Seq[Integer](0, 1).asJava
+      val topicPartition = new TopicPartition(topic, 0)
+      val topicPartitionFoo = new TopicPartition("foo", 0)
+      replicaManager.createPartition(topicPartition)
+        .createLogIfNotExists(isNew = false, isFutureReplica = false,
+          new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints))
+      val topicIds = Map(topic -> Uuid.ZERO_UUID, "foo" -> 
Uuid.randomUuid()).asJava
+
+      def leaderAndIsrRequest(epoch: Int, name: String, version: Short): LeaderAndIsrRequest = LeaderAndIsrRequest.parse(
+        new LeaderAndIsrRequest.Builder(version, 0, 0, brokerEpoch,
+        Seq(new LeaderAndIsrPartitionState()
+          .setTopicName(name)
+          .setPartitionIndex(0)
+          .setControllerEpoch(0)
+          .setLeader(0)
+          .setLeaderEpoch(epoch)
+          .setIsr(brokerList)
+          .setZkVersion(0)
+          .setReplicas(brokerList)
+          .setIsNew(true)).asJava,
+        topicIds,
+        Set(new Node(0, "host1", 0), new Node(1, "host2", 
1)).asJava).build().serialize(), version)
+
+      // The file has no contents if the topic does not have an associated topic ID.
+      replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(0, "fakeTopic", ApiKeys.LEADER_AND_ISR.latestVersion), (_, _) => ())
+      assertFalse(replicaManager.localLog(topicPartition).isEmpty)
+      val log = replicaManager.localLog(topicPartition).get
+      assertFalse(log.partitionMetadataFile.isEmpty)
+      assertTrue(log.partitionMetadataFile.get.isEmpty())
+
+      // The file has no contents if the topic has the default UUID.
+      replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(0, topic, ApiKeys.LEADER_AND_ISR.latestVersion), (_, _) => ())
+      assertFalse(replicaManager.localLog(topicPartition).isEmpty)
+      val log2 = replicaManager.localLog(topicPartition).get
+      assertFalse(log2.partitionMetadataFile.isEmpty)
+      assertTrue(log2.partitionMetadataFile.get.isEmpty())
+
+      // The file has no contents if the request is from an older version (v0).
+      replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(0, "foo", 0), (_, _) => ())
+      assertFalse(replicaManager.localLog(topicPartitionFoo).isEmpty)
+      val log3 = replicaManager.localLog(topicPartitionFoo).get
+      assertFalse(log3.partitionMetadataFile.isEmpty)
+      assertTrue(log3.partitionMetadataFile.get.isEmpty())
+
+      // The file has no contents if the request is from an older version (v4).
+      replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(0, "foo", 4), (_, _) => ())
+      assertFalse(replicaManager.localLog(topicPartitionFoo).isEmpty)
+      val log4 = replicaManager.localLog(topicPartitionFoo).get
+      assertFalse(log4.partitionMetadataFile.isEmpty)
+      assertTrue(log4.partitionMetadataFile.get.isEmpty())
+    } finally replicaManager.shutdown(checkpointHW = false)
+  }
 }
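
Nearly every test above gains the same new Builder argument. For
reference, the minimal construction in isolation, mirroring the tests;
the topic name, node addresses and single-partition state are
illustrative.

    import java.util.Collections
    import org.apache.kafka.common.{Node, Uuid}
    import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState
    import org.apache.kafka.common.protocol.ApiKeys
    import org.apache.kafka.common.requests.LeaderAndIsrRequest
    import scala.jdk.CollectionConverters._

    object LeaderAndIsrRequestSketch {
      // Build a latest-version request for one partition; the topicIds map is
      // the Builder argument this patch introduces.
      def build(topic: String, brokerEpoch: Long): LeaderAndIsrRequest = {
        val replicas = Seq[Integer](0, 1).asJava
        val state = new LeaderAndIsrPartitionState()
          .setTopicName(topic)
          .setPartitionIndex(0)
          .setControllerEpoch(0)
          .setLeader(0)
          .setLeaderEpoch(0)
          .setIsr(replicas)
          .setZkVersion(0)
          .setReplicas(replicas)
          .setIsNew(true)
        new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
          Seq(state).asJava,
          Collections.singletonMap(topic, Uuid.randomUuid()),
          Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
      }
    }
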
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index 9ea4039..ed480c5 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -62,6 +62,7 @@ class RequestQuotaTest extends BaseRequestTest {
   private val topic = "topic-1"
   private val numPartitions = 1
   private val tp = new TopicPartition(topic, 0)
+  private val topicIds = Collections.singletonMap(topic, Uuid.randomUuid())
   private val logDir = "logDir"
   private val unthrottledClientId = "unthrottled-client"
   private val smallQuotaProducerClientId = "small-quota-producer-client"
@@ -254,6 +255,7 @@ class RequestQuotaTest extends BaseRequestTest {
               .setZkVersion(2)
               .setReplicas(Seq(brokerId).asJava)
               .setIsNew(true)).asJava,
+            topicIds,
             Set(new Node(brokerId, "localhost", 0)).asJava)
 
         case ApiKeys.STOP_REPLICA =>
diff --git a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
index 436dc9e..ce2ceda 100755
--- a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
@@ -21,6 +21,7 @@ import kafka.utils.{CoreUtils, TestUtils}
 import kafka.utils.TestUtils._
 import java.io.{DataInputStream, File}
 import java.net.ServerSocket
+import java.util.Collections
 import java.util.concurrent.{Executors, TimeUnit}
 
 import kafka.cluster.Broker
@@ -29,6 +30,7 @@ import kafka.log.LogManager
 import kafka.zookeeper.ZooKeeperClientTimeoutException
 import org.apache.kafka.clients.consumer.KafkaConsumer
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
+import org.apache.kafka.common.Uuid
 import org.apache.kafka.common.errors.KafkaStorageException
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.network.ListenerName
@@ -233,7 +235,8 @@ class ServerShutdownTest extends ZooKeeperTestHarness {
 
       // Initiate a sendRequest and wait until connection is established and one byte is received by the peer
       val requestBuilder = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion,
-        controllerId, 1, 0L, Seq.empty.asJava, brokerAndEpochs.keys.map(_.node(listenerName)).toSet.asJava)
+        controllerId, 1, 0L, Seq.empty.asJava, Collections.singletonMap(topic, Uuid.randomUuid()),
+        brokerAndEpochs.keys.map(_.node(listenerName)).toSet.asJava)
       controllerChannelManager.sendRequest(1, requestBuilder)
       receiveFuture.get(10, TimeUnit.SECONDS)
 
diff --git a/generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java b/generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java
index 97a27d9..1d51585 100644
--- a/generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java
+++ b/generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java
@@ -1891,7 +1891,6 @@ public final class MessageDataGenerator implements MessageClassGenerator {
                     prefix, field.camelCaseName(), field.camelCaseName());
         } else if (field.type().isStruct() ||
             field.type() instanceof FieldType.UUIDFieldType) {
-        } else if (field.type().isStruct()) {
             buffer.printf("+ \"%s%s=\" + %s.toString()%n",
                 prefix, field.camelCaseName(), field.camelCaseName());
         } else if (field.type().isArray()) {

Reply via email to