Repository: kafka
Updated Branches:
  refs/heads/trunk adb70da13 -> b7378d567


MINOR: Standardised benchmark params for consumer and streams

There were some minor differences in the basic consumer config and streams config that are now rectified. In addition, in AWS environments the socket size makes a big difference to performance and I've tuned it up accordingly. I've also increased the number of records now that perf is higher.

Author: Eno Thereska <[email protected]>

Reviewers: Guozhang Wang <[email protected]>

Closes #2634 from enothereska/minor-standardize-params


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b7378d56
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b7378d56
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b7378d56

Branch: refs/heads/trunk
Commit: b7378d567fffd06395f5babc36cebd64bdf539d1
Parents: adb70da
Author: Eno Thereska <[email protected]>
Authored: Sat Mar 4 20:55:16 2017 -0800
Committer: Guozhang Wang <[email protected]>
Committed: Sat Mar 4 20:55:16 2017 -0800

----------------------------------------------------------------------
 .../apache/kafka/streams/perf/SimpleBenchmark.java | 17 ++++++++++++++++-
 .../streams/streams_simple_benchmark_test.py       |  2 +-
 tests/kafkatest/services/streams.py                |  2 +-
 3 files changed, 18 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/b7378d56/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
index cf593e2..7a36d70 100644
--- a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
+++ b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
@@ -104,6 +104,9 @@ public class SimpleBenchmark {
     private static int processedRecords = 0;
     private static long processedBytes = 0;
     private static final int VALUE_SIZE = 100;
+    private static final long POLL_MS = 500L;
+    private static final int MAX_POLL_RECORDS = 1000;
+    private static final int SOCKET_SIZE_BYTES = 1 * 1024 * 1024;
 
     private static final Serde<byte[]> BYTE_SERDE = Serdes.ByteArray();
     private static final Serde<Integer> INTEGER_SERDE = Serdes.Integer();
@@ -207,8 +210,13 @@ public class SimpleBenchmark {
         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
         props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
         props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        // the socket buffer needs to be large, especially when running in AWS with
+        // high latency. if running locally the default is fine.
+        props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, SOCKET_SIZE_BYTES);
         props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
         props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass());
+        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, MAX_POLL_RECORDS);
+        props.put(StreamsConfig.POLL_MS_CONFIG, POLL_MS);
         return props;
     }
 
@@ -218,9 +226,16 @@ public class SimpleBenchmark {
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
+        // the socket buffer needs to be large, especially when running in AWS with
+        // high latency. if running locally the default is fine.
+        props.put(ProducerConfig.SEND_BUFFER_CONFIG, SOCKET_SIZE_BYTES);
         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
         props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+        // the socket buffer needs to be large, especially when running in AWS with
+        // high latency. if running locally the default is fine.
+        props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, SOCKET_SIZE_BYTES);
+        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, MAX_POLL_RECORDS);
         return props;
     }
 
@@ -516,7 +531,7 @@ public class SimpleBenchmark {
         long startTime = System.currentTimeMillis();
 
         while (true) {
-            ConsumerRecords<Integer, byte[]> records = consumer.poll(500);
+            ConsumerRecords<Integer, byte[]> records = consumer.poll(POLL_MS);
             if (records.isEmpty()) {
                 if (processedRecords == numRecords)
                     break;

http://git-wip-us.apache.org/repos/asf/kafka/blob/b7378d56/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py b/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py
index c1db8c8..c9f970e 100644
--- a/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py
+++ b/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py
@@ -30,7 +30,7 @@ class StreamsSimpleBenchmarkTest(Test):
 
     def __init__(self, test_context):
         super(StreamsSimpleBenchmarkTest, self).__init__(test_context)
-        self.num_records = 2000000L
+        self.num_records = 10000000L
         self.replication = 1

http://git-wip-us.apache.org/repos/asf/kafka/blob/b7378d56/tests/kafkatest/services/streams.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/streams.py b/tests/kafkatest/services/streams.py
index 1e1c676..4f8f1a3 100644
--- a/tests/kafkatest/services/streams.py
+++ b/tests/kafkatest/services/streams.py
@@ -97,7 +97,7 @@ class StreamsTestBaseService(KafkaPathResolverMixin, Service):
         self.logger.info("Restarting Kafka Streams on " + str(node.account))
         self.start_node(node)
 
-    def wait(self, timeout_sec=360):
+    def wait(self, timeout_sec=720):
         for node in self.nodes:
            self.wait_node(node, timeout_sec)
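----------------------------------------------------------------------

For reference, below is a minimal, self-contained sketch of how the same tuning (1 MB socket receive buffer, max.poll.records of 1000, 500 ms poll timeout) could be applied to a plain consumer outside the benchmark. The bootstrap server, group id and topic name are placeholders, not values from this commit.

// Sketch only: the broker address, group id and topic below are placeholders.
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;

public class TunedConsumerSketch {
    public static void main(String[] args) {
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder broker
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "benchmark-sketch");          // placeholder group
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // Same values the commit adds to SimpleBenchmark: a 1 MB socket receive
        // buffer for high-latency (e.g. AWS) links and a bounded poll batch size.
        props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 1024 * 1024);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1000);

        final KafkaConsumer<Integer, byte[]> consumer = new KafkaConsumer<>(props);
        try {
            consumer.subscribe(Collections.singletonList("benchmark-topic"));   // placeholder topic
            while (true) {
                // Poll with the same 500 ms timeout the benchmark now uses.
                final ConsumerRecords<Integer, byte[]> records = consumer.poll(500L);
                for (final ConsumerRecord<Integer, byte[]> record : records) {
                    // process record.value() here
                }
            }
        } finally {
            consumer.close();
        }
    }
}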
