[ https://issues.apache.org/jira/browse/KAFKA-4311?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Damian Guy updated KAFKA-4311:
------------------------------
    Description: 
The two exceptions below were reported by Frank on the dev mailing list. After 
investigation, the root cause turned out to be multiple cache evictions 
happening in the same topology.

Consider a topology like the one below. If a record arriving in `tableOne` 
causes a cache eviction, it triggers the `leftJoin`, which does a `get` from 
`reducer-store`. If the key is not currently cached in `reducer-store` but is 
in the backing store, it is put into the cache, and that may trigger another 
eviction. If it does trigger an eviction and the eldest entry is dirty, the 
cache flushes its dirty keys. It is at this point that the ClassCastException 
reported in the comment is thrown. This happens because the ProcessorContext is 
still set to the `leftJoin` node, so the flushed records are forwarded to the 
next child of `leftJoin` in the topology, which is `mapValues`.
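The nested eviction described above can be reproduced in a toy model. This is not Kafka code: `Node`, `Context`, `OneEntryCache` and `simulate()` are hypothetical names. The sketch assumes, as described above, that `context.forward` delivers to the children of whichever node is current, and that the `reducer-store` flush forwards without setting the node. A dirty `Long` aggregate evicted while `leftJoin` is current then reaches `mapValues`, which expects a `String`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

public class NestedEvictionDemo {

    // Hypothetical stand-in for a processor node: a name, its children and its logic.
    static final class Node {
        final String name;
        final List<Node> children = new ArrayList<>();
        BiConsumer<Object, Object> logic = (k, v) -> { };
        Node(String name) { this.name = name; }
    }

    // Hypothetical shared context: forward() delivers to the children of the CURRENT node.
    static final class Context {
        Node currentNode;

        void forward(Object key, Object value) {
            final Node parent = currentNode;
            for (Node child : parent.children) {
                currentNode = child;          // the child is current while it processes
                try {
                    child.logic.accept(key, value);
                } finally {
                    currentNode = parent;     // restore once the child returns
                }
            }
        }
    }

    // Hypothetical one-entry cache: a miss evicts the cached entry and, if it is dirty,
    // flushes it downstream through the shared context WITHOUT setting the owning node.
    static final class OneEntryCache {
        private final Context context;
        private final Map<Object, Object> backingStore = new HashMap<>();
        private Object key, value;
        private boolean dirty;

        OneEntryCache(Context context) { this.context = context; }

        void putDirty(Object k, Object v) { key = k; value = v; dirty = true; }

        Object get(Object k) {
            if (k.equals(key)) {
                return value;                      // cache hit
            }
            if (dirty) {
                backingStore.put(key, value);      // write the dirty entry through...
                context.forward(key, value);       // ...and forward it (bug: node not set)
            }
            key = k;                               // load the missing key from the backing store
            value = backingStore.getOrDefault(k, 0L);
            dirty = false;
            return value;
        }
    }

    /** Returns the name of the node that hit a ClassCastException, or "none". */
    static String simulate() {
        final Context ctx = new Context();
        final Node tableOne = new Node("tableOne");
        final Node leftJoin = new Node("leftJoin");
        final Node mapValues = new Node("mapValues");
        tableOne.children.add(leftJoin);
        leftJoin.children.add(mapValues);

        final OneEntryCache reducerStore = new OneEntryCache(ctx);
        reducerStore.putDirty("b", 5L);            // a dirty Long aggregate sits in the cache

        final String[] failedAt = {"none"};
        mapValues.logic = (k, v) -> {
            try {
                final String joined = (String) v;  // mapValues expects the joined String
            } catch (ClassCastException e) {
                failedAt[0] = mapValues.name;
            }
        };
        leftJoin.logic = (k, v) -> {
            final Object right = reducerStore.get(k); // miss -> eviction -> flush
            ctx.forward(k, v + ":" + right);
        };

        ctx.currentNode = tableOne;
        ctx.forward("a", "x");                     // a record arrives in tableOne
        return failedAt[0];
    }

    public static void main(String[] args) {
        System.out.println("ClassCastException at: " + simulate());
    }
}
```

Running `main` prints `ClassCastException at: mapValues`: the evicted `reducer-store` entry is delivered to the child of `leftJoin` because `leftJoin` is still the current node.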

We need to set the correct `ProcessorNode` on the context in the 
`ForwardingCacheFlushListener` before calling `context.forward`. We also need 
to remember to reset the `ProcessorNode` to the previous node once 
`context.forward` has completed.
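A minimal sketch of the save, set and restore pattern described above, again on a hypothetical simplified model rather than Kafka's actual `ProcessorContext` and `ProcessorNode` classes. `ForwardingFlushListener`, `owner`, `onFlush` and `reduceDownstream` are illustrative names only. The `finally` block restores the previous node even if a downstream node throws.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

public class FlushListenerFixSketch {

    // Hypothetical stand-in for a processor node.
    static final class Node {
        final String name;
        final List<Node> children = new ArrayList<>();
        BiConsumer<Object, Object> logic = (k, v) -> { };
        Node(String name) { this.name = name; }
    }

    // Hypothetical shared context: forward() delivers to the children of the current node.
    static final class Context {
        Node currentNode;

        void forward(Object key, Object value) {
            final Node parent = currentNode;
            for (Node child : parent.children) {
                currentNode = child;
                try {
                    child.logic.accept(key, value);
                } finally {
                    currentNode = parent;
                }
            }
        }
    }

    // Hypothetical flush listener that knows which node owns the cached store.
    static final class ForwardingFlushListener {
        private final Context context;
        private final Node owner;

        ForwardingFlushListener(Context context, Node owner) {
            this.context = context;
            this.owner = owner;
        }

        void onFlush(Object key, Object value) {
            final Node previous = context.currentNode; // remember the node we interrupted
            context.currentNode = owner;               // forward as the store's owning node
            try {
                context.forward(key, value);
            } finally {
                context.currentNode = previous;        // reset, even if a downstream node throws
            }
        }
    }

    static List<String> simulate() {
        final Context ctx = new Context();
        final Node leftJoin = new Node("leftJoin");
        final Node mapValues = new Node("mapValues");
        final Node reduce = new Node("reduce");
        final Node reduceDownstream = new Node("reduceDownstream");
        leftJoin.children.add(mapValues);
        reduce.children.add(reduceDownstream);

        final List<String> log = new ArrayList<>();
        mapValues.logic = (k, v) -> log.add("mapValues <- " + k + "=" + v);
        reduceDownstream.logic = (k, v) -> log.add("reduceDownstream <- " + k + "=" + v);

        final ForwardingFlushListener reducerStoreListener =
                new ForwardingFlushListener(ctx, reduce);

        // Simulate reducer-store evicting a dirty entry while leftJoin is the current node.
        ctx.currentNode = leftJoin;
        reducerStoreListener.onFlush("b", 5L);
        log.add("current after flush: " + ctx.currentNode.name);
        return log;
    }

    public static void main(String[] args) {
        simulate().forEach(System.out::println);
    }
}
```

Here the flushed entry reaches `reduceDownstream` instead of `mapValues`, and the context is back on `leftJoin` afterwards, so the join can continue forwarding its own result to `mapValues`.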

{code}
// tableOne and tableTwo are topic names; each is also used as the store name.
final KTable<String, String> one = builder.table(Serdes.String(), Serdes.String(), tableOne, tableOne);
final KTable<Long, String> two = builder.table(Serdes.Long(), Serdes.String(), tableTwo, tableTwo);

// Re-key tableTwo by its value and aggregate the Long keys into "reducer-store".
final KTable<String, Long> reduce = two.groupBy(new KeyValueMapper<Long, String, KeyValue<String, Long>>() {
        @Override
        public KeyValue<String, Long> apply(final Long key, final String value) {
            return new KeyValue<>(value, key);
        }
    }, Serdes.String(), Serdes.Long())
    .reduce(new Reducer<Long>() {
        // adder
        @Override
        public Long apply(final Long value1, final Long value2) {
            return value1 + value2;
        }
    }, new Reducer<Long>() {
        // subtractor
        @Override
        public Long apply(final Long value1, final Long value2) {
            return value1 - value2;
        }
    }, "reducer-store");

// The join reads from "reducer-store"; mapValues is the join's child node.
one.leftJoin(reduce, new ValueJoiner<String, Long, String>() {
        @Override
        public String apply(final String value1, final Long value2) {
            return value1 + ":" + value2;
        }
    })
    .mapValues(new ValueMapper<String, String>() {
        @Override
        public String apply(final String value) {
            return value;
        }
    });
{code}


The exception below is actually a symptom of the ClassCastException reported 
in the comment. After that first exception is thrown, the StreamThread 
triggers a shutdown, which then throws this exception.

[StreamThread-1] ERROR org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [StreamThread-1] Failed to close state manager for StreamTask 0_0:
org.apache.kafka.streams.errors.ProcessorStateException: task [0_0] Failed to close state store addr-organization
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.close(ProcessorStateManager.java:342)
    at org.apache.kafka.streams.processor.internals.AbstractTask.closeStateManager(AbstractTask.java:121)
    at org.apache.kafka.streams.processor.internals.StreamThread$2.apply(StreamThread.java:341)
    at org.apache.kafka.streams.processor.internals.StreamThread.performOnAllTasks(StreamThread.java:322)
    at org.apache.kafka.streams.processor.internals.StreamThread.closeAllStateManagers(StreamThread.java:338)
    at org.apache.kafka.streams.processor.internals.StreamThread.shutdownTasksAndState(StreamThread.java:299)
    at org.apache.kafka.streams.processor.internals.StreamThread.shutdown(StreamThread.java:262)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:245)
Caused by: java.lang.IllegalStateException: Key found in dirty key set, but entry is null
    at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:112)
    at org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:100)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.flush(CachingKeyValueStore.java:111)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.close(CachingKeyValueStore.java:117)
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.close(ProcessorStateManager.java:340)
    ... 7 more


  was:
The two exceptions below were reported by Frank on the dev mailing list. After 
investigation, the root cause is multiple cache evictions happening in the same 
topology. 

{code}
final KTable<String, String> one = builder.table(Serdes.String(), Serdes.String(), tableOne, tableOne);
final KTable<Long, String> two = builder.table(Serdes.Long(), Serdes.String(), tableTwo, tableTwo);
final KTable<String, Long> reduce = two.groupBy(new KeyValueMapper<Long, String, KeyValue<String, Long>>() {
        @Override
        public KeyValue<String, Long> apply(final Long key, final String value) {
            return new KeyValue<>(value, key);
        }
    }, Serdes.String(), Serdes.Long())
    .reduce(new Reducer<Long>() {
        @Override
        public Long apply(final Long value1, final Long value2) {
            return value1 + value2;
        }
    }, new Reducer<Long>() {
        @Override
        public Long apply(final Long value1, final Long value2) {
            return value1 - value2;
        }
    }, "reducer-store");
one.leftJoin(reduce, new ValueJoiner<String, Long, String>() {
        @Override
        public String apply(final String value1, final Long value2) {
            return value1 + ":" + value2;
        }
    })
    .mapValues(new ValueMapper<String, String>() {
        @Override
        public String apply(final String value) {
            return value;
        }
    });
{code}



Reported on the mailing list. Needs looking into how it could get in this state.
[StreamThread-1] ERROR org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [StreamThread-1] Failed to close state manager for StreamTask 0_0:
org.apache.kafka.streams.errors.ProcessorStateException: task [0_0] Failed to close state store addr-organization
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.close(ProcessorStateManager.java:342)
    at org.apache.kafka.streams.processor.internals.AbstractTask.closeStateManager(AbstractTask.java:121)
    at org.apache.kafka.streams.processor.internals.StreamThread$2.apply(StreamThread.java:341)
    at org.apache.kafka.streams.processor.internals.StreamThread.performOnAllTasks(StreamThread.java:322)
    at org.apache.kafka.streams.processor.internals.StreamThread.closeAllStateManagers(StreamThread.java:338)
    at org.apache.kafka.streams.processor.internals.StreamThread.shutdownTasksAndState(StreamThread.java:299)
    at org.apache.kafka.streams.processor.internals.StreamThread.shutdown(StreamThread.java:262)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:245)
Caused by: java.lang.IllegalStateException: Key found in dirty key set, but entry is null
    at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:112)
    at org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:100)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.flush(CachingKeyValueStore.java:111)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.close(CachingKeyValueStore.java:117)
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.close(ProcessorStateManager.java:340)
    ... 7 more



> Multi layer cache eviction causes forwarding to incorrect Processor Node 
> -------------------------------------------------------------------------
>
>                 Key: KAFKA-4311
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4311
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.10.1.0
>            Reporter: Damian Guy
>            Assignee: Damian Guy
>             Fix For: 0.10.1.1



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
