This is an automated email from the ASF dual-hosted git repository.

yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 0c6ef7dd478 Pass BQ coder to underlying avro source (#28143)
0c6ef7dd478 is described below

commit 0c6ef7dd4788d13b3785d4e06d4552907c7200e3
Author: Michel Davit <[email protected]>
AuthorDate: Thu Aug 24 16:48:03 2023 +0200

    Pass BQ coder to underlying avro source (#28143)
---
 .../sdk/io/gcp/bigquery/BigQuerySourceBase.java    | 28 ++++++++++------------
 .../sdk/io/gcp/bigquery/BigQueryIOReadTest.java    | 19 +++++++++++++++
 .../gcp/bigquery/BigQueryIOStorageQueryTest.java   | 13 ++++++++++
 3 files changed, 45 insertions(+), 15 deletions(-)

diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
index 640d7285424..96abde5dc35 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
@@ -17,6 +17,8 @@
  */
 package org.apache.beam.sdk.io.gcp.bigquery;
 
+import static java.util.stream.Collectors.collectingAndThen;
+import static java.util.stream.Collectors.toList;
 import static org.apache.beam.sdk.io.FileSystems.match;
 import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.resolveTempLocation;
 import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
@@ -29,6 +31,8 @@ import com.google.api.services.bigquery.model.TableReference;
 import com.google.api.services.bigquery.model.TableSchema;
 import java.io.IOException;
 import java.util.List;
+import java.util.stream.Stream;
+import org.apache.avro.generic.GenericRecord;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.extensions.avro.io.AvroSource;
 import org.apache.beam.sdk.io.BoundedSource;
@@ -40,7 +44,6 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
-import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
 import org.checkerframework.checker.nullness.qual.Nullable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -242,24 +245,19 @@ abstract class BigQuerySourceBase<T> extends BoundedSource<T> {
 
     AvroSource.DatumReaderFactory<T> factory = readerFactory.apply(schema);
 
-    List<BoundedSource<T>> avroSources = Lists.newArrayList();
+    Stream<AvroSource<GenericRecord>> avroSources;
     // If metadata is available, create AvroSources with said metadata in SINGLE_FILE_OR_SUBRANGE
     // mode.
     if (metadata != null) {
-      for (MatchResult.Metadata file : metadata) {
-        avroSources.add(
-            (AvroSource<T>)
-                AvroSource.from(file).withSchema(avroSchema).withDatumReaderFactory(factory));
-      }
+      avroSources = metadata.stream().map(AvroSource::from);
     } else {
-      for (ResourceId file : files) {
-        avroSources.add(
-            (AvroSource<T>)
-                AvroSource.from(file.toString())
-                    .withSchema(avroSchema)
-                    .withDatumReaderFactory(factory));
-      }
+      avroSources = files.stream().map(ResourceId::toString).map(AvroSource::from);
     }
-    return ImmutableList.copyOf(avroSources);
+
+    return avroSources
+        .map(s -> s.withSchema(avroSchema))
+        .map(s -> (AvroSource<T>) s.withDatumReaderFactory(factory))
+        .map(s -> s.withCoder(coder))
+        .collect(collectingAndThen(toList(), ImmutableList::copyOf));
   }
 }
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOReadTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOReadTest.java
index ba254eb0973..bc75ba8bd9b 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOReadTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOReadTest.java
@@ -80,6 +80,7 @@ import org.apache.beam.sdk.values.Row;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.MoreCollectors;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -804,6 +805,12 @@ public class BigQueryIOReadTest implements Serializable {
     // Simulate a repeated call to split(), like a Dataflow worker will sometimes do.
     sources = bqSource.split(200, options);
     assertEquals(2, sources.size());
+    assertEquals(
+        TableRowJsonCoder.of(),
+        sources.stream()
+            .map(BoundedSource<TableRow>::getOutputCoder)
+            .distinct()
+            .collect(MoreCollectors.onlyElement()));
 
     // A repeated call to split() should not have caused a duplicate extract job.
     assertEquals(1, fakeJobService.getNumExtractJobCalls());
@@ -992,6 +999,12 @@ public class BigQueryIOReadTest implements Serializable {
 
     List<? extends BoundedSource<TableRow>> sources = bqSource.split(100, options);
     assertEquals(2, sources.size());
+    assertEquals(
+        TableRowJsonCoder.of(),
+        sources.stream()
+            .map(BoundedSource<TableRow>::getOutputCoder)
+            .distinct()
+            .collect(MoreCollectors.onlyElement()));
   }
 
   /**
@@ -1058,6 +1071,12 @@ public class BigQueryIOReadTest implements Serializable {
 
     List<? extends BoundedSource<TableRow>> sources = bqSource.split(100, options);
     assertEquals(2, sources.size());
+    assertEquals(
+        TableRowJsonCoder.of(),
+        sources.stream()
+            .map(BoundedSource<TableRow>::getOutputCoder)
+            .distinct()
+            .collect(MoreCollectors.onlyElement()));
   }
 
   @Test
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageQueryTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageQueryTest.java
index f54bf8e2e16..af6dd505b91 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageQueryTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageQueryTest.java
@@ -82,6 +82,7 @@ import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.MoreCollectors;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -426,6 +427,12 @@ public class BigQueryIOStorageQueryTest {
 
     List<? extends BoundedSource<TableRow>> sources = querySource.split(bundleSize, options);
     assertEquals(expectedStreamCount, sources.size());
+    assertEquals(
+        TableRowJsonCoder.of(),
+        sources.stream()
+            .map(BoundedSource<TableRow>::getOutputCoder)
+            .distinct()
+            .collect(MoreCollectors.onlyElement()));
   }
 
   /**
@@ -520,6 +527,12 @@ public class BigQueryIOStorageQueryTest {
 
     List<? extends BoundedSource<TableRow>> sources = querySource.split(1024, options);
     assertEquals(1024, sources.size());
+    assertEquals(
+        TableRowJsonCoder.of(),
+        sources.stream()
+            .map(BoundedSource<TableRow>::getOutputCoder)
+            .distinct()
+            .collect(MoreCollectors.onlyElement()));
   }
 
   private static final String AVRO_SCHEMA_STRING =

Reply via email to