Repository: kafka
Updated Branches:
  refs/heads/trunk a81ad2582 -> a5f1158c3


KAFKA-3558; Add compression_type parameter to benchmarks in benchmark_test.py

* Use a fixed `Random` seed in `EndToEndLatency.scala` for determinism
* Add `compression_type` to `end_to_end_latency.py` and remove the unused
`consumer_fetch_max_wait` parameter.
* Tweak the logging of `end_to_end_latency.py` to match
`consumer_performance.py`.
* Add `compression_type` to the `benchmark_test.py` methods and add `snappy`
to their `matrix` annotations.
* Use randomly generated bytes from a restricted range for the
`ProducerPerformance` payload. This is a simple fix for now; it can be
improved in the PR for KAFKA-3554.
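
For a sense of what the seeded payload generation buys, here is a minimal
Python sketch of the idea used in the Scala and Java changes below (the
helper names are illustrative, not part of this commit):

    import random

    def random_bytes_of_len(rnd, length):
        # Bytes restricted to uppercase ASCII (A-Z); the caller supplies the
        # generator, so a fixed seed reproduces the exact payload stream.
        return bytes(rnd.randrange(26) + 65 for _ in range(length))

    def payload_stream(seed, count, length):
        rnd = random.Random(seed)
        return [random_bytes_of_len(rnd, length) for _ in range(count)]

    # Deterministic: two runs with the same seed yield identical payloads,
    # so benchmark results are comparable from run to run.
    assert payload_stream(0, 100, 64) == payload_stream(0, 100, 64)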

Author: Ismael Juma <[email protected]>

Reviewers: Ewen Cheslack-Postava <[email protected]>

Closes #1225 from ijuma/kafka-3558-add-compression_type-benchmark_test.py


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a5f1158c
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a5f1158c
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a5f1158c

Branch: refs/heads/trunk
Commit: a5f1158c317e22a79c4186d1acb04fb25ce6e56a
Parents: a81ad25
Author: Ismael Juma <[email protected]>
Authored: Mon Apr 18 14:23:46 2016 -0700
Committer: Ewen Cheslack-Postava <[email protected]>
Committed: Mon Apr 18 14:23:46 2016 -0700

----------------------------------------------------------------------
 .../scala/kafka/tools/ConsumerPerformance.scala | 14 ++--
 .../scala/kafka/tools/EndToEndLatency.scala     | 18 +++--
 .../kafkatest/benchmarks/core/benchmark_test.py | 56 +++++++++----
 .../services/performance/end_to_end_latency.py  | 84 +++++++++++---------
 .../apache/kafka/tools/ProducerPerformance.java |  7 +-
 5 files changed, 105 insertions(+), 74 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/a5f1158c/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
index a38c04b..6480ff5 100644
--- a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
+++ b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
@@ -19,8 +19,6 @@ package kafka.tools
 
 import java.util
 
-import org.apache.kafka.common.TopicPartition
-
 import scala.collection.JavaConversions._
 import java.util.concurrent.atomic.AtomicLong
 import java.nio.channels.ClosedByInterruptException
@@ -85,10 +83,9 @@ object ConsumerPerformance {
         thread.start
       for (thread <- threadList)
         thread.join
-      if(consumerTimeout.get())
-       endMs = System.currentTimeMillis - consumerConfig.consumerTimeoutMs
-      else
-       endMs = System.currentTimeMillis
+      endMs =
+        if (consumerTimeout.get()) System.currentTimeMillis - consumerConfig.consumerTimeoutMs
+        else System.currentTimeMillis
       consumerConnector.shutdown()
     }
     val elapsedSecs = (endMs - startMs) / 1000.0
@@ -279,9 +276,8 @@ object ConsumerPerformance {
       } catch {
         case _: InterruptedException =>
         case _: ClosedByInterruptException =>
-        case _: ConsumerTimeoutException => {
-          consumerTimeout.set(true);
-        }
+        case _: ConsumerTimeoutException =>
+          consumerTimeout.set(true)
         case e: Throwable => e.printStackTrace()
       }
       totalMessagesRead.addAndGet(messagesRead)
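
The refactor above keeps the measurement logic intact: when the consumer loop
ends via a timeout, the idle wait is excluded from the elapsed time so
throughput is not under-reported. A rough Python sketch of that adjustment
(names are illustrative):

    import time

    def effective_end_ms(timed_out, consumer_timeout_ms):
        # If the loop exited via ConsumerTimeoutException, the final
        # consumer_timeout_ms of wall-clock time was spent waiting for
        # records that never arrived, so it is excluded from the interval.
        end_ms = int(time.time() * 1000)
        return end_ms - consumer_timeout_ms if timed_out else end_ms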

http://git-wip-us.apache.org/repos/asf/kafka/blob/a5f1158c/core/src/main/scala/kafka/tools/EndToEndLatency.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/EndToEndLatency.scala b/core/src/main/scala/kafka/tools/EndToEndLatency.scala
index 584d4fb..1c92088 100755
--- a/core/src/main/scala/kafka/tools/EndToEndLatency.scala
+++ b/core/src/main/scala/kafka/tools/EndToEndLatency.scala
@@ -25,6 +25,7 @@ import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.common.TopicPartition
 
 import scala.collection.JavaConversions._
+import scala.util.Random
 
 
 /**
@@ -43,7 +44,7 @@ object EndToEndLatency {
 
   def main(args: Array[String]) {
     if (args.length != 5 && args.length != 6) {
-      System.err.println("USAGE: java " + getClass.getName + " broker_list topic num_messages producer_acks message_size_bytes [optional] ssl_properties_file")
+      System.err.println("USAGE: java " + getClass.getName + " broker_list topic num_messages producer_acks message_size_bytes [optional] properties_file")
       System.exit(1)
     }
 
@@ -52,12 +53,14 @@ object EndToEndLatency {
     val numMessages = args(2).toInt
     val producerAcks = args(3)
     val messageLen = args(4).toInt
-    val sslPropsFile = if (args.length == 6) args(5) else ""
+    val propsFile = if (args.length > 5) Some(args(5)).filter(_.nonEmpty) else None
 
     if (!List("1", "all").contains(producerAcks))
       throw new IllegalArgumentException("Latency testing requires synchronous acknowledgement. Please use 1 or all")
 
-    val consumerProps = if (sslPropsFile.equals("")) new Properties() else Utils.loadProps(sslPropsFile)
+    def loadProps: Properties = propsFile.map(Utils.loadProps).getOrElse(new Properties())
+
+    val consumerProps = loadProps
     consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
     consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group-" + System.currentTimeMillis())
     consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
@@ -69,7 +72,7 @@ object EndToEndLatency {
     val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](consumerProps)
     consumer.subscribe(List(topic))
 
-    val producerProps = if (sslPropsFile.equals("")) new Properties() else Utils.loadProps(sslPropsFile)
+    val producerProps = loadProps
     producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
     producerProps.put(ProducerConfig.LINGER_MS_CONFIG, "0") //ensure writes are synchronous
     producerProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, Long.MaxValue.toString)
@@ -91,9 +94,10 @@ object EndToEndLatency {
 
     var totalTime = 0.0
     val latencies = new Array[Long](numMessages)
+    val random = new Random(0)
 
     for (i <- 0 until numMessages) {
-      val message = randomBytesOfLen(messageLen)
+      val message = randomBytesOfLen(random, messageLen)
       val begin = System.nanoTime
 
       //Send message (of random bytes) synchronously then immediately poll for it
@@ -141,7 +145,7 @@ object EndToEndLatency {
     finalise()
   }
 
-  def randomBytesOfLen(len: Int): Array[Byte] = {
-    Array.fill(len)((scala.util.Random.nextInt(26) + 65).toByte)
+  def randomBytesOfLen(random: Random, len: Int): Array[Byte] = {
+    Array.fill(len)((random.nextInt(26) + 65).toByte)
   }
 }
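
The `propsFile` change treats a missing sixth argument and an empty string
uniformly before handing off to `Utils.loadProps`. A rough Python analogue of
`Some(args(5)).filter(_.nonEmpty)` (the parser here is a simplified
stand-in, not Kafka code):

    def load_props(args):
        # None if the optional properties_file argument is absent or empty.
        props_file = args[5] if len(args) > 5 and args[5] else None
        if props_file is None:
            return {}
        with open(props_file) as f:
            return dict(line.strip().split("=", 1) for line in f
                        if "=" in line and not line.lstrip().startswith("#"))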

http://git-wip-us.apache.org/repos/asf/kafka/blob/a5f1158c/tests/kafkatest/benchmarks/core/benchmark_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/benchmarks/core/benchmark_test.py b/tests/kafkatest/benchmarks/core/benchmark_test.py
index d252e5d..83f4b2a 100644
--- a/tests/kafkatest/benchmarks/core/benchmark_test.py
+++ b/tests/kafkatest/benchmarks/core/benchmark_test.py
@@ -68,9 +68,10 @@ class Benchmark(Test):
     @parametrize(acks=1, topic=TOPIC_REP_THREE)
     @parametrize(acks=-1, topic=TOPIC_REP_THREE)
     @parametrize(acks=1, topic=TOPIC_REP_THREE, num_producers=3)
-    @matrix(acks=[1], topic=[TOPIC_REP_THREE], message_size=[10, 100, 1000, 10000, 100000], security_protocol=['PLAINTEXT', 'SSL'])
-    def test_producer_throughput(self, acks, topic, num_producers=1, message_size=DEFAULT_RECORD_SIZE, security_protocol='PLAINTEXT',
-                                 client_version=str(TRUNK), broker_version=str(TRUNK)):
+    @matrix(acks=[1], topic=[TOPIC_REP_THREE], message_size=[10, 100, 1000, 10000, 100000], compression_type=["none", "snappy"], security_protocol=['PLAINTEXT', 'SSL'])
+    def test_producer_throughput(self, acks, topic, num_producers=1, message_size=DEFAULT_RECORD_SIZE,
+                                 compression_type="none", security_protocol='PLAINTEXT', client_version=str(TRUNK),
+                                 broker_version=str(TRUNK)):
         """
         Setup: 1 node zk + 3 node kafka cluster
         Produce ~128MB worth of messages to a topic with 6 partitions. Required acks, topic replication factor,
@@ -91,15 +92,17 @@ class Benchmark(Test):
             num_records=nrecords, record_size=message_size,  throughput=-1, version=client_version,
             settings={
                 'acks': acks,
+                'compression.type': compression_type,
                 'batch.size': self.batch_size,
                 'buffer.memory': self.buffer_memory})
         self.producer.run()
         return compute_aggregate_throughput(self.producer)
 
     @parametrize(security_protocol='SSL', interbroker_security_protocol='PLAINTEXT')
-    @matrix(security_protocol=['PLAINTEXT', 'SSL'])
-    def test_long_term_producer_throughput(self, security_protocol, interbroker_security_protocol=None,
-                                           client_version=str(TRUNK), broker_version=str(TRUNK)):
+    @matrix(security_protocol=['PLAINTEXT', 'SSL'], compression_type=["none", "snappy"])
+    def test_long_term_producer_throughput(self, compression_type="none", security_protocol='PLAINTEXT',
+                                           interbroker_security_protocol=None, client_version=str(TRUNK),
+                                           broker_version=str(TRUNK)):
         """
         Setup: 1 node zk + 3 node kafka cluster
         Produce 10e6 100 byte messages to a topic with 6 partitions, replication-factor 3, and acks=1.
@@ -117,7 +120,12 @@ class Benchmark(Test):
         self.producer = ProducerPerformanceService(
             self.test_context, 1, self.kafka,
             topic=TOPIC_REP_THREE, num_records=self.msgs_large, record_size=DEFAULT_RECORD_SIZE,
-            throughput=-1, version=client_version, settings={'acks': 1, 'batch.size': self.batch_size, 'buffer.memory': self.buffer_memory},
+            throughput=-1, version=client_version, settings={
+                'acks': 1,
+                'compression.type': compression_type,
+                'batch.size': self.batch_size,
+                'buffer.memory': self.buffer_memory
+            },
             intermediate_stats=True
         )
         self.producer.run()
@@ -146,9 +154,10 @@ class Benchmark(Test):
         return data
 
     @parametrize(security_protocol='SSL', interbroker_security_protocol='PLAINTEXT')
-    @matrix(security_protocol=['PLAINTEXT', 'SSL', 'SASL_PLAINTEXT', 'SASL_SSL'])
-    def test_end_to_end_latency(self, security_protocol, interbroker_security_protocol=None,
-                                client_version=str(TRUNK), broker_version=str(TRUNK)):
+    @matrix(security_protocol=['PLAINTEXT', 'SSL', 'SASL_PLAINTEXT', 'SASL_SSL'], compression_type=["none", "snappy"])
+    def test_end_to_end_latency(self, compression_type="none", security_protocol="PLAINTEXT",
+                                interbroker_security_protocol=None, client_version=str(TRUNK),
+                                broker_version=str(TRUNK)):
         """
         Setup: 1 node zk + 3 node kafka cluster
         Produce (acks = 1) and consume 10e3 messages to a topic with 6 partitions and replication-factor 3,
@@ -167,15 +176,17 @@ class Benchmark(Test):
         self.logger.info("BENCHMARK: End to end latency")
         self.perf = EndToEndLatencyService(
             self.test_context, 1, self.kafka,
-            topic=TOPIC_REP_THREE, num_records=10000, version=client_version
+            topic=TOPIC_REP_THREE, num_records=10000,
+            compression_type=compression_type, version=client_version
         )
         self.perf.run()
         return latency(self.perf.results[0]['latency_50th_ms'], self.perf.results[0]['latency_99th_ms'], self.perf.results[0]['latency_999th_ms'])
 
     @parametrize(security_protocol='PLAINTEXT', new_consumer=False)
     @parametrize(security_protocol='SSL', interbroker_security_protocol='PLAINTEXT')
-    @matrix(security_protocol=['PLAINTEXT', 'SSL'])
-    def test_producer_and_consumer(self, security_protocol, interbroker_security_protocol=None, new_consumer=True,
+    @matrix(security_protocol=['PLAINTEXT', 'SSL'], compression_type=["none", "snappy"])
+    def test_producer_and_consumer(self, compression_type="none", security_protocol="PLAINTEXT",
+                                   interbroker_security_protocol=None, new_consumer=True,
                                    client_version=str(TRUNK), broker_version=str(TRUNK)):
         """
         Setup: 1 node zk + 3 node kafka cluster
@@ -198,7 +209,12 @@ class Benchmark(Test):
             self.test_context, 1, self.kafka,
             topic=TOPIC_REP_THREE,
             num_records=num_records, record_size=DEFAULT_RECORD_SIZE, throughput=-1, version=client_version,
-            settings={'acks': 1, 'batch.size': self.batch_size, 'buffer.memory': self.buffer_memory}
+            settings={
+                'acks': 1,
+                'compression.type': compression_type,
+                'batch.size': self.batch_size,
+                'buffer.memory': self.buffer_memory
+            }
         )
         self.consumer = ConsumerPerformanceService(
             self.test_context, 1, self.kafka, topic=TOPIC_REP_THREE, new_consumer=new_consumer, messages=num_records)
@@ -216,8 +232,9 @@ class Benchmark(Test):
 
     @parametrize(security_protocol='PLAINTEXT', new_consumer=False)
     @parametrize(security_protocol='SSL', interbroker_security_protocol='PLAINTEXT')
-    @matrix(security_protocol=['PLAINTEXT', 'SSL'])
-    def test_consumer_throughput(self, security_protocol, interbroker_security_protocol=None, new_consumer=True, num_consumers=1,
+    @matrix(security_protocol=['PLAINTEXT', 'SSL'], compression_type=["none", "snappy"])
+    def test_consumer_throughput(self, compression_type="none", security_protocol="PLAINTEXT",
+                                 interbroker_security_protocol=None, new_consumer=True, num_consumers=1,
                                  client_version=str(TRUNK), broker_version=str(TRUNK)):
         """
         Consume 10e6 100-byte messages with 1 or more consumers from a topic with 6 partitions
@@ -236,7 +253,12 @@ class Benchmark(Test):
             self.test_context, 1, self.kafka,
             topic=TOPIC_REP_THREE,
             num_records=num_records, record_size=DEFAULT_RECORD_SIZE, throughput=-1, version=client_version,
-            settings={'acks': 1, 'batch.size': self.batch_size, 'buffer.memory': self.buffer_memory}
+            settings={
+                'acks': 1,
+                'compression.type': compression_type,
+                'batch.size': self.batch_size,
+                'buffer.memory': self.buffer_memory
+            }
         )
         self.producer.run()
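
For context, ducktape's `@matrix` decorator generates one test per entry in
the cartesian product of its keyword lists, so adding
`compression_type=["none", "snappy"]` doubles each affected matrix. A rough
illustration of the growth in parametrizations:

    from itertools import product

    security_protocols = ['PLAINTEXT', 'SSL', 'SASL_PLAINTEXT', 'SASL_SSL']
    compression_types = ["none", "snappy"]

    # test_end_to_end_latency now runs once per (protocol, codec) pair:
    cases = list(product(security_protocols, compression_types))
    print(len(cases))  # 8 parametrizations instead of 4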
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/a5f1158c/tests/kafkatest/services/performance/end_to_end_latency.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/performance/end_to_end_latency.py b/tests/kafkatest/services/performance/end_to_end_latency.py
index 08eff70..6d21151 100644
--- a/tests/kafkatest/services/performance/end_to_end_latency.py
+++ b/tests/kafkatest/services/performance/end_to_end_latency.py
@@ -17,32 +17,53 @@ from kafkatest.services.performance import PerformanceService
 from kafkatest.services.security.security_config import SecurityConfig
 
 from kafkatest.services.kafka.directory import kafka_dir
-from kafkatest.services.kafka.version import TRUNK, V_0_9_0_0
+from kafkatest.services.kafka.version import TRUNK, V_0_9_0_0, V_0_10_0_0
 
+import os
 
 class EndToEndLatencyService(PerformanceService):
     MESSAGE_BYTES = 21  # 0.8.X messages are fixed at 21 bytes, so we'll match that for other versions
 
+    # Root directory for persistent output
+    PERSISTENT_ROOT = "/mnt/end_to_end_latency"
+    LOG_DIR = os.path.join(PERSISTENT_ROOT, "logs")
+    STDOUT_CAPTURE = os.path.join(PERSISTENT_ROOT, "end_to_end_latency.stdout")
+    STDERR_CAPTURE = os.path.join(PERSISTENT_ROOT, "end_to_end_latency.stderr")
+    LOG_FILE = os.path.join(LOG_DIR, "end_to_end_latency.log")
+    LOG4J_CONFIG = os.path.join(PERSISTENT_ROOT, "tools-log4j.properties")
+    CONFIG_FILE = os.path.join(PERSISTENT_ROOT, "client.properties")
+
     logs = {
-        "end_to_end_latency_log": {
-            "path": "/mnt/end-to-end-latency.log",
+        "end_to_end_latency_output": {
+            "path": STDOUT_CAPTURE,
+            "collect_default": True},
+        "end_to_end_latency_stderr": {
+            "path": STDERR_CAPTURE,
             "collect_default": True},
+        "end_to_end_latency_log": {
+            "path": LOG_FILE,
+            "collect_default": True}
     }
 
-    def __init__(self, context, num_nodes, kafka, topic, num_records, version=TRUNK, consumer_fetch_max_wait=100, acks=1):
+
+    def __init__(self, context, num_nodes, kafka, topic, num_records, compression_type="none", version=TRUNK, acks=1):
         super(EndToEndLatencyService, self).__init__(context, num_nodes)
         self.kafka = kafka
         self.security_config = kafka.security_config.client_config()
 
         security_protocol = self.security_config.security_protocol
-        assert version >= V_0_9_0_0 or security_protocol == SecurityConfig.PLAINTEXT, \
-            "Security protocol %s is only supported if version >= 0.9.0.0, version %s" % (self.security_config, str(version))
+
+        if version < V_0_9_0_0:
+            assert security_protocol == SecurityConfig.PLAINTEXT, \
+                "Security protocol %s is only supported if version >= 0.9.0.0, version %s" % (self.security_config, str(version))
+            assert compression_type == "none", \
+                "Compression type %s is only supported if version >= 0.9.0.0, version %s" % (compression_type, str(version))
 
         self.args = {
             'topic': topic,
             'num_records': num_records,
-            'consumer_fetch_max_wait': consumer_fetch_max_wait,
             'acks': acks,
+            'compression_type': compression_type,
             'kafka_opts': self.security_config.kafka_opts,
             'message_bytes': EndToEndLatencyService.MESSAGE_BYTES
         }
@@ -50,56 +71,41 @@ class EndToEndLatencyService(PerformanceService):
         for node in self.nodes:
             node.version = version
 
-    @property
-    def security_config_file(self):
-        if self.security_config.security_protocol != SecurityConfig.PLAINTEXT:
-            security_config_file = SecurityConfig.CONFIG_DIR + "/security.properties"
-        else:
-            security_config_file = ""
-        return security_config_file
-
     def start_cmd(self, node):
         args = self.args.copy()
         args.update({
             'zk_connect': self.kafka.zk.connect_setting(),
             'bootstrap_servers': self.kafka.bootstrap_servers(self.security_config.security_protocol),
-            'security_config_file': self.security_config_file,
+            'config_file': EndToEndLatencyService.CONFIG_FILE,
             'kafka_dir': kafka_dir(node)
         })
 
+        cmd = "export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\"; " % EndToEndLatencyService.LOG4J_CONFIG
         if node.version >= V_0_9_0_0:
-            """
-            val brokerList = args(0)
-            val topic = args(1)
-            val numMessages = args(2).toInt
-            val producerAcks = args(3)
-            val messageLen = args(4).toInt
-            """
-
-            cmd = "KAFKA_OPTS=%(kafka_opts)s /opt/%(kafka_dir)s/bin/kafka-run-class.sh kafka.tools.EndToEndLatency " % args
-            cmd += "%(bootstrap_servers)s %(topic)s %(num_records)d %(acks)d %(message_bytes)d %(security_config_file)s" % args
+            cmd += "KAFKA_OPTS=%(kafka_opts)s /opt/%(kafka_dir)s/bin/kafka-run-class.sh kafka.tools.EndToEndLatency " % args
+            cmd += "%(bootstrap_servers)s %(topic)s %(num_records)d %(acks)d %(message_bytes)d %(config_file)s" % args
         else:
-            """
-            val brokerList = args(0)
-            val zkConnect = args(1)
-            val topic = args(2)
-            val numMessages = args(3).toInt
-            val consumerFetchMaxWait = args(4).toInt
-            val producerAcks = args(5).toInt
-            """
-
             # Set fetch max wait to 0 to match behavior in later versions
-            cmd = "KAFKA_OPTS=%(kafka_opts)s /opt/%(kafka_dir)s/bin/kafka-run-class.sh kafka.tools.TestEndToEndLatency " % args
+            cmd += "KAFKA_OPTS=%(kafka_opts)s /opt/%(kafka_dir)s/bin/kafka-run-class.sh kafka.tools.TestEndToEndLatency " % args
             cmd += "%(bootstrap_servers)s %(zk_connect)s %(topic)s %(num_records)d 0 %(acks)d" % args
 
-        cmd += " | tee /mnt/end-to-end-latency.log"
+        cmd += " 2>> %(stderr)s | tee -a %(stdout)s" % {'stdout': EndToEndLatencyService.STDOUT_CAPTURE,
+                                                        'stderr': EndToEndLatencyService.STDERR_CAPTURE}
 
         return cmd
 
     def _worker(self, idx, node):
+        node.account.ssh("mkdir -p %s" % EndToEndLatencyService.PERSISTENT_ROOT, allow_fail=False)
+
+        log_config = self.render('tools_log4j.properties', log_file=EndToEndLatencyService.LOG_FILE)
+
+        node.account.create_file(EndToEndLatencyService.LOG4J_CONFIG, log_config)
+        client_config = str(self.security_config)
+        if node.version >= V_0_9_0_0:
+            client_config += "compression_type=%(compression_type)s" % self.args
+        node.account.create_file(EndToEndLatencyService.CONFIG_FILE, client_config)
+        
         self.security_config.setup_node(node)
-        if self.security_config.security_protocol != SecurityConfig.PLAINTEXT:
-            node.account.create_file(self.security_config_file, str(self.security_config))
 
         cmd = self.start_cmd(node)
         self.logger.debug("End-to-end latency %d command: %s", idx, cmd)
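
The new redirection, `2>> stderr_file | tee -a stdout_file`, keeps the two
streams apart: errors accumulate in the stderr capture, while stdout is both
appended to a file and echoed so the service can parse latency results from
the live output. A rough Python equivalent of that plumbing (the command and
file names are placeholders):

    import subprocess

    with open("end_to_end_latency.stdout", "ab") as out, \
            open("end_to_end_latency.stderr", "ab") as err:
        proc = subprocess.Popen(["echo", "example latency output"],
                                stdout=subprocess.PIPE, stderr=err)
        for line in proc.stdout:
            out.write(line)               # capture, like tee -a
            print(line.decode(), end="")  # echo, like tee
        proc.wait()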

http://git-wip-us.apache.org/repos/asf/kafka/blob/a5f1158c/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java b/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java
index 18daf09..b83227f 100644
--- a/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java
+++ b/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java
@@ -17,6 +17,7 @@ import static net.sourceforge.argparse4j.impl.Arguments.store;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Properties;
+import java.util.Random;
 
 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.KafkaProducer;
@@ -59,8 +60,10 @@ public class ProducerPerformance {
 
             /* setup perf test */
             byte[] payload = new byte[recordSize];
-            Arrays.fill(payload, (byte) 1);
-            ProducerRecord&lt;byte[], byte[]&gt; record = new ProducerRecord&lt;byte[], byte[]&gt;(topicName, payload);
+            Random random = new Random(0);
+            for (int i = 0; i < payload.length; ++i)
+                payload[i] = (byte) (random.nextInt(26) + 65);
+            ProducerRecord&lt;byte[], byte[]&gt; record = new ProducerRecord&lt;&gt;(topicName, payload);
             Stats stats = new Stats(numRecords, 5000);
             long startMs = System.currentTimeMillis();
 
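
One reason the commit message defers further improvement to KAFKA-3554: the
payload is still generated once and reused for every record, so a batch of
identical records remains highly compressible even with random content. A
rough demonstration, using zlib as a stand-in codec:

    import random
    import zlib

    rnd = random.Random(0)
    payload = bytes(rnd.randrange(26) + 65 for _ in range(100))

    batch_same = payload * 100                  # one payload reused per record
    batch_distinct = b"".join(bytes(rnd.randrange(26) + 65 for _ in range(100))
                              for _ in range(100))

    print(len(zlib.compress(batch_same)))      # small: identical records
    print(len(zlib.compress(batch_distinct)))  # larger: per-record randomness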
