This is an automated email from the ASF dual-hosted git repository.
zhaocong pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.9 by this push:
new 2389e65f468 [fix][broker] Fix get topic policies as null during clean
cache (#20763)
2389e65f468 is described below
commit 2389e65f46808e4e74b11f97d5f33af18ac08e4f
Author: Cong Zhao <[email protected]>
AuthorDate: Wed Jul 12 20:03:50 2023 +0800
[fix][broker] Fix get topic policies as null during clean cache (#20763)
(cherry picked from commit 3116abf30f7f4c8a2a5e608e1ca672e2bada3e2d)
---
.../SystemTopicBasedTopicPoliciesService.java | 30 +++++++--
.../SystemTopicBasedTopicPoliciesServiceTest.java | 76 +++++++++++++++++++++-
2 files changed, 97 insertions(+), 9 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index 162e77cec32..652ad268a80 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -28,6 +28,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nonnull;
+import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.namespace.NamespaceBundleOwnershipListener;
@@ -191,11 +192,24 @@ public class SystemTopicBasedTopicPoliciesService
implements TopicPoliciesServic
NamespaceName namespace = topicName.getNamespaceObject();
prepareInitPoliciesCache(namespace, new CompletableFuture<>());
}
- if (policyCacheInitMap.containsKey(topicName.getNamespaceObject())
- && !policyCacheInitMap.get(topicName.getNamespaceObject())) {
- throw new TopicPoliciesCacheNotInitException();
+
+ MutablePair<TopicPoliciesCacheNotInitException, TopicPolicies> result
= new MutablePair<>();
+ policyCacheInitMap.compute(topicName.getNamespaceObject(), (k,
initialized) -> {
+ if (initialized == null || !initialized) {
+ result.setLeft(new TopicPoliciesCacheNotInitException());
+ } else {
+ TopicPolicies topicPolicies =
+
policiesCache.get(TopicName.get(topicName.getPartitionedTopicName()));
+ result.setRight(topicPolicies);
+ }
+ return initialized;
+ });
+
+ if (result.getLeft() != null) {
+ throw result.getLeft();
+ } else {
+ return result.getRight();
}
- return
policiesCache.get(TopicName.get(topicName.getPartitionedTopicName()));
}
@Override
@@ -359,7 +373,7 @@ public class SystemTopicBasedTopicPoliciesService
implements TopicPoliciesServic
private void cleanCacheAndCloseReader(@Nonnull NamespaceName namespace,
boolean cleanOwnedBundlesCount) {
CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> readerFuture
= readerCaches.remove(namespace);
- policiesCache.entrySet().removeIf(entry ->
Objects.equals(entry.getKey().getNamespaceObject(), namespace));
+
if (cleanOwnedBundlesCount) {
ownedBundlesCountPerNamespace.remove(namespace);
}
@@ -370,7 +384,11 @@ public class SystemTopicBasedTopicPoliciesService
implements TopicPoliciesServic
return null;
});
}
- policyCacheInitMap.remove(namespace);
+
+ policyCacheInitMap.compute(namespace, (k, v) -> {
+ policiesCache.entrySet().removeIf(entry ->
Objects.equals(entry.getKey().getNamespaceObject(), namespace));
+ return null;
+ });
}
private void readMorePolicies(SystemTopicClient.Reader<PulsarEvent>
reader) {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
index 8b5f4203c84..fdb0c34e7dc 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
@@ -22,7 +22,6 @@ import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.when;
import static org.testng.AssertJUnit.assertEquals;
import static org.testng.AssertJUnit.assertNotNull;
import static org.testng.AssertJUnit.assertNull;
@@ -34,10 +33,13 @@ import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import
org.apache.pulsar.broker.service.BrokerServiceException.TopicPoliciesCacheNotInitException;
import org.apache.pulsar.broker.systopic.NamespaceEventsSystemTopicFactory;
@@ -45,10 +47,8 @@ import org.apache.pulsar.broker.systopic.SystemTopicClient;
import org.apache.pulsar.broker.systopic.TopicPoliciesSystemTopicClient;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.client.impl.BackoffBuilder;
-import org.apache.pulsar.client.impl.ReaderImpl;
import org.apache.pulsar.common.events.PulsarEvent;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
@@ -56,13 +56,16 @@ import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.util.FutureUtil;
+import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
+import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@Test(groups = "broker")
+@Slf4j
public class SystemTopicBasedTopicPoliciesServiceTest extends
MockedPulsarServiceBaseTest {
private static final String NAMESPACE1 = "system-topic/namespace-1";
@@ -376,4 +379,71 @@ public class SystemTopicBasedTopicPoliciesServiceTest
extends MockedPulsarServic
}
});
}
+
+ @Test
+ public void testGetTopicPoliciesWithCleanCache() throws Exception {
+ final String topic = "persistent://" + NAMESPACE1 + "/test" +
UUID.randomUUID();
+ pulsarClient.newProducer().topic(topic).create().close();
+
+ SystemTopicBasedTopicPoliciesService topicPoliciesService =
+ (SystemTopicBasedTopicPoliciesService)
pulsar.getTopicPoliciesService();
+
+ ConcurrentHashMap<TopicName, TopicPolicies> spyPoliciesCache = spy(new
ConcurrentHashMap<TopicName, TopicPolicies>());
+ FieldUtils.writeDeclaredField(topicPoliciesService, "policiesCache",
spyPoliciesCache, true);
+
+ Awaitility.await().untilAsserted(() -> {
+
Assertions.assertThat(topicPoliciesService.getTopicPolicies(TopicName.get(topic))).isNull();
+ });
+
+ admin.topicPolicies().setMaxConsumersPerSubscription(topic, 1);
+ Awaitility.await().untilAsserted(() -> {
+
Assertions.assertThat(pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(topic))).isNotNull();
+ });
+
+ Map<NamespaceName,
CompletableFuture<SystemTopicClient.Reader<PulsarEvent>>> readers =
+ (Map<NamespaceName,
CompletableFuture<SystemTopicClient.Reader<PulsarEvent>>>)
+ FieldUtils.readDeclaredField(topicPoliciesService,
"readerCaches", true);
+
+ Mockito.doAnswer(invocation -> {
+ Thread.sleep(1000);
+ return invocation.callRealMethod();
+ }).when(spyPoliciesCache).get(Mockito.any());
+
+ CompletableFuture<Void> result = new CompletableFuture<>();
+ Thread thread = new Thread(() -> {
+ TopicPolicies topicPolicies;
+ for (int i = 0; i < 10; i++) {
+ try {
+ topicPolicies =
topicPoliciesService.getTopicPolicies(TopicName.get(topic));
+ Assert.assertNotNull(topicPolicies);
+ Thread.sleep(500);
+ } catch
(BrokerServiceException.TopicPoliciesCacheNotInitException e) {
+ log.warn("topic policies cache not init, retry...");
+ } catch (Throwable e) {
+ log.error("ops: ", e);
+ result.completeExceptionally(e);
+ return;
+ }
+ }
+ result.complete(null);
+ });
+
+ Thread thread2 = new Thread(() -> {
+ for (int i = 0; i < 10; i++) {
+ CompletableFuture<SystemTopicClient.Reader<PulsarEvent>>
readerCompletableFuture =
+ readers.get(TopicName.get(topic).getNamespaceObject());
+ if (readerCompletableFuture != null) {
+ readerCompletableFuture.join().closeAsync().join();
+ }
+ }
+ });
+
+ thread.start();
+ thread2.start();
+
+ thread.join();
+ thread2.join();
+
+ result.join();
+ }
}