This is an automated email from the ASF dual-hosted git repository. rsivaram pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 1dd1e7f KAFKA-10545: Create topic IDs and propagate to brokers (#9626) 1dd1e7f is described below commit 1dd1e7f945d7a8c1dc177223cd88800680f1ff46 Author: Justine Olshan <jols...@confluent.io> AuthorDate: Fri Dec 18 17:19:50 2020 -0500 KAFKA-10545: Create topic IDs and propagate to brokers (#9626) This change propagates topic ids to brokers in LeaderAndIsr Request. It also removes the topic name from the LeaderAndIsr Response, reorganizes the response to be sorted by topic, and includes the topic ID. In addition, the topic ID is persisted to each replica in Log as well as in a file on disk. This file is read on startup and if the topic ID exists, it will be reloaded. Reviewers: David Jacot <dja...@confluent.io>, dengziming <dengziming1...@gmail.com>, Nikhil Bhatia <rite2nik...@gmail.com>, Rajini Sivaram <rajinisiva...@googlemail.com> --- checkstyle/suppressions.xml | 2 +- .../kafka/common/requests/LeaderAndIsrRequest.java | 57 ++++++-- .../common/requests/LeaderAndIsrResponse.java | 58 +++++++-- .../common/message/LeaderAndIsrRequest.json | 12 +- .../common/message/LeaderAndIsrResponse.json | 24 +++- .../common/requests/LeaderAndIsrRequestTest.java | 38 +++++- .../common/requests/LeaderAndIsrResponseTest.java | 114 ++++++++++++---- .../kafka/common/requests/RequestResponseTest.java | 60 ++++++--- core/src/main/scala/kafka/api/ApiVersion.scala | 2 +- .../controller/ControllerChannelManager.scala | 10 +- .../scala/kafka/controller/KafkaController.scala | 7 +- core/src/main/scala/kafka/log/Log.scala | 21 ++- .../scala/kafka/server/PartitionMetadataFile.scala | 144 +++++++++++++++++++++ .../main/scala/kafka/server/ReplicaManager.scala | 63 +++++++-- core/src/main/scala/kafka/zk/KafkaZkClient.scala | 2 +- .../kafka/api/AuthorizerIntegrationTest.scala | 9 +- .../controller/ControllerChannelManagerTest.scala | 46 +++++-- .../test/scala/unit/kafka/log/LogManagerTest.scala | 8 +- core/src/test/scala/unit/kafka/log/LogTest.scala | 46 ++++++- .../kafka/server/BrokerEpochIntegrationTest.scala | 5 +- .../scala/unit/kafka/server/KafkaApisTest.scala | 5 +- .../unit/kafka/server/LeaderElectionTest.scala | 7 +- .../unit/kafka/server/ReplicaManagerTest.scala | 136 ++++++++++++++++++- .../scala/unit/kafka/server/RequestQuotaTest.scala | 2 + .../unit/kafka/server/ServerShutdownTest.scala | 5 +- .../apache/kafka/message/MessageDataGenerator.java | 1 - 26 files changed, 760 insertions(+), 124 deletions(-) diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml index b44e713..8539034 100644 --- a/checkstyle/suppressions.xml +++ b/checkstyle/suppressions.xml @@ -100,7 +100,7 @@ files="RequestResponseTest.java|FetcherTest.java|KafkaAdminClientTest.java"/> <suppress checks="NPathComplexity" - files="MemoryRecordsTest|MetricsTest|TestSslUtils|AclAuthorizerBenchmark"/> + files="MemoryRecordsTest|MetricsTest|RequestResponseTest|TestSslUtils|AclAuthorizerBenchmark"/> <suppress checks="(WhitespaceAround|LocalVariableName|ImportControl|AvoidStarImport)" files="Murmur3Test.java"/> diff --git a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java index 833e025..939212a 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java @@ -17,11 +17,13 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.Node; +import org.apache.kafka.common.Uuid; import org.apache.kafka.common.message.LeaderAndIsrRequestData; import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrLiveLeader; import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrTopicState; import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState; import org.apache.kafka.common.message.LeaderAndIsrResponseData; +import org.apache.kafka.common.message.LeaderAndIsrResponseData.LeaderAndIsrTopicError; import org.apache.kafka.common.message.LeaderAndIsrResponseData.LeaderAndIsrPartitionError; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.ByteBufferAccessor; @@ -43,12 +45,15 @@ public class LeaderAndIsrRequest extends AbstractControlRequest { public static class Builder extends AbstractControlRequest.Builder<LeaderAndIsrRequest> { private final List<LeaderAndIsrPartitionState> partitionStates; + private final Map<String, Uuid> topicIds; private final Collection<Node> liveLeaders; public Builder(short version, int controllerId, int controllerEpoch, long brokerEpoch, - List<LeaderAndIsrPartitionState> partitionStates, Collection<Node> liveLeaders) { + List<LeaderAndIsrPartitionState> partitionStates, Map<String, Uuid> topicIds, + Collection<Node> liveLeaders) { super(ApiKeys.LEADER_AND_ISR, version, controllerId, controllerEpoch, brokerEpoch); this.partitionStates = partitionStates; + this.topicIds = topicIds; this.liveLeaders = liveLeaders; } @@ -67,7 +72,7 @@ public class LeaderAndIsrRequest extends AbstractControlRequest { .setLiveLeaders(leaders); if (version >= 2) { - Map<String, LeaderAndIsrTopicState> topicStatesMap = groupByTopic(partitionStates); + Map<String, LeaderAndIsrTopicState> topicStatesMap = groupByTopic(partitionStates, topicIds); data.setTopicStates(new ArrayList<>(topicStatesMap.values())); } else { data.setUngroupedPartitionStates(partitionStates); @@ -76,13 +81,14 @@ public class LeaderAndIsrRequest extends AbstractControlRequest { return new LeaderAndIsrRequest(data, version); } - private static Map<String, LeaderAndIsrTopicState> groupByTopic(List<LeaderAndIsrPartitionState> partitionStates) { + private static Map<String, LeaderAndIsrTopicState> groupByTopic(List<LeaderAndIsrPartitionState> partitionStates, Map<String, Uuid> topicIds) { Map<String, LeaderAndIsrTopicState> topicStates = new HashMap<>(); // We don't null out the topic name in LeaderAndIsrRequestPartition since it's ignored by // the generated code if version >= 2 for (LeaderAndIsrPartitionState partition : partitionStates) { - LeaderAndIsrTopicState topicState = topicStates.computeIfAbsent(partition.topicName(), - t -> new LeaderAndIsrTopicState().setTopicName(partition.topicName())); + LeaderAndIsrTopicState topicState = topicStates.computeIfAbsent(partition.topicName(), t -> new LeaderAndIsrTopicState() + .setTopicName(partition.topicName()) + .setTopicId(topicIds.getOrDefault(partition.topicName(), Uuid.ZERO_UUID))); topicState.partitionStates().add(partition); } return topicStates; @@ -96,6 +102,7 @@ public class LeaderAndIsrRequest extends AbstractControlRequest { .append(", controllerEpoch=").append(controllerEpoch) .append(", brokerEpoch=").append(brokerEpoch) .append(", partitionStates=").append(partitionStates) + .append(", topicIds=").append(topicIds) .append(", liveLeaders=(").append(Utils.join(liveLeaders, ", ")).append(")") .append(")"); return bld.toString(); @@ -129,15 +136,34 @@ public class LeaderAndIsrRequest extends AbstractControlRequest { Errors error = Errors.forException(e); responseData.setErrorCode(error.code()); - List<LeaderAndIsrPartitionError> partitions = new ArrayList<>(); - for (LeaderAndIsrPartitionState partition : partitionStates()) { - partitions.add(new LeaderAndIsrPartitionError() - .setTopicName(partition.topicName()) - .setPartitionIndex(partition.partitionIndex()) - .setErrorCode(error.code())); + if (version() < 5) { + List<LeaderAndIsrPartitionError> partitions = new ArrayList<>(); + for (LeaderAndIsrPartitionState partition : partitionStates()) { + partitions.add(new LeaderAndIsrPartitionError() + .setTopicName(partition.topicName()) + .setPartitionIndex(partition.partitionIndex()) + .setErrorCode(error.code())); + } + responseData.setPartitionErrors(partitions); + return new LeaderAndIsrResponse(responseData, version()); + } + + List<LeaderAndIsrTopicError> topics = new ArrayList<>(data.topicStates().size()); + Map<String, Uuid> topicIds = topicIds(); + for (LeaderAndIsrTopicState topicState : data.topicStates()) { + LeaderAndIsrTopicError topicError = new LeaderAndIsrTopicError(); + topicError.setTopicId(topicIds.get(topicState.topicName())); + List<LeaderAndIsrPartitionError> partitions = new ArrayList<>(topicState.partitionStates().size()); + for (LeaderAndIsrPartitionState partition : topicState.partitionStates()) { + partitions.add(new LeaderAndIsrPartitionError() + .setPartitionIndex(partition.partitionIndex()) + .setErrorCode(error.code())); + } + topicError.setPartitionErrors(partitions); + topics.add(topicError); } - responseData.setPartitionErrors(partitions); - return new LeaderAndIsrResponse(responseData); + responseData.setTopics(topics); + return new LeaderAndIsrResponse(responseData, version()); } @Override @@ -162,6 +188,11 @@ public class LeaderAndIsrRequest extends AbstractControlRequest { return data.ungroupedPartitionStates(); } + public Map<String, Uuid> topicIds() { + return data.topicStates().stream() + .collect(Collectors.toMap(LeaderAndIsrTopicState::topicName, LeaderAndIsrTopicState::topicId)); + } + public List<LeaderAndIsrLiveLeader> liveLeaders() { return Collections.unmodifiableList(data.liveLeaders()); } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrResponse.java index 974dde8..60ab3d5 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrResponse.java @@ -16,15 +16,20 @@ */ package org.apache.kafka.common.requests; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; import org.apache.kafka.common.message.LeaderAndIsrResponseData; +import org.apache.kafka.common.message.LeaderAndIsrResponseData.LeaderAndIsrTopicError; import org.apache.kafka.common.message.LeaderAndIsrResponseData.LeaderAndIsrPartitionError; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.ByteBufferAccessor; import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.utils.FlattenedIterator; import java.nio.ByteBuffer; import java.util.Collections; import java.util.List; +import java.util.HashMap; import java.util.Map; public class LeaderAndIsrResponse extends AbstractResponse { @@ -36,14 +41,24 @@ public class LeaderAndIsrResponse extends AbstractResponse { * STALE_BROKER_EPOCH (77) */ private final LeaderAndIsrResponseData data; + private short version; - public LeaderAndIsrResponse(LeaderAndIsrResponseData data) { + public LeaderAndIsrResponse(LeaderAndIsrResponseData data, short version) { super(ApiKeys.LEADER_AND_ISR); this.data = data; + this.version = version; } - public List<LeaderAndIsrPartitionError> partitions() { - return data.partitionErrors(); + public List<LeaderAndIsrTopicError> topics() { + return this.data.topics(); + } + + public Iterable<LeaderAndIsrPartitionError> partitions() { + if (version < 5) { + return data.partitionErrors(); + } + return () -> new FlattenedIterator<>(data.topics().iterator(), + topic -> topic.partitionErrors().iterator()); } public Errors error() { @@ -53,22 +68,49 @@ public class LeaderAndIsrResponse extends AbstractResponse { @Override public Map<Errors, Integer> errorCounts() { Errors error = error(); - if (error != Errors.NONE) + if (error != Errors.NONE) { // Minor optimization since the top-level error applies to all partitions - return Collections.singletonMap(error, data.partitionErrors().size() + 1); - Map<Errors, Integer> errors = errorCounts(data.partitionErrors().stream().map(l -> Errors.forCode(l.errorCode()))); - // Top level error + if (version < 5) + return Collections.singletonMap(error, data.partitionErrors().size() + 1); + return Collections.singletonMap(error, + data.topics().stream().mapToInt(t -> t.partitionErrors().size()).sum() + 1); + } + Map<Errors, Integer> errors; + if (version < 5) + errors = errorCounts(data.partitionErrors().stream().map(l -> Errors.forCode(l.errorCode()))); + else + errors = errorCounts(data.topics().stream().flatMap(t -> t.partitionErrors().stream()).map(l -> + Errors.forCode(l.errorCode()))); updateErrorCounts(errors, Errors.NONE); return errors; } + public Map<TopicPartition, Errors> partitionErrors(Map<Uuid, String> topicNames) { + Map<TopicPartition, Errors> errors = new HashMap<>(); + if (version < 5) { + data.partitionErrors().forEach(partition -> + errors.put(new TopicPartition(partition.topicName(), partition.partitionIndex()), + Errors.forCode(partition.errorCode()))); + } else { + for (LeaderAndIsrTopicError topic : data.topics()) { + String topicName = topicNames.get(topic.topicId()); + if (topicName != null) { + topic.partitionErrors().forEach(partition -> + errors.put(new TopicPartition(topicName, partition.partitionIndex()), + Errors.forCode(partition.errorCode()))); + } + } + } + return errors; + } + @Override public int throttleTimeMs() { return DEFAULT_THROTTLE_TIME; } public static LeaderAndIsrResponse parse(ByteBuffer buffer, short version) { - return new LeaderAndIsrResponse(new LeaderAndIsrResponseData(new ByteBufferAccessor(buffer), version)); + return new LeaderAndIsrResponse(new LeaderAndIsrResponseData(new ByteBufferAccessor(buffer), version), version); } @Override diff --git a/clients/src/main/resources/common/message/LeaderAndIsrRequest.json b/clients/src/main/resources/common/message/LeaderAndIsrRequest.json index 8529688..129b7f7 100644 --- a/clients/src/main/resources/common/message/LeaderAndIsrRequest.json +++ b/clients/src/main/resources/common/message/LeaderAndIsrRequest.json @@ -21,8 +21,12 @@ // // Version 2 adds broker epoch and reorganizes the partitions by topic. // - // Version 3 adds AddingReplicas and RemovingReplicas - "validVersions": "0-4", + // Version 3 adds AddingReplicas and RemovingReplicas. + // + // Version 4 is the first flexible version. + // + // Version 5 adds Topic ID and Type to the TopicStates, as described in KIP-516. + "validVersions": "0-5", "flexibleVersions": "4+", "fields": [ { "name": "ControllerId", "type": "int32", "versions": "0+", "entityType": "brokerId", @@ -31,6 +35,8 @@ "about": "The current controller epoch." }, { "name": "BrokerEpoch", "type": "int64", "versions": "2+", "ignorable": true, "default": "-1", "about": "The current broker epoch." }, + { "name": "Type", "type": "int8", "versions": "5+", + "about": "The type that indicates whether all topics are included in the request"}, { "name": "UngroupedPartitionStates", "type": "[]LeaderAndIsrPartitionState", "versions": "0-1", "about": "The state of each partition, in a v0 or v1 message." }, // In v0 or v1 requests, each partition is listed alongside its topic name. @@ -40,6 +46,8 @@ "about": "Each topic.", "fields": [ { "name": "TopicName", "type": "string", "versions": "2+", "entityType": "topicName", "about": "The topic name." }, + { "name": "TopicId", "type": "uuid", "versions": "5+", "ignorable": true, + "about": "The unique topic ID." }, { "name": "PartitionStates", "type": "[]LeaderAndIsrPartitionState", "versions": "2+", "about": "The state of each partition" } ]}, diff --git a/clients/src/main/resources/common/message/LeaderAndIsrResponse.json b/clients/src/main/resources/common/message/LeaderAndIsrResponse.json index 10c3cd9..dc5879b 100644 --- a/clients/src/main/resources/common/message/LeaderAndIsrResponse.json +++ b/clients/src/main/resources/common/message/LeaderAndIsrResponse.json @@ -22,15 +22,29 @@ // Version 2 is the same as version 1. // // Version 3 is the same as version 2. - "validVersions": "0-4", + // + // Version 4 is the first flexible version. + // + // Version 5 removes TopicName and replaces it with TopicId and reorganizes + // the partitions by topic, as described by KIP-516. + "validVersions": "0-5", "flexibleVersions": "4+", "fields": [ { "name": "ErrorCode", "type": "int16", "versions": "0+", "about": "The error code, or 0 if there was no error." }, - { "name": "PartitionErrors", "type": "[]LeaderAndIsrPartitionError", "versions": "0+", - "about": "Each partition.", "fields": [ - { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName", - "about": "The topic name." }, + { "name": "PartitionErrors", "type": "[]LeaderAndIsrPartitionError", "versions": "0-4", + "about": "Each partition in v0 to v4 message."}, + { "name": "Topics", "type": "[]LeaderAndIsrTopicError", "versions": "5+", + "about": "Each topic", "fields": [ + { "name": "TopicId", "type": "uuid", "versions": "5+", "about": "The unique topic ID" }, + { "name": "PartitionErrors", "type": "[]LeaderAndIsrPartitionError", "versions": "5+", + "about": "Each partition."} + ]} + ], + "commonStructs": [ + { "name": "LeaderAndIsrPartitionError", "versions": "0+", "fields": [ + { "name": "TopicName", "type": "string", "versions": "0-4", "entityType": "topicName", "ignorable": true, + "about": "The topic name."}, { "name": "PartitionIndex", "type": "int32", "versions": "0+", "about": "The partition index." }, { "name": "ErrorCode", "type": "int16", "versions": "0+", diff --git a/clients/src/test/java/org/apache/kafka/common/requests/LeaderAndIsrRequestTest.java b/clients/src/test/java/org/apache/kafka/common/requests/LeaderAndIsrRequestTest.java index 939514e..c45682f 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/LeaderAndIsrRequestTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/LeaderAndIsrRequestTest.java @@ -18,6 +18,7 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; import org.apache.kafka.common.errors.ClusterAuthorizationException; import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.message.LeaderAndIsrRequestData; @@ -31,8 +32,10 @@ import org.junit.Test; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.stream.Collectors; import java.util.stream.StreamSupport; @@ -50,19 +53,25 @@ public class LeaderAndIsrRequestTest { public void testUnsupportedVersion() { LeaderAndIsrRequest.Builder builder = new LeaderAndIsrRequest.Builder( (short) (LEADER_AND_ISR.latestVersion() + 1), 0, 0, 0, - Collections.emptyList(), Collections.emptySet()); + Collections.emptyList(), Collections.emptyMap(), Collections.emptySet()); assertThrows(UnsupportedVersionException.class, builder::build); } @Test public void testGetErrorResponse() { + Uuid id = Uuid.randomUuid(); for (short version = LEADER_AND_ISR.oldestVersion(); version < LEADER_AND_ISR.latestVersion(); version++) { LeaderAndIsrRequest.Builder builder = new LeaderAndIsrRequest.Builder(version, 0, 0, 0, - Collections.emptyList(), Collections.emptySet()); + Collections.emptyList(), Collections.singletonMap("topic", id), Collections.emptySet()); LeaderAndIsrRequest request = builder.build(); LeaderAndIsrResponse response = request.getErrorResponse(0, new ClusterAuthorizationException("Not authorized")); assertEquals(Errors.CLUSTER_AUTHORIZATION_FAILED, response.error()); + if (version < 5) { + assertEquals(0, response.topics().size()); + } else { + assertEquals(id, response.topics().get(0).topicId()); + } } } @@ -115,8 +124,13 @@ public class LeaderAndIsrRequestTest { new Node(0, "host0", 9090), new Node(1, "host1", 9091) ); + + Map<String, Uuid> topicIds = new HashMap<>(); + topicIds.put("topic0", Uuid.randomUuid()); + topicIds.put("topic1", Uuid.randomUuid()); + LeaderAndIsrRequest request = new LeaderAndIsrRequest.Builder(version, 1, 2, 3, partitionStates, - liveNodes).build(); + topicIds, liveNodes).build(); List<LeaderAndIsrLiveLeader> liveLeaders = liveNodes.stream().map(n -> new LeaderAndIsrLiveLeader() .setBrokerId(n.id()) @@ -140,7 +154,21 @@ public class LeaderAndIsrRequestTest { .setRemovingReplicas(emptyList()); } + // Prior to version 2, there were no TopicStates, so a map of Topic Ids from a list of + // TopicStates is an empty map. + if (version < 2) { + topicIds = new HashMap<>(); + } + + // In versions 2-4 there are TopicStates, but no topicIds, so deserialized requests will have + // Zero Uuids in place. + if (version > 1 && version < 5) { + topicIds.put("topic0", Uuid.ZERO_UUID); + topicIds.put("topic1", Uuid.ZERO_UUID); + } + assertEquals(new HashSet<>(partitionStates), iterableToSet(deserializedRequest.partitionStates())); + assertEquals(topicIds, deserializedRequest.topicIds()); assertEquals(liveLeaders, deserializedRequest.liveLeaders()); assertEquals(1, request.controllerId()); assertEquals(2, request.controllerEpoch()); @@ -152,13 +180,15 @@ public class LeaderAndIsrRequestTest { public void testTopicPartitionGroupingSizeReduction() { Set<TopicPartition> tps = TestUtils.generateRandomTopicPartitions(10, 10); List<LeaderAndIsrPartitionState> partitionStates = new ArrayList<>(); + Map<String, Uuid> topicIds = new HashMap<>(); for (TopicPartition tp : tps) { partitionStates.add(new LeaderAndIsrPartitionState() .setTopicName(tp.topic()) .setPartitionIndex(tp.partition())); + topicIds.put(tp.topic(), Uuid.randomUuid()); } LeaderAndIsrRequest.Builder builder = new LeaderAndIsrRequest.Builder((short) 2, 0, 0, 0, - partitionStates, Collections.emptySet()); + partitionStates, topicIds, Collections.emptySet()); LeaderAndIsrRequest v2 = builder.build((short) 2); LeaderAndIsrRequest v1 = builder.build((short) 1); diff --git a/clients/src/test/java/org/apache/kafka/common/requests/LeaderAndIsrResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/LeaderAndIsrResponseTest.java index fbd7d48..9940e55 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/LeaderAndIsrResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/LeaderAndIsrResponseTest.java @@ -16,8 +16,10 @@ */ package org.apache.kafka.common.requests; +import org.apache.kafka.common.Uuid; import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState; import org.apache.kafka.common.message.LeaderAndIsrResponseData; +import org.apache.kafka.common.message.LeaderAndIsrResponseData.LeaderAndIsrTopicError; import org.apache.kafka.common.message.LeaderAndIsrResponseData.LeaderAndIsrPartitionError; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; @@ -29,6 +31,7 @@ import java.util.List; import java.util.Map; import static java.util.Arrays.asList; +import static org.apache.kafka.common.protocol.ApiKeys.LEADER_AND_ISR; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -57,46 +60,87 @@ public class LeaderAndIsrResponseTest { .setZkVersion(20) .setReplicas(Collections.singletonList(10)) .setIsNew(false)); + Map<String, Uuid> topicIds = Collections.singletonMap("foo", Uuid.randomUuid()); + LeaderAndIsrRequest request = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion(), - 15, 20, 0, partitionStates, Collections.emptySet()).build(); + 15, 20, 0, partitionStates, topicIds, Collections.emptySet()).build(); LeaderAndIsrResponse response = request.getErrorResponse(0, Errors.CLUSTER_AUTHORIZATION_FAILED.exception()); assertEquals(Collections.singletonMap(Errors.CLUSTER_AUTHORIZATION_FAILED, 3), response.errorCounts()); } @Test public void testErrorCountsWithTopLevelError() { - List<LeaderAndIsrPartitionError> partitions = createPartitions("foo", - asList(Errors.NONE, Errors.NOT_LEADER_OR_FOLLOWER)); - LeaderAndIsrResponse response = new LeaderAndIsrResponse(new LeaderAndIsrResponseData() - .setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code()) - .setPartitionErrors(partitions)); - assertEquals(Collections.singletonMap(Errors.UNKNOWN_SERVER_ERROR, 3), response.errorCounts()); + for (short version = LEADER_AND_ISR.oldestVersion(); version < LEADER_AND_ISR.latestVersion(); version++) { + LeaderAndIsrResponse response; + if (version < 5) { + List<LeaderAndIsrPartitionError> partitions = createPartitions("foo", + asList(Errors.NONE, Errors.NOT_LEADER_OR_FOLLOWER)); + response = new LeaderAndIsrResponse(new LeaderAndIsrResponseData() + .setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code()) + .setPartitionErrors(partitions), version); + } else { + Uuid id = Uuid.randomUuid(); + List<LeaderAndIsrTopicError> topics = createTopic(id, asList(Errors.NONE, Errors.NOT_LEADER_OR_FOLLOWER)); + response = new LeaderAndIsrResponse(new LeaderAndIsrResponseData() + .setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code()) + .setTopics(topics), version); + } + assertEquals(Collections.singletonMap(Errors.UNKNOWN_SERVER_ERROR, 3), response.errorCounts()); + } } @Test public void testErrorCountsNoTopLevelError() { - List<LeaderAndIsrPartitionError> partitions = createPartitions("foo", - asList(Errors.NONE, Errors.CLUSTER_AUTHORIZATION_FAILED)); - LeaderAndIsrResponse response = new LeaderAndIsrResponse(new LeaderAndIsrResponseData() - .setErrorCode(Errors.NONE.code()) - .setPartitionErrors(partitions)); - Map<Errors, Integer> errorCounts = response.errorCounts(); - assertEquals(2, errorCounts.size()); - assertEquals(2, errorCounts.get(Errors.NONE).intValue()); - assertEquals(1, errorCounts.get(Errors.CLUSTER_AUTHORIZATION_FAILED).intValue()); + for (short version = LEADER_AND_ISR.oldestVersion(); version < LEADER_AND_ISR.latestVersion(); version++) { + LeaderAndIsrResponse response; + if (version < 5) { + List<LeaderAndIsrPartitionError> partitions = createPartitions("foo", + asList(Errors.NONE, Errors.CLUSTER_AUTHORIZATION_FAILED)); + response = new LeaderAndIsrResponse(new LeaderAndIsrResponseData() + .setErrorCode(Errors.NONE.code()) + .setPartitionErrors(partitions), version); + } else { + Uuid id = Uuid.randomUuid(); + List<LeaderAndIsrTopicError> topics = createTopic(id, asList(Errors.NONE, Errors.CLUSTER_AUTHORIZATION_FAILED)); + response = new LeaderAndIsrResponse(new LeaderAndIsrResponseData() + .setErrorCode(Errors.NONE.code()) + .setTopics(topics), version); + } + Map<Errors, Integer> errorCounts = response.errorCounts(); + assertEquals(2, errorCounts.size()); + assertEquals(2, errorCounts.get(Errors.NONE).intValue()); + assertEquals(1, errorCounts.get(Errors.CLUSTER_AUTHORIZATION_FAILED).intValue()); + } } @Test public void testToString() { - List<LeaderAndIsrPartitionError> partitions = createPartitions("foo", - asList(Errors.NONE, Errors.CLUSTER_AUTHORIZATION_FAILED)); - LeaderAndIsrResponse response = new LeaderAndIsrResponse(new LeaderAndIsrResponseData() - .setErrorCode(Errors.NONE.code()) - .setPartitionErrors(partitions)); - String responseStr = response.toString(); - assertTrue(responseStr.contains(LeaderAndIsrResponse.class.getSimpleName())); - assertTrue(responseStr.contains(partitions.toString())); - assertTrue(responseStr.contains("errorCode=" + Errors.NONE.code())); + for (short version = LEADER_AND_ISR.oldestVersion(); version < LEADER_AND_ISR.latestVersion(); version++) { + LeaderAndIsrResponse response; + if (version < 5) { + List<LeaderAndIsrPartitionError> partitions = createPartitions("foo", + asList(Errors.NONE, Errors.CLUSTER_AUTHORIZATION_FAILED)); + response = new LeaderAndIsrResponse(new LeaderAndIsrResponseData() + .setErrorCode(Errors.NONE.code()) + .setPartitionErrors(partitions), version); + String responseStr = response.toString(); + assertTrue(responseStr.contains(LeaderAndIsrResponse.class.getSimpleName())); + assertTrue(responseStr.contains(partitions.toString())); + assertTrue(responseStr.contains("errorCode=" + Errors.NONE.code())); + + } else { + Uuid id = Uuid.randomUuid(); + List<LeaderAndIsrTopicError> topics = createTopic(id, asList(Errors.NONE, Errors.CLUSTER_AUTHORIZATION_FAILED)); + response = new LeaderAndIsrResponse(new LeaderAndIsrResponseData() + .setErrorCode(Errors.NONE.code()) + .setTopics(topics), version); + String responseStr = response.toString(); + assertTrue(responseStr.contains(LeaderAndIsrResponse.class.getSimpleName())); + assertTrue(responseStr.contains(topics.toString())); + assertTrue(responseStr.contains(id.toString())); + assertTrue(responseStr.contains("errorCode=" + Errors.NONE.code())); + } + } } private List<LeaderAndIsrPartitionError> createPartitions(String topicName, List<Errors> errors) { @@ -104,11 +148,27 @@ public class LeaderAndIsrResponseTest { int partitionIndex = 0; for (Errors error : errors) { partitions.add(new LeaderAndIsrPartitionError() - .setTopicName(topicName) + .setTopicName(topicName) + .setPartitionIndex(partitionIndex++) + .setErrorCode(error.code())); + } + return partitions; + } + + private List<LeaderAndIsrTopicError> createTopic(Uuid id, List<Errors> errors) { + List<LeaderAndIsrTopicError> topics = new ArrayList<>(); + LeaderAndIsrTopicError topic = new LeaderAndIsrTopicError(); + topic.setTopicId(id); + List<LeaderAndIsrPartitionError> partitions = new ArrayList<>(); + int partitionIndex = 0; + for (Errors error : errors) { + partitions.add(new LeaderAndIsrPartitionError() .setPartitionIndex(partitionIndex++) .setErrorCode(error.code())); } - return partitions; + topic.setPartitionErrors(partitions); + topics.add(topic); + return topics; } } diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index 8f9a4dc..fdf541c 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -21,6 +21,7 @@ import org.apache.kafka.common.ElectionType; import org.apache.kafka.common.IsolationLevel; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; import org.apache.kafka.common.acl.AccessControlEntry; import org.apache.kafka.common.acl.AccessControlEntryFilter; import org.apache.kafka.common.acl.AclBinding; @@ -320,13 +321,12 @@ public class RequestResponseTest { checkResponse(createStopReplicaResponse(), v, true); } - checkRequest(createLeaderAndIsrRequest(0), true); - checkErrorResponse(createLeaderAndIsrRequest(0), unknownServerException, false); - checkRequest(createLeaderAndIsrRequest(1), true); - checkErrorResponse(createLeaderAndIsrRequest(1), unknownServerException, false); - checkRequest(createLeaderAndIsrRequest(2), true); - checkErrorResponse(createLeaderAndIsrRequest(2), unknownServerException, false); - checkResponse(createLeaderAndIsrResponse(), 0, true); + for (int v = ApiKeys.LEADER_AND_ISR.oldestVersion(); v <= ApiKeys.LEADER_AND_ISR.latestVersion(); v++) { + checkRequest(createLeaderAndIsrRequest(v), true); + checkErrorResponse(createLeaderAndIsrRequest(v), unknownServerException, false); + checkResponse(createLeaderAndIsrResponse(v), v, true); + } + checkRequest(createSaslHandshakeRequest(), true); checkErrorResponse(createSaslHandshakeRequest(), unknownServerException, true); checkResponse(createSaslHandshakeResponse(), 0, true); @@ -1550,18 +1550,37 @@ public class RequestResponseTest { new Node(0, "test0", 1223), new Node(1, "test1", 1223) ); - return new LeaderAndIsrRequest.Builder((short) version, 1, 10, 0, partitionStates, leaders).build(); + + Map<String, Uuid> topicIds = new HashMap<>(); + topicIds.put("topic5", Uuid.randomUuid()); + topicIds.put("topic20", Uuid.randomUuid()); + + return new LeaderAndIsrRequest.Builder((short) version, 1, 10, 0, + partitionStates, topicIds, leaders).build(); } - private LeaderAndIsrResponse createLeaderAndIsrResponse() { - List<LeaderAndIsrResponseData.LeaderAndIsrPartitionError> partitions = new ArrayList<>(); - partitions.add(new LeaderAndIsrResponseData.LeaderAndIsrPartitionError() - .setTopicName("test") - .setPartitionIndex(0) - .setErrorCode(Errors.NONE.code())); - return new LeaderAndIsrResponse(new LeaderAndIsrResponseData() - .setErrorCode(Errors.NONE.code()) - .setPartitionErrors(partitions)); + private LeaderAndIsrResponse createLeaderAndIsrResponse(int version) { + if (version < 5) { + List<LeaderAndIsrResponseData.LeaderAndIsrPartitionError> partitions = new ArrayList<>(); + partitions.add(new LeaderAndIsrResponseData.LeaderAndIsrPartitionError() + .setTopicName("test") + .setPartitionIndex(0) + .setErrorCode(Errors.NONE.code())); + return new LeaderAndIsrResponse(new LeaderAndIsrResponseData() + .setErrorCode(Errors.NONE.code()) + .setPartitionErrors(partitions), (short) version); + } else { + List<LeaderAndIsrResponseData.LeaderAndIsrPartitionError> partition = Collections.singletonList( + new LeaderAndIsrResponseData.LeaderAndIsrPartitionError() + .setPartitionIndex(0) + .setErrorCode(Errors.NONE.code())); + List<LeaderAndIsrResponseData.LeaderAndIsrTopicError> topics = new ArrayList<>(); + topics.add(new LeaderAndIsrResponseData.LeaderAndIsrTopicError() + .setTopicId(Uuid.randomUuid()) + .setPartitionErrors(partition)); + return new LeaderAndIsrResponse(new LeaderAndIsrResponseData() + .setTopics(topics), (short) version); + } } private UpdateMetadataRequest createUpdateMetadataRequest(int version, String rack) { @@ -1600,6 +1619,10 @@ public class RequestResponseTest { .setReplicas(replicas) .setOfflineReplicas(offlineReplicas)); + Map<String, Uuid> topicIds = new HashMap<>(); + topicIds.put("topic5", Uuid.randomUuid()); + topicIds.put("topic20", Uuid.randomUuid()); + SecurityProtocol plaintext = SecurityProtocol.PLAINTEXT; List<UpdateMetadataEndpoint> endpoints1 = new ArrayList<>(); endpoints1.add(new UpdateMetadataEndpoint() @@ -2541,7 +2564,8 @@ public class RequestResponseTest { assertEquals(Integer.valueOf(1), createHeartBeatResponse().errorCounts().get(Errors.NONE)); assertEquals(Integer.valueOf(1), createIncrementalAlterConfigsResponse().errorCounts().get(Errors.NONE)); assertEquals(Integer.valueOf(1), createJoinGroupResponse(JOIN_GROUP.latestVersion()).errorCounts().get(Errors.NONE)); - assertEquals(Integer.valueOf(2), createLeaderAndIsrResponse().errorCounts().get(Errors.NONE)); + assertEquals(Integer.valueOf(2), createLeaderAndIsrResponse(4).errorCounts().get(Errors.NONE)); + assertEquals(Integer.valueOf(2), createLeaderAndIsrResponse(5).errorCounts().get(Errors.NONE)); assertEquals(Integer.valueOf(3), createLeaderEpochResponse().errorCounts().get(Errors.NONE)); assertEquals(Integer.valueOf(1), createLeaveGroupResponse().errorCounts().get(Errors.NONE)); assertEquals(Integer.valueOf(1), createListGroupsResponse(LIST_GROUPS.latestVersion()).errorCounts().get(Errors.NONE)); diff --git a/core/src/main/scala/kafka/api/ApiVersion.scala b/core/src/main/scala/kafka/api/ApiVersion.scala index a019032..c859f8d 100644 --- a/core/src/main/scala/kafka/api/ApiVersion.scala +++ b/core/src/main/scala/kafka/api/ApiVersion.scala @@ -111,7 +111,7 @@ object ApiVersion { KAFKA_2_7_IV2, // Flexible versioning on ListOffsets, WriteTxnMarkers and OffsetsForLeaderEpoch. KAFKA_2_8_IV0, - // Add topicId to MetadataUpdateRequest + // Introduced topic IDs to LeaderAndIsr and UpdateMetadata requests/responses (KIP-516) KAFKA_2_8_IV1 ) diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala index b716552..21a445b 100755 --- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala +++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala @@ -455,7 +455,8 @@ abstract class AbstractControllerBrokerRequestBatch(config: KafkaConfig, private def sendLeaderAndIsrRequest(controllerEpoch: Int, stateChangeLog: StateChangeLogger): Unit = { val leaderAndIsrRequestVersion: Short = - if (config.interBrokerProtocolVersion >= KAFKA_2_4_IV1) 4 + if (config.interBrokerProtocolVersion >= KAFKA_2_8_IV1) 5 + else if (config.interBrokerProtocolVersion >= KAFKA_2_4_IV1) 4 else if (config.interBrokerProtocolVersion >= KAFKA_2_4_IV0) 3 else if (config.interBrokerProtocolVersion >= KAFKA_2_2_IV0) 2 else if (config.interBrokerProtocolVersion >= KAFKA_1_0_IV0) 1 @@ -482,8 +483,13 @@ abstract class AbstractControllerBrokerRequestBatch(config: KafkaConfig, _.node(config.interBrokerListenerName) } val brokerEpoch = controllerContext.liveBrokerIdAndEpochs(broker) + val topicIds = leaderAndIsrPartitionStates.keys + .map(_.topic) + .toSet[String] + .map(topic => (topic, controllerContext.topicIds(topic))) + .toMap val leaderAndIsrRequestBuilder = new LeaderAndIsrRequest.Builder(leaderAndIsrRequestVersion, controllerId, - controllerEpoch, brokerEpoch, leaderAndIsrPartitionStates.values.toBuffer.asJava, leaders.asJava) + controllerEpoch, brokerEpoch, leaderAndIsrPartitionStates.values.toBuffer.asJava, topicIds.asJava, leaders.asJava) sendRequest(broker, leaderAndIsrRequestBuilder, (r: AbstractResponse) => { val leaderAndIsrResponse = r.asInstanceOf[LeaderAndIsrResponse] sendEvent(LeaderAndIsrResponseReceived(leaderAndIsrResponse, broker)) diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index fe14d42..b382fd9 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -1378,11 +1378,10 @@ class KafkaController(val config: KafkaConfig, val offlineReplicas = new ArrayBuffer[TopicPartition]() val onlineReplicas = new ArrayBuffer[TopicPartition]() - leaderAndIsrResponse.partitions.forEach { partition => - val tp = new TopicPartition(partition.topicName, partition.partitionIndex) - if (partition.errorCode == Errors.KAFKA_STORAGE_ERROR.code) + leaderAndIsrResponse.partitionErrors(controllerContext.topicNames.asJava).forEach{ case (tp, error) => + if (error.code() == Errors.KAFKA_STORAGE_ERROR.code) offlineReplicas += tp - else if (partition.errorCode == Errors.NONE.code) + else if (error.code() == Errors.NONE.code) onlineReplicas += tp } diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index 438f234..47d6b92 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -32,7 +32,7 @@ import kafka.message.{BrokerCompressionCodec, CompressionCodec, NoCompressionCod import kafka.metrics.KafkaMetricsGroup import kafka.server.checkpoints.LeaderEpochCheckpointFile import kafka.server.epoch.LeaderEpochFileCache -import kafka.server.{BrokerTopicStats, FetchDataInfo, FetchHighWatermark, FetchIsolation, FetchLogEnd, FetchTxnCommitted, LogDirFailureChannel, LogOffsetMetadata, OffsetAndEpoch} +import kafka.server.{BrokerTopicStats, FetchDataInfo, FetchHighWatermark, FetchIsolation, FetchLogEnd, FetchTxnCommitted, LogDirFailureChannel, LogOffsetMetadata, OffsetAndEpoch, PartitionMetadataFile} import kafka.utils._ import org.apache.kafka.common.errors._ import org.apache.kafka.common.message.FetchResponseData @@ -43,7 +43,7 @@ import org.apache.kafka.common.requests.ListOffsetsRequest import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.UNDEFINED_EPOCH_OFFSET import org.apache.kafka.common.requests.ProduceResponse.RecordError import org.apache.kafka.common.utils.{Time, Utils} -import org.apache.kafka.common.{InvalidRecordException, KafkaException, TopicPartition} +import org.apache.kafka.common.{InvalidRecordException, KafkaException, TopicPartition, Uuid} import scala.jdk.CollectionConverters._ import scala.collection.mutable.{ArrayBuffer, ListBuffer} @@ -296,11 +296,16 @@ class Log(@volatile private var _dir: File, // Visible for testing @volatile var leaderEpochCache: Option[LeaderEpochFileCache] = None + @volatile var partitionMetadataFile : Option[PartitionMetadataFile] = None + + @volatile var topicId : Uuid = Uuid.ZERO_UUID + locally { // create the log directory if it doesn't exist Files.createDirectories(dir.toPath) initializeLeaderEpochCache() + initializePartitionMetadata() val nextOffset = loadSegments() @@ -324,6 +329,12 @@ class Log(@volatile private var _dir: File, // deletion. producerStateManager.removeStraySnapshots(segments.values().asScala.map(_.baseOffset).toSeq) loadProducerState(logEndOffset, reloadFromCleanShutdown = hadCleanShutdown) + + // Recover topic ID if present + partitionMetadataFile.foreach { file => + if (!file.isEmpty()) + topicId = file.read().topicId + } } def dir: File = _dir @@ -536,6 +547,11 @@ class Log(@volatile private var _dir: File, private def recordVersion: RecordVersion = config.messageFormatVersion.recordVersion + private def initializePartitionMetadata(): Unit = lock synchronized { + val partitionMetadata = PartitionMetadataFile.newFile(dir) + partitionMetadataFile = Some(new PartitionMetadataFile(partitionMetadata, logDirFailureChannel)) + } + private def initializeLeaderEpochCache(): Unit = lock synchronized { val leaderEpochFile = LeaderEpochCheckpointFile.newFile(dir) @@ -1003,6 +1019,7 @@ class Log(@volatile private var _dir: File, // re-initialize leader epoch cache so that LeaderEpochCheckpointFile.checkpoint can correctly reference // the checkpoint file in renamed log directory initializeLeaderEpochCache() + initializePartitionMetadata() } } } diff --git a/core/src/main/scala/kafka/server/PartitionMetadataFile.scala b/core/src/main/scala/kafka/server/PartitionMetadataFile.scala new file mode 100644 index 0000000..1adcbc3 --- /dev/null +++ b/core/src/main/scala/kafka/server/PartitionMetadataFile.scala @@ -0,0 +1,144 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.server + +import java.io.{BufferedReader, BufferedWriter, File, FileOutputStream, IOException, OutputStreamWriter} +import java.nio.charset.StandardCharsets +import java.nio.file.{FileAlreadyExistsException, Files, Paths} +import java.util.regex.Pattern + +import kafka.utils.Logging +import org.apache.kafka.common.Uuid +import org.apache.kafka.common.errors.KafkaStorageException +import org.apache.kafka.common.utils.Utils + + + +object PartitionMetadataFile { + private val PartitionMetadataFilename = "partition.metadata" + private val WhiteSpacesPattern = Pattern.compile(":\\s+") + private val CurrentVersion = 0 + + def newFile(dir: File): File = new File(dir, PartitionMetadataFilename) + + object PartitionMetadataFileFormatter { + def toFile(data: PartitionMetadata): String = { + s"version: ${data.version}\ntopic_id: ${data.topicId}" + } + + } + + class PartitionMetadataReadBuffer[T](location: String, + reader: BufferedReader, + version: Int) extends Logging { + def read(): PartitionMetadata = { + def malformedLineException(line: String) = + new IOException(s"Malformed line in checkpoint file ($location): '$line'") + + var line: String = null + var metadataTopicId: Uuid = null + try { + line = reader.readLine() + WhiteSpacesPattern.split(line) match { + case Array(_, version) => + if (version.toInt == CurrentVersion) { + line = reader.readLine() + WhiteSpacesPattern.split(line) match { + case Array(_, topicId) => metadataTopicId = Uuid.fromString(topicId) + case _ => throw malformedLineException(line) + } + if (metadataTopicId.equals(Uuid.ZERO_UUID)) { + throw new IOException(s"Invalid topic ID in partition metadata file ($location)") + } + new PartitionMetadata(CurrentVersion, metadataTopicId) + } else { + throw new IOException(s"Unrecognized version of partition metadata file ($location): " + version) + } + case _ => throw malformedLineException(line) + } + } catch { + case _: NumberFormatException => throw malformedLineException(line) + } + } + } + +} + +class PartitionMetadata(val version: Int, val topicId: Uuid) + + +class PartitionMetadataFile(val file: File, + logDirFailureChannel: LogDirFailureChannel) extends Logging { + import kafka.server.PartitionMetadataFile.{CurrentVersion, PartitionMetadataFileFormatter, PartitionMetadataReadBuffer} + + private val path = file.toPath.toAbsolutePath + private val tempPath = Paths.get(path.toString + ".tmp") + private val lock = new Object() + private val logDir = file.getParentFile.getParent + + + try Files.createFile(file.toPath) // create the file if it doesn't exist + catch { case _: FileAlreadyExistsException => } + + def write(topicId: Uuid): Unit = { + lock synchronized { + try { + // write to temp file and then swap with the existing file + val fileOutputStream = new FileOutputStream(tempPath.toFile) + val writer = new BufferedWriter(new OutputStreamWriter(fileOutputStream, StandardCharsets.UTF_8)) + try { + writer.write(PartitionMetadataFileFormatter.toFile(new PartitionMetadata(CurrentVersion,topicId))) + writer.flush() + fileOutputStream.getFD().sync() + } finally { + writer.close() + } + + Utils.atomicMoveWithFallback(tempPath, path) + } catch { + case e: IOException => + val msg = s"Error while writing to partition metadata file ${file.getAbsolutePath}" + logDirFailureChannel.maybeAddOfflineLogDir(logDir, msg, e) + throw new KafkaStorageException(msg, e) + } + } + } + + def read(): PartitionMetadata = { + lock synchronized { + try { + val reader = Files.newBufferedReader(path) + try { + val partitionBuffer = new PartitionMetadataReadBuffer(file.getAbsolutePath, reader, CurrentVersion) + partitionBuffer.read() + } finally { + reader.close() + } + } catch { + case e: IOException => + val msg = s"Error while reading partition metadata file ${file.getAbsolutePath}" + logDirFailureChannel.maybeAddOfflineLogDir(logDir, msg, e) + throw new KafkaStorageException(msg, e) + } + } + } + + def isEmpty(): Boolean = { + file.length() == 0 + } +} diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index ea44021..a3934a5 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -36,12 +36,13 @@ import kafka.server.checkpoints.{LazyOffsetCheckpoints, OffsetCheckpointFile, Of import kafka.utils._ import kafka.utils.Implicits._ import kafka.zk.KafkaZkClient -import org.apache.kafka.common.{ElectionType, IsolationLevel, Node, TopicPartition} +import org.apache.kafka.common.{ElectionType, IsolationLevel, Node, TopicPartition, Uuid} import org.apache.kafka.common.errors._ import org.apache.kafka.common.internals.Topic import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState import org.apache.kafka.common.message.DeleteRecordsResponseData.DeleteRecordsPartitionResult import org.apache.kafka.common.message.{DescribeLogDirsResponseData, FetchResponseData, LeaderAndIsrResponseData} +import org.apache.kafka.common.message.LeaderAndIsrResponseData.LeaderAndIsrTopicError import org.apache.kafka.common.message.LeaderAndIsrResponseData.LeaderAndIsrPartitionError import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.{OffsetForLeaderTopic} import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.{OffsetForLeaderTopicResult, EpochEndOffset} @@ -1331,6 +1332,7 @@ class ReplicaManager(val config: KafkaConfig, s"correlation id $correlationId from controller $controllerId " + s"epoch ${leaderAndIsrRequest.controllerEpoch}") } + val topicIds = leaderAndIsrRequest.topicIds() val response = { if (leaderAndIsrRequest.controllerEpoch < controllerEpoch) { @@ -1437,6 +1439,24 @@ class ReplicaManager(val config: KafkaConfig, */ if (localLog(topicPartition).isEmpty) markPartitionOffline(topicPartition) + else { + val id = topicIds.get(topicPartition.topic()) + // Ensure we have not received a request from an older protocol + if (id != null && !id.equals(Uuid.ZERO_UUID)) { + val log = localLog(topicPartition).get + // Check if topic ID is in memory, if not, it must be new to the broker and does not have a metadata file. + // This is because if the broker previously wrote it to file, it would be recovered on restart after failure. + if (log.topicId.equals(Uuid.ZERO_UUID)) { + log.partitionMetadataFile.get.write(id) + log.topicId = id + // Warn if the topic ID in the request does not match the log. + } else if (!log.topicId.equals(id)) { + stateChangeLogger.warn(s"Topic Id in memory: ${log.topicId.toString} does not" + + s" match the topic Id provided in the request: " + + s"${id.toString}.") + } + } + } } // we initialize highwatermark thread after the first leaderisrrequest. This ensures that all the partitions @@ -1448,15 +1468,38 @@ class ReplicaManager(val config: KafkaConfig, replicaFetcherManager.shutdownIdleFetcherThreads() replicaAlterLogDirsManager.shutdownIdleFetcherThreads() onLeadershipChange(partitionsBecomeLeader, partitionsBecomeFollower) - val responsePartitions = responseMap.iterator.map { case (tp, error) => - new LeaderAndIsrPartitionError() - .setTopicName(tp.topic) - .setPartitionIndex(tp.partition) - .setErrorCode(error.code) - }.toBuffer - new LeaderAndIsrResponse(new LeaderAndIsrResponseData() - .setErrorCode(Errors.NONE.code) - .setPartitionErrors(responsePartitions.asJava)) + if (leaderAndIsrRequest.version() < 5) { + val responsePartitions = responseMap.iterator.map { case (tp, error) => + new LeaderAndIsrPartitionError() + .setTopicName(tp.topic) + .setPartitionIndex(tp.partition) + .setErrorCode(error.code) + }.toBuffer + new LeaderAndIsrResponse(new LeaderAndIsrResponseData() + .setErrorCode(Errors.NONE.code) + .setPartitionErrors(responsePartitions.asJava), leaderAndIsrRequest.version()) + } else { + val topics = new mutable.HashMap[String, List[LeaderAndIsrPartitionError]] + responseMap.asJava.forEach { case (tp, error) => + if (!topics.contains(tp.topic)) { + topics.put(tp.topic, List(new LeaderAndIsrPartitionError() + .setPartitionIndex(tp.partition) + .setErrorCode(error.code))) + } else { + topics.put(tp.topic, new LeaderAndIsrPartitionError() + .setPartitionIndex(tp.partition) + .setErrorCode(error.code)::topics(tp.topic)) + } + } + val topicErrors = topics.iterator.map { case (topic, partitionError) => + new LeaderAndIsrTopicError() + .setTopicId(topicIds.get(topic)) + .setPartitionErrors(partitionError.asJava) + }.toBuffer + new LeaderAndIsrResponse(new LeaderAndIsrResponseData() + .setErrorCode(Errors.NONE.code) + .setTopics(topicErrors.asJava), leaderAndIsrRequest.version()) + } } } val endMs = time.milliseconds() diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala index c8122d7..7f84dab 100644 --- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala +++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala @@ -507,7 +507,7 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo /** * Sets the topic znode with the given assignment. * @param topic the topic whose assignment is being set. - * @param topicId optional topic ID if the topic has one + * @param topicId unique topic ID for the topic * @param assignment the partition to replica mapping to set for the given topic * @param expectedControllerEpochZkVersion expected controller epoch zkVersion. * @return SetDataResponse diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala index 11faf35..0ca1e17 100644 --- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala @@ -56,7 +56,7 @@ import org.apache.kafka.common.resource.PatternType.{LITERAL, PREFIXED} import org.apache.kafka.common.resource.ResourceType._ import org.apache.kafka.common.resource.{PatternType, Resource, ResourcePattern, ResourcePatternFilter, ResourceType} import org.apache.kafka.common.security.auth.{AuthenticationContext, KafkaPrincipal, KafkaPrincipalBuilder, SecurityProtocol} -import org.apache.kafka.common.{ElectionType, IsolationLevel, Node, TopicPartition, requests} +import org.apache.kafka.common.{ElectionType, IsolationLevel, Node, TopicPartition, requests, Uuid} import org.apache.kafka.test.{TestUtils => JTestUtils} import org.junit.Assert._ import org.junit.{After, Before, Test} @@ -99,6 +99,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { val brokerId: Integer = 0 val topic = "topic" + val topicId = Uuid.randomUuid() val topicPattern = "topic.*" val transactionalId = "transactional.id" val producerId = 83392L @@ -106,6 +107,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest { val correlationId = 0 val clientId = "client-Id" val tp = new TopicPartition(topic, part) + val topicIds = Collections.singletonMap(topic, topicId) + val topicNames = Collections.singletonMap(topicId, topic) val logDir = "logDir" val group = "my-group" val protocolType = "consumer" @@ -181,7 +184,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest { ApiKeys.LEAVE_GROUP -> ((resp: LeaveGroupResponse) => resp.error), ApiKeys.DELETE_GROUPS -> ((resp: DeleteGroupsResponse) => resp.get(group)), ApiKeys.LEADER_AND_ISR -> ((resp: requests.LeaderAndIsrResponse) => Errors.forCode( - resp.partitions.asScala.find(p => p.topicName == tp.topic && p.partitionIndex == tp.partition).get.errorCode)), + resp.topics.asScala.find(t => topicNames.get(t.topicId) == tp.topic).get.partitionErrors.asScala.find( + p => p.partitionIndex == tp.partition).get.errorCode)), ApiKeys.STOP_REPLICA -> ((resp: requests.StopReplicaResponse) => Errors.forCode( resp.partitionErrors.asScala.find(pe => pe.topicName == tp.topic && pe.partitionIndex == tp.partition).get.errorCode)), ApiKeys.CONTROLLED_SHUTDOWN -> ((resp: requests.ControlledShutdownResponse) => resp.error), @@ -474,6 +478,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { .setZkVersion(2) .setReplicas(Seq(brokerId).asJava) .setIsNew(false)).asJava, + topicIds, Set(new Node(brokerId, "localhost", 0)).asJava).build() } diff --git a/core/src/test/scala/unit/kafka/controller/ControllerChannelManagerTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerChannelManagerTest.scala index 837ac9e..7a4d0ba 100644 --- a/core/src/test/scala/unit/kafka/controller/ControllerChannelManagerTest.scala +++ b/core/src/test/scala/unit/kafka/controller/ControllerChannelManagerTest.scala @@ -30,6 +30,7 @@ import org.apache.kafka.common.message.StopReplicaResponseData.StopReplicaPartit import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.requests.{AbstractControlRequest, AbstractResponse, LeaderAndIsrRequest, LeaderAndIsrResponse, StopReplicaRequest, StopReplicaResponse, UpdateMetadataRequest, UpdateMetadataResponse} +import org.apache.kafka.common.message.LeaderAndIsrResponseData.LeaderAndIsrTopicError import org.apache.kafka.common.security.auth.SecurityProtocol import org.junit.Assert._ import org.junit.Test @@ -72,6 +73,8 @@ class ControllerChannelManagerTest { assertEquals(1, updateMetadataRequests.size) val leaderAndIsrRequest = leaderAndIsrRequests.head + val topicIds = leaderAndIsrRequest.topicIds(); + val topicNames = topicIds.asScala.map { case (k, v) => (v, k) } assertEquals(controllerId, leaderAndIsrRequest.controllerId) assertEquals(controllerEpoch, leaderAndIsrRequest.controllerEpoch) assertEquals(partitions.keySet, @@ -87,7 +90,10 @@ class ControllerChannelManagerTest { val LeaderAndIsrResponseReceived(leaderAndIsrResponse, brokerId) = batch.sentEvents.head assertEquals(2, brokerId) assertEquals(partitions.keySet, - leaderAndIsrResponse.partitions.asScala.map(p => new TopicPartition(p.topicName, p.partitionIndex)).toSet) + leaderAndIsrResponse.topics.asScala.flatMap(t => t.partitionErrors.asScala.map(p => + new TopicPartition(topicNames(t.topicId), p.partitionIndex))).toSet) + leaderAndIsrResponse.topics.forEach(topic => + assertEquals(topicIds.get(topicNames.get(topic.topicId).get), topic.topicId)) } @Test @@ -157,7 +163,8 @@ class ControllerChannelManagerTest { for (apiVersion <- ApiVersion.allVersions) { val leaderAndIsrRequestVersion: Short = - if (apiVersion >= KAFKA_2_4_IV1) 4 + if (apiVersion >= KAFKA_2_8_IV1) 5 + else if (apiVersion >= KAFKA_2_4_IV1) 4 else if (apiVersion >= KAFKA_2_4_IV0) 3 else if (apiVersion >= KAFKA_2_2_IV0) 2 else if (apiVersion >= KAFKA_1_0_IV0) 1 @@ -187,6 +194,21 @@ class ControllerChannelManagerTest { assertEquals(1, leaderAndIsrRequests.size) assertEquals(s"IBP $interBrokerProtocolVersion should use version $expectedLeaderAndIsrVersion", expectedLeaderAndIsrVersion, leaderAndIsrRequests.head.version) + + val request = leaderAndIsrRequests.head + val byteBuffer = request.serialize + val deserializedRequest = LeaderAndIsrRequest.parse(byteBuffer, expectedLeaderAndIsrVersion) + + if (interBrokerProtocolVersion >= KAFKA_2_8_IV1) { + assertTrue(!request.topicIds().get("foo").equals(Uuid.ZERO_UUID)) + assertTrue(!deserializedRequest.topicIds().get("foo").equals(Uuid.ZERO_UUID)) + } else if (interBrokerProtocolVersion >= KAFKA_2_2_IV0) { + assertTrue(!request.topicIds().get("foo").equals(Uuid.ZERO_UUID)) + assertTrue(deserializedRequest.topicIds().get("foo").equals(Uuid.ZERO_UUID)) + } else { + assertTrue(request.topicIds().get("foo") == null) + assertTrue(deserializedRequest.topicIds().get("foo") == null) + } } @Test @@ -827,15 +849,18 @@ class ControllerChannelManagerTest { private def applyLeaderAndIsrResponseCallbacks(error: Errors, sentRequests: List[SentRequest]): Unit = { sentRequests.filter(_.request.apiKey == ApiKeys.LEADER_AND_ISR).filter(_.responseCallback != null).foreach { sentRequest => val leaderAndIsrRequest = sentRequest.request.build().asInstanceOf[LeaderAndIsrRequest] - val partitionErrors = leaderAndIsrRequest.partitionStates.asScala.map(p => - new LeaderAndIsrPartitionError() - .setTopicName(p.topicName) - .setPartitionIndex(p.partitionIndex) - .setErrorCode(error.code)) + val topicIds = leaderAndIsrRequest.topicIds + val topicErrors = leaderAndIsrRequest.data.topicStates.asScala.map(t => + new LeaderAndIsrTopicError() + .setTopicId(topicIds.get(t.topicName)) + .setPartitionErrors(t.partitionStates.asScala.map(p => + new LeaderAndIsrPartitionError() + .setPartitionIndex(p.partitionIndex) + .setErrorCode(error.code)).asJava)) val leaderAndIsrResponse = new LeaderAndIsrResponse( new LeaderAndIsrResponseData() .setErrorCode(error.code) - .setPartitionErrors(partitionErrors.toBuffer.asJava)) + .setTopics(topicErrors.toBuffer.asJava), leaderAndIsrRequest.version()) sentRequest.responseCallback(leaderAndIsrResponse) } } @@ -871,6 +896,11 @@ class ControllerChannelManagerTest { }.toMap context.setLiveBrokers(brokerEpochs) + context.setAllTopics(topics) + + for (topic <- topics) { + context.addTopicId(topic, Uuid.randomUuid()) + } // Simple round-robin replica assignment var leaderIndex = 0 diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala index a13bedc..031000d 100755 --- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala @@ -228,8 +228,8 @@ class LogManagerTest { s.lazyTimeIndex.get }) - // there should be a log file, two indexes, one producer snapshot, and the leader epoch checkpoint - assertEquals("Files should have been deleted", log.numberOfSegments * 4 + 1, log.dir.list.length) + // there should be a log file, two indexes, one producer snapshot, partition metadata, and the leader epoch checkpoint + assertEquals("Files should have been deleted", log.numberOfSegments * 4 + 2, log.dir.list.length) assertEquals("Should get empty fetch off new log.", 0, readLog(log, offset + 1).records.sizeInBytes) try { @@ -278,8 +278,8 @@ class LogManagerTest { time.sleep(log.config.fileDeleteDelayMs + 1) // there should be a log file, two indexes (the txn index is created lazily), - // and a producer snapshot file per segment, and the leader epoch checkpoint. - assertEquals("Files should have been deleted", log.numberOfSegments * 4 + 1, log.dir.list.length) + // and a producer snapshot file per segment, and the leader epoch checkpoint and partition metadata file. + assertEquals("Files should have been deleted", log.numberOfSegments * 4 + 2, log.dir.list.length) assertEquals("Should get empty fetch off new log.", 0, readLog(log, offset + 1).records.sizeInBytes) try { readLog(log, 0) diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala index ce52c6b..b107c21 100755 --- a/core/src/test/scala/unit/kafka/log/LogTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogTest.scala @@ -30,9 +30,9 @@ import kafka.log.Log.DeleteDirSuffix import kafka.metrics.KafkaYammerMetrics import kafka.server.checkpoints.LeaderEpochCheckpointFile import kafka.server.epoch.{EpochEntry, LeaderEpochFileCache} -import kafka.server.{BrokerState, BrokerTopicStats, FetchDataInfo, FetchHighWatermark, FetchIsolation, FetchLogEnd, FetchTxnCommitted, KafkaConfig, LogDirFailureChannel, LogOffsetMetadata} +import kafka.server.{BrokerState, BrokerTopicStats, FetchDataInfo, FetchHighWatermark, FetchIsolation, FetchLogEnd, FetchTxnCommitted, KafkaConfig, LogDirFailureChannel, LogOffsetMetadata, PartitionMetadataFile} import kafka.utils._ -import org.apache.kafka.common.{InvalidRecordException, KafkaException, TopicPartition} +import org.apache.kafka.common.{InvalidRecordException, KafkaException, TopicPartition, Uuid} import org.apache.kafka.common.errors._ import org.apache.kafka.common.record.FileRecords.TimestampAndOffset import org.apache.kafka.common.record.MemoryRecords.RecordFilter @@ -2372,6 +2372,21 @@ class LogTest { log.close() } + @Test + def testLogRecoversTopicId(): Unit = { + val logConfig = LogTest.createLogConfig() + var log = createLog(logDir, logConfig) + + val topicId = Uuid.randomUuid() + log.partitionMetadataFile.get.write(topicId) + log.close() + + // test recovery case + log = createLog(logDir, logConfig) + assertTrue(log.topicId == topicId) + log.close() + } + /** * Test building the time index on the follower by setting assignOffsets to false. */ @@ -2907,6 +2922,33 @@ class LogTest { } @Test + def testTopicIdTransfersAfterDirectoryRename(): Unit = { + val logConfig = LogTest.createLogConfig(segmentBytes = 1000, indexIntervalBytes = 1, maxMessageBytes = 64 * 1024) + val log = createLog(logDir, logConfig) + + // Write a topic ID to the partition metadata file to ensure it is transferred correctly. + val id = Uuid.randomUuid() + log.topicId = id + log.partitionMetadataFile.get.write(id) + + log.appendAsLeader(TestUtils.records(List(new SimpleRecord("foo".getBytes()))), leaderEpoch = 5) + assertEquals(Some(5), log.latestEpoch) + + // Ensure that after a directory rename, the partition metadata file is written to the right location. + val tp = Log.parseTopicPartitionName(log.dir) + log.renameDir(Log.logDeleteDirName(tp)) + log.appendAsLeader(TestUtils.records(List(new SimpleRecord("foo".getBytes()))), leaderEpoch = 10) + assertEquals(Some(10), log.latestEpoch) + assertTrue(PartitionMetadataFile.newFile(log.dir).exists()) + assertFalse(PartitionMetadataFile.newFile(this.logDir).exists()) + + // Check the topic ID remains in memory and was copied correctly. + assertEquals(id, log.topicId) + assertTrue(!log.partitionMetadataFile.isEmpty) + assertEquals(id, log.partitionMetadataFile.get.read().topicId) + } + + @Test def testLeaderEpochCacheClearedAfterDowngradeInAppendedMessages(): Unit = { val logConfig = LogTest.createLogConfig(segmentBytes = 1000, indexIntervalBytes = 1, maxMessageBytes = 64 * 1024) val log = createLog(logDir, logConfig) diff --git a/core/src/test/scala/unit/kafka/server/BrokerEpochIntegrationTest.scala b/core/src/test/scala/unit/kafka/server/BrokerEpochIntegrationTest.scala index be8766c..e733909 100755 --- a/core/src/test/scala/unit/kafka/server/BrokerEpochIntegrationTest.scala +++ b/core/src/test/scala/unit/kafka/server/BrokerEpochIntegrationTest.scala @@ -25,7 +25,7 @@ import kafka.controller.{ControllerChannelManager, ControllerContext, StateChang import kafka.utils.TestUtils import kafka.utils.TestUtils.createTopic import kafka.zk.ZooKeeperTestHarness -import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.{TopicPartition, Uuid} import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState import org.apache.kafka.common.message.StopReplicaRequestData.{StopReplicaPartitionState, StopReplicaTopicState} import org.apache.kafka.common.message.UpdateMetadataRequestData.{UpdateMetadataBroker, UpdateMetadataEndpoint, UpdateMetadataPartitionState} @@ -112,6 +112,7 @@ class BrokerEpochIntegrationTest extends ZooKeeperTestHarness { private def testControlRequestWithBrokerEpoch(epochInRequestDiffFromCurrentEpoch: Long): Unit = { val tp = new TopicPartition("new-topic", 0) + val topicIds = Collections.singletonMap("new-topic", Uuid.randomUuid) // create topic with 1 partition, 2 replicas, one on each broker createTopic(zkClient, tp.topic(), partitionReplicaAssignment = Map(0 -> Seq(brokerId1, brokerId2)), servers = servers) @@ -155,7 +156,7 @@ class BrokerEpochIntegrationTest extends ZooKeeperTestHarness { val requestBuilder = new LeaderAndIsrRequest.Builder( ApiKeys.LEADER_AND_ISR.latestVersion, controllerId, controllerEpoch, epochInRequest, - partitionStates.asJava, nodes.toSet.asJava) + partitionStates.asJava, topicIds, nodes.toSet.asJava) if (epochInRequestDiffFromCurrentEpoch < 0) { // stale broker epoch in LEADER_AND_ISR diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index e37d93b..87a870a 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -65,7 +65,7 @@ import org.apache.kafka.common.requests.{FetchMetadata => JFetchMetadata, _} import org.apache.kafka.common.resource.{PatternType, Resource, ResourcePattern, ResourceType} import org.apache.kafka.common.security.auth.{KafkaPrincipal, KafkaPrincipalSerde, SecurityProtocol} import org.apache.kafka.common.utils.ProducerIdAndEpoch -import org.apache.kafka.common.{IsolationLevel, Node, TopicPartition} +import org.apache.kafka.common.{IsolationLevel, Node, TopicPartition, Uuid} import org.apache.kafka.server.authorizer.{Action, AuthorizationResult, Authorizer} import org.easymock.EasyMock._ import org.easymock.{Capture, EasyMock, IAnswer, IArgumentMatcher} @@ -2681,12 +2681,13 @@ class KafkaApisTest { controllerEpoch, brokerEpochInRequest, partitionStates, + Collections.singletonMap("topicW", Uuid.randomUuid()), asList(new Node(0, "host0", 9090), new Node(1, "host1", 9091)) ).build() val request = buildRequest(leaderAndIsrRequest) val response = new LeaderAndIsrResponse(new LeaderAndIsrResponseData() .setErrorCode(Errors.NONE.code) - .setPartitionErrors(asList())) + .setPartitionErrors(asList()), leaderAndIsrRequest.version()) EasyMock.expect(controller.brokerEpoch).andStubReturn(currentBrokerEpoch) EasyMock.expect(replicaManager.becomeLeaderOrFollower( diff --git a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala index a3eb5d7..fa0b940 100755 --- a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala +++ b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala @@ -17,7 +17,9 @@ package kafka.server -import org.apache.kafka.common.TopicPartition +import java.util.Collections + +import org.apache.kafka.common.{TopicPartition, Uuid} import scala.jdk.CollectionConverters._ import kafka.api.LeaderAndIsr @@ -155,7 +157,8 @@ class LeaderElectionTest extends ZooKeeperTestHarness { ) val requestBuilder = new LeaderAndIsrRequest.Builder( ApiKeys.LEADER_AND_ISR.latestVersion, controllerId, staleControllerEpoch, - servers(brokerId2).kafkaController.brokerEpoch, partitionStates.asJava, nodes.toSet.asJava) + servers(brokerId2).kafkaController.brokerEpoch, partitionStates.asJava, + Collections.singletonMap(topic, Uuid.randomUuid()), nodes.toSet.asJava) controllerChannelManager.sendRequest(brokerId2, requestBuilder, staleControllerEpochCallback) TestUtils.waitUntilTrue(() => staleControllerEpochDetected, "Controller epoch should be stale") diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index f73223e..80fdc60 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -21,7 +21,7 @@ import java.io.File import java.net.InetAddress import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference} import java.util.concurrent.{CountDownLatch, TimeUnit} -import java.util.{Optional, Properties} +import java.util.{Collections, Optional, Properties} import kafka.api._ import kafka.log.{AppendOrigin, Log, LogConfig, LogManager, ProducerStateManager} @@ -50,7 +50,7 @@ import org.apache.kafka.common.requests._ import org.apache.kafka.common.security.auth.KafkaPrincipal import org.apache.kafka.common.utils.Time import org.apache.kafka.common.utils.Utils -import org.apache.kafka.common.{IsolationLevel, Node, TopicPartition} +import org.apache.kafka.common.{IsolationLevel, Node, TopicPartition, Uuid} import org.easymock.EasyMock import org.junit.Assert._ import org.junit.{After, Before, Test} @@ -174,6 +174,7 @@ class ReplicaManagerTest { try { val brokerList = Seq[Integer](0, 1).asJava + val topicIds = Collections.singletonMap(topic, Uuid.randomUuid()) val partition = rm.createPartition(new TopicPartition(topic, 0)) partition.createLogIfNotExists(isNew = false, isFutureReplica = false, @@ -190,6 +191,7 @@ class ReplicaManagerTest { .setZkVersion(0) .setReplicas(brokerList) .setIsNew(false)).asJava, + topicIds, Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build() rm.becomeLeaderOrFollower(0, leaderAndIsrRequest1, (_, _) => ()) rm.getPartitionOrException(new TopicPartition(topic, 0)) @@ -212,6 +214,7 @@ class ReplicaManagerTest { .setZkVersion(0) .setReplicas(brokerList) .setIsNew(false)).asJava, + topicIds, Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build() rm.becomeLeaderOrFollower(1, leaderAndIsrRequest2, (_, _) => ()) @@ -236,6 +239,7 @@ class ReplicaManagerTest { replicaManager.createPartition(topicPartition) .createLogIfNotExists(isNew = false, isFutureReplica = false, new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)) + val topicIds = Collections.singletonMap(topic, Uuid.randomUuid()) def leaderAndIsrRequest(epoch: Int): LeaderAndIsrRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch, Seq(new LeaderAndIsrPartitionState() @@ -248,6 +252,7 @@ class ReplicaManagerTest { .setZkVersion(0) .setReplicas(brokerList) .setIsNew(true)).asJava, + topicIds, Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build() replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(0), (_, _) => ()) @@ -307,6 +312,7 @@ class ReplicaManagerTest { .setZkVersion(0) .setReplicas(brokerList) .setIsNew(true)).asJava, + Collections.singletonMap(topic, Uuid.randomUuid()), Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build() replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest1, (_, _) => ()) replicaManager.getPartitionOrException(new TopicPartition(topic, 0)) @@ -367,6 +373,7 @@ class ReplicaManagerTest { .setZkVersion(0) .setReplicas(brokerList) .setIsNew(true)).asJava, + Collections.singletonMap(topic, Uuid.randomUuid()), Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build() replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest1, (_, _) => ()) replicaManager.getPartitionOrException(new TopicPartition(topic, 0)) @@ -473,6 +480,7 @@ class ReplicaManagerTest { .setZkVersion(0) .setReplicas(brokerList) .setIsNew(true)).asJava, + Collections.singletonMap(topic, Uuid.randomUuid()), Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build() replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest1, (_, _) => ()) replicaManager.getPartitionOrException(new TopicPartition(topic, 0)) @@ -549,6 +557,7 @@ class ReplicaManagerTest { .setZkVersion(0) .setReplicas(brokerList) .setIsNew(false)).asJava, + Collections.singletonMap(topic, Uuid.randomUuid()), Set(new Node(0, "host1", 0), new Node(1, "host2", 1), new Node(2, "host2", 2)).asJava).build() rm.becomeLeaderOrFollower(0, leaderAndIsrRequest1, (_, _) => ()) rm.getPartitionOrException(new TopicPartition(topic, 0)) @@ -605,6 +614,7 @@ class ReplicaManagerTest { .setIsNew(true) val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch, Seq(leaderAndIsrPartitionState).asJava, + Collections.singletonMap(topic, Uuid.randomUuid()), Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build() val leaderAndIsrResponse = replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest, (_, _) => ()) assertEquals(Errors.NONE, leaderAndIsrResponse.error) @@ -696,6 +706,7 @@ class ReplicaManagerTest { replicaManager.createPartition(tp1).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints) val partition0Replicas = Seq[Integer](0, 1).asJava val partition1Replicas = Seq[Integer](0, 2).asJava + val topicIds = Map(tp0.topic -> Uuid.randomUuid(), tp1.topic -> Uuid.randomUuid()).asJava val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch, Seq( new LeaderAndIsrPartitionState() @@ -719,6 +730,7 @@ class ReplicaManagerTest { .setReplicas(partition1Replicas) .setIsNew(true) ).asJava, + topicIds, Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build() replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest, (_, _) => ()) @@ -832,6 +844,7 @@ class ReplicaManagerTest { val leaderAndIsrRequest0 = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, controllerId, controllerEpoch, brokerEpoch, Seq(leaderAndIsrPartitionState(tp, leaderEpoch, leaderBrokerId, aliveBrokerIds)).asJava, + Collections.singletonMap(topic, Uuid.randomUuid()), Set(new Node(followerBrokerId, "host1", 0), new Node(leaderBrokerId, "host2", 1)).asJava).build() replicaManager.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest0, @@ -908,6 +921,7 @@ class ReplicaManagerTest { .setZkVersion(0) .setReplicas(brokerList) .setIsNew(false)).asJava, + Collections.singletonMap(topic, Uuid.randomUuid()), Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build() replicaManager.becomeLeaderOrFollower(1, leaderAndIsrRequest2, (_, _) => ()) @@ -957,6 +971,7 @@ class ReplicaManagerTest { .setZkVersion(0) .setReplicas(brokerList) .setIsNew(false)).asJava, + Collections.singletonMap(topic, Uuid.randomUuid()), Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build() replicaManager.becomeLeaderOrFollower(1, leaderAndIsrRequest2, (_, _) => ()) @@ -1007,6 +1022,7 @@ class ReplicaManagerTest { .setZkVersion(0) .setReplicas(brokerList) .setIsNew(false)).asJava, + Collections.singletonMap(topic, Uuid.randomUuid()), Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build() replicaManager.becomeLeaderOrFollower(1, leaderAndIsrRequest2, (_, _) => ()) @@ -1088,6 +1104,7 @@ class ReplicaManagerTest { .setZkVersion(0) .setReplicas(partition0Replicas) .setIsNew(true)).asJava, + Collections.singletonMap(tp0.topic, Uuid.randomUuid()), Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build() replicaManager.becomeLeaderOrFollower(0, becomeFollowerRequest, (_, _) => ()) @@ -1128,6 +1145,7 @@ class ReplicaManagerTest { .setZkVersion(0) .setReplicas(partition0Replicas) .setIsNew(true)).asJava, + Collections.singletonMap(tp0.topic, Uuid.randomUuid()), Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build() replicaManager.becomeLeaderOrFollower(1, becomeLeaderRequest, (_, _) => ()) @@ -1161,6 +1179,7 @@ class ReplicaManagerTest { val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints) replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints) val partition0Replicas = Seq[Integer](0, 1).asJava + val topicIds = Collections.singletonMap(tp0.topic, Uuid.randomUuid()) val becomeLeaderRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch, Seq(new LeaderAndIsrPartitionState() @@ -1173,6 +1192,7 @@ class ReplicaManagerTest { .setZkVersion(0) .setReplicas(partition0Replicas) .setIsNew(true)).asJava, + topicIds, Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build() replicaManager.becomeLeaderOrFollower(1, becomeLeaderRequest, (_, _) => ()) @@ -1193,6 +1213,7 @@ class ReplicaManagerTest { .setZkVersion(0) .setReplicas(partition0Replicas) .setIsNew(true)).asJava, + topicIds, Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build() replicaManager.becomeLeaderOrFollower(0, becomeFollowerRequest, (_, _) => ()) @@ -1209,6 +1230,7 @@ class ReplicaManagerTest { val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints) replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints) val partition0Replicas = Seq[Integer](0, 1).asJava + val topicIds = Collections.singletonMap(tp0.topic, Uuid.randomUuid()) val becomeLeaderRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch, Seq(new LeaderAndIsrPartitionState() @@ -1221,6 +1243,7 @@ class ReplicaManagerTest { .setZkVersion(0) .setReplicas(partition0Replicas) .setIsNew(true)).asJava, + topicIds, Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build() replicaManager.becomeLeaderOrFollower(1, becomeLeaderRequest, (_, _) => ()) @@ -1242,6 +1265,7 @@ class ReplicaManagerTest { .setZkVersion(0) .setReplicas(partition0Replicas) .setIsNew(true)).asJava, + topicIds, Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build() replicaManager.becomeLeaderOrFollower(0, becomeFollowerRequest, (_, _) => ()) @@ -1269,6 +1293,7 @@ class ReplicaManagerTest { .setZkVersion(0) .setReplicas(partition0Replicas) .setIsNew(true)).asJava, + Collections.singletonMap(tp0.topic, Uuid.randomUuid()), Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build() replicaManager.becomeLeaderOrFollower(1, becomeLeaderRequest, (_, _) => ()) @@ -1311,6 +1336,7 @@ class ReplicaManagerTest { .setZkVersion(0) .setReplicas(partition0Replicas) .setIsNew(true)).asJava, + Collections.singletonMap(tp0.topic, Uuid.randomUuid()), Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build() replicaManager.becomeLeaderOrFollower(1, becomeLeaderRequest, (_, _) => ()) @@ -1354,6 +1380,7 @@ class ReplicaManagerTest { .setZkVersion(0) .setReplicas(partition0Replicas) .setIsNew(true)).asJava, + Collections.singletonMap(tp0.topic, Uuid.randomUuid()), Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build() replicaManager.becomeLeaderOrFollower(1, becomeLeaderRequest, (_, _) => ()) @@ -1732,6 +1759,7 @@ class ReplicaManagerTest { val tp1 = new TopicPartition(topic, 1) val partition0Replicas = Seq[Integer](0, 1).asJava val partition1Replicas = Seq[Integer](1, 0).asJava + val topicIds = Map(tp0.topic -> Uuid.randomUuid(), tp1.topic -> Uuid.randomUuid()).asJava val leaderAndIsrRequest1 = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, controllerId, 0, brokerEpoch, @@ -1757,6 +1785,7 @@ class ReplicaManagerTest { .setReplicas(partition1Replicas) .setIsNew(true) ).asJava, + topicIds, Set(new Node(0, "host0", 0), new Node(1, "host1", 1)).asJava).build() rm0.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest1, (_, _) => ()) @@ -1787,6 +1816,7 @@ class ReplicaManagerTest { .setReplicas(partition1Replicas) .setIsNew(true) ).asJava, + topicIds, Set(new Node(0, "host0", 0), new Node(1, "host1", 1)).asJava).build() rm0.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest2, (_, _) => ()) @@ -1821,6 +1851,7 @@ class ReplicaManagerTest { val tp1 = new TopicPartition(topic, 1) val partition0Replicas = Seq[Integer](1, 0).asJava val partition1Replicas = Seq[Integer](1, 0).asJava + val topicIds = Map(tp0.topic -> Uuid.randomUuid(), tp1.topic -> Uuid.randomUuid()).asJava val leaderAndIsrRequest1 = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, controllerId, 0, brokerEpoch, @@ -1846,6 +1877,7 @@ class ReplicaManagerTest { .setReplicas(partition1Replicas) .setIsNew(true) ).asJava, + topicIds, Set(new Node(0, "host0", 0), new Node(1, "host1", 1)).asJava).build() rm0.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest1, (_, _) => ()) @@ -1876,6 +1908,7 @@ class ReplicaManagerTest { .setReplicas(partition1Replicas) .setIsNew(true) ).asJava, + topicIds, Set(new Node(0, "host0", 0), new Node(1, "host1", 1)).asJava).build() rm0.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest2, (_, _) => ()) @@ -1935,6 +1968,7 @@ class ReplicaManagerTest { val becomeLeaderRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 10, brokerEpoch, Seq(leaderAndIsrPartitionState(tp0, 1, 0, Seq(0, 1), true)).asJava, + Collections.singletonMap(topic, Uuid.randomUuid()), Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava ).build() @@ -1961,6 +1995,7 @@ class ReplicaManagerTest { val becomeLeaderRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch, Seq(leaderAndIsrPartitionState(tp0, 1, 0, Seq(0, 1), true)).asJava, + Collections.singletonMap(topic, Uuid.randomUuid()), Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava ).build() @@ -2110,6 +2145,7 @@ class ReplicaManagerTest { val becomeLeaderRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch, Seq(leaderAndIsrPartitionState(tp0, 1, 0, Seq(0, 1), true)).asJava, + Collections.singletonMap(tp0.topic(), Uuid.randomUuid()), Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava ).build() @@ -2176,4 +2212,100 @@ class ReplicaManagerTest { replicaManager.shutdown(false) } } + + @Test + def testPartitionMetadataFile() = { + val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time)) + try { + val brokerList = Seq[Integer](0, 1).asJava + val topicPartition = new TopicPartition(topic, 0) + replicaManager.createPartition(topicPartition) + .createLogIfNotExists(isNew = false, isFutureReplica = false, + new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)) + val topicIds = Collections.singletonMap(topic, Uuid.randomUuid()) + + def leaderAndIsrRequest(epoch: Int): LeaderAndIsrRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch, + Seq(new LeaderAndIsrPartitionState() + .setTopicName(topic) + .setPartitionIndex(0) + .setControllerEpoch(0) + .setLeader(0) + .setLeaderEpoch(epoch) + .setIsr(brokerList) + .setZkVersion(0) + .setReplicas(brokerList) + .setIsNew(true)).asJava, + topicIds, + Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build() + + replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(0), (_, _) => ()) + assertFalse(replicaManager.localLog(topicPartition).isEmpty) + val id = topicIds.get(topicPartition.topic()) + val log = replicaManager.localLog(topicPartition).get + assertFalse(log.partitionMetadataFile.isEmpty) + assertFalse(log.partitionMetadataFile.get.isEmpty()) + val partitionMetadata = log.partitionMetadataFile.get.read() + + // Current version of PartitionMetadataFile is 0. + assertEquals(0, partitionMetadata.version) + assertEquals(id, partitionMetadata.topicId) + } finally replicaManager.shutdown(checkpointHW = false) + } + + @Test + def testPartitionMetadataFileNotCreated() = { + val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time)) + try { + val brokerList = Seq[Integer](0, 1).asJava + val topicPartition = new TopicPartition(topic, 0) + val topicPartitionFoo = new TopicPartition("foo", 0) + replicaManager.createPartition(topicPartition) + .createLogIfNotExists(isNew = false, isFutureReplica = false, + new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)) + val topicIds = Map(topic -> Uuid.ZERO_UUID, "foo" -> Uuid.randomUuid()).asJava + + def leaderAndIsrRequest(epoch: Int, name: String, version: Short): LeaderAndIsrRequest = LeaderAndIsrRequest.parse( + new LeaderAndIsrRequest.Builder(version, 0, 0, brokerEpoch, + Seq(new LeaderAndIsrPartitionState() + .setTopicName(name) + .setPartitionIndex(0) + .setControllerEpoch(0) + .setLeader(0) + .setLeaderEpoch(epoch) + .setIsr(brokerList) + .setZkVersion(0) + .setReplicas(brokerList) + .setIsNew(true)).asJava, + topicIds, + Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build().serialize(), version) + + // The file has no contents if the topic does not have an associated topic ID. + replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(0, "fakeTopic", ApiKeys.LEADER_AND_ISR.latestVersion), (_, _) => ()) + assertFalse(replicaManager.localLog(topicPartition).isEmpty) + val log = replicaManager.localLog(topicPartition).get + assertFalse(log.partitionMetadataFile.isEmpty) + assertTrue(log.partitionMetadataFile.get.isEmpty()) + + // The file has no contents if the topic has the default UUID. + replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(0, topic, ApiKeys.LEADER_AND_ISR.latestVersion), (_, _) => ()) + assertFalse(replicaManager.localLog(topicPartition).isEmpty) + val log2 = replicaManager.localLog(topicPartition).get + assertFalse(log2.partitionMetadataFile.isEmpty) + assertTrue(log2.partitionMetadataFile.get.isEmpty()) + + // The file has no contents if the request is an older version + replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(0, "foo", 0), (_, _) => ()) + assertFalse(replicaManager.localLog(topicPartitionFoo).isEmpty) + val log3 = replicaManager.localLog(topicPartitionFoo).get + assertFalse(log3.partitionMetadataFile.isEmpty) + assertTrue(log3.partitionMetadataFile.get.isEmpty()) + + // The file has no contents if the request is an older version + replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(0, "foo", 4), (_, _) => ()) + assertFalse(replicaManager.localLog(topicPartitionFoo).isEmpty) + val log4 = replicaManager.localLog(topicPartitionFoo).get + assertFalse(log4.partitionMetadataFile.isEmpty) + assertTrue(log4.partitionMetadataFile.get.isEmpty()) + } finally replicaManager.shutdown(checkpointHW = false) + } } diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala index 9ea4039..ed480c5 100644 --- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala +++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala @@ -62,6 +62,7 @@ class RequestQuotaTest extends BaseRequestTest { private val topic = "topic-1" private val numPartitions = 1 private val tp = new TopicPartition(topic, 0) + private val topicIds = Collections.singletonMap(topic, Uuid.randomUuid()) private val logDir = "logDir" private val unthrottledClientId = "unthrottled-client" private val smallQuotaProducerClientId = "small-quota-producer-client" @@ -254,6 +255,7 @@ class RequestQuotaTest extends BaseRequestTest { .setZkVersion(2) .setReplicas(Seq(brokerId).asJava) .setIsNew(true)).asJava, + topicIds, Set(new Node(brokerId, "localhost", 0)).asJava) case ApiKeys.STOP_REPLICA => diff --git a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala index 436dc9e..ce2ceda 100755 --- a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala +++ b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala @@ -21,6 +21,7 @@ import kafka.utils.{CoreUtils, TestUtils} import kafka.utils.TestUtils._ import java.io.{DataInputStream, File} import java.net.ServerSocket +import java.util.Collections import java.util.concurrent.{Executors, TimeUnit} import kafka.cluster.Broker @@ -29,6 +30,7 @@ import kafka.log.LogManager import kafka.zookeeper.ZooKeeperClientTimeoutException import org.apache.kafka.clients.consumer.KafkaConsumer import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord} +import org.apache.kafka.common.Uuid import org.apache.kafka.common.errors.KafkaStorageException import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.network.ListenerName @@ -233,7 +235,8 @@ class ServerShutdownTest extends ZooKeeperTestHarness { // Initiate a sendRequest and wait until connection is established and one byte is received by the peer val requestBuilder = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, - controllerId, 1, 0L, Seq.empty.asJava, brokerAndEpochs.keys.map(_.node(listenerName)).toSet.asJava) + controllerId, 1, 0L, Seq.empty.asJava, Collections.singletonMap(topic, Uuid.randomUuid()), + brokerAndEpochs.keys.map(_.node(listenerName)).toSet.asJava) controllerChannelManager.sendRequest(1, requestBuilder) receiveFuture.get(10, TimeUnit.SECONDS) diff --git a/generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java b/generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java index 97a27d9..1d51585 100644 --- a/generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java +++ b/generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java @@ -1891,7 +1891,6 @@ public final class MessageDataGenerator implements MessageClassGenerator { prefix, field.camelCaseName(), field.camelCaseName()); } else if (field.type().isStruct() || field.type() instanceof FieldType.UUIDFieldType) { - } else if (field.type().isStruct()) { buffer.printf("+ \"%s%s=\" + %s.toString()%n", prefix, field.camelCaseName(), field.camelCaseName()); } else if (field.type().isArray()) {