Moritz Mack created BEAM-13631:
----------------------------------
Summary: SQS read IO requires deterministic coder for SQS message
to work in batch mode mode.
Key: BEAM-13631
URL: https://issues.apache.org/jira/browse/BEAM-13631
Project: Beam
Issue Type: Bug
Components: io-java-aws
Reporter: Moritz Mack
Currently the SQS read IO uses SerializableCoder.of(Message.class), which isn't
deterministic. This may cause issues when used in batch mode (based on
BoundedReadFromUnboundedSource). The mutation detector will throw in such case:
{code:java}
Jan 10, 2022 11:37:05 AM
org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector
verifyUnmodifiedThrowingCheckedExceptions
WARNING: Coder of type class org.apache.beam.sdk.coders.SerializableCoder has a
#structuralValue method which does not return true when the encoding of the
elements is equal. Element
Shard{source=org.apache.beam.sdk.io.aws.sqs.SqsUnboundedSource@5f19451c,
maxNumRecords=1, maxReadTime=null}
Coder of type class org.apache.beam.sdk.coders.SerializableCoder has a
#structuralValue method which does not return true when the encoding of the
elements is equal. Element
Shard{source=org.apache.beam.sdk.io.aws.sqs.SqsUnboundedSource@5f19451c,
maxNumRecords=1, maxReadTime=null}
Exception in thread "main" org.apache.beam.sdk.util.IllegalMutationException:
PTransform SqsIO.Read/Read(SqsUnboundedSource)/Read/ParMultiDo(Read) mutated
value ValueWithRecordId{id=[98, 55, 50, 51, 56, 51, 102, 57, 45, 97, 52, 100,
56, 45, 52, 99, 100, 50, 45, 97, 49, 55, 49, 45, 48, 57, 100, 48, 100, 53, 50,
51, 99, 50, 54, 51], value={MessageId:
b72383f9-a4d8-4cd2-a171-09d0d523c263,ReceiptHandle: AQEBj2FXnTVQ==,MD5OfBody:
38db8cbd101e4c1cfbf47e31c2aaab75,Body: {"test-key": "test-value"},Attributes:
{SentTimestamp=1641794775474},MessageAttributes:
{requestTimeMsSinceEpoch={StringValue: 1641794824800,StringListValues:
[],BinaryListValues: [],}}}} after it was output (new value was
ValueWithRecordId{id=[98, 55, 50, 51, 56, 51, 102, 57, 45, 97, 52, 100, 56, 45,
52, 99, 100, 50, 45, 97, 49, 55, 49, 45, 48, 57, 100, 48, 100, 53, 50, 51, 99,
50, 54, 51], value={MessageId:
b72383f9-a4d8-4cd2-a171-09d0d523c263,ReceiptHandle:
DeVRF8vQATm1f+rHIvR3eaejlRHksL1R7WE4zDT7lSwdIs9gJCYKXFXnTVQ==,MD5OfBody:
38db8cbd101e4c1cfbf47e31c2aaab75,Body: {"test-key": "test-value"},Attributes:
{SentTimestamp=1641794775474},MessageAttributes:
{requestTimeMsSinceEpoch={StringValue: 1641794824800,StringListValues:
[],BinaryListValues: [],}}}}). Values must not be mutated in any way after
being output.
at
org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.commit(ImmutabilityCheckingBundleFactory.java:137)
at
org.apache.beam.runners.direct.EvaluationContext.commitBundles(EvaluationContext.java:231)
at
org.apache.beam.runners.direct.EvaluationContext.handleResult(EvaluationContext.java:163)
at
org.apache.beam.runners.direct.QuiescenceDriver$TimerIterableCompletionCallback.handleResult(QuiescenceDriver.java:292)
at
org.apache.beam.runners.direct.DirectTransformExecutor.finishBundle(DirectTransformExecutor.java:194)
at
org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:131)
at
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.beam.sdk.util.IllegalMutationException: Value
ValueWithRecordId{id=[98, 55, 50, 51, 56, 51, 102, 57, 45, 97, 52, 100, 56, 45,
52, 99, 100, 50, 45, 97, 49, 55, 49, 45, 48, 57, 100, 48, 100, 53, 50, 51, 99,
50, 54, 51], value={MessageId:
b72383f9-a4d8-4cd2-a171-09d0d523c263,ReceiptHandle: AQEBj2KQ==,MD5OfBody:
38db8cbd101e4c1cfbf47e31c2aaab75,Body: {"test-key": "test-value"},Attributes:
{SentTimestamp=1641794775474},MessageAttributes:
{requestTimeMsSinceEpoch={StringValue: 1641794824800,StringListValues:
[],BinaryListValues: [],}}}} mutated illegally, new value was
ValueWithRecordId{id=[98, 55, 50, 51, 56, 51, 102, 57, 45, 97, 52, 100, 56, 45,
52, 99, 100, 50, 45, 97, 49, 55, 49, 45, 48, 57, 100, 48, 100, 53, 50, 51, 99,
50, 54, 51], value={MessageId:
b72383f9-a4d8-4cd2-a171-09d0d523c263,ReceiptHandle: AQ==,MD5OfBody:
38db8cbd101e4c1cfbf47e31c2aaab75,Body: {"test-key": "test-value"},Attributes:
{SentTimestamp=1641794775474},MessageAttributes:
{requestTimeMsSinceEpoch={StringValue: 1641794824800,StringListValues:
[],BinaryListValues: [],}}}}. Encoding was rO.
at
org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.illegalMutation(MutationDetectors.java:158)
at
org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.verifyUnmodifiedThrowingCheckedExceptions(MutationDetectors.java:153)
at
org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.verifyUnmodified(MutationDetectors.java:128)
at
org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.commit(ImmutabilityCheckingBundleFactory.java:127)
... 10 more
Caused by: org.apache.beam.sdk.util.IllegalMutationException: Value
ValueWithRecordId{id=[98, 55, 50, 51, 56, 51, 102, 57, 45, 97, 52, 100, 56, 45,
52, 99, 100, 50, 45, 97, 49, 55, 49, 45, 48, 57, 100, 48, 100, 53, 50, 51, 99,
50, 54, 51], value={MessageId:
b72383f9-a4d8-4cd2-a171-09d0d523c263,ReceiptHandle: AQEBj=,MD5OfBody:
38db8cbd101e4c1cfbf47e31c2aaab75,Body: {"test-key": "test-value"},Attributes:
{SentTimestamp=1641794775474},MessageAttributes:
{requestTimeMsSinceEpoch={StringValue: 1641794824800,StringListValues:
[],BinaryListValues: [],}}}} mutated illegally, new value was
ValueWithRecordId{id=[98, 55, 50, 51, 56, 51, 102, 57, 45, 97, 52, 100, 56, 45,
52, 99, 100, 50, 45, 97, 49, 55, 49, 45, 48, 57, 100, 48, 100, 53, 50, 51, 99,
50, 54, 51], value={MessageId:
b72383f9-a4d8-4cd2-a171-09d0d523c263,ReceiptHandle: AQE==,MD5OfBody:
38db8cbd101e4c1cfbf47e31c2aaab75,Body: {"test-key": "test-value"},Attributes:
{SentTimestamp=1641794775474},MessageAttributes:
{requestTimeMsSinceEpoch={StringValue: 1641794824800,StringListValues:
[],BinaryListValues: [],}}}}. Encoding was rO2Mw.
{code}
https://stackoverflow.com/questions/70648489/apache-beam-2-34-0-sqsio-illegal-mutation-exception
--
This message was sent by Atlassian Jira
(v8.20.1#820001)