Ning created BEAM-13627:
---------------------------

             Summary: aws sqs I/O misses to delete some acked messages, output 
mutation exception
                 Key: BEAM-13627
                 URL: https://issues.apache.org/jira/browse/BEAM-13627
             Project: Beam
          Issue Type: Bug
          Components: io-java-aws
    Affects Versions: 2.34.0
            Reporter: Ning


Original problem and analysis: 
https://stackoverflow.com/questions/70648489/apache-beam-2-34-0-sqsio-illegal-mutation-exception

The SQS I/O is considerably more complex in Beam 2.34.0 than in 2.31.0. In 
Beam 2.34.0, the 
[deleteBatch](https://github.com/apache/beam/blob/release-2.34.0/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/sqs/SqsUnboundedReader.java#L584)
 logic **filters** the messages **to delete** based on the **inflight** state. 
However, the 
[extend](https://github.com/apache/beam/blob/release-2.34.0/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/sqs/SqsUnboundedReader.java#L756)
 logic modifies the inflight state to **exclude** messages that are **assumed 
to have expired or to be about to expire**. These messages are **never** 
explicitly deleted from SQS.

So in a future pull, SQS could resend those messages, which are then read from 
[messagesNotYetRead](https://github.com/apache/beam/blob/release-2.34.0/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/sqs/SqsUnboundedReader.java#L509).
 Meanwhile, 
[safeToDeleteIds](https://github.com/apache/beam/blob/release-2.34.0/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/sqs/SqsUnboundedReader.java#L526)
 contains the ids of those excluded messages, but they never get deleted.

This is then reported as an output mutation exception.
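
The interaction described above can be illustrated with a toy model. This is 
only a sketch under stated assumptions: `InflightModel` and its methods are 
hypothetical names that loosely mirror the roles of the inflight state, 
`extend` and `deleteBatch`. They are not the actual `SqsUnboundedReader` code, 
and the queue is simulated in memory.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Toy model of the reader state. All names are hypothetical, not Beam's API. */
class InflightModel {
    // Message id -> visibility deadline (ms) for messages the reader still tracks.
    private final Map<String, Long> inflight = new HashMap<>();
    // Ids that were actually deleted from the simulated queue.
    private final Set<String> deletedFromQueue = new HashSet<>();

    void receive(String id, long deadlineMs) {
        inflight.put(id, deadlineMs);
    }

    /** Plays the role of "extend": stop tracking ids assumed expired or about to expire. */
    void extend(long nowMs, long marginMs) {
        inflight.entrySet().removeIf(e -> e.getValue() - nowMs <= marginMs);
    }

    /** Plays the role of "deleteBatch": only ids still inflight are deleted. */
    List<String> deleteBatch(List<String> ackedIds) {
        List<String> skipped = new ArrayList<>();
        for (String id : ackedIds) {
            if (inflight.remove(id) != null) {
                deletedFromQueue.add(id);
            } else {
                skipped.add(id); // acked, but silently never deleted
            }
        }
        return skipped;
    }

    boolean isDeleted(String id) {
        return deletedFromQueue.contains(id);
    }

    public static void main(String[] args) {
        InflightModel model = new InflightModel();
        model.receive("a", 10_000L);
        model.receive("b", 1_000L);
        model.extend(900L, 500L); // "b" is within the margin and is dropped from inflight
        List<String> skipped = model.deleteBatch(Arrays.asList("a", "b"));
        System.out.println("acked but not deleted: " + skipped); // prints [b]
    }
}
```

In this model, message `b` is acknowledged but stays in the queue, so the queue 
is free to deliver it again once its visibility timeout elapses.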

**TL;DR: debugging process**

The mutation was detected in the SqsUnboundedSource, not caused by any other 
code in the pipeline.

The code that reports the warning and throws the exception is 
[here](https://github.com/apache/beam/blob/release-2.34.0/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MutationDetectors.java#L145).

The only field that changed is the receipt handle. It is documented 
[here](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-queue-message-identifiers.html)
 that:

> If you receive a message more than once, each time you receive it, you get a 
> different receipt handle. You must provide the most recently received receipt 
> handle when you request to delete the message (otherwise, the message might 
> not be deleted).
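
As a rough illustration of why a redelivery shows up as a changed value, the 
sketch below compares two deliveries of the same message. `ToyMessage` and 
`ReceiptHandleDemo` are hypothetical stand-ins, not the AWS SDK message class 
or Beam's mutation detector. The assumption is only that the comparison covers 
every field, including the receipt handle.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

/** Simplified stand-in for an SQS message. Not the AWS SDK class. */
final class ToyMessage {
    final String messageId;
    final String body;
    final String receiptHandle;

    ToyMessage(String messageId, String body, String receiptHandle) {
        this.messageId = messageId;
        this.body = body;
        this.receiptHandle = receiptHandle;
    }

    // Equality over all fields, so a new receipt handle makes the value "different".
    @Override
    public boolean equals(Object o) {
        if (!(o instanceof ToyMessage)) {
            return false;
        }
        ToyMessage other = (ToyMessage) o;
        return messageId.equals(other.messageId)
            && body.equals(other.body)
            && receiptHandle.equals(other.receiptHandle);
    }

    @Override
    public int hashCode() {
        return Objects.hash(messageId, body, receiptHandle);
    }
}

class ReceiptHandleDemo {
    /** Lists the names of the fields that differ between two deliveries. */
    static List<String> changedFields(ToyMessage first, ToyMessage second) {
        List<String> changed = new ArrayList<>();
        if (!first.messageId.equals(second.messageId)) changed.add("messageId");
        if (!first.body.equals(second.body)) changed.add("body");
        if (!first.receiptHandle.equals(second.receiptHandle)) changed.add("receiptHandle");
        return changed;
    }

    public static void main(String[] args) {
        ToyMessage first = new ToyMessage("id-1", "payload", "handle-A");
        // Same message delivered again: same id and body, new receipt handle.
        ToyMessage again = new ToyMessage("id-1", "payload", "handle-B");
        System.out.println("equal: " + first.equals(again));                   // equal: false
        System.out.println("changed: " + changedFields(first, again));         // changed: [receiptHandle]
    }
}
```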

There is no 
[aws_java_sdk_version](https://github.com/apache/beam/blob/release-2.34.0/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy#L447)
 change between Beam 2.31.0 and Beam 2.34.0, so the AWS SDK should not be the 
culprit.

There is a significant change between Beam 2.31.0 and Beam 2.34.0 for 
[SqsUnboundedReader](https://github.com/apache/beam/blob/release-2.34.0/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/sqs/SqsUnboundedReader.java).

For a message to be received more than once, it must not have been deleted 
since it was first received. The deletion logic is invoked in 
[SqsCheckpointMark](https://github.com/apache/beam/blob/release-2.34.0/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/sqs/SqsCheckpointMark.java).






--
This message was sent by Atlassian Jira
(v8.20.1#820001)
