Revert "HBASE-19772 ReadOnlyZKClient improvements" Pushed by mistake. Reverting from master.
This reverts commit 70515f53112599997348aee1d748838f7a78a7fd. Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/a4a4ce8e Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/a4a4ce8e Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/a4a4ce8e Branch: refs/heads/HBASE-19064 Commit: a4a4ce8eac2fb042c87f2a9985cb6e97d41cfa0e Parents: 09ae5ab Author: Michael Stack <[email protected]> Authored: Thu Jan 11 14:27:23 2018 -0800 Committer: Michael Stack <[email protected]> Committed: Thu Jan 11 14:27:23 2018 -0800 ---------------------------------------------------------------------- .../hbase/zookeeper/ReadOnlyZKClient.java | 58 ++++++++------------ 1 file changed, 23 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/a4a4ce8e/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ReadOnlyZKClient.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ReadOnlyZKClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ReadOnlyZKClient.java index 0a9544d..82c011b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ReadOnlyZKClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ReadOnlyZKClient.java @@ -31,7 +31,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.ZooKeeperConnectionException; import org.apache.yetus.audience.InterfaceAudience; import org.apache.zookeeper.KeeperException; @@ -117,8 +116,6 @@ public final class ReadOnlyZKClient implements Closeable { private ZooKeeper zookeeper; - private int pendingRequests = 0; - private String getId() { return String.format("0x%08x", System.identityHashCode(this)); } @@ -130,12 +127,12 @@ public final class ReadOnlyZKClient implements Closeable { this.retryIntervalMs = conf.getInt(RECOVERY_RETRY_INTERVAL_MILLIS, DEFAULT_RECOVERY_RETRY_INTERVAL_MILLIS); this.keepAliveTimeMs = conf.getInt(KEEPALIVE_MILLIS, DEFAULT_KEEPALIVE_MILLIS); - LOG.info( - "Start read only zookeeper connection {} to {}, " + "session timeout {} ms, retries {}, " + - "retry interval {} ms, keep alive {} ms", - getId(), connectString, sessionTimeoutMs, maxRetries, retryIntervalMs, keepAliveTimeMs); - Threads.setDaemonThreadRunning(new Thread(this::run), - "ReadOnlyZKClient-" + connectString + "@" + getId()); + LOG.info("Start read only zookeeper connection " + getId() + " to " + connectString + + ", session timeout " + sessionTimeoutMs + " ms, retries " + maxRetries + + ", retry interval " + retryIntervalMs + " ms, keep alive " + keepAliveTimeMs + " ms"); + Thread t = new Thread(this::run, "ReadOnlyZKClient"); + t.setDaemon(true); + t.start(); } private abstract class ZKTask<T> extends Task { @@ -159,7 +156,6 @@ public final class ReadOnlyZKClient implements Closeable { @Override public void exec(ZooKeeper alwaysNull) { - pendingRequests--; Code code = Code.get(rc); if (code == Code.OK) { future.complete(ret); @@ -173,19 +169,19 @@ public final class ReadOnlyZKClient implements Closeable { future.completeExceptionally(KeeperException.create(code, path)); } else { if (code == Code.SESSIONEXPIRED) { - LOG.warn("{} to {} session expired, close and reconnect", getId(), connectString); + LOG.warn(getId() + " session expired, close and reconnect"); try { zk.close(); } catch (InterruptedException e) { } } if (ZKTask.this.delay(retryIntervalMs, maxRetries)) { - LOG.warn("{} to {} failed for {} of {}, code = {}, retries = {}", getId(), - connectString, operationType, path, code, ZKTask.this.retries); + LOG.warn(getId() + " failed for " + operationType + " of " + path + ", code = " + + code + ", retries = " + ZKTask.this.retries); tasks.add(ZKTask.this); } else { - LOG.warn("{} to {} failed for {} of {}, code = {}, retries = {}, give up", getId(), - connectString, operationType, path, code, ZKTask.this.retries); + LOG.warn(getId() + " failed for " + operationType + " of " + path + ", code = " + + code + ", retries = " + ZKTask.this.retries + ", give up"); future.completeExceptionally(KeeperException.create(code, path)); } } @@ -209,14 +205,6 @@ public final class ReadOnlyZKClient implements Closeable { return true; } - protected abstract void doExec(ZooKeeper zk); - - @Override - public final void exec(ZooKeeper zk) { - pendingRequests++; - doExec(zk); - } - public boolean delay(long intervalMs, int maxRetries) { if (retries >= maxRetries) { return false; @@ -229,12 +217,14 @@ public final class ReadOnlyZKClient implements Closeable { @Override public void connectFailed(IOException e) { if (delay(retryIntervalMs, maxRetries)) { - LOG.warn("{} to {} failed to connect to zk fo {} of {}, retries = {}", getId(), - connectString, operationType, path, retries, e); + LOG.warn(getId() + " failed to connect to zk for " + operationType + " of " + path + + ", retries = " + retries, + e); tasks.add(this); } else { - LOG.warn("{} to {} failed to connect to zk fo {} of {}, retries = {}, give up", getId(), - connectString, operationType, path, retries, e); + LOG.warn(getId() + " failed to connect to zk for " + operationType + " of " + path + + ", retries = " + retries + ", give up", + e); future.completeExceptionally(e); } } @@ -259,7 +249,7 @@ public final class ReadOnlyZKClient implements Closeable { tasks.add(new ZKTask<byte[]>(path, future, "get") { @Override - protected void doExec(ZooKeeper zk) { + public void exec(ZooKeeper zk) { zk.getData(path, false, (rc, path, ctx, data, stat) -> onComplete(zk, rc, data, true), null); } @@ -275,7 +265,7 @@ public final class ReadOnlyZKClient implements Closeable { tasks.add(new ZKTask<Stat>(path, future, "exists") { @Override - protected void doExec(ZooKeeper zk) { + public void exec(ZooKeeper zk) { zk.exists(path, false, (rc, path, ctx, stat) -> onComplete(zk, rc, stat, false), null); } }); @@ -321,11 +311,9 @@ public final class ReadOnlyZKClient implements Closeable { if (task == CLOSE) { break; } - if (task == null && pendingRequests == 0) { - LOG.info( - "{} to {} no activities for {} ms, close active connection. " + - "Will reconnect next time when there are new requests", - getId(), connectString, keepAliveTimeMs); + if (task == null) { + LOG.info(getId() + " no activities for " + keepAliveTimeMs + + " ms, close active connection. Will reconnect next time when there are new requests."); closeZk(); continue; } @@ -351,7 +339,7 @@ public final class ReadOnlyZKClient implements Closeable { @Override public void close() { if (closed.compareAndSet(false, true)) { - LOG.info("Close zookeeper connection {} to {}", getId(), connectString); + LOG.info("Close zookeeper connection " + getId() + " to " + connectString); tasks.add(CLOSE); } }
