Condense BigQueryIO.Read.Bound into BigQueryIO.Read
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/825338aa Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/825338aa Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/825338aa Branch: refs/heads/master Commit: 825338aaa5d7e5ead1afa13f63c65fb316e1aa6a Parents: 30f3634 Author: Eugene Kirpichov <[email protected]> Authored: Thu Mar 2 17:15:47 2017 -0800 Committer: Thomas Groh <[email protected]> Committed: Tue Mar 14 15:54:22 2017 -0700 ---------------------------------------------------------------------- .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 690 +++++++++---------- .../sdk/io/gcp/bigquery/BigQueryIOTest.java | 117 ++-- 2 files changed, 359 insertions(+), 448 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/825338aa/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index 2902c2b..f6c8575 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -39,7 +39,6 @@ import com.google.api.services.bigquery.model.TableSchema; import com.google.cloud.hadoop.util.ApiErrorExtractor; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; -import com.google.common.base.MoreObjects; import com.google.common.base.Strings; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; @@ -454,448 +453,379 @@ public class BigQueryIO { * } * }}</pre> */ - public static class Read { + public static class Read extends PTransform<PBegin, PCollection<TableRow>> { + @Nullable final ValueProvider<String> jsonTableRef; + @Nullable final ValueProvider<String> query; /** * Reads a BigQuery table specified as {@code "[project_id]:[dataset_id].[table_id]"} or * {@code "[dataset_id].[table_id]"} for tables within the current project. */ - public static Bound from(String tableSpec) { - return new Bound().from(StaticValueProvider.of(tableSpec)); + public static Read from(String tableSpec) { + return new Read().from(StaticValueProvider.of(tableSpec)); } /** * Same as {@code from(String)}, but with a {@link ValueProvider}. */ - public static Bound from(ValueProvider<String> tableSpec) { - return new Bound().from(tableSpec); + public static Read from(ValueProvider<String> tableSpec) { + return new Read().from(tableSpec); } /** * Reads results received after executing the given query. */ - public static Bound fromQuery(String query) { - return new Bound().fromQuery(StaticValueProvider.of(query)); + public static Read fromQuery(String query) { + return new Read().fromQuery(StaticValueProvider.of(query)); } /** * Same as {@code from(String)}, but with a {@link ValueProvider}. */ - public static Bound fromQuery(ValueProvider<String> query) { - return new Bound().fromQuery(query); + public static Read fromQuery(ValueProvider<String> query) { + return new Read().fromQuery(query); } /** * Reads a BigQuery table specified as a {@link TableReference} object. */ - public static Bound from(TableReference table) { - return new Bound().from(table); + public static Read from(TableReference table) { + return new Read().from(table); } /** - * Disables BigQuery table validation, which is enabled by default. + * Disable validation that the table exists or the query succeeds prior to pipeline + * submission. Basic validation (such as ensuring that a query or table is specified) still + * occurs. */ - public static Bound withoutValidation() { - return new Bound().withoutValidation(); + final boolean validate; + @Nullable final Boolean flattenResults; + @Nullable final Boolean useLegacySql; + @Nullable BigQueryServices bigQueryServices; + + @VisibleForTesting @Nullable String stepUuid; + @VisibleForTesting @Nullable ValueProvider<String> jobUuid; + + private static final String QUERY_VALIDATION_FAILURE_ERROR = + "Validation of query \"%1$s\" failed. If the query depends on an earlier stage of the" + + " pipeline, This validation can be disabled using #withoutValidation."; + + private Read() { + this( + null /* name */, + null /* query */, + null /* jsonTableRef */, + true /* validate */, + null /* flattenResults */, + null /* useLegacySql */, + null /* bigQueryServices */); + } + + private Read( + String name, @Nullable ValueProvider<String> query, + @Nullable ValueProvider<String> jsonTableRef, boolean validate, + @Nullable Boolean flattenResults, @Nullable Boolean useLegacySql, + @Nullable BigQueryServices bigQueryServices) { + super(name); + this.jsonTableRef = jsonTableRef; + this.query = query; + this.validate = validate; + this.flattenResults = flattenResults; + this.useLegacySql = useLegacySql; + this.bigQueryServices = bigQueryServices; } /** - * A {@link PTransform} that reads from a BigQuery table and returns a bounded - * {@link PCollection} of {@link TableRow TableRows}. + * Disable validation that the table exists or the query succeeds prior to pipeline + * submission. Basic validation (such as ensuring that a query or table is specified) still + * occurs. */ - public static class Bound extends PTransform<PBegin, PCollection<TableRow>> { - @Nullable final ValueProvider<String> jsonTableRef; - @Nullable final ValueProvider<String> query; - - /** - * Disable validation that the table exists or the query succeeds prior to pipeline - * submission. Basic validation (such as ensuring that a query or table is specified) still - * occurs. - */ - final boolean validate; - @Nullable final Boolean flattenResults; - @Nullable final Boolean useLegacySql; - @Nullable BigQueryServices bigQueryServices; - - @VisibleForTesting @Nullable String stepUuid; - @VisibleForTesting @Nullable ValueProvider<String> jobUuid; - - private static final String QUERY_VALIDATION_FAILURE_ERROR = - "Validation of query \"%1$s\" failed. If the query depends on an earlier stage of the" - + " pipeline, This validation can be disabled using #withoutValidation."; - - private Bound() { - this( - null /* name */, - null /* query */, - null /* jsonTableRef */, - true /* validate */, - null /* flattenResults */, - null /* useLegacySql */, - null /* bigQueryServices */); - } - - private Bound( - String name, @Nullable ValueProvider<String> query, - @Nullable ValueProvider<String> jsonTableRef, boolean validate, - @Nullable Boolean flattenResults, @Nullable Boolean useLegacySql, - @Nullable BigQueryServices bigQueryServices) { - super(name); - this.jsonTableRef = jsonTableRef; - this.query = query; - this.validate = validate; - this.flattenResults = flattenResults; - this.useLegacySql = useLegacySql; - this.bigQueryServices = bigQueryServices; - } - - /** - * Returns a copy of this transform that reads from the specified table. Refer to - * {@link #parseTableSpec(String)} for the specification format. - * - * <p>Does not modify this object. - */ - public Bound from(ValueProvider<String> tableSpec) { - return new Bound( - name, query, - NestedValueProvider.of( - NestedValueProvider.of( - tableSpec, new TableSpecToTableRef()), - new TableRefToJson()), - validate, flattenResults, useLegacySql, bigQueryServices); - } - - /** - * Returns a copy of this transform that reads from the specified table. - * - * <p>Does not modify this object. - */ - public Bound from(TableReference table) { - return from(StaticValueProvider.of(toTableSpec(table))); - } - - /** - * Returns a copy of this transform that reads the results of the specified query. - * - * <p>Does not modify this object. - * - * <p>By default, the query results will be flattened -- see - * "flattenResults" in the <a href="https://cloud.google.com/bigquery/docs/reference/v2/jobs"> - * Jobs documentation</a> for more information. To disable flattening, use - * {@link BigQueryIO.Read.Bound#withoutResultFlattening}. - * - * <p>By default, the query will use BigQuery's legacy SQL dialect. To use the BigQuery - * Standard SQL dialect, use {@link BigQueryIO.Read.Bound#usingStandardSql}. - */ - public Bound fromQuery(String query) { - return fromQuery(StaticValueProvider.of(query)); - } - - /** - * Like {@link #fromQuery(String)}, but from a {@link ValueProvider}. - */ - public Bound fromQuery(ValueProvider<String> query) { - return new Bound(name, query, jsonTableRef, validate, - MoreObjects.firstNonNull(flattenResults, Boolean.TRUE), - MoreObjects.firstNonNull(useLegacySql, Boolean.TRUE), - bigQueryServices); - } + public Read withoutValidation() { + return new Read( + name, query, jsonTableRef, false /* validate */, flattenResults, useLegacySql, + bigQueryServices); + } - /** - * Disable validation that the table exists or the query succeeds prior to pipeline - * submission. Basic validation (such as ensuring that a query or table is specified) still - * occurs. - */ - public Bound withoutValidation() { - return new Bound( - name, query, jsonTableRef, false /* validate */, flattenResults, useLegacySql, - bigQueryServices); - } + /** + * Disable <a href="https://cloud.google.com/bigquery/docs/reference/v2/jobs"> + * flattening of query results</a>. + * + * <p>Only valid when a query is used ({@link #fromQuery}). Setting this option when reading + * from a table will cause an error during validation. + */ + public Read withoutResultFlattening() { + return new Read( + name, query, jsonTableRef, validate, false /* flattenResults */, useLegacySql, + bigQueryServices); + } - /** - * Disable <a href="https://cloud.google.com/bigquery/docs/reference/v2/jobs"> - * flattening of query results</a>. - * - * <p>Only valid when a query is used ({@link #fromQuery}). Setting this option when reading - * from a table will cause an error during validation. - */ - public Bound withoutResultFlattening() { - return new Bound( - name, query, jsonTableRef, validate, false /* flattenResults */, useLegacySql, - bigQueryServices); - } + /** + * Enables BigQuery's Standard SQL dialect when reading from a query. + * + * <p>Only valid when a query is used ({@link #fromQuery}). Setting this option when reading + * from a table will cause an error during validation. + */ + public Read usingStandardSql() { + return new Read( + name, query, jsonTableRef, validate, flattenResults, false /* useLegacySql */, + bigQueryServices); + } - /** - * Enables BigQuery's Standard SQL dialect when reading from a query. - * - * <p>Only valid when a query is used ({@link #fromQuery}). Setting this option when reading - * from a table will cause an error during validation. - */ - public Bound usingStandardSql() { - return new Bound( - name, query, jsonTableRef, validate, flattenResults, false /* useLegacySql */, - bigQueryServices); - } + @VisibleForTesting + Read withTestServices(BigQueryServices testServices) { + return new Read( + name, query, jsonTableRef, validate, flattenResults, useLegacySql, testServices); + } - @VisibleForTesting - Bound withTestServices(BigQueryServices testServices) { - return new Bound( - name, query, jsonTableRef, validate, flattenResults, useLegacySql, testServices); + @Override + public void validate(PBegin input) { + // Even if existence validation is disabled, we need to make sure that the BigQueryIO + // read is properly specified. + BigQueryOptions bqOptions = input.getPipeline().getOptions().as(BigQueryOptions.class); + + String tempLocation = bqOptions.getTempLocation(); + checkArgument( + !Strings.isNullOrEmpty(tempLocation), + "BigQueryIO.Read needs a GCS temp location to store temp files."); + if (bigQueryServices == null) { + try { + GcsPath.fromUri(tempLocation); + } catch (IllegalArgumentException e) { + throw new IllegalArgumentException( + String.format( + "BigQuery temp location expected a valid 'gs://' path, but was given '%s'", + tempLocation), + e); + } } - @Override - public void validate(PBegin input) { - // Even if existence validation is disabled, we need to make sure that the BigQueryIO - // read is properly specified. - BigQueryOptions bqOptions = input.getPipeline().getOptions().as(BigQueryOptions.class); + ValueProvider<TableReference> table = getTableWithDefaultProject(bqOptions); - String tempLocation = bqOptions.getTempLocation(); - checkArgument( - !Strings.isNullOrEmpty(tempLocation), - "BigQueryIO.Read needs a GCS temp location to store temp files."); - if (bigQueryServices == null) { - try { - GcsPath.fromUri(tempLocation); - } catch (IllegalArgumentException e) { - throw new IllegalArgumentException( - String.format( - "BigQuery temp location expected a valid 'gs://' path, but was given '%s'", - tempLocation), - e); - } - } - - ValueProvider<TableReference> table = getTableWithDefaultProject(bqOptions); + checkState( + table == null || query == null, + "Invalid BigQueryIO.Read: table reference and query may not both be set"); + checkState( + table != null || query != null, + "Invalid BigQueryIO.Read: one of table reference and query must be set"); + if (table != null) { checkState( - table == null || query == null, - "Invalid BigQueryIO.Read: table reference and query may not both be set"); + flattenResults == null, + "Invalid BigQueryIO.Read: Specifies a table with a result flattening" + + " preference, which only applies to queries"); checkState( - table != null || query != null, - "Invalid BigQueryIO.Read: one of table reference and query must be set"); - - if (table != null) { - checkState( - flattenResults == null, - "Invalid BigQueryIO.Read: Specifies a table with a result flattening" - + " preference, which only applies to queries"); - checkState( - useLegacySql == null, - "Invalid BigQueryIO.Read: Specifies a table with a SQL dialect" - + " preference, which only applies to queries"); - } else /* query != null */ { - checkState(flattenResults != null, "flattenResults should not be null if query is set"); - checkState(useLegacySql != null, "useLegacySql should not be null if query is set"); - } - - // Note that a table or query check can fail if the table or dataset are created by - // earlier stages of the pipeline or if a query depends on earlier stages of a pipeline. - // For these cases the withoutValidation method can be used to disable the check. - if (validate && table != null) { - checkState(table.isAccessible(), "Cannot call validate if table is dynamically set."); - // Check for source table presence for early failure notification. - DatasetService datasetService = getBigQueryServices().getDatasetService(bqOptions); - verifyDatasetPresence(datasetService, table.get()); - verifyTablePresence(datasetService, table.get()); - } else if (validate && query != null) { - checkState(query.isAccessible(), "Cannot call validate if query is dynamically set."); - JobService jobService = getBigQueryServices().getJobService(bqOptions); - try { - jobService.dryRunQuery( - bqOptions.getProject(), - new JobConfigurationQuery() - .setQuery(query.get()) - .setFlattenResults(flattenResults) - .setUseLegacySql(useLegacySql)); - } catch (Exception e) { - throw new IllegalArgumentException( - String.format(QUERY_VALIDATION_FAILURE_ERROR, query.get()), e); - } - } - } - - @Override - public PCollection<TableRow> expand(PBegin input) { - stepUuid = randomUUIDString(); - BigQueryOptions bqOptions = input.getPipeline().getOptions().as(BigQueryOptions.class); - jobUuid = NestedValueProvider.of( - StaticValueProvider.of(bqOptions.getJobName()), new CreatePerBeamJobUuid(stepUuid)); - final ValueProvider<String> jobIdToken = NestedValueProvider.of( - jobUuid, new BeamJobUuidToBigQueryJobUuid()); - - BoundedSource<TableRow> source; - final BigQueryServices bqServices = getBigQueryServices(); - - final String extractDestinationDir; - String tempLocation = bqOptions.getTempLocation(); + useLegacySql == null, + "Invalid BigQueryIO.Read: Specifies a table with a SQL dialect" + + " preference, which only applies to queries"); + } else /* query != null */ { + checkState(flattenResults != null, "flattenResults should not be null if query is set"); + checkState(useLegacySql != null, "useLegacySql should not be null if query is set"); + } + + // Note that a table or query check can fail if the table or dataset are created by + // earlier stages of the pipeline or if a query depends on earlier stages of a pipeline. + // For these cases the withoutValidation method can be used to disable the check. + if (validate && table != null) { + checkState(table.isAccessible(), "Cannot call validate if table is dynamically set."); + // Check for source table presence for early failure notification. + DatasetService datasetService = getBigQueryServices().getDatasetService(bqOptions); + verifyDatasetPresence(datasetService, table.get()); + verifyTablePresence(datasetService, table.get()); + } else if (validate && query != null) { + checkState(query.isAccessible(), "Cannot call validate if query is dynamically set."); + JobService jobService = getBigQueryServices().getJobService(bqOptions); try { - IOChannelFactory factory = IOChannelUtils.getFactory(tempLocation); - extractDestinationDir = factory.resolve(tempLocation, stepUuid); - } catch (IOException e) { - throw new RuntimeException( - String.format("Failed to resolve extract destination directory in %s", tempLocation)); - } - - final String executingProject = bqOptions.getProject(); - if (query != null && (!query.isAccessible() || !Strings.isNullOrEmpty(query.get()))) { - source = BigQueryQuerySource.create( - jobIdToken, query, NestedValueProvider.of( - jobUuid, new CreateJsonTableRefFromUuid(executingProject)), - flattenResults, useLegacySql, extractDestinationDir, bqServices); - } else { - ValueProvider<TableReference> inputTable = getTableWithDefaultProject(bqOptions); - source = BigQueryTableSource.create( - jobIdToken, inputTable, extractDestinationDir, bqServices, - StaticValueProvider.of(executingProject)); + jobService.dryRunQuery( + bqOptions.getProject(), + new JobConfigurationQuery() + .setQuery(query.get()) + .setFlattenResults(flattenResults) + .setUseLegacySql(useLegacySql)); + } catch (Exception e) { + throw new IllegalArgumentException( + String.format(QUERY_VALIDATION_FAILURE_ERROR, query.get()), e); } - PassThroughThenCleanup.CleanupOperation cleanupOperation = - new PassThroughThenCleanup.CleanupOperation() { - @Override - void cleanup(PipelineOptions options) throws Exception { - BigQueryOptions bqOptions = options.as(BigQueryOptions.class); - - JobReference jobRef = new JobReference() - .setProjectId(executingProject) - .setJobId(getExtractJobId(jobIdToken)); - - Job extractJob = bqServices.getJobService(bqOptions) - .getJob(jobRef); - - Collection<String> extractFiles = null; - if (extractJob != null) { - extractFiles = getExtractFilePaths(extractDestinationDir, extractJob); - } else { - IOChannelFactory factory = IOChannelUtils.getFactory(extractDestinationDir); - Collection<String> dirMatch = factory.match(extractDestinationDir); - if (!dirMatch.isEmpty()) { - extractFiles = factory.match(factory.resolve(extractDestinationDir, "*")); - } - } - if (extractFiles != null && !extractFiles.isEmpty()) { - new GcsUtilFactory().create(options).remove(extractFiles); - } - }}; - return input.getPipeline() - .apply(org.apache.beam.sdk.io.Read.from(source)) - .setCoder(getDefaultOutputCoder()) - .apply(new PassThroughThenCleanup<TableRow>(cleanupOperation)); } + } - @Override - protected Coder<TableRow> getDefaultOutputCoder() { - return TableRowJsonCoder.of(); + @Override + public PCollection<TableRow> expand(PBegin input) { + stepUuid = randomUUIDString(); + BigQueryOptions bqOptions = input.getPipeline().getOptions().as(BigQueryOptions.class); + jobUuid = NestedValueProvider.of( + StaticValueProvider.of(bqOptions.getJobName()), new CreatePerBeamJobUuid(stepUuid)); + final ValueProvider<String> jobIdToken = NestedValueProvider.of( + jobUuid, new BeamJobUuidToBigQueryJobUuid()); + + BoundedSource<TableRow> source; + final BigQueryServices bqServices = getBigQueryServices(); + + final String extractDestinationDir; + String tempLocation = bqOptions.getTempLocation(); + try { + IOChannelFactory factory = IOChannelUtils.getFactory(tempLocation); + extractDestinationDir = factory.resolve(tempLocation, stepUuid); + } catch (IOException e) { + throw new RuntimeException( + String.format("Failed to resolve extract destination directory in %s", tempLocation)); } - @Override - public void populateDisplayData(DisplayData.Builder builder) { - super.populateDisplayData(builder); - builder - .addIfNotNull(DisplayData.item("table", displayTable(getTableProvider())) - .withLabel("Table")) - .addIfNotNull(DisplayData.item("query", query) - .withLabel("Query")) - .addIfNotNull(DisplayData.item("flattenResults", flattenResults) - .withLabel("Flatten Query Results")) - .addIfNotNull(DisplayData.item("useLegacySql", useLegacySql) - .withLabel("Use Legacy SQL Dialect")) - .addIfNotDefault(DisplayData.item("validation", validate) - .withLabel("Validation Enabled"), - true); + final String executingProject = bqOptions.getProject(); + if (query != null && (!query.isAccessible() || !Strings.isNullOrEmpty(query.get()))) { + source = BigQueryQuerySource.create( + jobIdToken, query, NestedValueProvider.of( + jobUuid, new CreateJsonTableRefFromUuid(executingProject)), + flattenResults, useLegacySql, extractDestinationDir, bqServices); + } else { + ValueProvider<TableReference> inputTable = getTableWithDefaultProject(bqOptions); + source = BigQueryTableSource.create( + jobIdToken, inputTable, extractDestinationDir, bqServices, + StaticValueProvider.of(executingProject)); } + PassThroughThenCleanup.CleanupOperation cleanupOperation = + new PassThroughThenCleanup.CleanupOperation() { + @Override + void cleanup(PipelineOptions options) throws Exception { + BigQueryOptions bqOptions = options.as(BigQueryOptions.class); + + JobReference jobRef = new JobReference() + .setProjectId(executingProject) + .setJobId(getExtractJobId(jobIdToken)); + + Job extractJob = bqServices.getJobService(bqOptions) + .getJob(jobRef); + + Collection<String> extractFiles = null; + if (extractJob != null) { + extractFiles = getExtractFilePaths(extractDestinationDir, extractJob); + } else { + IOChannelFactory factory = IOChannelUtils.getFactory(extractDestinationDir); + Collection<String> dirMatch = factory.match(extractDestinationDir); + if (!dirMatch.isEmpty()) { + extractFiles = factory.match(factory.resolve(extractDestinationDir, "*")); + } + } + if (extractFiles != null && !extractFiles.isEmpty()) { + new GcsUtilFactory().create(options).remove(extractFiles); + } + }}; + return input.getPipeline() + .apply(org.apache.beam.sdk.io.Read.from(source)) + .setCoder(getDefaultOutputCoder()) + .apply(new PassThroughThenCleanup<TableRow>(cleanupOperation)); + } - /** - * Returns the table to read, or {@code null} if reading from a query instead. - * - * <p>If the table's project is not specified, use the executing project. - */ - @Nullable private ValueProvider<TableReference> getTableWithDefaultProject( - BigQueryOptions bqOptions) { - ValueProvider<TableReference> table = getTableProvider(); - if (table == null) { - return table; - } - if (!table.isAccessible()) { - LOG.info("Using a dynamic value for table input. This must contain a project" - + " in the table reference: {}", table); - return table; - } - if (Strings.isNullOrEmpty(table.get().getProjectId())) { - // If user does not specify a project we assume the table to be located in - // the default project. - TableReference tableRef = table.get(); - tableRef.setProjectId(bqOptions.getProject()); - return NestedValueProvider.of(StaticValueProvider.of( - toJsonString(tableRef)), new JsonTableRefToTableRef()); - } + @Override + protected Coder<TableRow> getDefaultOutputCoder() { + return TableRowJsonCoder.of(); + } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + super.populateDisplayData(builder); + builder + .addIfNotNull(DisplayData.item("table", displayTable(getTableProvider())) + .withLabel("Table")) + .addIfNotNull(DisplayData.item("query", query) + .withLabel("Query")) + .addIfNotNull(DisplayData.item("flattenResults", flattenResults) + .withLabel("Flatten Query Results")) + .addIfNotNull(DisplayData.item("useLegacySql", useLegacySql) + .withLabel("Use Legacy SQL Dialect")) + .addIfNotDefault(DisplayData.item("validation", validate) + .withLabel("Validation Enabled"), + true); + } + + /** + * Returns the table to read, or {@code null} if reading from a query instead. + * + * <p>If the table's project is not specified, use the executing project. + */ + @Nullable private ValueProvider<TableReference> getTableWithDefaultProject( + BigQueryOptions bqOptions) { + ValueProvider<TableReference> table = getTableProvider(); + if (table == null) { return table; } - - /** - * Returns the table to read, or {@code null} if reading from a query instead. - */ - @Nullable - public ValueProvider<TableReference> getTableProvider() { - return jsonTableRef == null - ? null : NestedValueProvider.of(jsonTableRef, new JsonTableRefToTableRef()); + if (!table.isAccessible()) { + LOG.info("Using a dynamic value for table input. This must contain a project" + + " in the table reference: {}", table); + return table; } - /** - * Returns the table to read, or {@code null} if reading from a query instead. - */ - @Nullable - public TableReference getTable() { - ValueProvider<TableReference> provider = getTableProvider(); - return provider == null ? null : provider.get(); + if (Strings.isNullOrEmpty(table.get().getProjectId())) { + // If user does not specify a project we assume the table to be located in + // the default project. + TableReference tableRef = table.get(); + tableRef.setProjectId(bqOptions.getProject()); + return NestedValueProvider.of(StaticValueProvider.of( + toJsonString(tableRef)), new JsonTableRefToTableRef()); } + return table; + } - /** - * Returns the query to be read, or {@code null} if reading from a table instead. - */ - @Nullable - public String getQuery() { - return query == null ? null : query.get(); - } + /** + * Returns the table to read, or {@code null} if reading from a query instead. + */ + @Nullable + public ValueProvider<TableReference> getTableProvider() { + return jsonTableRef == null + ? null : NestedValueProvider.of(jsonTableRef, new JsonTableRefToTableRef()); + } + /** + * Returns the table to read, or {@code null} if reading from a query instead. + */ + @Nullable + public TableReference getTable() { + ValueProvider<TableReference> provider = getTableProvider(); + return provider == null ? null : provider.get(); + } - /** - * Returns the query to be read, or {@code null} if reading from a table instead. - */ - @Nullable - public ValueProvider<String> getQueryProvider() { - return query; - } + /** + * Returns the query to be read, or {@code null} if reading from a table instead. + */ + @Nullable + public String getQuery() { + return query == null ? null : query.get(); + } - /** - * Returns true if table validation is enabled. - */ - public boolean getValidate() { - return validate; - } + /** + * Returns the query to be read, or {@code null} if reading from a table instead. + */ + @Nullable + public ValueProvider<String> getQueryProvider() { + return query; + } - /** - * Returns true/false if result flattening is enabled/disabled, or null if not applicable. - */ - public Boolean getFlattenResults() { - return flattenResults; - } + /** + * Returns true if table validation is enabled. + */ + public boolean getValidate() { + return validate; + } - /** - * Returns true (false) if the query will (will not) use BigQuery's legacy SQL mode, or null - * if not applicable. - */ - @Nullable - public Boolean getUseLegacySql() { - return useLegacySql; - } + /** + * Returns true/false if result flattening is enabled/disabled, or null if not applicable. + */ + public Boolean getFlattenResults() { + return flattenResults; + } - private BigQueryServices getBigQueryServices() { - if (bigQueryServices == null) { - bigQueryServices = new BigQueryServicesImpl(); - } - return bigQueryServices; - } + /** + * Returns true (false) if the query will (will not) use BigQuery's legacy SQL mode, or null + * if not applicable. + */ + @Nullable + public Boolean getUseLegacySql() { + return useLegacySql; } - /** Disallow construction of utility class. */ - private Read() {} + private BigQueryServices getBigQueryServices() { + if (bigQueryServices == null) { + bigQueryServices = new BigQueryServicesImpl(); + } + return bigQueryServices; + } } /** http://git-wip-us.apache.org/repos/asf/beam/blob/825338aa/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java index c9061a3..bb1528b 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java @@ -665,28 +665,28 @@ public class BigQueryIOTest implements Serializable { @Mock(extraInterfaces = Serializable.class) private transient DatasetService mockDatasetService; private void checkReadTableObject( - BigQueryIO.Read.Bound bound, String project, String dataset, String table) { - checkReadTableObjectWithValidate(bound, project, dataset, table, true); + BigQueryIO.Read read, String project, String dataset, String table) { + checkReadTableObjectWithValidate(read, project, dataset, table, true); } - private void checkReadQueryObject(BigQueryIO.Read.Bound bound, String query) { - checkReadQueryObjectWithValidate(bound, query, true); + private void checkReadQueryObject(BigQueryIO.Read read, String query) { + checkReadQueryObjectWithValidate(read, query, true); } private void checkReadTableObjectWithValidate( - BigQueryIO.Read.Bound bound, String project, String dataset, String table, boolean validate) { - assertEquals(project, bound.getTable().getProjectId()); - assertEquals(dataset, bound.getTable().getDatasetId()); - assertEquals(table, bound.getTable().getTableId()); - assertNull(bound.query); - assertEquals(validate, bound.getValidate()); + BigQueryIO.Read read, String project, String dataset, String table, boolean validate) { + assertEquals(project, read.getTable().getProjectId()); + assertEquals(dataset, read.getTable().getDatasetId()); + assertEquals(table, read.getTable().getTableId()); + assertNull(read.query); + assertEquals(validate, read.getValidate()); } private void checkReadQueryObjectWithValidate( - BigQueryIO.Read.Bound bound, String query, boolean validate) { - assertNull(bound.getTable()); - assertEquals(query, bound.getQuery()); - assertEquals(validate, bound.getValidate()); + BigQueryIO.Read read, String query, boolean validate) { + assertNull(read.getTable()); + assertEquals(query, read.getQuery()); + assertEquals(validate, read.getValidate()); } private void checkWriteObject( @@ -728,39 +728,39 @@ public class BigQueryIOTest implements Serializable { @Test public void testBuildTableBasedSource() { - BigQueryIO.Read.Bound bound = BigQueryIO.Read.from("foo.com:project:somedataset.sometable"); - checkReadTableObject(bound, "foo.com:project", "somedataset", "sometable"); + BigQueryIO.Read read = BigQueryIO.Read.from("foo.com:project:somedataset.sometable"); + checkReadTableObject(read, "foo.com:project", "somedataset", "sometable"); } @Test public void testBuildQueryBasedSource() { - BigQueryIO.Read.Bound bound = BigQueryIO.Read.fromQuery("foo_query"); - checkReadQueryObject(bound, "foo_query"); + BigQueryIO.Read read = BigQueryIO.Read.fromQuery("foo_query"); + checkReadQueryObject(read, "foo_query"); } @Test public void testBuildTableBasedSourceWithoutValidation() { // This test just checks that using withoutValidation will not trigger object // construction errors. - BigQueryIO.Read.Bound bound = + BigQueryIO.Read read = BigQueryIO.Read.from("foo.com:project:somedataset.sometable").withoutValidation(); - checkReadTableObjectWithValidate(bound, "foo.com:project", "somedataset", "sometable", false); + checkReadTableObjectWithValidate(read, "foo.com:project", "somedataset", "sometable", false); } @Test public void testBuildQueryBasedSourceWithoutValidation() { // This test just checks that using withoutValidation will not trigger object // construction errors. - BigQueryIO.Read.Bound bound = + BigQueryIO.Read read = BigQueryIO.Read.fromQuery("some_query").withoutValidation(); - checkReadQueryObjectWithValidate(bound, "some_query", false); + checkReadQueryObjectWithValidate(read, "some_query", false); } @Test public void testBuildTableBasedSourceWithDefaultProject() { - BigQueryIO.Read.Bound bound = + BigQueryIO.Read read = BigQueryIO.Read.from("somedataset.sometable"); - checkReadTableObject(bound, null, "somedataset", "sometable"); + checkReadTableObject(read, null, "somedataset", "sometable"); } @Test @@ -769,8 +769,8 @@ public class BigQueryIOTest implements Serializable { .setProjectId("foo.com:project") .setDatasetId("somedataset") .setTableId("sometable"); - BigQueryIO.Read.Bound bound = BigQueryIO.Read.from(table); - checkReadTableObject(bound, "foo.com:project", "somedataset", "sometable"); + BigQueryIO.Read read = BigQueryIO.Read.from(table); + checkReadTableObject(read, "foo.com:project", "somedataset", "sometable"); } @Test @@ -807,39 +807,6 @@ public class BigQueryIOTest implements Serializable { @Test @Category(NeedsRunner.class) - public void testBuildSourceWithoutTableQueryOrValidation() { - BigQueryOptions bqOptions = TestPipeline.testingPipelineOptions().as(BigQueryOptions.class); - bqOptions.setProject("defaultProject"); - bqOptions.setTempLocation("gs://testbucket/testdir"); - - Pipeline p = TestPipeline.create(bqOptions); - thrown.expect(IllegalStateException.class); - thrown.expectMessage( - "Invalid BigQueryIO.Read: one of table reference and query must be set"); - p.apply(BigQueryIO.Read.withoutValidation()); - p.run(); - } - - @Test - @Category(NeedsRunner.class) - public void testBuildSourceWithTableAndQuery() { - BigQueryOptions bqOptions = TestPipeline.testingPipelineOptions().as(BigQueryOptions.class); - bqOptions.setProject("defaultProject"); - bqOptions.setTempLocation("gs://testbucket/testdir"); - - Pipeline p = TestPipeline.create(bqOptions); - thrown.expect(IllegalStateException.class); - thrown.expectMessage( - "Invalid BigQueryIO.Read: table reference and query may not both be set"); - p.apply("ReadMyTable", - BigQueryIO.Read - .from("foo.com:project:somedataset.sometable") - .fromQuery("query")); - p.run(); - } - - @Test - @Category(NeedsRunner.class) public void testBuildSourceWithTableAndFlatten() { BigQueryOptions bqOptions = TestPipeline.testingPipelineOptions().as(BigQueryOptions.class); bqOptions.setProject("defaultProject"); @@ -1291,12 +1258,11 @@ public class BigQueryIOTest implements Serializable { } @Test - public void testBuildSourceDisplayData() { + public void testBuildSourceDisplayDataTable() { String tableSpec = "project:dataset.tableid"; - BigQueryIO.Read.Bound read = BigQueryIO.Read + BigQueryIO.Read read = BigQueryIO.Read .from(tableSpec) - .fromQuery("myQuery") .withoutResultFlattening() .usingStandardSql() .withoutValidation(); @@ -1304,6 +1270,21 @@ public class BigQueryIOTest implements Serializable { DisplayData displayData = DisplayData.from(read); assertThat(displayData, hasDisplayItem("table", tableSpec)); + assertThat(displayData, hasDisplayItem("flattenResults", false)); + assertThat(displayData, hasDisplayItem("useLegacySql", false)); + assertThat(displayData, hasDisplayItem("validation", false)); + } + + @Test + public void testBuildSourceDisplayDataQuery() { + BigQueryIO.Read read = BigQueryIO.Read + .fromQuery("myQuery") + .withoutResultFlattening() + .usingStandardSql() + .withoutValidation(); + + DisplayData displayData = DisplayData.from(read); + assertThat(displayData, hasDisplayItem("query", "myQuery")); assertThat(displayData, hasDisplayItem("flattenResults", false)); assertThat(displayData, hasDisplayItem("useLegacySql", false)); @@ -1315,7 +1296,7 @@ public class BigQueryIOTest implements Serializable { @Ignore("[BEAM-436] DirectRunner RunnableOnService tempLocation configuration insufficient") public void testTableSourcePrimitiveDisplayData() throws IOException, InterruptedException { DisplayDataEvaluator evaluator = DisplayDataEvaluator.create(); - BigQueryIO.Read.Bound read = BigQueryIO.Read + BigQueryIO.Read read = BigQueryIO.Read .from("project:dataset.tableId") .withTestServices(new FakeBigQueryServices() .withDatasetService(mockDatasetService) @@ -1332,7 +1313,7 @@ public class BigQueryIOTest implements Serializable { @Ignore("[BEAM-436] DirectRunner RunnableOnService tempLocation configuration insufficient") public void testQuerySourcePrimitiveDisplayData() throws IOException, InterruptedException { DisplayDataEvaluator evaluator = DisplayDataEvaluator.create(); - BigQueryIO.Read.Bound read = BigQueryIO.Read + BigQueryIO.Read read = BigQueryIO.Read .fromQuery("foobar") .withTestServices(new FakeBigQueryServices() .withDatasetService(mockDatasetService) @@ -2375,7 +2356,7 @@ public class BigQueryIOTest implements Serializable { BigQueryOptions bqOptions = options.as(BigQueryOptions.class); bqOptions.setTempLocation("gs://testbucket/testdir"); Pipeline pipeline = TestPipeline.create(options); - BigQueryIO.Read.Bound read = BigQueryIO.Read.from( + BigQueryIO.Read read = BigQueryIO.Read.from( options.getInputTable()).withoutValidation(); pipeline.apply(read); // Test that this doesn't throw. @@ -2388,7 +2369,7 @@ public class BigQueryIOTest implements Serializable { BigQueryOptions bqOptions = options.as(BigQueryOptions.class); bqOptions.setTempLocation("gs://testbucket/testdir"); Pipeline pipeline = TestPipeline.create(options); - BigQueryIO.Read.Bound read = BigQueryIO.Read.fromQuery( + BigQueryIO.Read read = BigQueryIO.Read.fromQuery( options.getInputQuery()).withoutValidation(); pipeline.apply(read); // Test that this doesn't throw. @@ -2497,10 +2478,10 @@ public class BigQueryIOTest implements Serializable { BigQueryOptions bqOptions = options.as(BigQueryOptions.class); Pipeline pipeline = TestPipeline.create(options); bqOptions.setTempLocation("gs://testbucket/testdir"); - BigQueryIO.Read.Bound read1 = BigQueryIO.Read.fromQuery( + BigQueryIO.Read read1 = BigQueryIO.Read.fromQuery( options.getInputQuery()).withoutValidation(); pipeline.apply(read1); - BigQueryIO.Read.Bound read2 = BigQueryIO.Read.fromQuery( + BigQueryIO.Read read2 = BigQueryIO.Read.fromQuery( options.getInputQuery()).withoutValidation(); pipeline.apply(read2); assertNotEquals(read1.stepUuid, read2.stepUuid);
