This is an automated email from the ASF dual-hosted git repository.
jchan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new bb01c3070ea [FLINK-34115][table-planner] Fix TableAggregateITCase unstable test
bb01c3070ea is described below
commit bb01c3070ea476e0410a867e930bda7518e1b4aa
Author: Jane Chan <[email protected]>
AuthorDate: Tue Jan 30 10:22:45 2024 +0800
[FLINK-34115][table-planner] Fix TableAggregateITCase unstable test
This closes #24214
---
.../stream/table/TableAggregateITCase.scala | 29 +++++++++++-----------
.../table/planner/runtime/utils/TestData.scala | 8 ++++++
2 files changed, 22 insertions(+), 15 deletions(-)
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableAggregateITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableAggregateITCase.scala
index 2e2b70ba983..ea2c9b7ce6c 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableAggregateITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableAggregateITCase.scala
@@ -21,7 +21,8 @@ import org.apache.flink.api.common.time.Time
import org.apache.flink.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
-import org.apache.flink.table.planner.runtime.utils.{JavaUserDefinedTableAggFunctions, StreamingWithStateTestBase, TestingRetractSink}
+import org.apache.flink.table.planner.factories.TestValuesTableFactory
+import org.apache.flink.table.planner.runtime.utils.{JavaUserDefinedTableAggFunctions, StreamingWithStateTestBase, TestData, TestingRetractSink}
import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedAggFunctions.OverloadedDoubleMaxFunction
import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase.StateBackendMode
import org.apache.flink.table.planner.runtime.utils.TestData.tupleData3
@@ -39,24 +40,21 @@ import java.time.Duration
@ExtendWith(Array(classOf[ParameterizedTestExtension]))
class TableAggregateITCase(mode: StateBackendMode) extends StreamingWithStateTestBase(mode) {
- var myTable: Table = _
-
@BeforeEach
override def before(): Unit = {
super.before()
tEnv.getConfig.setIdleStateRetention(Duration.ofHours(1))
// Create a Table from the array of Rows
- myTable = tEnv.fromValues(
- DataTypes.ROW(
- DataTypes.FIELD("id", DataTypes.INT),
- DataTypes.FIELD("name", DataTypes.STRING),
- DataTypes.FIELD("price", DataTypes.INT)),
- row(1, "Latte", 6: java.lang.Integer),
- row(2, "Milk", 3: java.lang.Integer),
- row(3, "Breve", 5: java.lang.Integer),
- row(4, "Mocha", 8: java.lang.Integer),
- row(5, "Tea", 4: java.lang.Integer)
- )
+ tEnv.executeSql(s"""
+ |CREATE TABLE myTable (
+ | `id` INT,
+ | `name` STRING,
+ | `price` INT
+ |) WITH (
+ | 'connector' = 'values',
+ | 'data-id' = '${TestValuesTableFactory.registerData(TestData.tupleData4)}'
+ |)
+ |""".stripMargin)
}
@TestTemplate
@@ -118,7 +116,8 @@ class TableAggregateITCase(mode: StateBackendMode) extends StreamingWithStateTes
def checkRank(func: String, expectedResult: List[String]): Unit = {
val resultTable =
- myTable
+ tEnv
+ .from("myTable")
.flatAggregate(call(func, $("price")).as("top_price", "rank"))
.select($("top_price"), $("rank"))
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/TestData.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/TestData.scala
index ec966842482..75ec5759a8d 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/TestData.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/TestData.scala
@@ -281,6 +281,14 @@ object TestData {
data
}
+ lazy val tupleData4: Seq[Row] = Seq(
+ row(1, "Latte", 6),
+ row(2, "Milk", 3),
+ row(3, "Breve", 5),
+ row(4, "Mocha", 8),
+ row(5, "Tea", 4)
+ )
+
lazy val data3: Seq[Row] = tupleData3.map(d => row(d.productIterator.toList: _*))
val nullablesOfData3 = Array(true, true, true)