[
https://issues.apache.org/jira/browse/FLINK-13226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16885348#comment-16885348
]
Jiangjie Qin commented on FLINK-13226:
--------------------------------------
I took a look at the stack trace. We are hitting KAFKA-6635, which is
causing a deadlock.
The expected behavior of the test is the following:
# produce some messages to Kafka
# flush and checkpoint some of the messages.
# produce some more messages.
# inject an artificial exception to cause the application to fail
# failover, complete the producing and finish the job
The process got stuck at step 5.
The task main thread was about to exit and tried to grab the checkpoint lock to
release the operator chain outputs.
{noformat}
"Source: Custom Source -> Map -> (Sink: Unnamed, Sink: Unnamed) (1/1)" #224 prio=5 os_prio=0 tid=0x00007f7360c28800 nid=0x36e4 waiting for monitor entry [0x00007f732d44d000]
   java.lang.Thread.State: BLOCKED (on object monitor)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:489)
    - waiting to lock <0x000000009914c9d8> (a java.lang.Object)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:685)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:515)
    at java.lang.Thread.run(Thread.java:748)
{noformat}
But the checkpointing thread was holding the checkpoint lock and blocked
waiting for the Kafka transaction to be committed.
{noformat}
"Async calls on Source: Custom Source -> Map -> (Sink: Unnamed, Sink: Unnamed) (1/1)" #227 daemon prio=5 os_prio=0 tid=0x00007f7343ccf800 nid=0x36e7 waiting on condition [0x00007f732cf4a000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for <0x00000000ec805fb8> (a java.util.concurrent.CountDownLatch$Sync)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
    at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
    at org.apache.kafka.clients.producer.internals.TransactionalRequestResult.await(TransactionalRequestResult.java:50)
    at org.apache.kafka.clients.producer.KafkaProducer.commitTransaction(KafkaProducer.java:698)
    at org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaInternalProducer.commitTransaction(FlinkKafkaInternalProducer.java:86)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.commit(FlinkKafkaProducer.java:906)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.commit(FlinkKafkaProducer.java:98)
    at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.notifyCheckpointComplete(TwoPhaseCommitSinkFunction.java:283)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointComplete(AbstractUdfStreamOperator.java:130)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:827)
    - locked <0x000000009914c9d8> (a java.lang.Object)
    at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1178)
    at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1626)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
{noformat}
The KafkaProducer internal sender thread is supposed to complete the
transaction commit and notify the thread that is committing the transaction.
However, the internal sender thread is already gone, because the producers were
closed when the task main thread disposed all the operators. The checkpointing
thread therefore blocks forever while holding the checkpoint lock, and so does
the task main thread, which is waiting for that lock.
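To illustrate the hang in isolation, here is a minimal sketch in plain Java. It is not the Kafka or Flink code: {{ToyProducer}}, {{commitUnderLock}} and the class name are hypothetical stand-ins, and a timeout replaces the unbounded {{await()}} seen in the stack trace so that the example terminates.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class CommitAfterCloseSketch {

    /** Hypothetical stand-in for a producer whose commits are completed by a sender thread. */
    static final class ToyProducer {
        private volatile boolean senderRunning = true;

        /** Models close(): once the sender is gone, pending requests are never completed. */
        void close() {
            senderRunning = false;
        }

        /** Models commitTransaction(): returns a latch that only a live sender counts down. */
        CountDownLatch commitTransaction() {
            CountDownLatch result = new CountDownLatch(1);
            if (senderRunning) {
                result.countDown(); // a live sender would complete the commit
            }
            return result;
        }
    }

    /**
     * Commits while holding the lock, like the checkpointing thread does.
     * Returns true if the commit completed, false if it timed out (the real code would hang).
     */
    static boolean commitUnderLock(Object checkpointLock, ToyProducer producer, long timeoutMs) {
        synchronized (checkpointLock) {
            try {
                return producer.commitTransaction().await(timeoutMs, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    public static void main(String[] args) {
        Object checkpointLock = new Object();

        ToyProducer open = new ToyProducer();
        System.out.println("open producer committed: " + commitUnderLock(checkpointLock, open, 200));

        ToyProducer closed = new ToyProducer();
        closed.close();
        System.out.println("closed producer committed: " + commitUnderLock(checkpointLock, closed, 200));
    }
}
```

With an unbounded wait in place of the timeout, the second call would never return and the lock would never be released, which is the state both stack traces show.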
The fix belongs on the KafkaProducer side. However, the fix is currently
only in Kafka 2.3.0, and there is no chance of it being backported to 0.11.
Given that Flink 1.9 does not depend on Kafka 2.3.0, we probably
still need to fix it ourselves in Flink first.
I think the solution here is basically:
# Ensure that closing the FlinkKafkaProducer is mutually exclusive with any
other thread using it.
# After the FlinkKafkaProducer is closed, no further action is taken on the
producer.
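The two points above could look roughly like the following sketch. This is only an illustration under assumptions, not the actual patch: {{TxnProducer}}, {{GuardedProducer}} and {{CountingProducer}} are hypothetical names, and throwing {{IllegalStateException}} after close is one possible choice (silently skipping the call would be another).

```java
public class GuardedProducerSketch {

    /** Hypothetical minimal producer interface used only for this sketch. */
    interface TxnProducer {
        void commitTransaction();
        void close();
    }

    /** Wrapper that serializes close() against other calls and rejects calls after close. */
    static final class GuardedProducer implements TxnProducer {
        private final TxnProducer delegate;
        private final Object producerLock = new Object();
        private boolean closed; // guarded by producerLock

        GuardedProducer(TxnProducer delegate) {
            this.delegate = delegate;
        }

        @Override
        public void commitTransaction() {
            synchronized (producerLock) {
                // Point 2: never touch the underlying producer once it is closed.
                if (closed) {
                    throw new IllegalStateException("producer already closed");
                }
                delegate.commitTransaction();
            }
        }

        @Override
        public void close() {
            // Point 1: close cannot interleave with a commit in flight.
            synchronized (producerLock) {
                if (!closed) {
                    closed = true;
                    delegate.close();
                }
            }
        }
    }

    /** Test double that counts the calls reaching the underlying producer. */
    static final class CountingProducer implements TxnProducer {
        int commits;
        int closes;

        @Override
        public void commitTransaction() {
            commits++;
        }

        @Override
        public void close() {
            closes++;
        }
    }

    public static void main(String[] args) {
        CountingProducer raw = new CountingProducer();
        GuardedProducer guarded = new GuardedProducer(raw);

        guarded.commitTransaction();
        guarded.close();
        guarded.close(); // second close is a no-op

        try {
            guarded.commitTransaction();
        } catch (IllegalStateException e) {
            System.out.println("rejected after close: " + e.getMessage());
        }
        System.out.println("commits=" + raw.commits + " closes=" + raw.closes);
    }
}
```

Because the commit runs under the same lock as close, a close request waits for an in-flight commit to finish instead of tearing down the sender underneath it.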
> KafkaProducerExactlyOnceITCase.testMultipleSinkOperators fails on Travis
> ------------------------------------------------------------------------
>
> Key: FLINK-13226
> URL: https://issues.apache.org/jira/browse/FLINK-13226
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka
> Affects Versions: 1.9.0
> Reporter: Till Rohrmann
> Assignee: Jiangjie Qin
> Priority: Critical
> Labels: test-stability
> Fix For: 1.9.0
>
>
> The {{KafkaProducerExactlyOnceITCase.testMultipleSinkOperators}} fails on
> Travis with not producing output for 300 s.
> https://api.travis-ci.org/v3/job/557290235/log.txt