Author: trustin
Date: Fri Sep 28 03:52:48 2007
New Revision: 580295
URL: http://svn.apache.org/viewvc?rev=580295&view=rev
Log:
Resolved issue: DIRMINA-269 (Cancellation operation for ConnectFuture)
* Added ConnectFuture.cancel()
* Added cancellation feature to NioSocketConnector
* Other connector implementations do not effectively need a cancellation
operation, so no cancellation support has been added to them.
* Added AbstractIoConnector.finishSessionInitialization() to remove code
duplication
Modified:
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoConnector.java
mina/trunk/core/src/main/java/org/apache/mina/common/ConnectFuture.java
mina/trunk/core/src/main/java/org/apache/mina/common/DefaultConnectFuture.java
mina/trunk/core/src/main/java/org/apache/mina/common/DummySession.java
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioDatagramConnector.java
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioSocketConnector.java
mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeConnector.java
Modified:
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoConnector.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoConnector.java?rev=580295&r1=580294&r2=580295&view=diff
==============================================================================
---
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoConnector.java
(original)
+++
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoConnector.java
Fri Sep 28 03:52:48 2007
@@ -91,4 +91,27 @@
*/
protected abstract ConnectFuture doConnect(SocketAddress remoteAddress,
SocketAddress localAddress);
+
+ /**
+ * Adds required internal attributes and {@link IoFutureListener}s
+ * related with event notifications to the specified {@code session}
+ * and {@code future}.
+ */
+ protected static void finishSessionInitialization(
+ final IoSession session, ConnectFuture future) {
+ // DefaultIoFilterChain will notify the connect future.
+ session.setAttribute(DefaultIoFilterChain.CONNECT_FUTURE, future);
+
+ // In case that ConnectFuture.cancel() is invoked before
+ // setSession() is invoked, add a listener that closes the
+ // connection immediately on cancellation.
+ future.addListener(new IoFutureListener() {
+ public void operationComplete(IoFuture future) {
+ ConnectFuture f = (ConnectFuture) future;
+ if (f.isCanceled()) {
+ session.close();
+ }
+ }
+ });
+ }
}
Modified:
mina/trunk/core/src/main/java/org/apache/mina/common/ConnectFuture.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/ConnectFuture.java?rev=580295&r1=580294&r2=580295&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/ConnectFuture.java
(original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/ConnectFuture.java Fri
Sep 28 03:52:48 2007
@@ -55,6 +55,12 @@
* Returns <tt>true</tt> if the connect operation is finished successfully.
*/
boolean isConnected();
+
+ /**
+ * Returns {@code true} if the connect operation has been canceled by
+ * {@link #cancel()} method.
+ */
+ boolean isCanceled();
/**
* Sets the newly connected session and notifies all threads waiting for
@@ -69,6 +75,12 @@
* internally. Please do not call this method directly.
*/
void setException(Throwable exception);
+
+ /**
+ * Cancels the connection attempt and notifies all threads waiting for
+ * this future.
+ */
+ void cancel();
ConnectFuture await() throws InterruptedException;
Modified:
mina/trunk/core/src/main/java/org/apache/mina/common/DefaultConnectFuture.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/DefaultConnectFuture.java?rev=580295&r1=580294&r2=580295&view=diff
==============================================================================
---
mina/trunk/core/src/main/java/org/apache/mina/common/DefaultConnectFuture.java
(original)
+++
mina/trunk/core/src/main/java/org/apache/mina/common/DefaultConnectFuture.java
Fri Sep 28 03:52:48 2007
@@ -28,6 +28,9 @@
*/
public class DefaultConnectFuture extends DefaultIoFuture implements
ConnectFuture {
+
+ private static final Object CANCELED = new Object();
+
/**
* Returns a new {@link ConnectFuture} which is already marked as 'failed to connect'.
*/
@@ -52,8 +55,10 @@
} else if (v instanceof Throwable) {
throw (RuntimeIoException) new RuntimeIoException(
"Failed to get the session.").initCause((Throwable) v);
- } else {
+ } else if (v instanceof IoSession) {
return (IoSession) v;
+ } else {
+ return null;
}
}
@@ -69,6 +74,10 @@
public boolean isConnected() {
return getValue() instanceof IoSession;
}
+
+ public boolean isCanceled() {
+ return getValue() == CANCELED;
+ }
public void setSession(IoSession session) {
setValue(session);
@@ -76,6 +85,10 @@
public void setException(Throwable exception) {
setValue(exception);
+ }
+
+ public void cancel() {
+ setValue(CANCELED);
}
@Override
Modified: mina/trunk/core/src/main/java/org/apache/mina/common/DummySession.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/DummySession.java?rev=580295&r1=580294&r2=580295&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/DummySession.java
(original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/DummySession.java Fri
Sep 28 03:52:48 2007
@@ -66,7 +66,7 @@
* Creates a new instance.
*/
public DummySession() {
- // Initialize dumy service.
+ // Initialize dummy service.
IoAcceptor acceptor = new AbstractIoAcceptor(
new AbstractIoSessionConfig() {
@Override
Modified:
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioDatagramConnector.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioDatagramConnector.java?rev=580295&r1=580294&r2=580295&view=diff
==============================================================================
---
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioDatagramConnector.java
(original)
+++
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioDatagramConnector.java
Fri Sep 28 03:52:48 2007
@@ -27,7 +27,6 @@
import org.apache.mina.common.AbstractIoConnector;
import org.apache.mina.common.ConnectFuture;
import org.apache.mina.common.DefaultConnectFuture;
-import org.apache.mina.common.DefaultIoFilterChain;
import org.apache.mina.common.ExceptionMonitor;
import org.apache.mina.common.IoConnector;
import org.apache.mina.common.IoSession;
@@ -112,7 +111,6 @@
SocketAddress localAddress) {
DatagramChannel ch = null;
boolean initialized = false;
- IoSession session = null;
try {
ch = DatagramChannel.open();
ch.socket().setReuseAddress(getSessionConfig().isReuseAddress());
@@ -125,11 +123,9 @@
ch.connect(remoteAddress);
NioProcessor processor = nextProcessor();
- session = new NioDatagramSession(this, ch, processor);
+ final IoSession session = new NioDatagramSession(this, ch,
processor);
ConnectFuture future = new DefaultConnectFuture();
- // DefaultIoFilterChain will notify the connect future.
- session.setAttribute(DefaultIoFilterChain.CONNECT_FUTURE, future);
-
+ finishSessionInitialization(session, future);
processor.add(session);
initialized = true;
return future;
Modified:
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioSocketConnector.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioSocketConnector.java?rev=580295&r1=580294&r2=580295&view=diff
==============================================================================
---
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioSocketConnector.java
(original)
+++
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioSocketConnector.java
Fri Sep 28 03:52:48 2007
@@ -33,7 +33,6 @@
import org.apache.mina.common.AbstractIoConnector;
import org.apache.mina.common.ConnectFuture;
import org.apache.mina.common.DefaultConnectFuture;
-import org.apache.mina.common.DefaultIoFilterChain;
import org.apache.mina.common.ExceptionMonitor;
import org.apache.mina.common.IoConnector;
import org.apache.mina.common.RuntimeIoException;
@@ -58,25 +57,17 @@
private static volatile int nextId = 0;
private final Object lock = new Object();
-
private final int id = nextId++;
-
private final String threadName = "SocketConnector-" + id;
-
private final Queue<ConnectionRequest> connectQueue = new
ConcurrentLinkedQueue<ConnectionRequest>();
-
+ private final Queue<ConnectionRequest> cancelQueue = new
ConcurrentLinkedQueue<ConnectionRequest>();
private final NioProcessor[] ioProcessors;
-
private final int processorCount;
-
private final Executor executor;
-
private final Selector selector;
private Worker worker;
-
private int processorDistributor = 0;
-
private int workerTimeout = 60; // 1 min.
/**
@@ -200,9 +191,8 @@
}
ConnectionRequest request = new ConnectionRequest(ch);
- startupWorker();
-
connectQueue.add(request);
+ startupWorker();
selector.wakeup();
return request;
@@ -233,6 +223,29 @@
}
}
+ private void cancelKeys() {
+ for (; ;) {
+ ConnectionRequest req = cancelQueue.poll();
+ if (req == null) {
+ break;
+ }
+
+ SocketChannel ch = req.channel;
+ SelectionKey key = ch.keyFor(selector);
+ if (key == null) {
+ continue;
+ }
+
+ key.cancel();
+
+ try {
+ ch.close();
+ } catch (IOException e) {
+ ExceptionMonitor.getInstance().exceptionCaught(e);
+ }
+ }
+ }
+
private void processSessions(Set<SelectionKey> keys) {
for (SelectionKey key : keys) {
if (!key.isConnectable()) {
@@ -288,14 +301,11 @@
}
private void newSession(SocketChannel ch, ConnectFuture connectFuture) {
- NioSocketSession session = new NioSocketSession(this,
- nextProcessor(), ch);
-
- // Set the ConnectFuture of the specified session, which will be
- // removed and notified by AbstractIoFilterChain eventually.
- session.setAttribute(DefaultIoFilterChain.CONNECT_FUTURE,
- connectFuture);
-
+ NioSocketSession session = new NioSocketSession(
+ this, nextProcessor(), ch);
+
+ finishSessionInitialization(session, connectFuture);
+
// Forward the remaining process to the SocketIoProcessor.
session.getProcessor().add(session);
}
@@ -325,6 +335,8 @@
}
processTimedOutSessions(selector.keys());
+
+ cancelKeys();
if (selector.keys().isEmpty()) {
if (System.currentTimeMillis() - lastActive >
workerTimeout * 1000L) {
@@ -354,13 +366,20 @@
private class ConnectionRequest extends DefaultConnectFuture {
private final SocketChannel channel;
-
private final long deadline;
private ConnectionRequest(SocketChannel channel) {
this.channel = channel;
this.deadline = System.currentTimeMillis()
+ getConnectTimeoutMillis();
+ }
+
+ @Override
+ public void cancel() {
+ super.cancel();
+ cancelQueue.add(this);
+ startupWorker();
+ selector.wakeup();
}
}
}
Modified:
mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeConnector.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeConnector.java?rev=580295&r1=580294&r2=580295&view=diff
==============================================================================
---
mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeConnector.java
(original)
+++
mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeConnector.java
Fri Sep 28 03:52:48 2007
@@ -25,7 +25,6 @@
import java.util.Set;
import org.apache.mina.common.AbstractIoConnector;
-import org.apache.mina.common.DefaultIoFilterChain;
import org.apache.mina.common.ConnectFuture;
import org.apache.mina.common.DefaultConnectFuture;
import org.apache.mina.common.ExceptionMonitor;
@@ -82,6 +81,8 @@
VmPipeSessionImpl localSession = new VmPipeSessionImpl(this,
getListeners(), actualLocalAddress, getHandler(), entry);
+
+ finishSessionInitialization(localSession, future);
// and reclaim the local address when the connection is closed.
localSession.getCloseFuture().addListener(LOCAL_ADDRESS_RECLAIMER);
@@ -92,7 +93,6 @@
this.getFilterChainBuilder().buildFilterChain(filterChain);
// The following sentences don't throw any exceptions.
- localSession.setAttribute(DefaultIoFilterChain.CONNECT_FUTURE,
future);
getListeners().fireSessionCreated(localSession);
IdleStatusChecker.getInstance().addSession(localSession);
} catch (Throwable t) {