Renames ParDo.getNewFn to getFn
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/6b502fc1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/6b502fc1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/6b502fc1 Branch: refs/heads/master Commit: 6b502fc111af266c7b1a0e6f7d473c36f57281a2 Parents: 33ed323 Author: Eugene Kirpichov <kirpic...@google.com> Authored: Fri Dec 9 17:29:41 2016 -0800 Committer: Eugene Kirpichov <kirpic...@google.com> Committed: Thu Dec 15 13:59:11 2016 -0800 ---------------------------------------------------------------------- .../translation/ParDoBoundMultiTranslator.java | 2 +- .../apex/translation/ParDoBoundTranslator.java | 2 +- .../beam/runners/core/SplittableParDo.java | 4 ++-- .../runners/direct/ParDoEvaluatorFactory.java | 2 +- .../direct/ParDoMultiOverrideFactory.java | 2 +- .../ParDoSingleViaMultiOverrideFactory.java | 4 ++-- .../direct/StatefulParDoEvaluatorFactory.java | 4 ++-- .../FlinkBatchTransformTranslators.java | 4 ++-- .../FlinkStreamingTransformTranslators.java | 12 +++++------ .../dataflow/DataflowPipelineTranslator.java | 8 +++---- .../spark/translation/TransformTranslator.java | 4 ++-- .../streaming/StreamingTransformTranslator.java | 4 ++-- .../beam/sdk/AggregatorPipelineExtractor.java | 4 ++-- .../org/apache/beam/sdk/transforms/ParDo.java | 22 ++++++++++---------- .../sdk/AggregatorPipelineExtractorTest.java | 12 +++++------ 15 files changed, 45 insertions(+), 45 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b502fc1/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundMultiTranslator.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundMultiTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundMultiTranslator.java index 574ce8f..bff7652 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundMultiTranslator.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundMultiTranslator.java @@ -53,7 +53,7 @@ class ParDoBoundMultiTranslator<InputT, OutputT> @Override public void translate(ParDo.BoundMulti<InputT, OutputT> transform, TranslationContext context) { - DoFn<InputT, OutputT> doFn = transform.getNewFn(); + DoFn<InputT, OutputT> doFn = transform.getFn(); DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass()); if (signature.stateDeclarations().size() > 0) { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b502fc1/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslator.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslator.java index de78628..3b6eb6e 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslator.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslator.java @@ -40,7 +40,7 @@ class ParDoBoundTranslator<InputT, OutputT> @Override public void translate(ParDo.Bound<InputT, OutputT> transform, TranslationContext context) { - DoFn<InputT, OutputT> doFn = transform.getNewFn(); + DoFn<InputT, OutputT> doFn = transform.getFn(); DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass()); if (signature.stateDeclarations().size() > 0) { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b502fc1/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java index 720db63..f8d12ec 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java @@ -104,7 +104,7 @@ public class SplittableParDo<InputT, OutputT, RestrictionT> checkNotNull(parDo, "parDo must not be null"); this.parDo = parDo; checkArgument( - DoFnSignatures.getSignature(parDo.getNewFn().getClass()).processElement().isSplittable(), + DoFnSignatures.getSignature(parDo.getFn().getClass()).processElement().isSplittable(), "fn must be a splittable DoFn"); } @@ -114,7 +114,7 @@ public class SplittableParDo<InputT, OutputT, RestrictionT> } private PCollectionTuple applyTyped(PCollection<InputT> input) { - DoFn<InputT, OutputT> fn = parDo.getNewFn(); + DoFn<InputT, OutputT> fn = parDo.getFn(); Coder<RestrictionT> restrictionCoder = DoFnInvokers.invokerFor(fn) .invokeGetRestrictionCoder(input.getPipeline().getCoderRegistry()); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b502fc1/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java index ec5dc2c..b4684e3 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java @@ -65,7 +65,7 @@ final class ParDoEvaluatorFactory<InputT, OutputT> implements TransformEvaluator application; ParDo.BoundMulti<InputT, OutputT> transform = parDoApplication.getTransform(); - final DoFn<InputT, OutputT> doFn = transform.getNewFn(); + final DoFn<InputT, OutputT> doFn = transform.getFn(); @SuppressWarnings({"unchecked", "rawtypes"}) TransformEvaluator<T> evaluator = http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b502fc1/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java index 4e7914f..4401434 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java @@ -46,7 +46,7 @@ class ParDoMultiOverrideFactory<InputT, OutputT> public PTransform<PCollection<? extends InputT>, PCollectionTuple> override( ParDo.BoundMulti<InputT, OutputT> transform) { - DoFn<InputT, OutputT> fn = transform.getNewFn(); + DoFn<InputT, OutputT> fn = transform.getFn(); DoFnSignature signature = DoFnSignatures.getSignature(fn.getClass()); if (signature.processElement().isSplittable()) { return new SplittableParDo(transform); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b502fc1/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleViaMultiOverrideFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleViaMultiOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleViaMultiOverrideFactory.java index 10530bb..5fcf49c 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleViaMultiOverrideFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleViaMultiOverrideFactory.java @@ -56,12 +56,12 @@ class ParDoSingleViaMultiOverrideFactory<InputT, OutputT> PCollectionTuple outputs = input.apply( - ParDo.of(underlyingParDo.getNewFn()) + ParDo.of(underlyingParDo.getFn()) .withSideInputs(underlyingParDo.getSideInputs()) .withOutputTags(mainOutputTag, TupleTagList.empty())); PCollection<OutputT> output = outputs.get(mainOutputTag); - output.setTypeDescriptor(underlyingParDo.getNewFn().getOutputTypeDescriptor()); + output.setTypeDescriptor(underlyingParDo.getFn().getOutputTypeDescriptor()); return output; } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b502fc1/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java index 1f3286c..1f64d9a 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java @@ -86,7 +86,7 @@ final class StatefulParDoEvaluatorFactory<K, InputT, OutputT> implements Transfo throws Exception { final DoFn<KV<K, InputT>, OutputT> doFn = - application.getTransform().getUnderlyingParDo().getNewFn(); + application.getTransform().getUnderlyingParDo().getFn(); final DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass()); // If the DoFn is stateful, schedule state clearing. @@ -141,7 +141,7 @@ final class StatefulParDoEvaluatorFactory<K, InputT, OutputT> implements Transfo WindowingStrategy<?, ?> windowingStrategy = pc.getWindowingStrategy(); BoundedWindow window = transformOutputWindow.getWindow(); final DoFn<?, ?> doFn = - transformOutputWindow.getTransform().getTransform().getUnderlyingParDo().getNewFn(); + transformOutputWindow.getTransform().getTransform().getUnderlyingParDo().getFn(); final DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass()); final DirectStepContext stepContext = http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b502fc1/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java index 497b293..eb625b2 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java @@ -516,7 +516,7 @@ class FlinkBatchTransformTranslators { ParDo.Bound<InputT, OutputT> transform, FlinkBatchTranslationContext context) { - DoFn<InputT, OutputT> doFn = transform.getNewFn(); + DoFn<InputT, OutputT> doFn = transform.getFn(); rejectStateAndTimers(doFn); DataSet<WindowedValue<InputT>> inputDataSet = @@ -562,7 +562,7 @@ class FlinkBatchTransformTranslators { public void translateNode( ParDo.BoundMulti<InputT, OutputT> transform, FlinkBatchTranslationContext context) { - DoFn<InputT, OutputT> doFn = transform.getNewFn(); + DoFn<InputT, OutputT> doFn = transform.getFn(); rejectStateAndTimers(doFn); DataSet<WindowedValue<InputT>> inputDataSet = context.getInputDataSet(context.getInput(transform)); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b502fc1/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java index 42ef630..ffa6d16 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java @@ -338,7 +338,7 @@ public class FlinkStreamingTransformTranslators { ParDo.Bound<InputT, OutputT> transform, FlinkStreamingTranslationContext context) { - DoFn<InputT, OutputT> doFn = transform.getNewFn(); + DoFn<InputT, OutputT> doFn = transform.getFn(); rejectStateAndTimers(doFn); WindowingStrategy<?, ?> windowingStrategy = @@ -358,7 +358,7 @@ public class FlinkStreamingTransformTranslators { if (sideInputs.isEmpty()) { DoFnOperator<InputT, OutputT, WindowedValue<OutputT>> doFnOperator = new DoFnOperator<>( - transform.getNewFn(), + transform.getFn(), inputTypeInfo, new TupleTag<OutputT>("main output"), Collections.<TupleTag<?>>emptyList(), @@ -381,7 +381,7 @@ public class FlinkStreamingTransformTranslators { DoFnOperator<InputT, OutputT, WindowedValue<OutputT>> doFnOperator = new DoFnOperator<>( - transform.getNewFn(), + transform.getFn(), inputTypeInfo, new TupleTag<OutputT>("main output"), Collections.<TupleTag<?>>emptyList(), @@ -490,7 +490,7 @@ public class FlinkStreamingTransformTranslators { ParDo.BoundMulti<InputT, OutputT> transform, FlinkStreamingTranslationContext context) { - DoFn<InputT, OutputT> doFn = transform.getNewFn(); + DoFn<InputT, OutputT> doFn = transform.getFn(); rejectStateAndTimers(doFn); // we assume that the transformation does not change the windowing strategy. @@ -515,7 +515,7 @@ public class FlinkStreamingTransformTranslators { if (sideInputs.isEmpty()) { DoFnOperator<InputT, OutputT, RawUnionValue> doFnOperator = new DoFnOperator<>( - transform.getNewFn(), + transform.getFn(), inputTypeInfo, transform.getMainOutputTag(), transform.getSideOutputTags().getAll(), @@ -542,7 +542,7 @@ public class FlinkStreamingTransformTranslators { DoFnOperator<InputT, OutputT, RawUnionValue> doFnOperator = new DoFnOperator<>( - transform.getNewFn(), + transform.getFn(), inputTypeInfo, transform.getMainOutputTag(), transform.getSideOutputTags().getAll(), http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b502fc1/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java index a56690c..8d2b076 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java @@ -955,14 +955,14 @@ public class DataflowPipelineTranslator { private <InputT, OutputT> void translateMultiHelper( ParDo.BoundMulti<InputT, OutputT> transform, TranslationContext context) { - rejectStatefulDoFn(transform.getNewFn()); + rejectStatefulDoFn(transform.getFn()); context.addStep(transform, "ParallelDo"); translateInputs(context.getInput(transform), transform.getSideInputs(), context); BiMap<Long, TupleTag<?>> outputMap = translateOutputs(context.getOutput(transform), context); translateFn( - transform.getNewFn(), + transform.getFn(), context.getInput(transform).getWindowingStrategy(), transform.getSideInputs(), context.getInput(transform).getCoder(), @@ -985,13 +985,13 @@ public class DataflowPipelineTranslator { private <InputT, OutputT> void translateSingleHelper( ParDo.Bound<InputT, OutputT> transform, TranslationContext context) { - rejectStatefulDoFn(transform.getNewFn()); + rejectStatefulDoFn(transform.getFn()); context.addStep(transform, "ParallelDo"); translateInputs(context.getInput(transform), transform.getSideInputs(), context); long mainOutput = context.addOutput(context.getOutput(transform)); translateFn( - transform.getNewFn(), + transform.getFn(), context.getInput(transform).getWindowingStrategy(), transform.getSideInputs(), context.getInput(transform).getCoder(), http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b502fc1/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java index ac91892..5dd6beb 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java @@ -227,7 +227,7 @@ public final class TransformTranslator { return new TransformEvaluator<ParDo.Bound<InputT, OutputT>>() { @Override public void evaluate(ParDo.Bound<InputT, OutputT> transform, EvaluationContext context) { - DoFn<InputT, OutputT> doFn = transform.getNewFn(); + DoFn<InputT, OutputT> doFn = transform.getFn(); rejectStateAndTimers(doFn); @SuppressWarnings("unchecked") JavaRDD<WindowedValue<InputT>> inRDD = @@ -250,7 +250,7 @@ public final class TransformTranslator { return new TransformEvaluator<ParDo.BoundMulti<InputT, OutputT>>() { @Override public void evaluate(ParDo.BoundMulti<InputT, OutputT> transform, EvaluationContext context) { - DoFn<InputT, OutputT> doFn = transform.getNewFn(); + DoFn<InputT, OutputT> doFn = transform.getFn(); rejectStateAndTimers(doFn); @SuppressWarnings("unchecked") JavaRDD<WindowedValue<InputT>> inRDD = http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b502fc1/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java index 27204ed..070ccbb 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java @@ -344,7 +344,7 @@ final class StreamingTransformTranslator { @Override public void evaluate(final ParDo.Bound<InputT, OutputT> transform, final EvaluationContext context) { - final DoFn<InputT, OutputT> doFn = transform.getNewFn(); + final DoFn<InputT, OutputT> doFn = transform.getFn(); rejectStateAndTimers(doFn); final SparkRuntimeContext runtimeContext = context.getRuntimeContext(); final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>> sideInputs = @@ -378,7 +378,7 @@ final class StreamingTransformTranslator { @Override public void evaluate(final ParDo.BoundMulti<InputT, OutputT> transform, final EvaluationContext context) { - final DoFn<InputT, OutputT> doFn = transform.getNewFn(); + final DoFn<InputT, OutputT> doFn = transform.getFn(); rejectStateAndTimers(doFn); final SparkRuntimeContext runtimeContext = context.getRuntimeContext(); final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>> sideInputs = http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b502fc1/sdks/java/core/src/main/java/org/apache/beam/sdk/AggregatorPipelineExtractor.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/AggregatorPipelineExtractor.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/AggregatorPipelineExtractor.java index ade5978..c79f779 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/AggregatorPipelineExtractor.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/AggregatorPipelineExtractor.java @@ -70,10 +70,10 @@ class AggregatorPipelineExtractor { private Collection<Aggregator<?, ?>> getAggregators(PTransform<?, ?> transform) { if (transform != null) { if (transform instanceof ParDo.Bound) { - return AggregatorRetriever.getAggregators(((ParDo.Bound<?, ?>) transform).getNewFn()); + return AggregatorRetriever.getAggregators(((ParDo.Bound<?, ?>) transform).getFn()); } else if (transform instanceof ParDo.BoundMulti) { return AggregatorRetriever.getAggregators( - ((ParDo.BoundMulti<?, ?>) transform).getNewFn()); + ((ParDo.BoundMulti<?, ?>) transform).getFn()); } } return Collections.emptyList(); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b502fc1/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java index d2149c0..f897f82 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java @@ -745,7 +745,7 @@ public class ParDo { @Override public PCollection<OutputT> expand(PCollection<? extends InputT> input) { checkArgument( - !isSplittable(getNewFn()), + !isSplittable(getFn()), "%s does not support Splittable DoFn", input.getPipeline().getOptions().getRunner().getName()); validateWindowType(input, fn); @@ -753,7 +753,7 @@ public class ParDo { input.getPipeline(), input.getWindowingStrategy(), input.isBounded()) - .setTypeDescriptor(getNewFn().getOutputTypeDescriptor()); + .setTypeDescriptor(getFn().getOutputTypeDescriptor()); } @Override @@ -761,14 +761,14 @@ public class ParDo { protected Coder<OutputT> getDefaultOutputCoder(PCollection<? extends InputT> input) throws CannotProvideCoderException { return input.getPipeline().getCoderRegistry().getDefaultCoder( - getNewFn().getOutputTypeDescriptor(), - getNewFn().getInputTypeDescriptor(), + getFn().getOutputTypeDescriptor(), + getFn().getInputTypeDescriptor(), ((PCollection<InputT>) input).getCoder()); } @Override protected String getKindString() { - Class<?> clazz = getNewFn().getClass(); + Class<?> clazz = getFn().getClass(); if (clazz.isAnonymousClass()) { return "AnonymousParDo"; } else { @@ -789,7 +789,7 @@ public class ParDo { ParDo.populateDisplayData(builder, (HasDisplayData) fn, fnDisplayData); } - public DoFn<InputT, OutputT> getNewFn() { + public DoFn<InputT, OutputT> getFn() { return fn; } @@ -952,7 +952,7 @@ public class ParDo { @Override public PCollectionTuple expand(PCollection<? extends InputT> input) { checkArgument( - !isSplittable(getNewFn()), + !isSplittable(getFn()), "%s does not support Splittable DoFn", input.getPipeline().getOptions().getRunner().getName()); validateWindowType(input, fn); @@ -965,7 +965,7 @@ public class ParDo { // The fn will likely be an instance of an anonymous subclass // such as DoFn<Integer, String> { }, thus will have a high-fidelity // TypeDescriptor for the output type. - outputs.get(mainOutputTag).setTypeDescriptor(getNewFn().getOutputTypeDescriptor()); + outputs.get(mainOutputTag).setTypeDescriptor(getFn().getOutputTypeDescriptor()); return outputs; } @@ -984,13 +984,13 @@ public class ParDo { Coder<InputT> inputCoder = ((PCollection<InputT>) input).getCoder(); return input.getPipeline().getCoderRegistry().getDefaultCoder( output.getTypeDescriptor(), - getNewFn().getInputTypeDescriptor(), + getFn().getInputTypeDescriptor(), inputCoder); } @Override protected String getKindString() { - Class<?> clazz = getNewFn().getClass(); + Class<?> clazz = getFn().getClass(); if (clazz.isAnonymousClass()) { return "AnonymousParMultiDo"; } else { @@ -1004,7 +1004,7 @@ public class ParDo { ParDo.populateDisplayData(builder, fn, fnDisplayData); } - public DoFn<InputT, OutputT> getNewFn() { + public DoFn<InputT, OutputT> getFn() { return fn; } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b502fc1/sdks/java/core/src/test/java/org/apache/beam/sdk/AggregatorPipelineExtractorTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/AggregatorPipelineExtractorTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/AggregatorPipelineExtractorTest.java index c4e9b8a..1bf2c3d 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/AggregatorPipelineExtractorTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/AggregatorPipelineExtractorTest.java @@ -68,7 +68,7 @@ public class AggregatorPipelineExtractorTest { @SuppressWarnings("rawtypes") ParDo.Bound bound = mock(ParDo.Bound.class, "Bound"); AggregatorProvidingDoFn<ThreadGroup, StrictMath> fn = new AggregatorProvidingDoFn<>(); - when(bound.getNewFn()).thenReturn(fn); + when(bound.getFn()).thenReturn(fn); Aggregator<Long, Long> aggregatorOne = fn.addAggregator(new Sum.SumLongFn()); Aggregator<Integer, Integer> aggregatorTwo = fn.addAggregator(new Min.MinIntegerFn()); @@ -96,7 +96,7 @@ public class AggregatorPipelineExtractorTest { @SuppressWarnings("rawtypes") ParDo.BoundMulti bound = mock(ParDo.BoundMulti.class, "BoundMulti"); AggregatorProvidingDoFn<Object, Void> fn = new AggregatorProvidingDoFn<>(); - when(bound.getNewFn()).thenReturn(fn); + when(bound.getFn()).thenReturn(fn); Aggregator<Long, Long> aggregatorOne = fn.addAggregator(new Max.MaxLongFn()); Aggregator<Double, Double> aggregatorTwo = fn.addAggregator(new Min.MinDoubleFn()); @@ -126,8 +126,8 @@ public class AggregatorPipelineExtractorTest { @SuppressWarnings("rawtypes") ParDo.BoundMulti otherBound = mock(ParDo.BoundMulti.class, "otherBound"); AggregatorProvidingDoFn<String, Math> fn = new AggregatorProvidingDoFn<>(); - when(bound.getNewFn()).thenReturn(fn); - when(otherBound.getNewFn()).thenReturn(fn); + when(bound.getFn()).thenReturn(fn); + when(otherBound.getFn()).thenReturn(fn); Aggregator<Long, Long> aggregatorOne = fn.addAggregator(new Sum.SumLongFn()); Aggregator<Double, Double> aggregatorTwo = fn.addAggregator(new Min.MinDoubleFn()); @@ -162,7 +162,7 @@ public class AggregatorPipelineExtractorTest { AggregatorProvidingDoFn<ThreadGroup, Void> fn = new AggregatorProvidingDoFn<>(); Aggregator<Long, Long> aggregatorOne = fn.addAggregator(new Sum.SumLongFn()); - when(bound.getNewFn()).thenReturn(fn); + when(bound.getFn()).thenReturn(fn); @SuppressWarnings("rawtypes") ParDo.BoundMulti otherBound = mock(ParDo.BoundMulti.class, "otherBound"); @@ -170,7 +170,7 @@ public class AggregatorPipelineExtractorTest { AggregatorProvidingDoFn<Long, Long> otherFn = new AggregatorProvidingDoFn<>(); Aggregator<Double, Double> aggregatorTwo = otherFn.addAggregator(new Sum.SumDoubleFn()); - when(otherBound.getNewFn()).thenReturn(otherFn); + when(otherBound.getFn()).thenReturn(otherFn); TransformHierarchy.Node transformNode = mock(TransformHierarchy.Node.class); when(transformNode.getTransform()).thenReturn(bound);