Repository: beam Updated Branches: refs/heads/master 92021c5dc -> 2689ca43c
Fix BEAM-1301. Support table description in BigQuery IO. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3e8810b8 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3e8810b8 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3e8810b8 Branch: refs/heads/master Commit: 3e8810b888b40b4b3a9548db8ffc457b16d41eab Parents: 92021c5 Author: Rafal Wojdyla <[email protected]> Authored: Mon Jan 23 18:14:15 2017 -0500 Committer: Rafal Wojdyla <[email protected]> Committed: Wed Feb 1 12:34:42 2017 -0500 ---------------------------------------------------------------------- .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 128 ++++++++++++++----- .../sdk/io/gcp/bigquery/BigQueryServices.java | 4 + .../io/gcp/bigquery/BigQueryServicesImpl.java | 25 ++++ .../sdk/io/gcp/bigquery/BigQueryIOTest.java | 2 +- 4 files changed, 127 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/3e8810b8/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index b15807e..5dbec54 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -1632,6 +1632,11 @@ public class BigQueryIO { return new Bound().withWriteDisposition(disposition); } + /** Creates a write transformation with the specified table description. */ + public static Bound withTableDescription(@Nullable String tableDescription) { + return new Bound().withTableDescription(tableDescription); + } + /** * Creates a write transformation with BigQuery table validation disabled. */ @@ -1672,6 +1677,8 @@ public class BigQueryIO { // WRITE_APPEND and WRITE_EMPTY. final WriteDisposition writeDisposition; + @Nullable final String tableDescription; + // An option to indicate if table validation is desired. Default is true. final boolean validate; @@ -1708,6 +1715,7 @@ public class BigQueryIO { null /* jsonSchema */, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY, + null /* tableDescription */, true /* validate */, null /* bigQueryServices */); } @@ -1715,7 +1723,10 @@ public class BigQueryIO { private Bound(String name, @Nullable ValueProvider<String> jsonTableRef, @Nullable SerializableFunction<BoundedWindow, TableReference> tableRefFunction, @Nullable ValueProvider<String> jsonSchema, - CreateDisposition createDisposition, WriteDisposition writeDisposition, boolean validate, + CreateDisposition createDisposition, + WriteDisposition writeDisposition, + @Nullable String tableDescription, + boolean validate, @Nullable BigQueryServices bigQueryServices) { super(name); this.jsonTableRef = jsonTableRef; @@ -1723,6 +1734,7 @@ public class BigQueryIO { this.jsonSchema = jsonSchema; this.createDisposition = checkNotNull(createDisposition, "createDisposition"); this.writeDisposition = checkNotNull(writeDisposition, "writeDisposition"); + this.tableDescription = tableDescription; this.validate = validate; this.bigQueryServices = bigQueryServices; } @@ -1766,7 +1778,7 @@ public class BigQueryIO { return new Bound(name, NestedValueProvider.of(table, new TableRefToJson()), tableRefFunction, jsonSchema, createDisposition, - writeDisposition, validate, bigQueryServices); + writeDisposition, tableDescription, validate, bigQueryServices); } /** @@ -1795,7 +1807,7 @@ public class BigQueryIO { public Bound toTableReference( SerializableFunction<BoundedWindow, TableReference> tableRefFunction) { return new Bound(name, jsonTableRef, tableRefFunction, jsonSchema, createDisposition, - writeDisposition, validate, bigQueryServices); + writeDisposition, tableDescription, validate, bigQueryServices); } /** @@ -1807,7 +1819,7 @@ public class BigQueryIO { public Bound withSchema(TableSchema schema) { return new Bound(name, jsonTableRef, tableRefFunction, StaticValueProvider.of(toJsonString(schema)), - createDisposition, writeDisposition, validate, bigQueryServices); + createDisposition, writeDisposition, tableDescription, validate, bigQueryServices); } /** @@ -1816,7 +1828,7 @@ public class BigQueryIO { public Bound withSchema(ValueProvider<TableSchema> schema) { return new Bound(name, jsonTableRef, tableRefFunction, NestedValueProvider.of(schema, new TableSchemaToJsonSchema()), - createDisposition, writeDisposition, validate, bigQueryServices); + createDisposition, writeDisposition, tableDescription, validate, bigQueryServices); } /** @@ -1826,7 +1838,7 @@ public class BigQueryIO { */ public Bound withCreateDisposition(CreateDisposition createDisposition) { return new Bound(name, jsonTableRef, tableRefFunction, jsonSchema, - createDisposition, writeDisposition, validate, bigQueryServices); + createDisposition, writeDisposition, tableDescription, validate, bigQueryServices); } /** @@ -1836,7 +1848,17 @@ public class BigQueryIO { */ public Bound withWriteDisposition(WriteDisposition writeDisposition) { return new Bound(name, jsonTableRef, tableRefFunction, jsonSchema, - createDisposition, writeDisposition, validate, bigQueryServices); + createDisposition, writeDisposition, tableDescription, validate, bigQueryServices); + } + + /** + * Returns a copy of this write transformation, but using the specified table description. + * + * <p>Does not modify this object. + */ + public Bound withTableDescription(@Nullable String tableDescription) { + return new Bound(name, jsonTableRef, tableRefFunction, jsonSchema, + createDisposition, writeDisposition, tableDescription, validate, bigQueryServices); } /** @@ -1846,13 +1868,13 @@ public class BigQueryIO { */ public Bound withoutValidation() { return new Bound(name, jsonTableRef, tableRefFunction, jsonSchema, createDisposition, - writeDisposition, false, bigQueryServices); + writeDisposition, tableDescription, false, bigQueryServices); } @VisibleForTesting Bound withTestServices(BigQueryServices testServices) { return new Bound(name, jsonTableRef, tableRefFunction, jsonSchema, createDisposition, - writeDisposition, validate, testServices); + writeDisposition, tableDescription, validate, testServices); } private static void verifyTableNotExistOrEmpty( @@ -1962,7 +1984,9 @@ public class BigQueryIO { new StreamWithDeDup(getTable(), tableRefFunction, jsonSchema == null ? null : NestedValueProvider.of( jsonSchema, new JsonSchemaToTableSchema()), - createDisposition, bqServices)); + createDisposition, + tableDescription, + bqServices)); } ValueProvider<TableReference> table = getTableWithDefaultProject(options); @@ -2024,7 +2048,8 @@ public class BigQueryIO { NestedValueProvider.of(table, new TableRefToJson()), jsonSchema, WriteDisposition.WRITE_EMPTY, - CreateDisposition.CREATE_IF_NEEDED))); + CreateDisposition.CREATE_IF_NEEDED, + tableDescription))); PCollectionView<Iterable<String>> tempTablesView = tempTables .apply("TempTablesView", View.<String>asIterable()); @@ -2035,7 +2060,8 @@ public class BigQueryIO { NestedValueProvider.of(table, new TableRefToJson()), writeDisposition, createDisposition, - tempTablesView)) + tempTablesView, + tableDescription)) .withSideInputs(tempTablesView)); // Write single partition to final table @@ -2049,7 +2075,8 @@ public class BigQueryIO { NestedValueProvider.of(table, new TableRefToJson()), jsonSchema, writeDisposition, - createDisposition))); + createDisposition, + tableDescription))); return PDone.in(input.getPipeline()); } @@ -2128,7 +2155,9 @@ public class BigQueryIO { .add(DisplayData.item("writeDisposition", writeDisposition.toString()) .withLabel("Table WriteDisposition")) .addIfNotDefault(DisplayData.item("validation", validate) - .withLabel("Validation Enabled"), true); + .withLabel("Validation Enabled"), true) + .addIfNotNull(DisplayData.item("tableDescription", tableDescription) + .withLabel("Table Description")); } /** Returns the create disposition. */ @@ -2302,6 +2331,7 @@ public class BigQueryIO { private final ValueProvider<String> jsonSchema; private final WriteDisposition writeDisposition; private final CreateDisposition createDisposition; + @Nullable private final String tableDescription; public WriteTables( boolean singlePartition, @@ -2311,7 +2341,8 @@ public class BigQueryIO { ValueProvider<String> jsonTableRef, ValueProvider<String> jsonSchema, WriteDisposition writeDisposition, - CreateDisposition createDisposition) { + CreateDisposition createDisposition, + @Nullable String tableDescription) { this.singlePartition = singlePartition; this.bqServices = bqServices; this.jobIdToken = jobIdToken; @@ -2320,6 +2351,7 @@ public class BigQueryIO { this.jsonSchema = jsonSchema; this.writeDisposition = writeDisposition; this.createDisposition = createDisposition; + this.tableDescription = tableDescription; } @ProcessElement @@ -2333,13 +2365,15 @@ public class BigQueryIO { load( bqServices.getJobService(c.getPipelineOptions().as(BigQueryOptions.class)), + bqServices.getDatasetService(c.getPipelineOptions().as(BigQueryOptions.class)), jobIdPrefix, ref, fromJsonString( jsonSchema == null ? null : jsonSchema.get(), TableSchema.class), partition, writeDisposition, - createDisposition); + createDisposition, + tableDescription); c.output(toJsonString(ref)); removeTemporaryFiles(c.getPipelineOptions(), tempFilePrefix, partition); @@ -2347,12 +2381,14 @@ public class BigQueryIO { private void load( JobService jobService, + DatasetService datasetService, String jobIdPrefix, TableReference ref, @Nullable TableSchema schema, List<String> gcsUris, WriteDisposition writeDisposition, - CreateDisposition createDisposition) throws InterruptedException, IOException { + CreateDisposition createDisposition, + @Nullable String tableDescription) throws InterruptedException, IOException { JobConfigurationLoad loadConfig = new JobConfigurationLoad() .setDestinationTable(ref) .setSchema(schema) @@ -2373,6 +2409,9 @@ public class BigQueryIO { Status jobStatus = parseStatus(loadJob); switch (jobStatus) { case SUCCEEDED: + if (tableDescription != null) { + datasetService.patchTableDescription(ref, tableDescription); + } return; case UNKNOWN: throw new RuntimeException(String.format( @@ -2428,7 +2467,9 @@ public class BigQueryIO { .addIfNotNull(DisplayData.item("jsonTableRef", jsonTableRef) .withLabel("Table Reference")) .addIfNotNull(DisplayData.item("jsonSchema", jsonSchema) - .withLabel("Table Schema")); + .withLabel("Table Schema")) + .addIfNotNull(DisplayData.item("tableDescription", tableDescription) + .withLabel("Table Description")); } } @@ -2442,6 +2483,7 @@ public class BigQueryIO { private final WriteDisposition writeDisposition; private final CreateDisposition createDisposition; private final PCollectionView<Iterable<String>> tempTablesView; + @Nullable private final String tableDescription; public WriteRename( BigQueryServices bqServices, @@ -2449,13 +2491,15 @@ public class BigQueryIO { ValueProvider<String> jsonTableRef, WriteDisposition writeDisposition, CreateDisposition createDisposition, - PCollectionView<Iterable<String>> tempTablesView) { + PCollectionView<Iterable<String>> tempTablesView, + @Nullable String tableDescription) { this.bqServices = bqServices; this.jobIdToken = jobIdToken; this.jsonTableRef = jsonTableRef; this.writeDisposition = writeDisposition; this.createDisposition = createDisposition; this.tempTablesView = tempTablesView; + this.tableDescription = tableDescription; } @ProcessElement @@ -2473,11 +2517,13 @@ public class BigQueryIO { } copy( bqServices.getJobService(c.getPipelineOptions().as(BigQueryOptions.class)), + bqServices.getDatasetService(c.getPipelineOptions().as(BigQueryOptions.class)), jobIdToken.get(), fromJsonString(jsonTableRef.get(), TableReference.class), tempTables, writeDisposition, - createDisposition); + createDisposition, + tableDescription); DatasetService tableService = bqServices.getDatasetService(c.getPipelineOptions().as(BigQueryOptions.class)); @@ -2486,11 +2532,13 @@ public class BigQueryIO { private void copy( JobService jobService, + DatasetService datasetService, String jobIdPrefix, TableReference ref, List<TableReference> tempTables, WriteDisposition writeDisposition, - CreateDisposition createDisposition) throws InterruptedException, IOException { + CreateDisposition createDisposition, + @Nullable String tableDescription) throws InterruptedException, IOException { JobConfigurationTableCopy copyConfig = new JobConfigurationTableCopy() .setSourceTables(tempTables) .setDestinationTable(ref) @@ -2509,6 +2557,9 @@ public class BigQueryIO { Status jobStatus = parseStatus(copyJob); switch (jobStatus) { case SUCCEEDED: + if (tableDescription != null) { + datasetService.patchTableDescription(ref, tableDescription); + } return; case UNKNOWN: throw new RuntimeException(String.format( @@ -2628,6 +2679,8 @@ public class BigQueryIO { /** TableSchema in JSON. Use String to make the class Serializable. */ @Nullable private final ValueProvider<String> jsonTableSchema; + @Nullable private final String tableDescription; + private final BigQueryServices bqServices; /** JsonTableRows to accumulate BigQuery rows in order to batch writes. */ @@ -2649,12 +2702,13 @@ public class BigQueryIO { /** Constructor. */ StreamingWriteFn(@Nullable ValueProvider<TableSchema> schema, - Write.CreateDisposition createDisposition, - BigQueryServices bqServices) { + Write.CreateDisposition createDisposition, + @Nullable String tableDescription, BigQueryServices bqServices) { this.jsonTableSchema = schema == null ? null : NestedValueProvider.of(schema, new TableSchemaToJsonSchema()); this.createDisposition = createDisposition; this.bqServices = checkNotNull(bqServices, "bqServices"); + this.tableDescription = tableDescription; } /** @@ -2702,8 +2756,11 @@ public class BigQueryIO { public void populateDisplayData(DisplayData.Builder builder) { super.populateDisplayData(builder); - builder.addIfNotNull(DisplayData.item("schema", jsonTableSchema) - .withLabel("Table Schema")); + builder + .addIfNotNull(DisplayData.item("schema", jsonTableSchema) + .withLabel("Table Schema")) + .addIfNotNull(DisplayData.item("tableDescription", tableDescription) + .withLabel("Table Description")); } public TableReference getOrCreateTable(BigQueryOptions options, String tableSpec) @@ -2721,7 +2778,10 @@ public class BigQueryIO { TableSchema tableSchema = JSON_FACTORY.fromString( jsonTableSchema.get(), TableSchema.class); datasetService.createTable( - new Table().setTableReference(tableReference).setSchema(tableSchema)); + new Table() + .setTableReference(tableReference) + .setSchema(tableSchema) + .setDescription(tableDescription)); } createdTables.add(tableSpec); } @@ -2967,18 +3027,20 @@ public class BigQueryIO { @Nullable private final transient ValueProvider<TableSchema> tableSchema; private final Write.CreateDisposition createDisposition; private final BigQueryServices bqServices; + @Nullable private final String tableDescription; /** Constructor. */ StreamWithDeDup(ValueProvider<TableReference> tableReference, - @Nullable SerializableFunction<BoundedWindow, TableReference> tableRefFunction, - @Nullable ValueProvider<TableSchema> tableSchema, - Write.CreateDisposition createDisposition, - BigQueryServices bqServices) { + @Nullable SerializableFunction<BoundedWindow, TableReference> tableRefFunction, + @Nullable ValueProvider<TableSchema> tableSchema, + Write.CreateDisposition createDisposition, + @Nullable String tableDescription, BigQueryServices bqServices) { this.tableReference = tableReference; this.tableRefFunction = tableRefFunction; this.tableSchema = tableSchema; this.createDisposition = createDisposition; this.bqServices = checkNotNull(bqServices, "bqServices"); + this.tableDescription = tableDescription; } @Override @@ -3009,7 +3071,11 @@ public class BigQueryIO { tagged .setCoder(KvCoder.of(ShardedKeyCoder.of(StringUtf8Coder.of()), TableRowInfoCoder.of())) .apply(Reshuffle.<ShardedKey<String>, TableRowInfo>of()) - .apply(ParDo.of(new StreamingWriteFn(tableSchema, createDisposition, bqServices))); + .apply(ParDo.of(new StreamingWriteFn( + tableSchema, + createDisposition, + tableDescription, + bqServices))); // Note that the implementation to return PDone here breaks the // implicit assumption about the job execution order. If a user http://git-wip-us.apache.org/repos/asf/beam/blob/3e8810b8/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java index a85d16d..ebff6c1 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java @@ -166,6 +166,10 @@ interface BigQueryServices extends Serializable { */ long insertAll(TableReference ref, List<TableRow> rowList, @Nullable List<String> insertIdList) throws IOException, InterruptedException; + + /** Patch BigQuery {@link Table} description. */ + Table patchTableDescription(TableReference tableReference, @Nullable String tableDescription) + throws IOException, InterruptedException; } /** http://git-wip-us.apache.org/repos/asf/beam/blob/3e8810b8/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java index b958c8d..15ca262 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java @@ -789,6 +789,31 @@ class BigQueryServicesImpl implements BigQueryServices { return insertAll( ref, rowList, insertIdList, INSERT_BACKOFF_FACTORY.backoff(), Sleeper.DEFAULT); } + + + @Override + public Table patchTableDescription(TableReference tableReference, + @Nullable String tableDescription) + throws IOException, InterruptedException { + Table table = new Table(); + table.setDescription(tableDescription); + + BackOff backoff = + FluentBackoff.DEFAULT + .withMaxRetries(MAX_RPC_RETRIES).withInitialBackoff(INITIAL_RPC_BACKOFF).backoff(); + return executeWithRetries( + client.tables().patch( + tableReference.getProjectId(), + tableReference.getDatasetId(), + tableReference.getTableId(), + table), + String.format( + "Unable to patch table description: %s, aborting after %d retries.", + tableReference, MAX_RPC_RETRIES), + Sleeper.DEFAULT, + backoff, + ALWAYS_RETRY); + } } private static class BigQueryJsonReaderImpl implements BigQueryJsonReader { http://git-wip-us.apache.org/repos/asf/beam/blob/3e8810b8/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java index c0ce027..8b1b6dd 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java @@ -1556,7 +1556,7 @@ public class BigQueryIOTest implements Serializable { @Test public void testStreamingWriteFnCreateNever() throws Exception { BigQueryIO.StreamingWriteFn fn = new BigQueryIO.StreamingWriteFn( - null, CreateDisposition.CREATE_NEVER, new FakeBigQueryServices()); + null, CreateDisposition.CREATE_NEVER, null, new FakeBigQueryServices()); assertEquals(BigQueryIO.parseTableSpec("dataset.table"), fn.getOrCreateTable(null, "dataset.table")); }
