sijie closed pull request #1747: Set ConnectionExpired Listener to 
MetadataClientDriver in AR
URL: https://github.com/apache/bookkeeper/pull/1747
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MetadataClientDriver.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MetadataClientDriver.java
index b53836790c..7647a5bed8 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MetadataClientDriver.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MetadataClientDriver.java
@@ -84,4 +84,23 @@ LedgerManagerFactory getLedgerManagerFactory()
     @Override
     void close();
 
+    /**
+     * Listener that is notified of metadata client session state changes.
+     */
+    @FunctionalInterface
+    interface SessionStateListener {
+
+        /**
+         * Signal when client session is expired.
+         */
+        void onSessionExpired();
+    }
+
+    /**
+     * Sets the session state listener.
+     *
+     * @param sessionStateListener
+     *            listener listening on metadata client session states.
+     */
+    void setSessionStateListener(SessionStateListener sessionStateListener);
 }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/zk/ZKMetadataClientDriver.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/zk/ZKMetadataClientDriver.java
index a5dcaa740c..99b942771d 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/zk/ZKMetadataClientDriver.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/zk/ZKMetadataClientDriver.java
@@ -20,7 +20,9 @@
 
 import java.util.Optional;
 import java.util.concurrent.ScheduledExecutorService;
+
 import lombok.extern.slf4j.Slf4j;
+
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.discover.RegistrationClient;
 import org.apache.bookkeeper.discover.ZKRegistrationClient;
@@ -29,6 +31,8 @@
 import org.apache.bookkeeper.meta.exceptions.MetadataException;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy;
+import org.apache.zookeeper.Watcher.Event.EventType;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
 
 /**
  * ZooKeeper based metadata client driver.
@@ -88,4 +92,14 @@ public synchronized void close() {
         }
         super.close();
     }
+
+    @Override
+    public void setSessionStateListener(SessionStateListener 
sessionStateListener) {
+        zk.register((event) -> {
+            // Check for expired connection.
+            if (event.getType().equals(EventType.None) && 
event.getState().equals(KeeperState.Expired)) {
+                sessionStateListener.onSessionExpired();
+            }
+        });
+    }
 }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorElector.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorElector.java
index cddb1f0f43..14c5c53856 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorElector.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorElector.java
@@ -179,6 +179,10 @@ private void createMyVote() throws KeeperException, 
InterruptedException {
         }
     }
 
+    String getMyVote() {
+        return myVote;
+    }
+
     private String getVotePath(String vote) {
         return electionPath + vote;
     }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AutoRecoveryMain.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AutoRecoveryMain.java
index cd4aee2ce0..c495443823 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AutoRecoveryMain.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AutoRecoveryMain.java
@@ -40,6 +40,7 @@
 import org.apache.bookkeeper.common.component.LifecycleComponent;
 import org.apache.bookkeeper.common.component.LifecycleComponentStack;
 import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.meta.MetadataClientDriver;
 import 
org.apache.bookkeeper.replication.ReplicationException.CompatibilityException;
 import 
org.apache.bookkeeper.replication.ReplicationException.UnavailableException;
 import org.apache.bookkeeper.server.conf.BookieConfiguration;
@@ -91,6 +92,11 @@ public AutoRecoveryMain(ServerConfiguration conf, 
StatsLogger statsLogger)
             CompatibilityException {
         this.conf = conf;
         this.bkc = Auditor.createBookKeeperClient(conf);
+        MetadataClientDriver metadataClientDriver = 
bkc.getMetadataClientDriver();
+        metadataClientDriver.setSessionStateListener(() -> {
+            LOG.error("Client connection to the Metadata server has expired, 
so shutting down AutoRecoveryMain!");
+            shutdown(ExitCode.ZK_EXPIRED);
+        });
 
         auditorElector = new AuditorElector(
             Bookie.getBookieAddress(conf).toString(),
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/MetadataDriversTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/MetadataDriversTest.java
index 2d69cd2bec..593c52005a 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/MetadataDriversTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/MetadataDriversTest.java
@@ -79,6 +79,10 @@ public LayoutManager getLayoutManager() {
         @Override
         public void close() {
         }
+
+        @Override
+        public void setSessionStateListener(SessionStateListener 
sessionStateListener) {
+        }
     }
 
     static class ClientDriver1 extends TestClientDriver {
@@ -88,6 +92,7 @@ public String getScheme() {
             return "driver1";
         }
 
+
     }
 
     static class ClientDriver2 extends TestClientDriver {
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AutoRecoveryMainTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AutoRecoveryMainTest.java
index 8bb8049ef0..8aa969f529 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AutoRecoveryMainTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AutoRecoveryMainTest.java
@@ -23,8 +23,13 @@
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
+import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.meta.zk.ZKMetadataClientDriver;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import 
org.apache.bookkeeper.replication.ReplicationException.UnavailableException;
 import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
-
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooKeeper;
 import org.junit.Test;
 
 /**
@@ -74,4 +79,130 @@ public void testShutdown() throws Exception {
                 main.replicationWorker.isRunning());
     }
 
+    /**
+     * Test that, if an autorecovery loses its ZK connection/session, it will
+     * shut down.
+     */
+    @Test
+    public void testAutoRecoverySessionLoss() throws Exception {
+        /*
+         * initialize three AutoRecovery instances.
+         */
+        AutoRecoveryMain main1 = new AutoRecoveryMain(bsConfs.get(0));
+        AutoRecoveryMain main2 = new AutoRecoveryMain(bsConfs.get(1));
+        AutoRecoveryMain main3 = new AutoRecoveryMain(bsConfs.get(2));
+
+        /*
+         * start main1, make sure all the components are started and main1 is
+         * the current Auditor
+         */
+        ZKMetadataClientDriver zkMetadataClientDriver1 = 
startAutoRecoveryMain(main1);
+        ZooKeeper zk1 = zkMetadataClientDriver1.getZk();
+        Auditor auditor1 = main1.auditorElector.getAuditor();
+        BookieSocketAddress currentAuditor = 
AuditorElector.getCurrentAuditor(bsConfs.get(0), zk1);
+        assertTrue("Current Auditor should be AR1", 
currentAuditor.equals(Bookie.getBookieAddress(bsConfs.get(0))));
+        assertTrue("Auditor of AR1 should be running", auditor1.isRunning());
+
+        /*
+         * start main2 and main3
+         */
+        ZKMetadataClientDriver zkMetadataClientDriver2 = 
startAutoRecoveryMain(main2);
+        ZooKeeper zk2 = zkMetadataClientDriver2.getZk();
+        ZKMetadataClientDriver zkMetadataClientDriver3 = 
startAutoRecoveryMain(main3);
+        ZooKeeper zk3 = zkMetadataClientDriver3.getZk();
+
+        /*
+         * make sure AR1 is still the current Auditor and AR2's and AR3's
+         * auditors are not running.
+         */
+        assertTrue("Current Auditor should still be AR1",
+                
currentAuditor.equals(Bookie.getBookieAddress(bsConfs.get(0))));
+        Auditor auditor2 = main2.auditorElector.getAuditor();
+        Auditor auditor3 = main3.auditorElector.getAuditor();
+        assertTrue("AR2's Auditor should not be running", (auditor2 == null || 
!auditor2.isRunning()));
+        assertTrue("AR3's Auditor should not be running", (auditor3 == null || 
!auditor3.isRunning()));
+
+        /*
+         * expire zk2 and zk1 sessions.
+         */
+        zkUtil.expireSession(zk2);
+        zkUtil.expireSession(zk1);
+
+        /*
+         * wait for some time until all the components of AR1 and AR2 are
+         * shut down.
+         */
+        for (int i = 0; i < 10; i++) {
+            if (!main1.auditorElector.isRunning() && 
!main1.replicationWorker.isRunning()
+                    && !main1.isAutoRecoveryRunning() && 
!main2.auditorElector.isRunning()
+                    && !main2.replicationWorker.isRunning() && 
!main2.isAutoRecoveryRunning()) {
+                break;
+            }
+            Thread.sleep(1000);
+        }
+
+        /*
+         * since zk1 and zk2 sessions are expired, the 'myVote' ephemeral nodes
+         * of AR1 and AR2 should no longer exist.
+         */
+        assertTrue("AR1's vote node should not be existing",
+                zk3.exists(main1.auditorElector.getMyVote(), false) == null);
+        assertTrue("AR2's vote node should not be existing",
+                zk3.exists(main2.auditorElector.getMyVote(), false) == null);
+
+        /*
+         * the AR3 should be current auditor.
+         */
+        currentAuditor = AuditorElector.getCurrentAuditor(bsConfs.get(2), zk3);
+        assertTrue("Current Auditor should be AR3", 
currentAuditor.equals(Bookie.getBookieAddress(bsConfs.get(2))));
+        auditor3 = main3.auditorElector.getAuditor();
+        assertTrue("Auditor of AR3 should be running", auditor3.isRunning());
+
+        /*
+         * since AR3 is current auditor, AR1's auditor should not be running
+         * anymore.
+         */
+        assertFalse("AR1's auditor should not be running", 
auditor1.isRunning());
+
+        /*
+         * components of AR1 and AR2 should not be running since zk1 and zk2
+         * sessions are expired.
+         */
+        assertFalse("Elector1 should have shutdown", 
main1.auditorElector.isRunning());
+        assertFalse("RW1 should have shutdown", 
main1.replicationWorker.isRunning());
+        assertFalse("AR1 should have shutdown", main1.isAutoRecoveryRunning());
+        assertFalse("Elector2 should have shutdown", 
main2.auditorElector.isRunning());
+        assertFalse("RW2 should have shutdown", 
main2.replicationWorker.isRunning());
+        assertFalse("AR2 should have shutdown", main2.isAutoRecoveryRunning());
+    }
+
+    /*
+     * start autoRecoveryMain and make sure all its components are running and
+     * its myVote node exists
+     */
+    ZKMetadataClientDriver startAutoRecoveryMain(AutoRecoveryMain 
autoRecoveryMain)
+            throws InterruptedException, KeeperException, UnavailableException 
{
+        autoRecoveryMain.start();
+        ZKMetadataClientDriver metadataClientDriver = (ZKMetadataClientDriver) 
autoRecoveryMain.bkc
+                .getMetadataClientDriver();
+        ZooKeeper zk = metadataClientDriver.getZk();
+        String myVote;
+        for (int i = 0; i < 10; i++) {
+            if (autoRecoveryMain.auditorElector.isRunning() && 
autoRecoveryMain.replicationWorker.isRunning()
+                    && autoRecoveryMain.isAutoRecoveryRunning()) {
+                myVote = autoRecoveryMain.auditorElector.getMyVote();
+                if (myVote != null) {
+                    if (null != zk.exists(myVote, false)) {
+                        break;
+                    }
+                }
+            }
+            Thread.sleep(100);
+        }
+        assertTrue("autoRecoveryMain components should be running", 
autoRecoveryMain.auditorElector.isRunning()
+                && autoRecoveryMain.replicationWorker.isRunning() && 
autoRecoveryMain.isAutoRecoveryRunning());
+        assertTrue("autoRecoveryMain's vote node should be existing",
+                zk.exists(autoRecoveryMain.auditorElector.getMyVote(), false) 
!= null);
+        return metadataClientDriver;
+    }
 }
diff --git 
a/metadata-drivers/etcd/src/main/java/org/apache/bookkeeper/metadata/etcd/EtcdMetadataClientDriver.java
 
b/metadata-drivers/etcd/src/main/java/org/apache/bookkeeper/metadata/etcd/EtcdMetadataClientDriver.java
index 8fd7b973b9..07d64186aa 100644
--- 
a/metadata-drivers/etcd/src/main/java/org/apache/bookkeeper/metadata/etcd/EtcdMetadataClientDriver.java
+++ 
b/metadata-drivers/etcd/src/main/java/org/apache/bookkeeper/metadata/etcd/EtcdMetadataClientDriver.java
@@ -72,4 +72,12 @@ public synchronized void close() {
         }
         super.close();
     }
+
+    @Override
+    public void setSessionStateListener(SessionStateListener 
sessionStateListener) {
+        /*
+         * TODO: EtcdMetadataClientDriver has to implement this method.
+         */
+        throw new UnsupportedOperationException();
+    }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to