Author: jbellis
Date: Fri Aug  5 20:19:45 2011
New Revision: 1154369

URL: http://svn.apache.org/viewvc?rev=1154369&view=rev
Log:
refactorings and corner-case bug fixes:
- avoid modifying the List of rows after passing it to a LazilyCompactedRow
- account for the possibility that all data compacted by a LazilyCompactedRow (LCR) has expired
- remove code duplication in the shouldPurge handling
patch by jbellis; reviewed by slebresne for CASSANDRA-2901

Modified:
    cassandra/branches/cassandra-0.8/CHANGES.txt
    cassandra/branches/cassandra-0.8/conf/cassandra.yaml
    
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
    
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
    
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
    
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
    
cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java

Modified: cassandra/branches/cassandra-0.8/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/CHANGES.txt?rev=1154369&r1=1154368&r2=1154369&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.8/CHANGES.txt Fri Aug  5 20:19:45 2011
@@ -1,6 +1,7 @@
 0.8.4
  * include files-to-be-streamed in StreamInSession.getSources (CASSANDRA-2972)
 
+
 0.8.3
  * add ability to drop local reads/writes that are going to timeout
    (CASSANDRA-2943)

Modified: cassandra/branches/cassandra-0.8/conf/cassandra.yaml
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/conf/cassandra.yaml?rev=1154369&r1=1154368&r2=1154369&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/conf/cassandra.yaml (original)
+++ cassandra/branches/cassandra-0.8/conf/cassandra.yaml Fri Aug  5 20:19:45 
2011
@@ -272,13 +272,13 @@ column_index_size_in_kb: 64
 # will be logged specifying the row key.
 in_memory_compaction_limit_in_mb: 64
 
-# Number of compaction threads (NOT including validation "compactions"
-# for anti-entropy repair). This default to the number of processors,
-# enabling multiple compactions to execute at once. Using more than one
-# thread is highly recommended to preserve read performance in a mixed
-# read/write workload as this avoids sstables from accumulating during long
-# running compactions. The default is usually fine and if you experience
-# problems with compaction running too slowly or too fast, you should look at
+# Number of simultaneous compactions to allow, NOT including
+# validation "compactions" for anti-entropy repair. This defaults to
+# the number of cores. This can help preserve read performance in a
+# mixed read/write workload, by mitigating the tendency of small
+# sstables to accumulate during a single long-running compaction. The
+# default is usually fine and if you experience problems with
+# compaction running too slowly or too fast, you should look at
 # compaction_throughput_mb_per_sec first.
 #
 # Uncomment to make compaction mono-threaded.

Modified: 
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java?rev=1154369&r1=1154368&r2=1154369&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
 (original)
+++ 
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
 Fri Aug  5 20:19:45 2011
@@ -46,6 +46,34 @@ import org.slf4j.LoggerFactory;
 public class DebuggableThreadPoolExecutor extends ThreadPoolExecutor
 {
     protected static Logger logger = 
LoggerFactory.getLogger(DebuggableThreadPoolExecutor.class);
+    public static final RejectedExecutionHandler blockingExecutionHandler = 
new RejectedExecutionHandler()
+    {
+        public void rejectedExecution(Runnable task, ThreadPoolExecutor 
executor)
+        {
+            ((DebuggableThreadPoolExecutor) executor).onInitialRejection(task);
+            BlockingQueue<Runnable> queue = executor.getQueue();
+            while (true)
+            {
+                if (executor.isShutdown())
+                {
+                    ((DebuggableThreadPoolExecutor) 
executor).onFinalRejection(task);
+                    throw new RejectedExecutionException("ThreadPoolExecutor 
has shut down");
+                }
+                try
+                {
+                    if (queue.offer(task, 1000, TimeUnit.MILLISECONDS))
+                    {
+                        ((DebuggableThreadPoolExecutor) 
executor).onFinalAccept(task);
+                        break;
+                    }
+                }
+                catch (InterruptedException e)
+                {
+                    throw new AssertionError(e);
+                }
+            }
+        }
+    };
 
     public DebuggableThreadPoolExecutor(String threadPoolName, int priority)
     {
@@ -67,34 +95,7 @@ public class DebuggableThreadPoolExecuto
         // we'll just override this with a handler that retries until it gets 
in.  ugly, but effective.
         // (there is an extensive analysis of the options here at
         //  
http://today.java.net/pub/a/today/2008/10/23/creating-a-notifying-blocking-thread-pool-executor.html)
-        this.setRejectedExecutionHandler(new RejectedExecutionHandler()
-        {
-            public void rejectedExecution(Runnable task, ThreadPoolExecutor 
executor)
-            {
-                
((DebuggableThreadPoolExecutor)executor).onInitialRejection(task);
-                BlockingQueue<Runnable> queue = executor.getQueue();
-                while (true)
-                {
-                    if (executor.isShutdown())
-                    {
-                        
((DebuggableThreadPoolExecutor)executor).onFinalRejection(task);
-                        throw new 
RejectedExecutionException("ThreadPoolExecutor has shut down");
-                    }
-                    try
-                    {
-                        if (queue.offer(task, 1000, TimeUnit.MILLISECONDS))
-                        {
-                            
((DebuggableThreadPoolExecutor)executor).onFinalAccept(task);
-                            break;
-                        }
-                    }
-                    catch (InterruptedException e)
-                    {
-                        throw new AssertionError(e);
-                    }
-                }
-            }
-        });
+        this.setRejectedExecutionHandler(blockingExecutionHandler);
     }
 
     protected void onInitialRejection(Runnable task) {}

Modified: 
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java?rev=1154369&r1=1154368&r2=1154369&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
 (original)
+++ 
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
 Fri Aug  5 20:19:45 2011
@@ -120,7 +120,7 @@ implements Closeable, CompactionInfo.Hol
 
         try
         {
-            AbstractCompactedRow compactedRow = 
controller.getCompactedRow(rows);
+            AbstractCompactedRow compactedRow = controller.getCompactedRow(new 
ArrayList<SSTableIdentityIterator>(rows));
             if (compactedRow.isEmpty())
             {
                 controller.invalidateCachedRow(compactedRow.key);

Modified: 
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java?rev=1154369&r1=1154368&r2=1154369&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
 (original)
+++ 
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
 Fri Aug  5 20:19:45 2011
@@ -25,7 +25,6 @@ import java.io.DataOutput;
 import java.io.IOError;
 import java.io.IOException;
 import java.security.MessageDigest;
-import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 
@@ -33,7 +32,11 @@ import com.google.common.base.Predicates
 import com.google.common.collect.Iterators;
 import org.apache.commons.collections.iterators.CollatingIterator;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.columniterator.IColumnIterator;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
 import org.apache.cassandra.io.util.DataOutputBuffer;
@@ -53,23 +56,25 @@ import org.apache.cassandra.utils.Reduci
  */
 public class LazilyCompactedRow extends AbstractCompactedRow implements 
IIterableColumns
 {
+    private static Logger logger = 
LoggerFactory.getLogger(LazilyCompactedRow.class);
+
     private final List<SSTableIdentityIterator> rows;
     private final CompactionController controller;
     private final boolean shouldPurge;
     private final DataOutputBuffer headerBuffer;
     private ColumnFamily emptyColumnFamily;
-    private LazyColumnIterator iter;
+    private LazyColumnIterator reducer;
     private int columnCount;
     private long columnSerializedSize;
 
     public LazilyCompactedRow(CompactionController controller, 
List<SSTableIdentityIterator> rows)
     {
         super(rows.get(0).getKey());
+        this.rows = rows;
         this.controller = controller;
         this.shouldPurge = controller.shouldPurge(key);
-        this.rows = new ArrayList<SSTableIdentityIterator>(rows);
 
-        for (SSTableIdentityIterator row : rows)
+        for (IColumnIterator row : rows)
         {
             ColumnFamily cf = row.getColumnFamily();
 
@@ -83,9 +88,10 @@ public class LazilyCompactedRow extends 
         headerBuffer = new DataOutputBuffer();
         ColumnIndexer.serialize(this, headerBuffer);
         // reach into iterator used by ColumnIndexer to get column count and 
size
-        columnCount = iter.size;
-        columnSerializedSize = iter.serializedSize;
-        iter = null;
+        // (however, if there are zero columns, iterator() will not be called 
by ColumnIndexer and reducer will be null)
+        columnCount = reducer == null ? 0 : reducer.size;
+        columnSerializedSize = reducer == null ? 0 : reducer.serializedSize;
+        reducer = null;
     }
 
     public void write(DataOutput out) throws IOException
@@ -94,6 +100,9 @@ public class LazilyCompactedRow extends 
         ColumnFamily.serializer().serializeCFInfo(emptyColumnFamily, clockOut);
 
         long dataSize = headerBuffer.getLength() + clockOut.getLength() + 
columnSerializedSize;
+        if (logger.isDebugEnabled())
+            logger.debug(String.format("header / clock / column sizes are %s / 
%s / %s",
+                         headerBuffer.getLength(), clockOut.getLength(), 
columnSerializedSize));
         assert dataSize > 0;
         out.writeLong(dataSize);
         out.write(headerBuffer.getData(), 0, headerBuffer.getLength());
@@ -106,6 +115,9 @@ public class LazilyCompactedRow extends 
             IColumn column = iter.next();
             emptyColumnFamily.getColumnSerializer().serialize(column, out);
         }
+        long secondPassColumnSize = reducer == null ? 0 : 
reducer.serializedSize;
+        assert secondPassColumnSize == columnSerializedSize
+               : "originally calculated column size of " + 
columnSerializedSize + " but now it is " + secondPassColumnSize;
     }
 
     public void update(MessageDigest digest)
@@ -157,8 +169,8 @@ public class LazilyCompactedRow extends 
         {
             row.reset();
         }
-        iter = new LazyColumnIterator(new 
CollatingIterator(getComparator().columnComparator, rows));
-        return Iterators.filter(iter, Predicates.notNull());
+        reducer = new LazyColumnIterator(new 
CollatingIterator(getComparator().columnComparator, rows));
+        return Iterators.filter(reducer, Predicates.notNull());
     }
 
     public int columnCount()
@@ -190,18 +202,13 @@ public class LazilyCompactedRow extends 
 
         protected IColumn getReduced()
         {
-            assert container != null;
-            IColumn reduced = container.iterator().next();
-            ColumnFamily purged = shouldPurge ? 
ColumnFamilyStore.removeDeleted(container, controller.gcBefore) : container;
-            if (shouldPurge && purged != null && 
purged.metadata().getDefaultValidator().isCommutative())
-            {
-                CounterColumn.removeOldShards(purged, controller.gcBefore);
-            }
+            ColumnFamily purged = 
PrecompactedRow.removeDeletedAndOldShards(shouldPurge, controller, container);
             if (purged == null || !purged.iterator().hasNext())
             {
                 container.clear();
                 return null;
             }
+            IColumn reduced = purged.iterator().next();
             container.clear();
             serializedSize += reduced.serializedSize();
             size++;

Modified: 
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java?rev=1154369&r1=1154368&r2=1154369&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
 (original)
+++ 
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
 Fri Aug  5 20:19:45 2011
@@ -36,6 +36,7 @@ import org.apache.cassandra.db.CounterCo
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
 import org.apache.cassandra.io.util.DataOutputBuffer;
+import sun.tools.tree.ThisExpression;
 
 /**
  * PrecompactedRow merges its rows in its constructor in memory.
@@ -55,11 +56,28 @@ public class PrecompactedRow extends Abs
         this.gcBefore = Integer.MAX_VALUE;
     }
 
+    public static ColumnFamily removeDeletedAndOldShards(DecoratedKey key, 
CompactionController controller, ColumnFamily cf)
+    {
+        return removeDeletedAndOldShards(controller.shouldPurge(key), 
controller, cf);
+    }
+
+    public static ColumnFamily removeDeletedAndOldShards(boolean shouldPurge, 
CompactionController controller, ColumnFamily cf)
+    {
+        ColumnFamily compacted = shouldPurge ? 
ColumnFamilyStore.removeDeleted(cf, controller.gcBefore) : cf;
+        if (shouldPurge && compacted != null && 
compacted.metadata().getDefaultValidator().isCommutative())
+            CounterColumn.removeOldShards(compacted, controller.gcBefore);
+        return compacted;
+    }
+
     public PrecompactedRow(CompactionController controller, 
List<SSTableIdentityIterator> rows)
     {
         super(rows.get(0).getKey());
-        this.gcBefore = controller.gcBefore;
+        gcBefore = controller.gcBefore;
+        compactedCf = removeDeletedAndOldShards(rows.get(0).getKey(), 
controller, merge(rows));
+    }
 
+    private static ColumnFamily merge(List<SSTableIdentityIterator> rows)
+    {
         ColumnFamily cf = null;
         for (SSTableIdentityIterator row : rows)
         {
@@ -70,7 +88,7 @@ public class PrecompactedRow extends Abs
             }
             catch (IOException e)
             {
-                logger.error("Skipping row " + key + " in " + row.getPath(), 
e);
+                logger.error("Skipping row " + row.getKey() + " in " + 
row.getPath(), e);
                 continue;
             }
             if (cf == null)
@@ -82,45 +100,36 @@ public class PrecompactedRow extends Abs
                 cf.addAll(thisCF);
             }
         }
-        boolean shouldPurge = controller.shouldPurge(key);
-        compactedCf = shouldPurge ? ColumnFamilyStore.removeDeleted(cf, 
controller.gcBefore) : cf;
-        if (shouldPurge && compactedCf != null && 
compactedCf.metadata().getDefaultValidator().isCommutative())
-        {
-            CounterColumn.removeOldShards(compactedCf, controller.gcBefore);
-        }
+        return cf;
     }
 
     public void write(DataOutput out) throws IOException
     {
-        if (compactedCf != null)
-        {
-            DataOutputBuffer buffer = new DataOutputBuffer();
-            DataOutputBuffer headerBuffer = new DataOutputBuffer();
-            ColumnIndexer.serialize(compactedCf, headerBuffer);
-            ColumnFamily.serializer().serializeForSSTable(compactedCf, buffer);
-            out.writeLong(headerBuffer.getLength() + buffer.getLength());
-            out.write(headerBuffer.getData(), 0, headerBuffer.getLength());
-            out.write(buffer.getData(), 0, buffer.getLength());
-        }
+        assert compactedCf != null;
+        DataOutputBuffer buffer = new DataOutputBuffer();
+        DataOutputBuffer headerBuffer = new DataOutputBuffer();
+        ColumnIndexer.serialize(compactedCf, headerBuffer);
+        ColumnFamily.serializer().serializeForSSTable(compactedCf, buffer);
+        out.writeLong(headerBuffer.getLength() + buffer.getLength());
+        out.write(headerBuffer.getData(), 0, headerBuffer.getLength());
+        out.write(buffer.getData(), 0, buffer.getLength());
     }
 
     public void update(MessageDigest digest)
     {
-        if (compactedCf != null)
+        assert compactedCf != null;
+        DataOutputBuffer buffer = new DataOutputBuffer();
+        try
         {
-            DataOutputBuffer buffer = new DataOutputBuffer();
-            try
-            {
-                ColumnFamily.serializer().serializeCFInfo(compactedCf, buffer);
-                buffer.writeInt(compactedCf.getColumnCount());
-                digest.update(buffer.getData(), 0, buffer.getLength());
-            }
-            catch (IOException e)
-            {
-                throw new RuntimeException(e);
-            }
-            compactedCf.updateDigest(digest);
+            ColumnFamily.serializer().serializeCFInfo(compactedCf, buffer);
+            buffer.writeInt(compactedCf.getColumnCount());
+            digest.update(buffer.getData(), 0, buffer.getLength());
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException(e);
         }
+        compactedCf.updateDigest(digest);
     }
 
     public boolean isEmpty()

Modified: 
cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java?rev=1154369&r1=1154368&r2=1154369&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
 (original)
+++ 
cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
 Fri Aug  5 20:19:45 2011
@@ -32,6 +32,7 @@ import org.apache.cassandra.CleanupHelpe
 import org.apache.cassandra.Util;
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.compaction.PrecompactedRow;
 import org.apache.cassandra.dht.IPartitioner;
@@ -150,7 +151,8 @@ public abstract class AntiEntropyService
         validator.prepare(store);
 
         // add a row
-        validator.add(new PrecompactedRow(new DecoratedKey(mid, 
ByteBufferUtil.bytes("inconceivable!")), null));
+        validator.add(new PrecompactedRow(new DecoratedKey(mid, 
ByteBufferUtil.bytes("inconceivable!")),
+                                          new 
ColumnFamily(DatabaseDescriptor.getCFMetaData(tablename, cfname))));
         validator.completeTree();
 
         // confirm that the tree was validated


Reply via email to