Repository: hbase
Updated Branches:
  refs/heads/branch-2 c606a565c -> 12a7d2bac


HBASE-14498 Master stuck in infinite loop when all Zookeeper servers are 
unreachable

Signed-off-by: Michael Stack <st...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/12a7d2ba
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/12a7d2ba
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/12a7d2ba

Branch: refs/heads/branch-2
Commit: 12a7d2bacef861e0951c4739e6d0106c4a462d34
Parents: c606a56
Author: Pankaj Kumar <pankaj...@huawei.com>
Authored: Thu Aug 17 17:06:50 2017 +0800
Committer: Michael Stack <st...@apache.org>
Committed: Thu Aug 17 19:03:11 2017 -0700

----------------------------------------------------------------------
 .../hbase/zookeeper/ZooKeeperWatcher.java       | 85 +++++++++++++++++++-
 .../hbase/zookeeper/TestZooKeeperWatcher.java   | 47 +++++++++++
 .../hbase/regionserver/HRegionServer.java       |  2 +-
 3 files changed, 131 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/12a7d2ba/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java
index 6bec352..8266c9a 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java
@@ -26,6 +26,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
@@ -39,6 +42,7 @@ import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.ZooKeeperConnectionException;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.security.Superusers;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
@@ -76,7 +80,7 @@ public class ZooKeeperWatcher implements Watcher, Abortable, 
Closeable {
   private final RecoverableZooKeeper recoverableZooKeeper;
 
   // abortable in case of zk failure
-  protected Abortable abortable;
+  protected final Abortable abortable;
   // Used if abortable is null
   private boolean aborted = false;
 
@@ -89,6 +93,13 @@ public class ZooKeeperWatcher implements Watcher, Abortable, 
Closeable {
   // negotiation to complete
   public CountDownLatch saslLatch = new CountDownLatch(1);
 
+  // Connection timeout on disconnect event
+  private long connWaitTimeOut;
+  private AtomicBoolean connected = new AtomicBoolean(false);
+  private boolean forceAbortOnZKDisconnect;
+ 
+  // Execute service for zookeeper disconnect event watcher
+  private ExecutorService zkEventWatcherExecService = null;
 
 
   private final Configuration conf;
@@ -122,6 +133,24 @@ public class ZooKeeperWatcher implements Watcher, 
Abortable, Closeable {
   public ZooKeeperWatcher(Configuration conf, String identifier,
       Abortable abortable, boolean canCreateBaseZNode)
   throws IOException, ZooKeeperConnectionException {
+    this(conf, identifier, abortable, canCreateBaseZNode, false);
+  }
+
+  /**
+   * Instantiate a ZooKeeper connection and watcher.
+   * @param conf Configuration
+   * @param identifier string that is passed to RecoverableZookeeper to be 
used as identifier for
+   *          this instance. Use null for default.
+   * @param abortable Can be null if there is on error there is no host to 
abort: e.g. client
+   *          context.
+   * @param canCreateBaseZNode whether create base node.
+   * @param forceAbortOnZKDisconnect abort the watcher if true.
+   * @throws IOException when any IO exception
+   * @throws ZooKeeperConnectionException when any zookeeper connection 
exception
+   */
+  public ZooKeeperWatcher(Configuration conf, String identifier, Abortable 
abortable,
+      boolean canCreateBaseZNode, boolean forceAbortOnZKDisconnect) throws 
IOException,
+      ZooKeeperConnectionException {
     this.conf = conf;
     this.quorum = ZKConfig.getZKQuorumServersString(conf);
     this.prefix = identifier;
@@ -130,6 +159,9 @@ public class ZooKeeperWatcher implements Watcher, 
Abortable, Closeable {
     this.identifier = identifier + "0x0";
     this.abortable = abortable;
     this.znodePaths = new ZNodePaths(conf);
+    // On Disconnected event a thread will wait for sometime (2/3 of 
zookeeper.session.timeout),
+    // it will abort the process if no SyncConnected event reported by the 
time.
+    connWaitTimeOut = this.conf.getLong("zookeeper.session.timeout", 90000) * 
2 / 3;
     PendingWatcher pendingWatcher = new PendingWatcher();
     this.recoverableZooKeeper = ZKUtil.connect(conf, quorum, pendingWatcher, 
identifier);
     pendingWatcher.prepare(this);
@@ -146,6 +178,10 @@ public class ZooKeeperWatcher implements Watcher, 
Abortable, Closeable {
         throw zce;
       }
     }
+    this.forceAbortOnZKDisconnect = forceAbortOnZKDisconnect;
+    if (this.forceAbortOnZKDisconnect) {
+      this.zkEventWatcherExecService = Executors.newSingleThreadExecutor();
+    }
   }
 
   private void createBaseZNodes() throws ZooKeeperConnectionException {
@@ -526,11 +562,19 @@ public class ZooKeeperWatcher implements Watcher, 
Abortable, Closeable {
           Long.toHexString(this.recoverableZooKeeper.getSessionId());
         // Update our identifier.  Otherwise ignore.
         LOG.debug(this.identifier + " connected");
+        connected.set(true);
         break;
 
       // Abort the server if Disconnected or Expired
       case Disconnected:
-        LOG.debug(prefix("Received Disconnected from ZooKeeper, ignoring"));
+        LOG.debug(prefix("Received Disconnected from ZooKeeper."));
+        if (forceAbortOnZKDisconnect) {
+          connected.set(false);
+          ZKDisconnectEventWatcher task = new ZKDisconnectEventWatcher();
+          zkEventWatcherExecService.execute(task);
+        } else {
+          LOG.debug("Received Disconnected from ZooKeeper, ignoring.");
+        }
         break;
 
       case Expired:
@@ -553,6 +597,39 @@ public class ZooKeeperWatcher implements Watcher, 
Abortable, Closeable {
     }
   }
 
+  /*
+   * Task to watch zookeper disconnect event.
+   */
+  class ZKDisconnectEventWatcher implements Runnable {
+    @Override
+    public void run() {
+      if (connected.get()) {
+        return;
+      }
+
+      long startTime = EnvironmentEdgeManager.currentTime();
+      while (EnvironmentEdgeManager.currentTime() - startTime < 
connWaitTimeOut) {
+        if (connected.get()) {
+          LOG.debug("Client got reconnected to zookeeper.");
+          return;
+        }
+        try {
+          Thread.sleep(100);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          break;
+        }
+      }
+
+      if (!connected.get() && abortable != null) {
+        String msg =
+            prefix("Couldn't connect to ZooKeeper after waiting " + 
connWaitTimeOut
+                + " ms, aborting");
+        abortable.abort(msg, new KeeperException.ConnectionLossException());
+      }
+    }
+  }
+
   /**
    * Forces a synchronization of this ZooKeeper client connection.
    * <p>
@@ -617,6 +694,10 @@ public class ZooKeeperWatcher implements Watcher, 
Abortable, Closeable {
   public void close() {
     try {
       recoverableZooKeeper.close();
+      if (zkEventWatcherExecService != null) {
+        zkEventWatcherExecService.shutdown();
+        zkEventWatcherExecService = null;
+      }
     } catch (InterruptedException e) {
       Thread.currentThread().interrupt();
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/12a7d2ba/hbase-client/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZooKeeperWatcher.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZooKeeperWatcher.java
 
b/hbase-client/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZooKeeperWatcher.java
index de2ec2a..929bd98 100644
--- 
a/hbase-client/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZooKeeperWatcher.java
+++ 
b/hbase-client/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZooKeeperWatcher.java
@@ -23,14 +23,22 @@ import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.ZooKeeperConnectionException;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
 @Category({ SmallTests.class })
 public class TestZooKeeperWatcher {
+  private final static Log LOG = LogFactory.getLog(TestZooKeeperWatcher.class);
 
   @Test
   public void testIsClientReadable() throws ZooKeeperConnectionException, 
IOException {
@@ -57,4 +65,43 @@ public class TestZooKeeperWatcher {
     watcher.close();
   }
 
+  @Test
+  public void testConnectionEvent() throws ZooKeeperConnectionException, 
IOException {
+    long zkSessionTimeout = 15000l;
+    Configuration conf = HBaseConfiguration.create();
+    conf.set("zookeeper.session.timeout", "15000");
+
+    Abortable abortable = new Abortable() {
+      boolean aborted = false;
+
+      @Override
+      public void abort(String why, Throwable e) {
+        aborted = true;
+        LOG.error(why, e);
+      }
+
+      @Override
+      public boolean isAborted() {
+        return aborted;
+      }
+    };
+    ZooKeeperWatcher watcher = new ZooKeeperWatcher(conf, 
"testConnectionEvent", abortable, false, true);
+
+    WatchedEvent event =
+        new WatchedEvent(Watcher.Event.EventType.None, 
Watcher.Event.KeeperState.Disconnected, null);
+
+    long startTime = EnvironmentEdgeManager.currentTime();
+    while (!abortable.isAborted()
+        && (EnvironmentEdgeManager.currentTime() - startTime < 
zkSessionTimeout)) {
+      watcher.process(event);
+      try {
+        Thread.sleep(1000);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+      }
+    }
+
+    assertTrue(abortable.isAborted());
+    watcher.close();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/12a7d2ba/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 0c1814f..524164f 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -642,7 +642,7 @@ public class HRegionServer extends HasThread implements
     if (!conf.getBoolean("hbase.testing.nocluster", false)) {
       // Open connection to zookeeper and set primary watcher
       zooKeeper = new ZooKeeperWatcher(conf, getProcessName() + ":" +
-        rpcServices.isa.getPort(), this, canCreateBaseZNode());
+        rpcServices.isa.getPort(), this, canCreateBaseZNode(), true);
 
       this.csm = (BaseCoordinatedStateManager) csm;
       this.csm.initialize(this);

Reply via email to