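Remove the deprecated Aggregator interface, the deprecated DoFnRunners.simpleRunner overload that accepted an aggregatorFactory argument, and the remaining references to aggregators in comments.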
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/878981fd Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/878981fd Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/878981fd Branch: refs/heads/DSL_SQL Commit: 878981fd527098ccd083f999dd3d84c4934306b2 Parents: f3f8810 Author: Pablo <[email protected]> Authored: Tue May 2 18:06:56 2017 -0700 Committer: Dan Halperin <[email protected]> Committed: Wed May 3 20:51:43 2017 -0700 ---------------------------------------------------------------------- .../apache/beam/examples/WindowedWordCount.java | 2 +- .../operators/ApexParDoOperator.java | 1 - .../apache/beam/runners/core/DoFnRunners.java | 21 ------------ .../functions/FlinkDoFnFunction.java | 1 - .../functions/FlinkStatefulDoFnFunction.java | 1 - .../wrappers/streaming/DoFnOperator.java | 1 - .../apache/beam/sdk/transforms/Aggregator.java | 34 -------------------- .../org/apache/beam/sdk/transforms/Latest.java | 15 +-------- .../beam/sdk/util/SystemDoFnInternal.java | 3 -- .../sdk/transforms/ApproximateUniqueTest.java | 2 +- .../beam/sdk/transforms/DoFnTesterTest.java | 2 +- .../beam/sdk/transforms/LatestFnTest.java | 2 +- 12 files changed, 5 insertions(+), 80 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/878981fd/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java ---------------------------------------------------------------------- diff --git a/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java b/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java index 5c64c53..45746af 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java +++ b/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java @@ -51,7 +51,7 @@ import org.joda.time.Instant; * * <p>Basic concepts, also in the 
MinimalWordCount, WordCount, and DebuggingWordCount examples: * Reading text files; counting a PCollection; writing to GCS; executing a Pipeline both locally - * and using a selected runner; defining DoFns; creating a custom aggregator; + * and using a selected runner; defining DoFns; * user-defined PTransforms; defining PipelineOptions. * * <p>New Concepts: http://git-wip-us.apache.org/repos/asf/beam/blob/878981fd/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java index d5dd0dd..f7242e7 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java @@ -345,7 +345,6 @@ public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements mainOutputTag, additionalOutputTags, stepContext, - null, windowingStrategy ); http://git-wip-us.apache.org/repos/asf/beam/blob/878981fd/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java index fe33af7..c090001 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java @@ -48,27 +48,6 @@ public class DoFnRunners { <T> void output(TupleTag<T> tag, WindowedValue<T> output); } - @Deprecated - public static <InputT, OutputT> DoFnRunner<InputT, OutputT> 
simpleRunner( - PipelineOptions options, - DoFn<InputT, OutputT> fn, - SideInputReader sideInputReader, - OutputManager outputManager, - TupleTag<OutputT> mainOutputTag, - List<TupleTag<?>> additionalOutputTags, - StepContext stepContext, - Object aggregatorFactory, - WindowingStrategy<?, ?> windowingStrategy) { - return simpleRunner(options, - fn, - sideInputReader, - outputManager, - mainOutputTag, - additionalOutputTags, - stepContext, - windowingStrategy); - } - /** * Returns an implementation of {@link DoFnRunner} that for a {@link DoFn}. * http://git-wip-us.apache.org/repos/asf/beam/blob/878981fd/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java index d28e7c4..28e1a44 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java @@ -105,7 +105,6 @@ public class FlinkDoFnFunction<InputT, OutputT> // see SimpleDoFnRunner, just use it to limit number of additional outputs Collections.<TupleTag<?>>emptyList(), new FlinkNoOpStepContext(), - null, windowingStrategy); if ((serializedOptions.getPipelineOptions().as(FlinkPipelineOptions.class)) http://git-wip-us.apache.org/repos/asf/beam/blob/878981fd/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java index a79f856..9f000e0 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java @@ -131,7 +131,6 @@ public class FlinkStatefulDoFnFunction<K, V, OutputT> return timerInternals; } }, - null, windowingStrategy); if ((serializedOptions.getPipelineOptions().as(FlinkPipelineOptions.class)) http://git-wip-us.apache.org/repos/asf/beam/blob/878981fd/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java index 243342d..c624036 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java @@ -260,7 +260,6 @@ public class DoFnOperator<InputT, FnOutputT, OutputT> mainOutputTag, additionalOutputTags, stepContext, - null, windowingStrategy); if (doFn instanceof GroupAlsoByWindowViaWindowSetNewDoFn) { http://git-wip-us.apache.org/repos/asf/beam/blob/878981fd/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Aggregator.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Aggregator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Aggregator.java deleted file mode 100644 index 6c21b8c..0000000 --- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Aggregator.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.transforms; - -import org.apache.beam.sdk.transforms.Combine.CombineFn; - -/** - * An {@code Aggregator<InputT>} enables monitoring of values of type {@code InputT}, - * to be combined across all bundles. 
- * - * @param <InputT> the type of input values - * @param <OutputT> the type of output values - */ -@Deprecated -public interface Aggregator<InputT, OutputT> { - void addValue(InputT value); - String getName(); - CombineFn<InputT, ?, OutputT> getCombineFn(); -} http://git-wip-us.apache.org/repos/asf/beam/blob/878981fd/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Latest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Latest.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Latest.java index d3ebbb7..f7028ec 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Latest.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Latest.java @@ -36,7 +36,7 @@ import org.apache.beam.sdk.values.TimestampedValue; * {@link PTransform} and {@link Combine.CombineFn} for computing the latest element * in a {@link PCollection}. * - * <p>Example 1: compute the latest value for each session: + * <p>Example: compute the latest value for each session: * <pre>{@code * PCollection<Long> input = ...; * PCollection<Long> sessioned = input @@ -44,19 +44,6 @@ import org.apache.beam.sdk.values.TimestampedValue; * PCollection<Long> latestValues = sessioned.apply(Latest.<Long>globally()); * }</pre> * - * <p>Example 2: track a latest computed value in an aggregator: - * <pre>{@code - * class MyDoFn extends DoFn<String, String> { - * - * {@literal @}ProcessElement - * public void processElement(ProcessContext c) { - * double val = // .. - * latestValue.addValue(TimestampedValue.of(val, c.timestamp())); - * // .. - * } - * } - * }</pre> - * * <p>{@link #combineFn} can also be used manually, in combination with state and with the * {@link Combine} transform. 
* http://git-wip-us.apache.org/repos/asf/beam/blob/878981fd/sdks/java/core/src/main/java/org/apache/beam/sdk/util/SystemDoFnInternal.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/SystemDoFnInternal.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/SystemDoFnInternal.java index 004496b..368cb9a 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/SystemDoFnInternal.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/SystemDoFnInternal.java @@ -26,9 +26,6 @@ import java.lang.annotation.Target; /** * Annotation to mark {@code DoFns} as an internal component of the Beam SDK. * - * <p>Currently, the only effect of this is to mark any aggregators reported by an annotated - * {@code DoFn} as a system counter (as opposed to a user counter). - * * <p>This is internal to the Beam SDK. */ @Documented http://git-wip-us.apache.org/repos/asf/beam/blob/878981fd/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateUniqueTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateUniqueTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateUniqueTest.java index 51880e1..e1c5f08 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateUniqueTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateUniqueTest.java @@ -49,7 +49,7 @@ import org.junit.runners.Parameterized; import org.junit.runners.Suite; /** - * Tests for the ApproximateUnique aggregator transform. + * Tests for the ApproximateUnique transform. 
*/ @RunWith(Suite.class) @Suite.SuiteClasses({ http://git-wip-us.apache.org/repos/asf/beam/blob/878981fd/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java index 5c5718c..d609d0e 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java @@ -374,7 +374,7 @@ public class DoFnTesterTest { } /** - * A {@link DoFn} that adds values to an aggregator and converts input to String in + * A {@link DoFn} that adds values to a user metric and converts input to String in * {@link DoFn.ProcessElement @ProcessElement}. */ private static class CounterDoFn extends DoFn<Long, String> { http://git-wip-us.apache.org/repos/asf/beam/blob/878981fd/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/LatestFnTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/LatestFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/LatestFnTest.java index b782b6e..f1f2e44 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/LatestFnTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/LatestFnTest.java @@ -166,7 +166,7 @@ public class LatestFnTest { } @Test - public void testExtractOutputDefaultAggregator() { + public void testExtractOutputDefaultAccumulator() { TimestampedValue<Long> accum = fn.createAccumulator(); assertThat(fn.extractOutput(accum), nullValue()); }
