[
https://issues.apache.org/jira/browse/FLINK-25293?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
rerorero updated FLINK-25293:
-----------------------------
Description:
Is it possible to let KafkaSource fail if it keeps failing to commit offsets?
I faced an issue where KafkaSource keeps failing and never recovers, while
logging messages like the following:
{code:java}
2021-12-08 22:18:34,155 INFO
org.apache.kafka.clients.consumer.internals.AbstractCoordinator [] - [Consumer
clientId=dbz-mercari-contact-tool-jp-cg-1,
groupId=dbz-mercari-contact-tool-jp-cg] Group coordinator
b4-pkc-xmj7g.asia-northeast1.gcp.confluent.cloud:9092 (id: 2147483643 rack:
null) is unavailable or invalid due to cause: null.isDisconnected: true.
Rediscovery will be attempted.
2021-12-08 22:18:34,157 WARN
org.apache.flink.connector.kafka.source.reader.KafkaSourceReader [] - Failed to
commit consumer offsets for checkpoint 13 {code}
This is happening not just once, but a couple of times a week. I found [other
people reporting the same
thing](https://lists.apache.org/thread/8l4f2yb4qwysdn1cj1wjk99tfb79kgs2).
This is possibly a problem with the Kafka client, and it can be resolved by
restarting the Flink job.
However, the Flink Kafka connector doesn't provide an automatic way to recover
from this situation. KafkaSource [keeps retrying forever when a retriable error
occurs](https://github.com/apache/flink/blob/afb29d92c4e76ec6a453459c3d8a08304efec549/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java#L144-L148),
even if the error is not actually retriable.
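For illustration, the behaviour described above boils down to a commit callback
of roughly the following shape. This is a simplified sketch, not the actual
KafkaSourceReader source: the point is only that the failure is logged and
otherwise swallowed, so a coordinator that never comes back never fails the task.
{code:java}
// Simplified sketch (NOT the actual connector code) of the commit-on-checkpoint
// pattern described above: the commit result is only logged, so a permanently
// failing commit never surfaces as a task failure.
import java.util.Map;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class CommitOnCheckpointSketch {
    private static final Logger LOG = LoggerFactory.getLogger(CommitOnCheckpointSketch.class);

    void commitOffsets(
            KafkaConsumer<?, ?> consumer,
            Map<TopicPartition, OffsetAndMetadata> offsetsToCommit,
            long checkpointId) {
        consumer.commitAsync(offsetsToCommit, (offsets, exception) -> {
            if (exception != null) {
                // Swallowed: nothing is counted and nothing is rethrown, so the
                // source task keeps running even if this never succeeds again.
                LOG.warn("Failed to commit consumer offsets for checkpoint {}", checkpointId, exception);
            } else {
                LOG.debug("Committed offsets for checkpoint {}", checkpointId);
            }
        });
    }
}
{code}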
Since the connector exposes a metric for the number of failed commits, this
could be automated by monitoring that metric and restarting the job, but that
would mean having to run and manage a new external process.
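As a rough idea of what that extra process would involve, below is a minimal
watchdog sketch that polls the commit-failure counter through Flink's REST API.
The job ID, vertex ID, and the exact metric name (assumed here to be a
commitsFailed counter under the KafkaSourceReader metric group) are placeholders
that would have to be verified against the actual deployment.
{code:java}
// Minimal watchdog sketch. The job/vertex IDs are passed as arguments and the
// metric name is an assumption (a commit-failure counter under the
// KafkaSourceReader metric group); both must be verified against the cluster.
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class CommitFailureWatchdog {

    public static void main(String[] args) throws Exception {
        String jobManager = "http://localhost:8081"; // JobManager REST endpoint
        String jobId = args[0];    // job ID, e.g. from `flink list` or the web UI
        String vertexId = args[1]; // ID of the Kafka source vertex
        String metric = "KafkaSourceReader.commitsFailed"; // assumed name

        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder(
                URI.create(jobManager + "/jobs/" + jobId + "/vertices/" + vertexId
                        + "/metrics?get=" + metric))
                .GET()
                .build();

        String body = client.send(request, HttpResponse.BodyHandlers.ofString()).body();
        System.out.println("Commit failure metric: " + body);
        // A real watchdog would parse the value, compare it against a threshold,
        // and cancel/resubmit the job via the REST API -- exactly the kind of
        // extra moving part the description would like to avoid.
    }
}
{code}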
Does it make sense to give KafkaSource an option that lets the source task
fail if offset commits keep failing more than X times?
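To make the idea concrete, the sketch below shows one way such an option could
behave. The counter logic and the configurable limit are hypothetical, not an
existing KafkaSource setting: the reader would reset the counter on every
successful commit and fail the task once consecutive failures exceed the limit.
{code:java}
// Hypothetical sketch of the proposed behaviour -- not an existing KafkaSource
// option. Consecutive commit failures are counted and the source task is failed
// once a configurable limit is exceeded, triggering Flink's normal failover.
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.util.FlinkRuntimeException;

class OffsetCommitFailureTracker {

    private final int maxConsecutiveFailures; // the proposed "X times" option
    private final AtomicInteger consecutiveFailures = new AtomicInteger();

    OffsetCommitFailureTracker(int maxConsecutiveFailures) {
        this.maxConsecutiveFailures = maxConsecutiveFailures;
    }

    /** Would be called from the commit callback with the outcome of each attempt. */
    void onCommitResult(long checkpointId, Throwable failure) {
        if (failure == null) {
            consecutiveFailures.set(0); // any success resets the counter
            return;
        }
        int failures = consecutiveFailures.incrementAndGet();
        if (failures > maxConsecutiveFailures) {
            // Failing here restarts the job through Flink's regular failover,
            // which is what currently has to be done by hand.
            throw new FlinkRuntimeException(
                    "Offset commit failed " + failures + " times in a row (last checkpoint "
                            + checkpointId + "), giving up.",
                    failure);
        }
    }
}
{code}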
> Option to let fail if KafkaSource keeps failing to commit offset
> ----------------------------------------------------------------
>
> Key: FLINK-25293
> URL: https://issues.apache.org/jira/browse/FLINK-25293
> Project: Flink
> Issue Type: Improvement
> Components: Connectors / Kafka
> Affects Versions: 1.14.0
> Environment: Flink 1.14.0
> Reporter: rerorero
> Priority: Major
>
> Is it possible to let KafkaSource fail if it keeps failing to commit offsets?
>
> I faced an issue where KafkaSource keeps failing and never recovers, while
> logging messages like the following:
> {code:java}
> 2021-12-08 22:18:34,155 INFO
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator [] -
> [Consumer clientId=dbz-mercari-contact-tool-jp-cg-1,
> groupId=dbz-mercari-contact-tool-jp-cg] Group coordinator
> b4-pkc-xmj7g.asia-northeast1.gcp.confluent.cloud:9092 (id: 2147483643 rack:
> null) is unavailable or invalid due to cause: null.isDisconnected: true.
> Rediscovery will be attempted.
> 2021-12-08 22:18:34,157 WARN
> org.apache.flink.connector.kafka.source.reader.KafkaSourceReader [] - Failed
> to commit consumer offsets for checkpoint 13 {code}
> This is happening not just once, but a couple of times a week. I found [other
> people reporting the same
> thing](https://lists.apache.org/thread/8l4f2yb4qwysdn1cj1wjk99tfb79kgs2).
> This is possibly a problem with the Kafka client, and it can be resolved by
> restarting the Flink job.
> However, the Flink Kafka connector doesn't provide an automatic way to recover
> from this situation. KafkaSource [keeps retrying forever when a retriable error
> occurs](https://github.com/apache/flink/blob/afb29d92c4e76ec6a453459c3d8a08304efec549/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java#L144-L148),
> even if the error is not actually retriable.
> Since the connector exposes a metric for the number of failed commits, this
> could be automated by monitoring that metric and restarting the job, but that
> would mean having to run and manage a new external process.
> Does it make sense to give KafkaSource an option that lets the source task
> fail if offset commits keep failing more than X times?
--
This message was sent by Atlassian Jira
(v8.20.1#820001)