[
https://issues.apache.org/jira/browse/FLINK-22085?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17371822#comment-17371822
]
Yun Gao edited comment on FLINK-22085 at 6/30/21, 6:00 AM:
-----------------------------------------------------------
Thanks [~xintongsong] for the review and for monitoring the tests. Afterwards I
tried to repeat this test on Azure:
[https://github.com/apache/flink/pull/16317], and it seems that the issue can be
reproduced on Azure with some probability. From all these instances I found that:
# The failing identity map does process the required number of records (namely
1000) after failover.
# The sink might lose all the records from some map tasks, but the number of
map tasks whose records get lost is random. I have not yet seen a case where
only a part of the records from a single task gets lost.
Overall, the issue does not seem to be related to Kafka, but might be related
to the local network stack. One possibly related point is that the test sets
the buffer timeout to 0. I'll try to further analyze the causes.
> KafkaSourceLegacyITCase hangs/fails on azure
> --------------------------------------------
>
> Key: FLINK-22085
> URL: https://issues.apache.org/jira/browse/FLINK-22085
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka
> Affects Versions: 1.13.0, 1.14.0
> Reporter: Dawid Wysakowicz
> Assignee: Yun Gao
> Priority: Critical
> Labels: pull-request-available, test-stability
> Fix For: 1.14.0
>
>
> *1) Observations*
> a) The Azure pipeline would occasionally hang without printing any test error
> information.
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=15939&view=logs&j=c5f0071e-1851-543e-9a45-9ac140befc32&t=1fb1a56f-e8b5-5a82-00a0-a2db7757b4f5&l=8219]
> b) When running the test KafkaSourceLegacyITCase::testBrokerFailure() with
> INFO level logging, the test hangs and the following error message is
> printed repeatedly:
> {code:java}
> 20451 [New I/O boss #50] ERROR org.apache.flink.networking.NetworkFailureHandler [] - Closing communication channel because of an exception
> java.net.ConnectException: Connection refused: localhost/127.0.0.1:50073
>     at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) ~[?:1.8.0_151]
>     at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717) ~[?:1.8.0_151]
>     at org.apache.flink.shaded.testutils.org.jboss.netty.channel.socket.nio.NioClientBoss.connect(NioClientBoss.java:152) ~[flink-test-utils_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
>     at org.apache.flink.shaded.testutils.org.jboss.netty.channel.socket.nio.NioClientBoss.processSelectedKeys(NioClientBoss.java:105) [flink-test-utils_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
>     at org.apache.flink.shaded.testutils.org.jboss.netty.channel.socket.nio.NioClientBoss.process(NioClientBoss.java:79) [flink-test-utils_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
>     at org.apache.flink.shaded.testutils.org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:337) [flink-test-utils_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
>     at org.apache.flink.shaded.testutils.org.jboss.netty.channel.socket.nio.NioClientBoss.run(NioClientBoss.java:42) [flink-test-utils_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
>     at org.apache.flink.shaded.testutils.org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108) [flink-test-utils_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
>     at org.apache.flink.shaded.testutils.org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42) [flink-test-utils_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_151]
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_151]
>     at java.lang.Thread.run(Thread.java:748) [?:1.8.0_151]
> {code}
> *2) Root cause explanations*
> The test would hang because it enters the following loop:
> - closeOnFlush() is called for a given channel
> - closeOnFlush() calls channel.write(..)
> - channel.write() triggers the exceptionCaught(...) callback
> - closeOnFlush() is called for the same channel again.
> *3) Solution*
> Update closeOnFlush() so that, when it is called again for a channel that it
> is already closing, it does not write to that channel again. This breaks the
> loop described above.
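
The loop and the proposed guard can be illustrated with a small, self-contained
sketch. This is not the actual NetworkFailureHandler or Netty code: FakeChannel,
its fields, and the "closing" set are hypothetical stand-ins, and the sketch only
assumes that a write on the failing channel synchronously triggers the exception
callback, which in turn calls closeOnFlush() again.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class CloseOnFlushSketch {

    /** Hypothetical stand-in for a network channel; not the Netty API. */
    public static class FakeChannel {
        public int writes = 0;
        public boolean closed = false;
        // Models the exceptionCaught(...) callback fired by a failing write.
        public Runnable onException = () -> {};

        public void write() {
            writes++;
            if (writes > 100) {
                // Safety net so an unguarded version fails fast instead of looping.
                throw new IllegalStateException("write loop detected");
            }
            onException.run();
        }

        public void close() {
            closed = true;
        }
    }

    // Channels for which closeOnFlush() is currently in progress (assumed guard).
    private static final Set<FakeChannel> closing = ConcurrentHashMap.newKeySet();

    public static void closeOnFlush(FakeChannel ch) {
        // Skip if the channel is already closed or is being closed by an outer call.
        if (ch.closed || !closing.add(ch)) {
            return;
        }
        try {
            ch.write(); // may re-enter closeOnFlush() via onException
        } finally {
            ch.close();
            closing.remove(ch);
        }
    }

    public static void main(String[] args) {
        FakeChannel ch = new FakeChannel();
        ch.onException = () -> closeOnFlush(ch);
        closeOnFlush(ch);
        System.out.println("writes=" + ch.writes + ", closed=" + ch.closed);
    }
}
```

Without the guard in closeOnFlush(), the nested call would write again and
re-trigger the callback indefinitely, which matches the repeated error message
in the observations.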
--
This message was sent by Atlassian Jira
(v8.3.4#803005)