PROTON-881: Add error handling for "accept socket connection" path through 
Acceptor

Adds error handling for various problems that can occur when accepting
a connection (to a listening socket).  Also adds unittests that cover
these cases.


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/5748bb98
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/5748bb98
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/5748bb98

Branch: refs/heads/master
Commit: 5748bb9880432bab64c42d34f4d7277163077427
Parents: 51529f6
Author: Adrian Preston <prest...@uk.ibm.com>
Authored: Sat May 2 22:30:40 2015 +0100
Committer: Adrian Preston <prest...@uk.ibm.com>
Committed: Wed May 6 23:24:41 2015 +0100

----------------------------------------------------------------------
 .../apache/qpid/proton/reactor/Acceptor.java    |  5 +-
 .../qpid/proton/reactor/impl/AcceptorImpl.java  | 21 ++++--
 .../qpid/proton/reactor/impl/ReactorImpl.java   | 16 +++--
 .../reactor/impl/ReactorInternalException.java  | 23 ++++++
 .../qpid/proton/reactor/impl/SelectorImpl.java  |  2 +-
 .../apache/qpid/proton/reactor/ReactorTest.java | 76 ++++++++++++++++++++
 .../proton/reactor/impl/AcceptorImplTest.java   | 74 +++++++++++++++++++
 7 files changed, 205 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5748bb98/proton-j/src/main/java/org/apache/qpid/proton/reactor/Acceptor.java
----------------------------------------------------------------------
diff --git 
a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Acceptor.java 
b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Acceptor.java
index 91b20a3..f2a320b 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Acceptor.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Acceptor.java
@@ -21,7 +21,10 @@
 
 package org.apache.qpid.proton.reactor;
 
-public interface Acceptor {
+import org.apache.qpid.proton.engine.Handler;
 
+public interface Acceptor extends ReactorChild {
+
+    void add(Handler handler);
     void close();
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5748bb98/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/AcceptorImpl.java
----------------------------------------------------------------------
diff --git 
a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/AcceptorImpl.java 
b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/AcceptorImpl.java
index efe0eb4..7084dfb 100644
--- 
a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/AcceptorImpl.java
+++ 
b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/AcceptorImpl.java
@@ -45,6 +45,9 @@ public class AcceptorImpl implements Acceptor {
             Reactor reactor = selectable.getReactor();
             try {
                 SocketChannel socketChannel = 
((ServerSocketChannel)selectable.getChannel()).accept();
+                if (socketChannel == null) {
+                    throw new ReactorInternalException("Selectable readable, 
but no socket to accept");
+                }
                 Handler handler = (Handler)selectable.getAttachment();
                 if (handler == null) {
                     // TODO: set selectable.getAttachment() to null?
@@ -61,8 +64,7 @@ public class AcceptorImpl implements Acceptor {
                 trans.bind(conn);
                 IOHandler.selectableTransport(reactor, socketChannel.socket(), 
trans);  // TODO: could we pass in a channel object instead of doing 
socketChannel.socket()?
             } catch(IOException ioException) {
-                ioException.printStackTrace();
-                // TODO: what do we do with this exception?
+                sel.error();
             }
         }
     }
@@ -81,10 +83,16 @@ public class AcceptorImpl implements Acceptor {
 
     private final Selectable sel;
 
+    // Split out from AcceptorImpl to make it easier for unittests to mock 
this class
+    // without having to open an actual port.
+    protected ServerSocketChannel openServerSocket() throws IOException {
+        return ServerSocketChannel.open();
+    }
+
     protected AcceptorImpl(Reactor reactor, String host, int port, Handler 
handler) throws IOException {
-        ServerSocketChannel ssc = ServerSocketChannel.open();
+        ServerSocketChannel ssc = openServerSocket();
         ssc.bind(new InetSocketAddress(host, port));
-        sel = reactor.selectable();
+        sel = ((ReactorImpl)reactor).selectable(this);
         sel.setChannel(ssc);
         sel.onReadable(new AcceptorReadable());
         sel.onFinalize(new AcceptorFinalize()); // TODO: currently, this is 
not called from anywhere!!
@@ -109,4 +117,9 @@ public class AcceptorImpl implements Acceptor {
             reactor.update(sel);
         }
     }
+
+    @Override
+    public void add(Handler handler) {
+        sel.add(handler);
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5748bb98/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java
----------------------------------------------------------------------
diff --git 
a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java 
b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java
index 684d8f8..a3180ab 100644
--- 
a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java
+++ 
b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java
@@ -195,12 +195,14 @@ public class ReactorImpl implements Reactor {
 
     private class ReleaseCallback implements Callback {
         private final ReactorImpl reactor;
-        public ReleaseCallback(ReactorImpl reactor) {
+        private final ReactorChild child;
+        public ReleaseCallback(ReactorImpl reactor, ReactorChild child) {
             this.reactor = reactor;
+            this.child = child;
         }
         @Override
         public void run(Selectable selectable) {
-            if (reactor.children.remove(selectable)) {
+            if (reactor.children.remove(child)) {
                 --reactor.selectables;
             }
         }
@@ -208,18 +210,20 @@ public class ReactorImpl implements Reactor {
 
     @Override
     public Selectable selectable() {
+        return selectable(null);
+    }
+
+    public Selectable selectable(ReactorChild child) {
         Selectable result = new SelectableImpl();
         result.setCollector(collector);
         collector.put(Type.SELECTABLE_INIT, result);
         result.setReactor(this);
-        children.add(result);
-        result.onRelease(new ReleaseCallback(this));
+        children.add(child == null ? result : child);
+        result.onRelease(new ReleaseCallback(this, child == null ? result : 
child));
         ++selectables;
         return result;
     }
 
-
-
     @Override
     public void update(Selectable selectable) {
         SelectableImpl selectableImpl = (SelectableImpl)selectable;

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5748bb98/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorInternalException.java
----------------------------------------------------------------------
diff --git 
a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorInternalException.java
 
b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorInternalException.java
new file mode 100644
index 0000000..d8d67ad
--- /dev/null
+++ 
b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorInternalException.java
@@ -0,0 +1,23 @@
+package org.apache.qpid.proton.reactor.impl;
+
+/**
+ * Thrown by the reactor when it encounters an internal error condition.
+ * This is analogous to an assertion failure in the proton-c reactor
+ * implementation.
+ */
+class ReactorInternalException extends RuntimeException {
+
+    private static final long serialVersionUID = 8979674526584642454L;
+
+    protected ReactorInternalException(String msg) {
+        super(msg);
+    }
+
+    protected ReactorInternalException(Throwable cause) {
+        super(cause);
+    }
+
+    protected ReactorInternalException(String msg, Throwable cause) {
+        super(msg, cause);
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5748bb98/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectorImpl.java
----------------------------------------------------------------------
diff --git 
a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectorImpl.java 
b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectorImpl.java
index e4e284e..a53be7d 100644
--- 
a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectorImpl.java
+++ 
b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectorImpl.java
@@ -40,7 +40,7 @@ public class SelectorImpl implements Selector {
     private final HashSet<Selectable> error = new HashSet<Selectable>();
 
     public SelectorImpl() throws IOException {
-        selector = java.nio.channels.Selector.open();
+        selector = java.nio.channels.Selector.open();   // TODO: need to 
ensure we close this somewhere...
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5748bb98/proton-j/src/test/java/org/apache/qpid/proton/reactor/ReactorTest.java
----------------------------------------------------------------------
diff --git 
a/proton-j/src/test/java/org/apache/qpid/proton/reactor/ReactorTest.java 
b/proton-j/src/test/java/org/apache/qpid/proton/reactor/ReactorTest.java
new file mode 100644
index 0000000..2378d5f
--- /dev/null
+++ b/proton-j/src/test/java/org/apache/qpid/proton/reactor/ReactorTest.java
@@ -0,0 +1,76 @@
+package org.apache.qpid.proton.reactor;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.apache.qpid.proton.Proton;
+import org.apache.qpid.proton.engine.BaseHandler;
+import org.apache.qpid.proton.engine.Event;
+import org.apache.qpid.proton.engine.Event.Type;
+import org.junit.Test;
+
+public class ReactorTest {
+
+    /**
+     * Tests that creating a reactor and running it:
+     * <ul>
+     *   <li>Doesn't throw any exceptions.</li>
+     *   <li>Returns immediately from the run method (as there is no more work 
to do).</li>
+     * </ul>
+     * @throws IOException
+     */
+    @Test
+    public void runEmpty() throws IOException {
+        Reactor reactor = Proton.reactor();
+        assertNotNull(reactor);
+        reactor.run();
+    }
+
+    private static class TestHandler extends BaseHandler {
+        private final ArrayList<Type> actual = new ArrayList<Type>();
+
+        @Override
+        public void onUnhandled(Event event) {
+            actual.add(event.getType());
+        }
+
+        public void assertEvents(Type...expected) {
+            assertArrayEquals(expected, actual.toArray());
+        }
+    }
+
+    /**
+     * Tests basic operation of the Reactor.acceptor method by creating an 
acceptor
+     * which is immediately closed by the reactor.  The expected behaviour is 
for:
+     * <ul>
+     *   <li>The reactor to end immediately (as it has no more work to 
process).</li>
+     *   <li>The handler, associated with the acceptor, to receive init, 
update and
+     *       final events.</li>
+     *   <li>For it's lifetime, the acceptor is one of the reactor's 
children.</li>
+     * </ul>
+     * @throws IOException
+     */
+    @Test
+    public void basicAcceptor() throws IOException {
+        Reactor reactor = Proton.reactor();
+        final Acceptor acceptor = reactor.acceptor("localhost", 0);
+        assertNotNull(acceptor);
+        assertTrue("acceptor should be one of the reactor's children", 
reactor.children().contains(acceptor));
+        TestHandler acceptorHandler = new TestHandler();
+        acceptor.add(acceptorHandler);
+        reactor.getHandler().add(new BaseHandler() {
+            @Override
+            public void onReactorInit(Event event) {
+                acceptor.close();
+            }
+        });
+        reactor.run();
+        acceptorHandler.assertEvents(Type.SELECTABLE_INIT, 
Type.SELECTABLE_UPDATED, Type.SELECTABLE_FINAL);
+        assertFalse("acceptor should have been removed from the reactor's 
children", reactor.children().contains(acceptor));
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5748bb98/proton-j/src/test/java/org/apache/qpid/proton/reactor/impl/AcceptorImplTest.java
----------------------------------------------------------------------
diff --git 
a/proton-j/src/test/java/org/apache/qpid/proton/reactor/impl/AcceptorImplTest.java
 
b/proton-j/src/test/java/org/apache/qpid/proton/reactor/impl/AcceptorImplTest.java
new file mode 100644
index 0000000..44605ae
--- /dev/null
+++ 
b/proton-j/src/test/java/org/apache/qpid/proton/reactor/impl/AcceptorImplTest.java
@@ -0,0 +1,74 @@
+package org.apache.qpid.proton.reactor.impl;
+
+import java.io.IOException;
+import java.nio.channels.ServerSocketChannel;
+
+import org.apache.qpid.proton.engine.Handler;
+import org.apache.qpid.proton.reactor.Reactor;
+import org.apache.qpid.proton.reactor.ReactorChild;
+import org.apache.qpid.proton.reactor.Selectable.Callback;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class AcceptorImplTest {
+
+    /**
+     * Tests that if ServerSocketChannel.accept() throws an IOException the 
Acceptor will
+     * call Selectable.error() on it's underlying selector.
+     * @throws IOException
+     */
+    @Test
+    public void acceptThrowsException() throws IOException {
+        final Callback mockCallback = Mockito.mock(Callback.class);
+        final SelectableImpl selectable = new SelectableImpl();
+        selectable.onError(mockCallback);
+        ReactorImpl mockReactor = Mockito.mock(ReactorImpl.class);
+        
Mockito.when(mockReactor.selectable(Mockito.any(ReactorChild.class))).thenReturn(selectable);
+        class MockAcceptorImpl extends AcceptorImpl {
+
+            protected MockAcceptorImpl(Reactor reactor, String host, int port, 
Handler handler) throws IOException {
+                super(reactor, host, port, handler);
+            }
+
+            @Override
+            protected ServerSocketChannel openServerSocket() throws 
IOException {
+                ServerSocketChannel result = 
Mockito.mock(ServerSocketChannel.class);
+                Mockito.when(result.accept()).thenThrow(new IOException());
+                return result;
+            }
+        }
+        new MockAcceptorImpl(mockReactor, "host", 1234, null);
+        selectable.readable();
+        Mockito.verify(mockCallback).run(selectable);
+    }
+
+    /**
+     * Tests that if ServerSocketChannel.accept() returns <code>null</code> 
the Acceptor will
+     * throw a ReactorInternalException (because the acceptor's underlying 
selectable should
+     * not have been marked as readable, if there is no connection to accept).
+     * @throws IOException
+     */
+    @Test(expected=ReactorInternalException.class)
+    public void acceptReturnsNull() throws IOException {
+        final Callback mockCallback = Mockito.mock(Callback.class);
+        final SelectableImpl selectable = new SelectableImpl();
+        selectable.onError(mockCallback);
+        ReactorImpl mockReactor = Mockito.mock(ReactorImpl.class);
+        
Mockito.when(mockReactor.selectable(Mockito.any(ReactorChild.class))).thenReturn(selectable);
+        class MockAcceptorImpl extends AcceptorImpl {
+
+            protected MockAcceptorImpl(Reactor reactor, String host, int port, 
Handler handler) throws IOException {
+                super(reactor, host, port, handler);
+            }
+
+            @Override
+            protected ServerSocketChannel openServerSocket() throws 
IOException {
+                ServerSocketChannel result = 
Mockito.mock(ServerSocketChannel.class);
+                Mockito.when(result.accept()).thenReturn(null);
+                return result;
+            }
+        }
+        new MockAcceptorImpl(mockReactor, "host", 1234, null);
+        selectable.readable();
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to