Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/3397#discussion_r103512951
--- Diff:
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
---
@@ -171,4 +183,82 @@ class SqlITCase extends
StreamingMultipleProgramsTestBase {
val expected = mutable.MutableList("Hello", "Hello world")
assertEquals(expected.sorted, StreamITCase.testResults.sorted)
}
+
+ @Test
+ def testUnboundPartitionedProcessingWindowWithRange(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ StreamITCase.testResults = mutable.MutableList()
+
+ // for sum aggregation ensure that every time the order of each
element is consistent
+ env.setParallelism(1)
+
+ val t1 = env.fromCollection(data).toTable(tEnv).as('a, 'b, 'c)
+
+ tEnv.registerTable("T1", t1)
+
+ val sqlQuery = "SELECT ProcTime()," +
+ "c, " +
+ "count(a) OVER (PARTITION BY c ORDER BY ProcTime() RANGE UNBOUNDED
preceding) as cnt1, " +
+ "sum(a) OVER (PARTITION BY c ORDER BY ProcTime() RANGE UNBOUNDED
preceding) as cnt2 " +
+ "from T1"
+
+ val result = tEnv.sql(sqlQuery).toDataStream[Row]
+ result.addSink(new StreamITCase.StringSink)
+ env.execute()
+
+ val expected = mutable.MutableList(
+ "Hello World,1,7", "Hello World,2,15", "Hello World,3,35",
+ "Hello,1,1", "Hello,2,3", "Hello,3,6", "Hello,4,10", "Hello,5,15",
"Hello,6,21")
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }
+
+ @Test
+ def testUnboundPartitionedProcessingWindowWithRow(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ StreamITCase.testResults = mutable.MutableList()
+
+ val t1 = env.fromCollection(data).toTable(tEnv).as('a, 'b, 'c)
+
+ tEnv.registerTable("T1", t1)
+
+ val sqlQuery = "SELECT " +
--- End diff --
This query should not work because the `ORDER BY` clause is missing. `ORDER BY`
is required for both RANGE and ROWS windows, because otherwise the result is
not completely defined.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---