This is an automated email from the ASF dual-hosted git repository.
rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new b8e6948f62d [fix][broker] Broker failed to load v1 namespace resources cache (#20783)
b8e6948f62d is described below
commit b8e6948f62d6ec2ca53b6a85fe2fd07d4dee6853
Author: Rajan Dhabalia <[email protected]>
AuthorDate: Fri Jul 21 15:41:50 2023 -0700
[fix][broker] Broker failed to load v1 namespace resources cache (#20783)
---
.../pulsar/broker/resources/BaseResources.java | 36 ++++++++++++++++++++++
.../broker/resources/NamespaceResources.java | 2 +-
.../pulsar/broker/admin/PersistentTopicsTest.java | 14 +++++++++
3 files changed, 51 insertions(+), 1 deletion(-)
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java
index 42add4271f6..4011a482075 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java
@@ -20,11 +20,16 @@ package org.apache.pulsar.broker.resources;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.base.Joiner;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Optional;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
@@ -78,6 +83,37 @@ public class BaseResources<T> {
return cache.getChildren(path);
}
+ protected CompletableFuture<List<String>> getChildrenRecursiveAsync(String path) {
+ Set<String> children = ConcurrentHashMap.newKeySet();
+ CompletableFuture<List<String>> result = new CompletableFuture<>();
+ getChildrenRecursiveAsync(path, children, result, new AtomicInteger(1), path);
+ return result;
+ }
+
+ private void getChildrenRecursiveAsync(String path, Set<String> children, CompletableFuture<List<String>> result,
+ AtomicInteger totalResults, String parent) {
+ cache.getChildren(path).thenAccept(childList -> {
+ childList = childList != null ? childList : Collections.emptyList();
+ if (totalResults.decrementAndGet() == 0 && childList.isEmpty()) {
+ result.complete(new ArrayList<>(children));
+ return;
+ }
+ if (childList.isEmpty()) {
+ return;
+ }
+ // remove current node from children if current node is not leaf
+ children.remove(parent);
+ // childPrefix creates a path hierarchy if children has multi level path
+ String childPrefix = path.equals(parent) ? "" : parent + "/";
+ totalResults.addAndGet(childList.size());
+ for (String child : childList) {
+ children.add(childPrefix + child);
+ String childPath = path + "/" + child;
+ getChildrenRecursiveAsync(childPath, children, result, totalResults, child);
+ }
+ });
+ }
+
protected Optional<T> get(String path) throws MetadataStoreException {
try {
return getAsync(path).get(operationTimeoutSec, TimeUnit.SECONDS);
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java
index 48f82596567..e5dd13c32eb 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java
@@ -62,7 +62,7 @@ public class NamespaceResources extends BaseResources<Policies> {
}
public CompletableFuture<List<String>> listNamespacesAsync(String tenant) {
- return getChildrenAsync(joinPath(BASE_POLICIES_PATH, tenant));
+ return getChildrenRecursiveAsync(joinPath(BASE_POLICIES_PATH, tenant));
}
public CompletableFuture<Boolean> getPoliciesReadOnlyAsync() {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index 83ed63bf0d9..a4f6bd4650f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -31,6 +31,7 @@ import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertTrue;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.HashMap;
@@ -1700,4 +1701,17 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
// verify we only call getReplicatedSubscriptionStatusAsync once.
verify(topics, times(1)).getReplicatedSubscriptionStatusAsync(any(), any());
}
+
+ @Test
+ public void testNamespaceResources() throws Exception {
+ String ns1V1 = "test/" + testNamespace + "v1";
+ String ns1V2 = testNamespace + "v2";
+ admin.namespaces().createNamespace(testTenant+"/"+ns1V1);
+ admin.namespaces().createNamespace(testTenant+"/"+ns1V2);
+
+ List<String> namespaces = pulsar.getPulsarResources().getNamespaceResources().listNamespacesAsync(testTenant)
+ .get();
+ assertTrue(namespaces.contains(ns1V2));
+ assertTrue(namespaces.contains(ns1V1));
+ }
}