[ 
https://issues.apache.org/jira/browse/BEAM-13631?focusedWorklogId=706923&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-706923
 ]

ASF GitHub Bot logged work on BEAM-13631:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 11/Jan/22 15:06
            Start Date: 11/Jan/22 15:06
    Worklog Time Spent: 10m 
      Work Description: mosche commented on pull request #16480:
URL: https://github.com/apache/beam/pull/16480#issuecomment-1010053712


   R: @aromanenko-dev 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 706923)
    Time Spent: 20m  (was: 10m)

> SQS read IO requires deterministic coder for SQS message to work in batch 
> mode.
> ------------------------------------------------------------------------------------
>
>                 Key: BEAM-13631
>                 URL: https://issues.apache.org/jira/browse/BEAM-13631
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-aws
>            Reporter: Moritz Mack
>            Assignee: Moritz Mack
>            Priority: P2
>              Labels: SQS, aws-sdk-v1
>          Time Spent: 20m
>  Remaining Estimate: 0h
>
> Currently the SQS read IO uses SerializableCoder.of(Message.class), which 
> isn't deterministic. This may cause issues when used in batch mode (based on 
> BoundedReadFromUnboundedSource). The mutation detector will throw in such a 
> case:
> {code:java}
> Jan 10, 2022 11:37:05 AM 
> org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector 
> verifyUnmodifiedThrowingCheckedExceptions
> WARNING: Coder of type class org.apache.beam.sdk.coders.SerializableCoder has 
> a #structuralValue method which does not return true when the encoding of the 
> elements is equal. Element 
> Shard{source=org.apache.beam.sdk.io.aws.sqs.SqsUnboundedSource@5f19451c, 
> maxNumRecords=1, maxReadTime=null}
> Coder of type class org.apache.beam.sdk.coders.SerializableCoder has a 
> #structuralValue method which does not return true when the encoding of the 
> elements is equal. Element 
> Shard{source=org.apache.beam.sdk.io.aws.sqs.SqsUnboundedSource@5f19451c, 
> maxNumRecords=1, maxReadTime=null}
> Exception in thread "main" org.apache.beam.sdk.util.IllegalMutationException: 
> PTransform SqsIO.Read/Read(SqsUnboundedSource)/Read/ParMultiDo(Read) mutated 
> value ValueWithRecordId{id=[98, 55, 50, 51, 56, 51, 102, 57, 45, 97, 52, 100, 
> 56, 45, 52, 99, 100, 50, 45, 97, 49, 55, 49, 45, 48, 57, 100, 48, 100, 53, 
> 50, 51, 99, 50, 54, 51], value={MessageId: 
> b72383f9-a4d8-4cd2-a171-09d0d523c263,ReceiptHandle: AQEBj2FXnTVQ==,MD5OfBody: 
> 38db8cbd101e4c1cfbf47e31c2aaab75,Body: {"test-key": "test-value"},Attributes: 
> {SentTimestamp=1641794775474},MessageAttributes: 
> {requestTimeMsSinceEpoch={StringValue: 1641794824800,StringListValues: 
> [],BinaryListValues: [],}}}} after it was output (new value was 
> ValueWithRecordId{id=[98, 55, 50, 51, 56, 51, 102, 57, 45, 97, 52, 100, 56, 
> 45, 52, 99, 100, 50, 45, 97, 49, 55, 49, 45, 48, 57, 100, 48, 100, 53, 50, 
> 51, 99, 50, 54, 51], value={MessageId: 
> b72383f9-a4d8-4cd2-a171-09d0d523c263,ReceiptHandle: 
> DeVRF8vQATm1f+rHIvR3eaejlRHksL1R7WE4zDT7lSwdIs9gJCYKXFXnTVQ==,MD5OfBody: 
> 38db8cbd101e4c1cfbf47e31c2aaab75,Body: {"test-key": "test-value"},Attributes: 
> {SentTimestamp=1641794775474},MessageAttributes: 
> {requestTimeMsSinceEpoch={StringValue: 1641794824800,StringListValues: 
> [],BinaryListValues: [],}}}}). Values must not be mutated in any way after 
> being output.
>     at 
> org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.commit(ImmutabilityCheckingBundleFactory.java:137)
>     at 
> org.apache.beam.runners.direct.EvaluationContext.commitBundles(EvaluationContext.java:231)
>     at 
> org.apache.beam.runners.direct.EvaluationContext.handleResult(EvaluationContext.java:163)
>     at 
> org.apache.beam.runners.direct.QuiescenceDriver$TimerIterableCompletionCallback.handleResult(QuiescenceDriver.java:292)
>     at 
> org.apache.beam.runners.direct.DirectTransformExecutor.finishBundle(DirectTransformExecutor.java:194)
>     at 
> org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:131)
>     at 
> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>     at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>     at 
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>     at 
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>     at java.base/java.lang.Thread.run(Thread.java:834)
> Caused by: org.apache.beam.sdk.util.IllegalMutationException: Value 
> ValueWithRecordId{id=[98, 55, 50, 51, 56, 51, 102, 57, 45, 97, 52, 100, 56, 
> 45, 52, 99, 100, 50, 45, 97, 49, 55, 49, 45, 48, 57, 100, 48, 100, 53, 50, 
> 51, 99, 50, 54, 51], value={MessageId: 
> b72383f9-a4d8-4cd2-a171-09d0d523c263,ReceiptHandle: AQEBj2KQ==,MD5OfBody: 
> 38db8cbd101e4c1cfbf47e31c2aaab75,Body: {"test-key": "test-value"},Attributes: 
> {SentTimestamp=1641794775474},MessageAttributes: 
> {requestTimeMsSinceEpoch={StringValue: 1641794824800,StringListValues: 
> [],BinaryListValues: [],}}}} mutated illegally, new value was 
> ValueWithRecordId{id=[98, 55, 50, 51, 56, 51, 102, 57, 45, 97, 52, 100, 56, 
> 45, 52, 99, 100, 50, 45, 97, 49, 55, 49, 45, 48, 57, 100, 48, 100, 53, 50, 
> 51, 99, 50, 54, 51], value={MessageId: 
> b72383f9-a4d8-4cd2-a171-09d0d523c263,ReceiptHandle: AQ==,MD5OfBody: 
> 38db8cbd101e4c1cfbf47e31c2aaab75,Body: {"test-key": "test-value"},Attributes: 
> {SentTimestamp=1641794775474},MessageAttributes: 
> {requestTimeMsSinceEpoch={StringValue: 1641794824800,StringListValues: 
> [],BinaryListValues: [],}}}}. Encoding was rO.
>     at 
> org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.illegalMutation(MutationDetectors.java:158)
>     at 
> org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.verifyUnmodifiedThrowingCheckedExceptions(MutationDetectors.java:153)
>     at 
> org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.verifyUnmodified(MutationDetectors.java:128)
>     at 
> org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.commit(ImmutabilityCheckingBundleFactory.java:127)
>     ... 10 more
> Caused by: org.apache.beam.sdk.util.IllegalMutationException: Value 
> ValueWithRecordId{id=[98, 55, 50, 51, 56, 51, 102, 57, 45, 97, 52, 100, 56, 
> 45, 52, 99, 100, 50, 45, 97, 49, 55, 49, 45, 48, 57, 100, 48, 100, 53, 50, 
> 51, 99, 50, 54, 51], value={MessageId: 
> b72383f9-a4d8-4cd2-a171-09d0d523c263,ReceiptHandle: AQEBj=,MD5OfBody: 
> 38db8cbd101e4c1cfbf47e31c2aaab75,Body: {"test-key": "test-value"},Attributes: 
> {SentTimestamp=1641794775474},MessageAttributes: 
> {requestTimeMsSinceEpoch={StringValue: 1641794824800,StringListValues: 
> [],BinaryListValues: [],}}}} mutated illegally, new value was 
> ValueWithRecordId{id=[98, 55, 50, 51, 56, 51, 102, 57, 45, 97, 52, 100, 56, 
> 45, 52, 99, 100, 50, 45, 97, 49, 55, 49, 45, 48, 57, 100, 48, 100, 53, 50, 
> 51, 99, 50, 54, 51], value={MessageId: 
> b72383f9-a4d8-4cd2-a171-09d0d523c263,ReceiptHandle: AQE==,MD5OfBody: 
> 38db8cbd101e4c1cfbf47e31c2aaab75,Body: {"test-key": "test-value"},Attributes: 
> {SentTimestamp=1641794775474},MessageAttributes: 
> {requestTimeMsSinceEpoch={StringValue: 1641794824800,StringListValues: 
> [],BinaryListValues: [],}}}}. Encoding was rO2Mw.
>  {code}
> https://stackoverflow.com/questions/70648489/apache-beam-2-34-0-sqsio-illegal-mutation-exception
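
As a rough illustration of why a coder backed by Java serialization may not be deterministic, the plain-Java sketch below serializes two equal maps and compares the bytes. It is not the fix from the pull request and does not use Beam or the AWS SDK. The class name, the helper methods, and the choice of a HashMap of attributes are assumptions made only for this example, and the sketch does not claim to reproduce the exact cause of the exception above.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration class; not part of Beam or the AWS SDK.
public class DeterministicEncodingSketch {

  // Encodes a value with plain Java serialization, the mechanism
  // SerializableCoder relies on.
  static byte[] javaSerialize(Object value) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(value);
    }
    return bytes.toByteArray();
  }

  // Assumed deterministic alternative: write the entry count, then the
  // entries in sorted key order, so equal maps always yield equal bytes.
  static byte[] encodeSorted(Map<String, String> attributes) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(bytes)) {
      out.writeInt(attributes.size());
      for (Map.Entry<String, String> e : new TreeMap<>(attributes).entrySet()) {
        out.writeUTF(e.getKey());
        out.writeUTF(e.getValue());
      }
    }
    return bytes.toByteArray();
  }

  public static void main(String[] args) throws IOException {
    // Two maps with equal contents but different internal capacity.
    Map<String, String> a = new HashMap<>(16);
    Map<String, String> b = new HashMap<>(64);
    a.put("SentTimestamp", "1641794775474");
    b.put("SentTimestamp", "1641794775474");

    System.out.println("equal values:            " + a.equals(b));
    System.out.println("same serialized bytes:   "
        + Arrays.equals(javaSerialize(a), javaSerialize(b)));
    System.out.println("same deterministic bytes: "
        + Arrays.equals(encodeSorted(a), encodeSorted(b)));
  }
}
```

HashMap writes its internal capacity and threshold into the serialized form, so the two maps compare equal while their serialized bytes differ. An encoding that writes only the logical content in a fixed order avoids that.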



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
