Author: kwall
Date: Sat Nov 14 14:31:28 2015
New Revision: 1714337
URL: http://svn.apache.org/viewvc?rev=1714337&view=rev
Log:
NO-JIRA: [Java Broker] Fix formatting in AMQP 1.0 FrameHandlers and remove
logging to stderr (e.printStackTrace() replaced with SLF4J LOGGER.warn)
Modified:
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/framing/FrameHandler.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/framing/SASLFrameHandler.java
Modified:
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/framing/FrameHandler.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/framing/FrameHandler.java?rev=1714337&r1=1714336&r2=1714337&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/framing/FrameHandler.java
(original)
+++
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/framing/FrameHandler.java
Sat Nov 14 14:31:28 2015
@@ -23,6 +23,9 @@ package org.apache.qpid.amqp_1_0.framing
import java.nio.ByteBuffer;
import java.util.Formatter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.qpid.amqp_1_0.codec.ProtocolHandler;
import org.apache.qpid.amqp_1_0.codec.ValueHandler;
import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint;
@@ -35,6 +38,8 @@ import org.apache.qpid.bytebuffer.QpidBy
public class FrameHandler implements ProtocolHandler
{
+ public static Logger LOGGER = LoggerFactory.getLogger(FrameHandler.class);
+
private ConnectionEndpoint _connection;
private ValueHandler _typeHandler;
@@ -67,136 +72,148 @@ public class FrameHandler implements Pro
{
try
{
- Error frameParsingError = null;
- int size = _size;
- State state = _state;
- QpidByteBuffer oldIn = null;
-
- while(in.hasRemaining() && state != State.ERROR)
- {
+ Error frameParsingError = null;
+ int size = _size;
+ State state = _state;
+ QpidByteBuffer oldIn = null;
- final int remaining = in.remaining();
- if(remaining == 0)
+ while (in.hasRemaining() && state != State.ERROR)
{
- return this;
- }
+ final int remaining = in.remaining();
+ if (remaining == 0)
+ {
+ return this;
+ }
- switch(state)
- {
- case SIZE_0:
- if(remaining >= 4)
- {
- size = in.getInt();
- state = State.PRE_PARSE;
- break;
- }
- else
- {
- size = (in.get() << 24) & 0xFF000000;
- if(!in.hasRemaining())
+
+ switch (state)
+ {
+ case SIZE_0:
+ if (remaining >= 4)
{
- state = State.SIZE_1;
+ size = in.getInt();
+ state = State.PRE_PARSE;
break;
}
- }
- case SIZE_1:
- size |= (in.get() << 16) & 0xFF0000;
- if(!in.hasRemaining())
- {
- state = State.SIZE_2;
- break;
- }
- case SIZE_2:
- size |= (in.get() << 8) & 0xFF00;
- if(!in.hasRemaining())
- {
- state = State.SIZE_3;
- break;
- }
- case SIZE_3:
- size |= in.get() & 0xFF;
- state = State.PRE_PARSE;
+ else
+ {
+ size = (in.get() << 24) & 0xFF000000;
+ if (!in.hasRemaining())
+ {
+ state = State.SIZE_1;
+ break;
+ }
+ }
+ case SIZE_1:
+ size |= (in.get() << 16) & 0xFF0000;
+ if (!in.hasRemaining())
+ {
+ state = State.SIZE_2;
+ break;
+ }
+ case SIZE_2:
+ size |= (in.get() << 8) & 0xFF00;
+ if (!in.hasRemaining())
+ {
+ state = State.SIZE_3;
+ break;
+ }
+ case SIZE_3:
+ size |= in.get() & 0xFF;
+ state = State.PRE_PARSE;
- case PRE_PARSE:
+ case PRE_PARSE:
- if(size < 8)
- {
- frameParsingError = createFramingError("specified
frame size %d smaller than minimum frame header size %d", size, 8);
- state = State.ERROR;
- break;
- }
+ if (size < 8)
+ {
+ frameParsingError = createFramingError(
+ "specified frame size %d smaller than
minimum frame header size %d",
+ size,
+ 8);
+ state = State.ERROR;
+ break;
+ }
- else if(size >
_connection.getDesiredMaxFrameSize().intValue())
- {
- frameParsingError = createFramingError("specified
frame size %d larger than maximum frame header size %d", size,
_connection.getDesiredMaxFrameSize().intValue());
- state = State.ERROR;
- break;
- }
+ else if (size >
_connection.getDesiredMaxFrameSize().intValue())
+ {
+ frameParsingError = createFramingError(
+ "specified frame size %d larger than
maximum frame header size %d",
+ size,
+
_connection.getDesiredMaxFrameSize().intValue());
+ state = State.ERROR;
+ break;
+ }
- if(in.remaining() < size-4)
- {
- _buffer = QpidByteBuffer.allocateDirect(size-4);
- _buffer.put(in);
- state = State.BUFFERING;
- break;
- }
- case BUFFERING:
- if(_buffer != null)
- {
- final int bufferRemaining = _buffer.remaining();
- if(in.remaining() < bufferRemaining)
+ if (in.remaining() < size - 4)
{
+ _buffer = QpidByteBuffer.allocateDirect(size - 4);
_buffer.put(in);
+ state = State.BUFFERING;
break;
}
- else
+ case BUFFERING:
+ if (_buffer != null)
{
- QpidByteBuffer dup = in.duplicate();
- dup.limit(dup.position() + bufferRemaining);
+ final int bufferRemaining = _buffer.remaining();
+ if (in.remaining() < bufferRemaining)
+ {
+ _buffer.put(in);
+ break;
+ }
+ else
+ {
+ QpidByteBuffer dup = in.duplicate();
+ dup.limit(dup.position() + bufferRemaining);
- _buffer.put(dup);
- dup.dispose();
+ _buffer.put(dup);
+ dup.dispose();
- in.position(in.position() + bufferRemaining);
- oldIn = in;
+ in.position(in.position() + bufferRemaining);
+ oldIn = in;
- _buffer.flip();
- in = _buffer;
- state = State.PARSING;
+ _buffer.flip();
+ in = _buffer;
+ state = State.PARSING;
+ }
}
- }
- case PARSING:
+ case PARSING:
- int dataOffset = (in.get() << 2) & 0x3FF;
+ int dataOffset = (in.get() << 2) & 0x3FF;
- if(dataOffset < 8)
- {
- frameParsingError = createFramingError("specified
frame data offset %d smaller than minimum frame header size %d", dataOffset, 8);
- state = State.ERROR;
- break;
- }
- else if(dataOffset > size)
- {
- frameParsingError = createFramingError("specified
frame data offset %d larger than the frame size %d", dataOffset, _size);
- state = State.ERROR;
- break;
- }
+ if (dataOffset < 8)
+ {
+ frameParsingError = createFramingError(
+ "specified frame data offset %d smaller
than minimum frame header size %d",
+ dataOffset,
+ 8);
+ state = State.ERROR;
+ break;
+ }
+ else if (dataOffset > size)
+ {
+ frameParsingError = createFramingError(
+ "specified frame data offset %d larger
than the frame size %d",
+ dataOffset,
+ _size);
+ state = State.ERROR;
+ break;
+ }
- // type
+ // type
- int type = in.get() & 0xFF;
- int channel = in.getShort() & 0xFFFF;
+ int type = in.get() & 0xFF;
+ int channel = in.getShort() & 0xFFFF;
- if(type != 0 && type != 1)
- {
- frameParsingError = createFramingError("unknown frame
type: %d", type);
- state = State.ERROR;
- break;
- }
+ if (type != 0 && type != 1)
+ {
+ frameParsingError = createFramingError("unknown
frame type: %d", type);
+ state = State.ERROR;
+ break;
+ }
- // channel
+ // channel
/*if(channel > _connection.getChannelMax())
{
@@ -207,86 +224,86 @@ public class FrameHandler implements Pro
state = State.ERROR;
}
*/
- // ext header
- if(dataOffset!=8)
- {
- in.position(in.position()+dataOffset-8);
- }
+ // ext header
+ if (dataOffset != 8)
+ {
+ in.position(in.position() + dataOffset - 8);
+ }
- // oldIn null iff not working on duplicated buffer
- if(oldIn == null)
- {
- oldIn = in;
- in = in.duplicate();
- final int endPos = in.position() + size - dataOffset;
- in.limit(endPos);
- oldIn.position(endPos);
+ // oldIn null iff not working on duplicated buffer
+ if (oldIn == null)
+ {
+ oldIn = in;
+ in = in.duplicate();
+ final int endPos = in.position() + size -
dataOffset;
+ in.limit(endPos);
+ oldIn.position(endPos);
- }
+ }
- int inPos = in.position();
- int inLimit = in.limit();
- // PARSE HERE
- try
- {
- Object val = in.hasRemaining() ?
_typeHandler.parse(in) : null;
-
- if(in.hasRemaining())
+ int inPos = in.position();
+ int inLimit = in.limit();
+ // PARSE HERE
+ try
{
- if(val instanceof Transfer)
+ Object val = in.hasRemaining() ?
_typeHandler.parse(in) : null;
+
+ if (in.hasRemaining())
{
- ((Transfer)val).setPayload(in.slice());
+ if (val instanceof Transfer)
+ {
+ ((Transfer) val).setPayload(in.slice());
+ }
}
- }
- _connection.receive((short)channel,val);
+ _connection.receive((short) channel, val);
- reset();
- state = State.SIZE_0;
- break;
- }
- catch (AmqpErrorException ex)
- {
- state = State.ERROR;
- frameParsingError = ex.getError();
- }
- catch (RuntimeException e)
- {
- in.position(inPos);
- in.limit(inLimit);
- throw e;
- }
- finally
- {
- in.dispose();
-
- in = oldIn;
- oldIn = null;
- _buffer = null;
- }
- }
+ reset();
+ state = State.SIZE_0;
+ break;
+ }
+ catch (AmqpErrorException ex)
+ {
+ state = State.ERROR;
+ frameParsingError = ex.getError();
+ }
+ catch (RuntimeException e)
+ {
+ in.position(inPos);
+ in.limit(inLimit);
+ throw e;
+ }
+ finally
+ {
+ in.dispose();
- }
+ in = oldIn;
+ oldIn = null;
+ _buffer = null;
+ }
+ }
- _state = state;
- _size = size;
+ }
- if(_state == State.ERROR)
- {
- _connection.handleError(frameParsingError);
+ _state = state;
+ _size = size;
- if (_buffer != null)
+ if (_state == State.ERROR)
{
- _buffer.dispose();
- _buffer = null;
+ _connection.handleError(frameParsingError);
+
+ if (_buffer != null)
+ {
+ _buffer.dispose();
+ _buffer = null;
+ }
}
}
- }
- catch(RuntimeException e)
+ catch (RuntimeException e)
{
+ LOGGER.warn("Unexpected exception handling frame", e);
// This exception is unexpected. The up layer should handle error
condition gracefully
_connection.handleError(this.createError(ConnectionError.CONNECTION_FORCED,
e.toString()));
- e.printStackTrace();
}
return this;
}
Modified:
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/framing/SASLFrameHandler.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/framing/SASLFrameHandler.java?rev=1714337&r1=1714336&r2=1714337&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/framing/SASLFrameHandler.java
(original)
+++
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/framing/SASLFrameHandler.java
Sat Nov 14 14:31:28 2015
@@ -30,8 +30,13 @@ import org.apache.qpid.bytebuffer.QpidBy
import java.util.Formatter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
public class SASLFrameHandler implements ProtocolHandler
{
+ public static Logger LOGGER =
LoggerFactory.getLogger(SASLFrameHandler.class);
+
private ConnectionEndpoint _connection;
private ValueHandler _typeHandler;
@@ -64,138 +69,150 @@ public class SASLFrameHandler implements
{
try
{
- Error frameParsingError = null;
- int size = _size;
- State state = _state;
- QpidByteBuffer oldIn = null;
-
- while(in.hasRemaining() && !_connection.isSASLComplete() && state !=
State.ERROR)
- {
+ Error frameParsingError = null;
+ int size = _size;
+ State state = _state;
+ QpidByteBuffer oldIn = null;
- final int remaining = in.remaining();
- if(remaining == 0)
+ while (in.hasRemaining() && !_connection.isSASLComplete() && state
!= State.ERROR)
{
- return this;
- }
+ final int remaining = in.remaining();
+ if (remaining == 0)
+ {
+ return this;
+ }
- switch(state)
- {
- case SIZE_0:
- if(remaining >= 4)
- {
- size = in.getInt();
- state = State.PRE_PARSE;
- break;
- }
- else
- {
- size = (in.get() << 24) & 0xFF000000;
- if(!in.hasRemaining())
+
+ switch (state)
+ {
+ case SIZE_0:
+ if (remaining >= 4)
{
- state = State.SIZE_1;
+ size = in.getInt();
+ state = State.PRE_PARSE;
break;
}
- }
- case SIZE_1:
- size |= (in.get() << 16) & 0xFF0000;
- if(!in.hasRemaining())
- {
- state = State.SIZE_2;
- break;
- }
- case SIZE_2:
- size |= (in.get() << 8) & 0xFF00;
- if(!in.hasRemaining())
- {
- state = State.SIZE_3;
- break;
- }
- case SIZE_3:
- size |= in.get() & 0xFF;
- state = State.PRE_PARSE;
+ else
+ {
+ size = (in.get() << 24) & 0xFF000000;
+ if (!in.hasRemaining())
+ {
+ state = State.SIZE_1;
+ break;
+ }
+ }
+ case SIZE_1:
+ size |= (in.get() << 16) & 0xFF0000;
+ if (!in.hasRemaining())
+ {
+ state = State.SIZE_2;
+ break;
+ }
+ case SIZE_2:
+ size |= (in.get() << 8) & 0xFF00;
+ if (!in.hasRemaining())
+ {
+ state = State.SIZE_3;
+ break;
+ }
+ case SIZE_3:
+ size |= in.get() & 0xFF;
+ state = State.PRE_PARSE;
- case PRE_PARSE:
+ case PRE_PARSE:
- if(size < 8)
- {
- frameParsingError = createFramingError("specified
frame size %d smaller than minimum frame header size %d", _size, 8);
- state = State.ERROR;
- break;
- }
+ if (size < 8)
+ {
+ frameParsingError = createFramingError(
+ "specified frame size %d smaller than
minimum frame header size %d",
+ _size,
+ 8);
+ state = State.ERROR;
+ break;
+ }
- else if(size > _connection.getMaxFrameSize())
- {
- frameParsingError = createFramingError("specified
frame size %d larger than maximum frame header size %d", size,
_connection.getMaxFrameSize());
- state = State.ERROR;
- break;
- }
+ else if (size > _connection.getMaxFrameSize())
+ {
+ frameParsingError = createFramingError(
+ "specified frame size %d larger than
maximum frame header size %d",
+ size,
+ _connection.getMaxFrameSize());
+ state = State.ERROR;
+ break;
+ }
- if(in.remaining() < size-4)
- {
- _buffer = QpidByteBuffer.allocateDirect(size-4);
- _buffer.put(in);
- state = State.BUFFERING;
- break;
- }
- case BUFFERING:
- if(_buffer != null)
- {
- if(in.remaining() < _buffer.remaining())
+ if (in.remaining() < size - 4)
{
+ _buffer = QpidByteBuffer.allocateDirect(size - 4);
_buffer.put(in);
+ state = State.BUFFERING;
break;
}
- else
+ case BUFFERING:
+ if (_buffer != null)
{
- QpidByteBuffer dup = in.duplicate();
- dup.limit(dup.position()+_buffer.remaining());
- in.position(in.position()+_buffer.remaining());
- _buffer.put(dup);
- dup.dispose();
-
- oldIn = in;
- _buffer.flip();
- in = _buffer;
- state = State.PARSING;
+ if (in.remaining() < _buffer.remaining())
+ {
+ _buffer.put(in);
+ break;
+ }
+ else
+ {
+ QpidByteBuffer dup = in.duplicate();
+ dup.limit(dup.position() +
_buffer.remaining());
+ in.position(in.position() +
_buffer.remaining());
+ _buffer.put(dup);
+ dup.dispose();
+
+ oldIn = in;
+ _buffer.flip();
+ in = _buffer;
+ state = State.PARSING;
+ }
}
- }
- case PARSING:
+ case PARSING:
- int dataOffset = (in.get() << 2) & 0x3FF;
+ int dataOffset = (in.get() << 2) & 0x3FF;
- if(dataOffset < 8)
- {
- frameParsingError = createFramingError("specified
frame data offset %d smaller than minimum frame header size %d", dataOffset, 8);
- state = State.ERROR;
- break;
- }
- else if(dataOffset > size)
- {
- frameParsingError = createFramingError("specified
frame data offset %d larger than the frame size %d", dataOffset, _size);
- state = State.ERROR;
- break;
- }
+ if (dataOffset < 8)
+ {
+ frameParsingError = createFramingError(
+ "specified frame data offset %d smaller
than minimum frame header size %d",
+ dataOffset,
+ 8);
+ state = State.ERROR;
+ break;
+ }
+ else if (dataOffset > size)
+ {
+ frameParsingError = createFramingError(
+ "specified frame data offset %d larger
than the frame size %d",
+ dataOffset,
+ _size);
+ state = State.ERROR;
+ break;
+ }
- // type
+ // type
- int type = in.get() & 0xFF;
- int channel = in.getShort() & 0xFF;
+ int type = in.get() & 0xFF;
+ int channel = in.getShort() & 0xFF;
- if(type != 0 && type != 1)
- {
- frameParsingError = createFramingError("unknown frame
type: %d", type);
- state = State.ERROR;
- break;
- }
+ if (type != 0 && type != 1)
+ {
+ frameParsingError = createFramingError("unknown
frame type: %d", type);
+ state = State.ERROR;
+ break;
+ }
- if(type != 1)
- {
- System.err.println("Wrong frame type for SASL Frame");
- }
+ if (type != 1)
+ {
+ System.err.println("Wrong frame type for SASL
Frame");
+ }
- // channel
+ // channel
/*if(channel > _connection.getChannelMax())
{
@@ -206,75 +223,78 @@ public class SASLFrameHandler implements
state = State.ERROR;
}
*/
- // ext header
- if(dataOffset!=8)
- {
- in.position(in.position()+dataOffset-8);
- }
-
- // oldIn null iff not working on duplicated buffer
- if(oldIn == null)
- {
- oldIn = in;
- in = in.duplicate();
- final int endPos = in.position() + size - dataOffset;
- in.limit(endPos);
- oldIn.position(endPos);
+ // ext header
+ if (dataOffset != 8)
+ {
+ in.position(in.position() + dataOffset - 8);
+ }
- }
+ // oldIn null iff not working on duplicated buffer
+ if (oldIn == null)
+ {
+ oldIn = in;
+ in = in.duplicate();
+ final int endPos = in.position() + size -
dataOffset;
+ in.limit(endPos);
+ oldIn.position(endPos);
+ }
- // PARSE HERE
- try
- {
- Object val = _typeHandler.parse(in);
- if(in.hasRemaining())
+ // PARSE HERE
+ try
{
- state = State.ERROR;
- frameParsingError = createFramingError("Frame
length %d larger than contained frame body %s.", size, val);
+ Object val = _typeHandler.parse(in);
+
+ if (in.hasRemaining())
+ {
+ state = State.ERROR;
+ frameParsingError = createFramingError(
+ "Frame length %d larger than contained
frame body %s.",
+ size,
+ val);
+
+ }
+ else
+ {
+ _connection.receive((short) channel, val);
+ reset();
+ state = State.SIZE_0;
+ break;
+ }
+
}
- else
+ catch (AmqpErrorException ex)
{
- _connection.receive((short)channel,val);
- reset();
- state = State.SIZE_0;
- break;
+ state = State.ERROR;
+ frameParsingError = ex.getError();
}
+ finally
+ {
+ in.dispose();
+ in = oldIn;
+ oldIn = null;
+ _buffer = null;
+ }
+ }
- }
- catch (AmqpErrorException ex)
- {
- state = State.ERROR;
- frameParsingError = ex.getError();
- }
- finally
- {
- in.dispose();
-
- in = oldIn;
- oldIn = null;
- _buffer = null;
- }
}
- }
-
- _state = state;
- _size = size;
+ _state = state;
+ _size = size;
- if(_state == State.ERROR)
- {
- _connection.handleError(frameParsingError);
- if (_buffer != null)
+ if (_state == State.ERROR)
{
- _buffer.dispose();
- _buffer = null;
+ _connection.handleError(frameParsingError);
+ if (_buffer != null)
+ {
+ _buffer.dispose();
+ _buffer = null;
+ }
}
- }
- if(_connection.isSASLComplete())
+ if (_connection.isSASLComplete())
{
return new ProtocolHeaderHandler(_connection);
}
@@ -283,9 +303,9 @@ public class SASLFrameHandler implements
return this;
}
}
- catch(RuntimeException e)
+ catch (RuntimeException e)
{
- e.printStackTrace();
+ LOGGER.warn("Exception handling frame", e);
throw e;
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org