This is an automated email from the ASF dual-hosted git repository.

manikumar pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 0181073d49b KAFKA-17933: Added round trip trogdor workload for share 
consumer. (#17692)
0181073d49b is described below

commit 0181073d49b5a23ecdd493cd72c880a2cfc9c416
Author: ShivsundarR <[email protected]>
AuthorDate: Thu Nov 7 05:51:14 2024 -0500

    KAFKA-17933: Added round trip trogdor workload for share consumer. (#17692)
    
    Added ShareRoundTripWorker.java, similar to RoundTripWorker.java. This will 
start a producer and a share consumer on a single node. The share consumer 
reads back the messages produced by the producer.
    
    Reviewers: Andrew Schofield <[email protected]>, Manikumar Reddy 
<[email protected]>
---
 tests/spec/share_round_trip.json                   |  34 ++
 .../org/apache/kafka/trogdor/task/TaskSpec.java    |   2 +-
 .../kafka/trogdor/workload/RoundTripWorker.java    | 435 +--------------------
 ...undTripWorker.java => RoundTripWorkerBase.java} |  62 +--
 .../trogdor/workload/RoundTripWorkloadSpec.java    |   2 +-
 .../trogdor/workload/ShareRoundTripWorker.java     |  68 ++++
 .../workload/ShareRoundTripWorkloadSpec.java       |  48 +++
 7 files changed, 205 insertions(+), 446 deletions(-)

diff --git a/tests/spec/share_round_trip.json b/tests/spec/share_round_trip.json
new file mode 100644
index 00000000000..426bd3ebfdf
--- /dev/null
+++ b/tests/spec/share_round_trip.json
@@ -0,0 +1,34 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//
+// An example task specification for running a round trip test in Trogdor.
+// See trogdor/README.md for details.
+//
+
+{
+  "class": "org.apache.kafka.trogdor.workload.ShareRoundTripWorkloadSpec",
+  "durationMs": 10000000,
+  "clientNode": "node0",
+  "bootstrapServers": "localhost:9092",
+  "targetMessagesPerSec": 1000,
+  "maxMessages": 100,
+  "activeTopics": {
+    "round_trip_topic_share[0-1]": {
+      "numPartitions": 2,
+      "replicationFactor": 1
+    }
+  }
+}
diff --git a/trogdor/src/main/java/org/apache/kafka/trogdor/task/TaskSpec.java 
b/trogdor/src/main/java/org/apache/kafka/trogdor/task/TaskSpec.java
index 8033b95a8ba..f7a538195b8 100644
--- a/trogdor/src/main/java/org/apache/kafka/trogdor/task/TaskSpec.java
+++ b/trogdor/src/main/java/org/apache/kafka/trogdor/task/TaskSpec.java
@@ -111,7 +111,7 @@ public abstract class TaskSpec {
         return JsonUtil.toJsonString(this);
     }
 
-    protected Map<String, String> configOrEmptyMap(Map<String, String> config) 
{
+    protected static Map<String, String> configOrEmptyMap(Map<String, String> 
config) {
         return (config == null) ? Collections.emptyMap() : config;
     }
 }
diff --git 
a/trogdor/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java 
b/trogdor/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java
index 385e14162ee..d1f47e626fd 100644
--- 
a/trogdor/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java
+++ 
b/trogdor/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java
@@ -14,447 +14,54 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.kafka.trogdor.workload;
 
-import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.config.ConfigException;
-import org.apache.kafka.common.errors.TimeoutException;
-import org.apache.kafka.common.errors.WakeupException;
-import org.apache.kafka.common.internals.KafkaFutureImpl;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
-import org.apache.kafka.common.serialization.ByteArraySerializer;
-import org.apache.kafka.common.utils.ThreadUtils;
-import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.trogdor.common.JsonUtil;
-import org.apache.kafka.trogdor.common.Platform;
 import org.apache.kafka.trogdor.common.WorkerUtils;
-import org.apache.kafka.trogdor.task.TaskWorker;
-import org.apache.kafka.trogdor.task.WorkerStatusTracker;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.databind.node.TextNode;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
 import java.time.Duration;
-import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
 import java.util.Properties;
-import java.util.TreeSet;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.stream.Collectors;
-
-
-public class RoundTripWorker implements TaskWorker {
-    private static final int THROTTLE_PERIOD_MS = 100;
-
-    private static final int LOG_INTERVAL_MS = 5000;
-
-    private static final int LOG_NUM_MESSAGES = 10;
-
-    private static final Logger log = 
LoggerFactory.getLogger(RoundTripWorker.class);
-
-    private static final PayloadGenerator KEY_GENERATOR = new 
SequentialPayloadGenerator(4, 0);
-
-    private ToReceiveTracker toReceiveTracker;
-
-    private final String id;
-
-    private final RoundTripWorkloadSpec spec;
-
-    private final AtomicBoolean running = new AtomicBoolean(false);
-
-    private final Lock lock = new ReentrantLock();
 
-    private final Condition unackedSendsAreZero = lock.newCondition();
+public class RoundTripWorker extends RoundTripWorkerBase {
 
-    private ScheduledExecutorService executor;
+    KafkaConsumer<byte[], byte[]> consumer;
 
-    private WorkerStatusTracker status;
-
-    private KafkaFutureImpl<String> doneFuture;
-
-    private KafkaProducer<byte[], byte[]> producer;
-
-    private KafkaConsumer<byte[], byte[]> consumer;
-
-    private Long unackedSends;
-
-    private ToSendTracker toSendTracker;
-
-    public RoundTripWorker(String id, RoundTripWorkloadSpec spec) {
+    RoundTripWorker(String id, RoundTripWorkloadSpec spec) {
         this.id = id;
         this.spec = spec;
     }
 
     @Override
-    public void start(Platform platform, WorkerStatusTracker status,
-                      KafkaFutureImpl<String> doneFuture) throws Exception {
-        if (!running.compareAndSet(false, true)) {
-            throw new IllegalStateException("RoundTripWorker is already 
running.");
-        }
-        log.info("{}: Activating RoundTripWorker.", id);
-        this.executor = Executors.newScheduledThreadPool(3,
-            ThreadUtils.createThreadFactory("RoundTripWorker%d", false));
-        this.status = status;
-        this.doneFuture = doneFuture;
-        this.producer = null;
-        this.consumer = null;
-        this.unackedSends = spec.maxMessages();
-        executor.submit(new Prepare());
-    }
-
-    class Prepare implements Runnable {
-        @Override
-        public void run() {
-            try {
-                if (spec.targetMessagesPerSec() <= 0) {
-                    throw new ConfigException("Can't have targetMessagesPerSec 
<= 0.");
-                }
-                Map<String, NewTopic> newTopics = new HashMap<>();
-                HashSet<TopicPartition> active = new HashSet<>();
-                for (Map.Entry<String, PartitionsSpec> entry :
-                    spec.activeTopics().materialize().entrySet()) {
-                    String topicName = entry.getKey();
-                    PartitionsSpec partSpec = entry.getValue();
-                    newTopics.put(topicName, partSpec.newTopic(topicName));
-                    for (Integer partitionNumber : 
partSpec.partitionNumbers()) {
-                        active.add(new TopicPartition(topicName, 
partitionNumber));
-                    }
-                }
-                if (active.isEmpty()) {
-                    throw new RuntimeException("You must specify at least one 
active topic.");
-                }
-                status.update(new TextNode("Creating " + 
newTopics.keySet().size() + " topic(s)"));
-                WorkerUtils.createTopics(log, spec.bootstrapServers(), 
spec.commonClientConf(),
-                    spec.adminClientConf(), newTopics, false);
-                status.update(new TextNode("Created " + 
newTopics.keySet().size() + " topic(s)"));
-                toSendTracker = new ToSendTracker(spec.maxMessages());
-                toReceiveTracker = new ToReceiveTracker();
-                executor.submit(new ProducerRunnable(active));
-                executor.submit(new ConsumerRunnable(active));
-                executor.submit(new StatusUpdater());
-                executor.scheduleWithFixedDelay(
-                    new StatusUpdater(), 30, 30, TimeUnit.SECONDS);
-            } catch (Throwable e) {
-                WorkerUtils.abort(log, "Prepare", e, doneFuture);
-            }
-        }
-    }
-
-    private static class ToSendTrackerResult {
-        final long index;
-        final boolean firstSend;
-
-        ToSendTrackerResult(long index, boolean firstSend) {
-            this.index = index;
-            this.firstSend = firstSend;
-        }
-    }
-
-    private static class ToSendTracker {
-        private final long maxMessages;
-        private final List<Long> failed = new ArrayList<>();
-        private long frontier = 0;
-
-        ToSendTracker(long maxMessages) {
-            this.maxMessages = maxMessages;
-        }
-
-        synchronized void addFailed(long index) {
-            failed.add(index);
-        }
-
-        synchronized long frontier() {
-            return frontier;
-        }
-
-        synchronized ToSendTrackerResult next() {
-            if (failed.isEmpty()) {
-                if (frontier >= maxMessages) {
-                    return null;
-                } else {
-                    return new ToSendTrackerResult(frontier++, true);
-                }
-            } else {
-                return new ToSendTrackerResult(failed.remove(0), false);
-            }
-        }
-    }
-
-    class ProducerRunnable implements Runnable {
-        private final HashSet<TopicPartition> partitions;
-        private final Throttle throttle;
-
-        ProducerRunnable(HashSet<TopicPartition> partitions) {
-            this.partitions = partitions;
-            Properties props = new Properties();
-            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
spec.bootstrapServers());
-            props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16 * 1024);
-            props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 4 * 16 * 1024L);
-            props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 1000L);
-            props.put(ProducerConfig.CLIENT_ID_CONFIG, "producer." + id);
-            props.put(ProducerConfig.ACKS_CONFIG, "all");
-            props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 105000);
-            // user may over-write the defaults with common client config and 
producer config
-            WorkerUtils.addConfigsToProperties(props, spec.commonClientConf(), 
spec.producerConf());
-            producer = new KafkaProducer<>(props, new ByteArraySerializer(),
-                new ByteArraySerializer());
-            int perPeriod = WorkerUtils.
-                perSecToPerPeriod(spec.targetMessagesPerSec(), 
THROTTLE_PERIOD_MS);
-            this.throttle = new Throttle(perPeriod, THROTTLE_PERIOD_MS);
-        }
-
-        @Override
-        public void run() {
-            long messagesSent = 0;
-            long uniqueMessagesSent = 0;
-            log.debug("{}: Starting RoundTripWorker#ProducerRunnable.", id);
-            try {
-                Iterator<TopicPartition> iter = partitions.iterator();
-                while (true) {
-                    final ToSendTrackerResult result = toSendTracker.next();
-                    if (result == null) {
-                        break;
-                    }
-                    throttle.increment();
-                    final long messageIndex = result.index;
-                    if (result.firstSend) {
-                        toReceiveTracker.addPending(messageIndex);
-                        uniqueMessagesSent++;
-                    }
-                    messagesSent++;
-                    if (!iter.hasNext()) {
-                        iter = partitions.iterator();
-                    }
-                    TopicPartition partition = iter.next();
-                    // we explicitly specify generator position based on 
message index
-                    ProducerRecord<byte[], byte[]> record = new 
ProducerRecord<>(partition.topic(),
-                        partition.partition(), 
KEY_GENERATOR.generate(messageIndex),
-                        spec.valueGenerator().generate(messageIndex));
-                    producer.send(record, (metadata, exception) -> {
-                        if (exception == null) {
-                            lock.lock();
-                            try {
-                                unackedSends -= 1;
-                                if (unackedSends <= 0)
-                                    unackedSendsAreZero.signalAll();
-                            } finally {
-                                lock.unlock();
-                            }
-                        } else {
-                            log.info("{}: Got exception when sending message 
{}: {}",
-                                id, messageIndex, exception.getMessage());
-                            toSendTracker.addFailed(messageIndex);
-                        }
-                    });
-                }
-            } catch (Throwable e) {
-                WorkerUtils.abort(log, "ProducerRunnable", e, doneFuture);
-            } finally {
-                lock.lock();
-                try {
-                    log.info("{}: ProducerRunnable is exiting.  
messagesSent={}; uniqueMessagesSent={}; " +
-                                    "ackedSends={}/{}.", id, messagesSent, 
uniqueMessagesSent,
-                            spec.maxMessages() - unackedSends, 
spec.maxMessages());
-                } finally {
-                    lock.unlock();
-                }
-            }
-        }
-    }
-
-    private class ToReceiveTracker {
-        private final TreeSet<Long> pending = new TreeSet<>();
-
-        private long totalReceived = 0;
-
-        synchronized void addPending(long messageIndex) {
-            pending.add(messageIndex);
-        }
-
-        synchronized boolean removePending(long messageIndex) {
-            if (pending.remove(messageIndex)) {
-                totalReceived++;
-                return true;
-            } else {
-                return false;
-            }
-        }
-
-        synchronized long totalReceived() {
-            return totalReceived;
-        }
-
-        void log() {
-            long numToReceive;
-            List<Long> list = new ArrayList<>(LOG_NUM_MESSAGES);
-            synchronized (this) {
-                numToReceive = pending.size();
-                for (Iterator<Long> iter = pending.iterator();
-                        iter.hasNext() && (list.size() < LOG_NUM_MESSAGES); ) {
-                    Long i = iter.next();
-                    list.add(i);
-                }
-            }
-            log.info("{}: consumer waiting for {} message(s), starting with: 
{}",
-                id, numToReceive, 
list.stream().map(Object::toString).collect(Collectors.joining(", ")));
-        }
-    }
-
-    class ConsumerRunnable implements Runnable {
-
-        ConsumerRunnable(HashSet<TopicPartition> partitions) {
-            Properties props = new Properties();
-            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
spec.bootstrapServers());
-            props.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer." + id);
-            props.put(ConsumerConfig.GROUP_ID_CONFIG, 
"round-trip-consumer-group-" + id);
-            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-            props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 105000);
-            props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 100000);
-            // user may over-write the defaults with common client config and 
consumer config
-            WorkerUtils.addConfigsToProperties(props, spec.commonClientConf(), 
spec.consumerConf());
-            consumer = new KafkaConsumer<>(props, new ByteArrayDeserializer(),
+    public void initializeConsumer(HashSet<TopicPartition> partitions) {
+        Properties props = new Properties();
+        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
spec.bootstrapServers());
+        props.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer." + id);
+        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 105000);
+        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 100000);
+        // user may over-write the defaults with common client config and 
consumer config
+        WorkerUtils.addConfigsToProperties(props, spec.commonClientConf(), 
spec.consumerConf());
+
+        props.put(ConsumerConfig.GROUP_ID_CONFIG, "round-trip-consumer-group-" 
+ id);
+        consumer = new KafkaConsumer<>(props, new ByteArrayDeserializer(),
                 new ByteArrayDeserializer());
-            consumer.assign(partitions);
-        }
-
-        @Override
-        public void run() {
-            long uniqueMessagesReceived = 0;
-            long messagesReceived = 0;
-            long pollInvoked = 0;
-            log.debug("{}: Starting RoundTripWorker#ConsumerRunnable.", id);
-            try {
-                long lastLogTimeMs = Time.SYSTEM.milliseconds();
-                while (true) {
-                    try {
-                        pollInvoked++;
-                        ConsumerRecords<byte[], byte[]> records = 
consumer.poll(Duration.ofMillis(50));
-                        for (ConsumerRecord<byte[], byte[]> record : records) {
-                            int messageIndex = 
ByteBuffer.wrap(record.key()).order(ByteOrder.LITTLE_ENDIAN).getInt();
-                            messagesReceived++;
-                            if (toReceiveTracker.removePending(messageIndex)) {
-                                uniqueMessagesReceived++;
-                                if (uniqueMessagesReceived >= 
spec.maxMessages()) {
-                                    lock.lock();
-                                    try {
-                                        log.info("{}: Consumer received the 
full count of {} unique messages.  " +
-                                                "Waiting for all {} sends to 
be acked...", id, spec.maxMessages(), unackedSends);
-                                        while (unackedSends > 0)
-                                            unackedSendsAreZero.await();
-                                    } finally {
-                                        lock.unlock();
-                                    }
-
-                                    log.info("{}: all sends have been acked.", 
id);
-                                    new StatusUpdater().update();
-                                    doneFuture.complete("");
-                                    return;
-                                }
-                            }
-                        }
-                        long curTimeMs = Time.SYSTEM.milliseconds();
-                        if (curTimeMs > lastLogTimeMs + LOG_INTERVAL_MS) {
-                            toReceiveTracker.log();
-                            lastLogTimeMs = curTimeMs;
-                        }
-                    } catch (WakeupException e) {
-                        log.debug("{}: Consumer got WakeupException", id, e);
-                    } catch (TimeoutException e) {
-                        log.debug("{}: Consumer got TimeoutException", id, e);
-                    }
-                }
-            } catch (Throwable e) {
-                WorkerUtils.abort(log, "ConsumerRunnable", e, doneFuture);
-            } finally {
-                log.info("{}: ConsumerRunnable is exiting.  Invoked poll {} 
time(s).  " +
-                    "messagesReceived = {}; uniqueMessagesReceived = {}.",
-                    id, pollInvoked, messagesReceived, uniqueMessagesReceived);
-            }
-        }
+        consumer.assign(partitions);
     }
 
-    public class StatusUpdater implements Runnable {
-        @Override
-        public void run() {
-            try {
-                update();
-            } catch (Exception e) {
-                WorkerUtils.abort(log, "StatusUpdater", e, doneFuture);
-            }
-        }
-
-        StatusData update() {
-            StatusData statusData =
-                new StatusData(toSendTracker.frontier(), 
toReceiveTracker.totalReceived());
-            status.update(JsonUtil.JSON_SERDE.valueToTree(statusData));
-            return statusData;
-        }
-    }
-
-    public static class StatusData {
-        private final long totalUniqueSent;
-        private final long totalReceived;
-
-        @JsonCreator
-        public StatusData(@JsonProperty("totalUniqueSent") long 
totalUniqueSent,
-                          @JsonProperty("totalReceived") long totalReceived) {
-            this.totalUniqueSent = totalUniqueSent;
-            this.totalReceived = totalReceived;
-        }
-
-        @JsonProperty
-        public long totalUniqueSent() {
-            return totalUniqueSent;
-        }
-
-        @JsonProperty
-        public long totalReceived() {
-            return totalReceived;
-        }
+    @Override
+    protected ConsumerRecords<byte[], byte[]> fetchRecords(Duration duration) {
+        return consumer.poll(duration);
     }
 
     @Override
-    public void stop(Platform platform) throws Exception {
-        if (!running.compareAndSet(true, false)) {
-            throw new IllegalStateException("RoundTripWorker is not running.");
-        }
-        log.info("{}: Deactivating RoundTripWorker.", id);
-        doneFuture.complete("");
-        executor.shutdownNow();
-        executor.awaitTermination(1, TimeUnit.DAYS);
+    protected void shutdownConsumer() {
         Utils.closeQuietly(consumer, "consumer");
-        Utils.closeQuietly(producer, "producer");
-        this.consumer = null;
-        this.producer = null;
-        this.unackedSends = null;
-        this.executor = null;
-        this.doneFuture = null;
-        log.info("{}: Deactivated RoundTripWorker.", id);
+        consumer = null;
     }
 }
diff --git 
a/trogdor/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java 
b/trogdor/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorkerBase.java
similarity index 91%
copy from 
trogdor/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java
copy to 
trogdor/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorkerBase.java
index 385e14162ee..7adc1287d71 100644
--- 
a/trogdor/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java
+++ 
b/trogdor/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorkerBase.java
@@ -18,10 +18,8 @@
 package org.apache.kafka.trogdor.workload;
 
 import org.apache.kafka.clients.admin.NewTopic;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
@@ -30,7 +28,6 @@ import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.internals.KafkaFutureImpl;
-import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.utils.ThreadUtils;
 import org.apache.kafka.common.utils.Time;
@@ -69,22 +66,31 @@ import java.util.concurrent.locks.ReentrantLock;
 import java.util.stream.Collectors;
 
 
-public class RoundTripWorker implements TaskWorker {
+/**
+ * A base class for a round-trip worker which will produce and consume equal 
number of messages.
+ *
+ * This is used to create a round-trip trogdor agent which will spawn 
producers and consumers to
+ * produce and consume equal number of messages based on the workload it is 
executing.
+ *
+ * Currently, there are 2 subclasses, one which uses {@link 
org.apache.kafka.clients.consumer.KafkaConsumer}
+ * and another which uses {@link 
org.apache.kafka.clients.consumer.KafkaShareConsumer} as the consumer.
+ */
+public abstract class RoundTripWorkerBase implements TaskWorker {
     private static final int THROTTLE_PERIOD_MS = 100;
 
     private static final int LOG_INTERVAL_MS = 5000;
 
     private static final int LOG_NUM_MESSAGES = 10;
 
-    private static final Logger log = 
LoggerFactory.getLogger(RoundTripWorker.class);
+    private static final Logger log = 
LoggerFactory.getLogger(RoundTripWorkerBase.class);
 
     private static final PayloadGenerator KEY_GENERATOR = new 
SequentialPayloadGenerator(4, 0);
 
     private ToReceiveTracker toReceiveTracker;
 
-    private final String id;
+    protected String id;
 
-    private final RoundTripWorkloadSpec spec;
+    protected RoundTripWorkloadSpec spec;
 
     private final AtomicBoolean running = new AtomicBoolean(false);
 
@@ -100,17 +106,10 @@ public class RoundTripWorker implements TaskWorker {
 
     private KafkaProducer<byte[], byte[]> producer;
 
-    private KafkaConsumer<byte[], byte[]> consumer;
-
     private Long unackedSends;
 
     private ToSendTracker toSendTracker;
 
-    public RoundTripWorker(String id, RoundTripWorkloadSpec spec) {
-        this.id = id;
-        this.spec = spec;
-    }
-
     @Override
     public void start(Platform platform, WorkerStatusTracker status,
                       KafkaFutureImpl<String> doneFuture) throws Exception {
@@ -123,7 +122,6 @@ public class RoundTripWorker implements TaskWorker {
         this.status = status;
         this.doneFuture = doneFuture;
         this.producer = null;
-        this.consumer = null;
         this.unackedSends = spec.maxMessages();
         executor.submit(new Prepare());
     }
@@ -329,18 +327,7 @@ public class RoundTripWorker implements TaskWorker {
     class ConsumerRunnable implements Runnable {
 
         ConsumerRunnable(HashSet<TopicPartition> partitions) {
-            Properties props = new Properties();
-            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
spec.bootstrapServers());
-            props.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer." + id);
-            props.put(ConsumerConfig.GROUP_ID_CONFIG, 
"round-trip-consumer-group-" + id);
-            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-            props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 105000);
-            props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 100000);
-            // user may over-write the defaults with common client config and 
consumer config
-            WorkerUtils.addConfigsToProperties(props, spec.commonClientConf(), 
spec.consumerConf());
-            consumer = new KafkaConsumer<>(props, new ByteArrayDeserializer(),
-                new ByteArrayDeserializer());
-            consumer.assign(partitions);
+            initializeConsumer(partitions);
         }
 
         @Override
@@ -354,7 +341,7 @@ public class RoundTripWorker implements TaskWorker {
                 while (true) {
                     try {
                         pollInvoked++;
-                        ConsumerRecords<byte[], byte[]> records = 
consumer.poll(Duration.ofMillis(50));
+                        ConsumerRecords<byte[], byte[]> records = 
fetchRecords(Duration.ofMillis(50));
                         for (ConsumerRecord<byte[], byte[]> record : records) {
                             int messageIndex = 
ByteBuffer.wrap(record.key()).order(ByteOrder.LITTLE_ENDIAN).getInt();
                             messagesReceived++;
@@ -439,6 +426,22 @@ public class RoundTripWorker implements TaskWorker {
         }
     }
 
+    /**
+     * Initialize the consumer.
+     */
+    protected abstract void initializeConsumer(HashSet<TopicPartition> 
partitions);
+
+    /**
+     *
+     * Invoke poll from the consumer and return the records fetched.
+     */
+    protected abstract ConsumerRecords<byte[], byte[]> fetchRecords(Duration 
duration);
+
+    /**
+     * Close the consumer.
+     */
+    protected abstract void shutdownConsumer();
+
     @Override
     public void stop(Platform platform) throws Exception {
         if (!running.compareAndSet(true, false)) {
@@ -448,9 +451,8 @@ public class RoundTripWorker implements TaskWorker {
         doneFuture.complete("");
         executor.shutdownNow();
         executor.awaitTermination(1, TimeUnit.DAYS);
-        Utils.closeQuietly(consumer, "consumer");
+        shutdownConsumer();
         Utils.closeQuietly(producer, "producer");
-        this.consumer = null;
         this.producer = null;
         this.unackedSends = null;
         this.executor = null;
diff --git 
a/trogdor/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorkloadSpec.java
 
b/trogdor/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorkloadSpec.java
index 223b0c0e20c..67fec60990b 100644
--- 
a/trogdor/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorkloadSpec.java
+++ 
b/trogdor/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorkloadSpec.java
@@ -31,7 +31,7 @@ import java.util.Map;
  * The specification for a workload that sends messages to a broker and then
  * reads them back.
  */
-public final class RoundTripWorkloadSpec extends TaskSpec {
+public class RoundTripWorkloadSpec extends TaskSpec {
     private final String clientNode;
     private final String bootstrapServers;
     private final int targetMessagesPerSec;
diff --git 
a/trogdor/src/main/java/org/apache/kafka/trogdor/workload/ShareRoundTripWorker.java
 
b/trogdor/src/main/java/org/apache/kafka/trogdor/workload/ShareRoundTripWorker.java
new file mode 100644
index 00000000000..2ce028243a2
--- /dev/null
+++ 
b/trogdor/src/main/java/org/apache/kafka/trogdor/workload/ShareRoundTripWorker.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.workload;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaShareConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.trogdor.common.WorkerUtils;
+
+import java.time.Duration;
+import java.util.HashSet;
+import java.util.Properties;
+
+
+public class ShareRoundTripWorker extends RoundTripWorkerBase {
+    KafkaShareConsumer<byte[], byte[]> consumer;
+
+    ShareRoundTripWorker(String id, RoundTripWorkloadSpec spec) {
+        this.id = id;
+        this.spec = spec;
+    }
+
+    @Override
+    public void initializeConsumer(HashSet<TopicPartition> partitions) {
+        Properties props = new Properties();
+        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
spec.bootstrapServers());
+        props.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer." + id);
+        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 105000);
+        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 100000);
+        // user may over-write the defaults with common client config and 
consumer config
+        WorkerUtils.addConfigsToProperties(props, spec.commonClientConf(), 
spec.consumerConf());
+
+        props.put(ConsumerConfig.GROUP_ID_CONFIG, "round-trip-share-group-" + 
id);
+        consumer = new KafkaShareConsumer<>(props, new ByteArrayDeserializer(),
+                new ByteArrayDeserializer());
+        consumer.subscribe(spec.activeTopics().materialize().keySet());
+    }
+
+    @Override
+    protected ConsumerRecords<byte[], byte[]> fetchRecords(Duration duration) {
+        return consumer.poll(duration);
+    }
+
+    @Override
+    protected void shutdownConsumer() {
+        Utils.closeQuietly(consumer, "consumer");
+        consumer = null;
+    }
+}
diff --git 
a/trogdor/src/main/java/org/apache/kafka/trogdor/workload/ShareRoundTripWorkloadSpec.java
 
b/trogdor/src/main/java/org/apache/kafka/trogdor/workload/ShareRoundTripWorkloadSpec.java
new file mode 100644
index 00000000000..3ea5e0938bb
--- /dev/null
+++ 
b/trogdor/src/main/java/org/apache/kafka/trogdor/workload/ShareRoundTripWorkloadSpec.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.trogdor.workload;
+
+import org.apache.kafka.trogdor.task.TaskWorker;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Map;
+
+public class ShareRoundTripWorkloadSpec extends RoundTripWorkloadSpec {
+
+    @JsonCreator
+    public ShareRoundTripWorkloadSpec(@JsonProperty("startMs") long startMs,
+              @JsonProperty("durationMs") long durationMs,
+              @JsonProperty("clientNode") String clientNode,
+              @JsonProperty("bootstrapServers") String bootstrapServers,
+              @JsonProperty("commonClientConf") Map<String, String> 
commonClientConf,
+              @JsonProperty("adminClientConf") Map<String, String> 
adminClientConf,
+              @JsonProperty("consumerConf") Map<String, String> consumerConf,
+              @JsonProperty("producerConf") Map<String, String> producerConf,
+              @JsonProperty("targetMessagesPerSec") int targetMessagesPerSec,
+              @JsonProperty("valueGenerator") PayloadGenerator valueGenerator,
+              @JsonProperty("activeTopics") TopicsSpec activeTopics,
+              @JsonProperty("maxMessages") long maxMessages) {
+        super(startMs, durationMs, clientNode, bootstrapServers, 
commonClientConf, adminClientConf, consumerConf, producerConf, 
targetMessagesPerSec, valueGenerator, activeTopics, maxMessages);
+    }
+
+    @Override
+    public TaskWorker newTaskWorker(String id) {
+        return new ShareRoundTripWorker(id, this);
+    }
+}


Reply via email to