Author: brandonwilliams
Date: Wed Aug  3 21:40:34 2011
New Revision: 1153678

URL: http://svn.apache.org/viewvc?rev=1153678&view=rev
Log:
Reduce copies on read/write paths.
Patch by jbellis reviewed by brandonwilliams for CASSANDRA-1788

Added:
    cassandra/trunk/test/unit/org/apache/cassandra/net/
    cassandra/trunk/test/unit/org/apache/cassandra/net/MessageSerializer.java   
(with props)
Modified:
    
cassandra/trunk/src/java/org/apache/cassandra/net/CompactEndpointSerializationHelper.java
    cassandra/trunk/src/java/org/apache/cassandra/net/Header.java
    cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
    cassandra/trunk/src/java/org/apache/cassandra/net/Message.java
    cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
    cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
    
cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java
    cassandra/trunk/test/unit/org/apache/cassandra/db/SerializationsTest.java
    
cassandra/trunk/test/unit/org/apache/cassandra/service/SerializationsTest.java
    
cassandra/trunk/test/unit/org/apache/cassandra/streaming/SerializationsTest.java

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/net/CompactEndpointSerializationHelper.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/CompactEndpointSerializationHelper.java?rev=1153678&r1=1153677&r2=1153678&view=diff
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/net/CompactEndpointSerializationHelper.java
 (original)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/net/CompactEndpointSerializationHelper.java
 Wed Aug  3 21:40:34 2011
@@ -21,6 +21,8 @@ package org.apache.cassandra.net;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.net.Inet4Address;
+import java.net.Inet6Address;
 import java.net.InetAddress;
 
 public class CompactEndpointSerializationHelper
@@ -38,4 +40,12 @@ public class CompactEndpointSerializatio
         dis.readFully(bytes, 0, bytes.length);
         return InetAddress.getByAddress(bytes);
     }
+
+    public static int serializedSize(InetAddress from)
+    {
+        if (from instanceof Inet4Address)
+            return 1 + 4;
+        assert from instanceof Inet6Address;
+        return 1 + 16;
+    }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/net/Header.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/Header.java?rev=1153678&r1=1153677&r2=1153678&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/Header.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/Header.java Wed Aug  3 
21:40:34 2011
@@ -29,6 +29,7 @@ import java.util.concurrent.atomic.Atomi
 
 import org.apache.cassandra.io.ICompactSerializer;
 import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.FBUtilities;
 
 public class Header
 {
@@ -88,6 +89,21 @@ public class Header
     {
         details_.remove(key);
     }
+
+    public int serializedSize()
+    {
+        int size = 0;
+        size += CompactEndpointSerializationHelper.serializedSize(getFrom());
+        size += 4;
+        size += 4;
+        for (String key : details_.keySet())
+        {
+            size += 2 + FBUtilities.encodedUTF8Length(key);
+            byte[] value = details_.get(key);
+            size += 4 + value.length;
+        }
+        return size;
+    }
 }
 
 class HeaderSerializer implements ICompactSerializer<Header>
@@ -96,13 +112,8 @@ class HeaderSerializer implements ICompa
     {           
         CompactEndpointSerializationHelper.serialize(t.getFrom(), dos);
         dos.writeInt(t.getVerb().ordinal());
-        
-        /* Serialize the message header */
-        int size = t.details_.size();
-        dos.writeInt(size);
-        Set<String> keys = t.details_.keySet();
-        
-        for( String key : keys )
+        dos.writeInt(t.details_.size());
+        for (String key : t.details_.keySet())
         {
             dos.writeUTF(key);
             byte[] value = t.details_.get(key);
@@ -115,8 +126,6 @@ class HeaderSerializer implements ICompa
     {
         InetAddress from = CompactEndpointSerializationHelper.deserialize(dis);
         int verbOrdinal = dis.readInt();
-        
-        /* Deserializing the message header */
         int size = dis.readInt();
         Map<String, byte[]> details = new Hashtable<String, byte[]>(size);
         for ( int i = 0; i < size; ++i )
@@ -127,7 +136,6 @@ class HeaderSerializer implements ICompa
             dis.readFully(bytes);
             details.put(key, bytes);
         }
-        
         return new Header(from, StorageService.VERBS[verbOrdinal], details);
     }
 }

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java?rev=1153678&r1=1153677&r2=1153678&view=diff
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java 
(original)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java 
Wed Aug  3 21:40:34 2011
@@ -30,6 +30,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.streaming.IncomingStreamReader;
 import org.apache.cassandra.streaming.StreamHeader;
+import org.apache.cassandra.utils.FBUtilities;
 
 public class IncomingTcpConnection extends Thread
 {
@@ -125,22 +126,28 @@ public class IncomingTcpConnection exten
 
     private Message receiveMessage(DataInputStream input, int version) throws 
IOException
     {
-        int size = input.readInt();
-        byte[] contentBytes = new byte[size];
+        int totalSize = input.readInt();
+        String id = input.readUTF();
+        Header header = Header.serializer().deserialize(input, version);
+
+        int bodySize = input.readInt();
+        byte[] body = new byte[bodySize];
         // readFully allocates a direct buffer the size of the chunk it is 
asked to read,
-        // so we cap that at CHUNK_SIZE. See 
https://issues.apache.org/jira/browse/CASSANDRA-2654
-        int remainder = size % CHUNK_SIZE;
-        for (int offset = 0; offset < size - remainder; offset += CHUNK_SIZE)
-            input.readFully(contentBytes, offset, CHUNK_SIZE);
-        input.readFully(contentBytes, size - remainder, remainder);
+        // so we cap that at CHUNK_SIZE.  See 
https://issues.apache.org/jira/browse/CASSANDRA-2654
+        int remainder = bodySize % CHUNK_SIZE;
+        for (int offset = 0; offset < bodySize - remainder; offset += 
CHUNK_SIZE)
+            input.readFully(body, offset, CHUNK_SIZE);
+        input.readFully(body, bodySize - remainder, remainder);
+        // earlier versions would send unnecessary bytes left over at the end 
of a buffer, too
+        int remaining = totalSize - 
OutboundTcpConnection.messageLength(header, id, body);
+        if (remaining > 0)
+            input.skip(remaining);
 
         // for non-streaming connections, continue to read the messages (and 
ignore them) until sender
         // starts sending correct-version messages (which it can do without 
reconnecting -- version is per-Message)
         if (version <= MessagingService.version_)
         {
-            DataInputStream dis = new DataInputStream(new 
ByteArrayInputStream(contentBytes));
-            String id = dis.readUTF();
-            Message message = Message.serializer().deserialize(dis, version);
+            Message message = new Message(header, body, version);
             MessagingService.instance().receive(message, id);
             return message;
         }

Modified: cassandra/trunk/src/java/org/apache/cassandra/net/Message.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/Message.java?rev=1153678&r1=1153677&r2=1153678&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/Message.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/Message.java Wed Aug  3 
21:40:34 2011
@@ -30,23 +30,11 @@ import org.apache.cassandra.utils.FBUtil
 
 public class Message
 {
-    private static ICompactSerializer<Message> serializer_;
-
-    static
-    {
-        serializer_ = new MessageSerializer();        
-    }
-    
-    public static ICompactSerializer<Message> serializer()
-    {
-        return serializer_;
-    }
-    
     final Header header_;
     private final byte[] body_;
     private final transient int version;
 
-    private Message(Header header, byte[] body, int version)
+    Message(Header header, byte[] body, int version)
     {
         assert header != null;
         assert body != null;
@@ -127,25 +115,4 @@ public class Message
                .append(separator);
         return sbuf.toString();
     }
-    
-    private static class MessageSerializer implements 
ICompactSerializer<Message>
-    {
-        public void serialize(Message t, DataOutputStream dos, int version) 
throws IOException
-        {
-            assert t.getVersion() == version : "internode protocol version 
mismatch"; // indicates programmer error.
-            Header.serializer().serialize( t.header_, dos, version);
-            byte[] bytes = t.getMessageBody();
-            dos.writeInt(bytes.length);
-            dos.write(bytes);
-        }
-    
-        public Message deserialize(DataInputStream dis, int version) throws 
IOException
-        {
-            Header header = Header.serializer().deserialize(dis, version);
-            int size = dis.readInt();
-            byte[] bytes = new byte[size];
-            dis.readFully(bytes);
-            return new Message(header, bytes, version);
-        }
-    }
 }

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java?rev=1153678&r1=1153677&r2=1153678&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java Wed 
Aug  3 21:40:34 2011
@@ -63,11 +63,10 @@ public final class MessagingService impl
     public static final int VERSION_080 = 2;
     public static final int version_ = 3; // 8 bits, so don't waste versions
 
-    //TODO: make this parameter dynamic somehow.  Not sure if config is 
appropriate.
-    private SerializerType serializerType_ = SerializerType.BINARY;
+    static SerializerType serializerType_ = SerializerType.BINARY;
 
     /** we preface every message with this number so the recipient can 
validate the sender is sane */
-    private static final int PROTOCOL_MAGIC = 0xCA552DFA;
+    static final int PROTOCOL_MAGIC = 0xCA552DFA;
 
     /* This records all the results mapped by message Id */
     private final ExpiringMap<String, Pair<InetAddress, IMessageCallback>> 
callbacks;
@@ -380,26 +379,10 @@ public final class MessagingService impl
         }
 
         // get pooled connection (really, connection queue)
-        OutboundTcpConnection connection = getConnection(to, message);
-
-        // pack message with header in a bytebuffer
-        byte[] data;
-        try
-        {
-            DataOutputBuffer buffer = new DataOutputBuffer();
-            buffer.writeUTF(id);
-            Message.serializer().serialize(message, buffer, 
message.getVersion());
-            data = buffer.getData();
-        }
-        catch (IOException e)
-        {
-            throw new RuntimeException(e);
-        }
-        assert data.length > 0;
-        ByteBuffer buffer = packIt(data , false, message.getVersion());
+        OutboundTcpConnection connection = getConnection(to, processedMessage);
 
         // write it
-        connection.write(buffer);
+        connection.enqueue(processedMessage, id);
     }
 
     public IAsyncResult sendRR(Message message, InetAddress to)
@@ -492,36 +475,6 @@ public final class MessagingService impl
     {
         return x >>> (p + 1) - n & ~(-1 << n);
     }
-        
-    public ByteBuffer packIt(byte[] bytes, boolean compress, int version)
-    {
-        /*
-             Setting up the protocol header. This is 4 bytes long
-             represented as an integer. The first 2 bits indicate
-             the serializer type. The 3rd bit indicates if compression
-             is turned on or off. It is turned off by default. The 4th
-             bit indicates if we are in streaming mode. It is turned off
-             by default. The 5th-8th bits are reserved for future use.
-             The next 8 bits indicate a version number. Remaining 15 bits
-             are not used currently.
-        */
-        int header = 0;
-        // Setting up the serializer bit
-        header |= serializerType_.ordinal();
-        // set compression bit.
-        if (compress)
-            header |= 4;
-        // Setting up the version bit
-        header |= (version << 8);
-
-        ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + 4 + bytes.length);
-        buffer.putInt(PROTOCOL_MAGIC);
-        buffer.putInt(header);
-        buffer.putInt(bytes.length);
-        buffer.put(bytes);
-        buffer.flip();
-        return buffer;
-    }
 
     public ByteBuffer constructStreamHeader(StreamHeader streamHeader, boolean 
compress, int version)
     {

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java?rev=1153678&r1=1153677&r2=1153678&view=diff
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java 
(original)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java 
Wed Aug  3 21:40:34 2011
@@ -26,30 +26,36 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.Socket;
-import java.nio.ByteBuffer;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 
-import org.apache.cassandra.gms.Gossiper;
-import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.commons.lang.ArrayUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.EncryptionOptions;
+import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.security.SSLFactory;
+import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Pair;
 
 public class OutboundTcpConnection extends Thread
 {
     private static final Logger logger = 
LoggerFactory.getLogger(OutboundTcpConnection.class);
 
-    public static final ByteBuffer CLOSE_SENTINEL = ByteBuffer.allocate(0);
+    private static final Message CLOSE_SENTINEL = new 
Message(FBUtilities.getLocalAddress(),
+                                                              
StorageService.Verb.INTERNAL_RESPONSE,
+                                                              
ArrayUtils.EMPTY_BYTE_ARRAY,
+                                                              
MessagingService.version_);
+
     private static final int OPEN_RETRY_DELAY = 100; // ms between retries
 
     private InetAddress endpoint;
-    private final BlockingQueue<ByteBuffer> queue = new 
LinkedBlockingQueue<ByteBuffer>();
-    private DataOutputStream output;
+    private final BlockingQueue<Pair<Message, String>> queue = new 
LinkedBlockingQueue<Pair<Message, String>>();
+    private DataOutputStream out;
+
     private Socket socket;
     private long completedCount;
 
@@ -64,11 +70,11 @@ public class OutboundTcpConnection exten
         this.endpoint = remoteEndPoint;
     }
 
-    public void write(ByteBuffer buffer)
+    public void enqueue(Message message, String id)
     {
         try
         {
-            queue.put(buffer);
+            queue.put(Pair.create(message, id));
         }
         catch (InterruptedException e)
         {
@@ -79,21 +85,23 @@ public class OutboundTcpConnection exten
     void closeSocket()
     {
         queue.clear();
-        write(CLOSE_SENTINEL);
+        enqueue(CLOSE_SENTINEL, null);
     }
 
     public void run()
     {
         while (true)
         {
-            ByteBuffer bb = take();
-            if (bb == CLOSE_SENTINEL)
+            Pair<Message, String> pair = take();
+            Message m = pair.left;
+            String id = pair.right;
+            if (m == CLOSE_SENTINEL)
             {
                 disconnect();
                 continue;
             }
             if (socket != null || connect())
-                writeConnected(bb);
+                writeConnected(m, id);
             else
                 // clear out the queue, else gossip messages back up.
                 queue.clear();            
@@ -110,14 +118,14 @@ public class OutboundTcpConnection exten
         return completedCount;
     }
 
-    private void writeConnected(ByteBuffer bb)
+    private void writeConnected(Message message, String id)
     {
         try
         {
-            ByteBufferUtil.write(bb, output);
+            write(message, id, out);
             if (queue.peek() == null)
             {
-                output.flush();
+                out.flush();
             }
         }
         catch (IOException e)
@@ -128,6 +136,51 @@ public class OutboundTcpConnection exten
         }
     }
 
+    static void write(Message message, String id, DataOutputStream out)
+    {
+        /*
+         Setting up the protocol header. This is 4 bytes long
+         represented as an integer. The first 2 bits indicate
+         the serializer type. The 3rd bit indicates if compression
+         is turned on or off. It is turned off by default. The 4th
+         bit indicates if we are in streaming mode. It is turned off
+         by default. The 5th-8th bits are reserved for future use.
+         The next 8 bits indicate a version number. Remaining 15 bits
+         are not used currently.
+        */
+        int header = 0;
+        // Setting up the serializer bit
+        header |= MessagingService.serializerType_.ordinal();
+        // set compression bit.
+        if (false)
+            header |= 4;
+        // Setting up the version bit
+        header |= (MessagingService.version_ << 8);
+
+        try
+        {
+            out.writeInt(MessagingService.PROTOCOL_MAGIC);
+            out.writeInt(header);
+            // compute total Message length for compatibility w/ 0.8 and 
earlier
+            byte[] bytes = message.getMessageBody();
+            int total = messageLength(message.header_, id, bytes);
+            out.writeInt(total);
+            out.writeUTF(id);
+            Header.serializer().serialize(message.header_, out, 
message.getVersion());
+            out.writeInt(bytes.length);
+            out.write(bytes);
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public static int messageLength(Header header, String id, byte[] bytes)
+    {
+        return 2 + FBUtilities.encodedUTF8Length(id) + header.serializedSize() 
+ 4 + bytes.length;
+    }
+
     private void disconnect()
     {
         if (socket != null)
@@ -141,7 +194,7 @@ public class OutboundTcpConnection exten
                 if (logger.isDebugEnabled())
                     logger.debug("exception closing connection to " + 
endpoint, e);
             }
-            output = null;
+            out = null;
             socket = null;
         }
 
@@ -149,19 +202,19 @@ public class OutboundTcpConnection exten
         Gossiper.instance.resetVersion(endpoint);
     }
 
-    private ByteBuffer take()
+    private Pair<Message, String> take()
     {
-        ByteBuffer bb;
+        Pair<Message, String> pair;
         try
         {
-            bb = queue.take();
+            pair = queue.take();
             completedCount++;
         }
         catch (InterruptedException e)
         {
             throw new AssertionError(e);
         }
-        return bb;
+        return pair;
     }
 
     private boolean connect()
@@ -185,7 +238,7 @@ public class OutboundTcpConnection exten
 
                 socket.setKeepAlive(true);
                 socket.setTcpNoDelay(true);
-                output = new DataOutputStream(new 
BufferedOutputStream(socket.getOutputStream(), 4096));
+                out = new DataOutputStream(new 
BufferedOutputStream(socket.getOutputStream(), 4096));
                 return true;
             }
             catch (IOException e)

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java?rev=1153678&r1=1153677&r2=1153678&view=diff
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java
 (original)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java
 Wed Aug  3 21:40:34 2011
@@ -56,8 +56,8 @@ public class OutboundTcpConnectionPool
     public void reset(InetAddress remoteEP)
     {
         ackCon.setEndPoint(remoteEP);
-        ackCon.write(OutboundTcpConnection.CLOSE_SENTINEL);
+        ackCon.closeSocket();
         cmdCon.setEndPoint(remoteEP);
-        cmdCon.write(OutboundTcpConnection.CLOSE_SENTINEL);
+        cmdCon.closeSocket();
     }
 }

Modified: 
cassandra/trunk/test/unit/org/apache/cassandra/db/SerializationsTest.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/SerializationsTest.java?rev=1153678&r1=1153677&r2=1153678&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/db/SerializationsTest.java 
(original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/db/SerializationsTest.java 
Wed Aug  3 21:40:34 2011
@@ -29,6 +29,7 @@ import org.apache.cassandra.dht.Abstract
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessageSerializer;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.thrift.SlicePredicate;
@@ -48,6 +49,8 @@ import java.util.HashMap;
 
 public class SerializationsTest extends AbstractSerializationsTester
 {
+    private static MessageSerializer messageSerializer = new 
MessageSerializer();
+
     private void testRangeSliceCommandWrite() throws IOException
     {
         ByteBuffer startCol = ByteBufferUtil.bytes("Start");
@@ -73,12 +76,12 @@ public class SerializationsTest extends 
         
         DataOutputStream dout = getOutput("db.RangeSliceCommand.bin");
         
-        Message.serializer().serialize(namesCmd, dout, getVersion());
-        Message.serializer().serialize(emptyRangeCmd, dout, getVersion());
-        Message.serializer().serialize(regRangeCmd, dout, getVersion());
-        Message.serializer().serialize(namesCmdSup, dout, getVersion());
-        Message.serializer().serialize(emptyRangeCmdSup, dout, getVersion());
-        Message.serializer().serialize(regRangeCmdSup, dout, getVersion());
+        messageSerializer.serialize(namesCmd, dout, getVersion());
+        messageSerializer.serialize(emptyRangeCmd, dout, getVersion());
+        messageSerializer.serialize(regRangeCmd, dout, getVersion());
+        messageSerializer.serialize(namesCmdSup, dout, getVersion());
+        messageSerializer.serialize(emptyRangeCmdSup, dout, getVersion());
+        messageSerializer.serialize(regRangeCmdSup, dout, getVersion());
         dout.close();
     }
     
@@ -91,7 +94,7 @@ public class SerializationsTest extends 
         DataInputStream in = getInput("db.RangeSliceCommand.bin");
         for (int i = 0; i < 6; i++)
         {
-            Message msg = Message.serializer().deserialize(in, getVersion());
+            Message msg = messageSerializer.deserialize(in, getVersion());
             RangeSliceCommand cmd = RangeSliceCommand.read(msg);
         }
         in.close();
@@ -107,8 +110,8 @@ public class SerializationsTest extends 
         SliceByNamesReadCommand.serializer().serialize(superCmd, out, 
getVersion());
         ReadCommand.serializer().serialize(standardCmd, out, getVersion());
         ReadCommand.serializer().serialize(superCmd, out, getVersion());
-        Message.serializer().serialize(standardCmd.getMessage(getVersion()), 
out, getVersion());
-        Message.serializer().serialize(superCmd.getMessage(getVersion()), out, 
getVersion());
+        messageSerializer.serialize(standardCmd.getMessage(getVersion()), out, 
getVersion());
+        messageSerializer.serialize(superCmd.getMessage(getVersion()), out, 
getVersion());
         out.close();
     }
     
@@ -123,8 +126,8 @@ public class SerializationsTest extends 
         assert SliceByNamesReadCommand.serializer().deserialize(in, 
getVersion()) != null;
         assert ReadCommand.serializer().deserialize(in, getVersion()) != null;
         assert ReadCommand.serializer().deserialize(in, getVersion()) != null;
-        assert Message.serializer().deserialize(in, getVersion()) != null;
-        assert Message.serializer().deserialize(in, getVersion()) != null;
+        assert messageSerializer.deserialize(in, getVersion()) != null;
+        assert messageSerializer.deserialize(in, getVersion()) != null;
         in.close();
     }
     
@@ -137,8 +140,8 @@ public class SerializationsTest extends 
         SliceFromReadCommand.serializer().serialize(superCmd, out, 
getVersion());
         ReadCommand.serializer().serialize(standardCmd, out, getVersion());
         ReadCommand.serializer().serialize(superCmd, out, getVersion());
-        Message.serializer().serialize(standardCmd.getMessage(getVersion()), 
out, getVersion());
-        Message.serializer().serialize(superCmd.getMessage(getVersion()), out, 
getVersion());
+        messageSerializer.serialize(standardCmd.getMessage(getVersion()), out, 
getVersion());
+        messageSerializer.serialize(superCmd.getMessage(getVersion()), out, 
getVersion());
         out.close();
     }
     
@@ -153,8 +156,8 @@ public class SerializationsTest extends 
         assert SliceFromReadCommand.serializer().deserialize(in, getVersion()) 
!= null;
         assert ReadCommand.serializer().deserialize(in, getVersion()) != null;
         assert ReadCommand.serializer().deserialize(in, getVersion()) != null;
-        assert Message.serializer().deserialize(in, getVersion()) != null;
-        assert Message.serializer().deserialize(in, getVersion()) != null;
+        assert messageSerializer.deserialize(in, getVersion()) != null;
+        assert messageSerializer.deserialize(in, getVersion()) != null;
         in.close();
     }
     
@@ -201,12 +204,12 @@ public class SerializationsTest extends 
         RowMutation.serializer().serialize(standardRm, out, getVersion());
         RowMutation.serializer().serialize(superRm, out, getVersion());
         RowMutation.serializer().serialize(mixedRm, out, getVersion());
-        Message.serializer().serialize(emptyRm.getMessage(getVersion()), out, 
getVersion());
-        Message.serializer().serialize(standardRowRm.getMessage(getVersion()), 
out, getVersion());
-        Message.serializer().serialize(superRowRm.getMessage(getVersion()), 
out, getVersion());
-        Message.serializer().serialize(standardRm.getMessage(getVersion()), 
out, getVersion());
-        Message.serializer().serialize(superRm.getMessage(getVersion()), out, 
getVersion());
-        Message.serializer().serialize(mixedRm.getMessage(getVersion()), out, 
getVersion());
+        messageSerializer.serialize(emptyRm.getMessage(getVersion()), out, 
getVersion());
+        messageSerializer.serialize(standardRowRm.getMessage(getVersion()), 
out, getVersion());
+        messageSerializer.serialize(superRowRm.getMessage(getVersion()), out, 
getVersion());
+        messageSerializer.serialize(standardRm.getMessage(getVersion()), out, 
getVersion());
+        messageSerializer.serialize(superRm.getMessage(getVersion()), out, 
getVersion());
+        messageSerializer.serialize(mixedRm.getMessage(getVersion()), out, 
getVersion());
         out.close(); 
     }
     
@@ -223,12 +226,12 @@ public class SerializationsTest extends 
         assert RowMutation.serializer().deserialize(in, getVersion()) != null;
         assert RowMutation.serializer().deserialize(in, getVersion()) != null;
         assert RowMutation.serializer().deserialize(in, getVersion()) != null;
-        assert Message.serializer().deserialize(in, getVersion()) != null;
-        assert Message.serializer().deserialize(in, getVersion()) != null;
-        assert Message.serializer().deserialize(in, getVersion()) != null;
-        assert Message.serializer().deserialize(in, getVersion()) != null;
-        assert Message.serializer().deserialize(in, getVersion()) != null;
-        assert Message.serializer().deserialize(in, getVersion()) != null;
+        assert messageSerializer.deserialize(in, getVersion()) != null;
+        assert messageSerializer.deserialize(in, getVersion()) != null;
+        assert messageSerializer.deserialize(in, getVersion()) != null;
+        assert messageSerializer.deserialize(in, getVersion()) != null;
+        assert messageSerializer.deserialize(in, getVersion()) != null;
+        assert messageSerializer.deserialize(in, getVersion()) != null;
         in.close();
     }
     
@@ -241,9 +244,9 @@ public class SerializationsTest extends 
         Truncation.serializer().serialize(tr, out, getVersion());
         TruncateResponse.serializer().serialize(aff, out, getVersion());
         TruncateResponse.serializer().serialize(neg, out, getVersion());
-        Message.serializer().serialize(tr.getMessage(getVersion()), out, 
getVersion());
-        
Message.serializer().serialize(TruncateResponse.makeTruncateResponseMessage(tr.getMessage(getVersion()),
 aff), out, getVersion());
-        
Message.serializer().serialize(TruncateResponse.makeTruncateResponseMessage(tr.getMessage(getVersion()),
 neg), out, getVersion());
+        messageSerializer.serialize(tr.getMessage(getVersion()), out, 
getVersion());
+        
messageSerializer.serialize(TruncateResponse.makeTruncateResponseMessage(tr.getMessage(getVersion()),
 aff), out, getVersion());
+        
messageSerializer.serialize(TruncateResponse.makeTruncateResponseMessage(tr.getMessage(getVersion()),
 neg), out, getVersion());
         // todo: notice how CF names weren't validated.
         out.close();
     }
@@ -258,9 +261,9 @@ public class SerializationsTest extends 
         assert Truncation.serializer().deserialize(in, getVersion()) != null;
         assert TruncateResponse.serializer().deserialize(in, getVersion()) != 
null;
         assert TruncateResponse.serializer().deserialize(in, getVersion()) != 
null;
-        assert Message.serializer().deserialize(in, getVersion()) != null;
-        assert Message.serializer().deserialize(in, getVersion()) != null;
-        assert Message.serializer().deserialize(in, getVersion()) != null;
+        assert messageSerializer.deserialize(in, getVersion()) != null;
+        assert messageSerializer.deserialize(in, getVersion()) != null;
+        assert messageSerializer.deserialize(in, getVersion()) != null;
         in.close();
     }
     

Added: cassandra/trunk/test/unit/org/apache/cassandra/net/MessageSerializer.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/net/MessageSerializer.java?rev=1153678&view=auto
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/net/MessageSerializer.java 
(added)
+++ cassandra/trunk/test/unit/org/apache/cassandra/net/MessageSerializer.java 
Wed Aug  3 21:40:34 2011
@@ -0,0 +1,28 @@
+package org.apache.cassandra.net;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.cassandra.io.ICompactSerializer;
+
+public class MessageSerializer implements ICompactSerializer<Message>
+{
+    public void serialize(Message t, DataOutputStream dos, int version) throws 
IOException
+    {
+        assert t.getVersion() == version : "internode protocol version 
mismatch"; // indicates programmer error.
+        Header.serializer().serialize( t.header_, dos, version);
+        byte[] bytes = t.getMessageBody();
+        dos.writeInt(bytes.length);
+        dos.write(bytes);
+    }
+
+    public Message deserialize(DataInputStream dis, int version) throws 
IOException
+    {
+        Header header = Header.serializer().deserialize(dis, version);
+        int size = dis.readInt();
+        byte[] bytes = new byte[size];
+        dis.readFully(bytes);
+        return new Message(header, bytes, version);
+    }
+}

Propchange: 
cassandra/trunk/test/unit/org/apache/cassandra/net/MessageSerializer.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: 
cassandra/trunk/test/unit/org/apache/cassandra/service/SerializationsTest.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/service/SerializationsTest.java?rev=1153678&r1=1153677&r2=1153678&view=diff
==============================================================================
--- 
cassandra/trunk/test/unit/org/apache/cassandra/service/SerializationsTest.java 
(original)
+++ 
cassandra/trunk/test/unit/org/apache/cassandra/service/SerializationsTest.java 
Wed Aug  3 21:40:34 2011
@@ -21,32 +21,34 @@ package org.apache.cassandra.service;
  */
 
 
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.Test;
+
 import org.apache.cassandra.AbstractSerializationsTester;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.RandomPartitioner;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.net.Message;
-import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.MessageSerializer;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.MerkleTree;
-import org.junit.Test;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
 
 public class SerializationsTest extends AbstractSerializationsTester
 {
+    private static MessageSerializer messageSerializer = new 
MessageSerializer();
+
     public static Range FULL_RANGE = new 
Range(StorageService.getPartitioner().getMinimumToken(), 
StorageService.getPartitioner().getMinimumToken());
 
     private void testTreeRequestWrite() throws IOException
     {
         DataOutputStream out = getOutput("service.TreeRequest.bin");
         
AntiEntropyService.TreeRequestVerbHandler.SERIALIZER.serialize(Statics.req, 
out, getVersion());
-        
Message.serializer().serialize(AntiEntropyService.TreeRequestVerbHandler.makeVerb(Statics.req,
 getVersion()), out, getVersion());
+        
messageSerializer.serialize(AntiEntropyService.TreeRequestVerbHandler.makeVerb(Statics.req,
 getVersion()), out, getVersion());
         out.close();
     }
     
@@ -58,7 +60,7 @@ public class SerializationsTest extends 
         
         DataInputStream in = getInput("service.TreeRequest.bin");
         assert 
AntiEntropyService.TreeRequestVerbHandler.SERIALIZER.deserialize(in, 
getVersion()) != null;
-        assert Message.serializer().deserialize(in, getVersion()) != null;
+        assert messageSerializer.deserialize(in, getVersion()) != null;
         in.close();
     }
     
@@ -78,8 +80,8 @@ public class SerializationsTest extends 
         DataOutputStream out = getOutput("service.TreeResponse.bin");
         AntiEntropyService.TreeResponseVerbHandler.SERIALIZER.serialize(v0, 
out, getVersion());
         AntiEntropyService.TreeResponseVerbHandler.SERIALIZER.serialize(v1, 
out, getVersion());
-        
Message.serializer().serialize(AntiEntropyService.TreeResponseVerbHandler.makeVerb(FBUtilities.getBroadcastAddress(),
 v0), out, getVersion());
-        
Message.serializer().serialize(AntiEntropyService.TreeResponseVerbHandler.makeVerb(FBUtilities.getBroadcastAddress(),
 v1), out, getVersion());
+        
messageSerializer.serialize(AntiEntropyService.TreeResponseVerbHandler.makeVerb(FBUtilities.getBroadcastAddress(),
 v0), out, getVersion());
+        
messageSerializer.serialize(AntiEntropyService.TreeResponseVerbHandler.makeVerb(FBUtilities.getBroadcastAddress(),
 v1), out, getVersion());
         out.close();
     }
     
@@ -92,8 +94,8 @@ public class SerializationsTest extends 
         DataInputStream in = getInput("service.TreeResponse.bin");
         assert 
AntiEntropyService.TreeResponseVerbHandler.SERIALIZER.deserialize(in, 
getVersion()) != null;
         assert 
AntiEntropyService.TreeResponseVerbHandler.SERIALIZER.deserialize(in, 
getVersion()) != null;
-        assert Message.serializer().deserialize(in, getVersion()) != null;
-        assert Message.serializer().deserialize(in, getVersion()) != null;
+        assert messageSerializer.deserialize(in, getVersion()) != null;
+        assert messageSerializer.deserialize(in, getVersion()) != null;
         in.close();
     }
     

Modified: 
cassandra/trunk/test/unit/org/apache/cassandra/streaming/SerializationsTest.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/streaming/SerializationsTest.java?rev=1153678&r1=1153677&r2=1153678&view=diff
==============================================================================
--- 
cassandra/trunk/test/unit/org/apache/cassandra/streaming/SerializationsTest.java
 (original)
+++ 
cassandra/trunk/test/unit/org/apache/cassandra/streaming/SerializationsTest.java
 Wed Aug  3 21:40:34 2011
@@ -32,6 +32,7 @@ import org.apache.cassandra.io.sstable.D
 import org.apache.cassandra.io.sstable.SSTable;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessageSerializer;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
@@ -47,6 +48,8 @@ import java.util.*;
 
 public class SerializationsTest extends AbstractSerializationsTester
 {
+    private static MessageSerializer messageSerializer = new 
MessageSerializer();
+
     private void testPendingFileWrite() throws IOException
     {
         // make sure to test serializing null and a pf with no sstable.
@@ -116,7 +119,7 @@ public class SerializationsTest extends 
         StreamReply rep = new StreamReply("this is a file", 123L, 
StreamReply.Status.FILE_FINISHED);
         DataOutputStream out = getOutput("streaming.StreamReply.bin");
         StreamReply.serializer.serialize(rep, out, getVersion());
-        Message.serializer().serialize(rep.getMessage(getVersion()), out, 
getVersion());
+        messageSerializer.serialize(rep.getMessage(getVersion()), out, 
getVersion());
         out.close();
     }
     
@@ -128,7 +131,7 @@ public class SerializationsTest extends 
         
         DataInputStream in = getInput("streaming.StreamReply.bin");
         assert StreamReply.serializer.deserialize(in, getVersion()) != null;
-        assert Message.serializer().deserialize(in, getVersion()) != null;
+        assert messageSerializer.deserialize(in, getVersion()) != null;
         in.close();
     }
     
@@ -155,9 +158,9 @@ public class SerializationsTest extends 
         StreamRequestMessage.serializer().serialize(msg0, out, getVersion());
         StreamRequestMessage.serializer().serialize(msg1, out, getVersion());
         StreamRequestMessage.serializer().serialize(msg2, out, getVersion());
-        Message.serializer().serialize(msg0.getMessage(getVersion()), out, 
getVersion());
-        Message.serializer().serialize(msg1.getMessage(getVersion()), out, 
getVersion());
-        Message.serializer().serialize(msg2.getMessage(getVersion()), out, 
getVersion());
+        messageSerializer.serialize(msg0.getMessage(getVersion()), out, 
getVersion());
+        messageSerializer.serialize(msg1.getMessage(getVersion()), out, 
getVersion());
+        messageSerializer.serialize(msg2.getMessage(getVersion()), out, 
getVersion());
         out.close();
     }
     
@@ -171,9 +174,9 @@ public class SerializationsTest extends 
         assert StreamRequestMessage.serializer().deserialize(in, getVersion()) 
!= null;
         assert StreamRequestMessage.serializer().deserialize(in, getVersion()) 
!= null;
         assert StreamRequestMessage.serializer().deserialize(in, getVersion()) 
!= null;
-        assert Message.serializer().deserialize(in, getVersion()) != null;
-        assert Message.serializer().deserialize(in, getVersion()) != null;
-        assert Message.serializer().deserialize(in, getVersion()) != null;
+        assert messageSerializer.deserialize(in, getVersion()) != null;
+        assert messageSerializer.deserialize(in, getVersion()) != null;
+        assert messageSerializer.deserialize(in, getVersion()) != null;
         in.close();
     }
     


Reply via email to