[ 
https://issues.apache.org/jira/browse/FLINK-4803?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ufuk Celebi closed FLINK-4803.
------------------------------
    Resolution: Duplicate

> Job Cancel can hang forever waiting for OutputFormat.close()
> ------------------------------------------------------------
>
>                 Key: FLINK-4803
>                 URL: https://issues.apache.org/jira/browse/FLINK-4803
>             Project: Flink
>          Issue Type: Bug
>          Components: Batch Connectors and Input/Output Formats
>    Affects Versions: 1.1.1
>            Reporter: Shannon Carey
>
> If the Flink job uses a badly-behaved OutputFormat (in this example, a 
> HadoopOutputFormat containing a CqlBulkOutputFormat), where the close() 
> method blocks forever, it is impossible to cancel the Flink job even though 
> the blocked thread would respond to an interrupt. The stack traces below show 
> the state of the important threads when a job is canceled and the 
> OutputFormat is blocking forever inside of close().
> I suggest that `DataSinkTask.cancel()` method be updated to add a timeout on 
> `this.format.close()`. When the timeout is reached, the Task thread should be 
> interrupted.
> {code}
> "Canceler for DataSink 
> (org.apache.flink.api.scala.hadoop.mapred.HadoopOutputFormat@13bf17a2) (2/5)" 
> #6422 daemon prio=5 os_prio=0 tid=0x00007fb7e42f0000 nid=0x34f3 waiting for 
> monitor entry [0x00007fb7be079000]
>    java.lang.Thread.State: BLOCKED (on object monitor)
>         at 
> org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormatBase.close(HadoopOutputFormatBase.java:158)
>         - waiting to lock <0x00000006bae5f788> (a java.lang.Object)
>         at 
> org.apache.flink.runtime.operators.DataSinkTask.cancel(DataSinkTask.java:268)
>         at 
> org.apache.flink.runtime.taskmanager.Task$TaskCanceler.run(Task.java:1149)
>         at java.lang.Thread.run(Thread.java:745)
> "DataSink 
> (org.apache.flink.api.scala.hadoop.mapred.HadoopOutputFormat@13bf17a2) (2/5)" 
> #6410 daemon prio=5 os_prio=0 tid=0x00007fb7e79a4800 nid=0x2ad8 waiting on 
> condition [0x00007fb7bdf78000]
>    java.lang.Thread.State: TIMED_WAITING (parking)
>         at sun.misc.Unsafe.park(Native Method)
>         - parking to wait for  <0x00000006c5ab5e20> (a 
> java.util.concurrent.SynchronousQueue$TransferStack)
>         at 
> java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
>         at 
> java.util.concurrent.SynchronousQueue$TransferStack.awaitFulfill(SynchronousQueue.java:460)
>         at 
> java.util.concurrent.SynchronousQueue$TransferStack.transfer(SynchronousQueue.java:362)
>         at 
> java.util.concurrent.SynchronousQueue.offer(SynchronousQueue.java:895)
>         at 
> org.apache.cassandra.io.sstable.SSTableSimpleUnsortedWriter.put(SSTableSimpleUnsortedWriter.java:194)
>         at 
> org.apache.cassandra.io.sstable.SSTableSimpleUnsortedWriter.sync(SSTableSimpleUnsortedWriter.java:180)
>         at 
> org.apache.cassandra.io.sstable.SSTableSimpleUnsortedWriter.close(SSTableSimpleUnsortedWriter.java:156)
>         at 
> org.apache.cassandra.io.sstable.CQLSSTableWriter.close(CQLSSTableWriter.java:275)
>         at 
> org.apache.cassandra.hadoop.AbstractBulkRecordWriter.close(AbstractBulkRecordWriter.java:133)
>         at 
> org.apache.cassandra.hadoop.AbstractBulkRecordWriter.close(AbstractBulkRecordWriter.java:126)
>         at 
> org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormatBase.close(HadoopOutputFormatBase.java:158)
>         - locked <0x00000006bae5f788> (a java.lang.Object)
>         at 
> org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:234)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>         at java.lang.Thread.run(Thread.java:745)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to