[
https://issues.apache.org/jira/browse/FLINK-25293?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
rerorero updated FLINK-25293:
-----------------------------
Description:
Is it possible to let KafkaSource fail if it keeps failing to commit offset?
I faced an issue where KafkaSource keeps failing and never recover, while it's
logging like these logs:
{code:java}
2021-12-08 22:18:34,155 INFO
org.apache.kafka.clients.consumer.internals.AbstractCoordinator [] - [Consumer
clientId=dbz-mercari-contact-tool-jp-cg-1,
groupId=dbz-mercari-contact-tool-jp-cg] Group coordinator
b4-pkc-xmj7g.asia-northeast1.gcp.confluent.cloud:9092 (id: 2147483643 rack:
null) is unavailable or invalid due to cause: null.isDisconnected: true.
Rediscovery will be attempted.
2021-12-08 22:18:34,157 WARN
org.apache.flink.connector.kafka.source.reader.KafkaSourceReader [] - Failed to
commit consumer offsets for checkpoint 13 {code}
This is happening not just once, but a couple of times a week (it happens when
the Kafka broker performs rolling restart). It can be recovered by restarting
the Flink Job.
I found other people reporting the similar thing:
[https://lists.apache.org/thread/8l4f2yb4qwysdn1cj1wjk99tfb79kgs2]. This could
possibly be a problem with the Kafka client.
However, Flink Kafka connector doesn't provide an automatic way to save this
situation. KafkaSource keeps retrying forever when a retriable error occurs,
even if it is not retriable actually:
[https://github.com/apache/flink/blob/afb29d92c4e76ec6a453459c3d8a08304efec549/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java#L144-L148]
Since it sends metrics of the number of times a commit fails, it could be
automated by monitoring it and restarting the job, but that would mean we need
to have a new process to be managed.
Does it make sense to have KafkaSource have the option like, let the source
task fail if it keeps failing to commit an offset more than X times?
was:
Is it possible to let KafkaSource fail if it keeps failing to commit offset?
I faced an issue where KafkaSource keeps failing and never recover, while it's
logging like these logs:
{code:java}
2021-12-08 22:18:34,155 INFO
org.apache.kafka.clients.consumer.internals.AbstractCoordinator [] - [Consumer
clientId=dbz-mercari-contact-tool-jp-cg-1,
groupId=dbz-mercari-contact-tool-jp-cg] Group coordinator
b4-pkc-xmj7g.asia-northeast1.gcp.confluent.cloud:9092 (id: 2147483643 rack:
null) is unavailable or invalid due to cause: null.isDisconnected: true.
Rediscovery will be attempted.
2021-12-08 22:18:34,157 WARN
org.apache.flink.connector.kafka.source.reader.KafkaSourceReader [] - Failed to
commit consumer offsets for checkpoint 13 {code}
This is happening not just once, but a couple of times a week (it happens when
the Kafka broker performs rolling restart). I found other people reporting the
same thing: [https://lists.apache.org/thread/8l4f2yb4qwysdn1cj1wjk99tfb79kgs2].
It can be recovered by restarting the Flink Job.
This could possibly be a problem with the Kafka client.
However, Flink Kafka connector doesn't provide an automatic way to save this
situation. KafkaSource keeps retrying forever when a retriable error occurs,
even if it is not retriable actually:
[https://github.com/apache/flink/blob/afb29d92c4e76ec6a453459c3d8a08304efec549/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java#L144-L148]
Since it sends metrics of the number of times a commit fails, it could be
automated by monitoring it and restarting the job, but that would mean we need
to have a new process to be managed.
Does it make sense to have KafkaSource have the option like, let the source
task fail if it keeps failing to commit an offset more than X times?
> Option to let fail if KafkaSource keeps failing to commit offset
> ----------------------------------------------------------------
>
> Key: FLINK-25293
> URL: https://issues.apache.org/jira/browse/FLINK-25293
> Project: Flink
> Issue Type: Improvement
> Components: Connectors / Kafka
> Affects Versions: 1.14.0
> Environment: Flink 1.14.0
> Reporter: rerorero
> Priority: Major
>
> Is it possible to let KafkaSource fail if it keeps failing to commit offset?
>
> I faced an issue where KafkaSource keeps failing and never recover, while
> it's logging like these logs:
> {code:java}
> 2021-12-08 22:18:34,155 INFO
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator [] -
> [Consumer clientId=dbz-mercari-contact-tool-jp-cg-1,
> groupId=dbz-mercari-contact-tool-jp-cg] Group coordinator
> b4-pkc-xmj7g.asia-northeast1.gcp.confluent.cloud:9092 (id: 2147483643 rack:
> null) is unavailable or invalid due to cause: null.isDisconnected: true.
> Rediscovery will be attempted.
> 2021-12-08 22:18:34,157 WARN
> org.apache.flink.connector.kafka.source.reader.KafkaSourceReader [] - Failed
> to commit consumer offsets for checkpoint 13 {code}
> This is happening not just once, but a couple of times a week (it happens
> when the Kafka broker performs rolling restart). It can be recovered by
> restarting the Flink Job.
> I found other people reporting the similar thing:
> [https://lists.apache.org/thread/8l4f2yb4qwysdn1cj1wjk99tfb79kgs2]. This
> could possibly be a problem with the Kafka client.
> However, Flink Kafka connector doesn't provide an automatic way to save this
> situation. KafkaSource keeps retrying forever when a retriable error occurs,
> even if it is not retriable actually:
> [https://github.com/apache/flink/blob/afb29d92c4e76ec6a453459c3d8a08304efec549/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java#L144-L148]
> Since it sends metrics of the number of times a commit fails, it could be
> automated by monitoring it and restarting the job, but that would mean we
> need to have a new process to be managed.
> Does it make sense to have KafkaSource have the option like, let the source
> task fail if it keeps failing to commit an offset more than X times?
--
This message was sent by Atlassian Jira
(v8.20.1#820001)