This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.5 by this push:
     new 3865f63  KAFKA-9523: Migrate BranchedMultiLevelRepartitionConnectedTopologyTest into a unit test (#8081)
3865f63 is described below

commit 3865f636c89ba9c063bf707e358223b5e7c3629f
Author: Boyang Chen <boy...@confluent.io>
AuthorDate: Mon Feb 10 13:09:26 2020 -0800

    KAFKA-9523: Migrate BranchedMultiLevelRepartitionConnectedTopologyTest into a unit test (#8081)
    
    Relying on an integration test to catch an algorithm bug introduces extra flakiness; migrate it to a unit test to reduce that flakiness until we upgrade the Java/Scala libs.
    
    Checked that the test fails with the older version of StreamsPartitionAssignor.
    
    Reviewers: Guozhang Wang <wangg...@gmail.com>
---
 ...MultiLevelRepartitionConnectedTopologyTest.java | 151 ---------------------
 .../internals/StreamsPartitionAssignorTest.java    |  86 ++++++++++++
 2 files changed, 86 insertions(+), 151 deletions(-)

diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/BranchedMultiLevelRepartitionConnectedTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/BranchedMultiLevelRepartitionConnectedTopologyTest.java
deleted file mode 100644
index cfff070..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/integration/BranchedMultiLevelRepartitionConnectedTopologyTest.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.integration;
-
-import kafka.utils.MockTime;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.common.serialization.ByteArraySerializer;
-import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.streams.KafkaStreams;
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.Topology;
-import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
-import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
-import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KTable;
-import org.apache.kafka.streams.kstream.Materialized;
-import org.apache.kafka.test.IntegrationTest;
-import org.apache.kafka.test.StreamsTestUtils;
-import org.apache.kafka.test.TestUtils;
-import org.junit.Before;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Collections;
-import java.util.List;
-import java.util.Properties;
-import java.util.concurrent.ExecutionException;
-
-/**
- * Test out a topology with 3 level of sub-topology as:
- *           0
- *         /   \
- *        1    3
- *         \  /
- *          2
- * where each pair of the sub topology is connected by repartition topic.
- * The purpose of this test is to verify the robustness of the stream partition assignor algorithm,
- * especially whether it could build the repartition topic counts (step zero) with a complex topology.
- * The traversal path 0 -> 1 -> 2 -> 3 hits the case where sub-topology 2 will be initialized while its
- * parent 3 hasn't been initialized yet.
- */
-@Category({IntegrationTest.class})
-public class BranchedMultiLevelRepartitionConnectedTopologyTest {
-
-    private static final Logger log = LoggerFactory.getLogger(BranchedMultiLevelRepartitionConnectedTopologyTest.class);
-
-    private static String inputStream;
-
-    private KafkaStreams kafkaStreams;
-
-    private Properties streamsConfiguration;
-
-    @ClassRule
-    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
-    private final MockTime mockTime = CLUSTER.time;
-
-    @Before
-    public void setUp() throws Exception {
-        final Properties props = new Properties();
-        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-        props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.NO_OPTIMIZATION);
-        streamsConfiguration = StreamsTestUtils.getStreamsConfig(
-            "branched-repartition-topic-test",
-            CLUSTER.bootstrapServers(),
-            Serdes.ByteArray().getClass().getName(),
-            Serdes.ByteArray().getClass().getName(),
-            props);
-
-        inputStream = "input-stream";
-        CLUSTER.createTopic(inputStream, 3, 1);
-
-        IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
-    }
-
-    @Test
-    @SuppressWarnings("unchecked")
-    public void testTopologyBuild() throws InterruptedException, ExecutionException {
-
-        final StreamsBuilder builder = new StreamsBuilder();
-
-        final KStream<byte[], byte[]> input = builder.stream(inputStream);
-
-        final KStream<byte[], byte[]>[] branches = input
-            .flatMapValues(value -> Collections.singletonList(new byte[0]))
-            .branch((k, v) -> true, (k, v) -> false);
-
-        final KTable<byte[], byte[]> b1 = branches[0]
-            .map(KeyValue::new)
-            .groupByKey()
-            .reduce((k, v) -> v, Materialized.as("odd_store"))
-            .toStream()
-            .peek((k, v) -> { })
-            .map(KeyValue::new)
-            .groupByKey()
-            .reduce((k, v) -> v, Materialized.as("odd_store_2"));
-
-        final KTable<byte[], byte[]> b2 = branches[1]
-            .map(KeyValue::new)
-            .groupByKey()
-            .reduce((k, v) -> v, Materialized.as("even_store"))
-            .toStream()
-            .peek((k, v) -> { })
-            .map(KeyValue::new)
-            .groupByKey()
-            .reduce((k, v) -> v, Materialized.as("even_store_2"));
-
-        b1.join(b2, (v1, v2) -> v1, Materialized.as("joined_store"))
-            .toStream();
-
-        final Topology topology = builder.build(streamsConfiguration);
-        log.info("Built topology: {}", topology.describe());
-
-        final Properties producerConfig = TestUtils.producerConfig(
-            CLUSTER.bootstrapServers(), ByteArraySerializer.class, ByteArraySerializer.class);
-
-        final List<KeyValue<byte[], byte[]>> initialKeyValues = Collections.singletonList(
-            KeyValue.pair(new byte[1], new byte[1]));
-
-        IntegrationTestUtils.produceKeyValuesSynchronously(
-            inputStream, initialKeyValues, producerConfig, mockTime);
-
-        kafkaStreams = new KafkaStreams(topology, streamsConfiguration);
-
-        kafkaStreams.cleanUp();
-        kafkaStreams.start();
-
-        TestUtils.waitForCondition(() -> kafkaStreams.state() == KafkaStreams.State.RUNNING,
-                                   "Failed to observe stream transits to RUNNING");
-
-        kafkaStreams.close();
-    }
-}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
index ffc422e..12fc9c7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
@@ -1692,6 +1692,92 @@ public class StreamsPartitionAssignorTest {
         shouldThrowIfPreVersionProbingSubscriptionAndFutureSubscriptionIsMixed(2);
     }
 
+    @Test
+    public void shouldNotFailOnBranchedMultiLevelRepartitionConnectedTopology() {
+        // Test out a topology with 3 level of sub-topology as:
+        //            0
+        //          /   \
+        //         1    3
+        //          \  /
+        //           2
+        //  where each pair of the sub topology is connected by repartition topic.
+        //  The purpose of this test is to verify the robustness of the stream partition assignor algorithm,
+        //  especially whether it could build the repartition topic counts (step zero) with a complex topology.
+        //  The traversal path 0 -> 1 -> 2 -> 3 hits the case where sub-topology 2 will be initialized while its
+        //  parent 3 hasn't been initialized yet.
+        final String applicationId = "test";
+        builder.setApplicationId(applicationId);
+        builder.addSource(null, "KSTREAM-SOURCE-0000000000",  null, null, null, "input-stream");
+        builder.addProcessor("KSTREAM-FLATMAPVALUES-0000000001", new MockProcessorSupplier(), "KSTREAM-SOURCE-0000000000");
+        builder.addProcessor("KSTREAM-BRANCH-0000000002", new MockProcessorSupplier(), "KSTREAM-FLATMAPVALUES-0000000001");
+        builder.addProcessor("KSTREAM-BRANCHCHILD-0000000003", new MockProcessorSupplier(), "KSTREAM-BRANCH-0000000002");
+        builder.addProcessor("KSTREAM-BRANCHCHILD-0000000004", new MockProcessorSupplier(), "KSTREAM-BRANCH-0000000002");
+        builder.addProcessor("KSTREAM-MAP-0000000005", new MockProcessorSupplier(), "KSTREAM-BRANCHCHILD-0000000003");
+
+        builder.addInternalTopic("odd_store-repartition");
+        builder.addProcessor("odd_store-repartition-filter", new MockProcessorSupplier(), "KSTREAM-MAP-0000000005");
+        builder.addSink("odd_store-repartition-sink", "odd_store-repartition", null, null, null, "odd_store-repartition-filter");
+        builder.addSource(null, "odd_store-repartition-source", null, null, null, "odd_store-repartition");
+        builder.addProcessor("KSTREAM-REDUCE-0000000006", new MockProcessorSupplier(), "odd_store-repartition-source");
+        builder.addProcessor("KTABLE-TOSTREAM-0000000010", new MockProcessorSupplier(), "KSTREAM-REDUCE-0000000006");
+        builder.addProcessor("KSTREAM-PEEK-0000000011", new MockProcessorSupplier(), "KTABLE-TOSTREAM-0000000010");
+        builder.addProcessor("KSTREAM-MAP-0000000012", new MockProcessorSupplier(), "KSTREAM-PEEK-0000000011");
+
+        builder.addInternalTopic("odd_store_2-repartition");
+        builder.addProcessor("odd_store_2-repartition-filter", new MockProcessorSupplier(), "KSTREAM-MAP-0000000012");
+        builder.addSink("odd_store_2-repartition-sink", "odd_store_2-repartition", null, null, null, "odd_store_2-repartition-filter");
+        builder.addSource(null, "odd_store_2-repartition-source", null, null, null, "odd_store_2-repartition");
+        builder.addProcessor("KSTREAM-REDUCE-0000000013", new MockProcessorSupplier(), "odd_store_2-repartition-source");
+        builder.addProcessor("KSTREAM-MAP-0000000017", new MockProcessorSupplier(), "KSTREAM-BRANCHCHILD-0000000004");
+
+        builder.addInternalTopic("even_store-repartition");
+        builder.addProcessor("even_store-repartition-filter", new MockProcessorSupplier(), "KSTREAM-MAP-0000000017");
+        builder.addSink("even_store-repartition-sink", "even_store-repartition", null, null, null, "even_store-repartition-filter");
+        builder.addSource(null, "even_store-repartition-source", null, null, null, "even_store-repartition");
+        builder.addProcessor("KSTREAM-REDUCE-0000000018", new MockProcessorSupplier(), "even_store-repartition-source");
+        builder.addProcessor("KTABLE-TOSTREAM-0000000022", new MockProcessorSupplier(), "KSTREAM-REDUCE-0000000018");
+        builder.addProcessor("KSTREAM-PEEK-0000000023", new MockProcessorSupplier(), "KTABLE-TOSTREAM-0000000022");
+        builder.addProcessor("KSTREAM-MAP-0000000024", new MockProcessorSupplier(), "KSTREAM-PEEK-0000000023");
+
+        builder.addInternalTopic("even_store_2-repartition");
+        builder.addProcessor("even_store_2-repartition-filter", new MockProcessorSupplier(), "KSTREAM-MAP-0000000024");
+        builder.addSink("even_store_2-repartition-sink", "even_store_2-repartition", null, null, null, "even_store_2-repartition-filter");
+        builder.addSource(null, "even_store_2-repartition-source", null, null, null, "even_store_2-repartition");
+        builder.addProcessor("KSTREAM-REDUCE-0000000025", new MockProcessorSupplier(), "even_store_2-repartition-source");
+        builder.addProcessor("KTABLE-JOINTHIS-0000000030", new MockProcessorSupplier(), "KSTREAM-REDUCE-0000000013");
+        builder.addProcessor("KTABLE-JOINOTHER-0000000031", new MockProcessorSupplier(), "KSTREAM-REDUCE-0000000025");
+        builder.addProcessor("KTABLE-MERGE-0000000029", new MockProcessorSupplier(), "KTABLE-JOINTHIS-0000000030", "KTABLE-JOINOTHER-0000000031");
+        builder.addProcessor("KTABLE-TOSTREAM-0000000032", new MockProcessorSupplier(), "KTABLE-MERGE-0000000029");
+
+        final List<String> topics = asList("input-stream", "test-even_store-repartition", "test-even_store_2-repartition", "test-odd_store-repartition", "test-odd_store_2-repartition");
+
+        final UUID uuid1 = UUID.randomUUID();
+        createMockTaskManager(emptyTasks, emptyTasks, uuid1, builder);
+        EasyMock.replay(taskManager);
+
+        streamsMetadataState = EasyMock.createNiceMock(StreamsMetadataState.class);
+        configurePartitionAssignor(emptyMap());
+        final MockInternalTopicManager internalTopicManager =
+            new MockInternalTopicManager(streamsConfig, mockClientSupplier.restoreConsumer);
+        partitionAssignor.setInternalTopicManager(internalTopicManager);
+
+        subscriptions.put("consumer10",
+            new ConsumerPartitionAssignor.Subscription(
+                topics,
+                getInfo(uuid1, emptyTasks, emptyTasks).encode())
+        );
+
+        final Cluster metadata = new Cluster(
+            "cluster",
+            Collections.singletonList(Node.noNode()),
+            Collections.singletonList(new PartitionInfo("input-stream", 0, Node.noNode(), new Node[0], new Node[0])),
+            Collections.emptySet(),
+            Collections.emptySet());
+
+        // This shall fail if we have bugs in the repartition topic creation due to the inconsistent order of sub-topologies.
+        partitionAssignor.assign(metadata, new GroupSubscription(subscriptions));
+    }
+
     private static ByteBuffer encodeFutureSubscription() {
         final ByteBuffer buf = ByteBuffer.allocate(4 /* used version */ + 4 /* supported version */);
         buf.putInt(LATEST_SUPPORTED_VERSION + 1);
