Repository: beam Updated Branches: refs/heads/gearpump-runner ebbb61390 -> 4078c22fd
Refactor batch load job path, and add support for data-dependent tables. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/8581caf3 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/8581caf3 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/8581caf3 Branch: refs/heads/gearpump-runner Commit: 8581caf388ad688a0e79cfa154262d1e701dee10 Parents: 58ed5c7 Author: Reuven Lax <[email protected]> Authored: Wed Mar 29 07:34:10 2017 -0700 Committer: Eugene Kirpichov <[email protected]> Committed: Tue Apr 18 21:12:50 2017 -0700 ---------------------------------------------------------------------- .../sdk/io/gcp/bigquery/BatchLoadBigQuery.java | 180 ---------------- .../beam/sdk/io/gcp/bigquery/BatchLoads.java | 203 +++++++++++++++++++ .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 3 +- .../sdk/io/gcp/bigquery/TableDestination.java | 17 +- .../sdk/io/gcp/bigquery/TableRowWriter.java | 12 +- .../beam/sdk/io/gcp/bigquery/WriteBundles.java | 82 -------- .../io/gcp/bigquery/WriteBundlesToFiles.java | 102 ++++++++++ .../sdk/io/gcp/bigquery/WritePartition.java | 95 ++++++--- .../beam/sdk/io/gcp/bigquery/WriteRename.java | 63 +++--- .../beam/sdk/io/gcp/bigquery/WriteTables.java | 47 ++--- .../sdk/io/gcp/bigquery/BigQueryIOTest.java | 27 +-- 11 files changed, 469 insertions(+), 362 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/8581caf3/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoadBigQuery.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoadBigQuery.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoadBigQuery.java deleted file mode 100644 index 160b231..0000000 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoadBigQuery.java +++ /dev/null @@ -1,180 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.beam.sdk.io.gcp.bigquery; - -import com.google.api.services.bigquery.model.TableReference; -import com.google.api.services.bigquery.model.TableRow; -import java.io.IOException; -import java.util.List; -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TableRefToJson; -import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition; -import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition; -import org.apache.beam.sdk.options.BigQueryOptions; -import org.apache.beam.sdk.options.ValueProvider; -import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider; -import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.GroupByKey; -import org.apache.beam.sdk.transforms.MapElements; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.SimpleFunction; -import org.apache.beam.sdk.transforms.View; -import org.apache.beam.sdk.transforms.windowing.DefaultTrigger; -import org.apache.beam.sdk.transforms.windowing.GlobalWindows; -import org.apache.beam.sdk.transforms.windowing.Window; -import org.apache.beam.sdk.util.IOChannelFactory; -import org.apache.beam.sdk.util.IOChannelUtils; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionTuple; -import org.apache.beam.sdk.values.PCollectionView; -import org.apache.beam.sdk.values.TupleTag; -import org.apache.beam.sdk.values.TupleTagList; -import org.apache.beam.sdk.values.TypeDescriptor; - -/** - * PTransform that uses BigQuery batch-load jobs to write a PCollection to BigQuery. - */ -class BatchLoadBigQuery<T> extends PTransform<PCollection<T>, WriteResult> { - BigQueryIO.Write<T> write; - - BatchLoadBigQuery(BigQueryIO.Write<T> write) { - this.write = write; - } - - @Override - public WriteResult expand(PCollection<T> input) { - Pipeline p = input.getPipeline(); - BigQueryOptions options = p.getOptions().as(BigQueryOptions.class); - ValueProvider<TableReference> table = write.getTableWithDefaultProject(options); - - final String stepUuid = BigQueryHelpers.randomUUIDString(); - - String tempLocation = options.getTempLocation(); - String tempFilePrefix; - try { - IOChannelFactory factory = IOChannelUtils.getFactory(tempLocation); - tempFilePrefix = factory.resolve( - factory.resolve(tempLocation, "BigQueryWriteTemp"), - stepUuid); - } catch (IOException e) { - throw new RuntimeException( - String.format("Failed to resolve BigQuery temp location in %s", tempLocation), - e); - } - - // Create a singleton job ID token at execution time. - PCollection<String> singleton = p.apply("Create", Create.of(tempFilePrefix)); - PCollectionView<String> jobIdTokenView = p - .apply("TriggerIdCreation", Create.of("ignored")) - .apply("CreateJobId", MapElements.via( - new SimpleFunction<String, String>() { - @Override - public String apply(String input) { - return stepUuid; - } - })) - .apply(View.<String>asSingleton()); - - PCollection<T> typedInputInGlobalWindow = - input.apply( - Window.<T>into(new GlobalWindows()) - .triggering(DefaultTrigger.of()) - .discardingFiredPanes()); - // Avoid applying the formatFunction if it is the identity formatter. 
- PCollection<TableRow> inputInGlobalWindow; - if (write.getFormatFunction() == BigQueryIO.IDENTITY_FORMATTER) { - inputInGlobalWindow = (PCollection<TableRow>) typedInputInGlobalWindow; - } else { - inputInGlobalWindow = - typedInputInGlobalWindow.apply( - MapElements.into(new TypeDescriptor<TableRow>() {}).via(write.getFormatFunction())); - } - - // PCollection of filename, file byte size. - PCollection<KV<String, Long>> results = inputInGlobalWindow - .apply("WriteBundles", - ParDo.of(new WriteBundles(tempFilePrefix))); - - TupleTag<KV<Long, List<String>>> multiPartitionsTag = - new TupleTag<KV<Long, List<String>>>("multiPartitionsTag") {}; - TupleTag<KV<Long, List<String>>> singlePartitionTag = - new TupleTag<KV<Long, List<String>>>("singlePartitionTag") {}; - - // Turn the list of files and record counts in a PCollectionView that can be used as a - // side input. - PCollectionView<Iterable<KV<String, Long>>> resultsView = results - .apply("ResultsView", View.<KV<String, Long>>asIterable()); - PCollectionTuple partitions = singleton.apply(ParDo - .of(new WritePartition( - resultsView, - multiPartitionsTag, - singlePartitionTag)) - .withSideInputs(resultsView) - .withOutputTags(multiPartitionsTag, TupleTagList.of(singlePartitionTag))); - - // If WriteBundles produced more than MAX_NUM_FILES files or MAX_SIZE_BYTES bytes, then - // the import needs to be split into multiple partitions, and those partitions will be - // specified in multiPartitionsTag. - PCollection<String> tempTables = partitions.get(multiPartitionsTag) - .apply("MultiPartitionsGroupByKey", GroupByKey.<Long, List<String>>create()) - .apply("MultiPartitionsWriteTables", ParDo.of(new WriteTables( - false, - write.getBigQueryServices(), - jobIdTokenView, - tempFilePrefix, - NestedValueProvider.of(table, new TableRefToJson()), - write.getJsonSchema(), - WriteDisposition.WRITE_EMPTY, - CreateDisposition.CREATE_IF_NEEDED, - write.getTableDescription())) - .withSideInputs(jobIdTokenView)); - - PCollectionView<Iterable<String>> tempTablesView = tempTables - .apply("TempTablesView", View.<String>asIterable()); - singleton.apply(ParDo - .of(new WriteRename( - write.getBigQueryServices(), - jobIdTokenView, - NestedValueProvider.of(table, new TableRefToJson()), - write.getWriteDisposition(), - write.getCreateDisposition(), - tempTablesView, - write.getTableDescription())) - .withSideInputs(tempTablesView, jobIdTokenView)); - - // Write single partition to final table - partitions.get(singlePartitionTag) - .apply("SinglePartitionGroupByKey", GroupByKey.<Long, List<String>>create()) - .apply("SinglePartitionWriteTables", ParDo.of(new WriteTables( - true, - write.getBigQueryServices(), - jobIdTokenView, - tempFilePrefix, - NestedValueProvider.of(table, new TableRefToJson()), - write.getJsonSchema(), - write.getWriteDisposition(), - write.getCreateDisposition(), - write.getTableDescription())) - .withSideInputs(jobIdTokenView)); - - return WriteResult.in(input.getPipeline()); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/8581caf3/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java new file mode 100644 index 0000000..8594211 --- /dev/null +++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.io.gcp.bigquery; + +import com.google.api.services.bigquery.model.TableReference; +import com.google.api.services.bigquery.model.TableRow; +import com.google.api.services.bigquery.model.TableSchema; +import java.io.IOException; +import java.util.List; +import java.util.Map; +import javax.annotation.Nullable; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition; +import org.apache.beam.sdk.options.BigQueryOptions; +import org.apache.beam.sdk.options.ValueProvider; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.GroupByKey; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.transforms.SimpleFunction; +import org.apache.beam.sdk.transforms.View; +import org.apache.beam.sdk.transforms.windowing.DefaultTrigger; +import org.apache.beam.sdk.transforms.windowing.GlobalWindows; +import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.util.IOChannelFactory; +import org.apache.beam.sdk.util.IOChannelUtils; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.TupleTagList; + + +/** + * PTransform that uses BigQuery batch-load jobs to write a PCollection to BigQuery. 
+ */ +class BatchLoads<T> extends + PTransform<PCollection<KV<TableDestination, TableRow>>, WriteResult> { + BigQueryIO.Write<T> write; + + private static class ConstantSchemaFunction implements + SerializableFunction<TableDestination, TableSchema> { + private final @Nullable + String jsonSchema; + + ConstantSchemaFunction(TableSchema schema) { + this.jsonSchema = BigQueryHelpers.toJsonString(schema); + } + + @Override + @Nullable + public TableSchema apply(TableDestination table) { + return BigQueryHelpers.fromJsonString(jsonSchema, TableSchema.class); + } + } + + BatchLoads(BigQueryIO.Write<T> write) { + this.write = write; + } + + @Override + public WriteResult expand(PCollection<KV<TableDestination, TableRow>> input) { + Pipeline p = input.getPipeline(); + BigQueryOptions options = p.getOptions().as(BigQueryOptions.class); + ValueProvider<TableReference> table = write.getTableWithDefaultProject(options); + + final String stepUuid = BigQueryHelpers.randomUUIDString(); + + String tempLocation = options.getTempLocation(); + String tempFilePrefix; + try { + IOChannelFactory factory = IOChannelUtils.getFactory(tempLocation); + tempFilePrefix = factory.resolve( + factory.resolve(tempLocation, "BigQueryWriteTemp"), + stepUuid); + } catch (IOException e) { + throw new RuntimeException( + String.format("Failed to resolve BigQuery temp location in %s", tempLocation), + e); + } + + // Create a singleton job ID token at execution time. This will be used as the base for all + // load jobs issued from this instance of the transform. + PCollection<String> singleton = p.apply("Create", Create.of(tempFilePrefix)); + PCollectionView<String> jobIdTokenView = p + .apply("TriggerIdCreation", Create.of("ignored")) + .apply("CreateJobId", MapElements.via( + new SimpleFunction<String, String>() { + @Override + public String apply(String input) { + return stepUuid; + } + })) + .apply(View.<String>asSingleton()); + + PCollection<KV<TableDestination, TableRow>> inputInGlobalWindow = + input.apply( + Window.<KV<TableDestination, TableRow>>into(new GlobalWindows()) + .triggering(DefaultTrigger.of()) + .discardingFiredPanes()); + + // PCollection of filename, file byte size, and table destination. + PCollection<WriteBundlesToFiles.Result> results = inputInGlobalWindow + .apply("WriteBundlesToFiles", + ParDo.of(new WriteBundlesToFiles(tempFilePrefix))); + + TupleTag<KV<KV<TableDestination, Integer>, List<String>>> multiPartitionsTag = + new TupleTag<KV<KV<TableDestination, Integer>, List<String>>>("multiPartitionsTag") {}; + TupleTag<KV<KV<TableDestination, Integer>, List<String>>> singlePartitionTag = + new TupleTag<KV<KV<TableDestination, Integer>, List<String>>>("singlePartitionTag") {}; + + // Turn the list of files and record counts into a PCollectionView that can be used as a + // side input. + PCollectionView<Iterable<WriteBundlesToFiles.Result>> resultsView = results + .apply("ResultsView", View.<WriteBundlesToFiles.Result>asIterable()); + // This transform will look at the set of files written for each table, and if any table has + // too many files or bytes, will partition that table's files into multiple partitions for + // loading.
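Stripped of the Beam plumbing, the per-table size-based grouping described in the comment above amounts to the following standalone sketch. It is illustrative only; the constants stand in for Write.MAX_NUM_FILES and Write.MAX_SIZE_BYTES, whose values here are assumed rather than quoted from the source.

import java.util.ArrayList;
import java.util.List;

class PartitionSketch {
  static final int MAX_NUM_FILES = 10000;              // assumed stand-in for Write.MAX_NUM_FILES
  static final long MAX_SIZE_BYTES = 11L * (1L << 40); // assumed stand-in for Write.MAX_SIZE_BYTES

  // Greedily packs files (parallel lists of names and byte sizes) into
  // partitions so that no partition exceeds either limit.
  static List<List<String>> partition(List<String> names, List<Long> sizes) {
    List<List<String>> partitions = new ArrayList<>();
    partitions.add(new ArrayList<String>());
    int numFiles = 0;
    long numBytes = 0;
    for (int i = 0; i < names.size(); ++i) {
      if (numFiles + 1 > MAX_NUM_FILES || numBytes + sizes.get(i) > MAX_SIZE_BYTES) {
        partitions.add(new ArrayList<String>()); // start a new partition for this table
        numFiles = 0;
        numBytes = 0;
      }
      partitions.get(partitions.size() - 1).add(names.get(i));
      ++numFiles;
      numBytes += sizes.get(i);
    }
    return partitions;
  }
}

A table whose files all fit in one partition takes the single-partition path and is loaded directly; otherwise each partition is loaded into its own temporary table and later copied into the final destination by WriteRename.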
+ PCollectionTuple partitions = singleton.apply(ParDo + .of(new WritePartition( + write.getTable(), + write.getTableDescription(), + resultsView, + multiPartitionsTag, + singlePartitionTag)) + .withSideInputs(resultsView) + .withOutputTags(multiPartitionsTag, TupleTagList.of(singlePartitionTag))); + + // Since BigQueryIO.java does not yet have support for per-table schemas, inject a constant + // schema function here. If no schema is specified, this function will return null. + SerializableFunction<TableDestination, TableSchema> schemaFunction = + new ConstantSchemaFunction(write.getSchema()); + + // If WriteBundlesToFiles produced more than MAX_NUM_FILES files or MAX_SIZE_BYTES bytes, then + // the import needs to be split into multiple partitions, and those partitions will be + // specified in multiPartitionsTag. + PCollection<KV<TableDestination, String>> tempTables = partitions.get(multiPartitionsTag) + // What's this GroupByKey for? Is this so that we have deterministic temp tables? If so, maybe + // Reshuffle is better here. + .apply("MultiPartitionsGroupByKey", + GroupByKey.<KV<TableDestination, Integer>, List<String>>create()) + .apply("MultiPartitionsWriteTables", ParDo.of(new WriteTables( + false, + write.getBigQueryServices(), + jobIdTokenView, + tempFilePrefix, + WriteDisposition.WRITE_EMPTY, + CreateDisposition.CREATE_IF_NEEDED, + schemaFunction)) + .withSideInputs(jobIdTokenView)); + + // This view maps each final table destination to the set of temporary partitioned tables + // the PCollection was loaded into. + PCollectionView<Map<TableDestination, Iterable<String>>> tempTablesView = tempTables + .apply("TempTablesView", View.<TableDestination, String>asMultimap()); + + singleton.apply(ParDo + .of(new WriteRename( + write.getBigQueryServices(), + jobIdTokenView, + write.getWriteDisposition(), + write.getCreateDisposition(), + tempTablesView, + write.getTableDescription())) + .withSideInputs(tempTablesView, jobIdTokenView)); + + // Write single partition to final table + partitions.get(singlePartitionTag) + .apply("SinglePartitionGroupByKey", + GroupByKey.<KV<TableDestination, Integer>, List<String>>create()) + .apply("SinglePartitionWriteTables", ParDo.of(new WriteTables( + true, + write.getBigQueryServices(), + jobIdTokenView, + tempFilePrefix, + write.getWriteDisposition(), + write.getCreateDisposition(), + schemaFunction)) + .withSideInputs(jobIdTokenView)); + + return WriteResult.in(input.getPipeline()); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/8581caf3/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index af19b83..f1baaf7 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -984,7 +984,8 @@ public class BigQueryIO { if (input.isBounded() == IsBounded.UNBOUNDED) { return rowsWithDestination.apply(new StreamingInserts(this)); } else { - return input.apply(new BatchLoadBigQuery<T>(this)); + + return rowsWithDestination.apply(new BatchLoads<T>(this)); } }
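For orientation, the one-line change above means a bounded input now reaches BatchLoads already keyed by destination. A minimal caller sketch, hedged to this era of the API (writeTableRows() and the table spec are illustrative; if this revision predates writeTableRows(), the equivalent is BigQueryIO.Write with an identity format function):

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import java.util.Collections;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;

public class BatchLoadExample {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
    TableSchema schema = new TableSchema().setFields(Collections.singletonList(
        new TableFieldSchema().setName("user").setType("STRING")));
    p.apply(Create.of(new TableRow().set("user", "alice"))
            .withCoder(TableRowJsonCoder.of()))
        // Bounded input, so BigQueryIO.Write.expand() takes the BatchLoads branch above.
        .apply(BigQueryIO.writeTableRows()
            .to("my-project:my_dataset.my_table") // hypothetical table spec
            .withSchema(schema)
            .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
            .withWriteDisposition(WriteDisposition.WRITE_EMPTY));
    p.run();
  }
}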
http://git-wip-us.apache.org/repos/asf/beam/blob/8581caf3/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java index 631afeb..1c2b256 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java @@ -20,6 +20,8 @@ package org.apache.beam.sdk.io.gcp.bigquery; import com.google.api.services.bigquery.model.TableReference; +import java.util.Objects; + /** * Encapsulates a BigQuery table destination. */ @@ -42,7 +44,6 @@ public class TableDestination { return tableSpec; } - public TableReference getTableReference() { return BigQueryHelpers.parseTableSpec(tableSpec); } @@ -50,4 +51,18 @@ public class TableDestination { public String getTableDescription() { return tableDescription; } + + @Override + public boolean equals(Object o) { + if (!(o instanceof TableDestination)) { + return false; + } + TableDestination other = (TableDestination) o; + return Objects.equals(tableSpec, other.tableSpec) + && Objects.equals(tableDescription, other.tableDescription); + } + + @Override + public int hashCode() { + return Objects.hash(tableSpec, tableDescription); + } } http://git-wip-us.apache.org/repos/asf/beam/blob/8581caf3/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowWriter.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowWriter.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowWriter.java index 014c498..a1f6153 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowWriter.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowWriter.java @@ -48,6 +48,14 @@ class TableRowWriter { protected String mimeType = MimeTypes.TEXT; private CountingOutputStream out; + public class Result { + String filename; + long byteSize; + public Result(String filename, long byteSize) { + this.filename = filename; + this.byteSize = byteSize; + } + } TableRowWriter(String basename) { this.tempFilePrefix = basename; } @@ -77,8 +85,8 @@ out.write(NEWLINE); } - public final KV<String, Long> close() throws IOException { + public final Result close() throws IOException { channel.close(); - return KV.of(fileName, out.getCount()); + return new Result(fileName, out.getCount()); } } http://git-wip-us.apache.org/repos/asf/beam/blob/8581caf3/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundles.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundles.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundles.java deleted file mode 100644 index 6219226..0000000 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundles.java +++ /dev/null @@ -1,82 +0,0
@@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.sdk.io.gcp.bigquery; - -import com.google.api.services.bigquery.model.TableRow; -import java.util.UUID; - -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.display.DisplayData; -import org.apache.beam.sdk.values.KV; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Writes each bundle of {@link TableRow} elements out to a separate file using - * {@link TableRowWriter}. - */ -class WriteBundles extends DoFn<TableRow, KV<String, Long>> { - private static final Logger LOG = LoggerFactory.getLogger(WriteBundles.class); - - private transient TableRowWriter writer = null; - private final String tempFilePrefix; - - WriteBundles(String tempFilePrefix) { - this.tempFilePrefix = tempFilePrefix; - } - - @ProcessElement - public void processElement(ProcessContext c) throws Exception { - if (writer == null) { - writer = new TableRowWriter(tempFilePrefix); - writer.open(UUID.randomUUID().toString()); - LOG.debug("Done opening writer {}", writer); - } - try { - writer.write(c.element()); - } catch (Exception e) { - // Discard write result and close the write. - try { - writer.close(); - // The writer does not need to be reset, as this DoFn cannot be reused. - } catch (Exception closeException) { - // Do not mask the exception that caused the write to fail. - e.addSuppressed(closeException); - } - throw e; - } - } - - @FinishBundle - public void finishBundle(Context c) throws Exception { - if (writer != null) { - c.output(writer.close()); - writer = null; - } - } - - @Override - public void populateDisplayData(DisplayData.Builder builder) { - super.populateDisplayData(builder); - - builder - .addIfNotNull(DisplayData.item("tempFilePrefix", tempFilePrefix) - .withLabel("Temporary File Prefix")); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/8581caf3/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java new file mode 100644 index 0000000..4e6167b --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.io.gcp.bigquery; + +import com.google.api.services.bigquery.model.TableRow; + +import java.util.Map; +import java.util.UUID; + +import com.google.common.collect.Maps; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.values.KV; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Writes each bundle of {@link TableRow} elements out to a separate file using + * {@link TableRowWriter}. + */ +class WriteBundlesToFiles extends DoFn<KV<TableDestination, TableRow>, WriteBundlesToFiles.Result> { + private static final Logger LOG = LoggerFactory.getLogger(WriteBundlesToFiles.class); + + // Map from table destination to a writer for that table. + private transient Map<TableDestination, TableRowWriter> writers; + private final String tempFilePrefix; + + public static class Result { + public String filename; + public Long fileByteSize; + public TableDestination tableDestination; + + public Result(String filename, Long fileByteSize, TableDestination tableDestination) { + this.filename = filename; + this.fileByteSize = fileByteSize; + this.tableDestination = tableDestination; + } + } + WriteBundlesToFiles(String tempFilePrefix) { + this.tempFilePrefix = tempFilePrefix; + this.writers = Maps.newHashMap(); + } + + @ProcessElement + public void processElement(ProcessContext c) throws Exception { + TableRowWriter writer = writers.get(c.element().getKey()); + if (writer == null) { + writer = new TableRowWriter(tempFilePrefix); + writer.open(UUID.randomUUID().toString()); + writers.put(c.element().getKey(), writer); + LOG.debug("Done opening writer {}", writer); + } + try { + writer.write(c.element().getValue()); + } catch (Exception e) { + // Discard write result and close the write. + try { + writer.close(); + // The writer does not need to be reset, as this DoFn cannot be reused. + } catch (Exception closeException) { + // Do not mask the exception that caused the write to fail.
+ e.addSuppressed(closeException); + } + throw e; + } + } + + @FinishBundle + public void finishBundle(Context c) throws Exception { + for (Map.Entry<TableDestination, TableRowWriter> entry : writers.entrySet()) { + TableRowWriter.Result result = entry.getValue().close(); + c.output(new Result(result.filename, result.byteSize, entry.getKey())); + } + writers.clear(); + } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + super.populateDisplayData(builder); + + builder + .addIfNotNull(DisplayData.item("tempFilePrefix", tempFilePrefix) + .withLabel("Temporary File Prefix")); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/8581caf3/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java index 1b6492e..8e1b16d 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java @@ -18,27 +18,40 @@ package org.apache.beam.sdk.io.gcp.bigquery; +import com.google.api.services.bigquery.model.TableReference; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import java.util.List; +import java.util.Map; import java.util.UUID; + import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write; +import org.apache.beam.sdk.io.gcp.bigquery.WriteBundlesToFiles.Result; +import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; /** - * Partitions temporary files based on number of files and file sizes. + * Partitions temporary files based on number of files and file sizes. Output key is a pair of + * tablespec and the list of files corresponding to each partition of that table. 
*/ -class WritePartition extends DoFn<String, KV<Long, List<String>>> { - private final PCollectionView<Iterable<KV<String, Long>>> resultsView; - private TupleTag<KV<Long, List<String>>> multiPartitionsTag; - private TupleTag<KV<Long, List<String>>> singlePartitionTag; +class WritePartition extends DoFn<String, KV<KV<TableDestination, Integer>, List<String>>> { + private final ValueProvider<TableReference> singletonOutputTable; + private final String singletonOutputTableDescription; + private final PCollectionView<Iterable<WriteBundlesToFiles.Result>> resultsView; + private TupleTag<KV<KV<TableDestination, Integer>, List<String>>> multiPartitionsTag; + private TupleTag<KV<KV<TableDestination, Integer>, List<String>>> singlePartitionTag; public WritePartition( - PCollectionView<Iterable<KV<String, Long>>> resultsView, - TupleTag<KV<Long, List<String>>> multiPartitionsTag, - TupleTag<KV<Long, List<String>>> singlePartitionTag) { + ValueProvider<TableReference> singletonOutputTable, + String singletonOutputTableDescription, + PCollectionView<Iterable<WriteBundlesToFiles.Result>> resultsView, + TupleTag<KV<KV<TableDestination, Integer>, List<String>>> multiPartitionsTag, + TupleTag<KV<KV<TableDestination, Integer>, List<String>>> singlePartitionTag) { + this.singletonOutputTable = singletonOutputTable; + this.singletonOutputTableDescription = singletonOutputTableDescription; this.resultsView = resultsView; this.multiPartitionsTag = multiPartitionsTag; this.singlePartitionTag = singlePartitionTag; @@ -46,34 +59,62 @@ class WritePartition extends DoFn<String, KV<Long, List<String>>> { @ProcessElement public void processElement(ProcessContext c) throws Exception { - List<KV<String, Long>> results = Lists.newArrayList(c.sideInput(resultsView)); - if (results.isEmpty()) { - TableRowWriter writer = new TableRowWriter(c.element()); - writer.open(UUID.randomUUID().toString()); - results.add(writer.close()); + List<WriteBundlesToFiles.Result> results = Lists.newArrayList(c.sideInput(resultsView)); + + // If there are no elements to write _and_ the user specified a constant output table, then + // generate an empty table of that name.
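Condensed, the fallback that comment describes looks like the sketch below; the Result-like triple and helper are hypothetical stand-ins for WriteBundlesToFiles.Result and the TableRowWriter dance, not the commit's types.

import java.io.File;
import java.io.IOException;
import java.util.List;

class EmptyTableFallbackSketch {
  // Stand-in for WriteBundlesToFiles.Result: (filename, byte size, destination).
  static class FileResult {
    final String filename;
    final long byteSize;
    final String tableSpec;
    FileResult(String filename, long byteSize, String tableSpec) {
      this.filename = filename;
      this.byteSize = byteSize;
      this.tableSpec = tableSpec;
    }
  }

  // With no input rows but a known constant destination, register one empty
  // file so a load job still runs and creates the (empty) destination table.
  static void ensureAtLeastOneFile(List<FileResult> results, String constantTableSpec)
      throws IOException {
    if (results.isEmpty() && constantTableSpec != null) {
      File empty = File.createTempFile("BigQueryWriteTemp", ".json"); // zero-byte file
      results.add(new FileResult(empty.getAbsolutePath(), 0L, constantTableSpec));
    }
  }
}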
+ if (results.isEmpty() && singletonOutputTable != null) { + TableReference singletonTable = singletonOutputTable.get(); + if (singletonTable != null) { + TableRowWriter writer = new TableRowWriter(c.element()); + writer.open(UUID.randomUUID().toString()); + TableRowWriter.Result writerResult = writer.close(); + results.add(new Result(writerResult.filename, writerResult.byteSize, + new TableDestination(singletonTable, singletonOutputTableDescription))); + } } + long partitionId = 0; - int currNumFiles = 0; - long currSizeBytes = 0; - List<String> currResults = Lists.newArrayList(); + Map<TableDestination, Integer> currNumFilesMap = Maps.newHashMap(); + Map<TableDestination, Long> currSizeBytesMap = Maps.newHashMap(); + Map<TableDestination, List<List<String>>> currResultsMap = Maps.newHashMap(); for (int i = 0; i < results.size(); ++i) { - KV<String, Long> fileResult = results.get(i); + WriteBundlesToFiles.Result fileResult = results.get(i); + TableDestination tableDestination = fileResult.tableDestination; + List<List<String>> partitions = currResultsMap.get(tableDestination); + if (partitions == null) { + partitions = Lists.newArrayList(); + partitions.add(Lists.<String>newArrayList()); + currResultsMap.put(tableDestination, partitions); + } + int currNumFiles = currNumFilesMap.getOrDefault(tableDestination, 0); + long currSizeBytes = currSizeBytesMap.getOrDefault(tableDestination, 0L); if (currNumFiles + 1 > Write.MAX_NUM_FILES - || currSizeBytes + fileResult.getValue() > Write.MAX_SIZE_BYTES) { - c.output(multiPartitionsTag, KV.of(++partitionId, currResults)); - currResults = Lists.newArrayList(); + || currSizeBytes + fileResult.fileByteSize > Write.MAX_SIZE_BYTES) { + // Add a new partition for this table. + partitions.add(Lists.<String>newArrayList()); currNumFiles = 0; currSizeBytes = 0; + currNumFilesMap.remove(tableDestination); + currSizeBytesMap.remove(tableDestination); } - ++currNumFiles; - currSizeBytes += fileResult.getValue(); - currResults.add(fileResult.getKey()); + currNumFilesMap.put(tableDestination, currNumFiles + 1); + currSizeBytesMap.put(tableDestination, currSizeBytes + fileResult.fileByteSize); + // Always add to the most recent partition for this table. + partitions.get(partitions.size() - 1).add(fileResult.filename); } - if (partitionId == 0) { - c.output(singlePartitionTag, KV.of(++partitionId, currResults)); - } else { - c.output(multiPartitionsTag, KV.of(++partitionId, currResults)); + + for (Map.Entry<TableDestination, List<List<String>>> entry : currResultsMap.entrySet()) { + TableDestination tableDestination = entry.getKey(); + List<List<String>> partitions = entry.getValue(); + TupleTag<KV<KV<TableDestination, Integer>, List<String>>> outputTag = + (partitions.size() == 1) ?
singlePartitionTag : multiPartitionsTag; + for (int i = 0; i < partitions.size(); ++i) { + c.output(outputTag, KV.of(KV.of(tableDestination, i + 1), partitions.get(i))); + } } } } http://git-wip-us.apache.org/repos/asf/beam/blob/8581caf3/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java index 8cb9439..fbfb290 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java @@ -18,6 +18,7 @@ package org.apache.beam.sdk.io.gcp.bigquery; +import com.google.common.collect.Maps; import com.google.api.services.bigquery.model.Job; import com.google.api.services.bigquery.model.JobConfigurationTableCopy; import com.google.api.services.bigquery.model.JobReference; @@ -25,6 +26,7 @@ import com.google.api.services.bigquery.model.TableReference; import com.google.common.collect.Lists; import java.io.IOException; import java.util.List; +import java.util.Map; import javax.annotation.Nullable; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.Status; @@ -49,24 +51,21 @@ class WriteRename extends DoFn<String, Void> { private final BigQueryServices bqServices; private final PCollectionView<String> jobIdToken; - private final ValueProvider<String> jsonTableRef; private final WriteDisposition writeDisposition; private final CreateDisposition createDisposition; - private final PCollectionView<Iterable<String>> tempTablesView; + private final PCollectionView<Map<TableDestination, Iterable<String>>> tempTablesView; @Nullable private final String tableDescription; public WriteRename( BigQueryServices bqServices, PCollectionView<String> jobIdToken, - ValueProvider<String> jsonTableRef, WriteDisposition writeDisposition, CreateDisposition createDisposition, - PCollectionView<Iterable<String>> tempTablesView, + PCollectionView<Map<TableDestination, Iterable<String>>> tempTablesView, @Nullable String tableDescription) { this.bqServices = bqServices; this.jobIdToken = jobIdToken; - this.jsonTableRef = jsonTableRef; this.writeDisposition = writeDisposition; this.createDisposition = createDisposition; this.tempTablesView = tempTablesView; @@ -75,30 +74,40 @@ public WriteRename( @ProcessElement public void processElement(ProcessContext c) throws Exception { - List<String> tempTablesJson = Lists.newArrayList(c.sideInput(tempTablesView)); + Map<TableDestination, Iterable<String>> tempTablesMap = + Maps.newHashMap(c.sideInput(tempTablesView)); - // Do not copy if no temp tables are provided - if (tempTablesJson.size() == 0) { - return; - } + // Process each destination table.
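The loop below issues one copy job per destination, so job ids must not collide across tables. The scheme is the shared token plus the destination's hash, which reduces to this sketch (the token and destination values are hypothetical; WriteTables additionally appends a _%05d partition suffix):

class JobIdSketch {
  // Mirrors String.format(jobIdToken + "0x%08x", destination.hashCode()) in the code below.
  static String jobIdPrefixFor(String jobIdToken, Object destination) {
    return String.format("%s0x%08x", jobIdToken, destination.hashCode());
  }

  public static void main(String[] args) {
    // Hypothetical token and destination; prints something like "beamjob_42f10x1b873ab2".
    System.out.println(jobIdPrefixFor("beamjob_42f1", "my-project:my_dataset.my_table"));
  }
}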
+ for (Map.Entry<TableDestination, Iterable<String>> entry : tempTablesMap.entrySet()) { + TableDestination finalTableDestination = entry.getKey(); + List<String> tempTablesJson = Lists.newArrayList(entry.getValue()); + // Do not copy if no temp tables are provided for this table; skip to the next one. + if (tempTablesJson.size() == 0) { + continue; + } + + List<TableReference> tempTables = Lists.newArrayList(); + for (String table : tempTablesJson) { + tempTables.add(BigQueryHelpers.fromJsonString(table, TableReference.class)); + } + + // Make sure each destination table gets a unique job id. + String jobIdPrefix = String.format( + c.sideInput(jobIdToken) + "0x%08x", finalTableDestination.hashCode()); + copy( + bqServices.getJobService(c.getPipelineOptions().as(BigQueryOptions.class)), + bqServices.getDatasetService(c.getPipelineOptions().as(BigQueryOptions.class)), + jobIdPrefix, + finalTableDestination.getTableReference(), + tempTables, + writeDisposition, + createDisposition, + tableDescription); - List<TableReference> tempTables = Lists.newArrayList(); - for (String table : tempTablesJson) { - tempTables.add(BigQueryHelpers.fromJsonString(table, TableReference.class)); + DatasetService tableService = + bqServices.getDatasetService(c.getPipelineOptions().as(BigQueryOptions.class)); + removeTemporaryTables(tableService, tempTables); } - copy( - bqServices.getJobService(c.getPipelineOptions().as(BigQueryOptions.class)), - bqServices.getDatasetService(c.getPipelineOptions().as(BigQueryOptions.class)), - c.sideInput(jobIdToken), - BigQueryHelpers.fromJsonString(jsonTableRef.get(), TableReference.class), - tempTables, - writeDisposition, - createDisposition, - tableDescription); - - DatasetService tableService = - bqServices.getDatasetService(c.getPipelineOptions().as(BigQueryOptions.class)); - removeTemporaryTables(tableService, tempTables); } private void copy( @@ -170,8 +179,6 @@ class WriteRename extends DoFn<String, Void> { super.populateDisplayData(builder); builder - .addIfNotNull(DisplayData.item("jsonTableRef", jsonTableRef) - .withLabel("Table Reference")) .add(DisplayData.item("writeDisposition", writeDisposition.toString()) .withLabel("Write Disposition")) .add(DisplayData.item("createDisposition", createDisposition.toString()) http://git-wip-us.apache.org/repos/asf/beam/blob/8581caf3/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java index 29680ad..5051c95 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java @@ -41,6 +41,7 @@ import org.apache.beam.sdk.options.BigQueryOptions; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.util.FileIOChannelFactory; import org.apache.beam.sdk.util.GcsIOChannelFactory; @@ -57,48 +58,45 @@ import org.slf4j.LoggerFactory; /** * Writes partitions to BigQuery tables.
*/ -class WriteTables extends DoFn<KV<Long, Iterable<List<String>>>, String> { +class WriteTables extends DoFn<KV<KV<TableDestination, Integer>, Iterable<List<String>>>, + KV<TableDestination, String>> { private static final Logger LOG = LoggerFactory.getLogger(WriteTables.class); private final boolean singlePartition; private final BigQueryServices bqServices; private final PCollectionView<String> jobIdToken; private final String tempFilePrefix; - private final ValueProvider<String> jsonTableRef; - private final ValueProvider<String> jsonSchema; private final WriteDisposition writeDisposition; private final CreateDisposition createDisposition; - @Nullable - private final String tableDescription; + private final SerializableFunction<TableDestination, TableSchema> schemaFunction; public WriteTables( boolean singlePartition, BigQueryServices bqServices, PCollectionView<String> jobIdToken, String tempFilePrefix, - ValueProvider<String> jsonTableRef, - ValueProvider<String> jsonSchema, WriteDisposition writeDisposition, CreateDisposition createDisposition, - @Nullable String tableDescription) { + SerializableFunction<TableDestination, TableSchema> schemaFunction) { this.singlePartition = singlePartition; this.bqServices = bqServices; this.jobIdToken = jobIdToken; this.tempFilePrefix = tempFilePrefix; - this.jsonTableRef = jsonTableRef; - this.jsonSchema = jsonSchema; this.writeDisposition = writeDisposition; this.createDisposition = createDisposition; - this.tableDescription = tableDescription; + this.schemaFunction = schemaFunction; } @ProcessElement public void processElement(ProcessContext c) throws Exception { - List<String> partition = Lists.newArrayList(c.element().getValue()).get(0); + TableDestination tableDestination = c.element().getKey().getKey(); + Integer partition = c.element().getKey().getValue(); + List<String> partitionFiles = Lists.newArrayList(c.element().getValue()).get(0); + // Job ID must be different for each partition of each table. String jobIdPrefix = String.format( - c.sideInput(jobIdToken) + "_%05d", c.element().getKey()); - TableReference ref = BigQueryHelpers.fromJsonString(jsonTableRef.get(), - TableReference.class); + c.sideInput(jobIdToken) + "0x%08x_%05d", tableDestination.hashCode(), partition); + + TableReference ref = tableDestination.getTableReference(); if (!singlePartition) { ref.setTableId(jobIdPrefix); } @@ -108,15 +106,14 @@ class WriteTables extends DoFn<KV<Long, Iterable<List<String>>>, String> { bqServices.getDatasetService(c.getPipelineOptions().as(BigQueryOptions.class)), jobIdPrefix, ref, - BigQueryHelpers.fromJsonString( - jsonSchema == null ? 
null : jsonSchema.get(), TableSchema.class), - partition, + schemaFunction.apply(tableDestination), + partitionFiles, writeDisposition, createDisposition, - tableDescription); - c.output(BigQueryHelpers.toJsonString(ref)); + tableDestination.getTableDescription()); + c.output(KV.of(tableDestination, BigQueryHelpers.toJsonString(ref))); - removeTemporaryFiles(c.getPipelineOptions(), tempFilePrefix, partition); + removeTemporaryFiles(c.getPipelineOptions(), tempFilePrefix, partitionFiles); } private void load( @@ -202,12 +199,6 @@ class WriteTables extends DoFn<KV<Long, Iterable<List<String>>>, String> { builder .addIfNotNull(DisplayData.item("tempFilePrefix", tempFilePrefix) - .withLabel("Temporary File Prefix")) - .addIfNotNull(DisplayData.item("jsonTableRef", jsonTableRef) - .withLabel("Table Reference")) - .addIfNotNull(DisplayData.item("jsonSchema", jsonSchema) - .withLabel("Table Schema")) - .addIfNotNull(DisplayData.item("tableDescription", tableDescription) - .withLabel("Table Description")); + .withLabel("Temporary File Prefix")); } } http://git-wip-us.apache.org/repos/asf/beam/blob/8581caf3/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java index d953edd..af39483 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java @@ -2078,26 +2078,27 @@ public class BigQueryIOTest implements Serializable { files.add(KV.of(fileName, fileSize)); } - TupleTag<KV<Long, List<String>>> multiPartitionsTag = - new TupleTag<KV<Long, List<String>>>("multiPartitionsTag") {}; - TupleTag<KV<Long, List<String>>> singlePartitionTag = - new TupleTag<KV<Long, List<String>>>("singlePartitionTag") {}; - - PCollection<KV<String, Long>> filesPCollection = - p.apply(Create.of(files).withType(new TypeDescriptor<KV<String, Long>>() {})); - PCollectionView<Iterable<KV<String, Long>>> filesView = PCollectionViews.iterableView( - filesPCollection, + TupleTag<KV<KV<TableDestination, Integer>, List<String>>> multiPartitionsTag = + new TupleTag<KV<KV<TableDestination, Integer>, List<String>>>("multiPartitionsTag") {}; + TupleTag<KV<KV<TableDestination, Integer>, List<String>>> singlePartitionTag = + new TupleTag<KV<KV<TableDestination, Integer>, List<String>>>("singlePartitionTag") {}; + + PCollectionView<Iterable<WriteBundlesToFiles.Result>> resultsView = + PCollectionViews.iterableView( + p, WindowingStrategy.globalDefault(), KvCoder.of(StringUtf8Coder.of(), VarLongCoder.of())); WritePartition writePartition = - new WritePartition(filesView, multiPartitionsTag, singlePartitionTag); + new WritePartition(null, null, resultsView, + multiPartitionsTag, singlePartitionTag); - DoFnTester<String, KV<Long, List<String>>> tester = DoFnTester.of(writePartition); - tester.setSideInput(filesView, GlobalWindow.INSTANCE, files); + DoFnTester<String, KV<KV<TableDestination, Integer>, List<String>>> tester = + DoFnTester.of(writePartition); + tester.setSideInput(resultsView, GlobalWindow.INSTANCE, files); tester.processElement(testFolder.newFolder("BigQueryIOTest").getAbsolutePath()); - List<KV<Long, List<String>>> 
partitions; + List<KV<KV<TableDestination, Integer>, List<String>>> partitions; if (expectedNumPartitions > 1) { partitions = tester.takeOutputElements(multiPartitionsTag); } else {
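The test excerpt is truncated at the branch above. Independent of the elided remainder, the DoFnTester pattern it relies on is worth spelling out (an illustrative continuation reusing the test's locals, not the commit's verbatim code):

// Outputs land on whichever tag WritePartition chose for each table.
List<KV<KV<TableDestination, Integer>, List<String>>> taken =
    (expectedNumPartitions > 1)
        ? tester.takeOutputElements(multiPartitionsTag)
        : tester.takeOutputElements(singlePartitionTag);
// A natural assertion at this point, e.g. with org.junit.Assert:
// assertEquals(expectedNumPartitions, taken.size());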
