This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 593625cebee5f9bf2042317362a803830987d321 Author: Lari Hotari <[email protected]> AuthorDate: Tue Dec 17 18:34:32 2024 +0200 [improve][fn] Improve closing of producers in Pulsar Functions ProducerCache invalidation (#23734) (cherry picked from commit 9f046a5f6bfa35bc89e74635cbf7aacd43bc9fc9) --- .../pulsar/functions/instance/ProducerCache.java | 38 +++++++++---- .../functions/instance/ProducerCacheTest.java | 64 ++++++++++++++++++++++ 2 files changed, 92 insertions(+), 10 deletions(-) diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ProducerCache.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ProducerCache.java index f68c4e95895..2e10581b352 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ProducerCache.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ProducerCache.java @@ -32,6 +32,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.common.util.FutureUtil; @Slf4j @@ -61,24 +62,41 @@ public class ProducerCache implements Closeable { private final CopyOnWriteArrayList<CompletableFuture<Void>> closeFutures = new CopyOnWriteArrayList<>(); public ProducerCache() { - Caffeine<ProducerCacheKey, Producer> builder = Caffeine.newBuilder() + Caffeine<ProducerCacheKey, Producer<?>> builder = Caffeine.newBuilder() .scheduler(Scheduler.systemScheduler()) - .<ProducerCacheKey, Producer>removalListener((key, producer, cause) -> { + .<ProducerCacheKey, Producer<?>>removalListener((key, producer, cause) -> { log.info("Closing producer for topic {}, cause {}", key.topic(), cause); CompletableFuture closeFuture = - producer.flushAsync() + CompletableFuture.supplyAsync(() -> producer.flushAsync(), Runnable::run) .orTimeout(FLUSH_OR_CLOSE_TIMEOUT_SECONDS, TimeUnit.SECONDS) .exceptionally(ex -> { - log.error("Error flushing producer for topic {}", key.topic(), ex); + Throwable unwrappedCause = FutureUtil.unwrapCompletionException(ex); + if (unwrappedCause instanceof PulsarClientException.AlreadyClosedException) { + log.error( + "Error flushing producer for topic {} due to " + + "AlreadyClosedException", + key.topic()); + } else { + log.error("Error flushing producer for topic {}", key.topic(), + unwrappedCause); + } return null; }).thenCompose(__ -> producer.closeAsync().orTimeout(FLUSH_OR_CLOSE_TIMEOUT_SECONDS, - TimeUnit.SECONDS) - .exceptionally(ex -> { - log.error("Error closing producer for topic {}", key.topic(), - ex); - return null; - })); + TimeUnit.SECONDS) + ).exceptionally(ex -> { + Throwable unwrappedCause = FutureUtil.unwrapCompletionException(ex); + if (unwrappedCause instanceof PulsarClientException.AlreadyClosedException) { + log.error( + "Error closing producer for topic {} due to " + + "AlreadyClosedException", + key.topic()); + } else { + log.error("Error closing producer for topic {}", key.topic(), + unwrappedCause); + } + return null; + }); if (closed.get()) { closeFutures.add(closeFuture); } diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ProducerCacheTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ProducerCacheTest.java new file mode 100644 index 00000000000..af95a7901b6 --- /dev/null +++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ProducerCacheTest.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.functions.instance; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import java.util.concurrent.CompletableFuture; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClientException; +import org.testng.annotations.Test; + +public class ProducerCacheTest { + + @Test + public void shouldTolerateAlreadyClosedExceptionInClose() { + ProducerCache cache = new ProducerCache(); + Producer producer = mock(Producer.class); + when(producer.flushAsync()).thenReturn(CompletableFuture.completedFuture(null)); + when(producer.closeAsync()).thenReturn( + CompletableFuture.failedFuture(new PulsarClientException.AlreadyClosedException("Already closed"))); + cache.getOrCreateProducer(ProducerCache.CacheArea.CONTEXT_CACHE, "topic", "key", + () -> (Producer<Object>) producer); + cache.close(); + } + + @Test + public void shouldTolerateRuntimeExceptionInClose() { + ProducerCache cache = new ProducerCache(); + Producer producer = mock(Producer.class); + when(producer.flushAsync()).thenReturn(CompletableFuture.completedFuture(null)); + when(producer.closeAsync()).thenThrow(new RuntimeException("Some exception")); + cache.getOrCreateProducer(ProducerCache.CacheArea.CONTEXT_CACHE, "topic", "key", + () -> (Producer<Object>) producer); + cache.close(); + } + + @Test + public void shouldTolerateRuntimeExceptionInFlush() { + ProducerCache cache = new ProducerCache(); + Producer producer = mock(Producer.class); + when(producer.flushAsync()).thenThrow(new RuntimeException("Some exception")); + when(producer.closeAsync()).thenReturn(CompletableFuture.completedFuture(null)); + cache.getOrCreateProducer(ProducerCache.CacheArea.CONTEXT_CACHE, "topic", "key", + () -> (Producer<Object>) producer); + cache.close(); + } + +} \ No newline at end of file
