Author: rgodfrey
Date: Tue Jul 22 01:00:18 2014
New Revision: 1612441
URL: http://svn.apache.org/r1612441
Log:
QPID-5400: Stop waiting for SASL completion if the connection is closed for input
Modified:
qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/AMQPProtocolHeaderHandler.java
qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/SASLProtocolHeaderHandler.java
qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java
Modified:
qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/AMQPProtocolHeaderHandler.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/AMQPProtocolHeaderHandler.java?rev=1612441&r1=1612440&r2=1612441&view=diff
==============================================================================
---
qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/AMQPProtocolHeaderHandler.java
(original)
+++
qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/AMQPProtocolHeaderHandler.java
Tue Jul 22 01:00:18 2014
@@ -20,16 +20,17 @@
*/
package org.apache.qpid.amqp_1_0.framing;
+import java.nio.ByteBuffer;
+
import org.apache.qpid.amqp_1_0.codec.ProtocolHandler;
import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint;
-import java.nio.ByteBuffer;
-
public class AMQPProtocolHeaderHandler implements ProtocolHandler
{
private ConnectionEndpoint _connection;
private static final byte MAJOR_VERSION = (byte) 1;
private static final byte MINOR_VERSION = (byte) 0;
+ private boolean _done;
enum State {
AWAITING_MAJOR,
@@ -53,13 +54,13 @@ public class AMQPProtocolHeaderHandler i
{
case AWAITING_MAJOR:
_state = in.get() == MAJOR_VERSION ? State.AWAITING_MINOR
: State.ERROR;
- if(!in.hasRemaining())
+ if(_state == State.ERROR || !in.hasRemaining())
{
break;
}
case AWAITING_MINOR:
_state = in.get() == MINOR_VERSION ? State.AWAITING_MINOR
: State.ERROR;
- if(!in.hasRemaining())
+ if(_state == State.ERROR || !in.hasRemaining())
{
break;
}
@@ -67,11 +68,13 @@ public class AMQPProtocolHeaderHandler i
byte revision = in.get();
_connection.protocolHeaderReceived(MAJOR_VERSION,
MINOR_VERSION, revision);
ProtocolHandler handler = new FrameHandler(_connection);
+ _done = true;
return handler.parse(in);
}
}
if(_state == State.ERROR)
{
+ _done = true;
_connection.invalidHeaderReceived();
}
return this;
@@ -80,6 +83,6 @@ public class AMQPProtocolHeaderHandler i
public boolean isDone()
{
- return _state != State.ERROR && !_connection.closedForInput();
+ return _done || _connection.closedForInput();
}
}
Modified:
qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/SASLProtocolHeaderHandler.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/SASLProtocolHeaderHandler.java?rev=1612441&r1=1612440&r2=1612441&view=diff
==============================================================================
---
qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/SASLProtocolHeaderHandler.java
(original)
+++
qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/SASLProtocolHeaderHandler.java
Tue Jul 22 01:00:18 2014
@@ -19,16 +19,17 @@
package org.apache.qpid.amqp_1_0.framing;
+import java.nio.ByteBuffer;
+
import org.apache.qpid.amqp_1_0.codec.ProtocolHandler;
import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint;
-import java.nio.ByteBuffer;
-
public class SASLProtocolHeaderHandler implements ProtocolHandler
{
private ConnectionEndpoint _connection;
private static final byte MAJOR_VERSION = (byte) 1;
private static final byte MINOR_VERSION = (byte) 0;
+ private boolean _done;
enum State {
AWAITING_MAJOR,
@@ -54,26 +55,30 @@ public class SASLProtocolHeaderHandler i
{
case AWAITING_MAJOR:
_state = in.get() == MAJOR_VERSION ? State.AWAITING_MINOR
: State.ERROR;
- if(!in.hasRemaining())
+ if(_state == State.ERROR || !in.hasRemaining())
{
+ _done = true;
break;
}
case AWAITING_MINOR:
_state = in.get() == MINOR_VERSION ? State.AWAITING_MINOR
: State.ERROR;
- if(!in.hasRemaining())
+ if(_state == State.ERROR || !in.hasRemaining())
{
+ _done = true;
break;
}
case AWAITING_REVISION:
byte revision = in.get();
_connection.protocolHeaderReceived(MAJOR_VERSION,
MINOR_VERSION, revision);
ProtocolHandler handler = new
SASLFrameHandler(_connection);
+ _done = true;
return handler.parse(in);
}
}
if(_state == State.ERROR)
{
_connection.invalidHeaderReceived();
+
}
return this;
@@ -81,6 +86,6 @@ public class SASLProtocolHeaderHandler i
public boolean isDone()
{
- return _state != State.ERROR && !_connection.closedForInput();
+ return _done || _connection.closedForInput();
}
}
Modified:
qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java?rev=1612441&r1=1612440&r2=1612441&view=diff
==============================================================================
---
qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java
(original)
+++
qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java
Tue Jul 22 01:00:18 2014
@@ -181,7 +181,7 @@ public class ConnectionEndpoint implemen
{
synchronized (getLock())
{
- while (!_saslComplete)
+ while (!(_saslComplete || _closedForInput))
{
try
{
@@ -711,8 +711,7 @@ public class ConnectionEndpoint implemen
public void invalidHeaderReceived()
{
- // TODO
- _closedForInput = true;
+ setClosedForInput(true);
}
public synchronized boolean closedForInput()
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]