Repository: incubator-beam Updated Branches: refs/heads/master 217c29bfc -> cc28f0cb4
Add PTransformOverrideFactory to the Core SDK. This migrates PTransformOverrideFactory from the DirectRunner to the Core SDK, as part of BEAM-646. Add getOriginalToReplacements to provide a mapping from the original outputs to replaced outputs. This enables all replaced nodes to be rewired to output the original output. Migrate all DirectRunner Override Factories to the new PTransformOverrideFactory. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/3f227a0a Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/3f227a0a Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/3f227a0a Branch: refs/heads/master Commit: 3f227a0ad18f425767e89f88d8a1c9fdcebdd80c Parents: 217c29b Author: Thomas Groh <tg...@google.com> Authored: Mon Dec 5 16:01:57 2016 -0800 Committer: Thomas Groh <tg...@google.com> Committed: Fri Dec 16 14:21:49 2016 -0800 ---------------------------------------------------------------------- ...ectGBKIntoKeyedWorkItemsOverrideFactory.java | 7 ++-- .../direct/DirectGroupByKeyOverrideFactory.java | 3 +- .../beam/runners/direct/DirectRunner.java | 7 +++- .../direct/PTransformOverrideFactory.java | 35 ----------------- .../direct/ParDoMultiOverrideFactory.java | 9 +++-- .../ParDoSingleViaMultiOverrideFactory.java | 11 +++--- .../direct/TestStreamEvaluatorFactory.java | 5 ++- .../runners/direct/ViewEvaluatorFactory.java | 4 +- .../direct/WriteWithShardingFactory.java | 6 ++- .../direct/WriteWithShardingFactoryTest.java | 4 +- .../beam/sdk/annotations/Experimental.java | 5 ++- .../sdk/runners/PTransformOverrideFactory.java | 41 ++++++++++++++++++++ 12 files changed, 80 insertions(+), 57 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3f227a0a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java 
---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java index 1fa059c..ab4c114 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java @@ -18,7 +18,8 @@ package org.apache.beam.runners.direct; import org.apache.beam.runners.core.KeyedWorkItem; -import org.apache.beam.runners.core.SplittableParDo; +import org.apache.beam.runners.core.SplittableParDo.GBKIntoKeyedWorkItems; +import org.apache.beam.sdk.runners.PTransformOverrideFactory; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; @@ -30,10 +31,10 @@ import org.apache.beam.sdk.values.PCollection; class DirectGBKIntoKeyedWorkItemsOverrideFactory<KeyT, InputT> implements PTransformOverrideFactory< PCollection<KV<KeyT, InputT>>, PCollection<KeyedWorkItem<KeyT, InputT>>, - SplittableParDo.GBKIntoKeyedWorkItems<KeyT, InputT>> { + GBKIntoKeyedWorkItems<KeyT, InputT>> { @Override public PTransform<PCollection<KV<KeyT, InputT>>, PCollection<KeyedWorkItem<KeyT, InputT>>> - override(SplittableParDo.GBKIntoKeyedWorkItems<KeyT, InputT> transform) { + getReplacementTransform(GBKIntoKeyedWorkItems<KeyT, InputT> transform) { return new DirectGroupByKey.DirectGroupByKeyOnly<>(); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3f227a0a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKeyOverrideFactory.java ---------------------------------------------------------------------- diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKeyOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKeyOverrideFactory.java index 9acf5e9..7cf3256 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKeyOverrideFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKeyOverrideFactory.java @@ -17,6 +17,7 @@ */ package org.apache.beam.runners.direct; +import org.apache.beam.sdk.runners.PTransformOverrideFactory; import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.values.KV; @@ -27,7 +28,7 @@ final class DirectGroupByKeyOverrideFactory<K, V> implements PTransformOverrideFactory< PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>, GroupByKey<K, V>> { @Override - public PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> override( + public PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> getReplacementTransform( GroupByKey<K, V> transform) { return new DirectGroupByKey<>(transform); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3f227a0a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java index 2f84356..78163c0 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java @@ -46,6 +46,7 @@ import org.apache.beam.sdk.io.Write; import org.apache.beam.sdk.metrics.MetricResults; import org.apache.beam.sdk.metrics.MetricsEnvironment; import org.apache.beam.sdk.options.PipelineOptions; +import 
org.apache.beam.sdk.runners.PTransformOverrideFactory; import org.apache.beam.sdk.runners.PipelineRunner; import org.apache.beam.sdk.testing.TestStream; import org.apache.beam.sdk.transforms.Aggregator; @@ -284,9 +285,11 @@ public class DirectRunner extends PipelineRunner<DirectPipelineResult> { @Override public <OutputT extends POutput, InputT extends PInput> OutputT apply( PTransform<InputT, OutputT> transform, InputT input) { - PTransformOverrideFactory overrideFactory = defaultTransformOverrides.get(transform.getClass()); + PTransformOverrideFactory<InputT, OutputT, PTransform<InputT, OutputT>> overrideFactory = + defaultTransformOverrides.get(transform.getClass()); if (overrideFactory != null) { - PTransform<InputT, OutputT> customTransform = overrideFactory.override(transform); + PTransform<InputT, OutputT> customTransform = + overrideFactory.getReplacementTransform(transform); if (customTransform != transform) { return Pipeline.applyTransform(transform.getName(), input, customTransform); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3f227a0a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PTransformOverrideFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PTransformOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PTransformOverrideFactory.java deleted file mode 100644 index 8db6e9b..0000000 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PTransformOverrideFactory.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.direct; - -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.values.PInput; -import org.apache.beam.sdk.values.POutput; - -interface PTransformOverrideFactory< - InputT extends PInput, - OutputT extends POutput, - TransformT extends PTransform<InputT, OutputT>> { - /** - * Create a {@link PTransform} override for the provided {@link PTransform} if applicable. - * Otherwise, return the input {@link PTransform}. - * - * <p>The returned PTransform must be semantically equivalent to the input {@link PTransform}. 
- */ - PTransform<InputT, OutputT> override(TransformT transform); -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3f227a0a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java index 4401434..c5bc069 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java @@ -20,10 +20,12 @@ package org.apache.beam.runners.direct; import org.apache.beam.runners.core.SplittableParDo; import org.apache.beam.sdk.coders.CannotProvideCoderException; import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.runners.PTransformOverrideFactory; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.ParDo.BoundMulti; import org.apache.beam.sdk.transforms.reflect.DoFnSignature; import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; import org.apache.beam.sdk.values.KV; @@ -39,12 +41,11 @@ import org.apache.beam.sdk.values.TypedPValue; */ class ParDoMultiOverrideFactory<InputT, OutputT> implements PTransformOverrideFactory< - PCollection<? extends InputT>, PCollectionTuple, ParDo.BoundMulti<InputT, OutputT>> { - + PCollection<? extends InputT>, PCollectionTuple, BoundMulti<InputT, OutputT>> { @Override @SuppressWarnings("unchecked") - public PTransform<PCollection<? extends InputT>, PCollectionTuple> override( - ParDo.BoundMulti<InputT, OutputT> transform) { + public PTransform<PCollection<? 
extends InputT>, PCollectionTuple> getReplacementTransform( + BoundMulti<InputT, OutputT> transform) { DoFn<InputT, OutputT> fn = transform.getFn(); DoFnSignature signature = DoFnSignatures.getSignature(fn.getClass()); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3f227a0a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleViaMultiOverrideFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleViaMultiOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleViaMultiOverrideFactory.java index 5fcf49c..3ae3382 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleViaMultiOverrideFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleViaMultiOverrideFactory.java @@ -17,8 +17,10 @@ */ package org.apache.beam.runners.direct; +import org.apache.beam.sdk.runners.PTransformOverrideFactory; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.ParDo.Bound; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.beam.sdk.values.TupleTag; @@ -30,12 +32,11 @@ import org.apache.beam.sdk.values.TupleTagList; */ class ParDoSingleViaMultiOverrideFactory<InputT, OutputT> implements PTransformOverrideFactory< - PCollection<? extends InputT>, PCollection<OutputT>, ParDo.Bound<InputT, OutputT>> { + PCollection<? extends InputT>, PCollection<OutputT>, Bound<InputT, OutputT>>{ @Override - @SuppressWarnings("unchecked") - public PTransform<PCollection<? extends InputT>, PCollection<OutputT>> override( - ParDo.Bound<InputT, OutputT> transform) { - return new ParDoSingleViaMulti(transform); + public PTransform<PCollection<? 
extends InputT>, PCollection<OutputT>> getReplacementTransform( + Bound<InputT, OutputT> transform) { + return new ParDoSingleViaMulti<>(transform); } static class ParDoSingleViaMulti<InputT, OutputT> http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3f227a0a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java index 3601dbc..6ba65bf 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java @@ -30,6 +30,7 @@ import java.util.concurrent.atomic.AtomicReference; import javax.annotation.Nullable; import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle; +import org.apache.beam.sdk.runners.PTransformOverrideFactory; import org.apache.beam.sdk.runners.PipelineRunner; import org.apache.beam.sdk.testing.TestStream; import org.apache.beam.sdk.testing.TestStream.ElementEvent; @@ -157,8 +158,10 @@ class TestStreamEvaluatorFactory implements TransformEvaluatorFactory { static class DirectTestStreamFactory<T> implements PTransformOverrideFactory<PBegin, PCollection<T>, TestStream<T>> { + @Override - public PTransform<PBegin, PCollection<T>> override(TestStream<T> transform) { + public PTransform<PBegin, PCollection<T>> getReplacementTransform( + TestStream<T> transform) { return new DirectTestStream<>(transform); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3f227a0a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java 
---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java index 460b1c2..96a18d7 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java @@ -24,6 +24,7 @@ import org.apache.beam.runners.direct.DirectRunner.PCollectionViewWriter; import org.apache.beam.runners.direct.StepTransformResult.Builder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.VoidCoder; +import org.apache.beam.sdk.runners.PTransformOverrideFactory; import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.PTransform; @@ -96,8 +97,9 @@ class ViewEvaluatorFactory implements TransformEvaluatorFactory { public static class ViewOverrideFactory<ElemT, ViewT> implements PTransformOverrideFactory< PCollection<ElemT>, PCollectionView<ViewT>, CreatePCollectionView<ElemT, ViewT>> { + @Override - public PTransform<PCollection<ElemT>, PCollectionView<ViewT>> override( + public PTransform<PCollection<ElemT>, PCollectionView<ViewT>> getReplacementTransform( CreatePCollectionView<ElemT, ViewT> transform) { return new DirectCreatePCollectionView<>(transform); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3f227a0a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java index 3c88337..fd1c175 100644 --- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java @@ -47,11 +47,13 @@ import org.joda.time.Duration; * of shards is the log base 10 of the number of input records, with up to 2 additional shards. */ class WriteWithShardingFactory<InputT> - implements PTransformOverrideFactory<PCollection<InputT>, PDone, Write.Bound<InputT>> { + implements org.apache.beam.sdk.runners.PTransformOverrideFactory< + PCollection<InputT>, PDone, Write.Bound<InputT>> { static final int MAX_RANDOM_EXTRA_SHARDS = 3; @Override - public PTransform<PCollection<InputT>, PDone> override(Write.Bound<InputT> transform) { + public PTransform<PCollection<InputT>, PDone> getReplacementTransform( + Bound<InputT> transform) { if (transform.getNumShards() == 0) { return new DynamicallyReshardedWrite<>(transform); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3f227a0a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java index 1ff5de2..aeb75ed 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java @@ -121,13 +121,13 @@ public class WriteWithShardingFactoryTest { public void withShardingSpecifiesOriginalTransform() { Write.Bound<Object> original = Write.to(new TestSink()).withNumShards(3); - assertThat(factory.override(original), equalTo((Object) original)); + assertThat(factory.getReplacementTransform(original), equalTo((Object) original)); } @Test public void 
withNoShardingSpecifiedReturnsNewTransform() { Write.Bound<Object> original = Write.to(new TestSink()); - assertThat(factory.override(original), not(equalTo((Object) original))); + assertThat(factory.getReplacementTransform(original), not(equalTo((Object) original))); } @Test http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3f227a0a/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java index c55cd5e..2659659 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java @@ -85,6 +85,9 @@ public @interface Experimental { SPLITTABLE_DO_FN, /** Metrics-related experimental APIs. */ - METRICS + METRICS, + + /** Experimental runner APIs. Should not be used by pipeline authors. */ + CORE_RUNNERS_ONLY } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3f227a0a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PTransformOverrideFactory.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PTransformOverrideFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PTransformOverrideFactory.java new file mode 100644 index 0000000..f6e90e2 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PTransformOverrideFactory.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.beam.sdk.runners; + +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.annotations.Experimental.Kind; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.values.PInput; +import org.apache.beam.sdk.values.POutput; + +/** + * Produces {@link PipelineRunner}-specific overrides of {@link PTransform PTransforms}, and + * provides mappings between original and replacement outputs. + */ +@Experimental(Kind.CORE_RUNNERS_ONLY) +public interface PTransformOverrideFactory< + InputT extends PInput, + OutputT extends POutput, + TransformT extends PTransform<? super InputT, OutputT>> { + /** + * Returns a {@link PTransform} that produces equivalent output to the provided transform. + */ + PTransform<InputT, OutputT> getReplacementTransform(TransformT transform); +}