[
https://issues.apache.org/jira/browse/FLINK-18150?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17152630#comment-17152630
]
Julius Michaelis edited comment on FLINK-18150 at 7/7/20, 10:22 AM:
--------------------------------------------------------------------
That's reassuring to hear. It's my first time poking around in the Kafka code
and I'm a bit out of my depth.
Bearing that in mind, I've found another odd symptom: if you turn on trace
logging for a few of the Kafka classes with Flink 1.11:
{code:yaml}
taskmanager1: &taskmanager
  # ...
  entrypoint: |
    bash -c "
    echo >>/opt/flink/conf/flink-conf.yaml env.java.opts: -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005
    echo >>/opt/flink/conf/log4j-console.properties logger.my1.name = org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction
    echo >>/opt/flink/conf/log4j-console.properties logger.my1.level = WARN
    echo >>/opt/flink/conf/log4j-console.properties logger.my2.name = org.apache.kafka.clients.NetworkClient
    echo >>/opt/flink/conf/log4j-console.properties logger.my2.level = TRACE
    echo >>/opt/flink/conf/log4j-console.properties logger.my3.name = org.apache.kafka.clients.consumer.internals.Fetcher
    echo >>/opt/flink/conf/log4j-console.properties logger.my3.level = TRACE
    sed -ri 's/appender.console.layout.pattern = /&%t /' /opt/flink/conf/log4j-console.properties
    exec /docker-entrypoint.sh taskmanager
    "
{code}
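(For reference, those echo lines just append the following loggers to {{log4j-console.properties}}, plus the jdwp agent option to {{flink-conf.yaml}}; the sed adds a thread-name column to the console pattern.)
{code:none}
logger.my1.name = org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction
logger.my1.level = WARN
logger.my2.name = org.apache.kafka.clients.NetworkClient
logger.my2.level = TRACE
logger.my3.name = org.apache.kafka.clients.consumer.internals.Fetcher
logger.my3.level = TRACE
{code}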
Then you only get log lines like
{code:none}
2020-07-07 08:12:47,186 TRACE org.apache.kafka.clients.NetworkClient [] - [Consumer clientId=consumer-joebs-49, groupId=joebs] Found least loaded connecting node kafka1:9091 (id: -1 rack: null)
{code}
i.e. it seems like it's not even trying to connect to the healthy broker, only
the dead one. (With Flink 1.10 / Kafka Client 2.2.0, it at least seems to try
both.)
I've tried to get {{NetworkClient}} to pronounce the dead node dead for a
longer time by setting {{reconnect.backoff(.max).ms}} so it won't turn up in
the Connectable/Connecting list, to no avail.
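For completeness, this is roughly how I passed those settings (just a sketch; the backoff values and the topic/broker names are placeholders, not the exact ones from the reproduction repo):
{code:java}
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

Properties props = new Properties();
// kafka1 is the broker that gets killed; the second broker stays healthy.
props.setProperty("bootstrap.servers", "kafka1:9091,kafka2:9092");
props.setProperty("group.id", "joebs");
// Attempt to keep the dead node out of the connectable list for longer:
props.setProperty("reconnect.backoff.ms", "10000");
props.setProperty("reconnect.backoff.max.ms", "60000");

FlinkKafkaConsumer<String> consumer =
    new FlinkKafkaConsumer<>("some-topic", new SimpleStringSchema(), props);
{code}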
> A single failing Kafka broker may cause jobs to fail indefinitely with
> TimeoutException: Timeout expired while fetching topic metadata
> --------------------------------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-18150
> URL: https://issues.apache.org/jira/browse/FLINK-18150
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka
> Affects Versions: 1.10.1
> Environment: It is a bit unclear to me under what circumstances this
> can be reproduced. I created a "minimum" non-working example at
> https://github.com/jcaesar/flink-kafka-ha-failure. Note that this deploys the
> minimum number of Kafka brokers, but the issue reproduces just as well with,
> e.g., a replication factor of 3 and 8 brokers.
> I run this with
> {code:bash}
> docker-compose kill && docker-compose rm -vf && docker-compose up --abort-on-container-exit --build
> {code}
> The exception should appear on the web UI after 5~6 minutes.
> To make sure that this isn't dependent on my machine, I've also checked
> reproducibility on an m5a.2xlarge EC2 instance.
> You can verify that the Kafka cluster is running "normally", e.g. with:
> {code:bash}
> kafkacat -b localhost,localhost:9093 -L
> {code}
> So far, I only know that (a rough consumer sketch matching these conditions follows this list):
> * {{flink.partition-discovery.interval-millis}} must be set.
> * The broker that fails must be part of {{bootstrap.servers}}.
> * There needs to be a certain number of topics or producers, but I'm unsure
> which is crucial.
> * Changing the values of {{metadata.request.timeout.ms}} or
> {{flink.partition-discovery.interval-millis}} does not seem to have any
> effect.
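> For reference, a minimal sketch of a consumer setup that matches the conditions above (not the actual code from the linked repo; topic, group, and broker names are placeholders):
> {code:java}
> import java.util.Properties;
>
> import org.apache.flink.api.common.serialization.SimpleStringSchema;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
>
> Properties props = new Properties();
> // The broker that is killed later must be listed here:
> props.setProperty("bootstrap.servers", "kafka1:9091,kafka2:9092");
> props.setProperty("group.id", "joebs");
> // Partition discovery must be enabled for the failure loop to appear:
> props.setProperty("flink.partition-discovery.interval-millis", "10000");
>
> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> env.addSource(new FlinkKafkaConsumer<>("some-topic", new SimpleStringSchema(), props)).print();
> env.execute();
> {code}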
> Reporter: Julius Michaelis
> Assignee: Aljoscha Krettek
> Priority: Major
>
> When a Kafka broker fails that is listed among the bootstrap servers and
> partition discovery is active, the Flink job reading from that Kafka may
> enter a failing loop.
> At first, the job seems to react normally, with only a short latency spike
> when Kafka switches leaders.
> Then, it fails with a
> {code:none}
> org.apache.flink.streaming.connectors.kafka.internal.Handover$ClosedException
>     at org.apache.flink.streaming.connectors.kafka.internal.Handover.close(Handover.java:182)
>     at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.cancel(KafkaFetcher.java:175)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.cancel(FlinkKafkaConsumerBase.java:821)
>     at org.apache.flink.streaming.api.operators.StreamSource.cancel(StreamSource.java:147)
>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.cancelTask(SourceStreamTask.java:136)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.cancel(StreamTask.java:602)
>     at org.apache.flink.runtime.taskmanager.Task$TaskCanceler.run(Task.java:1355)
>     at java.lang.Thread.run(Thread.java:748)
> {code}
> It recovers, but processes fewer than the expected number of records.
> Finally, the job fails with
> {code:none}
> 2020-06-05 13:59:37
> org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata
> {code}
> and repeats doing so while not processing any records. (The exception comes
> without any backtrace or other interesting information.)
> I have also observed this behavior with partition-discovery turned off, but
> only when the Flink job failed (after a broker failure) and had to run
> checkpoint recovery for some other reason.
> Please see the [Environment] description for information on how to reproduce
> the issue.