Use AutoValue for BigQueryIO.Write
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5c715896 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5c715896 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5c715896 Branch: refs/heads/master Commit: 5c715896e683f7bec9f150ea17c78d1dae00ee4c Parents: 32cba32 Author: Eugene Kirpichov <[email protected]> Authored: Thu Mar 2 18:14:39 2017 -0800 Committer: Thomas Groh <[email protected]> Committed: Tue Mar 14 15:54:30 2017 -0700 ---------------------------------------------------------------------- .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 314 ++++++------------- .../sdk/io/gcp/bigquery/BigQueryIOTest.java | 8 +- 2 files changed, 108 insertions(+), 214 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/5c715896/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index e5db60e..d2f6ba6 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -471,9 +471,9 @@ public class BigQueryIO { abstract Builder setJsonTableRef(ValueProvider<String> jsonTableRef); abstract Builder setQuery(ValueProvider<String> query); abstract Builder setValidate(boolean validate); - abstract Builder setFlattenResults(@Nullable Boolean flattenResults); - abstract Builder setUseLegacySql(@Nullable Boolean useLegacySql); - abstract Builder setBigQueryServices(@Nullable BigQueryServices bigQueryServices); + abstract Builder setFlattenResults(Boolean flattenResults); + abstract Builder setUseLegacySql(Boolean useLegacySql); + abstract Builder setBigQueryServices(BigQueryServices bigQueryServices); abstract Read build(); } @@ -1364,10 +1364,13 @@ public class BigQueryIO { * } * }}</pre> */ - public static class Write extends PTransform<PCollection<TableRow>, PDone> { + @AutoValue + public abstract static class Write extends PTransform<PCollection<TableRow>, PDone> { + @VisibleForTesting // Maximum number of files in a single partition. static final int MAX_NUM_FILES = 10000; + @VisibleForTesting // Maximum number of bytes in a single partition -- 11 TiB just under BQ's 12 TiB limit. static final long MAX_SIZE_BYTES = 11 * (1L << 40); @@ -1378,28 +1381,41 @@ public class BigQueryIO { // It sets to {@code Integer.MAX_VALUE} to block until the BigQuery job finishes. private static final int LOAD_JOB_POLL_MAX_RETRIES = Integer.MAX_VALUE; - @Nullable private final ValueProvider<String> jsonTableRef; - - @Nullable private final SerializableFunction<BoundedWindow, TableReference> tableRefFunction; - - // Table schema. The schema is required only if the table does not exist. - @Nullable private final ValueProvider<String> jsonSchema; - - // Options for creating the table. Valid values are CREATE_IF_NEEDED and - // CREATE_NEVER. - final CreateDisposition createDisposition; + @Nullable abstract ValueProvider<String> getJsonTableRef(); + @Nullable abstract SerializableFunction<BoundedWindow, TableReference> getTableRefFunction(); + /** Table schema. The schema is required only if the table does not exist. */ + @Nullable abstract ValueProvider<String> getJsonSchema(); + abstract CreateDisposition getCreateDisposition(); + abstract WriteDisposition getWriteDisposition(); + @Nullable abstract String getTableDescription(); + /** An option to indicate if table validation is desired. Default is true. */ + abstract boolean getValidate(); + @Nullable abstract BigQueryServices getBigQueryServices(); - // Options for writing to the table. Valid values are WRITE_TRUNCATE, - // WRITE_APPEND and WRITE_EMPTY. - final WriteDisposition writeDisposition; + abstract Builder toBuilder(); - @Nullable - final String tableDescription; + @AutoValue.Builder + abstract static class Builder { + abstract Builder setJsonTableRef(ValueProvider<String> jsonTableRef); + abstract Builder setTableRefFunction( + SerializableFunction<BoundedWindow, TableReference> tableRefFunction); + abstract Builder setJsonSchema(ValueProvider<String> jsonSchema); + abstract Builder setCreateDisposition(CreateDisposition createDisposition); + abstract Builder setWriteDisposition(WriteDisposition writeDisposition); + abstract Builder setTableDescription(String tableDescription); + abstract Builder setValidate(boolean validate); + abstract Builder setBigQueryServices(BigQueryServices bigQueryServices); - // An option to indicate if table validation is desired. Default is true. - final boolean validate; + abstract Write build(); + } - @Nullable private BigQueryServices bigQueryServices; + private static Builder builder() { + return new AutoValue_BigQueryIO_Write.Builder() + .setValidate(true) + .setBigQueryServices(new BigQueryServicesImpl()) + .setCreateDisposition(CreateDisposition.CREATE_IF_NEEDED) + .setWriteDisposition(WriteDisposition.WRITE_EMPTY); + } /** * An enumeration type for the BigQuery create disposition strings. @@ -1474,17 +1490,22 @@ public class BigQueryIO { * <p>Refer to {@link #parseTableSpec(String)} for the specification format. */ public static Write to(String tableSpec) { - return new Write().withTableSpec(tableSpec); + return to(StaticValueProvider.of(tableSpec)); } /** Creates a write transformation for the given table. */ - public static Write to(ValueProvider<String> tableSpec) { - return new Write().withTableSpec(tableSpec); + public static Write to(TableReference table) { + return to(StaticValueProvider.of(toTableSpec(table))); } /** Creates a write transformation for the given table. */ - public static Write to(TableReference table) { - return new Write().withTableRef(table); + public static Write to(ValueProvider<String> tableSpec) { + return builder() + .setJsonTableRef( + NestedValueProvider.of( + NestedValueProvider.of(tableSpec, new TableSpecToTableRef()), + new TableRefToJson())) + .build(); } /** @@ -1499,7 +1520,7 @@ public class BigQueryIO { * always return the same table specification. */ public static Write to(SerializableFunction<BoundedWindow, String> tableSpecFunction) { - return new Write().withTableSpec(tableSpecFunction); + return toTableReference(new TranslateTableSpecFunction(tableSpecFunction)); } /** @@ -1511,7 +1532,7 @@ public class BigQueryIO { */ private static Write toTableReference( SerializableFunction<BoundedWindow, TableReference> tableRefFunction) { - return new Write().withTableRef(tableRefFunction); + return builder().setTableRefFunction(tableRefFunction).build(); } private static class TranslateTableSpecFunction implements @@ -1528,109 +1549,6 @@ public class BigQueryIO { } } - private Write() { - this( - null /* name */, - null /* jsonTableRef */, - null /* tableRefFunction */, - null /* jsonSchema */, - CreateDisposition.CREATE_IF_NEEDED, - WriteDisposition.WRITE_EMPTY, - null /* tableDescription */, - true /* validate */, - null /* bigQueryServices */); - } - - private Write(String name, @Nullable ValueProvider<String> jsonTableRef, - @Nullable SerializableFunction<BoundedWindow, TableReference> tableRefFunction, - @Nullable ValueProvider<String> jsonSchema, - CreateDisposition createDisposition, - WriteDisposition writeDisposition, - @Nullable String tableDescription, - boolean validate, - @Nullable BigQueryServices bigQueryServices) { - super(name); - this.jsonTableRef = jsonTableRef; - this.tableRefFunction = tableRefFunction; - this.jsonSchema = jsonSchema; - this.createDisposition = checkNotNull(createDisposition, "createDisposition"); - this.writeDisposition = checkNotNull(writeDisposition, "writeDisposition"); - this.tableDescription = tableDescription; - this.validate = validate; - this.bigQueryServices = bigQueryServices; - } - - /** - * Returns a copy of this write transformation, but writing to the specified table. Refer to - * {@link #parseTableSpec(String)} for the specification format. - * - * <p>Does not modify this object. - */ - private Write withTableSpec(String tableSpec) { - return withTableRef(NestedValueProvider.of( - StaticValueProvider.of(tableSpec), new TableSpecToTableRef())); - } - - /** - * Returns a copy of this write transformation, but writing to the specified table. - * - * <p>Does not modify this object. - */ - public Write withTableRef(TableReference table) { - return withTableSpec(StaticValueProvider.of(toTableSpec(table))); - } - - /** - * Returns a copy of this write transformation, but writing to the specified table. Refer to - * {@link #parseTableSpec(String)} for the specification format. - * - * <p>Does not modify this object. - */ - public Write withTableSpec(ValueProvider<String> tableSpec) { - return withTableRef(NestedValueProvider.of(tableSpec, new TableSpecToTableRef())); - } - - /** - * Returns a copy of this write transformation, but writing to the specified table. - * - * <p>Does not modify this object. - */ - private Write withTableRef(ValueProvider<TableReference> table) { - return new Write(name, - NestedValueProvider.of(table, new TableRefToJson()), - tableRefFunction, jsonSchema, createDisposition, - writeDisposition, tableDescription, validate, bigQueryServices); - } - - /** - * Returns a copy of this write transformation, but using the specified function to determine - * which table to write to for each window. - * - * <p>Does not modify this object. - * - * <p>{@code tableSpecFunction} should be deterministic. When given the same window, it - * should always return the same table specification. - */ - private Write withTableSpec( - SerializableFunction<BoundedWindow, String> tableSpecFunction) { - return toTableReference(new TranslateTableSpecFunction(tableSpecFunction)); - } - - /** - * Returns a copy of this write transformation, but using the specified function to determine - * which table to write to for each window. - * - * <p>Does not modify this object. - * - * <p>{@code tableRefFunction} should be deterministic. When given the same window, it should - * always return the same table reference. - */ - private Write withTableRef( - SerializableFunction<BoundedWindow, TableReference> tableRefFunction) { - return new Write(name, jsonTableRef, tableRefFunction, jsonSchema, createDisposition, - writeDisposition, tableDescription, validate, bigQueryServices); - } - /** * Returns a copy of this write transformation, but using the specified schema for rows * to be written. @@ -1638,18 +1556,18 @@ public class BigQueryIO { * <p>Does not modify this object. */ public Write withSchema(TableSchema schema) { - return new Write(name, jsonTableRef, tableRefFunction, - StaticValueProvider.of(toJsonString(schema)), - createDisposition, writeDisposition, tableDescription, validate, bigQueryServices); + return toBuilder() + .setJsonSchema(StaticValueProvider.of(toJsonString(schema))) + .build(); } /** * Like {@link #withSchema(TableSchema)}, but with a {@link ValueProvider}. */ public Write withSchema(ValueProvider<TableSchema> schema) { - return new Write(name, jsonTableRef, tableRefFunction, - NestedValueProvider.of(schema, new TableSchemaToJsonSchema()), - createDisposition, writeDisposition, tableDescription, validate, bigQueryServices); + return toBuilder() + .setJsonSchema(NestedValueProvider.of(schema, new TableSchemaToJsonSchema())) + .build(); } /** @@ -1658,8 +1576,7 @@ public class BigQueryIO { * <p>Does not modify this object. */ public Write withCreateDisposition(CreateDisposition createDisposition) { - return new Write(name, jsonTableRef, tableRefFunction, jsonSchema, - createDisposition, writeDisposition, tableDescription, validate, bigQueryServices); + return toBuilder().setCreateDisposition(createDisposition).build(); } /** @@ -1668,8 +1585,7 @@ public class BigQueryIO { * <p>Does not modify this object. */ public Write withWriteDisposition(WriteDisposition writeDisposition) { - return new Write(name, jsonTableRef, tableRefFunction, jsonSchema, - createDisposition, writeDisposition, tableDescription, validate, bigQueryServices); + return toBuilder().setWriteDisposition(writeDisposition).build(); } /** @@ -1677,9 +1593,8 @@ public class BigQueryIO { * * <p>Does not modify this object. */ - public Write withTableDescription(@Nullable String tableDescription) { - return new Write(name, jsonTableRef, tableRefFunction, jsonSchema, - createDisposition, writeDisposition, tableDescription, validate, bigQueryServices); + public Write withTableDescription(String tableDescription) { + return toBuilder().setTableDescription(tableDescription).build(); } /** @@ -1688,14 +1603,12 @@ public class BigQueryIO { * <p>Does not modify this object. */ public Write withoutValidation() { - return new Write(name, jsonTableRef, tableRefFunction, jsonSchema, createDisposition, - writeDisposition, tableDescription, false, bigQueryServices); + return toBuilder().setValidate(false).build(); } @VisibleForTesting Write withTestServices(BigQueryServices testServices) { - return new Write(name, jsonTableRef, tableRefFunction, jsonSchema, createDisposition, - writeDisposition, tableDescription, validate, testServices); + return toBuilder().setBigQueryServices(testServices).build(); } private static void verifyTableNotExistOrEmpty( @@ -1724,23 +1637,25 @@ public class BigQueryIO { // Exactly one of the table and table reference can be configured. checkState( - jsonTableRef != null || tableRefFunction != null, + getJsonTableRef() != null || getTableRefFunction() != null, "must set the table reference of a BigQueryIO.Write transform"); checkState( - jsonTableRef == null || tableRefFunction == null, + getJsonTableRef() == null || getTableRefFunction() == null, "Cannot set both a table reference and a table function for a BigQueryIO.Write" + " transform"); // Require a schema if creating one or more tables. checkArgument( - createDisposition != CreateDisposition.CREATE_IF_NEEDED || jsonSchema != null, + getCreateDisposition() != CreateDisposition.CREATE_IF_NEEDED || getJsonSchema() != null, "CreateDisposition is CREATE_IF_NEEDED, however no schema was provided."); // The user specified a table. - if (jsonTableRef != null && validate) { + BigQueryServices bqServices = + MoreObjects.firstNonNull(getBigQueryServices(), new BigQueryServicesImpl()); + if (getJsonTableRef() != null && getValidate()) { TableReference table = getTableWithDefaultProject(options).get(); - DatasetService datasetService = getBigQueryServices().getDatasetService(options); + DatasetService datasetService = bqServices.getDatasetService(options); // Check for destination table presence and emptiness for early failure notification. // Note that a presence check can fail when the table or dataset is created by an earlier // stage of the pipeline. For these cases the #withoutValidation method can be used to @@ -1754,22 +1669,22 @@ public class BigQueryIO { } } - if (input.isBounded() == PCollection.IsBounded.UNBOUNDED || tableRefFunction != null) { + if (input.isBounded() == PCollection.IsBounded.UNBOUNDED || getTableRefFunction() != null) { // We will use BigQuery's streaming write API -- validate supported dispositions. - if (tableRefFunction != null) { + if (getTableRefFunction() != null) { checkArgument( - createDisposition != CreateDisposition.CREATE_NEVER, + getCreateDisposition() != CreateDisposition.CREATE_NEVER, "CreateDisposition.CREATE_NEVER is not supported when using a tablespec" + " function."); } - if (jsonSchema == null) { + if (getJsonSchema() == null) { checkArgument( - createDisposition == CreateDisposition.CREATE_NEVER, + getCreateDisposition() == CreateDisposition.CREATE_NEVER, "CreateDisposition.CREATE_NEVER must be used if jsonSchema is null."); } checkArgument( - writeDisposition != WriteDisposition.WRITE_TRUNCATE, + getWriteDisposition() != WriteDisposition.WRITE_TRUNCATE, "WriteDisposition.WRITE_TRUNCATE is not supported for an unbounded PCollection or" + " when using a tablespec function."); } else { @@ -1778,7 +1693,7 @@ public class BigQueryIO { checkArgument( !Strings.isNullOrEmpty(tempLocation), "BigQueryIO.Write needs a GCS temp location to store temp files."); - if (bigQueryServices == null) { + if (bqServices == null) { try { GcsPath.fromUri(tempLocation); } catch (IllegalArgumentException e) { @@ -1796,17 +1711,18 @@ public class BigQueryIO { public PDone expand(PCollection<TableRow> input) { Pipeline p = input.getPipeline(); BigQueryOptions options = p.getOptions().as(BigQueryOptions.class); - BigQueryServices bqServices = getBigQueryServices(); + BigQueryServices bqServices = + MoreObjects.firstNonNull(getBigQueryServices(), new BigQueryServicesImpl()); // When writing an Unbounded PCollection, or when a tablespec function is defined, we use // StreamWithDeDup and BigQuery's streaming import API. - if (input.isBounded() == IsBounded.UNBOUNDED || tableRefFunction != null) { + if (input.isBounded() == IsBounded.UNBOUNDED || getTableRefFunction() != null) { return input.apply( - new StreamWithDeDup(getTable(), tableRefFunction, - jsonSchema == null ? null : NestedValueProvider.of( - jsonSchema, new JsonSchemaToTableSchema()), - createDisposition, - tableDescription, + new StreamWithDeDup(getTable(), getTableRefFunction(), + getJsonSchema() == null ? null : NestedValueProvider.of( + getJsonSchema(), new JsonSchemaToTableSchema()), + getCreateDisposition(), + getTableDescription(), bqServices)); } @@ -1874,10 +1790,10 @@ public class BigQueryIO { jobIdTokenView, tempFilePrefix, NestedValueProvider.of(table, new TableRefToJson()), - jsonSchema, + getJsonSchema(), WriteDisposition.WRITE_EMPTY, CreateDisposition.CREATE_IF_NEEDED, - tableDescription)) + getTableDescription())) .withSideInputs(jobIdTokenView)); PCollectionView<Iterable<String>> tempTablesView = tempTables @@ -1887,10 +1803,10 @@ public class BigQueryIO { bqServices, jobIdTokenView, NestedValueProvider.of(table, new TableRefToJson()), - writeDisposition, - createDisposition, + getWriteDisposition(), + getCreateDisposition(), tempTablesView, - tableDescription)) + getTableDescription())) .withSideInputs(tempTablesView, jobIdTokenView)); // Write single partition to final table @@ -1902,10 +1818,10 @@ public class BigQueryIO { jobIdTokenView, tempFilePrefix, NestedValueProvider.of(table, new TableRefToJson()), - jsonSchema, - writeDisposition, - createDisposition, - tableDescription)) + getJsonSchema(), + getWriteDisposition(), + getCreateDisposition(), + getTableDescription())) .withSideInputs(jobIdTokenView)); return PDone.in(input.getPipeline()); @@ -1969,41 +1885,31 @@ public class BigQueryIO { super.populateDisplayData(builder); builder - .addIfNotNull(DisplayData.item("table", jsonTableRef) + .addIfNotNull(DisplayData.item("table", getJsonTableRef()) .withLabel("Table Reference")) - .addIfNotNull(DisplayData.item("schema", jsonSchema) + .addIfNotNull(DisplayData.item("schema", getJsonSchema()) .withLabel("Table Schema")); - if (tableRefFunction != null) { - builder.add(DisplayData.item("tableFn", tableRefFunction.getClass()) + if (getTableRefFunction() != null) { + builder.add(DisplayData.item("tableFn", getTableRefFunction().getClass()) .withLabel("Table Reference Function")); } builder - .add(DisplayData.item("createDisposition", createDisposition.toString()) + .add(DisplayData.item("createDisposition", getCreateDisposition().toString()) .withLabel("Table CreateDisposition")) - .add(DisplayData.item("writeDisposition", writeDisposition.toString()) + .add(DisplayData.item("writeDisposition", getWriteDisposition().toString()) .withLabel("Table WriteDisposition")) - .addIfNotDefault(DisplayData.item("validation", validate) + .addIfNotDefault(DisplayData.item("validation", getValidate()) .withLabel("Validation Enabled"), true) - .addIfNotNull(DisplayData.item("tableDescription", tableDescription) + .addIfNotNull(DisplayData.item("tableDescription", getTableDescription()) .withLabel("Table Description")); } - /** Returns the create disposition. */ - public CreateDisposition getCreateDisposition() { - return createDisposition; - } - - /** Returns the write disposition. */ - public WriteDisposition getWriteDisposition() { - return writeDisposition; - } - /** Returns the table schema. */ public TableSchema getSchema() { return fromJsonString( - jsonSchema == null ? null : jsonSchema.get(), TableSchema.class); + getJsonSchema() == null ? null : getJsonSchema().get(), TableSchema.class); } /** @@ -2036,20 +1942,8 @@ public class BigQueryIO { /** Returns the table reference, or {@code null}. */ @Nullable public ValueProvider<TableReference> getTable() { - return jsonTableRef == null ? null : - NestedValueProvider.of(jsonTableRef, new JsonTableRefToTableRef()); - } - - /** Returns {@code true} if table validation is enabled. */ - public boolean getValidate() { - return validate; - } - - private BigQueryServices getBigQueryServices() { - if (bigQueryServices == null) { - bigQueryServices = new BigQueryServicesImpl(); - } - return bigQueryServices; + return getJsonTableRef() == null ? null : + NestedValueProvider.of(getJsonTableRef(), new JsonTableRefToTableRef()); } static class TableRowWriter { http://git-wip-us.apache.org/repos/asf/beam/blob/5c715896/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java index fdaa81c..888d9c1 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java @@ -712,10 +712,10 @@ public class BigQueryIOTest implements Serializable { assertEquals(dataset, write.getTable().get().getDatasetId()); assertEquals(table, write.getTable().get().getTableId()); assertEquals(schema, write.getSchema()); - assertEquals(createDisposition, write.createDisposition); - assertEquals(writeDisposition, write.writeDisposition); - assertEquals(tableDescription, write.tableDescription); - assertEquals(validate, write.validate); + assertEquals(createDisposition, write.getCreateDisposition()); + assertEquals(writeDisposition, write.getWriteDisposition()); + assertEquals(tableDescription, write.getTableDescription()); + assertEquals(validate, write.getValidate()); } @Before
