Author: rgodfrey
Date: Sun Feb  1 15:18:17 2015
New Revision: 1656312

URL: http://svn.apache.org/r1656312
Log:
Reduce copying in 0-9 path

Modified:
    
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java
    
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
    
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java
    
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java
    
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java
    
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java
    
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java
    
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java
    
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/framing/CompositeAMQDataBlock.java
    
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java
    
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java
    
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java
    
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java
    
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java?rev=1656312&r1=1656311&r2=1656312&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java
 Sun Feb  1 15:18:17 2015
@@ -34,13 +34,12 @@ import org.apache.qpid.server.model.Tran
 import org.apache.qpid.server.model.port.AmqpPort;
 import org.apache.qpid.server.protocol.MultiVersionProtocolEngineFactory;
 import org.apache.qpid.transport.NetworkTransportConfiguration;
-import org.apache.qpid.transport.network.IncomingNetworkTransport;
 import org.apache.qpid.transport.network.TransportEncryption;
 import org.apache.qpid.transport.network.io.NonBlockingNetworkTransport;
 
 class TCPandSSLTransport implements AcceptingTransport
 {
-    private IncomingNetworkTransport _networkTransport;
+    private NonBlockingNetworkTransport _networkTransport;
     private Set<Transport> _transports;
     private SSLContext _sslContext;
     private InetSocketAddress _bindingSocketAddress;
@@ -64,7 +63,7 @@ class TCPandSSLTransport implements Acce
     @Override
     public void start()
     {
-        String bindingAddress = ((AmqpPort<?>)_port).getBindingAddress();
+        String bindingAddress = _port.getBindingAddress();
         if (WILDCARD_ADDRESS.equals(bindingAddress))
         {
             bindingAddress = null;

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java?rev=1656312&r1=1656311&r2=1656312&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
 Sun Feb  1 15:18:17 2015
@@ -486,7 +486,14 @@ public class AMQProtocolEngine implement
                                                                                
        serverProperties,
                                                                                
        mechanisms.getBytes(),
                                                                                
        locales.getBytes());
-            _sender.send(asByteBuffer(responseBody.generateFrame(0)));
+            try
+            {
+                responseBody.generateFrame(0).writePayload(_sender);
+            }
+            catch (IOException e)
+            {
+                throw new ServerScopedRuntimeException(e);
+            }
             _sender.flush();
 
         }
@@ -494,7 +501,14 @@ public class AMQProtocolEngine implement
         {
             _logger.info("Received unsupported protocol initiation for 
protocol version: " + getProtocolVersion());
 
-            _sender.send(asByteBuffer(new 
ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion())));
+            try
+            {
+                new 
ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion()).writePayload(_sender);
+            }
+            catch (IOException ioex)
+            {
+                throw new ServerScopedRuntimeException(ioex);
+            }
             _sender.flush();
         }
     }
@@ -546,16 +560,21 @@ public class AMQProtocolEngine implement
      */
     public synchronized void writeFrame(AMQDataBlock frame)
     {
-
-        final ByteBuffer buf = asByteBuffer(frame);
-        _writtenBytes += buf.remaining();
-
         if(_logger.isDebugEnabled())
         {
             _logger.debug("SEND: " + frame);
         }
 
-        _sender.send(buf);
+        try
+        {
+            _writtenBytes += frame.writePayload(_sender);
+        }
+        catch (IOException e)
+        {
+            throw new ServerScopedRuntimeException(e);
+        }
+
+
         final long time = System.currentTimeMillis();
         _lastIoTime = time;
         _lastWriteTime.set(time);

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java?rev=1656312&r1=1656311&r2=1656312&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java
 Sun Feb  1 15:18:17 2015
@@ -42,6 +42,7 @@ import org.apache.qpid.server.message.Me
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.plugin.MessageConverter;
 import org.apache.qpid.server.protocol.MessageConverterRegistry;
+import org.apache.qpid.transport.ByteBufferSender;
 import org.apache.qpid.util.GZIPUtils;
 
 public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
@@ -255,6 +256,15 @@ public class ProtocolOutputConverterImpl
             }
         }
 
+        @Override
+        public long writePayload(final ByteBufferSender sender) throws 
IOException
+        {
+            ByteBuffer buf = _message.getContent(_offset, _length);
+            long size = buf.remaining();
+            sender.send(buf.duplicate());
+            return size;
+        }
+
         public void handle(int channelId, AMQVersionAwareProtocolSession 
amqProtocolSession) throws AMQException
         {
             throw new UnsupportedOperationException();
@@ -346,6 +356,15 @@ public class ProtocolOutputConverterImpl
             _underlyingBody.writePayload(buffer);
         }
 
+        public long writePayload(ByteBufferSender sender) throws IOException
+        {
+            if(_underlyingBody == null)
+            {
+                _underlyingBody = createAMQBody();
+            }
+            return _underlyingBody.writePayload(sender);
+        }
+
         public void handle(final int channelId, final 
AMQVersionAwareProtocolSession amqMinaProtocolSession)
             throws AMQException
         {
@@ -449,6 +468,18 @@ public class ProtocolOutputConverterImpl
         }
 
         @Override
+        public long writePayload(final ByteBufferSender sender) throws 
IOException
+        {
+            long size = (new AMQFrame(_channel, 
_methodBody)).writePayload(sender);
+
+            size += (new AMQFrame(_channel, _headerBody)).writePayload(sender);
+
+            size += (new AMQFrame(_channel, 
_contentBody)).writePayload(sender);
+
+            return size;
+        }
+
+        @Override
         public String toString()
         {
             StringBuilder builder = new StringBuilder();
@@ -490,6 +521,14 @@ public class ProtocolOutputConverterImpl
         }
 
         @Override
+        public long writePayload(final ByteBufferSender sender) throws 
IOException
+        {
+            long size = (new AMQFrame(_channel, 
_methodBody)).writePayload(sender);
+            size += (new AMQFrame(_channel, _headerBody)).writePayload(sender);
+            return size;
+        }
+
+        @Override
         public String toString()
         {
             StringBuilder builder = new StringBuilder();

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java?rev=1656312&r1=1656311&r2=1656312&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java
 Sun Feb  1 15:18:17 2015
@@ -20,12 +20,13 @@
  */
 package org.apache.qpid.framing;
 
-import org.apache.qpid.AMQException;
-import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
-
 import java.io.DataOutput;
 import java.io.IOException;
 
+import org.apache.qpid.AMQException;
+import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
+import org.apache.qpid.transport.ByteBufferSender;
+
 public interface AMQBody
 {
     public  byte getFrameType();
@@ -39,4 +40,6 @@ public interface AMQBody
     public void writePayload(DataOutput buffer) throws IOException;
     
     void handle(final int channelId, final AMQVersionAwareProtocolSession 
amqProtocolSession) throws AMQException;
+
+    long writePayload(ByteBufferSender sender) throws IOException;
 }

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java?rev=1656312&r1=1656311&r2=1656312&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java
 Sun Feb  1 15:18:17 2015
@@ -23,6 +23,8 @@ package org.apache.qpid.framing;
 import java.io.DataOutput;
 import java.io.IOException;
 
+import org.apache.qpid.transport.ByteBufferSender;
+
 
 /**
  * A data block represents something that has a size in bytes and the ability 
to write itself to a byte
@@ -44,4 +46,6 @@ public abstract class AMQDataBlock imple
      */
     public abstract void writePayload(DataOutput buffer) throws IOException;
 
+    public abstract long writePayload(ByteBufferSender sender) throws 
IOException;
+
 }

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java?rev=1656312&r1=1656311&r2=1656312&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java
 Sun Feb  1 15:18:17 2015
@@ -22,6 +22,10 @@ package org.apache.qpid.framing;
 
 import java.io.DataOutput;
 import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.qpid.transport.ByteBufferSender;
+import org.apache.qpid.util.BytesDataOutput;
 
 public class AMQFrame extends AMQDataBlock implements EncodableAMQDataBlock
 {
@@ -57,6 +61,25 @@ public class AMQFrame extends AMQDataBlo
         buffer.writeByte(FRAME_END_BYTE);
     }
 
+    private static final byte[] FRAME_END_BYTE_ARRAY = new byte[] { 
FRAME_END_BYTE };
+
+    @Override
+    public long writePayload(final ByteBufferSender sender) throws IOException
+    {
+        byte[] frameHeader = new byte[7];
+        BytesDataOutput buffer = new BytesDataOutput(frameHeader);
+
+        buffer.writeByte(_bodyFrame.getFrameType());
+        EncodingUtils.writeUnsignedShort(buffer, _channel);
+        EncodingUtils.writeUnsignedInteger(buffer, _bodyFrame.getSize());
+        sender.send(ByteBuffer.wrap(frameHeader));
+
+        long size = 8 + _bodyFrame.writePayload(sender);
+
+        sender.send(ByteBuffer.wrap(FRAME_END_BYTE_ARRAY));
+        return size;
+    }
+
     public final int getChannel()
     {
         return _channel;

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java?rev=1656312&r1=1656311&r2=1656312&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java
 Sun Feb  1 15:18:17 2015
@@ -24,12 +24,15 @@ package org.apache.qpid.framing;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 
 import org.apache.qpid.AMQChannelException;
 import org.apache.qpid.AMQConnectionException;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
+import org.apache.qpid.transport.ByteBufferSender;
+import org.apache.qpid.util.BytesDataOutput;
 
 public abstract class AMQMethodBodyImpl implements AMQMethodBody
 {
@@ -105,6 +108,16 @@ public abstract class AMQMethodBodyImpl
         writeMethodPayload(buffer);
     }
 
+    @Override
+    public long writePayload(final ByteBufferSender sender) throws IOException
+    {
+        final int size = getSize();
+        byte[] bytes = new byte[size];
+        BytesDataOutput buffer = new BytesDataOutput(bytes);
+        writePayload(buffer);
+        sender.send(ByteBuffer.wrap(bytes));
+        return size;
+    }
 
     protected int getSizeOf(AMQShortString string)
     {

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java?rev=1656312&r1=1656311&r2=1656312&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java
 Sun Feb  1 15:18:17 2015
@@ -23,10 +23,14 @@ package org.apache.qpid.framing;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.qpid.transport.ByteBufferSender;
+import org.apache.qpid.util.BytesDataOutput;
+
 public class BasicContentHeaderProperties
 {
     //persistent & non-persistent constants, values as per JMS DeliveryMode
@@ -314,6 +318,26 @@ public class BasicContentHeaderPropertie
         }
     }
 
+
+    public long writePropertyListPayload(final ByteBufferSender sender) throws 
IOException
+    {
+        if(useEncodedForm())
+        {
+            sender.send(ByteBuffer.wrap(_encodedForm));
+            return _encodedForm.length;
+        }
+        else
+        {
+            int propertyListSize = getPropertyListSize();
+            byte[] data = new byte[propertyListSize];
+            BytesDataOutput out = new BytesDataOutput(data);
+            writePropertyListPayload(out);
+            sender.send(ByteBuffer.wrap(data));
+            return propertyListSize;
+        }
+
+    }
+
     public void populatePropertiesFromBuffer(DataInput buffer, int 
propertyFlags, int size) throws AMQFrameDecodingException, IOException
     {
         _propertyFlags = propertyFlags;

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/framing/CompositeAMQDataBlock.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/framing/CompositeAMQDataBlock.java?rev=1656312&r1=1656311&r2=1656312&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/framing/CompositeAMQDataBlock.java
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/framing/CompositeAMQDataBlock.java
 Sun Feb  1 15:18:17 2015
@@ -23,6 +23,8 @@ package org.apache.qpid.framing;
 import java.io.DataOutput;
 import java.io.IOException;
 
+import org.apache.qpid.transport.ByteBufferSender;
+
 public class CompositeAMQDataBlock extends AMQDataBlock implements 
EncodableAMQDataBlock
 {
 
@@ -58,6 +60,17 @@ public class CompositeAMQDataBlock exten
         }
     }
 
+    @Override
+    public long writePayload(final ByteBufferSender sender) throws IOException
+    {
+        long size = 0l;
+        for (int i = 0; i < _blocks.length; i++)
+        {
+            size += _blocks[i].writePayload(sender);
+        }
+        return size;
+    }
+
     public String toString()
     {
         if (_blocks == null)

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java?rev=1656312&r1=1656311&r2=1656312&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java
 Sun Feb  1 15:18:17 2015
@@ -28,6 +28,7 @@ import java.nio.ByteBuffer;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.codec.MarkableDataInput;
 import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
+import org.apache.qpid.transport.ByteBufferSender;
 
 public class ContentBody implements AMQBody
 {
@@ -72,6 +73,20 @@ public class ContentBody implements AMQB
         session.contentBodyReceived(channelId, this);
     }
 
+    @Override
+    public long writePayload(final ByteBufferSender sender) throws IOException
+    {
+        if(_payload != null)
+        {
+            sender.send(ByteBuffer.wrap(_payload));
+            return _payload.length;
+        }
+        else
+        {
+            return 0l;
+        }
+    }
+
     public byte[] getPayload()
     {
         return _payload;
@@ -133,6 +148,23 @@ public class ContentBody implements AMQB
             }
         }
 
+        @Override
+        public long writePayload(final ByteBufferSender sender) throws 
IOException
+        {
+            if(_buf.hasArray())
+            {
+                sender.send(ByteBuffer.wrap(_buf.array(), _buf.arrayOffset() + 
_offset, _length));
+            }
+            else
+            {
+                ByteBuffer buf = _buf.duplicate();
+
+                buf.position(_offset);
+                buf.limit(_offset+_length);
+                sender.send(buf);
+            }
+            return _length;
+        }
 
         public void handle(int channelId, AMQVersionAwareProtocolSession 
amqProtocolSession) throws AMQException
         {

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java?rev=1656312&r1=1656311&r2=1656312&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java
 Sun Feb  1 15:18:17 2015
@@ -24,10 +24,13 @@ import java.io.DataInput;
 import java.io.DataInputStream;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 
 import org.apache.qpid.AMQException;
 import org.apache.qpid.codec.MarkableDataInput;
 import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
+import org.apache.qpid.transport.ByteBufferSender;
+import org.apache.qpid.util.BytesDataOutput;
 
 public class ContentHeaderBody implements AMQBody
 {
@@ -98,6 +101,19 @@ public class ContentHeaderBody implement
         _properties.writePropertyListPayload(buffer);
     }
 
+    @Override
+    public long writePayload(final ByteBufferSender sender) throws IOException
+    {
+        byte[] data = new byte[14];
+        BytesDataOutput buffer = new BytesDataOutput(data);
+        EncodingUtils.writeUnsignedShort(buffer, CLASS_ID);
+        EncodingUtils.writeUnsignedShort(buffer, 0);
+        buffer.writeLong(_bodySize);
+        EncodingUtils.writeUnsignedShort(buffer, 
_properties.getPropertyFlags());
+        sender.send(ByteBuffer.wrap(data));
+        return 14 + _properties.writePropertyListPayload(sender);
+    }
+
     public void handle(final int channelId, final 
AMQVersionAwareProtocolSession session)
             throws AMQException
     {

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java?rev=1656312&r1=1656311&r2=1656312&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java
 Sun Feb  1 15:18:17 2015
@@ -27,6 +27,7 @@ import java.io.IOException;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.codec.MarkableDataInput;
 import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
+import org.apache.qpid.transport.ByteBufferSender;
 
 public class HeartbeatBody implements AMQBody
 {
@@ -61,6 +62,12 @@ public class HeartbeatBody implements AM
     {
     }
 
+    @Override
+    public long writePayload(final ByteBufferSender sender) throws IOException
+    {
+        return 0l;
+    }
+
     public void handle(final int channelId, final 
AMQVersionAwareProtocolSession session)
             throws AMQException
     {

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java?rev=1656312&r1=1656311&r2=1656312&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java
 Sun Feb  1 15:18:17 2015
@@ -23,11 +23,14 @@ package org.apache.qpid.framing;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 
 import org.apache.qpid.AMQException;
 import org.apache.qpid.codec.MarkableDataInput;
+import org.apache.qpid.transport.ByteBufferSender;
+import org.apache.qpid.util.BytesDataOutput;
 
 public class ProtocolInitiation extends AMQDataBlock implements 
EncodableAMQDataBlock
 {
@@ -88,6 +91,16 @@ public class ProtocolInitiation extends
         buffer.write(_protocolMinor);
     }
 
+    @Override
+    public long writePayload(final ByteBufferSender sender) throws IOException
+    {
+        byte[] data = new byte[8];
+        BytesDataOutput out = new BytesDataOutput(data);
+        writePayload(out);
+        sender.send(ByteBuffer.wrap(data));
+        return 8l;
+    }
+
     public boolean equals(Object o)
     {
         if (!(o instanceof ProtocolInitiation))

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java?rev=1656312&r1=1656311&r2=1656312&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java
 Sun Feb  1 15:18:17 2015
@@ -37,10 +37,9 @@ import org.apache.qpid.protocol.Protocol
 import org.apache.qpid.protocol.ServerProtocolEngine;
 import org.apache.qpid.transport.NetworkTransportConfiguration;
 import org.apache.qpid.transport.TransportException;
-import org.apache.qpid.transport.network.IncomingNetworkTransport;
 import org.apache.qpid.transport.network.TransportEncryption;
 
-public class NonBlockingNetworkTransport implements IncomingNetworkTransport
+public class NonBlockingNetworkTransport
 {
 
     private static final org.slf4j.Logger LOGGER = 
LoggerFactory.getLogger(AbstractNetworkTransport.class);
@@ -51,21 +50,6 @@ public class NonBlockingNetworkTransport
     private AcceptingThread _acceptor;
     private SelectorThread _selector;
 
-    protected NonBlockingConnection createNetworkConnection(final 
SocketChannel socketChannel,
-                                                            final 
ServerProtocolEngine engine,
-                                                            final Integer 
sendBufferSize,
-                                                            final Integer 
receiveBufferSize,
-                                                            final int timeout,
-                                                            final 
IdleTimeoutTicker ticker,
-                                                            final 
Set<TransportEncryption> encryptionSet,
-                                                            final SSLContext 
sslContext,
-                                                            final boolean 
wantClientAuth,
-                                                            final boolean 
needClientAuth,
-                                                            final Runnable 
onTransportEncryptionAction)
-    {
-        return new NonBlockingConnection(socketChannel, engine, 
sendBufferSize, receiveBufferSize, timeout, ticker, encryptionSet, sslContext, 
wantClientAuth, needClientAuth, onTransportEncryptionAction, _selector);
-    }
-
     public void close()
     {
         if(_acceptor != null)
@@ -173,59 +157,7 @@ public class NonBlockingNetworkTransport
                     {
                         socketChannel = _serverSocket.accept();
 
-                        final ServerProtocolEngine engine =
-                                (ServerProtocolEngine) 
_factory.newProtocolEngine(socketChannel.socket()
-                                                                               
           .getRemoteSocketAddress());
-
-                        if(engine != null)
-                        {
-                            
socketChannel.setOption(StandardSocketOptions.TCP_NODELAY, 
_config.getTcpNoDelay());
-                            socketChannel.socket().setSoTimeout(1000 * 
HANDSHAKE_TIMEOUT);
-
-                            final Integer sendBufferSize = 
_config.getSendBufferSize();
-                            final Integer receiveBufferSize = 
_config.getReceiveBufferSize();
-
-                            
socketChannel.setOption(StandardSocketOptions.SO_SNDBUF, sendBufferSize);
-                            
socketChannel.setOption(StandardSocketOptions.SO_RCVBUF, receiveBufferSize);
-
-
-                            final IdleTimeoutTicker ticker = new 
IdleTimeoutTicker(engine, TIMEOUT);
-
-                            NonBlockingConnection connection =
-                                    createNetworkConnection(socketChannel,
-                                                            engine,
-                                                            sendBufferSize,
-                                                            receiveBufferSize,
-                                                            _timeout,
-                                                            ticker,
-                                                            _encryptionSet,
-                                                            _sslContext,
-                                                            
_config.wantClientAuth(),
-                                                            
_config.needClientAuth(),
-                                                            new Runnable()
-                                                            {
-
-                                                                @Override
-                                                                public void 
run()
-                                                                {
-                                                                    
engine.encryptedTransport();
-                                                                }
-                                                            });
-
-                            engine.setNetworkConnection(connection, 
connection.getSender());
-                            connection.setMaxReadIdle(HANDSHAKE_TIMEOUT);
-
-                            ticker.setConnection(connection);
-
-                            connection.start();
-
-                            _selector.addConnection(connection);
-
-                        }
-                        else
-                        {
-                            socketChannel.close();
-                        }
+                        acceptSocketChannel(socketChannel);
                     }
                     catch(RuntimeException e)
                     {
@@ -262,6 +194,64 @@ public class NonBlockingNetworkTransport
             }
         }
 
+        public void acceptSocketChannel(final SocketChannel socketChannel) throws IOException
+        {
+            final ServerProtocolEngine engine =
+                    (ServerProtocolEngine) _factory.newProtocolEngine(socketChannel.socket()
+                                                                              .getRemoteSocketAddress());
+
+            if(engine != null)
+            {
+                socketChannel.setOption(StandardSocketOptions.TCP_NODELAY, _config.getTcpNoDelay());
+                socketChannel.socket().setSoTimeout(1000 * HANDSHAKE_TIMEOUT);
+
+                final Integer sendBufferSize = _config.getSendBufferSize();
+                final Integer receiveBufferSize = _config.getReceiveBufferSize();
+
+                socketChannel.setOption(StandardSocketOptions.SO_SNDBUF, sendBufferSize);
+                socketChannel.setOption(StandardSocketOptions.SO_RCVBUF, receiveBufferSize);
+
+
+                final IdleTimeoutTicker ticker = new IdleTimeoutTicker(engine, TIMEOUT);
+
+                NonBlockingConnection connection =
+                        new NonBlockingConnection(socketChannel,
+                                                  engine,
+                                                  sendBufferSize,
+                                                  receiveBufferSize,
+                                                  _timeout,
+                                                  ticker,
+                                                  _encryptionSet,
+                                                  _sslContext,
+                                                  _config.wantClientAuth(),
+                                                  _config.needClientAuth(),
+                                                  new Runnable()
+                                                  {
+
+                                                      @Override
+                                                      public void run()
+                                                      {
+                                                          engine.encryptedTransport();
+                                                      }
+                                                  },
+                                                  _selector);
+
+                engine.setNetworkConnection(connection, connection.getSender());
+                connection.setMaxReadIdle(HANDSHAKE_TIMEOUT);
+
+                ticker.setConnection(connection);
+
+                connection.start();
+
+                _selector.addConnection(connection);
+
+            }
+            else
+            {
+                socketChannel.close();
+            }
+        }
+
         private void closeSocketIfNecessary(final Socket socket)
         {
             if(socket != null)



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to