Repository: hbase
Updated Branches:
  refs/heads/branch-2 c606a565c -> 12a7d2bac
HBASE-14498 Master stuck in infinite loop when all Zookeeper servers are unreachable

Signed-off-by: Michael Stack <st...@apache.org>

Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/12a7d2ba
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/12a7d2ba
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/12a7d2ba

Branch: refs/heads/branch-2
Commit: 12a7d2bacef861e0951c4739e6d0106c4a462d34
Parents: c606a56
Author: Pankaj Kumar <pankaj...@huawei.com>
Authored: Thu Aug 17 17:06:50 2017 +0800
Committer: Michael Stack <st...@apache.org>
Committed: Thu Aug 17 19:03:11 2017 -0700

----------------------------------------------------------------------
 .../hbase/zookeeper/ZooKeeperWatcher.java | 85 +++++++++++++++++++-
 .../hbase/zookeeper/TestZooKeeperWatcher.java | 47 +++++++++++
 .../hbase/regionserver/HRegionServer.java | 2 +-
 3 files changed, 131 insertions(+), 3 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hbase/blob/12a7d2ba/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java
index 6bec352..8266c9a 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java
@@ -26,6 +26,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;

@@ -39,6
+42,7 @@ import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.ZooKeeperConnectionException; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.security.Superusers; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.security.UserGroupInformation; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; @@ -76,7 +80,7 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable { private final RecoverableZooKeeper recoverableZooKeeper; // abortable in case of zk failure - protected Abortable abortable; + protected final Abortable abortable; // Used if abortable is null private boolean aborted = false; @@ -89,6 +93,13 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable { // negotiation to complete public CountDownLatch saslLatch = new CountDownLatch(1); + // Connection timeout on disconnect event + private long connWaitTimeOut; + private AtomicBoolean connected = new AtomicBoolean(false); + private boolean forceAbortOnZKDisconnect; + + // Execute service for zookeeper disconnect event watcher + private ExecutorService zkEventWatcherExecService = null; private final Configuration conf; @@ -122,6 +133,24 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable { public ZooKeeperWatcher(Configuration conf, String identifier, Abortable abortable, boolean canCreateBaseZNode) throws IOException, ZooKeeperConnectionException { + this(conf, identifier, abortable, canCreateBaseZNode, false); + } + + /** + * Instantiate a ZooKeeper connection and watcher. + * @param conf Configuration + * @param identifier string that is passed to RecoverableZookeeper to be used as identifier for + * this instance. Use null for default. + * @param abortable Can be null if there is on error there is no host to abort: e.g. client + * context. + * @param canCreateBaseZNode whether create base node. 
+ * @param forceAbortOnZKDisconnect abort the watcher if true. + * @throws IOException when any IO exception + * @throws ZooKeeperConnectionException when any zookeeper connection exception + */ + public ZooKeeperWatcher(Configuration conf, String identifier, Abortable abortable, + boolean canCreateBaseZNode, boolean forceAbortOnZKDisconnect) throws IOException, + ZooKeeperConnectionException { this.conf = conf; this.quorum = ZKConfig.getZKQuorumServersString(conf); this.prefix = identifier; @@ -130,6 +159,9 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable { this.identifier = identifier + "0x0"; this.abortable = abortable; this.znodePaths = new ZNodePaths(conf); + // On Disconnected event a thread will wait for sometime (2/3 of zookeeper.session.timeout), + // it will abort the process if no SyncConnected event reported by the time. + connWaitTimeOut = this.conf.getLong("zookeeper.session.timeout", 90000) * 2 / 3; PendingWatcher pendingWatcher = new PendingWatcher(); this.recoverableZooKeeper = ZKUtil.connect(conf, quorum, pendingWatcher, identifier); pendingWatcher.prepare(this); @@ -146,6 +178,10 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable { throw zce; } } + this.forceAbortOnZKDisconnect = forceAbortOnZKDisconnect; + if (this.forceAbortOnZKDisconnect) { + this.zkEventWatcherExecService = Executors.newSingleThreadExecutor(); + } } private void createBaseZNodes() throws ZooKeeperConnectionException { @@ -526,11 +562,19 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable { Long.toHexString(this.recoverableZooKeeper.getSessionId()); // Update our identifier. Otherwise ignore. 
LOG.debug(this.identifier + " connected"); + connected.set(true); break; // Abort the server if Disconnected or Expired case Disconnected: - LOG.debug(prefix("Received Disconnected from ZooKeeper, ignoring")); + LOG.debug(prefix("Received Disconnected from ZooKeeper.")); + if (forceAbortOnZKDisconnect) { + connected.set(false); + ZKDisconnectEventWatcher task = new ZKDisconnectEventWatcher(); + zkEventWatcherExecService.execute(task); + } else { + LOG.debug("Received Disconnected from ZooKeeper, ignoring."); + } break; case Expired: @@ -553,6 +597,39 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable { } } + /* + * Task to watch zookeper disconnect event. + */ + class ZKDisconnectEventWatcher implements Runnable { + @Override + public void run() { + if (connected.get()) { + return; + } + + long startTime = EnvironmentEdgeManager.currentTime(); + while (EnvironmentEdgeManager.currentTime() - startTime < connWaitTimeOut) { + if (connected.get()) { + LOG.debug("Client got reconnected to zookeeper."); + return; + } + try { + Thread.sleep(100); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + break; + } + } + + if (!connected.get() && abortable != null) { + String msg = + prefix("Couldn't connect to ZooKeeper after waiting " + connWaitTimeOut + + " ms, aborting"); + abortable.abort(msg, new KeeperException.ConnectionLossException()); + } + } + } + /** * Forces a synchronization of this ZooKeeper client connection. 
* <p> @@ -617,6 +694,10 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable { public void close() { try { recoverableZooKeeper.close(); + if (zkEventWatcherExecService != null) { + zkEventWatcherExecService.shutdown(); + zkEventWatcherExecService = null; + } } catch (InterruptedException e) { Thread.currentThread().interrupt(); } http://git-wip-us.apache.org/repos/asf/hbase/blob/12a7d2ba/hbase-client/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZooKeeperWatcher.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZooKeeperWatcher.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZooKeeperWatcher.java index de2ec2a..929bd98 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZooKeeperWatcher.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZooKeeperWatcher.java @@ -23,14 +23,22 @@ import static org.junit.Assert.assertTrue; import java.io.IOException; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.ZooKeeperConnectionException; import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; import org.junit.Test; import org.junit.experimental.categories.Category; @Category({ SmallTests.class }) public class TestZooKeeperWatcher { + private final static Log LOG = LogFactory.getLog(TestZooKeeperWatcher.class); @Test public void testIsClientReadable() throws ZooKeeperConnectionException, IOException { @@ -57,4 +65,43 @@ public class TestZooKeeperWatcher { watcher.close(); } + @Test + public void testConnectionEvent() throws 
ZooKeeperConnectionException, IOException { + long zkSessionTimeout = 15000l; + Configuration conf = HBaseConfiguration.create(); + conf.set("zookeeper.session.timeout", "15000"); + + Abortable abortable = new Abortable() { + boolean aborted = false; + + @Override + public void abort(String why, Throwable e) { + aborted = true; + LOG.error(why, e); + } + + @Override + public boolean isAborted() { + return aborted; + } + }; + ZooKeeperWatcher watcher = new ZooKeeperWatcher(conf, "testConnectionEvent", abortable, false, true); + + WatchedEvent event = + new WatchedEvent(Watcher.Event.EventType.None, Watcher.Event.KeeperState.Disconnected, null); + + long startTime = EnvironmentEdgeManager.currentTime(); + while (!abortable.isAborted() + && (EnvironmentEdgeManager.currentTime() - startTime < zkSessionTimeout)) { + watcher.process(event); + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + + assertTrue(abortable.isAborted()); + watcher.close(); + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/12a7d2ba/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 0c1814f..524164f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -642,7 +642,7 @@ public class HRegionServer extends HasThread implements if (!conf.getBoolean("hbase.testing.nocluster", false)) { // Open connection to zookeeper and set primary watcher zooKeeper = new ZooKeeperWatcher(conf, getProcessName() + ":" + - rpcServices.isa.getPort(), this, canCreateBaseZNode()); + rpcServices.isa.getPort(), this, canCreateBaseZNode(), 
true); this.csm = (BaseCoordinatedStateManager) csm; this.csm.initialize(this);