Repository: kafka
Updated Branches:
  refs/heads/trunk adb70da13 -> b7378d567


MINOR: Standardised benchmark params for consumer and streams

There were some minor differences between the basic consumer config and the 
streams config that are now rectified. In addition, in AWS environments the 
socket buffer size makes a big difference to performance, and I've tuned it up 
accordingly. I've also increased the number of records now that performance is higher.

Author: Eno Thereska <[email protected]>

Reviewers: Guozhang Wang <[email protected]>

Closes #2634 from enothereska/minor-standardize-params


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b7378d56
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b7378d56
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b7378d56

Branch: refs/heads/trunk
Commit: b7378d567fffd06395f5babc36cebd64bdf539d1
Parents: adb70da
Author: Eno Thereska <[email protected]>
Authored: Sat Mar 4 20:55:16 2017 -0800
Committer: Guozhang Wang <[email protected]>
Committed: Sat Mar 4 20:55:16 2017 -0800

----------------------------------------------------------------------
 .../apache/kafka/streams/perf/SimpleBenchmark.java | 17 ++++++++++++++++-
 .../streams/streams_simple_benchmark_test.py       |  2 +-
 tests/kafkatest/services/streams.py                |  2 +-
 3 files changed, 18 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/b7378d56/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
index cf593e2..7a36d70 100644
--- a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
+++ b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
@@ -104,6 +104,9 @@ public class SimpleBenchmark {
     private static int processedRecords = 0;
     private static long processedBytes = 0;
     private static final int VALUE_SIZE = 100;
+    private static final long POLL_MS = 500L;
+    private static final int MAX_POLL_RECORDS = 1000;
+    private static final int SOCKET_SIZE_BYTES = 1 * 1024 * 1024;
 
     private static final Serde<byte[]> BYTE_SERDE = Serdes.ByteArray();
     private static final Serde<Integer> INTEGER_SERDE = Serdes.Integer();
@@ -207,8 +210,13 @@ public class SimpleBenchmark {
         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
         props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
         props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        // the socket buffer needs to be large, especially when running in AWS with
+        // high latency. if running locally the default is fine.
+        props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, SOCKET_SIZE_BYTES);
         props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
         props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass());
+        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, MAX_POLL_RECORDS);
+        props.put(StreamsConfig.POLL_MS_CONFIG, POLL_MS);
         return props;
     }
 
@@ -218,9 +226,16 @@ public class SimpleBenchmark {
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
+        // the socket buffer needs to be large, especially when running in AWS with
+        // high latency. if running locally the default is fine.
+        props.put(ProducerConfig.SEND_BUFFER_CONFIG, SOCKET_SIZE_BYTES);
         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
         props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+        // the socket buffer needs to be large, especially when running in AWS with
+        // high latency. if running locally the default is fine.
+        props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, SOCKET_SIZE_BYTES);
+        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, MAX_POLL_RECORDS);
         return props;
     }
 
@@ -516,7 +531,7 @@ public class SimpleBenchmark {
         long startTime = System.currentTimeMillis();
 
         while (true) {
-            ConsumerRecords<Integer, byte[]> records = consumer.poll(500);
+            ConsumerRecords<Integer, byte[]> records = consumer.poll(POLL_MS);
             if (records.isEmpty()) {
                 if (processedRecords == numRecords)
                     break;

http://git-wip-us.apache.org/repos/asf/kafka/blob/b7378d56/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py b/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py
index c1db8c8..c9f970e 100644
--- a/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py
+++ b/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py
@@ -30,7 +30,7 @@ class StreamsSimpleBenchmarkTest(Test):
 
     def __init__(self, test_context):
         super(StreamsSimpleBenchmarkTest, self).__init__(test_context)
-        self.num_records = 2000000L
+        self.num_records = 10000000L
         self.replication = 1
 
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/b7378d56/tests/kafkatest/services/streams.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/streams.py b/tests/kafkatest/services/streams.py
index 1e1c676..4f8f1a3 100644
--- a/tests/kafkatest/services/streams.py
+++ b/tests/kafkatest/services/streams.py
@@ -97,7 +97,7 @@ class StreamsTestBaseService(KafkaPathResolverMixin, Service):
             self.logger.info("Restarting Kafka Streams on " + str(node.account))
             self.start_node(node)
 
-    def wait(self, timeout_sec=360):
+    def wait(self, timeout_sec=720):
         for node in self.nodes:
             self.wait_node(node, timeout_sec)
 

Reply via email to