Author: slebresne
Date: Mon Sep 5 14:55:28 2011
New Revision: 1165306
URL: http://svn.apache.org/viewvc?rev=1165306&view=rev
Log:
Handle large rows with single-pass streaming
patch by yukim; reviewed by slebresne for CASSANDRA-3003
Modified:
cassandra/trunk/CHANGES.txt
cassandra/trunk/src/java/org/apache/cassandra/db/CounterColumn.java
cassandra/trunk/src/java/org/apache/cassandra/db/context/CounterContext.java
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1165306&r1=1165305&r2=1165306&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Mon Sep 5 14:55:28 2011
@@ -12,7 +12,7 @@
* don't bother persisting columns shadowed by a row tombstone (CASSANDRA-2589)
* reset CF and SC deletion times after gc_grace (CASSANDRA-2317)
* optimize away seek when compacting wide rows (CASSANDRA-2879)
- * single-pass streaming (CASSANDRA-2677)
+ * single-pass streaming (CASSANDRA-2677, 3003)
* use reference counting for deleting sstables instead of relying on GC
(CASSANDRA-2521)
* store hints as serialized mutations instead of pointers to data row
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/CounterColumn.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/CounterColumn.java?rev=1165306&r1=1165305&r2=1165306&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/CounterColumn.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/CounterColumn.java Mon Sep 5 14:55:28 2011
@@ -21,7 +21,6 @@ package org.apache.cassandra.db;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.security.MessageDigest;
-import java.util.Map;
import org.apache.log4j.Logger;
@@ -70,7 +69,9 @@ public class CounterColumn extends Colum
public static CounterColumn create(ByteBuffer name, ByteBuffer value, long timestamp, long timestampOfLastDelete, boolean fromRemote)
{
- if (fromRemote)
+ // a negative #elt means we still have to clear the deltas
+ short count = value.getShort(value.position());
+ if (fromRemote || count < 0)
value = CounterContext.instance().clearAllDelta(value);
return new CounterColumn(name, value, timestamp, timestampOfLastDelete);
}
@@ -285,4 +286,8 @@ public class CounterColumn extends Colum
}
}
+ public IColumn markDeltaToBeCleared()
+ {
+ return new CounterColumn(name, contextManager.markDeltaToBeCleared(value), timestamp, timestampOfLastDelete);
+ }
}
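The check added to create() is the consumer side of a convention this patch introduces: a negative #elt in a counter context means its deltas still need to be cleared. A hedged usage fragment follows (ctx, name, and the timestamp values are invented for illustration and are not part of the patch):

    // Hypothetical fragment: a context marked during streaming is cleaned up
    // lazily, the next time the column is deserialized, even with fromRemote == false.
    ByteBuffer marked = CounterContext.instance().markDeltaToBeCleared(ctx);
    CounterColumn col = CounterColumn.create(name, marked, timestamp, Long.MIN_VALUE, false);
    // create() sees the negative #elt in `marked` and calls clearAllDelta() on it.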
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/context/CounterContext.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/context/CounterContext.java?rev=1165306&r1=1165305&r2=1165306&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/context/CounterContext.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/context/CounterContext.java Mon Sep 5 14:55:28 2011
@@ -130,7 +130,7 @@ public class CounterContext implements I
private static int headerLength(ByteBuffer context)
{
- return HEADER_SIZE_LENGTH + context.getShort(context.position()) * HEADER_ELT_LENGTH;
+ return HEADER_SIZE_LENGTH + Math.abs(context.getShort(context.position())) * HEADER_ELT_LENGTH;
}
private static int compareId(ByteBuffer bb1, int pos1, ByteBuffer bb2, int pos2)
@@ -442,6 +442,28 @@ public class CounterContext implements I
}
/**
+ * Mark a context so that its deltas will be cleared later.
+ * Marking is done by multiplying #elt by -1, preserving the header length
+ * and #elt count so that all deltas can still be cleared afterward.
+ *
+ * @param context a counter context
+ * @return the context, marked to have its deltas cleared
+ */
+ public ByteBuffer markDeltaToBeCleared(ByteBuffer context)
+ {
+ int headerLength = headerLength(context);
+ if (headerLength == 0)
+ return context;
+
+ ByteBuffer marked = context.duplicate();
+ short count = context.getShort(context.position());
+ // negate #elt to mark the deltas for clearing, without changing the header size
+ if (count > 0)
+ marked.putShort(marked.position(), (short) (count * -1));
+ return marked;
+ }
+
+ /**
* Remove all the delta of a context (i.e, set an empty header).
*
* @param context a counter context
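Because headerLength() now takes Math.abs() of the element count, marking a context leaves its header length, and therefore the column's serialized size, untouched. A minimal standalone sketch of that invariant (the class name, the literal header values, and the 2-byte constants are assumptions for illustration, not taken from the patch):

    import java.nio.ByteBuffer;

    // Hypothetical sketch of the marking trick: negate #elt in place so the
    // header length (one short for the count plus one short per element) and
    // hence the serialized size stay the same.
    public class HeaderMarkingSketch
    {
        static final int HEADER_SIZE_LENGTH = 2; // assumed: sizeof(short) for #elt
        static final int HEADER_ELT_LENGTH = 2;  // assumed: sizeof(short) per delta index

        static int headerLength(ByteBuffer context)
        {
            return HEADER_SIZE_LENGTH + Math.abs(context.getShort(context.position())) * HEADER_ELT_LENGTH;
        }

        public static void main(String[] args)
        {
            // Toy context: header says "2 delta elements", followed by the two
            // delta indexes 0 and 1 (the shard body is omitted for brevity).
            ByteBuffer context = ByteBuffer.allocate(6);
            context.putShort(0, (short) 2);
            context.putShort(2, (short) 0);
            context.putShort(4, (short) 1);

            int before = headerLength(context);          // 2 + 2 * 2 = 6

            // Mark: negate #elt, touch nothing else (as markDeltaToBeCleared does).
            ByteBuffer marked = context.duplicate();
            short count = marked.getShort(marked.position());
            if (count > 0)
                marked.putShort(marked.position(), (short) (count * -1));

            int after = headerLength(marked);             // still 6, thanks to Math.abs
            System.out.println(before + " == " + after);  // prints "6 == 6"
        }
    }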
Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java?rev=1165306&r1=1165305&r2=1165306&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java Mon Sep 5 14:55:28 2011
@@ -36,7 +36,6 @@ import org.apache.cassandra.db.compactio
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.io.compress.CompressedSequentialWriter;
import org.apache.cassandra.io.util.*;
-import org.apache.cassandra.io.util.SequentialWriter;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.BloomFilter;
import org.apache.cassandra.utils.ByteBufferUtil;
@@ -194,6 +193,71 @@ public class SSTableWriter extends SSTab
afterAppend(decoratedKey, currentPosition);
}
+ public long appendFromStream(DecoratedKey key, CFMetaData metadata, long dataSize, DataInput in) throws IOException
+ {
+ long currentPosition = beforeAppend(key);
+ ByteBufferUtil.writeWithShortLength(key.key, dataFile.stream);
+ long dataStart = dataFile.getFilePointer();
+
+ // write row size
+ dataFile.stream.writeLong(dataSize);
+
+ // write BF
+ int bfSize = in.readInt();
+ dataFile.stream.writeInt(bfSize);
+ for (int i = 0; i < bfSize; i++)
+ dataFile.stream.writeByte(in.readByte());
+
+ // write index
+ int indexSize = in.readInt();
+ dataFile.stream.writeInt(indexSize);
+ for (int i = 0; i < indexSize; i++)
+ dataFile.stream.writeByte(in.readByte());
+
+ // cf data
+ dataFile.stream.writeInt(in.readInt());
+ dataFile.stream.writeLong(in.readLong());
+
+ // column size
+ int columnCount = in.readInt();
+ dataFile.stream.writeInt(columnCount);
+
+ // deserialize each column to obtain maxTimestamp and immediately serialize it.
+ long maxTimestamp = Long.MIN_VALUE;
+ ColumnFamily cf = ColumnFamily.create(metadata, ArrayBackedSortedColumns.FACTORY);
+ for (int i = 0; i < columnCount; i++)
+ {
+ // deserialize column with fromRemote false, in order to keep size of streamed column
+ IColumn column = cf.getColumnSerializer().deserialize(in, false, Integer.MIN_VALUE);
+ if (column instanceof CounterColumn)
+ {
+ column = ((CounterColumn) column).markDeltaToBeCleared();
+ }
+ else if (column instanceof SuperColumn)
+ {
+ SuperColumn sc = (SuperColumn) column;
+ for (IColumn subColumn : sc.getSubColumns())
+ {
+ if (subColumn instanceof CounterColumn)
+ {
+ IColumn marked = ((CounterColumn) subColumn).markDeltaToBeCleared();
+ sc.replace(subColumn, marked);
+ }
+ }
+ }
+ maxTimestamp = Math.max(maxTimestamp, column.maxTimestamp());
+ cf.getColumnSerializer().serialize(column, dataFile.stream);
+ }
+
+ assert dataSize == dataFile.getFilePointer() - (dataStart + 8)
+ : "incorrect row data size " + dataSize + " written to " +
dataFile.getPath() + "; correct is " + (dataFile.getFilePointer() - (dataStart
+ 8));
+ sstableMetadataCollector.updateMaxTimestamp(maxTimestamp);
+ sstableMetadataCollector.addRowSize(dataFile.getFilePointer() - currentPosition);
+ sstableMetadataCollector.addColumnCount(columnCount);
+ afterAppend(key, currentPosition);
+ return currentPosition;
+ }
+
public void updateMaxTimestamp(long timestamp)
{
sstableMetadataCollector.updateMaxTimestamp(timestamp);
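appendFromStream() copies the bloom filter and index blocks verbatim, one byte at a time, so nothing has to be buffered or deserialized for large rows. Purely for illustration, a buffered copy along the following lines would behave equivalently (the helper name and buffer size are assumptions, not something the patch adds):

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    // Hypothetical helper: copy `length` bytes from the incoming stream to the
    // data file without interpreting them, like the byte-by-byte loops above.
    static void copyBytes(DataInput in, DataOutput out, int length) throws IOException
    {
        byte[] buffer = new byte[Math.min(Math.max(length, 1), 64 * 1024)];
        int remaining = length;
        while (remaining > 0)
        {
            int chunk = Math.min(buffer.length, remaining);
            in.readFully(buffer, 0, chunk);
            out.write(buffer, 0, chunk);
            remaining -= chunk;
        }
    }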
Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java?rev=1165306&r1=1165305&r2=1165306&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java Mon Sep 5 14:55:28 2011
@@ -122,64 +122,27 @@ public class IncomingStreamReader
in.reset(0);
key = SSTableReader.decodeKey(StorageService.getPartitioner(), localFile.desc, ByteBufferUtil.readWithShortLength(in));
long dataSize = SSTableReader.readRowSize(in, localFile.desc);
- ColumnFamily cf = null;
- if (cfs.metadata.getDefaultValidator().isCommutative())
+
+ ColumnFamily cached = cfs.getRawCachedRow(key);
+ if (cached != null && remoteFile.type == OperationType.AES && dataSize <= DatabaseDescriptor.getInMemoryCompactionLimit())
{
- // take care of counter column family
+ // need to update row cache
if (controller == null)
controller = new CompactionController(cfs, Collections.<SSTableReader>emptyList(), Integer.MAX_VALUE, true);
SSTableIdentityIterator iter = new SSTableIdentityIterator(cfs.metadata, in, key, 0, dataSize, true);
- AbstractCompactedRow row = controller.getCompactedRow(iter);
+ PrecompactedRow row = new PrecompactedRow(controller, Collections.singletonList(iter));
writer.append(row);
// row append does not update the max timestamp on its own
writer.updateMaxTimestamp(row.maxTimestamp());
- if (row instanceof PrecompactedRow)
- {
- // we do not purge so we should not get a null here
- cf = ((PrecompactedRow)row).getFullColumnFamily();
- }
+ // update cache
+ ColumnFamily cf = row.getFullColumnFamily();
+ cfs.updateRowCache(key, cf);
}
else
{
- // skip BloomFilter
- IndexHelper.skipBloomFilter(in);
- // skip Index
- IndexHelper.skipIndex(in);
-
- // restore ColumnFamily
- cf = ColumnFamily.create(cfs.metadata);
- ColumnFamily.serializer().deserializeFromSSTableNoColumns(cf, in);
- ColumnFamily.serializer().deserializeColumns(in, cf, true);
-
- // write key and cf
- writer.append(key, cf);
- }
-
- // update cache
- ColumnFamily cached = cfs.getRawCachedRow(key);
- if (cached != null)
- {
- switch (remoteFile.type)
- {
- case AES:
- if (dataSize > DatabaseDescriptor.getInMemoryCompactionLimit())
- {
- // We have a key in cache for a very big row, that is fishy. We don't fail here however because that would prevent the sstable
- // from being build (and there is no real point anyway), so we just invalidate the row for correction and log a warning.
- logger.warn("Found a cached row over the in memory compaction limit during post-streaming rebuilt; it is highly recommended to avoid huge row on column family with row cache enabled.");
- cfs.invalidateCachedRow(key);
- }
- else
- {
- assert cf != null;
- cfs.updateRowCache(key, cf);
- }
- break;
- default:
- cfs.invalidateCachedRow(key);
- break;
- }
+ writer.appendFromStream(key, cfs.metadata, dataSize, in);
+ cfs.invalidateCachedRow(key);
}
bytesRead += in.getBytesRead();
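Condensing the rewritten block, the per-row logic now reads roughly as follows (a sketch, not a verbatim excerpt; the controller-initialization detail is omitted): only rows that are already cached, small enough for in-memory compaction, and received via repair (AES) are rebuilt in memory so the row cache stays correct; every other row is appended in a single pass and any stale cache entry is dropped.

    // Sketch of the new per-row flow.
    if (cached != null
        && remoteFile.type == OperationType.AES
        && dataSize <= DatabaseDescriptor.getInMemoryCompactionLimit())
    {
        SSTableIdentityIterator iter = new SSTableIdentityIterator(cfs.metadata, in, key, 0, dataSize, true);
        PrecompactedRow row = new PrecompactedRow(controller, Collections.singletonList(iter));
        writer.append(row);
        writer.updateMaxTimestamp(row.maxTimestamp()); // append() does not update this itself
        cfs.updateRowCache(key, row.getFullColumnFamily());
    }
    else
    {
        writer.appendFromStream(key, cfs.metadata, dataSize, in);
        cfs.invalidateCachedRow(key); // any cached copy may now be stale
    }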