Author: rhs
Date: Thu Jul 17 19:52:02 2014
New Revision: 1611453
URL: http://svn.apache.org/r1611453
Log:
fixed logic for clearing the transport work queue
Modified:
qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EndpointImpl.java
qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
Modified:
qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EndpointImpl.java
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EndpointImpl.java?rev=1611453&r1=1611452&r2=1611453&view=diff
==============================================================================
---
qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EndpointImpl.java
(original)
+++
qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EndpointImpl.java
Thu Jul 17 19:52:02 2014
@@ -221,6 +221,6 @@ public abstract class EndpointImpl imple
@Override
public String toString()
{
- return "EndpointImpl(" + System.identityHashCode(this) + ") [_localState=" + _localState + ", _remoteState=" + _remoteState + ", _localError=" + _localError + ", _remoteError=" + _remoteError + "]";
+ return super.toString() + "[_localState=" + _localState + ", _remoteState=" + _remoteState + ", _localError=" + _localError + ", _remoteError=" + _remoteError + "]";
}
}
Modified:
qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java?rev=1611453&r1=1611452&r2=1611453&view=diff
==============================================================================
---
qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
(original)
+++
qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
Thu Jul 17 19:52:02 2014
@@ -352,38 +352,41 @@ public class TransportImpl extends Endpo
&& transportLink.isLocalHandleSet()
&& !_isCloseSent)
{
- if(!(link instanceof SenderImpl)
- || link.getQueued() == 0
- || transportLink.detachReceived()
- || transportSession.endReceived()
- || _closeReceived)
- {
- UnsignedInteger localHandle =
transportLink.getLocalHandle();
- transportLink.clearLocalHandle();
- transportSession.freeLocalHandle(localHandle);
+ if((link instanceof SenderImpl)
+ && link.getQueued() > 0
+ && !transportLink.detachReceived()
+ && !transportSession.endReceived()
+ && !_closeReceived) {
+ endpoint = endpoint.transportNext();
+ continue;
+ }
+ UnsignedInteger localHandle =
transportLink.getLocalHandle();
+ transportLink.clearLocalHandle();
+ transportSession.freeLocalHandle(localHandle);
- Detach detach = new Detach();
- detach.setHandle(localHandle);
- // TODO - need an API for detaching rather than closing the link
- detach.setClosed(true);
- ErrorCondition localError = link.getCondition();
- if( localError.getCondition() !=null )
- {
- detach.setError(localError);
- }
+ Detach detach = new Detach();
+ detach.setHandle(localHandle);
+ // TODO - need an API for detaching rather than closing the link
+ detach.setClosed(true);
+
+ ErrorCondition localError = link.getCondition();
+ if( localError.getCondition() !=null )
+ {
+ detach.setError(localError);
+ }
- writeFrame(transportSession.getLocalChannel(),
detach, null, null);
- endpoint.clearModified();
+ writeFrame(transportSession.getLocalChannel(), detach,
null, null);
- // TODO - temporary hack for PROTON-154, this line should be removed and replaced
- // with proper handling for closed links
- link.free();
- }
+ // TODO - temporary hack for PROTON-154, this line should be removed and replaced
+ // with proper handling for closed links
+ link.free();
}
+ endpoint.clearModified();
+
}
endpoint = endpoint.transportNext();
}
@@ -429,8 +432,6 @@ public class TransportImpl extends Endpo
sender.setDrained(0);
writeFlow(transportSession, transportLink);
-
- endpoint.clearModified();
}
}
@@ -614,10 +615,6 @@ public class TransportImpl extends Endpo
{
transportLink.addCredit(credits);
writeFlow(transportSession, transportLink);
- if(receiver.getLocalState() ==
EndpointState.ACTIVE)
- {
- endpoint.clearModified();
- }
}
}
}
@@ -707,10 +704,6 @@ public class TransportImpl extends Endpo
writeFrame(transportSession.getLocalChannel(),
attach, null, null);
transportLink.sentAttach();
- if(link.getLocalState() == EndpointState.ACTIVE &&
(link instanceof SenderImpl || !link.hasCredit()))
- {
- endpoint.clearModified();
- }
}
}
}
@@ -780,10 +773,6 @@ public class TransportImpl extends Endpo
writeFrame(channelId, begin, null, null);
transportSession.sentBegin();
- if(session.getLocalState() == EndpointState.ACTIVE)
- {
- endpoint.clearModified();
- }
}
}
endpoint = endpoint.transportNext();
@@ -847,21 +836,27 @@ public class TransportImpl extends Endpo
SessionImpl session;
TransportSession transportSession;
- if((endpoint instanceof SessionImpl)
- && (session = (SessionImpl)endpoint).getLocalState() ==
EndpointState.CLOSED
- && (transportSession =
session.getTransportSession()).isLocalChannelSet()
- && !hasSendableMessages(session)
- && !_isCloseSent)
- {
- int channel = freeLocalChannel(transportSession);
- End end = new End();
- ErrorCondition localError = endpoint.getCondition();
- if( localError.getCondition() !=null )
+ if((endpoint instanceof SessionImpl)) {
+ if ((session = (SessionImpl)endpoint).getLocalState() ==
EndpointState.CLOSED
+ && (transportSession =
session.getTransportSession()).isLocalChannelSet()
+ && !_isCloseSent)
{
- end.setError(localError);
+ if (hasSendableMessages(session)) {
+ endpoint = endpoint.transportNext();
+ continue;
+ }
+
+ int channel = freeLocalChannel(transportSession);
+ End end = new End();
+ ErrorCondition localError = endpoint.getCondition();
+ if( localError.getCondition() !=null )
+ {
+ end.setError(localError);
+ }
+
+ writeFrame(channel, end, null, null);
}
- writeFrame(channel, end, null, null);
endpoint.clearModified();
}
@@ -911,6 +906,7 @@ public class TransportImpl extends Endpo
_isCloseSent = true;
writeFrame(0, close, null, null);
+ _connectionEndpoint.clearModified();
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]