Author: slebresne
Date: Mon Sep  5 14:55:28 2011
New Revision: 1165306

URL: http://svn.apache.org/viewvc?rev=1165306&view=rev
Log:
Handle large rows with single-pass streaming
patch by yukim; reviewed by slebresne for CASSANDRA-3003

Modified:
    cassandra/trunk/CHANGES.txt
    cassandra/trunk/src/java/org/apache/cassandra/db/CounterColumn.java
    cassandra/trunk/src/java/org/apache/cassandra/db/context/CounterContext.java
    cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
    cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java

Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1165306&r1=1165305&r2=1165306&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Mon Sep  5 14:55:28 2011
@@ -12,7 +12,7 @@
  * don't bother persisting columns shadowed by a row tombstone (CASSANDRA-2589)
  * reset CF and SC deletion times after gc_grace (CASSANDRA-2317)
  * optimize away seek when compacting wide rows (CASSANDRA-2879)
- * single-pass streaming (CASSANDRA-2677)
+ * single-pass streaming (CASSANDRA-2677, 3003)
  * use reference counting for deleting sstables instead of relying on GC
    (CASSANDRA-2521)
  * store hints as serialized mutations instead of pointers to data row

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/CounterColumn.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/CounterColumn.java?rev=1165306&r1=1165305&r2=1165306&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/CounterColumn.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/CounterColumn.java Mon Sep  5 14:55:28 2011
@@ -21,7 +21,6 @@ package org.apache.cassandra.db;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.security.MessageDigest;
-import java.util.Map;
 
 import org.apache.log4j.Logger;
 
@@ -70,7 +69,9 @@ public class CounterColumn extends Colum
 
     public static CounterColumn create(ByteBuffer name, ByteBuffer value, long timestamp, long timestampOfLastDelete, boolean fromRemote)
     {
-        if (fromRemote)
+        // a negative #elt means we have to clear the delta
+        short count = value.getShort(value.position());
+        if (fromRemote || count < 0)
             value = CounterContext.instance().clearAllDelta(value);
         return new CounterColumn(name, value, timestamp, timestampOfLastDelete);
     }
@@ -285,4 +286,8 @@ public class CounterColumn extends Colum
         }
     }
 
+    public IColumn markDeltaToBeCleared()
+    {
+        return new CounterColumn(name, contextManager.markDeltaToBeCleared(value), timestamp, timestampOfLastDelete);
+    }
 }

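The change to create() above means the delta is cleared not only for columns flagged fromRemote, but also whenever the streamed counter context carries a negative element count. A minimal standalone sketch of that receive-side check, assuming only the layout fact visible in this patch (the context starts with a short #elt); the class and method names are hypothetical, and the actual clearing done by CounterContext.clearAllDelta() is not reproduced:

import java.nio.ByteBuffer;

// Illustrative sketch only; not part of the patch.
final class DeltaClearDecision
{
    // Mirrors the decision in CounterColumn.create(): clear deltas when the
    // column comes from a remote node, or when the context was marked by
    // markDeltaToBeCleared() (negative #elt).
    static boolean mustClearDelta(ByteBuffer counterContext, boolean fromRemote)
    {
        short elementCount = counterContext.getShort(counterContext.position());
        return fromRemote || elementCount < 0;
    }

    public static void main(String[] args)
    {
        ByteBuffer marked = ByteBuffer.allocate(2);
        marked.putShort(0, (short) -2); // two header elements, marked for clearing
        System.out.println(mustClearDelta(marked, false)); // prints: true
    }
}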
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/context/CounterContext.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/context/CounterContext.java?rev=1165306&r1=1165305&r2=1165306&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/context/CounterContext.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/context/CounterContext.java Mon Sep  5 14:55:28 2011
@@ -130,7 +130,7 @@ public class CounterContext implements I
 
     private static int headerLength(ByteBuffer context)
     {
-        return HEADER_SIZE_LENGTH + context.getShort(context.position()) * HEADER_ELT_LENGTH;
+        return HEADER_SIZE_LENGTH + Math.abs(context.getShort(context.position())) * HEADER_ELT_LENGTH;
     }
 
     private static int compareId(ByteBuffer bb1, int pos1, ByteBuffer bb2, int pos2)
@@ -442,6 +442,28 @@ public class CounterContext implements I
     }
 
     /**
+     * Mark the context so that its delta can be cleared later.
+     * Marking is done by multiplying #elt by -1, which preserves the header
+     * length and the #elt count so that all deltas can be cleared afterwards.
+     *
+     * @param context a counter context
+     * @return the context, marked so that its delta will be cleared later
+     */
+    public ByteBuffer markDeltaToBeCleared(ByteBuffer context)
+    {
+        int headerLength = headerLength(context);
+        if (headerLength == 0)
+            return context;
+
+        ByteBuffer marked = context.duplicate();
+        short count = context.getShort(context.position());
+        // negate #elt to mark the delta for clearing, without changing the header size.
+        if (count > 0)
+            marked.putShort(marked.position(), (short) (count * -1));
+        return marked;
+    }
+
+    /**
      * Remove all the delta of a context (i.e, set an empty header).
      *
      * @param context a counter context

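The javadoc above describes the trick: negating #elt marks the context, while Math.abs() in headerLength() keeps the header length and element count computable for both marked and unmarked contexts. A minimal round-trip sketch against a plain ByteBuffer, assuming only what the diff shows (a leading short #elt followed by fixed-size header elements); the constants stand in for HEADER_SIZE_LENGTH and HEADER_ELT_LENGTH and the values are made up:

import java.nio.ByteBuffer;

// Illustrative sketch only; not the real CounterContext code.
final class HeaderMarkSketch
{
    static final int HEADER_SIZE_LENGTH = 2; // the leading short holding #elt (hypothetical value)
    static final int HEADER_ELT_LENGTH = 2;  // size of one header element (hypothetical value)

    // Same shape as the patched headerLength(): Math.abs() makes the
    // computation work whether or not the context has been marked.
    static int headerLength(ByteBuffer context)
    {
        return HEADER_SIZE_LENGTH + Math.abs(context.getShort(context.position())) * HEADER_ELT_LENGTH;
    }

    // Same shape as markDeltaToBeCleared(): duplicate the buffer and negate #elt.
    // duplicate() shares the underlying bytes, as in the patch; only position and limit are independent.
    static ByteBuffer mark(ByteBuffer context)
    {
        ByteBuffer marked = context.duplicate();
        short count = context.getShort(context.position());
        if (count > 0)
            marked.putShort(marked.position(), (short) -count);
        return marked;
    }

    public static void main(String[] args)
    {
        ByteBuffer context = ByteBuffer.allocate(HEADER_SIZE_LENGTH + 3 * HEADER_ELT_LENGTH);
        context.putShort(0, (short) 3); // three header elements
        int before = headerLength(context);

        ByteBuffer marked = mark(context);
        // Thanks to Math.abs(), the header length is unchanged even though #elt is now negative.
        System.out.println(headerLength(marked) == before);         // prints: true
        System.out.println(marked.getShort(marked.position()) < 0); // prints: true
    }
}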
Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java?rev=1165306&r1=1165305&r2=1165306&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java Mon Sep  5 14:55:28 2011
@@ -36,7 +36,6 @@ import org.apache.cassandra.db.compactio
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.io.compress.CompressedSequentialWriter;
 import org.apache.cassandra.io.util.*;
-import org.apache.cassandra.io.util.SequentialWriter;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.BloomFilter;
 import org.apache.cassandra.utils.ByteBufferUtil;
@@ -194,6 +193,71 @@ public class SSTableWriter extends SSTab
         afterAppend(decoratedKey, currentPosition);
     }
 
+    public long appendFromStream(DecoratedKey key, CFMetaData metadata, long dataSize, DataInput in) throws IOException
+    {
+        long currentPosition = beforeAppend(key);
+        ByteBufferUtil.writeWithShortLength(key.key, dataFile.stream);
+        long dataStart = dataFile.getFilePointer();
+
+        // write row size
+        dataFile.stream.writeLong(dataSize);
+
+        // write BF
+        int bfSize = in.readInt();
+        dataFile.stream.writeInt(bfSize);
+        for (int i = 0; i < bfSize; i++)
+            dataFile.stream.writeByte(in.readByte());
+
+        // write index
+        int indexSize = in.readInt();
+        dataFile.stream.writeInt(indexSize);
+        for (int i = 0; i < indexSize; i++)
+            dataFile.stream.writeByte(in.readByte());
+
+        // cf data
+        dataFile.stream.writeInt(in.readInt());
+        dataFile.stream.writeLong(in.readLong());
+
+        // column size
+        int columnCount = in.readInt();
+        dataFile.stream.writeInt(columnCount);
+
+        // deserialize each column to obtain maxTimestamp and immediately serialize it.
+        long maxTimestamp = Long.MIN_VALUE;
+        ColumnFamily cf = ColumnFamily.create(metadata, ArrayBackedSortedColumns.FACTORY);
+        for (int i = 0; i < columnCount; i++)
+        {
+            // deserialize the column with fromRemote = false, in order to keep the size of the streamed column
+            IColumn column = cf.getColumnSerializer().deserialize(in, false, Integer.MIN_VALUE);
+            if (column instanceof CounterColumn)
+            {
+                column = ((CounterColumn) column).markDeltaToBeCleared();
+            }
+            else if (column instanceof SuperColumn)
+            {
+                SuperColumn sc = (SuperColumn) column;
+                for (IColumn subColumn : sc.getSubColumns())
+                {
+                    if (subColumn instanceof CounterColumn)
+                    {
+                        IColumn marked = ((CounterColumn) subColumn).markDeltaToBeCleared();
+                        sc.replace(subColumn, marked);
+                    }
+                }
+            }
+            maxTimestamp = Math.max(maxTimestamp, column.maxTimestamp());
+            cf.getColumnSerializer().serialize(column, dataFile.stream);
+        }
+
+        assert dataSize == dataFile.getFilePointer() - (dataStart + 8)
+                : "incorrect row data size " + dataSize + " written to " + dataFile.getPath() + "; correct is " + (dataFile.getFilePointer() - (dataStart + 8));
+        sstableMetadataCollector.updateMaxTimestamp(maxTimestamp);
+        sstableMetadataCollector.addRowSize(dataFile.getFilePointer() - currentPosition);
+        sstableMetadataCollector.addColumnCount(columnCount);
+        afterAppend(key, currentPosition);
+        return currentPosition;
+    }
+
     public void updateMaxTimestamp(long timestamp)
     {
         sstableMetadataCollector.updateMaxTimestamp(timestamp);

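appendFromStream() above copies the streamed row section by section: row size, bloom filter, index, column family metadata, column count, then the columns themselves. The bloom filter and index blocks are length-prefixed and copied verbatim, one byte at a time. A sketch of the same length-prefixed copy pattern using a buffer instead of single bytes; the helper name and buffer size are made up for illustration, this is not the patch's code:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;

// Illustrative sketch only; the patch copies these blocks byte by byte.
final class StreamCopyUtil
{
    // Read an int length, echo it to the output, then copy exactly that many
    // bytes from the incoming stream (same contract as the bloom filter and
    // index copy loops in appendFromStream).
    static int copyLengthPrefixedBlock(DataInput in, DataOutput out) throws IOException
    {
        int length = in.readInt();
        out.writeInt(length);
        byte[] buffer = new byte[4096];
        int remaining = length;
        while (remaining > 0)
        {
            int chunk = Math.min(buffer.length, remaining);
            in.readFully(buffer, 0, chunk);
            out.write(buffer, 0, chunk);
            remaining -= chunk;
        }
        return length;
    }

    public static void main(String[] args) throws IOException
    {
        // Build a fake length-prefixed block, then copy it.
        ByteArrayOutputStream source = new ByteArrayOutputStream();
        DataOutputStream sourceOut = new DataOutputStream(source);
        sourceOut.writeInt(3);
        sourceOut.write(new byte[]{ 1, 2, 3 });

        ByteArrayOutputStream copy = new ByteArrayOutputStream();
        DataInput in = new DataInputStream(new ByteArrayInputStream(source.toByteArray()));
        int copied = copyLengthPrefixedBlock(in, new DataOutputStream(copy));
        System.out.println(copied);      // prints: 3
        System.out.println(copy.size()); // prints: 7 (4-byte length + 3 bytes)
    }
}

Either form preserves the exact on-disk bytes, which is what lets the receiver write the row in a single pass without rebuilding it in memory.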
Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java?rev=1165306&r1=1165305&r2=1165306&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java Mon Sep  5 14:55:28 2011
@@ -122,64 +122,27 @@ public class IncomingStreamReader
                     in.reset(0);
                     key = SSTableReader.decodeKey(StorageService.getPartitioner(), localFile.desc, ByteBufferUtil.readWithShortLength(in));
                     long dataSize = SSTableReader.readRowSize(in, localFile.desc);
-                    ColumnFamily cf = null;
-                    if (cfs.metadata.getDefaultValidator().isCommutative())
+
+                    ColumnFamily cached = cfs.getRawCachedRow(key);
+                    if (cached != null && remoteFile.type == OperationType.AES && dataSize <= DatabaseDescriptor.getInMemoryCompactionLimit())
                     {
-                        // take care of counter column family
+                        // need to update row cache
                         if (controller == null)
                             controller = new CompactionController(cfs, Collections.<SSTableReader>emptyList(), Integer.MAX_VALUE, true);
                         SSTableIdentityIterator iter = new SSTableIdentityIterator(cfs.metadata, in, key, 0, dataSize, true);
-                        AbstractCompactedRow row = controller.getCompactedRow(iter);
+                        PrecompactedRow row = new PrecompactedRow(controller, Collections.singletonList(iter));
                         writer.append(row);
                         // row append does not update the max timestamp on its own
                         writer.updateMaxTimestamp(row.maxTimestamp());
 
-                        if (row instanceof PrecompactedRow)
-                        {
-                            // we do not purge so we should not get a null here
-                            cf = ((PrecompactedRow)row).getFullColumnFamily();
-                        }
+                        // update cache
+                        ColumnFamily cf = row.getFullColumnFamily();
+                        cfs.updateRowCache(key, cf);
                     }
                     else
                     {
-                        // skip BloomFilter
-                        IndexHelper.skipBloomFilter(in);
-                        // skip Index
-                        IndexHelper.skipIndex(in);
-
-                        // restore ColumnFamily
-                        cf = ColumnFamily.create(cfs.metadata);
-                        ColumnFamily.serializer().deserializeFromSSTableNoColumns(cf, in);
-                        ColumnFamily.serializer().deserializeColumns(in, cf, true);
-
-                        // write key and cf
-                        writer.append(key, cf);
-                    }
-
-                    // update cache
-                    ColumnFamily cached = cfs.getRawCachedRow(key);
-                    if (cached != null)
-                    {
-                        switch (remoteFile.type)
-                        {
-                            case AES:
-                                if (dataSize > DatabaseDescriptor.getInMemoryCompactionLimit())
-                                {
-                                    // We have a key in cache for a very big row, that is fishy. We don't fail here however because that would prevent the sstable
-                                    // from being build (and there is no real point anyway), so we just invalidate the row for correction and log a warning.
-                                    logger.warn("Found a cached row over the in memory compaction limit during post-streaming rebuilt; it is highly recommended to avoid huge row on column family with row cache enabled.");
-                                    cfs.invalidateCachedRow(key);
-                                }
-                                else
-                                {
-                                    assert cf != null;
-                                    cfs.updateRowCache(key, cf);
-                                }
-                                break;
-                            default:
-                                cfs.invalidateCachedRow(key);
-                                break;
-                        }
+                        writer.appendFromStream(key, cfs.metadata, dataSize, in);
+                        cfs.invalidateCachedRow(key);
                     }
 
                     bytesRead += in.getBytesRead();


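After this change the reader has exactly two paths: rows that are already in the row cache, arrive over an AES (repair) stream, and fit under the in-memory compaction limit are still rebuilt through PrecompactedRow so the cache can be refreshed; every other row is written straight to the new sstable with appendFromStream() and its cache entry is invalidated. A minimal sketch of that decision, with plain booleans standing in for the cache lookup and the OperationType.AES check:

// Illustrative summary of the branch in IncomingStreamReader; not the patch's code.
final class StreamPathChoice
{
    // true  -> rebuild the row in memory (PrecompactedRow) and refresh the row cache
    // false -> stream it straight into the sstable (appendFromStream) and invalidate the cache entry
    static boolean useInMemoryPath(boolean rowIsCached, boolean isRepairStream, long dataSize, long inMemoryCompactionLimit)
    {
        return rowIsCached && isRepairStream && dataSize <= inMemoryCompactionLimit;
    }
}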