[
https://issues.apache.org/jira/browse/FLINK-17327?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Jun Qin updated FLINK-17327:
----------------------------
Description:
Steps to reproduce:
# Start a Flink 1.10 standalone cluster
# Run a Flink job which reads from one Kafka topic and writes to another
topic, with exactly-once checkpointing enabled
# Stop all Kafka Brokers after a few successful checkpoints
When Kafka brokers are down:
# {{org.apache.kafka.clients.NetworkClient}} reported connection to broker
could not be established
# Then, Flink could not complete snapshot due to {{Timeout expired while
initializing transactional state in 60000ms}}
# After several snapshot failures, Flink reported {{Too many ongoing
snapshots. Increase kafka producers pool size or decrease number of concurrent
checkpoints.}}
# Eventually, Flink tried to cancel the task which did not succeed within 3
min. According to logs, consumer was cancelled, but producer is still running
# Then {{Fatal error occurred while executing the TaskManager. Shutting it
down...}}
I will attach the logs to show the details. Worth to note that if there would
be no consumer but producer only in the task, the behavior is different:
# {{org.apache.kafka.clients.NetworkClient}} reported connection to broker
could not be established
# after {{delivery.timeout.ms}} (2min by default), producer reports:
{{FlinkKafkaException: Failed to send data to Kafka: Expiring 4 record(s) for
output-topic-0:120001 ms has passed since batch creation}}
# Flink tried to cancel the upstream tasks and created a new producer
# The new producer obviously reported connectivity issue to brokers
# This continues till Kafka brokers are back.
# Flink reported {{Too many ongoing snapshots. Increase kafka producers pool
size or decrease number of concurrent checkpoints.}}
# Flink cancelled the tasks and restarted them
# The job continues, and new checkpoint succeeded.
# TM runs all the time in this scenario
I set Kafka transaction time out to 1 hour just to avoid transaction timeout
during the test.
To get a producer only task, I called {{env.disableOperatorChaining();}} in the
second scenario.
was:
Steps to reproduce:
# Start a Flink 1.10 standalone cluster
# Run a Flink job which reads from one Kafka topic and writes to another
topic, with exactly-once checkpointing enabled
# Stop all Kafka Brokers after a few successful checkpoints
When Kafka brokers are down:
# {{org.apache.kafka.clients.NetworkClient}} reported connection to broker
could not be established
# Then, Flink could not complete snapshot due to {{Timeout expired while
initializing transactional state in 60000ms}}
# After several snapshot failures, Flink reported {{Too many ongoing
snapshots. Increase kafka producers pool size or decrease number of concurrent
checkpoints.}}
# Eventually, Flink tried to cancel the task which did not succeed within 3 min
# Then {{Fatal error occurred while executing the TaskManager. Shutting it
down...}}
I will attach the logs to show the details. Worth to note that if there would
be no consumer but producer only in the task, the behavior is different:
# {{org.apache.kafka.clients.NetworkClient}} reported connection to broker
could not be established
# after {{delivery.timeout.ms}} (2min by default), producer reports:
{{FlinkKafkaException: Failed to send data to Kafka: Expiring 4 record(s) for
output-topic-0:120001 ms has passed since batch creation}}
# Flink tried to cancel the upstream tasks and created a new producer
# The new producer obviously reported connectivity issue to brokers
# This continues till Kafka brokers are back.
# Flink reported {{Too many ongoing snapshots. Increase kafka producers pool
size or decrease number of concurrent checkpoints.}}
# Flink cancelled the tasks and restarted them
# The job continues, and new checkpoint succeeded.
# TM runs all the time in this scenario
I set Kafka transaction time out to 1 hour just to avoid transaction timeout.
To get a producer only task, I called {{env.disableOperatorChaining();}} in the
second scenario.
> Kafka unavailability could cause Flink TM shutdown
> --------------------------------------------------
>
> Key: FLINK-17327
> URL: https://issues.apache.org/jira/browse/FLINK-17327
> Project: Flink
> Issue Type: Bug
> Affects Versions: 1.10.0
> Reporter: Jun Qin
> Priority: Major
>
> Steps to reproduce:
> # Start a Flink 1.10 standalone cluster
> # Run a Flink job which reads from one Kafka topic and writes to another
> topic, with exactly-once checkpointing enabled
> # Stop all Kafka Brokers after a few successful checkpoints
> When Kafka brokers are down:
> # {{org.apache.kafka.clients.NetworkClient}} reported connection to broker
> could not be established
> # Then, Flink could not complete snapshot due to {{Timeout expired while
> initializing transactional state in 60000ms}}
> # After several snapshot failures, Flink reported {{Too many ongoing
> snapshots. Increase kafka producers pool size or decrease number of
> concurrent checkpoints.}}
> # Eventually, Flink tried to cancel the task which did not succeed within 3
> min. According to logs, consumer was cancelled, but producer is still running
> # Then {{Fatal error occurred while executing the TaskManager. Shutting it
> down...}}
> I will attach the logs to show the details. Worth to note that if there
> would be no consumer but producer only in the task, the behavior is different:
> # {{org.apache.kafka.clients.NetworkClient}} reported connection to broker
> could not be established
> # after {{delivery.timeout.ms}} (2min by default), producer reports:
> {{FlinkKafkaException: Failed to send data to Kafka: Expiring 4 record(s) for
> output-topic-0:120001 ms has passed since batch creation}}
> # Flink tried to cancel the upstream tasks and created a new producer
> # The new producer obviously reported connectivity issue to brokers
> # This continues till Kafka brokers are back.
> # Flink reported {{Too many ongoing snapshots. Increase kafka producers pool
> size or decrease number of concurrent checkpoints.}}
> # Flink cancelled the tasks and restarted them
> # The job continues, and new checkpoint succeeded.
> # TM runs all the time in this scenario
> I set Kafka transaction time out to 1 hour just to avoid transaction timeout
> during the test.
> To get a producer only task, I called {{env.disableOperatorChaining();}} in
> the second scenario.
>
>
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)