This is an automated email from the ASF dual-hosted git repository.
manikumar pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 50b6953661a KAFKA-18122 : Added support for ShareConsumeBenchWorker
(#17984)
50b6953661a is described below
commit 50b6953661a46d7d57a8aca5c875e91a19166253
Author: ShivsundarR <[email protected]>
AuthorDate: Thu Dec 5 08:16:32 2024 -0500
KAFKA-18122 : Added support for ShareConsumeBenchWorker (#17984)
Added ShareConsumeBenchSpec and ShareConsumeBenchWorker similar to
ConsumeBenchSpec/ConsumeBenchWorker. This will help us run trogdor workloads
for share consumers as well.
Added a sample json workload running 5 share consumers.
Reviewers: Andrew Schofield <[email protected]>, Manikumar Reddy
<[email protected]>
---
tests/spec/simple_share_consume_bench_spec.json | 31 ++
.../trogdor/workload/ShareConsumeBenchSpec.java | 221 +++++++++
.../trogdor/workload/ShareConsumeBenchWorker.java | 498 +++++++++++++++++++++
.../workload/ShareConsumeBenchSpecTest.java | 60 +++
4 files changed, 810 insertions(+)
diff --git a/tests/spec/simple_share_consume_bench_spec.json
b/tests/spec/simple_share_consume_bench_spec.json
new file mode 100644
index 00000000000..23fbcff6d76
--- /dev/null
+++ b/tests/spec/simple_share_consume_bench_spec.json
@@ -0,0 +1,31 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//
+// An example task specification for running a share consumer benchmark in Trogdor.
+// See trogdor/README.md for details.
+//
+
+{
+ "class": "org.apache.kafka.trogdor.workload.ShareConsumeBenchSpec",
+ "durationMs": 10000000,
+ "consumerNode": "node0",
+ "bootstrapServers": "localhost:9092",
+ "targetMessagesPerSec": 1000,
+ "threadsPerWorker": 5,
+ "shareGroup": "sg",
+ "maxMessages": 10000,
+ "activeTopics": [ "foo[1-3]" ]
+}
diff --git
a/trogdor/src/main/java/org/apache/kafka/trogdor/workload/ShareConsumeBenchSpec.java
b/trogdor/src/main/java/org/apache/kafka/trogdor/workload/ShareConsumeBenchSpec.java
new file mode 100644
index 00000000000..021f04ebd33
--- /dev/null
+++
b/trogdor/src/main/java/org/apache/kafka/trogdor/workload/ShareConsumeBenchSpec.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.workload;
+
+import org.apache.kafka.trogdor.common.StringExpander;
+import org.apache.kafka.trogdor.task.TaskController;
+import org.apache.kafka.trogdor.task.TaskSpec;
+import org.apache.kafka.trogdor.task.TaskWorker;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * The specification for a benchmark that consumes messages from a set of topics using share consumers.
+ *
+ * If a share group is not given to the specification, the default group name "share" will be used.
+ * The group is read from the "shareGroup" property; "consumerGroup" is still accepted as an alias
+ * so that specs written against the earlier property name keep working.
+ *
+ * Topics are named in the "activeTopics" field. The topic_name:partition_number notation is not
+ * supported here: any expanded topic name that contains a colon is rejected with an
+ * {@link IllegalArgumentException} when the names are expanded.
+ *
+ * The "activeTopics" field also supports ranges that get expanded. See {@link StringExpander}.
+ * Example:
+ * Given "activeTopics": ["foo[1-3]"], "foo[1-3]" will get
+ * expanded to [foo1, foo2, foo3].
+ *
+ * The consumer will subscribe to the topics via
+ * {@link org.apache.kafka.clients.consumer.KafkaShareConsumer#subscribe(Collection)}.
+ * It will be assigned partitions dynamically from the share group by the broker.
+ *
+ * This specification supports the spawning of multiple share consumers in the single Trogdor worker agent.
+ * The "threadsPerWorker" field denotes how many consumers should be spawned for this spec.
+ * It is worth noting that the "targetMessagesPerSec", "maxMessages" and "activeTopics" fields apply for
+ * every share consumer individually.
+ *
+ * The "recordProcessor" field allows the specification of tasks to run on records that are consumed.
+ * This is run immediately after the messages are polled. See the `RecordProcessor` interface for more
+ * information.
+ *
+ * An example JSON representation which will result in a share consumer that is part of the share group
+ * "sg" and subscribed to topics foo1, foo2, foo3 and bar.
+ * <pre>{@code
+ * {
+ *    "class": "org.apache.kafka.trogdor.workload.ShareConsumeBenchSpec",
+ *    "durationMs": 10000000,
+ *    "consumerNode": "node0",
+ *    "bootstrapServers": "localhost:9092",
+ *    "maxMessages": 100,
+ *    "shareGroup": "sg",
+ *    "activeTopics": ["foo[1-3]", "bar"]
+ * }
+ * }</pre>
+ */
+public final class ShareConsumeBenchSpec extends TaskSpec {
+
+    // A fully expanded topic name is valid only if it is non-empty and contains no colon.
+    private static final String VALID_EXPANDED_TOPIC_NAME_PATTERN = "^[^:]+$";
+    private static final String DEFAULT_SHARE_GROUP_NAME = "share";
+    private final String consumerNode;
+    private final String bootstrapServers;
+    private final int targetMessagesPerSec;
+    private final long maxMessages;
+    private final Map<String, String> consumerConf;
+    private final Map<String, String> adminClientConf;
+    private final Map<String, String> commonClientConf;
+    private final List<String> activeTopics;
+    private final String shareGroup;
+    private final int threadsPerWorker;
+    private final Optional<RecordProcessor> recordProcessor;
+
+    /**
+     * Creates the specification, substituting defaults for absent values: empty strings for
+     * the node and bootstrap servers, an empty list for the topics, "share" for the share group,
+     * 1 for the number of threads and an empty Optional for the record processor.
+     */
+    @JsonCreator
+    public ShareConsumeBenchSpec(@JsonProperty("startMs") long startMs,
+                                 @JsonProperty("durationMs") long durationMs,
+                                 @JsonProperty("consumerNode") String consumerNode,
+                                 @JsonProperty("bootstrapServers") String bootstrapServers,
+                                 @JsonProperty("targetMessagesPerSec") int targetMessagesPerSec,
+                                 @JsonProperty("maxMessages") long maxMessages,
+                                 // Bound to "shareGroup" so that it matches the shareGroup() getter and the
+                                 // documented JSON; "consumerGroup" is kept as an alias for compatibility.
+                                 @JsonProperty("shareGroup")
+                                 @com.fasterxml.jackson.annotation.JsonAlias("consumerGroup") String shareGroup,
+                                 @JsonProperty("consumerConf") Map<String, String> consumerConf,
+                                 @JsonProperty("commonClientConf") Map<String, String> commonClientConf,
+                                 @JsonProperty("adminClientConf") Map<String, String> adminClientConf,
+                                 @JsonProperty("threadsPerWorker") Integer threadsPerWorker,
+                                 @JsonProperty("recordProcessor") Optional<RecordProcessor> recordProcessor,
+                                 @JsonProperty("activeTopics") List<String> activeTopics) {
+        super(startMs, durationMs);
+        this.consumerNode = (consumerNode == null) ? "" : consumerNode;
+        this.bootstrapServers = (bootstrapServers == null) ? "" : bootstrapServers;
+        this.targetMessagesPerSec = targetMessagesPerSec;
+        this.maxMessages = maxMessages;
+        this.consumerConf = configOrEmptyMap(consumerConf);
+        this.commonClientConf = configOrEmptyMap(commonClientConf);
+        this.adminClientConf = configOrEmptyMap(adminClientConf);
+        this.activeTopics = activeTopics == null ? new ArrayList<>() : activeTopics;
+        this.shareGroup = shareGroup == null ? DEFAULT_SHARE_GROUP_NAME : shareGroup;
+        this.threadsPerWorker = threadsPerWorker == null ? 1 : threadsPerWorker;
+        // Never store a null Optional: callers invoke ifPresent()/isPresent() on it directly.
+        this.recordProcessor = recordProcessor == null ? Optional.empty() : recordProcessor;
+    }
+
+    @JsonProperty
+    public String consumerNode() {
+        return consumerNode;
+    }
+
+    @JsonProperty
+    public String shareGroup() {
+        return shareGroup;
+    }
+
+    @JsonProperty
+    public String bootstrapServers() {
+        return bootstrapServers;
+    }
+
+    @JsonProperty
+    public int targetMessagesPerSec() {
+        return targetMessagesPerSec;
+    }
+
+    @JsonProperty
+    public long maxMessages() {
+        return maxMessages;
+    }
+
+    @JsonProperty
+    public int threadsPerWorker() {
+        return threadsPerWorker;
+    }
+
+    @JsonProperty
+    public Optional<RecordProcessor> recordProcessor() {
+        return this.recordProcessor;
+    }
+
+    @JsonProperty
+    public Map<String, String> consumerConf() {
+        return consumerConf;
+    }
+
+    @JsonProperty
+    public Map<String, String> commonClientConf() {
+        return commonClientConf;
+    }
+
+    @JsonProperty
+    public Map<String, String> adminClientConf() {
+        return adminClientConf;
+    }
+
+    @JsonProperty
+    public List<String> activeTopics() {
+        return activeTopics;
+    }
+
+    /**
+     * Returns a controller that targets only the configured consumer node.
+     */
+    @Override
+    public TaskController newController(String id) {
+        return topology -> Collections.singleton(consumerNode);
+    }
+
+    @Override
+    public TaskWorker newTaskWorker(String id) {
+        return new ShareConsumeBenchWorker(id, this);
+    }
+
+    /**
+     * Materializes the list of topic names (optionally with ranges) into the set of concrete topic names.
+     *
+     * Example:
+     * ['foo[1-3]', 'bar[1-2]'] => {'foo1', 'foo2', 'foo3', 'bar1', 'bar2' }
+     *
+     * @return the expanded topic names
+     * @throws IllegalArgumentException if any expanded name is empty or contains a colon
+     */
+    Set<String> expandTopicNames() {
+        Set<String> expandedTopics = new HashSet<>();
+
+        for (String rawTopicName : this.activeTopics) {
+            Set<String> expandedNames = expandTopicName(rawTopicName);
+            // Check every expanded name rather than only the first one returned by the set iterator.
+            for (String expandedName : expandedNames) {
+                if (!expandedName.matches(VALID_EXPANDED_TOPIC_NAME_PATTERN))
+                    throw new IllegalArgumentException(String.format("Expanded topic name %s is invalid", expandedNames));
+            }
+
+            expandedTopics.addAll(expandedNames);
+        }
+        return expandedTopics;
+    }
+
+    /**
+     * Expands a topic name until there are no more ranges in it.
+     * A single result from the expander is returned as-is; otherwise each result is expanded again.
+     */
+    private Set<String> expandTopicName(String topicName) {
+        Set<String> expandedNames = StringExpander.expand(topicName);
+        if (expandedNames.size() == 1) {
+            return expandedNames;
+        }
+
+        Set<String> newNames = new HashSet<>();
+        for (String name : expandedNames) {
+            newNames.addAll(expandTopicName(name));
+        }
+        return newNames;
+    }
+}
diff --git
a/trogdor/src/main/java/org/apache/kafka/trogdor/workload/ShareConsumeBenchWorker.java
b/trogdor/src/main/java/org/apache/kafka/trogdor/workload/ShareConsumeBenchWorker.java
new file mode 100644
index 00000000000..dad654ec552
--- /dev/null
+++
b/trogdor/src/main/java/org/apache/kafka/trogdor/workload/ShareConsumeBenchWorker.java
@@ -0,0 +1,498 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.workload;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaShareConsumer;
+import org.apache.kafka.common.internals.KafkaFutureImpl;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.utils.ThreadUtils;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.trogdor.common.JsonUtil;
+import org.apache.kafka.trogdor.common.Platform;
+import org.apache.kafka.trogdor.common.WorkerUtils;
+import org.apache.kafka.trogdor.task.TaskWorker;
+import org.apache.kafka.trogdor.task.WorkerStatusTracker;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.JsonNode;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantLock;
+
+
+public class ShareConsumeBenchWorker implements TaskWorker {
+ private static final Logger log =
LoggerFactory.getLogger(ShareConsumeBenchWorker.class);
+
+ private static final int THROTTLE_PERIOD_MS = 100;
+
+ private final String id;
+ private final ShareConsumeBenchSpec spec;
+ private final AtomicBoolean running = new AtomicBoolean(false);
+ private ScheduledExecutorService executor;
+ private WorkerStatusTracker workerStatus;
+ private StatusUpdater statusUpdater;
+ private Future<?> statusUpdaterFuture;
+ private KafkaFutureImpl<String> doneFuture;
+ /**
+  * Creates a worker for the given task. Only the arguments are stored here;
+  * the executor and the consumers are created later by start().
+  *
+  * @param id   the task id, used in log messages and to build consumer client ids
+  * @param spec the share consume benchmark specification to run
+  */
+ public ShareConsumeBenchWorker(String id, ShareConsumeBenchSpec spec) {
+ this.id = id;
+ this.spec = spec;
+ }
+
+    /**
+     * Starts the benchmark: creates the scheduler, schedules the once-a-minute aggregate
+     * status report and submits the task that builds and launches the share consumers.
+     *
+     * @throws IllegalStateException if the worker is already running
+     */
+    @Override
+    public void start(Platform platform, WorkerStatusTracker status,
+                      KafkaFutureImpl<String> doneFuture) throws Exception {
+        boolean wasIdle = running.compareAndSet(false, true);
+        if (!wasIdle) {
+            throw new IllegalStateException("ShareConsumeBenchWorker is already running.");
+        }
+        log.info("{}: Activating ShareConsumeBenchWorker with {}", id, spec);
+        this.statusUpdater = new StatusUpdater();
+        // One thread per consumer, plus 1 thread for all the ConsumeStatusUpdater and 1 for the StatusUpdater.
+        int poolSize = spec.threadsPerWorker() + 2;
+        this.executor = Executors.newScheduledThreadPool(
+            poolSize,
+            ThreadUtils.createThreadFactory("ShareConsumeBenchWorkerThread%d", false));
+        this.statusUpdaterFuture =
+            this.executor.scheduleAtFixedRate(this.statusUpdater, 1, 1, TimeUnit.MINUTES);
+        this.workerStatus = status;
+        this.doneFuture = doneFuture;
+        this.executor.submit(new Prepare());
+    }
+
+    /**
+     * Builds one ConsumeMessages task per configured thread, submits them to the executor and
+     * then submits the CloseStatusUpdater that waits for all of them to finish.
+     */
+    public class Prepare implements Runnable {
+        @Override
+        public void run() {
+            try {
+                List<Future<Void>> pending = new ArrayList<>();
+                for (ConsumeMessages consumeTask : consumeTasks()) {
+                    Future<Void> submitted = executor.submit(consumeTask);
+                    pending.add(submitted);
+                }
+                executor.submit(new CloseStatusUpdater(pending));
+            } catch (Throwable e) {
+                WorkerUtils.abort(log, "Prepare", e, doneFuture);
+            }
+        }
+
+        /**
+         * Creates the consume tasks; every task gets its own share consumer in the same
+         * share group, subscribed to the same expanded set of topics.
+         */
+        private List<ConsumeMessages> consumeTasks() {
+            List<ConsumeMessages> tasks = new ArrayList<>();
+            String group = shareGroup();
+            int consumerCount = spec.threadsPerWorker();
+
+            Set<String> topics = new HashSet<>(spec.expandTopicNames());
+
+            int idx = 0;
+            while (idx < consumerCount) {
+                ThreadSafeShareConsumer shareConsumer = consumer(group, clientId(idx));
+                tasks.add(new ConsumeMessages(shareConsumer, spec.recordProcessor(), topics));
+                idx++;
+            }
+
+            return tasks;
+        }
+
+        /**
+         * Builds the client id for the consumer with the given index.
+         */
+        private String clientId(int idx) {
+            return String.format("consumer.%s-%d", id, idx);
+        }
+
+        /**
+         * Creates a new KafkaShareConsumer instance wrapped in a ThreadSafeShareConsumer.
+         */
+        private ThreadSafeShareConsumer consumer(String shareGroup, String clientId) {
+            Properties consumerProps = new Properties();
+            consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, spec.bootstrapServers());
+            consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
+            consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, shareGroup);
+            consumerProps.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 100000);
+            // these defaults maybe over-written by the user-specified commonClientConf or consumerConf
+            WorkerUtils.addConfigsToProperties(consumerProps, spec.commonClientConf(), spec.consumerConf());
+            KafkaShareConsumer<byte[], byte[]> kafkaShareConsumer = new KafkaShareConsumer<>(
+                consumerProps, new ByteArrayDeserializer(), new ByteArrayDeserializer());
+            return new ThreadSafeShareConsumer(kafkaShareConsumer, clientId);
+        }
+
+        private String shareGroup() {
+            return spec.shareGroup();
+        }
+    }
+
+ public class ConsumeMessages implements Callable<Void> {
+ private final Histogram latencyHistogram;
+ private final Histogram messageSizeHistogram;
+ private final Future<?> statusUpdaterFuture;
+ private final Throttle throttle;
+ private final String clientId;
+ private final ThreadSafeShareConsumer consumer;
+ private final Optional<RecordProcessor> recordProcessor;
+
+ private ConsumeMessages(ThreadSafeShareConsumer consumer,
+ Optional<RecordProcessor> recordProcessor) {
+ this.latencyHistogram = new Histogram(10000);
+ this.messageSizeHistogram = new Histogram(2 * 1024 * 1024);
+ this.clientId = consumer.clientId();
+ this.statusUpdaterFuture = executor.scheduleAtFixedRate(
+ new ConsumeStatusUpdater(latencyHistogram,
messageSizeHistogram, consumer, recordProcessor), 1, 1, TimeUnit.MINUTES);
+ int perPeriod;
+ if (spec.targetMessagesPerSec() <= 0)
+ perPeriod = Integer.MAX_VALUE;
+ else
+ perPeriod =
WorkerUtils.perSecToPerPeriod(spec.targetMessagesPerSec(), THROTTLE_PERIOD_MS);
+
+ this.throttle = new Throttle(perPeriod, THROTTLE_PERIOD_MS);
+ this.consumer = consumer;
+ this.recordProcessor = recordProcessor;
+ }
+
+ ConsumeMessages(ThreadSafeShareConsumer consumer,
+ Optional<RecordProcessor> recordProcessor,
+ Set<String> topics) {
+ this(consumer, recordProcessor);
+ log.info("Will consume from topics {}.", topics);
+ this.consumer.subscribe(topics);
+ }
+
+ @Override
+ public Void call() throws Exception {
+ long messagesConsumed = 0;
+ long bytesConsumed = 0;
+ long startTimeMs = Time.SYSTEM.milliseconds();
+ long startBatchMs = startTimeMs;
+ long maxMessages = spec.maxMessages();
+ try {
+ while (messagesConsumed < maxMessages) {
+ ConsumerRecords<byte[], byte[]> records = consumer.poll();
+ if (records.isEmpty()) {
+ continue;
+ }
+ long endBatchMs = Time.SYSTEM.milliseconds();
+ long elapsedBatchMs = endBatchMs - startBatchMs;
+
+ // Do the record batch processing immediately to avoid
latency skew.
+ recordProcessor.ifPresent(processor ->
processor.processRecords(records));
+
+ for (ConsumerRecord<byte[], byte[]> record : records) {
+ messagesConsumed++;
+ long messageBytes = 0;
+ if (record.key() != null) {
+ messageBytes += record.serializedKeySize();
+ }
+ if (record.value() != null) {
+ messageBytes += record.serializedValueSize();
+ }
+ latencyHistogram.add(elapsedBatchMs);
+ messageSizeHistogram.add(messageBytes);
+ bytesConsumed += messageBytes;
+ if (messagesConsumed >= maxMessages)
+ break;
+
+ throttle.increment();
+ }
+ startBatchMs = Time.SYSTEM.milliseconds();
+ }
+ } catch (Exception e) {
+ WorkerUtils.abort(log, "ConsumeRecords", e, doneFuture);
+ } finally {
+ statusUpdaterFuture.cancel(false);
+ StatusData statusData =
+ new ConsumeStatusUpdater(latencyHistogram,
messageSizeHistogram, consumer, spec.recordProcessor()).update();
+ long curTimeMs = Time.SYSTEM.milliseconds();
+ log.info("{} Consumed total number of messages={}, bytes={} in
{} ms. status: {}",
+ clientId, messagesConsumed, bytesConsumed, curTimeMs -
startTimeMs, statusData);
+ }
+ consumer.close();
+ return null;
+ }
+ }
+
+ public class CloseStatusUpdater implements Runnable {
+ private final List<Future<Void>> consumeTasks;
+
+ CloseStatusUpdater(List<Future<Void>> consumeTasks) {
+ this.consumeTasks = consumeTasks;
+ }
+
+ @Override
+ public void run() {
+ while (!consumeTasks.stream().allMatch(Future::isDone)) {
+ try {
+ Thread.sleep(60000);
+ } catch (InterruptedException e) {
+ log.debug("{} was interrupted. Closing...",
this.getClass().getName());
+ break; // close the thread
+ }
+ }
+ statusUpdaterFuture.cancel(false);
+ statusUpdater.update();
+ doneFuture.complete("");
+ }
+ }
+
+ class StatusUpdater implements Runnable {
+ final Map<String, JsonNode> statuses;
+
+ StatusUpdater() {
+ statuses = new HashMap<>();
+ }
+
+ @Override
+ public void run() {
+ try {
+ update();
+ } catch (Exception e) {
+ WorkerUtils.abort(log, "ConsumeStatusUpdater", e, doneFuture);
+ }
+ }
+
+ synchronized void update() {
+ workerStatus.update(JsonUtil.JSON_SERDE.valueToTree(statuses));
+ }
+
+ synchronized void updateConsumeStatus(String clientId, StatusData
status) {
+ statuses.put(clientId, JsonUtil.JSON_SERDE.valueToTree(status));
+ }
+ }
+
+    /**
+     * Runnable that summarizes the histograms of a single consumer into a StatusData,
+     * hands it to the shared StatusUpdater and logs it.
+     */
+    public class ConsumeStatusUpdater implements Runnable {
+        private final Histogram latencyHistogram;
+        private final Histogram messageSizeHistogram;
+        private final ThreadSafeShareConsumer consumer;
+        private final Optional<RecordProcessor> recordProcessor;
+
+        ConsumeStatusUpdater(Histogram latencyHistogram,
+                             Histogram messageSizeHistogram,
+                             ThreadSafeShareConsumer consumer,
+                             Optional<RecordProcessor> recordProcessor) {
+            this.latencyHistogram = latencyHistogram;
+            this.messageSizeHistogram = messageSizeHistogram;
+            this.consumer = consumer;
+            this.recordProcessor = recordProcessor;
+        }
+
+        @Override
+        public void run() {
+            try {
+                update();
+            } catch (Exception e) {
+                WorkerUtils.abort(log, "ConsumeStatusUpdater", e, doneFuture);
+            }
+        }
+
+        /**
+         * Builds the current status of this consumer, registers it under the consumer's
+         * client id and logs it as JSON.
+         *
+         * @return the status that was registered
+         */
+        StatusData update() {
+            Histogram.Summary latencies = latencyHistogram.summarize(StatusData.PERCENTILES);
+            Histogram.Summary sizes = messageSizeHistogram.summarize(StatusData.PERCENTILES);
+
+            // Parse out the RecordProcessor's status, if specified.
+            Optional<JsonNode> processorStatus = recordProcessor.isPresent()
+                ? Optional.of(recordProcessor.get().processorStatus())
+                : Optional.empty();
+
+            // Total bytes are derived from the sample count and the average message size.
+            StatusData current = new StatusData(
+                consumer.subscription(),
+                latencies.numSamples(),
+                (long) (sizes.numSamples() * sizes.average()),
+                (long) sizes.average(),
+                latencies.average(),
+                latencies.percentiles().get(0).value(),
+                latencies.percentiles().get(1).value(),
+                latencies.percentiles().get(2).value(),
+                processorStatus);
+            statusUpdater.updateConsumeStatus(consumer.clientId(), current);
+            log.info("Status={}", JsonUtil.toJsonString(current));
+            return current;
+        }
+    }
+
+ /**
+  * Immutable, JSON-serializable snapshot of a single consumer's progress.
+  * Instances are built by ConsumeStatusUpdater#update() from the latency and
+  * message-size histogram summaries.
+  */
+ public static class StatusData {
+ private final long totalMessagesReceived;
+ private final Set<String> subscription;
+ private final long totalBytesReceived;
+ private final long averageMessageSizeBytes;
+ private final float averageLatencyMs;
+ private final int p50LatencyMs;
+ private final int p95LatencyMs;
+ private final int p99LatencyMs;
+ private final Optional<JsonNode> recordProcessorStatus;
+
+ /**
+ * The percentiles to use when calculating the histogram data.
+ * These should match up with the p50LatencyMs, p95LatencyMs, etc.
fields.
+ */
+ static final float[] PERCENTILES = {0.5f, 0.95f, 0.99f};
+ // Creator used both by the worker and for JSON deserialization; stores every value as given.
+ @JsonCreator
+ StatusData(@JsonProperty("subscription") Set<String> subscription,
+ @JsonProperty("totalMessagesReceived") long
totalMessagesReceived,
+ @JsonProperty("totalBytesReceived") long totalBytesReceived,
+ @JsonProperty("averageMessageSizeBytes") long
averageMessageSizeBytes,
+ @JsonProperty("averageLatencyMs") float averageLatencyMs,
+ @JsonProperty("p50LatencyMs") int p50latencyMs,
+ @JsonProperty("p95LatencyMs") int p95latencyMs,
+ @JsonProperty("p99LatencyMs") int p99latencyMs,
+ @JsonProperty("recordProcessorStatus") Optional<JsonNode>
recordProcessorStatus) {
+ this.subscription = subscription;
+ this.totalMessagesReceived = totalMessagesReceived;
+ this.totalBytesReceived = totalBytesReceived;
+ this.averageMessageSizeBytes = averageMessageSizeBytes;
+ this.averageLatencyMs = averageLatencyMs;
+ this.p50LatencyMs = p50latencyMs;
+ this.p95LatencyMs = p95latencyMs;
+ this.p99LatencyMs = p99latencyMs;
+ this.recordProcessorStatus = recordProcessorStatus;
+ }
+
+ // The topic subscription reported by the consumer when the snapshot was taken.
+ @JsonProperty
+ public Set<String> subscription() {
+ return subscription;
+ }
+
+ @JsonProperty
+ public long totalMessagesReceived() {
+ return totalMessagesReceived;
+ }
+
+ @JsonProperty
+ public long totalBytesReceived() {
+ return totalBytesReceived;
+ }
+
+ @JsonProperty
+ public long averageMessageSizeBytes() {
+ return averageMessageSizeBytes;
+ }
+
+ @JsonProperty
+ public float averageLatencyMs() {
+ return averageLatencyMs;
+ }
+
+ @JsonProperty
+ public int p50LatencyMs() {
+ return p50LatencyMs;
+ }
+
+ @JsonProperty
+ public int p95LatencyMs() {
+ return p95LatencyMs;
+ }
+
+ @JsonProperty
+ public int p99LatencyMs() {
+ return p99LatencyMs;
+ }
+
+ // Serialized as the raw JSON node, or null when no record processor status is present.
+ @JsonProperty
+ public JsonNode recordProcessorStatus() {
+ return recordProcessorStatus.orElse(null);
+ }
+ }
+
+ /**
+  * Stops the worker: completes the done future, shuts the executor down and
+  * clears all per-run state.
+  *
+  * @throws IllegalStateException if the worker is not running
+  */
+ @Override
+ public void stop(Platform platform) throws Exception {
+ if (!running.compareAndSet(true, false)) {
+ throw new IllegalStateException("ShareConsumeBenchWorker is not
running.");
+ }
+ log.info("{}: Deactivating ShareConsumeBenchWorker.", id);
+ doneFuture.complete("");
+ // shutdownNow() cancels the scheduled status reports and asks running tasks to stop;
+ // the per-run fields are cleared only after the executor has terminated (waits up to one day).
+ executor.shutdownNow();
+ executor.awaitTermination(1, TimeUnit.DAYS);
+ this.executor = null;
+ this.statusUpdater = null;
+ this.statusUpdaterFuture = null;
+ this.workerStatus = null;
+ this.doneFuture = null;
+ }
+
+    /**
+     * A thread-safe KafkaShareConsumer wrapper. Every call that touches the underlying
+     * consumer is made while holding a single lock, so the consume task and the periodic
+     * status updater can share one instance.
+     */
+    private static class ThreadSafeShareConsumer {
+        private final KafkaShareConsumer<byte[], byte[]> consumer;
+        private final String clientId;
+        private final ReentrantLock consumerLock;
+        // Guarded by consumerLock.
+        private boolean closed = false;
+
+        ThreadSafeShareConsumer(KafkaShareConsumer<byte[], byte[]> consumer, String clientId) {
+            this.consumer = consumer;
+            this.clientId = clientId;
+            this.consumerLock = new ReentrantLock();
+        }
+
+        /**
+         * Polls the underlying consumer with a 50 ms timeout.
+         */
+        ConsumerRecords<byte[], byte[]> poll() {
+            this.consumerLock.lock();
+            try {
+                return consumer.poll(Duration.ofMillis(50));
+            } finally {
+                this.consumerLock.unlock();
+            }
+        }
+
+        /**
+         * Unsubscribes and closes the underlying consumer. Subsequent calls are no-ops.
+         * The consumer is closed even if unsubscribing throws.
+         */
+        void close() {
+            this.consumerLock.lock();
+            try {
+                // Checked under the lock so that concurrent callers cannot both pass the guard.
+                if (closed)
+                    return;
+                try {
+                    consumer.unsubscribe();
+                } finally {
+                    Utils.closeQuietly(consumer, "consumer");
+                    closed = true;
+                }
+            } finally {
+                this.consumerLock.unlock();
+            }
+        }
+
+        void subscribe(Set<String> topics) {
+            this.consumerLock.lock();
+            try {
+                consumer.subscribe(topics);
+            } finally {
+                this.consumerLock.unlock();
+            }
+        }
+
+        Set<String> subscription() {
+            this.consumerLock.lock();
+            try {
+                return consumer.subscription();
+            } finally {
+                this.consumerLock.unlock();
+            }
+        }
+
+        String clientId() {
+            return clientId;
+        }
+
+        /**
+         * Returns the wrapped consumer; calls made on it directly bypass the lock.
+         */
+        KafkaShareConsumer<byte[], byte[]> consumer() {
+            return consumer;
+        }
+    }
+}
diff --git
a/trogdor/src/test/java/org/apache/kafka/trogdor/workload/ShareConsumeBenchSpecTest.java
b/trogdor/src/test/java/org/apache/kafka/trogdor/workload/ShareConsumeBenchSpecTest.java
new file mode 100644
index 00000000000..76a30b73ec1
--- /dev/null
+++
b/trogdor/src/test/java/org/apache/kafka/trogdor/workload/ShareConsumeBenchSpecTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.trogdor.workload;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+class ShareConsumeBenchSpecTest {
+
+    /**
+     * Ranges in "activeTopics" are expanded and plain names are passed through.
+     */
+    @Test
+    public void testExpandTopicNames() {
+        Set<String> expectedNames = new HashSet<>(Arrays.asList("foo1", "foo2", "foo3", "bar"));
+        ShareConsumeBenchSpec spec = shareConsumeBenchSpec(Arrays.asList("foo[1-3]", "bar"));
+
+        assertEquals(expectedNames, spec.expandTopicNames());
+    }
+
+    /**
+     * Any topic name containing a colon must be rejected during expansion.
+     */
+    @Test
+    public void testInvalidNameRaisesException() {
+        List<String> invalidNames = Arrays.asList("In:valid", "invalid:", ":invalid[]", "in:valid:", "invalid[1-3]:");
+        for (String invalidName : invalidNames) {
+            ShareConsumeBenchSpec spec = shareConsumeBenchSpec(Collections.singletonList(invalidName));
+            assertThrows(IllegalArgumentException.class, spec::expandTopicNames);
+        }
+    }
+
+    /**
+     * Builds a spec with fixed settings and the given active topics.
+     */
+    private ShareConsumeBenchSpec shareConsumeBenchSpec(List<String> activeTopics) {
+        return new ShareConsumeBenchSpec(0, 0, "node", "localhost",
+            123, 1234, "sg-1",
+            Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), 1,
+            Optional.empty(), activeTopics);
+    }
+
+}
\ No newline at end of file