Repository: flink
Updated Branches:
  refs/heads/master 893fabf69 -> 8c042e378


[FLINK-8903] [table] Fix VAR_SAMP, VAR_POP, STDEV_SAMP, STDEV_POP functions on 
GROUP BY windows.

This closes #5706.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8c042e37
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8c042e37
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8c042e37

Branch: refs/heads/master
Commit: 8c042e378b65504c7d76302d508f1e33b2cfa524
Parents: 893fabf
Author: Fabian Hueske <[email protected]>
Authored: Thu Mar 15 21:04:00 2018 +0100
Committer: Fabian Hueske <[email protected]>
Committed: Thu Mar 22 11:11:26 2018 +0100

----------------------------------------------------------------------
 .../rel/rules/AggregateReduceFunctionsRule.java | 602 +++++++++++++++++++
 .../nodes/logical/FlinkLogicalAggregate.scala   |   9 +-
 .../logical/FlinkLogicalWindowAggregate.scala   |  17 +
 .../flink/table/plan/rules/FlinkRuleSets.scala  |   1 +
 .../WindowAggregateReduceFunctionsRule.scala    |  75 +++
 .../table/runtime/aggregate/AggregateUtil.scala |   4 +-
 .../table/api/batch/sql/GroupWindowTest.scala   |  49 ++
 .../table/api/batch/table/GroupWindowTest.scala |  45 ++
 .../table/api/stream/sql/GroupWindowTest.scala  |  46 ++
 .../api/stream/table/GroupWindowTest.scala      |  45 ++
 10 files changed, 888 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8c042e37/flink-libraries/flink-table/src/main/java/org/apache/calcite/rel/rules/AggregateReduceFunctionsRule.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/java/org/apache/calcite/rel/rules/AggregateReduceFunctionsRule.java
 
b/flink-libraries/flink-table/src/main/java/org/apache/calcite/rel/rules/AggregateReduceFunctionsRule.java
new file mode 100644
index 0000000..ce466e1
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/java/org/apache/calcite/rel/rules/AggregateReduceFunctionsRule.java
@@ -0,0 +1,602 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.rel.rules;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelOptRuleOperand;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.core.RelFactories;
+import org.apache.calcite.rel.logical.LogicalAggregate;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlAggFunction;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql.type.SqlTypeUtil;
+import org.apache.calcite.tools.RelBuilder;
+import org.apache.calcite.tools.RelBuilderFactory;
+import org.apache.calcite.util.CompositeList;
+import org.apache.calcite.util.ImmutableIntList;
+import org.apache.calcite.util.Util;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/*
+ * THIS FILE HAS BEEN COPIED FROM THE APACHE CALCITE PROJECT TO MAKE IT MORE 
EXTENSIBLE.
+ *
+ * We have opened an issue to port this change to Apache Calcite 
(CALCITE-2216).
+ * Once CALCITE-2216 is fixed and included in a release, we can remove the 
copied class.
+ *
+ * Modification:
+ * - Added newCalcRel() method to be able to add fields to the projection.
+ */
+
+/**
+ * Planner rule that reduces aggregate functions in
+ * {@link org.apache.calcite.rel.core.Aggregate}s to simpler forms.
+ *
+ * <p>Rewrites:
+ * <ul>
+ *
+ * <li>AVG(x) &rarr; SUM(x) / COUNT(x)
+ *
+ * <li>STDDEV_POP(x) &rarr; SQRT(
+ *     (SUM(x * x) - SUM(x) * SUM(x) / COUNT(x))
+ *    / COUNT(x))
+ *
+ * <li>STDDEV_SAMP(x) &rarr; SQRT(
+ *     (SUM(x * x) - SUM(x) * SUM(x) / COUNT(x))
+ *     / CASE COUNT(x) WHEN 1 THEN NULL ELSE COUNT(x) - 1 END)
+ *
+ * <li>VAR_POP(x) &rarr; (SUM(x * x) - SUM(x) * SUM(x) / COUNT(x))
+ *     / COUNT(x)
+ *
+ * <li>VAR_SAMP(x) &rarr; (SUM(x * x) - SUM(x) * SUM(x) / COUNT(x))
+ *        / CASE COUNT(x) WHEN 1 THEN NULL ELSE COUNT(x) - 1 END
+ * </ul>
+ *
+ * <p>Since many of these rewrites introduce multiple occurrences of simpler
+ * forms like {@code COUNT(x)}, the rule gathers common sub-expressions as it
+ * goes.
+ */
+public class AggregateReduceFunctionsRule extends RelOptRule {
+       //~ Static fields/initializers 
---------------------------------------------
+
+       /** The singleton. */
+       public static final AggregateReduceFunctionsRule INSTANCE =
+               new 
AggregateReduceFunctionsRule(operand(LogicalAggregate.class, any()),
+                       RelFactories.LOGICAL_BUILDER);
+
+       //~ Constructors 
-----------------------------------------------------------
+
+       /** Creates an AggregateReduceFunctionsRule. */
+       public AggregateReduceFunctionsRule(RelOptRuleOperand operand,
+                                                                               
RelBuilderFactory relBuilderFactory) {
+               super(operand, relBuilderFactory, null);
+       }
+
+       //~ Methods 
----------------------------------------------------------------
+
+       @Override public boolean matches(RelOptRuleCall call) {
+               if (!super.matches(call)) {
+                       return false;
+               }
+               Aggregate oldAggRel = (Aggregate) call.rels[0];
+               return containsAvgStddevVarCall(oldAggRel.getAggCallList());
+       }
+
+       public void onMatch(RelOptRuleCall ruleCall) {
+               Aggregate oldAggRel = (Aggregate) ruleCall.rels[0];
+               reduceAggs(ruleCall, oldAggRel);
+       }
+
+       /**
+        * Returns whether any of the aggregates are calls to AVG, STDDEV_*, 
VAR_*.
+        *
+        * @param aggCallList List of aggregate calls
+        */
+       private boolean containsAvgStddevVarCall(List<AggregateCall> 
aggCallList) {
+               for (AggregateCall call : aggCallList) {
+                       if (isReducible(call.getAggregation().getKind())) {
+                               return true;
+                       }
+               }
+               return false;
+       }
+
+       /**
+        * Returns whether the aggregate call is a reducible function
+        */
+       private boolean isReducible(final SqlKind kind) {
+               if (SqlKind.AVG_AGG_FUNCTIONS.contains(kind)) {
+                       return true;
+               }
+               switch (kind) {
+                       case SUM:
+                               return true;
+               }
+               return false;
+       }
+
+       /**
+        * Reduces all calls to AVG, STDDEV_POP, STDDEV_SAMP, VAR_POP, VAR_SAMP 
in
+        * the aggregates list to.
+        *
+        * <p>It handles newly generated common subexpressions since this was 
done
+        * at the sql2rel stage.
+        */
+       private void reduceAggs(
+               RelOptRuleCall ruleCall,
+               Aggregate oldAggRel) {
+               RexBuilder rexBuilder = oldAggRel.getCluster().getRexBuilder();
+
+               List<AggregateCall> oldCalls = oldAggRel.getAggCallList();
+               final int groupCount = oldAggRel.getGroupCount();
+               final int indicatorCount = oldAggRel.getIndicatorCount();
+
+               final List<AggregateCall> newCalls = Lists.newArrayList();
+               final Map<AggregateCall, RexNode> aggCallMapping = 
Maps.newHashMap();
+
+               final List<RexNode> projList = Lists.newArrayList();
+
+               // pass through group key (+ indicators if present)
+               for (int i = 0; i < groupCount + indicatorCount; ++i) {
+                       projList.add(
+                               rexBuilder.makeInputRef(
+                                       getFieldType(oldAggRel, i),
+                                       i));
+               }
+
+               // List of input expressions. If a particular aggregate needs 
more, it
+               // will add an expression to the end, and we will create an 
extra
+               // project.
+               final RelBuilder relBuilder = ruleCall.builder();
+               relBuilder.push(oldAggRel.getInput());
+               final List<RexNode> inputExprs = new 
ArrayList<>(relBuilder.fields());
+
+               // create new agg function calls and rest of project list 
together
+               for (AggregateCall oldCall : oldCalls) {
+                       projList.add(
+                               reduceAgg(
+                                       oldAggRel, oldCall, newCalls, 
aggCallMapping, inputExprs));
+               }
+
+               final int extraArgCount =
+                       inputExprs.size() - 
relBuilder.peek().getRowType().getFieldCount();
+               if (extraArgCount > 0) {
+                       relBuilder.project(inputExprs,
+                               CompositeList.of(
+                                       
relBuilder.peek().getRowType().getFieldNames(),
+                                       
Collections.<String>nCopies(extraArgCount, null)));
+               }
+               newAggregateRel(relBuilder, oldAggRel, newCalls);
+               newCalcRel(relBuilder, oldAggRel, projList);
+               ruleCall.transformTo(relBuilder.build());
+       }
+
+       private RexNode reduceAgg(
+               Aggregate oldAggRel,
+               AggregateCall oldCall,
+               List<AggregateCall> newCalls,
+               Map<AggregateCall, RexNode> aggCallMapping,
+               List<RexNode> inputExprs) {
+               final SqlKind kind = oldCall.getAggregation().getKind();
+               if (isReducible(kind)) {
+                       switch (kind) {
+                               case SUM:
+                                       // replace original SUM(x) with
+                                       // case COUNT(x) when 0 then null else 
SUM0(x) end
+                                       return reduceSum(oldAggRel, oldCall, 
newCalls, aggCallMapping);
+                               case AVG:
+                                       // replace original AVG(x) with SUM(x) 
/ COUNT(x)
+                                       return reduceAvg(oldAggRel, oldCall, 
newCalls, aggCallMapping, inputExprs);
+                               case STDDEV_POP:
+                                       // replace original STDDEV_POP(x) with
+                                       //   SQRT(
+                                       //     (SUM(x * x) - SUM(x) * SUM(x) / 
COUNT(x))
+                                       //     / COUNT(x))
+                                       return reduceStddev(oldAggRel, oldCall, 
true, true, newCalls,
+                                               aggCallMapping, inputExprs);
+                               case STDDEV_SAMP:
+                                       // replace original STDDEV_POP(x) with
+                                       //   SQRT(
+                                       //     (SUM(x * x) - SUM(x) * SUM(x) / 
COUNT(x))
+                                       //     / CASE COUNT(x) WHEN 1 THEN NULL 
ELSE COUNT(x) - 1 END)
+                                       return reduceStddev(oldAggRel, oldCall, 
false, true, newCalls,
+                                               aggCallMapping, inputExprs);
+                               case VAR_POP:
+                                       // replace original VAR_POP(x) with
+                                       //     (SUM(x * x) - SUM(x) * SUM(x) / 
COUNT(x))
+                                       //     / COUNT(x)
+                                       return reduceStddev(oldAggRel, oldCall, 
true, false, newCalls,
+                                               aggCallMapping, inputExprs);
+                               case VAR_SAMP:
+                                       // replace original VAR_POP(x) with
+                                       //     (SUM(x * x) - SUM(x) * SUM(x) / 
COUNT(x))
+                                       //     / CASE COUNT(x) WHEN 1 THEN NULL 
ELSE COUNT(x) - 1 END
+                                       return reduceStddev(oldAggRel, oldCall, 
false, false, newCalls,
+                                               aggCallMapping, inputExprs);
+                               default:
+                                       throw Util.unexpected(kind);
+                       }
+               } else {
+                       // anything else:  preserve original call
+                       RexBuilder rexBuilder = 
oldAggRel.getCluster().getRexBuilder();
+                       final int nGroups = oldAggRel.getGroupCount();
+                       List<RelDataType> oldArgTypes =
+                               SqlTypeUtil.projectTypes(
+                                       oldAggRel.getInput().getRowType(), 
oldCall.getArgList());
+                       return rexBuilder.addAggCall(oldCall,
+                               nGroups,
+                               oldAggRel.indicator,
+                               newCalls,
+                               aggCallMapping,
+                               oldArgTypes);
+               }
+       }
+
+       private AggregateCall createAggregateCallWithBinding(
+               RelDataTypeFactory typeFactory,
+               SqlAggFunction aggFunction,
+               RelDataType operandType,
+               Aggregate oldAggRel,
+               AggregateCall oldCall,
+               int argOrdinal) {
+               final Aggregate.AggCallBinding binding =
+                       new Aggregate.AggCallBinding(typeFactory, aggFunction,
+                               ImmutableList.of(operandType), 
oldAggRel.getGroupCount(),
+                               oldCall.filterArg >= 0);
+               return AggregateCall.create(aggFunction,
+                       oldCall.isDistinct(),
+                       oldCall.isApproximate(),
+                       ImmutableIntList.of(argOrdinal),
+                       oldCall.filterArg,
+                       aggFunction.inferReturnType(binding),
+                       null);
+       }
+
+       private RexNode reduceAvg(
+               Aggregate oldAggRel,
+               AggregateCall oldCall,
+               List<AggregateCall> newCalls,
+               Map<AggregateCall, RexNode> aggCallMapping,
+               List<RexNode> inputExprs) {
+               final int nGroups = oldAggRel.getGroupCount();
+               final RexBuilder rexBuilder = 
oldAggRel.getCluster().getRexBuilder();
+               final int iAvgInput = oldCall.getArgList().get(0);
+               final RelDataType avgInputType =
+                       getFieldType(
+                               oldAggRel.getInput(),
+                               iAvgInput);
+               final AggregateCall sumCall =
+                       AggregateCall.create(SqlStdOperatorTable.SUM,
+                               oldCall.isDistinct(),
+                               oldCall.isApproximate(),
+                               oldCall.getArgList(),
+                               oldCall.filterArg,
+                               oldAggRel.getGroupCount(),
+                               oldAggRel.getInput(),
+                               null,
+                               null);
+               final AggregateCall countCall =
+                       AggregateCall.create(SqlStdOperatorTable.COUNT,
+                               oldCall.isDistinct(),
+                               oldCall.isApproximate(),
+                               oldCall.getArgList(),
+                               oldCall.filterArg,
+                               oldAggRel.getGroupCount(),
+                               oldAggRel.getInput(),
+                               null,
+                               null);
+
+               // NOTE:  these references are with respect to the output
+               // of newAggRel
+               RexNode numeratorRef =
+                       rexBuilder.addAggCall(sumCall,
+                               nGroups,
+                               oldAggRel.indicator,
+                               newCalls,
+                               aggCallMapping,
+                               ImmutableList.of(avgInputType));
+               final RexNode denominatorRef =
+                       rexBuilder.addAggCall(countCall,
+                               nGroups,
+                               oldAggRel.indicator,
+                               newCalls,
+                               aggCallMapping,
+                               ImmutableList.of(avgInputType));
+
+               final RelDataTypeFactory typeFactory = 
oldAggRel.getCluster().getTypeFactory();
+               final RelDataType avgType = 
typeFactory.createTypeWithNullability(
+                       oldCall.getType(), numeratorRef.getType().isNullable());
+               numeratorRef = rexBuilder.ensureType(avgType, numeratorRef, 
true);
+               final RexNode divideRef =
+                       rexBuilder.makeCall(SqlStdOperatorTable.DIVIDE, 
numeratorRef, denominatorRef);
+               return rexBuilder.makeCast(oldCall.getType(), divideRef);
+       }
+
+       private RexNode reduceSum(
+               Aggregate oldAggRel,
+               AggregateCall oldCall,
+               List<AggregateCall> newCalls,
+               Map<AggregateCall, RexNode> aggCallMapping) {
+               final int nGroups = oldAggRel.getGroupCount();
+               RexBuilder rexBuilder = oldAggRel.getCluster().getRexBuilder();
+               int arg = oldCall.getArgList().get(0);
+               RelDataType argType =
+                       getFieldType(
+                               oldAggRel.getInput(),
+                               arg);
+               final AggregateCall sumZeroCall =
+                       AggregateCall.create(SqlStdOperatorTable.SUM0, 
oldCall.isDistinct(),
+                               oldCall.isApproximate(), oldCall.getArgList(), 
oldCall.filterArg,
+                               oldAggRel.getGroupCount(), 
oldAggRel.getInput(), null,
+                               oldCall.name);
+               final AggregateCall countCall =
+                       AggregateCall.create(SqlStdOperatorTable.COUNT,
+                               oldCall.isDistinct(),
+                               oldCall.isApproximate(),
+                               oldCall.getArgList(),
+                               oldCall.filterArg,
+                               oldAggRel.getGroupCount(),
+                               oldAggRel,
+                               null,
+                               null);
+
+               // NOTE:  these references are with respect to the output
+               // of newAggRel
+               RexNode sumZeroRef =
+                       rexBuilder.addAggCall(sumZeroCall,
+                               nGroups,
+                               oldAggRel.indicator,
+                               newCalls,
+                               aggCallMapping,
+                               ImmutableList.of(argType));
+               if (!oldCall.getType().isNullable()) {
+                       // If SUM(x) is not nullable, the validator must have 
determined that
+                       // nulls are impossible (because the group is never 
empty and x is never
+                       // null). Therefore we translate to SUM0(x).
+                       return sumZeroRef;
+               }
+               RexNode countRef =
+                       rexBuilder.addAggCall(countCall,
+                               nGroups,
+                               oldAggRel.indicator,
+                               newCalls,
+                               aggCallMapping,
+                               ImmutableList.of(argType));
+               return rexBuilder.makeCall(SqlStdOperatorTable.CASE,
+                       rexBuilder.makeCall(SqlStdOperatorTable.EQUALS,
+                               countRef, 
rexBuilder.makeExactLiteral(BigDecimal.ZERO)),
+                       rexBuilder.makeCast(sumZeroRef.getType(), 
rexBuilder.constantNull()),
+                       sumZeroRef);
+       }
+
+       private RexNode reduceStddev(
+               Aggregate oldAggRel,
+               AggregateCall oldCall,
+               boolean biased,
+               boolean sqrt,
+               List<AggregateCall> newCalls,
+               Map<AggregateCall, RexNode> aggCallMapping,
+               List<RexNode> inputExprs) {
+               // stddev_pop(x) ==>
+               //   power(
+               //     (sum(x * x) - sum(x) * sum(x) / count(x))
+               //     / count(x),
+               //     .5)
+               //
+               // stddev_samp(x) ==>
+               //   power(
+               //     (sum(x * x) - sum(x) * sum(x) / count(x))
+               //     / nullif(count(x) - 1, 0),
+               //     .5)
+               final int nGroups = oldAggRel.getGroupCount();
+               final RelOptCluster cluster = oldAggRel.getCluster();
+               final RexBuilder rexBuilder = cluster.getRexBuilder();
+               final RelDataTypeFactory typeFactory = cluster.getTypeFactory();
+
+               assert oldCall.getArgList().size() == 1 : oldCall.getArgList();
+               final int argOrdinal = oldCall.getArgList().get(0);
+               final RelDataType argOrdinalType = 
getFieldType(oldAggRel.getInput(), argOrdinal);
+               final RelDataType oldCallType =
+                       typeFactory.createTypeWithNullability(oldCall.getType(),
+                               argOrdinalType.isNullable());
+
+               final RexNode argRef =
+                       rexBuilder.ensureType(oldCallType, 
inputExprs.get(argOrdinal), true);
+               final int argRefOrdinal = lookupOrAdd(inputExprs, argRef);
+
+               final RexNode argSquared = 
rexBuilder.makeCall(SqlStdOperatorTable.MULTIPLY,
+                       argRef, argRef);
+               final int argSquaredOrdinal = lookupOrAdd(inputExprs, 
argSquared);
+
+               final AggregateCall sumArgSquaredAggCall =
+                       createAggregateCallWithBinding(typeFactory, 
SqlStdOperatorTable.SUM,
+                               argSquared.getType(), oldAggRel, oldCall, 
argSquaredOrdinal);
+
+               final RexNode sumArgSquared =
+                       rexBuilder.addAggCall(sumArgSquaredAggCall,
+                               nGroups,
+                               oldAggRel.indicator,
+                               newCalls,
+                               aggCallMapping,
+                               
ImmutableList.of(sumArgSquaredAggCall.getType()));
+
+               final AggregateCall sumArgAggCall =
+                       AggregateCall.create(SqlStdOperatorTable.SUM,
+                               oldCall.isDistinct(),
+                               oldCall.isApproximate(),
+                               ImmutableIntList.of(argOrdinal),
+                               oldCall.filterArg,
+                               oldAggRel.getGroupCount(),
+                               oldAggRel.getInput(),
+                               null,
+                               null);
+
+               final RexNode sumArg =
+                       rexBuilder.addAggCall(sumArgAggCall,
+                               nGroups,
+                               oldAggRel.indicator,
+                               newCalls,
+                               aggCallMapping,
+                               ImmutableList.of(sumArgAggCall.getType()));
+               final RexNode sumArgCast = rexBuilder.ensureType(oldCallType, 
sumArg, true);
+               final RexNode sumSquaredArg =
+                       rexBuilder.makeCall(
+                               SqlStdOperatorTable.MULTIPLY, sumArgCast, 
sumArgCast);
+
+               final AggregateCall countArgAggCall =
+                       AggregateCall.create(SqlStdOperatorTable.COUNT,
+                               oldCall.isDistinct(),
+                               oldCall.isApproximate(),
+                               oldCall.getArgList(),
+                               oldCall.filterArg,
+                               oldAggRel.getGroupCount(),
+                               oldAggRel,
+                               null,
+                               null);
+
+               final RexNode countArg =
+                       rexBuilder.addAggCall(countArgAggCall,
+                               nGroups,
+                               oldAggRel.indicator,
+                               newCalls,
+                               aggCallMapping,
+                               ImmutableList.of(argOrdinalType));
+
+               final RexNode avgSumSquaredArg =
+                       rexBuilder.makeCall(
+                               SqlStdOperatorTable.DIVIDE, sumSquaredArg, 
countArg);
+
+               final RexNode diff =
+                       rexBuilder.makeCall(
+                               SqlStdOperatorTable.MINUS,
+                               sumArgSquared, avgSumSquaredArg);
+
+               final RexNode denominator;
+               if (biased) {
+                       denominator = countArg;
+               } else {
+                       final RexLiteral one =
+                               rexBuilder.makeExactLiteral(BigDecimal.ONE);
+                       final RexNode nul =
+                               rexBuilder.makeCast(countArg.getType(), 
rexBuilder.constantNull());
+                       final RexNode countMinusOne =
+                               rexBuilder.makeCall(
+                                       SqlStdOperatorTable.MINUS, countArg, 
one);
+                       final RexNode countEqOne =
+                               rexBuilder.makeCall(
+                                       SqlStdOperatorTable.EQUALS, countArg, 
one);
+                       denominator =
+                               rexBuilder.makeCall(
+                                       SqlStdOperatorTable.CASE,
+                                       countEqOne, nul, countMinusOne);
+               }
+
+               final RexNode div =
+                       rexBuilder.makeCall(
+                               SqlStdOperatorTable.DIVIDE, diff, denominator);
+
+               RexNode result = div;
+               if (sqrt) {
+                       final RexNode half =
+                               rexBuilder.makeExactLiteral(new 
BigDecimal("0.5"));
+                       result =
+                               rexBuilder.makeCall(
+                                       SqlStdOperatorTable.POWER, div, half);
+               }
+
+               return rexBuilder.makeCast(
+                       oldCall.getType(), result);
+       }
+
+       /**
+        * Finds the ordinal of an element in a list, or adds it.
+        *
+        * @param list    List
+        * @param element Element to lookup or add
+        * @param <T>     Element type
+        * @return Ordinal of element in list
+        */
+       private static <T> int lookupOrAdd(List<T> list, T element) {
+               int ordinal = list.indexOf(element);
+               if (ordinal == -1) {
+                       ordinal = list.size();
+                       list.add(element);
+               }
+               return ordinal;
+       }
+
+       /**
+        * Do a shallow clone of oldAggRel and update aggCalls. Could be 
refactored
+        * into Aggregate and subclasses - but it's only needed for some
+        * subclasses.
+        *
+        * @param relBuilder Builder of relational expressions; at the top of 
its
+        *                   stack is its input
+        * @param oldAggregate LogicalAggregate to clone.
+        * @param newCalls  New list of AggregateCalls
+        */
+       protected void newAggregateRel(RelBuilder relBuilder,
+                                                                  Aggregate 
oldAggregate,
+                                                                  
List<AggregateCall> newCalls) {
+               relBuilder.aggregate(
+                       relBuilder.groupKey(oldAggregate.getGroupSet(),
+                               oldAggregate.getGroupSets()),
+                       newCalls);
+       }
+
+       /**
+        * Add a calc with the expressions to compute the original agg calls 
from the
+        * decomposed ones.
+        *
+        * @param relBuilder Builder of relational expressions; at the top of 
its
+        *                   stack is its input
+        * @param oldAggregate The original LogicalAggregate that is replaced.
+        * @param exprs The expressions to compute the original agg calls.
+        */
+       protected void newCalcRel(RelBuilder relBuilder,
+                                                         Aggregate 
oldAggregate,
+                                                         List<RexNode> exprs) {
+               relBuilder.project(exprs, 
oldAggregate.getRowType().getFieldNames());
+       }
+
+       private RelDataType getFieldType(RelNode relNode, int i) {
+               final RelDataTypeField inputField =
+                       relNode.getRowType().getFieldList().get(i);
+               return inputField.getType();
+       }
+}
+
+// End AggregateReduceFunctionsRule.java

http://git-wip-us.apache.org/repos/asf/flink/blob/8c042e37/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalAggregate.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalAggregate.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalAggregate.scala
index e1e93c7..17b6f1b 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalAggregate.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalAggregate.scala
@@ -30,7 +30,7 @@ import org.apache.calcite.sql.SqlKind
 import org.apache.calcite.util.ImmutableBitSet
 import org.apache.flink.table.plan.nodes.FlinkConventions
 
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 
 class FlinkLogicalAggregate(
     cluster: RelOptCluster,
@@ -74,8 +74,11 @@ private class FlinkLogicalAggregateConverter
 
     // we do not support these functions natively
     // they have to be converted using the AggregateReduceFunctionsRule
-    val supported = agg.getAggCallList.map(_.getAggregation.getKind).forall {
-      case SqlKind.STDDEV_POP | SqlKind.STDDEV_SAMP | SqlKind.VAR_POP | 
SqlKind.VAR_SAMP => false
+    val supported = 
agg.getAggCallList.asScala.map(_.getAggregation.getKind).forall {
+      // we support AVG
+      case SqlKind.AVG => true
+      // but none of the other AVG agg functions
+      case k if SqlKind.AVG_AGG_FUNCTIONS.contains(k) => false
       case _ => true
     }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8c042e37/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalWindowAggregate.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalWindowAggregate.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalWindowAggregate.scala
index 3e605e8..f2576f4 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalWindowAggregate.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalWindowAggregate.scala
@@ -26,6 +26,7 @@ import org.apache.calcite.rel.convert.ConverterRule
 import org.apache.calcite.rel.core.{Aggregate, AggregateCall}
 import org.apache.calcite.rel.metadata.RelMetadataQuery
 import org.apache.calcite.rel.{RelNode, RelShuttle}
+import org.apache.calcite.sql.SqlKind
 import org.apache.calcite.util.ImmutableBitSet
 import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
 import org.apache.flink.table.calcite.FlinkTypeFactory
@@ -33,6 +34,8 @@ import org.apache.flink.table.plan.logical.LogicalWindow
 import org.apache.flink.table.plan.logical.rel.LogicalWindowAggregate
 import org.apache.flink.table.plan.nodes.FlinkConventions
 
+import scala.collection.JavaConverters._
+
 class FlinkLogicalWindowAggregate(
     window: LogicalWindow,
     namedProperties: Seq[NamedWindowProperty],
@@ -103,6 +106,20 @@ class FlinkLogicalWindowAggregateConverter
     FlinkConventions.LOGICAL,
     "FlinkLogicalWindowAggregateConverter") {
 
+  override def matches(call: RelOptRuleCall): Boolean = {
+    val agg = call.rel(0).asInstanceOf[LogicalWindowAggregate]
+
+    // we do not support these functions natively
+    // they have to be converted using the WindowAggregateReduceFunctionsRule
+    agg.getAggCallList.asScala.map(_.getAggregation.getKind).forall {
+      // we support AVG
+      case SqlKind.AVG => true
+      // but none of the other AVG agg functions
+      case k if SqlKind.AVG_AGG_FUNCTIONS.contains(k) => false
+      case _ => true
+    }
+  }
+
   override def convert(rel: RelNode): RelNode = {
     val agg = rel.asInstanceOf[LogicalWindowAggregate]
     val traitSet = rel.getTraitSet.replace(FlinkConventions.LOGICAL)

http://git-wip-us.apache.org/repos/asf/flink/blob/8c042e37/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
index d3ad2ac..9f3b8e9 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
@@ -93,6 +93,7 @@ object FlinkRuleSets {
 
     // reduce aggregate functions like AVG, STDDEV_POP etc.
     AggregateReduceFunctionsRule.INSTANCE,
+    WindowAggregateReduceFunctionsRule.INSTANCE,
 
     // remove unnecessary sort rule
     SortRemoveRule.INSTANCE,

http://git-wip-us.apache.org/repos/asf/flink/blob/8c042e37/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowAggregateReduceFunctionsRule.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowAggregateReduceFunctionsRule.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowAggregateReduceFunctionsRule.scala
new file mode 100644
index 0000000..4ca2b33
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowAggregateReduceFunctionsRule.scala
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.common
+
+import java.util
+
+import org.apache.calcite.plan.RelOptRule
+import org.apache.calcite.rel.core.{Aggregate, AggregateCall, RelFactories}
+import org.apache.calcite.rel.logical.LogicalAggregate
+import org.apache.calcite.rel.rules.AggregateReduceFunctionsRule
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.table.plan.logical.rel.LogicalWindowAggregate
+
+/**
+  * Rule to convert complex aggregation functions into simpler ones.
+  * Have a look at [[AggregateReduceFunctionsRule]] for details.
+  */
+class WindowAggregateReduceFunctionsRule extends AggregateReduceFunctionsRule(
+    RelOptRule.operand(classOf[LogicalWindowAggregate], RelOptRule.any()),
+    RelFactories.LOGICAL_BUILDER) {
+
+  override def newAggregateRel(
+      relBuilder: RelBuilder,
+      oldAgg: Aggregate,
+      newCalls: util.List[AggregateCall]): Unit = {
+
+    // create a LogicalAggregate with simpler aggregation functions
+    super.newAggregateRel(relBuilder, oldAgg, newCalls)
+    // pop LogicalAggregate from RelBuilder
+    val newAgg = relBuilder.build().asInstanceOf[LogicalAggregate]
+
+    // create a new LogicalWindowAggregate (based on the new LogicalAggregate) 
and push it on the
+    // RelBuilder
+    val oldWindowAgg = oldAgg.asInstanceOf[LogicalWindowAggregate]
+    relBuilder.push(LogicalWindowAggregate.create(
+      oldWindowAgg.getWindow,
+      oldWindowAgg.getNamedProperties,
+      newAgg))
+  }
+
+  override def newCalcRel(
+      relBuilder: RelBuilder,
+      oldAgg: Aggregate,
+      exprs: util.List[RexNode]): Unit = {
+
+    // add all named properties of the window to the selection
+    val oldWindowAgg = oldAgg.asInstanceOf[LogicalWindowAggregate]
+    oldWindowAgg.getNamedProperties.foreach(np => 
exprs.add(relBuilder.field(np.name)))
+
+    // create a LogicalCalc that computes the complex aggregates and forwards 
the window properties
+    relBuilder.project(exprs, oldAgg.getRowType.getFieldNames)
+  }
+
+}
+
+object WindowAggregateReduceFunctionsRule {
+  val INSTANCE = new WindowAggregateReduceFunctionsRule
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8c042e37/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
index df9b1c5..ce0a9c9 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
@@ -1259,7 +1259,7 @@ object AggregateUtil {
               }
             }
 
-          case _: SqlAvgAggFunction =>
+          case a: SqlAvgAggFunction if a.kind == SqlKind.AVG =>
             aggregates(index) = sqlTypeName match {
               case TINYINT =>
                 new ByteAvgAggFunction
@@ -1413,7 +1413,7 @@ object AggregateUtil {
             accTypes(index) = udagg.accType
 
           case unSupported: SqlAggFunction =>
-            throw new TableException(s"unsupported Function: 
'${unSupported.getName}'")
+            throw new TableException(s"Unsupported Function: 
'${unSupported.getName}'")
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/flink/blob/8c042e37/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/GroupWindowTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/GroupWindowTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/GroupWindowTest.scala
index 8d06bcd..b1369e2 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/GroupWindowTest.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/GroupWindowTest.scala
@@ -304,4 +304,53 @@ class GroupWindowTest extends TableTestBase {
 
     util.verifySql(sql, expected)
   }
+
+  @Test
+  def testDecomposableAggFunctions() = {
+    val util = batchTestUtil()
+    util.addTable[(Int, String, Long, Timestamp)]("MyTable", 'a, 'b, 'c, 
'rowtime)
+
+    val sql =
+      "SELECT " +
+        "  VAR_POP(c), VAR_SAMP(c), STDDEV_POP(c), STDDEV_SAMP(c), " +
+        "  TUMBLE_START(rowtime, INTERVAL '15' MINUTE), " +
+        "  TUMBLE_END(rowtime, INTERVAL '15' MINUTE)" +
+        "FROM MyTable " +
+        "GROUP BY TUMBLE(rowtime, INTERVAL '15' MINUTE)"
+
+    val expected =
+      unaryNode(
+        "DataSetCalc",
+        unaryNode(
+          "DataSetWindowAggregate",
+          unaryNode(
+            "DataSetCalc",
+            batchTableNode(0),
+            term("select", "rowtime", "c",
+              "*(c, c) AS $f2", "*(c, c) AS $f3", "*(c, c) AS $f4", "*(c, c) 
AS $f5")
+          ),
+          term("window", TumblingGroupWindow('w$, 'rowtime, 900000.millis)),
+          term("select",
+            "SUM($f2) AS $f0",
+            "SUM(c) AS $f1",
+            "COUNT(c) AS $f2",
+            "SUM($f3) AS $f3",
+            "SUM($f4) AS $f4",
+            "SUM($f5) AS $f5",
+            "start('w$) AS w$start",
+            "end('w$) AS w$end",
+            "rowtime('w$) AS w$rowtime")
+        ),
+        term("select",
+          "CAST(/(-($f0, /(*($f1, $f1), $f2)), $f2)) AS EXPR$0",
+          "CAST(/(-($f3, /(*($f1, $f1), $f2)), CASE(=($f2, 1), null, -($f2, 
1)))) AS EXPR$1",
+          "CAST(POWER(/(-($f4, /(*($f1, $f1), $f2)), $f2), 0.5)) AS EXPR$2",
+          "CAST(POWER(/(-($f5, /(*($f1, $f1), $f2)), CASE(=($f2, 1), null, 
-($f2, 1))), 0.5)) " +
+            "AS EXPR$3",
+          "CAST(w$start) AS EXPR$4",
+          "CAST(w$end) AS EXPR$5")
+      )
+
+    util.verifySql(sql, expected)
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8c042e37/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/GroupWindowTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/GroupWindowTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/GroupWindowTest.scala
index ad44e09..27c1d7f 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/GroupWindowTest.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/GroupWindowTest.scala
@@ -449,4 +449,49 @@ class GroupWindowTest extends TableTestBase {
 
     util.verifyTable(windowedTable, expected)
   }
+
+  @Test
+  def testDecomposableAggFunctions(): Unit = {
+    val util = batchTestUtil()
+    val table = util.addTable[(Long, Int, String, Long)]('rowtime, 'a, 'b, 'c)
+
+    val windowedTable = table
+      .window(Tumble over 15.minutes on 'rowtime as 'w)
+      .groupBy('w)
+      .select('c.varPop, 'c.varSamp, 'c.stddevPop, 'c.stddevSamp, 'w.start, 
'w.end)
+
+    val expected =
+      unaryNode(
+        "DataSetCalc",
+        unaryNode(
+          "DataSetWindowAggregate",
+          unaryNode(
+            "DataSetCalc",
+            batchTableNode(0),
+            term("select", "c", "rowtime",
+              "*(c, c) AS $f2", "*(c, c) AS $f3", "*(c, c) AS $f4", "*(c, c) 
AS $f5")
+          ),
+          term("window", TumblingGroupWindow('w, 'rowtime, 900000.millis)),
+          term("select",
+            "SUM($f2) AS $f0",
+            "SUM(c) AS $f1",
+            "COUNT(c) AS $f2",
+            "SUM($f3) AS $f3",
+            "SUM($f4) AS $f4",
+            "SUM($f5) AS $f5",
+            "start('w) AS TMP_4",
+            "end('w) AS TMP_5")
+        ),
+        term("select",
+          "CAST(/(-($f0, /(*($f1, $f1), $f2)), $f2)) AS TMP_0",
+          "CAST(/(-($f3, /(*($f1, $f1), $f2)), CASE(=($f2, 1), null, -($f2, 
1)))) AS TMP_1",
+          "CAST(POWER(/(-($f4, /(*($f1, $f1), $f2)), $f2), 0.5)) AS TMP_2",
+          "CAST(POWER(/(-($f5, /(*($f1, $f1), $f2)), CASE(=($f2, 1), null, 
-($f2, 1))), 0.5)) " +
+            "AS TMP_3",
+          "TMP_4",
+          "TMP_5")
+      )
+
+    util.verifyTable(windowedTable, expected)
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8c042e37/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/GroupWindowTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/GroupWindowTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/GroupWindowTest.scala
index d7d5f1e..d292834 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/GroupWindowTest.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/GroupWindowTest.scala
@@ -260,4 +260,50 @@ class GroupWindowTest extends TableTestBase {
 
     streamUtil.verifySql(sql, expected)
   }
+
+  @Test
+  def testDecomposableAggFunctions() = {
+
+    val sql =
+      "SELECT " +
+        "  VAR_POP(c), VAR_SAMP(c), STDDEV_POP(c), STDDEV_SAMP(c), " +
+        "  TUMBLE_START(rowtime, INTERVAL '15' MINUTE), " +
+        "  TUMBLE_END(rowtime, INTERVAL '15' MINUTE)" +
+        "FROM MyTable " +
+        "GROUP BY TUMBLE(rowtime, INTERVAL '15' MINUTE)"
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        unaryNode(
+          "DataStreamGroupWindowAggregate",
+          unaryNode(
+            "DataStreamCalc",
+            streamTableNode(0),
+            term("select", "rowtime", "c",
+              "*(c, c) AS $f2", "*(c, c) AS $f3", "*(c, c) AS $f4", "*(c, c) 
AS $f5")
+          ),
+          term("window", TumblingGroupWindow('w$, 'rowtime, 900000.millis)),
+          term("select",
+            "SUM($f2) AS $f0",
+            "SUM(c) AS $f1",
+            "COUNT(c) AS $f2",
+            "SUM($f3) AS $f3",
+            "SUM($f4) AS $f4",
+            "SUM($f5) AS $f5",
+            "start('w$) AS w$start",
+            "end('w$) AS w$end",
+            "rowtime('w$) AS w$rowtime",
+            "proctime('w$) AS w$proctime")
+        ),
+        term("select",
+          "CAST(/(-($f0, /(*($f1, $f1), $f2)), $f2)) AS EXPR$0",
+          "CAST(/(-($f3, /(*($f1, $f1), $f2)), CASE(=($f2, 1), null, -($f2, 
1)))) AS EXPR$1",
+          "CAST(POWER(/(-($f4, /(*($f1, $f1), $f2)), $f2), 0.5)) AS EXPR$2",
+          "CAST(POWER(/(-($f5, /(*($f1, $f1), $f2)), CASE(=($f2, 1), null, 
-($f2, 1))), 0.5)) " +
+            "AS EXPR$3",
+          "w$start AS EXPR$4",
+          "w$end AS EXPR$5")
+      )
+    streamUtil.verifySql(sql, expected)
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8c042e37/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/GroupWindowTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/GroupWindowTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/GroupWindowTest.scala
index 260726b..a59ad83 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/GroupWindowTest.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/GroupWindowTest.scala
@@ -782,4 +782,49 @@ class GroupWindowTest extends TableTestBase {
 
     util.verifyTable(windowedTable, expected)
   }
+
+  @Test
+  def testDecomposableAggFunctions(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Long, Int, String, Long)]('rowtime.rowtime, 'a, 
'b, 'c)
+
+    val windowedTable = table
+      .window(Tumble over 15.minutes on 'rowtime as 'w)
+      .groupBy('w)
+      .select('c.varPop, 'c.varSamp, 'c.stddevPop, 'c.stddevSamp, 'w.start, 
'w.end)
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        unaryNode(
+          "DataStreamGroupWindowAggregate",
+          unaryNode(
+            "DataStreamCalc",
+            streamTableNode(0),
+            term("select", "c", "rowtime",
+              "*(c, c) AS $f2", "*(c, c) AS $f3", "*(c, c) AS $f4", "*(c, c) 
AS $f5")
+          ),
+          term("window", TumblingGroupWindow('w, 'rowtime, 900000.millis)),
+          term("select",
+            "SUM($f2) AS $f0",
+            "SUM(c) AS $f1",
+            "COUNT(c) AS $f2",
+            "SUM($f3) AS $f3",
+            "SUM($f4) AS $f4",
+            "SUM($f5) AS $f5",
+            "start('w) AS TMP_4",
+            "end('w) AS TMP_5")
+        ),
+        term("select",
+          "CAST(/(-($f0, /(*($f1, $f1), $f2)), $f2)) AS TMP_0",
+          "CAST(/(-($f3, /(*($f1, $f1), $f2)), CASE(=($f2, 1), null, -($f2, 
1)))) AS TMP_1",
+          "CAST(POWER(/(-($f4, /(*($f1, $f1), $f2)), $f2), 0.5)) AS TMP_2",
+          "CAST(POWER(/(-($f5, /(*($f1, $f1), $f2)), CASE(=($f2, 1), null, 
-($f2, 1))), 0.5)) " +
+            "AS TMP_3",
+          "TMP_4",
+          "TMP_5")
+      )
+
+    util.verifyTable(windowedTable, expected)
+  }
 }

Reply via email to