This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch branch-4.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-4.2 by this push:
new 65e999cbff9 [fix][fn] Fix orphan exclusive producer on creation
timeout in WorkerUtils.createExclusiveProducerWithRetry (#25942)
65e999cbff9 is described below
commit 65e999cbff9e2c21c98adfad889b45d1dc8d04e3
Author: Lari Hotari <[email protected]>
AuthorDate: Fri Jun 5 16:35:22 2026 +0300
[fix][fn] Fix orphan exclusive producer on creation timeout in
WorkerUtils.createExclusiveProducerWithRetry (#25942)
(cherry picked from commit 2177b0e44d96e87a698c40d47ebb763a9523f2ee)
---
.../pulsar/functions/worker/WorkerUtils.java | 11 +++-
.../pulsar/functions/worker/WorkerUtilsTest.java | 69 ++++++++++++++++++++++
2 files changed, 77 insertions(+), 3 deletions(-)
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java
index af1edf5c8e8..c86c6e9a193 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java
@@ -30,8 +30,8 @@ import java.io.OutputStream;
import java.net.URI;
import java.nio.file.Files;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
@@ -60,6 +60,7 @@ import
org.apache.pulsar.common.conf.InternalConfigurationData;
import org.apache.pulsar.common.functions.WorkerInfo;
import org.apache.pulsar.common.policies.data.FunctionInstanceStatsDataImpl;
import org.apache.pulsar.common.policies.data.FunctionInstanceStatsImpl;
+import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.functions.proto.InstanceCommunication;
import org.apache.pulsar.functions.runtime.Runtime;
import org.apache.pulsar.functions.runtime.RuntimeSpawner;
@@ -400,13 +401,17 @@ public final class WorkerUtils {
int tries = 0;
do {
try {
- return client.newProducer().topic(topic)
+ CompletableFuture<Producer<byte[]>> producerFuture =
client.newProducer().topic(topic)
.accessMode(ProducerAccessMode.Exclusive)
.enableBatching(false)
.blockIfQueueFull(true)
.compressionType(CompressionType.LZ4)
.producerName(producerName)
- .createAsync().get(10, TimeUnit.SECONDS);
+ .createAsync();
+ return FutureUtil.getAndCleanupOnInterrupt(producerFuture,
Producer::closeAsync);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw e;
} catch (Exception e) {
log.info("Encountered exception while at creating
exclusive producer to topic {}", topic, e);
}
diff --git
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/WorkerUtilsTest.java
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/WorkerUtilsTest.java
index 0f5fca4a8a5..823b645c672 100644
---
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/WorkerUtilsTest.java
+++
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/WorkerUtilsTest.java
@@ -18,26 +18,33 @@
*/
package org.apache.pulsar.functions.worker;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import java.io.IOException;
import java.net.URISyntaxException;
import java.net.URL;
import java.util.HashSet;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
+import lombok.Cleanup;
import org.apache.distributedlog.DistributedLogConfiguration;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.client.api.CompressionType;
@@ -108,6 +115,68 @@ public class WorkerUtilsTest {
}
}
+ @Test
+ @SuppressWarnings("unchecked")
+ public void
testCreateExclusiveProducerWithRetryClosesProducerOnInterrupt() throws
Exception {
+ Producer<byte[]> producer = mock(Producer.class);
+
when(producer.closeAsync()).thenReturn(CompletableFuture.completedFuture(null));
+
+ // producer creation stays pending until the test completes it
explicitly
+ CompletableFuture<Producer<byte[]>> producerFuture = new
CompletableFuture<>();
+ CountDownLatch createAsyncCalled = new CountDownLatch(1);
+
+ ProducerBuilder<byte[]> builder = mock(ProducerBuilder.class);
+ when(builder.topic(anyString())).thenReturn(builder);
+ when(builder.producerName(anyString())).thenReturn(builder);
+ when(builder.enableBatching(anyBoolean())).thenReturn(builder);
+ when(builder.blockIfQueueFull(anyBoolean())).thenReturn(builder);
+
when(builder.compressionType(any(CompressionType.class))).thenReturn(builder);
+ when(builder.accessMode(any())).thenReturn(builder);
+ when(builder.createAsync()).thenAnswer(invocation -> {
+ createAsyncCalled.countDown();
+ return producerFuture;
+ });
+
+ PulsarClient pulsarClient = mock(PulsarClient.class);
+ when(pulsarClient.newProducer()).thenReturn(builder);
+
+ AtomicReference<Throwable> thrown = new AtomicReference<>();
+ AtomicBoolean interruptStatusPreserved = new AtomicBoolean();
+ @Cleanup("interrupt")
+ Thread caller = new Thread(() -> {
+ try {
+ WorkerUtils.createExclusiveProducerWithRetry(pulsarClient,
"test-topic", "test-producer",
+ () -> true, 0);
+ } catch (Throwable t) {
+ thrown.set(t);
+
interruptStatusPreserved.set(Thread.currentThread().isInterrupted());
+ }
+ });
+ caller.setDaemon(true);
+ caller.start();
+ assertTrue(createAsyncCalled.await(10, TimeUnit.SECONDS));
+
+ // interrupt the caller while it is waiting for the producer to be
created
+ caller.interrupt();
+ caller.join(TimeUnit.SECONDS.toMillis(10));
+ assertThat(caller.isAlive())
+ .as("Interrupt should abort the retry loop instead of
retrying")
+ .isFalse();
+
+ assertThat(thrown.get())
+ .isInstanceOf(RuntimeException.class)
+ .hasCauseInstanceOf(InterruptedException.class);
+ assertThat(interruptStatusPreserved)
+ .as("Interrupt status should be restored")
+ .isTrue();
+
+ // when the pending creation completes after the interrupt, the
producer must be closed so that
+ // the exclusive producer doesn't leak
+ verify(producer, never()).closeAsync();
+ producerFuture.complete(producer);
+ verify(producer, times(1)).closeAsync();
+ }
+
@Test
public void testDLogConfiguration() throws URISyntaxException, IOException
{
// The config yml is seeded with a fake bookie config.