This is an automated email from the ASF dual-hosted git repository.
godfrey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 4e9fa33 [FLINK-23283][table-planner] Fix unstable case GroupWindowITCase#testWindowAggregateOnUpsertSource
4e9fa33 is described below
commit 4e9fa339dfd921d2cbc4c169bf8b894635d1c528
Author: Jing Zhang <[email protected]>
AuthorDate: Wed Jul 7 14:24:14 2021 +0800
[FLINK-23283][table-planner] Fix unstable case GroupWindowITCase#testWindowAggregateOnUpsertSource
This closes #16408
---
.../flink/table/planner/runtime/stream/sql/GroupWindowITCase.scala | 7 +++++--
1 file changed, 5 insertions(+), 2 deletions(-)
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/GroupWindowITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/GroupWindowITCase.scala
index 455c024..36b2d42 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/GroupWindowITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/GroupWindowITCase.scala
@@ -384,6 +384,7 @@ class GroupWindowITCase(mode: StateBackendMode, useTimestampLtz: Boolean)
@Test
def testWindowAggregateOnUpsertSource(): Unit = {
+ env.setParallelism(1)
val upsertSourceDataId = registerData(upsertSourceCurrencyData)
tEnv.executeSql(
s"""
@@ -417,7 +418,8 @@ class GroupWindowITCase(mode: StateBackendMode, useTimestampLtz: Boolean)
val expected = Seq(
"US Dollar,1,102,1970-01-01T00:00,1970-01-01T00:00:05",
"Yen,1,1,1970-01-01T00:00,1970-01-01T00:00:05",
- "Euro,1,118,1970-01-01T00:00:15,1970-01-01T00:00:20")
+ "Euro,1,118,1970-01-01T00:00:15,1970-01-01T00:00:20",
+ "RMB,1,702,1970-01-01T00:00,1970-01-01T00:00:05")
assertEquals(expected.sorted, sink.getAppendResults.sorted)
}
@@ -472,6 +474,7 @@ class GroupWindowITCase(mode: StateBackendMode, useTimestampLtz: Boolean)
@Test
def testWindowAggregateOnUpsertSourcePushdownWatermark(): Unit = {
+ env.setParallelism(1)
val upsertSourceDataId = registerData(upsertSourceCurrencyData)
tEnv.executeSql(
s"""
@@ -501,7 +504,7 @@ class GroupWindowITCase(mode: StateBackendMode, useTimestampLtz: Boolean)
tEnv.sqlQuery(sql).toAppendStream[Row].addSink(sink)
env.execute()
val expected = Seq(
- "1970-01-01T00:00,1970-01-01T00:00:05,102",
+ "1970-01-01T00:00,1970-01-01T00:00:05,702",
"1970-01-01T00:00:15,1970-01-01T00:00:20,118")
assertEquals(expected.sorted, sink.getAppendResults.sorted)
}