Author: gdusbabek
Date: Thu Dec  2 15:25:50 2010
New Revision: 1041408

URL: http://svn.apache.org/viewvc?rev=1041408&view=rev
Log:
Avoid an extra RowMutation (RM) serialization on write; remove dead code. Patch by gdusbabek,
reviewed by jbellis. CASSANDRA-1800

Modified:
    cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java
    cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
    cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
    cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java?rev=1041408&r1=1041407&r2=1041408&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java Thu Dec  2 15:25:50 2010
@@ -62,6 +62,8 @@ public class RowMutation
     private ByteBuffer key_;
     // map of column family id to mutations for that column family.
     protected Map<Integer, ColumnFamily> modifications_ = new HashMap<Integer, 
ColumnFamily>();
+    
+    private byte[] preserializedBuffer = null;
 
     public RowMutation(String table, ByteBuffer key)
     {
@@ -212,10 +214,7 @@ public class RowMutation
 
     public Message makeRowMutationMessage(StorageService.Verb verb) throws 
IOException
     {
-        ByteArrayOutputStream bos = new ByteArrayOutputStream();
-        DataOutputStream dos = new DataOutputStream(bos);
-        serializer().serialize(this, dos);
-        return new Message(FBUtilities.getLocalAddress(), verb, 
bos.toByteArray());
+        return new Message(FBUtilities.getLocalAddress(), verb, 
getSerializedBuffer());
     }
 
     public static RowMutation getRowMutationFromMutations(String keyspace, 
ByteBuffer key, Map<String, List<Mutation>> cfmap)
@@ -239,37 +238,17 @@ public class RowMutation
         return rm;
     }
     
-    public static RowMutation getRowMutation(String table, ByteBuffer key, 
Map<String, List<ColumnOrSuperColumn>> cfmap)
+    private synchronized byte[] getSerializedBuffer() throws IOException
     {
-        RowMutation rm = new RowMutation(table, key);
-        for (Map.Entry<String, List<ColumnOrSuperColumn>> entry : 
cfmap.entrySet())
+        if (preserializedBuffer == null)
         {
-            String cfName = entry.getKey();
-            for (ColumnOrSuperColumn cosc : entry.getValue())
-            {
-                if (cosc.column == null)
-                {
-                    assert cosc.super_column != null;
-                    for (org.apache.cassandra.thrift.Column column : 
cosc.super_column.columns)
-                    {
-                        rm.add(new QueryPath(cfName, cosc.super_column.name, 
column.name), column.value, column.timestamp, column.ttl);
-                    }
-                }
-                else
-                {
-                    assert cosc.super_column == null;
-                    rm.add(new QueryPath(cfName, null, cosc.column.name), 
cosc.column.value, cosc.column.timestamp, cosc.column.ttl);
-                }
-            }
+            ByteArrayOutputStream bout = new ByteArrayOutputStream();
+            DataOutputStream dout = new DataOutputStream(bout);
+            RowMutation.serializer().serialize(this, dout);
+            dout.close();
+            preserializedBuffer = bout.toByteArray();
         }
-        return rm;
-    }
-    
-    public DataOutputBuffer getSerializedBuffer() throws IOException
-    {
-        DataOutputBuffer buffer = new DataOutputBuffer();
-        RowMutation.serializer().serialize(this, buffer);
-        return buffer;
+        return preserializedBuffer;
     }
 
     public String toString()

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java?rev=1041408&r1=1041407&r2=1041408&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/Table.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/Table.java Thu Dec  2 15:25:50 2010
@@ -379,7 +379,7 @@ public class Table
      * Once this happens the data associated with the individual column 
families
      * is also written to the column family store's memtable.
     */
-    public void apply(RowMutation mutation, Object serializedMutation, boolean 
writeCommitLog) throws IOException
+    public void apply(RowMutation mutation, byte[] serializedMutation, boolean 
writeCommitLog) throws IOException
     {
         List<Memtable> memtablesToFlush = Collections.emptyList();
         if (logger.isDebugEnabled())

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java?rev=1041408&r1=1041407&r2=1041408&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java Thu Dec  2 15:25:50 2010
@@ -392,7 +392,7 @@ public class CommitLog
      * of any problems. This way we can assume that the subsequent commit log
      * entry will override the garbage left over by the previous write.
     */
-    public void add(RowMutation rowMutation, Object serializedRow) throws 
IOException
+    public void add(RowMutation rowMutation, byte[] serializedRow) throws 
IOException
     {
         executor.add(new LogRecordAdder(rowMutation, serializedRow));
     }
@@ -494,9 +494,9 @@ public class CommitLog
     class LogRecordAdder implements Callable, Runnable
     {
         final RowMutation rowMutation;
-        final Object serializedRow;
+        final byte[] serializedRow;
 
-        LogRecordAdder(RowMutation rm, Object serializedRow)
+        LogRecordAdder(RowMutation rm, byte[] serializedRow)
         {
             this.rowMutation = rm;
             this.serializedRow = serializedRow;

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java?rev=1041408&r1=1041407&r2=1041408&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java Thu Dec  2 15:25:50 2010
@@ -75,7 +75,7 @@ public class CommitLogSegment
         return new BufferedRandomAccessFile(file, "rw", 128 * 1024);
     }
 
-    public CommitLogSegment.CommitLogContext write(RowMutation rowMutation, 
Object serializedRow) throws IOException
+    public CommitLogSegment.CommitLogContext write(RowMutation rowMutation, 
byte[] serializedRow) throws IOException
     {
         long currentPosition = -1L;
         try
@@ -106,23 +106,13 @@ public class CommitLogSegment
             }
 
             // write mutation, w/ checksum on the size and data
-            byte[] bytes;
             Checksum checksum = new CRC32();
-            if (serializedRow instanceof DataOutputBuffer)
-            {
-                bytes = ((DataOutputBuffer) serializedRow).getData();
-            }
-            else
-            {
-                assert serializedRow instanceof byte[];
-                bytes = (byte[]) serializedRow;
-            }
 
-            checksum.update(bytes.length);
-            logWriter.writeInt(bytes.length);
+            checksum.update(serializedRow.length);
+            logWriter.writeInt(serializedRow.length);
             logWriter.writeLong(checksum.getValue());
-            logWriter.write(bytes);
-            checksum.update(bytes, 0, bytes.length);
+            logWriter.write(serializedRow);
+            checksum.update(serializedRow, 0, serializedRow.length);
             logWriter.writeLong(checksum.getValue());
 
             return cLogCtx;


Reply via email to