[ 
https://issues.apache.org/jira/browse/KAFKA-6611?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16434326#comment-16434326
 ] 

ASF GitHub Bot commented on KAFKA-6611:
---------------------------------------

guozhangwang closed pull request #4744: KAFKA-6611, PART II: Improve Streams SimpleBenchmark
URL: https://github.com/apache/kafka/pull/4744
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index 3a9cbc4d664..fc4745d8c6e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -832,6 +832,7 @@ public long maybeUpdate(long now) {
             long waitForMetadataFetch = this.metadataFetchInProgress ? 
requestTimeoutMs : 0;
 
             long metadataTimeout = Math.max(timeToNextMetadataUpdate, 
waitForMetadataFetch);
+
             if (metadataTimeout > 0) {
                 return metadataTimeout;
             }
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 0514c995635..6a108269f9a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -252,6 +252,9 @@ private long sendProducerData(long now) {
             // and request metadata update, since there are messages to send 
to the topic.
             for (String topic : result.unknownLeaderTopics)
                 this.metadata.add(topic);
+
+            log.debug("Requesting metadata update due to unknown leader topics 
from the batched records: {}", result.unknownLeaderTopics);
+
             this.metadata.requestUpdate();
         }
 
@@ -557,9 +560,13 @@ else if (error == Errors.CLUSTER_AUTHORIZATION_FAILED)
                 failBatch(batch, response, exception, batch.attempts() < 
this.retries);
             }
             if (error.exception() instanceof InvalidMetadataException) {
-                if (error.exception() instanceof 
UnknownTopicOrPartitionException)
+                if (error.exception() instanceof 
UnknownTopicOrPartitionException) {
                     log.warn("Received unknown topic or partition error in 
produce request on partition {}. The " +
                             "topic/partition may not exist or the user may not 
have Describe access to it", batch.topicPartition);
+                } else {
+                    log.warn("Received invalid metadata error in produce 
request on partition {} due to {}. Going " +
+                            "to request metadata update now", 
batch.topicPartition, error.exception().toString());
+                }
                 metadata.requestUpdate();
             }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
index c66d78b7310..423184c5bdf 100644
--- a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
+++ b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
@@ -40,6 +40,7 @@
 import org.apache.kafka.streams.kstream.JoinWindows;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
@@ -48,9 +49,7 @@
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.Stores;
-import org.apache.kafka.test.TestUtils;
 
-import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
@@ -59,7 +58,6 @@
 import java.util.Random;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * Class that provides support for a series of benchmarks. It is usually 
driven by
@@ -77,114 +75,125 @@
  * is still running "consume"
  */
 public class SimpleBenchmark {
+    private static final String LOADING_PRODUCER_CLIENT_ID = 
"simple-benchmark-loading-producer";
 
-    final String kafka;
-    final Boolean loadPhase;
-    final String testName;
-    final int numThreads;
-    final Properties props;
-    static final String ALL_TESTS = "all";
-    private static final String SOURCE_TOPIC = "simpleBenchmarkSourceTopic";
+    private static final String SOURCE_TOPIC_ONE = 
"simpleBenchmarkSourceTopic1";
+    private static final String SOURCE_TOPIC_TWO = 
"simpleBenchmarkSourceTopic2";
     private static final String SINK_TOPIC = "simpleBenchmarkSinkTopic";
 
-    private static final String COUNT_TOPIC = "countTopic";
-    private static final String JOIN_TOPIC_1_PREFIX = "joinSourceTopic1";
-    private static final String JOIN_TOPIC_2_PREFIX = "joinSourceTopic2";
     private static final String YAHOO_CAMPAIGNS_TOPIC = "yahooCampaigns";
     private static final String YAHOO_EVENTS_TOPIC = "yahooEvents";
-    private static final ValueJoiner VALUE_JOINER = new ValueJoiner<byte[], 
byte[], byte[]>() {
+
+    private static final ValueJoiner<byte[], byte[], byte[]> VALUE_JOINER = 
new ValueJoiner<byte[], byte[], byte[]>() {
         @Override
         public byte[] apply(final byte[] value1, final byte[] value2) {
-            if (value1 == null && value2 == null)
-                return new byte[VALUE_SIZE];
-            if (value1 == null && value2 != null)
-                return value2;
-            if (value1 != null && value2 == null)
+            // dumb joiner in order to have as little join overhead as possible
+            if (value1 != null)
                 return value1;
-
-            byte[] tmp = new byte[value1.length + value2.length];
-            System.arraycopy(value1, 0, tmp, 0, value1.length);
-            System.arraycopy(value2, 0, tmp, value1.length, value2.length);
-            return tmp;
+            else if (value2 != null)
+                return value2;
+            else
+                return new byte[100];
         }
     };
 
-    int numRecords;
-    final AtomicInteger processedRecords = new AtomicInteger(0);
-    long processedBytes = 0;
-    private static final int VALUE_SIZE = 100;
+    private static final Serde<byte[]> BYTE_SERDE = Serdes.ByteArray();
+    private static final Serde<Integer> INTEGER_SERDE = Serdes.Integer();
+
+    long processedBytes = 0L;
+    int processedRecords = 0;
+
     private static final long POLL_MS = 500L;
     private static final long COMMIT_INTERVAL_MS = 30000L;
     private static final int MAX_POLL_RECORDS = 1000;
-    private static final int SOCKET_SIZE_BYTES = 1 * 1024 * 1024;
 
-    private static final Serde<byte[]> BYTE_SERDE = Serdes.ByteArray();
-    private static final Serde<Integer> INTEGER_SERDE = Serdes.Integer();
+    /* ----------- benchmark variables that are hard-coded ----------- */
+
+    private static final int KEY_SPACE_SIZE = 10000;
+
+    private static final long STREAM_STREAM_JOIN_WINDOW = 10000L;
+
+    private static final int SOCKET_SIZE_BYTES = 1024 * 1024;
+
+    // the following numbers are based on empirical results and should only
+    // be considered for updates when perf results have significantly changed
+
+    // with at least 10 million records, we run for at most 3 minutes
+    private static final int MAX_WAIT_MS = 3 * 60 * 1000;
+
+    /* ----------- benchmark variables that can be specified ----------- */
+
+    final String kafka;
+
+    final String testName;
+
+    final int numRecords;
 
-    public SimpleBenchmark(final Properties props, final String kafka, final 
Boolean loadPhase,
-                           final String testName, final int numRecords, final 
int numThreads) {
+    final Properties props;
+
+    private final int valueSize;
+
+    private final double keySkew;
+
+    /* ----------- ----------------------------------------- ----------- */
+
+
+    private SimpleBenchmark(final Properties props,
+                            final String kafka,
+                            final String testName,
+                            final int numRecords,
+                            final double keySkew,
+                            final int valueSize) {
         super();
         this.props = props;
         this.kafka = kafka;
-        this.loadPhase = loadPhase;
         this.testName = testName;
+        this.keySkew = keySkew;
+        this.valueSize = valueSize;
         this.numRecords = numRecords;
-        this.numThreads = numThreads;
     }
 
     private void run() {
         switch (testName) {
-            case ALL_TESTS:
-                // producer performance
-                produce(SOURCE_TOPIC);
-                // consumer performance
-                consume(SOURCE_TOPIC);
-                // simple stream performance source->process
-                processStream(SOURCE_TOPIC);
-                // simple stream performance source->sink
-                processStreamWithSink(SOURCE_TOPIC);
-                // simple stream performance source->store
-                processStreamWithStateStore(SOURCE_TOPIC);
-                // simple stream performance source->cache->store
-                processStreamWithCachedStateStore(SOURCE_TOPIC);
-                // simple aggregation
-                count(COUNT_TOPIC);
-                // simple streams performance KSTREAM-KTABLE join
-                kStreamKTableJoin(JOIN_TOPIC_1_PREFIX + "KStreamKTable", 
JOIN_TOPIC_2_PREFIX + "KStreamKTable");
-                // simple streams performance KSTREAM-KSTREAM join
-                kStreamKStreamJoin(JOIN_TOPIC_1_PREFIX + "KStreamKStream", 
JOIN_TOPIC_2_PREFIX + "KStreamKStream");
-                // simple streams performance KTABLE-KTABLE join
-                kTableKTableJoin(JOIN_TOPIC_1_PREFIX + "KTableKTable", 
JOIN_TOPIC_2_PREFIX + "KTableKTable");
+            // loading phases
+            case "load-one":
+                produce(LOADING_PRODUCER_CLIENT_ID, SOURCE_TOPIC_ONE, 
numRecords, keySkew, valueSize);
                 break;
-            case "produce":
-                produce(SOURCE_TOPIC);
+            case "load-two":
+                produce(LOADING_PRODUCER_CLIENT_ID, SOURCE_TOPIC_ONE, 
numRecords, keySkew, valueSize);
+                produce(LOADING_PRODUCER_CLIENT_ID, SOURCE_TOPIC_TWO, 
numRecords, keySkew, valueSize);
                 break;
+
+            // testing phases
             case "consume":
-                consume(SOURCE_TOPIC);
+                consume(SOURCE_TOPIC_ONE);
+                break;
+            case "consumeproduce":
+                consumeAndProduce(SOURCE_TOPIC_ONE);
                 break;
-            case "count":
-                count(COUNT_TOPIC);
+            case "streamcount":
+                countStreamsNonWindowed(SOURCE_TOPIC_ONE);
                 break;
-            case "processstream":
-                processStream(SOURCE_TOPIC);
+            case "streamcountwindowed":
+                countStreamsWindowed(SOURCE_TOPIC_ONE);
                 break;
-            case "processstreamwithsink":
-                processStreamWithSink(SOURCE_TOPIC);
+            case "streamprocess":
+                processStream(SOURCE_TOPIC_ONE);
                 break;
-            case "processstreamwithstatestore":
-                processStreamWithStateStore(SOURCE_TOPIC);
+            case "streamprocesswithsink":
+                processStreamWithSink(SOURCE_TOPIC_ONE);
                 break;
-            case "processstreamwithcachedstatestore":
-                processStreamWithCachedStateStore(SOURCE_TOPIC);
+            case "streamprocesswithstatestore":
+                processStreamWithStateStore(SOURCE_TOPIC_ONE);
                 break;
-            case "kstreamktablejoin":
-                kStreamKTableJoin(JOIN_TOPIC_1_PREFIX + "KStreamKTable", 
JOIN_TOPIC_2_PREFIX + "KStreamKTable");
+            case "streamtablejoin":
+                streamTableJoin(SOURCE_TOPIC_ONE, SOURCE_TOPIC_TWO);
                 break;
-            case "kstreamkstreamjoin":
-                kStreamKStreamJoin(JOIN_TOPIC_1_PREFIX + "KStreamKStream", 
JOIN_TOPIC_2_PREFIX + "KStreamKStream");
+            case "streamstreamjoin":
+                streamStreamJoin(SOURCE_TOPIC_ONE, SOURCE_TOPIC_TWO);
                 break;
-            case "ktablektablejoin":
-                kTableKTableJoin(JOIN_TOPIC_1_PREFIX + "KTableKTable", 
JOIN_TOPIC_2_PREFIX + "KTableKTable");
+            case "tabletablejoin":
+                tableTableJoin(SOURCE_TOPIC_ONE, SOURCE_TOPIC_TWO);
                 break;
             case "yahoo":
                 yahooBenchmark(YAHOO_CAMPAIGNS_TOPIC, YAHOO_EVENTS_TOPIC);
@@ -196,53 +205,58 @@ private void run() {
     }
 
     public static void main(String[] args) throws IOException {
-        final String kafka = args.length > 0 ? args[0] : "localhost:9092";
-        final String propFileName = args.length > 1 ? args[1] : null;
-        final int numRecords = args.length > 2 ? Integer.parseInt(args[2]) : 
10000000;
-        final boolean loadPhase = args.length > 3 ? 
Boolean.parseBoolean(args[3]) : false;
-        final String testName = args.length > 4 ? 
args[4].toLowerCase(Locale.ROOT) : ALL_TESTS;
-        final int numThreads = args.length > 5 ? Integer.parseInt(args[5]) : 1;
+        if (args.length < 5) {
+            System.err.println("Not enough parameters are provided; expecting 
propFileName, testName, numRecords, keySkew, valueSize");
+            System.exit(1);
+        }
+
+        String propFileName = args[0];
+        String testName = args[1].toLowerCase(Locale.ROOT);
+        int numRecords = Integer.parseInt(args[2]);
+        double keySkew = Double.parseDouble(args[3]); // 0d means even 
distribution
+        int valueSize = Integer.parseInt(args[4]);
 
         final Properties props = Utils.loadProps(propFileName);
+        final String kafka = 
props.getProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG);
 
-        final String stateDirStr;
-        if (props.containsKey(StreamsConfig.STATE_DIR_CONFIG)) {
-            stateDirStr = props.get(StreamsConfig.STATE_DIR_CONFIG).toString();
-        } else {
-            stateDirStr = TestUtils.tempDirectory().getAbsolutePath();
-            props.put(StreamsConfig.STATE_DIR_CONFIG, stateDirStr);
+        if (kafka == null) {
+            System.err.println("No bootstrap kafka servers specified in " + 
StreamsConfig.BOOTSTRAP_SERVERS_CONFIG);
+            System.exit(1);
         }
 
-        final File stateDir = new File(stateDirStr);
-        stateDir.mkdir();
-
         // Note: this output is needed for automated tests and must not be 
removed
         System.out.println("StreamsTest instance started");
+
+        System.out.println("testName=" + testName);
         System.out.println("kafka=" + kafka);
         System.out.println("streamsProperties=" + props);
         System.out.println("numRecords=" + numRecords);
-        System.out.println("loadPhase=" + loadPhase);
-        System.out.println("testName=" + testName);
-        System.out.println("numThreads=" + numThreads);
+        System.out.println("keySkew=" + keySkew);
+        System.out.println("valueSize=" + valueSize);
+
+        SimpleBenchmark benchmark = new SimpleBenchmark(props, kafka, 
testName, numRecords, keySkew, valueSize);
 
-        SimpleBenchmark benchmark = new SimpleBenchmark(props, kafka, 
loadPhase, testName, numRecords, numThreads);
         benchmark.run();
     }
 
     public void setStreamProperties(final String applicationId) {
+        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
         props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
         props.put(StreamsConfig.CLIENT_ID_CONFIG, "simple-benchmark");
-        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
-        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numThreads);
-        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        props.put(StreamsConfig.POLL_MS_CONFIG, POLL_MS);
+        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, COMMIT_INTERVAL_MS);
+        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
Serdes.Integer().getClass());
+        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
Serdes.ByteArray().getClass());
         // the socket buffer needs to be large, especially when running in AWS 
with
         // high latency. if running locally the default is fine.
+        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, SOCKET_SIZE_BYTES);
-        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
Serdes.Integer().getClass());
-        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
Serdes.ByteArray().getClass());
         props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, MAX_POLL_RECORDS);
-        props.put(StreamsConfig.POLL_MS_CONFIG, POLL_MS);
-        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, COMMIT_INTERVAL_MS);
+
+        // improve producer throughput
+        props.put(ProducerConfig.LINGER_MS_CONFIG, 5000);
+        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 128 * 1024);
+
         //TODO remove this config or set to smaller value when KIP-91 is merged
         
props.put(StreamsConfig.producerPrefix(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG),
 60000);
     }
@@ -251,11 +265,13 @@ private Properties setProduceConsumeProperties(final String clientId) {
         Properties props = new Properties();
         props.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
-        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
IntegerSerializer.class);
-        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
ByteArraySerializer.class);
         // the socket buffer needs to be large, especially when running in AWS 
with
         // high latency. if running locally the default is fine.
+        props.put(ProducerConfig.LINGER_MS_CONFIG, 5000);
+        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 128 * 1024);
         props.put(ProducerConfig.SEND_BUFFER_CONFIG, SOCKET_SIZE_BYTES);
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
IntegerSerializer.class);
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
ByteArraySerializer.class);
         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
IntegerDeserializer.class);
         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
ByteArrayDeserializer.class);
         props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
@@ -266,521 +282,339 @@ private Properties setProduceConsumeProperties(final String clientId) {
         return props;
     }
 
-    private boolean maybeSetupPhase(final String topic, final String clientId,
-                                    final boolean skipIfAllTests) {
-        resetStats();
-        // initialize topics
-        if (loadPhase) {
-            if (skipIfAllTests) {
-                // if we run all tests, the produce test will have already 
loaded the data
-                if (testName.equals(ALL_TESTS)) {
-                    // Skipping loading phase since previously loaded
-                    return true;
-                }
-            }
-            System.out.println("Initializing topic " + topic);
-            // WARNING: The keys must be sequential, i.e., unique, otherwise 
the logic for when this test
-            // stops will not work (in createCountStreams)
-            produce(topic, VALUE_SIZE, clientId, numRecords, true, numRecords, 
false);
-            return true;
-        }
-        return false;
-    }
-
     void resetStats() {
-        processedRecords.set(0);
-        processedBytes = 0;
-    }
-
-
-    private KafkaStreams createCountStreams(Properties streamConfig, String 
topic, final CountDownLatch latch) {
-        final StreamsBuilder builder = new StreamsBuilder();
-        final KStream<Integer, byte[]> input = builder.stream(topic);
-
-        input.groupByKey()
-            .count("tmpStoreName").foreach(new CountDownAction(latch));
-
-        return new KafkaStreams(builder.build(), streamConfig);
-    }
-
-
-    private void yahooBenchmark(final String campaignsTopic, final String 
eventsTopic) {
-        YahooBenchmark benchmark = new YahooBenchmark(this, campaignsTopic, 
eventsTopic);
-
-        benchmark.run();
-    }
-
-    /**
-     * Measure the performance of a simple aggregate like count.
-     * Counts the occurrence of numbers (note that normally people count 
words, this
-     * example counts numbers)
-     * @param countTopic Topic where numbers are stored
-     * @throws Exception
-     */
-    public void count(String countTopic) {
-        if (maybeSetupPhase(countTopic, "simple-benchmark-produce-count", 
false)) {
-            return;
-        }
-
-        CountDownLatch latch = new CountDownLatch(1);
-        setStreamProperties("simple-benchmark-count");
-        final KafkaStreams streams = createCountStreams(props, countTopic, 
latch);
-        runGenericBenchmark(streams, "Streams Count Performance 
[records/latency/rec-sec/MB-sec counted]: ", latch);
-    }
-
-    /**
-     * Measure the performance of a KStream-KTable left join. The setup is 
such that each
-     * KStream record joins to exactly one element in the KTable
-     */
-    public void kStreamKTableJoin(String kStreamTopic, String kTableTopic) {
-        if (maybeSetupPhase(kStreamTopic, "simple-benchmark-produce-kstream", 
false)) {
-            maybeSetupPhase(kTableTopic, "simple-benchmark-produce-ktable", 
false);
-            return;
-        }
-
-        CountDownLatch latch = new CountDownLatch(1);
-
-        // setup join
-        setStreamProperties("simple-benchmark-kstream-ktable-join");
-        final KafkaStreams streams = 
createKafkaStreamsKStreamKTableJoin(props, kStreamTopic, kTableTopic, latch);
-
-        // run benchmark
-        runGenericBenchmark(streams, "Streams KStreamKTable LeftJoin 
Performance [records/latency/rec-sec/MB-sec joined]: ", latch);
+        processedRecords = 0;
+        processedBytes = 0L;
     }
 
     /**
-     * Measure the performance of a KStream-KStream left join. The setup is 
such that each
-     * KStream record joins to exactly one element in the other KStream
+     * Produce values to a topic
+     * @param clientId String specifying client ID
+     * @param topic Topic to produce to
+     * @param numRecords Number of records to produce
+     * @param keySkew Key zipf distribution skewness
+     * @param valueSize Size of value in bytes
      */
-    public void kStreamKStreamJoin(String kStreamTopic1, String kStreamTopic2) 
{
-        if (maybeSetupPhase(kStreamTopic1, 
"simple-benchmark-produce-kstream-topic1", false)) {
-            maybeSetupPhase(kStreamTopic2, 
"simple-benchmark-produce-kstream-topic2", false);
-            return;
-        }
-
-        CountDownLatch latch = new CountDownLatch(1);
-
-        // setup join
-        setStreamProperties("simple-benchmark-kstream-kstream-join");
-        final KafkaStreams streams = 
createKafkaStreamsKStreamKStreamJoin(props, kStreamTopic1, kStreamTopic2, 
latch);
-
-        // run benchmark
-        runGenericBenchmark(streams, "Streams KStreamKStream LeftJoin 
Performance [records/latency/rec-sec/MB-sec  joined]: ", latch);
-    }
+    private void produce(final String clientId,
+                         final String topic,
+                         final int numRecords,
+                         final double keySkew,
+                         final int valueSize) {
+        final Properties props = setProduceConsumeProperties(clientId);
+        final ZipfGenerator keyGen = new ZipfGenerator(KEY_SPACE_SIZE, 
keySkew);
+        final KafkaProducer<Integer, byte[]> producer = new 
KafkaProducer<>(props);
+
+        final byte[] value = new byte[valueSize];
+        // put some random values to increase entropy. Some devices
+        // like SSDs do compression and if the array is all zeros
+        // the performance will be too good.
+        new Random().nextBytes(value);
 
-    /**
-     * Measure the performance of a KTable-KTable left join. The setup is such 
that each
-     * KTable record joins to exactly one element in the other KTable
-     */
-    public void kTableKTableJoin(String kTableTopic1, String kTableTopic2) {
-        if (maybeSetupPhase(kTableTopic1, 
"simple-benchmark-produce-ktable-topic1", false)) {
-            maybeSetupPhase(kTableTopic2, 
"simple-benchmark-produce-ktable-topic2", false);
-            return;
+        for (int i = 0; i < numRecords; i++) {
+            producer.send(new ProducerRecord<>(topic, keyGen.next(), value));
         }
-        CountDownLatch latch = new CountDownLatch(1);
-
-        // setup join
-        setStreamProperties("simple-benchmark-ktable-ktable-join");
-        final KafkaStreams streams = createKafkaStreamsKTableKTableJoin(props, 
kTableTopic1, kTableTopic2, latch);
 
-        // run benchmark
-        runGenericBenchmark(streams, "Streams KTableKTable LeftJoin 
Performance [records/latency/rec-sec/MB-sec joined]: ", latch);
-    }
-
-    void printResults(final String nameOfBenchmark, final long latency) {
-        System.out.println(nameOfBenchmark +
-            processedRecords.get() + "/" +
-            latency + "/" +
-            recordsPerSec(latency, processedRecords.get()) + "/" +
-            megabytesPerSec(latency, processedBytes));
+        producer.close();
     }
 
-    void runGenericBenchmark(final KafkaStreams streams, final String 
nameOfBenchmark, final CountDownLatch latch) {
-        streams.start();
+    private void consumeAndProduce(final String topic) {
+        final Properties consumerProps = 
setProduceConsumeProperties("simple-benchmark-consumer");
+        final Properties producerProps = 
setProduceConsumeProperties("simple-benchmark-producer");
 
-        long startTime = System.currentTimeMillis();
+        final KafkaConsumer<Integer, byte[]> consumer = new 
KafkaConsumer<>(consumerProps);
+        final KafkaProducer<Integer, byte[]> producer = new 
KafkaProducer<>(producerProps);
+        final List<TopicPartition> partitions = getAllPartitions(consumer, 
topic);
 
-        while (latch.getCount() > 0) {
-            try {
-                latch.await();
-            } catch (InterruptedException ex) {
-                Thread.currentThread().interrupt();
-            }
-        }
-        long endTime = System.currentTimeMillis();
-        printResults(nameOfBenchmark, endTime - startTime);
-
-        streams.close();
-    }
-
-    private long startStreamsThread(final KafkaStreams streams, final 
CountDownLatch latch) {
-        Thread thread = new Thread() {
-            public void run() {
-                streams.start();
-            }
-        };
-        thread.start();
+        consumer.assign(partitions);
+        consumer.seekToBeginning(partitions);
 
         long startTime = System.currentTimeMillis();
 
-        while (latch.getCount() > 0) {
-            try {
-                latch.await();
-            } catch (InterruptedException ex) {
-                Thread.interrupted();
+        boolean keepProcessing = true;
+        while (keepProcessing) {
+            ConsumerRecords<Integer, byte[]> records = consumer.poll(POLL_MS);
+            if (records.isEmpty()) {
+                if (processedRecords == numRecords)
+                    keepProcessing = false;
+            } else {
+                for (ConsumerRecord<Integer, byte[]> record : records) {
+                    producer.send(new ProducerRecord<>(SINK_TOPIC, 
record.key(), record.value()));
+                    processedRecords++;
+                    processedBytes += record.value().length + Integer.SIZE;
+                }
             }
+            if (processedRecords == numRecords)
+                keepProcessing = false;
         }
 
         long endTime = System.currentTimeMillis();
 
-        streams.close();
-        try {
-            thread.join();
-        } catch (Exception ex) {
-            // ignore
-        }
-
-        return endTime - startTime;
-    }
-
-    public void processStream(final String topic) {
-        if (maybeSetupPhase(topic, "simple-benchmark-process-stream-load", 
true)) {
-            return;
-        }
-
-        CountDownLatch latch = new CountDownLatch(1);
-
-        final KafkaStreams streams = createKafkaStreams(topic, latch);
-        long latency = startStreamsThread(streams, latch);
-
-        printResults("Streams Performance [records/latency/rec-sec/MB-sec 
source]: ", latency);
-    }
-
-    public void processStreamWithSink(String topic) {
-        if (maybeSetupPhase(topic, 
"simple-benchmark-process-stream-with-sink-load", true)) {
-            return;
-        }
-
-        CountDownLatch latch = new CountDownLatch(1);
-        final KafkaStreams streams = createKafkaStreamsWithSink(topic, latch);
-        long latency = startStreamsThread(streams, latch);
-
-        printResults("Streams Performance [records/latency/rec-sec/MB-sec 
source+sink]: ", latency);
-
-    }
-
-    public void processStreamWithStateStore(String topic) {
-        if (maybeSetupPhase(topic, 
"simple-benchmark-process-stream-with-state-store-load", true)) {
-            return;
-        }
-
-        CountDownLatch latch = new CountDownLatch(1);
-        final KafkaStreams streams = createKafkaStreamsWithStateStore(topic, 
latch, false);
-        long latency = startStreamsThread(streams, latch);
-        printResults("Streams Performance [records/latency/rec-sec/MB-sec 
source+store]: ", latency);
-
-    }
-
-    public void processStreamWithCachedStateStore(String topic) {
-        if (maybeSetupPhase(topic, 
"simple-benchmark-process-stream-with-cached-state-store-load", true)) {
-            return;
-        }
-
-        CountDownLatch latch = new CountDownLatch(1);
-        final KafkaStreams streams = createKafkaStreamsWithStateStore(topic, 
latch, true);
-        long latency = startStreamsThread(streams, latch);
-        printResults("Streams Performance [records/latency/rec-sec/MB-sec 
source+cache+store]: ", latency);
-    }
-
-    public void produce(String topic) {
-        // loading phase does not make sense for producer
-        if (loadPhase) {
-            resetStats();
-            return;
-        }
-        produce(topic, VALUE_SIZE, "simple-benchmark-produce", numRecords, 
true, numRecords, true);
-    }
-    /**
-     * Produce values to a topic
-     * @param topic Topic to produce to
-     * @param valueSizeBytes Size of value in bytes
-     * @param clientId String specifying client ID
-     * @param numRecords Number of records to produce
-     * @param sequential if True, then keys are produced sequentially from 0 
to upperRange. In this case upperRange must be >= numRecords.
-     *                   if False, then keys are produced randomly in range 
[0, upperRange)
-     * @param printStats if True, print stats on how long producing took. If 
False, don't print stats. False can be used
-     *                   when this produce step is part of another benchmark 
that produces its own stats
-     */
-    private void produce(String topic, int valueSizeBytes, String clientId, 
int numRecords, boolean sequential,
-                         int upperRange, boolean printStats) {
-
-
-        if (sequential) {
-            if (upperRange < numRecords) throw new 
IllegalArgumentException("UpperRange must be >= numRecords");
-        }
-        if (!sequential) {
-            System.out.println("WARNING: You are using non-sequential keys. If 
your tests' exit logic expects to see a final key, random keys may not work.");
-        }
-        Properties props = setProduceConsumeProperties(clientId);
-
-        int key = 0;
-        Random rand = new Random();
-        KafkaProducer<Integer, byte[]> producer = new KafkaProducer<>(props);
-
-        byte[] value = new byte[valueSizeBytes];
-        // put some random values to increase entropy. Some devices
-        // like SSDs do compression and if the array is all zeros
-        // the performance will be too good.
-        new Random().nextBytes(value);
-        long startTime = System.currentTimeMillis();
-
-        if (sequential) key = 0;
-        else key = rand.nextInt(upperRange);
-        for (int i = 0; i < numRecords; i++) {
-            producer.send(new ProducerRecord<>(topic, key, value));
-            if (sequential) key++;
-            else key = rand.nextInt(upperRange);
-            processedRecords.getAndIncrement();
-            processedBytes += value.length + Integer.SIZE;
-        }
+        consumer.close();
         producer.close();
 
-        long endTime = System.currentTimeMillis();
-
-        if (printStats) {
-            printResults("Producer Performance [records/latency/rec-sec/MB-sec 
write]: ", endTime - startTime);
-        }
+        printResults("ConsumerProducer Performance 
[records/latency/rec-sec/MB-sec read]: ", endTime - startTime);
     }
 
-    public void consume(String topic) {
-        if (maybeSetupPhase(topic, "simple-benchmark-consumer-load", true)) {
-            return;
-        }
+    private void consume(final String topic) {
+        final Properties consumerProps = 
setProduceConsumeProperties("simple-benchmark-consumer");
+        final KafkaConsumer<Integer, byte[]> consumer = new 
KafkaConsumer<>(consumerProps);
+        final List<TopicPartition> partitions = getAllPartitions(consumer, 
topic);
 
-        Properties props = 
setProduceConsumeProperties("simple-benchmark-consumer");
-
-        KafkaConsumer<Integer, byte[]> consumer = new KafkaConsumer<>(props);
-
-        List<TopicPartition> partitions = getAllPartitions(consumer, topic);
         consumer.assign(partitions);
         consumer.seekToBeginning(partitions);
 
-        Integer key = null;
-
         long startTime = System.currentTimeMillis();
 
         while (true) {
             ConsumerRecords<Integer, byte[]> records = consumer.poll(POLL_MS);
             if (records.isEmpty()) {
-                if (processedRecords.get() == numRecords)
+                if (processedRecords == numRecords)
                     break;
             } else {
                 for (ConsumerRecord<Integer, byte[]> record : records) {
-                    processedRecords.getAndIncrement();
+                    processedRecords++;
                     processedBytes += record.value().length + Integer.SIZE;
-                    Integer recKey = record.key();
-                    if (key == null || key < recKey)
-                        key = recKey;
-                    if (processedRecords.get() == numRecords)
+                    if (processedRecords == numRecords)
                         break;
                 }
             }
-            if (processedRecords.get() == numRecords)
+            if (processedRecords == numRecords)
                 break;
         }
 
         long endTime = System.currentTimeMillis();
 
         consumer.close();
+
         printResults("Consumer Performance [records/latency/rec-sec/MB-sec 
read]: ", endTime - startTime);
     }
 
-    private KafkaStreams createKafkaStreams(String topic, final CountDownLatch 
latch) {
-        setStreamProperties("simple-benchmark-streams");
+    private void processStream(final String topic) {
+        final CountDownLatch latch = new CountDownLatch(1);
+
+        setStreamProperties("simple-benchmark-streams-source");
 
         StreamsBuilder builder = new StreamsBuilder();
 
         KStream<Integer, byte[]> source = builder.stream(topic, 
Consumed.with(INTEGER_SERDE, BYTE_SERDE));
 
-        source.process(new ProcessorSupplier<Integer, byte[]>() {
-            @Override
-            public Processor<Integer, byte[]> get() {
-                return new AbstractProcessor<Integer, byte[]>() {
+        source.peek(new CountDownAction(latch));
 
-                    @Override
-                    public void init(ProcessorContext context) {
-                    }
+        final KafkaStreams streams = 
createKafkaStreamsWithExceptionHandler(builder, props);
+        runGenericBenchmark(streams, "Streams Source Performance 
[records/latency/rec-sec/MB-sec joined]: ", latch);
+    }
 
-                    @Override
-                    public void process(Integer key, byte[] value) {
-                        processedRecords.getAndIncrement();
-                        processedBytes += value.length + Integer.SIZE;
-                        if (processedRecords.get() == numRecords) {
-                            latch.countDown();
-                        }
-                    }
+    private void processStreamWithSink(final String topic) {
+        final CountDownLatch latch = new CountDownLatch(1);
 
-                    @Override
-                    public void punctuate(long timestamp) {
-                    }
+        setStreamProperties("simple-benchmark-streams-source-sink");
 
-                    @Override
-                    public void close() {
-                    }
-                };
-            }
-        });
+        StreamsBuilder builder = new StreamsBuilder();
 
-        return createKafkaStreamsWithExceptionHandler(builder, props);
+        KStream<Integer, byte[]> source = builder.stream(topic);
+
+        source.peek(new CountDownAction(latch)).to(SINK_TOPIC);
+
+        final KafkaStreams streams = 
createKafkaStreamsWithExceptionHandler(builder, props);
+        runGenericBenchmark(streams, "Streams SourceSink Performance 
[records/latency/rec-sec/MB-sec joined]: ", latch);
     }
 
-    private KafkaStreams createKafkaStreamsWithSink(String topic, final 
CountDownLatch latch) {
-        setStreamProperties("simple-benchmark-streams-with-sink");
+    private void processStreamWithStateStore(String topic) {
+        final CountDownLatch latch = new CountDownLatch(1);
+
+        setStreamProperties("simple-benchmark-streams-with-store");
 
         StreamsBuilder builder = new StreamsBuilder();
+        final StoreBuilder<KeyValueStore<Integer, byte[]>> storeBuilder
+                = 
Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("store"), 
INTEGER_SERDE, BYTE_SERDE);
+        builder.addStateStore(storeBuilder);
 
-        KStream<Integer, byte[]> source = builder.stream(topic, 
Consumed.with(INTEGER_SERDE, BYTE_SERDE));
+        KStream<Integer, byte[]> source = builder.stream(topic);
 
-        source.to(INTEGER_SERDE, BYTE_SERDE, SINK_TOPIC);
-        source.process(new ProcessorSupplier<Integer, byte[]>() {
+        source.peek(new CountDownAction(latch)).process(new 
ProcessorSupplier<Integer, byte[]>() {
             @Override
             public Processor<Integer, byte[]> get() {
                 return new AbstractProcessor<Integer, byte[]>() {
+                    KeyValueStore<Integer, byte[]> store;
+
+                    @SuppressWarnings("unchecked")
                     @Override
                     public void init(ProcessorContext context) {
+                        store = (KeyValueStore<Integer, byte[]>) 
context.getStateStore("store");
                     }
 
                     @Override
                     public void process(Integer key, byte[] value) {
-                        processedRecords.getAndIncrement();
-                        processedBytes += value.length + Integer.SIZE;
-                        if (processedRecords.get() == numRecords) {
-                            latch.countDown();
-                        }
+                        store.put(key, value);
                     }
 
                     @Override
-                    public void punctuate(long timestamp) {
-                    }
+                    public void punctuate(long timestamp) {}
 
                     @Override
-                    public void close() {
-                    }
+                    public void close() {}
                 };
             }
-        });
+        }, "store");
 
-        return createKafkaStreamsWithExceptionHandler(builder, props);
+        final KafkaStreams streams = 
createKafkaStreamsWithExceptionHandler(builder, props);
+        runGenericBenchmark(streams, "Streams Stateful Performance 
[records/latency/rec-sec/MB-sec joined]: ", latch);
     }
 
-    private class CountDownAction<V> implements ForeachAction<Integer, V> {
-        private CountDownLatch latch;
-        CountDownAction(final CountDownLatch latch) {
-            this.latch = latch;
-        }
-        @Override
-        public void apply(Integer key, V value) {
-            processedRecords.getAndIncrement();
-            if (value instanceof byte[]) {
-                processedBytes += ((byte[]) value).length + Integer.SIZE;
-            } else if (value instanceof Long) {
-                processedBytes += Long.SIZE + Integer.SIZE;
-            } else {
-                System.err.println("Unknown value type in CountDownAction");
-            }
-            if (processedRecords.get() == numRecords) {
-                this.latch.countDown();
-            }
-        }
+    /**
+     * Measure the performance of a simple aggregate like count.
+     * Counts the occurrence of numbers (note that normally people count 
words, this
+     * example counts numbers)
+     */
+    private void countStreamsNonWindowed(String sourceTopic) {
+        final CountDownLatch latch = new CountDownLatch(1);
+
+        setStreamProperties("simple-benchmark-nonwindowed-count");
+
+        final StreamsBuilder builder = new StreamsBuilder();
+        final KStream<Integer, byte[]> input = builder.stream(sourceTopic);
+
+        input.peek(new CountDownAction(latch))
+                .groupByKey()
+                .count();
+
+        final KafkaStreams streams = 
createKafkaStreamsWithExceptionHandler(builder, props);
+        runGenericBenchmark(streams, "Streams Count Performance 
[records/latency/rec-sec/MB-sec counted]: ", latch);
     }
 
-    private KafkaStreams createKafkaStreamsKStreamKTableJoin(Properties 
streamConfig, String kStreamTopic,
-                                                             String 
kTableTopic, final CountDownLatch latch) {
+    /**
+     * Measure the performance of a simple aggregate like count.
+     * Counts the occurrence of numbers (note that normally people count 
words, this
+     * example counts numbers)
+     */
+    private void countStreamsWindowed(String sourceTopic) {
+        final CountDownLatch latch = new CountDownLatch(1);
+
+        setStreamProperties("simple-benchmark-windowed-count");
+
         final StreamsBuilder builder = new StreamsBuilder();
+        final KStream<Integer, byte[]> input = builder.stream(sourceTopic);
 
-        final KStream<Long, byte[]> input1 = builder.stream(kStreamTopic);
-        final KTable<Long, byte[]> input2 = builder.table(kTableTopic);
+        input.peek(new CountDownAction(latch))
+                .groupByKey()
+                .windowedBy(TimeWindows.of(1000).advanceBy(500))
+                .count();
+
+        final KafkaStreams streams = 
createKafkaStreamsWithExceptionHandler(builder, props);
+        runGenericBenchmark(streams, "Streams Count Windowed Performance 
[records/latency/rec-sec/MB-sec counted]: ", latch);
+    }
+
+    /**
+     * Measure the performance of a KStream-KTable left join. The setup is 
such that each
+     * KStream record joins to exactly one element in the KTable
+     */
+    private void streamTableJoin(String kStreamTopic, String kTableTopic) {
+        final CountDownLatch latch = new CountDownLatch(1);
+
+        setStreamProperties("simple-benchmark-stream-table-join");
+
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        final KStream<Integer, byte[]> input1 = builder.stream(kStreamTopic);
+        final KTable<Integer, byte[]> input2 = builder.table(kTableTopic);
 
         input1.leftJoin(input2, VALUE_JOINER).foreach(new 
CountDownAction(latch));
 
-        return createKafkaStreamsWithExceptionHandler(builder, streamConfig);
+        final KafkaStreams streams = 
createKafkaStreamsWithExceptionHandler(builder, props);
+
+        // run benchmark
+        runGenericBenchmark(streams, "Streams KStreamKTable LeftJoin 
Performance [records/latency/rec-sec/MB-sec joined]: ", latch);
     }
 
-    private KafkaStreams createKafkaStreamsKTableKTableJoin(Properties 
streamConfig, String kTableTopic1,
-                                                            String 
kTableTopic2, final CountDownLatch latch) {
+    /**
+     * Measure the performance of a KStream-KStream left join. The setup is 
such that each
+     * KStream record joins to exactly one element in the other KStream
+     */
+    private void streamStreamJoin(String kStreamTopic1, String kStreamTopic2) {
+        final CountDownLatch latch = new CountDownLatch(1);
+
+        setStreamProperties("simple-benchmark-stream-stream-join");
+
         final StreamsBuilder builder = new StreamsBuilder();
 
-        final KTable<Long, byte[]> input1 = builder.table(kTableTopic1);
-        final KTable<Long, byte[]> input2 = builder.table(kTableTopic2);
+        final KStream<Integer, byte[]> input1 = builder.stream(kStreamTopic1);
+        final KStream<Integer, byte[]> input2 = builder.stream(kStreamTopic2);
 
-        input1.leftJoin(input2, VALUE_JOINER).foreach(new 
CountDownAction(latch));
+        input1.leftJoin(input2, VALUE_JOINER, 
JoinWindows.of(STREAM_STREAM_JOIN_WINDOW)).foreach(new CountDownAction(latch));
+
+        final KafkaStreams streams = 
createKafkaStreamsWithExceptionHandler(builder, props);
 
-        return createKafkaStreamsWithExceptionHandler(builder, streamConfig);
+        // run benchmark
+        runGenericBenchmark(streams, "Streams KStreamKStream LeftJoin 
Performance [records/latency/rec-sec/MB-sec  joined]: ", latch);
     }
 
-    private KafkaStreams createKafkaStreamsKStreamKStreamJoin(Properties 
streamConfig, String kStreamTopic1,
-                                                              String 
kStreamTopic2, final CountDownLatch latch) {
+    /**
+     * Measure the performance of a KTable-KTable left join. The setup is such 
that each
+     * KTable record joins to exactly one element in the other KTable
+     */
+    private void tableTableJoin(String kTableTopic1, String kTableTopic2) {
+        final CountDownLatch latch = new CountDownLatch(1);
+
+        // setup join
+        setStreamProperties("simple-benchmark-table-table-join");
+
         final StreamsBuilder builder = new StreamsBuilder();
 
-        final KStream<Long, byte[]> input1 = builder.stream(kStreamTopic1);
-        final KStream<Long, byte[]> input2 = builder.stream(kStreamTopic2);
-        final long timeDifferenceMs = 10000L;
+        final KTable<Integer, byte[]> input1 = builder.table(kTableTopic1);
+        final KTable<Integer, byte[]> input2 = builder.table(kTableTopic2);
 
-        input1.leftJoin(input2, VALUE_JOINER, 
JoinWindows.of(timeDifferenceMs)).foreach(new CountDownAction(latch));
+        input1.leftJoin(input2, VALUE_JOINER).toStream().foreach(new 
CountDownAction(latch));
 
-        return createKafkaStreamsWithExceptionHandler(builder, streamConfig);
+        final KafkaStreams streams = 
createKafkaStreamsWithExceptionHandler(builder, props);
+
+        // run benchmark
+        runGenericBenchmark(streams, "Streams KTableKTable LeftJoin 
Performance [records/latency/rec-sec/MB-sec joined]: ", latch);
     }
 
-    private KafkaStreams createKafkaStreamsWithStateStore(String topic,
-                                                          final CountDownLatch 
latch,
-                                                          boolean 
enableCaching) {
-        setStreamProperties("simple-benchmark-streams-with-store" + 
enableCaching);
+    void printResults(final String nameOfBenchmark, final long latency) {
+        System.out.println(nameOfBenchmark +
+            processedRecords + "/" +
+            latency + "/" +
+            recordsPerSec(latency, processedRecords) + "/" +
+            megabytesPerSec(latency, processedBytes));
+    }
 
-        StreamsBuilder builder = new StreamsBuilder();
+    void runGenericBenchmark(final KafkaStreams streams, final String 
nameOfBenchmark, final CountDownLatch latch) {
+        streams.start();
 
-        final StoreBuilder<KeyValueStore<Integer, byte[]>> storeBuilder
-                = 
Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("store"), 
Serdes.Integer(), Serdes.ByteArray());
-        if (enableCaching) {
-            builder.addStateStore(storeBuilder.withCachingEnabled());
-        } else {
-            builder.addStateStore(storeBuilder);
+        long startTime = System.currentTimeMillis();
+        long endTime = startTime;
+
+        while (latch.getCount() > 0 && (endTime - startTime < MAX_WAIT_MS)) {
+            try {
+                latch.await(1000, TimeUnit.MILLISECONDS);
+            } catch (InterruptedException ex) {
+                Thread.interrupted();
+            }
+
+            endTime = System.currentTimeMillis();
         }
-        KStream<Integer, byte[]> source = builder.stream(topic, 
Consumed.with(INTEGER_SERDE, BYTE_SERDE));
+        streams.close();
 
-        source.process(new ProcessorSupplier<Integer, byte[]>() {
-            @Override
-            public Processor<Integer, byte[]> get() {
-                return new AbstractProcessor<Integer, byte[]>() {
-                    KeyValueStore<Integer, byte[]> store;
+        printResults(nameOfBenchmark, endTime - startTime);
+    }
 
-                    @SuppressWarnings("unchecked")
-                    @Override
-                    public void init(ProcessorContext context) {
-                        store = (KeyValueStore<Integer, byte[]>) 
context.getStateStore("store");
-                    }
+    private class CountDownAction implements ForeachAction<Integer, byte[]> {
+        private final CountDownLatch latch;
 
-                    @Override
-                    public void process(Integer key, byte[] value) {
-                        store.put(key, value);
-                        processedRecords.getAndIncrement();
-                        processedBytes += value.length + Integer.SIZE;
-                        if (processedRecords.get() == numRecords) {
-                            latch.countDown();
-                        }
-                    }
+        CountDownAction(final CountDownLatch latch) {
+            this.latch = latch;
+        }
 
-                    @Override
-                    public void punctuate(long timestamp) {
-                    }
+        @Override
+        public void apply(final Integer key, final byte[] value) {
+            processedRecords++;
+            processedBytes += Integer.SIZE + value.length;
 
-                    @Override
-                    public void close() {
-                    }
-                };
+            if (processedRecords == numRecords) {
+                this.latch.countDown();
             }
-        }, "store");
-
-        return createKafkaStreamsWithExceptionHandler(builder, props);
+        }
     }
 
     private KafkaStreams createKafkaStreamsWithExceptionHandler(final 
StreamsBuilder builder, final Properties props) {
@@ -816,4 +650,47 @@ private double recordsPerSec(long time, int numRecords) {
         return partitions;
     }
 
+    private void yahooBenchmark(final String campaignsTopic, final String 
eventsTopic) {
+        YahooBenchmark benchmark = new YahooBenchmark(this, campaignsTopic, 
eventsTopic);
+
+        benchmark.run();
+    }
+
+    private class ZipfGenerator {
+        private Random rand = new Random(System.currentTimeMillis());
+        private int size;
+        private double skew;
+        private double bottom = 0.0d;
+
+        ZipfGenerator(int size, double skew) {
+            this.size = size;
+            this.skew = skew;
+
+            for (int i = 1; i < size; i++) {
+                this.bottom += 1.0d / Math.pow(i, this.skew);
+            }
+        }
+
+        int next() {
+            if (skew == 0.0d) {
+                return rand.nextInt(size);
+            } else {
+                int rank;
+                double dice;
+                double friquency;
+
+                rank = rand.nextInt(size);
+                friquency = (1.0d / Math.pow(rank, this.skew)) / this.bottom;
+                dice = rand.nextDouble();
+
+                while (!(dice < friquency)) {
+                    rank = rand.nextInt(size);
+                    friquency = (1.0d / Math.pow(rank, this.skew)) / 
this.bottom;
+                    dice = rand.nextDouble();
+                }
+
+                return rank;
+            }
+        }
+    }
 }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/perf/YahooBenchmark.java 
b/streams/src/test/java/org/apache/kafka/streams/perf/YahooBenchmark.java
index f63a71ff2e4..ddb7caaa26e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/perf/YahooBenchmark.java
+++ b/streams/src/test/java/org/apache/kafka/streams/perf/YahooBenchmark.java
@@ -89,98 +89,78 @@ public YahooBenchmark(final SimpleBenchmark parent, final 
String campaignsTopic,
     }
 
     // just for Yahoo benchmark
-    private boolean maybeSetupPhaseCampaigns(final String topic, final String 
clientId,
+    private boolean maybeSetupPhaseCampaigns(final String topic,
+                                             final String clientId,
                                              final boolean skipIfAllTests,
-                                             final int numCampaigns, final int 
adsPerCampaign,
+                                             final int numCampaigns,
+                                             final int adsPerCampaign,
                                              final List<String> ads) {
         parent.resetStats();
         // initialize topics
-        if (parent.loadPhase) {
-            if (skipIfAllTests) {
-                // if we run all tests, the produce test will have already 
loaded the data
-                if (parent.testName.equals(SimpleBenchmark.ALL_TESTS)) {
-                    // Skipping loading phase since previously loaded
-                    return true;
-                }
-            }
-            System.out.println("Initializing topic " + topic);
-
-            Properties props = new Properties();
-            props.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
-            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, parent.kafka);
-            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class);
-            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class);
-
-            KafkaProducer<String, String> producer = new 
KafkaProducer<>(props);
-            for (int c = 0; c < numCampaigns; c++) {
-                String campaignID = UUID.randomUUID().toString();
-                for (int a = 0; a < adsPerCampaign; a++) {
-                    String adId = UUID.randomUUID().toString();
-                    String concat = adId + ":" + campaignID;
-                    producer.send(new ProducerRecord<>(topic, adId, concat));
-                    ads.add(adId);
-                    parent.processedRecords.getAndIncrement();
-                    parent.processedBytes += concat.length() + adId.length();
-                }
+        System.out.println("Initializing topic " + topic);
+
+        Properties props = new Properties();
+        props.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, parent.kafka);
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class);
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class);
+
+        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
+        for (int c = 0; c < numCampaigns; c++) {
+            String campaignID = UUID.randomUUID().toString();
+            for (int a = 0; a < adsPerCampaign; a++) {
+                String adId = UUID.randomUUID().toString();
+                String concat = adId + ":" + campaignID;
+                producer.send(new ProducerRecord<>(topic, adId, concat));
+                ads.add(adId);
+                parent.processedRecords++;
+                parent.processedBytes += concat.length() + adId.length();
             }
-            return true;
         }
-        return false;
+        return true;
     }
 
     // just for Yahoo benchmark
-    private boolean maybeSetupPhaseEvents(final String topic, final String 
clientId,
-                                          final boolean skipIfAllTests, final 
int numRecords,
-                                          final List<String> ads) {
+    private void maybeSetupPhaseEvents(final String topic,
+                                       final String clientId,
+                                       final int numRecords,
+                                       final List<String> ads) {
         parent.resetStats();
         String[] eventTypes = new String[]{"view", "click", "purchase"};
         Random rand = new Random();
-        // initialize topics
-        if (parent.loadPhase) {
-            if (skipIfAllTests) {
-                // if we run all tests, the produce test will have already 
loaded the data
-                if (parent.testName.equals(SimpleBenchmark.ALL_TESTS)) {
-                    // Skipping loading phase since previously loaded
-                    return true;
-                }
-            }
-            System.out.println("Initializing topic " + topic);
+        System.out.println("Initializing topic " + topic);
 
-            Properties props = new Properties();
-            props.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
-            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, parent.kafka);
-            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class);
-            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
ByteArraySerializer.class);
+        final Properties props = new Properties();
+        props.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, parent.kafka);
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class);
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
ByteArraySerializer.class);
 
-            KafkaProducer<String, byte[]> producer = new 
KafkaProducer<>(props);
+        KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props);
 
-            long startTime = System.currentTimeMillis();
+        final long startTime = System.currentTimeMillis();
 
-            ProjectedEvent event = new ProjectedEvent();
+        ProjectedEvent event = new ProjectedEvent();
 
-            Map<String, Object> serdeProps = new HashMap<>();
-            final Serializer<ProjectedEvent> projectedEventSerializer = new 
JsonPOJOSerializer<>();
-            serdeProps.put("JsonPOJOClass", ProjectedEvent.class);
-            projectedEventSerializer.configure(serdeProps, false);
-
-            for (int i = 0; i < numRecords; i++) {
-                event.eventType = eventTypes[rand.nextInt(eventTypes.length - 
1)];
-                event.adID = ads.get(rand.nextInt(ads.size() - 1));
-                event.eventTime = System.currentTimeMillis();
-                byte[] value = projectedEventSerializer.serialize(topic, 
event);
-                producer.send(new ProducerRecord<>(topic, event.adID, value));
-                parent.processedRecords.getAndIncrement();
-                parent.processedBytes += value.length + event.adID.length();
-            }
-            producer.close();
+        Map<String, Object> serdeProps = new HashMap<>();
+        final Serializer<ProjectedEvent> projectedEventSerializer = new 
JsonPOJOSerializer<>();
+        serdeProps.put("JsonPOJOClass", ProjectedEvent.class);
+        projectedEventSerializer.configure(serdeProps, false);
 
-            long endTime = System.currentTimeMillis();
+        for (int i = 0; i < numRecords; i++) {
+            event.eventType = eventTypes[rand.nextInt(eventTypes.length - 1)];
+            event.adID = ads.get(rand.nextInt(ads.size() - 1));
+            event.eventTime = System.currentTimeMillis();
+            byte[] value = projectedEventSerializer.serialize(topic, event);
+            producer.send(new ProducerRecord<>(topic, event.adID, value));
+            parent.processedRecords++;
+            parent.processedBytes += value.length + event.adID.length();
+        }
+        producer.close();
 
+        long endTime = System.currentTimeMillis();
 
-            parent.printResults("Producer Performance 
[records/latency/rec-sec/MB-sec write]: ", endTime - startTime);
-            return true;
-        }
-        return false;
+        parent.printResults("Producer Performance 
[records/latency/rec-sec/MB-sec write]: ", endTime - startTime);
     }
 
 
@@ -189,12 +169,8 @@ public void run() {
         int adsPerCampaign = 10;
 
         List<String> ads = new ArrayList<>(numCampaigns * adsPerCampaign);
-        if (maybeSetupPhaseCampaigns(campaignsTopic, 
"simple-benchmark-produce-campaigns", false,
-            numCampaigns, adsPerCampaign, ads)) {
-            maybeSetupPhaseEvents(eventsTopic, 
"simple-benchmark-produce-events", false,
-                parent.numRecords, ads);
-            return;
-        }
+        maybeSetupPhaseCampaigns(campaignsTopic, 
"simple-benchmark-produce-campaigns", false, numCampaigns, adsPerCampaign, ads);
+        maybeSetupPhaseEvents(eventsTopic, "simple-benchmark-produce-events", 
parent.numRecords, ads);
 
         CountDownLatch latch = new CountDownLatch(1);
         parent.setStreamProperties("simple-benchmark-yahoo" + new 
Random().nextInt());
@@ -299,11 +275,11 @@ private KafkaStreams createYahooBenchmarkStreams(final 
Properties streamConfig,
             .peek(new ForeachAction<String, ProjectedEvent>() {
                 @Override
                 public void apply(String key, ProjectedEvent value) {
-                    parent.processedRecords.getAndIncrement();
-                    if (parent.processedRecords.get() % 1000000 == 0) {
-                        System.out.println("Processed " + 
parent.processedRecords.get());
+                    parent.processedRecords++;
+                    if (parent.processedRecords % 1000000 == 0) {
+                        System.out.println("Processed " + 
parent.processedRecords);
                     }
-                    if (parent.processedRecords.get() >= numRecords) {
+                    if (parent.processedRecords >= numRecords) {
                         latch.countDown();
                     }
                 }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java
index 37d0cb6e7d0..e897088beca 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java
@@ -48,14 +48,24 @@
     private static final String SINK_TOPIC = "brokerCompatibilitySinkTopic";
 
     public static void main(final String[] args) throws IOException {
+        if (args.length < 2) {
+            System.err.println("BrokerCompatibilityTest are expecting two 
parameters: propFile, eosEnabled; but only see " + args.length + " parameter");
+            System.exit(1);
+        }
+
         System.out.println("StreamsTest instance started");
 
-        final String kafka = args.length > 0 ? args[0] : "localhost:9092";
-        final String propFileName = args.length > 1 ? args[1] : null;
-        final boolean eosEnabled = args.length > 2 ? 
Boolean.parseBoolean(args[2]) : false;
+        final String propFileName = args[0];
+        final boolean eosEnabled = Boolean.parseBoolean(args[1]);
 
         final Properties streamsProperties = Utils.loadProps(propFileName);
-        streamsProperties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
+        final String kafka = 
streamsProperties.getProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG);
+
+        if (kafka == null) {
+            System.err.println("No bootstrap kafka servers specified in " + 
StreamsConfig.BOOTSTRAP_SERVERS_CONFIG);
+            System.exit(1);
+        }
+
         streamsProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, 
"kafka-streams-system-test-broker-compatibility");
         streamsProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
"earliest");
         streamsProperties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
Serdes.String().getClass());
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java 
b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java
index ecc3b916ca6..2d39d53836b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java
@@ -37,7 +37,6 @@
 public class EosTestClient extends SmokeTestUtil {
 
     static final String APP_ID = "EosTest";
-    private final String kafka;
     private final Properties properties;
     private final boolean withRepartitioning;
     private final AtomicBoolean notRunningCallbackReceived = new 
AtomicBoolean(false);
@@ -45,9 +44,8 @@
     private KafkaStreams streams;
     private boolean uncaughtException;
 
-    EosTestClient(final String kafka, final Properties properties, final 
boolean withRepartitioning) {
+    EosTestClient(final Properties properties, final boolean 
withRepartitioning) {
         super();
-        this.kafka = kafka;
         this.properties = properties;
         this.withRepartitioning = withRepartitioning;
     }
@@ -79,7 +77,7 @@ public void run() {
             if (streams == null) {
                 uncaughtException = false;
 
-                streams = createKafkaStreams(properties, kafka);
+                streams = createKafkaStreams(properties);
                 streams.setUncaughtExceptionHandler(new 
Thread.UncaughtExceptionHandler() {
                     @Override
                     public void uncaughtException(final Thread t, final 
Throwable e) {
@@ -112,10 +110,8 @@ public void onChange(KafkaStreams.State newState, 
KafkaStreams.State oldState) {
         }
     }
 
-    private KafkaStreams createKafkaStreams(final Properties props,
-                                            final String kafka) {
+    private KafkaStreams createKafkaStreams(final Properties props) {
         props.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID);
-        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
         props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
         props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 2);
         props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java 
b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
index 2936c63d8f8..fdfe9e0dd6f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
@@ -44,20 +44,19 @@
 
 public class SmokeTestClient extends SmokeTestUtil {
 
-    private final String kafka;
     private final Properties streamsProperties;
-    private KafkaStreams streams;
+
     private Thread thread;
+    private KafkaStreams streams;
     private boolean uncaughtException = false;
 
-    public SmokeTestClient(final Properties streamsProperties, final String 
kafka) {
+    public SmokeTestClient(final Properties streamsProperties) {
         super();
-        this.kafka = kafka;
         this.streamsProperties = streamsProperties;
     }
 
     public void start() {
-        streams = createKafkaStreams(streamsProperties, kafka);
+        streams = createKafkaStreams(streamsProperties);
         streams.setUncaughtExceptionHandler(new 
Thread.UncaughtExceptionHandler() {
             @Override
             public void uncaughtException(final Thread t, final Throwable e) {
@@ -97,26 +96,26 @@ public void close() {
         }
     }
 
-    private static Properties getStreamsConfig(final Properties props, final 
String kafka) {
-        final Properties config = new Properties(props);
-        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "SmokeTest");
-        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
-        config.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 3);
-        config.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 2);
-        config.put(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, 100);
-        config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
-        config.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
-        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-        config.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
-        config.put(ProducerConfig.ACKS_CONFIG, "all");
+    private static Properties getStreamsConfig(final Properties props) {
+        final Properties fullProps = new Properties(props);
+        fullProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "SmokeTest");
+        fullProps.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 3);
+        fullProps.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 2);
+        fullProps.put(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, 
100);
+        fullProps.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
+        fullProps.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
+        fullProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        fullProps.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
+        fullProps.put(ProducerConfig.ACKS_CONFIG, "all");
+
         //TODO remove this config or set to smaller value when KIP-91 is merged
-        
config.put(StreamsConfig.producerPrefix(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG),
 80000);
+        
fullProps.put(StreamsConfig.producerPrefix(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG),
 80000);
 
-        config.putAll(props);
-        return config;
+        fullProps.putAll(props);
+        return fullProps;
     }
 
-    private static KafkaStreams createKafkaStreams(final Properties props, 
final String kafka) {
+    private static KafkaStreams createKafkaStreams(final Properties props) {
         final StreamsBuilder builder = new StreamsBuilder();
         final Consumed<String, Integer> stringIntConsumed = 
Consumed.with(stringSerde, intSerde);
         final KStream<String, Integer> source = builder.stream("data", 
stringIntConsumed);
@@ -252,7 +251,7 @@ public Double apply(final Long value1, final Long value2) {
             .toStream()
             .to("tagg", Produced.with(stringSerde, longSerde));
 
-        final KafkaStreams streamsClient = new KafkaStreams(builder.build(), 
getStreamsConfig(props, kafka));
+        final KafkaStreams streamsClient = new KafkaStreams(builder.build(), 
getStreamsConfig(props));
         streamsClient.setUncaughtExceptionHandler(new 
Thread.UncaughtExceptionHandler() {
             @Override
             public void uncaughtException(final Thread t, final Throwable e) {
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java 
b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
index fc7a26ee7a2..50330a08e61 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
@@ -96,14 +96,15 @@ public void run() {
         };
 
         final Properties props = new Properties();
+        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
         props.put(StreamsConfig.STATE_DIR_CONFIG, createDir(stateDir, 
"1").getAbsolutePath());
-        SmokeTestClient streams1 = new SmokeTestClient(props, kafka);
+        SmokeTestClient streams1 = new SmokeTestClient(props);
         props.put(StreamsConfig.STATE_DIR_CONFIG, createDir(stateDir, 
"2").getAbsolutePath());
-        SmokeTestClient streams2 = new SmokeTestClient(props, kafka);
+        SmokeTestClient streams2 = new SmokeTestClient(props);
         props.put(StreamsConfig.STATE_DIR_CONFIG, createDir(stateDir, 
"3").getAbsolutePath());
-        SmokeTestClient streams3 = new SmokeTestClient(props, kafka);
+        SmokeTestClient streams3 = new SmokeTestClient(props);
         props.put(StreamsConfig.STATE_DIR_CONFIG, createDir(stateDir, 
"4").getAbsolutePath());
-        SmokeTestClient streams4 = new SmokeTestClient(props, kafka);
+        SmokeTestClient streams4 = new SmokeTestClient(props);
 
         System.out.println("starting the driver");
         driver.start();
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsBrokerDownResilienceTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsBrokerDownResilienceTest.java
index 5219c953a47..8adf43acc15 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsBrokerDownResilienceTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsBrokerDownResilienceTest.java
@@ -45,17 +45,24 @@
     private static final String SINK_TOPIC = "streamsResilienceSink";
 
     public static void main(String[] args) throws IOException {
+        if (args.length < 2) {
+            System.err.println("StreamsBrokerDownResilienceTest are expecting 
two parameters: propFile, additionalConfigs; but only see " + args.length + " 
parameter");
+            System.exit(1);
+        }
 
         System.out.println("StreamsTest instance started");
 
-        final String kafka = args.length > 0 ? args[0] : "localhost:9092";
-        final String propFileName = args.length > 1 ? args[1] : null;
-        final String additionalConfigs = args.length > 2 ? args[2] : null;
-
-        final Serde<String> stringSerde = Serdes.String();
+        final String propFileName = args[0];
+        final String additionalConfigs = args[1];
 
         final Properties streamsProperties = Utils.loadProps(propFileName);
-        streamsProperties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
+        final String kafka = 
streamsProperties.getProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG);
+
+        if (kafka == null) {
+            System.err.println("No bootstrap kafka servers specified in " + 
StreamsConfig.BOOTSTRAP_SERVERS_CONFIG);
+            System.exit(1);
+        }
+
         streamsProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, 
"kafka-streams-resilience");
         streamsProperties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
Serdes.String().getClass());
         streamsProperties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
Serdes.String().getClass());
@@ -79,7 +86,10 @@ public static void main(String[] args) throws IOException {
 
             System.exit(1);
         }
+
         final StreamsBuilder builder = new StreamsBuilder();
+        final Serde<String> stringSerde = Serdes.String();
+
         builder.stream(Collections.singletonList(SOURCE_TOPIC_1), 
Consumed.with(stringSerde, stringSerde))
             .peek(new ForeachAction<String, String>() {
                 int messagesProcessed = 0;
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsEosTest.java 
b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsEosTest.java
index c5195cb0768..47a78bdbb95 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsEosTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsEosTest.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.streams.tests;
 
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.StreamsConfig;
 
 import java.io.IOException;
 import java.util.Properties;
@@ -28,11 +29,21 @@
      *  command := "run" | "process" | "verify"
      */
     public static void main(final String[] args) throws IOException {
-        final String kafka = args[0];
-        final String propFileName = args.length > 1 ? args[1] : null;
-        final String command = args.length > 2 ? args[2] : null;
+        if (args.length < 2) {
+            System.err.println("StreamsEosTest are expecting two parameters: 
propFile, command; but only see " + args.length + " parameter");
+            System.exit(1);
+        }
+
+        final String propFileName = args[0];
+        final String command = args[1];
 
         final Properties streamsProperties = Utils.loadProps(propFileName);
+        final String kafka = 
streamsProperties.getProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG);
+
+        if (kafka == null) {
+            System.err.println("No bootstrap kafka servers specified in " + 
StreamsConfig.BOOTSTRAP_SERVERS_CONFIG);
+            System.exit(1);
+        }
 
         System.out.println("StreamsTest instance started");
         System.out.println("kafka=" + kafka);
@@ -49,10 +60,10 @@ public static void main(final String[] args) throws 
IOException {
                 EosTestDriver.generate(kafka);
                 break;
             case "process":
-                new EosTestClient(kafka, streamsProperties, false).start();
+                new EosTestClient(streamsProperties, false).start();
                 break;
             case "process-complex":
-                new EosTestClient(kafka, streamsProperties, true).start();
+                new EosTestClient(streamsProperties, true).start();
                 break;
             case "verify":
                 EosTestDriver.verify(kafka, false);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsRepeatingIntegerKeyProducer.java
 
b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsRepeatingIntegerKeyProducer.java
index 85ca077cca0..aa828e89d05 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsRepeatingIntegerKeyProducer.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsRepeatingIntegerKeyProducer.java
@@ -25,7 +25,9 @@
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.StreamsConfig;
 
+import java.io.IOException;
 import java.util.Map;
 import java.util.Properties;
 
@@ -39,11 +41,24 @@
     private static volatile boolean keepProducing = true;
     private volatile static int messageCounter = 0;
 
-    public static void main(String[] args) {
+    public static void main(String[] args) throws IOException {
+        if (args.length < 2) {
+            System.err.println("StreamsStandByReplicaTest are expecting two 
parameters: propFile, additionalConfigs; but only see " + args.length + " 
parameter");
+            System.exit(1);
+        }
+
         System.out.println("StreamsTest instance started");
 
-        final String kafka = args.length > 0 ? args[0] : "localhost:9092";
-        final String configString = args.length > 2 ? args[2] : null;
+        final String propFileName = args[0];
+        final String configString = args[1];
+
+        final Properties streamsProperties = Utils.loadProps(propFileName);
+        final String kafka = 
streamsProperties.getProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG);
+
+        if (kafka == null) {
+            System.err.println("No bootstrap kafka servers specified in " + 
StreamsConfig.BOOTSTRAP_SERVERS_CONFIG);
+            System.exit(1);
+        }
 
         final Map<String, String> configs = 
SystemTestUtil.parseConfigs(configString);
         System.out.println("Using provided configs " + configs);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java 
b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java
index 41c3f6c58f0..2409bd59643 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java
@@ -17,7 +17,9 @@
 package org.apache.kafka.streams.tests;
 
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.StreamsConfig;
 
+import java.io.IOException;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
@@ -30,17 +32,26 @@
      *
      * @param args
      */
-    public static void main(final String[] args) throws Exception {
-        final String kafka = args[0];
-        final String propFileName = args.length > 1 ? args[1] : null;
-        final String command = args.length > 2 ? args[2] : null;
+    public static void main(final String[] args) throws InterruptedException, 
IOException {
+        if (args.length < 2) {
+            System.err.println("StreamsSmokeTest are expecting two parameters: 
propFile, command; but only see " + args.length + " parameter");
+            System.exit(1);
+        }
+
+        final String propFileName = args[0];
+        final String command = args[1];
         final boolean disableAutoTerminate = args.length > 3;
 
         final Properties streamsProperties = Utils.loadProps(propFileName);
+        final String kafka = 
streamsProperties.getProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG);
+
+        if (kafka == null) {
+            System.err.println("No bootstrap kafka servers specified in " + 
StreamsConfig.BOOTSTRAP_SERVERS_CONFIG);
+            System.exit(1);
+        }
 
         System.out.println("StreamsTest instance started (StreamsSmokeTest)");
         System.out.println("command=" + command);
-        System.out.println("kafka=" + kafka);
         System.out.println("props=" + streamsProperties);
         System.out.println("disableAutoTerminate=" + disableAutoTerminate);
 
@@ -61,7 +72,7 @@ public static void main(final String[] args) throws Exception 
{
                 break;
             case "process":
                 // this starts a KafkaStreams client
-                final SmokeTestClient client = new 
SmokeTestClient(streamsProperties, kafka);
+                final SmokeTestClient client = new 
SmokeTestClient(streamsProperties);
                 client.start();
                 break;
             case "close-deadlock-test":
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsStandByReplicaTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsStandByReplicaTest.java
index 1f44d617edc..a5bb4d52602 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsStandByReplicaTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsStandByReplicaTest.java
@@ -43,18 +43,18 @@
 public class StreamsStandByReplicaTest {
 
     public static void main(final String[] args) throws IOException {
+        if (args.length < 2) {
+            System.err.println("StreamsStandByReplicaTest are expecting two 
parameters: propFile, additionalConfigs; but only see " + args.length + " 
parameter");
+            System.exit(1);
+        }
 
         System.out.println("StreamsTest instance started");
 
-        final String kafka = args.length > 0 ? args[0] : "localhost:9092";
-        final String propFileName = args.length > 1 ? args[1] : null;
-        final String additionalConfigs = args.length > 2 ? args[2] : null;
-
-        final Serde<String> stringSerde = Serdes.String();
+        final String propFileName = args[0];
+        final String additionalConfigs = args[1];
 
         final Properties streamsProperties = Utils.loadProps(propFileName);
         streamsProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, 
"kafka-streams-standby-tasks");
-        streamsProperties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
         streamsProperties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
         streamsProperties.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
         streamsProperties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 
0);
@@ -95,11 +95,13 @@ public static void main(final String[] args) throws 
IOException {
 
         final StreamsBuilder builder = new StreamsBuilder();
 
-        String inMemoryStoreName = "in-memory-store";
-        String persistentMemoryStoreName = "persistent-memory-store";
+        final String inMemoryStoreName = "in-memory-store";
+        final String persistentMemoryStoreName = "persistent-memory-store";
+
+        final KeyValueBytesStoreSupplier inMemoryStoreSupplier = 
Stores.inMemoryKeyValueStore(inMemoryStoreName);
+        final KeyValueBytesStoreSupplier persistentStoreSupplier = 
Stores.persistentKeyValueStore(persistentMemoryStoreName);
 
-        KeyValueBytesStoreSupplier inMemoryStoreSupplier = 
Stores.inMemoryKeyValueStore(inMemoryStoreName);
-        KeyValueBytesStoreSupplier persistentStoreSupplier = 
Stores.persistentKeyValueStore(persistentMemoryStoreName);
+        final Serde<String> stringSerde = Serdes.String();
 
         KStream<String, String> inputStream = builder.stream(sourceTopic, 
Consumed.with(stringSerde, stringSerde));
 
diff --git 
a/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py 
b/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py
index 06aec1448dd..ed9370195a0 100644
--- a/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py
+++ b/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py
@@ -23,6 +23,17 @@
 from kafkatest.services.kafka import KafkaService
 from kafkatest.version import DEV_BRANCH
 
+STREAMS_SIMPLE_TESTS = ["streamprocess", "streamprocesswithsink", 
"streamprocesswithstatestore"]
+STREAMS_COUNT_TESTS = ["streamcount", "streamcountwindowed", 
"streamprocesswithstatestore"]
+STREAMS_JOIN_TESTS = ["streamtablejoin", "streamstreamjoin", "tabletablejoin"]
+NON_STREAMS_TESTS = ["consume", "consumeproduce"]
+
+ALL_TEST = "all"
+STREAMS_SIMPLE_TEST = "streams-simple"
+STREAMS_COUNT_TEST = "streams-count"
+STREAMS_JOIN_TEST = "streams-join"
+
+
 class StreamsSimpleBenchmarkTest(Test):
     """
     Simple benchmark of Kafka Streams.
@@ -30,19 +41,24 @@ class StreamsSimpleBenchmarkTest(Test):
 
     def __init__(self, test_context):
         super(StreamsSimpleBenchmarkTest, self).__init__(test_context)
+
+        # these values could be updated in ad-hoc benchmarks
+        self.key_skew = 0
+        self.value_size = 1024
         self.num_records = 10000000L
-        self.replication = 1
         self.num_threads = 1
 
-    @cluster(num_nodes=9)
-    @matrix(test=["count", "processstream", "processstreamwithsink", 
"processstreamwithstatestore", "processstreamwithcachedstatestore", 
"kstreamktablejoin", "kstreamkstreamjoin", "ktablektablejoin"], scale=[1, 3])
+        self.replication = 1
+
+    @cluster(num_nodes=12)
+    @matrix(test=["consume", "consumeproduce", "streams-simple", 
"streams-count", "streams-join"], scale=[1])
     def test_simple_benchmark(self, test, scale):
         """
         Run simple Kafka Streams benchmark
         """
         self.driver = [None] * (scale + 1)
-        node = [None] * (scale)
-        data = [None] * (scale)
+
+        self.final = {}
 
         #############
         # SETUP PHASE
@@ -50,63 +66,95 @@ def test_simple_benchmark(self, test, scale):
         self.zk = ZookeeperService(self.test_context, num_nodes=1)
         self.zk.start()
         self.kafka = KafkaService(self.test_context, num_nodes=scale, 
zk=self.zk, version=DEV_BRANCH, topics={
-            'simpleBenchmarkSourceTopic' : { 'partitions': scale, 
'replication-factor': self.replication },
-            'countTopic' : { 'partitions': scale, 'replication-factor': 
self.replication },
-            'simpleBenchmarkSinkTopic' : { 'partitions': scale, 
'replication-factor': self.replication },
-            'joinSourceTopic1KStreamKStream' : { 'partitions': scale, 
'replication-factor': self.replication },
-            'joinSourceTopic2KStreamKStream' : { 'partitions': scale, 
'replication-factor': self.replication },
-            'joinSourceTopic1KStreamKTable' : { 'partitions': scale, 
'replication-factor': self.replication },
-            'joinSourceTopic2KStreamKTable' : { 'partitions': scale, 
'replication-factor': self.replication },
-            'joinSourceTopic1KTableKTable' : { 'partitions': scale, 
'replication-factor': self.replication },
-            'joinSourceTopic2KTableKTable' : { 'partitions': scale, 
'replication-factor': self.replication },
-            'yahooCampaigns' : { 'partitions': 20, 'replication-factor': 
self.replication },
-            'yahooEvents' : { 'partitions': 20, 'replication-factor': 
self.replication }
+            'simpleBenchmarkSourceTopic1' : { 'partitions': scale, 
'replication-factor': self.replication, "config": 
"message.timestamp.type=LogAppendTime" },
+            'simpleBenchmarkSourceTopic2' : { 'partitions': scale, 
'replication-factor': self.replication, "config": 
"message.timestamp.type=LogAppendTime" },
+            'simpleBenchmarkSinkTopic' : { 'partitions': scale, 
'replication-factor': self.replication, "config": 
"message.timestamp.type=LogAppendTime" },
+            'yahooCampaigns' : { 'partitions': 20, 'replication-factor': 
self.replication, "config": "message.timestamp.type=LogAppendTime" },
+            'yahooEvents' : { 'partitions': 20, 'replication-factor': 
self.replication, "config": "message.timestamp.type=LogAppendTime" }
         })
         self.kafka.log_level = "INFO"
         self.kafka.start()
- 
+
+
+        load_test = ""
+        if test == ALL_TEST:
+            load_test = "load-two"
+        if test in STREAMS_JOIN_TESTS or test == STREAMS_JOIN_TEST:
+            load_test = "load-two"
+        if test in STREAMS_COUNT_TESTS or test == STREAMS_COUNT_TEST:
+            load_test = "load-one"
+        if test in STREAMS_SIMPLE_TESTS or test == STREAMS_SIMPLE_TEST:
+            load_test = "load-one"
+        if test in NON_STREAMS_TESTS:
+            load_test = "load-one"
+
+
+
         ################
         # LOAD PHASE
         ################
-        self.load_driver = StreamsSimpleBenchmarkService(self.test_context, 
self.kafka,
-                                                         self.num_records * 
scale, "true", test,
-                                                         self.num_threads)
+        self.load_driver = StreamsSimpleBenchmarkService(self.test_context,
+                                                         self.kafka,
+                                                         load_test,
+                                                         self.num_threads,
+                                                         self.num_records,
+                                                         self.key_skew,
+                                                         self.value_size)
+
         self.load_driver.start()
-        self.load_driver.wait()
+        self.load_driver.wait(3600) # wait at most 60 minutes (3600 seconds)
         self.load_driver.stop()
 
+        if test == ALL_TEST:
+            for single_test in STREAMS_SIMPLE_TESTS + STREAMS_COUNT_TESTS + 
STREAMS_JOIN_TESTS:
+                self.run_test(single_test, scale)
+        elif test == STREAMS_SIMPLE_TEST:
+            for single_test in STREAMS_SIMPLE_TESTS:
+                self.run_test(single_test, scale)
+        elif test == STREAMS_COUNT_TEST:
+            for single_test in STREAMS_COUNT_TESTS:
+                self.run_test(single_test, scale)
+        elif test == STREAMS_JOIN_TEST:
+            for single_test in STREAMS_JOIN_TESTS:
+                self.run_test(single_test, scale)
+        else:
+            self.run_test(test, scale)
+
+        return self.final
 
+    def run_test(self, test, scale):
 
         ################
         # RUN PHASE
         ################
         for num in range(0, scale):
-            self.driver[num] = 
StreamsSimpleBenchmarkService(self.test_context, self.kafka,
-                                                             
self.num_records/(scale), "false", test,
-                                                             self.num_threads)
+            self.driver[num] = StreamsSimpleBenchmarkService(self.test_context,
+                                                             self.kafka,
+                                                             test,
+                                                             self.num_threads,
+                                                             self.num_records,
+                                                             self.key_skew,
+                                                             self.value_size)
             self.driver[num].start()
 
         #######################
         # STOP + COLLECT PHASE
         #######################
-        for num in range(0, scale):    
-            self.driver[num].wait()    
+        data = [None] * (scale)
+
+        for num in range(0, scale):
+            self.driver[num].wait()
             self.driver[num].stop()
-            node[num] = self.driver[num].node
-            node[num].account.ssh("grep Performance %s" % 
self.driver[num].STDOUT_FILE, allow_fail=False)
-            data[num] = self.driver[num].collect_data(node[num], "" )
+            self.driver[num].node.account.ssh("grep Performance %s" % 
self.driver[num].STDOUT_FILE, allow_fail=False)
+            data[num] = self.driver[num].collect_data(self.driver[num].node, 
"")
             self.driver[num].read_jmx_output_all_nodes()
 
-
-        final = {}
         for num in range(0, scale):
             for key in data[num]:
-                final[key + str(num)] = data[num][key]
+                self.final[key + "-" + str(num)] = data[num][key]
 
             for key in sorted(self.driver[num].jmx_stats[0]):
                 self.logger.info("%s: %s" % (key, 
self.driver[num].jmx_stats[0][key]))
 
-            final["jmx-avg" + str(num)] = self.driver[num].average_jmx_value
-            final["jmx-max" + str(num)] = self.driver[num].maximum_jmx_value
-
-        return final
+            self.final[test + "-jmx-avg-" + str(num)] = 
self.driver[num].average_jmx_value
+            self.final[test + "-jmx-max-" + str(num)] = 
self.driver[num].maximum_jmx_value
diff --git a/tests/kafkatest/services/performance/streams_performance.py 
b/tests/kafkatest/services/performance/streams_performance.py
index 9f791811c55..e2dd15bf16d 100644
--- a/tests/kafkatest/services/performance/streams_performance.py
+++ b/tests/kafkatest/services/performance/streams_performance.py
@@ -15,25 +15,28 @@
 
 from kafkatest.services.monitor.jmx import JmxMixin
 from kafkatest.services.streams import StreamsTestBaseService
+from kafkatest.services.kafka import KafkaConfig
+from kafkatest.services import streams_property
 
 #
 # Class used to start the simple Kafka Streams benchmark
 #
+
 class StreamsSimpleBenchmarkService(StreamsTestBaseService):
     """Base class for simple Kafka Streams benchmark"""
 
-    def __init__(self, test_context, kafka, numrecs, load_phase, test_name, 
num_threads):
+    def __init__(self, test_context, kafka, test_name, num_threads, 
num_recs_or_wait_ms, key_skew, value_size):
         super(StreamsSimpleBenchmarkService, self).__init__(test_context,
                                                             kafka,
                                                             
"org.apache.kafka.streams.perf.SimpleBenchmark",
-                                                            numrecs,
-                                                            load_phase,
                                                             test_name,
-                                                            num_threads)
-
-        self.load_phase = load_phase
+                                                            
num_recs_or_wait_ms,
+                                                            key_skew,
+                                                            value_size)
 
-        if self.load_phase == "false":
+        self.jmx_option = ""
+        if test_name.startswith('stream') or test_name.startswith('table'):
+            self.jmx_option = "stream-jmx"
             JmxMixin.__init__(self,
                               num_nodes=1,
                               
jmx_object_names=['kafka.streams:type=stream-metrics,client-id=simple-benchmark-StreamThread-%d'
 %(i+1) for i in range(num_threads)],
@@ -45,13 +48,27 @@ def __init__(self, test_context, kafka, numrecs, 
load_phase, test_name, num_thre
                                               'poll-rate'],
                               root=StreamsTestBaseService.PERSISTENT_ROOT)
 
-    def start_cmd(self, node):
-        cmd = super(StreamsSimpleBenchmarkService, self).start_cmd(node)
+        if test_name.startswith('consume'):
+            self.jmx_option = "consumer-jmx"
+            JmxMixin.__init__(self,
+                              num_nodes=1,
+                              
jmx_object_names=['kafka.consumer:type=consumer-fetch-manager-metrics,client-id=simple-benchmark-consumer'],
+                              jmx_attributes=['records-consumed-rate'],
+                              root=StreamsTestBaseService.PERSISTENT_ROOT)
 
-        if self.load_phase == "false":
+        self.num_threads = num_threads
+
+    def prop_file(self):
+        cfg = KafkaConfig(**{streams_property.STATE_DIR: self.PERSISTENT_ROOT,
+                             streams_property.KAFKA_SERVERS: 
self.kafka.bootstrap_servers(),
+                             streams_property.NUM_THREADS: self.num_threads})
+        return cfg.render()
+
+
+    def start_cmd(self, node):
+        if self.jmx_option != "":
             args = self.args.copy()
             args['jmx_port'] = self.jmx_port
-            args['kafka'] = self.kafka.bootstrap_servers()
             args['config_file'] = self.CONFIG_FILE
             args['stdout'] = self.STDOUT_FILE
             args['stderr'] = self.STDERR_FILE
@@ -61,23 +78,24 @@ def start_cmd(self, node):
 
             cmd = "( export JMX_PORT=%(jmx_port)s; export 
KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%(log4j)s\"; " \
                   "INCLUDE_TEST_JARS=true %(kafka_run_class)s 
%(streams_class_name)s " \
-                  " %(kafka)s %(config_file)s %(user_test_args)s 
%(user_test_args1)s %(user_test_args2)s" \
-                  " %(user_test_args3)s & echo $! >&3 ) 1>> %(stdout)s 2>> 
%(stderr)s 3> %(pidfile)s" % args
+                  " %(config_file)s %(user_test_args1)s %(user_test_args2)s 
%(user_test_args3)s" \
+                  " %(user_test_args4)s & echo $! >&3 ) 1>> %(stdout)s 2>> 
%(stderr)s 3> %(pidfile)s" % args
 
-        self.logger.info("Executing streams simple benchmark cmd: " + cmd)
+        else:
+            cmd = super(StreamsSimpleBenchmarkService, self).start_cmd(node)
 
         return cmd
 
     def start_node(self, node):
         super(StreamsSimpleBenchmarkService, self).start_node(node)
 
-        if self.load_phase == "false":
+        if self.jmx_option != "":
             self.start_jmx_tool(1, node)
 
-
     def clean_node(self, node):
-        if self.load_phase == "false":
+        if self.jmx_option != "":
             JmxMixin.clean_node(self, node)
+
         super(StreamsSimpleBenchmarkService, self).clean_node(node)
 
     def collect_data(self, node, tag = None):
diff --git a/tests/kafkatest/services/streams.py 
b/tests/kafkatest/services/streams.py
index a5be816c737..796ca31ea4c 100644
--- a/tests/kafkatest/services/streams.py
+++ b/tests/kafkatest/services/streams.py
@@ -16,6 +16,8 @@
 import os.path
 import signal
 
+import streams_property
+
 from ducktape.services.service import Service
 from ducktape.utils.util import wait_until
 from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
@@ -23,7 +25,6 @@
 from kafkatest.services.kafka import KafkaConfig
 from kafkatest.version import LATEST_0_10_0, LATEST_0_10_1
 
-STATE_DIR = "state.dir"
 
 class StreamsTestBaseService(KafkaPathResolverMixin, JmxMixin, Service):
     """Base class for Streams Test services providing some common settings and 
functionality"""
@@ -168,14 +169,14 @@ class StreamsTestBaseService(KafkaPathResolverMixin, 
JmxMixin, Service):
             "collect_default": True},
     }
 
-    def __init__(self, test_context, kafka, streams_class_name, 
user_test_args, user_test_args1=None, user_test_args2=None, 
user_test_args3=None):
+    def __init__(self, test_context, kafka, streams_class_name, 
user_test_args1, user_test_args2=None, user_test_args3=None, 
user_test_args4=None):
         Service.__init__(self, test_context, num_nodes=1)
         self.kafka = kafka
         self.args = {'streams_class_name': streams_class_name,
-                     'user_test_args': user_test_args,
                      'user_test_args1': user_test_args1,
                      'user_test_args2': user_test_args2,
-                     'user_test_args3': user_test_args3}
+                     'user_test_args3': user_test_args3,
+                     'user_test_args4': user_test_args4}
         self.log_level = "DEBUG"
 
     @property
@@ -236,7 +237,6 @@ def clean_node(self, node):
 
     def start_cmd(self, node):
         args = self.args.copy()
-        args['kafka'] = self.kafka.bootstrap_servers()
         args['config_file'] = self.CONFIG_FILE
         args['stdout'] = self.STDOUT_FILE
         args['stderr'] = self.STDERR_FILE
@@ -246,15 +246,15 @@ def start_cmd(self, node):
 
         cmd = "( export 
KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%(log4j)s\"; " \
               "INCLUDE_TEST_JARS=true %(kafka_run_class)s 
%(streams_class_name)s " \
-              " %(kafka)s %(config_file)s %(user_test_args)s 
%(user_test_args1)s %(user_test_args2)s" \
-              " %(user_test_args3)s & echo $! >&3 ) 1>> %(stdout)s 2>> 
%(stderr)s 3> %(pidfile)s" % args
+              " %(config_file)s %(user_test_args1)s %(user_test_args2)s 
%(user_test_args3)s" \
+              " %(user_test_args4)s & echo $! >&3 ) 1>> %(stdout)s 2>> 
%(stderr)s 3> %(pidfile)s" % args
 
         self.logger.info("Executing streams cmd: " + cmd)
 
         return cmd
 
     def prop_file(self):
-        cfg = KafkaConfig(**{STATE_DIR: self.PERSISTENT_ROOT})
+        cfg = KafkaConfig(**{streams_property.STATE_DIR: self.PERSISTENT_ROOT, 
streams_property.KAFKA_SERVERS: self.kafka.bootstrap_servers()})
         return cfg.render()
 
     def start_node(self, node):
@@ -374,7 +374,6 @@ def __init__(self, test_context, kafka, configs):
 
     def start_cmd(self, node):
         args = self.args.copy()
-        args['kafka'] = self.kafka.bootstrap_servers(validate=False)
         args['config_file'] = self.CONFIG_FILE
         args['stdout'] = self.STDOUT_FILE
         args['stderr'] = self.STDERR_FILE
@@ -384,8 +383,8 @@ def start_cmd(self, node):
 
         cmd = "( export 
KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%(log4j)s\"; " \
               "INCLUDE_TEST_JARS=true %(kafka_run_class)s 
%(streams_class_name)s " \
-              " %(kafka)s %(config_file)s %(user_test_args)s 
%(user_test_args1)s %(user_test_args2)s" \
-              " %(user_test_args3)s & echo $! >&3 ) 1>> %(stdout)s 2>> 
%(stderr)s 3> %(pidfile)s" % args
+              " %(config_file)s %(user_test_args1)s %(user_test_args2)s 
%(user_test_args3)s" \
+              " %(user_test_args4)s & echo $! >&3 ) 1>> %(stdout)s 2>> 
%(stderr)s 3> %(pidfile)s" % args
 
         self.logger.info("Executing: " + cmd)
 
diff --git a/tests/kafkatest/services/streams_property.py 
b/tests/kafkatest/services/streams_property.py
new file mode 100644
index 00000000000..054ea646250
--- /dev/null
+++ b/tests/kafkatest/services/streams_property.py
@@ -0,0 +1,26 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Define Streams configuration property names here.
+"""
+
+STATE_DIR = "state.dir"
+KAFKA_SERVERS = "bootstrap.servers"
+NUM_THREADS = "num.stream.threads"
+
+
+
+


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Re-write simple benchmark in system tests with JMXTool
> ------------------------------------------------------
>
>                 Key: KAFKA-6611
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6611
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Guozhang Wang
>            Assignee: Guozhang Wang
>            Priority: Major
>             Fix For: 1.2.0
>
>
> The current SimpleBenchmark actively records num.records in order to 
> compute throughput and latency, which introduces extra cost and is less 
> accurate for benchmarking purposes; instead, it is better to use JmxMixin 
> with SimpleBenchmark to record metrics.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to