Author: sjlee
Date: Fri Jun 26 20:59:15 2009
New Revision: 788870

URL: http://svn.apache.org/viewvc?rev=788870&view=rev
Log:
ASYNCWEB-33

eliminated several timing related issues:
- cancel the timeout task without interrupting it (the interrupt serves 
little purpose)
- made the timeoutHandle variable in HttpRequestMessage an atomic reference to 
ensure correct visibility
- set the timeout handle to null at the beginning of the timeout task to 
minimize the window during which other events may get a hold of the timeout 
handle
- marked the session as close-pending any time we initiate session closing
- when we check out a session from the cache, check its pending-close status 
and do not use the session if a close is pending
- in cancelTask(), use the removeTimeoutHandle() method to retrieve and remove 
the handle atomically
- moved up the Future.set() and Future.setException() operations to prior to 
the monitoring listener notification
- ensure only one callback method fires for a given request by using a flag
- swallow any Throwable that may result from invoking the callback so the flow 
continues even if callbacks throw exceptions

Modified:
    
mina/asyncweb/branches/1.0-mina1/client/src/main/java/org/apache/asyncweb/client/codec/HttpIoHandler.java
    
mina/asyncweb/branches/1.0-mina1/client/src/main/java/org/apache/asyncweb/client/codec/HttpRequestMessage.java
    
mina/asyncweb/branches/1.0-mina1/client/src/main/java/org/apache/asyncweb/client/codec/ResponseFuture.java
    
mina/asyncweb/branches/1.0-mina1/client/src/main/java/org/apache/asyncweb/client/codec/SessionCache.java

Modified: 
mina/asyncweb/branches/1.0-mina1/client/src/main/java/org/apache/asyncweb/client/codec/HttpIoHandler.java
URL: 
http://svn.apache.org/viewvc/mina/asyncweb/branches/1.0-mina1/client/src/main/java/org/apache/asyncweb/client/codec/HttpIoHandler.java?rev=788870&r1=788869&r2=788870&view=diff
==============================================================================
--- 
mina/asyncweb/branches/1.0-mina1/client/src/main/java/org/apache/asyncweb/client/codec/HttpIoHandler.java
 (original)
+++ 
mina/asyncweb/branches/1.0-mina1/client/src/main/java/org/apache/asyncweb/client/codec/HttpIoHandler.java
 Fri Jun 26 20:59:15 2009
@@ -64,6 +64,11 @@
     public static final String REQUEST_OUTSTANDING = "REQUEST_OUTSTANDING";
     
     /**
+     * Indicates whether the request is getting closed due to problems.
+     */
+    public static final String CLOSE_PENDING = "CLOSE_PENDING";
+    
+    /**
      * The Constant for the proxy connect status.
      */
     public static final String PROXY_CONNECT_IN_PROGRESS = 
"PROXY_CONNECT_IN_PROGRESS";
@@ -136,7 +141,6 @@
      * @param object    the {@link HttpResponseMessage} object
      * @see 
org.apache.mina.common.IoHandlerAdapter#messageReceived(org.apache.mina.common.IoSession,java.lang.Object)
      */
-    @SuppressWarnings({"UnusedDeclaration"})
     public void messageReceived(IoSession ioSession, Object object) throws 
Exception {
         // clear the request outstanding flag on the session
         ioSession.removeAttribute(REQUEST_OUTSTANDING);
@@ -220,11 +224,12 @@
 
         cancelTasks(request);
 
-        // notify any interesting parties that this is starting 
-        client.notifyMonitoringListeners(MonitoringEvent.REQUEST_COMPLETED, 
request); 
         // complete the future which will also fire the callback
         ResponseFuture result = request.getResponseFuture();
         result.set(response);
+        
+        // notify any interesting parties that this is starting 
+        client.notifyMonitoringListeners(MonitoringEvent.REQUEST_COMPLETED, 
request); 
 
         // if we've been provided with a cache, put this session into 
         // the cache. 
@@ -243,6 +248,9 @@
      * @see 
org.apache.mina.common.IoHandlerAdapter#exceptionCaught(org.apache.mina.common.IoSession,java.lang.Throwable)
      */
     public void exceptionCaught(IoSession ioSession, Throwable throwable) 
throws Exception {
+        // mark the session as closing so it won't be used
+        ioSession.setAttribute(CLOSE_PENDING);
+       
         // clear the request outstanding flag on the session if set
         ioSession.removeAttribute(REQUEST_OUTSTANDING);
         
@@ -252,13 +260,13 @@
         HttpRequestMessage request = (HttpRequestMessage) 
ioSession.getAttribute(CURRENT_REQUEST);
         cancelTasks(request);
         
-        AsyncHttpClient client = (AsyncHttpClient) ioSession.getAttachment();
-
-        // notify any interesting parties that this is starting 
-        client.notifyMonitoringListeners(MonitoringEvent.REQUEST_FAILED, 
request); 
         // complete the future which will also fire the callback
         ResponseFuture result = request.getResponseFuture();
         result.setException(throwable);
+        
+        AsyncHttpClient client = (AsyncHttpClient) ioSession.getAttachment();
+        // notify any interesting parties that this is starting 
+        client.notifyMonitoringListeners(MonitoringEvent.REQUEST_FAILED, 
request); 
 
         //Exception is bad, so just close it up
         ioSession.close();
@@ -286,9 +294,6 @@
         }
         HttpRequestMessage request = (HttpRequestMessage) 
ioSession.getAttribute(CURRENT_REQUEST);
         cancelTasks(request);
-        AsyncHttpClient client = (AsyncHttpClient) ioSession.getAttachment();
-        // notify any interesting parties that this is starting 
-        
client.notifyMonitoringListeners(MonitoringEvent.CONNECTION_CLOSED_BY_SERVER, 
request); 
         
         // if the session is closing while the request is outstanding, it means
         // the connection is closing prematurely; we need to cause an exception
@@ -301,6 +306,10 @@
                 callback.onClosed();
             }
         }
+        
+        AsyncHttpClient client = (AsyncHttpClient) ioSession.getAttachment();
+        // notify any interesting parties that this is starting 
+        
client.notifyMonitoringListeners(MonitoringEvent.CONNECTION_CLOSED_BY_SERVER, 
request); 
     }
 
     /**
@@ -320,7 +329,7 @@
         //Start the timeout timer now if a timeout is needed and there is not 
one already in effect for this request
         if (msg.getTimeOut() > 0 && msg.getTimeoutHandle() == null) {
             TimeoutTask task = new TimeoutTask(ioSession);
-            ScheduledFuture handle = scheduler.schedule(task, 
msg.getTimeOut(), TimeUnit.MILLISECONDS);
+            ScheduledFuture<?> handle = scheduler.schedule(task, 
msg.getTimeOut(), TimeUnit.MILLISECONDS);
             msg.setTimeoutHandle(handle);
         }
     }
@@ -331,15 +340,10 @@
      * @param request the {@link HttpRequestMessage} request
      */
     private void cancelTasks(HttpRequestMessage request) {
-        ScheduledFuture handle = request.getTimeoutHandle();
+        ScheduledFuture<?> handle = request.removeTimeoutHandle();
         if (handle != null) {
-            boolean canceled = handle.cancel(true);
-            //See if it canceled
-            if (!canceled) {
-                //Couldn't cancel it and it ran, so too late :-(
-                return;
-            }
-            request.setTimeoutHandle(null);
+            // cancel but don't interrupt
+            handle.cancel(false);
         }
     }
 
@@ -351,7 +355,7 @@
         /**
          * The session object.
          */
-        private IoSession sess;
+        private final IoSession sess;
 
         /**
          * Instantiates a new timeout task.
@@ -368,20 +372,24 @@
          * @see java.lang.Runnable#run()
          */
         public void run() {
+            // first, indicate that the request has timed out
+            sess.setAttribute(CLOSE_PENDING);
+               
                // clear the request outstanding flag on the session
                sess.removeAttribute(REQUEST_OUTSTANDING);
                
             // complete the future which will also fire the callback
             HttpRequestMessage request = 
(HttpRequestMessage)sess.getAttribute(CURRENT_REQUEST);
+            // make sure to remove the timeout handle
+            request.setTimeoutHandle(null);
+            
+            ResponseFuture result = request.getResponseFuture();
+            result.setException(new TimeoutException());
             
             AsyncHttpClient client = (AsyncHttpClient) sess.getAttachment();
-
             // notify any interesting parties that this is starting 
             client.notifyMonitoringListeners(MonitoringEvent.REQUEST_TIMEOUT, 
request); 
             
-            ResponseFuture result = request.getResponseFuture();
-            result.setException(new TimeoutException());
-            
             //Close the session, its no good since the server is timing out
             sess.close();
         }

Modified: 
mina/asyncweb/branches/1.0-mina1/client/src/main/java/org/apache/asyncweb/client/codec/HttpRequestMessage.java
URL: 
http://svn.apache.org/viewvc/mina/asyncweb/branches/1.0-mina1/client/src/main/java/org/apache/asyncweb/client/codec/HttpRequestMessage.java?rev=788870&r1=788869&r2=788870&view=diff
==============================================================================
--- 
mina/asyncweb/branches/1.0-mina1/client/src/main/java/org/apache/asyncweb/client/codec/HttpRequestMessage.java
 (original)
+++ 
mina/asyncweb/branches/1.0-mina1/client/src/main/java/org/apache/asyncweb/client/codec/HttpRequestMessage.java
 Fri Jun 26 20:59:15 2009
@@ -27,6 +27,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.atomic.AtomicReference;
 
 import javax.net.ssl.SSLContext;
 
@@ -116,7 +117,8 @@
     /**
      * The timeout handle.
      */
-    private ScheduledFuture timeoutHandle;
+    private final AtomicReference<ScheduledFuture<?>> timeoutHandle =
+            new AtomicReference<ScheduledFuture<?>>();
 
     /**
      * The response future.
@@ -202,8 +204,8 @@
      *
      * @return the timeout <code>ScheduledFuture</code> handle
      */
-    protected ScheduledFuture getTimeoutHandle() {
-        return timeoutHandle;
+    protected ScheduledFuture<?> getTimeoutHandle() {
+        return timeoutHandle.get();
     }
 
     /**
@@ -211,8 +213,17 @@
      *
      * @param timeoutHandle the new <code>ScheduledFuture</code> timeout handle
      */
-    protected void setTimeoutHandle(ScheduledFuture timeoutHandle) {
-        this.timeoutHandle = timeoutHandle;
+    protected void setTimeoutHandle(ScheduledFuture<?> timeoutHandle) {
+        this.timeoutHandle.set(timeoutHandle);
+    }
+    
+    /**
+     * Removes the timeout handle and returns it atomically.
+     * 
+     * @return the timeout <code>ScheduledFuture</code> handle
+     */
+    protected ScheduledFuture<?> removeTimeoutHandle() {
+        return timeoutHandle.getAndSet(null);
     }
 
     /**

Modified: 
mina/asyncweb/branches/1.0-mina1/client/src/main/java/org/apache/asyncweb/client/codec/ResponseFuture.java
URL: 
http://svn.apache.org/viewvc/mina/asyncweb/branches/1.0-mina1/client/src/main/java/org/apache/asyncweb/client/codec/ResponseFuture.java?rev=788870&r1=788869&r2=788870&view=diff
==============================================================================
--- 
mina/asyncweb/branches/1.0-mina1/client/src/main/java/org/apache/asyncweb/client/codec/ResponseFuture.java
 (original)
+++ 
mina/asyncweb/branches/1.0-mina1/client/src/main/java/org/apache/asyncweb/client/codec/ResponseFuture.java
 Fri Jun 26 20:59:15 2009
@@ -24,6 +24,7 @@
 import java.util.concurrent.Future;
 import java.util.concurrent.FutureTask;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.asyncweb.client.AsyncHttpClientCallback;
 
@@ -51,6 +52,12 @@
     private final AsyncHttpClientCallback callback;
     
     /**
+     * Indicates whether set/setException was done on the future.  It is used
+     * to prevent multiple callback events from going out.
+     */
+    private final AtomicBoolean setDone = new AtomicBoolean();
+    
+    /**
      * Constructor.  Optionally one can pass in the completion queue and/or the
      * callback object.
      * 
@@ -95,9 +102,14 @@
             // fire the callback before completing the future to 
             // ensure everything gets handled before the future gets 
             // completed. 
-            if (callback != null) {
-                callback.onResponse(v);
+            // run the callback only if the future is not already complete
+            if (setDone.compareAndSet(false, true)) {
+               if (callback != null) {
+                   callback.onResponse(v);
+                }
             }
+        } catch (Throwable th) {
+            // we need to tolerate exceptions coming from the callback
         } finally {
             super.set(v);
         }
@@ -116,13 +128,18 @@
             // fire the callback before completing the future to 
             // ensure everything gets handled before the future gets 
             // completed. 
-            if (callback != null) {
-                if (t instanceof TimeoutException) {
-                    callback.onTimeout();
-                } else {
-                    callback.onException(t);
+            // run the callback only if the future is not already complete
+            if (setDone.compareAndSet(false, true)) {
+                if (callback != null) {
+                    if (t instanceof TimeoutException) {
+                        callback.onTimeout();
+                    } else {
+                        callback.onException(t);
+                    }
                 }
             }
+        } catch (Throwable th) {
+            // we need to tolerate exceptions coming from the callback
         } finally {
             super.setException(t);
         }

Modified: 
mina/asyncweb/branches/1.0-mina1/client/src/main/java/org/apache/asyncweb/client/codec/SessionCache.java
URL: 
http://svn.apache.org/viewvc/mina/asyncweb/branches/1.0-mina1/client/src/main/java/org/apache/asyncweb/client/codec/SessionCache.java?rev=788870&r1=788869&r2=788870&view=diff
==============================================================================
--- 
mina/asyncweb/branches/1.0-mina1/client/src/main/java/org/apache/asyncweb/client/codec/SessionCache.java
 (original)
+++ 
mina/asyncweb/branches/1.0-mina1/client/src/main/java/org/apache/asyncweb/client/codec/SessionCache.java
 Fri Jun 26 20:59:15 2009
@@ -60,7 +60,9 @@
         IoSession cached = null;
         while ((cached = queue.poll()) != null) {
             // see if the session is usable
-            if (cached.isConnected() && !cached.isClosing()) {
+               // ensure the connection is not closing nor is there a pending 
close
+            if (cached.isConnected() && 
+                    !cached.isClosing()  && 
cached.getAttribute(HttpIoHandler.CLOSE_PENDING) == null) {
                 return cached;
             }
         }


Reply via email to