Repository: kafka
Updated Branches:
  refs/heads/trunk 8e9e17767 -> 1d586cb50
KAFKA-4476: Kafka Streams gets stuck if metadata is missing

 - break loop in StreamPartitionAssigner.assign() in case partition metadata is missing
 - fix state transition issue (follow-up to KAFKA-3637: Add method that checks if streams are initialised)
 - some test improvements

Author: Matthias J. Sax <[email protected]>

Reviewers: Eno Thereska, Ismael Juma, Guozhang Wang

Closes #2209 from mjsax/kafka-4476-stuck-on-missing-metadata

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1d586cb5
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1d586cb5
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1d586cb5

Branch: refs/heads/trunk
Commit: 1d586cb50a94540f6b931a8d525ba75273f314f0
Parents: 8e9e177
Author: Matthias J. Sax <[email protected]>
Authored: Sat Dec 10 21:48:44 2016 -0800
Committer: Guozhang Wang <[email protected]>
Committed: Sat Dec 10 21:48:44 2016 -0800

----------------------------------------------------------------------
 .../internals/StreamPartitionAssignor.java      | 28 +++++---
 .../integration/ResetIntegrationTest.java       | 54 ++++++---------
 .../internals/StreamPartitionAssignorTest.java  | 73 ++++++++++++++++++++
 3 files changed, 114 insertions(+), 41 deletions(-)
----------------------------------------------------------------------
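----------------------------------------------------------------------
The core of the fix is easiest to see in isolation. Below is a minimal, hypothetical sketch (stand-in code, not the actual StreamPartitionAssignor; only the names UNKNOWN, NOT_AVAILABLE, and numPartitionsNeeded mirror the patch) of the repartition-topic resolution loop this commit repairs. Before the fix, a source topic missing from the cluster metadata left its repartition topic's partition count UNKNOWN on every pass, so the do/while never exited; marking it NOT_AVAILABLE lets the loop terminate.

// Hypothetical sketch of the loop the patch fixes; classes are stand-ins.
import java.util.HashMap;
import java.util.Map;

public class SentinelLoopSketch {
    static final int UNKNOWN = -1;       // not resolved yet; another pass may help
    static final int NOT_AVAILABLE = -2; // source topic missing; retrying cannot help

    public static void main(String[] args) {
        // cluster metadata: topic -> partition count (absent = unknown topic)
        Map<String, Integer> metadata = new HashMap<>();
        metadata.put("topic1", 3);

        // repartition topic -> the source topic whose partition count it copies
        Map<String, String> sourceOf = new HashMap<>();
        sourceOf.put("repartition-1", "topic1");
        sourceOf.put("repartition-2", "unknownTopic");

        Map<String, Integer> numPartitions = new HashMap<>();
        for (String topic : sourceOf.keySet()) {
            numPartitions.put(topic, UNKNOWN);
        }

        boolean numPartitionsNeeded;
        do {
            numPartitionsNeeded = false;
            for (Map.Entry<String, Integer> entry : numPartitions.entrySet()) {
                if (entry.getValue() == UNKNOWN) {
                    Integer count = metadata.get(sourceOf.get(entry.getKey()));
                    if (count == null) {
                        // the fix: without this branch the entry stays UNKNOWN
                        // forever and the do/while below spins infinitely
                        entry.setValue(NOT_AVAILABLE);
                    } else {
                        entry.setValue(count);
                    }
                }
                if (entry.getValue() == UNKNOWN) {
                    numPartitionsNeeded = true;
                }
            }
        } while (numPartitionsNeeded);

        System.out.println(numPartitions); // e.g. {repartition-1=3, repartition-2=-2}
    }
}
----------------------------------------------------------------------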
http://git-wip-us.apache.org/repos/asf/kafka/blob/1d586cb5/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
index 84f78dc..7e15f70 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
@@ -59,6 +59,9 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
 
     private static final Logger log = LoggerFactory.getLogger(StreamPartitionAssignor.class);
 
+    public final static int UNKNOWN = -1;
+    public final static int NOT_AVAILABLE = -2;
+
     private static class AssignedPartition implements Comparable<AssignedPartition> {
         public final TaskId taskId;
         public final TopicPartition partition;
@@ -128,7 +131,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
 
         InternalTopicMetadata(final InternalTopicConfig config) {
             this.config = config;
-            this.numPartitions = -1;
+            this.numPartitions = UNKNOWN;
         }
     }
 
@@ -140,7 +143,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
             if (result != 0) {
                 return result;
             } else {
-                return p1.partition() < p2.partition() ? -1 : (p1.partition() > p2.partition() ? 1 : 0);
+                return p1.partition() < p2.partition() ? UNKNOWN : (p1.partition() > p2.partition() ? 1 : 0);
             }
         }
     };
@@ -311,7 +314,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
                 int numPartitions = repartitionTopicMetadata.get(topicName).numPartitions;
 
                 // try set the number of partitions for this repartition topic if it is not set yet
-                if (numPartitions == -1) {
+                if (numPartitions == UNKNOWN) {
                     for (TopologyBuilder.TopicsInfo otherTopicsInfo : topicGroups.values()) {
                         Set<String> otherSinkTopics = otherTopicsInfo.sinkTopics;
@@ -326,6 +329,9 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
                                     numPartitionsCandidate = repartitionTopicMetadata.get(sourceTopicName).numPartitions;
                                 } else {
                                     numPartitionsCandidate = metadata.partitionCountForTopic(sourceTopicName);
+                                    if (numPartitionsCandidate == null) {
+                                        repartitionTopicMetadata.get(topicName).numPartitions = NOT_AVAILABLE;
+                                    }
                                 }
 
                                 if (numPartitionsCandidate != null && numPartitionsCandidate > numPartitions) {
@@ -337,7 +343,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
 
                 // if we still have not found the right number of partitions,
                 // another iteration is needed
-                if (numPartitions == -1)
+                if (numPartitions == UNKNOWN)
                     numPartitionsNeeded = true;
                 else
                     repartitionTopicMetadata.get(topicName).numPartitions = numPartitions;
@@ -429,7 +435,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
 
         for (InternalTopicConfig topicConfig : stateChangelogTopics.values()) {
             // the expected number of partitions is the max value of TaskId.partition + 1
-            int numPartitions = -1;
+            int numPartitions = UNKNOWN;
             if (tasksByTopicGroup.get(topicGroupId) != null) {
                 for (TaskId task : tasksByTopicGroup.get(topicGroupId)) {
                     if (numPartitions < task.partition + 1)
@@ -607,8 +613,12 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
             InternalTopicConfig topic = entry.getValue().config;
             Integer numPartitions = entry.getValue().numPartitions;
 
-            if (numPartitions < 0)
+            if (numPartitions == NOT_AVAILABLE) {
+                continue;
+            }
+            if (numPartitions < 0) {
                 throw new TopologyBuilderException(String.format("stream-thread [%s] Topic [%s] number of partitions not defined", streamThread.getName(), topic.name()));
+            }
 
             internalTopicManager.makeReady(topic, numPartitions);
@@ -647,7 +657,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
     private void ensureCopartitioning(Set<String> copartitionGroup,
                                       Map<String, InternalTopicMetadata> allRepartitionTopicsNumPartitions,
                                       Cluster metadata) {
-        int numPartitions = -1;
+        int numPartitions = UNKNOWN;
 
         for (String topic : copartitionGroup) {
             if (!allRepartitionTopicsNumPartitions.containsKey(topic)) {
@@ -656,7 +666,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
                 if (partitions == null)
                     throw new TopologyBuilderException(String.format("stream-thread [%s] Topic not found: %s", streamThread.getName(), topic));
 
-                if (numPartitions == -1) {
+                if (numPartitions == UNKNOWN) {
                     numPartitions = partitions;
                 } else if (numPartitions != partitions) {
                     String[] topics = copartitionGroup.toArray(new String[copartitionGroup.size()]);
@@ -668,7 +678,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
 
         // if all topics for this co-partition group are repartition topics,
         // then set the number of partitions to be the maximum of the number of partitions.
-        if (numPartitions == -1) {
+        if (numPartitions == UNKNOWN) {
            for (Map.Entry<String, InternalTopicMetadata> entry: allRepartitionTopicsNumPartitions.entrySet()) {
                 if (copartitionGroup.contains(entry.getKey())) {
                     int partitions = entry.getValue().numPartitions;
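----------------------------------------------------------------------
The prepareTopic() hunk above (@@ -607,8 +613,12 @@) is the second half of the fix: a topic marked NOT_AVAILABLE is skipped for this rebalance round (its metadata may appear later) instead of tripping the "number of partitions not defined" check, which is reserved for counts that are still UNKNOWN and indicate a real topology problem. Below is a compact, hedged restatement in stand-in code, with a plain IllegalStateException in place of Kafka's TopologyBuilderException.

// Hypothetical stand-in for the patched validation step, not the real class.
public class PrepareTopicSketch {
    static final int UNKNOWN = -1;
    static final int NOT_AVAILABLE = -2;

    static void prepare(String topic, int numPartitions) {
        if (numPartitions == NOT_AVAILABLE) {
            // skip for now; metadata for the source topic may arrive later
            System.out.println("skipping " + topic + " (metadata not available yet)");
            return;
        }
        if (numPartitions < 0) {
            // mirrors the TopologyBuilderException thrown in the real code
            throw new IllegalStateException(
                String.format("Topic [%s] number of partitions not defined", topic));
        }
        System.out.println("creating " + topic + " with " + numPartitions + " partitions");
    }

    public static void main(String[] args) {
        prepare("app-repartition-1", 3);
        prepare("app-repartition-2", NOT_AVAILABLE); // skipped, no exception
    }
}
----------------------------------------------------------------------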
http://git-wip-us.apache.org/repos/asf/kafka/blob/1d586cb5/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
index 6ed2ffd..efb5c81 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
@@ -36,7 +36,6 @@ import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
-import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.kstream.Windowed;
@@ -47,7 +46,7 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
-import org.junit.Ignore;
+import org.junit.Test;
 
 import java.util.Collections;
 import java.util.HashSet;
@@ -137,7 +136,7 @@ public class ResetIntegrationTest {
         }
     }
 
-    @Ignore
+    @Test
     public void testReprocessingFromScratchAfterResetWithIntermediateUserTopic() throws Exception {
         CLUSTER.createTopic(INTERMEDIATE_USER_TOPIC);
 
@@ -158,16 +157,22 @@ public class ResetIntegrationTest {
             60000);
         // receive only first values to make sure intermediate user topic is not consumed completely
         // => required to test "seekToEnd" for intermediate topics
-        final KeyValue<Object, Object> result2 = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
+        final List<KeyValue<Long, Long>> result2 = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
             resultTopicConsumerConfig,
             OUTPUT_TOPIC_2,
-            1
-        ).get(0);
+            10
+        );
 
         streams.close();
         TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT,
             "Streams Application consumer group did not time out after " + (TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT) + " ms.");
 
+        // insert bad record to make sure intermediate user topic gets seekToEnd()
+        mockTime.sleep(1);
+        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+            INTERMEDIATE_USER_TOPIC,
+            Collections.singleton(new KeyValue<>(-1L, "badRecord-ShouldBeSkipped")),
+            TestUtils.producerConfig(CLUSTER.bootstrapServers(), LongSerializer.class, StringSerializer.class),
+            mockTime.milliseconds());
+
         // RESET
         streams = new KafkaStreams(setupTopologyWithIntermediateUserTopic(OUTPUT_TOPIC_2_RERUN), streamsConfiguration);
         streams.cleanUp();
@@ -184,11 +189,11 @@ public class ResetIntegrationTest {
             OUTPUT_TOPIC,
             10,
             60000);
-        final KeyValue<Object, Object> resultRerun2 = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
+        final List<KeyValue<Long, Long>> resultRerun2 = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
             resultTopicConsumerConfig,
             OUTPUT_TOPIC_2_RERUN,
-            1
-        ).get(0);
+            10
+        );
 
         streams.close();
 
         assertThat(resultRerun, equalTo(result));
@@ -219,7 +224,7 @@ public class ResetIntegrationTest {
         }
     }
-    @Ignore
+    @Test
     public void testReprocessingFromScratchAfterResetWithoutIntermediateUserTopic() throws Exception {
         final Properties streamsConfiguration = prepareTest();
         final Properties resultTopicConsumerConfig = TestUtils.consumerConfig(
@@ -274,7 +279,7 @@ public class ResetIntegrationTest {
         streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
         streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
         streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
-        streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 8);
+        streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4);
         streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
         streamsConfiguration.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
         streamsConfiguration.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" + STREAMS_CONSUMER_TIMEOUT);
@@ -316,32 +321,17 @@ public class ResetIntegrationTest {
         final KStream<Long, String> input = builder.stream(INPUT_TOPIC);
 
         // use map to trigger internal re-partitioning before groupByKey
-        final KTable<Long, Long> globalCounts = input
-            .map(new KeyValueMapper<Long, String, KeyValue<Long, String>>() {
+        input.map(new KeyValueMapper<Long, String, KeyValue<Long, String>>() {
                 @Override
                 public KeyValue<Long, String> apply(final Long key, final String value) {
                     return new KeyValue<>(key, value);
                 }
             })
             .groupByKey()
-            .count("global-count");
-        globalCounts.to(Serdes.Long(), Serdes.Long(), OUTPUT_TOPIC);
-
-        final KStream<Long, Long> windowedCounts = input
-            .through(INTERMEDIATE_USER_TOPIC)
-            .map(new KeyValueMapper<Long, String, KeyValue<Long, String>>() {
-                private long sleep = 1000;
+            .count("global-count")
+            .to(Serdes.Long(), Serdes.Long(), OUTPUT_TOPIC);
 
-                @Override
-                public KeyValue<Long, String> apply(final Long key, final String value) {
-                    // must sleep long enough to avoid processing the whole intermediate topic before application gets stopped
-                    // => want to test "skip over" unprocessed records
-                    // increasing the sleep time only has disadvantage that test run time is increased
-                    mockTime.sleep(sleep);
-                    sleep *= 2;
-                    return new KeyValue<>(key, value);
-                }
-            })
+        input.through(INTERMEDIATE_USER_TOPIC)
             .groupByKey()
             .count(TimeWindows.of(35).advanceBy(10), "count")
             .toStream()
@@ -350,8 +340,8 @@ public class ResetIntegrationTest {
                 public KeyValue<Long, Long> apply(final Windowed<Long> key, final Long value) {
                     return new KeyValue<>(key.window().start() + key.window().end(), value);
                 }
-            });
-        windowedCounts.to(Serdes.Long(), Serdes.Long(), outputTopic2);
+            })
+            .to(Serdes.Long(), Serdes.Long(), outputTopic2);
 
         return builder;
     }
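----------------------------------------------------------------------
The re-enabled testReprocessingFromScratchAfterResetWithIntermediateUserTopic uses a trick worth calling out: after the first run stops, it writes one marker record into the intermediate topic. If the reset tool really seeks intermediate topics to their end, the re-run never consumes that record; if it rewound to the beginning instead, the marker would flow into the recomputed counts and the equality assertions would fail. The standalone sketch below illustrates the same probe with a plain producer; the topic name and bootstrap server are placeholders, not the test's values.

// Hedged illustration of the marker-record probe; placeholders throughout.
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class BadRecordProbe {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder
        props.put("key.serializer", LongSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        try (KafkaProducer<Long, String> producer = new KafkaProducer<>(props)) {
            // a re-run that seeks the intermediate topic to its end must
            // never read this record; seeing it in the output means the
            // reset rewound the topic instead of skipping over it
            producer.send(new ProducerRecord<>("intermediate-user-topic",
                    -1L, "badRecord-ShouldBeSkipped")).get();
        }
    }
}
----------------------------------------------------------------------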
http://git-wip-us.apache.org/repos/asf/kafka/blob/1d586cb5/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
index 3730785..d40956e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
@@ -28,6 +28,11 @@ import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.JoinWindows;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.TopologyBuilder;
 import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
@@ -51,9 +56,11 @@ import java.util.Properties;
 import java.util.Set;
 import java.util.UUID;
 
+import static org.hamcrest.CoreMatchers.equalTo;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
 
 public class StreamPartitionAssignorTest {
 
@@ -812,7 +819,73 @@ public class StreamPartitionAssignorTest {
         final StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
         final Cluster cluster = partitionAssignor.clusterMetadata();
         assertNotNull(cluster);
+    }
+
+    @Test
+    public void shouldNotLoopInfinitelyOnMissingMetadataAndShouldNotCreateRelatedTasks() {
+        final String applicationId = "application-id";
+        final KStreamBuilder builder = new KStreamBuilder();
+        builder.setApplicationId(applicationId);
+
+        KStream<Object, Object> stream1 = builder
+            .stream("topic1")
+            .selectKey(new KeyValueMapper<Object, Object, Object>() {
+                @Override
+                public Object apply(Object key, Object value) {
+                    return null;
+                }
+            })
+            .through("topic2");
+
+        builder
+            .stream("unknownTopic")
+            .selectKey(new KeyValueMapper<Object, Object, Object>() {
+                @Override
+                public Object apply(Object key, Object value) {
+                    return null;
+                }
+            })
+            .join(
+                stream1,
+                new ValueJoiner() {
+                    @Override
+                    public Object apply(Object value1, Object value2) {
+                        return null;
+                    }
+                },
+                JoinWindows.of(0)
+            );
+
+        final UUID uuid = UUID.randomUUID();
+        final String client = "client1";
+
+        final StreamsConfig config = new StreamsConfig(configProps());
+        final StreamThread streamThread = new StreamThread(builder, config, new MockClientSupplier(), applicationId, client, uuid, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder));
+
+        final StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
+        partitionAssignor.configure(config.getConsumerConfigs(streamThread, applicationId, client));
+
+        final Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
+        final Set<TaskId> emptyTasks = Collections.<TaskId>emptySet();
+        subscriptions.put(
+            client,
+            new PartitionAssignor.Subscription(
+                Collections.singletonList("unknownTopic"),
+                new SubscriptionInfo(uuid, emptyTasks, emptyTasks, userEndPoint).encode()
+            )
+        );
+
+        final Map<String, PartitionAssignor.Assignment> assignment = partitionAssignor.assign(metadata, subscriptions);
+
+        final List<TopicPartition> expectedAssignment = Arrays.asList(
+            new TopicPartition("topic1", 0),
+            new TopicPartition("topic1", 1),
+            new TopicPartition("topic1", 2),
+            new TopicPartition("topic2", 0),
+            new TopicPartition("topic2", 1),
+            new TopicPartition("topic2", 2)
+        );
+        assertThat(expectedAssignment, equalTo(assignment.get(client).partitions()));
     }
 
     private AssignmentInfo checkAssignment(Set<String> expectedTopics, PartitionAssignor.Assignment assignment) {
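----------------------------------------------------------------------
The new test's name, shouldNotLoopInfinitelyOnMissingMetadataAndShouldNotCreateRelatedTasks, names the pre-fix failure mode: assign() simply never returned when a subscribed topic was missing from the metadata, and the assertion also checks that no tasks are created for the join against the unknown topic. A generic, hypothetical guard pattern for tests of this shape (not the committed test code, which can rely on the fixed assign() returning) is to run the call with a timeout so a regression fails fast instead of hanging the build:

// Hypothetical timeout guard; the Callable body is a stand-in for
// partitionAssignor.assign(metadata, subscriptions).
import java.util.concurrent.*;

public class NoInfiniteLoopGuard {
    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        Future<String> result = pool.submit(() -> {
            return "assignment"; // stand-in for the call under test
        });
        try {
            // before the fix this would time out instead of returning
            System.out.println("assign() returned: " + result.get(30, TimeUnit.SECONDS));
        } catch (TimeoutException e) {
            throw new AssertionError("assign() appears to loop infinitely", e);
        } finally {
            pool.shutdownNow();
        }
    }
}
----------------------------------------------------------------------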
