Author: rajith
Date: Mon Jun 23 18:57:54 2014
New Revision: 1604907
URL: http://svn.apache.org/r1604907
Log:
PROTON-589
Fixed test failures and cleaned up the code.
A few more improvements could be made.
Currently 2 SSL tests are failing, but not sure if they are genuine test
failures.
Modified:
qpid/proton/branches/rajith-messenger-passive/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java
Modified:
qpid/proton/branches/rajith-messenger-passive/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java
URL:
http://svn.apache.org/viewvc/qpid/proton/branches/rajith-messenger-passive/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java?rev=1604907&r1=1604906&r2=1604907&view=diff
==============================================================================
---
qpid/proton/branches/rajith-messenger-passive/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java
(original)
+++
qpid/proton/branches/rajith-messenger-passive/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java
Mon Jun 23 18:57:54 2014
@@ -237,14 +237,13 @@ public class MessengerImpl implements Me
//close all connections.
for (SelectableImpl sel : _selectables)
{
- SelectableImpl s = (SelectableImpl)sel;
- Connection connection = s.getConnection();
+ Connection connection = sel.getConnection();
connection.close();
- if (!_passive && !s.getNetworkConnection().isClosed())
+ if (!_passive && !sel.getNetworkConnection().isClosed())
{
- s.getNetworkConnection().registerForWriteEvents(true);
+ sel.getNetworkConnection().registerForWriteEvents(true);
}
- s.markClosed();
+ sel.markClosed();
}
waitUntil(_allClosed);
@@ -796,10 +795,6 @@ public class MessengerImpl implements Me
}
}
ConnectionContext ctx = (ConnectionContext)connection.getContext();
- if (!_passive)
- {
-
((SelectableImpl)ctx.getSelectable()).getNetworkConnection().registerForWriteEvents(true);
- }
}
private void processSession(Session session)
@@ -820,10 +815,6 @@ public class MessengerImpl implements Me
}
}
ConnectionContext ctx =
(ConnectionContext)session.getConnection().getContext();
- if (!_passive)
- {
-
((SelectableImpl)ctx.getSelectable()).getNetworkConnection().registerForWriteEvents(true);
- }
}
private void processLink(Link link)
@@ -851,10 +842,6 @@ public class MessengerImpl implements Me
}
}
ConnectionContext ctx =
(ConnectionContext)link.getSession().getConnection().getContext();
- if (!_passive)
- {
-
((SelectableImpl)ctx.getSelectable()).getNetworkConnection().registerForWriteEvents(true);
- }
}
private void processFlow(Link link)
@@ -863,11 +850,6 @@ public class MessengerImpl implements Me
{
pumpOut(link.getTarget().getAddress(), (Sender)link);
}
- ConnectionContext ctx =
(ConnectionContext)link.getSession().getConnection().getContext();
- if (!_passive)
- {
-
((SelectableImpl)ctx.getSelectable()).getNetworkConnection().registerForWriteEvents(true);
- }
}
private void processDelivery(Delivery delivery)
@@ -889,11 +871,6 @@ public class MessengerImpl implements Me
}
delivery.clear();
- ConnectionContext ctx =
(ConnectionContext)link.getSession().getConnection().getContext();
- if (!_passive)
- {
-
((SelectableImpl)ctx.getSelectable()).getNetworkConnection().registerForWriteEvents(true);
- }
}
private boolean waitUntil(Predicate condition) throws TimeoutException
@@ -913,14 +890,18 @@ public class MessengerImpl implements Me
}
private boolean waitUntil(Predicate condition, long timeout)
- {
+ {
+ if (!_passive)
+ {
+ processEvents();
+ processPendingSelectables();
+ }
processEvents();
if (_passive)
{
return condition.test();
}
-
// wait until timeout expires or until test is true
long now = System.currentTimeMillis();
final long deadline = timeout < 0 ? Long.MAX_VALUE : now + timeout;
@@ -949,7 +930,7 @@ public class MessengerImpl implements Me
long wakeup = (_next_drain > now) ? _next_drain - now : 0;
remaining = (remaining == -1) ? wakeup : Math.min(remaining,
wakeup);
}
- processPendingSelectables(remaining);
+ waitOnIOEvents(remaining);
processEvents();
if (_interrupted.get())
{
@@ -963,7 +944,28 @@ public class MessengerImpl implements Me
}
// Used when passive mode is false.
- private void processPendingSelectables(long timeout)
+ private void processPendingSelectables()
+ {
+ //Iterate through the Selectables and read/write if required.
+ Iterator<SelectableImpl> it = _selectables.iterator();
+ while (it.hasNext())
+ {
+ SelectableImpl sel = it.next();
+ if (sel.isCompleted())
+ {
+ it.remove();
+ continue;
+ }
+ connectionWritable(sel);
+ connectionReadable(sel);
+ if (sel.isCompleted())
+ {
+ it.remove();
+ }
+ }
+ }
+
+ private void waitOnIOEvents(long timeout)
{
try
{
@@ -1105,11 +1107,6 @@ public class MessengerImpl implements Me
_credited.add(link);
// flow changed, must process it
- ConnectionContext ctx = (ConnectionContext)
link.getSession().getConnection().getContext();
- if (!_passive)
- {
-
((SelectableImpl)ctx.getSelectable()).getNetworkConnection().registerForWriteEvents(true);
- }
}
if (_blocked.isEmpty())
@@ -1695,8 +1692,6 @@ public class MessengerImpl implements Me
*/
void inboundConnection(Listener listener, IoConnection networkConnection)
{
- //System.out.println("inboundConnection
............................................"+ _name);
-
_worked = true;
Connection connection = Proton.connection();
connection.collect(_collector);
@@ -1731,13 +1726,23 @@ public class MessengerImpl implements Me
void connectionReadable(SelectableImpl selectable)
{
- //System.out.println("connectionReadable
............................................" + _name);
-
- _worked = true;
+ if (selectable.isCompleted())
+ {
+ return;
+ }
IoConnection networkConnection = selectable.getNetworkConnection();
SelectableImpl sel = (SelectableImpl) selectable;
Transport transport = sel.getTransport();
- ByteBuffer tail = transport.tail();
+ ByteBuffer tail = null;
+ try
+ {
+ tail = transport.tail();
+ }
+ catch (Exception e1)
+ {
+ connectionClosed(sel);
+ return;
+ }
try
{
int read = networkConnection.read(tail);
@@ -1746,6 +1751,10 @@ public class MessengerImpl implements Me
connectionClosed(sel);
return;
}
+ if (read > 0)
+ {
+ _worked = true;
+ }
}
catch (IOException e)
{
@@ -1760,15 +1769,13 @@ public class MessengerImpl implements Me
{
_logger.log(Level.SEVERE, this + " error processing input", e);
}
- networkConnection.registerForReadEvents(transport.capacity() > 0);
+ //Currently it doesn't work without the following. But this is best
handled elsewhere.
+ //Need to investigate and find the best way of handling it/
networkConnection.registerForWriteEvents(true);
}
void connectionWritable(SelectableImpl selectable)
{
- //System.out.println("connectionWritable
............................................" + _name);
-
- _worked = true;
IoConnection networkConnection = selectable.getNetworkConnection();
SelectableImpl sel = (SelectableImpl) selectable;
Transport transport = sel.getTransport();
@@ -1785,11 +1792,14 @@ public class MessengerImpl implements Me
}
catch (IOException e)
{
- _logger.log(Level.SEVERE, this + " error writing to the
file descriptor", e);
+ _logger.log(Level.SEVERE, this + " error writing to
network connection : " + e.getMessage(), e);
+ networkConnection.registerForWriteEvents(false);
+ return;
// Need to throw the exception as well.
}
if (wrote > 0)
{
+ _worked = true;
transport.pop(wrote);
}
else
@@ -1799,7 +1809,7 @@ public class MessengerImpl implements Me
}
networkConnection.registerForWriteEvents(transport.pending() > 0);
- if (selectable.isClosed() && !_passive)
+ if (selectable.isClosed())
{
try
{
@@ -1836,6 +1846,5 @@ public class MessengerImpl implements Me
}
}
sel.markCompleted();
- _selectables.remove(sel);
}
}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]