Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/3397#discussion_r102937926
--- Diff:
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
---
@@ -171,4 +183,79 @@ class SqlITCase extends
StreamingMultipleProgramsTestBase {
val expected = mutable.MutableList("Hello", "Hello world")
assertEquals(expected.sorted, StreamITCase.testResults.sorted)
}
+
+ @Test
+ def testUnboundPartitionedProcessingWindowWithRange(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ StreamITCase.testResults = mutable.MutableList()
+
+ val t1 = env.fromCollection(data).toTable(tEnv).as('a, 'b, 'c)
+
+ tEnv.registerTable("T1", t1)
+
+ //TODO ORDER BY will be fixed, once FLINK-5710 is solved
+ tEnv.registerFunction("ProcTime", ProcTime)
+ val sqlQuery = "SELECT c, count(a) OVER (PARTITION BY c ORDER BY
ProcTime() RANGE " +
+ "UNBOUNDED " +
+ "preceding) as firstCount, count(a) OVER (PARTITION BY c ORDER BY
ProcTime() RANGE UNBOUNDED" +
+ " preceding) as secondCount from T1"
+
+ val result = tEnv.sql(sqlQuery).toDataStream[Row]
+ result.addSink(new StreamITCase.StringSink)
+ env.execute()
+
+ val expected = mutable.MutableList(
+ "Hello World,1,1", "Hello World,2,2", "Hello World,3,3",
+ "Hello,1,1", "Hello,2,2", "Hello,3,3", "Hello,4,4", "Hello,5,5",
"Hello,6,6")
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }
+
+ @Test
+ def testUnboundPartitionedProcessingWindowWithRow(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ StreamITCase.testResults = mutable.MutableList()
+
+ val t1 = env.fromCollection(data).toTable(tEnv).as('a, 'b, 'c)
+
+ tEnv.registerTable("T1", t1)
+
+ val sqlQuery = "SELECT c, count(a) OVER (PARTITION BY c ROWS BETWEEN
UNBOUNDED preceding " +
+ "AND CURRENT ROW) as firstCount from T1"
+
+ val result = tEnv.sql(sqlQuery).toDataStream[Row]
+ result.addSink(new StreamITCase.StringSink)
+ env.execute()
+
+ val expected = mutable.MutableList(
+ "Hello World,1", "Hello World,2", "Hello World,3",
+ "Hello,1", "Hello,2", "Hello,3", "Hello,4", "Hello,5", "Hello,6")
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }
+
+ @Test(expected = classOf[UnsupportedOperationException])
--- End diff --
Please add a comment explaining why this test is expected to fail.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes to have it, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---