This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new b24e82c Merge pull request #8620: [BEAM-6673] Add schema support to BigQuery reads b24e82c is described below commit b24e82cc40e732ea6cff023650cc2b83cf14f32a Author: Charith Ellawala <chari...@users.noreply.github.com> AuthorDate: Fri Jun 7 21:41:52 2019 +0100 Merge pull request #8620: [BEAM-6673] Add schema support to BigQuery reads --- .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 91 ++++++++++--- .../sdk/io/gcp/bigquery/BigQueryQuerySource.java | 103 ++------------ ...uerySource.java => BigQueryQuerySourceDef.java} | 107 +++++++-------- .../bigquery/BigQuerySchemaRetrievalException.java | 25 ++++ .../sdk/io/gcp/bigquery/BigQuerySourceDef.java | 52 ++++++++ .../sdk/io/gcp/bigquery/BigQueryTableSource.java | 61 ++------- ...ableSource.java => BigQueryTableSourceDef.java} | 88 +++++------- .../beam/sdk/io/gcp/bigquery/BigQueryUtils.java | 131 +++++++++++++++++- .../sdk/io/gcp/bigquery/BigQueryIOReadTest.java | 148 +++++++++++++-------- .../sdk/io/gcp/bigquery/BigQueryUtilsTest.java | 109 +++++++++++++++ 10 files changed, 598 insertions(+), 317 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index a66c903..5e6e59f 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -83,6 +83,7 @@ import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider; import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; +import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.MapElements; @@ -100,6 +101,7 @@ import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollection.IsBounded; import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.Row; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.TupleTagList; import org.apache.beam.sdk.values.TypeDescriptors; @@ -405,6 +407,14 @@ public class BigQueryIO { return read(new TableRowParser()).withCoder(TableRowJsonCoder.of()); } + /** Like {@link #readTableRows()} but with {@link Schema} support. 
*/ + public static TypedRead<TableRow> readTableRowsWithSchema() { + return read(new TableRowParser()) + .withCoder(TableRowJsonCoder.of()) + .withBeamRowConverters( + BigQueryUtils.tableRowToBeamRow(), BigQueryUtils.tableRowFromBeamRow()); + } + /** * Reads from a BigQuery table or query and returns a {@link PCollection} with one element per * each row of the table or query result, parsed from the BigQuery AVRO format using the specified @@ -593,6 +603,12 @@ public class BigQueryIO { DIRECT_READ, } + interface ToBeamRowFunction<T> + extends SerializableFunction<Schema, SerializableFunction<T, Row>> {} + + interface FromBeamRowFunction<T> + extends SerializableFunction<Schema, SerializableFunction<Row, T>> {} + abstract Builder<T> toBuilder(); @AutoValue.Builder @@ -628,6 +644,12 @@ public class BigQueryIO { abstract Builder<T> setCoder(Coder<T> coder); abstract Builder<T> setKmsKey(String kmsKey); + + @Experimental(Experimental.Kind.SCHEMAS) + abstract Builder<T> setToBeamRowFn(ToBeamRowFunction<T> toRowFn); + + @Experimental(Experimental.Kind.SCHEMAS) + abstract Builder<T> setFromBeamRowFn(FromBeamRowFunction<T> fromRowFn); } @Nullable @@ -669,6 +691,14 @@ public class BigQueryIO { @Nullable abstract String getKmsKey(); + @Nullable + @Experimental(Experimental.Kind.SCHEMAS) + abstract ToBeamRowFunction<T> getToBeamRowFn(); + + @Nullable + @Experimental(Experimental.Kind.SCHEMAS) + abstract FromBeamRowFunction<T> getFromBeamRowFn(); + /** * An enumeration type for the priority of a query. * @@ -709,27 +739,22 @@ public class BigQueryIO { } } - private BigQuerySourceBase<T> createSource(String jobUuid, Coder<T> coder) { - BigQuerySourceBase<T> source; + private BigQuerySourceDef createSourceDef() { + BigQuerySourceDef sourceDef; if (getQuery() == null) { - source = - BigQueryTableSource.create( - jobUuid, getTableProvider(), getBigQueryServices(), coder, getParseFn()); + sourceDef = BigQueryTableSourceDef.create(getBigQueryServices(), getTableProvider()); } else { - source = - BigQueryQuerySource.create( - jobUuid, + sourceDef = + BigQueryQuerySourceDef.create( + getBigQueryServices(), getQuery(), getFlattenResults(), getUseLegacySql(), - getBigQueryServices(), - coder, - getParseFn(), MoreObjects.firstNonNull(getQueryPriority(), QueryPriority.BATCH), getQueryLocation(), getKmsKey()); } - return source; + return sourceDef; } private BigQueryStorageQuerySource<T> createStorageQuerySource( @@ -840,6 +865,12 @@ public class BigQueryIO { } checkArgument(getParseFn() != null, "A parseFn is required"); + // if both toRowFn and fromRowFn values are set, enable Beam schema support + boolean beamSchemaEnabled = false; + if (getToBeamRowFn() != null && getFromBeamRowFn() != null) { + beamSchemaEnabled = true; + } + Pipeline p = input.getPipeline(); final Coder<T> coder = inferCoder(p.getCoderRegistry()); @@ -852,6 +883,7 @@ public class BigQueryIO { "Invalid BigQueryIO.Read: Specifies table read options, " + "which only applies when using Method.DIRECT_READ"); + final BigQuerySourceDef sourceDef = createSourceDef(); final PCollectionView<String> jobIdTokenView; PCollection<String> jobIdTokenCollection; PCollection<T> rows; @@ -862,7 +894,10 @@ public class BigQueryIO { p.apply("TriggerIdCreation", Create.of(staticJobUuid)) .apply("ViewId", View.asSingleton()); // Apply the traditional Source model. 
- rows = p.apply(org.apache.beam.sdk.io.Read.from(createSource(staticJobUuid, coder))); + rows = + p.apply( + org.apache.beam.sdk.io.Read.from( + sourceDef.toSource(staticJobUuid, coder, getParseFn()))); } else { // Create a singleton job ID token at execution time. jobIdTokenCollection = @@ -888,7 +923,8 @@ public class BigQueryIO { @ProcessElement public void processElement(ProcessContext c) throws Exception { String jobUuid = c.element(); - BigQuerySourceBase<T> source = createSource(jobUuid, coder); + BigQuerySourceBase<T> source = + sourceDef.toSource(jobUuid, coder, getParseFn()); BigQueryOptions options = c.getPipelineOptions().as(BigQueryOptions.class); ExtractResult res = source.extractFiles(options); @@ -919,7 +955,8 @@ public class BigQueryIO { BigQueryHelpers.fromJsonString( c.sideInput(schemaView), TableSchema.class); String jobUuid = c.sideInput(jobIdTokenView); - BigQuerySourceBase<T> source = createSource(jobUuid, coder); + BigQuerySourceBase<T> source = + sourceDef.toSource(jobUuid, coder, getParseFn()); List<BoundedSource<T>> sources = source.createSources( ImmutableList.of( @@ -966,7 +1003,18 @@ public class BigQueryIO { } } }; - return rows.apply(new PassThroughThenCleanup<>(cleanupOperation, jobIdTokenView)); + + rows = rows.apply(new PassThroughThenCleanup<>(cleanupOperation, jobIdTokenView)); + + if (beamSchemaEnabled) { + BigQueryOptions bqOptions = p.getOptions().as(BigQueryOptions.class); + Schema beamSchema = sourceDef.getBeamSchema(bqOptions); + SerializableFunction<T, Row> toBeamRow = getToBeamRowFn().apply(beamSchema); + SerializableFunction<Row, T> fromBeamRow = getFromBeamRowFn().apply(beamSchema); + + rows.setSchema(beamSchema, toBeamRow, fromBeamRow); + } + return rows; } private PCollection<T> expandForDirectRead(PBegin input, Coder<T> outputCoder) { @@ -1201,6 +1249,17 @@ public class BigQueryIO { return toBuilder().setKmsKey(kmsKey).build(); } + /** + * Sets the functions to convert elements to/from {@link Row} objects. + * + * <p>Setting these conversion functions is necessary to enable {@link Schema} support. + */ + @Experimental(Experimental.Kind.SCHEMAS) + public TypedRead<T> withBeamRowConverters( + ToBeamRowFunction<T> toRowFn, FromBeamRowFunction<T> fromRowFn) { + return toBuilder().setToBeamRowFn(toRowFn).setFromBeamRowFn(fromRowFn).build(); + } + /** See {@link Read#from(String)}. 
*/ public TypedRead<T> from(String tableSpec) { return from(StaticValueProvider.of(tableSpec)); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java index 375cc4f..f2a70da 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java @@ -17,20 +17,10 @@ */ package org.apache.beam.sdk.io.gcp.bigquery; -import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.createJobIdToken; -import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.createTempTableReference; -import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkNotNull; - -import com.google.api.services.bigquery.model.JobStatistics; import com.google.api.services.bigquery.model.TableReference; import java.io.IOException; -import java.io.ObjectInputStream; -import java.util.concurrent.atomic.AtomicReference; import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead.QueryPriority; -import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService; import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting; @@ -45,115 +35,44 @@ class BigQueryQuerySource<T> extends BigQuerySourceBase<T> { static <T> BigQueryQuerySource<T> create( String stepUuid, - ValueProvider<String> query, - Boolean flattenResults, - Boolean useLegacySql, + BigQueryQuerySourceDef queryDef, BigQueryServices bqServices, Coder<T> coder, - SerializableFunction<SchemaAndRecord, T> parseFn, - QueryPriority priority, - String location, - String kmsKey) { - return new BigQueryQuerySource<>( - stepUuid, - query, - flattenResults, - useLegacySql, - bqServices, - coder, - parseFn, - priority, - location, - kmsKey); + SerializableFunction<SchemaAndRecord, T> parseFn) { + return new BigQueryQuerySource<>(stepUuid, queryDef, bqServices, coder, parseFn); } - private final ValueProvider<String> query; - private final Boolean flattenResults; - private final Boolean useLegacySql; - private final QueryPriority priority; - private final String location; - private final String kmsKey; - - private transient AtomicReference<JobStatistics> dryRunJobStats; + private final BigQueryQuerySourceDef queryDef; private BigQueryQuerySource( String stepUuid, - ValueProvider<String> query, - Boolean flattenResults, - Boolean useLegacySql, + BigQueryQuerySourceDef queryDef, BigQueryServices bqServices, Coder<T> coder, - SerializableFunction<SchemaAndRecord, T> parseFn, - QueryPriority priority, - String location, - String kmsKey) { + SerializableFunction<SchemaAndRecord, T> parseFn) { super(stepUuid, bqServices, coder, parseFn); - this.query = checkNotNull(query, "query"); - this.flattenResults = checkNotNull(flattenResults, "flattenResults"); - this.useLegacySql = checkNotNull(useLegacySql, "useLegacySql"); - this.priority = priority; - this.location = location; - this.kmsKey = kmsKey; - dryRunJobStats = new AtomicReference<>(); - } - - /** - * Since the query helper reference is declared as 
transient, neither the AtomicReference nor the - * structure it refers to are persisted across serialization boundaries. The code below is - * resilient to the QueryHelper object disappearing in between method calls, but the reference - * object must be recreated at deserialization time. - */ - private void readObject(ObjectInputStream in) throws ClassNotFoundException, IOException { - in.defaultReadObject(); - dryRunJobStats = new AtomicReference<>(); + this.queryDef = queryDef; } @Override public long getEstimatedSizeBytes(PipelineOptions options) throws Exception { - return BigQueryQueryHelper.dryRunQueryIfNeeded( - bqServices, - options.as(BigQueryOptions.class), - dryRunJobStats, - query.get(), - flattenResults, - useLegacySql, - location) - .getQuery() - .getTotalBytesProcessed(); + return queryDef.getEstimatedSizeBytes(options.as(BigQueryOptions.class)); } @Override protected TableReference getTableToExtract(BigQueryOptions bqOptions) throws IOException, InterruptedException { - return BigQueryQueryHelper.executeQuery( - bqServices, - bqOptions, - dryRunJobStats, - stepUuid, - query.get(), - flattenResults, - useLegacySql, - priority, - location, - kmsKey); + return queryDef.getTableReference(bqOptions, stepUuid); } @Override protected void cleanupTempResource(BigQueryOptions bqOptions) throws Exception { - TableReference tableToRemove = - createTempTableReference( - bqOptions.getProject(), createJobIdToken(bqOptions.getJobName(), stepUuid)); - - DatasetService tableService = bqServices.getDatasetService(bqOptions); - LOG.info("Deleting temporary table with query results {}", tableToRemove); - tableService.deleteTable(tableToRemove); - LOG.info("Deleting temporary dataset with query results {}", tableToRemove.getDatasetId()); - tableService.deleteDataset(tableToRemove.getProjectId(), tableToRemove.getDatasetId()); + queryDef.cleanupTempResource(bqOptions, stepUuid); } @Override public void populateDisplayData(DisplayData.Builder builder) { super.populateDisplayData(builder); - builder.add(DisplayData.item("query", query)); + builder.add(DisplayData.item("query", queryDef.getQuery())); } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySourceDef.java similarity index 67% copy from sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java copy to sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySourceDef.java index 375cc4f..1f2366f 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySourceDef.java @@ -23,74 +23,54 @@ import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Precondi import com.google.api.services.bigquery.model.JobStatistics; import com.google.api.services.bigquery.model.TableReference; +import com.google.api.services.bigquery.model.TableSchema; import java.io.IOException; import java.io.ObjectInputStream; import java.util.concurrent.atomic.AtomicReference; import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead.QueryPriority; -import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService; -import org.apache.beam.sdk.options.PipelineOptions; import 
org.apache.beam.sdk.options.ValueProvider; +import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.transforms.SerializableFunction; -import org.apache.beam.sdk.transforms.display.DisplayData; -import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -/** A {@link BigQuerySourceBase} for querying BigQuery tables. */ -@VisibleForTesting -class BigQueryQuerySource<T> extends BigQuerySourceBase<T> { - - private static final Logger LOG = LoggerFactory.getLogger(BigQueryQuerySource.class); - - static <T> BigQueryQuerySource<T> create( - String stepUuid, - ValueProvider<String> query, - Boolean flattenResults, - Boolean useLegacySql, - BigQueryServices bqServices, - Coder<T> coder, - SerializableFunction<SchemaAndRecord, T> parseFn, - QueryPriority priority, - String location, - String kmsKey) { - return new BigQueryQuerySource<>( - stepUuid, - query, - flattenResults, - useLegacySql, - bqServices, - coder, - parseFn, - priority, - location, - kmsKey); - } +class BigQueryQuerySourceDef implements BigQuerySourceDef { + private static final Logger LOG = LoggerFactory.getLogger(BigQueryQuerySourceDef.class); + private final BigQueryServices bqServices; private final ValueProvider<String> query; private final Boolean flattenResults; private final Boolean useLegacySql; - private final QueryPriority priority; + private final BigQueryIO.TypedRead.QueryPriority priority; private final String location; private final String kmsKey; private transient AtomicReference<JobStatistics> dryRunJobStats; - private BigQueryQuerySource( - String stepUuid, + static BigQueryQuerySourceDef create( + BigQueryServices bqServices, ValueProvider<String> query, Boolean flattenResults, Boolean useLegacySql, + BigQueryIO.TypedRead.QueryPriority priority, + String location, + String kmsKey) { + return new BigQueryQuerySourceDef( + bqServices, query, flattenResults, useLegacySql, priority, location, kmsKey); + } + + private BigQueryQuerySourceDef( BigQueryServices bqServices, - Coder<T> coder, - SerializableFunction<SchemaAndRecord, T> parseFn, - QueryPriority priority, + ValueProvider<String> query, + Boolean flattenResults, + Boolean useLegacySql, + BigQueryIO.TypedRead.QueryPriority priority, String location, String kmsKey) { - super(stepUuid, bqServices, coder, parseFn); this.query = checkNotNull(query, "query"); this.flattenResults = checkNotNull(flattenResults, "flattenResults"); this.useLegacySql = checkNotNull(useLegacySql, "useLegacySql"); + this.bqServices = bqServices; this.priority = priority; this.location = location; this.kmsKey = kmsKey; @@ -108,11 +88,10 @@ class BigQueryQuerySource<T> extends BigQuerySourceBase<T> { dryRunJobStats = new AtomicReference<>(); } - @Override - public long getEstimatedSizeBytes(PipelineOptions options) throws Exception { + long getEstimatedSizeBytes(BigQueryOptions bqOptions) throws Exception { return BigQueryQueryHelper.dryRunQueryIfNeeded( bqServices, - options.as(BigQueryOptions.class), + bqOptions, dryRunJobStats, query.get(), flattenResults, @@ -122,8 +101,7 @@ class BigQueryQuerySource<T> extends BigQuerySourceBase<T> { .getTotalBytesProcessed(); } - @Override - protected TableReference getTableToExtract(BigQueryOptions bqOptions) + TableReference getTableReference(BigQueryOptions bqOptions, String stepUuid) throws IOException, InterruptedException { return BigQueryQueryHelper.executeQuery( bqServices, @@ -138,22 +116,47 @@ class BigQueryQuerySource<T> extends 
BigQuerySourceBase<T> { kmsKey); } - @Override - protected void cleanupTempResource(BigQueryOptions bqOptions) throws Exception { + void cleanupTempResource(BigQueryOptions bqOptions, String stepUuid) throws Exception { TableReference tableToRemove = createTempTableReference( bqOptions.getProject(), createJobIdToken(bqOptions.getJobName(), stepUuid)); - DatasetService tableService = bqServices.getDatasetService(bqOptions); + BigQueryServices.DatasetService tableService = bqServices.getDatasetService(bqOptions); LOG.info("Deleting temporary table with query results {}", tableToRemove); tableService.deleteTable(tableToRemove); LOG.info("Deleting temporary dataset with query results {}", tableToRemove.getDatasetId()); tableService.deleteDataset(tableToRemove.getProjectId(), tableToRemove.getDatasetId()); } + /** {@inheritDoc} */ + @Override + public <T> BigQuerySourceBase<T> toSource( + String stepUuid, Coder<T> coder, SerializableFunction<SchemaAndRecord, T> parseFn) { + return BigQueryQuerySource.create(stepUuid, this, bqServices, coder, parseFn); + } + + /** {@inheritDoc} */ @Override - public void populateDisplayData(DisplayData.Builder builder) { - super.populateDisplayData(builder); - builder.add(DisplayData.item("query", query)); + public Schema getBeamSchema(BigQueryOptions bqOptions) { + try { + JobStatistics stats = + BigQueryQueryHelper.dryRunQueryIfNeeded( + bqServices, + bqOptions, + dryRunJobStats, + query.get(), + flattenResults, + useLegacySql, + location); + TableSchema tableSchema = stats.getQuery().getSchema(); + return BigQueryUtils.fromTableSchema(tableSchema); + } catch (IOException | InterruptedException | NullPointerException e) { + throw new BigQuerySchemaRetrievalException( + "Exception while trying to retrieve schema of query", e); + } + } + + ValueProvider<String> getQuery() { + return query; } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaRetrievalException.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaRetrievalException.java new file mode 100644 index 0000000..2736e56 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaRetrievalException.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.bigquery; + +/** Exception to signal that BigQuery schema retrieval failed. 
*/ +public class BigQuerySchemaRetrievalException extends RuntimeException { + BigQuerySchemaRetrievalException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceDef.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceDef.java new file mode 100644 index 0000000..0f3de1d --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceDef.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.bigquery; + +import java.io.Serializable; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.transforms.SerializableFunction; + +/** + * Represents a source used for {@link BigQueryIO#read(SerializableFunction)}. Currently this could + * be either a table or a query. Direct read sources are not yet supported. + */ +interface BigQuerySourceDef extends Serializable { + /** + * Convert this source definition into a concrete source implementation. + * + * @param stepUuid Job UUID + * @param coder Coder + * @param parseFn Parse function + * @param <T> Type of the resulting PCollection + * @return An implementation of {@link BigQuerySourceBase} + */ + <T> BigQuerySourceBase<T> toSource( + String stepUuid, Coder<T> coder, SerializableFunction<SchemaAndRecord, T> parseFn); + + /** + * Extract the Beam {@link Schema} corresponding to this source. 
+ * + * @param bqOptions BigQueryOptions + * @return Beam schema of the source + * @throws BigQuerySchemaRetrievalException if schema retrieval fails + */ + @Experimental(Experimental.Kind.SCHEMAS) + Schema getBeamSchema(BigQueryOptions bqOptions); +} diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java index f8ea5e1..4334f7e 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java @@ -17,22 +17,15 @@ */ package org.apache.beam.sdk.io.gcp.bigquery; -import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkNotNull; -import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkState; - import com.google.api.services.bigquery.model.Table; import com.google.api.services.bigquery.model.TableReference; import java.io.IOException; import java.util.concurrent.atomic.AtomicReference; import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TableRefToJson; import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.options.ValueProvider; -import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting; -import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Strings; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,69 +36,41 @@ class BigQueryTableSource<T> extends BigQuerySourceBase<T> { static <T> BigQueryTableSource<T> create( String stepUuid, - ValueProvider<TableReference> table, + BigQueryTableSourceDef tableDef, BigQueryServices bqServices, Coder<T> coder, SerializableFunction<SchemaAndRecord, T> parseFn) { - return new BigQueryTableSource<>(stepUuid, table, bqServices, coder, parseFn); + return new BigQueryTableSource<>(stepUuid, tableDef, bqServices, coder, parseFn); } - private final ValueProvider<String> jsonTable; + private final BigQueryTableSourceDef tableDef; private final AtomicReference<Long> tableSizeBytes; private BigQueryTableSource( String stepUuid, - ValueProvider<TableReference> table, + BigQueryTableSourceDef tableDef, BigQueryServices bqServices, Coder<T> coder, SerializableFunction<SchemaAndRecord, T> parseFn) { super(stepUuid, bqServices, coder, parseFn); - this.jsonTable = NestedValueProvider.of(checkNotNull(table, "table"), new TableRefToJson()); + this.tableDef = tableDef; this.tableSizeBytes = new AtomicReference<>(); } @Override protected TableReference getTableToExtract(BigQueryOptions bqOptions) throws IOException { - TableReference tableReference = - BigQueryIO.JSON_FACTORY.fromString(jsonTable.get(), TableReference.class); - return setDefaultProjectIfAbsent(bqOptions, tableReference); - } - - /** - * Sets the {@link TableReference#projectId} of the provided table reference to the id of the - * default project if the table reference does not have a project ID specified. 
- */ - private TableReference setDefaultProjectIfAbsent( - BigQueryOptions bqOptions, TableReference tableReference) { - if (Strings.isNullOrEmpty(tableReference.getProjectId())) { - checkState( - !Strings.isNullOrEmpty(bqOptions.getProject()), - "No project ID set in %s or %s, cannot construct a complete %s", - TableReference.class.getSimpleName(), - BigQueryOptions.class.getSimpleName(), - TableReference.class.getSimpleName()); - LOG.info( - "Project ID not set in {}. Using default project from {}.", - TableReference.class.getSimpleName(), - BigQueryOptions.class.getSimpleName()); - tableReference.setProjectId(bqOptions.getProject()); - } - return tableReference; + return tableDef.getTableReference(bqOptions); } @Override public synchronized long getEstimatedSizeBytes(PipelineOptions options) throws Exception { if (tableSizeBytes.get() == null) { - TableReference table = - setDefaultProjectIfAbsent( - options.as(BigQueryOptions.class), - BigQueryIO.JSON_FACTORY.fromString(jsonTable.get(), TableReference.class)); - - Table tableRef = - bqServices.getDatasetService(options.as(BigQueryOptions.class)).getTable(table); - Long numBytes = tableRef.getNumBytes(); - if (tableRef.getStreamingBuffer() != null) { - numBytes += tableRef.getStreamingBuffer().getEstimatedBytes().longValue(); + BigQueryOptions bqOptions = options.as(BigQueryOptions.class); + TableReference tableRef = tableDef.getTableReference(bqOptions); + Table table = bqServices.getDatasetService(bqOptions).getTable(tableRef); + Long numBytes = table.getNumBytes(); + if (table.getStreamingBuffer() != null) { + numBytes += table.getStreamingBuffer().getEstimatedBytes().longValue(); } tableSizeBytes.compareAndSet(null, numBytes); @@ -121,6 +86,6 @@ class BigQueryTableSource<T> extends BigQuerySourceBase<T> { @Override public void populateDisplayData(DisplayData.Builder builder) { super.populateDisplayData(builder); - builder.add(DisplayData.item("table", jsonTable)); + builder.add(DisplayData.item("table", tableDef.getJsonTable())); } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSourceDef.java similarity index 52% copy from sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java copy to sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSourceDef.java index f8ea5e1..07159af 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSourceDef.java @@ -20,52 +20,37 @@ package org.apache.beam.sdk.io.gcp.bigquery; import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkNotNull; import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkState; -import com.google.api.services.bigquery.model.Table; import com.google.api.services.bigquery.model.TableReference; +import com.google.api.services.bigquery.model.TableSchema; import java.io.IOException; -import java.util.concurrent.atomic.AtomicReference; import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TableRefToJson; -import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.ValueProvider; -import 
org.apache.beam.sdk.options.ValueProvider.NestedValueProvider; +import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.transforms.SerializableFunction; -import org.apache.beam.sdk.transforms.display.DisplayData; -import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Strings; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -/** A {@link BigQuerySourceBase} for reading BigQuery tables. */ -@VisibleForTesting -class BigQueryTableSource<T> extends BigQuerySourceBase<T> { - private static final Logger LOG = LoggerFactory.getLogger(BigQueryTableSource.class); - - static <T> BigQueryTableSource<T> create( - String stepUuid, - ValueProvider<TableReference> table, - BigQueryServices bqServices, - Coder<T> coder, - SerializableFunction<SchemaAndRecord, T> parseFn) { - return new BigQueryTableSource<>(stepUuid, table, bqServices, coder, parseFn); - } +class BigQueryTableSourceDef implements BigQuerySourceDef { + private static final Logger LOG = LoggerFactory.getLogger(BigQueryTableSourceDef.class); + private final BigQueryServices bqServices; private final ValueProvider<String> jsonTable; - private final AtomicReference<Long> tableSizeBytes; - private BigQueryTableSource( - String stepUuid, - ValueProvider<TableReference> table, - BigQueryServices bqServices, - Coder<T> coder, - SerializableFunction<SchemaAndRecord, T> parseFn) { - super(stepUuid, bqServices, coder, parseFn); - this.jsonTable = NestedValueProvider.of(checkNotNull(table, "table"), new TableRefToJson()); - this.tableSizeBytes = new AtomicReference<>(); + static BigQueryTableSourceDef create( + BigQueryServices bqServices, ValueProvider<TableReference> table) { + ValueProvider<String> jsonTable = + ValueProvider.NestedValueProvider.of( + checkNotNull(table, "table"), new BigQueryHelpers.TableRefToJson()); + return new BigQueryTableSourceDef(bqServices, jsonTable); } - @Override - protected TableReference getTableToExtract(BigQueryOptions bqOptions) throws IOException { + private BigQueryTableSourceDef(BigQueryServices bqServices, ValueProvider<String> jsonTable) { + this.bqServices = bqServices; + this.jsonTable = jsonTable; + } + + TableReference getTableReference(BigQueryOptions bqOptions) throws IOException { TableReference tableReference = BigQueryIO.JSON_FACTORY.fromString(jsonTable.get(), TableReference.class); return setDefaultProjectIfAbsent(bqOptions, tableReference); @@ -93,34 +78,27 @@ class BigQueryTableSource<T> extends BigQuerySourceBase<T> { return tableReference; } - @Override - public synchronized long getEstimatedSizeBytes(PipelineOptions options) throws Exception { - if (tableSizeBytes.get() == null) { - TableReference table = - setDefaultProjectIfAbsent( - options.as(BigQueryOptions.class), - BigQueryIO.JSON_FACTORY.fromString(jsonTable.get(), TableReference.class)); - - Table tableRef = - bqServices.getDatasetService(options.as(BigQueryOptions.class)).getTable(table); - Long numBytes = tableRef.getNumBytes(); - if (tableRef.getStreamingBuffer() != null) { - numBytes += tableRef.getStreamingBuffer().getEstimatedBytes().longValue(); - } - - tableSizeBytes.compareAndSet(null, numBytes); - } - return tableSizeBytes.get(); + ValueProvider<String> getJsonTable() { + return jsonTable; } + /** {@inheritDoc} */ @Override - protected void cleanupTempResource(BigQueryOptions bqOptions) throws Exception { - // Do nothing. 
+ public <T> BigQuerySourceBase<T> toSource( + String stepUuid, Coder<T> coder, SerializableFunction<SchemaAndRecord, T> parseFn) { + return BigQueryTableSource.create(stepUuid, this, bqServices, coder, parseFn); } + /** {@inheritDoc} */ @Override - public void populateDisplayData(DisplayData.Builder builder) { - super.populateDisplayData(builder); - builder.add(DisplayData.item("table", jsonTable)); + public Schema getBeamSchema(BigQueryOptions bqOptions) { + try { + TableReference tableRef = getTableReference(bqOptions); + TableSchema tableSchema = + bqServices.getDatasetService(bqOptions).getTable(tableRef).getSchema(); + return BigQueryUtils.fromTableSchema(tableSchema); + } catch (IOException | InterruptedException | NullPointerException e) { + throw new BigQuerySchemaRetrievalException("Exception while trying to retrieve schema", e); + } } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java index bd1fda3..1a87875 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java @@ -31,10 +31,12 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.function.Function; import java.util.stream.IntStream; import org.apache.avro.generic.GenericRecord; +import org.apache.beam.sdk.schemas.LogicalTypes; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.Schema.Field; import org.apache.beam.sdk.schemas.Schema.FieldType; @@ -130,7 +132,7 @@ public class BigQueryUtils { .put("SqlDateType", StandardSQLTypeName.DATE) .put("SqlTimeType", StandardSQLTypeName.TIME) .put("SqlTimeWithLocalTzType", StandardSQLTypeName.TIME) - .put("SqlTimestampWithLocalTzType", StandardSQLTypeName.TIMESTAMP) + .put("SqlTimestampWithLocalTzType", StandardSQLTypeName.DATETIME) .put("SqlCharType", StandardSQLTypeName.STRING) .build(); @@ -149,6 +151,79 @@ public class BigQueryUtils { return BEAM_TO_BIGQUERY_TYPE_MAPPING.get(fieldType.getTypeName()); } + /** + * Get the Beam {@link FieldType} from a BigQuery type name. + * + * <p>Supports both standard and legacy SQL types. + * + * @param typeName Name of the type + * @param nestedFields Nested fields for the given type (eg. 
RECORD type) + * @return Corresponding Beam {@link FieldType} + */ + private static FieldType fromTableFieldSchemaType( + String typeName, List<TableFieldSchema> nestedFields) { + switch (typeName) { + case "STRING": + return FieldType.STRING; + case "BYTES": + return FieldType.BYTES; + case "INT64": + case "INTEGER": + return FieldType.INT64; + case "FLOAT64": + case "FLOAT": + return FieldType.DOUBLE; + case "BOOL": + case "BOOLEAN": + return FieldType.BOOLEAN; + case "TIMESTAMP": + return FieldType.DATETIME; + case "TIME": + return FieldType.logicalType( + new LogicalTypes.PassThroughLogicalType<Instant>( + "SqlTimeType", "", FieldType.DATETIME) {}); + case "DATE": + return FieldType.logicalType( + new LogicalTypes.PassThroughLogicalType<Instant>( + "SqlDateType", "", FieldType.DATETIME) {}); + case "DATETIME": + return FieldType.logicalType( + new LogicalTypes.PassThroughLogicalType<Instant>( + "SqlTimestampWithLocalTzType", "", FieldType.DATETIME) {}); + case "STRUCT": + case "RECORD": + Schema rowSchema = fromTableFieldSchema(nestedFields); + return FieldType.row(rowSchema); + default: + throw new UnsupportedOperationException( + "Converting BigQuery type " + typeName + " to Beam type is unsupported"); + } + } + + private static Schema fromTableFieldSchema(List<TableFieldSchema> tableFieldSchemas) { + Schema.Builder schemaBuilder = Schema.builder(); + for (TableFieldSchema tableFieldSchema : tableFieldSchemas) { + FieldType fieldType = + fromTableFieldSchemaType(tableFieldSchema.getType(), tableFieldSchema.getFields()); + + Optional<Mode> fieldMode = Optional.ofNullable(tableFieldSchema.getMode()).map(Mode::valueOf); + if (fieldMode.filter(m -> m == Mode.REPEATED).isPresent()) { + fieldType = FieldType.array(fieldType); + } + + // if the mode is not defined or if it is set to NULLABLE, then the field is nullable + boolean nullable = + !fieldMode.isPresent() || fieldMode.filter(m -> m == Mode.NULLABLE).isPresent(); + Field field = Field.of(tableFieldSchema.getName(), fieldType).withNullable(nullable); + if (tableFieldSchema.getDescription() != null + && !"".equals(tableFieldSchema.getDescription())) { + field = field.withDescription(tableFieldSchema.getDescription()); + } + schemaBuilder.addField(field); + } + return schemaBuilder.build(); + } + private static List<TableFieldSchema> toTableFieldSchema(Schema schema) { List<TableFieldSchema> fields = new ArrayList<>(schema.getFieldCount()); for (Field schemaField : schema.getFields()) { @@ -188,6 +263,25 @@ public class BigQueryUtils { return new TableSchema().setFields(toTableFieldSchema(schema)); } + /** Convert a BigQuery {@link TableSchema} to a Beam {@link Schema}. 
*/ + public static Schema fromTableSchema(TableSchema tableSchema) { + return fromTableFieldSchema(tableSchema.getFields()); + } + + private static final BigQueryIO.TypedRead.ToBeamRowFunction<TableRow> + TABLE_ROW_TO_BEAM_ROW_FUNCTION = beamSchema -> (TableRow tr) -> toBeamRow(beamSchema, tr); + + public static final BigQueryIO.TypedRead.ToBeamRowFunction<TableRow> tableRowToBeamRow() { + return TABLE_ROW_TO_BEAM_ROW_FUNCTION; + } + + private static final BigQueryIO.TypedRead.FromBeamRowFunction<TableRow> + TABLE_ROW_FROM_BEAM_ROW_FUNCTION = ignored -> BigQueryUtils::toTableRow; + + public static final BigQueryIO.TypedRead.FromBeamRowFunction<TableRow> tableRowFromBeamRow() { + return TABLE_ROW_FROM_BEAM_ROW_FUNCTION; + } + private static final SerializableFunction<Row, TableRow> ROW_TO_TABLE_ROW = new ToTableRow(SerializableFunctions.identity()); @@ -288,6 +382,36 @@ public class BigQueryUtils { } /** + * Tries to convert a JSON {@link TableRow} from BigQuery into a Beam {@link Row}. + * + * <p>Only supports basic types and arrays. Doesn't support date types or structs. + */ + public static Row toBeamRow(Schema rowSchema, TableRow jsonBqRow) { + // TODO deprecate toBeamRow(Schema, TableSchema, TableRow) function in favour of this function. + // This function attempts to convert TableRows without having access to the + // corresponding TableSchema because: + // 1. TableSchema contains redundant information already available in the Schema object. + // 2. TableSchema objects are not serializable and are therefore harder to propagate through a + // pipeline. + return rowSchema.getFields().stream() + .map(field -> toBeamRowFieldValue(field, jsonBqRow.get(field.getName()))) + .collect(toRow(rowSchema)); + } + + private static Object toBeamRowFieldValue(Field field, Object bqValue) { + if (bqValue == null) { + if (field.getType().getNullable()) { + return null; + } else { + throw new IllegalArgumentException( + "Received null value for non-nullable field " + field.getName()); + } + } + + return toBeamValue(field.getType(), bqValue); + } + + /** * Tries to parse the JSON {@link TableRow} from BigQuery. * * <p>Only supports basic types and arrays. Doesn't support date types. 
@@ -320,11 +444,14 @@ public class BigQueryUtils { if (jsonBQValue instanceof List) { return ((List<Object>) jsonBQValue) .stream() - .map(v -> ((Map<String, Object>) v).get("v")) .map(v -> toBeamValue(fieldType.getCollectionElementType(), v)) .collect(toList()); } + if (jsonBQValue instanceof TableRow) { + return toBeamRow(fieldType.getRowSchema(), (TableRow) jsonBQValue); + } + throw new UnsupportedOperationException( "Converting BigQuery type '" + jsonBQValue.getClass() diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOReadTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOReadTest.java index 883bbdd..c74a0d6 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOReadTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOReadTest.java @@ -51,6 +51,8 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead.QueryPriority; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.options.ValueProvider; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.transforms.Select; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.SourceTestUtils; import org.apache.beam.sdk.testing.TestPipeline; @@ -65,6 +67,7 @@ import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.Row; import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables; import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Lists; @@ -405,6 +408,65 @@ public class BigQueryIOReadTest implements Serializable { } @Test + public void testReadTableWithSchema() throws IOException, InterruptedException { + // setup + Table someTable = new Table(); + someTable.setSchema( + new TableSchema() + .setFields( + ImmutableList.of( + new TableFieldSchema().setName("name").setType("STRING"), + new TableFieldSchema().setName("number").setType("INTEGER")))); + someTable.setTableReference( + new TableReference() + .setProjectId("non-executing-project") + .setDatasetId("schema_dataset") + .setTableId("schema_table")); + someTable.setNumBytes(1024L * 1024L); + FakeDatasetService fakeDatasetService = new FakeDatasetService(); + fakeDatasetService.createDataset("non-executing-project", "schema_dataset", "", "", null); + fakeDatasetService.createTable(someTable); + + List<TableRow> records = + Lists.newArrayList( + new TableRow().set("name", "a").set("number", 1L), + new TableRow().set("name", "b").set("number", 2L), + new TableRow().set("name", "c").set("number", 3L)); + + fakeDatasetService.insertAll(someTable.getTableReference(), records, null); + + FakeBigQueryServices fakeBqServices = + new FakeBigQueryServices() + .withJobService(new FakeJobService()) + .withDatasetService(fakeDatasetService); + + // test + BigQueryIO.TypedRead<TableRow> read = + BigQueryIO.readTableRowsWithSchema() + .from("non-executing-project:schema_dataset.schema_table") + .withTestServices(fakeBqServices) + .withoutValidation(); + + PCollection<TableRow> bqRows = p.apply(read); + + Schema expectedSchema = + Schema.of( + 
Schema.Field.of("name", Schema.FieldType.STRING).withNullable(true), + Schema.Field.of("number", Schema.FieldType.INT64).withNullable(true)); + assertEquals(expectedSchema, bqRows.getSchema()); + + PCollection<Row> output = bqRows.apply(Select.fieldNames("name", "number")); + PAssert.that(output) + .containsInAnyOrder( + ImmutableList.of( + Row.withSchema(expectedSchema).addValues("a", 1L).build(), + Row.withSchema(expectedSchema).addValues("b", 2L).build(), + Row.withSchema(expectedSchema).addValues("c", 3L).build())); + + p.run(); + } + + @Test public void testBuildSourceDisplayDataTable() { String tableSpec = "project:dataset.tableid"; @@ -509,12 +571,8 @@ public class BigQueryIOReadTest implements Serializable { String stepUuid = "testStepUuid"; BoundedSource<TableRow> bqSource = - BigQueryTableSource.create( - stepUuid, - ValueProvider.StaticValueProvider.of(table), - fakeBqServices, - TableRowJsonCoder.of(), - BigQueryIO.TableRowParser.INSTANCE); + BigQueryTableSourceDef.create(fakeBqServices, ValueProvider.StaticValueProvider.of(table)) + .toSource(stepUuid, TableRowJsonCoder.of(), BigQueryIO.TableRowParser.INSTANCE); PipelineOptions options = PipelineOptionsFactory.create(); options.setTempLocation(testFolder.getRoot().getAbsolutePath()); @@ -562,12 +620,8 @@ public class BigQueryIOReadTest implements Serializable { String stepUuid = "testStepUuid"; BoundedSource<TableRow> bqSource = - BigQueryTableSource.create( - stepUuid, - ValueProvider.StaticValueProvider.of(table), - fakeBqServices, - TableRowJsonCoder.of(), - BigQueryIO.TableRowParser.INSTANCE); + BigQueryTableSourceDef.create(fakeBqServices, ValueProvider.StaticValueProvider.of(table)) + .toSource(stepUuid, TableRowJsonCoder.of(), BigQueryIO.TableRowParser.INSTANCE); PipelineOptions options = PipelineOptionsFactory.create(); assertEquals(108, bqSource.getEstimatedSizeBytes(options)); @@ -600,12 +654,8 @@ public class BigQueryIOReadTest implements Serializable { String stepUuid = "testStepUuid"; BoundedSource<TableRow> bqSource = - BigQueryTableSource.create( - stepUuid, - ValueProvider.StaticValueProvider.of(table), - fakeBqServices, - TableRowJsonCoder.of(), - BigQueryIO.TableRowParser.INSTANCE); + BigQueryTableSourceDef.create(fakeBqServices, ValueProvider.StaticValueProvider.of(table)) + .toSource(stepUuid, TableRowJsonCoder.of(), BigQueryIO.TableRowParser.INSTANCE); PipelineOptions options = PipelineOptionsFactory.create(); assertEquals(118, bqSource.getEstimatedSizeBytes(options)); @@ -621,18 +671,16 @@ public class BigQueryIOReadTest implements Serializable { bqOptions.setProject("project"); String stepUuid = "testStepUuid"; - BigQueryQuerySource<TableRow> bqSource = - BigQueryQuerySource.create( - stepUuid, - ValueProvider.StaticValueProvider.of(queryString), - true /* flattenResults */, - true /* useLegacySql */, - fakeBqServices, - TableRowJsonCoder.of(), - BigQueryIO.TableRowParser.INSTANCE, - QueryPriority.BATCH, - null, - null); + BigQuerySourceBase<TableRow> bqSource = + BigQueryQuerySourceDef.create( + fakeBqServices, + ValueProvider.StaticValueProvider.of(queryString), + true, /* flattenResults */ + true, /* useLegacySql */ + QueryPriority.BATCH, + null, + null) + .toSource(stepUuid, TableRowJsonCoder.of(), BigQueryIO.TableRowParser.INSTANCE); fakeJobService.expectDryRunQuery( bqOptions.getProject(), @@ -697,17 +745,15 @@ public class BigQueryIOReadTest implements Serializable { .setReferencedTables(ImmutableList.of(sourceTableRef, tempTableReference)))); BoundedSource<TableRow> bqSource = - 
BigQueryQuerySource.create( - stepUuid, - ValueProvider.StaticValueProvider.of(encodedQuery), - true /* flattenResults */, - true /* useLegacySql */, - fakeBqServices, - TableRowJsonCoder.of(), - BigQueryIO.TableRowParser.INSTANCE, - QueryPriority.BATCH, - null, - null); + BigQueryQuerySourceDef.create( + fakeBqServices, + ValueProvider.StaticValueProvider.of(encodedQuery), + true /* flattenResults */, + true /* useLegacySql */, + QueryPriority.BATCH, + null, + null) + .toSource(stepUuid, TableRowJsonCoder.of(), BigQueryIO.TableRowParser.INSTANCE); options.setTempLocation(testFolder.getRoot().getAbsolutePath()); @@ -764,17 +810,15 @@ public class BigQueryIOReadTest implements Serializable { .setReferencedTables(ImmutableList.of()))); BoundedSource<TableRow> bqSource = - BigQueryQuerySource.create( - stepUuid, - ValueProvider.StaticValueProvider.of(encodedQuery), - true /* flattenResults */, - true /* useLegacySql */, - fakeBqServices, - TableRowJsonCoder.of(), - BigQueryIO.TableRowParser.INSTANCE, - QueryPriority.BATCH, - null, - null); + BigQueryQuerySourceDef.create( + fakeBqServices, + ValueProvider.StaticValueProvider.of(encodedQuery), + true /* flattenResults */, + true /* useLegacySql */, + QueryPriority.BATCH, + null, + null) + .toSource(stepUuid, TableRowJsonCoder.of(), BigQueryIO.TableRowParser.INSTANCE); options.setTempLocation(testFolder.getRoot().getAbsolutePath()); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java index a23a3ec..3315598 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java @@ -24,6 +24,7 @@ import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.collection.IsMapContaining.hasEntry; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertThrows; @@ -31,12 +32,14 @@ import com.google.api.services.bigquery.model.TableFieldSchema; import com.google.api.services.bigquery.model.TableRow; import com.google.api.services.bigquery.model.TableSchema; import java.util.Arrays; +import java.util.Collections; import java.util.List; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils.ConversionOptions.TruncateTimestamps; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.values.Row; import org.joda.time.DateTime; import org.joda.time.Instant; +import org.joda.time.chrono.ISOChronology; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -83,22 +86,73 @@ public class BigQueryUtilsTest { .setType(StandardSQLTypeName.INT64.toString()) .setMode(Mode.REPEATED.toString()); + private static final TableFieldSchema ROW = + new TableFieldSchema() + .setName("row") + .setType(StandardSQLTypeName.STRUCT.toString()) + .setMode(Mode.NULLABLE.toString()) + .setFields(Arrays.asList(ID, VALUE, NAME, TIMESTAMP, VALID)); + + private static final TableFieldSchema ROWS = + new TableFieldSchema() + .setName("rows") + .setType(StandardSQLTypeName.STRUCT.toString()) + .setMode(Mode.REPEATED.toString()) + .setFields(Arrays.asList(ID, VALUE, NAME, TIMESTAMP, VALID)); + private static final Row 
FLAT_ROW = Row.withSchema(FLAT_TYPE) .addValues(123L, 123.456, "test", new DateTime(123456), false) .build(); + private static final TableRow BQ_FLAT_ROW = + new TableRow() + .set("id", "123") + .set("value", "123.456") + .set("name", "test") + .set( + "timestamp", + String.valueOf( + new DateTime(123456L, ISOChronology.getInstanceUTC()).getMillis() / 1000.0D)) + .set("valid", "false"); + private static final Row NULL_FLAT_ROW = Row.withSchema(FLAT_TYPE).addValues(null, null, null, null, null).build(); + private static final TableRow BQ_NULL_FLAT_ROW = + new TableRow() + .set("id", null) + .set("value", null) + .set("name", null) + .set("timestamp", null) + .set("valid", null); + private static final Row ARRAY_ROW = Row.withSchema(ARRAY_TYPE).addValues((Object) Arrays.asList(123L, 124L)).build(); + private static final TableRow BQ_ARRAY_ROW = + new TableRow().set("ids", Arrays.asList("123", "124")); + private static final Row ROW_ROW = Row.withSchema(ROW_TYPE).addValues(FLAT_ROW).build(); + private static final TableRow BQ_ROW_ROW = new TableRow().set("row", BQ_FLAT_ROW); + private static final Row ARRAY_ROW_ROW = Row.withSchema(ARRAY_ROW_TYPE).addValues((Object) Arrays.asList(FLAT_ROW)).build(); + private static final TableRow BQ_ARRAY_ROW_ROW = + new TableRow().set("rows", Collections.singletonList(BQ_FLAT_ROW)); + + private static final TableSchema BQ_FLAT_TYPE = + new TableSchema().setFields(Arrays.asList(ID, VALUE, NAME, TIMESTAMP, VALID)); + + private static final TableSchema BQ_ARRAY_TYPE = new TableSchema().setFields(Arrays.asList(IDS)); + + private static final TableSchema BQ_ROW_TYPE = new TableSchema().setFields(Arrays.asList(ROW)); + + private static final TableSchema BQ_ARRAY_ROW_TYPE = + new TableSchema().setFields(Arrays.asList(ROWS)); + @Test public void testToTableSchema_flat() { TableSchema schema = toTableSchema(FLAT_TYPE); @@ -140,6 +194,7 @@ public class BigQueryUtilsTest { @Test public void testToTableRow_flat() { TableRow row = toTableRow().apply(FLAT_ROW); + System.out.println(row); assertThat(row.size(), equalTo(5)); assertThat(row, hasEntry("id", "123")); @@ -290,4 +345,58 @@ public class BigQueryUtilsTest { return base.getMillis(); } } + + @Test + public void testFromTableSchema_flat() { + Schema beamSchema = BigQueryUtils.fromTableSchema(BQ_FLAT_TYPE); + assertEquals(FLAT_TYPE, beamSchema); + } + + @Test + public void testFromTableSchema_array() { + Schema beamSchema = BigQueryUtils.fromTableSchema(BQ_ARRAY_TYPE); + assertEquals(ARRAY_TYPE, beamSchema); + } + + @Test + public void testFromTableSchema_row() { + Schema beamSchema = BigQueryUtils.fromTableSchema(BQ_ROW_TYPE); + assertEquals(ROW_TYPE, beamSchema); + } + + @Test + public void testFromTableSchema_array_row() { + Schema beamSchema = BigQueryUtils.fromTableSchema(BQ_ARRAY_ROW_TYPE); + assertEquals(ARRAY_ROW_TYPE, beamSchema); + } + + @Test + public void testToBeamRow_flat() { + Row beamRow = BigQueryUtils.toBeamRow(FLAT_TYPE, BQ_FLAT_ROW); + assertEquals(FLAT_ROW, beamRow); + } + + @Test + public void testToBeamRow_null() { + Row beamRow = BigQueryUtils.toBeamRow(FLAT_TYPE, BQ_NULL_FLAT_ROW); + assertEquals(NULL_FLAT_ROW, beamRow); + } + + @Test + public void testToBeamRow_array() { + Row beamRow = BigQueryUtils.toBeamRow(ARRAY_TYPE, BQ_ARRAY_ROW); + assertEquals(ARRAY_ROW, beamRow); + } + + @Test + public void testToBeamRow_row() { + Row beamRow = BigQueryUtils.toBeamRow(ROW_TYPE, BQ_ROW_ROW); + assertEquals(ROW_ROW, beamRow); + } + + @Test + public void testToBeamRow_array_row() { + Row beamRow = 
BigQueryUtils.toBeamRow(ARRAY_ROW_TYPE, BQ_ARRAY_ROW_ROW); + assertEquals(ARRAY_ROW_ROW, beamRow); + } }
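
For context, a minimal sketch of how a pipeline might use the new readTableRowsWithSchema() entry point added by this commit. The project, dataset, table, and field names below are hypothetical; the Select step follows the same pattern exercised by testReadTableWithSchema above.

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.schemas.transforms.Select;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;

public class ReadTableRowsWithSchemaExample {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Read TableRows with Beam schema support enabled: the table's
    // BigQuery schema is retrieved at pipeline-construction time,
    // converted to a Beam Schema, and attached to the output PCollection.
    PCollection<TableRow> rows =
        p.apply(
            BigQueryIO.readTableRowsWithSchema()
                .from("my-project:my_dataset.my_table")); // hypothetical table spec

    // Because the PCollection now carries a schema, schema-aware
    // transforms such as Select can be applied to it directly.
    PCollection<Row> selected = rows.apply(Select.fieldNames("name", "number"));

    p.run().waitUntilFinish();
  }
}

Under the hood, the source definitions introduced here do the work: toSource(...) builds the same BigQuerySourceBase as before, while getBeamSchema(...) fetches the table schema (or the dry-run query schema) and BigQueryUtils.fromTableSchema(...) converts it to a Beam Schema, which expand() attaches to the output together with the TableRow/Row converter functions.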