Samuel Siebenmann created FLINK-31492:
-----------------------------------------

             Summary: AWS Firehose Connector misclassifies IAM permission exceptions as retryable
                 Key: FLINK-31492
                 URL: https://issues.apache.org/jira/browse/FLINK-31492
             Project: Flink
          Issue Type: Bug
          Components: Connectors / AWS, Connectors / Firehose
    Affects Versions: aws-connector-4.1.0
            Reporter: Samuel Siebenmann


The AWS Firehose connector uses an exception classification mechanism to decide whether errors that occur while writing requests to AWS Firehose are fatal (i.e. non-retryable) or not (i.e. retryable).
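{code:java}
private boolean isRetryable(Throwable err) {
    // FatalExceptionClassifier.isFatal returns false when err matches a fatal
    // classifier (the mapped exception has then already been passed to the
    // fatal exception consumer), and true when no fatal classifier matches.
    if (!FIREHOSE_FATAL_EXCEPTION_CLASSIFIER.isFatal(err, getFatalExceptionCons())) {
        return false;
    }
    // With failOnError enabled, any remaining (non-fatal) error also fails fast.
    if (failOnError) {
        getFatalExceptionCons()
                .accept(new KinesisFirehoseException.KinesisFirehoseFailFastException(err));
        return false;
    }

    // Otherwise the error is treated as retryable.
    return true;
} {code}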
([github|https://github.com/apache/flink-connector-aws/blob/main/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriter.java#L252])

This exception classification mechanism compares an exception's actual type with known fatal exception types (using Flink's [FatalExceptionClassifier.withExceptionClassifier|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/throwable/FatalExceptionClassifier.java#L60]). An exception is considered fatal if it, or an exception in its cause chain, is assignable to one of the known fatal exception types ([code|https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java#L479]).
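A minimal, JDK-only sketch of such an assignability check, assuming it walks the cause chain the way ExceptionUtils.findThrowable does. The class and method names are illustrative, not Flink's API, and unlike Flink's isFatal (which returns false for fatal errors), matchesFatalType here returns true when a fatal type matches:

{code:java}
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.List;
import java.util.Optional;

public class AssignabilityClassifierSketch {

    // Returns the first throwable in the cause chain whose runtime class is
    // assignable to searchType (illustrative, modelled on ExceptionUtils.findThrowable).
    static <T extends Throwable> Optional<T> findThrowable(Throwable err, Class<T> searchType) {
        for (Throwable t = err; t != null; t = t.getCause()) {
            if (searchType.isAssignableFrom(t.getClass())) {
                return Optional.of(searchType.cast(t));
            }
        }
        return Optional.empty();
    }

    // True if any throwable in the chain is assignable to one of the fatal types.
    static boolean matchesFatalType(Throwable err, List<Class<? extends Throwable>> fatalTypes) {
        for (Class<? extends Throwable> type : fatalTypes) {
            if (findThrowable(err, type).isPresent()) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        List<Class<? extends Throwable>> fatal = List.of(IOException.class);
        // A subtype (FileNotFoundException) of a fatal type, wrapped in another exception.
        Throwable wrapped = new RuntimeException(new FileNotFoundException("missing"));
        System.out.println(matchesFatalType(wrapped, fatal));                        // true
        System.out.println(matchesFatalType(new IllegalStateException("x"), fatal)); // false
    }
}
{code}

Note that any subtype of a fatal type matches as well, which is what makes the case below hard to express.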

The AWS Firehose SDK throws fatal IAM permission exceptions as [FirehoseException|https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/services/firehose/model/FirehoseException.html]s, for example:
{code:java}
software.amazon.awssdk.services.firehose.model.FirehoseException: User: arn:aws:sts::000000000000:assumed-role/example-role/kiam-kiam is not authorized to perform: firehose:PutRecordBatch on resource: arn:aws:firehose:us-east-1:000000000000:deliverystream/example-stream because no identity-based policy allows the firehose:PutRecordBatch action
{code}
At the same time, certain subtypes of FirehoseException, such as [LimitExceededException|https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/services/firehose/model/LimitExceededException.html], are retryable and non-fatal.

The AWS Firehose connector currently misclassifies this fatal IAM permission exception as non-fatal, so it is retried. Simply adding FirehoseException to the known fatal types does not fix this: the current exception classification mechanism cannot easily express a case where a supertype should be considered fatal but some of its subtypes should not.
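To illustrate the dilemma, here is a minimal sketch using hypothetical stand-in classes (not the real AWS SDK types) that mirror the hierarchy described above: FirehoseException as the base type and LimitExceededException as a retryable subtype. The matchesFatalType helper is illustrative only and checks just the top-level exception:

{code:java}
import java.util.List;

public class SupertypeClassificationSketch {

    // Hypothetical stand-ins for the SDK hierarchy, not the real AWS SDK classes.
    static class FirehoseException extends RuntimeException {
        FirehoseException(String message) {
            super(message);
        }
    }

    static class LimitExceededException extends FirehoseException {
        LimitExceededException(String message) {
            super(message);
        }
    }

    // Type-based check: fatal if the error's class is assignable to a fatal type.
    static boolean matchesFatalType(Throwable err, List<Class<? extends Throwable>> fatalTypes) {
        return fatalTypes.stream().anyMatch(type -> type.isAssignableFrom(err.getClass()));
    }

    public static void main(String[] args) {
        Throwable accessDenied = new FirehoseException("not authorized to perform: firehose:PutRecordBatch");
        Throwable throttled = new LimitExceededException("rate exceeded");

        // Option 1: FirehoseException is not fatal, so the IAM error is retried.
        List<Class<? extends Throwable>> withoutBase = List.of();
        System.out.println(matchesFatalType(accessDenied, withoutBase)); // false

        // Option 2: FirehoseException is fatal, so the retryable subtype becomes fatal too.
        List<Class<? extends Throwable>> withBase = List.of(FirehoseException.class);
        System.out.println(matchesFatalType(accessDenied, withBase)); // true
        System.out.println(matchesFatalType(throttled, withBase));    // true
    }
}
{code}

Neither choice of fatal types fails on the IAM error while still retrying the throttling error.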

To address this issue, errors can be classified by error code rather than by type. AWS services and the AWS SDK use error codes (see e.g. [Firehose's error codes|https://docs.aws.amazon.com/firehose/latest/APIReference/CommonErrors.html] or [S3's error codes|https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html#ErrorCodeList], and the SDK API docs for [AwsErrorDetails.errorCode()|https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/awscore/exception/AwsErrorDetails.html#errorCode()] and [AwsServiceException.awsErrorDetails()|https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/awscore/exception/AwsServiceException.html#awsErrorDetails()]) to uniquely identify error conditions, and these codes are intended for handling errors by type.
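A minimal sketch of classifying by error code instead of by type. ServiceException is a hypothetical stand-in for the SDK's AwsServiceException (where the code would be read via awsErrorDetails().errorCode()), and the fatal code set is an assumption; in particular, we have not verified that the IAM error above carries the code AccessDeniedException:

{code:java}
import java.util.Optional;
import java.util.Set;

public class ErrorCodeClassifierSketch {

    // Hypothetical stand-in for an SDK service exception that carries an error code.
    static class ServiceException extends RuntimeException {
        private final String errorCode;

        ServiceException(String errorCode, String message) {
            super(message);
            this.errorCode = errorCode;
        }

        String errorCode() {
            return errorCode;
        }
    }

    // Assumed fatal codes; a real list would come from the service's documented errors.
    static final Set<String> FATAL_ERROR_CODES = Set.of("AccessDeniedException", "NotAuthorized");

    // Returns the first ServiceException found in the cause chain, if any.
    static Optional<ServiceException> findServiceException(Throwable err) {
        for (Throwable t = err; t != null; t = t.getCause()) {
            if (t instanceof ServiceException) {
                return Optional.of((ServiceException) t);
            }
        }
        return Optional.empty();
    }

    // Fatal iff a ServiceException in the chain carries a fatal error code.
    // A null error code yields an empty Optional and is treated as non-fatal.
    static boolean isFatalByErrorCode(Throwable err) {
        return findServiceException(err)
                .map(ServiceException::errorCode)
                .map(FATAL_ERROR_CODES::contains)
                .orElse(false);
    }

    public static void main(String[] args) {
        Throwable denied = new RuntimeException(
                new ServiceException("AccessDeniedException", "not authorized to perform: firehose:PutRecordBatch"));
        Throwable throttled = new ServiceException("LimitExceededException", "rate exceeded");

        System.out.println(isFatalByErrorCode(denied));    // true
        System.out.println(isFatalByErrorCode(throttled)); // false
    }
}
{code}

Because the decision depends on the code rather than on the class, a fatal error and a retryable error that share a base type can be told apart.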

The AWS Firehose connector (like other AWS connectors) currently logs at DEBUG level when it retries fully failed records ([code|https://github.com/apache/flink-connector-aws/blob/main/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriter.java#L213]). This makes it difficult for users to find the root cause of the issue above without enabling debug logging.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
