Handle PCollectionList.empty() in FlattenEvaluatorFactory. PCollectionList.empty() is a valid argument to a Flatten#pCollections PTransform; applying the transform to an empty list should succeed and produce no output.
----Release Notes---- [] ------------- Created by MOE: https://github.com/google/moe MOE_MIGRATED_REVID=115455733 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d15d924d Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d15d924d Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d15d924d Branch: refs/heads/master Commit: d15d924d5dae18a07067cc3a71ba3b50431fe3d7 Parents: f7fc939 Author: tgroh <[email protected]> Authored: Wed Feb 24 08:42:55 2016 -0800 Committer: Davor Bonaci <[email protected]> Committed: Thu Feb 25 23:58:27 2016 -0800 ---------------------------------------------------------------------- .../inprocess/FlattenEvaluatorFactory.java | 8 ++++++- .../inprocess/FlattenEvaluatorFactoryTest.java | 23 ++++++++++++++++++++ 2 files changed, 30 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d15d924d/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/FlattenEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/FlattenEvaluatorFactory.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/FlattenEvaluatorFactory.java index d8b5312..1442888 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/FlattenEvaluatorFactory.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/FlattenEvaluatorFactory.java @@ -1,5 +1,5 @@ /* - * Copyright (C) 2015 Google Inc. + * Copyright (C) 2016 Google Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); you may not * use this file except in compliance with the License. 
You may obtain a copy of @@ -46,6 +46,12 @@ class FlattenEvaluatorFactory implements TransformEvaluatorFactory { application, final CommittedBundle<InputT> inputBundle, final InProcessEvaluationContext evaluationContext) { + if (inputBundle == null) { + // it is impossible to call processElement on a flatten with no input bundle. A Flatten with + // no input bundle occurs as an output of Flatten.pcollections(PCollectionList.empty()) + return new FlattenEvaluator<>( + null, StepTransformResult.withoutHold(application).build()); + } final UncommittedBundle<InputT> outputBundle = evaluationContext.createBundle(inputBundle, application.getOutput()); final InProcessTransformResult result = http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d15d924d/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/FlattenEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/FlattenEvaluatorFactoryTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/FlattenEvaluatorFactoryTest.java index c2b9995..dac42b6 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/FlattenEvaluatorFactoryTest.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/FlattenEvaluatorFactoryTest.java @@ -16,6 +16,7 @@ package com.google.cloud.dataflow.sdk.runners.inprocess; import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.emptyIterable; import static org.junit.Assert.assertThat; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -112,4 +113,26 @@ public class FlattenEvaluatorFactoryTest { WindowedValue.timestampedValueInGlobalWindow(-4, new Instant(-4096)), WindowedValue.valueInGlobalWindow(-1))); } + + @Test + public void testFlattenInMemoryEvaluatorWithEmptyPCollectionList() throws Exception { + TestPipeline p = 
TestPipeline.create(); + PCollectionList<Integer> list = PCollectionList.empty(p); + + PCollection<Integer> flattened = list.apply(Flatten.<Integer>pCollections()); + + InProcessEvaluationContext context = mock(InProcessEvaluationContext.class); + + FlattenEvaluatorFactory factory = new FlattenEvaluatorFactory(); + TransformEvaluator<Integer> emptyEvaluator = + factory.forApplication(flattened.getProducingTransformInternal(), null, context); + + InProcessTransformResult leftSideResult = emptyEvaluator.finishBundle(); + + assertThat(leftSideResult.getOutputBundles(), emptyIterable()); + assertThat( + leftSideResult.getTransform(), + Matchers.<AppliedPTransform<?, ?, ?>>equalTo(flattened.getProducingTransformInternal())); + } + }
