[ https://issues.apache.org/jira/browse/FLINK-13226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16885348#comment-16885348 ]

Jiangjie Qin commented on FLINK-13226:
--------------------------------------

I took a look at the stack trace. We are hitting KAFKA-6635, which is causing
a deadlock.

The expected behavior of the test is the following:
 # Produce some messages to Kafka.
 # Flush and checkpoint some of the messages.
 # Produce some more messages.
 # Inject an artificial exception to make the application fail.
 # Fail over, finish producing, and complete the job.
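For reference, the expected flow can be modeled in a few lines of plain Java. This is a hypothetical toy, not Flink's TwoPhaseCommitSinkFunction: ToyTwoPhaseSink and ExactlyOnceScenario are made up, the method names only loosely mirror that sink's hooks, and real Kafka transactions, pending-transaction recovery, and the multiple sinks of the actual test are ignored. Records written after the last completed checkpoint are aborted on failure and replayed after failover, so a read-committed consumer sees each record exactly once.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy model of a two-phase-commit sink (no Flink, no Kafka).
final class ToyTwoPhaseSink {
    private final List<Integer> committed = new ArrayList<>(); // what a read-committed consumer sees
    private List<Integer> current = new ArrayList<>();         // open transaction
    private List<Integer> preCommitted;                        // flushed at a checkpoint, not yet visible

    void invoke(int record) {
        current.add(record);
    }

    // Checkpoint taken: flush and pre-commit the open transaction, then start a new one.
    void snapshot() {
        preCommitted = current;
        current = new ArrayList<>();
    }

    // Checkpoint confirmed: make the pre-committed records visible.
    void notifyCheckpointComplete() {
        if (preCommitted != null) {
            committed.addAll(preCommitted);
            preCommitted = null;
        }
    }

    // Failure: the open, uncommitted transaction is aborted.
    void fail() {
        current = new ArrayList<>();
    }

    List<Integer> committed() {
        return committed;
    }
}

// Hypothetical walk-through of the five steps of the test.
final class ExactlyOnceScenario {
    static List<Integer> run() {
        ToyTwoPhaseSink sink = new ToyTwoPhaseSink();
        for (int i = 1; i <= 5; i++) sink.invoke(i);   // 1. produce some messages
        sink.snapshot();                                 // 2. checkpoint covers 1..5
        sink.notifyCheckpointComplete();
        for (int i = 6; i <= 8; i++) sink.invoke(i);     // 3. produce some more
        sink.fail();                                     // 4. failure aborts 6..8
        for (int i = 6; i <= 10; i++) sink.invoke(i);    // 5. restore, replay from 6, finish
        sink.snapshot();
        sink.notifyCheckpointComplete();                 // the step that hangs in this issue
        return sink.committed();
    }
}
```

ExactlyOnceScenario.run() returns the records 1 to 10, each once.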

The process got stuck at step 5.

The task's main thread was about to exit and tried to grab the checkpoint lock
in order to release the operator chain outputs.

{noformat}

"Source: Custom Source -> Map -> (Sink: Unnamed, Sink: Unnamed) (1/1)" #224 prio=5 os_prio=0 tid=0x00007f7360c28800 nid=0x36e4 waiting for monitor entry [0x00007f732d44d000]
 java.lang.Thread.State: BLOCKED (on object monitor)
 at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:489)
 - waiting to lock <0x000000009914c9d8> (a java.lang.Object)
 at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:685)
 at org.apache.flink.runtime.taskmanager.Task.run(Task.java:515)
 at java.lang.Thread.run(Thread.java:748)

{noformat}

However, the checkpointing thread was holding the checkpoint lock and was
blocked waiting for the Kafka transaction to be committed.

{noformat}

"Async calls on Source: Custom Source -> Map -> (Sink: Unnamed, Sink: Unnamed) (1/1)" #227 daemon prio=5 os_prio=0 tid=0x00007f7343ccf800 nid=0x36e7 waiting on condition [0x00007f732cf4a000]
 java.lang.Thread.State: WAITING (parking)
 at sun.misc.Unsafe.park(Native Method)
 - parking to wait for <0x00000000ec805fb8> (a java.util.concurrent.CountDownLatch$Sync)
 at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
 at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
 at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
 at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
 at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
 at org.apache.kafka.clients.producer.internals.TransactionalRequestResult.await(TransactionalRequestResult.java:50)
 at org.apache.kafka.clients.producer.KafkaProducer.commitTransaction(KafkaProducer.java:698)
 at org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaInternalProducer.commitTransaction(FlinkKafkaInternalProducer.java:86)
 at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.commit(FlinkKafkaProducer.java:906)
 at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.commit(FlinkKafkaProducer.java:98)
 at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.notifyCheckpointComplete(TwoPhaseCommitSinkFunction.java:283)
 at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointComplete(AbstractUdfStreamOperator.java:130)
 at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:827)
 - locked <0x000000009914c9d8> (a java.lang.Object)
 at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1178)
 at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1626)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
 at java.lang.Thread.run(Thread.java:748)

{noformat}

The KafkaProducer's internal sender thread is supposed to complete the
transaction commit and notify the thread that is committing the transaction.
However, the sender thread is already gone, because the producers were closed
when the task main thread disposed of all the operators. Therefore, the
checkpointing thread blocks forever, and so does the task main thread, which is
waiting for the checkpoint lock that the checkpointing thread holds.
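To make the failure mode concrete, here is a minimal, hypothetical model of the deadlock using only the JDK. DeadlockModel and its fields are made up for illustration: a plain Object stands in for the checkpoint lock, and a CountDownLatch stands in for TransactionalRequestResult, which only the (already closed) sender thread would count down. The real await has no timeout; the model uses one so that it terminates.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical model of the deadlock; it uses only the JDK, not Flink or Kafka.
final class DeadlockModel {
    // Stands in for the StreamTask checkpoint lock (a plain java.lang.Object monitor).
    private final Object checkpointLock = new Object();
    // Stands in for TransactionalRequestResult. Only the producer's sender thread would
    // count it down, and that thread is already gone, so nobody ever does.
    private final CountDownLatch commitResult = new CountDownLatch(1);
    // Lets the model start the "task main" thread only after the lock is held.
    private final CountDownLatch lockHeld = new CountDownLatch(1);

    volatile boolean commitCompleted;
    volatile boolean mainThreadWasBlocked;

    void run(long awaitMillis) {
        Thread checkpointing = new Thread(() -> {
            synchronized (checkpointLock) {                 // StreamTask.notifyCheckpointComplete
                lockHeld.countDown();
                try {
                    // KafkaProducer.commitTransaction -> TransactionalRequestResult.await;
                    // the real await has no timeout, so it would block forever.
                    commitCompleted = commitResult.await(awaitMillis, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }, "async-checkpoint");

        Thread taskMain = new Thread(() -> {
            synchronized (checkpointLock) {
                // StreamTask.invoke: release the operator chain outputs (no-op here)
            }
        }, "task-main");

        try {
            checkpointing.start();
            lockHeld.await();                  // the checkpointing thread now owns the lock
            taskMain.start();
            taskMain.join(awaitMillis / 2);    // it cannot get the monitor in the meantime
            mainThreadWasBlocked = taskMain.isAlive();
            checkpointing.join();              // returns only because of the model's timeout
            taskMain.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```

With new DeadlockModel().run(500), commitCompleted stays false and mainThreadWasBlocked is expected to be true: the main thread only gets the lock because the model's checkpointing thread gives up, which the real code never does.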

The proper fix belongs on the KafkaProducer side. However, the fix is currently
only available in Kafka 2.3.0, and there is no chance of it being backported to
0.11. Given that Flink 1.9 does not depend on Kafka 2.3.0, we probably still
need to fix it ourselves in Flink first.

I think the solution here is basically:
 # Ensure that closing the FlinkKafkaProducer is mutually exclusive with any
other thread's access to it.
 # After the FlinkKafkaProducer is closed, make no further calls on the
producer.
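The two rules can be sketched as a small guard around the producer. This is a hypothetical illustration, not the actual Flink change: TxProducer is a made-up two-method interface (not the real FlinkKafkaInternalProducer or KafkaProducer API), and GuardedProducer assumes a single lock around every producer call.

```java
// Hypothetical minimal interface; NOT the real FlinkKafkaInternalProducer or KafkaProducer API.
interface TxProducer {
    void commitTransaction();
    void close();
}

// Sketch of the two rules above, assuming a single lock around every producer call:
// (1) close() is mutually exclusive with all other calls, and
// (2) once closed, no call is forwarded to the underlying producer.
final class GuardedProducer implements TxProducer {
    private final TxProducer delegate;
    private final Object lock = new Object();
    private boolean closed; // guarded by lock

    GuardedProducer(TxProducer delegate) {
        this.delegate = delegate;
    }

    @Override
    public void commitTransaction() {
        synchronized (lock) {
            if (closed) {
                // Fail fast instead of waiting on a sender thread that no longer exists.
                throw new IllegalStateException("producer is already closed");
            }
            delegate.commitTransaction();
        }
    }

    @Override
    public void close() {
        synchronized (lock) {
            if (closed) {
                return; // closing twice is a no-op
            }
            closed = true;
            delegate.close();
        }
    }
}
```

Because commitTransaction() runs while holding the lock, close() waits for an in-flight commit to finish while the sender thread is still alive, instead of removing the sender thread from under it. Whether a call after close should throw or be silently skipped is a design choice; this sketch throws.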

> KafkaProducerExactlyOnceITCase.testMultipleSinkOperators fails on Travis
> ------------------------------------------------------------------------
>
>                 Key: FLINK-13226
>                 URL: https://issues.apache.org/jira/browse/FLINK-13226
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.9.0
>            Reporter: Till Rohrmann
>            Assignee: Jiangjie Qin
>            Priority: Critical
>              Labels: test-stability
>             Fix For: 1.9.0
>
>
> The {{KafkaProducerExactlyOnceITCase.testMultipleSinkOperators}} fails on
> Travis because it produces no output for 300 s.
> https://api.travis-ci.org/v3/job/557290235/log.txt



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)