snuyanzin commented on code in PR #20991:
URL: https://github.com/apache/flink/pull/20991#discussion_r1047275232


##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java:
##########
@@ -62,131 +63,154 @@ public static void prepare() throws Exception {
     //  Suite of Tests
     // ------------------------------------------------------------------------
 
-    @Test(timeout = 120000)
-    public void testFailOnNoBroker() throws Exception {
+    @Test
+    @Timeout(120L)
+    void testFailOnNoBroker() throws Exception {
         runFailOnNoBrokerTest();
     }
 
-    @Test(timeout = 60000)
-    public void testConcurrentProducerConsumerTopology() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testConcurrentProducerConsumerTopology() throws Exception {
         runSimpleConcurrentProducerConsumerTopology();
     }
 
-    @Test(timeout = 60000)
-    public void testKeyValueSupport() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testKeyValueSupport() throws Exception {
         runKeyValueTest();
     }
 
     // --- canceling / failures ---
 
-    @Test(timeout = 60000)
-    public void testCancelingEmptyTopic() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testCancelingEmptyTopic() throws Exception {
         runCancelingOnEmptyInputTest();
     }
 
-    @Test(timeout = 60000)
-    public void testCancelingFullTopic() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testCancelingFullTopic() throws Exception {
         runCancelingOnFullInputTest();
     }
 
     // --- source to partition mappings and exactly once ---
 
-    @Test(timeout = 60000)
-    public void testOneToOneSources() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testOneToOneSources() throws Exception {
         runOneToOneExactlyOnceTest();
     }
 
-    @Test(timeout = 60000)
-    public void testOneSourceMultiplePartitions() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testOneSourceMultiplePartitions() throws Exception {
         runOneSourceMultiplePartitionsExactlyOnceTest();
     }
 
-    @Test(timeout = 60000)
-    public void testMultipleSourcesOnePartition() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testMultipleSourcesOnePartition() throws Exception {
         runMultipleSourcesOnePartitionExactlyOnceTest();
     }
 
     // --- broker failure ---
 
-    @Test(timeout = 60000)
-    public void testBrokerFailure() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testBrokerFailure() throws Exception {
         runBrokerFailureTest();
     }
 
     // --- special executions ---
 
-    @Test(timeout = 60000)
-    public void testBigRecordJob() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testBigRecordJob() throws Exception {
         runBigRecordTestTopology();
     }
 
-    @Test(timeout = 60000)
-    public void testMultipleTopicsWithLegacySerializer() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testMultipleTopicsWithLegacySerializer() throws Exception {
         runProduceConsumeMultipleTopics(true);
     }
 
-    @Test(timeout = 60000)
-    public void testMultipleTopicsWithKafkaSerializer() throws Exception {
+    @Test
+    @Timeout(60L)

Review Comment:
   time unit should be specified
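   For reference, a minimal sketch of what this asks for, assuming the annotation in the diff is JUnit 5's `org.junit.jupiter.api.Timeout`: a bare `@Timeout(60L)` is interpreted as 60 seconds by default, and adding the `unit` attribute spells that out. Class and method names below are illustrative only.

```java
import java.util.concurrent.TimeUnit;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

class TimeoutUnitSketch {

    // Same 60-second budget as @Timeout(60L), but with the time unit stated explicitly.
    @Test
    @Timeout(value = 60, unit = TimeUnit.SECONDS)
    void testKeyValueSupport() throws Exception {
        // runKeyValueTest();
    }
}
```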



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java:
##########
@@ -62,131 +63,154 @@ public static void prepare() throws Exception {
     //  Suite of Tests
     // ------------------------------------------------------------------------
 
-    @Test(timeout = 120000)
-    public void testFailOnNoBroker() throws Exception {
+    @Test
+    @Timeout(120L)
+    void testFailOnNoBroker() throws Exception {
         runFailOnNoBrokerTest();
     }
 
-    @Test(timeout = 60000)
-    public void testConcurrentProducerConsumerTopology() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testConcurrentProducerConsumerTopology() throws Exception {
         runSimpleConcurrentProducerConsumerTopology();
     }
 
-    @Test(timeout = 60000)
-    public void testKeyValueSupport() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testKeyValueSupport() throws Exception {
         runKeyValueTest();
     }
 
     // --- canceling / failures ---
 
-    @Test(timeout = 60000)
-    public void testCancelingEmptyTopic() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testCancelingEmptyTopic() throws Exception {
         runCancelingOnEmptyInputTest();
     }
 
-    @Test(timeout = 60000)
-    public void testCancelingFullTopic() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testCancelingFullTopic() throws Exception {
         runCancelingOnFullInputTest();
     }
 
     // --- source to partition mappings and exactly once ---
 
-    @Test(timeout = 60000)
-    public void testOneToOneSources() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testOneToOneSources() throws Exception {
         runOneToOneExactlyOnceTest();
     }
 
-    @Test(timeout = 60000)
-    public void testOneSourceMultiplePartitions() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testOneSourceMultiplePartitions() throws Exception {
         runOneSourceMultiplePartitionsExactlyOnceTest();
     }
 
-    @Test(timeout = 60000)
-    public void testMultipleSourcesOnePartition() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testMultipleSourcesOnePartition() throws Exception {
         runMultipleSourcesOnePartitionExactlyOnceTest();
     }
 
     // --- broker failure ---
 
-    @Test(timeout = 60000)
-    public void testBrokerFailure() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testBrokerFailure() throws Exception {
         runBrokerFailureTest();
     }
 
     // --- special executions ---
 
-    @Test(timeout = 60000)
-    public void testBigRecordJob() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testBigRecordJob() throws Exception {
         runBigRecordTestTopology();
     }
 
-    @Test(timeout = 60000)
-    public void testMultipleTopicsWithLegacySerializer() throws Exception {
+    @Test
+    @Timeout(60L)

Review Comment:
   time unit should be specified



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java:
##########
@@ -62,131 +63,154 @@ public static void prepare() throws Exception {
     //  Suite of Tests
     // ------------------------------------------------------------------------
 
-    @Test(timeout = 120000)
-    public void testFailOnNoBroker() throws Exception {
+    @Test
+    @Timeout(120L)
+    void testFailOnNoBroker() throws Exception {
         runFailOnNoBrokerTest();
     }
 
-    @Test(timeout = 60000)
-    public void testConcurrentProducerConsumerTopology() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testConcurrentProducerConsumerTopology() throws Exception {
         runSimpleConcurrentProducerConsumerTopology();
     }
 
-    @Test(timeout = 60000)
-    public void testKeyValueSupport() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testKeyValueSupport() throws Exception {
         runKeyValueTest();
     }
 
     // --- canceling / failures ---
 
-    @Test(timeout = 60000)
-    public void testCancelingEmptyTopic() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testCancelingEmptyTopic() throws Exception {
         runCancelingOnEmptyInputTest();
     }
 
-    @Test(timeout = 60000)
-    public void testCancelingFullTopic() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testCancelingFullTopic() throws Exception {
         runCancelingOnFullInputTest();
     }
 
     // --- source to partition mappings and exactly once ---
 
-    @Test(timeout = 60000)
-    public void testOneToOneSources() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testOneToOneSources() throws Exception {
         runOneToOneExactlyOnceTest();
     }
 
-    @Test(timeout = 60000)
-    public void testOneSourceMultiplePartitions() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testOneSourceMultiplePartitions() throws Exception {
         runOneSourceMultiplePartitionsExactlyOnceTest();
     }
 
-    @Test(timeout = 60000)
-    public void testMultipleSourcesOnePartition() throws Exception {
+    @Test
+    @Timeout(60L)

Review Comment:
   time unit should be specified



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java:
##########
@@ -62,131 +63,154 @@ public static void prepare() throws Exception {
     //  Suite of Tests
     // ------------------------------------------------------------------------
 
-    @Test(timeout = 120000)
-    public void testFailOnNoBroker() throws Exception {
+    @Test
+    @Timeout(120L)
+    void testFailOnNoBroker() throws Exception {
         runFailOnNoBrokerTest();
     }
 
-    @Test(timeout = 60000)
-    public void testConcurrentProducerConsumerTopology() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testConcurrentProducerConsumerTopology() throws Exception {
         runSimpleConcurrentProducerConsumerTopology();
     }
 
-    @Test(timeout = 60000)
-    public void testKeyValueSupport() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testKeyValueSupport() throws Exception {
         runKeyValueTest();
     }
 
     // --- canceling / failures ---
 
-    @Test(timeout = 60000)
-    public void testCancelingEmptyTopic() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testCancelingEmptyTopic() throws Exception {
         runCancelingOnEmptyInputTest();
     }
 
-    @Test(timeout = 60000)
-    public void testCancelingFullTopic() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testCancelingFullTopic() throws Exception {
         runCancelingOnFullInputTest();
     }
 
     // --- source to partition mappings and exactly once ---
 
-    @Test(timeout = 60000)
-    public void testOneToOneSources() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testOneToOneSources() throws Exception {
         runOneToOneExactlyOnceTest();
     }
 
-    @Test(timeout = 60000)
-    public void testOneSourceMultiplePartitions() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testOneSourceMultiplePartitions() throws Exception {
         runOneSourceMultiplePartitionsExactlyOnceTest();
     }
 
-    @Test(timeout = 60000)
-    public void testMultipleSourcesOnePartition() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testMultipleSourcesOnePartition() throws Exception {
         runMultipleSourcesOnePartitionExactlyOnceTest();
     }
 
     // --- broker failure ---
 
-    @Test(timeout = 60000)
-    public void testBrokerFailure() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testBrokerFailure() throws Exception {
         runBrokerFailureTest();
     }
 
     // --- special executions ---
 
-    @Test(timeout = 60000)
-    public void testBigRecordJob() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testBigRecordJob() throws Exception {
         runBigRecordTestTopology();
     }
 
-    @Test(timeout = 60000)
-    public void testMultipleTopicsWithLegacySerializer() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testMultipleTopicsWithLegacySerializer() throws Exception {
         runProduceConsumeMultipleTopics(true);
     }
 
-    @Test(timeout = 60000)
-    public void testMultipleTopicsWithKafkaSerializer() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testMultipleTopicsWithKafkaSerializer() throws Exception {
         runProduceConsumeMultipleTopics(false);
     }
 
-    @Test(timeout = 60000)
-    public void testAllDeletes() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testAllDeletes() throws Exception {
         runAllDeletesTest();
     }
 
-    @Test(timeout = 60000)
-    public void testMetricsAndEndOfStream() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testMetricsAndEndOfStream() throws Exception {
         runEndOfStreamTest();
     }
 
     // --- startup mode ---
 
-    @Test(timeout = 60000)
-    public void testStartFromEarliestOffsets() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testStartFromEarliestOffsets() throws Exception {
         runStartFromEarliestOffsets();
     }
 
-    @Test(timeout = 60000)
-    public void testStartFromLatestOffsets() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testStartFromLatestOffsets() throws Exception {
         runStartFromLatestOffsets();
     }
 
-    @Test(timeout = 60000)
-    public void testStartFromGroupOffsets() throws Exception {
+    @Test
+    @Timeout(60L)

Review Comment:
   time unit should be specified



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java:
##########
@@ -62,131 +63,154 @@ public static void prepare() throws Exception {
     //  Suite of Tests
     // ------------------------------------------------------------------------
 
-    @Test(timeout = 120000)
-    public void testFailOnNoBroker() throws Exception {
+    @Test
+    @Timeout(120L)
+    void testFailOnNoBroker() throws Exception {
         runFailOnNoBrokerTest();
     }
 
-    @Test(timeout = 60000)
-    public void testConcurrentProducerConsumerTopology() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testConcurrentProducerConsumerTopology() throws Exception {
         runSimpleConcurrentProducerConsumerTopology();
     }
 
-    @Test(timeout = 60000)
-    public void testKeyValueSupport() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testKeyValueSupport() throws Exception {
         runKeyValueTest();
     }
 
     // --- canceling / failures ---
 
-    @Test(timeout = 60000)
-    public void testCancelingEmptyTopic() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testCancelingEmptyTopic() throws Exception {
         runCancelingOnEmptyInputTest();
     }
 
-    @Test(timeout = 60000)
-    public void testCancelingFullTopic() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testCancelingFullTopic() throws Exception {
         runCancelingOnFullInputTest();
     }
 
     // --- source to partition mappings and exactly once ---
 
-    @Test(timeout = 60000)
-    public void testOneToOneSources() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testOneToOneSources() throws Exception {
         runOneToOneExactlyOnceTest();
     }
 
-    @Test(timeout = 60000)
-    public void testOneSourceMultiplePartitions() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testOneSourceMultiplePartitions() throws Exception {
         runOneSourceMultiplePartitionsExactlyOnceTest();
     }
 
-    @Test(timeout = 60000)
-    public void testMultipleSourcesOnePartition() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testMultipleSourcesOnePartition() throws Exception {
         runMultipleSourcesOnePartitionExactlyOnceTest();
     }
 
     // --- broker failure ---
 
-    @Test(timeout = 60000)
-    public void testBrokerFailure() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testBrokerFailure() throws Exception {
         runBrokerFailureTest();
     }
 
     // --- special executions ---
 
-    @Test(timeout = 60000)
-    public void testBigRecordJob() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testBigRecordJob() throws Exception {
         runBigRecordTestTopology();
     }
 
-    @Test(timeout = 60000)
-    public void testMultipleTopicsWithLegacySerializer() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testMultipleTopicsWithLegacySerializer() throws Exception {
         runProduceConsumeMultipleTopics(true);
     }
 
-    @Test(timeout = 60000)
-    public void testMultipleTopicsWithKafkaSerializer() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testMultipleTopicsWithKafkaSerializer() throws Exception {
         runProduceConsumeMultipleTopics(false);
     }
 
-    @Test(timeout = 60000)
-    public void testAllDeletes() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testAllDeletes() throws Exception {
         runAllDeletesTest();
     }
 
-    @Test(timeout = 60000)
-    public void testMetricsAndEndOfStream() throws Exception {
+    @Test
+    @Timeout(60L)

Review Comment:
   time unit should be specified



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java:
##########
@@ -62,131 +63,154 @@ public static void prepare() throws Exception {
     //  Suite of Tests
     // ------------------------------------------------------------------------
 
-    @Test(timeout = 120000)
-    public void testFailOnNoBroker() throws Exception {
+    @Test
+    @Timeout(120L)
+    void testFailOnNoBroker() throws Exception {
         runFailOnNoBrokerTest();
     }
 
-    @Test(timeout = 60000)
-    public void testConcurrentProducerConsumerTopology() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testConcurrentProducerConsumerTopology() throws Exception {
         runSimpleConcurrentProducerConsumerTopology();
     }
 
-    @Test(timeout = 60000)
-    public void testKeyValueSupport() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testKeyValueSupport() throws Exception {
         runKeyValueTest();
     }
 
     // --- canceling / failures ---
 
-    @Test(timeout = 60000)
-    public void testCancelingEmptyTopic() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testCancelingEmptyTopic() throws Exception {
         runCancelingOnEmptyInputTest();
     }
 
-    @Test(timeout = 60000)
-    public void testCancelingFullTopic() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testCancelingFullTopic() throws Exception {
         runCancelingOnFullInputTest();
     }
 
     // --- source to partition mappings and exactly once ---
 
-    @Test(timeout = 60000)
-    public void testOneToOneSources() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testOneToOneSources() throws Exception {
         runOneToOneExactlyOnceTest();
     }
 
-    @Test(timeout = 60000)
-    public void testOneSourceMultiplePartitions() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testOneSourceMultiplePartitions() throws Exception {
         runOneSourceMultiplePartitionsExactlyOnceTest();
     }
 
-    @Test(timeout = 60000)
-    public void testMultipleSourcesOnePartition() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testMultipleSourcesOnePartition() throws Exception {
         runMultipleSourcesOnePartitionExactlyOnceTest();
     }
 
     // --- broker failure ---
 
-    @Test(timeout = 60000)
-    public void testBrokerFailure() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testBrokerFailure() throws Exception {
         runBrokerFailureTest();
     }
 
     // --- special executions ---
 
-    @Test(timeout = 60000)
-    public void testBigRecordJob() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testBigRecordJob() throws Exception {
         runBigRecordTestTopology();
     }
 
-    @Test(timeout = 60000)
-    public void testMultipleTopicsWithLegacySerializer() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testMultipleTopicsWithLegacySerializer() throws Exception {
         runProduceConsumeMultipleTopics(true);
     }
 
-    @Test(timeout = 60000)
-    public void testMultipleTopicsWithKafkaSerializer() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testMultipleTopicsWithKafkaSerializer() throws Exception {
         runProduceConsumeMultipleTopics(false);
     }
 
-    @Test(timeout = 60000)
-    public void testAllDeletes() throws Exception {
+    @Test
+    @Timeout(60L)

Review Comment:
   time unit should be specified



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java:
##########
@@ -62,131 +63,154 @@ public static void prepare() throws Exception {
     //  Suite of Tests
     // ------------------------------------------------------------------------
 
-    @Test(timeout = 120000)
-    public void testFailOnNoBroker() throws Exception {
+    @Test
+    @Timeout(120L)
+    void testFailOnNoBroker() throws Exception {
         runFailOnNoBrokerTest();
     }
 
-    @Test(timeout = 60000)
-    public void testConcurrentProducerConsumerTopology() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testConcurrentProducerConsumerTopology() throws Exception {
         runSimpleConcurrentProducerConsumerTopology();
     }
 
-    @Test(timeout = 60000)
-    public void testKeyValueSupport() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testKeyValueSupport() throws Exception {
         runKeyValueTest();
     }
 
     // --- canceling / failures ---
 
-    @Test(timeout = 60000)
-    public void testCancelingEmptyTopic() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testCancelingEmptyTopic() throws Exception {
         runCancelingOnEmptyInputTest();
     }
 
-    @Test(timeout = 60000)
-    public void testCancelingFullTopic() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testCancelingFullTopic() throws Exception {
         runCancelingOnFullInputTest();
     }
 
     // --- source to partition mappings and exactly once ---
 
-    @Test(timeout = 60000)
-    public void testOneToOneSources() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testOneToOneSources() throws Exception {
         runOneToOneExactlyOnceTest();
     }
 
-    @Test(timeout = 60000)
-    public void testOneSourceMultiplePartitions() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testOneSourceMultiplePartitions() throws Exception {
         runOneSourceMultiplePartitionsExactlyOnceTest();
     }
 
-    @Test(timeout = 60000)
-    public void testMultipleSourcesOnePartition() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testMultipleSourcesOnePartition() throws Exception {
         runMultipleSourcesOnePartitionExactlyOnceTest();
     }
 
     // --- broker failure ---
 
-    @Test(timeout = 60000)
-    public void testBrokerFailure() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testBrokerFailure() throws Exception {
         runBrokerFailureTest();
     }
 
     // --- special executions ---
 
-    @Test(timeout = 60000)
-    public void testBigRecordJob() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testBigRecordJob() throws Exception {
         runBigRecordTestTopology();
     }
 
-    @Test(timeout = 60000)
-    public void testMultipleTopicsWithLegacySerializer() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testMultipleTopicsWithLegacySerializer() throws Exception {
         runProduceConsumeMultipleTopics(true);
     }
 
-    @Test(timeout = 60000)
-    public void testMultipleTopicsWithKafkaSerializer() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testMultipleTopicsWithKafkaSerializer() throws Exception {
         runProduceConsumeMultipleTopics(false);
     }
 
-    @Test(timeout = 60000)
-    public void testAllDeletes() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testAllDeletes() throws Exception {
         runAllDeletesTest();
     }
 
-    @Test(timeout = 60000)
-    public void testMetricsAndEndOfStream() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testMetricsAndEndOfStream() throws Exception {
         runEndOfStreamTest();
     }
 
     // --- startup mode ---
 
-    @Test(timeout = 60000)
-    public void testStartFromEarliestOffsets() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testStartFromEarliestOffsets() throws Exception {
         runStartFromEarliestOffsets();
     }
 
-    @Test(timeout = 60000)
-    public void testStartFromLatestOffsets() throws Exception {
+    @Test
+    @Timeout(60L)

Review Comment:
   time unit should be specified



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java:
##########
@@ -62,131 +63,154 @@ public static void prepare() throws Exception {
     //  Suite of Tests
     // ------------------------------------------------------------------------
 
-    @Test(timeout = 120000)
-    public void testFailOnNoBroker() throws Exception {
+    @Test
+    @Timeout(120L)
+    void testFailOnNoBroker() throws Exception {
         runFailOnNoBrokerTest();
     }
 
-    @Test(timeout = 60000)
-    public void testConcurrentProducerConsumerTopology() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testConcurrentProducerConsumerTopology() throws Exception {
         runSimpleConcurrentProducerConsumerTopology();
     }
 
-    @Test(timeout = 60000)
-    public void testKeyValueSupport() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testKeyValueSupport() throws Exception {
         runKeyValueTest();
     }
 
     // --- canceling / failures ---
 
-    @Test(timeout = 60000)
-    public void testCancelingEmptyTopic() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testCancelingEmptyTopic() throws Exception {
         runCancelingOnEmptyInputTest();
     }
 
-    @Test(timeout = 60000)
-    public void testCancelingFullTopic() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testCancelingFullTopic() throws Exception {
         runCancelingOnFullInputTest();
     }
 
     // --- source to partition mappings and exactly once ---
 
-    @Test(timeout = 60000)
-    public void testOneToOneSources() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testOneToOneSources() throws Exception {
         runOneToOneExactlyOnceTest();
     }
 
-    @Test(timeout = 60000)
-    public void testOneSourceMultiplePartitions() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testOneSourceMultiplePartitions() throws Exception {
         runOneSourceMultiplePartitionsExactlyOnceTest();
     }
 
-    @Test(timeout = 60000)
-    public void testMultipleSourcesOnePartition() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testMultipleSourcesOnePartition() throws Exception {
         runMultipleSourcesOnePartitionExactlyOnceTest();
     }
 
     // --- broker failure ---
 
-    @Test(timeout = 60000)
-    public void testBrokerFailure() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testBrokerFailure() throws Exception {
         runBrokerFailureTest();
     }
 
     // --- special executions ---
 
-    @Test(timeout = 60000)
-    public void testBigRecordJob() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testBigRecordJob() throws Exception {
         runBigRecordTestTopology();
     }
 
-    @Test(timeout = 60000)
-    public void testMultipleTopicsWithLegacySerializer() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testMultipleTopicsWithLegacySerializer() throws Exception {
         runProduceConsumeMultipleTopics(true);
     }
 
-    @Test(timeout = 60000)
-    public void testMultipleTopicsWithKafkaSerializer() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testMultipleTopicsWithKafkaSerializer() throws Exception {
         runProduceConsumeMultipleTopics(false);
     }
 
-    @Test(timeout = 60000)
-    public void testAllDeletes() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testAllDeletes() throws Exception {
         runAllDeletesTest();
     }
 
-    @Test(timeout = 60000)
-    public void testMetricsAndEndOfStream() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testMetricsAndEndOfStream() throws Exception {
         runEndOfStreamTest();
     }
 
     // --- startup mode ---
 
-    @Test(timeout = 60000)
-    public void testStartFromEarliestOffsets() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testStartFromEarliestOffsets() throws Exception {
         runStartFromEarliestOffsets();
     }
 
-    @Test(timeout = 60000)
-    public void testStartFromLatestOffsets() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testStartFromLatestOffsets() throws Exception {
         runStartFromLatestOffsets();
     }
 
-    @Test(timeout = 60000)
-    public void testStartFromGroupOffsets() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testStartFromGroupOffsets() throws Exception {
         runStartFromGroupOffsets();
     }
 
-    @Test(timeout = 60000)
-    public void testStartFromSpecificOffsets() throws Exception {
+    @Test
+    @Timeout(60L)

Review Comment:
   time unit should be specified



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaMigrationTestBase.java:
##########
@@ -79,20 +77,20 @@ public String getOperatorSnapshotPath(FlinkVersion version) {
      * Override {@link KafkaTestBase}. Kafka Migration Tests are starting up Kafka/ZooKeeper cluster
      * manually
      */
-    @BeforeClass
+    @BeforeAll
     public static void prepare() throws Exception {}
 
     /**
      * Override {@link KafkaTestBase}. Kafka Migration Tests are starting up Kafka/ZooKeeper cluster
      * manually
      */
-    @AfterClass
+    @AfterAll
     public static void shutDownServices() throws Exception {}

Review Comment:
   could be package-private or protected, I guess
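   For context, a hedged sketch of the suggestion: JUnit 5 only requires `@BeforeAll`/`@AfterAll` methods to be static and non-private, so package-private is enough in a standalone class. Whether visibility can actually be lowered here depends on how `KafkaTestBase` declares the methods being hidden (Java does not allow weakening access when hiding or overriding); the class name below is illustrative only.

```java
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;

// Standalone sketch: JUnit 5 lifecycle methods need to be static and non-private, nothing more.
abstract class MigrationLifecycleSketch {

    @BeforeAll
    static void prepare() throws Exception {}

    @AfterAll
    static void shutDownServices() throws Exception {}
}
```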



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java:
##########
@@ -62,131 +63,154 @@ public static void prepare() throws Exception {
     //  Suite of Tests
     // ------------------------------------------------------------------------
 
-    @Test(timeout = 120000)
-    public void testFailOnNoBroker() throws Exception {
+    @Test
+    @Timeout(120L)
+    void testFailOnNoBroker() throws Exception {
         runFailOnNoBrokerTest();
     }
 
-    @Test(timeout = 60000)
-    public void testConcurrentProducerConsumerTopology() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testConcurrentProducerConsumerTopology() throws Exception {
         runSimpleConcurrentProducerConsumerTopology();
     }
 
-    @Test(timeout = 60000)
-    public void testKeyValueSupport() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testKeyValueSupport() throws Exception {
         runKeyValueTest();
     }
 
     // --- canceling / failures ---
 
-    @Test(timeout = 60000)
-    public void testCancelingEmptyTopic() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testCancelingEmptyTopic() throws Exception {
         runCancelingOnEmptyInputTest();
     }
 
-    @Test(timeout = 60000)
-    public void testCancelingFullTopic() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testCancelingFullTopic() throws Exception {
         runCancelingOnFullInputTest();
     }
 
     // --- source to partition mappings and exactly once ---
 
-    @Test(timeout = 60000)
-    public void testOneToOneSources() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testOneToOneSources() throws Exception {
         runOneToOneExactlyOnceTest();
     }
 
-    @Test(timeout = 60000)
-    public void testOneSourceMultiplePartitions() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testOneSourceMultiplePartitions() throws Exception {
         runOneSourceMultiplePartitionsExactlyOnceTest();
     }
 
-    @Test(timeout = 60000)
-    public void testMultipleSourcesOnePartition() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testMultipleSourcesOnePartition() throws Exception {
         runMultipleSourcesOnePartitionExactlyOnceTest();
     }
 
     // --- broker failure ---
 
-    @Test(timeout = 60000)
-    public void testBrokerFailure() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testBrokerFailure() throws Exception {
         runBrokerFailureTest();
     }
 
     // --- special executions ---
 
-    @Test(timeout = 60000)
-    public void testBigRecordJob() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testBigRecordJob() throws Exception {
         runBigRecordTestTopology();
     }
 
-    @Test(timeout = 60000)
-    public void testMultipleTopicsWithLegacySerializer() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testMultipleTopicsWithLegacySerializer() throws Exception {
         runProduceConsumeMultipleTopics(true);
     }
 
-    @Test(timeout = 60000)
-    public void testMultipleTopicsWithKafkaSerializer() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testMultipleTopicsWithKafkaSerializer() throws Exception {
         runProduceConsumeMultipleTopics(false);
     }
 
-    @Test(timeout = 60000)
-    public void testAllDeletes() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testAllDeletes() throws Exception {
         runAllDeletesTest();
     }
 
-    @Test(timeout = 60000)
-    public void testMetricsAndEndOfStream() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testMetricsAndEndOfStream() throws Exception {
         runEndOfStreamTest();
     }
 
     // --- startup mode ---
 
-    @Test(timeout = 60000)
-    public void testStartFromEarliestOffsets() throws Exception {
+    @Test
+    @Timeout(60L)

Review Comment:
   time unit should be specified



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java:
##########
@@ -65,16 +68,16 @@ public class KafkaShortRetentionTestBase implements Serializable {
     private static KafkaTestEnvironment kafkaServer;
     private static Properties standardProps;
 
-    @ClassRule
-    public static MiniClusterWithClientResource flink =
-            new MiniClusterWithClientResource(
+    @RegisterExtension
+    public static MiniClusterExtension flink =
+            new MiniClusterExtension(
                     new MiniClusterResourceConfiguration.Builder()
                             .setConfiguration(getConfiguration())
                             .setNumberTaskManagers(NUM_TMS)
                             .setNumberSlotsPerTaskManager(TM_SLOTS)
                             .build());
 
-    @ClassRule public static TemporaryFolder tempFolder = new TemporaryFolder();
+    @TempDir public static Path tempFolder;

Review Comment:
   do we need it `public`?
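   A hedged sketch of the non-public variant: JUnit 5 injects `@TempDir` into any field that is non-private and non-final, so package-private works as far as JUnit is concerned (if other test classes read `tempFolder` across packages it would still need to stay public). The class name below is illustrative only.

```java
import java.nio.file.Path;

import org.junit.jupiter.api.io.TempDir;

// Standalone sketch: @TempDir fields only have to be non-private and non-final.
class TempDirVisibilitySketch {

    @TempDir
    static Path tempFolder;
}
```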



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java:
##########
@@ -84,7 +87,7 @@ private static Configuration getConfiguration() {
         return flinkConfig;
     }
 
-    @BeforeClass
+    @BeforeAll
     public static void prepare() throws Exception {

Review Comment:
   do we need it `public`



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java:
##########
@@ -62,131 +63,154 @@ public static void prepare() throws Exception {
     //  Suite of Tests
     // ------------------------------------------------------------------------
 
-    @Test(timeout = 120000)
-    public void testFailOnNoBroker() throws Exception {
+    @Test
+    @Timeout(120L)
+    void testFailOnNoBroker() throws Exception {
         runFailOnNoBrokerTest();
     }
 
-    @Test(timeout = 60000)
-    public void testConcurrentProducerConsumerTopology() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testConcurrentProducerConsumerTopology() throws Exception {
         runSimpleConcurrentProducerConsumerTopology();
     }
 
-    @Test(timeout = 60000)
-    public void testKeyValueSupport() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testKeyValueSupport() throws Exception {
         runKeyValueTest();
     }
 
     // --- canceling / failures ---
 
-    @Test(timeout = 60000)
-    public void testCancelingEmptyTopic() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testCancelingEmptyTopic() throws Exception {
         runCancelingOnEmptyInputTest();
     }
 
-    @Test(timeout = 60000)
-    public void testCancelingFullTopic() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testCancelingFullTopic() throws Exception {
         runCancelingOnFullInputTest();
     }
 
     // --- source to partition mappings and exactly once ---
 
-    @Test(timeout = 60000)
-    public void testOneToOneSources() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testOneToOneSources() throws Exception {
         runOneToOneExactlyOnceTest();
     }
 
-    @Test(timeout = 60000)
-    public void testOneSourceMultiplePartitions() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testOneSourceMultiplePartitions() throws Exception {
         runOneSourceMultiplePartitionsExactlyOnceTest();
     }
 
-    @Test(timeout = 60000)
-    public void testMultipleSourcesOnePartition() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testMultipleSourcesOnePartition() throws Exception {
         runMultipleSourcesOnePartitionExactlyOnceTest();
     }
 
     // --- broker failure ---
 
-    @Test(timeout = 60000)
-    public void testBrokerFailure() throws Exception {
+    @Test
+    @Timeout(60L)

Review Comment:
   time unit should be specified



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java:
##########
@@ -92,17 +93,15 @@ public abstract class KafkaTestBase extends TestLogger {
 
     public static KafkaTestEnvironment kafkaServer;
 
-    @ClassRule public static TemporaryFolder tempFolder = new TemporaryFolder();
+    @TempDir public static Path tempFolder;
 
     public static Properties secureProps = new Properties();
 
-    @Rule public final RetryRule retryRule = new RetryRule();
-
     // ------------------------------------------------------------------------
     //  Setup and teardown of the mini clusters
     // ------------------------------------------------------------------------
 
-    @BeforeClass
+    @BeforeAll
     public static void prepare() throws Exception {

Review Comment:
   do we need to have it `public`?



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java:
##########
@@ -62,131 +63,154 @@ public static void prepare() throws Exception {
     //  Suite of Tests
     // ------------------------------------------------------------------------
 
-    @Test(timeout = 120000)
-    public void testFailOnNoBroker() throws Exception {
+    @Test
+    @Timeout(120L)
+    void testFailOnNoBroker() throws Exception {
         runFailOnNoBrokerTest();
     }
 
-    @Test(timeout = 60000)
-    public void testConcurrentProducerConsumerTopology() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testConcurrentProducerConsumerTopology() throws Exception {
         runSimpleConcurrentProducerConsumerTopology();
     }
 
-    @Test(timeout = 60000)
-    public void testKeyValueSupport() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testKeyValueSupport() throws Exception {
         runKeyValueTest();
     }
 
     // --- canceling / failures ---
 
-    @Test(timeout = 60000)
-    public void testCancelingEmptyTopic() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testCancelingEmptyTopic() throws Exception {
         runCancelingOnEmptyInputTest();
     }
 
-    @Test(timeout = 60000)
-    public void testCancelingFullTopic() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testCancelingFullTopic() throws Exception {
         runCancelingOnFullInputTest();
     }
 
     // --- source to partition mappings and exactly once ---
 
-    @Test(timeout = 60000)
-    public void testOneToOneSources() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testOneToOneSources() throws Exception {
         runOneToOneExactlyOnceTest();
     }
 
-    @Test(timeout = 60000)
-    public void testOneSourceMultiplePartitions() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testOneSourceMultiplePartitions() throws Exception {
         runOneSourceMultiplePartitionsExactlyOnceTest();
     }
 
-    @Test(timeout = 60000)
-    public void testMultipleSourcesOnePartition() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testMultipleSourcesOnePartition() throws Exception {
         runMultipleSourcesOnePartitionExactlyOnceTest();
     }
 
     // --- broker failure ---
 
-    @Test(timeout = 60000)
-    public void testBrokerFailure() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testBrokerFailure() throws Exception {
         runBrokerFailureTest();
     }
 
     // --- special executions ---
 
-    @Test(timeout = 60000)
-    public void testBigRecordJob() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testBigRecordJob() throws Exception {
         runBigRecordTestTopology();
     }
 
-    @Test(timeout = 60000)
-    public void testMultipleTopicsWithLegacySerializer() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testMultipleTopicsWithLegacySerializer() throws Exception {
         runProduceConsumeMultipleTopics(true);
     }
 
-    @Test(timeout = 60000)
-    public void testMultipleTopicsWithKafkaSerializer() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testMultipleTopicsWithKafkaSerializer() throws Exception {
         runProduceConsumeMultipleTopics(false);
     }
 
-    @Test(timeout = 60000)
-    public void testAllDeletes() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testAllDeletes() throws Exception {
         runAllDeletesTest();
     }
 
-    @Test(timeout = 60000)
-    public void testMetricsAndEndOfStream() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testMetricsAndEndOfStream() throws Exception {
         runEndOfStreamTest();
     }
 
     // --- startup mode ---
 
-    @Test(timeout = 60000)
-    public void testStartFromEarliestOffsets() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testStartFromEarliestOffsets() throws Exception {
         runStartFromEarliestOffsets();
     }
 
-    @Test(timeout = 60000)
-    public void testStartFromLatestOffsets() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testStartFromLatestOffsets() throws Exception {
         runStartFromLatestOffsets();
     }
 
-    @Test(timeout = 60000)
-    public void testStartFromGroupOffsets() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testStartFromGroupOffsets() throws Exception {
         runStartFromGroupOffsets();
     }
 
-    @Test(timeout = 60000)
-    public void testStartFromSpecificOffsets() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testStartFromSpecificOffsets() throws Exception {
         runStartFromSpecificOffsets();
     }
 
-    @Test(timeout = 60000)
-    public void testStartFromTimestamp() throws Exception {
+    @Test
+    @Timeout(60L)

Review Comment:
   time unit should be specified



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java:
##########
@@ -62,131 +63,154 @@ public static void prepare() throws Exception {
     //  Suite of Tests
     // ------------------------------------------------------------------------
 
-    @Test(timeout = 120000)
-    public void testFailOnNoBroker() throws Exception {
+    @Test
+    @Timeout(120L)
+    void testFailOnNoBroker() throws Exception {
         runFailOnNoBrokerTest();
     }
 
-    @Test(timeout = 60000)
-    public void testConcurrentProducerConsumerTopology() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testConcurrentProducerConsumerTopology() throws Exception {
         runSimpleConcurrentProducerConsumerTopology();
     }
 
-    @Test(timeout = 60000)
-    public void testKeyValueSupport() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testKeyValueSupport() throws Exception {
         runKeyValueTest();
     }
 
     // --- canceling / failures ---
 
-    @Test(timeout = 60000)
-    public void testCancelingEmptyTopic() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testCancelingEmptyTopic() throws Exception {
         runCancelingOnEmptyInputTest();
     }
 
-    @Test(timeout = 60000)
-    public void testCancelingFullTopic() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testCancelingFullTopic() throws Exception {
         runCancelingOnFullInputTest();
     }
 
     // --- source to partition mappings and exactly once ---
 
-    @Test(timeout = 60000)
-    public void testOneToOneSources() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testOneToOneSources() throws Exception {
         runOneToOneExactlyOnceTest();
     }
 
-    @Test(timeout = 60000)
-    public void testOneSourceMultiplePartitions() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testOneSourceMultiplePartitions() throws Exception {
         runOneSourceMultiplePartitionsExactlyOnceTest();
     }
 
-    @Test(timeout = 60000)
-    public void testMultipleSourcesOnePartition() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testMultipleSourcesOnePartition() throws Exception {
         runMultipleSourcesOnePartitionExactlyOnceTest();
     }
 
     // --- broker failure ---
 
-    @Test(timeout = 60000)
-    public void testBrokerFailure() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testBrokerFailure() throws Exception {
         runBrokerFailureTest();
     }
 
     // --- special executions ---
 
-    @Test(timeout = 60000)
-    public void testBigRecordJob() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testBigRecordJob() throws Exception {
         runBigRecordTestTopology();
     }
 
-    @Test(timeout = 60000)
-    public void testMultipleTopicsWithLegacySerializer() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testMultipleTopicsWithLegacySerializer() throws Exception {
         runProduceConsumeMultipleTopics(true);
     }
 
-    @Test(timeout = 60000)
-    public void testMultipleTopicsWithKafkaSerializer() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testMultipleTopicsWithKafkaSerializer() throws Exception {
         runProduceConsumeMultipleTopics(false);
     }
 
-    @Test(timeout = 60000)
-    public void testAllDeletes() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testAllDeletes() throws Exception {
         runAllDeletesTest();
     }
 
-    @Test(timeout = 60000)
-    public void testMetricsAndEndOfStream() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testMetricsAndEndOfStream() throws Exception {
         runEndOfStreamTest();
     }
 
     // --- startup mode ---
 
-    @Test(timeout = 60000)
-    public void testStartFromEarliestOffsets() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testStartFromEarliestOffsets() throws Exception {
         runStartFromEarliestOffsets();
     }
 
-    @Test(timeout = 60000)
-    public void testStartFromLatestOffsets() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testStartFromLatestOffsets() throws Exception {
         runStartFromLatestOffsets();
     }
 
-    @Test(timeout = 60000)
-    public void testStartFromGroupOffsets() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testStartFromGroupOffsets() throws Exception {
         runStartFromGroupOffsets();
     }
 
-    @Test(timeout = 60000)
-    public void testStartFromSpecificOffsets() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testStartFromSpecificOffsets() throws Exception {
         runStartFromSpecificOffsets();
     }
 
-    @Test(timeout = 60000)
-    public void testStartFromTimestamp() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testStartFromTimestamp() throws Exception {
         runStartFromTimestamp();
     }
 
     // --- offset committing ---
 
-    @Test(timeout = 60000)
-    public void testCommitOffsetsToKafka() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testCommitOffsetsToKafka() throws Exception {
         runCommitOffsetsToKafka();
     }
 
-    @Test(timeout = 60000)
-    public void testAutoOffsetRetrievalAndCommitToKafka() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testAutoOffsetRetrievalAndCommitToKafka() throws Exception {
         runAutoOffsetRetrievalAndCommitToKafka();
     }
 
-    @Test(timeout = 60000)
-    public void testCollectingSchema() throws Exception {
+    @Test
+    @Timeout(60L)

Review Comment:
   time unit should be specified



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java:
##########
@@ -92,17 +93,15 @@ public abstract class KafkaTestBase extends TestLogger {
 
     public static KafkaTestEnvironment kafkaServer;
 
-    @ClassRule public static TemporaryFolder tempFolder = new TemporaryFolder();
+    @TempDir public static Path tempFolder;

Review Comment:
   do we need to have it `public`?



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaMigrationTestBase.java:
##########
@@ -79,20 +77,20 @@ public String getOperatorSnapshotPath(FlinkVersion version) {
      * Override {@link KafkaTestBase}. Kafka Migration Tests are starting up Kafka/ZooKeeper cluster
      * manually
      */
-    @BeforeClass
+    @BeforeAll
     public static void prepare() throws Exception {}

Review Comment:
   could be package-private or protected, I guess



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java:
##########
@@ -62,131 +63,154 @@ public static void prepare() throws Exception {
     //  Suite of Tests
     // ------------------------------------------------------------------------
 
-    @Test(timeout = 120000)
-    public void testFailOnNoBroker() throws Exception {
+    @Test
+    @Timeout(120L)
+    void testFailOnNoBroker() throws Exception {
         runFailOnNoBrokerTest();
     }
 
-    @Test(timeout = 60000)
-    public void testConcurrentProducerConsumerTopology() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testConcurrentProducerConsumerTopology() throws Exception {
         runSimpleConcurrentProducerConsumerTopology();
     }
 
-    @Test(timeout = 60000)
-    public void testKeyValueSupport() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testKeyValueSupport() throws Exception {
         runKeyValueTest();
     }
 
     // --- canceling / failures ---
 
-    @Test(timeout = 60000)
-    public void testCancelingEmptyTopic() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testCancelingEmptyTopic() throws Exception {
         runCancelingOnEmptyInputTest();
     }
 
-    @Test(timeout = 60000)
-    public void testCancelingFullTopic() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testCancelingFullTopic() throws Exception {
         runCancelingOnFullInputTest();
     }
 
     // --- source to partition mappings and exactly once ---
 
-    @Test(timeout = 60000)
-    public void testOneToOneSources() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testOneToOneSources() throws Exception {
         runOneToOneExactlyOnceTest();
     }
 
-    @Test(timeout = 60000)
-    public void testOneSourceMultiplePartitions() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testOneSourceMultiplePartitions() throws Exception {
         runOneSourceMultiplePartitionsExactlyOnceTest();
     }
 
-    @Test(timeout = 60000)
-    public void testMultipleSourcesOnePartition() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testMultipleSourcesOnePartition() throws Exception {
         runMultipleSourcesOnePartitionExactlyOnceTest();
     }
 
     // --- broker failure ---
 
-    @Test(timeout = 60000)
-    public void testBrokerFailure() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testBrokerFailure() throws Exception {
         runBrokerFailureTest();
     }
 
     // --- special executions ---
 
-    @Test(timeout = 60000)
-    public void testBigRecordJob() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testBigRecordJob() throws Exception {
         runBigRecordTestTopology();
     }
 
-    @Test(timeout = 60000)
-    public void testMultipleTopicsWithLegacySerializer() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testMultipleTopicsWithLegacySerializer() throws Exception {
         runProduceConsumeMultipleTopics(true);
     }
 
-    @Test(timeout = 60000)
-    public void testMultipleTopicsWithKafkaSerializer() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testMultipleTopicsWithKafkaSerializer() throws Exception {
         runProduceConsumeMultipleTopics(false);
     }
 
-    @Test(timeout = 60000)
-    public void testAllDeletes() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testAllDeletes() throws Exception {
         runAllDeletesTest();
     }
 
-    @Test(timeout = 60000)
-    public void testMetricsAndEndOfStream() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testMetricsAndEndOfStream() throws Exception {
         runEndOfStreamTest();
     }
 
     // --- startup mode ---
 
-    @Test(timeout = 60000)
-    public void testStartFromEarliestOffsets() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testStartFromEarliestOffsets() throws Exception {
         runStartFromEarliestOffsets();
     }
 
-    @Test(timeout = 60000)
-    public void testStartFromLatestOffsets() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testStartFromLatestOffsets() throws Exception {
         runStartFromLatestOffsets();
     }
 
-    @Test(timeout = 60000)
-    public void testStartFromGroupOffsets() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testStartFromGroupOffsets() throws Exception {
         runStartFromGroupOffsets();
     }
 
-    @Test(timeout = 60000)
-    public void testStartFromSpecificOffsets() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testStartFromSpecificOffsets() throws Exception {
         runStartFromSpecificOffsets();
     }
 
-    @Test(timeout = 60000)
-    public void testStartFromTimestamp() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testStartFromTimestamp() throws Exception {
         runStartFromTimestamp();
     }
 
     // --- offset committing ---
 
-    @Test(timeout = 60000)
-    public void testCommitOffsetsToKafka() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testCommitOffsetsToKafka() throws Exception {
         runCommitOffsetsToKafka();
     }
 
-    @Test(timeout = 60000)
-    public void testAutoOffsetRetrievalAndCommitToKafka() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testAutoOffsetRetrievalAndCommitToKafka() throws Exception {
         runAutoOffsetRetrievalAndCommitToKafka();
     }
 
-    @Test(timeout = 60000)
-    public void testCollectingSchema() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testCollectingSchema() throws Exception {
         runCollectingSchemaTest();
     }
 
     /** Kafka 20 specific test, ensuring Timestamps are properly written to and read from Kafka. */
-    @Test(timeout = 60000)
-    public void testTimestamps() throws Exception {
+    @Test
+    @Timeout(60L)

Review Comment:
   time unit should be specified



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java:
##########
@@ -62,131 +63,154 @@ public static void prepare() throws Exception {
     //  Suite of Tests
     // ------------------------------------------------------------------------
 
-    @Test(timeout = 120000)
-    public void testFailOnNoBroker() throws Exception {
+    @Test
+    @Timeout(120L)
+    void testFailOnNoBroker() throws Exception {
         runFailOnNoBrokerTest();
     }
 
-    @Test(timeout = 60000)
-    public void testConcurrentProducerConsumerTopology() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testConcurrentProducerConsumerTopology() throws Exception {
         runSimpleConcurrentProducerConsumerTopology();
     }
 
-    @Test(timeout = 60000)
-    public void testKeyValueSupport() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testKeyValueSupport() throws Exception {
         runKeyValueTest();
     }
 
     // --- canceling / failures ---
 
-    @Test(timeout = 60000)
-    public void testCancelingEmptyTopic() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testCancelingEmptyTopic() throws Exception {
         runCancelingOnEmptyInputTest();
     }
 
-    @Test(timeout = 60000)
-    public void testCancelingFullTopic() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testCancelingFullTopic() throws Exception {
         runCancelingOnFullInputTest();
     }
 
     // --- source to partition mappings and exactly once ---
 
-    @Test(timeout = 60000)
-    public void testOneToOneSources() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testOneToOneSources() throws Exception {
         runOneToOneExactlyOnceTest();
     }
 
-    @Test(timeout = 60000)
-    public void testOneSourceMultiplePartitions() throws Exception {
+    @Test
+    @Timeout(60L)

Review Comment:
   time unit should be specified



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java:
##########
@@ -62,131 +63,154 @@ public static void prepare() throws Exception {
     //  Suite of Tests
     // ------------------------------------------------------------------------
 
-    @Test(timeout = 120000)
-    public void testFailOnNoBroker() throws Exception {
+    @Test
+    @Timeout(120L)
+    void testFailOnNoBroker() throws Exception {
         runFailOnNoBrokerTest();
     }
 
-    @Test(timeout = 60000)
-    public void testConcurrentProducerConsumerTopology() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testConcurrentProducerConsumerTopology() throws Exception {
         runSimpleConcurrentProducerConsumerTopology();
     }
 
-    @Test(timeout = 60000)
-    public void testKeyValueSupport() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testKeyValueSupport() throws Exception {
         runKeyValueTest();
     }
 
     // --- canceling / failures ---
 
-    @Test(timeout = 60000)
-    public void testCancelingEmptyTopic() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testCancelingEmptyTopic() throws Exception {
         runCancelingOnEmptyInputTest();
     }
 
-    @Test(timeout = 60000)
-    public void testCancelingFullTopic() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testCancelingFullTopic() throws Exception {
         runCancelingOnFullInputTest();
     }
 
     // --- source to partition mappings and exactly once ---
 
-    @Test(timeout = 60000)
-    public void testOneToOneSources() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testOneToOneSources() throws Exception {
         runOneToOneExactlyOnceTest();
     }
 
-    @Test(timeout = 60000)
-    public void testOneSourceMultiplePartitions() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testOneSourceMultiplePartitions() throws Exception {
         runOneSourceMultiplePartitionsExactlyOnceTest();
     }
 
-    @Test(timeout = 60000)
-    public void testMultipleSourcesOnePartition() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testMultipleSourcesOnePartition() throws Exception {
         runMultipleSourcesOnePartitionExactlyOnceTest();
     }
 
     // --- broker failure ---
 
-    @Test(timeout = 60000)
-    public void testBrokerFailure() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testBrokerFailure() throws Exception {
         runBrokerFailureTest();
     }
 
     // --- special executions ---
 
-    @Test(timeout = 60000)
-    public void testBigRecordJob() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testBigRecordJob() throws Exception {
         runBigRecordTestTopology();
     }
 
-    @Test(timeout = 60000)
-    public void testMultipleTopicsWithLegacySerializer() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testMultipleTopicsWithLegacySerializer() throws Exception {
         runProduceConsumeMultipleTopics(true);
     }
 
-    @Test(timeout = 60000)
-    public void testMultipleTopicsWithKafkaSerializer() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testMultipleTopicsWithKafkaSerializer() throws Exception {
         runProduceConsumeMultipleTopics(false);
     }
 
-    @Test(timeout = 60000)
-    public void testAllDeletes() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testAllDeletes() throws Exception {
         runAllDeletesTest();
     }
 
-    @Test(timeout = 60000)
-    public void testMetricsAndEndOfStream() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testMetricsAndEndOfStream() throws Exception {
         runEndOfStreamTest();
     }
 
     // --- startup mode ---
 
-    @Test(timeout = 60000)
-    public void testStartFromEarliestOffsets() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testStartFromEarliestOffsets() throws Exception {
         runStartFromEarliestOffsets();
     }
 
-    @Test(timeout = 60000)
-    public void testStartFromLatestOffsets() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testStartFromLatestOffsets() throws Exception {
         runStartFromLatestOffsets();
     }
 
-    @Test(timeout = 60000)
-    public void testStartFromGroupOffsets() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testStartFromGroupOffsets() throws Exception {
         runStartFromGroupOffsets();
     }
 
-    @Test(timeout = 60000)
-    public void testStartFromSpecificOffsets() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testStartFromSpecificOffsets() throws Exception {
         runStartFromSpecificOffsets();
     }
 
-    @Test(timeout = 60000)
-    public void testStartFromTimestamp() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testStartFromTimestamp() throws Exception {
         runStartFromTimestamp();
     }
 
     // --- offset committing ---
 
-    @Test(timeout = 60000)
-    public void testCommitOffsetsToKafka() throws Exception {
+    @Test
+    @Timeout(60L)

Review Comment:
   time unit should be specified



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java:
##########
@@ -79,7 +79,8 @@
  */
 @SuppressWarnings("serial")
 @RetryOnFailure(times = 3)

Review Comment:
   yes, it makes sense
   it will make it simpler to remove the JUnit 4-related stuff



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerAtLeastOnceITCase.java:
##########
@@ -18,13 +18,13 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
-import org.junit.BeforeClass;
+import org.junit.jupiter.api.BeforeAll;
 
 /** IT cases for the {@link FlinkKafkaProducer}. */
 @SuppressWarnings("serial")
 public class KafkaProducerAtLeastOnceITCase extends KafkaProducerTestBase {
 
-    @BeforeClass
+    @BeforeAll
     public static void prepare() throws Exception {

Review Comment:
   could be package private
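   for illustration, a minimal sketch of the reduced visibility (JUnit 5 does not require lifecycle methods or test classes to be `public`; the method body is elided here):

   ```java
   import org.junit.jupiter.api.BeforeAll;

   class KafkaProducerAtLeastOnceITCase extends KafkaProducerTestBase {

       // Package-private is sufficient for JUnit 5 lifecycle methods.
       @BeforeAll
       static void prepare() throws Exception {
           // same setup as before
       }
   }
   ```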



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java:
##########
@@ -113,11 +116,14 @@ public static void prepare() throws Exception {
         standardProps = kafkaServer.getStandardProperties();
     }
 
-    @AfterClass
-    public static void shutDownServices() throws Exception {
+    @AfterAll
+    public static void shutDownServices(@InjectMiniCluster MiniCluster miniCluster)
+            throws Exception {
         kafkaServer.shutdown();
 
         secureProps.clear();
+
+        miniCluster.close();

Review Comment:
   why should we start to close something if we didn't open it?
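   for illustration, the shutdown hook could tear down only what this test base opened itself and leave the injected cluster alone (a sketch based on the hunk above; it assumes the MiniClusterExtension owns the cluster's lifecycle):

   ```java
   @AfterAll
   static void shutDownServices() throws Exception {
       // Only clean up resources this base class started itself;
       // the MiniCluster is started and stopped by its extension, not here.
       kafkaServer.shutdown();

       secureProps.clear();
   }
   ```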



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java:
##########
@@ -62,131 +63,154 @@ public static void prepare() throws Exception {
     //  Suite of Tests
     // ------------------------------------------------------------------------
 
-    @Test(timeout = 120000)
-    public void testFailOnNoBroker() throws Exception {
+    @Test
+    @Timeout(120L)
+    void testFailOnNoBroker() throws Exception {
         runFailOnNoBrokerTest();
     }
 
-    @Test(timeout = 60000)
-    public void testConcurrentProducerConsumerTopology() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testConcurrentProducerConsumerTopology() throws Exception {
         runSimpleConcurrentProducerConsumerTopology();
     }
 
-    @Test(timeout = 60000)
-    public void testKeyValueSupport() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testKeyValueSupport() throws Exception {
         runKeyValueTest();
     }
 
     // --- canceling / failures ---
 
-    @Test(timeout = 60000)
-    public void testCancelingEmptyTopic() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testCancelingEmptyTopic() throws Exception {
         runCancelingOnEmptyInputTest();
     }
 
-    @Test(timeout = 60000)
-    public void testCancelingFullTopic() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testCancelingFullTopic() throws Exception {
         runCancelingOnFullInputTest();
     }
 
     // --- source to partition mappings and exactly once ---
 
-    @Test(timeout = 60000)
-    public void testOneToOneSources() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testOneToOneSources() throws Exception {
         runOneToOneExactlyOnceTest();
     }
 
-    @Test(timeout = 60000)
-    public void testOneSourceMultiplePartitions() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testOneSourceMultiplePartitions() throws Exception {
         runOneSourceMultiplePartitionsExactlyOnceTest();
     }
 
-    @Test(timeout = 60000)
-    public void testMultipleSourcesOnePartition() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testMultipleSourcesOnePartition() throws Exception {
         runMultipleSourcesOnePartitionExactlyOnceTest();
     }
 
     // --- broker failure ---
 
-    @Test(timeout = 60000)
-    public void testBrokerFailure() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testBrokerFailure() throws Exception {
         runBrokerFailureTest();
     }
 
     // --- special executions ---
 
-    @Test(timeout = 60000)
-    public void testBigRecordJob() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testBigRecordJob() throws Exception {
         runBigRecordTestTopology();
     }
 
-    @Test(timeout = 60000)
-    public void testMultipleTopicsWithLegacySerializer() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testMultipleTopicsWithLegacySerializer() throws Exception {
         runProduceConsumeMultipleTopics(true);
     }
 
-    @Test(timeout = 60000)
-    public void testMultipleTopicsWithKafkaSerializer() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testMultipleTopicsWithKafkaSerializer() throws Exception {
         runProduceConsumeMultipleTopics(false);
     }
 
-    @Test(timeout = 60000)
-    public void testAllDeletes() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testAllDeletes() throws Exception {
         runAllDeletesTest();
     }
 
-    @Test(timeout = 60000)
-    public void testMetricsAndEndOfStream() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testMetricsAndEndOfStream() throws Exception {
         runEndOfStreamTest();
     }
 
     // --- startup mode ---
 
-    @Test(timeout = 60000)
-    public void testStartFromEarliestOffsets() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testStartFromEarliestOffsets() throws Exception {
         runStartFromEarliestOffsets();
     }
 
-    @Test(timeout = 60000)
-    public void testStartFromLatestOffsets() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testStartFromLatestOffsets() throws Exception {
         runStartFromLatestOffsets();
     }
 
-    @Test(timeout = 60000)
-    public void testStartFromGroupOffsets() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testStartFromGroupOffsets() throws Exception {
         runStartFromGroupOffsets();
     }
 
-    @Test(timeout = 60000)
-    public void testStartFromSpecificOffsets() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testStartFromSpecificOffsets() throws Exception {
         runStartFromSpecificOffsets();
     }
 
-    @Test(timeout = 60000)
-    public void testStartFromTimestamp() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testStartFromTimestamp() throws Exception {
         runStartFromTimestamp();
     }
 
     // --- offset committing ---
 
-    @Test(timeout = 60000)
-    public void testCommitOffsetsToKafka() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testCommitOffsetsToKafka() throws Exception {
         runCommitOffsetsToKafka();
     }
 
-    @Test(timeout = 60000)
-    public void testAutoOffsetRetrievalAndCommitToKafka() throws Exception {
+    @Test
+    @Timeout(60L)

Review Comment:
   time unit should be specified



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBaseWithFlink.java:
##########
@@ -30,9 +30,9 @@ public abstract class KafkaTestBaseWithFlink extends KafkaTestBase {
 
     protected static final int TM_SLOTS = 8;
 
-    @ClassRule
-    public static MiniClusterWithClientResource flink =
-            new MiniClusterWithClientResource(
+    @RegisterExtension
+    public static final MiniClusterExtension FLINK =

Review Comment:
   do we need to have it `public`?
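   for illustration, a package-private field would still be registered (JUnit 5 only requires a `@RegisterExtension` field to be non-private; the constructor, builder calls, and import paths below are only a guess at the existing configuration):

   ```java
   import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
   import org.apache.flink.test.junit5.MiniClusterExtension;

   import org.junit.jupiter.api.extension.RegisterExtension;

   abstract class KafkaTestBaseWithFlink extends KafkaTestBase {

       protected static final int TM_SLOTS = 8;

       // Non-private is enough for a static @RegisterExtension field.
       @RegisterExtension
       static final MiniClusterExtension FLINK =
               new MiniClusterExtension(
                       new MiniClusterResourceConfiguration.Builder()
                               .setNumberSlotsPerTaskManager(TM_SLOTS)
                               .build());
   }
   ```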



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java:
##########
@@ -113,11 +116,14 @@ public static void prepare() throws Exception {
         standardProps = kafkaServer.getStandardProperties();
     }
 
-    @AfterClass
-    public static void shutDownServices() throws Exception {
+    @AfterAll
+    public static void shutDownServices(@InjectMiniCluster MiniCluster miniCluster)

Review Comment:
   do we need it `public`?



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java:
##########
@@ -111,7 +110,7 @@ public static void prepare() throws Exception {
         startClusters(false);
     }
 
-    @AfterClass
+    @AfterAll
     public static void shutDownServices() throws Exception {

Review Comment:
   do we need to have it `public`?



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaMigrationTestBase.java:
##########
@@ -128,15 +126,19 @@ private OperatorSubtaskState initializeTestState() throws Exception {
     }
 
     @SuppressWarnings("warning")
-    @Test
+    @TestTemplate
     public void testRestoreProducer() throws Exception {

Review Comment:
   could be package private 



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExactlyOnceITCase.java:
##########
@@ -18,21 +18,21 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
-import org.junit.BeforeClass;
-import org.junit.Test;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
 
 /** IT cases for the {@link FlinkKafkaProducer}. */
 @SuppressWarnings("serial")
-public class KafkaProducerExactlyOnceITCase extends KafkaProducerTestBase {
-    @BeforeClass
+class KafkaProducerExactlyOnceITCase extends KafkaProducerTestBase {
+    @BeforeAll
     public static void prepare() throws Exception {

Review Comment:
   could be package private



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java:
##########
@@ -62,131 +63,154 @@ public static void prepare() throws Exception {
     //  Suite of Tests
     // ------------------------------------------------------------------------
 
-    @Test(timeout = 120000)
-    public void testFailOnNoBroker() throws Exception {
+    @Test
+    @Timeout(120L)
+    void testFailOnNoBroker() throws Exception {
         runFailOnNoBrokerTest();
     }
 
-    @Test(timeout = 60000)
-    public void testConcurrentProducerConsumerTopology() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testConcurrentProducerConsumerTopology() throws Exception {
         runSimpleConcurrentProducerConsumerTopology();
     }
 
-    @Test(timeout = 60000)
-    public void testKeyValueSupport() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testKeyValueSupport() throws Exception {
         runKeyValueTest();
     }
 
     // --- canceling / failures ---
 
-    @Test(timeout = 60000)
-    public void testCancelingEmptyTopic() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testCancelingEmptyTopic() throws Exception {
         runCancelingOnEmptyInputTest();
     }
 
-    @Test(timeout = 60000)
-    public void testCancelingFullTopic() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testCancelingFullTopic() throws Exception {
         runCancelingOnFullInputTest();
     }
 
     // --- source to partition mappings and exactly once ---
 
-    @Test(timeout = 60000)
-    public void testOneToOneSources() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testOneToOneSources() throws Exception {
         runOneToOneExactlyOnceTest();
     }
 
-    @Test(timeout = 60000)
-    public void testOneSourceMultiplePartitions() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testOneSourceMultiplePartitions() throws Exception {
         runOneSourceMultiplePartitionsExactlyOnceTest();
     }
 
-    @Test(timeout = 60000)
-    public void testMultipleSourcesOnePartition() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testMultipleSourcesOnePartition() throws Exception {
         runMultipleSourcesOnePartitionExactlyOnceTest();
     }
 
     // --- broker failure ---
 
-    @Test(timeout = 60000)
-    public void testBrokerFailure() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testBrokerFailure() throws Exception {
         runBrokerFailureTest();
     }
 
     // --- special executions ---
 
-    @Test(timeout = 60000)
-    public void testBigRecordJob() throws Exception {
+    @Test
+    @Timeout(60L)

Review Comment:
   time unit should be specified


