This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new b24e82c  Merge pull request #8620: [BEAM-6673] Add schema support to BigQuery reads
b24e82c is described below

commit b24e82cc40e732ea6cff023650cc2b83cf14f32a
Author: Charith Ellawala <chari...@users.noreply.github.com>
AuthorDate: Fri Jun 7 21:41:52 2019 +0100

    Merge pull request #8620: [BEAM-6673] Add schema support to BigQuery reads
---
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java       |  91 ++++++++++---
 .../sdk/io/gcp/bigquery/BigQueryQuerySource.java   | 103 ++------------
 ...uerySource.java => BigQueryQuerySourceDef.java} | 107 +++++++--------
 .../bigquery/BigQuerySchemaRetrievalException.java |  25 ++++
 .../sdk/io/gcp/bigquery/BigQuerySourceDef.java     |  52 ++++++++
 .../sdk/io/gcp/bigquery/BigQueryTableSource.java   |  61 ++-------
 ...ableSource.java => BigQueryTableSourceDef.java} |  88 +++++-------
 .../beam/sdk/io/gcp/bigquery/BigQueryUtils.java    | 131 +++++++++++++++++-
 .../sdk/io/gcp/bigquery/BigQueryIOReadTest.java    | 148 +++++++++++++--------
 .../sdk/io/gcp/bigquery/BigQueryUtilsTest.java     | 109 +++++++++++++++
 10 files changed, 598 insertions(+), 317 deletions(-)
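
The headline change is a new read entry point, readTableRowsWithSchema(), which attaches a Beam Schema (derived from the BigQuery table or query schema) to the output PCollection so schema-aware transforms work downstream. A minimal usage sketch against the API added by this commit; the project, dataset, table, and column names are placeholders:

    import com.google.api.services.bigquery.model.TableRow;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.schemas.transforms.Select;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.Row;

    public class ReadWithSchemaExample {
      public static void main(String[] args) {
        Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

        // readTableRows() plus Row converters: the resulting PCollection
        // carries a Beam Schema inferred from the BigQuery table schema.
        PCollection<TableRow> rows =
            p.apply(
                BigQueryIO.readTableRowsWithSchema()
                    .from("my-project:my_dataset.my_table")); // placeholder table spec

        // Because the collection now has a schema, field-level transforms apply directly.
        PCollection<Row> names = rows.apply(Select.fieldNames("name")); // assumes a "name" column

        p.run().waitUntilFinish();
      }
    }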

diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index a66c903..5e6e59f 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -83,6 +83,7 @@ import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
+import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.MapElements;
@@ -100,6 +101,7 @@ import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollection.IsBounded;
 import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.Row;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TupleTagList;
 import org.apache.beam.sdk.values.TypeDescriptors;
@@ -405,6 +407,14 @@ public class BigQueryIO {
     return read(new TableRowParser()).withCoder(TableRowJsonCoder.of());
   }
 
+  /** Like {@link #readTableRows()} but with {@link Schema} support. */
+  public static TypedRead<TableRow> readTableRowsWithSchema() {
+    return read(new TableRowParser())
+        .withCoder(TableRowJsonCoder.of())
+        .withBeamRowConverters(
+            BigQueryUtils.tableRowToBeamRow(), BigQueryUtils.tableRowFromBeamRow());
+  }
+
   /**
    * Reads from a BigQuery table or query and returns a {@link PCollection} with one element per
    * each row of the table or query result, parsed from the BigQuery AVRO format using the specified
@@ -593,6 +603,12 @@ public class BigQueryIO {
       DIRECT_READ,
     }
 
+    interface ToBeamRowFunction<T>
+        extends SerializableFunction<Schema, SerializableFunction<T, Row>> {}
+
+    interface FromBeamRowFunction<T>
+        extends SerializableFunction<Schema, SerializableFunction<Row, T>> {}
+
     abstract Builder<T> toBuilder();
 
     @AutoValue.Builder
@@ -628,6 +644,12 @@ public class BigQueryIO {
       abstract Builder<T> setCoder(Coder<T> coder);
 
       abstract Builder<T> setKmsKey(String kmsKey);
+
+      @Experimental(Experimental.Kind.SCHEMAS)
+      abstract Builder<T> setToBeamRowFn(ToBeamRowFunction<T> toRowFn);
+
+      @Experimental(Experimental.Kind.SCHEMAS)
+      abstract Builder<T> setFromBeamRowFn(FromBeamRowFunction<T> fromRowFn);
     }
 
     @Nullable
@@ -669,6 +691,14 @@ public class BigQueryIO {
     @Nullable
     abstract String getKmsKey();
 
+    @Nullable
+    @Experimental(Experimental.Kind.SCHEMAS)
+    abstract ToBeamRowFunction<T> getToBeamRowFn();
+
+    @Nullable
+    @Experimental(Experimental.Kind.SCHEMAS)
+    abstract FromBeamRowFunction<T> getFromBeamRowFn();
+
     /**
      * An enumeration type for the priority of a query.
      *
@@ -709,27 +739,22 @@ public class BigQueryIO {
       }
     }
 
-    private BigQuerySourceBase<T> createSource(String jobUuid, Coder<T> coder) {
-      BigQuerySourceBase<T> source;
+    private BigQuerySourceDef createSourceDef() {
+      BigQuerySourceDef sourceDef;
       if (getQuery() == null) {
-        source =
-            BigQueryTableSource.create(
-                jobUuid, getTableProvider(), getBigQueryServices(), coder, getParseFn());
+        sourceDef = BigQueryTableSourceDef.create(getBigQueryServices(), getTableProvider());
       } else {
-        source =
-            BigQueryQuerySource.create(
-                jobUuid,
+        sourceDef =
+            BigQueryQuerySourceDef.create(
+                getBigQueryServices(),
                 getQuery(),
                 getFlattenResults(),
                 getUseLegacySql(),
-                getBigQueryServices(),
-                coder,
-                getParseFn(),
                MoreObjects.firstNonNull(getQueryPriority(), QueryPriority.BATCH),
                 getQueryLocation(),
                 getKmsKey());
       }
-      return source;
+      return sourceDef;
     }
 
     private BigQueryStorageQuerySource<T> createStorageQuerySource(
@@ -840,6 +865,12 @@ public class BigQueryIO {
       }
       checkArgument(getParseFn() != null, "A parseFn is required");
 
+      // if both toRowFn and fromRowFn values are set, enable Beam schema support
+      boolean beamSchemaEnabled = false;
+      if (getToBeamRowFn() != null && getFromBeamRowFn() != null) {
+        beamSchemaEnabled = true;
+      }
+
       Pipeline p = input.getPipeline();
       final Coder<T> coder = inferCoder(p.getCoderRegistry());
 
@@ -852,6 +883,7 @@ public class BigQueryIO {
           "Invalid BigQueryIO.Read: Specifies table read options, "
               + "which only applies when using Method.DIRECT_READ");
 
+      final BigQuerySourceDef sourceDef = createSourceDef();
       final PCollectionView<String> jobIdTokenView;
       PCollection<String> jobIdTokenCollection;
       PCollection<T> rows;
@@ -862,7 +894,10 @@ public class BigQueryIO {
             p.apply("TriggerIdCreation", Create.of(staticJobUuid))
                 .apply("ViewId", View.asSingleton());
         // Apply the traditional Source model.
-        rows = p.apply(org.apache.beam.sdk.io.Read.from(createSource(staticJobUuid, coder)));
+        rows =
+            p.apply(
+                org.apache.beam.sdk.io.Read.from(
+                    sourceDef.toSource(staticJobUuid, coder, getParseFn())));
       } else {
         // Create a singleton job ID token at execution time.
         jobIdTokenCollection =
@@ -888,7 +923,8 @@ public class BigQueryIO {
                           @ProcessElement
                          public void processElement(ProcessContext c) throws Exception {
                             String jobUuid = c.element();
-                            BigQuerySourceBase<T> source = createSource(jobUuid, coder);
+                            BigQuerySourceBase<T> source =
+                                sourceDef.toSource(jobUuid, coder, getParseFn());
                            BigQueryOptions options =
                                c.getPipelineOptions().as(BigQueryOptions.class);
                             ExtractResult res = source.extractFiles(options);
@@ -919,7 +955,8 @@ public class BigQueryIO {
                                     BigQueryHelpers.fromJsonString(
                                        c.sideInput(schemaView), TableSchema.class);
                                String jobUuid = c.sideInput(jobIdTokenView);
-                                BigQuerySourceBase<T> source = createSource(jobUuid, coder);
+                                BigQuerySourceBase<T> source =
+                                    sourceDef.toSource(jobUuid, coder, getParseFn());
                                 List<BoundedSource<T>> sources =
                                     source.createSources(
                                         ImmutableList.of(
@@ -966,7 +1003,18 @@ public class BigQueryIO {
               }
             }
           };
-      return rows.apply(new PassThroughThenCleanup<>(cleanupOperation, jobIdTokenView));
+
+      rows = rows.apply(new PassThroughThenCleanup<>(cleanupOperation, jobIdTokenView));
+
+      if (beamSchemaEnabled) {
+        BigQueryOptions bqOptions = p.getOptions().as(BigQueryOptions.class);
+        Schema beamSchema = sourceDef.getBeamSchema(bqOptions);
+        SerializableFunction<T, Row> toBeamRow = getToBeamRowFn().apply(beamSchema);
+        SerializableFunction<Row, T> fromBeamRow = getFromBeamRowFn().apply(beamSchema);
+
+        rows.setSchema(beamSchema, toBeamRow, fromBeamRow);
+      }
+      return rows;
     }
 
    private PCollection<T> expandForDirectRead(PBegin input, Coder<T> outputCoder) {
@@ -1201,6 +1249,17 @@ public class BigQueryIO {
       return toBuilder().setKmsKey(kmsKey).build();
     }
 
+    /**
+     * Sets the functions to convert elements to/from {@link Row} objects.
+     *
+     * <p>Setting these conversion functions is necessary to enable {@link Schema} support.
+     */
+    @Experimental(Experimental.Kind.SCHEMAS)
+    public TypedRead<T> withBeamRowConverters(
+        ToBeamRowFunction<T> toRowFn, FromBeamRowFunction<T> fromRowFn) {
+      return toBuilder().setToBeamRowFn(toRowFn).setFromBeamRowFn(fromRowFn).build();
+    }
+
     /** See {@link Read#from(String)}. */
     public TypedRead<T> from(String tableSpec) {
       return from(StaticValueProvider.of(tableSpec));
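
A note on the converter shape above: ToBeamRowFunction and FromBeamRowFunction are curried, mapping the resolved Schema to a serializable per-element converter, so the schema is looked up once at pipeline construction and captured in the closure handed to setSchema(). A small sketch using the TableRow converters this commit adds to BigQueryUtils; the one-field schema is a stand-in for whatever sourceDef.getBeamSchema() would resolve:

    import com.google.api.services.bigquery.model.TableRow;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils;
    import org.apache.beam.sdk.schemas.Schema;
    import org.apache.beam.sdk.transforms.SerializableFunction;
    import org.apache.beam.sdk.values.Row;

    public class ConverterCurryingDemo {
      public static void main(String[] args) {
        // Stand-in for the schema that TypedRead.expand() obtains from the source definition.
        Schema beamSchema =
            Schema.builder().addNullableField("name", Schema.FieldType.STRING).build();

        // Applying the curried functions to the schema yields the element-level
        // converters that expand() passes to PCollection.setSchema().
        SerializableFunction<TableRow, Row> toRow =
            BigQueryUtils.tableRowToBeamRow().apply(beamSchema);
        SerializableFunction<Row, TableRow> fromRow =
            BigQueryUtils.tableRowFromBeamRow().apply(beamSchema);

        Row row = toRow.apply(new TableRow().set("name", "a"));
        System.out.println(fromRow.apply(row)); // round-trips back to a TableRow
      }
    }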
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java
index 375cc4f..f2a70da 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java
@@ -17,20 +17,10 @@
  */
 package org.apache.beam.sdk.io.gcp.bigquery;
 
-import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.createJobIdToken;
-import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.createTempTableReference;
-import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkNotNull;
-
-import com.google.api.services.bigquery.model.JobStatistics;
 import com.google.api.services.bigquery.model.TableReference;
 import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.util.concurrent.atomic.AtomicReference;
 import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead.QueryPriority;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
 import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting;
@@ -45,115 +35,44 @@ class BigQueryQuerySource<T> extends BigQuerySourceBase<T> {
 
   static <T> BigQueryQuerySource<T> create(
       String stepUuid,
-      ValueProvider<String> query,
-      Boolean flattenResults,
-      Boolean useLegacySql,
+      BigQueryQuerySourceDef queryDef,
       BigQueryServices bqServices,
       Coder<T> coder,
-      SerializableFunction<SchemaAndRecord, T> parseFn,
-      QueryPriority priority,
-      String location,
-      String kmsKey) {
-    return new BigQueryQuerySource<>(
-        stepUuid,
-        query,
-        flattenResults,
-        useLegacySql,
-        bqServices,
-        coder,
-        parseFn,
-        priority,
-        location,
-        kmsKey);
+      SerializableFunction<SchemaAndRecord, T> parseFn) {
+    return new BigQueryQuerySource<>(stepUuid, queryDef, bqServices, coder, parseFn);
   }
 
-  private final ValueProvider<String> query;
-  private final Boolean flattenResults;
-  private final Boolean useLegacySql;
-  private final QueryPriority priority;
-  private final String location;
-  private final String kmsKey;
-
-  private transient AtomicReference<JobStatistics> dryRunJobStats;
+  private final BigQueryQuerySourceDef queryDef;
 
   private BigQueryQuerySource(
       String stepUuid,
-      ValueProvider<String> query,
-      Boolean flattenResults,
-      Boolean useLegacySql,
+      BigQueryQuerySourceDef queryDef,
       BigQueryServices bqServices,
       Coder<T> coder,
-      SerializableFunction<SchemaAndRecord, T> parseFn,
-      QueryPriority priority,
-      String location,
-      String kmsKey) {
+      SerializableFunction<SchemaAndRecord, T> parseFn) {
     super(stepUuid, bqServices, coder, parseFn);
-    this.query = checkNotNull(query, "query");
-    this.flattenResults = checkNotNull(flattenResults, "flattenResults");
-    this.useLegacySql = checkNotNull(useLegacySql, "useLegacySql");
-    this.priority = priority;
-    this.location = location;
-    this.kmsKey = kmsKey;
-    dryRunJobStats = new AtomicReference<>();
-  }
-
-  /**
-   * Since the query helper reference is declared as transient, neither the AtomicReference nor the
-   * structure it refers to are persisted across serialization boundaries. The code below is
-   * resilient to the QueryHelper object disappearing in between method calls, but the reference
-   * object must be recreated at deserialization time.
-   */
-  private void readObject(ObjectInputStream in) throws ClassNotFoundException, IOException {
-    in.defaultReadObject();
-    dryRunJobStats = new AtomicReference<>();
+    this.queryDef = queryDef;
   }
 
   @Override
   public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
-    return BigQueryQueryHelper.dryRunQueryIfNeeded(
-            bqServices,
-            options.as(BigQueryOptions.class),
-            dryRunJobStats,
-            query.get(),
-            flattenResults,
-            useLegacySql,
-            location)
-        .getQuery()
-        .getTotalBytesProcessed();
+    return queryDef.getEstimatedSizeBytes(options.as(BigQueryOptions.class));
   }
 
   @Override
   protected TableReference getTableToExtract(BigQueryOptions bqOptions)
       throws IOException, InterruptedException {
-    return BigQueryQueryHelper.executeQuery(
-        bqServices,
-        bqOptions,
-        dryRunJobStats,
-        stepUuid,
-        query.get(),
-        flattenResults,
-        useLegacySql,
-        priority,
-        location,
-        kmsKey);
+    return queryDef.getTableReference(bqOptions, stepUuid);
   }
 
   @Override
   protected void cleanupTempResource(BigQueryOptions bqOptions) throws Exception {
-    TableReference tableToRemove =
-        createTempTableReference(
-            bqOptions.getProject(), createJobIdToken(bqOptions.getJobName(), stepUuid));
-
-    DatasetService tableService = bqServices.getDatasetService(bqOptions);
-    LOG.info("Deleting temporary table with query results {}", tableToRemove);
-    tableService.deleteTable(tableToRemove);
-    LOG.info("Deleting temporary dataset with query results {}", 
tableToRemove.getDatasetId());
-    tableService.deleteDataset(tableToRemove.getProjectId(), 
tableToRemove.getDatasetId());
+    queryDef.cleanupTempResource(bqOptions, stepUuid);
   }
 
   @Override
   public void populateDisplayData(DisplayData.Builder builder) {
     super.populateDisplayData(builder);
-    builder.add(DisplayData.item("query", query));
+    builder.add(DisplayData.item("query", queryDef.getQuery()));
   }
 }
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySourceDef.java
similarity index 67%
copy from sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java
copy to sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySourceDef.java
index 375cc4f..1f2366f 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySourceDef.java
@@ -23,74 +23,54 @@ import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Precondi
 
 import com.google.api.services.bigquery.model.JobStatistics;
 import com.google.api.services.bigquery.model.TableReference;
+import com.google.api.services.bigquery.model.TableSchema;
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead.QueryPriority;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
-import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-/** A {@link BigQuerySourceBase} for querying BigQuery tables. */
-@VisibleForTesting
-class BigQueryQuerySource<T> extends BigQuerySourceBase<T> {
-
-  private static final Logger LOG = LoggerFactory.getLogger(BigQueryQuerySource.class);
-
-  static <T> BigQueryQuerySource<T> create(
-      String stepUuid,
-      ValueProvider<String> query,
-      Boolean flattenResults,
-      Boolean useLegacySql,
-      BigQueryServices bqServices,
-      Coder<T> coder,
-      SerializableFunction<SchemaAndRecord, T> parseFn,
-      QueryPriority priority,
-      String location,
-      String kmsKey) {
-    return new BigQueryQuerySource<>(
-        stepUuid,
-        query,
-        flattenResults,
-        useLegacySql,
-        bqServices,
-        coder,
-        parseFn,
-        priority,
-        location,
-        kmsKey);
-  }
+class BigQueryQuerySourceDef implements BigQuerySourceDef {
+  private static final Logger LOG = LoggerFactory.getLogger(BigQueryQuerySourceDef.class);
 
+  private final BigQueryServices bqServices;
   private final ValueProvider<String> query;
   private final Boolean flattenResults;
   private final Boolean useLegacySql;
-  private final QueryPriority priority;
+  private final BigQueryIO.TypedRead.QueryPriority priority;
   private final String location;
   private final String kmsKey;
 
   private transient AtomicReference<JobStatistics> dryRunJobStats;
 
-  private BigQueryQuerySource(
-      String stepUuid,
+  static BigQueryQuerySourceDef create(
+      BigQueryServices bqServices,
       ValueProvider<String> query,
       Boolean flattenResults,
       Boolean useLegacySql,
+      BigQueryIO.TypedRead.QueryPriority priority,
+      String location,
+      String kmsKey) {
+    return new BigQueryQuerySourceDef(
+        bqServices, query, flattenResults, useLegacySql, priority, location, kmsKey);
+  }
+
+  private BigQueryQuerySourceDef(
       BigQueryServices bqServices,
-      Coder<T> coder,
-      SerializableFunction<SchemaAndRecord, T> parseFn,
-      QueryPriority priority,
+      ValueProvider<String> query,
+      Boolean flattenResults,
+      Boolean useLegacySql,
+      BigQueryIO.TypedRead.QueryPriority priority,
       String location,
       String kmsKey) {
-    super(stepUuid, bqServices, coder, parseFn);
     this.query = checkNotNull(query, "query");
     this.flattenResults = checkNotNull(flattenResults, "flattenResults");
     this.useLegacySql = checkNotNull(useLegacySql, "useLegacySql");
+    this.bqServices = bqServices;
     this.priority = priority;
     this.location = location;
     this.kmsKey = kmsKey;
@@ -108,11 +88,10 @@ class BigQueryQuerySource<T> extends BigQuerySourceBase<T> {
     dryRunJobStats = new AtomicReference<>();
   }
 
-  @Override
-  public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
+  long getEstimatedSizeBytes(BigQueryOptions bqOptions) throws Exception {
     return BigQueryQueryHelper.dryRunQueryIfNeeded(
             bqServices,
-            options.as(BigQueryOptions.class),
+            bqOptions,
             dryRunJobStats,
             query.get(),
             flattenResults,
@@ -122,8 +101,7 @@ class BigQueryQuerySource<T> extends BigQuerySourceBase<T> {
         .getTotalBytesProcessed();
   }
 
-  @Override
-  protected TableReference getTableToExtract(BigQueryOptions bqOptions)
+  TableReference getTableReference(BigQueryOptions bqOptions, String stepUuid)
       throws IOException, InterruptedException {
     return BigQueryQueryHelper.executeQuery(
         bqServices,
@@ -138,22 +116,47 @@ class BigQueryQuerySource<T> extends BigQuerySourceBase<T> {
         kmsKey);
   }
 
-  @Override
-  protected void cleanupTempResource(BigQueryOptions bqOptions) throws Exception {
+  void cleanupTempResource(BigQueryOptions bqOptions, String stepUuid) throws Exception {
     TableReference tableToRemove =
         createTempTableReference(
             bqOptions.getProject(), createJobIdToken(bqOptions.getJobName(), stepUuid));
 
-    DatasetService tableService = bqServices.getDatasetService(bqOptions);
+    BigQueryServices.DatasetService tableService = bqServices.getDatasetService(bqOptions);
     LOG.info("Deleting temporary table with query results {}", tableToRemove);
     tableService.deleteTable(tableToRemove);
     LOG.info("Deleting temporary dataset with query results {}", tableToRemove.getDatasetId());
     tableService.deleteDataset(tableToRemove.getProjectId(), tableToRemove.getDatasetId());
   }
 
+  /** {@inheritDoc} */
+  @Override
+  public <T> BigQuerySourceBase<T> toSource(
+      String stepUuid, Coder<T> coder, SerializableFunction<SchemaAndRecord, T> parseFn) {
+    return BigQueryQuerySource.create(stepUuid, this, bqServices, coder, parseFn);
+  }
+
+  /** {@inheritDoc} */
   @Override
-  public void populateDisplayData(DisplayData.Builder builder) {
-    super.populateDisplayData(builder);
-    builder.add(DisplayData.item("query", query));
+  public Schema getBeamSchema(BigQueryOptions bqOptions) {
+    try {
+      JobStatistics stats =
+          BigQueryQueryHelper.dryRunQueryIfNeeded(
+              bqServices,
+              bqOptions,
+              dryRunJobStats,
+              query.get(),
+              flattenResults,
+              useLegacySql,
+              location);
+      TableSchema tableSchema = stats.getQuery().getSchema();
+      return BigQueryUtils.fromTableSchema(tableSchema);
+    } catch (IOException | InterruptedException | NullPointerException e) {
+      throw new BigQuerySchemaRetrievalException(
+          "Exception while trying to retrieve schema of query", e);
+    }
+  }
+
+  ValueProvider<String> getQuery() {
+    return query;
   }
 }
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaRetrievalException.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaRetrievalException.java
new file mode 100644
index 0000000..2736e56
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaRetrievalException.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+/** Exception to signal that BigQuery schema retrieval failed. */
+public class BigQuerySchemaRetrievalException extends RuntimeException {
+  BigQuerySchemaRetrievalException(String message, Throwable cause) {
+    super(message, cause);
+  }
+}
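
Because TypedRead.expand() resolves the schema eagerly when converters are set, this exception surfaces at pipeline construction time (inside apply()) rather than at run time. A hedged sketch of what that looks like from user code; the table spec is a placeholder and a real failure requires reachable BigQuery services:

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQuerySchemaRetrievalException;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    public class SchemaFailureExample {
      public static void main(String[] args) {
        Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
        try {
          // Schema retrieval happens inside expand(), i.e. during apply(), so a
          // missing table or unreachable service fails before the pipeline runs.
          p.apply(
              BigQueryIO.readTableRowsWithSchema()
                  .from("my-project:my_dataset.missing_table")); // placeholder spec
        } catch (BigQuerySchemaRetrievalException e) {
          System.err.println("Schema lookup failed: " + e.getCause());
        }
      }
    }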
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceDef.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceDef.java
new file mode 100644
index 0000000..0f3de1d
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceDef.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import java.io.Serializable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+
+/**
+ * Represents a source used for {@link BigQueryIO#read(SerializableFunction)}. Currently this could
+ * be either a table or a query. Direct read sources are not yet supported.
+ */
+interface BigQuerySourceDef extends Serializable {
+  /**
+   * Convert this source definition into a concrete source implementation.
+   *
+   * @param stepUuid Job UUID
+   * @param coder Coder
+   * @param parseFn Parse function
+   * @param <T> Type of the resulting PCollection
+   * @return An implementation of {@link BigQuerySourceBase}
+   */
+  <T> BigQuerySourceBase<T> toSource(
+      String stepUuid, Coder<T> coder, SerializableFunction<SchemaAndRecord, T> parseFn);
+
+  /**
+   * Extract the Beam {@link Schema} corresponding to this source.
+   *
+   * @param bqOptions BigQueryOptions
+   * @return Beam schema of the source
+   * @throws BigQuerySchemaRetrievalException if schema retrieval fails
+   */
+  @Experimental(Experimental.Kind.SCHEMAS)
+  Schema getBeamSchema(BigQueryOptions bqOptions);
+}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java
index f8ea5e1..4334f7e 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java
@@ -17,22 +17,15 @@
  */
 package org.apache.beam.sdk.io.gcp.bigquery;
 
-import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkNotNull;
-import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkState;
-
 import com.google.api.services.bigquery.model.Table;
 import com.google.api.services.bigquery.model.TableReference;
 import java.io.IOException;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TableRefToJson;
 import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.ValueProvider;
-import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting;
-import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Strings;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -43,69 +36,41 @@ class BigQueryTableSource<T> extends BigQuerySourceBase<T> {
 
   static <T> BigQueryTableSource<T> create(
       String stepUuid,
-      ValueProvider<TableReference> table,
+      BigQueryTableSourceDef tableDef,
       BigQueryServices bqServices,
       Coder<T> coder,
       SerializableFunction<SchemaAndRecord, T> parseFn) {
-    return new BigQueryTableSource<>(stepUuid, table, bqServices, coder, parseFn);
+    return new BigQueryTableSource<>(stepUuid, tableDef, bqServices, coder, parseFn);
   }
 
-  private final ValueProvider<String> jsonTable;
+  private final BigQueryTableSourceDef tableDef;
   private final AtomicReference<Long> tableSizeBytes;
 
   private BigQueryTableSource(
       String stepUuid,
-      ValueProvider<TableReference> table,
+      BigQueryTableSourceDef tableDef,
       BigQueryServices bqServices,
       Coder<T> coder,
       SerializableFunction<SchemaAndRecord, T> parseFn) {
     super(stepUuid, bqServices, coder, parseFn);
-    this.jsonTable = NestedValueProvider.of(checkNotNull(table, "table"), new TableRefToJson());
+    this.tableDef = tableDef;
     this.tableSizeBytes = new AtomicReference<>();
   }
 
   @Override
   protected TableReference getTableToExtract(BigQueryOptions bqOptions) throws IOException {
-    TableReference tableReference =
-        BigQueryIO.JSON_FACTORY.fromString(jsonTable.get(), TableReference.class);
-    return setDefaultProjectIfAbsent(bqOptions, tableReference);
-  }
-
-  /**
-   * Sets the {@link TableReference#projectId} of the provided table reference to the id of the
-   * default project if the table reference does not have a project ID specified.
-   */
-  private TableReference setDefaultProjectIfAbsent(
-      BigQueryOptions bqOptions, TableReference tableReference) {
-    if (Strings.isNullOrEmpty(tableReference.getProjectId())) {
-      checkState(
-          !Strings.isNullOrEmpty(bqOptions.getProject()),
-          "No project ID set in %s or %s, cannot construct a complete %s",
-          TableReference.class.getSimpleName(),
-          BigQueryOptions.class.getSimpleName(),
-          TableReference.class.getSimpleName());
-      LOG.info(
-          "Project ID not set in {}. Using default project from {}.",
-          TableReference.class.getSimpleName(),
-          BigQueryOptions.class.getSimpleName());
-      tableReference.setProjectId(bqOptions.getProject());
-    }
-    return tableReference;
+    return tableDef.getTableReference(bqOptions);
   }
 
   @Override
   public synchronized long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
     if (tableSizeBytes.get() == null) {
-      TableReference table =
-          setDefaultProjectIfAbsent(
-              options.as(BigQueryOptions.class),
-              BigQueryIO.JSON_FACTORY.fromString(jsonTable.get(), 
TableReference.class));
-
-      Table tableRef =
-          bqServices.getDatasetService(options.as(BigQueryOptions.class)).getTable(table);
-      Long numBytes = tableRef.getNumBytes();
-      if (tableRef.getStreamingBuffer() != null) {
-        numBytes += tableRef.getStreamingBuffer().getEstimatedBytes().longValue();
+      BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
+      TableReference tableRef = tableDef.getTableReference(bqOptions);
+      Table table = bqServices.getDatasetService(bqOptions).getTable(tableRef);
+      Long numBytes = table.getNumBytes();
+      if (table.getStreamingBuffer() != null) {
+        numBytes += table.getStreamingBuffer().getEstimatedBytes().longValue();
       }
 
       tableSizeBytes.compareAndSet(null, numBytes);
@@ -121,6 +86,6 @@ class BigQueryTableSource<T> extends BigQuerySourceBase<T> {
   @Override
   public void populateDisplayData(DisplayData.Builder builder) {
     super.populateDisplayData(builder);
-    builder.add(DisplayData.item("table", jsonTable));
+    builder.add(DisplayData.item("table", tableDef.getJsonTable()));
   }
 }
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSourceDef.java
similarity index 52%
copy from sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java
copy to sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSourceDef.java
index f8ea5e1..07159af 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSourceDef.java
@@ -20,52 +20,37 @@ package org.apache.beam.sdk.io.gcp.bigquery;
 import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkNotNull;
 import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkState;
 
-import com.google.api.services.bigquery.model.Table;
 import com.google.api.services.bigquery.model.TableReference;
+import com.google.api.services.bigquery.model.TableSchema;
 import java.io.IOException;
-import java.util.concurrent.atomic.AtomicReference;
 import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TableRefToJson;
-import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.ValueProvider;
-import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
+import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Strings;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-/** A {@link BigQuerySourceBase} for reading BigQuery tables. */
-@VisibleForTesting
-class BigQueryTableSource<T> extends BigQuerySourceBase<T> {
-  private static final Logger LOG = LoggerFactory.getLogger(BigQueryTableSource.class);
-
-  static <T> BigQueryTableSource<T> create(
-      String stepUuid,
-      ValueProvider<TableReference> table,
-      BigQueryServices bqServices,
-      Coder<T> coder,
-      SerializableFunction<SchemaAndRecord, T> parseFn) {
-    return new BigQueryTableSource<>(stepUuid, table, bqServices, coder, parseFn);
-  }
+class BigQueryTableSourceDef implements BigQuerySourceDef {
+  private static final Logger LOG = LoggerFactory.getLogger(BigQueryTableSourceDef.class);
 
+  private final BigQueryServices bqServices;
   private final ValueProvider<String> jsonTable;
-  private final AtomicReference<Long> tableSizeBytes;
 
-  private BigQueryTableSource(
-      String stepUuid,
-      ValueProvider<TableReference> table,
-      BigQueryServices bqServices,
-      Coder<T> coder,
-      SerializableFunction<SchemaAndRecord, T> parseFn) {
-    super(stepUuid, bqServices, coder, parseFn);
-    this.jsonTable = NestedValueProvider.of(checkNotNull(table, "table"), new TableRefToJson());
-    this.tableSizeBytes = new AtomicReference<>();
+  static BigQueryTableSourceDef create(
+      BigQueryServices bqServices, ValueProvider<TableReference> table) {
+    ValueProvider<String> jsonTable =
+        ValueProvider.NestedValueProvider.of(
+            checkNotNull(table, "table"), new BigQueryHelpers.TableRefToJson());
+    return new BigQueryTableSourceDef(bqServices, jsonTable);
   }
 
-  @Override
-  protected TableReference getTableToExtract(BigQueryOptions bqOptions) throws IOException {
+  private BigQueryTableSourceDef(BigQueryServices bqServices, ValueProvider<String> jsonTable) {
+    this.bqServices = bqServices;
+    this.jsonTable = jsonTable;
+  }
+
+  TableReference getTableReference(BigQueryOptions bqOptions) throws IOException {
     TableReference tableReference =
         BigQueryIO.JSON_FACTORY.fromString(jsonTable.get(), TableReference.class);
     return setDefaultProjectIfAbsent(bqOptions, tableReference);
@@ -93,34 +78,27 @@ class BigQueryTableSource<T> extends BigQuerySourceBase<T> {
     return tableReference;
   }
 
-  @Override
-  public synchronized long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
-    if (tableSizeBytes.get() == null) {
-      TableReference table =
-          setDefaultProjectIfAbsent(
-              options.as(BigQueryOptions.class),
-              BigQueryIO.JSON_FACTORY.fromString(jsonTable.get(), TableReference.class));
-
-      Table tableRef =
-          bqServices.getDatasetService(options.as(BigQueryOptions.class)).getTable(table);
-      Long numBytes = tableRef.getNumBytes();
-      if (tableRef.getStreamingBuffer() != null) {
-        numBytes += tableRef.getStreamingBuffer().getEstimatedBytes().longValue();
-      }
-
-      tableSizeBytes.compareAndSet(null, numBytes);
-    }
-    return tableSizeBytes.get();
+  ValueProvider<String> getJsonTable() {
+    return jsonTable;
   }
 
+  /** {@inheritDoc} */
   @Override
-  protected void cleanupTempResource(BigQueryOptions bqOptions) throws Exception {
-    // Do nothing.
+  public <T> BigQuerySourceBase<T> toSource(
+      String stepUuid, Coder<T> coder, SerializableFunction<SchemaAndRecord, T> parseFn) {
+    return BigQueryTableSource.create(stepUuid, this, bqServices, coder, parseFn);
   }
 
+  /** {@inheritDoc} */
   @Override
-  public void populateDisplayData(DisplayData.Builder builder) {
-    super.populateDisplayData(builder);
-    builder.add(DisplayData.item("table", jsonTable));
+  public Schema getBeamSchema(BigQueryOptions bqOptions) {
+    try {
+      TableReference tableRef = getTableReference(bqOptions);
+      TableSchema tableSchema =
+          bqServices.getDatasetService(bqOptions).getTable(tableRef).getSchema();
+      return BigQueryUtils.fromTableSchema(tableSchema);
+    } catch (IOException | InterruptedException | NullPointerException e) {
+      throw new BigQuerySchemaRetrievalException("Exception while trying to retrieve schema", e);
+    }
   }
 }
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
index bd1fda3..1a87875 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
@@ -31,10 +31,12 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.function.Function;
 import java.util.stream.IntStream;
 import org.apache.avro.generic.GenericRecord;
+import org.apache.beam.sdk.schemas.LogicalTypes;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.schemas.Schema.Field;
 import org.apache.beam.sdk.schemas.Schema.FieldType;
@@ -130,7 +132,7 @@ public class BigQueryUtils {
           .put("SqlDateType", StandardSQLTypeName.DATE)
           .put("SqlTimeType", StandardSQLTypeName.TIME)
           .put("SqlTimeWithLocalTzType", StandardSQLTypeName.TIME)
-          .put("SqlTimestampWithLocalTzType", StandardSQLTypeName.TIMESTAMP)
+          .put("SqlTimestampWithLocalTzType", StandardSQLTypeName.DATETIME)
           .put("SqlCharType", StandardSQLTypeName.STRING)
           .build();
 
@@ -149,6 +151,79 @@ public class BigQueryUtils {
     return BEAM_TO_BIGQUERY_TYPE_MAPPING.get(fieldType.getTypeName());
   }
 
+  /**
+   * Get the Beam {@link FieldType} from a BigQuery type name.
+   *
+   * <p>Supports both standard and legacy SQL types.
+   *
+   * @param typeName Name of the type
+   * @param nestedFields Nested fields for the given type (eg. RECORD type)
+   * @return Corresponding Beam {@link FieldType}
+   */
+  private static FieldType fromTableFieldSchemaType(
+      String typeName, List<TableFieldSchema> nestedFields) {
+    switch (typeName) {
+      case "STRING":
+        return FieldType.STRING;
+      case "BYTES":
+        return FieldType.BYTES;
+      case "INT64":
+      case "INTEGER":
+        return FieldType.INT64;
+      case "FLOAT64":
+      case "FLOAT":
+        return FieldType.DOUBLE;
+      case "BOOL":
+      case "BOOLEAN":
+        return FieldType.BOOLEAN;
+      case "TIMESTAMP":
+        return FieldType.DATETIME;
+      case "TIME":
+        return FieldType.logicalType(
+            new LogicalTypes.PassThroughLogicalType<Instant>(
+                "SqlTimeType", "", FieldType.DATETIME) {});
+      case "DATE":
+        return FieldType.logicalType(
+            new LogicalTypes.PassThroughLogicalType<Instant>(
+                "SqlDateType", "", FieldType.DATETIME) {});
+      case "DATETIME":
+        return FieldType.logicalType(
+            new LogicalTypes.PassThroughLogicalType<Instant>(
+                "SqlTimestampWithLocalTzType", "", FieldType.DATETIME) {});
+      case "STRUCT":
+      case "RECORD":
+        Schema rowSchema = fromTableFieldSchema(nestedFields);
+        return FieldType.row(rowSchema);
+      default:
+        throw new UnsupportedOperationException(
+            "Converting BigQuery type " + typeName + " to Beam type is 
unsupported");
+    }
+  }
+
+  private static Schema fromTableFieldSchema(List<TableFieldSchema> tableFieldSchemas) {
+    Schema.Builder schemaBuilder = Schema.builder();
+    for (TableFieldSchema tableFieldSchema : tableFieldSchemas) {
+      FieldType fieldType =
+          fromTableFieldSchemaType(tableFieldSchema.getType(), tableFieldSchema.getFields());
+
+      Optional<Mode> fieldMode = Optional.ofNullable(tableFieldSchema.getMode()).map(Mode::valueOf);
+      if (fieldMode.filter(m -> m == Mode.REPEATED).isPresent()) {
+        fieldType = FieldType.array(fieldType);
+      }
+
+      // if the mode is not defined or if it is set to NULLABLE, then the field is nullable
+      boolean nullable =
+          !fieldMode.isPresent() || fieldMode.filter(m -> m == Mode.NULLABLE).isPresent();
+      Field field = Field.of(tableFieldSchema.getName(), fieldType).withNullable(nullable);
+      if (tableFieldSchema.getDescription() != null
+          && !"".equals(tableFieldSchema.getDescription())) {
+        field = field.withDescription(tableFieldSchema.getDescription());
+      }
+      schemaBuilder.addField(field);
+    }
+    return schemaBuilder.build();
+  }
+
   private static List<TableFieldSchema> toTableFieldSchema(Schema schema) {
     List<TableFieldSchema> fields = new ArrayList<>(schema.getFieldCount());
     for (Field schemaField : schema.getFields()) {
@@ -188,6 +263,25 @@ public class BigQueryUtils {
     return new TableSchema().setFields(toTableFieldSchema(schema));
   }
 
+  /** Convert a BigQuery {@link TableSchema} to a Beam {@link Schema}. */
+  public static Schema fromTableSchema(TableSchema tableSchema) {
+    return fromTableFieldSchema(tableSchema.getFields());
+  }
+
+  private static final BigQueryIO.TypedRead.ToBeamRowFunction<TableRow>
+      TABLE_ROW_TO_BEAM_ROW_FUNCTION = beamSchema -> (TableRow tr) -> toBeamRow(beamSchema, tr);
+
+  public static final BigQueryIO.TypedRead.ToBeamRowFunction<TableRow> tableRowToBeamRow() {
+    return TABLE_ROW_TO_BEAM_ROW_FUNCTION;
+  }
+
+  private static final BigQueryIO.TypedRead.FromBeamRowFunction<TableRow>
+      TABLE_ROW_FROM_BEAM_ROW_FUNCTION = ignored -> BigQueryUtils::toTableRow;
+
+  public static final BigQueryIO.TypedRead.FromBeamRowFunction<TableRow> tableRowFromBeamRow() {
+    return TABLE_ROW_FROM_BEAM_ROW_FUNCTION;
+  }
+
   private static final SerializableFunction<Row, TableRow> ROW_TO_TABLE_ROW =
       new ToTableRow(SerializableFunctions.identity());
 
@@ -288,6 +382,36 @@ public class BigQueryUtils {
   }
 
   /**
+   * Tries to convert a JSON {@link TableRow} from BigQuery into a Beam {@link Row}.
+   *
+   * <p>Only supports basic types and arrays. Doesn't support date types or structs.
+   */
+  public static Row toBeamRow(Schema rowSchema, TableRow jsonBqRow) {
+    // TODO deprecate toBeamRow(Schema, TableSchema, TableRow) function in favour of this function.
+    // This function attempts to convert TableRows without having access to the
+    // corresponding TableSchema because:
+    // 1. TableSchema contains redundant information already available in the Schema object.
+    // 2. TableSchema objects are not serializable and are therefore harder to propagate through a
+    // pipeline.
+    return rowSchema.getFields().stream()
+        .map(field -> toBeamRowFieldValue(field, jsonBqRow.get(field.getName())))
+        .collect(toRow(rowSchema));
+  }
+
+  private static Object toBeamRowFieldValue(Field field, Object bqValue) {
+    if (bqValue == null) {
+      if (field.getType().getNullable()) {
+        return null;
+      } else {
+        throw new IllegalArgumentException(
+            "Received null value for non-nullable field " + field.getName());
+      }
+    }
+
+    return toBeamValue(field.getType(), bqValue);
+  }
+
+  /**
    * Tries to parse the JSON {@link TableRow} from BigQuery.
    *
    * <p>Only supports basic types and arrays. Doesn't support date types.
@@ -320,11 +444,14 @@ public class BigQueryUtils {
     if (jsonBQValue instanceof List) {
       return ((List<Object>) jsonBQValue)
           .stream()
-              .map(v -> ((Map<String, Object>) v).get("v"))
               .map(v -> toBeamValue(fieldType.getCollectionElementType(), v))
               .collect(toList());
     }
 
+    if (jsonBQValue instanceof TableRow) {
+      return toBeamRow(fieldType.getRowSchema(), (TableRow) jsonBQValue);
+    }
+
     throw new UnsupportedOperationException(
         "Converting BigQuery type '"
             + jsonBQValue.getClass()
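
The helpers above are public, so the type mapping can be exercised standalone. A small demo of fromTableSchema() and the new toBeamRow(Schema, TableRow) overload, mirroring the cases the new unit tests cover; the field names are arbitrary and this assumes the google-cloud-platform IO artifact on the classpath:

    import com.google.api.services.bigquery.model.TableFieldSchema;
    import com.google.api.services.bigquery.model.TableRow;
    import com.google.api.services.bigquery.model.TableSchema;
    import java.util.Arrays;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils;
    import org.apache.beam.sdk.schemas.Schema;
    import org.apache.beam.sdk.values.Row;

    public class TableSchemaConversionDemo {
      public static void main(String[] args) {
        // A NULLABLE STRING plus a REPEATED INTEGER, as in the new BigQueryUtilsTest cases.
        TableSchema bqSchema =
            new TableSchema()
                .setFields(
                    Arrays.asList(
                        new TableFieldSchema().setName("name").setType("STRING").setMode("NULLABLE"),
                        new TableFieldSchema().setName("ids").setType("INTEGER").setMode("REPEATED")));

        // STRING -> STRING, INTEGER -> INT64, REPEATED -> array, NULLABLE -> nullable field.
        Schema beamSchema = BigQueryUtils.fromTableSchema(bqSchema);

        // The new overload converts using only the Beam schema; no TableSchema needed.
        TableRow tableRow = new TableRow().set("name", "a").set("ids", Arrays.asList("1", "2"));
        Row row = BigQueryUtils.toBeamRow(beamSchema, tableRow);
        System.out.println(row);
      }
    }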
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOReadTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOReadTest.java
index 883bbdd..c74a0d6 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOReadTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOReadTest.java
@@ -51,6 +51,8 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead.QueryPriority;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.transforms.Select;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.SourceTestUtils;
 import org.apache.beam.sdk.testing.TestPipeline;
@@ -65,6 +67,7 @@ import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Lists;
@@ -405,6 +408,65 @@ public class BigQueryIOReadTest implements Serializable {
   }
 
   @Test
+  public void testReadTableWithSchema() throws IOException, InterruptedException {
+    // setup
+    Table someTable = new Table();
+    someTable.setSchema(
+        new TableSchema()
+            .setFields(
+                ImmutableList.of(
+                    new TableFieldSchema().setName("name").setType("STRING"),
+                    new TableFieldSchema().setName("number").setType("INTEGER"))));
+    someTable.setTableReference(
+        new TableReference()
+            .setProjectId("non-executing-project")
+            .setDatasetId("schema_dataset")
+            .setTableId("schema_table"));
+    someTable.setNumBytes(1024L * 1024L);
+    FakeDatasetService fakeDatasetService = new FakeDatasetService();
+    fakeDatasetService.createDataset("non-executing-project", "schema_dataset", "", "", null);
+    fakeDatasetService.createTable(someTable);
+
+    List<TableRow> records =
+        Lists.newArrayList(
+            new TableRow().set("name", "a").set("number", 1L),
+            new TableRow().set("name", "b").set("number", 2L),
+            new TableRow().set("name", "c").set("number", 3L));
+
+    fakeDatasetService.insertAll(someTable.getTableReference(), records, null);
+
+    FakeBigQueryServices fakeBqServices =
+        new FakeBigQueryServices()
+            .withJobService(new FakeJobService())
+            .withDatasetService(fakeDatasetService);
+
+    // test
+    BigQueryIO.TypedRead<TableRow> read =
+        BigQueryIO.readTableRowsWithSchema()
+            .from("non-executing-project:schema_dataset.schema_table")
+            .withTestServices(fakeBqServices)
+            .withoutValidation();
+
+    PCollection<TableRow> bqRows = p.apply(read);
+
+    Schema expectedSchema =
+        Schema.of(
+            Schema.Field.of("name", 
Schema.FieldType.STRING).withNullable(true),
+            Schema.Field.of("number", 
Schema.FieldType.INT64).withNullable(true));
+    assertEquals(expectedSchema, bqRows.getSchema());
+
+    PCollection<Row> output = bqRows.apply(Select.fieldNames("name", "number"));
+    PAssert.that(output)
+        .containsInAnyOrder(
+            ImmutableList.of(
+                Row.withSchema(expectedSchema).addValues("a", 1L).build(),
+                Row.withSchema(expectedSchema).addValues("b", 2L).build(),
+                Row.withSchema(expectedSchema).addValues("c", 3L).build()));
+
+    p.run();
+  }
+
+  @Test
   public void testBuildSourceDisplayDataTable() {
     String tableSpec = "project:dataset.tableid";
 
@@ -509,12 +571,8 @@ public class BigQueryIOReadTest implements Serializable {
 
     String stepUuid = "testStepUuid";
     BoundedSource<TableRow> bqSource =
-        BigQueryTableSource.create(
-            stepUuid,
-            ValueProvider.StaticValueProvider.of(table),
-            fakeBqServices,
-            TableRowJsonCoder.of(),
-            BigQueryIO.TableRowParser.INSTANCE);
+        BigQueryTableSourceDef.create(fakeBqServices, ValueProvider.StaticValueProvider.of(table))
+            .toSource(stepUuid, TableRowJsonCoder.of(), BigQueryIO.TableRowParser.INSTANCE);
 
     PipelineOptions options = PipelineOptionsFactory.create();
     options.setTempLocation(testFolder.getRoot().getAbsolutePath());
@@ -562,12 +620,8 @@ public class BigQueryIOReadTest implements Serializable {
 
     String stepUuid = "testStepUuid";
     BoundedSource<TableRow> bqSource =
-        BigQueryTableSource.create(
-            stepUuid,
-            ValueProvider.StaticValueProvider.of(table),
-            fakeBqServices,
-            TableRowJsonCoder.of(),
-            BigQueryIO.TableRowParser.INSTANCE);
+        BigQueryTableSourceDef.create(fakeBqServices, ValueProvider.StaticValueProvider.of(table))
+            .toSource(stepUuid, TableRowJsonCoder.of(), BigQueryIO.TableRowParser.INSTANCE);
 
     PipelineOptions options = PipelineOptionsFactory.create();
     assertEquals(108, bqSource.getEstimatedSizeBytes(options));
@@ -600,12 +654,8 @@ public class BigQueryIOReadTest implements Serializable {
 
     String stepUuid = "testStepUuid";
     BoundedSource<TableRow> bqSource =
-        BigQueryTableSource.create(
-            stepUuid,
-            ValueProvider.StaticValueProvider.of(table),
-            fakeBqServices,
-            TableRowJsonCoder.of(),
-            BigQueryIO.TableRowParser.INSTANCE);
+        BigQueryTableSourceDef.create(fakeBqServices, ValueProvider.StaticValueProvider.of(table))
+            .toSource(stepUuid, TableRowJsonCoder.of(), BigQueryIO.TableRowParser.INSTANCE);
 
     PipelineOptions options = PipelineOptionsFactory.create();
     assertEquals(118, bqSource.getEstimatedSizeBytes(options));
@@ -621,18 +671,16 @@ public class BigQueryIOReadTest implements Serializable {
     bqOptions.setProject("project");
     String stepUuid = "testStepUuid";
 
-    BigQueryQuerySource<TableRow> bqSource =
-        BigQueryQuerySource.create(
-            stepUuid,
-            ValueProvider.StaticValueProvider.of(queryString),
-            true /* flattenResults */,
-            true /* useLegacySql */,
-            fakeBqServices,
-            TableRowJsonCoder.of(),
-            BigQueryIO.TableRowParser.INSTANCE,
-            QueryPriority.BATCH,
-            null,
-            null);
+    BigQuerySourceBase<TableRow> bqSource =
+        BigQueryQuerySourceDef.create(
+                fakeBqServices,
+                ValueProvider.StaticValueProvider.of(queryString),
+                true, /* flattenResults */
+                true, /* useLegacySql */
+                QueryPriority.BATCH,
+                null,
+                null)
+            .toSource(stepUuid, TableRowJsonCoder.of(), BigQueryIO.TableRowParser.INSTANCE);
 
     fakeJobService.expectDryRunQuery(
         bqOptions.getProject(),
@@ -697,17 +745,15 @@ public class BigQueryIOReadTest implements Serializable {
                    .setReferencedTables(ImmutableList.of(sourceTableRef, tempTableReference))));
 
     BoundedSource<TableRow> bqSource =
-        BigQueryQuerySource.create(
-            stepUuid,
-            ValueProvider.StaticValueProvider.of(encodedQuery),
-            true /* flattenResults */,
-            true /* useLegacySql */,
-            fakeBqServices,
-            TableRowJsonCoder.of(),
-            BigQueryIO.TableRowParser.INSTANCE,
-            QueryPriority.BATCH,
-            null,
-            null);
+        BigQueryQuerySourceDef.create(
+                fakeBqServices,
+                ValueProvider.StaticValueProvider.of(encodedQuery),
+                true /* flattenResults */,
+                true /* useLegacySql */,
+                QueryPriority.BATCH,
+                null,
+                null)
+            .toSource(stepUuid, TableRowJsonCoder.of(), BigQueryIO.TableRowParser.INSTANCE);
 
     options.setTempLocation(testFolder.getRoot().getAbsolutePath());
 
@@ -764,17 +810,15 @@ public class BigQueryIOReadTest implements Serializable {
                     .setReferencedTables(ImmutableList.of())));
 
     BoundedSource<TableRow> bqSource =
-        BigQueryQuerySource.create(
-            stepUuid,
-            ValueProvider.StaticValueProvider.of(encodedQuery),
-            true /* flattenResults */,
-            true /* useLegacySql */,
-            fakeBqServices,
-            TableRowJsonCoder.of(),
-            BigQueryIO.TableRowParser.INSTANCE,
-            QueryPriority.BATCH,
-            null,
-            null);
+        BigQueryQuerySourceDef.create(
+                fakeBqServices,
+                ValueProvider.StaticValueProvider.of(encodedQuery),
+                true /* flattenResults */,
+                true /* useLegacySql */,
+                QueryPriority.BATCH,
+                null,
+                null)
+            .toSource(stepUuid, TableRowJsonCoder.of(), BigQueryIO.TableRowParser.INSTANCE);
 
     options.setTempLocation(testFolder.getRoot().getAbsolutePath());
 
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java
index a23a3ec..3315598 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java
@@ -24,6 +24,7 @@ import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.nullValue;
 import static org.hamcrest.collection.IsMapContaining.hasEntry;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertThrows;
 
@@ -31,12 +32,14 @@ import com.google.api.services.bigquery.model.TableFieldSchema;
 import com.google.api.services.bigquery.model.TableRow;
 import com.google.api.services.bigquery.model.TableSchema;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils.ConversionOptions.TruncateTimestamps;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.values.Row;
 import org.joda.time.DateTime;
 import org.joda.time.Instant;
+import org.joda.time.chrono.ISOChronology;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -83,22 +86,73 @@ public class BigQueryUtilsTest {
           .setType(StandardSQLTypeName.INT64.toString())
           .setMode(Mode.REPEATED.toString());
 
+  private static final TableFieldSchema ROW =
+      new TableFieldSchema()
+          .setName("row")
+          .setType(StandardSQLTypeName.STRUCT.toString())
+          .setMode(Mode.NULLABLE.toString())
+          .setFields(Arrays.asList(ID, VALUE, NAME, TIMESTAMP, VALID));
+
+  private static final TableFieldSchema ROWS =
+      new TableFieldSchema()
+          .setName("rows")
+          .setType(StandardSQLTypeName.STRUCT.toString())
+          .setMode(Mode.REPEATED.toString())
+          .setFields(Arrays.asList(ID, VALUE, NAME, TIMESTAMP, VALID));
+
   private static final Row FLAT_ROW =
       Row.withSchema(FLAT_TYPE)
           .addValues(123L, 123.456, "test", new DateTime(123456), false)
           .build();
 
+  private static final TableRow BQ_FLAT_ROW =
+      new TableRow()
+          .set("id", "123")
+          .set("value", "123.456")
+          .set("name", "test")
+          .set(
+              "timestamp",
+              String.valueOf(
+                  new DateTime(123456L, ISOChronology.getInstanceUTC()).getMillis() / 1000.0D))
+          .set("valid", "false");
+
   private static final Row NULL_FLAT_ROW =
       Row.withSchema(FLAT_TYPE).addValues(null, null, null, null, null).build();
 
+  private static final TableRow BQ_NULL_FLAT_ROW =
+      new TableRow()
+          .set("id", null)
+          .set("value", null)
+          .set("name", null)
+          .set("timestamp", null)
+          .set("valid", null);
+
   private static final Row ARRAY_ROW =
       Row.withSchema(ARRAY_TYPE).addValues((Object) Arrays.asList(123L, 124L)).build();
 
+  private static final TableRow BQ_ARRAY_ROW =
+      new TableRow().set("ids", Arrays.asList("123", "124"));
+
   private static final Row ROW_ROW = Row.withSchema(ROW_TYPE).addValues(FLAT_ROW).build();
 
+  private static final TableRow BQ_ROW_ROW = new TableRow().set("row", BQ_FLAT_ROW);
+
   private static final Row ARRAY_ROW_ROW =
       Row.withSchema(ARRAY_ROW_TYPE).addValues((Object) Arrays.asList(FLAT_ROW)).build();
 
+  private static final TableRow BQ_ARRAY_ROW_ROW =
+      new TableRow().set("rows", Collections.singletonList(BQ_FLAT_ROW));
+
+  private static final TableSchema BQ_FLAT_TYPE =
+      new TableSchema().setFields(Arrays.asList(ID, VALUE, NAME, TIMESTAMP, VALID));
+
+  private static final TableSchema BQ_ARRAY_TYPE = new TableSchema().setFields(Arrays.asList(IDS));
+
+  private static final TableSchema BQ_ROW_TYPE = new TableSchema().setFields(Arrays.asList(ROW));
+
+  private static final TableSchema BQ_ARRAY_ROW_TYPE =
+      new TableSchema().setFields(Arrays.asList(ROWS));
+
   @Test
   public void testToTableSchema_flat() {
     TableSchema schema = toTableSchema(FLAT_TYPE);
@@ -140,6 +194,7 @@ public class BigQueryUtilsTest {
   @Test
   public void testToTableRow_flat() {
     TableRow row = toTableRow().apply(FLAT_ROW);
+    System.out.println(row);
 
     assertThat(row.size(), equalTo(5));
     assertThat(row, hasEntry("id", "123"));
@@ -290,4 +345,58 @@ public class BigQueryUtilsTest {
       return base.getMillis();
     }
   }
+
+  @Test
+  public void testFromTableSchema_flat() {
+    Schema beamSchema = BigQueryUtils.fromTableSchema(BQ_FLAT_TYPE);
+    assertEquals(FLAT_TYPE, beamSchema);
+  }
+
+  @Test
+  public void testFromTableSchema_array() {
+    Schema beamSchema = BigQueryUtils.fromTableSchema(BQ_ARRAY_TYPE);
+    assertEquals(ARRAY_TYPE, beamSchema);
+  }
+
+  @Test
+  public void testFromTableSchema_row() {
+    Schema beamSchema = BigQueryUtils.fromTableSchema(BQ_ROW_TYPE);
+    assertEquals(ROW_TYPE, beamSchema);
+  }
+
+  @Test
+  public void testFromTableSchema_array_row() {
+    Schema beamSchema = BigQueryUtils.fromTableSchema(BQ_ARRAY_ROW_TYPE);
+    assertEquals(ARRAY_ROW_TYPE, beamSchema);
+  }
+
+  @Test
+  public void testToBeamRow_flat() {
+    Row beamRow = BigQueryUtils.toBeamRow(FLAT_TYPE, BQ_FLAT_ROW);
+    assertEquals(FLAT_ROW, beamRow);
+  }
+
+  @Test
+  public void testToBeamRow_null() {
+    Row beamRow = BigQueryUtils.toBeamRow(FLAT_TYPE, BQ_NULL_FLAT_ROW);
+    assertEquals(NULL_FLAT_ROW, beamRow);
+  }
+
+  @Test
+  public void testToBeamRow_array() {
+    Row beamRow = BigQueryUtils.toBeamRow(ARRAY_TYPE, BQ_ARRAY_ROW);
+    assertEquals(ARRAY_ROW, beamRow);
+  }
+
+  @Test
+  public void testToBeamRow_row() {
+    Row beamRow = BigQueryUtils.toBeamRow(ROW_TYPE, BQ_ROW_ROW);
+    assertEquals(ROW_ROW, beamRow);
+  }
+
+  @Test
+  public void testToBeamRow_array_row() {
+    Row beamRow = BigQueryUtils.toBeamRow(ARRAY_ROW_TYPE, BQ_ARRAY_ROW_ROW);
+    assertEquals(ARRAY_ROW_ROW, beamRow);
+  }
 }
