Author: rgodfrey
Date: Sun Feb 1 15:18:17 2015
New Revision: 1656312
URL: http://svn.apache.org/r1656312
Log:
Reduce copying in 0-9 path
Modified:
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/framing/CompositeAMQDataBlock.java
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java
Modified:
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java
URL:
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java?rev=1656312&r1=1656311&r2=1656312&view=diff
==============================================================================
---
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java
(original)
+++
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java
Sun Feb 1 15:18:17 2015
@@ -34,13 +34,12 @@ import org.apache.qpid.server.model.Tran
import org.apache.qpid.server.model.port.AmqpPort;
import org.apache.qpid.server.protocol.MultiVersionProtocolEngineFactory;
import org.apache.qpid.transport.NetworkTransportConfiguration;
-import org.apache.qpid.transport.network.IncomingNetworkTransport;
import org.apache.qpid.transport.network.TransportEncryption;
import org.apache.qpid.transport.network.io.NonBlockingNetworkTransport;
class TCPandSSLTransport implements AcceptingTransport
{
- private IncomingNetworkTransport _networkTransport;
+ private NonBlockingNetworkTransport _networkTransport;
private Set<Transport> _transports;
private SSLContext _sslContext;
private InetSocketAddress _bindingSocketAddress;
@@ -64,7 +63,7 @@ class TCPandSSLTransport implements Acce
@Override
public void start()
{
- String bindingAddress = ((AmqpPort<?>)_port).getBindingAddress();
+ String bindingAddress = _port.getBindingAddress();
if (WILDCARD_ADDRESS.equals(bindingAddress))
{
bindingAddress = null;
Modified:
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
URL:
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java?rev=1656312&r1=1656311&r2=1656312&view=diff
==============================================================================
---
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
(original)
+++
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
Sun Feb 1 15:18:17 2015
@@ -486,7 +486,14 @@ public class AMQProtocolEngine implement
serverProperties,
mechanisms.getBytes(),
locales.getBytes());
- _sender.send(asByteBuffer(responseBody.generateFrame(0)));
+ try
+ {
+ responseBody.generateFrame(0).writePayload(_sender);
+ }
+ catch (IOException e)
+ {
+ throw new ServerScopedRuntimeException(e);
+ }
_sender.flush();
}
@@ -494,7 +501,14 @@ public class AMQProtocolEngine implement
{
_logger.info("Received unsupported protocol initiation for
protocol version: " + getProtocolVersion());
- _sender.send(asByteBuffer(new
ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion())));
+ try
+ {
+ new
ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion()).writePayload(_sender);
+ }
+ catch (IOException ioex)
+ {
+ throw new ServerScopedRuntimeException(ioex);
+ }
_sender.flush();
}
}
@@ -546,16 +560,21 @@ public class AMQProtocolEngine implement
*/
public synchronized void writeFrame(AMQDataBlock frame)
{
-
- final ByteBuffer buf = asByteBuffer(frame);
- _writtenBytes += buf.remaining();
-
if(_logger.isDebugEnabled())
{
_logger.debug("SEND: " + frame);
}
- _sender.send(buf);
+ try
+ {
+ _writtenBytes += frame.writePayload(_sender);
+ }
+ catch (IOException e)
+ {
+ throw new ServerScopedRuntimeException(e);
+ }
+
+
final long time = System.currentTimeMillis();
_lastIoTime = time;
_lastWriteTime.set(time);
Modified:
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java
URL:
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java?rev=1656312&r1=1656311&r2=1656312&view=diff
==============================================================================
---
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java
(original)
+++
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java
Sun Feb 1 15:18:17 2015
@@ -42,6 +42,7 @@ import org.apache.qpid.server.message.Me
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.plugin.MessageConverter;
import org.apache.qpid.server.protocol.MessageConverterRegistry;
+import org.apache.qpid.transport.ByteBufferSender;
import org.apache.qpid.util.GZIPUtils;
public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
@@ -255,6 +256,15 @@ public class ProtocolOutputConverterImpl
}
}
+ @Override
+ public long writePayload(final ByteBufferSender sender) throws
IOException
+ {
+ ByteBuffer buf = _message.getContent(_offset, _length);
+ long size = buf.remaining();
+ sender.send(buf.duplicate());
+ return size;
+ }
+
public void handle(int channelId, AMQVersionAwareProtocolSession
amqProtocolSession) throws AMQException
{
throw new UnsupportedOperationException();
@@ -346,6 +356,15 @@ public class ProtocolOutputConverterImpl
_underlyingBody.writePayload(buffer);
}
+ public long writePayload(ByteBufferSender sender) throws IOException
+ {
+ if(_underlyingBody == null)
+ {
+ _underlyingBody = createAMQBody();
+ }
+ return _underlyingBody.writePayload(sender);
+ }
+
public void handle(final int channelId, final
AMQVersionAwareProtocolSession amqMinaProtocolSession)
throws AMQException
{
@@ -449,6 +468,18 @@ public class ProtocolOutputConverterImpl
}
@Override
+ public long writePayload(final ByteBufferSender sender) throws
IOException
+ {
+ long size = (new AMQFrame(_channel,
_methodBody)).writePayload(sender);
+
+ size += (new AMQFrame(_channel, _headerBody)).writePayload(sender);
+
+ size += (new AMQFrame(_channel,
_contentBody)).writePayload(sender);
+
+ return size;
+ }
+
+ @Override
public String toString()
{
StringBuilder builder = new StringBuilder();
@@ -490,6 +521,14 @@ public class ProtocolOutputConverterImpl
}
@Override
+ public long writePayload(final ByteBufferSender sender) throws
IOException
+ {
+ long size = (new AMQFrame(_channel,
_methodBody)).writePayload(sender);
+ size += (new AMQFrame(_channel, _headerBody)).writePayload(sender);
+ return size;
+ }
+
+ @Override
public String toString()
{
StringBuilder builder = new StringBuilder();
Modified:
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java
URL:
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java?rev=1656312&r1=1656311&r2=1656312&view=diff
==============================================================================
---
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java
(original)
+++
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java
Sun Feb 1 15:18:17 2015
@@ -20,12 +20,13 @@
*/
package org.apache.qpid.framing;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
-
import java.io.DataOutput;
import java.io.IOException;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
+import org.apache.qpid.transport.ByteBufferSender;
+
public interface AMQBody
{
public byte getFrameType();
@@ -39,4 +40,6 @@ public interface AMQBody
public void writePayload(DataOutput buffer) throws IOException;
void handle(final int channelId, final AMQVersionAwareProtocolSession
amqProtocolSession) throws AMQException;
+
+ long writePayload(ByteBufferSender sender) throws IOException;
}
Modified:
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java
URL:
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java?rev=1656312&r1=1656311&r2=1656312&view=diff
==============================================================================
---
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java
(original)
+++
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java
Sun Feb 1 15:18:17 2015
@@ -23,6 +23,8 @@ package org.apache.qpid.framing;
import java.io.DataOutput;
import java.io.IOException;
+import org.apache.qpid.transport.ByteBufferSender;
+
/**
* A data block represents something that has a size in bytes and the ability
to write itself to a byte
@@ -44,4 +46,6 @@ public abstract class AMQDataBlock imple
*/
public abstract void writePayload(DataOutput buffer) throws IOException;
+ public abstract long writePayload(ByteBufferSender sender) throws
IOException;
+
}
Modified:
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java
URL:
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java?rev=1656312&r1=1656311&r2=1656312&view=diff
==============================================================================
---
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java
(original)
+++
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java
Sun Feb 1 15:18:17 2015
@@ -22,6 +22,10 @@ package org.apache.qpid.framing;
import java.io.DataOutput;
import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.qpid.transport.ByteBufferSender;
+import org.apache.qpid.util.BytesDataOutput;
public class AMQFrame extends AMQDataBlock implements EncodableAMQDataBlock
{
@@ -57,6 +61,25 @@ public class AMQFrame extends AMQDataBlo
buffer.writeByte(FRAME_END_BYTE);
}
+ private static final byte[] FRAME_END_BYTE_ARRAY = new byte[] {
FRAME_END_BYTE };
+
+ @Override
+ public long writePayload(final ByteBufferSender sender) throws IOException
+ {
+ byte[] frameHeader = new byte[7];
+ BytesDataOutput buffer = new BytesDataOutput(frameHeader);
+
+ buffer.writeByte(_bodyFrame.getFrameType());
+ EncodingUtils.writeUnsignedShort(buffer, _channel);
+ EncodingUtils.writeUnsignedInteger(buffer, _bodyFrame.getSize());
+ sender.send(ByteBuffer.wrap(frameHeader));
+
+ long size = 8 + _bodyFrame.writePayload(sender);
+
+ sender.send(ByteBuffer.wrap(FRAME_END_BYTE_ARRAY));
+ return size;
+ }
+
public final int getChannel()
{
return _channel;
Modified:
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java
URL:
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java?rev=1656312&r1=1656311&r2=1656312&view=diff
==============================================================================
---
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java
(original)
+++
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java
Sun Feb 1 15:18:17 2015
@@ -24,12 +24,15 @@ package org.apache.qpid.framing;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import java.nio.ByteBuffer;
import org.apache.qpid.AMQChannelException;
import org.apache.qpid.AMQConnectionException;
import org.apache.qpid.AMQException;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
+import org.apache.qpid.transport.ByteBufferSender;
+import org.apache.qpid.util.BytesDataOutput;
public abstract class AMQMethodBodyImpl implements AMQMethodBody
{
@@ -105,6 +108,16 @@ public abstract class AMQMethodBodyImpl
writeMethodPayload(buffer);
}
+ @Override
+ public long writePayload(final ByteBufferSender sender) throws IOException
+ {
+ final int size = getSize();
+ byte[] bytes = new byte[size];
+ BytesDataOutput buffer = new BytesDataOutput(bytes);
+ writePayload(buffer);
+ sender.send(ByteBuffer.wrap(bytes));
+ return size;
+ }
protected int getSizeOf(AMQShortString string)
{
Modified:
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java
URL:
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java?rev=1656312&r1=1656311&r2=1656312&view=diff
==============================================================================
---
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java
(original)
+++
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java
Sun Feb 1 15:18:17 2015
@@ -23,10 +23,14 @@ package org.apache.qpid.framing;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import java.nio.ByteBuffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.qpid.transport.ByteBufferSender;
+import org.apache.qpid.util.BytesDataOutput;
+
public class BasicContentHeaderProperties
{
//persistent & non-persistent constants, values as per JMS DeliveryMode
@@ -314,6 +318,26 @@ public class BasicContentHeaderPropertie
}
}
+
+ public long writePropertyListPayload(final ByteBufferSender sender) throws
IOException
+ {
+ if(useEncodedForm())
+ {
+ sender.send(ByteBuffer.wrap(_encodedForm));
+ return _encodedForm.length;
+ }
+ else
+ {
+ int propertyListSize = getPropertyListSize();
+ byte[] data = new byte[propertyListSize];
+ BytesDataOutput out = new BytesDataOutput(data);
+ writePropertyListPayload(out);
+ sender.send(ByteBuffer.wrap(data));
+ return propertyListSize;
+ }
+
+ }
+
public void populatePropertiesFromBuffer(DataInput buffer, int
propertyFlags, int size) throws AMQFrameDecodingException, IOException
{
_propertyFlags = propertyFlags;
Modified:
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/framing/CompositeAMQDataBlock.java
URL:
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/framing/CompositeAMQDataBlock.java?rev=1656312&r1=1656311&r2=1656312&view=diff
==============================================================================
---
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/framing/CompositeAMQDataBlock.java
(original)
+++
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/framing/CompositeAMQDataBlock.java
Sun Feb 1 15:18:17 2015
@@ -23,6 +23,8 @@ package org.apache.qpid.framing;
import java.io.DataOutput;
import java.io.IOException;
+import org.apache.qpid.transport.ByteBufferSender;
+
public class CompositeAMQDataBlock extends AMQDataBlock implements
EncodableAMQDataBlock
{
@@ -58,6 +60,17 @@ public class CompositeAMQDataBlock exten
}
}
+ @Override
+ public long writePayload(final ByteBufferSender sender) throws IOException
+ {
+ long size = 0l;
+ for (int i = 0; i < _blocks.length; i++)
+ {
+ size += _blocks[i].writePayload(sender);
+ }
+ return size;
+ }
+
public String toString()
{
if (_blocks == null)
Modified:
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java
URL:
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java?rev=1656312&r1=1656311&r2=1656312&view=diff
==============================================================================
---
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java
(original)
+++
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java
Sun Feb 1 15:18:17 2015
@@ -28,6 +28,7 @@ import java.nio.ByteBuffer;
import org.apache.qpid.AMQException;
import org.apache.qpid.codec.MarkableDataInput;
import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
+import org.apache.qpid.transport.ByteBufferSender;
public class ContentBody implements AMQBody
{
@@ -72,6 +73,20 @@ public class ContentBody implements AMQB
session.contentBodyReceived(channelId, this);
}
+ @Override
+ public long writePayload(final ByteBufferSender sender) throws IOException
+ {
+ if(_payload != null)
+ {
+ sender.send(ByteBuffer.wrap(_payload));
+ return _payload.length;
+ }
+ else
+ {
+ return 0l;
+ }
+ }
+
public byte[] getPayload()
{
return _payload;
@@ -133,6 +148,23 @@ public class ContentBody implements AMQB
}
}
+ @Override
+ public long writePayload(final ByteBufferSender sender) throws
IOException
+ {
+ if(_buf.hasArray())
+ {
+ sender.send(ByteBuffer.wrap(_buf.array(), _buf.arrayOffset() +
_offset, _length));
+ }
+ else
+ {
+ ByteBuffer buf = _buf.duplicate();
+
+ buf.position(_offset);
+ buf.limit(_offset+_length);
+ sender.send(buf);
+ }
+ return _length;
+ }
public void handle(int channelId, AMQVersionAwareProtocolSession
amqProtocolSession) throws AMQException
{
Modified:
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java
URL:
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java?rev=1656312&r1=1656311&r2=1656312&view=diff
==============================================================================
---
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java
(original)
+++
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java
Sun Feb 1 15:18:17 2015
@@ -24,10 +24,13 @@ import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.IOException;
+import java.nio.ByteBuffer;
import org.apache.qpid.AMQException;
import org.apache.qpid.codec.MarkableDataInput;
import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
+import org.apache.qpid.transport.ByteBufferSender;
+import org.apache.qpid.util.BytesDataOutput;
public class ContentHeaderBody implements AMQBody
{
@@ -98,6 +101,19 @@ public class ContentHeaderBody implement
_properties.writePropertyListPayload(buffer);
}
+ @Override
+ public long writePayload(final ByteBufferSender sender) throws IOException
+ {
+ byte[] data = new byte[14];
+ BytesDataOutput buffer = new BytesDataOutput(data);
+ EncodingUtils.writeUnsignedShort(buffer, CLASS_ID);
+ EncodingUtils.writeUnsignedShort(buffer, 0);
+ buffer.writeLong(_bodySize);
+ EncodingUtils.writeUnsignedShort(buffer,
_properties.getPropertyFlags());
+ sender.send(ByteBuffer.wrap(data));
+ return 14 + _properties.writePropertyListPayload(sender);
+ }
+
public void handle(final int channelId, final
AMQVersionAwareProtocolSession session)
throws AMQException
{
Modified:
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java
URL:
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java?rev=1656312&r1=1656311&r2=1656312&view=diff
==============================================================================
---
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java
(original)
+++
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java
Sun Feb 1 15:18:17 2015
@@ -27,6 +27,7 @@ import java.io.IOException;
import org.apache.qpid.AMQException;
import org.apache.qpid.codec.MarkableDataInput;
import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
+import org.apache.qpid.transport.ByteBufferSender;
public class HeartbeatBody implements AMQBody
{
@@ -61,6 +62,12 @@ public class HeartbeatBody implements AM
{
}
+ @Override
+ public long writePayload(final ByteBufferSender sender) throws IOException
+ {
+ return 0l;
+ }
+
public void handle(final int channelId, final
AMQVersionAwareProtocolSession session)
throws AMQException
{
Modified:
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java
URL:
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java?rev=1656312&r1=1656311&r2=1656312&view=diff
==============================================================================
---
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java
(original)
+++
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java
Sun Feb 1 15:18:17 2015
@@ -23,11 +23,14 @@ package org.apache.qpid.framing;
import java.io.DataOutput;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import org.apache.qpid.AMQException;
import org.apache.qpid.codec.MarkableDataInput;
+import org.apache.qpid.transport.ByteBufferSender;
+import org.apache.qpid.util.BytesDataOutput;
public class ProtocolInitiation extends AMQDataBlock implements
EncodableAMQDataBlock
{
@@ -88,6 +91,16 @@ public class ProtocolInitiation extends
buffer.write(_protocolMinor);
}
+ @Override
+ public long writePayload(final ByteBufferSender sender) throws IOException
+ {
+ byte[] data = new byte[8];
+ BytesDataOutput out = new BytesDataOutput(data);
+ writePayload(out);
+ sender.send(ByteBuffer.wrap(data));
+ return 8l;
+ }
+
public boolean equals(Object o)
{
if (!(o instanceof ProtocolInitiation))
Modified:
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java
URL:
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java?rev=1656312&r1=1656311&r2=1656312&view=diff
==============================================================================
---
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java
(original)
+++
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java
Sun Feb 1 15:18:17 2015
@@ -37,10 +37,9 @@ import org.apache.qpid.protocol.Protocol
import org.apache.qpid.protocol.ServerProtocolEngine;
import org.apache.qpid.transport.NetworkTransportConfiguration;
import org.apache.qpid.transport.TransportException;
-import org.apache.qpid.transport.network.IncomingNetworkTransport;
import org.apache.qpid.transport.network.TransportEncryption;
-public class NonBlockingNetworkTransport implements IncomingNetworkTransport
+public class NonBlockingNetworkTransport
{
private static final org.slf4j.Logger LOGGER =
LoggerFactory.getLogger(AbstractNetworkTransport.class);
@@ -51,21 +50,6 @@ public class NonBlockingNetworkTransport
private AcceptingThread _acceptor;
private SelectorThread _selector;
- protected NonBlockingConnection createNetworkConnection(final
SocketChannel socketChannel,
- final
ServerProtocolEngine engine,
- final Integer
sendBufferSize,
- final Integer
receiveBufferSize,
- final int timeout,
- final
IdleTimeoutTicker ticker,
- final
Set<TransportEncryption> encryptionSet,
- final SSLContext
sslContext,
- final boolean
wantClientAuth,
- final boolean
needClientAuth,
- final Runnable
onTransportEncryptionAction)
- {
- return new NonBlockingConnection(socketChannel, engine,
sendBufferSize, receiveBufferSize, timeout, ticker, encryptionSet, sslContext,
wantClientAuth, needClientAuth, onTransportEncryptionAction, _selector);
- }
-
public void close()
{
if(_acceptor != null)
@@ -173,59 +157,7 @@ public class NonBlockingNetworkTransport
{
socketChannel = _serverSocket.accept();
- final ServerProtocolEngine engine =
- (ServerProtocolEngine)
_factory.newProtocolEngine(socketChannel.socket()
-
.getRemoteSocketAddress());
-
- if(engine != null)
- {
-
socketChannel.setOption(StandardSocketOptions.TCP_NODELAY,
_config.getTcpNoDelay());
- socketChannel.socket().setSoTimeout(1000 *
HANDSHAKE_TIMEOUT);
-
- final Integer sendBufferSize =
_config.getSendBufferSize();
- final Integer receiveBufferSize =
_config.getReceiveBufferSize();
-
-
socketChannel.setOption(StandardSocketOptions.SO_SNDBUF, sendBufferSize);
-
socketChannel.setOption(StandardSocketOptions.SO_RCVBUF, receiveBufferSize);
-
-
- final IdleTimeoutTicker ticker = new
IdleTimeoutTicker(engine, TIMEOUT);
-
- NonBlockingConnection connection =
- createNetworkConnection(socketChannel,
- engine,
- sendBufferSize,
- receiveBufferSize,
- _timeout,
- ticker,
- _encryptionSet,
- _sslContext,
-
_config.wantClientAuth(),
-
_config.needClientAuth(),
- new Runnable()
- {
-
- @Override
- public void
run()
- {
-
engine.encryptedTransport();
- }
- });
-
- engine.setNetworkConnection(connection,
connection.getSender());
- connection.setMaxReadIdle(HANDSHAKE_TIMEOUT);
-
- ticker.setConnection(connection);
-
- connection.start();
-
- _selector.addConnection(connection);
-
- }
- else
- {
- socketChannel.close();
- }
+ acceptSocketChannel(socketChannel);
}
catch(RuntimeException e)
{
@@ -262,6 +194,64 @@ public class NonBlockingNetworkTransport
}
}
+ public void acceptSocketChannel(final SocketChannel socketChannel)
throws IOException
+ {
+ final ServerProtocolEngine engine =
+ (ServerProtocolEngine)
_factory.newProtocolEngine(socketChannel.socket()
+
.getRemoteSocketAddress());
+
+ if(engine != null)
+ {
+ socketChannel.setOption(StandardSocketOptions.TCP_NODELAY,
_config.getTcpNoDelay());
+ socketChannel.socket().setSoTimeout(1000 * HANDSHAKE_TIMEOUT);
+
+ final Integer sendBufferSize = _config.getSendBufferSize();
+ final Integer receiveBufferSize =
_config.getReceiveBufferSize();
+
+ socketChannel.setOption(StandardSocketOptions.SO_SNDBUF,
sendBufferSize);
+ socketChannel.setOption(StandardSocketOptions.SO_RCVBUF,
receiveBufferSize);
+
+
+ final IdleTimeoutTicker ticker = new IdleTimeoutTicker(engine,
TIMEOUT);
+
+ NonBlockingConnection connection =
+ new NonBlockingConnection(socketChannel,
+ engine,
+ sendBufferSize,
+ receiveBufferSize,
+ _timeout,
+ ticker,
+ _encryptionSet,
+ _sslContext,
+ _config.wantClientAuth(),
+ _config.needClientAuth(),
+ new Runnable()
+ {
+
+ @Override
+ public void run()
+ {
+
engine.encryptedTransport();
+ }
+ },
+ _selector);
+
+ engine.setNetworkConnection(connection,
connection.getSender());
+ connection.setMaxReadIdle(HANDSHAKE_TIMEOUT);
+
+ ticker.setConnection(connection);
+
+ connection.start();
+
+ _selector.addConnection(connection);
+
+ }
+ else
+ {
+ socketChannel.close();
+ }
+ }
+
private void closeSocketIfNecessary(final Socket socket)
{
if(socket != null)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]