This is an automated email from the ASF dual-hosted git repository.
snuyanzin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git
The following commit(s) were added to refs/heads/main by this push:
new 5384ab2f [FLINK-37714][Connector/Kafka] Generate entities in a temporary folder while tests
5384ab2f is described below
commit 5384ab2f3a0bc42e6c3019c151f3b72bb4983926
Author: Sergey Nuyanzin <[email protected]>
AuthorDate: Thu Apr 24 12:14:40 2025 +0200
[FLINK-37714][Connector/Kafka] Generate entities in a temporary folder while tests
* [FLINK-37714] Generate entities in a temporary folder while tests
* [FLINK-37714] Address feedback and migrate KafkaTableITCase to JUnit5
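For readers skimming the diff below: the core of the change is a JUnit 4 -> JUnit 5 migration of the parameterized tests. A minimal sketch of the before/after pattern, with illustrative class and test names (not taken from the diff):

    // JUnit 4: the format was injected once per class instance via
    // @RunWith(Parameterized.class) and a public @Parameterized.Parameter field.
    // JUnit 5: the format is supplied to each test method individually.
    import java.util.Arrays;
    import java.util.Collection;

    import org.junit.jupiter.params.ParameterizedTest;
    import org.junit.jupiter.params.provider.MethodSource;

    class ExampleFormatTest {

        private static Collection<String> formats() {
            return Arrays.asList("avro", "csv", "json");
        }

        @ParameterizedTest(name = "format: {0}")
        @MethodSource("formats")
        void testSomething(final String format) {
            // the former instance field now arrives as a method parameter
            org.junit.jupiter.api.Assertions.assertFalse(format.isEmpty());
        }
    }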
---
.../kafka/table/KafkaChangelogTableITCase.java | 16 +-
.../connectors/kafka/table/KafkaTableITCase.java | 170 ++++++++++++---------
.../connectors/kafka/table/KafkaTableTestBase.java | 20 +--
.../kafka/table/UpsertKafkaTableITCase.java | 101 ++++++------
4 files changed, 170 insertions(+), 137 deletions(-)
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaChangelogTableITCase.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaChangelogTableITCase.java
index cdc442a3..d77e3dfb 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaChangelogTableITCase.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaChangelogTableITCase.java
@@ -31,8 +31,8 @@ import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.kafka.clients.producer.ProducerConfig;
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
import java.time.Duration;
import java.time.ZoneId;
@@ -44,17 +44,17 @@ import static org.apache.flink.streaming.connectors.kafka.table.KafkaTableTestUt
import static org.apache.flink.streaming.connectors.kafka.table.KafkaTableTestUtils.waitingExpectedResults;
/** IT cases for Kafka with changelog format for Table API & SQL. */
-public class KafkaChangelogTableITCase extends KafkaTableTestBase {
+class KafkaChangelogTableITCase extends KafkaTableTestBase {
- @Before
- public void before() {
+ @BeforeEach
+ void before() {
// we have to use single parallelism,
// because we will count the messages in sink to terminate the job
env.setParallelism(1);
}
@Test
- public void testKafkaDebeziumChangelogSource() throws Exception {
+ void testKafkaDebeziumChangelogSource() throws Exception {
final String topic = "changelog_topic";
createTestTopic(topic, 1, 1);
@@ -180,7 +180,7 @@ public class KafkaChangelogTableITCase extends KafkaTableTestBase {
}
@Test
- public void testKafkaCanalChangelogSource() throws Exception {
+ void testKafkaCanalChangelogSource() throws Exception {
final String topic = "changelog_canal";
createTestTopic(topic, 1, 1);
@@ -320,7 +320,7 @@ public class KafkaChangelogTableITCase extends KafkaTableTestBase {
}
@Test
- public void testKafkaMaxwellChangelogSource() throws Exception {
+ void testKafkaMaxwellChangelogSource() throws Exception {
final String topic = "changelog_maxwell";
createTestTopic(topic, 1, 1);
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java
index 0a84ecb2..e90c2191 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java
@@ -46,10 +46,9 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ThrowingConsumer;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
import java.time.Duration;
import java.time.Instant;
@@ -83,29 +82,22 @@ import static org.assertj.core.api.Assertions.fail;
import static org.assertj.core.api.HamcrestCondition.matching;
/** Basic IT cases for the Kafka table source and sink. */
-@RunWith(Parameterized.class)
-public class KafkaTableITCase extends KafkaTableTestBase {
+class KafkaTableITCase extends KafkaTableTestBase {
- private static final String JSON_FORMAT = "json";
- private static final String AVRO_FORMAT = "avro";
- private static final String CSV_FORMAT = "csv";
-
- @Parameterized.Parameter public String format;
-
- @Parameterized.Parameters(name = "format = {0}")
- public static Collection<String> parameters() {
- return Arrays.asList(JSON_FORMAT, AVRO_FORMAT, CSV_FORMAT);
+ private static Collection<String> formats() {
+ return Arrays.asList("avro", "csv", "json");
}
- @Before
- public void before() {
+ @BeforeEach
+ void before() {
// we have to use single parallelism,
// because we will count the messages in sink to terminate the job
env.setParallelism(1);
}
- @Test
- public void testKafkaSourceSink() throws Exception {
+ @ParameterizedTest(name = "format: {0}")
+ @MethodSource("formats")
+ void testKafkaSourceSink(final String format) throws Exception {
// we always use a different topic name for each parameterized topic,
// in order to make sure the topic can be created.
final String topic = "tstopic_" + format + "_" + UUID.randomUUID();
@@ -138,7 +130,7 @@ public class KafkaTableITCase extends KafkaTableTestBase {
topic,
bootstraps,
groupId,
- formatOptions());
+ formatOptions(format));
tEnv.executeSql(createTable);
@@ -195,8 +187,9 @@ public class KafkaTableITCase extends KafkaTableTestBase {
cleanupTopic(topic);
}
- @Test
- public void testKafkaSourceSinkWithTopicList() throws Exception {
+ @ParameterizedTest(name = "format: {0}")
+ @MethodSource("formats")
+ void testKafkaSourceSinkWithTopicList(final String format) throws Exception {
// we always use a different topic name for each parameterized topic,
// in order to make sure the topic can be created.
final String topic1 = "topics1_" + format + "_" + UUID.randomUUID();
@@ -230,7 +223,7 @@ public class KafkaTableITCase extends KafkaTableTestBase {
String.join(";", Arrays.asList(topic1, topic2)),
bootstraps,
groupId,
- formatOptions());
+ formatOptions(format));
final String createTopic1Table =
String.format(
createTableTemplate,
@@ -239,7 +232,7 @@ public class KafkaTableITCase extends KafkaTableTestBase {
topic1,
bootstraps,
groupId,
- formatOptions());
+ formatOptions(format));
final String createTopic2Table =
String.format(
createTableTemplate,
@@ -248,7 +241,7 @@ public class KafkaTableITCase extends KafkaTableTestBase {
topic2,
bootstraps,
groupId,
- formatOptions());
+ formatOptions(format));
tEnv.executeSql(createTopicListTable);
tEnv.executeSql(createTopic1Table);
@@ -276,8 +269,9 @@ public class KafkaTableITCase extends KafkaTableTestBase {
cleanupTopic(topic2);
}
- @Test
- public void testKafkaSourceSinkWithTopicPattern() throws Exception {
+ @ParameterizedTest(name = "format: {0}")
+ @MethodSource("formats")
+ void testKafkaSourceSinkWithTopicPattern(final String format) throws Exception {
// we always use a different topic name for each parameterized topic,
// in order to make sure the topic can be created.
final String topic1 = "topics1_" + format + "_" + UUID.randomUUID();
@@ -312,7 +306,7 @@ public class KafkaTableITCase extends KafkaTableTestBase {
topicPattern,
bootstraps,
groupId,
- formatOptions());
+ formatOptions(format));
final String createTopic1Table =
String.format(
createTableTemplate,
@@ -321,7 +315,7 @@ public class KafkaTableITCase extends KafkaTableTestBase {
topic1,
bootstraps,
groupId,
- formatOptions());
+ formatOptions(format));
final String createTopic2Table =
String.format(
createTableTemplate,
@@ -330,7 +324,7 @@ public class KafkaTableITCase extends KafkaTableTestBase {
topic2,
bootstraps,
groupId,
- formatOptions());
+ formatOptions(format));
tEnv.executeSql(createTopicPatternTable);
tEnv.executeSql(createTopic1Table);
@@ -359,8 +353,9 @@ public class KafkaTableITCase extends KafkaTableTestBase {
cleanupTopic(topic2);
}
- @Test
- public void testExactlyOnceSink() throws Exception {
+ @ParameterizedTest(name = "format: {0}")
+ @MethodSource("formats")
+ void testExactlyOnceSink(final String format) throws Exception {
// we always use a different topic name for each parameterized topic,
// in order to make sure the topic can be created.
final String topic = "topics_" + format + "_" + UUID.randomUUID();
@@ -391,7 +386,7 @@ public class KafkaTableITCase extends KafkaTableTestBase {
TransactionNamingStrategy.POOLING,
topic,
bootstraps,
- formatOptions()));
+ formatOptions(format)));
List<Row> values =
Arrays.asList(Row.of(1, 1102, "behavior 1"), Row.of(2, 1103, "behavior 2"));
@@ -408,8 +403,9 @@ public class KafkaTableITCase extends KafkaTableTestBase {
cleanupTopic(topic);
}
- @Test
- public void testKafkaSourceEmptyResultOnDeletedOffsets() throws Exception {
+ @ParameterizedTest(name = "format: {0}")
+ @MethodSource("formats")
+ void testKafkaSourceEmptyResultOnDeletedOffsets(final String format) throws Exception {
// we always use a different topic name for each parameterized topic,
// in order to make sure the topic can be created.
final String topic = "bounded_" + format + "_" + UUID.randomUUID();
@@ -439,7 +435,7 @@ public class KafkaTableITCase extends KafkaTableTestBase {
topic,
bootstraps,
groupId,
- formatOptions());
+ formatOptions(format));
tEnv.executeSql(createTable);
List<Row> values =
Arrays.asList(
@@ -464,8 +460,9 @@ public class KafkaTableITCase extends KafkaTableTestBase {
cleanupTopic(topic);
}
- @Test
- public void testKafkaSourceSinkWithBoundedSpecificOffsets() throws Exception {
+ @ParameterizedTest(name = "format: {0}")
+ @MethodSource("formats")
+ void testKafkaSourceSinkWithBoundedSpecificOffsets(final String format) throws Exception {
// we always use a different topic name for each parameterized topic,
// in order to make sure the topic can be created.
final String topic = "bounded_" + format + "_" + UUID.randomUUID();
@@ -495,7 +492,7 @@ public class KafkaTableITCase extends KafkaTableTestBase {
topic,
bootstraps,
groupId,
- formatOptions());
+ formatOptions(format));
tEnv.executeSql(createTable);
@@ -518,8 +515,9 @@ public class KafkaTableITCase extends KafkaTableTestBase {
cleanupTopic(topic);
}
- @Test
- public void testKafkaSourceSinkWithBoundedTimestamp() throws Exception {
+ @ParameterizedTest(name = "format: {0}")
+ @MethodSource("formats")
+ void testKafkaSourceSinkWithBoundedTimestamp(final String format) throws Exception {
// we always use a different topic name for each parameterized topic,
// in order to make sure the topic can be created.
final String topic = "bounded_" + format + "_" + UUID.randomUUID();
@@ -550,7 +548,7 @@ public class KafkaTableITCase extends KafkaTableTestBase {
topic,
bootstraps,
groupId,
- formatOptions());
+ formatOptions(format));
tEnv.executeSql(createTable);
@@ -575,8 +573,9 @@ public class KafkaTableITCase extends KafkaTableTestBase {
cleanupTopic(topic);
}
- @Test
- public void testKafkaTableWithMultipleTopics() throws Exception {
+ @ParameterizedTest(name = "format: {0}")
+ @MethodSource("formats")
+ void testKafkaTableWithMultipleTopics(final String format) throws Exception {
// ---------- create source and sink tables -------------------
String tableTemp =
"create table %s (\n"
@@ -614,7 +613,7 @@ public class KafkaTableITCase extends KafkaTableTestBase {
topics.get(index),
bootstraps,
groupId,
- formatOptions()));
+ formatOptions(format)));
});
// create source table
tEnv.executeSql(
@@ -625,7 +624,7 @@ public class KafkaTableITCase extends KafkaTableTestBase {
String.join(";", topics),
bootstraps,
groupId,
- formatOptions()));
+ formatOptions(format)));
// ---------- Prepare data in Kafka topics -------------------
String insertTemp =
@@ -668,8 +667,9 @@ public class KafkaTableITCase extends KafkaTableTestBase {
topics.forEach(super::deleteTestTopic);
}
- @Test
- public void testKafkaSourceSinkWithMetadata() throws Exception {
+ @ParameterizedTest(name = "format: {0}")
+ @MethodSource("formats")
+ void testKafkaSourceSinkWithMetadata(final String format) throws Exception {
// we always use a different topic name for each parameterized topic,
// in order to make sure the topic can be created.
final String topic = "metadata_topic_" + format + "_" +
UUID.randomUUID();
@@ -701,7 +701,7 @@ public class KafkaTableITCase extends KafkaTableTestBase {
+ " 'scan.startup.mode' =
'earliest-offset',\n"
+ " %s\n"
+ ")",
- topic, bootstraps, groupId, formatOptions());
+ topic, bootstraps, groupId, formatOptions(format));
tEnv.executeSql(createTable);
String initialValues =
@@ -760,8 +760,9 @@ public class KafkaTableITCase extends KafkaTableTestBase {
cleanupTopic(topic);
}
- @Test
- public void testKafkaSourceSinkWithKeyAndPartialValue() throws Exception {
+ @ParameterizedTest(name = "format: {0}")
+ @MethodSource("formats")
+ void testKafkaSourceSinkWithKeyAndPartialValue(final String format) throws Exception {
// we always use a different topic name for each parameterized topic,
// in order to make sure the topic can be created.
final String topic = "key_partial_value_topic_" + format + "_" +
UUID.randomUUID();
@@ -841,8 +842,9 @@ public class KafkaTableITCase extends KafkaTableTestBase {
cleanupTopic(topic);
}
- @Test
- public void testKafkaSourceSinkWithKeyAndFullValue() throws Exception {
+ @ParameterizedTest(name = "format: {0}")
+ @MethodSource("formats")
+ void testKafkaSourceSinkWithKeyAndFullValue(final String format) throws Exception {
// we always use a different topic name for each parameterized topic,
// in order to make sure the topic can be created.
final String topic = "key_full_value_topic_" + format + "_" +
UUID.randomUUID();
@@ -919,8 +921,9 @@ public class KafkaTableITCase extends KafkaTableTestBase {
cleanupTopic(topic);
}
- @Test
- public void testKafkaTemporalJoinChangelog() throws Exception {
+ @ParameterizedTest(name = "format: {0}")
+ @MethodSource("formats")
+ void testKafkaTemporalJoinChangelog(final String format) throws Exception {
// Set the session time zone to UTC, because the next `METADATA FROM
// 'value.source.timestamp'` DDL
// will use the session time zone when convert the changelog time from milliseconds to
@@ -1062,8 +1065,9 @@ public class KafkaTableITCase extends KafkaTableTestBase {
tEnv.executeSql(insertSql).await();
}
- @Test
- public void testPerPartitionWatermarkKafka() throws Exception {
+ @ParameterizedTest(name = "format: {0}")
+ @MethodSource("formats")
+ void testPerPartitionWatermarkKafka(final String format) throws Exception {
// we always use a different topic name for each parameterized topic,
// in order to make sure the topic can be created.
final String topic = "per_partition_watermark_topic_" + format + "_" +
UUID.randomUUID();
@@ -1152,8 +1156,9 @@ public class KafkaTableITCase extends KafkaTableTestBase {
cleanupTopic(topic);
}
- @Test
- public void testPerPartitionWatermarkWithIdleSource() throws Exception {
+ @ParameterizedTest(name = "format: {0}")
+ @MethodSource("formats")
+ void testPerPartitionWatermarkWithIdleSource(final String format) throws Exception {
// we always use a different topic name for each parameterized topic,
// in order to make sure the topic can be created.
final String topic = "idle_partition_watermark_topic_" + format + "_"
+ UUID.randomUUID();
@@ -1227,8 +1232,9 @@ public class KafkaTableITCase extends KafkaTableTestBase {
cleanupTopic(topic);
}
- @Test
- public void testLatestOffsetStrategyResume() throws Exception {
+ @ParameterizedTest(name = "format: {0}")
+ @MethodSource("formats")
+ void testLatestOffsetStrategyResume(final String format) throws Exception {
// we always use a different topic name for each parameterized topic,
// in order to make sure the topic can be created.
final String topic = "latest_offset_resume_topic_" + format + "_" +
UUID.randomUUID();
@@ -1359,19 +1365,22 @@ public class KafkaTableITCase extends KafkaTableTestBase {
cleanupTopic(topic);
}
- @Test
- public void testStartFromGroupOffsetsLatest() throws Exception {
- testStartFromGroupOffsets("latest");
+ @ParameterizedTest(name = "format: {0}")
+ @MethodSource("formats")
+ void testStartFromGroupOffsetsLatest(final String format) throws Exception {
+ testStartFromGroupOffsets("latest", format);
}
- @Test
- public void testStartFromGroupOffsetsEarliest() throws Exception {
- testStartFromGroupOffsets("earliest");
+ @ParameterizedTest(name = "format: {0}")
+ @MethodSource("formats")
+ void testStartFromGroupOffsetsEarliest(final String format) throws Exception {
+ testStartFromGroupOffsets("earliest", format);
}
- @Test
- public void testStartFromGroupOffsetsNone() {
- Assertions.assertThatThrownBy(() -> testStartFromGroupOffsetsWithNoneResetStrategy())
+ @ParameterizedTest(name = "format: {0}")
+ @MethodSource("formats")
+ void testStartFromGroupOffsetsNone(final String format) {
+ Assertions.assertThatThrownBy(() -> testStartFromGroupOffsetsWithNoneResetStrategy(format))
.satisfies(anyCauseMatches(NoOffsetForPartitionException.class));
}
@@ -1402,7 +1411,12 @@ public class KafkaTableITCase extends KafkaTableTestBase {
}
private TableResult startFromGroupOffset(
- String tableName, String topic, String groupId, String resetStrategy, String sinkName)
+ String tableName,
+ String topic,
+ String groupId,
+ String resetStrategy,
+ String sinkName,
+ String format)
throws ExecutionException, InterruptedException {
// we always use a different topic name for each parameterized topic,
// in order to make sure the topic can be created.
@@ -1467,7 +1481,7 @@ public class KafkaTableITCase extends KafkaTableTestBase {
return tEnv.executeSql("INSERT INTO " + sinkName + " SELECT * FROM " +
tableName);
}
- private void testStartFromGroupOffsets(String resetStrategy) throws Exception {
+ private void testStartFromGroupOffsets(String resetStrategy, String format) throws Exception {
// we always use a different topic name for each parameterized topic,
// in order to make sure the topic can be created.
final String tableName = "Table" + format + resetStrategy;
@@ -1481,7 +1495,9 @@ public class KafkaTableITCase extends KafkaTableTestBase {
TableResult tableResult = null;
try {
- tableResult = startFromGroupOffset(tableName, topic, groupId, resetStrategy, sinkName);
+ tableResult =
+ startFromGroupOffset(
+ tableName, topic, groupId, resetStrategy, sinkName, format);
if ("latest".equals(resetStrategy)) {
expected = appendNewData(topic, tableName, groupId, expected.size());
}
@@ -1493,7 +1509,7 @@ public class KafkaTableITCase extends KafkaTableTestBase {
}
}
- private void testStartFromGroupOffsetsWithNoneResetStrategy()
+ private void testStartFromGroupOffsetsWithNoneResetStrategy(final String format)
throws ExecutionException, InterruptedException {
// we always use a different topic name for each parameterized topic,
// in order to make sure the topic can be created.
@@ -1504,7 +1520,9 @@ public class KafkaTableITCase extends KafkaTableTestBase {
TableResult tableResult = null;
try {
- tableResult = startFromGroupOffset(tableName, topic, groupId, resetStrategy, "MySink");
+ tableResult =
+ startFromGroupOffset(
+ tableName, topic, groupId, resetStrategy, "MySink", format);
tableResult.await();
} finally {
// ------------- cleanup -------------------
@@ -1530,7 +1548,7 @@ public class KafkaTableITCase extends KafkaTableTestBase {
}
}
- private String formatOptions() {
+ private String formatOptions(final String format) {
return String.format("'format' = '%s'", format);
}
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableTestBase.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableTestBase.java
index 8bac3628..5999df09 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableTestBase.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableTestBase.java
@@ -37,12 +37,13 @@ import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.ClassRule;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.KafkaContainer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
import java.time.Duration;
import java.util.ArrayList;
@@ -56,14 +57,15 @@ import java.util.TimerTask;
import java.util.stream.Collectors;
/** Base class for Kafka Table IT Cases. */
-public abstract class KafkaTableTestBase extends AbstractTestBase {
+@Testcontainers
+abstract class KafkaTableTestBase extends AbstractTestBase {
private static final Logger LOG = LoggerFactory.getLogger(KafkaTableTestBase.class);
private static final String INTER_CONTAINER_KAFKA_ALIAS = "kafka";
private static final int zkTimeoutMills = 30000;
- @ClassRule
+ @Container
public static final KafkaContainer KAFKA_CONTAINER =
KafkaUtil.createKafkaContainer(KafkaTableTestBase.class)
.withEmbeddedZookeeper()
@@ -80,8 +82,8 @@ public abstract class KafkaTableTestBase extends AbstractTestBase {
// Timer for scheduling logging task if the test hangs
private final Timer loggingTimer = new Timer("Debug Logging Timer");
- @Before
- public void setup() {
+ @BeforeEach
+ void setup() {
Configuration configuration = new Configuration();
configuration.set(RestartStrategyOptions.RESTART_STRATEGY, "disable");
env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);
@@ -101,8 +103,8 @@ public abstract class KafkaTableTestBase extends AbstractTestBase {
});
}
- @After
- public void after() {
+ @AfterEach
+ void after() {
// Cancel timer for debug logging
cancelTimeoutLogger();
}
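The base-class change above swaps JUnit 4's @ClassRule for the Testcontainers JUnit 5 extension. A minimal sketch of that lifecycle, assuming the testcontainers junit-jupiter artifact is on the classpath (the container image tag here is illustrative, not from the diff):

    import org.testcontainers.containers.KafkaContainer;
    import org.testcontainers.junit.jupiter.Container;
    import org.testcontainers.junit.jupiter.Testcontainers;
    import org.testcontainers.utility.DockerImageName;

    // @Testcontainers scans the class for @Container fields; a static field
    // is started once before all tests and stopped after the last one, which
    // mirrors the old @ClassRule lifecycle.
    @Testcontainers
    abstract class ExampleKafkaTestBase {

        @Container
        static final KafkaContainer KAFKA =
                new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.4.0"));
    }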
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaTableITCase.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaTableITCase.java
index 1a6bf7e1..7677f984 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaTableITCase.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaTableITCase.java
@@ -23,18 +23,19 @@ import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
-import org.apache.flink.table.utils.LegacyRowResource;
import org.apache.flink.types.Row;
+import org.apache.flink.types.RowUtils;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -56,38 +57,42 @@ import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.HamcrestCondition.matching;
/** Upsert-kafka IT cases. */
-@RunWith(Parameterized.class)
-public class UpsertKafkaTableITCase extends KafkaTableTestBase {
+class UpsertKafkaTableITCase extends KafkaTableTestBase {
- private static final String JSON_FORMAT = "json";
- private static final String CSV_FORMAT = "csv";
- private static final String AVRO_FORMAT = "avro";
-
- @Parameterized.Parameter public String format;
-
- @Parameterized.Parameters(name = "format = {0}")
- public static Object[] parameters() {
- return new Object[] {JSON_FORMAT, CSV_FORMAT, AVRO_FORMAT};
+ private static Collection<String> formats() {
+ return Arrays.asList("avro", "csv", "json");
}
- @Rule public final LegacyRowResource usesLegacyRows = LegacyRowResource.INSTANCE;
-
private static final String USERS_TOPIC = "users";
private static final String WORD_COUNT_TOPIC = "word_count";
- @Test
- public void testAggregate() throws Exception {
+ @BeforeEach
+ void setup() {
+ super.setup();
+ RowUtils.USE_LEGACY_TO_STRING = true;
+ }
+
+ @AfterEach
+ void after() {
+ super.after();
+ RowUtils.USE_LEGACY_TO_STRING = false;
+ }
+
+ @ParameterizedTest(name = "format: {0}")
+ @MethodSource("formats")
+ void testAggregate(final String format) throws Exception {
String topic = WORD_COUNT_TOPIC + "_" + format;
createTestTopic(topic, 4, 1);
// ------------- test ---------------
- wordCountToUpsertKafka(topic);
- wordFreqToUpsertKafka(topic);
+ wordCountToUpsertKafka(topic, format);
+ wordFreqToUpsertKafka(topic, format);
// ------------- clean up ---------------
deleteTestTopic(topic);
}
- @Test
- public void testTemporalJoin() throws Exception {
+ @ParameterizedTest(name = "format: {0}")
+ @MethodSource("formats")
+ void testTemporalJoin(final String format) throws Exception {
String topic = USERS_TOPIC + "_" + format;
createTestTopic(topic, 2, 1);
// ------------- test ---------------
@@ -102,15 +107,16 @@ public class UpsertKafkaTableITCase extends KafkaTableTestBase {
// partition and
// use the Kafka DefaultPartition to repartition the records.
env.setParallelism(1);
- writeChangelogToUpsertKafkaWithMetadata(topic);
+ writeChangelogToUpsertKafkaWithMetadata(topic, format);
env.setParallelism(2);
- temporalJoinUpsertKafka(topic);
+ temporalJoinUpsertKafka(topic, format);
// ------------- clean up ---------------
deleteTestTopic(topic);
}
- @Test
- public void testBufferedUpsertSink() throws Exception {
+ @ParameterizedTest(name = "format: {0}")
+ @MethodSource("formats")
+ void testBufferedUpsertSink(final String format) throws Exception {
final String topic = "buffered_upsert_topic_" + format;
createTestTopic(topic, 1, 1);
String bootstraps = getBootstrapServers();
@@ -198,8 +204,9 @@ public class UpsertKafkaTableITCase extends KafkaTableTestBase {
deleteTestTopic(topic);
}
- @Test
- public void testBufferedUpsertSinkWithoutAssigningWatermark() throws Exception {
+ @ParameterizedTest(name = "format: {0}")
+ @MethodSource("formats")
+ void testBufferedUpsertSinkWithoutAssigningWatermark(final String format) throws Exception {
final String topic = "buffered_upsert_topic_without_assigning_watermark_" + format;
createTestTopic(topic, 1, 1);
String bootstraps = getBootstrapServers();
@@ -263,8 +270,9 @@ public class UpsertKafkaTableITCase extends KafkaTableTestBase {
deleteTestTopic(topic);
}
- @Test
- public void testSourceSinkWithKeyAndPartialValue() throws Exception {
+ @ParameterizedTest(name = "format: {0}")
+ @MethodSource("formats")
+ void testSourceSinkWithKeyAndPartialValue(final String format) throws Exception {
// we always use a different topic name for each parameterized topic,
// in order to make sure the topic can be created.
final String topic = "key_partial_value_topic_" + format;
@@ -361,8 +369,9 @@ public class UpsertKafkaTableITCase extends KafkaTableTestBase {
deleteTestTopic(topic);
}
- @Test
- public void testKafkaSourceSinkWithKeyAndFullValue() throws Exception {
+ @ParameterizedTest(name = "format: {0}")
+ @MethodSource("formats")
+ void testKafkaSourceSinkWithKeyAndFullValue(final String format) throws Exception {
// we always use a different topic name for each parameterized topic,
// in order to make sure the topic can be created.
final String topic = "key_full_value_topic_" + format;
@@ -456,8 +465,9 @@ public class UpsertKafkaTableITCase extends KafkaTableTestBase {
deleteTestTopic(topic);
}
- @Test
- public void testUpsertKafkaSourceSinkWithBoundedSpecificOffsets() throws Exception {
+ @ParameterizedTest(name = "format: {0}")
+ @MethodSource("formats")
+ void testUpsertKafkaSourceSinkWithBoundedSpecificOffsets(final String format) throws Exception {
final String topic = "bounded_upsert_" + format + "_" +
UUID.randomUUID();
createTestTopic(topic, 1, 1);
@@ -509,8 +519,9 @@ public class UpsertKafkaTableITCase extends KafkaTableTestBase {
deleteTestTopic(topic);
}
- @Test
- public void testUpsertKafkaSourceSinkWithBoundedTimestamp() throws Exception {
+ @ParameterizedTest(name = "format: {0}")
+ @MethodSource("formats")
+ void testUpsertKafkaSourceSinkWithBoundedTimestamp(final String format) throws Exception {
final String topic = "bounded_upsert_" + format + "_" +
UUID.randomUUID();
createTestTopic(topic, 1, 1);
@@ -595,8 +606,9 @@ public class UpsertKafkaTableITCase extends KafkaTableTestBase {
* Tests that setting bounded end offset that is before the earliest offset results in 0
* results.
*/
- @Test
- public void testUpsertKafkaSourceSinkWithZeroLengthBoundedness() throws Exception {
+ @ParameterizedTest(name = "format: {0}")
+ @MethodSource("formats")
+ void testUpsertKafkaSourceSinkWithZeroLengthBoundedness(final String format) throws Exception {
final String topic = "bounded_upsert_" + format + "_" +
UUID.randomUUID();
createTestTopic(topic, 1, 1);
@@ -652,7 +664,7 @@ public class UpsertKafkaTableITCase extends KafkaTableTestBase {
deleteTestTopic(topic);
}
- private void wordCountToUpsertKafka(String wordCountTable) throws Exception {
+ private void wordCountToUpsertKafka(String wordCountTable, String format) throws Exception {
String bootstraps = getBootstrapServers();
// ------------- test data ---------------
@@ -765,7 +777,7 @@ public class UpsertKafkaTableITCase extends KafkaTableTestBase {
comparedWithKeyAndOrder(expected2, result2, new int[] {0});
}
- private void wordFreqToUpsertKafka(String wordCountTable) throws Exception {
+ private void wordFreqToUpsertKafka(String wordCountTable, String format) throws Exception {
// ------------- test data ---------------
final List<String> expectedData = Arrays.asList("3,1", "2,1");
@@ -802,7 +814,8 @@ public class UpsertKafkaTableITCase extends KafkaTableTestBase {
query.getJobClient().get().cancel();
}
- private void writeChangelogToUpsertKafkaWithMetadata(String userTable) throws Exception {
+ private void writeChangelogToUpsertKafkaWithMetadata(String userTable, String format)
+ throws Exception {
String bootstraps = getBootstrapServers();
// ------------- test data ---------------
@@ -1050,7 +1063,7 @@ public class UpsertKafkaTableITCase extends KafkaTableTestBase {
assertThat(result).satisfies(matching(deepEqualTo(expected, true)));
}
- private void temporalJoinUpsertKafka(String userTable) throws Exception {
+ private void temporalJoinUpsertKafka(String userTable, String format) throws Exception {
// ------------- test data ---------------
List<Row> input =
Arrays.asList(
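UpsertKafkaTableITCase also drops the JUnit 4-only LegacyRowResource rule and flips the underlying flag by hand in its lifecycle methods, exactly as the setup/after hunk above shows. A minimal self-contained sketch of that replacement (class name illustrative):

    import org.apache.flink.types.RowUtils;
    import org.junit.jupiter.api.AfterEach;
    import org.junit.jupiter.api.BeforeEach;

    class ExampleUpsertTest {

        @BeforeEach
        void setup() {
            // what the @Rule enabled before each test: the legacy
            // Row#toString format expected by the assertions
            RowUtils.USE_LEGACY_TO_STRING = true;
        }

        @AfterEach
        void after() {
            // restore the default so other tests are unaffected
            RowUtils.USE_LEGACY_TO_STRING = false;
        }
    }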