Author: kwall
Date: Thu Feb 18 17:05:14 2016
New Revision: 1731097
URL: http://svn.apache.org/viewvc?rev=1731097&view=rev
Log:
QPID-7057: [Java Client] Have SSLSender gather buffers and wrap them all on
each flush
* Changed 0-10's (Client) Disassembler to no longer assume it can mutate the
buffer after passing it to org.apache.qpid.transport.ByteBufferSender#send
Modified:
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLSender.java
Modified:
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java?rev=1731097&r1=1731096&r2=1731097&view=diff
==============================================================================
---
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java
(original)
+++
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java
Thu Feb 18 17:05:14 2016
@@ -28,7 +28,9 @@ import static org.apache.qpid.transport.
import static org.apache.qpid.transport.network.Frame.LAST_SEG;
import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,9 +48,6 @@ import org.apache.qpid.transport.Protoco
import org.apache.qpid.transport.SegmentType;
import org.apache.qpid.transport.Struct;
import org.apache.qpid.transport.codec.BBEncoder;
-import org.apache.qpid.transport.codec.Encoder;
-import org.apache.qpid.transport.util.Functions;
-import org.apache.qpid.util.ByteBufferUtils;
/**
* Disassembler
@@ -56,9 +55,9 @@ import org.apache.qpid.util.ByteBufferUt
public final class Disassembler implements ProtocolEventSender,
ProtocolDelegate<Void>, FrameSizeObserver
{
private static final Logger LOGGER =
LoggerFactory.getLogger(Disassembler.class);
- private final ByteBufferSender sender;
- private int maxPayload;
- private final Object sendlock = new Object();
+ private final ByteBufferSender _sender;
+ private final Object _sendlock = new Object();
+ private volatile int _maxPayload;
private final static ThreadLocal<BBEncoder> _encoder = new
ThreadLocal<BBEncoder>()
{
public BBEncoder initialValue()
@@ -69,12 +68,12 @@ public final class Disassembler implemen
public Disassembler(ByteBufferSender sender, int maxFrame)
{
- this.sender = sender;
+ _sender = sender;
if (maxFrame <= HEADER_SIZE || maxFrame >= 64*1024)
{
throw new IllegalArgumentException("maxFrame must be > HEADER_SIZE
and < 64K: " + maxFrame);
}
- this.maxPayload = maxFrame - HEADER_SIZE;
+ _maxPayload = maxFrame - HEADER_SIZE;
}
public void send(ProtocolEvent event)
@@ -84,91 +83,26 @@ public final class Disassembler implemen
public void flush()
{
- synchronized (sendlock)
+ synchronized (_sendlock)
{
- sender.flush();
+ _sender.flush();
}
}
public void close()
{
- synchronized (sendlock)
+ synchronized (_sendlock)
{
- sender.close();
- }
- }
-
- private final ByteBuffer _frameHeader = ByteBuffer.allocate(HEADER_SIZE);
-
- {
- _frameHeader.order(ByteOrder.BIG_ENDIAN);
- }
-
- private void frame(byte flags, byte type, byte track, int channel, int
size, ByteBuffer buf)
- {
- synchronized (sendlock)
- {
- ByteBuffer data = _frameHeader;
- _frameHeader.rewind();
-
-
- data.put(0, flags);
- data.put(1, type);
- data.putShort(2, (short) (size + HEADER_SIZE));
- data.put(5, track);
- data.putShort(6, (short) channel);
-
-
- int limit = buf.limit();
- buf.limit(buf.position() + size);
-
- data.rewind();
- sender.send(QpidByteBuffer.wrap(data));
- sender.send(QpidByteBuffer.wrap(buf));
-
- buf.limit(limit);
-
- }
- }
-
- private void fragment(byte flags, SegmentType type, ProtocolEvent event,
ByteBuffer buf)
- {
- byte typeb = (byte) type.getValue();
- byte track = event.getEncodedTrack() == Frame.L4 ? (byte) 1 : (byte) 0;
-
- int remaining = buf.remaining();
- boolean first = true;
- while (true)
- {
- int size = min(maxPayload, remaining);
- remaining -= size;
-
- byte newflags = flags;
- if (first)
- {
- newflags |= FIRST_FRAME;
- first = false;
- }
- if (remaining == 0)
- {
- newflags |= LAST_FRAME;
- }
-
- frame(newflags, typeb, track, event.getChannel(), size, buf);
-
- if (remaining == 0)
- {
- break;
- }
+ _sender.close();
}
}
public void init(Void v, ProtocolHeader header)
{
- synchronized (sendlock)
+ synchronized (_sendlock)
{
- sender.send(header.toByteBuffer());
- sender.flush();
+ _sender.send(header.toByteBuffer());
+ _sender.flush();
}
}
@@ -234,24 +168,114 @@ public final class Disassembler implemen
headerLimit = enc.position();
}
- synchronized (sendlock)
+ synchronized (_sendlock)
{
ByteBuffer buf = enc.underlyingBuffer();
- buf.position(0);
- buf.limit(methodLimit);
-
- fragment(flags, type, method, buf);
+ buf.flip();
+ QpidByteBuffer copy =
QpidByteBuffer.allocate(_sender.isDirectBufferPreferred(), buf.remaining());
+ copy.putCopyOf(QpidByteBuffer.wrap(buf));
+ copy.flip();
+
+ final QpidByteBuffer methodBuf = copy.view(0, methodLimit);
+ fragment(flags, type, method,
Collections.singletonList(methodBuf));
+ methodBuf.dispose();
if (payload)
{
- ByteBuffer body = ByteBufferUtils.combine(method.getBody());
- buf.limit(headerLimit);
- buf.position(methodLimit);
- fragment(body == null ? LAST_SEG : 0x0, SegmentType.HEADER,
method, buf);
- if (body != null)
+ Collection<QpidByteBuffer> bodies = method.getBody();
+ QpidByteBuffer headerBuf = copy.view(methodLimit, headerLimit);
+ fragment(bodies == null ? LAST_SEG : 0x0, SegmentType.HEADER,
method, Collections.singletonList(headerBuf));
+ headerBuf.dispose();
+ if (bodies != null)
{
- fragment(LAST_SEG, SegmentType.BODY, method, body);
+ Collection<QpidByteBuffer> dup = new
ArrayList<>(bodies.size());
+ for(QpidByteBuffer b : bodies)
+ {
+ dup.add(b.duplicate());
+ }
+ fragment(LAST_SEG, SegmentType.BODY, method, dup);
+ for(QpidByteBuffer b : dup)
+ {
+ b.dispose();
+ }
}
+ }
+ copy.dispose();
+ }
+ }
+
+ private void fragment(byte flags, SegmentType type, ProtocolEvent event,
Collection<QpidByteBuffer> buffers)
+ {
+ byte typeb = (byte) type.getValue();
+ byte track = event.getEncodedTrack() == Frame.L4 ? (byte) 1 : (byte) 0;
+
+ int remaining = 0;
+ for(QpidByteBuffer b : buffers)
+ {
+ remaining += b.remaining();
+ }
+ boolean first = true;
+ while (true)
+ {
+ int size = min(_maxPayload, remaining);
+ remaining -= size;
+ byte newflags = flags;
+ if (first)
+ {
+ newflags |= FIRST_FRAME;
+ first = false;
+ }
+ if (remaining == 0)
+ {
+ newflags |= LAST_FRAME;
+ }
+
+ frame(newflags, typeb, track, event.getChannel(), size, buffers);
+
+ if (remaining == 0)
+ {
+ break;
+ }
+ }
+ }
+
+ private void frame(byte flags, byte type, byte track, int channel, int
size, Collection<QpidByteBuffer> buffers)
+ {
+ QpidByteBuffer data =
QpidByteBuffer.allocate(_sender.isDirectBufferPreferred(), HEADER_SIZE);
+
+ data.put(0, flags);
+ data.put(1, type);
+ data.putShort(2, (short) (size + HEADER_SIZE));
+ data.put(4, (byte) 0);
+ data.put(5, track);
+ data.putShort(6, (short) channel);
+
+
+ _sender.send(data);
+ data.dispose();
+
+ if(size > 0)
+ {
+ int residual = size;
+ for(QpidByteBuffer b : buffers)
+ {
+ final int remaining = b.remaining();
+ if(remaining > 0 )
+ {
+ if(remaining >= residual)
+ {
+ final QpidByteBuffer buffer = b.view(0, residual);
+ _sender.send(buffer);
+ buffer.dispose();
+ b.position(b.position() + residual);
+ break;
+ }
+ else
+ {
+ _sender.send(b);
+ residual-=remaining;
+ }
+ }
}
}
}
@@ -268,7 +292,7 @@ public final class Disassembler implemen
{
throw new IllegalArgumentException("maxFrame must be > HEADER_SIZE
and < 64K: " + maxFrame);
}
- this.maxPayload = maxFrame - HEADER_SIZE;
+ _maxPayload = maxFrame - HEADER_SIZE;
}
}
Modified:
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLSender.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLSender.java?rev=1731097&r1=1731096&r2=1731097&view=diff
==============================================================================
---
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLSender.java
(original)
+++
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLSender.java
Thu Feb 18 17:05:14 2016
@@ -20,6 +20,11 @@
package org.apache.qpid.transport.network.security.ssl;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.net.ssl.SSLEngine;
@@ -43,13 +48,14 @@ public class SSLSender implements ByteBu
private final ByteBufferSender delegate;
private final SSLEngine engine;
private final int sslBufSize;
- private final ByteBuffer netData;
+ private final QpidByteBuffer netData;
private final long timeout;
private final SSLStatus _sslStatus;
private String _hostname;
private final AtomicBoolean closed = new AtomicBoolean(false);
+ private final ConcurrentLinkedQueue<QpidByteBuffer> _pending = new
ConcurrentLinkedQueue<>();
public SSLSender(SSLEngine engine, ByteBufferSender delegate, SSLStatus
sslStatus)
@@ -57,7 +63,7 @@ public class SSLSender implements ByteBu
this.engine = engine;
this.delegate = delegate;
sslBufSize = engine.getSession().getPacketBufferSize();
- netData = ByteBuffer.allocate(sslBufSize);
+ netData = QpidByteBuffer.allocate(sslBufSize);
timeout = Long.getLong("qpid.ssl_timeout", 60000);
_sslStatus = sslStatus;
}
@@ -109,7 +115,9 @@ public class SSLSender implements ByteBu
private void tearDownSSLConnection() throws Exception
{
- SSLEngineResult result = engine.wrap(ByteBuffer.allocate(0), netData);
+ SSLEngineResult result = QpidByteBuffer.encryptSSL(engine,
+
Collections.singletonList(QpidByteBuffer.allocate(0)),
+ netData);
Status status = result.getStatus();
int read = result.bytesProduced();
while (status != Status.CLOSED)
@@ -124,15 +132,17 @@ public class SSLSender implements ByteBu
netData.limit(netData.position());
netData.position(netData.position() - read);
- ByteBuffer data = netData.slice();
+ QpidByteBuffer data = netData.slice();
netData.limit(limit);
netData.position(netData.position() + read);
- delegate.send(QpidByteBuffer.wrap(data));
+ delegate.send(data);
flush();
}
- result = engine.wrap(ByteBuffer.allocate(0), netData);
+ result = QpidByteBuffer.encryptSSL(engine,
+
Collections.singletonList(QpidByteBuffer.allocate(0)),
+ netData);
status = result.getStatus();
read = result.bytesProduced();
}
@@ -140,7 +150,9 @@ public class SSLSender implements ByteBu
public void flush()
{
+ doSend();
delegate.flush();
+
}
@Override
@@ -151,7 +163,12 @@ public class SSLSender implements ByteBu
public void send(QpidByteBuffer appData)
{
- appData = appData.duplicate();
+ _pending.add(appData.duplicate());
+
+ }
+
+ public void doSend()
+ {
if (closed.get() && !_sslStatus.getSslErrorFlag())
{
throw new SenderException("SSL Sender is closed");
@@ -160,12 +177,24 @@ public class SSLSender implements ByteBu
HandshakeStatus handshakeStatus;
Status status;
- while(appData.hasRemaining() && !_sslStatus.getSslErrorFlag())
+ while(!_pending.isEmpty() && !_sslStatus.getSslErrorFlag())
{
int read = 0;
try
{
- SSLEngineResult result = engine.wrap(appData.asByteBuffer(),
netData);
+ SSLEngineResult result = QpidByteBuffer.encryptSSL(engine,
_pending, netData);
+
+ while(!_pending.isEmpty())
+ {
+ QpidByteBuffer buf = _pending.peek();
+ if (buf.hasRemaining())
+ {
+ break;
+ }
+ buf.dispose();
+ _pending.poll();
+ }
+
read = result.bytesProduced();
status = result.getStatus();
handshakeStatus = result.getHandshakeStatus();
@@ -182,12 +211,12 @@ public class SSLSender implements ByteBu
netData.limit(netData.position());
netData.position(netData.position() - read);
- ByteBuffer data = netData.slice();
+ QpidByteBuffer data = netData.slice();
netData.limit(limit);
netData.position(netData.position() + read);
- delegate.send(QpidByteBuffer.wrap(data));
+ delegate.send(data);
}
switch(status)
@@ -219,7 +248,7 @@ public class SSLSender implements ByteBu
break;
case NEED_UNWRAP:
- flush();
+ delegate.flush();
synchronized(_sslStatus.getSslLock())
{
if (_sslStatus.getSslErrorFlag())
@@ -265,11 +294,9 @@ public class SSLSender implements ByteBu
}
}
- appData.dispose();
}
-
private void doTasks()
{
Runnable runnable;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]