[ 
https://issues.apache.org/jira/browse/FLINK-6061?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vladislav Pernin updated FLINK-6061:
------------------------------------
    Summary: NPE on TypeSerializer.serialize with a RocksDBStateBackend calling 
entries() on a keyed state in the open() function  (was: NPE on 
TypeSerializer.serialize with a RocksDBStateBackend calling state.entries in 
the open() function)

> NPE on TypeSerializer.serialize with a RocksDBStateBackend calling entries() 
> on a keyed state in the open() function
> --------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-6061
>                 URL: https://issues.apache.org/jira/browse/FLINK-6061
>             Project: Flink
>          Issue Type: Bug
>          Components: DataStream API, State Backends, Checkpointing, Streaming
>    Affects Versions: 1.3.0
>            Reporter: Vladislav Pernin
>
> With the default (heap) state backend, the call to state.entries() "nicely fails" with 
> an IllegalStateException:
> {noformat}
> Caused by: java.lang.IllegalStateException: No key set.
>       at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
>       at org.apache.flink.runtime.state.heap.HeapMapState.entries(HeapMapState.java:188)
>       at org.apache.flink.runtime.state.UserFacingMapState.entries(UserFacingMapState.java:77)
>       at org.apache.flink.Reproducer$FailingMapWithState.open(Reproducer.java:78)
>       at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>       at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:112)
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:670)
>       at java.lang.Thread.run(Thread.java:745)
> {noformat}
> With a RocksDBStateBackend, it fails with an NPE:
> {noformat}
> Caused by: java.lang.NullPointerException
>       at org.apache.flink.api.common.typeutils.base.ShortSerializer.serialize(ShortSerializer.java:64)
>       at org.apache.flink.api.common.typeutils.base.ShortSerializer.serialize(ShortSerializer.java:27)
>       at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.writeKey(AbstractRocksDBState.java:181)
>       at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.writeKeyWithGroupAndNamespace(AbstractRocksDBState.java:163)
>       at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.writeCurrentKeyWithGroupAndNamespace(AbstractRocksDBState.java:148)
>       at org.apache.flink.contrib.streaming.state.RocksDBMapState.serializeCurrentKeyAndNamespace(RocksDBMapState.java:263)
>       at org.apache.flink.contrib.streaming.state.RocksDBMapState.iterator(RocksDBMapState.java:196)
>       at org.apache.flink.contrib.streaming.state.RocksDBMapState.entries(RocksDBMapState.java:143)
>       at org.apache.flink.runtime.state.UserFacingMapState.entries(UserFacingMapState.java:77)
>       at org.apache.flink.Reproducer$FailingMapWithState.open(Reproducer.java:78)
>       at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>       at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:112)
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:670)
>       at java.lang.Thread.run(Thread.java:745)
> {noformat}
> The reason is that the record being serialized is null, because 
> backend.getCurrentKey() is null (not yet set) in AbstractRocksDBState.
> This may also be the case for other RocksDB state implementations.
> A reproducer based on 1.3-SNAPSHOT (needed for MapState) is available 
> here:
> https://github.com/vpernin/flink-rocksdbstate-npe
> The reproducer is a nonsensical application. There is no MapState with TTL or 
> expiration yet, so the goal is to shrink or expire the state at some 
> interval.
> This could be done by iterating over the entries of the state and removing 
> some of them.
> This probably cannot be done in the open() method of a rich function.
> I also tried implementing CheckpointListener and accessing the state content 
> in the notifyCheckpointComplete() method, but that fails too, I guess due to 
> the asynchronous nature of the checkpoint.
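
The difference between the two failures, and why iterating and pruning only works once a key is set, can be illustrated with a small stand-alone model. This is a sketch only, not Flink code: KeyedStateSketch, ToyBackend, ToyHeapMapState, ToySerializingState and pruneBelow are hypothetical names invented for the illustration. It also assumes that the NPE comes from unboxing a null Short key while writing it, which matches the ShortSerializer frames in the trace but is not confirmed by the report.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

public class KeyedStateSketch {

    /** Toy stand-in for a keyed backend: holds the key of the record being processed. */
    static class ToyBackend {
        Short currentKey; // stays null until a record arrives, e.g. during open()
    }

    /** Heap-like map state: checks that a key is set before touching the data. */
    static class ToyHeapMapState {
        final ToyBackend backend;
        final Map<Short, Map<String, Long>> byKey = new HashMap<>();

        ToyHeapMapState(ToyBackend backend) { this.backend = backend; }

        Map<String, Long> entries() {
            if (backend.currentKey == null) {
                throw new IllegalStateException("No key set.");
            }
            return byKey.computeIfAbsent(backend.currentKey, k -> new HashMap<>());
        }
    }

    /** Serializing state: writes the current key first, without a null check. */
    static class ToySerializingState {
        final ToyBackend backend;

        ToySerializingState(ToyBackend backend) { this.backend = backend; }

        byte[] serializeCurrentKey() {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(bytes)) {
                short key = backend.currentKey; // unboxing null throws NullPointerException
                out.writeShort(key);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
            return bytes.toByteArray();
        }
    }

    /** Removes entries whose value is below the threshold; returns how many were removed. */
    static int pruneBelow(Map<String, Long> entries, long threshold) {
        int removed = 0;
        for (Iterator<Map.Entry<String, Long>> it = entries.entrySet().iterator(); it.hasNext();) {
            if (it.next().getValue() < threshold) {
                it.remove();
                removed++;
            }
        }
        return removed;
    }

    public static void main(String[] args) {
        ToyBackend backend = new ToyBackend();
        ToyHeapMapState heap = new ToyHeapMapState(backend);
        ToySerializingState serializing = new ToySerializingState(backend);

        // "open()" phase: no record has been seen yet, so no key is set.
        try {
            heap.entries();
        } catch (IllegalStateException e) {
            System.out.println("heap-like state: " + e.getMessage());
        }
        try {
            serializing.serializeCurrentKey();
        } catch (NullPointerException e) {
            System.out.println("serializing state: NullPointerException");
        }

        // Per-record phase: a key is set, so the entries can be read and pruned.
        backend.currentKey = (short) 7;
        heap.entries().put("old", 1L);
        heap.entries().put("recent", 100L);
        System.out.println("removed " + pruneBelow(heap.entries(), 50L) + " entry");
    }
}
```

In this model the only difference between the two states is the explicit precondition check. A similar check before the key is serialized would presumably let the RocksDB-backed state fail with the same "No key set." message as the heap state.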



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
