Author: brandonwilliams Date: Wed Aug 3 21:40:34 2011 New Revision: 1153678
URL: http://svn.apache.org/viewvc?rev=1153678&view=rev Log: Reduce copies on read/write paths. Patch by jbellis reviewed by brandonwilliams for CASSANDRA-1788 Added: cassandra/trunk/test/unit/org/apache/cassandra/net/ cassandra/trunk/test/unit/org/apache/cassandra/net/MessageSerializer.java (with props) Modified: cassandra/trunk/src/java/org/apache/cassandra/net/CompactEndpointSerializationHelper.java cassandra/trunk/src/java/org/apache/cassandra/net/Header.java cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java cassandra/trunk/src/java/org/apache/cassandra/net/Message.java cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java cassandra/trunk/test/unit/org/apache/cassandra/db/SerializationsTest.java cassandra/trunk/test/unit/org/apache/cassandra/service/SerializationsTest.java cassandra/trunk/test/unit/org/apache/cassandra/streaming/SerializationsTest.java Modified: cassandra/trunk/src/java/org/apache/cassandra/net/CompactEndpointSerializationHelper.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/CompactEndpointSerializationHelper.java?rev=1153678&r1=1153677&r2=1153678&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/net/CompactEndpointSerializationHelper.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/net/CompactEndpointSerializationHelper.java Wed Aug 3 21:40:34 2011 @@ -21,6 +21,8 @@ package org.apache.cassandra.net; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; +import java.net.Inet4Address; +import java.net.Inet6Address; import java.net.InetAddress; public class CompactEndpointSerializationHelper @@ -38,4 +40,12 @@ public class CompactEndpointSerializatio dis.readFully(bytes, 0, bytes.length); return InetAddress.getByAddress(bytes); } + + public static int serializedSize(InetAddress from) + { + if (from instanceof Inet4Address) + return 1 + 4; + assert from instanceof Inet6Address; + return 1 + 16; + } } Modified: cassandra/trunk/src/java/org/apache/cassandra/net/Header.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/Header.java?rev=1153678&r1=1153677&r2=1153678&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/net/Header.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/net/Header.java Wed Aug 3 21:40:34 2011 @@ -29,6 +29,7 @@ import java.util.concurrent.atomic.Atomi import org.apache.cassandra.io.ICompactSerializer; import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.utils.FBUtilities; public class Header { @@ -88,6 +89,21 @@ public class Header { details_.remove(key); } + + public int serializedSize() + { + int size = 0; + size += CompactEndpointSerializationHelper.serializedSize(getFrom()); + size += 4; + size += 4; + for (String key : details_.keySet()) + { + size += 2 + FBUtilities.encodedUTF8Length(key); + byte[] value = details_.get(key); + size += 4 + value.length; + } + return size; + } } class HeaderSerializer implements ICompactSerializer<Header> @@ -96,13 +112,8 @@ class HeaderSerializer implements ICompa { CompactEndpointSerializationHelper.serialize(t.getFrom(), dos); dos.writeInt(t.getVerb().ordinal()); - - /* Serialize the message header */ - int size = t.details_.size(); - dos.writeInt(size); - Set<String> keys = t.details_.keySet(); - - for( String key : keys ) + dos.writeInt(t.details_.size()); + for (String key : t.details_.keySet()) { dos.writeUTF(key); byte[] value = t.details_.get(key); @@ -115,8 +126,6 @@ class HeaderSerializer implements ICompa { InetAddress from = CompactEndpointSerializationHelper.deserialize(dis); int verbOrdinal = dis.readInt(); - - /* Deserializing the message header */ int size = dis.readInt(); Map<String, byte[]> details = new Hashtable<String, byte[]>(size); for ( int i = 0; i < size; ++i ) @@ -127,7 +136,6 @@ class HeaderSerializer implements ICompa dis.readFully(bytes); details.put(key, bytes); } - return new Header(from, StorageService.VERBS[verbOrdinal], details); } } Modified: cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java?rev=1153678&r1=1153677&r2=1153678&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java Wed Aug 3 21:40:34 2011 @@ -30,6 +30,7 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.streaming.IncomingStreamReader; import org.apache.cassandra.streaming.StreamHeader; +import org.apache.cassandra.utils.FBUtilities; public class IncomingTcpConnection extends Thread { @@ -125,22 +126,28 @@ public class IncomingTcpConnection exten private Message receiveMessage(DataInputStream input, int version) throws IOException { - int size = input.readInt(); - byte[] contentBytes = new byte[size]; + int totalSize = input.readInt(); + String id = input.readUTF(); + Header header = Header.serializer().deserialize(input, version); + + int bodySize = input.readInt(); + byte[] body = new byte[bodySize]; // readFully allocates a direct buffer the size of the chunk it is asked to read, - // so we cap that at CHUNK_SIZE. See https://issues.apache.org/jira/browse/CASSANDRA-2654 - int remainder = size % CHUNK_SIZE; - for (int offset = 0; offset < size - remainder; offset += CHUNK_SIZE) - input.readFully(contentBytes, offset, CHUNK_SIZE); - input.readFully(contentBytes, size - remainder, remainder); + // so we cap that at CHUNK_SIZE. See https://issues.apache.org/jira/browse/CASSANDRA-2654 + int remainder = bodySize % CHUNK_SIZE; + for (int offset = 0; offset < bodySize - remainder; offset += CHUNK_SIZE) + input.readFully(body, offset, CHUNK_SIZE); + input.readFully(body, bodySize - remainder, remainder); + // earlier versions would send unnecessary bytes left over at the end of a buffer, too + int remaining = totalSize - OutboundTcpConnection.messageLength(header, id, body); + if (remaining > 0) + input.skip(remaining); // for non-streaming connections, continue to read the messages (and ignore them) until sender // starts sending correct-version messages (which it can do without reconnecting -- version is per-Message) if (version <= MessagingService.version_) { - DataInputStream dis = new DataInputStream(new ByteArrayInputStream(contentBytes)); - String id = dis.readUTF(); - Message message = Message.serializer().deserialize(dis, version); + Message message = new Message(header, body, version); MessagingService.instance().receive(message, id); return message; } Modified: cassandra/trunk/src/java/org/apache/cassandra/net/Message.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/Message.java?rev=1153678&r1=1153677&r2=1153678&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/net/Message.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/net/Message.java Wed Aug 3 21:40:34 2011 @@ -30,23 +30,11 @@ import org.apache.cassandra.utils.FBUtil public class Message { - private static ICompactSerializer<Message> serializer_; - - static - { - serializer_ = new MessageSerializer(); - } - - public static ICompactSerializer<Message> serializer() - { - return serializer_; - } - final Header header_; private final byte[] body_; private final transient int version; - private Message(Header header, byte[] body, int version) + Message(Header header, byte[] body, int version) { assert header != null; assert body != null; @@ -127,25 +115,4 @@ public class Message .append(separator); return sbuf.toString(); } - - private static class MessageSerializer implements ICompactSerializer<Message> - { - public void serialize(Message t, DataOutputStream dos, int version) throws IOException - { - assert t.getVersion() == version : "internode protocol version mismatch"; // indicates programmer error. - Header.serializer().serialize( t.header_, dos, version); - byte[] bytes = t.getMessageBody(); - dos.writeInt(bytes.length); - dos.write(bytes); - } - - public Message deserialize(DataInputStream dis, int version) throws IOException - { - Header header = Header.serializer().deserialize(dis, version); - int size = dis.readInt(); - byte[] bytes = new byte[size]; - dis.readFully(bytes); - return new Message(header, bytes, version); - } - } } Modified: cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java?rev=1153678&r1=1153677&r2=1153678&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java Wed Aug 3 21:40:34 2011 @@ -63,11 +63,10 @@ public final class MessagingService impl public static final int VERSION_080 = 2; public static final int version_ = 3; // 8 bits, so don't waste versions - //TODO: make this parameter dynamic somehow. Not sure if config is appropriate. - private SerializerType serializerType_ = SerializerType.BINARY; + static SerializerType serializerType_ = SerializerType.BINARY; /** we preface every message with this number so the recipient can validate the sender is sane */ - private static final int PROTOCOL_MAGIC = 0xCA552DFA; + static final int PROTOCOL_MAGIC = 0xCA552DFA; /* This records all the results mapped by message Id */ private final ExpiringMap<String, Pair<InetAddress, IMessageCallback>> callbacks; @@ -380,26 +379,10 @@ public final class MessagingService impl } // get pooled connection (really, connection queue) - OutboundTcpConnection connection = getConnection(to, message); - - // pack message with header in a bytebuffer - byte[] data; - try - { - DataOutputBuffer buffer = new DataOutputBuffer(); - buffer.writeUTF(id); - Message.serializer().serialize(message, buffer, message.getVersion()); - data = buffer.getData(); - } - catch (IOException e) - { - throw new RuntimeException(e); - } - assert data.length > 0; - ByteBuffer buffer = packIt(data , false, message.getVersion()); + OutboundTcpConnection connection = getConnection(to, processedMessage); // write it - connection.write(buffer); + connection.enqueue(processedMessage, id); } public IAsyncResult sendRR(Message message, InetAddress to) @@ -492,36 +475,6 @@ public final class MessagingService impl { return x >>> (p + 1) - n & ~(-1 << n); } - - public ByteBuffer packIt(byte[] bytes, boolean compress, int version) - { - /* - Setting up the protocol header. This is 4 bytes long - represented as an integer. The first 2 bits indicate - the serializer type. The 3rd bit indicates if compression - is turned on or off. It is turned off by default. The 4th - bit indicates if we are in streaming mode. It is turned off - by default. The 5th-8th bits are reserved for future use. - The next 8 bits indicate a version number. Remaining 15 bits - are not used currently. - */ - int header = 0; - // Setting up the serializer bit - header |= serializerType_.ordinal(); - // set compression bit. - if (compress) - header |= 4; - // Setting up the version bit - header |= (version << 8); - - ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + 4 + bytes.length); - buffer.putInt(PROTOCOL_MAGIC); - buffer.putInt(header); - buffer.putInt(bytes.length); - buffer.put(bytes); - buffer.flip(); - return buffer; - } public ByteBuffer constructStreamHeader(StreamHeader streamHeader, boolean compress, int version) { Modified: cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java?rev=1153678&r1=1153677&r2=1153678&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java Wed Aug 3 21:40:34 2011 @@ -26,30 +26,36 @@ import java.io.DataOutputStream; import java.io.IOException; import java.net.InetAddress; import java.net.Socket; -import java.nio.ByteBuffer; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; -import org.apache.cassandra.gms.Gossiper; -import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.commons.lang.ArrayUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.config.EncryptionOptions; +import org.apache.cassandra.gms.Gossiper; import org.apache.cassandra.security.SSLFactory; +import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.Pair; public class OutboundTcpConnection extends Thread { private static final Logger logger = LoggerFactory.getLogger(OutboundTcpConnection.class); - public static final ByteBuffer CLOSE_SENTINEL = ByteBuffer.allocate(0); + private static final Message CLOSE_SENTINEL = new Message(FBUtilities.getLocalAddress(), + StorageService.Verb.INTERNAL_RESPONSE, + ArrayUtils.EMPTY_BYTE_ARRAY, + MessagingService.version_); + private static final int OPEN_RETRY_DELAY = 100; // ms between retries private InetAddress endpoint; - private final BlockingQueue<ByteBuffer> queue = new LinkedBlockingQueue<ByteBuffer>(); - private DataOutputStream output; + private final BlockingQueue<Pair<Message, String>> queue = new LinkedBlockingQueue<Pair<Message, String>>(); + private DataOutputStream out; + private Socket socket; private long completedCount; @@ -64,11 +70,11 @@ public class OutboundTcpConnection exten this.endpoint = remoteEndPoint; } - public void write(ByteBuffer buffer) + public void enqueue(Message message, String id) { try { - queue.put(buffer); + queue.put(Pair.create(message, id)); } catch (InterruptedException e) { @@ -79,21 +85,23 @@ public class OutboundTcpConnection exten void closeSocket() { queue.clear(); - write(CLOSE_SENTINEL); + enqueue(CLOSE_SENTINEL, null); } public void run() { while (true) { - ByteBuffer bb = take(); - if (bb == CLOSE_SENTINEL) + Pair<Message, String> pair = take(); + Message m = pair.left; + String id = pair.right; + if (m == CLOSE_SENTINEL) { disconnect(); continue; } if (socket != null || connect()) - writeConnected(bb); + writeConnected(m, id); else // clear out the queue, else gossip messages back up. queue.clear(); @@ -110,14 +118,14 @@ public class OutboundTcpConnection exten return completedCount; } - private void writeConnected(ByteBuffer bb) + private void writeConnected(Message message, String id) { try { - ByteBufferUtil.write(bb, output); + write(message, id, out); if (queue.peek() == null) { - output.flush(); + out.flush(); } } catch (IOException e) @@ -128,6 +136,51 @@ public class OutboundTcpConnection exten } } + static void write(Message message, String id, DataOutputStream out) + { + /* + Setting up the protocol header. This is 4 bytes long + represented as an integer. The first 2 bits indicate + the serializer type. The 3rd bit indicates if compression + is turned on or off. It is turned off by default. The 4th + bit indicates if we are in streaming mode. It is turned off + by default. The 5th-8th bits are reserved for future use. + The next 8 bits indicate a version number. Remaining 15 bits + are not used currently. + */ + int header = 0; + // Setting up the serializer bit + header |= MessagingService.serializerType_.ordinal(); + // set compression bit. + if (false) + header |= 4; + // Setting up the version bit + header |= (MessagingService.version_ << 8); + + try + { + out.writeInt(MessagingService.PROTOCOL_MAGIC); + out.writeInt(header); + // compute total Message length for compatibility w/ 0.8 and earlier + byte[] bytes = message.getMessageBody(); + int total = messageLength(message.header_, id, bytes); + out.writeInt(total); + out.writeUTF(id); + Header.serializer().serialize(message.header_, out, message.getVersion()); + out.writeInt(bytes.length); + out.write(bytes); + } + catch (IOException e) + { + throw new RuntimeException(e); + } + } + + public static int messageLength(Header header, String id, byte[] bytes) + { + return 2 + FBUtilities.encodedUTF8Length(id) + header.serializedSize() + 4 + bytes.length; + } + private void disconnect() { if (socket != null) @@ -141,7 +194,7 @@ public class OutboundTcpConnection exten if (logger.isDebugEnabled()) logger.debug("exception closing connection to " + endpoint, e); } - output = null; + out = null; socket = null; } @@ -149,19 +202,19 @@ public class OutboundTcpConnection exten Gossiper.instance.resetVersion(endpoint); } - private ByteBuffer take() + private Pair<Message, String> take() { - ByteBuffer bb; + Pair<Message, String> pair; try { - bb = queue.take(); + pair = queue.take(); completedCount++; } catch (InterruptedException e) { throw new AssertionError(e); } - return bb; + return pair; } private boolean connect() @@ -185,7 +238,7 @@ public class OutboundTcpConnection exten socket.setKeepAlive(true); socket.setTcpNoDelay(true); - output = new DataOutputStream(new BufferedOutputStream(socket.getOutputStream(), 4096)); + out = new DataOutputStream(new BufferedOutputStream(socket.getOutputStream(), 4096)); return true; } catch (IOException e) Modified: cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java?rev=1153678&r1=1153677&r2=1153678&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java Wed Aug 3 21:40:34 2011 @@ -56,8 +56,8 @@ public class OutboundTcpConnectionPool public void reset(InetAddress remoteEP) { ackCon.setEndPoint(remoteEP); - ackCon.write(OutboundTcpConnection.CLOSE_SENTINEL); + ackCon.closeSocket(); cmdCon.setEndPoint(remoteEP); - cmdCon.write(OutboundTcpConnection.CLOSE_SENTINEL); + cmdCon.closeSocket(); } } Modified: cassandra/trunk/test/unit/org/apache/cassandra/db/SerializationsTest.java URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/SerializationsTest.java?rev=1153678&r1=1153677&r2=1153678&view=diff ============================================================================== --- cassandra/trunk/test/unit/org/apache/cassandra/db/SerializationsTest.java (original) +++ cassandra/trunk/test/unit/org/apache/cassandra/db/SerializationsTest.java Wed Aug 3 21:40:34 2011 @@ -29,6 +29,7 @@ import org.apache.cassandra.dht.Abstract import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.dht.Range; import org.apache.cassandra.net.Message; +import org.apache.cassandra.net.MessageSerializer; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.thrift.SlicePredicate; @@ -48,6 +49,8 @@ import java.util.HashMap; public class SerializationsTest extends AbstractSerializationsTester { + private static MessageSerializer messageSerializer = new MessageSerializer(); + private void testRangeSliceCommandWrite() throws IOException { ByteBuffer startCol = ByteBufferUtil.bytes("Start"); @@ -73,12 +76,12 @@ public class SerializationsTest extends DataOutputStream dout = getOutput("db.RangeSliceCommand.bin"); - Message.serializer().serialize(namesCmd, dout, getVersion()); - Message.serializer().serialize(emptyRangeCmd, dout, getVersion()); - Message.serializer().serialize(regRangeCmd, dout, getVersion()); - Message.serializer().serialize(namesCmdSup, dout, getVersion()); - Message.serializer().serialize(emptyRangeCmdSup, dout, getVersion()); - Message.serializer().serialize(regRangeCmdSup, dout, getVersion()); + messageSerializer.serialize(namesCmd, dout, getVersion()); + messageSerializer.serialize(emptyRangeCmd, dout, getVersion()); + messageSerializer.serialize(regRangeCmd, dout, getVersion()); + messageSerializer.serialize(namesCmdSup, dout, getVersion()); + messageSerializer.serialize(emptyRangeCmdSup, dout, getVersion()); + messageSerializer.serialize(regRangeCmdSup, dout, getVersion()); dout.close(); } @@ -91,7 +94,7 @@ public class SerializationsTest extends DataInputStream in = getInput("db.RangeSliceCommand.bin"); for (int i = 0; i < 6; i++) { - Message msg = Message.serializer().deserialize(in, getVersion()); + Message msg = messageSerializer.deserialize(in, getVersion()); RangeSliceCommand cmd = RangeSliceCommand.read(msg); } in.close(); @@ -107,8 +110,8 @@ public class SerializationsTest extends SliceByNamesReadCommand.serializer().serialize(superCmd, out, getVersion()); ReadCommand.serializer().serialize(standardCmd, out, getVersion()); ReadCommand.serializer().serialize(superCmd, out, getVersion()); - Message.serializer().serialize(standardCmd.getMessage(getVersion()), out, getVersion()); - Message.serializer().serialize(superCmd.getMessage(getVersion()), out, getVersion()); + messageSerializer.serialize(standardCmd.getMessage(getVersion()), out, getVersion()); + messageSerializer.serialize(superCmd.getMessage(getVersion()), out, getVersion()); out.close(); } @@ -123,8 +126,8 @@ public class SerializationsTest extends assert SliceByNamesReadCommand.serializer().deserialize(in, getVersion()) != null; assert ReadCommand.serializer().deserialize(in, getVersion()) != null; assert ReadCommand.serializer().deserialize(in, getVersion()) != null; - assert Message.serializer().deserialize(in, getVersion()) != null; - assert Message.serializer().deserialize(in, getVersion()) != null; + assert messageSerializer.deserialize(in, getVersion()) != null; + assert messageSerializer.deserialize(in, getVersion()) != null; in.close(); } @@ -137,8 +140,8 @@ public class SerializationsTest extends SliceFromReadCommand.serializer().serialize(superCmd, out, getVersion()); ReadCommand.serializer().serialize(standardCmd, out, getVersion()); ReadCommand.serializer().serialize(superCmd, out, getVersion()); - Message.serializer().serialize(standardCmd.getMessage(getVersion()), out, getVersion()); - Message.serializer().serialize(superCmd.getMessage(getVersion()), out, getVersion()); + messageSerializer.serialize(standardCmd.getMessage(getVersion()), out, getVersion()); + messageSerializer.serialize(superCmd.getMessage(getVersion()), out, getVersion()); out.close(); } @@ -153,8 +156,8 @@ public class SerializationsTest extends assert SliceFromReadCommand.serializer().deserialize(in, getVersion()) != null; assert ReadCommand.serializer().deserialize(in, getVersion()) != null; assert ReadCommand.serializer().deserialize(in, getVersion()) != null; - assert Message.serializer().deserialize(in, getVersion()) != null; - assert Message.serializer().deserialize(in, getVersion()) != null; + assert messageSerializer.deserialize(in, getVersion()) != null; + assert messageSerializer.deserialize(in, getVersion()) != null; in.close(); } @@ -201,12 +204,12 @@ public class SerializationsTest extends RowMutation.serializer().serialize(standardRm, out, getVersion()); RowMutation.serializer().serialize(superRm, out, getVersion()); RowMutation.serializer().serialize(mixedRm, out, getVersion()); - Message.serializer().serialize(emptyRm.getMessage(getVersion()), out, getVersion()); - Message.serializer().serialize(standardRowRm.getMessage(getVersion()), out, getVersion()); - Message.serializer().serialize(superRowRm.getMessage(getVersion()), out, getVersion()); - Message.serializer().serialize(standardRm.getMessage(getVersion()), out, getVersion()); - Message.serializer().serialize(superRm.getMessage(getVersion()), out, getVersion()); - Message.serializer().serialize(mixedRm.getMessage(getVersion()), out, getVersion()); + messageSerializer.serialize(emptyRm.getMessage(getVersion()), out, getVersion()); + messageSerializer.serialize(standardRowRm.getMessage(getVersion()), out, getVersion()); + messageSerializer.serialize(superRowRm.getMessage(getVersion()), out, getVersion()); + messageSerializer.serialize(standardRm.getMessage(getVersion()), out, getVersion()); + messageSerializer.serialize(superRm.getMessage(getVersion()), out, getVersion()); + messageSerializer.serialize(mixedRm.getMessage(getVersion()), out, getVersion()); out.close(); } @@ -223,12 +226,12 @@ public class SerializationsTest extends assert RowMutation.serializer().deserialize(in, getVersion()) != null; assert RowMutation.serializer().deserialize(in, getVersion()) != null; assert RowMutation.serializer().deserialize(in, getVersion()) != null; - assert Message.serializer().deserialize(in, getVersion()) != null; - assert Message.serializer().deserialize(in, getVersion()) != null; - assert Message.serializer().deserialize(in, getVersion()) != null; - assert Message.serializer().deserialize(in, getVersion()) != null; - assert Message.serializer().deserialize(in, getVersion()) != null; - assert Message.serializer().deserialize(in, getVersion()) != null; + assert messageSerializer.deserialize(in, getVersion()) != null; + assert messageSerializer.deserialize(in, getVersion()) != null; + assert messageSerializer.deserialize(in, getVersion()) != null; + assert messageSerializer.deserialize(in, getVersion()) != null; + assert messageSerializer.deserialize(in, getVersion()) != null; + assert messageSerializer.deserialize(in, getVersion()) != null; in.close(); } @@ -241,9 +244,9 @@ public class SerializationsTest extends Truncation.serializer().serialize(tr, out, getVersion()); TruncateResponse.serializer().serialize(aff, out, getVersion()); TruncateResponse.serializer().serialize(neg, out, getVersion()); - Message.serializer().serialize(tr.getMessage(getVersion()), out, getVersion()); - Message.serializer().serialize(TruncateResponse.makeTruncateResponseMessage(tr.getMessage(getVersion()), aff), out, getVersion()); - Message.serializer().serialize(TruncateResponse.makeTruncateResponseMessage(tr.getMessage(getVersion()), neg), out, getVersion()); + messageSerializer.serialize(tr.getMessage(getVersion()), out, getVersion()); + messageSerializer.serialize(TruncateResponse.makeTruncateResponseMessage(tr.getMessage(getVersion()), aff), out, getVersion()); + messageSerializer.serialize(TruncateResponse.makeTruncateResponseMessage(tr.getMessage(getVersion()), neg), out, getVersion()); // todo: notice how CF names weren't validated. out.close(); } @@ -258,9 +261,9 @@ public class SerializationsTest extends assert Truncation.serializer().deserialize(in, getVersion()) != null; assert TruncateResponse.serializer().deserialize(in, getVersion()) != null; assert TruncateResponse.serializer().deserialize(in, getVersion()) != null; - assert Message.serializer().deserialize(in, getVersion()) != null; - assert Message.serializer().deserialize(in, getVersion()) != null; - assert Message.serializer().deserialize(in, getVersion()) != null; + assert messageSerializer.deserialize(in, getVersion()) != null; + assert messageSerializer.deserialize(in, getVersion()) != null; + assert messageSerializer.deserialize(in, getVersion()) != null; in.close(); } Added: cassandra/trunk/test/unit/org/apache/cassandra/net/MessageSerializer.java URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/net/MessageSerializer.java?rev=1153678&view=auto ============================================================================== --- cassandra/trunk/test/unit/org/apache/cassandra/net/MessageSerializer.java (added) +++ cassandra/trunk/test/unit/org/apache/cassandra/net/MessageSerializer.java Wed Aug 3 21:40:34 2011 @@ -0,0 +1,28 @@ +package org.apache.cassandra.net; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; + +import org.apache.cassandra.io.ICompactSerializer; + +public class MessageSerializer implements ICompactSerializer<Message> +{ + public void serialize(Message t, DataOutputStream dos, int version) throws IOException + { + assert t.getVersion() == version : "internode protocol version mismatch"; // indicates programmer error. + Header.serializer().serialize( t.header_, dos, version); + byte[] bytes = t.getMessageBody(); + dos.writeInt(bytes.length); + dos.write(bytes); + } + + public Message deserialize(DataInputStream dis, int version) throws IOException + { + Header header = Header.serializer().deserialize(dis, version); + int size = dis.readInt(); + byte[] bytes = new byte[size]; + dis.readFully(bytes); + return new Message(header, bytes, version); + } +} Propchange: cassandra/trunk/test/unit/org/apache/cassandra/net/MessageSerializer.java ------------------------------------------------------------------------------ svn:eol-style = native Modified: cassandra/trunk/test/unit/org/apache/cassandra/service/SerializationsTest.java URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/service/SerializationsTest.java?rev=1153678&r1=1153677&r2=1153678&view=diff ============================================================================== --- cassandra/trunk/test/unit/org/apache/cassandra/service/SerializationsTest.java (original) +++ cassandra/trunk/test/unit/org/apache/cassandra/service/SerializationsTest.java Wed Aug 3 21:40:34 2011 @@ -21,32 +21,34 @@ package org.apache.cassandra.service; */ +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.junit.Test; + import org.apache.cassandra.AbstractSerializationsTester; import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.dht.RandomPartitioner; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; -import org.apache.cassandra.net.Message; -import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.net.MessageSerializer; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.MerkleTree; -import org.junit.Test; - -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; public class SerializationsTest extends AbstractSerializationsTester { + private static MessageSerializer messageSerializer = new MessageSerializer(); + public static Range FULL_RANGE = new Range(StorageService.getPartitioner().getMinimumToken(), StorageService.getPartitioner().getMinimumToken()); private void testTreeRequestWrite() throws IOException { DataOutputStream out = getOutput("service.TreeRequest.bin"); AntiEntropyService.TreeRequestVerbHandler.SERIALIZER.serialize(Statics.req, out, getVersion()); - Message.serializer().serialize(AntiEntropyService.TreeRequestVerbHandler.makeVerb(Statics.req, getVersion()), out, getVersion()); + messageSerializer.serialize(AntiEntropyService.TreeRequestVerbHandler.makeVerb(Statics.req, getVersion()), out, getVersion()); out.close(); } @@ -58,7 +60,7 @@ public class SerializationsTest extends DataInputStream in = getInput("service.TreeRequest.bin"); assert AntiEntropyService.TreeRequestVerbHandler.SERIALIZER.deserialize(in, getVersion()) != null; - assert Message.serializer().deserialize(in, getVersion()) != null; + assert messageSerializer.deserialize(in, getVersion()) != null; in.close(); } @@ -78,8 +80,8 @@ public class SerializationsTest extends DataOutputStream out = getOutput("service.TreeResponse.bin"); AntiEntropyService.TreeResponseVerbHandler.SERIALIZER.serialize(v0, out, getVersion()); AntiEntropyService.TreeResponseVerbHandler.SERIALIZER.serialize(v1, out, getVersion()); - Message.serializer().serialize(AntiEntropyService.TreeResponseVerbHandler.makeVerb(FBUtilities.getBroadcastAddress(), v0), out, getVersion()); - Message.serializer().serialize(AntiEntropyService.TreeResponseVerbHandler.makeVerb(FBUtilities.getBroadcastAddress(), v1), out, getVersion()); + messageSerializer.serialize(AntiEntropyService.TreeResponseVerbHandler.makeVerb(FBUtilities.getBroadcastAddress(), v0), out, getVersion()); + messageSerializer.serialize(AntiEntropyService.TreeResponseVerbHandler.makeVerb(FBUtilities.getBroadcastAddress(), v1), out, getVersion()); out.close(); } @@ -92,8 +94,8 @@ public class SerializationsTest extends DataInputStream in = getInput("service.TreeResponse.bin"); assert AntiEntropyService.TreeResponseVerbHandler.SERIALIZER.deserialize(in, getVersion()) != null; assert AntiEntropyService.TreeResponseVerbHandler.SERIALIZER.deserialize(in, getVersion()) != null; - assert Message.serializer().deserialize(in, getVersion()) != null; - assert Message.serializer().deserialize(in, getVersion()) != null; + assert messageSerializer.deserialize(in, getVersion()) != null; + assert messageSerializer.deserialize(in, getVersion()) != null; in.close(); } Modified: cassandra/trunk/test/unit/org/apache/cassandra/streaming/SerializationsTest.java URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/streaming/SerializationsTest.java?rev=1153678&r1=1153677&r2=1153678&view=diff ============================================================================== --- cassandra/trunk/test/unit/org/apache/cassandra/streaming/SerializationsTest.java (original) +++ cassandra/trunk/test/unit/org/apache/cassandra/streaming/SerializationsTest.java Wed Aug 3 21:40:34 2011 @@ -32,6 +32,7 @@ import org.apache.cassandra.io.sstable.D import org.apache.cassandra.io.sstable.SSTable; import org.apache.cassandra.io.sstable.SSTableReader; import org.apache.cassandra.net.Message; +import org.apache.cassandra.net.MessageSerializer; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.Pair; @@ -47,6 +48,8 @@ import java.util.*; public class SerializationsTest extends AbstractSerializationsTester { + private static MessageSerializer messageSerializer = new MessageSerializer(); + private void testPendingFileWrite() throws IOException { // make sure to test serializing null and a pf with no sstable. @@ -116,7 +119,7 @@ public class SerializationsTest extends StreamReply rep = new StreamReply("this is a file", 123L, StreamReply.Status.FILE_FINISHED); DataOutputStream out = getOutput("streaming.StreamReply.bin"); StreamReply.serializer.serialize(rep, out, getVersion()); - Message.serializer().serialize(rep.getMessage(getVersion()), out, getVersion()); + messageSerializer.serialize(rep.getMessage(getVersion()), out, getVersion()); out.close(); } @@ -128,7 +131,7 @@ public class SerializationsTest extends DataInputStream in = getInput("streaming.StreamReply.bin"); assert StreamReply.serializer.deserialize(in, getVersion()) != null; - assert Message.serializer().deserialize(in, getVersion()) != null; + assert messageSerializer.deserialize(in, getVersion()) != null; in.close(); } @@ -155,9 +158,9 @@ public class SerializationsTest extends StreamRequestMessage.serializer().serialize(msg0, out, getVersion()); StreamRequestMessage.serializer().serialize(msg1, out, getVersion()); StreamRequestMessage.serializer().serialize(msg2, out, getVersion()); - Message.serializer().serialize(msg0.getMessage(getVersion()), out, getVersion()); - Message.serializer().serialize(msg1.getMessage(getVersion()), out, getVersion()); - Message.serializer().serialize(msg2.getMessage(getVersion()), out, getVersion()); + messageSerializer.serialize(msg0.getMessage(getVersion()), out, getVersion()); + messageSerializer.serialize(msg1.getMessage(getVersion()), out, getVersion()); + messageSerializer.serialize(msg2.getMessage(getVersion()), out, getVersion()); out.close(); } @@ -171,9 +174,9 @@ public class SerializationsTest extends assert StreamRequestMessage.serializer().deserialize(in, getVersion()) != null; assert StreamRequestMessage.serializer().deserialize(in, getVersion()) != null; assert StreamRequestMessage.serializer().deserialize(in, getVersion()) != null; - assert Message.serializer().deserialize(in, getVersion()) != null; - assert Message.serializer().deserialize(in, getVersion()) != null; - assert Message.serializer().deserialize(in, getVersion()) != null; + assert messageSerializer.deserialize(in, getVersion()) != null; + assert messageSerializer.deserialize(in, getVersion()) != null; + assert messageSerializer.deserialize(in, getVersion()) != null; in.close(); }