This is an automated email from the ASF dual-hosted git repository.

manikumar pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 50b6953661a KAFKA-18122 : Added support for ShareConsumeBenchWorker 
(#17984)
50b6953661a is described below

commit 50b6953661a46d7d57a8aca5c875e91a19166253
Author: ShivsundarR <[email protected]>
AuthorDate: Thu Dec 5 08:16:32 2024 -0500

    KAFKA-18122 : Added support for ShareConsumeBenchWorker (#17984)
    
    Added ShareConsumeBenchSpec and ShareConsumeBenchWorker similar to 
ConsumeBenchSpec/ConsumeBenchWorker. This will help us run trogdor workloads 
for share consumers as well.
    Added a sample json workload running 5 share consumers.
    
    Reviewers: Andrew Schofield <[email protected]>, Manikumar Reddy 
<[email protected]>
---
 tests/spec/simple_share_consume_bench_spec.json    |  31 ++
 .../trogdor/workload/ShareConsumeBenchSpec.java    | 221 +++++++++
 .../trogdor/workload/ShareConsumeBenchWorker.java  | 498 +++++++++++++++++++++
 .../workload/ShareConsumeBenchSpecTest.java        |  60 +++
 4 files changed, 810 insertions(+)

diff --git a/tests/spec/simple_share_consume_bench_spec.json 
b/tests/spec/simple_share_consume_bench_spec.json
new file mode 100644
index 00000000000..23fbcff6d76
--- /dev/null
+++ b/tests/spec/simple_share_consume_bench_spec.json
@@ -0,0 +1,31 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//
+// An example task specification for running a consumer benchmark in Trogdor.
+// See trogdor/README.md for details.
+//
+
+{
+  "class": "org.apache.kafka.trogdor.workload.ShareConsumeBenchSpec",
+  "durationMs": 10000000,
+  "consumerNode": "node0",
+  "bootstrapServers": "localhost:9092",
+  "targetMessagesPerSec": 1000,
+  "threadsPerWorker": 5,
+  "shareGroup": "sg",
+  "maxMessages": 10000,
+  "activeTopics": [ "foo[1-3]" ]
+}
diff --git 
a/trogdor/src/main/java/org/apache/kafka/trogdor/workload/ShareConsumeBenchSpec.java
 
b/trogdor/src/main/java/org/apache/kafka/trogdor/workload/ShareConsumeBenchSpec.java
new file mode 100644
index 00000000000..021f04ebd33
--- /dev/null
+++ 
b/trogdor/src/main/java/org/apache/kafka/trogdor/workload/ShareConsumeBenchSpec.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.workload;
+
+import org.apache.kafka.trogdor.common.StringExpander;
+import org.apache.kafka.trogdor.task.TaskController;
+import org.apache.kafka.trogdor.task.TaskSpec;
+import org.apache.kafka.trogdor.task.TaskWorker;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * The specification for a benchmark that consumes messages from a set of 
topic/partitions.
+ *
+ * If a share group is not given to the specification, the default group name 
"share" will be used.
+ *
+ * This specification uses a specific way to represent a topic partition via 
its "activeTopics" field.
+ * The notation for that is topic_name:partition_number (e.g "foo:1" 
represents partition-1 of topic "foo")
+ * Note that a topic name cannot have more than one colon.
+ *
+ * The "activeTopics" field also supports ranges that get expanded. See 
#{@link StringExpander}.
+ *
+ * There now exists a clever and succinct way to represent multiple topics.
+ * Example:
+ * Given "activeTopics": ["foo[1-3]"], "foo[1-3]" will get
+ * expanded to [foo1, foo2, foo3].
+ *
+ * The consumer will subscribe to the topics via
+ * #{@link 
org.apache.kafka.clients.consumer.KafkaShareConsumer#subscribe(Collection)}.
+ * It will be assigned partitions dynamically from the share group by the 
broker.
+ *
+ * This specification supports the spawning of multiple share consumers in the 
single Trogdor worker agent.
+ * The "threadsPerWorker" field denotes how many consumers should be spawned 
for this spec.
+ * It is worth noting that the "targetMessagesPerSec", "maxMessages" and 
"activeTopics" fields apply for every share consumer individually.
+ *
+ * The "recordProcessor" field allows the specification of tasks to run on 
records that are consumed.  This is run
+ * immediately after the messages are polled.  See the `RecordProcessor` 
interface for more information.
+ *
+ * An example JSON representation which will result in a share consumer that 
is part of the share group "sg" and
+ * subscribed to topics foo1, foo2, foo3 and bar.
+ * #{@code
+ *    {
+ *        "class": "org.apache.kafka.trogdor.workload.ShareConsumeBenchSpec",
+ *        "durationMs": 10000000,
+ *        "consumerNode": "node0",
+ *        "bootstrapServers": "localhost:9092",
+ *        "maxMessages": 100,
+ *        "shareGroup": "sg",
+ *        "activeTopics": ["foo[1-3]", "bar"]
+ *    }
+ * }
+ */
+public final class ShareConsumeBenchSpec extends TaskSpec {
+
+    private static final String VALID_EXPANDED_TOPIC_NAME_PATTERN = "^[^:]+$";
+    private static final String DEFAULT_SHARE_GROUP_NAME = "share";
+    private final String consumerNode;
+    private final String bootstrapServers;
+    private final int targetMessagesPerSec;
+    private final long maxMessages;
+    private final Map<String, String> consumerConf;
+    private final Map<String, String> adminClientConf;
+    private final Map<String, String> commonClientConf;
+    private final List<String> activeTopics;
+    private final String shareGroup;
+    private final int threadsPerWorker;
+    private final Optional<RecordProcessor> recordProcessor;
+
+    @JsonCreator
+    public ShareConsumeBenchSpec(@JsonProperty("startMs") long startMs,
+                            @JsonProperty("durationMs") long durationMs,
+                            @JsonProperty("consumerNode") String consumerNode,
+                            @JsonProperty("bootstrapServers") String 
bootstrapServers,
+                            @JsonProperty("targetMessagesPerSec") int 
targetMessagesPerSec,
+                            @JsonProperty("maxMessages") long maxMessages,
+                            @JsonProperty("consumerGroup") String shareGroup,
+                            @JsonProperty("consumerConf") Map<String, String> 
consumerConf,
+                            @JsonProperty("commonClientConf") Map<String, 
String> commonClientConf,
+                            @JsonProperty("adminClientConf") Map<String, 
String> adminClientConf,
+                            @JsonProperty("threadsPerWorker") Integer 
threadsPerWorker,
+                            @JsonProperty("recordProcessor") 
Optional<RecordProcessor> recordProcessor,
+                            @JsonProperty("activeTopics") List<String> 
activeTopics) {
+        super(startMs, durationMs);
+        this.consumerNode = (consumerNode == null) ? "" : consumerNode;
+        this.bootstrapServers = (bootstrapServers == null) ? "" : 
bootstrapServers;
+        this.targetMessagesPerSec = targetMessagesPerSec;
+        this.maxMessages = maxMessages;
+        this.consumerConf = configOrEmptyMap(consumerConf);
+        this.commonClientConf = configOrEmptyMap(commonClientConf);
+        this.adminClientConf = configOrEmptyMap(adminClientConf);
+        this.activeTopics = activeTopics == null ? new ArrayList<>() : 
activeTopics;
+        this.shareGroup = shareGroup == null ? DEFAULT_SHARE_GROUP_NAME : 
shareGroup;
+        this.threadsPerWorker = threadsPerWorker == null ? 1 : 
threadsPerWorker;
+        this.recordProcessor = recordProcessor;
+    }
+
+    @JsonProperty
+    public String consumerNode() {
+        return consumerNode;
+    }
+
+    @JsonProperty
+    public String shareGroup() {
+        return shareGroup;
+    }
+
+    @JsonProperty
+    public String bootstrapServers() {
+        return bootstrapServers;
+    }
+
+    @JsonProperty
+    public int targetMessagesPerSec() {
+        return targetMessagesPerSec;
+    }
+
+    @JsonProperty
+    public long maxMessages() {
+        return maxMessages;
+    }
+
+    @JsonProperty
+    public int threadsPerWorker() {
+        return threadsPerWorker;
+    }
+
+    @JsonProperty
+    public Optional<RecordProcessor> recordProcessor() {
+        return this.recordProcessor;
+    }
+
+    @JsonProperty
+    public Map<String, String> consumerConf() {
+        return consumerConf;
+    }
+
+    @JsonProperty
+    public Map<String, String> commonClientConf() {
+        return commonClientConf;
+    }
+
+    @JsonProperty
+    public Map<String, String> adminClientConf() {
+        return adminClientConf;
+    }
+
+    @JsonProperty
+    public List<String> activeTopics() {
+        return activeTopics;
+    }
+
+    @Override
+    public TaskController newController(String id) {
+        return topology -> Collections.singleton(consumerNode);
+    }
+
+    @Override
+    public TaskWorker newTaskWorker(String id) {
+        return new ShareConsumeBenchWorker(id, this);
+    }
+
+    /**
+     * Materializes a list of topic names (optionally with ranges) into a map 
of the topics and their partitions
+     *
+     * Example:
+     * ['foo[1-3]', 'bar[1-2]'] => {'foo1', 'foo2', 'foo3', 'bar1', 'bar2' }
+     */
+    Set<String> expandTopicNames() {
+        Set<String> expandedTopics = new HashSet<>();
+
+        for (String rawTopicName : this.activeTopics) {
+            Set<String> expandedNames = expandTopicName(rawTopicName);
+            if 
(!expandedNames.iterator().next().matches(VALID_EXPANDED_TOPIC_NAME_PATTERN))
+                throw new IllegalArgumentException(String.format("Expanded 
topic name %s is invalid", expandedNames));
+
+            expandedTopics.addAll(expandedNames);
+        }
+        return expandedTopics;
+    }
+
+    /**
+     * Expands a topic name until there are no more ranges in it
+     */
+    private Set<String> expandTopicName(String topicName) {
+        Set<String> expandedNames = StringExpander.expand(topicName);
+        if (expandedNames.size() == 1) {
+            return expandedNames;
+        }
+
+        Set<String> newNames = new HashSet<>();
+        for (String name : expandedNames) {
+            newNames.addAll(expandTopicName(name));
+        }
+        return newNames;
+    }
+}
diff --git 
a/trogdor/src/main/java/org/apache/kafka/trogdor/workload/ShareConsumeBenchWorker.java
 
b/trogdor/src/main/java/org/apache/kafka/trogdor/workload/ShareConsumeBenchWorker.java
new file mode 100644
index 00000000000..dad654ec552
--- /dev/null
+++ 
b/trogdor/src/main/java/org/apache/kafka/trogdor/workload/ShareConsumeBenchWorker.java
@@ -0,0 +1,498 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.workload;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaShareConsumer;
+import org.apache.kafka.common.internals.KafkaFutureImpl;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.utils.ThreadUtils;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.trogdor.common.JsonUtil;
+import org.apache.kafka.trogdor.common.Platform;
+import org.apache.kafka.trogdor.common.WorkerUtils;
+import org.apache.kafka.trogdor.task.TaskWorker;
+import org.apache.kafka.trogdor.task.WorkerStatusTracker;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.JsonNode;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantLock;
+
+
+public class ShareConsumeBenchWorker implements TaskWorker {
+    private static final Logger log = 
LoggerFactory.getLogger(ShareConsumeBenchWorker.class);
+
+    private static final int THROTTLE_PERIOD_MS = 100;
+
+    private final String id;
+    private final ShareConsumeBenchSpec spec;
+    private final AtomicBoolean running = new AtomicBoolean(false);
+    private ScheduledExecutorService executor;
+    private WorkerStatusTracker workerStatus;
+    private StatusUpdater statusUpdater;
+    private Future<?> statusUpdaterFuture;
+    private KafkaFutureImpl<String> doneFuture;
+    public ShareConsumeBenchWorker(String id, ShareConsumeBenchSpec spec) {
+        this.id = id;
+        this.spec = spec;
+    }
+
+    @Override
+    public void start(Platform platform, WorkerStatusTracker status,
+                      KafkaFutureImpl<String> doneFuture) throws Exception {
+        if (!running.compareAndSet(false, true)) {
+            throw new IllegalStateException("ShareConsumeBenchWorker is 
already running.");
+        }
+        log.info("{}: Activating ShareConsumeBenchWorker with {}", id, spec);
+        this.statusUpdater = new StatusUpdater();
+        this.executor = Executors.newScheduledThreadPool(
+                spec.threadsPerWorker() + 2, // 1 thread for all the 
ConsumeStatusUpdater and 1 for the StatusUpdater
+                
ThreadUtils.createThreadFactory("ShareConsumeBenchWorkerThread%d", false));
+        this.statusUpdaterFuture = 
executor.scheduleAtFixedRate(this.statusUpdater, 1, 1, TimeUnit.MINUTES);
+        this.workerStatus = status;
+        this.doneFuture = doneFuture;
+        executor.submit(new Prepare());
+    }
+
+    public class Prepare implements Runnable {
+        @Override
+        public void run() {
+            try {
+                List<Future<Void>> consumeTasks = new ArrayList<>();
+                for (ConsumeMessages task : consumeTasks()) {
+                    consumeTasks.add(executor.submit(task));
+                }
+                executor.submit(new CloseStatusUpdater(consumeTasks));
+            } catch (Throwable e) {
+                WorkerUtils.abort(log, "Prepare", e, doneFuture);
+            }
+        }
+
+        private List<ConsumeMessages> consumeTasks() {
+            List<ConsumeMessages> tasks = new ArrayList<>();
+            String shareGroup = shareGroup();
+            int consumerCount = spec.threadsPerWorker();
+
+            Set<String> topics = new HashSet<>(spec.expandTopicNames());
+
+            for (int i = 0; i < consumerCount; i++) {
+                tasks.add(new ConsumeMessages(consumer(shareGroup, 
clientId(i)), spec.recordProcessor(), topics));
+            }
+
+            return tasks;
+        }
+
+        private String clientId(int idx) {
+            return String.format("consumer.%s-%d", id, idx);
+        }
+
+        /**
+         * Creates a new KafkaConsumer instance
+         */
+        private ThreadSafeShareConsumer consumer(String shareGroup, String 
clientId) {
+            Properties props = new Properties();
+            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
spec.bootstrapServers());
+            props.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
+            props.put(ConsumerConfig.GROUP_ID_CONFIG, shareGroup);
+            props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 100000);
+            // these defaults maybe over-written by the user-specified 
commonClientConf or consumerConf
+            WorkerUtils.addConfigsToProperties(props, spec.commonClientConf(), 
spec.consumerConf());
+            return new ThreadSafeShareConsumer(new KafkaShareConsumer<>(props, 
new ByteArrayDeserializer(), new ByteArrayDeserializer()), clientId);
+        }
+
+        private String shareGroup() {
+            return spec.shareGroup();
+        }
+    }
+
+    public class ConsumeMessages implements Callable<Void> {
+        private final Histogram latencyHistogram;
+        private final Histogram messageSizeHistogram;
+        private final Future<?> statusUpdaterFuture;
+        private final Throttle throttle;
+        private final String clientId;
+        private final ThreadSafeShareConsumer consumer;
+        private final Optional<RecordProcessor> recordProcessor;
+
+        private ConsumeMessages(ThreadSafeShareConsumer consumer,
+                                Optional<RecordProcessor> recordProcessor) {
+            this.latencyHistogram = new Histogram(10000);
+            this.messageSizeHistogram = new Histogram(2 * 1024 * 1024);
+            this.clientId = consumer.clientId();
+            this.statusUpdaterFuture = executor.scheduleAtFixedRate(
+                    new ConsumeStatusUpdater(latencyHistogram, 
messageSizeHistogram, consumer, recordProcessor), 1, 1, TimeUnit.MINUTES);
+            int perPeriod;
+            if (spec.targetMessagesPerSec() <= 0)
+                perPeriod = Integer.MAX_VALUE;
+            else
+                perPeriod = 
WorkerUtils.perSecToPerPeriod(spec.targetMessagesPerSec(), THROTTLE_PERIOD_MS);
+
+            this.throttle = new Throttle(perPeriod, THROTTLE_PERIOD_MS);
+            this.consumer = consumer;
+            this.recordProcessor = recordProcessor;
+        }
+
+        ConsumeMessages(ThreadSafeShareConsumer consumer,
+                        Optional<RecordProcessor> recordProcessor,
+                        Set<String> topics) {
+            this(consumer, recordProcessor);
+            log.info("Will consume from topics {}.", topics);
+            this.consumer.subscribe(topics);
+        }
+
+        @Override
+        public Void call() throws Exception {
+            long messagesConsumed = 0;
+            long bytesConsumed = 0;
+            long startTimeMs = Time.SYSTEM.milliseconds();
+            long startBatchMs = startTimeMs;
+            long maxMessages = spec.maxMessages();
+            try {
+                while (messagesConsumed < maxMessages) {
+                    ConsumerRecords<byte[], byte[]> records = consumer.poll();
+                    if (records.isEmpty()) {
+                        continue;
+                    }
+                    long endBatchMs = Time.SYSTEM.milliseconds();
+                    long elapsedBatchMs = endBatchMs - startBatchMs;
+
+                    // Do the record batch processing immediately to avoid 
latency skew.
+                    recordProcessor.ifPresent(processor -> 
processor.processRecords(records));
+
+                    for (ConsumerRecord<byte[], byte[]> record : records) {
+                        messagesConsumed++;
+                        long messageBytes = 0;
+                        if (record.key() != null) {
+                            messageBytes += record.serializedKeySize();
+                        }
+                        if (record.value() != null) {
+                            messageBytes += record.serializedValueSize();
+                        }
+                        latencyHistogram.add(elapsedBatchMs);
+                        messageSizeHistogram.add(messageBytes);
+                        bytesConsumed += messageBytes;
+                        if (messagesConsumed >= maxMessages)
+                            break;
+
+                        throttle.increment();
+                    }
+                    startBatchMs = Time.SYSTEM.milliseconds();
+                }
+            } catch (Exception e) {
+                WorkerUtils.abort(log, "ConsumeRecords", e, doneFuture);
+            } finally {
+                statusUpdaterFuture.cancel(false);
+                StatusData statusData =
+                        new ConsumeStatusUpdater(latencyHistogram, 
messageSizeHistogram, consumer, spec.recordProcessor()).update();
+                long curTimeMs = Time.SYSTEM.milliseconds();
+                log.info("{} Consumed total number of messages={}, bytes={} in 
{} ms.  status: {}",
+                        clientId, messagesConsumed, bytesConsumed, curTimeMs - 
startTimeMs, statusData);
+            }
+            consumer.close();
+            return null;
+        }
+    }
+
+    public class CloseStatusUpdater implements Runnable {
+        private final List<Future<Void>> consumeTasks;
+
+        CloseStatusUpdater(List<Future<Void>> consumeTasks) {
+            this.consumeTasks = consumeTasks;
+        }
+
+        @Override
+        public void run() {
+            while (!consumeTasks.stream().allMatch(Future::isDone)) {
+                try {
+                    Thread.sleep(60000);
+                } catch (InterruptedException e) {
+                    log.debug("{} was interrupted. Closing...", 
this.getClass().getName());
+                    break; // close the thread
+                }
+            }
+            statusUpdaterFuture.cancel(false);
+            statusUpdater.update();
+            doneFuture.complete("");
+        }
+    }
+
+    class StatusUpdater implements Runnable {
+        final Map<String, JsonNode> statuses;
+
+        StatusUpdater() {
+            statuses = new HashMap<>();
+        }
+
+        @Override
+        public void run() {
+            try {
+                update();
+            } catch (Exception e) {
+                WorkerUtils.abort(log, "ConsumeStatusUpdater", e, doneFuture);
+            }
+        }
+
+        synchronized void update() {
+            workerStatus.update(JsonUtil.JSON_SERDE.valueToTree(statuses));
+        }
+
+        synchronized void updateConsumeStatus(String clientId, StatusData 
status) {
+            statuses.put(clientId, JsonUtil.JSON_SERDE.valueToTree(status));
+        }
+    }
+
+    /**
+     * Runnable class that updates the status of a single consumer
+     */
+    public class ConsumeStatusUpdater implements Runnable {
+        private final Histogram latencyHistogram;
+        private final Histogram messageSizeHistogram;
+        private final ThreadSafeShareConsumer consumer;
+        private final Optional<RecordProcessor> recordProcessor;
+
+        ConsumeStatusUpdater(Histogram latencyHistogram,
+                             Histogram messageSizeHistogram,
+                             ThreadSafeShareConsumer consumer,
+                             Optional<RecordProcessor> recordProcessor) {
+            this.latencyHistogram = latencyHistogram;
+            this.messageSizeHistogram = messageSizeHistogram;
+            this.consumer = consumer;
+            this.recordProcessor = recordProcessor;
+        }
+
+        @Override
+        public void run() {
+            try {
+                update();
+            } catch (Exception e) {
+                WorkerUtils.abort(log, "ConsumeStatusUpdater", e, doneFuture);
+            }
+        }
+
+        StatusData update() {
+            Histogram.Summary latSummary = 
latencyHistogram.summarize(StatusData.PERCENTILES);
+            Histogram.Summary msgSummary = 
messageSizeHistogram.summarize(StatusData.PERCENTILES);
+
+            // Parse out the RecordProcessor's status, id specified.
+            Optional<JsonNode> recordProcessorStatus = Optional.empty();
+            if (recordProcessor.isPresent()) {
+                recordProcessorStatus = 
Optional.of(recordProcessor.get().processorStatus());
+            }
+
+            StatusData statusData = new StatusData(
+                    consumer.subscription(),
+                    latSummary.numSamples(),
+                    (long) (msgSummary.numSamples() * msgSummary.average()),
+                    (long) msgSummary.average(),
+                    latSummary.average(),
+                    latSummary.percentiles().get(0).value(),
+                    latSummary.percentiles().get(1).value(),
+                    latSummary.percentiles().get(2).value(),
+                    recordProcessorStatus);
+            statusUpdater.updateConsumeStatus(consumer.clientId(), statusData);
+            log.info("Status={}", JsonUtil.toJsonString(statusData));
+            return statusData;
+        }
+    }
+
+    public static class StatusData {
+        private final long totalMessagesReceived;
+        private final Set<String> subscription;
+        private final long totalBytesReceived;
+        private final long averageMessageSizeBytes;
+        private final float averageLatencyMs;
+        private final int p50LatencyMs;
+        private final int p95LatencyMs;
+        private final int p99LatencyMs;
+        private final Optional<JsonNode> recordProcessorStatus;
+
+        /**
+         * The percentiles to use when calculating the histogram data.
+         * These should match up with the p50LatencyMs, p95LatencyMs, etc. 
fields.
+         */
+        static final float[] PERCENTILES = {0.5f, 0.95f, 0.99f};
+        @JsonCreator
+        StatusData(@JsonProperty("subscription") Set<String> subscription,
+                   @JsonProperty("totalMessagesReceived") long 
totalMessagesReceived,
+                   @JsonProperty("totalBytesReceived") long totalBytesReceived,
+                   @JsonProperty("averageMessageSizeBytes") long 
averageMessageSizeBytes,
+                   @JsonProperty("averageLatencyMs") float averageLatencyMs,
+                   @JsonProperty("p50LatencyMs") int p50latencyMs,
+                   @JsonProperty("p95LatencyMs") int p95latencyMs,
+                   @JsonProperty("p99LatencyMs") int p99latencyMs,
+                   @JsonProperty("recordProcessorStatus") Optional<JsonNode> 
recordProcessorStatus) {
+            this.subscription = subscription;
+            this.totalMessagesReceived = totalMessagesReceived;
+            this.totalBytesReceived = totalBytesReceived;
+            this.averageMessageSizeBytes = averageMessageSizeBytes;
+            this.averageLatencyMs = averageLatencyMs;
+            this.p50LatencyMs = p50latencyMs;
+            this.p95LatencyMs = p95latencyMs;
+            this.p99LatencyMs = p99latencyMs;
+            this.recordProcessorStatus = recordProcessorStatus;
+        }
+
+        @JsonProperty
+        public Set<String> subscription() {
+            return subscription;
+        }
+
+        @JsonProperty
+        public long totalMessagesReceived() {
+            return totalMessagesReceived;
+        }
+
+        @JsonProperty
+        public long totalBytesReceived() {
+            return totalBytesReceived;
+        }
+
+        @JsonProperty
+        public long averageMessageSizeBytes() {
+            return averageMessageSizeBytes;
+        }
+
+        @JsonProperty
+        public float averageLatencyMs() {
+            return averageLatencyMs;
+        }
+
+        @JsonProperty
+        public int p50LatencyMs() {
+            return p50LatencyMs;
+        }
+
+        @JsonProperty
+        public int p95LatencyMs() {
+            return p95LatencyMs;
+        }
+
+        @JsonProperty
+        public int p99LatencyMs() {
+            return p99LatencyMs;
+        }
+
+        @JsonProperty
+        public JsonNode recordProcessorStatus() {
+            return recordProcessorStatus.orElse(null);
+        }
+    }
+
+    @Override
+    public void stop(Platform platform) throws Exception {
+        if (!running.compareAndSet(true, false)) {
+            throw new IllegalStateException("ShareConsumeBenchWorker is not 
running.");
+        }
+        log.info("{}: Deactivating ShareConsumeBenchWorker.", id);
+        doneFuture.complete("");
+        executor.shutdownNow();
+        executor.awaitTermination(1, TimeUnit.DAYS);
+        this.executor = null;
+        this.statusUpdater = null;
+        this.statusUpdaterFuture = null;
+        this.workerStatus = null;
+        this.doneFuture = null;
+    }
+
+    /**
+     * A thread-safe KafkaShareConsumer wrapper
+     */
+    private static class ThreadSafeShareConsumer {
+        private final KafkaShareConsumer<byte[], byte[]> consumer;
+        private final String clientId;
+        private final ReentrantLock consumerLock;
+        private boolean closed = false;
+
+        ThreadSafeShareConsumer(KafkaShareConsumer<byte[], byte[]> consumer, 
String clientId) {
+            this.consumer = consumer;
+            this.clientId = clientId;
+            this.consumerLock = new ReentrantLock();
+        }
+
+        ConsumerRecords<byte[], byte[]> poll() {
+            this.consumerLock.lock();
+            try {
+                return consumer.poll(Duration.ofMillis(50));
+            } finally {
+                this.consumerLock.unlock();
+            }
+        }
+
+        void close() {
+            if (closed)
+                return;
+            this.consumerLock.lock();
+            try {
+                consumer.unsubscribe();
+                Utils.closeQuietly(consumer, "consumer");
+                closed = true;
+            } finally {
+                this.consumerLock.unlock();
+            }
+        }
+
+        void subscribe(Set<String> topics) {
+            this.consumerLock.lock();
+            try {
+                consumer.subscribe(topics);
+            } finally {
+                this.consumerLock.unlock();
+            }
+        }
+
+        Set<String> subscription() {
+            this.consumerLock.lock();
+            try {
+                return consumer.subscription();
+            } finally {
+                this.consumerLock.unlock();
+            }
+        }
+
+        String clientId() {
+            return clientId;
+        }
+
+        KafkaShareConsumer<byte[], byte[]> consumer() {
+            return consumer;
+        }
+    }
+}
diff --git 
a/trogdor/src/test/java/org/apache/kafka/trogdor/workload/ShareConsumeBenchSpecTest.java
 
b/trogdor/src/test/java/org/apache/kafka/trogdor/workload/ShareConsumeBenchSpecTest.java
new file mode 100644
index 00000000000..76a30b73ec1
--- /dev/null
+++ 
b/trogdor/src/test/java/org/apache/kafka/trogdor/workload/ShareConsumeBenchSpecTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.trogdor.workload;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+class ShareConsumeBenchSpecTest {
+
+    @Test
+    public void testExpandTopicNames() {
+        ShareConsumeBenchSpec shareConsumeBenchSpec = 
shareConsumeBenchSpec(Arrays.asList("foo[1-3]", "bar"));
+        Set<String> expectedNames = new HashSet<>();
+
+        expectedNames.add("foo1");
+        expectedNames.add("foo2");
+        expectedNames.add("foo3");
+        expectedNames.add("bar");
+
+        assertEquals(expectedNames, shareConsumeBenchSpec.expandTopicNames());
+    }
+
+    @Test
+    public void testInvalidNameRaisesException() {
+        for (String invalidName : Arrays.asList("In:valid", "invalid:", 
":invalid[]", "in:valid:", "invalid[1-3]:")) {
+            assertThrows(IllegalArgumentException.class, () -> 
shareConsumeBenchSpec(Collections.singletonList(invalidName)).expandTopicNames());
+        }
+    }
+
+    private ShareConsumeBenchSpec shareConsumeBenchSpec(List<String> 
activeTopics) {
+        return new ShareConsumeBenchSpec(0, 0, "node", "localhost",
+                123, 1234, "sg-1",
+                Collections.emptyMap(), Collections.emptyMap(), 
Collections.emptyMap(), 1,
+                Optional.empty(), activeTopics);
+    }
+
+}
\ No newline at end of file


Reply via email to