[ https://issues.apache.org/jira/browse/FLINK-26657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17507781#comment-17507781 ]

John Karp edited comment on FLINK-26657 at 3/16/22, 6:03 PM:
-------------------------------------------------------------

I was considering contributing a fix, though before starting I wanted to be 
sure it wouldn't be rejected for violating some design principle of Flink, or 
cause problems for other Flink components that I am not familiar with.

Adding FlinkKinesisConsumer.failOnError(boolean) to the API seems like a 
workable interface for turning this feature on, with a default of 'true' 
(fail on error) to preserve current behavior.


> Resilient Kinesis consumption
> -----------------------------
>
>                 Key: FLINK-26657
>                 URL: https://issues.apache.org/jira/browse/FLINK-26657
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / Kinesis
>            Reporter: John Karp
>            Priority: Major
>
> Currently, any error reading from a Kinesis stream quickly results in a 
> job-killing failure. If the error is not 'recoverable', failure is 
> immediate; if it is 'recoverable', there is a fixed number of retries 
> before the job fails -- and for some operations, such as GetRecords, the 
> retries can be exhausted in just a few seconds. Furthermore, 
> KinesisProxy.isRecoverableSdkClientException() and 
> KinesisProxy.isRecoverableException() recognize only very narrow 
> categories of errors as recoverable.
> 
> So, for example, if a Flink job is aggregating Kinesis streams from 
> multiple regions and there is a single-region outage, the job cannot make 
> forward progress on data from any region, since it will likely fail 
> before any checkpoint can complete. For some use cases it is better to 
> proceed with processing most of the data than to wait indefinitely for 
> the problematic region to recover.
> 
> One mitigation is to increase all of the ConsumerConfig timeouts to very 
> high values. However, this only affects error handling for 'recoverable' 
> exceptions, and depending on the nature of the regional failure, the 
> resulting errors may not be classified as 'recoverable'.
> 
> Proposed mitigation: add a 'soft failure' mode to the Kinesis consumer, in 
> which most errors arising from Kinesis operations are considered 
> recoverable and retries are unlimited. (Except perhaps EFO 
> de-registration, which I'm assuming needs to complete in a timely 
> fashion. Also, it looks like ExpiredIteratorException needs to bubble up 
> to PollingRecordPublisher.getRecords() without retries.)
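The classification change described in the quoted description can be sketched as follows. This is not Flink's actual code: the class name, the narrow default predicate, and the string match on ExpiredIteratorException are assumptions standing in for KinesisProxy's real predicates and the AWS SDK exception type. It shows how soft-failure mode could widen recoverability while still letting the iterator-expiry case propagate without retries.

```java
public class RecoverabilityClassifier {
    private final boolean softFailureMode;

    public RecoverabilityClassifier(boolean softFailureMode) {
        this.softFailureMode = softFailureMode;
    }

    // Narrow default classification, roughly mirroring the idea of the
    // existing predicates: only specific transient conditions are retryable.
    private boolean isNarrowlyRecoverable(Exception e) {
        return e instanceof java.net.SocketTimeoutException
                || e instanceof java.io.InterruptedIOException;
    }

    public boolean isRecoverable(Exception e) {
        // Iterator expiry must always bubble up so the record publisher can
        // refresh its shard iterator (string match is a stand-in for the
        // AWS SDK's ExpiredIteratorException type).
        if (e.getClass().getSimpleName().equals("ExpiredIteratorException")) {
            return false;
        }
        // Soft-failure mode treats everything else as recoverable;
        // otherwise fall back to the narrow default.
        return softFailureMode || isNarrowlyRecoverable(e);
    }
}
```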



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
