This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-4.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit d428118cd144efe434f5474e8f77c1119722d59b Author: Lari Hotari <[email protected]> AuthorDate: Fri Jun 5 16:35:22 2026 +0300 [fix][fn] Fix orphan exclusive producer on creation timeout in WorkerUtils.createExclusiveProducerWithRetry (#25942) (cherry picked from commit 2177b0e44d96e87a698c40d47ebb763a9523f2ee) --- .../pulsar/functions/worker/WorkerUtils.java | 11 +++- .../pulsar/functions/worker/WorkerUtilsTest.java | 69 ++++++++++++++++++++++ 2 files changed, 77 insertions(+), 3 deletions(-) diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java index af1edf5c8e8..c86c6e9a193 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java @@ -30,8 +30,8 @@ import java.io.OutputStream; import java.net.URI; import java.nio.file.Files; import java.util.Map; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; import java.util.function.Supplier; import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; @@ -60,6 +60,7 @@ import org.apache.pulsar.common.conf.InternalConfigurationData; import org.apache.pulsar.common.functions.WorkerInfo; import org.apache.pulsar.common.policies.data.FunctionInstanceStatsDataImpl; import org.apache.pulsar.common.policies.data.FunctionInstanceStatsImpl; +import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.functions.proto.InstanceCommunication; import org.apache.pulsar.functions.runtime.Runtime; import org.apache.pulsar.functions.runtime.RuntimeSpawner; @@ -400,13 +401,17 @@ public final class WorkerUtils { int tries = 0; do { try { - return client.newProducer().topic(topic) + CompletableFuture<Producer<byte[]>> producerFuture = client.newProducer().topic(topic) .accessMode(ProducerAccessMode.Exclusive) .enableBatching(false) .blockIfQueueFull(true) .compressionType(CompressionType.LZ4) .producerName(producerName) - .createAsync().get(10, TimeUnit.SECONDS); + .createAsync(); + return FutureUtil.getAndCleanupOnInterrupt(producerFuture, Producer::closeAsync); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw e; } catch (Exception e) { log.info("Encountered exception while at creating exclusive producer to topic {}", topic, e); } diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/WorkerUtilsTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/WorkerUtilsTest.java index 0f5fca4a8a5..823b645c672 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/WorkerUtilsTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/WorkerUtilsTest.java @@ -18,26 +18,33 @@ */ package org.apache.pulsar.functions.worker; +import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; import java.io.IOException; import java.net.URISyntaxException; import java.net.URL; import java.util.HashSet; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; +import lombok.Cleanup; import org.apache.distributedlog.DistributedLogConfiguration; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.client.api.CompressionType; @@ -108,6 +115,68 @@ public class WorkerUtilsTest { } } + @Test + @SuppressWarnings("unchecked") + public void testCreateExclusiveProducerWithRetryClosesProducerOnInterrupt() throws Exception { + Producer<byte[]> producer = mock(Producer.class); + when(producer.closeAsync()).thenReturn(CompletableFuture.completedFuture(null)); + + // producer creation stays pending until the test completes it explicitly + CompletableFuture<Producer<byte[]>> producerFuture = new CompletableFuture<>(); + CountDownLatch createAsyncCalled = new CountDownLatch(1); + + ProducerBuilder<byte[]> builder = mock(ProducerBuilder.class); + when(builder.topic(anyString())).thenReturn(builder); + when(builder.producerName(anyString())).thenReturn(builder); + when(builder.enableBatching(anyBoolean())).thenReturn(builder); + when(builder.blockIfQueueFull(anyBoolean())).thenReturn(builder); + when(builder.compressionType(any(CompressionType.class))).thenReturn(builder); + when(builder.accessMode(any())).thenReturn(builder); + when(builder.createAsync()).thenAnswer(invocation -> { + createAsyncCalled.countDown(); + return producerFuture; + }); + + PulsarClient pulsarClient = mock(PulsarClient.class); + when(pulsarClient.newProducer()).thenReturn(builder); + + AtomicReference<Throwable> thrown = new AtomicReference<>(); + AtomicBoolean interruptStatusPreserved = new AtomicBoolean(); + @Cleanup("interrupt") + Thread caller = new Thread(() -> { + try { + WorkerUtils.createExclusiveProducerWithRetry(pulsarClient, "test-topic", "test-producer", + () -> true, 0); + } catch (Throwable t) { + thrown.set(t); + interruptStatusPreserved.set(Thread.currentThread().isInterrupted()); + } + }); + caller.setDaemon(true); + caller.start(); + assertTrue(createAsyncCalled.await(10, TimeUnit.SECONDS)); + + // interrupt the caller while it is waiting for the producer to be created + caller.interrupt(); + caller.join(TimeUnit.SECONDS.toMillis(10)); + assertThat(caller.isAlive()) + .as("Interrupt should abort the retry loop instead of retrying") + .isFalse(); + + assertThat(thrown.get()) + .isInstanceOf(RuntimeException.class) + .hasCauseInstanceOf(InterruptedException.class); + assertThat(interruptStatusPreserved) + .as("Interrupt status should be restored") + .isTrue(); + + // when the pending creation completes after the interrupt, the producer must be closed so that + // the exclusive producer doesn't leak + verify(producer, never()).closeAsync(); + producerFuture.complete(producer); + verify(producer, times(1)).closeAsync(); + } + @Test public void testDLogConfiguration() throws URISyntaxException, IOException { // The config yml is seeded with a fake bookie config.
