This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 4f0d2a1  autorecovery-use-metadata-driver (part 1) : move 
AutoRecoveryMain to use MetadataBookieDriver
4f0d2a1 is described below

commit 4f0d2a195bd9be3788876b47813cee1440cf005c
Author: Qi Wang <[email protected]>
AuthorDate: Sun Sep 23 23:09:37 2018 -0700

    autorecovery-use-metadata-driver (part 1) : move AutoRecoveryMain to use 
MetadataBookieDriver
    
    
    Descriptions of the changes in this PR:
    
    ### Motivation
    
    We are introducing etcd as a metadata store. However, AutoRecovery is 
currently still tied to ZooKeeper. To use etcd as the metadata store, the 
AutoRecovery-related classes have to be moved to the metadata driver API.
    
    ### Changes
    
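    This is the first change in moving AutoRecovery to the metadata driver. 
It changes AutoRecoveryMain to use the metadata driver API, and removes the 
shutdown-on-session-expiry logic, which no longer makes sense with the current 
retryable ZooKeeper client. Accordingly, the AutoRecoveryMain(ServerConfiguration, 
ZooKeeper) constructor and the session-loss test (testAutoRecoverySessionLoss) 
are removed as well.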
    
    
    
    
    
    Author: Qi Wang <[email protected]>
    
    Reviewers: Sijie Guo <[email protected]>, Enrico Olivelli 
<[email protected]>
    
    This closes #1693 from codingwangqi/autorecovery_metadata
---
 .../bookkeeper/replication/AutoRecoveryMain.java   | 87 ++++++++++------------
 .../replication/AutoRecoveryMainTest.java          | 33 --------
 2 files changed, 39 insertions(+), 81 deletions(-)

diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AutoRecoveryMain.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AutoRecoveryMain.java
index 9830c59..3369de8 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AutoRecoveryMain.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AutoRecoveryMain.java
@@ -20,6 +20,7 @@
  */
 package org.apache.bookkeeper.replication;
 
+import static com.google.common.base.Preconditions.checkArgument;
 import static org.apache.bookkeeper.replication.ReplicationStats.AUDITOR_SCOPE;
 import static 
org.apache.bookkeeper.replication.ReplicationStats.REPLICATION_WORKER_SCOPE;
 
@@ -28,8 +29,7 @@ import com.google.common.annotations.VisibleForTesting;
 import java.io.File;
 import java.io.IOException;
 import java.net.MalformedURLException;
-import java.util.HashSet;
-import java.util.Set;
+import java.net.URI;
 
 import org.apache.bookkeeper.bookie.Bookie;
 import org.apache.bookkeeper.bookie.BookieCriticalThread;
@@ -37,13 +37,15 @@ import org.apache.bookkeeper.bookie.ExitCode;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.http.HttpServer;
 import org.apache.bookkeeper.http.HttpServerLoader;
-import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
+import org.apache.bookkeeper.meta.MetadataBookieDriver;
+import org.apache.bookkeeper.meta.MetadataDrivers;
+import org.apache.bookkeeper.meta.exceptions.MetadataException;
+import org.apache.bookkeeper.meta.zk.ZKMetadataBookieDriver;
 import 
org.apache.bookkeeper.replication.ReplicationException.CompatibilityException;
 import 
org.apache.bookkeeper.replication.ReplicationException.UnavailableException;
 import org.apache.bookkeeper.server.http.BKHttpServiceProvider;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
-import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
 import org.apache.commons.cli.BasicParser;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.HelpFormatter;
@@ -51,8 +53,6 @@ import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.ZooKeeper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -66,12 +66,12 @@ public class AutoRecoveryMain {
     private static final Logger LOG = LoggerFactory
             .getLogger(AutoRecoveryMain.class);
 
-    private ServerConfiguration conf;
-    ZooKeeper zk;
-    AuditorElector auditorElector;
-    ReplicationWorker replicationWorker;
-    private AutoRecoveryDeathWatcher deathWatcher;
-    private int exitCode;
+    private final ServerConfiguration conf;
+    final MetadataBookieDriver metadataBookieDriver;
+    final AuditorElector auditorElector;
+    final ReplicationWorker replicationWorker;
+    final AutoRecoveryDeathWatcher deathWatcher;
+    int exitCode;
     private volatile boolean shuttingDown = false;
     private volatile boolean running = false;
 
@@ -85,40 +85,36 @@ public class AutoRecoveryMain {
             throws IOException, InterruptedException, KeeperException, 
UnavailableException,
             CompatibilityException {
         this.conf = conf;
-        Set<Watcher> watchers = new HashSet<Watcher>();
-        // TODO: better session handling for auto recovery daemon  
https://issues.apache.org/jira/browse/BOOKKEEPER-594
-        //       since {@link 
org.apache.bookkeeper.meta.ZkLedgerUnderreplicationManager}
-        //       use Watcher, need to ensure the logic works correctly after 
recreating
-        //       a new zookeeper client when session expired.
-        //       for now just shutdown it.
-        watchers.add(new Watcher() {
-            @Override
-            public void process(WatchedEvent event) {
-                // Check for expired connection.
-                if 
(event.getState().equals(Watcher.Event.KeeperState.Expired)) {
-                    LOG.error("ZK client connection to the ZK server has 
expired!");
-                    shutdown(ExitCode.ZK_EXPIRED);
-                }
-            }
-        });
-        zk = ZooKeeperClient.newBuilder()
-                .connectString(ZKMetadataDriverBase.resolveZkServers(conf))
-                .sessionTimeoutMs(conf.getZkTimeout())
-                .watchers(watchers)
-                .build();
+        this.metadataBookieDriver = initializeMetadataDriver(conf, 
statsLogger);
+
         auditorElector = new 
AuditorElector(Bookie.getBookieAddress(conf).toString(), conf,
-                zk, statsLogger.scope(AUDITOR_SCOPE));
-        replicationWorker = new ReplicationWorker(zk, conf, 
statsLogger.scope(REPLICATION_WORKER_SCOPE));
+            getZooKeeperFromMetadataDriver(metadataBookieDriver),
+            statsLogger.scope(AUDITOR_SCOPE));
+        replicationWorker = new ReplicationWorker(
+            getZooKeeperFromMetadataDriver(metadataBookieDriver),
+            conf,
+            statsLogger.scope(REPLICATION_WORKER_SCOPE));
         deathWatcher = new AutoRecoveryDeathWatcher(this);
     }
 
-    public AutoRecoveryMain(ServerConfiguration conf, ZooKeeper zk) throws 
IOException, InterruptedException,
-           KeeperException, UnavailableException, CompatibilityException {
-        this.conf = conf;
-        this.zk = zk;
-        auditorElector = new 
AuditorElector(Bookie.getBookieAddress(conf).toString(), conf, zk);
-        replicationWorker = new ReplicationWorker(zk, conf);
-        deathWatcher = new AutoRecoveryDeathWatcher(this);
+    private MetadataBookieDriver initializeMetadataDriver(ServerConfiguration 
conf, StatsLogger statsLogger)
+            throws IOException {
+        String metadataServiceUri = conf.getMetadataServiceUriUnchecked();
+        MetadataBookieDriver driver = MetadataDrivers.getBookieDriver(
+            URI.create(metadataServiceUri));
+        try {
+            driver.initialize(conf, () -> {}, statsLogger);
+        } catch (MetadataException e) {
+            throw new IOException("Failed to initialize metadata driver at " + 
metadataServiceUri, e);
+        }
+        return driver;
+    }
+
+    // This exists because AuditorElector and ReplicationWorker still take a ZooKeeper handle.
+    ZooKeeper getZooKeeperFromMetadataDriver(MetadataBookieDriver driver) {
+        checkArgument(driver instanceof ZKMetadataBookieDriver);
+        ZKMetadataBookieDriver zkDriver = (ZKMetadataBookieDriver) driver;
+        return zkDriver.getZk();
     }
 
     /*
@@ -169,12 +165,7 @@ public class AutoRecoveryMain {
             LOG.warn("Interrupted shutting down auditor elector", e);
         }
         replicationWorker.shutdown();
-        try {
-            zk.close();
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            LOG.warn("Interrupted shutting down auto recovery", e);
-        }
+        metadataBookieDriver.close();
     }
 
     private int getExitCode() {
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AutoRecoveryMainTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AutoRecoveryMainTest.java
index 2970d3c..8bb8049 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AutoRecoveryMainTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AutoRecoveryMainTest.java
@@ -74,37 +74,4 @@ public class AutoRecoveryMainTest extends 
BookKeeperClusterTestCase {
                 main.replicationWorker.isRunning());
     }
 
-    /**
-     * Test that, if an autorecovery looses its ZK connection/session
-     * it will shutdown.
-     */
-    @Test
-    public void testAutoRecoverySessionLoss() throws Exception {
-        AutoRecoveryMain main1 = new AutoRecoveryMain(bsConfs.get(0));
-        AutoRecoveryMain main2 = new AutoRecoveryMain(bsConfs.get(1));
-        main1.start();
-        main2.start();
-        Thread.sleep(500);
-        assertTrue("AuditorElectors should be running",
-                main1.auditorElector.isRunning() && 
main2.auditorElector.isRunning());
-        assertTrue("Replication workers should be running",
-                main1.replicationWorker.isRunning() && 
main2.replicationWorker.isRunning());
-
-        zkUtil.expireSession(main1.zk);
-        zkUtil.expireSession(main2.zk);
-
-        for (int i = 0; i < 10; i++) { // give it 10 seconds to shutdown
-            if (!main1.auditorElector.isRunning()
-                && !main2.auditorElector.isRunning()
-                && !main1.replicationWorker.isRunning()
-                && !main2.replicationWorker.isRunning()) {
-                break;
-            }
-            Thread.sleep(1000);
-        }
-        assertFalse("Elector1 should have shutdown", 
main1.auditorElector.isRunning());
-        assertFalse("Elector2 should have shutdown", 
main2.auditorElector.isRunning());
-        assertFalse("RW1 should have shutdown", 
main1.replicationWorker.isRunning());
-        assertFalse("RW2 should have shutdown", 
main2.replicationWorker.isRunning());
-    }
 }

Reply via email to