Repository: flink Updated Branches: refs/heads/master 7d5100742 -> babee2772
[FLINK-6584] [table] Add SQL group window functions to retrieve time attributes. This closes #4199. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2ad8f7eb Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2ad8f7eb Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2ad8f7eb Branch: refs/heads/master Commit: 2ad8f7eb5690755e2a9e7e93181cd34e1093f23a Parents: 7d51007 Author: twalthr <[email protected]> Authored: Wed Oct 18 15:28:41 2017 +0200 Committer: Fabian Hueske <[email protected]> Committed: Wed Oct 25 18:53:29 2017 +0200 ---------------------------------------------------------------------- .../calcite/sql/fun/SqlGroupFunction.java | 128 +++++++++ .../calcite/RelTimeIndicatorConverter.scala | 15 +- .../logical/rel/LogicalWindowAggregate.scala | 17 +- .../DataStreamGroupWindowAggregate.scala | 3 + .../flink/table/plan/rules/FlinkRuleSets.scala | 8 +- .../common/LogicalWindowAggregateRule.scala | 32 ++- .../rules/common/WindowPropertiesRule.scala | 265 +++++++++++++++++++ .../common/WindowStartEndPropertiesRule.scala | 184 ------------- .../DataSetLogicalWindowAggregateRule.scala | 17 +- .../DataStreamLogicalWindowAggregateRule.scala | 15 +- .../table/runtime/aggregate/AggregateUtil.scala | 21 +- .../flink/table/validate/FunctionCatalog.scala | 91 ++++++- .../table/api/batch/sql/GroupWindowTest.scala | 11 +- .../validation/GroupWindowValidationTest.scala | 30 +++ .../table/api/stream/sql/GroupWindowTest.scala | 147 +++++++--- .../plan/TimeIndicatorConversionTest.scala | 10 +- .../runtime/stream/TimeAttributesITCase.scala | 146 ++++++++++ 17 files changed, 862 insertions(+), 278 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/2ad8f7eb/flink-libraries/flink-table/src/main/java/org/apache/calcite/sql/fun/SqlGroupFunction.java ---------------------------------------------------------------------- diff 
--git a/flink-libraries/flink-table/src/main/java/org/apache/calcite/sql/fun/SqlGroupFunction.java b/flink-libraries/flink-table/src/main/java/org/apache/calcite/sql/fun/SqlGroupFunction.java new file mode 100644 index 0000000..fd5ddf9 --- /dev/null +++ b/flink-libraries/flink-table/src/main/java/org/apache/calcite/sql/fun/SqlGroupFunction.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.sql.fun; + +import org.apache.calcite.sql.SqlFunction; +import org.apache.calcite.sql.SqlFunctionCategory; +import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.SqlOperatorBinding; +import org.apache.calcite.sql.type.ReturnTypes; +import org.apache.calcite.sql.type.SqlOperandTypeChecker; +import org.apache.calcite.sql.validate.SqlMonotonicity; + +import com.google.common.collect.ImmutableList; + +import java.util.List; + +/* + * THIS FILE HAS BEEN COPIED FROM THE APACHE CALCITE PROJECT UNTIL CALCITE-1867 IS FIXED. + */ + +/** + * SQL function that computes keys by which rows can be partitioned and + * aggregated. + * + * <p>Grouped window functions always occur in the GROUP BY clause. They often + * have auxiliary functions that access information about the group. 
For + * example, {@code HOP} is a group function, and its auxiliary functions are + * {@code HOP_START} and {@code HOP_END}. Here they are used in a streaming + * query: + * + * <blockquote><pre> + * SELECT STREAM HOP_START(rowtime, INTERVAL '1' HOUR), + * HOP_END(rowtime, INTERVAL '1' HOUR), + * MIN(unitPrice) + * FROM Orders + * GROUP BY HOP(rowtime, INTERVAL '1' HOUR), productId + * </pre></blockquote> + */ +public class SqlGroupFunction extends SqlFunction { + /** The grouped function, if this is an auxiliary function; null otherwise. */ + final SqlGroupFunction groupFunction; + + /** Creates a SqlGroupFunction. + * + * @param name Function name + * @param kind Kind + * @param groupFunction Group function, if this is an auxiliary; + * null, if this is a group function + * @param operandTypeChecker Operand type checker + */ + public SqlGroupFunction(String name, SqlKind kind, SqlGroupFunction groupFunction, + SqlOperandTypeChecker operandTypeChecker) { + super(name, kind, ReturnTypes.ARG0, null, + operandTypeChecker, SqlFunctionCategory.SYSTEM); + this.groupFunction = groupFunction; + if (groupFunction != null) { + assert groupFunction.groupFunction == null; + } + } + + /** Creates a SqlGroupFunction. + * + * @param kind Kind; also determines function name + * @param groupFunction Group function, if this is an auxiliary; + * null, if this is a group function + * @param operandTypeChecker Operand type checker + */ + public SqlGroupFunction(SqlKind kind, SqlGroupFunction groupFunction, + SqlOperandTypeChecker operandTypeChecker) { + this(kind.name(), kind, groupFunction, operandTypeChecker); + } + + /** Creates an auxiliary function from this grouped window function. + * + * @param kind Kind; also determines function name + */ + public SqlGroupFunction auxiliary(SqlKind kind) { + return auxiliary(kind.name(), kind); + } + + /** Creates an auxiliary function from this grouped window function.
+ * + * @param name Function name + * @param kind Kind + */ + public SqlGroupFunction auxiliary(String name, SqlKind kind) { + return new SqlGroupFunction(name, kind, this, getOperandTypeChecker()); + } + + /** Returns a list of this grouped window function's auxiliary functions. */ + public List<SqlGroupFunction> getAuxiliaryFunctions() { + return ImmutableList.of(); + } + + @Override public boolean isGroup() { + // Auxiliary functions are not group functions + return groupFunction == null; + } + + @Override public boolean isGroupAuxiliary() { + return groupFunction != null; + } + + @Override public SqlMonotonicity getMonotonicity(SqlOperatorBinding call) { + // Monotonic iff its first argument is, but not strict. + // + // Note: This strategy happens to work for all current group functions + // (HOP, TUMBLE, SESSION). When there are exceptions to this rule, we'll + // make the method abstract. + return call.getOperandMonotonicity(0).unstrict(); + } +} + +// End SqlGroupFunction.java http://git-wip-us.apache.org/repos/asf/flink/blob/2ad8f7eb/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala index 1f88737..4f3fbaa 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala @@ -30,6 +30,7 @@ import org.apache.flink.table.calcite.FlinkTypeFactory.{isRowtimeIndicatorType, import org.apache.flink.table.functions.sql.ProctimeSqlFunction import org.apache.flink.table.plan.logical.rel.LogicalWindowAggregate import
org.apache.flink.table.plan.schema.TimeIndicatorRelDataType +import org.apache.flink.table.validate.BasicOperatorTable import scala.collection.JavaConversions._ import scala.collection.mutable @@ -400,7 +401,7 @@ class RexTimeIndicatorMaterializer( val materializedOperands = updatedCall.getOperator match { // skip materialization for special operators - case SqlStdOperatorTable.SESSION | SqlStdOperatorTable.HOP | SqlStdOperatorTable.TUMBLE => + case BasicOperatorTable.SESSION | BasicOperatorTable.HOP | BasicOperatorTable.TUMBLE => updatedCall.getOperands.toList case _ => @@ -427,6 +428,18 @@ class RexTimeIndicatorMaterializer( isTimeIndicatorType(updatedCall.getOperands.get(0).getType) => updatedCall + // do not modify window time attributes + case BasicOperatorTable.TUMBLE_ROWTIME | + BasicOperatorTable.TUMBLE_PROCTIME | + BasicOperatorTable.HOP_ROWTIME | + BasicOperatorTable.HOP_PROCTIME | + BasicOperatorTable.SESSION_ROWTIME | + BasicOperatorTable.SESSION_PROCTIME + // since we materialize groupings on time indicators, + // we cannot check the operands anymore but the return type at least + if isTimeIndicatorType(updatedCall.getType) => + updatedCall + // materialize function's result and operands case _ if isTimeIndicatorType(updatedCall.getType) => updatedCall.clone(timestamp, materializedOperands) http://git-wip-us.apache.org/repos/asf/flink/blob/2ad8f7eb/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/rel/LogicalWindowAggregate.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/rel/LogicalWindowAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/rel/LogicalWindowAggregate.scala index 81f6bf0..6424828 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/rel/LogicalWindowAggregate.scala +++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/rel/LogicalWindowAggregate.scala @@ -35,11 +35,11 @@ class LogicalWindowAggregate( cluster: RelOptCluster, traitSet: RelTraitSet, child: RelNode, - indicator: Boolean, + indicatorFlag: Boolean, groupSet: ImmutableBitSet, groupSets: util.List[ImmutableBitSet], aggCalls: util.List[AggregateCall]) - extends Aggregate(cluster, traitSet, child, indicator, groupSet, groupSets, aggCalls) { + extends Aggregate(cluster, traitSet, child, indicatorFlag, groupSet, groupSets, aggCalls) { def getWindow: LogicalWindow = window @@ -66,6 +66,19 @@ class LogicalWindowAggregate( aggCalls) } + def copy(namedProperties: Seq[NamedWindowProperty]): LogicalWindowAggregate = { + new LogicalWindowAggregate( + window, + namedProperties, + cluster, + traitSet, + input, + indicator, + getGroupSet, + getGroupSets, + aggCalls) + } + override def accept(shuttle: RelShuttle): RelNode = shuttle.visit(this) override def deriveRowType(): RelDataType = { http://git-wip-us.apache.org/repos/asf/flink/blob/2ad8f7eb/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala index 267bc3b..d527dc8 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala @@ -146,6 +146,9 @@ class DataStreamGroupWindowAggregate( // copy the window rowtime attribute into the StreamRecord timestamp field val timeAttribute = 
window.timeAttribute.asInstanceOf[ResolvedFieldReference].name val timeIdx = inputSchema.fieldNames.indexOf(timeAttribute) + if (timeIdx < 0) { + throw TableException("Time attribute could not be found. This is a bug.") + } inputDS .process( http://git-wip-us.apache.org/repos/asf/flink/blob/2ad8f7eb/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala index 073a8cc..36fdc6c 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala @@ -147,8 +147,8 @@ object FlinkRuleSets { // Transform window to LogicalWindowAggregate DataSetLogicalWindowAggregateRule.INSTANCE, - WindowStartEndPropertiesRule.INSTANCE, - WindowStartEndPropertiesHavingRule.INSTANCE + WindowPropertiesRule.INSTANCE, + WindowPropertiesHavingRule.INSTANCE ) /** @@ -179,8 +179,8 @@ object FlinkRuleSets { val DATASTREAM_NORM_RULES: RuleSet = RuleSets.ofList( // Transform window to LogicalWindowAggregate DataStreamLogicalWindowAggregateRule.INSTANCE, - WindowStartEndPropertiesRule.INSTANCE, - WindowStartEndPropertiesHavingRule.INSTANCE, + WindowPropertiesRule.INSTANCE, + WindowPropertiesHavingRule.INSTANCE, // simplify expressions rules ReduceExpressionsRule.FILTER_INSTANCE, http://git-wip-us.apache.org/repos/asf/flink/blob/2ad8f7eb/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/LogicalWindowAggregateRule.scala ---------------------------------------------------------------------- diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/LogicalWindowAggregateRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/LogicalWindowAggregateRule.scala index 34433f9..927700b 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/LogicalWindowAggregateRule.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/LogicalWindowAggregateRule.scala @@ -23,11 +23,11 @@ import org.apache.calcite.plan.hep.HepRelVertex import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.rel.logical.{LogicalAggregate, LogicalProject} import org.apache.calcite.rex._ -import org.apache.calcite.sql.fun.SqlStdOperatorTable import org.apache.calcite.util.ImmutableBitSet import org.apache.flink.table.api._ import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty import org.apache.flink.table.plan.logical.rel.LogicalWindowAggregate +import org.apache.flink.table.validate.BasicOperatorTable import _root_.scala.collection.JavaConversions._ @@ -65,20 +65,28 @@ abstract class LogicalWindowAggregateRule(ruleName: String) val (windowExpr, windowExprIdx) = getWindowExpressions(agg).head val window = translateWindowExpression(windowExpr, project.getInput.getRowType) - val builder = call.builder() - val rexBuilder = builder.getRexBuilder + val rexBuilder = call.builder().getRexBuilder val inAggGroupExpression = getInAggregateGroupExpression(rexBuilder, windowExpr) + val newGroupSet = agg.getGroupSet.except(ImmutableBitSet.of(windowExprIdx)) - val newAgg = builder + + val builder = call.builder() + + val newProject = builder .push(project.getInput) .project(project.getChildExps.updated(windowExprIdx, inAggGroupExpression)) - .aggregate(builder.groupKey( - newGroupSet, - agg.indicator, ImmutableList.of(newGroupSet)), agg.getAggCallList) - .build().asInstanceOf[LogicalAggregate] + .build() + + // we don't 
use the builder here because it uses RelMetadataQuery which affects the plan + val newAgg = LogicalAggregate.create( + newProject, + agg.indicator, + newGroupSet, + ImmutableList.of(newGroupSet), + agg.getAggCallList) - // Create an additional project to conform with types + // create an additional project to conform with types val outAggGroupExpression = getOutAggregateGroupExpression(rexBuilder, windowExpr) val transformed = call.builder() transformed.push(LogicalWindowAggregate.create( @@ -103,19 +111,19 @@ abstract class LogicalWindowAggregateRule(ruleName: String) g._1 match { case call: RexCall => call.getOperator match { - case SqlStdOperatorTable.TUMBLE => + case BasicOperatorTable.TUMBLE => if (call.getOperands.size() == 2) { true } else { throw TableException("TUMBLE window with alignment is not supported yet.") } - case SqlStdOperatorTable.HOP => + case BasicOperatorTable.HOP => if (call.getOperands.size() == 3) { true } else { throw TableException("HOP window with alignment is not supported yet.") } - case SqlStdOperatorTable.SESSION => + case BasicOperatorTable.SESSION => if (call.getOperands.size() == 2) { true } else { http://git-wip-us.apache.org/repos/asf/flink/blob/2ad8f7eb/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowPropertiesRule.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowPropertiesRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowPropertiesRule.scala new file mode 100644 index 0000000..c228528 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowPropertiesRule.scala @@ -0,0 +1,265 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.rules.common + +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelOptRuleOperand} +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.logical.{LogicalFilter, LogicalProject} +import org.apache.calcite.rex.{RexCall, RexNode} +import org.apache.calcite.tools.RelBuilder +import org.apache.flink.table.api.{TableException, ValidationException} +import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty +import org.apache.flink.table.expressions._ +import org.apache.flink.table.plan.logical.LogicalWindow +import org.apache.flink.table.plan.logical.rel.LogicalWindowAggregate +import org.apache.flink.table.validate.BasicOperatorTable + +import scala.collection.JavaConversions._ + +abstract class WindowPropertiesBaseRule(rulePredicate: RelOptRuleOperand, ruleName: String) + extends RelOptRule(rulePredicate, ruleName) { + + override def matches(call: RelOptRuleCall): Boolean = { + val project = call.rel(0).asInstanceOf[LogicalProject] + // project includes at least one group auxiliary function + + def hasGroupAuxiliaries(node: RexNode): Boolean = { + node match { + case c: RexCall if c.getOperator.isGroupAuxiliary => true + case c: RexCall => + c.operands.exists(hasGroupAuxiliaries) + case _ => false
+ } + } + + project.getProjects.exists(hasGroupAuxiliaries) + } + + def convertWindowNodes( + builder: RelBuilder, + project: LogicalProject, + filter: Option[LogicalFilter], + innerProject: LogicalProject, + agg: LogicalWindowAggregate): RelNode = { + + val w = agg.getWindow + + val isRowtime = ExpressionUtils.isRowtimeAttribute(w.timeAttribute) + val isProctime = ExpressionUtils.isProctimeAttribute(w.timeAttribute) + + val startEndProperties = Seq( + NamedWindowProperty(propertyName(w, "start"), WindowStart(w.aliasAttribute)), + NamedWindowProperty(propertyName(w, "end"), WindowEnd(w.aliasAttribute))) + + // allow rowtime/proctime for rowtime windows and proctime for proctime windows + val timeProperties = if (isRowtime) { + Seq( + NamedWindowProperty(propertyName(w, "rowtime"), RowtimeAttribute(w.aliasAttribute)), + NamedWindowProperty(propertyName(w, "proctime"), ProctimeAttribute(w.aliasAttribute))) + } else if (isProctime) { + Seq(NamedWindowProperty(propertyName(w, "proctime"), ProctimeAttribute(w.aliasAttribute))) + } else { + Seq() + } + + val properties = startEndProperties ++ timeProperties + + // retrieve window properties + builder.push(agg.copy(properties)) + + // forward window properties + builder.project( + innerProject.getProjects ++ properties.map(np => builder.field(np.name))) + + // replace window auxiliary functions in filter by access to window properties + filter.foreach { f => + builder.filter( + f.getChildExps.map(expr => replaceGroupAuxiliaries(expr, w, builder)) + ) + } + + // replace window auxiliary functions in projection by access to window properties + builder.project( + project.getProjects.map(expr => replaceGroupAuxiliaries(expr, w, builder)), + project.getRowType.getFieldNames + ) + + builder.build() + } + + /** Generates a property name for a window. 
*/ + private def propertyName(window: LogicalWindow, name: String): String = { + window.aliasAttribute.asInstanceOf[WindowReference].name + name + } + + /** Replace group auxiliaries with field references. */ + private def replaceGroupAuxiliaries( + node: RexNode, + window: LogicalWindow, + builder: RelBuilder): RexNode = { + + val rexBuilder = builder.getRexBuilder + + val isRowtime = ExpressionUtils.isRowtimeAttribute(window.timeAttribute) + val isProctime = ExpressionUtils.isProctimeAttribute(window.timeAttribute) + + node match { + case c: RexCall if isWindowStart(c) => + // replace expression by access to window start + rexBuilder.makeCast(c.getType, builder.field(propertyName(window, "start")), false) + + case c: RexCall if isWindowEnd(c) => + // replace expression by access to window end + rexBuilder.makeCast(c.getType, builder.field(propertyName(window, "end")), false) + + case c: RexCall if isWindowRowtime(c) => + if (isProctime) { + throw ValidationException("A proctime window cannot provide a rowtime attribute.") + } else if (isRowtime) { + // replace expression by access to window rowtime + builder.field(propertyName(window, "rowtime")) + } else { + throw TableException("Accessing the rowtime attribute of a window is not yet " + + "supported in a batch environment.") + } + + case c: RexCall if isWindowProctime(c) => + if (isProctime || isRowtime) { + // replace expression by access to window proctime + builder.field(propertyName(window, "proctime")) + } else { + throw ValidationException("Proctime is not supported in a batch environment.") + } + + case c: RexCall => + // replace expressions in children + val newOps = c.getOperands.map(replaceGroupAuxiliaries(_, window, builder)) + c.clone(c.getType, newOps) + + case x => + // preserve expression + x + } + } + + /** Checks if a RexNode is a window start auxiliary function. 
*/ + private def isWindowStart(node: RexNode): Boolean = { + node match { + case n: RexCall if n.getOperator.isGroupAuxiliary => + n.getOperator match { + case BasicOperatorTable.TUMBLE_START | + BasicOperatorTable.HOP_START | + BasicOperatorTable.SESSION_START + => true + case _ => false + } + case _ => false + } + } + + /** Checks if a RexNode is a window end auxiliary function. */ + private def isWindowEnd(node: RexNode): Boolean = { + node match { + case n: RexCall if n.getOperator.isGroupAuxiliary => + n.getOperator match { + case BasicOperatorTable.TUMBLE_END | + BasicOperatorTable.HOP_END | + BasicOperatorTable.SESSION_END + => true + case _ => false + } + case _ => false + } + } + + /** Checks if a RexNode is a window rowtime auxiliary function. */ + private def isWindowRowtime(node: RexNode): Boolean = { + node match { + case n: RexCall if n.getOperator.isGroupAuxiliary => + n.getOperator match { + case BasicOperatorTable.TUMBLE_ROWTIME | + BasicOperatorTable.HOP_ROWTIME | + BasicOperatorTable.SESSION_ROWTIME + => true + case _ => false + } + case _ => false + } + } + + /** Checks if a RexNode is a window proctime auxiliary function. 
*/ + private def isWindowProctime(node: RexNode): Boolean = { + node match { + case n: RexCall if n.getOperator.isGroupAuxiliary => + n.getOperator match { + case BasicOperatorTable.TUMBLE_PROCTIME | + BasicOperatorTable.HOP_PROCTIME | + BasicOperatorTable.SESSION_PROCTIME + => true + case _ => false + } + case _ => false + } + } +} + +object WindowPropertiesRule { + + val INSTANCE = new WindowPropertiesBaseRule( + RelOptRule.operand(classOf[LogicalProject], + RelOptRule.operand(classOf[LogicalProject], + RelOptRule.operand(classOf[LogicalWindowAggregate], RelOptRule.none()))), + "WindowPropertiesRule") { + + override def onMatch(call: RelOptRuleCall): Unit = { + + val project = call.rel(0).asInstanceOf[LogicalProject] + val innerProject = call.rel(1).asInstanceOf[LogicalProject] + val agg = call.rel(2).asInstanceOf[LogicalWindowAggregate] + + val converted = convertWindowNodes(call.builder(), project, None, innerProject, agg) + + call.transformTo(converted) + } + } +} + +object WindowPropertiesHavingRule { + + val INSTANCE = new WindowPropertiesBaseRule( + RelOptRule.operand(classOf[LogicalProject], + RelOptRule.operand(classOf[LogicalFilter], + RelOptRule.operand(classOf[LogicalProject], + RelOptRule.operand(classOf[LogicalWindowAggregate], RelOptRule.none())))), + "WindowPropertiesHavingRule") { + + override def onMatch(call: RelOptRuleCall): Unit = { + + val project = call.rel(0).asInstanceOf[LogicalProject] + val filter = call.rel(1).asInstanceOf[LogicalFilter] + val innerProject = call.rel(2).asInstanceOf[LogicalProject] + val agg = call.rel(3).asInstanceOf[LogicalWindowAggregate] + + val converted = convertWindowNodes(call.builder(), project, Some(filter), innerProject, agg) + + call.transformTo(converted) + } + } +} + http://git-wip-us.apache.org/repos/asf/flink/blob/2ad8f7eb/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowStartEndPropertiesRule.scala 
---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowStartEndPropertiesRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowStartEndPropertiesRule.scala deleted file mode 100644 index 33190e6..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowStartEndPropertiesRule.scala +++ /dev/null @@ -1,184 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.table.plan.rules.common - -import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelOptRuleOperand} -import org.apache.calcite.rel.logical.{LogicalFilter, LogicalProject} -import org.apache.calcite.rex.{RexCall, RexNode} -import org.apache.calcite.sql.fun.SqlStdOperatorTable -import org.apache.calcite.tools.RelBuilder -import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty -import org.apache.flink.table.expressions.{WindowEnd, WindowStart} -import org.apache.flink.table.plan.logical.rel.LogicalWindowAggregate - -import scala.collection.JavaConversions._ - -abstract class WindowStartEndPropertiesBaseRule(rulePredicate: RelOptRuleOperand, ruleName: String) - extends RelOptRule(rulePredicate, ruleName) { - - override def matches(call: RelOptRuleCall): Boolean = { - val project = call.rel(0).asInstanceOf[LogicalProject] - // project includes at least on group auxiliary function - - def hasGroupAuxiliaries(node: RexNode): Boolean = { - node match { - case c: RexCall if c.getOperator.isGroupAuxiliary => true - case c: RexCall => - c.operands.exists(hasGroupAuxiliaries) - case _ => false - } - } - - project.getProjects.exists(hasGroupAuxiliaries) - } - - def replaceGroupAuxiliaries(node: RexNode, relBuilder: RelBuilder): RexNode = { - val rexBuilder = relBuilder.getRexBuilder - node match { - case c: RexCall if isWindowStart(c) => - // replace expression by access to window start - rexBuilder.makeCast(c.getType, relBuilder.field("w$start"), false) - case c: RexCall if isWindowEnd(c) => - // replace expression by access to window end - rexBuilder.makeCast(c.getType, relBuilder.field("w$end"), false) - case c: RexCall => - // replace expressions in children - val newOps = c.getOperands.map(x => replaceGroupAuxiliaries(x, relBuilder)) - c.clone(c.getType, newOps) - case x => - // preserve expression - x - } - } - - /** Checks if a RexNode is a window start auxiliary function. 
*/ - private def isWindowStart(node: RexNode): Boolean = { - node match { - case n: RexCall if n.getOperator.isGroupAuxiliary => - n.getOperator match { - case SqlStdOperatorTable.TUMBLE_START | - SqlStdOperatorTable.HOP_START | - SqlStdOperatorTable.SESSION_START - => true - case _ => false - } - case _ => false - } - } - - /** Checks if a RexNode is a window end auxiliary function. */ - private def isWindowEnd(node: RexNode): Boolean = { - node match { - case n: RexCall if n.getOperator.isGroupAuxiliary => - n.getOperator match { - case SqlStdOperatorTable.TUMBLE_END | - SqlStdOperatorTable.HOP_END | - SqlStdOperatorTable.SESSION_END - => true - case _ => false - } - case _ => false - } - } -} - -object WindowStartEndPropertiesRule { - - val INSTANCE = new WindowStartEndPropertiesBaseRule( - RelOptRule.operand(classOf[LogicalProject], - RelOptRule.operand(classOf[LogicalProject], - RelOptRule.operand(classOf[LogicalWindowAggregate], RelOptRule.none()))), - "WindowStartEndPropertiesRule") { - - override def onMatch(call: RelOptRuleCall): Unit = { - - val project = call.rel(0).asInstanceOf[LogicalProject] - val innerProject = call.rel(1).asInstanceOf[LogicalProject] - val agg = call.rel(2).asInstanceOf[LogicalWindowAggregate] - - // Retrieve window start and end properties - val builder = call.builder() - builder.push(LogicalWindowAggregate.create( - agg.getWindow, - Seq( - NamedWindowProperty("w$start", WindowStart(agg.getWindow.aliasAttribute)), - NamedWindowProperty("w$end", WindowEnd(agg.getWindow.aliasAttribute))), - agg) - ) - - // forward window start and end properties - builder.project( - innerProject.getProjects ++ Seq(builder.field("w$start"), builder.field("w$end"))) - - // replace window auxiliary function by access to window properties - builder.project( - project.getProjects.map(expr => replaceGroupAuxiliaries(expr, builder)) - ) - val res = builder.build() - call.transformTo(res) - } - } -} - -object WindowStartEndPropertiesHavingRule { - - val 
INSTANCE = new WindowStartEndPropertiesBaseRule( - RelOptRule.operand(classOf[LogicalProject], - RelOptRule.operand(classOf[LogicalFilter], - RelOptRule.operand(classOf[LogicalProject], - RelOptRule.operand(classOf[LogicalWindowAggregate], RelOptRule.none())))), - "WindowStartEndPropertiesHavingRule") { - - override def onMatch(call: RelOptRuleCall): Unit = { - - val project = call.rel(0).asInstanceOf[LogicalProject] - val filter = call.rel(1).asInstanceOf[LogicalFilter] - val innerProject = call.rel(2).asInstanceOf[LogicalProject] - val agg = call.rel(3).asInstanceOf[LogicalWindowAggregate] - - // Retrieve window start and end properties - val builder = call.builder() - builder.push(LogicalWindowAggregate.create( - agg.getWindow, - Seq( - NamedWindowProperty("w$start", WindowStart(agg.getWindow.aliasAttribute)), - NamedWindowProperty("w$end", WindowEnd(agg.getWindow.aliasAttribute))), - agg) - ) - - // forward window start and end properties - builder.project( - innerProject.getProjects ++ Seq(builder.field("w$start"), builder.field("w$end"))) - - // replace window auxiliary function by access to window properties - builder.filter( - filter.getChildExps.map(expr => replaceGroupAuxiliaries(expr, builder)) - ) - - // replace window auxiliary function by access to window properties - builder.project( - project.getProjects.map(expr => replaceGroupAuxiliaries(expr, builder)) - ) - - val res = builder.build() - call.transformTo(res) - } - } -} - http://git-wip-us.apache.org/repos/asf/flink/blob/2ad8f7eb/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetLogicalWindowAggregateRule.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetLogicalWindowAggregateRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetLogicalWindowAggregateRule.scala index 
883f5ae..129e0d3 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetLogicalWindowAggregateRule.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetLogicalWindowAggregateRule.scala @@ -19,15 +19,16 @@ package org.apache.flink.table.plan.rules.dataSet import java.math.BigDecimal + import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.rex._ -import org.apache.calcite.sql.fun.SqlStdOperatorTable import org.apache.flink.table.api.scala.{Session, Slide, Tumble} import org.apache.flink.table.api.{TableException, Window} import org.apache.flink.table.calcite.FlinkTypeFactory -import org.apache.flink.table.expressions.{Expression, Literal, ResolvedFieldReference} +import org.apache.flink.table.expressions.{Expression, Literal, ResolvedFieldReference, WindowReference} import org.apache.flink.table.plan.rules.common.LogicalWindowAggregateRule import org.apache.flink.table.typeutils.TimeIntervalTypeInfo +import org.apache.flink.table.validate.BasicOperatorTable class DataSetLogicalWindowAggregateRule extends LogicalWindowAggregateRule("DataSetLogicalWindowAggregateRule") { @@ -67,22 +68,22 @@ class DataSetLogicalWindowAggregateRule } windowExpr.getOperator match { - case SqlStdOperatorTable.TUMBLE => + case BasicOperatorTable.TUMBLE => val interval = getOperandAsLong(windowExpr, 1) val w = Tumble.over(Literal(interval, TimeIntervalTypeInfo.INTERVAL_MILLIS)) - w.on(getFieldReference(windowExpr.getOperands.get(0))).as("w$") + w.on(getFieldReference(windowExpr.getOperands.get(0))).as(WindowReference("w$")) - case SqlStdOperatorTable.HOP => + case BasicOperatorTable.HOP => val (slide, size) = (getOperandAsLong(windowExpr, 1), getOperandAsLong(windowExpr, 2)) val w = Slide .over(Literal(size, TimeIntervalTypeInfo.INTERVAL_MILLIS)) .every(Literal(slide, TimeIntervalTypeInfo.INTERVAL_MILLIS)) - w.on(getFieldReference(windowExpr.getOperands.get(0))).as("w$") + 
w.on(getFieldReference(windowExpr.getOperands.get(0))).as(WindowReference("w$")) - case SqlStdOperatorTable.SESSION => + case BasicOperatorTable.SESSION => val gap = getOperandAsLong(windowExpr, 1) val w = Session.withGap(Literal(gap, TimeIntervalTypeInfo.INTERVAL_MILLIS)) - w.on(getFieldReference(windowExpr.getOperands.get(0))).as("w$") + w.on(getFieldReference(windowExpr.getOperands.get(0))).as(WindowReference("w$")) } } } http://git-wip-us.apache.org/repos/asf/flink/blob/2ad8f7eb/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamLogicalWindowAggregateRule.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamLogicalWindowAggregateRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamLogicalWindowAggregateRule.scala index 050e2cd..eaad885 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamLogicalWindowAggregateRule.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamLogicalWindowAggregateRule.scala @@ -25,11 +25,12 @@ import org.apache.calcite.rex._ import org.apache.calcite.sql.`type`.SqlTypeName import org.apache.calcite.sql.fun.SqlStdOperatorTable import org.apache.flink.table.api.scala.{Session, Slide, Tumble} -import org.apache.flink.table.api.{TableException, Window} +import org.apache.flink.table.api.{TableException, ValidationException, Window} import org.apache.flink.table.calcite.FlinkTypeFactory import org.apache.flink.table.expressions.{Literal, ResolvedFieldReference, WindowReference} import org.apache.flink.table.plan.rules.common.LogicalWindowAggregateRule import org.apache.flink.table.typeutils.TimeIntervalTypeInfo +import org.apache.flink.table.validate.BasicOperatorTable class 
DataStreamLogicalWindowAggregateRule extends LogicalWindowAggregateRule("DataStreamLogicalWindowAggregateRule") { @@ -47,7 +48,7 @@ class DataStreamLogicalWindowAggregateRule case _ => throw TableException( - s"""Time attribute expected but ${timeAttribute.getType} encountered.""") + s"Time attribute expected but ${timeAttribute.getType} encountered.") } } @@ -69,7 +70,7 @@ class DataStreamLogicalWindowAggregateRule def getOperandAsLong(call: RexCall, idx: Int): Long = call.getOperands.get(idx) match { case v: RexLiteral => v.getValue.asInstanceOf[JBigDecimal].longValue() - case _ => throw new TableException("Only constant window descriptors are supported.") + case _ => throw TableException("Only constant window descriptors are supported.") } def getOperandAsTimeIndicator(call: RexCall, idx: Int): ResolvedFieldReference = @@ -79,18 +80,18 @@ class DataStreamLogicalWindowAggregateRule rowType.getFieldList.get(v.getIndex).getName, FlinkTypeFactory.toTypeInfo(v.getType)) case _ => - throw new TableException("Window can only be defined over a time attribute column.") + throw ValidationException("Window can only be defined over a time attribute column.") } windowExpr.getOperator match { - case SqlStdOperatorTable.TUMBLE => + case BasicOperatorTable.TUMBLE => val time = getOperandAsTimeIndicator(windowExpr, 0) val interval = getOperandAsLong(windowExpr, 1) val w = Tumble.over(Literal(interval, TimeIntervalTypeInfo.INTERVAL_MILLIS)) w.on(time).as(WindowReference("w$")) - case SqlStdOperatorTable.HOP => + case BasicOperatorTable.HOP => val time = getOperandAsTimeIndicator(windowExpr, 0) val (slide, size) = (getOperandAsLong(windowExpr, 1), getOperandAsLong(windowExpr, 2)) val w = Slide @@ -99,7 +100,7 @@ class DataStreamLogicalWindowAggregateRule w.on(time).as(WindowReference("w$")) - case SqlStdOperatorTable.SESSION => + case BasicOperatorTable.SESSION => val time = getOperandAsTimeIndicator(windowExpr, 0) val gap = getOperandAsLong(windowExpr, 1) val w = 
Session.withGap(Literal(gap, TimeIntervalTypeInfo.INTERVAL_MILLIS)) http://git-wip-us.apache.org/repos/asf/flink/blob/2ad8f7eb/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala index 2efd13d..ce13cdc 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala @@ -1145,27 +1145,32 @@ object AggregateUtil { } } + /** + * Computes the positions of (window start, window end, rowtime). + */ private[flink] def computeWindowPropertyPos( properties: Seq[NamedWindowProperty]): (Option[Int], Option[Int], Option[Int]) = { val propPos = properties.foldRight( (None: Option[Int], None: Option[Int], None: Option[Int], 0)) { - case (p, (s, e, t, i)) => p match { + case (p, (s, e, rt, i)) => p match { case NamedWindowProperty(_, prop) => prop match { case WindowStart(_) if s.isDefined => - throw new TableException("Duplicate WindowStart property encountered. This is a bug.") + throw TableException("Duplicate window start property encountered. This is a bug.") case WindowStart(_) => - (Some(i), e, t, i - 1) + (Some(i), e, rt, i - 1) case WindowEnd(_) if e.isDefined => - throw new TableException("Duplicate WindowEnd property encountered. This is a bug.") + throw TableException("Duplicate window end property encountered. This is a bug.") case WindowEnd(_) => - (s, Some(i), t, i - 1) - case RowtimeAttribute(_) if t.isDefined => - throw new TableException( - "Duplicate Window rowtime property encountered. 
This is a bug.") + (s, Some(i), rt, i - 1) + case RowtimeAttribute(_) if rt.isDefined => + throw TableException("Duplicate window rowtime property encountered. This is a bug.") case RowtimeAttribute(_) => (s, e, Some(i), i - 1) + case ProctimeAttribute(_) => + // ignore this property, it will be null at the position later + (s, e, rt, i - 1) } } } http://git-wip-us.apache.org/repos/asf/flink/blob/2ad8f7eb/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala index 3398a93..6c6be0b 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala @@ -18,9 +18,10 @@ package org.apache.flink.table.validate -import org.apache.calcite.sql.fun.SqlStdOperatorTable +import org.apache.calcite.sql.`type`.OperandTypes +import org.apache.calcite.sql.fun.{SqlGroupFunction, SqlStdOperatorTable} import org.apache.calcite.sql.util.{ChainedSqlOperatorTable, ListSqlOperatorTable, ReflectiveSqlOperatorTable} -import org.apache.calcite.sql.{SqlFunction, SqlOperator, SqlOperatorTable} +import org.apache.calcite.sql.{SqlFunction, SqlKind, SqlOperator, SqlOperatorTable} import org.apache.flink.table.api._ import org.apache.flink.table.expressions._ import org.apache.flink.table.functions.sql.{DateTimeSqlFunction, ScalarSqlFunctions} @@ -401,16 +402,84 @@ class BasicOperatorTable extends ReflectiveSqlOperatorTable { ScalarSqlFunctions.LOG, // EXTENSIONS - SqlStdOperatorTable.TUMBLE, - SqlStdOperatorTable.TUMBLE_START, - SqlStdOperatorTable.TUMBLE_END, - SqlStdOperatorTable.HOP, - SqlStdOperatorTable.HOP_START, - 
SqlStdOperatorTable.HOP_END, - SqlStdOperatorTable.SESSION, - SqlStdOperatorTable.SESSION_START, - SqlStdOperatorTable.SESSION_END + BasicOperatorTable.TUMBLE, + BasicOperatorTable.HOP, + BasicOperatorTable.SESSION, + BasicOperatorTable.TUMBLE_START, + BasicOperatorTable.TUMBLE_END, + BasicOperatorTable.HOP_START, + BasicOperatorTable.HOP_END, + BasicOperatorTable.SESSION_START, + BasicOperatorTable.SESSION_END, + BasicOperatorTable.TUMBLE_PROCTIME, + BasicOperatorTable.TUMBLE_ROWTIME, + BasicOperatorTable.HOP_PROCTIME, + BasicOperatorTable.HOP_ROWTIME, + BasicOperatorTable.SESSION_PROCTIME, + BasicOperatorTable.SESSION_ROWTIME ) builtInSqlOperators.foreach(register) } + +object BasicOperatorTable { + + /** + * We need custom group auxiliary functions in order to support nested windows. + */ + + val TUMBLE: SqlGroupFunction = new SqlGroupFunction( + SqlKind.TUMBLE, + null, + OperandTypes.or(OperandTypes.DATETIME_INTERVAL, OperandTypes.DATETIME_INTERVAL_TIME)) { + override def getAuxiliaryFunctions: _root_.java.util.List[SqlGroupFunction] = + Seq( + TUMBLE_START, + TUMBLE_END, + TUMBLE_ROWTIME, + TUMBLE_PROCTIME) + } + val TUMBLE_START: SqlGroupFunction = TUMBLE.auxiliary(SqlKind.TUMBLE_START) + val TUMBLE_END: SqlGroupFunction = TUMBLE.auxiliary(SqlKind.TUMBLE_END) + val TUMBLE_ROWTIME: SqlGroupFunction = + TUMBLE.auxiliary("TUMBLE_ROWTIME", SqlKind.OTHER_FUNCTION) + val TUMBLE_PROCTIME: SqlGroupFunction = + TUMBLE.auxiliary("TUMBLE_PROCTIME", SqlKind.OTHER_FUNCTION) + + val HOP: SqlGroupFunction = new SqlGroupFunction( + SqlKind.HOP, + null, + OperandTypes.or( + OperandTypes.DATETIME_INTERVAL_INTERVAL, + OperandTypes.DATETIME_INTERVAL_INTERVAL_TIME)) { + override def getAuxiliaryFunctions: _root_.java.util.List[SqlGroupFunction] = + Seq( + HOP_START, + HOP_END, + HOP_ROWTIME, + HOP_PROCTIME) + } + val HOP_START: SqlGroupFunction = HOP.auxiliary(SqlKind.HOP_START) + val HOP_END: SqlGroupFunction = HOP.auxiliary(SqlKind.HOP_END) + val HOP_ROWTIME: SqlGroupFunction = 
HOP.auxiliary("HOP_ROWTIME", SqlKind.OTHER_FUNCTION) + val HOP_PROCTIME: SqlGroupFunction = HOP.auxiliary("HOP_PROCTIME", SqlKind.OTHER_FUNCTION) + + val SESSION: SqlGroupFunction = new SqlGroupFunction( + SqlKind.SESSION, + null, + OperandTypes.or(OperandTypes.DATETIME_INTERVAL, OperandTypes.DATETIME_INTERVAL_TIME)) { + override def getAuxiliaryFunctions: _root_.java.util.List[SqlGroupFunction] = + Seq( + SESSION_START, + SESSION_END, + SESSION_ROWTIME, + SESSION_PROCTIME) + } + val SESSION_START: SqlGroupFunction = SESSION.auxiliary(SqlKind.SESSION_START) + val SESSION_END: SqlGroupFunction = SESSION.auxiliary(SqlKind.SESSION_END) + val SESSION_ROWTIME: SqlGroupFunction = + SESSION.auxiliary("SESSION_ROWTIME", SqlKind.OTHER_FUNCTION) + val SESSION_PROCTIME: SqlGroupFunction = + SESSION.auxiliary("SESSION_PROCTIME", SqlKind.OTHER_FUNCTION) + +} http://git-wip-us.apache.org/repos/asf/flink/blob/2ad8f7eb/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/GroupWindowTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/GroupWindowTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/GroupWindowTest.scala index a78aa8c..cb31866 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/GroupWindowTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/GroupWindowTest.scala @@ -21,6 +21,7 @@ package org.apache.flink.table.api.batch.sql import java.sql.Timestamp import org.apache.flink.api.scala._ +import org.apache.flink.table.api.{TableException, ValidationException} import org.apache.flink.table.runtime.utils.JavaUserDefinedAggFunctions.WeightedAvgWithMerge import org.apache.flink.table.api.scala._ import org.apache.flink.table.plan.logical._ @@ -79,7 +80,7 @@ class GroupWindowTest extends TableTestBase { 
term("select", "c, SUM(a) AS sumA, MIN(b) AS minB, " + "start('w$) AS w$start, end('w$) AS w$end") ), - term("select", "CAST(w$start) AS w$start, CAST(w$end) AS w$end, c, sumA, minB") + term("select", "CAST(w$start) AS EXPR$0, CAST(w$end) AS EXPR$1, c, sumA, minB") ) util.verifySql(sqlQuery, expected) @@ -165,7 +166,7 @@ class GroupWindowTest extends TableTestBase { term("select", "c, d, SUM(a) AS sumA, AVG(b) AS avgB, " + "start('w$) AS w$start, end('w$) AS w$end") ), - term("select", "c, CAST(w$end) AS w$end, CAST(w$start) AS w$start, sumA, avgB") + term("select", "c, CAST(w$end) AS EXPR$1, CAST(w$start) AS EXPR$2, sumA, avgB") ) util.verifySql(sqlQuery, expected) @@ -220,7 +221,7 @@ class GroupWindowTest extends TableTestBase { term("select", "c, d, SUM(a) AS sumA, MIN(b) AS minB, " + "start('w$) AS w$start, end('w$) AS w$end") ), - term("select", "c, d, CAST(w$start) AS w$start, CAST(w$end) AS w$end, sumA, minB") + term("select", "c, d, CAST(w$start) AS EXPR$2, CAST(w$end) AS EXPR$3, sumA, minB") ) util.verifySql(sqlQuery, expected) @@ -251,7 +252,7 @@ class GroupWindowTest extends TableTestBase { term("window", TumblingGroupWindow('w$, 'ts, 240000.millis)), term("select", "c, start('w$) AS w$start, end('w$) AS w$end") ), - term("select", "CAST(w$end) AS w$end") + term("select", "CAST(w$end) AS EXPR$0") ) util.verifySql(sqlQuery, expected) @@ -289,7 +290,7 @@ class GroupWindowTest extends TableTestBase { "start('w$) AS w$start", "end('w$) AS w$end") ), - term("select", "EXPR$0", "CAST(w$start) AS w$start"), + term("select", "EXPR$0", "CAST(w$start) AS EXPR$1"), term("where", "AND(>($f1, 0), " + "=(EXTRACT_DATE(FLAG(QUARTER), /INT(Reinterpret(CAST(w$start)), 86400000)), 1))") http://git-wip-us.apache.org/repos/asf/flink/blob/2ad8f7eb/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/GroupWindowValidationTest.scala ---------------------------------------------------------------------- diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/GroupWindowValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/GroupWindowValidationTest.scala index 4272170..cbf3029 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/GroupWindowValidationTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/GroupWindowValidationTest.scala @@ -79,4 +79,34 @@ class GroupWindowValidationTest extends TableTestBase { "GROUP BY TUMBLE(ts, INTERVAL '4' MINUTE)" util.verifySql(sql, "n/a") } + + @Test(expected = classOf[TableException]) + def testWindowRowtime(): Unit = { + val util = batchTestUtil() + util.addTable[(Int, Long, String, Timestamp)]("T", 'a, 'b, 'c, 'ts) + + val sqlQuery = + "SELECT " + + " TUMBLE_ROWTIME(ts, INTERVAL '4' MINUTE)" + + "FROM T " + + "GROUP BY TUMBLE(ts, INTERVAL '4' MINUTE), c" + + // should fail because ROWTIME properties are not yet supported in batch + util.verifySql(sqlQuery, "FAIL") + } + + @Test(expected = classOf[ValidationException]) + def testWindowProctime(): Unit = { + val util = batchTestUtil() + util.addTable[(Int, Long, String, Timestamp)]("T", 'a, 'b, 'c, 'ts) + + val sqlQuery = + "SELECT " + + " TUMBLE_PROCTIME(ts, INTERVAL '4' MINUTE)" + + "FROM T " + + "GROUP BY TUMBLE(ts, INTERVAL '4' MINUTE), c" + + // should fail because PROCTIME properties are not yet supported in batch + util.verifySql(sqlQuery, "FAIL") + } } http://git-wip-us.apache.org/repos/asf/flink/blob/2ad8f7eb/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/GroupWindowTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/GroupWindowTest.scala 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/GroupWindowTest.scala index 722c4f0..e49a63f 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/GroupWindowTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/GroupWindowTest.scala @@ -44,18 +44,24 @@ class GroupWindowTest extends TableTestBase { "GROUP BY TUMBLE(rowtime, INTERVAL '15' MINUTE)" val expected = unaryNode( - "DataStreamGroupWindowAggregate", + "DataStreamCalc", unaryNode( - "DataStreamCalc", - streamTableNode(0), - term("select", "rowtime", "c", "a") + "DataStreamGroupWindowAggregate", + unaryNode( + "DataStreamCalc", + streamTableNode(0), + term("select", "rowtime", "c", "a") + ), + term("window", TumblingGroupWindow('w$, 'rowtime, 900000.millis)), + term("select", + "COUNT(*) AS EXPR$0", + "weightedAvg(c, a) AS wAvg", + "start('w$) AS w$start", + "end('w$) AS w$end", + "rowtime('w$) AS w$rowtime", + "proctime('w$) AS w$proctime") ), - term("window", TumblingGroupWindow('w$, 'rowtime, 900000.millis)), - term("select", - "COUNT(*) AS EXPR$0, " + - "weightedAvg(c, a) AS wAvg, " + - "start('w$) AS w$start, " + - "end('w$) AS w$end") + term("select", "EXPR$0", "wAvg", "w$start AS EXPR$2", "w$end AS EXPR$3") ) streamUtil.verifySql(sql, expected) } @@ -72,19 +78,25 @@ class GroupWindowTest extends TableTestBase { "GROUP BY HOP(proctime, INTERVAL '15' MINUTE, INTERVAL '1' HOUR)" val expected = unaryNode( - "DataStreamGroupWindowAggregate", + "DataStreamCalc", unaryNode( - "DataStreamCalc", - streamTableNode(0), - term("select", "proctime", "c", "a") + "DataStreamGroupWindowAggregate", + unaryNode( + "DataStreamCalc", + streamTableNode(0), + term("select", "proctime", "c", "a") + ), + term("window", SlidingGroupWindow('w$, 'proctime, 3600000.millis, 900000.millis)), + term("select", + "COUNT(*) AS EXPR$0", + "weightedAvg(c, a) AS wAvg", + "start('w$) AS w$start", + "end('w$) AS w$end", + 
"proctime('w$) AS w$proctime") ), - term("window", SlidingGroupWindow('w$, 'proctime, 3600000.millis, 900000.millis)), - term("select", - "COUNT(*) AS EXPR$0, " + - "weightedAvg(c, a) AS wAvg, " + - "start('w$) AS w$start, " + - "end('w$) AS w$end") + term("select", "EXPR$0", "wAvg", "w$start AS EXPR$2", "w$end AS EXPR$3") ) + streamUtil.verifySql(sql, expected) } @@ -101,19 +113,25 @@ class GroupWindowTest extends TableTestBase { "GROUP BY SESSION(proctime, INTERVAL '15' MINUTE)" val expected = unaryNode( - "DataStreamGroupWindowAggregate", + "DataStreamCalc", unaryNode( - "DataStreamCalc", - streamTableNode(0), - term("select", "proctime", "c", "a") + "DataStreamGroupWindowAggregate", + unaryNode( + "DataStreamCalc", + streamTableNode(0), + term("select", "proctime", "c", "a") + ), + term("window", SessionGroupWindow('w$, 'proctime, 900000.millis)), + term("select", + "COUNT(*) AS EXPR$0", + "weightedAvg(c, a) AS wAvg", + "start('w$) AS w$start", + "end('w$) AS w$end", + "proctime('w$) AS w$proctime") ), - term("window", SessionGroupWindow('w$, 'proctime, 900000.millis)), - term("select", - "COUNT(*) AS EXPR$0, " + - "weightedAvg(c, a) AS wAvg, " + - "start('w$) AS w$start, " + - "end('w$) AS w$end") + term("select", "EXPR$0", "wAvg", "w$start AS EXPR$2", "w$end AS EXPR$3") ) + streamUtil.verifySql(sql, expected) } @@ -136,9 +154,14 @@ class GroupWindowTest extends TableTestBase { term("select", "rowtime") ), term("window", TumblingGroupWindow('w$, 'rowtime, 900000.millis)), - term("select", "COUNT(*) AS EXPR$0", "start('w$) AS w$start", "end('w$) AS w$end") + term("select", + "COUNT(*) AS EXPR$0", + "start('w$) AS w$start", + "end('w$) AS w$end", + "rowtime('w$) AS w$rowtime", + "proctime('w$) AS w$proctime") ), - term("select", "EXPR$0", "DATETIME_PLUS(w$end, 60000) AS $f1") + term("select", "EXPR$0", "DATETIME_PLUS(w$end, 60000) AS EXPR$1") ) streamUtil.verifySql(sql, expected) @@ -171,9 +194,11 @@ class GroupWindowTest extends TableTestBase { "COUNT(*) AS 
EXPR$0", "SUM(a) AS $f1", "start('w$) AS w$start", - "end('w$) AS w$end") + "end('w$) AS w$end", + "rowtime('w$) AS w$rowtime", + "proctime('w$) AS w$proctime") ), - term("select", "EXPR$0", "w$start"), + term("select", "EXPR$0", "w$start AS EXPR$1"), term("where", "AND(>($f1, 0), " + "=(EXTRACT_DATE(FLAG(QUARTER), /INT(Reinterpret(w$start), 86400000)), 1))") @@ -181,4 +206,58 @@ class GroupWindowTest extends TableTestBase { streamUtil.verifySql(sql, expected) } + + @Test + def testMultiWindowSqlWithAggregation() = { + val sql = + s"""SELECT + TUMBLE_ROWTIME(zzzzz, INTERVAL '0.004' SECOND), + TUMBLE_END(zzzzz, INTERVAL '0.004' SECOND), + COUNT(`a`) AS `a` + FROM ( + SELECT + COUNT(`a`) AS `a`, + TUMBLE_ROWTIME(rowtime, INTERVAL '0.002' SECOND) AS `zzzzz` + FROM MyTable + GROUP BY TUMBLE(rowtime, INTERVAL '0.002' SECOND) + ) + GROUP BY TUMBLE(zzzzz, INTERVAL '0.004' SECOND)""" + + val expected = + unaryNode( + "DataStreamCalc", + unaryNode( + "DataStreamGroupWindowAggregate", + unaryNode( + "DataStreamCalc", + unaryNode( + "DataStreamGroupWindowAggregate", + unaryNode( + "DataStreamCalc", + streamTableNode(0), + term("select", "rowtime, a") + ), + term("window", TumblingGroupWindow('w$, 'rowtime, 2.millis)), + term("select", + "COUNT(a) AS a", + "start('w$) AS w$start", + "end('w$) AS w$end", + "rowtime('w$) AS w$rowtime", + "proctime('w$) AS w$proctime") + ), + term("select", "a", "w$rowtime AS zzzzz") + ), + term("window", TumblingGroupWindow('w$, 'zzzzz, 4.millis)), + term("select", + "COUNT(*) AS a", + "start('w$) AS w$start", + "end('w$) AS w$end", + "rowtime('w$) AS w$rowtime", + "proctime('w$) AS w$proctime") + ), + term("select", "w$rowtime AS EXPR$0", "w$end AS EXPR$1", "a") + ) + + streamUtil.verifySql(sql, expected) + } } http://git-wip-us.apache.org/repos/asf/flink/blob/2ad8f7eb/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala ---------------------------------------------------------------------- 
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala index 1714ec8..009ae40 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala @@ -343,9 +343,15 @@ class TimeIndicatorConversionTest extends TableTestBase { WindowReference("w$"), 'rowtime, 100.millis)), - term("select", "long", "SUM(int) AS EXPR$2", "start('w$) AS w$start", "end('w$) AS w$end") + term("select", + "long", + "SUM(int) AS EXPR$2", + "start('w$) AS w$start", + "end('w$) AS w$end", + "rowtime('w$) AS w$rowtime", + "proctime('w$) AS w$proctime") ), - term("select", "w$end", "long", "EXPR$2") + term("select", "w$end AS rowtime", "long", "EXPR$2") ) util.verifyTable(result, expected) http://git-wip-us.apache.org/repos/asf/flink/blob/2ad8f7eb/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala index 47a7341..e672335 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala @@ -283,6 +283,123 @@ class TimeAttributesITCase extends StreamingMultipleProgramsTestBase { } @Test + def testMultiWindowSqlNoAggregation(): Unit = { + val env = StreamExecutionEnvironment.getExecutionEnvironment + 
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) + val tEnv = TableEnvironment.getTableEnvironment(env) + StreamITCase.testResults = mutable.MutableList() + + val stream = env + .fromCollection(data) + .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark()) + val table = stream.toTable(tEnv, 'rowtime.rowtime, 'int, 'double, 'float, 'bigdec, 'string) + + val window1 = tEnv.sqlQuery( + s"""SELECT + TUMBLE_ROWTIME(rowtime, INTERVAL '0.002' SECOND) AS rowtime, + TUMBLE_END(rowtime, INTERVAL '0.002' SECOND) AS endtime + FROM $table + GROUP BY TUMBLE(rowtime, INTERVAL '0.002' SECOND)""") + + val window2 = tEnv.sqlQuery( + s"""SELECT + TUMBLE_ROWTIME(rowtime, INTERVAL '0.004' SECOND), + TUMBLE_END(rowtime, INTERVAL '0.004' SECOND) + FROM $window1 + GROUP BY TUMBLE(rowtime, INTERVAL '0.004' SECOND)""") + + val results = window2.toAppendStream[Row] + results.addSink(new StreamITCase.StringSink[Row]) + env.execute() + + val expected = Seq( + "1970-01-01 00:00:00.003,1970-01-01 00:00:00.004", + "1970-01-01 00:00:00.007,1970-01-01 00:00:00.008", + "1970-01-01 00:00:00.011,1970-01-01 00:00:00.012", + "1970-01-01 00:00:00.019,1970-01-01 00:00:00.02" + ) + assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } + + @Test + def testMultiWindowSqlWithAggregation(): Unit = { + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) + val tEnv = TableEnvironment.getTableEnvironment(env) + StreamITCase.testResults = mutable.MutableList() + + val stream = env + .fromCollection(data) + .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark()) + val table = stream.toTable(tEnv, 'rowtime.rowtime, 'int, 'double, 'float, 'bigdec, 'string) + + val window = tEnv.sqlQuery( + s"""SELECT + TUMBLE_ROWTIME(rowtime, INTERVAL '0.004' SECOND), + TUMBLE_END(rowtime, INTERVAL '0.004' SECOND), + COUNT(`int`) AS `int` + FROM ( + SELECT + COUNT(`int`) AS `int`, + TUMBLE_ROWTIME(rowtime, INTERVAL 
'0.002' SECOND) AS `rowtime` + FROM $table + GROUP BY TUMBLE(rowtime, INTERVAL '0.002' SECOND) + ) + GROUP BY TUMBLE(rowtime, INTERVAL '0.004' SECOND)""") + + val results = window.toAppendStream[Row] + results.addSink(new StreamITCase.StringSink[Row]) + env.execute() + + val expected = Seq( + "1970-01-01 00:00:00.003,1970-01-01 00:00:00.004,2", + "1970-01-01 00:00:00.007,1970-01-01 00:00:00.008,2", + "1970-01-01 00:00:00.011,1970-01-01 00:00:00.012,1", + "1970-01-01 00:00:00.019,1970-01-01 00:00:00.02,1" + ) + assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } + + @Test + def testMultiWindowSqlWithAggregation2(): Unit = { + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) + val tEnv = TableEnvironment.getTableEnvironment(env) + StreamITCase.testResults = mutable.MutableList() + + val stream = env + .fromCollection(data) + .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark()) + val table = stream.toTable(tEnv, 'rowtime1.rowtime, 'int, 'double, 'float, 'bigdec, 'string) + + val window = tEnv.sqlQuery( + s"""SELECT + TUMBLE_ROWTIME(rowtime2, INTERVAL '0.004' SECOND), + TUMBLE_END(rowtime2, INTERVAL '0.004' SECOND), + COUNT(`int`) as `int` + FROM ( + SELECT + TUMBLE_ROWTIME(rowtime1, INTERVAL '0.002' SECOND) AS rowtime2, + COUNT(`int`) as `int` + FROM $table + GROUP BY TUMBLE(rowtime1, INTERVAL '0.002' SECOND) + ) + GROUP BY TUMBLE(rowtime2, INTERVAL '0.004' SECOND)""") + + val results = window.toAppendStream[Row] + results.addSink(new StreamITCase.StringSink[Row]) + env.execute() + + val expected = Seq( + "1970-01-01 00:00:00.003,1970-01-01 00:00:00.004,2", + "1970-01-01 00:00:00.007,1970-01-01 00:00:00.008,2", + "1970-01-01 00:00:00.011,1970-01-01 00:00:00.012,1", + "1970-01-01 00:00:00.019,1970-01-01 00:00:00.02,1" + ) + assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } + + @Test def testCalcMaterializationWithPojoType(): Unit = { val env = 
StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) @@ -414,6 +531,35 @@ class TimeAttributesITCase extends StreamingMultipleProgramsTestBase { assertEquals(expected.sorted, StreamITCase.testResults.sorted) } + + @Test + def testSqlWindowRowtime(): Unit = { + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) + val tEnv = TableEnvironment.getTableEnvironment(env) + StreamITCase.testResults = mutable.MutableList() + + val stream = env + .fromCollection(data) + .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark()) + val table = stream.toTable(tEnv, 'rowtime.rowtime, 'int, 'double, 'float, 'bigdec, 'string) + tEnv.registerTable("MyTable", table) + + val t = tEnv.sqlQuery("SELECT TUMBLE_ROWTIME(rowtime, INTERVAL '0.003' SECOND) FROM MyTable " + + "GROUP BY TUMBLE(rowtime, INTERVAL '0.003' SECOND)") + + val results = t.toAppendStream[Row] + results.addSink(new StreamITCase.StringSink[Row]) + env.execute() + + val expected = Seq( + "1970-01-01 00:00:00.002", + "1970-01-01 00:00:00.005", + "1970-01-01 00:00:00.008", + "1970-01-01 00:00:00.017" + ) + assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } } object TimeAttributesITCase {
