PROTON-881: Add error handling for "accept socket connection" path through Acceptor
Adds error handling for various problems that can occur when accepting a connection (to a listening socket). Also adds unit tests that cover these cases. Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/5748bb98 Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/5748bb98 Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/5748bb98 Branch: refs/heads/master Commit: 5748bb9880432bab64c42d34f4d7277163077427 Parents: 51529f6 Author: Adrian Preston <prest...@uk.ibm.com> Authored: Sat May 2 22:30:40 2015 +0100 Committer: Adrian Preston <prest...@uk.ibm.com> Committed: Wed May 6 23:24:41 2015 +0100 ---------------------------------------------------------------------- .../apache/qpid/proton/reactor/Acceptor.java | 5 +- .../qpid/proton/reactor/impl/AcceptorImpl.java | 21 ++++-- .../qpid/proton/reactor/impl/ReactorImpl.java | 16 +++-- .../reactor/impl/ReactorInternalException.java | 23 ++++++ .../qpid/proton/reactor/impl/SelectorImpl.java | 2 +- .../apache/qpid/proton/reactor/ReactorTest.java | 76 ++++++++++++++++++++ .../proton/reactor/impl/AcceptorImplTest.java | 74 +++++++++++++++++++ 7 files changed, 205 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5748bb98/proton-j/src/main/java/org/apache/qpid/proton/reactor/Acceptor.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Acceptor.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Acceptor.java index 91b20a3..f2a320b 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Acceptor.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Acceptor.java @@ -21,7 +21,10 @@ package org.apache.qpid.proton.reactor; -public interface Acceptor { +import org.apache.qpid.proton.engine.Handler; +public 
interface Acceptor extends ReactorChild { + + void add(Handler handler); void close(); } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5748bb98/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/AcceptorImpl.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/AcceptorImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/AcceptorImpl.java index efe0eb4..7084dfb 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/AcceptorImpl.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/AcceptorImpl.java @@ -45,6 +45,9 @@ public class AcceptorImpl implements Acceptor { Reactor reactor = selectable.getReactor(); try { SocketChannel socketChannel = ((ServerSocketChannel)selectable.getChannel()).accept(); + if (socketChannel == null) { + throw new ReactorInternalException("Selectable readable, but no socket to accept"); + } Handler handler = (Handler)selectable.getAttachment(); if (handler == null) { // TODO: set selectable.getAttachment() to null? @@ -61,8 +64,7 @@ public class AcceptorImpl implements Acceptor { trans.bind(conn); IOHandler.selectableTransport(reactor, socketChannel.socket(), trans); // TODO: could we pass in a channel object instead of doing socketChannel.socket()? } catch(IOException ioException) { - ioException.printStackTrace(); - // TODO: what do we do with this exception? + sel.error(); } } } @@ -81,10 +83,16 @@ public class AcceptorImpl implements Acceptor { private final Selectable sel; + // Split out from AcceptorImpl to make it easier for unittests to mock this class + // without having to open an actual port. 
+ protected ServerSocketChannel openServerSocket() throws IOException { + return ServerSocketChannel.open(); + } + protected AcceptorImpl(Reactor reactor, String host, int port, Handler handler) throws IOException { - ServerSocketChannel ssc = ServerSocketChannel.open(); + ServerSocketChannel ssc = openServerSocket(); ssc.bind(new InetSocketAddress(host, port)); - sel = reactor.selectable(); + sel = ((ReactorImpl)reactor).selectable(this); sel.setChannel(ssc); sel.onReadable(new AcceptorReadable()); sel.onFinalize(new AcceptorFinalize()); // TODO: currently, this is not called from anywhere!! @@ -109,4 +117,9 @@ public class AcceptorImpl implements Acceptor { reactor.update(sel); } } + + @Override + public void add(Handler handler) { + sel.add(handler); + } } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5748bb98/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java index 684d8f8..a3180ab 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java @@ -195,12 +195,14 @@ public class ReactorImpl implements Reactor { private class ReleaseCallback implements Callback { private final ReactorImpl reactor; - public ReleaseCallback(ReactorImpl reactor) { + private final ReactorChild child; + public ReleaseCallback(ReactorImpl reactor, ReactorChild child) { this.reactor = reactor; + this.child = child; } @Override public void run(Selectable selectable) { - if (reactor.children.remove(selectable)) { + if (reactor.children.remove(child)) { --reactor.selectables; } } @@ -208,18 +210,20 @@ public class ReactorImpl implements Reactor { @Override public Selectable selectable() { + return selectable(null); 
+ } + + public Selectable selectable(ReactorChild child) { Selectable result = new SelectableImpl(); result.setCollector(collector); collector.put(Type.SELECTABLE_INIT, result); result.setReactor(this); - children.add(result); - result.onRelease(new ReleaseCallback(this)); + children.add(child == null ? result : child); + result.onRelease(new ReleaseCallback(this, child == null ? result : child)); ++selectables; return result; } - - @Override public void update(Selectable selectable) { SelectableImpl selectableImpl = (SelectableImpl)selectable; http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5748bb98/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorInternalException.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorInternalException.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorInternalException.java new file mode 100644 index 0000000..d8d67ad --- /dev/null +++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorInternalException.java @@ -0,0 +1,23 @@ +package org.apache.qpid.proton.reactor.impl; + +/** + * Thrown by the reactor when it encounters an internal error condition. + * This is analogous to an assertion failure in the proton-c reactor + * implementation. 
+ */ +class ReactorInternalException extends RuntimeException { + + private static final long serialVersionUID = 8979674526584642454L; + + protected ReactorInternalException(String msg) { + super(msg); + } + + protected ReactorInternalException(Throwable cause) { + super(cause); + } + + protected ReactorInternalException(String msg, Throwable cause) { + super(msg, cause); + } +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5748bb98/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectorImpl.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectorImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectorImpl.java index e4e284e..a53be7d 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectorImpl.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectorImpl.java @@ -40,7 +40,7 @@ public class SelectorImpl implements Selector { private final HashSet<Selectable> error = new HashSet<Selectable>(); public SelectorImpl() throws IOException { - selector = java.nio.channels.Selector.open(); + selector = java.nio.channels.Selector.open(); // TODO: need to ensure we close this somewhere... 
} @Override http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5748bb98/proton-j/src/test/java/org/apache/qpid/proton/reactor/ReactorTest.java ---------------------------------------------------------------------- diff --git a/proton-j/src/test/java/org/apache/qpid/proton/reactor/ReactorTest.java b/proton-j/src/test/java/org/apache/qpid/proton/reactor/ReactorTest.java new file mode 100644 index 0000000..2378d5f --- /dev/null +++ b/proton-j/src/test/java/org/apache/qpid/proton/reactor/ReactorTest.java @@ -0,0 +1,76 @@ +package org.apache.qpid.proton.reactor; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.ArrayList; + +import org.apache.qpid.proton.Proton; +import org.apache.qpid.proton.engine.BaseHandler; +import org.apache.qpid.proton.engine.Event; +import org.apache.qpid.proton.engine.Event.Type; +import org.junit.Test; + +public class ReactorTest { + + /** + * Tests that creating a reactor and running it: + * <ul> + * <li>Doesn't throw any exceptions.</li> + * <li>Returns immediately from the run method (as there is no more work to do).</li> + * </ul> + * @throws IOException + */ + @Test + public void runEmpty() throws IOException { + Reactor reactor = Proton.reactor(); + assertNotNull(reactor); + reactor.run(); + } + + private static class TestHandler extends BaseHandler { + private final ArrayList<Type> actual = new ArrayList<Type>(); + + @Override + public void onUnhandled(Event event) { + actual.add(event.getType()); + } + + public void assertEvents(Type...expected) { + assertArrayEquals(expected, actual.toArray()); + } + } + + /** + * Tests basic operation of the Reactor.acceptor method by creating an acceptor + * which is immediately closed by the reactor. 
The expected behaviour is for: + * <ul> + * <li>The reactor to end immediately (as it has no more work to process).</li> + * <li>The handler, associated with the acceptor, to receive init, update and + * final events.</li> + * <li>For its lifetime, the acceptor is one of the reactor's children.</li> + * </ul> + * @throws IOException + */ + @Test + public void basicAcceptor() throws IOException { + Reactor reactor = Proton.reactor(); + final Acceptor acceptor = reactor.acceptor("localhost", 0); + assertNotNull(acceptor); + assertTrue("acceptor should be one of the reactor's children", reactor.children().contains(acceptor)); + TestHandler acceptorHandler = new TestHandler(); + acceptor.add(acceptorHandler); + reactor.getHandler().add(new BaseHandler() { + @Override + public void onReactorInit(Event event) { + acceptor.close(); + } + }); + reactor.run(); + acceptorHandler.assertEvents(Type.SELECTABLE_INIT, Type.SELECTABLE_UPDATED, Type.SELECTABLE_FINAL); + assertFalse("acceptor should have been removed from the reactor's children", reactor.children().contains(acceptor)); + } +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5748bb98/proton-j/src/test/java/org/apache/qpid/proton/reactor/impl/AcceptorImplTest.java ---------------------------------------------------------------------- diff --git a/proton-j/src/test/java/org/apache/qpid/proton/reactor/impl/AcceptorImplTest.java b/proton-j/src/test/java/org/apache/qpid/proton/reactor/impl/AcceptorImplTest.java new file mode 100644 index 0000000..44605ae --- /dev/null +++ b/proton-j/src/test/java/org/apache/qpid/proton/reactor/impl/AcceptorImplTest.java @@ -0,0 +1,74 @@ +package org.apache.qpid.proton.reactor.impl; + +import java.io.IOException; +import java.nio.channels.ServerSocketChannel; + +import org.apache.qpid.proton.engine.Handler; +import org.apache.qpid.proton.reactor.Reactor; +import org.apache.qpid.proton.reactor.ReactorChild; +import org.apache.qpid.proton.reactor.Selectable.Callback; +import 
org.junit.Test; +import org.mockito.Mockito; + +public class AcceptorImplTest { + + /** + * Tests that if ServerSocketChannel.accept() throws an IOException the Acceptor will + * call Selectable.error() on its underlying selectable. + * @throws IOException + */ + @Test + public void acceptThrowsException() throws IOException { + final Callback mockCallback = Mockito.mock(Callback.class); + final SelectableImpl selectable = new SelectableImpl(); + selectable.onError(mockCallback); + ReactorImpl mockReactor = Mockito.mock(ReactorImpl.class); + Mockito.when(mockReactor.selectable(Mockito.any(ReactorChild.class))).thenReturn(selectable); + class MockAcceptorImpl extends AcceptorImpl { + + protected MockAcceptorImpl(Reactor reactor, String host, int port, Handler handler) throws IOException { + super(reactor, host, port, handler); + } + + @Override + protected ServerSocketChannel openServerSocket() throws IOException { + ServerSocketChannel result = Mockito.mock(ServerSocketChannel.class); + Mockito.when(result.accept()).thenThrow(new IOException()); + return result; + } + } + new MockAcceptorImpl(mockReactor, "host", 1234, null); + selectable.readable(); + Mockito.verify(mockCallback).run(selectable); + } + + /** + * Tests that if ServerSocketChannel.accept() returns <code>null</code> the Acceptor will + * throw a ReactorInternalException (because the acceptor's underlying selectable should + * not have been marked as readable, if there is no connection to accept). 
+ * @throws IOException + */ + @Test(expected=ReactorInternalException.class) + public void acceptReturnsNull() throws IOException { + final Callback mockCallback = Mockito.mock(Callback.class); + final SelectableImpl selectable = new SelectableImpl(); + selectable.onError(mockCallback); + ReactorImpl mockReactor = Mockito.mock(ReactorImpl.class); + Mockito.when(mockReactor.selectable(Mockito.any(ReactorChild.class))).thenReturn(selectable); + class MockAcceptorImpl extends AcceptorImpl { + + protected MockAcceptorImpl(Reactor reactor, String host, int port, Handler handler) throws IOException { + super(reactor, host, port, handler); + } + + @Override + protected ServerSocketChannel openServerSocket() throws IOException { + ServerSocketChannel result = Mockito.mock(ServerSocketChannel.class); + Mockito.when(result.accept()).thenReturn(null); + return result; + } + } + new MockAcceptorImpl(mockReactor, "host", 1234, null); + selectable.readable(); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org