This is an automated email from the ASF dual-hosted git repository.

mattisonchao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 480cb3bbc22 [fix] [test] [branch-3.0] Fix AutoRecovery flaky test.  
(#21418)
480cb3bbc22 is described below

commit 480cb3bbc22c57f52a969a935aaf468bf37af351
Author: Yan Zhao <[email protected]>
AuthorDate: Mon Oct 23 09:20:35 2023 -0500

    [fix] [test] [branch-3.0] Fix AutoRecovery flaky test.  (#21418)
---
 .../metadata/bookkeeper/PulsarLedgerAuditorManager.java    | 13 +++++++++++++
 .../bookkeeper/replication/AuditorLedgerCheckerTest.java   | 14 +++++++-------
 .../bookkeeper/replication/AuditorPeriodicCheckTest.java   |  8 ++++----
 3 files changed, 24 insertions(+), 11 deletions(-)

diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerAuditorManager.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerAuditorManager.java
index 957f26ecd2e..44870ed47f0 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerAuditorManager.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerAuditorManager.java
@@ -27,6 +27,7 @@ import 
org.apache.pulsar.metadata.api.coordination.CoordinationService;
 import org.apache.pulsar.metadata.api.coordination.LeaderElection;
 import org.apache.pulsar.metadata.api.coordination.LeaderElectionState;
 import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
+import org.apache.pulsar.metadata.api.extended.SessionEvent;
 import org.apache.pulsar.metadata.coordination.impl.CoordinationServiceImpl;
 
 @Slf4j
@@ -38,6 +39,7 @@ public class PulsarLedgerAuditorManager implements 
LedgerAuditorManager {
     private final LeaderElection<String> leaderElection;
     private LeaderElectionState leaderElectionState;
     private String bookieId;
+    private boolean sessionExpired = false;
 
     PulsarLedgerAuditorManager(MetadataStoreExtended store, String 
ledgersRoot) {
         this.coordinationService = new CoordinationServiceImpl(store);
@@ -47,6 +49,14 @@ public class PulsarLedgerAuditorManager implements 
LedgerAuditorManager {
         this.leaderElection =
                 coordinationService.getLeaderElection(String.class, 
electionPath, this::handleStateChanges);
         this.leaderElectionState = LeaderElectionState.NoLeader;
+        store.registerSessionListener(event -> {
+            if (SessionEvent.SessionLost == event) {
+                synchronized (this) {
+                    sessionExpired = true;
+                    notifyAll();
+                }
+            }
+        });
     }
 
     private void handleStateChanges(LeaderElectionState state) {
@@ -71,6 +81,9 @@ public class PulsarLedgerAuditorManager implements 
LedgerAuditorManager {
         while (true) {
             try {
                 synchronized (this) {
+                    if (sessionExpired) {
+                        throw new IllegalStateException("Zookeeper session 
expired, give up to become auditor.");
+                    }
                     if (leaderElectionState == LeaderElectionState.Leading) {
                         return;
                     } else {
diff --git 
a/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AuditorLedgerCheckerTest.java
 
b/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AuditorLedgerCheckerTest.java
index fcaadaa9bbe..ec5f77f7946 100644
--- 
a/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AuditorLedgerCheckerTest.java
+++ 
b/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AuditorLedgerCheckerTest.java
@@ -23,10 +23,10 @@ import static org.testng.AssertJUnit.assertFalse;
 import static org.testng.AssertJUnit.assertNotNull;
 import static org.testng.AssertJUnit.assertNotSame;
 import static org.testng.AssertJUnit.assertTrue;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
 import static org.testng.AssertJUnit.fail;
 import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -40,19 +40,15 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
-import java.util.concurrent.TimeoutException;
 import lombok.Cleanup;
 import org.apache.bookkeeper.bookie.BookieImpl;
 import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
 import org.apache.bookkeeper.client.LedgerHandle;
-import org.apache.bookkeeper.meta.ZkLedgerUnderreplicationManager;
-import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
-import org.apache.bookkeeper.net.BookieId;
-import org.apache.bookkeeper.proto.BookieServer;
 import org.apache.bookkeeper.client.LedgerMetadataBuilder;
 import org.apache.bookkeeper.client.api.LedgerMetadata;
 import org.apache.bookkeeper.common.util.OrderedScheduler;
@@ -64,7 +60,11 @@ import 
org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
 import org.apache.bookkeeper.meta.MetadataClientDriver;
 import org.apache.bookkeeper.meta.MetadataDrivers;
 import org.apache.bookkeeper.meta.UnderreplicatedLedger;
+import org.apache.bookkeeper.meta.ZkLedgerUnderreplicationManager;
+import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
+import org.apache.bookkeeper.net.BookieId;
 import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.proto.BookieServer;
 import 
org.apache.bookkeeper.replication.ReplicationException.UnavailableException;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.pulsar.metadata.api.MetadataStoreConfig;
diff --git 
a/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java
 
b/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java
index 0fa6538b3ed..9c5805dc536 100644
--- 
a/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java
+++ 
b/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java
@@ -72,8 +72,8 @@ import 
org.apache.bookkeeper.test.TestStatsProvider.TestStatsLogger;
 import org.awaitility.Awaitility;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.testng.annotations.AfterTest;
-import org.testng.annotations.BeforeTest;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 /**
@@ -96,7 +96,7 @@ public class AuditorPeriodicCheckTest extends 
BookKeeperClusterTestCase {
         
Class.forName("org.apache.pulsar.metadata.bookkeeper.PulsarMetadataBookieDriver");
     }
 
-    @BeforeTest
+    @BeforeMethod
     @Override
     public void setUp() throws Exception {
         super.setUp();
@@ -127,7 +127,7 @@ public class AuditorPeriodicCheckTest extends 
BookKeeperClusterTestCase {
         driver.initialize(serverConfiguration, NullStatsLogger.INSTANCE);
     }
 
-    @AfterTest
+    @AfterMethod
     @Override
     public void tearDown() throws Exception {
         if (null != driver) {

Reply via email to