This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit d428118cd144efe434f5474e8f77c1119722d59b
Author: Lari Hotari <[email protected]>
AuthorDate: Fri Jun 5 16:35:22 2026 +0300

    [fix][fn] Fix orphan exclusive producer on creation timeout in 
WorkerUtils.createExclusiveProducerWithRetry (#25942)
    
    (cherry picked from commit 2177b0e44d96e87a698c40d47ebb763a9523f2ee)
---
 .../pulsar/functions/worker/WorkerUtils.java       | 11 +++-
 .../pulsar/functions/worker/WorkerUtilsTest.java   | 69 ++++++++++++++++++++++
 2 files changed, 77 insertions(+), 3 deletions(-)

diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java
index af1edf5c8e8..c86c6e9a193 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java
@@ -30,8 +30,8 @@ import java.io.OutputStream;
 import java.net.URI;
 import java.nio.file.Files;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import lombok.extern.slf4j.Slf4j;
@@ -60,6 +60,7 @@ import 
org.apache.pulsar.common.conf.InternalConfigurationData;
 import org.apache.pulsar.common.functions.WorkerInfo;
 import org.apache.pulsar.common.policies.data.FunctionInstanceStatsDataImpl;
 import org.apache.pulsar.common.policies.data.FunctionInstanceStatsImpl;
+import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
 import org.apache.pulsar.functions.runtime.Runtime;
 import org.apache.pulsar.functions.runtime.RuntimeSpawner;
@@ -400,13 +401,17 @@ public final class WorkerUtils {
             int tries = 0;
             do {
                 try {
-                    return client.newProducer().topic(topic)
+                    CompletableFuture<Producer<byte[]>> producerFuture = 
client.newProducer().topic(topic)
                             .accessMode(ProducerAccessMode.Exclusive)
                             .enableBatching(false)
                             .blockIfQueueFull(true)
                             .compressionType(CompressionType.LZ4)
                             .producerName(producerName)
-                            .createAsync().get(10, TimeUnit.SECONDS);
+                            .createAsync();
+                    return FutureUtil.getAndCleanupOnInterrupt(producerFuture, 
Producer::closeAsync);
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    throw e;
                 } catch (Exception e) {
                     log.info("Encountered exception while at creating 
exclusive producer to topic {}", topic, e);
                 }
diff --git 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/WorkerUtilsTest.java
 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/WorkerUtilsTest.java
index 0f5fca4a8a5..823b645c672 100644
--- 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/WorkerUtilsTest.java
+++ 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/WorkerUtilsTest.java
@@ -18,26 +18,33 @@
  */
 package org.apache.pulsar.functions.worker;
 
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 import java.io.IOException;
 import java.net.URISyntaxException;
 import java.net.URL;
 import java.util.HashSet;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Supplier;
+import lombok.Cleanup;
 import org.apache.distributedlog.DistributedLogConfiguration;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.client.api.CompressionType;
@@ -108,6 +115,68 @@ public class WorkerUtilsTest {
         }
     }
 
+    @Test
+    @SuppressWarnings("unchecked")
+    public void 
testCreateExclusiveProducerWithRetryClosesProducerOnInterrupt() throws 
Exception {
+        Producer<byte[]> producer = mock(Producer.class);
+        
when(producer.closeAsync()).thenReturn(CompletableFuture.completedFuture(null));
+
+        // producer creation stays pending until the test completes it 
explicitly
+        CompletableFuture<Producer<byte[]>> producerFuture = new 
CompletableFuture<>();
+        CountDownLatch createAsyncCalled = new CountDownLatch(1);
+
+        ProducerBuilder<byte[]> builder = mock(ProducerBuilder.class);
+        when(builder.topic(anyString())).thenReturn(builder);
+        when(builder.producerName(anyString())).thenReturn(builder);
+        when(builder.enableBatching(anyBoolean())).thenReturn(builder);
+        when(builder.blockIfQueueFull(anyBoolean())).thenReturn(builder);
+        
when(builder.compressionType(any(CompressionType.class))).thenReturn(builder);
+        when(builder.accessMode(any())).thenReturn(builder);
+        when(builder.createAsync()).thenAnswer(invocation -> {
+            createAsyncCalled.countDown();
+            return producerFuture;
+        });
+
+        PulsarClient pulsarClient = mock(PulsarClient.class);
+        when(pulsarClient.newProducer()).thenReturn(builder);
+
+        AtomicReference<Throwable> thrown = new AtomicReference<>();
+        AtomicBoolean interruptStatusPreserved = new AtomicBoolean();
+        @Cleanup("interrupt")
+        Thread caller = new Thread(() -> {
+            try {
+                WorkerUtils.createExclusiveProducerWithRetry(pulsarClient, 
"test-topic", "test-producer",
+                        () -> true, 0);
+            } catch (Throwable t) {
+                thrown.set(t);
+                
interruptStatusPreserved.set(Thread.currentThread().isInterrupted());
+            }
+        });
+        caller.setDaemon(true);
+        caller.start();
+        assertTrue(createAsyncCalled.await(10, TimeUnit.SECONDS));
+
+        // interrupt the caller while it is waiting for the producer to be 
created
+        caller.interrupt();
+        caller.join(TimeUnit.SECONDS.toMillis(10));
+        assertThat(caller.isAlive())
+                .as("Interrupt should abort the retry loop instead of 
retrying")
+                .isFalse();
+
+        assertThat(thrown.get())
+                .isInstanceOf(RuntimeException.class)
+                .hasCauseInstanceOf(InterruptedException.class);
+        assertThat(interruptStatusPreserved)
+                .as("Interrupt status should be restored")
+                .isTrue();
+
+        // when the pending creation completes after the interrupt, the 
producer must be closed so that
+        // the exclusive producer doesn't leak
+        verify(producer, never()).closeAsync();
+        producerFuture.complete(producer);
+        verify(producer, times(1)).closeAsync();
+    }
+
     @Test
     public void testDLogConfiguration() throws URISyntaxException, IOException 
{
         // The config yml is seeded with a fake bookie config.

Reply via email to