[ https://issues.apache.org/jira/browse/BEAM-13631?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Work on BEAM-13631 stopped by null.
-----------------------------------

> SQS read IO requires deterministic coder for SQS message to work in batch mode.
> ------------------------------------------------------------------------------------
>
>                 Key: BEAM-13631
>                 URL: https://issues.apache.org/jira/browse/BEAM-13631
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-aws
>            Reporter: Moritz Mack
>            Assignee: Moritz Mack
>            Priority: P2
>              Labels: SQS, aws-sdk-v1
>             Fix For: 2.37.0
>
>          Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> Currently the SQS read IO uses SerializableCoder.of(Message.class), which
> isn't deterministic. This may cause issues when used in batch mode (based on
> BoundedReadFromUnboundedSource). The mutation detector will throw in such a
> case:
> {code:java}
> Jan 10, 2022 11:37:05 AM
> org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector
> verifyUnmodifiedThrowingCheckedExceptions
> WARNING: Coder of type class org.apache.beam.sdk.coders.SerializableCoder has
> a #structuralValue method which does not return true when the encoding of the
> elements is equal. Element
> Shard{source=org.apache.beam.sdk.io.aws.sqs.SqsUnboundedSource@5f19451c,
> maxNumRecords=1, maxReadTime=null}
> Coder of type class org.apache.beam.sdk.coders.SerializableCoder has a
> #structuralValue method which does not return true when the encoding of the
> elements is equal.
> Element
> Shard{source=org.apache.beam.sdk.io.aws.sqs.SqsUnboundedSource@5f19451c,
> maxNumRecords=1, maxReadTime=null}
> Exception in thread "main" org.apache.beam.sdk.util.IllegalMutationException:
> PTransform SqsIO.Read/Read(SqsUnboundedSource)/Read/ParMultiDo(Read) mutated
> value ValueWithRecordId{id=[98, 55, 50, 51, 56, 51, 102, 57, 45, 97, 52, 100,
> 56, 45, 52, 99, 100, 50, 45, 97, 49, 55, 49, 45, 48, 57, 100, 48, 100, 53,
> 50, 51, 99, 50, 54, 51], value={MessageId:
> b72383f9-a4d8-4cd2-a171-09d0d523c263,ReceiptHandle: AQEBj2FXnTVQ==,MD5OfBody:
> 38db8cbd101e4c1cfbf47e31c2aaab75,Body: {"test-key": "test-value"},Attributes:
> {SentTimestamp=1641794775474},MessageAttributes:
> {requestTimeMsSinceEpoch={StringValue: 1641794824800,StringListValues:
> [],BinaryListValues: [],}}}} after it was output (new value was
> ValueWithRecordId{id=[98, 55, 50, 51, 56, 51, 102, 57, 45, 97, 52, 100, 56,
> 45, 52, 99, 100, 50, 45, 97, 49, 55, 49, 45, 48, 57, 100, 48, 100, 53, 50,
> 51, 99, 50, 54, 51], value={MessageId:
> b72383f9-a4d8-4cd2-a171-09d0d523c263,ReceiptHandle:
> DeVRF8vQATm1f+rHIvR3eaejlRHksL1R7WE4zDT7lSwdIs9gJCYKXFXnTVQ==,MD5OfBody:
> 38db8cbd101e4c1cfbf47e31c2aaab75,Body: {"test-key": "test-value"},Attributes:
> {SentTimestamp=1641794775474},MessageAttributes:
> {requestTimeMsSinceEpoch={StringValue: 1641794824800,StringListValues:
> [],BinaryListValues: [],}}}}). Values must not be mutated in any way after
> being output.
>     at org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.commit(ImmutabilityCheckingBundleFactory.java:137)
>     at org.apache.beam.runners.direct.EvaluationContext.commitBundles(EvaluationContext.java:231)
>     at org.apache.beam.runners.direct.EvaluationContext.handleResult(EvaluationContext.java:163)
>     at org.apache.beam.runners.direct.QuiescenceDriver$TimerIterableCompletionCallback.handleResult(QuiescenceDriver.java:292)
>     at org.apache.beam.runners.direct.DirectTransformExecutor.finishBundle(DirectTransformExecutor.java:194)
>     at org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:131)
>     at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>     at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>     at java.base/java.lang.Thread.run(Thread.java:834)
> Caused by: org.apache.beam.sdk.util.IllegalMutationException: Value
> ValueWithRecordId{id=[98, 55, 50, 51, 56, 51, 102, 57, 45, 97, 52, 100, 56,
> 45, 52, 99, 100, 50, 45, 97, 49, 55, 49, 45, 48, 57, 100, 48, 100, 53, 50,
> 51, 99, 50, 54, 51], value={MessageId:
> b72383f9-a4d8-4cd2-a171-09d0d523c263,ReceiptHandle: AQEBj2KQ==,MD5OfBody:
> 38db8cbd101e4c1cfbf47e31c2aaab75,Body: {"test-key": "test-value"},Attributes:
> {SentTimestamp=1641794775474},MessageAttributes:
> {requestTimeMsSinceEpoch={StringValue: 1641794824800,StringListValues:
> [],BinaryListValues: [],}}}} mutated illegally, new value was
> ValueWithRecordId{id=[98, 55, 50, 51, 56, 51, 102, 57, 45, 97, 52, 100, 56,
> 45, 52, 99, 100, 50, 45, 97, 49, 55, 49, 45, 48, 57, 100, 48, 100, 53, 50,
> 51, 99, 50, 54, 51], value={MessageId:
> b72383f9-a4d8-4cd2-a171-09d0d523c263,ReceiptHandle: AQ==,MD5OfBody:
> 38db8cbd101e4c1cfbf47e31c2aaab75,Body: {"test-key": "test-value"},Attributes:
> {SentTimestamp=1641794775474},MessageAttributes:
> {requestTimeMsSinceEpoch={StringValue: 1641794824800,StringListValues:
> [],BinaryListValues: [],}}}}. Encoding was rO.
>     at org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.illegalMutation(MutationDetectors.java:158)
>     at org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.verifyUnmodifiedThrowingCheckedExceptions(MutationDetectors.java:153)
>     at org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.verifyUnmodified(MutationDetectors.java:128)
>     at org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.commit(ImmutabilityCheckingBundleFactory.java:127)
>     ... 10 more
> Caused by: org.apache.beam.sdk.util.IllegalMutationException: Value
> ValueWithRecordId{id=[98, 55, 50, 51, 56, 51, 102, 57, 45, 97, 52, 100, 56,
> 45, 52, 99, 100, 50, 45, 97, 49, 55, 49, 45, 48, 57, 100, 48, 100, 53, 50,
> 51, 99, 50, 54, 51], value={MessageId:
> b72383f9-a4d8-4cd2-a171-09d0d523c263,ReceiptHandle: AQEBj=,MD5OfBody:
> 38db8cbd101e4c1cfbf47e31c2aaab75,Body: {"test-key": "test-value"},Attributes:
> {SentTimestamp=1641794775474},MessageAttributes:
> {requestTimeMsSinceEpoch={StringValue: 1641794824800,StringListValues:
> [],BinaryListValues: [],}}}} mutated illegally, new value was
> ValueWithRecordId{id=[98, 55, 50, 51, 56, 51, 102, 57, 45, 97, 52, 100, 56,
> 45, 52, 99, 100, 50, 45, 97, 49, 55, 49, 45, 48, 57, 100, 48, 100, 53, 50,
> 51, 99, 50, 54, 51], value={MessageId:
> b72383f9-a4d8-4cd2-a171-09d0d523c263,ReceiptHandle: AQE==,MD5OfBody:
> 38db8cbd101e4c1cfbf47e31c2aaab75,Body: {"test-key": "test-value"},Attributes:
> {SentTimestamp=1641794775474},MessageAttributes:
> {requestTimeMsSinceEpoch={StringValue: 1641794824800,StringListValues:
> [],BinaryListValues: [],}}}}. Encoding was rO2Mw.
> {code}
> https://stackoverflow.com/questions/70648489/apache-beam-2-34-0-sqsio-illegal-mutation-exception

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
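
For readers unfamiliar with the term, here is a minimal JDK-only sketch of what a deterministic encoding means in this context: values that compare as equal always produce identical bytes. SimpleMessage and DeterministicMessageEncoding are hypothetical names used only for illustration. They are not Beam or AWS SDK classes, and this is not necessarily how the issue was fixed in 2.37.0.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for an SQS message (assumption: not the AWS SDK class).
final class SimpleMessage {
  final String messageId;
  final String body;
  final Map<String, String> attributes;

  SimpleMessage(String messageId, String body, Map<String, String> attributes) {
    this.messageId = messageId;
    this.body = body;
    this.attributes = attributes;
  }
}

// Hypothetical encoder: fields are written in a fixed order and map entries
// are sorted by key, so the bytes do not depend on map iteration order.
final class DeterministicMessageEncoding {

  static byte[] encode(SimpleMessage m) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(bytes)) {
      writeString(out, m.messageId);
      writeString(out, m.body);
      // TreeMap gives a stable, key-sorted iteration order.
      Map<String, String> sorted = new TreeMap<>(m.attributes);
      out.writeInt(sorted.size());
      for (Map.Entry<String, String> e : sorted.entrySet()) {
        writeString(out, e.getKey());
        writeString(out, e.getValue());
      }
    } catch (IOException e) {
      // Not expected for an in-memory stream; rethrown unchecked for brevity.
      throw new UncheckedIOException(e);
    }
    return bytes.toByteArray();
  }

  // Length-prefixed UTF-8, so string size is not capped at 64 KB.
  private static void writeString(DataOutputStream out, String s) throws IOException {
    byte[] utf8 = s.getBytes(StandardCharsets.UTF_8);
    out.writeInt(utf8.length);
    out.write(utf8);
  }

  public static void main(String[] args) {
    // Same attributes, inserted in different orders into different map types.
    Map<String, String> a = new HashMap<>();
    a.put("SentTimestamp", "1641794775474");
    a.put("ApproximateReceiveCount", "1");
    Map<String, String> b = new LinkedHashMap<>();
    b.put("ApproximateReceiveCount", "1");
    b.put("SentTimestamp", "1641794775474");

    byte[] first = encode(new SimpleMessage("id-1", "{\"test-key\": \"test-value\"}", a));
    byte[] second = encode(new SimpleMessage("id-1", "{\"test-key\": \"test-value\"}", b));
    System.out.println(Arrays.equals(first, second)); // prints: true
  }
}
```

In a Beam pipeline the same idea would live in a custom Coder for the message type rather than in a standalone class. That wiring is left out here because it depends on the SDK version in use.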