[ 
https://issues.apache.org/jira/browse/FLINK-4803?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15568076#comment-15568076
 ] 

Stephan Ewen commented on FLINK-4803:
-------------------------------------

The task thread is actually interrupted. Immediately, after the call to 
"cancel()" and then periodically, if it does not exit.

The problem here seems to be that "cancel()" cannot proceed (after which the 
interruption would come) because in the HadoopOutputFormatBase, it is blocked 
on a lock held by the regular "close()" call.

I think there are two ways to address that:
  - Generic (1) - [FLINK-4715] - the TaskManagers should exit themselves (and 
rely on Yarn / Mesos / container service) to be restarted. This is the hard 
fallback that should catch all problems with respect to cancellation.
  - Generic (2) Let the cancel() call and the interrupt() calls come from two 
different threads (or add a watchdog that calls "interrupt()" if "cancel()" 
blocks for too long)
  - DataSink specific - attempt to call close() on cancellation, but do not 
block on locks and rather throw an exception.

What is your take on each of these approaches?

> Job Cancel can hang forever waiting for OutputFormat.close()
> ------------------------------------------------------------
>
>                 Key: FLINK-4803
>                 URL: https://issues.apache.org/jira/browse/FLINK-4803
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 1.1.1
>            Reporter: Shannon Carey
>
> If the Flink job uses a badly-behaved OutputFormat (in this example, a 
> HadoopOutputFormat containing a CqlBulkOutputFormat), where the close() 
> method blocks forever, it is impossible to cancel the Flink job even though 
> the blocked thread would respond to an interrupt. The stack traces below show 
> the state of the important threads when a job is canceled and the 
> OutputFormat is blocking forever inside of close().
> I suggest that `DataSinkTask.cancel()` method be updated to add a timeout on 
> `this.format.close()`. When the timeout is reached, the Task thread should be 
> interrupted.
> {code}
> "Canceler for DataSink 
> (org.apache.flink.api.scala.hadoop.mapred.HadoopOutputFormat@13bf17a2) (2/5)" 
> #6422 daemon prio=5 os_prio=0 tid=0x00007fb7e42f0000 nid=0x34f3 waiting for 
> monitor entry [0x00007fb7be079000]
>    java.lang.Thread.State: BLOCKED (on object monitor)
>         at 
> org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormatBase.close(HadoopOutputFormatBase.java:158)
>         - waiting to lock <0x00000006bae5f788> (a java.lang.Object)
>         at 
> org.apache.flink.runtime.operators.DataSinkTask.cancel(DataSinkTask.java:268)
>         at 
> org.apache.flink.runtime.taskmanager.Task$TaskCanceler.run(Task.java:1149)
>         at java.lang.Thread.run(Thread.java:745)
> "DataSink 
> (org.apache.flink.api.scala.hadoop.mapred.HadoopOutputFormat@13bf17a2) (2/5)" 
> #6410 daemon prio=5 os_prio=0 tid=0x00007fb7e79a4800 nid=0x2ad8 waiting on 
> condition [0x00007fb7bdf78000]
>    java.lang.Thread.State: TIMED_WAITING (parking)
>         at sun.misc.Unsafe.park(Native Method)
>         - parking to wait for  <0x00000006c5ab5e20> (a 
> java.util.concurrent.SynchronousQueue$TransferStack)
>         at 
> java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
>         at 
> java.util.concurrent.SynchronousQueue$TransferStack.awaitFulfill(SynchronousQueue.java:460)
>         at 
> java.util.concurrent.SynchronousQueue$TransferStack.transfer(SynchronousQueue.java:362)
>         at 
> java.util.concurrent.SynchronousQueue.offer(SynchronousQueue.java:895)
>         at 
> org.apache.cassandra.io.sstable.SSTableSimpleUnsortedWriter.put(SSTableSimpleUnsortedWriter.java:194)
>         at 
> org.apache.cassandra.io.sstable.SSTableSimpleUnsortedWriter.sync(SSTableSimpleUnsortedWriter.java:180)
>         at 
> org.apache.cassandra.io.sstable.SSTableSimpleUnsortedWriter.close(SSTableSimpleUnsortedWriter.java:156)
>         at 
> org.apache.cassandra.io.sstable.CQLSSTableWriter.close(CQLSSTableWriter.java:275)
>         at 
> org.apache.cassandra.hadoop.AbstractBulkRecordWriter.close(AbstractBulkRecordWriter.java:133)
>         at 
> org.apache.cassandra.hadoop.AbstractBulkRecordWriter.close(AbstractBulkRecordWriter.java:126)
>         at 
> org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormatBase.close(HadoopOutputFormatBase.java:158)
>         - locked <0x00000006bae5f788> (a java.lang.Object)
>         at 
> org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:234)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>         at java.lang.Thread.run(Thread.java:745)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to