[ 
https://issues.apache.org/jira/browse/FLINK-15406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17008701#comment-17008701
 ] 

Seth Wiesman commented on FLINK-15406:
--------------------------------------

The issue is the `KeyedStateBootstrapOperator` creates a timer service whether 
or not it is used[1]. This means as it stands it can only create operators that 
also use a timer service. We can either update the operator to create the timer 
service lazily or all a different operator / api for non-process functions. 

[1] 
https://github.com/apache/flink/blob/a50d9ff6db93d961805c0e8426921efc00d42385/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/KeyedStateBootstrapOperator.java#L64-L74
 

> The savepoint is writted by "State Processor API" can't be restore by map or 
> flatmap
> ------------------------------------------------------------------------------------
>
>                 Key: FLINK-15406
>                 URL: https://issues.apache.org/jira/browse/FLINK-15406
>             Project: Flink
>          Issue Type: Bug
>          Components: API / State Processor
>    Affects Versions: 1.9.1
>            Reporter: Darcy Lin
>            Priority: Major
>         Attachments: CountWord.java
>
>
> The savepoint is writted by "State Processor API" can't be restore by map or 
> flatmap. But it can be retored by KeyedProcessFunction.  
>  Following is the error message:
> {code:java}
> java.lang.Exception: Could not write timer service of Flat Map -> Map -> 
> Sink: device_first_user_create (1/8) to checkpoint state 
> stream.java.lang.Exception: Could not write timer service of Flat Map -> Map 
> -> Sink: device_first_user_create (1/8) to checkpoint state stream. at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:466)
>  at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:89)
>  at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:399)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1282)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1216)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:872)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:777)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:708)
>  at 
> org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:88)
>  at 
> org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner.processBarrier(CheckpointBarrierAligner.java:177)
>  at 
> org.apache.flink.streaming.runtime.io.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:155)
>  at 
> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.pollNextNullable(StreamTaskNetworkInput.java:102)
>  at 
> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.pollNextNullable(StreamTaskNetworkInput.java:47)
>  at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:135)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:279)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:301) 
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:406)
>  at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705) at 
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:530) at 
> java.lang.Thread.run(Thread.java:748)Caused by: 
> java.lang.NullPointerException at 
> org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:58) at 
> org.apache.flink.streaming.api.operators.InternalTimersSnapshot.<init>(InternalTimersSnapshot.java:52)
>  at 
> org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.snapshotTimersForKeyGroup(InternalTimerServiceImpl.java:291)
>  at 
> org.apache.flink.streaming.api.operators.InternalTimerServiceSerializationProxy.write(InternalTimerServiceSerializationProxy.java:98)
>  at 
> org.apache.flink.streaming.api.operators.InternalTimeServiceManager.snapshotStateForKeyGroup(InternalTimeServiceManager.java:139)
>  at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:462)
>  ... 19 more{code}
>   



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to