Author: rgodfrey
Date: Tue Jul 22 01:00:18 2014
New Revision: 1612441

URL: http://svn.apache.org/r1612441
Log:
QPID-5400 : stop waiting for SASL completion if the connection is closed for input

Modified:
    qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/AMQPProtocolHeaderHandler.java
    qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/SASLProtocolHeaderHandler.java
    qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java

Modified: 
qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/AMQPProtocolHeaderHandler.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/AMQPProtocolHeaderHandler.java?rev=1612441&r1=1612440&r2=1612441&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/AMQPProtocolHeaderHandler.java
 (original)
+++ 
qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/AMQPProtocolHeaderHandler.java
 Tue Jul 22 01:00:18 2014
@@ -20,16 +20,17 @@
  */
 package org.apache.qpid.amqp_1_0.framing;
 
+import java.nio.ByteBuffer;
+
 import org.apache.qpid.amqp_1_0.codec.ProtocolHandler;
 import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint;
 
-import java.nio.ByteBuffer;
-
 public class AMQPProtocolHeaderHandler implements ProtocolHandler
 {
     private ConnectionEndpoint _connection;
     private static final byte MAJOR_VERSION = (byte) 1;
     private static final byte MINOR_VERSION = (byte) 0;
+    private boolean _done;
 
     enum State {
         AWAITING_MAJOR,
@@ -53,13 +54,13 @@ public class AMQPProtocolHeaderHandler i
             {
                 case AWAITING_MAJOR:
                     _state = in.get() == MAJOR_VERSION ? State.AWAITING_MINOR 
: State.ERROR;
-                    if(!in.hasRemaining())
+                    if(_state == State.ERROR || !in.hasRemaining())
                     {
                         break;
                     }
                 case AWAITING_MINOR:
                     _state = in.get() == MINOR_VERSION ? State.AWAITING_MINOR 
: State.ERROR;
-                    if(!in.hasRemaining())
+                    if(_state == State.ERROR || !in.hasRemaining())
                     {
                         break;
                     }
@@ -67,11 +68,13 @@ public class AMQPProtocolHeaderHandler i
                     byte revision = in.get();
                     _connection.protocolHeaderReceived(MAJOR_VERSION, 
MINOR_VERSION, revision);
                     ProtocolHandler handler = new FrameHandler(_connection);
+                    _done = true;
                     return handler.parse(in);
             }
         }
         if(_state == State.ERROR)
         {
+            _done = true;
             _connection.invalidHeaderReceived();
         }
         return this;
@@ -80,6 +83,6 @@ public class AMQPProtocolHeaderHandler i
 
     public boolean isDone()
     {
-        return _state != State.ERROR && !_connection.closedForInput();
+        return _done || _connection.closedForInput();
     }
 }

Modified: 
qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/SASLProtocolHeaderHandler.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/SASLProtocolHeaderHandler.java?rev=1612441&r1=1612440&r2=1612441&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/SASLProtocolHeaderHandler.java
 (original)
+++ 
qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/SASLProtocolHeaderHandler.java
 Tue Jul 22 01:00:18 2014
@@ -19,16 +19,17 @@
 
 package org.apache.qpid.amqp_1_0.framing;
 
+import java.nio.ByteBuffer;
+
 import org.apache.qpid.amqp_1_0.codec.ProtocolHandler;
 import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint;
 
-import java.nio.ByteBuffer;
-
 public class SASLProtocolHeaderHandler implements ProtocolHandler
 {
     private ConnectionEndpoint _connection;
     private static final byte MAJOR_VERSION = (byte) 1;
     private static final byte MINOR_VERSION = (byte) 0;
+    private boolean _done;
 
     enum State {
         AWAITING_MAJOR,
@@ -54,26 +55,30 @@ public class SASLProtocolHeaderHandler i
             {
                 case AWAITING_MAJOR:
                     _state = in.get() == MAJOR_VERSION ? State.AWAITING_MINOR 
: State.ERROR;
-                    if(!in.hasRemaining())
+                    if(_state == State.ERROR || !in.hasRemaining())
                     {
+                        _done = true;
                         break;
                     }
                 case AWAITING_MINOR:
                     _state = in.get() == MINOR_VERSION ? State.AWAITING_MINOR 
: State.ERROR;
-                    if(!in.hasRemaining())
+                    if(_state == State.ERROR || !in.hasRemaining())
                     {
+                        _done = true;
                         break;
                     }
                 case AWAITING_REVISION:
                     byte revision = in.get();
                     _connection.protocolHeaderReceived(MAJOR_VERSION, 
MINOR_VERSION, revision);
                     ProtocolHandler handler = new 
SASLFrameHandler(_connection);
+                    _done = true;
                     return handler.parse(in);
             }
         }
         if(_state == State.ERROR)
         {
             _connection.invalidHeaderReceived();
+
         }
         return this;
 
@@ -81,6 +86,6 @@ public class SASLProtocolHeaderHandler i
 
     public boolean isDone()
     {
-        return _state != State.ERROR && !_connection.closedForInput();
+        return _done || _connection.closedForInput();
     }
 }

Modified: 
qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java?rev=1612441&r1=1612440&r2=1612441&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java
 (original)
+++ 
qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java
 Tue Jul 22 01:00:18 2014
@@ -181,7 +181,7 @@ public class ConnectionEndpoint implemen
         {
             synchronized (getLock())
             {
-                while (!_saslComplete)
+                while (!(_saslComplete || _closedForInput))
                 {
                     try
                     {
@@ -711,8 +711,7 @@ public class ConnectionEndpoint implemen
 
     public void invalidHeaderReceived()
     {
-        // TODO
-        _closedForInput = true;
+        setClosedForInput(true);
     }
 
     public synchronized boolean closedForInput()



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to