This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch 2.5 in repository https://gitbox.apache.org/repos/asf/kafka.git
commit c853929c3a7c57f2292e676efe4286e34ddeea10 Author: Boyang Chen <boy...@confluent.io> AuthorDate: Thu Feb 6 10:58:05 2020 -0800 HOTFIX: Fix spotsbug failure in Kafka examples (#8051) Reviewers: Jason Gustafson <ja...@confluent.io> --- .../kafka/examples/KafkaConsumerProducerDemo.java | 8 +++++++- .../java/kafka/examples/KafkaExactlyOnceDemo.java | 20 ++++++++++++++------ 2 files changed, 21 insertions(+), 7 deletions(-) diff --git a/examples/src/main/java/kafka/examples/KafkaConsumerProducerDemo.java b/examples/src/main/java/kafka/examples/KafkaConsumerProducerDemo.java index 561732b..21d85c3 100644 --- a/examples/src/main/java/kafka/examples/KafkaConsumerProducerDemo.java +++ b/examples/src/main/java/kafka/examples/KafkaConsumerProducerDemo.java @@ -16,6 +16,8 @@ */ package kafka.examples; +import org.apache.kafka.common.errors.TimeoutException; + import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -28,7 +30,11 @@ public class KafkaConsumerProducerDemo { Consumer consumerThread = new Consumer(KafkaProperties.TOPIC, "DemoConsumer", false, 10000, latch); consumerThread.start(); - latch.await(5, TimeUnit.MINUTES); + + if (!latch.await(5, TimeUnit.MINUTES)) { + throw new TimeoutException("Timeout after 5 minutes waiting for demo producer and consumer to finish"); + } + consumerThread.shutdown(); System.out.println("All finished!"); } diff --git a/examples/src/main/java/kafka/examples/KafkaExactlyOnceDemo.java b/examples/src/main/java/kafka/examples/KafkaExactlyOnceDemo.java index d418eba..288b786 100644 --- a/examples/src/main/java/kafka/examples/KafkaExactlyOnceDemo.java +++ b/examples/src/main/java/kafka/examples/KafkaExactlyOnceDemo.java @@ -19,6 +19,7 @@ package kafka.examples; import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.errors.TopicExistsException; import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; @@ -77,9 +78,9 @@ public class KafkaExactlyOnceDemo { } String mode = args[0]; - int numPartitions = Integer.valueOf(args[1]); - int numInstances = Integer.valueOf(args[2]); - int numRecords = Integer.valueOf(args[3]); + int numPartitions = Integer.parseInt(args[1]); + int numInstances = Integer.parseInt(args[2]); + int numRecords = Integer.parseInt(args[3]); /* Stage 1: topic cleanup and recreation */ recreateTopics(numPartitions); @@ -90,7 +91,9 @@ public class KafkaExactlyOnceDemo { Producer producerThread = new Producer(INPUT_TOPIC, false, null, true, numRecords, prePopulateLatch); producerThread.start(); - prePopulateLatch.await(5, TimeUnit.MINUTES); + if (!prePopulateLatch.await(5, TimeUnit.MINUTES)) { + throw new TimeoutException("Timeout after 5 minutes waiting for data pre-population"); + } CountDownLatch transactionalCopyLatch = new CountDownLatch(numInstances); @@ -102,7 +105,9 @@ public class KafkaExactlyOnceDemo { messageProcessor.start(); } - transactionalCopyLatch.await(5, TimeUnit.MINUTES); + if (!transactionalCopyLatch.await(5, TimeUnit.MINUTES)) { + throw new TimeoutException("Timeout after 5 minutes waiting for transactionally message copy"); + } CountDownLatch consumeLatch = new CountDownLatch(1); @@ -110,7 +115,10 @@ public class KafkaExactlyOnceDemo { Consumer consumerThread = new Consumer(OUTPUT_TOPIC, "Verify-consumer", true, numRecords, consumeLatch); consumerThread.start(); - consumeLatch.await(5, TimeUnit.MINUTES); + if (!consumeLatch.await(5, TimeUnit.MINUTES)) { + throw new TimeoutException("Timeout after 5 minutes waiting for output data consumption"); + } + consumerThread.shutdown(); System.out.println("All finished!"); }