[ https://issues.apache.org/jira/browse/FLINK-11017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16704651#comment-16704651 ]
ASF GitHub Bot commented on FLINK-11017: ---------------------------------------- asfgit closed pull request #7200: [FLINK-11017][table] Throw exception if constant with YEAR TO MONTH resolution was used for group windows URL: https://github.com/apache/flink/pull/7200 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamLogicalWindowAggregateRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamLogicalWindowAggregateRule.scala index 62eecbb0532..49e2ab3db69 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamLogicalWindowAggregateRule.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamLogicalWindowAggregateRule.scala @@ -22,7 +22,7 @@ import java.math.{BigDecimal => JBigDecimal} import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.rex._ -import org.apache.calcite.sql.`type`.SqlTypeName +import org.apache.calcite.sql.`type`.{SqlTypeFamily, SqlTypeName} import org.apache.flink.table.api.scala.{Session, Slide, Tumble} import org.apache.flink.table.api.{TableException, ValidationException, Window} import org.apache.flink.table.calcite.FlinkTypeFactory @@ -68,8 +68,10 @@ class DataStreamLogicalWindowAggregateRule def getOperandAsLong(call: RexCall, idx: Int): Long = call.getOperands.get(idx) match { - case v: RexLiteral => v.getValue.asInstanceOf[JBigDecimal].longValue() - case _ => throw new TableException("Only constant window descriptors are supported.") + case v: RexLiteral if v.getTypeName.getFamily == SqlTypeFamily.INTERVAL_DAY_TIME => + v.getValue.asInstanceOf[JBigDecimal].longValue() + case _ => throw new TableException( + "Only constant window descriptors with DAY TO SECOND resolution are supported.") } def getOperandAsTimeIndicator(call: RexCall, idx: Int): ResolvedFieldReference = diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/WindowAggregateValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/WindowAggregateValidationTest.scala index e7cf597ca64..16749a25c45 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/WindowAggregateValidationTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/WindowAggregateValidationTest.scala @@ -30,8 +30,11 @@ class WindowAggregateValidationTest extends TableTestBase { streamUtil.addTable[(Int, String, Long)]( "MyTable", 'a, 'b, 'c, 'proctime.proctime, 'rowtime.rowtime) - @Test(expected = classOf[TableException]) + @Test def testTumbleWindowNoOffset(): Unit = { + expectedException.expect(classOf[TableException]) + expectedException.expectMessage("TUMBLE window with alignment is not supported yet") + val sqlQuery = "SELECT SUM(a) AS sumA, COUNT(b) AS cntB " + "FROM MyTable " + @@ -40,8 +43,11 @@ class WindowAggregateValidationTest extends TableTestBase { streamUtil.verifySql(sqlQuery, "n/a") } - @Test(expected = classOf[TableException]) + @Test def testHopWindowNoOffset(): Unit = { + expectedException.expect(classOf[TableException]) + expectedException.expectMessage("HOP window with alignment is not supported yet") + val sqlQuery = "SELECT SUM(a) AS sumA, COUNT(b) AS cntB " + "FROM MyTable " + @@ -50,8 +56,11 @@ class WindowAggregateValidationTest extends TableTestBase { streamUtil.verifySql(sqlQuery, "n/a") } - @Test(expected = classOf[TableException]) + @Test def testSessionWindowNoOffset(): Unit = { + expectedException.expect(classOf[TableException]) + expectedException.expectMessage("SESSION window with alignment is not supported yet") + val sqlQuery = "SELECT SUM(a) AS sumA, COUNT(b) AS cntB " + "FROM MyTable " + @@ -60,20 +69,40 @@ class WindowAggregateValidationTest extends TableTestBase { streamUtil.verifySql(sqlQuery, "n/a") } - @Test(expected = classOf[TableException]) + @Test def testVariableWindowSize() = { + expectedException.expect(classOf[TableException]) + expectedException.expectMessage("Only constant window descriptors with DAY TO SECOND " + + "resolution are supported") + val sql = "SELECT COUNT(*) FROM MyTable GROUP BY TUMBLE(proctime, c * INTERVAL '1' MINUTE)" streamUtil.verifySql(sql, "n/a") } - @Test(expected = classOf[ValidationException]) + @Test def testWindowUdAggInvalidArgs(): Unit = { + expectedException.expect(classOf[ValidationException]) + expectedException.expectMessage("Given parameters of function do not match any signature") + streamUtil.tableEnv.registerFunction("weightedAvg", new WeightedAvgWithMerge) val sqlQuery = "SELECT SUM(a) AS sumA, weightedAvg(a, b) AS wAvg " + "FROM MyTable " + - "GROUP BY TUMBLE(proctime(), INTERVAL '2' HOUR, TIME '10:00:00')" + "GROUP BY TUMBLE(proctime, INTERVAL '2' HOUR, TIME '10:00:00')" + + streamUtil.verifySql(sqlQuery, "n/a") + } + + @Test + def testWindowWrongWindowParameter(): Unit = { + expectedException.expect(classOf[TableException]) + expectedException.expectMessage("Only constant window descriptors with DAY TO SECOND " + + "resolution are supported") + + val sqlQuery = + "SELECT COUNT(*) FROM MyTable " + + "GROUP BY TUMBLE(proctime, INTERVAL '2-10' YEAR TO MONTH)" streamUtil.verifySql(sqlQuery, "n/a") } ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Time interval for window aggregations in SQL is wrongly translated if > specified with YEAR_MONTH resolution > ---------------------------------------------------------------------------------------------------------- > > Key: FLINK-11017 > URL: https://issues.apache.org/jira/browse/FLINK-11017 > Project: Flink > Issue Type: Bug > Components: Table API & SQL > Affects Versions: 1.6.2, 1.7.0 > Reporter: Dawid Wysakowicz > Assignee: Dawid Wysakowicz > Priority: Major > Labels: pull-request-available > Fix For: 1.6.3, 1.8.0, 1.7.1 > > > If a time interval was specified with {{YEAR TO MONTH}} resolution like e.g.: > {code} > SELECT * > FROM Mytable > GROUP BY > TUMBLE(rowtime, INTERVAL '1-2' YEAR TO MONTH) > {code} > it will be wrongly translated to 14 milliseconds window. We should allow for > only DAY TO SECOND resolution. -- This message was sent by Atlassian JIRA (v7.6.3#76005)