reswqa commented on code in PR #22573:
URL: https://github.com/apache/flink/pull/22573#discussion_r1195112680


##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/MultipleInputITCase.scala:
##########
@@ -74,13 +73,14 @@ class MultipleInputITCase(shuffleMode: BatchShuffleMode, schedulerType: Schedule
       MultipleInputITCase.rowType,
       "a, b, c, nt",
       MultipleInputITCase.nullables)
-
-    tEnv.getConfig.set(ExecutionOptions.BATCH_SHUFFLE_MODE, shuffleMode)
-    tEnv.getConfig.set(JobManagerOptions.SCHEDULER, schedulerType)
   }
 
-  @Test
-  def testBasicMultipleInput(): Unit = {
+  @ParameterizedTest
+  @MethodSource(Array("parameters"))

Review Comment:
   Can `ParameterizedTestExtension` be used here? It seems that all tests require parameterization.
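
   For reference, a minimal sketch of the pattern, assuming the extension and annotations come from `flink-test-utils-junit` (`org.apache.flink.testutils.junit.extensions.parameterized`); the class and parameter values below are illustrative, not the actual ITCase:

   ```scala
   import org.apache.flink.testutils.junit.extensions.parameterized.{ParameterizedTestExtension, Parameters}

   import org.junit.jupiter.api.TestTemplate
   import org.junit.jupiter.api.extension.ExtendWith

   import java.util

   // Register the extension once at class level; every @TestTemplate method is
   // then run once per parameter row, so per-method @ParameterizedTest /
   // @MethodSource pairs are no longer needed.
   @ExtendWith(Array(classOf[ParameterizedTestExtension]))
   class ShuffleModeITCase(shuffleMode: String) {

     @TestTemplate
     def testBasicMultipleInput(): Unit = {
       assert(shuffleMode != null) // the real test body would use shuffleMode as before
     }
   }

   object ShuffleModeITCase {

     // Static parameter source discovered through the @Parameters annotation.
     @Parameters
     def parameters(): util.Collection[Array[AnyRef]] =
       util.Arrays.asList(
         Array[AnyRef]("ALL_EXCHANGES_BLOCKING"),
         Array[AnyRef]("ALL_EXCHANGES_PIPELINED"))
   }
   ```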



##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSinkITCase.scala:
##########
@@ -83,9 +79,12 @@ class PartitionableSinkITCase extends BatchTestBase {
         "insert into sinkTable select a, max(b), c"
           + " from nonSortTable group by a, c")
       .await()
-    assertEquals(List("1,5,Hi", "1,5,Hi01", "1,5,Hi02"), RESULT1.sorted)
-    assert(RESULT2.isEmpty)
-    assertEquals(
+    assertThatIterable(RESULT1.sorted).containsExactlyElementsOf(
+      List("1,5,Hi", "1,5,Hi01", "1,5,Hi02"))
+    assertThat(RESULT2.isEmpty).isTrue

Review Comment:
   ```suggestion
       assertThat(RESULT2).isEmpty
   ```
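
   (Asserting on the collection itself also gives a more useful failure message: AssertJ prints the actual elements instead of just `expected: <true> but was: <false>`.)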



##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSinkITCase.scala:
##########
@@ -31,41 +31,32 @@ import org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR
 import org.apache.flink.table.descriptors.DescriptorProperties
 import org.apache.flink.table.descriptors.Schema.SCHEMA
 import org.apache.flink.table.factories.TableSinkFactory
-import org.apache.flink.table.planner.runtime.batch.sql.PartitionableSinkITCase.{type4, type_int_string, _}
+import org.apache.flink.table.planner.runtime.batch.sql.PartitionableSinkITCase.{type4, _}
 import org.apache.flink.table.planner.runtime.utils.BatchTestBase
 import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row
 import org.apache.flink.table.planner.runtime.utils.TestData._
 import org.apache.flink.table.sinks.{PartitionableTableSink, StreamTableSink, TableSink}
 import org.apache.flink.table.types.logical.{BigIntType, IntType, VarCharType}
 import org.apache.flink.table.types.utils.TypeConversions
-import org.apache.flink.table.utils.LegacyRowResource
-import org.apache.flink.types.Row
+import org.apache.flink.types.{Row, RowUtils}
 
-import org.junit.{Before, Rule, Test}
-import org.junit.Assert._
-import org.junit.rules.ExpectedException
+import org.assertj.core.api.Assertions.{assertThat, assertThatThrownBy}
+import org.assertj.core.api.IterableAssert.assertThatIterable
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
 
 import java.util
 import java.util.{function, ArrayList => JArrayList, LinkedList => JLinkedList, List => JList, Map => JMap}
 
 import scala.collection.JavaConversions._
 import scala.collection.JavaConverters._
-import scala.collection.Seq
 
 /** Test cases for [[org.apache.flink.table.sinks.PartitionableTableSink]]. */
 class PartitionableSinkITCase extends BatchTestBase {
 
-  private val _expectedException = ExpectedException.none
-
-  @Rule
-  def expectedEx: ExpectedException = _expectedException
-
-  @Rule
-  def usesLegacyRows: LegacyRowResource = LegacyRowResource.INSTANCE
-
-  @Before
+  @BeforeEach
   override def before(): Unit = {
     super.before()
+    RowUtils.USE_LEGACY_TO_STRING = true

Review Comment:
   This can be replaced by `LegacyRowExtension` introduced in `FLINK-32060`.
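
   For reference, a sketch of what the replacement might look like, assuming `LegacyRowExtension` lives next to the removed `LegacyRowResource` in `org.apache.flink.table.utils` and has a no-arg constructor (an `INSTANCE` accessor may exist instead):

   ```scala
   import org.apache.flink.table.utils.LegacyRowExtension

   import org.junit.jupiter.api.extension.RegisterExtension

   import scala.annotation.meta.field

   class PartitionableSinkITCase extends BatchTestBase {

     // JUnit requires @RegisterExtension fields to be non-private, and the
     // @field meta-annotation routes the annotation to the underlying field of
     // the Scala val. The extension then flips RowUtils.USE_LEGACY_TO_STRING
     // around each test, replacing the manual assignments in before()/after().
     @(RegisterExtension @field)
     val legacyRows: LegacyRowExtension = new LegacyRowExtension
   }
   ```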



##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSourceITCase.scala:
##########
@@ -89,7 +88,6 @@ class PartitionableSourceITCase(val sourceFetchPartitions: Boolean, val useCatal
          |    'filterable-fields' = 'id;part1;part2'
          |)
          |""".stripMargin
-

Review Comment:
   It seems that this is an unrelated change.



##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSinkITCase.scala:
##########
@@ -98,34 +97,33 @@ class PartitionableSinkITCase extends BatchTestBase {
         "3,2,Hello02",
         "3,2,Hello03",
         "3,2,Hello04"
-      ),
-      RESULT3.sorted
-    )
+      ))
   }
 
   @Test
   def testInsertWithPartitionGrouping(): Unit = {
     registerTableSink()
     tEnv.executeSql("insert into sinkTable select a, b, c from 
sortTable").await()
-    assertEquals(List("1,1,Hello world", "1,1,Hello world, how are you?"), 
RESULT1.toList)
-    assertEquals(
-      List("4,4,你好,陌生人", "4,4,你好,陌生人,我是", "4,4,你好,陌生人,我是中国人", 
"4,4,你好,陌生人,我是中国人,你来自哪里?"),
-      RESULT2.toList)
-    assertEquals(
+    assertThatIterable(RESULT1).containsExactlyElementsOf(

Review Comment:
   Can `assertThat` be used instead of `assertThatIterable` here?
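
   For illustration, a self-contained sketch (hypothetical demo object, data copied from the diff) showing that the generic entry point already resolves to the list overload; in the test itself `RESULT1` would need to be, or convert to, a `java.util.List`, e.g. via the `JavaConversions` implicits this file already imports:

   ```scala
   import org.assertj.core.api.Assertions.assertThat

   import java.util

   object AssertThatOverloadDemo {
     def main(args: Array[String]): Unit = {
       val result: util.List[String] =
         util.Arrays.asList("1,1,Hello world", "1,1,Hello world, how are you?")

       // assertThat(java.util.List) returns a ListAssert, so the dedicated
       // assertThatIterable entry point (and its import) can be dropped.
       assertThat(result).containsExactlyElementsOf(
         util.Arrays.asList("1,1,Hello world", "1,1,Hello world, how are you?"))
     }
   }
   ```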



##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TableSinkITCase.scala:
##########
@@ -69,28 +67,19 @@ class TableSinkITCase extends BatchTestBase {
       s"insert into MySink /*+ OPTIONS('path' = '$newPath2') */ select * from 
MyTable")
     stmtSet.execute().await()
 
-    Assert.assertTrue(TableTestUtil.readFromFile(resultPath).isEmpty)
+    assertThat(TableTestUtil.readFromFile(resultPath).isEmpty).isTrue
     val expected = Seq("1,1,Hi", "2,2,Hello", "3,2,Hello world")
     val result1 = TableTestUtil.readFromFile(newPath1)
-    Assert.assertEquals(expected.sorted, result1.sorted)
+    assertThat(expected.sorted).isEqualTo(result1.sorted)
     val result2 = TableTestUtil.readFromFile(newPath2)
-    Assert.assertEquals(expected.sorted, result2.sorted)
+    assertThat(expected.sorted).isEqualTo(result2.sorted)
   }
 
   @Test
   def testCollectSinkConfiguration(): Unit = {
     tEnv.getConfig.set(CollectSinkOperatorFactory.MAX_BATCH_SIZE, MemorySize.parse("1b"))
-    try {
-      checkResult("SELECT 1", Seq(row(1)))
-      Assert.fail("Expecting exception thrown from collect sink")
-    } catch {
-      case e: Exception =>
-        MatcherAssert.assertThat(
-          e,
-          FlinkMatchers.containsMessage(
-            "Please consider increasing max bytes per batch value " +
-              "by setting collect-sink.batch-size.max"))
-    }
+    assertThatThrownBy(() => checkResult("SELECT 1", Seq(row(1)))).rootCause.toString.contains(
+      "Please consider increasing max bytes per batch value by setting collect-sink.batch-size.max")

Review Comment:
   ```suggestion
       assertThatThrownBy(() => checkResult("SELECT 1", Seq(row(1))))
         .satisfies(FlinkAssertions.anyCauseMatches(
           "Please consider increasing max bytes per batch value by setting collect-sink.batch-size.max"))
   ```
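
   (`FlinkAssertions.anyCauseMatches` returns an AssertJ `ThrowingConsumer`, so it plugs into `satisfies` directly, and it actually fails the test when no cause carries the message; the chain in the diff only builds a string and discards the `contains` result.)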



##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSourceITCase.scala:
##########
@@ -129,8 +127,12 @@ class PartitionableSourceITCase(val sourceFetchPartitions: Boolean, val useCatal
     }
   }
 
-  @Test
-  def testSimplePartitionFieldPredicate1(): Unit = {
+  @ParameterizedTest
+  @MethodSource(Array("parameters"))

Review Comment:
   Maybe we can register `ParameterizedTestExtension` to avoid duplicate code.
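
   (Same shape as the `MultipleInputITCase` sketch above: register the extension with `@ExtendWith` at class level, move the `parameters` source behind a `@Parameters` annotation in the companion object, and switch the test methods from `@ParameterizedTest`/`@MethodSource` to `@TestTemplate`.)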


