This is an automated email from the ASF dual-hosted git repository. xiangying pushed a commit to branch branch-2.11 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit ae09fa7643192cb4c4f9353e3bbb37aa37319860 Author: Yan Zhao <[email protected]> AuthorDate: Thu Aug 31 15:12:03 2023 +0800 [fix][auto-recovery] Improve ReplicationWorker performance by deleting invalid underreplication nodes (#21059) (cherry picked from commit ba0f2ba38bb88869b8bf664cb20ef41d47a73026) --- .../gradle-enterprise-workspace-id | 1 + .../bookkeeper/AbstractMetadataDriver.java | 2 + .../PulsarLedgerUnderreplicationManager.java | 38 ++++++++++++- .../LedgerUnderreplicationManagerTest.java | 66 ++++++++++++++++++++++ 4 files changed, 106 insertions(+), 1 deletion(-) diff --git a/.mvn/.gradle-enterprise/gradle-enterprise-workspace-id b/.mvn/.gradle-enterprise/gradle-enterprise-workspace-id new file mode 100644 index 00000000000..663b50b8c2e --- /dev/null +++ b/.mvn/.gradle-enterprise/gradle-enterprise-workspace-id @@ -0,0 +1 @@ +s3kqc43mf5g5vgx62q3q6vo3hm \ No newline at end of file diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/AbstractMetadataDriver.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/AbstractMetadataDriver.java index 76a14300d0b..8af9bb91f5b 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/AbstractMetadataDriver.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/AbstractMetadataDriver.java @@ -21,6 +21,7 @@ package org.apache.pulsar.metadata.bookkeeper; import java.io.Closeable; import java.io.IOException; import java.net.URI; +import java.util.concurrent.TimeUnit; import lombok.SneakyThrows; import org.apache.bookkeeper.conf.AbstractConfiguration; import org.apache.bookkeeper.discover.RegistrationClient; @@ -40,6 +41,7 @@ public abstract class AbstractMetadataDriver implements Closeable { public static final String METADATA_STORE_SCHEME = "metadata-store"; public static final String METADATA_STORE_INSTANCE = "metadata-store-instance"; + public static final long BLOCKING_CALL_TIMEOUT = 
TimeUnit.SECONDS.toMillis(30); protected MetadataStoreExtended store; private boolean storeInstanceIsOwned; diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerUnderreplicationManager.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerUnderreplicationManager.java index f58a2752180..eeedf54f3bb 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerUnderreplicationManager.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerUnderreplicationManager.java @@ -19,16 +19,20 @@ package org.apache.pulsar.metadata.bookkeeper; import static java.nio.charset.StandardCharsets.UTF_8; +import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.apache.bookkeeper.proto.DataFormats.CheckAllLedgersFormat; import static org.apache.bookkeeper.proto.DataFormats.LedgerRereplicationLayoutFormat; import static org.apache.bookkeeper.proto.DataFormats.LockDataFormat; import static org.apache.bookkeeper.proto.DataFormats.PlacementPolicyCheckFormat; import static org.apache.bookkeeper.proto.DataFormats.ReplicasCheckFormat; import static org.apache.bookkeeper.proto.DataFormats.UnderreplicatedLedgerFormat; +import static org.apache.pulsar.metadata.bookkeeper.AbstractMetadataDriver.BLOCKING_CALL_TIMEOUT; +import com.google.common.base.Joiner; import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.TextFormat; import java.net.UnknownHostException; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.EnumSet; @@ -41,6 +45,7 @@ import java.util.Queue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; import java.util.function.Predicate; import java.util.regex.Matcher; import 
java.util.regex.Pattern; @@ -61,6 +66,8 @@ import org.apache.pulsar.metadata.api.Notification; import org.apache.pulsar.metadata.api.NotificationType; import org.apache.pulsar.metadata.api.extended.CreateOption; import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended; +import org.apache.pulsar.metadata.impl.ZKMetadataStore; +import org.apache.zookeeper.KeeperException; @Slf4j public class PulsarLedgerUnderreplicationManager implements LedgerUnderreplicationManager { @@ -392,7 +399,34 @@ public class PulsarLedgerUnderreplicationManager implements LedgerUnderreplicati try { Lock l = heldLocks.get(ledgerId); if (l != null) { - store.delete(getUrLedgerPath(ledgerId), Optional.of(l.getLedgerNodeVersion())).get(); + store.delete(getUrLedgerPath(ledgerId), Optional.of(l.getLedgerNodeVersion())) + .get(BLOCKING_CALL_TIMEOUT, MILLISECONDS); + if (store instanceof ZKMetadataStore) { + try { + // clean up the hierarchy + String[] parts = getUrLedgerPath(ledgerId).split("/"); + for (int i = 1; i <= 4; i++) { + String[] p = Arrays.copyOf(parts, parts.length - i); + String path = Joiner.on("/").join(p); + Optional<GetResult> getResult = store.get(path).get(BLOCKING_CALL_TIMEOUT, MILLISECONDS); + if (getResult.isPresent()) { + store.delete(path, Optional.of(getResult.get().getStat().getVersion())) + .get(BLOCKING_CALL_TIMEOUT, MILLISECONDS); + } + } + } catch (ExecutionException ee) { + // This can happen when cleaning up the hierarchy. + // It's safe to ignore, it simply means another + // ledger in the same hierarchy has been marked as + // underreplicated. + if (ee.getCause() instanceof MetadataStoreException && ee.getCause().getCause() + instanceof KeeperException.NotEmptyException) { + //do nothing. 
+ } else { + log.warn("Error deleting underrepcalited ledger parent node", ee); + } + } + } } } catch (ExecutionException ee) { if (ee.getCause() instanceof MetadataStoreException.NotFoundException) { @@ -405,6 +439,8 @@ public class PulsarLedgerUnderreplicationManager implements LedgerUnderreplicati log.error("Error deleting underreplicated ledger node", ee); throw new ReplicationException.UnavailableException("Error contacting metadata store", ee); } + } catch (TimeoutException ex) { + throw new ReplicationException.UnavailableException("Error contacting metadata store", ex); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); throw new ReplicationException.UnavailableException("Interrupted while contacting metadata store", ie); diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/LedgerUnderreplicationManagerTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/LedgerUnderreplicationManagerTest.java index 661eb13ac28..1c8b62642da 100644 --- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/LedgerUnderreplicationManagerTest.java +++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/LedgerUnderreplicationManagerTest.java @@ -23,12 +23,14 @@ import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; import com.google.protobuf.TextFormat; +import java.lang.reflect.Field; import java.nio.charset.Charset; import java.util.ArrayList; import java.util.Collection; import java.util.HashSet; import java.util.Iterator; import java.util.List; +import java.util.Optional; import java.util.Set; import java.util.UUID; import java.util.concurrent.CompletableFuture; @@ -54,6 +56,7 @@ import org.apache.bookkeeper.replication.ReplicationException.UnavailableExcepti import org.apache.bookkeeper.util.BookKeeperConstants; import org.apache.commons.lang3.StringUtils; import 
org.apache.pulsar.metadata.BaseMetadataStoreTest; +import org.apache.pulsar.metadata.api.GetResult; import org.apache.pulsar.metadata.api.MetadataStoreConfig; import org.apache.pulsar.metadata.api.NotificationType; import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended; @@ -295,6 +298,69 @@ public class LedgerUnderreplicationManagerTest extends BaseMetadataStoreTest { assertEquals(l, lB.get(), "Should be the ledger I marked"); } + /** Verifies that on a ZooKeeper-backed store, marking a ledger as replicated removes its node and every emptied parent node below urLedgerPath, while urLedgerPath itself is kept. */ + @Test(timeOut = 10000) + public void testZkMetasStoreMarkReplicatedDeleteEmptyParentNodes() throws Exception { + methodSetup(stringSupplier(() -> zks.getConnectionString())); + + String missingReplica = "localhost:3181"; + + @Cleanup + LedgerUnderreplicationManager m1 = lmf.newLedgerUnderreplicationManager(); + + Long ledgerA = 0xfeadeefdacL; + m1.markLedgerUnderreplicated(ledgerA, missingReplica); + + Field storeField = m1.getClass().getDeclaredField("store"); + storeField.setAccessible(true); + MetadataStoreExtended metadataStore = (MetadataStoreExtended) storeField.get(m1); + + String fiveLevelPath = PulsarLedgerUnderreplicationManager.getUrLedgerPath(urLedgerPath, ledgerA); + Optional<GetResult> getResult = metadataStore.get(fiveLevelPath).get(1, TimeUnit.SECONDS); + assertTrue(getResult.isPresent()); + + String fourLevelPath = fiveLevelPath.substring(0, fiveLevelPath.lastIndexOf("/")); + getResult = metadataStore.get(fourLevelPath).get(1, TimeUnit.SECONDS); + assertTrue(getResult.isPresent()); + + String threeLevelPath = fourLevelPath.substring(0, fourLevelPath.lastIndexOf("/")); + getResult = metadataStore.get(threeLevelPath).get(1, TimeUnit.SECONDS); + assertTrue(getResult.isPresent()); + + String twoLevelPath = threeLevelPath.substring(0, threeLevelPath.lastIndexOf("/")); + getResult = metadataStore.get(twoLevelPath).get(1, TimeUnit.SECONDS); + assertTrue(getResult.isPresent()); + + String oneLevelPath = twoLevelPath.substring(0, twoLevelPath.lastIndexOf("/")); + getResult = metadataStore.get(oneLevelPath).get(1, 
TimeUnit.SECONDS); + assertTrue(getResult.isPresent()); + + getResult = metadataStore.get(urLedgerPath).get(1, TimeUnit.SECONDS); + assertTrue(getResult.isPresent()); + + long ledgerToRereplicate = m1.getLedgerToRereplicate(); + assertEquals(Long.valueOf(ledgerToRereplicate), ledgerA); + m1.markLedgerReplicated(ledgerA); + + getResult = metadataStore.get(fiveLevelPath).get(1, TimeUnit.SECONDS); + assertFalse(getResult.isPresent()); + + getResult = metadataStore.get(fourLevelPath).get(1, TimeUnit.SECONDS); + assertFalse(getResult.isPresent()); + + getResult = metadataStore.get(threeLevelPath).get(1, TimeUnit.SECONDS); + assertFalse(getResult.isPresent()); + + getResult = metadataStore.get(twoLevelPath).get(1, TimeUnit.SECONDS); + assertFalse(getResult.isPresent()); + + getResult = metadataStore.get(oneLevelPath).get(1, TimeUnit.SECONDS); + assertFalse(getResult.isPresent()); + + getResult = metadataStore.get(urLedgerPath).get(1, TimeUnit.SECONDS); + assertTrue(getResult.isPresent()); + } + /** * Test releasing of a ledger * A ledger is released when a client decides it does not want
