[ https://issues.apache.org/jira/browse/BEAM-13627?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Ning updated BEAM-13627:
------------------------
Description:
The I/O is much more complicated in Beam 2.34.0 than in 2.31.0. In Beam 2.34.0,
the
[deleteBatch](https://github.com/apache/beam/blob/release-2.34.0/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/sqs/SqsUnboundedReader.java#L584)
logic **filters** the messages **to delete** based on the **inflight** state.
However, the
[extend](https://github.com/apache/beam/blob/release-2.34.0/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/sqs/SqsUnboundedReader.java#L756)
logic makes assumptions: it modifies the inflight state to **exclude** messages
that are **assumed to have expired or to be about to expire**. These messages
are **not** explicitly deleted from SQS by the I/O, nor are they dropped by the
I/O itself (the I/O could still be
[processing](https://github.com/apache/beam/blob/release-2.34.0/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/sqs/SqsUnboundedReader.java#L509)
a message that should have been treated as expired and skipped until SQS
resends it).
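To make the interaction concrete, here is a toy model of the bookkeeping described above. It is a sketch under stated assumptions, not the SqsUnboundedReader code: `InflightModel`, `received`, `extend` and `deleteBatch` are hypothetical names, and the "inflight state" is reduced to a map from message id to visibility deadline. It shows how a message that `extend` drops from the inflight state is later skipped by the delete filter even though it was already emitted.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of the reader's inflight bookkeeping. Hypothetical: this is NOT the
// SqsUnboundedReader implementation, only the filtering pattern described above.
class InflightModel {
  // messageId -> time (ms) at which the message's visibility timeout runs out
  final Map<String, Long> inflight = new LinkedHashMap<>();

  void received(String messageId, long visibilityDeadlineMillis) {
    inflight.put(messageId, visibilityDeadlineMillis);
  }

  // Like "extend": messages assumed expired, or about to expire within the
  // safety margin, are removed from the inflight state and nothing else happens.
  void extend(long nowMillis, long safetyMarginMillis) {
    inflight.values().removeIf(deadline -> deadline <= nowMillis + safetyMarginMillis);
  }

  // Like "deleteBatch": only ids still present in the inflight state are
  // selected for deletion; everything else is silently skipped.
  List<String> deleteBatch(List<String> checkpointedIds) {
    List<String> toDelete = new ArrayList<>();
    for (String id : checkpointedIds) {
      if (inflight.remove(id) != null) {
        toDelete.add(id);
      }
    }
    return toDelete;
  }

  public static void main(String[] args) {
    InflightModel model = new InflightModel();
    model.received("m1", 10_000L); // close to its visibility deadline
    model.received("m2", 60_000L);
    // Assume both messages were already emitted downstream by advance().
    model.extend(9_000L, 5_000L); // m1 is assumed expired and forgotten
    // Prints [m2]: m1 was emitted but is never deleted, so SQS will resend it.
    System.out.println(model.deleteBatch(List.of("m1", "m2")));
  }
}
```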
Ideally, the reader should not advance to a message whose receipt handle has
expired; it should skip the message and wait for SQS to resend it.
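A minimal sketch of that proposed behavior, assuming each buffered message carries the time at which its receipt handle stops being valid. `SkipExpiredReader`, `Received`, `enqueue` and the deadline field are hypothetical illustrations, not Beam's API; the point is only that `advance` passes over expired entries instead of emitting them.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sketch of the proposed behavior: advance() skips any buffered
// message whose receipt handle has already expired. Names are illustrative.
class SkipExpiredReader {
  static final class Received {
    final String messageId;
    final String receiptHandle;
    final long visibilityDeadlineMillis; // when the receipt handle stops being valid

    Received(String messageId, String receiptHandle, long visibilityDeadlineMillis) {
      this.messageId = messageId;
      this.receiptHandle = receiptHandle;
      this.visibilityDeadlineMillis = visibilityDeadlineMillis;
    }
  }

  private final Deque<Received> buffer = new ArrayDeque<>();
  private Received current;

  void enqueue(Received message) {
    buffer.addLast(message);
  }

  // Returns true if a message with a still-valid receipt handle became current.
  boolean advance(long nowMillis) {
    while (!buffer.isEmpty()) {
      Received next = buffer.pollFirst();
      if (next.visibilityDeadlineMillis > nowMillis) {
        current = next;
        return true;
      }
      // Expired: neither emit nor delete it; SQS will resend it later with a
      // new receipt handle, and that copy is processed instead.
    }
    current = null;
    return false;
  }

  Received getCurrent() {
    return current;
  }

  public static void main(String[] args) {
    SkipExpiredReader reader = new SkipExpiredReader();
    reader.enqueue(new Received("m1", "rh-1", 1_000L)); // already expired at t=2000
    reader.enqueue(new Received("m2", "rh-2", 5_000L));
    System.out.println(reader.advance(2_000L) + " " + reader.getCurrent().messageId); // true m2
    System.out.println(reader.advance(2_000L)); // false
  }
}
```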
I'm not certain, but with the current (incorrect) behavior, advancing to a
message that is not "acked" (not deleted, so it will be resent) and then pulling
the same message again with a new receipt handle within the same bundle may
trigger the mutation detection, because the receipt handle is part of the
Message hash code (unless there is a hash collision in the mutation detector).
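The following toy model illustrates why a change to the receipt handle alone is enough to trip a mutation check. `Msg` is a hypothetical stand-in for the SQS `Message` class that, as stated above, includes the receipt handle in `equals`/`hashCode`; `ToyMutationDetector` is a hypothetical simplification that compares a snapshot with `equals()`, whereas Beam's `MutationDetectors` work on coder-encoded values.

```java
import java.util.Objects;

// Toy model only; not the AWS SDK Message class and not Beam's MutationDetectors.
class MutationModel {
  static final class Msg {
    final String messageId;
    String receiptHandle; // mutable field, standing in for a settable property
    final String body;

    Msg(String messageId, String receiptHandle, String body) {
      this.messageId = messageId;
      this.receiptHandle = receiptHandle;
      this.body = body;
    }

    Msg copy() {
      return new Msg(messageId, receiptHandle, body);
    }

    // The receipt handle takes part in equality and hashing.
    @Override
    public boolean equals(Object o) {
      if (this == o) return true;
      if (!(o instanceof Msg)) return false;
      Msg other = (Msg) o;
      return messageId.equals(other.messageId)
          && Objects.equals(receiptHandle, other.receiptHandle)
          && body.equals(other.body);
    }

    @Override
    public int hashCode() {
      return Objects.hash(messageId, receiptHandle, body);
    }
  }

  // Snapshots an element when it is output and checks it again later,
  // e.g. when the bundle is committed.
  static final class ToyMutationDetector {
    private final Msg original;
    private final Msg snapshot;

    ToyMutationDetector(Msg original) {
      this.original = original;
      this.snapshot = original.copy();
    }

    void verifyUnmodified() {
      if (!snapshot.equals(original)) {
        throw new IllegalStateException("mutated after output: receipt handle "
            + snapshot.receiptHandle + " -> " + original.receiptHandle);
      }
    }
  }

  public static void main(String[] args) {
    Msg emitted = new Msg("m1", "rh-1", "payload");
    ToyMutationDetector detector = new ToyMutationDetector(emitted);
    detector.verifyUnmodified(); // passes: nothing changed yet
    emitted.receiptHandle = "rh-2"; // only the receipt handle changes
    try {
      detector.verifyUnmodified();
    } catch (IllegalStateException e) {
      System.out.println(e.getMessage()); // mutated after output: receipt handle rh-1 -> rh-2
    }
  }
}
```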
**TL;DR: debugging process**
The mutation was detected in SqsUnboundedSource itself; it was not caused by
any other code in the pipeline.
The code that reports the warning and throws the exception is
[here](https://github.com/apache/beam/blob/release-2.34.0/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MutationDetectors.java#L145).
The only field that changed is the receipt handle. The SQS documentation
[here](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-queue-message-identifiers.html)
states:
> If you receive a message more than once, each time you receive it, you get a
> different receipt handle. You must provide the most recently received receipt
> handle when you request to delete the message (otherwise, the message might
> not be deleted).
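One way to honor that rule is to remember the newest receipt handle per message id and always delete with it. The class below is a hypothetical helper (not part of Beam or the AWS SDK), sketched only to show the bookkeeping; the actual delete call is left out.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of Beam or the AWS SDK: remembers the most
// recently received receipt handle per message id, since SQS requires the
// latest handle when deleting a message that was received more than once.
class LatestReceiptHandles {
  private final Map<String, String> latestByMessageId = new HashMap<>();

  // Call on every receive; a redelivery replaces the older receipt handle.
  void onReceive(String messageId, String receiptHandle) {
    latestByMessageId.put(messageId, receiptHandle);
  }

  // Receipt handle to use for deletion, or null if the id was never seen.
  String handleForDelete(String messageId) {
    return latestByMessageId.get(messageId);
  }

  public static void main(String[] args) {
    LatestReceiptHandles handles = new LatestReceiptHandles();
    handles.onReceive("m1", "rh-first");
    handles.onReceive("m1", "rh-second"); // same message, received again
    System.out.println(handles.handleForDelete("m1")); // rh-second
  }
}
```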
The
[aws_java_sdk_version](https://github.com/apache/beam/blob/release-2.34.0/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy#L447)
did not change between Beam 2.31.0 and Beam 2.34.0, so the AWS SDK is unlikely
to be the culprit.
By contrast,
[SqsUnboundedReader](https://github.com/apache/beam/blob/release-2.34.0/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/sqs/SqsUnboundedReader.java)
changed significantly between Beam 2.31.0 and Beam 2.34.0.
To receive a message more than once, the message must not have been deleted
since it was first received. The deletion logic is invoked in
[SqsCheckpointMark](https://github.com/apache/beam/blob/release-2.34.0/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/sqs/SqsCheckpointMark.java).
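As a rough illustration of why redelivery is possible at all, here is a hypothetical model of checkpoint-driven deletion. It only mirrors the general idea that deletion happens when the runner finalizes a checkpoint (Beam's `UnboundedSource.CheckpointMark#finalizeCheckpoint`); `ToyCheckpointMark` and its `deleteBatch` callback are assumptions for the sketch, not SqsCheckpointMark's real fields.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical model of checkpoint-driven deletion; not SqsCheckpointMark.
class CheckpointModel {
  static final class ToyCheckpointMark {
    private final List<String> receiptHandles;
    private final Consumer<List<String>> deleteBatch;

    ToyCheckpointMark(List<String> receiptHandles, Consumer<List<String>> deleteBatch) {
      this.receiptHandles = new ArrayList<>(receiptHandles);
      this.deleteBatch = deleteBatch;
    }

    // Until this runs, the messages are not deleted and may be received again
    // (with new receipt handles) once their visibility timeout expires.
    void finalizeCheckpoint() {
      deleteBatch.accept(receiptHandles);
    }
  }

  public static void main(String[] args) {
    List<String> deleted = new ArrayList<>();
    ToyCheckpointMark mark = new ToyCheckpointMark(List.of("rh-1", "rh-2"), deleted::addAll);
    System.out.println(deleted); // [] : nothing deleted before finalization
    mark.finalizeCheckpoint();
    System.out.println(deleted); // [rh-1, rh-2]
  }
}
```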
was:
The I/O is much more complicated in Beam 2.34.0 than in 2.31.0. In Beam 2.34.0,
the
[deleteBatch](https://github.com/apache/beam/blob/release-2.34.0/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/sqs/SqsUnboundedReader.java#L584)
logic **filters** the messages **to delete** based on the **inflight** state.
However, the
[extend](https://github.com/apache/beam/blob/release-2.34.0/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/sqs/SqsUnboundedReader.java#L756)
logic makes assumptions: it modifies the inflight state to **exclude** messages
that are **assumed to have expired or to be about to expire**. These messages
are **not** explicitly deleted from SQS by the I/O, nor are they dropped by the
I/O itself (the I/O could still be
[processing](https://github.com/apache/beam/blob/release-2.34.0/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/sqs/SqsUnboundedReader.java#L509)
a message that should have been treated as expired and skipped until SQS
resends it).
Filed https://issues.apache.org/jira/browse/BEAM-13627.
I'm not sure whether pulling the same message again with a new receipt handle
within the same bundle would trigger the mutation detection: the receipt handle
is part of the Message hash code, so the change should be detected unless there
is a hash collision in the mutation detector.
**TL;DR: debugging process**
The mutation was detected in SqsUnboundedSource itself; it was not caused by
any other code in the pipeline.
The code that reports the warning and throws the exception is
[here](https://github.com/apache/beam/blob/release-2.34.0/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MutationDetectors.java#L145).
The only field that changed is the receipt handle. The SQS documentation
[here](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-queue-message-identifiers.html)
states:
> If you receive a message more than once, each time you receive it, you get a
> different receipt handle. You must provide the most recently received receipt
> handle when you request to delete the message (otherwise, the message might
> not be deleted).
The
[aws_java_sdk_version](https://github.com/apache/beam/blob/release-2.34.0/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy#L447)
did not change between Beam 2.31.0 and Beam 2.34.0, so the AWS SDK is unlikely
to be the culprit.
By contrast,
[SqsUnboundedReader](https://github.com/apache/beam/blob/release-2.34.0/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/sqs/SqsUnboundedReader.java)
changed significantly between Beam 2.31.0 and Beam 2.34.0.
To receive a message more than once, the message must not have been deleted
since it was first received. The deletion logic is invoked in
[SqsCheckpointMark](https://github.com/apache/beam/blob/release-2.34.0/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/sqs/SqsCheckpointMark.java).
> aws sqs I/O misses to drop expired messages, transform output mutation
> exception
> --------------------------------------------------------------------------------
>
> Key: BEAM-13627
> URL: https://issues.apache.org/jira/browse/BEAM-13627
> Project: Beam
> Issue Type: Bug
> Components: io-java-aws
> Affects Versions: 2.34.0
> Reporter: Ning
> Priority: P2
--
This message was sent by Atlassian Jira
(v8.20.1#820001)