Author: rgodfrey
Date: Tue Jul 28 16:31:18 2015
New Revision: 1693129
URL: http://svn.apache.org/r1693129
Log:
QPID-6662 : Continue to use the same buffer rather than fragmenting when there
is insufficient data to decode (0-8/9/9-1)
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionPlainDelegate.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/ClientDecoder.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/MarkableDataInput.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/ServerDecoder.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ByteBufferDataInput.java
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java?rev=1693129&r1=1693128&r2=1693129&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java
Tue Jul 28 16:31:18 2015
@@ -275,6 +275,7 @@ public class MultiVersionProtocolEngine
public void received(ByteBuffer msg)
{
_logger.error("Error processing incoming data, could not negotiate
a common protocol");
+ msg.position(msg.limit());
}
public void closed()
@@ -395,7 +396,7 @@ public class MultiVersionProtocolEngine
if(_header.remaining() > msgheader.limit())
{
- msg.position(msg.limit());
+ return;
}
else
{
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionPlainDelegate.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionPlainDelegate.java?rev=1693129&r1=1693128&r2=1693129&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionPlainDelegate.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionPlainDelegate.java
Tue Jul 28 16:31:18 2015
@@ -43,7 +43,7 @@ public class NonBlockingConnectionPlainD
public boolean processData(ByteBuffer buffer)
{
_parent.processAmqpData(buffer);
- buffer.position(buffer.limit());
+
return false;
}
Modified:
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java?rev=1693129&r1=1693128&r2=1693129&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java
(original)
+++
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java
Tue Jul 28 16:31:18 2015
@@ -212,7 +212,10 @@ public class AMQPConnection_0_10 extends
throw new ConnectionScopedRuntimeException(e);
}
}
-
+ finally
+ {
+ buf.position(buf.limit());
+ }
return null;
}
});
Modified:
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java?rev=1693129&r1=1693128&r2=1693129&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java
(original)
+++
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java
Tue Jul 28 16:31:18 2015
@@ -64,7 +64,6 @@ import org.apache.qpid.server.protocol.C
import org.apache.qpid.server.transport.AbstractAMQPConnection;
import org.apache.qpid.server.transport.ProtocolEngine;
import org.apache.qpid.server.configuration.BrokerProperties;
-import org.apache.qpid.server.connection.ConnectionPrincipal;
import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.logging.messages.ConnectionMessages;
Modified:
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java?rev=1693129&r1=1693128&r2=1693129&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java
(original)
+++
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java
Tue Jul 28 16:31:18 2015
@@ -159,7 +159,7 @@ public class MessageMetaData implements
{
try
{
- ByteBufferDataInput dataInput = new ByteBufferDataInput(buf);
+ ByteBufferDataInput dataInput = new
ByteBufferDataInput(buf.slice());
int size = EncodingUtils.readInteger(dataInput);
ContentHeaderBody chb =
ContentHeaderBody.createFromBuffer(dataInput, size);
final AMQShortString exchange =
EncodingUtils.readAMQShortString(dataInput);
Modified:
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java?rev=1693129&r1=1693128&r2=1693129&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
(original)
+++
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
Tue Jul 28 16:31:18 2015
@@ -461,6 +461,10 @@ public class AMQPConnection_1_0 extends
LOGGER.error("Exception while processing incoming data", e);
getNetwork().close();
}
+ finally
+ {
+ msg.position(msg.limit());
+ }
}
Modified:
qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java?rev=1693129&r1=1693128&r2=1693129&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java
(original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java
Tue Jul 28 16:31:18 2015
@@ -61,13 +61,8 @@ public abstract class AMQDecoder<T exten
private int _maxFrameSize = AMQConstant.FRAME_MIN_SIZE.getCode();
- private List<ByteArrayInputStream> _remainingBufs = new
ArrayList<ByteArrayInputStream>();
-
- private List<ByteBuffer> _incompleteBuffers = new ArrayList<ByteBuffer>();
-
/**
* Creates a new AMQP decoder.
- *
* @param expectProtocolInitiation <tt>true</tt> if this decoder needs to
handle protocol initiation.
* @param methodProcessor method processor
*/
@@ -101,22 +96,21 @@ public abstract class AMQDecoder<T exten
}
- public void decodeBuffer(ByteBuffer buf) throws AMQFrameDecodingException,
AMQProtocolVersionException, IOException
- {
-
- buf = buf.slice();
- _incompleteBuffers.add(buf);
- ByteBufferListDataInput msg = new
ByteBufferListDataInput(_incompleteBuffers);
+ public abstract void decodeBuffer(ByteBuffer buf) throws
AMQFrameDecodingException, AMQProtocolVersionException, IOException;
+ protected void decode(final MarkableDataInput msg) throws IOException,
AMQFrameDecodingException
+ {
// If this is the first read then we may be getting a protocol
initiation back if we tried to negotiate
// an unsupported version
- if(_firstRead && buf.hasRemaining())
+ if(_firstRead && msg.available()>0)
{
+ msg.mark(1);
_firstRead = false;
- if(!_expectProtocolInitiation && buf.get(buf.position()) > 8)
+ if(!_expectProtocolInitiation && (((int)msg.readByte()) &0xff) > 8)
{
_expectProtocolInitiation = true;
}
+ msg.reset();
}
boolean enoughData = true;
@@ -140,24 +134,6 @@ public abstract class AMQDecoder<T exten
}
}
-
- ListIterator<ByteBuffer> iter = _incompleteBuffers.listIterator();
- while(iter.hasNext())
- {
- ByteBuffer next = iter.next();
- if(next.hasRemaining())
- {
- if(next.position() != 0)
- {
- iter.set(next.slice());
- }
- break;
- }
- else
- {
- iter.remove();
- }
- }
}
Modified:
qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/ClientDecoder.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/ClientDecoder.java?rev=1693129&r1=1693128&r2=1693129&view=diff
==============================================================================
---
qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/ClientDecoder.java
(original)
+++
qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/ClientDecoder.java
Tue Jul 28 16:31:18 2015
@@ -21,11 +21,16 @@
package org.apache.qpid.codec;
import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.ListIterator;
import org.apache.qpid.framing.*;
public class ClientDecoder extends AMQDecoder<ClientMethodProcessor<? extends
ClientChannelMethodProcessor>>
{
+ private List<ByteBuffer> _incompleteBuffers = new ArrayList<ByteBuffer>();
/**
* Creates a new AMQP decoder.
@@ -37,6 +42,36 @@ public class ClientDecoder extends AMQDe
super(false, methodProcessor);
}
+ @Override
+ public void decodeBuffer(ByteBuffer buf)
+ throws AMQFrameDecodingException, AMQProtocolVersionException,
IOException
+ {
+ buf = buf.slice();
+ _incompleteBuffers.add(buf);
+ MarkableDataInput msg = new
ByteBufferListDataInput(_incompleteBuffers);
+
+ decode(msg);
+
+
+ ListIterator<ByteBuffer> iter = _incompleteBuffers.listIterator();
+ while (iter.hasNext())
+ {
+ ByteBuffer next = iter.next();
+ if (next.hasRemaining())
+ {
+ if (next.position() != 0)
+ {
+ iter.set(next.slice());
+ }
+ break;
+ }
+ else
+ {
+ iter.remove();
+ }
+ }
+
+ }
void processMethod(int channelId,
MarkableDataInput in)
Modified:
qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/MarkableDataInput.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/MarkableDataInput.java?rev=1693129&r1=1693128&r2=1693129&view=diff
==============================================================================
---
qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/MarkableDataInput.java
(original)
+++
qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/MarkableDataInput.java
Tue Jul 28 16:31:18 2015
@@ -28,7 +28,7 @@ import java.nio.ByteBuffer;
public interface MarkableDataInput extends DataInput
{
- public void mark(int pos);
+ public void mark(int readAhead);
public void reset() throws IOException;
int available() throws IOException;
Modified:
qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/ServerDecoder.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/ServerDecoder.java?rev=1693129&r1=1693128&r2=1693129&view=diff
==============================================================================
---
qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/ServerDecoder.java
(original)
+++
qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/ServerDecoder.java
Tue Jul 28 16:31:18 2015
@@ -21,6 +21,7 @@
package org.apache.qpid.codec;
import java.io.IOException;
+import java.nio.ByteBuffer;
import org.apache.qpid.framing.*;
@@ -37,6 +38,12 @@ public class ServerDecoder extends AMQDe
super(true, methodProcessor);
}
+ public void decodeBuffer(ByteBuffer buf) throws AMQFrameDecodingException,
AMQProtocolVersionException, IOException
+ {
+ decode(new ByteBufferDataInput(buf));
+ }
+
+
void processMethod(int channelId,
MarkableDataInput in)
throws AMQFrameDecodingException, IOException
Modified:
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java?rev=1693129&r1=1693128&r2=1693129&view=diff
==============================================================================
---
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java
(original)
+++
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java
Tue Jul 28 16:31:18 2015
@@ -503,7 +503,7 @@ public class BasicContentHeaderPropertie
_encodedForm = buffer.readAsByteBuffer(size);
- ByteBufferDataInput input = new ByteBufferDataInput(_encodedForm);
+ ByteBufferDataInput input = new
ByteBufferDataInput(_encodedForm.slice());
decode(input);
Modified:
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ByteBufferDataInput.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ByteBufferDataInput.java?rev=1693129&r1=1693128&r2=1693129&view=diff
==============================================================================
---
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ByteBufferDataInput.java
(original)
+++
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ByteBufferDataInput.java
Tue Jul 28 16:31:18 2015
@@ -28,10 +28,12 @@ public class ByteBufferDataInput impleme
{
private final ByteBuffer _underlying;
private int _mark;
+ private final int _offset;
public ByteBufferDataInput(ByteBuffer underlying)
{
- _underlying = underlying.slice();
+ _underlying = underlying;
+ _offset = underlying.position();
}
public void readFully(byte[] b)
@@ -55,7 +57,7 @@ public class ByteBufferDataInput impleme
public int skipBytes(int n)
{
_underlying.position(_underlying.position()+n);
- return _underlying.position();
+ return _underlying.position()-_offset;
}
public boolean readBoolean()
@@ -143,12 +145,12 @@ public class ByteBufferDataInput impleme
public int position()
{
- return _underlying.position();
+ return _underlying.position()-_offset;
}
public void position(int position)
{
- _underlying.position(position);
+ _underlying.position(position+_offset);
}
public int length()
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]