[
https://issues.apache.org/jira/browse/FLINK-22085?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17333085#comment-17333085
]
Dong Lin commented on FLINK-22085:
----------------------------------
[~dwysakowicz] I see. Previously the test would fail with
org.junit.runners.model.TestTimedOutException if it did not end within 60
seconds. I suppose the test is hanging now because we have removed the timeouts.
Now that we have verified that the test does indeed hang without a timeout,
I think we can add the timeout back to keep the test from hanging.
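To illustrate how a timeout turns a hang into a failure, here is a minimal stdlib-only sketch. It is not the JUnit implementation and not the Flink test code; the class name TimeoutSketch and the method finishesWithin are hypothetical names chosen for this example.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper: runs a task on a separate thread and gives up after a deadline.
class TimeoutSketch {

    /** Returns true if the task finished within the limit, false if it timed out. */
    static boolean finishesWithin(Runnable task, long timeoutMillis) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Future<?> future = executor.submit(task);
        try {
            // Wait for the task, but only up to the given limit.
            future.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            // The task is still running: interrupt it and report the timeout.
            future.cancel(true);
            return false;
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        Runnable hangingTask = () -> {
            try {
                Thread.sleep(60_000); // stands in for a test that never ends by itself
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        System.out.println("quick task finished: " + finishesWithin(() -> { }, 1_000));
        System.out.println("hanging task finished: " + finishesWithin(hangingTask, 100));
    }
}
```

A test runner that enforces a per-test timeout would report the second case as a failure instead of blocking the whole build.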
Regarding why the test could not end by itself: I have seen and investigated
this before, but did not find the root cause, as explained in the earlier comment
[1]. Take testMultipleSourcesOnePartition as an example: this test might not end
by itself if the ValidatingExactlyOnceSink in the pipeline does not receive
the expected number of messages, which means that some messages are lost in the
pipeline.
Previously I checked the FlinkKafkaConsumer code but could not come up with any
hypothesis for how a message could be lost without triggering an exception in this
class. But FLINK-21996 gives me a new hint, as it suggests that messages could be
lost in the pipeline when an AddSplitEvent is lost. I currently don't have
enough knowledge to prove or disprove this hypothesis and will need more time to
read the code.
It would be great if other Flink developers could contribute ideas or help
debug how a message could be lost in a Flink job.
[1]
https://issues.apache.org/jira/browse/FLINK-22085?focusedCommentId=17330815&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17330815
> KafkaSourceLegacyITCase hangs/fails on azure
> --------------------------------------------
>
> Key: FLINK-22085
> URL: https://issues.apache.org/jira/browse/FLINK-22085
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka
> Affects Versions: 1.13.0, 1.14.0
> Reporter: Dawid Wysakowicz
> Assignee: Dong Lin
> Priority: Critical
> Labels: pull-request-available, test-stability
> Fix For: 1.13.0
>
>
> 1) Observations
> a) The Azure pipeline would occasionally hang without printing any test error
> information.
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=15939&view=logs&j=c5f0071e-1851-543e-9a45-9ac140befc32&t=1fb1a56f-e8b5-5a82-00a0-a2db7757b4f5&l=8219]
> b) When running the test KafkaSourceLegacyITCase::testBrokerFailure() with INFO
> level logging, the test would hang with the following error message
> printed repeatedly:
> {code:java}
> 20451 [New I/O boss #50] ERROR org.apache.flink.networking.NetworkFailureHandler [] - Closing communication channel because of an exception
> java.net.ConnectException: Connection refused: localhost/127.0.0.1:50073
>     at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) ~[?:1.8.0_151]
>     at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717) ~[?:1.8.0_151]
>     at org.apache.flink.shaded.testutils.org.jboss.netty.channel.socket.nio.NioClientBoss.connect(NioClientBoss.java:152) ~[flink-test-utils_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
>     at org.apache.flink.shaded.testutils.org.jboss.netty.channel.socket.nio.NioClientBoss.processSelectedKeys(NioClientBoss.java:105) [flink-test-utils_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
>     at org.apache.flink.shaded.testutils.org.jboss.netty.channel.socket.nio.NioClientBoss.process(NioClientBoss.java:79) [flink-test-utils_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
>     at org.apache.flink.shaded.testutils.org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:337) [flink-test-utils_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
>     at org.apache.flink.shaded.testutils.org.jboss.netty.channel.socket.nio.NioClientBoss.run(NioClientBoss.java:42) [flink-test-utils_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
>     at org.apache.flink.shaded.testutils.org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108) [flink-test-utils_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
>     at org.apache.flink.shaded.testutils.org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42) [flink-test-utils_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_151]
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_151]
>     at java.lang.Thread.run(Thread.java:748) [?:1.8.0_151]
> {code}
> *2) Root cause explanations*
> The test would hang because it enters the following loop:
> - closeOnFlush() is called for a given channel
> - closeOnFlush() calls channel.write(..)
> - channel.write() triggers the exceptionCaught(...) callback
> - closeOnFlush() is called for the same channel again.
> *3) Solution*
> Update closeOnFlush() so that, if a channel is being closed by this method,
> then closeOnFlush() would not try to write to this channel if it is called on
> this channel again.
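
The loop in 2) and the guard in 3) can be sketched with a toy model. This is not the actual NetworkFailureHandler code and does not use Netty: FakeChannel, the closing set, and the synchronous callback are assumptions made for illustration (real Netty callbacks may be asynchronous). Only the names closeOnFlush and exceptionCaught come from the description above.

```java
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;

// Toy model of the re-entrancy guard; every type here is hypothetical.
class CloseOnFlushSketch {

    /** Stand-in for a network channel whose write always fails. */
    static class FakeChannel {
        int writeAttempts = 0;
        boolean closed = false;

        void write(CloseOnFlushSketch handler) {
            writeAttempts++;
            // A failing write triggers the exception callback, as in the loop described above.
            handler.exceptionCaught(this);
        }
    }

    // Channels currently being closed by closeOnFlush(), tracked by identity.
    // A real handler would also need to remove entries once a channel is fully closed.
    private final Set<FakeChannel> closing =
            Collections.newSetFromMap(new IdentityHashMap<>());

    void exceptionCaught(FakeChannel channel) {
        closeOnFlush(channel);
    }

    void closeOnFlush(FakeChannel channel) {
        if (!closing.add(channel)) {
            // Already being closed by this method: do not write again.
            return;
        }
        channel.write(this); // without the guard above, this would recurse endlessly
        channel.closed = true;
    }

    public static void main(String[] args) {
        CloseOnFlushSketch handler = new CloseOnFlushSketch();
        FakeChannel channel = new FakeChannel();
        handler.closeOnFlush(channel);
        System.out.println("writeAttempts=" + channel.writeAttempts
                + " closed=" + channel.closed);
    }
}
```

In this model, removing the `closing.add(...)` check makes closeOnFlush() call itself through exceptionCaught() until the stack overflows, which mirrors the repeated error message in the observations.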