PROTON-881: Add code to release resources (e.g. sockets, selectors, etc.)

The proton-j codebase does not, generally, implement cleanup logic in
the same way as proton-c (explicit reference counting) and for some
resources (such as sockets, selectors, etc.) cannot always wait for
garbage collection to finalize objects.  This commit adds a 'free'
method to a number of classes.  Calling this closes any Java resources
held by the class.  The reactor also has a 'free' method, which frees
both the reactor and any children that the reactor has.


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/d6c4ba7b
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/d6c4ba7b
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/d6c4ba7b

Branch: refs/heads/master
Commit: d6c4ba7bb18d3b8ce02a8dcd52c0f1e569694bb7
Parents: e9d4a78
Author: Adrian Preston <prest...@uk.ibm.com>
Authored: Mon May 4 21:57:53 2015 +0100
Committer: Adrian Preston <prest...@uk.ibm.com>
Committed: Wed May 6 23:24:54 2015 +0100

----------------------------------------------------------------------
 .../org/apache/qpid/proton/reactor/Reactor.java |  1 +
 .../qpid/proton/reactor/ReactorChild.java       |  3 +-
 .../apache/qpid/proton/reactor/Selectable.java  |  4 +-
 .../apache/qpid/proton/reactor/Selector.java    | 18 +++----
 .../qpid/proton/reactor/impl/AcceptorImpl.java  | 13 +++--
 .../qpid/proton/reactor/impl/IOHandler.java     |  4 +-
 .../qpid/proton/reactor/impl/ReactorImpl.java   | 54 +++++++++-----------
 .../proton/reactor/impl/SelectableImpl.java     | 12 ++---
 .../qpid/proton/reactor/impl/SelectorImpl.java  |  8 +++
 .../apache/qpid/proton/reactor/ReactorTest.java | 28 ++++++++++
 .../proton/reactor/impl/AcceptorImplTest.java   | 21 ++++++++
 11 files changed, 114 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d6c4ba7b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Reactor.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Reactor.java 
b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Reactor.java
index 935523a..e658057 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Reactor.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Reactor.java
@@ -99,5 +99,6 @@ public interface Reactor {
     Acceptor acceptor(String host, int port) throws IOException;
     Acceptor acceptor(String host, int port, Handler handler) throws 
IOException;
 
+    // This also frees any children that the reactor has!
     public void free();
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d6c4ba7b/proton-j/src/main/java/org/apache/qpid/proton/reactor/ReactorChild.java
----------------------------------------------------------------------
diff --git 
a/proton-j/src/main/java/org/apache/qpid/proton/reactor/ReactorChild.java 
b/proton-j/src/main/java/org/apache/qpid/proton/reactor/ReactorChild.java
index d020d1a..c39bdd9 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/ReactorChild.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/ReactorChild.java
@@ -21,7 +21,8 @@
 
 package org.apache.qpid.proton.reactor;
 
-// Tagging interface used to identify classes that can be a child of a reactor.
+// Interface used to identify classes that can be a child of a reactor.
 public interface ReactorChild {
 
+    void free();
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d6c4ba7b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Selectable.java
----------------------------------------------------------------------
diff --git 
a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Selectable.java 
b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Selectable.java
index 390685f..1bdaa1e 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Selectable.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Selectable.java
@@ -55,7 +55,7 @@ public interface Selectable extends ReactorChild {
 
     public void onRelease(Callback runnable);
 
-    public void onFinalize(Callback runnable);
+    public void onFree(Callback runnable);
 
     void readable() ;
 
@@ -67,7 +67,7 @@ public interface Selectable extends ReactorChild {
 
     void release() ;
 
-    void _finalize() ;
+    void free() ;
 
     // These are equivalent to the C code's set/get file descritor functions.
     void setChannel(SelectableChannel channel) ;

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d6c4ba7b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Selector.java
----------------------------------------------------------------------
diff --git 
a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Selector.java 
b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Selector.java
index 12188e2..592e32a 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Selector.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Selector.java
@@ -26,18 +26,18 @@ import java.util.Iterator;
 
 public interface Selector {
 
-    public void add(Selectable selectable) throws IOException ;
+    void add(Selectable selectable) throws IOException ;
 
-    public void update(Selectable selectable);
+    void update(Selectable selectable);
 
-    public void remove(Selectable selectable) ;
+    void remove(Selectable selectable) ;
 
-    public void select(long timeout) throws IOException ;
+    void select(long timeout) throws IOException ;
 
-    public Iterator<Selectable> readable() ;
+    Iterator<Selectable> readable() ;
+    Iterator<Selectable> writeable() ;
+    Iterator<Selectable> expired() ;
+    Iterator<Selectable> error() ;
 
-    public Iterator<Selectable> writeable() ;
-
-    public Iterator<Selectable> expired() ;
-    public Iterator<Selectable> error() ;
+    void free();
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d6c4ba7b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/AcceptorImpl.java
----------------------------------------------------------------------
diff --git 
a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/AcceptorImpl.java 
b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/AcceptorImpl.java
index fb48df6..38d5416 100644
--- 
a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/AcceptorImpl.java
+++ 
b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/AcceptorImpl.java
@@ -69,11 +69,13 @@ public class AcceptorImpl implements Acceptor {
         }
     }
 
-    private class AcceptorFinalize implements Callback {
+    private class AcceptorFree implements Callback {
         @Override
         public void run(Selectable selectable) {
             try {
-                selectable.getChannel().close();
+                if (selectable.getChannel() != null) {
+                    selectable.getChannel().close();
+                }
             } catch(IOException ioException) {
                 ioException.printStackTrace();
                 // TODO: what now?
@@ -95,7 +97,7 @@ public class AcceptorImpl implements Acceptor {
         sel = ((ReactorImpl)reactor).selectable(this);
         sel.setChannel(ssc);
         sel.onReadable(new AcceptorReadable());
-        sel.onFinalize(new AcceptorFinalize()); // TODO: currently, this is 
not called from anywhere!!
+        sel.onFree(new AcceptorFree()); // TODO: currently, this is not called 
from anywhere!!
         sel.setReactor(reactor);
         sel.setAttachment(handler);
         sel.setReading(true);
@@ -128,4 +130,9 @@ public class AcceptorImpl implements Acceptor {
         ServerSocketChannel ssc = (ServerSocketChannel)sel.getChannel();
         return ((InetSocketAddress)ssc.getLocalAddress()).getPort();
     }
+
+    @Override
+    public void free() {
+        sel.free();
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d6c4ba7b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOHandler.java
----------------------------------------------------------------------
diff --git 
a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOHandler.java 
b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOHandler.java
index a182722..6199c56 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOHandler.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOHandler.java
@@ -251,7 +251,7 @@ public class IOHandler extends BaseHandler {
         }
     }
 
-    private static class ConnectionFinalize implements Callback {
+    private static class ConnectionFree implements Callback {
         @Override
         public void run(Selectable selectable) {
             try {
@@ -272,7 +272,7 @@ public class IOHandler extends BaseHandler {
         selectable.onWritable(new ConnectionWritable());
         selectable.onError(new ConnectionError());
         selectable.onExpired(new ConnectionExpired());
-        selectable.onFinalize(new ConnectionFinalize());    // TODO: the 
corresponding selectable._finalize method is never called anywhere in the C 
codebase!
+        selectable.onFree(new ConnectionFree());
         selectable.setTransport(transport);
         ((TransportImpl)transport).setSelectable(selectable);
         ((TransportImpl)transport).setReactor(reactor);

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d6c4ba7b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java
----------------------------------------------------------------------
diff --git 
a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java 
b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java
index e25e813..cde5f42 100644
--- 
a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java
+++ 
b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java
@@ -120,33 +120,28 @@ public class ReactorImpl implements Reactor {
 
     @Override
     public void free() {
-        // TODO
-/*
-  132 void pn_reactor_free(pn_reactor_t *reactor) {
-  133   if (reactor) {
-  134     pn_collector_release(reactor->collector);
-  135     pn_handler_free(reactor->handler);
-  136     reactor->handler = NULL;
-  137     pn_decref(reactor);
-  138   }
-  139 }
- */
-    /*
- 85 static void pn_reactor_finalize(pn_reactor_t *reactor) {
- 86   for (int i = 0; i < 2; i++) {
- 87     if (reactor->wakeup[i] != PN_INVALID_SOCKET) {
- 88       pn_close(reactor->io, reactor->wakeup[i]);
- 89     }
- 90   }
- 91   pn_decref(reactor->attachments);
- 92   pn_decref(reactor->collector);
- 93   pn_decref(reactor->global);
- 94   pn_decref(reactor->handler);
- 95   pn_decref(reactor->children);
- 96   pn_decref(reactor->timer);
- 97   pn_decref(reactor->io);
- 98 }
- */
+        if (wakeup.source().isOpen()) {
+            try {
+                wakeup.source().close();
+            } catch(IOException e) {
+                // Ignore.
+            }
+        }
+        if (wakeup.sink().isOpen()) {
+            try {
+                wakeup.sink().close();
+            } catch(IOException e) {
+                // Ignore
+            }
+        }
+
+        if (selector != null) {
+            selector.free();
+        }
+
+        for (ReactorChild child : children) {
+            child.free();
+        }
     }
 
     @Override
@@ -219,6 +214,7 @@ public class ReactorImpl implements Reactor {
         public void run(Selectable selectable) {
             if (reactor.children.remove(child)) {
                 --reactor.selectables;
+                child.free();
             }
         }
     }
@@ -430,7 +426,7 @@ public class ReactorImpl implements Reactor {
 
 
     // pni_timer_finalize from reactor.c
-    private class TimerFinalize implements Callback {
+    private class TimerFree implements Callback {
         @Override
         public void run(Selectable selectable) {
             try {
@@ -447,7 +443,7 @@ public class ReactorImpl implements Reactor {
         sel.setChannel(wakeup.source());
         sel.onReadable(new TimerReadable());
         sel.onExpired(new TimerExpired());
-        sel.onFinalize(new TimerFinalize());    // TODO: not sure the 
corresponding sel._finalize() gets called anywhere...
+        sel.onFree(new TimerFree());
         sel.setReading(true);
         sel.setDeadline(timer.deadline());
         update(sel);

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d6c4ba7b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectableImpl.java
----------------------------------------------------------------------
diff --git 
a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectableImpl.java
 
b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectableImpl.java
index 2ddb372..570851e 100644
--- 
a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectableImpl.java
+++ 
b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectableImpl.java
@@ -39,7 +39,7 @@ public class SelectableImpl implements Selectable {
     private Callback error;
     private Callback expire;
     private Callback release;
-    private Callback finalize;
+    private Callback free;
 
     private boolean reading = false;
     private boolean writing = false;
@@ -108,8 +108,8 @@ public class SelectableImpl implements Selectable {
     }
 
     @Override
-    public void onFinalize(Callback runnable) {
-        this.finalize = runnable;
+    public void onFree(Callback runnable) {
+        this.free = runnable;
     }
 
     @Override
@@ -148,9 +148,9 @@ public class SelectableImpl implements Selectable {
     }
 
     @Override
-    public void _finalize() {
-        if (finalize != null) {
-            finalize.run(this);
+    public void free() {
+        if (free != null) {
+            free.run(this);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d6c4ba7b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectorImpl.java
----------------------------------------------------------------------
diff --git 
a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectorImpl.java 
b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectorImpl.java
index a53be7d..cf50456 100644
--- 
a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectorImpl.java
+++ 
b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectorImpl.java
@@ -150,4 +150,12 @@ public class SelectorImpl implements Selector {
         return error.iterator();
     }
 
+    @Override
+    public void free() {
+        try {
+            selector.close();
+        } catch(IOException ioException) {
+            // Ignore
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d6c4ba7b/proton-j/src/test/java/org/apache/qpid/proton/reactor/ReactorTest.java
----------------------------------------------------------------------
diff --git 
a/proton-j/src/test/java/org/apache/qpid/proton/reactor/ReactorTest.java 
b/proton-j/src/test/java/org/apache/qpid/proton/reactor/ReactorTest.java
index 0bcfd80..4b1dba0 100644
--- a/proton-j/src/test/java/org/apache/qpid/proton/reactor/ReactorTest.java
+++ b/proton-j/src/test/java/org/apache/qpid/proton/reactor/ReactorTest.java
@@ -1,3 +1,24 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
 package org.apache.qpid.proton.reactor;
 
 import static org.junit.Assert.assertArrayEquals;
@@ -36,6 +57,7 @@ public class ReactorTest {
         Reactor reactor = Proton.reactor();
         assertNotNull(reactor);
         reactor.run();
+        reactor.free();
     }
 
     private static class TestHandler extends BaseHandler {
@@ -65,6 +87,7 @@ public class ReactorTest {
         TestHandler testHandler = new TestHandler();
         handler.add(testHandler);
         reactor.run();
+        reactor.free();
         testHandler.assertEvents(Type.REACTOR_INIT, Type.SELECTABLE_INIT, 
Type.SELECTABLE_UPDATED, Type.SELECTABLE_FINAL, Type.REACTOR_FINAL);
     }
 
@@ -89,6 +112,7 @@ public class ReactorTest {
         TestHandler reactorHandler = new TestHandler();
         reactor.getHandler().add(reactorHandler);
         reactor.run();
+        reactor.free();
         reactorHandler.assertEvents(Type.REACTOR_INIT, Type.SELECTABLE_INIT, 
Type.SELECTABLE_UPDATED, Type.SELECTABLE_FINAL, Type.REACTOR_FINAL);
         connectionHandler.assertEvents(Type.CONNECTION_INIT);
     }
@@ -119,6 +143,7 @@ public class ReactorTest {
             }
         });
         reactor.run();
+        reactor.free();
         acceptorHandler.assertEvents(Type.SELECTABLE_INIT, 
Type.SELECTABLE_UPDATED, Type.SELECTABLE_FINAL);
         assertFalse("acceptor should have been removed from the reactor's 
children", reactor.children().contains(acceptor));
     }
@@ -184,6 +209,7 @@ public class ReactorTest {
         assertTrue("connection should be one of the reactor's children", 
reactor.children().contains(connection));
 
         reactor.run();
+        reactor.free();
 
         assertFalse("acceptor should have been removed from the reactor's 
children", reactor.children().contains(acceptor));
         assertFalse("connection should have been removed from the reactor's 
children", reactor.children().contains(connection));
@@ -278,6 +304,7 @@ public class ReactorTest {
         reactor.connection(src);
 
         reactor.run();
+        reactor.free();
         assertEquals("Did not receive the expected number of messages", count, 
snk.received);
     }
 
@@ -306,6 +333,7 @@ public class ReactorTest {
         TestHandler taskHandler = new TestHandler();
         reactor.schedule(0, taskHandler);
         reactor.run();
+        reactor.free();
         reactorHandler.assertEvents(Type.REACTOR_INIT, Type.SELECTABLE_INIT, 
Type.SELECTABLE_UPDATED, Type.REACTOR_QUIESCED, Type.SELECTABLE_UPDATED,
                 Type.SELECTABLE_FINAL, Type.REACTOR_FINAL);
         taskHandler.assertEvents(Type.TIMER_TASK);

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d6c4ba7b/proton-j/src/test/java/org/apache/qpid/proton/reactor/impl/AcceptorImplTest.java
----------------------------------------------------------------------
diff --git 
a/proton-j/src/test/java/org/apache/qpid/proton/reactor/impl/AcceptorImplTest.java
 
b/proton-j/src/test/java/org/apache/qpid/proton/reactor/impl/AcceptorImplTest.java
index 44605ae..85750ea 100644
--- 
a/proton-j/src/test/java/org/apache/qpid/proton/reactor/impl/AcceptorImplTest.java
+++ 
b/proton-j/src/test/java/org/apache/qpid/proton/reactor/impl/AcceptorImplTest.java
@@ -1,3 +1,24 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
 package org.apache.qpid.proton.reactor.impl;
 
 import java.io.IOException;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to