PROTON-881: Tidy up TODOs in proton-j reactor code About half the TODOs were me being overly paranoid. The rest generally required some tidy-up. One decision I made was to swallow IOExceptions thrown from closing something - because there was no other cleanup action that could be taken - and rethrowing as an unchecked exception seemed a little heavy-handed.
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/513f1525 Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/513f1525 Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/513f1525 Branch: refs/heads/master Commit: 513f1525608312cbd5cf92d68c93f715189478b9 Parents: e2d2369 Author: Adrian Preston <[email protected]> Authored: Sat Jun 27 20:48:37 2015 +0100 Committer: Rafael Schloming <[email protected]> Committed: Sun Jul 5 19:57:39 2015 -0400 ---------------------------------------------------------------------- .../apache/qpid/proton/reactor/Selectable.java | 1 - .../qpid/proton/reactor/impl/AcceptorImpl.java | 16 +++---- .../qpid/proton/reactor/impl/IOHandler.java | 45 +++++++++++--------- .../qpid/proton/reactor/impl/ReactorImpl.java | 25 +++++------ .../proton/reactor/impl/SelectableImpl.java | 5 +-- .../qpid/proton/reactor/impl/SelectorImpl.java | 6 ++- 6 files changed, 47 insertions(+), 51 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/513f1525/proton-j/src/main/java/org/apache/qpid/proton/reactor/Selectable.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Selectable.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Selectable.java index 73bc3b8..fa459d1 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Selectable.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Selectable.java @@ -25,7 +25,6 @@ import java.nio.channels.SelectableChannel; import org.apache.qpid.proton.engine.Collector; import org.apache.qpid.proton.engine.Extendable; -import org.apache.qpid.proton.engine.Handler; import org.apache.qpid.proton.engine.Transport; public interface Selectable extends ReactorChild, Extendable { http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/513f1525/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/AcceptorImpl.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/AcceptorImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/AcceptorImpl.java index 12006ad..7fe97af 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/AcceptorImpl.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/AcceptorImpl.java @@ -43,6 +43,7 @@ import org.apache.qpid.proton.reactor.Selectable.Callback; public class AcceptorImpl implements Acceptor { private Record attachments = new RecordImpl(); + private final Selectable sel; private class AcceptorReadable implements Callback { @Override @@ -55,19 +56,16 @@ public class AcceptorImpl implements Acceptor { } Handler handler = BaseHandler.getHandler(AcceptorImpl.this); if (handler == null) { - // TODO: set selectable.getAttachment() to null? handler = reactor.getHandler(); } Connection conn = reactor.connection(handler); Transport trans = Proton.transport(); - // TODO: the C code calls pn_transport_set_server(trans) - is there a Java equivalent we need to worry about? Sasl sasl = trans.sasl(); - sasl.server(); // TODO: it would be nice if SASL was more pluggable than this (but this is what the C API currently does...) - //sasl.allowSkip(true); // TODO: this in in the C code - but the proton-j code throws a ProtonUnsupportedOperationException (as it is not implemented) + sasl.server(); sasl.setMechanisms("ANONYMOUS"); sasl.done(SaslOutcome.PN_SASL_OK); trans.bind(conn); - IOHandler.selectableTransport(reactor, socketChannel.socket(), trans); // TODO: could we pass in a channel object instead of doing socketChannel.socket()? + IOHandler.selectableTransport(reactor, socketChannel.socket(), trans); } catch(IOException ioException) { sel.error(); } @@ -82,14 +80,11 @@ public class AcceptorImpl implements Acceptor { selectable.getChannel().close(); } } catch(IOException ioException) { - ioException.printStackTrace(); - // TODO: what now? + // Ignore - as we can't make the channel any more closed... } } } - private final Selectable sel; - protected AcceptorImpl(Reactor reactor, String host, int port, Handler handler) throws IOException { ServerSocketChannel ssc = ((ReactorImpl)reactor).getIO().serverSocketChannel(); ssc.bind(new InetSocketAddress(host, port)); @@ -110,8 +105,7 @@ public class AcceptorImpl implements Acceptor { try { sel.getChannel().close(); } catch(IOException ioException) { - ioException.printStackTrace(); - // TODO: what now? + // Ignore. } sel.setChannel(null); sel.terminate(); http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/513f1525/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOHandler.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOHandler.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOHandler.java index 39d840e..fa807e4 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOHandler.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOHandler.java @@ -95,7 +95,11 @@ public class IOHandler extends BaseHandler { int colonIndex = hostname.indexOf(':'); int port = 5672; if (colonIndex >= 0) { - port = Integer.parseInt(hostname.substring(colonIndex+1)); // TODO: this can throw NumberFormatException on malformed input! + try { + port = Integer.parseInt(hostname.substring(colonIndex+1)); + } catch(NumberFormatException nfe) { + throw new IllegalArgumentException("Not a valid host: " + hostname, nfe); + } hostname = hostname.substring(0, colonIndex); } @@ -160,7 +164,7 @@ public class IOHandler extends BaseHandler { } // pni_connection_readable from connection.c - private static class ConnectionReadable implements Callback { + private static Callback connectionReadable = new Callback() { @Override public void run(Selectable selectable) { Reactor reactor = selectable.getReactor(); @@ -189,10 +193,10 @@ public class IOHandler extends BaseHandler { update(selectable); reactor.update(selectable); } - } + }; // pni_connection_writable from connection.c - private static class ConnectionWritable implements Callback { + private static Callback connectionWritable = new Callback() { @Override public void run(Selectable selectable) { Reactor reactor = selectable.getReactor(); @@ -222,21 +226,20 @@ public class IOHandler extends BaseHandler { reactor.update(selectable); } } - } + }; // pni_connection_error from connection.c - private static class ConnectionError implements Callback { + private static Callback connectionError = new Callback() { @Override public void run(Selectable selectable) { Reactor reactor = selectable.getReactor(); selectable.terminate(); reactor.update(selectable); } - - } + }; // pni_connection_expired from connection.c - private static class ConnectionExpired implements Callback { + private static Callback connectionExpired = new Callback() { @Override public void run(Selectable selectable) { Reactor reactor = selectable.getReactor(); @@ -249,9 +252,9 @@ public class IOHandler extends BaseHandler { selectable.setWriting(p > 0); reactor.update(selectable); } - } + }; - private static class ConnectionFree implements Callback { + private static Callback connectionFree = new Callback() { @Override public void run(Selectable selectable) { Channel channel = selectable.getChannel(); @@ -259,22 +262,22 @@ public class IOHandler extends BaseHandler { try { channel.close(); } catch(IOException ioException) { - throw new RuntimeException(ioException); + // Ignore } } } - } + }; // pn_reactor_selectable_transport // Note the socket argument can, validly be 'null' this is the equivalent of proton-c's PN_INVALID_SOCKET protected static Selectable selectableTransport(Reactor reactor, Socket socket, Transport transport) { Selectable selectable = reactor.selectable(); selectable.setChannel(socket != null ? socket.getChannel() : null); - selectable.onReadable(new ConnectionReadable()); // TODO: *IF* these callbacks are stateless, do we more than one instance of them? - selectable.onWritable(new ConnectionWritable()); - selectable.onError(new ConnectionError()); - selectable.onExpired(new ConnectionExpired()); - selectable.onFree(new ConnectionFree()); + selectable.onReadable(connectionReadable); + selectable.onWritable(connectionWritable); + selectable.onError(connectionError); + selectable.onExpired(connectionExpired); + selectable.onFree(connectionFree); selectable.setTransport(transport); ((TransportImpl)transport).setSelectable(selectable); ((TransportImpl)transport).setReactor(reactor); @@ -335,9 +338,9 @@ public class IOHandler extends BaseHandler { default: break; } - } catch(IOException e) { - e.printStackTrace(); - // TODO: not clear what to do with this! + } catch(IOException ioException) { + // XXX: Might not be the right exception type, but at least the exception isn't being swallowed + throw new ReactorInternalException(ioException); } } } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/513f1525/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java index 94d3595..45f9d4b 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java @@ -264,12 +264,11 @@ public class ReactorImpl implements Reactor, Extendable { yield = false; return true; } - yield = false; // TODO: is this required? Handler handler = eventHandler(event); event.dispatch(handler); event.dispatch(global); - if (event.getType() == Type.CONNECTION_FINAL) { // TODO: this should be the same as the pni_reactor_dispatch_post logic... + if (event.getType() == Type.CONNECTION_FINAL) { children.remove(event.getConnection()); } this.previous = event.getType(); @@ -299,7 +298,7 @@ public class ReactorImpl implements Reactor, Extendable { @Override public void wakeup() throws IOException { - wakeup.sink().write(ByteBuffer.allocate(1)); // TODO: c version returns a value! + wakeup.sink().write(ByteBuffer.allocate(1)); } @Override @@ -341,6 +340,13 @@ public class ReactorImpl implements Reactor, Extendable { return task; } + private void expireSelectable(Selectable selectable) { + ReactorImpl reactor = (ReactorImpl) selectable.getReactor(); + reactor.timer.tick(reactor.now); + selectable.setDeadline(reactor.timer.deadline()); + reactor.update(selectable); + } + private class TimerReadable implements Callback { @Override @@ -350,8 +356,7 @@ public class ReactorImpl implements Reactor, Extendable { } catch (IOException e) { throw new RuntimeException(e); } - // TODO: this could be more elegant... - new TimerExpired().run(selectable); + expireSelectable(selectable); } } @@ -359,10 +364,7 @@ public class ReactorImpl implements Reactor, Extendable { private class TimerExpired implements Callback { @Override public void run(Selectable selectable) { - ReactorImpl reactor = (ReactorImpl) selectable.getReactor(); - reactor.timer.tick(reactor.now); - selectable.setDeadline(reactor.timer.deadline()); - reactor.update(selectable); + expireSelectable(selectable); } } @@ -373,9 +375,8 @@ public class ReactorImpl implements Reactor, Extendable { public void run(Selectable selectable) { try { selectable.getChannel().close(); - } catch(IOException e) { - e.printStackTrace(); - // TODO: what to do here... + } catch(IOException ioException) { + // Ignore } } } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/513f1525/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectableImpl.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectableImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectableImpl.java index 2e053ef..5ab0176 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectableImpl.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectableImpl.java @@ -23,10 +23,8 @@ package org.apache.qpid.proton.reactor.impl; import java.nio.channels.SelectableChannel; -import org.apache.qpid.proton.engine.BaseHandler; import org.apache.qpid.proton.engine.Collector; import org.apache.qpid.proton.engine.Event.Type; -import org.apache.qpid.proton.engine.Handler; import org.apache.qpid.proton.engine.Record; import org.apache.qpid.proton.engine.Transport; import org.apache.qpid.proton.engine.impl.CollectorImpl; @@ -156,7 +154,6 @@ public class SelectableImpl implements Selectable { } } - // These are equivalent to the C code's set/get file descritor functions. @Override public void setChannel(SelectableChannel channel) { this.channel = channel; @@ -208,7 +205,7 @@ public class SelectableImpl implements Selectable { } @Override - public Reactor getReactor() { // TODO: the C version uses set/getContext for this - should we do the same? + public Reactor getReactor() { return reactor; } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/513f1525/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectorImpl.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectorImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectorImpl.java index 5ef74e7..ed4ad69 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectorImpl.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectorImpl.java @@ -99,7 +99,8 @@ class SelectorImpl implements Selector { long now = System.currentTimeMillis(); if (timeout > 0) { long deadline = 0; - for (Selectable selectable : selectables) { // TODO: this differs from the C code which requires a call to update() to make deadline changes take affect + // XXX: Note: this differs from the C code which requires a call to update() to make deadline changes take affect + for (Selectable selectable : selectables) { long d = selectable.getDeadline(); if (d > 0) { deadline = (deadline == 0) ? d : Math.min(deadline, d); @@ -168,7 +169,8 @@ class SelectorImpl implements Selector { if (key.isWritable()) writeable.add(selectable); } selector.selectedKeys().clear(); - for (Selectable selectable : selectables) { // TODO: this is different to the C code which evaluates expiry at the point the selectable is iterated over. + // XXX: Note: this is different to the C code which evaluates expiry at the point the selectable is iterated over. + for (Selectable selectable : selectables) { long deadline = selectable.getDeadline(); if (deadline > 0 && awoken >= deadline) { expired.add(selectable); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
