This is an automated email from the ASF dual-hosted git repository.

yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new e0a36a28c11 Remove reuse of GenericRecord instance when reading Avro from BigQuery (#25320)
e0a36a28c11 is described below

commit e0a36a28c11e9d5d9c410e35386c7e6223cf2f42
Author: Bruno Volpato <[email protected]>
AuthorDate: Thu Feb 9 09:46:51 2023 -0500

    Remove reuse of GenericRecord instance when reading Avro from BigQuery (#25320)
---
 .../io/gcp/bigquery/BigQueryStorageAvroReader.java | 10 ++---
 .../io/gcp/bigquery/BigQueryIOStorageReadTest.java | 50 ++++++++++++++++++++++
 2 files changed, 54 insertions(+), 6 deletions(-)

diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageAvroReader.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageAvroReader.java
index 486a5dac638..50ce6a89f7a 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageAvroReader.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageAvroReader.java
@@ -35,7 +35,6 @@ class BigQueryStorageAvroReader implements BigQueryStorageReader {
   private final Schema avroSchema;
   private final DatumReader<GenericRecord> datumReader;
   private @Nullable BinaryDecoder decoder;
-  private @Nullable GenericRecord record;
   private long rowCount;
 
   BigQueryStorageAvroReader(ReadSession readSession) {
@@ -43,7 +42,6 @@ class BigQueryStorageAvroReader implements BigQueryStorageReader {
     this.datumReader = new GenericDatumReader<>(avroSchema);
     this.rowCount = 0;
     decoder = null;
-    record = null;
   }
 
   @Override
@@ -68,11 +66,11 @@ class BigQueryStorageAvroReader implements BigQueryStorageReader {
   public GenericRecord readSingleRecord() throws IOException {
     Preconditions.checkStateNotNull(decoder);
     @SuppressWarnings({
-      "nullness" // reused record can be null but avro not annotated
+      "nullness" // reused record is null but avro not annotated
     })
-    GenericRecord newRecord = datumReader.read(record, decoder);
-    record = newRecord;
-    return record;
+    // record should not be reused, mutating outputted values is unsafe
+    GenericRecord newRecord = datumReader.read(/*reuse=*/ null, decoder);
+    return newRecord;
   }
 
   @Override
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadTest.java
index 518c4a80cdb..d5dcee095c6 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadTest.java
@@ -83,6 +83,8 @@ import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.io.Encoder;
 import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.util.Utf8;
+import org.apache.beam.sdk.coders.AvroCoder;
 import org.apache.beam.sdk.coders.CoderRegistry;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.extensions.protobuf.ByteStringCoder;
@@ -2206,6 +2208,54 @@ public class BigQueryIOStorageReadTest {
                     new TupleTag<>("output"), FieldAccessDescriptor.withFieldNames("foo"))));
   }
 
+  @Test
+  public void testReadFromBigQueryAvroObjectsMutation() throws Exception {
+    ReadSession readSession =
+        ReadSession.newBuilder()
+            .setName("readSession")
+            .setAvroSchema(AvroSchema.newBuilder().setSchema(AVRO_SCHEMA_STRING))
+            .build();
+
+    ReadRowsRequest expectedRequest =
+        ReadRowsRequest.newBuilder().setReadStream("readStream").build();
+
+    List<GenericRecord> records =
+        Lists.newArrayList(createRecord("A", 1, AVRO_SCHEMA), createRecord("B", 2, AVRO_SCHEMA));
+
+    List<ReadRowsResponse> responses =
+        Lists.newArrayList(
+            createResponse(AVRO_SCHEMA, records.subList(0, 1), 0.0, 0.5),
+            createResponse(AVRO_SCHEMA, records.subList(1, 2), 0.5, 1.0));
+
+    StorageClient fakeStorageClient = mock(StorageClient.class);
+    when(fakeStorageClient.readRows(expectedRequest, ""))
+        .thenReturn(new FakeBigQueryServerStream<>(responses));
+
+    BigQueryStorageStreamSource<GenericRecord> streamSource =
+        BigQueryStorageStreamSource.create(
+            readSession,
+            ReadStream.newBuilder().setName("readStream").build(),
+            TABLE_SCHEMA,
+            SchemaAndRecord::getRecord,
+            AvroCoder.of(AVRO_SCHEMA),
+            new FakeBigQueryServices().withStorageClient(fakeStorageClient));
+
+    BoundedReader<GenericRecord> reader = streamSource.createReader(options);
+
+    // Reads A.
+    assertTrue(reader.start());
+    GenericRecord rowA = reader.getCurrent();
+    assertEquals(new Utf8("A"), rowA.get("name"));
+
+    // Reads B.
+    assertTrue(reader.advance());
+    GenericRecord rowB = reader.getCurrent();
+    assertEquals(new Utf8("B"), rowB.get("name"));
+
+    // Make sure rowA has not mutated after advance
+    assertEquals(new Utf8("A"), rowA.get("name"));
+  }
+
   private static org.apache.arrow.vector.types.pojo.Field field(
       String name,
       boolean nullable,

Reply via email to