This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new c671e44 MINOR: Add `KafkaAdminClient.getListOffsetsCalls` benchmark
(#10955)
c671e44 is described below
commit c671e44b0bebc4cf4eb04392a673d3dad9cf5f23
Author: Jeff Kim <[email protected]>
AuthorDate: Tue Jul 6 23:58:56 2021 -0700
MINOR: Add `KafkaAdminClient.getListOffsetsCalls` benchmark (#10955)
Reviewers: David Jacot <[email protected]>
---
.../kafka/clients/admin/KafkaAdminClient.java | 7 +-
.../kafka/clients/admin/AdminClientTestUtils.java | 13 ++
.../jmh/admin/GetListOffsetsCallsBenchmark.java | 142 +++++++++++++++++++++
3 files changed, 159 insertions(+), 3 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 2381bfb..1037135 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -3677,9 +3677,10 @@ public class KafkaAdminClient extends AdminClient {
return new ListOffsetsResult(new HashMap<>(futures));
}
- private List<Call>
getListOffsetsCalls(MetadataOperationContext<ListOffsetsResultInfo,
ListOffsetsOptions> context,
- Map<TopicPartition, OffsetSpec>
topicPartitionOffsets,
- Map<TopicPartition,
KafkaFutureImpl<ListOffsetsResultInfo>> futures) {
+ // visible for benchmark
+ List<Call>
getListOffsetsCalls(MetadataOperationContext<ListOffsetsResultInfo,
ListOffsetsOptions> context,
+ Map<TopicPartition, OffsetSpec>
topicPartitionOffsets,
+ Map<TopicPartition,
KafkaFutureImpl<ListOffsetsResultInfo>> futures) {
MetadataResponse mr = context.response().orElseThrow(() -> new
IllegalStateException("No Metadata response"));
Cluster clusterSnapshot = mr.buildCluster();
diff --git
a/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java
b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java
index 9fd37502..019566a 100644
---
a/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java
+++
b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java
@@ -17,10 +17,12 @@
package org.apache.kafka.clients.admin;
import java.util.Collections;
+import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import
org.apache.kafka.clients.admin.CreateTopicsResult.TopicMetadataAndConfig;
+import org.apache.kafka.clients.admin.internals.MetadataOperationContext;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
@@ -102,4 +104,15 @@ public class AdminClientTestUtils {
public static ListConsumerGroupOffsetsResult
listConsumerGroupOffsetsResult(Map<TopicPartition, OffsetAndMetadata> offsets) {
return new
ListConsumerGroupOffsetsResult(KafkaFuture.completedFuture(offsets));
}
+
+ /**
+ * Used for benchmark. KafkaAdminClient.getListOffsetsCalls is only
accessible
+ * from within the admin package.
+ */
+ public static List<KafkaAdminClient.Call>
getListOffsetsCalls(KafkaAdminClient adminClient,
+
MetadataOperationContext<ListOffsetsResult.ListOffsetsResultInfo,
ListOffsetsOptions> context,
+
Map<TopicPartition, OffsetSpec> topicPartitionOffsets,
+
Map<TopicPartition, KafkaFutureImpl<ListOffsetsResult.ListOffsetsResultInfo>>
futures) {
+ return adminClient.getListOffsetsCalls(context, topicPartitionOffsets,
futures);
+ }
}
diff --git
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/admin/GetListOffsetsCallsBenchmark.java
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/admin/GetListOffsetsCallsBenchmark.java
new file mode 100644
index 0000000..8da09ed
--- /dev/null
+++
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/admin/GetListOffsetsCallsBenchmark.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.jmh.admin;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.kafka.clients.admin.AdminClientTestUtils;
+import org.apache.kafka.clients.admin.AdminClientUnitTestEnv;
+import org.apache.kafka.clients.admin.KafkaAdminClient;
+import org.apache.kafka.clients.admin.ListOffsetsOptions;
+import org.apache.kafka.clients.admin.ListOffsetsResult;
+import org.apache.kafka.clients.admin.OffsetSpec;
+import org.apache.kafka.clients.admin.internals.MetadataOperationContext;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.internals.KafkaFutureImpl;
+import org.apache.kafka.common.message.MetadataResponseData;
+import
org.apache.kafka.common.message.MetadataResponseData.MetadataResponsePartition;
+import
org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic;
+import org.apache.kafka.common.requests.MetadataResponse;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 5)
+@Measurement(iterations = 15)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MICROSECONDS)
+public class GetListOffsetsCallsBenchmark {
+ @Param({"1", "10"})
+ private int topicCount;
+
+ @Param({"100", "1000", "10000"})
+ private int partitionCount;
+
+ private KafkaAdminClient admin;
+ private MetadataOperationContext<ListOffsetsResult.ListOffsetsResultInfo,
ListOffsetsOptions> context;
+ private final Map<TopicPartition, OffsetSpec> topicPartitionOffsets = new
HashMap<>();
+ private final Map<TopicPartition,
KafkaFutureImpl<ListOffsetsResult.ListOffsetsResultInfo>> futures = new
HashMap<>();
+ private final int numNodes = 3;
+
+ @Setup(Level.Trial)
+ public void setup() {
+ MetadataResponseData data = new MetadataResponseData();
+ List<MetadataResponseTopic> mrTopicList = new ArrayList<>();
+ Set<String> topics = new HashSet<>();
+
+ for (int topicIndex = 0; topicIndex < topicCount; topicIndex++) {
+ Uuid topicId = Uuid.randomUuid();
+ String topicName = "topic-" + topicIndex;
+ MetadataResponseTopic mrTopic = new MetadataResponseTopic()
+ .setTopicId(topicId)
+ .setName(topicName)
+ .setErrorCode((short) 0)
+ .setIsInternal(false);
+
+ List<MetadataResponsePartition> mrPartitionList = new
ArrayList<>();
+
+ for (int partition = 0; partition < partitionCount; partition++) {
+ TopicPartition tp = new TopicPartition(topicName, partition);
+ topics.add(tp.topic());
+ futures.put(tp, new KafkaFutureImpl<>());
+ topicPartitionOffsets.put(tp, OffsetSpec.latest());
+
+ MetadataResponsePartition mrPartition = new
MetadataResponsePartition()
+ .setLeaderId(partition % numNodes)
+ .setPartitionIndex(partition)
+ .setIsrNodes(Arrays.asList(0, 1, 2))
+ .setReplicaNodes(Arrays.asList(0, 1, 2))
+ .setOfflineReplicas(Collections.emptyList())
+ .setErrorCode((short) 0);
+
+ mrPartitionList.add(mrPartition);
+ }
+
+ mrTopic.setPartitions(mrPartitionList);
+ mrTopicList.add(mrTopic);
+ }
+ data.setTopics(new
MetadataResponseData.MetadataResponseTopicCollection(mrTopicList.listIterator()));
+
+ long deadline = 0L;
+ short version = 0;
+ context = new MetadataOperationContext<>(topics, new
ListOffsetsOptions(), deadline, futures);
+ context.setResponse(Optional.of(new MetadataResponse(data, version)));
+
+ AdminClientUnitTestEnv adminEnv = new
AdminClientUnitTestEnv(mockCluster());
+ admin = (KafkaAdminClient) adminEnv.adminClient();
+ }
+
+ @Benchmark
+ public Object testGetListOffsetsCalls() {
+ return AdminClientTestUtils.getListOffsetsCalls(admin, context,
topicPartitionOffsets, futures);
+ }
+
+ private Cluster mockCluster() {
+ final int controllerIndex = 0;
+
+ HashMap<Integer, Node> nodes = new HashMap<>();
+ for (int i = 0; i < numNodes; i++)
+ nodes.put(i, new Node(i, "localhost", 8121 + i));
+ return new Cluster("mockClusterId", nodes.values(),
+ Collections.emptySet(), Collections.emptySet(),
+ Collections.emptySet(), nodes.get(controllerIndex));
+ }
+}