[ 
https://issues.apache.org/jira/browse/FLINK-25293?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

rerorero updated FLINK-25293:
-----------------------------
    Description: 
Is it possible to make KafkaSource fail if it keeps failing to commit offsets?

 

I ran into an issue where KafkaSource keeps failing to commit offsets and never 
recovers, while logging messages like these:
{code:java}
2021-12-08 22:18:34,155 INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator [] - [Consumer clientId=dbz-mercari-contact-tool-jp-cg-1, groupId=dbz-mercari-contact-tool-jp-cg] Group coordinator b4-pkc-xmj7g.asia-northeast1.gcp.confluent.cloud:9092 (id: 2147483643 rack: null) is unavailable or invalid due to cause: null.isDisconnected: true. Rediscovery will be attempted.
2021-12-08 22:18:34,157 WARN  org.apache.flink.connector.kafka.source.reader.KafkaSourceReader [] - Failed to commit consumer offsets for checkpoint 13
{code}
This has happened not just once, but a couple of times a week. Other people 
have reported the same thing: 
[https://lists.apache.org/thread/8l4f2yb4qwysdn1cj1wjk99tfb79kgs2]. Restarting 
the Flink job resolves it.

This could possibly be a problem with the Kafka client. 

However, the Flink Kafka connector doesn't provide an automatic way to recover 
from this situation. KafkaSource keeps retrying forever when a retriable error 
occurs, even if retrying does not actually resolve it: 
[https://github.com/apache/flink/blob/afb29d92c4e76ec6a453459c3d8a08304efec549/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java#L144-L148]

Since the connector reports a metric for the number of failed commits, recovery 
could be automated by monitoring that metric and restarting the job, but that 
would mean adding another process to manage.

Would it make sense for KafkaSource to have an option that lets the source 
task fail if it keeps failing to commit offsets more than X times?
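
To make the proposal concrete, here is a minimal sketch of the counting logic 
such an option might use. It is not Flink code: the class name 
CommitFailureTracker, the method onCommitResult, and the parameter 
maxConsecutiveFailures (the "X" above) are all hypothetical, and treating "X 
times" as consecutive failures is an assumption on my part.

```java
/**
 * Hypothetical helper, not part of Flink or the Kafka client.
 * Counts consecutive offset-commit failures and fails once a limit is exceeded.
 */
final class CommitFailureTracker {
    // The "X" from the proposal: how many consecutive failures are tolerated.
    private final int maxConsecutiveFailures;
    private int consecutiveFailures;

    CommitFailureTracker(int maxConsecutiveFailures) {
        if (maxConsecutiveFailures < 1) {
            throw new IllegalArgumentException("maxConsecutiveFailures must be >= 1");
        }
        this.maxConsecutiveFailures = maxConsecutiveFailures;
    }

    /**
     * Intended to be called with the result of each commit attempt.
     * A null error means the commit succeeded and resets the counter.
     * Throws once more than maxConsecutiveFailures commits have failed in a row.
     */
    synchronized void onCommitResult(Throwable error) {
        if (error == null) {
            consecutiveFailures = 0;
            return;
        }
        consecutiveFailures++;
        if (consecutiveFailures > maxConsecutiveFailures) {
            throw new IllegalStateException(
                    "Offset commit failed " + consecutiveFailures + " times in a row", error);
        }
    }

    synchronized int consecutiveFailures() {
        return consecutiveFailures;
    }
}
```

In a real connector the commit callback may run on a different thread than the 
source task, so the exception would probably have to be handed over to the task 
thread rather than thrown directly from the callback. That part is left out 
here.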

  was:
Is it possible to make KafkaSource fail if it keeps failing to commit offsets?

 

I ran into an issue where KafkaSource keeps failing to commit offsets and never 
recovers, while logging messages like these:
{code:java}
2021-12-08 22:18:34,155 INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator [] - [Consumer clientId=dbz-mercari-contact-tool-jp-cg-1, groupId=dbz-mercari-contact-tool-jp-cg] Group coordinator b4-pkc-xmj7g.asia-northeast1.gcp.confluent.cloud:9092 (id: 2147483643 rack: null) is unavailable or invalid due to cause: null.isDisconnected: true. Rediscovery will be attempted.
2021-12-08 22:18:34,157 WARN  org.apache.flink.connector.kafka.source.reader.KafkaSourceReader [] - Failed to commit consumer offsets for checkpoint 13
{code}
This has happened not just once, but a couple of times a week. Other people 
have reported the same thing: 
[https://lists.apache.org/thread/8l4f2yb4qwysdn1cj1wjk99tfb79kgs2].

This could possibly be a problem with the Kafka client. Restarting the Flink 
job resolves it.

However, the Flink Kafka connector doesn't provide an automatic way to recover 
from this situation. KafkaSource keeps retrying forever when a retriable error 
occurs, even if retrying does not actually resolve it: 
[https://github.com/apache/flink/blob/afb29d92c4e76ec6a453459c3d8a08304efec549/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java#L144-L148]

Since the connector reports a metric for the number of failed commits, recovery 
could be automated by monitoring that metric and restarting the job, but that 
would mean adding another process to manage.

Would it make sense for KafkaSource to have an option that lets the source 
task fail if it keeps failing to commit offsets more than X times?


> Option to let fail if KafkaSource keeps failing to commit offset
> ----------------------------------------------------------------
>
>                 Key: FLINK-25293
>                 URL: https://issues.apache.org/jira/browse/FLINK-25293
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / Kafka
>    Affects Versions: 1.14.0
>         Environment: Flink 1.14.0
>            Reporter: rerorero
>            Priority: Major



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
