This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new eefc51712c9 [fix] [auto-recovery] Fix pulsar ledger auditor dead lock
problem. (#21181)
eefc51712c9 is described below
commit eefc51712c97515e5913f6e5fceb4e98f5cd3b18
Author: Yan Zhao <[email protected]>
AuthorDate: Tue Sep 19 11:47:44 2023 +0800
[fix] [auto-recovery] Fix pulsar ledger auditor dead lock problem. (#21181)
---
.../bookkeeper/PulsarLedgerAuditorManager.java | 13 ++
.../replication/AuditorPeriodicCheckTest.java | 8 +-
.../replication/AutoRecoveryMainTest.java | 201 +++++++++++++++++++++
3 files changed, 218 insertions(+), 4 deletions(-)
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerAuditorManager.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerAuditorManager.java
index bc35380fec1..d664ecdcd20 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerAuditorManager.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerAuditorManager.java
@@ -27,6 +27,7 @@ import
org.apache.pulsar.metadata.api.coordination.CoordinationService;
import org.apache.pulsar.metadata.api.coordination.LeaderElection;
import org.apache.pulsar.metadata.api.coordination.LeaderElectionState;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
+import org.apache.pulsar.metadata.api.extended.SessionEvent;
import org.apache.pulsar.metadata.coordination.impl.CoordinationServiceImpl;
@Slf4j
@@ -38,6 +39,7 @@ class PulsarLedgerAuditorManager implements
LedgerAuditorManager {
private final LeaderElection<String> leaderElection;
private LeaderElectionState leaderElectionState;
private String bookieId;
+ private boolean sessionExpired = false;
PulsarLedgerAuditorManager(MetadataStoreExtended store, String
ledgersRoot) {
this.coordinationService = new CoordinationServiceImpl(store);
@@ -47,6 +49,14 @@ class PulsarLedgerAuditorManager implements
LedgerAuditorManager {
this.leaderElection =
coordinationService.getLeaderElection(String.class,
electionPath, this::handleStateChanges);
this.leaderElectionState = LeaderElectionState.NoLeader;
+ store.registerSessionListener(event -> {
+ if (SessionEvent.SessionLost == event) {
+ synchronized (this) {
+ sessionExpired = true;
+ notifyAll();
+ }
+ }
+ });
}
private void handleStateChanges(LeaderElectionState state) {
@@ -71,6 +81,9 @@ class PulsarLedgerAuditorManager implements
LedgerAuditorManager {
while (true) {
try {
synchronized (this) {
+ if (sessionExpired) {
+ throw new IllegalStateException("Zookeeper session
expired, give up to become auditor.");
+ }
if (leaderElectionState == LeaderElectionState.Leading) {
return;
} else {
diff --git
a/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java
b/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java
index c761d46c622..901361dd3a2 100644
---
a/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java
+++
b/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java
@@ -44,8 +44,8 @@ import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.testng.annotations.AfterTest;
-import org.testng.annotations.BeforeTest;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
/**
@@ -68,7 +68,7 @@ public class AuditorPeriodicCheckTest extends
BookKeeperClusterTestCase {
Class.forName("org.apache.pulsar.metadata.bookkeeper.PulsarMetadataBookieDriver");
}
- @BeforeTest
+ @BeforeMethod
@Override
public void setUp() throws Exception {
super.setUp();
@@ -99,7 +99,7 @@ public class AuditorPeriodicCheckTest extends
BookKeeperClusterTestCase {
driver.initialize(serverConfiguration, NullStatsLogger.INSTANCE);
}
- @AfterTest
+ @AfterMethod
@Override
public void tearDown() throws Exception {
if (null != driver) {
diff --git
a/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AutoRecoveryMainTest.java
b/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AutoRecoveryMainTest.java
new file mode 100644
index 00000000000..d12ee177ece
--- /dev/null
+++
b/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AutoRecoveryMainTest.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.replication;
+
+import static org.testng.AssertJUnit.assertEquals;
+import static org.testng.AssertJUnit.assertFalse;
+import static org.testng.AssertJUnit.assertNotNull;
+import static org.testng.AssertJUnit.assertTrue;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.bookie.BookieImpl;
+import org.apache.bookkeeper.net.BookieId;
+import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
+import org.apache.bookkeeper.util.TestUtils;
+import org.apache.pulsar.metadata.bookkeeper.PulsarLedgerManagerFactory;
+import org.apache.pulsar.metadata.bookkeeper.PulsarMetadataClientDriver;
+import org.apache.pulsar.metadata.impl.ZKMetadataStore;
+import org.apache.zookeeper.ZooKeeper;
+import org.awaitility.Awaitility;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Tests AutoRecoveryMain auditor election and shutdown on ZooKeeper session loss, using the Pulsar metadata drivers.
+ */
+public class AutoRecoveryMainTest extends BookKeeperClusterTestCase {
+
+ public AutoRecoveryMainTest() throws Exception {
+ super(3);
+
Class.forName("org.apache.pulsar.metadata.bookkeeper.PulsarMetadataClientDriver");
+
Class.forName("org.apache.pulsar.metadata.bookkeeper.PulsarMetadataBookieDriver");
+ }
+
+ @BeforeMethod
+ @Override
+ public void setUp() throws Exception {
+ super.setUp();
+ }
+
+ /**
+ * Test that, if an autorecovery loses its ZK connection/session, it will
+ * shut down, and that a surviving instance (AR3) becomes the Auditor.
+ */
+ @Test
+ public void testAutoRecoverySessionLoss() throws Exception {
+ confByIndex(0).setMetadataServiceUri(
+ zkUtil.getMetadataServiceUri().replaceAll("zk://",
"metadata-store:").replaceAll("/ledgers", ""));
+ confByIndex(1).setMetadataServiceUri(
+ zkUtil.getMetadataServiceUri().replaceAll("zk://",
"metadata-store:").replaceAll("/ledgers", ""));
+ confByIndex(2).setMetadataServiceUri(
+ zkUtil.getMetadataServiceUri().replaceAll("zk://",
"metadata-store:").replaceAll("/ledgers", ""));
+ /*
+ * initialize three AutoRecovery instances.
+ */
+ AutoRecoveryMain main1 = new AutoRecoveryMain(confByIndex(0));
+ AutoRecoveryMain main2 = new AutoRecoveryMain(confByIndex(1));
+ AutoRecoveryMain main3 = new AutoRecoveryMain(confByIndex(2));
+
+ /*
+ * start main1, make sure all the components are started and main1 is
+ * the current Auditor
+ */
+ PulsarMetadataClientDriver pulsarMetadataClientDriver1 =
startAutoRecoveryMain(main1);
+ ZooKeeper zk1 = getZk(pulsarMetadataClientDriver1);
+
+ // Wait (polling up to 10 times, 1s apart) until an auditor gets elected
+ for (int i = 0; i < 10; i++) {
+ try {
+ if (main1.auditorElector.getCurrentAuditor() != null) {
+ break;
+ } else {
+ Thread.sleep(1000);
+ }
+ } catch (IOException e) {
+ Thread.sleep(1000);
+ }
+ }
+ BookieId currentAuditor = main1.auditorElector.getCurrentAuditor();
+ assertNotNull(currentAuditor);
+ Auditor auditor1 = main1.auditorElector.getAuditor();
+ assertEquals("Current Auditor should be AR1", currentAuditor,
BookieImpl.getBookieId(confByIndex(0)));
+ Awaitility.waitAtMost(30, TimeUnit.SECONDS).untilAsserted(() -> {
+ assertNotNull(auditor1); // NOTE(review): auditor1 is read once before this await, so a null value is never re-fetched
+ assertTrue("Auditor of AR1 should be running",
auditor1.isRunning());
+ });
+
+
+ /*
+ * start main2 and main3
+ */
+ PulsarMetadataClientDriver pulsarMetadataClientDriver2 =
startAutoRecoveryMain(main2);
+ ZooKeeper zk2 = getZk(pulsarMetadataClientDriver2);
+
+ PulsarMetadataClientDriver pulsarMetadataClientDriver3 =
startAutoRecoveryMain(main3);
+ ZooKeeper zk3 = getZk(pulsarMetadataClientDriver3); // NOTE(review): zk3 is not used afterwards
+
+
+ /*
+ * make sure AR1 is still the current Auditor and AR2's and AR3's
+ * auditors are not running.
+ */
+ assertEquals("Current Auditor should still be AR1", currentAuditor,
BookieImpl.getBookieId(confByIndex(0))); // NOTE(review): currentAuditor is not re-read here, so this only repeats the earlier AR1 check
+ Awaitility.await().untilAsserted(() -> {
+ assertTrue("AR2's Auditor should not be running",
(main2.auditorElector.getAuditor() == null
+ || !main2.auditorElector.getAuditor().isRunning()));
+ assertTrue("AR3's Auditor should not be running",
(main3.auditorElector.getAuditor() == null
+ || !main3.auditorElector.getAuditor().isRunning()));
+ });
+
+
+ /*
+ * expire zk2 and zk1 sessions.
+ */
+ zkUtil.expireSession(zk2);
+ zkUtil.expireSession(zk1);
+
+ /*
+ * wait (polling up to 10 times, 1s apart) for all the components of AR1
+ * and AR2 to shut down.
+ */
+ for (int i = 0; i < 10; i++) {
+ if (!main1.auditorElector.isRunning() &&
!main1.replicationWorker.isRunning()
+ && !main1.isAutoRecoveryRunning() &&
!main2.auditorElector.isRunning()
+ && !main2.replicationWorker.isRunning() &&
!main2.isAutoRecoveryRunning()) {
+ break;
+ }
+ Thread.sleep(1000);
+ }
+
+ /*
+ * the AR3 should be current auditor.
+ */
+ currentAuditor = main3.auditorElector.getCurrentAuditor();
+ assertEquals("Current Auditor should be AR3", currentAuditor,
BookieImpl.getBookieId(confByIndex(2)));
+ Awaitility.await().untilAsserted(() -> {
+ assertNotNull(main3.auditorElector.getAuditor());
+ assertTrue("Auditor of AR3 should be running",
main3.auditorElector.getAuditor().isRunning());
+ });
+
+ Awaitility.waitAtMost(100, TimeUnit.SECONDS).untilAsserted(() -> {
+ /*
+ * since AR3 is current auditor, AR1's auditor should not be
running
+ * anymore.
+ */
+ assertFalse("AR1's auditor should not be running",
auditor1.isRunning());
+
+ /*
+ * components of AR1 and AR2 should not be running since zk1 and
zk2
+ * sessions are expired.
+ */
+ assertFalse("Elector1 should have shutdown",
main1.auditorElector.isRunning());
+ assertFalse("RW1 should have shutdown",
main1.replicationWorker.isRunning());
+ assertFalse("AR1 should have shutdown",
main1.isAutoRecoveryRunning());
+ assertFalse("Elector2 should have shutdown",
main2.auditorElector.isRunning());
+ assertFalse("RW2 should have shutdown",
main2.replicationWorker.isRunning());
+ assertFalse("AR2 should have shutdown",
main2.isAutoRecoveryRunning());
+ });
+
+ }
+
+ /*
+ * start autoRecoveryMain, wait until all its components are running, and
+ * return the PulsarMetadataClientDriver used by its BookKeeper client.
+ */
+ PulsarMetadataClientDriver startAutoRecoveryMain(AutoRecoveryMain
autoRecoveryMain) throws Exception {
+ autoRecoveryMain.start();
+ PulsarMetadataClientDriver pulsarMetadataClientDriver =
(PulsarMetadataClientDriver) autoRecoveryMain.bkc
+ .getMetadataClientDriver();
+ TestUtils.assertEventuallyTrue("autoRecoveryMain components should be
running",
+ () -> autoRecoveryMain.auditorElector.isRunning()
+ && autoRecoveryMain.replicationWorker.isRunning() &&
autoRecoveryMain.isAutoRecoveryRunning());
+ return pulsarMetadataClientDriver;
+ }
+
+ private ZooKeeper getZk(PulsarMetadataClientDriver
pulsarMetadataClientDriver) throws Exception {
+ PulsarLedgerManagerFactory pulsarLedgerManagerFactory =
+ (PulsarLedgerManagerFactory)
pulsarMetadataClientDriver.getLedgerManagerFactory();
+ Field field =
pulsarLedgerManagerFactory.getClass().getDeclaredField("store"); // reflection: reads the non-public "store" field to reach the ZKMetadataStore
+ field.setAccessible(true);
+ ZKMetadataStore zkMetadataStore = (ZKMetadataStore)
field.get(pulsarLedgerManagerFactory);
+ return zkMetadataStore.getZkClient();
+ }
+}