This is an automated email from the ASF dual-hosted git repository.
mattisonchao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 480cb3bbc22 [fix] [test] [branch-3.0] Fix AutoRecovery flaky test. (#21418)
480cb3bbc22 is described below
commit 480cb3bbc22c57f52a969a935aaf468bf37af351
Author: Yan Zhao <[email protected]>
AuthorDate: Mon Oct 23 09:20:35 2023 -0500
[fix] [test] [branch-3.0] Fix AutoRecovery flaky test. (#21418)
---
.../metadata/bookkeeper/PulsarLedgerAuditorManager.java | 13 +++++++++++++
.../bookkeeper/replication/AuditorLedgerCheckerTest.java | 14 +++++++-------
.../bookkeeper/replication/AuditorPeriodicCheckTest.java | 8 ++++----
3 files changed, 24 insertions(+), 11 deletions(-)
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerAuditorManager.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerAuditorManager.java
index 957f26ecd2e..44870ed47f0 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerAuditorManager.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerAuditorManager.java
@@ -27,6 +27,7 @@ import
org.apache.pulsar.metadata.api.coordination.CoordinationService;
import org.apache.pulsar.metadata.api.coordination.LeaderElection;
import org.apache.pulsar.metadata.api.coordination.LeaderElectionState;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
+import org.apache.pulsar.metadata.api.extended.SessionEvent;
import org.apache.pulsar.metadata.coordination.impl.CoordinationServiceImpl;
@Slf4j
@@ -38,6 +39,7 @@ public class PulsarLedgerAuditorManager implements
LedgerAuditorManager {
private final LeaderElection<String> leaderElection;
private LeaderElectionState leaderElectionState;
private String bookieId;
+ private boolean sessionExpired = false;
PulsarLedgerAuditorManager(MetadataStoreExtended store, String
ledgersRoot) {
this.coordinationService = new CoordinationServiceImpl(store);
@@ -47,6 +49,14 @@ public class PulsarLedgerAuditorManager implements
LedgerAuditorManager {
this.leaderElection =
coordinationService.getLeaderElection(String.class,
electionPath, this::handleStateChanges);
this.leaderElectionState = LeaderElectionState.NoLeader;
+ store.registerSessionListener(event -> {
+ if (SessionEvent.SessionLost == event) {
+ synchronized (this) {
+ sessionExpired = true;
+ notifyAll();
+ }
+ }
+ });
}
private void handleStateChanges(LeaderElectionState state) {
@@ -71,6 +81,9 @@ public class PulsarLedgerAuditorManager implements
LedgerAuditorManager {
while (true) {
try {
synchronized (this) {
+ if (sessionExpired) {
+ throw new IllegalStateException("Zookeeper session
expired, give up to become auditor.");
+ }
if (leaderElectionState == LeaderElectionState.Leading) {
return;
} else {
diff --git a/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AuditorLedgerCheckerTest.java b/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AuditorLedgerCheckerTest.java
index fcaadaa9bbe..ec5f77f7946 100644
--- a/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AuditorLedgerCheckerTest.java
+++ b/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AuditorLedgerCheckerTest.java
@@ -23,10 +23,10 @@ import static org.testng.AssertJUnit.assertFalse;
import static org.testng.AssertJUnit.assertNotNull;
import static org.testng.AssertJUnit.assertNotSame;
import static org.testng.AssertJUnit.assertTrue;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
import static org.testng.AssertJUnit.fail;
import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
@@ -40,19 +40,15 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
-import java.util.concurrent.TimeoutException;
import lombok.Cleanup;
import org.apache.bookkeeper.bookie.BookieImpl;
import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper.DigestType;
import org.apache.bookkeeper.client.LedgerHandle;
-import org.apache.bookkeeper.meta.ZkLedgerUnderreplicationManager;
-import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
-import org.apache.bookkeeper.net.BookieId;
-import org.apache.bookkeeper.proto.BookieServer;
import org.apache.bookkeeper.client.LedgerMetadataBuilder;
import org.apache.bookkeeper.client.api.LedgerMetadata;
import org.apache.bookkeeper.common.util.OrderedScheduler;
@@ -64,7 +60,11 @@ import
org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
import org.apache.bookkeeper.meta.MetadataClientDriver;
import org.apache.bookkeeper.meta.MetadataDrivers;
import org.apache.bookkeeper.meta.UnderreplicatedLedger;
+import org.apache.bookkeeper.meta.ZkLedgerUnderreplicationManager;
+import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
+import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.proto.BookieServer;
import
org.apache.bookkeeper.replication.ReplicationException.UnavailableException;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
diff --git a/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java b/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java
index 0fa6538b3ed..9c5805dc536 100644
--- a/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java
+++ b/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java
@@ -72,8 +72,8 @@ import
org.apache.bookkeeper.test.TestStatsProvider.TestStatsLogger;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.testng.annotations.AfterTest;
-import org.testng.annotations.BeforeTest;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
/**
@@ -96,7 +96,7 @@ public class AuditorPeriodicCheckTest extends
BookKeeperClusterTestCase {
Class.forName("org.apache.pulsar.metadata.bookkeeper.PulsarMetadataBookieDriver");
}
- @BeforeTest
+ @BeforeMethod
@Override
public void setUp() throws Exception {
super.setUp();
@@ -127,7 +127,7 @@ public class AuditorPeriodicCheckTest extends
BookKeeperClusterTestCase {
driver.initialize(serverConfiguration, NullStatsLogger.INSTANCE);
}
- @AfterTest
+ @AfterMethod
@Override
public void tearDown() throws Exception {
if (null != driver) {