Set the Project of a Table Reference at Runtime. Instead of using the project at job submission time, use the project at job execution time.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b9e65779 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b9e65779 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b9e65779 Branch: refs/heads/jstorm-runner Commit: b9e657790c69ae4f9eead893655c595e34ded4da Parents: a25c7d3 Author: Thomas Groh <[email protected]> Authored: Mon Apr 17 15:41:57 2017 -0700 Committer: Thomas Groh <[email protected]> Committed: Mon Apr 17 18:20:07 2017 -0700 ---------------------------------------------------------------------- .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 59 +++++++------------- .../io/gcp/bigquery/BigQueryTableSource.java | 30 +++++++++- 2 files changed, 50 insertions(+), 39 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/b9e65779/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index f5f93b3..9753da5 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -410,7 +410,7 @@ public class BigQueryIO { } } - ValueProvider<TableReference> table = getTableWithDefaultProject(bqOptions); + ValueProvider<TableReference> table = getTableProvider(); checkState( table == null || getQuery() == null, @@ -428,6 +428,12 @@ public class BigQueryIO { getUseLegacySql() == null, "Invalid BigQueryIO.Read: Specifies a table with a SQL dialect" + " preference, which only applies to queries"); + if (table.isAccessible() && 
Strings.isNullOrEmpty(table.get().getProjectId())) { + LOG.info( + "Project of {} not set. The value of {}.getProject() at execution time will be used.", + TableReference.class.getSimpleName(), + BigQueryOptions.class.getSimpleName()); + } } else /* query != null */ { checkState( getFlattenResults() != null, "flattenResults should not be null if query is set"); @@ -495,10 +501,13 @@ public class BigQueryIO { extractDestinationDir, getBigQueryServices()); } else { - ValueProvider<TableReference> inputTable = getTableWithDefaultProject(bqOptions); - source = BigQueryTableSource.create( - jobIdToken, inputTable, extractDestinationDir, getBigQueryServices(), - StaticValueProvider.of(executingProject)); + source = + BigQueryTableSource.create( + jobIdToken, + getTableProvider(), + extractDestinationDir, + getBigQueryServices(), + StaticValueProvider.of(executingProject)); } PassThroughThenCleanup.CleanupOperation cleanupOperation = new PassThroughThenCleanup.CleanupOperation() { @@ -506,12 +515,12 @@ public class BigQueryIO { void cleanup(PipelineOptions options) throws Exception { BigQueryOptions bqOptions = options.as(BigQueryOptions.class); - JobReference jobRef = new JobReference() - .setProjectId(executingProject) - .setJobId(getExtractJobId(jobIdToken)); + JobReference jobRef = + new JobReference() + .setProjectId(executingProject) + .setJobId(getExtractJobId(jobIdToken)); - Job extractJob = getBigQueryServices().getJobService(bqOptions) - .getJob(jobRef); + Job extractJob = getBigQueryServices().getJobService(bqOptions).getJob(jobRef); Collection<String> extractFiles = null; if (extractJob != null) { @@ -526,7 +535,8 @@ public class BigQueryIO { if (extractFiles != null && !extractFiles.isEmpty()) { new GcsUtilFactory().create(options).remove(extractFiles); } - }}; + } + }; return input.getPipeline() .apply(org.apache.beam.sdk.io.Read.from(source)) .setCoder(getDefaultOutputCoder()) @@ -557,33 +567,6 @@ public class BigQueryIO { /** * Returns the table to read, 
or {@code null} if reading from a query instead. - * - * <p>If the table's project is not specified, use the executing project. - */ - @Nullable ValueProvider<TableReference> getTableWithDefaultProject( - BigQueryOptions bqOptions) { - ValueProvider<TableReference> table = getTableProvider(); - if (table == null) { - return table; - } - if (!table.isAccessible()) { - LOG.info("Using a dynamic value for table input. This must contain a project" - + " in the table reference: {}", table); - return table; - } - if (Strings.isNullOrEmpty(table.get().getProjectId())) { - // If user does not specify a project we assume the table to be located in - // the default project. - TableReference tableRef = table.get(); - tableRef.setProjectId(bqOptions.getProject()); - return NestedValueProvider.of(StaticValueProvider.of( - BigQueryHelpers.toJsonString(tableRef)), new JsonTableRefToTableRef()); - } - return table; - } - - /** - * Returns the table to read, or {@code null} if reading from a query instead. 
*/ @Nullable public ValueProvider<TableReference> getTableProvider() { http://git-wip-us.apache.org/repos/asf/beam/blob/b9e65779/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java index cbd5781..22aba64 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java @@ -24,6 +24,7 @@ import static com.google.common.base.Preconditions.checkState; import com.google.api.services.bigquery.model.TableReference; import com.google.api.services.bigquery.model.TableRow; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Strings; import java.io.IOException; import java.util.concurrent.atomic.AtomicReference; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TableRefToJson; @@ -32,12 +33,15 @@ import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider; import org.apache.beam.sdk.transforms.display.DisplayData; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * A {@link BigQuerySourceBase} for reading BigQuery tables. 
*/ @VisibleForTesting class BigQueryTableSource extends BigQuerySourceBase { + private static final Logger LOG = LoggerFactory.getLogger(BigQueryTableSource.class); static BigQueryTableSource create( ValueProvider<String> jobIdToken, @@ -66,7 +70,31 @@ class BigQueryTableSource extends BigQuerySourceBase { @Override protected TableReference getTableToExtract(BigQueryOptions bqOptions) throws IOException { checkState(jsonTable.isAccessible()); - return BigQueryIO.JSON_FACTORY.fromString(jsonTable.get(), TableReference.class); + TableReference tableReference = + BigQueryIO.JSON_FACTORY.fromString(jsonTable.get(), TableReference.class); + return setDefaultProjectIfAbsent(bqOptions, tableReference); + } + + /** + * Sets the {@link TableReference#projectId} of the provided table reference to the id of the + * default project if the table reference does not have a project ID specified. + */ + private TableReference setDefaultProjectIfAbsent( + BigQueryOptions bqOptions, TableReference tableReference) { + if (Strings.isNullOrEmpty(tableReference.getProjectId())) { + checkState( + !Strings.isNullOrEmpty(bqOptions.getProject()), + "No project ID set in %s or %s, cannot construct a complete %s", + TableReference.class.getSimpleName(), + BigQueryOptions.class.getSimpleName(), + TableReference.class.getSimpleName()); + LOG.info( + "Project ID not set in {}. Using default project from {}.", + TableReference.class.getSimpleName(), + BigQueryOptions.class.getSimpleName()); + tableReference.setProjectId(bqOptions.getProject()); + } + return tableReference; } @Override
