Repository: beam
Updated Branches:
  refs/heads/master 0cba43ee2 -> 09d75a0b0


[BEAM-2406] Fix NullPointerException when writing an empty table


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/52df03af
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/52df03af
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/52df03af

Branch: refs/heads/master
Commit: 52df03af64fd611c76fc56be89d2a677138dae80
Parents: 0cba43e
Author: Reuven Lax <[email protected]>
Authored: Fri Jun 2 18:29:59 2017 -0700
Committer: Eugene Kirpichov <[email protected]>
Committed: Mon Jun 5 10:00:03 2017 -0700

----------------------------------------------------------------------
 .../beam/sdk/io/gcp/bigquery/BatchLoads.java    |  1 +
 .../io/gcp/bigquery/WriteBundlesToFiles.java    |  2 +
 .../sdk/io/gcp/bigquery/WritePartition.java     |  7 ++-
 .../sdk/io/gcp/bigquery/BigQueryIOTest.java     | 45 +++++++++++---------
 4 files changed, 32 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/52df03af/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
index 3686f99..c1b202e 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
@@ -257,6 +257,7 @@ class BatchLoads<DestinationT>
             ParDo.of(
                     new WritePartition<>(
                         singletonTable,
+                        dynamicDestinations,
                         tempFilePrefix,
                         resultsView,
                         multiPartitionsTag,

http://git-wip-us.apache.org/repos/asf/beam/blob/52df03af/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java
index f014039..0b5f54b 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java
@@ -18,6 +18,7 @@
 
 package org.apache.beam.sdk.io.gcp.bigquery;
 
+import static com.google.common.base.Preconditions.checkNotNull;
 import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.resolveTempLocation;
 
 import com.google.api.services.bigquery.model.TableRow;
@@ -79,6 +80,7 @@ class WriteBundlesToFiles<DestinationT>
     public final DestinationT destination;
 
     public Result(String filename, Long fileByteSize, DestinationT destination) {
+      checkNotNull(destination);
       this.filename = filename;
       this.fileByteSize = fileByteSize;
       this.destination = destination;

http://git-wip-us.apache.org/repos/asf/beam/blob/52df03af/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java
index 24693da..acd1132 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java
@@ -35,6 +35,7 @@ import org.apache.beam.sdk.values.TupleTag;
 class WritePartition<DestinationT>
     extends DoFn<Void, KV<ShardedKey<DestinationT>, List<String>>> {
   private final boolean singletonTable;
+  private final DynamicDestinations<?, DestinationT> dynamicDestinations;
   private final PCollectionView<String> tempFilePrefix;
   private final PCollectionView<Iterable<WriteBundlesToFiles.Result<DestinationT>>> results;
   private TupleTag<KV<ShardedKey<DestinationT>, List<String>>> multiPartitionsTag;
@@ -100,11 +101,13 @@ class WritePartition<DestinationT>
 
   WritePartition(
       boolean singletonTable,
+      DynamicDestinations<?, DestinationT> dynamicDestinations,
       PCollectionView<String> tempFilePrefix,
       PCollectionView<Iterable<WriteBundlesToFiles.Result<DestinationT>>> results,
       TupleTag<KV<ShardedKey<DestinationT>, List<String>>> multiPartitionsTag,
       TupleTag<KV<ShardedKey<DestinationT>, List<String>>> singlePartitionTag) {
     this.singletonTable = singletonTable;
+    this.dynamicDestinations = dynamicDestinations;
     this.results = results;
     this.tempFilePrefix = tempFilePrefix;
     this.multiPartitionsTag = multiPartitionsTag;
@@ -126,8 +129,8 @@ class WritePartition<DestinationT>
       // Return a null destination in this case - the constant DynamicDestinations class will
       // resolve it to the singleton output table.
       results.add(
-          new Result<DestinationT>(
-              writerResult.resourceId.toString(), writerResult.byteSize, null));
+          new Result<>(writerResult.resourceId.toString(), writerResult.byteSize,
+              dynamicDestinations.getDestination(null)));
     }
 
     Map<DestinationT, DestinationData> currentResults = Maps.newHashMap();

http://git-wip-us.apache.org/repos/asf/beam/blob/52df03af/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index 04bbac4..bfd260a 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -1797,20 +1797,23 @@ public class BigQueryIOTest implements Serializable {
     // function) and there is no input data, WritePartition will generate an empty table. This
     // code is to test that path.
     boolean isSingleton = numTables == 1 && numFilesPerTable == 0;
-
-    List<ShardedKey<String>> expectedPartitions = Lists.newArrayList();
+    DynamicDestinations<String, TableDestination> dynamicDestinations =
+        new DynamicDestinationsHelpers.ConstantTableDestinations<>(
+            StaticValueProvider.of("SINGLETON"), "");
+    List<ShardedKey<TableDestination>> expectedPartitions = Lists.newArrayList();
     if (isSingleton) {
-      expectedPartitions.add(ShardedKey.<String>of(null, 1));
+      expectedPartitions.add(ShardedKey.<TableDestination>of(
+          new TableDestination("SINGLETON", ""), 1));
     } else {
       for (int i = 0; i < numTables; ++i) {
         for (int j = 1; j <= expectedNumPartitionsPerTable; ++j) {
           String tableName = String.format("project-id:dataset-id.tables%05d", i);
-          expectedPartitions.add(ShardedKey.of(tableName, j));
+          expectedPartitions.add(ShardedKey.of(new TableDestination(tableName, ""), j));
         }
       }
     }
 
-    List<WriteBundlesToFiles.Result<String>> files = Lists.newArrayList();
+    List<WriteBundlesToFiles.Result<TableDestination>> files = Lists.newArrayList();
     Map<String, List<String>> filenamesPerTable = Maps.newHashMap();
     for (int i = 0; i < numTables; ++i) {
       String tableName = String.format("project-id:dataset-id.tables%05d", i);
@@ -1822,36 +1825,36 @@ public class BigQueryIOTest implements Serializable {
       for (int j = 0; j < numFilesPerTable; ++j) {
         String fileName = String.format("%s_files%05d", tableName, j);
         filenames.add(fileName);
-        files.add(new Result<>(fileName, fileSize, tableName));
+        files.add(new Result<>(fileName, fileSize, new TableDestination(tableName, "")));
       }
     }
 
-    TupleTag<KV<ShardedKey<String>, List<String>>> multiPartitionsTag =
-        new TupleTag<KV<ShardedKey<String>, List<String>>>("multiPartitionsTag") {};
-    TupleTag<KV<ShardedKey<String>, List<String>>> singlePartitionTag =
-        new TupleTag<KV<ShardedKey<String>, List<String>>>("singlePartitionTag") {};
+    TupleTag<KV<ShardedKey<TableDestination>, List<String>>> multiPartitionsTag =
+        new TupleTag<KV<ShardedKey<TableDestination>, List<String>>>("multiPartitionsTag") {};
+    TupleTag<KV<ShardedKey<TableDestination>, List<String>>> singlePartitionTag =
+        new TupleTag<KV<ShardedKey<TableDestination>, List<String>>>("singlePartitionTag") {};
 
-    PCollectionView<Iterable<WriteBundlesToFiles.Result<String>>> resultsView =
+    PCollectionView<Iterable<WriteBundlesToFiles.Result<TableDestination>>> resultsView =
         p.apply(
                 Create.of(files)
-                    .withCoder(WriteBundlesToFiles.ResultCoder.of(StringUtf8Coder.of())))
-            .apply(View.<WriteBundlesToFiles.Result<String>>asIterable());
+                    .withCoder(WriteBundlesToFiles.ResultCoder.of(TableDestinationCoder.of())))
+            .apply(View.<WriteBundlesToFiles.Result<TableDestination>>asIterable());
 
     String tempFilePrefix = testFolder.newFolder("BigQueryIOTest").getAbsolutePath();
     PCollectionView<String> tempFilePrefixView =
         p.apply(Create.of(tempFilePrefix)).apply(View.<String>asSingleton());
 
-    WritePartition<String> writePartition =
-        new WritePartition<>(
-            isSingleton, tempFilePrefixView, resultsView, multiPartitionsTag, singlePartitionTag);
+    WritePartition<TableDestination> writePartition =
+        new WritePartition<>(isSingleton, dynamicDestinations, tempFilePrefixView,
+            resultsView, multiPartitionsTag, singlePartitionTag);
 
-    DoFnTester<Void, KV<ShardedKey<String>, List<String>>> tester =
+    DoFnTester<Void, KV<ShardedKey<TableDestination>, List<String>>> tester =
         DoFnTester.of(writePartition);
     tester.setSideInput(resultsView, GlobalWindow.INSTANCE, files);
     tester.setSideInput(tempFilePrefixView, GlobalWindow.INSTANCE, tempFilePrefix);
     tester.processElement(null);
 
-    List<KV<ShardedKey<String>, List<String>>> partitions;
+    List<KV<ShardedKey<TableDestination>, List<String>>> partitions;
     if (expectedNumPartitionsPerTable > 1) {
       partitions = tester.takeOutputElements(multiPartitionsTag);
     } else {
@@ -1859,10 +1862,10 @@ public class BigQueryIOTest implements Serializable {
     }
 
 
-    List<ShardedKey<String>> partitionsResult = Lists.newArrayList();
+    List<ShardedKey<TableDestination>> partitionsResult = Lists.newArrayList();
     Map<String, List<String>> filesPerTableResult = Maps.newHashMap();
-    for (KV<ShardedKey<String>, List<String>> partition : partitions) {
-      String table = partition.getKey().getKey();
+    for (KV<ShardedKey<TableDestination>, List<String>> partition : partitions) {
+      String table = partition.getKey().getKey().getTableSpec();
       partitionsResult.add(partition.getKey());
       List<String> tableFilesResult = filesPerTableResult.get(table);
       if (tableFilesResult == null) {

Reply via email to