This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.4 by this push:
     new de202a6  KAFKA-8743: Flaky Test Repartition{WithMerge}OptimizingIntegrationTest (#7472)
de202a6 is described below

commit de202a669777becd4a79fcdf6c0597e5a12d2370
Author: A. Sophie Blee-Goldman <sop...@confluent.io>
AuthorDate: Thu Oct 10 16:23:18 2019 -0700

    KAFKA-8743: Flaky Test Repartition{WithMerge}OptimizingIntegrationTest (#7472)

    All four flavors of the repartition/optimization tests have been reported as flaky and failed in one place or another:
    * RepartitionOptimizingIntegrationTest.shouldSendCorrectRecords_OPTIMIZED
    * RepartitionOptimizingIntegrationTest.shouldSendCorrectRecords_NO_OPTIMIZATION
    * RepartitionWithMergeOptimizingIntegrationTest.shouldSendCorrectRecords_OPTIMIZED
    * RepartitionWithMergeOptimizingIntegrationTest.shouldSendCorrectRecords_NO_OPTIMIZATION

    They're pretty similar so it makes sense to knock them all out at once. This PR does three things:
    * Switch to in-memory stores wherever possible
    * Name all operators and update the Topology accordingly (not really a flaky test fix, but had to update the topology names anyway because of the IM stores so figured might as well)
    * Port to TopologyTestDriver -- this is the "real" fix, should make a big difference as these repartition tests required multiple roundtrips with the Kafka cluster (while using only the default timeout)

    Reviewers: Bill Bejeck <b...@confluent.io>, Guozhang Wang <wangg...@gmail.com>
---
 .../internals/RepartitionOptimizingTest.java}      | 548 +++++++++++----------
 .../RepartitionWithMergeOptimizingTest.java}       | 328 ++++++------
 .../tests/StreamsBrokerDownResilienceTest.java     |  15 +-
 .../tests/streams/streams_upgrade_test.py          |   2 +-
 4 files changed, 465 insertions(+), 428 deletions(-)

diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RepartitionOptimizingIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionOptimizingTest.java
similarity index 54%
rename from streams/src/test/java/org/apache/kafka/streams/integration/RepartitionOptimizingIntegrationTest.java
rename to streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionOptimizingTest.java
index bea32f2..8425ad7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/RepartitionOptimizingIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionOptimizingTest.java
@@ -15,40 +15,41 @@
  * limitations under the License.
*/ -package org.apache.kafka.streams.integration; +package org.apache.kafka.streams.processor.internals; - -import kafka.utils.MockTime; +import java.util.HashMap; +import java.util.Map; +import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.IntegerDeserializer; import org.apache.kafka.common.serialization.LongDeserializer; import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; -import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.TestInputTopic; +import org.apache.kafka.streams.TestOutputTopic; import org.apache.kafka.streams.Topology; -import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; -import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; +import org.apache.kafka.streams.TopologyTestDriver; import org.apache.kafka.streams.kstream.Aggregator; import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.Grouped; import org.apache.kafka.streams.kstream.Initializer; import org.apache.kafka.streams.kstream.JoinWindows; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.kstream.Named; import org.apache.kafka.streams.kstream.Produced; import org.apache.kafka.streams.kstream.Reducer; import org.apache.kafka.streams.kstream.StreamJoined; import org.apache.kafka.streams.processor.AbstractProcessor; -import org.apache.kafka.test.IntegrationTest; +import org.apache.kafka.streams.state.Stores; import org.apache.kafka.test.StreamsTestUtils; -import org.apache.kafka.test.TestUtils; import org.junit.After; import org.junit.Before; -import org.junit.ClassRule; import org.junit.Test; -import org.junit.experimental.categories.Category; import java.util.ArrayList; import java.util.Arrays; @@ -57,17 +58,19 @@ import java.util.Locale; import java.util.Properties; import java.util.regex.Matcher; import java.util.regex.Pattern; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import static java.time.Duration.ofDays; import static java.time.Duration.ofMillis; -import static java.time.Duration.ofSeconds; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; -@Category({IntegrationTest.class}) -public class RepartitionOptimizingIntegrationTest { +public class RepartitionOptimizingTest { + + private final Logger log = LoggerFactory.getLogger(RepartitionOptimizingTest.class); - private static final int NUM_BROKERS = 1; private static final String INPUT_TOPIC = "input"; private static final String COUNT_TOPIC = "outputTopic_0"; private static final String AGGREGATION_TOPIC = "outputTopic_1"; @@ -77,143 +80,168 @@ public class RepartitionOptimizingIntegrationTest { private static final int ONE_REPARTITION_TOPIC = 1; private static final int FOUR_REPARTITION_TOPICS = 4; + private final Serializer<String> stringSerializer = new StringSerializer(); + private final Deserializer<String> stringDeserializer = new StringDeserializer(); + private final Pattern repartitionTopicPattern = Pattern.compile("Sink: .*-repartition"); private Properties streamsConfiguration; + private 
TopologyTestDriver topologyTestDriver; + private final Initializer<Integer> initializer = () -> 0; + private final Aggregator<String, String, Integer> aggregator = (k, v, agg) -> agg + v.length(); + private final Reducer<String> reducer = (v1, v2) -> v1 + ":" + v2; - @ClassRule - public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS); - private final MockTime mockTime = CLUSTER.time; + private final List<String> processorValueCollector = new ArrayList<>(); + + private final List<KeyValue<String, Long>> expectedCountKeyValues = + Arrays.asList(KeyValue.pair("A", 3L), KeyValue.pair("B", 3L), KeyValue.pair("C", 3L)); + private final List<KeyValue<String, Integer>> expectedAggKeyValues = + Arrays.asList(KeyValue.pair("A", 9), KeyValue.pair("B", 9), KeyValue.pair("C", 9)); + private final List<KeyValue<String, String>> expectedReduceKeyValues = + Arrays.asList(KeyValue.pair("A", "foo:bar:baz"), KeyValue.pair("B", "foo:bar:baz"), KeyValue.pair("C", "foo:bar:baz")); + private final List<KeyValue<String, String>> expectedJoinKeyValues = + Arrays.asList(KeyValue.pair("A", "foo:3"), KeyValue.pair("A", "bar:3"), KeyValue.pair("A", "baz:3")); + private final List<String> expectedCollectedProcessorValues = + Arrays.asList("FOO", "BAR", "BAZ"); @Before - public void setUp() throws Exception { + public void setUp() { final Properties props = new Properties(); props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 1024 * 10); props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 5000); streamsConfiguration = StreamsTestUtils.getStreamsConfig( "maybe-optimized-test-app", - CLUSTER.bootstrapServers(), + "dummy-bootstrap-servers-config", Serdes.String().getClass().getName(), Serdes.String().getClass().getName(), props); - CLUSTER.createTopics(INPUT_TOPIC, - COUNT_TOPIC, - AGGREGATION_TOPIC, - REDUCE_TOPIC, - JOINED_TOPIC); - - IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration); + processorValueCollector.clear(); } @After - public void tearDown() throws Exception { - CLUSTER.deleteAllTopicsAndWait(30_000L); + public void tearDown() { + try { + topologyTestDriver.close(); + } catch (final RuntimeException e) { + log.warn("The following exception was thrown while trying to close the TopologyTestDriver (note that " + + "KAFKA-6647 causes this when running on Windows):", e); + } } @Test - public void shouldSendCorrectRecords_OPTIMIZED() throws Exception { - runIntegrationTest(StreamsConfig.OPTIMIZE, - ONE_REPARTITION_TOPIC); + public void shouldSendCorrectRecords_OPTIMIZED() { + runTest(StreamsConfig.OPTIMIZE, ONE_REPARTITION_TOPIC); } @Test - public void shouldSendCorrectResults_NO_OPTIMIZATION() throws Exception { - runIntegrationTest(StreamsConfig.NO_OPTIMIZATION, - FOUR_REPARTITION_TOPICS); + public void shouldSendCorrectResults_NO_OPTIMIZATION() { + runTest(StreamsConfig.NO_OPTIMIZATION, FOUR_REPARTITION_TOPICS); } - private void runIntegrationTest(final String optimizationConfig, - final int expectedNumberRepartitionTopics) throws Exception { - - final Initializer<Integer> initializer = () -> 0; - final Aggregator<String, String, Integer> aggregator = (k, v, agg) -> agg + v.length(); - - final Reducer<String> reducer = (v1, v2) -> v1 + ":" + v2; - - final List<String> processorValueCollector = new ArrayList<>(); + private void runTest(final String optimizationConfig, final int expectedNumberRepartitionTopics) { final StreamsBuilder builder = new StreamsBuilder(); - final KStream<String, String> sourceStream = builder.stream(INPUT_TOPIC, 
Consumed.with(Serdes.String(), Serdes.String())); - - final KStream<String, String> mappedStream = sourceStream.map((k, v) -> KeyValue.pair(k.toUpperCase(Locale.getDefault()), v)); - - mappedStream.filter((k, v) -> k.equals("B")).mapValues(v -> v.toUpperCase(Locale.getDefault())) - .process(() -> new SimpleProcessor(processorValueCollector)); - - final KStream<String, Long> countStream = mappedStream.groupByKey().count(Materialized.with(Serdes.String(), Serdes.Long())).toStream(); - - countStream.to(COUNT_TOPIC, Produced.with(Serdes.String(), Serdes.Long())); - - mappedStream.groupByKey().aggregate(initializer, - aggregator, - Materialized.with(Serdes.String(), Serdes.Integer())) - .toStream().to(AGGREGATION_TOPIC, Produced.with(Serdes.String(), Serdes.Integer())); + final KStream<String, String> sourceStream = + builder.stream(INPUT_TOPIC, Consumed.with(Serdes.String(), Serdes.String()).withName("sourceStream")); + + final KStream<String, String> mappedStream = sourceStream + .map((k, v) -> KeyValue.pair(k.toUpperCase(Locale.getDefault()), v), Named.as("source-map")); + + mappedStream + .filter((k, v) -> k.equals("B"), Named.as("process-filter")) + .mapValues(v -> v.toUpperCase(Locale.getDefault()), Named.as("process-mapValues")) + .process(() -> new SimpleProcessor(processorValueCollector), Named.as("process")); + + final KStream<String, Long> countStream = mappedStream + .groupByKey(Grouped.as("count-groupByKey")) + .count(Named.as("count"), Materialized.<String, Long>as(Stores.inMemoryKeyValueStore("count-store")) + .withKeySerde(Serdes.String()) + .withValueSerde(Serdes.Long())) + .toStream(Named.as("count-toStream")); + + countStream.to(COUNT_TOPIC, Produced.with(Serdes.String(), Serdes.Long()).withName("count-to")); + + mappedStream + .groupByKey(Grouped.as("aggregate-groupByKey")) + .aggregate(initializer, + aggregator, + Named.as("aggregate"), + Materialized.<String, Integer>as(Stores.inMemoryKeyValueStore("aggregate-store")) + .withKeySerde(Serdes.String()) + .withValueSerde(Serdes.Integer())) + .toStream(Named.as("aggregate-toStream")) + .to(AGGREGATION_TOPIC, Produced.with(Serdes.String(), Serdes.Integer()).withName("reduce-to")); // adding operators for case where the repartition node is further downstream - mappedStream.filter((k, v) -> true).peek((k, v) -> System.out.println(k + ":" + v)).groupByKey() - .reduce(reducer, Materialized.with(Serdes.String(), Serdes.String())) - .toStream().to(REDUCE_TOPIC, Produced.with(Serdes.String(), Serdes.String())); - - mappedStream.filter((k, v) -> k.equals("A")) + mappedStream + .filter((k, v) -> true, Named.as("reduce-filter")) + .peek((k, v) -> System.out.println(k + ":" + v), Named.as("reduce-peek")) + .groupByKey(Grouped.as("reduce-groupByKey")) + .reduce(reducer, + Named.as("reducer"), + Materialized.as(Stores.inMemoryKeyValueStore("reduce-store"))) + .toStream(Named.as("reduce-toStream")) + .to(REDUCE_TOPIC, Produced.with(Serdes.String(), Serdes.String())); + + mappedStream + .filter((k, v) -> k.equals("A"), Named.as("join-filter")) .join(countStream, (v1, v2) -> v1 + ":" + v2.toString(), JoinWindows.of(ofMillis(5000)), - StreamJoined.with(Serdes.String(), Serdes.String(), Serdes.Long())) - .to(JOINED_TOPIC); + StreamJoined.<String, String, Long>with(Stores.inMemoryWindowStore("join-store", ofDays(1), ofMillis(10000), true), + Stores.inMemoryWindowStore("other-join-store", ofDays(1), ofMillis(10000), true)) + .withName("join") + .withKeySerde(Serdes.String()) + .withValueSerde(Serdes.String()) + .withOtherValueSerde(Serdes.Long())) 
+ .to(JOINED_TOPIC, Produced.as("join-to")); streamsConfiguration.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION, optimizationConfig); + final Topology topology = builder.build(streamsConfiguration); - final Properties producerConfig = TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class); + topologyTestDriver = new TopologyTestDriver(topology, streamsConfiguration); - IntegrationTestUtils.produceKeyValuesSynchronously(INPUT_TOPIC, getKeyValues(), producerConfig, mockTime); + final TestInputTopic<String, String> inputTopicA = topologyTestDriver.createInputTopic(INPUT_TOPIC, stringSerializer, stringSerializer); + final TestOutputTopic<String, Long> countOutputTopic = topologyTestDriver.createOutputTopic(COUNT_TOPIC, stringDeserializer, new LongDeserializer()); + final TestOutputTopic<String, Integer> aggregationOutputTopic = topologyTestDriver.createOutputTopic(AGGREGATION_TOPIC, stringDeserializer, new IntegerDeserializer()); + final TestOutputTopic<String, String> reduceOutputTopic = topologyTestDriver.createOutputTopic(REDUCE_TOPIC, stringDeserializer, stringDeserializer); + final TestOutputTopic<String, String> joinedOutputTopic = topologyTestDriver.createOutputTopic(JOINED_TOPIC, stringDeserializer, stringDeserializer); - final Properties consumerConfig1 = TestUtils.consumerConfig(CLUSTER.bootstrapServers(), StringDeserializer.class, LongDeserializer.class); - final Properties consumerConfig2 = TestUtils.consumerConfig(CLUSTER.bootstrapServers(), StringDeserializer.class, IntegerDeserializer.class); - final Properties consumerConfig3 = TestUtils.consumerConfig(CLUSTER.bootstrapServers(), StringDeserializer.class, StringDeserializer.class); + inputTopicA.pipeKeyValueList(getKeyValues()); - final Topology topology = builder.build(streamsConfiguration); + // Verify the topology final String topologyString = topology.describe().toString(); - if (optimizationConfig.equals(StreamsConfig.OPTIMIZE)) { assertEquals(EXPECTED_OPTIMIZED_TOPOLOGY, topologyString); } else { assertEquals(EXPECTED_UNOPTIMIZED_TOPOLOGY, topologyString); } - - /* - confirming number of expected repartition topics here - */ + // Verify the number of repartition topics assertEquals(expectedNumberRepartitionTopics, getCountOfRepartitionTopicsFound(topologyString)); - final KafkaStreams streams = new KafkaStreams(topology, streamsConfiguration); - streams.start(); - - final List<KeyValue<String, Long>> expectedCountKeyValues = Arrays.asList(KeyValue.pair("A", 3L), KeyValue.pair("B", 3L), KeyValue.pair("C", 3L)); - IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(consumerConfig1, COUNT_TOPIC, expectedCountKeyValues); - - final List<KeyValue<String, Integer>> expectedAggKeyValues = Arrays.asList(KeyValue.pair("A", 9), KeyValue.pair("B", 9), KeyValue.pair("C", 9)); - IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(consumerConfig2, AGGREGATION_TOPIC, expectedAggKeyValues); - - final List<KeyValue<String, String>> expectedReduceKeyValues = Arrays.asList(KeyValue.pair("A", "foo:bar:baz"), KeyValue.pair("B", "foo:bar:baz"), KeyValue.pair("C", "foo:bar:baz")); - IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(consumerConfig3, REDUCE_TOPIC, expectedReduceKeyValues); - - final List<KeyValue<String, String>> expectedJoinKeyValues = Arrays.asList(KeyValue.pair("A", "foo:3"), KeyValue.pair("A", "bar:3"), KeyValue.pair("A", "baz:3")); - IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(consumerConfig3, JOINED_TOPIC, expectedJoinKeyValues); - - - 
final List<String> expectedCollectedProcessorValues = Arrays.asList("FOO", "BAR", "BAZ"); - + // Verify the values collected by the processor assertThat(3, equalTo(processorValueCollector.size())); assertThat(processorValueCollector, equalTo(expectedCollectedProcessorValues)); - streams.close(ofSeconds(5)); + // Verify the expected output + assertThat(countOutputTopic.readKeyValuesToMap(), equalTo(keyValueListToMap(expectedCountKeyValues))); + assertThat(aggregationOutputTopic.readKeyValuesToMap(), equalTo(keyValueListToMap(expectedAggKeyValues))); + assertThat(reduceOutputTopic.readKeyValuesToMap(), equalTo(keyValueListToMap(expectedReduceKeyValues))); + assertThat(joinedOutputTopic.readKeyValuesToMap(), equalTo(keyValueListToMap(expectedJoinKeyValues))); } + private <K, V> Map<K, V> keyValueListToMap(final List<KeyValue<K, V>> keyValuePairs) { + final Map<K, V> map = new HashMap<>(); + for (final KeyValue<K, V> pair : keyValuePairs) { + map.put(pair.key, pair.value); + } + return map; + } private int getCountOfRepartitionTopicsFound(final String topologyString) { final Matcher matcher = repartitionTopicPattern.matcher(topologyString); @@ -224,7 +252,6 @@ public class RepartitionOptimizingIntegrationTest { return repartitionTopicsFound.size(); } - private List<KeyValue<String, String>> getKeyValues() { final List<KeyValue<String, String>> keyValueList = new ArrayList<>(); final String[] keys = new String[]{"a", "b", "c"}; @@ -237,7 +264,6 @@ public class RepartitionOptimizingIntegrationTest { return keyValueList; } - private static class SimpleProcessor extends AbstractProcessor<String, String> { final List<String> valueList; @@ -252,183 +278,183 @@ public class RepartitionOptimizingIntegrationTest { } } - private static final String EXPECTED_OPTIMIZED_TOPOLOGY = "Topologies:\n" + " Sub-topology: 0\n" - + " Source: KSTREAM-SOURCE-0000000000 (topics: [input])\n" - + " --> KSTREAM-MAP-0000000001\n" - + " Processor: KSTREAM-MAP-0000000001 (stores: [])\n" - + " --> KSTREAM-FILTER-0000000002, KSTREAM-FILTER-0000000040\n" - + " <-- KSTREAM-SOURCE-0000000000\n" - + " Processor: KSTREAM-FILTER-0000000002 (stores: [])\n" - + " --> KSTREAM-MAPVALUES-0000000003\n" - + " <-- KSTREAM-MAP-0000000001\n" - + " Processor: KSTREAM-FILTER-0000000040 (stores: [])\n" - + " --> KSTREAM-SINK-0000000039\n" - + " <-- KSTREAM-MAP-0000000001\n" - + " Processor: KSTREAM-MAPVALUES-0000000003 (stores: [])\n" - + " --> KSTREAM-PROCESSOR-0000000004\n" - + " <-- KSTREAM-FILTER-0000000002\n" - + " Processor: KSTREAM-PROCESSOR-0000000004 (stores: [])\n" - + " --> none\n" - + " <-- KSTREAM-MAPVALUES-0000000003\n" - + " Sink: KSTREAM-SINK-0000000039 (topic: KSTREAM-AGGREGATE-STATE-STORE-0000000006-repartition)\n" - + " <-- KSTREAM-FILTER-0000000040\n" + + " Source: KSTREAM-SOURCE-0000000036 (topics: [count-groupByKey-repartition])\n" + + " --> aggregate, count, join-filter, reduce-filter\n" + + " Processor: count (stores: [count-store])\n" + + " --> count-toStream\n" + + " <-- KSTREAM-SOURCE-0000000036\n" + + " Processor: count-toStream (stores: [])\n" + + " --> join-other-windowed, count-to\n" + + " <-- count\n" + + " Processor: join-filter (stores: [])\n" + + " --> join-this-windowed\n" + + " <-- KSTREAM-SOURCE-0000000036\n" + + " Processor: reduce-filter (stores: [])\n" + + " --> reduce-peek\n" + + " <-- KSTREAM-SOURCE-0000000036\n" + + " Processor: join-other-windowed (stores: [other-join-store])\n" + + " --> join-other-join\n" + + " <-- count-toStream\n" + + " Processor: join-this-windowed (stores: [join-store])\n" + + 
" --> join-this-join\n" + + " <-- join-filter\n" + + " Processor: reduce-peek (stores: [])\n" + + " --> reducer\n" + + " <-- reduce-filter\n" + + " Processor: aggregate (stores: [aggregate-store])\n" + + " --> aggregate-toStream\n" + + " <-- KSTREAM-SOURCE-0000000036\n" + + " Processor: join-other-join (stores: [join-store])\n" + + " --> join-merge\n" + + " <-- join-other-windowed\n" + + " Processor: join-this-join (stores: [other-join-store])\n" + + " --> join-merge\n" + + " <-- join-this-windowed\n" + + " Processor: reducer (stores: [reduce-store])\n" + + " --> reduce-toStream\n" + + " <-- reduce-peek\n" + + " Processor: aggregate-toStream (stores: [])\n" + + " --> reduce-to\n" + + " <-- aggregate\n" + + " Processor: join-merge (stores: [])\n" + + " --> join-to\n" + + " <-- join-this-join, join-other-join\n" + + " Processor: reduce-toStream (stores: [])\n" + + " --> KSTREAM-SINK-0000000023\n" + + " <-- reducer\n" + + " Sink: KSTREAM-SINK-0000000023 (topic: outputTopic_2)\n" + + " <-- reduce-toStream\n" + + " Sink: count-to (topic: outputTopic_0)\n" + + " <-- count-toStream\n" + + " Sink: join-to (topic: joinedOutputTopic)\n" + + " <-- join-merge\n" + + " Sink: reduce-to (topic: outputTopic_1)\n" + + " <-- aggregate-toStream\n" + "\n" + " Sub-topology: 1\n" - + " Source: KSTREAM-SOURCE-0000000041 (topics: [KSTREAM-AGGREGATE-STATE-STORE-0000000006-repartition])\n" - + " --> KSTREAM-FILTER-0000000020, KSTREAM-AGGREGATE-0000000007, KSTREAM-AGGREGATE-0000000014, KSTREAM-FILTER-0000000029\n" - + " Processor: KSTREAM-AGGREGATE-0000000007 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000006])\n" - + " --> KTABLE-TOSTREAM-0000000011\n" - + " <-- KSTREAM-SOURCE-0000000041\n" - + " Processor: KTABLE-TOSTREAM-0000000011 (stores: [])\n" - + " --> KSTREAM-SINK-0000000012, KSTREAM-WINDOWED-0000000034\n" - + " <-- KSTREAM-AGGREGATE-0000000007\n" - + " Processor: KSTREAM-FILTER-0000000020 (stores: [])\n" - + " --> KSTREAM-PEEK-0000000021\n" - + " <-- KSTREAM-SOURCE-0000000041\n" - + " Processor: KSTREAM-FILTER-0000000029 (stores: [])\n" - + " --> KSTREAM-WINDOWED-0000000033\n" - + " <-- KSTREAM-SOURCE-0000000041\n" - + " Processor: KSTREAM-PEEK-0000000021 (stores: [])\n" - + " --> KSTREAM-REDUCE-0000000023\n" - + " <-- KSTREAM-FILTER-0000000020\n" - + " Processor: KSTREAM-WINDOWED-0000000033 (stores: [KSTREAM-JOINTHIS-0000000035-store])\n" - + " --> KSTREAM-JOINTHIS-0000000035\n" - + " <-- KSTREAM-FILTER-0000000029\n" - + " Processor: KSTREAM-WINDOWED-0000000034 (stores: [KSTREAM-JOINOTHER-0000000036-store])\n" - + " --> KSTREAM-JOINOTHER-0000000036\n" - + " <-- KTABLE-TOSTREAM-0000000011\n" - + " Processor: KSTREAM-AGGREGATE-0000000014 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000013])\n" - + " --> KTABLE-TOSTREAM-0000000018\n" - + " <-- KSTREAM-SOURCE-0000000041\n" - + " Processor: KSTREAM-JOINOTHER-0000000036 (stores: [KSTREAM-JOINTHIS-0000000035-store])\n" - + " --> KSTREAM-MERGE-0000000037\n" - + " <-- KSTREAM-WINDOWED-0000000034\n" - + " Processor: KSTREAM-JOINTHIS-0000000035 (stores: [KSTREAM-JOINOTHER-0000000036-store])\n" - + " --> KSTREAM-MERGE-0000000037\n" - + " <-- KSTREAM-WINDOWED-0000000033\n" - + " Processor: KSTREAM-REDUCE-0000000023 (stores: [KSTREAM-REDUCE-STATE-STORE-0000000022])\n" - + " --> KTABLE-TOSTREAM-0000000027\n" - + " <-- KSTREAM-PEEK-0000000021\n" - + " Processor: KSTREAM-MERGE-0000000037 (stores: [])\n" - + " --> KSTREAM-SINK-0000000038\n" - + " <-- KSTREAM-JOINTHIS-0000000035, KSTREAM-JOINOTHER-0000000036\n" - + " Processor: KTABLE-TOSTREAM-0000000018 (stores: [])\n" - + 
" --> KSTREAM-SINK-0000000019\n" - + " <-- KSTREAM-AGGREGATE-0000000014\n" - + " Processor: KTABLE-TOSTREAM-0000000027 (stores: [])\n" - + " --> KSTREAM-SINK-0000000028\n" - + " <-- KSTREAM-REDUCE-0000000023\n" - + " Sink: KSTREAM-SINK-0000000012 (topic: outputTopic_0)\n" - + " <-- KTABLE-TOSTREAM-0000000011\n" - + " Sink: KSTREAM-SINK-0000000019 (topic: outputTopic_1)\n" - + " <-- KTABLE-TOSTREAM-0000000018\n" - + " Sink: KSTREAM-SINK-0000000028 (topic: outputTopic_2)\n" - + " <-- KTABLE-TOSTREAM-0000000027\n" - + " Sink: KSTREAM-SINK-0000000038 (topic: joinedOutputTopic)\n" - + " <-- KSTREAM-MERGE-0000000037\n\n"; + + " Source: sourceStream (topics: [input])\n" + + " --> source-map\n" + + " Processor: source-map (stores: [])\n" + + " --> process-filter, KSTREAM-FILTER-0000000035\n" + + " <-- sourceStream\n" + + " Processor: process-filter (stores: [])\n" + + " --> process-mapValues\n" + + " <-- source-map\n" + + " Processor: KSTREAM-FILTER-0000000035 (stores: [])\n" + + " --> KSTREAM-SINK-0000000034\n" + + " <-- source-map\n" + + " Processor: process-mapValues (stores: [])\n" + + " --> process\n" + + " <-- process-filter\n" + + " Sink: KSTREAM-SINK-0000000034 (topic: count-groupByKey-repartition)\n" + + " <-- KSTREAM-FILTER-0000000035\n" + + " Processor: process (stores: [])\n" + + " --> none\n" + + " <-- process-mapValues\n\n"; + private static final String EXPECTED_UNOPTIMIZED_TOPOLOGY = "Topologies:\n" + " Sub-topology: 0\n" - + " Source: KSTREAM-SOURCE-0000000000 (topics: [input])\n" - + " --> KSTREAM-MAP-0000000001\n" - + " Processor: KSTREAM-MAP-0000000001 (stores: [])\n" - + " --> KSTREAM-FILTER-0000000020, KSTREAM-FILTER-0000000002, KSTREAM-FILTER-0000000009, KSTREAM-FILTER-0000000016, KSTREAM-FILTER-0000000029\n" - + " <-- KSTREAM-SOURCE-0000000000\n" - + " Processor: KSTREAM-FILTER-0000000020 (stores: [])\n" - + " --> KSTREAM-PEEK-0000000021\n" - + " <-- KSTREAM-MAP-0000000001\n" - + " Processor: KSTREAM-FILTER-0000000002 (stores: [])\n" - + " --> KSTREAM-MAPVALUES-0000000003\n" - + " <-- KSTREAM-MAP-0000000001\n" - + " Processor: KSTREAM-FILTER-0000000029 (stores: [])\n" - + " --> KSTREAM-FILTER-0000000031\n" - + " <-- KSTREAM-MAP-0000000001\n" - + " Processor: KSTREAM-PEEK-0000000021 (stores: [])\n" - + " --> KSTREAM-FILTER-0000000025\n" - + " <-- KSTREAM-FILTER-0000000020\n" - + " Processor: KSTREAM-FILTER-0000000009 (stores: [])\n" - + " --> KSTREAM-SINK-0000000008\n" - + " <-- KSTREAM-MAP-0000000001\n" - + " Processor: KSTREAM-FILTER-0000000016 (stores: [])\n" - + " --> KSTREAM-SINK-0000000015\n" - + " <-- KSTREAM-MAP-0000000001\n" - + " Processor: KSTREAM-FILTER-0000000025 (stores: [])\n" - + " --> KSTREAM-SINK-0000000024\n" - + " <-- KSTREAM-PEEK-0000000021\n" - + " Processor: KSTREAM-FILTER-0000000031 (stores: [])\n" - + " --> KSTREAM-SINK-0000000030\n" - + " <-- KSTREAM-FILTER-0000000029\n" - + " Processor: KSTREAM-MAPVALUES-0000000003 (stores: [])\n" - + " --> KSTREAM-PROCESSOR-0000000004\n" - + " <-- KSTREAM-FILTER-0000000002\n" - + " Processor: KSTREAM-PROCESSOR-0000000004 (stores: [])\n" - + " --> none\n" - + " <-- KSTREAM-MAPVALUES-0000000003\n" - + " Sink: KSTREAM-SINK-0000000008 (topic: KSTREAM-AGGREGATE-STATE-STORE-0000000006-repartition)\n" - + " <-- KSTREAM-FILTER-0000000009\n" - + " Sink: KSTREAM-SINK-0000000015 (topic: KSTREAM-AGGREGATE-STATE-STORE-0000000013-repartition)\n" - + " <-- KSTREAM-FILTER-0000000016\n" - + " Sink: KSTREAM-SINK-0000000024 (topic: KSTREAM-REDUCE-STATE-STORE-0000000022-repartition)\n" - + " <-- KSTREAM-FILTER-0000000025\n" - + " 
Sink: KSTREAM-SINK-0000000030 (topic: KSTREAM-FILTER-0000000029-repartition)\n" - + " <-- KSTREAM-FILTER-0000000031\n" + + " Source: KSTREAM-SOURCE-0000000007 (topics: [count-groupByKey-repartition])\n" + + " --> count\n" + + " Processor: count (stores: [count-store])\n" + + " --> count-toStream\n" + + " <-- KSTREAM-SOURCE-0000000007\n" + + " Processor: count-toStream (stores: [])\n" + + " --> join-other-windowed, count-to\n" + + " <-- count\n" + + " Source: KSTREAM-SOURCE-0000000027 (topics: [join-left-repartition])\n" + + " --> join-this-windowed\n" + + " Processor: join-other-windowed (stores: [other-join-store])\n" + + " --> join-other-join\n" + + " <-- count-toStream\n" + + " Processor: join-this-windowed (stores: [join-store])\n" + + " --> join-this-join\n" + + " <-- KSTREAM-SOURCE-0000000027\n" + + " Processor: join-other-join (stores: [join-store])\n" + + " --> join-merge\n" + + " <-- join-other-windowed\n" + + " Processor: join-this-join (stores: [other-join-store])\n" + + " --> join-merge\n" + + " <-- join-this-windowed\n" + + " Processor: join-merge (stores: [])\n" + + " --> join-to\n" + + " <-- join-this-join, join-other-join\n" + + " Sink: count-to (topic: outputTopic_0)\n" + + " <-- count-toStream\n" + + " Sink: join-to (topic: joinedOutputTopic)\n" + + " <-- join-merge\n" + "\n" + " Sub-topology: 1\n" - + " Source: KSTREAM-SOURCE-0000000010 (topics: [KSTREAM-AGGREGATE-STATE-STORE-0000000006-repartition])\n" - + " --> KSTREAM-AGGREGATE-0000000007\n" - + " Processor: KSTREAM-AGGREGATE-0000000007 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000006])\n" - + " --> KTABLE-TOSTREAM-0000000011\n" - + " <-- KSTREAM-SOURCE-0000000010\n" - + " Processor: KTABLE-TOSTREAM-0000000011 (stores: [])\n" - + " --> KSTREAM-SINK-0000000012, KSTREAM-WINDOWED-0000000034\n" - + " <-- KSTREAM-AGGREGATE-0000000007\n" - + " Source: KSTREAM-SOURCE-0000000032 (topics: [KSTREAM-FILTER-0000000029-repartition])\n" - + " --> KSTREAM-WINDOWED-0000000033\n" - + " Processor: KSTREAM-WINDOWED-0000000033 (stores: [KSTREAM-JOINTHIS-0000000035-store])\n" - + " --> KSTREAM-JOINTHIS-0000000035\n" - + " <-- KSTREAM-SOURCE-0000000032\n" - + " Processor: KSTREAM-WINDOWED-0000000034 (stores: [KSTREAM-JOINOTHER-0000000036-store])\n" - + " --> KSTREAM-JOINOTHER-0000000036\n" - + " <-- KTABLE-TOSTREAM-0000000011\n" - + " Processor: KSTREAM-JOINOTHER-0000000036 (stores: [KSTREAM-JOINTHIS-0000000035-store])\n" - + " --> KSTREAM-MERGE-0000000037\n" - + " <-- KSTREAM-WINDOWED-0000000034\n" - + " Processor: KSTREAM-JOINTHIS-0000000035 (stores: [KSTREAM-JOINOTHER-0000000036-store])\n" - + " --> KSTREAM-MERGE-0000000037\n" - + " <-- KSTREAM-WINDOWED-0000000033\n" - + " Processor: KSTREAM-MERGE-0000000037 (stores: [])\n" - + " --> KSTREAM-SINK-0000000038\n" - + " <-- KSTREAM-JOINTHIS-0000000035, KSTREAM-JOINOTHER-0000000036\n" - + " Sink: KSTREAM-SINK-0000000012 (topic: outputTopic_0)\n" - + " <-- KTABLE-TOSTREAM-0000000011\n" - + " Sink: KSTREAM-SINK-0000000038 (topic: joinedOutputTopic)\n" - + " <-- KSTREAM-MERGE-0000000037\n" + + " Source: KSTREAM-SOURCE-0000000013 (topics: [aggregate-groupByKey-repartition])\n" + + " --> aggregate\n" + + " Processor: aggregate (stores: [aggregate-store])\n" + + " --> aggregate-toStream\n" + + " <-- KSTREAM-SOURCE-0000000013\n" + + " Processor: aggregate-toStream (stores: [])\n" + + " --> reduce-to\n" + + " <-- aggregate\n" + + " Sink: reduce-to (topic: outputTopic_1)\n" + + " <-- aggregate-toStream\n" + "\n" + " Sub-topology: 2\n" - + " Source: KSTREAM-SOURCE-0000000017 (topics: 
[KSTREAM-AGGREGATE-STATE-STORE-0000000013-repartition])\n" - + " --> KSTREAM-AGGREGATE-0000000014\n" - + " Processor: KSTREAM-AGGREGATE-0000000014 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000013])\n" - + " --> KTABLE-TOSTREAM-0000000018\n" - + " <-- KSTREAM-SOURCE-0000000017\n" - + " Processor: KTABLE-TOSTREAM-0000000018 (stores: [])\n" - + " --> KSTREAM-SINK-0000000019\n" - + " <-- KSTREAM-AGGREGATE-0000000014\n" - + " Sink: KSTREAM-SINK-0000000019 (topic: outputTopic_1)\n" - + " <-- KTABLE-TOSTREAM-0000000018\n" + + " Source: KSTREAM-SOURCE-0000000021 (topics: [reduce-groupByKey-repartition])\n" + + " --> reducer\n" + + " Processor: reducer (stores: [reduce-store])\n" + + " --> reduce-toStream\n" + + " <-- KSTREAM-SOURCE-0000000021\n" + + " Processor: reduce-toStream (stores: [])\n" + + " --> KSTREAM-SINK-0000000023\n" + + " <-- reducer\n" + + " Sink: KSTREAM-SINK-0000000023 (topic: outputTopic_2)\n" + + " <-- reduce-toStream\n" + "\n" + " Sub-topology: 3\n" - + " Source: KSTREAM-SOURCE-0000000026 (topics: [KSTREAM-REDUCE-STATE-STORE-0000000022-repartition])\n" - + " --> KSTREAM-REDUCE-0000000023\n" - + " Processor: KSTREAM-REDUCE-0000000023 (stores: [KSTREAM-REDUCE-STATE-STORE-0000000022])\n" - + " --> KTABLE-TOSTREAM-0000000027\n" - + " <-- KSTREAM-SOURCE-0000000026\n" - + " Processor: KTABLE-TOSTREAM-0000000027 (stores: [])\n" - + " --> KSTREAM-SINK-0000000028\n" - + " <-- KSTREAM-REDUCE-0000000023\n" - + " Sink: KSTREAM-SINK-0000000028 (topic: outputTopic_2)\n" - + " <-- KTABLE-TOSTREAM-0000000027\n\n"; + + " Source: sourceStream (topics: [input])\n" + + " --> source-map\n" + + " Processor: source-map (stores: [])\n" + + " --> reduce-filter, process-filter, KSTREAM-FILTER-0000000006, join-filter, KSTREAM-FILTER-0000000012\n" + + " <-- sourceStream\n" + + " Processor: reduce-filter (stores: [])\n" + + " --> reduce-peek\n" + + " <-- source-map\n" + + " Processor: join-filter (stores: [])\n" + + " --> KSTREAM-FILTER-0000000026\n" + + " <-- source-map\n" + + " Processor: process-filter (stores: [])\n" + + " --> process-mapValues\n" + + " <-- source-map\n" + + " Processor: reduce-peek (stores: [])\n" + + " --> KSTREAM-FILTER-0000000020\n" + + " <-- reduce-filter\n" + + " Processor: KSTREAM-FILTER-0000000006 (stores: [])\n" + + " --> KSTREAM-SINK-0000000005\n" + + " <-- source-map\n" + + " Processor: KSTREAM-FILTER-0000000012 (stores: [])\n" + + " --> KSTREAM-SINK-0000000011\n" + + " <-- source-map\n" + + " Processor: KSTREAM-FILTER-0000000020 (stores: [])\n" + + " --> KSTREAM-SINK-0000000019\n" + + " <-- reduce-peek\n" + + " Processor: KSTREAM-FILTER-0000000026 (stores: [])\n" + + " --> KSTREAM-SINK-0000000025\n" + + " <-- join-filter\n" + + " Processor: process-mapValues (stores: [])\n" + + " --> process\n" + + " <-- process-filter\n" + + " Sink: KSTREAM-SINK-0000000005 (topic: count-groupByKey-repartition)\n" + + " <-- KSTREAM-FILTER-0000000006\n" + + " Sink: KSTREAM-SINK-0000000011 (topic: aggregate-groupByKey-repartition)\n" + + " <-- KSTREAM-FILTER-0000000012\n" + + " Sink: KSTREAM-SINK-0000000019 (topic: reduce-groupByKey-repartition)\n" + + " <-- KSTREAM-FILTER-0000000020\n" + + " Sink: KSTREAM-SINK-0000000025 (topic: join-left-repartition)\n" + + " <-- KSTREAM-FILTER-0000000026\n" + + " Processor: process (stores: [])\n" + + " --> none\n" + + " <-- process-mapValues\n\n"; } diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RepartitionWithMergeOptimizingIntegrationTest.java 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionWithMergeOptimizingTest.java similarity index 54% rename from streams/src/test/java/org/apache/kafka/streams/integration/RepartitionWithMergeOptimizingIntegrationTest.java rename to streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionWithMergeOptimizingTest.java index 473a626..0d081f7 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/RepartitionWithMergeOptimizingIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionWithMergeOptimizingTest.java @@ -15,33 +15,35 @@ * limitations under the License. */ -package org.apache.kafka.streams.integration; +package org.apache.kafka.streams.processor.internals; - -import java.time.Duration; -import kafka.utils.MockTime; +import java.util.HashMap; +import java.util.Map; +import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.LongDeserializer; import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; -import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.TestInputTopic; +import org.apache.kafka.streams.TestOutputTopic; import org.apache.kafka.streams.Topology; -import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; -import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; +import org.apache.kafka.streams.TopologyTestDriver; import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.Grouped; import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.kstream.Named; import org.apache.kafka.streams.kstream.Produced; -import org.apache.kafka.test.IntegrationTest; +import org.apache.kafka.streams.state.Stores; import org.apache.kafka.test.StreamsTestUtils; -import org.apache.kafka.test.TestUtils; + import org.junit.After; import org.junit.Before; -import org.junit.ClassRule; import org.junit.Test; -import org.junit.experimental.categories.Category; import java.util.ArrayList; import java.util.Arrays; @@ -49,126 +51,142 @@ import java.util.List; import java.util.Properties; import java.util.regex.Matcher; import java.util.regex.Pattern; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; -@Category({IntegrationTest.class}) -public class RepartitionWithMergeOptimizingIntegrationTest { +public class RepartitionWithMergeOptimizingTest { + + private final Logger log = LoggerFactory.getLogger(RepartitionWithMergeOptimizingTest.class); - private static final int NUM_BROKERS = 1; private static final String INPUT_A_TOPIC = "inputA"; private static final String INPUT_B_TOPIC = "inputB"; private static final String COUNT_TOPIC = "outputTopic_0"; - private static final String COUNT_STRING_TOPIC = "outputTopic_1"; - + private static final String STRING_COUNT_TOPIC = "outputTopic_1"; private static final int ONE_REPARTITION_TOPIC = 1; private static final int TWO_REPARTITION_TOPICS = 2; + private final 
Serializer<String> stringSerializer = new StringSerializer(); + private final Deserializer<String> stringDeserializer = new StringDeserializer(); + private final Pattern repartitionTopicPattern = Pattern.compile("Sink: .*-repartition"); private Properties streamsConfiguration; + private TopologyTestDriver topologyTestDriver; - - @ClassRule - public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS); - private final MockTime mockTime = CLUSTER.time; + private final List<KeyValue<String, Long>> expectedCountKeyValues = + Arrays.asList(KeyValue.pair("A", 6L), KeyValue.pair("B", 6L), KeyValue.pair("C", 6L)); + private final List<KeyValue<String, String>> expectedStringCountKeyValues = + Arrays.asList(KeyValue.pair("A", "6"), KeyValue.pair("B", "6"), KeyValue.pair("C", "6")); @Before - public void setUp() throws Exception { + public void setUp() { final Properties props = new Properties(); props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 1024 * 10); props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 5000); streamsConfiguration = StreamsTestUtils.getStreamsConfig( "maybe-optimized-with-merge-test-app", - CLUSTER.bootstrapServers(), + "dummy-bootstrap-servers-config", Serdes.String().getClass().getName(), Serdes.String().getClass().getName(), props); - - CLUSTER.createTopics(COUNT_TOPIC, - COUNT_STRING_TOPIC, - INPUT_A_TOPIC, - INPUT_B_TOPIC); - - IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration); } @After - public void tearDown() throws Exception { - CLUSTER.deleteAllTopicsAndWait(30_000L); + public void tearDown() { + try { + topologyTestDriver.close(); + } catch (final RuntimeException e) { + log.warn("The following exception was thrown while trying to close the TopologyTestDriver (note that " + + "KAFKA-6647 causes this when running on Windows):", e); + } } @Test - public void shouldSendCorrectRecords_OPTIMIZED() throws Exception { - runIntegrationTest(StreamsConfig.OPTIMIZE, - ONE_REPARTITION_TOPIC); + public void shouldSendCorrectRecords_OPTIMIZED() { + runTest(StreamsConfig.OPTIMIZE, ONE_REPARTITION_TOPIC); } @Test - public void shouldSendCorrectResults_NO_OPTIMIZATION() throws Exception { - runIntegrationTest(StreamsConfig.NO_OPTIMIZATION, - TWO_REPARTITION_TOPICS); + public void shouldSendCorrectResults_NO_OPTIMIZATION() { + runTest(StreamsConfig.NO_OPTIMIZATION, TWO_REPARTITION_TOPICS); } - private void runIntegrationTest(final String optimizationConfig, - final int expectedNumberRepartitionTopics) throws Exception { + private void runTest(final String optimizationConfig, final int expectedNumberRepartitionTopics) { + streamsConfiguration.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION, optimizationConfig); final StreamsBuilder builder = new StreamsBuilder(); - final KStream<String, String> sourceAStream = builder.stream(INPUT_A_TOPIC, Consumed.with(Serdes.String(), Serdes.String())); + final KStream<String, String> sourceAStream = + builder.stream(INPUT_A_TOPIC, Consumed.with(Serdes.String(), Serdes.String()).withName("sourceAStream")); - final KStream<String, String> sourceBStream = builder.stream(INPUT_B_TOPIC, Consumed.with(Serdes.String(), Serdes.String())); + final KStream<String, String> sourceBStream = + builder.stream(INPUT_B_TOPIC, Consumed.with(Serdes.String(), Serdes.String()).withName("sourceBStream")); - final KStream<String, String> mappedAStream = sourceAStream.map((k, v) -> KeyValue.pair(v.split(":")[0], v)); - final KStream<String, String> mappedBStream = sourceBStream.map((k, v) -> KeyValue.pair(v.split(":")[0], 
v)); + final KStream<String, String> mappedAStream = + sourceAStream.map((k, v) -> KeyValue.pair(v.split(":")[0], v), Named.as("mappedAStream")); + final KStream<String, String> mappedBStream = + sourceBStream.map((k, v) -> KeyValue.pair(v.split(":")[0], v), Named.as("mappedBStream")); - final KStream<String, String> mergedStream = mappedAStream.merge(mappedBStream); + final KStream<String, String> mergedStream = mappedAStream.merge(mappedBStream, Named.as("mergedStream")); - mergedStream.groupByKey().count().toStream().to(COUNT_TOPIC, Produced.with(Serdes.String(), Serdes.Long())); - mergedStream.groupByKey().count().toStream().mapValues(v -> v.toString()).to(COUNT_STRING_TOPIC, Produced.with(Serdes.String(), Serdes.String())); + mergedStream + .groupByKey(Grouped.as("long-groupByKey")) + .count(Named.as("long-count"), Materialized.as(Stores.inMemoryKeyValueStore("long-store"))) + .toStream(Named.as("long-toStream")) + .to(COUNT_TOPIC, Produced.with(Serdes.String(), Serdes.Long()).withName("long-to")); - streamsConfiguration.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION, optimizationConfig); + mergedStream + .groupByKey(Grouped.as("string-groupByKey")) + .count(Named.as("string-count"), Materialized.as(Stores.inMemoryKeyValueStore("string-store"))) + .toStream(Named.as("string-toStream")) + .mapValues(v -> v.toString(), Named.as("string-mapValues")) + .to(STRING_COUNT_TOPIC, Produced.with(Serdes.String(), Serdes.String()).withName("string-to")); - final Properties producerConfig = TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class); + final Topology topology = builder.build(streamsConfiguration); - IntegrationTestUtils.produceKeyValuesSynchronously(INPUT_A_TOPIC, getKeyValues(), producerConfig, mockTime); - IntegrationTestUtils.produceKeyValuesSynchronously(INPUT_B_TOPIC, getKeyValues(), producerConfig, mockTime); + topologyTestDriver = new TopologyTestDriver(topology, streamsConfiguration); - final Properties consumerConfig1 = TestUtils.consumerConfig(CLUSTER.bootstrapServers(), StringDeserializer.class, LongDeserializer.class); - final Properties consumerConfig2 = TestUtils.consumerConfig(CLUSTER.bootstrapServers(), StringDeserializer.class, StringDeserializer.class); + final TestInputTopic<String, String> inputTopicA = topologyTestDriver.createInputTopic(INPUT_A_TOPIC, stringSerializer, stringSerializer); + final TestInputTopic<String, String> inputTopicB = topologyTestDriver.createInputTopic(INPUT_B_TOPIC, stringSerializer, stringSerializer); + + final TestOutputTopic<String, Long> countOutputTopic = topologyTestDriver.createOutputTopic(COUNT_TOPIC, stringDeserializer, new LongDeserializer()); + final TestOutputTopic<String, String> stringCountOutputTopic = topologyTestDriver.createOutputTopic(STRING_COUNT_TOPIC, stringDeserializer, stringDeserializer); + + inputTopicA.pipeKeyValueList(getKeyValues()); + inputTopicB.pipeKeyValueList(getKeyValues()); - final Topology topology = builder.build(streamsConfiguration); final String topologyString = topology.describe().toString(); - System.out.println(topologyString); + // Verify the topology if (optimizationConfig.equals(StreamsConfig.OPTIMIZE)) { assertEquals(EXPECTED_OPTIMIZED_TOPOLOGY, topologyString); } else { assertEquals(EXPECTED_UNOPTIMIZED_TOPOLOGY, topologyString); } - - /* - confirming number of expected repartition topics here - */ + // Verify the number of repartition topics assertEquals(expectedNumberRepartitionTopics, getCountOfRepartitionTopicsFound(topologyString)); - final 
KafkaStreams streams = new KafkaStreams(topology, streamsConfiguration); - streams.start(); - - final List<KeyValue<String, Long>> expectedCountKeyValues = Arrays.asList(KeyValue.pair("A", 6L), KeyValue.pair("B", 6L), KeyValue.pair("C", 6L)); - IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(consumerConfig1, COUNT_TOPIC, expectedCountKeyValues); - - final List<KeyValue<String, String>> expectedStringCountKeyValues = Arrays.asList(KeyValue.pair("A", "6"), KeyValue.pair("B", "6"), KeyValue.pair("C", "6")); - IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(consumerConfig2, COUNT_STRING_TOPIC, expectedStringCountKeyValues); - - streams.close(Duration.ofSeconds(5)); + // Verify the expected output + assertThat(countOutputTopic.readKeyValuesToMap(), equalTo(keyValueListToMap(expectedCountKeyValues))); + assertThat(stringCountOutputTopic.readKeyValuesToMap(), equalTo(keyValueListToMap(expectedStringCountKeyValues))); } + private <K, V> Map<K, V> keyValueListToMap(final List<KeyValue<K, V>> keyValuePairs) { + final Map<K, V> map = new HashMap<>(); + for (final KeyValue<K, V> pair : keyValuePairs) { + map.put(pair.key, pair.value); + } + return map; + } private int getCountOfRepartitionTopicsFound(final String topologyString) { final Matcher matcher = repartitionTopicPattern.matcher(topologyString); @@ -179,7 +197,6 @@ public class RepartitionWithMergeOptimizingIntegrationTest { return repartitionTopicsFound.size(); } - private List<KeyValue<String, String>> getKeyValues() { final List<KeyValue<String, String>> keyValueList = new ArrayList<>(); final String[] keys = new String[]{"X", "Y", "Z"}; @@ -192,104 +209,103 @@ public class RepartitionWithMergeOptimizingIntegrationTest { return keyValueList; } - - private static final String EXPECTED_OPTIMIZED_TOPOLOGY = "Topologies:\n" + " Sub-topology: 0\n" - + " Source: KSTREAM-SOURCE-0000000000 (topics: [inputA])\n" - + " --> KSTREAM-MAP-0000000002\n" - + " Source: KSTREAM-SOURCE-0000000001 (topics: [inputB])\n" - + " --> KSTREAM-MAP-0000000003\n" - + " Processor: KSTREAM-MAP-0000000002 (stores: [])\n" - + " --> KSTREAM-MERGE-0000000004\n" - + " <-- KSTREAM-SOURCE-0000000000\n" - + " Processor: KSTREAM-MAP-0000000003 (stores: [])\n" - + " --> KSTREAM-MERGE-0000000004\n" - + " <-- KSTREAM-SOURCE-0000000001\n" - + " Processor: KSTREAM-MERGE-0000000004 (stores: [])\n" - + " --> KSTREAM-FILTER-0000000021\n" - + " <-- KSTREAM-MAP-0000000002, KSTREAM-MAP-0000000003\n" - + " Processor: KSTREAM-FILTER-0000000021 (stores: [])\n" - + " --> KSTREAM-SINK-0000000020\n" - + " <-- KSTREAM-MERGE-0000000004\n" - + " Sink: KSTREAM-SINK-0000000020 (topic: KSTREAM-AGGREGATE-STATE-STORE-0000000005-repartition)\n" - + " <-- KSTREAM-FILTER-0000000021\n" + + " Source: KSTREAM-SOURCE-0000000020 (topics: [long-groupByKey-repartition])\n" + + " --> long-count, string-count\n" + + " Processor: string-count (stores: [string-store])\n" + + " --> string-toStream\n" + + " <-- KSTREAM-SOURCE-0000000020\n" + + " Processor: long-count (stores: [long-store])\n" + + " --> long-toStream\n" + + " <-- KSTREAM-SOURCE-0000000020\n" + + " Processor: string-toStream (stores: [])\n" + + " --> string-mapValues\n" + + " <-- string-count\n" + + " Processor: long-toStream (stores: [])\n" + + " --> long-to\n" + + " <-- long-count\n" + + " Processor: string-mapValues (stores: [])\n" + + " --> string-to\n" + + " <-- string-toStream\n" + + " Sink: long-to (topic: outputTopic_0)\n" + + " <-- long-toStream\n" + + " Sink: string-to (topic: outputTopic_1)\n" + + " <-- string-mapValues\n" 
+ "\n" + " Sub-topology: 1\n" - + " Source: KSTREAM-SOURCE-0000000022 (topics: [KSTREAM-AGGREGATE-STATE-STORE-0000000005-repartition])\n" - + " --> KSTREAM-AGGREGATE-0000000006, KSTREAM-AGGREGATE-0000000013\n" - + " Processor: KSTREAM-AGGREGATE-0000000013 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000012])\n" - + " --> KTABLE-TOSTREAM-0000000017\n" - + " <-- KSTREAM-SOURCE-0000000022\n" - + " Processor: KSTREAM-AGGREGATE-0000000006 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000005])\n" - + " --> KTABLE-TOSTREAM-0000000010\n" - + " <-- KSTREAM-SOURCE-0000000022\n" - + " Processor: KTABLE-TOSTREAM-0000000017 (stores: [])\n" - + " --> KSTREAM-MAPVALUES-0000000018\n" - + " <-- KSTREAM-AGGREGATE-0000000013\n" - + " Processor: KSTREAM-MAPVALUES-0000000018 (stores: [])\n" - + " --> KSTREAM-SINK-0000000019\n" - + " <-- KTABLE-TOSTREAM-0000000017\n" - + " Processor: KTABLE-TOSTREAM-0000000010 (stores: [])\n" - + " --> KSTREAM-SINK-0000000011\n" - + " <-- KSTREAM-AGGREGATE-0000000006\n" - + " Sink: KSTREAM-SINK-0000000011 (topic: outputTopic_0)\n" - + " <-- KTABLE-TOSTREAM-0000000010\n" - + " Sink: KSTREAM-SINK-0000000019 (topic: outputTopic_1)\n" - + " <-- KSTREAM-MAPVALUES-0000000018\n\n"; + + " Source: sourceAStream (topics: [inputA])\n" + + " --> mappedAStream\n" + + " Source: sourceBStream (topics: [inputB])\n" + + " --> mappedBStream\n" + + " Processor: mappedAStream (stores: [])\n" + + " --> mergedStream\n" + + " <-- sourceAStream\n" + + " Processor: mappedBStream (stores: [])\n" + + " --> mergedStream\n" + + " <-- sourceBStream\n" + + " Processor: mergedStream (stores: [])\n" + + " --> KSTREAM-FILTER-0000000019\n" + + " <-- mappedAStream, mappedBStream\n" + + " Processor: KSTREAM-FILTER-0000000019 (stores: [])\n" + + " --> KSTREAM-SINK-0000000018\n" + + " <-- mergedStream\n" + + " Sink: KSTREAM-SINK-0000000018 (topic: long-groupByKey-repartition)\n" + + " <-- KSTREAM-FILTER-0000000019\n\n"; private static final String EXPECTED_UNOPTIMIZED_TOPOLOGY = "Topologies:\n" + " Sub-topology: 0\n" - + " Source: KSTREAM-SOURCE-0000000000 (topics: [inputA])\n" - + " --> KSTREAM-MAP-0000000002\n" - + " Source: KSTREAM-SOURCE-0000000001 (topics: [inputB])\n" - + " --> KSTREAM-MAP-0000000003\n" - + " Processor: KSTREAM-MAP-0000000002 (stores: [])\n" - + " --> KSTREAM-MERGE-0000000004\n" - + " <-- KSTREAM-SOURCE-0000000000\n" - + " Processor: KSTREAM-MAP-0000000003 (stores: [])\n" - + " --> KSTREAM-MERGE-0000000004\n" - + " <-- KSTREAM-SOURCE-0000000001\n" - + " Processor: KSTREAM-MERGE-0000000004 (stores: [])\n" - + " --> KSTREAM-FILTER-0000000008, KSTREAM-FILTER-0000000015\n" - + " <-- KSTREAM-MAP-0000000002, KSTREAM-MAP-0000000003\n" - + " Processor: KSTREAM-FILTER-0000000008 (stores: [])\n" - + " --> KSTREAM-SINK-0000000007\n" - + " <-- KSTREAM-MERGE-0000000004\n" - + " Processor: KSTREAM-FILTER-0000000015 (stores: [])\n" - + " --> KSTREAM-SINK-0000000014\n" - + " <-- KSTREAM-MERGE-0000000004\n" - + " Sink: KSTREAM-SINK-0000000007 (topic: KSTREAM-AGGREGATE-STATE-STORE-0000000005-repartition)\n" - + " <-- KSTREAM-FILTER-0000000008\n" - + " Sink: KSTREAM-SINK-0000000014 (topic: KSTREAM-AGGREGATE-STATE-STORE-0000000012-repartition)\n" - + " <-- KSTREAM-FILTER-0000000015\n" + + " Source: KSTREAM-SOURCE-0000000008 (topics: [long-groupByKey-repartition])\n" + + " --> long-count\n" + + " Processor: long-count (stores: [long-store])\n" + + " --> long-toStream\n" + + " <-- KSTREAM-SOURCE-0000000008\n" + + " Processor: long-toStream (stores: [])\n" + + " --> long-to\n" + + " <-- long-count\n" + + " Sink: 
long-to (topic: outputTopic_0)\n" + + " <-- long-toStream\n" + "\n" + " Sub-topology: 1\n" - + " Source: KSTREAM-SOURCE-0000000009 (topics: [KSTREAM-AGGREGATE-STATE-STORE-0000000005-repartition])\n" - + " --> KSTREAM-AGGREGATE-0000000006\n" - + " Processor: KSTREAM-AGGREGATE-0000000006 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000005])\n" - + " --> KTABLE-TOSTREAM-0000000010\n" - + " <-- KSTREAM-SOURCE-0000000009\n" - + " Processor: KTABLE-TOSTREAM-0000000010 (stores: [])\n" - + " --> KSTREAM-SINK-0000000011\n" - + " <-- KSTREAM-AGGREGATE-0000000006\n" - + " Sink: KSTREAM-SINK-0000000011 (topic: outputTopic_0)\n" - + " <-- KTABLE-TOSTREAM-0000000010\n" + + " Source: KSTREAM-SOURCE-0000000014 (topics: [string-groupByKey-repartition])\n" + + " --> string-count\n" + + " Processor: string-count (stores: [string-store])\n" + + " --> string-toStream\n" + + " <-- KSTREAM-SOURCE-0000000014\n" + + " Processor: string-toStream (stores: [])\n" + + " --> string-mapValues\n" + + " <-- string-count\n" + + " Processor: string-mapValues (stores: [])\n" + + " --> string-to\n" + + " <-- string-toStream\n" + + " Sink: string-to (topic: outputTopic_1)\n" + + " <-- string-mapValues\n" + "\n" + " Sub-topology: 2\n" - + " Source: KSTREAM-SOURCE-0000000016 (topics: [KSTREAM-AGGREGATE-STATE-STORE-0000000012-repartition])\n" - + " --> KSTREAM-AGGREGATE-0000000013\n" - + " Processor: KSTREAM-AGGREGATE-0000000013 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000012])\n" - + " --> KTABLE-TOSTREAM-0000000017\n" - + " <-- KSTREAM-SOURCE-0000000016\n" - + " Processor: KTABLE-TOSTREAM-0000000017 (stores: [])\n" - + " --> KSTREAM-MAPVALUES-0000000018\n" - + " <-- KSTREAM-AGGREGATE-0000000013\n" - + " Processor: KSTREAM-MAPVALUES-0000000018 (stores: [])\n" - + " --> KSTREAM-SINK-0000000019\n" - + " <-- KTABLE-TOSTREAM-0000000017\n" - + " Sink: KSTREAM-SINK-0000000019 (topic: outputTopic_1)\n" - + " <-- KSTREAM-MAPVALUES-0000000018\n\n"; + + " Source: sourceAStream (topics: [inputA])\n" + + " --> mappedAStream\n" + + " Source: sourceBStream (topics: [inputB])\n" + + " --> mappedBStream\n" + + " Processor: mappedAStream (stores: [])\n" + + " --> mergedStream\n" + + " <-- sourceAStream\n" + + " Processor: mappedBStream (stores: [])\n" + + " --> mergedStream\n" + + " <-- sourceBStream\n" + + " Processor: mergedStream (stores: [])\n" + + " --> KSTREAM-FILTER-0000000007, KSTREAM-FILTER-0000000013\n" + + " <-- mappedAStream, mappedBStream\n" + + " Processor: KSTREAM-FILTER-0000000007 (stores: [])\n" + + " --> KSTREAM-SINK-0000000006\n" + + " <-- mergedStream\n" + + " Processor: KSTREAM-FILTER-0000000013 (stores: [])\n" + + " --> KSTREAM-SINK-0000000012\n" + + " <-- mergedStream\n" + + " Sink: KSTREAM-SINK-0000000006 (topic: long-groupByKey-repartition)\n" + + " <-- KSTREAM-FILTER-0000000007\n" + + " Sink: KSTREAM-SINK-0000000012 (topic: string-groupByKey-repartition)\n" + + " <-- KSTREAM-FILTER-0000000013\n\n"; + } diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsBrokerDownResilienceTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsBrokerDownResilienceTest.java index 25c642e..4dd4954 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsBrokerDownResilienceTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsBrokerDownResilienceTest.java @@ -104,27 +104,22 @@ public class StreamsBrokerDownResilienceTest { final KafkaStreams streams = new KafkaStreams(builder.build(), streamsProperties); - streams.setUncaughtExceptionHandler(new 
Thread.UncaughtExceptionHandler() { - @Override - public void uncaughtException(final Thread t, final Throwable e) { + streams.setUncaughtExceptionHandler( (t,e) -> { System.err.println("FATAL: An unexpected exception " + e); System.err.flush(); streams.close(Duration.ofSeconds(30)); } - }); + ); + System.out.println("Start Kafka Streams"); streams.start(); - Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { - @Override - public void run() { + Runtime.getRuntime().addShutdownHook(new Thread( () -> { streams.close(Duration.ofSeconds(30)); System.out.println("Complete shutdown of streams resilience test app now"); System.out.flush(); } - })); - - + )); } private static boolean confirmCorrectConfigs(final Properties properties) { diff --git a/tests/kafkatest/tests/streams/streams_upgrade_test.py b/tests/kafkatest/tests/streams/streams_upgrade_test.py index ab57090..00dbe35 100644 --- a/tests/kafkatest/tests/streams/streams_upgrade_test.py +++ b/tests/kafkatest/tests/streams/streams_upgrade_test.py @@ -441,7 +441,7 @@ class StreamsUpgradeTest(Test): if upgrade_from is None: # upgrade disabled -- second round of rolling bounces roll_counter = ".1-" # second round of rolling bounces else: - roll_counter = ".0-" # first round of rolling boundes + roll_counter = ".0-" # first round of rolling bounces node.account.ssh("mv " + processor.STDOUT_FILE + " " + processor.STDOUT_FILE + roll_counter + str(counter), allow_fail=False) node.account.ssh("mv " + processor.STDERR_FILE + " " + processor.STDERR_FILE + roll_counter + str(counter), allow_fail=False)
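For reference, the TopologyTestDriver pattern this patch adopts looks roughly like the sketch below. This is a minimal illustrative example, not code from the commit: the topology, topic names ("input"/"output"), application id, and class name are assumed for the sketch; only the TestInputTopic/TestOutputTopic API usage mirrors what the new tests do.

// Minimal sketch of the TopologyTestDriver pattern adopted by this patch.
// The topology, topic names, and config values here are illustrative assumptions.
import java.util.Properties;

import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;

public class TopologyTestDriverSketch {
    public static void main(final String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input", Consumed.with(Serdes.String(), Serdes.String()))
               .groupByKey()
               .count()
               .toStream()
               .to("output", Produced.with(Serdes.String(), Serdes.Long()));

        final Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "topology-test-driver-sketch");
        // No broker is contacted; the driver runs the topology synchronously in-process.
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), config)) {
            final TestInputTopic<String, String> input =
                driver.createInputTopic("input", new StringSerializer(), new StringSerializer());
            final TestOutputTopic<String, Long> output =
                driver.createOutputTopic("output", new StringDeserializer(), new LongDeserializer());

            input.pipeInput("A", "foo");
            input.pipeInput("A", "bar");

            // Output is available immediately -- no cluster round trips, polling, or timeouts.
            System.out.println(output.readKeyValuesToMap());   // prints {A=2}
        }
    }
}

Because the driver processes each record synchronously, the polling/waiting that the old integration tests relied on (and that made them sensitive to the default timeout) is no longer needed.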