[
https://issues.apache.org/jira/browse/FLINK-4803?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15569025#comment-15569025
]
Shannon Carey commented on FLINK-4803:
--------------------------------------
Yes, that's right. cancel() blocks on close(), so if close() misbehaves, the
task thread is never interrupted and cancel() blocks forever.

In the issue description, I suggested your option #2. I think you'll want #1 no
matter what. However, #2 allows at least one message and/or exception to be
logged that tells the user what went wrong (why their job is taking a long time
to cancel, or why it did not cancel gracefully).

I'm not sure what your DataSink-specific option would look like. Maybe it is
similar to my workaround, where I wrapped my HadoopOutputFormat in a subclass
that calls super.close() from a separate thread with a timeout? That workaround
is OK, but it took a fair amount of effort to figure out what the problem was,
and there was nothing I could do but restart Flink to get my job to terminate
(not a desirable solution). You'll want Flink to function smoothly regardless
of what data sink the user chooses.
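
For illustration, the workaround described above (running close() on a separate
thread and giving up after a timeout) could be sketched roughly as follows. This
is a minimal, generic sketch and not Flink code: the class name TimedClose and
the method closeWithTimeout are hypothetical, and the close action is passed in
as a plain AutoCloseable so that the example does not depend on any Flink or
Hadoop class.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical helper (not part of Flink): runs a close action with a deadline. */
public final class TimedClose {

    private TimedClose() {}

    /**
     * Runs closeAction.close() on a separate daemon thread and waits for it
     * for at most timeoutMillis.
     *
     * @return true if close() finished in time; false if the wait timed out or
     *         the calling thread was interrupted. In both of those cases the
     *         helper thread is interrupted.
     */
    public static boolean closeWithTimeout(AutoCloseable closeAction, long timeoutMillis) {
        ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "timed-close");
            t.setDaemon(true); // a stuck close() must not keep the JVM alive
            return t;
        });
        // The lambda returns a value, so it is submitted as a Callable and may throw.
        Future<?> result = executor.submit(() -> {
            closeAction.close();
            return null;
        });
        try {
            result.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            result.cancel(true); // interrupts the thread that is running close()
            return false;
        } catch (InterruptedException e) {
            result.cancel(true);
            Thread.currentThread().interrupt(); // preserve the caller's interrupt status
            return false;
        } catch (ExecutionException e) {
            throw new IllegalStateException("close() failed", e.getCause());
        } finally {
            executor.shutdownNow();
        }
    }
}
```

In a wrapping subclass, the overridden close() would then call something like
TimedClose.closeWithTimeout(() -> super.close(), timeoutMillis) and log a
warning when it returns false. The caller stops waiting at the deadline even if
the helper thread never reacts to the interrupt, which is why the helper thread
is a daemon.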
> Job Cancel can hang forever waiting for OutputFormat.close()
> ------------------------------------------------------------
>
> Key: FLINK-4803
> URL: https://issues.apache.org/jira/browse/FLINK-4803
> Project: Flink
> Issue Type: Bug
> Affects Versions: 1.1.1
> Reporter: Shannon Carey
>
> If the Flink job uses a badly-behaved OutputFormat (in this example, a
> HadoopOutputFormat containing a CqlBulkOutputFormat), where the close()
> method blocks forever, it is impossible to cancel the Flink job even though
> the blocked thread would respond to an interrupt. The stack traces below show
> the state of the important threads when a job is canceled and the
> OutputFormat is blocking forever inside of close().
> I suggest that the `DataSinkTask.cancel()` method be updated to add a timeout
> on `this.format.close()`. When the timeout is reached, the Task thread should
> be interrupted.
> {code}
> "Canceler for DataSink (org.apache.flink.api.scala.hadoop.mapred.HadoopOutputFormat@13bf17a2) (2/5)" #6422 daemon prio=5 os_prio=0 tid=0x00007fb7e42f0000 nid=0x34f3 waiting for monitor entry [0x00007fb7be079000]
>    java.lang.Thread.State: BLOCKED (on object monitor)
>         at org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormatBase.close(HadoopOutputFormatBase.java:158)
>         - waiting to lock <0x00000006bae5f788> (a java.lang.Object)
>         at org.apache.flink.runtime.operators.DataSinkTask.cancel(DataSinkTask.java:268)
>         at org.apache.flink.runtime.taskmanager.Task$TaskCanceler.run(Task.java:1149)
>         at java.lang.Thread.run(Thread.java:745)
>
> "DataSink (org.apache.flink.api.scala.hadoop.mapred.HadoopOutputFormat@13bf17a2) (2/5)" #6410 daemon prio=5 os_prio=0 tid=0x00007fb7e79a4800 nid=0x2ad8 waiting on condition [0x00007fb7bdf78000]
>    java.lang.Thread.State: TIMED_WAITING (parking)
>         at sun.misc.Unsafe.park(Native Method)
>         - parking to wait for <0x00000006c5ab5e20> (a java.util.concurrent.SynchronousQueue$TransferStack)
>         at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
>         at java.util.concurrent.SynchronousQueue$TransferStack.awaitFulfill(SynchronousQueue.java:460)
>         at java.util.concurrent.SynchronousQueue$TransferStack.transfer(SynchronousQueue.java:362)
>         at java.util.concurrent.SynchronousQueue.offer(SynchronousQueue.java:895)
>         at org.apache.cassandra.io.sstable.SSTableSimpleUnsortedWriter.put(SSTableSimpleUnsortedWriter.java:194)
>         at org.apache.cassandra.io.sstable.SSTableSimpleUnsortedWriter.sync(SSTableSimpleUnsortedWriter.java:180)
>         at org.apache.cassandra.io.sstable.SSTableSimpleUnsortedWriter.close(SSTableSimpleUnsortedWriter.java:156)
>         at org.apache.cassandra.io.sstable.CQLSSTableWriter.close(CQLSSTableWriter.java:275)
>         at org.apache.cassandra.hadoop.AbstractBulkRecordWriter.close(AbstractBulkRecordWriter.java:133)
>         at org.apache.cassandra.hadoop.AbstractBulkRecordWriter.close(AbstractBulkRecordWriter.java:126)
>         at org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormatBase.close(HadoopOutputFormatBase.java:158)
>         - locked <0x00000006bae5f788> (a java.lang.Object)
>         at org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:234)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>         at java.lang.Thread.run(Thread.java:745)
> {code}