Shannon Carey created FLINK-4803:
------------------------------------
Summary: Job Cancel can hang forever waiting for OutputFormat.close()
Key: FLINK-4803
URL: https://issues.apache.org/jira/browse/FLINK-4803
Project: Flink
Issue Type: Bug
Affects Versions: 1.1.1
Reporter: Shannon Carey
If a Flink job uses a badly-behaved OutputFormat (in this example, a
HadoopOutputFormat containing a CqlBulkOutputFormat) whose close() method
blocks forever, it is impossible to cancel the job, even though the blocked
thread would respond to an interrupt. The stack traces below show the state of
the two relevant threads after the job is canceled: the TaskCanceler thread is
blocked in DataSinkTask.cancel(), waiting for the lock that the DataSink task
thread holds while it is stuck inside close().
I suggest that the `DataSinkTask.cancel()` method be updated to add a timeout
on `this.format.close()`. When the timeout is reached, the Task thread should
be interrupted.
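A minimal sketch of what such a timeout could look like, in plain Java with no
Flink dependencies. The class `CloseWithTimeout`, the method
`closeOrInterrupt`, and its parameters are hypothetical names used for
illustration only; this is not Flink's actual code or its eventual fix. It
assumes close() is run on a helper thread, because a thread blocked on an
object monitor (as the canceler is in the trace below) cannot itself time out
or be interrupted.

```java
/** Hypothetical helper illustrating the suggestion; not part of Flink. */
final class CloseWithTimeout {

    private CloseWithTimeout() {}

    /**
     * Runs closeAction on a helper daemon thread and waits up to timeoutMillis
     * for it to finish. If it does not finish in time, interrupts taskThread
     * (the thread assumed to be stuck inside close()) and returns false.
     */
    static boolean closeOrInterrupt(Runnable closeAction, Thread taskThread, long timeoutMillis)
            throws InterruptedException {
        if (timeoutMillis <= 0) {
            // Thread.join(0) would wait forever, which defeats the purpose.
            throw new IllegalArgumentException("timeoutMillis must be positive");
        }
        Thread closer = new Thread(closeAction, "output-format-closer");
        closer.setDaemon(true); // must never keep the JVM alive on its own
        closer.start();
        closer.join(timeoutMillis); // bounded wait instead of blocking forever
        if (!closer.isAlive()) {
            return true; // close() completed within the timeout
        }
        taskThread.interrupt(); // unblock the task thread stuck in close()
        return false;
    }
}
```

With something along these lines, the canceler would call
`closeOrInterrupt(() -> format.close(), taskThread, timeout)` rather than
calling close() directly, where `format`, `taskThread`, and `timeout` stand in
for whatever the task actually has available. Once the task thread is
interrupted and leaves close(), it releases the lock and the helper thread can
finish on its own.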
{code}
   java.lang.Thread.State: BLOCKED (on object monitor)
        at org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormatBase.close(HadoopOutputFormatBase.java:158)
        - waiting to lock <0x00000006bae5f788> (a java.lang.Object)
        at org.apache.flink.runtime.operators.DataSinkTask.cancel(DataSinkTask.java:268)
        at org.apache.flink.runtime.taskmanager.Task$TaskCanceler.run(Task.java:1149)
        at java.lang.Thread.run(Thread.java:745)

"DataSink (org.apache.flink.api.scala.hadoop.mapred.HadoopOutputFormat@13bf17a2) (2/5)" #6410 daemon prio=5 os_prio=0 tid=0x00007fb7e79a4800 nid=0x2ad8 waiting on condition [0x00007fb7bdf78000]
   java.lang.Thread.State: TIMED_WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <0x00000006c5ab5e20> (a java.util.concurrent.SynchronousQueue$TransferStack)
        at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
        at java.util.concurrent.SynchronousQueue$TransferStack.awaitFulfill(SynchronousQueue.java:460)
        at java.util.concurrent.SynchronousQueue$TransferStack.transfer(SynchronousQueue.java:362)
        at java.util.concurrent.SynchronousQueue.offer(SynchronousQueue.java:895)
        at org.apache.cassandra.io.sstable.SSTableSimpleUnsortedWriter.put(SSTableSimpleUnsortedWriter.java:194)
        at org.apache.cassandra.io.sstable.SSTableSimpleUnsortedWriter.sync(SSTableSimpleUnsortedWriter.java:180)
        at org.apache.cassandra.io.sstable.SSTableSimpleUnsortedWriter.close(SSTableSimpleUnsortedWriter.java:156)
        at org.apache.cassandra.io.sstable.CQLSSTableWriter.close(CQLSSTableWriter.java:275)
        at org.apache.cassandra.hadoop.AbstractBulkRecordWriter.close(AbstractBulkRecordWriter.java:133)
        at org.apache.cassandra.hadoop.AbstractBulkRecordWriter.close(AbstractBulkRecordWriter.java:126)
        at org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormatBase.close(HadoopOutputFormatBase.java:158)
        - locked <0x00000006bae5f788> (a java.lang.Object)
        at org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:234)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
        at java.lang.Thread.run(Thread.java:745)
{code}