This is an automated email from the ASF dual-hosted git repository.
yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new e0a36a28c11 Remove reuse of GenericRecord instance when reading Avro
from BigQuery (#25320)
e0a36a28c11 is described below
commit e0a36a28c11e9d5d9c410e35386c7e6223cf2f42
Author: Bruno Volpato <[email protected]>
AuthorDate: Thu Feb 9 09:46:51 2023 -0500
Remove reuse of GenericRecord instance when reading Avro from BigQuery
(#25320)
---
.../io/gcp/bigquery/BigQueryStorageAvroReader.java | 10 ++---
.../io/gcp/bigquery/BigQueryIOStorageReadTest.java | 50 ++++++++++++++++++++++
2 files changed, 54 insertions(+), 6 deletions(-)
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageAvroReader.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageAvroReader.java
index 486a5dac638..50ce6a89f7a 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageAvroReader.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageAvroReader.java
@@ -35,7 +35,6 @@ class BigQueryStorageAvroReader implements
BigQueryStorageReader {
private final Schema avroSchema;
private final DatumReader<GenericRecord> datumReader;
private @Nullable BinaryDecoder decoder;
- private @Nullable GenericRecord record;
private long rowCount;
BigQueryStorageAvroReader(ReadSession readSession) {
@@ -43,7 +42,6 @@ class BigQueryStorageAvroReader implements
BigQueryStorageReader {
this.datumReader = new GenericDatumReader<>(avroSchema);
this.rowCount = 0;
decoder = null;
- record = null;
}
@Override
@@ -68,11 +66,11 @@ class BigQueryStorageAvroReader implements
BigQueryStorageReader {
public GenericRecord readSingleRecord() throws IOException {
Preconditions.checkStateNotNull(decoder);
@SuppressWarnings({
- "nullness" // reused record can be null but avro not annotated
+ "nullness" // reused record is null but avro not annotated
})
- GenericRecord newRecord = datumReader.read(record, decoder);
- record = newRecord;
- return record;
+ // record should not be reused, mutating outputted values is unsafe
+ GenericRecord newRecord = datumReader.read(/*reuse=*/ null, decoder);
+ return newRecord;
}
@Override
diff --git
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadTest.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadTest.java
index 518c4a80cdb..d5dcee095c6 100644
---
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadTest.java
+++
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadTest.java
@@ -83,6 +83,8 @@ import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.util.Utf8;
+import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.extensions.protobuf.ByteStringCoder;
@@ -2206,6 +2208,54 @@ public class BigQueryIOStorageReadTest {
new TupleTag<>("output"),
FieldAccessDescriptor.withFieldNames("foo"))));
}
+ @Test
+ public void testReadFromBigQueryAvroObjectsMutation() throws Exception {
+ ReadSession readSession =
+ ReadSession.newBuilder()
+ .setName("readSession")
+            .setAvroSchema(AvroSchema.newBuilder().setSchema(AVRO_SCHEMA_STRING))
+ .build();
+
+ ReadRowsRequest expectedRequest =
+ ReadRowsRequest.newBuilder().setReadStream("readStream").build();
+
+ List<GenericRecord> records =
+        Lists.newArrayList(createRecord("A", 1, AVRO_SCHEMA), createRecord("B", 2, AVRO_SCHEMA));
+
+ List<ReadRowsResponse> responses =
+ Lists.newArrayList(
+ createResponse(AVRO_SCHEMA, records.subList(0, 1), 0.0, 0.5),
+ createResponse(AVRO_SCHEMA, records.subList(1, 2), 0.5, 1.0));
+
+ StorageClient fakeStorageClient = mock(StorageClient.class);
+ when(fakeStorageClient.readRows(expectedRequest, ""))
+ .thenReturn(new FakeBigQueryServerStream<>(responses));
+
+ BigQueryStorageStreamSource<GenericRecord> streamSource =
+ BigQueryStorageStreamSource.create(
+ readSession,
+ ReadStream.newBuilder().setName("readStream").build(),
+ TABLE_SCHEMA,
+ SchemaAndRecord::getRecord,
+ AvroCoder.of(AVRO_SCHEMA),
+ new FakeBigQueryServices().withStorageClient(fakeStorageClient));
+
+ BoundedReader<GenericRecord> reader = streamSource.createReader(options);
+
+ // Reads A.
+ assertTrue(reader.start());
+ GenericRecord rowA = reader.getCurrent();
+ assertEquals(new Utf8("A"), rowA.get("name"));
+
+ // Reads B.
+ assertTrue(reader.advance());
+ GenericRecord rowB = reader.getCurrent();
+ assertEquals(new Utf8("B"), rowB.get("name"));
+
+ // Make sure rowA has not mutated after advance
+ assertEquals(new Utf8("A"), rowA.get("name"));
+ }
+
private static org.apache.arrow.vector.types.pojo.Field field(
String name,
boolean nullable,