PROTON-881: Add code to release resources (e.g. sockets, selectors, etc.) The proton-j codebase does not, generally, implement cleanup logic in the same way as proton-c (explicit reference counting) and for some resources (such as sockets, selectors, etc.) cannot always way for garbage collection to finalize objects. This commit adds a 'free' method to a number of classes. Calling this closes any Java resources held by the class. The reactor also has a 'free' method, which frees both the reactor and any children that the reactor has.
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/d6c4ba7b Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/d6c4ba7b Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/d6c4ba7b Branch: refs/heads/master Commit: d6c4ba7bb18d3b8ce02a8dcd52c0f1e569694bb7 Parents: e9d4a78 Author: Adrian Preston <prest...@uk.ibm.com> Authored: Mon May 4 21:57:53 2015 +0100 Committer: Adrian Preston <prest...@uk.ibm.com> Committed: Wed May 6 23:24:54 2015 +0100 ---------------------------------------------------------------------- .../org/apache/qpid/proton/reactor/Reactor.java | 1 + .../qpid/proton/reactor/ReactorChild.java | 3 +- .../apache/qpid/proton/reactor/Selectable.java | 4 +- .../apache/qpid/proton/reactor/Selector.java | 18 +++---- .../qpid/proton/reactor/impl/AcceptorImpl.java | 13 +++-- .../qpid/proton/reactor/impl/IOHandler.java | 4 +- .../qpid/proton/reactor/impl/ReactorImpl.java | 54 +++++++++----------- .../proton/reactor/impl/SelectableImpl.java | 12 ++--- .../qpid/proton/reactor/impl/SelectorImpl.java | 8 +++ .../apache/qpid/proton/reactor/ReactorTest.java | 28 ++++++++++ .../proton/reactor/impl/AcceptorImplTest.java | 21 ++++++++ 11 files changed, 114 insertions(+), 52 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d6c4ba7b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Reactor.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Reactor.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Reactor.java index 935523a..e658057 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Reactor.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Reactor.java @@ -99,5 +99,6 @@ public interface Reactor { Acceptor acceptor(String host, int port) throws IOException; Acceptor acceptor(String host, int port, Handler handler) throws IOException; + // This also frees any children that the reactor has! public void free(); } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d6c4ba7b/proton-j/src/main/java/org/apache/qpid/proton/reactor/ReactorChild.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/ReactorChild.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/ReactorChild.java index d020d1a..c39bdd9 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/ReactorChild.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/ReactorChild.java @@ -21,7 +21,8 @@ package org.apache.qpid.proton.reactor; -// Tagging interface used to identify classes that can be a child of a reactor. +// Interface used to identify classes that can be a child of a reactor. public interface ReactorChild { + void free(); } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d6c4ba7b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Selectable.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Selectable.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Selectable.java index 390685f..1bdaa1e 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Selectable.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Selectable.java @@ -55,7 +55,7 @@ public interface Selectable extends ReactorChild { public void onRelease(Callback runnable); - public void onFinalize(Callback runnable); + public void onFree(Callback runnable); void readable() ; @@ -67,7 +67,7 @@ public interface Selectable extends ReactorChild { void release() ; - void _finalize() ; + void free() ; // These are equivalent to the C code's set/get file descritor functions. void setChannel(SelectableChannel channel) ; http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d6c4ba7b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Selector.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Selector.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Selector.java index 12188e2..592e32a 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Selector.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Selector.java @@ -26,18 +26,18 @@ import java.util.Iterator; public interface Selector { - public void add(Selectable selectable) throws IOException ; + void add(Selectable selectable) throws IOException ; - public void update(Selectable selectable); + void update(Selectable selectable); - public void remove(Selectable selectable) ; + void remove(Selectable selectable) ; - public void select(long timeout) throws IOException ; + void select(long timeout) throws IOException ; - public Iterator<Selectable> readable() ; + Iterator<Selectable> readable() ; + Iterator<Selectable> writeable() ; + Iterator<Selectable> expired() ; + Iterator<Selectable> error() ; - public Iterator<Selectable> writeable() ; - - public Iterator<Selectable> expired() ; - public Iterator<Selectable> error() ; + void free(); } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d6c4ba7b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/AcceptorImpl.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/AcceptorImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/AcceptorImpl.java index fb48df6..38d5416 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/AcceptorImpl.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/AcceptorImpl.java @@ -69,11 +69,13 @@ public class AcceptorImpl implements Acceptor { } } - private class AcceptorFinalize implements Callback { + private class AcceptorFree implements Callback { @Override public void run(Selectable selectable) { try { - selectable.getChannel().close(); + if (selectable.getChannel() != null) { + selectable.getChannel().close(); + } } catch(IOException ioException) { ioException.printStackTrace(); // TODO: what now? @@ -95,7 +97,7 @@ public class AcceptorImpl implements Acceptor { sel = ((ReactorImpl)reactor).selectable(this); sel.setChannel(ssc); sel.onReadable(new AcceptorReadable()); - sel.onFinalize(new AcceptorFinalize()); // TODO: currently, this is not called from anywhere!! + sel.onFree(new AcceptorFree()); // TODO: currently, this is not called from anywhere!! sel.setReactor(reactor); sel.setAttachment(handler); sel.setReading(true); @@ -128,4 +130,9 @@ public class AcceptorImpl implements Acceptor { ServerSocketChannel ssc = (ServerSocketChannel)sel.getChannel(); return ((InetSocketAddress)ssc.getLocalAddress()).getPort(); } + + @Override + public void free() { + sel.free(); + } } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d6c4ba7b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOHandler.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOHandler.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOHandler.java index a182722..6199c56 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOHandler.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOHandler.java @@ -251,7 +251,7 @@ public class IOHandler extends BaseHandler { } } - private static class ConnectionFinalize implements Callback { + private static class ConnectionFree implements Callback { @Override public void run(Selectable selectable) { try { @@ -272,7 +272,7 @@ public class IOHandler extends BaseHandler { selectable.onWritable(new ConnectionWritable()); selectable.onError(new ConnectionError()); selectable.onExpired(new ConnectionExpired()); - selectable.onFinalize(new ConnectionFinalize()); // TODO: the corresponding selectable._finalize method is never called anywhere in the C codebase! + selectable.onFree(new ConnectionFree()); selectable.setTransport(transport); ((TransportImpl)transport).setSelectable(selectable); ((TransportImpl)transport).setReactor(reactor); http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d6c4ba7b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java index e25e813..cde5f42 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java @@ -120,33 +120,28 @@ public class ReactorImpl implements Reactor { @Override public void free() { - // TODO -/* - 132 void pn_reactor_free(pn_reactor_t *reactor) { - 133 if (reactor) { - 134 pn_collector_release(reactor->collector); - 135 pn_handler_free(reactor->handler); - 136 reactor->handler = NULL; - 137 pn_decref(reactor); - 138 } - 139 } - */ - /* - 85 static void pn_reactor_finalize(pn_reactor_t *reactor) { - 86 for (int i = 0; i < 2; i++) { - 87 if (reactor->wakeup[i] != PN_INVALID_SOCKET) { - 88 pn_close(reactor->io, reactor->wakeup[i]); - 89 } - 90 } - 91 pn_decref(reactor->attachments); - 92 pn_decref(reactor->collector); - 93 pn_decref(reactor->global); - 94 pn_decref(reactor->handler); - 95 pn_decref(reactor->children); - 96 pn_decref(reactor->timer); - 97 pn_decref(reactor->io); - 98 } - */ + if (wakeup.source().isOpen()) { + try { + wakeup.source().close(); + } catch(IOException e) { + // Ignore. + } + } + if (wakeup.sink().isOpen()) { + try { + wakeup.sink().close(); + } catch(IOException e) { + // Ignore + } + } + + if (selector != null) { + selector.free(); + } + + for (ReactorChild child : children) { + child.free(); + } } @Override @@ -219,6 +214,7 @@ public class ReactorImpl implements Reactor { public void run(Selectable selectable) { if (reactor.children.remove(child)) { --reactor.selectables; + child.free(); } } } @@ -430,7 +426,7 @@ public class ReactorImpl implements Reactor { // pni_timer_finalize from reactor.c - private class TimerFinalize implements Callback { + private class TimerFree implements Callback { @Override public void run(Selectable selectable) { try { @@ -447,7 +443,7 @@ public class ReactorImpl implements Reactor { sel.setChannel(wakeup.source()); sel.onReadable(new TimerReadable()); sel.onExpired(new TimerExpired()); - sel.onFinalize(new TimerFinalize()); // TODO: not sure the corresponding sel._finalize() gets called anywhere... + sel.onFree(new TimerFree()); sel.setReading(true); sel.setDeadline(timer.deadline()); update(sel); http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d6c4ba7b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectableImpl.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectableImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectableImpl.java index 2ddb372..570851e 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectableImpl.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectableImpl.java @@ -39,7 +39,7 @@ public class SelectableImpl implements Selectable { private Callback error; private Callback expire; private Callback release; - private Callback finalize; + private Callback free; private boolean reading = false; private boolean writing = false; @@ -108,8 +108,8 @@ public class SelectableImpl implements Selectable { } @Override - public void onFinalize(Callback runnable) { - this.finalize = runnable; + public void onFree(Callback runnable) { + this.free = runnable; } @Override @@ -148,9 +148,9 @@ public class SelectableImpl implements Selectable { } @Override - public void _finalize() { - if (finalize != null) { - finalize.run(this); + public void free() { + if (free != null) { + free.run(this); } } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d6c4ba7b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectorImpl.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectorImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectorImpl.java index a53be7d..cf50456 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectorImpl.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectorImpl.java @@ -150,4 +150,12 @@ public class SelectorImpl implements Selector { return error.iterator(); } + @Override + public void free() { + try { + selector.close(); + } catch(IOException ioException) { + // Ignore + } + } } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d6c4ba7b/proton-j/src/test/java/org/apache/qpid/proton/reactor/ReactorTest.java ---------------------------------------------------------------------- diff --git a/proton-j/src/test/java/org/apache/qpid/proton/reactor/ReactorTest.java b/proton-j/src/test/java/org/apache/qpid/proton/reactor/ReactorTest.java index 0bcfd80..4b1dba0 100644 --- a/proton-j/src/test/java/org/apache/qpid/proton/reactor/ReactorTest.java +++ b/proton-j/src/test/java/org/apache/qpid/proton/reactor/ReactorTest.java @@ -1,3 +1,24 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + package org.apache.qpid.proton.reactor; import static org.junit.Assert.assertArrayEquals; @@ -36,6 +57,7 @@ public class ReactorTest { Reactor reactor = Proton.reactor(); assertNotNull(reactor); reactor.run(); + reactor.free(); } private static class TestHandler extends BaseHandler { @@ -65,6 +87,7 @@ public class ReactorTest { TestHandler testHandler = new TestHandler(); handler.add(testHandler); reactor.run(); + reactor.free(); testHandler.assertEvents(Type.REACTOR_INIT, Type.SELECTABLE_INIT, Type.SELECTABLE_UPDATED, Type.SELECTABLE_FINAL, Type.REACTOR_FINAL); } @@ -89,6 +112,7 @@ public class ReactorTest { TestHandler reactorHandler = new TestHandler(); reactor.getHandler().add(reactorHandler); reactor.run(); + reactor.free(); reactorHandler.assertEvents(Type.REACTOR_INIT, Type.SELECTABLE_INIT, Type.SELECTABLE_UPDATED, Type.SELECTABLE_FINAL, Type.REACTOR_FINAL); connectionHandler.assertEvents(Type.CONNECTION_INIT); } @@ -119,6 +143,7 @@ public class ReactorTest { } }); reactor.run(); + reactor.free(); acceptorHandler.assertEvents(Type.SELECTABLE_INIT, Type.SELECTABLE_UPDATED, Type.SELECTABLE_FINAL); assertFalse("acceptor should have been removed from the reactor's children", reactor.children().contains(acceptor)); } @@ -184,6 +209,7 @@ public class ReactorTest { assertTrue("connection should be one of the reactor's children", reactor.children().contains(connection)); reactor.run(); + reactor.free(); assertFalse("acceptor should have been removed from the reactor's children", reactor.children().contains(acceptor)); assertFalse("connection should have been removed from the reactor's children", reactor.children().contains(connection)); @@ -278,6 +304,7 @@ public class ReactorTest { reactor.connection(src); reactor.run(); + reactor.free(); assertEquals("Did not receive the expected number of messages", count, snk.received); } @@ -306,6 +333,7 @@ public class ReactorTest { TestHandler taskHandler = new TestHandler(); reactor.schedule(0, taskHandler); reactor.run(); + reactor.free(); reactorHandler.assertEvents(Type.REACTOR_INIT, Type.SELECTABLE_INIT, Type.SELECTABLE_UPDATED, Type.REACTOR_QUIESCED, Type.SELECTABLE_UPDATED, Type.SELECTABLE_FINAL, Type.REACTOR_FINAL); taskHandler.assertEvents(Type.TIMER_TASK); http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d6c4ba7b/proton-j/src/test/java/org/apache/qpid/proton/reactor/impl/AcceptorImplTest.java ---------------------------------------------------------------------- diff --git a/proton-j/src/test/java/org/apache/qpid/proton/reactor/impl/AcceptorImplTest.java b/proton-j/src/test/java/org/apache/qpid/proton/reactor/impl/AcceptorImplTest.java index 44605ae..85750ea 100644 --- a/proton-j/src/test/java/org/apache/qpid/proton/reactor/impl/AcceptorImplTest.java +++ b/proton-j/src/test/java/org/apache/qpid/proton/reactor/impl/AcceptorImplTest.java @@ -1,3 +1,24 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + package org.apache.qpid.proton.reactor.impl; import java.io.IOException; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org