[ https://issues.apache.org/jira/browse/FLINK-22085?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17372420#comment-17372420 ]

Yun Gao edited comment on FLINK-22085 at 7/2/21, 11:51 AM:
-----------------------------------------------------------

Hi all, I think I have found the reason: with flush timeout = 0, the flush happens 
only after a record is emitted and there is no standalone flush thread. Then, 
after failover:
 # The (source -> map -> map) task (parallelism = 8) starts; it first tries to 
restore the partition state, and after that it broadcasts EndOfChannelStateEvent, 
which blocks the channel. 
 # For the records subsequently emitted by the (source -> map -> map) tasks, 
notifyDataAvailable is not called since isBlocked = true.
 # After the (sink) task (parallelism = 1) has received all the 
EndOfChannelStateEvents, it resumes all the subpartitions. It then checks whether 
each subpartition is available and, if so, queues the corresponding local input 
channel.
 # However, if the (source -> map -> map) task has already emitted all the 1000 
records before step 3, those records are never notified: resuming sets isBlocked 
back to false, but when the availability of the subpartition is checked, 
isAvailable() returns false since flushRequested = false. So the data is never 
notified afterwards (see the sketch below).
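To make steps 2 - 4 concrete, here is a minimal, illustrative sketch of the 
subpartition behaviour (simplified stand-ins, not the actual Flink classes):
{code:java}
// Illustrative sketch only; a simplified stand-in for the pipelined subpartition,
// not the actual Flink implementation.
class SubpartitionSketch {

    private boolean isBlocked;       // set after broadcasting EndOfChannelStateEvent (step 1)
    private boolean flushRequested;  // the flag isAvailable() relies on (step 4)

    // With flush timeout = 0 the flush runs right after each record is emitted;
    // there is no standalone flush thread.
    void emitRecordAndFlush() {
        if (isBlocked) {
            // Step 2: while blocked, notifyDataAvailable() is not called and the
            // flush is not recorded.
            return;
        }
        flushRequested = true;
        notifyDataAvailable();
    }

    // Step 3: the sink resumes the subpartition once it has seen all
    // EndOfChannelStateEvents, then re-queues the channel only if isAvailable().
    void resumeConsumption() {
        isBlocked = false;
    }

    boolean isAvailable() {
        // Step 4: if all 1000 records were emitted while blocked, flushRequested
        // is still false here, so the buffered data is never notified afterwards.
        return !isBlocked && flushRequested;
    }

    private void notifyDataAvailable() {
        // wakes up the downstream local input channel
    }
}
{code}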

The bug can be reproduced locally by adding a sleep in 
UpstreamRecoveryTrackImpl#handleEndOfRecovery to delay step 3.
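The kind of change meant here is just an added delay; the method signature below 
is assumed and only the Thread.sleep matters:
{code:java}
// Sketch of the local reproduction: an artificial delay inside
// UpstreamRecoveryTrackImpl#handleEndOfRecovery (exact signature assumed).
public void handleEndOfRecovery(/* original parameters */) {
    try {
        // Give the upstream (source -> map -> map) tasks time to emit all
        // 1000 records while the subpartitions are still blocked (delays step 3).
        Thread.sleep(2000);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
    // ... original end-of-recovery handling (resuming the subpartitions) ...
}
{code}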



> KafkaSourceLegacyITCase hangs/fails on azure
> --------------------------------------------
>
>                 Key: FLINK-22085
>                 URL: https://issues.apache.org/jira/browse/FLINK-22085
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.13.0, 1.14.0
>            Reporter: Dawid Wysakowicz
>            Assignee: Yun Gao
>            Priority: Critical
>              Labels: pull-request-available, test-stability
>             Fix For: 1.14.0
>
>
> *1) Observations*
> a) The Azure pipeline would occasionally hang without printing any test error 
> information.
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=15939&view=logs&j=c5f0071e-1851-543e-9a45-9ac140befc32&t=1fb1a56f-e8b5-5a82-00a0-a2db7757b4f5&l=8219]
> b) By running the test KafkaSourceLegacyITCase::testBrokerFailure() with INFO 
> level logging, the test would hang with the following error message printed 
> repeatedly:
> {code:java}
> 20451 [New I/O boss #50] ERROR 
> org.apache.flink.networking.NetworkFailureHandler [] - Closing communication 
> channel because of an exception
> java.net.ConnectException: Connection refused: localhost/127.0.0.1:50073
>         at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) 
> ~[?:1.8.0_151]
>         at 
> sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717) 
> ~[?:1.8.0_151]
>         at 
> org.apache.flink.shaded.testutils.org.jboss.netty.channel.socket.nio.NioClientBoss.connect(NioClientBoss.java:152)
>  ~[flink-test-utils_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
>         at 
> org.apache.flink.shaded.testutils.org.jboss.netty.channel.socket.nio.NioClientBoss.processSelectedKeys(NioClientBoss.java:105)
>  [flink-test-utils_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
>         at 
> org.apache.flink.shaded.testutils.org.jboss.netty.channel.socket.nio.NioClientBoss.process(NioClientBoss.java:79)
>  [flink-test-utils_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
>         at 
> org.apache.flink.shaded.testutils.org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:337)
>  [flink-test-utils_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
>         at 
> org.apache.flink.shaded.testutils.org.jboss.netty.channel.socket.nio.NioClientBoss.run(NioClientBoss.java:42)
>  [flink-test-utils_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
>         at 
> org.apache.flink.shaded.testutils.org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
>  [flink-test-utils_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
>         at 
> org.apache.flink.shaded.testutils.org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
>  [flink-test-utils_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
>         at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  [?:1.8.0_151]
>         at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  [?:1.8.0_151]
>         at java.lang.Thread.run(Thread.java:748) [?:1.8.0_151]
> {code}
> *2) Root cause explanations*
> The test would hang because it enters the following loop:
>  - closeOnFlush() is called for a given channel
>  - closeOnFlush() calls channel.write(..)
>  - channel.write() triggers the exceptionCaught(...) callback
>  - closeOnFlush() is called for the same channel again.
> *3) Solution*
> Update closeOnFlush() so that, if a channel is already being closed by this 
> method, subsequent calls to closeOnFlush() on that channel do not try to write 
> to it again.
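> A minimal sketch of such a guard (illustrative; the field name and the exact 
> change to NetworkFailureHandler are assumptions, using the shaded Netty 3 API 
> seen in the stack trace above):
> {code:java}
> // Track channels already being closed by closeOnFlush() so that a failing
> // write -> exceptionCaught(...) -> closeOnFlush() cycle cannot loop forever.
> private final Set<Channel> channelsBeingClosed =
>         Collections.newSetFromMap(new ConcurrentHashMap<>());
>
> private void closeOnFlush(Channel channel) {
>     if (!channelsBeingClosed.add(channel)) {
>         // Already closing this channel: do not write to it again.
>         return;
>     }
>     if (channel.isConnected()) {
>         // Flush pending data, then close once the write completes (or fails).
>         channel.write(ChannelBuffers.EMPTY_BUFFER)
>                 .addListener(ChannelFutureListener.CLOSE);
>     } else {
>         channel.close();
>     }
> }
> {code}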


