[ 
https://issues.apache.org/jira/browse/FLINK-11017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16704651#comment-16704651
 ] 

ASF GitHub Bot commented on FLINK-11017:
----------------------------------------

asfgit closed pull request #7200: [FLINK-11017][table] Throw exception if 
constant with YEAR TO MONTH resolution was used for group windows
URL: https://github.com/apache/flink/pull/7200
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamLogicalWindowAggregateRule.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamLogicalWindowAggregateRule.scala
index 62eecbb0532..49e2ab3db69 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamLogicalWindowAggregateRule.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamLogicalWindowAggregateRule.scala
@@ -22,7 +22,7 @@ import java.math.{BigDecimal => JBigDecimal}
 
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rex._
-import org.apache.calcite.sql.`type`.SqlTypeName
+import org.apache.calcite.sql.`type`.{SqlTypeFamily, SqlTypeName}
 import org.apache.flink.table.api.scala.{Session, Slide, Tumble}
 import org.apache.flink.table.api.{TableException, ValidationException, Window}
 import org.apache.flink.table.calcite.FlinkTypeFactory
@@ -68,8 +68,10 @@ class DataStreamLogicalWindowAggregateRule
 
     def getOperandAsLong(call: RexCall, idx: Int): Long =
       call.getOperands.get(idx) match {
-        case v: RexLiteral => v.getValue.asInstanceOf[JBigDecimal].longValue()
-        case _ => throw new TableException("Only constant window descriptors 
are supported.")
+        case v: RexLiteral if v.getTypeName.getFamily == 
SqlTypeFamily.INTERVAL_DAY_TIME =>
+          v.getValue.asInstanceOf[JBigDecimal].longValue()
+        case _ => throw new TableException(
+          "Only constant window descriptors with DAY TO SECOND resolution are 
supported.")
       }
 
     def getOperandAsTimeIndicator(call: RexCall, idx: Int): 
ResolvedFieldReference =
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/WindowAggregateValidationTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/WindowAggregateValidationTest.scala
index e7cf597ca64..16749a25c45 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/WindowAggregateValidationTest.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/WindowAggregateValidationTest.scala
@@ -30,8 +30,11 @@ class WindowAggregateValidationTest extends TableTestBase {
   streamUtil.addTable[(Int, String, Long)](
     "MyTable", 'a, 'b, 'c, 'proctime.proctime, 'rowtime.rowtime)
 
-  @Test(expected = classOf[TableException])
+  @Test
   def testTumbleWindowNoOffset(): Unit = {
+    expectedException.expect(classOf[TableException])
+    expectedException.expectMessage("TUMBLE window with alignment is not 
supported yet")
+
     val sqlQuery =
       "SELECT SUM(a) AS sumA, COUNT(b) AS cntB " +
         "FROM MyTable " +
@@ -40,8 +43,11 @@ class WindowAggregateValidationTest extends TableTestBase {
     streamUtil.verifySql(sqlQuery, "n/a")
   }
 
-  @Test(expected = classOf[TableException])
+  @Test
   def testHopWindowNoOffset(): Unit = {
+    expectedException.expect(classOf[TableException])
+    expectedException.expectMessage("HOP window with alignment is not 
supported yet")
+
     val sqlQuery =
       "SELECT SUM(a) AS sumA, COUNT(b) AS cntB " +
         "FROM MyTable " +
@@ -50,8 +56,11 @@ class WindowAggregateValidationTest extends TableTestBase {
     streamUtil.verifySql(sqlQuery, "n/a")
   }
 
-  @Test(expected = classOf[TableException])
+  @Test
   def testSessionWindowNoOffset(): Unit = {
+    expectedException.expect(classOf[TableException])
+    expectedException.expectMessage("SESSION window with alignment is not 
supported yet")
+
     val sqlQuery =
       "SELECT SUM(a) AS sumA, COUNT(b) AS cntB " +
         "FROM MyTable " +
@@ -60,20 +69,40 @@ class WindowAggregateValidationTest extends TableTestBase {
     streamUtil.verifySql(sqlQuery, "n/a")
   }
 
-  @Test(expected = classOf[TableException])
+  @Test
   def testVariableWindowSize() = {
+    expectedException.expect(classOf[TableException])
+    expectedException.expectMessage("Only constant window descriptors with DAY 
TO SECOND " +
+      "resolution are supported")
+
     val sql = "SELECT COUNT(*) FROM MyTable GROUP BY TUMBLE(proctime, c * 
INTERVAL '1' MINUTE)"
     streamUtil.verifySql(sql, "n/a")
   }
 
-  @Test(expected = classOf[ValidationException])
+  @Test
   def testWindowUdAggInvalidArgs(): Unit = {
+    expectedException.expect(classOf[ValidationException])
+    expectedException.expectMessage("Given parameters of function do not match 
any signature")
+
     streamUtil.tableEnv.registerFunction("weightedAvg", new 
WeightedAvgWithMerge)
 
     val sqlQuery =
       "SELECT SUM(a) AS sumA, weightedAvg(a, b) AS wAvg " +
         "FROM MyTable " +
-        "GROUP BY TUMBLE(proctime(), INTERVAL '2' HOUR, TIME '10:00:00')"
+        "GROUP BY TUMBLE(proctime, INTERVAL '2' HOUR, TIME '10:00:00')"
+
+    streamUtil.verifySql(sqlQuery, "n/a")
+  }
+
+  @Test
+  def testWindowWrongWindowParameter(): Unit = {
+    expectedException.expect(classOf[TableException])
+    expectedException.expectMessage("Only constant window descriptors with DAY 
TO SECOND " +
+      "resolution are supported")
+
+    val sqlQuery =
+      "SELECT COUNT(*) FROM MyTable " +
+        "GROUP BY TUMBLE(proctime, INTERVAL '2-10' YEAR TO MONTH)"
 
     streamUtil.verifySql(sqlQuery, "n/a")
   }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Time interval for window aggregations in SQL is wrongly translated if 
> specified with YEAR_MONTH resolution
> ----------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-11017
>                 URL: https://issues.apache.org/jira/browse/FLINK-11017
>             Project: Flink
>          Issue Type: Bug
>          Components: Table API & SQL
>    Affects Versions: 1.6.2, 1.7.0
>            Reporter: Dawid Wysakowicz
>            Assignee: Dawid Wysakowicz
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.6.3, 1.8.0, 1.7.1
>
>
> If a time interval is specified with {{YEAR TO MONTH}} resolution, e.g.:
> {code}
> SELECT * 
> FROM Mytable
> GROUP BY 
>     TUMBLE(rowtime, INTERVAL '1-2' YEAR TO MONTH)
> {code}
> it will be wrongly translated to a 14-millisecond window (the interval's 
> value of 14 months is interpreted as milliseconds). We should allow only 
> DAY TO SECOND resolution.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to