This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git

commit e386e7c28dfb701717c587a90c13d5e575a4cf2f
Author: Arvid Heise <[email protected]>
AuthorDate: Mon Nov 17 09:52:37 2025 +0100

    [hotfix] Avoid parallel usage of KafkaTestBase
    
    KafkaTestBase should be an extension. It currently uses static fields to
    keep track of the resources and can lead to live locks during parallel test
    execution.
---
 .../java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java | 2 ++
 .../apache/flink/connector/kafka/source/KafkaSourceMigrationITCase.java | 2 ++
 .../connector/kafka/source/enumerator/KafkaSourceEnumeratorTest.java    | 2 ++
 .../kafka/source/enumerator/initializer/OffsetsInitializerTest.java     | 2 ++
 .../kafka/source/enumerator/subscriber/KafkaSubscriberTest.java         | 2 ++
 .../connector/kafka/source/reader/KafkaPartitionSplitReaderTest.java    | 2 ++
 .../flink/connector/kafka/source/reader/KafkaSourceReaderTest.java      | 2 ++
 .../java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java | 2 ++
 8 files changed, 16 insertions(+)

diff --git 
a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java
 
b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java
index 8c3cfd8a..02b4f991 100644
--- 
a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java
+++ 
b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java
@@ -61,6 +61,7 @@ import org.junit.jupiter.api.Nested;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInstance;
 import org.junit.jupiter.api.TestInstance.Lifecycle;
+import org.junit.jupiter.api.parallel.ResourceLock;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
 import org.testcontainers.containers.KafkaContainer;
@@ -84,6 +85,7 @@ import static 
org.apache.flink.streaming.connectors.kafka.KafkaTestBase.kafkaSer
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Unite test class for {@link KafkaSource}. */
+@ResourceLock("KafkaTestBase")
 public class KafkaSourceITCase {
     private static final String TOPIC1 = "topic1";
     private static final String TOPIC2 = "topic2";
diff --git 
a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceMigrationITCase.java
 
b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceMigrationITCase.java
index e368b0ac..73a09839 100644
--- 
a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceMigrationITCase.java
+++ 
b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceMigrationITCase.java
@@ -48,6 +48,7 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.parallel.ResourceLock;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
@@ -67,6 +68,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 
 /** The test for creation savepoint for migration tests for the Kafka Sink. */
 @Testcontainers
+@ResourceLock("KafkaTestBase")
 public class KafkaSourceMigrationITCase extends TestLogger {
     public static final String KAFKA_SOURCE_UID = "kafka-source-operator-uid";
     // Directory to store the savepoints in src/test/resources
diff --git 
a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumeratorTest.java
 
b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumeratorTest.java
index 6d69541f..bb706084 100644
--- 
a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumeratorTest.java
+++ 
b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumeratorTest.java
@@ -43,6 +43,7 @@ import org.assertj.core.api.SoftAssertions;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.parallel.ResourceLock;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.EnumSource;
 
@@ -64,6 +65,7 @@ import static 
org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit.
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Unit tests for {@link KafkaSourceEnumerator}. */
+@ResourceLock("KafkaTestBase")
 public class KafkaSourceEnumeratorTest {
     private static final int NUM_SUBTASKS = 3;
     private static final String DYNAMIC_TOPIC_NAME = "dynamic_topic";
diff --git 
a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializerTest.java
 
b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializerTest.java
index 19f8c47a..d5566d2b 100644
--- 
a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializerTest.java
+++ 
b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializerTest.java
@@ -28,6 +28,7 @@ import org.apache.kafka.common.TopicPartition;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.junit.jupiter.api.parallel.ResourceLock;
 
 import java.util.Collections;
 import java.util.HashMap;
@@ -38,6 +39,7 @@ import java.util.stream.Collectors;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Unit tests for {@link OffsetsInitializer}. */
+@ResourceLock("KafkaTestBase")
 public class OffsetsInitializerTest {
     private static final String TOPIC = "topic";
     private static final String TOPIC2 = "topic2";
diff --git 
a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/KafkaSubscriberTest.java
 
b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/KafkaSubscriberTest.java
index 4c5a5024..0fd61f86 100644
--- 
a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/KafkaSubscriberTest.java
+++ 
b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/KafkaSubscriberTest.java
@@ -28,6 +28,7 @@ import 
org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.junit.jupiter.api.parallel.ResourceLock;
 
 import java.util.Arrays;
 import java.util.Collections;
@@ -41,6 +42,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Unit tests for {@link KafkaSubscriber}. */
+@ResourceLock("KafkaTestBase")
 public class KafkaSubscriberTest {
     private static final String TOPIC1 = "topic1";
     private static final String TOPIC2 = "pattern-topic";
diff --git 
a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReaderTest.java
 
b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReaderTest.java
index b592a691..af000438 100644
--- 
a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReaderTest.java
+++ 
b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReaderTest.java
@@ -45,6 +45,7 @@ import 
org.apache.kafka.common.serialization.IntegerDeserializer;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.parallel.ResourceLock;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.CsvSource;
 import org.junit.jupiter.params.provider.EmptySource;
@@ -70,6 +71,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Unit tests for {@link KafkaPartitionSplitReader}. */
+@ResourceLock("KafkaTestBase")
 public class KafkaPartitionSplitReaderTest {
     private static final int NUM_SUBTASKS = 3;
     private static final String TOPIC1 = "topic1";
diff --git 
a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReaderTest.java
 
b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReaderTest.java
index 60bce02c..134fe70f 100644
--- 
a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReaderTest.java
+++ 
b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReaderTest.java
@@ -54,6 +54,7 @@ import org.apache.kafka.common.serialization.StringSerializer;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.parallel.ResourceLock;
 
 import java.time.Duration;
 import java.util.ArrayList;
@@ -84,6 +85,7 @@ import static 
org.apache.flink.core.testutils.CommonTestUtils.waitUtil;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Unit tests for {@link KafkaSourceReader}. */
+@ResourceLock("KafkaTestBase")
 public class KafkaSourceReaderTest extends 
SourceReaderTestBase<KafkaPartitionSplit> {
     private static final String TOPIC = "KafkaSourceReaderTest";
 
diff --git 
a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
 
b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
index c21ed14c..ab94955d 100644
--- 
a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
+++ 
b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
@@ -41,6 +41,7 @@ import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Rule;
+import org.junit.jupiter.api.parallel.ResourceLock;
 import org.junit.rules.TemporaryFolder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -72,6 +73,7 @@ import java.util.concurrent.atomic.AtomicReference;
  */
 @SuppressWarnings("serial")
 @RetryOnFailure(times = 3)
+@ResourceLock("KafkaTestBase")
 public abstract class KafkaTestBase extends TestLogger {
 
     public static final Logger LOG = 
LoggerFactory.getLogger(KafkaTestBase.class);

Reply via email to