Repository: beam Updated Branches: refs/heads/master 0c26d024d -> e6f94a85a
BEAM-1766 Remove Aggregators from Apex runner Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/62fc6489 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/62fc6489 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/62fc6489 Branch: refs/heads/master Commit: 62fc6489699130b50237731105884d3a3f993db9 Parents: 634bf4e Author: Thomas Weise <[email protected]> Authored: Thu Apr 27 09:13:19 2017 -0700 Committer: Thomas Weise <[email protected]> Committed: Thu Apr 27 09:13:19 2017 -0700 ---------------------------------------------------------------------- .../operators/ApexParDoOperator.java | 47 +------------------- 1 file changed, 2 insertions(+), 45 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/62fc6489/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java index 52d1d43..f4c617d 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java @@ -39,11 +39,9 @@ import org.apache.beam.runners.apex.translation.utils.NoOpStepContext; import org.apache.beam.runners.apex.translation.utils.SerializablePipelineOptions; import org.apache.beam.runners.apex.translation.utils.StateInternalsProxy; import org.apache.beam.runners.apex.translation.utils.ValueAndCoderKryoSerializable; -import org.apache.beam.runners.core.AggregatorFactory; import org.apache.beam.runners.core.DoFnRunner; import org.apache.beam.runners.core.DoFnRunners; import org.apache.beam.runners.core.DoFnRunners.OutputManager; -import org.apache.beam.runners.core.ExecutionContext; import org.apache.beam.runners.core.InMemoryTimerInternals; import org.apache.beam.runners.core.KeyedWorkItem; import org.apache.beam.runners.core.PushbackSideInputDoFnRunner; @@ -57,8 +55,6 @@ import org.apache.beam.runners.core.TimerInternalsFactory; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.ListCoder; import org.apache.beam.sdk.coders.VoidCoder; -import org.apache.beam.sdk.transforms.Aggregator; -import org.apache.beam.sdk.transforms.Combine.CombineFn; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.reflect.DoFnInvoker; import org.apache.beam.sdk.transforms.reflect.DoFnInvokers; @@ -337,7 +333,7 @@ public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements mainOutputTag, additionalOutputTags, stepContext, - new NoOpAggregatorFactory(), + null, windowingStrategy ); @@ -362,7 +358,7 @@ public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements doFn, doFnRunner, stepContext, - new NoOpAggregatorFactory(), + null, windowingStrategy, cleanupTimer, stateCleaner); @@ -387,45 +383,6 @@ public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements public void endWindow() { } - /** - * TODO: Placeholder for aggregation, to be implemented for embedded and cluster mode. - * It is called from {@link org.apache.beam.runners.core.SimpleDoFnRunner}. - */ - public static class NoOpAggregatorFactory implements AggregatorFactory { - - private NoOpAggregatorFactory() { - } - - @Override - public <InputT, AccumT, OutputT> Aggregator<InputT, OutputT> createAggregatorForDoFn( - Class<?> fnClass, ExecutionContext.StepContext step, - String name, CombineFn<InputT, AccumT, OutputT> combine) { - return new NoOpAggregator<>(); - } - - private static class NoOpAggregator<InputT, OutputT> implements Aggregator<InputT, OutputT>, - java.io.Serializable { - private static final long serialVersionUID = 1L; - - @Override - public void addValue(InputT value) { - } - - @Override - public String getName() { - // TODO Auto-generated method stub - return null; - } - - @Override - public CombineFn<InputT, ?, OutputT> getCombineFn() { - // TODO Auto-generated method stub - return null; - } - - }; - } - private static class LongMin { long state = Long.MAX_VALUE;
