Repository: curator
Updated Branches:
  refs/heads/CURATOR-88 8f6edd706 -> ce456d2df


expiration should now be on a per-object basis


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/ce456d2d
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/ce456d2d
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/ce456d2d

Branch: refs/heads/CURATOR-88
Commit: ce456d2df24c9c64108d1c40e111b66a7e00e3ca
Parents: 8f6edd7
Author: randgalt <randg...@apache.org>
Authored: Sat Feb 15 19:39:38 2014 -0500
Committer: randgalt <randg...@apache.org>
Committed: Sat Feb 15 19:39:38 2014 -0500

----------------------------------------------------------------------
 .../curator/x/rest/CuratorRestContext.java      | 38 +-------------
 .../curator/x/rest/api/ClientResource.java      | 24 ++++-----
 .../apache/curator/x/rest/api/Constants.java    |  2 +-
 .../org/apache/curator/x/rest/api/Session.java  | 55 +++++++++++++-------
 4 files changed, 50 insertions(+), 69 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/ce456d2d/curator-x-rest/src/main/java/org/apache/curator/x/rest/CuratorRestContext.java
----------------------------------------------------------------------
diff --git 
a/curator-x-rest/src/main/java/org/apache/curator/x/rest/CuratorRestContext.java
 
b/curator-x-rest/src/main/java/org/apache/curator/x/rest/CuratorRestContext.java
index cca2e89..a07197e 100644
--- 
a/curator-x-rest/src/main/java/org/apache/curator/x/rest/CuratorRestContext.java
+++ 
b/curator-x-rest/src/main/java/org/apache/curator/x/rest/CuratorRestContext.java
@@ -20,14 +20,10 @@ package org.apache.curator.x.rest;
 
 import com.google.common.base.Preconditions;
 import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.state.ConnectionState;
-import org.apache.curator.framework.state.ConnectionStateListener;
 import org.apache.curator.utils.ThreadUtils;
 import org.apache.curator.x.rest.api.Session;
 import org.codehaus.jackson.map.ObjectMapper;
 import org.codehaus.jackson.map.ObjectWriter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import java.io.Closeable;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -35,7 +31,6 @@ import java.util.concurrent.atomic.AtomicReference;
 
 public class CuratorRestContext implements Closeable
 {
-    private final Logger log = LoggerFactory.getLogger(getClass());
     private final Session session = new Session();
     private final ObjectMapper mapper = new ObjectMapper();
     private final ObjectWriter writer = mapper.writer();
@@ -43,17 +38,6 @@ public class CuratorRestContext implements Closeable
     private final int sessionLengthMs;
     private final AtomicReference<State> state = new 
AtomicReference<State>(State.LATENT);
     private final ScheduledExecutorService executorService = 
ThreadUtils.newSingleThreadScheduledExecutor("CuratorRestContext");
-    private final ConnectionStateListener connectionStateListener = new 
ConnectionStateListener()
-    {
-        @Override
-        public void stateChanged(CuratorFramework client, ConnectionState 
newState)
-        {
-            if ( newState == ConnectionState.LOST )
-            {
-                handleLostConnection();
-            }
-        }
-    };
 
     private enum State
     {
@@ -77,7 +61,6 @@ public class CuratorRestContext implements Closeable
     public Session getSession()
     {
         Preconditions.checkState(state.get() == State.STARTED, "Not started");
-        session.updateLastUse();
         return session;
     }
 
@@ -85,35 +68,22 @@ public class CuratorRestContext implements Closeable
     {
         Preconditions.checkState(state.compareAndSet(State.LATENT, 
State.STARTED), "Already started");
 
-        
client.getConnectionStateListenable().addListener(connectionStateListener);
-
         Runnable runner = new Runnable()
         {
             @Override
             public void run()
             {
-                checkSession();
+                session.checkExpiredThings(sessionLengthMs);
             }
         };
         executorService.scheduleAtFixedRate(runner, sessionLengthMs, 
sessionLengthMs, TimeUnit.MILLISECONDS);
     }
 
-    private void checkSession()
-    {
-        long elapsedSinceLastUse = System.currentTimeMillis() - 
session.getLastUseMs();
-        if ( elapsedSinceLastUse > sessionLengthMs )
-        {
-            log.warn("Session has expired. Closing all open recipes. 
Milliseconds since last ping: " + elapsedSinceLastUse);
-            session.closeThings();
-        }
-    }
-
     @Override
     public void close()
     {
         if ( state.compareAndSet(State.STARTED, State.CLOSED) )
         {
-            
client.getConnectionStateListenable().removeListener(connectionStateListener);
             executorService.shutdownNow();
             session.close();
         }
@@ -128,10 +98,4 @@ public class CuratorRestContext implements Closeable
     {
         return writer;
     }
-
-    private void handleLostConnection()
-    {
-        log.warn("Connection lost - closing all REST sessions");
-        session.closeThings();
-    }
 }

http://git-wip-us.apache.org/repos/asf/curator/blob/ce456d2d/curator-x-rest/src/main/java/org/apache/curator/x/rest/api/ClientResource.java
----------------------------------------------------------------------
diff --git 
a/curator-x-rest/src/main/java/org/apache/curator/x/rest/api/ClientResource.java
 
b/curator-x-rest/src/main/java/org/apache/curator/x/rest/api/ClientResource.java
index 94009f0..26708cf 100644
--- 
a/curator-x-rest/src/main/java/org/apache/curator/x/rest/api/ClientResource.java
+++ 
b/curator-x-rest/src/main/java/org/apache/curator/x/rest/api/ClientResource.java
@@ -34,6 +34,7 @@ import javax.ws.rs.GET;
 import javax.ws.rs.POST;
 import javax.ws.rs.PUT;
 import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
 import javax.ws.rs.Produces;
 import javax.ws.rs.WebApplicationException;
 import javax.ws.rs.core.Context;
@@ -64,14 +65,23 @@ public class ClientResource
         return 
Response.ok(context.getWriter().writeValueAsString(node)).build();
     }
 
+    @GET
+    @Path("/touch/{id}")
+    public Response touchThing(@PathParam("id") String id)
+    {
+        if ( !context.getSession().updateThingLastUse(id) )
+        {
+            return Response.status(Response.Status.NOT_FOUND).build();
+        }
+        return Response.ok().build();
+    }
+
     @POST
     @Consumes(MediaType.APPLICATION_JSON)
     @Produces(MediaType.APPLICATION_JSON)
     @Path("/get-children")
     public Response getChildren(final GetChildrenSpec getChildrenSpec) throws 
Exception
     {
-        context.getSession();   // update last use
-
         Object builder = context.getClient().getChildren();
         if ( getChildrenSpec.isWatched() )
         {
@@ -110,8 +120,6 @@ public class ClientResource
     @Path("/delete")
     public Response delete(final DeleteSpec deleteSpec) throws Exception
     {
-        context.getSession();   // update last use
-
         Object builder = context.getClient().delete();
         if ( deleteSpec.isGuaranteed() )
         {
@@ -135,8 +143,6 @@ public class ClientResource
     @Path("/set-data")
     public Response setData(final SetDataSpec setDataSpec) throws Exception
     {
-        context.getSession();   // update last use
-
         Object builder = context.getClient().setData();
         if ( setDataSpec.isCompressed() )
         {
@@ -164,8 +170,6 @@ public class ClientResource
     @Path("/create")
     public Response create(final CreateSpec createSpec) throws Exception
     {
-        context.getSession();   // update last use
-
         Object builder = context.getClient().create();
         if ( createSpec.isCreatingParentsIfNeeded() )
         {
@@ -200,8 +204,6 @@ public class ClientResource
     @Path("/get-data")
     public Response getData(final GetDataSpec getDataSpec) throws Exception
     {
-        context.getSession();   // update last use
-
         Object builder = context.getClient().getData();
         if ( getDataSpec.isWatched() )
         {
@@ -243,8 +245,6 @@ public class ClientResource
     @Path("/exists")
     public Response exists(final ExistsSpec existsSpec) throws Exception
     {
-        context.getSession();   // update last use
-
         Object builder = context.getClient().checkExists();
         if ( existsSpec.isWatched() )
         {

http://git-wip-us.apache.org/repos/asf/curator/blob/ce456d2d/curator-x-rest/src/main/java/org/apache/curator/x/rest/api/Constants.java
----------------------------------------------------------------------
diff --git 
a/curator-x-rest/src/main/java/org/apache/curator/x/rest/api/Constants.java 
b/curator-x-rest/src/main/java/org/apache/curator/x/rest/api/Constants.java
index b12d1b1..5ee1982 100644
--- a/curator-x-rest/src/main/java/org/apache/curator/x/rest/api/Constants.java
+++ b/curator-x-rest/src/main/java/org/apache/curator/x/rest/api/Constants.java
@@ -38,7 +38,7 @@ class Constants
     static final String PATH_CACHE = "path-cache";
     static final String NODE_CACHE = "node-cache";
     static final String LEADER = "leader";
-    static final String CLOSING = "closing";
+    static final String EXPIRED = "expired";
 
     static ObjectNode makeIdNode(CuratorRestContext context, String id)
     {

http://git-wip-us.apache.org/repos/asf/curator/blob/ce456d2d/curator-x-rest/src/main/java/org/apache/curator/x/rest/api/Session.java
----------------------------------------------------------------------
diff --git 
a/curator-x-rest/src/main/java/org/apache/curator/x/rest/api/Session.java 
b/curator-x-rest/src/main/java/org/apache/curator/x/rest/api/Session.java
index 2d2cd8a..16478ce 100644
--- a/curator-x-rest/src/main/java/org/apache/curator/x/rest/api/Session.java
+++ b/curator-x-rest/src/main/java/org/apache/curator/x/rest/api/Session.java
@@ -35,13 +35,13 @@ public class Session implements Closeable
 {
     private final Logger log = LoggerFactory.getLogger(getClass());
     private final Map<String, Entry> things = Maps.newConcurrentMap();
-    private final AtomicLong lastUseMs = new 
AtomicLong(System.currentTimeMillis());
     private final BlockingQueue<StatusMessage> messages = 
Queues.newLinkedBlockingQueue();
 
     private static class Entry
     {
         final Object thing;
         final Closer closer;
+        final AtomicLong lastUseMs = new 
AtomicLong(System.currentTimeMillis());
 
         private Entry(Object thing, Closer closer)
         {
@@ -50,26 +50,9 @@ public class Session implements Closeable
         }
     }
 
-    public void updateLastUse()
-    {
-        lastUseMs.set(System.currentTimeMillis());
-    }
-
-    public long getLastUseMs()
-    {
-        return lastUseMs.get();
-    }
-
     @Override
-    public void close()
+    public synchronized void close()
     {
-        closeThings();
-    }
-
-    public void closeThings()
-    {
-        pushMessage(new StatusMessage(Constants.CLOSING, "", "", ""));
-        
         for ( Map.Entry<String, Entry> mapEntry : things.entrySet() )
         {
             Entry entry = mapEntry.getValue();
@@ -80,6 +63,29 @@ public class Session implements Closeable
                 entry.closer.close(entry.thing);    // lack of generics is 
safe because addThing() is type-safe
             }
         }
+        things.clear();
+    }
+
+    public synchronized void checkExpiredThings(long sessionLengthMs)
+    {
+        for ( Map.Entry<String, Entry> mapEntry : things.entrySet() )
+        {
+            Entry entry = mapEntry.getValue();
+            long elapsedSinceLastUse = System.currentTimeMillis() - 
entry.lastUseMs.get();
+            if ( elapsedSinceLastUse > sessionLengthMs )
+            {
+                String id = mapEntry.getKey();
+                pushMessage(new StatusMessage(Constants.EXPIRED, id, 
"expired", entry.thing.getClass().getName()));
+                log.warn(String.format("Expiring object. Elapsed time: %d, id: 
%s, Class: %s", elapsedSinceLastUse, id, entry.thing.getClass().getName()));
+
+                things.remove(id);
+                if ( entry.closer != null )
+                {
+                    //noinspection unchecked
+                    entry.closer.close(entry.thing);    // lack of generics is 
safe because addThing() is type-safe
+                }
+            }
+        }
     }
 
     void pushMessage(StatusMessage message)
@@ -94,6 +100,17 @@ public class Session implements Closeable
         return localMessages;
     }
 
+    boolean updateThingLastUse(String id)
+    {
+        Entry entry = things.get(id);
+        if ( entry != null )
+        {
+            entry.lastUseMs.set(System.currentTimeMillis());
+            return true;
+        }
+        return false;
+    }
+
     <T> String addThing(T thing, Closer<T> closer)
     {
         String id = Constants.newId();

Reply via email to