[ 
https://issues.apache.org/jira/browse/FLINK-19330?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Igal Shilman updated FLINK-19330:
---------------------------------
    Description: 
In Flink 1.11, the AbstractStreamOperator's runtimeContext is not fully 
initialized when executing
{code:java}
 AbstractStreamOperator#initializeState(){code}
In particular, the KeyedStateStore is set only after initializeState has finished.
 See: 
 
[https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java#L258,L259]
 This behaviour changed between Flink 1.10 and Flink 1.11.

StateFun's FunctionGroupOperator performs its initialization logic in 
initializeState, and that logic requires an already-initialized runtimeContext.

This situation causes the following failure after recovery:
{code:java}
java.lang.RuntimeException: java.lang.NullPointerException: Keyed state can 
only be used on a 'keyed stream', i.e., after a 'keyBy()' operation.
        at 
org.apache.flink.runtime.state.AbstractKeyedStateBackend.lambda$applyToAllKeys$0(AbstractKeyedStateBackend.java:256)
 ~[flink-dist_2.12-1.11.1.jar:1.11.1]
        at java.util.Iterator.forEachRemaining(Iterator.java:116) ~[?:1.8.0_265]
        at 
java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
 ~[?:1.8.0_265]
        at 
java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:647) 
~[?:1.8.0_265]
        at 
org.apache.flink.runtime.state.AbstractKeyedStateBackend.applyToAllKeys(AbstractKeyedStateBackend.java:249)
 ~[flink-dist_2.12-1.11.1.jar:1.11.1]
        at 
org.apache.flink.statefun.flink.core.functions.AsyncOperationFailureNotifier.fireExpiredAsyncOperations(AsyncOperationFailureNotifier.java:42)
 ~[statefun-flink-core.jar:2.3-SNAPSHOT]
        at 
org.apache.flink.statefun.flink.core.functions.FunctionGroupOperator.initializeState(FunctionGroupOperator.java:160)
 ~[statefun-flink-core.jar:2.3-SNAPSHOT]
        at 
org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:106)
 ~[flink-dist_2.12-1.11.1.jar:1.11.1]
        at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:258)
 ~[flink-dist_2.12-1.11.1.jar:1.11.1]
        at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290)
 ~[flink-dist_2.12-1.11.1.jar:1.11.1]
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:473)
 ~[flink-dist_2.12-1.11.1.jar:1.11.1]
        at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
 ~[flink-dist_2.12-1.11.1.jar:1.11.1]
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:469)
 ~[flink-dist_2.12-1.11.1.jar:1.11.1]
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:522) 
~[flink-dist_2.12-1.11.1.jar:1.11.1]
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) 
~[flink-dist_2.12-1.11.1.jar:1.11.1]
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) 
~[flink-dist_2.12-1.11.1.jar:1.11.1]
        at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_265]
Caused by: java.lang.NullPointerException: Keyed state can only be used on a 
'keyed stream', i.e., after a 'keyBy()' operation.
        at 
org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:75) 
~[flink-dist_2.12-1.11.1.jar:1.11.1]
        at 
org.apache.flink.streaming.api.operators.StreamingRuntimeContext.checkPreconditionsAndGetKeyedStateStore(StreamingRuntimeContext.java:223)
 ~[flink-dist_2.12-1.11.1.jar:1.11.1]
        at 
org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:188)
 ~[flink-dist_2.12-1.11.1.jar:1.11.1]
        at 
org.apache.flink.statefun.flink.core.state.FlinkState.createFlinkStateAccessor(FlinkState.java:69)
 ~[statefun-flink-core.jar:2.3-SNAPSHOT]
        at 
org.apache.flink.statefun.flink.core.state.FlinkStateBinder.bindValue(FlinkStateBinder.java:48)
 ~[statefun-flink-core.jar:2.3-SNAPSHOT]
        at 
org.apache.flink.statefun.sdk.state.StateBinder.bind(StateBinder.java:30) 
~[statefun-flink-distribution.jar:2.3-SNAPSHOT]
        at 
org.apache.flink.statefun.flink.core.state.PersistedStates.findReflectivelyAndBind(PersistedStates.java:46)
 ~[statefun-flink-core.jar:2.3-SNAPSHOT]
        at 
org.apache.flink.statefun.flink.core.functions.StatefulFunctionRepository.load(StatefulFunctionRepository.java:74)
 ~[statefun-flink-core.jar:2.3-SNAPSHOT]
        at 
org.apache.flink.statefun.flink.core.functions.StatefulFunctionRepository.get(StatefulFunctionRepository.java:59)
 ~[statefun-flink-core.jar:2.3-SNAPSHOT]
        at 
org.apache.flink.statefun.flink.core.functions.LocalFunctionGroup.newActivation(LocalFunctionGroup.java:73)
 ~[statefun-flink-core.jar:2.3-SNAPSHOT]
        at 
org.apache.flink.statefun.flink.core.functions.LocalFunctionGroup.enqueue(LocalFunctionGroup.java:50)
 ~[statefun-flink-core.jar:2.3-SNAPSHOT]
        at 
org.apache.flink.statefun.flink.core.functions.Reductions.enqueue(Reductions.java:148)
 ~[statefun-flink-core.jar:2.3-SNAPSHOT]
        at 
org.apache.flink.statefun.flink.core.functions.Reductions.enqueueAsyncOperationAfterRestore(Reductions.java:154)
 ~[statefun-flink-core.jar:2.3-SNAPSHOT]
        at 
org.apache.flink.statefun.flink.core.functions.AsyncOperationFailureNotifier.process(AsyncOperationFailureNotifier.java:66)
 ~[statefun-flink-core.jar:2.3-SNAPSHOT]
        at 
org.apache.flink.statefun.flink.core.functions.AsyncOperationFailureNotifier.process(AsyncOperationFailureNotifier.java:30)
 ~[statefun-flink-core.jar:2.3-SNAPSHOT]
        at 
org.apache.flink.runtime.state.AbstractKeyedStateBackend.lambda$applyToAllKeys$0(AbstractKeyedStateBackend.java:252)
 ~[flink-dist_2.12-1.11.1.jar:1.11.1]
        ... 16 more
{code}

  was:
In Flink 1.11, the AbstractStreamOperator's runtimeContext is not fully 
initialized when executing `AbstractStreamOperator#initializeState()`. In 
particular, the KeyedStateStore is set only after initializeState has finished.
See: 
https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java#L258,L259
This behaviour changed between Flink 1.10 and Flink 1.11.

StateFun's FunctionGroupOperator performs its initialization logic in 
initializeState, and that logic requires an already-initialized runtimeContext.

This situation causes the following failure after recovery:

{code}
java.lang.RuntimeException: java.lang.NullPointerException: Keyed state can 
only be used on a 'keyed stream', i.e., after a 'keyBy()' operation.
        at 
org.apache.flink.runtime.state.AbstractKeyedStateBackend.lambda$applyToAllKeys$0(AbstractKeyedStateBackend.java:256)
 ~[flink-dist_2.12-1.11.1.jar:1.11.1]
        at java.util.Iterator.forEachRemaining(Iterator.java:116) ~[?:1.8.0_265]
        at 
java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
 ~[?:1.8.0_265]
        at 
java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:647) 
~[?:1.8.0_265]
        at 
org.apache.flink.runtime.state.AbstractKeyedStateBackend.applyToAllKeys(AbstractKeyedStateBackend.java:249)
 ~[flink-dist_2.12-1.11.1.jar:1.11.1]
        at 
org.apache.flink.statefun.flink.core.functions.AsyncOperationFailureNotifier.fireExpiredAsyncOperations(AsyncOperationFailureNotifier.java:42)
 ~[statefun-flink-core.jar:2.3-SNAPSHOT]
        at 
org.apache.flink.statefun.flink.core.functions.FunctionGroupOperator.initializeState(FunctionGroupOperator.java:160)
 ~[statefun-flink-core.jar:2.3-SNAPSHOT]
        at 
org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:106)
 ~[flink-dist_2.12-1.11.1.jar:1.11.1]
        at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:258)
 ~[flink-dist_2.12-1.11.1.jar:1.11.1]
        at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290)
 ~[flink-dist_2.12-1.11.1.jar:1.11.1]
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:473)
 ~[flink-dist_2.12-1.11.1.jar:1.11.1]
        at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
 ~[flink-dist_2.12-1.11.1.jar:1.11.1]
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:469)
 ~[flink-dist_2.12-1.11.1.jar:1.11.1]
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:522) 
~[flink-dist_2.12-1.11.1.jar:1.11.1]
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) 
~[flink-dist_2.12-1.11.1.jar:1.11.1]
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) 
~[flink-dist_2.12-1.11.1.jar:1.11.1]
        at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_265]
Caused by: java.lang.NullPointerException: Keyed state can only be used on a 
'keyed stream', i.e., after a 'keyBy()' operation.
        at 
org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:75) 
~[flink-dist_2.12-1.11.1.jar:1.11.1]
        at 
org.apache.flink.streaming.api.operators.StreamingRuntimeContext.checkPreconditionsAndGetKeyedStateStore(StreamingRuntimeContext.java:223)
 ~[flink-dist_2.12-1.11.1.jar:1.11.1]
        at 
org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:188)
 ~[flink-dist_2.12-1.11.1.jar:1.11.1]
        at 
org.apache.flink.statefun.flink.core.state.FlinkState.createFlinkStateAccessor(FlinkState.java:69)
 ~[statefun-flink-core.jar:2.3-SNAPSHOT]
        at 
org.apache.flink.statefun.flink.core.state.FlinkStateBinder.bindValue(FlinkStateBinder.java:48)
 ~[statefun-flink-core.jar:2.3-SNAPSHOT]
        at 
org.apache.flink.statefun.sdk.state.StateBinder.bind(StateBinder.java:30) 
~[statefun-flink-distribution.jar:2.3-SNAPSHOT]
        at 
org.apache.flink.statefun.flink.core.state.PersistedStates.findReflectivelyAndBind(PersistedStates.java:46)
 ~[statefun-flink-core.jar:2.3-SNAPSHOT]
        at 
org.apache.flink.statefun.flink.core.functions.StatefulFunctionRepository.load(StatefulFunctionRepository.java:74)
 ~[statefun-flink-core.jar:2.3-SNAPSHOT]
        at 
org.apache.flink.statefun.flink.core.functions.StatefulFunctionRepository.get(StatefulFunctionRepository.java:59)
 ~[statefun-flink-core.jar:2.3-SNAPSHOT]
        at 
org.apache.flink.statefun.flink.core.functions.LocalFunctionGroup.newActivation(LocalFunctionGroup.java:73)
 ~[statefun-flink-core.jar:2.3-SNAPSHOT]
        at 
org.apache.flink.statefun.flink.core.functions.LocalFunctionGroup.enqueue(LocalFunctionGroup.java:50)
 ~[statefun-flink-core.jar:2.3-SNAPSHOT]
        at 
org.apache.flink.statefun.flink.core.functions.Reductions.enqueue(Reductions.java:148)
 ~[statefun-flink-core.jar:2.3-SNAPSHOT]
        at 
org.apache.flink.statefun.flink.core.functions.Reductions.enqueueAsyncOperationAfterRestore(Reductions.java:154)
 ~[statefun-flink-core.jar:2.3-SNAPSHOT]
        at 
org.apache.flink.statefun.flink.core.functions.AsyncOperationFailureNotifier.process(AsyncOperationFailureNotifier.java:66)
 ~[statefun-flink-core.jar:2.3-SNAPSHOT]
        at 
org.apache.flink.statefun.flink.core.functions.AsyncOperationFailureNotifier.process(AsyncOperationFailureNotifier.java:30)
 ~[statefun-flink-core.jar:2.3-SNAPSHOT]
        at 
org.apache.flink.runtime.state.AbstractKeyedStateBackend.lambda$applyToAllKeys$0(AbstractKeyedStateBackend.java:252)
 ~[flink-dist_2.12-1.11.1.jar:1.11.1]
        ... 16 more
{code}



> Recovery with async operations fails due to uninitialized runtimeContext
> ----------------------------------------------------------------------
>
>                 Key: FLINK-19330
>                 URL: https://issues.apache.org/jira/browse/FLINK-19330
>             Project: Flink
>          Issue Type: Bug
>          Components: Stateful Functions
>            Reporter: Igal Shilman
>            Priority: Blocker
>
> In Flink 1.11, the AbstractStreamOperator's runtimeContext is not fully 
> initialized when executing
> {code:java}
>  AbstractStreamOperator#initializeState(){code}
> In particular, the KeyedStateStore is set only after initializeState has finished.
>  See: 
>  
> [https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java#L258,L259]
>  This behaviour changed between Flink 1.10 and Flink 1.11.
> StateFun's FunctionGroupOperator performs its initialization logic in 
> initializeState, and that logic requires an already-initialized runtimeContext.
> This situation causes the following failure after recovery:
> {code:java}
> java.lang.RuntimeException: java.lang.NullPointerException: Keyed state can 
> only be used on a 'keyed stream', i.e., after a 'keyBy()' operation.
>       at 
> org.apache.flink.runtime.state.AbstractKeyedStateBackend.lambda$applyToAllKeys$0(AbstractKeyedStateBackend.java:256)
>  ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>       at java.util.Iterator.forEachRemaining(Iterator.java:116) ~[?:1.8.0_265]
>       at 
> java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
>  ~[?:1.8.0_265]
>       at 
> java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:647) 
> ~[?:1.8.0_265]
>       at 
> org.apache.flink.runtime.state.AbstractKeyedStateBackend.applyToAllKeys(AbstractKeyedStateBackend.java:249)
>  ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>       at 
> org.apache.flink.statefun.flink.core.functions.AsyncOperationFailureNotifier.fireExpiredAsyncOperations(AsyncOperationFailureNotifier.java:42)
>  ~[statefun-flink-core.jar:2.3-SNAPSHOT]
>       at 
> org.apache.flink.statefun.flink.core.functions.FunctionGroupOperator.initializeState(FunctionGroupOperator.java:160)
>  ~[statefun-flink-core.jar:2.3-SNAPSHOT]
>       at 
> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:106)
>  ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>       at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:258)
>  ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>       at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290)
>  ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:473)
>  ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
>  ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:469)
>  ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:522)
>  ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>       at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) 
> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) 
> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>       at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_265]
> Caused by: java.lang.NullPointerException: Keyed state can only be used on a 
> 'keyed stream', i.e., after a 'keyBy()' operation.
>       at 
> org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:75) 
> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>       at 
> org.apache.flink.streaming.api.operators.StreamingRuntimeContext.checkPreconditionsAndGetKeyedStateStore(StreamingRuntimeContext.java:223)
>  ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>       at 
> org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:188)
>  ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>       at 
> org.apache.flink.statefun.flink.core.state.FlinkState.createFlinkStateAccessor(FlinkState.java:69)
>  ~[statefun-flink-core.jar:2.3-SNAPSHOT]
>       at 
> org.apache.flink.statefun.flink.core.state.FlinkStateBinder.bindValue(FlinkStateBinder.java:48)
>  ~[statefun-flink-core.jar:2.3-SNAPSHOT]
>       at 
> org.apache.flink.statefun.sdk.state.StateBinder.bind(StateBinder.java:30) 
> ~[statefun-flink-distribution.jar:2.3-SNAPSHOT]
>       at 
> org.apache.flink.statefun.flink.core.state.PersistedStates.findReflectivelyAndBind(PersistedStates.java:46)
>  ~[statefun-flink-core.jar:2.3-SNAPSHOT]
>       at 
> org.apache.flink.statefun.flink.core.functions.StatefulFunctionRepository.load(StatefulFunctionRepository.java:74)
>  ~[statefun-flink-core.jar:2.3-SNAPSHOT]
>       at 
> org.apache.flink.statefun.flink.core.functions.StatefulFunctionRepository.get(StatefulFunctionRepository.java:59)
>  ~[statefun-flink-core.jar:2.3-SNAPSHOT]
>       at 
> org.apache.flink.statefun.flink.core.functions.LocalFunctionGroup.newActivation(LocalFunctionGroup.java:73)
>  ~[statefun-flink-core.jar:2.3-SNAPSHOT]
>       at 
> org.apache.flink.statefun.flink.core.functions.LocalFunctionGroup.enqueue(LocalFunctionGroup.java:50)
>  ~[statefun-flink-core.jar:2.3-SNAPSHOT]
>       at 
> org.apache.flink.statefun.flink.core.functions.Reductions.enqueue(Reductions.java:148)
>  ~[statefun-flink-core.jar:2.3-SNAPSHOT]
>       at 
> org.apache.flink.statefun.flink.core.functions.Reductions.enqueueAsyncOperationAfterRestore(Reductions.java:154)
>  ~[statefun-flink-core.jar:2.3-SNAPSHOT]
>       at 
> org.apache.flink.statefun.flink.core.functions.AsyncOperationFailureNotifier.process(AsyncOperationFailureNotifier.java:66)
>  ~[statefun-flink-core.jar:2.3-SNAPSHOT]
>       at 
> org.apache.flink.statefun.flink.core.functions.AsyncOperationFailureNotifier.process(AsyncOperationFailureNotifier.java:30)
>  ~[statefun-flink-core.jar:2.3-SNAPSHOT]
>       at 
> org.apache.flink.runtime.state.AbstractKeyedStateBackend.lambda$applyToAllKeys$0(AbstractKeyedStateBackend.java:252)
>  ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>       ... 16 more
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to