This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 99e1d8fbb30 MINOR: Cache topic resolution in TopicIds set (#17285)
99e1d8fbb30 is described below

commit 99e1d8fbb30c8c132cd9baec016efb833b6036ec
Author: Sean Quah <[email protected]>
AuthorDate: Thu Oct 3 08:40:25 2024 +0100

    MINOR: Cache topic resolution in TopicIds set (#17285)
    
    Looking up topics in a TopicsImage is relatively slow. Cache the results
    in TopicIds to improve assignor performance. In benchmarks, we see a
    noticeable improvement in performance in the heterogeneous case.
    
    Before
    ```
    Benchmark                                       (assignmentType)  (assignorType)  (isRackAware)  (memberCount)  (partitionsToMemberRatio)  (subscriptionType)  (topicCount)  Mode  Cnt    Score   Error  Units
    ServerSideAssignorBenchmark.doAssignment             INCREMENTAL           RANGE          false          10000                         10         HOMOGENEOUS          1000  avgt    5   36.400 ± 3.004  ms/op
    ServerSideAssignorBenchmark.doAssignment             INCREMENTAL           RANGE          false          10000                         10       HETEROGENEOUS          1000  avgt    5  158.340 ± 0.825  ms/op
    ServerSideAssignorBenchmark.doAssignment             INCREMENTAL         UNIFORM          false          10000                         10         HOMOGENEOUS          1000  avgt    5    1.329 ± 0.041  ms/op
    ServerSideAssignorBenchmark.doAssignment             INCREMENTAL         UNIFORM          false          10000                         10       HETEROGENEOUS          1000  avgt    5  382.901 ± 6.203  ms/op
    ```
    
    After
    ```
    Benchmark                                       (assignmentType)  (assignorType)  (isRackAware)  (memberCount)  (partitionsToMemberRatio)  (subscriptionType)  (topicCount)  Mode  Cnt    Score   Error  Units
    ServerSideAssignorBenchmark.doAssignment             INCREMENTAL           RANGE          false          10000                         10         HOMOGENEOUS          1000  avgt    5   36.465 ± 1.954  ms/op
    ServerSideAssignorBenchmark.doAssignment             INCREMENTAL           RANGE          false          10000                         10       HETEROGENEOUS          1000  avgt    5  114.043 ± 1.424  ms/op
    ServerSideAssignorBenchmark.doAssignment             INCREMENTAL         UNIFORM          false          10000                         10         HOMOGENEOUS          1000  avgt    5    1.454 ± 0.019  ms/op
    ServerSideAssignorBenchmark.doAssignment             INCREMENTAL         UNIFORM          false          10000                         10       HETEROGENEOUS          1000  avgt    5  342.840 ± 2.744  ms/op
    ```
    
    ---
    
    Based heavily on https://github.com/apache/kafka/pull/16527.
    
    Reviewers: David Arthur <[email protected]>, David Jacot <[email protected]>
---
 .../group/modern/TargetAssignmentBuilder.java      |   9 +-
 .../kafka/coordinator/group/modern/TopicIds.java   | 168 +++++++++++++++++++--
 .../group/modern/TargetAssignmentBuilderTest.java  |   8 +-
 .../coordinator/group/modern/TopicIdsTest.java     |   7 +-
 .../kafka/jmh/assignor/AssignorBenchmarkUtils.java |   6 +-
 .../jmh/assignor/ServerSideAssignorBenchmark.java  |   8 +-
 .../assignor/TargetAssignmentBuilderBenchmark.java |   6 +-
 7 files changed, 182 insertions(+), 30 deletions(-)

diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/TargetAssignmentBuilder.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/TargetAssignmentBuilder.java
index 930cdfb2e7c..ba08a236ba6 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/TargetAssignmentBuilder.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/TargetAssignmentBuilder.java
@@ -327,13 +327,14 @@ public class TargetAssignmentBuilder<T extends 
ModernGroupMember> {
      */
     public TargetAssignmentResult build() throws PartitionAssignorException {
         Map<String, MemberSubscriptionAndAssignmentImpl> memberSpecs = new 
HashMap<>();
+        TopicIds.TopicResolver topicResolver = new 
TopicIds.CachedTopicResolver(topicsImage);
 
         // Prepare the member spec for all members.
         members.forEach((memberId, member) ->
             memberSpecs.put(memberId, createMemberSubscriptionAndAssignment(
                 member,
                 targetAssignment.getOrDefault(memberId, Assignment.EMPTY),
-                topicsImage
+                topicResolver
             ))
         );
 
@@ -355,7 +356,7 @@ public class TargetAssignmentBuilder<T extends 
ModernGroupMember> {
                 memberSpecs.put(memberId, 
createMemberSubscriptionAndAssignment(
                     updatedMemberOrNull,
                     assignment,
-                    topicsImage
+                    topicResolver
                 ));
             }
         });
@@ -420,12 +421,12 @@ public class TargetAssignmentBuilder<T extends 
ModernGroupMember> {
     static <T extends ModernGroupMember> MemberSubscriptionAndAssignmentImpl 
createMemberSubscriptionAndAssignment(
         T member,
         Assignment memberAssignment,
-        TopicsImage topicsImage
+        TopicIds.TopicResolver topicResolver
     ) {
         return new MemberSubscriptionAndAssignmentImpl(
             Optional.ofNullable(member.rackId()),
             Optional.ofNullable(member.instanceId()),
-            new TopicIds(member.subscribedTopicNames(), topicsImage),
+            new TopicIds(member.subscribedTopicNames(), topicResolver),
             memberAssignment
         );
     }
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/TopicIds.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/TopicIds.java
index baf4e8ce778..f45735a527c 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/TopicIds.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/TopicIds.java
@@ -21,25 +21,161 @@ import org.apache.kafka.image.TopicImage;
 import org.apache.kafka.image.TopicsImage;
 
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.Iterator;
+import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Objects;
 import java.util.Set;
 
 /**
  * TopicIds is initialized with topic names (String) but exposes a Set of 
topic ids (Uuid) to the
- * user and performs the conversion lazily with TopicsImage.
+ * user and performs the conversion lazily with a TopicResolver backed by a 
TopicsImage.
  */
 public class TopicIds implements Set<Uuid> {
+    /**
+     * Converts between topic ids (Uuids) and topic names (Strings).
+     */
+    public interface TopicResolver {
+        /**
+         * @return The TopicsImage used by the resolver.
+         */
+        TopicsImage image();
+
+        /**
+         * Converts a topic id to a topic name.
+         *
+         * @param id The topic id.
+         * @return The topic name for the given topic id, or null if the topic 
does not exist.
+         */
+        String name(Uuid id);
+
+        /**
+         * Converts a topic name to a topic id.
+         *
+         * @param name The topic name.
+         * @return The topic id for the given topic name, or null if the topic 
does not exist.
+         */
+        Uuid id(String name);
+
+        /**
+         * Clears any cached data.
+         *
+         * Used for benchmarking purposes.
+         */
+        void clear();
+    }
+
+    /**
+     * A TopicResolver without any caching.
+     */
+    public static class DefaultTopicResolver implements TopicResolver {
+        private final TopicsImage image;
+
+        public DefaultTopicResolver(
+            TopicsImage image
+        ) {
+            this.image = Objects.requireNonNull(image);
+        }
+
+        @Override
+        public final TopicsImage image() {
+            return image;
+        }
+
+        @Override
+        public String name(Uuid id) {
+            TopicImage topic = image.getTopic(id);
+            if (topic == null) return null;
+            return topic.name();
+        }
+
+        @Override
+        public Uuid id(String name) {
+            TopicImage topic = image.getTopic(name);
+            if (topic == null) return null;
+            return topic.id();
+        }
+
+        @Override
+        public void clear() {}
+
+        @Override
+        public String toString() {
+            return "DefaultTopicResolver(image=" + image + ")";
+        }
+    }
+
+    /**
+     * A TopicResolver that caches results.
+     *
+     * This cache is expected to be short-lived and only used within a single
+     * TargetAssignmentBuilder.build() call.
+     */
+    public static class CachedTopicResolver implements TopicResolver {
+        private final TopicsImage image;
+
+        private final Map<String, Uuid> topicIds = new HashMap<>();
+        private final Map<Uuid, String> topicNames = new HashMap<>();
+
+        public CachedTopicResolver(
+            TopicsImage image
+        ) {
+            this.image = Objects.requireNonNull(image);
+        }
+
+        @Override
+        public final TopicsImage image() {
+            return image;
+        }
+
+        @Override
+        public String name(Uuid id) {
+            return topicNames.computeIfAbsent(id, __ -> {
+                TopicImage topic = image.getTopic(id);
+                if (topic == null) return null;
+                return topic.name();
+            });
+        }
+
+        @Override
+        public Uuid id(String name) {
+            return topicIds.computeIfAbsent(name, __ -> {
+                TopicImage topic = image.getTopic(name);
+                if (topic == null) return null;
+                return topic.id();
+            });
+        }
+
+        @Override
+        public void clear() {
+            this.topicNames.clear();
+            this.topicIds.clear();
+        }
+
+        @Override
+        public String toString() {
+            return "CachedTopicResolver(image=" + image + ")";
+        }
+    }
+
     private final Set<String> topicNames;
-    private final TopicsImage image;
+    private final TopicResolver resolver;
 
     public TopicIds(
         Set<String> topicNames,
         TopicsImage image
     ) {
         this.topicNames = Objects.requireNonNull(topicNames);
-        this.image = Objects.requireNonNull(image);
+        this.resolver = new DefaultTopicResolver(image);
+    }
+
+    public TopicIds(
+        Set<String> topicNames,
+        TopicResolver resolver
+    ) {
+        this.topicNames = Objects.requireNonNull(topicNames);
+        this.resolver = Objects.requireNonNull(resolver);
     }
 
     @Override
@@ -56,24 +192,24 @@ public class TopicIds implements Set<Uuid> {
     public boolean contains(Object o) {
         if (o instanceof Uuid) {
             Uuid topicId = (Uuid) o;
-            TopicImage topicImage = image.getTopic(topicId);
-            if (topicImage == null) return false;
-            return topicNames.contains(topicImage.name());
+            String topicName = resolver.name(topicId);
+            if (topicName == null) return false;
+            return topicNames.contains(topicName);
         }
         return false;
     }
 
     private static class TopicIdIterator implements Iterator<Uuid> {
         final Iterator<String> iterator;
-        final TopicsImage image;
+        final TopicResolver resolver;
         private Uuid next = null;
 
         private TopicIdIterator(
             Iterator<String> iterator,
-            TopicsImage image
+            TopicResolver resolver
         ) {
             this.iterator = Objects.requireNonNull(iterator);
-            this.image = Objects.requireNonNull(image);
+            this.resolver = Objects.requireNonNull(resolver);
         }
 
         @Override
@@ -85,9 +221,9 @@ public class TopicIds implements Set<Uuid> {
                     return false;
                 }
                 String next = iterator.next();
-                TopicImage topicImage = image.getTopic(next);
-                if (topicImage != null) {
-                    result = topicImage.id();
+                Uuid topicId = resolver.id(next);
+                if (topicId != null) {
+                    result = topicId;
                 }
             } while (result == null);
             next = result;
@@ -105,7 +241,7 @@ public class TopicIds implements Set<Uuid> {
 
     @Override
     public Iterator<Uuid> iterator() {
-        return new TopicIdIterator(topicNames.iterator(), image);
+        return new TopicIdIterator(topicNames.iterator(), resolver);
     }
 
     @Override
@@ -164,20 +300,20 @@ public class TopicIds implements Set<Uuid> {
         TopicIds uuids = (TopicIds) o;
 
         if (!Objects.equals(topicNames, uuids.topicNames)) return false;
-        return Objects.equals(image, uuids.image);
+        return Objects.equals(resolver.image(), uuids.resolver.image());
     }
 
     @Override
     public int hashCode() {
         int result = topicNames.hashCode();
-        result = 31 * result + image.hashCode();
+        result = 31 * result + resolver.image().hashCode();
         return result;
     }
 
     @Override
     public String toString() {
         return "TopicIds(topicNames=" + topicNames +
-            ", image=" + image +
+            ", resolver=" + resolver +
             ')';
     }
 }
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/TargetAssignmentBuilderTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/TargetAssignmentBuilderTest.java
index 093145650b9..e3445663df8 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/TargetAssignmentBuilderTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/TargetAssignmentBuilderTest.java
@@ -161,6 +161,7 @@ public class TargetAssignmentBuilderTest {
 
         public TargetAssignmentBuilder.TargetAssignmentResult build() {
             TopicsImage topicsImage = topicsImageBuilder.build().topics();
+            TopicIds.TopicResolver topicResolver = new 
TopicIds.CachedTopicResolver(topicsImage);
             // Prepare expected member specs.
             Map<String, MemberSubscriptionAndAssignmentImpl> 
memberSubscriptions = new HashMap<>();
 
@@ -169,7 +170,7 @@ public class TargetAssignmentBuilderTest {
                 memberSubscriptions.put(memberId, 
createMemberSubscriptionAndAssignment(
                     member,
                     targetAssignment.getOrDefault(memberId, Assignment.EMPTY),
-                    topicsImage
+                    topicResolver
                 ))
             );
 
@@ -192,7 +193,7 @@ public class TargetAssignmentBuilderTest {
                     memberSubscriptions.put(memberId, 
createMemberSubscriptionAndAssignment(
                         updatedMemberOrNull,
                         assignment,
-                        topicsImage
+                        topicResolver
                     ));
                 }
             });
@@ -263,6 +264,7 @@ public class TargetAssignmentBuilderTest {
             .addTopic(barTopicId, "bar", 5)
             .build()
             .topics();
+        TopicIds.TopicResolver topicResolver = new 
TopicIds.DefaultTopicResolver(topicsImage);
 
         ConsumerGroupMember member = new 
ConsumerGroupMember.Builder("member-id")
             .setSubscribedTopicNames(Arrays.asList("foo", "bar", "zar"))
@@ -278,7 +280,7 @@ public class TargetAssignmentBuilderTest {
         MemberSubscription subscriptionSpec = 
createMemberSubscriptionAndAssignment(
             member,
             assignment,
-            topicsImage
+            topicResolver
         );
 
         assertEquals(new MemberSubscriptionAndAssignmentImpl(
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/TopicIdsTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/TopicIdsTest.java
index 876131fa51c..0706624a97a 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/TopicIdsTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/TopicIdsTest.java
@@ -41,7 +41,12 @@ public class TopicIdsTest {
 
     @Test
     public void testTopicsImageCannotBeNull() {
-        assertThrows(NullPointerException.class, () -> new 
TopicIds(Collections.emptySet(), null));
+        assertThrows(NullPointerException.class, () -> new 
TopicIds(Collections.emptySet(), (TopicsImage) null));
+    }
+
+    @Test
+    public void testTopicResolverCannotBeNull() {
+        assertThrows(NullPointerException.class, () -> new 
TopicIds(Collections.emptySet(), (TopicIds.TopicResolver) null));
     }
 
     @Test
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/AssignorBenchmarkUtils.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/AssignorBenchmarkUtils.java
index e3dceb19c2a..9402728e1eb 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/AssignorBenchmarkUtils.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/AssignorBenchmarkUtils.java
@@ -159,13 +159,13 @@ public class AssignorBenchmarkUtils {
      *
      * @param members               The ConsumerGroupMembers.
      * @param subscriptionType      The group's subscription type.
-     * @param topicsImage           The TopicsImage to use.
+     * @param topicResolver         The TopicResolver to use.
      * @return The new GroupSpec.
      */
     public static GroupSpec createGroupSpec(
         Map<String, ConsumerGroupMember> members,
         SubscriptionType subscriptionType,
-        TopicsImage topicsImage
+        TopicIds.TopicResolver topicResolver
     ) {
         Map<String, MemberSubscriptionAndAssignmentImpl> memberSpecs = new 
HashMap<>();
 
@@ -177,7 +177,7 @@ public class AssignorBenchmarkUtils {
             memberSpecs.put(memberId, new MemberSubscriptionAndAssignmentImpl(
                 Optional.ofNullable(member.rackId()),
                 Optional.ofNullable(member.instanceId()),
-                new TopicIds(member.subscribedTopicNames(), topicsImage),
+                new TopicIds(member.subscribedTopicNames(), topicResolver),
                 new Assignment(member.assignedPartitions())
             ));
         }
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java
index 1ad30e320b5..971828c1c47 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java
@@ -129,6 +129,8 @@ public class ServerSideAssignorBenchmark {
 
     private TopicsImage topicsImage = TopicsImage.EMPTY;
 
+    private TopicIds.TopicResolver topicResolver;
+
     private SubscribedTopicDescriber subscribedTopicDescriber;
 
     @Setup(Level.Trial)
@@ -138,7 +140,7 @@ public class ServerSideAssignorBenchmark {
         setupTopics();
 
         Map<String, ConsumerGroupMember> members = createMembers();
-        this.groupSpec = AssignorBenchmarkUtils.createGroupSpec(members, 
subscriptionType, topicsImage);
+        this.groupSpec = AssignorBenchmarkUtils.createGroupSpec(members, 
subscriptionType, topicResolver);
 
         if (assignmentType == AssignmentType.INCREMENTAL) {
             simulateIncrementalRebalance();
@@ -155,6 +157,7 @@ public class ServerSideAssignorBenchmark {
         );
 
         topicsImage = 
AssignorBenchmarkUtils.createTopicsImage(subscriptionMetadata);
+        topicResolver = new TopicIds.CachedTopicResolver(topicsImage);
 
         Map<Uuid, TopicMetadata> topicMetadata = 
AssignorBenchmarkUtils.createTopicMetadata(subscriptionMetadata);
         subscribedTopicDescriber = new 
SubscribedTopicDescriberImpl(topicMetadata);
@@ -229,7 +232,7 @@ public class ServerSideAssignorBenchmark {
         if (subscriptionType == HETEROGENEOUS) {
             subscribedTopicIdsForNewMember = 
updatedMemberSpec.get(memberId(memberCount - 2)).subscribedTopicIds();
         } else {
-            subscribedTopicIdsForNewMember = new TopicIds(new 
HashSet<>(allTopicNames), topicsImage);
+            subscribedTopicIdsForNewMember = new TopicIds(new 
HashSet<>(allTopicNames), topicResolver);
         }
 
         Optional<String> rackId = rackId(memberCount - 1);
@@ -251,6 +254,7 @@ public class ServerSideAssignorBenchmark {
     @Threads(1)
     @OutputTimeUnit(TimeUnit.MILLISECONDS)
     public void doAssignment() {
+        topicResolver.clear();
         partitionAssignor.assign(groupSpec, subscribedTopicDescriber);
     }
 }
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/TargetAssignmentBuilderBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/TargetAssignmentBuilderBenchmark.java
index a06e9b0e527..2a23d22b655 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/TargetAssignmentBuilderBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/TargetAssignmentBuilderBenchmark.java
@@ -27,6 +27,7 @@ import 
org.apache.kafka.coordinator.group.assignor.UniformAssignor;
 import org.apache.kafka.coordinator.group.modern.Assignment;
 import org.apache.kafka.coordinator.group.modern.SubscribedTopicDescriberImpl;
 import org.apache.kafka.coordinator.group.modern.TargetAssignmentBuilder;
+import org.apache.kafka.coordinator.group.modern.TopicIds;
 import org.apache.kafka.coordinator.group.modern.TopicMetadata;
 import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupMember;
 import org.apache.kafka.image.TopicsImage;
@@ -96,6 +97,8 @@ public class TargetAssignmentBuilderBenchmark {
 
     private TopicsImage topicsImage;
 
+    private TopicIds.TopicResolver topicResolver;
+
     private SubscribedTopicDescriber subscribedTopicDescriber;
 
     @Setup(Level.Trial)
@@ -133,6 +136,7 @@ public class TargetAssignmentBuilderBenchmark {
         );
 
         topicsImage = 
AssignorBenchmarkUtils.createTopicsImage(subscriptionMetadata);
+        topicResolver = new TopicIds.CachedTopicResolver(topicsImage);
 
         Map<Uuid, TopicMetadata> topicMetadata = 
AssignorBenchmarkUtils.createTopicMetadata(subscriptionMetadata);
         subscribedTopicDescriber = new 
SubscribedTopicDescriberImpl(topicMetadata);
@@ -144,7 +148,7 @@ public class TargetAssignmentBuilderBenchmark {
         this.groupSpec = AssignorBenchmarkUtils.createGroupSpec(
             members,
             subscriptionType,
-            topicsImage
+            topicResolver
         );
 
         GroupAssignment groupAssignment = partitionAssignor.assign(

Reply via email to