This is an automated email from the ASF dual-hosted git repository.

jchan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new bb01c3070ea [FLINK-34115][table-planner] Fix TableAggregateITCase unstable test
bb01c3070ea is described below

commit bb01c3070ea476e0410a867e930bda7518e1b4aa
Author: Jane Chan <[email protected]>
AuthorDate: Tue Jan 30 10:22:45 2024 +0800

    [FLINK-34115][table-planner] Fix TableAggregateITCase unstable test
    
    This closes #24214
---
 .../stream/table/TableAggregateITCase.scala        | 29 +++++++++++-----------
 .../table/planner/runtime/utils/TestData.scala     |  8 ++++++
 2 files changed, 22 insertions(+), 15 deletions(-)

diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableAggregateITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableAggregateITCase.scala
index 2e2b70ba983..ea2c9b7ce6c 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableAggregateITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableAggregateITCase.scala
@@ -21,7 +21,8 @@ import org.apache.flink.api.common.time.Time
 import org.apache.flink.api.scala._
 import org.apache.flink.table.api._
 import org.apache.flink.table.api.bridge.scala._
-import org.apache.flink.table.planner.runtime.utils.{JavaUserDefinedTableAggFunctions, StreamingWithStateTestBase, TestingRetractSink}
+import org.apache.flink.table.planner.factories.TestValuesTableFactory
+import org.apache.flink.table.planner.runtime.utils.{JavaUserDefinedTableAggFunctions, StreamingWithStateTestBase, TestData, TestingRetractSink}
 import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedAggFunctions.OverloadedDoubleMaxFunction
 import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase.StateBackendMode
 import org.apache.flink.table.planner.runtime.utils.TestData.tupleData3
@@ -39,24 +40,21 @@ import java.time.Duration
 @ExtendWith(Array(classOf[ParameterizedTestExtension]))
 class TableAggregateITCase(mode: StateBackendMode) extends StreamingWithStateTestBase(mode) {
 
-  var myTable: Table = _
-
   @BeforeEach
   override def before(): Unit = {
     super.before()
     tEnv.getConfig.setIdleStateRetention(Duration.ofHours(1))
     // Create a Table from the array of Rows
-    myTable = tEnv.fromValues(
-      DataTypes.ROW(
-        DataTypes.FIELD("id", DataTypes.INT),
-        DataTypes.FIELD("name", DataTypes.STRING),
-        DataTypes.FIELD("price", DataTypes.INT)),
-      row(1, "Latte", 6: java.lang.Integer),
-      row(2, "Milk", 3: java.lang.Integer),
-      row(3, "Breve", 5: java.lang.Integer),
-      row(4, "Mocha", 8: java.lang.Integer),
-      row(5, "Tea", 4: java.lang.Integer)
-    )
+    tEnv.executeSql(s"""
+                       |CREATE TABLE myTable (
+                       |  `id` INT,
+                       |  `name` STRING,
+                       |  `price` INT
+                       |) WITH (
+                       |  'connector' = 'values',
+                       |  'data-id' = '${TestValuesTableFactory.registerData(TestData.tupleData4)}'
+                       |)
+                       |""".stripMargin)
   }
 
   @TestTemplate
@@ -118,7 +116,8 @@ class TableAggregateITCase(mode: StateBackendMode) extends StreamingWithStateTes
 
   def checkRank(func: String, expectedResult: List[String]): Unit = {
     val resultTable =
-      myTable
+      tEnv
+        .from("myTable")
         .flatAggregate(call(func, $("price")).as("top_price", "rank"))
         .select($("top_price"), $("rank"))
 
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/TestData.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/TestData.scala
index ec966842482..75ec5759a8d 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/TestData.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/TestData.scala
@@ -281,6 +281,14 @@ object TestData {
     data
   }
 
+  lazy val tupleData4: Seq[Row] = Seq(
+    row(1, "Latte", 6),
+    row(2, "Milk", 3),
+    row(3, "Breve", 5),
+    row(4, "Mocha", 8),
+    row(5, "Tea", 4)
+  )
+
   lazy val data3: Seq[Row] = tupleData3.map(d => row(d.productIterator.toList: _*))
 
   val nullablesOfData3 = Array(true, true, true)

Reply via email to