Handle PCollectionList.empty() in FlattenEvaluatorFactory

PCollectionList.empty() is a valid argument to a Flatten#pCollections
PTransform. Applying the transform to an empty list should succeed and
produce an output with no elements.

----Release Notes----
[]
-------------
Created by MOE: https://github.com/google/moe
MOE_MIGRATED_REVID=115455733


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d15d924d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d15d924d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d15d924d

Branch: refs/heads/master
Commit: d15d924d5dae18a07067cc3a71ba3b50431fe3d7
Parents: f7fc939
Author: tgroh <[email protected]>
Authored: Wed Feb 24 08:42:55 2016 -0800
Committer: Davor Bonaci <[email protected]>
Committed: Thu Feb 25 23:58:27 2016 -0800

----------------------------------------------------------------------
 .../inprocess/FlattenEvaluatorFactory.java      |  8 ++++++-
 .../inprocess/FlattenEvaluatorFactoryTest.java  | 23 ++++++++++++++++++++
 2 files changed, 30 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d15d924d/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/FlattenEvaluatorFactory.java
----------------------------------------------------------------------
diff --git 
a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/FlattenEvaluatorFactory.java
 
b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/FlattenEvaluatorFactory.java
index d8b5312..1442888 100644
--- 
a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/FlattenEvaluatorFactory.java
+++ 
b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/FlattenEvaluatorFactory.java
@@ -1,5 +1,5 @@
 /*
- * Copyright (C) 2015 Google Inc.
+ * Copyright (C) 2016 Google Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License"); you may not
  * use this file except in compliance with the License. You may obtain a copy 
of
@@ -46,6 +46,12 @@ class FlattenEvaluatorFactory implements 
TransformEvaluatorFactory {
           application,
       final CommittedBundle<InputT> inputBundle,
       final InProcessEvaluationContext evaluationContext) {
+    if (inputBundle == null) {
+      // it is impossible to call processElement on a flatten with no input 
bundle. A Flatten with
+      // no input bundle occurs as an output of 
Flatten.pcollections(PCollectionList.empty())
+      return new FlattenEvaluator<>(
+          null, StepTransformResult.withoutHold(application).build());
+    }
     final UncommittedBundle<InputT> outputBundle =
         evaluationContext.createBundle(inputBundle, application.getOutput());
     final InProcessTransformResult result =

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d15d924d/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/FlattenEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git 
a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/FlattenEvaluatorFactoryTest.java
 
b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/FlattenEvaluatorFactoryTest.java
index c2b9995..dac42b6 100644
--- 
a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/FlattenEvaluatorFactoryTest.java
+++ 
b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/FlattenEvaluatorFactoryTest.java
@@ -16,6 +16,7 @@
 package com.google.cloud.dataflow.sdk.runners.inprocess;
 
 import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.emptyIterable;
 import static org.junit.Assert.assertThat;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -112,4 +113,26 @@ public class FlattenEvaluatorFactoryTest {
             WindowedValue.timestampedValueInGlobalWindow(-4, new 
Instant(-4096)),
             WindowedValue.valueInGlobalWindow(-1)));
   }
+
+  @Test
+  public void testFlattenInMemoryEvaluatorWithEmptyPCollectionList() throws 
Exception {
+    TestPipeline p = TestPipeline.create();
+    PCollectionList<Integer> list = PCollectionList.empty(p);
+
+    PCollection<Integer> flattened = 
list.apply(Flatten.<Integer>pCollections());
+
+    InProcessEvaluationContext context = 
mock(InProcessEvaluationContext.class);
+
+    FlattenEvaluatorFactory factory = new FlattenEvaluatorFactory();
+    TransformEvaluator<Integer> emptyEvaluator =
+        factory.forApplication(flattened.getProducingTransformInternal(), 
null, context);
+
+    InProcessTransformResult leftSideResult = emptyEvaluator.finishBundle();
+
+    assertThat(leftSideResult.getOutputBundles(), emptyIterable());
+    assertThat(
+        leftSideResult.getTransform(),
+        Matchers.<AppliedPTransform<?, ?, 
?>>equalTo(flattened.getProducingTransformInternal()));
+  }
+
 }

Reply via email to