PROTON-881: Make connect a non-blocking operation Previously connecting was a blocking operation which meant that if the server being connected to took a long time to accept the connection this blocked all other work in an instance of the Reactor.
This commit makes connect a non-blocking operation, allowing the reactor to continue processing other work while the connection is established (or not). Unfortunately, I've not found a satisfactory way to test this behavior in the test suite - because Java never blocks during connect if it is using the loopback adapter. Instead, to test the non-blocking connect code path, I had to configure firewall rules to drop all packets sent to a particular port. Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/e2d23691 Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/e2d23691 Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/e2d23691 Branch: refs/heads/master Commit: e2d23691541b09f4efc59a32705a5306b179a0b0 Parents: 46edaeb Author: Adrian Preston <prest...@uk.ibm.com> Authored: Fri Jun 26 23:58:21 2015 +0100 Committer: Rafael Schloming <r...@alum.mit.edu> Committed: Sun Jul 5 19:57:39 2015 -0400 ---------------------------------------------------------------------- .../qpid/proton/reactor/impl/IOHandler.java | 1 + .../qpid/proton/reactor/impl/SelectorImpl.java | 70 ++++++++++++++++---- .../apache/qpid/proton/reactor/ReactorTest.java | 30 +++++++++ 3 files changed, 89 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e2d23691/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOHandler.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOHandler.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOHandler.java index f810742..39d840e 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOHandler.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOHandler.java @@ -103,6 +103,7 @@ public class IOHandler extends BaseHandler { Socket socket = null; // In this case, 'null' is the proton-j equivalent of PN_INVALID_SOCKET try { SocketChannel socketChannel = ((ReactorImpl)reactor).getIO().socketChannel(); + socketChannel.configureBlocking(false); socketChannel.connect(new InetSocketAddress(hostname, port)); socket = socketChannel.socket(); } catch(IOException ioException) { http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e2d23691/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectorImpl.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectorImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectorImpl.java index 1145158..5ef74e7 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectorImpl.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectorImpl.java @@ -24,9 +24,13 @@ package org.apache.qpid.proton.reactor.impl; import java.io.IOException; import java.nio.channels.SelectionKey; import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; import java.util.HashSet; import java.util.Iterator; +import org.apache.qpid.proton.amqp.Symbol; +import org.apache.qpid.proton.amqp.transport.ErrorCondition; +import org.apache.qpid.proton.engine.Transport; import org.apache.qpid.proton.reactor.Selectable; import org.apache.qpid.proton.reactor.Selector; @@ -59,14 +63,19 @@ class SelectorImpl implements Selector { public void update(Selectable selectable) { if (selectable.getChannel() != null) { int interestedOps = 0; - if (selectable.isReading()) { - if (selectable.getChannel() instanceof ServerSocketChannel) { - interestedOps |= SelectionKey.OP_ACCEPT; - } else { - interestedOps |= SelectionKey.OP_READ; + if (selectable.getChannel() instanceof SocketChannel && + ((SocketChannel)selectable.getChannel()).isConnectionPending()) { + interestedOps |= SelectionKey.OP_CONNECT; + } else { + if (selectable.isReading()) { + if (selectable.getChannel() instanceof ServerSocketChannel) { + interestedOps |= SelectionKey.OP_ACCEPT; + } else { + interestedOps |= SelectionKey.OP_READ; + } } + if (selectable.isWriting()) interestedOps |= SelectionKey.OP_WRITE; } - if (selectable.isWriting()) interestedOps |= SelectionKey.OP_WRITE; SelectionKey key = selectable.getChannel().keyFor(selector); key.interestOps(interestedOps); } @@ -76,14 +85,18 @@ class SelectorImpl implements Selector { public void remove(Selectable selectable) { if (selectable.getChannel() != null) { SelectionKey key = selectable.getChannel().keyFor(selector); - key.cancel(); - key.attach(null); + if (key != null) { + key.cancel(); + key.attach(null); + } } selectables.remove(selectable); } @Override public void select(long timeout) throws IOException { + + long now = System.currentTimeMillis(); if (timeout > 0) { long deadline = 0; for (Selectable selectable : selectables) { // TODO: this differs from the C code which requires a call to update() to make deadline changes take affect @@ -94,7 +107,6 @@ class SelectorImpl implements Selector { } if (deadline > 0) { - long now = System.currentTimeMillis(); long delta = deadline - now; if (delta < 0) { timeout = 0; @@ -104,17 +116,51 @@ class SelectorImpl implements Selector { } } + error.clear(); + + long awoken = 0; if (timeout > 0) { - selector.select(timeout); + long remainingTimeout = timeout; + while(remainingTimeout > 0) { + selector.select(remainingTimeout); + awoken = System.currentTimeMillis(); + + for (Iterator<SelectionKey> iterator = selector.selectedKeys().iterator(); iterator.hasNext();) { + SelectionKey key = iterator.next(); + if (key.isConnectable()) { + try { + ((SocketChannel)key.channel()).finishConnect(); + update((Selectable)key.attachment()); + } catch(IOException ioException) { + Selectable selectable = (Selectable)key.attachment(); + ErrorCondition condition = new ErrorCondition(); + condition.setCondition(Symbol.getSymbol("proton:io")); + condition.setDescription(ioException.getMessage()); + Transport transport = selectable.getTransport(); + if (transport != null) { + transport.setCondition(condition); + transport.close_tail(); + transport.close_head(); + transport.pop(transport.pending()); + } + error.add(selectable); + } + iterator.remove(); + } + } + if (!selector.selectedKeys().isEmpty()) { + break; + } + remainingTimeout = remainingTimeout - (awoken - now); + } } else { selector.selectNow(); + awoken = System.currentTimeMillis(); } - long awoken = System.currentTimeMillis(); readable.clear(); writeable.clear(); expired.clear(); - error.clear(); // TODO: nothing ever gets put in here... for (SelectionKey key : selector.selectedKeys()) { Selectable selectable = (Selectable)key.attachment(); if (key.isReadable()) readable.add(selectable); http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e2d23691/proton-j/src/test/java/org/apache/qpid/proton/reactor/ReactorTest.java ---------------------------------------------------------------------- diff --git a/proton-j/src/test/java/org/apache/qpid/proton/reactor/ReactorTest.java b/proton-j/src/test/java/org/apache/qpid/proton/reactor/ReactorTest.java index 61e2761..2d81666 100644 --- a/proton-j/src/test/java/org/apache/qpid/proton/reactor/ReactorTest.java +++ b/proton-j/src/test/java/org/apache/qpid/proton/reactor/ReactorTest.java @@ -29,6 +29,7 @@ import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; import java.io.IOException; +import java.net.ServerSocket; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -563,4 +564,33 @@ public class ReactorTest { assertReactorRunBarfsOnHandler(reactor, expectedBarf, expectedHandler); reactor.free(); } + + @Test + public void connectionRefused() throws IOException { + final ServerSocket serverSocket = new ServerSocket(0, 0); + + class ConnectionHandler extends TestHandler { + @Override + public void onConnectionInit(Event event) { + super.onConnectionInit(event); + Connection connection = event.getConnection(); + connection.setHostname("127.0.0.1:" + serverSocket.getLocalPort()); + connection.open(); + try { + serverSocket.close(); + } catch(IOException e) { + AssertionFailedError afe = new AssertionFailedError(); + afe.initCause(e); + throw afe; + } + } + } + TestHandler connectionHandler = new ConnectionHandler(); + reactor.connection(connectionHandler); + reactor.run(); + reactor.free(); + serverSocket.close(); + connectionHandler.assertEvents(Type.CONNECTION_INIT, Type.CONNECTION_LOCAL_OPEN, Type.CONNECTION_BOUND, Type.TRANSPORT_ERROR, Type.TRANSPORT_TAIL_CLOSED, + Type.TRANSPORT_HEAD_CLOSED, Type.TRANSPORT_CLOSED, Type.CONNECTION_UNBOUND, Type.TRANSPORT); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org