BigQuery: swap from asSingleton to asIterable for Cleanup

asIterable can be simpler for runners to implement, as it does not
semantically require that the PCollection being viewed contains exactly
one element.
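
The difference in contract can be illustrated with a toy model. This is a minimal sketch in plain Java, not Beam code: the class `ViewSemanticsSketch` and its methods are hypothetical names invented for illustration, and the exception type is an assumption about how a runner might report a violated singleton contract. A singleton-style view has to check the element count (and fall back to a default when the collection is empty), whereas an iterable-style view accepts any number of elements, including none.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Toy model of the two view contracts. Hypothetical names; NOT the Beam API. */
final class ViewSemanticsSketch {

  /**
   * Singleton-style contract: an empty input yields the default value,
   * and more than one element is treated as an error (assumed exception type).
   */
  static <T> T asSingleton(List<T> elements, T defaultValue) {
    if (elements.isEmpty()) {
      return defaultValue;
    }
    if (elements.size() > 1) {
      throw new IllegalArgumentException(
          "singleton view saw " + elements.size() + " elements");
    }
    return elements.get(0);
  }

  /** Iterable-style contract: any number of elements is acceptable, so no check is needed. */
  static <T> Iterable<T> asIterable(List<T> elements) {
    return Collections.unmodifiableList(elements);
  }

  /** Counts the elements of an iterable by walking it once. */
  static <T> int count(Iterable<T> values) {
    int n = 0;
    for (T ignored : values) {
      n++;
    }
    return n;
  }

  public static void main(String[] args) {
    List<String> empty = Collections.emptyList();
    List<String> two = Arrays.asList("a", "b");

    // An empty input: the singleton view needs a default, the iterable view is simply empty.
    System.out.println("singleton(empty) = " + asSingleton(empty, null));
    System.out.println("iterable(empty) size = " + count(asIterable(empty)));

    // Two elements: only the singleton-style contract is violated.
    System.out.println("iterable(two) size = " + count(asIterable(two)));
    try {
      asSingleton(two, null);
    } catch (IllegalArgumentException e) {
      System.out.println("singleton(two) rejected: " + e.getMessage());
    }
  }
}
```

When the view serves only as a signal that upstream processing has finished, as the cleanup signal appears to here, the consumer never needs the value itself, so the weaker iterable contract is sufficient.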


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/059b351e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/059b351e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/059b351e

Branch: refs/heads/gearpump-runner
Commit: 059b351e58ab746ee699ee5d8ff746a27ec7586e
Parents: aaa5e55
Author: Dan Halperin <[email protected]>
Authored: Tue May 2 10:37:11 2017 -0700
Committer: Dan Halperin <[email protected]>
Committed: Tue May 2 13:13:52 2017 -0700

----------------------------------------------------------------------
 .../apache/beam/sdk/io/gcp/bigquery/PassThroughThenCleanup.java  | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/059b351e/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/PassThroughThenCleanup.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/PassThroughThenCleanup.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/PassThroughThenCleanup.java
index 75f7b93..f49c4e1 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/PassThroughThenCleanup.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/PassThroughThenCleanup.java
@@ -53,9 +53,9 @@ class PassThroughThenCleanup<T> extends PTransform<PCollection<T>, PCollection<T
     PCollectionTuple outputs = input.apply(ParDo.of(new IdentityFn<T>())
         .withOutputTags(mainOutput, TupleTagList.of(cleanupSignal)));
 
-    PCollectionView<Void> cleanupSignalView = outputs.get(cleanupSignal)
+    PCollectionView<Iterable<Void>> cleanupSignalView = outputs.get(cleanupSignal)
         .setCoder(VoidCoder.of())
-        .apply(View.<Void>asSingleton().withDefaultValue(null));
+        .apply(View.<Void>asIterable());
 
     input.getPipeline()
         .apply("Create(CleanupOperation)", Create.of(cleanupOperation))
