This is an automated email from the ASF dual-hosted git repository.
yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 0c6ef7dd478 Pass BQ coder to underlying avro source (#28143)
0c6ef7dd478 is described below
commit 0c6ef7dd4788d13b3785d4e06d4552907c7200e3
Author: Michel Davit <[email protected]>
AuthorDate: Thu Aug 24 16:48:03 2023 +0200
Pass BQ coder to underlying avro source (#28143)
---
.../sdk/io/gcp/bigquery/BigQuerySourceBase.java | 28 ++++++++++------------
.../sdk/io/gcp/bigquery/BigQueryIOReadTest.java | 19 +++++++++++++++
.../gcp/bigquery/BigQueryIOStorageQueryTest.java | 13 ++++++++++
3 files changed, 45 insertions(+), 15 deletions(-)
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
index 640d7285424..96abde5dc35 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
@@ -17,6 +17,8 @@
*/
package org.apache.beam.sdk.io.gcp.bigquery;
+import static java.util.stream.Collectors.collectingAndThen;
+import static java.util.stream.Collectors.toList;
import static org.apache.beam.sdk.io.FileSystems.match;
import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.resolveTempLocation;
import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
@@ -29,6 +31,8 @@ import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableSchema;
import java.io.IOException;
import java.util.List;
+import java.util.stream.Stream;
+import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.extensions.avro.io.AvroSource;
import org.apache.beam.sdk.io.BoundedSource;
@@ -40,7 +44,6 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
-import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -242,24 +245,19 @@ abstract class BigQuerySourceBase<T> extends BoundedSource<T> {
AvroSource.DatumReaderFactory<T> factory = readerFactory.apply(schema);
- List<BoundedSource<T>> avroSources = Lists.newArrayList();
+ Stream<AvroSource<GenericRecord>> avroSources;
// If metadata is available, create AvroSources with said metadata in SINGLE_FILE_OR_SUBRANGE
// mode.
if (metadata != null) {
- for (MatchResult.Metadata file : metadata) {
- avroSources.add(
- (AvroSource<T>)
- AvroSource.from(file).withSchema(avroSchema).withDatumReaderFactory(factory));
- }
+ avroSources = metadata.stream().map(AvroSource::from);
} else {
- for (ResourceId file : files) {
- avroSources.add(
- (AvroSource<T>)
- AvroSource.from(file.toString())
- .withSchema(avroSchema)
- .withDatumReaderFactory(factory));
- }
+ avroSources = files.stream().map(ResourceId::toString).map(AvroSource::from);
}
- return ImmutableList.copyOf(avroSources);
+
+ return avroSources
+ .map(s -> s.withSchema(avroSchema))
+ .map(s -> (AvroSource<T>) s.withDatumReaderFactory(factory))
+ .map(s -> s.withCoder(coder))
+ .collect(collectingAndThen(toList(), ImmutableList::copyOf));
}
}
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOReadTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOReadTest.java
index ba254eb0973..bc75ba8bd9b 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOReadTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOReadTest.java
@@ -80,6 +80,7 @@ import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.MoreCollectors;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@@ -804,6 +805,12 @@ public class BigQueryIOReadTest implements Serializable {
// Simulate a repeated call to split(), like a Dataflow worker will sometimes do.
sources = bqSource.split(200, options);
assertEquals(2, sources.size());
+ assertEquals(
+ TableRowJsonCoder.of(),
+ sources.stream()
+ .map(BoundedSource<TableRow>::getOutputCoder)
+ .distinct()
+ .collect(MoreCollectors.onlyElement()));
// A repeated call to split() should not have caused a duplicate extract job.
assertEquals(1, fakeJobService.getNumExtractJobCalls());
@@ -992,6 +999,12 @@ public class BigQueryIOReadTest implements Serializable {
List<? extends BoundedSource<TableRow>> sources = bqSource.split(100, options);
assertEquals(2, sources.size());
+ assertEquals(
+ TableRowJsonCoder.of(),
+ sources.stream()
+ .map(BoundedSource<TableRow>::getOutputCoder)
+ .distinct()
+ .collect(MoreCollectors.onlyElement()));
}
/**
@@ -1058,6 +1071,12 @@ public class BigQueryIOReadTest implements Serializable {
List<? extends BoundedSource<TableRow>> sources = bqSource.split(100, options);
assertEquals(2, sources.size());
+ assertEquals(
+ TableRowJsonCoder.of(),
+ sources.stream()
+ .map(BoundedSource<TableRow>::getOutputCoder)
+ .distinct()
+ .collect(MoreCollectors.onlyElement()));
}
@Test
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageQueryTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageQueryTest.java
index f54bf8e2e16..af6dd505b91 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageQueryTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageQueryTest.java
@@ -82,6 +82,7 @@ import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.MoreCollectors;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@@ -426,6 +427,12 @@ public class BigQueryIOStorageQueryTest {
List<? extends BoundedSource<TableRow>> sources = querySource.split(bundleSize, options);
assertEquals(expectedStreamCount, sources.size());
+ assertEquals(
+ TableRowJsonCoder.of(),
+ sources.stream()
+ .map(BoundedSource<TableRow>::getOutputCoder)
+ .distinct()
+ .collect(MoreCollectors.onlyElement()));
}
/**
@@ -520,6 +527,12 @@ public class BigQueryIOStorageQueryTest {
List<? extends BoundedSource<TableRow>> sources = querySource.split(1024, options);
assertEquals(1024, sources.size());
+ assertEquals(
+ TableRowJsonCoder.of(),
+ sources.stream()
+ .map(BoundedSource<TableRow>::getOutputCoder)
+ .distinct()
+ .collect(MoreCollectors.onlyElement()));
}
private static final String AVRO_SCHEMA_STRING =