[ 
https://issues.apache.org/jira/browse/FLINK-7524?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16152697#comment-16152697
 ] 

ASF GitHub Bot commented on FLINK-7524:
---------------------------------------

GitHub user StefanRRichter opened a pull request:

    https://github.com/apache/flink/pull/4639

    [FLINK-7524] Remove potentially blocking behaviour from AbstractCloseableRegistry
    
    ## What is the purpose of the change
    
    This PR removes the potential for blocking behaviour in all CloseableRegistries.
    
    ## Verifying this change
    
    The change is tested in `AbstractCloseableRegistryTest::testNonBlockingClose`.
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): (no)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
      - The serializers: (no)
      - The runtime per-record code paths (performance sensitive): (no)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
    
    ## Documentation
    
      - Does this pull request introduce a new feature?  (no)
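
For readers who have not looked at the patch: the stack traces in the issue show the task thread stuck inside `unregisterClosable`, which is consistent with waiting for a lock that another thread holds while it closes slow resources. The following is only a minimal sketch of one way to avoid that, not the code from this pull request. The class `NonBlockingRegistrySketch` and its methods are hypothetical names chosen for illustration. The idea is to do only the bookkeeping under the lock and to call `close()` on the registered resources after the lock has been released.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical illustration, not Flink's AbstractCloseableRegistry.
class NonBlockingRegistrySketch implements Closeable {

    private final Object lock = new Object();
    private final Set<Closeable> closeables = new HashSet<>();
    private boolean closed;

    /** Registers a closeable, or closes it right away if the registry is already closed. */
    public void register(Closeable closeable) throws IOException {
        synchronized (lock) {
            if (!closed) {
                closeables.add(closeable);
                return;
            }
        }
        // The registry is closed: close the late arrival outside the lock.
        closeable.close();
        throw new IOException("Cannot register closeable, registry is already closed.");
    }

    /** Removes a closeable; the lock is only held for the set operation. */
    public boolean unregister(Closeable closeable) {
        synchronized (lock) {
            return closeables.remove(closeable);
        }
    }

    @Override
    public void close() throws IOException {
        List<Closeable> toClose;
        synchronized (lock) {
            if (closed) {
                return;
            }
            closed = true;
            // Copy and clear under the lock; the slow part happens afterwards.
            toClose = new ArrayList<>(closeables);
            closeables.clear();
        }

        // Potentially blocking close() calls run without holding the lock.
        IOException firstFailure = null;
        for (Closeable closeable : toClose) {
            try {
                closeable.close();
            } catch (IOException e) {
                if (firstFailure == null) {
                    firstFailure = e;
                } else {
                    firstFailure.addSuppressed(e);
                }
            }
        }
        if (firstFailure != null) {
            throw firstFailure;
        }
    }
}
```

With this shape, a thread calling `unregister` while another thread is inside `close()` waits at most for the short copy-and-clear step, never for a resource's own `close()` to return.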
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/StefanRRichter/flink improve-closeable-registry

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4639.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4639
    
----
commit a11bc242031134504d80c66e3349772e315fa2cf
Author: Stefan Richter <s.rich...@data-artisans.com>
Date:   2017-09-04T10:30:41Z

    [FLINK-7524] Remove potentially blocking behaviour from AbstractCloseableRegistry

----


> Task "xxx" did not react to cancelling signal, but is stuck in method
> ---------------------------------------------------------------------
>
>                 Key: FLINK-7524
>                 URL: https://issues.apache.org/jira/browse/FLINK-7524
>             Project: Flink
>          Issue Type: Bug
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.3.0
>            Reporter: Bowen Li
>            Priority: Blocker
>             Fix For: 1.4.0
>
>
> Hi,
> I observed the following errors in taskmanager.log:
> {code:java}
> 2017-08-25 17:03:40,141 WARN  org.apache.flink.runtime.taskmanager.Task       
>               - Task 'TriggerWindow(SlidingEventTimeWindows(259200000, 
> 3600000), 
> AggregatingStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.TupleSerializer@f65b6aa2,
>  aggFunction=com.streamingApp$MyAggregateFunction@1f193686}, 
> EventTimeTrigger(), WindowedStream.aggregate(WindowedStream.java:858)) -> 
> Sink: prod_item (2/6)' did not react to cancelling signal, but is stuck in 
> method:
>  
> org.apache.flink.util.AbstractCloseableRegistry.unregisterClosable(AbstractCloseableRegistry.java:84)
> org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl.closeAndUnregisterStream(StateSnapshotContextSynchronousImpl.java:137)
> org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl.close(StateSnapshotContextSynchronousImpl.java:147)
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:399)
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1157)
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1089)
> org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:653)
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:589)
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:542)
> org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(BarrierBuffer.java:378)
> org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:281)
> org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:183)
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:213)
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:262)
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
> java.lang.Thread.run(Thread.java:748)
> ...
> 2017-08-25 17:05:10,139 INFO  org.apache.flink.runtime.taskmanager.Task       
>               - Notifying TaskManager about fatal error. Task 
> 'TriggerWindow(SlidingEventTimeWindows(259200000, 3600000), 
> AggregatingStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.TupleSerializer@f65b6aa2,
>  aggFunction=com.streamingApp$MyAggregateFunction@1f193686}, 
> EventTimeTrigger(), WindowedStream.aggregate(WindowedStream.java:858)) -> 
> Sink: prod_item (2/6)' did not react to cancelling signal in the last 30 
> seconds, but is stuck in method:
>  
> org.apache.flink.util.AbstractCloseableRegistry.unregisterClosable(AbstractCloseableRegistry.java:84)
> org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl.closeAndUnregisterStream(StateSnapshotContextSynchronousImpl.java:137)
> org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl.close(StateSnapshotContextSynchronousImpl.java:147)
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:399)
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1157)
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1089)
> org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:653)
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:589)
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:542)
> org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(BarrierBuffer.java:378)
> org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:281)
> org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:183)
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:213)
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:262)
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
> java.lang.Thread.run(Thread.java:748)
> .
> 2017-08-25 17:05:10,140 ERROR org.apache.flink.yarn.YarnTaskManager           
>               - 
> ==============================================================
> ======================      FATAL      =======================
> ==============================================================
> A fatal error occurred, forcing the TaskManager to shut down: Task 
> 'TriggerWindow(SlidingEventTimeWindows(259200000, 3600000), 
> AggregatingStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.TupleSerializer@f65b6aa2,
>  aggFunction=com.streamingApp$MyAggregateFunction@1f193686}, 
> EventTimeTrigger(), WindowedStream.aggregate(WindowedStream.java:858)) -> 
> Sink: prod_item (2/6)' did not react to cancelling signal in the last 30 
> seconds, but is stuck in method:
>  
> org.apache.flink.util.AbstractCloseableRegistry.unregisterClosable(AbstractCloseableRegistry.java:84)
> org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl.closeAndUnregisterStream(StateSnapshotContextSynchronousImpl.java:137)
> org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl.close(StateSnapshotContextSynchronousImpl.java:147)
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:399)
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1157)
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1089)
> org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:653)
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:589)
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:542)
> org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(BarrierBuffer.java:378)
> org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:281)
> org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:183)
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:213)
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:262)
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
> java.lang.Thread.run(Thread.java:748)
> {code}
> Why is the task stuck?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
