[
https://issues.apache.org/jira/browse/FLINK-25293?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
rerorero updated FLINK-25293:
-----------------------------
Description:
Is it possible to let KafkaSource fail if it keeps failing to commit offset?
I faced an issue where KafkaSource keeps failing and never recover, while it's
logging like these logs:
{code:java}
2021-12-08 22:18:34,155 INFO
org.apache.kafka.clients.consumer.internals.AbstractCoordinator [] - [Consumer
clientId=dbz-mercari-contact-tool-jp-cg-1,
groupId=dbz-mercari-contact-tool-jp-cg] Group coordinator
b4-pkc-xmj7g.asia-northeast1.gcp.confluent.cloud:9092 (id: 2147483643 rack:
null) is unavailable or invalid due to cause: null.isDisconnected: true.
Rediscovery will be attempted.
2021-12-08 22:18:34,157 WARN
org.apache.flink.connector.kafka.source.reader.KafkaSourceReader [] - Failed to
commit consumer offsets for checkpoint 13 {code}
This is happening not just once, but a couple of times a week. I found other
people reporting the same thing:
[https://lists.apache.org/thread/8l4f2yb4qwysdn1cj1wjk99tfb79kgs2].
This could possibly be a problem with the Kafka client. It can be resolved by
restarting the Flink Job.
However, Flink Kafka connector doesn't provide an automatic way to save this
situation. KafkaSource keeps retrying forever when a retriable error occurs,
even if it is not retriable actually:
[https://github.com/apache/flink/blob/afb29d92c4e76ec6a453459c3d8a08304efec549/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java#L144-L148]
Since it sends metrics of the number of times a commit fails, it could be
automated by monitoring it and restarting the job, but that would mean we need
to have a new process to be managed.
Does it make sense to have KafkaSource have the option like, let the source
task fail if it keeps failing to commit an offset more than X times?
was:
Is it possible to let KafkaSource fail if it keeps failing to commit offset?
I faced an issue where KafkaSource keeps failing and never recover, while it's
logging like these logs:
{code:java}
2021-12-08 22:18:34,155 INFO
org.apache.kafka.clients.consumer.internals.AbstractCoordinator [] - [Consumer
clientId=dbz-mercari-contact-tool-jp-cg-1,
groupId=dbz-mercari-contact-tool-jp-cg] Group coordinator
b4-pkc-xmj7g.asia-northeast1.gcp.confluent.cloud:9092 (id: 2147483643 rack:
null) is unavailable or invalid due to cause: null.isDisconnected: true.
Rediscovery will be attempted.
2021-12-08 22:18:34,157 WARN
org.apache.flink.connector.kafka.source.reader.KafkaSourceReader [] - Failed to
commit consumer offsets for checkpoint 13 {code}
This is happening not just once, but a couple of times a week. I found [other
people reporting the same
thing]([https://lists.apache.org/thread/8l4f2yb4qwysdn1cj1wjk99tfb79kgs2]).
This could possibly be a problem with the Kafka client. It can be resolved by
restarting the Flink Job.
However, Flink Kafka connector doesn't provide an automatic way to save this
situation. KafkaSource [keeps retrying forever when a retriable error
occurs]([https://github.com/apache/flink/blob/afb29d92c4e76ec6a453459c3d8a08304efec549/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java#L144-L148]],
even if it is not retriable actually.
Since it sends metrics of the number of times a commit fails, it could be
automated by monitoring it and restarting the job, but that would mean we need
to have a new process to manage.
Does it make sense to have KafkaSource have the option like, let the source
task fail if it keeps failing to commit an offset more than X times?
> Option to let fail if KafkaSource keeps failing to commit offset
> ----------------------------------------------------------------
>
> Key: FLINK-25293
> URL: https://issues.apache.org/jira/browse/FLINK-25293
> Project: Flink
> Issue Type: Improvement
> Components: Connectors / Kafka
> Affects Versions: 1.14.0
> Environment: Flink 1.14.0
> Reporter: rerorero
> Priority: Major
>
> Is it possible to let KafkaSource fail if it keeps failing to commit offset?
>
> I faced an issue where KafkaSource keeps failing and never recover, while
> it's logging like these logs:
> {code:java}
> 2021-12-08 22:18:34,155 INFO
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator [] -
> [Consumer clientId=dbz-mercari-contact-tool-jp-cg-1,
> groupId=dbz-mercari-contact-tool-jp-cg] Group coordinator
> b4-pkc-xmj7g.asia-northeast1.gcp.confluent.cloud:9092 (id: 2147483643 rack:
> null) is unavailable or invalid due to cause: null.isDisconnected: true.
> Rediscovery will be attempted.
> 2021-12-08 22:18:34,157 WARN
> org.apache.flink.connector.kafka.source.reader.KafkaSourceReader [] - Failed
> to commit consumer offsets for checkpoint 13 {code}
> This is happening not just once, but a couple of times a week. I found other
> people reporting the same thing:
> [https://lists.apache.org/thread/8l4f2yb4qwysdn1cj1wjk99tfb79kgs2].
> This could possibly be a problem with the Kafka client. It can be resolved by
> restarting the Flink Job.
> However, Flink Kafka connector doesn't provide an automatic way to save this
> situation. KafkaSource keeps retrying forever when a retriable error occurs,
> even if it is not retriable actually:
> [https://github.com/apache/flink/blob/afb29d92c4e76ec6a453459c3d8a08304efec549/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java#L144-L148]
> Since it sends metrics of the number of times a commit fails, it could be
> automated by monitoring it and restarting the job, but that would mean we
> need to have a new process to be managed.
> Does it make sense to have KafkaSource have the option like, let the source
> task fail if it keeps failing to commit an offset more than X times?
--
This message was sent by Atlassian Jira
(v8.20.1#820001)