Repository: beam Updated Branches: refs/heads/master 0cba43ee2 -> 09d75a0b0
[BEAM-2406] Fix NullPointerException when writing an empty table Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/52df03af Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/52df03af Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/52df03af Branch: refs/heads/master Commit: 52df03af64fd611c76fc56be89d2a677138dae80 Parents: 0cba43e Author: Reuven Lax <[email protected]> Authored: Fri Jun 2 18:29:59 2017 -0700 Committer: Eugene Kirpichov <[email protected]> Committed: Mon Jun 5 10:00:03 2017 -0700 ---------------------------------------------------------------------- .../beam/sdk/io/gcp/bigquery/BatchLoads.java | 1 + .../io/gcp/bigquery/WriteBundlesToFiles.java | 2 + .../sdk/io/gcp/bigquery/WritePartition.java | 7 ++- .../sdk/io/gcp/bigquery/BigQueryIOTest.java | 45 +++++++++++--------- 4 files changed, 32 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/52df03af/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java index 3686f99..c1b202e 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java @@ -257,6 +257,7 @@ class BatchLoads<DestinationT> ParDo.of( new WritePartition<>( singletonTable, + dynamicDestinations, tempFilePrefix, resultsView, multiPartitionsTag, http://git-wip-us.apache.org/repos/asf/beam/blob/52df03af/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java index f014039..0b5f54b 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java @@ -18,6 +18,7 @@ package org.apache.beam.sdk.io.gcp.bigquery; +import static com.google.common.base.Preconditions.checkNotNull; import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.resolveTempLocation; import com.google.api.services.bigquery.model.TableRow; @@ -79,6 +80,7 @@ class WriteBundlesToFiles<DestinationT> public final DestinationT destination; public Result(String filename, Long fileByteSize, DestinationT destination) { + checkNotNull(destination); this.filename = filename; this.fileByteSize = fileByteSize; this.destination = destination; http://git-wip-us.apache.org/repos/asf/beam/blob/52df03af/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java index 24693da..acd1132 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java @@ -35,6 +35,7 @@ import org.apache.beam.sdk.values.TupleTag; class WritePartition<DestinationT> extends DoFn<Void, KV<ShardedKey<DestinationT>, List<String>>> { private final boolean singletonTable; + private final DynamicDestinations<?, DestinationT> dynamicDestinations; private final PCollectionView<String> tempFilePrefix; private final PCollectionView<Iterable<WriteBundlesToFiles.Result<DestinationT>>> results; private TupleTag<KV<ShardedKey<DestinationT>, List<String>>> multiPartitionsTag; @@ -100,11 +101,13 @@ class WritePartition<DestinationT> WritePartition( boolean singletonTable, + DynamicDestinations<?, DestinationT> dynamicDestinations, PCollectionView<String> tempFilePrefix, PCollectionView<Iterable<WriteBundlesToFiles.Result<DestinationT>>> results, TupleTag<KV<ShardedKey<DestinationT>, List<String>>> multiPartitionsTag, TupleTag<KV<ShardedKey<DestinationT>, List<String>>> singlePartitionTag) { this.singletonTable = singletonTable; + this.dynamicDestinations = dynamicDestinations; this.results = results; this.tempFilePrefix = tempFilePrefix; this.multiPartitionsTag = multiPartitionsTag; @@ -126,8 +129,8 @@ class WritePartition<DestinationT> // Return a null destination in this case - the constant DynamicDestinations class will // resolve it to the singleton output table. results.add( - new Result<DestinationT>( - writerResult.resourceId.toString(), writerResult.byteSize, null)); + new Result<>(writerResult.resourceId.toString(), writerResult.byteSize, + dynamicDestinations.getDestination(null))); } Map<DestinationT, DestinationData> currentResults = Maps.newHashMap(); http://git-wip-us.apache.org/repos/asf/beam/blob/52df03af/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java index 04bbac4..bfd260a 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java @@ -1797,20 +1797,23 @@ public class BigQueryIOTest implements Serializable { // function) and there is no input data, WritePartition will generate an empty table. This // code is to test that path. boolean isSingleton = numTables == 1 && numFilesPerTable == 0; - - List<ShardedKey<String>> expectedPartitions = Lists.newArrayList(); + DynamicDestinations<String, TableDestination> dynamicDestinations = + new DynamicDestinationsHelpers.ConstantTableDestinations<>( + StaticValueProvider.of("SINGLETON"), ""); + List<ShardedKey<TableDestination>> expectedPartitions = Lists.newArrayList(); if (isSingleton) { - expectedPartitions.add(ShardedKey.<String>of(null, 1)); + expectedPartitions.add(ShardedKey.<TableDestination>of( + new TableDestination("SINGLETON", ""), 1)); } else { for (int i = 0; i < numTables; ++i) { for (int j = 1; j <= expectedNumPartitionsPerTable; ++j) { String tableName = String.format("project-id:dataset-id.tables%05d", i); - expectedPartitions.add(ShardedKey.of(tableName, j)); + expectedPartitions.add(ShardedKey.of(new TableDestination(tableName, ""), j)); } } } - List<WriteBundlesToFiles.Result<String>> files = Lists.newArrayList(); + List<WriteBundlesToFiles.Result<TableDestination>> files = Lists.newArrayList(); Map<String, List<String>> filenamesPerTable = Maps.newHashMap(); for (int i = 0; i < numTables; ++i) { String tableName = String.format("project-id:dataset-id.tables%05d", i); @@ -1822,36 +1825,36 @@ public class BigQueryIOTest implements Serializable { for (int j = 0; j < numFilesPerTable; ++j) { String fileName = String.format("%s_files%05d", tableName, j); filenames.add(fileName); - files.add(new Result<>(fileName, fileSize, tableName)); + files.add(new Result<>(fileName, fileSize, new TableDestination(tableName, ""))); } } - TupleTag<KV<ShardedKey<String>, List<String>>> multiPartitionsTag = - new TupleTag<KV<ShardedKey<String>, List<String>>>("multiPartitionsTag") {}; - TupleTag<KV<ShardedKey<String>, List<String>>> singlePartitionTag = - new TupleTag<KV<ShardedKey<String>, List<String>>>("singlePartitionTag") {}; + TupleTag<KV<ShardedKey<TableDestination>, List<String>>> multiPartitionsTag = + new TupleTag<KV<ShardedKey<TableDestination>, List<String>>>("multiPartitionsTag") {}; + TupleTag<KV<ShardedKey<TableDestination>, List<String>>> singlePartitionTag = + new TupleTag<KV<ShardedKey<TableDestination>, List<String>>>("singlePartitionTag") {}; - PCollectionView<Iterable<WriteBundlesToFiles.Result<String>>> resultsView = + PCollectionView<Iterable<WriteBundlesToFiles.Result<TableDestination>>> resultsView = p.apply( Create.of(files) - .withCoder(WriteBundlesToFiles.ResultCoder.of(StringUtf8Coder.of()))) - .apply(View.<WriteBundlesToFiles.Result<String>>asIterable()); + .withCoder(WriteBundlesToFiles.ResultCoder.of(TableDestinationCoder.of()))) + .apply(View.<WriteBundlesToFiles.Result<TableDestination>>asIterable()); String tempFilePrefix = testFolder.newFolder("BigQueryIOTest").getAbsolutePath(); PCollectionView<String> tempFilePrefixView = p.apply(Create.of(tempFilePrefix)).apply(View.<String>asSingleton()); - WritePartition<String> writePartition = - new WritePartition<>( - isSingleton, tempFilePrefixView, resultsView, multiPartitionsTag, singlePartitionTag); + WritePartition<TableDestination> writePartition = + new WritePartition<>(isSingleton, dynamicDestinations, tempFilePrefixView, + resultsView, multiPartitionsTag, singlePartitionTag); - DoFnTester<Void, KV<ShardedKey<String>, List<String>>> tester = + DoFnTester<Void, KV<ShardedKey<TableDestination>, List<String>>> tester = DoFnTester.of(writePartition); tester.setSideInput(resultsView, GlobalWindow.INSTANCE, files); tester.setSideInput(tempFilePrefixView, GlobalWindow.INSTANCE, tempFilePrefix); tester.processElement(null); - List<KV<ShardedKey<String>, List<String>>> partitions; + List<KV<ShardedKey<TableDestination>, List<String>>> partitions; if (expectedNumPartitionsPerTable > 1) { partitions = tester.takeOutputElements(multiPartitionsTag); } else { @@ -1859,10 +1862,10 @@ public class BigQueryIOTest implements Serializable { } - List<ShardedKey<String>> partitionsResult = Lists.newArrayList(); + List<ShardedKey<TableDestination>> partitionsResult = Lists.newArrayList(); Map<String, List<String>> filesPerTableResult = Maps.newHashMap(); - for (KV<ShardedKey<String>, List<String>> partition : partitions) { - String table = partition.getKey().getKey(); + for (KV<ShardedKey<TableDestination>, List<String>> partition : partitions) { + String table = partition.getKey().getKey().getTableSpec(); partitionsResult.add(partition.getKey()); List<String> tableFilesResult = filesPerTableResult.get(table); if (tableFilesResult == null) {
