Author: rhs
Date: Thu Aug 15 13:17:42 2013
New Revision: 1514254

URL: http://svn.apache.org/r1514254
Log:
PROTON-393: fixed recv to not error in non blocking mode; fixed python binding 
to use seconds (in floats) rather than milliseconds; fixed a stall in java 
messenger

Modified:
    qpid/proton/trunk/proton-c/bindings/python/proton.py
    qpid/proton/trunk/proton-c/src/messenger/messenger.c
    qpid/proton/trunk/proton-j/proton-api/src/main/resources/proton.py
    
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/impl/ConnectorImpl.java
    
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java
    qpid/proton/trunk/tests/python/proton_tests/messenger.py

Modified: qpid/proton/trunk/proton-c/bindings/python/proton.py
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/bindings/python/proton.py?rev=1514254&r1=1514253&r2=1514254&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/bindings/python/proton.py (original)
+++ qpid/proton/trunk/proton-c/bindings/python/proton.py Thu Aug 15 13:17:42 
2013
@@ -304,10 +304,18 @@ will not be verified.
 """)
 
   def _get_timeout(self):
-    return pn_messenger_get_timeout(self._mng)
+    t = pn_messenger_get_timeout(self._mng)
+    if t == -1:
+      return None
+    else:
+      return float(t)/1000
 
   def _set_timeout(self, value):
-    self._check(pn_messenger_set_timeout(self._mng, value))
+    if value is None:
+      t = -1
+    else:
+      t = long(1000*value)
+    self._check(pn_messenger_set_timeout(self._mng, t))
 
   timeout = property(_get_timeout, _set_timeout,
                      doc="""
@@ -450,8 +458,12 @@ send. Defaults to zero.
       n = -1
     self._check(pn_messenger_recv(self._mng, n))
 
-  def work(self, timeout=-1):
-    err = pn_messenger_work(self._mng, timeout)
+  def work(self, timeout=None):
+    if timeout is None:
+      t = -1
+    else:
+      t = long(1000*timeout)
+    err = pn_messenger_work(self._mng, t)
     if (err == PN_TIMEOUT):
       return False
     else:

Modified: qpid/proton/trunk/proton-c/src/messenger/messenger.c
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/messenger/messenger.c?rev=1514254&r1=1514253&r2=1514254&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/messenger/messenger.c (original)
+++ qpid/proton/trunk/proton-c/src/messenger/messenger.c Thu Aug 15 13:17:42 
2013
@@ -1280,13 +1280,15 @@ int pn_messenger_send(pn_messenger_t *me
 int pn_messenger_recv(pn_messenger_t *messenger, int n)
 {
   if (!messenger) return PN_ARG_ERR;
-  if (!pn_listener_head(messenger->driver) && 
!pn_connector_head(messenger->driver))
+  if (messenger->blocking && !pn_listener_head(messenger->driver)
+      && !pn_connector_head(messenger->driver))
     return pn_error_format(messenger->error, PN_STATE_ERR, "no valid sources");
   messenger->receiving = n;
   pn_messenger_flow(messenger);
   int err = pn_messenger_sync(messenger, pn_messenger_rcvd);
   if (err) return err;
   if (!pn_messenger_incoming(messenger) &&
+      messenger->blocking &&
       !pn_listener_head(messenger->driver) &&
       !pn_connector_head(messenger->driver)) {
     return pn_error_format(messenger->error, PN_STATE_ERR, "no valid sources");

Modified: qpid/proton/trunk/proton-j/proton-api/src/main/resources/proton.py
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton-api/src/main/resources/proton.py?rev=1514254&r1=1514253&r2=1514254&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/proton-api/src/main/resources/proton.py 
(original)
+++ qpid/proton/trunk/proton-j/proton-api/src/main/resources/proton.py Thu Aug 
15 13:17:42 2013
@@ -1318,7 +1318,11 @@ class Messenger(object):
   def recv(self, n=-1):
     self.impl.recv(n)
 
-  def work(self, t):
+  def work(self, timeout=None):
+    if timeout is None:
+      t = -1
+    else:
+      t = long(1000*timeout)
     return self.impl.work(t)
 
   def interrupt(self):
@@ -1339,8 +1343,16 @@ class Messenger(object):
     return self.impl.incoming()
 
   def _get_timeout(self):
-    return self.impl.getTimeout()
-  def _set_timeout(self, t):
+    t = self.impl.getTimeout()
+    if t == -1:
+      return None
+    else:
+      return float(t)/1000
+  def _set_timeout(self, timeout):
+    if timeout is None:
+      t = -1
+    else:
+      t = long(1000*timeout)
     self.impl.setTimeout(t)
   timeout = property(_get_timeout, _set_timeout)
 

Modified: 
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/impl/ConnectorImpl.java
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/impl/ConnectorImpl.java?rev=1514254&r1=1514253&r2=1514254&view=diff
==============================================================================
--- 
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/impl/ConnectorImpl.java
 (original)
+++ 
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/impl/ConnectorImpl.java
 Thu Aug 15 13:17:42 2013
@@ -53,7 +53,6 @@ class ConnectorImpl<C> implements Connec
     private SelectionKey _key;
     private ConnectorState _state = UNINITIALIZED;
 
-    private boolean _readPending = true;
     private boolean _inputDone = false;
     private boolean _outputDone = false;
     private boolean _closed = false;
@@ -75,8 +74,6 @@ class ConnectorImpl<C> implements Connec
             _selected = true;
             _driver.selectConnector(this);
         }
-
-        _readPending = true;
     }
 
     void unselected()
@@ -91,12 +88,8 @@ class ConnectorImpl<C> implements Connec
         boolean processed = false;
         if (!_inputDone)
         {
-            if (_readPending)
-            {
-                if (read()) {
-                    processed = true;
-                }
-                _readPending = false;
+            if (read()) {
+                processed = true;
             }
         }
 

Modified: 
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java?rev=1514254&r1=1514253&r2=1514254&view=diff
==============================================================================
--- 
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java
 (original)
+++ 
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java
 Thu Aug 15 13:17:42 2013
@@ -433,6 +433,7 @@ public class MessengerImpl implements Me
             {
                 _logger.log(Level.SEVERE, "Error processing connection", e);
             }
+            processEndpoints(c);
         }
     }
 
@@ -467,75 +468,80 @@ public class MessengerImpl implements Me
             } catch (IOException e) {
                 _logger.log(Level.SEVERE, "Error processing connection", e);
             }
-            Connection connection = c.getConnection();
+            processEndpoints(c);
+        }
+    }
 
-            if (connection.getLocalState() == EndpointState.UNINITIALIZED)
-            {
-                connection.open();
-            }
+    private void processEndpoints(Connector c)
+    {
+        Connection connection = c.getConnection();
 
-            Delivery delivery = connection.getWorkHead();
-            while (delivery != null)
-            {
-                if (delivery.getLink() instanceof Sender && 
delivery.isUpdated())
-                {
-                    delivery.disposition(delivery.getRemoteState());
-                }
-                //TODO: delivery.clear(); What's the equivalent in java?
-                delivery = delivery.getWorkNext();
-            }
-            _outgoing.slide();
+        if (connection.getLocalState() == EndpointState.UNINITIALIZED)
+        {
+            connection.open();
+        }
 
-            for (Session session : new Sessions(connection, UNINIT, ANY))
-            {
-                session.open();
-                _logger.log(Level.FINE, "Opened session " + session);
-            }
-            for (Link link : new Links(connection, UNINIT, ANY))
+        Delivery delivery = connection.getWorkHead();
+        while (delivery != null)
+        {
+            if (delivery.getLink() instanceof Sender && delivery.isUpdated())
             {
-                //TODO: the following is not correct; should only copy those 
properties that we understand
-                link.setSource(link.getRemoteSource());
-                link.setTarget(link.getRemoteTarget());
-                link.open();
-                _logger.log(Level.FINE, "Opened link " + link);
+                delivery.disposition(delivery.getRemoteState());
             }
+            //TODO: delivery.clear(); What's the equivalent in java?
+            delivery = delivery.getWorkNext();
+        }
+        _outgoing.slide();
 
-            distributeCredit();
+        for (Session session : new Sessions(connection, UNINIT, ANY))
+        {
+            session.open();
+            _logger.log(Level.FINE, "Opened session " + session);
+        }
+        for (Link link : new Links(connection, UNINIT, ANY))
+        {
+            //TODO: the following is not correct; should only copy those 
properties that we understand
+            link.setSource(link.getRemoteSource());
+            link.setTarget(link.getRemoteTarget());
+            link.open();
+            _logger.log(Level.FINE, "Opened link " + link);
+        }
 
-            for (Link link : new Links(connection, ACTIVE, CLOSED))
-            {
-                link.close();
-            }
-            for (Session session : new Sessions(connection, ACTIVE, CLOSED))
-            {
-                session.close();
-            }
-            if (connection.getRemoteState() == EndpointState.CLOSED)
+        distributeCredit();
+
+        for (Link link : new Links(connection, ACTIVE, CLOSED))
+        {
+            link.close();
+        }
+        for (Session session : new Sessions(connection, ACTIVE, CLOSED))
+        {
+            session.close();
+        }
+        if (connection.getRemoteState() == EndpointState.CLOSED)
+        {
+            if (connection.getLocalState() == EndpointState.ACTIVE)
             {
-                if (connection.getLocalState() == EndpointState.ACTIVE)
-                {
-                    connection.close();
-                }
+                connection.close();
             }
+        }
 
-            if (c.isClosed())
+        if (c.isClosed())
+        {
+            reclaimCredit(connection);
+            c.destroy();
+            // XXX: could we do this once at the end of the loop
+            // instead of every time we reclaim?
+            distributeCredit();
+        }
+        else
+        {
+            try
             {
-                reclaimCredit(connection);
-                c.destroy();
-                // XXX: could we do this once at the end of the loop
-                // instead of every time we reclaim?
-                distributeCredit();
+                c.process();
             }
-            else
+            catch (IOException e)
             {
-                try
-                {
-                    c.process();
-                }
-                catch (IOException e)
-                {
-                    _logger.log(Level.SEVERE, "Error processing connection", 
e);
-                }
+                _logger.log(Level.SEVERE, "Error processing connection", e);
             }
         }
     }

Modified: qpid/proton/trunk/tests/python/proton_tests/messenger.py
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/tests/python/proton_tests/messenger.py?rev=1514254&r1=1514253&r2=1514254&view=diff
==============================================================================
--- qpid/proton/trunk/tests/python/proton_tests/messenger.py (original)
+++ qpid/proton/trunk/tests/python/proton_tests/messenger.py Thu Aug 15 
13:17:42 2013
@@ -29,7 +29,7 @@ class Test(common.Test):
     self.server_credit = 10
     self.server_received = 0
     self.server = Messenger("server")
-    self.server.timeout = int(self.timeout*1000)
+    self.server.timeout = self.timeout
     self.server.start()
     self.server.subscribe("amqp://~0.0.0.0:12345")
     self.server_thread = Thread(name="server-thread", target=self.run_server)
@@ -39,7 +39,7 @@ class Test(common.Test):
     self.server_thread_started = False
 
     self.client = Messenger("client")
-    self.client.timeout = int(self.timeout*1000)
+    self.client.timeout = self.timeout
 
   def start(self):
     self.server_thread_started = True
@@ -276,10 +276,6 @@ class MessengerTest(Test):
       assert self.client.status(t) is ACCEPTED, (t, self.client.status(t))
 
   def testIncomingQueueBiggerThanWindow(self, size=10):
-    if IMPLEMENTATION_LANGUAGE == "Java":
-      # Currently fails with proton-j. See 
https://issues.apache.org/jira/browse/PROTON-315
-      raise Skipped
-
     self.server.outgoing_window = size
     self.client.incoming_window = size
     self.start()
@@ -583,8 +579,8 @@ class MessengerTest(Test):
 class NBMessengerTest(common.Test):
 
   def setup(self):
-    self.client = Messenger()
-    self.server = Messenger()
+    self.client = Messenger("client")
+    self.server = Messenger("server")
     self.client.blocking = False
     self.server.blocking = False
     self.server.start()
@@ -592,7 +588,10 @@ class NBMessengerTest(common.Test):
     self.address = "amqp://0.0.0.0:12345"
     self.server.subscribe("amqp://~0.0.0.0:12345")
 
-  def pump(self):
+  def pump(self, timeout=0):
+    while self.client.work(0) or self.server.work(0): pass
+    self.client.work(timeout)
+    self.server.work(timeout)
     while self.client.work(0) or self.server.work(0): pass
 
   def teardown(self):
@@ -641,3 +640,23 @@ class NBMessengerTest(common.Test):
         break
 
     assert self.client.outgoing > 0
+
+  def testRecvBeforeSubscribe(self):
+    self.client.recv()
+    self.client.subscribe(self.address + "/foo")
+
+    self.pump()
+
+    msg = Message()
+    msg.address = "amqp://client/foo"
+    msg.body = "Hello World!"
+    self.server.put(msg)
+
+    assert self.client.incoming == 0
+    self.pump(self.delay)
+    assert self.client.incoming == 1
+
+    msg2 = Message()
+    self.client.get(msg2)
+    assert msg2.address == msg.address
+    assert msg2.body == msg.body



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to