reswqa commented on code in PR #22573:
URL: https://github.com/apache/flink/pull/22573#discussion_r1195112680
##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/MultipleInputITCase.scala:
##########
@@ -74,13 +73,14 @@ class MultipleInputITCase(shuffleMode: BatchShuffleMode,
schedulerType: Schedule
MultipleInputITCase.rowType,
"a, b, c, nt",
MultipleInputITCase.nullables)
-
- tEnv.getConfig.set(ExecutionOptions.BATCH_SHUFFLE_MODE, shuffleMode)
- tEnv.getConfig.set(JobManagerOptions.SCHEDULER, schedulerType)
}
- @Test
- def testBasicMultipleInput(): Unit = {
+ @ParameterizedTest
+ @MethodSource(Array("parameters"))
Review Comment:
Can `ParameterizedTestExtension` be used here? It seems that all tests
require parameterization.
##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSinkITCase.scala:
##########
@@ -83,9 +79,12 @@ class PartitionableSinkITCase extends BatchTestBase {
"insert into sinkTable select a, max(b), c"
+ " from nonSortTable group by a, c")
.await()
- assertEquals(List("1,5,Hi", "1,5,Hi01", "1,5,Hi02"), RESULT1.sorted)
- assert(RESULT2.isEmpty)
- assertEquals(
+ assertThatIterable(RESULT1.sorted).containsExactlyElementsOf(
+ List("1,5,Hi", "1,5,Hi01", "1,5,Hi02"))
+ assertThat(RESULT2.isEmpty).isTrue
Review Comment:
```suggestion
assertThat(RESULT2).isEmpty
```
##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSinkITCase.scala:
##########
@@ -31,41 +31,32 @@ import
org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR
import org.apache.flink.table.descriptors.DescriptorProperties
import org.apache.flink.table.descriptors.Schema.SCHEMA
import org.apache.flink.table.factories.TableSinkFactory
-import
org.apache.flink.table.planner.runtime.batch.sql.PartitionableSinkITCase.{type4,
type_int_string, _}
+import
org.apache.flink.table.planner.runtime.batch.sql.PartitionableSinkITCase.{type4,
_}
import org.apache.flink.table.planner.runtime.utils.BatchTestBase
import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row
import org.apache.flink.table.planner.runtime.utils.TestData._
import org.apache.flink.table.sinks.{PartitionableTableSink, StreamTableSink,
TableSink}
import org.apache.flink.table.types.logical.{BigIntType, IntType, VarCharType}
import org.apache.flink.table.types.utils.TypeConversions
-import org.apache.flink.table.utils.LegacyRowResource
-import org.apache.flink.types.Row
+import org.apache.flink.types.{Row, RowUtils}
-import org.junit.{Before, Rule, Test}
-import org.junit.Assert._
-import org.junit.rules.ExpectedException
+import org.assertj.core.api.Assertions.{assertThat, assertThatThrownBy}
+import org.assertj.core.api.IterableAssert.assertThatIterable
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import java.util
import java.util.{function, ArrayList => JArrayList, LinkedList =>
JLinkedList, List => JList, Map => JMap}
import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
-import scala.collection.Seq
/** Test cases for [[org.apache.flink.table.sinks.PartitionableTableSink]]. */
class PartitionableSinkITCase extends BatchTestBase {
- private val _expectedException = ExpectedException.none
-
- @Rule
- def expectedEx: ExpectedException = _expectedException
-
- @Rule
- def usesLegacyRows: LegacyRowResource = LegacyRowResource.INSTANCE
-
- @Before
+ @BeforeEach
override def before(): Unit = {
super.before()
+ RowUtils.USE_LEGACY_TO_STRING = true
Review Comment:
This can be replaced by `LegacyRowExtension` introduced in `FLINK-32060`.
##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSourceITCase.scala:
##########
@@ -89,7 +88,6 @@ class PartitionableSourceITCase(val sourceFetchPartitions:
Boolean, val useCatal
| 'filterable-fields' = 'id;part1;part2'
|)
|""".stripMargin
-
Review Comment:
It seems that this is an unrelated change.
##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSinkITCase.scala:
##########
@@ -98,34 +97,33 @@ class PartitionableSinkITCase extends BatchTestBase {
"3,2,Hello02",
"3,2,Hello03",
"3,2,Hello04"
- ),
- RESULT3.sorted
- )
+ ))
}
@Test
def testInsertWithPartitionGrouping(): Unit = {
registerTableSink()
tEnv.executeSql("insert into sinkTable select a, b, c from
sortTable").await()
- assertEquals(List("1,1,Hello world", "1,1,Hello world, how are you?"),
RESULT1.toList)
- assertEquals(
- List("4,4,你好,陌生人", "4,4,你好,陌生人,我是", "4,4,你好,陌生人,我是中国人",
"4,4,你好,陌生人,我是中国人,你来自哪里?"),
- RESULT2.toList)
- assertEquals(
+ assertThatIterable(RESULT1).containsExactlyElementsOf(
Review Comment:
Can `assertThat` be used instead of `assertThatIterable` here?
##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TableSinkITCase.scala:
##########
@@ -69,28 +67,19 @@ class TableSinkITCase extends BatchTestBase {
s"insert into MySink /*+ OPTIONS('path' = '$newPath2') */ select * from
MyTable")
stmtSet.execute().await()
- Assert.assertTrue(TableTestUtil.readFromFile(resultPath).isEmpty)
+ assertThat(TableTestUtil.readFromFile(resultPath).isEmpty).isTrue
val expected = Seq("1,1,Hi", "2,2,Hello", "3,2,Hello world")
val result1 = TableTestUtil.readFromFile(newPath1)
- Assert.assertEquals(expected.sorted, result1.sorted)
+ assertThat(expected.sorted).isEqualTo(result1.sorted)
val result2 = TableTestUtil.readFromFile(newPath2)
- Assert.assertEquals(expected.sorted, result2.sorted)
+ assertThat(expected.sorted).isEqualTo(result2.sorted)
}
@Test
def testCollectSinkConfiguration(): Unit = {
tEnv.getConfig.set(CollectSinkOperatorFactory.MAX_BATCH_SIZE,
MemorySize.parse("1b"))
- try {
- checkResult("SELECT 1", Seq(row(1)))
- Assert.fail("Expecting exception thrown from collect sink")
- } catch {
- case e: Exception =>
- MatcherAssert.assertThat(
- e,
- FlinkMatchers.containsMessage(
- "Please consider increasing max bytes per batch value " +
- "by setting collect-sink.batch-size.max"))
- }
+ assertThatThrownBy(() => checkResult("SELECT 1",
Seq(row(1)))).rootCause.toString.contains(
+ "Please consider increasing max bytes per batch value by setting
collect-sink.batch-size.max")
Review Comment:
```suggestion
assertThatThrownBy(() => checkResult("SELECT 1", Seq(row(1))))
.satisfies(e -> FlinkAssertions.anyCauseMatches("Please consider
increasing max bytes per batch value by setting collect-sink.batch-size.max"))
```
##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSourceITCase.scala:
##########
@@ -129,8 +127,12 @@ class PartitionableSourceITCase(val sourceFetchPartitions:
Boolean, val useCatal
}
}
- @Test
- def testSimplePartitionFieldPredicate1(): Unit = {
+ @ParameterizedTest
+ @MethodSource(Array("parameters"))
Review Comment:
Maybe we can register `ParameterizedTestExtension` to avoid duplicate code.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]