[ 
https://issues.apache.org/jira/browse/FLINK-18204?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17130399#comment-17130399
 ] 

shengsheng lin commented on FLINK-18204:
----------------------------------------

[~yunta] It seems that we found the cause of the problem because we used the 
registerEventTimeTimer method of InternalTimerService in another thread, which 
will call eventTimeTimersQueue.add Method, and Flink will call 
eventTimeTimersQueue.poll () method. As a result, the eventTimeTimersQueue was 
modified by two threads. Can we consider implementing a thread safe queue in 
Flink, like java.util.concurrent.PriorityBlockingQueue .

> NullPointerException when  materialize checkpoint
> -------------------------------------------------
>
>                 Key: FLINK-18204
>                 URL: https://issues.apache.org/jira/browse/FLINK-18204
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / State Backends
>    Affects Versions: 1.6.3
>            Reporter: shengsheng lin
>            Priority: Major
>
> Caused by: java.lang.Exception: Could not materialize checkpoint 3398 for 
> operator SelectCepOperator -> Map -> Sink: Unnamed (1/12).
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)
>  ... 6 more
> Caused by: java.util.concurrent.ExecutionException: 
> java.lang.NullPointerException
>  at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>  at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>  at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:53)
>  at 
> org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:47)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853)
>  ... 5 more
> Caused by: java.lang.NullPointerException
>  at 
> org.apache.flink.runtime.state.KeyExtractorFunction$1.extractKeyFromElement(KeyExtractorFunction.java:35)
>  at 
> org.apache.flink.runtime.state.KeyExtractorFunction$1.extractKeyFromElement(KeyExtractorFunction.java:31)
>  at 
> org.apache.flink.runtime.state.KeyGroupPartitioner.reportAllElementKeyGroups(KeyGroupPartitioner.java:153)
>  at 
> org.apache.flink.runtime.state.KeyGroupPartitioner.partitionByKeyGroup(KeyGroupPartitioner.java:137)
>  at 
> org.apache.flink.runtime.state.heap.HeapPriorityQueueStateSnapshot.getKeyGroupWriter(HeapPriorityQueueStateSnapshot.java:103)
>  at 
> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$HeapSnapshotStrategy$1.performOperation(HeapKeyedStateBackend.java:814)
>  at 
> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$HeapSnapshotStrategy$1.performOperation(HeapKeyedStateBackend.java:759)
>  at 
> org.apache.flink.runtime.io.async.AbstractAsyncCallableWithResources.call(AbstractAsyncCallableWithResources.java:75)
>  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>  at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:50)
>  ... 7 more



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to