This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch v3.4
in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git

commit a23613c45d60eb26b6822ef3e5183b2169842583
Author: Arvid Heise <[email protected]>
AuthorDate: Sun Feb 16 21:01:32 2025 +0100

    [hotfix] Fix leaks in FlinkKafkaProducerTest
    
    (cherry picked from commit 2e652a92e48df67d77e04f7aebcf8e97c511b1cb)
---
 .../connectors/kafka/FlinkKafkaProducerTest.java   | 34 ++++++++++++++--------
 1 file changed, 22 insertions(+), 12 deletions(-)

diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerTest.java
index 6fedcc43..5c2d3803 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerTest.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartiti
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.junit.After;
 import org.junit.Test;
 
 import javax.annotation.Nullable;
@@ -33,28 +34,35 @@ import javax.annotation.Nullable;
 import java.util.Optional;
 import java.util.Properties;
 
+import static org.apache.flink.connector.kafka.testutils.KafkaUtil.checkProducerLeak;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for {@link FlinkKafkaProducer}. */
 public class FlinkKafkaProducerTest {
+    @After
+    public void checkLeaks() {
+        checkProducerLeak();
+    }
+
     @Test
     public void testOpenSerializationSchemaProducer() throws Exception {
         OpenTestingSerializationSchema schema = new OpenTestingSerializationSchema();
         FlinkKafkaProducer<Integer> kafkaProducer =
                 new FlinkKafkaProducer<>("localhost:9092", "test-topic", schema);
 
-        OneInputStreamOperatorTestHarness<Integer, Object> testHarness =
+        try (OneInputStreamOperatorTestHarness<Integer, Object> testHarness =
                 new OneInputStreamOperatorTestHarness<>(
                         new StreamSink<>(kafkaProducer),
                         1,
                         1,
                         0,
                         IntSerializer.INSTANCE,
-                        new OperatorID(1, 1));
+                        new OperatorID(1, 1))) {
 
-        testHarness.open();
+            testHarness.open();
 
-        assertThat(schema.openCalled).isTrue();
+            assertThat(schema.openCalled).isTrue();
+        }
     }
 
     @Test
@@ -69,18 +77,19 @@ public class FlinkKafkaProducerTest {
                         properties,
                         FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);
 
-        OneInputStreamOperatorTestHarness<Integer, Object> testHarness =
+        try (OneInputStreamOperatorTestHarness<Integer, Object> testHarness =
                 new OneInputStreamOperatorTestHarness<>(
                         new StreamSink<>(kafkaProducer),
                         1,
                         1,
                         0,
                         IntSerializer.INSTANCE,
-                        new OperatorID(1, 1));
+                        new OperatorID(1, 1))) {
 
-        testHarness.open();
+            testHarness.open();
 
-        assertThat(schema.openCalled).isTrue();
+            assertThat(schema.openCalled).isTrue();
+        }
     }
 
     @Test
@@ -95,18 +104,19 @@ public class FlinkKafkaProducerTest {
                         properties,
                         Optional.of(partitioner));
 
-        OneInputStreamOperatorTestHarness<Integer, Object> testHarness =
+        try (OneInputStreamOperatorTestHarness<Integer, Object> testHarness =
                 new OneInputStreamOperatorTestHarness<>(
                         new StreamSink<>(kafkaProducer),
                         1,
                         1,
                         0,
                         IntSerializer.INSTANCE,
-                        new OperatorID(1, 1));
+                        new OperatorID(1, 1))) {
 
-        testHarness.open();
+            testHarness.open();
 
-        assertThat(partitioner.openCalled).isTrue();
+            assertThat(partitioner.openCalled).isTrue();
+        }
     }
 
     @Test(expected = NullPointerException.class)
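
The patch relies on KafkaUtil.checkProducerLeak(), whose implementation is not part of this diff. As a rough, hypothetical sketch of what such a check could do (assuming JUnit 4 and AssertJ on the test classpath; the real KafkaUtil in flink-connector-kafka may differ in detail): every KafkaProducer starts an I/O thread named "kafka-producer-network-thread | <client.id>", so any such thread still alive after a test points at a producer that was never closed.

    import java.util.Set;

    import static org.assertj.core.api.Assertions.assertThat;

    // Hypothetical stand-in for KafkaUtil.checkProducerLeak(); illustrative only,
    // not the actual implementation shipped with flink-connector-kafka.
    final class ProducerLeakCheckSketch {
        static void checkProducerLeak() {
            // Kafka's producer I/O threads are named
            // "kafka-producer-network-thread | <client.id>"; a live thread with
            // that prefix means some producer was not closed by the test.
            Set<Thread> liveThreads = Thread.getAllStackTraces().keySet();
            assertThat(liveThreads)
                    .noneMatch(t -> t.getName().startsWith("kafka-producer-network-thread"));
        }
    }

Calling such a check from an @After hook, as the patch does, makes every test in the class fail fast if a harness (and the producer it wraps) escapes its try-with-resources block.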
