Author: trustin
Date: Fri Sep 28 03:52:48 2007
New Revision: 580295

URL: http://svn.apache.org/viewvc?rev=580295&view=rev
Log:
Resolved issue: DIRMINA-269 (Cancellation operation for ConnectFuture)
* Added ConnectFuture.cancel()
* Added cancellation feature to NioSocketConnector
* Other connector implementations do not really need a cancellation operation, 
so no change has been made to them.
* Added AbstractIoConnector.finishSessionInitialization() to remove code 
duplication


Modified:
    
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoConnector.java
    mina/trunk/core/src/main/java/org/apache/mina/common/ConnectFuture.java
    
mina/trunk/core/src/main/java/org/apache/mina/common/DefaultConnectFuture.java
    mina/trunk/core/src/main/java/org/apache/mina/common/DummySession.java
    
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioDatagramConnector.java
    
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioSocketConnector.java
    
mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeConnector.java

Modified: 
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoConnector.java
URL: 
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoConnector.java?rev=580295&r1=580294&r2=580295&view=diff
==============================================================================
--- 
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoConnector.java 
(original)
+++ 
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoConnector.java 
Fri Sep 28 03:52:48 2007
@@ -91,4 +91,27 @@
      */
     protected abstract ConnectFuture doConnect(SocketAddress remoteAddress,
             SocketAddress localAddress);
+    
+    /**
+     * Adds required internal attributes and {@link IoFutureListener}s
+     * related with event notifications to the specified {@code session}
+     * and {@code future}.
+     */
+    protected static void finishSessionInitialization(
+            final IoSession session, ConnectFuture future) {
+        // DefaultIoFilterChain will notify the connect future.
+        session.setAttribute(DefaultIoFilterChain.CONNECT_FUTURE, future);
+
+        // In case that ConnectFuture.cancel() is invoked before
+        // setSession() is invoked, add a listener that closes the
+        // connection immediately on cancellation.
+        future.addListener(new IoFutureListener() {
+            public void operationComplete(IoFuture future) {
+                ConnectFuture f = (ConnectFuture) future;
+                if (f.isCanceled()) {
+                    session.close();
+                }
+            }
+        });
+    }
 }

Modified: 
mina/trunk/core/src/main/java/org/apache/mina/common/ConnectFuture.java
URL: 
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/ConnectFuture.java?rev=580295&r1=580294&r2=580295&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/ConnectFuture.java 
(original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/ConnectFuture.java Fri 
Sep 28 03:52:48 2007
@@ -55,6 +55,12 @@
      * Returns <tt>true</tt> if the connect operation is finished successfully.
      */
     boolean isConnected();
+    
+    /**
+     * Returns {@code true} if the connect operation has been canceled by
+     * {@link #cancel()} method.
+     */
+    boolean isCanceled();
 
     /**
      * Sets the newly connected session and notifies all threads waiting for
@@ -69,6 +75,12 @@
      * internally.  Please do not call this method directly.
      */
     void setException(Throwable exception);
+    
+    /**
+     * Cancels the connection attempt and notifies all threads waiting for
+     * this future.
+     */
+    void cancel();
 
     ConnectFuture await() throws InterruptedException;
 

Modified: 
mina/trunk/core/src/main/java/org/apache/mina/common/DefaultConnectFuture.java
URL: 
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/DefaultConnectFuture.java?rev=580295&r1=580294&r2=580295&view=diff
==============================================================================
--- 
mina/trunk/core/src/main/java/org/apache/mina/common/DefaultConnectFuture.java 
(original)
+++ 
mina/trunk/core/src/main/java/org/apache/mina/common/DefaultConnectFuture.java 
Fri Sep 28 03:52:48 2007
@@ -28,6 +28,9 @@
  */
 public class DefaultConnectFuture extends DefaultIoFuture implements
         ConnectFuture {
+    
+    private static final Object CANCELED = new Object();
+    
     /**
     * Returns a new {@link ConnectFuture} which is already marked as 'failed to connect'.
      */
@@ -52,8 +55,10 @@
         } else if (v instanceof Throwable) {
             throw (RuntimeIoException) new RuntimeIoException(
                     "Failed to get the session.").initCause((Throwable) v);
-        } else {
+        } else if (v instanceof IoSession) {
             return (IoSession) v;
+        } else {
+            return null;
         }
     }
 
@@ -69,6 +74,10 @@
     public boolean isConnected() {
         return getValue() instanceof IoSession;
     }
+    
+    public boolean isCanceled() {
+        return getValue() == CANCELED;
+    }
 
     public void setSession(IoSession session) {
         setValue(session);
@@ -76,6 +85,10 @@
 
     public void setException(Throwable exception) {
         setValue(exception);
+    }
+    
+    public void cancel() {
+        setValue(CANCELED);
     }
 
     @Override

Modified: mina/trunk/core/src/main/java/org/apache/mina/common/DummySession.java
URL: 
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/DummySession.java?rev=580295&r1=580294&r2=580295&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/DummySession.java 
(original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/DummySession.java Fri 
Sep 28 03:52:48 2007
@@ -66,7 +66,7 @@
      * Creates a new instance.
      */
     public DummySession() {
-        // Initialize dumy service.
+        // Initialize dummy service.
         IoAcceptor acceptor = new AbstractIoAcceptor(
                 new AbstractIoSessionConfig() {
                     @Override

Modified: 
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioDatagramConnector.java
URL: 
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioDatagramConnector.java?rev=580295&r1=580294&r2=580295&view=diff
==============================================================================
--- 
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioDatagramConnector.java
 (original)
+++ 
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioDatagramConnector.java
 Fri Sep 28 03:52:48 2007
@@ -27,7 +27,6 @@
 import org.apache.mina.common.AbstractIoConnector;
 import org.apache.mina.common.ConnectFuture;
 import org.apache.mina.common.DefaultConnectFuture;
-import org.apache.mina.common.DefaultIoFilterChain;
 import org.apache.mina.common.ExceptionMonitor;
 import org.apache.mina.common.IoConnector;
 import org.apache.mina.common.IoSession;
@@ -112,7 +111,6 @@
                                       SocketAddress localAddress) {
         DatagramChannel ch = null;
         boolean initialized = false;
-        IoSession session = null;
         try {
             ch = DatagramChannel.open();
             ch.socket().setReuseAddress(getSessionConfig().isReuseAddress());
@@ -125,11 +123,9 @@
             ch.connect(remoteAddress);
 
             NioProcessor processor = nextProcessor();
-            session = new NioDatagramSession(this, ch, processor);
+            final IoSession session = new NioDatagramSession(this, ch, 
processor);
             ConnectFuture future = new DefaultConnectFuture();
-            // DefaultIoFilterChain will notify the connect future.
-            session.setAttribute(DefaultIoFilterChain.CONNECT_FUTURE, future);
-
+            finishSessionInitialization(session, future);
             processor.add(session);
             initialized = true;
             return future;

Modified: 
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioSocketConnector.java
URL: 
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioSocketConnector.java?rev=580295&r1=580294&r2=580295&view=diff
==============================================================================
--- 
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioSocketConnector.java
 (original)
+++ 
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioSocketConnector.java
 Fri Sep 28 03:52:48 2007
@@ -33,7 +33,6 @@
 import org.apache.mina.common.AbstractIoConnector;
 import org.apache.mina.common.ConnectFuture;
 import org.apache.mina.common.DefaultConnectFuture;
-import org.apache.mina.common.DefaultIoFilterChain;
 import org.apache.mina.common.ExceptionMonitor;
 import org.apache.mina.common.IoConnector;
 import org.apache.mina.common.RuntimeIoException;
@@ -58,25 +57,17 @@
     private static volatile int nextId = 0;
 
     private final Object lock = new Object();
-
     private final int id = nextId++;
-
     private final String threadName = "SocketConnector-" + id;
-
     private final Queue<ConnectionRequest> connectQueue = new 
ConcurrentLinkedQueue<ConnectionRequest>();
-
+    private final Queue<ConnectionRequest> cancelQueue = new 
ConcurrentLinkedQueue<ConnectionRequest>();
     private final NioProcessor[] ioProcessors;
-
     private final int processorCount;
-
     private final Executor executor;
-
     private final Selector selector;
 
     private Worker worker;
-
     private int processorDistributor = 0;
-
     private int workerTimeout = 60; // 1 min.
 
     /**
@@ -200,9 +191,8 @@
         }
 
         ConnectionRequest request = new ConnectionRequest(ch);
-        startupWorker();
-
         connectQueue.add(request);
+        startupWorker();
         selector.wakeup();
 
         return request;
@@ -233,6 +223,29 @@
         }
     }
 
+    private void cancelKeys() {
+        for (; ;) {
+            ConnectionRequest req = cancelQueue.poll();
+            if (req == null) {
+                break;
+            }
+
+            SocketChannel ch = req.channel;
+            SelectionKey key = ch.keyFor(selector);
+            if (key == null) {
+                continue;
+            }
+            
+            key.cancel();
+            
+            try {
+                ch.close();
+            } catch (IOException e) {
+                ExceptionMonitor.getInstance().exceptionCaught(e);
+            }
+        }
+    }
+
     private void processSessions(Set<SelectionKey> keys) {
         for (SelectionKey key : keys) {
             if (!key.isConnectable()) {
@@ -288,14 +301,11 @@
     }
 
     private void newSession(SocketChannel ch, ConnectFuture connectFuture) {
-        NioSocketSession session = new NioSocketSession(this,
-                nextProcessor(), ch);
-
-        // Set the ConnectFuture of the specified session, which will be
-        // removed and notified by AbstractIoFilterChain eventually.
-        session.setAttribute(DefaultIoFilterChain.CONNECT_FUTURE,
-                connectFuture);
-
+        NioSocketSession session = new NioSocketSession(
+                this, nextProcessor(), ch);
+        
+        finishSessionInitialization(session, connectFuture);
+        
         // Forward the remaining process to the SocketIoProcessor.
         session.getProcessor().add(session);
     }
@@ -325,6 +335,8 @@
                     }
 
                     processTimedOutSessions(selector.keys());
+                    
+                    cancelKeys();
 
                     if (selector.keys().isEmpty()) {
                         if (System.currentTimeMillis() - lastActive > 
workerTimeout * 1000L) {
@@ -354,13 +366,20 @@
 
     private class ConnectionRequest extends DefaultConnectFuture {
         private final SocketChannel channel;
-
         private final long deadline;
 
         private ConnectionRequest(SocketChannel channel) {
             this.channel = channel;
             this.deadline = System.currentTimeMillis()
                     + getConnectTimeoutMillis();
+        }
+
+        @Override
+        public void cancel() {
+            super.cancel();
+            cancelQueue.add(this);
+            startupWorker();
+            selector.wakeup();
         }
     }
 }

Modified: 
mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeConnector.java
URL: 
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeConnector.java?rev=580295&r1=580294&r2=580295&view=diff
==============================================================================
--- 
mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeConnector.java
 (original)
+++ 
mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeConnector.java
 Fri Sep 28 03:52:48 2007
@@ -25,7 +25,6 @@
 import java.util.Set;
 
 import org.apache.mina.common.AbstractIoConnector;
-import org.apache.mina.common.DefaultIoFilterChain;
 import org.apache.mina.common.ConnectFuture;
 import org.apache.mina.common.DefaultConnectFuture;
 import org.apache.mina.common.ExceptionMonitor;
@@ -82,6 +81,8 @@
 
         VmPipeSessionImpl localSession = new VmPipeSessionImpl(this,
                 getListeners(), actualLocalAddress, getHandler(), entry);
+        
+        finishSessionInitialization(localSession, future);
 
         // and reclaim the local address when the connection is closed.
         localSession.getCloseFuture().addListener(LOCAL_ADDRESS_RECLAIMER);
@@ -92,7 +93,6 @@
             this.getFilterChainBuilder().buildFilterChain(filterChain);
 
             // The following sentences don't throw any exceptions.
-            localSession.setAttribute(DefaultIoFilterChain.CONNECT_FUTURE, 
future);
             getListeners().fireSessionCreated(localSession);
             IdleStatusChecker.getInstance().addSession(localSession);
         } catch (Throwable t) {


Reply via email to