This is an automated email from the ASF dual-hosted git repository.
manikumar pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 0181073d49b KAFKA-17933: Added round trip trogdor workload for share
consumer. (#17692)
0181073d49b is described below
commit 0181073d49b5a23ecdd493cd72c880a2cfc9c416
Author: ShivsundarR <[email protected]>
AuthorDate: Thu Nov 7 05:51:14 2024 -0500
KAFKA-17933: Added round trip trogdor workload for share consumer. (#17692)
Added ShareRoundTripWorker.java, modeled on RoundTripWorker.java. The worker
starts a producer and a share consumer on a single node, and the share consumer
reads back the messages produced by the producer.
Reviewers: Andrew Schofield <[email protected]>, Manikumar Reddy
<[email protected]>
---
tests/spec/share_round_trip.json | 34 ++
.../org/apache/kafka/trogdor/task/TaskSpec.java | 2 +-
.../kafka/trogdor/workload/RoundTripWorker.java | 435 +--------------------
...undTripWorker.java => RoundTripWorkerBase.java} | 62 +--
.../trogdor/workload/RoundTripWorkloadSpec.java | 2 +-
.../trogdor/workload/ShareRoundTripWorker.java | 68 ++++
.../workload/ShareRoundTripWorkloadSpec.java | 48 +++
7 files changed, 205 insertions(+), 446 deletions(-)
diff --git a/tests/spec/share_round_trip.json b/tests/spec/share_round_trip.json
new file mode 100644
index 00000000000..426bd3ebfdf
--- /dev/null
+++ b/tests/spec/share_round_trip.json
@@ -0,0 +1,34 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//
+// An example task specification for running a share consumer round trip test in Trogdor.
+// See trogdor/README.md for details.
+//
+
+{
+ "class": "org.apache.kafka.trogdor.workload.ShareRoundTripWorkloadSpec",
+ "durationMs": 10000000,
+ "clientNode": "node0",
+ "bootstrapServers": "localhost:9092",
+ "targetMessagesPerSec": 1000,
+ "maxMessages": 100,
+ "activeTopics": {
+ "round_trip_topic_share[0-1]": {
+ "numPartitions": 2,
+ "replicationFactor": 1
+ }
+ }
+}
diff --git a/trogdor/src/main/java/org/apache/kafka/trogdor/task/TaskSpec.java
b/trogdor/src/main/java/org/apache/kafka/trogdor/task/TaskSpec.java
index 8033b95a8ba..f7a538195b8 100644
--- a/trogdor/src/main/java/org/apache/kafka/trogdor/task/TaskSpec.java
+++ b/trogdor/src/main/java/org/apache/kafka/trogdor/task/TaskSpec.java
@@ -111,7 +111,7 @@ public abstract class TaskSpec {
return JsonUtil.toJsonString(this);
}
- protected Map<String, String> configOrEmptyMap(Map<String, String> config)
{
+ protected static Map<String, String> configOrEmptyMap(Map<String, String>
config) {
return (config == null) ? Collections.emptyMap() : config;
}
}
diff --git
a/trogdor/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java
b/trogdor/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java
index 385e14162ee..d1f47e626fd 100644
---
a/trogdor/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java
+++
b/trogdor/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java
@@ -14,447 +14,54 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.kafka.trogdor.workload;
-import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.config.ConfigException;
-import org.apache.kafka.common.errors.TimeoutException;
-import org.apache.kafka.common.errors.WakeupException;
-import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
-import org.apache.kafka.common.serialization.ByteArraySerializer;
-import org.apache.kafka.common.utils.ThreadUtils;
-import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.trogdor.common.JsonUtil;
-import org.apache.kafka.trogdor.common.Platform;
import org.apache.kafka.trogdor.common.WorkerUtils;
-import org.apache.kafka.trogdor.task.TaskWorker;
-import org.apache.kafka.trogdor.task.WorkerStatusTracker;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.databind.node.TextNode;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
import java.time.Duration;
-import java.util.ArrayList;
-import java.util.HashMap;
import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
import java.util.Properties;
-import java.util.TreeSet;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.stream.Collectors;
-
-
-public class RoundTripWorker implements TaskWorker {
- private static final int THROTTLE_PERIOD_MS = 100;
-
- private static final int LOG_INTERVAL_MS = 5000;
-
- private static final int LOG_NUM_MESSAGES = 10;
-
- private static final Logger log =
LoggerFactory.getLogger(RoundTripWorker.class);
-
- private static final PayloadGenerator KEY_GENERATOR = new
SequentialPayloadGenerator(4, 0);
-
- private ToReceiveTracker toReceiveTracker;
-
- private final String id;
-
- private final RoundTripWorkloadSpec spec;
-
- private final AtomicBoolean running = new AtomicBoolean(false);
-
- private final Lock lock = new ReentrantLock();
- private final Condition unackedSendsAreZero = lock.newCondition();
+public class RoundTripWorker extends RoundTripWorkerBase {
- private ScheduledExecutorService executor;
+ KafkaConsumer<byte[], byte[]> consumer;
- private WorkerStatusTracker status;
-
- private KafkaFutureImpl<String> doneFuture;
-
- private KafkaProducer<byte[], byte[]> producer;
-
- private KafkaConsumer<byte[], byte[]> consumer;
-
- private Long unackedSends;
-
- private ToSendTracker toSendTracker;
-
- public RoundTripWorker(String id, RoundTripWorkloadSpec spec) {
+ RoundTripWorker(String id, RoundTripWorkloadSpec spec) {
this.id = id;
this.spec = spec;
}
@Override
- public void start(Platform platform, WorkerStatusTracker status,
- KafkaFutureImpl<String> doneFuture) throws Exception {
- if (!running.compareAndSet(false, true)) {
- throw new IllegalStateException("RoundTripWorker is already
running.");
- }
- log.info("{}: Activating RoundTripWorker.", id);
- this.executor = Executors.newScheduledThreadPool(3,
- ThreadUtils.createThreadFactory("RoundTripWorker%d", false));
- this.status = status;
- this.doneFuture = doneFuture;
- this.producer = null;
- this.consumer = null;
- this.unackedSends = spec.maxMessages();
- executor.submit(new Prepare());
- }
-
- class Prepare implements Runnable {
- @Override
- public void run() {
- try {
- if (spec.targetMessagesPerSec() <= 0) {
- throw new ConfigException("Can't have targetMessagesPerSec
<= 0.");
- }
- Map<String, NewTopic> newTopics = new HashMap<>();
- HashSet<TopicPartition> active = new HashSet<>();
- for (Map.Entry<String, PartitionsSpec> entry :
- spec.activeTopics().materialize().entrySet()) {
- String topicName = entry.getKey();
- PartitionsSpec partSpec = entry.getValue();
- newTopics.put(topicName, partSpec.newTopic(topicName));
- for (Integer partitionNumber :
partSpec.partitionNumbers()) {
- active.add(new TopicPartition(topicName,
partitionNumber));
- }
- }
- if (active.isEmpty()) {
- throw new RuntimeException("You must specify at least one
active topic.");
- }
- status.update(new TextNode("Creating " +
newTopics.keySet().size() + " topic(s)"));
- WorkerUtils.createTopics(log, spec.bootstrapServers(),
spec.commonClientConf(),
- spec.adminClientConf(), newTopics, false);
- status.update(new TextNode("Created " +
newTopics.keySet().size() + " topic(s)"));
- toSendTracker = new ToSendTracker(spec.maxMessages());
- toReceiveTracker = new ToReceiveTracker();
- executor.submit(new ProducerRunnable(active));
- executor.submit(new ConsumerRunnable(active));
- executor.submit(new StatusUpdater());
- executor.scheduleWithFixedDelay(
- new StatusUpdater(), 30, 30, TimeUnit.SECONDS);
- } catch (Throwable e) {
- WorkerUtils.abort(log, "Prepare", e, doneFuture);
- }
- }
- }
-
- private static class ToSendTrackerResult {
- final long index;
- final boolean firstSend;
-
- ToSendTrackerResult(long index, boolean firstSend) {
- this.index = index;
- this.firstSend = firstSend;
- }
- }
-
- private static class ToSendTracker {
- private final long maxMessages;
- private final List<Long> failed = new ArrayList<>();
- private long frontier = 0;
-
- ToSendTracker(long maxMessages) {
- this.maxMessages = maxMessages;
- }
-
- synchronized void addFailed(long index) {
- failed.add(index);
- }
-
- synchronized long frontier() {
- return frontier;
- }
-
- synchronized ToSendTrackerResult next() {
- if (failed.isEmpty()) {
- if (frontier >= maxMessages) {
- return null;
- } else {
- return new ToSendTrackerResult(frontier++, true);
- }
- } else {
- return new ToSendTrackerResult(failed.remove(0), false);
- }
- }
- }
-
- class ProducerRunnable implements Runnable {
- private final HashSet<TopicPartition> partitions;
- private final Throttle throttle;
-
- ProducerRunnable(HashSet<TopicPartition> partitions) {
- this.partitions = partitions;
- Properties props = new Properties();
- props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
spec.bootstrapServers());
- props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16 * 1024);
- props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 4 * 16 * 1024L);
- props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 1000L);
- props.put(ProducerConfig.CLIENT_ID_CONFIG, "producer." + id);
- props.put(ProducerConfig.ACKS_CONFIG, "all");
- props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 105000);
- // user may over-write the defaults with common client config and
producer config
- WorkerUtils.addConfigsToProperties(props, spec.commonClientConf(),
spec.producerConf());
- producer = new KafkaProducer<>(props, new ByteArraySerializer(),
- new ByteArraySerializer());
- int perPeriod = WorkerUtils.
- perSecToPerPeriod(spec.targetMessagesPerSec(),
THROTTLE_PERIOD_MS);
- this.throttle = new Throttle(perPeriod, THROTTLE_PERIOD_MS);
- }
-
- @Override
- public void run() {
- long messagesSent = 0;
- long uniqueMessagesSent = 0;
- log.debug("{}: Starting RoundTripWorker#ProducerRunnable.", id);
- try {
- Iterator<TopicPartition> iter = partitions.iterator();
- while (true) {
- final ToSendTrackerResult result = toSendTracker.next();
- if (result == null) {
- break;
- }
- throttle.increment();
- final long messageIndex = result.index;
- if (result.firstSend) {
- toReceiveTracker.addPending(messageIndex);
- uniqueMessagesSent++;
- }
- messagesSent++;
- if (!iter.hasNext()) {
- iter = partitions.iterator();
- }
- TopicPartition partition = iter.next();
- // we explicitly specify generator position based on
message index
- ProducerRecord<byte[], byte[]> record = new
ProducerRecord<>(partition.topic(),
- partition.partition(),
KEY_GENERATOR.generate(messageIndex),
- spec.valueGenerator().generate(messageIndex));
- producer.send(record, (metadata, exception) -> {
- if (exception == null) {
- lock.lock();
- try {
- unackedSends -= 1;
- if (unackedSends <= 0)
- unackedSendsAreZero.signalAll();
- } finally {
- lock.unlock();
- }
- } else {
- log.info("{}: Got exception when sending message
{}: {}",
- id, messageIndex, exception.getMessage());
- toSendTracker.addFailed(messageIndex);
- }
- });
- }
- } catch (Throwable e) {
- WorkerUtils.abort(log, "ProducerRunnable", e, doneFuture);
- } finally {
- lock.lock();
- try {
- log.info("{}: ProducerRunnable is exiting.
messagesSent={}; uniqueMessagesSent={}; " +
- "ackedSends={}/{}.", id, messagesSent,
uniqueMessagesSent,
- spec.maxMessages() - unackedSends,
spec.maxMessages());
- } finally {
- lock.unlock();
- }
- }
- }
- }
-
- private class ToReceiveTracker {
- private final TreeSet<Long> pending = new TreeSet<>();
-
- private long totalReceived = 0;
-
- synchronized void addPending(long messageIndex) {
- pending.add(messageIndex);
- }
-
- synchronized boolean removePending(long messageIndex) {
- if (pending.remove(messageIndex)) {
- totalReceived++;
- return true;
- } else {
- return false;
- }
- }
-
- synchronized long totalReceived() {
- return totalReceived;
- }
-
- void log() {
- long numToReceive;
- List<Long> list = new ArrayList<>(LOG_NUM_MESSAGES);
- synchronized (this) {
- numToReceive = pending.size();
- for (Iterator<Long> iter = pending.iterator();
- iter.hasNext() && (list.size() < LOG_NUM_MESSAGES); ) {
- Long i = iter.next();
- list.add(i);
- }
- }
- log.info("{}: consumer waiting for {} message(s), starting with:
{}",
- id, numToReceive,
list.stream().map(Object::toString).collect(Collectors.joining(", ")));
- }
- }
-
- class ConsumerRunnable implements Runnable {
-
- ConsumerRunnable(HashSet<TopicPartition> partitions) {
- Properties props = new Properties();
- props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
spec.bootstrapServers());
- props.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer." + id);
- props.put(ConsumerConfig.GROUP_ID_CONFIG,
"round-trip-consumer-group-" + id);
- props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
- props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 105000);
- props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 100000);
- // user may over-write the defaults with common client config and
consumer config
- WorkerUtils.addConfigsToProperties(props, spec.commonClientConf(),
spec.consumerConf());
- consumer = new KafkaConsumer<>(props, new ByteArrayDeserializer(),
+ public void initializeConsumer(HashSet<TopicPartition> partitions) {
+ Properties props = new Properties();
+ props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
spec.bootstrapServers());
+ props.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer." + id);
+ props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 105000);
+ props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 100000);
+ // user may over-write the defaults with common client config and
consumer config
+ WorkerUtils.addConfigsToProperties(props, spec.commonClientConf(),
spec.consumerConf());
+
+ props.put(ConsumerConfig.GROUP_ID_CONFIG, "round-trip-consumer-group-"
+ id);
+ consumer = new KafkaConsumer<>(props, new ByteArrayDeserializer(),
new ByteArrayDeserializer());
- consumer.assign(partitions);
- }
-
- @Override
- public void run() {
- long uniqueMessagesReceived = 0;
- long messagesReceived = 0;
- long pollInvoked = 0;
- log.debug("{}: Starting RoundTripWorker#ConsumerRunnable.", id);
- try {
- long lastLogTimeMs = Time.SYSTEM.milliseconds();
- while (true) {
- try {
- pollInvoked++;
- ConsumerRecords<byte[], byte[]> records =
consumer.poll(Duration.ofMillis(50));
- for (ConsumerRecord<byte[], byte[]> record : records) {
- int messageIndex =
ByteBuffer.wrap(record.key()).order(ByteOrder.LITTLE_ENDIAN).getInt();
- messagesReceived++;
- if (toReceiveTracker.removePending(messageIndex)) {
- uniqueMessagesReceived++;
- if (uniqueMessagesReceived >=
spec.maxMessages()) {
- lock.lock();
- try {
- log.info("{}: Consumer received the
full count of {} unique messages. " +
- "Waiting for all {} sends to
be acked...", id, spec.maxMessages(), unackedSends);
- while (unackedSends > 0)
- unackedSendsAreZero.await();
- } finally {
- lock.unlock();
- }
-
- log.info("{}: all sends have been acked.",
id);
- new StatusUpdater().update();
- doneFuture.complete("");
- return;
- }
- }
- }
- long curTimeMs = Time.SYSTEM.milliseconds();
- if (curTimeMs > lastLogTimeMs + LOG_INTERVAL_MS) {
- toReceiveTracker.log();
- lastLogTimeMs = curTimeMs;
- }
- } catch (WakeupException e) {
- log.debug("{}: Consumer got WakeupException", id, e);
- } catch (TimeoutException e) {
- log.debug("{}: Consumer got TimeoutException", id, e);
- }
- }
- } catch (Throwable e) {
- WorkerUtils.abort(log, "ConsumerRunnable", e, doneFuture);
- } finally {
- log.info("{}: ConsumerRunnable is exiting. Invoked poll {}
time(s). " +
- "messagesReceived = {}; uniqueMessagesReceived = {}.",
- id, pollInvoked, messagesReceived, uniqueMessagesReceived);
- }
- }
+ consumer.assign(partitions);
}
- public class StatusUpdater implements Runnable {
- @Override
- public void run() {
- try {
- update();
- } catch (Exception e) {
- WorkerUtils.abort(log, "StatusUpdater", e, doneFuture);
- }
- }
-
- StatusData update() {
- StatusData statusData =
- new StatusData(toSendTracker.frontier(),
toReceiveTracker.totalReceived());
- status.update(JsonUtil.JSON_SERDE.valueToTree(statusData));
- return statusData;
- }
- }
-
- public static class StatusData {
- private final long totalUniqueSent;
- private final long totalReceived;
-
- @JsonCreator
- public StatusData(@JsonProperty("totalUniqueSent") long
totalUniqueSent,
- @JsonProperty("totalReceived") long totalReceived) {
- this.totalUniqueSent = totalUniqueSent;
- this.totalReceived = totalReceived;
- }
-
- @JsonProperty
- public long totalUniqueSent() {
- return totalUniqueSent;
- }
-
- @JsonProperty
- public long totalReceived() {
- return totalReceived;
- }
+ @Override
+ protected ConsumerRecords<byte[], byte[]> fetchRecords(Duration duration) {
+ return consumer.poll(duration);
}
@Override
- public void stop(Platform platform) throws Exception {
- if (!running.compareAndSet(true, false)) {
- throw new IllegalStateException("RoundTripWorker is not running.");
- }
- log.info("{}: Deactivating RoundTripWorker.", id);
- doneFuture.complete("");
- executor.shutdownNow();
- executor.awaitTermination(1, TimeUnit.DAYS);
+ protected void shutdownConsumer() {
Utils.closeQuietly(consumer, "consumer");
- Utils.closeQuietly(producer, "producer");
- this.consumer = null;
- this.producer = null;
- this.unackedSends = null;
- this.executor = null;
- this.doneFuture = null;
- log.info("{}: Deactivated RoundTripWorker.", id);
+ consumer = null;
}
}
diff --git
a/trogdor/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java
b/trogdor/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorkerBase.java
similarity index 91%
copy from
trogdor/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java
copy to
trogdor/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorkerBase.java
index 385e14162ee..7adc1287d71 100644
---
a/trogdor/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java
+++
b/trogdor/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorkerBase.java
@@ -18,10 +18,8 @@
package org.apache.kafka.trogdor.workload;
import org.apache.kafka.clients.admin.NewTopic;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
@@ -30,7 +28,6 @@ import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.internals.KafkaFutureImpl;
-import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.utils.ThreadUtils;
import org.apache.kafka.common.utils.Time;
@@ -69,22 +66,31 @@ import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
-public class RoundTripWorker implements TaskWorker {
+/**
+ * A base class for a round-trip worker which will produce and consume an equal
number of messages.
+ *
+ * This is used to create a round-trip trogdor agent which will spawn
producers and consumers to
+ * produce and consume an equal number of messages based on the workload it is
executing.
+ *
+ * Currently, there are 2 subclasses, one which uses {@link
org.apache.kafka.clients.consumer.KafkaConsumer}
+ * and another which uses {@link
org.apache.kafka.clients.consumer.KafkaShareConsumer} as the consumer.
+ */
+public abstract class RoundTripWorkerBase implements TaskWorker {
private static final int THROTTLE_PERIOD_MS = 100;
private static final int LOG_INTERVAL_MS = 5000;
private static final int LOG_NUM_MESSAGES = 10;
- private static final Logger log =
LoggerFactory.getLogger(RoundTripWorker.class);
+ private static final Logger log =
LoggerFactory.getLogger(RoundTripWorkerBase.class);
private static final PayloadGenerator KEY_GENERATOR = new
SequentialPayloadGenerator(4, 0);
private ToReceiveTracker toReceiveTracker;
- private final String id;
+ protected String id;
- private final RoundTripWorkloadSpec spec;
+ protected RoundTripWorkloadSpec spec;
private final AtomicBoolean running = new AtomicBoolean(false);
@@ -100,17 +106,10 @@ public class RoundTripWorker implements TaskWorker {
private KafkaProducer<byte[], byte[]> producer;
- private KafkaConsumer<byte[], byte[]> consumer;
-
private Long unackedSends;
private ToSendTracker toSendTracker;
- public RoundTripWorker(String id, RoundTripWorkloadSpec spec) {
- this.id = id;
- this.spec = spec;
- }
-
@Override
public void start(Platform platform, WorkerStatusTracker status,
KafkaFutureImpl<String> doneFuture) throws Exception {
@@ -123,7 +122,6 @@ public class RoundTripWorker implements TaskWorker {
this.status = status;
this.doneFuture = doneFuture;
this.producer = null;
- this.consumer = null;
this.unackedSends = spec.maxMessages();
executor.submit(new Prepare());
}
@@ -329,18 +327,7 @@ public class RoundTripWorker implements TaskWorker {
class ConsumerRunnable implements Runnable {
ConsumerRunnable(HashSet<TopicPartition> partitions) {
- Properties props = new Properties();
- props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
spec.bootstrapServers());
- props.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer." + id);
- props.put(ConsumerConfig.GROUP_ID_CONFIG,
"round-trip-consumer-group-" + id);
- props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
- props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 105000);
- props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 100000);
- // user may over-write the defaults with common client config and
consumer config
- WorkerUtils.addConfigsToProperties(props, spec.commonClientConf(),
spec.consumerConf());
- consumer = new KafkaConsumer<>(props, new ByteArrayDeserializer(),
- new ByteArrayDeserializer());
- consumer.assign(partitions);
+ initializeConsumer(partitions);
}
@Override
@@ -354,7 +341,7 @@ public class RoundTripWorker implements TaskWorker {
while (true) {
try {
pollInvoked++;
- ConsumerRecords<byte[], byte[]> records =
consumer.poll(Duration.ofMillis(50));
+ ConsumerRecords<byte[], byte[]> records =
fetchRecords(Duration.ofMillis(50));
for (ConsumerRecord<byte[], byte[]> record : records) {
int messageIndex =
ByteBuffer.wrap(record.key()).order(ByteOrder.LITTLE_ENDIAN).getInt();
messagesReceived++;
@@ -439,6 +426,22 @@ public class RoundTripWorker implements TaskWorker {
}
}
+ /**
+ * Initialize the consumer.
+ */
+ protected abstract void initializeConsumer(HashSet<TopicPartition>
partitions);
+
+ /**
+ *
+ * Invoke poll from the consumer and return the records fetched.
+ */
+ protected abstract ConsumerRecords<byte[], byte[]> fetchRecords(Duration
duration);
+
+ /**
+ * Close the consumer.
+ */
+ protected abstract void shutdownConsumer();
+
@Override
public void stop(Platform platform) throws Exception {
if (!running.compareAndSet(true, false)) {
@@ -448,9 +451,8 @@ public class RoundTripWorker implements TaskWorker {
doneFuture.complete("");
executor.shutdownNow();
executor.awaitTermination(1, TimeUnit.DAYS);
- Utils.closeQuietly(consumer, "consumer");
+ shutdownConsumer();
Utils.closeQuietly(producer, "producer");
- this.consumer = null;
this.producer = null;
this.unackedSends = null;
this.executor = null;
diff --git
a/trogdor/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorkloadSpec.java
b/trogdor/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorkloadSpec.java
index 223b0c0e20c..67fec60990b 100644
---
a/trogdor/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorkloadSpec.java
+++
b/trogdor/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorkloadSpec.java
@@ -31,7 +31,7 @@ import java.util.Map;
* The specification for a workload that sends messages to a broker and then
* reads them back.
*/
-public final class RoundTripWorkloadSpec extends TaskSpec {
+public class RoundTripWorkloadSpec extends TaskSpec {
private final String clientNode;
private final String bootstrapServers;
private final int targetMessagesPerSec;
diff --git
a/trogdor/src/main/java/org/apache/kafka/trogdor/workload/ShareRoundTripWorker.java
b/trogdor/src/main/java/org/apache/kafka/trogdor/workload/ShareRoundTripWorker.java
new file mode 100644
index 00000000000..2ce028243a2
--- /dev/null
+++
b/trogdor/src/main/java/org/apache/kafka/trogdor/workload/ShareRoundTripWorker.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.workload;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaShareConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.trogdor.common.WorkerUtils;
+
+import java.time.Duration;
+import java.util.HashSet;
+import java.util.Properties;
+
+
+public class ShareRoundTripWorker extends RoundTripWorkerBase {
+ KafkaShareConsumer<byte[], byte[]> consumer;
+
+ ShareRoundTripWorker(String id, RoundTripWorkloadSpec spec) {
+ this.id = id;
+ this.spec = spec;
+ }
+
+ @Override
+ public void initializeConsumer(HashSet<TopicPartition> partitions) {
+ Properties props = new Properties();
+ props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
spec.bootstrapServers());
+ props.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer." + id);
+ props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 105000);
+ props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 100000);
+ // user may over-write the defaults with common client config and
consumer config
+ WorkerUtils.addConfigsToProperties(props, spec.commonClientConf(),
spec.consumerConf());
+
+ props.put(ConsumerConfig.GROUP_ID_CONFIG, "round-trip-share-group-" +
id);
+ consumer = new KafkaShareConsumer<>(props, new ByteArrayDeserializer(),
+ new ByteArrayDeserializer());
+ consumer.subscribe(spec.activeTopics().materialize().keySet());
+ }
+
+ @Override
+ protected ConsumerRecords<byte[], byte[]> fetchRecords(Duration duration) {
+ return consumer.poll(duration);
+ }
+
+ @Override
+ protected void shutdownConsumer() {
+ Utils.closeQuietly(consumer, "consumer");
+ consumer = null;
+ }
+}
diff --git
a/trogdor/src/main/java/org/apache/kafka/trogdor/workload/ShareRoundTripWorkloadSpec.java
b/trogdor/src/main/java/org/apache/kafka/trogdor/workload/ShareRoundTripWorkloadSpec.java
new file mode 100644
index 00000000000..3ea5e0938bb
--- /dev/null
+++
b/trogdor/src/main/java/org/apache/kafka/trogdor/workload/ShareRoundTripWorkloadSpec.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.trogdor.workload;
+
+import org.apache.kafka.trogdor.task.TaskWorker;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Map;
+
+public class ShareRoundTripWorkloadSpec extends RoundTripWorkloadSpec {
+
+ @JsonCreator
+ public ShareRoundTripWorkloadSpec(@JsonProperty("startMs") long startMs,
+ @JsonProperty("durationMs") long durationMs,
+ @JsonProperty("clientNode") String clientNode,
+ @JsonProperty("bootstrapServers") String bootstrapServers,
+ @JsonProperty("commonClientConf") Map<String, String>
commonClientConf,
+ @JsonProperty("adminClientConf") Map<String, String>
adminClientConf,
+ @JsonProperty("consumerConf") Map<String, String> consumerConf,
+ @JsonProperty("producerConf") Map<String, String> producerConf,
+ @JsonProperty("targetMessagesPerSec") int targetMessagesPerSec,
+ @JsonProperty("valueGenerator") PayloadGenerator valueGenerator,
+ @JsonProperty("activeTopics") TopicsSpec activeTopics,
+ @JsonProperty("maxMessages") long maxMessages) {
+ super(startMs, durationMs, clientNode, bootstrapServers,
commonClientConf, adminClientConf, consumerConf, producerConf,
targetMessagesPerSec, valueGenerator, activeTopics, maxMessages);
+ }
+
+ @Override
+ public TaskWorker newTaskWorker(String id) {
+ return new ShareRoundTripWorker(id, this);
+ }
+}