This is an automated email from the ASF dual-hosted git repository.
jchan pushed a commit to branch release-1.18
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.18 by this push:
new d903990e1ed [FLINK-34115][table-planner] Fix TableAggregateITCase unstable test
d903990e1ed is described below
commit d903990e1ede423e92b6d6ec93876500519aab14
Author: Jane Chan <[email protected]>
AuthorDate: Tue Jan 30 15:56:02 2024 +0800
[FLINK-34115][table-planner] Fix TableAggregateITCase unstable test
This closes #24222
---
.../stream/table/TableAggregateITCase.scala | 28 +++++++++++-----------
.../table/planner/runtime/utils/TestData.scala | 8 +++++++
2 files changed, 22 insertions(+), 14 deletions(-)
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableAggregateITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableAggregateITCase.scala
index d90bdfa9232..cfc828545ab 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableAggregateITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableAggregateITCase.scala
@@ -21,6 +21,8 @@ import org.apache.flink.api.common.time.Time
import org.apache.flink.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
+import org.apache.flink.table.planner.factories.TestValuesTableFactory
+import org.apache.flink.table.planner.runtime.utils.{JavaUserDefinedTableAggFunctions, StreamingWithStateTestBase, TestData, TestingRetractSink}
import org.apache.flink.table.planner.runtime.utils.{JavaUserDefinedTableAggFunctions, StreamingWithStateTestBase, TestingRetractSink}
import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedAggFunctions.OverloadedDoubleMaxFunction
import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase.StateBackendMode
@@ -37,23 +39,20 @@ import org.junit.runners.Parameterized
@RunWith(classOf[Parameterized])
class TableAggregateITCase(mode: StateBackendMode) extends StreamingWithStateTestBase(mode) {
- var myTable: Table = _
-
@Before
override def before(): Unit = {
super.before()
tEnv.getConfig.setIdleStateRetentionTime(Time.hours(1), Time.hours(2))
- myTable = tEnv.fromValues(
- DataTypes.ROW(
- DataTypes.FIELD("id", DataTypes.INT),
- DataTypes.FIELD("name", DataTypes.STRING),
- DataTypes.FIELD("price", DataTypes.INT)),
- row(1, "Latte", 6: java.lang.Integer),
- row(2, "Milk", 3: java.lang.Integer),
- row(3, "Breve", 5: java.lang.Integer),
- row(4, "Mocha", 8: java.lang.Integer),
- row(5, "Tea", 4: java.lang.Integer)
- )
+ tEnv.executeSql(s"""
+ |CREATE TABLE myTable (
+ | `id` INT,
+ | `name` STRING,
+ | `price` INT
+ |) WITH (
+ | 'connector' = 'values',
+ | 'data-id' = '${TestValuesTableFactory.registerData(TestData.tupleData4)}'
+ |)
+ |""".stripMargin)
}
@Test
@@ -115,7 +114,8 @@ class TableAggregateITCase(mode: StateBackendMode) extends StreamingWithStateTes
def checkRank(func: String, expectedResult: List[String]): Unit = {
val resultTable =
- myTable
+ tEnv
+ .from("myTable")
.flatAggregate(call(func, $("price")).as("top_price", "rank"))
.select($("top_price"), $("rank"))
diff --git
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/TestData.scala
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/TestData.scala
index 2ac34b05e05..0c23c276d16 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/TestData.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/TestData.scala
@@ -283,6 +283,14 @@ object TestData {
lazy val data3: Seq[Row] = tupleData3.map(d => row(d.productIterator.toList: _*))
+ lazy val tupleData4: Seq[Row] = Seq(
+ row(1, "Latte", 6),
+ row(2, "Milk", 3),
+ row(3, "Breve", 5),
+ row(4, "Mocha", 8),
+ row(5, "Tea", 4)
+ )
+
val nullablesOfData3 = Array(true, true, true)
val nullablesOfData4 = Array(true, true, true)