[ https://issues.apache.org/jira/browse/FLINK-18204?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17130399#comment-17130399 ]
shengsheng lin commented on FLINK-18204:
----------------------------------------
[~yunta] It seems we have found the cause of the problem. We called the
registerEventTimeTimer method of InternalTimerService from another thread; that
method calls eventTimeTimersQueue.add(), while Flink itself calls
eventTimeTimersQueue.poll(). As a result, eventTimeTimersQueue was modified
concurrently by two threads. Could we consider implementing a thread-safe queue
in Flink, such as java.util.concurrent.PriorityBlockingQueue?
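The suggestion above can be sketched outside Flink as follows. This is a minimal, self-contained illustration, not Flink code: the Timer class and the ThreadSafeTimerQueueSketch class with its registerEventTimeTimer, advanceWatermark and snapshot methods are hypothetical stand-ins for InternalTimerService and eventTimeTimersQueue. It assumes one background thread registers timers while the owning thread fires them and takes snapshots, with a java.util.concurrent.PriorityBlockingQueue doing the locking.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;

public class ThreadSafeTimerQueueSketch {

    // Hypothetical stand-in for an event-time timer (not Flink's timer class).
    static final class Timer {
        final String key;
        final long timestamp;

        Timer(String key, long timestamp) {
            this.key = key;
            this.timestamp = timestamp;
        }
    }

    // PriorityBlockingQueue guards add/peek/poll/toArray with one internal lock,
    // so a second thread may register timers while this one fires them.
    private final PriorityBlockingQueue<Timer> timers =
            new PriorityBlockingQueue<>(16, Comparator.comparingLong((Timer t) -> t.timestamp));

    // Hypothetical registration method; safe to call from any thread.
    void registerEventTimeTimer(String key, long timestamp) {
        timers.add(new Timer(key, timestamp));
    }

    // Removes and returns every timer due at or before the watermark.
    // Assumes a single thread fires timers: nothing else removes elements between
    // peek() and poll(), so poll() returns an element whose timestamp is no larger
    // than the peeked head's, which is therefore also due.
    List<Timer> advanceWatermark(long watermark) {
        List<Timer> fired = new ArrayList<>();
        Timer head;
        while ((head = timers.peek()) != null && head.timestamp <= watermark) {
            fired.add(timers.poll());
        }
        return fired;
    }

    // Point-in-time copy taken under the queue's lock, so it never contains null slots.
    Timer[] snapshot() {
        return timers.toArray(new Timer[0]);
    }

    // Registers timers from a background thread while the calling thread fires
    // timers and takes snapshots; returns the total number of timers fired.
    static int runDemo(int count) throws InterruptedException {
        ThreadSafeTimerQueueSketch queue = new ThreadSafeTimerQueueSketch();
        Thread registrar = new Thread(() -> {
            for (int i = 0; i < count; i++) {
                queue.registerEventTimeTimer("key-" + i, i);
            }
        });
        registrar.start();

        int firedCount = 0;
        for (long watermark = 0; registrar.isAlive(); watermark += 10) {
            firedCount += queue.advanceWatermark(watermark).size();
            for (Timer t : queue.snapshot()) {
                if (t == null) {
                    throw new IllegalStateException("snapshot contained a null timer");
                }
            }
        }
        registrar.join();
        // After join() every registration is visible; drain whatever is left.
        firedCount += queue.advanceWatermark(Long.MAX_VALUE).size();
        return firedCount;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("timers fired: " + runDemo(10_000));
    }
}
```

Running main should print `timers fired: 10000`. Because every add, poll and toArray takes the queue's lock, a snapshot never sees a half-updated array. The trade-off is a lock acquisition on every timer operation; the alternative that avoids the lock entirely is to keep all timer-service calls on the operator's own thread and hand work from other threads to it, rather than touching the queue directly.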
> NullPointerException when materialize checkpoint
> -------------------------------------------------
>
> Key: FLINK-18204
> URL: https://issues.apache.org/jira/browse/FLINK-18204
> Project: Flink
> Issue Type: Bug
> Components: Runtime / State Backends
> Affects Versions: 1.6.3
> Reporter: shengsheng lin
> Priority: Major
>
> Caused by: java.lang.Exception: Could not materialize checkpoint 3398 for operator SelectCepOperator -> Map -> Sink: Unnamed (1/12).
> at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)
> ... 6 more
> Caused by: java.util.concurrent.ExecutionException: java.lang.NullPointerException
> at java.util.concurrent.FutureTask.report(FutureTask.java:122)
> at java.util.concurrent.FutureTask.get(FutureTask.java:192)
> at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:53)
> at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:47)
> at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853)
> ... 5 more
> Caused by: java.lang.NullPointerException
> at org.apache.flink.runtime.state.KeyExtractorFunction$1.extractKeyFromElement(KeyExtractorFunction.java:35)
> at org.apache.flink.runtime.state.KeyExtractorFunction$1.extractKeyFromElement(KeyExtractorFunction.java:31)
> at org.apache.flink.runtime.state.KeyGroupPartitioner.reportAllElementKeyGroups(KeyGroupPartitioner.java:153)
> at org.apache.flink.runtime.state.KeyGroupPartitioner.partitionByKeyGroup(KeyGroupPartitioner.java:137)
> at org.apache.flink.runtime.state.heap.HeapPriorityQueueStateSnapshot.getKeyGroupWriter(HeapPriorityQueueStateSnapshot.java:103)
> at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$HeapSnapshotStrategy$1.performOperation(HeapKeyedStateBackend.java:814)
> at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$HeapSnapshotStrategy$1.performOperation(HeapKeyedStateBackend.java:759)
> at org.apache.flink.runtime.io.async.AbstractAsyncCallableWithResources.call(AbstractAsyncCallableWithResources.java:75)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:50)
> ... 7 more
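The comment above does not spell out how a concurrent add() turns into the NullPointerException in KeyExtractorFunction. One plausible interleaving is sketched below as a single-threaded, step-by-step replay so the outcome is deterministic. It is a simplified model, not Flink 1.6.3's code: ToyHeap, reserveSlot, store, copyElements, extractKey and replayInterleaving are hypothetical, and the sketch assumes that add() increments the size before writing the new element, and that the snapshot copies the first `size` elements of the array.

```java
import java.util.Arrays;

public class SnapshotRaceSketch {

    // Hypothetical array-backed heap (not Flink's heap queue): slot 0 is unused
    // and elements live in slots 1..size.
    static final class ToyHeap {
        final String[] slots = new String[8];
        int size = 0;

        // First half of a hypothetical add(): claim the next slot by bumping the size.
        int reserveSlot() {
            return ++size;
        }

        // Second half of add(): write the element into the claimed slot
        // (sift-up is omitted because element order does not matter here).
        void store(int index, String element) {
            slots[index] = element;
        }

        void add(String element) {
            store(reserveSlot(), element);
        }

        // Snapshot copy: the first 'size' elements as seen at the moment of the call.
        String[] copyElements() {
            return Arrays.copyOfRange(slots, 1, size + 1);
        }
    }

    // Hypothetical stand-in for a key extractor: dereferencing a null element throws NPE.
    static int extractKey(String element) {
        return element.hashCode();
    }

    // Replays one interleaving step by step on a single thread so the result is deterministic.
    static String replayInterleaving() {
        ToyHeap heap = new ToyHeap();
        heap.add("k1");
        heap.add("k2");
        // Registering thread: starts add("k3") and bumps size to 3, but has not stored the element yet.
        int slot = heap.reserveSlot();
        // Snapshot side: copies in between and sees 3 slots, the last one still null.
        String[] dump = heap.copyElements();
        // Registering thread: finishes add("k3"); the copy already taken keeps its null.
        heap.store(slot, "k3");
        // Later the copy is partitioned by key; the null element makes the key extractor throw.
        try {
            for (String element : dump) {
                extractKey(element);
            }
            return "no exception";
        } catch (NullPointerException e) {
            return "NullPointerException, dump = " + Arrays.toString(dump);
        }
    }

    public static void main(String[] args) {
        System.out.println(replayInterleaving());
    }
}
```

Running main prints `NullPointerException, dump = [k1, k2, null]`: the copy was taken after the size was bumped but before the new element was written, so the key extraction later hits a null element, which has the same shape as the KeyExtractorFunction failure in the trace above.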
--
This message was sent by Atlassian Jira
(v8.3.4#803005)