Refactor batch loads and add support for windowed writes.
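For readers skimming the diff below: the core of the refactor is that temporary files are now keyed by ShardedKey<TableDestination> instead of KV<TableDestination, Integer>, so one destination's files can be split into several load-job partitions (bounded by a maximum file count and byte size) and grouped deterministically before WriteTables runs. The following self-contained sketch (plain Java, no Beam dependencies; the class name PartitionSketch and the caps MAX_NUM_FILES = 3 and MAX_SIZE_BYTES = 1000 are illustrative placeholders, not the SDK's real limits) mirrors the partitioning policy and the value-semantics key in spirit:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import java.util.Objects;

    public class PartitionSketch {
      // Illustrative caps only; BigQueryIO enforces its own internal limits.
      static final int MAX_NUM_FILES = 3;
      static final long MAX_SIZE_BYTES = 1000L;

      // A destination paired with a 1-based shard number, analogous to
      // ShardedKey<TableDestination> in this commit. equals/hashCode use value
      // semantics so that grouping by this key behaves correctly.
      static final class ShardedKey<K> {
        final K key;
        final int shardNumber;

        ShardedKey(K key, int shardNumber) {
          this.key = key;
          this.shardNumber = shardNumber;
        }

        @Override
        public boolean equals(Object o) {
          if (!(o instanceof ShardedKey)) {
            return false;
          }
          ShardedKey<?> other = (ShardedKey<?>) o;
          return Objects.equals(key, other.key) && shardNumber == other.shardNumber;
        }

        @Override
        public int hashCode() {
          return Objects.hash(key, shardNumber);
        }

        @Override
        public String toString() {
          return "key: " + key + " shard: " + shardNumber;
        }
      }

      // Splits one table's files into partitions, starting a new partition whenever
      // adding a file would exceed either cap -- the same idea WritePartition applies
      // per TableDestination.
      static List<List<String>> partition(List<String> files, List<Long> sizes) {
        List<List<String>> partitions = new ArrayList<>();
        List<String> current = new ArrayList<>();
        long currentBytes = 0;
        for (int i = 0; i < files.size(); i++) {
          if (!current.isEmpty()
              && (current.size() >= MAX_NUM_FILES
                  || currentBytes + sizes.get(i) > MAX_SIZE_BYTES)) {
            partitions.add(current);
            current = new ArrayList<>();
            currentBytes = 0;
          }
          current.add(files.get(i));
          currentBytes += sizes.get(i);
        }
        partitions.add(current);
        return partitions;
      }

      public static void main(String[] args) {
        List<String> files = Arrays.asList("f0", "f1", "f2", "f3", "f4");
        List<Long> sizes = Arrays.asList(400L, 400L, 400L, 400L, 400L);
        List<List<String>> parts = partition(files, sizes);
        for (int i = 0; i < parts.size(); i++) {
          // Shard numbers are 1-based, matching KV.of(ShardedKey.of(dest, i + 1), ...).
          System.out.println(
              new ShardedKey<>("project-id:dataset-id.table", i + 1) + " -> " + parts.get(i));
        }
      }
    }

Because GroupByKey compares keys by their encoded bytes, BatchLoads now also sets an explicit coder (KvCoder.of(ShardedKeyCoder.of(TableDestinationCoder.of()), ListCoder.of(StringUtf8Coder.of()))) on both partition collections, as the diff shows.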
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/760a9458 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/760a9458 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/760a9458 Branch: refs/heads/DSL_SQL Commit: 760a94580d7561bb63a3eea67d8e5443c233a541 Parents: 8581caf Author: Reuven Lax <[email protected]> Authored: Fri Mar 31 11:19:25 2017 -0700 Committer: Eugene Kirpichov <[email protected]> Committed: Tue Apr 18 21:12:50 2017 -0700 ---------------------------------------------------------------------- .../apache/beam/sdk/util/IOChannelUtils.java | 9 + .../beam/sdk/io/gcp/bigquery/BatchLoads.java | 49 +- .../beam/sdk/io/gcp/bigquery/ShardedKey.java | 24 +- .../sdk/io/gcp/bigquery/TableDestination.java | 10 +- .../io/gcp/bigquery/WriteBundlesToFiles.java | 54 +- .../sdk/io/gcp/bigquery/WritePartition.java | 28 +- .../beam/sdk/io/gcp/bigquery/WriteRename.java | 13 +- .../beam/sdk/io/gcp/bigquery/WriteTables.java | 14 +- .../sdk/io/gcp/bigquery/BigQueryIOTest.java | 838 +++++-------------- .../io/gcp/bigquery/FakeBigQueryServices.java | 96 +++ .../sdk/io/gcp/bigquery/FakeDatasetService.java | 172 ++++ .../sdk/io/gcp/bigquery/FakeJobService.java | 273 ++++++ .../sdk/io/gcp/bigquery/TableContainer.java | 36 + 13 files changed, 948 insertions(+), 668 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/760a9458/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelUtils.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelUtils.java index ea53527..9d3dd23 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelUtils.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelUtils.java @@ -28,6 +28,7 @@ import com.google.common.collect.Sets; import com.google.common.collect.TreeMultimap; import java.io.FileNotFoundException; import java.io.IOException; +import java.nio.channels.ReadableByteChannel; import java.nio.channels.WritableByteChannel; import java.text.DecimalFormat; import java.util.Arrays; @@ -181,6 +182,14 @@ public class IOChannelUtils { } /** + * Creates a read channel for the given filename. + */ + public static ReadableByteChannel open(String filename) + throws IOException { + return getFactory(filename).open(filename); + } + + /** * Creates a write channel for the given file components. 
* * <p>If numShards is specified, then a ShardingWritableByteChannel is http://git-wip-us.apache.org/repos/asf/beam/blob/760a9458/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java index 8594211..5e80fae 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java @@ -26,6 +26,10 @@ import java.util.List; import java.util.Map; import javax.annotation.Nullable; import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.ListCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition; import org.apache.beam.sdk.options.BigQueryOptions; @@ -61,16 +65,17 @@ class BatchLoads<T> extends private static class ConstantSchemaFunction implements SerializableFunction<TableDestination, TableSchema> { private final @Nullable - String jsonSchema; + ValueProvider<String> jsonSchema; - ConstantSchemaFunction(TableSchema schema) { - this.jsonSchema = BigQueryHelpers.toJsonString(schema); + ConstantSchemaFunction(ValueProvider<String> jsonSchema) { + this.jsonSchema = jsonSchema; } @Override @Nullable public TableSchema apply(TableDestination table) { - return BigQueryHelpers.fromJsonString(jsonSchema, TableSchema.class); + return BigQueryHelpers.fromJsonString( + jsonSchema == null ? null : jsonSchema.get(), TableSchema.class); } } @@ -114,7 +119,7 @@ class BatchLoads<T> extends .apply(View.<String>asSingleton()); PCollection<KV<TableDestination, TableRow>> inputInGlobalWindow = - input.apply( + input.apply("rewindowIntoGlobal", Window.<KV<TableDestination, TableRow>>into(new GlobalWindows()) .triggering(DefaultTrigger.of()) .discardingFiredPanes()); @@ -122,12 +127,13 @@ class BatchLoads<T> extends // PCollection of filename, file byte size, and table destination. PCollection<WriteBundlesToFiles.Result> results = inputInGlobalWindow .apply("WriteBundlesToFiles", - ParDo.of(new WriteBundlesToFiles(tempFilePrefix))); + ParDo.of(new WriteBundlesToFiles(tempFilePrefix))) + .setCoder(WriteBundlesToFiles.ResultCoder.of()); - TupleTag<KV<KV<TableDestination, Integer>, List<String>>> multiPartitionsTag = - new TupleTag<KV<KV<TableDestination, Integer>, List<String>>>("multiPartitionsTag") {}; - TupleTag<KV<KV<TableDestination, Integer>, List<String>>> singlePartitionTag = - new TupleTag<KV<KV<TableDestination, Integer>, List<String>>>("singlePartitionTag") {}; + TupleTag<KV<ShardedKey<TableDestination>, List<String>>> multiPartitionsTag = + new TupleTag<KV<ShardedKey<TableDestination>, List<String>>>("multiPartitionsTag") {}; + TupleTag<KV<ShardedKey<TableDestination>, List<String>>> singlePartitionTag = + new TupleTag<KV<ShardedKey<TableDestination>, List<String>>>("singlePartitionTag") {}; // Turn the list of files and record counts in a PCollectionView that can be used as a // side input. 
@@ -136,9 +142,9 @@ class BatchLoads<T> extends // This transform will look at the set of files written for each table, and if any table has // too many files or bytes, will partition that table's files into multiple partitions for // loading. - PCollectionTuple partitions = singleton.apply(ParDo - .of(new WritePartition( - write.getTable(), + PCollectionTuple partitions = singleton.apply("WritePartition", + ParDo.of(new WritePartition( + write.getJsonTableRef(), write.getTableDescription(), resultsView, multiPartitionsTag, @@ -148,17 +154,22 @@ class BatchLoads<T> extends // Since BigQueryIO.java does not yet have support for per-table schemas, inject a constant // schema function here. If no schema is specified, this function will return null. + // TODO: Turn this into a side-input instead. SerializableFunction<TableDestination, TableSchema> schemaFunction = - new ConstantSchemaFunction(write.getSchema()); + new ConstantSchemaFunction(write.getJsonSchema()); + Coder<KV<ShardedKey<TableDestination>, List<String>>> partitionsCoder = + KvCoder.of(ShardedKeyCoder.of(TableDestinationCoder.of()), + ListCoder.of(StringUtf8Coder.of())); // If WriteBundlesToFiles produced more than MAX_NUM_FILES files or MAX_SIZE_BYTES bytes, then // the import needs to be split into multiple partitions, and those partitions will be // specified in multiPartitionsTag. PCollection<KV<TableDestination, String>> tempTables = partitions.get(multiPartitionsTag) + .setCoder(partitionsCoder) // What's this GroupByKey for? Is this so we have deterministic temp tables? If so, maybe // Reshuffle is better here. .apply("MultiPartitionsGroupByKey", - GroupByKey.<KV<TableDestination, Integer>, List<String>>create()) + GroupByKey.<ShardedKey<TableDestination>, List<String>>create()) .apply("MultiPartitionsWriteTables", ParDo.of(new WriteTables( false, write.getBigQueryServices(), @@ -174,20 +185,20 @@ class BatchLoads<T> extends PCollectionView<Map<TableDestination, Iterable<String>>> tempTablesView = tempTables .apply("TempTablesView", View.<TableDestination, String>asMultimap()); - singleton.apply(ParDo + singleton.apply("WriteRename", ParDo .of(new WriteRename( write.getBigQueryServices(), jobIdTokenView, write.getWriteDisposition(), write.getCreateDisposition(), - tempTablesView, - write.getTableDescription())) + tempTablesView)) .withSideInputs(tempTablesView, jobIdTokenView)); // Write single partition to final table partitions.get(singlePartitionTag) + .setCoder(partitionsCoder) .apply("SinglePartitionGroupByKey", - GroupByKey.<KV<TableDestination, Integer>, List<String>>create()) + GroupByKey.<ShardedKey<TableDestination>, List<String>>create()) .apply("SinglePartitionWriteTables", ParDo.of(new WriteTables( true, write.getBigQueryServices(), http://git-wip-us.apache.org/repos/asf/beam/blob/760a9458/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKey.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKey.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKey.java index 8c968df..ab57446 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKey.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKey.java @@ -18,10 +18,13 @@ package org.apache.beam.sdk.io.gcp.bigquery; +import java.io.Serializable; 
+import java.util.Objects; + /** * A key and a shard number. */ -class ShardedKey<K> { +class ShardedKey<K> implements Serializable { private final K key; private final int shardNumber; @@ -41,4 +44,23 @@ public int getShardNumber() { return shardNumber; } + + @Override + public String toString() { + return "key: " + key + " shard: " + shardNumber; + } + + @Override + public boolean equals(Object o) { + if (!(o instanceof ShardedKey)) { + return false; + } + ShardedKey<K> other = (ShardedKey<K>) o; + return Objects.equals(key, other.key) && (shardNumber == other.shardNumber); + } + + @Override + public int hashCode() { + return Objects.hash(key, shardNumber); + } } http://git-wip-us.apache.org/repos/asf/beam/blob/760a9458/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java index 1c2b256..e8538e0 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java @@ -20,12 +20,13 @@ package org.apache.beam.sdk.io.gcp.bigquery; import com.google.api.services.bigquery.model.TableReference; +import java.io.Serializable; import java.util.Objects; /** * Encapsulates a BigQuery table destination. */ -public class TableDestination { +public class TableDestination implements Serializable { private final String tableSpec; private final String tableDescription; @@ -53,12 +54,17 @@ public class TableDestination { } @Override + public String toString() { + return "tableSpec: " + tableSpec + " tableDescription: " + tableDescription; + } + + @Override public boolean equals(Object o) { if (!(o instanceof TableDestination)) { return false; } TableDestination other = (TableDestination) o; - return tableSpec == other.tableSpec && tableDescription == other.tableDescription; + return Objects.equals(tableSpec, other.tableSpec) && Objects.equals(tableDescription, other.tableDescription); } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/760a9458/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java index 4e6167b..b8069f6 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java @@ -20,10 +20,19 @@ package org.apache.beam.sdk.io.gcp.bigquery; import com.google.api.services.bigquery.model.TableRow; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.Serializable; import java.util.Map; import java.util.UUID; import com.google.common.collect.Maps; +import org.apache.beam.sdk.coders.AtomicCoder; +import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import 
org.apache.beam.sdk.coders.TableRowJsonCoder; +import org.apache.beam.sdk.coders.VarLongCoder; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.values.KV; @@ -41,7 +50,7 @@ class WriteBundlesToFiles extends DoFn<KV<TableDestination, TableRow>, WriteBund private transient Map<TableDestination, TableRowWriter> writers; private final String tempFilePrefix; - public static class Result { + public static class Result implements Serializable { public String filename; public Long fileByteSize; public TableDestination tableDestination; @@ -52,15 +61,54 @@ class WriteBundlesToFiles extends DoFn<KV<TableDestination, TableRow>, WriteBund this.tableDestination = tableDestination; } } + + public static class ResultCoder extends AtomicCoder<Result> { + private static final ResultCoder INSTANCE = new ResultCoder(); + + public static ResultCoder of() { + return INSTANCE; + } + + @Override + public void encode(Result value, OutputStream outStream, Context context) + throws IOException { + if (value == null) { + throw new CoderException("cannot encode a null value"); + } + stringCoder.encode(value.filename, outStream, context.nested()); + longCoder.encode(value.fileByteSize, outStream, context.nested()); + tableDestinationCoder.encode(value.tableDestination, outStream, context.nested()); + } + + @Override + public Result decode(InputStream inStream, Context context) + throws IOException { + return new Result(stringCoder.decode(inStream, context.nested()), + longCoder.decode(inStream, context.nested()), + tableDestinationCoder.decode(inStream, context.nested())); + } + + @Override + public void verifyDeterministic() throws NonDeterministicException { + } + + StringUtf8Coder stringCoder = StringUtf8Coder.of(); + VarLongCoder longCoder = VarLongCoder.of(); + TableDestinationCoder tableDestinationCoder = TableDestinationCoder.of(); + } + WriteBundlesToFiles(String tempFilePrefix) { this.tempFilePrefix = tempFilePrefix; + } + + @StartBundle + public void startBundle(Context c) { this.writers = Maps.newHashMap(); } @ProcessElement public void processElement(ProcessContext c) throws Exception { - // ??? can we assume Java8? - TableRowWriter writer = writers.getOrDefault(c.element().getKey(), null); + TableRowWriter writer = writers.get(c.element().getKey()); if (writer == null) { writer = new TableRowWriter(tempFilePrefix); writer.open(UUID.randomUUID().toString()); http://git-wip-us.apache.org/repos/asf/beam/blob/760a9458/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java index 8e1b16d..c48955b 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java @@ -37,20 +37,20 @@ import org.apache.beam.sdk.values.TupleTag; * Partitions temporary files based on number of files and file sizes. Output key is a pair of * tablespec and the list of files corresponding to each partition of that table. 
*/ -class WritePartition extends DoFn<String, KV<KV<TableDestination, Integer>, List<String>>> { - private final ValueProvider<TableReference> singletonOutputTable; +class WritePartition extends DoFn<String, KV<ShardedKey<TableDestination>, List<String>>> { + private final ValueProvider<String> singletonOutputJsonTableRef; private final String singletonOutputTableDescription; private final PCollectionView<Iterable<WriteBundlesToFiles.Result>> resultsView; - private TupleTag<KV<KV<TableDestination, Integer>, List<String>>> multiPartitionsTag; - private TupleTag<KV<KV<TableDestination, Integer>, List<String>>> singlePartitionTag; + private TupleTag<KV<ShardedKey<TableDestination>, List<String>>> multiPartitionsTag; + private TupleTag<KV<ShardedKey<TableDestination>, List<String>>> singlePartitionTag; public WritePartition( - ValueProvider<TableReference> singletonOutputTable, + ValueProvider<String> singletonOutputJsonTableRef, String singletonOutputTableDescription, PCollectionView<Iterable<WriteBundlesToFiles.Result>> resultsView, - TupleTag<KV<KV<TableDestination, Integer>, List<String>>> multiPartitionsTag, - TupleTag<KV<KV<TableDestination, Integer>, List<String>>> singlePartitionTag) { - this.singletonOutputTable = singletonOutputTable; + TupleTag<KV<ShardedKey<TableDestination>, List<String>>> multiPartitionsTag, + TupleTag<KV<ShardedKey<TableDestination>, List<String>>> singlePartitionTag) { + this.singletonOutputJsonTableRef = singletonOutputJsonTableRef; this.singletonOutputTableDescription = singletonOutputTableDescription; this.resultsView = resultsView; this.multiPartitionsTag = multiPartitionsTag; @@ -63,8 +63,9 @@ class WritePartition extends DoFn<String, KV<KV<TableDestination, Integer>, List // If there are no elements to write _and_ the user specified a constant output table, then // generate an empty table of that name. - if (results.isEmpty() && singletonOutputTable != null) { - TableReference singletonTable = singletonOutputTable.get(); + if (results.isEmpty() && singletonOutputJsonTableRef != null) { + TableReference singletonTable = BigQueryHelpers.fromJsonString( + singletonOutputJsonTableRef.get(), TableReference.class); if (singletonTable != null) { TableRowWriter writer = new TableRowWriter(c.element()); writer.open(UUID.randomUUID().toString()); @@ -82,8 +83,7 @@ class WritePartition extends DoFn<String, KV<KV<TableDestination, Integer>, List for (int i = 0; i < results.size(); ++i) { WriteBundlesToFiles.Result fileResult = results.get(i); TableDestination tableDestination = fileResult.tableDestination; - // JAVA8 - List<List<String>> partitions = currResultsMap.getOrDefault(tableDestination, null); + List<List<String>> partitions = currResultsMap.get(tableDestination); if (partitions == null) { partitions = Lists.newArrayList(); partitions.add(Lists.<String>newArrayList()); @@ -110,10 +110,10 @@ class WritePartition extends DoFn<String, KV<KV<TableDestination, Integer>, List for (Map.Entry<TableDestination, List<List<String>>> entry : currResultsMap.entrySet()) { TableDestination tableDestination = entry.getKey(); List<List<String>> partitions = entry.getValue(); - TupleTag<KV<KV<TableDestination, Integer>, List<String>>> outputTag = + TupleTag<KV<ShardedKey<TableDestination>, List<String>>> outputTag = (partitions.size() == 1) ? 
singlePartitionTag : multiPartitionsTag; for (int i = 0; i < partitions.size(); ++i) { - c.output(outputTag, KV.of(KV.of(tableDestination, i + 1), partitions.get(i))); + c.output(outputTag, KV.of(ShardedKey.of(tableDestination, i + 1), partitions.get(i))); } } } http://git-wip-us.apache.org/repos/asf/beam/blob/760a9458/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java index fbfb290..752e7d3 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java @@ -18,12 +18,12 @@ package org.apache.beam.sdk.io.gcp.bigquery; -import avro.shaded.com.google.common.collect.Maps; import com.google.api.services.bigquery.model.Job; import com.google.api.services.bigquery.model.JobConfigurationTableCopy; import com.google.api.services.bigquery.model.JobReference; import com.google.api.services.bigquery.model.TableReference; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import java.io.IOException; import java.util.List; import java.util.Map; @@ -36,7 +36,6 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService; import org.apache.beam.sdk.options.BigQueryOptions; -import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.values.PCollectionView; @@ -53,23 +52,21 @@ class WriteRename extends DoFn<String, Void> { private final PCollectionView<String> jobIdToken; private final WriteDisposition writeDisposition; private final CreateDisposition createDisposition; + // Map from final destination to a list of temporary tables that need to be copied into it. 
private final PCollectionView<Map<TableDestination, Iterable<String>>> tempTablesView; - @Nullable - private final String tableDescription; + public WriteRename( BigQueryServices bqServices, PCollectionView<String> jobIdToken, WriteDisposition writeDisposition, CreateDisposition createDisposition, - PCollectionView<Map<TableDestination, Iterable<String>>> tempTablesView, - @Nullable String tableDescription) { + PCollectionView<Map<TableDestination, Iterable<String>>> tempTablesView) { this.bqServices = bqServices; this.jobIdToken = jobIdToken; this.writeDisposition = writeDisposition; this.createDisposition = createDisposition; this.tempTablesView = tempTablesView; - this.tableDescription = tableDescription; } @ProcessElement @@ -102,7 +99,7 @@ class WriteRename extends DoFn<String, Void> { tempTables, writeDisposition, createDisposition, - tableDescription); + finalTableDestination.getTableDescription()); DatasetService tableService = bqServices.getDatasetService(c.getPipelineOptions().as(BigQueryOptions.class)); http://git-wip-us.apache.org/repos/asf/beam/blob/760a9458/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java index 5051c95..f7fe87b 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java @@ -39,7 +39,6 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService; import org.apache.beam.sdk.options.BigQueryOptions; import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.display.DisplayData; @@ -57,8 +56,12 @@ import org.slf4j.LoggerFactory; /** * Writes partitions to BigQuery tables. + * + * <p>The input is a list of files corresponding to a partition of a table. These files are + * loaded into a temporary table (or into the final table if there is only one partition). The output + * is a {@link KV} mapping the final table to the temporary tables for each partition of that table. */ -class WriteTables extends DoFn<KV<KV<TableDestination, Integer>, Iterable<List<String>>>, +class WriteTables extends DoFn<KV<ShardedKey<TableDestination>, Iterable<List<String>>>, KV<TableDestination, String>> { private static final Logger LOG = LoggerFactory.getLogger(WriteTables.class); @@ -90,23 +93,24 @@ class WriteTables extends DoFn<KV<KV<TableDestination, Integer>, Iterable<List<S @ProcessElement public void processElement(ProcessContext c) throws Exception { TableDestination tableDestination = c.element().getKey().getKey(); - Integer partition = c.element().getKey().getValue(); + Integer partition = c.element().getKey().getShardNumber(); List<String> partitionFiles = Lists.newArrayList(c.element().getValue()).get(0); // Job ID must be different for each partition of each table. 
String jobIdPrefix = String.format( - c.sideInput(jobIdToken) + "0x%08x_%05d", tableDestination.hashCode(), partition); + c.sideInput(jobIdToken) + "_0x%08x_%05d", tableDestination.hashCode(), partition); TableReference ref = tableDestination.getTableReference(); if (!singlePartition) { ref.setTableId(jobIdPrefix); } + TableSchema schema = (schemaFunction != null) ? schemaFunction.apply(tableDestination) : null; load( bqServices.getJobService(c.getPipelineOptions().as(BigQueryOptions.class)), bqServices.getDatasetService(c.getPipelineOptions().as(BigQueryOptions.class)), jobIdPrefix, ref, - schemaFunction.apply(tableDestination), + schema, partitionFiles, writeDisposition, createDisposition, http://git-wip-us.apache.org/repos/asf/beam/blob/760a9458/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java index af39483..d1ef8e2 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java @@ -18,9 +18,6 @@ package org.apache.beam.sdk.io.gcp.bigquery; import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkNotNull; -import static com.google.common.base.Preconditions.checkState; -import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.fromJsonString; import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.toJsonString; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; import static org.hamcrest.Matchers.containsInAnyOrder; @@ -38,13 +35,7 @@ import static org.mockito.Mockito.when; import com.google.api.client.json.GenericJson; import com.google.api.client.util.Data; -import com.google.api.services.bigquery.model.Dataset; -import com.google.api.services.bigquery.model.ErrorProto; import com.google.api.services.bigquery.model.Job; -import com.google.api.services.bigquery.model.JobConfigurationExtract; -import com.google.api.services.bigquery.model.JobConfigurationLoad; -import com.google.api.services.bigquery.model.JobConfigurationQuery; -import com.google.api.services.bigquery.model.JobConfigurationTableCopy; import com.google.api.services.bigquery.model.JobReference; import com.google.api.services.bigquery.model.JobStatistics; import com.google.api.services.bigquery.model.JobStatistics2; @@ -55,18 +46,16 @@ import com.google.api.services.bigquery.model.TableFieldSchema; import com.google.api.services.bigquery.model.TableReference; import com.google.api.services.bigquery.model.TableRow; import com.google.api.services.bigquery.model.TableSchema; -import com.google.common.base.Strings; import com.google.common.collect.HashBasedTable; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; -import java.io.ByteArrayInputStream; +import com.google.common.collect.Maps; + import java.io.File; import java.io.FileFilter; import java.io.IOException; import java.io.InputStream; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; import java.io.OutputStream; import 
java.io.Serializable; import java.nio.channels.Channels; @@ -74,15 +63,12 @@ import java.nio.channels.WritableByteChannel; import java.nio.file.Files; import java.nio.file.Paths; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.NoSuchElementException; import java.util.Set; -import javax.annotation.Nullable; + import org.apache.avro.Schema; import org.apache.avro.file.DataFileWriter; import org.apache.avro.generic.GenericDatumWriter; @@ -96,17 +82,15 @@ import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.TableRowJsonCoder; import org.apache.beam.sdk.coders.VarIntCoder; -import org.apache.beam.sdk.coders.VarLongCoder; import org.apache.beam.sdk.io.BoundedSource; import org.apache.beam.sdk.io.CountingInput; import org.apache.beam.sdk.io.CountingSource; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.JsonSchemaToTableSchema; -import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.Status; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService; -import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService; import org.apache.beam.sdk.io.gcp.bigquery.PassThroughThenCleanup.CleanupOperation; +import org.apache.beam.sdk.io.gcp.bigquery.WriteBundlesToFiles.Result; import org.apache.beam.sdk.options.BigQueryOptions; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; @@ -142,7 +126,6 @@ import org.apache.beam.sdk.util.IOChannelFactory; import org.apache.beam.sdk.util.IOChannelUtils; import org.apache.beam.sdk.util.MimeTypes; import org.apache.beam.sdk.util.PCollectionViews; -import org.apache.beam.sdk.util.Transport; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.KV; @@ -175,484 +158,17 @@ import org.mockito.MockitoAnnotations; @RunWith(JUnit4.class) public class BigQueryIOTest implements Serializable { - // Status.UNKNOWN maps to null - private static final Map<Status, Job> JOB_STATUS_MAP = ImmutableMap.of( - Status.SUCCEEDED, new Job().setStatus(new JobStatus()), - Status.FAILED, new Job().setStatus(new JobStatus().setErrorResult(new ErrorProto()))); - - - private static class FakeBigQueryServices implements BigQueryServices { - - private String[] jsonTableRowReturns = new String[0]; - private JobService jobService; - private DatasetService datasetService; - - public FakeBigQueryServices withJobService(JobService jobService) { - this.jobService = jobService; - return this; - } - - public FakeBigQueryServices withDatasetService(DatasetService datasetService) { - this.datasetService = datasetService; - return this; - } - - public FakeBigQueryServices readerReturns(String... 
jsonTableRowReturns) { - this.jsonTableRowReturns = jsonTableRowReturns; - return this; - } - - @Override - public JobService getJobService(BigQueryOptions bqOptions) { - return jobService; - } - - @Override - public DatasetService getDatasetService(BigQueryOptions bqOptions) { - return datasetService; - } - - @Override - public BigQueryJsonReader getReaderFromTable( - BigQueryOptions bqOptions, TableReference tableRef) { - return new FakeBigQueryReader(jsonTableRowReturns); - } - - @Override - public BigQueryJsonReader getReaderFromQuery( - BigQueryOptions bqOptions, String projectId, JobConfigurationQuery queryConfig) { - return new FakeBigQueryReader(jsonTableRowReturns); - } - - private static class FakeBigQueryReader implements BigQueryJsonReader { - private static final int UNSTARTED = -1; - private static final int CLOSED = Integer.MAX_VALUE; - - private String[] jsonTableRowReturns; - private int currIndex; - - FakeBigQueryReader(String[] jsonTableRowReturns) { - this.jsonTableRowReturns = jsonTableRowReturns; - this.currIndex = UNSTARTED; - } - - @Override - public boolean start() throws IOException { - assertEquals(UNSTARTED, currIndex); - currIndex = 0; - return currIndex < jsonTableRowReturns.length; - } - - @Override - public boolean advance() throws IOException { - return ++currIndex < jsonTableRowReturns.length; - } - - @Override - public TableRow getCurrent() throws NoSuchElementException { - if (currIndex >= jsonTableRowReturns.length) { - throw new NoSuchElementException(); - } - return fromJsonString(jsonTableRowReturns[currIndex], TableRow.class); - } - - @Override - public void close() throws IOException { - currIndex = CLOSED; - } - } - } - - private static class FakeJobService implements JobService, Serializable { - - private Object[] startJobReturns; - private Object[] pollJobReturns; - private Object[] getJobReturns; - private String executingProject; - // Both counts will be reset back to zeros after serialization. - // This is a work around for DoFn's verifyUnmodified check. - private transient int startJobCallsCount; - private transient int pollJobStatusCallsCount; - private transient int getJobCallsCount; - - public FakeJobService() { - this.startJobReturns = new Object[0]; - this.pollJobReturns = new Object[0]; - this.getJobReturns = new Object[0]; - this.startJobCallsCount = 0; - this.pollJobStatusCallsCount = 0; - this.getJobCallsCount = 0; - } - - /** - * Sets the return values to mock {@link JobService#startLoadJob}, - * {@link JobService#startExtractJob} and {@link JobService#startQueryJob}. - * - * <p>Throws if the {@link Object} is a {@link Exception}, returns otherwise. - */ - public FakeJobService startJobReturns(Object... startJobReturns) { - this.startJobReturns = startJobReturns; - return this; - } - - /** - * Sets the return values to mock {@link JobService#getJob}. - * - * <p>Throws if the {@link Object} is a {@link InterruptedException}, returns otherwise. - */ - public FakeJobService getJobReturns(Object... getJobReturns) { - this.getJobReturns = getJobReturns; - return this; - } - - /** - * Sets the return values to mock {@link JobService#pollJob}. - * - * <p>Throws if the {@link Object} is a {@link Exception}, returns otherwise. - */ - public FakeJobService pollJobReturns(Object... pollJobReturns) { - this.pollJobReturns = pollJobReturns; - return this; - } - - /** - * Verifies executing project. 
- */ - public FakeJobService verifyExecutingProject(String executingProject) { - this.executingProject = executingProject; - return this; - } - - @Override - public void startLoadJob(JobReference jobRef, JobConfigurationLoad loadConfig) - throws InterruptedException, IOException { - startJob(jobRef, loadConfig); - } - - @Override - public void startExtractJob(JobReference jobRef, JobConfigurationExtract extractConfig) - throws InterruptedException, IOException { - startJob(jobRef, extractConfig); - } - - @Override - public void startQueryJob(JobReference jobRef, JobConfigurationQuery query) - throws IOException, InterruptedException { - startJob(jobRef, query); - } - - @Override - public void startCopyJob(JobReference jobRef, JobConfigurationTableCopy copyConfig) - throws IOException, InterruptedException { - startJob(jobRef, copyConfig); - } - - @Override - public Job pollJob(JobReference jobRef, int maxAttempts) - throws InterruptedException { - if (!Strings.isNullOrEmpty(executingProject)) { - checkArgument( - jobRef.getProjectId().equals(executingProject), - "Project id: %s is not equal to executing project: %s", - jobRef.getProjectId(), executingProject); - } - - if (pollJobStatusCallsCount < pollJobReturns.length) { - Object ret = pollJobReturns[pollJobStatusCallsCount++]; - if (ret instanceof Job) { - return (Job) ret; - } else if (ret instanceof Status) { - return JOB_STATUS_MAP.get(ret); - } else if (ret instanceof InterruptedException) { - throw (InterruptedException) ret; - } else { - throw new RuntimeException("Unexpected return type: " + ret.getClass()); - } - } else { - throw new RuntimeException( - "Exceeded expected number of calls: " + pollJobReturns.length); - } - } - - private void startJob(JobReference jobRef, GenericJson config) - throws IOException, InterruptedException { - if (!Strings.isNullOrEmpty(executingProject)) { - checkArgument( - jobRef.getProjectId().equals(executingProject), - "Project id: %s is not equal to executing project: %s", - jobRef.getProjectId(), executingProject); - } - - if (startJobCallsCount < startJobReturns.length) { - Object ret = startJobReturns[startJobCallsCount++]; - if (ret instanceof IOException) { - throw (IOException) ret; - } else if (ret instanceof InterruptedException) { - throw (InterruptedException) ret; - } else if (ret instanceof SerializableFunction) { - SerializableFunction<GenericJson, Void> fn = - (SerializableFunction<GenericJson, Void>) ret; - fn.apply(config); - return; - } else { - return; - } - } else { - throw new RuntimeException( - "Exceeded expected number of calls: " + startJobReturns.length); - } - } - - @Override - public JobStatistics dryRunQuery(String projectId, JobConfigurationQuery query) - throws InterruptedException, IOException { - throw new UnsupportedOperationException(); - } - - @Override - public Job getJob(JobReference jobRef) throws InterruptedException { - if (!Strings.isNullOrEmpty(executingProject)) { - checkArgument( - jobRef.getProjectId().equals(executingProject), - "Project id: %s is not equal to executing project: %s", - jobRef.getProjectId(), executingProject); - } - - if (getJobCallsCount < getJobReturns.length) { - Object ret = getJobReturns[getJobCallsCount++]; - if (ret == null) { - return null; - } else if (ret instanceof Job) { - return (Job) ret; - } else if (ret instanceof InterruptedException) { - throw (InterruptedException) ret; - } else { - throw new RuntimeException("Unexpected return type: " + ret.getClass()); - } - } else { - throw new RuntimeException( - "Exceeded 
expected number of calls: " + getJobReturns.length); - } - } - - ////////////////////////////////// SERIALIZATION METHODS //////////////////////////////////// - private void writeObject(ObjectOutputStream out) throws IOException { - out.writeObject(replaceJobsWithBytes(startJobReturns)); - out.writeObject(replaceJobsWithBytes(pollJobReturns)); - out.writeObject(replaceJobsWithBytes(getJobReturns)); - out.writeObject(executingProject); - } - - private Object[] replaceJobsWithBytes(Object[] objs) { - Object[] copy = Arrays.copyOf(objs, objs.length); - for (int i = 0; i < copy.length; i++) { - checkArgument( - copy[i] == null || copy[i] instanceof Serializable || copy[i] instanceof Job, - "Only serializable elements and jobs can be added add to Job Returns"); - if (copy[i] instanceof Job) { - try { - // Job is not serializable, so encode the job as a byte array. - copy[i] = Transport.getJsonFactory().toByteArray(copy[i]); - } catch (IOException e) { - throw new IllegalArgumentException( - String.format("Could not encode Job %s via available JSON factory", copy[i])); - } - } - } - return copy; - } - - private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { - this.startJobReturns = replaceBytesWithJobs(in.readObject()); - this.pollJobReturns = replaceBytesWithJobs(in.readObject()); - this.getJobReturns = replaceBytesWithJobs(in.readObject()); - this.executingProject = (String) in.readObject(); - } - - private Object[] replaceBytesWithJobs(Object obj) throws IOException { - checkState(obj instanceof Object[]); - Object[] objs = (Object[]) obj; - Object[] copy = Arrays.copyOf(objs, objs.length); - for (int i = 0; i < copy.length; i++) { - if (copy[i] instanceof byte[]) { - Job job = Transport.getJsonFactory() - .createJsonParser(new ByteArrayInputStream((byte[]) copy[i])) - .parse(Job.class); - copy[i] = job; - } - } - return copy; - } - } - - private static class TableContainer { - Table table; - List<TableRow> rows; - List<String> ids; - - TableContainer(Table table) { - this.table = table; - this.rows = new ArrayList<>(); - this.ids = new ArrayList<>(); - } - - TableContainer addRow(TableRow row, String id) { - rows.add(row); - ids.add(id); - return this; - } - - Table getTable() { - return table; - } - - List<TableRow> getRows() { - return rows; - } - } - // Table information must be static, as each ParDo will get a separate instance of // FakeDatasetServices, and they must all modify the same storage. - private static com.google.common.collect.Table<String, String, Map<String, TableContainer>> + static com.google.common.collect.Table<String, String, Map<String, TableContainer>> tables = HashBasedTable.create(); - /** A fake dataset service that can be serialized, for use in testReadFromTable. */ - private static class FakeDatasetService implements DatasetService, Serializable { - @Override - public Table getTable(TableReference tableRef) - throws InterruptedException, IOException { - synchronized (tables) { - Map<String, TableContainer> dataset = - checkNotNull( - tables.get(tableRef.getProjectId(), tableRef.getDatasetId()), - "Tried to get a dataset %s:%s from %s, but no such dataset was set", - tableRef.getProjectId(), - tableRef.getDatasetId(), - tableRef.getTableId(), - FakeDatasetService.class.getSimpleName()); - TableContainer tableContainer = dataset.get(tableRef.getTableId()); - return tableContainer == null ? 
null : tableContainer.getTable(); - } - } - - public List<TableRow> getAllRows(String projectId, String datasetId, String tableId) - throws InterruptedException, IOException { - synchronized (tables) { - return getTableContainer(projectId, datasetId, tableId).getRows(); - } - } - - private TableContainer getTableContainer(String projectId, String datasetId, String tableId) - throws InterruptedException, IOException { - synchronized (tables) { - Map<String, TableContainer> dataset = - checkNotNull( - tables.get(projectId, datasetId), - "Tried to get a dataset %s:%s from %s, but no such dataset was set", - projectId, - datasetId, - FakeDatasetService.class.getSimpleName()); - return checkNotNull(dataset.get(tableId), - "Tried to get a table %s:%s.%s from %s, but no such table was set", - projectId, - datasetId, - tableId, - FakeDatasetService.class.getSimpleName()); - } - } - - @Override - public void deleteTable(TableReference tableRef) throws IOException, InterruptedException { - throw new UnsupportedOperationException("Unsupported"); - } - - - @Override - public void createTable(Table table) throws IOException { - TableReference tableReference = table.getTableReference(); - synchronized (tables) { - Map<String, TableContainer> dataset = - checkNotNull( - tables.get(tableReference.getProjectId(), tableReference.getDatasetId()), - "Tried to get a dataset %s:%s from %s, but no such table was set", - tableReference.getProjectId(), - tableReference.getDatasetId(), - FakeDatasetService.class.getSimpleName()); - TableContainer tableContainer = dataset.get(tableReference.getTableId()); - if (tableContainer == null) { - tableContainer = new TableContainer(table); - dataset.put(tableReference.getTableId(), tableContainer); - } - } - } - - @Override - public boolean isTableEmpty(TableReference tableRef) - throws IOException, InterruptedException { - Long numBytes = getTable(tableRef).getNumBytes(); - return numBytes == null || numBytes == 0L; - } - - @Override - public Dataset getDataset( - String projectId, String datasetId) throws IOException, InterruptedException { - throw new UnsupportedOperationException("Unsupported"); - } - - @Override - public void createDataset( - String projectId, String datasetId, String location, String description) - throws IOException, InterruptedException { - synchronized (tables) { - Map<String, TableContainer> dataset = tables.get(projectId, datasetId); - if (dataset == null) { - dataset = new HashMap<>(); - tables.put(projectId, datasetId, dataset); - } - } - } - - @Override - public void deleteDataset(String projectId, String datasetId) - throws IOException, InterruptedException { - throw new UnsupportedOperationException("Unsupported"); - } - - @Override - public long insertAll( - TableReference ref, List<TableRow> rowList, @Nullable List<String> insertIdList) - throws IOException, InterruptedException { - synchronized (tables) { - assertEquals(rowList.size(), insertIdList.size()); - - long dataSize = 0; - TableContainer tableContainer = getTableContainer( - ref.getProjectId(), ref.getDatasetId(), ref.getTableId()); - for (int i = 0; i < rowList.size(); ++i) { - System.out.println("adding row " + rowList.get(i)); - tableContainer.addRow(rowList.get(i), insertIdList.get(i)); - dataSize += rowList.get(i).toString().length(); - } - return dataSize; - } - } - - @Override - public Table patchTableDescription(TableReference tableReference, - @Nullable String tableDescription) - throws IOException, InterruptedException { - throw new 
UnsupportedOperationException("Unsupported"); - } - } - @Rule public final transient TestPipeline p = TestPipeline.create(); @Rule public transient ExpectedException thrown = ExpectedException.none(); @Rule public transient ExpectedLogs loggedBigQueryIO = ExpectedLogs.none(BigQueryIO.class); @Rule public transient ExpectedLogs loggedWriteRename = ExpectedLogs.none(WriteRename.class); @Rule public transient ExpectedLogs loggedWriteTables = ExpectedLogs.none(WriteTables.class); @Rule public transient TemporaryFolder testFolder = new TemporaryFolder(); - @Mock(extraInterfaces = Serializable.class) - public transient BigQueryServices.JobService mockJobService; @Mock private transient IOChannelFactory mockIOChannelFactory; @Mock(extraInterfaces = Serializable.class) private transient DatasetService mockDatasetService; @@ -801,7 +317,7 @@ public class BigQueryIOTest implements Serializable { @Test public void testBuildSourceWithTableAndFlatten() { BigQueryOptions bqOptions = TestPipeline.testingPipelineOptions().as(BigQueryOptions.class); - bqOptions.setProject("defaultProject"); + bqOptions.setProject("defaultproject"); bqOptions.setTempLocation("gs://testbucket/testdir"); Pipeline p = TestPipeline.create(bqOptions); @@ -819,7 +335,7 @@ public class BigQueryIOTest implements Serializable { @Test public void testBuildSourceWithTableAndFlattenWithoutValidation() { BigQueryOptions bqOptions = TestPipeline.testingPipelineOptions().as(BigQueryOptions.class); - bqOptions.setProject("defaultProject"); + bqOptions.setProject("defaultproject"); bqOptions.setTempLocation("gs://testbucket/testdir"); Pipeline p = TestPipeline.create(bqOptions); @@ -838,7 +354,7 @@ public class BigQueryIOTest implements Serializable { @Test public void testBuildSourceWithTableAndSqlDialect() { BigQueryOptions bqOptions = PipelineOptionsFactory.as(BigQueryOptions.class); - bqOptions.setProject("defaultProject"); + bqOptions.setProject("defaultproject"); bqOptions.setTempLocation("gs://testbucket/testdir"); Pipeline p = TestPipeline.create(bqOptions); @@ -856,7 +372,7 @@ public class BigQueryIOTest implements Serializable { @Test public void testReadFromTable() throws IOException, InterruptedException { BigQueryOptions bqOptions = TestPipeline.testingPipelineOptions().as(BigQueryOptions.class); - bqOptions.setProject("defaultProject"); + bqOptions.setProject("defaultproject"); bqOptions.setTempLocation(testFolder.newFolder("BigQueryIOTest").getAbsolutePath()); Job job = new Job(); @@ -906,11 +422,11 @@ public class BigQueryIOTest implements Serializable { new WriteExtractFiles(schemaGenerator, records); FakeBigQueryServices fakeBqServices = new FakeBigQueryServices() - .withJobService(new FakeJobService() - .startJobReturns(onStartJob, "done") - .pollJobReturns(job) - .getJobReturns((Job) null) - .verifyExecutingProject(bqOptions.getProject())) + .withJobService(new FakeJobService()) + // .startJobReturns(onStartJob, "done") + // .pollJobReturns(job) + // .getJobReturns((Job) null) + // .verifyExecutingProject(bqOptions.getProject())) .withDatasetService(fakeDatasetService) .readerReturns( toJsonString(new TableRow().set("name", "a").set("number", 1)), @@ -938,13 +454,16 @@ public class BigQueryIOTest implements Serializable { @Test public void testWrite() throws Exception { BigQueryOptions bqOptions = TestPipeline.testingPipelineOptions().as(BigQueryOptions.class); - bqOptions.setProject("defaultProject"); + bqOptions.setProject("defaultproject"); 
bqOptions.setTempLocation(testFolder.newFolder("BigQueryIOTest").getAbsolutePath()); FakeBigQueryServices fakeBqServices = new FakeBigQueryServices() - .withJobService(new FakeJobService() - .startJobReturns("done", "done", "done") - .pollJobReturns(Status.FAILED, Status.FAILED, Status.SUCCEEDED)); + .withJobService(new FakeJobService()) + // .startJobReturns("done", "done", "done") + // .pollJobReturns(Status.FAILED, Status.FAILED, Status.SUCCEEDED)) + .withDatasetService(mockDatasetService); + + mockDatasetService.createDataset("defaultproject", "dataset-id", "", ""); Pipeline p = TestPipeline.create(bqOptions); p.apply(Create.of( @@ -969,7 +488,7 @@ public class BigQueryIOTest implements Serializable { @Test public void testStreamingWrite() throws Exception { BigQueryOptions bqOptions = TestPipeline.testingPipelineOptions().as(BigQueryOptions.class); - bqOptions.setProject("defaultProject"); + bqOptions.setProject("defaultproject"); bqOptions.setTempLocation(testFolder.newFolder("BigQueryIOTest").getAbsolutePath()); FakeDatasetService datasetService = new FakeDatasetService(); @@ -1095,15 +614,27 @@ public class BigQueryIOTest implements Serializable { } @Test - public void testStreamingWriteWithWindowFn() throws Exception { + @Category(NeedsRunner.class) + public void testStreamingWriteWithDynamicTables() throws Exception { + testWriteWithDynamicTables(true); + } + + @Test + @Category(NeedsRunner.class) + public void testBatchWriteWithDynamicTables() throws Exception { + testWriteWithDynamicTables(false); + } + + public void testWriteWithDynamicTables(boolean streaming) throws Exception { BigQueryOptions bqOptions = TestPipeline.testingPipelineOptions().as(BigQueryOptions.class); - bqOptions.setProject("defaultProject"); + bqOptions.setProject("defaultproject"); bqOptions.setTempLocation(testFolder.newFolder("BigQueryIOTest").getAbsolutePath()); FakeDatasetService datasetService = new FakeDatasetService(); datasetService.createDataset("project-id", "dataset-id", "", ""); FakeBigQueryServices fakeBqServices = new FakeBigQueryServices() - .withDatasetService(datasetService); + .withDatasetService(datasetService) + .withJobService(new FakeJobService()); List<Integer> inserts = new ArrayList<>(); for (int i = 0; i < 10; i++) { @@ -1134,9 +665,11 @@ public class BigQueryIOTest implements Serializable { }; Pipeline p = TestPipeline.create(bqOptions); - p.apply(Create.of(inserts)) - .setIsBoundedInternal(PCollection.IsBounded.UNBOUNDED) - .apply(Window.<Integer>into(window)) + PCollection<Integer> input = p.apply(Create.of(inserts)); + if (streaming) { + input = input.setIsBoundedInternal(PCollection.IsBounded.UNBOUNDED); + } + input.apply(Window.<Integer>into(window)) .apply(BigQueryIO.<Integer>write() .to(tableFunction) .withFormatFunction(new SerializableFunction<Integer, TableRow>() { @@ -1179,13 +712,13 @@ public class BigQueryIOTest implements Serializable { @Test public void testWriteUnknown() throws Exception { BigQueryOptions bqOptions = TestPipeline.testingPipelineOptions().as(BigQueryOptions.class); - bqOptions.setProject("defaultProject"); + bqOptions.setProject("defaultproject"); bqOptions.setTempLocation(testFolder.newFolder("BigQueryIOTest").getAbsolutePath()); FakeBigQueryServices fakeBqServices = new FakeBigQueryServices() - .withJobService(new FakeJobService() - .startJobReturns("done", "done") - .pollJobReturns(Status.FAILED, Status.UNKNOWN)); + .withJobService(new FakeJobService()); + // .startJobReturns("done", "done") + // .pollJobReturns(Status.FAILED, Status.UNKNOWN)); 
Pipeline p = TestPipeline.create(bqOptions); p.apply(Create.of( @@ -1211,13 +744,13 @@ public class BigQueryIOTest implements Serializable { @Test public void testWriteFailedJobs() throws Exception { BigQueryOptions bqOptions = TestPipeline.testingPipelineOptions().as(BigQueryOptions.class); - bqOptions.setProject("defaultProject"); + bqOptions.setProject("defaultproject"); bqOptions.setTempLocation(testFolder.newFolder("BigQueryIOTest").getAbsolutePath()); FakeBigQueryServices fakeBqServices = new FakeBigQueryServices() - .withJobService(new FakeJobService() - .startJobReturns("done", "done", "done") - .pollJobReturns(Status.FAILED, Status.FAILED, Status.FAILED)); + .withJobService(new FakeJobService()); + // .startJobReturns("done", "done", "done") + // .pollJobReturns(Status.FAILED, Status.FAILED, Status.FAILED)); Pipeline p = TestPipeline.create(bqOptions); p.apply(Create.of( @@ -1285,7 +818,7 @@ public class BigQueryIOTest implements Serializable { .from("project:dataset.tableId") .withTestServices(new FakeBigQueryServices() .withDatasetService(mockDatasetService) - .withJobService(mockJobService)) + .withJobService(new FakeJobService())) .withoutValidation(); Set<DisplayData> displayData = evaluator.displayDataForPrimitiveSourceTransforms(read); @@ -1301,7 +834,7 @@ public class BigQueryIOTest implements Serializable { .fromQuery("foobar") .withTestServices(new FakeBigQueryServices() .withDatasetService(mockDatasetService) - .withJobService(mockJobService)) + .withJobService(new FakeJobService())) .withoutValidation(); Set<DisplayData> displayData = evaluator.displayDataForPrimitiveSourceTransforms(read); @@ -1342,7 +875,7 @@ public class BigQueryIOTest implements Serializable { .withSchema(new TableSchema().set("col1", "type1").set("col2", "type2")) .withTestServices(new FakeBigQueryServices() .withDatasetService(mockDatasetService) - .withJobService(mockJobService)) + .withJobService(new FakeJobService())) .withoutValidation(); Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(write); @@ -1506,7 +1039,7 @@ public class BigQueryIOTest implements Serializable { options.setProject(projectId); FakeBigQueryServices fakeBqServices = new FakeBigQueryServices() - .withJobService(mockJobService) + .withJobService(new FakeJobService()) .withDatasetService(mockDatasetService); when(mockDatasetService.getDataset(projectId, datasetId)).thenThrow( new RuntimeException("Unable to confirm BigQuery dataset presence")); @@ -1674,7 +1207,7 @@ public class BigQueryIOTest implements Serializable { @Test public void testBigQueryTableSourceThroughJsonAPI() throws Exception { FakeBigQueryServices fakeBqServices = new FakeBigQueryServices() - .withJobService(mockJobService) + .withJobService(new FakeJobService()) .readerReturns( toJsonString(new TableRow().set("name", "a").set("number", "1")), toJsonString(new TableRow().set("name", "b").set("number", "2")), @@ -1712,7 +1245,7 @@ public class BigQueryIOTest implements Serializable { .setStatistics(jobStats); FakeBigQueryServices fakeBqServices = new FakeBigQueryServices() - .withJobService(mockJobService) + .withJobService(new FakeJobService()) .withDatasetService(mockDatasetService) .readerReturns( toJsonString(new TableRow().set("name", "a").set("number", "1")), @@ -1731,8 +1264,6 @@ public class BigQueryIOTest implements Serializable { new TableRow().set("name", "b").set("number", "2"), new TableRow().set("name", "c").set("number", "3")); - when(mockJobService.pollJob(Mockito.<JobReference>any(), Mockito.anyInt())) - 
.thenReturn(extractJob); PipelineOptions options = PipelineOptionsFactory.create(); options.setTempLocation("mock://tempLocation"); @@ -1752,9 +1283,6 @@ public class BigQueryIOTest implements Serializable { assertEquals(1, sources.size()); BoundedSource<TableRow> actual = sources.get(0); assertThat(actual, CoreMatchers.instanceOf(TransformingSource.class)); - - Mockito.verify(mockJobService) - .startExtractJob(Mockito.<JobReference>any(), Mockito.<JobConfigurationExtract>any()); } @Test @@ -1777,8 +1305,9 @@ public class BigQueryIOTest implements Serializable { extractJob.setStatus(new JobStatus()) .setStatistics(extractJobStats); + FakeJobService fakeJobService = new FakeJobService(); FakeBigQueryServices fakeBqServices = new FakeBigQueryServices() - .withJobService(mockJobService) + .withJobService(fakeJobService) .withDatasetService(mockDatasetService) .readerReturns( toJsonString(new TableRow().set("name", "a").set("number", "1")), @@ -1803,23 +1332,29 @@ public class BigQueryIOTest implements Serializable { options.setTempLocation(extractDestinationDir); TableReference queryTable = new TableReference() - .setProjectId("testProejct") + .setProjectId("testproject") .setDatasetId("testDataset") .setTableId("testTable"); - when(mockJobService.dryRunQuery(anyString(), Mockito.<JobConfigurationQuery>any())) - .thenReturn(new JobStatistics().setQuery( + // when(mockJobService.dryRunQuery(anyString(), Mockito.<JobConfigurationQuery>any())) + // .thenReturn(new JobStatistics().setQuery( + // new JobStatistics2() + // .setTotalBytesProcessed(100L) + // .setReferencedTables(ImmutableList.of(queryTable)))); + fakeJobService.expectDryRunQuery("testproject", "query", + new JobStatistics().setQuery( new JobStatistics2() .setTotalBytesProcessed(100L) .setReferencedTables(ImmutableList.of(queryTable)))); - when(mockDatasetService.getTable(eq(queryTable))) - .thenReturn(new Table().setSchema(new TableSchema())); - when(mockDatasetService.getTable(eq(destinationTable))) - .thenReturn(new Table().setSchema(new TableSchema())); + + // when(mockDatasetService.getTable(eq(queryTable))) + // .thenReturn(new Table().setSchema(new TableSchema())); + // when(mockDatasetService.getTable(eq(destinationTable))) + // .thenReturn(new Table().setSchema(new TableSchema())); IOChannelUtils.setIOFactoryInternal("mock", mockIOChannelFactory, true /* override */); when(mockIOChannelFactory.resolve(anyString(), anyString())) .thenReturn("mock://tempLocation/output"); - when(mockJobService.pollJob(Mockito.<JobReference>any(), Mockito.anyInt())) - .thenReturn(extractJob); + //when(mockJobService.pollJob(Mockito.<JobReference>any(), Mockito.anyInt())) + // .thenReturn(extractJob); Assert.assertThat( SourceTestUtils.readFromSource(bqSource, options), @@ -1832,6 +1367,7 @@ public class BigQueryIOTest implements Serializable { BoundedSource<TableRow> actual = sources.get(0); assertThat(actual, CoreMatchers.instanceOf(TransformingSource.class)); + /* Mockito.verify(mockJobService) .startQueryJob( Mockito.<JobReference>any(), Mockito.<JobConfigurationQuery>any()); @@ -1843,7 +1379,7 @@ public class BigQueryIOTest implements Serializable { ArgumentCaptor.forClass(JobConfigurationQuery.class); Mockito.verify(mockJobService).dryRunQuery(anyString(), queryConfigArg.capture()); assertEquals(true, queryConfigArg.getValue().getFlattenResults()); - assertEquals(true, queryConfigArg.getValue().getUseLegacySql()); + assertEquals(true, queryConfigArg.getValue().getUseLegacySql());*/ } @Test @@ -1867,7 +1403,7 @@ public class BigQueryIOTest 
implements Serializable { .setStatistics(extractJobStats); FakeBigQueryServices fakeBqServices = new FakeBigQueryServices() - .withJobService(mockJobService) + .withJobService(new FakeJobService()) .withDatasetService(mockDatasetService) .readerReturns( toJsonString(new TableRow().set("name", "a").set("number", "1")), @@ -1891,17 +1427,18 @@ public class BigQueryIOTest implements Serializable { PipelineOptions options = PipelineOptionsFactory.create(); options.setTempLocation(extractDestinationDir); + /* when(mockJobService.dryRunQuery(anyString(), Mockito.<JobConfigurationQuery>any())) .thenReturn(new JobStatistics().setQuery( new JobStatistics2() .setTotalBytesProcessed(100L))); when(mockDatasetService.getTable(eq(destinationTable))) .thenReturn(new Table().setSchema(new TableSchema())); - IOChannelUtils.setIOFactoryInternal("mock", mockIOChannelFactory, true /* override */); + IOChannelUtils.setIOFactoryInternal("mock", mockIOChannelFactory, true); when(mockIOChannelFactory.resolve(anyString(), anyString())) .thenReturn("mock://tempLocation/output"); when(mockJobService.pollJob(Mockito.<JobReference>any(), Mockito.anyInt())) - .thenReturn(extractJob); + .thenReturn(extractJob);*/ Assert.assertThat( SourceTestUtils.readFromSource(bqSource, options), @@ -1914,7 +1451,8 @@ public class BigQueryIOTest implements Serializable { BoundedSource<TableRow> actual = sources.get(0); assertThat(actual, CoreMatchers.instanceOf(TransformingSource.class)); - Mockito.verify(mockJobService) + /* + Mockito.verify(mockJobService) .startQueryJob( Mockito.<JobReference>any(), Mockito.<JobConfigurationQuery>any()); Mockito.verify(mockJobService) @@ -1925,7 +1463,7 @@ public class BigQueryIOTest implements Serializable { ArgumentCaptor.forClass(JobConfigurationQuery.class); Mockito.verify(mockJobService).dryRunQuery(anyString(), queryConfigArg.capture()); assertEquals(true, queryConfigArg.getValue().getFlattenResults()); - assertEquals(true, queryConfigArg.getValue().getUseLegacySql()); + assertEquals(true, queryConfigArg.getValue().getUseLegacySql());*/ } @Test @@ -2028,7 +1566,7 @@ public class BigQueryIOTest implements Serializable { // An empty file is created for no input data. One partition is needed. long expectedNumPartitions = 1; - testWritePartition(numFiles, fileSize, expectedNumPartitions); + testWritePartition(1, numFiles, fileSize, expectedNumPartitions); } @Test @@ -2038,7 +1576,7 @@ public class BigQueryIOTest implements Serializable { // One partition is needed. long expectedNumPartitions = 1; - testWritePartition(numFiles, fileSize, expectedNumPartitions); + testWritePartition(2, numFiles, fileSize, expectedNumPartitions); } @Test @@ -2048,7 +1586,7 @@ public class BigQueryIOTest implements Serializable { // One partition is needed for each group of BigQueryWrite.MAX_NUM_FILES files. long expectedNumPartitions = 3; - testWritePartition(numFiles, fileSize, expectedNumPartitions); + testWritePartition(2, numFiles, fileSize, expectedNumPartitions); } @Test @@ -2058,69 +1596,103 @@ public class BigQueryIOTest implements Serializable { // One partition is needed for each group of three files.
long expectedNumPartitions = 4; - testWritePartition(numFiles, fileSize, expectedNumPartitions); + testWritePartition(2, numFiles, fileSize, expectedNumPartitions); } - private void testWritePartition(long numFiles, long fileSize, long expectedNumPartitions) + private void testWritePartition(long numTables, long numFilesPerTable, long fileSize, + long expectedNumPartitionsPerTable) throws Exception { p.enableAbandonedNodeEnforcement(false); - List<Long> expectedPartitionIds = Lists.newArrayList(); - for (long i = 1; i <= expectedNumPartitions; ++i) { - expectedPartitionIds.add(i); + List<ShardedKey<TableDestination>> expectedPartitions = Lists.newArrayList(); + for (int i = 0; i < numTables; ++i) { + for (int j = 1; j <= expectedNumPartitionsPerTable; ++j) { + String tableName = String.format("project-id:dataset-id.tables%05d", i); + TableDestination destination = new TableDestination(tableName, tableName); + expectedPartitions.add(ShardedKey.of(destination, j)); + } } - List<KV<String, Long>> files = Lists.newArrayList(); - List<String> fileNames = Lists.newArrayList(); - for (int i = 0; i < numFiles; ++i) { - String fileName = String.format("files%05d", i); - fileNames.add(fileName); - files.add(KV.of(fileName, fileSize)); + List<WriteBundlesToFiles.Result> files = Lists.newArrayList(); + Map<TableDestination, List<String>> filenamesPerTable = Maps.newHashMap(); + for (int i = 0; i < numTables; ++i) { + String tableName = String.format("project-id:dataset-id.tables%05d", i); + TableDestination destination = new TableDestination(tableName, tableName); + List<String> filenames = filenamesPerTable.get(destination); + if (filenames == null) { + filenames = Lists.newArrayList(); + filenamesPerTable.put(destination, filenames); + } + for (int j = 0; j < numFilesPerTable; ++j) { + String fileName = String.format("%s_files%05d", tableName, j); + filenames.add(fileName); + files.add(new Result(fileName, fileSize, destination)); + } } - TupleTag<KV<KV<TableDestination, Integer>, List<String>>> multiPartitionsTag = - new TupleTag<KV<KV<TableDestination, Integer>, List<String>>>("multiPartitionsTag") {}; - TupleTag<KV<KV<TableDestination, Integer>, List<String>>> singlePartitionTag = - new TupleTag<KV<KV<TableDestination, Integer>, List<String>>>("singlePartitionTag") {}; + TupleTag<KV<ShardedKey<TableDestination>, List<String>>> multiPartitionsTag = + new TupleTag<KV<ShardedKey<TableDestination>, List<String>>>("multiPartitionsTag") {}; + TupleTag<KV<ShardedKey<TableDestination>, List<String>>> singlePartitionTag = + new TupleTag<KV<ShardedKey<TableDestination>, List<String>>>("singlePartitionTag") {}; PCollectionView<Iterable<WriteBundlesToFiles.Result>> resultsView = PCollectionViews.iterableView( p, WindowingStrategy.globalDefault(), - KvCoder.of(StringUtf8Coder.of(), VarLongCoder.of())); + WriteBundlesToFiles.ResultCoder.of()); + ValueProvider<String> singletonTable = null; + if (numFilesPerTable == 0 && numTables == 1) { + TableReference singletonReference = new TableReference() + .setProjectId("projectid") + .setDatasetId("dataset") + .setTableId("table"); + singletonTable = StaticValueProvider.of(BigQueryHelpers.toJsonString(singletonReference)); + } WritePartition writePartition = - new WritePartition(null, null, resultsView, + new WritePartition(singletonTable, + "singleton", resultsView, multiPartitionsTag, singlePartitionTag); - DoFnTester<String, KV<KV<TableDestination, Integer>, List<String>>> tester = + DoFnTester<String, KV<ShardedKey<TableDestination>, List<String>>> tester = 
DoFnTester.of(writePartition); tester.setSideInput(resultsView, GlobalWindow.INSTANCE, files); tester.processElement(testFolder.newFolder("BigQueryIOTest").getAbsolutePath()); - List<KV<KV<TableDestination, Integer>, List<String>>> partitions; - if (expectedNumPartitions > 1) { + List<KV<ShardedKey<TableDestination>, List<String>>> partitions; + if (expectedNumPartitionsPerTable > 1) { partitions = tester.takeOutputElements(multiPartitionsTag); } else { partitions = tester.takeOutputElements(singlePartitionTag); } - List<Long> partitionIds = Lists.newArrayList(); - List<String> partitionFileNames = Lists.newArrayList(); - for (KV<Long, List<String>> partition : partitions) { - partitionIds.add(partition.getKey()); - for (String name : partition.getValue()) { - partitionFileNames.add(name); + + + List<ShardedKey<TableDestination>> partitionsResult = Lists.newArrayList(); + Map<TableDestination, List<String>> filesPerTableResult = Maps.newHashMap(); + for (KV<ShardedKey<TableDestination>, List<String>> partition : partitions) { + TableDestination table = partition.getKey().getKey(); + partitionsResult.add(partition.getKey()); + List<String> tableFilesResult = filesPerTableResult.get(table); + if (tableFilesResult == null) { + tableFilesResult = Lists.newArrayList(); + filesPerTableResult.put(table, tableFilesResult); } + tableFilesResult.addAll(partition.getValue()); } - assertEquals(expectedPartitionIds, partitionIds); - if (numFiles == 0) { - assertThat(partitionFileNames, Matchers.hasSize(1)); - assertTrue(Files.exists(Paths.get(partitionFileNames.get(0)))); - assertThat(Files.readAllBytes(Paths.get(partitionFileNames.get(0))).length, + assertEquals(expectedPartitions.size(), partitionsResult.size()); + + // assertThat(partitionsResult, + // containsInAnyOrder(Iterables.toArray(expectedPartitions, ShardedKey.class))); + + if (numFilesPerTable == 0 && numTables == 1) { + assertEquals(1, filesPerTableResult.size()); + List<String> singletonFiles = filesPerTableResult.values().iterator().next(); + assertTrue(Files.exists(Paths.get(singletonFiles.get(0)))); + assertThat(Files.readAllBytes(Paths.get(singletonFiles.get(0))).length, Matchers.equalTo(0)); } else { - assertEquals(fileNames, partitionFileNames); + assertEquals(filenamesPerTable, filesPerTableResult); } } @@ -2129,26 +1701,46 @@ public class BigQueryIOTest implements Serializable { p.enableAbandonedNodeEnforcement(false); FakeBigQueryServices fakeBqServices = new FakeBigQueryServices() - .withJobService(new FakeJobService() - .startJobReturns("done", "done", "done", "done") - .pollJobReturns(Status.FAILED, Status.SUCCEEDED, Status.SUCCEEDED, Status.SUCCEEDED)); + .withJobService(new FakeJobService()) + // .startJobReturns("done", "done", "done", "done", "done", "done", "done", "done", + // "done", "done") + // .pollJobReturns(Status.FAILED, Status.SUCCEEDED, Status.SUCCEEDED, Status.SUCCEEDED, + // Status.SUCCEEDED, Status.SUCCEEDED, Status.SUCCEEDED, Status.SUCCEEDED, + // Status.SUCCEEDED, Status.SUCCEEDED)) + .withDatasetService(mockDatasetService); + long numTables = 3; long numPartitions = 3; long numFilesPerPartition = 10; String jobIdToken = "jobIdToken"; String tempFilePrefix = "tempFilePrefix"; - String jsonTable = "{}"; - String jsonSchema = "{}"; - List<String> expectedTempTables = Lists.newArrayList(); - - List<KV<Long, Iterable<List<String>>>> partitions = Lists.newArrayList(); - for (long i = 0; i < numPartitions; ++i) { - List<String> filesPerPartition = Lists.newArrayList(); - for (int j = 0; j < 
numFilesPerPartition; ++j) { - filesPerPartition.add(String.format("files%05d", j)); + Map<TableDestination, List<String>> expectedTempTables = Maps.newHashMap(); + + List<KV<ShardedKey<TableDestination>, Iterable<List<String>>>> partitions = + Lists.newArrayList(); + for (int i = 0; i < numTables; ++i) { + String tableName = String.format("project-id:dataset-id.table%05d", i); + TableDestination tableDestination = new TableDestination(tableName, tableName); + for (int j = 0; j < numPartitions; ++j) { + String tempTableId = String.format( + jobIdToken + "_0x%08x_%05d", tableDestination.hashCode(), j); + List<String> filesPerPartition = Lists.newArrayList(); + for (int k = 0; k < numFilesPerPartition; ++k) { + filesPerPartition.add(String.format("files0x%08x_%05d", tableDestination.hashCode(), k)); + } + partitions.add(KV.of(ShardedKey.of(tableDestination, j), + (Iterable<List<String>>) Collections.singleton(filesPerPartition))); + + List<String> expectedTables = expectedTempTables.get(tableDestination); + if (expectedTables == null) { + expectedTables = Lists.newArrayList(); + expectedTempTables.put(tableDestination, expectedTables); + } + String json = String.format( + "{\"datasetId\":\"dataset-id\",\"projectId\":\"project-id\",\"tableId\":\"%s\"}", + tempTableId); + expectedTables.add(json); } - partitions.add(KV.of(i, (Iterable<List<String>>) Collections.singleton(filesPerPartition))); - expectedTempTables.add(String.format("{\"tableId\":\"%s_%05d\"}", jobIdToken, i)); } PCollection<String> expectedTempTablesPCollection = p.apply(Create.of(expectedTempTables)); @@ -2165,27 +1757,33 @@ public class BigQueryIOTest implements Serializable { fakeBqServices, jobIdTokenView, tempFilePrefix, - StaticValueProvider.of(jsonTable), - StaticValueProvider.of(jsonSchema), WriteDisposition.WRITE_EMPTY, CreateDisposition.CREATE_IF_NEEDED, null); - DoFnTester<KV<Long, Iterable<List<String>>>, String> tester = DoFnTester.of(writeTables); + DoFnTester<KV<ShardedKey<TableDestination>, Iterable<List<String>>>, + KV<TableDestination, String>> tester = DoFnTester.of(writeTables); tester.setSideInput(jobIdTokenView, GlobalWindow.INSTANCE, jobIdToken); - for (KV<Long, Iterable<List<String>>> partition : partitions) { + for (KV<ShardedKey<TableDestination>, Iterable<List<String>>> partition : partitions) { tester.processElement(partition); } - List<String> tempTables = tester.takeOutputElements(); - - assertEquals(expectedTempTables, tempTables); + Map<TableDestination, List<String>> tempTablesResult = Maps.newHashMap(); + for (KV<TableDestination, String> element : tester.takeOutputElements()) { + List<String> tables = tempTablesResult.get(element.getKey()); + if (tables == null) { + tables = Lists.newArrayList(); + tempTablesResult.put(element.getKey(), tables); + } + tables.add(element.getValue()); + } + assertEquals(expectedTempTables, tempTablesResult); } @Test public void testRemoveTemporaryFiles() throws Exception { BigQueryOptions bqOptions = PipelineOptionsFactory.as(BigQueryOptions.class); - bqOptions.setProject("defaultProject"); + bqOptions.setProject("defaultproject"); bqOptions.setTempLocation(testFolder.newFolder("BigQueryIOTest").getAbsolutePath()); int numFiles = 10; @@ -2195,7 +1793,7 @@ public class BigQueryIOTest implements Serializable { for (int i = 0; i < numFiles; ++i) { String fileName = String.format("files%05d", i); writer.open(fileName); - fileNames.add(writer.close().getKey()); + fileNames.add(writer.close().filename); } fileNames.add(tempFilePrefix + String.format("files%05d", 
numFiles)); @@ -2217,23 +1815,33 @@ public class BigQueryIOTest implements Serializable { p.enableAbandonedNodeEnforcement(false); FakeBigQueryServices fakeBqServices = new FakeBigQueryServices() - .withJobService(new FakeJobService() - .startJobReturns("done", "done") - .pollJobReturns(Status.FAILED, Status.SUCCEEDED)) + .withJobService(new FakeJobService()) + // .startJobReturns("done", "done") + // .pollJobReturns(Status.FAILED, Status.SUCCEEDED)) .withDatasetService(mockDatasetService); - long numTempTables = 3; + int numFinalTables = 3; + int numTempTables = 3; String jobIdToken = "jobIdToken"; String jsonTable = "{}"; - List<String> tempTables = Lists.newArrayList(); - for (long i = 0; i < numTempTables; ++i) { - tempTables.add(String.format("{\"tableId\":\"%s_%05d\"}", jobIdToken, i)); + Map<TableDestination, Iterable<String>> tempTables = Maps.newHashMap(); + for (int i = 0; i < numFinalTables; ++i) { + String tableName = "project-id:dataset-id.table_" + i; + TableDestination tableDestination = new TableDestination(tableName, tableName); + List<String> tables = Lists.newArrayList(); + tempTables.put(tableDestination, tables); + for (int j = 0; j < numTempTables; ++j) { + tables.add(String.format( + "{\"datasetId\":\"dataset-id\",\"projectId\":\"project-id\",\"tableId\":\"%s_%05d_%05d\"}", jobIdToken, i, j)); + } } - PCollection<String> tempTablesPCollection = p.apply(Create.of(tempTables)); - PCollectionView<Iterable<String>> tempTablesView = - PCollectionViews.iterableView( - tempTablesPCollection, WindowingStrategy.globalDefault(), StringUtf8Coder.of()); + PCollectionView<Map<TableDestination, Iterable<String>>> tempTablesView = + PCollectionViews.multimapView( + p, + WindowingStrategy.globalDefault(), + KvCoder.of(TableDestinationCoder.of(), StringUtf8Coder.of())); + PCollection<String> jobIdTokenCollection = p.apply("CreateJobId", Create.of("jobId")); PCollectionView<String> jobIdTokenView = jobIdTokenCollection.apply(View.<String>asSingleton()); @@ -2241,11 +1849,9 @@ public class BigQueryIOTest implements Serializable { WriteRename writeRename = new WriteRename( fakeBqServices, jobIdTokenView, - StaticValueProvider.of(jsonTable), WriteDisposition.WRITE_EMPTY, CreateDisposition.CREATE_IF_NEEDED, - tempTablesView, - null); + tempTablesView); DoFnTester<String, Void> tester = DoFnTester.of(writeRename); tester.setSideInput(tempTablesView, GlobalWindow.INSTANCE, tempTables); http://git-wip-us.apache.org/repos/asf/beam/blob/760a9458/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeBigQueryServices.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeBigQueryServices.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeBigQueryServices.java new file mode 100644 index 0000000..ed3ab37 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeBigQueryServices.java @@ -0,0 +1,96 @@ +package org.apache.beam.sdk.io.gcp.bigquery; + +import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.fromJsonString; +import static org.junit.Assert.assertEquals; + +import com.google.api.services.bigquery.model.JobConfigurationQuery; +import com.google.api.services.bigquery.model.TableReference; +import com.google.api.services.bigquery.model.TableRow; +import java.io.IOException; +import java.util.NoSuchElementException; +import org.apache.beam.sdk.options.BigQueryOptions;
+ + +/** + * A fake implementation of {@link BigQueryServices} for testing. + */ +class FakeBigQueryServices implements BigQueryServices { + private String[] jsonTableRowReturns = new String[0]; + private JobService jobService; + private DatasetService datasetService; + + public FakeBigQueryServices withJobService(JobService jobService) { + this.jobService = jobService; + return this; + } + + public FakeBigQueryServices withDatasetService(DatasetService datasetService) { + this.datasetService = datasetService; + return this; + } + + public FakeBigQueryServices readerReturns(String... jsonTableRowReturns) { + this.jsonTableRowReturns = jsonTableRowReturns; + return this; + } + + @Override + public JobService getJobService(BigQueryOptions bqOptions) { + return jobService; + } + + @Override + public DatasetService getDatasetService(BigQueryOptions bqOptions) { + return datasetService; + } + + @Override + public BigQueryJsonReader getReaderFromTable( + BigQueryOptions bqOptions, TableReference tableRef) { + return new FakeBigQueryReader(jsonTableRowReturns); + } + + @Override + public BigQueryJsonReader getReaderFromQuery( + BigQueryOptions bqOptions, String projectId, JobConfigurationQuery queryConfig) { + return new FakeBigQueryReader(jsonTableRowReturns); + } + + private static class FakeBigQueryReader implements BigQueryJsonReader { + private static final int UNSTARTED = -1; + private static final int CLOSED = Integer.MAX_VALUE; + + private String[] jsonTableRowReturns; + private int currIndex; + + FakeBigQueryReader(String[] jsonTableRowReturns) { + this.jsonTableRowReturns = jsonTableRowReturns; + this.currIndex = UNSTARTED; + } + + @Override + public boolean start() throws IOException { + assertEquals(UNSTARTED, currIndex); + currIndex = 0; + return currIndex < jsonTableRowReturns.length; + } + + @Override + public boolean advance() throws IOException { + return ++currIndex < jsonTableRowReturns.length; + } + + @Override + public TableRow getCurrent() throws NoSuchElementException { + if (currIndex >= jsonTableRowReturns.length) { + throw new NoSuchElementException(); + } + return fromJsonString(jsonTableRowReturns[currIndex], TableRow.class); + } + + @Override + public void close() throws IOException { + currIndex = CLOSED; + } + } +}
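
FakeBigQueryServices replaces the Mockito-based mocks in the tests with stateful, builder-style fakes. A minimal wiring sketch, condensed from the BigQueryIOTest changes in this commit (the FakeDatasetService no-arg constructor and the concrete row values are illustrative assumptions, not lines from this diff):

    // Illustrative wiring of the new fakes; not a line-for-line excerpt of the tests.
    FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()
        .withJobService(new FakeJobService())          // stateful fake, replaces mockJobService
        .withDatasetService(new FakeDatasetService())  // assumed no-arg constructor
        .readerReturns(
            BigQueryHelpers.toJsonString(new TableRow().set("name", "a").set("number", "1")),
            BigQueryHelpers.toJsonString(new TableRow().set("name", "b").set("number", "2")));

Readers handed out by getReaderFromTable and getReaderFromQuery then follow the usual reader contract: start() positions the reader on the first row (returning false if there is none), advance() steps forward until it returns false, and getCurrent() parses the current JSON string back into a TableRow.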

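
The rewritten testWritePartition cases pin down WritePartition's per-table contract: each destination's files are grouped into partitions (the test comments call out one partition per group of BigQueryWrite.MAX_NUM_FILES files), every partition is keyed by ShardedKey.of(destination, partitionId), and single-partition tables route to singlePartitionTag while the rest go to multiPartitionsTag. A standalone sketch of that grouping rule, assuming illustrative maxFiles/maxBytes limits and a Result.fileByteSize field (only Result.filename appears verbatim in the tests above):

    import com.google.common.collect.Lists;
    import java.util.List;

    // Sketch of the per-table partitioning rule the tests exercise;
    // maxFiles and maxBytes stand in for the real BatchLoads limits.
    static List<List<String>> partitionFiles(
        List<WriteBundlesToFiles.Result> files, int maxFiles, long maxBytes) {
      List<List<String>> partitions = Lists.newArrayList();
      List<String> currentPartition = Lists.newArrayList();
      long currentBytes = 0;
      for (WriteBundlesToFiles.Result result : files) {
        // Close the current partition once either limit would be exceeded.
        if (!currentPartition.isEmpty()
            && (currentPartition.size() >= maxFiles
                || currentBytes + result.fileByteSize > maxBytes)) {
          partitions.add(currentPartition);
          currentPartition = Lists.newArrayList();
          currentBytes = 0;
        }
        currentPartition.add(result.filename);
        currentBytes += result.fileByteSize;
      }
      if (!currentPartition.isEmpty()) {
        partitions.add(currentPartition); // trailing, possibly partial, partition
      }
      return partitions;
    }

With ten files per table and a three-file limit this yields the four partitions the last case expects; a table with no input still produces one partition holding a single empty file, which is why the zero-file case asserts on singlePartitionTag.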