XuQianJin-Stars closed pull request #7228: [FLINK-9740][Table API & SQL] Support group windows over intervals of months. URL: https://github.com/apache/flink/pull/7228
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionUtils.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionUtils.scala index 6080ba49437..96139535c8e 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionUtils.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionUtils.scala @@ -44,6 +44,11 @@ object ExpressionUtils { case _ => false } + private[flink] def isMonthsIntervalLiteral(expr: Expression): Boolean = expr match { + case Literal(_, TimeIntervalTypeInfo.INTERVAL_MONTHS) => true + case _ => false + } + private[flink] def isRowCountLiteral(expr: Expression): Boolean = expr match { case Literal(_, RowIntervalTypeInfo.INTERVAL_ROWS) => true case _ => false @@ -71,6 +76,12 @@ object ExpressionUtils { case _ => throw new IllegalArgumentException() } + private[flink] def toMonths(expr: Expression): FlinkTime = expr match { + case Literal(value: Long, TimeIntervalTypeInfo.INTERVAL_MONTHS) => + FlinkTime.months(value) + case _ => throw new IllegalArgumentException() + } + private[flink] def toLong(expr: Expression): Long = expr match { case Literal(value: Long, RowIntervalTypeInfo.INTERVAL_ROWS) => value case _ => throw new IllegalArgumentException() diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala index 78a7273f6af..734bb551562 100644 --- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala @@ -264,6 +264,10 @@ object DataStreamGroupWindowAggregate { if isRowtimeAttribute(timeField) && isTimeIntervalLiteral(size) => stream.window(TumblingEventTimeWindows.of(toTime(size))) + case TumblingGroupWindow(_, timeField, size) + if isRowtimeAttribute(timeField) && isMonthsIntervalLiteral(size) => + stream.window(TumblingEventTimeWindows.of(toMonths(size))) + case TumblingGroupWindow(_, _, size) => // TODO: EventTimeTumblingGroupWindow should sort the stream on event time // before applying the windowing logic. Otherwise, this would be the same as a @@ -284,6 +288,10 @@ object DataStreamGroupWindowAggregate { if isRowtimeAttribute(timeField) && isTimeIntervalLiteral(size)=> stream.window(SlidingEventTimeWindows.of(toTime(size), toTime(slide))) + case SlidingGroupWindow(_, timeField, size, slide) + if isRowtimeAttribute(timeField) && isMonthsIntervalLiteral(size) => + stream.window(SlidingEventTimeWindows.of(toMonths(size), toMonths(slide))) + case SlidingGroupWindow(_, _, size, slide) => // TODO: EventTimeTumblingGroupWindow should sort the stream on event time // before applying the windowing logic. 
Otherwise, this would be the same as a @@ -292,9 +300,13 @@ object DataStreamGroupWindowAggregate { "Event-time grouping windows on row intervals are currently not supported.") case SessionGroupWindow(_, timeField, gap) - if isProctimeAttribute(timeField) => + if isProctimeAttribute(timeField) => stream.window(ProcessingTimeSessionWindows.withGap(toTime(gap))) + case SessionGroupWindow(_, timeField, gap) + if isRowtimeAttribute(timeField) && isMonthsIntervalLiteral(gap) => + stream.window(EventTimeSessionWindows.withGap(toMonths(gap))) + case SessionGroupWindow(_, timeField, gap) if isRowtimeAttribute(timeField) => stream.window(EventTimeSessionWindows.withGap(toTime(gap))) @@ -318,6 +330,9 @@ object DataStreamGroupWindowAggregate { case TumblingGroupWindow(_, _, size) if isTimeInterval(size.resultType) => stream.windowAll(TumblingEventTimeWindows.of(toTime(size))) + case TumblingGroupWindow(_, _, size) if isMonthsIntervalLiteral(size) => + stream.windowAll(TumblingEventTimeWindows.of(toMonths(size))) + case TumblingGroupWindow(_, _, size) => // TODO: EventTimeTumblingGroupWindow should sort the stream on event time // before applying the windowing logic. Otherwise, this would be the same as a @@ -338,6 +353,10 @@ object DataStreamGroupWindowAggregate { if isRowtimeAttribute(timeField) && isTimeIntervalLiteral(size)=> stream.windowAll(SlidingEventTimeWindows.of(toTime(size), toTime(slide))) + case SlidingGroupWindow(_, timeField, size, slide) + if isRowtimeAttribute(timeField) && isMonthsIntervalLiteral(size)=> + stream.windowAll(SlidingEventTimeWindows.of(toMonths(size), toMonths(slide))) + case SlidingGroupWindow(_, _, size, slide) => // TODO: EventTimeTumblingGroupWindow should sort the stream on event time // before applying the windowing logic. 
Otherwise, this would be the same as a @@ -349,6 +368,10 @@ object DataStreamGroupWindowAggregate { if isProctimeAttribute(timeField) && isTimeIntervalLiteral(gap) => stream.windowAll(ProcessingTimeSessionWindows.withGap(toTime(gap))) + case SessionGroupWindow(_, timeField, gap) + if isProctimeAttribute(timeField) && isMonthsIntervalLiteral(gap) => + stream.windowAll(ProcessingTimeSessionWindows.withGap(toMonths(gap))) + case SessionGroupWindow(_, timeField, gap) if isRowtimeAttribute(timeField) && isTimeIntervalLiteral(gap) => stream.windowAll(EventTimeSessionWindows.withGap(toTime(gap))) diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamLogicalWindowAggregateRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamLogicalWindowAggregateRule.scala index c8c58adf226..db8a8ebb2ea 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamLogicalWindowAggregateRule.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamLogicalWindowAggregateRule.scala @@ -70,8 +70,10 @@ class DataStreamLogicalWindowAggregateRule call.getOperands.get(idx) match { case v: RexLiteral if v.getTypeName.getFamily == SqlTypeFamily.INTERVAL_DAY_TIME => v.getValue.asInstanceOf[JBigDecimal].longValue() + case v: RexLiteral if v.getTypeName.getFamily == SqlTypeFamily.INTERVAL_YEAR_MONTH => + v.getValue.asInstanceOf[JBigDecimal].longValue() case _ => throw new TableException( - "Only constant window intervals with millisecond resolution are supported.") + "Only constant window intervals with millisecond and months resolution are supported.") } def getOperandAsTimeIndicator(call: RexCall, idx: Int): ResolvedFieldReference = @@ -84,27 +86,37 @@ class DataStreamLogicalWindowAggregateRule throw new ValidationException("Window can only be defined over a time attribute column.") } + 
def getOperandAsLiteral(call: RexCall, idx: Int): Literal = + call.getOperands.get(idx) match { + case v: RexLiteral if v.getTypeName.getFamily == SqlTypeFamily.INTERVAL_DAY_TIME => + Literal(getOperandAsLong(call, idx), TimeIntervalTypeInfo.INTERVAL_MILLIS) + case v: RexLiteral if v.getTypeName.getFamily == SqlTypeFamily.INTERVAL_YEAR_MONTH => + Literal(getOperandAsLong(call, idx), TimeIntervalTypeInfo.INTERVAL_MONTHS) + case _ => throw new TableException( + "Only constant window intervals with millisecond and months resolution are supported.") + } + windowExpr.getOperator match { case BasicOperatorTable.TUMBLE => val time = getOperandAsTimeIndicator(windowExpr, 0) - val interval = getOperandAsLong(windowExpr, 1) - val w = Tumble.over(Literal(interval, TimeIntervalTypeInfo.INTERVAL_MILLIS)) - + val tumbleLiteral = getOperandAsLiteral(windowExpr, 1) + val w = Tumble.over(tumbleLiteral) w.on(time).as(WindowReference("w$", Some(time.resultType))) case BasicOperatorTable.HOP => val time = getOperandAsTimeIndicator(windowExpr, 0) - val (slide, size) = (getOperandAsLong(windowExpr, 1), getOperandAsLong(windowExpr, 2)) + val (slideLiteral, sizeLiteral) = (getOperandAsLiteral(windowExpr, 1), + getOperandAsLiteral(windowExpr, 2)) val w = Slide - .over(Literal(size, TimeIntervalTypeInfo.INTERVAL_MILLIS)) - .every(Literal(slide, TimeIntervalTypeInfo.INTERVAL_MILLIS)) + .over(sizeLiteral) + .every(slideLiteral) w.on(time).as(WindowReference("w$", Some(time.resultType))) case BasicOperatorTable.SESSION => val time = getOperandAsTimeIndicator(windowExpr, 0) - val gap = getOperandAsLong(windowExpr, 1) - val w = Session.withGap(Literal(gap, TimeIntervalTypeInfo.INTERVAL_MILLIS)) + val gapLiteral = getOperandAsLiteral(windowExpr, 1) + val w = Session.withGap(gapLiteral) w.on(time).as(WindowReference("w$", Some(time.resultType))) } diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/GroupWindowTest.scala 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/GroupWindowTest.scala index cfbab5c074c..8fe9f17ae83 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/GroupWindowTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/GroupWindowTest.scala @@ -19,11 +19,18 @@ package org.apache.flink.table.api.stream.sql import org.apache.flink.api.scala._ +import org.apache.flink.streaming.api.TimeCharacteristic +import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks +import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment} +import org.apache.flink.streaming.api.watermark.Watermark +import org.apache.flink.table.api.TableEnvironment import org.apache.flink.table.api.scala._ import org.apache.flink.table.plan.logical._ import org.apache.flink.table.runtime.utils.JavaUserDefinedAggFunctions.WeightedAvgWithMerge +import org.apache.flink.table.runtime.utils.StreamITCase import org.apache.flink.table.utils.TableTestUtil._ import org.apache.flink.table.utils.{StreamTableTestUtil, TableTestBase} +import org.apache.flink.types.Row import org.junit.Test class GroupWindowTest extends TableTestBase { @@ -306,4 +313,51 @@ class GroupWindowTest extends TableTestBase { ) streamUtil.verifySql(sql, expected) } + + def RunWindowForMonthsInterval(groupWindow: String, interval: String): Unit = { + val execEnv = StreamExecutionEnvironment.getExecutionEnvironment + execEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) + execEnv.setParallelism(1) + val tableEnv = TableEnvironment.getTableEnvironment(execEnv) + StreamITCase.clear + + val table = tableEnv.sqlQuery("SELECT * FROM (VALUES " + + "('Bob', 1543052856000), " + + "('Bob', 1543052857000), " + + "('Lucy', 1543054856000)" + + ") AS NameTable(name,create_time)") + + var inputStream: DataStream[Row] = tableEnv.toAppendStream(table) + 
inputStream.javaStream.getTransformation.setOutputType(table.getSchema.toRowType) + inputStream = inputStream.assignTimestampsAndWatermarks( + new AssignerWithPeriodicWatermarks[Row]() { + override def getCurrentWatermark = new Watermark(System.nanoTime) + + def extractTimestamp(element: Row, previousElementTimestamp: Long): Long = + element.getField(1).asInstanceOf[Long] + }) + + val tb1 = tableEnv.fromDataStream(inputStream, 'name, 'create_time, 'rowtime.rowtime) + + val groupWhereSql = groupWindow match { + case "TUMBLE" => s" GROUP BY TUMBLE(rowtime, $interval),name" + case "HOP" => s" GROUP BY HOP(rowtime, $interval),name" + case "SESSION" => s" GROUP BY SESSION(rowtime, $interval),name" + } + + val result = tableEnv.sqlQuery(s"select name,count(1) from $tb1 " + groupWhereSql) + result.addSink(new StreamITCase.StringSink[Row]) + + execEnv.execute() + } + + @Test + def TestWindowForMonthsInterval(): Unit = { + RunWindowForMonthsInterval("TUMBLE", "interval '10' month") + RunWindowForMonthsInterval("TUMBLE", "INTERVAL '2-10' YEAR TO MONTH") + RunWindowForMonthsInterval("HOP", "INTERVAL '1' MONTH, INTERVAL '10' MONTH") + RunWindowForMonthsInterval("HOP", "INTERVAL '1' MONTH, INTERVAL '2-10' YEAR TO MONTH") + RunWindowForMonthsInterval("SESSION", "interval '10' month") + RunWindowForMonthsInterval("SESSION", "INTERVAL '2-10' YEAR TO MONTH") + } } diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/WindowAggregateValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/WindowAggregateValidationTest.scala index 5c237ffc5ee..f2cd087692d 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/WindowAggregateValidationTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/WindowAggregateValidationTest.scala @@ -73,7 +73,7 @@ class 
WindowAggregateValidationTest extends TableTestBase { def testVariableWindowSize(): Unit = { expectedException.expect(classOf[TableException]) expectedException.expectMessage( - "Only constant window intervals with millisecond resolution are supported") + "Only constant window intervals with millisecond and months resolution are supported.") val sql = "SELECT COUNT(*) FROM MyTable GROUP BY TUMBLE(proctime, c * INTERVAL '1' MINUTE)" streamUtil.verifySql(sql, "n/a") @@ -93,17 +93,4 @@ class WindowAggregateValidationTest extends TableTestBase { streamUtil.verifySql(sqlQuery, "n/a") } - - @Test - def testWindowWrongWindowParameter(): Unit = { - expectedException.expect(classOf[TableException]) - expectedException.expectMessage( - "Only constant window intervals with millisecond resolution are supported") - - val sqlQuery = - "SELECT COUNT(*) FROM MyTable " + - "GROUP BY TUMBLE(proctime, INTERVAL '2-10' YEAR TO MONTH)" - - streamUtil.verifySql(sqlQuery, "n/a") - } } diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala index 46dde8e0225..a0aa8b4f247 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala @@ -21,12 +21,10 @@ package org.apache.flink.table.runtime.stream.sql import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} import org.apache.flink.api.java.typeutils.RowTypeInfo import org.apache.flink.api.scala._ -import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment -import org.apache.flink.streaming.api.watermark.Watermark import org.apache.flink.table.api.scala._ 
-import org.apache.flink.table.api.{TableEnvironment, Types} +import org.apache.flink.table.api.Types import org.apache.flink.table.descriptors.{Rowtime, Schema} import org.apache.flink.table.expressions.utils.Func15 import org.apache.flink.table.runtime.stream.sql.SqlITCase.TimestampAndWatermarkWithOffset @@ -34,6 +32,9 @@ import org.apache.flink.table.runtime.utils.TimeTestUtil.EventTimeSourceFunction import org.apache.flink.table.runtime.utils.{JavaUserDefinedTableFunctions, StreamITCase, StreamTestData, StreamingWithStateTestBase} import org.apache.flink.table.utils.{InMemoryTableFactory, MemoryTableSourceSinkUtil} import org.apache.flink.types.Row +import org.apache.flink.streaming.api.watermark.Watermark +import org.apache.flink.streaming.api.TimeCharacteristic +import org.apache.flink.table.api.TableEnvironment import org.junit.Assert._ import org.junit._ diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/time/Time.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/time/Time.java index d09928694eb..d8a059133fa 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/time/Time.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/time/Time.java @@ -124,4 +124,11 @@ public static Time hours(long hours) { public static Time days(long days) { return of(days, TimeUnit.DAYS); } + + /** + * Creates a new {@link Time} that represents the given number of months. + */ + public static Time months(long months) { + return of(months, TimeUnit.DAYS); + } } ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services