Author: jbellis
Date: Fri Aug  5 20:22:29 2011
New Revision: 1154371

URL: http://svn.apache.org/viewvc?rev=1154371&view=rev
Log:
merge from 0.8

Modified:
    cassandra/trunk/   (props changed)
    cassandra/trunk/conf/cassandra.yaml
    cassandra/trunk/contrib/   (props changed)
    
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
   (props changed)
    
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
   (props changed)
    
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
   (props changed)
    
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
   (props changed)
    
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
   (props changed)
    
cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
    
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
    
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
    
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
    
cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java

Propchange: cassandra/trunk/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Aug  5 20:22:29 2011
@@ -1,7 +1,7 @@
 
/cassandra/branches/cassandra-0.6:922689-1052356,1052358-1053452,1053454,1053456-1131291
 /cassandra/branches/cassandra-0.7:1026516-1151306
 /cassandra/branches/cassandra-0.7.0:1053690-1055654
-/cassandra/branches/cassandra-0.8:1090934-1125013,1125019-1153724,1154006,1154219
+/cassandra/branches/cassandra-0.8:1090934-1125013,1125019-1153724,1154006,1154219,1154369
 /cassandra/branches/cassandra-0.8.0:1125021-1130369
 /cassandra/branches/cassandra-0.8.1:1101014-1125018
 /cassandra/tags/cassandra-0.7.0-rc3:1051699-1053689

Modified: cassandra/trunk/conf/cassandra.yaml
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/conf/cassandra.yaml?rev=1154371&r1=1154370&r2=1154371&view=diff
==============================================================================
--- cassandra/trunk/conf/cassandra.yaml (original)
+++ cassandra/trunk/conf/cassandra.yaml Fri Aug  5 20:22:29 2011
@@ -279,13 +279,13 @@ column_index_size_in_kb: 64
 # will be logged specifying the row key.
 in_memory_compaction_limit_in_mb: 64
 
-# Number of compaction threads (NOT including validation "compactions"
-# for anti-entropy repair). This default to the number of processors,
-# enabling multiple compactions to execute at once. Using more than one
-# thread is highly recommended to preserve read performance in a mixed
-# read/write workload as this avoids sstables from accumulating during long
-# running compactions. The default is usually fine and if you experience
-# problems with compaction running too slowly or too fast, you should look at
+# Number of simultaneous compactions to allow, NOT including
+# validation "compactions" for anti-entropy repair. This defaults to
+# the number of cores. This can help preserve read performance in a
+# mixed read/write workload, by mitigating the tendency of small
+# sstables to accumulate during a single long-running compaction. The
+# default is usually fine and if you experience problems with
+# compaction running too slowly or too fast, you should look at
 # compaction_throughput_mb_per_sec first.
 #
 # Uncomment to make compaction mono-threaded.

Propchange: cassandra/trunk/contrib/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Aug  5 20:22:29 2011
@@ -1,7 +1,7 @@
 
/cassandra/branches/cassandra-0.6/contrib:922689-1052356,1052358-1053452,1053454,1053456-1068009
 /cassandra/branches/cassandra-0.7/contrib:1026516-1151306
 /cassandra/branches/cassandra-0.7.0/contrib:1053690-1055654
-/cassandra/branches/cassandra-0.8/contrib:1090934-1125013,1125019-1153724,1154006,1154219
+/cassandra/branches/cassandra-0.8/contrib:1090934-1125013,1125019-1153724,1154006,1154219,1154369
 /cassandra/branches/cassandra-0.8.0/contrib:1125021-1130369
 /cassandra/branches/cassandra-0.8.1/contrib:1101014-1125018
 /cassandra/tags/cassandra-0.7.0-rc3/contrib:1051699-1053689

Propchange: 
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Aug  5 20:22:29 2011
@@ -1,7 +1,7 @@
 
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-1052356,1052358-1053452,1053454,1053456-1131291
 
/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1151306
 
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1053690-1055654
-/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1090934-1125013,1125019-1153724,1154006,1154219
+/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1090934-1125013,1125019-1153724,1154006,1154219,1154369
 
/cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1125021-1130369
 
/cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1101014-1125018
 
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1051699-1053689

Propchange: 
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Aug  5 20:22:29 2011
@@ -1,7 +1,7 @@
 
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-1052356,1052358-1053452,1053454,1053456-1131291
 
/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1151306
 
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1053690-1055654
-/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1090934-1125013,1125019-1153724,1154006,1154219
+/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1090934-1125013,1125019-1153724,1154006,1154219,1154369
 
/cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1125021-1130369
 
/cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1101014-1125018
 
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1051699-1053689

Propchange: 
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Aug  5 20:22:29 2011
@@ -1,7 +1,7 @@
 
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-1052356,1052358-1053452,1053454,1053456-1131291
 
/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1151306
 
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1053690-1055654
-/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1090934-1125013,1125019-1153724,1154006,1154219
+/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1090934-1125013,1125019-1153724,1154006,1154219,1154369
 
/cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1125021-1130369
 
/cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1101014-1125018
 
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1051699-1053689

Propchange: 
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Aug  5 20:22:29 2011
@@ -1,7 +1,7 @@
 
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-1052356,1052358-1053452,1053454,1053456-1131291
 
/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1151306
 
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1053690-1055654
-/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1090934-1125013,1125019-1153724,1154006,1154219
+/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1090934-1125013,1125019-1153724,1154006,1154219,1154369
 
/cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1125021-1130369
 
/cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1101014-1125018
 
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1051699-1053689

Propchange: 
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Aug  5 20:22:29 2011
@@ -1,7 +1,7 @@
 
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-1052356,1052358-1053452,1053454,1053456-1131291
 
/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1151306
 
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1053690-1055654
-/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1090934-1125013,1125019-1153724,1154006,1154219
+/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1090934-1125013,1125019-1153724,1154006,1154219,1154369
 
/cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1125021-1130369
 
/cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1101014-1125018
 
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1051699-1053689

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java?rev=1154371&r1=1154370&r2=1154371&view=diff
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
 (original)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
 Fri Aug  5 20:22:29 2011
@@ -46,6 +46,34 @@ import org.slf4j.LoggerFactory;
 public class DebuggableThreadPoolExecutor extends ThreadPoolExecutor
 {
     protected static Logger logger = 
LoggerFactory.getLogger(DebuggableThreadPoolExecutor.class);
+    public static final RejectedExecutionHandler blockingExecutionHandler = 
new RejectedExecutionHandler()
+    {
+        public void rejectedExecution(Runnable task, ThreadPoolExecutor 
executor)
+        {
+            ((DebuggableThreadPoolExecutor) executor).onInitialRejection(task);
+            BlockingQueue<Runnable> queue = executor.getQueue();
+            while (true)
+            {
+                if (executor.isShutdown())
+                {
+                    ((DebuggableThreadPoolExecutor) 
executor).onFinalRejection(task);
+                    throw new RejectedExecutionException("ThreadPoolExecutor 
has shut down");
+                }
+                try
+                {
+                    if (queue.offer(task, 1000, TimeUnit.MILLISECONDS))
+                    {
+                        ((DebuggableThreadPoolExecutor) 
executor).onFinalAccept(task);
+                        break;
+                    }
+                }
+                catch (InterruptedException e)
+                {
+                    throw new AssertionError(e);
+                }
+            }
+        }
+    };
 
     public DebuggableThreadPoolExecutor(String threadPoolName, int priority)
     {
@@ -67,34 +95,7 @@ public class DebuggableThreadPoolExecuto
         // we'll just override this with a handler that retries until it gets 
in.  ugly, but effective.
         // (there is an extensive analysis of the options here at
         //  
http://today.java.net/pub/a/today/2008/10/23/creating-a-notifying-blocking-thread-pool-executor.html)
-        this.setRejectedExecutionHandler(new RejectedExecutionHandler()
-        {
-            public void rejectedExecution(Runnable task, ThreadPoolExecutor 
executor)
-            {
-                
((DebuggableThreadPoolExecutor)executor).onInitialRejection(task);
-                BlockingQueue<Runnable> queue = executor.getQueue();
-                while (true)
-                {
-                    if (executor.isShutdown())
-                    {
-                        
((DebuggableThreadPoolExecutor)executor).onFinalRejection(task);
-                        throw new 
RejectedExecutionException("ThreadPoolExecutor has shut down");
-                    }
-                    try
-                    {
-                        if (queue.offer(task, 1000, TimeUnit.MILLISECONDS))
-                        {
-                            
((DebuggableThreadPoolExecutor)executor).onFinalAccept(task);
-                            break;
-                        }
-                    }
-                    catch (InterruptedException e)
-                    {
-                        throw new AssertionError(e);
-                    }
-                }
-            }
-        });
+        this.setRejectedExecutionHandler(blockingExecutionHandler);
     }
 
     protected void onInitialRejection(Runnable task) {}

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java?rev=1154371&r1=1154370&r2=1154371&view=diff
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
 (original)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
 Fri Aug  5 20:22:29 2011
@@ -181,7 +181,7 @@ implements CloseableIterator<AbstractCom
 
             try
             {
-                AbstractCompactedRow compactedRow = 
controller.getCompactedRow(rows);
+                AbstractCompactedRow compactedRow = 
controller.getCompactedRow(new ArrayList<SSTableIdentityIterator>(rows));
                 if (compactedRow.isEmpty())
                 {
                     controller.invalidateCachedRow(compactedRow.key);

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java?rev=1154371&r1=1154370&r2=1154371&view=diff
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
 (original)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
 Fri Aug  5 20:22:29 2011
@@ -25,14 +25,16 @@ import java.io.DataOutput;
 import java.io.IOError;
 import java.io.IOException;
 import java.security.MessageDigest;
-import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 
 import com.google.common.base.Predicates;
 import com.google.common.collect.Iterators;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.columniterator.IColumnIterator;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
 import org.apache.cassandra.io.util.DataOutputBuffer;
@@ -52,6 +54,8 @@ import org.apache.cassandra.utils.MergeI
  */
 public class LazilyCompactedRow extends AbstractCompactedRow implements 
IIterableColumns
 {
+    private static Logger logger = 
LoggerFactory.getLogger(LazilyCompactedRow.class);
+
     private final List<SSTableIdentityIterator> rows;
     private final CompactionController controller;
     private final boolean shouldPurge;
@@ -65,11 +69,11 @@ public class LazilyCompactedRow extends 
     public LazilyCompactedRow(CompactionController controller, 
List<SSTableIdentityIterator> rows)
     {
         super(rows.get(0).getKey());
+        this.rows = rows;
         this.controller = controller;
         this.shouldPurge = controller.shouldPurge(key);
-        this.rows = new ArrayList<SSTableIdentityIterator>(rows);
 
-        for (SSTableIdentityIterator row : rows)
+        for (IColumnIterator row : rows)
         {
             ColumnFamily cf = row.getColumnFamily();
 
@@ -83,9 +87,10 @@ public class LazilyCompactedRow extends 
         headerBuffer = new DataOutputBuffer();
         ColumnIndexer.serialize(this, headerBuffer);
         // reach into the reducer used during iteration to get column count, 
size, max column timestamp
-        columnCount = reducer.size;
-        columnSerializedSize = reducer.serializedSize;
-        maxTimestamp = reducer.maxTimestampSeen;
+        // (however, if there are zero columns, iterator() will not be called 
by ColumnIndexer and reducer will be null)
+        columnCount = reducer == null ? 0 : reducer.size;
+        columnSerializedSize = reducer == null ? 0 : reducer.serializedSize;
+        maxTimestamp = reducer == null ? Long.MIN_VALUE : 
reducer.maxTimestampSeen;
         reducer = null;
     }
 
@@ -95,6 +100,9 @@ public class LazilyCompactedRow extends 
         ColumnFamily.serializer().serializeCFInfo(emptyColumnFamily, clockOut);
 
         long dataSize = headerBuffer.getLength() + clockOut.getLength() + 
columnSerializedSize;
+        if (logger.isDebugEnabled())
+            logger.debug(String.format("header / clock / column sizes are %s / 
%s / %s",
+                         headerBuffer.getLength(), clockOut.getLength(), 
columnSerializedSize));
         assert dataSize > 0;
         out.writeLong(dataSize);
         out.write(headerBuffer.getData(), 0, headerBuffer.getLength());
@@ -107,6 +115,9 @@ public class LazilyCompactedRow extends 
             IColumn column = iter.next();
             emptyColumnFamily.getColumnSerializer().serialize(column, out);
         }
+        long secondPassColumnSize = reducer == null ? 0 : 
reducer.serializedSize;
+        assert secondPassColumnSize == columnSerializedSize
+               : "originally calculated column size of " + 
columnSerializedSize + " but now it is " + secondPassColumnSize;
 
         return dataSize;
     }
@@ -187,18 +198,13 @@ public class LazilyCompactedRow extends 
 
         protected IColumn getReduced()
         {
-            assert container != null;
-            IColumn reduced = container.iterator().next();
-            ColumnFamily purged = shouldPurge ? 
ColumnFamilyStore.removeDeleted(container, controller.gcBefore) : container;
-            if (shouldPurge && purged != null && 
purged.metadata().getDefaultValidator().isCommutative())
-            {
-                CounterColumn.removeOldShards(purged, controller.gcBefore);
-            }
+            ColumnFamily purged = 
PrecompactedRow.removeDeletedAndOldShards(shouldPurge, controller, container);
             if (purged == null || !purged.iterator().hasNext())
             {
                 container.clear();
                 return null;
             }
+            IColumn reduced = purged.iterator().next();
             container.clear();
             serializedSize += reduced.serializedSize();
             size++;

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java?rev=1154371&r1=1154370&r2=1154371&view=diff
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
 (original)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
 Fri Aug  5 20:22:29 2011
@@ -36,6 +36,7 @@ import org.apache.cassandra.db.CounterCo
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
 import org.apache.cassandra.io.util.DataOutputBuffer;
+import sun.tools.tree.ThisExpression;
 
 /**
  * PrecompactedRow merges its rows in its constructor in memory.
@@ -55,11 +56,28 @@ public class PrecompactedRow extends Abs
         this.gcBefore = Integer.MAX_VALUE;
     }
 
+    public static ColumnFamily removeDeletedAndOldShards(DecoratedKey key, 
CompactionController controller, ColumnFamily cf)
+    {
+        return removeDeletedAndOldShards(controller.shouldPurge(key), 
controller, cf);
+    }
+
+    public static ColumnFamily removeDeletedAndOldShards(boolean shouldPurge, 
CompactionController controller, ColumnFamily cf)
+    {
+        ColumnFamily compacted = shouldPurge ? 
ColumnFamilyStore.removeDeleted(cf, controller.gcBefore) : cf;
+        if (shouldPurge && compacted != null && 
compacted.metadata().getDefaultValidator().isCommutative())
+            CounterColumn.removeOldShards(compacted, controller.gcBefore);
+        return compacted;
+    }
+
     public PrecompactedRow(CompactionController controller, 
List<SSTableIdentityIterator> rows)
     {
         super(rows.get(0).getKey());
-        this.gcBefore = controller.gcBefore;
+        gcBefore = controller.gcBefore;
+        compactedCf = removeDeletedAndOldShards(rows.get(0).getKey(), 
controller, merge(rows));
+    }
 
+    private static ColumnFamily merge(List<SSTableIdentityIterator> rows)
+    {
         ColumnFamily cf = null;
         for (SSTableIdentityIterator row : rows)
         {
@@ -70,7 +88,7 @@ public class PrecompactedRow extends Abs
             }
             catch (IOException e)
             {
-                logger.error("Skipping row " + key + " in " + row.getPath(), 
e);
+                logger.error("Skipping row " + row.getKey() + " in " + 
row.getPath(), e);
                 continue;
             }
             if (cf == null)
@@ -82,48 +100,38 @@ public class PrecompactedRow extends Abs
                 cf.addAll(thisCF);
             }
         }
-        boolean shouldPurge = controller.shouldPurge(key);
-        compactedCf = shouldPurge ? ColumnFamilyStore.removeDeleted(cf, 
controller.gcBefore) : cf;
-        if (shouldPurge && compactedCf != null && 
compactedCf.metadata().getDefaultValidator().isCommutative())
-        {
-            CounterColumn.removeOldShards(compactedCf, controller.gcBefore);
-        }
+        return cf;
     }
 
     public long write(DataOutput out) throws IOException
     {
-        if (compactedCf != null)
-        {
-            DataOutputBuffer buffer = new DataOutputBuffer();
-            DataOutputBuffer headerBuffer = new DataOutputBuffer();
-            ColumnIndexer.serialize(compactedCf, headerBuffer);
-            ColumnFamily.serializer().serializeForSSTable(compactedCf, buffer);
-            int dataSize = headerBuffer.getLength() + buffer.getLength();
-            out.writeLong(dataSize);
-            out.write(headerBuffer.getData(), 0, headerBuffer.getLength());
-            out.write(buffer.getData(), 0, buffer.getLength());
-            return dataSize;
-        }
-        return 0;
+        assert compactedCf != null;
+        DataOutputBuffer buffer = new DataOutputBuffer();
+        DataOutputBuffer headerBuffer = new DataOutputBuffer();
+        ColumnIndexer.serialize(compactedCf, headerBuffer);
+        ColumnFamily.serializer().serializeForSSTable(compactedCf, buffer);
+        int dataSize = headerBuffer.getLength() + buffer.getLength();
+        out.writeLong(dataSize);
+        out.write(headerBuffer.getData(), 0, headerBuffer.getLength());
+        out.write(buffer.getData(), 0, buffer.getLength());
+        return dataSize;
     }
 
     public void update(MessageDigest digest)
     {
-        if (compactedCf != null)
+        assert compactedCf != null;
+        DataOutputBuffer buffer = new DataOutputBuffer();
+        try
         {
-            DataOutputBuffer buffer = new DataOutputBuffer();
-            try
-            {
-                ColumnFamily.serializer().serializeCFInfo(compactedCf, buffer);
-                buffer.writeInt(compactedCf.getColumnCount());
-                digest.update(buffer.getData(), 0, buffer.getLength());
-            }
-            catch (IOException e)
-            {
-                throw new RuntimeException(e);
-            }
-            compactedCf.updateDigest(digest);
+            ColumnFamily.serializer().serializeCFInfo(compactedCf, buffer);
+            buffer.writeInt(compactedCf.getColumnCount());
+            digest.update(buffer.getData(), 0, buffer.getLength());
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException(e);
         }
+        compactedCf.updateDigest(digest);
     }
 
     public boolean isEmpty()

Modified: 
cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java?rev=1154371&r1=1154370&r2=1154371&view=diff
==============================================================================
--- 
cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
 (original)
+++ 
cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
 Fri Aug  5 20:22:29 2011
@@ -32,6 +32,7 @@ import org.apache.cassandra.CleanupHelpe
 import org.apache.cassandra.Util;
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.compaction.PrecompactedRow;
 import org.apache.cassandra.dht.IPartitioner;
@@ -150,7 +151,8 @@ public abstract class AntiEntropyService
         validator.prepare(store);
 
         // add a row
-        validator.add(new PrecompactedRow(new DecoratedKey(mid, 
ByteBufferUtil.bytes("inconceivable!")), null));
+        validator.add(new PrecompactedRow(new DecoratedKey(mid, 
ByteBufferUtil.bytes("inconceivable!")),
+                                          new 
ColumnFamily(DatabaseDescriptor.getCFMetaData(tablename, cfname))));
         validator.completeTree();
 
         // confirm that the tree was validated


Reply via email to