[ 
https://issues.apache.org/jira/browse/KAFKA-6398?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16311091#comment-16311091
 ] 

Bertrand JAMET commented on KAFKA-6398:
---------------------------------------

Hello,
The failure occurs when the ProcessorTopologyTestDriver is created:
{code:java}
        KTable<Bytes, String> table1 = builder.table("table1");
        KTable<Bytes, String> table2 = builder.table("table2");

        KTable<Bytes, String> joinedTable = table1.leftJoin(table2, new ValueJoiner<String, String, String>() {
            @Override
            public String apply(final String value1, final String value2) {
                return value1;
            }
        }, Materialized.<Bytes, String, KeyValueStore<Bytes, byte[]>>with(Serdes.Bytes(), Serdes.String()));

        builder.<Bytes, Object>stream("stream").join(joinedTable, new ValueJoiner<Object, String, Object>() {
            @Override
            public Object apply(final Object value1, final String value2) {
                return value1;
            }
        });
        Topology topology = builder.build();
        StreamsConfig st = newStreamsConfig();
        ProcessorTopologyTestDriver driver = new ProcessorTopologyTestDriver(st, topology);
{code}

I get the following stack trace, even on trunk:
{noformat}
org.apache.kafka.streams.errors.StreamsException: failed to initialize processor KSTREAM-JOIN-0000000011
        at org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:106)
        at org.apache.kafka.streams.processor.internals.StreamTask.initTopology(StreamTask.java:389)
        at org.apache.kafka.streams.processor.internals.StreamTask.initialize(StreamTask.java:167)
        at org.apache.kafka.test.ProcessorTopologyTestDriver.<init>(ProcessorTopologyTestDriver.java:251)
        at org.apache.kafka.test.ProcessorTopologyTestDriver.<init>(ProcessorTopologyTestDriver.java:168)
        at nc.smti.sae.TestLeftJoinStream.TestJoin(TestLeftJoinStream.java:86)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        [...]
Caused by: org.apache.kafka.streams.errors.TopologyBuilderException: Invalid topology building: Processor KSTREAM-JOIN-0000000011 has no access to StateStore table1-STATE-STORE-0000000000
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.getStateStore(ProcessorContextImpl.java:72)
        at org.apache.kafka.streams.kstream.internals.KTableSourceValueGetterSupplier$KTableSourceValueGetter.init(KTableSourceValueGetterSupplier.java:45)
        at org.apache.kafka.streams.kstream.internals.KTableKTableLeftJoin$KTableKTableLeftJoinValueGetter.init(KTableKTableLeftJoin.java:104)
        at org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.init(KStreamKTableJoinProcessor.java:44)
        at org.apache.kafka.streams.processor.internals.ProcessorNode$2.run(ProcessorNode.java:54)
        at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
        at org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:104)
        ... 28 more
{noformat}





> Stream-Table join fails, if table is not materialized
> -----------------------------------------------------
>
>                 Key: KAFKA-6398
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6398
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.11.0.1, 1.0.0
>            Reporter: Matthias J. Sax
>            Assignee: Guozhang Wang
>
> Using a non-materialized KTable in a stream-table join fails:
> {noformat}
> final KTable filteredKTable = builder.table("table-topic").filter(...);
> builder.stream("stream-topic").join(filteredKTable,...);
> {noformat}
> fails with
> {noformat}
> org.apache.kafka.streams.errors.TopologyBuilderException: Invalid topology building: StateStore null is not added yet.
>       at org.apache.kafka.streams.processor.TopologyBuilder.connectProcessorAndStateStore(TopologyBuilder.java:1021)
>       at org.apache.kafka.streams.processor.TopologyBuilder.connectProcessorAndStateStores(TopologyBuilder.java:949)
>       at org.apache.kafka.streams.kstream.internals.KStreamImpl.doStreamTableJoin(KStreamImpl.java:621)
>       at org.apache.kafka.streams.kstream.internals.KStreamImpl.join(KStreamImpl.java:577)
>       at org.apache.kafka.streams.kstream.internals.KStreamImpl.join(KStreamImpl.java:563)
> {noformat}
> Adding a store name is not sufficient as a workaround; it fails differently:
> {noformat}
> final KTable filteredKTable = builder.table("table-topic").filter(..., "STORE-NAME");
> builder.stream("stream-topic").join(filteredKTable,...);
> {noformat}
> error:
> {noformat}
> org.apache.kafka.streams.errors.StreamsException: failed to initialize processor KSTREAM-JOIN-0000000005
>       at org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:113)
>       at org.apache.kafka.streams.processor.internals.StreamTask.initTopology(StreamTask.java:339)
>       at org.apache.kafka.streams.processor.internals.StreamTask.initialize(StreamTask.java:153)
> Caused by: org.apache.kafka.streams.errors.TopologyBuilderException: Invalid topology building: Processor KSTREAM-JOIN-0000000005 has no access to StateStore KTABLE-SOURCE-STATE-STORE-0000000000
>       at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.getStateStore(ProcessorContextImpl.java:69)
>       at org.apache.kafka.streams.kstream.internals.KTableSourceValueGetterSupplier$KTableSourceValueGetter.init(KTableSourceValueGetterSupplier.java:45)
>       at org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterValueGetter.init(KTableFilter.java:121)
>       at org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.init(KStreamKTableJoinProcessor.java:44)
>       at org.apache.kafka.streams.processor.internals.ProcessorNode$2.run(ProcessorNode.java:53)
>       at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:201)
>       at org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:111)
> {noformat}
> One can work around this by piping the result through a topic:
> {noformat}
> final KTable filteredKTable = builder.table("table-topic").filter(...).through("TOPIC");
> builder.stream("stream-topic").join(filteredKTable,...);
> {noformat}



