[ 
https://issues.apache.org/jira/browse/FLINK-19866?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Seth Wiesman reassigned FLINK-19866:
------------------------------------

    Assignee: wang

> FunctionsStateBootstrapOperator.createStateAccessor fails due to 
> uninitialized runtimeContext
> ---------------------------------------------------------------------------------------------
>
>                 Key: FLINK-19866
>                 URL: https://issues.apache.org/jira/browse/FLINK-19866
>             Project: Flink
>          Issue Type: Bug
>          Components: Stateful Functions
>    Affects Versions: statefun-2.2.0, 1.11.2
>            Reporter: wang
>            Assignee: wang
>            Priority: Blocker
>              Labels: pull-request-available
>
> It has bugs similar to 
> [FLINK-19330|https://issues.apache.org/jira/browse/FLINK-19330]
> In Flink 1.11.2, statefun-flink-state-processor 2.2.0, the 
> AbstractStreamOperator's runtimeContext is not fully initialized when 
> executing
>  AbstractStreamOperator#intializeState()
> in particular KeyedStateStore is set after intializeState was finished.
> See:
> [https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java#L258,L259]
> This behaviour was changed from Flink 1.10->Flink 1.11.
> StateFun's FunctionsStateBootstrapOperator performs its initialization logic 
> at initalizeState, and it requires an already initialized runtimeContext to 
> create stateAccessor.
> This situation causes the following failure: 
> {code:java}
> Caused by: java.lang.NullPointerException: Keyed state can only be used on a 
> 'keyed stream', i.e., after a 'keyBy()' operation.Caused by: 
> java.lang.NullPointerException: Keyed state can only be used on a 'keyed 
> stream', i.e., after a 'keyBy()' operation. at 
> org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:75) at 
> org.apache.flink.streaming.api.operators.StreamingRuntimeContext.checkPreconditionsAndGetKeyedStateStore(StreamingRuntimeContext.java:223)
>  at 
> org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:188)
>  at 
> org.apache.flink.statefun.flink.core.state.FlinkState.createFlinkStateAccessor(FlinkState.java:69)
>  at 
> org.apache.flink.statefun.flink.core.state.FlinkStateBinder.bindValue(FlinkStateBinder.java:48)
>  at org.apache.flink.statefun.sdk.state.StateBinder.bind(StateBinder.java:30) 
> at 
> org.apache.flink.statefun.flink.core.state.PersistedStates.findReflectivelyAndBind(PersistedStates.java:46)
>  at 
> org.apache.flink.statefun.flink.state.processor.operator.StateBootstrapFunctionRegistry.bindState(StateBootstrapFunctionRegistry.java:120)
>  at 
> org.apache.flink.statefun.flink.state.processor.operator.StateBootstrapFunctionRegistry.initialize(StateBootstrapFunctionRegistry.java:103)
>  at 
> org.apache.flink.statefun.flink.state.processor.operator.StateBootstrapper.<init>(StateBootstrapper.java:39)
>  at 
> org.apache.flink.statefun.flink.state.processor.operator.FunctionsStateBootstrapOperator.initializeState(FunctionsStateBootstrapOperator.java:67)
>  at 
> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:106)
>  at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:258)
>  at 
> org.apache.flink.state.api.output.BoundedStreamTask.init(BoundedStreamTask.java:85)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:457)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:522)
>  at 
> org.apache.flink.state.api.output.BoundedOneInputStreamTaskRunner.mapPartition(BoundedOneInputStreamTaskRunner.java:76)
>  at 
> org.apache.flink.runtime.operators.MapPartitionDriver.run(MapPartitionDriver.java:103)
>  at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:504) at 
> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:369) at 
> org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) at 
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) at 
> java.lang.Thread.run(Thread.java:748){code}
>  
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to