This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit c22f3d5b2f11635a9c3b6070786129a7298164e6
Author: Ilya Brin <[email protected]>
AuthorDate: Wed Jan 14 07:57:14 2026 -0600

    [fix][fn] complete flushAsync before closeAsync in ProducerCache and wait for completion in closing the cache (#25140)
    
    Co-authored-by: Lari Hotari <[email protected]>
    (cherry picked from commit 21819c67c8fa322ef82371d753490d5b4c5026c5)
---
 .../pulsar/functions/instance/ProducerCache.java   | 31 +++++++++++++++----
 .../functions/instance/ProducerCacheTest.java      | 36 ++++++++++++++++++++++
 2 files changed, 61 insertions(+), 6 deletions(-)

diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ProducerCache.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ProducerCache.java
index 2e10581b352..32632ef0695 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ProducerCache.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ProducerCache.java
@@ -22,12 +22,16 @@ import com.github.benmanes.caffeine.cache.Cache;
 import com.github.benmanes.caffeine.cache.Caffeine;
 import com.github.benmanes.caffeine.cache.Scheduler;
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.MoreExecutors;
+import io.netty.util.concurrent.DefaultThreadFactory;
 import java.io.Closeable;
 import java.time.Duration;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import lombok.extern.slf4j.Slf4j;
@@ -59,15 +63,19 @@ public class ProducerCache implements Closeable {
 
     private final Cache<ProducerCacheKey, Producer<?>> cache;
     private final AtomicBoolean closed = new AtomicBoolean(false);
-    private final CopyOnWriteArrayList<CompletableFuture<Void>> closeFutures = new CopyOnWriteArrayList<>();
+    @VisibleForTesting
+    final CopyOnWriteArrayList<CompletableFuture<Void>> closeFutures = new CopyOnWriteArrayList<>();
+    private final ExecutorService cacheExecutor;
 
     public ProducerCache() {
+        cacheExecutor = Executors.newSingleThreadExecutor(new DefaultThreadFactory("ProducerCache"));
         Caffeine<ProducerCacheKey, Producer<?>> builder = Caffeine.newBuilder()
                 .scheduler(Scheduler.systemScheduler())
+                .executor(cacheExecutor)
                 .<ProducerCacheKey, Producer<?>>removalListener((key, producer, cause) -> {
                     log.info("Closing producer for topic {}, cause {}", key.topic(), cause);
                     CompletableFuture closeFuture =
-                            CompletableFuture.supplyAsync(() -> producer.flushAsync(), Runnable::run)
+                            producer.flushAsync()
                                     .orTimeout(FLUSH_OR_CLOSE_TIMEOUT_SECONDS, TimeUnit.SECONDS)
                                     .exceptionally(ex -> {
                                         Throwable unwrappedCause = FutureUtil.unwrapCompletionException(ex);
@@ -128,10 +136,21 @@ public class ProducerCache implements Closeable {
     public void close() {
         if (closed.compareAndSet(false, true)) {
             cache.invalidateAll();
-            try {
-                FutureUtil.waitForAll(closeFutures).get();
-            } catch (InterruptedException | ExecutionException e) {
-                log.warn("Failed to close producers", e);
+            // schedule the waiting job on the cache executor
+            cacheExecutor.execute(() -> {
+                try {
+                    FutureUtil.waitForAll(closeFutures).get();
+                } catch (InterruptedException | ExecutionException e) {
+                    log.warn("Failed to close producers", e);
+                }
+            });
+            // Wait for the cache executor to terminate.
+            // The eviction jobs and waiting for the close futures to complete will run on the single-threaded
+            // cache executor, so we need to wait for them to finish to ensure that the cache is closed properly.
+            boolean terminated = MoreExecutors.shutdownAndAwaitTermination(cacheExecutor,
+                    Duration.ofSeconds(FLUSH_OR_CLOSE_TIMEOUT_SECONDS));
+            if (!terminated) {
+                log.warn("Failed to shutdown cache executor gracefully.");
             }
         }
     }
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ProducerCacheTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ProducerCacheTest.java
index af95a7901b6..129651c8804 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ProducerCacheTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ProducerCacheTest.java
@@ -20,7 +20,10 @@ package org.apache.pulsar.functions.instance;
 
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.testng.annotations.Test;
@@ -61,4 +64,37 @@ public class ProducerCacheTest {
         cache.close();
     }
 
+    @Test
+    public void shouldCompleteFlushBeforeCloseAndWaitForClosing() {
+        ProducerCache cache = new ProducerCache();
+        Producer producer = mock(Producer.class);
+        AtomicBoolean flushCompleted = new AtomicBoolean(false);
+        when(producer.flushAsync()).thenReturn(CompletableFuture.supplyAsync(() -> {
+            try {
+                // add delay to make sure that cache close waits for completion of flushAsync
+                Thread.sleep(100);
+            } catch (InterruptedException e) {
+                // ignore
+            }
+            flushCompleted.set(true);
+            return null;
+        }));
+        AtomicBoolean closeCompleted = new AtomicBoolean(false);
+        when(producer.closeAsync()).thenReturn(CompletableFuture.supplyAsync(() -> {
+            try {
+                // add delay to make sure that cache close waits for completion of closeAsync
+                Thread.sleep(100);
+            } catch (InterruptedException e) {
+                // ignore
+            }
+            closeCompleted.set(true);
+            return null;
+        }));
+        cache.getOrCreateProducer(ProducerCache.CacheArea.CONTEXT_CACHE, "topic", "key",
+                () -> (Producer<Object>) producer);
+        cache.close();
+        assertTrue(flushCompleted.get());
+        assertTrue(closeCompleted.get());
+        assertEquals(cache.closeFutures.size(), 1);
+    }
 }
\ No newline at end of file

Reply via email to