This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a commit to branch KAFKA-13280 in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 8f2c2316923d4e03959bfb58cfbb218a0390ba68 Author: Colin P. Mccabe <[email protected]> AuthorDate: Wed Sep 8 13:35:08 2021 -0700 KAFKA-13280: Avoid O(N) behavior in KRaftMetadataCache#topicNamesToIds Avoid O(N) behavior in KRaftMetadataCache#topicNamesToIds and KRaftMetadataCache#topicIdsToNames by returning a map subclass that exposes the TopicsImage data structures without copying them. --- .../kafka/server/metadata/KRaftMetadataCache.scala | 11 +- .../java/org/apache/kafka/image/TopicsImage.java | 148 +++++++++++++++++++++ .../org/apache/kafka/image/TopicsImageTest.java | 62 +++++++-- 3 files changed, 204 insertions(+), 17 deletions(-) diff --git a/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala b/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala index b7fbd17..1ff7a80 100644 --- a/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala +++ b/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala @@ -236,18 +236,13 @@ class KRaftMetadataCache(val brokerId: Int) extends MetadataCache with Logging w map(topic => topic.partitions().size()) } - override def topicNamesToIds(): util.Map[String, Uuid] = { - _currentImage.topics.topicsByName().asScala.map{ case (topicName, topicImage) => (topicName, topicImage.id())}.asJava - } + override def topicNamesToIds(): util.Map[String, Uuid] = _currentImage.topics.topicNameToIdView() - override def topicIdsToNames(): util.Map[Uuid, String] = { - _currentImage.topics.topicsById().asScala.map{ case (topicId, topicImage) => (topicId, topicImage.name())}.asJava - } + override def topicIdsToNames(): util.Map[Uuid, String] = _currentImage.topics.topicIdToNameView() override def topicIdInfo(): (util.Map[String, Uuid], util.Map[Uuid, String]) = { val image = _currentImage - (image.topics.topicsByName().asScala.map{ case (topicName, topicImage) => (topicName, topicImage.id())}.asJava, - image.topics.topicsById().asScala.map{ case (topicId, topicImage) => (topicId, topicImage.name())}.asJava) + (image.topics.topicNameToIdView(), image.topics.topicIdToNameView()) } // if the leader is not known, return None; diff --git a/metadata/src/main/java/org/apache/kafka/image/TopicsImage.java b/metadata/src/main/java/org/apache/kafka/image/TopicsImage.java index 59b31a7..db8dc70 100644 --- a/metadata/src/main/java/org/apache/kafka/image/TopicsImage.java +++ b/metadata/src/main/java/org/apache/kafka/image/TopicsImage.java @@ -21,10 +21,16 @@ import org.apache.kafka.common.Uuid; import org.apache.kafka.metadata.PartitionRegistration; import org.apache.kafka.server.common.ApiMessageAndVersion; +import java.util.AbstractMap; +import java.util.AbstractMap.SimpleImmutableEntry; +import java.util.AbstractSet; import java.util.Collections; +import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Objects; +import java.util.Set; import java.util.function.Consumer; import java.util.stream.Collectors; @@ -92,6 +98,148 @@ public final class TopicsImage { return Objects.hash(topicsById, topicsByName); } + /** + * Expose a view of this TopicsImage as a map from topic names to IDs. + * + * Like TopicsImage itself, this map is immutable. + */ + public Map<String, Uuid> topicNameToIdView() { + return new TopicNameToIdMap(); + } + + class TopicNameToIdMap extends AbstractMap<String, Uuid> { + private final TopicNameToIdMapEntrySet set = new TopicNameToIdMapEntrySet(); + + @Override + public boolean containsKey(Object key) { + return topicsByName.containsKey(key); + } + + @Override + public Uuid get(Object key) { + TopicImage image = topicsByName.get(key); + if (image == null) return null; + return image.id(); + } + + @Override + public Set<Entry<String, Uuid>> entrySet() { + return set; + } + } + + class TopicNameToIdMapEntrySet extends AbstractSet<Entry<String, Uuid>> { + @Override + public Iterator<Entry<String, Uuid>> iterator() { + return new TopicNameToIdMapEntrySetIterator(topicsByName.entrySet().iterator()); + } + + @SuppressWarnings("rawtypes") + @Override + public boolean contains(Object o) { + if (!(o instanceof Entry)) return false; + Entry other = (Entry) o; + TopicImage image = topicsByName.get(other.getKey()); + if (image == null) return false; + return image.id().equals(other.getValue()); + } + + @Override + public int size() { + return topicsByName.size(); + } + } + + static class TopicNameToIdMapEntrySetIterator implements Iterator<Entry<String, Uuid>> { + private final Iterator<Entry<String, TopicImage>> iterator; + + TopicNameToIdMapEntrySetIterator(Iterator<Entry<String, TopicImage>> iterator) { + this.iterator = iterator; + } + + @Override + public boolean hasNext() { + return this.iterator.hasNext(); + } + + @Override + public Entry<String, Uuid> next() { + Entry<String, TopicImage> entry = iterator.next(); + return new SimpleImmutableEntry<>(entry.getKey(), entry.getValue().id()); + } + } + + /** + * Expose a view of this TopicsImage as a map from IDs to names. + * + * Like TopicsImage itself, this map is immutable. + */ + public Map<Uuid, String> topicIdToNameView() { + return new TopicIdToNameMap(); + } + + class TopicIdToNameMap extends AbstractMap<Uuid, String> { + private final TopicIdToNameMapEntrySet set = new TopicIdToNameMapEntrySet(); + + @Override + public boolean containsKey(Object key) { + return topicsById.containsKey(key); + } + + @Override + public String get(Object key) { + TopicImage image = topicsById.get(key); + if (image == null) return null; + return image.name(); + } + + @Override + public Set<Entry<Uuid, String>> entrySet() { + return set; + } + } + + class TopicIdToNameMapEntrySet extends AbstractSet<Entry<Uuid, String>> { + @Override + public Iterator<Entry<Uuid, String>> iterator() { + return new TopicIdToNameEntrySetIterator(topicsById.entrySet().iterator()); + } + + @SuppressWarnings("rawtypes") + @Override + public boolean contains(Object o) { + if (!(o instanceof Entry)) return false; + Entry other = (Entry) o; + TopicImage image = topicsById.get(other.getKey()); + if (image == null) return false; + return image.name().equals(other.getValue()); + } + + @Override + public int size() { + return topicsById.size(); + } + } + + static class TopicIdToNameEntrySetIterator implements Iterator<Entry<Uuid, String>> { + private final Iterator<Entry<Uuid, TopicImage>> iterator; + + TopicIdToNameEntrySetIterator(Iterator<Entry<Uuid, TopicImage>> iterator) { + this.iterator = iterator; + } + + @Override + public boolean hasNext() { + return this.iterator.hasNext(); + } + + @Override + public Entry<Uuid, String> next() { + Entry<Uuid, TopicImage> entry = iterator.next(); + return new SimpleImmutableEntry<>(entry.getKey(), entry.getValue().name()); + } + } + @Override public String toString() { return "TopicsImage(topicsById=" + topicsById.entrySet().stream(). diff --git a/metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java b/metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java index 1b5fbef..91cdd5b 100644 --- a/metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java +++ b/metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java @@ -38,12 +38,16 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.UUID; import static org.apache.kafka.common.metadata.MetadataRecordType.PARTITION_CHANGE_RECORD; import static org.apache.kafka.common.metadata.MetadataRecordType.PARTITION_RECORD; import static org.apache.kafka.common.metadata.MetadataRecordType.REMOVE_TOPIC_RECORD; import static org.apache.kafka.common.metadata.MetadataRecordType.TOPIC_RECORD; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; @Timeout(value = 40) @@ -83,16 +87,22 @@ public class TopicsImageTest { return map; } + private static final Uuid FOO_UUID = Uuid.fromString("ThIaNwRnSM2Nt9Mx1v0RvA"); + + private static final Uuid BAR_UUID = Uuid.fromString("f62ptyETTjet8SL5ZeREiw"); + + private static final Uuid BAZ_UUID = Uuid.fromString("tgHBnRglT5W_RlENnuG5vg"); + static { TOPIC_IMAGES1 = Arrays.asList( - newTopicImage("foo", Uuid.fromString("ThIaNwRnSM2Nt9Mx1v0RvA"), + newTopicImage("foo", FOO_UUID, new PartitionRegistration(new int[] {2, 3, 4}, new int[] {2, 3}, Replicas.NONE, Replicas.NONE, 2, 1, 345), new PartitionRegistration(new int[] {3, 4, 5}, new int[] {3, 4, 5}, Replicas.NONE, Replicas.NONE, 3, 4, 684), new PartitionRegistration(new int[] {2, 4, 5}, new int[] {2, 4, 5}, Replicas.NONE, Replicas.NONE, 2, 10, 84)), - newTopicImage("bar", Uuid.fromString("f62ptyETTjet8SL5ZeREiw"), + newTopicImage("bar", BAR_UUID, new PartitionRegistration(new int[] {0, 1, 2, 3, 4}, new int[] {0, 1, 2, 3}, new int[] {1}, new int[] {3, 4}, 0, 1, 345))); @@ -100,18 +110,18 @@ public class TopicsImageTest { DELTA1_RECORDS = new ArrayList<>(); DELTA1_RECORDS.add(new ApiMessageAndVersion(new RemoveTopicRecord(). - setTopicId(Uuid.fromString("ThIaNwRnSM2Nt9Mx1v0RvA")), + setTopicId(FOO_UUID), REMOVE_TOPIC_RECORD.highestSupportedVersion())); DELTA1_RECORDS.add(new ApiMessageAndVersion(new PartitionChangeRecord(). - setTopicId(Uuid.fromString("f62ptyETTjet8SL5ZeREiw")). + setTopicId(BAR_UUID). setPartitionId(0).setLeader(1), PARTITION_CHANGE_RECORD.highestSupportedVersion())); DELTA1_RECORDS.add(new ApiMessageAndVersion(new TopicRecord(). - setName("baz").setTopicId(Uuid.fromString("tgHBnRglT5W_RlENnuG5vg")), + setName("baz").setTopicId(BAZ_UUID), TOPIC_RECORD.highestSupportedVersion())); DELTA1_RECORDS.add(new ApiMessageAndVersion(new PartitionRecord(). setPartitionId(0). - setTopicId(Uuid.fromString("tgHBnRglT5W_RlENnuG5vg")). + setTopicId(BAZ_UUID). setReplicas(Arrays.asList(1, 2, 3, 4)). setIsr(Arrays.asList(3, 4)). setRemovingReplicas(Collections.singletonList(2)). @@ -124,10 +134,10 @@ public class TopicsImageTest { RecordTestUtils.replayAll(DELTA1, DELTA1_RECORDS); List<TopicImage> topics2 = Arrays.asList( - newTopicImage("bar", Uuid.fromString("f62ptyETTjet8SL5ZeREiw"), + newTopicImage("bar", BAR_UUID, new PartitionRegistration(new int[] {0, 1, 2, 3, 4}, new int[] {0, 1, 2, 3}, new int[] {1}, new int[] {3, 4}, 1, 2, 346)), - newTopicImage("baz", Uuid.fromString("tgHBnRglT5W_RlENnuG5vg"), + newTopicImage("baz", BAZ_UUID, new PartitionRegistration(new int[] {1, 2, 3, 4}, new int[] {3, 4}, new int[] {2}, new int[] {1}, 3, 2, 1))); IMAGE2 = new TopicsImage(newTopicsByIdMap(topics2), newTopicsByNameMap(topics2)); @@ -177,7 +187,7 @@ public class TopicsImageTest { new ApiMessageAndVersion( new PartitionRecord() .setPartitionId(1) - .setTopicId(Uuid.fromString("tgHBnRglT5W_RlENnuG5vg")) + .setTopicId(BAZ_UUID) .setReplicas(Arrays.asList(4, 2, localId)) .setIsr(Arrays.asList(4, 2, localId)) .setLeader(4) @@ -375,4 +385,38 @@ public class TopicsImageTest { TopicsImage nextImage = delta.apply(); assertEquals(image, nextImage); } + + @Test + public void testTopicNameToIdView() { + Map<String, Uuid> map = IMAGE1.topicNameToIdView(); + assertTrue(map.containsKey("foo")); + assertEquals(FOO_UUID, map.get("foo")); + assertTrue(map.containsKey("bar")); + assertEquals(BAR_UUID, map.get("bar")); + assertFalse(map.containsKey("baz")); + assertEquals(null, map.get("baz")); + HashSet<Uuid> uuids = new HashSet<>(); + map.values().iterator().forEachRemaining(u -> uuids.add(u)); + HashSet<Uuid> expectedUuids = new HashSet<>(Arrays.asList( + Uuid.fromString("ThIaNwRnSM2Nt9Mx1v0RvA"), + Uuid.fromString("f62ptyETTjet8SL5ZeREiw"))); + assertEquals(expectedUuids, uuids); + assertThrows(UnsupportedOperationException.class, () -> map.remove("foo")); + } + + @Test + public void testTopicIdToNameView() { + Map<Uuid, String> map = IMAGE1.topicIdToNameView(); + assertTrue(map.containsKey(FOO_UUID)); + assertEquals("foo", map.get(FOO_UUID)); + assertTrue(map.containsKey(BAR_UUID)); + assertEquals("bar", map.get(BAR_UUID)); + assertFalse(map.containsKey(BAZ_UUID)); + assertEquals(null, map.get(BAZ_UUID)); + HashSet<String> names = new HashSet<>(); + map.values().iterator().forEachRemaining(n -> names.add(n)); + HashSet<String> expectedNames = new HashSet<>(Arrays.asList("foo", "bar")); + assertEquals(expectedNames, names); + assertThrows(UnsupportedOperationException.class, () -> map.remove(FOO_UUID)); + } }
