[ https://issues.apache.org/jira/browse/FLINK-5582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15833830#comment-15833830 ]
ASF GitHub Bot commented on FLINK-5582:
---------------------------------------
Github user shaoxuan-wang commented on a diff in the pull request:
https://github.com/apache/flink/pull/3186#discussion_r97246135
--- Diff: flink-core/src/main/java/org/apache/flink/api/common/functions/AggregateFunction.java ---
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.functions;
+
+import java.io.Serializable;
+
+/**
+ *
+ * <p>Aggregation functions must be {@link Serializable} because they are sent around
+ * between distributed processes during distributed execution.
+ *
+ * <p>An example of how to use this interface is below:
+ *
+ * <pre>{@code
+ * // the accumulator, which holds the state of the in-flight aggregate
+ * public class AverageAccumulator {
+ *     long count;
+ *     long sum;
+ * }
+ *
+ * // implementation of an aggregation function for an 'average'
+ * public class Average implements AggregateFunction<Integer, AverageAccumulator, Double> {
+ *
+ *     public AverageAccumulator createAccumulator() {
+ *         return new AverageAccumulator();
+ *     }
+ *
+ *     public AverageAccumulator merge(AverageAccumulator a, AverageAccumulator b) {
+ *         a.count += b.count;
+ *         a.sum += b.sum;
+ *         return a;
+ *     }
+ *
+ *     public void add(Integer value, AverageAccumulator acc) {
+ *         acc.sum += value;
+ *         acc.count++;
+ *     }
+ *
+ *     public Double getResult(AverageAccumulator acc) {
+ *         return acc.sum / (double) acc.count;
+ *     }
+ * }
+ *
+ * // implementation of a weighted average
+ * // this reuses the same accumulator type as the aggregate function for 'average'
+ * public class WeightedAverage implements AggregateFunction<Datum, AverageAccumulator, Double> {
+ *
+ *     public AverageAccumulator createAccumulator() {
+ *         return new AverageAccumulator();
+ *     }
+ *
+ *     public AverageAccumulator merge(AverageAccumulator a, AverageAccumulator b) {
+ *         a.count += b.count;
+ *         a.sum += b.sum;
+ *         return a;
+ *     }
+ *
+ *     public void add(Datum value, AverageAccumulator acc) {
+ *         acc.count += value.getWeight();                  // total weight
+ *         acc.sum += value.getValue() * value.getWeight(); // weighted sum
+ *     }
+ *
+ *     public Double getResult(AverageAccumulator acc) {
+ *         return acc.sum / (double) acc.count;
+ *     }
+ * }
+ * }</pre>
+ */
+public interface AggregateFunction<IN, ACC, OUT> extends Function, Serializable {
+
+    ACC createAccumulator();
+
+    void add(IN value, ACC accumulator);
--- End diff --
The Table API UDAGG will eventually be translated to this window stream API, and
both accumulate and retract will be handled in this add() function. I think it is
OK if we "view retractions as adding negative values".
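A minimal sketch of what "retractions as adding negative values" could look like for the average example, under stated assumptions: the input type `Change` (a value plus a retraction flag) is hypothetical, and the proposed interface is redeclared locally rather than imported from Flink. A retraction subtracts the value from the sum and one from the count.

```java
import java.io.Serializable;

public class RetractionSketch {

    // Local stand-in for the interface proposed in FLINK-5582 (not imported from Flink).
    interface AggregateFunction<IN, ACC, OUT> extends Serializable {
        ACC createAccumulator();
        void add(IN value, ACC accumulator);
        OUT getResult(ACC accumulator);
        ACC merge(ACC a, ACC b);
    }

    static class AverageAccumulator {
        long count;
        long sum;
    }

    // Hypothetical input record: a value plus a flag marking it as a retraction.
    static final class Change {
        final int value;
        final boolean retract;

        Change(int value, boolean retract) {
            this.value = value;
            this.retract = retract;
        }
    }

    static class RetractableAverage implements AggregateFunction<Change, AverageAccumulator, Double> {
        public AverageAccumulator createAccumulator() {
            return new AverageAccumulator();
        }

        public void add(Change change, AverageAccumulator acc) {
            // A retraction is applied as "adding a negative value":
            // it subtracts the value from the sum and one from the count.
            int sign = change.retract ? -1 : 1;
            acc.sum += sign * (long) change.value;
            acc.count += sign;
        }

        public Double getResult(AverageAccumulator acc) {
            return acc.sum / (double) acc.count;
        }

        public AverageAccumulator merge(AverageAccumulator a, AverageAccumulator b) {
            a.count += b.count;
            a.sum += b.sum;
            return a;
        }
    }

    public static void main(String[] args) {
        RetractableAverage avg = new RetractableAverage();
        AverageAccumulator acc = avg.createAccumulator();
        avg.add(new Change(10, false), acc);
        avg.add(new Change(20, false), acc);
        avg.add(new Change(30, false), acc);
        avg.add(new Change(30, true), acc); // retract the earlier 30
        System.out.println(avg.getResult(acc)); // prints 15.0
    }
}
```

This only works because (sum, count) is an invertible state; an aggregate such as min or max cannot undo a value this way and would need a different accumulator.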
> Add a general distributive aggregate function
> ---------------------------------------------
>
> Key: FLINK-5582
> URL: https://issues.apache.org/jira/browse/FLINK-5582
> Project: Flink
> Issue Type: New Feature
> Components: Streaming
> Reporter: Stephan Ewen
> Assignee: Stephan Ewen
> Fix For: 1.3.0
>
>
> The {{DataStream}} API currently has two aggregation functions that can be
> used on windows and in state, both of which have limitations:
> - {{ReduceFunction}} only supports one type as the type that is added and
> aggregated/returned.
> - {{FoldFunction}} supports different types to add and return, but is not
> distributive, i.e. it cannot be used for hierarchical aggregation, for
> example to split the aggregation into pre- and final-aggregation.
> I suggest adding a generic and powerful aggregation function that supports:
> - Different types to add, accumulate, and return
> - The ability to merge partial aggregates by merging their accumulators.
> The proposed interface is below. This type of interface is found in many
> APIs, like that of various databases, and also in Apache Beam:
> - The accumulator is the state of the running aggregate
> - Accumulators can be merged
> - Values are added to the accumulator
> - Getting the result from the accumulator performs an optional finalizing
> operation
> {code}
> public interface AggregateFunction<IN, ACC, OUT> extends Function {
>     ACC createAccumulator();
>     void add(IN value, ACC accumulator);
>     OUT getResult(ACC accumulator);
>     ACC merge(ACC a, ACC b);
> }
> {code}
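The distributive property this interface is meant to provide can be sketched as a two-phase aggregation: each partition is pre-aggregated into its own accumulator with createAccumulator()/add(), the partial accumulators are combined with merge(), and getResult() runs once at the end. This is a sketch under assumptions: the helpers preAggregate and finalAggregate are hypothetical, and the interface is redeclared locally instead of being imported from Flink.

```java
import java.util.Arrays;
import java.util.List;

public class TwoPhaseAverage {

    // Local copy of the proposed interface; not imported from Flink.
    interface AggregateFunction<IN, ACC, OUT> {
        ACC createAccumulator();
        void add(IN value, ACC accumulator);
        OUT getResult(ACC accumulator);
        ACC merge(ACC a, ACC b);
    }

    static class AverageAccumulator {
        long count;
        long sum;
    }

    static class Average implements AggregateFunction<Integer, AverageAccumulator, Double> {
        public AverageAccumulator createAccumulator() { return new AverageAccumulator(); }
        public void add(Integer value, AverageAccumulator acc) { acc.sum += value; acc.count++; }
        public Double getResult(AverageAccumulator acc) { return acc.sum / (double) acc.count; }
        public AverageAccumulator merge(AverageAccumulator a, AverageAccumulator b) {
            a.count += b.count;
            a.sum += b.sum;
            return a;
        }
    }

    // Pre-aggregation (hypothetical helper): add one partition's values into a fresh accumulator.
    static <IN, ACC> ACC preAggregate(AggregateFunction<IN, ACC, ?> fn, List<IN> partition) {
        ACC acc = fn.createAccumulator();
        for (IN v : partition) {
            fn.add(v, acc);
        }
        return acc;
    }

    // Final aggregation (hypothetical helper): merge the partials into a fresh accumulator, then finalize once.
    static <IN, ACC, OUT> OUT finalAggregate(AggregateFunction<IN, ACC, OUT> fn, List<ACC> partials) {
        ACC result = fn.createAccumulator();
        for (ACC p : partials) {
            result = fn.merge(result, p);
        }
        return fn.getResult(result);
    }

    public static void main(String[] args) {
        Average avg = new Average();
        AverageAccumulator p1 = preAggregate(avg, Arrays.asList(1, 2, 3));
        AverageAccumulator p2 = preAggregate(avg, Arrays.asList(4, 5));
        System.out.println(finalAggregate(avg, Arrays.asList(p1, p2))); // prints 3.0
    }
}
```

A FoldFunction offers no counterpart to merge(), which is why it cannot be split this way.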
> Example use:
> {code}
> public class AverageAccumulator {
>     long count;
>     long sum;
> }
> // implementation of a simple average
> public class Average implements AggregateFunction<Integer, AverageAccumulator, Double> {
>     public AverageAccumulator createAccumulator() {
>         return new AverageAccumulator();
>     }
>     public AverageAccumulator merge(AverageAccumulator a, AverageAccumulator b) {
>         a.count += b.count;
>         a.sum += b.sum;
>         return a;
>     }
>     public void add(Integer value, AverageAccumulator acc) {
>         acc.sum += value;
>         acc.count++;
>     }
>     public Double getResult(AverageAccumulator acc) {
>         return acc.sum / (double) acc.count;
>     }
> }
> // implementation of a weighted average
> // this reuses the same accumulator type as the aggregate function for 'average'
> public class WeightedAverage implements AggregateFunction<Datum, AverageAccumulator, Double> {
>     public AverageAccumulator createAccumulator() {
>         return new AverageAccumulator();
>     }
>     public AverageAccumulator merge(AverageAccumulator a, AverageAccumulator b) {
>         a.count += b.count;
>         a.sum += b.sum;
>         return a;
>     }
>     public void add(Datum value, AverageAccumulator acc) {
>         acc.count += value.getWeight();                  // total weight
>         acc.sum += value.getValue() * value.getWeight(); // weighted sum
>     }
>     public Double getResult(AverageAccumulator acc) {
>         return acc.sum / (double) acc.count;
>     }
> }
> {code}
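A usage sketch for the weighted average, showing that it reuses AverageAccumulator and that partial results combine through merge(). The issue does not define Datum, so the Datum below is hypothetical, with long getValue()/getWeight(); it also assumes getValue() returns the unweighted value, so add() accumulates value * weight and getResult() divides by the total weight. The interface is redeclared locally rather than imported from Flink.

```java
public class WeightedAverageSketch {

    // Local stand-in for the proposed interface (not imported from Flink).
    interface AggregateFunction<IN, ACC, OUT> {
        ACC createAccumulator();
        void add(IN value, ACC accumulator);
        OUT getResult(ACC accumulator);
        ACC merge(ACC a, ACC b);
    }

    static class AverageAccumulator {
        long count;
        long sum;
    }

    // Hypothetical Datum: the issue uses this type but does not define it.
    static final class Datum {
        private final long value;
        private final long weight;

        Datum(long value, long weight) {
            this.value = value;
            this.weight = weight;
        }

        long getValue() { return value; }
        long getWeight() { return weight; }
    }

    static class WeightedAverage implements AggregateFunction<Datum, AverageAccumulator, Double> {
        public AverageAccumulator createAccumulator() {
            return new AverageAccumulator();
        }

        public AverageAccumulator merge(AverageAccumulator a, AverageAccumulator b) {
            a.count += b.count;
            a.sum += b.sum;
            return a;
        }

        public void add(Datum value, AverageAccumulator acc) {
            acc.count += value.getWeight();                  // total weight
            acc.sum += value.getValue() * value.getWeight(); // weighted sum
        }

        public Double getResult(AverageAccumulator acc) {
            return acc.sum / (double) acc.count;
        }
    }

    public static void main(String[] args) {
        WeightedAverage wavg = new WeightedAverage();
        AverageAccumulator left = wavg.createAccumulator();
        wavg.add(new Datum(10, 1), left);
        AverageAccumulator right = wavg.createAccumulator();
        wavg.add(new Datum(20, 3), right);
        // (10*1 + 20*3) / (1 + 3) = 70 / 4
        System.out.println(wavg.getResult(wavg.merge(left, right))); // prints 17.5
    }
}
```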
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)