This is an automated email from the ASF dual-hosted git repository.

ableegoldman pushed a commit to branch 2.7
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 389a01050303c53179b5f5f9bdcd7087e1a450c7
Author: A. Sophie Blee-Goldman <[email protected]>
AuthorDate: Tue Nov 17 16:57:53 2020 -0800

    KAFKA-10689: fix windowed FKJ topology and put checks in assignor to avoid 
infinite loops (#9568)
    
    Fix infinite loop in assignor when trying to resolve the number of 
partitions in a topology with a windowed FKJ. Also adds a check to this loop to 
break out and fail the application if we detect that we are/will be stuck in an 
infinite loop
    
    Reviewers: Matthias Sax <[email protected]>
---
 checkstyle/suppressions.xml                        |  2 +-
 .../kstream/internals/graph/StreamSinkNode.java    | 19 +++---
 .../internals/StreamsPartitionAssignor.java        | 29 +++++----
 .../integration/InternalTopicIntegrationTest.java  | 39 +++++++++++-
 .../internals/StreamsPartitionAssignorTest.java    | 69 +++++++++++++++++++++-
 5 files changed, 135 insertions(+), 23 deletions(-)

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index d9eff63..4ff500b 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -185,7 +185,7 @@
 
     <!-- Streams tests -->
     <suppress checks="ClassFanOutComplexity"
-              
files="(StreamThreadTest|StreamTaskTest|TopologyTestDriverTest).java"/>
+              
files="(StreamsPartitionAssignorTest|StreamThreadTest|StreamTaskTest|TopologyTestDriverTest).java"/>
 
     <suppress checks="MethodLength"
               
files="(EosBetaUpgradeIntegrationTest|KStreamKStreamJoinTest|RocksDBWindowStoreTest).java"/>
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamSinkNode.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamSinkNode.java
index 40ce357..ec211f7 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamSinkNode.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamSinkNode.java
@@ -51,21 +51,24 @@ public class StreamSinkNode<K, V> extends StreamsGraphNode {
     }
 
     @Override
+    @SuppressWarnings("unchecked")
     public void writeToTopology(final InternalTopologyBuilder topologyBuilder) 
{
         final Serializer<K> keySerializer = producedInternal.keySerde() == 
null ? null : producedInternal.keySerde().serializer();
         final Serializer<V> valSerializer = producedInternal.valueSerde() == 
null ? null : producedInternal.valueSerde().serializer();
-        final StreamPartitioner<? super K, ? super V> partitioner = 
producedInternal.streamPartitioner();
         final String[] parentNames = parentNodeNames();
 
-        if (partitioner == null && keySerializer instanceof 
WindowedSerializer) {
-            @SuppressWarnings("unchecked")
-            final StreamPartitioner<K, V> windowedPartitioner = 
(StreamPartitioner<K, V>) new WindowedStreamPartitioner<Object, 
V>((WindowedSerializer) keySerializer);
-            topologyBuilder.addSink(nodeName(), topicNameExtractor, 
keySerializer, valSerializer, windowedPartitioner, parentNames);
-        } else if (topicNameExtractor instanceof StaticTopicNameExtractor) {
-            final String topicName = ((StaticTopicNameExtractor) 
topicNameExtractor).topicName;
+        final StreamPartitioner<? super K, ? super V> partitioner;
+        if (producedInternal.streamPartitioner() == null && keySerializer 
instanceof WindowedSerializer) {
+            partitioner = (StreamPartitioner<K, V>) new 
WindowedStreamPartitioner<K, V>((WindowedSerializer<K>) keySerializer);
+        } else {
+            partitioner = producedInternal.streamPartitioner();
+        }
+
+        if (topicNameExtractor instanceof StaticTopicNameExtractor) {
+            final String topicName = ((StaticTopicNameExtractor<K, V>) 
topicNameExtractor).topicName;
             topologyBuilder.addSink(nodeName(), topicName, keySerializer, 
valSerializer, partitioner, parentNames);
         } else {
-            topologyBuilder.addSink(nodeName(), topicNameExtractor, 
keySerializer, valSerializer, partitioner,  parentNames);
+            topologyBuilder.addSink(nodeName(), topicNameExtractor, 
keySerializer, valSerializer, partitioner, parentNames);
         }
     }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
index 4004f51..d5ad693 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
@@ -523,10 +523,11 @@ public class StreamsPartitionAssignor implements 
ConsumerPartitionAssignor, Conf
         boolean numPartitionsNeeded;
         do {
             numPartitionsNeeded = false;
+            boolean progressMadeThisIteration = false;  // avoid infinitely 
looping without making any progress on unknown repartitions
 
             for (final TopicsInfo topicsInfo : topicGroups.values()) {
-                for (final String topicName : 
topicsInfo.repartitionSourceTopics.keySet()) {
-                    final Optional<Integer> maybeNumPartitions = 
repartitionTopicMetadata.get(topicName)
+                for (final String repartitionSourceTopic : 
topicsInfo.repartitionSourceTopics.keySet()) {
+                    final Optional<Integer> maybeNumPartitions = 
repartitionTopicMetadata.get(repartitionSourceTopic)
                                                                      
.numberOfPartitions();
                     Integer numPartitions = null;
 
@@ -535,24 +536,24 @@ public class StreamsPartitionAssignor implements 
ConsumerPartitionAssignor, Conf
                         for (final TopicsInfo otherTopicsInfo : 
topicGroups.values()) {
                             final Set<String> otherSinkTopics = 
otherTopicsInfo.sinkTopics;
 
-                            if (otherSinkTopics.contains(topicName)) {
+                            if 
(otherSinkTopics.contains(repartitionSourceTopic)) {
                                 // if this topic is one of the sink topics of 
this topology,
                                 // use the maximum of all its source topic 
partitions as the number of partitions
-                                for (final String sourceTopicName : 
otherTopicsInfo.sourceTopics) {
+                                for (final String upstreamSourceTopic : 
otherTopicsInfo.sourceTopics) {
                                     Integer numPartitionsCandidate = null;
                                     // It is possible the sourceTopic is 
another internal topic, i.e,
                                     // map().join().join(map())
-                                    if 
(repartitionTopicMetadata.containsKey(sourceTopicName)) {
-                                        if 
(repartitionTopicMetadata.get(sourceTopicName).numberOfPartitions().isPresent())
 {
+                                    if 
(repartitionTopicMetadata.containsKey(upstreamSourceTopic)) {
+                                        if 
(repartitionTopicMetadata.get(upstreamSourceTopic).numberOfPartitions().isPresent())
 {
                                             numPartitionsCandidate =
-                                                
repartitionTopicMetadata.get(sourceTopicName).numberOfPartitions().get();
+                                                
repartitionTopicMetadata.get(upstreamSourceTopic).numberOfPartitions().get();
                                         }
                                     } else {
-                                        final Integer count = 
metadata.partitionCountForTopic(sourceTopicName);
+                                        final Integer count = 
metadata.partitionCountForTopic(upstreamSourceTopic);
                                         if (count == null) {
                                             throw new TaskAssignmentException(
                                                 "No partition count found for 
source topic "
-                                                    + sourceTopicName
+                                                    + upstreamSourceTopic
                                                     + ", but it should have 
been."
                                             );
                                         }
@@ -568,16 +569,20 @@ public class StreamsPartitionAssignor implements 
ConsumerPartitionAssignor, Conf
                             }
                         }
 
-                        // if we still have not found the right number of 
partitions,
-                        // another iteration is needed
                         if (numPartitions == null) {
                             numPartitionsNeeded = true;
+                            log.trace("Unable to determine number of 
partitions for {}, another iteration is needed",
+                                      repartitionSourceTopic);
                         } else {
-                            
repartitionTopicMetadata.get(topicName).setNumberOfPartitions(numPartitions);
+                            
repartitionTopicMetadata.get(repartitionSourceTopic).setNumberOfPartitions(numPartitions);
+                            progressMadeThisIteration = true;
                         }
                     }
                 }
             }
+            if (!progressMadeThisIteration && numPartitionsNeeded) {
+                throw new TaskAssignmentException("Failed to compute number of 
partitions for all repartition topics");
+            }
         } while (numPartitionsNeeded);
     }
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
index df7ad0f..e19c8b1 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.integration;
 
+import java.time.Duration;
 import kafka.log.LogConfig;
 import kafka.utils.MockTime;
 import org.apache.kafka.clients.admin.Admin;
@@ -33,7 +34,9 @@ import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Grouped;
 import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
@@ -59,6 +62,8 @@ import java.util.concurrent.TimeUnit;
 
 import static java.time.Duration.ofMillis;
 import static java.time.Duration.ofSeconds;
+import static java.util.Collections.singletonList;
+import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning;
 import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForCompletion;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -73,6 +78,7 @@ public class InternalTopicIntegrationTest {
 
     private static final String APP_ID = "internal-topics-integration-test";
     private static final String DEFAULT_INPUT_TOPIC = "inputTopic";
+    private static final String DEFAULT_INPUT_TABLE_TOPIC = "inputTable";
 
     private final MockTime mockTime = CLUSTER.time;
 
@@ -80,7 +86,7 @@ public class InternalTopicIntegrationTest {
 
     @BeforeClass
     public static void startKafkaCluster() throws InterruptedException {
-        CLUSTER.createTopics(DEFAULT_INPUT_TOPIC);
+        CLUSTER.createTopics(DEFAULT_INPUT_TOPIC, DEFAULT_INPUT_TABLE_TOPIC);
     }
 
     @Before
@@ -135,6 +141,37 @@ public class InternalTopicIntegrationTest {
         return Admin.create(adminClientConfig);
     }
 
+    /*
+     * This test just ensures that the assignor does not get stuck during 
partition number resolution
+     * for internal repartition topics. See KAFKA-10689
+     */
+    @Test
+    public void shouldGetToRunningWithWindowedTableInFKJ() throws Exception {
+        final String appID = APP_ID + "-windowed-FKJ";
+        streamsProp.put(StreamsConfig.APPLICATION_ID_CONFIG, appID);
+
+        final StreamsBuilder streamsBuilder = new StreamsBuilder();
+        final KStream<String, String> inputTopic = 
streamsBuilder.stream(DEFAULT_INPUT_TOPIC);
+        final KTable<String, String> inputTable = 
streamsBuilder.table(DEFAULT_INPUT_TABLE_TOPIC);
+        inputTopic
+            .groupBy(
+                (k, v) -> k,
+                Grouped.with("GroupName", Serdes.String(), Serdes.String())
+            )
+            .windowedBy(TimeWindows.of(Duration.ofMinutes(10)))
+            .aggregate(
+                () -> "",
+                (k, v, a) -> a + k)
+            .leftJoin(
+                inputTable,
+                v -> v,
+                (x, y) -> x + y
+            );
+
+        final KafkaStreams streams = new KafkaStreams(streamsBuilder.build(), 
streamsProp);
+        startApplicationAndWaitUntilRunning(singletonList(streams), 
Duration.ofSeconds(60));
+    }
+
     @Test
     public void shouldCompactTopicsForKeyValueStoreChangelogs() {
         final String appID = APP_ID + "-compact";
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
index 45d150a..15f4ea6 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
+import java.time.Duration;
 import java.util.Properties;
 import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.admin.AdminClient;
@@ -36,18 +37,24 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.internals.KafkaFutureImpl;
+import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.StreamsConfig.InternalConfig;
 import org.apache.kafka.streams.TopologyWrapper;
+import org.apache.kafka.streams.kstream.Grouped;
 import org.apache.kafka.streams.kstream.JoinWindows;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.streams.kstream.internals.ConsumedInternal;
+import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder;
+import org.apache.kafka.streams.kstream.internals.MaterializedInternal;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
 import 
org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration;
@@ -89,6 +96,7 @@ import static java.util.Arrays.asList;
 import static java.util.Collections.emptyList;
 import static java.util.Collections.emptyMap;
 import static java.util.Collections.emptySet;
+import static java.util.Collections.singleton;
 import static java.util.Collections.singletonList;
 import static java.util.Collections.singletonMap;
 import static org.apache.kafka.common.utils.Utils.mkEntry;
@@ -1054,7 +1062,7 @@ public class StreamsPartitionAssignorTest {
         EasyMock.verify(streamsMetadataState);
         EasyMock.verify(taskManager);
 
-        assertEquals(Collections.singleton(t3p0.topic()), 
capturedCluster.getValue().topics());
+        assertEquals(singleton(t3p0.topic()), 
capturedCluster.getValue().topics());
         assertEquals(2, 
capturedCluster.getValue().partitionsForTopic(t3p0.topic()).size());
     }
 
@@ -2057,6 +2065,65 @@ public class StreamsPartitionAssignorTest {
         assertEquals(-128, partitionAssignor.uniqueField());
     }
 
+    @Test
+    public void 
shouldThrowTaskAssignmentExceptionWhenUnableToResolvePartitionCount() {
+        builder = new CorruptedInternalTopologyBuilder();
+        final InternalStreamsBuilder streamsBuilder = new 
InternalStreamsBuilder(builder);
+
+        final KStream<String, String> inputTopic = 
streamsBuilder.stream(singleton("topic1"), new ConsumedInternal<>());
+        final KTable<String, String> inputTable = 
streamsBuilder.table("topic2", new ConsumedInternal<>(), new 
MaterializedInternal<>(Materialized.as("store")));
+        inputTopic
+            .groupBy(
+                (k, v) -> k,
+                Grouped.with("GroupName", Serdes.String(), Serdes.String())
+            )
+            .windowedBy(TimeWindows.of(Duration.ofMinutes(10)))
+            .aggregate(
+                () -> "",
+                (k, v, a) -> a + k)
+            .leftJoin(
+                inputTable,
+                v -> v,
+                (x, y) -> x + y
+            );
+        streamsBuilder.buildAndOptimizeTopology();
+
+        configureDefault();
+
+        subscriptions.put("consumer",
+                          new Subscription(
+                              singletonList("topic"),
+                              defaultSubscriptionInfo.encode()
+                          ));
+        final Map<String, Assignment> assignments = 
partitionAssignor.assign(metadata, new 
GroupSubscription(subscriptions)).groupAssignment();
+        
assertThat(AssignmentInfo.decode(assignments.get("consumer").userData()).errCode(),
+                   equalTo(AssignorError.ASSIGNMENT_ERROR.code()));
+    }
+
+    private static class CorruptedInternalTopologyBuilder extends 
InternalTopologyBuilder {
+        private Map<Integer, TopicsInfo> corruptedTopicGroups;
+
+        @Override
+        public synchronized Map<Integer, TopicsInfo> topicGroups() {
+            if (corruptedTopicGroups == null) {
+                corruptedTopicGroups = new HashMap<>();
+                for (final Map.Entry<Integer, TopicsInfo> topicGroupEntry : 
super.topicGroups().entrySet()) {
+                    final TopicsInfo originalInfo = topicGroupEntry.getValue();
+                    corruptedTopicGroups.put(
+                        topicGroupEntry.getKey(),
+                        new TopicsInfo(
+                            emptySet(),
+                            originalInfo.sourceTopics,
+                            originalInfo.repartitionSourceTopics,
+                            originalInfo.stateChangelogTopics
+                        ));
+                }
+            }
+
+            return corruptedTopicGroups;
+        }
+    }
+
     private static ByteBuffer encodeFutureSubscription() {
         final ByteBuffer buf = ByteBuffer.allocate(4 /* used version */ + 4 /* 
supported version */);
         buf.putInt(LATEST_SUPPORTED_VERSION + 1);

Reply via email to