Repository: kafka Updated Branches: refs/heads/trunk b982eefd3 -> 7371bf7f6
KAFKA-5063: Fix flaky o.a.k.streams.integration.ResetIntegrationTest Author: Matthias J. Sax <[email protected]> Reviewers: Damian Guy, Eno Thereska, Guozhang Wang Closes #2931 from mjsax/kafka-5140-flaky-reset-integration-test Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/7371bf7f Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/7371bf7f Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/7371bf7f Branch: refs/heads/trunk Commit: 7371bf7f65d1245b084f17e534b5728d5929e207 Parents: b982eef Author: Matthias J. Sax <[email protected]> Authored: Tue May 9 14:35:57 2017 -0700 Committer: Guozhang Wang <[email protected]> Committed: Tue May 9 14:35:57 2017 -0700 ---------------------------------------------------------------------- .../integration/ResetIntegrationTest.java | 21 +++++++-------- .../integration/utils/EmbeddedKafkaCluster.java | 28 ++++++++++++++++---- 2 files changed, 33 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/7371bf7f/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java index 2b4f14c..775ac8d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java @@ -74,7 +74,10 @@ public class ResetIntegrationTest { // expiration of connections by the brokers to avoid errors when `AdminClient` sends requests after potentially // very long sleep times props.put(KafkaConfig$.MODULE$.ConnectionsMaxIdleMsProp(), -1L); - CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS, props); + // we align time to seconds to get clean window boundaries and thus ensure the same result for each run + // otherwise, input records could fall into different windows for different runs depending on the initial mock time + final long alignedTime = (System.currentTimeMillis() / 1000) * 1000; + CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS, props, alignedTime); } private static final String APP_ID = "cleanup-integration-test"; @@ -143,14 +146,13 @@ public class ResetIntegrationTest { final List<KeyValue<Long, Long>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived( resultTopicConsumerConfig, OUTPUT_TOPIC, - 10, - 60000); + 10); // receive only first values to make sure intermediate user topic is not consumed completely // => required to test "seekToEnd" for intermediate topics final List<KeyValue<Long, Long>> result2 = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived( resultTopicConsumerConfig, OUTPUT_TOPIC_2, - 10 + 40 ); streams.close(); @@ -177,12 +179,11 @@ public class ResetIntegrationTest { final List<KeyValue<Long, Long>> resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived( resultTopicConsumerConfig, OUTPUT_TOPIC, - 10, - 60000); + 10); final List<KeyValue<Long, Long>> resultRerun2 = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived( resultTopicConsumerConfig, OUTPUT_TOPIC_2_RERUN, - 10 + 40 ); streams.close(); @@ -229,8 +230,7 @@ public class ResetIntegrationTest { final List<KeyValue<Long, Long>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived( resultTopicConsumerConfig, OUTPUT_TOPIC, - 10, - 60000); + 10); streams.close(); TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT, @@ -250,8 +250,7 @@ public class ResetIntegrationTest { final List<KeyValue<Long, Long>> resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived( resultTopicConsumerConfig, OUTPUT_TOPIC, - 10, - 60000); + 10); streams.close(); assertThat(resultRerun, equalTo(result)); http://git-wip-us.apache.org/repos/asf/kafka/blob/7371bf7f/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java index 70d271c..6a0fc51 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java @@ -37,21 +37,36 @@ public class EmbeddedKafkaCluster extends ExternalResource { private static final Logger log = LoggerFactory.getLogger(EmbeddedKafkaCluster.class); private static final int DEFAULT_BROKER_PORT = 0; // 0 results in a random port being selected - public static final int TOPIC_CREATION_TIMEOUT = 30000; + private static final int TOPIC_CREATION_TIMEOUT = 30000; private EmbeddedZookeeper zookeeper = null; private final KafkaEmbedded[] brokers; private final Properties brokerConfig; + public final MockTime time; public EmbeddedKafkaCluster(final int numBrokers) { this(numBrokers, new Properties()); } - public EmbeddedKafkaCluster(final int numBrokers, final Properties brokerConfig) { + public EmbeddedKafkaCluster(final int numBrokers, + final Properties brokerConfig) { + this(numBrokers, brokerConfig, System.currentTimeMillis()); + } + + public EmbeddedKafkaCluster(final int numBrokers, + final Properties brokerConfig, + final long mockTimeMillisStart) { + this(numBrokers, brokerConfig, mockTimeMillisStart, System.nanoTime()); + } + + public EmbeddedKafkaCluster(final int numBrokers, + final Properties brokerConfig, + final long mockTimeMillisStart, + final long mockTimeNanoStart) { brokers = new KafkaEmbedded[numBrokers]; this.brokerConfig = brokerConfig; - } + time = new MockTime(mockTimeMillisStart, mockTimeNanoStart); - public final MockTime time = new MockTime(); + } /** * Creates and starts a Kafka cluster. @@ -82,8 +97,9 @@ public class EmbeddedKafkaCluster extends ExternalResource { } private void putIfAbsent(final Properties props, final String propertyKey, final Object propertyValue) { - if (!props.containsKey(propertyKey)) + if (!props.containsKey(propertyKey)) { brokerConfig.put(propertyKey, propertyValue); + } } /** @@ -115,10 +131,12 @@ public class EmbeddedKafkaCluster extends ExternalResource { return brokers[0].brokerList(); } + @Override protected void before() throws Throwable { start(); } + @Override protected void after() { stop(); }
