Full removal of Aggregators in Java SDK and Runners
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/615761a7 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/615761a7 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/615761a7 Branch: refs/heads/master Commit: 615761a77d2da6229dfa2cad5376d265afea8a62 Parents: 5bfd3e0 Author: Pablo <pabl...@google.com> Authored: Tue May 2 14:49:39 2017 -0700 Committer: Dan Halperin <dhalp...@google.com> Committed: Tue May 2 16:37:27 2017 -0700 ---------------------------------------------------------------------- .../cookbook/CombinePerKeyExamples.java | 4 - .../beam/runners/core/AggregatorFactory.java | 38 ---- .../apache/beam/runners/core/DoFnRunners.java | 27 ++- .../apache/beam/runners/core/LateDataUtils.java | 6 +- .../beam/runners/core/SimpleDoFnRunner.java | 1 - .../beam/runners/core/SimpleOldDoFnRunner.java | 3 - .../core/LateDataDroppingDoFnRunnerTest.java | 27 --- .../beam/runners/core/SimpleDoFnRunnerTest.java | 9 - .../runners/core/SimpleOldDoFnRunnerTest.java | 2 +- .../runners/core/StatefulDoFnRunnerTest.java | 1 - .../runners/direct/AggregatorContainer.java | 200 ------------------- .../beam/runners/direct/EvaluationContext.java | 23 +-- .../GroupAlsoByWindowEvaluatorFactory.java | 23 +-- .../beam/runners/direct/ParDoEvaluator.java | 14 +- ...littableProcessElementsEvaluatorFactory.java | 2 - .../direct/StatefulParDoEvaluatorFactory.java | 1 - .../runners/direct/StepTransformResult.java | 8 - .../beam/runners/direct/TransformResult.java | 6 - .../beam/runners/direct/ParDoEvaluatorTest.java | 5 - .../beam/runners/spark/SparkPipelineResult.java | 5 - .../spark/aggregators/SparkAggregators.java | 110 ---------- .../SparkGroupAlsoByWindowViaWindowSet.java | 57 ++---- .../spark/translation/MultiDoFnFunction.java | 2 - ...SparkGroupAlsoByWindowViaOutputBufferFn.java | 10 - .../spark/translation/SparkRuntimeContext.java | 81 -------- .../spark/aggregators/ClearAggregatorsRule.java | 38 ---- .../metrics/sink/NamedAggregatorsTest.java | 101 ---------- .../beam/sdk/AggregatorPipelineExtractor.java | 84 -------- .../beam/sdk/AggregatorRetrievalException.java | 33 --- .../org/apache/beam/sdk/AggregatorValues.java | 51 ----- .../main/java/org/apache/beam/sdk/Pipeline.java | 10 - .../beam/sdk/annotations/Experimental.java | 3 - .../apache/beam/sdk/transforms/Aggregator.java | 14 +- .../sdk/transforms/DelegatingAggregator.java | 126 ------------ .../org/apache/beam/sdk/transforms/Latest.java | 2 - .../harness/control/ProcessBundleHandler.java | 2 - .../fn/harness/fake/FakeAggregatorFactory.java | 52 ----- 37 files changed, 52 insertions(+), 1129 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/615761a7/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java ---------------------------------------------------------------------- diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java index 39553a5..693f0c4 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java +++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java @@ -45,10 +45,6 @@ import org.apache.beam.sdk.values.PCollection; * list of play names in which that word appears, and saves this information * to a bigquery table. * - * <p>Concepts: the Combine.perKey transform, which lets you combine the values in a - * key-grouped Collection, and how to use an Aggregator to track information in the - * Monitoring UI. - * * <p>Note: Before running this example, you must create a BigQuery dataset to contain your output * table. * http://git-wip-us.apache.org/repos/asf/beam/blob/615761a7/runners/core-java/src/main/java/org/apache/beam/runners/core/AggregatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/AggregatorFactory.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/AggregatorFactory.java deleted file mode 100644 index 24a605f..0000000 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/AggregatorFactory.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.core; - -import org.apache.beam.sdk.transforms.Aggregator; -import org.apache.beam.sdk.transforms.Combine.CombineFn; -import org.apache.beam.sdk.transforms.DoFn; - -/** - * A factory for creating aggregators. - */ -public interface AggregatorFactory { - /** - * Create an aggregator with the given {@code name} and {@link CombineFn}. - * - * <p>This method is called to create an aggregator for a {@link DoFn}. It receives the - * class of the {@link DoFn} being executed and the context of the step it is being - * executed in. - */ - <InputT, AccumT, OutputT> Aggregator<InputT, OutputT> createAggregatorForDoFn( - Class<?> fnClass, ExecutionContext.StepContext stepContext, - String aggregatorName, CombineFn<InputT, AccumT, OutputT> combine); -} http://git-wip-us.apache.org/repos/asf/beam/blob/615761a7/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java index 26e57f5..fe33af7 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java @@ -48,6 +48,27 @@ public class DoFnRunners { <T> void output(TupleTag<T> tag, WindowedValue<T> output); } + @Deprecated + public static <InputT, OutputT> DoFnRunner<InputT, OutputT> simpleRunner( + PipelineOptions options, + DoFn<InputT, OutputT> fn, + SideInputReader sideInputReader, + OutputManager outputManager, + TupleTag<OutputT> mainOutputTag, + List<TupleTag<?>> additionalOutputTags, + StepContext stepContext, + Object aggregatorFactory, + WindowingStrategy<?, ?> windowingStrategy) { + return simpleRunner(options, + fn, + sideInputReader, + outputManager, + mainOutputTag, + additionalOutputTags, + stepContext, + windowingStrategy); + } + /** * Returns an implementation of {@link DoFnRunner} that for a {@link DoFn}. * @@ -63,7 +84,6 @@ public class DoFnRunners { TupleTag<OutputT> mainOutputTag, List<TupleTag<?>> additionalOutputTags, StepContext stepContext, - AggregatorFactory aggregatorFactory, WindowingStrategy<?, ?> windowingStrategy) { return new SimpleDoFnRunner<>( options, @@ -73,7 +93,6 @@ public class DoFnRunners { mainOutputTag, additionalOutputTags, stepContext, - aggregatorFactory, windowingStrategy); } @@ -90,7 +109,6 @@ public class DoFnRunners { TupleTag<OutputT> mainOutputTag, List<TupleTag<?>> additionalOutputTags, StepContext stepContext, - AggregatorFactory aggregatorFactory, WindowingStrategy<?, ?> windowingStrategy) { return new SimpleOldDoFnRunner<>( options, @@ -100,7 +118,6 @@ public class DoFnRunners { mainOutputTag, additionalOutputTags, stepContext, - aggregatorFactory, windowingStrategy); } @@ -151,7 +168,6 @@ public class DoFnRunners { TupleTag<OutputT> mainOutputTag, List<TupleTag<?>> additionalOutputTags, StepContext stepContext, - AggregatorFactory aggregatorFactory, WindowingStrategy<?, ?> windowingStrategy) { return new ProcessFnRunner<>( simpleRunner( @@ -162,7 +178,6 @@ public class DoFnRunners { mainOutputTag, additionalOutputTags, stepContext, - aggregatorFactory, windowingStrategy), views, sideInputReader); http://git-wip-us.apache.org/repos/asf/beam/blob/615761a7/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataUtils.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataUtils.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataUtils.java index 17bd360..982d693 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataUtils.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataUtils.java @@ -22,7 +22,7 @@ import com.google.common.base.Predicate; import com.google.common.collect.FluentIterable; import com.google.common.collect.Iterables; import javax.annotation.Nullable; -import org.apache.beam.sdk.transforms.Aggregator; +import org.apache.beam.sdk.metrics.CounterCell; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.WindowTracing; import org.apache.beam.sdk.util.WindowedValue; @@ -42,7 +42,7 @@ public class LateDataUtils { Iterable<WindowedValue<V>> elements, final TimerInternals timerInternals, final WindowingStrategy<?, ?> windowingStrategy, - final Aggregator<Long, Long> droppedDueToLateness) { + final CounterCell droppedDueToLateness) { return FluentIterable.from(elements) .transformAndConcat( // Explode windows to filter out expired ones @@ -71,7 +71,7 @@ public class LateDataUtils { .isBefore(timerInternals.currentInputWatermarkTime()); if (expired) { // The element is too late for this window. - droppedDueToLateness.addValue(1L); + droppedDueToLateness.inc(); WindowTracing.debug( "GroupAlsoByWindow: Dropping element at {} for key: {}; " + "window: {} since it is too far behind inputWatermark: {}", http://git-wip-us.apache.org/repos/asf/beam/blob/615761a7/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java index edce9a2..8a3e25f 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java @@ -106,7 +106,6 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out TupleTag<OutputT> mainOutputTag, List<TupleTag<?>> additionalOutputTags, StepContext stepContext, - AggregatorFactory aggregatorFactory, WindowingStrategy<?, ?> windowingStrategy) { this.fn = fn; this.signature = DoFnSignatures.getSignature(fn.getClass()); http://git-wip-us.apache.org/repos/asf/beam/blob/615761a7/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java index b5f8f45..4c3149a 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java @@ -66,7 +66,6 @@ class SimpleOldDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, OutputT TupleTag<OutputT> mainOutputTag, List<TupleTag<?>> additionalOutputTags, StepContext stepContext, - AggregatorFactory aggregatorFactory, WindowingStrategy<?, ?> windowingStrategy) { this.fn = fn; this.context = new DoFnContext<>( @@ -77,7 +76,6 @@ class SimpleOldDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, OutputT mainOutputTag, additionalOutputTags, stepContext, - aggregatorFactory, windowingStrategy == null ? null : windowingStrategy.getWindowFn()); } @@ -181,7 +179,6 @@ class SimpleOldDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, OutputT TupleTag<OutputT> mainOutputTag, List<TupleTag<?>> additionalOutputTags, StepContext stepContext, - AggregatorFactory aggregatorFactory, WindowFn<?, ?> windowFn) { fn.super(); this.options = options; http://git-wip-us.apache.org/repos/asf/beam/blob/615761a7/runners/core-java/src/test/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunnerTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunnerTest.java index 74fb562..bf78427 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunnerTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunnerTest.java @@ -29,9 +29,6 @@ import org.apache.beam.runners.core.LateDataDroppingDoFnRunner.LateDataFilter; import org.apache.beam.sdk.metrics.MetricName; import org.apache.beam.sdk.metrics.MetricsContainer; import org.apache.beam.sdk.metrics.MetricsEnvironment; -import org.apache.beam.sdk.transforms.Aggregator; -import org.apache.beam.sdk.transforms.Combine.CombineFn; -import org.apache.beam.sdk.transforms.Sum; import org.apache.beam.sdk.transforms.windowing.FixedWindows; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.WindowedValue; @@ -102,28 +99,4 @@ public class LateDataDroppingDoFnRunnerTest { Arrays.asList(WINDOW_FN.assignWindow(timestamp)), PaneInfo.NO_FIRING); } - - private static class InMemoryLongSumAggregator implements Aggregator<Long, Long> { - private final String name; - private long sum = 0; - - public InMemoryLongSumAggregator(String name) { - this.name = name; - } - - @Override - public void addValue(Long value) { - sum += value; - } - - @Override - public String getName() { - return name; - } - - @Override - public CombineFn<Long, ?, Long> getCombineFn() { - return Sum.ofLongs(); - } - } } http://git-wip-us.apache.org/repos/asf/beam/blob/615761a7/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java index 4ae5332..3e404ad 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java @@ -86,7 +86,6 @@ public class SimpleDoFnRunnerTest { null, Collections.<TupleTag<?>>emptyList(), mockStepContext, - null, WindowingStrategy.of(new GlobalWindows())); thrown.expect(UserCodeException.class); @@ -107,7 +106,6 @@ public class SimpleDoFnRunnerTest { null, Collections.<TupleTag<?>>emptyList(), mockStepContext, - null, WindowingStrategy.of(new GlobalWindows())); thrown.expect(UserCodeException.class); @@ -138,7 +136,6 @@ public class SimpleDoFnRunnerTest { null, Collections.<TupleTag<?>>emptyList(), mockStepContext, - null, WindowingStrategy.of(new GlobalWindows())); // Setting the timer needs the current time, as it is set relative @@ -167,7 +164,6 @@ public class SimpleDoFnRunnerTest { null, Collections.<TupleTag<?>>emptyList(), mockStepContext, - null, WindowingStrategy.of(new GlobalWindows())); thrown.expect(UserCodeException.class); @@ -188,7 +184,6 @@ public class SimpleDoFnRunnerTest { null, Collections.<TupleTag<?>>emptyList(), mockStepContext, - null, WindowingStrategy.of(new GlobalWindows())); thrown.expect(UserCodeException.class); @@ -215,7 +210,6 @@ public class SimpleDoFnRunnerTest { null, Collections.<TupleTag<?>>emptyList(), mockStepContext, - null, WindowingStrategy.of(windowFn)); Instant currentTime = new Instant(42); @@ -255,7 +249,6 @@ public class SimpleDoFnRunnerTest { new TupleTag<Duration>(), Collections.<TupleTag<?>>emptyList(), mockStepContext, - null, WindowingStrategy.of(new GlobalWindows())); runner.startBundle(); @@ -292,7 +285,6 @@ public class SimpleDoFnRunnerTest { new TupleTag<Duration>(), Collections.<TupleTag<?>>emptyList(), mockStepContext, - null, WindowingStrategy.of(new GlobalWindows())); runner.startBundle(); @@ -330,7 +322,6 @@ public class SimpleDoFnRunnerTest { new TupleTag<Duration>(), Collections.<TupleTag<?>>emptyList(), mockStepContext, - null, WindowingStrategy.of(new GlobalWindows())); runner.startBundle(); http://git-wip-us.apache.org/repos/asf/beam/blob/615761a7/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleOldDoFnRunnerTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleOldDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleOldDoFnRunnerTest.java index 8ded2dc..a73ef5e 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleOldDoFnRunnerTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleOldDoFnRunnerTest.java @@ -67,7 +67,7 @@ public class SimpleOldDoFnRunnerTest { List<TupleTag<?>> additionalOutputTags = Arrays.asList(); StepContext context = mock(StepContext.class); return new SimpleOldDoFnRunner<>( - null, fn, null, null, null, additionalOutputTags, context, null, null); + null, fn, null, null, null, additionalOutputTags, context, null); } static class ThrowingDoFn extends OldDoFn<String, String> { http://git-wip-us.apache.org/repos/asf/beam/blob/615761a7/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java index f80643a..d4ff49e 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java @@ -204,7 +204,6 @@ public class StatefulDoFnRunnerTest { null, Collections.<TupleTag<?>>emptyList(), mockStepContext, - null, WINDOWING_STRATEGY); } http://git-wip-us.apache.org/repos/asf/beam/blob/615761a7/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AggregatorContainer.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AggregatorContainer.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AggregatorContainer.java deleted file mode 100644 index fd17704..0000000 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AggregatorContainer.java +++ /dev/null @@ -1,200 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.direct; - -import static com.google.common.base.Preconditions.checkState; - -import com.google.auto.value.AutoValue; -import java.util.Arrays; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import javax.annotation.Nullable; -import javax.annotation.concurrent.GuardedBy; -import org.apache.beam.runners.core.AggregatorFactory; -import org.apache.beam.runners.core.ExecutionContext; -import org.apache.beam.sdk.transforms.Aggregator; -import org.apache.beam.sdk.transforms.Combine.CombineFn; - -/** - * AccumT container for the current values associated with {@link Aggregator Aggregators}. - */ -public class AggregatorContainer { - - private static class AggregatorInfo<InputT, AccumT, OutputT> - implements Aggregator<InputT, OutputT> { - private final String stepName; - private final String name; - private final CombineFn<InputT, AccumT, OutputT> combiner; - @GuardedBy("this") - private volatile AccumT accumulator = null; - private boolean committed = false; - - private AggregatorInfo( - String stepName, String name, CombineFn<InputT, AccumT, OutputT> combiner) { - this.stepName = stepName; - this.name = name; - this.combiner = combiner; - } - - @Override - public synchronized void addValue(InputT input) { - checkState(!committed, "Cannot addValue after committing"); - if (accumulator == null) { - accumulator = combiner.createAccumulator(); - } - accumulator = combiner.addInput(accumulator, input); - } - - public synchronized OutputT getOutput() { - return accumulator == null ? null : combiner.extractOutput(accumulator); - } - - private void merge(AggregatorInfo<?, ?, ?> other) { - // Aggregators are only merged if they are the same (same step, same name). - // As a result, they should also have the same CombineFn, so this is safe. - AggregatorInfo<InputT, AccumT, OutputT> otherSafe = - (AggregatorInfo<InputT, AccumT, OutputT>) other; - mergeSafe(otherSafe); - } - - private synchronized void mergeSafe(AggregatorInfo<InputT, AccumT, OutputT> other) { - if (accumulator == null) { - accumulator = other.accumulator; - } else if (other.accumulator != null) { - accumulator = combiner.mergeAccumulators(Arrays.asList(accumulator, other.accumulator)); - } - } - - public String getStepName() { - return name; - } - - @Override - public String getName() { - return name; - } - - @Override - public CombineFn<InputT, ?, OutputT> getCombineFn() { - return combiner; - } - } - - private final ConcurrentMap<AggregatorKey, AggregatorInfo<?, ?, ?>> accumulators = - new ConcurrentHashMap<>(); - - private AggregatorContainer() { - } - - public static AggregatorContainer create() { - return new AggregatorContainer(); - } - - @Nullable - <OutputT> OutputT getAggregate(String stepName, String aggregatorName) { - AggregatorInfo<?, ?, OutputT> aggregatorInfo = - (AggregatorInfo<?, ?, OutputT>) accumulators.get( - AggregatorKey.create(stepName, aggregatorName)); - return aggregatorInfo == null ? null : aggregatorInfo.getOutput(); - } - - public Mutator createMutator() { - return new Mutator(this); - } - - /** - * AccumT class for mutations to the aggregator values. - */ - public static class Mutator implements AggregatorFactory { - - private final Map<AggregatorKey, AggregatorInfo<?, ?, ?>> accumulatorDeltas = new HashMap<>(); - private final AggregatorContainer container; - private boolean committed = false; - - private Mutator(AggregatorContainer container) { - this.container = container; - } - - public void commit() { - checkState(!committed, "Should not be already committed"); - committed = true; - - for (Map.Entry<AggregatorKey, AggregatorInfo<?, ?, ?>> entry : accumulatorDeltas.entrySet()) { - AggregatorInfo<?, ?, ?> previous = container.accumulators.get(entry.getKey()); - entry.getValue().committed = true; - if (previous == null) { - previous = container.accumulators.putIfAbsent(entry.getKey(), entry.getValue()); - } - if (previous != null) { - previous.merge(entry.getValue()); - previous.committed = true; - } - } - } - - @Override - public <InputT, AccumT, OutputT> Aggregator<InputT, OutputT> createAggregatorForDoFn( - Class<?> fnClass, - ExecutionContext.StepContext step, - String name, - CombineFn<InputT, AccumT, OutputT> combine) { - return createAggregatorForStep(step, name, combine); - } - - public <InputT, AccumT, OutputT> Aggregator<InputT, OutputT> createSystemAggregator( - ExecutionContext.StepContext step, - String name, - CombineFn<InputT, AccumT, OutputT> combiner) { - return createAggregatorForStep(step, name, combiner); - } - - private <InputT, AccumT, OutputT> Aggregator<InputT, OutputT> createAggregatorForStep( - ExecutionContext.StepContext step, - String name, - CombineFn<InputT, AccumT, OutputT> combine) { - checkState(!committed, "Cannot create aggregators after committing"); - AggregatorKey key = AggregatorKey.create(step.getStepName(), name); - AggregatorInfo<?, ?, ?> aggregatorInfo = accumulatorDeltas.get(key); - if (aggregatorInfo != null) { - AggregatorInfo<InputT, ?, OutputT> typedAggregatorInfo = - (AggregatorInfo<InputT, ?, OutputT>) aggregatorInfo; - return typedAggregatorInfo; - } else { - AggregatorInfo<InputT, ?, OutputT> typedAggregatorInfo = - new AggregatorInfo<>(step.getStepName(), name, combine); - accumulatorDeltas.put(key, typedAggregatorInfo); - return typedAggregatorInfo; - } - } - } - - /** - * Aggregators are identified by a step name and an aggregator name. - */ - @AutoValue - public abstract static class AggregatorKey { - public static AggregatorKey create(String stepName, String aggregatorName) { - return new AutoValue_AggregatorContainer_AggregatorKey(stepName, aggregatorName); - } - - public abstract String getStepName(); - public abstract String aggregatorName(); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/615761a7/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java index f6d9a36..93d6f96 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java @@ -62,7 +62,7 @@ import org.joda.time.Instant; * <p>{@link EvaluationContext} contains shared state for an execution of the * {@link DirectRunner} that can be used while evaluating a {@link PTransform}. This * consists of views into underlying state and watermark implementations, access to read and write - * {@link PCollectionView PCollectionViews}, and managing the {@link AggregatorContainer} and + * {@link PCollectionView PCollectionViews}, and managing the * {@link ExecutionContext ExecutionContexts}. This includes executing callbacks asynchronously when * state changes to the appropriate point (e.g. when a {@link PCollectionView} is requested and * known to be empty). @@ -95,8 +95,6 @@ class EvaluationContext { private final SideInputContainer sideInputContainer; - private final AggregatorContainer mergedAggregators; - private final DirectMetrics metrics; private final Set<PValue> keyedPValues; @@ -126,7 +124,6 @@ class EvaluationContext { this.sideInputContainer = SideInputContainer.create(this, graph.getViews()); this.applicationStateInternals = new ConcurrentHashMap<>(); - this.mergedAggregators = AggregatorContainer.create(); this.metrics = new DirectMetrics(); this.callbackExecutor = @@ -174,10 +171,6 @@ class EvaluationContext { : completedBundle.withElements((Iterable) result.getUnprocessedElements()), committedBundles, outputTypes); - // Commit aggregator changes - if (result.getAggregatorChanges() != null) { - result.getAggregatorChanges().commit(); - } // Update state internals CopyOnAccessInMemoryStateInternals theirState = result.getState(); if (theirState != null) { @@ -362,20 +355,6 @@ class EvaluationContext { return sideInputContainer.createReaderForViews(sideInputs); } - /** - * Returns a new mutator for the {@link AggregatorContainer}. - */ - public AggregatorContainer.Mutator getAggregatorMutator() { - return mergedAggregators.createMutator(); - } - - /** - * Returns the counter container for this context. - */ - public AggregatorContainer getAggregatorContainer() { - return mergedAggregators; - } - /** Returns the metrics container for this pipeline. */ public DirectMetrics getMetrics() { return metrics; http://git-wip-us.apache.org/repos/asf/beam/blob/615761a7/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java index 9f567a4..d006553 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java @@ -42,10 +42,10 @@ import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupAlsoByWindow; import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle; import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.transforms.Aggregator; +import org.apache.beam.sdk.metrics.Counter; +import org.apache.beam.sdk.metrics.Metrics; import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.Sum; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.WindowTracing; @@ -113,11 +113,10 @@ class GroupAlsoByWindowEvaluatorFactory implements TransformEvaluatorFactory { private final StructuralKey<?> structuralKey; private final Collection<UncommittedBundle<?>> outputBundles; private final ImmutableList.Builder<WindowedValue<KeyedWorkItem<K, V>>> unprocessedElements; - private final AggregatorContainer.Mutator aggregatorChanges; private final SystemReduceFn<K, V, Iterable<V>, Iterable<V>, BoundedWindow> reduceFn; - private final Aggregator<Long, Long> droppedDueToClosedWindow; - private final Aggregator<Long, Long> droppedDueToLateness; + private final Counter droppedDueToClosedWindow; + private final Counter droppedDueToLateness; public GroupAlsoByWindowEvaluator( final EvaluationContext evaluationContext, @@ -140,17 +139,14 @@ class GroupAlsoByWindowEvaluatorFactory implements TransformEvaluatorFactory { outputBundles = new ArrayList<>(); unprocessedElements = ImmutableList.builder(); - aggregatorChanges = evaluationContext.getAggregatorMutator(); Coder<V> valueCoder = application.getTransform().getValueCoder(inputBundle.getPCollection().getCoder()); reduceFn = SystemReduceFn.buffering(valueCoder); - droppedDueToClosedWindow = aggregatorChanges.createSystemAggregator(stepContext, - GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER, - Sum.ofLongs()); - droppedDueToLateness = aggregatorChanges.createSystemAggregator(stepContext, - GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_LATENESS_COUNTER, - Sum.ofLongs()); + droppedDueToClosedWindow = Metrics.counter(GroupAlsoByWindowEvaluator.class, + GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER); + droppedDueToLateness = Metrics.counter(GroupAlsoByWindowEvaluator.class, + GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_LATENESS_COUNTER); } @Override @@ -197,7 +193,6 @@ class GroupAlsoByWindowEvaluatorFactory implements TransformEvaluatorFactory { .withState(state) .addOutput(outputBundles) .withTimerUpdate(stepContext.getTimerUpdate()) - .withAggregatorChanges(aggregatorChanges) .addUnprocessedElements(unprocessedElements.build()) .build(); } @@ -229,7 +224,7 @@ class GroupAlsoByWindowEvaluatorFactory implements TransformEvaluatorFactory { .isBefore(timerInternals.currentInputWatermarkTime()); if (expired) { // The element is too late for this window. - droppedDueToLateness.addValue(1L); + droppedDueToLateness.inc(); WindowTracing.debug( "GroupAlsoByWindow: Dropping element at {} for key: {}; " + "window: {} since it is too far behind inputWatermark: {}", http://git-wip-us.apache.org/repos/asf/beam/blob/615761a7/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java index 053da31..2ea8a91 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java @@ -54,7 +54,6 @@ class ParDoEvaluator<InputT> implements TransformEvaluator<InputT> { TupleTag<OutputT> mainOutputTag, List<TupleTag<?>> additionalOutputTags, DirectStepContext stepContext, - AggregatorContainer.Mutator aggregatorChanges, WindowingStrategy<?, ? extends BoundedWindow> windowingStrategy); } @@ -70,7 +69,6 @@ class ParDoEvaluator<InputT> implements TransformEvaluator<InputT> { TupleTag<OutputT> mainOutputTag, List<TupleTag<?>> additionalOutputTags, DirectStepContext stepContext, - AggregatorContainer.Mutator aggregatorChanges, WindowingStrategy<?, ? extends BoundedWindow> windowingStrategy) { DoFnRunner<InputT, OutputT> underlying = DoFnRunners.simpleRunner( @@ -81,7 +79,6 @@ class ParDoEvaluator<InputT> implements TransformEvaluator<InputT> { mainOutputTag, additionalOutputTags, stepContext, - aggregatorChanges, windowingStrategy); return SimplePushbackSideInputDoFnRunner.create(underlying, sideInputs, sideInputReader); } @@ -100,7 +97,6 @@ class ParDoEvaluator<InputT> implements TransformEvaluator<InputT> { List<TupleTag<?>> additionalOutputTags, Map<TupleTag<?>, PCollection<?>> outputs, DoFnRunnerFactory<InputT, OutputT> runnerFactory) { - AggregatorContainer.Mutator aggregatorChanges = evaluationContext.getAggregatorMutator(); BundleOutputManager outputManager = createOutputManager(evaluationContext, key, outputs); @@ -116,19 +112,17 @@ class ParDoEvaluator<InputT> implements TransformEvaluator<InputT> { mainOutputTag, additionalOutputTags, stepContext, - aggregatorChanges, windowingStrategy); - return create(runner, stepContext, application, aggregatorChanges, outputManager); + return create(runner, stepContext, application, outputManager); } public static <InputT, OutputT> ParDoEvaluator<InputT> create( PushbackSideInputDoFnRunner<InputT, OutputT> runner, DirectStepContext stepContext, AppliedPTransform<?, ?, ?> application, - AggregatorContainer.Mutator aggregatorChanges, BundleOutputManager outputManager) { - return new ParDoEvaluator<>(runner, application, aggregatorChanges, outputManager, stepContext); + return new ParDoEvaluator<>(runner, application, outputManager, stepContext); } static BundleOutputManager createOutputManager( @@ -155,7 +149,6 @@ class ParDoEvaluator<InputT> implements TransformEvaluator<InputT> { private final PushbackSideInputDoFnRunner<InputT, ?> fnRunner; private final AppliedPTransform<?, ?, ?> transform; - private final AggregatorContainer.Mutator aggregatorChanges; private final BundleOutputManager outputManager; private final DirectStepContext stepContext; @@ -164,14 +157,12 @@ class ParDoEvaluator<InputT> implements TransformEvaluator<InputT> { private ParDoEvaluator( PushbackSideInputDoFnRunner<InputT, ?> fnRunner, AppliedPTransform<?, ?, ?> transform, - AggregatorContainer.Mutator aggregatorChanges, BundleOutputManager outputManager, DirectStepContext stepContext) { this.fnRunner = fnRunner; this.transform = transform; this.outputManager = outputManager; this.stepContext = stepContext; - this.aggregatorChanges = aggregatorChanges; this.unprocessedElements = ImmutableList.builder(); try { @@ -222,7 +213,6 @@ class ParDoEvaluator<InputT> implements TransformEvaluator<InputT> { return resultBuilder .addOutput(outputManager.bundles.values()) .withTimerUpdate(stepContext.getTimerUpdate()) - .withAggregatorChanges(aggregatorChanges) .addUnprocessedElements(unprocessedElements.build()) .build(); } http://git-wip-us.apache.org/repos/asf/beam/blob/615761a7/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java index e0adc40..5f6b4f7 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java @@ -197,7 +197,6 @@ class SplittableProcessElementsEvaluatorFactory< TupleTag<OutputT> mainOutputTag, List<TupleTag<?>> additionalOutputTags, DirectExecutionContext.DirectStepContext stepContext, - AggregatorContainer.Mutator aggregatorChanges, WindowingStrategy<?, ? extends BoundedWindow> windowingStrategy) { ProcessFn<InputT, OutputT, RestrictionT, ?> processFn = (ProcessFn) fn; @@ -210,7 +209,6 @@ class SplittableProcessElementsEvaluatorFactory< mainOutputTag, additionalOutputTags, stepContext, - aggregatorChanges, windowingStrategy); } }; http://git-wip-us.apache.org/repos/asf/beam/blob/615761a7/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java index 93ab077..7cf3840 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java @@ -261,7 +261,6 @@ final class StatefulParDoEvaluatorFactory<K, InputT, OutputT> implements Transfo delegateResult.getTransform(), delegateResult.getWatermarkHold()) .withTimerUpdate(delegateResult.getTimerUpdate()) .withState(delegateResult.getState()) - .withAggregatorChanges(delegateResult.getAggregatorChanges()) .withMetricUpdates(delegateResult.getLogicalMetricUpdates()) .addOutput(Lists.newArrayList(delegateResult.getOutputBundles())); http://git-wip-us.apache.org/repos/asf/beam/blob/615761a7/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java index fe3ae97..2a2ccab 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java @@ -54,7 +54,6 @@ public abstract class StepTransformResult<InputT> implements TransformResult<Inp getTransform(), getOutputBundles(), getUnprocessedElements(), - getAggregatorChanges(), metricUpdates, getWatermarkHold(), getState(), @@ -72,7 +71,6 @@ public abstract class StepTransformResult<InputT> implements TransformResult<Inp private MetricUpdates metricUpdates; private CopyOnAccessInMemoryStateInternals state; private TimerUpdate timerUpdate; - private AggregatorContainer.Mutator aggregatorChanges; private final Set<OutputType> producedOutputs; private final Instant watermarkHold; @@ -91,7 +89,6 @@ public abstract class StepTransformResult<InputT> implements TransformResult<Inp transform, bundlesBuilder.build(), unprocessedElementsBuilder.build(), - aggregatorChanges, metricUpdates, watermarkHold, state, @@ -99,11 +96,6 @@ public abstract class StepTransformResult<InputT> implements TransformResult<Inp producedOutputs); } - public Builder<InputT> withAggregatorChanges(AggregatorContainer.Mutator aggregatorChanges) { - this.aggregatorChanges = aggregatorChanges; - return this; - } - public Builder<InputT> withMetricUpdates(MetricUpdates metricUpdates) { this.metricUpdates = metricUpdates; return this; http://git-wip-us.apache.org/repos/asf/beam/blob/615761a7/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformResult.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformResult.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformResult.java index bde44ca..3a95df7 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformResult.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformResult.java @@ -61,12 +61,6 @@ public interface TransformResult<InputT> { Iterable<? extends WindowedValue<InputT>> getUnprocessedElements(); /** - * Returns the {@link AggregatorContainer.Mutator} used by this {@link PTransform}, or null if - * this transform did not use an {@link AggregatorContainer.Mutator}. - */ - @Nullable AggregatorContainer.Mutator getAggregatorChanges(); - - /** * Returns the logical metric updates. */ MetricUpdates getLogicalMetricUpdates(); http://git-wip-us.apache.org/repos/asf/beam/blob/615761a7/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java index e99e4bf..69dbc22 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java @@ -150,11 +150,6 @@ public class ParDoEvaluatorTest { Mockito.any(AppliedPTransform.class), Mockito.any(StructuralKey.class))) .thenReturn(executionContext); - AggregatorContainer container = AggregatorContainer.create(); - AggregatorContainer.Mutator mutator = container.createMutator(); - when(evaluationContext.getAggregatorContainer()).thenReturn(container); - when(evaluationContext.getAggregatorMutator()).thenReturn(mutator); - @SuppressWarnings("unchecked") AppliedPTransform<PCollection<Integer>, ?, ?> transform = (AppliedPTransform<PCollection<Integer>, ?, ?>) DirectGraphs.getProducer(output); http://git-wip-us.apache.org/repos/asf/beam/blob/615761a7/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java index 1110a55..3e94a45 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java @@ -24,7 +24,6 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import org.apache.beam.runners.spark.aggregators.SparkAggregators; import org.apache.beam.runners.spark.metrics.SparkMetricResults; import org.apache.beam.runners.spark.translation.SparkContextFactory; import org.apache.beam.sdk.Pipeline; @@ -77,10 +76,6 @@ public abstract class SparkPipelineResult implements PipelineResult { protected abstract State awaitTermination(Duration duration) throws TimeoutException, ExecutionException, InterruptedException; - public <T> T getAggregatorValue(final String name, final Class<T> resultType) { - return SparkAggregators.valueOf(name, resultType); - } - @Override public PipelineResult.State getState() { return state; http://git-wip-us.apache.org/repos/asf/beam/blob/615761a7/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/SparkAggregators.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/SparkAggregators.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/SparkAggregators.java deleted file mode 100644 index 1da196b..0000000 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/SparkAggregators.java +++ /dev/null @@ -1,110 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.runners.spark.aggregators; - -import com.google.common.collect.ImmutableList; -import java.util.Collection; -import java.util.Map; -import org.apache.beam.runners.core.AggregatorFactory; -import org.apache.beam.runners.core.ExecutionContext; -import org.apache.beam.runners.spark.translation.SparkRuntimeContext; -import org.apache.beam.sdk.AggregatorValues; -import org.apache.beam.sdk.transforms.Aggregator; -import org.apache.beam.sdk.transforms.Combine; -import org.apache.spark.Accumulator; - -/** - * A utility class for handling Beam {@link Aggregator}s. - */ -public class SparkAggregators { - - private static <T> AggregatorValues<T> valueOf(final Accumulator<NamedAggregators> accum, - final Aggregator<?, T> aggregator) { - @SuppressWarnings("unchecked") - Class<T> valueType = (Class<T>) aggregator.getCombineFn().getOutputType().getRawType(); - final T value = valueOf(accum, aggregator.getName(), valueType); - - return new AggregatorValues<T>() { - - @Override - public Collection<T> getValues() { - return ImmutableList.of(value); - } - - @Override - public Map<String, T> getValuesAtSteps() { - throw new UnsupportedOperationException("getValuesAtSteps is not supported."); - } - }; - } - - private static <T> T valueOf(final Accumulator<NamedAggregators> accum, - final String aggregatorName, - final Class<T> typeClass) { - return accum.value().getValue(aggregatorName, typeClass); - } - - /** - * Retrieves the value of an aggregator from a SparkContext instance. - * - * @param aggregator The aggregator whose value to retrieve - * @param <T> The type of the aggregator's output - * @return The value of the aggregator - */ - public static <T> AggregatorValues<T> valueOf(final Aggregator<?, T> aggregator) { - return valueOf(AggregatorsAccumulator.getInstance(), aggregator); - } - - /** - * Retrieves the value of an aggregator from a SparkContext instance. - * - * @param name Name of the aggregator to retrieve the value of. - * @param typeClass Type class of value to be retrieved. - * @param <T> Type of object to be returned. - * @return The value of the aggregator. - */ - public static <T> T valueOf(final String name, final Class<T> typeClass) { - return valueOf(AggregatorsAccumulator.getInstance(), name, typeClass); - } - - /** - * An implementation of {@link AggregatorFactory} for the SparkRunner. - */ - public static class Factory implements AggregatorFactory { - - private final SparkRuntimeContext runtimeContext; - private final Accumulator<NamedAggregators> accumulator; - - public Factory(SparkRuntimeContext runtimeContext, Accumulator<NamedAggregators> accumulator) { - this.runtimeContext = runtimeContext; - this.accumulator = accumulator; - } - - @Override - public <InputT, AccumT, OutputT> Aggregator<InputT, OutputT> createAggregatorForDoFn( - Class<?> fnClass, - ExecutionContext.StepContext stepContext, - String aggregatorName, - Combine.CombineFn<InputT, AccumT, OutputT> combine) { - - return runtimeContext.createAggregator(accumulator, aggregatorName, combine); - } - } - -} http://git-wip-us.apache.org/repos/asf/beam/blob/615761a7/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java index c59e0e7..4a2851d 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java @@ -43,9 +43,9 @@ import org.apache.beam.runners.spark.util.GlobalWatermarkHolder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.IterableCoder; import org.apache.beam.sdk.coders.KvCoder; -import org.apache.beam.sdk.transforms.Aggregator; -import org.apache.beam.sdk.transforms.Combine; -import org.apache.beam.sdk.transforms.Sum; +import org.apache.beam.sdk.metrics.CounterCell; +import org.apache.beam.sdk.metrics.MetricName; +import org.apache.beam.sdk.metrics.MetricsContainer; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.WindowedValue; @@ -207,10 +207,13 @@ public class SparkGroupAlsoByWindowViaWindowSet { new OutputWindowedValueHolder<>(); // use in memory Aggregators since Spark Accumulators are not resilient // in stateful operators, once done with this partition. - final InMemoryLongSumAggregator droppedDueToClosedWindow = new InMemoryLongSumAggregator( - GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER); - final InMemoryLongSumAggregator droppedDueToLateness = new InMemoryLongSumAggregator( - GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_LATENESS_COUNTER); + final MetricsContainer cellProvider = new MetricsContainer("cellProvider"); + final CounterCell droppedDueToClosedWindow = cellProvider.getCounter( + MetricName.named(SparkGroupAlsoByWindowViaWindowSet.class, + GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER)); + final CounterCell droppedDueToLateness = cellProvider.getCounter( + MetricName.named(SparkGroupAlsoByWindowViaWindowSet.class, + GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_LATENESS_COUNTER)); AbstractIterator< Tuple2</*K*/ ByteArray, Tuple2<StateAndTimers, /*WV<KV<K, Itr<I>>>*/ List<byte[]>>>> @@ -315,15 +318,15 @@ public class SparkGroupAlsoByWindowViaWindowSet { }; // log if there's something to log. - long lateDropped = droppedDueToLateness.getSum(); + long lateDropped = droppedDueToLateness.getCumulative(); if (lateDropped > 0) { LOG.info(String.format("Dropped %d elements due to lateness.", lateDropped)); - droppedDueToLateness.zero(); + droppedDueToLateness.inc(-droppedDueToLateness.getCumulative()); } - long closedWindowDropped = droppedDueToClosedWindow.getSum(); + long closedWindowDropped = droppedDueToClosedWindow.getCumulative(); if (closedWindowDropped > 0) { LOG.info(String.format("Dropped %d elements due to closed window.", closedWindowDropped)); - droppedDueToClosedWindow.zero(); + droppedDueToClosedWindow.inc(-droppedDueToClosedWindow.getCumulative()); } return scala.collection.JavaConversions.asScalaIterator(outIter); @@ -421,36 +424,4 @@ public class SparkGroupAlsoByWindowViaWindowSet { "Tagged outputs are not allowed in GroupAlsoByWindow."); } } - - private static class InMemoryLongSumAggregator implements Aggregator<Long, Long> { - private final String name; - private long sum = 0; - - public void zero() { - sum = 0; - } - - public long getSum() { - return sum; - } - - InMemoryLongSumAggregator(String name) { - this.name = name; - } - - @Override - public void addValue(Long value) { - sum += value; - } - - @Override - public String getName() { - return name; - } - - @Override - public Combine.CombineFn<Long, ?, Long> getCombineFn() { - return Sum.ofLongs(); - } - } } http://git-wip-us.apache.org/repos/asf/beam/blob/615761a7/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java index 4cd1683..410b7de 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java @@ -28,7 +28,6 @@ import java.util.Map; import org.apache.beam.runners.core.DoFnRunner; import org.apache.beam.runners.core.DoFnRunners; import org.apache.beam.runners.spark.aggregators.NamedAggregators; -import org.apache.beam.runners.spark.aggregators.SparkAggregators; import org.apache.beam.runners.spark.metrics.SparkMetricsContainer; import org.apache.beam.runners.spark.util.SideInputBroadcast; import org.apache.beam.runners.spark.util.SparkSideInputReader; @@ -104,7 +103,6 @@ public class MultiDoFnFunction<InputT, OutputT> mainOutputTag, Collections.<TupleTag<?>>emptyList(), new SparkProcessContext.NoOpStepContext(), - new SparkAggregators.Factory(runtimeContext, aggAccum), windowingStrategy); DoFnRunnerWithMetrics<InputT, OutputT> doFnRunnerWithMetrics = http://git-wip-us.apache.org/repos/asf/beam/blob/615761a7/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java index 063feef..9ee52de 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java @@ -22,7 +22,6 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; import org.apache.beam.runners.core.GroupAlsoByWindowViaOutputBufferDoFn; -import org.apache.beam.runners.core.GroupAlsoByWindowsDoFn; import org.apache.beam.runners.core.InMemoryTimerInternals; import org.apache.beam.runners.core.OutputWindowedValue; import org.apache.beam.runners.core.ReduceFnRunner; @@ -35,8 +34,6 @@ import org.apache.beam.runners.core.construction.Triggers; import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine; import org.apache.beam.runners.core.triggers.TriggerStateMachines; import org.apache.beam.runners.spark.aggregators.NamedAggregators; -import org.apache.beam.sdk.transforms.Aggregator; -import org.apache.beam.sdk.transforms.Sum; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.WindowedValue; @@ -59,8 +56,6 @@ public class SparkGroupAlsoByWindowViaOutputBufferFn<K, InputT, W extends Bounde private final StateInternalsFactory<K> stateInternalsFactory; private final SystemReduceFn<K, InputT, Iterable<InputT>, Iterable<InputT>, W> reduceFn; private final SparkRuntimeContext runtimeContext; - private final Aggregator<Long, Long> droppedDueToClosedWindow; - public SparkGroupAlsoByWindowViaOutputBufferFn( WindowingStrategy<?, W> windowingStrategy, @@ -72,11 +67,6 @@ public class SparkGroupAlsoByWindowViaOutputBufferFn<K, InputT, W extends Bounde this.stateInternalsFactory = stateInternalsFactory; this.reduceFn = reduceFn; this.runtimeContext = runtimeContext; - - droppedDueToClosedWindow = runtimeContext.createAggregator( - accumulator, - GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER, - Sum.ofLongs()); } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/615761a7/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java index 6abab17..6bba863 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java @@ -22,19 +22,11 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import java.io.IOException; import java.io.Serializable; -import java.util.HashMap; -import java.util.Map; -import org.apache.beam.runners.spark.aggregators.NamedAggregators; import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.coders.CannotProvideCoderException; -import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderRegistry; import org.apache.beam.sdk.io.FileSystems; import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.transforms.Aggregator; -import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.util.IOChannelUtils; -import org.apache.spark.Accumulator; /** * The SparkRuntimeContext allows us to define useful features on the client side before our @@ -44,9 +36,6 @@ public class SparkRuntimeContext implements Serializable { private final String serializedPipelineOptions; private transient CoderRegistry coderRegistry; - // map for names to Beam aggregators. - private final Map<String, Aggregator<?, ?>> aggregators = new HashMap<>(); - SparkRuntimeContext(Pipeline pipeline) { this.serializedPipelineOptions = serializePipelineOptions(pipeline.getOptions()); } @@ -71,45 +60,6 @@ public class SparkRuntimeContext implements Serializable { return PipelineOptionsHolder.getOrInit(serializedPipelineOptions); } - /** - * Creates and aggregator and associates it with the specified name. - * - * @param accum Spark Accumulator. - * @param named Name of aggregator. - * @param combineFn Combine function used in aggregation. - * @param <InputT> Type of inputs to aggregator. - * @param <InterT> Intermediate data type - * @param <OutputT> Type of aggregator outputs. - * @return Specified aggregator - */ - public synchronized <InputT, InterT, OutputT> Aggregator<InputT, OutputT> createAggregator( - Accumulator<NamedAggregators> accum, - String named, - Combine.CombineFn<? super InputT, InterT, OutputT> combineFn) { - @SuppressWarnings("unchecked") - Aggregator<InputT, OutputT> aggregator = (Aggregator<InputT, OutputT>) aggregators.get(named); - try { - if (aggregator == null) { - @SuppressWarnings("unchecked") - final - NamedAggregators.CombineFunctionState<InputT, InterT, OutputT> state = - new NamedAggregators.CombineFunctionState<>( - (Combine.CombineFn<InputT, InterT, OutputT>) combineFn, - // hidden assumption: InputT == OutputT - (Coder<InputT>) getCoderRegistry().getCoder(combineFn.getOutputType()), - this); - - accum.add(new NamedAggregators(named, state)); - aggregator = new SparkAggregator<>(named, state); - aggregators.put(named, aggregator); - } - return aggregator; - } catch (CannotProvideCoderException e) { - throw new RuntimeException(String.format("Unable to create an aggregator named: [%s]", named), - e); - } - } - public CoderRegistry getCoderRegistry() { if (coderRegistry == null) { coderRegistry = CoderRegistry.createDefault(); @@ -135,35 +85,4 @@ public class SparkRuntimeContext implements Serializable { return pipelineOptions; } } - - /** - * Initialize spark aggregators exactly once. - * - * @param <InputT> Type of element fed in to aggregator. - */ - private static class SparkAggregator<InputT, OutputT> - implements Aggregator<InputT, OutputT>, Serializable { - private final String name; - private final NamedAggregators.State<InputT, ?, OutputT> state; - - SparkAggregator(String name, NamedAggregators.State<InputT, ?, OutputT> state) { - this.name = name; - this.state = state; - } - - @Override - public String getName() { - return name; - } - - @Override - public void addValue(InputT elem) { - state.update(elem); - } - - @Override - public Combine.CombineFn<InputT, ?, OutputT> getCombineFn() { - return state.getCombineFn(); - } - } } http://git-wip-us.apache.org/repos/asf/beam/blob/615761a7/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/ClearAggregatorsRule.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/ClearAggregatorsRule.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/ClearAggregatorsRule.java deleted file mode 100644 index 0b31acc..0000000 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/ClearAggregatorsRule.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.runners.spark.aggregators; - -import org.junit.rules.ExternalResource; - - -/** - * A rule that clears the {@link AggregatorsAccumulator} - * which represents the Beam {@link org.apache.beam.sdk.transforms.Aggregator}s. - */ -public class ClearAggregatorsRule extends ExternalResource { - - @Override - protected void before() throws Throwable { - clearNamedAggregators(); - } - - public void clearNamedAggregators() { - AggregatorsAccumulator.clear(); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/615761a7/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/NamedAggregatorsTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/NamedAggregatorsTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/NamedAggregatorsTest.java deleted file mode 100644 index dbd8cac..0000000 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/NamedAggregatorsTest.java +++ /dev/null @@ -1,101 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.runners.spark.aggregators.metrics.sink; - -import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.nullValue; -import static org.junit.Assert.assertThat; - -import com.google.common.collect.ImmutableSet; -import java.util.Arrays; -import java.util.List; -import java.util.Set; -import org.apache.beam.runners.spark.PipelineRule; -import org.apache.beam.runners.spark.aggregators.ClearAggregatorsRule; -import org.apache.beam.runners.spark.aggregators.SparkAggregators; -import org.apache.beam.runners.spark.examples.WordCount; -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.testing.PAssert; -import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.MapElements; -import org.apache.beam.sdk.values.PCollection; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExternalResource; - - -/** - * A test for the NamedAggregators mechanism. - */ -public class NamedAggregatorsTest { - - @Rule - public ExternalResource inMemoryMetricsSink = new InMemoryMetricsSinkRule(); - - @Rule - public ClearAggregatorsRule clearAggregators = new ClearAggregatorsRule(); - - @Rule - public final PipelineRule pipelineRule = PipelineRule.batch(); - - private Pipeline createSparkPipeline() { - pipelineRule.getOptions().setEnableSparkMetricSinks(true); - return pipelineRule.createPipeline(); - } - - private void runPipeline() { - - final List<String> words = - Arrays.asList("hi there", "hi", "hi sue bob", "hi sue", "", "bob hi"); - - final Set<String> expectedCounts = - ImmutableSet.of("hi: 5", "there: 1", "sue: 2", "bob: 2"); - - final Pipeline pipeline = createSparkPipeline(); - - final PCollection<String> output = - pipeline - .apply(Create.of(words).withCoder(StringUtf8Coder.of())) - .apply(new WordCount.CountWords()) - .apply(MapElements.via(new WordCount.FormatAsTextFn())); - - PAssert.that(output).containsInAnyOrder(expectedCounts); - - pipeline.run(); - } - - @Test - public void testNamedAggregators() throws Exception { - assertThat(InMemoryMetrics.valueOf("emptyLines"), is(nullValue())); - - runPipeline(); - - assertThat(InMemoryMetrics.<Double>valueOf("emptyLines"), is(1d)); - } - - @Test - public void testNonExistingAggregatorName() throws Exception { - runPipeline(); - - final Long valueOf = SparkAggregators.valueOf("myMissingAggregator", Long.class); - - assertThat(valueOf, is(nullValue())); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/615761a7/sdks/java/core/src/main/java/org/apache/beam/sdk/AggregatorPipelineExtractor.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/AggregatorPipelineExtractor.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/AggregatorPipelineExtractor.java deleted file mode 100644 index eeb9b45..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/AggregatorPipelineExtractor.java +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk; - -import com.google.common.collect.HashMultimap; -import com.google.common.collect.SetMultimap; -import java.util.Collection; -import java.util.Collections; -import java.util.Map; -import org.apache.beam.sdk.Pipeline.PipelineVisitor; -import org.apache.beam.sdk.runners.TransformHierarchy; -import org.apache.beam.sdk.transforms.Aggregator; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.values.PValue; - -/** - * Retrieves {@link Aggregator Aggregators} at each {@link ParDo} and returns a {@link Map} of - * {@link Aggregator} to the {@link PTransform PTransforms} in which it is present. - */ -@Deprecated -class AggregatorPipelineExtractor { - private final Pipeline pipeline; - - /** - * Creates an {@code AggregatorPipelineExtractor} for the given {@link Pipeline}. - */ - public AggregatorPipelineExtractor(Pipeline pipeline) { - this.pipeline = pipeline; - } - - /** - * Returns a {@link Map} between each {@link Aggregator} in the {@link Pipeline} to the {@link - * PTransform PTransforms} in which it is used. - */ - public Map<Aggregator<?, ?>, Collection<PTransform<?, ?>>> getAggregatorSteps() { - HashMultimap<Aggregator<?, ?>, PTransform<?, ?>> aggregatorSteps = HashMultimap.create(); - pipeline.traverseTopologically(new AggregatorVisitor(aggregatorSteps)); - return aggregatorSteps.asMap(); - } - - private static class AggregatorVisitor extends PipelineVisitor.Defaults { - private final SetMultimap<Aggregator<?, ?>, PTransform<?, ?>> aggregatorSteps; - - public AggregatorVisitor(SetMultimap<Aggregator<?, ?>, PTransform<?, ?>> aggregatorSteps) { - this.aggregatorSteps = aggregatorSteps; - } - - @Override - public void visitPrimitiveTransform(TransformHierarchy.Node node) { - PTransform<?, ?> transform = node.getTransform(); - addStepToAggregators(transform, getAggregators(transform)); - } - - private Collection<Aggregator<?, ?>> getAggregators(PTransform<?, ?> transform) { - return Collections.emptyList(); - } - - private void addStepToAggregators( - PTransform<?, ?> transform, Collection<Aggregator<?, ?>> aggregators) { - for (Aggregator<?, ?> aggregator : aggregators) { - aggregatorSteps.put(aggregator, transform); - } - } - - @Override - public void visitValue(PValue value, TransformHierarchy.Node producer) {} - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/615761a7/sdks/java/core/src/main/java/org/apache/beam/sdk/AggregatorRetrievalException.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/AggregatorRetrievalException.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/AggregatorRetrievalException.java deleted file mode 100644 index 3040815..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/AggregatorRetrievalException.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk; - -import org.apache.beam.sdk.transforms.Aggregator; - -/** - * Signals that an exception has occurred while retrieving {@link Aggregator}s. - */ -public class AggregatorRetrievalException extends Exception { - /** - * Constructs a new {@code AggregatorRetrievalException} with the specified detail message and - * cause. - */ - public AggregatorRetrievalException(String message, Throwable cause) { - super(message, cause); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/615761a7/sdks/java/core/src/main/java/org/apache/beam/sdk/AggregatorValues.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/AggregatorValues.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/AggregatorValues.java deleted file mode 100644 index 1fd034a..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/AggregatorValues.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk; - -import java.util.Collection; -import java.util.Map; -import org.apache.beam.sdk.transforms.Aggregator; -import org.apache.beam.sdk.transforms.Combine.CombineFn; -import org.apache.beam.sdk.transforms.DoFn; - -/** - * A collection of values associated with an {@link Aggregator}. Aggregators declared in a - * {@link DoFn} are emitted on a per-{@link DoFn}-application basis. - * - * @param <T> the output type of the aggregator - */ -public abstract class AggregatorValues<T> { - /** - * Get the values of the {@link Aggregator} at all steps it was used. - */ - public Collection<T> getValues() { - return getValuesAtSteps().values(); - } - - /** - * Get the values of the {@link Aggregator} by the user name at each step it was used. - */ - public abstract Map<String, T> getValuesAtSteps(); - - /** - * Get the total value of this {@link Aggregator} by applying the specified {@link CombineFn}. - */ - public T getTotalValue(CombineFn<T, ?, T> combineFn) { - return combineFn.apply(getValues()); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/615761a7/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java index ab8906a..351e1b8 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java @@ -24,7 +24,6 @@ import com.google.common.base.Joiner; import com.google.common.collect.HashMultimap; import com.google.common.collect.SetMultimap; import java.util.ArrayList; -import java.util.Collection; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -40,7 +39,6 @@ import org.apache.beam.sdk.runners.PTransformOverrideFactory.ReplacementOutput; import org.apache.beam.sdk.runners.PipelineRunner; import org.apache.beam.sdk.runners.TransformHierarchy; import org.apache.beam.sdk.runners.TransformHierarchy.Node; -import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.PTransform; @@ -571,14 +569,6 @@ public class Pipeline { } /** - * Returns a {@link Map} from each {@link Aggregator} in the {@link Pipeline} to the {@link - * PTransform PTransforms} in which it is used. - */ - public Map<Aggregator<?, ?>, Collection<PTransform<?, ?>>> getAggregatorSteps() { - return new AggregatorPipelineExtractor(this).getAggregatorSteps(); - } - - /** * Builds a name from a "/"-delimited prefix and a name. */ private String buildName(String namePrefix, String name) { http://git-wip-us.apache.org/repos/asf/beam/blob/615761a7/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java index f720599..7255a01 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java @@ -63,9 +63,6 @@ public @interface Experimental { /** Trigger-related experimental APIs. */ TRIGGER, - /** Aggregator-related experimental APIs. */ - AGGREGATOR, - /** Experimental APIs for Coder binary format identifiers. */ CODER_ENCODING_ID, http://git-wip-us.apache.org/repos/asf/beam/blob/615761a7/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Aggregator.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Aggregator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Aggregator.java index c957100..6c21b8c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Aggregator.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Aggregator.java @@ -26,21 +26,9 @@ import org.apache.beam.sdk.transforms.Combine.CombineFn; * @param <InputT> the type of input values * @param <OutputT> the type of output values */ +@Deprecated public interface Aggregator<InputT, OutputT> { - - /** - * Adds a new value into the Aggregator. - */ void addValue(InputT value); - - /** - * Returns the name of the Aggregator. - */ String getName(); - - /** - * Returns the {@link CombineFn}, which combines input elements in the - * aggregator. - */ CombineFn<InputT, ?, OutputT> getCombineFn(); }