This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch release-1.9 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.9 by this push: new a106738 [FLINK-13564][table-planner-blink] throw exception if constant with YEAR TO MONTH resolution was used for group windows a106738 is described below commit a106738721edd2f7853605ac68f6bb16e1d817b0 Author: godfreyhe <godfre...@163.com> AuthorDate: Sat Aug 3 17:50:55 2019 +0800 [FLINK-13564][table-planner-blink] throw exception if constant with YEAR TO MONTH resolution was used for group windows This is the same fix as FLINK-11017, applied to the blink planner. This closes #9349 --- .../logical/BatchLogicalWindowAggregateRule.scala | 9 ++++++ .../logical/LogicalWindowAggregateRuleBase.scala | 12 ++++---- .../logical/StreamLogicalWindowAggregateRule.scala | 14 ++++++++- .../plan/stream/sql/agg/WindowAggregateTest.xml | 21 +++++++++++++ .../plan/stream/sql/agg/WindowAggregateTest.scala | 35 +++++++++++++++++++++- 5 files changed, 82 insertions(+), 9 deletions(-) diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/BatchLogicalWindowAggregateRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/BatchLogicalWindowAggregateRule.scala index 86b9098..e711d8d 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/BatchLogicalWindowAggregateRule.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/BatchLogicalWindowAggregateRule.scala @@ -18,6 +18,7 @@ package org.apache.flink.table.planner.plan.rules.logical +import org.apache.flink.table.api.TableException import org.apache.flink.table.expressions.FieldReferenceExpression import org.apache.flink.table.planner.calcite.FlinkTypeFactory import org.apache.flink.table.planner.calcite.FlinkTypeFactory.toLogicalType @@ -28,6 +29,8 @@ import org.apache.calcite.rel.`type`.RelDataType import 
org.apache.calcite.rel.logical.{LogicalAggregate, LogicalProject} import org.apache.calcite.rex._ +import _root_.java.math.{BigDecimal => JBigDecimal} + /** * Planner rule that transforms simple [[LogicalAggregate]] on a [[LogicalProject]] * with windowing expression to [[LogicalWindowAggregate]] for batch. @@ -73,6 +76,12 @@ class BatchLogicalWindowAggregateRule ref.getIndex) } } + + def getOperandAsLong(call: RexCall, idx: Int): Long = + call.getOperands.get(idx) match { + case v: RexLiteral => v.getValue.asInstanceOf[JBigDecimal].longValue() + case _ => throw new TableException("Only constant window descriptors are supported") + } } object BatchLogicalWindowAggregateRule { diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/LogicalWindowAggregateRuleBase.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/LogicalWindowAggregateRuleBase.scala index ee24adb..9f88b8f 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/LogicalWindowAggregateRuleBase.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/LogicalWindowAggregateRuleBase.scala @@ -39,8 +39,6 @@ import org.apache.calcite.rex._ import org.apache.calcite.sql.`type`.SqlTypeUtil import org.apache.calcite.util.ImmutableBitSet -import _root_.java.math.BigDecimal - import _root_.scala.collection.JavaConversions._ /** @@ -247,11 +245,6 @@ abstract class LogicalWindowAggregateRuleBase(description: String) windowExpr: RexCall, windowExprIdx: Int, rowType: RelDataType): LogicalWindow = { - def getOperandAsLong(call: RexCall, idx: Int): Long = - call.getOperands.get(idx) match { - case v: RexLiteral => v.getValue.asInstanceOf[BigDecimal].longValue() - case _ => throw new TableException("Only constant window descriptors are supported") - } val timeField = 
getTimeFieldReference(windowExpr.getOperands.get(0), windowExprIdx, rowType) val resultType = Some(fromDataTypeToLogicalType(timeField.getOutputDataType)) @@ -288,4 +281,9 @@ abstract class LogicalWindowAggregateRuleBase(description: String) operand: RexNode, windowExprIdx: Int, rowType: RelDataType): FieldReferenceExpression + + /** + * get operand value as Long type + */ + def getOperandAsLong(call: RexCall, idx: Int): Long } diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/StreamLogicalWindowAggregateRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/StreamLogicalWindowAggregateRule.scala index aced557..af1a481 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/StreamLogicalWindowAggregateRule.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/StreamLogicalWindowAggregateRule.scala @@ -28,7 +28,9 @@ import org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter.fromLog import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.rel.logical.{LogicalAggregate, LogicalProject} import org.apache.calcite.rex._ -import org.apache.calcite.sql.`type`.SqlTypeName +import org.apache.calcite.sql.`type`.{SqlTypeFamily, SqlTypeName} + +import _root_.java.math.{BigDecimal => JBigDecimal} /** * Planner rule that transforms simple [[LogicalAggregate]] on a [[LogicalProject]] @@ -83,6 +85,16 @@ class StreamLogicalWindowAggregateRule throw new ValidationException("Window can only be defined over a time attribute column.") } } + + def getOperandAsLong(call: RexCall, idx: Int): Long = + call.getOperands.get(idx) match { + case v: RexLiteral if v.getTypeName.getFamily == SqlTypeFamily.INTERVAL_DAY_TIME => + v.getValue.asInstanceOf[JBigDecimal].longValue() + case _: RexLiteral => throw new 
TableException( + "Window aggregate only support SECOND, MINUTE, HOUR, DAY as the time unit. " + + "MONTH and YEAR time unit are not supported yet.") + case _ => throw new TableException("Only constant window descriptors are supported.") + } } object StreamLogicalWindowAggregateRule { diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml index 45f4725..e59e5a3 100644 --- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml +++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml @@ -134,6 +134,27 @@ Calc(select=[EXPR$0, wAvg, w$start AS EXPR$2, w$end AS EXPR$3]) ]]> </Resource> </TestCase> + <TestCase name="testIntervalDay"> + <Resource name="sql"> + <![CDATA[SELECT COUNT(*) FROM MyTable GROUP BY TUMBLE(proctime, INTERVAL '35' DAY)]]> + </Resource> + <Resource name="planBefore"> + <![CDATA[ +LogicalProject(EXPR$0=[$1]) ++- LogicalAggregate(group=[{0}], EXPR$0=[COUNT()]) + +- LogicalProject($f0=[TUMBLE($3, 3024000000:INTERVAL DAY)]) + +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]]) +]]> + </Resource> + <Resource name="planAfter"> + <![CDATA[ +GroupWindowAggregate(window=[TumblingGroupWindow('w$, proctime, 3024000000)], select=[COUNT(*) AS EXPR$0]) ++- Exchange(distribution=[single]) + +- Calc(select=[proctime]) + +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime]) +]]> + </Resource> + </TestCase> <TestCase name="testTumbleFunNotInGroupBy"> <Resource name="sql"> <![CDATA[ diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.scala index 3c773ca..4b6e535 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.scala @@ -86,6 +86,39 @@ class WindowAggregateTest extends TableTestBase { } @Test + def testWindowWrongWindowParameter1(): Unit = { + expectedException.expect(classOf[TableException]) + expectedException.expectMessage( + "Window aggregate only support SECOND, MINUTE, HOUR, DAY as the time unit. " + + "MONTH and YEAR time unit are not supported yet.") + + val sqlQuery = + "SELECT COUNT(*) FROM MyTable GROUP BY TUMBLE(proctime, INTERVAL '1' MONTH)" + + util.verifyPlan(sqlQuery) + } + + @Test + def testWindowWrongWindowParameter2(): Unit = { + expectedException.expect(classOf[TableException]) + expectedException.expectMessage( + "Window aggregate only support SECOND, MINUTE, HOUR, DAY as the time unit. " + + "MONTH and YEAR time unit are not supported yet.") + + val sqlQuery = + "SELECT COUNT(*) FROM MyTable GROUP BY TUMBLE(proctime, INTERVAL '2-10' YEAR TO MONTH)" + + util.verifyPlan(sqlQuery) + } + + @Test + def testIntervalDay(): Unit = { + val sqlQuery = + "SELECT COUNT(*) FROM MyTable GROUP BY TUMBLE(proctime, INTERVAL '35' DAY)" + util.verifyPlan(sqlQuery) + } + + @Test def testTumbleFunction(): Unit = { val sql = """ @@ -296,7 +329,7 @@ class WindowAggregateTest extends TableTestBase { } @Test - def testReturnTypeInferenceForWindowAgg() = { + def testReturnTypeInferenceForWindowAgg(): Unit = { val sql = """