[ 
https://issues.apache.org/jira/browse/FLINK-7524?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16176212#comment-16176212
 ] 

ASF GitHub Bot commented on FLINK-7524:
---------------------------------------

Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4639#discussion_r140455425
  
    --- Diff: 
flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java 
---
    @@ -61,13 +75,14 @@ public final void registerClosable(C closeable) throws 
IOException {
                }
     
                synchronized (getSynchronizationLock()) {
    -                   if (closed) {
    -                           IOUtils.closeQuietly(closeable);
    -                           throw new IOException("Cannot register 
Closeable, registry is already closed. Closing argument.");
    +                   if (!closed) {
    +                           doRegister(closeable, closeableToRef);
    +                           return;
                        }
    -
    -                   doRegister(closeable, closeableToRef);
                }
    +
    +           IOUtils.closeQuietly(closeable);
    --- End diff --
    
    And for the second part, I think it should be closed automatically, because 
you the caller decides to give the responsibility to the registry and tie it to 
the registry's status. So the registry should take care that the close status 
is propagated to new incoming objects.


> Task "xxx" did not react to cancelling signal, but is stuck in method
> ---------------------------------------------------------------------
>
>                 Key: FLINK-7524
>                 URL: https://issues.apache.org/jira/browse/FLINK-7524
>             Project: Flink
>          Issue Type: Bug
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.3.0
>            Reporter: Bowen Li
>            Priority: Blocker
>             Fix For: 1.4.0
>
>
> Hi,
> I observed the following errors in taskmanager.log 
> {code:java}
> 2017-08-25 17:03:40,141 WARN  org.apache.flink.runtime.taskmanager.Task       
>               - Task 'TriggerWindow(SlidingEventTimeWindows(259200000, 
> 3600000), 
> AggregatingStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.TupleSerializer@f65b6aa2,
>  aggFunction=com.streamingApp$MyAggregateFunction@1f193686}, 
> EventTimeTrigger(), WindowedStream.aggregate(WindowedStream.java:858)) -> 
> Sink: prod_item (2/6)' did not react to cancelling signal, but is stuck in 
> method:
>  
> org.apache.flink.util.AbstractCloseableRegistry.unregisterClosable(AbstractCloseableRegistry.java:84)
> org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl.closeAndUnregisterStream(StateSnapshotContextSynchronousImpl.java:137)
> org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl.close(StateSnapshotContextSynchronousImpl.java:147)
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:399)
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1157)
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1089)
> org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:653)
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:589)
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:542)
> org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(BarrierBuffer.java:378)
> org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:281)
> org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:183)
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:213)
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:262)
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
> java.lang.Thread.run(Thread.java:748)
> ...
> 2017-08-25 17:05:10,139 INFO  org.apache.flink.runtime.taskmanager.Task       
>               - Notifying TaskManager about fatal error. Task 
> 'TriggerWindow(SlidingEventTimeWindows(259200000, 3600000), 
> AggregatingStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.TupleSerializer@f65b6aa2,
>  aggFunction=com.streamingApp$MyAggregateFunction@1f193686}, 
> EventTimeTrigger(), WindowedStream.aggregate(WindowedStream.java:858)) -> 
> Sink: prod_item (2/6)' did not react to cancelling signal in the last 30 
> seconds, but is stuck in method:
>  
> org.apache.flink.util.AbstractCloseableRegistry.unregisterClosable(AbstractCloseableRegistry.java:84)
> org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl.closeAndUnregisterStream(StateSnapshotContextSynchronousImpl.java:137)
> org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl.close(StateSnapshotContextSynchronousImpl.java:147)
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:399)
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1157)
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1089)
> org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:653)
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:589)
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:542)
> org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(BarrierBuffer.java:378)
> org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:281)
> org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:183)
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:213)
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:262)
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
> java.lang.Thread.run(Thread.java:748)
> .
> 2017-08-25 17:05:10,140 ERROR org.apache.flink.yarn.YarnTaskManager           
>               - 
> ==============================================================
> ======================      FATAL      =======================
> ==============================================================
> A fatal error occurred, forcing the TaskManager to shut down: Task 
> 'TriggerWindow(SlidingEventTimeWindows(259200000, 3600000), 
> AggregatingStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.TupleSerializer@f65b6aa2,
>  aggFunction=com.streamingApp$MyAggregateFunction@1f193686}, 
> EventTimeTrigger(), WindowedStream.aggregate(WindowedStream.java:858)) -> 
> Sink: prod_item (2/6)' did not react to cancelling signal in the last 30 
> seconds, but is stuck in method:
>  
> org.apache.flink.util.AbstractCloseableRegistry.unregisterClosable(AbstractCloseableRegistry.java:84)
> org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl.closeAndUnregisterStream(StateSnapshotContextSynchronousImpl.java:137)
> org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl.close(StateSnapshotContextSynchronousImpl.java:147)
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:399)
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1157)
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1089)
> org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:653)
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:589)
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:542)
> org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(BarrierBuffer.java:378)
> org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:281)
> org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:183)
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:213)
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:262)
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
> java.lang.Thread.run(Thread.java:748)
> {code}
> Why is the task stucked?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to