[
https://issues.apache.org/jira/browse/BEAM-8029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16937108#comment-16937108
]
Jason Bowman commented on BEAM-8029:
------------------------------------
Serializing the GenericRecord to binary Avro, encoding it with the ByteArrayCoder,
and deserializing it again in the next pipeline stage prevents the mutation.
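A rough, JDK-only sketch of why that workaround helps. The `Reading` class and the `encode`/`decode` helpers are hypothetical stand-ins for the GenericRecord, binary Avro and ByteArrayCoder (they are not Avro or Beam APIs). The only assumption is that the reader keeps overwriting one object, and that encoding at output time takes an independent snapshot of it.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

public class SerializeWorkaround {
    // Hypothetical mutable record standing in for a reused GenericRecord.
    static final class Reading {
        String deviceId;
        long sampleTime;
    }

    // Hypothetical stand-in for binary Avro encoding: copies the fields into a fresh byte[].
    static byte[] encode(Reading r) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeUTF(r.deviceId);
            out.writeLong(r.sampleTime);
            out.flush();
            return bytes.toByteArray(); // toByteArray() always returns a new array
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Downstream stage: decodes into a brand-new Reading that no reader can touch.
    static Reading decode(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            Reading r = new Reading();
            r.deviceId = in.readUTF();
            r.sampleTime = in.readLong();
            return r;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Simulates a reader that reuses one Reading for every row but emits encoded bytes.
    static List<byte[]> readAll(long[] sampleTimes) {
        Reading reused = new Reading();
        List<byte[]> emitted = new ArrayList<>();
        for (long t : sampleTimes) {
            reused.deviceId = "rpi-rpi0-thermostat";
            reused.sampleTime = t;
            emitted.add(encode(reused)); // snapshot taken here; later rows cannot change it
        }
        return emitted;
    }

    public static void main(String[] args) {
        for (byte[] b : readAll(new long[] {1L, 2L, 3L})) {
            System.out.println(decode(b).sampleTime);
        }
    }
}
```

Running `main` prints 1, 2 and 3 in order: each byte array was produced before the next row overwrote the shared object, so the emitted values stay independent of it.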
The initial report shows fields being overwritten. We see the same, and we also
see byte array fields being partially overwritten: for example, a JSON field will
turn into {"a": 1}b": 2}, with the rest of the previous value left in the array.
This seems to point to a reader corruption/reuse issue.
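The byte-array symptom is consistent with a decode buffer that is reused across rows and only ever grown. The sketch below is a hypothetical JDK-only model of that pattern (`ReusingReader` is illustrative, not Avro's decoder). A consumer that keeps a reference to the shared array after a shorter value has been written sees the new bytes followed by the stale tail of the old ones.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class ReusedBufferDemo {
    // Hypothetical reader that decodes every row's bytes field into one shared buffer.
    static final class ReusingReader {
        private byte[] buffer = new byte[0];
        private int length;

        // Copies the next value into the shared buffer, growing it only when needed.
        void readNext(String value) {
            byte[] src = value.getBytes(StandardCharsets.UTF_8);
            if (buffer.length < src.length) {
                buffer = new byte[src.length];
            }
            System.arraycopy(src, 0, buffer, 0, src.length);
            length = src.length;
        }

        // Unsafe: exposes the shared array, including stale bytes beyond `length`.
        byte[] sharedBuffer() {
            return buffer;
        }

        // Safe: returns a fresh array holding only the current value's bytes.
        byte[] copyOfCurrent() {
            return Arrays.copyOf(buffer, length);
        }
    }

    public static void main(String[] args) {
        ReusingReader reader = new ReusingReader();
        reader.readNext("{\"a\": 1, \"b\": 2}");
        byte[] held = reader.sharedBuffer(); // a consumer keeps a reference to this row
        reader.readNext("{\"a\": 1}");       // next row, shorter value, same buffer
        System.out.println(new String(held, StandardCharsets.UTF_8));
        System.out.println(new String(reader.copyOfCurrent(), StandardCharsets.UTF_8));
    }
}
```

After the second row, the held array reads {"a": 1} "b": 2}, while copyOfCurrent() returns just {"a": 1}. Copying only the valid bytes is what prevents the partial overwrite from leaking downstream.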
Digging a bit deeper, I found:
[https://github.com/apache/beam/blob/ac45af909923e6d5e43f83087943ad71513b37e8/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageStreamSource.java#L251]
It seems the GenericRecord is reused explicitly: it is a member variable of the
stream source rather than something created for each row. The maintainers seem
to have assumed that you would never output the GenericRecord result itself.
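A simplified model of the reuse described above, assuming the source reader keeps a single mutable record as a member variable and overwrites it on each `advance()`. `Row` and `StreamReader` are hypothetical names. Copying the record before emitting it breaks the aliasing; for Avro, this could be something like `GenericData.get().deepCopy(schema, record)` in the parse function, though whether that fits a given pipeline is an assumption.

```java
import java.util.ArrayList;
import java.util.List;

public class RecordReuseDemo {
    // Hypothetical mutable row, standing in for a GenericRecord.
    static final class Row {
        long sampleTime;

        Row copy() {
            Row r = new Row();
            r.sampleTime = sampleTime;
            return r;
        }
    }

    // Hypothetical source reader: keeps one Row as a member variable and overwrites
    // it on every advance() instead of allocating a new one per row.
    static final class StreamReader {
        private final long[] data;
        private int index = -1;
        private final Row current = new Row();

        StreamReader(long[] data) {
            this.data = data;
        }

        boolean advance() {
            if (++index >= data.length) {
                return false;
            }
            current.sampleTime = data[index]; // mutates the Row returned for earlier rows
            return true;
        }

        Row getCurrent() {
            return current;
        }
    }

    // Emits whatever getCurrent() returns, optionally copying it first.
    static List<Row> readAll(long[] data, boolean defensiveCopy) {
        StreamReader reader = new StreamReader(data);
        List<Row> out = new ArrayList<>();
        while (reader.advance()) {
            Row row = reader.getCurrent();
            out.add(defensiveCopy ? row.copy() : row);
        }
        return out;
    }

    public static void main(String[] args) {
        long[] times = {100L, 200L, 300L};
        for (Row r : readAll(times, false)) System.out.print(r.sampleTime + " ");
        System.out.println();
        for (Row r : readAll(times, true)) System.out.print(r.sampleTime + " ");
        System.out.println();
    }
}
```

Without the copy, every element of the list is the same object and shows the last sample time (300). With the copy, the values are 100, 200 and 300.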
> Using BigQueryIO.read with DIRECT_READ causes Illegal Mutation
> ---------------------------------------------------------------
>
> Key: BEAM-8029
> URL: https://issues.apache.org/jira/browse/BEAM-8029
> Project: Beam
> Issue Type: Bug
> Components: io-java-gcp
> Affects Versions: 2.14.0
> Reporter: Chris Larsen
> Priority: Major
>
>
> Code to read from BigQuery that is causing the issue:
> {code:java}
> pipeline
> .apply(BigQueryIO
> .read(SchemaAndRecord::getRecord)
> .from(options.getTableRef())
> .withMethod(Method.DIRECT_READ)
> .withCoder(AvroCoder.of(schema)))
> {code}
> If we remove .withMethod(Method.DIRECT_READ) then there is no issue.
>
> The error is:
> {code:java}
> org.apache.beam.sdk.util.IllegalMutationException: PTransform
> BigQueryIO.TypedRead/Read(BigQueryStorageTableSource) mutated value
> {"device_id": "rpi-rpi0-thermostat", "temperature_c": 20.0, "temperature_f":
> 52.0, "sample_time": 1564412307969368, "humidity": 74.3} after it was output
> (new value was {"device_id": "rpi-rpi0-thermostat", "temperature_c": 20.0,
> "temperature_f": 52.0, "sample_time": 1564412360458615, "humidity": 74.7}).
> Values must not be mutated in any way after being output.
> at org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.commit (ImmutabilityCheckingBundleFactory.java:134)
> at org.apache.beam.runners.direct.EvaluationContext.commitBundles (EvaluationContext.java:210)
> at org.apache.beam.runners.direct.EvaluationContext.handleResult (EvaluationContext.java:151)
> at org.apache.beam.runners.direct.QuiescenceDriver$TimerIterableCompletionCallback.handleResult (QuiescenceDriver.java:262)
> at org.apache.beam.runners.direct.DirectTransformExecutor.finishBundle (DirectTransformExecutor.java:189)
> at org.apache.beam.runners.direct.DirectTransformExecutor.run (DirectTransformExecutor.java:126)
> at java.util.concurrent.Executors$RunnableAdapter.call (Executors.java:511)
> at java.util.concurrent.FutureTask.run (FutureTask.java:266)
> at java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1149)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run (ThreadPoolExecutor.java:624)
> at java.lang.Thread.run (Thread.java:748)
> Caused by: org.apache.beam.sdk.util.IllegalMutationException: Value
> {"device_id": "rpi-rpi0-thermostat", "temperature_c": 20.0, "temperature_f":
> 52.0, "sample_time": 1564412307969368, "humidity": 74.3} mutated illegally,
> new value was {"device_id": "rpi-rpi0-thermostat", "temperature_c": 20.0,
> "temperature_f": 52.0, "sample_time": 1564412360458615, "humidity": 74.7}.
> Encoding was
> AiZycGktcnBpMC10aGVybW9zdGF0AgAAAAAAADRAAgAAAAAAAEpAArDVsP7jtMcFAjMzMzMzk1JA,
> now
> AiZycGktcnBpMC10aGVybW9zdGF0AgAAAAAAADRAAgAAAAAAAEpAAu6FuLDktMcFAs3MzMzMrFJA.
> at org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.illegalMutation (MutationDetectors.java:153)
> at org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.verifyUnmodifiedThrowingCheckedExceptions (MutationDetectors.java:148)
> at org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.verifyUnmodified (MutationDetectors.java:123)
> at org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.commit (ImmutabilityCheckingBundleFactory.java:124)
> at org.apache.beam.runners.direct.EvaluationContext.commitBundles (EvaluationContext.java:210)
> at org.apache.beam.runners.direct.EvaluationContext.handleResult (EvaluationContext.java:151)
> at org.apache.beam.runners.direct.QuiescenceDriver$TimerIterableCompletionCallback.handleResult (QuiescenceDriver.java:262)
> at org.apache.beam.runners.direct.DirectTransformExecutor.finishBundle (DirectTransformExecutor.java:189)
> at org.apache.beam.runners.direct.DirectTransformExecutor.run (DirectTransformExecutor.java:126)
> at java.util.concurrent.Executors$RunnableAdapter.call (Executors.java:511)
> at java.util.concurrent.FutureTask.run (FutureTask.java:266)
> at java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1149)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run (ThreadPoolExecutor.java:624)
> at java.lang.Thread.run (Thread.java:748){code}
>
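The DirectRunner failure quoted above reports two encodings of the same element ("Encoding was ... now ..."). Below is a minimal sketch of that style of check, assuming it works by encoding the element when it is output and again when the bundle is committed. This is a simplified illustration, not Beam's MutationDetectors implementation, and `MutationCheck` and its `Encoder` interface are hypothetical.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Base64;

public class MutationCheck<T> {
    // Hypothetical stand-in for a Beam Coder: turns a value into bytes.
    interface Encoder<V> {
        byte[] encode(V value);
    }

    private final T value;
    private final Encoder<T> encoder;
    private final byte[] encodingAtOutput;

    // Called when the element is output: snapshot its encoding.
    MutationCheck(T value, Encoder<T> encoder) {
        this.value = value;
        this.encoder = encoder;
        this.encodingAtOutput = encoder.encode(value);
    }

    // Called at commit time: re-encode the same object and compare byte for byte.
    void verifyUnmodified() {
        byte[] now = encoder.encode(value);
        if (!Arrays.equals(encodingAtOutput, now)) {
            Base64.Encoder b64 = Base64.getEncoder();
            throw new IllegalStateException("Value mutated illegally. Encoding was "
                + b64.encodeToString(encodingAtOutput) + ", now " + b64.encodeToString(now) + ".");
        }
    }

    public static void main(String[] args) {
        long[] sampleTime = {1564412307969368L}; // mutable holder for one field
        Encoder<long[]> enc = v -> ByteBuffer.allocate(8).putLong(v[0]).array();
        MutationCheck<long[]> check = new MutationCheck<>(sampleTime, enc);
        check.verifyUnmodified();                // passes: nothing has changed yet
        sampleTime[0] = 1564412360458615L;       // the reader reuses the object for the next row
        try {
            check.verifyUnmodified();
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Under this model, the error appears only when the same object is re-encoded after the reader has moved on. A runner that does not perform such a check would instead pass the overwritten values along, which matches the silently overwritten fields described in the comment.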
--
This message was sent by Atlassian Jira
(v8.3.4#803005)