This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git


The following commit(s) were added to refs/heads/main by this push:
     new 5384ab2f [FLINK-37714][Connector/Kafka] Generate entities in a 
temporary folder while tests
5384ab2f is described below

commit 5384ab2f3a0bc42e6c3019c151f3b72bb4983926
Author: Sergey Nuyanzin <[email protected]>
AuthorDate: Thu Apr 24 12:14:40 2025 +0200

    [FLINK-37714][Connector/Kafka] Generate entities in a temporary folder 
while tests
    
    * [FLINK-37714] Generate entities in a temporary folder while tests
    
    * [FLINK-37714] Address feedback and migrate KafkaTableITCase to JUnit5
---
 .../kafka/table/KafkaChangelogTableITCase.java     |  16 +-
 .../connectors/kafka/table/KafkaTableITCase.java   | 170 ++++++++++++---------
 .../connectors/kafka/table/KafkaTableTestBase.java |  20 +--
 .../kafka/table/UpsertKafkaTableITCase.java        | 101 ++++++------
 4 files changed, 170 insertions(+), 137 deletions(-)

diff --git 
a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaChangelogTableITCase.java
 
b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaChangelogTableITCase.java
index cdc442a3..d77e3dfb 100644
--- 
a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaChangelogTableITCase.java
+++ 
b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaChangelogTableITCase.java
@@ -31,8 +31,8 @@ import org.apache.flink.table.api.TableResult;
 import org.apache.flink.table.api.config.ExecutionConfigOptions;
 
 import org.apache.kafka.clients.producer.ProducerConfig;
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
 
 import java.time.Duration;
 import java.time.ZoneId;
@@ -44,17 +44,17 @@ import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaTableTestUt
 import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaTableTestUtils.waitingExpectedResults;
 
 /** IT cases for Kafka with changelog format for Table API & SQL. */
-public class KafkaChangelogTableITCase extends KafkaTableTestBase {
+class KafkaChangelogTableITCase extends KafkaTableTestBase {
 
-    @Before
-    public void before() {
+    @BeforeEach
+    void before() {
         // we have to use single parallelism,
         // because we will count the messages in sink to terminate the job
         env.setParallelism(1);
     }
 
     @Test
-    public void testKafkaDebeziumChangelogSource() throws Exception {
+    void testKafkaDebeziumChangelogSource() throws Exception {
         final String topic = "changelog_topic";
         createTestTopic(topic, 1, 1);
 
@@ -180,7 +180,7 @@ public class KafkaChangelogTableITCase extends 
KafkaTableTestBase {
     }
 
     @Test
-    public void testKafkaCanalChangelogSource() throws Exception {
+    void testKafkaCanalChangelogSource() throws Exception {
         final String topic = "changelog_canal";
         createTestTopic(topic, 1, 1);
 
@@ -320,7 +320,7 @@ public class KafkaChangelogTableITCase extends 
KafkaTableTestBase {
     }
 
     @Test
-    public void testKafkaMaxwellChangelogSource() throws Exception {
+    void testKafkaMaxwellChangelogSource() throws Exception {
         final String topic = "changelog_maxwell";
         createTestTopic(topic, 1, 1);
 
diff --git 
a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java
 
b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java
index 0a84ecb2..e90c2191 100644
--- 
a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java
+++ 
b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java
@@ -46,10 +46,9 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 import org.assertj.core.api.Assertions;
 import org.assertj.core.api.ThrowingConsumer;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
 
 import java.time.Duration;
 import java.time.Instant;
@@ -83,29 +82,22 @@ import static org.assertj.core.api.Assertions.fail;
 import static org.assertj.core.api.HamcrestCondition.matching;
 
 /** Basic IT cases for the Kafka table source and sink. */
-@RunWith(Parameterized.class)
-public class KafkaTableITCase extends KafkaTableTestBase {
+class KafkaTableITCase extends KafkaTableTestBase {
 
-    private static final String JSON_FORMAT = "json";
-    private static final String AVRO_FORMAT = "avro";
-    private static final String CSV_FORMAT = "csv";
-
-    @Parameterized.Parameter public String format;
-
-    @Parameterized.Parameters(name = "format = {0}")
-    public static Collection<String> parameters() {
-        return Arrays.asList(JSON_FORMAT, AVRO_FORMAT, CSV_FORMAT);
+    private static Collection<String> formats() {
+        return Arrays.asList("avro", "csv", "json");
     }
 
-    @Before
-    public void before() {
+    @BeforeEach
+    void before() {
         // we have to use single parallelism,
         // because we will count the messages in sink to terminate the job
         env.setParallelism(1);
     }
 
-    @Test
-    public void testKafkaSourceSink() throws Exception {
+    @ParameterizedTest(name = "format: {0}")
+    @MethodSource("formats")
+    void testKafkaSourceSink(final String format) throws Exception {
         // we always use a different topic name for each parameterized topic,
         // in order to make sure the topic can be created.
         final String topic = "tstopic_" + format + "_" + UUID.randomUUID();
@@ -138,7 +130,7 @@ public class KafkaTableITCase extends KafkaTableTestBase {
                         topic,
                         bootstraps,
                         groupId,
-                        formatOptions());
+                        formatOptions(format));
 
         tEnv.executeSql(createTable);
 
@@ -195,8 +187,9 @@ public class KafkaTableITCase extends KafkaTableTestBase {
         cleanupTopic(topic);
     }
 
-    @Test
-    public void testKafkaSourceSinkWithTopicList() throws Exception {
+    @ParameterizedTest(name = "format: {0}")
+    @MethodSource("formats")
+    void testKafkaSourceSinkWithTopicList(final String format) throws 
Exception {
         // we always use a different topic name for each parameterized topic,
         // in order to make sure the topic can be created.
         final String topic1 = "topics1_" + format + "_" + UUID.randomUUID();
@@ -230,7 +223,7 @@ public class KafkaTableITCase extends KafkaTableTestBase {
                         String.join(";", Arrays.asList(topic1, topic2)),
                         bootstraps,
                         groupId,
-                        formatOptions());
+                        formatOptions(format));
         final String createTopic1Table =
                 String.format(
                         createTableTemplate,
@@ -239,7 +232,7 @@ public class KafkaTableITCase extends KafkaTableTestBase {
                         topic1,
                         bootstraps,
                         groupId,
-                        formatOptions());
+                        formatOptions(format));
         final String createTopic2Table =
                 String.format(
                         createTableTemplate,
@@ -248,7 +241,7 @@ public class KafkaTableITCase extends KafkaTableTestBase {
                         topic2,
                         bootstraps,
                         groupId,
-                        formatOptions());
+                        formatOptions(format));
 
         tEnv.executeSql(createTopicListTable);
         tEnv.executeSql(createTopic1Table);
@@ -276,8 +269,9 @@ public class KafkaTableITCase extends KafkaTableTestBase {
         cleanupTopic(topic2);
     }
 
-    @Test
-    public void testKafkaSourceSinkWithTopicPattern() throws Exception {
+    @ParameterizedTest(name = "format: {0}")
+    @MethodSource("formats")
+    void testKafkaSourceSinkWithTopicPattern(final String format) throws 
Exception {
         // we always use a different topic name for each parameterized topic,
         // in order to make sure the topic can be created.
         final String topic1 = "topics1_" + format + "_" + UUID.randomUUID();
@@ -312,7 +306,7 @@ public class KafkaTableITCase extends KafkaTableTestBase {
                         topicPattern,
                         bootstraps,
                         groupId,
-                        formatOptions());
+                        formatOptions(format));
         final String createTopic1Table =
                 String.format(
                         createTableTemplate,
@@ -321,7 +315,7 @@ public class KafkaTableITCase extends KafkaTableTestBase {
                         topic1,
                         bootstraps,
                         groupId,
-                        formatOptions());
+                        formatOptions(format));
         final String createTopic2Table =
                 String.format(
                         createTableTemplate,
@@ -330,7 +324,7 @@ public class KafkaTableITCase extends KafkaTableTestBase {
                         topic2,
                         bootstraps,
                         groupId,
-                        formatOptions());
+                        formatOptions(format));
 
         tEnv.executeSql(createTopicPatternTable);
         tEnv.executeSql(createTopic1Table);
@@ -359,8 +353,9 @@ public class KafkaTableITCase extends KafkaTableTestBase {
         cleanupTopic(topic2);
     }
 
-    @Test
-    public void testExactlyOnceSink() throws Exception {
+    @ParameterizedTest(name = "format: {0}")
+    @MethodSource("formats")
+    void testExactlyOnceSink(final String format) throws Exception {
         // we always use a different topic name for each parameterized topic,
         // in order to make sure the topic can be created.
         final String topic = "topics_" + format + "_" + UUID.randomUUID();
@@ -391,7 +386,7 @@ public class KafkaTableITCase extends KafkaTableTestBase {
                         TransactionNamingStrategy.POOLING,
                         topic,
                         bootstraps,
-                        formatOptions()));
+                        formatOptions(format)));
 
         List<Row> values =
                 Arrays.asList(Row.of(1, 1102, "behavior 1"), Row.of(2, 1103, 
"behavior 2"));
@@ -408,8 +403,9 @@ public class KafkaTableITCase extends KafkaTableTestBase {
         cleanupTopic(topic);
     }
 
-    @Test
-    public void testKafkaSourceEmptyResultOnDeletedOffsets() throws Exception {
+    @ParameterizedTest(name = "format: {0}")
+    @MethodSource("formats")
+    void testKafkaSourceEmptyResultOnDeletedOffsets(final String format) 
throws Exception {
         // we always use a different topic name for each parameterized topic,
         // in order to make sure the topic can be created.
         final String topic = "bounded_" + format + "_" + UUID.randomUUID();
@@ -439,7 +435,7 @@ public class KafkaTableITCase extends KafkaTableTestBase {
                         topic,
                         bootstraps,
                         groupId,
-                        formatOptions());
+                        formatOptions(format));
         tEnv.executeSql(createTable);
         List<Row> values =
                 Arrays.asList(
@@ -464,8 +460,9 @@ public class KafkaTableITCase extends KafkaTableTestBase {
         cleanupTopic(topic);
     }
 
-    @Test
-    public void testKafkaSourceSinkWithBoundedSpecificOffsets() throws 
Exception {
+    @ParameterizedTest(name = "format: {0}")
+    @MethodSource("formats")
+    void testKafkaSourceSinkWithBoundedSpecificOffsets(final String format) 
throws Exception {
         // we always use a different topic name for each parameterized topic,
         // in order to make sure the topic can be created.
         final String topic = "bounded_" + format + "_" + UUID.randomUUID();
@@ -495,7 +492,7 @@ public class KafkaTableITCase extends KafkaTableTestBase {
                         topic,
                         bootstraps,
                         groupId,
-                        formatOptions());
+                        formatOptions(format));
 
         tEnv.executeSql(createTable);
 
@@ -518,8 +515,9 @@ public class KafkaTableITCase extends KafkaTableTestBase {
         cleanupTopic(topic);
     }
 
-    @Test
-    public void testKafkaSourceSinkWithBoundedTimestamp() throws Exception {
+    @ParameterizedTest(name = "format: {0}")
+    @MethodSource("formats")
+    void testKafkaSourceSinkWithBoundedTimestamp(final String format) throws 
Exception {
         // we always use a different topic name for each parameterized topic,
         // in order to make sure the topic can be created.
         final String topic = "bounded_" + format + "_" + UUID.randomUUID();
@@ -550,7 +548,7 @@ public class KafkaTableITCase extends KafkaTableTestBase {
                         topic,
                         bootstraps,
                         groupId,
-                        formatOptions());
+                        formatOptions(format));
 
         tEnv.executeSql(createTable);
 
@@ -575,8 +573,9 @@ public class KafkaTableITCase extends KafkaTableTestBase {
         cleanupTopic(topic);
     }
 
-    @Test
-    public void testKafkaTableWithMultipleTopics() throws Exception {
+    @ParameterizedTest(name = "format: {0}")
+    @MethodSource("formats")
+    void testKafkaTableWithMultipleTopics(final String format) throws 
Exception {
         // ---------- create source and sink tables -------------------
         String tableTemp =
                 "create table %s (\n"
@@ -614,7 +613,7 @@ public class KafkaTableITCase extends KafkaTableTestBase {
                                             topics.get(index),
                                             bootstraps,
                                             groupId,
-                                            formatOptions()));
+                                            formatOptions(format)));
                         });
         // create source table
         tEnv.executeSql(
@@ -625,7 +624,7 @@ public class KafkaTableITCase extends KafkaTableTestBase {
                         String.join(";", topics),
                         bootstraps,
                         groupId,
-                        formatOptions()));
+                        formatOptions(format)));
 
         // ---------- Prepare data in Kafka topics -------------------
         String insertTemp =
@@ -668,8 +667,9 @@ public class KafkaTableITCase extends KafkaTableTestBase {
         topics.forEach(super::deleteTestTopic);
     }
 
-    @Test
-    public void testKafkaSourceSinkWithMetadata() throws Exception {
+    @ParameterizedTest(name = "format: {0}")
+    @MethodSource("formats")
+    void testKafkaSourceSinkWithMetadata(final String format) throws Exception 
{
         // we always use a different topic name for each parameterized topic,
         // in order to make sure the topic can be created.
         final String topic = "metadata_topic_" + format + "_" + 
UUID.randomUUID();
@@ -701,7 +701,7 @@ public class KafkaTableITCase extends KafkaTableTestBase {
                                 + "  'scan.startup.mode' = 
'earliest-offset',\n"
                                 + "  %s\n"
                                 + ")",
-                        topic, bootstraps, groupId, formatOptions());
+                        topic, bootstraps, groupId, formatOptions(format));
         tEnv.executeSql(createTable);
 
         String initialValues =
@@ -760,8 +760,9 @@ public class KafkaTableITCase extends KafkaTableTestBase {
         cleanupTopic(topic);
     }
 
-    @Test
-    public void testKafkaSourceSinkWithKeyAndPartialValue() throws Exception {
+    @ParameterizedTest(name = "format: {0}")
+    @MethodSource("formats")
+    void testKafkaSourceSinkWithKeyAndPartialValue(final String format) throws 
Exception {
         // we always use a different topic name for each parameterized topic,
         // in order to make sure the topic can be created.
         final String topic = "key_partial_value_topic_" + format + "_" + 
UUID.randomUUID();
@@ -841,8 +842,9 @@ public class KafkaTableITCase extends KafkaTableTestBase {
         cleanupTopic(topic);
     }
 
-    @Test
-    public void testKafkaSourceSinkWithKeyAndFullValue() throws Exception {
+    @ParameterizedTest(name = "format: {0}")
+    @MethodSource("formats")
+    void testKafkaSourceSinkWithKeyAndFullValue(final String format) throws 
Exception {
         // we always use a different topic name for each parameterized topic,
         // in order to make sure the topic can be created.
         final String topic = "key_full_value_topic_" + format + "_" + 
UUID.randomUUID();
@@ -919,8 +921,9 @@ public class KafkaTableITCase extends KafkaTableTestBase {
         cleanupTopic(topic);
     }
 
-    @Test
-    public void testKafkaTemporalJoinChangelog() throws Exception {
+    @ParameterizedTest(name = "format: {0}")
+    @MethodSource("formats")
+    void testKafkaTemporalJoinChangelog(final String format) throws Exception {
         // Set the session time zone to UTC, because the next `METADATA FROM
         // 'value.source.timestamp'` DDL
         // will use the session time zone when convert the changelog time from 
milliseconds to
@@ -1062,8 +1065,9 @@ public class KafkaTableITCase extends KafkaTableTestBase {
         tEnv.executeSql(insertSql).await();
     }
 
-    @Test
-    public void testPerPartitionWatermarkKafka() throws Exception {
+    @ParameterizedTest(name = "format: {0}")
+    @MethodSource("formats")
+    void testPerPartitionWatermarkKafka(final String format) throws Exception {
         // we always use a different topic name for each parameterized topic,
         // in order to make sure the topic can be created.
         final String topic = "per_partition_watermark_topic_" + format + "_" + 
UUID.randomUUID();
@@ -1152,8 +1156,9 @@ public class KafkaTableITCase extends KafkaTableTestBase {
         cleanupTopic(topic);
     }
 
-    @Test
-    public void testPerPartitionWatermarkWithIdleSource() throws Exception {
+    @ParameterizedTest(name = "format: {0}")
+    @MethodSource("formats")
+    void testPerPartitionWatermarkWithIdleSource(final String format) throws 
Exception {
         // we always use a different topic name for each parameterized topic,
         // in order to make sure the topic can be created.
         final String topic = "idle_partition_watermark_topic_" + format + "_" 
+ UUID.randomUUID();
@@ -1227,8 +1232,9 @@ public class KafkaTableITCase extends KafkaTableTestBase {
         cleanupTopic(topic);
     }
 
-    @Test
-    public void testLatestOffsetStrategyResume() throws Exception {
+    @ParameterizedTest(name = "format: {0}")
+    @MethodSource("formats")
+    void testLatestOffsetStrategyResume(final String format) throws Exception {
         // we always use a different topic name for each parameterized topic,
         // in order to make sure the topic can be created.
         final String topic = "latest_offset_resume_topic_" + format + "_" + 
UUID.randomUUID();
@@ -1359,19 +1365,22 @@ public class KafkaTableITCase extends 
KafkaTableTestBase {
         cleanupTopic(topic);
     }
 
-    @Test
-    public void testStartFromGroupOffsetsLatest() throws Exception {
-        testStartFromGroupOffsets("latest");
+    @ParameterizedTest(name = "format: {0}")
+    @MethodSource("formats")
+    void testStartFromGroupOffsetsLatest(final String format) throws Exception 
{
+        testStartFromGroupOffsets("latest", format);
     }
 
-    @Test
-    public void testStartFromGroupOffsetsEarliest() throws Exception {
-        testStartFromGroupOffsets("earliest");
+    @ParameterizedTest(name = "format: {0}")
+    @MethodSource("formats")
+    void testStartFromGroupOffsetsEarliest(final String format) throws 
Exception {
+        testStartFromGroupOffsets("earliest", format);
     }
 
-    @Test
-    public void testStartFromGroupOffsetsNone() {
-        Assertions.assertThatThrownBy(() -> 
testStartFromGroupOffsetsWithNoneResetStrategy())
+    @ParameterizedTest(name = "format: {0}")
+    @MethodSource("formats")
+    void testStartFromGroupOffsetsNone(final String format) {
+        Assertions.assertThatThrownBy(() -> 
testStartFromGroupOffsetsWithNoneResetStrategy(format))
                 
.satisfies(anyCauseMatches(NoOffsetForPartitionException.class));
     }
 
@@ -1402,7 +1411,12 @@ public class KafkaTableITCase extends KafkaTableTestBase 
{
     }
 
     private TableResult startFromGroupOffset(
-            String tableName, String topic, String groupId, String 
resetStrategy, String sinkName)
+            String tableName,
+            String topic,
+            String groupId,
+            String resetStrategy,
+            String sinkName,
+            String format)
             throws ExecutionException, InterruptedException {
         // we always use a different topic name for each parameterized topic,
         // in order to make sure the topic can be created.
@@ -1467,7 +1481,7 @@ public class KafkaTableITCase extends KafkaTableTestBase {
         return tEnv.executeSql("INSERT INTO " + sinkName + " SELECT * FROM " + 
tableName);
     }
 
-    private void testStartFromGroupOffsets(String resetStrategy) throws 
Exception {
+    private void testStartFromGroupOffsets(String resetStrategy, String 
format) throws Exception {
         // we always use a different topic name for each parameterized topic,
         // in order to make sure the topic can be created.
         final String tableName = "Table" + format + resetStrategy;
@@ -1481,7 +1495,9 @@ public class KafkaTableITCase extends KafkaTableTestBase {
 
         TableResult tableResult = null;
         try {
-            tableResult = startFromGroupOffset(tableName, topic, groupId, 
resetStrategy, sinkName);
+            tableResult =
+                    startFromGroupOffset(
+                            tableName, topic, groupId, resetStrategy, 
sinkName, format);
             if ("latest".equals(resetStrategy)) {
                 expected = appendNewData(topic, tableName, groupId, 
expected.size());
             }
@@ -1493,7 +1509,7 @@ public class KafkaTableITCase extends KafkaTableTestBase {
         }
     }
 
-    private void testStartFromGroupOffsetsWithNoneResetStrategy()
+    private void testStartFromGroupOffsetsWithNoneResetStrategy(final String 
format)
             throws ExecutionException, InterruptedException {
         // we always use a different topic name for each parameterized topic,
         // in order to make sure the topic can be created.
@@ -1504,7 +1520,9 @@ public class KafkaTableITCase extends KafkaTableTestBase {
 
         TableResult tableResult = null;
         try {
-            tableResult = startFromGroupOffset(tableName, topic, groupId, 
resetStrategy, "MySink");
+            tableResult =
+                    startFromGroupOffset(
+                            tableName, topic, groupId, resetStrategy, 
"MySink", format);
             tableResult.await();
         } finally {
             // ------------- cleanup -------------------
@@ -1530,7 +1548,7 @@ public class KafkaTableITCase extends KafkaTableTestBase {
         }
     }
 
-    private String formatOptions() {
+    private String formatOptions(final String format) {
         return String.format("'format' = '%s'", format);
     }
 
diff --git 
a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableTestBase.java
 
b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableTestBase.java
index 8bac3628..5999df09 100644
--- 
a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableTestBase.java
+++ 
b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableTestBase.java
@@ -37,12 +37,13 @@ import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.StringDeserializer;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.ClassRule;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testcontainers.containers.KafkaContainer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
 
 import java.time.Duration;
 import java.util.ArrayList;
@@ -56,14 +57,15 @@ import java.util.TimerTask;
 import java.util.stream.Collectors;
 
 /** Base class for Kafka Table IT Cases. */
-public abstract class KafkaTableTestBase extends AbstractTestBase {
+@Testcontainers
+abstract class KafkaTableTestBase extends AbstractTestBase {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(KafkaTableTestBase.class);
 
     private static final String INTER_CONTAINER_KAFKA_ALIAS = "kafka";
     private static final int zkTimeoutMills = 30000;
 
-    @ClassRule
+    @Container
     public static final KafkaContainer KAFKA_CONTAINER =
             KafkaUtil.createKafkaContainer(KafkaTableTestBase.class)
                     .withEmbeddedZookeeper()
@@ -80,8 +82,8 @@ public abstract class KafkaTableTestBase extends 
AbstractTestBase {
     // Timer for scheduling logging task if the test hangs
     private final Timer loggingTimer = new Timer("Debug Logging Timer");
 
-    @Before
-    public void setup() {
+    @BeforeEach
+    void setup() {
         Configuration configuration = new Configuration();
         configuration.set(RestartStrategyOptions.RESTART_STRATEGY, "disable");
         env = 
StreamExecutionEnvironment.getExecutionEnvironment(configuration);
@@ -101,8 +103,8 @@ public abstract class KafkaTableTestBase extends 
AbstractTestBase {
                 });
     }
 
-    @After
-    public void after() {
+    @AfterEach
+    void after() {
         // Cancel timer for debug logging
         cancelTimeoutLogger();
     }
diff --git 
a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaTableITCase.java
 
b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaTableITCase.java
index 1a6bf7e1..7677f984 100644
--- 
a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaTableITCase.java
+++ 
b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaTableITCase.java
@@ -23,18 +23,19 @@ import org.apache.flink.table.api.Schema;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.TableResult;
 import org.apache.flink.table.planner.factories.TestValuesTableFactory;
-import org.apache.flink.table.utils.LegacyRowResource;
 import org.apache.flink.types.Row;
+import org.apache.flink.types.RowUtils;
 
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
 
 import java.time.Duration;
 import java.time.LocalDateTime;
 import java.time.ZoneId;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -56,38 +57,42 @@ import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.HamcrestCondition.matching;
 
 /** Upsert-kafka IT cases. */
-@RunWith(Parameterized.class)
-public class UpsertKafkaTableITCase extends KafkaTableTestBase {
+class UpsertKafkaTableITCase extends KafkaTableTestBase {
 
-    private static final String JSON_FORMAT = "json";
-    private static final String CSV_FORMAT = "csv";
-    private static final String AVRO_FORMAT = "avro";
-
-    @Parameterized.Parameter public String format;
-
-    @Parameterized.Parameters(name = "format = {0}")
-    public static Object[] parameters() {
-        return new Object[] {JSON_FORMAT, CSV_FORMAT, AVRO_FORMAT};
+    private static Collection<String> formats() {
+        return Arrays.asList("avro", "csv", "json");
     }
 
-    @Rule public final LegacyRowResource usesLegacyRows = 
LegacyRowResource.INSTANCE;
-
     private static final String USERS_TOPIC = "users";
     private static final String WORD_COUNT_TOPIC = "word_count";
 
-    @Test
-    public void testAggregate() throws Exception {
+    @BeforeEach
+    void setup() {
+        super.setup();
+        RowUtils.USE_LEGACY_TO_STRING = true;
+    }
+
+    @AfterEach
+    void after() {
+        super.after();
+        RowUtils.USE_LEGACY_TO_STRING = false;
+    }
+
+    @ParameterizedTest(name = "format: {0}")
+    @MethodSource("formats")
+    void testAggregate(final String format) throws Exception {
         String topic = WORD_COUNT_TOPIC + "_" + format;
         createTestTopic(topic, 4, 1);
         // -------------   test   ---------------
-        wordCountToUpsertKafka(topic);
-        wordFreqToUpsertKafka(topic);
+        wordCountToUpsertKafka(topic, format);
+        wordFreqToUpsertKafka(topic, format);
         // ------------- clean up ---------------
         deleteTestTopic(topic);
     }
 
-    @Test
-    public void testTemporalJoin() throws Exception {
+    @ParameterizedTest(name = "format: {0}")
+    @MethodSource("formats")
+    void testTemporalJoin(final String format) throws Exception {
         String topic = USERS_TOPIC + "_" + format;
         createTestTopic(topic, 2, 1);
         // -------------   test   ---------------
@@ -102,15 +107,16 @@ public class UpsertKafkaTableITCase extends 
KafkaTableTestBase {
         // partition and
         // use the Kafka DefaultPartition to repartition the records.
         env.setParallelism(1);
-        writeChangelogToUpsertKafkaWithMetadata(topic);
+        writeChangelogToUpsertKafkaWithMetadata(topic, format);
         env.setParallelism(2);
-        temporalJoinUpsertKafka(topic);
+        temporalJoinUpsertKafka(topic, format);
         // ------------- clean up ---------------
         deleteTestTopic(topic);
     }
 
-    @Test
-    public void testBufferedUpsertSink() throws Exception {
+    @ParameterizedTest(name = "format: {0}")
+    @MethodSource("formats")
+    void testBufferedUpsertSink(final String format) throws Exception {
         final String topic = "buffered_upsert_topic_" + format;
         createTestTopic(topic, 1, 1);
         String bootstraps = getBootstrapServers();
@@ -198,8 +204,9 @@ public class UpsertKafkaTableITCase extends 
KafkaTableTestBase {
         deleteTestTopic(topic);
     }
 
-    @Test
-    public void testBufferedUpsertSinkWithoutAssigningWatermark() throws 
Exception {
+    @ParameterizedTest(name = "format: {0}")
+    @MethodSource("formats")
+    void testBufferedUpsertSinkWithoutAssigningWatermark(final String format) 
throws Exception {
         final String topic = 
"buffered_upsert_topic_without_assigning_watermark_" + format;
         createTestTopic(topic, 1, 1);
         String bootstraps = getBootstrapServers();
@@ -263,8 +270,9 @@ public class UpsertKafkaTableITCase extends 
KafkaTableTestBase {
         deleteTestTopic(topic);
     }
 
-    @Test
-    public void testSourceSinkWithKeyAndPartialValue() throws Exception {
+    @ParameterizedTest(name = "format: {0}")
+    @MethodSource("formats")
+    void testSourceSinkWithKeyAndPartialValue(final String format) throws 
Exception {
         // we always use a different topic name for each parameterized topic,
         // in order to make sure the topic can be created.
         final String topic = "key_partial_value_topic_" + format;
@@ -361,8 +369,9 @@ public class UpsertKafkaTableITCase extends 
KafkaTableTestBase {
         deleteTestTopic(topic);
     }
 
-    @Test
-    public void testKafkaSourceSinkWithKeyAndFullValue() throws Exception {
+    @ParameterizedTest(name = "format: {0}")
+    @MethodSource("formats")
+    void testKafkaSourceSinkWithKeyAndFullValue(final String format) throws 
Exception {
         // we always use a different topic name for each parameterized topic,
         // in order to make sure the topic can be created.
         final String topic = "key_full_value_topic_" + format;
@@ -456,8 +465,9 @@ public class UpsertKafkaTableITCase extends 
KafkaTableTestBase {
         deleteTestTopic(topic);
     }
 
-    @Test
-    public void testUpsertKafkaSourceSinkWithBoundedSpecificOffsets() throws 
Exception {
+    @ParameterizedTest(name = "format: {0}")
+    @MethodSource("formats")
+    void testUpsertKafkaSourceSinkWithBoundedSpecificOffsets(final String 
format) throws Exception {
         final String topic = "bounded_upsert_" + format + "_" + 
UUID.randomUUID();
         createTestTopic(topic, 1, 1);
 
@@ -509,8 +519,9 @@ public class UpsertKafkaTableITCase extends 
KafkaTableTestBase {
         deleteTestTopic(topic);
     }
 
-    @Test
-    public void testUpsertKafkaSourceSinkWithBoundedTimestamp() throws 
Exception {
+    @ParameterizedTest(name = "format: {0}")
+    @MethodSource("formats")
+    void testUpsertKafkaSourceSinkWithBoundedTimestamp(final String format) 
throws Exception {
         final String topic = "bounded_upsert_" + format + "_" + 
UUID.randomUUID();
         createTestTopic(topic, 1, 1);
 
@@ -595,8 +606,9 @@ public class UpsertKafkaTableITCase extends 
KafkaTableTestBase {
      * Tests that setting bounded end offset that is before the earliest 
offset results in 0
      * results.
      */
-    @Test
-    public void testUpsertKafkaSourceSinkWithZeroLengthBoundedness() throws 
Exception {
+    @ParameterizedTest(name = "format: {0}")
+    @MethodSource("formats")
+    void testUpsertKafkaSourceSinkWithZeroLengthBoundedness(final String 
format) throws Exception {
         final String topic = "bounded_upsert_" + format + "_" + 
UUID.randomUUID();
         createTestTopic(topic, 1, 1);
 
@@ -652,7 +664,7 @@ public class UpsertKafkaTableITCase extends 
KafkaTableTestBase {
         deleteTestTopic(topic);
     }
 
-    private void wordCountToUpsertKafka(String wordCountTable) throws 
Exception {
+    private void wordCountToUpsertKafka(String wordCountTable, String format) 
throws Exception {
         String bootstraps = getBootstrapServers();
 
         // ------------- test data ---------------
@@ -765,7 +777,7 @@ public class UpsertKafkaTableITCase extends 
KafkaTableTestBase {
         comparedWithKeyAndOrder(expected2, result2, new int[] {0});
     }
 
-    private void wordFreqToUpsertKafka(String wordCountTable) throws Exception 
{
+    private void wordFreqToUpsertKafka(String wordCountTable, String format) 
throws Exception {
         // ------------- test data ---------------
 
         final List<String> expectedData = Arrays.asList("3,1", "2,1");
@@ -802,7 +814,8 @@ public class UpsertKafkaTableITCase extends 
KafkaTableTestBase {
         query.getJobClient().get().cancel();
     }
 
-    private void writeChangelogToUpsertKafkaWithMetadata(String userTable) 
throws Exception {
+    private void writeChangelogToUpsertKafkaWithMetadata(String userTable, 
String format)
+            throws Exception {
         String bootstraps = getBootstrapServers();
 
         // ------------- test data ---------------
@@ -1050,7 +1063,7 @@ public class UpsertKafkaTableITCase extends 
KafkaTableTestBase {
         assertThat(result).satisfies(matching(deepEqualTo(expected, true)));
     }
 
-    private void temporalJoinUpsertKafka(String userTable) throws Exception {
+    private void temporalJoinUpsertKafka(String userTable, String format) 
throws Exception {
         // ------------- test data ---------------
         List<Row> input =
                 Arrays.asList(


Reply via email to