Author: gdusbabek
Date: Thu Dec 2 15:25:50 2010
New Revision: 1041408
URL: http://svn.apache.org/viewvc?rev=1041408&view=rev
Log:
avoid extra RM serialization on write, remove dead code. patch by gdusbabek,
reviewed by jbellis. CASSANDRA-1800
Modified:
cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java
cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java?rev=1041408&r1=1041407&r2=1041408&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java Thu Dec
2 15:25:50 2010
@@ -62,6 +62,8 @@ public class RowMutation
private ByteBuffer key_;
// map of column family id to mutations for that column family.
protected Map<Integer, ColumnFamily> modifications_ = new HashMap<Integer,
ColumnFamily>();
+
+ private byte[] preserializedBuffer = null;
public RowMutation(String table, ByteBuffer key)
{
@@ -212,10 +214,7 @@ public class RowMutation
public Message makeRowMutationMessage(StorageService.Verb verb) throws
IOException
{
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
- DataOutputStream dos = new DataOutputStream(bos);
- serializer().serialize(this, dos);
- return new Message(FBUtilities.getLocalAddress(), verb,
bos.toByteArray());
+ return new Message(FBUtilities.getLocalAddress(), verb,
getSerializedBuffer());
}
public static RowMutation getRowMutationFromMutations(String keyspace,
ByteBuffer key, Map<String, List<Mutation>> cfmap)
@@ -239,37 +238,17 @@ public class RowMutation
return rm;
}
- public static RowMutation getRowMutation(String table, ByteBuffer key,
Map<String, List<ColumnOrSuperColumn>> cfmap)
+ private synchronized byte[] getSerializedBuffer() throws IOException
{
- RowMutation rm = new RowMutation(table, key);
- for (Map.Entry<String, List<ColumnOrSuperColumn>> entry :
cfmap.entrySet())
+ if (preserializedBuffer == null)
{
- String cfName = entry.getKey();
- for (ColumnOrSuperColumn cosc : entry.getValue())
- {
- if (cosc.column == null)
- {
- assert cosc.super_column != null;
- for (org.apache.cassandra.thrift.Column column :
cosc.super_column.columns)
- {
- rm.add(new QueryPath(cfName, cosc.super_column.name,
column.name), column.value, column.timestamp, column.ttl);
- }
- }
- else
- {
- assert cosc.super_column == null;
- rm.add(new QueryPath(cfName, null, cosc.column.name),
cosc.column.value, cosc.column.timestamp, cosc.column.ttl);
- }
- }
+ ByteArrayOutputStream bout = new ByteArrayOutputStream();
+ DataOutputStream dout = new DataOutputStream(bout);
+ RowMutation.serializer().serialize(this, dout);
+ dout.close();
+ preserializedBuffer = bout.toByteArray();
}
- return rm;
- }
-
- public DataOutputBuffer getSerializedBuffer() throws IOException
- {
- DataOutputBuffer buffer = new DataOutputBuffer();
- RowMutation.serializer().serialize(this, buffer);
- return buffer;
+ return preserializedBuffer;
}
public String toString()
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java?rev=1041408&r1=1041407&r2=1041408&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/Table.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/Table.java Thu Dec 2
15:25:50 2010
@@ -379,7 +379,7 @@ public class Table
* Once this happens the data associated with the individual column
families
* is also written to the column family store's memtable.
*/
- public void apply(RowMutation mutation, Object serializedMutation, boolean
writeCommitLog) throws IOException
+ public void apply(RowMutation mutation, byte[] serializedMutation, boolean
writeCommitLog) throws IOException
{
List<Memtable> memtablesToFlush = Collections.emptyList();
if (logger.isDebugEnabled())
Modified:
cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java?rev=1041408&r1=1041407&r2=1041408&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
Thu Dec 2 15:25:50 2010
@@ -392,7 +392,7 @@ public class CommitLog
* of any problems. This way we can assume that the subsequent commit log
* entry will override the garbage left over by the previous write.
*/
- public void add(RowMutation rowMutation, Object serializedRow) throws
IOException
+ public void add(RowMutation rowMutation, byte[] serializedRow) throws
IOException
{
executor.add(new LogRecordAdder(rowMutation, serializedRow));
}
@@ -494,9 +494,9 @@ public class CommitLog
class LogRecordAdder implements Callable, Runnable
{
final RowMutation rowMutation;
- final Object serializedRow;
+ final byte[] serializedRow;
- LogRecordAdder(RowMutation rm, Object serializedRow)
+ LogRecordAdder(RowMutation rm, byte[] serializedRow)
{
this.rowMutation = rm;
this.serializedRow = serializedRow;
Modified:
cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java?rev=1041408&r1=1041407&r2=1041408&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
Thu Dec 2 15:25:50 2010
@@ -75,7 +75,7 @@ public class CommitLogSegment
return new BufferedRandomAccessFile(file, "rw", 128 * 1024);
}
- public CommitLogSegment.CommitLogContext write(RowMutation rowMutation,
Object serializedRow) throws IOException
+ public CommitLogSegment.CommitLogContext write(RowMutation rowMutation,
byte[] serializedRow) throws IOException
{
long currentPosition = -1L;
try
@@ -106,23 +106,13 @@ public class CommitLogSegment
}
// write mutation, w/ checksum on the size and data
- byte[] bytes;
Checksum checksum = new CRC32();
- if (serializedRow instanceof DataOutputBuffer)
- {
- bytes = ((DataOutputBuffer) serializedRow).getData();
- }
- else
- {
- assert serializedRow instanceof byte[];
- bytes = (byte[]) serializedRow;
- }
- checksum.update(bytes.length);
- logWriter.writeInt(bytes.length);
+ checksum.update(serializedRow.length);
+ logWriter.writeInt(serializedRow.length);
logWriter.writeLong(checksum.getValue());
- logWriter.write(bytes);
- checksum.update(bytes, 0, bytes.length);
+ logWriter.write(serializedRow);
+ checksum.update(serializedRow, 0, serializedRow.length);
logWriter.writeLong(checksum.getValue());
return cLogCtx;