Add a Base OverrideFactory class for 1-to-1 overrides

These overrides are relatively common, and this reduces the
reimplementation of mapping singletons and casting them.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/926385c4
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/926385c4
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/926385c4

Branch: refs/heads/master
Commit: 926385c474f652117d814e966f476ca6280ba506
Parents: 453e37b
Author: Thomas Groh <[email protected]>
Authored: Thu Feb 16 19:14:23 2017 -0800
Committer: Thomas Groh <[email protected]>
Committed: Wed Feb 22 09:36:17 2017 -0800

----------------------------------------------------------------------
 .../SingleInputOutputOverrideFactory.java       |  50 ++++++++
 .../SingleInputOutputOverrideFactoryTest.java   | 114 +++++++++++++++++++
 2 files changed, 164 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/926385c4/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SingleInputOutputOverrideFactory.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SingleInputOutputOverrideFactory.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SingleInputOutputOverrideFactory.java
new file mode 100644
index 0000000..43bf556
--- /dev/null
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SingleInputOutputOverrideFactory.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core.construction;
+
+import com.google.common.collect.Iterables;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.runners.PTransformOverrideFactory;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TaggedPValue;
+
+/**
+ * A {@link PTransformOverrideFactory} which consumes from a {@link PValue} and produces a
+ * {@link PValue}. {@link #getInput(List, Pipeline)} and {@link #mapOutputs(List, PValue)} are
+ * implemented as final methods; subclasses provide only {@code getReplacementTransform}.
+ */
+public abstract class SingleInputOutputOverrideFactory<
+        InputT extends PValue,
+        OutputT extends PValue,
+        TransformT extends PTransform<InputT, OutputT>>
+    implements PTransformOverrideFactory<InputT, OutputT, TransformT> {
+  @Override
+  public final InputT getInput(List<TaggedPValue> inputs, Pipeline p) {
+    return (InputT) Iterables.getOnlyElement(inputs).getValue(); // unchecked cast to InputT
+  }
+
+  @Override
+  public final Map<PValue, ReplacementOutput> mapOutputs(
+      List<TaggedPValue> outputs, OutputT newOutput) {
+    return ReplacementOutputs.singleton(outputs, newOutput); // expects one original output
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/926385c4/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SingleInputOutputOverrideFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SingleInputOutputOverrideFactoryTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SingleInputOutputOverrideFactoryTest.java
new file mode 100644
index 0000000..b4cdd1f
--- /dev/null
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SingleInputOutputOverrideFactoryTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core.construction;
+
+import static org.junit.Assert.assertThat;
+
+import com.google.common.collect.Iterables;
+import java.io.Serializable;
+import java.util.Map;
+import org.apache.beam.sdk.runners.PTransformOverrideFactory.ReplacementOutput;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionList;
+import org.apache.beam.sdk.values.PValue;
+import org.hamcrest.Matchers;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests {@code getInput} and {@code mapOutputs} of {@link SingleInputOutputOverrideFactory}. */
+@RunWith(JUnit4.class)
+public class SingleInputOutputOverrideFactoryTest implements Serializable {
+  @Rule public transient ExpectedException thrown = ExpectedException.none();
+
+  @Rule
+  public transient TestPipeline pipeline =
+      TestPipeline.create().enableAbandonedNodeEnforcement(false); // no test runs the pipeline
+
+  private transient SingleInputOutputOverrideFactory<
+          PCollection<? extends Integer>, PCollection<Integer>, 
MapElements<Integer, Integer>>
+      factory =
+          new SingleInputOutputOverrideFactory<
+              PCollection<? extends Integer>, PCollection<Integer>,
+              MapElements<Integer, Integer>>() {
+            @Override
+            public PTransform<PCollection<? extends Integer>, 
PCollection<Integer>>
+                getReplacementTransform(MapElements<Integer, Integer> 
transform) {
+              return transform; // identity; only the inherited methods are under test
+            }
+          };
+
+  private SimpleFunction<Integer, Integer> fn = new SimpleFunction<Integer, 
Integer>() {
+      @Override
+      public Integer apply(Integer input) {
+        return input - 1;
+      }
+    };
+
+  @Test
+  public void testGetInput() {
+    PCollection<Integer> input = pipeline.apply(Create.of(1, 2, 3));
+    assertThat(
+        factory.getInput(input.expand(), pipeline),
+        Matchers.<PCollection<? extends Integer>>equalTo(input));
+  }
+
+  @Test
+  public void testGetInputMultipleInputsFails() {
+    PCollection<Integer> input = pipeline.apply(Create.of(1, 2, 3));
+    PCollection<Integer> otherInput = pipeline.apply("OtherCreate", 
Create.of(1, 2, 3));
+
+    thrown.expect(IllegalArgumentException.class); // two inputs where one is required
+    factory.getInput(PCollectionList.of(input).and(otherInput).expand(), 
pipeline);
+  }
+
+  @Test
+  public void testMapOutputs() {
+    PCollection<Integer> input = pipeline.apply(Create.of(1, 2, 3));
+    PCollection<Integer> output = input.apply("Map", MapElements.via(fn));
+    PCollection<Integer> reappliedOutput = input.apply("ReMap", 
MapElements.via(fn));
+    Map<PValue, ReplacementOutput> replacementMap =
+        factory.mapOutputs(output.expand(), reappliedOutput);
+    assertThat(
+        replacementMap,
+        Matchers.<PValue, ReplacementOutput>hasEntry(
+            reappliedOutput,
+            ReplacementOutput.of(
+                Iterables.getOnlyElement(output.expand()),
+                Iterables.getOnlyElement(reappliedOutput.expand()))));
+  }
+
+  @Test
+  public void testMapOutputsMultipleOriginalOutputsFails() {
+    PCollection<Integer> input = pipeline.apply(Create.of(1, 2, 3));
+    PCollection<Integer> output = input.apply("Map", MapElements.via(fn));
+    PCollection<Integer> reappliedOutput = input.apply("ReMap", 
MapElements.via(fn));
+    thrown.expect(IllegalArgumentException.class); // three original outputs, one replacement
+    Map<PValue, ReplacementOutput> replacementMap =
+        factory.mapOutputs(
+            
PCollectionList.of(output).and(input).and(reappliedOutput).expand(), 
reappliedOutput);
+  }
+}

Reply via email to