Author: rhs
Date: Thu Aug 22 16:04:14 2013
New Revision: 1516495

URL: http://svn.apache.org/r1516495
Log:
fixed java driver stall; fixed java messenger to clean up the driver after 
stopping; added useful illegal state exceptions to the java messenger impl

Modified:
    
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/impl/ConnectorImpl.java
    
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java

Modified: 
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/impl/ConnectorImpl.java
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/impl/ConnectorImpl.java?rev=1516495&r1=1516494&r2=1516495&view=diff
==============================================================================
--- 
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/impl/ConnectorImpl.java
 (original)
+++ 
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/impl/ConnectorImpl.java
 Thu Aug 22 16:04:14 2013
@@ -58,6 +58,7 @@ class ConnectorImpl<C> implements Connec
     private boolean _closed = false;
 
     private boolean _selected = false;
+    private boolean _readAllowed = false;
 
     ConnectorImpl(DriverImpl driver, Listener<C> listener, SocketChannel c, C 
context, SelectionKey key)
     {
@@ -73,6 +74,7 @@ class ConnectorImpl<C> implements Connec
         if (!_selected) {
             _selected = true;
             _driver.selectConnector(this);
+            _readAllowed = true;
         }
     }
 
@@ -110,6 +112,8 @@ class ConnectorImpl<C> implements Connec
 
     private boolean read() throws IOException
     {
+        if (!_readAllowed) return false;
+        _readAllowed = false;
         boolean processed = false;
 
         int interest = _key.interestOps();

Modified: 
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java?rev=1516495&r1=1516494&r2=1516495&view=diff
==============================================================================
--- 
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java
 (original)
+++ 
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java
 Thu Aug 22 16:04:14 2013
@@ -128,30 +128,31 @@ public class MessengerImpl implements Me
 
     public void stop()
     {
-        if(_logger.isLoggable(Level.FINE))
-        {
-            _logger.fine(this + " about to stop");
-        }
-        //close all connections
-        for (Connector<?> c : _driver.connectors())
-        {
-            Connection connection = c.getConnection();
-            connection.close();
-        }
-        //stop listeners
-        for (Listener<?> l : _driver.listeners())
-        {
-            try
+        if (_driver != null) {
+            if(_logger.isLoggable(Level.FINE))
             {
-                l.close();
+                _logger.fine(this + " about to stop");
             }
-            catch (IOException e)
+            //close all connections
+            for (Connector<?> c : _driver.connectors())
             {
-                _logger.log(Level.WARNING, "Error while closing listener", e);
+                Connection connection = c.getConnection();
+                connection.close();
             }
+            //stop listeners
+            for (Listener<?> l : _driver.listeners())
+            {
+                try
+                {
+                    l.close();
+                }
+                catch (IOException e)
+                {
+                    _logger.log(Level.WARNING, "Error while closing listener", 
e);
+                }
+            }
+            waitUntil(_allClosed);
         }
-        waitUntil(_allClosed);
-        //_driver.destroy();
     }
 
     public boolean stopped()
@@ -161,17 +162,24 @@ public class MessengerImpl implements Me
 
     public boolean work(long timeout)
     {
+        if (_driver == null) { return false; }
         _worked = false;
         return waitUntil(_workPred, timeout);
     }
 
     public void interrupt()
     {
-        _driver.wakeup();
+        if (_driver != null) {
+            _driver.wakeup();
+        }
     }
 
     public void put(Message m) throws MessengerException
     {
+        if (_driver == null) {
+            throw new IllegalStateException("cannot put while messenger is 
stopped");
+        }
+
         if(_logger.isLoggable(Level.FINE))
         {
             _logger.fine(this + " about to put message: " + m);
@@ -218,6 +226,10 @@ public class MessengerImpl implements Me
 
     public void send(int n) throws TimeoutException
     {
+        if (_driver == null) {
+            throw new IllegalStateException("cannot send while messenger is 
stopped");
+        }
+
         if(_logger.isLoggable(Level.FINE))
         {
             _logger.fine(this + " about to send");
@@ -227,6 +239,10 @@ public class MessengerImpl implements Me
 
     public void recv(int n) throws TimeoutException
     {
+        if (_driver == null) {
+            throw new IllegalStateException("cannot recv while messenger is 
stopped");
+        }
+
         if(_logger.isLoggable(Level.FINE))
         {
             _logger.fine(this + " about to wait for up to " + n + " messages 
to be received");
@@ -250,28 +266,30 @@ public class MessengerImpl implements Me
 
     public Message get()
     {
-        for (Connector<?> c : _driver.connectors())
-        {
-            Connection connection = c.getConnection();
-            _logger.log(Level.FINE, "Attempting to get message from " + 
connection);
-            Delivery delivery = connection.getWorkHead();
-            while (delivery != null)
-            {
-                if (delivery.isReadable() && !delivery.isPartial())
-                {
-                    _logger.log(Level.FINE, "Readable delivery found: " + 
delivery);
-                    int size = read((Receiver) delivery.getLink());
-                    Message message = Proton.message();
-                    message.decode(_buffer, 0, size);
-                    delivery.getLink().advance();
-                    _incoming.add(delivery);
-                    _distributed--;
-                    return message;
-                }
-                else
+        if (_driver != null) {
+            for (Connector<?> c : _driver.connectors())
+            {
+                Connection connection = c.getConnection();
+                _logger.log(Level.FINE, "Attempting to get message from " + 
connection);
+                Delivery delivery = connection.getWorkHead();
+                while (delivery != null)
                 {
-                    _logger.log(Level.FINE, "Delivery not readable: " + 
delivery);
-                    delivery = delivery.getWorkNext();
+                    if (delivery.isReadable() && !delivery.isPartial())
+                    {
+                        _logger.log(Level.FINE, "Readable delivery found: " + 
delivery);
+                        int size = read((Receiver) delivery.getLink());
+                        Message message = Proton.message();
+                        message.decode(_buffer, 0, size);
+                        delivery.getLink().advance();
+                        _incoming.add(delivery);
+                        _distributed--;
+                        return message;
+                    }
+                    else
+                    {
+                        _logger.log(Level.FINE, "Delivery not readable: " + 
delivery);
+                        delivery = delivery.getWorkNext();
+                    }
                 }
             }
         }
@@ -280,6 +298,10 @@ public class MessengerImpl implements Me
 
     public void subscribe(String source) throws MessengerException
     {
+        if (_driver == null) {
+            throw new IllegalStateException("messenger is stopped");
+        }
+
         //the following is not safe or accurate, but it appears '~' is
         //invalid as the start of the hostname and URI can't handle
         //it, so this is a quick hack to avoid rewriting the parsing
@@ -378,18 +400,20 @@ public class MessengerImpl implements Me
     private int queued(boolean outgoing)
     {
         int count = 0;
-        for (Connector<?> c : _driver.connectors())
-        {
-            Connection connection = c.getConnection();
-            for (Link link : new Links(connection, ACTIVE, ANY))
+        if (_driver != null) {
+            for (Connector<?> c : _driver.connectors())
             {
-                if (outgoing)
-                {
-                    if (link instanceof Sender) count += link.getQueued();
-                }
-                else
+                Connection connection = c.getConnection();
+                for (Link link : new Links(connection, ACTIVE, ANY))
                 {
-                    if (link instanceof Receiver) count += link.getQueued();
+                    if (outgoing)
+                    {
+                        if (link instanceof Sender) count += link.getQueued();
+                    }
+                    else
+                    {
+                        if (link instanceof Receiver) count += 
link.getQueued();
+                    }
                 }
             }
         }
@@ -573,6 +597,10 @@ public class MessengerImpl implements Me
 
     private boolean waitUntil(Predicate condition, long timeout)
     {
+        if (_driver == null) {
+            throw new IllegalStateException("cannot wait while messenger is 
stopped");
+        }
+
         processAllConnectors();
 
         // wait until timeout expires or until test is true
@@ -634,6 +662,7 @@ public class MessengerImpl implements Me
         // @todo track the number of opened receive links
         for (Connector<?> c : _driver.connectors())
         {
+            if (c.isClosed()) continue;
             Connection connection = c.getConnection();
             for (Link link : new Links(connection, ACTIVE, ANY))
             {
@@ -655,6 +684,7 @@ public class MessengerImpl implements Me
         int batch = (_credit < linkCt) ? 1 : (_credit/linkCt);
         for (Connector<?> c : _driver.connectors())
         {
+            if (c.isClosed()) continue;
             Connection connection = c.getConnection();
             for (Link link : new Links(connection, ACTIVE, ANY))
             {
@@ -767,11 +797,19 @@ public class MessengerImpl implements Me
     {
         public boolean test()
         {
+            if (_driver == null) {
+                return true;
+            }
+
             for (Connector<?> c : _driver.connectors()) {
                 if (!c.isClosed()) {
                     return false;
                 }
             }
+
+            _driver.destroy();
+            _driver = null;
+
             return true;
         }
     }



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to