Remove Counter and associated code Aggregator is the model level concept. Counter was specific to the Dataflow Runner, and is now not needed as part of Beam.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d20a7ada Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d20a7ada Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d20a7ada Branch: refs/heads/master Commit: d20a7ada7eb3ee714917e7c334e1b15ecc2c3b03 Parents: 2a1055d Author: bchambers <[email protected]> Authored: Fri Jul 29 09:41:17 2016 -0700 Committer: bchambers <[email protected]> Committed: Thu Aug 11 10:26:04 2016 -0700 ---------------------------------------------------------------------- .../org/apache/beam/sdk/util/DoFnRunners.java | 78 -- .../beam/runners/dataflow/DataflowRunner.java | 4 +- .../org/apache/beam/sdk/transforms/Combine.java | 13 - .../org/apache/beam/sdk/transforms/Max.java | 27 +- .../org/apache/beam/sdk/transforms/Min.java | 28 +- .../org/apache/beam/sdk/transforms/Sum.java | 27 +- .../apache/beam/sdk/util/CounterAggregator.java | 128 -- .../apache/beam/sdk/util/common/Counter.java | 1287 ------------------ .../beam/sdk/util/common/CounterName.java | 153 --- .../beam/sdk/util/common/CounterProvider.java | 27 - .../apache/beam/sdk/util/common/CounterSet.java | 179 --- .../util/common/ElementByteSizeObserver.java | 24 +- .../beam/sdk/util/CounterAggregatorTest.java | 256 ---- .../beam/sdk/util/common/CounterSetTest.java | 227 --- .../beam/sdk/util/common/CounterTest.java | 736 ---------- 15 files changed, 15 insertions(+), 3179 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d20a7ada/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunners.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunners.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunners.java index a9f3cf4..6089228 100644 --- a/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunners.java +++ b/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunners.java @@ -23,8 +23,6 @@ import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.DoFnRunner.ReduceFnExecutor; import org.apache.beam.sdk.util.ExecutionContext.StepContext; -import org.apache.beam.sdk.util.common.CounterSet; -import org.apache.beam.sdk.util.common.CounterSet.AddCounterMutator; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.TupleTag; @@ -72,33 +70,6 @@ public class DoFnRunners { } /** - * Returns a basic implementation of {@link DoFnRunner} that works for most - * {@link OldDoFn OldDoFns}. - * - * <p>It invokes {@link OldDoFn#processElement} for each input. - */ - public static <InputT, OutputT> DoFnRunner<InputT, OutputT> simpleRunner( - PipelineOptions options, - OldDoFn<InputT, OutputT> fn, - SideInputReader sideInputReader, - OutputManager outputManager, - TupleTag<OutputT> mainOutputTag, - List<TupleTag<?>> sideOutputTags, - StepContext stepContext, - CounterSet.AddCounterMutator addCounterMutator, - WindowingStrategy<?, ?> windowingStrategy) { - return simpleRunner(options, - fn, - sideInputReader, - outputManager, - mainOutputTag, - sideOutputTags, - stepContext, - CounterAggregator.factoryFor(addCounterMutator), - windowingStrategy); - } - - /** * Returns an implementation of {@link DoFnRunner} that handles late data dropping. * * <p>It drops elements from expired windows before they reach the underlying {@link OldDoFn}. @@ -132,33 +103,6 @@ public class DoFnRunners { reduceFnExecutor.getDroppedDueToLatenessAggregator()); } - /** - * Returns an implementation of {@link DoFnRunner} that handles late data dropping. - * - * <p>It drops elements from expired windows before they reach the underlying {@link OldDoFn}. - */ - public static <K, InputT, OutputT, W extends BoundedWindow> - DoFnRunner<KeyedWorkItem<K, InputT>, KV<K, OutputT>> lateDataDroppingRunner( - PipelineOptions options, - ReduceFnExecutor<K, InputT, OutputT, W> reduceFnExecutor, - SideInputReader sideInputReader, - OutputManager outputManager, - TupleTag<KV<K, OutputT>> mainOutputTag, - List<TupleTag<?>> sideOutputTags, - StepContext stepContext, - CounterSet.AddCounterMutator addCounterMutator, - WindowingStrategy<?, W> windowingStrategy) { - return lateDataDroppingRunner( - options, - reduceFnExecutor, - sideInputReader, - outputManager, - mainOutputTag, - sideOutputTags, - stepContext, - CounterAggregator.factoryFor(addCounterMutator), - windowingStrategy); - } public static <InputT, OutputT> DoFnRunner<InputT, OutputT> createDefault( PipelineOptions options, @@ -197,26 +141,4 @@ public class DoFnRunners { aggregatorFactory, windowingStrategy); } - - public static <InputT, OutputT> DoFnRunner<InputT, OutputT> createDefault( - PipelineOptions options, - OldDoFn<InputT, OutputT> doFn, - SideInputReader sideInputReader, - OutputManager outputManager, - TupleTag<OutputT> mainOutputTag, - List<TupleTag<?>> sideOutputTags, - StepContext stepContext, - AddCounterMutator addCounterMutator, - WindowingStrategy<?, ?> windowingStrategy) { - return createDefault( - options, - doFn, - sideInputReader, - outputManager, - mainOutputTag, - sideOutputTags, - stepContext, - CounterAggregator.factoryFor(addCounterMutator), - windowingStrategy); - } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d20a7ada/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index bea6264..667a63b 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -212,9 +212,9 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { // Default Docker container images that execute Dataflow worker harness, residing in Google // Container Registry, separately for Batch and Streaming. public static final String BATCH_WORKER_HARNESS_CONTAINER_IMAGE = - "dataflow.gcr.io/v1beta3/beam-java-batch:beam-master-20160804-dofn"; + "dataflow.gcr.io/v1beta3/beam-java-batch:beam-master-20160810"; public static final String STREAMING_WORKER_HARNESS_CONTAINER_IMAGE = - "dataflow.gcr.io/v1beta3/beam-java-streaming:beam-master-20160804-dofn"; + "dataflow.gcr.io/v1beta3/beam-java-streaming:beam-master-20160810"; // The limit of CreateJob request size. private static final int CREATE_JOB_REQUEST_LIMIT_BYTES = 10 * 1024 * 1024; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d20a7ada/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java index a825800..6ba3f8a 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java @@ -49,7 +49,6 @@ import org.apache.beam.sdk.util.PerKeyCombineFnRunners; import org.apache.beam.sdk.util.PropertyNames; import org.apache.beam.sdk.util.SerializableUtils; import org.apache.beam.sdk.util.WindowingStrategy; -import org.apache.beam.sdk.util.common.Counter; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionList; @@ -736,10 +735,6 @@ public class Combine { return new int[] { value }; } - public Counter<Integer> getCounter(@SuppressWarnings("unused") String name) { - throw new UnsupportedOperationException("BinaryCombineIntegerFn does not support getCounter"); - } - private static final class ToIntegerCodingFunction implements DelegateCoder.CodingFunction<int[], Integer> { @Override @@ -839,10 +834,6 @@ public class Combine { return new long[] { value }; } - public Counter<Long> getCounter(@SuppressWarnings("unused") String name) { - throw new UnsupportedOperationException("BinaryCombineLongFn does not support getCounter"); - } - private static final class ToLongCodingFunction implements DelegateCoder.CodingFunction<long[], Long> { @Override @@ -944,10 +935,6 @@ public class Combine { return new double[] { value }; } - public Counter<Double> getCounter(@SuppressWarnings("unused") String name) { - throw new UnsupportedOperationException("BinaryCombineDoubleFn does not support getCounter"); - } - private static final class ToDoubleCodingFunction implements DelegateCoder.CodingFunction<double[], Double> { @Override http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d20a7ada/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Max.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Max.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Max.java index 52617b6..eed13fb 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Max.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Max.java @@ -19,9 +19,6 @@ package org.apache.beam.sdk.transforms; import org.apache.beam.sdk.transforms.Combine.BinaryCombineFn; import org.apache.beam.sdk.transforms.display.DisplayData; -import org.apache.beam.sdk.util.common.Counter; -import org.apache.beam.sdk.util.common.Counter.AggregationKind; -import org.apache.beam.sdk.util.common.CounterProvider; import java.io.Serializable; import java.util.Comparator; @@ -218,8 +215,7 @@ public class Max { * A {@code CombineFn} that computes the maximum of a collection of {@code Integer}s, useful as an * argument to {@link Combine#globally} or {@link Combine#perKey}. */ - public static class MaxIntegerFn extends Combine.BinaryCombineIntegerFn - implements CounterProvider<Integer> { + public static class MaxIntegerFn extends Combine.BinaryCombineIntegerFn { @Override public int apply(int left, int right) { return left >= right ? left : right; @@ -229,19 +225,13 @@ public class Max { public int identity() { return Integer.MIN_VALUE; } - - @Override - public Counter<Integer> getCounter(String name) { - return Counter.ints(name, AggregationKind.MAX); - } } /** * A {@code CombineFn} that computes the maximum of a collection of {@code Long}s, useful as an * argument to {@link Combine#globally} or {@link Combine#perKey}. */ - public static class MaxLongFn extends Combine.BinaryCombineLongFn - implements CounterProvider<Long> { + public static class MaxLongFn extends Combine.BinaryCombineLongFn { @Override public long apply(long left, long right) { return left >= right ? left : right; @@ -251,19 +241,13 @@ public class Max { public long identity() { return Long.MIN_VALUE; } - - @Override - public Counter<Long> getCounter(String name) { - return Counter.longs(name, AggregationKind.MAX); - } } /** * A {@code CombineFn} that computes the maximum of a collection of {@code Double}s, useful as an * argument to {@link Combine#globally} or {@link Combine#perKey}. */ - public static class MaxDoubleFn extends Combine.BinaryCombineDoubleFn - implements CounterProvider<Double> { + public static class MaxDoubleFn extends Combine.BinaryCombineDoubleFn { @Override public double apply(double left, double right) { return left >= right ? left : right; @@ -273,10 +257,5 @@ public class Max { public double identity() { return Double.NEGATIVE_INFINITY; } - - @Override - public Counter<Double> getCounter(String name) { - return Counter.doubles(name, AggregationKind.MAX); - } } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d20a7ada/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Min.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Min.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Min.java index 3159134..9c9d14f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Min.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Min.java @@ -19,9 +19,6 @@ package org.apache.beam.sdk.transforms; import org.apache.beam.sdk.transforms.Combine.BinaryCombineFn; import org.apache.beam.sdk.transforms.display.DisplayData; -import org.apache.beam.sdk.util.common.Counter; -import org.apache.beam.sdk.util.common.Counter.AggregationKind; -import org.apache.beam.sdk.util.common.CounterProvider; import java.io.Serializable; import java.util.Comparator; @@ -218,8 +215,7 @@ public class Min { * A {@code CombineFn} that computes the minimum of a collection of {@code Integer}s, useful as an * argument to {@link Combine#globally} or {@link Combine#perKey}. */ - public static class MinIntegerFn extends Combine.BinaryCombineIntegerFn - implements CounterProvider<Integer> { + public static class MinIntegerFn extends Combine.BinaryCombineIntegerFn { @Override public int apply(int left, int right) { @@ -230,20 +226,13 @@ public class Min { public int identity() { return Integer.MAX_VALUE; } - - @Override - public Counter<Integer> getCounter(String name) { - return Counter.ints(name, AggregationKind.MIN); - } } /** * A {@code CombineFn} that computes the minimum of a collection of {@code Long}s, useful as an * argument to {@link Combine#globally} or {@link Combine#perKey}. */ - public static class MinLongFn extends Combine.BinaryCombineLongFn - implements CounterProvider<Long> { - + public static class MinLongFn extends Combine.BinaryCombineLongFn { @Override public long apply(long left, long right) { return left <= right ? left : right; @@ -253,19 +242,13 @@ public class Min { public long identity() { return Long.MAX_VALUE; } - - @Override - public Counter<Long> getCounter(String name) { - return Counter.longs(name, AggregationKind.MIN); - } } /** * A {@code CombineFn} that computes the minimum of a collection of {@code Double}s, useful as an * argument to {@link Combine#globally} or {@link Combine#perKey}. */ - public static class MinDoubleFn extends Combine.BinaryCombineDoubleFn - implements CounterProvider<Double> { + public static class MinDoubleFn extends Combine.BinaryCombineDoubleFn { @Override public double apply(double left, double right) { @@ -276,10 +259,5 @@ public class Min { public double identity() { return Double.POSITIVE_INFINITY; } - - @Override - public Counter<Double> getCounter(String name) { - return Counter.doubles(name, AggregationKind.MIN); - } } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d20a7ada/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sum.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sum.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sum.java index 07f78b5..27c5ced 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sum.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sum.java @@ -17,10 +17,6 @@ */ package org.apache.beam.sdk.transforms; -import org.apache.beam.sdk.util.common.Counter; -import org.apache.beam.sdk.util.common.Counter.AggregationKind; -import org.apache.beam.sdk.util.common.CounterProvider; - /** * {@code PTransform}s for computing the sum of the elements in a * {@code PCollection}, or the sum of the values associated with @@ -123,8 +119,7 @@ public class Sum { * {@code Iterable} of {@code Integer}s, useful as an argument to * {@link Combine#globally} or {@link Combine#perKey}. */ - public static class SumIntegerFn - extends Combine.BinaryCombineIntegerFn implements CounterProvider<Integer> { + public static class SumIntegerFn extends Combine.BinaryCombineIntegerFn { @Override public int apply(int a, int b) { return a + b; @@ -134,11 +129,6 @@ public class Sum { public int identity() { return 0; } - - @Override - public Counter<Integer> getCounter(String name) { - return Counter.ints(name, AggregationKind.SUM); - } } /** @@ -147,7 +137,7 @@ public class Sum { * {@link Combine#globally} or {@link Combine#perKey}. */ public static class SumLongFn - extends Combine.BinaryCombineLongFn implements CounterProvider<Long> { + extends Combine.BinaryCombineLongFn { @Override public long apply(long a, long b) { return a + b; @@ -157,11 +147,6 @@ public class Sum { public long identity() { return 0; } - - @Override - public Counter<Long> getCounter(String name) { - return Counter.longs(name, AggregationKind.SUM); - } } /** @@ -169,8 +154,7 @@ public class Sum { * {@code Iterable} of {@code Double}s, useful as an argument to * {@link Combine#globally} or {@link Combine#perKey}. */ - public static class SumDoubleFn - extends Combine.BinaryCombineDoubleFn implements CounterProvider<Double> { + public static class SumDoubleFn extends Combine.BinaryCombineDoubleFn { @Override public double apply(double a, double b) { return a + b; @@ -180,10 +164,5 @@ public class Sum { public double identity() { return 0; } - - @Override - public Counter<Double> getCounter(String name) { - return Counter.doubles(name, AggregationKind.SUM); - } } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d20a7ada/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CounterAggregator.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CounterAggregator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CounterAggregator.java deleted file mode 100644 index 5bde8ef..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CounterAggregator.java +++ /dev/null @@ -1,128 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.util; - -import org.apache.beam.sdk.transforms.Aggregator; -import org.apache.beam.sdk.transforms.Combine.CombineFn; -import org.apache.beam.sdk.transforms.Max; -import org.apache.beam.sdk.transforms.Min; -import org.apache.beam.sdk.transforms.Sum; -import org.apache.beam.sdk.util.common.Counter; -import org.apache.beam.sdk.util.common.CounterProvider; -import org.apache.beam.sdk.util.common.CounterSet; -import org.apache.beam.sdk.util.common.CounterSet.AddCounterMutator; -import com.google.common.annotations.VisibleForTesting; - -/** - * An implementation of the {@code Aggregator} interface that uses a - * {@link Counter} as the underlying representation. Supports {@link CombineFn}s - * from the {@link Sum}, {@link Min} and {@link Max} classes. - * - * @param <InputT> the type of input values - * @param <AccumT> the type of accumulator values - * @param <OutputT> the type of output value - */ -public class CounterAggregator<InputT, AccumT, OutputT> - implements Aggregator<InputT, OutputT> { - - private static class CounterAggregatorFactory implements AggregatorFactory { - private final AddCounterMutator addCounterMutator; - - private CounterAggregatorFactory(CounterSet.AddCounterMutator addCounterMutator) { - this.addCounterMutator = addCounterMutator; - } - - @Override - public <InputT, AccumT, OutputT> Aggregator<InputT, OutputT> createAggregatorForDoFn( - Class<?> fnClass, ExecutionContext.StepContext stepContext, - String userName, CombineFn<InputT, AccumT, OutputT> combine) { - boolean isSystem = fnClass.isAnnotationPresent(SystemDoFnInternal.class); - String mangledName = (isSystem ? "" : "user-") + stepContext.getStepName() + "-" + userName; - - return new CounterAggregator<>(mangledName, combine, addCounterMutator); - } - } - - private final Counter<InputT> counter; - private final CombineFn<InputT, AccumT, OutputT> combiner; - - /** - * Create a factory for producing {@link CounterAggregator CounterAggregators} backed by the given - * {@link CounterSet.AddCounterMutator}. - */ - public static AggregatorFactory factoryFor( - CounterSet.AddCounterMutator addCounterMutator) { - return new CounterAggregatorFactory(addCounterMutator); - } - - /** - * Constructs a new aggregator with the given name and aggregation logic - * specified in the CombineFn argument. The underlying counter is - * automatically added into the provided CounterSet. - * - * <p>If a counter with the same name already exists, it will be reused, as - * long as it has the same type. - */ - @VisibleForTesting CounterAggregator( - String name, CombineFn<? super InputT, AccumT, OutputT> combiner, - CounterSet.AddCounterMutator addCounterMutator) { - // Safe contravariant cast - this(constructCounter(name, combiner), addCounterMutator, - (CombineFn<InputT, AccumT, OutputT>) combiner); - } - - private CounterAggregator(Counter<InputT> counter, - CounterSet.AddCounterMutator addCounterMutator, - CombineFn<InputT, AccumT, OutputT> combiner) { - try { - this.counter = addCounterMutator.addCounter(counter); - } catch (IllegalArgumentException ex) { - throw new IllegalArgumentException( - "aggregator's name collides with an existing aggregator " - + "or system-provided counter of an incompatible type"); - } - this.combiner = combiner; - } - - private static <T> Counter<T> constructCounter(String name, - CombineFn<? super T, ?, ?> combiner) { - if (combiner instanceof CounterProvider) { - @SuppressWarnings("unchecked") - CounterProvider<T> counterProvider = (CounterProvider<T>) combiner; - return counterProvider.getCounter(name); - } else { - throw new IllegalArgumentException("unsupported combiner in Aggregator: " - + combiner.getClass().getName()); - } - } - - @Override - public void addValue(InputT value) { - counter.addValue(value); - } - - @Override - public String getName() { - return counter.getFlatName(); - } - - @Override - public CombineFn<InputT, ?, OutputT> getCombineFn() { - return combiner; - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d20a7ada/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/Counter.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/Counter.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/Counter.java deleted file mode 100644 index 550c648..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/Counter.java +++ /dev/null @@ -1,1287 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.util.common; - -import static org.apache.beam.sdk.util.common.Counter.AggregationKind.AND; -import static org.apache.beam.sdk.util.common.Counter.AggregationKind.MEAN; -import static org.apache.beam.sdk.util.common.Counter.AggregationKind.OR; - -import static com.google.common.base.Preconditions.checkArgument; - -import org.apache.beam.sdk.values.TypeDescriptor; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.util.concurrent.AtomicDouble; - -import java.util.Objects; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; - -import javax.annotation.Nullable; - -/** - * A Counter enables the aggregation of a stream of values over time. The - * cumulative aggregate value is updated as new values are added, or it can be - * reset to a new value. Multiple kinds of aggregation are supported depending - * on the type of the counter. - * - * <p>Counters compare using value equality of their name, kind, and - * cumulative value. Equal counters should have equal toString()s. - * - * <p>After all possible mutations have completed, the reader should check - * {@link #isDirty} for every counter, otherwise updates may be lost. - * - * <p>A counter may become dirty without a corresponding update to the value. - * This generally will occur when the calls to {@code addValue()}, {@code committing()}, - * and {@code committed()} are interleaved such that the value is updated - * between the calls to committing and the read of the value. - * - * @param <T> the type of values aggregated by this counter - */ -public abstract class Counter<T> { - /** - * Possible kinds of counter aggregation. - */ - public static enum AggregationKind { - - /** - * Computes the sum of all added values. - * Applicable to {@link Integer}, {@link Long}, and {@link Double} values. - */ - SUM, - - /** - * Computes the maximum value of all added values. - * Applicable to {@link Integer}, {@link Long}, and {@link Double} values. - */ - MAX, - - /** - * Computes the minimum value of all added values. - * Applicable to {@link Integer}, {@link Long}, and {@link Double} values. - */ - MIN, - - /** - * Computes the arithmetic mean of all added values. Applicable to - * {@link Integer}, {@link Long}, and {@link Double} values. - */ - MEAN, - - /** - * Computes boolean AND over all added values. - * Applicable only to {@link Boolean} values. - */ - AND, - - /** - * Computes boolean OR over all added values. Applicable only to - * {@link Boolean} values. - */ - OR - // TODO: consider adding VECTOR_SUM, HISTOGRAM, KV_SET, PRODUCT, TOP. - } - - /** - * Constructs a new {@link Counter} that aggregates {@link Integer}, values - * according to the desired aggregation kind. The supported aggregation kinds - * are {@link AggregationKind#SUM}, {@link AggregationKind#MIN}, - * {@link AggregationKind#MAX}, and {@link AggregationKind#MEAN}. - * This is a convenience wrapper over a - * {@link Counter} implementation that aggregates {@link Long} values. This is - * useful when the application handles (boxed) {@link Integer} values that - * are not readily convertible to the (boxed) {@link Long} values otherwise - * expected by the {@link Counter} implementation aggregating {@link Long} - * values. - * - * @param name the name of the new counter - * @param kind the new counter's aggregation kind - * @return the newly constructed Counter - * @throws IllegalArgumentException if the aggregation kind is not supported - */ - public static Counter<Integer> ints(CounterName name, AggregationKind kind) { - return new IntegerCounter(name, kind); - } - - /** - * Constructs a new {@code Counter<Integer>} with an unstructured name. - * - * @deprecated use {@link #ints(CounterName, AggregationKind)}. - */ - @Deprecated - public static Counter<Integer> ints(String name, AggregationKind kind) { - return new IntegerCounter(CounterName.named(name), kind); - } - - /** - * Constructs a new {@link Counter} that aggregates {@link Long} values - * according to the desired aggregation kind. The supported aggregation kinds - * are {@link AggregationKind#SUM}, {@link AggregationKind#MIN}, - * {@link AggregationKind#MAX}, and {@link AggregationKind#MEAN}. - * - * @param name the name of the new counter - * @param kind the new counter's aggregation kind - * @return the newly constructed Counter - * @throws IllegalArgumentException if the aggregation kind is not supported - */ - public static Counter<Long> longs(CounterName name, AggregationKind kind) { - return new LongCounter(name, kind); - } - - /** - * Constructs a new {@code Counter<Long>} with an unstructured name. - * - * @deprecated use {@link #longs(CounterName, AggregationKind)}. - */ - @Deprecated - public static Counter<Long> longs(String name, AggregationKind kind) { - return new LongCounter(CounterName.named(name), kind); - } - - /** - * Constructs a new {@link Counter} that aggregates {@link Double} values - * according to the desired aggregation kind. The supported aggregation kinds - * are {@link AggregationKind#SUM}, {@link AggregationKind#MIN}, - * {@link AggregationKind#MAX}, and {@link AggregationKind#MEAN}. - * - * @param name the name of the new counter - * @param kind the new counter's aggregation kind - * @return the newly constructed Counter - * @throws IllegalArgumentException if the aggregation kind is not supported - */ - public static Counter<Double> doubles(CounterName name, AggregationKind kind) { - return new DoubleCounter(name, kind); - } - - /** - * Constructs a new {@code Counter<Double>} with an unstructured name. - * - * @deprecated use {@link #doubles(CounterName, AggregationKind)}. - */ - @Deprecated - public static Counter<Double> doubles(String name, AggregationKind kind) { - return new DoubleCounter(CounterName.named(name), kind); - } - - /** - * Constructs a new {@link Counter} that aggregates {@link Boolean} values - * according to the desired aggregation kind. The only supported aggregation - * kinds are {@link AggregationKind#AND} and {@link AggregationKind#OR}. - * - * @param name the name of the new counter - * @param kind the new counter's aggregation kind - * @return the newly constructed Counter - * @throws IllegalArgumentException if the aggregation kind is not supported - */ - public static Counter<Boolean> booleans(CounterName name, AggregationKind kind) { - return new BooleanCounter(name, kind); - } - - /** - * Constructs a new {@code Counter<Boolean>} with an unstructured name. - * - * @deprecated use {@link #booleans(CounterName, AggregationKind)}. - */ - @Deprecated - public static Counter<Boolean> booleans(String name, AggregationKind kind) { - return new BooleanCounter(CounterName.named(name), kind); - } - - ////////////////////////////////////////////////////////////////////////////// - - /** - * Adds a new value to the aggregation stream. Returns this (to allow method - * chaining). - */ - public abstract Counter<T> addValue(T value); - - /** - * Resets the aggregation stream to this new value. This aggregator must not - * be a MEAN aggregator. Returns this (to allow method chaining). - */ - public abstract Counter<T> resetToValue(T value); - - /** - * Resets the aggregation stream to this new value. Returns this (to allow - * method chaining). The value of elementCount must be non-negative, and this - * aggregator must be a MEAN aggregator. - */ - public abstract Counter<T> resetMeanToValue(long elementCount, T value); - - /** - * Resets the counter's delta value to have no values accumulated and returns - * the value of the delta prior to the reset. - * - * @return the aggregate delta at the time this method is called - */ - public abstract T getAndResetDelta(); - - /** - * Resets the counter's delta value to have no values accumulated and returns - * the value of the delta prior to the reset, for a MEAN counter. - * - * @return the mean delta t the time this method is called - */ - public abstract CounterMean<T> getAndResetMeanDelta(); - - /** - * Returns the counter's flat name. - */ - public String getFlatName() { - return name.getFlatName(); - } - - /** - * Returns the counter's name. - * - * @deprecated use {@link #getFlatName}. - */ - @Deprecated - public String getName() { - return name.getFlatName(); - } - - /** - * Returns the counter's aggregation kind. - */ - public AggregationKind getKind() { - return kind; - } - - /** - * Returns the counter's type. - */ - public Class<?> getType() { - return new TypeDescriptor<T>(getClass()) {}.getRawType(); - } - - /** - * Returns the aggregated value, or the sum for MEAN aggregation, either - * total or, if delta, since the last update extraction or resetDelta. - */ - public abstract T getAggregate(); - - /** - * The mean value of a {@code Counter}, represented as an aggregate value and - * a count. - * - * @param <T> the type of the aggregate - */ - public static interface CounterMean<T> { - /** - * Gets the aggregate value of this {@code CounterMean}. - */ - T getAggregate(); - - /** - * Gets the count of this {@code CounterMean}. - */ - long getCount(); - } - - /** - * Returns the mean in the form of a CounterMean, or null if this is not a - * MEAN counter. - */ - @Nullable - public abstract CounterMean<T> getMean(); - - /** - * Represents whether counters' values have been committed to the backend. - * - * <p>Runners can use this information to optimize counters updates. - * For example, if counters are committed, runners may choose to skip the updates. - * - * <p>Counters' state transition table: - * {@code - * Action\Current State COMMITTED DIRTY COMMITTING - * addValue() DIRTY DIRTY DIRTY - * committing() None COMMITTING None - * committed() None None COMMITTED - * } - */ - @VisibleForTesting - enum CommitState { - /** - * There are no local updates that haven't been committed to the backend. - */ - COMMITTED, - /** - * There are local updates that haven't been committed to the backend. - */ - DIRTY, - /** - * Local updates are committing to the backend, but are still pending. - */ - COMMITTING, - } - - /** - * Returns if the counter contains non-committed aggregate. - */ - public boolean isDirty() { - return commitState.get() != CommitState.COMMITTED; - } - - /** - * Changes the counter from {@code CommitState.DIRTY} to {@code CommitState.COMMITTING}. - * - * @return true if successful. False return indicates that the commit state - * was not in {@code CommitState.DIRTY}. - */ - public boolean committing() { - return commitState.compareAndSet(CommitState.DIRTY, CommitState.COMMITTING); - } - - /** - * Changes the counter from {@code CommitState.COMMITTING} to {@code CommitState.COMMITTED}. - * - * @return true if successful. - * - * <p>False return indicates that the counter was updated while the committing is pending. - * That counter update might or might not has been committed. The {@code commitState} has to - * stay in {@code CommitState.DIRTY}. - */ - public boolean committed() { - return commitState.compareAndSet(CommitState.COMMITTING, CommitState.COMMITTED); - } - - /** - * Sets the counter to {@code CommitState.DIRTY}. - * - * <p>Must be called at the end of {@link #addValue}, {@link #resetToValue}, - * {@link #resetMeanToValue}, and {@link #merge}. - */ - protected void setDirty() { - commitState.set(CommitState.DIRTY); - } - - /** - * Returns a string representation of the Counter. Useful for debugging logs. - * Example return value: "ElementCount:SUM(15)". - */ - @Override - public String toString() { - StringBuilder sb = new StringBuilder(); - sb.append(getFlatName()); - sb.append(":"); - sb.append(getKind()); - sb.append("("); - switch (kind) { - case SUM: - case MAX: - case MIN: - case AND: - case OR: - sb.append(getAggregate()); - break; - case MEAN: - sb.append(getMean()); - break; - default: - throw illegalArgumentException(); - } - sb.append(")"); - - return sb.toString(); - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } else if (o instanceof Counter) { - Counter<?> that = (Counter<?>) o; - if (this.name.equals(that.name) && this.kind == that.kind - && this.getClass().equals(that.getClass())) { - if (kind == MEAN) { - CounterMean<T> thisMean = this.getMean(); - CounterMean<?> thatMean = that.getMean(); - return thisMean == thatMean - || (Objects.equals(thisMean.getAggregate(), thatMean.getAggregate()) - && thisMean.getCount() == thatMean.getCount()); - } else { - return Objects.equals(this.getAggregate(), that.getAggregate()); - } - } - } - return false; - } - - @Override - public int hashCode() { - if (kind == MEAN) { - CounterMean<T> mean = getMean(); - return Objects.hash(getClass(), name, kind, mean.getAggregate(), mean.getCount()); - } else { - return Objects.hash(getClass(), name, kind, getAggregate()); - } - } - - /** - * Returns whether this Counter is compatible with that Counter. If - * so, they can be merged into a single Counter. - */ - public boolean isCompatibleWith(Counter<?> that) { - return this.name.equals(that.name) - && this.kind == that.kind - && this.getClass().equals(that.getClass()); - } - - /** - * Merges this counter with the provided counter, returning this counter with the combined value - * of both counters. This may reset the delta of this counter. - * - * @throws IllegalArgumentException if the provided Counter is not compatible with this Counter - */ - public abstract Counter<T> merge(Counter<T> that); - - ////////////////////////////////////////////////////////////////////////////// - - /** The name and metadata of this counter. */ - protected final CounterName name; - - /** The kind of aggregation function to apply to this counter. */ - protected final AggregationKind kind; - - /** The commit state of this counter. */ - protected final AtomicReference<CommitState> commitState; - - protected Counter(CounterName name, AggregationKind kind) { - this.name = name; - this.kind = kind; - this.commitState = new AtomicReference<>(CommitState.COMMITTED); - } - - ////////////////////////////////////////////////////////////////////////////// - - /** - * Implements a {@link Counter} for {@link Long} values. - */ - private static class LongCounter extends Counter<Long> { - private final AtomicLong aggregate; - private final AtomicLong deltaAggregate; - private final AtomicReference<LongCounterMean> mean; - private final AtomicReference<LongCounterMean> deltaMean; - - /** Initializes a new {@link Counter} for {@link Long} values. */ - private LongCounter(CounterName name, AggregationKind kind) { - super(name, kind); - switch (kind) { - case MEAN: - mean = new AtomicReference<>(); - deltaMean = new AtomicReference<>(); - getAndResetMeanDelta(); - mean.set(deltaMean.get()); - aggregate = deltaAggregate = null; - break; - case SUM: - case MAX: - case MIN: - aggregate = new AtomicLong(); - deltaAggregate = new AtomicLong(); - getAndResetDelta(); - aggregate.set(deltaAggregate.get()); - mean = deltaMean = null; - break; - default: - throw illegalArgumentException(); - } - } - - @Override - public LongCounter addValue(Long value) { - try { - switch (kind) { - case SUM: - aggregate.addAndGet(value); - deltaAggregate.addAndGet(value); - break; - case MEAN: - addToMeanAndSet(value, mean); - addToMeanAndSet(value, deltaMean); - break; - case MAX: - maxAndSet(value, aggregate); - maxAndSet(value, deltaAggregate); - break; - case MIN: - minAndSet(value, aggregate); - minAndSet(value, deltaAggregate); - break; - default: - throw illegalArgumentException(); - } - return this; - } finally { - setDirty(); - } - } - - private void minAndSet(Long value, AtomicLong target) { - long current; - long update; - do { - current = target.get(); - update = Math.min(value, current); - } while (update < current && !target.compareAndSet(current, update)); - } - - private void maxAndSet(Long value, AtomicLong target) { - long current; - long update; - do { - current = target.get(); - update = Math.max(value, current); - } while (update > current && !target.compareAndSet(current, update)); - } - - private void addToMeanAndSet(Long value, AtomicReference<LongCounterMean> target) { - LongCounterMean current; - LongCounterMean update; - do { - current = target.get(); - update = new LongCounterMean(current.getAggregate() + value, current.getCount() + 1L); - } while (!target.compareAndSet(current, update)); - } - - @Override - public Long getAggregate() { - if (kind != MEAN) { - return aggregate.get(); - } else { - return getMean().getAggregate(); - } - } - - @Override - public Long getAndResetDelta() { - switch (kind) { - case SUM: - return deltaAggregate.getAndSet(0L); - case MAX: - return deltaAggregate.getAndSet(Long.MIN_VALUE); - case MIN: - return deltaAggregate.getAndSet(Long.MAX_VALUE); - default: - throw illegalArgumentException(); - } - } - - @Override - public Counter<Long> resetToValue(Long value) { - try { - if (kind == MEAN) { - throw illegalArgumentException(); - } - aggregate.set(value); - deltaAggregate.set(value); - return this; - } finally { - setDirty(); - } - } - - @Override - public Counter<Long> resetMeanToValue(long elementCount, Long value) { - try { - if (kind != MEAN) { - throw illegalArgumentException(); - } - if (elementCount < 0) { - throw new IllegalArgumentException("elementCount must be non-negative"); - } - LongCounterMean counterMean = new LongCounterMean(value, elementCount); - mean.set(counterMean); - deltaMean.set(counterMean); - return this; - } finally { - setDirty(); - } - } - - @Override - public CounterMean<Long> getAndResetMeanDelta() { - if (kind != MEAN) { - throw illegalArgumentException(); - } - return deltaMean.getAndSet(new LongCounterMean(0L, 0L)); - } - - @Override - @Nullable - public CounterMean<Long> getMean() { - if (kind != MEAN) { - throw illegalArgumentException(); - } - return mean.get(); - } - - @Override - public Counter<Long> merge(Counter<Long> that) { - try { - checkArgument( - this.isCompatibleWith(that), "Counters %s and %s are incompatible", this, that); - switch (kind) { - case SUM: - case MIN: - case MAX: - return addValue(that.getAggregate()); - case MEAN: - CounterMean<Long> thisCounterMean = this.getMean(); - CounterMean<Long> thatCounterMean = that.getMean(); - return resetMeanToValue( - thisCounterMean.getCount() + thatCounterMean.getCount(), - thisCounterMean.getAggregate() + thatCounterMean.getAggregate()); - default: - throw illegalArgumentException(); - } - } finally { - setDirty(); - } - } - - private static class LongCounterMean implements CounterMean<Long> { - private final long aggregate; - private final long count; - - public LongCounterMean(long aggregate, long count) { - this.aggregate = aggregate; - this.count = count; - } - - @Override - public Long getAggregate() { - return aggregate; - } - - @Override - public long getCount() { - return count; - } - - @Override - public String toString() { - return aggregate + "/" + count; - } - } - } - - /** - * Implements a {@link Counter} for {@link Double} values. - */ - private static class DoubleCounter extends Counter<Double> { - AtomicDouble aggregate; - AtomicDouble deltaAggregate; - AtomicReference<DoubleCounterMean> mean; - AtomicReference<DoubleCounterMean> deltaMean; - - /** Initializes a new {@link Counter} for {@link Double} values. */ - private DoubleCounter(CounterName name, AggregationKind kind) { - super(name, kind); - switch (kind) { - case MEAN: - aggregate = deltaAggregate = null; - mean = new AtomicReference<>(); - deltaMean = new AtomicReference<>(); - getAndResetMeanDelta(); - mean.set(deltaMean.get()); - break; - case SUM: - case MAX: - case MIN: - mean = deltaMean = null; - aggregate = new AtomicDouble(); - deltaAggregate = new AtomicDouble(); - getAndResetDelta(); - aggregate.set(deltaAggregate.get()); - break; - default: - throw illegalArgumentException(); - } - } - - @Override - public DoubleCounter addValue(Double value) { - try { - switch (kind) { - case SUM: - aggregate.addAndGet(value); - deltaAggregate.addAndGet(value); - break; - case MEAN: - addToMeanAndSet(value, mean); - addToMeanAndSet(value, deltaMean); - break; - case MAX: - maxAndSet(value, aggregate); - maxAndSet(value, deltaAggregate); - break; - case MIN: - minAndSet(value, aggregate); - minAndSet(value, deltaAggregate); - break; - default: - throw illegalArgumentException(); - } - return this; - } finally { - setDirty(); - } - } - - private void addToMeanAndSet(Double value, AtomicReference<DoubleCounterMean> target) { - DoubleCounterMean current; - DoubleCounterMean update; - do { - current = target.get(); - update = new DoubleCounterMean(current.getAggregate() + value, current.getCount() + 1); - } while (!target.compareAndSet(current, update)); - } - - private void maxAndSet(Double value, AtomicDouble target) { - double current; - double update; - do { - current = target.get(); - update = Math.max(current, value); - } while (update > current && !target.compareAndSet(current, update)); - } - - private void minAndSet(Double value, AtomicDouble target) { - double current; - double update; - do { - current = target.get(); - update = Math.min(current, value); - } while (update < current && !target.compareAndSet(current, update)); - } - - @Override - public Double getAndResetDelta() { - switch (kind) { - case SUM: - return deltaAggregate.getAndSet(0.0); - case MAX: - return deltaAggregate.getAndSet(Double.NEGATIVE_INFINITY); - case MIN: - return deltaAggregate.getAndSet(Double.POSITIVE_INFINITY); - default: - throw illegalArgumentException(); - } - } - - @Override - public Counter<Double> resetToValue(Double value) { - try { - if (kind == MEAN) { - throw illegalArgumentException(); - } - aggregate.set(value); - deltaAggregate.set(value); - return this; - } finally { - setDirty(); - } - } - - @Override - public Counter<Double> resetMeanToValue(long elementCount, Double value) { - try { - if (kind != MEAN) { - throw illegalArgumentException(); - } - if (elementCount < 0) { - throw new IllegalArgumentException("elementCount must be non-negative"); - } - DoubleCounterMean counterMean = new DoubleCounterMean(value, elementCount); - mean.set(counterMean); - deltaMean.set(counterMean); - return this; - } finally { - setDirty(); - } - } - - @Override - public CounterMean<Double> getAndResetMeanDelta() { - if (kind != MEAN) { - throw illegalArgumentException(); - } - return deltaMean.getAndSet(new DoubleCounterMean(0.0, 0L)); - } - - @Override - public Double getAggregate() { - if (kind != MEAN) { - return aggregate.get(); - } else { - return getMean().getAggregate(); - } - } - - @Override - @Nullable - public CounterMean<Double> getMean() { - if (kind != MEAN) { - throw illegalArgumentException(); - } - return mean.get(); - } - - @Override - public Counter<Double> merge(Counter<Double> that) { - try { - checkArgument( - this.isCompatibleWith(that), "Counters %s and %s are incompatible", this, that); - switch (kind) { - case SUM: - case MIN: - case MAX: - return addValue(that.getAggregate()); - case MEAN: - CounterMean<Double> thisCounterMean = this.getMean(); - CounterMean<Double> thatCounterMean = that.getMean(); - return resetMeanToValue( - thisCounterMean.getCount() + thatCounterMean.getCount(), - thisCounterMean.getAggregate() + thatCounterMean.getAggregate()); - default: - throw illegalArgumentException(); - } - } finally { - setDirty(); - } - } - - private static class DoubleCounterMean implements CounterMean<Double> { - private final double aggregate; - private final long count; - - public DoubleCounterMean(double aggregate, long count) { - this.aggregate = aggregate; - this.count = count; - } - - @Override - public Double getAggregate() { - return aggregate; - } - - @Override - public long getCount() { - return count; - } - - @Override - public String toString() { - return aggregate + "/" + count; - } - } - } - - /** - * Implements a {@link Counter} for {@link Boolean} values. - */ - private static class BooleanCounter extends Counter<Boolean> { - private final AtomicBoolean aggregate; - private final AtomicBoolean deltaAggregate; - - /** Initializes a new {@link Counter} for {@link Boolean} values. */ - private BooleanCounter(CounterName name, AggregationKind kind) { - super(name, kind); - aggregate = new AtomicBoolean(); - deltaAggregate = new AtomicBoolean(); - getAndResetDelta(); - aggregate.set(deltaAggregate.get()); - } - - @Override - public BooleanCounter addValue(Boolean value) { - try { - if (kind.equals(AND) && !value) { - aggregate.set(value); - deltaAggregate.set(value); - } else if (kind.equals(OR) && value) { - aggregate.set(value); - deltaAggregate.set(value); - } - return this; - } finally { - setDirty(); - } - } - - @Override - public Boolean getAndResetDelta() { - switch (kind) { - case AND: - return deltaAggregate.getAndSet(true); - case OR: - return deltaAggregate.getAndSet(false); - default: - throw illegalArgumentException(); - } - } - - @Override - public Counter<Boolean> resetToValue(Boolean value) { - try { - aggregate.set(value); - deltaAggregate.set(value); - return this; - } finally { - setDirty(); - } - } - - @Override - public Counter<Boolean> resetMeanToValue(long elementCount, Boolean value) { - throw illegalArgumentException(); - } - - @Override - public CounterMean<Boolean> getAndResetMeanDelta() { - throw illegalArgumentException(); - } - - @Override - public Boolean getAggregate() { - return aggregate.get(); - } - - @Override - @Nullable - public CounterMean<Boolean> getMean() { - throw illegalArgumentException(); - } - - @Override - public Counter<Boolean> merge(Counter<Boolean> that) { - try { - checkArgument( - this.isCompatibleWith(that), "Counters %s and %s are incompatible", this, that); - return addValue(that.getAggregate()); - } finally { - setDirty(); - } - } - } - - /** - * Implements a {@link Counter} for {@link String} values. - */ - private static class StringCounter extends Counter<String> { - /** Initializes a new {@link Counter} for {@link String} values. */ - private StringCounter(CounterName name, AggregationKind kind) { - super(name, kind); - // TODO: Support MIN, MAX of Strings. - throw illegalArgumentException(); - } - - @Override - public StringCounter addValue(String value) { - switch (kind) { - default: - throw illegalArgumentException(); - } - } - - @Override - public Counter<String> resetToValue(String value) { - switch (kind) { - default: - throw illegalArgumentException(); - } - } - - @Override - public Counter<String> resetMeanToValue(long elementCount, String value) { - switch (kind) { - default: - throw illegalArgumentException(); - } - } - - @Override - public String getAndResetDelta() { - switch (kind) { - default: - throw illegalArgumentException(); - } - } - - @Override - public CounterMean<String> getAndResetMeanDelta() { - switch (kind) { - default: - throw illegalArgumentException(); - } - } - - @Override - public String getAggregate() { - switch (kind) { - default: - throw illegalArgumentException(); - } - } - - @Override - @Nullable - public CounterMean<String> getMean() { - switch (kind) { - default: - throw illegalArgumentException(); - } - } - - @Override - public Counter<String> merge(Counter<String> that) { - checkArgument(this.isCompatibleWith(that), "Counters %s and %s are incompatible", this, that); - switch (kind) { - default: - throw illegalArgumentException(); - } - } - } - - /** - * Implements a {@link Counter} for {@link Integer} values. - */ - private static class IntegerCounter extends Counter<Integer> { - private final AtomicInteger aggregate; - private final AtomicInteger deltaAggregate; - private final AtomicReference<IntegerCounterMean> mean; - private final AtomicReference<IntegerCounterMean> deltaMean; - - /** Initializes a new {@link Counter} for {@link Integer} values. */ - private IntegerCounter(CounterName name, AggregationKind kind) { - super(name, kind); - switch (kind) { - case MEAN: - aggregate = deltaAggregate = null; - mean = new AtomicReference<>(); - deltaMean = new AtomicReference<>(); - getAndResetMeanDelta(); - mean.set(deltaMean.get()); - break; - case SUM: - case MAX: - case MIN: - mean = deltaMean = null; - aggregate = new AtomicInteger(); - deltaAggregate = new AtomicInteger(); - getAndResetDelta(); - aggregate.set(deltaAggregate.get()); - break; - default: - throw illegalArgumentException(); - } - } - - @Override - public IntegerCounter addValue(Integer value) { - try { - switch (kind) { - case SUM: - aggregate.getAndAdd(value); - deltaAggregate.getAndAdd(value); - break; - case MEAN: - addToMeanAndSet(value, mean); - addToMeanAndSet(value, deltaMean); - break; - case MAX: - maxAndSet(value, aggregate); - maxAndSet(value, deltaAggregate); - break; - case MIN: - minAndSet(value, aggregate); - minAndSet(value, deltaAggregate); - break; - default: - throw illegalArgumentException(); - } - return this; - } finally { - setDirty(); - } - } - - private void addToMeanAndSet(int value, AtomicReference<IntegerCounterMean> target) { - IntegerCounterMean current; - IntegerCounterMean update; - do { - current = target.get(); - update = new IntegerCounterMean(current.getAggregate() + value, current.getCount() + 1); - } while (!target.compareAndSet(current, update)); - } - - private void maxAndSet(int value, AtomicInteger target) { - int current; - int update; - do { - current = target.get(); - update = Math.max(value, current); - } while (update > current && !target.compareAndSet(current, update)); - } - - private void minAndSet(int value, AtomicInteger target) { - int current; - int update; - do { - current = target.get(); - update = Math.min(value, current); - } while (update < current && !target.compareAndSet(current, update)); - } - - @Override - public Integer getAndResetDelta() { - switch (kind) { - case SUM: - return deltaAggregate.getAndSet(0); - case MAX: - return deltaAggregate.getAndSet(Integer.MIN_VALUE); - case MIN: - return deltaAggregate.getAndSet(Integer.MAX_VALUE); - default: - throw illegalArgumentException(); - } - } - - @Override - public Counter<Integer> resetToValue(Integer value) { - try { - if (kind == MEAN) { - throw illegalArgumentException(); - } - aggregate.set(value); - deltaAggregate.set(value); - return this; - } finally { - setDirty(); - } - } - - @Override - public Counter<Integer> resetMeanToValue(long elementCount, Integer value) { - try { - if (kind != MEAN) { - throw illegalArgumentException(); - } - if (elementCount < 0) { - throw new IllegalArgumentException("elementCount must be non-negative"); - } - IntegerCounterMean counterMean = new IntegerCounterMean(value, elementCount); - mean.set(counterMean); - deltaMean.set(counterMean); - return this; - } finally { - setDirty(); - } - } - - @Override - public CounterMean<Integer> getAndResetMeanDelta() { - if (kind != MEAN) { - throw illegalArgumentException(); - } - return deltaMean.getAndSet(new IntegerCounterMean(0, 0L)); - } - - @Override - public Integer getAggregate() { - if (kind != MEAN) { - return aggregate.get(); - } else { - return getMean().getAggregate(); - } - } - - @Override - @Nullable - public CounterMean<Integer> getMean() { - if (kind != MEAN) { - throw illegalArgumentException(); - } - return mean.get(); - } - - @Override - public Counter<Integer> merge(Counter<Integer> that) { - try { - checkArgument( - this.isCompatibleWith(that), "Counters %s and %s are incompatible", this, that); - switch (kind) { - case SUM: - case MIN: - case MAX: - return addValue(that.getAggregate()); - case MEAN: - CounterMean<Integer> thisCounterMean = this.getMean(); - CounterMean<Integer> thatCounterMean = that.getMean(); - return resetMeanToValue( - thisCounterMean.getCount() + thatCounterMean.getCount(), - thisCounterMean.getAggregate() + thatCounterMean.getAggregate()); - default: - throw illegalArgumentException(); - } - } finally { - setDirty(); - } - } - - private static class IntegerCounterMean implements CounterMean<Integer> { - private final int aggregate; - private final long count; - - public IntegerCounterMean(int aggregate, long count) { - this.aggregate = aggregate; - this.count = count; - } - - @Override - public Integer getAggregate() { - return aggregate; - } - - @Override - public long getCount() { - return count; - } - - @Override - public String toString() { - return aggregate + "/" + count; - } - } - } - - ////////////////////////////////////////////////////////////////////////////// - - /** - * Constructs an {@link IllegalArgumentException} explaining that this - * {@link Counter}'s aggregation kind is not supported by its value type. - */ - protected IllegalArgumentException illegalArgumentException() { - return new IllegalArgumentException("Cannot compute " + kind - + " aggregation over " + getType().getSimpleName() + " values."); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d20a7ada/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/CounterName.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/CounterName.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/CounterName.java deleted file mode 100644 index b46be98..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/CounterName.java +++ /dev/null @@ -1,153 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.util.common; - -import static com.google.common.base.Preconditions.checkNotNull; - -import com.google.common.base.Strings; - -import java.util.concurrent.atomic.AtomicReference; - -/** - * The name of a counter identifies the user-specified name, as well as the origin, - * the step the counter is associated with, and a prefix to add to the name. - * - * <p>For backwards compatibility, the {@link CounterName} will be converted to - * a flat name (string) during the migration. - */ -public class CounterName { - /** - * Returns a {@link CounterName} with the given name. - */ - public static CounterName named(String name) { - return new CounterName(name, "", "", ""); - } - - /** - * Returns a msecs {@link CounterName}. - */ - public static CounterName msecs(String name) { - return named(name + "-msecs"); - } - - /** - * Returns a {@link CounterName} identical to this, but with the given origin. - */ - public CounterName withOrigin(String origin) { - return new CounterName(this.name, origin, this.stepName, this.prefix); - } - - /** - * Returns a {@link CounterName} identical to this, but with the given step name. - */ - public CounterName withStepName(String stepName) { - return new CounterName(this.name, this.origin, stepName, this.prefix); - } - - /** - * Returns a {@link CounterName} identical to this, but with the given prefix. - */ - public CounterName withPrefix(String prefix) { - return new CounterName(this.name, this.origin, this.stepName, prefix); - } - - /** - * Name of the counter. - * - * <p>For example, process-msecs, ElementCount. - */ - private final String name; - - /** - * Origin (namespace) of counter name. - * - * <p>For example, "user" for user-defined counters. - * It is empty for counters defined by the SDK or the runner. - */ - private final String origin; - - /** - * System defined step name or the named-output of a step. - * - * <p>For example, {@code s1} or {@code s2.out}. - * It may be empty when counters don't associate with step names. - */ - private final String stepName; - - /** - * Prefix of group of counters. - * - * <p>It is empty when counters don't have general prefixes. - */ - private final String prefix; - - /** - * Flat name is the equivalent unstructured name. - * - * <p>It is null before {@link #getFlatName()} is called. - */ - private AtomicReference<String> flatName; - - private CounterName(String name, String origin, String stepName, String prefix) { - this.name = checkNotNull(name, "name"); - this.origin = checkNotNull(origin, "origin"); - this.stepName = checkNotNull(stepName, "stepName"); - this.prefix = checkNotNull(prefix, "prefix"); - this.flatName = new AtomicReference<String>(); - } - - /** - * Returns the flat name of a structured counter. - */ - public String getFlatName() { - String ret = flatName.get(); - if (ret == null) { - StringBuilder sb = new StringBuilder(); - if (!Strings.isNullOrEmpty(prefix)) { - // Not all runner versions use "-" to concatenate prefix, it may already have it in it. - sb.append(prefix); - } - if (!Strings.isNullOrEmpty(origin)) { - sb.append(origin + "-"); - } - if (!Strings.isNullOrEmpty(stepName)) { - sb.append(stepName + "-"); - } - sb.append(name); - flatName.compareAndSet(null, sb.toString()); - ret = flatName.get(); - } - return ret; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } else if (o instanceof CounterName) { - CounterName that = (CounterName) o; - return this.getFlatName().equals(that.getFlatName()); - } - return false; - } - - @Override - public int hashCode() { - return getFlatName().hashCode(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d20a7ada/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/CounterProvider.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/CounterProvider.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/CounterProvider.java deleted file mode 100644 index c2550cd..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/CounterProvider.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.util.common; - -/** - * A counter provider can provide {@link Counter} instances. - * - * @param <T> the input type of the counter. - */ -public interface CounterProvider<T> { - Counter<T> getCounter(String name); -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d20a7ada/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/CounterSet.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/CounterSet.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/CounterSet.java deleted file mode 100644 index cb0ffe5..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/CounterSet.java +++ /dev/null @@ -1,179 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.util.common; - -import static com.google.common.base.Preconditions.checkArgument; - -import java.util.AbstractSet; -import java.util.HashMap; -import java.util.Iterator; - -/** - * A CounterSet maintains a set of {@link Counter}s. - * - * <p>Thread-safe. - */ -public class CounterSet extends AbstractSet<Counter<?>> { - - /** Registered counters. */ - private final HashMap<String, Counter<?>> counters = new HashMap<>(); - - private final AddCounterMutator addCounterMutator = new AddCounterMutator(); - - /** - * Constructs a CounterSet containing the given Counters. - */ - public CounterSet(Counter<?>... counters) { - for (Counter<?> counter : counters) { - addNewCounter(counter); - } - } - - /** - * Returns an object that supports adding additional counters into - * this CounterSet. - */ - public AddCounterMutator getAddCounterMutator() { - return addCounterMutator; - } - - /** - * Adds a new counter, throwing an exception if a counter of the - * same name already exists. - */ - public void addNewCounter(Counter<?> counter) { - if (!addCounter(counter)) { - throw new IllegalArgumentException( - "Counter " + counter + " duplicates an existing counter in " + this); - } - } - - /** - * Adds the given Counter to this CounterSet. - * - * <p>If a counter with the same name already exists, it will be - * reused, as long as it is compatible. - * - * @return the Counter that was reused, or added - * @throws IllegalArgumentException if a counter with the same - * name but an incompatible kind had already been added - */ - public synchronized <T> Counter<T> addOrReuseCounter(Counter<T> counter) { - String flatName = counter.getFlatName(); - Counter<?> oldCounter = counters.get(flatName); - if (oldCounter == null) { - // A new counter. - counters.put(flatName, counter); - return counter; - } - if (counter.isCompatibleWith(oldCounter)) { - // Return the counter to reuse. - @SuppressWarnings("unchecked") - Counter<T> compatibleCounter = (Counter<T>) oldCounter; - return compatibleCounter; - } - throw new IllegalArgumentException( - "Counter " + counter + " duplicates incompatible counter " - + oldCounter + " in " + this); - } - - /** - * Adds a counter. Returns {@code true} if the counter was added to the set - * and false if the given counter was {@code null} or it already existed in - * the set. - * - * @param counter to register - */ - public boolean addCounter(Counter<?> counter) { - return add(counter); - } - - /** - * Returns the Counter with the given name in this CounterSet; - * returns null if no such Counter exists. - */ - public synchronized Counter<?> getExistingCounter(String name) { - return counters.get(name); - } - - @Override - public synchronized Iterator<Counter<?>> iterator() { - return counters.values().iterator(); - } - - @Override - public synchronized int size() { - return counters.size(); - } - - @Override - public synchronized boolean add(Counter<?> e) { - if (null == e) { - return false; - } - if (counters.containsKey(e.getFlatName())) { - return false; - } - counters.put(e.getFlatName(), e); - return true; - } - - public synchronized void merge(CounterSet that) { - for (Counter<?> theirCounter : that) { - Counter<?> myCounter = counters.get(theirCounter.getFlatName()); - if (myCounter != null) { - mergeCounters(myCounter, theirCounter); - } else { - addCounter(theirCounter); - } - } - } - - private <T> void mergeCounters(Counter<T> mine, Counter<?> theirCounter) { - checkArgument( - mine.isCompatibleWith(theirCounter), - "Can't merge CounterSets containing incompatible counters with the same name: " - + "%s (existing) and %s (merged)", - mine, - theirCounter); - @SuppressWarnings("unchecked") - Counter<T> theirs = (Counter<T>) theirCounter; - mine.merge(theirs); - } - - /** - * A nested class that supports adding additional counters into the - * enclosing CounterSet. This is useful as a mutator, hiding other - * public methods of the CounterSet. - */ - public class AddCounterMutator { - /** - * Adds the given Counter into the enclosing CounterSet. - * - * <p>If a counter with the same name already exists, it will be - * reused, as long as it has the same type. - * - * @return the Counter that was reused, or added - * @throws IllegalArgumentException if a counter with the same - * name but an incompatible kind had already been added - */ - public <T> Counter<T> addCounter(Counter<T> counter) { - return addOrReuseCounter(counter); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d20a7ada/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/ElementByteSizeObserver.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/ElementByteSizeObserver.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/ElementByteSizeObserver.java index 3e7011b..388355e 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/ElementByteSizeObserver.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/ElementByteSizeObserver.java @@ -19,37 +19,21 @@ package org.apache.beam.sdk.util.common; import java.util.Observable; import java.util.Observer; -import javax.annotation.Nullable; /** - * An observer that gets notified when additional bytes are read - * and/or used. It adds all bytes into a local counter. When the - * observer gets advanced via the next() call, it adds the total byte - * count to the specified counter, and prepares for the next element. + * An observer that gets notified when additional bytes are read and/or used. */ -public class ElementByteSizeObserver implements Observer { - @Nullable - private final Counter<Long> counter; +public abstract class ElementByteSizeObserver implements Observer { private boolean isLazy = false; private long totalSize = 0; private double scalingFactor = 1.0; - public ElementByteSizeObserver() { - this.counter = null; - } - - public ElementByteSizeObserver(Counter<Long> counter) { - this.counter = counter; - } + public ElementByteSizeObserver() {} /** * Called to report element byte size. */ - protected void reportElementSize(long elementByteSize) { - if (counter != null) { - counter.addValue(elementByteSize); - } - } + protected abstract void reportElementSize(long elementByteSize); /** * Sets byte counting for the current element as lazy. That is, the http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d20a7ada/sdks/java/core/src/test/java/org/apache/beam/sdk/util/CounterAggregatorTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/CounterAggregatorTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/CounterAggregatorTest.java deleted file mode 100644 index 3f96cf2..0000000 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/CounterAggregatorTest.java +++ /dev/null @@ -1,256 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.util; - -import static org.apache.beam.sdk.util.common.Counter.AggregationKind.MAX; -import static org.apache.beam.sdk.util.common.Counter.AggregationKind.MIN; -import static org.apache.beam.sdk.util.common.Counter.AggregationKind.SUM; - -import static org.junit.Assert.assertEquals; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; -import static org.mockito.Mockito.withSettings; - -import org.apache.beam.sdk.transforms.Aggregator; -import org.apache.beam.sdk.transforms.Combine; -import org.apache.beam.sdk.transforms.Combine.CombineFn; -import org.apache.beam.sdk.transforms.Combine.IterableCombineFn; -import org.apache.beam.sdk.transforms.Max; -import org.apache.beam.sdk.transforms.Min; -import org.apache.beam.sdk.transforms.SerializableFunction; -import org.apache.beam.sdk.transforms.Sum; -import org.apache.beam.sdk.util.common.Counter; -import org.apache.beam.sdk.util.common.CounterProvider; -import org.apache.beam.sdk.util.common.CounterSet; -import org.apache.beam.sdk.util.common.CounterSet.AddCounterMutator; - -import com.google.common.collect.Iterables; - -import org.hamcrest.Matchers; -import org.junit.Assert; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -import java.util.Arrays; -import java.util.List; - -/** - * Unit tests for the {@link Aggregator} API. - */ -@RunWith(JUnit4.class) -public class CounterAggregatorTest { - @Rule - public final ExpectedException expectedEx = ExpectedException.none(); - - private static final String AGGREGATOR_NAME = "aggregator_name"; - - @SuppressWarnings("rawtypes") - private <V, AccumT> void testAggregator(List<V> items, - Combine.CombineFn<V, AccumT, V> combiner, - Counter expectedCounter) { - CounterSet counters = new CounterSet(); - Aggregator<V, V> aggregator = new CounterAggregator<>( - AGGREGATOR_NAME, combiner, counters.getAddCounterMutator()); - for (V item : items) { - aggregator.addValue(item); - } - - assertEquals(Iterables.getOnlyElement(counters), expectedCounter); - } - - @Test - public void testGetName() { - String name = "testAgg"; - CounterAggregator<Long, long[], Long> aggregator = new CounterAggregator<>( - name, new Sum.SumLongFn(), - new CounterSet().getAddCounterMutator()); - - assertEquals(name, aggregator.getName()); - } - - @Test - public void testGetCombineFn() { - CombineFn<Long, ?, Long> combineFn = new Min.MinLongFn(); - - CounterAggregator<Long, ?, Long> aggregator = new CounterAggregator<>("foo", - combineFn, new CounterSet().getAddCounterMutator()); - - assertEquals(combineFn, aggregator.getCombineFn()); - } - - @Test - - public void testSumInteger() throws Exception { - testAggregator(Arrays.asList(2, 4, 1, 3), new Sum.SumIntegerFn(), - Counter.ints(AGGREGATOR_NAME, SUM).resetToValue(10)); - } - - @Test - public void testSumLong() throws Exception { - testAggregator(Arrays.asList(2L, 4L, 1L, 3L), new Sum.SumLongFn(), - Counter.longs(AGGREGATOR_NAME, SUM).resetToValue(10L)); - } - - @Test - public void testSumDouble() throws Exception { - testAggregator(Arrays.asList(2.0, 4.1, 1.0, 3.1), new Sum.SumDoubleFn(), - Counter.doubles(AGGREGATOR_NAME, SUM).resetToValue(10.2)); - } - - @Test - public void testMinInteger() throws Exception { - testAggregator(Arrays.asList(2, 4, 1, 3), new Min.MinIntegerFn(), - Counter.ints(AGGREGATOR_NAME, MIN).resetToValue(1)); - } - - @Test - public void testMinLong() throws Exception { - testAggregator(Arrays.asList(2L, 4L, 1L, 3L), new Min.MinLongFn(), - Counter.longs(AGGREGATOR_NAME, MIN).resetToValue(1L)); - } - - @Test - public void testMinDouble() throws Exception { - testAggregator(Arrays.asList(2.0, 4.1, 1.0, 3.1), new Min.MinDoubleFn(), - Counter.doubles(AGGREGATOR_NAME, MIN).resetToValue(1.0)); - } - - @Test - public void testMaxInteger() throws Exception { - testAggregator(Arrays.asList(2, 4, 1, 3), new Max.MaxIntegerFn(), - Counter.ints(AGGREGATOR_NAME, MAX).resetToValue(4)); - } - - @Test - public void testMaxLong() throws Exception { - testAggregator(Arrays.asList(2L, 4L, 1L, 3L), new Max.MaxLongFn(), - Counter.longs(AGGREGATOR_NAME, MAX).resetToValue(4L)); - } - - @Test - public void testMaxDouble() throws Exception { - testAggregator(Arrays.asList(2.0, 4.1, 1.0, 3.1), new Max.MaxDoubleFn(), - Counter.doubles(AGGREGATOR_NAME, MAX).resetToValue(4.1)); - } - - @Test - public void testCounterProviderCallsProvidedCounterAddValue() { - @SuppressWarnings("unchecked") - CombineFn<String, ?, String> combiner = mock(CombineFn.class, - withSettings().extraInterfaces(CounterProvider.class)); - @SuppressWarnings("unchecked") - CounterProvider<String> provider = (CounterProvider<String>) combiner; - - @SuppressWarnings("unchecked") - Counter<String> mockCounter = mock(Counter.class); - String name = "foo"; - when(provider.getCounter(name)).thenReturn(mockCounter); - - AddCounterMutator addCounterMutator = mock(AddCounterMutator.class); - when(addCounterMutator.addCounter(mockCounter)).thenReturn(mockCounter); - - Aggregator<String, String> aggregator = - new CounterAggregator<>(name, combiner, addCounterMutator); - - aggregator.addValue("bar_baz"); - - verify(mockCounter).addValue("bar_baz"); - verify(addCounterMutator).addCounter(mockCounter); - } - - - @Test - public void testCompatibleDuplicateNames() throws Exception { - CounterSet counters = new CounterSet(); - Aggregator<Integer, Integer> aggregator1 = new CounterAggregator<>( - AGGREGATOR_NAME, new Sum.SumIntegerFn(), - counters.getAddCounterMutator()); - - Aggregator<Integer, Integer> aggregator2 = new CounterAggregator<>( - AGGREGATOR_NAME, new Sum.SumIntegerFn(), - counters.getAddCounterMutator()); - - // The duplicate aggregators should update the same counter. - aggregator1.addValue(3); - aggregator2.addValue(4); - Assert.assertEquals( - new CounterSet(Counter.ints(AGGREGATOR_NAME, SUM).resetToValue(7)), - counters); - } - - @Test - public void testIncompatibleDuplicateNames() throws Exception { - CounterSet counters = new CounterSet(); - new CounterAggregator<>( - AGGREGATOR_NAME, new Sum.SumIntegerFn(), - counters.getAddCounterMutator()); - - expectedEx.expect(IllegalArgumentException.class); - expectedEx.expectMessage(Matchers.containsString( - "aggregator's name collides with an existing aggregator or " - + "system-provided counter of an incompatible type")); - new CounterAggregator<>( - AGGREGATOR_NAME, new Sum.SumLongFn(), - counters.getAddCounterMutator()); - } - - @Test - public void testUnsupportedCombineFn() throws Exception { - expectedEx.expect(IllegalArgumentException.class); - expectedEx.expectMessage(Matchers.containsString("unsupported combiner")); - new CounterAggregator<>( - AGGREGATOR_NAME, - new Combine.CombineFn<Integer, List<Integer>, Integer>() { - @Override - public List<Integer> createAccumulator() { - return null; - } - @Override - public List<Integer> addInput(List<Integer> accumulator, Integer input) { - return null; - } - @Override - public List<Integer> mergeAccumulators(Iterable<List<Integer>> accumulators) { - return null; - } - @Override - public Integer extractOutput(List<Integer> accumulator) { - return null; - } - }, (new CounterSet()).getAddCounterMutator()); - } - - @Test - public void testUnsupportedSerializableFunction() throws Exception { - expectedEx.expect(IllegalArgumentException.class); - expectedEx.expectMessage(Matchers.containsString("unsupported combiner")); - CombineFn<Integer, List<Integer>, Integer> combiner = IterableCombineFn - .<Integer>of(new SerializableFunction<Iterable<Integer>, Integer>() { - @Override - public Integer apply(Iterable<Integer> input) { - return null; - } - }); - new CounterAggregator<>(AGGREGATOR_NAME, combiner, - (new CounterSet()).getAddCounterMutator()); - } -}
