This is an automated email from the ASF dual-hosted git repository.

ableegoldman pushed a commit to branch 2.7
in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 389a01050303c53179b5f5f9bdcd7087e1a450c7
Author: A. Sophie Blee-Goldman <[email protected]>
AuthorDate: Tue Nov 17 16:57:53 2020 -0800

    KAFKA-10689: fix windowed FKJ topology and put checks in assignor to avoid infinite loops (#9568)

    Fixes an infinite loop in the assignor when trying to resolve the number of
    partitions in a topology with a windowed foreign-key join (FKJ). Also adds a
    check to this loop so that we break out and fail the application if we detect
    that we are, or will be, stuck in an infinite loop.

    Reviewers: Matthias Sax <[email protected]>
---
 checkstyle/suppressions.xml                        |  2 +-
 .../kstream/internals/graph/StreamSinkNode.java    | 19 +++---
 .../internals/StreamsPartitionAssignor.java        | 29 +++++----
 .../integration/InternalTopicIntegrationTest.java  | 39 +++++++++++-
 .../internals/StreamsPartitionAssignorTest.java    | 69 +++++++++++++++++++++-
 5 files changed, 135 insertions(+), 23 deletions(-)

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index d9eff63..4ff500b 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -185,7 +185,7 @@
 
     <!-- Streams tests -->
     <suppress checks="ClassFanOutComplexity"
-              files="(StreamThreadTest|StreamTaskTest|TopologyTestDriverTest).java"/>
+              files="(StreamsPartitionAssignorTest|StreamThreadTest|StreamTaskTest|TopologyTestDriverTest).java"/>
 
     <suppress checks="MethodLength"
               files="(EosBetaUpgradeIntegrationTest|KStreamKStreamJoinTest|RocksDBWindowStoreTest).java"/>
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamSinkNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamSinkNode.java
index 40ce357..ec211f7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamSinkNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamSinkNode.java
@@ -51,21 +51,24 @@ public class StreamSinkNode<K, V> extends StreamsGraphNode {
     }
 
     @Override
+    @SuppressWarnings("unchecked")
     public void writeToTopology(final InternalTopologyBuilder topologyBuilder) {
         final Serializer<K> keySerializer = producedInternal.keySerde() == null ? null : producedInternal.keySerde().serializer();
         final Serializer<V> valSerializer = producedInternal.valueSerde() == null ? null : producedInternal.valueSerde().serializer();
-        final StreamPartitioner<? super K, ? super V> partitioner = producedInternal.streamPartitioner();
         final String[] parentNames = parentNodeNames();
 
-        if (partitioner == null && keySerializer instanceof WindowedSerializer) {
-            @SuppressWarnings("unchecked")
-            final StreamPartitioner<K, V> windowedPartitioner = (StreamPartitioner<K, V>) new WindowedStreamPartitioner<Object, V>((WindowedSerializer) keySerializer);
-            topologyBuilder.addSink(nodeName(), topicNameExtractor, keySerializer, valSerializer, windowedPartitioner, parentNames);
-        } else if (topicNameExtractor instanceof StaticTopicNameExtractor) {
-            final String topicName = ((StaticTopicNameExtractor) topicNameExtractor).topicName;
+        final StreamPartitioner<? super K, ? super V> partitioner;
+        if (producedInternal.streamPartitioner() == null && keySerializer instanceof WindowedSerializer) {
+            partitioner = (StreamPartitioner<K, V>) new WindowedStreamPartitioner<K, V>((WindowedSerializer<K>) keySerializer);
+        } else {
+            partitioner = producedInternal.streamPartitioner();
+        }
+
+        if (topicNameExtractor instanceof StaticTopicNameExtractor) {
+            final String topicName = ((StaticTopicNameExtractor<K, V>) topicNameExtractor).topicName;
             topologyBuilder.addSink(nodeName(), topicName, keySerializer, valSerializer, partitioner, parentNames);
         } else {
-            topologyBuilder.addSink(nodeName(), topicNameExtractor, keySerializer, valSerializer, partitioner, parentNames);
+            topologyBuilder.addSink(nodeName(), topicNameExtractor, keySerializer, valSerializer, partitioner, parentNames);
         }
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
index 4004f51..d5ad693 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
@@ -523,10 +523,11 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
         boolean numPartitionsNeeded;
         do {
             numPartitionsNeeded = false;
+            boolean progressMadeThisIteration = false; // avoid infinitely looping without making any progress on unknown repartitions
 
             for (final TopicsInfo topicsInfo : topicGroups.values()) {
-                for (final String topicName : topicsInfo.repartitionSourceTopics.keySet()) {
-                    final Optional<Integer> maybeNumPartitions = repartitionTopicMetadata.get(topicName)
+                for (final String repartitionSourceTopic : topicsInfo.repartitionSourceTopics.keySet()) {
+                    final Optional<Integer> maybeNumPartitions = repartitionTopicMetadata.get(repartitionSourceTopic)
                         .numberOfPartitions();
 
                     Integer numPartitions = null;
@@ -535,24 +536,24 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
                         for (final TopicsInfo otherTopicsInfo : topicGroups.values()) {
                             final Set<String> otherSinkTopics = otherTopicsInfo.sinkTopics;
 
-                            if (otherSinkTopics.contains(topicName)) {
+                            if (otherSinkTopics.contains(repartitionSourceTopic)) {
                                 // if this topic is one of the sink topics of this topology,
                                 // use the maximum of all its source topic partitions as the number of partitions
-                                for (final String sourceTopicName : otherTopicsInfo.sourceTopics) {
+                                for (final String upstreamSourceTopic : otherTopicsInfo.sourceTopics) {
                                     Integer numPartitionsCandidate = null;
                                     // It is possible the sourceTopic is another internal topic, i.e,
                                     // map().join().join(map())
-                                    if (repartitionTopicMetadata.containsKey(sourceTopicName)) {
-                                        if (repartitionTopicMetadata.get(sourceTopicName).numberOfPartitions().isPresent()) {
+                                    if (repartitionTopicMetadata.containsKey(upstreamSourceTopic)) {
+                                        if (repartitionTopicMetadata.get(upstreamSourceTopic).numberOfPartitions().isPresent()) {
                                             numPartitionsCandidate =
-                                                repartitionTopicMetadata.get(sourceTopicName).numberOfPartitions().get();
+                                                repartitionTopicMetadata.get(upstreamSourceTopic).numberOfPartitions().get();
                                         }
                                     } else {
-                                        final Integer count = metadata.partitionCountForTopic(sourceTopicName);
+                                        final Integer count = metadata.partitionCountForTopic(upstreamSourceTopic);
                                         if (count == null) {
                                             throw new TaskAssignmentException(
                                                 "No partition count found for source topic "
-                                                    + sourceTopicName
+                                                    + upstreamSourceTopic
                                                     + ", but it should have been."
                                             );
                                         }
@@ -568,16 +569,20 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
                             }
                         }
 
-                        // if we still have not found the right number of partitions,
-                        // another iteration is needed
                         if (numPartitions == null) {
                             numPartitionsNeeded = true;
+                            log.trace("Unable to determine number of partitions for {}, another iteration is needed",
+                                      repartitionSourceTopic);
                         } else {
-                            repartitionTopicMetadata.get(topicName).setNumberOfPartitions(numPartitions);
+                            repartitionTopicMetadata.get(repartitionSourceTopic).setNumberOfPartitions(numPartitions);
+                            progressMadeThisIteration = true;
                         }
                     }
                 }
             }
+            if (!progressMadeThisIteration && numPartitionsNeeded) {
+                throw new TaskAssignmentException("Failed to compute number of partitions for all repartition topics");
+            }
         } while (numPartitionsNeeded);
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
index df7ad0f..e19c8b1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.integration;
 
+import java.time.Duration;
 import kafka.log.LogConfig;
 import kafka.utils.MockTime;
 import org.apache.kafka.clients.admin.Admin;
@@ -33,7 +34,9 @@ import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Grouped;
 import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
@@ -59,6 +62,8 @@ import java.util.concurrent.TimeUnit;
 
 import static java.time.Duration.ofMillis;
 import static java.time.Duration.ofSeconds;
+import static java.util.Collections.singletonList;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning;
 import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForCompletion;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -73,6 +78,7 @@ public class InternalTopicIntegrationTest {
 
     private static final String APP_ID = "internal-topics-integration-test";
     private static final String DEFAULT_INPUT_TOPIC = "inputTopic";
+    private static final String DEFAULT_INPUT_TABLE_TOPIC = "inputTable";
 
     private final MockTime mockTime = CLUSTER.time;
 
@@ -80,7 +86,7 @@ public class InternalTopicIntegrationTest {
 
     @BeforeClass
    public static void startKafkaCluster() throws InterruptedException {
-        CLUSTER.createTopics(DEFAULT_INPUT_TOPIC);
+        CLUSTER.createTopics(DEFAULT_INPUT_TOPIC, DEFAULT_INPUT_TABLE_TOPIC);
     }
 
     @Before
@@ -135,6 +141,37 @@ public class InternalTopicIntegrationTest {
         return Admin.create(adminClientConfig);
     }
 
+    /*
+     * This test just ensures that the assignor does not get stuck during partition number
+     * resolution for internal repartition topics. See KAFKA-10689
+     */
+    @Test
+    public void shouldGetToRunningWithWindowedTableInFKJ() throws Exception {
+        final String appID = APP_ID + "-windowed-FKJ";
+        streamsProp.put(StreamsConfig.APPLICATION_ID_CONFIG, appID);
+
+        final StreamsBuilder streamsBuilder = new StreamsBuilder();
+        final KStream<String, String> inputTopic = streamsBuilder.stream(DEFAULT_INPUT_TOPIC);
+        final KTable<String, String> inputTable = streamsBuilder.table(DEFAULT_INPUT_TABLE_TOPIC);
+        inputTopic
+            .groupBy(
+                (k, v) -> k,
+                Grouped.with("GroupName", Serdes.String(), Serdes.String())
+            )
+            .windowedBy(TimeWindows.of(Duration.ofMinutes(10)))
+            .aggregate(
+                () -> "",
+                (k, v, a) -> a + k)
+            .leftJoin(
+                inputTable,
+                v -> v,
+                (x, y) -> x + y
+            );
+
+        final KafkaStreams streams = new KafkaStreams(streamsBuilder.build(), streamsProp);
+        startApplicationAndWaitUntilRunning(singletonList(streams), Duration.ofSeconds(60));
+    }
+
     @Test
     public void shouldCompactTopicsForKeyValueStoreChangelogs() {
         final String appID = APP_ID + "-compact";
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
index 45d150a..15f4ea6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
+import java.time.Duration;
 import java.util.Properties;
 import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.admin.AdminClient;
@@ -36,18 +37,24 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.internals.KafkaFutureImpl;
+import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.StreamsConfig.InternalConfig;
 import org.apache.kafka.streams.TopologyWrapper;
+import org.apache.kafka.streams.kstream.Grouped;
 import org.apache.kafka.streams.kstream.JoinWindows;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.streams.kstream.internals.ConsumedInternal;
+import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder;
+import org.apache.kafka.streams.kstream.internals.MaterializedInternal;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
 import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration;
@@ -89,6 +96,7 @@ import static java.util.Arrays.asList;
 import static java.util.Collections.emptyList;
 import static java.util.Collections.emptyMap;
 import static java.util.Collections.emptySet;
+import static java.util.Collections.singleton;
 import static java.util.Collections.singletonList;
 import static java.util.Collections.singletonMap;
 import static org.apache.kafka.common.utils.Utils.mkEntry;
@@ -1054,7 +1062,7 @@ public class StreamsPartitionAssignorTest {
         EasyMock.verify(streamsMetadataState);
         EasyMock.verify(taskManager);
 
-        assertEquals(Collections.singleton(t3p0.topic()), capturedCluster.getValue().topics());
+        assertEquals(singleton(t3p0.topic()), capturedCluster.getValue().topics());
         assertEquals(2, capturedCluster.getValue().partitionsForTopic(t3p0.topic()).size());
     }
 
@@ -2057,6 +2065,65 @@ public class StreamsPartitionAssignorTest {
         assertEquals(-128, partitionAssignor.uniqueField());
     }
 
+    @Test
+    public void shouldThrowTaskAssignmentExceptionWhenUnableToResolvePartitionCount() {
+        builder = new CorruptedInternalTopologyBuilder();
+        final InternalStreamsBuilder streamsBuilder = new InternalStreamsBuilder(builder);
+
+        final KStream<String, String> inputTopic = streamsBuilder.stream(singleton("topic1"), new ConsumedInternal<>());
+        final KTable<String, String> inputTable = streamsBuilder.table("topic2", new ConsumedInternal<>(), new MaterializedInternal<>(Materialized.as("store")));
+        inputTopic
+            .groupBy(
+                (k, v) -> k,
+                Grouped.with("GroupName", Serdes.String(), Serdes.String())
+            )
+            .windowedBy(TimeWindows.of(Duration.ofMinutes(10)))
+            .aggregate(
+                () -> "",
+                (k, v, a) -> a + k)
+            .leftJoin(
+                inputTable,
+                v -> v,
+                (x, y) -> x + y
+            );
+        streamsBuilder.buildAndOptimizeTopology();
+
+        configureDefault();
+
+        subscriptions.put("consumer",
+                          new Subscription(
+                              singletonList("topic"),
+                              defaultSubscriptionInfo.encode()
+                          ));
+        final Map<String, Assignment> assignments = partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment();
+        assertThat(AssignmentInfo.decode(assignments.get("consumer").userData()).errCode(),
+                   equalTo(AssignorError.ASSIGNMENT_ERROR.code()));
+    }
+
+    private static class CorruptedInternalTopologyBuilder extends InternalTopologyBuilder {
+        private Map<Integer, TopicsInfo> corruptedTopicGroups;
+
+        @Override
+        public synchronized Map<Integer, TopicsInfo> topicGroups() {
+            if (corruptedTopicGroups == null) {
+                corruptedTopicGroups = new HashMap<>();
+                for (final Map.Entry<Integer, TopicsInfo> topicGroupEntry : super.topicGroups().entrySet()) {
+                    final TopicsInfo originalInfo = topicGroupEntry.getValue();
+                    corruptedTopicGroups.put(
+                        topicGroupEntry.getKey(),
+                        new TopicsInfo(
+                            emptySet(),
+                            originalInfo.sourceTopics,
+                            originalInfo.repartitionSourceTopics,
+                            originalInfo.stateChangelogTopics
+                        ));
+                }
+            }
+
+            return corruptedTopicGroups;
+        }
+    }
+
     private static ByteBuffer encodeFutureSubscription() {
         final ByteBuffer buf = ByteBuffer.allocate(4 /* used version */ + 4 /* supported version */);
         buf.putInt(LATEST_SUPPORTED_VERSION + 1);
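
A note on the StreamsPartitionAssignor hunks above: the change turns partition-count
resolution into a guarded fixed-point iteration. Each pass over the topic groups must
resolve the count for at least one repartition topic; if a pass completes with topics
still unresolved and no progress made, the loop now throws a TaskAssignmentException
(surfaced to group members as ASSIGNMENT_ERROR, as the new unit test asserts) instead
of spinning forever. Below is a minimal standalone sketch of that pattern; the topic
names, the dependsOn map, and the resolve helper are hypothetical stand-ins for the
real topicGroups/repartitionTopicMetadata structures, not Kafka code.

    import java.util.HashMap;
    import java.util.Map;

    public class ProgressGuardedResolution {

        // Each unresolved topic takes its partition count from the topic it depends on.
        // A dependency chain that never reaches a topic with a known count cannot
        // converge; the progress guard turns that into an exception rather than a hang.
        static Map<String, Integer> resolve(final Map<String, String> dependsOn,
                                            final Map<String, Integer> knownCounts) {
            final Map<String, Integer> resolved = new HashMap<>(knownCounts);
            boolean unresolvedRemaining;
            do {
                unresolvedRemaining = false;
                boolean progressMadeThisIteration = false;
                for (final Map.Entry<String, String> entry : dependsOn.entrySet()) {
                    if (resolved.containsKey(entry.getKey())) {
                        continue; // already resolved in an earlier pass
                    }
                    final Integer upstreamCount = resolved.get(entry.getValue());
                    if (upstreamCount == null) {
                        unresolvedRemaining = true; // hope a later pass resolves the upstream first
                    } else {
                        resolved.put(entry.getKey(), upstreamCount);
                        progressMadeThisIteration = true;
                    }
                }
                // The commit's guard, in miniature: no progress plus outstanding
                // work means we would loop forever, so fail fast instead.
                if (!progressMadeThisIteration && unresolvedRemaining) {
                    throw new IllegalStateException(
                        "Failed to compute number of partitions for all repartition topics");
                }
            } while (unresolvedRemaining);
            return resolved;
        }

        public static void main(final String[] args) {
            final Map<String, String> dependsOn = new HashMap<>();
            dependsOn.put("repartition-a", "source");        // resolvable directly
            dependsOn.put("repartition-b", "repartition-a"); // resolvable on a later pass
            final Map<String, Integer> knownCounts = new HashMap<>();
            knownCounts.put("source", 4);
            System.out.println(resolve(dependsOn, knownCounts)); // all three map to 4 partitions
        }
    }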
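
As for why the windowed FKJ topology looped in the first place: reading the
StreamSinkNode hunk, the old code entangled two independent decisions, which
partitioner to use (windowed vs. user-supplied) and which addSink overload to call
(static topic name vs. dynamic TopicNameExtractor). A windowed key with no explicit
partitioner always took the first branch, so a sink whose extractor was in fact a
StaticTopicNameExtractor was still registered through the extractor overload,
plausibly hiding the sink topic's name from the partition-count resolution above.
The fix selects the partitioner first and the overload second, independently. A toy
model of that restructuring follows, with hypothetical stand-in methods rather than
the real InternalTopologyBuilder API.

    public class SinkWiringDemo {

        // Hypothetical stand-ins for the two addSink overloads.
        static String addSinkWithStaticName(final String partitioner) {
            return "static-name sink, partitioner=" + partitioner;
        }

        static String addSinkWithExtractor(final String partitioner) {
            return "extractor sink, partitioner=" + partitioner;
        }

        // Old shape: one ladder mixes both decisions, so the combination
        // (windowed key, static topic name) never reaches the static-name overload.
        static String oldWiring(final boolean windowedKey, final boolean staticName) {
            if (windowedKey) {
                return addSinkWithExtractor("windowed");
            } else if (staticName) {
                return addSinkWithStaticName("default");
            } else {
                return addSinkWithExtractor("default");
            }
        }

        // New shape, mirroring the fixed writeToTopology(): choose the partitioner,
        // then choose the overload, as two independent decisions.
        static String newWiring(final boolean windowedKey, final boolean staticName) {
            final String partitioner = windowedKey ? "windowed" : "default";
            return staticName ? addSinkWithStaticName(partitioner) : addSinkWithExtractor(partitioner);
        }

        public static void main(final String[] args) {
            // The problematic combination from KAFKA-10689: windowed key + static name.
            System.out.println(oldWiring(true, true)); // extractor sink, partitioner=windowed
            System.out.println(newWiring(true, true)); // static-name sink, partitioner=windowed
        }
    }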
