Moritz Mack created BEAM-13631:
----------------------------------

             Summary: SQS read IO requires a deterministic coder for SQS messages 
to work in batch mode.
                 Key: BEAM-13631
                 URL: https://issues.apache.org/jira/browse/BEAM-13631
             Project: Beam
          Issue Type: Bug
          Components: io-java-aws
            Reporter: Moritz Mack


Currently the SQS read IO uses SerializableCoder.of(Message.class), which isn't 
deterministic. This can cause failures when the source is used in batch mode 
(via BoundedReadFromUnboundedSource). In that case the mutation detector throws:
{code:java}
Jan 10, 2022 11:37:05 AM 
org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector 
verifyUnmodifiedThrowingCheckedExceptions
WARNING: Coder of type class org.apache.beam.sdk.coders.SerializableCoder has a 
#structuralValue method which does not return true when the encoding of the 
elements is equal. Element 
Shard{source=org.apache.beam.sdk.io.aws.sqs.SqsUnboundedSource@5f19451c, 
maxNumRecords=1, maxReadTime=null}
Coder of type class org.apache.beam.sdk.coders.SerializableCoder has a 
#structuralValue method which does not return true when the encoding of the 
elements is equal. Element 
Shard{source=org.apache.beam.sdk.io.aws.sqs.SqsUnboundedSource@5f19451c, 
maxNumRecords=1, maxReadTime=null}

Exception in thread "main" org.apache.beam.sdk.util.IllegalMutationException: 
PTransform SqsIO.Read/Read(SqsUnboundedSource)/Read/ParMultiDo(Read) mutated 
value ValueWithRecordId{id=[98, 55, 50, 51, 56, 51, 102, 57, 45, 97, 52, 100, 
56, 45, 52, 99, 100, 50, 45, 97, 49, 55, 49, 45, 48, 57, 100, 48, 100, 53, 50, 
51, 99, 50, 54, 51], value={MessageId: 
b72383f9-a4d8-4cd2-a171-09d0d523c263,ReceiptHandle: AQEBj2FXnTVQ==,MD5OfBody: 
38db8cbd101e4c1cfbf47e31c2aaab75,Body: {"test-key": "test-value"},Attributes: 
{SentTimestamp=1641794775474},MessageAttributes: 
{requestTimeMsSinceEpoch={StringValue: 1641794824800,StringListValues: 
[],BinaryListValues: [],}}}} after it was output (new value was 
ValueWithRecordId{id=[98, 55, 50, 51, 56, 51, 102, 57, 45, 97, 52, 100, 56, 45, 
52, 99, 100, 50, 45, 97, 49, 55, 49, 45, 48, 57, 100, 48, 100, 53, 50, 51, 99, 
50, 54, 51], value={MessageId: 
b72383f9-a4d8-4cd2-a171-09d0d523c263,ReceiptHandle: 
DeVRF8vQATm1f+rHIvR3eaejlRHksL1R7WE4zDT7lSwdIs9gJCYKXFXnTVQ==,MD5OfBody: 
38db8cbd101e4c1cfbf47e31c2aaab75,Body: {"test-key": "test-value"},Attributes: 
{SentTimestamp=1641794775474},MessageAttributes: 
{requestTimeMsSinceEpoch={StringValue: 1641794824800,StringListValues: 
[],BinaryListValues: [],}}}}). Values must not be mutated in any way after 
being output.
    at 
org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.commit(ImmutabilityCheckingBundleFactory.java:137)
    at 
org.apache.beam.runners.direct.EvaluationContext.commitBundles(EvaluationContext.java:231)
    at 
org.apache.beam.runners.direct.EvaluationContext.handleResult(EvaluationContext.java:163)
    at 
org.apache.beam.runners.direct.QuiescenceDriver$TimerIterableCompletionCallback.handleResult(QuiescenceDriver.java:292)
    at 
org.apache.beam.runners.direct.DirectTransformExecutor.finishBundle(DirectTransformExecutor.java:194)
    at 
org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:131)
    at 
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.beam.sdk.util.IllegalMutationException: Value 
ValueWithRecordId{id=[98, 55, 50, 51, 56, 51, 102, 57, 45, 97, 52, 100, 56, 45, 
52, 99, 100, 50, 45, 97, 49, 55, 49, 45, 48, 57, 100, 48, 100, 53, 50, 51, 99, 
50, 54, 51], value={MessageId: 
b72383f9-a4d8-4cd2-a171-09d0d523c263,ReceiptHandle: AQEBj2KQ==,MD5OfBody: 
38db8cbd101e4c1cfbf47e31c2aaab75,Body: {"test-key": "test-value"},Attributes: 
{SentTimestamp=1641794775474},MessageAttributes: 
{requestTimeMsSinceEpoch={StringValue: 1641794824800,StringListValues: 
[],BinaryListValues: [],}}}} mutated illegally, new value was 
ValueWithRecordId{id=[98, 55, 50, 51, 56, 51, 102, 57, 45, 97, 52, 100, 56, 45, 
52, 99, 100, 50, 45, 97, 49, 55, 49, 45, 48, 57, 100, 48, 100, 53, 50, 51, 99, 
50, 54, 51], value={MessageId: 
b72383f9-a4d8-4cd2-a171-09d0d523c263,ReceiptHandle: AQ==,MD5OfBody: 
38db8cbd101e4c1cfbf47e31c2aaab75,Body: {"test-key": "test-value"},Attributes: 
{SentTimestamp=1641794775474},MessageAttributes: 
{requestTimeMsSinceEpoch={StringValue: 1641794824800,StringListValues: 
[],BinaryListValues: [],}}}}. Encoding was rO.
    at 
org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.illegalMutation(MutationDetectors.java:158)
    at 
org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.verifyUnmodifiedThrowingCheckedExceptions(MutationDetectors.java:153)
    at 
org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.verifyUnmodified(MutationDetectors.java:128)
    at 
org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.commit(ImmutabilityCheckingBundleFactory.java:127)
    ... 10 more
Caused by: org.apache.beam.sdk.util.IllegalMutationException: Value 
ValueWithRecordId{id=[98, 55, 50, 51, 56, 51, 102, 57, 45, 97, 52, 100, 56, 45, 
52, 99, 100, 50, 45, 97, 49, 55, 49, 45, 48, 57, 100, 48, 100, 53, 50, 51, 99, 
50, 54, 51], value={MessageId: 
b72383f9-a4d8-4cd2-a171-09d0d523c263,ReceiptHandle: AQEBj=,MD5OfBody: 
38db8cbd101e4c1cfbf47e31c2aaab75,Body: {"test-key": "test-value"},Attributes: 
{SentTimestamp=1641794775474},MessageAttributes: 
{requestTimeMsSinceEpoch={StringValue: 1641794824800,StringListValues: 
[],BinaryListValues: [],}}}} mutated illegally, new value was 
ValueWithRecordId{id=[98, 55, 50, 51, 56, 51, 102, 57, 45, 97, 52, 100, 56, 45, 
52, 99, 100, 50, 45, 97, 49, 55, 49, 45, 48, 57, 100, 48, 100, 53, 50, 51, 99, 
50, 54, 51], value={MessageId: 
b72383f9-a4d8-4cd2-a171-09d0d523c263,ReceiptHandle: AQE==,MD5OfBody: 
38db8cbd101e4c1cfbf47e31c2aaab75,Body: {"test-key": "test-value"},Attributes: 
{SentTimestamp=1641794775474},MessageAttributes: 
{requestTimeMsSinceEpoch={StringValue: 1641794824800,StringListValues: 
[],BinaryListValues: [],}}}}. Encoding was rO2Mw.
{code}
https://stackoverflow.com/questions/70648489/apache-beam-2-34-0-sqsio-illegal-mutation-exception
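
To illustrate the general hazard (not necessarily the exact root cause in SqsIO), here is a minimal JDK-only sketch. It shows that Java serialization, which SerializableCoder relies on, can produce different bytes for two values that are equal, and how a field-by-field encoding with a fixed key order avoids that. The class name DeterministicEncodingSketch, the helper names and the use of a plain Map<String, String> as a stand-in for message attributes are assumptions for illustration only; this is not the SQS Message class or a Beam Coder.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration; names are not taken from Beam or the AWS SDK.
class DeterministicEncodingSketch {

  // Plain Java serialization of an arbitrary Serializable value.
  static byte[] javaSerialize(Object value) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(value);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return bytes.toByteArray();
  }

  // Assumed deterministic encoding: entry count, then entries in sorted key
  // order, each string written as a length-prefixed UTF-8 byte sequence.
  // Assumes non-null keys and values.
  static byte[] encodeDeterministically(Map<String, String> attributes) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(bytes)) {
      Map<String, String> sorted = new TreeMap<>(attributes);
      out.writeInt(sorted.size());
      for (Map.Entry<String, String> entry : sorted.entrySet()) {
        writeString(out, entry.getKey());
        writeString(out, entry.getValue());
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return bytes.toByteArray();
  }

  private static void writeString(DataOutputStream out, String s) throws IOException {
    byte[] utf8 = s.getBytes(StandardCharsets.UTF_8);
    out.writeInt(utf8.length);
    out.write(utf8);
  }

  public static void main(String[] args) {
    // Two maps with the same entries, inserted in a different order.
    Map<String, String> first = new LinkedHashMap<>();
    first.put("SentTimestamp", "1641794775474");
    first.put("requestTimeMsSinceEpoch", "1641794824800");

    Map<String, String> second = new LinkedHashMap<>();
    second.put("requestTimeMsSinceEpoch", "1641794824800");
    second.put("SentTimestamp", "1641794775474");

    System.out.println("equals: " + first.equals(second));
    System.out.println("Java serialization bytes equal: "
        + Arrays.equals(javaSerialize(first), javaSerialize(second)));
    System.out.println("sorted field encoding bytes equal: "
        + Arrays.equals(encodeDeterministically(first), encodeDeterministically(second)));
  }
}
```

A deterministic coder for the SQS message would presumably follow the same idea: write each field explicitly, in a fixed order, instead of delegating to Java serialization.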



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
