[ https://issues.apache.org/jira/browse/FLINK-7524?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16152697#comment-16152697 ]
ASF GitHub Bot commented on FLINK-7524:
---------------------------------------

GitHub user StefanRRichter opened a pull request:

    https://github.com/apache/flink/pull/4639

    [FLINK-7524] Remove potentially blocking behaviour from AbstractClose…

    …ableRegistry

    ## What is the purpose of the change

    This PR removes potential for blocking behaviour in all CloseableRegistries.

    ## Verifying this change

    Change is tested in `AbstractCloseableRegistryTest::testNonBlockingClose`.

    ## Does this pull request potentially affect one of the following parts:

      - Dependencies (does it add or upgrade a dependency): (no)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
      - The serializers: (no)
      - The runtime per-record code paths (performance sensitive): (no)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)

    ## Documentation

      - Does this pull request introduce a new feature? (no)

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/StefanRRichter/flink improve-closeable-registry

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4639.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4639

----
commit a11bc242031134504d80c66e3349772e315fa2cf
Author: Stefan Richter <s.rich...@data-artisans.com>
Date:   2017-09-04T10:30:41Z

    [FLINK-7524] Remove potentially blocking behaviour from AbstractCloseableRegistry

----

> Task "xxx" did not react to cancelling signal, but is stuck in method
> ---------------------------------------------------------------------
>
>                 Key: FLINK-7524
>                 URL: https://issues.apache.org/jira/browse/FLINK-7524
>             Project: Flink
>          Issue Type: Bug
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.3.0
>            Reporter: Bowen Li
>            Priority: Blocker
>             Fix For: 1.4.0
>
>
> Hi,
> I observed the following errors in taskmanager.log
> {code:java}
> 2017-08-25 17:03:40,141 WARN org.apache.flink.runtime.taskmanager.Task
> - Task 'TriggerWindow(SlidingEventTimeWindows(259200000,
> 3600000),
> AggregatingStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.TupleSerializer@f65b6aa2,
> aggFunction=com.streamingApp$MyAggregateFunction@1f193686},
> EventTimeTrigger(), WindowedStream.aggregate(WindowedStream.java:858)) ->
> Sink: prod_item (2/6)' did not react to cancelling signal, but is stuck in
> method:
>
> org.apache.flink.util.AbstractCloseableRegistry.unregisterClosable(AbstractCloseableRegistry.java:84)
> org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl.closeAndUnregisterStream(StateSnapshotContextSynchronousImpl.java:137)
> org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl.close(StateSnapshotContextSynchronousImpl.java:147)
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:399)
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1157)
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1089)
> org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:653)
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:589)
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:542)
> org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(BarrierBuffer.java:378)
> org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:281)
> org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:183)
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:213)
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:262)
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
> java.lang.Thread.run(Thread.java:748)
> ...
> 2017-08-25 17:05:10,139 INFO org.apache.flink.runtime.taskmanager.Task
> - Notifying TaskManager about fatal error. Task
> 'TriggerWindow(SlidingEventTimeWindows(259200000, 3600000),
> AggregatingStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.TupleSerializer@f65b6aa2,
> aggFunction=com.streamingApp$MyAggregateFunction@1f193686},
> EventTimeTrigger(), WindowedStream.aggregate(WindowedStream.java:858)) ->
> Sink: prod_item (2/6)' did not react to cancelling signal in the last 30
> seconds, but is stuck in method:
>
> org.apache.flink.util.AbstractCloseableRegistry.unregisterClosable(AbstractCloseableRegistry.java:84)
> org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl.closeAndUnregisterStream(StateSnapshotContextSynchronousImpl.java:137)
> org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl.close(StateSnapshotContextSynchronousImpl.java:147)
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:399)
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1157)
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1089)
> org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:653)
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:589)
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:542)
> org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(BarrierBuffer.java:378)
> org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:281)
> org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:183)
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:213)
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:262)
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
> java.lang.Thread.run(Thread.java:748)
> .
> 2017-08-25 17:05:10,140 ERROR org.apache.flink.yarn.YarnTaskManager
> -
> ==============================================================
> ====================== FATAL =======================
> ==============================================================
> A fatal error occurred, forcing the TaskManager to shut down: Task
> 'TriggerWindow(SlidingEventTimeWindows(259200000, 3600000),
> AggregatingStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.TupleSerializer@f65b6aa2,
> aggFunction=com.streamingApp$MyAggregateFunction@1f193686},
> EventTimeTrigger(), WindowedStream.aggregate(WindowedStream.java:858)) ->
> Sink: prod_item (2/6)' did not react to cancelling signal in the last 30
> seconds, but is stuck in method:
>
> org.apache.flink.util.AbstractCloseableRegistry.unregisterClosable(AbstractCloseableRegistry.java:84)
> org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl.closeAndUnregisterStream(StateSnapshotContextSynchronousImpl.java:137)
> org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl.close(StateSnapshotContextSynchronousImpl.java:147)
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:399)
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1157)
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1089)
> org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:653)
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:589)
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:542)
> org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(BarrierBuffer.java:378)
> org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:281)
> org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:183)
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:213)
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:262)
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
> java.lang.Thread.run(Thread.java:748)
> {code}
> Why is the task stuck?

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
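
For readers following the thread, here is one way to picture the reported hang and the direction of the fix. The trace shows the task thread waiting in `unregisterClosable`, which would be consistent with another thread holding the registry's lock while it closes a slow stream. That cause is an assumption; the issue text does not confirm it. The sketch below uses a hypothetical `NonBlockingRegistry` class (not Flink's `AbstractCloseableRegistry`, and not the code from the pull request) that holds its lock only for bookkeeping and calls `close()` on the registered objects after releasing it.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical illustration only; all names here are assumptions, not Flink API. */
class NonBlockingRegistry implements Closeable {
    private final Object lock = new Object();
    private final Set<Closeable> closeables = new HashSet<>();
    private boolean closed;

    /** Registers a closeable, or closes it right away if the registry is already closed. */
    public void register(Closeable closeable) throws IOException {
        synchronized (lock) {
            if (!closed) {
                closeables.add(closeable);
                return;
            }
        }
        // Registry already closed: close the late arrival outside the lock.
        closeable.close();
        throw new IOException("Registry is already closed.");
    }

    /** Removes a closeable; the lock is held only for the set update. */
    public boolean unregister(Closeable closeable) {
        synchronized (lock) {
            return closeables.remove(closeable);
        }
    }

    @Override
    public void close() throws IOException {
        List<Closeable> toClose;
        synchronized (lock) {
            if (closed) {
                return;
            }
            closed = true;
            // Take a private copy so the lock can be released before any close() call.
            toClose = new ArrayList<>(closeables);
            closeables.clear();
        }
        // Potentially slow close() calls run without holding the lock.
        IOException first = null;
        for (Closeable c : toClose) {
            try {
                c.close();
            } catch (IOException e) {
                if (first == null) {
                    first = e;
                } else {
                    first.addSuppressed(e);
                }
            }
        }
        if (first != null) {
            throw first;
        }
    }
}
```

With this shape, a thread calling `unregister` while another thread is inside a slow `close()` waits only for the brief bookkeeping section, not for the slow stream to finish closing.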