[ 
https://issues.apache.org/jira/browse/KAFKA-6398?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16310226#comment-16310226
 ] 

ASF GitHub Bot commented on KAFKA-6398:
---------------------------------------

guozhangwang opened a new pull request #4384: KAFKA-6398: fix KTable.filter 
that does not include its parent's queryable storename
URL: https://github.com/apache/kafka/pull/4384
 
 
   1. Include the parent's queryable store name in KTable.filter if this 
operator is not materialized.
   2. Augment InternalTopologyBuilder checking on null processor / store names 
from the enum.
   3. Unit test.
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


> Stream-Table join fails, if table is not materialized
> -----------------------------------------------------
>
>                 Key: KAFKA-6398
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6398
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.11.0.1, 1.0.0
>            Reporter: Matthias J. Sax
>            Assignee: Guozhang Wang
>
> Using a non-materialized KTable in a stream-table join fails:
> {noformat}
> final KTable filteredKTable = builder.table("table-topic").filter(...);
> builder.stream("stream-topic").join(filteredKTable,...);
> {noformat}
> fails with
> {noformat}
> org.apache.kafka.streams.errors.TopologyBuilderException: Invalid topology 
> building: StateStore null is not added yet.
>       at 
> org.apache.kafka.streams.processor.TopologyBuilder.connectProcessorAndStateStore(TopologyBuilder.java:1021)
>       at 
> org.apache.kafka.streams.processor.TopologyBuilder.connectProcessorAndStateStores(TopologyBuilder.java:949)
>       at 
> org.apache.kafka.streams.kstream.internals.KStreamImpl.doStreamTableJoin(KStreamImpl.java:621)
>       at 
> org.apache.kafka.streams.kstream.internals.KStreamImpl.join(KStreamImpl.java:577)
>       at 
> org.apache.kafka.streams.kstream.internals.KStreamImpl.join(KStreamImpl.java:563)
> {noformat}
> Adding a store name is not sufficient as workaround but fails differently:
> {noformat}
> final KTable filteredKTable = builder.table("table-topic").filter(..., 
> "STORE-NAME");
> builder.stream("stream-topic").join(filteredKTable,...);
> {noformat}
> error:
> {noformat}
> org.apache.kafka.streams.errors.StreamsException: failed to initialize 
> processor KSTREAM-JOIN-0000000005
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:113)
>       at 
> org.apache.kafka.streams.processor.internals.StreamTask.initTopology(StreamTask.java:339)
>       at 
> org.apache.kafka.streams.processor.internals.StreamTask.initialize(StreamTask.java:153)
> Caused by: org.apache.kafka.streams.errors.TopologyBuilderException: Invalid 
> topology building: Processor KSTREAM-JOIN-0000000005 has no access to 
> StateStore KTABLE-SOURCE-STATE-STORE-0000000000
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.getStateStore(ProcessorContextImpl.java:69)
>       at 
> org.apache.kafka.streams.kstream.internals.KTableSourceValueGetterSupplier$KTableSourceValueGetter.init(KTableSourceValueGetterSupplier.java:45)
>       at 
> org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterValueGetter.init(KTableFilter.java:121)
>       at 
> org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.init(KStreamKTableJoinProcessor.java:44)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorNode$2.run(ProcessorNode.java:53)
>       at 
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:201)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:111)
> {noformat}
> One can workaround by piping the result through a topic:
> {noformat}
> final KTable filteredKTable = 
> builder.table("table-topic").filter(...).through("TOPIC");;
> builder.stream("stream-topic").join(filteredKTable,...);
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to