This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new c1ba2b1 Fixed getting children of parent nodes in
LocalMemoryMetadataStore (#12491)
c1ba2b1 is described below
commit c1ba2b1cda191478b2919719e153822e479a01c9
Author: Matteo Merli <[email protected]>
AuthorDate: Tue Oct 26 09:57:33 2021 -0700
Fixed getting children of parent nodes in LocalMemoryMetadataStore (#12491)
---
.../metadata/impl/LocalMemoryMetadataStore.java | 41 +++++++++++-----------
.../apache/pulsar/metadata/MetadataStoreTest.java | 29 ++++++++++++++-
2 files changed, 48 insertions(+), 22 deletions(-)
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java
index a683bc9..740910d 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java
@@ -27,13 +27,12 @@ import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Optional;
+import java.util.Set;
import java.util.TreeMap;
+import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
-
import lombok.Data;
-
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.pulsar.metadata.api.GetResult;
@@ -118,16 +117,16 @@ public class LocalMemoryMetadataStore extends
AbstractMetadataStore implements M
String firstKey = path.equals("/") ? path : path + "/";
String lastKey = path.equals("/") ? "0" : path + "0"; // '0' is
lexicographically just after '/'
- List<String> children = new ArrayList<>();
+ Set<String> children = new TreeSet<>();
map.subMap(firstKey, false, lastKey, false).forEach((key, value)
-> {
String relativePath = key.replace(firstKey, "");
- if (!relativePath.contains("/")) {
- // Only return first-level children
- children.add(relativePath);
- }
+
+ // Only return first-level children
+ String child = relativePath.split("/", 2)[0];
+ children.add(child);
});
- return FutureUtils.value(children);
+ return FutureUtils.value(new ArrayList<>(children));
}
}
@@ -172,10 +171,7 @@ public class LocalMemoryMetadataStore extends
AbstractMetadataStore implements M
return FutureUtils.exception(new BadVersionException(""));
} else {
receivedNotification(new
Notification(NotificationType.Created, path));
- String parent = parent(path);
- if (parent != null) {
- receivedNotification(new
Notification(NotificationType.ChildrenChanged, parent));
- }
+ notifyParentChildrenChanged(path);
return FutureUtils.value(new Stat(path, 0, now, now,
newValue.isEphemeral(), true));
}
} else {
@@ -194,10 +190,7 @@ public class LocalMemoryMetadataStore extends
AbstractMetadataStore implements M
existingValue == null ? NotificationType.Created :
NotificationType.Modified;
receivedNotification(new Notification(type, path));
if (type == NotificationType.Created) {
- String parent = parent(path);
- if (parent != null) {
- receivedNotification(new
Notification(NotificationType.ChildrenChanged, parent));
- }
+ notifyParentChildrenChanged(path);
}
return FutureUtils
.value(new Stat(path, newValue.version,
newValue.createdTimestamp,
@@ -223,15 +216,21 @@ public class LocalMemoryMetadataStore extends
AbstractMetadataStore implements M
} else {
map.remove(path);
receivedNotification(new
Notification(NotificationType.Deleted, path));
- String parent = parent(path);
- if (parent != null) {
- receivedNotification(new
Notification(NotificationType.ChildrenChanged, parent));
- }
+
+ notifyParentChildrenChanged(path);
return FutureUtils.value(null);
}
}
}
+ private void notifyParentChildrenChanged(String path) {
+ String parent = parent(path);
+ while (parent != null) {
+ receivedNotification(new
Notification(NotificationType.ChildrenChanged, parent));
+ parent = parent(parent);
+ }
+ }
+
private static boolean isValidPath(String path) {
if (path == null || !path.startsWith("/")) {
return false;
diff --git
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java
index db6b4b9..4bb690f 100644
---
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java
+++
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java
@@ -43,6 +43,7 @@ import org.apache.pulsar.metadata.api.MetadataStoreFactory;
import org.apache.pulsar.metadata.api.Notification;
import org.apache.pulsar.metadata.api.NotificationType;
import org.apache.pulsar.metadata.api.Stat;
+import org.assertj.core.util.Lists;
import org.testng.annotations.Test;
public class MetadataStoreTest extends BaseMetadataStoreTest {
@@ -142,6 +143,8 @@ public class MetadataStoreTest extends
BaseMetadataStoreTest {
int n = 10;
List<String> expectedChildren = new ArrayList<>();
+ assertEquals(store.getChildren(key).join(), Collections.emptyList());
+
for (int i = 0; i < n; i++) {
store.put(key + "/c-" + i, new byte[0], Optional.empty()).join();
@@ -156,6 +159,30 @@ public class MetadataStoreTest extends
BaseMetadataStoreTest {
}
assertEquals(store.getChildren(key).join(), expectedChildren);
+
+ for (int i = 0; i < n; i++) {
+ store.deleteRecursive(key + "/c-" + i).join();
+ }
+
+ assertEquals(store.getChildren(key).join(), Collections.emptyList());
+ }
+
+ @Test(dataProvider = "impl")
+ public void navigateChildrenTest(String provider, Supplier<String>
urlSupplier) throws Exception {
+ @Cleanup
+ MetadataStore store = MetadataStoreFactory.create(urlSupplier.get(),
MetadataStoreConfig.builder().build());
+
+ String key = newKey();
+
+ // Nested children
+ store.put(key + "/c-0/cc-1", new byte[0], Optional.empty()).join();
+ store.put(key + "/c-0/cc-2/ccc-1", new byte[0],
Optional.empty()).join();
+
+ assertEquals(store.getChildren(key).join(),
Collections.singletonList("c-0"));
+ assertEquals(store.getChildren(key + "/c-0").join(),
+ Lists.newArrayList("cc-1", "cc-2"));
+ assertEquals(store.getChildren(key + "/c-0/cc-2").join(),
+ Lists.newArrayList("ccc-1"));
}
@Test(dataProvider = "impl")
@@ -300,7 +327,7 @@ public class MetadataStoreTest extends
BaseMetadataStoreTest {
}
@Test(dataProvider = "impl")
- public void testDeleteRecursive(String provider, Supplier<String>
urlSupplier) throws Exception {
+ public void testDeleteRecursive(String provider, Supplier<String>
urlSupplier) throws Exception {
@Cleanup
MetadataStore store = MetadataStoreFactory.create(urlSupplier.get(),
MetadataStoreConfig.builder().build());