sijie closed pull request #1693: autorecovery-use-metadata-driver (part 1) : 
move AutoRecoveryMain to use MetadataBookieDriver
URL: https://github.com/apache/bookkeeper/pull/1693
 
 
   

This is a PR merged from a forked repository.
Because GitHub hides the original diff of a foreign pull request (one
opened from a fork) once it is merged, the diff is reproduced below for
the sake of provenance:

diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AutoRecoveryMain.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AutoRecoveryMain.java
index 9830c59290..3369de8b3d 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AutoRecoveryMain.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AutoRecoveryMain.java
@@ -20,6 +20,7 @@
  */
 package org.apache.bookkeeper.replication;
 
+import static com.google.common.base.Preconditions.checkArgument;
 import static org.apache.bookkeeper.replication.ReplicationStats.AUDITOR_SCOPE;
 import static 
org.apache.bookkeeper.replication.ReplicationStats.REPLICATION_WORKER_SCOPE;
 
@@ -28,8 +29,7 @@
 import java.io.File;
 import java.io.IOException;
 import java.net.MalformedURLException;
-import java.util.HashSet;
-import java.util.Set;
+import java.net.URI;
 
 import org.apache.bookkeeper.bookie.Bookie;
 import org.apache.bookkeeper.bookie.BookieCriticalThread;
@@ -37,13 +37,15 @@
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.http.HttpServer;
 import org.apache.bookkeeper.http.HttpServerLoader;
-import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
+import org.apache.bookkeeper.meta.MetadataBookieDriver;
+import org.apache.bookkeeper.meta.MetadataDrivers;
+import org.apache.bookkeeper.meta.exceptions.MetadataException;
+import org.apache.bookkeeper.meta.zk.ZKMetadataBookieDriver;
 import 
org.apache.bookkeeper.replication.ReplicationException.CompatibilityException;
 import 
org.apache.bookkeeper.replication.ReplicationException.UnavailableException;
 import org.apache.bookkeeper.server.http.BKHttpServiceProvider;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
-import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
 import org.apache.commons.cli.BasicParser;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.HelpFormatter;
@@ -51,8 +53,6 @@
 import org.apache.commons.cli.ParseException;
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.ZooKeeper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -66,12 +66,12 @@
     private static final Logger LOG = LoggerFactory
             .getLogger(AutoRecoveryMain.class);
 
-    private ServerConfiguration conf;
-    ZooKeeper zk;
-    AuditorElector auditorElector;
-    ReplicationWorker replicationWorker;
-    private AutoRecoveryDeathWatcher deathWatcher;
-    private int exitCode;
+    private final ServerConfiguration conf;
+    final MetadataBookieDriver metadataBookieDriver;
+    final AuditorElector auditorElector;
+    final ReplicationWorker replicationWorker;
+    final AutoRecoveryDeathWatcher deathWatcher;
+    int exitCode;
     private volatile boolean shuttingDown = false;
     private volatile boolean running = false;
 
@@ -85,40 +85,36 @@ public AutoRecoveryMain(ServerConfiguration conf, 
StatsLogger statsLogger)
             throws IOException, InterruptedException, KeeperException, 
UnavailableException,
             CompatibilityException {
         this.conf = conf;
-        Set<Watcher> watchers = new HashSet<Watcher>();
-        // TODO: better session handling for auto recovery daemon  
https://issues.apache.org/jira/browse/BOOKKEEPER-594
-        //       since {@link 
org.apache.bookkeeper.meta.ZkLedgerUnderreplicationManager}
-        //       use Watcher, need to ensure the logic works correctly after 
recreating
-        //       a new zookeeper client when session expired.
-        //       for now just shutdown it.
-        watchers.add(new Watcher() {
-            @Override
-            public void process(WatchedEvent event) {
-                // Check for expired connection.
-                if 
(event.getState().equals(Watcher.Event.KeeperState.Expired)) {
-                    LOG.error("ZK client connection to the ZK server has 
expired!");
-                    shutdown(ExitCode.ZK_EXPIRED);
-                }
-            }
-        });
-        zk = ZooKeeperClient.newBuilder()
-                .connectString(ZKMetadataDriverBase.resolveZkServers(conf))
-                .sessionTimeoutMs(conf.getZkTimeout())
-                .watchers(watchers)
-                .build();
+        this.metadataBookieDriver = initializeMetadataDriver(conf, 
statsLogger);
+
         auditorElector = new 
AuditorElector(Bookie.getBookieAddress(conf).toString(), conf,
-                zk, statsLogger.scope(AUDITOR_SCOPE));
-        replicationWorker = new ReplicationWorker(zk, conf, 
statsLogger.scope(REPLICATION_WORKER_SCOPE));
+            getZooKeeperFromMetadataDriver(metadataBookieDriver),
+            statsLogger.scope(AUDITOR_SCOPE));
+        replicationWorker = new ReplicationWorker(
+            getZooKeeperFromMetadataDriver(metadataBookieDriver),
+            conf,
+            statsLogger.scope(REPLICATION_WORKER_SCOPE));
         deathWatcher = new AutoRecoveryDeathWatcher(this);
     }
 
-    public AutoRecoveryMain(ServerConfiguration conf, ZooKeeper zk) throws 
IOException, InterruptedException,
-           KeeperException, UnavailableException, CompatibilityException {
-        this.conf = conf;
-        this.zk = zk;
-        auditorElector = new 
AuditorElector(Bookie.getBookieAddress(conf).toString(), conf, zk);
-        replicationWorker = new ReplicationWorker(zk, conf);
-        deathWatcher = new AutoRecoveryDeathWatcher(this);
+    private MetadataBookieDriver initializeMetadataDriver(ServerConfiguration 
conf, StatsLogger statsLogger)
+            throws IOException {
+        String metadataServiceUri = conf.getMetadataServiceUriUnchecked();
+        MetadataBookieDriver driver = MetadataDrivers.getBookieDriver(
+            URI.create(metadataServiceUri));
+        try {
+            driver.initialize(conf, () -> {}, statsLogger);
+        } catch (MetadataException e) {
+            throw new IOException("Failed to initialize metadata driver at " + 
metadataServiceUri, e);
+        }
+        return driver;
+    }
+
+    // this exists because AuditorElector takes a ZooKeeper instance
+    ZooKeeper getZooKeeperFromMetadataDriver(MetadataBookieDriver driver) {
+        checkArgument(driver instanceof ZKMetadataBookieDriver);
+        ZKMetadataBookieDriver zkDriver = (ZKMetadataBookieDriver) driver;
+        return zkDriver.getZk();
     }
 
     /*
@@ -169,12 +165,7 @@ private void shutdown(int exitCode) {
             LOG.warn("Interrupted shutting down auditor elector", e);
         }
         replicationWorker.shutdown();
-        try {
-            zk.close();
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            LOG.warn("Interrupted shutting down auto recovery", e);
-        }
+        metadataBookieDriver.close();
     }
 
     private int getExitCode() {
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AutoRecoveryMainTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AutoRecoveryMainTest.java
index 2970d3c566..8bb8049ef0 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AutoRecoveryMainTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AutoRecoveryMainTest.java
@@ -74,37 +74,4 @@ public void testShutdown() throws Exception {
                 main.replicationWorker.isRunning());
     }
 
-    /**
-     * Test that, if an autorecovery looses its ZK connection/session
-     * it will shutdown.
-     */
-    @Test
-    public void testAutoRecoverySessionLoss() throws Exception {
-        AutoRecoveryMain main1 = new AutoRecoveryMain(bsConfs.get(0));
-        AutoRecoveryMain main2 = new AutoRecoveryMain(bsConfs.get(1));
-        main1.start();
-        main2.start();
-        Thread.sleep(500);
-        assertTrue("AuditorElectors should be running",
-                main1.auditorElector.isRunning() && 
main2.auditorElector.isRunning());
-        assertTrue("Replication workers should be running",
-                main1.replicationWorker.isRunning() && 
main2.replicationWorker.isRunning());
-
-        zkUtil.expireSession(main1.zk);
-        zkUtil.expireSession(main2.zk);
-
-        for (int i = 0; i < 10; i++) { // give it 10 seconds to shutdown
-            if (!main1.auditorElector.isRunning()
-                && !main2.auditorElector.isRunning()
-                && !main1.replicationWorker.isRunning()
-                && !main2.replicationWorker.isRunning()) {
-                break;
-            }
-            Thread.sleep(1000);
-        }
-        assertFalse("Elector1 should have shutdown", 
main1.auditorElector.isRunning());
-        assertFalse("Elector2 should have shutdown", 
main2.auditorElector.isRunning());
-        assertFalse("RW1 should have shutdown", 
main1.replicationWorker.isRunning());
-        assertFalse("RW2 should have shutdown", 
main2.replicationWorker.isRunning());
-    }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to