PROTON-881: Add reactor unit tests based on those in proton-c/src/tests/reactor.c
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/e9d4a78d Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/e9d4a78d Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/e9d4a78d Branch: refs/heads/proton-j-reactor Commit: e9d4a78d294f08e7eb1bd7a3f28bd3f97ce6b9df Parents: 5748bb9 Author: Adrian Preston <[email protected]> Authored: Mon May 4 21:01:19 2015 +0100 Committer: Adrian Preston <[email protected]> Committed: Wed May 6 23:24:48 2015 +0100 ---------------------------------------------------------------------- .../org/apache/qpid/proton/reactor/Reactor.java | 4 +- .../qpid/proton/reactor/impl/AcceptorImpl.java | 6 + .../qpid/proton/reactor/impl/ReactorImpl.java | 15 ++ .../apache/qpid/proton/reactor/ReactorTest.java | 243 ++++++++++++++++++- 4 files changed, 263 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e9d4a78d/proton-j/src/main/java/org/apache/qpid/proton/reactor/Reactor.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Reactor.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Reactor.java index 68375b1..935523a 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Reactor.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Reactor.java @@ -93,11 +93,11 @@ public interface Reactor { // pn_reactor_schedule from reactor.c public Task schedule(int delay, Handler handler); - // TODO: acceptor - // TODO: acceptorClose Connection connection(Handler handler); Acceptor acceptor(String host, int port) throws IOException; Acceptor acceptor(String host, int port, Handler handler) throws IOException; + + public void free(); } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e9d4a78d/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/AcceptorImpl.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/AcceptorImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/AcceptorImpl.java index 7084dfb..fb48df6 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/AcceptorImpl.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/AcceptorImpl.java @@ -122,4 +122,10 @@ public class AcceptorImpl implements Acceptor { public void add(Handler handler) { sel.add(handler); } + + // Used for unit tests, where acceptor is bound to an ephemeral port + public int getPortNumber() throws IOException { + ServerSocketChannel ssc = (ServerSocketChannel)sel.getChannel(); + return ((InetSocketAddress)ssc.getLocalAddress()).getPort(); + } } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e9d4a78d/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java index a3180ab..e25e813 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java @@ -117,6 +117,20 @@ public class ReactorImpl implements Reactor { wakeup = Pipe.open(); mark(); } + + @Override + public void free() { + // TODO +/* + 132 void pn_reactor_free(pn_reactor_t *reactor) { + 133 if (reactor) { + 134 pn_collector_release(reactor->collector); + 135 pn_handler_free(reactor->handler); + 136 reactor->handler = NULL; + 137 pn_decref(reactor); + 138 } + 139 } + */ /* 85 static void pn_reactor_finalize(pn_reactor_t *reactor) { 86 for (int i = 0; i < 2; i++) { @@ -133,6 +147,7 @@ public class ReactorImpl implements Reactor { 97 pn_decref(reactor->io); 98 } */ + } @Override public void attach(Object attachment) { http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e9d4a78d/proton-j/src/test/java/org/apache/qpid/proton/reactor/ReactorTest.java ---------------------------------------------------------------------- diff --git a/proton-j/src/test/java/org/apache/qpid/proton/reactor/ReactorTest.java b/proton-j/src/test/java/org/apache/qpid/proton/reactor/ReactorTest.java index 2378d5f..0bcfd80 100644 --- a/proton-j/src/test/java/org/apache/qpid/proton/reactor/ReactorTest.java +++ b/proton-j/src/test/java/org/apache/qpid/proton/reactor/ReactorTest.java @@ -1,6 +1,7 @@ package org.apache.qpid.proton.reactor; import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; @@ -10,8 +11,14 @@ import java.util.ArrayList; import org.apache.qpid.proton.Proton; import org.apache.qpid.proton.engine.BaseHandler; +import org.apache.qpid.proton.engine.Connection; +import org.apache.qpid.proton.engine.Delivery; import org.apache.qpid.proton.engine.Event; import org.apache.qpid.proton.engine.Event.Type; +import org.apache.qpid.proton.engine.Handler; +import org.apache.qpid.proton.engine.Sender; +import org.apache.qpid.proton.engine.Session; +import org.apache.qpid.proton.reactor.impl.AcceptorImpl; import org.junit.Test; public class ReactorTest { @@ -45,7 +52,49 @@ public class ReactorTest { } /** - * Tests basic operation of the Reactor.acceptor method by creating an acceptor + * Tests adding a handler to a reactor and running the reactor. The + * expected behaviour is for the reactor to return, and a number of reactor- + * related events to have been delivered to the handler. + * @throws IOException + */ + @Test + public void handlerRun() throws IOException { + Reactor reactor = Proton.reactor(); + Handler handler = reactor.getHandler(); + assertNotNull(handler); + TestHandler testHandler = new TestHandler(); + handler.add(testHandler); + reactor.run(); + testHandler.assertEvents(Type.REACTOR_INIT, Type.SELECTABLE_INIT, Type.SELECTABLE_UPDATED, Type.SELECTABLE_FINAL, Type.REACTOR_FINAL); + } + + /** + * Tests basic operation of the Reactor.connection method by creating a + * connection from a reactor, then running the reactor. The expected behaviour + * is for: + * <ul> + * <li>The reactor to end immediately.</li> + * <li>The handler associated with the connection receives an init event.</li> + * <li>The connection is one of the reactor's children.</li> + * </ul> + * @throws IOException + */ + @Test + public void connection() throws IOException { + Reactor reactor = Proton.reactor(); + TestHandler connectionHandler = new TestHandler(); + Connection connection = reactor.connection(connectionHandler); + assertNotNull(connection); + assertTrue("connection should be one of the reactor's children", reactor.children().contains(connection)); + TestHandler reactorHandler = new TestHandler(); + reactor.getHandler().add(reactorHandler); + reactor.run(); + reactorHandler.assertEvents(Type.REACTOR_INIT, Type.SELECTABLE_INIT, Type.SELECTABLE_UPDATED, Type.SELECTABLE_FINAL, Type.REACTOR_FINAL); + connectionHandler.assertEvents(Type.CONNECTION_INIT); + } + + /** + * Tests operation of the Reactor.acceptor method by creating an acceptor * which is immediately closed by the reactor. The expected behaviour is for: * <ul> * <li>The reactor to end immediately (as it has no more work to process).</li> @@ -56,9 +105,9 @@ public class ReactorTest { * @throws IOException */ @Test - public void basicAcceptor() throws IOException { + public void acceptor() throws IOException { Reactor reactor = Proton.reactor(); - final Acceptor acceptor = reactor.acceptor("localhost", 0); + final Acceptor acceptor = reactor.acceptor("127.0.0.1", 0); assertNotNull(acceptor); assertTrue("acceptor should be one of the reactor's children", reactor.children().contains(acceptor)); TestHandler acceptorHandler = new TestHandler(); @@ -73,4 +122,192 @@ public class ReactorTest { acceptorHandler.assertEvents(Type.SELECTABLE_INIT, Type.SELECTABLE_UPDATED, Type.SELECTABLE_FINAL); assertFalse("acceptor should have been removed from the reactor's children", reactor.children().contains(acceptor)); } + + private static class ServerHandler extends TestHandler { + private Acceptor acceptor; + public void setAcceptor(Acceptor acceptor) { + this.acceptor = acceptor; + } + @Override + public void onConnectionRemoteOpen(Event event) { + super.onConnectionRemoteOpen(event); + event.getConnection().open(); + } + @Override + public void onConnectionRemoteClose(Event event) { + super.onConnectionRemoteClose(event); + acceptor.close(); + event.getConnection().close(); + event.getConnection().free(); + } + } + + /** + * Tests end to end behaviour of the reactor by creating an acceptor and then + * a connection (which connects to the port the acceptor is listening on). + * As soon as the connection is established, both the acceptor and connection + * are closed. The events generated by the acceptor and the connection are + * compared to a set of expected events. + * @throws IOException + */ + @Test + public void connect() throws IOException { + Reactor reactor = Proton.reactor(); + + ServerHandler sh = new ServerHandler(); + Acceptor acceptor = reactor.acceptor("127.0.0.1", 0, sh); + final int listeningPort = ((AcceptorImpl)acceptor).getPortNumber(); + sh.setAcceptor(acceptor); + + class ClientHandler extends TestHandler { + @Override + public void onConnectionInit(Event event) { + super.onConnectionInit(event); + event.getConnection().setHostname("127.0.0.1:" + listeningPort); + event.getConnection().open(); + } + @Override + public void onConnectionRemoteOpen(Event event) { + super.onConnectionRemoteOpen(event); + event.getConnection().close(); + } + @Override + public void onConnectionRemoteClose(Event event) { + super.onConnectionRemoteClose(event); + event.getConnection().free(); + } + } + ClientHandler ch = new ClientHandler(); + Connection connection = reactor.connection(ch); + + assertTrue("acceptor should be one of the reactor's children", reactor.children().contains(acceptor)); + assertTrue("connection should be one of the reactor's children", reactor.children().contains(connection)); + + reactor.run(); + + assertFalse("acceptor should have been removed from the reactor's children", reactor.children().contains(acceptor)); + assertFalse("connection should have been removed from the reactor's children", reactor.children().contains(connection)); + sh.assertEvents(Type.CONNECTION_INIT, Type.CONNECTION_BOUND, + // XXX: proton-c generates a PN_TRANSPORT event here + Type.CONNECTION_REMOTE_OPEN, Type.CONNECTION_LOCAL_OPEN, + Type.TRANSPORT, Type.CONNECTION_REMOTE_CLOSE, + Type.TRANSPORT_TAIL_CLOSED, Type.CONNECTION_LOCAL_CLOSE, + Type.TRANSPORT, Type.TRANSPORT_HEAD_CLOSED, + Type.TRANSPORT_CLOSED, Type.CONNECTION_UNBOUND, + Type.CONNECTION_FINAL); + + ch.assertEvents(Type.CONNECTION_INIT, Type.CONNECTION_LOCAL_OPEN, + Type.CONNECTION_BOUND, + // XXX: proton-c generates two PN_TRANSPORT events here + Type.CONNECTION_REMOTE_OPEN, Type.CONNECTION_LOCAL_CLOSE, + Type.TRANSPORT, Type.TRANSPORT_HEAD_CLOSED, + Type.CONNECTION_REMOTE_CLOSE, Type.TRANSPORT_TAIL_CLOSED, + Type.TRANSPORT_CLOSED, Type.CONNECTION_UNBOUND, + Type.CONNECTION_FINAL); + + } + + private static class SinkHandler extends BaseHandler { + protected int received = 0; + + @Override + public void onDelivery(Event event) { + Delivery dlv = event.getDelivery(); + if (!dlv.isPartial()) { + dlv.settle(); + ++received; + } + } + } + + private static class SourceHandler extends BaseHandler { + private int remaining; + private final int port; + + protected SourceHandler(int count, int port) { + remaining = count; + this.port = port; + } + + @Override + public void onConnectionInit(Event event) { + Connection conn = event.getConnection(); + conn.setHostname("127.0.0.1:" + port); + Session ssn = conn.session(); + Sender snd = ssn.sender("sender"); + conn.open(); + ssn.open(); + snd.open(); + } + + @Override + public void onLinkFlow(Event event) { + Sender link = (Sender)event.getLink(); + while (link.getCredit() > 0 && remaining > 0) { + Delivery dlv = link.delivery(new byte[0]); + assertNotNull(dlv); + dlv.settle(); + link.advance(); + --remaining; + } + + if (remaining == 0) { + event.getConnection().close(); + } + } + + @Override + public void onConnectionRemoteClose(Event event) { + event.getConnection().free(); + } + } + + private void transfer(int count, int window) throws IOException { + Reactor reactor = Proton.reactor(); + ServerHandler sh = new ServerHandler(); + Acceptor acceptor = reactor.acceptor("127.0.0.1", 0, sh); + sh.setAcceptor(acceptor); + sh.add(new Handshaker()); + // XXX: a window of 1 doesn't work unless the flowcontroller is + // added after the thing that settles the delivery + sh.add(new FlowController(window)); + SinkHandler snk = new SinkHandler(); + sh.add(snk); + + SourceHandler src = new SourceHandler(count, ((AcceptorImpl)acceptor).getPortNumber()); + reactor.connection(src); + + reactor.run(); + assertEquals("Did not receive the expected number of messages", count, snk.received); + } + + @Test + public void transfer_0to64_2() throws IOException { + for (int i = 0; i < 64; ++i) { + transfer(i, 2); + } + } + + @Test + public void transfer_1024_64() throws IOException { + transfer(1024, 64); + } + + @Test + public void transfer_4096_1024() throws IOException { + transfer(4*1024, 1024); + } + + @Test + public void schedule() throws IOException { + Reactor reactor = Proton.reactor(); + TestHandler reactorHandler = new TestHandler(); + reactor.getHandler().add(reactorHandler); + TestHandler taskHandler = new TestHandler(); + reactor.schedule(0, taskHandler); + reactor.run(); + reactorHandler.assertEvents(Type.REACTOR_INIT, Type.SELECTABLE_INIT, Type.SELECTABLE_UPDATED, Type.REACTOR_QUIESCED, Type.SELECTABLE_UPDATED, + Type.SELECTABLE_FINAL, Type.REACTOR_FINAL); + taskHandler.assertEvents(Type.TIMER_TASK); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
