Author: rhs
Date: Thu Aug 22 12:14:20 2013
New Revision: 1516427
URL: http://svn.apache.org/r1516427
Log:
Fixed hang in Messenger.stop(); added recv() to Messenger interface.
Modified:
qpid/proton/trunk/proton-c/bindings/java/src/main/java/org/apache/qpid/proton/messenger/jni/JNIMessenger.java
qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/messenger/Messenger.java
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java
Modified:
qpid/proton/trunk/proton-c/bindings/java/src/main/java/org/apache/qpid/proton/messenger/jni/JNIMessenger.java
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/bindings/java/src/main/java/org/apache/qpid/proton/messenger/jni/JNIMessenger.java?rev=1516427&r1=1516426&r2=1516427&view=diff
==============================================================================
---
qpid/proton/trunk/proton-c/bindings/java/src/main/java/org/apache/qpid/proton/messenger/jni/JNIMessenger.java
(original)
+++
qpid/proton/trunk/proton-c/bindings/java/src/main/java/org/apache/qpid/proton/messenger/jni/JNIMessenger.java
Thu Aug 22 12:14:20 2013
@@ -100,6 +100,12 @@ class JNIMessenger implements Messenger
}
@Override
+ public void recv() throws TimeoutException
+ {
+ recv(-1);
+ }
+
+ @Override
public void recv(final int count) throws TimeoutException
{
int err = Proton.pn_messenger_recv(_impl, count);
Modified:
qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/messenger/Messenger.java
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/messenger/Messenger.java?rev=1516427&r1=1516426&r2=1516427&view=diff
==============================================================================
---
qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/messenger/Messenger.java
(original)
+++
qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/messenger/Messenger.java
Thu Aug 22 12:14:20 2013
@@ -106,6 +106,12 @@ public interface Messenger
*/
void subscribe(String source) throws MessengerException;
/**
+ * Receives an arbitrary number of messages into the
+ * incoming queue of the Messenger. This method will block until
+ * at least one message is available or the operation times out.
+ */
+ void recv() throws TimeoutException;
+ /**
* Receives up to the specified number of messages into the
* incoming queue of the Messenger. This method will block until
* at least one message is available or the operation times out.
Modified:
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java?rev=1516427&r1=1516426&r2=1516427&view=diff
==============================================================================
---
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java
(original)
+++
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java
Thu Aug 22 12:14:20 2013
@@ -137,14 +137,6 @@ public class MessengerImpl implements Me
{
Connection connection = c.getConnection();
connection.close();
- try
- {
- c.process();
- }
- catch (IOException e)
- {
- _logger.log(Level.WARNING, "Error while sending close", e);
- }
}
//stop listeners
for (Listener<?> l : _driver.listeners())
@@ -246,6 +238,11 @@ public class MessengerImpl implements Me
waitUntil(_messageAvailable);
}
+ public void recv() throws TimeoutException
+ {
+ recv(-1);
+ }
+
public int receiving()
{
return _receiving;
@@ -435,6 +432,7 @@ public class MessengerImpl implements Me
distributeCredit();
for (Connector<?> c : _driver.connectors())
{
+ processEndpoints(c);
try
{
if (c.process()) {
@@ -445,7 +443,6 @@ public class MessengerImpl implements Me
{
_logger.log(Level.SEVERE, "Error processing connection", e);
}
- processEndpoints(c);
}
bringDestruction();
distributeCredit();
@@ -483,6 +480,22 @@ public class MessengerImpl implements Me
_logger.log(Level.SEVERE, "Error processing connection", e);
}
processEndpoints(c);
+ if (c.isClosed())
+ {
+ _awaitingDestruction.add(c);
+ reclaimCredit(c.getConnection());
+ }
+ else
+ {
+ try
+ {
+ c.process();
+ }
+ catch (IOException e)
+ {
+ _logger.log(Level.SEVERE, "Error processing connection", e);
+ }
+ }
}
bringDestruction();
distributeCredit();
@@ -540,23 +553,6 @@ public class MessengerImpl implements Me
connection.close();
}
}
-
- if (c.isClosed())
- {
- _awaitingDestruction.add(c);
- reclaimCredit(connection);
- }
- else
- {
- try
- {
- c.process();
- }
- catch (IOException e)
- {
- _logger.log(Level.SEVERE, "Error processing connection", e);
- }
- }
}
private boolean waitUntil(Predicate condition) throws TimeoutException
@@ -771,8 +767,12 @@ public class MessengerImpl implements Me
{
public boolean test()
{
- if (_driver.connectors().iterator().hasNext()) return false;
- else return true;
+ for (Connector<?> c : _driver.connectors()) {
+ if (!c.isClosed()) {
+ return false;
+ }
+ }
+ return true;
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]