This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 99e1d8fbb30 MINOR: Cache topic resolution in TopicIds set (#17285)
99e1d8fbb30 is described below
commit 99e1d8fbb30c8c132cd9baec016efb833b6036ec
Author: Sean Quah <[email protected]>
AuthorDate: Thu Oct 3 08:40:25 2024 +0100
MINOR: Cache topic resolution in TopicIds set (#17285)
Looking up topics in a TopicsImage is relatively slow. Cache the results
in TopicIds to improve assignor performance. In benchmarks, we see a
noticeable improvement in performance in the heterogeneous case.
Before
```
Benchmark (assignmentType)
(assignorType) (isRackAware) (memberCount) (partitionsToMemberRatio)
(subscriptionType) (topicCount) Mode Cnt Score Error Units
ServerSideAssignorBenchmark.doAssignment INCREMENTAL
RANGE false 10000 10
HOMOGENEOUS 1000 avgt 5 36.400 ± 3.004 ms/op
ServerSideAssignorBenchmark.doAssignment INCREMENTAL
RANGE false 10000 10
HETEROGENEOUS 1000 avgt 5 158.340 ± 0.825 ms/op
ServerSideAssignorBenchmark.doAssignment INCREMENTAL
UNIFORM false 10000 10
HOMOGENEOUS 1000 avgt 5 1.329 ± 0.041 ms/op
ServerSideAssignorBenchmark.doAssignment INCREMENTAL
UNIFORM false 10000 10
HETEROGENEOUS 1000 avgt 5 382.901 ± 6.203 ms/op
```
After
```
Benchmark (assignmentType)
(assignorType) (isRackAware) (memberCount) (partitionsToMemberRatio)
(subscriptionType) (topicCount) Mode Cnt Score Error Units
ServerSideAssignorBenchmark.doAssignment INCREMENTAL
RANGE false 10000 10
HOMOGENEOUS 1000 avgt 5 36.465 ± 1.954 ms/op
ServerSideAssignorBenchmark.doAssignment INCREMENTAL
RANGE false 10000 10
HETEROGENEOUS 1000 avgt 5 114.043 ± 1.424 ms/op
ServerSideAssignorBenchmark.doAssignment INCREMENTAL
UNIFORM false 10000 10
HOMOGENEOUS 1000 avgt 5 1.454 ± 0.019 ms/op
ServerSideAssignorBenchmark.doAssignment INCREMENTAL
UNIFORM false 10000 10
HETEROGENEOUS 1000 avgt 5 342.840 ± 2.744 ms/op
```
---
Based heavily on https://github.com/apache/kafka/pull/16527.
Reviewers: David Arthur <[email protected]>, David Jacot
<[email protected]>
---
.../group/modern/TargetAssignmentBuilder.java | 9 +-
.../kafka/coordinator/group/modern/TopicIds.java | 168 +++++++++++++++++++--
.../group/modern/TargetAssignmentBuilderTest.java | 8 +-
.../coordinator/group/modern/TopicIdsTest.java | 7 +-
.../kafka/jmh/assignor/AssignorBenchmarkUtils.java | 6 +-
.../jmh/assignor/ServerSideAssignorBenchmark.java | 8 +-
.../assignor/TargetAssignmentBuilderBenchmark.java | 6 +-
7 files changed, 182 insertions(+), 30 deletions(-)
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/TargetAssignmentBuilder.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/TargetAssignmentBuilder.java
index 930cdfb2e7c..ba08a236ba6 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/TargetAssignmentBuilder.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/TargetAssignmentBuilder.java
@@ -327,13 +327,14 @@ public class TargetAssignmentBuilder<T extends
ModernGroupMember> {
*/
public TargetAssignmentResult build() throws PartitionAssignorException {
Map<String, MemberSubscriptionAndAssignmentImpl> memberSpecs = new
HashMap<>();
+ TopicIds.TopicResolver topicResolver = new
TopicIds.CachedTopicResolver(topicsImage);
// Prepare the member spec for all members.
members.forEach((memberId, member) ->
memberSpecs.put(memberId, createMemberSubscriptionAndAssignment(
member,
targetAssignment.getOrDefault(memberId, Assignment.EMPTY),
- topicsImage
+ topicResolver
))
);
@@ -355,7 +356,7 @@ public class TargetAssignmentBuilder<T extends
ModernGroupMember> {
memberSpecs.put(memberId,
createMemberSubscriptionAndAssignment(
updatedMemberOrNull,
assignment,
- topicsImage
+ topicResolver
));
}
});
@@ -420,12 +421,12 @@ public class TargetAssignmentBuilder<T extends
ModernGroupMember> {
static <T extends ModernGroupMember> MemberSubscriptionAndAssignmentImpl
createMemberSubscriptionAndAssignment(
T member,
Assignment memberAssignment,
- TopicsImage topicsImage
+ TopicIds.TopicResolver topicResolver
) {
return new MemberSubscriptionAndAssignmentImpl(
Optional.ofNullable(member.rackId()),
Optional.ofNullable(member.instanceId()),
- new TopicIds(member.subscribedTopicNames(), topicsImage),
+ new TopicIds(member.subscribedTopicNames(), topicResolver),
memberAssignment
);
}
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/TopicIds.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/TopicIds.java
index baf4e8ce778..f45735a527c 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/TopicIds.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/TopicIds.java
@@ -21,25 +21,161 @@ import org.apache.kafka.image.TopicImage;
import org.apache.kafka.image.TopicsImage;
import java.util.Collection;
+import java.util.HashMap;
import java.util.Iterator;
+import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Set;
/**
* TopicIds is initialized with topic names (String) but exposes a Set of
topic ids (Uuid) to the
- * user and performs the conversion lazily with TopicsImage.
+ * user and performs the conversion lazily with a TopicResolver backed by a
TopicsImage.
*/
public class TopicIds implements Set<Uuid> {
+ /**
+ * Converts between topic ids (Uuids) and topic names (Strings).
+ */
+ public interface TopicResolver {
+ /**
+ * @return The TopicsImage used by the resolver.
+ */
+ TopicsImage image();
+
+ /**
+ * Converts a topic id to a topic name.
+ *
+ * @param id The topic id.
+ * @return The topic name for the given topic id, or null if the topic
does not exist.
+ */
+ String name(Uuid id);
+
+ /**
+ * Converts a topic name to a topic id.
+ *
+ * @param name The topic name.
+ * @return The topic id for the given topic name, or null if the topic
does not exist.
+ */
+ Uuid id(String name);
+
+ /**
+ * Clears any cached data.
+ *
+ * Used for benchmarking purposes.
+ */
+ void clear();
+ }
+
+ /**
+ * A TopicResolver without any caching.
+ */
+ public static class DefaultTopicResolver implements TopicResolver {
+ private final TopicsImage image;
+
+ public DefaultTopicResolver(
+ TopicsImage image
+ ) {
+ this.image = Objects.requireNonNull(image);
+ }
+
+ @Override
+ public final TopicsImage image() {
+ return image;
+ }
+
+ @Override
+ public String name(Uuid id) {
+ TopicImage topic = image.getTopic(id);
+ if (topic == null) return null;
+ return topic.name();
+ }
+
+ @Override
+ public Uuid id(String name) {
+ TopicImage topic = image.getTopic(name);
+ if (topic == null) return null;
+ return topic.id();
+ }
+
+ @Override
+ public void clear() {}
+
+ @Override
+ public String toString() {
+ return "DefaultTopicResolver(image=" + image + ")";
+ }
+ }
+
+ /**
+ * A TopicResolver that caches results.
+ *
+ * This cache is expected to be short-lived and only used within a single
+ * TargetAssignmentBuilder.build() call.
+ */
+ public static class CachedTopicResolver implements TopicResolver {
+ private final TopicsImage image;
+
+ private final Map<String, Uuid> topicIds = new HashMap<>();
+ private final Map<Uuid, String> topicNames = new HashMap<>();
+
+ public CachedTopicResolver(
+ TopicsImage image
+ ) {
+ this.image = Objects.requireNonNull(image);
+ }
+
+ @Override
+ public final TopicsImage image() {
+ return image;
+ }
+
+ @Override
+ public String name(Uuid id) {
+ return topicNames.computeIfAbsent(id, __ -> {
+ TopicImage topic = image.getTopic(id);
+ if (topic == null) return null;
+ return topic.name();
+ });
+ }
+
+ @Override
+ public Uuid id(String name) {
+ return topicIds.computeIfAbsent(name, __ -> {
+ TopicImage topic = image.getTopic(name);
+ if (topic == null) return null;
+ return topic.id();
+ });
+ }
+
+ @Override
+ public void clear() {
+ this.topicNames.clear();
+ this.topicIds.clear();
+ }
+
+ @Override
+ public String toString() {
+ return "CachedTopicResolver(image=" + image + ")";
+ }
+ }
+
private final Set<String> topicNames;
- private final TopicsImage image;
+ private final TopicResolver resolver;
public TopicIds(
Set<String> topicNames,
TopicsImage image
) {
this.topicNames = Objects.requireNonNull(topicNames);
- this.image = Objects.requireNonNull(image);
+ this.resolver = new DefaultTopicResolver(image);
+ }
+
+ public TopicIds(
+ Set<String> topicNames,
+ TopicResolver resolver
+ ) {
+ this.topicNames = Objects.requireNonNull(topicNames);
+ this.resolver = Objects.requireNonNull(resolver);
}
@Override
@@ -56,24 +192,24 @@ public class TopicIds implements Set<Uuid> {
public boolean contains(Object o) {
if (o instanceof Uuid) {
Uuid topicId = (Uuid) o;
- TopicImage topicImage = image.getTopic(topicId);
- if (topicImage == null) return false;
- return topicNames.contains(topicImage.name());
+ String topicName = resolver.name(topicId);
+ if (topicName == null) return false;
+ return topicNames.contains(topicName);
}
return false;
}
private static class TopicIdIterator implements Iterator<Uuid> {
final Iterator<String> iterator;
- final TopicsImage image;
+ final TopicResolver resolver;
private Uuid next = null;
private TopicIdIterator(
Iterator<String> iterator,
- TopicsImage image
+ TopicResolver resolver
) {
this.iterator = Objects.requireNonNull(iterator);
- this.image = Objects.requireNonNull(image);
+ this.resolver = Objects.requireNonNull(resolver);
}
@Override
@@ -85,9 +221,9 @@ public class TopicIds implements Set<Uuid> {
return false;
}
String next = iterator.next();
- TopicImage topicImage = image.getTopic(next);
- if (topicImage != null) {
- result = topicImage.id();
+ Uuid topicId = resolver.id(next);
+ if (topicId != null) {
+ result = topicId;
}
} while (result == null);
next = result;
@@ -105,7 +241,7 @@ public class TopicIds implements Set<Uuid> {
@Override
public Iterator<Uuid> iterator() {
- return new TopicIdIterator(topicNames.iterator(), image);
+ return new TopicIdIterator(topicNames.iterator(), resolver);
}
@Override
@@ -164,20 +300,20 @@ public class TopicIds implements Set<Uuid> {
TopicIds uuids = (TopicIds) o;
if (!Objects.equals(topicNames, uuids.topicNames)) return false;
- return Objects.equals(image, uuids.image);
+ return Objects.equals(resolver.image(), uuids.resolver.image());
}
@Override
public int hashCode() {
int result = topicNames.hashCode();
- result = 31 * result + image.hashCode();
+ result = 31 * result + resolver.image().hashCode();
return result;
}
@Override
public String toString() {
return "TopicIds(topicNames=" + topicNames +
- ", image=" + image +
+ ", resolver=" + resolver +
')';
}
}
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/TargetAssignmentBuilderTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/TargetAssignmentBuilderTest.java
index 093145650b9..e3445663df8 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/TargetAssignmentBuilderTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/TargetAssignmentBuilderTest.java
@@ -161,6 +161,7 @@ public class TargetAssignmentBuilderTest {
public TargetAssignmentBuilder.TargetAssignmentResult build() {
TopicsImage topicsImage = topicsImageBuilder.build().topics();
+ TopicIds.TopicResolver topicResolver = new
TopicIds.CachedTopicResolver(topicsImage);
// Prepare expected member specs.
Map<String, MemberSubscriptionAndAssignmentImpl>
memberSubscriptions = new HashMap<>();
@@ -169,7 +170,7 @@ public class TargetAssignmentBuilderTest {
memberSubscriptions.put(memberId,
createMemberSubscriptionAndAssignment(
member,
targetAssignment.getOrDefault(memberId, Assignment.EMPTY),
- topicsImage
+ topicResolver
))
);
@@ -192,7 +193,7 @@ public class TargetAssignmentBuilderTest {
memberSubscriptions.put(memberId,
createMemberSubscriptionAndAssignment(
updatedMemberOrNull,
assignment,
- topicsImage
+ topicResolver
));
}
});
@@ -263,6 +264,7 @@ public class TargetAssignmentBuilderTest {
.addTopic(barTopicId, "bar", 5)
.build()
.topics();
+ TopicIds.TopicResolver topicResolver = new
TopicIds.DefaultTopicResolver(topicsImage);
ConsumerGroupMember member = new
ConsumerGroupMember.Builder("member-id")
.setSubscribedTopicNames(Arrays.asList("foo", "bar", "zar"))
@@ -278,7 +280,7 @@ public class TargetAssignmentBuilderTest {
MemberSubscription subscriptionSpec =
createMemberSubscriptionAndAssignment(
member,
assignment,
- topicsImage
+ topicResolver
);
assertEquals(new MemberSubscriptionAndAssignmentImpl(
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/TopicIdsTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/TopicIdsTest.java
index 876131fa51c..0706624a97a 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/TopicIdsTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/TopicIdsTest.java
@@ -41,7 +41,12 @@ public class TopicIdsTest {
@Test
public void testTopicsImageCannotBeNull() {
- assertThrows(NullPointerException.class, () -> new
TopicIds(Collections.emptySet(), null));
+ assertThrows(NullPointerException.class, () -> new
TopicIds(Collections.emptySet(), (TopicsImage) null));
+ }
+
+ @Test
+ public void testTopicResolverCannotBeNull() {
+ assertThrows(NullPointerException.class, () -> new
TopicIds(Collections.emptySet(), (TopicIds.TopicResolver) null));
}
@Test
diff --git
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/AssignorBenchmarkUtils.java
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/AssignorBenchmarkUtils.java
index e3dceb19c2a..9402728e1eb 100644
---
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/AssignorBenchmarkUtils.java
+++
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/AssignorBenchmarkUtils.java
@@ -159,13 +159,13 @@ public class AssignorBenchmarkUtils {
*
* @param members The ConsumerGroupMembers.
* @param subscriptionType The group's subscription type.
- * @param topicsImage The TopicsImage to use.
+ * @param topicResolver The TopicResolver to use.
* @return The new GroupSpec.
*/
public static GroupSpec createGroupSpec(
Map<String, ConsumerGroupMember> members,
SubscriptionType subscriptionType,
- TopicsImage topicsImage
+ TopicIds.TopicResolver topicResolver
) {
Map<String, MemberSubscriptionAndAssignmentImpl> memberSpecs = new
HashMap<>();
@@ -177,7 +177,7 @@ public class AssignorBenchmarkUtils {
memberSpecs.put(memberId, new MemberSubscriptionAndAssignmentImpl(
Optional.ofNullable(member.rackId()),
Optional.ofNullable(member.instanceId()),
- new TopicIds(member.subscribedTopicNames(), topicsImage),
+ new TopicIds(member.subscribedTopicNames(), topicResolver),
new Assignment(member.assignedPartitions())
));
}
diff --git
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java
index 1ad30e320b5..971828c1c47 100644
---
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java
+++
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java
@@ -129,6 +129,8 @@ public class ServerSideAssignorBenchmark {
private TopicsImage topicsImage = TopicsImage.EMPTY;
+ private TopicIds.TopicResolver topicResolver;
+
private SubscribedTopicDescriber subscribedTopicDescriber;
@Setup(Level.Trial)
@@ -138,7 +140,7 @@ public class ServerSideAssignorBenchmark {
setupTopics();
Map<String, ConsumerGroupMember> members = createMembers();
- this.groupSpec = AssignorBenchmarkUtils.createGroupSpec(members,
subscriptionType, topicsImage);
+ this.groupSpec = AssignorBenchmarkUtils.createGroupSpec(members,
subscriptionType, topicResolver);
if (assignmentType == AssignmentType.INCREMENTAL) {
simulateIncrementalRebalance();
@@ -155,6 +157,7 @@ public class ServerSideAssignorBenchmark {
);
topicsImage =
AssignorBenchmarkUtils.createTopicsImage(subscriptionMetadata);
+ topicResolver = new TopicIds.CachedTopicResolver(topicsImage);
Map<Uuid, TopicMetadata> topicMetadata =
AssignorBenchmarkUtils.createTopicMetadata(subscriptionMetadata);
subscribedTopicDescriber = new
SubscribedTopicDescriberImpl(topicMetadata);
@@ -229,7 +232,7 @@ public class ServerSideAssignorBenchmark {
if (subscriptionType == HETEROGENEOUS) {
subscribedTopicIdsForNewMember =
updatedMemberSpec.get(memberId(memberCount - 2)).subscribedTopicIds();
} else {
- subscribedTopicIdsForNewMember = new TopicIds(new
HashSet<>(allTopicNames), topicsImage);
+ subscribedTopicIdsForNewMember = new TopicIds(new
HashSet<>(allTopicNames), topicResolver);
}
Optional<String> rackId = rackId(memberCount - 1);
@@ -251,6 +254,7 @@ public class ServerSideAssignorBenchmark {
@Threads(1)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public void doAssignment() {
+ topicResolver.clear();
partitionAssignor.assign(groupSpec, subscribedTopicDescriber);
}
}
diff --git
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/TargetAssignmentBuilderBenchmark.java
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/TargetAssignmentBuilderBenchmark.java
index a06e9b0e527..2a23d22b655 100644
---
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/TargetAssignmentBuilderBenchmark.java
+++
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/TargetAssignmentBuilderBenchmark.java
@@ -27,6 +27,7 @@ import
org.apache.kafka.coordinator.group.assignor.UniformAssignor;
import org.apache.kafka.coordinator.group.modern.Assignment;
import org.apache.kafka.coordinator.group.modern.SubscribedTopicDescriberImpl;
import org.apache.kafka.coordinator.group.modern.TargetAssignmentBuilder;
+import org.apache.kafka.coordinator.group.modern.TopicIds;
import org.apache.kafka.coordinator.group.modern.TopicMetadata;
import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupMember;
import org.apache.kafka.image.TopicsImage;
@@ -96,6 +97,8 @@ public class TargetAssignmentBuilderBenchmark {
private TopicsImage topicsImage;
+ private TopicIds.TopicResolver topicResolver;
+
private SubscribedTopicDescriber subscribedTopicDescriber;
@Setup(Level.Trial)
@@ -133,6 +136,7 @@ public class TargetAssignmentBuilderBenchmark {
);
topicsImage =
AssignorBenchmarkUtils.createTopicsImage(subscriptionMetadata);
+ topicResolver = new TopicIds.CachedTopicResolver(topicsImage);
Map<Uuid, TopicMetadata> topicMetadata =
AssignorBenchmarkUtils.createTopicMetadata(subscriptionMetadata);
subscribedTopicDescriber = new
SubscribedTopicDescriberImpl(topicMetadata);
@@ -144,7 +148,7 @@ public class TargetAssignmentBuilderBenchmark {
this.groupSpec = AssignorBenchmarkUtils.createGroupSpec(
members,
subscriptionType,
- topicsImage
+ topicResolver
);
GroupAssignment groupAssignment = partitionAssignor.assign(