[ 
https://issues.apache.org/jira/browse/FLINK-1293?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14230041#comment-14230041
 ] 

ASF GitHub Bot commented on FLINK-1293:
---------------------------------------

Github user he-sk commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/243#discussion_r21102069
  
    --- Diff: 
flink-java/src/main/java/org/apache/flink/api/java/operators/AggregationOperator.java
 ---
    @@ -0,0 +1,149 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.java.operators;
    +
    +import org.apache.commons.lang3.StringUtils;
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.common.functions.ReduceFunction;
    +import org.apache.flink.api.common.operators.Operator;
    +import org.apache.flink.api.common.operators.UnaryOperatorInformation;
    +import org.apache.flink.api.common.operators.base.MapOperatorBase;
    +import org.apache.flink.api.common.operators.base.ReduceOperatorBase;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.aggregation.AggregationFunction;
    +import org.apache.flink.api.java.aggregation.AggregationMapFinalUdf;
    +import org.apache.flink.api.java.aggregation.AggregationMapIntermediateUdf;
    +import org.apache.flink.api.java.aggregation.AggregationReduceUdf;
    +import org.apache.flink.api.java.aggregation.AverageAggregationFunction;
    +import org.apache.flink.api.java.tuple.Tuple;
    +
    +/**
    + * Aggregation operator.
    + * 
    + * <p>
    + * The operator has the following dependencies.
    + * 
    + * <dl>
    + *          <dt>functions
    + *         <dd>Aggregation functions that should be computed (may contain
    + *         composites).
    + *   <dt>intermediateFunctions
    + *      <dd>Aggregation functions that do the actual aggregation (may
    + *      contain keys and intermediates expanded from composites).
    + *   <dt>resultType
    + *         <dd>Type information for the final result tuple.
    + *   <dt>intermediateType
    + *         <dd>Type information for the intermediate tuples.
    + *   <dt>groupKeys
    + *         <dd>Fields on which the input should be grouped.
    + * </dl>
    + * 
    + * <p>A tuple field may be aggregated using multiple functions. It is
    + * therefore necessary to construct an intermediate tuple holding a copy
    + * of the field value for each aggregation function that uses it. Also,
    + * an aggregation function may extend the input tuple with additional
    + * information. For example, {@link AverageAggregationFunction} adds a 
field
    + * to count the tuples. Finally, during the aggregation group keys must be
    + * present but they may not be in the final output.
    + * 
    + * <p>Therefore this operator maps to 3 internal Flink operations:
    + * 
    + * <code>Input -> Map1 -> Reduce -> Map2 -> Output</code>
    + * 
    + * <dl>
    + *         <dt>Map1 
    + *                 <dd>Maps input tuples to intermediate tuples; copies 
    + *                 and/or initializes fields.
    + *  <dt>Reduce
    + *         <dd>Performs the actual aggregation
    + *  <dt>Map2
    + *         <dd>Computes composite aggregations and drops unwanted key 
fields.
    + * </dl>
    + * 
    + * @param <IN>     The input type (must be Tuple).
    + * @param <OUT>    The output type (must extend Tuple).
    + */
    +public class AggregationOperator<IN, OUT extends Tuple> extends 
SingleInputOperator<IN, OUT, AggregationOperator<IN, OUT>> {
    +
    +   private TypeInformation<Tuple> intermediateType;
    +   private int[] groupKeys;
    +   private AggregationFunction<?, ?>[] intermediateFunctions;
    +   private AggregationFunction<?, ?>[] finalFunctions;
    +   
    +   public AggregationOperator(DataSet<IN> input, 
    +                   TypeInformation<OUT> resultType, TypeInformation<Tuple> 
intermediateType, int[] groupKeys, AggregationFunction<?, ?>[] finalFunctions, 
AggregationFunction<?, ?>[] intermediateFunctions) {
    +           super(input, resultType);
    +           this.intermediateType = intermediateType;
    +           this.groupKeys = groupKeys;
    +           this.intermediateFunctions = intermediateFunctions;
    +           this.finalFunctions = finalFunctions;
    +   }
    +
    +   @Override
    +   protected org.apache.flink.api.common.operators.SingleInputOperator<?, 
OUT, ?> translateToDataFlow(
    +                   Operator<IN> input) {
    +           MapOperatorBase<IN, Tuple, MapFunction<IN, Tuple>> 
intermediateMapper = createIntermediateMapper();
    +           ReduceOperatorBase<Tuple, ReduceFunction<Tuple>> reducer = 
createReducer();
    +           MapOperatorBase<Tuple, OUT, MapFunction<Tuple, OUT>> 
finalMapper = createFinalMapper();
    +           intermediateMapper.setInput(input);
    +           reducer.setInput(intermediateMapper);
    +           finalMapper.setInput(reducer);
    +           return finalMapper;
    +   }
    +
    +   private MapOperatorBase<IN, Tuple, MapFunction<IN, Tuple>> 
createIntermediateMapper() {
    +           @SuppressWarnings("unchecked")
    +           MapFunction<IN, Tuple> udf = (MapFunction<IN, Tuple>) new 
AggregationMapIntermediateUdf<Tuple, Tuple>(intermediateFunctions);
    +           UnaryOperatorInformation<IN, Tuple> operatorInfo = new 
UnaryOperatorInformation<IN, Tuple>(getInputType(), intermediateType);
    +           String name = 
createOperatorName("aggregate/intermediate-mapper", intermediateFunctions);
    +           MapOperatorBase<IN, Tuple, MapFunction<IN, Tuple>> 
intermediateMapper = new MapOperatorBase<IN, Tuple, MapFunction<IN, 
Tuple>>(udf, operatorInfo, name);
    +           
intermediateMapper.setDegreeOfParallelism(this.getParallelism());
    --- End diff --
    
    Fixed.


> Add support for out-of-place aggregations
> -----------------------------------------
>
>                 Key: FLINK-1293
>                 URL: https://issues.apache.org/jira/browse/FLINK-1293
>             Project: Flink
>          Issue Type: Improvement
>          Components: Java API, Scala API
>    Affects Versions: 0.7.0-incubating
>            Reporter: Viktor Rosenfeld
>            Assignee: Viktor Rosenfeld
>            Priority: Minor
>
> Currently, the output of an aggregation is of the same type as the input. 
> This restriction has to major drawbacks:
> 1. Every tuple field can only be used in one aggregation because the 
> aggregations result is stored in the field.
> 2. Aggregations having a return type that is different from the input type, 
> e.g., count or average, cannot be implemented.
> It would be nice to have the aggregation return any kind of tuple as a 
> result, so the restrictions above no longer apply.
> See also:
> - 
> https://github.com/stratosphere/stratosphere/wiki/Design-of-Aggregate-Operator
> - 
> http://apache-flink-incubator-mailing-list-archive.1008284.n3.nabble.com/Hi-Aggregation-support-td2311.html



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to