This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 8b1c2a4d3a9 [fix][bk] Improve the ReplicationWorker performance by
deleting invalid underreplication nodes (#21160)
8b1c2a4d3a9 is described below
commit 8b1c2a4d3a96cc8c637dbce73ce9a9c2adef7edf
Author: Yan Zhao <[email protected]>
AuthorDate: Fri Sep 15 20:17:55 2023 +0800
[fix][bk] Improve the ReplicationWorker performance by deleting invalid
underreplication nodes (#21160)
---
.../PulsarLedgerUnderreplicationManager.java | 29 ++++++++++
.../LedgerUnderreplicationManagerTest.java | 66 ++++++++++++++++++++++
2 files changed, 95 insertions(+)
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerUnderreplicationManager.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerUnderreplicationManager.java
index 79fdc44cb2b..dda8d7256ed 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerUnderreplicationManager.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerUnderreplicationManager.java
@@ -25,10 +25,12 @@ import static
org.apache.bookkeeper.proto.DataFormats.LockDataFormat;
import static
org.apache.bookkeeper.proto.DataFormats.PlacementPolicyCheckFormat;
import static org.apache.bookkeeper.proto.DataFormats.ReplicasCheckFormat;
import static
org.apache.bookkeeper.proto.DataFormats.UnderreplicatedLedgerFormat;
+import com.google.common.base.Joiner;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.TextFormat;
import java.net.UnknownHostException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
@@ -61,6 +63,8 @@ import org.apache.pulsar.metadata.api.Notification;
import org.apache.pulsar.metadata.api.NotificationType;
import org.apache.pulsar.metadata.api.extended.CreateOption;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
+import org.apache.pulsar.metadata.impl.ZKMetadataStore;
+import org.apache.zookeeper.KeeperException;
@Slf4j
public class PulsarLedgerUnderreplicationManager implements
LedgerUnderreplicationManager {
@@ -393,6 +397,31 @@ public class PulsarLedgerUnderreplicationManager
implements LedgerUnderreplicati
Lock l = heldLocks.get(ledgerId);
if (l != null) {
store.delete(getUrLedgerPath(ledgerId),
Optional.of(l.getLedgerNodeVersion())).get();
+ if (store instanceof ZKMetadataStore) {
+ try {
+ // clean up the hierarchy
+ String[] parts = getUrLedgerPath(ledgerId).split("/");
+ for (int i = 1; i <= 4; i++) {
+ String[] p = Arrays.copyOf(parts, parts.length -
i);
+ String path = Joiner.on("/").join(p);
+ Optional<GetResult> getResult =
store.get(path).get();
+ if (getResult.isPresent()) {
+ store.delete(path,
Optional.of(getResult.get().getStat().getVersion())).get();
+ }
+ }
+ } catch (ExecutionException ee) {
+ // This can happen when cleaning up the hierarchy.
+ // It's safe to ignore, it simply means another
+ // ledger in the same hierarchy has been marked as
+ // underreplicated.
+ if (ee.getCause() instanceof MetadataStoreException &&
ee.getCause().getCause()
+ instanceof KeeperException.NotEmptyException) {
+ //do nothing.
+ } else {
+ log.warn("Error deleting underrepcalited ledger
parent node", ee);
+ }
+ }
+ }
}
} catch (ExecutionException ee) {
if (ee.getCause() instanceof
MetadataStoreException.NotFoundException) {
diff --git
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/LedgerUnderreplicationManagerTest.java
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/LedgerUnderreplicationManagerTest.java
index 0df325b3c57..649dc1663c6 100644
---
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/LedgerUnderreplicationManagerTest.java
+++
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/LedgerUnderreplicationManagerTest.java
@@ -23,12 +23,14 @@ import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import com.google.protobuf.TextFormat;
+import java.lang.reflect.Field;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
+import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
@@ -54,6 +56,7 @@ import
org.apache.bookkeeper.replication.ReplicationException.UnavailableExcepti
import org.apache.bookkeeper.util.BookKeeperConstants;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.metadata.BaseMetadataStoreTest;
+import org.apache.pulsar.metadata.api.GetResult;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
import org.apache.pulsar.metadata.api.NotificationType;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
@@ -296,6 +299,69 @@ public class LedgerUnderreplicationManagerTest extends
BaseMetadataStoreTest {
assertEquals(l, lB.get(), "Should be the ledger I marked");
}
+
+ @Test(timeOut = 10000)
+ public void testZkMetasStoreMarkReplicatedDeleteEmptyParentNodes() throws
Exception {
+ methodSetup(stringSupplier(() -> zks.getConnectionString()));
+
+ String missingReplica = "localhost:3181";
+
+ @Cleanup
+ LedgerUnderreplicationManager m1 =
lmf.newLedgerUnderreplicationManager();
+
+ Long ledgerA = 0xfeadeefdacL;
+ m1.markLedgerUnderreplicated(ledgerA, missingReplica);
+
+ Field storeField = m1.getClass().getDeclaredField("store");
+ storeField.setAccessible(true);
+ MetadataStoreExtended metadataStore = (MetadataStoreExtended)
storeField.get(m1);
+
+ String fiveLevelPath =
PulsarLedgerUnderreplicationManager.getUrLedgerPath(urLedgerPath, ledgerA);
+ Optional<GetResult> getResult =
metadataStore.get(fiveLevelPath).get(1, TimeUnit.SECONDS);
+ assertTrue(getResult.isPresent());
+
+ String fourLevelPath = fiveLevelPath.substring(0,
fiveLevelPath.lastIndexOf("/"));
+ getResult = metadataStore.get(fourLevelPath).get(1, TimeUnit.SECONDS);
+ assertTrue(getResult.isPresent());
+
+ String threeLevelPath = fourLevelPath.substring(0,
fourLevelPath.lastIndexOf("/"));
+ getResult = metadataStore.get(threeLevelPath).get(1, TimeUnit.SECONDS);
+ assertTrue(getResult.isPresent());
+
+ String twoLevelPath = fourLevelPath.substring(0,
threeLevelPath.lastIndexOf("/"));
+ getResult = metadataStore.get(twoLevelPath).get(1, TimeUnit.SECONDS);
+ assertTrue(getResult.isPresent());
+
+ String oneLevelPath = fourLevelPath.substring(0,
twoLevelPath.lastIndexOf("/"));
+ getResult = metadataStore.get(oneLevelPath).get(1, TimeUnit.SECONDS);
+ assertTrue(getResult.isPresent());
+
+ getResult = metadataStore.get(urLedgerPath).get(1, TimeUnit.SECONDS);
+ assertTrue(getResult.isPresent());
+
+ long ledgerToRereplicate = m1.getLedgerToRereplicate();
+ assertEquals(ledgerToRereplicate, ledgerA);
+ m1.markLedgerReplicated(ledgerA);
+
+ getResult = metadataStore.get(fiveLevelPath).get(1, TimeUnit.SECONDS);
+ assertFalse(getResult.isPresent());
+
+ getResult = metadataStore.get(fourLevelPath).get(1, TimeUnit.SECONDS);
+ assertFalse(getResult.isPresent());
+
+ getResult = metadataStore.get(threeLevelPath).get(1, TimeUnit.SECONDS);
+ assertFalse(getResult.isPresent());
+
+ getResult = metadataStore.get(twoLevelPath).get(1, TimeUnit.SECONDS);
+ assertFalse(getResult.isPresent());
+
+ getResult = metadataStore.get(oneLevelPath).get(1, TimeUnit.SECONDS);
+ assertFalse(getResult.isPresent());
+
+ getResult = metadataStore.get(urLedgerPath).get(1, TimeUnit.SECONDS);
+ assertTrue(getResult.isPresent());
+ }
+
/**
* Test releasing of a ledger
* A ledger is released when a client decides it does not want