Author: rhs
Date: Thu Aug 15 13:17:42 2013
New Revision: 1514254
URL: http://svn.apache.org/r1514254
Log:
PROTON-393: fixed recv to not return an error in non-blocking mode; fixed the
Python binding to express timeouts in seconds (as floats) rather than
milliseconds; fixed a stall in the Java messenger
Modified:
qpid/proton/trunk/proton-c/bindings/python/proton.py
qpid/proton/trunk/proton-c/src/messenger/messenger.c
qpid/proton/trunk/proton-j/proton-api/src/main/resources/proton.py
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/impl/ConnectorImpl.java
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java
qpid/proton/trunk/tests/python/proton_tests/messenger.py
Modified: qpid/proton/trunk/proton-c/bindings/python/proton.py
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/bindings/python/proton.py?rev=1514254&r1=1514253&r2=1514254&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/bindings/python/proton.py (original)
+++ qpid/proton/trunk/proton-c/bindings/python/proton.py Thu Aug 15 13:17:42 2013
@@ -304,10 +304,18 @@ will not be verified.
""")
def _get_timeout(self):
- return pn_messenger_get_timeout(self._mng)
+ t = pn_messenger_get_timeout(self._mng)
+ if t == -1:
+ return None
+ else:
+ return float(t)/1000
def _set_timeout(self, value):
- self._check(pn_messenger_set_timeout(self._mng, value))
+ if value is None:
+ t = -1
+ else:
+ t = long(1000*value)
+ self._check(pn_messenger_set_timeout(self._mng, t))
timeout = property(_get_timeout, _set_timeout,
doc="""
@@ -450,8 +458,12 @@ send. Defaults to zero.
n = -1
self._check(pn_messenger_recv(self._mng, n))
- def work(self, timeout=-1):
- err = pn_messenger_work(self._mng, timeout)
+ def work(self, timeout=None):
+ if timeout is None:
+ t = -1
+ else:
+ t = long(1000*timeout)
+ err = pn_messenger_work(self._mng, t)
if (err == PN_TIMEOUT):
return False
else:
Modified: qpid/proton/trunk/proton-c/src/messenger/messenger.c
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/messenger/messenger.c?rev=1514254&r1=1514253&r2=1514254&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/messenger/messenger.c (original)
+++ qpid/proton/trunk/proton-c/src/messenger/messenger.c Thu Aug 15 13:17:42 2013
@@ -1280,13 +1280,15 @@ int pn_messenger_send(pn_messenger_t *me
int pn_messenger_recv(pn_messenger_t *messenger, int n)
{
if (!messenger) return PN_ARG_ERR;
- if (!pn_listener_head(messenger->driver) &&
!pn_connector_head(messenger->driver))
+ if (messenger->blocking && !pn_listener_head(messenger->driver)
+ && !pn_connector_head(messenger->driver))
return pn_error_format(messenger->error, PN_STATE_ERR, "no valid sources");
messenger->receiving = n;
pn_messenger_flow(messenger);
int err = pn_messenger_sync(messenger, pn_messenger_rcvd);
if (err) return err;
if (!pn_messenger_incoming(messenger) &&
+ messenger->blocking &&
!pn_listener_head(messenger->driver) &&
!pn_connector_head(messenger->driver)) {
return pn_error_format(messenger->error, PN_STATE_ERR, "no valid sources");
Modified: qpid/proton/trunk/proton-j/proton-api/src/main/resources/proton.py
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton-api/src/main/resources/proton.py?rev=1514254&r1=1514253&r2=1514254&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/proton-api/src/main/resources/proton.py (original)
+++ qpid/proton/trunk/proton-j/proton-api/src/main/resources/proton.py Thu Aug 15 13:17:42 2013
@@ -1318,7 +1318,11 @@ class Messenger(object):
def recv(self, n=-1):
self.impl.recv(n)
- def work(self, t):
+ def work(self, timeout=None):
+ if timeout is None:
+ t = -1
+ else:
+ t = long(1000*timeout)
return self.impl.work(t)
def interrupt(self):
@@ -1339,8 +1343,16 @@ class Messenger(object):
return self.impl.incoming()
def _get_timeout(self):
- return self.impl.getTimeout()
- def _set_timeout(self, t):
+ t = self.impl.getTimeout()
+ if t == -1:
+ return None
+ else:
+ return float(t)/1000
+ def _set_timeout(self, timeout):
+ if timeout is None:
+ t = -1
+ else:
+ t = long(1000*timeout)
self.impl.setTimeout(t)
timeout = property(_get_timeout, _set_timeout)
Modified:
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/impl/ConnectorImpl.java
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/impl/ConnectorImpl.java?rev=1514254&r1=1514253&r2=1514254&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/impl/ConnectorImpl.java (original)
+++ qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/impl/ConnectorImpl.java Thu Aug 15 13:17:42 2013
@@ -53,7 +53,6 @@ class ConnectorImpl<C> implements Connec
private SelectionKey _key;
private ConnectorState _state = UNINITIALIZED;
- private boolean _readPending = true;
private boolean _inputDone = false;
private boolean _outputDone = false;
private boolean _closed = false;
@@ -75,8 +74,6 @@ class ConnectorImpl<C> implements Connec
_selected = true;
_driver.selectConnector(this);
}
-
- _readPending = true;
}
void unselected()
@@ -91,12 +88,8 @@ class ConnectorImpl<C> implements Connec
boolean processed = false;
if (!_inputDone)
{
- if (_readPending)
- {
- if (read()) {
- processed = true;
- }
- _readPending = false;
+ if (read()) {
+ processed = true;
}
}
Modified:
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java?rev=1514254&r1=1514253&r2=1514254&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java (original)
+++ qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java Thu Aug 15 13:17:42 2013
@@ -433,6 +433,7 @@ public class MessengerImpl implements Me
{
_logger.log(Level.SEVERE, "Error processing connection", e);
}
+ processEndpoints(c);
}
}
@@ -467,75 +468,80 @@ public class MessengerImpl implements Me
} catch (IOException e) {
_logger.log(Level.SEVERE, "Error processing connection", e);
}
- Connection connection = c.getConnection();
+ processEndpoints(c);
+ }
+ }
- if (connection.getLocalState() == EndpointState.UNINITIALIZED)
- {
- connection.open();
- }
+ private void processEndpoints(Connector c)
+ {
+ Connection connection = c.getConnection();
- Delivery delivery = connection.getWorkHead();
- while (delivery != null)
- {
- if (delivery.getLink() instanceof Sender &&
delivery.isUpdated())
- {
- delivery.disposition(delivery.getRemoteState());
- }
- //TODO: delivery.clear(); What's the equivalent in java?
- delivery = delivery.getWorkNext();
- }
- _outgoing.slide();
+ if (connection.getLocalState() == EndpointState.UNINITIALIZED)
+ {
+ connection.open();
+ }
- for (Session session : new Sessions(connection, UNINIT, ANY))
- {
- session.open();
- _logger.log(Level.FINE, "Opened session " + session);
- }
- for (Link link : new Links(connection, UNINIT, ANY))
+ Delivery delivery = connection.getWorkHead();
+ while (delivery != null)
+ {
+ if (delivery.getLink() instanceof Sender && delivery.isUpdated())
{
- //TODO: the following is not correct; should only copy those
properties that we understand
- link.setSource(link.getRemoteSource());
- link.setTarget(link.getRemoteTarget());
- link.open();
- _logger.log(Level.FINE, "Opened link " + link);
+ delivery.disposition(delivery.getRemoteState());
}
+ //TODO: delivery.clear(); What's the equivalent in java?
+ delivery = delivery.getWorkNext();
+ }
+ _outgoing.slide();
- distributeCredit();
+ for (Session session : new Sessions(connection, UNINIT, ANY))
+ {
+ session.open();
+ _logger.log(Level.FINE, "Opened session " + session);
+ }
+ for (Link link : new Links(connection, UNINIT, ANY))
+ {
+ //TODO: the following is not correct; should only copy those
properties that we understand
+ link.setSource(link.getRemoteSource());
+ link.setTarget(link.getRemoteTarget());
+ link.open();
+ _logger.log(Level.FINE, "Opened link " + link);
+ }
- for (Link link : new Links(connection, ACTIVE, CLOSED))
- {
- link.close();
- }
- for (Session session : new Sessions(connection, ACTIVE, CLOSED))
- {
- session.close();
- }
- if (connection.getRemoteState() == EndpointState.CLOSED)
+ distributeCredit();
+
+ for (Link link : new Links(connection, ACTIVE, CLOSED))
+ {
+ link.close();
+ }
+ for (Session session : new Sessions(connection, ACTIVE, CLOSED))
+ {
+ session.close();
+ }
+ if (connection.getRemoteState() == EndpointState.CLOSED)
+ {
+ if (connection.getLocalState() == EndpointState.ACTIVE)
{
- if (connection.getLocalState() == EndpointState.ACTIVE)
- {
- connection.close();
- }
+ connection.close();
}
+ }
- if (c.isClosed())
+ if (c.isClosed())
+ {
+ reclaimCredit(connection);
+ c.destroy();
+ // XXX: could we do this once at the end of the loop
+ // instead of every time we reclaim?
+ distributeCredit();
+ }
+ else
+ {
+ try
{
- reclaimCredit(connection);
- c.destroy();
- // XXX: could we do this once at the end of the loop
- // instead of every time we reclaim?
- distributeCredit();
+ c.process();
}
- else
+ catch (IOException e)
{
- try
- {
- c.process();
- }
- catch (IOException e)
- {
- _logger.log(Level.SEVERE, "Error processing connection",
e);
- }
+ _logger.log(Level.SEVERE, "Error processing connection", e);
}
}
}
Modified: qpid/proton/trunk/tests/python/proton_tests/messenger.py
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/tests/python/proton_tests/messenger.py?rev=1514254&r1=1514253&r2=1514254&view=diff
==============================================================================
--- qpid/proton/trunk/tests/python/proton_tests/messenger.py (original)
+++ qpid/proton/trunk/tests/python/proton_tests/messenger.py Thu Aug 15 13:17:42 2013
@@ -29,7 +29,7 @@ class Test(common.Test):
self.server_credit = 10
self.server_received = 0
self.server = Messenger("server")
- self.server.timeout = int(self.timeout*1000)
+ self.server.timeout = self.timeout
self.server.start()
self.server.subscribe("amqp://~0.0.0.0:12345")
self.server_thread = Thread(name="server-thread", target=self.run_server)
@@ -39,7 +39,7 @@ class Test(common.Test):
self.server_thread_started = False
self.client = Messenger("client")
- self.client.timeout = int(self.timeout*1000)
+ self.client.timeout = self.timeout
def start(self):
self.server_thread_started = True
@@ -276,10 +276,6 @@ class MessengerTest(Test):
assert self.client.status(t) is ACCEPTED, (t, self.client.status(t))
def testIncomingQueueBiggerThanWindow(self, size=10):
- if IMPLEMENTATION_LANGUAGE == "Java":
- # Currently fails with proton-j. See
https://issues.apache.org/jira/browse/PROTON-315
- raise Skipped
-
self.server.outgoing_window = size
self.client.incoming_window = size
self.start()
@@ -583,8 +579,8 @@ class MessengerTest(Test):
class NBMessengerTest(common.Test):
def setup(self):
- self.client = Messenger()
- self.server = Messenger()
+ self.client = Messenger("client")
+ self.server = Messenger("server")
self.client.blocking = False
self.server.blocking = False
self.server.start()
@@ -592,7 +588,10 @@ class NBMessengerTest(common.Test):
self.address = "amqp://0.0.0.0:12345"
self.server.subscribe("amqp://~0.0.0.0:12345")
- def pump(self):
+ def pump(self, timeout=0):
+ while self.client.work(0) or self.server.work(0): pass
+ self.client.work(timeout)
+ self.server.work(timeout)
while self.client.work(0) or self.server.work(0): pass
def teardown(self):
@@ -641,3 +640,23 @@ class NBMessengerTest(common.Test):
break
assert self.client.outgoing > 0
+
+ def testRecvBeforeSubscribe(self):
+ self.client.recv()
+ self.client.subscribe(self.address + "/foo")
+
+ self.pump()
+
+ msg = Message()
+ msg.address = "amqp://client/foo"
+ msg.body = "Hello World!"
+ self.server.put(msg)
+
+ assert self.client.incoming == 0
+ self.pump(self.delay)
+ assert self.client.incoming == 1
+
+ msg2 = Message()
+ self.client.get(msg2)
+ assert msg2.address == msg.address
+ assert msg2.body == msg.body
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]