This is an automated email from the ASF dual-hosted git repository. martijnvisser pushed a commit to branch v3.3 in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git
commit 26b2a3c0bc9448697ac2d26a2c555b2937f988f4 Author: Arvid Heise <[email protected]> AuthorDate: Tue Feb 11 08:30:35 2025 +0100 [FLINK-37281] Fix KafkaUtil#checkProducerLeak Since Java 20, Thread.stop fails, so we just need to remember old leaks to avoid failing subsequent tests. (cherry picked from commit 707ec4c194cdea26a34e5f0c8540621c37175c61) --- .../org/apache/flink/connector/kafka/testutils/KafkaUtil.java | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/KafkaUtil.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/KafkaUtil.java index 7bf7bb00..78b344b6 100644 --- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/KafkaUtil.java +++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/KafkaUtil.java @@ -38,6 +38,7 @@ import java.util.List; import java.util.Map; import java.util.Properties; import java.util.Set; +import java.util.concurrent.ConcurrentSkipListSet; import java.util.stream.Collectors; import static org.assertj.core.api.Assertions.fail; @@ -193,12 +194,17 @@ public class KafkaUtil { .collect(Collectors.toSet()); } + private static final Set<Long> KNOWN_LEAKS = new ConcurrentSkipListSet<>(); + public static void checkProducerLeak() { List<Map.Entry<Thread, StackTraceElement[]>> leaks = null; for (int tries = 0; tries < 10; tries++) { leaks = Thread.getAllStackTraces().entrySet().stream() .filter(KafkaUtil::findAliveKafkaThread) + .filter( + threadEntry -> + !KNOWN_LEAKS.contains(threadEntry.getKey().getId())) .collect(Collectors.toList()); if (leaks.isEmpty()) { return; @@ -210,10 +216,10 @@ public class KafkaUtil { } for (Map.Entry<Thread, StackTraceElement[]> leak : leaks) { - leak.getKey().stop(); + KNOWN_LEAKS.add(leak.getKey().getId()); } fail( - "Detected producer leaks:\n" + "Detected new producer leaks:\n" + leaks.stream() .map(KafkaUtil::format) .collect(Collectors.joining("\n\n")));
