Author: rhs
Date: Thu Aug 22 16:04:14 2013
New Revision: 1516495
URL: http://svn.apache.org/r1516495
Log:
Fixed a stall in the Java driver; fixed the Java messenger to clean up the
driver after stopping; added informative IllegalStateExceptions to the Java
messenger implementation.
Modified:
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/impl/ConnectorImpl.java
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java
Modified:
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/impl/ConnectorImpl.java
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/impl/ConnectorImpl.java?rev=1516495&r1=1516494&r2=1516495&view=diff
==============================================================================
---
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/impl/ConnectorImpl.java
(original)
+++
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/impl/ConnectorImpl.java
Thu Aug 22 16:04:14 2013
@@ -58,6 +58,7 @@ class ConnectorImpl<C> implements Connec
private boolean _closed = false;
private boolean _selected = false;
+ private boolean _readAllowed = false;
ConnectorImpl(DriverImpl driver, Listener<C> listener, SocketChannel c, C
context, SelectionKey key)
{
@@ -73,6 +74,7 @@ class ConnectorImpl<C> implements Connec
if (!_selected) {
_selected = true;
_driver.selectConnector(this);
+ _readAllowed = true;
}
}
@@ -110,6 +112,8 @@ class ConnectorImpl<C> implements Connec
private boolean read() throws IOException
{
+ if (!_readAllowed) return false;
+ _readAllowed = false;
boolean processed = false;
int interest = _key.interestOps();
Modified:
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java?rev=1516495&r1=1516494&r2=1516495&view=diff
==============================================================================
---
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java
(original)
+++
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java
Thu Aug 22 16:04:14 2013
@@ -128,30 +128,31 @@ public class MessengerImpl implements Me
public void stop()
{
- if(_logger.isLoggable(Level.FINE))
- {
- _logger.fine(this + " about to stop");
- }
- //close all connections
- for (Connector<?> c : _driver.connectors())
- {
- Connection connection = c.getConnection();
- connection.close();
- }
- //stop listeners
- for (Listener<?> l : _driver.listeners())
- {
- try
+ if (_driver != null) {
+ if(_logger.isLoggable(Level.FINE))
{
- l.close();
+ _logger.fine(this + " about to stop");
}
- catch (IOException e)
+ //close all connections
+ for (Connector<?> c : _driver.connectors())
{
- _logger.log(Level.WARNING, "Error while closing listener", e);
+ Connection connection = c.getConnection();
+ connection.close();
}
+ //stop listeners
+ for (Listener<?> l : _driver.listeners())
+ {
+ try
+ {
+ l.close();
+ }
+ catch (IOException e)
+ {
+ _logger.log(Level.WARNING, "Error while closing listener",
e);
+ }
+ }
+ waitUntil(_allClosed);
}
- waitUntil(_allClosed);
- //_driver.destroy();
}
public boolean stopped()
@@ -161,17 +162,24 @@ public class MessengerImpl implements Me
public boolean work(long timeout)
{
+ if (_driver == null) { return false; }
_worked = false;
return waitUntil(_workPred, timeout);
}
public void interrupt()
{
- _driver.wakeup();
+ if (_driver != null) {
+ _driver.wakeup();
+ }
}
public void put(Message m) throws MessengerException
{
+ if (_driver == null) {
+ throw new IllegalStateException("cannot put while messenger is stopped");
+ }
+
if(_logger.isLoggable(Level.FINE))
{
_logger.fine(this + " about to put message: " + m);
@@ -218,6 +226,10 @@ public class MessengerImpl implements Me
public void send(int n) throws TimeoutException
{
+ if (_driver == null) {
+ throw new IllegalStateException("cannot send while messenger is stopped");
+ }
+
if(_logger.isLoggable(Level.FINE))
{
_logger.fine(this + " about to send");
@@ -227,6 +239,10 @@ public class MessengerImpl implements Me
public void recv(int n) throws TimeoutException
{
+ if (_driver == null) {
+ throw new IllegalStateException("cannot recv while messenger is stopped");
+ }
+
if(_logger.isLoggable(Level.FINE))
{
_logger.fine(this + " about to wait for up to " + n + " messages
to be received");
@@ -250,28 +266,30 @@ public class MessengerImpl implements Me
public Message get()
{
- for (Connector<?> c : _driver.connectors())
- {
- Connection connection = c.getConnection();
- _logger.log(Level.FINE, "Attempting to get message from " +
connection);
- Delivery delivery = connection.getWorkHead();
- while (delivery != null)
- {
- if (delivery.isReadable() && !delivery.isPartial())
- {
- _logger.log(Level.FINE, "Readable delivery found: " +
delivery);
- int size = read((Receiver) delivery.getLink());
- Message message = Proton.message();
- message.decode(_buffer, 0, size);
- delivery.getLink().advance();
- _incoming.add(delivery);
- _distributed--;
- return message;
- }
- else
+ if (_driver != null) {
+ for (Connector<?> c : _driver.connectors())
+ {
+ Connection connection = c.getConnection();
+ _logger.log(Level.FINE, "Attempting to get message from " +
connection);
+ Delivery delivery = connection.getWorkHead();
+ while (delivery != null)
{
- _logger.log(Level.FINE, "Delivery not readable: " +
delivery);
- delivery = delivery.getWorkNext();
+ if (delivery.isReadable() && !delivery.isPartial())
+ {
+ _logger.log(Level.FINE, "Readable delivery found: " +
delivery);
+ int size = read((Receiver) delivery.getLink());
+ Message message = Proton.message();
+ message.decode(_buffer, 0, size);
+ delivery.getLink().advance();
+ _incoming.add(delivery);
+ _distributed--;
+ return message;
+ }
+ else
+ {
+ _logger.log(Level.FINE, "Delivery not readable: " +
delivery);
+ delivery = delivery.getWorkNext();
+ }
}
}
}
@@ -280,6 +298,10 @@ public class MessengerImpl implements Me
public void subscribe(String source) throws MessengerException
{
+ if (_driver == null) {
+ throw new IllegalStateException("messenger is stopped");
+ }
+
//the following is not safe or accurate, but it appears '~' is
//invalid as the start of the hostname and URI can't handle
//it, so this is a quick hack to avoid rewriting the parsing
@@ -378,18 +400,20 @@ public class MessengerImpl implements Me
private int queued(boolean outgoing)
{
int count = 0;
- for (Connector<?> c : _driver.connectors())
- {
- Connection connection = c.getConnection();
- for (Link link : new Links(connection, ACTIVE, ANY))
+ if (_driver != null) {
+ for (Connector<?> c : _driver.connectors())
{
- if (outgoing)
- {
- if (link instanceof Sender) count += link.getQueued();
- }
- else
+ Connection connection = c.getConnection();
+ for (Link link : new Links(connection, ACTIVE, ANY))
{
- if (link instanceof Receiver) count += link.getQueued();
+ if (outgoing)
+ {
+ if (link instanceof Sender) count += link.getQueued();
+ }
+ else
+ {
+ if (link instanceof Receiver) count +=
link.getQueued();
+ }
}
}
}
@@ -573,6 +597,10 @@ public class MessengerImpl implements Me
private boolean waitUntil(Predicate condition, long timeout)
{
+ if (_driver == null) {
+ throw new IllegalStateException("cannot wait while messenger is stopped");
+ }
+
processAllConnectors();
// wait until timeout expires or until test is true
@@ -634,6 +662,7 @@ public class MessengerImpl implements Me
// @todo track the number of opened receive links
for (Connector<?> c : _driver.connectors())
{
+ if (c.isClosed()) continue;
Connection connection = c.getConnection();
for (Link link : new Links(connection, ACTIVE, ANY))
{
@@ -655,6 +684,7 @@ public class MessengerImpl implements Me
int batch = (_credit < linkCt) ? 1 : (_credit/linkCt);
for (Connector<?> c : _driver.connectors())
{
+ if (c.isClosed()) continue;
Connection connection = c.getConnection();
for (Link link : new Links(connection, ACTIVE, ANY))
{
@@ -767,11 +797,19 @@ public class MessengerImpl implements Me
{
public boolean test()
{
+ if (_driver == null) {
+ return true;
+ }
+
for (Connector<?> c : _driver.connectors()) {
if (!c.isClosed()) {
return false;
}
}
+
+ _driver.destroy();
+ _driver = null;
+
return true;
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]