[ 
https://issues.apache.org/jira/browse/KAFKA-6437?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16416494#comment-16416494
 ] 

Guozhang Wang commented on KAFKA-6437:
--------------------------------------

This is a interesting reported issue in KAFKA-6720 that for join-involved 
topics, if it does not exist yet a exception will be thrown. I think it is not 
a complete duplicate of this ticket, and I'd like to summarize the 
"inconsistent" behavior that we are facing today:

1) For join operation from user topics directly (i.e. no reshuffling added as 
Streams assumes input topics already partitioned by key), we'd require user 
topics pre-exist; and if not, we throw TopologyBuilderException.
2) For join operation from repartition topics, since they are note available at 
assignment phase we "assume" the repartition topics will be created and become 
available, hence we do not check if the source topics are available. When the 
source topic is missing, and hence no data will be send to the repartition 
topics at all, Streams will hang (this is what this JIRA reported).
3) For stateless operations, if a source topic was missing, Streams will 
continue but generate a warning.

So I think the actual fix should be in two folds:

1) We can [collect all external topic's 
num.partition|https://github.com/apache/kafka/blob/5bdfbd13524da667289cacb774bb92df36a253f2/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java#L425-L437]
 at the very beginning of the assign() phase, and log a warning entry if some 
of the topic's metadata cannot be found.

2) In step one we do not need to [query the 
metadata|https://github.com/apache/kafka/blob/5bdfbd13524da667289cacb774bb92df36a253f2/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java#L341]
 again but we can get directly from the collected available num.partitions map.

3) The finally in ensureCopartitioning, if the metadata cannot be found we skip 
the [checking co-partition 
phase|https://github.com/apache/kafka/blob/5bdfbd13524da667289cacb774bb92df36a253f2/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java#L665]
 but log another warning that "since the topic is not found, we will skip the 
co-partition validation .."


> Streams does not warn about missing input topics, but hangs
> -----------------------------------------------------------
>
>                 Key: KAFKA-6437
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6437
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>    Affects Versions: 1.0.0
>         Environment: Single client on single node broker
>            Reporter: Chris Schwarzfischer
>            Assignee: Mariam John
>            Priority: Minor
>              Labels: newbie
>
> *Case*
> Streams application with two input topics being used for a left join.
> When the left side topic is missing upon starting the streams application, it 
> hangs "in the middle" of the topology (at …00009, see below). Only parts of 
> the intermediate topics are created (up to …00009)
> When the missing input topic is created, the streams application resumes 
> processing.
> {noformat}
> Topology:
> StreamsTask taskId: 2_0
>       ProcessorTopology:
>               KSTREAM-SOURCE-0000000011:
>                       topics:         
> [mystreams_app-KTABLE-AGGREGATE-STATE-STORE-0000000009-repartition]
>                       children:       [KTABLE-AGGREGATE-0000000012]
>               KTABLE-AGGREGATE-0000000012:
>                       states:         
> [KTABLE-AGGREGATE-STATE-STORE-0000000009]
>                       children:       [KTABLE-TOSTREAM-0000000020]
>               KTABLE-TOSTREAM-0000000020:
>                       children:       [KSTREAM-SINK-0000000021]
>               KSTREAM-SINK-0000000021:
>                       topic:          data_udr_month_customer_aggregration
>               KSTREAM-SOURCE-0000000017:
>                       topics:         
> [mystreams_app-KSTREAM-MAP-0000000014-repartition]
>                       children:       [KSTREAM-LEFTJOIN-0000000018]
>               KSTREAM-LEFTJOIN-0000000018:
>                       states:         
> [KTABLE-AGGREGATE-STATE-STORE-0000000009]
>                       children:       [KSTREAM-SINK-0000000019]
>               KSTREAM-SINK-0000000019:
>                       topic:          data_UDR_joined
> Partitions [mystreams_app-KSTREAM-MAP-0000000014-repartition-0, 
> mystreams_app-KTABLE-AGGREGATE-STATE-STORE-0000000009-repartition-0]
> {noformat}
> *Why this matters*
> The applications does quite a lot of preprocessing before joining with the 
> missing input topic. This preprocessing won't happen without the topic, 
> creating a huge backlog of data.
> *Fix*
> Issue an `warn` or `error` level message at start to inform about the missing 
> topic and it's consequences.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to