Use AutoValue for BigQueryIO.Read
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/32cba321 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/32cba321 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/32cba321 Branch: refs/heads/master Commit: 32cba321c04c5e3fb18856c84ea10b3513264dd5 Parents: d6b3e11 Author: Eugene Kirpichov <[email protected]> Authored: Thu Mar 2 17:51:04 2017 -0800 Committer: Thomas Groh <[email protected]> Committed: Tue Mar 14 15:54:27 2017 -0700 ---------------------------------------------------------------------- .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 215 +++++++------------ .../sdk/io/gcp/bigquery/BigQueryIOTest.java | 21 +- 2 files changed, 84 insertions(+), 152 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/32cba321/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index 90d7f67..e5db60e 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -36,9 +36,11 @@ import com.google.api.services.bigquery.model.Table; import com.google.api.services.bigquery.model.TableReference; import com.google.api.services.bigquery.model.TableRow; import com.google.api.services.bigquery.model.TableSchema; +import com.google.auto.value.AutoValue; import com.google.cloud.hadoop.util.ApiErrorExtractor; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; +import com.google.common.base.MoreObjects; import com.google.common.base.Strings; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; @@ -453,97 +455,86 @@ public class BigQueryIO { * } * }}</pre> */ - public static class Read extends PTransform<PBegin, PCollection<TableRow>> { - @Nullable final ValueProvider<String> jsonTableRef; - @Nullable final ValueProvider<String> query; + @AutoValue + public abstract static class Read extends PTransform<PBegin, PCollection<TableRow>> { + @Nullable abstract ValueProvider<String> getJsonTableRef(); + @Nullable abstract ValueProvider<String> getQuery(); + abstract boolean getValidate(); + @Nullable abstract Boolean getFlattenResults(); + @Nullable abstract Boolean getUseLegacySql(); + @Nullable abstract BigQueryServices getBigQueryServices(); + + abstract Builder toBuilder(); + + @AutoValue.Builder + abstract static class Builder { + abstract Builder setJsonTableRef(ValueProvider<String> jsonTableRef); + abstract Builder setQuery(ValueProvider<String> query); + abstract Builder setValidate(boolean validate); + abstract Builder setFlattenResults(@Nullable Boolean flattenResults); + abstract Builder setUseLegacySql(@Nullable Boolean useLegacySql); + abstract Builder setBigQueryServices(@Nullable BigQueryServices bigQueryServices); + + abstract Read build(); + } + + private static Builder builder() { + return new AutoValue_BigQueryIO_Read.Builder() + .setValidate(true) + .setBigQueryServices(new BigQueryServicesImpl()); + } /** * Reads a BigQuery table specified as {@code "[project_id]:[dataset_id].[table_id]"} or * {@code "[dataset_id].[table_id]"} for tables within the current project. */ public static Read from(String tableSpec) { - return new Read().from(StaticValueProvider.of(tableSpec)); + return from(StaticValueProvider.of(tableSpec)); } /** * Same as {@code from(String)}, but with a {@link ValueProvider}. */ public static Read from(ValueProvider<String> tableSpec) { - return new Read().from(tableSpec); + return builder() + .setJsonTableRef( + NestedValueProvider.of( + NestedValueProvider.of(tableSpec, new TableSpecToTableRef()), + new TableRefToJson())).build(); } /** * Reads results received after executing the given query. */ public static Read fromQuery(String query) { - return new Read().fromQuery(StaticValueProvider.of(query)); + return fromQuery(StaticValueProvider.of(query)); } /** * Same as {@code from(String)}, but with a {@link ValueProvider}. */ public static Read fromQuery(ValueProvider<String> query) { - return new Read().fromQuery(query); + return builder().setQuery(query).setFlattenResults(true).setUseLegacySql(true).build(); } /** * Reads a BigQuery table specified as a {@link TableReference} object. */ public static Read from(TableReference table) { - return new Read().from(table); + return from(StaticValueProvider.of(toTableSpec(table))); } - /** - * Disable validation that the table exists or the query succeeds prior to pipeline - * submission. Basic validation (such as ensuring that a query or table is specified) still - * occurs. - */ - final boolean validate; - @Nullable final Boolean flattenResults; - @Nullable final Boolean useLegacySql; - @Nullable BigQueryServices bigQueryServices; - - @VisibleForTesting @Nullable String stepUuid; - @VisibleForTesting @Nullable ValueProvider<String> jobUuid; - private static final String QUERY_VALIDATION_FAILURE_ERROR = "Validation of query \"%1$s\" failed. If the query depends on an earlier stage of the" + " pipeline, This validation can be disabled using #withoutValidation."; - private Read() { - this( - null /* name */, - null /* query */, - null /* jsonTableRef */, - true /* validate */, - null /* flattenResults */, - null /* useLegacySql */, - null /* bigQueryServices */); - } - - private Read( - String name, @Nullable ValueProvider<String> query, - @Nullable ValueProvider<String> jsonTableRef, boolean validate, - @Nullable Boolean flattenResults, @Nullable Boolean useLegacySql, - @Nullable BigQueryServices bigQueryServices) { - super(name); - this.jsonTableRef = jsonTableRef; - this.query = query; - this.validate = validate; - this.flattenResults = flattenResults; - this.useLegacySql = useLegacySql; - this.bigQueryServices = bigQueryServices; - } - /** * Disable validation that the table exists or the query succeeds prior to pipeline * submission. Basic validation (such as ensuring that a query or table is specified) still * occurs. */ public Read withoutValidation() { - return new Read( - name, query, jsonTableRef, false /* validate */, flattenResults, useLegacySql, - bigQueryServices); + return toBuilder().setValidate(false).build(); } /** @@ -554,9 +545,7 @@ public class BigQueryIO { * from a table will cause an error during validation. */ public Read withoutResultFlattening() { - return new Read( - name, query, jsonTableRef, validate, false /* flattenResults */, useLegacySql, - bigQueryServices); + return toBuilder().setFlattenResults(false).build(); } /** @@ -566,15 +555,12 @@ public class BigQueryIO { * from a table will cause an error during validation. */ public Read usingStandardSql() { - return new Read( - name, query, jsonTableRef, validate, flattenResults, false /* useLegacySql */, - bigQueryServices); + return toBuilder().setUseLegacySql(false).build(); } @VisibleForTesting Read withTestServices(BigQueryServices testServices) { - return new Read( - name, query, jsonTableRef, validate, flattenResults, useLegacySql, testServices); + return toBuilder().setBigQueryServices(testServices).build(); } @Override @@ -587,7 +573,7 @@ public class BigQueryIO { checkArgument( !Strings.isNullOrEmpty(tempLocation), "BigQueryIO.Read needs a GCS temp location to store temp files."); - if (bigQueryServices == null) { + if (getBigQueryServices() == null) { try { GcsPath.fromUri(tempLocation); } catch (IllegalArgumentException e) { @@ -602,63 +588,65 @@ public class BigQueryIO { ValueProvider<TableReference> table = getTableWithDefaultProject(bqOptions); checkState( - table == null || query == null, + table == null || getQuery() == null, "Invalid BigQueryIO.Read: table reference and query may not both be set"); checkState( - table != null || query != null, + table != null || getQuery() != null, "Invalid BigQueryIO.Read: one of table reference and query must be set"); if (table != null) { checkState( - flattenResults == null, + getFlattenResults() == null, "Invalid BigQueryIO.Read: Specifies a table with a result flattening" + " preference, which only applies to queries"); checkState( - useLegacySql == null, + getUseLegacySql() == null, "Invalid BigQueryIO.Read: Specifies a table with a SQL dialect" + " preference, which only applies to queries"); } else /* query != null */ { - checkState(flattenResults != null, "flattenResults should not be null if query is set"); - checkState(useLegacySql != null, "useLegacySql should not be null if query is set"); + checkState( + getFlattenResults() != null, "flattenResults should not be null if query is set"); + checkState(getUseLegacySql() != null, "useLegacySql should not be null if query is set"); } // Note that a table or query check can fail if the table or dataset are created by // earlier stages of the pipeline or if a query depends on earlier stages of a pipeline. // For these cases the withoutValidation method can be used to disable the check. - if (validate && table != null) { + if (getValidate() && table != null) { checkState(table.isAccessible(), "Cannot call validate if table is dynamically set."); // Check for source table presence for early failure notification. DatasetService datasetService = getBigQueryServices().getDatasetService(bqOptions); verifyDatasetPresence(datasetService, table.get()); verifyTablePresence(datasetService, table.get()); - } else if (validate && query != null) { - checkState(query.isAccessible(), "Cannot call validate if query is dynamically set."); + } else if (getValidate() && getQuery() != null) { + checkState(getQuery().isAccessible(), "Cannot call validate if query is dynamically set."); JobService jobService = getBigQueryServices().getJobService(bqOptions); try { jobService.dryRunQuery( bqOptions.getProject(), new JobConfigurationQuery() - .setQuery(query.get()) - .setFlattenResults(flattenResults) - .setUseLegacySql(useLegacySql)); + .setQuery(getQuery().get()) + .setFlattenResults(getFlattenResults()) + .setUseLegacySql(getUseLegacySql())); } catch (Exception e) { throw new IllegalArgumentException( - String.format(QUERY_VALIDATION_FAILURE_ERROR, query.get()), e); + String.format(QUERY_VALIDATION_FAILURE_ERROR, getQuery().get()), e); } } } @Override public PCollection<TableRow> expand(PBegin input) { - stepUuid = randomUUIDString(); + String stepUuid = randomUUIDString(); BigQueryOptions bqOptions = input.getPipeline().getOptions().as(BigQueryOptions.class); - jobUuid = NestedValueProvider.of( + ValueProvider<String> jobUuid = NestedValueProvider.of( StaticValueProvider.of(bqOptions.getJobName()), new CreatePerBeamJobUuid(stepUuid)); final ValueProvider<String> jobIdToken = NestedValueProvider.of( jobUuid, new BeamJobUuidToBigQueryJobUuid()); BoundedSource<TableRow> source; - final BigQueryServices bqServices = getBigQueryServices(); + final BigQueryServices bqServices = + MoreObjects.firstNonNull(getBigQueryServices(), new BigQueryServicesImpl()); final String extractDestinationDir; String tempLocation = bqOptions.getTempLocation(); @@ -671,11 +659,18 @@ public class BigQueryIO { } final String executingProject = bqOptions.getProject(); - if (query != null && (!query.isAccessible() || !Strings.isNullOrEmpty(query.get()))) { - source = BigQueryQuerySource.create( - jobIdToken, query, NestedValueProvider.of( - jobUuid, new CreateJsonTableRefFromUuid(executingProject)), - flattenResults, useLegacySql, extractDestinationDir, bqServices); + if (getQuery() != null + && (!getQuery().isAccessible() || !Strings.isNullOrEmpty(getQuery().get()))) { + source = + BigQueryQuerySource.create( + jobIdToken, + getQuery(), + NestedValueProvider.of( + jobUuid, new CreateJsonTableRefFromUuid(executingProject)), + getFlattenResults(), + getUseLegacySql(), + extractDestinationDir, + bqServices); } else { ValueProvider<TableReference> inputTable = getTableWithDefaultProject(bqOptions); source = BigQueryTableSource.create( @@ -726,13 +721,13 @@ public class BigQueryIO { builder .addIfNotNull(DisplayData.item("table", displayTable(getTableProvider())) .withLabel("Table")) - .addIfNotNull(DisplayData.item("query", query) + .addIfNotNull(DisplayData.item("query", getQuery()) .withLabel("Query")) - .addIfNotNull(DisplayData.item("flattenResults", flattenResults) + .addIfNotNull(DisplayData.item("flattenResults", getFlattenResults()) .withLabel("Flatten Query Results")) - .addIfNotNull(DisplayData.item("useLegacySql", useLegacySql) + .addIfNotNull(DisplayData.item("useLegacySql", getUseLegacySql()) .withLabel("Use Legacy SQL Dialect")) - .addIfNotDefault(DisplayData.item("validation", validate) + .addIfNotDefault(DisplayData.item("validation", getValidate()) .withLabel("Validation Enabled"), true); } @@ -769,8 +764,8 @@ public class BigQueryIO { */ @Nullable public ValueProvider<TableReference> getTableProvider() { - return jsonTableRef == null - ? null : NestedValueProvider.of(jsonTableRef, new JsonTableRefToTableRef()); + return getJsonTableRef() == null + ? null : NestedValueProvider.of(getJsonTableRef(), new JsonTableRefToTableRef()); } /** * Returns the table to read, or {@code null} if reading from a query instead. @@ -780,52 +775,6 @@ public class BigQueryIO { ValueProvider<TableReference> provider = getTableProvider(); return provider == null ? null : provider.get(); } - - /** - * Returns the query to be read, or {@code null} if reading from a table instead. - */ - @Nullable - public String getQuery() { - return query == null ? null : query.get(); - } - - /** - * Returns the query to be read, or {@code null} if reading from a table instead. - */ - @Nullable - public ValueProvider<String> getQueryProvider() { - return query; - } - - /** - * Returns true if table validation is enabled. - */ - public boolean getValidate() { - return validate; - } - - /** - * Returns true/false if result flattening is enabled/disabled, or null if not applicable. - */ - public Boolean getFlattenResults() { - return flattenResults; - } - - /** - * Returns true (false) if the query will (will not) use BigQuery's legacy SQL mode, or null - * if not applicable. - */ - @Nullable - public Boolean getUseLegacySql() { - return useLegacySql; - } - - private BigQueryServices getBigQueryServices() { - if (bigQueryServices == null) { - bigQueryServices = new BigQueryServicesImpl(); - } - return bigQueryServices; - } } /** @@ -1863,7 +1812,7 @@ public class BigQueryIO { ValueProvider<TableReference> table = getTableWithDefaultProject(options); - String stepUuid = randomUUIDString(); + final String stepUuid = randomUUIDString(); String tempLocation = options.getTempLocation(); String tempFilePrefix; @@ -1886,7 +1835,7 @@ public class BigQueryIO { new SimpleFunction<String, String>() { @Override public String apply(String input) { - return randomUUIDString(); + return stepUuid; } })) .apply(View.<String>asSingleton()); http://git-wip-us.apache.org/repos/asf/beam/blob/32cba321/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java index f403c5a..fdaa81c 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java @@ -26,7 +26,6 @@ import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisp import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.hasItem; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThat; @@ -678,14 +677,14 @@ public class BigQueryIOTest implements Serializable { assertEquals(project, read.getTable().getProjectId()); assertEquals(dataset, read.getTable().getDatasetId()); assertEquals(table, read.getTable().getTableId()); - assertNull(read.query); + assertNull(read.getQuery()); assertEquals(validate, read.getValidate()); } private void checkReadQueryObjectWithValidate( BigQueryIO.Read read, String query, boolean validate) { assertNull(read.getTable()); - assertEquals(query, read.getQuery()); + assertEquals(query, read.getQuery().get()); assertEquals(validate, read.getValidate()); } @@ -2433,20 +2432,4 @@ public class BigQueryIOTest implements Serializable { BigQueryIO.TableRowInfoCoder.of()), IntervalWindow.getCoder())); } - - @Test - public void testUniqueStepIdRead() { - RuntimeTestOptions options = PipelineOptionsFactory.as(RuntimeTestOptions.class); - BigQueryOptions bqOptions = options.as(BigQueryOptions.class); - Pipeline pipeline = TestPipeline.create(options); - bqOptions.setTempLocation("gs://testbucket/testdir"); - BigQueryIO.Read read1 = BigQueryIO.Read.fromQuery( - options.getInputQuery()).withoutValidation(); - pipeline.apply(read1); - BigQueryIO.Read read2 = BigQueryIO.Read.fromQuery( - options.getInputQuery()).withoutValidation(); - pipeline.apply(read2); - assertNotEquals(read1.stepUuid, read2.stepUuid); - assertNotEquals(read1.jobUuid.get(), read2.jobUuid.get()); - } }
