[
https://issues.apache.org/jira/browse/FLINK-22085?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17367027#comment-17367027
]
Yun Gao commented on FLINK-22085:
---------------------------------
Hi [~lindong], thanks for the explanation, it indeed helps a lot. First, sorry
that I still have not found the root cause. I further checked the case, and the
rough process is:
1. A generator job starts up and generates 5 partitions, each with 1000 records.
2. A consumer job starts up with two tasks: the first is kafka source ->
failing identity map -> validate map with parallelism 8, and the second is the
validating exactly-once sink with parallelism 1.
3. Since there are only 5 partitions, 3 of the 8 source tasks would finish
immediately after startup.
4. One of the failing identity maps would trigger a failover in the middle.
5. After the restart, the job would re-run. Note that since no checkpoint was
taken in the first run due to the finished tasks, the job would run from
scratch after the failover. Similar to the first run, 3 of the 8 source tasks
would finish immediately.
6. The original design is that the sink task checks whether 5000 records have
been received; if so, it throws a SuccessException() to terminate the job.
The main problem here is that the debug thread prints that each of the 5 alive
failing identity maps has processed 1000 records, but the sink still does not
throw the exception. I could not see another possible execution path for now.
To make it clear how many records the sink has received, I first opened [a PR
to also add a debug thread to print the sink task|https://github.com/apache/flink/pull/16233].
Could someone have a look at this PR first? Many thanks~
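For reference, the termination check described in step 6 can be sketched as
below. This is a minimal, self-contained illustration of the counting logic
only; the class and method names are hypothetical and do not match the actual
test code, which extends Flink's sink interfaces and throws SuccessException
from flink-test-utils.

```java
import java.util.HashSet;
import java.util.Set;

// Illustrative stand-in for the validating exactly-once sink described above:
// it deduplicates record ids and signals success once all expected records
// (5 partitions x 1000 records = 5000) have been seen exactly once.
class ValidatingSinkSketch {
    static final int EXPECTED_TOTAL = 5000;

    private final Set<Integer> seen = new HashSet<>();

    /** Returns true once all 5000 distinct records have arrived. */
    boolean invoke(int recordId) {
        if (!seen.add(recordId)) {
            // A repeated id would violate the exactly-once guarantee.
            throw new IllegalStateException("Duplicate record: " + recordId);
        }
        // In the real test, a SuccessException is thrown here to end the job;
        // the hang means this condition is never reached after the failover.
        return seen.size() == EXPECTED_TOTAL;
    }
}
```

Under this sketch, if any source subtask replays fewer records than expected
after the failover, the counter never reaches 5000 and the job never
terminates, which matches the observed hang.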
> KafkaSourceLegacyITCase hangs/fails on azure
> --------------------------------------------
>
> Key: FLINK-22085
> URL: https://issues.apache.org/jira/browse/FLINK-22085
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka
> Affects Versions: 1.13.0, 1.14.0
> Reporter: Dawid Wysakowicz
> Assignee: Yun Gao
> Priority: Critical
> Labels: pull-request-available, test-stability
> Fix For: 1.14.0
>
>
> *1) Observations*
> a) The Azure pipeline would occasionally hang without printing any test error
> information.
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=15939&view=logs&j=c5f0071e-1851-543e-9a45-9ac140befc32&t=1fb1a56f-e8b5-5a82-00a0-a2db7757b4f5&l=8219]
> b) By running the test KafkaSourceLegacyITCase::testBrokerFailure() with INFO
> level logging, the test would hang with the following error message printed
> repeatedly:
> {code:java}
> 20451 [New I/O boss #50] ERROR org.apache.flink.networking.NetworkFailureHandler [] - Closing communication channel because of an exception
> java.net.ConnectException: Connection refused: localhost/127.0.0.1:50073
> 	at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) ~[?:1.8.0_151]
> 	at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717) ~[?:1.8.0_151]
> 	at org.apache.flink.shaded.testutils.org.jboss.netty.channel.socket.nio.NioClientBoss.connect(NioClientBoss.java:152) ~[flink-test-utils_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
> 	at org.apache.flink.shaded.testutils.org.jboss.netty.channel.socket.nio.NioClientBoss.processSelectedKeys(NioClientBoss.java:105) [flink-test-utils_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
> 	at org.apache.flink.shaded.testutils.org.jboss.netty.channel.socket.nio.NioClientBoss.process(NioClientBoss.java:79) [flink-test-utils_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
> 	at org.apache.flink.shaded.testutils.org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:337) [flink-test-utils_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
> 	at org.apache.flink.shaded.testutils.org.jboss.netty.channel.socket.nio.NioClientBoss.run(NioClientBoss.java:42) [flink-test-utils_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
> 	at org.apache.flink.shaded.testutils.org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108) [flink-test-utils_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
> 	at org.apache.flink.shaded.testutils.org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42) [flink-test-utils_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_151]
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_151]
> 	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_151]
> {code}
> *2) Root cause explanations*
> The test would hang because it enters the following loop:
> - closeOnFlush() is called for a given channel
> - closeOnFlush() calls channel.write(..)
> - channel.write() triggers the exceptionCaught(...) callback
> - closeOnFlush() is called for the same channel again.
> *3) Solution*
> Update closeOnFlush() so that, once a channel is being closed by this method,
> any re-entrant call of closeOnFlush() on the same channel skips the write,
> which breaks the loop above.
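The re-entrancy guard described in the solution can be sketched as follows.
This is only an illustration under the assumption that channels being closed
are tracked in a set; the names (CloseOnFlushGuard, the Channel interface, the
flush marker) are hypothetical, while the real fix lives in
NetworkFailureHandler in flink-test-utils.

```java
import java.util.Collections;
import java.util.Set;
import java.util.WeakHashMap;

// Sketch of the re-entrancy guard: remember which channels are already being
// closed so that a failing write inside closeOnFlush() cannot trigger another
// write on the same channel via the exceptionCaught() callback.
class CloseOnFlushGuard {
    interface Channel {
        void write(Object msg); // may fail and re-enter closeOnFlush()
        void close();
    }

    // Weak keys so tracked channels can still be garbage-collected.
    private final Set<Channel> closing =
            Collections.newSetFromMap(new WeakHashMap<>());

    void closeOnFlush(Channel channel) {
        if (!closing.add(channel)) {
            // Re-entrant call for a channel already being closed:
            // skip the write, breaking the
            // closeOnFlush -> write -> exceptionCaught -> closeOnFlush loop.
            channel.close();
            return;
        }
        channel.write("flush-marker"); // flush pending data before closing
        channel.close();
    }
}
```

The key point is that the write is attempted at most once per channel; any
exception raised by that write can still invoke the callback, but the second
closeOnFlush() call then goes straight to close().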
--
This message was sent by Atlassian Jira
(v8.3.4#803005)