[
https://issues.apache.org/jira/browse/FLINK-1293?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14230020#comment-14230020
]
ASF GitHub Bot commented on FLINK-1293:
---------------------------------------
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/incubator-flink/pull/243#discussion_r21100941
--- Diff:
flink-java/src/main/java/org/apache/flink/api/java/aggregation/AggregationOperatorFactory.java
---
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.aggregation;
+
+import static java.lang.String.format;
+import static java.util.Arrays.asList;
+import static org.apache.flink.api.java.aggregation.Aggregations.key;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Vector;
+
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.lang3.Validate;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import
org.apache.flink.api.java.aggregation.AggregationFunction.ResultTypeBehavior;
+import org.apache.flink.api.java.operators.AggregationOperator;
+import org.apache.flink.api.java.operators.UnsortedGrouping;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
+
+import com.google.common.primitives.Ints;
+
+/**
+ * Factory method container to construct an
+ * {@link AggregationOperator} from a {@link DataSet} or
+ * {@link UnsortedGrouping}.
+ *
+ * <p>The factory performs the following tasks:
+ *
+ * <ol>
+ * <li>Decompose composite aggregation functions into intermediates.
+ * <li>Insert missing key aggregation function for any group keys.
+ * <li>Set intermediate, and output position for each aggregation
function.
+ * <li>Map any group keys to their position in the intermediate tuple.
+ * <li>Compute the types of intermediate tuple and aggregation result.
+ * <li>Create the aggregation operator.
+ * </ol>
+ *
+ * <p>Note: Tasks are implemented in a member class in order to
+ * facilitate testing.
+ */
+public class AggregationOperatorFactory {
+
+ private static final AggregationOperatorFactory INSTANCE = new
AggregationOperatorFactory();
+ private AggregationFunctionPreprocessor aggregationFunctionPreprocessor
= new AggregationFunctionPreprocessor();
+ private ResultTypeFactory resultTypeFactory = new ResultTypeFactory();
+
+
+ /**
+  * Builds the {@link AggregationOperator} that evaluates the given
+  * aggregation functions over a DataSet that has not been grouped.
+  *
+  * <p>The operator is created with an empty set of group keys.
+  *
+  * @param input The (ungrouped) DataSet to aggregate.
+  * @param functions The aggregation functions to compute.
+  * @return The AggregationOperator representing the requested aggregations.
+  */
+ public <T, R extends Tuple> AggregationOperator<T, R> aggregate(DataSet<T> input, AggregationFunction<?, ?>[] functions) {
+     // an ungrouped aggregation has no group keys
+     final int[] noGroupKeys = new int[0];
+     return this.<T, R>createAggregationOperator(input, noGroupKeys, functions);
+ }
+
+ /**
+  * Builds the {@link AggregationOperator} that evaluates the given
+  * aggregation functions over a grouped DataSet.
+  *
+  * <p>The underlying DataSet and the logical positions of the group
+  * keys are taken from {@code grouping}; key selection functions in
+  * {@code functions} are resolved against these group keys while the
+  * operator is created.
+  *
+  * @param grouping A grouped DataSet.
+  * @param functions The aggregation functions to compute.
+  * @return The AggregationOperator representing the requested aggregations.
+  */
+ public <T, R extends Tuple> AggregationOperator<T, R> aggregate(UnsortedGrouping<T> grouping, AggregationFunction<?, ?>[] functions) {
+     final DataSet<T> groupedInput = grouping.getDataSet();
+     final int[] keyPositions = grouping.getKeys().computeLogicalKeyPositions();
+     return this.<T, R>createAggregationOperator(groupedInput, keyPositions, functions);
+ }
+
+ /**
+  * Runs the preprocessing steps and assembles the aggregation operator.
+  *
+  * <p>The steps are, in this order: expand the key placeholder,
+  * derive the intermediate functions, derive the intermediate group
+  * keys, compute the result type from the expanded functions and the
+  * intermediate type from the intermediate functions, and finally
+  * construct the operator.
+  *
+  * @param input The DataSet to aggregate.
+  * @param groupKeys The group key positions; empty for an ungrouped input.
+  * @param functions The aggregation functions as specified by the user.
+  * @return The AggregationOperator representing the requested aggregations.
+  */
+ <T, R extends Tuple> AggregationOperator<T, R> createAggregationOperator(DataSet<T> input, int[] groupKeys, AggregationFunction<?, ?>[] functions) {
+     // TODO if sum and/or count are present, use these to compute average
+
+     // preprocessing; the order of these calls is kept as is on purpose
+     AggregationFunction<?, ?>[] expandedFunctions = aggregationFunctionPreprocessor.expandKeys(functions, groupKeys);
+     AggregationFunction<?, ?>[] intermediates = aggregationFunctionPreprocessor.createIntermediateFunctions(expandedFunctions, groupKeys);
+     int[] intermediateKeys = aggregationFunctionPreprocessor.createIntermediateGroupKeys(intermediates);
+
+     // type information of the final result and of the intermediate tuple
+     TypeInformation<R> outputType = resultTypeFactory.createAggregationResultType(input.getType(), expandedFunctions);
+     TypeInformation<Tuple> intermediateTupleType = resultTypeFactory.createAggregationResultType(input.getType(), intermediates);
+
+     return new AggregationOperator<T, R>(input, outputType, intermediateTupleType, intermediateKeys, expandedFunctions, intermediates);
+ }
+
+ static class AggregationFunctionPreprocessor {
+
+ /**
+  * Replaces the {@code INCLUDE_ALL_KEYS_FUNCTION} placeholder with one
+  * {@code key()} function per group key that the user did not select
+  * explicitly.
+  *
+  * <p>The generated key functions are inserted, in the order of
+  * {@code groupKeys}, at the position of the first placeholder; every
+  * placeholder itself is dropped. If {@code functions} contains no
+  * placeholder, the input array is returned unchanged.
+  *
+  * @param functions The aggregation functions as specified by the user.
+  * @param groupKeys The positions of the group keys.
+  * @return The functions with the placeholder expanded, or
+  *         {@code functions} itself if there was nothing to expand.
+  * @throws IllegalArgumentException if an explicitly selected key is
+  *         not contained in {@code groupKeys}.
+  */
+ public AggregationFunction<?, ?>[] expandKeys(AggregationFunction<?, ?>[] functions, int[] groupKeys) {
+     List<AggregationFunction<?, ?>> expanded = new ArrayList<AggregationFunction<?, ?>>();
+
+     // test where keys should be included and save keys defined by user
+     int insertionPosition = -1;
+     List<Integer> definedByUser = new ArrayList<Integer>();
+     for (AggregationFunction<?, ?> function : functions) {
+         if (function instanceof KeySelectionAggregationFunction) {
+             if (function == KeySelectionAggregationFunction.INCLUDE_ALL_KEYS_FUNCTION) {
+                 // only the first placeholder determines where keys are inserted;
+                 // the placeholder itself is never part of the result
+                 if (insertionPosition == -1) {
+                     insertionPosition = expanded.size();
+                 }
+                 continue;
+             }
+             int field = function.getInputPosition();
+             // Arrays.toString prints the key positions; asList on an int[]
+             // would wrap the array itself and print its identity hash
+             Validate.isTrue(ArrayUtils.contains(groupKeys, field),
+                     format("The key %d is not in the grouping %s",
+                             field, java.util.Arrays.toString(groupKeys)));
+             definedByUser.add(field);
+         }
+         expanded.add(function);
+     }
+
+     // nothing to expand: hand back the caller's array as is
+     if (insertionPosition == -1) {
+         return functions;
+     }
+
+     // insert missing keys if requested
+     for (int groupKey : groupKeys) {
+         if (!definedByUser.contains(groupKey)) {
+             expanded.add(insertionPosition, key(groupKey));
+             insertionPosition += 1;
+         }
+     }
+     return expanded.toArray(new AggregationFunction<?, ?>[expanded.size()]);
+ }
+
+ public AggregationFunction<?, ?>[]
createIntermediateFunctions(AggregationFunction<?, ?>[] functions, int[]
groupKeys) {
+ List<AggregationFunction<?, ?>> intermediates = new
ArrayList<AggregationFunction<?,?>>();
+ List<CompositeAggregationFunction<?, ?>> composites =
new ArrayList<CompositeAggregationFunction<?, ?>>();
+ int outputPosition = 0;
+ for (AggregationFunction<?, ?> function : functions) {
+
+ // set output position according to the order
specified by the user
+ function.setOutputPosition(outputPosition);
+ outputPosition += 1;
+
+ // check if key() is used without groupBy
+ if (groupKeys.length == 0
+ && function instanceof
KeySelectionAggregationFunction) {
+ throw new IllegalArgumentException("Key
selection aggregation function can only be used on grouped DataSets.");
+ }
+
+ // separate composites
+ if (function instanceof
CompositeAggregationFunction) {
+
composites.add((CompositeAggregationFunction<?, ?>) function);
--- End diff --
Oh yes, sure. Good point!
> Add support for out-of-place aggregations
> -----------------------------------------
>
> Key: FLINK-1293
> URL: https://issues.apache.org/jira/browse/FLINK-1293
> Project: Flink
> Issue Type: Improvement
> Components: Java API, Scala API
> Affects Versions: 0.7.0-incubating
> Reporter: Viktor Rosenfeld
> Assignee: Viktor Rosenfeld
> Priority: Minor
>
> Currently, the output of an aggregation is of the same type as the input.
> This restriction has two major drawbacks:
> 1. Every tuple field can only be used in one aggregation because the
> aggregation's result is stored in the field.
> 2. Aggregations having a return type that is different from the input type,
> e.g., count or average, cannot be implemented.
> It would be nice to have the aggregation return any kind of tuple as a
> result, so the restrictions above no longer apply.
> See also:
> -
> https://github.com/stratosphere/stratosphere/wiki/Design-of-Aggregate-Operator
> -
> http://apache-flink-incubator-mailing-list-archive.1008284.n3.nabble.com/Hi-Aggregation-support-td2311.html
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)