This is an automated email from the ASF dual-hosted git repository.
bharathv pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new d032dfd HBASE-24603: Make Zookeeper sync() call synchronous (#1945)
(#1975)
d032dfd is described below
commit d032dfd8100e1e7dbcca5a23c635654902824462
Author: Bharath Vissapragada <[email protected]>
AuthorDate: Thu Jun 25 16:41:11 2020 -0700
HBASE-24603: Make Zookeeper sync() call synchronous (#1945) (#1975)
Writing a test for this is tricky. There is enough coverage from
functional tests. The only concern is performance, but there is enough
logging to detect timed-out or badly performing sync calls.
Additionally, this patch decouples ZK event processing into its
own thread rather than doing it in the EventThread's context. That
avoids deadlocks and stalls of the event thread.
Signed-off-by: Andrew Purtell <[email protected]>
Signed-off-by: Viraj Jasani <[email protected]>
(cherry picked from commit 84e246f9b197bfa4307172db5465214771b78d38)
---
.../java/org/apache/hadoop/hbase/HConstants.java | 9 +++
.../java/org/apache/hadoop/hbase/util/Threads.java | 2 +-
.../hbase/backup/example/HFileArchiveManager.java | 4 +-
.../visibility/ZKVisibilityLabelWatcher.java | 2 +-
.../hbase/zookeeper/RecoverableZooKeeper.java | 2 +-
.../apache/hadoop/hbase/zookeeper/ZKWatcher.java | 85 ++++++++++++++++------
6 files changed, 77 insertions(+), 27 deletions(-)
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index 93bf432..b41c230 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -271,6 +271,15 @@ public final class HConstants {
/** Configuration key for ZooKeeper session timeout */
public static final String ZK_SESSION_TIMEOUT = "zookeeper.session.timeout";
+ /** Timeout for the ZK sync() call */
+ public static final String ZK_SYNC_BLOCKING_TIMEOUT_MS =
"hbase.zookeeper.sync.timeout.millis";
+ // Choice of the default value is based on the following ZK recommendation
(from docs). Keeping it
+ // lower lets the callers fail fast in case of any issues.
+ // "The clients view of the system is guaranteed to be up-to-date within a
certain time bound.
+ // (On the order of tens of seconds.) Either system changes will be seen by
a client within this
+ // bound, or the client will detect a service outage."
+ public static final long ZK_SYNC_BLOCKING_TIMEOUT_DEFAULT_MS = 30 * 1000;
+
/** Default value for ZooKeeper session timeout */
public static final int DEFAULT_ZK_SESSION_TIMEOUT = 90 * 1000;
diff --git
a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java
b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java
index 1ca6c2e..3527340 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java
@@ -204,7 +204,7 @@ public class Threads {
* @param prefix The prefix of every created Thread's name
* @return a {@link java.util.concurrent.ThreadFactory} that names threads
*/
- private static ThreadFactory getNamedThreadFactory(final String prefix) {
+ public static ThreadFactory getNamedThreadFactory(final String prefix) {
SecurityManager s = System.getSecurityManager();
final ThreadGroup threadGroup = (s != null) ? s.getThreadGroup() :
Thread.currentThread()
.getThreadGroup();
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/example/HFileArchiveManager.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/example/HFileArchiveManager.java
index c51d493..7abf6bb 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/example/HFileArchiveManager.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/example/HFileArchiveManager.java
@@ -125,7 +125,7 @@ class HFileArchiveManager {
*/
private void disable(ZKWatcher zooKeeper, byte[] table) throws
KeeperException {
// ensure the latest state of the archive node is found
- zooKeeper.sync(archiveZnode);
+ zooKeeper.syncOrTimeout(archiveZnode);
// if the top-level archive node is gone, then we are done
if (ZKUtil.checkExists(zooKeeper, archiveZnode) < 0) {
@@ -134,7 +134,7 @@ class HFileArchiveManager {
// delete the table node, from the archive
String tableNode = this.getTableNode(table);
// make sure the table is the latest version so the delete takes
- zooKeeper.sync(tableNode);
+ zooKeeper.syncOrTimeout(tableNode);
LOG.debug("Attempting to delete table node:" + tableNode);
ZKUtil.deleteNodeRecursively(zooKeeper, tableNode);
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/ZKVisibilityLabelWatcher.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/ZKVisibilityLabelWatcher.java
index 0bbc8d3..bcb3b8b 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/ZKVisibilityLabelWatcher.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/ZKVisibilityLabelWatcher.java
@@ -111,7 +111,7 @@ public class ZKVisibilityLabelWatcher extends ZKListener {
public void nodeDataChanged(String path) {
if (path.equals(labelZnode) || path.equals(userAuthsZnode)) {
try {
- watcher.sync(path);
+ watcher.syncOrTimeout(path);
byte[] data = ZKUtil.getDataAndWatch(watcher, path);
if (path.equals(labelZnode)) {
refreshVisibilityLabelsCache(data);
diff --git
a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java
b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java
index f3d6884..c3a7470 100644
---
a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java
+++
b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java
@@ -721,7 +721,7 @@ public class RecoverableZooKeeper {
}
public void sync(String path, AsyncCallback.VoidCallback cb, Object ctx)
throws KeeperException {
- checkZk().sync(path, cb, null);
+ checkZk().sync(path, cb, ctx);
}
/**
diff --git
a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKWatcher.java
b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKWatcher.java
index 4bea587..574e105 100644
---
a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKWatcher.java
+++
b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKWatcher.java
@@ -24,14 +24,20 @@ import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.AuthUtil;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.security.Superusers;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
@@ -81,12 +87,22 @@ public class ZKWatcher implements Watcher, Abortable,
Closeable {
// listeners to be notified
private final List<ZKListener> listeners = new CopyOnWriteArrayList<>();
- // Used by ZKUtil:waitForZKConnectionIfAuthenticating to wait for SASL
- // negotiation to complete
- private CountDownLatch saslLatch = new CountDownLatch(1);
+ // Single threaded executor pool that processes event notifications from
Zookeeper. Events are
+ // processed in the order in which they arrive (pool backed by an unbounded
fifo queue). We do
+ // this to decouple the event processing from Zookeeper's ClientCnxn's
EventThread context.
+ // EventThread internally runs a single while loop to serially process all
the events. When events
+ // are processed by the listeners in the same thread, that blocks the
EventThread from processing
+ // subsequent events. Processing events in a separate thread frees up the
event thread to continue
+ // and further prevents deadlocks if the process method itself makes other
zookeeper calls.
+ // It is ok to do it in a single thread because the Zookeeper ClientCnxn
already serializes the
+ // requests using a single while loop and hence there is no performance
degradation.
+ private final ExecutorService zkEventProcessor =
+
Executors.newSingleThreadExecutor(Threads.getNamedThreadFactory("zk-event-processor"));
private final Configuration conf;
+ private final long zkSyncTimeout;
+
/* A pattern that matches a Kerberos name, borrowed from Hadoop's
KerberosName */
private static final Pattern NAME_PATTERN =
Pattern.compile("([^/@]*)(/([^/@]*))?@([^/@]*)");
@@ -175,6 +191,8 @@ public class ZKWatcher implements Watcher, Abortable,
Closeable {
throw zce;
}
}
+ this.zkSyncTimeout = conf.getLong(HConstants.ZK_SYNC_BLOCKING_TIMEOUT_MS,
+ HConstants.ZK_SYNC_BLOCKING_TIMEOUT_DEFAULT_MS);
}
private void createBaseZNodes() throws ZooKeeperConnectionException {
@@ -467,21 +485,8 @@ public class ZKWatcher implements Watcher, Abortable,
Closeable {
return znodePaths;
}
- /**
- * Method called from ZooKeeper for events and connection status.
- * <p>
- * Valid events are passed along to listeners. Connection status changes
- * are dealt with locally.
- */
- @Override
- public void process(WatchedEvent event) {
- LOG.debug(prefix("Received ZooKeeper Event, " +
- "type=" + event.getType() + ", " +
- "state=" + event.getState() + ", " +
- "path=" + event.getPath()));
-
+ private void processEvent(WatchedEvent event) {
switch(event.getType()) {
-
// If event type is NONE, this is a connection status change
case None: {
connectionEvent(event);
@@ -489,7 +494,6 @@ public class ZKWatcher implements Watcher, Abortable,
Closeable {
}
// Otherwise pass along to the listeners
-
case NodeCreated: {
for(ZKListener listener : listeners) {
listener.nodeCreated(event.getPath());
@@ -518,10 +522,26 @@ public class ZKWatcher implements Watcher, Abortable,
Closeable {
break;
}
default:
- throw new IllegalStateException("Received event is not valid: " +
event.getState());
+ LOG.error("Invalid event of type {} received for path {}. Ignoring.",
+ event.getState(), event.getPath());
}
}
+ /**
+ * Method called from ZooKeeper for events and connection status.
+ * <p>
+ * Valid events are passed along to listeners. Connection status changes
+ * are dealt with locally.
+ */
+ @Override
+ public void process(WatchedEvent event) {
+ LOG.debug(prefix("Received ZooKeeper Event, " +
+ "type=" + event.getType() + ", " +
+ "state=" + event.getState() + ", " +
+ "path=" + event.getPath()));
+ zkEventProcessor.submit(() -> processEvent(event));
+ }
+
// Connection management
/**
@@ -569,7 +589,8 @@ public class ZKWatcher implements Watcher, Abortable,
Closeable {
}
/**
- * Forces a synchronization of this ZooKeeper client connection.
+ * Forces a synchronization of this ZooKeeper client connection within a
timeout. Enforcing a
+ * timeout lets the callers fail-fast rather than wait forever for the sync
to finish.
* <p>
* Executing this method before running other methods will ensure that the
* subsequent operations are up-to-date and consistent as of the time that
@@ -579,9 +600,28 @@ public class ZKWatcher implements Watcher, Abortable,
Closeable {
* data of an existing node and delete or transition that node, utilizing the
* previously read version and data. We want to ensure that the version read
* is up-to-date from when we begin the operation.
+ * <p>
*/
- public void sync(String path) throws KeeperException {
- this.recoverableZooKeeper.sync(path, null, null);
+ public void syncOrTimeout(String path) throws KeeperException {
+ final CountDownLatch latch = new CountDownLatch(1);
+ long startTime = EnvironmentEdgeManager.currentTime();
+ this.recoverableZooKeeper.sync(path, (i, s, o) -> latch.countDown(), null);
+ try {
+ if (!latch.await(zkSyncTimeout, TimeUnit.MILLISECONDS)) {
+ LOG.warn("sync() operation to ZK timed out. Configured timeout: {}ms.
This usually points "
+ + "to a ZK side issue. Check ZK server logs and metrics.",
zkSyncTimeout);
+ throw new KeeperException.OperationTimeoutException();
+ }
+ } catch (InterruptedException e) {
+ LOG.warn("Interrupted waiting for ZK sync() to finish.", e);
+ Thread.currentThread().interrupt();
+ return;
+ }
+ if (LOG.isDebugEnabled()) {
+ // TODO: Switch to a metric once server side ZK watcher metrics are
implemented. This is a
+ // useful metric to have since the latency of sync() impacts the callers.
+ LOG.debug("ZK sync() operation took {}ms",
EnvironmentEdgeManager.currentTime() - startTime);
+ }
}
/**
@@ -631,6 +671,7 @@ public class ZKWatcher implements Watcher, Abortable,
Closeable {
*/
@Override
public void close() {
+ zkEventProcessor.shutdownNow();
try {
recoverableZooKeeper.close();
} catch (InterruptedException e) {