This is an automated email from the ASF dual-hosted git repository.
heesung pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 2a050d17c9b [improve][broker] Cache the internal writer when sent to
system topic. (#22099)
2a050d17c9b is described below
commit 2a050d17c9b87d8a4868a5613a5d2567126dbdb1
Author: Jiwei Guo <[email protected]>
AuthorDate: Sat Feb 24 08:04:37 2024 +0800
[improve][broker] Cache the internal writer when sent to system topic.
(#22099)
(cherry picked from commit 8607905989081421c452e87db7b1eedececd7977)
---
.../SystemTopicBasedTopicPoliciesService.java | 84 ++++++++++++++--------
.../systopic/TopicPoliciesSystemTopicClient.java | 10 ++-
.../SystemTopicBasedTopicPoliciesServiceTest.java | 35 +++++++++
3 files changed, 97 insertions(+), 32 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index 80fecbe67b6..71f78e21f93 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -19,6 +19,8 @@
package org.apache.pulsar.broker.service;
import static java.util.Objects.requireNonNull;
+import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
+import com.github.benmanes.caffeine.cache.Caffeine;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Sets;
import java.util.HashSet;
@@ -29,6 +31,7 @@ import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nonnull;
import org.apache.commons.lang3.tuple.MutablePair;
@@ -84,10 +87,25 @@ public class SystemTopicBasedTopicPoliciesService
implements TopicPoliciesServic
@VisibleForTesting
final Map<TopicName, List<TopicPolicyListener<TopicPolicies>>> listeners =
new ConcurrentHashMap<>();
+ private final AsyncLoadingCache<NamespaceName,
SystemTopicClient.Writer<PulsarEvent>> writerCaches;
+
public SystemTopicBasedTopicPoliciesService(PulsarService pulsarService) {
this.pulsarService = pulsarService;
this.clusterName = pulsarService.getConfiguration().getClusterName();
this.localCluster = Sets.newHashSet(clusterName);
+ this.writerCaches = Caffeine.newBuilder()
+ .expireAfterAccess(5, TimeUnit.MINUTES)
+ .removalListener((namespaceName, writer, cause) -> {
+ ((SystemTopicClient.Writer)
writer).closeAsync().exceptionally(ex -> {
+ log.error("[{}] Close writer error.", namespaceName,
ex);
+ return null;
+ });
+ })
+ .buildAsync((namespaceName, executor) -> {
+ SystemTopicClient<PulsarEvent> systemTopicClient =
namespaceEventsSystemTopicFactory
+
.createTopicPoliciesSystemTopicClient(namespaceName);
+ return systemTopicClient.newWriterAsync();
+ });
}
@Override
@@ -122,39 +140,32 @@ public class SystemTopicBasedTopicPoliciesService
implements TopicPoliciesServic
} catch (PulsarServerException e) {
return CompletableFuture.failedFuture(e);
}
-
- SystemTopicClient<PulsarEvent> systemTopicClient =
namespaceEventsSystemTopicFactory
-
.createTopicPoliciesSystemTopicClient(topicName.getNamespaceObject());
-
- return systemTopicClient.newWriterAsync()
- .thenCompose(writer -> {
- PulsarEvent event = getPulsarEvent(topicName,
actionType, policies);
- CompletableFuture<MessageId> writeFuture =
- ActionType.DELETE.equals(actionType) ?
writer.deleteAsync(getEventKey(event), event)
- :
writer.writeAsync(getEventKey(event), event);
- return writeFuture.handle((messageId, e) -> {
- if (e != null) {
- return CompletableFuture.failedFuture(e);
+ CompletableFuture<Void> result = new CompletableFuture<>();
+ writerCaches.get(topicName.getNamespaceObject())
+ .whenComplete((writer, cause) -> {
+ if (cause != null) {
+
writerCaches.synchronous().invalidate(topicName.getNamespaceObject());
+ result.completeExceptionally(cause);
} else {
- if (messageId != null) {
- return
CompletableFuture.completedFuture(null);
- } else {
- return CompletableFuture.failedFuture(
- new RuntimeException("Got
message id is null."));
- }
- }
- }).thenRun(() ->
- writer.closeAsync().whenComplete((v,
cause) -> {
- if (cause != null) {
- log.error("[{}] Close writer
error.", topicName, cause);
+ PulsarEvent event =
getPulsarEvent(topicName, actionType, policies);
+ CompletableFuture<MessageId> writeFuture =
ActionType.DELETE.equals(actionType)
+ ?
writer.deleteAsync(getEventKey(event), event)
+ :
writer.writeAsync(getEventKey(event), event);
+ writeFuture.whenComplete((messageId, e) ->
{
+ if (e != null) {
+ result.completeExceptionally(e);
+ } else {
+ if (messageId != null) {
+ result.complete(null);
} else {
- if (log.isDebugEnabled()) {
- log.debug("[{}] Close
writer success.", topicName);
- }
+ result.completeExceptionally(
+ new
RuntimeException("Got message id is null."));
}
- })
- );
+ }
+ });
+ }
});
+ return result;
});
}
@@ -364,7 +375,7 @@ public class SystemTopicBasedTopicPoliciesService
implements TopicPoliciesServic
}
AtomicInteger bundlesCount =
ownedBundlesCountPerNamespace.get(namespace);
if (bundlesCount == null || bundlesCount.decrementAndGet() <= 0) {
- cleanCacheAndCloseReader(namespace, true);
+ cleanCacheAndCloseReader(namespace, true, true);
}
return CompletableFuture.completedFuture(null);
}
@@ -440,6 +451,14 @@ public class SystemTopicBasedTopicPoliciesService
implements TopicPoliciesServic
}
private void cleanCacheAndCloseReader(@Nonnull NamespaceName namespace,
boolean cleanOwnedBundlesCount) {
+ cleanCacheAndCloseReader(namespace, cleanOwnedBundlesCount, false);
+ }
+
+ private void cleanCacheAndCloseReader(@Nonnull NamespaceName namespace,
boolean cleanOwnedBundlesCount,
+ boolean cleanWriterCache) {
+ if (cleanWriterCache) {
+ writerCaches.synchronous().invalidate(namespace);
+ }
CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> readerFuture
= readerCaches.remove(namespace);
if (cleanOwnedBundlesCount) {
@@ -688,5 +707,10 @@ public class SystemTopicBasedTopicPoliciesService
implements TopicPoliciesServic
return listeners;
}
+ @VisibleForTesting
+ protected AsyncLoadingCache<NamespaceName,
SystemTopicClient.Writer<PulsarEvent>> getWriterCaches() {
+ return writerCaches;
+ }
+
private static final Logger log =
LoggerFactory.getLogger(SystemTopicBasedTopicPoliciesService.class);
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/TopicPoliciesSystemTopicClient.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/TopicPoliciesSystemTopicClient.java
index 3fd8921c15e..b7cff2e08c2 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/TopicPoliciesSystemTopicClient.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/TopicPoliciesSystemTopicClient.java
@@ -30,6 +30,8 @@ import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TypedMessageBuilder;
+import org.apache.pulsar.client.api.schema.SchemaDefinition;
+import org.apache.pulsar.client.internal.DefaultImplementation;
import org.apache.pulsar.common.events.ActionType;
import org.apache.pulsar.common.events.PulsarEvent;
import org.apache.pulsar.common.naming.TopicName;
@@ -41,13 +43,17 @@ import org.slf4j.LoggerFactory;
*/
public class TopicPoliciesSystemTopicClient extends
SystemTopicClientBase<PulsarEvent> {
+ static Schema<PulsarEvent> avroSchema =
DefaultImplementation.getDefaultImplementation()
+
.newAvroSchema(SchemaDefinition.builder().withPojo(PulsarEvent.class).build());
+
public TopicPoliciesSystemTopicClient(PulsarClient client, TopicName
topicName) {
super(client, topicName);
+
}
@Override
protected CompletableFuture<Writer<PulsarEvent>> newWriterAsyncInternal()
{
- return client.newProducer(Schema.AVRO(PulsarEvent.class))
+ return client.newProducer(avroSchema)
.topic(topicName.toString())
.enableBatching(false)
.createAsync()
@@ -61,7 +67,7 @@ public class TopicPoliciesSystemTopicClient extends
SystemTopicClientBase<Pulsar
@Override
protected CompletableFuture<Reader<PulsarEvent>> newReaderAsyncInternal() {
- return client.newReader(Schema.AVRO(PulsarEvent.class))
+ return client.newReader(avroSchema)
.topic(topicName.toString())
.startMessageId(MessageId.earliest)
.readCompacted(true)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
index ba5e42867d3..cde41ffae68 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
@@ -33,6 +33,7 @@ import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -42,6 +43,7 @@ import
org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import
org.apache.pulsar.broker.service.BrokerServiceException.TopicPoliciesCacheNotInitException;
import org.apache.pulsar.broker.systopic.SystemTopicClient;
import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.client.impl.BackoffBuilder;
import org.apache.pulsar.common.events.PulsarEvent;
@@ -66,6 +68,8 @@ public class SystemTopicBasedTopicPoliciesServiceTest extends
MockedPulsarServic
private static final String NAMESPACE2 = "system-topic/namespace-2";
private static final String NAMESPACE3 = "system-topic/namespace-3";
+ private static final String NAMESPACE4 = "system-topic/namespace-4";
+
private static final TopicName TOPIC1 = TopicName.get("persistent",
NamespaceName.get(NAMESPACE1), "topic-1");
private static final TopicName TOPIC2 = TopicName.get("persistent",
NamespaceName.get(NAMESPACE1), "topic-2");
private static final TopicName TOPIC3 = TopicName.get("persistent",
NamespaceName.get(NAMESPACE2), "topic-1");
@@ -428,4 +432,35 @@ public class SystemTopicBasedTopicPoliciesServiceTest
extends MockedPulsarServic
result.join();
}
+
+ @Test
+ public void testWriterCache() throws Exception {
+ admin.namespaces().createNamespace(NAMESPACE4);
+ for (int i = 1; i <= 5; i ++) {
+ final String topicName = "persistent://" + NAMESPACE4 +
"/testWriterCache" + i;
+ admin.topics().createNonPartitionedTopic(topicName);
+
pulsarClient.newProducer(Schema.STRING).topic(topicName).create().close();
+ }
+ @Cleanup("shutdown")
+ ExecutorService executorService = Executors.newFixedThreadPool(5);
+ for (int i = 1; i <= 5; i ++) {
+ int finalI = i;
+ executorService.execute(() -> {
+ final String topicName = "persistent://" + NAMESPACE4 +
"/testWriterCache" + finalI;
+ try {
+ admin.topicPolicies().setMaxConsumers(topicName, 2);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+ SystemTopicBasedTopicPoliciesService service =
(SystemTopicBasedTopicPoliciesService) pulsar.getTopicPoliciesService();
+
Assert.assertNotNull(service.getWriterCaches().synchronous().get(NamespaceName.get(NAMESPACE4)));
+ for (int i = 1; i <= 5; i ++) {
+ final String topicName = "persistent://" + NAMESPACE4 +
"/testWriterCache" + i;
+ admin.topics().delete(topicName);
+ }
+ admin.namespaces().deleteNamespace(NAMESPACE4);
+
Assert.assertNull(service.getWriterCaches().synchronous().getIfPresent(NamespaceName.get(NAMESPACE4)));
+ }
}