[ 
https://issues.apache.org/jira/browse/FLINK-22085?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17371822#comment-17371822
 ] 

Yun Gao commented on FLINK-22085:
---------------------------------

Thanks [~xintongsong] for the review and for monitoring the tests. Afterwards I 
tried to repeat this test on Azure: 
[https://github.com/apache/flink/pull/16317], and it seems that the issue can 
be reproduced on Azure with some probability. From all these instances I found 
that:
 # The failing identity map did process the required number of records (namely 
1000) after failover.
 # The sink might lose all the records from some map tasks, but the number of 
map tasks whose records get lost is random. I have not yet seen a case where 
only a part of a map task's records gets lost.

Overall, the issue does not seem to be related to Kafka, but might be related 
to the local network stack. One possibly related point is that the test sets 
the buffer timeout to 0. I'll try to analyze the causes further.

> KafkaSourceLegacyITCase hangs/fails on azure
> --------------------------------------------
>
>                 Key: FLINK-22085
>                 URL: https://issues.apache.org/jira/browse/FLINK-22085
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.13.0, 1.14.0
>            Reporter: Dawid Wysakowicz
>            Assignee: Yun Gao
>            Priority: Critical
>              Labels: pull-request-available, test-stability
>             Fix For: 1.14.0
>
>
> 1) Observations
> a) The Azure pipeline would occasionally hang without printing any test error 
> information.
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=15939&view=logs&j=c5f0071e-1851-543e-9a45-9ac140befc32&t=1fb1a56f-e8b5-5a82-00a0-a2db7757b4f5&l=8219]
> b) When running the test KafkaSourceLegacyITCase::testBrokerFailure() with 
> INFO level logging, the test would hang with the following error message 
> printed repeatedly:
> {code:java}
> 20451 [New I/O boss #50] ERROR 
> org.apache.flink.networking.NetworkFailureHandler [] - Closing communication 
> channel because of an exception
> java.net.ConnectException: Connection refused: localhost/127.0.0.1:50073
>         at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) 
> ~[?:1.8.0_151]
>         at 
> sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717) 
> ~[?:1.8.0_151]
>         at 
> org.apache.flink.shaded.testutils.org.jboss.netty.channel.socket.nio.NioClientBoss.connect(NioClientBoss.java:152)
>  ~[flink-test-utils_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
>         at 
> org.apache.flink.shaded.testutils.org.jboss.netty.channel.socket.nio.NioClientBoss.processSelectedKeys(NioClientBoss.java:105)
>  [flink-test-utils_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
>         at 
> org.apache.flink.shaded.testutils.org.jboss.netty.channel.socket.nio.NioClientBoss.process(NioClientBoss.java:79)
>  [flink-test-utils_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
>         at 
> org.apache.flink.shaded.testutils.org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:337)
>  [flink-test-utils_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
>         at 
> org.apache.flink.shaded.testutils.org.jboss.netty.channel.socket.nio.NioClientBoss.run(NioClientBoss.java:42)
>  [flink-test-utils_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
>         at 
> org.apache.flink.shaded.testutils.org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
>  [flink-test-utils_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
>         at 
> org.apache.flink.shaded.testutils.org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
>  [flink-test-utils_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
>         at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  [?:1.8.0_151]
>         at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  [?:1.8.0_151]
>         at java.lang.Thread.run(Thread.java:748) [?:1.8.0_151]
> {code}
> *2) Root cause explanations*
> The test would hang because it enters the following loop:
>  - closeOnFlush() is called for a given channel
>  - closeOnFlush() calls channel.write(..)
>  - channel.write() triggers the exceptionCaught(...) callback
>  - closeOnFlush() is called for the same channel again.
> *3) Solution*
> Update closeOnFlush() so that, if a channel is being closed by this method, 
> then closeOnFlush() would not try to write to this channel if it is called on 
> this channel again.
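
The loop in 2) and the guard proposed in 3) can be sketched roughly as below. 
This is only a minimal illustration under stated assumptions, not the actual 
NetworkFailureHandler code: FakeChannel, its onException hook and the closing 
set are hypothetical stand-ins that imitate a channel whose write() 
synchronously triggers the exception callback. No Netty classes are used.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class CloseOnFlushSketch {

    /** Hypothetical stand-in for a network channel; not a Netty class. */
    static final class FakeChannel {
        int writeAttempts = 0;
        boolean closed = false;
        // Plays the role of the exceptionCaught(...) callback from the issue.
        Runnable onException = () -> {};

        void write() {
            writeAttempts++;
            // Simulate a broken connection: every write reports an exception.
            onException.run();
        }

        void close() {
            closed = true;
        }
    }

    // Channels that closeOnFlush() is already closing (assumed bookkeeping).
    private final Set<FakeChannel> closing = ConcurrentHashMap.newKeySet();

    void closeOnFlush(FakeChannel channel) {
        // add() returns false if the channel is already in the set, so a
        // re-entrant call returns here instead of writing to the channel again.
        if (!closing.add(channel)) {
            return;
        }
        channel.write();
        channel.close();
    }

    public static void main(String[] args) {
        CloseOnFlushSketch handler = new CloseOnFlushSketch();
        FakeChannel channel = new FakeChannel();
        // The exception callback calls closeOnFlush() on the same channel,
        // which is the step that closes the loop described in the issue.
        channel.onException = () -> handler.closeOnFlush(channel);

        handler.closeOnFlush(channel);
        System.out.println(
                "write attempts: " + channel.writeAttempts
                        + ", closed: " + channel.closed);
        // prints "write attempts: 1, closed: true"
    }
}
```

Without the closing check, the call chain closeOnFlush() -> write() -> 
exception callback -> closeOnFlush() would never terminate in this sketch. 
With the check, the second call returns immediately and the channel is closed 
after a single write attempt.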



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
