This is an automated email from the ASF dual-hosted git repository. martijnvisser pushed a commit to branch v3.4 in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git
commit a23613c45d60eb26b6822ef3e5183b2169842583 Author: Arvid Heise <[email protected]> AuthorDate: Sun Feb 16 21:01:32 2025 +0100 [hotfix] Fix leaks in FlinkKafkaProducerTest (cherry picked from commit 2e652a92e48df67d77e04f7aebcf8e97c511b1cb) --- .../connectors/kafka/FlinkKafkaProducerTest.java | 34 ++++++++++++++-------- 1 file changed, 22 insertions(+), 12 deletions(-) diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerTest.java index 6fedcc43..5c2d3803 100644 --- a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerTest.java +++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerTest.java @@ -26,6 +26,7 @@ import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartiti import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.apache.kafka.clients.producer.ProducerRecord; +import org.junit.After; import org.junit.Test; import javax.annotation.Nullable; @@ -33,28 +34,35 @@ import javax.annotation.Nullable; import java.util.Optional; import java.util.Properties; +import static org.apache.flink.connector.kafka.testutils.KafkaUtil.checkProducerLeak; import static org.assertj.core.api.Assertions.assertThat; /** Tests for {@link FlinkKafkaProducer}. */ public class FlinkKafkaProducerTest { + @After + public void checkLeaks() { + checkProducerLeak(); + } + @Test public void testOpenSerializationSchemaProducer() throws Exception { OpenTestingSerializationSchema schema = new OpenTestingSerializationSchema(); FlinkKafkaProducer<Integer> kafkaProducer = new FlinkKafkaProducer<>("localhost:9092", "test-topic", schema); - OneInputStreamOperatorTestHarness<Integer, Object> testHarness = + try (OneInputStreamOperatorTestHarness<Integer, Object> testHarness = new OneInputStreamOperatorTestHarness<>( new StreamSink<>(kafkaProducer), 1, 1, 0, IntSerializer.INSTANCE, - new OperatorID(1, 1)); + new OperatorID(1, 1))) { - testHarness.open(); + testHarness.open(); - assertThat(schema.openCalled).isTrue(); + assertThat(schema.openCalled).isTrue(); + } } @Test @@ -69,18 +77,19 @@ public class FlinkKafkaProducerTest { properties, FlinkKafkaProducer.Semantic.AT_LEAST_ONCE); - OneInputStreamOperatorTestHarness<Integer, Object> testHarness = + try (OneInputStreamOperatorTestHarness<Integer, Object> testHarness = new OneInputStreamOperatorTestHarness<>( new StreamSink<>(kafkaProducer), 1, 1, 0, IntSerializer.INSTANCE, - new OperatorID(1, 1)); + new OperatorID(1, 1))) { - testHarness.open(); + testHarness.open(); - assertThat(schema.openCalled).isTrue(); + assertThat(schema.openCalled).isTrue(); + } } @Test @@ -95,18 +104,19 @@ public class FlinkKafkaProducerTest { properties, Optional.of(partitioner)); - OneInputStreamOperatorTestHarness<Integer, Object> testHarness = + try (OneInputStreamOperatorTestHarness<Integer, Object> testHarness = new OneInputStreamOperatorTestHarness<>( new StreamSink<>(kafkaProducer), 1, 1, 0, IntSerializer.INSTANCE, - new OperatorID(1, 1)); + new OperatorID(1, 1))) { - testHarness.open(); + testHarness.open(); - assertThat(partitioner.openCalled).isTrue(); + assertThat(partitioner.openCalled).isTrue(); + } } @Test(expected = NullPointerException.class)
