This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.9 by this push:
     new a106738  [FLINK-13564][table-planner-blink] throw exception if 
constant with YEAR TO MONTH resolution was used for group windows
a106738 is described below

commit a106738721edd2f7853605ac68f6bb16e1d817b0
Author: godfreyhe <godfre...@163.com>
AuthorDate: Sat Aug 3 17:50:55 2019 +0800

    [FLINK-13564][table-planner-blink] throw exception if constant with YEAR TO 
MONTH resolution was used for group windows
    
    This is the same fix as FLINK-11017, applied to the blink planner.
    
    This closes #9349
---
 .../logical/BatchLogicalWindowAggregateRule.scala  |  9 ++++++
 .../logical/LogicalWindowAggregateRuleBase.scala   | 12 ++++----
 .../logical/StreamLogicalWindowAggregateRule.scala | 14 ++++++++-
 .../plan/stream/sql/agg/WindowAggregateTest.xml    | 21 +++++++++++++
 .../plan/stream/sql/agg/WindowAggregateTest.scala  | 35 +++++++++++++++++++++-
 5 files changed, 82 insertions(+), 9 deletions(-)

diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/BatchLogicalWindowAggregateRule.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/BatchLogicalWindowAggregateRule.scala
index 86b9098..e711d8d 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/BatchLogicalWindowAggregateRule.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/BatchLogicalWindowAggregateRule.scala
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.planner.plan.rules.logical
 
+import org.apache.flink.table.api.TableException
 import org.apache.flink.table.expressions.FieldReferenceExpression
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory.toLogicalType
@@ -28,6 +29,8 @@ import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.logical.{LogicalAggregate, LogicalProject}
 import org.apache.calcite.rex._
 
+import _root_.java.math.{BigDecimal => JBigDecimal}
+
 /**
   * Planner rule that transforms simple [[LogicalAggregate]] on a 
[[LogicalProject]]
   * with windowing expression to [[LogicalWindowAggregate]] for batch.
@@ -73,6 +76,12 @@ class BatchLogicalWindowAggregateRule
           ref.getIndex)
     }
   }
+
+  def getOperandAsLong(call: RexCall, idx: Int): Long =
+    call.getOperands.get(idx) match {
+      case v: RexLiteral => v.getValue.asInstanceOf[JBigDecimal].longValue()
+      case _ => throw new TableException("Only constant window descriptors are 
supported")
+    }
 }
 
 object BatchLogicalWindowAggregateRule {
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/LogicalWindowAggregateRuleBase.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/LogicalWindowAggregateRuleBase.scala
index ee24adb..9f88b8f 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/LogicalWindowAggregateRuleBase.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/LogicalWindowAggregateRuleBase.scala
@@ -39,8 +39,6 @@ import org.apache.calcite.rex._
 import org.apache.calcite.sql.`type`.SqlTypeUtil
 import org.apache.calcite.util.ImmutableBitSet
 
-import _root_.java.math.BigDecimal
-
 import _root_.scala.collection.JavaConversions._
 
 /**
@@ -247,11 +245,6 @@ abstract class LogicalWindowAggregateRuleBase(description: 
String)
       windowExpr: RexCall,
       windowExprIdx: Int,
       rowType: RelDataType): LogicalWindow = {
-    def getOperandAsLong(call: RexCall, idx: Int): Long =
-      call.getOperands.get(idx) match {
-        case v: RexLiteral => v.getValue.asInstanceOf[BigDecimal].longValue()
-        case _ => throw new TableException("Only constant window descriptors 
are supported")
-      }
 
     val timeField = getTimeFieldReference(windowExpr.getOperands.get(0), 
windowExprIdx, rowType)
     val resultType = 
Some(fromDataTypeToLogicalType(timeField.getOutputDataType))
@@ -288,4 +281,9 @@ abstract class LogicalWindowAggregateRuleBase(description: 
String)
       operand: RexNode,
       windowExprIdx: Int,
       rowType: RelDataType): FieldReferenceExpression
+
+  /**
+    * get operand value as Long type
+    */
+  def getOperandAsLong(call: RexCall, idx: Int): Long
 }
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/StreamLogicalWindowAggregateRule.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/StreamLogicalWindowAggregateRule.scala
index aced557..af1a481 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/StreamLogicalWindowAggregateRule.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/StreamLogicalWindowAggregateRule.scala
@@ -28,7 +28,9 @@ import 
org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter.fromLog
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.logical.{LogicalAggregate, LogicalProject}
 import org.apache.calcite.rex._
-import org.apache.calcite.sql.`type`.SqlTypeName
+import org.apache.calcite.sql.`type`.{SqlTypeFamily, SqlTypeName}
+
+import _root_.java.math.{BigDecimal => JBigDecimal}
 
 /**
   * Planner rule that transforms simple [[LogicalAggregate]] on a 
[[LogicalProject]]
@@ -83,6 +85,16 @@ class StreamLogicalWindowAggregateRule
         throw new ValidationException("Window can only be defined over a time 
attribute column.")
     }
   }
+
+  def getOperandAsLong(call: RexCall, idx: Int): Long =
+    call.getOperands.get(idx) match {
+      case v: RexLiteral if v.getTypeName.getFamily == 
SqlTypeFamily.INTERVAL_DAY_TIME =>
+        v.getValue.asInstanceOf[JBigDecimal].longValue()
+      case _: RexLiteral => throw new TableException(
+        "Window aggregate only support SECOND, MINUTE, HOUR, DAY as the time 
unit. " +
+          "MONTH and YEAR time unit are not supported yet.")
+      case _ => throw new TableException("Only constant window descriptors are 
supported.")
+    }
 }
 
 object StreamLogicalWindowAggregateRule {
diff --git 
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml
 
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml
index 45f4725..e59e5a3 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml
+++ 
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml
@@ -134,6 +134,27 @@ Calc(select=[EXPR$0, wAvg, w$start AS EXPR$2, w$end AS 
EXPR$3])
 ]]>
     </Resource>
   </TestCase>
+  <TestCase name="testIntervalDay">
+    <Resource name="sql">
+      <![CDATA[SELECT COUNT(*) FROM MyTable GROUP BY TUMBLE(proctime, INTERVAL 
'35' DAY)]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(EXPR$0=[$1])
++- LogicalAggregate(group=[{0}], EXPR$0=[COUNT()])
+   +- LogicalProject($f0=[TUMBLE($3, 3024000000:INTERVAL DAY)])
+      +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+GroupWindowAggregate(window=[TumblingGroupWindow('w$, proctime, 3024000000)], 
select=[COUNT(*) AS EXPR$0])
++- Exchange(distribution=[single])
+   +- Calc(select=[proctime])
+      +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b, c, proctime, rowtime])
+]]>
+    </Resource>
+  </TestCase>
   <TestCase name="testTumbleFunNotInGroupBy">
     <Resource name="sql">
       <![CDATA[
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.scala
index 3c773ca..4b6e535 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.scala
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.scala
@@ -86,6 +86,39 @@ class WindowAggregateTest extends TableTestBase {
   }
 
   @Test
+  def testWindowWrongWindowParameter1(): Unit = {
+    expectedException.expect(classOf[TableException])
+    expectedException.expectMessage(
+      "Window aggregate only support SECOND, MINUTE, HOUR, DAY as the time 
unit. " +
+        "MONTH and YEAR time unit are not supported yet.")
+
+    val sqlQuery =
+      "SELECT COUNT(*) FROM MyTable GROUP BY TUMBLE(proctime, INTERVAL '1' 
MONTH)"
+
+    util.verifyPlan(sqlQuery)
+  }
+
+  @Test
+  def testWindowWrongWindowParameter2(): Unit = {
+    expectedException.expect(classOf[TableException])
+    expectedException.expectMessage(
+      "Window aggregate only support SECOND, MINUTE, HOUR, DAY as the time 
unit. " +
+        "MONTH and YEAR time unit are not supported yet.")
+
+    val sqlQuery =
+      "SELECT COUNT(*) FROM MyTable GROUP BY TUMBLE(proctime, INTERVAL '2-10' 
YEAR TO MONTH)"
+
+    util.verifyPlan(sqlQuery)
+  }
+
+  @Test
+  def testIntervalDay(): Unit = {
+    val sqlQuery =
+      "SELECT COUNT(*) FROM MyTable GROUP BY TUMBLE(proctime, INTERVAL '35' 
DAY)"
+    util.verifyPlan(sqlQuery)
+  }
+
+  @Test
   def testTumbleFunction(): Unit = {
     val sql =
       """
@@ -296,7 +329,7 @@ class WindowAggregateTest extends TableTestBase {
   }
 
   @Test
-  def testReturnTypeInferenceForWindowAgg() = {
+  def testReturnTypeInferenceForWindowAgg(): Unit = {
 
     val sql =
       """

Reply via email to