Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/4736#discussion_r142254410
--- Diff:
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/OverWindowITCase.scala
---
@@ -85,6 +85,46 @@ class OverWindowITCase extends
StreamingWithStateTestBase {
}
@Test
+ def testOverWindowWithConstant(): Unit = {
+
+ val data = List(
+ (1L, 1, "Hello"),
+ (2L, 2, "Hello"),
+ (3L, 3, "Hello"),
+ (4L, 4, "Hello"),
+ (5L, 5, "Hello"),
+ (6L, 6, "Hello"),
+ (7L, 7, "Hello World"),
+ (8L, 8, "Hello World"),
+ (8L, 8, "Hello World"),
+ (20L, 20, "Hello World"))
+
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setParallelism(1)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ StreamITCase.testResults = mutable.MutableList()
+ StreamITCase.clear
+ val stream = env.fromCollection(data)
+ val table = stream.toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
+ val weightAvgFun = new WeightedAvg
+
+ val windowedTable = table
+ .window(
+ Over partitionBy 'c orderBy 'proctime preceding UNBOUNDED_ROW as
'w)
+ .select('c, weightAvgFun('a, 42, 'b, "2") over 'w as 'wAvg)
+ .select('c, 'wAvg)
--- End diff --
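The second `.select('c, 'wAvg)` can be removed: the preceding `select` already produces exactly the fields `'c` and `'wAvg`, so this projection is redundant.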
---