This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch 2.5 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.5 by this push: new 3865f63 KAFKA-9523: Migrate BranchedMultiLevelRepartitionConnectedTopologyTest into a unit test (#8081) 3865f63 is described below commit 3865f636c89ba9c063bf707e358223b5e7c3629f Author: Boyang Chen <boy...@confluent.io> AuthorDate: Mon Feb 10 13:09:26 2020 -0800 KAFKA-9523: Migrate BranchedMultiLevelRepartitionConnectedTopologyTest into a unit test (#8081) Relying on an integration test to catch an algorithm bug introduces extra flakiness, so reduce it to a unit test to cut that flakiness until we upgrade the Java/Scala libs. Verified that the test fails with the older version of StreamsPartitionAssignor. Reviewers: Guozhang Wang <wangg...@gmail.com> --- ...MultiLevelRepartitionConnectedTopologyTest.java | 151 --------------------- .../internals/StreamsPartitionAssignorTest.java | 86 ++++++++++++ 2 files changed, 86 insertions(+), 151 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/BranchedMultiLevelRepartitionConnectedTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/BranchedMultiLevelRepartitionConnectedTopologyTest.java deleted file mode 100644 index cfff070..0000000 --- a/streams/src/test/java/org/apache/kafka/streams/integration/BranchedMultiLevelRepartitionConnectedTopologyTest.java +++ /dev/null @@ -1,151 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License.
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.streams.integration; - -import kafka.utils.MockTime; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.common.serialization.ByteArraySerializer; -import org.apache.kafka.common.serialization.Serdes; -import org.apache.kafka.streams.KafkaStreams; -import org.apache.kafka.streams.KeyValue; -import org.apache.kafka.streams.StreamsBuilder; -import org.apache.kafka.streams.StreamsConfig; -import org.apache.kafka.streams.Topology; -import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; -import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; -import org.apache.kafka.streams.kstream.KStream; -import org.apache.kafka.streams.kstream.KTable; -import org.apache.kafka.streams.kstream.Materialized; -import org.apache.kafka.test.IntegrationTest; -import org.apache.kafka.test.StreamsTestUtils; -import org.apache.kafka.test.TestUtils; -import org.junit.Before; -import org.junit.ClassRule; -import org.junit.Test; -import org.junit.experimental.categories.Category; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Collections; -import java.util.List; -import java.util.Properties; -import java.util.concurrent.ExecutionException; - -/** - * Test out a topology with 3 level of sub-topology as: - * 0 - * / \ - * 1 3 - * \ / - * 2 - * where each pair of the sub topology is connected by repartition topic. 
- * The purpose of this test is to verify the robustness of the stream partition assignor algorithm, - * especially whether it could build the repartition topic counts (step zero) with a complex topology. - * The traversal path 0 -> 1 -> 2 -> 3 hits the case where sub-topology 2 will be initialized while its - * parent 3 hasn't been initialized yet. - */ -@Category({IntegrationTest.class}) -public class BranchedMultiLevelRepartitionConnectedTopologyTest { - - private static final Logger log = LoggerFactory.getLogger(BranchedMultiLevelRepartitionConnectedTopologyTest.class); - - private static String inputStream; - - private KafkaStreams kafkaStreams; - - private Properties streamsConfiguration; - - @ClassRule - public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1); - private final MockTime mockTime = CLUSTER.time; - - @Before - public void setUp() throws Exception { - final Properties props = new Properties(); - props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.NO_OPTIMIZATION); - streamsConfiguration = StreamsTestUtils.getStreamsConfig( - "branched-repartition-topic-test", - CLUSTER.bootstrapServers(), - Serdes.ByteArray().getClass().getName(), - Serdes.ByteArray().getClass().getName(), - props); - - inputStream = "input-stream"; - CLUSTER.createTopic(inputStream, 3, 1); - - IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration); - } - - @Test - @SuppressWarnings("unchecked") - public void testTopologyBuild() throws InterruptedException, ExecutionException { - - final StreamsBuilder builder = new StreamsBuilder(); - - final KStream<byte[], byte[]> input = builder.stream(inputStream); - - final KStream<byte[], byte[]>[] branches = input - .flatMapValues(value -> Collections.singletonList(new byte[0])) - .branch((k, v) -> true, (k, v) -> false); - - final KTable<byte[], byte[]> b1 = branches[0] - .map(KeyValue::new) - .groupByKey() - .reduce((k, v) -> v, 
Materialized.as("odd_store")) - .toStream() - .peek((k, v) -> { }) - .map(KeyValue::new) - .groupByKey() - .reduce((k, v) -> v, Materialized.as("odd_store_2")); - - final KTable<byte[], byte[]> b2 = branches[1] - .map(KeyValue::new) - .groupByKey() - .reduce((k, v) -> v, Materialized.as("even_store")) - .toStream() - .peek((k, v) -> { }) - .map(KeyValue::new) - .groupByKey() - .reduce((k, v) -> v, Materialized.as("even_store_2")); - - b1.join(b2, (v1, v2) -> v1, Materialized.as("joined_store")) - .toStream(); - - final Topology topology = builder.build(streamsConfiguration); - log.info("Built topology: {}", topology.describe()); - - final Properties producerConfig = TestUtils.producerConfig( - CLUSTER.bootstrapServers(), ByteArraySerializer.class, ByteArraySerializer.class); - - final List<KeyValue<byte[], byte[]>> initialKeyValues = Collections.singletonList( - KeyValue.pair(new byte[1], new byte[1])); - - IntegrationTestUtils.produceKeyValuesSynchronously( - inputStream, initialKeyValues, producerConfig, mockTime); - - kafkaStreams = new KafkaStreams(topology, streamsConfiguration); - - kafkaStreams.cleanUp(); - kafkaStreams.start(); - - TestUtils.waitForCondition(() -> kafkaStreams.state() == KafkaStreams.State.RUNNING, - "Failed to observe stream transits to RUNNING"); - - kafkaStreams.close(); - } -} diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java index ffc422e..12fc9c7 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java @@ -1692,6 +1692,92 @@ public class StreamsPartitionAssignorTest { shouldThrowIfPreVersionProbingSubscriptionAndFutureSubscriptionIsMixed(2); } + @Test + public void 
shouldNotFailOnBranchedMultiLevelRepartitionConnectedTopology() { + // Test out a topology with 3 level of sub-topology as: + // 0 + // / \ + // 1 3 + // \ / + // 2 + // where each pair of the sub topology is connected by repartition topic. + // The purpose of this test is to verify the robustness of the stream partition assignor algorithm, + // especially whether it could build the repartition topic counts (step zero) with a complex topology. + // The traversal path 0 -> 1 -> 2 -> 3 hits the case where sub-topology 2 will be initialized while its + // parent 3 hasn't been initialized yet. + final String applicationId = "test"; + builder.setApplicationId(applicationId); + builder.addSource(null, "KSTREAM-SOURCE-0000000000", null, null, null, "input-stream"); + builder.addProcessor("KSTREAM-FLATMAPVALUES-0000000001", new MockProcessorSupplier(), "KSTREAM-SOURCE-0000000000"); + builder.addProcessor("KSTREAM-BRANCH-0000000002", new MockProcessorSupplier(), "KSTREAM-FLATMAPVALUES-0000000001"); + builder.addProcessor("KSTREAM-BRANCHCHILD-0000000003", new MockProcessorSupplier(), "KSTREAM-BRANCH-0000000002"); + builder.addProcessor("KSTREAM-BRANCHCHILD-0000000004", new MockProcessorSupplier(), "KSTREAM-BRANCH-0000000002"); + builder.addProcessor("KSTREAM-MAP-0000000005", new MockProcessorSupplier(), "KSTREAM-BRANCHCHILD-0000000003"); + + builder.addInternalTopic("odd_store-repartition"); + builder.addProcessor("odd_store-repartition-filter", new MockProcessorSupplier(), "KSTREAM-MAP-0000000005"); + builder.addSink("odd_store-repartition-sink", "odd_store-repartition", null, null, null, "odd_store-repartition-filter"); + builder.addSource(null, "odd_store-repartition-source", null, null, null, "odd_store-repartition"); + builder.addProcessor("KSTREAM-REDUCE-0000000006", new MockProcessorSupplier(), "odd_store-repartition-source"); + builder.addProcessor("KTABLE-TOSTREAM-0000000010", new MockProcessorSupplier(), "KSTREAM-REDUCE-0000000006"); + 
builder.addProcessor("KSTREAM-PEEK-0000000011", new MockProcessorSupplier(), "KTABLE-TOSTREAM-0000000010"); + builder.addProcessor("KSTREAM-MAP-0000000012", new MockProcessorSupplier(), "KSTREAM-PEEK-0000000011"); + + builder.addInternalTopic("odd_store_2-repartition"); + builder.addProcessor("odd_store_2-repartition-filter", new MockProcessorSupplier(), "KSTREAM-MAP-0000000012"); + builder.addSink("odd_store_2-repartition-sink", "odd_store_2-repartition", null, null, null, "odd_store_2-repartition-filter"); + builder.addSource(null, "odd_store_2-repartition-source", null, null, null, "odd_store_2-repartition"); + builder.addProcessor("KSTREAM-REDUCE-0000000013", new MockProcessorSupplier(), "odd_store_2-repartition-source"); + builder.addProcessor("KSTREAM-MAP-0000000017", new MockProcessorSupplier(), "KSTREAM-BRANCHCHILD-0000000004"); + + builder.addInternalTopic("even_store-repartition"); + builder.addProcessor("even_store-repartition-filter", new MockProcessorSupplier(), "KSTREAM-MAP-0000000017"); + builder.addSink("even_store-repartition-sink", "even_store-repartition", null, null, null, "even_store-repartition-filter"); + builder.addSource(null, "even_store-repartition-source", null, null, null, "even_store-repartition"); + builder.addProcessor("KSTREAM-REDUCE-0000000018", new MockProcessorSupplier(), "even_store-repartition-source"); + builder.addProcessor("KTABLE-TOSTREAM-0000000022", new MockProcessorSupplier(), "KSTREAM-REDUCE-0000000018"); + builder.addProcessor("KSTREAM-PEEK-0000000023", new MockProcessorSupplier(), "KTABLE-TOSTREAM-0000000022"); + builder.addProcessor("KSTREAM-MAP-0000000024", new MockProcessorSupplier(), "KSTREAM-PEEK-0000000023"); + + builder.addInternalTopic("even_store_2-repartition"); + builder.addProcessor("even_store_2-repartition-filter", new MockProcessorSupplier(), "KSTREAM-MAP-0000000024"); + builder.addSink("even_store_2-repartition-sink", "even_store_2-repartition", null, null, null, "even_store_2-repartition-filter"); + 
builder.addSource(null, "even_store_2-repartition-source", null, null, null, "even_store_2-repartition"); + builder.addProcessor("KSTREAM-REDUCE-0000000025", new MockProcessorSupplier(), "even_store_2-repartition-source"); + builder.addProcessor("KTABLE-JOINTHIS-0000000030", new MockProcessorSupplier(), "KSTREAM-REDUCE-0000000013"); + builder.addProcessor("KTABLE-JOINOTHER-0000000031", new MockProcessorSupplier(), "KSTREAM-REDUCE-0000000025"); + builder.addProcessor("KTABLE-MERGE-0000000029", new MockProcessorSupplier(), "KTABLE-JOINTHIS-0000000030", "KTABLE-JOINOTHER-0000000031"); + builder.addProcessor("KTABLE-TOSTREAM-0000000032", new MockProcessorSupplier(), "KTABLE-MERGE-0000000029"); + + final List<String> topics = asList("input-stream", "test-even_store-repartition", "test-even_store_2-repartition", "test-odd_store-repartition", "test-odd_store_2-repartition"); + + final UUID uuid1 = UUID.randomUUID(); + createMockTaskManager(emptyTasks, emptyTasks, uuid1, builder); + EasyMock.replay(taskManager); + + streamsMetadataState = EasyMock.createNiceMock(StreamsMetadataState.class); + configurePartitionAssignor(emptyMap()); + final MockInternalTopicManager internalTopicManager = + new MockInternalTopicManager(streamsConfig, mockClientSupplier.restoreConsumer); + partitionAssignor.setInternalTopicManager(internalTopicManager); + + subscriptions.put("consumer10", + new ConsumerPartitionAssignor.Subscription( + topics, + getInfo(uuid1, emptyTasks, emptyTasks).encode()) + ); + + final Cluster metadata = new Cluster( + "cluster", + Collections.singletonList(Node.noNode()), + Collections.singletonList(new PartitionInfo("input-stream", 0, Node.noNode(), new Node[0], new Node[0])), + Collections.emptySet(), + Collections.emptySet()); + + // This shall fail if we have bugs in the repartition topic creation due to the inconsistent order of sub-topologies. 
+ partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)); + } + private static ByteBuffer encodeFutureSubscription() { final ByteBuffer buf = ByteBuffer.allocate(4 /* used version */ + 4 /* supported version */); buf.putInt(LATEST_SUPPORTED_VERSION + 1);