This is an automated email from the ASF dual-hosted git repository.
mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new bb3111f472f KAFKA-14580: Moving EndToEndLatency from core to tools
module (#13095)
bb3111f472f is described below
commit bb3111f472f5007e47c0a01b24a4d61f44550ab9
Author: vamossagar12 <[email protected]>
AuthorDate: Thu Mar 2 16:35:22 2023 +0530
KAFKA-14580: Moving EndToEndLatency from core to tools module (#13095)
Reviewers: Mickael Maison <[email protected]>, Federico Valeri
<[email protected]>, Ismael Juma <[email protected]>
---
checkstyle/import-control.xml | 1 +
.../main/scala/kafka/tools/EndToEndLatency.scala | 179 ----------------
tests/kafkatest/benchmarks/core/benchmark_test.py | 2 +-
.../services/performance/end_to_end_latency.py | 9 +-
.../org/apache/kafka/tools/EndToEndLatency.java | 224 +++++++++++++++++++++
.../apache/kafka/tools/EndToEndLatencyTest.java | 100 +++++++++
6 files changed, 333 insertions(+), 182 deletions(-)
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 71989a9daa4..7d078121b25 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -408,6 +408,7 @@
<subpackage name="tools">
<allow pkg="org.apache.kafka.common"/>
<allow pkg="org.apache.kafka.server.util" />
+ <allow pkg="org.apache.kafka.clients" />
<allow pkg="org.apache.kafka.clients.admin" />
<allow pkg="org.apache.kafka.clients.producer" />
<allow pkg="org.apache.kafka.clients.consumer" />
diff --git a/core/src/main/scala/kafka/tools/EndToEndLatency.scala
b/core/src/main/scala/kafka/tools/EndToEndLatency.scala
deleted file mode 100755
index cba1c301caa..00000000000
--- a/core/src/main/scala/kafka/tools/EndToEndLatency.scala
+++ /dev/null
@@ -1,179 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.tools
-
-import java.nio.charset.StandardCharsets
-import java.time.Duration
-import java.util.{Arrays, Collections, Properties}
-
-import kafka.utils.Exit
-import org.apache.kafka.clients.admin.{Admin, NewTopic}
-import org.apache.kafka.clients.CommonClientConfigs
-import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
-import org.apache.kafka.clients.producer._
-import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.utils.Utils
-
-import scala.jdk.CollectionConverters._
-import scala.util.Random
-
-
-/**
- * This class records the average end to end latency for a single message to
travel through Kafka
- *
- * broker_list = location of the bootstrap broker for both the producer and
the consumer
- * num_messages = # messages to send
- * producer_acks = See ProducerConfig.ACKS_DOC
- * message_size_bytes = size of each message in bytes
- *
- * e.g. [localhost:9092 test 10000 1 20]
- */
-
-object EndToEndLatency {
- private val timeout: Long = 60000
- private val defaultReplicationFactor: Short = 1
- private val defaultNumPartitions: Int = 1
-
- def main(args: Array[String]): Unit = {
- if (args.length != 5 && args.length != 6) {
- System.err.println("USAGE: java " + getClass.getName + " broker_list
topic num_messages producer_acks message_size_bytes [optional] properties_file")
- Exit.exit(1)
- }
-
- val brokerList = args(0)
- val topic = args(1)
- val numMessages = args(2).toInt
- val producerAcks = args(3)
- val messageLen = args(4).toInt
- val propsFile = if (args.length > 5) Some(args(5)).filter(_.nonEmpty) else
None
-
- if (!List("1", "all").contains(producerAcks))
- throw new IllegalArgumentException("Latency testing requires synchronous
acknowledgement. Please use 1 or all")
-
- def loadPropsWithBootstrapServers: Properties = {
- val props = propsFile.map(Utils.loadProps).getOrElse(new Properties())
- props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, brokerList)
- props
- }
-
- val consumerProps = loadPropsWithBootstrapServers
- consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group-" +
System.currentTimeMillis())
- consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
- consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
- consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.ByteArrayDeserializer")
- consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.ByteArrayDeserializer")
- consumerProps.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "0") //ensure
we have no temporal batching
- val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](consumerProps)
-
- val producerProps = loadPropsWithBootstrapServers
- producerProps.put(ProducerConfig.LINGER_MS_CONFIG, "0") //ensure writes
are synchronous
- producerProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG,
Long.MaxValue.toString)
- producerProps.put(ProducerConfig.ACKS_CONFIG, producerAcks)
- producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.ByteArraySerializer")
- producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.ByteArraySerializer")
- val producer = new KafkaProducer[Array[Byte], Array[Byte]](producerProps)
-
- def finalise(): Unit = {
- consumer.commitSync()
- producer.close()
- consumer.close()
- }
-
- // create topic if it does not exist
- if (!consumer.listTopics().containsKey(topic)) {
- try {
- createTopic(topic, loadPropsWithBootstrapServers)
- } catch {
- case t: Throwable =>
- finalise()
- throw new RuntimeException(s"Failed to create topic $topic", t)
- }
- }
-
- val topicPartitions = consumer.partitionsFor(topic).asScala
- .map(p => new TopicPartition(p.topic(), p.partition())).asJava
- consumer.assign(topicPartitions)
- consumer.seekToEnd(topicPartitions)
- consumer.assignment.forEach(consumer.position(_))
-
- var totalTime = 0.0
- val latencies = new Array[Long](numMessages)
- val random = new Random(0)
-
- for (i <- 0 until numMessages) {
- val message = randomBytesOfLen(random, messageLen)
- val begin = System.nanoTime
-
- //Send message (of random bytes) synchronously then immediately poll for
it
- producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic,
message)).get()
- val recordIter = consumer.poll(Duration.ofMillis(timeout)).iterator
-
- val elapsed = System.nanoTime - begin
-
- //Check we got results
- if (!recordIter.hasNext) {
- finalise()
- throw new RuntimeException(s"poll() timed out before finding a result
(timeout:[$timeout])")
- }
-
- //Check result matches the original record
- val sent = new String(message, StandardCharsets.UTF_8)
- val read = new String(recordIter.next().value(), StandardCharsets.UTF_8)
- if (!read.equals(sent)) {
- finalise()
- throw new RuntimeException(s"The message read [$read] did not match
the message sent [$sent]")
- }
-
- //Check we only got the one message
- if (recordIter.hasNext) {
- val count = 1 + recordIter.asScala.size
- throw new RuntimeException(s"Only one result was expected during this
test. We found [$count]")
- }
-
- //Report progress
- if (i % 1000 == 0)
- println(i.toString + "\t" + elapsed / 1000.0 / 1000.0)
- totalTime += elapsed
- latencies(i) = elapsed / 1000 / 1000
- }
-
- //Results
- println("Avg latency: %.4f ms\n".format(totalTime / numMessages / 1000.0 /
1000.0))
- Arrays.sort(latencies)
- val p50 = latencies((latencies.length * 0.5).toInt)
- val p99 = latencies((latencies.length * 0.99).toInt)
- val p999 = latencies((latencies.length * 0.999).toInt)
- println("Percentiles: 50th = %d, 99th = %d, 99.9th = %d".format(p50, p99,
p999))
-
- finalise()
- }
-
- def randomBytesOfLen(random: Random, len: Int): Array[Byte] = {
- Array.fill(len)((random.nextInt(26) + 65).toByte)
- }
-
- def createTopic(topic: String, props: Properties): Unit = {
- println("Topic \"%s\" does not exist. Will create topic with %d
partition(s) and replication factor = %d"
- .format(topic, defaultNumPartitions, defaultReplicationFactor))
-
- val adminClient = Admin.create(props)
- val newTopic = new NewTopic(topic, defaultNumPartitions,
defaultReplicationFactor)
- try adminClient.createTopics(Collections.singleton(newTopic)).all().get()
- finally Utils.closeQuietly(adminClient, "AdminClient")
- }
-}
diff --git a/tests/kafkatest/benchmarks/core/benchmark_test.py
b/tests/kafkatest/benchmarks/core/benchmark_test.py
index 5e7bddeb5d1..959f23a366c 100644
--- a/tests/kafkatest/benchmarks/core/benchmark_test.py
+++ b/tests/kafkatest/benchmarks/core/benchmark_test.py
@@ -173,7 +173,7 @@ class Benchmark(Test):
Return aggregate latency statistics.
- (Under the hood, this simply runs EndToEndLatency.scala)
+ (Under the hood, this simply runs EndToEndLatency.java)
"""
client_version = KafkaVersion(client_version)
broker_version = KafkaVersion(broker_version)
diff --git a/tests/kafkatest/services/performance/end_to_end_latency.py
b/tests/kafkatest/services/performance/end_to_end_latency.py
index 3cde3ef1a5d..a35d2e14274 100644
--- a/tests/kafkatest/services/performance/end_to_end_latency.py
+++ b/tests/kafkatest/services/performance/end_to_end_latency.py
@@ -17,7 +17,7 @@ import os
from kafkatest.services.performance import PerformanceService
from kafkatest.services.security.security_config import SecurityConfig
-from kafkatest.version import DEV_BRANCH
+from kafkatest.version import get_version, V_3_4_0, DEV_BRANCH
@@ -50,6 +50,7 @@ class EndToEndLatencyService(PerformanceService):
root=EndToEndLatencyService.PERSISTENT_ROOT)
self.kafka = kafka
self.security_config = kafka.security_config.client_config()
+ self.version = ''
security_protocol = self.security_config.security_protocol
@@ -73,6 +74,7 @@ class EndToEndLatencyService(PerformanceService):
def start_cmd(self, node):
args = self.args.copy()
+ self.version = get_version(node)
args.update({
'bootstrap_servers':
self.kafka.bootstrap_servers(self.security_config.security_protocol),
'config_file': EndToEndLatencyService.CONFIG_FILE,
@@ -124,4 +126,7 @@ class EndToEndLatencyService(PerformanceService):
self.results[idx-1] = results
def java_class_name(self):
- return "kafka.tools.EndToEndLatency"
+ if self.version <= V_3_4_0:
+ return "kafka.tools.EndToEndLatency"
+ else:
+ return "org.apache.kafka.tools.EndToEndLatency"
diff --git a/tools/src/main/java/org/apache/kafka/tools/EndToEndLatency.java
b/tools/src/main/java/org/apache/kafka/tools/EndToEndLatency.java
new file mode 100644
index 00000000000..cb6af45cabd
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/tools/EndToEndLatency.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Properties;
+import java.util.List;
+import java.util.Random;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+/**
+ * This class records the average end to end latency for a single message to
travel through Kafka.
+ * Following are the required arguments
+ * <p> broker_list = location of the bootstrap broker for both the producer
and the consumer </p>
+ * <p> topic = topic name used by both the producer and the consumer to
send/receive messages </p>
+ * <p> num_messages = # messages to send </p>
+ * <p> producer_acks = See ProducerConfig.ACKS_DOC </p>
+ * <p> message_size_bytes = size of each message in bytes </p>
+ *
+ * <p> e.g. [localhost:9092 test 10000 1 20] </p>
+ */
+public class EndToEndLatency {
+ private final static long POLL_TIMEOUT_MS = 60000;
+ private final static short DEFAULT_REPLICATION_FACTOR = 1;
+ private final static int DEFAULT_NUM_PARTITIONS = 1;
+
+ public static void main(String... args) {
+ Exit.exit(mainNoExit(args));
+ }
+
+ static int mainNoExit(String... args) {
+ try {
+ execute(args);
+ return 0;
+ } catch (TerseException e) {
+ System.err.println(e.getMessage());
+ return 1;
+ } catch (Throwable e) {
+ System.err.println(e.getMessage());
+ System.err.println(Utils.stackTrace(e));
+ return 1;
+ }
+ }
+
+ // Visible for testing
+ static void execute(String... args) throws Exception {
+ if (args.length != 5 && args.length != 6) {
+ throw new TerseException("USAGE: java " +
EndToEndLatency.class.getName()
+ + " broker_list topic num_messages producer_acks
message_size_bytes [optional] properties_file");
+ }
+
+ String brokers = args[0];
+ String topic = args[1];
+ int numMessages = Integer.parseInt(args[2]);
+ String acks = args[3];
+ int messageSizeBytes = Integer.parseInt(args[4]);
+ Optional<String> propertiesFile = (args.length > 5 &&
!Utils.isBlank(args[5])) ? Optional.of(args[5]) : Optional.empty();
+
+ if (!Arrays.asList("1", "all").contains(acks)) {
+ throw new IllegalArgumentException("Latency testing requires
synchronous acknowledgement. Please use 1 or all");
+ }
+
+ try (KafkaConsumer<byte[], byte[]> consumer =
createKafkaConsumer(propertiesFile, brokers);
+ KafkaProducer<byte[], byte[]> producer =
createKafkaProducer(propertiesFile, brokers, acks)) {
+
+ if (!consumer.listTopics().containsKey(topic)) {
+ createTopic(propertiesFile, brokers, topic);
+ }
+ setupConsumer(topic, consumer);
+ double totalTime = 0.0;
+ long[] latencies = new long[numMessages];
+ Random random = new Random(0);
+
+ for (int i = 0; i < numMessages; i++) {
+ byte[] message = randomBytesOfLen(random, messageSizeBytes);
+ long begin = System.nanoTime();
+ //Send message (of random bytes) synchronously then
immediately poll for it
+ producer.send(new ProducerRecord<>(topic, message)).get();
+ ConsumerRecords<byte[], byte[]> records =
consumer.poll(Duration.ofMillis(POLL_TIMEOUT_MS));
+ long elapsed = System.nanoTime() - begin;
+
+ validate(consumer, message, records);
+
+ //Report progress
+ if (i % 1000 == 0)
+ System.out.println(i + "\t" + elapsed / 1000.0 / 1000.0);
+ totalTime += elapsed;
+ latencies[i] = elapsed / 1000 / 1000;
+ }
+
+ printResults(numMessages, totalTime, latencies);
+ consumer.commitSync();
+ }
+ }
+
+ // Visible for testing
+ static void validate(KafkaConsumer<byte[], byte[]> consumer, byte[]
message, ConsumerRecords<byte[], byte[]> records) {
+ if (records.isEmpty()) {
+ consumer.commitSync();
+ throw new RuntimeException("poll() timed out before finding a
result (timeout:[" + POLL_TIMEOUT_MS + "])");
+ }
+
+ //Check result matches the original record
+ String sent = new String(message, StandardCharsets.UTF_8);
+ String read = new String(records.iterator().next().value(),
StandardCharsets.UTF_8);
+
+ if (!read.equals(sent)) {
+ consumer.commitSync();
+ throw new RuntimeException("The message read [" + read + "] did
not match the message sent [" + sent + "]");
+ }
+
+ //Check we only got the one message
+ if (records.count() != 1) {
+ int count = records.count();
+ consumer.commitSync();
+ throw new RuntimeException("Only one result was expected during
this test. We found [" + count + "]");
+ }
+ }
+
+ private static void setupConsumer(String topic, KafkaConsumer<byte[],
byte[]> consumer) {
+ List<TopicPartition> topicPartitions = consumer
+ .partitionsFor(topic)
+ .stream()
+ .map(p -> new TopicPartition(p.topic(), p.partition()))
+ .collect(Collectors.toList());
+ consumer.assign(topicPartitions);
+ consumer.seekToEnd(topicPartitions);
+ consumer.assignment().forEach(consumer::position);
+ }
+
+ private static void printResults(int numMessages, double totalTime, long[]
latencies) {
+ System.out.printf("Avg latency: %.4f ms%n", totalTime / numMessages /
1000.0 / 1000.0);
+ Arrays.sort(latencies);
+ int p50 = (int) latencies[(int) (latencies.length * 0.5)];
+ int p99 = (int) latencies[(int) (latencies.length * 0.99)];
+ int p999 = (int) latencies[(int) (latencies.length * 0.999)];
+ System.out.printf("Percentiles: 50th = %d, 99th = %d, 99.9th = %d%n",
p50, p99, p999);
+ }
+
+ private static byte[] randomBytesOfLen(Random random, int length) {
+ byte[] randomBytes = new byte[length];
+ Arrays.fill(randomBytes, Integer.valueOf(random.nextInt(26) +
65).byteValue());
+ return randomBytes;
+ }
+
+ private static void createTopic(Optional<String> propertiesFile, String
brokers, String topic) throws IOException {
+ System.out.printf("Topic \"%s\" does not exist. "
+ + "Will create topic with %d partition(s) and
replication factor = %d%n",
+ topic, DEFAULT_NUM_PARTITIONS, DEFAULT_REPLICATION_FACTOR);
+
+ Properties adminProps = loadPropsWithBootstrapServers(propertiesFile,
brokers);
+ Admin adminClient = Admin.create(adminProps);
+ NewTopic newTopic = new NewTopic(topic, DEFAULT_NUM_PARTITIONS,
DEFAULT_REPLICATION_FACTOR);
+ try {
+
adminClient.createTopics(Collections.singleton(newTopic)).all().get();
+ } catch (ExecutionException | InterruptedException e) {
+ System.out.printf("Creation of topic %s failed%n", topic);
+ throw new RuntimeException(e);
+ } finally {
+ Utils.closeQuietly(adminClient, "AdminClient");
+ }
+ }
+
+ private static Properties loadPropsWithBootstrapServers(Optional<String>
propertiesFile, String brokers) throws IOException {
+ Properties properties = propertiesFile.isPresent() ?
Utils.loadProps(propertiesFile.get()) : new Properties();
+ properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, brokers);
+ return properties;
+ }
+
+ private static KafkaConsumer<byte[], byte[]>
createKafkaConsumer(Optional<String> propsFile, String brokers) throws
IOException {
+ Properties consumerProps = loadPropsWithBootstrapServers(propsFile,
brokers);
+ consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group-" +
System.currentTimeMillis());
+ consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+ consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
+ consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.ByteArrayDeserializer");
+ consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.ByteArrayDeserializer");
+ consumerProps.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "0");
//ensure we have no temporal batching
+ return new KafkaConsumer<>(consumerProps);
+ }
+
+ private static KafkaProducer<byte[], byte[]>
createKafkaProducer(Optional<String> propsFile, String brokers, String acks)
throws IOException {
+ Properties producerProps = loadPropsWithBootstrapServers(propsFile,
brokers);
+ producerProps.put(ProducerConfig.LINGER_MS_CONFIG, "0"); //ensure
writes are synchronous
+ producerProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, Long.MAX_VALUE);
+ producerProps.put(ProducerConfig.ACKS_CONFIG, acks);
+ producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.ByteArraySerializer");
+ producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.ByteArraySerializer");
+ return new KafkaProducer<>(producerProps);
+ }
+}
diff --git
a/tools/src/test/java/org/apache/kafka/tools/EndToEndLatencyTest.java
b/tools/src/test/java/org/apache/kafka/tools/EndToEndLatencyTest.java
new file mode 100644
index 00000000000..9496770a822
--- /dev/null
+++ b/tools/src/test/java/org/apache/kafka/tools/EndToEndLatencyTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Iterator;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(MockitoExtension.class)
+public class EndToEndLatencyTest {
+
+ @Mock
+ KafkaConsumer<byte[], byte[]> consumer;
+
+ @Mock
+ ConsumerRecords<byte[], byte[]> records;
+
+ @Test
+ public void shouldFailWhenSuppliedUnexpectedArgs() {
+ String[] args = new String[] {"localhost:9092", "test", "10000", "1",
"200", "propsfile.properties", "random"};
+ assertThrows(TerseException.class, () ->
EndToEndLatency.execute(args));
+ }
+
+ @Test
+ public void shouldFailWhenProducerAcksAreNotSynchronised() {
+ String[] args = new String[] {"localhost:9092", "test", "10000", "0",
"200"};
+ assertThrows(IllegalArgumentException.class, () ->
EndToEndLatency.execute(args));
+ }
+
+ @Test
+ public void shouldFailWhenConsumerRecordsIsEmpty() {
+ when(records.isEmpty()).thenReturn(true);
+ assertThrows(RuntimeException.class, () ->
EndToEndLatency.validate(consumer, new byte[0], records));
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void shouldFailWhenSentIsNotEqualToReceived() {
+ Iterator<ConsumerRecord<byte[], byte[]>> iterator =
mock(Iterator.class);
+ ConsumerRecord<byte[], byte[]> record = mock(ConsumerRecord.class);
+ when(records.isEmpty()).thenReturn(false);
+ when(records.iterator()).thenReturn(iterator);
+ when(iterator.next()).thenReturn(record);
+
when(record.value()).thenReturn("kafkab".getBytes(StandardCharsets.UTF_8));
+ assertThrows(RuntimeException.class, () ->
EndToEndLatency.validate(consumer, "kafkaa".getBytes(StandardCharsets.UTF_8),
records));
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void shouldFailWhenReceivedMoreThanOneRecord() {
+ Iterator<ConsumerRecord<byte[], byte[]>> iterator =
mock(Iterator.class);
+ ConsumerRecord<byte[], byte[]> record = mock(ConsumerRecord.class);
+ when(records.isEmpty()).thenReturn(false);
+ when(records.iterator()).thenReturn(iterator);
+ when(iterator.next()).thenReturn(record);
+
when(record.value()).thenReturn("kafkaa".getBytes(StandardCharsets.UTF_8));
+ when(records.count()).thenReturn(2);
+ assertThrows(RuntimeException.class, () ->
EndToEndLatency.validate(consumer, "kafkaa".getBytes(StandardCharsets.UTF_8),
records));
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void shouldPassInValidation() {
+ Iterator<ConsumerRecord<byte[], byte[]>> iterator =
mock(Iterator.class);
+ ConsumerRecord<byte[], byte[]> record = mock(ConsumerRecord.class);
+ when(records.isEmpty()).thenReturn(false);
+ when(records.iterator()).thenReturn(iterator);
+ when(iterator.next()).thenReturn(record);
+
when(record.value()).thenReturn("kafkaa".getBytes(StandardCharsets.UTF_8));
+ when(records.count()).thenReturn(1);
+ assertDoesNotThrow(() -> EndToEndLatency.validate(consumer,
"kafkaa".getBytes(StandardCharsets.UTF_8), records));
+ }
+
+}