Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java?rev=1694594&r1=1694593&r2=1694594&view=diff ============================================================================== --- qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java (original) +++ qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java Fri Aug 7 00:28:17 2015 @@ -27,6 +27,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import org.apache.qpid.QpidException; +import org.apache.qpid.bytebuffer.QpidByteBuffer; import org.apache.qpid.codec.MarkableDataInput; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; @@ -105,7 +106,7 @@ public class ContentHeaderBody implement @Override public long writePayload(final ByteBufferSender sender) throws IOException { - ByteBuffer data = ByteBuffer.allocate(14); + QpidByteBuffer data = QpidByteBuffer.allocate(14); EncodingUtils.writeUnsignedShort(data, CLASS_ID); EncodingUtils.writeUnsignedShort(data, 0); data.putLong(_bodySize);
Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java?rev=1694594&r1=1694593&r2=1694594&view=diff ============================================================================== --- qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java (original) +++ qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java Fri Aug 7 00:28:17 2015 @@ -30,6 +30,7 @@ import java.nio.charset.StandardCharsets import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.qpid.bytebuffer.QpidByteBuffer; import org.apache.qpid.codec.MarkableDataInput; public class EncodingUtils @@ -280,7 +281,7 @@ public class EncodingUtils } } - public static void writeUnsignedShort(ByteBuffer buffer, int s) throws IOException + public static void writeUnsignedShort(QpidByteBuffer buffer, int s) throws IOException { // TODO: Is this comparison safe? Do I need to cast RHS to long? if (s < Short.MAX_VALUE) @@ -321,7 +322,7 @@ public class EncodingUtils } } - public static void writeUnsignedInteger(ByteBuffer buffer, long l) throws IOException + public static void writeUnsignedInteger(QpidByteBuffer buffer, long l) throws IOException { // TODO: Is this comparison safe? Do I need to cast RHS to long? if (l < Integer.MAX_VALUE) @@ -627,7 +628,7 @@ public class EncodingUtils } else { - return new FieldTable(buffer.readAsByteBuffer((int)length)); + return new FieldTable(buffer.readAsByteBuffer((int) length)); } } Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/FieldTable.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/FieldTable.java?rev=1694594&r1=1694593&r2=1694594&view=diff ============================================================================== --- qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/FieldTable.java (original) +++ qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/FieldTable.java Fri Aug 7 00:28:17 2015 @@ -41,6 +41,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.qpid.AMQPInvalidClassException; +import org.apache.qpid.bytebuffer.QpidByteBuffer; +import org.apache.qpid.codec.MarkableDataInput; // extends FieldTable public class FieldTable @@ -49,7 +51,7 @@ public class FieldTable private static final String STRICT_AMQP_NAME = "STRICT_AMQP"; private static final boolean STRICT_AMQP = Boolean.valueOf(System.getProperty(STRICT_AMQP_NAME, "false")); - private ByteBuffer _encodedForm; + private QpidByteBuffer _encodedForm; private LinkedHashMap<AMQShortString, AMQTypedValue> _properties = null; private long _encodedSize; private static final int INITIAL_HASHMAP_CAPACITY = 16; @@ -70,10 +72,10 @@ public class FieldTable public FieldTable(byte[] encodedForm, int offset, int length) { - this(ByteBuffer.wrap(encodedForm,offset,length)); + this(QpidByteBuffer.wrap(encodedForm,offset,length)); } - public FieldTable(ByteBuffer buffer) + public FieldTable(QpidByteBuffer buffer) { this(); _encodedForm = buffer; @@ -1094,13 +1096,13 @@ public class FieldTable private void setFromBuffer() throws AMQFrameDecodingException, IOException { - ByteBufferDataInput dataInput = new ByteBufferDataInput(_encodedForm.duplicate()); + MarkableDataInput dataInput = _encodedForm.slice().asDataInput(); if (_encodedSize > 0) { - _properties = new LinkedHashMap<AMQShortString, AMQTypedValue>(INITIAL_HASHMAP_CAPACITY); + _properties = new LinkedHashMap<>(INITIAL_HASHMAP_CAPACITY); do { Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/FrameCreatingMethodProcessor.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/FrameCreatingMethodProcessor.java?rev=1694594&r1=1694593&r2=1694594&view=diff ============================================================================== --- qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/FrameCreatingMethodProcessor.java (original) +++ qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/FrameCreatingMethodProcessor.java Fri Aug 7 00:28:17 2015 @@ -20,10 +20,11 @@ */ package org.apache.qpid.framing; -import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; +import org.apache.qpid.bytebuffer.QpidByteBuffer; + public class FrameCreatingMethodProcessor implements MethodProcessor<FrameCreatingMethodProcessor.ClientAndServerChannelMethodProcessor>, ClientMethodProcessor<FrameCreatingMethodProcessor.ClientAndServerChannelMethodProcessor>, ServerMethodProcessor<FrameCreatingMethodProcessor.ClientAndServerChannelMethodProcessor> @@ -604,7 +605,7 @@ public class FrameCreatingMethodProcesso } @Override - public void receiveMessageContent(ByteBuffer data) + public void receiveMessageContent(QpidByteBuffer data) { _processedMethods.add(new AMQFrame(_channelId, new ContentBody(data))); } Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java?rev=1694594&r1=1694593&r2=1694594&view=diff ============================================================================== --- qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java (original) +++ qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java Fri Aug 7 00:28:17 2015 @@ -28,6 +28,7 @@ import java.nio.charset.StandardCharsets import java.util.Arrays; import org.apache.qpid.QpidException; +import org.apache.qpid.bytebuffer.QpidByteBuffer; import org.apache.qpid.codec.MarkableDataInput; import org.apache.qpid.transport.ByteBufferSender; import org.apache.qpid.util.BytesDataOutput; @@ -97,7 +98,7 @@ public class ProtocolInitiation extends byte[] data = new byte[8]; BytesDataOutput out = new BytesDataOutput(data); writePayload(out); - sender.send(ByteBuffer.wrap(data)); + sender.send(QpidByteBuffer.wrap(data)); return 8l; } Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/ByteBufferSender.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/ByteBufferSender.java?rev=1694594&r1=1694593&r2=1694594&view=diff ============================================================================== --- qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/ByteBufferSender.java (original) +++ qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/ByteBufferSender.java Fri Aug 7 00:28:17 2015 @@ -22,9 +22,12 @@ package org.apache.qpid.transport; import java.nio.ByteBuffer; +import org.apache.qpid.bytebuffer.QpidByteBuffer; + public interface ByteBufferSender { - void send(ByteBuffer msg); + + void send(QpidByteBuffer msg); void flush(); Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/MessageTransfer.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/MessageTransfer.java?rev=1694594&r1=1694593&r2=1694594&view=diff ============================================================================== --- qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/MessageTransfer.java (original) +++ qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/MessageTransfer.java Fri Aug 7 00:28:17 2015 @@ -21,9 +21,13 @@ package org.apache.qpid.transport; */ +import java.util.Collection; +import java.util.Collections; import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; +import org.apache.qpid.bytebuffer.QpidByteBuffer; import org.apache.qpid.transport.codec.Decoder; import org.apache.qpid.transport.codec.Encoder; @@ -66,13 +70,17 @@ public final class MessageTransfer exten private MessageAcceptMode acceptMode; private MessageAcquireMode acquireMode; private Header header; - private ByteBuffer body; + private Collection<QpidByteBuffer> _body; public MessageTransfer() {} + public MessageTransfer(String destination, MessageAcceptMode acceptMode, MessageAcquireMode acquireMode, Header header, java.nio.ByteBuffer body, Option ... options) + { + this(destination, acceptMode, acquireMode, header, Collections.singletonList(QpidByteBuffer.wrap(body)), options); + } - public MessageTransfer(String destination, MessageAcceptMode acceptMode, MessageAcquireMode acquireMode, Header header, java.nio.ByteBuffer body, Option ... _options) { + public MessageTransfer(String destination, MessageAcceptMode acceptMode, MessageAcquireMode acquireMode, Header header, Collection<QpidByteBuffer> body, Option ... _options) { if(destination != null) { setDestination(destination); } @@ -194,42 +202,42 @@ public final class MessageTransfer exten return this; } - public int getBodySize() - { - return this.body == null ? 0 : this.body.remaining(); - } - - public final ByteBuffer getBody() { - if (this.body == null) + @Override + public final Collection<QpidByteBuffer> getBody() { + if (this._body == null) { return null; } else { - return this.body.slice(); + return Collections.unmodifiableCollection(_body); } } - public final void setBody(ByteBuffer body) { - this.body = body; + @Override + public final void setBody(Collection<QpidByteBuffer> body) { + this._body = body; } - public final MessageTransfer body(ByteBuffer body) + public final MessageTransfer body(List<QpidByteBuffer> body) { setBody(body); return this; } public final byte[] getBodyBytes() { - ByteBuffer buf = getBody(); - byte[] bytes = new byte[buf.remaining()]; - buf.get(bytes); + Collection<QpidByteBuffer> body = getBody(); + byte[] bytes = new byte[getBodySize()]; + for(QpidByteBuffer buf : body) + { + buf.duplicate().get(bytes); + } return bytes; } public final void setBody(byte[] body) { - setBody(ByteBuffer.wrap(body)); + setBody(Collections.singletonList(QpidByteBuffer.wrap(body))); } public final String getBodyString() { Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/Method.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/Method.java?rev=1694594&r1=1694593&r2=1694594&view=diff ============================================================================== --- qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/Method.java (original) +++ qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/Method.java Fri Aug 7 00:28:17 2015 @@ -20,11 +20,14 @@ */ package org.apache.qpid.transport; +import org.apache.qpid.bytebuffer.QpidByteBuffer; import org.apache.qpid.transport.network.Frame; import static org.apache.qpid.transport.util.Functions.str; import java.nio.ByteBuffer; +import java.util.Collection; +import java.util.List; /** * Method @@ -125,26 +128,31 @@ public abstract class Method extends Str throw new UnsupportedOperationException(); } - public ByteBuffer getBody() + public Collection<QpidByteBuffer> getBody() { return null; } - public void setBody(ByteBuffer body) + public void setBody(Collection<QpidByteBuffer> body) { throw new UnsupportedOperationException(); } public int getBodySize() { - ByteBuffer body = getBody(); + Collection<QpidByteBuffer> body = getBody(); if (body == null) { return 0; } else { - return body.remaining(); + int size = 0; + for(QpidByteBuffer buf : body) + { + size += buf.remaining(); + } + return size; } } @@ -223,7 +231,7 @@ public abstract class Method extends Str str.append(st); } } - ByteBuffer body = getBody(); + Collection<QpidByteBuffer> body = getBody(); if (body != null) { str.append("\n body="); Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/ProtocolHeader.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/ProtocolHeader.java?rev=1694594&r1=1694593&r2=1694594&view=diff ============================================================================== --- qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/ProtocolHeader.java (original) +++ qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/ProtocolHeader.java Fri Aug 7 00:28:17 2015 @@ -20,6 +20,7 @@ */ package org.apache.qpid.transport; +import org.apache.qpid.bytebuffer.QpidByteBuffer; import org.apache.qpid.transport.network.Frame; import org.apache.qpid.transport.network.NetworkDelegate; import org.apache.qpid.transport.network.NetworkEvent; @@ -93,9 +94,9 @@ public final class ProtocolHeader implem return false; } - public ByteBuffer toByteBuffer(final boolean useDirect) + public QpidByteBuffer toByteBuffer() { - ByteBuffer buf = ByteBuffer.allocate(8); + QpidByteBuffer buf = QpidByteBuffer.allocate(8); buf.put(AMQP); buf.put(protoClass); buf.put(instance); Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/codec/AbstractDecoder.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/codec/AbstractDecoder.java?rev=1694594&r1=1694593&r2=1694594&view=diff ============================================================================== --- qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/codec/AbstractDecoder.java (original) +++ qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/codec/AbstractDecoder.java Fri Aug 7 00:28:17 2015 @@ -42,7 +42,7 @@ import java.util.UUID; * @author Rafael H. Schloming */ -abstract class AbstractDecoder implements Decoder +public abstract class AbstractDecoder implements Decoder { private final Map<Binary,String> str8cache = new LinkedHashMap<Binary,String>() Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/Assembler.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/Assembler.java?rev=1694594&r1=1694593&r2=1694594&view=diff ============================================================================== --- qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/Assembler.java (original) +++ qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/Assembler.java Fri Aug 7 00:28:17 2015 @@ -22,10 +22,12 @@ package org.apache.qpid.transport.networ import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import org.apache.qpid.bytebuffer.QpidByteBuffer; import org.apache.qpid.transport.DeliveryProperties; import org.apache.qpid.transport.Header; import org.apache.qpid.transport.MessageProperties; @@ -241,7 +243,7 @@ public class Assembler implements Networ break; case BODY: command = getIncompleteCommand(channel); - command.setBody(segment); + command.setBody(Collections.singletonList(QpidByteBuffer.wrap(segment))); setIncompleteCommand(channel, null); emit(channel, command); break; Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java?rev=1694594&r1=1694593&r2=1694594&view=diff ============================================================================== --- qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java (original) +++ qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java Fri Aug 7 00:28:17 2015 @@ -30,6 +30,7 @@ import static org.apache.qpid.transport. import java.nio.ByteBuffer; import java.nio.ByteOrder; +import org.apache.qpid.bytebuffer.QpidByteBuffer; import org.apache.qpid.transport.ByteBufferSender; import org.apache.qpid.transport.FrameSizeObserver; import org.apache.qpid.transport.Header; @@ -43,6 +44,7 @@ import org.apache.qpid.transport.Segment import org.apache.qpid.transport.Struct; import org.apache.qpid.transport.codec.BBEncoder; import org.apache.qpid.transport.codec.Encoder; +import org.apache.qpid.util.ByteBufferUtils; /** * Disassembler @@ -116,8 +118,8 @@ public final class Disassembler implemen buf.limit(buf.position() + size); data.rewind(); - sender.send(data); - sender.send(buf); + sender.send(QpidByteBuffer.wrap(data)); + sender.send(QpidByteBuffer.wrap(buf)); buf.limit(limit); } @@ -159,7 +161,7 @@ public final class Disassembler implemen { synchronized (sendlock) { - sender.send(header.toByteBuffer(false)); + sender.send(header.toByteBuffer()); sender.flush(); } } @@ -235,7 +237,7 @@ public final class Disassembler implemen fragment(flags, type, method, buf); if (payload) { - ByteBuffer body = method.getBody(); + ByteBuffer body = ByteBufferUtils.combine(method.getBody()); buf.limit(headerLimit); buf.position(methodLimit); fragment(body == null ? LAST_SEG : 0x0, SegmentType.HEADER, method, buf); Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java?rev=1694594&r1=1694593&r2=1694594&view=diff ============================================================================== --- qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java (original) +++ qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java Fri Aug 7 00:28:17 2015 @@ -28,6 +28,7 @@ import java.util.concurrent.atomic.Atomi import javax.net.ssl.SSLSocket; +import org.apache.qpid.bytebuffer.QpidByteBuffer; import org.apache.qpid.thread.Threading; import org.apache.qpid.transport.ByteBufferSender; import org.apache.qpid.transport.SenderClosedException; @@ -115,7 +116,7 @@ public final class IoSender implements R return result; } - public void send(ByteBuffer buf) + public void send(QpidByteBuffer buf) { checkNotAlreadyClosed(); Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLSender.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLSender.java?rev=1694594&r1=1694593&r2=1694594&view=diff ============================================================================== --- qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLSender.java (original) +++ qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLSender.java Fri Aug 7 00:28:17 2015 @@ -26,6 +26,7 @@ import java.util.concurrent.atomic.Atomi import javax.security.sasl.SaslException; +import org.apache.qpid.bytebuffer.QpidByteBuffer; import org.apache.qpid.transport.ByteBufferSender; import org.apache.qpid.transport.SenderException; import org.apache.qpid.transport.util.Logger; @@ -69,13 +70,13 @@ public class SASLSender extends SASLEncr delegate.flush(); } - public void send(ByteBuffer buf) - { + public void send(QpidByteBuffer buf) + { if (closed.get()) { throw new SenderException("SSL Sender is closed"); } - + if (isSecurityLayerEstablished()) { while (buf.hasRemaining()) @@ -83,28 +84,29 @@ public class SASLSender extends SASLEncr int length = Math.min(buf.remaining(), getSendBuffSize()); log.debug("sendBuffSize %s", getSendBuffSize()); log.debug("buf.remaining() %s", buf.remaining()); - + buf.get(appData, 0, length); try { byte[] out = getSaslClient().wrap(appData, 0, length); log.debug("out.length %s", out.length); - - delegate.send(ByteBuffer.wrap(out)); - } + + delegate.send(QpidByteBuffer.wrap(out)); + } catch (SaslException e) { log.error("Exception while encrypting data.",e); throw new SenderException("SASL Sender, Error occurred while encrypting data",e); } - } + } } else { delegate.send(buf); - } + } } + public void securityLayerEstablished() { appData = new byte[getSendBuffSize()]; Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLSender.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLSender.java?rev=1694594&r1=1694593&r2=1694594&view=diff ============================================================================== --- qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLSender.java (original) +++ qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLSender.java Fri Aug 7 00:28:17 2015 @@ -28,6 +28,7 @@ import javax.net.ssl.SSLEngineResult.Han import javax.net.ssl.SSLEngineResult.Status; import javax.net.ssl.SSLException; +import org.apache.qpid.bytebuffer.QpidByteBuffer; import org.apache.qpid.transport.ByteBufferSender; import org.apache.qpid.transport.SenderException; import org.apache.qpid.transport.network.security.SSLStatus; @@ -126,7 +127,7 @@ public class SSLSender implements ByteBu netData.limit(limit); netData.position(netData.position() + read); - delegate.send(data); + delegate.send(QpidByteBuffer.wrap(data)); flush(); } result = engine.wrap(ByteBuffer.allocate(0), netData); @@ -140,7 +141,7 @@ public class SSLSender implements ByteBu delegate.flush(); } - public void send(ByteBuffer appData) + public void send(QpidByteBuffer appData) { if (closed.get() && !_sslStatus.getSslErrorFlag()) { @@ -155,7 +156,7 @@ public class SSLSender implements ByteBu int read = 0; try { - SSLEngineResult result = engine.wrap(appData, netData); + SSLEngineResult result = engine.wrap(appData.getNativeBuffer(), netData); read = result.bytesProduced(); status = result.getStatus(); handshakeStatus = result.getHandshakeStatus(); @@ -177,7 +178,7 @@ public class SSLSender implements ByteBu netData.limit(limit); netData.position(netData.position() + read); - delegate.send(data); + delegate.send(QpidByteBuffer.wrap(data)); } switch(status) @@ -219,24 +220,24 @@ public class SSLSender implements ByteBu switch (engine.getHandshakeStatus()) { - case NEED_UNWRAP: - final long start = System.currentTimeMillis(); - try - { - _sslStatus.getSslLock().wait(timeout); - } - catch(InterruptedException e) - { - // pass - } - - if (!_sslStatus.getSslErrorFlag() && System.currentTimeMillis() - start >= timeout) - { - throw new SenderException( - "SSL Engine timed out after waiting " + timeout + "ms. for a response." + - "To get more info,run with -Djavax.net.debug=ssl"); - } - break; + case NEED_UNWRAP: + final long start = System.currentTimeMillis(); + try + { + _sslStatus.getSslLock().wait(timeout); + } + catch(InterruptedException e) + { + // pass + } + + if (!_sslStatus.getSslErrorFlag() && System.currentTimeMillis() - start >= timeout) + { + throw new SenderException( + "SSL Engine timed out after waiting " + timeout + "ms. for a response." + + "To get more info,run with -Djavax.net.debug=ssl"); + } + break; } } break; @@ -246,7 +247,7 @@ public class SSLSender implements ByteBu { SSLUtil.verifyHostname(engine, _hostname); } - + case NOT_HANDSHAKING: break; //do nothing @@ -257,6 +258,8 @@ public class SSLSender implements ByteBu } } + + private void doTasks() { Runnable runnable; Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/util/Functions.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/util/Functions.java?rev=1694594&r1=1694593&r2=1694594&view=diff ============================================================================== --- qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/util/Functions.java (original) +++ qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/util/Functions.java Fri Aug 7 00:28:17 2015 @@ -23,6 +23,11 @@ package org.apache.qpid.transport.util; import static java.lang.Math.min; import java.nio.ByteBuffer; +import java.util.Collection; +import java.util.Collections; + +import org.apache.qpid.bytebuffer.QpidByteBuffer; +import org.apache.qpid.util.ByteBufferUtils; /** @@ -63,11 +68,37 @@ public final class Functions public static final String str(ByteBuffer buf, int limit) { - return str(buf, limit,buf.position()); + return str(buf, limit,buf.position()); } - + public static final String str(ByteBuffer buf, int limit,int start) { + return str(QpidByteBuffer.wrap(buf), start,limit); + } + + public static final String str(QpidByteBuffer buf) + { + return str(buf, buf.remaining()); + } + + public static final String str(QpidByteBuffer buf, int limit) + { + return str(buf, limit, buf.position()); + } + + public static final String str(Collection<QpidByteBuffer> buf, int limit, int start) + { + return str(ByteBufferUtils.combine(buf),limit,start); + } + + + public static final String str(Collection<QpidByteBuffer> buf, int limit) + { + return str(buf, limit, 0); + } + + public static final String str(QpidByteBuffer buf, int limit, int start) + { StringBuilder str = new StringBuilder(); str.append('"'); @@ -109,7 +140,7 @@ public final class Functions { return hex(bytes, limit, ""); } - public static String hex(ByteBuffer bytes, int limit) + public static String hex(QpidByteBuffer bytes, int limit) { return hex(bytes, limit, ""); } @@ -134,7 +165,7 @@ public final class Functions return sb.toString(); } - public static String hex(ByteBuffer bytes, int limit, CharSequence separator) + public static String hex(QpidByteBuffer bytes, int limit, CharSequence separator) { limit = Math.min(limit, bytes == null ? 0 : bytes.remaining()); StringBuilder sb = new StringBuilder(3 + limit*2); Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/util/ByteBufferUtils.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/util/ByteBufferUtils.java?rev=1694594&r1=1694593&r2=1694594&view=diff ============================================================================== --- qpid/java/trunk/common/src/main/java/org/apache/qpid/util/ByteBufferUtils.java (original) +++ qpid/java/trunk/common/src/main/java/org/apache/qpid/util/ByteBufferUtils.java Fri Aug 7 00:28:17 2015 @@ -23,37 +23,49 @@ package org.apache.qpid.util; import java.nio.ByteBuffer; import java.util.Collection; +import org.apache.qpid.bytebuffer.QpidByteBuffer; + public class ByteBufferUtils { private static final ByteBuffer EMPTY_BYTE_BUFFER = ByteBuffer.allocate(0); - public static ByteBuffer combine(Collection<ByteBuffer> bufs) + public static ByteBuffer combine(Collection<QpidByteBuffer> bufs) { if(bufs == null || bufs.isEmpty()) { return EMPTY_BYTE_BUFFER; } - else if(bufs.size() == 1) - { - return bufs.iterator().next(); - } else { int size = 0; boolean isDirect = false; - for(ByteBuffer buf : bufs) + for(QpidByteBuffer buf : bufs) { size += buf.remaining(); isDirect = isDirect || buf.isDirect(); } ByteBuffer combined = isDirect ? ByteBuffer.allocateDirect(size) : ByteBuffer.allocate(size); - for(ByteBuffer buf : bufs) + for(QpidByteBuffer buf : bufs) { - combined.put(buf.duplicate()); + buf.duplicate().get(combined); } combined.flip(); return combined; } } + + public static int remaining(Collection<QpidByteBuffer> bufs) + { + int size = 0; + if (bufs != null && !bufs.isEmpty()) + { + for (QpidByteBuffer buf : bufs) + { + size += buf.remaining(); + } + + } + return size; + } } Modified: qpid/java/trunk/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java?rev=1694594&r1=1694593&r2=1694594&view=diff ============================================================================== --- qpid/java/trunk/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java (original) +++ qpid/java/trunk/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java Fri Aug 7 00:28:17 2015 @@ -44,7 +44,7 @@ import org.apache.qpid.util.BytesDataOut public class AMQDecoderTest extends QpidTestCase { - private AMQDecoder _decoder; + private ClientDecoder _decoder; private FrameCreatingMethodProcessor _methodProcessor; @@ -97,7 +97,7 @@ public class AMQDecoderTest extends Qpid { assertEquals(ContentBody.TYPE, ((AMQFrame) frames.get(0)).getBodyFrame().getFrameType()); ContentBody decodedBody = (ContentBody) ((AMQFrame) frames.get(0)).getBodyFrame(); - final ByteBuffer byteBuffer = decodedBody.getPayload().duplicate(); + final ByteBuffer byteBuffer = decodedBody.getPayload().getNativeBuffer().duplicate(); byte[] bodyBytes = new byte[byteBuffer.remaining()]; byteBuffer.get(bodyBytes); assertTrue("Body was corrupted", Arrays.equals(payload, bodyBytes)); Modified: qpid/java/trunk/common/src/test/java/org/apache/qpid/framing/FieldTableTest.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/test/java/org/apache/qpid/framing/FieldTableTest.java?rev=1694594&r1=1694593&r2=1694594&view=diff ============================================================================== --- qpid/java/trunk/common/src/test/java/org/apache/qpid/framing/FieldTableTest.java (original) +++ qpid/java/trunk/common/src/test/java/org/apache/qpid/framing/FieldTableTest.java Fri Aug 7 00:28:17 2015 @@ -31,6 +31,7 @@ import java.util.Collection; import java.util.Collections; import java.util.Iterator; +import org.apache.qpid.bytebuffer.QpidByteBuffer; import org.apache.qpid.test.utils.QpidTestCase; import org.junit.Assert; @@ -601,7 +602,7 @@ public class FieldTableTest extends Qpid ByteArrayOutputStream baos = new ByteArrayOutputStream((int) table.getEncodedSize() + 4); table.writeToBuffer(new DataOutputStream(baos)); - ByteBuffer buf = ByteBuffer.wrap(baos.toByteArray()); + QpidByteBuffer buf = QpidByteBuffer.wrap(baos.toByteArray()); long length = buf.getInt() & 0xFFFFFFFFL; buf = buf.slice(); @@ -920,7 +921,7 @@ public class FieldTableTest extends Qpid assertEquals("unexpected data length", 24, length); //Create a second FieldTable from the encoded bytes - FieldTable tableFromBytes = new FieldTable(ByteBuffer.wrap(data)); + FieldTable tableFromBytes = new FieldTable(QpidByteBuffer.wrap(data)); //Create a final FieldTable and addAll() from the table created with encoded bytes FieldTable destinationTable = new FieldTable(); Modified: qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/security/auth/manager/MultipleAuthenticationManagersTest.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/security/auth/manager/MultipleAuthenticationManagersTest.java?rev=1694594&r1=1694593&r2=1694594&view=diff ============================================================================== --- qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/security/auth/manager/MultipleAuthenticationManagersTest.java (original) +++ qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/security/auth/manager/MultipleAuthenticationManagersTest.java Fri Aug 7 00:28:17 2015 @@ -62,6 +62,8 @@ public class MultipleAuthenticationManag sslPortAttributes.put(Port.KEY_STORE, TestBrokerConfiguration.ENTRY_NAME_SSL_KEYSTORE); sslPortAttributes.put(Port.TRUST_STORES, Collections.singleton(TestBrokerConfiguration.ENTRY_NAME_SSL_TRUSTSTORE)); sslPortAttributes.put(Port.AUTHENTICATION_PROVIDER, TestBrokerConfiguration.ENTRY_NAME_ANONYMOUS_PROVIDER); + sslPortAttributes.put(Port.PROTOCOLS, System.getProperty(TEST_AMQP_PORT_PROTOCOLS_PROPERTY)); + config.addObjectConfiguration(Port.class, sslPortAttributes); Map<String, Object> aliasAttributes = new HashMap<>(); Modified: qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/unit/basic/FieldTableMessageTest.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/unit/basic/FieldTableMessageTest.java?rev=1694594&r1=1694593&r2=1694594&view=diff ============================================================================== --- qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/unit/basic/FieldTableMessageTest.java (original) +++ qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/unit/basic/FieldTableMessageTest.java Fri Aug 7 00:28:17 2015 @@ -23,6 +23,7 @@ package org.apache.qpid.test.unit.basic; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.qpid.bytebuffer.QpidByteBuffer; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQDestination; import org.apache.qpid.client.AMQQueue; @@ -133,7 +134,7 @@ public class FieldTableMessageTest exten final long bodyLength = bytesMessage.getBodyLength(); byte[] data = new byte[(int) bodyLength]; bytesMessage.readBytes(data); - FieldTable actual = new FieldTable(ByteBuffer.wrap(data)); + FieldTable actual = new FieldTable(QpidByteBuffer.wrap(data)); for (String key : _expected.keys()) { assertEquals("Values for " + key + " did not match", _expected.getObject(key), actual.getObject(key)); Modified: qpid/java/trunk/systests/src/test/java/org/apache/qpid/transport/MaxFrameSizeTest.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/test/java/org/apache/qpid/transport/MaxFrameSizeTest.java?rev=1694594&r1=1694593&r2=1694594&view=diff ============================================================================== --- qpid/java/trunk/systests/src/test/java/org/apache/qpid/transport/MaxFrameSizeTest.java (original) +++ qpid/java/trunk/systests/src/test/java/org/apache/qpid/transport/MaxFrameSizeTest.java Fri Aug 7 00:28:17 2015 @@ -264,7 +264,7 @@ public class MaxFrameSizeTest extends Qp } final FrameCreatingMethodProcessor methodProcessor = new FrameCreatingMethodProcessor(ProtocolVersion.v0_91); - AMQDecoder decoder = new ClientDecoder(methodProcessor); + ClientDecoder decoder = new ClientDecoder(methodProcessor); byte[] buffer = new byte[1024]; Modified: qpid/java/trunk/systests/src/test/java/org/apache/qpid/transport/ProtocolNegotiationTest.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/test/java/org/apache/qpid/transport/ProtocolNegotiationTest.java?rev=1694594&r1=1694593&r2=1694594&view=diff ============================================================================== --- qpid/java/trunk/systests/src/test/java/org/apache/qpid/transport/ProtocolNegotiationTest.java (original) +++ qpid/java/trunk/systests/src/test/java/org/apache/qpid/transport/ProtocolNegotiationTest.java Fri Aug 7 00:28:17 2015 @@ -29,6 +29,7 @@ import java.net.SocketTimeoutException; import java.nio.ByteBuffer; import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.qpid.bytebuffer.QpidByteBuffer; import org.apache.qpid.framing.ByteArrayDataInput; import org.apache.qpid.framing.HeartbeatBody; import org.apache.qpid.framing.ProtocolInitiation; @@ -197,8 +198,34 @@ public class ProtocolNegotiationTest ext ConnectionHeartbeat heartbeat = new ConnectionHeartbeat(); ServerDisassembler serverDisassembler = new ServerDisassembler(new ByteBufferSender() { + private void send(final ByteBuffer msg) + { + try + { + if(msg.hasArray()) + { + dataOutputStream.write(msg.array(), msg.arrayOffset() + msg.position(), msg.remaining()); + } + else + { + byte[] data = new byte[msg.remaining()]; + msg.duplicate().get(data); + dataOutputStream.write(data, 0, data.length); + } + } + catch (SocketException se) + { + + success.set(false); + } + catch(IOException e) + { + throw new RuntimeException("Unexpected IOException", e); + } + } + @Override - public void send(final ByteBuffer msg) + public void send(final QpidByteBuffer msg) { try { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
