Repository: kafka Updated Branches: refs/heads/trunk 14a3d69d9 -> 816578b5c
KAFKA-4222; QueryableIntegrationTest.queryOnRebalance transient failure Don't produce messages on a separate thread continuosly. Just produce one of each value and stop. Close the producer once finished. Author: Damian Guy <[email protected]> Reviewers: Guozhang Wang <[email protected]>, Ismael Juma <[email protected]> Closes #3080 from dguy/qs-test Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/816578b5 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/816578b5 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/816578b5 Branch: refs/heads/trunk Commit: 816578b5c13d36d73af02d1c11787f789a69be3f Parents: 14a3d69 Author: Damian Guy <[email protected]> Authored: Thu May 18 11:28:53 2017 +0100 Committer: Ismael Juma <[email protected]> Committed: Thu May 18 11:28:53 2017 +0100 ---------------------------------------------------------------------- .../QueryableStateIntegrationTest.java | 27 ++++++++------------ 1 file changed, 11 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/816578b5/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java index 4b5ae17..f2d0427 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java @@ -329,19 +329,17 @@ public class QueryableStateIntegrationTest { final int numThreads = STREAM_TWO_PARTITIONS; final StreamRunnable[] streamRunnables = new StreamRunnable[numThreads]; final Thread[] streamThreads = new Thread[numThreads]; - final int numIterations = 500000; - // create concurrent producer - final ProducerRunnable producerRunnable = new ProducerRunnable(streamThree, inputValues, numIterations); - final Thread producerThread = new Thread(producerRunnable); + final ProducerRunnable producerRunnable = new ProducerRunnable(streamThree, inputValues, 1); + producerRunnable.run(); - // create three stream threads + + // create stream threads for (int i = 0; i < numThreads; i++) { streamRunnables[i] = new StreamRunnable(streamThree, outputTopicThree, i); streamThreads[i] = new Thread(streamRunnables[i]); streamThreads[i].start(); } - producerThread.start(); try { waitUntilAtLeastNumRecordProcessed(outputTopicThree, 1); @@ -375,9 +373,6 @@ public class QueryableStateIntegrationTest { streamThreads[i].join(); } } - producerRunnable.shutdown(); - producerThread.interrupt(); - producerThread.join(); } } @@ -913,15 +908,15 @@ public class QueryableStateIntegrationTest { producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); - final KafkaProducer<String, String> - producer = - new KafkaProducer<>(producerConfig, new StringSerializer(), new StringSerializer()); + try (final KafkaProducer<String, String> producer = + new KafkaProducer<>(producerConfig, new StringSerializer(), new StringSerializer())) { - while (getCurrIteration() < numIterations && !shutdown) { - for (int i = 0; i < inputValues.size(); i++) { - producer.send(new ProducerRecord<String, String>(topic, inputValues.get(i))); + while (getCurrIteration() < numIterations && !shutdown) { + for (int i = 0; i < inputValues.size(); i++) { + producer.send(new ProducerRecord<String, String>(topic, inputValues.get(i))); + } + incrementInteration(); } - incrementInteration(); } } }
