This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch v3.3
in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git

commit 26b2a3c0bc9448697ac2d26a2c555b2937f988f4
Author: Arvid Heise <[email protected]>
AuthorDate: Tue Feb 11 08:30:35 2025 +0100

    [FLINK-37281] Fix KafkaUtil#checkProducerLeak
    
    Since Java 20, Thread.stop throws UnsupportedOperationException, so we
    instead remember old leaks to avoid failing subsequent tests.
    
    (cherry picked from commit 707ec4c194cdea26a34e5f0c8540621c37175c61)
---
 .../org/apache/flink/connector/kafka/testutils/KafkaUtil.java  | 10 ++++++++--
 1 file changed, 8 insertions(+), 2 deletions(-)

diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/KafkaUtil.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/KafkaUtil.java
index 7bf7bb00..78b344b6 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/KafkaUtil.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/KafkaUtil.java
@@ -38,6 +38,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.stream.Collectors;
 
 import static org.assertj.core.api.Assertions.fail;
@@ -193,12 +194,17 @@ public class KafkaUtil {
                 .collect(Collectors.toSet());
     }
 
+    private static final Set<Long> KNOWN_LEAKS = new ConcurrentSkipListSet<>();
+
     public static void checkProducerLeak() {
         List<Map.Entry<Thread, StackTraceElement[]>> leaks = null;
         for (int tries = 0; tries < 10; tries++) {
             leaks =
                     Thread.getAllStackTraces().entrySet().stream()
                             .filter(KafkaUtil::findAliveKafkaThread)
+                            .filter(
+                                    threadEntry ->
+                                            !KNOWN_LEAKS.contains(threadEntry.getKey().getId()))
                             .collect(Collectors.toList());
             if (leaks.isEmpty()) {
                 return;
@@ -210,10 +216,10 @@ public class KafkaUtil {
         }
 
         for (Map.Entry<Thread, StackTraceElement[]> leak : leaks) {
-            leak.getKey().stop();
+            KNOWN_LEAKS.add(leak.getKey().getId());
         }
         fail(
-                "Detected producer leaks:\n"
+                "Detected new producer leaks:\n"
                         + leaks.stream()
                                 .map(KafkaUtil::format)
                                 .collect(Collectors.joining("\n\n")));
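
For context beyond the hunks above, the following is a minimal, self-contained
sketch of the pattern this patch adopts: instead of calling Thread.stop()
(which throws UnsupportedOperationException since Java 20), the thread ids of
already-reported leaks are kept in a set so that later checks only fail on new
leaks. The class name, the isKafkaProducerThread helper (standing in for
KafkaUtil#findAliveKafkaThread) and the plain AssertionError (standing in for
AssertJ's fail) are illustrative assumptions, not the actual KafkaUtil code.

import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.stream.Collectors;

public class ProducerLeakCheckSketch {

    // Thread ids of leaks that have already been reported once.
    private static final Set<Long> KNOWN_LEAKS = new ConcurrentSkipListSet<>();

    public static void checkProducerLeak() throws InterruptedException {
        List<Map.Entry<Thread, StackTraceElement[]>> leaks = null;
        for (int tries = 0; tries < 10; tries++) {
            leaks =
                    Thread.getAllStackTraces().entrySet().stream()
                            .filter(ProducerLeakCheckSketch::isKafkaProducerThread)
                            // skip threads that were already reported by an earlier check
                            .filter(entry -> !KNOWN_LEAKS.contains(entry.getKey().getId()))
                            .collect(Collectors.toList());
            if (leaks.isEmpty()) {
                return;
            }
            Thread.sleep(1000); // give lingering producer threads a chance to shut down
        }

        // Remember the new leaks so that subsequent tests do not fail on them again.
        for (Map.Entry<Thread, StackTraceElement[]> leak : leaks) {
            KNOWN_LEAKS.add(leak.getKey().getId());
        }
        throw new AssertionError(
                "Detected new producer leaks:\n"
                        + leaks.stream()
                                .map(leak -> leak.getKey().getName())
                                .collect(Collectors.joining("\n")));
    }

    // Stand-in for KafkaUtil#findAliveKafkaThread: matches live threads whose
    // name looks like a Kafka producer network thread.
    private static boolean isKafkaProducerThread(Map.Entry<Thread, StackTraceElement[]> entry) {
        Thread thread = entry.getKey();
        return thread.isAlive() && thread.getName().contains("kafka-producer-network-thread");
    }
}

A ConcurrentSkipListSet keeps the bookkeeping thread-safe without extra
locking if checks run from multiple test threads.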
