Author: rhs
Date: Thu Jul 17 19:52:02 2014
New Revision: 1611453

URL: http://svn.apache.org/r1611453
Log:
fixed logic for clearing the transport work queue

Modified:
    qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EndpointImpl.java
    qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java

Modified: qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EndpointImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EndpointImpl.java?rev=1611453&r1=1611452&r2=1611453&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EndpointImpl.java (original)
+++ qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EndpointImpl.java Thu Jul 17 19:52:02 2014
@@ -221,6 +221,6 @@ public abstract class EndpointImpl imple
     @Override
     public String toString()
     {
-        return "EndpointImpl(" + System.identityHashCode(this) + ") 
[_localState=" + _localState + ", _remoteState=" + _remoteState + ", 
_localError=" + _localError + ", _remoteError=" + _remoteError + "]";
+        return super.toString() + "[_localState=" + _localState + ", 
_remoteState=" + _remoteState + ", _localError=" + _localError + ", 
_remoteError=" + _remoteError + "]";
     }
 }

Modified: qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java?rev=1611453&r1=1611452&r2=1611453&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java (original)
+++ qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java Thu Jul 17 19:52:02 2014
@@ -352,38 +352,41 @@ public class TransportImpl extends Endpo
                        && transportLink.isLocalHandleSet()
                        && !_isCloseSent)
                     {
-                        if(!(link instanceof SenderImpl)
-                           || link.getQueued() == 0
-                           || transportLink.detachReceived()
-                           || transportSession.endReceived()
-                           || _closeReceived)
-                        {
-                            UnsignedInteger localHandle = 
transportLink.getLocalHandle();
-                            transportLink.clearLocalHandle();
-                            transportSession.freeLocalHandle(localHandle);
+                        if((link instanceof SenderImpl)
+                           && link.getQueued() > 0
+                           && !transportLink.detachReceived()
+                           && !transportSession.endReceived()
+                           && !_closeReceived) {
+                            endpoint = endpoint.transportNext();
+                            continue;
+                        }
 
+                        UnsignedInteger localHandle = 
transportLink.getLocalHandle();
+                        transportLink.clearLocalHandle();
+                        transportSession.freeLocalHandle(localHandle);
 
-                            Detach detach = new Detach();
-                            detach.setHandle(localHandle);
-                            // TODO - need an API for detaching rather than 
closing the link
-                            detach.setClosed(true);
 
-                            ErrorCondition localError = link.getCondition();
-                            if( localError.getCondition() !=null )
-                            {
-                                detach.setError(localError);
-                            }
+                        Detach detach = new Detach();
+                        detach.setHandle(localHandle);
+                        // TODO - need an API for detaching rather than 
closing the link
+                        detach.setClosed(true);
+
+                        ErrorCondition localError = link.getCondition();
+                        if( localError.getCondition() !=null )
+                        {
+                            detach.setError(localError);
+                        }
 
 
-                            writeFrame(transportSession.getLocalChannel(), 
detach, null, null);
-                            endpoint.clearModified();
+                        writeFrame(transportSession.getLocalChannel(), detach, 
null, null);
 
-                            // TODO - temporary hack for PROTON-154, this line 
should be removed and replaced
-                            //        with proper handling for closed links
-                            link.free();
-                        }
+                        // TODO - temporary hack for PROTON-154, this line 
should be removed and replaced
+                        //        with proper handling for closed links
+                        link.free();
                     }
 
+                    endpoint.clearModified();
+
                 }
                 endpoint = endpoint.transportNext();
             }
@@ -429,8 +432,6 @@ public class TransportImpl extends Endpo
                         sender.setDrained(0);
 
                         writeFlow(transportSession, transportLink);
-
-                        endpoint.clearModified();
                     }
 
                 }
@@ -614,10 +615,6 @@ public class TransportImpl extends Endpo
                         {
                             transportLink.addCredit(credits);
                             writeFlow(transportSession, transportLink);
-                            if(receiver.getLocalState() == 
EndpointState.ACTIVE)
-                            {
-                                endpoint.clearModified();
-                            }
                         }
                     }
                 }
@@ -707,10 +704,6 @@ public class TransportImpl extends Endpo
 
                             writeFrame(transportSession.getLocalChannel(), 
attach, null, null);
                             transportLink.sentAttach();
-                            if(link.getLocalState() == EndpointState.ACTIVE && 
(link instanceof SenderImpl || !link.hasCredit()))
-                            {
-                                endpoint.clearModified();
-                            }
                         }
                     }
                 }
@@ -780,10 +773,6 @@ public class TransportImpl extends Endpo
 
                         writeFrame(channelId, begin, null, null);
                         transportSession.sentBegin();
-                        if(session.getLocalState() == EndpointState.ACTIVE)
-                        {
-                            endpoint.clearModified();
-                        }
                     }
                 }
                 endpoint = endpoint.transportNext();
@@ -847,21 +836,27 @@ public class TransportImpl extends Endpo
                 SessionImpl session;
                 TransportSession transportSession;
 
-                if((endpoint instanceof SessionImpl)
-                   && (session = (SessionImpl)endpoint).getLocalState() == 
EndpointState.CLOSED
-                   && (transportSession = 
session.getTransportSession()).isLocalChannelSet()
-                   && !hasSendableMessages(session)
-                   && !_isCloseSent)
-                {
-                    int channel = freeLocalChannel(transportSession);
-                    End end = new End();
-                    ErrorCondition localError = endpoint.getCondition();
-                    if( localError.getCondition() !=null )
+                if((endpoint instanceof SessionImpl)) {
+                    if ((session = (SessionImpl)endpoint).getLocalState() == 
EndpointState.CLOSED
+                        && (transportSession = 
session.getTransportSession()).isLocalChannelSet()
+                        && !_isCloseSent)
                     {
-                        end.setError(localError);
+                        if (hasSendableMessages(session)) {
+                            endpoint = endpoint.transportNext();
+                            continue;
+                        }
+
+                        int channel = freeLocalChannel(transportSession);
+                        End end = new End();
+                        ErrorCondition localError = endpoint.getCondition();
+                        if( localError.getCondition() !=null )
+                        {
+                            end.setError(localError);
+                        }
+
+                        writeFrame(channel, end, null, null);
                     }
 
-                    writeFrame(channel, end, null, null);
                     endpoint.clearModified();
                 }
 
@@ -911,6 +906,7 @@ public class TransportImpl extends Endpo
                 _isCloseSent = true;
 
                 writeFrame(0, close, null, null);
+                _connectionEndpoint.clearModified();
             }
         }
     }



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to