Repository: kafka Updated Branches: refs/heads/0.11.0 8ea4a2826 -> c89c6b873
MINOR: improve flaky Streams tests Use TestUtil test directory for state directory instead of default /tmp/kafka-streams Author: Matthias J. Sax <[email protected]> Reviewers: Damian Guy <[email protected]> Closes #4246 from mjsax/improve-flaky-streams-tests Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c89c6b87 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c89c6b87 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c89c6b87 Branch: refs/heads/0.11.0 Commit: c89c6b87365c8a8482e1ddac23079af7f9faff0c Parents: 8ea4a28 Author: Matthias J. Sax <[email protected]> Authored: Wed Nov 22 10:55:42 2017 +0000 Committer: Damian Guy <[email protected]> Committed: Wed Nov 22 10:55:42 2017 +0000 ---------------------------------------------------------------------- .../apache/kafka/streams/KafkaStreamsTest.java | 81 ++++---------------- .../integration/FanoutIntegrationTest.java | 2 + 2 files changed, 16 insertions(+), 67 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/c89c6b87/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java index 8eea60c..064d7b8 100644 --- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java @@ -16,7 +16,6 @@ */ package org.apache.kafka.streams; -import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.Metric; import org.apache.kafka.common.MetricName; import org.apache.kafka.common.config.ConfigException; @@ -63,6 +62,7 @@ public class KafkaStreamsTest { // quick enough) @ClassRule public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS); + private final KStreamBuilder builder = new KStreamBuilder(); private KafkaStreams streams; private Properties props; @@ -80,9 +80,6 @@ public class KafkaStreamsTest { @Test public void testStateChanges() throws Exception { - final KStreamBuilder builder = new KStreamBuilder(); - final KafkaStreams streams = new KafkaStreams(builder, props); - StateListenerStub stateListener = new StateListenerStub(); streams.setStateListener(stateListener); Assert.assertEquals(streams.state(), KafkaStreams.State.CREATED); @@ -101,9 +98,6 @@ public class KafkaStreamsTest { @Test public void testStateCloseAfterCreate() throws Exception { - final KStreamBuilder builder = new KStreamBuilder(); - final KafkaStreams streams = new KafkaStreams(builder, props); - StateListenerStub stateListener = new StateListenerStub(); streams.setStateListener(stateListener); streams.close(); @@ -159,25 +153,20 @@ public class KafkaStreamsTest { @Test public void testStateThreadClose() throws Exception { - final int numThreads = 2; - final KStreamBuilder builder = new KStreamBuilder(); // make sure we have the global state thread running too builder.globalTable("anyTopic"); - props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numThreads); - final KafkaStreams streams = new KafkaStreams(builder, props); - testStateThreadCloseHelper(numThreads); + streams = new KafkaStreams(new KStreamBuilder(), props); + + testStateThreadCloseHelper(NUM_THREADS); } @Test public void testStateGlobalThreadClose() throws Exception { - final int numThreads = 2; final KStreamBuilder builder = new KStreamBuilder(); // make sure we have the global state thread running too builder.globalTable("anyTopic"); - props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numThreads); - final KafkaStreams streams = new KafkaStreams(builder, props); - + streams = new KafkaStreams(builder, props); streams.start(); TestUtils.waitForCondition(new TestCondition() { @@ -260,7 +249,8 @@ public class KafkaStreamsTest { @Test public void testNumberDefaultMetrics() { - final KafkaStreams streams = createKafkaStreams(); + props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1); + streams = new KafkaStreams(builder, props); final Map<MetricName, ? extends Metric> metrics = streams.metrics(); // all 15 default StreamThread metrics + 1 metric that keeps track of number of metrics assertEquals(metrics.size(), 16); @@ -268,11 +258,7 @@ public class KafkaStreamsTest { @Test public void testIllegalMetricsConfig() { - final Properties props = new Properties(); - props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "appId"); - props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); props.setProperty(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "illegalConfig"); - final KStreamBuilder builder = new KStreamBuilder(); try { new KafkaStreams(builder, props); @@ -282,17 +268,12 @@ public class KafkaStreamsTest { @Test public void testLegalMetricsConfig() { - final Properties props = new Properties(); - props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "appId"); - props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); props.setProperty(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, Sensor.RecordingLevel.INFO.toString()); - final KStreamBuilder builder1 = new KStreamBuilder(); - final KafkaStreams streams1 = new KafkaStreams(builder1, props); + final KafkaStreams streams1 = new KafkaStreams(builder, props); streams1.close(); props.setProperty(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, Sensor.RecordingLevel.DEBUG.toString()); - final KStreamBuilder builder2 = new KStreamBuilder(); - final KafkaStreams streams2 = new KafkaStreams(builder2, props); + new KafkaStreams(builder, props); } @@ -325,11 +306,6 @@ public class KafkaStreamsTest { public void shouldReturnFalseOnCloseWhenThreadsHaventTerminated() throws Exception { final AtomicBoolean keepRunning = new AtomicBoolean(true); try { - final Properties props = new Properties(); - props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "appId"); - props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); - props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - final KStreamBuilder builder = new KStreamBuilder(); final CountDownLatch latch = new CountDownLatch(1); final String topic = "input"; @@ -349,7 +325,8 @@ public class KafkaStreamsTest { } } }); - final KafkaStreams streams = new KafkaStreams(builder, props); + + streams = new KafkaStreams(builder, props); streams.start(); IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(topic, Collections.singletonList(new KeyValue<>("A", "A")), @@ -369,24 +346,8 @@ public class KafkaStreamsTest { } - private KafkaStreams createKafkaStreams() { - final Properties props = new Properties(); - props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "appId"); - props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); - - final KStreamBuilder builder = new KStreamBuilder(); - return new KafkaStreams(builder, props); - } - @Test public void testCleanup() throws Exception { - final Properties props = new Properties(); - props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "testLocalCleanup"); - props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); - - final KStreamBuilder builder = new KStreamBuilder(); - final KafkaStreams streams = new KafkaStreams(builder, props); - streams.cleanUp(); streams.start(); streams.close(); @@ -395,13 +356,6 @@ public class KafkaStreamsTest { @Test public void testCannotCleanupWhileRunning() throws Exception { - final Properties props = new Properties(); - props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "testCannotCleanupWhileRunning"); - props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); - - final KStreamBuilder builder = new KStreamBuilder(); - final KafkaStreams streams = new KafkaStreams(builder, props); - streams.start(); try { streams.cleanUp(); @@ -428,22 +382,15 @@ public class KafkaStreamsTest { @Test public void shouldCleanupOldStateDirs() throws InterruptedException { - final Properties props = new Properties(); - final String appId = "cleanupOldStateDirs"; - final String stateDir = TestUtils.tempDirectory().getPath(); - props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, appId); - props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); props.setProperty(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG, "1"); - props.setProperty(StreamsConfig.STATE_DIR_CONFIG, stateDir); - final String topic = "topic"; CLUSTER.createTopic(topic); - final KStreamBuilder builder = new KStreamBuilder(); + final KStreamBuilder builder = new KStreamBuilder(); builder.table(Serdes.String(), Serdes.String(), topic, topic); - final KafkaStreams streams = new KafkaStreams(builder, props); + streams = new KafkaStreams(builder, props); final CountDownLatch latch = new CountDownLatch(1); streams.setStateListener(new KafkaStreams.StateListener() { @Override @@ -453,7 +400,7 @@ public class KafkaStreamsTest { } } }); - final String appDir = stateDir + File.separator + appId; + final String appDir = props.getProperty(StreamsConfig.STATE_DIR_CONFIG) + File.separator + props.getProperty(StreamsConfig.APPLICATION_ID_CONFIG); final File oldTaskDir = new File(appDir, "10_1"); assertTrue(oldTaskDir.mkdirs()); try { http://git-wip-us.apache.org/repos/asf/kafka/blob/c89c6b87/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java index f1f09a4..5f3be85 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java @@ -32,6 +32,7 @@ import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KStreamBuilder; import org.apache.kafka.streams.kstream.ValueMapper; import org.apache.kafka.test.IntegrationTest; +import org.apache.kafka.test.TestUtils; import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.Test; @@ -101,6 +102,7 @@ public class FanoutIntegrationTest { streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "fanout-integration-test"); streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); + streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath()); streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, COMMIT_INTERVAL_MS); streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
