Repository: flink
Updated Branches:
  refs/heads/master 7d5100742 -> babee2772


[FLINK-6584] [table] Add SQL group window functions to retrieve time attributes.

This closes #4199.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2ad8f7eb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2ad8f7eb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2ad8f7eb

Branch: refs/heads/master
Commit: 2ad8f7eb5690755e2a9e7e93181cd34e1093f23a
Parents: 7d51007
Author: twalthr <[email protected]>
Authored: Wed Oct 18 15:28:41 2017 +0200
Committer: Fabian Hueske <[email protected]>
Committed: Wed Oct 25 18:53:29 2017 +0200

----------------------------------------------------------------------
 .../calcite/sql/fun/SqlGroupFunction.java       | 128 +++++++++
 .../calcite/RelTimeIndicatorConverter.scala     |  15 +-
 .../logical/rel/LogicalWindowAggregate.scala    |  17 +-
 .../DataStreamGroupWindowAggregate.scala        |   3 +
 .../flink/table/plan/rules/FlinkRuleSets.scala  |   8 +-
 .../common/LogicalWindowAggregateRule.scala     |  32 ++-
 .../rules/common/WindowPropertiesRule.scala     | 265 +++++++++++++++++++
 .../common/WindowStartEndPropertiesRule.scala   | 184 -------------
 .../DataSetLogicalWindowAggregateRule.scala     |  17 +-
 .../DataStreamLogicalWindowAggregateRule.scala  |  15 +-
 .../table/runtime/aggregate/AggregateUtil.scala |  21 +-
 .../flink/table/validate/FunctionCatalog.scala  |  91 ++++++-
 .../table/api/batch/sql/GroupWindowTest.scala   |  11 +-
 .../validation/GroupWindowValidationTest.scala  |  30 +++
 .../table/api/stream/sql/GroupWindowTest.scala  | 147 +++++++---
 .../plan/TimeIndicatorConversionTest.scala      |  10 +-
 .../runtime/stream/TimeAttributesITCase.scala   | 146 ++++++++++
 17 files changed, 862 insertions(+), 278 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2ad8f7eb/flink-libraries/flink-table/src/main/java/org/apache/calcite/sql/fun/SqlGroupFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/java/org/apache/calcite/sql/fun/SqlGroupFunction.java
 
b/flink-libraries/flink-table/src/main/java/org/apache/calcite/sql/fun/SqlGroupFunction.java
new file mode 100644
index 0000000..fd5ddf9
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/java/org/apache/calcite/sql/fun/SqlGroupFunction.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.sql.fun;
+
+import org.apache.calcite.sql.SqlFunction;
+import org.apache.calcite.sql.SqlFunctionCategory;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlOperatorBinding;
+import org.apache.calcite.sql.type.ReturnTypes;
+import org.apache.calcite.sql.type.SqlOperandTypeChecker;
+import org.apache.calcite.sql.validate.SqlMonotonicity;
+
+import com.google.common.collect.ImmutableList;
+
+import java.util.List;
+
+/*
+ * THIS FILE HAS BEEN COPIED FROM THE APACHE CALCITE PROJECT UNTIL 
CALCITE-1867 IS FIXED.
+ */
+
+/**
+ * SQL function that computes keys by which rows can be partitioned and
+ * aggregated.
+ *
+ * <p>Grouped window functions always occur in the GROUP BY clause. They often
+ * have auxiliary functions that access information about the group. For
+ * example, {@code HOP} is a group function, and its auxiliary functions are
+ * {@code HOP_START} and {@code HOP_END}. Here they are used in a streaming
+ * query:
+ *
+ * <blockquote><pre>
+ * SELECT STREAM HOP_START(rowtime, INTERVAL '1' HOUR),
+ *   HOP_END(rowtime, INTERVAL '1' HOUR),
+ *   MIN(unitPrice)
+ * FROM Orders
+ * GROUP BY HOP(rowtime, INTERVAL '1' HOUR), productId
+ * </pre></blockquote>
+ */
+public class SqlGroupFunction extends SqlFunction {
+  /** The grouped function, if this is an auxiliary function; null otherwise. */
+  final SqlGroupFunction groupFunction;
+
+  /** Creates a SqlGroupFunction.
+   *
+   * @param name Function name
+   * @param kind Kind
+   * @param groupFunction Group function, if this is an auxiliary;
+   *                      null, if this is a group function
+   * @param operandTypeChecker Operand type checker
+   */
+  public SqlGroupFunction(String name, SqlKind kind, SqlGroupFunction 
groupFunction,
+      SqlOperandTypeChecker operandTypeChecker) {
+    super(name, kind, ReturnTypes.ARG0, null,
+        operandTypeChecker, SqlFunctionCategory.SYSTEM);
+    this.groupFunction = groupFunction;
+    if (groupFunction != null) {
+      assert groupFunction.groupFunction == null;
+    }
+  }
+
+  /** Creates a SqlGroupFunction.
+   *
+   * @param kind Kind; also determines function name
+   * @param groupFunction Group function, if this is an auxiliary;
+   *                      null, if this is a group function
+   * @param operandTypeChecker Operand type checker
+   */
+  public SqlGroupFunction(SqlKind kind, SqlGroupFunction groupFunction,
+      SqlOperandTypeChecker operandTypeChecker) {
+    this(kind.name(), kind, groupFunction, operandTypeChecker);
+  }
+
+  /** Creates an auxiliary function from this grouped window function.
+   *
+   * @param kind Kind; also determines function name
+   */
+  public SqlGroupFunction auxiliary(SqlKind kind) {
+    return auxiliary(kind.name(), kind);
+  }
+
+  /** Creates an auxiliary function from this grouped window function.
+   *
+   * @param name Function name
+   * @param kind Kind
+   */
+  public SqlGroupFunction auxiliary(String name, SqlKind kind) {
+    return new SqlGroupFunction(name, kind, this, getOperandTypeChecker());
+  }
+
+  /** Returns a list of this grouped window function's auxiliary functions. */
+  public List<SqlGroupFunction> getAuxiliaryFunctions() {
+    return ImmutableList.of();
+  }
+
+  @Override public boolean isGroup() {
+    // Auxiliary functions are not group functions
+    return groupFunction == null;
+  }
+
+  @Override public boolean isGroupAuxiliary() {
+    return groupFunction != null;
+  }
+
+  @Override public SqlMonotonicity getMonotonicity(SqlOperatorBinding call) {
+    // Monotonic iff its first argument is, but not strict.
+    //
+    // Note: This strategy happens to work for all current group functions
+    // (HOP, TUMBLE, SESSION). When there are exceptions to this rule, we'll
+    // make the method abstract.
+    return call.getOperandMonotonicity(0).unstrict();
+  }
+}
+
+// End SqlGroupFunction.java

http://git-wip-us.apache.org/repos/asf/flink/blob/2ad8f7eb/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
index 1f88737..4f3fbaa 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
@@ -30,6 +30,7 @@ import 
org.apache.flink.table.calcite.FlinkTypeFactory.{isRowtimeIndicatorType,
 import org.apache.flink.table.functions.sql.ProctimeSqlFunction
 import org.apache.flink.table.plan.logical.rel.LogicalWindowAggregate
 import org.apache.flink.table.plan.schema.TimeIndicatorRelDataType
+import org.apache.flink.table.validate.BasicOperatorTable
 
 import scala.collection.JavaConversions._
 import scala.collection.mutable
@@ -400,7 +401,7 @@ class RexTimeIndicatorMaterializer(
     val materializedOperands = updatedCall.getOperator match {
 
       // skip materialization for special operators
-      case SqlStdOperatorTable.SESSION | SqlStdOperatorTable.HOP | 
SqlStdOperatorTable.TUMBLE =>
+      case BasicOperatorTable.SESSION | BasicOperatorTable.HOP | 
BasicOperatorTable.TUMBLE =>
         updatedCall.getOperands.toList
 
       case _ =>
@@ -427,6 +428,18 @@ class RexTimeIndicatorMaterializer(
       isTimeIndicatorType(updatedCall.getOperands.get(0).getType) =>
         updatedCall
 
+      // do not modify window time attributes
+      case BasicOperatorTable.TUMBLE_ROWTIME |
+          BasicOperatorTable.TUMBLE_PROCTIME |
+          BasicOperatorTable.HOP_ROWTIME |
+          BasicOperatorTable.HOP_PROCTIME |
+          BasicOperatorTable.SESSION_ROWTIME |
+          BasicOperatorTable.SESSION_PROCTIME
+          // since we materialize groupings on time indicators,
+          // we cannot check the operands anymore but the return type at least
+          if isTimeIndicatorType(updatedCall.getType) =>
+      updatedCall
+
       // materialize function's result and operands
       case _ if isTimeIndicatorType(updatedCall.getType) =>
         updatedCall.clone(timestamp, materializedOperands)

http://git-wip-us.apache.org/repos/asf/flink/blob/2ad8f7eb/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/rel/LogicalWindowAggregate.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/rel/LogicalWindowAggregate.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/rel/LogicalWindowAggregate.scala
index 81f6bf0..6424828 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/rel/LogicalWindowAggregate.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/rel/LogicalWindowAggregate.scala
@@ -35,11 +35,11 @@ class LogicalWindowAggregate(
     cluster: RelOptCluster,
     traitSet: RelTraitSet,
     child: RelNode,
-    indicator: Boolean,
+    indicatorFlag: Boolean,
     groupSet: ImmutableBitSet,
     groupSets: util.List[ImmutableBitSet],
     aggCalls: util.List[AggregateCall])
-  extends Aggregate(cluster, traitSet, child, indicator, groupSet, groupSets, 
aggCalls) {
+  extends Aggregate(cluster, traitSet, child, indicatorFlag, groupSet, 
groupSets, aggCalls) {
 
   def getWindow: LogicalWindow = window
 
@@ -66,6 +66,19 @@ class LogicalWindowAggregate(
       aggCalls)
   }
 
+  def copy(namedProperties: Seq[NamedWindowProperty]): LogicalWindowAggregate 
= {
+    new LogicalWindowAggregate(
+      window,
+      namedProperties,
+      cluster,
+      traitSet,
+      input,
+      indicator,
+      getGroupSet,
+      getGroupSets,
+      aggCalls)
+  }
+
   override def accept(shuttle: RelShuttle): RelNode = shuttle.visit(this)
 
   override def deriveRowType(): RelDataType = {

http://git-wip-us.apache.org/repos/asf/flink/blob/2ad8f7eb/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
index 267bc3b..d527dc8 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
@@ -146,6 +146,9 @@ class DataStreamGroupWindowAggregate(
       // copy the window rowtime attribute into the StreamRecord timestamp 
field
       val timeAttribute = 
window.timeAttribute.asInstanceOf[ResolvedFieldReference].name
       val timeIdx = inputSchema.fieldNames.indexOf(timeAttribute)
+      if (timeIdx < 0) {
+        throw TableException("Time attribute could not be found. This is a 
bug.")
+      }
 
       inputDS
         .process(

http://git-wip-us.apache.org/repos/asf/flink/blob/2ad8f7eb/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
index 073a8cc..36fdc6c 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
@@ -147,8 +147,8 @@ object FlinkRuleSets {
 
     // Transform window to LogicalWindowAggregate
     DataSetLogicalWindowAggregateRule.INSTANCE,
-    WindowStartEndPropertiesRule.INSTANCE,
-    WindowStartEndPropertiesHavingRule.INSTANCE
+    WindowPropertiesRule.INSTANCE,
+    WindowPropertiesHavingRule.INSTANCE
   )
 
   /**
@@ -179,8 +179,8 @@ object FlinkRuleSets {
   val DATASTREAM_NORM_RULES: RuleSet = RuleSets.ofList(
     // Transform window to LogicalWindowAggregate
     DataStreamLogicalWindowAggregateRule.INSTANCE,
-    WindowStartEndPropertiesRule.INSTANCE,
-    WindowStartEndPropertiesHavingRule.INSTANCE,
+    WindowPropertiesRule.INSTANCE,
+    WindowPropertiesHavingRule.INSTANCE,
 
     // simplify expressions rules
     ReduceExpressionsRule.FILTER_INSTANCE,

http://git-wip-us.apache.org/repos/asf/flink/blob/2ad8f7eb/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/LogicalWindowAggregateRule.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/LogicalWindowAggregateRule.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/LogicalWindowAggregateRule.scala
index 34433f9..927700b 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/LogicalWindowAggregateRule.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/LogicalWindowAggregateRule.scala
@@ -23,11 +23,11 @@ import org.apache.calcite.plan.hep.HepRelVertex
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.logical.{LogicalAggregate, LogicalProject}
 import org.apache.calcite.rex._
-import org.apache.calcite.sql.fun.SqlStdOperatorTable
 import org.apache.calcite.util.ImmutableBitSet
 import org.apache.flink.table.api._
 import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
 import org.apache.flink.table.plan.logical.rel.LogicalWindowAggregate
+import org.apache.flink.table.validate.BasicOperatorTable
 
 import _root_.scala.collection.JavaConversions._
 
@@ -65,20 +65,28 @@ abstract class LogicalWindowAggregateRule(ruleName: String)
     val (windowExpr, windowExprIdx) = getWindowExpressions(agg).head
     val window = translateWindowExpression(windowExpr, 
project.getInput.getRowType)
 
-    val builder = call.builder()
-    val rexBuilder = builder.getRexBuilder
+    val rexBuilder = call.builder().getRexBuilder
 
     val inAggGroupExpression = getInAggregateGroupExpression(rexBuilder, 
windowExpr)
+
     val newGroupSet = agg.getGroupSet.except(ImmutableBitSet.of(windowExprIdx))
-    val newAgg = builder
+
+    val builder = call.builder()
+
+    val newProject = builder
       .push(project.getInput)
       .project(project.getChildExps.updated(windowExprIdx, 
inAggGroupExpression))
-      .aggregate(builder.groupKey(
-        newGroupSet,
-        agg.indicator, ImmutableList.of(newGroupSet)), agg.getAggCallList)
-      .build().asInstanceOf[LogicalAggregate]
+      .build()
+
+    // we don't use the builder here because it uses RelMetadataQuery which 
affects the plan
+    val newAgg = LogicalAggregate.create(
+      newProject,
+      agg.indicator,
+      newGroupSet,
+      ImmutableList.of(newGroupSet),
+      agg.getAggCallList)
 
-    // Create an additional project to conform with types
+    // create an additional project to conform with types
     val outAggGroupExpression = getOutAggregateGroupExpression(rexBuilder, 
windowExpr)
     val transformed = call.builder()
     transformed.push(LogicalWindowAggregate.create(
@@ -103,19 +111,19 @@ abstract class LogicalWindowAggregateRule(ruleName: 
String)
       g._1 match {
         case call: RexCall =>
           call.getOperator match {
-            case SqlStdOperatorTable.TUMBLE =>
+            case BasicOperatorTable.TUMBLE =>
               if (call.getOperands.size() == 2) {
                 true
               } else {
                 throw TableException("TUMBLE window with alignment is not 
supported yet.")
               }
-            case SqlStdOperatorTable.HOP =>
+            case BasicOperatorTable.HOP =>
               if (call.getOperands.size() == 3) {
                 true
               } else {
                 throw TableException("HOP window with alignment is not 
supported yet.")
               }
-            case SqlStdOperatorTable.SESSION =>
+            case BasicOperatorTable.SESSION =>
               if (call.getOperands.size() == 2) {
                 true
               } else {

http://git-wip-us.apache.org/repos/asf/flink/blob/2ad8f7eb/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowPropertiesRule.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowPropertiesRule.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowPropertiesRule.scala
new file mode 100644
index 0000000..c228528
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowPropertiesRule.scala
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.common
+
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelOptRuleOperand}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.logical.{LogicalFilter, LogicalProject}
+import org.apache.calcite.rex.{RexCall, RexNode}
+import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.table.api.{TableException, ValidationException}
+import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
+import org.apache.flink.table.expressions._
+import org.apache.flink.table.plan.logical.LogicalWindow
+import org.apache.flink.table.plan.logical.rel.LogicalWindowAggregate
+import org.apache.flink.table.validate.BasicOperatorTable
+
+import scala.collection.JavaConversions._
+
+abstract class WindowPropertiesBaseRule(rulePredicate: RelOptRuleOperand, 
ruleName: String)
+  extends RelOptRule(rulePredicate, ruleName) {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+    val project = call.rel(0).asInstanceOf[LogicalProject]
+    // project includes at least one group auxiliary function
+
+    def hasGroupAuxiliaries(node: RexNode): Boolean = {
+      node match {
+        case c: RexCall if c.getOperator.isGroupAuxiliary => true
+        case c: RexCall =>
+          c.operands.exists(hasGroupAuxiliaries)
+        case _ => false
+      }
+    }
+
+    project.getProjects.exists(hasGroupAuxiliaries)
+  }
+
+  def convertWindowNodes(
+      builder: RelBuilder,
+      project: LogicalProject,
+      filter: Option[LogicalFilter],
+      innerProject: LogicalProject,
+      agg: LogicalWindowAggregate): RelNode = {
+
+    val w = agg.getWindow
+
+    val isRowtime = ExpressionUtils.isRowtimeAttribute(w.timeAttribute)
+    val isProctime = ExpressionUtils.isProctimeAttribute(w.timeAttribute)
+
+    val startEndProperties = Seq(
+      NamedWindowProperty(propertyName(w, "start"), 
WindowStart(w.aliasAttribute)),
+      NamedWindowProperty(propertyName(w, "end"), WindowEnd(w.aliasAttribute)))
+
+    // allow rowtime/proctime for rowtime windows and proctime for proctime 
windows
+    val timeProperties = if (isRowtime) {
+      Seq(
+        NamedWindowProperty(propertyName(w, "rowtime"), 
RowtimeAttribute(w.aliasAttribute)),
+        NamedWindowProperty(propertyName(w, "proctime"), 
ProctimeAttribute(w.aliasAttribute)))
+    } else if (isProctime) {
+      Seq(NamedWindowProperty(propertyName(w, "proctime"), 
ProctimeAttribute(w.aliasAttribute)))
+    } else {
+      Seq()
+    }
+
+    val properties = startEndProperties ++ timeProperties
+
+    // retrieve window properties
+    builder.push(agg.copy(properties))
+
+    // forward window properties
+    builder.project(
+      innerProject.getProjects ++ properties.map(np => builder.field(np.name)))
+
+    // replace window auxiliary functions in filter by access to window 
properties
+    filter.foreach { f =>
+      builder.filter(
+        f.getChildExps.map(expr => replaceGroupAuxiliaries(expr, w, builder))
+      )
+    }
+
+    // replace window auxiliary functions in projection by access to window 
properties
+    builder.project(
+      project.getProjects.map(expr => replaceGroupAuxiliaries(expr, w, 
builder)),
+      project.getRowType.getFieldNames
+    )
+
+    builder.build()
+  }
+
+  /** Generates a property name for a window. */
+  private def propertyName(window: LogicalWindow, name: String): String = {
+    window.aliasAttribute.asInstanceOf[WindowReference].name + name
+  }
+
+  /** Replace group auxiliaries with field references. */
+  private def replaceGroupAuxiliaries(
+      node: RexNode,
+      window: LogicalWindow,
+      builder: RelBuilder): RexNode = {
+
+    val rexBuilder = builder.getRexBuilder
+
+    val isRowtime = ExpressionUtils.isRowtimeAttribute(window.timeAttribute)
+    val isProctime = ExpressionUtils.isProctimeAttribute(window.timeAttribute)
+
+    node match {
+      case c: RexCall if isWindowStart(c) =>
+        // replace expression by access to window start
+        rexBuilder.makeCast(c.getType, builder.field(propertyName(window, 
"start")), false)
+
+      case c: RexCall if isWindowEnd(c) =>
+        // replace expression by access to window end
+        rexBuilder.makeCast(c.getType, builder.field(propertyName(window, 
"end")), false)
+
+      case c: RexCall if isWindowRowtime(c) =>
+        if (isProctime) {
+          throw ValidationException("A proctime window cannot provide a 
rowtime attribute.")
+        } else if (isRowtime) {
+          // replace expression by access to window rowtime
+          builder.field(propertyName(window, "rowtime"))
+        } else {
+          throw TableException("Accessing the rowtime attribute of a window is 
not yet " +
+            "supported in a batch environment.")
+        }
+
+      case c: RexCall if isWindowProctime(c) =>
+        if (isProctime || isRowtime) {
+          // replace expression by access to window proctime
+          builder.field(propertyName(window, "proctime"))
+        } else {
+          throw ValidationException("Proctime is not supported in a batch 
environment.")
+        }
+
+      case c: RexCall =>
+        // replace expressions in children
+        val newOps = c.getOperands.map(replaceGroupAuxiliaries(_, window, 
builder))
+        c.clone(c.getType, newOps)
+
+      case x =>
+        // preserve expression
+        x
+    }
+  }
+
+  /** Checks if a RexNode is a window start auxiliary function. */
+  private def isWindowStart(node: RexNode): Boolean = {
+    node match {
+      case n: RexCall if n.getOperator.isGroupAuxiliary =>
+        n.getOperator match {
+          case BasicOperatorTable.TUMBLE_START |
+               BasicOperatorTable.HOP_START |
+               BasicOperatorTable.SESSION_START
+          => true
+          case _ => false
+        }
+      case _ => false
+    }
+  }
+
+  /** Checks if a RexNode is a window end auxiliary function. */
+  private def isWindowEnd(node: RexNode): Boolean = {
+    node match {
+      case n: RexCall if n.getOperator.isGroupAuxiliary =>
+        n.getOperator match {
+          case BasicOperatorTable.TUMBLE_END |
+               BasicOperatorTable.HOP_END |
+               BasicOperatorTable.SESSION_END
+          => true
+          case _ => false
+        }
+      case _ => false
+    }
+  }
+
+  /** Checks if a RexNode is a window rowtime auxiliary function. */
+  private def isWindowRowtime(node: RexNode): Boolean = {
+    node match {
+      case n: RexCall if n.getOperator.isGroupAuxiliary =>
+        n.getOperator match {
+          case BasicOperatorTable.TUMBLE_ROWTIME |
+               BasicOperatorTable.HOP_ROWTIME |
+               BasicOperatorTable.SESSION_ROWTIME
+            => true
+          case _ => false
+        }
+      case _ => false
+    }
+  }
+
+  /** Checks if a RexNode is a window proctime auxiliary function. */
+  private def isWindowProctime(node: RexNode): Boolean = {
+    node match {
+      case n: RexCall if n.getOperator.isGroupAuxiliary =>
+        n.getOperator match {
+          case BasicOperatorTable.TUMBLE_PROCTIME |
+               BasicOperatorTable.HOP_PROCTIME |
+               BasicOperatorTable.SESSION_PROCTIME
+            => true
+          case _ => false
+        }
+      case _ => false
+    }
+  }
+}
+
+object WindowPropertiesRule {
+
+  val INSTANCE = new WindowPropertiesBaseRule(
+    RelOptRule.operand(classOf[LogicalProject],
+      RelOptRule.operand(classOf[LogicalProject],
+        RelOptRule.operand(classOf[LogicalWindowAggregate], 
RelOptRule.none()))),
+    "WindowPropertiesRule") {
+
+    override def onMatch(call: RelOptRuleCall): Unit = {
+
+      val project = call.rel(0).asInstanceOf[LogicalProject]
+      val innerProject = call.rel(1).asInstanceOf[LogicalProject]
+      val agg = call.rel(2).asInstanceOf[LogicalWindowAggregate]
+
+      val converted = convertWindowNodes(call.builder(), project, None, 
innerProject, agg)
+
+      call.transformTo(converted)
+    }
+  }
+}
+
+object WindowPropertiesHavingRule {
+
+  val INSTANCE = new WindowPropertiesBaseRule(
+    RelOptRule.operand(classOf[LogicalProject],
+      RelOptRule.operand(classOf[LogicalFilter],
+        RelOptRule.operand(classOf[LogicalProject],
+          RelOptRule.operand(classOf[LogicalWindowAggregate], 
RelOptRule.none())))),
+    "WindowPropertiesHavingRule") {
+
+    override def onMatch(call: RelOptRuleCall): Unit = {
+
+      val project = call.rel(0).asInstanceOf[LogicalProject]
+      val filter = call.rel(1).asInstanceOf[LogicalFilter]
+      val innerProject = call.rel(2).asInstanceOf[LogicalProject]
+      val agg = call.rel(3).asInstanceOf[LogicalWindowAggregate]
+
+      val converted = convertWindowNodes(call.builder(), project, 
Some(filter), innerProject, agg)
+
+      call.transformTo(converted)
+    }
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/2ad8f7eb/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowStartEndPropertiesRule.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowStartEndPropertiesRule.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowStartEndPropertiesRule.scala
deleted file mode 100644
index 33190e6..0000000
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowStartEndPropertiesRule.scala
+++ /dev/null
@@ -1,184 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.plan.rules.common
-
-import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelOptRuleOperand}
-import org.apache.calcite.rel.logical.{LogicalFilter, LogicalProject}
-import org.apache.calcite.rex.{RexCall, RexNode}
-import org.apache.calcite.sql.fun.SqlStdOperatorTable
-import org.apache.calcite.tools.RelBuilder
-import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
-import org.apache.flink.table.expressions.{WindowEnd, WindowStart}
-import org.apache.flink.table.plan.logical.rel.LogicalWindowAggregate
-
-import scala.collection.JavaConversions._
-
-abstract class WindowStartEndPropertiesBaseRule(rulePredicate: 
RelOptRuleOperand, ruleName: String)
-  extends RelOptRule(rulePredicate, ruleName) {
-
-  override def matches(call: RelOptRuleCall): Boolean = {
-    val project = call.rel(0).asInstanceOf[LogicalProject]
-    // project includes at least on group auxiliary function
-
-    def hasGroupAuxiliaries(node: RexNode): Boolean = {
-      node match {
-        case c: RexCall if c.getOperator.isGroupAuxiliary => true
-        case c: RexCall =>
-          c.operands.exists(hasGroupAuxiliaries)
-        case _ => false
-      }
-    }
-
-    project.getProjects.exists(hasGroupAuxiliaries)
-  }
-
-  def replaceGroupAuxiliaries(node: RexNode, relBuilder: RelBuilder): RexNode 
= {
-    val rexBuilder = relBuilder.getRexBuilder
-    node match {
-      case c: RexCall if isWindowStart(c) =>
-        // replace expression by access to window start
-        rexBuilder.makeCast(c.getType, relBuilder.field("w$start"), false)
-      case c: RexCall if isWindowEnd(c) =>
-        // replace expression by access to window end
-        rexBuilder.makeCast(c.getType, relBuilder.field("w$end"), false)
-      case c: RexCall =>
-        // replace expressions in children
-        val newOps = c.getOperands.map(x => replaceGroupAuxiliaries(x, 
relBuilder))
-        c.clone(c.getType, newOps)
-      case x =>
-        // preserve expression
-        x
-    }
-  }
-
-  /** Checks if a RexNode is a window start auxiliary function. */
-  private def isWindowStart(node: RexNode): Boolean = {
-    node match {
-      case n: RexCall if n.getOperator.isGroupAuxiliary =>
-        n.getOperator match {
-          case SqlStdOperatorTable.TUMBLE_START |
-               SqlStdOperatorTable.HOP_START |
-               SqlStdOperatorTable.SESSION_START
-          => true
-          case _ => false
-        }
-      case _ => false
-    }
-  }
-
-  /** Checks if a RexNode is a window end auxiliary function. */
-  private def isWindowEnd(node: RexNode): Boolean = {
-    node match {
-      case n: RexCall if n.getOperator.isGroupAuxiliary =>
-        n.getOperator match {
-          case SqlStdOperatorTable.TUMBLE_END |
-               SqlStdOperatorTable.HOP_END |
-               SqlStdOperatorTable.SESSION_END
-          => true
-          case _ => false
-        }
-      case _ => false
-    }
-  }
-}
-
-object WindowStartEndPropertiesRule {
-
-  val INSTANCE = new WindowStartEndPropertiesBaseRule(
-    RelOptRule.operand(classOf[LogicalProject],
-      RelOptRule.operand(classOf[LogicalProject],
-        RelOptRule.operand(classOf[LogicalWindowAggregate], 
RelOptRule.none()))),
-    "WindowStartEndPropertiesRule") {
-
-    override def onMatch(call: RelOptRuleCall): Unit = {
-
-      val project = call.rel(0).asInstanceOf[LogicalProject]
-      val innerProject = call.rel(1).asInstanceOf[LogicalProject]
-      val agg = call.rel(2).asInstanceOf[LogicalWindowAggregate]
-
-      // Retrieve window start and end properties
-      val builder = call.builder()
-      builder.push(LogicalWindowAggregate.create(
-        agg.getWindow,
-        Seq(
-          NamedWindowProperty("w$start", 
WindowStart(agg.getWindow.aliasAttribute)),
-          NamedWindowProperty("w$end", 
WindowEnd(agg.getWindow.aliasAttribute))),
-        agg)
-      )
-
-      // forward window start and end properties
-      builder.project(
-        innerProject.getProjects ++ Seq(builder.field("w$start"), 
builder.field("w$end")))
-
-      // replace window auxiliary function by access to window properties
-      builder.project(
-        project.getProjects.map(expr => replaceGroupAuxiliaries(expr, builder))
-      )
-      val res = builder.build()
-      call.transformTo(res)
-    }
-  }
-}
-
-object WindowStartEndPropertiesHavingRule {
-
-  val INSTANCE = new WindowStartEndPropertiesBaseRule(
-    RelOptRule.operand(classOf[LogicalProject],
-      RelOptRule.operand(classOf[LogicalFilter],
-        RelOptRule.operand(classOf[LogicalProject],
-          RelOptRule.operand(classOf[LogicalWindowAggregate], 
RelOptRule.none())))),
-    "WindowStartEndPropertiesHavingRule") {
-
-    override def onMatch(call: RelOptRuleCall): Unit = {
-
-      val project = call.rel(0).asInstanceOf[LogicalProject]
-      val filter = call.rel(1).asInstanceOf[LogicalFilter]
-      val innerProject = call.rel(2).asInstanceOf[LogicalProject]
-      val agg = call.rel(3).asInstanceOf[LogicalWindowAggregate]
-
-      // Retrieve window start and end properties
-      val builder = call.builder()
-      builder.push(LogicalWindowAggregate.create(
-        agg.getWindow,
-        Seq(
-          NamedWindowProperty("w$start", 
WindowStart(agg.getWindow.aliasAttribute)),
-          NamedWindowProperty("w$end", 
WindowEnd(agg.getWindow.aliasAttribute))),
-        agg)
-      )
-
-      // forward window start and end properties
-      builder.project(
-        innerProject.getProjects ++ Seq(builder.field("w$start"), 
builder.field("w$end")))
-
-      // replace window auxiliary function by access to window properties
-      builder.filter(
-        filter.getChildExps.map(expr => replaceGroupAuxiliaries(expr, builder))
-      )
-
-      // replace window auxiliary function by access to window properties
-      builder.project(
-        project.getProjects.map(expr => replaceGroupAuxiliaries(expr, builder))
-      )
-
-      val res = builder.build()
-      call.transformTo(res)
-    }
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/2ad8f7eb/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetLogicalWindowAggregateRule.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetLogicalWindowAggregateRule.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetLogicalWindowAggregateRule.scala
index 883f5ae..129e0d3 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetLogicalWindowAggregateRule.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetLogicalWindowAggregateRule.scala
@@ -19,15 +19,16 @@
 package org.apache.flink.table.plan.rules.dataSet
 
 import java.math.BigDecimal
+
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rex._
-import org.apache.calcite.sql.fun.SqlStdOperatorTable
 import org.apache.flink.table.api.scala.{Session, Slide, Tumble}
 import org.apache.flink.table.api.{TableException, Window}
 import org.apache.flink.table.calcite.FlinkTypeFactory
-import org.apache.flink.table.expressions.{Expression, Literal, 
ResolvedFieldReference}
+import org.apache.flink.table.expressions.{Expression, Literal, 
ResolvedFieldReference, WindowReference}
 import org.apache.flink.table.plan.rules.common.LogicalWindowAggregateRule
 import org.apache.flink.table.typeutils.TimeIntervalTypeInfo
+import org.apache.flink.table.validate.BasicOperatorTable
 
 class DataSetLogicalWindowAggregateRule
   extends LogicalWindowAggregateRule("DataSetLogicalWindowAggregateRule") {
@@ -67,22 +68,22 @@ class DataSetLogicalWindowAggregateRule
     }
 
     windowExpr.getOperator match {
-      case SqlStdOperatorTable.TUMBLE =>
+      case BasicOperatorTable.TUMBLE =>
         val interval = getOperandAsLong(windowExpr, 1)
         val w = Tumble.over(Literal(interval, 
TimeIntervalTypeInfo.INTERVAL_MILLIS))
-        w.on(getFieldReference(windowExpr.getOperands.get(0))).as("w$")
+        
w.on(getFieldReference(windowExpr.getOperands.get(0))).as(WindowReference("w$"))
 
-      case SqlStdOperatorTable.HOP =>
+      case BasicOperatorTable.HOP =>
         val (slide, size) = (getOperandAsLong(windowExpr, 1), 
getOperandAsLong(windowExpr, 2))
         val w = Slide
           .over(Literal(size, TimeIntervalTypeInfo.INTERVAL_MILLIS))
           .every(Literal(slide, TimeIntervalTypeInfo.INTERVAL_MILLIS))
-        w.on(getFieldReference(windowExpr.getOperands.get(0))).as("w$")
+        
w.on(getFieldReference(windowExpr.getOperands.get(0))).as(WindowReference("w$"))
 
-      case SqlStdOperatorTable.SESSION =>
+      case BasicOperatorTable.SESSION =>
         val gap = getOperandAsLong(windowExpr, 1)
         val w = Session.withGap(Literal(gap, 
TimeIntervalTypeInfo.INTERVAL_MILLIS))
-        w.on(getFieldReference(windowExpr.getOperands.get(0))).as("w$")
+        
w.on(getFieldReference(windowExpr.getOperands.get(0))).as(WindowReference("w$"))
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2ad8f7eb/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamLogicalWindowAggregateRule.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamLogicalWindowAggregateRule.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamLogicalWindowAggregateRule.scala
index 050e2cd..eaad885 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamLogicalWindowAggregateRule.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamLogicalWindowAggregateRule.scala
@@ -25,11 +25,12 @@ import org.apache.calcite.rex._
 import org.apache.calcite.sql.`type`.SqlTypeName
 import org.apache.calcite.sql.fun.SqlStdOperatorTable
 import org.apache.flink.table.api.scala.{Session, Slide, Tumble}
-import org.apache.flink.table.api.{TableException, Window}
+import org.apache.flink.table.api.{TableException, ValidationException, Window}
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.expressions.{Literal, ResolvedFieldReference, 
WindowReference}
 import org.apache.flink.table.plan.rules.common.LogicalWindowAggregateRule
 import org.apache.flink.table.typeutils.TimeIntervalTypeInfo
+import org.apache.flink.table.validate.BasicOperatorTable
 
 class DataStreamLogicalWindowAggregateRule
   extends LogicalWindowAggregateRule("DataStreamLogicalWindowAggregateRule") {
@@ -47,7 +48,7 @@ class DataStreamLogicalWindowAggregateRule
 
       case _ =>
         throw TableException(
-          s"""Time attribute expected but ${timeAttribute.getType} 
encountered.""")
+          s"Time attribute expected but ${timeAttribute.getType} encountered.")
     }
   }
 
@@ -69,7 +70,7 @@ class DataStreamLogicalWindowAggregateRule
     def getOperandAsLong(call: RexCall, idx: Int): Long =
       call.getOperands.get(idx) match {
         case v: RexLiteral => v.getValue.asInstanceOf[JBigDecimal].longValue()
-        case _ => throw new TableException("Only constant window descriptors 
are supported.")
+        case _ => throw TableException("Only constant window descriptors are 
supported.")
       }
 
     def getOperandAsTimeIndicator(call: RexCall, idx: Int): 
ResolvedFieldReference =
@@ -79,18 +80,18 @@ class DataStreamLogicalWindowAggregateRule
             rowType.getFieldList.get(v.getIndex).getName,
             FlinkTypeFactory.toTypeInfo(v.getType))
         case _ =>
-          throw new TableException("Window can only be defined over a time 
attribute column.")
+          throw ValidationException("Window can only be defined over a time 
attribute column.")
       }
 
     windowExpr.getOperator match {
-      case SqlStdOperatorTable.TUMBLE =>
+      case BasicOperatorTable.TUMBLE =>
         val time = getOperandAsTimeIndicator(windowExpr, 0)
         val interval = getOperandAsLong(windowExpr, 1)
         val w = Tumble.over(Literal(interval, 
TimeIntervalTypeInfo.INTERVAL_MILLIS))
 
         w.on(time).as(WindowReference("w$"))
 
-      case SqlStdOperatorTable.HOP =>
+      case BasicOperatorTable.HOP =>
         val time = getOperandAsTimeIndicator(windowExpr, 0)
         val (slide, size) = (getOperandAsLong(windowExpr, 1), 
getOperandAsLong(windowExpr, 2))
         val w = Slide
@@ -99,7 +100,7 @@ class DataStreamLogicalWindowAggregateRule
 
         w.on(time).as(WindowReference("w$"))
 
-      case SqlStdOperatorTable.SESSION =>
+      case BasicOperatorTable.SESSION =>
         val time = getOperandAsTimeIndicator(windowExpr, 0)
         val gap = getOperandAsLong(windowExpr, 1)
         val w = Session.withGap(Literal(gap, 
TimeIntervalTypeInfo.INTERVAL_MILLIS))

http://git-wip-us.apache.org/repos/asf/flink/blob/2ad8f7eb/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
index 2efd13d..ce13cdc 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
@@ -1145,27 +1145,32 @@ object AggregateUtil {
     }
   }
 
+  /**
+    * Computes the positions of (window start, window end, rowtime).
+    */
   private[flink] def computeWindowPropertyPos(
       properties: Seq[NamedWindowProperty]): (Option[Int], Option[Int], 
Option[Int]) = {
 
     val propPos = properties.foldRight(
       (None: Option[Int], None: Option[Int], None: Option[Int], 0)) {
-      case (p, (s, e, t, i)) => p match {
+      case (p, (s, e, rt, i)) => p match {
         case NamedWindowProperty(_, prop) =>
           prop match {
             case WindowStart(_) if s.isDefined =>
-              throw new TableException("Duplicate WindowStart property 
encountered. This is a bug.")
+              throw TableException("Duplicate window start property 
encountered. This is a bug.")
             case WindowStart(_) =>
-              (Some(i), e, t, i - 1)
+              (Some(i), e, rt, i - 1)
             case WindowEnd(_) if e.isDefined =>
-              throw new TableException("Duplicate WindowEnd property 
encountered. This is a bug.")
+              throw TableException("Duplicate window end property encountered. 
This is a bug.")
             case WindowEnd(_) =>
-              (s, Some(i), t, i - 1)
-            case RowtimeAttribute(_) if t.isDefined =>
-              throw new TableException(
-                "Duplicate Window rowtime property encountered. This is a 
bug.")
+              (s, Some(i), rt, i - 1)
+            case RowtimeAttribute(_) if rt.isDefined =>
+              throw TableException("Duplicate window rowtime property 
encountered. This is a bug.")
             case RowtimeAttribute(_) =>
               (s, e, Some(i), i - 1)
+            case ProctimeAttribute(_) =>
+              // ignore this property, it will be null at the position later
+              (s, e, rt, i - 1)
           }
       }
     }

http://git-wip-us.apache.org/repos/asf/flink/blob/2ad8f7eb/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
index 3398a93..6c6be0b 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
@@ -18,9 +18,10 @@
 
 package org.apache.flink.table.validate
 
-import org.apache.calcite.sql.fun.SqlStdOperatorTable
+import org.apache.calcite.sql.`type`.OperandTypes
+import org.apache.calcite.sql.fun.{SqlGroupFunction, SqlStdOperatorTable}
 import org.apache.calcite.sql.util.{ChainedSqlOperatorTable, 
ListSqlOperatorTable, ReflectiveSqlOperatorTable}
-import org.apache.calcite.sql.{SqlFunction, SqlOperator, SqlOperatorTable}
+import org.apache.calcite.sql.{SqlFunction, SqlKind, SqlOperator, 
SqlOperatorTable}
 import org.apache.flink.table.api._
 import org.apache.flink.table.expressions._
 import org.apache.flink.table.functions.sql.{DateTimeSqlFunction, 
ScalarSqlFunctions}
@@ -401,16 +402,84 @@ class BasicOperatorTable extends 
ReflectiveSqlOperatorTable {
     ScalarSqlFunctions.LOG,
 
     // EXTENSIONS
-    SqlStdOperatorTable.TUMBLE,
-    SqlStdOperatorTable.TUMBLE_START,
-    SqlStdOperatorTable.TUMBLE_END,
-    SqlStdOperatorTable.HOP,
-    SqlStdOperatorTable.HOP_START,
-    SqlStdOperatorTable.HOP_END,
-    SqlStdOperatorTable.SESSION,
-    SqlStdOperatorTable.SESSION_START,
-    SqlStdOperatorTable.SESSION_END
+    BasicOperatorTable.TUMBLE,
+    BasicOperatorTable.HOP,
+    BasicOperatorTable.SESSION,
+    BasicOperatorTable.TUMBLE_START,
+    BasicOperatorTable.TUMBLE_END,
+    BasicOperatorTable.HOP_START,
+    BasicOperatorTable.HOP_END,
+    BasicOperatorTable.SESSION_START,
+    BasicOperatorTable.SESSION_END,
+    BasicOperatorTable.TUMBLE_PROCTIME,
+    BasicOperatorTable.TUMBLE_ROWTIME,
+    BasicOperatorTable.HOP_PROCTIME,
+    BasicOperatorTable.HOP_ROWTIME,
+    BasicOperatorTable.SESSION_PROCTIME,
+    BasicOperatorTable.SESSION_ROWTIME
   )
 
   builtInSqlOperators.foreach(register)
 }
+
+object BasicOperatorTable {
+
+  /**
+    * We need custom group auxiliary functions in order to support nested 
windows.
+    */
+
+  val TUMBLE: SqlGroupFunction = new SqlGroupFunction(
+    SqlKind.TUMBLE,
+    null,
+    OperandTypes.or(OperandTypes.DATETIME_INTERVAL, 
OperandTypes.DATETIME_INTERVAL_TIME)) {
+    override def getAuxiliaryFunctions: 
_root_.java.util.List[SqlGroupFunction] =
+      Seq(
+        TUMBLE_START,
+        TUMBLE_END,
+        TUMBLE_ROWTIME,
+        TUMBLE_PROCTIME)
+  }
+  val TUMBLE_START: SqlGroupFunction = TUMBLE.auxiliary(SqlKind.TUMBLE_START)
+  val TUMBLE_END: SqlGroupFunction = TUMBLE.auxiliary(SqlKind.TUMBLE_END)
+  val TUMBLE_ROWTIME: SqlGroupFunction =
+    TUMBLE.auxiliary("TUMBLE_ROWTIME", SqlKind.OTHER_FUNCTION)
+  val TUMBLE_PROCTIME: SqlGroupFunction =
+    TUMBLE.auxiliary("TUMBLE_PROCTIME", SqlKind.OTHER_FUNCTION)
+
+  val HOP: SqlGroupFunction = new SqlGroupFunction(
+    SqlKind.HOP,
+    null,
+    OperandTypes.or(
+      OperandTypes.DATETIME_INTERVAL_INTERVAL,
+      OperandTypes.DATETIME_INTERVAL_INTERVAL_TIME)) {
+    override def getAuxiliaryFunctions: 
_root_.java.util.List[SqlGroupFunction] =
+      Seq(
+        HOP_START,
+        HOP_END,
+        HOP_ROWTIME,
+        HOP_PROCTIME)
+  }
+  val HOP_START: SqlGroupFunction = HOP.auxiliary(SqlKind.HOP_START)
+  val HOP_END: SqlGroupFunction = HOP.auxiliary(SqlKind.HOP_END)
+  val HOP_ROWTIME: SqlGroupFunction = HOP.auxiliary("HOP_ROWTIME", 
SqlKind.OTHER_FUNCTION)
+  val HOP_PROCTIME: SqlGroupFunction = HOP.auxiliary("HOP_PROCTIME", 
SqlKind.OTHER_FUNCTION)
+
+  val SESSION: SqlGroupFunction = new SqlGroupFunction(
+    SqlKind.SESSION,
+    null,
+    OperandTypes.or(OperandTypes.DATETIME_INTERVAL, 
OperandTypes.DATETIME_INTERVAL_TIME)) {
+    override def getAuxiliaryFunctions: 
_root_.java.util.List[SqlGroupFunction] =
+      Seq(
+        SESSION_START,
+        SESSION_END,
+        SESSION_ROWTIME,
+        SESSION_PROCTIME)
+  }
+  val SESSION_START: SqlGroupFunction = 
SESSION.auxiliary(SqlKind.SESSION_START)
+  val SESSION_END: SqlGroupFunction = SESSION.auxiliary(SqlKind.SESSION_END)
+  val SESSION_ROWTIME: SqlGroupFunction =
+    SESSION.auxiliary("SESSION_ROWTIME", SqlKind.OTHER_FUNCTION)
+  val SESSION_PROCTIME: SqlGroupFunction =
+    SESSION.auxiliary("SESSION_PROCTIME", SqlKind.OTHER_FUNCTION)
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2ad8f7eb/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/GroupWindowTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/GroupWindowTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/GroupWindowTest.scala
index a78aa8c..cb31866 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/GroupWindowTest.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/GroupWindowTest.scala
@@ -21,6 +21,7 @@ package org.apache.flink.table.api.batch.sql
 import java.sql.Timestamp
 
 import org.apache.flink.api.scala._
+import org.apache.flink.table.api.{TableException, ValidationException}
 import 
org.apache.flink.table.runtime.utils.JavaUserDefinedAggFunctions.WeightedAvgWithMerge
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.plan.logical._
@@ -79,7 +80,7 @@ class GroupWindowTest extends TableTestBase {
           term("select", "c, SUM(a) AS sumA, MIN(b) AS minB, " +
             "start('w$) AS w$start, end('w$) AS w$end")
         ),
-        term("select", "CAST(w$start) AS w$start, CAST(w$end) AS w$end, c, 
sumA, minB")
+        term("select", "CAST(w$start) AS EXPR$0, CAST(w$end) AS EXPR$1, c, 
sumA, minB")
       )
 
     util.verifySql(sqlQuery, expected)
@@ -165,7 +166,7 @@ class GroupWindowTest extends TableTestBase {
           term("select", "c, d, SUM(a) AS sumA, AVG(b) AS avgB, " +
             "start('w$) AS w$start, end('w$) AS w$end")
         ),
-        term("select", "c, CAST(w$end) AS w$end, CAST(w$start) AS w$start, 
sumA, avgB")
+        term("select", "c, CAST(w$end) AS EXPR$1, CAST(w$start) AS EXPR$2, 
sumA, avgB")
       )
 
     util.verifySql(sqlQuery, expected)
@@ -220,7 +221,7 @@ class GroupWindowTest extends TableTestBase {
           term("select", "c, d, SUM(a) AS sumA, MIN(b) AS minB, " +
             "start('w$) AS w$start, end('w$) AS w$end")
         ),
-        term("select", "c, d, CAST(w$start) AS w$start, CAST(w$end) AS w$end, 
sumA, minB")
+        term("select", "c, d, CAST(w$start) AS EXPR$2, CAST(w$end) AS EXPR$3, 
sumA, minB")
       )
 
     util.verifySql(sqlQuery, expected)
@@ -251,7 +252,7 @@ class GroupWindowTest extends TableTestBase {
           term("window", TumblingGroupWindow('w$, 'ts, 240000.millis)),
           term("select", "c, start('w$) AS w$start, end('w$) AS w$end")
         ),
-        term("select", "CAST(w$end) AS w$end")
+        term("select", "CAST(w$end) AS EXPR$0")
       )
 
     util.verifySql(sqlQuery, expected)
@@ -289,7 +290,7 @@ class GroupWindowTest extends TableTestBase {
             "start('w$) AS w$start",
             "end('w$) AS w$end")
         ),
-        term("select", "EXPR$0", "CAST(w$start) AS w$start"),
+        term("select", "EXPR$0", "CAST(w$start) AS EXPR$1"),
         term("where",
           "AND(>($f1, 0), " +
             "=(EXTRACT_DATE(FLAG(QUARTER), /INT(Reinterpret(CAST(w$start)), 
86400000)), 1))")

http://git-wip-us.apache.org/repos/asf/flink/blob/2ad8f7eb/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/GroupWindowValidationTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/GroupWindowValidationTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/GroupWindowValidationTest.scala
index 4272170..cbf3029 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/GroupWindowValidationTest.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/GroupWindowValidationTest.scala
@@ -79,4 +79,34 @@ class GroupWindowValidationTest extends TableTestBase {
       "GROUP BY TUMBLE(ts, INTERVAL '4' MINUTE)"
     util.verifySql(sql, "n/a")
   }
+
+  @Test(expected = classOf[TableException])
+  def testWindowRowtime(): Unit = {
+    val util = batchTestUtil()
+    util.addTable[(Int, Long, String, Timestamp)]("T", 'a, 'b, 'c, 'ts)
+
+    val sqlQuery =
+      "SELECT " +
+        "  TUMBLE_ROWTIME(ts, INTERVAL '4' MINUTE)" +
+        "FROM T " +
+        "GROUP BY TUMBLE(ts, INTERVAL '4' MINUTE), c"
+
+    // should fail because ROWTIME properties are not yet supported in batch
+    util.verifySql(sqlQuery, "FAIL")
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testWindowProctime(): Unit = {
+    val util = batchTestUtil()
+    util.addTable[(Int, Long, String, Timestamp)]("T", 'a, 'b, 'c, 'ts)
+
+    val sqlQuery =
+      "SELECT " +
+        "  TUMBLE_PROCTIME(ts, INTERVAL '4' MINUTE)" +
+        "FROM T " +
+        "GROUP BY TUMBLE(ts, INTERVAL '4' MINUTE), c"
+
+    // should fail because PROCTIME properties are not yet supported in batch
+    util.verifySql(sqlQuery, "FAIL")
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2ad8f7eb/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/GroupWindowTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/GroupWindowTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/GroupWindowTest.scala
index 722c4f0..e49a63f 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/GroupWindowTest.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/GroupWindowTest.scala
@@ -44,18 +44,24 @@ class GroupWindowTest extends TableTestBase {
         "GROUP BY TUMBLE(rowtime, INTERVAL '15' MINUTE)"
     val expected =
       unaryNode(
-        "DataStreamGroupWindowAggregate",
+        "DataStreamCalc",
         unaryNode(
-          "DataStreamCalc",
-          streamTableNode(0),
-          term("select", "rowtime", "c", "a")
+          "DataStreamGroupWindowAggregate",
+          unaryNode(
+            "DataStreamCalc",
+            streamTableNode(0),
+            term("select", "rowtime", "c", "a")
+          ),
+          term("window", TumblingGroupWindow('w$, 'rowtime, 900000.millis)),
+          term("select",
+            "COUNT(*) AS EXPR$0",
+              "weightedAvg(c, a) AS wAvg",
+              "start('w$) AS w$start",
+              "end('w$) AS w$end",
+              "rowtime('w$) AS w$rowtime",
+              "proctime('w$) AS w$proctime")
         ),
-        term("window", TumblingGroupWindow('w$, 'rowtime, 900000.millis)),
-        term("select",
-          "COUNT(*) AS EXPR$0, " +
-            "weightedAvg(c, a) AS wAvg, " +
-            "start('w$) AS w$start, " +
-            "end('w$) AS w$end")
+        term("select", "EXPR$0", "wAvg", "w$start AS EXPR$2", "w$end AS 
EXPR$3")
       )
     streamUtil.verifySql(sql, expected)
   }
@@ -72,19 +78,25 @@ class GroupWindowTest extends TableTestBase {
         "GROUP BY HOP(proctime, INTERVAL '15' MINUTE, INTERVAL '1' HOUR)"
     val expected =
       unaryNode(
-        "DataStreamGroupWindowAggregate",
+        "DataStreamCalc",
         unaryNode(
-          "DataStreamCalc",
-          streamTableNode(0),
-          term("select", "proctime", "c", "a")
+          "DataStreamGroupWindowAggregate",
+          unaryNode(
+            "DataStreamCalc",
+            streamTableNode(0),
+            term("select", "proctime", "c", "a")
+          ),
+          term("window", SlidingGroupWindow('w$, 'proctime, 3600000.millis, 
900000.millis)),
+          term("select",
+            "COUNT(*) AS EXPR$0",
+              "weightedAvg(c, a) AS wAvg",
+              "start('w$) AS w$start",
+              "end('w$) AS w$end",
+              "proctime('w$) AS w$proctime")
         ),
-        term("window", SlidingGroupWindow('w$, 'proctime, 3600000.millis, 
900000.millis)),
-        term("select",
-          "COUNT(*) AS EXPR$0, " +
-            "weightedAvg(c, a) AS wAvg, " +
-            "start('w$) AS w$start, " +
-            "end('w$) AS w$end")
+        term("select", "EXPR$0", "wAvg", "w$start AS EXPR$2", "w$end AS 
EXPR$3")
       )
+
     streamUtil.verifySql(sql, expected)
   }
 
@@ -101,19 +113,25 @@ class GroupWindowTest extends TableTestBase {
         "GROUP BY SESSION(proctime, INTERVAL '15' MINUTE)"
     val expected =
       unaryNode(
-        "DataStreamGroupWindowAggregate",
+        "DataStreamCalc",
         unaryNode(
-          "DataStreamCalc",
-          streamTableNode(0),
-          term("select", "proctime", "c", "a")
+          "DataStreamGroupWindowAggregate",
+          unaryNode(
+            "DataStreamCalc",
+            streamTableNode(0),
+            term("select", "proctime", "c", "a")
+          ),
+          term("window", SessionGroupWindow('w$, 'proctime, 900000.millis)),
+          term("select",
+            "COUNT(*) AS EXPR$0",
+            "weightedAvg(c, a) AS wAvg",
+            "start('w$) AS w$start",
+            "end('w$) AS w$end",
+            "proctime('w$) AS w$proctime")
         ),
-        term("window", SessionGroupWindow('w$, 'proctime, 900000.millis)),
-        term("select",
-          "COUNT(*) AS EXPR$0, " +
-            "weightedAvg(c, a) AS wAvg, " +
-            "start('w$) AS w$start, " +
-            "end('w$) AS w$end")
+        term("select", "EXPR$0", "wAvg", "w$start AS EXPR$2", "w$end AS 
EXPR$3")
       )
+
     streamUtil.verifySql(sql, expected)
   }
 
@@ -136,9 +154,14 @@ class GroupWindowTest extends TableTestBase {
             term("select", "rowtime")
           ),
           term("window", TumblingGroupWindow('w$, 'rowtime, 900000.millis)),
-          term("select", "COUNT(*) AS EXPR$0", "start('w$) AS w$start", 
"end('w$) AS w$end")
+          term("select",
+            "COUNT(*) AS EXPR$0",
+            "start('w$) AS w$start",
+            "end('w$) AS w$end",
+            "rowtime('w$) AS w$rowtime",
+            "proctime('w$) AS w$proctime")
         ),
-        term("select", "EXPR$0", "DATETIME_PLUS(w$end, 60000) AS $f1")
+        term("select", "EXPR$0", "DATETIME_PLUS(w$end, 60000) AS EXPR$1")
       )
 
     streamUtil.verifySql(sql, expected)
@@ -171,9 +194,11 @@ class GroupWindowTest extends TableTestBase {
             "COUNT(*) AS EXPR$0",
             "SUM(a) AS $f1",
             "start('w$) AS w$start",
-            "end('w$) AS w$end")
+            "end('w$) AS w$end",
+            "rowtime('w$) AS w$rowtime",
+            "proctime('w$) AS w$proctime")
         ),
-        term("select", "EXPR$0", "w$start"),
+        term("select", "EXPR$0", "w$start AS EXPR$1"),
         term("where",
           "AND(>($f1, 0), " +
             "=(EXTRACT_DATE(FLAG(QUARTER), /INT(Reinterpret(w$start), 
86400000)), 1))")
@@ -181,4 +206,58 @@ class GroupWindowTest extends TableTestBase {
 
     streamUtil.verifySql(sql, expected)
   }
+
+  @Test
+  def testMultiWindowSqlWithAggregation() = {
+    val sql =
+      s"""SELECT
+          TUMBLE_ROWTIME(zzzzz, INTERVAL '0.004' SECOND),
+          TUMBLE_END(zzzzz, INTERVAL '0.004' SECOND),
+          COUNT(`a`) AS `a`
+        FROM (
+          SELECT
+            COUNT(`a`) AS `a`,
+            TUMBLE_ROWTIME(rowtime, INTERVAL '0.002' SECOND) AS `zzzzz`
+          FROM MyTable
+          GROUP BY TUMBLE(rowtime, INTERVAL '0.002' SECOND)
+        )
+        GROUP BY TUMBLE(zzzzz, INTERVAL '0.004' SECOND)"""
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        unaryNode(
+          "DataStreamGroupWindowAggregate",
+          unaryNode(
+            "DataStreamCalc",
+            unaryNode(
+              "DataStreamGroupWindowAggregate",
+              unaryNode(
+                "DataStreamCalc",
+                streamTableNode(0),
+                term("select", "rowtime, a")
+              ),
+              term("window", TumblingGroupWindow('w$, 'rowtime, 2.millis)),
+              term("select",
+                "COUNT(a) AS a",
+                "start('w$) AS w$start",
+                "end('w$) AS w$end",
+                "rowtime('w$) AS w$rowtime",
+                "proctime('w$) AS w$proctime")
+            ),
+            term("select", "a", "w$rowtime AS zzzzz")
+          ),
+          term("window", TumblingGroupWindow('w$, 'zzzzz, 4.millis)),
+          term("select",
+            "COUNT(*) AS a",
+            "start('w$) AS w$start",
+            "end('w$) AS w$end",
+            "rowtime('w$) AS w$rowtime",
+            "proctime('w$) AS w$proctime")
+        ),
+        term("select", "w$rowtime AS EXPR$0", "w$end AS EXPR$1", "a")
+      )
+
+    streamUtil.verifySql(sql, expected)
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2ad8f7eb/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala
index 1714ec8..009ae40 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala
@@ -343,9 +343,15 @@ class TimeIndicatorConversionTest extends TableTestBase {
             WindowReference("w$"),
             'rowtime,
             100.millis)),
-        term("select", "long", "SUM(int) AS EXPR$2", "start('w$) AS w$start", 
"end('w$) AS w$end")
+        term("select",
+          "long",
+          "SUM(int) AS EXPR$2",
+          "start('w$) AS w$start",
+          "end('w$) AS w$end",
+          "rowtime('w$) AS w$rowtime",
+          "proctime('w$) AS w$proctime")
       ),
-      term("select", "w$end", "long", "EXPR$2")
+      term("select", "w$end AS rowtime", "long", "EXPR$2")
     )
 
     util.verifyTable(result, expected)

http://git-wip-us.apache.org/repos/asf/flink/blob/2ad8f7eb/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala
index 47a7341..e672335 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala
@@ -283,6 +283,123 @@ class TimeAttributesITCase extends 
StreamingMultipleProgramsTestBase {
   }
 
   @Test
+  def testMultiWindowSqlNoAggregation(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+
+    val stream = env
+      .fromCollection(data)
+      .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark())
+    val table = stream.toTable(tEnv, 'rowtime.rowtime, 'int, 'double, 'float, 
'bigdec, 'string)
+
+    val window1 = tEnv.sqlQuery(
+      s"""SELECT
+          TUMBLE_ROWTIME(rowtime, INTERVAL '0.002' SECOND) AS rowtime,
+          TUMBLE_END(rowtime, INTERVAL '0.002' SECOND) AS endtime
+        FROM $table
+        GROUP BY TUMBLE(rowtime, INTERVAL '0.002' SECOND)""")
+
+    val window2 = tEnv.sqlQuery(
+      s"""SELECT
+          TUMBLE_ROWTIME(rowtime, INTERVAL '0.004' SECOND),
+          TUMBLE_END(rowtime, INTERVAL '0.004' SECOND)
+        FROM $window1
+        GROUP BY TUMBLE(rowtime, INTERVAL '0.004' SECOND)""")
+
+    val results = window2.toAppendStream[Row]
+    results.addSink(new StreamITCase.StringSink[Row])
+    env.execute()
+
+    val expected = Seq(
+      "1970-01-01 00:00:00.003,1970-01-01 00:00:00.004",
+      "1970-01-01 00:00:00.007,1970-01-01 00:00:00.008",
+      "1970-01-01 00:00:00.011,1970-01-01 00:00:00.012",
+      "1970-01-01 00:00:00.019,1970-01-01 00:00:00.02"
+    )
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testMultiWindowSqlWithAggregation(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+
+    val stream = env
+      .fromCollection(data)
+      .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark())
+    val table = stream.toTable(tEnv, 'rowtime.rowtime, 'int, 'double, 'float, 
'bigdec, 'string)
+
+    val window = tEnv.sqlQuery(
+      s"""SELECT
+          TUMBLE_ROWTIME(rowtime, INTERVAL '0.004' SECOND),
+          TUMBLE_END(rowtime, INTERVAL '0.004' SECOND),
+          COUNT(`int`) AS `int`
+        FROM (
+          SELECT
+            COUNT(`int`) AS `int`,
+            TUMBLE_ROWTIME(rowtime, INTERVAL '0.002' SECOND) AS `rowtime`
+          FROM $table
+          GROUP BY TUMBLE(rowtime, INTERVAL '0.002' SECOND)
+        )
+        GROUP BY TUMBLE(rowtime, INTERVAL '0.004' SECOND)""")
+
+    val results = window.toAppendStream[Row]
+    results.addSink(new StreamITCase.StringSink[Row])
+    env.execute()
+
+    val expected = Seq(
+      "1970-01-01 00:00:00.003,1970-01-01 00:00:00.004,2",
+      "1970-01-01 00:00:00.007,1970-01-01 00:00:00.008,2",
+      "1970-01-01 00:00:00.011,1970-01-01 00:00:00.012,1",
+      "1970-01-01 00:00:00.019,1970-01-01 00:00:00.02,1"
+    )
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testMultiWindowSqlWithAggregation2(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+
+    val stream = env
+      .fromCollection(data)
+      .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark())
+    val table = stream.toTable(tEnv, 'rowtime1.rowtime, 'int, 'double, 'float, 
'bigdec, 'string)
+
+    val window = tEnv.sqlQuery(
+      s"""SELECT
+          TUMBLE_ROWTIME(rowtime2, INTERVAL '0.004' SECOND),
+          TUMBLE_END(rowtime2, INTERVAL '0.004' SECOND),
+          COUNT(`int`) as `int`
+        FROM (
+          SELECT
+            TUMBLE_ROWTIME(rowtime1, INTERVAL '0.002' SECOND) AS rowtime2,
+            COUNT(`int`) as `int`
+          FROM $table
+          GROUP BY TUMBLE(rowtime1, INTERVAL '0.002' SECOND)
+        )
+        GROUP BY TUMBLE(rowtime2, INTERVAL '0.004' SECOND)""")
+
+    val results = window.toAppendStream[Row]
+    results.addSink(new StreamITCase.StringSink[Row])
+    env.execute()
+
+    val expected = Seq(
+      "1970-01-01 00:00:00.003,1970-01-01 00:00:00.004,2",
+      "1970-01-01 00:00:00.007,1970-01-01 00:00:00.008,2",
+      "1970-01-01 00:00:00.011,1970-01-01 00:00:00.012,1",
+      "1970-01-01 00:00:00.019,1970-01-01 00:00:00.02,1"
+    )
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
   def testCalcMaterializationWithPojoType(): Unit = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
@@ -414,6 +531,35 @@ class TimeAttributesITCase extends 
StreamingMultipleProgramsTestBase {
 
     assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
+
+  @Test
+  def testSqlWindowRowtime(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+
+    val stream = env
+      .fromCollection(data)
+      .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark())
+    val table = stream.toTable(tEnv, 'rowtime.rowtime, 'int, 'double, 'float, 
'bigdec, 'string)
+    tEnv.registerTable("MyTable", table)
+
+    val t = tEnv.sqlQuery("SELECT TUMBLE_ROWTIME(rowtime, INTERVAL '0.003' 
SECOND) FROM MyTable " +
+      "GROUP BY TUMBLE(rowtime, INTERVAL '0.003' SECOND)")
+
+    val results = t.toAppendStream[Row]
+    results.addSink(new StreamITCase.StringSink[Row])
+    env.execute()
+
+    val expected = Seq(
+      "1970-01-01 00:00:00.002",
+      "1970-01-01 00:00:00.005",
+      "1970-01-01 00:00:00.008",
+      "1970-01-01 00:00:00.017"
+    )
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
 }
 
 object TimeAttributesITCase {

Reply via email to