This is an automated email from the ASF dual-hosted git repository.

heesung pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 2a050d17c9b [improve][broker] Cache the internal writer when sent to system topic. (#22099)
2a050d17c9b is described below

commit 2a050d17c9b87d8a4868a5613a5d2567126dbdb1
Author: Jiwei Guo <[email protected]>
AuthorDate: Sat Feb 24 08:04:37 2024 +0800

    [improve][broker] Cache the internal writer when sent to system topic. (#22099)
    
    (cherry picked from commit 8607905989081421c452e87db7b1eedececd7977)
---
 .../SystemTopicBasedTopicPoliciesService.java      | 84 ++++++++++++++--------
 .../systopic/TopicPoliciesSystemTopicClient.java   | 10 ++-
 .../SystemTopicBasedTopicPoliciesServiceTest.java  | 35 +++++++++
 3 files changed, 97 insertions(+), 32 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index 80fecbe67b6..71f78e21f93 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -19,6 +19,8 @@
 package org.apache.pulsar.broker.service;
 
 import static java.util.Objects.requireNonNull;
+import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
+import com.github.benmanes.caffeine.cache.Caffeine;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Sets;
 import java.util.HashSet;
@@ -29,6 +31,7 @@ import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import javax.annotation.Nonnull;
 import org.apache.commons.lang3.tuple.MutablePair;
@@ -84,10 +87,25 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
     @VisibleForTesting
     final Map<TopicName, List<TopicPolicyListener<TopicPolicies>>> listeners = 
new ConcurrentHashMap<>();
 
+    private final AsyncLoadingCache<NamespaceName, 
SystemTopicClient.Writer<PulsarEvent>> writerCaches;
+
     public SystemTopicBasedTopicPoliciesService(PulsarService pulsarService) {
         this.pulsarService = pulsarService;
         this.clusterName = pulsarService.getConfiguration().getClusterName();
         this.localCluster = Sets.newHashSet(clusterName);
+        this.writerCaches = Caffeine.newBuilder()
+                .expireAfterAccess(5, TimeUnit.MINUTES)
+                .removalListener((namespaceName, writer, cause) -> {
+                    ((SystemTopicClient.Writer) 
writer).closeAsync().exceptionally(ex -> {
+                        log.error("[{}] Close writer error.", namespaceName, 
ex);
+                        return null;
+                    });
+                })
+                .buildAsync((namespaceName, executor) -> {
+                    SystemTopicClient<PulsarEvent> systemTopicClient = 
namespaceEventsSystemTopicFactory
+                            
.createTopicPoliciesSystemTopicClient(namespaceName);
+                    return systemTopicClient.newWriterAsync();
+                });
     }
 
     @Override
@@ -122,39 +140,32 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
                     } catch (PulsarServerException e) {
                         return CompletableFuture.failedFuture(e);
                     }
-
-                    SystemTopicClient<PulsarEvent> systemTopicClient = 
namespaceEventsSystemTopicFactory
-                                    
.createTopicPoliciesSystemTopicClient(topicName.getNamespaceObject());
-
-                    return systemTopicClient.newWriterAsync()
-                            .thenCompose(writer -> {
-                            PulsarEvent event = getPulsarEvent(topicName, 
actionType, policies);
-                            CompletableFuture<MessageId> writeFuture =
-                                    ActionType.DELETE.equals(actionType) ? 
writer.deleteAsync(getEventKey(event), event)
-                                            : 
writer.writeAsync(getEventKey(event), event);
-                            return writeFuture.handle((messageId, e) -> {
-                                if (e != null) {
-                                    return CompletableFuture.failedFuture(e);
+                    CompletableFuture<Void> result = new CompletableFuture<>();
+                    writerCaches.get(topicName.getNamespaceObject())
+                            .whenComplete((writer, cause) -> {
+                                if (cause != null) {
+                                    
writerCaches.synchronous().invalidate(topicName.getNamespaceObject());
+                                    result.completeExceptionally(cause);
                                 } else {
-                                    if (messageId != null) {
-                                        return 
CompletableFuture.completedFuture(null);
-                                    } else {
-                                        return CompletableFuture.failedFuture(
-                                                new RuntimeException("Got 
message id is null."));
-                                    }
-                                }
-                            }).thenRun(() ->
-                                        writer.closeAsync().whenComplete((v, 
cause) -> {
-                                            if (cause != null) {
-                                                log.error("[{}] Close writer 
error.", topicName, cause);
+                                    PulsarEvent event = 
getPulsarEvent(topicName, actionType, policies);
+                                    CompletableFuture<MessageId> writeFuture = 
ActionType.DELETE.equals(actionType)
+                                                    ? 
writer.deleteAsync(getEventKey(event), event)
+                                                    : 
writer.writeAsync(getEventKey(event), event);
+                                    writeFuture.whenComplete((messageId, e) -> 
{
+                                        if (e != null) {
+                                            result.completeExceptionally(e);
+                                        } else {
+                                            if (messageId != null) {
+                                                result.complete(null);
                                             } else {
-                                                if (log.isDebugEnabled()) {
-                                                    log.debug("[{}] Close 
writer success.", topicName);
-                                                }
+                                                result.completeExceptionally(
+                                                        new 
RuntimeException("Got message id is null."));
                                             }
-                                        })
-                            );
+                                        }
+                                    });
+                            }
                     });
+                    return result;
                 });
     }
 
@@ -364,7 +375,7 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
         }
         AtomicInteger bundlesCount = 
ownedBundlesCountPerNamespace.get(namespace);
         if (bundlesCount == null || bundlesCount.decrementAndGet() <= 0) {
-            cleanCacheAndCloseReader(namespace, true);
+            cleanCacheAndCloseReader(namespace, true, true);
         }
         return CompletableFuture.completedFuture(null);
     }
@@ -440,6 +451,14 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
     }
 
     private void cleanCacheAndCloseReader(@Nonnull NamespaceName namespace, 
boolean cleanOwnedBundlesCount) {
+        cleanCacheAndCloseReader(namespace, cleanOwnedBundlesCount, false);
+    }
+
+    private void cleanCacheAndCloseReader(@Nonnull NamespaceName namespace, 
boolean cleanOwnedBundlesCount,
+                                          boolean cleanWriterCache) {
+        if (cleanWriterCache) {
+            writerCaches.synchronous().invalidate(namespace);
+        }
         CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> readerFuture 
= readerCaches.remove(namespace);
 
         if (cleanOwnedBundlesCount) {
@@ -688,5 +707,10 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
         return listeners;
     }
 
+    @VisibleForTesting
+    protected AsyncLoadingCache<NamespaceName, 
SystemTopicClient.Writer<PulsarEvent>> getWriterCaches() {
+        return writerCaches;
+    }
+
     private static final Logger log = 
LoggerFactory.getLogger(SystemTopicBasedTopicPoliciesService.class);
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/TopicPoliciesSystemTopicClient.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/TopicPoliciesSystemTopicClient.java
index 3fd8921c15e..b7cff2e08c2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/TopicPoliciesSystemTopicClient.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/TopicPoliciesSystemTopicClient.java
@@ -30,6 +30,8 @@ import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.TypedMessageBuilder;
+import org.apache.pulsar.client.api.schema.SchemaDefinition;
+import org.apache.pulsar.client.internal.DefaultImplementation;
 import org.apache.pulsar.common.events.ActionType;
 import org.apache.pulsar.common.events.PulsarEvent;
 import org.apache.pulsar.common.naming.TopicName;
@@ -41,13 +43,17 @@ import org.slf4j.LoggerFactory;
  */
 public class TopicPoliciesSystemTopicClient extends 
SystemTopicClientBase<PulsarEvent> {
 
+    static Schema<PulsarEvent> avroSchema = 
DefaultImplementation.getDefaultImplementation()
+            
.newAvroSchema(SchemaDefinition.builder().withPojo(PulsarEvent.class).build());
+
     public TopicPoliciesSystemTopicClient(PulsarClient client, TopicName 
topicName) {
         super(client, topicName);
+
     }
 
     @Override
     protected  CompletableFuture<Writer<PulsarEvent>> newWriterAsyncInternal() 
{
-        return client.newProducer(Schema.AVRO(PulsarEvent.class))
+        return client.newProducer(avroSchema)
                 .topic(topicName.toString())
                 .enableBatching(false)
                 .createAsync()
@@ -61,7 +67,7 @@ public class TopicPoliciesSystemTopicClient extends 
SystemTopicClientBase<Pulsar
 
     @Override
     protected CompletableFuture<Reader<PulsarEvent>> newReaderAsyncInternal() {
-        return client.newReader(Schema.AVRO(PulsarEvent.class))
+        return client.newReader(avroSchema)
                 .topic(topicName.toString())
                 .startMessageId(MessageId.earliest)
                 .readCompacted(true)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
index ba5e42867d3..cde41ffae68 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
@@ -33,6 +33,7 @@ import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -42,6 +43,7 @@ import 
org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.TopicPoliciesCacheNotInitException;
 import org.apache.pulsar.broker.systopic.SystemTopicClient;
 import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.impl.Backoff;
 import org.apache.pulsar.client.impl.BackoffBuilder;
 import org.apache.pulsar.common.events.PulsarEvent;
@@ -66,6 +68,8 @@ public class SystemTopicBasedTopicPoliciesServiceTest extends 
MockedPulsarServic
     private static final String NAMESPACE2 = "system-topic/namespace-2";
     private static final String NAMESPACE3 = "system-topic/namespace-3";
 
+    private static final String NAMESPACE4 = "system-topic/namespace-4";
+
     private static final TopicName TOPIC1 = TopicName.get("persistent", 
NamespaceName.get(NAMESPACE1), "topic-1");
     private static final TopicName TOPIC2 = TopicName.get("persistent", 
NamespaceName.get(NAMESPACE1), "topic-2");
     private static final TopicName TOPIC3 = TopicName.get("persistent", 
NamespaceName.get(NAMESPACE2), "topic-1");
@@ -428,4 +432,35 @@ public class SystemTopicBasedTopicPoliciesServiceTest 
extends MockedPulsarServic
 
         result.join();
     }
+
+    @Test
+    public void testWriterCache() throws Exception {
+        admin.namespaces().createNamespace(NAMESPACE4);
+        for (int i = 1; i <= 5; i ++) {
+            final String topicName = "persistent://" + NAMESPACE4 + 
"/testWriterCache" + i;
+            admin.topics().createNonPartitionedTopic(topicName);
+            
pulsarClient.newProducer(Schema.STRING).topic(topicName).create().close();
+        }
+        @Cleanup("shutdown")
+        ExecutorService executorService = Executors.newFixedThreadPool(5);
+        for (int i = 1; i <= 5; i ++) {
+            int finalI = i;
+            executorService.execute(() -> {
+                final String topicName = "persistent://" + NAMESPACE4 + 
"/testWriterCache" + finalI;
+                try {
+                    admin.topicPolicies().setMaxConsumers(topicName, 2);
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+            });
+        }
+        SystemTopicBasedTopicPoliciesService service = 
(SystemTopicBasedTopicPoliciesService) pulsar.getTopicPoliciesService();
+        
Assert.assertNotNull(service.getWriterCaches().synchronous().get(NamespaceName.get(NAMESPACE4)));
+        for (int i = 1; i <= 5; i ++) {
+            final String topicName = "persistent://" + NAMESPACE4 + 
"/testWriterCache" + i;
+            admin.topics().delete(topicName);
+        }
+        admin.namespaces().deleteNamespace(NAMESPACE4);
+        
Assert.assertNull(service.getWriterCaches().synchronous().getIfPresent(NamespaceName.get(NAMESPACE4)));
+    }
 }

Reply via email to