This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new eefc51712c9 [fix] [auto-recovery] Fix pulsar ledger auditor dead lock 
problem. (#21181)
eefc51712c9 is described below

commit eefc51712c97515e5913f6e5fceb4e98f5cd3b18
Author: Yan Zhao <[email protected]>
AuthorDate: Tue Sep 19 11:47:44 2023 +0800

    [fix] [auto-recovery] Fix pulsar ledger auditor dead lock problem. (#21181)
---
 .../bookkeeper/PulsarLedgerAuditorManager.java     |  13 ++
 .../replication/AuditorPeriodicCheckTest.java      |   8 +-
 .../replication/AutoRecoveryMainTest.java          | 201 +++++++++++++++++++++
 3 files changed, 218 insertions(+), 4 deletions(-)

diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerAuditorManager.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerAuditorManager.java
index bc35380fec1..d664ecdcd20 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerAuditorManager.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerAuditorManager.java
@@ -27,6 +27,7 @@ import 
org.apache.pulsar.metadata.api.coordination.CoordinationService;
 import org.apache.pulsar.metadata.api.coordination.LeaderElection;
 import org.apache.pulsar.metadata.api.coordination.LeaderElectionState;
 import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
+import org.apache.pulsar.metadata.api.extended.SessionEvent;
 import org.apache.pulsar.metadata.coordination.impl.CoordinationServiceImpl;
 
 @Slf4j
@@ -38,6 +39,7 @@ class PulsarLedgerAuditorManager implements 
LedgerAuditorManager {
     private final LeaderElection<String> leaderElection;
     private LeaderElectionState leaderElectionState;
     private String bookieId;
+    private boolean sessionExpired = false;
 
     PulsarLedgerAuditorManager(MetadataStoreExtended store, String 
ledgersRoot) {
         this.coordinationService = new CoordinationServiceImpl(store);
@@ -47,6 +49,14 @@ class PulsarLedgerAuditorManager implements 
LedgerAuditorManager {
         this.leaderElection =
                 coordinationService.getLeaderElection(String.class, 
electionPath, this::handleStateChanges);
         this.leaderElectionState = LeaderElectionState.NoLeader;
+        store.registerSessionListener(event -> {
+            if (SessionEvent.SessionLost == event) {
+                synchronized (this) {
+                    sessionExpired = true;
+                    notifyAll();
+                }
+            }
+        });
     }
 
     private void handleStateChanges(LeaderElectionState state) {
@@ -71,6 +81,9 @@ class PulsarLedgerAuditorManager implements 
LedgerAuditorManager {
         while (true) {
             try {
                 synchronized (this) {
+                    if (sessionExpired) {
+                        throw new IllegalStateException("Zookeeper session 
expired, give up to become auditor.");
+                    }
                     if (leaderElectionState == LeaderElectionState.Leading) {
                         return;
                     } else {
diff --git 
a/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java
 
b/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java
index c761d46c622..901361dd3a2 100644
--- 
a/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java
+++ 
b/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java
@@ -44,8 +44,8 @@ import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.testng.annotations.AfterTest;
-import org.testng.annotations.BeforeTest;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 /**
@@ -68,7 +68,7 @@ public class AuditorPeriodicCheckTest extends 
BookKeeperClusterTestCase {
         
Class.forName("org.apache.pulsar.metadata.bookkeeper.PulsarMetadataBookieDriver");
     }
 
-    @BeforeTest
+    @BeforeMethod
     @Override
     public void setUp() throws Exception {
         super.setUp();
@@ -99,7 +99,7 @@ public class AuditorPeriodicCheckTest extends 
BookKeeperClusterTestCase {
         driver.initialize(serverConfiguration, NullStatsLogger.INSTANCE);
     }
 
-    @AfterTest
+    @AfterMethod
     @Override
     public void tearDown() throws Exception {
         if (null != driver) {
diff --git 
a/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AutoRecoveryMainTest.java
 
b/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AutoRecoveryMainTest.java
new file mode 100644
index 00000000000..d12ee177ece
--- /dev/null
+++ 
b/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AutoRecoveryMainTest.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.replication;
+
+import static org.testng.AssertJUnit.assertEquals;
+import static org.testng.AssertJUnit.assertFalse;
+import static org.testng.AssertJUnit.assertNotNull;
+import static org.testng.AssertJUnit.assertTrue;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.bookie.BookieImpl;
+import org.apache.bookkeeper.net.BookieId;
+import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
+import org.apache.bookkeeper.util.TestUtils;
+import org.apache.pulsar.metadata.bookkeeper.PulsarLedgerManagerFactory;
+import org.apache.pulsar.metadata.bookkeeper.PulsarMetadataClientDriver;
+import org.apache.pulsar.metadata.impl.ZKMetadataStore;
+import org.apache.zookeeper.ZooKeeper;
+import org.awaitility.Awaitility;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Test AutoRecoveryMain when its ZooKeeper session is lost.
+ */
+public class AutoRecoveryMainTest extends BookKeeperClusterTestCase {
+
+    public AutoRecoveryMainTest() throws Exception {
+        super(3);
+        
Class.forName("org.apache.pulsar.metadata.bookkeeper.PulsarMetadataClientDriver");
+        
Class.forName("org.apache.pulsar.metadata.bookkeeper.PulsarMetadataBookieDriver");
+    }
+
+    @BeforeMethod
+    @Override
+    public void setUp() throws Exception {
+        super.setUp();
+    }
+
+    /**
+     * Test that, if an autorecovery loses its ZK connection/session, it will
+     * shut down.
+     */
+    @Test
+    public void testAutoRecoverySessionLoss() throws Exception {
+        confByIndex(0).setMetadataServiceUri(
+                zkUtil.getMetadataServiceUri().replaceAll("zk://", 
"metadata-store:").replaceAll("/ledgers", ""));
+        confByIndex(1).setMetadataServiceUri(
+                zkUtil.getMetadataServiceUri().replaceAll("zk://", 
"metadata-store:").replaceAll("/ledgers", ""));
+        confByIndex(2).setMetadataServiceUri(
+                zkUtil.getMetadataServiceUri().replaceAll("zk://", 
"metadata-store:").replaceAll("/ledgers", ""));
+        /*
+         * initialize three AutoRecovery instances.
+         */
+        AutoRecoveryMain main1 = new AutoRecoveryMain(confByIndex(0));
+        AutoRecoveryMain main2 = new AutoRecoveryMain(confByIndex(1));
+        AutoRecoveryMain main3 = new AutoRecoveryMain(confByIndex(2));
+
+        /*
+         * start main1, make sure all the components are started and main1 is
+         * the current Auditor
+         */
+        PulsarMetadataClientDriver pulsarMetadataClientDriver1 = 
startAutoRecoveryMain(main1);
+        ZooKeeper zk1 = getZk(pulsarMetadataClientDriver1);
+
+        // Wait until auditor gets elected
+        for (int i = 0; i < 10; i++) {
+            try {
+                if (main1.auditorElector.getCurrentAuditor() != null) {
+                    break;
+                } else {
+                    Thread.sleep(1000);
+                }
+            } catch (IOException e) {
+                Thread.sleep(1000);
+            }
+        }
+        BookieId currentAuditor = main1.auditorElector.getCurrentAuditor();
+        assertNotNull(currentAuditor);
+        Auditor auditor1 = main1.auditorElector.getAuditor();
+        assertEquals("Current Auditor should be AR1", currentAuditor, 
BookieImpl.getBookieId(confByIndex(0)));
+        Awaitility.waitAtMost(30, TimeUnit.SECONDS).untilAsserted(() -> {
+            assertNotNull(auditor1);
+            assertTrue("Auditor of AR1 should be running", 
auditor1.isRunning());
+        });
+
+
+        /*
+         * start main2 and main3
+         */
+        PulsarMetadataClientDriver pulsarMetadataClientDriver2 = 
startAutoRecoveryMain(main2);
+        ZooKeeper zk2 = getZk(pulsarMetadataClientDriver2);
+
+        PulsarMetadataClientDriver pulsarMetadataClientDriver3 = 
startAutoRecoveryMain(main3);
+        ZooKeeper zk3 = getZk(pulsarMetadataClientDriver3);
+
+
+        /*
+         * make sure AR1 is still the current Auditor and AR2's and AR3's
+         * auditors are not running.
+         */
+        assertEquals("Current Auditor should still be AR1", currentAuditor, 
BookieImpl.getBookieId(confByIndex(0)));
+        Awaitility.await().untilAsserted(() -> {
+            assertTrue("AR2's Auditor should not be running", 
(main2.auditorElector.getAuditor() == null
+                    || !main2.auditorElector.getAuditor().isRunning()));
+            assertTrue("AR3's Auditor should not be running", 
(main3.auditorElector.getAuditor() == null
+                    || !main3.auditorElector.getAuditor().isRunning()));
+        });
+
+
+        /*
+         * expire zk2 and zk1 sessions.
+         */
+        zkUtil.expireSession(zk2);
+        zkUtil.expireSession(zk1);
+
+        /*
+         * wait for some time until all the components of AR1 and AR2 are
+         * shut down.
+         */
+        for (int i = 0; i < 10; i++) {
+            if (!main1.auditorElector.isRunning() && 
!main1.replicationWorker.isRunning()
+                    && !main1.isAutoRecoveryRunning() && 
!main2.auditorElector.isRunning()
+                    && !main2.replicationWorker.isRunning() && 
!main2.isAutoRecoveryRunning()) {
+                break;
+            }
+            Thread.sleep(1000);
+        }
+
+        /*
+         * the AR3 should be current auditor.
+         */
+        currentAuditor = main3.auditorElector.getCurrentAuditor();
+        assertEquals("Current Auditor should be AR3", currentAuditor, 
BookieImpl.getBookieId(confByIndex(2)));
+        Awaitility.await().untilAsserted(() -> {
+            assertNotNull(main3.auditorElector.getAuditor());
+            assertTrue("Auditor of AR3 should be running", 
main3.auditorElector.getAuditor().isRunning());
+        });
+
+        Awaitility.waitAtMost(100, TimeUnit.SECONDS).untilAsserted(() -> {
+            /*
+             * since AR3 is current auditor, AR1's auditor should not be 
running
+             * anymore.
+             */
+            assertFalse("AR1's auditor should not be running", 
auditor1.isRunning());
+
+            /*
+             * components of AR1 and AR2 should not be running since zk1 and 
zk2
+             * sessions are expired.
+             */
+            assertFalse("Elector1 should have shutdown", 
main1.auditorElector.isRunning());
+            assertFalse("RW1 should have shutdown", 
main1.replicationWorker.isRunning());
+            assertFalse("AR1 should have shutdown", 
main1.isAutoRecoveryRunning());
+            assertFalse("Elector2 should have shutdown", 
main2.auditorElector.isRunning());
+            assertFalse("RW2 should have shutdown", 
main2.replicationWorker.isRunning());
+            assertFalse("AR2 should have shutdown", 
main2.isAutoRecoveryRunning());
+        });
+
+    }
+
+    /*
+     * start autoRecoveryMain and make sure all its components are running and
+     * myVote node is existing
+     */
+    PulsarMetadataClientDriver startAutoRecoveryMain(AutoRecoveryMain 
autoRecoveryMain) throws Exception {
+        autoRecoveryMain.start();
+        PulsarMetadataClientDriver pulsarMetadataClientDriver = 
(PulsarMetadataClientDriver) autoRecoveryMain.bkc
+                .getMetadataClientDriver();
+        TestUtils.assertEventuallyTrue("autoRecoveryMain components should be 
running",
+                () -> autoRecoveryMain.auditorElector.isRunning()
+                        && autoRecoveryMain.replicationWorker.isRunning() && 
autoRecoveryMain.isAutoRecoveryRunning());
+        return pulsarMetadataClientDriver;
+    }
+
+    private ZooKeeper getZk(PulsarMetadataClientDriver 
pulsarMetadataClientDriver) throws Exception {
+        PulsarLedgerManagerFactory pulsarLedgerManagerFactory =
+                (PulsarLedgerManagerFactory) 
pulsarMetadataClientDriver.getLedgerManagerFactory();
+        Field field = 
pulsarLedgerManagerFactory.getClass().getDeclaredField("store");
+        field.setAccessible(true);
+        ZKMetadataStore zkMetadataStore = (ZKMetadataStore) 
field.get(pulsarLedgerManagerFactory);
+        return zkMetadataStore.getZkClient();
+    }
+}

Reply via email to