This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 345ee95 Set ConnectionExpired Listener to MetadataClientDriver in AR
345ee95 is described below
commit 345ee95ff5c1431c382ecd4ba8cae3bfed457d6d
Author: Charan Reddy Guttapalem <[email protected]>
AuthorDate: Wed Oct 17 03:56:41 2018 -0700
Set ConnectionExpired Listener to MetadataClientDriver in AR
Descriptions of the changes in this PR:
- add setSessionStateListener method (and the SessionStateListener callback interface) to the MetadataClientDriver interface.
- add listener to shutdown AR in the case of metadata connection expiry
### Motivation
Commit 4f0d2a195bd9be3788876b47813cee1440cf005c removed the shutdown logic in
AutoRecoveryMain in case of ZK client session expiry, giving the reason "which
doesn't make any sense for current retryable zookeeper". However, once the ZK
session has expired, it is not correct to let AutoRecoveryMain continue running
in that state, so this change shuts AutoRecoveryMain down on session expiry.
Author: Sijie Guo <[email protected]>
Author: Andrey Yegorov <[email protected]>
Author: cguttapalem <[email protected]>
Author: Charan Reddy Guttapalem <[email protected]>
Author: Samuel Just <[email protected]>
Reviewers: Sijie Guo <[email protected]>, Enrico Olivelli
<[email protected]>
This closes #1747 from reddycharan/conexpirylisten
---
.../bookkeeper/meta/MetadataClientDriver.java | 19 +++
.../bookkeeper/meta/zk/ZKMetadataClientDriver.java | 14 +++
.../bookkeeper/replication/AuditorElector.java | 4 +
.../bookkeeper/replication/AutoRecoveryMain.java | 6 +
.../bookkeeper/meta/MetadataDriversTest.java | 5 +
.../replication/AutoRecoveryMainTest.java | 133 ++++++++++++++++++++-
.../metadata/etcd/EtcdMetadataClientDriver.java | 8 ++
7 files changed, 188 insertions(+), 1 deletion(-)
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MetadataClientDriver.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MetadataClientDriver.java
index b538367..7647a5b 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MetadataClientDriver.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MetadataClientDriver.java
@@ -84,4 +84,23 @@ public interface MetadataClientDriver extends AutoCloseable {
@Override
void close();
+ /**
+ * State Listener on listening the metadata client session states.
+ */
+ @FunctionalInterface
+ interface SessionStateListener {
+
+ /**
+ * Signal when client session is expired.
+ */
+ void onSessionExpired();
+ }
+
+ /**
+ * sets session state listener.
+ *
+ * @param sessionStateListener
+ * listener listening on metadata client session states.
+ */
+ void setSessionStateListener(SessionStateListener sessionStateListener);
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/zk/ZKMetadataClientDriver.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/zk/ZKMetadataClientDriver.java
index a5dcaa7..99b9427 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/zk/ZKMetadataClientDriver.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/zk/ZKMetadataClientDriver.java
@@ -20,7 +20,9 @@ package org.apache.bookkeeper.meta.zk;
import java.util.Optional;
import java.util.concurrent.ScheduledExecutorService;
+
import lombok.extern.slf4j.Slf4j;
+
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.discover.RegistrationClient;
import org.apache.bookkeeper.discover.ZKRegistrationClient;
@@ -29,6 +31,8 @@ import org.apache.bookkeeper.meta.MetadataDrivers;
import org.apache.bookkeeper.meta.exceptions.MetadataException;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy;
+import org.apache.zookeeper.Watcher.Event.EventType;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
/**
* ZooKeeper based metadata client driver.
@@ -88,4 +92,14 @@ public class ZKMetadataClientDriver
}
super.close();
}
+
+ @Override
+ public void setSessionStateListener(SessionStateListener
sessionStateListener) {
+ zk.register((event) -> {
+ // Check for expired connection.
+ if (event.getType().equals(EventType.None) &&
event.getState().equals(KeeperState.Expired)) {
+ sessionStateListener.onSessionExpired();
+ }
+ });
+ }
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorElector.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorElector.java
index cddb1f0..14c5c53 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorElector.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorElector.java
@@ -179,6 +179,10 @@ public class AuditorElector {
}
}
+ String getMyVote() {
+ return myVote;
+ }
+
private String getVotePath(String vote) {
return electionPath + vote;
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AutoRecoveryMain.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AutoRecoveryMain.java
index cd4aee2..c495443 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AutoRecoveryMain.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AutoRecoveryMain.java
@@ -40,6 +40,7 @@ import
org.apache.bookkeeper.common.component.ComponentStarter;
import org.apache.bookkeeper.common.component.LifecycleComponent;
import org.apache.bookkeeper.common.component.LifecycleComponentStack;
import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.meta.MetadataClientDriver;
import
org.apache.bookkeeper.replication.ReplicationException.CompatibilityException;
import
org.apache.bookkeeper.replication.ReplicationException.UnavailableException;
import org.apache.bookkeeper.server.conf.BookieConfiguration;
@@ -91,6 +92,11 @@ public class AutoRecoveryMain {
CompatibilityException {
this.conf = conf;
this.bkc = Auditor.createBookKeeperClient(conf);
+ MetadataClientDriver metadataClientDriver =
bkc.getMetadataClientDriver();
+ metadataClientDriver.setSessionStateListener(() -> {
+ LOG.error("Client connection to the Metadata server has expired,
so shutting down AutoRecoveryMain!");
+ shutdown(ExitCode.ZK_EXPIRED);
+ });
auditorElector = new AuditorElector(
Bookie.getBookieAddress(conf).toString(),
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/MetadataDriversTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/MetadataDriversTest.java
index 2d69cd2..593c520 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/MetadataDriversTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/MetadataDriversTest.java
@@ -79,6 +79,10 @@ public class MetadataDriversTest {
@Override
public void close() {
}
+
+ @Override
+ public void setSessionStateListener(SessionStateListener
sessionStateListener) {
+ }
}
static class ClientDriver1 extends TestClientDriver {
@@ -88,6 +92,7 @@ public class MetadataDriversTest {
return "driver1";
}
+
}
static class ClientDriver2 extends TestClientDriver {
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AutoRecoveryMainTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AutoRecoveryMainTest.java
index 8bb8049..8aa969f 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AutoRecoveryMainTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AutoRecoveryMainTest.java
@@ -23,8 +23,13 @@ package org.apache.bookkeeper.replication;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.meta.zk.ZKMetadataClientDriver;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import
org.apache.bookkeeper.replication.ReplicationException.UnavailableException;
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
-
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooKeeper;
import org.junit.Test;
/**
@@ -74,4 +79,130 @@ public class AutoRecoveryMainTest extends
BookKeeperClusterTestCase {
main.replicationWorker.isRunning());
}
+ /**
+ * Test that, if an autorecovery loses its ZK connection/session it will
+ * shutdown.
+ */
+ @Test
+ public void testAutoRecoverySessionLoss() throws Exception {
+ /*
+ * initialize three AutoRecovery instances.
+ */
+ AutoRecoveryMain main1 = new AutoRecoveryMain(bsConfs.get(0));
+ AutoRecoveryMain main2 = new AutoRecoveryMain(bsConfs.get(1));
+ AutoRecoveryMain main3 = new AutoRecoveryMain(bsConfs.get(2));
+
+ /*
+ * start main1, make sure all the components are started and main1 is
+ * the current Auditor
+ */
+ ZKMetadataClientDriver zkMetadataClientDriver1 =
startAutoRecoveryMain(main1);
+ ZooKeeper zk1 = zkMetadataClientDriver1.getZk();
+ Auditor auditor1 = main1.auditorElector.getAuditor();
+ BookieSocketAddress currentAuditor =
AuditorElector.getCurrentAuditor(bsConfs.get(0), zk1);
+ assertTrue("Current Auditor should be AR1",
currentAuditor.equals(Bookie.getBookieAddress(bsConfs.get(0))));
+ assertTrue("Auditor of AR1 should be running", auditor1.isRunning());
+
+ /*
+ * start main2 and main3
+ */
+ ZKMetadataClientDriver zkMetadataClientDriver2 =
startAutoRecoveryMain(main2);
+ ZooKeeper zk2 = zkMetadataClientDriver2.getZk();
+ ZKMetadataClientDriver zkMetadataClientDriver3 =
startAutoRecoveryMain(main3);
+ ZooKeeper zk3 = zkMetadataClientDriver3.getZk();
+
+ /*
+ * make sure AR1 is still the current Auditor and AR2's and AR3's
+ * auditors are not running.
+ */
+ assertTrue("Current Auditor should still be AR1",
+
currentAuditor.equals(Bookie.getBookieAddress(bsConfs.get(0))));
+ Auditor auditor2 = main2.auditorElector.getAuditor();
+ Auditor auditor3 = main3.auditorElector.getAuditor();
+ assertTrue("AR2's Auditor should not be running", (auditor2 == null ||
!auditor2.isRunning()));
+ assertTrue("AR3's Auditor should not be running", (auditor3 == null ||
!auditor3.isRunning()));
+
+ /*
+ * expire zk2 and zk1 sessions.
+ */
+ zkUtil.expireSession(zk2);
+ zkUtil.expireSession(zk1);
+
+ /*
+ * wait for some time until all the components of AR1 and AR2 are
+ * shut down.
+ */
+ for (int i = 0; i < 10; i++) {
+ if (!main1.auditorElector.isRunning() &&
!main1.replicationWorker.isRunning()
+ && !main1.isAutoRecoveryRunning() &&
!main2.auditorElector.isRunning()
+ && !main2.replicationWorker.isRunning() &&
!main2.isAutoRecoveryRunning()) {
+ break;
+ }
+ Thread.sleep(1000);
+ }
+
+ /*
+ * since zk1 and zk2 sessions are expired, the 'myVote' ephemeral nodes
+ * of AR1 and AR2 should not be existing anymore.
+ */
+ assertTrue("AR1's vote node should not be existing",
+ zk3.exists(main1.auditorElector.getMyVote(), false) == null);
+ assertTrue("AR2's vote node should not be existing",
+ zk3.exists(main2.auditorElector.getMyVote(), false) == null);
+
+ /*
+ * the AR3 should be current auditor.
+ */
+ currentAuditor = AuditorElector.getCurrentAuditor(bsConfs.get(2), zk3);
+ assertTrue("Current Auditor should be AR3",
currentAuditor.equals(Bookie.getBookieAddress(bsConfs.get(2))));
+ auditor3 = main3.auditorElector.getAuditor();
+ assertTrue("Auditor of AR3 should be running", auditor3.isRunning());
+
+ /*
+ * since AR3 is current auditor, AR1's auditor should not be running
+ * anymore.
+ */
+ assertFalse("AR1's auditor should not be running",
auditor1.isRunning());
+
+ /*
+ * components of AR1 and AR2 should not be running since zk1 and zk2
+ * sessions are expired.
+ */
+ assertFalse("Elector1 should have shutdown",
main1.auditorElector.isRunning());
+ assertFalse("RW1 should have shutdown",
main1.replicationWorker.isRunning());
+ assertFalse("AR1 should have shutdown", main1.isAutoRecoveryRunning());
+ assertFalse("Elector2 should have shutdown",
main2.auditorElector.isRunning());
+ assertFalse("RW2 should have shutdown",
main2.replicationWorker.isRunning());
+ assertFalse("AR2 should have shutdown", main2.isAutoRecoveryRunning());
+ }
+
+ /*
+ * start autoRecoveryMain and make sure all its components are running and
+ * myVote node is existing
+ */
+ ZKMetadataClientDriver startAutoRecoveryMain(AutoRecoveryMain
autoRecoveryMain)
+ throws InterruptedException, KeeperException, UnavailableException
{
+ autoRecoveryMain.start();
+ ZKMetadataClientDriver metadataClientDriver = (ZKMetadataClientDriver)
autoRecoveryMain.bkc
+ .getMetadataClientDriver();
+ ZooKeeper zk = metadataClientDriver.getZk();
+ String myVote;
+ for (int i = 0; i < 10; i++) {
+ if (autoRecoveryMain.auditorElector.isRunning() &&
autoRecoveryMain.replicationWorker.isRunning()
+ && autoRecoveryMain.isAutoRecoveryRunning()) {
+ myVote = autoRecoveryMain.auditorElector.getMyVote();
+ if (myVote != null) {
+ if (null != zk.exists(myVote, false)) {
+ break;
+ }
+ }
+ }
+ Thread.sleep(100);
+ }
+ assertTrue("autoRecoveryMain components should be running",
autoRecoveryMain.auditorElector.isRunning()
+ && autoRecoveryMain.replicationWorker.isRunning() &&
autoRecoveryMain.isAutoRecoveryRunning());
+ assertTrue("autoRecoveryMain's vote node should be existing",
+ zk.exists(autoRecoveryMain.auditorElector.getMyVote(), false)
!= null);
+ return metadataClientDriver;
+ }
}
diff --git
a/metadata-drivers/etcd/src/main/java/org/apache/bookkeeper/metadata/etcd/EtcdMetadataClientDriver.java
b/metadata-drivers/etcd/src/main/java/org/apache/bookkeeper/metadata/etcd/EtcdMetadataClientDriver.java
index 8fd7b97..07d6418 100644
---
a/metadata-drivers/etcd/src/main/java/org/apache/bookkeeper/metadata/etcd/EtcdMetadataClientDriver.java
+++
b/metadata-drivers/etcd/src/main/java/org/apache/bookkeeper/metadata/etcd/EtcdMetadataClientDriver.java
@@ -72,4 +72,12 @@ public class EtcdMetadataClientDriver extends
EtcdMetadataDriverBase implements
}
super.close();
}
+
+ @Override
+ public void setSessionStateListener(SessionStateListener
sessionStateListener) {
+ /*
+ * TODO: EtcdMetadataClientDriver has to implement this method.
+ */
+ throw new UnsupportedOperationException();
+ }
}