Updated Branches: refs/heads/CURATOR-24 [created] 11ae23adc
CURATOR-24 Improve the handling of hung ZooKeeper connections Project: http://git-wip-us.apache.org/repos/asf/incubator-curator/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-curator/commit/11ae23ad Tree: http://git-wip-us.apache.org/repos/asf/incubator-curator/tree/11ae23ad Diff: http://git-wip-us.apache.org/repos/asf/incubator-curator/diff/11ae23ad Branch: refs/heads/CURATOR-24 Commit: 11ae23adc013fa6526ac001bce2f5b2967a1d9b5 Parents: 38f28b5 Author: randgalt <[email protected]> Authored: Thu May 9 17:18:09 2013 -0700 Committer: randgalt <[email protected]> Committed: Thu May 9 17:18:09 2013 -0700 ---------------------------------------------------------------------- .../java/org/apache/curator/ConnectionState.java | 169 +++++++-------- .../org/apache/curator/CuratorZookeeperClient.java | 9 - .../framework/state/ConnectionStateManager.java | 5 - 3 files changed, 84 insertions(+), 99 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/11ae23ad/curator-client/src/main/java/org/apache/curator/ConnectionState.java ---------------------------------------------------------------------- diff --git a/curator-client/src/main/java/org/apache/curator/ConnectionState.java b/curator-client/src/main/java/org/apache/curator/ConnectionState.java index 8de3e27..bbb0588 100644 --- a/curator-client/src/main/java/org/apache/curator/ConnectionState.java +++ b/curator-client/src/main/java/org/apache/curator/ConnectionState.java @@ -38,24 +38,23 @@ import java.util.concurrent.atomic.AtomicReference; class ConnectionState implements Watcher, Closeable { - private volatile long connectionStartMs = 0; - - private final Logger log = LoggerFactory.getLogger(getClass()); - private final HandleHolder zooKeeper; - private final AtomicBoolean isConnected = new AtomicBoolean(false); - private final AtomicBoolean lost = new AtomicBoolean(false); - private final EnsembleProvider ensembleProvider; - private final int connectionTimeoutMs; + private static final int MAX_BACKGROUND_EXCEPTIONS = 10; + private static final boolean LOG_EVENTS = Boolean.getBoolean(DebugUtils.PROPERTY_LOG_EVENTS); + private final Logger log = LoggerFactory.getLogger(getClass()); + private final HandleHolder zooKeeper; + private final AtomicBoolean isConnected = new AtomicBoolean(false); + private final EnsembleProvider ensembleProvider; + private final int sessionTimeoutMs; + private final int connectionTimeoutMs; private final AtomicReference<TracerDriver> tracer; - private final Queue<Exception> backgroundExceptions = new ConcurrentLinkedQueue<Exception>(); - private final Queue<Watcher> parentWatchers = new ConcurrentLinkedQueue<Watcher>(); - - private static final int MAX_BACKGROUND_EXCEPTIONS = 10; - private static final boolean LOG_EVENTS = Boolean.getBoolean(DebugUtils.PROPERTY_LOG_EVENTS); + private final Queue<Exception> backgroundExceptions = new ConcurrentLinkedQueue<Exception>(); + private final Queue<Watcher> parentWatchers = new ConcurrentLinkedQueue<Watcher>(); + private volatile long connectionStartMs = 0; ConnectionState(ZookeeperFactory zookeeperFactory, EnsembleProvider ensembleProvider, int sessionTimeoutMs, int connectionTimeoutMs, Watcher parentWatcher, AtomicReference<TracerDriver> tracer, boolean canBeReadOnly) { this.ensembleProvider = ensembleProvider; + this.sessionTimeoutMs = sessionTimeoutMs; this.connectionTimeoutMs = connectionTimeoutMs; this.tracer = tracer; if ( parentWatcher != null ) @@ -73,12 +72,6 @@ class ConnectionState implements Watcher, Closeable throw new SessionFailRetryLoop.SessionFailedException(); } - if ( lost.compareAndSet(true, false) ) - { - log.info("resetting after loss"); - reset(); - } - Exception exception = backgroundExceptions.poll(); if ( exception != null ) { @@ -90,24 +83,7 @@ class ConnectionState implements Watcher, Closeable boolean localIsConnected = isConnected.get(); if ( !localIsConnected ) { - long elapsed = System.currentTimeMillis() - connectionStartMs; - if ( elapsed >= connectionTimeoutMs ) - { - if ( zooKeeper.hasNewConnectionString() ) - { - handleNewConnectionString(); - } - else - { - KeeperException.ConnectionLossException connectionLossException = new KeeperException.ConnectionLossException(); - if ( !Boolean.getBoolean(DebugUtils.PROPERTY_DONT_LOG_CONNECTION_ISSUES) ) - { - log.error(String.format("Connection timed out for connection string (%s) and timeout (%d) / elapsed (%d)", zooKeeper.getConnectionString(), connectionTimeoutMs, elapsed), connectionLossException); - } - tracer.get().addCount("connections-timed-out", 1); - throw connectionLossException; - } - } + checkTimeouts(); } return zooKeeper.getZooKeeper(); @@ -118,7 +94,7 @@ class ConnectionState implements Watcher, Closeable return isConnected.get(); } - void start() throws Exception + void start() throws Exception { log.debug("Starting"); ensembleProvider.start(); @@ -126,7 +102,7 @@ class ConnectionState implements Watcher, Closeable } @Override - public void close() throws IOException + public void close() throws IOException { log.debug("Closing"); @@ -142,27 +118,19 @@ class ConnectionState implements Watcher, Closeable finally { isConnected.set(false); - lost.set(false); } } - void addParentWatcher(Watcher watcher) + void addParentWatcher(Watcher watcher) { parentWatchers.offer(watcher); } - void removeParentWatcher(Watcher watcher) + void removeParentWatcher(Watcher watcher) { parentWatchers.remove(watcher); } - void markLost() - { - log.info("lost marked"); - - lost.set(true); - } - @Override public void process(WatchedEvent event) { @@ -188,10 +156,6 @@ class ConnectionState implements Watcher, Closeable if ( newIsConnected != wasConnected ) { isConnected.set(newIsConnected); - if ( newIsConnected ) - { - lost.set(false); - } connectionStartMs = System.currentTimeMillis(); } } @@ -201,6 +165,41 @@ class ConnectionState implements Watcher, Closeable return ensembleProvider; } + private synchronized void checkTimeouts() throws Exception + { + int minTimeout = Math.min(sessionTimeoutMs, connectionTimeoutMs); + long elapsed = System.currentTimeMillis() - connectionStartMs; + if ( elapsed >= minTimeout ) + { + if ( zooKeeper.hasNewConnectionString() ) + { + handleNewConnectionString(); + } + else + { + int maxTimeout = Math.max(sessionTimeoutMs, connectionTimeoutMs); + if ( elapsed > maxTimeout ) + { + if ( !Boolean.getBoolean(DebugUtils.PROPERTY_DONT_LOG_CONNECTION_ISSUES) ) + { + log.warn(String.format("Connection attempt unsuccessful after %d (greater than max timeout of %d). Resetting connection and trying again with a new connection.", elapsed, maxTimeout)); + } + reset(); + } + else + { + KeeperException.ConnectionLossException connectionLossException = new KeeperException.ConnectionLossException(); + if ( !Boolean.getBoolean(DebugUtils.PROPERTY_DONT_LOG_CONNECTION_ISSUES) ) + { + log.error(String.format("Connection timed out for connection string (%s) and timeout (%d) / elapsed (%d)", zooKeeper.getConnectionString(), connectionTimeoutMs, elapsed), connectionLossException); + } + tracer.get().addCount("connections-timed-out", 1); + throw connectionLossException; + } + } + } + } + private synchronized void reset() throws Exception { log.debug("reset"); @@ -213,44 +212,44 @@ class ConnectionState implements Watcher, Closeable private boolean checkState(Event.KeeperState state, boolean wasConnected) { - boolean isConnected = wasConnected; - boolean checkNewConnectionString = true; + boolean isConnected = wasConnected; + boolean checkNewConnectionString = true; switch ( state ) { - default: - case Disconnected: - { - isConnected = false; - break; - } + default: + case Disconnected: + { + isConnected = false; + break; + } - case SyncConnected: - case ConnectedReadOnly: - { - isConnected = true; - break; - } + case SyncConnected: + case ConnectedReadOnly: + { + isConnected = true; + break; + } - case AuthFailed: - { - isConnected = false; - log.error("Authentication failed"); - break; - } + case AuthFailed: + { + isConnected = false; + log.error("Authentication failed"); + break; + } - case Expired: - { - isConnected = false; - checkNewConnectionString = false; - handleExpiredSession(); - break; - } + case Expired: + { + isConnected = false; + checkNewConnectionString = false; + handleExpiredSession(); + break; + } - case SaslAuthenticated: - { - // NOP - break; - } + case SaslAuthenticated: + { + // NOP + break; + } } if ( checkNewConnectionString && zooKeeper.hasNewConnectionString() ) http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/11ae23ad/curator-client/src/main/java/org/apache/curator/CuratorZookeeperClient.java ---------------------------------------------------------------------- diff --git a/curator-client/src/main/java/org/apache/curator/CuratorZookeeperClient.java b/curator-client/src/main/java/org/apache/curator/CuratorZookeeperClient.java index be63a9b..f633fe1 100644 --- a/curator-client/src/main/java/org/apache/curator/CuratorZookeeperClient.java +++ b/curator-client/src/main/java/org/apache/curator/CuratorZookeeperClient.java @@ -184,15 +184,6 @@ public class CuratorZookeeperClient implements Closeable } /** - * Mark the connection as lost. The next time {@link #getZooKeeper()} is called, - * a new connection will be created. - */ - public void markLost() - { - state.markLost(); - } - - /** * Close the client */ public void close() http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/11ae23ad/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java index 8f5d9ff..2ed60c1 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java @@ -141,11 +141,6 @@ public class ConnectionStateManager implements Closeable return; } - if ( newState == ConnectionState.LOST ) - { - client.getZookeeperClient().markLost(); - } - ConnectionState previousState = currentState.getAndSet(newState); if ( previousState == newState ) {
