[ https://issues.apache.org/jira/browse/KAFKA-6437?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16416494#comment-16416494 ]
Guozhang Wang commented on KAFKA-6437:
--------------------------------------

KAFKA-6720 reports an interesting issue: for topics involved in a join, if a topic does not exist yet, an exception is thrown. I think it is not a complete duplicate of this ticket, and I'd like to summarize the "inconsistent" behavior that we are facing today:

1) For join operations directly on user topics (i.e. no reshuffling is added, as Streams assumes the input topics are already partitioned by key), we require that the user topics pre-exist; if they do not, we throw TopologyBuilderException.
2) For join operations on repartition topics, since these are not available at the assignment phase we "assume" the repartition topics will be created and become available, hence we do not check whether the source topics are available. When the source topic is missing, and hence no data is sent to the repartition topics at all, Streams will hang (this is what this JIRA reported).
3) For stateless operations, if a source topic is missing, Streams will continue but generate a warning.

So I think the actual fix should be as follows:

1) We can [collect all external topics' num.partitions|https://github.com/apache/kafka/blob/5bdfbd13524da667289cacb774bb92df36a253f2/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java#L425-L437] at the very beginning of the assign() phase, and log a warning entry if the metadata of some topic cannot be found.
2) In step one we do not need to [query the metadata|https://github.com/apache/kafka/blob/5bdfbd13524da667289cacb774bb92df36a253f2/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java#L341] again; we can read it directly from the collected map of available num.partitions.
3) Finally, in ensureCopartitioning, if the metadata cannot be found we skip the [co-partition check|https://github.com/apache/kafka/blob/5bdfbd13524da667289cacb774bb92df36a253f2/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java#L665] but log another warning that "since the topic is not found, we will skip the co-partition validation .."

> Streams does not warn about missing input topics, but hangs
> -----------------------------------------------------------
>
>                 Key: KAFKA-6437
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6437
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>    Affects Versions: 1.0.0
>         Environment: Single client on single node broker
>            Reporter: Chris Schwarzfischer
>            Assignee: Mariam John
>            Priority: Minor
>              Labels: newbie
>
> *Case*
> Streams application with two input topics being used for a left join.
> When the left side topic is missing upon starting the streams application, it
> hangs "in the middle" of the topology (at …00009, see below). Only parts of
> the intermediate topics are created (up to …00009).
> When the missing input topic is created, the streams application resumes
> processing.
> {noformat}
> Topology:
> StreamsTask taskId: 2_0
>   ProcessorTopology:
>     KSTREAM-SOURCE-0000000011:
>       topics:   [mystreams_app-KTABLE-AGGREGATE-STATE-STORE-0000000009-repartition]
>       children: [KTABLE-AGGREGATE-0000000012]
>     KTABLE-AGGREGATE-0000000012:
>       states:   [KTABLE-AGGREGATE-STATE-STORE-0000000009]
>       children: [KTABLE-TOSTREAM-0000000020]
>     KTABLE-TOSTREAM-0000000020:
>       children: [KSTREAM-SINK-0000000021]
>     KSTREAM-SINK-0000000021:
>       topic:    data_udr_month_customer_aggregration
>     KSTREAM-SOURCE-0000000017:
>       topics:   [mystreams_app-KSTREAM-MAP-0000000014-repartition]
>       children: [KSTREAM-LEFTJOIN-0000000018]
>     KSTREAM-LEFTJOIN-0000000018:
>       states:   [KTABLE-AGGREGATE-STATE-STORE-0000000009]
>       children: [KSTREAM-SINK-0000000019]
>     KSTREAM-SINK-0000000019:
>       topic:    data_UDR_joined
>   Partitions [mystreams_app-KSTREAM-MAP-0000000014-repartition-0, mystreams_app-KTABLE-AGGREGATE-STATE-STORE-0000000009-repartition-0]
> {noformat}
> *Why this matters*
> The application does quite a lot of preprocessing before joining with the
> missing input topic. This preprocessing won't happen without the topic,
> creating a huge backlog of data.
> *Fix*
> Issue a `warn` or `error` level message at start to inform about the missing
> topic and its consequences.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
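
The fix proposed in the comment (collect partition counts once, warn on missing metadata, skip the co-partition check for missing topics) could look roughly like the sketch below. This is only an illustration under stated assumptions, not the actual StreamPartitionAssignor code: the class MissingTopicSketch, both method signatures, the plain Map standing in for cluster metadata, and the warning texts are all hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical stand-in for the assignor logic; names are not from Kafka.
public class MissingTopicSketch {

    // Step 1: collect partition counts for all external topics once,
    // recording a warning for every topic whose metadata is missing.
    static Map<String, Integer> collectPartitionCounts(Set<String> topics,
                                                       Map<String, Integer> clusterMetadata,
                                                       List<String> warnings) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String topic : new TreeSet<>(topics)) {
            Integer numPartitions = clusterMetadata.get(topic);
            if (numPartitions == null) {
                warnings.add("No metadata found for topic " + topic + "; it may not exist yet");
            } else {
                counts.put(topic, numPartitions);
            }
        }
        return counts;
    }

    // Step 3: validate that all topics in a join group have the same
    // partition count. Step 2 is implicit: the counts come from the map
    // collected above, so metadata is not queried again.
    // Returns false (and warns) if validation was skipped.
    static boolean ensureCopartitioning(Set<String> group,
                                        Map<String, Integer> counts,
                                        List<String> warnings) {
        Integer expected = null;
        for (String topic : new TreeSet<>(group)) {
            Integer numPartitions = counts.get(topic);
            if (numPartitions == null) {
                warnings.add("Topic " + topic + " not found; skipping co-partition validation");
                return false;
            }
            if (expected == null) {
                expected = numPartitions;
            } else if (!expected.equals(numPartitions)) {
                throw new IllegalStateException("Topics are not co-partitioned: " + group);
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Map<String, Integer> clusterMetadata = new HashMap<>();
        clusterMetadata.put("left-input", 4); // "right-input" is missing on purpose

        Set<String> joinTopics = new HashSet<>(Arrays.asList("left-input", "right-input"));
        List<String> warnings = new ArrayList<>();

        Map<String, Integer> counts = collectPartitionCounts(joinTopics, clusterMetadata, warnings);
        boolean validated = ensureCopartitioning(joinTopics, counts, warnings);

        System.out.println("validated=" + validated);
        for (String warning : warnings) {
            System.out.println("WARN " + warning);
        }
    }
}
```

With the right-hand topic missing, the sketch records one warning during collection and one when the co-partition check is skipped, instead of throwing or silently waiting. Whether to warn or fail hard in the direct user-topic join case is a design decision the ticket leaves open.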