Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java?rev=1693123&r1=1693122&r2=1693123&view=diff ============================================================================== --- qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java (original) +++ qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java Tue Jul 28 15:48:30 2015 @@ -28,7 +28,9 @@ import java.nio.ByteBuffer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.qpid.codec.MarkableDataInput; import org.apache.qpid.transport.ByteBufferSender; +import org.apache.qpid.util.ByteBufferDataOutput; import org.apache.qpid.util.BytesDataOutput; public class BasicContentHeaderProperties @@ -84,7 +86,7 @@ public class BasicContentHeaderPropertie private static final int USER_ID_MASK = 1 << 4; private static final int APPLICATION_ID_MASK = 1 << 3; private static final int CLUSTER_ID_MASK = 1 << 2; - private byte[] _encodedForm; + private ByteBuffer _encodedForm; public BasicContentHeaderProperties(BasicContentHeaderProperties other) @@ -134,7 +136,7 @@ public class BasicContentHeaderPropertie { if(useEncodedForm()) { - return _encodedForm.length; + return _encodedForm.remaining(); } else { @@ -235,7 +237,21 @@ public class BasicContentHeaderPropertie { if(useEncodedForm()) { - buffer.write(_encodedForm); + int offset; + int length = _encodedForm.remaining();; + byte[] array; + if(_encodedForm.hasArray()) + { + array = _encodedForm.array(); + offset = _encodedForm.arrayOffset() + _encodedForm.position(); + } + else + { + array = new byte[length]; + _encodedForm.duplicate().get(array); + offset = 0; + } + buffer.write(array, offset, length); } else { @@ -318,7 +334,7 @@ public class BasicContentHeaderPropertie } } - public int 
read(DataInput input) throws IOException + public int read(MarkableDataInput input) throws IOException { _propertyFlags = input.readUnsignedShort(); @@ -347,7 +363,7 @@ public class BasicContentHeaderPropertie { int fieldTableLength = input.readInt(); - _headers = new FieldTable(input, fieldTableLength); + _headers = new FieldTable(input.readAsByteBuffer(fieldTableLength)); length += 4; length += fieldTableLength; @@ -460,22 +476,23 @@ public class BasicContentHeaderPropertie { if(useEncodedForm()) { - sender.send(ByteBuffer.wrap(_encodedForm)); - return _encodedForm.length; + sender.send(_encodedForm.duplicate()); + return _encodedForm.remaining(); } else { int propertyListSize = getPropertyListSize(); - byte[] data = new byte[propertyListSize]; - BytesDataOutput out = new BytesDataOutput(data); + ByteBuffer buf = ByteBuffer.allocateDirect(propertyListSize); + ByteBufferDataOutput out = new ByteBufferDataOutput(buf); writePropertyListPayload(out); - sender.send(ByteBuffer.wrap(data)); + buf.flip(); + sender.send(buf); return propertyListSize; } } - public void populatePropertiesFromBuffer(DataInput buffer, int propertyFlags, int size) throws AMQFrameDecodingException, IOException + public void populatePropertiesFromBuffer(MarkableDataInput buffer, int propertyFlags, int size) throws AMQFrameDecodingException, IOException { _propertyFlags = propertyFlags; @@ -484,16 +501,15 @@ public class BasicContentHeaderPropertie _logger.debug("Property flags: " + _propertyFlags); } - _encodedForm = new byte[size]; - buffer.readFully(_encodedForm); + _encodedForm = buffer.readAsByteBuffer(size); - ByteArrayDataInput input = new ByteArrayDataInput(_encodedForm); + ByteBufferDataInput input = new ByteBufferDataInput(_encodedForm); decode(input); } - private void decode(ByteArrayDataInput buffer) throws IOException, AMQFrameDecodingException + private void decode(MarkableDataInput buffer) throws IOException, AMQFrameDecodingException { int headersOffset = 0; @@ -513,7 +529,11 @@ 
public class BasicContentHeaderPropertie { long length = EncodingUtils.readUnsignedInteger(buffer); - _headers = new FieldTable(_encodedForm, headersOffset+4, (int)length); + ByteBuffer buf = _encodedForm.slice(); + buf.position(headersOffset+4); + buf = buf.slice(); + buf.limit((int)length); + _headers = new FieldTable(buf); buffer.skipBytes((int)length); }
Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ByteArrayDataInput.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ByteArrayDataInput.java?rev=1693123&r1=1693122&r2=1693123&view=diff ============================================================================== --- qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ByteArrayDataInput.java (original) +++ qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ByteArrayDataInput.java Tue Jul 28 15:48:30 2015 @@ -20,6 +20,8 @@ */ package org.apache.qpid.framing; +import java.nio.ByteBuffer; + import org.apache.qpid.codec.MarkableDataInput; public class ByteArrayDataInput implements ExtendedDataInput, MarkableDataInput @@ -164,6 +166,14 @@ public class ByteArrayDataInput implemen return b.length; } + @Override + public ByteBuffer readAsByteBuffer(final int len) + { + byte[] data = new byte[len]; + readFully(data); + return ByteBuffer.wrap(data); + } + public int position() { return _offset - _origin; Added: qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ByteBufferDataInput.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ByteBufferDataInput.java?rev=1693123&view=auto ============================================================================== --- qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ByteBufferDataInput.java (added) +++ qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ByteBufferDataInput.java Tue Jul 28 15:48:30 2015 @@ -0,0 +1,169 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.framing; + +import java.nio.ByteBuffer; + +import org.apache.qpid.codec.MarkableDataInput; + +public class ByteBufferDataInput implements ExtendedDataInput, MarkableDataInput +{ + private final ByteBuffer _underlying; + private int _mark; + + public ByteBufferDataInput(ByteBuffer underlying) + { + _underlying = underlying.slice(); + } + + public void readFully(byte[] b) + { + _underlying.get(b); + } + + public void readFully(byte[] b, int off, int len) + { + _underlying.get(b, off, len); /* store at the caller's offset, not always at index 0 */ + } + + public ByteBuffer readAsByteBuffer(int len) + { + ByteBuffer buf = _underlying.slice(); + buf.limit(len); + skipBytes(len); + return buf; + } + + public int skipBytes(int n) + { + _underlying.position(_underlying.position()+n); + return _underlying.position(); + } + + public boolean readBoolean() + { + return _underlying.get() != 0; + } + + public byte readByte() + { + return _underlying.get(); + } + + public int readUnsignedByte() + { + return ((int)_underlying.get()) & 0xFF; + } + + public short readShort() + { + return _underlying.getShort(); + } + + public int readUnsignedShort() + { + return ((int)_underlying.getShort()) & 0xffff; + } + + public char readChar() + { + return (char) _underlying.getChar(); + } + + public int readInt() + { + return _underlying.getInt(); + } + + public long readLong() + { + return _underlying.getLong(); + } + + public float readFloat() + { + return _underlying.getFloat(); + } + + public double readDouble() + { + return _underlying.getDouble(); + } + + public AMQShortString readAMQShortString() 
+ { + return AMQShortString.readAMQShortString(_underlying); + } + + public String readLine() + { + throw new UnsupportedOperationException(); + } + + public String readUTF() + { + throw new UnsupportedOperationException(); + } + + public int available() + { + return _underlying.remaining(); + } + + + public long skip(long i) + { + _underlying.position(_underlying.position()+(int)i); + return i; + } + + public int read(byte[] b) + { + readFully(b); + return b.length; + } + + public int position() + { + return _underlying.position(); + } + + public void position(int position) + { + _underlying.position(position); + } + + public int length() + { + return _underlying.limit(); + } + + + public void mark(int readAhead) + { + _mark = position(); + } + + public void reset() + { + _underlying.position(_mark); + } +} Added: qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ByteBufferListDataInput.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ByteBufferListDataInput.java?rev=1693123&view=auto ============================================================================== --- qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ByteBufferListDataInput.java (added) +++ qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ByteBufferListDataInput.java Tue Jul 28 15:48:30 2015 @@ -0,0 +1,303 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.framing; + +import java.nio.ByteBuffer; +import java.util.List; + +import org.apache.qpid.codec.MarkableDataInput; + +public class ByteBufferListDataInput implements ExtendedDataInput, MarkableDataInput +{ + private final List<ByteBuffer> _underlying; + private int _bufferIndex; + private int _mark; + + public ByteBufferListDataInput(List<ByteBuffer> underlying) + { + _underlying = underlying; + } + + public void readFully(byte[] b) + { + final ByteBuffer currentBuffer = getCurrentBuffer(); + if(currentBuffer.remaining()>b.length) + { + currentBuffer.get(b); + } + else + { + ByteBuffer buf = readAsByteBuffer(b.length); + buf.get(b); + } + } + + public void readFully(byte[] b, int off, int len) + { + final ByteBuffer currentBuffer = getCurrentBuffer(); + if(currentBuffer.remaining()>len) + { + currentBuffer.get(b, off, len); + } + else + { + ByteBuffer buf = readAsByteBuffer(len); + buf.get(b, off, len); + } + } + + @Override + public ByteBuffer readAsByteBuffer(int len) + { + ByteBuffer currentBuffer = getCurrentBuffer(); + if(currentBuffer.remaining()>=len) + { + ByteBuffer buf = currentBuffer.slice(); + buf.limit(len); + currentBuffer.position(currentBuffer.position()+len); + return buf; + } + else + { + ByteBuffer dest = currentBuffer.isDirect() ? 
ByteBuffer.allocateDirect(len) : ByteBuffer.allocate(len); + while(dest.hasRemaining() && available()>0) + { + advanceIfNecessary(); + currentBuffer = getCurrentBuffer(); + final int remaining = dest.remaining(); + if(currentBuffer.remaining()>= remaining) + { + ByteBuffer buf = currentBuffer.slice(); + buf.limit(remaining); + currentBuffer.position(currentBuffer.position()+remaining); + dest.put(buf); + } + else + { + dest.put(currentBuffer); + } + } + + dest.flip(); + return dest; + } + } + + public int skipBytes(int n) + { + final ByteBuffer currentBuffer = getCurrentBuffer(); + if(currentBuffer.remaining()>n) + { + currentBuffer.position(currentBuffer.position()+n); + } + else + { + n -= currentBuffer.remaining(); + currentBuffer.position(currentBuffer.limit()); + if(_bufferIndex != _underlying.size()-1) + { + _bufferIndex++; + skipBytes(n); + } + } + return position(); + } + + private ByteBuffer getCurrentBuffer() + { + return _underlying.get(_bufferIndex); + } + + public boolean readBoolean() + { + advanceIfNecessary(); + return getCurrentBuffer().get() != 0; + } + + private void advanceIfNecessary() + { + while(!getCurrentBuffer().hasRemaining() && _bufferIndex != _underlying.size()-1) + { + _bufferIndex++; + } + } + + public byte readByte() + { + advanceIfNecessary(); + return getCurrentBuffer().get(); + } + + public int readUnsignedByte() + { + advanceIfNecessary(); + return ((int)getCurrentBuffer().get()) & 0xFF; + } + + public short readShort() + { + return getBuffer(2).getShort(); + } + + private ByteBuffer getBuffer(int size) + { + advanceIfNecessary(); + final ByteBuffer currentBuffer = getCurrentBuffer(); + if(currentBuffer.remaining()>= size) + { + return currentBuffer; + } + else + { + return readAsByteBuffer(size); + } + } + + public int readUnsignedShort() + { + return ((int)getBuffer(2).getShort()) & 0xffff; + } + + public char readChar() + { + return (char) getBuffer(2).getChar(); + } + + public int readInt() + { + return getBuffer(4).getInt(); 
+ } + + public long readLong() + { + return getBuffer(8).getLong(); + } + + public float readFloat() + { + return getBuffer(4).getFloat(); + } + + public double readDouble() + { + return getBuffer(8).getDouble(); + } + + public AMQShortString readAMQShortString() + { + advanceIfNecessary(); + final ByteBuffer currentBuffer = getCurrentBuffer(); + int size = ((int) currentBuffer.get(currentBuffer.position())) & 0xff; + return AMQShortString.readAMQShortString(getBuffer(size + 1)); + } + + public String readLine() + { + throw new UnsupportedOperationException(); + } + + public String readUTF() + { + throw new UnsupportedOperationException(); + } + + public int available() + { + int remaining = 0; + for(int i = _bufferIndex; i < _underlying.size(); i++) + { + remaining += _underlying.get(i).remaining(); + } + return remaining; + } + + + public long skip(long i) + { + skipBytes((int)i); + return i; + } + + public int read(byte[] b) + { + readFully(b); + return b.length; + } + + public int position() + { + int position = 0; + for(int i = 0; i < _bufferIndex; i++) + { + position += _underlying.get(i).limit(); + } + position += getCurrentBuffer().position(); + return position; + } + + public void position(int position) + { + int offset = 0; + boolean beforePos = true; + for(int i = 0; i < _underlying.size(); i++) + { + final ByteBuffer buffer = _underlying.get(i); + if(beforePos) + { + if (position - offset <= buffer.limit()) + { + buffer.position(position - offset); + _bufferIndex = i; + beforePos = false; + } + else + { + offset += buffer.limit(); + } + } + else + { + buffer.position(0); + } + } + } + + public int length() + { + int length = 0; + for(ByteBuffer buf : _underlying) + { + length+= buf.limit(); + } + return length; + } + + + public void mark(int readAhead) + { + _mark = position(); + } + + public void reset() + { + position(_mark); + } +} Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ChannelMethodProcessor.java URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ChannelMethodProcessor.java?rev=1693123&r1=1693122&r2=1693123&view=diff ============================================================================== --- qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ChannelMethodProcessor.java (original) +++ qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ChannelMethodProcessor.java Tue Jul 28 15:48:30 2015 @@ -20,6 +20,8 @@ */ package org.apache.qpid.framing; +import java.nio.ByteBuffer; + public interface ChannelMethodProcessor { void receiveChannelFlow(boolean active); @@ -30,7 +32,7 @@ public interface ChannelMethodProcessor void receiveChannelCloseOk(); - void receiveMessageContent(byte[] data); + void receiveMessageContent(ByteBuffer data); void receiveMessageHeader(BasicContentHeaderProperties properties, long bodySize); Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ContentBody.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ContentBody.java?rev=1693123&r1=1693122&r2=1693123&view=diff ============================================================================== --- qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ContentBody.java (original) +++ qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ContentBody.java Tue Jul 28 15:48:30 2015 @@ -20,7 +20,6 @@ */ package org.apache.qpid.framing; -import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.nio.ByteBuffer; @@ -34,20 +33,14 @@ public class ContentBody implements AMQB { public static final byte TYPE = 3; - private byte[] _payload; + private ByteBuffer _payload; public ContentBody() { } - public ContentBody(DataInput buffer, long size) throws AMQFrameDecodingException, IOException - { - _payload = new byte[(int)size]; - buffer.readFully(getPayload()); - } - - public ContentBody(byte[] payload) + public 
ContentBody(ByteBuffer payload) { _payload = payload; } @@ -59,12 +52,14 @@ public class ContentBody implements AMQB public int getSize() { - return getPayload() == null ? 0 : getPayload().length; + return _payload == null ? 0 : _payload.remaining(); } public void writePayload(DataOutput buffer) throws IOException { - buffer.write(getPayload()); + byte[] data = new byte[_payload.remaining()]; + _payload.duplicate().get(data); + buffer.write(data); } public void handle(final int channelId, final AMQVersionAwareProtocolSession session) @@ -78,8 +73,8 @@ public class ContentBody implements AMQB { if(_payload != null) { - sender.send(ByteBuffer.wrap(_payload)); - return _payload.length; + sender.send(_payload.duplicate()); + return _payload.remaining(); } else { @@ -87,7 +82,7 @@ public class ContentBody implements AMQB } } - public byte[] getPayload() + public ByteBuffer getPayload() { return _payload; } @@ -97,8 +92,7 @@ public class ContentBody implements AMQB throws IOException { - byte[] payload = new byte[(int)bodySize]; - in.readFully(payload); + ByteBuffer payload = in.readAsByteBuffer((int)bodySize); if(!methodProcessor.ignoreAllButCloseOk()) { @@ -106,77 +100,6 @@ public class ContentBody implements AMQB } } - private static class BufferContentBody implements AMQBody - { - private final int _length; - private final int _offset; - private final ByteBuffer _buf; - - private BufferContentBody( ByteBuffer buf, int offset, int length) - { - _length = length; - _offset = offset; - _buf = buf; - } - - public byte getFrameType() - { - return TYPE; - } - - - public int getSize() - { - return _length; - } - - public void writePayload(DataOutput buffer) throws IOException - { - if(_buf.hasArray()) - { - buffer.write(_buf.array(), _buf.arrayOffset() + _offset, _length); - } - else - { - byte[] data = new byte[_length]; - ByteBuffer buf = _buf.duplicate(); - - buf.position(_offset); - buf.limit(_offset+_length); - buf.get(data); - buffer.write(data); - } - } - - @Override 
- public long writePayload(final ByteBufferSender sender) throws IOException - { - if(_buf.hasArray()) - { - sender.send(ByteBuffer.wrap(_buf.array(), _buf.arrayOffset() + _offset, _length)); - } - else - { - ByteBuffer buf = _buf.duplicate(); - - buf.position(_offset); - buf.limit(_offset+_length); - sender.send(buf); - } - return _length; - } - - public void handle(int channelId, AMQVersionAwareProtocolSession amqProtocolSession) throws QpidException - { - throw new RuntimeException("Buffered Body only to be used for outgoing data"); - } - } - - public static AMQFrame createAMQFrame(int channelId, ByteBuffer buf, int offset, int length) - { - return new AMQFrame(channelId, new BufferContentBody(buf, offset, length)); - } - public static AMQFrame createAMQFrame(int channelId, ContentBody body) { return new AMQFrame(channelId, body); Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java?rev=1693123&r1=1693122&r2=1693123&view=diff ============================================================================== --- qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java (original) +++ qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java Tue Jul 28 15:48:30 2015 @@ -43,7 +43,7 @@ public class ContentHeaderBody implement /** must never be null */ private BasicContentHeaderProperties _properties; - public ContentHeaderBody(DataInput buffer, long size) throws AMQFrameDecodingException, IOException + public ContentHeaderBody(MarkableDataInput buffer, long size) throws AMQFrameDecodingException, IOException { buffer.readUnsignedShort(); buffer.readUnsignedShort(); @@ -80,7 +80,7 @@ public class ContentHeaderBody implement * @throws AMQProtocolVersionException if there is a version issue * @throws IOException if there is an IO issue */ - public static 
ContentHeaderBody createFromBuffer(DataInputStream buffer, long size) + public static ContentHeaderBody createFromBuffer(MarkableDataInput buffer, long size) throws AMQFrameDecodingException, AMQProtocolVersionException, IOException { ContentHeaderBody body = new ContentHeaderBody(buffer, size); @@ -105,13 +105,13 @@ public class ContentHeaderBody implement @Override public long writePayload(final ByteBufferSender sender) throws IOException { - byte[] data = new byte[14]; - BytesDataOutput buffer = new BytesDataOutput(data); - EncodingUtils.writeUnsignedShort(buffer, CLASS_ID); - EncodingUtils.writeUnsignedShort(buffer, 0); - buffer.writeLong(_bodySize); - EncodingUtils.writeUnsignedShort(buffer, _properties.getPropertyFlags()); - sender.send(ByteBuffer.wrap(data)); + ByteBuffer data = ByteBuffer.allocateDirect(14); + EncodingUtils.writeUnsignedShort(data, CLASS_ID); + EncodingUtils.writeUnsignedShort(data, 0); + data.putLong(_bodySize); + EncodingUtils.writeUnsignedShort(data, _properties.getPropertyFlags()); + data.flip(); + sender.send(data); return 14 + _properties.writePropertyListPayload(sender); } Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java?rev=1693123&r1=1693122&r2=1693123&view=diff ============================================================================== --- qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java (original) +++ qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java Tue Jul 28 15:48:30 2015 @@ -23,6 +23,7 @@ package org.apache.qpid.framing; import java.io.DataInput; import java.io.IOException; +import org.apache.qpid.codec.MarkableDataInput; import org.apache.qpid.protocol.AMQConstant; public class ContentHeaderPropertiesFactory @@ -39,7 +40,7 @@ public class 
ContentHeaderPropertiesFact } public BasicContentHeaderProperties createContentHeaderProperties(int classId, int propertyFlags, - DataInput buffer, int size) + MarkableDataInput buffer, int size) throws AMQFrameDecodingException, IOException { BasicContentHeaderProperties properties; Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java?rev=1693123&r1=1693122&r2=1693123&view=diff ============================================================================== --- qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java (original) +++ qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java Tue Jul 28 15:48:30 2015 @@ -30,6 +30,8 @@ import java.nio.charset.StandardCharsets import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.qpid.codec.MarkableDataInput; + public class EncodingUtils { private static final Logger _logger = LoggerFactory.getLogger(EncodingUtils.class); @@ -278,6 +280,22 @@ public class EncodingUtils } } + public static void writeUnsignedShort(ByteBuffer buffer, int s) throws IOException + { + // TODO: Is this comparison safe? Do I need to cast RHS to long? + if (s < Short.MAX_VALUE) + { + buffer.putShort((short) s); + } + else + { + short sv = (short) s; + buffer.put((byte) (0xFF & (sv >> 8))); + buffer.put((byte) (0xFF & sv)); + } + } + + public static int unsignedIntegerLength() { return 4; @@ -303,6 +321,27 @@ public class EncodingUtils } } + public static void writeUnsignedInteger(ByteBuffer buffer, long l) throws IOException + { + // TODO: Is this comparison safe? Do I need to cast RHS to long? + if (l < Integer.MAX_VALUE) + { + buffer.putInt((int) l); + } + else + { + int iv = (int) l; + + // FIXME: This *may* go faster if we build this into a local 4-byte array and then + // put the array in a single call. 
+ buffer.put((byte) (0xFF & (iv >> 24))); + buffer.put((byte) (0xFF & (iv >> 16))); + buffer.put((byte) (0xFF & (iv >> 8))); + buffer.put((byte) (0xFF & iv)); + } + } + + public static void writeFieldTableBytes(DataOutput buffer, FieldTable table) throws IOException { if (table != null) @@ -579,7 +618,7 @@ public class EncodingUtils return result; } - public static FieldTable readFieldTable(DataInput buffer) throws AMQFrameDecodingException, IOException + public static FieldTable readFieldTable(MarkableDataInput buffer) throws AMQFrameDecodingException, IOException { long length = ((long)(buffer.readInt())) & 0xFFFFFFFFL; if (length == 0) @@ -588,10 +627,11 @@ public class EncodingUtils } else { - return FieldTableFactory.newFieldTable(buffer, length); + return new FieldTable(buffer.readAsByteBuffer((int)length)); } } + public static AMQShortString readAMQShortString(DataInput buffer) throws IOException { return AMQShortString.readFromBuffer(buffer); Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/FieldTable.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/FieldTable.java?rev=1693123&r1=1693122&r2=1693123&view=diff ============================================================================== --- qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/FieldTable.java (original) +++ qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/FieldTable.java Tue Jul 28 15:48:30 2015 @@ -21,11 +21,11 @@ package org.apache.qpid.framing; import java.io.ByteArrayOutputStream; -import java.io.DataInput; import java.io.DataOutput; import java.io.DataOutputStream; import java.io.IOException; import java.math.BigDecimal; +import java.nio.ByteBuffer; import java.util.Collection; import java.util.Collections; import java.util.Date; @@ -49,8 +49,7 @@ public class FieldTable private static final String STRICT_AMQP_NAME = "STRICT_AMQP"; private static final boolean STRICT_AMQP = 
Boolean.valueOf(System.getProperty(STRICT_AMQP_NAME, "false")); - private byte[] _encodedForm; - private int _encodedFormOffset; + private ByteBuffer _encodedForm; private LinkedHashMap<AMQShortString, AMQTypedValue> _properties = null; private long _encodedSize; private static final int INITIAL_HASHMAP_CAPACITY = 16; @@ -69,30 +68,18 @@ public class FieldTable _strictAMQP = strictAMQP; } - /** - * Construct a new field table. - * - * @param buffer the buffer from which to read data. The length byte must be read already - * @param length the length of the field table. Must be great than 0. - * @throws IOException if there is an issue reading the buffer - */ - public FieldTable(DataInput buffer, long length) throws IOException + public FieldTable(byte[] encodedForm, int offset, int length) { - this(); - _encodedForm = new byte[(int) length]; - buffer.readFully(_encodedForm); - _encodedSize = length; + this(ByteBuffer.wrap(encodedForm,offset,length)); } - public FieldTable(byte[] encodedForm, int offset, int length) + public FieldTable(ByteBuffer buffer) { this(); - _encodedForm = encodedForm; - _encodedFormOffset = offset; - _encodedSize = length; + _encodedForm = buffer; + _encodedSize = buffer.remaining(); } - public boolean isClean() { return _encodedForm != null; @@ -858,14 +845,10 @@ public class FieldTable } } - else if(_encodedFormOffset == 0 && _encodedSize == _encodedForm.length) - { - return _encodedForm.clone(); - } else { - byte[] encodedCopy = new byte[(int) _encodedSize]; - System.arraycopy(_encodedForm,_encodedFormOffset,encodedCopy,0,(int)_encodedSize); + byte[] encodedCopy = new byte[_encodedForm.remaining()]; + _encodedForm.duplicate().get(encodedCopy); return encodedCopy; } @@ -1077,10 +1060,12 @@ public class FieldTable private void putDataInBuffer(DataOutput buffer) throws IOException { - if (_encodedForm != null) { - buffer.write(_encodedForm,_encodedFormOffset,(int)_encodedSize); + byte[] encodedCopy = new byte[_encodedForm.remaining()]; + 
_encodedForm.duplicate().get(encodedCopy); + + buffer.write(encodedCopy); } else if (_properties != null) { @@ -1109,7 +1094,7 @@ public class FieldTable private void setFromBuffer() throws AMQFrameDecodingException, IOException { - ByteArrayDataInput baid = new ByteArrayDataInput(_encodedForm, _encodedFormOffset, (int)_encodedSize); + ByteBufferDataInput dataInput = new ByteBufferDataInput(_encodedForm.duplicate()); if (_encodedSize > 0) { @@ -1120,12 +1105,12 @@ public class FieldTable do { - final AMQShortString key = baid.readAMQShortString(); - AMQTypedValue value = AMQTypedValue.readFromBuffer(baid); + final AMQShortString key = dataInput.readAMQShortString(); + AMQTypedValue value = AMQTypedValue.readFromBuffer(dataInput); _properties.put(key, value); } - while (baid.available() > 0); + while (dataInput.available() > 0); } Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/FieldTableFactory.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/FieldTableFactory.java?rev=1693123&r1=1693122&r2=1693123&view=diff ============================================================================== --- qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/FieldTableFactory.java (original) +++ qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/FieldTableFactory.java Tue Jul 28 15:48:30 2015 @@ -34,10 +34,5 @@ public class FieldTableFactory return new FieldTable(); } - public static FieldTable newFieldTable(DataInput byteBuffer, long length) throws AMQFrameDecodingException, IOException - { - return new FieldTable(byteBuffer, length); - } - } Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/FrameCreatingMethodProcessor.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/FrameCreatingMethodProcessor.java?rev=1693123&r1=1693122&r2=1693123&view=diff 
============================================================================== --- qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/FrameCreatingMethodProcessor.java (original) +++ qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/FrameCreatingMethodProcessor.java Tue Jul 28 15:48:30 2015 @@ -20,6 +20,7 @@ */ package org.apache.qpid.framing; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; @@ -603,7 +604,7 @@ public class FrameCreatingMethodProcesso } @Override - public void receiveMessageContent(final byte[] data) + public void receiveMessageContent(ByteBuffer data) { _processedMethods.add(new AMQFrame(_channelId, new ContentBody(data))); } Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/Connection.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/Connection.java?rev=1693123&r1=1693122&r2=1693123&view=diff ============================================================================== --- qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/Connection.java (original) +++ qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/Connection.java Tue Jul 28 15:48:30 2015 @@ -227,7 +227,7 @@ public class Connection extends Connecti securityLayer = SecurityLayerFactory.newInstance(getConnectionSettings()); IoNetworkTransport transport = new IoNetworkTransport(); - final InputHandler inputHandler = new InputHandler(new Assembler(this)); + final InputHandler inputHandler = new InputHandler(new Assembler(this), false); addFrameSizeObserver(inputHandler); ExceptionHandlingByteBufferReceiver secureReceiver = securityLayer.receiver(inputHandler); if(secureReceiver instanceof ConnectionListener) Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/ProtocolHeader.java URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/ProtocolHeader.java?rev=1693123&r1=1693122&r2=1693123&view=diff ============================================================================== --- qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/ProtocolHeader.java (original) +++ qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/ProtocolHeader.java Tue Jul 28 15:48:30 2015 @@ -93,9 +93,9 @@ public final class ProtocolHeader implem return false; } - public ByteBuffer toByteBuffer() + public ByteBuffer toByteBuffer(final boolean useDirect) { - ByteBuffer buf = ByteBuffer.allocate(8); + ByteBuffer buf = useDirect ? ByteBuffer.allocateDirect(8) : ByteBuffer.allocate(8); buf.put(AMQP); buf.put(protoClass); buf.put(instance); Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/Assembler.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/Assembler.java?rev=1693123&r1=1693122&r2=1693123&view=diff ============================================================================== --- qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/Assembler.java (original) +++ qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/Assembler.java Tue Jul 28 15:48:30 2015 @@ -155,7 +155,7 @@ public class Assembler implements Networ { size += f.getSize(); } - segment = ByteBuffer.allocate(size); + segment = allocateByteBuffer(size); for (Frame f : frames) { segment.put(f.getBody()); @@ -167,6 +167,11 @@ public class Assembler implements Networ } + protected ByteBuffer allocateByteBuffer(final int size) + { + return ByteBuffer.allocate(size); + } + private void assemble(Frame frame, ByteBuffer segment) { BBDecoder dec = _decoder.get(); Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java?rev=1693123&r1=1693122&r2=1693123&view=diff ============================================================================== --- qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java (original) +++ qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java Tue Jul 28 15:48:30 2015 @@ -159,7 +159,7 @@ public final class Disassembler implemen { synchronized (sendlock) { - sender.send(header.toByteBuffer()); + sender.send(header.toByteBuffer(false)); sender.flush(); } } Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java?rev=1693123&r1=1693122&r2=1693123&view=diff ============================================================================== --- qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java (original) +++ qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java Tue Jul 28 15:48:30 2015 @@ -29,6 +29,9 @@ import static org.apache.qpid.transport. 
import java.nio.ByteBuffer; import java.nio.ByteOrder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.qpid.transport.Constant; import org.apache.qpid.transport.ExceptionHandlingByteBufferReceiver; import org.apache.qpid.transport.FrameSizeObserver; @@ -46,18 +49,21 @@ import org.apache.qpid.transport.Segment public class InputHandler implements ExceptionHandlingByteBufferReceiver, FrameSizeObserver { + private static final Logger LOGGER = LoggerFactory.getLogger(InputHandler.class); private int _maxFrameSize = Constant.MIN_MAX_FRAME_SIZE; + public enum State { PROTO_HDR, FRAME_HDR, FRAME_BODY, - ERROR + ERROR; } - private final NetworkEventReceiver receiver; + + private final boolean _useDirect; private State state; private ByteBuffer input = null; private int needed; @@ -67,27 +73,24 @@ public class InputHandler implements Exc private byte track; private int channel; - public InputHandler(NetworkEventReceiver receiver, State state) + + public InputHandler(NetworkEventReceiver receiver, final boolean useDirect) { this.receiver = receiver; - this.state = state; + this.state = PROTO_HDR; + _useDirect = useDirect; switch (state) { - case PROTO_HDR: - needed = 8; - break; - case FRAME_HDR: - needed = Frame.HEADER_SIZE; - break; + case PROTO_HDR: + needed = 8; + break; + case FRAME_HDR: + needed = Frame.HEADER_SIZE; + break; } } - public InputHandler(NetworkEventReceiver receiver) - { - this(receiver, PROTO_HDR); - } - public void setMaxFrameSize(final int maxFrameSize) { _maxFrameSize = maxFrameSize; @@ -98,6 +101,7 @@ public class InputHandler implements Exc receiver.received(new ProtocolError(Frame.L1, fmt, args)); } + @Override public void received(ByteBuffer buf) { int limit = buf.limit(); @@ -132,7 +136,7 @@ public class InputHandler implements Exc { if (input == null) { - input = ByteBuffer.allocate(needed); + input = _useDirect ? 
ByteBuffer.allocateDirect(needed) : ByteBuffer.allocate(needed); } input.put(buf); needed -= remaining; @@ -185,7 +189,7 @@ public class InputHandler implements Exc channel = (0xFFFF & input.getShort(pos + 6)); if (size == 0) { - Frame frame = new Frame(flags, type, track, channel, ByteBuffer.allocate(0)); + Frame frame = new Frame(flags, type, track, channel, _useDirect ? ByteBuffer.allocateDirect(0) : ByteBuffer.allocate(0)); receiver.received(frame); needed = Frame.HEADER_SIZE; return FRAME_HDR; Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/util/Functions.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/util/Functions.java?rev=1693123&r1=1693122&r2=1693123&view=diff ============================================================================== --- qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/util/Functions.java (original) +++ qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/util/Functions.java Tue Jul 28 15:48:30 2015 @@ -109,6 +109,10 @@ public final class Functions { return hex(bytes, limit, ""); } + public static String hex(ByteBuffer bytes, int limit) + { + return hex(bytes, limit, ""); + } public static String hex(byte[] bytes, int limit, CharSequence separator) { @@ -127,6 +131,26 @@ public final class Functions { sb.append("..."); } + return sb.toString(); + } + + public static String hex(ByteBuffer bytes, int limit, CharSequence separator) + { + limit = Math.min(limit, bytes == null ? 
0 : bytes.remaining()); + StringBuilder sb = new StringBuilder(3 + limit*2); + for(int i = 0; i < limit; i++) + { + sb.append(HEX_CHARACTERS[(((int)(bytes.get(bytes.position()+i))) & 0xf0)>>4]); + sb.append(HEX_CHARACTERS[(((int)bytes.get(bytes.position()+i)) & 0x0f)]); + if(i != bytes.remaining() - 1) + { + sb.append(separator); + } + } + if(bytes != null && bytes.remaining()>limit) + { + sb.append("..."); + } return sb.toString(); } Added: qpid/java/trunk/common/src/main/java/org/apache/qpid/util/ByteBufferDataOutput.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/util/ByteBufferDataOutput.java?rev=1693123&view=auto ============================================================================== --- qpid/java/trunk/common/src/main/java/org/apache/qpid/util/ByteBufferDataOutput.java (added) +++ qpid/java/trunk/common/src/main/java/org/apache/qpid/util/ByteBufferDataOutput.java Tue Jul 28 15:48:30 2015 @@ -0,0 +1,145 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ +package org.apache.qpid.util; + +import java.io.DataOutput; +import java.nio.ByteBuffer; + +public class ByteBufferDataOutput implements DataOutput +{ + private final ByteBuffer _buf; + + public ByteBufferDataOutput(ByteBuffer buf) + { + _buf = buf; + } + + public void write(int b) + { + _buf.put((byte)b); + } + + public void write(byte[] b) + { + _buf.put(b); + } + + + public void write(byte[] b, int off, int len) + { + _buf.put(b, off, len); + + } + + public void writeBoolean(boolean v) + { + _buf.put( v ? (byte) 1 : (byte) 0); + } + + public void writeByte(int v) + { + _buf.put((byte) v); + } + + public void writeShort(int v) + { + _buf.putShort((short)v); + } + + public void writeChar(int v) + { + _buf.put((byte) (v >>> 8)); + _buf.put((byte) v); + } + + public void writeInt(int v) + { + _buf.putInt(v); + } + + public void writeLong(long v) + { + _buf.putLong(v); + } + + public void writeFloat(float v) + { + writeInt(Float.floatToIntBits(v)); + } + + public void writeDouble(double v) + { + writeLong(Double.doubleToLongBits(v)); + } + + public void writeBytes(String s) + { + throw new UnsupportedOperationException("writeBytes(String s) not supported"); + } + + public void writeChars(String s) + { + int len = s.length(); + for (int i = 0 ; i < len ; i++) + { + int v = s.charAt(i); + _buf.put((byte) (v >>> 8)); + _buf.put((byte) v); + } + } + + public void writeUTF(String s) + { + int strlen = s.length(); + + int pos = _buf.position(); + _buf.position(pos+2); + + + for (int i = 0; i < strlen; i++) + { + int c = s.charAt(i); + if ((c >= 0x0001) && (c <= 0x007F)) + { + c = s.charAt(i); + _buf.put((byte) c); + + } + else if (c > 0x07FF) + { + _buf.put((byte) (0xE0 | ((c >> 12) & 0x0F))); + _buf.put((byte) (0x80 | ((c >> 6) & 0x3F))); + _buf.put((byte) (0x80 | (c & 0x3F))); + } + else + { + _buf.put((byte) (0xC0 | ((c >> 6) & 0x1F))); + _buf.put((byte) (0x80 | (c & 0x3F))); + } + } + + int len = _buf.position() - (pos + 2); + + _buf.put(pos++, (byte) (len 
>>> 8)); + _buf.put(pos, (byte) len); + } + +} Modified: qpid/java/trunk/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java?rev=1693123&r1=1693122&r2=1693123&view=diff ============================================================================== --- qpid/java/trunk/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java (original) +++ qpid/java/trunk/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java Tue Jul 28 15:48:30 2015 @@ -83,7 +83,7 @@ public class AMQDecoderTest extends Qpid Random random = new Random(); final byte[] payload = new byte[2048]; random.nextBytes(payload); - final AMQBody body = new ContentBody(payload); + final AMQBody body = new ContentBody(ByteBuffer.wrap(payload)); AMQFrame frame = new AMQFrame(1, body); byte[] outputBuf = new byte[4096]; BytesDataOutput dataOutput = new BytesDataOutput(outputBuf); @@ -91,14 +91,16 @@ public class AMQDecoderTest extends Qpid for(int i = 0 ; i < dataOutput.length(); i++) { _decoder.decodeBuffer(ByteBuffer.wrap(outputBuf, i, 1)); - } List<AMQDataBlock> frames = _methodProcessor.getProcessedMethods(); if (frames.get(0) instanceof AMQFrame) { assertEquals(ContentBody.TYPE, ((AMQFrame) frames.get(0)).getBodyFrame().getFrameType()); ContentBody decodedBody = (ContentBody) ((AMQFrame) frames.get(0)).getBodyFrame(); - assertTrue("Body was corrupted", Arrays.equals(payload, decodedBody.getPayload())); + final ByteBuffer byteBuffer = decodedBody.getPayload().duplicate(); + byte[] bodyBytes = new byte[byteBuffer.remaining()]; + byteBuffer.get(bodyBytes); + assertTrue("Body was corrupted", Arrays.equals(payload, bodyBytes)); } else { Modified: qpid/java/trunk/common/src/test/java/org/apache/qpid/framing/BasicContentHeaderPropertiesTest.java URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/common/src/test/java/org/apache/qpid/framing/BasicContentHeaderPropertiesTest.java?rev=1693123&r1=1693122&r2=1693123&view=diff ============================================================================== --- qpid/java/trunk/common/src/test/java/org/apache/qpid/framing/BasicContentHeaderPropertiesTest.java (original) +++ qpid/java/trunk/common/src/test/java/org/apache/qpid/framing/BasicContentHeaderPropertiesTest.java Tue Jul 28 15:48:30 2015 @@ -87,7 +87,7 @@ public class BasicContentHeaderPropertie public void testPopulatePropertiesFromBuffer() throws Exception { - DataInputStream buf = new DataInputStream(new ByteArrayInputStream(new byte[300])); + ByteArrayDataInput buf = new ByteArrayDataInput(new byte[300]); _testProperties.populatePropertiesFromBuffer(buf, 99, 99); } Modified: qpid/java/trunk/common/src/test/java/org/apache/qpid/framing/FieldTableTest.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/test/java/org/apache/qpid/framing/FieldTableTest.java?rev=1693123&r1=1693122&r2=1693123&view=diff ============================================================================== --- qpid/java/trunk/common/src/test/java/org/apache/qpid/framing/FieldTableTest.java (original) +++ qpid/java/trunk/common/src/test/java/org/apache/qpid/framing/FieldTableTest.java Tue Jul 28 15:48:30 2015 @@ -25,6 +25,7 @@ import java.io.ByteArrayOutputStream; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -477,7 +478,7 @@ public class FieldTableTest extends Qpid // Extract the table back from the buffer again. 
try { - FieldTable extractedOuterTable = EncodingUtils.readFieldTable(new DataInputStream(new ByteArrayInputStream(data))); + FieldTable extractedOuterTable = EncodingUtils.readFieldTable(new ByteArrayDataInput(data)); FieldTable extractedTable = extractedOuterTable.getFieldTable("innerTable"); @@ -600,13 +601,14 @@ public class FieldTableTest extends Qpid ByteArrayOutputStream baos = new ByteArrayOutputStream((int) table.getEncodedSize() + 4); table.writeToBuffer(new DataOutputStream(baos)); - ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray()); - DataInputStream dis = new DataInputStream(bais); + ByteBuffer buf = ByteBuffer.wrap(baos.toByteArray()); + long length = buf.getInt() & 0xFFFFFFFFL; + buf = buf.slice(); + buf.limit((int)length); - long length = dis.readInt() & 0xFFFFFFFFL; - FieldTable table2 = new FieldTable(dis, length); + FieldTable table2 = new FieldTable(buf); Assert.assertEquals((Boolean) true, table2.getBoolean("bool")); Assert.assertEquals((Byte) Byte.MAX_VALUE, table2.getByte("byte")); @@ -918,7 +920,7 @@ public class FieldTableTest extends Qpid assertEquals("unexpected data length", 24, length); //Create a second FieldTable from the encoded bytes - FieldTable tableFromBytes = new FieldTable(new DataInputStream(new ByteArrayInputStream(data)), length); + FieldTable tableFromBytes = new FieldTable(ByteBuffer.wrap(data)); //Create a final FieldTable and addAll() from the table created with encoded bytes FieldTable destinationTable = new FieldTable(); Modified: qpid/java/trunk/common/src/test/java/org/apache/qpid/transport/network/ConnectionBinding.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/test/java/org/apache/qpid/transport/network/ConnectionBinding.java?rev=1693123&r1=1693122&r2=1693123&view=diff ============================================================================== --- qpid/java/trunk/common/src/test/java/org/apache/qpid/transport/network/ConnectionBinding.java (original) +++ 
qpid/java/trunk/common/src/test/java/org/apache/qpid/transport/network/ConnectionBinding.java Tue Jul 28 15:48:30 2015 @@ -83,7 +83,7 @@ public abstract class ConnectionBinding public ExceptionHandlingByteBufferReceiver receiver(Connection conn) { - final InputHandler inputHandler = new InputHandler(new Assembler(conn)); + final InputHandler inputHandler = new InputHandler(new Assembler(conn), false); conn.addFrameSizeObserver(inputHandler); if (conn.getConnectionSettings() != null && conn.getConnectionSettings().isUseSASLEncryption()) Modified: qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/unit/basic/FieldTableMessageTest.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/unit/basic/FieldTableMessageTest.java?rev=1693123&r1=1693122&r2=1693123&view=diff ============================================================================== --- qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/unit/basic/FieldTableMessageTest.java (original) +++ qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/unit/basic/FieldTableMessageTest.java Tue Jul 28 15:48:30 2015 @@ -41,6 +41,7 @@ import javax.jms.MessageProducer; import java.io.ByteArrayInputStream; import java.io.DataInputStream; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -132,7 +133,7 @@ public class FieldTableMessageTest exten final long bodyLength = bytesMessage.getBodyLength(); byte[] data = new byte[(int) bodyLength]; bytesMessage.readBytes(data); - FieldTable actual = FieldTableFactory.newFieldTable(new DataInputStream(new ByteArrayInputStream(data)), bodyLength); + FieldTable actual = new FieldTable(ByteBuffer.wrap(data)); for (String key : _expected.keys()) { assertEquals("Values for " + key + " did not match", _expected.getObject(key), actual.getObject(key)); 
--------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
