[ https://issues.apache.org/jira/browse/KAFKA-4311?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Damian Guy updated KAFKA-4311:
------------------------------
    Description: 
The two exceptions below were reported by Frank on the dev mailing list. After 
investigation, the root cause is multiple cache evictions happening in the same 
topology. 

{code}
        // Source tables.
        final KTable<String, String> one = builder.table(Serdes.String(), Serdes.String(), tableOne, tableOne);
        final KTable<Long, String> two = builder.table(Serdes.Long(), Serdes.String(), tableTwo, tableTwo);

        // Re-key table two by its value, then reduce with an adder and a subtractor.
        final KTable<String, Long> reduce = two
            .groupBy(new KeyValueMapper<Long, String, KeyValue<String, Long>>() {
                @Override
                public KeyValue<String, Long> apply(final Long key, final String value) {
                    return new KeyValue<>(value, key);
                }
            }, Serdes.String(), Serdes.Long())
            .reduce(new Reducer<Long>() {
                @Override
                public Long apply(final Long value1, final Long value2) {
                    return value1 + value2;
                }
            }, new Reducer<Long>() {
                @Override
                public Long apply(final Long value1, final Long value2) {
                    return value1 - value2;
                }
            }, "reducer-store");

        // Left-join table one with the reduced table, then pass the values through mapValues.
        one.leftJoin(reduce, new ValueJoiner<String, Long, String>() {
                @Override
                public String apply(final String value1, final Long value2) {
                    return value1 + ":" + value2;
                }
            })
            .mapValues(new ValueMapper<String, String>() {
                @Override
                public String apply(final String value) {
                    return value;
                }
            });
{code}



Reported on the mailing list. Need to investigate how it could get into this state.
[StreamThread-1] ERROR org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [StreamThread-1] Failed to close state manager for StreamTask 0_0:
org.apache.kafka.streams.errors.ProcessorStateException: task [0_0] Failed to close state store addr-organization
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.close(ProcessorStateManager.java:342)
    at org.apache.kafka.streams.processor.internals.AbstractTask.closeStateManager(AbstractTask.java:121)
    at org.apache.kafka.streams.processor.internals.StreamThread$2.apply(StreamThread.java:341)
    at org.apache.kafka.streams.processor.internals.StreamThread.performOnAllTasks(StreamThread.java:322)
    at org.apache.kafka.streams.processor.internals.StreamThread.closeAllStateManagers(StreamThread.java:338)
    at org.apache.kafka.streams.processor.internals.StreamThread.shutdownTasksAndState(StreamThread.java:299)
    at org.apache.kafka.streams.processor.internals.StreamThread.shutdown(StreamThread.java:262)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:245)
Caused by: java.lang.IllegalStateException: Key found in dirty key set, but entry is null
    at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:112)
    at org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:100)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.flush(CachingKeyValueStore.java:111)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.close(CachingKeyValueStore.java:117)
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.close(ProcessorStateManager.java:340)
    ... 7 more
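
The IllegalStateException above says that a flush found a key marked dirty with no matching cache entry. As a rough illustration only, the sketch below uses a hypothetical ToyDirtyCache class (not Kafka's NamedCache; every name in it is invented for this example) to show how removing an entry without clearing its dirty marker would make a later flush fail with the same message. It does not reproduce the multi-eviction path described in this issue.

```java
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical toy cache for illustration; this is NOT Kafka's NamedCache. */
class ToyDirtyCache {
    private final Map<String, String> entries = new HashMap<>();
    private final Set<String> dirtyKeys = new LinkedHashSet<>();

    /** Store a value and mark its key as dirty (not yet flushed). */
    public void put(final String key, final String value) {
        entries.put(key, value);
        dirtyKeys.add(key);
    }

    /** Consistent eviction: drop the entry and its dirty marker together. */
    public void evict(final String key) {
        entries.remove(key);
        dirtyKeys.remove(key);
    }

    /** Inconsistent eviction (assumed failure mode): drops the entry but leaves the key dirty. */
    public void evictLeavingDirtyKey(final String key) {
        entries.remove(key);
    }

    /** Flush all dirty keys; fails if a dirty key has no entry. Returns the number flushed. */
    public int flush() {
        int flushed = 0;
        for (final String key : dirtyKeys) {
            if (entries.get(key) == null) {
                throw new IllegalStateException("Key found in dirty key set, but entry is null");
            }
            flushed++;
        }
        dirtyKeys.clear();
        return flushed;
    }
}
```

In this toy model, calling put("a", "1"), then evictLeavingDirtyKey("a"), then flush() throws the IllegalStateException, whereas using evict("a") instead lets flush() complete.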


  was:
Reported on the mailing list. Needs looking into how it could get in this state.
[StreamThread-1] ERROR org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [StreamThread-1] Failed to close state manager for StreamTask 0_0:
org.apache.kafka.streams.errors.ProcessorStateException: task [0_0] Failed to close state store addr-organization
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.close(ProcessorStateManager.java:342)
    at org.apache.kafka.streams.processor.internals.AbstractTask.closeStateManager(AbstractTask.java:121)
    at org.apache.kafka.streams.processor.internals.StreamThread$2.apply(StreamThread.java:341)
    at org.apache.kafka.streams.processor.internals.StreamThread.performOnAllTasks(StreamThread.java:322)
    at org.apache.kafka.streams.processor.internals.StreamThread.closeAllStateManagers(StreamThread.java:338)
    at org.apache.kafka.streams.processor.internals.StreamThread.shutdownTasksAndState(StreamThread.java:299)
    at org.apache.kafka.streams.processor.internals.StreamThread.shutdown(StreamThread.java:262)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:245)
Caused by: java.lang.IllegalStateException: Key found in dirty key set, but entry is null
    at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:112)
    at org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:100)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.flush(CachingKeyValueStore.java:111)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.close(CachingKeyValueStore.java:117)
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.close(ProcessorStateManager.java:340)
    ... 7 more



> Multi layer cache eviction causes forwarding to incorrect Processor Node 
> -------------------------------------------------------------------------
>
>                 Key: KAFKA-4311
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4311
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.10.1.0
>            Reporter: Damian Guy
>            Assignee: Damian Guy
>             Fix For: 0.10.1.1
>
>
> The two exceptions below were reported by Frank on the dev mailing list. 
> After investigation, the root cause is multiple cache evictions happening in 
> the same topology. 
> {code}
>         // Source tables.
>         final KTable<String, String> one = builder.table(Serdes.String(), Serdes.String(), tableOne, tableOne);
>         final KTable<Long, String> two = builder.table(Serdes.Long(), Serdes.String(), tableTwo, tableTwo);
>
>         // Re-key table two by its value, then reduce with an adder and a subtractor.
>         final KTable<String, Long> reduce = two
>             .groupBy(new KeyValueMapper<Long, String, KeyValue<String, Long>>() {
>                 @Override
>                 public KeyValue<String, Long> apply(final Long key, final String value) {
>                     return new KeyValue<>(value, key);
>                 }
>             }, Serdes.String(), Serdes.Long())
>             .reduce(new Reducer<Long>() {
>                 @Override
>                 public Long apply(final Long value1, final Long value2) {
>                     return value1 + value2;
>                 }
>             }, new Reducer<Long>() {
>                 @Override
>                 public Long apply(final Long value1, final Long value2) {
>                     return value1 - value2;
>                 }
>             }, "reducer-store");
>
>         // Left-join table one with the reduced table, then pass the values through mapValues.
>         one.leftJoin(reduce, new ValueJoiner<String, Long, String>() {
>                 @Override
>                 public String apply(final String value1, final Long value2) {
>                     return value1 + ":" + value2;
>                 }
>             })
>             .mapValues(new ValueMapper<String, String>() {
>                 @Override
>                 public String apply(final String value) {
>                     return value;
>                 }
>             });
> {code}
> Reported on the mailing list. Need to investigate how it could get into this
> state.
> [StreamThread-1] ERROR org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [StreamThread-1] Failed to close state manager for StreamTask 0_0:
> org.apache.kafka.streams.errors.ProcessorStateException: task [0_0] Failed to close state store addr-organization
>     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.close(ProcessorStateManager.java:342)
>     at org.apache.kafka.streams.processor.internals.AbstractTask.closeStateManager(AbstractTask.java:121)
>     at org.apache.kafka.streams.processor.internals.StreamThread$2.apply(StreamThread.java:341)
>     at org.apache.kafka.streams.processor.internals.StreamThread.performOnAllTasks(StreamThread.java:322)
>     at org.apache.kafka.streams.processor.internals.StreamThread.closeAllStateManagers(StreamThread.java:338)
>     at org.apache.kafka.streams.processor.internals.StreamThread.shutdownTasksAndState(StreamThread.java:299)
>     at org.apache.kafka.streams.processor.internals.StreamThread.shutdown(StreamThread.java:262)
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:245)
> Caused by: java.lang.IllegalStateException: Key found in dirty key set, but entry is null
>     at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:112)
>     at org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:100)
>     at org.apache.kafka.streams.state.internals.CachingKeyValueStore.flush(CachingKeyValueStore.java:111)
>     at org.apache.kafka.streams.state.internals.CachingKeyValueStore.close(CachingKeyValueStore.java:117)
>     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.close(ProcessorStateManager.java:340)
>     ... 7 more



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
