This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 8b1c2a4d3a9 [fix][bk] Improve to the ReplicaitonWorker performance by 
deleting invalid underreplication nodes (#21160)
8b1c2a4d3a9 is described below

commit 8b1c2a4d3a96cc8c637dbce73ce9a9c2adef7edf
Author: Yan Zhao <[email protected]>
AuthorDate: Fri Sep 15 20:17:55 2023 +0800

    [fix][bk] Improve to the ReplicaitonWorker performance by deleting invalid 
underreplication nodes (#21160)
---
 .../PulsarLedgerUnderreplicationManager.java       | 29 ++++++++++
 .../LedgerUnderreplicationManagerTest.java         | 66 ++++++++++++++++++++++
 2 files changed, 95 insertions(+)

diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerUnderreplicationManager.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerUnderreplicationManager.java
index 79fdc44cb2b..dda8d7256ed 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerUnderreplicationManager.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerUnderreplicationManager.java
@@ -25,10 +25,12 @@ import static 
org.apache.bookkeeper.proto.DataFormats.LockDataFormat;
 import static 
org.apache.bookkeeper.proto.DataFormats.PlacementPolicyCheckFormat;
 import static org.apache.bookkeeper.proto.DataFormats.ReplicasCheckFormat;
 import static 
org.apache.bookkeeper.proto.DataFormats.UnderreplicatedLedgerFormat;
+import com.google.common.base.Joiner;
 import com.google.protobuf.InvalidProtocolBufferException;
 import com.google.protobuf.TextFormat;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumSet;
@@ -61,6 +63,8 @@ import org.apache.pulsar.metadata.api.Notification;
 import org.apache.pulsar.metadata.api.NotificationType;
 import org.apache.pulsar.metadata.api.extended.CreateOption;
 import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
+import org.apache.pulsar.metadata.impl.ZKMetadataStore;
+import org.apache.zookeeper.KeeperException;
 
 @Slf4j
 public class PulsarLedgerUnderreplicationManager implements 
LedgerUnderreplicationManager {
@@ -393,6 +397,31 @@ public class PulsarLedgerUnderreplicationManager 
implements LedgerUnderreplicati
             Lock l = heldLocks.get(ledgerId);
             if (l != null) {
                 store.delete(getUrLedgerPath(ledgerId), 
Optional.of(l.getLedgerNodeVersion())).get();
+                if (store instanceof ZKMetadataStore) {
+                    try {
+                        // clean up the hierarchy
+                        String[] parts = getUrLedgerPath(ledgerId).split("/");
+                        for (int i = 1; i <= 4; i++) {
+                            String[] p = Arrays.copyOf(parts, parts.length - 
i);
+                            String path = Joiner.on("/").join(p);
+                            Optional<GetResult> getResult = 
store.get(path).get();
+                            if (getResult.isPresent()) {
+                                store.delete(path, 
Optional.of(getResult.get().getStat().getVersion())).get();
+                            }
+                        }
+                    } catch (ExecutionException ee) {
+                        // This can happen when cleaning up the hierarchy.
+                        // It's safe to ignore, it simply means another
+                        // ledger in the same hierarchy has been marked as
+                        // underreplicated.
+                        if (ee.getCause() instanceof MetadataStoreException && 
ee.getCause().getCause()
+                                instanceof KeeperException.NotEmptyException) {
+                            //do nothing.
+                        } else {
+                            log.warn("Error deleting underrepcalited ledger 
parent node", ee);
+                        }
+                    }
+                }
             }
         } catch (ExecutionException ee) {
             if (ee.getCause() instanceof 
MetadataStoreException.NotFoundException) {
diff --git 
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/LedgerUnderreplicationManagerTest.java
 
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/LedgerUnderreplicationManagerTest.java
index 0df325b3c57..649dc1663c6 100644
--- 
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/LedgerUnderreplicationManagerTest.java
+++ 
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/LedgerUnderreplicationManagerTest.java
@@ -23,12 +23,14 @@ import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 import com.google.protobuf.TextFormat;
+import java.lang.reflect.Field;
 import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
@@ -54,6 +56,7 @@ import 
org.apache.bookkeeper.replication.ReplicationException.UnavailableExcepti
 import org.apache.bookkeeper.util.BookKeeperConstants;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.metadata.BaseMetadataStoreTest;
+import org.apache.pulsar.metadata.api.GetResult;
 import org.apache.pulsar.metadata.api.MetadataStoreConfig;
 import org.apache.pulsar.metadata.api.NotificationType;
 import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
@@ -296,6 +299,69 @@ public class LedgerUnderreplicationManagerTest extends 
BaseMetadataStoreTest {
         assertEquals(l, lB.get(), "Should be the ledger I marked");
     }
 
+
+    @Test(timeOut = 10000)
+    public void testZkMetasStoreMarkReplicatedDeleteEmptyParentNodes() throws 
Exception {
+        methodSetup(stringSupplier(() -> zks.getConnectionString()));
+
+        String missingReplica = "localhost:3181";
+
+        @Cleanup
+        LedgerUnderreplicationManager m1 = 
lmf.newLedgerUnderreplicationManager();
+
+        Long ledgerA = 0xfeadeefdacL;
+        m1.markLedgerUnderreplicated(ledgerA, missingReplica);
+
+        Field storeField = m1.getClass().getDeclaredField("store");
+        storeField.setAccessible(true);
+        MetadataStoreExtended metadataStore = (MetadataStoreExtended) 
storeField.get(m1);
+
+        String fiveLevelPath = 
PulsarLedgerUnderreplicationManager.getUrLedgerPath(urLedgerPath, ledgerA);
+        Optional<GetResult> getResult = 
metadataStore.get(fiveLevelPath).get(1, TimeUnit.SECONDS);
+        assertTrue(getResult.isPresent());
+
+        String fourLevelPath = fiveLevelPath.substring(0, 
fiveLevelPath.lastIndexOf("/"));
+        getResult = metadataStore.get(fourLevelPath).get(1, TimeUnit.SECONDS);
+        assertTrue(getResult.isPresent());
+
+        String threeLevelPath = fourLevelPath.substring(0, 
fourLevelPath.lastIndexOf("/"));
+        getResult = metadataStore.get(threeLevelPath).get(1, TimeUnit.SECONDS);
+        assertTrue(getResult.isPresent());
+
+        String twoLevelPath = fourLevelPath.substring(0, 
threeLevelPath.lastIndexOf("/"));
+        getResult = metadataStore.get(twoLevelPath).get(1, TimeUnit.SECONDS);
+        assertTrue(getResult.isPresent());
+
+        String oneLevelPath = fourLevelPath.substring(0, 
twoLevelPath.lastIndexOf("/"));
+        getResult = metadataStore.get(oneLevelPath).get(1, TimeUnit.SECONDS);
+        assertTrue(getResult.isPresent());
+
+        getResult = metadataStore.get(urLedgerPath).get(1, TimeUnit.SECONDS);
+        assertTrue(getResult.isPresent());
+
+        long ledgerToRereplicate = m1.getLedgerToRereplicate();
+        assertEquals(ledgerToRereplicate, ledgerA);
+        m1.markLedgerReplicated(ledgerA);
+
+        getResult = metadataStore.get(fiveLevelPath).get(1, TimeUnit.SECONDS);
+        assertFalse(getResult.isPresent());
+
+        getResult = metadataStore.get(fourLevelPath).get(1, TimeUnit.SECONDS);
+        assertFalse(getResult.isPresent());
+
+        getResult = metadataStore.get(threeLevelPath).get(1, TimeUnit.SECONDS);
+        assertFalse(getResult.isPresent());
+
+        getResult = metadataStore.get(twoLevelPath).get(1, TimeUnit.SECONDS);
+        assertFalse(getResult.isPresent());
+
+        getResult = metadataStore.get(oneLevelPath).get(1, TimeUnit.SECONDS);
+        assertFalse(getResult.isPresent());
+
+        getResult = metadataStore.get(urLedgerPath).get(1, TimeUnit.SECONDS);
+        assertTrue(getResult.isPresent());
+    }
+
     /**
      * Test releasing of a ledger
      * A ledger is released when a client decides it does not want

Reply via email to