[ https://issues.apache.org/jira/browse/FLINK-18150?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17152808#comment-17152808 ]
Aljoscha Krettek commented on FLINK-18150:
------------------------------------------
This is even easier to reproduce with this:
{code:java}
package org.apache.flink.streaming.connectors.kafka;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Properties;

/**
 * The list of brokers should contain a broker that doesn't exist. Sometimes
 * this example will succeed, sometimes it hangs indefinitely. This depends on
 * which broker from the bootstrap servers the client picks.
 */
public class FetchFromMissingBroker {

    private static final Logger LOG = LoggerFactory.getLogger(FetchFromMissingBroker.class);

    public static void main(String[] args) {
        LOG.info("Starting...");

        Properties kafkaProperties = new Properties();
        kafkaProperties.setProperty("bootstrap.servers", "localhost:9092,localhost:9093");
        kafkaProperties.setProperty("group.id", "joebs");
        kafkaProperties.setProperty("key.deserializer", StringDeserializer.class.getCanonicalName());
        kafkaProperties.setProperty("value.deserializer", StringDeserializer.class.getCanonicalName());

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaProperties);

        // Blocks while fetching cluster metadata; hangs when the client keeps
        // retrying the missing broker.
        consumer.listTopics();
    }
}
{code}
With trace logging enabled on {{org.apache.kafka.clients.NetworkClient}} (see the snippet after this list) you can see what happens in the different cases:
# good case: the client picks {{localhost:9093}} -> all good
# semi-good case: the client picks {{localhost:9092}}, then tries again with {{localhost:9093}} -> all good
# bad case: the client picks {{localhost:9092}} and then tries with that again after a timeout -> bad
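For reference, trace logging for the network client can be enabled with a single logger entry. This is a sketch assuming a log4j 1.x backend behind SLF4J; the exact syntax depends on which logging backend is on the classpath:
{code:none}
log4j.logger.org.apache.kafka.clients.NetworkClient=TRACE
{code}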
This is an interesting excerpt from a log where the client repeatedly picks the missing broker before finally trying the good one:
{code:none}
991 [main] DEBUG org.apache.kafka.clients.consumer.KafkaConsumer [] - [Consumer clientId=consumer-joebs-1, groupId=joebs] Initializing the Kafka consumer
1111 [main] INFO org.apache.kafka.common.utils.AppInfoParser [] - Kafka version: 2.4.1
1111 [main] INFO org.apache.kafka.common.utils.AppInfoParser [] - Kafka commitId: c57222ae8cd7866b
1111 [main] INFO org.apache.kafka.common.utils.AppInfoParser [] - Kafka startTimeMs: 1594132735452
1112 [main] DEBUG org.apache.kafka.clients.consumer.KafkaConsumer [] - [Consumer clientId=consumer-joebs-1, groupId=joebs] Kafka consumer initialized
1334 [main] TRACE org.apache.kafka.clients.NetworkClient [] - [Consumer clientId=consumer-joebs-1, groupId=joebs] Found least loaded node localhost:9092 (id: -1 rack: null) with no active connection
1338 [main] DEBUG org.apache.kafka.clients.NetworkClient [] - [Consumer clientId=consumer-joebs-1, groupId=joebs] Initiating connection to node localhost:9092 (id: -1 rack: null) using address localhost/127.0.0.1
1358 [main] TRACE org.apache.kafka.clients.NetworkClient [] - [Consumer clientId=consumer-joebs-1, groupId=joebs] Found least loaded connecting node localhost:9092 (id: -1 rack: null)
1364 [main] DEBUG org.apache.kafka.common.network.Selector [] - [Consumer clientId=consumer-joebs-1, groupId=joebs] Created socket with SO_RCVBUF = 342972, SO_SNDBUF = 146988, SO_TIMEOUT = 0 to node -1
1365 [main] DEBUG org.apache.kafka.clients.NetworkClient [] - [Consumer clientId=consumer-joebs-1, groupId=joebs] Completed connection to node -1. Fetching API versions.
1365 [main] DEBUG org.apache.kafka.clients.NetworkClient [] - [Consumer clientId=consumer-joebs-1, groupId=joebs] Initiating API versions fetch from node -1.
1365 [main] TRACE org.apache.kafka.clients.NetworkClient [] - [Consumer clientId=consumer-joebs-1, groupId=joebs] No version information found when sending API_VERSIONS with correlation id 1 to node -1. Assuming version 3.
1426 [main] TRACE org.apache.kafka.clients.NetworkClient [] - [Consumer clientId=consumer-joebs-1, groupId=joebs] Sending API_VERSIONS {client_software_name=apache-kafka-java,client_software_version=2.4.1,_tagged_fields={}} with correlation id 1 to node -1
1429 [main] TRACE org.apache.kafka.clients.NetworkClient [] - [Consumer clientId=consumer-joebs-1, groupId=joebs] Found least loaded connecting node localhost:9092 (id: -1 rack: null)
1434 [main] TRACE org.apache.kafka.clients.NetworkClient [] - [Consumer clientId=consumer-joebs-1, groupId=joebs] Found least loaded connecting node localhost:9092 (id: -1 rack: null)
4434 [main] TRACE org.apache.kafka.clients.NetworkClient [] - [Consumer clientId=consumer-joebs-1, groupId=joebs] Found least loaded connecting node localhost:9092 (id: -1 rack: null)
7438 [main] TRACE org.apache.kafka.clients.NetworkClient [] - [Consumer clientId=consumer-joebs-1, groupId=joebs] Found least loaded connecting node localhost:9092 (id: -1 rack: null)
10442 [main] TRACE org.apache.kafka.clients.NetworkClient [] - [Consumer clientId=consumer-joebs-1, groupId=joebs] Found least loaded connecting node localhost:9092 (id: -1 rack: null)
13445 [main] TRACE org.apache.kafka.clients.NetworkClient [] - [Consumer clientId=consumer-joebs-1, groupId=joebs] Found least loaded connecting node localhost:9092 (id: -1 rack: null)
16449 [main] TRACE org.apache.kafka.clients.NetworkClient [] - [Consumer clientId=consumer-joebs-1, groupId=joebs] Found least loaded connecting node localhost:9092 (id: -1 rack: null)
19452 [main] TRACE org.apache.kafka.clients.NetworkClient [] - [Consumer clientId=consumer-joebs-1, groupId=joebs] Found least loaded connecting node localhost:9092 (id: -1 rack: null)
22454 [main] TRACE org.apache.kafka.clients.NetworkClient [] - [Consumer clientId=consumer-joebs-1, groupId=joebs] Found least loaded connecting node localhost:9092 (id: -1 rack: null)
25455 [main] TRACE org.apache.kafka.clients.NetworkClient [] - [Consumer clientId=consumer-joebs-1, groupId=joebs] Found least loaded connecting node localhost:9092 (id: -1 rack: null)
28458 [main] TRACE org.apache.kafka.clients.NetworkClient [] - [Consumer clientId=consumer-joebs-1, groupId=joebs] Found least loaded connecting node localhost:9092 (id: -1 rack: null)
31462 [main] DEBUG org.apache.kafka.clients.NetworkClient [] - [Consumer clientId=consumer-joebs-1, groupId=joebs] Disconnecting from node -1 due to request timeout.
31465 [main] TRACE org.apache.kafka.clients.NetworkClient [] - [Consumer clientId=consumer-joebs-1, groupId=joebs] Cancelled request API_VERSIONS {client_software_name=apache-kafka-java,client_software_version=2.4.1,_tagged_fields={}} with correlation id 1 due to node -1 being disconnected
31465 [main] WARN org.apache.kafka.clients.NetworkClient [] - [Consumer clientId=consumer-joebs-1, groupId=joebs] Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected
31469 [main] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient [] - [Consumer clientId=consumer-joebs-1, groupId=joebs] Cancelled request with header RequestHeader(apiKey=METADATA, apiVersion=9, clientId=consumer-joebs-1, correlationId=0) due to node -1 being disconnected
31572 [main] TRACE org.apache.kafka.clients.NetworkClient [] - [Consumer clientId=consumer-joebs-1, groupId=joebs] Found least loaded node localhost:9093 (id: -2 rack: null) with no active connection
31573 [main] DEBUG org.apache.kafka.clients.NetworkClient [] - [Consumer clientId=consumer-joebs-1, groupId=joebs] Initiating connection to node localhost:9093 (id: -2 rack: null) using address localhost/127.0.0.1
31574 [main] TRACE org.apache.kafka.clients.NetworkClient [] - [Consumer clientId=consumer-joebs-1, groupId=joebs] Found least loaded connecting node localhost:9093 (id: -2 rack: null)
31576 [main] DEBUG org.apache.kafka.common.network.Selector [] - [Consumer clientId=consumer-joebs-1, groupId=joebs] Created socket with SO_RCVBUF = 342972, SO_SNDBUF = 146988, SO_TIMEOUT = 0 to node -2
31577 [main] DEBUG org.apache.kafka.clients.NetworkClient [] - [Consumer clientId=consumer-joebs-1, groupId=joebs] Completed connection to node -2. Fetching API versions.
31577 [main] DEBUG org.apache.kafka.clients.NetworkClient [] - [Consumer clientId=consumer-joebs-1, groupId=joebs] Initiating API versions fetch from node -2.
31577 [main] TRACE org.apache.kafka.clients.NetworkClient [] - [Consumer clientId=consumer-joebs-1, groupId=joebs] No version information found when sending API_VERSIONS with correlation id 3 to node -2. Assuming version 3.
31577 [main] TRACE org.apache.kafka.clients.NetworkClient [] - [Consumer clientId=consumer-joebs-1, groupId=joebs] Sending API_VERSIONS {client_software_name=apache-kafka-java,client_software_version=2.4.1,_tagged_fields={}} with correlation id 3 to node -2
31577 [main] TRACE org.apache.kafka.clients.NetworkClient [] - [Consumer clientId=consumer-joebs-1, groupId=joebs] Found least loaded connecting node localhost:9093 (id: -2 rack: null)
31578 [main] TRACE org.apache.kafka.clients.NetworkClient [] - [Consumer clientId=consumer-joebs-1, groupId=joebs] Found least loaded connecting node localhost:9093 (id: -2 rack: null)
{code}
The reason why this problem becomes more prevalent in Flink 1.11 is here:
[https://github.com/apache/kafka/blob/2.4/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L684],
which was introduced in Kafka 2.3.x via
https://issues.apache.org/jira/browse/KAFKA-8376. The problem with this is that
the "broken" node {{localhost:9092}} is listed as a connecting node and the
logic prefers it as the "least loaded node". Flink 1.11.x ships with this
"fix", while Flink 1.10.x, which uses Kafka 2.2.x, does not have it.
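To make the selection behaviour concrete, here is a simplified, self-contained sketch of that preference order. All names are illustrative; this is not the actual Kafka source:
{code:java}
import java.util.Arrays;
import java.util.List;

public class LeastLoadedNodeSketch {

    enum ConnectionState { READY, CONNECTING, DISCONNECTED }

    static class Node {
        final String hostPort;
        final ConnectionState state;

        Node(String hostPort, ConnectionState state) {
            this.hostPort = hostPort;
            this.state = state;
        }
    }

    static Node leastLoadedNode(List<Node> nodes) {
        Node foundConnecting = null;
        Node foundCanConnect = null;
        for (Node node : nodes) {
            if (node.state == ConnectionState.READY) {
                return node; // an established, ready connection always wins
            } else if (node.state == ConnectionState.CONNECTING) {
                foundConnecting = node; // e.g. the dead localhost:9092
            } else {
                foundCanConnect = node; // e.g. the healthy localhost:9093
            }
        }
        // A node that is still connecting is preferred over opening a fresh
        // connection, so the dead broker keeps winning until its connection
        // attempt finally times out.
        return foundConnecting != null ? foundConnecting : foundCanConnect;
    }

    public static void main(String[] args) {
        List<Node> nodes = Arrays.asList(
                new Node("localhost:9092", ConnectionState.CONNECTING),
                new Node("localhost:9093", ConnectionState.DISCONNECTED));
        // Prints localhost:9092, the broker that will never answer.
        System.out.println(leastLoadedNode(nodes).hostPort);
    }
}
{code}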
Also, as you found out already, there are two levels of retry. The first one is
here:
[https://github.com/apache/kafka/blob/2.4/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L368],
where the polling continuously picks our broken broker. When that polling
finally fails, we send a completely new request in the next loop iteration,
which may or may not pick the good broker.
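A rough, runnable sketch of how the two retry levels interact. Again, the method names and timeouts here are assumptions for illustration, not the actual Fetcher internals:
{code:java}
public class TwoLevelRetrySketch {

    public static void main(String[] args) throws InterruptedException {
        long deadline = System.currentTimeMillis() + 10_000; // overall metadata timeout
        // Outer level: each iteration sends a completely new request.
        while (System.currentTimeMillis() < deadline) {
            String node = pickLeastLoadedNode(); // may (again) pick the dead broker
            long requestDeadline = System.currentTimeMillis() + 3_000; // per-request timeout
            boolean done = false;
            // Inner level: polling keeps driving the node picked above.
            while (!done && System.currentTimeMillis() < requestDeadline) {
                done = poll(node);
            }
            if (done) {
                System.out.println("metadata fetched from " + node);
                return;
            }
            // The request timed out; the next outer iteration re-runs the node
            // selection, which may or may not land on the healthy broker.
        }
        throw new RuntimeException("Timeout expired while fetching topic metadata");
    }

    static String pickLeastLoadedNode() {
        return "localhost:9092"; // stand-in for the selection logic sketched above
    }

    static boolean poll(String node) throws InterruptedException {
        Thread.sleep(100);
        return false; // the dead broker never responds
    }
}
{code}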
> A single failing Kafka broker may cause jobs to fail indefinitely with
> TimeoutException: Timeout expired while fetching topic metadata
> --------------------------------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-18150
> URL: https://issues.apache.org/jira/browse/FLINK-18150
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka
> Affects Versions: 1.10.1
> Environment: It is a bit unclear to me under what circumstances this
> can be reproduced. I created a "minimum" non-working example at
> https://github.com/jcaesar/flink-kafka-ha-failure. Note that this deploys the
> minimum number of Kafka brokers, but it works just as well with, e.g.,
> replication factor 3 and 8 brokers.
> I run this with
> {code:bash}
> docker-compose kill; and docker-compose rm -vf; and docker-compose up --abort-on-container-exit --build
> {code}
> The exception should appear on the web UI after 5~6 minutes.
> To make sure that this isn't dependent on my machine, I've also checked
> reproducibility on an m5a.2xlarge EC2 instance.
> You can verify that the Kafka cluster is running "normally", e.g. with:
> {code:bash}
> kafkacat -b localhost,localhost:9093 -L
> {code}
> So far, I only know that
> * {{flink.partition-discovery.interval-millis}} must be set.
> * The broker that failed must be part of the {{bootstrap.servers}}.
> * There needs to be a certain number of topics or producers, but I'm unsure
> which is crucial.
> * Changing the values of {{metadata.request.timeout.ms}} or
> {{flink.partition-discovery.interval-millis}} does not seem to have any
> effect.
> Reporter: Julius Michaelis
> Assignee: Aljoscha Krettek
> Priority: Major
>
> When a Kafka broker fails that is listed among the bootstrap servers and
> partition discovery is active, the Flink job reading from that Kafka may
> enter a failing loop.
> At first, the job seems to react normally, with only a short latency spike
> when switching Kafka leaders.
> Then, it fails with a
> {code:none}
> org.apache.flink.streaming.connectors.kafka.internal.Handover$ClosedException
>     at org.apache.flink.streaming.connectors.kafka.internal.Handover.close(Handover.java:182)
>     at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.cancel(KafkaFetcher.java:175)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.cancel(FlinkKafkaConsumerBase.java:821)
>     at org.apache.flink.streaming.api.operators.StreamSource.cancel(StreamSource.java:147)
>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.cancelTask(SourceStreamTask.java:136)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.cancel(StreamTask.java:602)
>     at org.apache.flink.runtime.taskmanager.Task$TaskCanceler.run(Task.java:1355)
>     at java.lang.Thread.run(Thread.java:748)
> {code}
> It recovers, but processes fewer than the expected number of records.
> Finally, the job fails with
> {code:none}
> 2020-06-05 13:59:37
> org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata
> {code}
> and keeps doing so while not processing any records. (The exception comes
> without any backtrace or other interesting information.)
> I have also observed this behavior with partition-discovery turned off, but
> only when the Flink job failed (after a broker failure) and had to run
> checkpoint recovery for some other reason.
> Please see the [Environment] description for information on how to reproduce
> the issue.