Updated Branches:
  refs/heads/trunk dbcf00df8 -> 637af9d78

optimize batchlog flushing to skip successful batches
patch by Aleksey Yeschenko; reviewed by jbellis for CASSANDRA-4667


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/637af9d7
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/637af9d7
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/637af9d7

Branch: refs/heads/trunk
Commit: 637af9d78272d4c849d865a5cfe4b041ba86f35e
Parents: dbcf00d
Author: Jonathan Ellis <[email protected]>
Authored: Wed Oct 3 12:47:45 2012 -0500
Committer: Jonathan Ellis <[email protected]>
Committed: Wed Oct 3 16:30:13 2012 -0500

----------------------------------------------------------------------
 CHANGES.txt                                        |    1 +
 .../org/apache/cassandra/config/CFMetaData.java    |    3 +-
 .../org/apache/cassandra/db/BatchlogManager.java   |   31 ++++++++++++---
 src/java/org/apache/cassandra/db/Memtable.java     |    8 ++++
 4 files changed, 36 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/637af9d7/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 3fa5997..01e03d7 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 1.2-beta2
+ * optimize batchlog flushing to skip successful batches (CASSANDRA-4667)
  * include metadata for system keyspace itself in schema tables (CASSANDRA-4416)
  * add check to PropertyFileSnitch to verify presence of location for
    local node (CASSANDRA-4728)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/637af9d7/src/java/org/apache/cassandra/config/CFMetaData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/CFMetaData.java b/src/java/org/apache/cassandra/config/CFMetaData.java
index 8aa4422..6abeb33 100644
--- a/src/java/org/apache/cassandra/config/CFMetaData.java
+++ b/src/java/org/apache/cassandra/config/CFMetaData.java
@@ -198,7 +198,8 @@ public final class CFMetaData
                                                             + "id uuid PRIMARY KEY,"
                                                             + "written_at timestamp,"
                                                             + "data blob"
-                                                            + ") WITH COMMENT='uncommited batches' AND gc_grace_seconds=0");
+                                                            + ") WITH COMMENT='uncommited batches' AND gc_grace_seconds=0 "
+                                                            + "AND COMPACTION={'class' : 'SizeTieredCompactionStrategy', 'min_threshold' : 2}");
 
     public static final CFMetaData RangeXfersCf = compile(17, "CREATE TABLE " + SystemTable.RANGE_XFERS_CF + " ("
                                                               + "token_bytes blob PRIMARY KEY,"

http://git-wip-us.apache.org/repos/asf/cassandra/blob/637af9d7/src/java/org/apache/cassandra/db/BatchlogManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/BatchlogManager.java b/src/java/org/apache/cassandra/db/BatchlogManager.java
index f732ece..e2a9c0d 100644
--- a/src/java/org/apache/cassandra/db/BatchlogManager.java
+++ b/src/java/org/apache/cassandra/db/BatchlogManager.java
@@ -22,6 +22,7 @@ import java.lang.management.ManagementFactory;
 import java.net.InetAddress;
 import java.nio.ByteBuffer;
 import java.util.*;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
@@ -35,6 +36,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.filter.IFilter;
 import org.apache.cassandra.db.filter.NamesQueryFilter;
 import org.apache.cassandra.db.filter.QueryFilter;
@@ -46,12 +48,15 @@ import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.io.util.FastByteArrayOutputStream;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.StorageProxy;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.WrappedRunnable;
 
 public class BatchlogManager implements BatchlogManagerMBean
 {
@@ -81,9 +86,9 @@ public class BatchlogManager implements BatchlogManagerMBean
             throw new RuntimeException(e);
         }
 
-        Runnable runnable = new Runnable()
+        Runnable runnable = new WrappedRunnable()
         {
-            public void run()
+            public void runMayThrow() throws ExecutionException, InterruptedException
             {
                 replayAllFailedBatches();
             }
@@ -114,9 +119,9 @@ public class BatchlogManager implements BatchlogManagerMBean
 
     public void forceBatchlogReplay()
     {
-        Runnable runnable = new Runnable()
+        Runnable runnable = new WrappedRunnable()
         {
-            public void run()
+            public void runMayThrow() throws ExecutionException, InterruptedException
             {
                 replayAllFailedBatches();
             }
@@ -158,7 +163,7 @@ public class BatchlogManager implements BatchlogManagerMBean
         return ByteBuffer.wrap(bos.toByteArray());
     }
 
-    private void replayAllFailedBatches()
+    private void replayAllFailedBatches() throws ExecutionException, InterruptedException
     {
         if (!isReplaying.compareAndSet(false, true))
             return;
@@ -176,6 +181,8 @@ public class BatchlogManager implements BatchlogManagerMBean
                 if (writtenAt == null || System.currentTimeMillis() > LongType.instance.compose(writtenAt.value()) + TIMEOUT)
                     replayBatch(row.key);
             }
+
+            cleanup();
         }
         finally
         {
@@ -192,7 +199,7 @@ public class BatchlogManager implements BatchlogManagerMBean
         logger.debug("Replaying batch {}", uuid);
 
         ColumnFamilyStore store = Table.open(Table.SYSTEM_KS).getColumnFamilyStore(SystemTable.BATCHLOG_CF);
-        QueryFilter filter = QueryFilter.getNamesFilter(key, new QueryPath(SystemTable.BATCHLOG_CF), DATA);
+        QueryFilter filter = QueryFilter.getIdentityFilter(key, new QueryPath(SystemTable.BATCHLOG_CF));
         ColumnFamily batch = store.getColumnFamily(filter);
 
         if (batch == null || batch.isMarkedForDelete())
@@ -257,4 +264,16 @@ public class BatchlogManager implements BatchlogManagerMBean
         AbstractBounds<RowPosition> range = new Range<RowPosition>(minPosition, minPosition, partitioner);
         return store.getRangeSlice(null, range, Integer.MAX_VALUE, columnFilter, null);
     }
+
+    /** force flush + compaction to reclaim space from replayed batches */
+    private void cleanup() throws ExecutionException, InterruptedException
+    {
+        ColumnFamilyStore cfs = Table.open(Table.SYSTEM_KS).getColumnFamilyStore(SystemTable.BATCHLOG_CF);
+        cfs.forceBlockingFlush();
+        Collection<Descriptor> descriptors = new ArrayList<Descriptor>();
+        for (SSTableReader sstr : cfs.getSSTables())
+            descriptors.add(sstr.descriptor);
+        if (!descriptors.isEmpty()) // don't pollute the logs if there is nothing to compact.
+            CompactionManager.instance.submitUserDefined(cfs, descriptors, Integer.MAX_VALUE).get();
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/637af9d7/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index 4424811..4631690 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -440,6 +440,14 @@ public class Memtable
                     ColumnFamily cf = entry.getValue();
                     if (cf.isMarkedForDelete())
                     {
+                        // When every node is up, there's no reason to write batchlog data out to sstables
+                        // (which in turn incurs cost like compaction) since the BL write + delete cancel each other out,
+                        // and BL data is strictly local, so we don't need to preserve tombstones for repair.
+                        // If we have a data row + row level tombstone, then writing it is effectively an expensive no-op so we skip it.
+                        // See CASSANDRA-4667.
+                        if (cfs.columnFamily.equals(SystemTable.BATCHLOG_CF) && cfs.table.name.equals(Table.SYSTEM_KS) && !cf.isEmpty())
+                            continue;
+
                         // Pedantically, you could purge column level tombstones that are past GcGRace when writing to the SSTable.
                         // But it can result in unexpected behaviour where deletes never make it to disk,
                         // as they are lost and so cannot override existing column values. So we only remove deleted columns if there

Reply via email to