CURATOR-110 - Moved the 'wait until connection established' logic into
the ExecuteAfterConnectionEstablished utility class. Cleaned up the
blockUntilConnected() logic in the CuratorFrameworkImpl

Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/e8138ed9
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/e8138ed9
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/e8138ed9

Branch: refs/heads/master
Commit: e8138ed9768e99e98e308845626e36aa55afadb0
Parents: 63d0401
Author: Cameron McKenzie <[email protected]>
Authored: Mon Jun 16 15:59:56 2014 +1000
Committer: Cameron McKenzie <[email protected]>
Committed: Mon Jun 16 15:59:56 2014 +1000

----------------------------------------------------------------------
 .../curator/framework/CuratorFramework.java     |  9 +---
 .../framework/imps/CuratorFrameworkImpl.java    | 22 +++-----
 .../framework/state/ConnectionStateManager.java |  5 ++
 .../ExecuteAfterConnectionEstablished.java      | 53 ++++++++++++++++++++
 .../framework/recipes/leader/LeaderLatch.java   | 44 ++++++----------
 5 files changed, 81 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/e8138ed9/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
----------------------------------------------------------------------
diff --git 
a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
 
b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
index 1df3fa5..13cff30 100644
--- 
a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
+++ 
b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
@@ -213,14 +213,7 @@ public interface CuratorFramework extends Closeable
      * @param watcher the watcher
      */
     public void clearWatcherReferences(Watcher watcher);
-    
-    /**
-     * Get the current connection state. The connection state will have a 
value of 0 until
-     * the first connection related event is received.
-     * @return The current connection state, or null if it is unknown 
-     */
-    public ConnectionState getCurrentConnectionState();
-    
+        
     /**
      * Block until a connection to ZooKeeper is available or the maxWaitTime 
has been exceeded
      * @param maxWaitTime The maximum wait time. Specify a value <= 0 to wait 
indefinitely

http://git-wip-us.apache.org/repos/asf/curator/blob/e8138ed9/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
----------------------------------------------------------------------
diff --git 
a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
 
b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
index 33e260d..d1de29f 100644
--- 
a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
+++ 
b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
@@ -67,7 +67,6 @@ public class CuratorFrameworkImpl implements CuratorFramework
     private final BlockingQueue<OperationAndData<?>>                    
backgroundOperations;
     private final NamespaceImpl                                         
namespace;
     private final ConnectionStateManager                                
connectionStateManager;
-    private final AtomicReference<ConnectionState>                             
                connectionState;
     private final AtomicReference<AuthInfo>                             
authInfo = new AtomicReference<AuthInfo>();
     private final byte[]                                                
defaultData;
     private final FailedDeleteManager                                   
failedDeleteManager;
@@ -75,6 +74,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
     private final ACLProvider                                           
aclProvider;
     private final NamespaceFacadeCache                                  
namespaceFacadeCache;
     private final NamespaceWatcherMap                                   
namespaceWatcherMap = new NamespaceWatcherMap(this);
+    private final Object                                                       
                                        connectionLock = new Object();
 
     private volatile ExecutorService                                    
executorService;
     private final AtomicBoolean                                         
logAsErrorConnectionErrors = new AtomicBoolean(false);
@@ -151,7 +151,6 @@ public class CuratorFrameworkImpl implements 
CuratorFramework
         namespace = new NamespaceImpl(this, builder.getNamespace());
         threadFactory = getThreadFactory(builder);
         connectionStateManager = new ConnectionStateManager(this, 
builder.getThreadFactory());
-        connectionState = new AtomicReference<ConnectionState>(null);
         compressionProvider = builder.getCompressionProvider();
         aclProvider = builder.getAclProvider();
         state = new 
AtomicReference<CuratorFrameworkState>(CuratorFrameworkState.LATENT);
@@ -174,10 +173,9 @@ public class CuratorFrameworkImpl implements 
CuratorFramework
                        @Override
                        public void stateChanged(CuratorFramework client, 
ConnectionState newState)
                        {
-                               connectionState.set(newState);
-                               synchronized(connectionState)
+                               synchronized(connectionLock)
                                {
-                                       connectionState.notifyAll();
+                                       connectionLock.notifyAll();
                                }
                        }
                });
@@ -220,7 +218,6 @@ public class CuratorFrameworkImpl implements 
CuratorFramework
         threadFactory = parent.threadFactory;
         backgroundOperations = parent.backgroundOperations;
         connectionStateManager = parent.connectionStateManager;
-        connectionState = parent.connectionState;
         defaultData = parent.defaultData;
         failedDeleteManager = parent.failedDeleteManager;
         compressionProvider = parent.compressionProvider;
@@ -890,16 +887,11 @@ public class CuratorFrameworkImpl implements 
CuratorFramework
         );
     }
     
-    public ConnectionState getCurrentConnectionState()
-    {
-       return connectionState.get();
-    }
-    
     @Override
     public boolean blockUntilConnected(int maxWaitTime, TimeUnit units) throws 
InterruptedException
     {
        //Check if we're already connected
-       ConnectionState currentConnectionState = connectionState.get();
+       ConnectionState currentConnectionState = 
connectionStateManager.getCurrentConnectionState();
        if(currentConnectionState != null && 
currentConnectionState.isConnected())
        {
                return true;
@@ -910,9 +902,9 @@ public class CuratorFrameworkImpl implements 
CuratorFramework
        
        for(;;)
        {
-               synchronized(connectionState)
+               synchronized(connectionLock)
                {
-                       currentConnectionState = connectionState.get();
+                       currentConnectionState = 
connectionStateManager.getCurrentConnectionState();
                if(currentConnectionState != null && 
currentConnectionState.isConnected())
                        {
                                return true;
@@ -930,7 +922,7 @@ public class CuratorFrameworkImpl implements 
CuratorFramework
                                }                                       
                        }
                
-                       connectionState.wait(waitTime);
+                       connectionLock.wait(waitTime);
                }
        }
     }

http://git-wip-us.apache.org/repos/asf/curator/blob/e8138ed9/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
----------------------------------------------------------------------
diff --git 
a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
 
b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
index 42804b8..ba29994 100644
--- 
a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
+++ 
b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
@@ -188,6 +188,11 @@ public class ConnectionStateManager implements Closeable
 
         return true;
     }
+    
+    public synchronized ConnectionState getCurrentConnectionState()
+    {
+       return currentConnectionState;
+    }
 
     private void postState(ConnectionState state)
     {

http://git-wip-us.apache.org/repos/asf/curator/blob/e8138ed9/curator-recipes/src/main/java/org/apache/curator/framework/recipes/ExecuteAfterConnectionEstablished.java
----------------------------------------------------------------------
diff --git 
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/ExecuteAfterConnectionEstablished.java
 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/ExecuteAfterConnectionEstablished.java
new file mode 100644
index 0000000..d213d37
--- /dev/null
+++ 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/ExecuteAfterConnectionEstablished.java
@@ -0,0 +1,53 @@
+package org.apache.curator.framework.recipes;
+
+import java.util.concurrent.ExecutorService;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.utils.ThreadUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utility class to allow execution of logic once a ZooKeeper connection 
becomes available.
+ *
+ */
+public class ExecuteAfterConnectionEstablished
+{
+       private final static Logger log = 
LoggerFactory.getLogger(ExecuteAfterConnectionEstablished.class);
+       
+       /**
+        * Spawns a new new background thread that will block until a 
connection is available and
+        * then execute the 'runAfterConnection' logic
+        * @param name The name of the spawned thread
+        * @param client The curator client
+        * @param runAfterConnection The logic to run
+        */
+       public static void executeAfterConnectionEstablishedInBackground(String 
name,
+                                                                               
                                                     final CuratorFramework 
client,
+                                                                               
                                                     final Runnable 
runAfterConnection)
+       {
+        //Block until connected
+        final ExecutorService executor = 
ThreadUtils.newSingleThreadExecutor(name);
+        executor.submit(new Runnable()
+        {
+                       
+                       @Override
+                       public void run()
+                       {
+                               try
+                               {
+                               client.blockUntilConnected();                   
        
+                               runAfterConnection.run();
+                               }
+                               catch(Exception e)
+                               {
+                                       log.error("An error occurred blocking 
until a connection is available", e);
+                               }
+                               finally
+                               {
+                                       executor.shutdown();
+                               }
+                       }
+        });
+       }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/e8138ed9/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
----------------------------------------------------------------------
diff --git 
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
index 21e8cca..13a9f21 100644
--- 
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
+++ 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
@@ -27,6 +27,7 @@ import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.api.BackgroundCallback;
 import org.apache.curator.framework.api.CuratorEvent;
 import org.apache.curator.framework.listen.ListenerContainer;
+import org.apache.curator.framework.recipes.ExecuteAfterConnectionEstablished;
 import org.apache.curator.framework.recipes.locks.LockInternals;
 import org.apache.curator.framework.recipes.locks.LockInternalsSorter;
 import org.apache.curator.framework.recipes.locks.StandardLockInternalsDriver;
@@ -148,14 +149,6 @@ public class LeaderLatch implements Closeable
         this.id = Preconditions.checkNotNull(id, "id cannot be null");
         this.closeMode = Preconditions.checkNotNull(closeMode, "closeMode 
cannot be null");
     }
-    
-    private CountDownLatch startLatch;
-    
-    public LeaderLatch(CuratorFramework client, String latchPath,
-               CountDownLatch startLatch) {
-       this(client, latchPath);
-        this.startLatch = startLatch;
-    }
 
     /**
      * Add this instance to the leadership election and attempt to acquire 
leadership.
@@ -166,30 +159,23 @@ public class LeaderLatch implements Closeable
     {
         Preconditions.checkState(state.compareAndSet(State.LATENT, 
State.STARTED), "Cannot be started more than once");
 
-        //Block until connected
-        final ExecutorService executor = 
ThreadUtils.newSingleThreadExecutor("");
-        executor.submit(new Runnable()
-        {
-                       
-                       @Override
+        
ExecuteAfterConnectionEstablished.executeAfterConnectionEstablishedInBackground(LeaderLatch.class.getName(),
+                       client, new Runnable()
+        {                                      
+               @Override
                        public void run()
                        {
-                       try
-                       {
-                               client.blockUntilConnected();
-                               
-                               
client.getConnectionStateListenable().addListener(listener);                    
        
-                               reset();
-                       }
-                       catch(Exception ex)
-                       {
-                               log.error("An error occurred checking resetting 
leadership.", ex);
-                       } finally {
-                               //Shutdown the executor
-                               executor.shutdown();
-                       }
+                       try
+                       {
+                               
client.getConnectionStateListenable().addListener(listener);                    
        
+                               reset();
+                       }
+                       catch(Exception ex)
+                {
+                       log.error("An error occurred checking resetting 
leadership.", ex);
+                }
                        }
-        });
+               });     
     }
 
     /**

Reply via email to