Move AggregatorFactory to runners-core and deprecate SDK version
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/08dd1498
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/08dd1498
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/08dd1498
Branch: refs/heads/master
Commit: 08dd14981bad95a029be8ac758a6091c55850200
Parents: 139437b
Author: Kenneth Knowles <[email protected]>
Authored: Fri Oct 21 11:49:07 2016 -0700
Committer: Kenneth Knowles <[email protected]>
Committed: Sun Oct 23 19:52:51 2016 -0700
----------------------------------------------------------------------
 .../beam/runners/core/AggregatorFactory.java | 39 ++++++++++++++++++++
 .../apache/beam/sdk/transforms/Aggregator.java | 11 ++----
 2 files changed, 42 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/08dd1498/runners/core-java/src/main/java/org/apache/beam/runners/core/AggregatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/AggregatorFactory.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/AggregatorFactory.java
new file mode 100644
index 0000000..153d30d
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/AggregatorFactory.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.util.ExecutionContext;
+
+/**
+ * A factory for creating aggregators.
+ */
+public interface AggregatorFactory {
+  /**
+   * Create an aggregator with the given {@code name} and {@link CombineFn}.
+   *
+   * <p>This method is called to create an aggregator for a {@link DoFn}. It receives the
+   * class of the {@link DoFn} being executed and the context of the step it is being
+   * executed in.
+   */
+  <InputT, AccumT, OutputT> Aggregator<InputT, OutputT> createAggregatorForDoFn(
+      Class<?> fnClass, ExecutionContext.StepContext stepContext,
+      String aggregatorName, CombineFn<InputT, AccumT, OutputT> combine);
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/08dd1498/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Aggregator.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Aggregator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Aggregator.java
index e8f6247..13bf322 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Aggregator.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Aggregator.java
@@ -73,8 +73,10 @@ public interface Aggregator<InputT, OutputT> {
   CombineFn<InputT, ?, OutputT> getCombineFn();
 
   /**
-   * A factory for creating aggregators.
+   * @deprecated this is for use only by runners and exists only for a migration period. Please
+   * use the identical interface in org.apache.beam.runners.core
    */
+  @Deprecated
   interface AggregatorFactory {
     /**
      * Create an aggregator with the given {@code name} and {@link CombineFn}.
@@ -87,11 +89,4 @@ public interface Aggregator<InputT, OutputT> {
         Class<?> fnClass, ExecutionContext.StepContext stepContext,
         String aggregatorName, CombineFn<InputT, AccumT, OutputT> combine);
   }
-
-  // TODO: Consider the following additional API conveniences:
-  // - In addition to createAggregator(), consider adding getAggregator() to
-  //   avoid the need to store the aggregator locally in a DoFn, i.e., create
-  //   if not already present.
-  // - Add a shortcut for the most common aggregator:
-  //     c.createAggregator("name", new Sum.SumIntegerFn()).
 }
