[ https://issues.apache.org/jira/browse/KAFKA-6771?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16439088#comment-16439088 ]

ASF GitHub Bot commented on KAFKA-6771:
---------------------------------------

rajinisivaram closed pull request #4850: KAFKA-6771. Make specifying partitions more flexible
URL: https://github.com/apache/kafka/pull/4850

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/tools/src/main/java/org/apache/kafka/trogdor/common/StringExpander.java b/tools/src/main/java/org/apache/kafka/trogdor/common/StringExpander.java
new file mode 100644
index 00000000000..82f5003fbb6
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/common/StringExpander.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.common;
+
+import java.util.HashSet;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Utilities for expanding strings that have range expressions in them.
+ *
+ * For example, 'foo[1-3]' would be expanded to foo1, foo2, foo3.
+ * Strings that have no range expressions will not be expanded.
+ */
+public class StringExpander {
+    private final static Pattern NUMERIC_RANGE_PATTERN =
+        Pattern.compile("(.*?)\\[([0-9]*)\\-([0-9]*)\\](.*?)");
+
+    public static HashSet<String> expand(String val) {
+        HashSet<String> set = new HashSet<>();
+        Matcher matcher = NUMERIC_RANGE_PATTERN.matcher(val);
+        if (!matcher.matches()) {
+            set.add(val);
+            return set;
+        }
+        String prequel = matcher.group(1);
+        String rangeStart = matcher.group(2);
+        String rangeEnd = matcher.group(3);
+        String epilog = matcher.group(4);
+        int rangeStartInt = Integer.parseInt(rangeStart);
+        int rangeEndInt = Integer.parseInt(rangeEnd);
+        if (rangeEndInt < rangeStartInt) {
+            throw new RuntimeException("Invalid range: start " + rangeStartInt +
+                    " is higher than end " + rangeEndInt);
+        }
+        for (int i = rangeStartInt; i <= rangeEndInt; i++) {
+            set.add(String.format("%s%d%s", prequel, i, epilog));
+        }
+        return set;
+    }
+}
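
A quick usage sketch of the new expander (illustrative only, not part of the
patch; the example class name is hypothetical, and expand() returns a HashSet,
so iteration order is unspecified):

    import java.util.HashSet;

    import org.apache.kafka.trogdor.common.StringExpander;

    public class StringExpanderExample {
        public static void main(String[] args) {
            // "foo[1-3]" expands to foo1, foo2, foo3.
            HashSet<String> expanded = StringExpander.expand("foo[1-3]");
            System.out.println(expanded);   // e.g. [foo1, foo2, foo3]

            // Strings without a range expression pass through unchanged.
            System.out.println(StringExpander.expand("bar"));  // [bar]
        }
    }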
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorClient.java b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorClient.java
index 0677296ee3c..f2eb0343aa8 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorClient.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorClient.java
@@ -132,7 +132,7 @@ public StopTaskResponse stopTask(StopTaskRequest request) throws Exception {
 
     public TasksResponse tasks(TasksRequest request) throws Exception {
         UriBuilder uriBuilder = UriBuilder.fromPath(url("/coordinator/tasks"));
-        uriBuilder.queryParam("taskId", request.taskIds().toArray(new String[0]));
+        uriBuilder.queryParam("taskId", (Object[]) request.taskIds().toArray(new String[0]));
         uriBuilder.queryParam("firstStartMs", request.firstStartMs());
         uriBuilder.queryParam("lastStartMs", request.lastStartMs());
         uriBuilder.queryParam("firstEndMs", request.firstEndMs());
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchSpec.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchSpec.java
index cef913bc01a..1b429ead3c8 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchSpec.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchSpec.java
@@ -41,10 +41,7 @@
     private final Map<String, String> consumerConf;
     private final Map<String, String> adminClientConf;
     private final Map<String, String> commonClientConf;
-    private final String topicRegex;
-    private final int startPartition;
-    private final int endPartition;
-
+    private final TopicsSpec activeTopics;
 
     @JsonCreator
     public ConsumeBenchSpec(@JsonProperty("startMs") long startMs,
@@ -56,9 +53,7 @@ public ConsumeBenchSpec(@JsonProperty("startMs") long startMs,
                             @JsonProperty("consumerConf") Map<String, String> consumerConf,
                             @JsonProperty("commonClientConf") Map<String, String> commonClientConf,
                             @JsonProperty("adminClientConf") Map<String, String> adminClientConf,
-                            @JsonProperty("topicRegex") String topicRegex,
-                            @JsonProperty("startPartition") int startPartition,
-                            @JsonProperty("endPartition") int endPartition) {
+                            @JsonProperty("activeTopics") TopicsSpec activeTopics) {
         super(startMs, durationMs);
         this.consumerNode = (consumerNode == null) ? "" : consumerNode;
         this.bootstrapServers = (bootstrapServers == null) ? "" : bootstrapServers;
@@ -67,9 +62,7 @@ public ConsumeBenchSpec(@JsonProperty("startMs") long startMs,
         this.consumerConf = configOrEmptyMap(consumerConf);
         this.commonClientConf = configOrEmptyMap(commonClientConf);
         this.adminClientConf = configOrEmptyMap(adminClientConf);
-        this.topicRegex = topicRegex;
-        this.startPartition = startPartition;
-        this.endPartition = endPartition;
+        this.activeTopics = activeTopics == null ? TopicsSpec.EMPTY : activeTopics.immutableCopy();
     }
 
     @JsonProperty
@@ -108,18 +101,8 @@ public int maxMessages() {
     }
 
     @JsonProperty
-    public String topicRegex() {
-        return topicRegex;
-    }
-
-    @JsonProperty
-    public int startPartition() {
-        return startPartition;
-    }
-
-    @JsonProperty
-    public int endPartition() {
-        return endPartition;
+    public TopicsSpec activeTopics() {
+        return activeTopics;
     }
 
     @Override
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchWorker.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchWorker.java
index 5c74d906601..1a852964070 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchWorker.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchWorker.java
@@ -24,7 +24,6 @@
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.internals.KafkaFutureImpl;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.utils.Time;
@@ -40,6 +39,8 @@
 import org.apache.kafka.trogdor.task.TaskWorker;
 
 import java.util.Collection;
+import java.util.HashSet;
+import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.Callable;
 import java.util.concurrent.Executors;
@@ -84,19 +85,14 @@ public void start(Platform platform, WorkerStatusTracker status,
         @Override
         public void run() {
             try {
-                // find topics to consume from based on provided topic regular expression
-                if (spec.topicRegex() == null) {
-                    throw new ConfigException(
-                        "Must provide topic name or regular expression to match existing topics.");
+                HashSet<TopicPartition> partitions = new HashSet<>();
+                for (Map.Entry<String, PartitionsSpec> entry : spec.activeTopics().materialize().entrySet()) {
+                    for (Integer partitionNumber : entry.getValue().partitionNumbers()) {
+                        partitions.add(new TopicPartition(entry.getKey(), partitionNumber));
+                    }
                 }
-                Collection<TopicPartition> topicPartitions =
-                    WorkerUtils.getMatchingTopicPartitions(
-                        log, spec.bootstrapServers(),
-                        spec.commonClientConf(), spec.adminClientConf(),
-                        spec.topicRegex(), spec.startPartition(), spec.endPartition());
-                log.info("Will consume from {}", topicPartitions);
-
-                executor.submit(new ConsumeMessages(topicPartitions));
+                log.info("Will consume from {}", partitions);
+                executor.submit(new ConsumeMessages(partitions));
             } catch (Throwable e) {
                 WorkerUtils.abort(log, "Prepare", e, doneFuture);
             }
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/PartitionsSpec.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/PartitionsSpec.java
new file mode 100644
index 00000000000..75f85c479db
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/PartitionsSpec.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.workload;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.trogdor.rest.Message;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Describes some partitions.
+ */
+public class PartitionsSpec extends Message {
+    private final static short DEFAULT_REPLICATION_FACTOR = 3;
+    private final static short DEFAULT_NUM_PARTITIONS = 1;
+
+    private final int numPartitions;
+    private final short replicationFactor;
+    private final Map<Integer, List<Integer>> partitionAssignments;
+
+    @JsonCreator
+    public PartitionsSpec(@JsonProperty("numPartitions") int numPartitions,
+            @JsonProperty("replicationFactor") short replicationFactor,
+            @JsonProperty("partitionAssignments") Map<Integer, List<Integer>> partitionAssignments) {
+        this.numPartitions = numPartitions;
+        this.replicationFactor = replicationFactor;
+        this.partitionAssignments = partitionAssignments == null ?
+            new HashMap<Integer, List<Integer>>() : partitionAssignments;
+    }
+
+    @JsonProperty
+    public int numPartitions() {
+        return numPartitions;
+    }
+
+    public List<Integer> partitionNumbers() {
+        if (partitionAssignments.isEmpty()) {
+            ArrayList<Integer> partitionNumbers = new ArrayList<>();
+            int effectiveNumPartitions = numPartitions <= 0 ? DEFAULT_NUM_PARTITIONS : numPartitions;
+            for (int i = 0; i < effectiveNumPartitions; i++) {
+                partitionNumbers.add(i);
+            }
+            return partitionNumbers;
+        } else {
+            return new ArrayList<>(partitionAssignments.keySet());
+        }
+    }
+
+    @JsonProperty
+    public short replicationFactor() {
+        return replicationFactor;
+    }
+
+    @JsonProperty
+    public Map<Integer, List<Integer>> partitionAssignments() {
+        return partitionAssignments;
+    }
+
+    public NewTopic newTopic(String topicName) {
+        if (partitionAssignments.isEmpty()) {
+            int effectiveNumPartitions = numPartitions <= 0 ?
+                DEFAULT_NUM_PARTITIONS : numPartitions;
+            short effectiveReplicationFactor = replicationFactor <= 0 ?
+                DEFAULT_REPLICATION_FACTOR : replicationFactor;
+            return new NewTopic(topicName, effectiveNumPartitions, effectiveReplicationFactor);
+        } else {
+            return new NewTopic(topicName, partitionAssignments);
+        }
+    }
+}
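
A short sketch of the two PartitionsSpec modes implied by the code above
(the example class is hypothetical; constructors and methods are from the patch):

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    import org.apache.kafka.trogdor.workload.PartitionsSpec;

    public class PartitionsSpecExample {
        public static void main(String[] args) {
            // Counted mode: no explicit assignments, so partitionNumbers()
            // enumerates 0..numPartitions-1.
            PartitionsSpec counted = new PartitionsSpec(3, (short) 3, null);
            System.out.println(counted.partitionNumbers());   // [0, 1, 2]

            // Assigned mode: explicit partition -> replica-list assignments
            // take precedence over numPartitions and replicationFactor.
            Map<Integer, List<Integer>> assignments = new HashMap<>();
            assignments.put(0, Arrays.asList(0, 1, 2));
            assignments.put(1, Arrays.asList(2, 3, 4));
            PartitionsSpec assigned = new PartitionsSpec(0, (short) 0, assignments);
            System.out.println(assigned.partitionNumbers());  // [0, 1] (keyset order)
        }
    }

newTopic() follows the same split: with assignments it builds
NewTopic(topicName, partitionAssignments); otherwise it falls back to the
defaults of 1 partition and a replication factor of 3.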
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchSpec.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchSpec.java
index ec6e3096469..30878bf303f 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchSpec.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchSpec.java
@@ -32,11 +32,6 @@
  * The specification for a benchmark that produces messages to a set of topics.
  */
 public class ProduceBenchSpec extends TaskSpec {
-
-    private static final String DEFAULT_TOPIC_PREFIX = "produceBenchTopic";
-    private static final int DEFAULT_NUM_PARTITIONS = 1;
-    private static final short DEFAULT_REPLICATION_FACTOR = 3;
-
     private final String producerNode;
     private final String bootstrapServers;
     private final int targetMessagesPerSec;
@@ -46,11 +41,8 @@
     private final Map<String, String> producerConf;
     private final Map<String, String> adminClientConf;
     private final Map<String, String> commonClientConf;
-    private final int totalTopics;
-    private final int activeTopics;
-    private final String topicPrefix;
-    private final int numPartitions;
-    private final short replicationFactor;
+    private final TopicsSpec activeTopics;
+    private final TopicsSpec inactiveTopics;
 
     @JsonCreator
     public ProduceBenchSpec(@JsonProperty("startMs") long startMs,
@@ -64,11 +56,8 @@ public ProduceBenchSpec(@JsonProperty("startMs") long startMs,
                          @JsonProperty("producerConf") Map<String, String> producerConf,
                          @JsonProperty("commonClientConf") Map<String, String> commonClientConf,
                          @JsonProperty("adminClientConf") Map<String, String> adminClientConf,
-                         @JsonProperty("totalTopics") int totalTopics,
-                         @JsonProperty("activeTopics") int activeTopics,
-                         @JsonProperty("topicPrefix") String topicPrefix,
-                         @JsonProperty("partitionsPerTopic") int partitionsPerTopic,
-                         @JsonProperty("replicationFactor") short replicationFactor) {
+                         @JsonProperty("activeTopics") TopicsSpec activeTopics,
+                         @JsonProperty("inactiveTopics") TopicsSpec inactiveTopics) {
         super(startMs, durationMs);
         this.producerNode = (producerNode == null) ? "" : producerNode;
         this.bootstrapServers = (bootstrapServers == null) ? "" : bootstrapServers;
@@ -81,13 +70,10 @@ public ProduceBenchSpec(@JsonProperty("startMs") long startMs,
         this.producerConf = configOrEmptyMap(producerConf);
         this.commonClientConf = configOrEmptyMap(commonClientConf);
         this.adminClientConf = configOrEmptyMap(adminClientConf);
-        this.totalTopics = totalTopics;
-        this.activeTopics = activeTopics;
-        this.topicPrefix = (topicPrefix == null) ? DEFAULT_TOPIC_PREFIX : topicPrefix;
-        this.numPartitions = (partitionsPerTopic == 0)
-                             ? DEFAULT_NUM_PARTITIONS : partitionsPerTopic;
-        this.replicationFactor = (replicationFactor == 0)
-                                 ? DEFAULT_REPLICATION_FACTOR : replicationFactor;
+        this.activeTopics = (activeTopics == null) ?
+            TopicsSpec.EMPTY : activeTopics.immutableCopy();
+        this.inactiveTopics = (inactiveTopics == null) ?
+            TopicsSpec.EMPTY : inactiveTopics.immutableCopy();
     }
 
     @JsonProperty
@@ -136,28 +122,13 @@ public PayloadGenerator valueGenerator() {
     }
 
     @JsonProperty
-    public int totalTopics() {
-        return totalTopics;
-    }
-
-    @JsonProperty
-    public int activeTopics() {
+    public TopicsSpec activeTopics() {
         return activeTopics;
     }
 
     @JsonProperty
-    public String topicPrefix() {
-        return topicPrefix;
-    }
-
-    @JsonProperty
-    public int numPartitions() {
-        return numPartitions;
-    }
-
-    @JsonProperty
-    public short replicationFactor() {
-        return replicationFactor;
+    public TopicsSpec inactiveTopics() {
+        return inactiveTopics;
     }
 
     @Override
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchWorker.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchWorker.java
index 4c3095f0fbd..dc749eb65a4 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchWorker.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchWorker.java
@@ -26,7 +26,7 @@
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
-import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.internals.KafkaFutureImpl;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.utils.Time;
@@ -40,6 +40,8 @@
 import org.slf4j.LoggerFactory;
 
 import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
 import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.Callable;
@@ -66,16 +68,6 @@
 
     private KafkaFutureImpl<String> doneFuture;
 
-    /**
-     * Generate a topic name based on a topic number.
-     *
-     * @param topicIndex        The topic number.
-     * @return                  The topic name.
-     */
-    public String topicIndexToName(int topicIndex) {
-        return String.format("%s%05d", spec.topicPrefix(), topicIndex);
-    }
-
     public ProduceBenchWorker(String id, ProduceBenchSpec spec) {
         this.id = id;
         this.spec = spec;
@@ -88,7 +80,9 @@ public void start(Platform platform, WorkerStatusTracker status,
             throw new IllegalStateException("ProducerBenchWorker is already running.");
         }
         log.info("{}: Activating ProduceBenchWorker with {}", id, spec);
-        this.executor = Executors.newScheduledThreadPool(1,
+        // Create an executor with 2 threads.  We need the second thread so
+        // that the StatusUpdater can run in parallel with SendRecords.
+        this.executor = Executors.newScheduledThreadPool(2,
             ThreadUtils.createThreadFactory("ProduceBenchWorkerThread%d", false));
         this.status = status;
         this.doneFuture = doneFuture;
@@ -99,25 +93,31 @@ public void start(Platform platform, WorkerStatusTracker status,
         @Override
         public void run() {
             try {
-                if (spec.activeTopics() == 0) {
-                    throw new ConfigException("Can't have activeTopics == 0.");
+                Map<String, NewTopic> newTopics = new HashMap<>();
+                HashSet<TopicPartition> active = new HashSet<>();
+                for (Map.Entry<String, PartitionsSpec> entry :
+                        spec.activeTopics().materialize().entrySet()) {
+                    String topicName = entry.getKey();
+                    PartitionsSpec partSpec = entry.getValue();
+                    newTopics.put(topicName, partSpec.newTopic(topicName));
+                    for (Integer partitionNumber : partSpec.partitionNumbers()) {
+                        active.add(new TopicPartition(topicName, partitionNumber));
+                    }
                 }
-                if (spec.totalTopics() < spec.activeTopics()) {
-                    throw new ConfigException(String.format(
-                        "activeTopics was %d, but totalTopics was only %d.  activeTopics must " +
-                            "be less than or equal to totalTopics.", spec.activeTopics(), spec.totalTopics()));
+                if (active.isEmpty()) {
+                    throw new RuntimeException("You must specify at least one active topic.");
                 }
-                Map<String, NewTopic> newTopics = new HashMap<>();
-                for (int i = 0; i < spec.totalTopics(); i++) {
-                    String name = topicIndexToName(i);
-                    newTopics.put(name, new NewTopic(name, spec.numPartitions(),
-                                                     spec.replicationFactor()));
+                for (Map.Entry<String, PartitionsSpec> entry :
+                        spec.inactiveTopics().materialize().entrySet()) {
+                    String topicName = entry.getKey();
+                    PartitionsSpec partSpec = entry.getValue();
+                    newTopics.put(topicName, partSpec.newTopic(topicName));
                 }
-                status.update(new TextNode("Creating " + spec.totalTopics() + " topic(s)"));
+                status.update(new TextNode("Creating " + newTopics.keySet().size() + " topic(s)"));
                 WorkerUtils.createTopics(log, spec.bootstrapServers(), spec.commonClientConf(),
                                          spec.adminClientConf(), newTopics, false);
-                status.update(new TextNode("Created " + spec.totalTopics() + " topic(s)"));
-                executor.submit(new SendRecords());
+                status.update(new TextNode("Created " + newTopics.keySet().size() + " topic(s)"));
+                executor.submit(new SendRecords(active));
             } catch (Throwable e) {
                 WorkerUtils.abort(log, "Prepare", e, doneFuture);
             }
@@ -167,6 +167,8 @@ protected synchronized void delay(long amount) throws InterruptedException {
     }
 
     public class SendRecords implements Callable<Void> {
+        private final HashSet<TopicPartition> activePartitions;
+
         private final Histogram histogram;
 
         private final Future<?> statusUpdaterFuture;
@@ -179,7 +181,8 @@ protected synchronized void delay(long amount) throws InterruptedException {
 
         private final Throttle throttle;
 
-        SendRecords() {
+        SendRecords(HashSet<TopicPartition> activePartitions) {
+            this.activePartitions = activePartitions;
             this.histogram = new Histogram(5000);
             int perPeriod = WorkerUtils.perSecToPerPeriod(spec.targetMessagesPerSec(), THROTTLE_PERIOD_MS);
             this.statusUpdaterFuture = executor.scheduleWithFixedDelay(
@@ -201,13 +204,16 @@ public Void call() throws Exception {
             try {
                 Future<RecordMetadata> future = null;
                 try {
+                    Iterator<TopicPartition> iter = activePartitions.iterator();
                     for (int m = 0; m < spec.maxMessages(); m++) {
-                        for (int i = 0; i < spec.activeTopics(); i++) {
-                            ProducerRecord<byte[], byte[]> record = new ProducerRecord<byte[], byte[]>(
-                                topicIndexToName(i), 0, keys.next(), values.next());
-                            future = producer.send(record,
-                                new SendRecordsCallback(this, Time.SYSTEM.milliseconds()));
+                        if (!iter.hasNext()) {
+                            iter = activePartitions.iterator();
                         }
+                        TopicPartition partition = iter.next();
+                        ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(
+                            partition.topic(), partition.partition(), keys.next(), values.next());
+                        future = producer.send(record,
+                            new SendRecordsCallback(this, Time.SYSTEM.milliseconds()));
                         throttle.increment();
                     }
                 } finally {
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java
index 12b0c08a700..570f6a11e34 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java
@@ -17,6 +17,9 @@
 
 package org.apache.kafka.trogdor.workload;
 
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.node.TextNode;
 import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -27,6 +30,7 @@
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.WakeupException;
@@ -35,6 +39,7 @@
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.trogdor.common.JsonUtil;
 import org.apache.kafka.trogdor.common.Platform;
 import org.apache.kafka.trogdor.common.ThreadUtils;
 import org.apache.kafka.trogdor.common.WorkerUtils;
@@ -46,33 +51,31 @@
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 import java.util.ArrayList;
-import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 import java.util.TreeSet;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 public class RoundTripWorker implements TaskWorker {
     private static final int THROTTLE_PERIOD_MS = 100;
 
-    private static final int MESSAGE_SIZE = 512;
-
     private static final int LOG_INTERVAL_MS = 5000;
 
     private static final int LOG_NUM_MESSAGES = 10;
 
-    private static final String TOPIC_NAME = "round_trip_topic";
-
     private static final Logger log = LoggerFactory.getLogger(RoundTripWorker.class);
 
     private static final PayloadGenerator KEY_GENERATOR = new SequentialPayloadGenerator(4, 0);
 
-    private final ToReceiveTracker toReceiveTracker = new ToReceiveTracker();
+    private ToReceiveTracker toReceiveTracker;
 
     private final String id;
 
@@ -80,18 +83,20 @@
 
     private final AtomicBoolean running = new AtomicBoolean(false);
 
-    private ExecutorService executor;
+    private ScheduledExecutorService executor;
+
+    private WorkerStatusTracker status;
 
     private KafkaFutureImpl<String> doneFuture;
 
     private KafkaProducer<byte[], byte[]> producer;
 
-    private PayloadGenerator payloadGenerator;
-
     private KafkaConsumer<byte[], byte[]> consumer;
 
     private CountDownLatch unackedSends;
 
+    private ToSendTracker toSendTracker;
+
     public RoundTripWorker(String id, RoundTripWorkloadSpec spec) {
         this.id = id;
         this.spec = spec;
@@ -104,8 +109,9 @@ public void start(Platform platform, WorkerStatusTracker status,
             throw new IllegalStateException("RoundTripWorker is already running.");
         }
         log.info("{}: Activating RoundTripWorker.", id);
-        this.executor = Executors.newCachedThreadPool(
+        this.executor = Executors.newScheduledThreadPool(3,
             ThreadUtils.createThreadFactory("RoundTripWorker%d", false));
+        this.status = status;
         this.doneFuture = doneFuture;
         this.producer = null;
         this.consumer = null;
@@ -120,16 +126,31 @@ public void run() {
                 if (spec.targetMessagesPerSec() <= 0) {
                     throw new ConfigException("Can't have targetMessagesPerSec <= 0.");
                 }
-                if ((spec.partitionAssignments() == null) || spec.partitionAssignments().isEmpty()) {
-                    throw new ConfigException("Invalid null or empty partitionAssignments.");
+                Map<String, NewTopic> newTopics = new HashMap<>();
+                HashSet<TopicPartition> active = new HashSet<>();
+                for (Map.Entry<String, PartitionsSpec> entry :
+                    spec.activeTopics().materialize().entrySet()) {
+                    String topicName = entry.getKey();
+                    PartitionsSpec partSpec = entry.getValue();
+                    newTopics.put(topicName, partSpec.newTopic(topicName));
+                    for (Integer partitionNumber : partSpec.partitionNumbers()) {
+                        active.add(new TopicPartition(topicName, partitionNumber));
+                    }
+                }
+                if (active.isEmpty()) {
+                    throw new RuntimeException("You must specify at least one active topic.");
                 }
-                WorkerUtils.createTopics(
-                    log, spec.bootstrapServers(), spec.commonClientConf(), spec.adminClientConf(),
-                    Collections.singletonMap(TOPIC_NAME,
-                                             new NewTopic(TOPIC_NAME, spec.partitionAssignments())),
-                    true);
-                executor.submit(new ProducerRunnable());
-                executor.submit(new ConsumerRunnable());
+                status.update(new TextNode("Creating " + newTopics.keySet().size() + " topic(s)"));
+                WorkerUtils.createTopics(log, spec.bootstrapServers(), spec.commonClientConf(),
+                    spec.adminClientConf(), newTopics, true);
+                status.update(new TextNode("Created " + newTopics.keySet().size() + " topic(s)"));
+                toSendTracker = new ToSendTracker(spec.maxMessages());
+                toReceiveTracker = new ToReceiveTracker();
+                executor.submit(new ProducerRunnable(active));
+                executor.submit(new ConsumerRunnable(active));
+                executor.submit(new StatusUpdater());
+                executor.scheduleWithFixedDelay(
+                    new StatusUpdater(), 30, 30, TimeUnit.SECONDS);
             } catch (Throwable e) {
                 WorkerUtils.abort(log, "Prepare", e, doneFuture);
             }
@@ -159,6 +180,10 @@ synchronized void addFailed(int index) {
             failed.add(index);
         }
 
+        synchronized int frontier() {
+            return frontier;
+        }
+
         synchronized ToSendTrackerResult next() {
             if (failed.isEmpty()) {
                 if (frontier >= maxMessages) {
@@ -173,9 +198,11 @@ synchronized ToSendTrackerResult next() {
     }
 
     class ProducerRunnable implements Runnable {
+        private final HashSet<TopicPartition> partitions;
         private final Throttle throttle;
 
-        ProducerRunnable() {
+        ProducerRunnable(HashSet<TopicPartition> partitions) {
+            this.partitions = partitions;
             Properties props = new Properties();
             props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, spec.bootstrapServers());
             props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16 * 1024);
@@ -195,11 +222,11 @@ synchronized ToSendTrackerResult next() {
 
         @Override
         public void run() {
-            final ToSendTracker toSendTracker = new ToSendTracker(spec.maxMessages());
             long messagesSent = 0;
             long uniqueMessagesSent = 0;
             log.debug("{}: Starting RoundTripWorker#ProducerRunnable.", id);
             try {
+                Iterator<TopicPartition> iter = partitions.iterator();
                 while (true) {
                     final ToSendTrackerResult result = toSendTracker.next();
                     if (result == null) {
@@ -212,9 +239,13 @@ public void run() {
                         uniqueMessagesSent++;
                     }
                     messagesSent++;
+                    if (!iter.hasNext()) {
+                        iter = partitions.iterator();
+                    }
+                    TopicPartition partition = iter.next();
                     // we explicitly specify generator position based on message index
-                    ProducerRecord<byte[], byte[]> record = new ProducerRecord(TOPIC_NAME, 0,
-                        KEY_GENERATOR.generate(messageIndex),
+                    ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(partition.topic(),
+                        partition.partition(), KEY_GENERATOR.generate(messageIndex),
                         spec.valueGenerator().generate(messageIndex));
                     producer.send(record, new Callback() {
                         @Override
@@ -242,12 +273,23 @@ public void onCompletion(RecordMetadata metadata, Exception exception) {
     private class ToReceiveTracker {
         private final TreeSet<Integer> pending = new TreeSet<>();
 
+        private int totalReceived = 0;
+
         synchronized void addPending(int messageIndex) {
             pending.add(messageIndex);
         }
 
         synchronized boolean removePending(int messageIndex) {
-            return pending.remove(messageIndex);
+            if (pending.remove(messageIndex)) {
+                totalReceived++;
+                return true;
+            } else {
+                return false;
+            }
+        }
+
+        synchronized int totalReceived() {
+            return totalReceived;
         }
 
         void log() {
@@ -269,7 +311,7 @@ void log() {
     class ConsumerRunnable implements Runnable {
         private final Properties props;
 
-        ConsumerRunnable() {
+        ConsumerRunnable(HashSet<TopicPartition> partitions) {
             this.props = new Properties();
             props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, spec.bootstrapServers());
             props.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer." + id);
@@ -281,7 +323,7 @@ void log() {
             WorkerUtils.addConfigsToProperties(props, spec.commonClientConf(), spec.consumerConf());
             consumer = new KafkaConsumer<>(props, new ByteArrayDeserializer(),
                 new ByteArrayDeserializer());
-            consumer.subscribe(Collections.singleton(TOPIC_NAME));
+            consumer.assign(partitions);
         }
 
         @Override
@@ -296,7 +338,8 @@ public void run() {
                     try {
                         pollInvoked++;
                         ConsumerRecords<byte[], byte[]> records = consumer.poll(50);
-                        for (ConsumerRecord<byte[], byte[]> record : records.records(TOPIC_NAME)) {
+                        for (Iterator<ConsumerRecord<byte[], byte[]>> iter = records.iterator(); iter.hasNext(); ) {
+                            ConsumerRecord<byte[], byte[]> record = iter.next();
                             int messageIndex = ByteBuffer.wrap(record.key()).order(ByteOrder.LITTLE_ENDIAN).getInt();
                             messagesReceived++;
                             if (toReceiveTracker.removePending(messageIndex)) {
@@ -306,6 +349,7 @@ public void run() {
                                         "Waiting for all sends to be acked...", id, spec.maxMessages());
                                     unackedSends.await();
                                     log.info("{}: all sends have been acked.", id);
+                                    new StatusUpdater().update();
                                     doneFuture.complete("");
                                     return;
                                 }
@@ -332,6 +376,46 @@ public void run() {
         }
     }
 
+    public class StatusUpdater implements Runnable {
+        @Override
+        public void run() {
+            try {
+                update();
+            } catch (Exception e) {
+                WorkerUtils.abort(log, "StatusUpdater", e, doneFuture);
+            }
+        }
+
+        StatusData update() {
+            StatusData statusData =
+                new StatusData(toSendTracker.frontier(), toReceiveTracker.totalReceived());
+            status.update(JsonUtil.JSON_SERDE.valueToTree(statusData));
+            return statusData;
+        }
+    }
+
+    public static class StatusData {
+        private final long totalUniqueSent;
+        private final long totalReceived;
+
+        @JsonCreator
+        public StatusData(@JsonProperty("totalUniqueSent") long totalUniqueSent,
+                          @JsonProperty("totalReceived") long totalReceived) {
+            this.totalUniqueSent = totalUniqueSent;
+            this.totalReceived = totalReceived;
+        }
+
+        @JsonProperty
+        public long totalUniqueSent() {
+            return totalUniqueSent;
+        }
+
+        @JsonProperty
+        public long totalReceived() {
+            return totalReceived;
+        }
+    }
+
     @Override
     public void stop(Platform platform) throws Exception {
         if (!running.compareAndSet(true, false)) {
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorkloadSpec.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorkloadSpec.java
index 3d0e3ef2e6b..9522e0a938e 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorkloadSpec.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorkloadSpec.java
@@ -25,11 +25,8 @@
 import org.apache.kafka.trogdor.task.TaskWorker;
 
 import java.util.Collections;
-import java.util.List;
 import java.util.Map;
-import java.util.NavigableMap;
 import java.util.Set;
-import java.util.TreeMap;
 
 /**
  * The specification for a workload that sends messages to a broker and then
@@ -39,8 +36,8 @@
     private final String clientNode;
     private final String bootstrapServers;
     private final int targetMessagesPerSec;
-    private final NavigableMap<Integer, List<Integer>> partitionAssignments;
     private final PayloadGenerator valueGenerator;
+    private final TopicsSpec activeTopics;
     private final int maxMessages;
     private final Map<String, String> commonClientConf;
     private final Map<String, String> producerConf;
@@ -57,17 +54,17 @@ public RoundTripWorkloadSpec(@JsonProperty("startMs") long startMs,
              @JsonProperty("consumerConf") Map<String, String> consumerConf,
              @JsonProperty("producerConf") Map<String, String> producerConf,
              @JsonProperty("targetMessagesPerSec") int targetMessagesPerSec,
-             @JsonProperty("partitionAssignments") NavigableMap<Integer, List<Integer>> partitionAssignments,
              @JsonProperty("valueGenerator") PayloadGenerator valueGenerator,
+             @JsonProperty("activeTopics") TopicsSpec activeTopics,
              @JsonProperty("maxMessages") int maxMessages) {
         super(startMs, durationMs);
         this.clientNode = clientNode == null ? "" : clientNode;
         this.bootstrapServers = bootstrapServers == null ? "" : bootstrapServers;
         this.targetMessagesPerSec = targetMessagesPerSec;
-        this.partitionAssignments = partitionAssignments == null ?
-            new TreeMap<Integer, List<Integer>>() : partitionAssignments;
         this.valueGenerator = valueGenerator == null ?
             new UniformRandomPayloadGenerator(32, 123, 10) : valueGenerator;
+        this.activeTopics = activeTopics == null ?
+            TopicsSpec.EMPTY : activeTopics.immutableCopy();
         this.maxMessages = maxMessages;
         this.commonClientConf = configOrEmptyMap(commonClientConf);
         this.adminClientConf = configOrEmptyMap(adminClientConf);
@@ -91,8 +88,8 @@ public int targetMessagesPerSec() {
     }
 
     @JsonProperty
-    public NavigableMap<Integer, List<Integer>> partitionAssignments() {
-        return partitionAssignments;
+    public TopicsSpec activeTopics() {
+        return activeTopics;
     }
 
     @JsonProperty
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/TopicsSpec.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/TopicsSpec.java
new file mode 100644
index 00000000000..a9b550d648c
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/TopicsSpec.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.workload;
+
+import com.fasterxml.jackson.annotation.JsonAnyGetter;
+import com.fasterxml.jackson.annotation.JsonAnySetter;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.kafka.trogdor.common.StringExpander;
+import org.apache.kafka.trogdor.rest.Message;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * TopicsSpec maps topic names to descriptions of the partitions in them.
+ *
+ * In JSON form, this is serialized as a map whose keys are topic names,
+ * and whose entries are partition descriptions.
+ * Keys may also refer to multiple topics.  For example, this specification
+ * refers to 3 topics foo1, foo2, and foo3:
+ *
+ * {
+ *   "foo[1-3]" : {
+ *      "numPartitions": 3,
+ *      "replicationFactor": 3
+ *    }
+ * }
+ */
+public class TopicsSpec extends Message {
+    public static final TopicsSpec EMPTY = new TopicsSpec().immutableCopy();
+
+    private final Map<String, PartitionsSpec> map;
+
+    @JsonCreator
+    public TopicsSpec() {
+        this.map = new HashMap<>();
+    }
+
+    private TopicsSpec(Map<String, PartitionsSpec> map) {
+        this.map = map;
+    }
+
+    @JsonAnyGetter
+    public Map<String, PartitionsSpec> get() {
+        return map;
+    }
+
+    @JsonAnySetter
+    public void set(String name, PartitionsSpec value) {
+        map.put(name, value);
+    }
+
+    public TopicsSpec immutableCopy() {
+        HashMap<String, PartitionsSpec> mapCopy = new HashMap<>();
+        mapCopy.putAll(map);
+        return new TopicsSpec(Collections.unmodifiableMap(mapCopy));
+    }
+
+    /**
+     * Enumerate the partitions inside this TopicsSpec.
+     *
+     * @return      A map from topic names to PartitionsSpec objects.
+     */
+    public Map<String, PartitionsSpec> materialize() {
+        HashMap<String, PartitionsSpec> all = new HashMap<>();
+        for (Map.Entry<String, PartitionsSpec> entry : map.entrySet()) {
+            for (String topicName : StringExpander.expand(entry.getKey())) {
+                all.put(topicName, entry.getValue());
+            }
+        }
+        return all;
+    }
+}
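
Tying the two new classes together, a minimal sketch (the example class is
hypothetical; behavior is as defined by materialize() and StringExpander above,
and mirrors TopicsSpecTest below):

    import java.util.Map;

    import org.apache.kafka.trogdor.workload.PartitionsSpec;
    import org.apache.kafka.trogdor.workload.TopicsSpec;

    public class TopicsSpecExample {
        public static void main(String[] args) {
            TopicsSpec topics = new TopicsSpec();
            topics.set("topicA[0-2]", new PartitionsSpec(3, (short) 3, null));

            // materialize() expands the range key into topicA0, topicA1 and
            // topicA2, each mapped to the same PartitionsSpec.
            Map<String, PartitionsSpec> all = topics.materialize();
            System.out.println(all.keySet());   // [topicA0, topicA1, topicA2] (unordered)
        }
    }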
diff --git a/tools/src/test/java/org/apache/kafka/trogdor/common/JsonSerializationTest.java b/tools/src/test/java/org/apache/kafka/trogdor/common/JsonSerializationTest.java
index 8101d9c6e4e..bb65bd543d1 100644
--- a/tools/src/test/java/org/apache/kafka/trogdor/common/JsonSerializationTest.java
+++ b/tools/src/test/java/org/apache/kafka/trogdor/common/JsonSerializationTest.java
@@ -26,9 +26,10 @@
 import org.apache.kafka.trogdor.rest.WorkerDone;
 import org.apache.kafka.trogdor.rest.WorkerRunning;
 import org.apache.kafka.trogdor.rest.WorkerStopping;
-import org.apache.kafka.trogdor.task.SampleTaskSpec;
+import org.apache.kafka.trogdor.workload.PartitionsSpec;
 import org.apache.kafka.trogdor.workload.ProduceBenchSpec;
 import org.apache.kafka.trogdor.workload.RoundTripWorkloadSpec;
+import org.apache.kafka.trogdor.workload.TopicsSpec;
 import org.junit.Test;
 
 import java.lang.reflect.Field;
@@ -49,10 +50,11 @@ public void testDeserializationDoesNotProduceNulls() throws Exception {
         verify(new WorkerRunning(null, 0, null));
         verify(new WorkerStopping(null, 0, null));
         verify(new ProduceBenchSpec(0, 0, null, null,
-            0, 0, null, null, null, null, null, 0, 0, "test-topic", 1, (short) 3));
+            0, 0, null, null, null, null, null, null, null));
         verify(new RoundTripWorkloadSpec(0, 0, null, null, null, null, null, null,
             0, null, null, 0));
-        verify(new SampleTaskSpec(0, 0, null, null));
+        verify(new TopicsSpec());
+        verify(new PartitionsSpec(0, (short) 0, null));
     }
 
     private <T> void verify(T val1) throws Exception {
diff --git a/tools/src/test/java/org/apache/kafka/trogdor/common/StringExpanderTest.java b/tools/src/test/java/org/apache/kafka/trogdor/common/StringExpanderTest.java
new file mode 100644
index 00000000000..72e1c202bd8
--- /dev/null
+++ b/tools/src/test/java/org/apache/kafka/trogdor/common/StringExpanderTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.common;
+
+import org.junit.Rule;
+import org.junit.rules.Timeout;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+
+import static org.junit.Assert.assertEquals;
+
+public class StringExpanderTest {
+    @Rule
+    final public Timeout globalTimeout = Timeout.millis(120000);
+
+    @Test
+    public void testNoExpansionNeeded() throws Exception {
+        assertEquals(Collections.singleton("foo"), StringExpander.expand("foo"));
+        assertEquals(Collections.singleton("bar"), StringExpander.expand("bar"));
+        assertEquals(Collections.singleton(""), StringExpander.expand(""));
+    }
+
+    @Test
+    public void testExpansions() throws Exception {
+        HashSet<String> expected1 = new HashSet<>(Arrays.asList(
+            "foo1",
+            "foo2",
+            "foo3"
+        ));
+        assertEquals(expected1, StringExpander.expand("foo[1-3]"));
+
+        HashSet<String> expected2 = new HashSet<>(Arrays.asList(
+            "foo bar baz 0"
+        ));
+        assertEquals(expected2, StringExpander.expand("foo bar baz [0-0]"));
+
+        HashSet<String> expected3 = new HashSet<>(Arrays.asList(
+            "[[ wow50 ]]",
+            "[[ wow51 ]]",
+            "[[ wow52 ]]"
+        ));
+        assertEquals(expected3, StringExpander.expand("[[ wow[50-52] ]]"));
+    }
+}
diff --git a/tools/src/test/java/org/apache/kafka/trogdor/workload/TopicsSpecTest.java b/tools/src/test/java/org/apache/kafka/trogdor/workload/TopicsSpecTest.java
new file mode 100644
index 00000000000..f86ca0f1f57
--- /dev/null
+++ b/tools/src/test/java/org/apache/kafka/trogdor/workload/TopicsSpecTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.workload;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TopicsSpecTest {
+    @Rule
+    final public Timeout globalTimeout = Timeout.millis(120000);
+
+    private final static TopicsSpec FOO;
+    private final static PartitionsSpec PARTSA;
+    private final static PartitionsSpec PARTSB;
+
+    static {
+        FOO = new TopicsSpec();
+
+        PARTSA = new PartitionsSpec(3, (short) 3, null);
+        FOO.set("topicA[0-2]", PARTSA);
+
+        Map<Integer, List<Integer>> assignmentsB = new HashMap<>();
+        assignmentsB.put(0, Arrays.asList(0, 1, 2));
+        assignmentsB.put(1, Arrays.asList(2, 3, 4));
+        PARTSB = new PartitionsSpec(0, (short) 0, assignmentsB);
+        FOO.set("topicB", PARTSB);
+    }
+
+    @Test
+    public void testMaterialize() {
+        Map<String, PartitionsSpec> parts = FOO.materialize();
+        assertTrue(parts.containsKey("topicA0"));
+        assertTrue(parts.containsKey("topicA1"));
+        assertTrue(parts.containsKey("topicA2"));
+        assertTrue(parts.containsKey("topicB"));
+        assertEquals(4, parts.keySet().size());
+        assertEquals(PARTSA, parts.get("topicA0"));
+        assertEquals(PARTSA, parts.get("topicA1"));
+        assertEquals(PARTSA, parts.get("topicA2"));
+        assertEquals(PARTSB, parts.get("topicB"));
+    }
+
+    @Test
+    public void testPartitionNumbers() {
+        List<Integer> partsANumbers = PARTSA.partitionNumbers();
+        assertEquals(Integer.valueOf(0), partsANumbers.get(0));
+        assertEquals(Integer.valueOf(1), partsANumbers.get(1));
+        assertEquals(Integer.valueOf(2), partsANumbers.get(2));
+        assertEquals(3, partsANumbers.size());
+
+        List<Integer> partsBNumbers = PARTSB.partitionNumbers();
+        assertEquals(Integer.valueOf(0), partsBNumbers.get(0));
+        assertEquals(Integer.valueOf(1), partsBNumbers.get(1));
+        assertEquals(2, partsBNumbers.size());
+    }
+}

> Make specifying partitions in RoundTripWorkload, ProduceBench more flexible
> ---------------------------------------------------------------------------
>
>                 Key: KAFKA-6771
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6771
>             Project: Kafka
>          Issue Type: Improvement
>          Components: system tests
>            Reporter: Colin P. McCabe
>            Assignee: Colin P. McCabe
>            Priority: Major
>
> Make specifying partitions in RoundTripWorkload, ProduceBench more flexible


