[BEAM-1071] Allow BigQueryIO to write tables with CREATE_NEVER disposition
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/dc369522 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/dc369522 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/dc369522 Branch: refs/heads/python-sdk Commit: dc369522d1cfa46ae9058919d93229de05db2b6a Parents: 11c3cd7 Author: Sam McVeety <[email protected]> Authored: Mon Dec 12 18:47:20 2016 -0800 Committer: Dan Halperin <[email protected]> Committed: Tue Jan 24 14:41:39 2017 -0800 ---------------------------------------------------------------------- .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 51 ++++++++++++++------ .../sdk/io/gcp/bigquery/BigQueryIOTest.java | 36 ++++++++++++++ 2 files changed, 71 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/dc369522/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index aff199a..fa49f55 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -1925,10 +1925,17 @@ public class BigQueryIO { if (input.isBounded() == PCollection.IsBounded.UNBOUNDED || tableRefFunction != null) { // We will use BigQuery's streaming write API -- validate supported dispositions. 
- checkArgument( - createDisposition != CreateDisposition.CREATE_NEVER, - "CreateDisposition.CREATE_NEVER is not supported for an unbounded PCollection or when" - + " using a tablespec function."); + if (tableRefFunction != null) { + checkArgument( + createDisposition != CreateDisposition.CREATE_NEVER, + "CreateDisposition.CREATE_NEVER is not supported when using a tablespec" + + " function."); + } + if (jsonSchema == null) { + checkArgument( + createDisposition == CreateDisposition.CREATE_NEVER, + "CreateDisposition.CREATE_NEVER must be used if jsonSchema is null."); + } checkArgument( writeDisposition != WriteDisposition.WRITE_TRUNCATE, @@ -1965,7 +1972,9 @@ public class BigQueryIO { if (input.isBounded() == IsBounded.UNBOUNDED || tableRefFunction != null) { return input.apply( new StreamWithDeDup(getTable(), tableRefFunction, - NestedValueProvider.of(jsonSchema, new JsonSchemaToTableSchema()), bqServices)); + jsonSchema == null ? null : NestedValueProvider.of( + jsonSchema, new JsonSchemaToTableSchema()), + createDisposition, bqServices)); } ValueProvider<TableReference> table = getTableWithDefaultProject(options); @@ -2608,16 +2617,19 @@ public class BigQueryIO { * Implementation of DoFn to perform streaming BigQuery write. */ @SystemDoFnInternal - private static class StreamingWriteFn + @VisibleForTesting + static class StreamingWriteFn extends DoFn<KV<ShardedKey<String>, TableRowInfo>, Void> { /** TableSchema in JSON. Use String to make the class Serializable. */ - private final ValueProvider<String> jsonTableSchema; + @Nullable private final ValueProvider<String> jsonTableSchema; private final BigQueryServices bqServices; /** JsonTableRows to accumulate BigQuery rows in order to batch writes. */ private transient Map<String, List<TableRow>> tableRows; + private final Write.CreateDisposition createDisposition; + /** The list of unique ids for each BigQuery table row. 
*/ private transient Map<String, List<String>> uniqueIdsForTableRows; @@ -2631,9 +2643,12 @@ public class BigQueryIO { createAggregator("ByteCount", Sum.ofLongs()); /** Constructor. */ - StreamingWriteFn(ValueProvider<TableSchema> schema, BigQueryServices bqServices) { - this.jsonTableSchema = + StreamingWriteFn(@Nullable ValueProvider<TableSchema> schema, + Write.CreateDisposition createDisposition, + BigQueryServices bqServices) { + this.jsonTableSchema = schema == null ? null : NestedValueProvider.of(schema, new TableSchemaToJsonSchema()); + this.createDisposition = createDisposition; this.bqServices = checkNotNull(bqServices, "bqServices"); } @@ -2689,7 +2704,8 @@ public class BigQueryIO { public TableReference getOrCreateTable(BigQueryOptions options, String tableSpec) throws InterruptedException, IOException { TableReference tableReference = parseTableSpec(tableSpec); - if (!createdTables.contains(tableSpec)) { + if (createDisposition != Write.CreateDisposition.CREATE_NEVER + && !createdTables.contains(tableSpec)) { synchronized (createdTables) { // Another thread may have succeeded in creating the table in the meanwhile, so // check again. This check isn't needed for correctness, but we add it to prevent @@ -2945,19 +2961,22 @@ public class BigQueryIO { * it leverages BigQuery best effort de-dup mechanism.
*/ private static class StreamWithDeDup extends PTransform<PCollection<TableRow>, PDone> { - private final transient ValueProvider<TableReference> tableReference; - private final SerializableFunction<BoundedWindow, TableReference> tableRefFunction; - private final transient ValueProvider<TableSchema> tableSchema; + @Nullable private final transient ValueProvider<TableReference> tableReference; + @Nullable private final SerializableFunction<BoundedWindow, TableReference> tableRefFunction; + @Nullable private final transient ValueProvider<TableSchema> tableSchema; + private final Write.CreateDisposition createDisposition; private final BigQueryServices bqServices; /** Constructor. */ StreamWithDeDup(ValueProvider<TableReference> tableReference, - SerializableFunction<BoundedWindow, TableReference> tableRefFunction, - ValueProvider<TableSchema> tableSchema, + @Nullable SerializableFunction<BoundedWindow, TableReference> tableRefFunction, + @Nullable ValueProvider<TableSchema> tableSchema, + Write.CreateDisposition createDisposition, BigQueryServices bqServices) { this.tableReference = tableReference; this.tableRefFunction = tableRefFunction; this.tableSchema = tableSchema; + this.createDisposition = createDisposition; this.bqServices = checkNotNull(bqServices, "bqServices"); } @@ -2989,7 +3008,7 @@ public class BigQueryIO { tagged .setCoder(KvCoder.of(ShardedKeyCoder.of(StringUtf8Coder.of()), TableRowInfoCoder.of())) .apply(Reshuffle.<ShardedKey<String>, TableRowInfo>of()) - .apply(ParDo.of(new StreamingWriteFn(tableSchema, bqServices))); + .apply(ParDo.of(new StreamingWriteFn(tableSchema, createDisposition, bqServices))); // Note that the implementation to return PDone here breaks the // implicit assumption about the job execution order. 
If a user http://git-wip-us.apache.org/repos/asf/beam/blob/dc369522/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java index 3e8c2c9..ba7f44e 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java @@ -1523,6 +1523,42 @@ public class BigQueryIOTest implements Serializable { } @Test + public void testStreamingWriteFnCreateNever() throws Exception { + BigQueryIO.StreamingWriteFn fn = new BigQueryIO.StreamingWriteFn( + null, CreateDisposition.CREATE_NEVER, new FakeBigQueryServices()); + assertEquals(BigQueryIO.parseTableSpec("dataset.table"), + fn.getOrCreateTable(null, "dataset.table")); + } + + @Test + public void testCreateNeverWithStreaming() throws Exception { + BigQueryOptions options = TestPipeline.testingPipelineOptions().as(BigQueryOptions.class); + options.setProject("project"); + options.setStreaming(true); + Pipeline p = TestPipeline.create(options); + + TableReference tableRef = new TableReference(); + tableRef.setDatasetId("dataset"); + tableRef.setTableId("sometable"); + + PCollection<TableRow> tableRows = + p.apply(CountingInput.unbounded()) + .apply( + MapElements.via( + new SimpleFunction<Long, TableRow>() { + @Override + public TableRow apply(Long input) { + return null; + } + })) + .setCoder(TableRowJsonCoder.of()); + tableRows + .apply(BigQueryIO.Write.to(tableRef) + .withCreateDisposition(CreateDisposition.CREATE_NEVER) + .withoutValidation()); + } + + @Test public void testTableParsing() { TableReference ref = BigQueryIO 
.parseTableSpec("my-project:data_set.table_name");
