Author: gdusbabek
Date: Thu Dec 2 15:26:13 2010
New Revision: 1041409
URL: http://svn.apache.org/viewvc?rev=1041409&view=rev
Log:
remove preserialized arg from write path, initialize preserialized RM with raw
bytes during deserialization. patch by gdusbabek, reviewed by jbellis.
CASSANDRA-1800
Modified:
cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java
cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java?rev=1041409&r1=1041408&r2=1041409&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java Thu Dec
2 15:26:13 2010
@@ -18,6 +18,7 @@
package org.apache.cassandra.db;
+import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
@@ -45,7 +46,7 @@ import org.apache.commons.lang.StringUti
public class RowMutation
{
- private static ICompactSerializer<RowMutation> serializer_;
+ private static RowMutationSerializer serializer_;
public static final String HINT = "HINT";
static
@@ -195,7 +196,7 @@ public class RowMutation
*/
public void apply() throws IOException
{
- Table.open(table_).apply(this, getSerializedBuffer(), true);
+ Table.open(table_).apply(this, true);
}
/*
@@ -238,7 +239,7 @@ public class RowMutation
return rm;
}
- private synchronized byte[] getSerializedBuffer() throws IOException
+ public synchronized byte[] getSerializedBuffer() throws IOException
{
if (preserializedBuffer == null)
{
@@ -309,6 +310,13 @@ public class RowMutation
rm.delete(new QueryPath(cfName, del.super_column), del.timestamp);
}
}
+
+ static RowMutation fromBytes(byte[] raw) throws IOException
+ {
+ RowMutation rm = serializer_.deserialize(new DataInputStream(new
ByteArrayInputStream(raw)));
+ rm.preserializedBuffer = raw;
+ return rm;
+ }
}
class RowMutationSerializer implements ICompactSerializer<RowMutation>
Modified:
cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java?rev=1041409&r1=1041408&r2=1041409&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
Thu Dec 2 15:26:13 2010
@@ -44,12 +44,9 @@ public class RowMutationVerbHandler impl
public void doVerb(Message message)
{
- byte[] bytes = message.getMessageBody();
- ByteArrayInputStream buffer = new ByteArrayInputStream(bytes);
-
try
{
- RowMutation rm = RowMutation.serializer().deserialize(new
DataInputStream(buffer));
+ RowMutation rm = RowMutation.fromBytes(message.getMessageBody());
if (logger_.isDebugEnabled())
logger_.debug("Applying " + rm);
@@ -70,7 +67,7 @@ public class RowMutationVerbHandler impl
}
}
- Table.open(rm.getTable()).apply(rm, bytes, true);
+ Table.open(rm.getTable()).apply(rm, true);
WriteResponse response = new WriteResponse(rm.getTable(),
rm.key(), true);
Message responseMessage =
WriteResponse.makeWriteResponseMessage(message, response);
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java?rev=1041409&r1=1041408&r2=1041409&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/Table.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/Table.java Thu Dec 2
15:26:13 2010
@@ -379,7 +379,7 @@ public class Table
* Once this happens the data associated with the individual column
families
* is also written to the column family store's memtable.
*/
- public void apply(RowMutation mutation, byte[] serializedMutation, boolean
writeCommitLog) throws IOException
+ public void apply(RowMutation mutation, boolean writeCommitLog) throws
IOException
{
List<Memtable> memtablesToFlush = Collections.emptyList();
if (logger.isDebugEnabled())
@@ -390,7 +390,7 @@ public class Table
try
{
if (writeCommitLog)
- CommitLog.instance.add(mutation, serializedMutation);
+ CommitLog.instance.add(mutation);
DecoratedKey key =
StorageService.getPartitioner().decorateKey(mutation.key());
for (ColumnFamily cf : mutation.getColumnFamilies())
Modified:
cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java?rev=1041409&r1=1041408&r2=1041409&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
Thu Dec 2 15:26:13 2010
@@ -325,7 +325,7 @@ public class CommitLog
}
if (!newRm.isEmpty())
{
- Table.open(newRm.getTable()).apply(newRm,
null, false);
+ Table.open(newRm.getTable()).apply(newRm,
false);
}
}
};
@@ -392,9 +392,9 @@ public class CommitLog
* of any problems. This way we can assume that the subsequent commit log
* entry will override the garbage left over by the previous write.
*/
- public void add(RowMutation rowMutation, byte[] serializedRow) throws
IOException
+ public void add(RowMutation rowMutation) throws IOException
{
- executor.add(new LogRecordAdder(rowMutation, serializedRow));
+ executor.add(new LogRecordAdder(rowMutation));
}
/*
@@ -494,19 +494,17 @@ public class CommitLog
class LogRecordAdder implements Callable, Runnable
{
final RowMutation rowMutation;
- final byte[] serializedRow;
- LogRecordAdder(RowMutation rm, byte[] serializedRow)
+ LogRecordAdder(RowMutation rm)
{
this.rowMutation = rm;
- this.serializedRow = serializedRow;
}
public void run()
{
try
{
- currentSegment().write(rowMutation, serializedRow);
+ currentSegment().write(rowMutation);
// roll log if necessary
if (currentSegment().length() >= SEGMENT_SIZE)
{
Modified:
cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java?rev=1041409&r1=1041408&r2=1041409&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
Thu Dec 2 15:26:13 2010
@@ -75,7 +75,7 @@ public class CommitLogSegment
return new BufferedRandomAccessFile(file, "rw", 128 * 1024);
}
- public CommitLogSegment.CommitLogContext write(RowMutation rowMutation,
byte[] serializedRow) throws IOException
+ public CommitLogSegment.CommitLogContext write(RowMutation rowMutation)
throws IOException
{
long currentPosition = -1L;
try
@@ -107,7 +107,7 @@ public class CommitLogSegment
// write mutation, w/ checksum on the size and data
Checksum checksum = new CRC32();
-
+ byte[] serializedRow = rowMutation.getSerializedBuffer();
checksum.update(serializedRow.length);
logWriter.writeInt(serializedRow.length);
logWriter.writeLong(checksum.getValue());