This is an automated email from the ASF dual-hosted git repository.

godfrey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 4e9fa33  [FLINK-23283][table-planner] Fix unstable case GroupWindowITCase#testWindowAggregateOnUpsertSource
4e9fa33 is described below

commit 4e9fa339dfd921d2cbc4c169bf8b894635d1c528
Author: Jing Zhang <[email protected]>
AuthorDate: Wed Jul 7 14:24:14 2021 +0800

    [FLINK-23283][table-planner] Fix unstable case GroupWindowITCase#testWindowAggregateOnUpsertSource
    
    This closes #16408
---
 .../flink/table/planner/runtime/stream/sql/GroupWindowITCase.scala | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/GroupWindowITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/GroupWindowITCase.scala
index 455c024..36b2d42 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/GroupWindowITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/GroupWindowITCase.scala
@@ -384,6 +384,7 @@ class GroupWindowITCase(mode: StateBackendMode, useTimestampLtz: Boolean)
 
   @Test
   def testWindowAggregateOnUpsertSource(): Unit = {
+    env.setParallelism(1)
     val upsertSourceDataId = registerData(upsertSourceCurrencyData)
     tEnv.executeSql(
       s"""
@@ -417,7 +418,8 @@ class GroupWindowITCase(mode: StateBackendMode, useTimestampLtz: Boolean)
     val expected = Seq(
       "US Dollar,1,102,1970-01-01T00:00,1970-01-01T00:00:05",
       "Yen,1,1,1970-01-01T00:00,1970-01-01T00:00:05",
-      "Euro,1,118,1970-01-01T00:00:15,1970-01-01T00:00:20")
+      "Euro,1,118,1970-01-01T00:00:15,1970-01-01T00:00:20",
+      "RMB,1,702,1970-01-01T00:00,1970-01-01T00:00:05")
     assertEquals(expected.sorted, sink.getAppendResults.sorted)
   }
 
@@ -472,6 +474,7 @@ class GroupWindowITCase(mode: StateBackendMode, useTimestampLtz: Boolean)
 
   @Test
   def testWindowAggregateOnUpsertSourcePushdownWatermark(): Unit = {
+    env.setParallelism(1)
     val upsertSourceDataId = registerData(upsertSourceCurrencyData)
     tEnv.executeSql(
       s"""
@@ -501,7 +504,7 @@ class GroupWindowITCase(mode: StateBackendMode, useTimestampLtz: Boolean)
     tEnv.sqlQuery(sql).toAppendStream[Row].addSink(sink)
     env.execute()
     val expected = Seq(
-      "1970-01-01T00:00,1970-01-01T00:00:05,102",
+      "1970-01-01T00:00,1970-01-01T00:00:05,702",
       "1970-01-01T00:00:15,1970-01-01T00:00:20,118")
     assertEquals(expected.sorted, sink.getAppendResults.sorted)
   }

Reply via email to