BigQuery: swap from asSingleton to asIterable for Cleanup asIterable can be simpler for runners to implement as it does not require semantically that the PCollection being viewed contains exactly one element.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/059b351e Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/059b351e Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/059b351e Branch: refs/heads/gearpump-runner Commit: 059b351e58ab746ee699ee5d8ff746a27ec7586e Parents: aaa5e55 Author: Dan Halperin <[email protected]> Authored: Tue May 2 10:37:11 2017 -0700 Committer: Dan Halperin <[email protected]> Committed: Tue May 2 13:13:52 2017 -0700 ---------------------------------------------------------------------- .../apache/beam/sdk/io/gcp/bigquery/PassThroughThenCleanup.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/059b351e/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/PassThroughThenCleanup.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/PassThroughThenCleanup.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/PassThroughThenCleanup.java index 75f7b93..f49c4e1 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/PassThroughThenCleanup.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/PassThroughThenCleanup.java @@ -53,9 +53,9 @@ class PassThroughThenCleanup<T> extends PTransform<PCollection<T>, PCollection<T PCollectionTuple outputs = input.apply(ParDo.of(new IdentityFn<T>()) .withOutputTags(mainOutput, TupleTagList.of(cleanupSignal))); - PCollectionView<Void> cleanupSignalView = outputs.get(cleanupSignal) + PCollectionView<Iterable<Void>> cleanupSignalView = outputs.get(cleanupSignal) .setCoder(VoidCoder.of()) - .apply(View.<Void>asSingleton().withDefaultValue(null)); + .apply(View.<Void>asIterable()); input.getPipeline() .apply("Create(CleanupOperation)", Create.of(cleanupOperation))
