Garbage-collecting compaction operation and schema option.

patch by Branimir Lambov; reviewed by Marcus Eriksson for CASSANDRA-7019


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d40ac784
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d40ac784
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d40ac784

Branch: refs/heads/trunk
Commit: d40ac784d3a8ddaf71a2df8b21745827392294cc
Parents: 2939080
Author: Branimir Lambov <branimir.lam...@datastax.com>
Authored: Wed Dec 23 12:50:58 2015 +0200
Committer: Marcus Eriksson <marc...@apache.org>
Committed: Mon Aug 1 14:20:50 2016 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 NEWS.txt                                        |  22 ++
 .../apache/cassandra/db/ColumnFamilyStore.java  |   6 +
 .../compaction/AbstractCompactionStrategy.java  |   2 +
 .../db/compaction/CompactionController.java     |  68 +++-
 .../db/compaction/CompactionIterator.java       | 243 +++++++++++-
 .../db/compaction/CompactionManager.java        |  39 +-
 .../DateTieredCompactionStrategy.java           |   2 +-
 .../compaction/LeveledCompactionStrategy.java   |  12 +-
 .../cassandra/db/compaction/OperationType.java  |   3 +-
 .../cassandra/db/compaction/Scrubber.java       |  46 ++-
 .../SizeTieredCompactionStrategy.java           |   3 +-
 .../TimeWindowCompactionStrategy.java           |   4 +-
 .../cassandra/db/compaction/Verifier.java       |   2 +-
 .../org/apache/cassandra/db/rows/Cells.java     |  80 ++++
 src/java/org/apache/cassandra/db/rows/Rows.java |  77 ++++
 .../cassandra/db/rows/UnfilteredSerializer.java |  54 +++
 .../cassandra/index/sasi/SASIIndexBuilder.java  |   2 +-
 .../io/sstable/SSTableIdentityIterator.java     |  81 +++-
 .../io/sstable/SSTableSimpleIterator.java       |  75 ++++
 .../io/sstable/format/SSTableReader.java        |  18 +-
 .../io/sstable/format/big/BigTableReader.java   |  13 +-
 .../io/sstable/format/big/BigTableScanner.java  |   2 +-
 .../cassandra/schema/CompactionParams.java      |  27 +-
 .../cassandra/service/StorageService.java       |  14 +
 .../cassandra/service/StorageServiceMBean.java  |   6 +
 .../org/apache/cassandra/tools/NodeProbe.java   |  17 +-
 .../org/apache/cassandra/tools/NodeTool.java    |   1 +
 .../tools/nodetool/GarbageCollect.java          |  64 ++++
 .../cassandra/cql3/GcCompactionBench.java       | 374 ++++++++++++++++++
 .../apache/cassandra/cql3/GcCompactionTest.java | 364 ++++++++++++++++++
 .../selection/SelectionColumnMappingTest.java   |   2 +-
 .../db/compaction/CompactionIteratorTest.java   | 377 +++++++++++++++++++
 .../rows/UnfilteredRowIteratorsMergeTest.java   |  55 +--
 .../db/rows/UnfilteredRowsGenerator.java        | 340 +++++++++++++++++
 35 files changed, 2399 insertions(+), 97 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/d40ac784/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 36a21e6..760cc58 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.10
+ * Garbage-collecting compaction operation and schema option (CASSANDRA-7019)
  * Add schema to snapshot manifest, add USING TIMESTAMP clause to ALTER TABLE statements (CASSANDRA-7190)
  * Add beta protocol flag for v5 native protocol (CASSANDRA-12142)
  * Support filtering on non-PRIMARY KEY columns in the CREATE

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d40ac784/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index 8580c7c..85f2767 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -19,6 +19,28 @@ using the provided 'sstableupgrade' tool.
 
 New features
 ------------
+   - Compaction can now take into account overlapping tables that don't take part
+     in the compaction to look for deleted or overwritten data in the compacted tables.
+     When such data is found, it can be safely discarded, which in turn should enable
+     the removal of tombstones over that data.
+
+     The behavior can be engaged in two ways:
+       - as a "nodetool garbagecollect -g CELL/ROW" operation, which applies
+         single-table compaction on all sstables to discard deleted data in one step.
+       - as a "provide_overlapping_tombstones:CELL/ROW/NONE" compaction strategy flag,
+         which uses overlapping tables as a source of deletions/overwrites during all
+         compactions.
+     The argument specifies the granularity at which deleted data is to be found:
+       - If ROW is specified, only whole deleted rows (or sets of rows) will be
+         discarded.
+       - If CELL is specified, any columns whose value is overwritten or deleted
+         will also be discarded.
+       - NONE (default) specifies the old behavior: overlapping tables are not used to
+         decide when to discard data.
+     Which option to use depends on your workload; both ROW and CELL increase the
+     disk load on compaction (especially with the size-tiered compaction strategy),
+     with CELL being more resource-intensive. Both should lead to better read
+     performance if deleting rows (resp. overwriting or deleting cells) is common.
    - Prepared statements are now persisted in the table prepared_statements in
      the system keyspace. Upon startup, this table is used to preload all
      previously prepared statements - i.e. in many cases clients do not need to
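
For illustration, the two entry points described above can be exercised as follows
(keyspace/table names are hypothetical; the ALTER TABLE form uses the standard
compaction sub-option syntax):

    # one-off garbage collection at row granularity
    nodetool garbagecollect -g ROW mykeyspace mytable

    # or enable it for every compaction of the table
    ALTER TABLE mykeyspace.mytable
      WITH compaction = { 'class': 'SizeTieredCompactionStrategy',
                          'provide_overlapping_tombstones': 'ROW' };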

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d40ac784/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 594da98..20dac1e 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -71,6 +71,7 @@ import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.metrics.TableMetrics;
 import org.apache.cassandra.metrics.TableMetrics.Sampler;
 import org.apache.cassandra.schema.*;
+import org.apache.cassandra.schema.CompactionParams.TombstoneOption;
 import org.apache.cassandra.service.CacheService;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.*;
@@ -1580,6 +1581,11 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         return CompactionManager.instance.relocateSSTables(this, jobs);
     }
 
+    public CompactionManager.AllSSTableOpStatus garbageCollect(TombstoneOption tombstoneOption, int jobs) throws ExecutionException, InterruptedException
+    {
+        return CompactionManager.instance.performGarbageCollection(this, tombstoneOption, jobs);
+    }
+
     public void markObsolete(Collection<SSTableReader> sstables, OperationType compactionType)
     {
         assert !sstables.isEmpty();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d40ac784/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
index 4728ec3..83592f0 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
@@ -44,6 +44,7 @@ import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.ISSTableScanner;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
+import org.apache.cassandra.schema.CompactionParams;
 import org.apache.cassandra.utils.JVMStabilityInspector;
 
 /**
@@ -492,6 +493,7 @@ public abstract class AbstractCompactionStrategy
         uncheckedOptions.remove(LOG_ALL_OPTION);
         uncheckedOptions.remove(COMPACTION_ENABLED);
         uncheckedOptions.remove(ONLY_PURGE_REPAIRED_TOMBSTONES);
+        uncheckedOptions.remove(CompactionParams.Option.PROVIDE_OVERLAPPING_TOMBSTONES.toString());
         return uncheckedOptions;
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d40ac784/src/java/org/apache/cassandra/db/compaction/CompactionController.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionController.java b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
index e42e7a1..08ad0c0 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionController.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
@@ -20,18 +20,23 @@ package org.apache.cassandra.db.compaction;
 import java.util.*;
 
 import org.apache.cassandra.db.Memtable;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+
+import com.google.common.base.Predicates;
 import com.google.common.collect.Iterables;
+import com.google.common.util.concurrent.RateLimiter;
 
 import org.apache.cassandra.db.partitions.Partition;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.util.FileDataInput;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.schema.CompactionParams.TombstoneOption;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.PartitionPosition;
+import org.apache.cassandra.db.*;
 import org.apache.cassandra.utils.AlwaysPresentFilter;
-
 import org.apache.cassandra.utils.OverlapIterator;
 import org.apache.cassandra.utils.concurrent.Refs;
 
@@ -53,6 +58,10 @@ public class CompactionController implements AutoCloseable
     private Refs<SSTableReader> overlappingSSTables;
     private OverlapIterator<PartitionPosition, SSTableReader> overlapIterator;
     private final Iterable<SSTableReader> compacting;
+    private final RateLimiter limiter;
+    private final long minTimestamp;
+    final TombstoneOption tombstoneOption;
+    final Map<SSTableReader, FileDataInput> openDataFiles = new HashMap<>();
 
     public final int gcBefore;
 
@@ -63,11 +72,23 @@ public class CompactionController implements AutoCloseable
 
     public CompactionController(ColumnFamilyStore cfs, Set<SSTableReader> compacting, int gcBefore)
     {
+        this(cfs, compacting, gcBefore,
+             CompactionManager.instance.getRateLimiter(),
+             cfs.getCompactionStrategyManager().getCompactionParams().tombstoneOption());
+    }
+
+    public CompactionController(ColumnFamilyStore cfs, Set<SSTableReader> compacting, int gcBefore, RateLimiter limiter, TombstoneOption tombstoneOption)
+    {
         assert cfs != null;
         this.cfs = cfs;
         this.gcBefore = gcBefore;
         this.compacting = compacting;
+        this.limiter = limiter;
         compactingRepaired = compacting != null && compacting.stream().allMatch(SSTableReader::isRepaired);
+        this.tombstoneOption = tombstoneOption;
+        this.minTimestamp = compacting != null && !compacting.isEmpty()       // check needed for test
+                          ? compacting.stream().mapToLong(SSTableReader::getMinTimestamp).min().getAsLong()
+                          : 0;
         refreshOverlaps();
         if (NEVER_PURGE_TOMBSTONES)
             logger.warn("You are running with -Dcassandra.never_purge_tombstones=true, this is dangerous!");
@@ -97,7 +118,7 @@ public class CompactionController implements AutoCloseable
             return;
 
         if (this.overlappingSSTables != null)
-            overlappingSSTables.release();
+            close();
 
         if (compacting == null)
             overlappingSSTables = Refs.tryRef(Collections.<SSTableReader>emptyList());
@@ -228,6 +249,9 @@ public class CompactionController implements AutoCloseable
     {
         if (overlappingSSTables != null)
             overlappingSSTables.release();
+
+        FileUtils.closeQuietly(openDataFiles.values());
+        openDataFiles.clear();
     }
 
     public boolean compactingRepaired()
@@ -235,4 +259,38 @@ public class CompactionController implements AutoCloseable
         return !cfs.getCompactionStrategyManager().onlyPurgeRepairedTombstones() || compactingRepaired;
     }
 
+    boolean provideTombstoneSources()
+    {
+        return tombstoneOption != TombstoneOption.NONE;
+    }
+
+    // caller must close iterators
+    public Iterable<UnfilteredRowIterator> shadowSources(DecoratedKey key, boolean tombstoneOnly)
+    {
+        if (!provideTombstoneSources() || !compactingRepaired() || NEVER_PURGE_TOMBSTONES)
+            return null;
+        overlapIterator.update(key);
+        return Iterables.filter(Iterables.transform(overlapIterator.overlaps(),
+                                                    reader -> getShadowIterator(reader, key, tombstoneOnly)),
+                                Predicates.notNull());
+    }
+
+    @SuppressWarnings("resource") // caller to close
+    private UnfilteredRowIterator getShadowIterator(SSTableReader reader, DecoratedKey key, boolean tombstoneOnly)
+    {
+        if (reader.isMarkedSuspect() ||
+            reader.getMaxTimestamp() <= minTimestamp ||
+            tombstoneOnly && !reader.hasTombstones())
+            return null;
+        RowIndexEntry<?> position = reader.getPosition(key, SSTableReader.Operator.EQ);
+        if (position == null)
+            return null;
+        FileDataInput dfile = openDataFiles.computeIfAbsent(reader, this::openDataFile);
+        return reader.simpleIterator(dfile, key, position, tombstoneOnly);
+    }
+
+    private FileDataInput openDataFile(SSTableReader reader)
+    {
+        return limiter != null ? reader.openDataReader(limiter) : reader.openDataReader();
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d40ac784/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
index 0111aec..c4edfa6 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
@@ -17,14 +17,13 @@
  */
 package org.apache.cassandra.db.compaction;
 
-import java.util.List;
-import java.util.UUID;
+import java.util.*;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import com.google.common.collect.Ordering;
 
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.filter.ColumnFilter;
 import org.apache.cassandra.db.partitions.PurgeFunction;
 import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
 import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
@@ -33,6 +32,7 @@ import org.apache.cassandra.db.transform.Transformation;
 import org.apache.cassandra.index.transactions.CompactionTransaction;
 import org.apache.cassandra.io.sstable.ISSTableScanner;
 import org.apache.cassandra.metrics.CompactionMetrics;
+import org.apache.cassandra.schema.CompactionParams.TombstoneOption;
 
 /**
  * Merge multiple iterators over the content of sstable into a "compacted" iterator.
@@ -52,7 +52,6 @@ import org.apache.cassandra.metrics.CompactionMetrics;
  */
 public class CompactionIterator extends CompactionInfo.Holder implements UnfilteredPartitionIterator
 {
-    private static final Logger logger = LoggerFactory.getLogger(CompactionIterator.class);
     private static final long UNFILTERED_TO_UPDATE_PROGRESS = 100;
 
     private final OperationType type;
@@ -104,6 +103,7 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte
                                              ? EmptyIterators.unfilteredPartition(controller.cfs.metadata, false)
                                              : UnfilteredPartitionIterators.merge(scanners, nowInSec, listener());
         boolean isForThrift = merged.isForThrift(); // to stop capture of iterator in Purger, which is confusing for debug
+        merged = Transformation.apply(merged, new GarbageSkipper(controller, nowInSec));
         this.compacted = Transformation.apply(merged, new Purger(isForThrift, controller, nowInSec));
     }
 
@@ -313,4 +313,237 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte
             return maxPurgeableTimestamp;
         }
     }
+
+    /**
+     * Unfiltered row iterator that removes deleted data as provided by a "tombstone source" for the partition.
+     * The result produced by this iterator is such that when merged with tombSource it produces the same output
+     * as the merge of dataSource and tombSource.
+     */
+    private static class GarbageSkippingUnfilteredRowIterator extends WrappingUnfilteredRowIterator
+    {
+        final UnfilteredRowIterator tombSource;
+        final DeletionTime partitionLevelDeletion;
+        final Row staticRow;
+        final ColumnFilter cf;
+        final int nowInSec;
+        final CFMetaData metadata;
+        final boolean cellLevelGC;
+
+        DeletionTime tombOpenDeletionTime = DeletionTime.LIVE;
+        DeletionTime dataOpenDeletionTime = DeletionTime.LIVE;
+        DeletionTime openDeletionTime = DeletionTime.LIVE;
+        DeletionTime partitionDeletionTime;
+        DeletionTime activeDeletionTime;
+        Unfiltered tombNext = null;
+        Unfiltered dataNext = null;
+        Unfiltered next = null;
+
+        /**
+         * Construct an iterator that filters out data shadowed by the provided "tombstone source".
+         *
+         * @param dataSource The input row. The result is a filtered version of this.
+         * @param tombSource Tombstone source, i.e. iterator used to identify deleted data in the input row.
+         * @param nowInSec Current time, used in choosing the winner when cell expiration is involved.
+         * @param cellLevelGC If false, the iterator will only look at row-level deletion times and tombstones.
+         *                    If true, deleted or overwritten cells within a surviving row will also be removed.
+         */
+        protected GarbageSkippingUnfilteredRowIterator(UnfilteredRowIterator dataSource, UnfilteredRowIterator tombSource, int nowInSec, boolean cellLevelGC)
+        {
+            super(dataSource);
+            this.tombSource = tombSource;
+            this.nowInSec = nowInSec;
+            this.cellLevelGC = cellLevelGC;
+            metadata = dataSource.metadata();
+            cf = ColumnFilter.all(metadata);
+
+            activeDeletionTime = partitionDeletionTime = tombSource.partitionLevelDeletion();
+
+            // Only preserve partition level deletion if not shadowed. (Note: Shadowing deletion must not be copied.)
+            this.partitionLevelDeletion = dataSource.partitionLevelDeletion().supersedes(tombSource.partitionLevelDeletion()) ?
+                    dataSource.partitionLevelDeletion() :
+                    DeletionTime.LIVE;
+
+            Row dataStaticRow = garbageFilterRow(dataSource.staticRow(), tombSource.staticRow());
+            this.staticRow = dataStaticRow != null ? dataStaticRow : Rows.EMPTY_STATIC_ROW;
+
+            tombNext = advance(tombSource);
+            dataNext = advance(dataSource);
+        }
+
+        private static Unfiltered advance(UnfilteredRowIterator source)
+        {
+            return source.hasNext() ? source.next() : null;
+        }
+
+        @Override
+        public DeletionTime partitionLevelDeletion()
+        {
+            return partitionLevelDeletion;
+        }
+
+        public void close()
+        {
+            super.close();
+            tombSource.close();
+        }
+
+        @Override
+        public Row staticRow()
+        {
+            return staticRow;
+        }
+
+        @Override
+        public boolean hasNext()
+        {
+            // Produce the next element. This may consume multiple elements from both inputs until we find something
+            // from dataSource that is still live. We track the currently open deletion in both sources, as well as the
+            // one we have last issued to the output. The tombOpenDeletionTime is used to filter out content; the others
+            // to decide whether or not a tombstone is superseded, and to be able to surface (the rest of) a deletion
+            // range from the input when a suppressing deletion ends.
+            while (next == null && dataNext != null)
+            {
+                int cmp = tombNext == null ? -1 : metadata.comparator.compare(dataNext, tombNext);
+                if (cmp < 0)
+                {
+                    if (dataNext.isRow())
+                        next = ((Row) dataNext).filter(cf, activeDeletionTime, false, metadata);
+                    else
+                        next = processDataMarker();
+                }
+                else if (cmp == 0)
+                {
+                    if (dataNext.isRow())
+                    {
+                        next = garbageFilterRow((Row) dataNext, (Row) tombNext);
+                    }
+                    else
+                    {
+                        tombOpenDeletionTime = updateOpenDeletionTime(tombOpenDeletionTime, tombNext);
+                        activeDeletionTime = Ordering.natural().max(partitionDeletionTime,
+                                                                    tombOpenDeletionTime);
+                        next = processDataMarker();
+                    }
+                }
+                else // (cmp > 0)
+                {
+                    if (tombNext.isRangeTombstoneMarker())
+                    {
+                        tombOpenDeletionTime = updateOpenDeletionTime(tombOpenDeletionTime, tombNext);
+                        activeDeletionTime = Ordering.natural().max(partitionDeletionTime,
+                                                                    tombOpenDeletionTime);
+                        boolean supersededBefore = openDeletionTime.isLive();
+                        boolean supersededAfter = !dataOpenDeletionTime.supersedes(activeDeletionTime);
+                        // If a range open was not issued because it was superseded and the deletion isn't superseded any more, we need to open it now.
+                        if (supersededBefore && !supersededAfter)
+                            next = new RangeTombstoneBoundMarker(((RangeTombstoneMarker) tombNext).closeBound(false).invert(), dataOpenDeletionTime);
+                        // If the deletion begins to be superseded, we don't close the range yet. This can save us a close/open pair if it ends after the superseding range.
+                    }
+                }
+
+                if (next instanceof RangeTombstoneMarker)
+                    openDeletionTime = updateOpenDeletionTime(openDeletionTime, next);
+
+                if (cmp <= 0)
+                    dataNext = advance(wrapped);
+                if (cmp >= 0)
+                    tombNext = advance(tombSource);
+            }
+            return next != null;
+        }
+
+        protected Row garbageFilterRow(Row dataRow, Row tombRow)
+        {
+            if (cellLevelGC)
+            {
+                return Rows.removeShadowedCells(dataRow, tombRow, activeDeletionTime, nowInSec);
+            }
+            else
+            {
+                DeletionTime deletion = Ordering.natural().max(tombRow.deletion().time(),
+                                                               activeDeletionTime);
+                return dataRow.filter(cf, deletion, false, metadata);
+            }
+        }
+
+        /**
+         * Decide how to act on a tombstone marker from the input iterator. We can decide what to issue depending on
+         * whether or not the ranges before and after the marker are superseded/live -- if none are, we can reuse the
+         * marker; if both are, the marker can be ignored; otherwise we issue a corresponding start/end marker.
+         */
+        private RangeTombstoneMarker processDataMarker()
+        {
+            dataOpenDeletionTime = updateOpenDeletionTime(dataOpenDeletionTime, dataNext);
+            boolean supersededBefore = openDeletionTime.isLive();
+            boolean supersededAfter = !dataOpenDeletionTime.supersedes(activeDeletionTime);
+            RangeTombstoneMarker marker = (RangeTombstoneMarker) dataNext;
+            if (!supersededBefore)
+                if (!supersededAfter)
+                    return marker;
+                else
+                    return new RangeTombstoneBoundMarker(marker.closeBound(false), marker.closeDeletionTime(false));
+            else
+                if (!supersededAfter)
+                    return new RangeTombstoneBoundMarker(marker.openBound(false), marker.openDeletionTime(false));
+                else
+                    return null;
+        }
+
+        @Override
+        public Unfiltered next()
+        {
+            if (!hasNext())
+                throw new IllegalStateException();
+
+            Unfiltered v = next;
+            next = null;
+            return v;
+        }
+
+        private DeletionTime updateOpenDeletionTime(DeletionTime openDeletionTime, Unfiltered next)
+        {
+            RangeTombstoneMarker marker = (RangeTombstoneMarker) next;
+            assert openDeletionTime.isLive() == !marker.isClose(false);
+            assert openDeletionTime.isLive() || openDeletionTime.equals(marker.closeDeletionTime(false));
+            return marker.isOpen(false) ? marker.openDeletionTime(false) : DeletionTime.LIVE;
+        }
+    }
+
+    /**
+     * Partition transformation applying GarbageSkippingUnfilteredRowIterator, obtaining tombstone sources for each
+     * partition using the controller's shadowSources method.
+     */
+    private static class GarbageSkipper extends Transformation<UnfilteredRowIterator>
+    {
+        final int nowInSec;
+        final CompactionController controller;
+        final boolean cellLevelGC;
+
+        private GarbageSkipper(CompactionController controller, int nowInSec)
+        {
+            this.controller = controller;
+            this.nowInSec = nowInSec;
+            cellLevelGC = controller.tombstoneOption == TombstoneOption.CELL;
+        }
+
+        @Override
+        protected UnfilteredRowIterator applyToPartition(UnfilteredRowIterator partition)
+        {
+            Iterable<UnfilteredRowIterator> sources = controller.shadowSources(partition.partitionKey(), !cellLevelGC);
+            if (sources == null)
+                return partition;
+            List<UnfilteredRowIterator> iters = new ArrayList<>();
+            for (UnfilteredRowIterator iter : sources)
+            {
+                if (!iter.isEmpty())
+                    iters.add(iter);
+                else
+                    iter.close();
+            }
+            if (iters.isEmpty())
+                return partition;
+
+            return new GarbageSkippingUnfilteredRowIterator(partition, UnfilteredRowIterators.merge(iters, nowInSec), nowInSec, cellLevelGC);
+        }
+    }
 }
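
To make the rule concrete, a toy model in plain Java (deliberately not Cassandra's
internal API): at CELL granularity a cell from the compacted data survives only if
its timestamp is newer than the deletion supplied by the overlapping tombstone
source, mirroring the invariant documented on GarbageSkippingUnfilteredRowIterator.

    import java.util.HashMap;
    import java.util.Map;

    public class GarbageSkipModel
    {
        // Keep only cells written after the covering deletion; a deletion
        // shadows anything at or below its timestamp.
        static Map<String, Long> skipGarbage(Map<String, Long> cells, long deletionTimestamp)
        {
            Map<String, Long> survivors = new HashMap<>();
            cells.forEach((name, ts) -> {
                if (ts > deletionTimestamp)
                    survivors.put(name, ts);
            });
            return survivors;
        }

        public static void main(String[] args)
        {
            // "a" written at ts 5, "b" at ts 15; an overlapping sstable holds a
            // deletion at ts 10 for this row: "a" is purgeable, "b" survives.
            System.out.println(skipGarbage(Map.of("a", 5L, "b", 15L), 10L)); // {b=15}
        }
    }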

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d40ac784/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 519ff05..1cfc76b 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -63,6 +63,7 @@ import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.metrics.CompactionMetrics;
 import org.apache.cassandra.repair.Validator;
+import org.apache.cassandra.schema.CompactionParams.TombstoneOption;
 import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.*;
@@ -442,7 +443,7 @@ public class CompactionManager implements CompactionManagerMBean
             public Iterable<SSTableReader> filterSSTables(LifecycleTransaction transaction)
             {
                 List<SSTableReader> sortedSSTables = Lists.newArrayList(transaction.originals());
-                Collections.sort(sortedSSTables, new SSTableReader.SizeComparator());
+                Collections.sort(sortedSSTables, SSTableReader.sizeComparator);
                 return sortedSSTables;
             }
 
@@ -455,6 +456,42 @@ public class CompactionManager implements CompactionManagerMBean
         }, jobs, OperationType.CLEANUP);
     }
 
+    public AllSSTableOpStatus performGarbageCollection(final ColumnFamilyStore cfStore, TombstoneOption tombstoneOption, int jobs) throws InterruptedException, ExecutionException
+    {
+        assert !cfStore.isIndex();
+
+        return parallelAllSSTableOperation(cfStore, new OneSSTableOperation()
+        {
+            @Override
+            public Iterable<SSTableReader> filterSSTables(LifecycleTransaction transaction)
+            {
+                Iterable<SSTableReader> originals = transaction.originals();
+                if (cfStore.getCompactionStrategyManager().onlyPurgeRepairedTombstones())
+                    originals = Iterables.filter(originals, SSTableReader::isRepaired);
+                List<SSTableReader> sortedSSTables = Lists.newArrayList(originals);
+                Collections.sort(sortedSSTables, SSTableReader.maxTimestampComparator);
+                return sortedSSTables;
+            }
+
+            @Override
+            public void execute(LifecycleTransaction txn) throws IOException
+            {
+                logger.debug("Garbage collecting {}", txn.originals());
+                CompactionTask task = new CompactionTask(cfStore, txn, getDefaultGcBefore(cfStore, FBUtilities.nowInSeconds()))
+                {
+                    @Override
+                    protected CompactionController getCompactionController(Set<SSTableReader> toCompact)
+                    {
+                        return new CompactionController(cfStore, toCompact, gcBefore, getRateLimiter(), tombstoneOption);
+                    }
+                };
+                task.setUserDefined(true);
+                task.setCompactionType(OperationType.GARBAGE_COLLECT);
+                task.execute(metrics);
+            }
+        }, jobs, OperationType.GARBAGE_COLLECT);
+    }
+
     public AllSSTableOpStatus relocateSSTables(final ColumnFamilyStore cfs, int jobs) throws ExecutionException, InterruptedException
     {
         if (!cfs.getPartitioner().splitter().isPresent())

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d40ac784/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
index cfe0121..5442a2d 100644
--- a/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
@@ -138,7 +138,7 @@ public class DateTieredCompactionStrategy extends AbstractCompactionStrategy
         if (sstablesWithTombstones.isEmpty())
             return Collections.emptyList();
 
-        return Collections.singletonList(Collections.min(sstablesWithTombstones, new SSTableReader.SizeComparator()));
+        return Collections.singletonList(Collections.min(sstablesWithTombstones, SSTableReader.sizeComparator));
     }
 
     private List<SSTableReader> getCompactionCandidates(Iterable<SSTableReader> candidateSSTables, long now, int base)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d40ac784/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
index ec5e1d9..25c5d20 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
@@ -145,7 +145,17 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy
     @Override
     public AbstractCompactionTask getUserDefinedTask(Collection<SSTableReader> sstables, int gcBefore)
     {
-        throw new UnsupportedOperationException("LevelDB compaction strategy does not allow user-specified compactions");
+        if (sstables.size() != 1)
+            throw new UnsupportedOperationException("LevelDB compaction strategy does not allow user-specified compactions");
+
+        LifecycleTransaction transaction = cfs.getTracker().tryModify(sstables, OperationType.COMPACTION);
+        if (transaction == null)
+        {
+            logger.trace("Unable to mark {} for compaction; probably a background compaction got to it first.  You can disable background compactions temporarily if this is a problem", sstables);
+            return null;
+        }
+        int level = sstables.iterator().next().getSSTableLevel();
+        return getCompactionTask(transaction, gcBefore, level == 0 ? Integer.MAX_VALUE : getMaxSSTableBytes());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d40ac784/src/java/org/apache/cassandra/db/compaction/OperationType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/OperationType.java b/src/java/org/apache/cassandra/db/compaction/OperationType.java
index 84a34c9..27b8530 100644
--- a/src/java/org/apache/cassandra/db/compaction/OperationType.java
+++ b/src/java/org/apache/cassandra/db/compaction/OperationType.java
@@ -38,7 +38,8 @@ public enum OperationType
     WRITE("Write"),
     VIEW_BUILD("View build"),
     INDEX_SUMMARY("Index summary redistribution"),
-    RELOCATE("Relocate sstables to correct disk");
+    RELOCATE("Relocate sstables to correct disk"),
+    GARBAGE_COLLECT("Remove deleted data");
 
     public final String type;
     public final String fileName;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d40ac784/src/java/org/apache/cassandra/db/compaction/Scrubber.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Scrubber.java b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
index a9cb211..2cfc75d 100644
--- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java
+++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
@@ -210,7 +210,8 @@ public class Scrubber implements Closeable
                     if (indexFile != null && dataStart != dataStartFromIndex)
                         outputHandler.warn(String.format("Data file row position %d differs from index file row position %d", dataStart, dataStartFromIndex));
 
-                    try (UnfilteredRowIterator iterator = withValidation(new RowMergingSSTableIterator(sstable, dataFile, key), dataFile.getPath()))
+                    try (UnfilteredRowIterator iterator = withValidation(new RowMergingSSTableIterator(SSTableIdentityIterator.create(sstable, dataFile, key)),
+                                                                         dataFile.getPath()))
                     {
                         if (prevKey != null && prevKey.compareTo(key) > 0)
                         {
@@ -241,7 +242,7 @@ public class Scrubber implements Closeable
                         {
                             dataFile.seek(dataStartFromIndex);
 
-                            try (UnfilteredRowIterator iterator = withValidation(new SSTableIdentityIterator(sstable, dataFile, key), dataFile.getPath()))
+                            try (UnfilteredRowIterator iterator = withValidation(SSTableIdentityIterator.create(sstable, dataFile, key), dataFile.getPath()))
                             {
                                 if (prevKey != null && prevKey.compareTo(key) > 0)
                                 {
@@ -471,38 +472,43 @@ public class Scrubber implements Closeable
      *
      * For more details, refer to CASSANDRA-12144.
      */
-    private static class RowMergingSSTableIterator extends SSTableIdentityIterator
+    private static class RowMergingSSTableIterator extends WrappingUnfilteredRowIterator
     {
-        RowMergingSSTableIterator(SSTableReader sstable, RandomAccessReader file, DecoratedKey key)
+        Unfiltered nextToOffer = null;
+
+        RowMergingSSTableIterator(UnfilteredRowIterator source)
         {
-            super(sstable, file, key);
+            super(source);
         }
 
         @Override
-        protected Unfiltered doCompute()
+        public boolean hasNext()
         {
-            if (!iterator.hasNext())
-                return endOfData();
+            return nextToOffer != null || wrapped.hasNext();
+        }
 
-            Unfiltered next = iterator.next();
-            if (!next.isRow())
-                return next;
+        @Override
+        public Unfiltered next()
+        {
+            Unfiltered next = nextToOffer != null ? nextToOffer : wrapped.next();
 
-            while (iterator.hasNext())
+            if (next.isRow())
             {
-                Unfiltered peek = iterator.peek();
-                // If there was a duplicate row, merge it.
-                if (next.clustering().equals(peek.clustering()) && peek.isRow())
+                while (wrapped.hasNext())
                 {
-                    iterator.next(); // Make sure that the peeked item was consumed.
+                    Unfiltered peek = wrapped.next();
+                    if (!peek.isRow() || !next.clustering().equals(peek.clustering()))
+                    {
+                        nextToOffer = peek; // Offer peek in next call
+                        return next;
+                    }
+    
+                    // Duplicate row, merge it.
                     next = Rows.merge((Row) next, (Row) peek, FBUtilities.nowInSeconds());
                 }
-                else
-                {
-                    break;
-                }
             }
 
+            nextToOffer = null;
             return next;
         }
     }
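
As a concrete example of the rewritten iterator above: if the data file holds two
consecutive rows with equal clustering -- one carrying column a, one carrying column
b -- a single merged row with both columns is returned via Rows.merge, and the first
non-matching unfiltered is buffered in nextToOffer so it is produced by the following
next() call rather than lost.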

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d40ac784/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
index 7cca4a7..8302a9b 100644
--- a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
@@ -101,8 +101,7 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
         if (sstablesWithTombstones.isEmpty())
             return Collections.emptyList();
 
-        Collections.sort(sstablesWithTombstones, new SSTableReader.SizeComparator());
-        return Collections.singletonList(sstablesWithTombstones.get(0));
+        return Collections.singletonList(Collections.max(sstablesWithTombstones, SSTableReader.sizeComparator));
     }
 
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d40ac784/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategy.java
index 55daaa1..fd53930 100644
--- a/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategy.java
@@ -143,7 +143,7 @@ public class TimeWindowCompactionStrategy extends AbstractCompactionStrategy
         if (sstablesWithTombstones.isEmpty())
             return Collections.emptyList();
 
-        return Collections.singletonList(Collections.min(sstablesWithTombstones, new SSTableReader.SizeComparator()));
+        return Collections.singletonList(Collections.min(sstablesWithTombstones, SSTableReader.sizeComparator));
     }
 
     private List<SSTableReader> getCompactionCandidates(Iterable<SSTableReader> candidateSSTables)
@@ -314,7 +314,7 @@ public class TimeWindowCompactionStrategy extends AbstractCompactionStrategy
         List<SSTableReader> ssTableReaders = new ArrayList<>(bucket);
 
         // Trim the largest sstables off the end to meet the maxThreshold
-        Collections.sort(ssTableReaders, new SSTableReader.SizeComparator());
+        Collections.sort(ssTableReaders, SSTableReader.sizeComparator);
 
         return ImmutableList.copyOf(Iterables.limit(ssTableReaders, maxThreshold));
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d40ac784/src/java/org/apache/cassandra/db/compaction/Verifier.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Verifier.java b/src/java/org/apache/cassandra/db/compaction/Verifier.java
index 91c7ad7..17b1187 100644
--- a/src/java/org/apache/cassandra/db/compaction/Verifier.java
+++ b/src/java/org/apache/cassandra/db/compaction/Verifier.java
@@ -187,7 +187,7 @@ public class Verifier implements Closeable
                         markAndThrow();
 
                     //mimic the scrub read path
-                    try (UnfilteredRowIterator iterator = new SSTableIdentityIterator(sstable, dataFile, key))
+                    try (UnfilteredRowIterator iterator = SSTableIdentityIterator.create(sstable, dataFile, key))
                     {
                     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d40ac784/src/java/org/apache/cassandra/db/rows/Cells.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/Cells.java b/src/java/org/apache/cassandra/db/rows/Cells.java
index 54df26e..38bde16 100644
--- a/src/java/org/apache/cassandra/db/rows/Cells.java
+++ b/src/java/org/apache/cassandra/db/rows/Cells.java
@@ -233,6 +233,86 @@ public abstract class Cells
         return timeDelta;
     }
 
+    /**
+     * Adds to the builder a representation of the given existing cell that, when merged/reconciled with the given
+     * update cell, produces the same result as merging the original with the update.
+     * <p>
+     * For simple cells that is either the original cell (if still live), or nothing (if shadowed).
+     *
+     * @param existing the pre-existing cell, the one that is updated.
+     * @param update the newly added cell, the update. This can be {@code null} out
+     * of convenience, in which case this function simply copies {@code existing} to
+     * {@code writer}.
+     * @param deletion the deletion time that applies to the cells being considered.
+     * This deletion time may delete both {@code existing} or {@code update}.
+     * @param builder the row builder to which the result of the filtering is written.
+     * @param nowInSec the current time in seconds (which plays a role during reconciliation
+     * because deleted cells always have precedence on timestamp equality and deciding if a
+     * cell is live or not depends on the current time due to expiring cells).
+     */
+    public static void addNonShadowed(Cell existing,
+                                      Cell update,
+                                      DeletionTime deletion,
+                                      Row.Builder builder,
+                                      int nowInSec)
+    {
+        if (deletion.deletes(existing))
+            return;
+
+        Cell reconciled = reconcile(existing, update, nowInSec);
+        if (reconciled != update)
+            builder.addCell(existing);
+    }
+
+    /**
+     * Adds to the builder a representation of the given existing cells that, when merged/reconciled with the given
+     * update cells, produces the same result as merging the originals with the update.
+     * <p>
+     * For each cell that is either the original cell (if still live), or nothing (if shadowed).
+     *
+     * @param column the complex column the cells are for.
+     * @param existing the pre-existing cells, the ones that are updated.
+     * @param update the newly added cells, the update. This can be {@code null} out
+     * of convenience, in which case this function simply copies the cells from
+     * {@code existing} to {@code writer}.
+     * @param deletion the deletion time that applies to the cells being considered.
+     * This deletion time may delete both {@code existing} or {@code update}.
+     * @param builder the row builder to which the result of the filtering is written.
+     * @param nowInSec the current time in seconds (which plays a role during reconciliation
+     * because deleted cells always have precedence on timestamp equality and deciding if a
+     * cell is live or not depends on the current time due to expiring cells).
+     */
+    public static void addNonShadowedComplex(ColumnDefinition column,
+                                             Iterator<Cell> existing,
+                                             Iterator<Cell> update,
+                                             DeletionTime deletion,
+                                             Row.Builder builder,
+                                             int nowInSec)
+    {
+        Comparator<CellPath> comparator = column.cellPathComparator();
+        Cell nextExisting = getNext(existing);
+        Cell nextUpdate = getNext(update);
+        while (nextExisting != null)
+        {
+            int cmp = nextUpdate == null ? -1 : comparator.compare(nextExisting.path(), nextUpdate.path());
+            if (cmp < 0)
+            {
+                addNonShadowed(nextExisting, null, deletion, builder, nowInSec);
+                nextExisting = getNext(existing);
+            }
+            else if (cmp == 0)
+            {
+                addNonShadowed(nextExisting, nextUpdate, deletion, builder, nowInSec);
+                nextExisting = getNext(existing);
+                nextUpdate = getNext(update);
+            }
+            else
+            {
+                nextUpdate = getNext(update);
+            }
+        }
+    }
+
     private static Cell getNext(Iterator<Cell> iterator)
     {
         return iterator == null || !iterator.hasNext() ? null : iterator.next();
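
The lockstep walk above (and the simple-cell variant) can be sketched with plain
Java stand-ins for Cell/CellPath; timestamp ties, where the real reconcile lets
deletions win, are ignored here for brevity:

    import java.util.ArrayList;
    import java.util.List;

    public class NonShadowedMergeSketch
    {
        record Cell(String path, long ts) {}

        // Walk two path-sorted cell streams in lockstep; keep an existing cell
        // unless the covering deletion or a newer update cell at the same path shadows it.
        static List<Cell> nonShadowed(List<Cell> existing, List<Cell> update, long deletionTs)
        {
            List<Cell> out = new ArrayList<>();
            int i = 0, j = 0;
            while (i < existing.size())
            {
                Cell e = existing.get(i);
                if (e.ts() <= deletionTs) { i++; continue; }   // shadowed by the deletion
                int cmp = j >= update.size() ? -1 : e.path().compareTo(update.get(j).path());
                if (cmp < 0) { out.add(e); i++; }              // no update at this path: keep
                else if (cmp == 0)                             // same path: keep only if newer
                {
                    if (e.ts() > update.get(j).ts()) out.add(e);
                    i++; j++;
                }
                else j++;                                      // update-only path: skip
            }
            return out;
        }

        public static void main(String[] args)
        {
            List<Cell> existing = List.of(new Cell("a", 5), new Cell("b", 20), new Cell("c", 8));
            List<Cell> update = List.of(new Cell("b", 30), new Cell("c", 3));
            // "a"@5 is deleted by the tombstone at 6, "b"@20 is shadowed by "b"@30,
            // "c"@8 survives its older update "c"@3:
            System.out.println(nonShadowed(existing, update, 6L)); // [Cell[path=c, ts=8]]
        }
    }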

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d40ac784/src/java/org/apache/cassandra/db/rows/Rows.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/Rows.java b/src/java/org/apache/cassandra/db/rows/Rows.java
index 4f6c8d2..e6d9062 100644
--- a/src/java/org/apache/cassandra/db/rows/Rows.java
+++ b/src/java/org/apache/cassandra/db/rows/Rows.java
@@ -25,6 +25,7 @@ import com.google.common.collect.PeekingIterator;
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.partitions.PartitionStatisticsCollector;
+import org.apache.cassandra.db.rows.Row.Deletion;
 import org.apache.cassandra.utils.MergeIterator;
 import org.apache.cassandra.utils.WrappedInt;
 
@@ -311,4 +312,80 @@ public abstract class Rows
         }
         return timeDelta;
     }
+
+    /**
+     * Returns a row that is obtained from the given existing row by removing everything that is shadowed by data in
+     * the update row. In other words, produces the smallest result row such that
+     * {@code merge(result, update, nowInSec) == merge(existing, update, nowInSec)} after filtering by rangeDeletion.
+     *
+     * @param existing source row
+     * @param update shadowing row
+     * @param rangeDeletion extra {@code DeletionTime} from covering tombstone
+     * @param nowInSec the current time in seconds (which plays a role during reconciliation
+     * because deleted cells always have precedence on timestamp equality and deciding if a
+     * cell is live or not depends on the current time due to expiring cells).
+     */
+    public static Row removeShadowedCells(Row existing, Row update, DeletionTime rangeDeletion, int nowInSec)
+    {
+        Row.Builder builder = BTreeRow.sortedBuilder();
+        Clustering clustering = existing.clustering();
+        builder.newRow(clustering);
+
+        DeletionTime deletion = update.deletion().time();
+        if (rangeDeletion.supersedes(deletion))
+            deletion = rangeDeletion;
+
+        LivenessInfo existingInfo = existing.primaryKeyLivenessInfo();
+        if (!deletion.deletes(existingInfo))
+            builder.addPrimaryKeyLivenessInfo(existingInfo);
+        Row.Deletion rowDeletion = existing.deletion();
+        if (!deletion.supersedes(rowDeletion.time()))
+            builder.addRowDeletion(rowDeletion);
+
+        Iterator<ColumnData> a = existing.iterator();
+        Iterator<ColumnData> b = update.iterator();
+        ColumnData nexta = a.hasNext() ? a.next() : null, nextb = b.hasNext() ? b.next() : null;
+        while (nexta != null)
+        {
+            int comparison = nextb == null ? -1 : nexta.column.compareTo(nextb.column);
+            if (comparison <= 0)
+            {
+                ColumnData cura = nexta;
+                ColumnDefinition column = cura.column;
+                ColumnData curb = comparison == 0 ? nextb : null;
+                if (column.isSimple())
+                {
+                    Cells.addNonShadowed((Cell) cura, (Cell) curb, deletion, builder, nowInSec);
+                }
+                else
+                {
+                    ComplexColumnData existingData = (ComplexColumnData) cura;
+                    ComplexColumnData updateData = (ComplexColumnData) curb;
+
+                    DeletionTime existingDt = existingData.complexDeletion();
+                    DeletionTime updateDt = updateData == null ? DeletionTime.LIVE : updateData.complexDeletion();
+
+                    DeletionTime maxDt = updateDt.supersedes(deletion) ? updateDt : deletion;
+                    if (existingDt.supersedes(maxDt))
+                    {
+                        builder.addComplexDeletion(column, existingDt);
+                        maxDt = existingDt;
+                    }
+
+                    Iterator<Cell> existingCells = existingData.iterator();
+                    Iterator<Cell> updateCells = updateData == null ? null : updateData.iterator();
+                    Cells.addNonShadowedComplex(column, existingCells, updateCells, maxDt, builder, nowInSec);
+                }
+                nexta = a.hasNext() ? a.next() : null;
+                if (curb != null)
+                    nextb = b.hasNext() ? b.next() : null;
+            }
+            else
+            {
+                nextb = b.hasNext() ? b.next() : null;
+            }
+        }
+        Row row = builder.build();
+        return row != null && !row.isEmpty() ? row : null;
+    }
 }
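
A concrete instance of the removeShadowedCells contract above: if existing carries
cells a@3 and b@12 and update carries a row deletion at timestamp 10, the result
keeps only b@12. Merging {b@12} with the deletion and merging {a@3, b@12} with the
deletion both yield {b@12}, so merge(result, update, nowInSec) ==
merge(existing, update, nowInSec) holds while the shadowed a@3 is dropped.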

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d40ac784/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java b/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
index ed6bd12..db18859 100644
--- a/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
+++ b/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
@@ -24,9 +24,11 @@ import com.google.common.collect.Collections2;
 import net.nicoulaj.compilecommand.annotations.Inline;
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.rows.Row.Deletion;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.io.util.FileDataInput;
 import org.apache.cassandra.utils.SearchIterator;
 import org.apache.cassandra.utils.WrappedException;
 
@@ -450,6 +452,58 @@ public class UnfilteredSerializer
         }
     }
 
+    public Unfiltered deserializeTombstonesOnly(FileDataInput in, SerializationHeader header, SerializationHelper helper)
+    throws IOException
+    {
+        while (true)
+        {
+            int flags = in.readUnsignedByte();
+            if (isEndOfPartition(flags))
+                return null;
+
+            int extendedFlags = readExtendedFlags(in, flags);
+
+            if (kind(flags) == Unfiltered.Kind.RANGE_TOMBSTONE_MARKER)
+            {
+                ClusteringBoundOrBoundary bound = ClusteringBoundOrBoundary.serializer.deserialize(in, helper.version, header.clusteringTypes());
+                return deserializeMarkerBody(in, header, bound);
+            }
+            else
+            {
+                assert !isStatic(extendedFlags); // deserializeStaticRow should be used for that.
+                if ((flags & HAS_DELETION) != 0)
+                {
+                    assert header.isForSSTable();
+                    boolean hasTimestamp = (flags & HAS_TIMESTAMP) != 0;
+                    boolean hasTTL = (flags & HAS_TTL) != 0;
+                    boolean deletionIsShadowable = (extendedFlags & HAS_SHADOWABLE_DELETION) != 0;
+                    Clustering clustering = Clustering.serializer.deserialize(in, helper.version, header.clusteringTypes());
+                    long nextPosition = in.readUnsignedVInt() + in.getFilePointer();
+                    in.readUnsignedVInt(); // skip previous unfiltered size
+                    if (hasTimestamp)
+                    {
+                        header.readTimestamp(in);
+                        if (hasTTL)
+                        {
+                            header.readTTL(in);
+                            header.readLocalDeletionTime(in);
+                        }
+                    }
+
+                    Deletion deletion = new Row.Deletion(header.readDeletionTime(in), deletionIsShadowable);
+                    in.seek(nextPosition);
+                    return BTreeRow.emptyDeletedRow(clustering, deletion);
+                }
+                else
+                {
+                    Clustering.serializer.skip(in, helper.version, header.clusteringTypes());
+                    skipRowBody(in);
+                    // Continue with next item.
+                }
+            }
+        }
+    }
+
     public Row deserializeStaticRow(DataInputPlus in, SerializationHeader header, SerializationHelper helper)
     throws IOException
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d40ac784/src/java/org/apache/cassandra/index/sasi/SASIIndexBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/SASIIndexBuilder.java b/src/java/org/apache/cassandra/index/sasi/SASIIndexBuilder.java
index de8d69b..1173d40 100644
--- a/src/java/org/apache/cassandra/index/sasi/SASIIndexBuilder.java
+++ b/src/java/org/apache/cassandra/index/sasi/SASIIndexBuilder.java
@@ -96,7 +96,7 @@ class SASIIndexBuilder extends SecondaryIndexBuilder
                             dataFile.seek(indexEntry.position);
                             ByteBufferUtil.readWithShortLength(dataFile); // key
 
-                            try (SSTableIdentityIterator partition = new SSTableIdentityIterator(sstable, dataFile, key))
+                            try (SSTableIdentityIterator partition = SSTableIdentityIterator.create(sstable, dataFile, key))
                             {
                                 // if the row has statics attached, it has to 
be indexed separately
                                 
indexWriter.nextUnfilteredCluster(partition.staticRow());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d40ac784/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java b/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
index a5af334..2a79f88 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
@@ -19,15 +19,15 @@ package org.apache.cassandra.io.sstable;
 
 import java.io.*;
 
-import org.apache.cassandra.utils.AbstractIterator;
-
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.util.FileDataInput;
 import org.apache.cassandra.io.util.RandomAccessReader;
+import org.apache.cassandra.utils.ByteBufferUtil;
 
-public class SSTableIdentityIterator extends AbstractIterator<Unfiltered> implements Comparable<SSTableIdentityIterator>, UnfilteredRowIterator
+public class SSTableIdentityIterator implements Comparable<SSTableIdentityIterator>, UnfilteredRowIterator
 {
     private final SSTableReader sstable;
     private final DecoratedKey key;
@@ -37,29 +37,51 @@ public class SSTableIdentityIterator extends AbstractIterator<Unfiltered> implem
     protected final SSTableSimpleIterator iterator;
     private final Row staticRow;
 
-    /**
-     * Used to iterate through the columns of a row.
-     * @param sstable SSTable we are reading ffrom.
-     * @param file Reading using this file.
-     * @param key Key of this row.
-     */
-    public SSTableIdentityIterator(SSTableReader sstable, RandomAccessReader file, DecoratedKey key)
+    public SSTableIdentityIterator(SSTableReader sstable, DecoratedKey key, DeletionTime partitionLevelDeletion,
+            String filename, SSTableSimpleIterator iterator) throws IOException
     {
+        super();
         this.sstable = sstable;
-        this.filename = file.getPath();
         this.key = key;
+        this.partitionLevelDeletion = partitionLevelDeletion;
+        this.filename = filename;
+        this.iterator = iterator;
+        this.staticRow = iterator.readStaticRow();
+    }
 
+    public static SSTableIdentityIterator create(SSTableReader sstable, RandomAccessReader file, DecoratedKey key)
+    {
         try
         {
-            this.partitionLevelDeletion = DeletionTime.serializer.deserialize(file);
+            DeletionTime partitionLevelDeletion = DeletionTime.serializer.deserialize(file);
             SerializationHelper helper = new SerializationHelper(sstable.metadata, sstable.descriptor.version.correspondingMessagingVersion(), SerializationHelper.Flag.LOCAL);
-            this.iterator = SSTableSimpleIterator.create(sstable.metadata, file, sstable.header, helper, partitionLevelDeletion);
-            this.staticRow = iterator.readStaticRow();
+            SSTableSimpleIterator iterator = SSTableSimpleIterator.create(sstable.metadata, file, sstable.header, helper, partitionLevelDeletion);
+            return new SSTableIdentityIterator(sstable, key, partitionLevelDeletion, file.getPath(), iterator);
         }
         catch (IOException e)
         {
             sstable.markSuspect();
-            throw new CorruptSSTableException(e, filename);
+            throw new CorruptSSTableException(e, file.getPath());
+        }
+    }
+
+    public static SSTableIdentityIterator create(SSTableReader sstable, FileDataInput dfile, RowIndexEntry<?> indexEntry, DecoratedKey key, boolean tombstoneOnly)
+    {
+        try
+        {
+            dfile.seek(indexEntry.position);
+            ByteBufferUtil.skipShortLength(dfile); // Skip partition key
+            DeletionTime partitionLevelDeletion = DeletionTime.serializer.deserialize(dfile);
+            SerializationHelper helper = new SerializationHelper(sstable.metadata, sstable.descriptor.version.correspondingMessagingVersion(), SerializationHelper.Flag.LOCAL);
+            SSTableSimpleIterator iterator = tombstoneOnly
+                    ? SSTableSimpleIterator.createTombstoneOnly(sstable.metadata, dfile, sstable.header, helper, partitionLevelDeletion)
+                    : SSTableSimpleIterator.create(sstable.metadata, dfile, sstable.header, helper, partitionLevelDeletion);
+            return new SSTableIdentityIterator(sstable, key, partitionLevelDeletion, dfile.getPath(), iterator);
+        }
+        catch (IOException e)
+        {
+            sstable.markSuspect();
+            throw new CorruptSSTableException(e, dfile.getPath());
         }
     }
 
@@ -93,7 +115,32 @@ public class SSTableIdentityIterator extends AbstractIterator<Unfiltered> implem
         return staticRow;
     }
 
-    protected Unfiltered computeNext()
+    public boolean hasNext()
+    {
+        try
+        {
+            return iterator.hasNext();
+        }
+        catch (IndexOutOfBoundsException e)
+        {
+            sstable.markSuspect();
+            throw new CorruptSSTableException(e, filename);
+        }
+        catch (IOError e)
+        {
+            if (e.getCause() instanceof IOException)
+            {
+                sstable.markSuspect();
+                throw new CorruptSSTableException((Exception)e.getCause(), filename);
+            }
+            else
+            {
+                throw e;
+            }
+        }
+    }
+
+    public Unfiltered next()
     {
         try
         {
@@ -120,7 +167,7 @@ public class SSTableIdentityIterator extends AbstractIterator<Unfiltered> implem
 
     protected Unfiltered doCompute()
     {
-        return iterator.hasNext() ? iterator.next() : endOfData();
+        return iterator.next();
     }
 
     public void close()
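
The new factories wrap the constructor; passing tombstoneOnly = true is what lets the garbage-collecting compaction scan overlapping sstables for deletion information without materialising live cells. A minimal usage sketch (illustration only, not part of this diff; sstable, dfile, indexEntry and key are assumed to be in scope, and consume is a hypothetical callback):

    // Illustration only: tombstone-only scan of a single partition.
    try (SSTableIdentityIterator partition = SSTableIdentityIterator.create(sstable, dfile, indexEntry, key, true))
    {
        while (partition.hasNext())
            consume(partition.next());  // sees only markers and deleted rows
    }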

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d40ac784/src/java/org/apache/cassandra/io/sstable/SSTableSimpleIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleIterator.java b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleIterator.java
index 2d4314e..ce42126 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleIterator.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleIterator.java
@@ -30,6 +30,7 @@ import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataPosition;
+import org.apache.cassandra.io.util.FileDataInput;
 import org.apache.cassandra.net.MessagingService;
 
 /**
@@ -59,6 +60,14 @@ public abstract class SSTableSimpleIterator extends AbstractIterator<Unfiltered>
             return new CurrentFormatIterator(metadata, in, header, helper);
     }
 
+    public static SSTableSimpleIterator createTombstoneOnly(CFMetaData metadata, DataInputPlus in, SerializationHeader header, SerializationHelper helper, DeletionTime partitionDeletion)
+    {
+        if (helper.version < MessagingService.VERSION_30)
+            return new OldFormatTombstoneIterator(metadata, in, helper, partitionDeletion);
+        else
+            return new CurrentFormatTombstoneIterator(metadata, in, header, helper);
+    }
+
     public abstract Row readStaticRow() throws IOException;
 
     private static class CurrentFormatIterator extends SSTableSimpleIterator
@@ -93,6 +102,41 @@ public abstract class SSTableSimpleIterator extends AbstractIterator<Unfiltered>
         }
     }
 
+    private static class CurrentFormatTombstoneIterator extends SSTableSimpleIterator
+    {
+        private final SerializationHeader header;
+
+        private CurrentFormatTombstoneIterator(CFMetaData metadata, DataInputPlus in, SerializationHeader header, SerializationHelper helper)
+        {
+            super(metadata, in, helper);
+            this.header = header;
+        }
+
+        public Row readStaticRow() throws IOException
+        {
+            if (header.hasStatic())
+            {
+                Row staticRow = UnfilteredSerializer.serializer.deserializeStaticRow(in, header, helper);
+                if (!staticRow.deletion().isLive())
+                    return BTreeRow.emptyDeletedRow(staticRow.clustering(), staticRow.deletion());
+            }
+            return Rows.EMPTY_STATIC_ROW;
+        }
+
+        protected Unfiltered computeNext()
+        {
+            try
+            {
+                Unfiltered unfiltered = UnfilteredSerializer.serializer.deserializeTombstonesOnly((FileDataInput) in, header, helper);
+                return unfiltered == null ? endOfData() : unfiltered;
+            }
+            catch (IOException e)
+            {
+                throw new IOError(e);
+            }
+        }
+    }
+
     private static class OldFormatIterator extends SSTableSimpleIterator
     {
         private final UnfilteredDeserializer deserializer;
@@ -163,4 +207,35 @@ public abstract class SSTableSimpleIterator extends AbstractIterator<Unfiltered>
 
     }
 
+    private static class OldFormatTombstoneIterator extends OldFormatIterator
+    {
+        private OldFormatTombstoneIterator(CFMetaData metadata, DataInputPlus in, SerializationHelper helper, DeletionTime partitionDeletion)
+        {
+            super(metadata, in, helper, partitionDeletion);
+        }
+
+        public Row readStaticRow() throws IOException
+        {
+            Row row = super.readStaticRow();
+            if (!row.deletion().isLive())
+                return BTreeRow.emptyDeletedRow(row.clustering(), row.deletion());
+            return Rows.EMPTY_STATIC_ROW;
+        }
+
+        protected Unfiltered computeNext()
+        {
+            while (true)
+            {
+                Unfiltered unfiltered = super.computeNext();
+                if (unfiltered == null || unfiltered.isRangeTombstoneMarker())
+                    return unfiltered;
+
+                Row row = (Row) unfiltered;
+                if (!row.deletion().isLive())
+                    return BTreeRow.emptyDeletedRow(row.clustering(), row.deletion());
+                // Otherwise read next.
+            }
+        }
+
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d40ac784/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
index d26edfa..32d3156 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
@@ -167,6 +167,14 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
 
     public static final Ordering<SSTableReader> sstableOrdering = Ordering.from(sstableComparator);
 
+    public static final Comparator<SSTableReader> sizeComparator = new Comparator<SSTableReader>()
+    {
+        public int compare(SSTableReader o1, SSTableReader o2)
+        {
+            return Longs.compare(o1.onDiskLength(), o2.onDiskLength());
+        }
+    };
+
     /**
      * maxDataAge is a timestamp in local server time (e.g. System.currentTimeMilli) which represents an upper bound
      * to the newest piece of data stored in the sstable. In other words, this sstable does not contain items created
@@ -1529,6 +1537,8 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
     public abstract UnfilteredRowIterator iterator(DecoratedKey key, Slices slices, ColumnFilter selectedColumns, boolean reversed, boolean isForThrift);
     public abstract UnfilteredRowIterator iterator(FileDataInput file, DecoratedKey key, RowIndexEntry indexEntry, Slices slices, ColumnFilter selectedColumns, boolean reversed, boolean isForThrift);
 
+    public abstract UnfilteredRowIterator simpleIterator(FileDataInput file, DecoratedKey key, RowIndexEntry indexEntry, boolean tombstoneOnly);
+
     /**
      * Finds and returns the first key beyond a given token in this SSTable or null if no such key exists.
      */
@@ -2016,14 +2026,6 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
         return 0;
     }
 
-    public static class SizeComparator implements Comparator<SSTableReader>
-    {
-        public int compare(SSTableReader o1, SSTableReader o2)
-        {
-            return Longs.compare(o1.onDiskLength(), o2.onDiskLength());
-        }
-    }
-
     public EncodingStats stats()
     {
         // We could return sstable.header.stats(), but this may not be as accurate than the actual sstable stats (see
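
The nested SizeComparator class is replaced by the shared sizeComparator constant above; a typical use (illustration only, not code from this commit) is ordering candidate sstables smallest-first:

    // Illustration only: sort sstables by on-disk size, smallest first.
    List<SSTableReader> bySize = new ArrayList<>(sstables);
    Collections.sort(bySize, SSTableReader.sizeComparator);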

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d40ac784/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java
index 7a7ce8c..8c64b01 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java
@@ -30,13 +30,11 @@ import org.apache.cassandra.db.rows.UnfilteredRowIterators;
 import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.io.sstable.Component;
-import org.apache.cassandra.io.sstable.CorruptSSTableException;
-import org.apache.cassandra.io.sstable.Descriptor;
-import org.apache.cassandra.io.sstable.ISSTableScanner;
+import org.apache.cassandra.io.sstable.*;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
 import org.apache.cassandra.io.util.FileDataInput;
+import org.apache.cassandra.io.util.RandomAccessReader;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.slf4j.Logger;
@@ -120,6 +118,13 @@ public class BigTableReader extends SSTableReader
     }
 
 
+    @SuppressWarnings("resource") // caller to close
+    @Override
+    public UnfilteredRowIterator simpleIterator(FileDataInput dfile, DecoratedKey key, RowIndexEntry position, boolean tombstoneOnly)
+    {
+        return SSTableIdentityIterator.create(this, dfile, position, key, tombstoneOnly);
+    }
+
     /**
      * @param key The key to apply as the rhs to the given Operator. A 'fake' key is allowed to
      * allow key selection by token bounds but only if op != * EQ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d40ac784/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java
index 6d31844..66213a6 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java
@@ -331,7 +331,7 @@ public class BigTableScanner implements ISSTableScanner
                             {
                                 dfile.seek(currentEntry.position);
                                 ByteBufferUtil.skipShortLength(dfile); // key
-                                return new SSTableIdentityIterator(sstable, dfile, partitionKey());
+                                return SSTableIdentityIterator.create(sstable, dfile, partitionKey());
                             }
 
                             ClusteringIndexFilter filter = dataRange.clusteringIndexFilter(partitionKey());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d40ac784/src/java/org/apache/cassandra/schema/CompactionParams.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/CompactionParams.java b/src/java/org/apache/cassandra/schema/CompactionParams.java
index 720efa3..73271f1 100644
--- a/src/java/org/apache/cassandra/schema/CompactionParams.java
+++ b/src/java/org/apache/cassandra/schema/CompactionParams.java
@@ -45,7 +45,8 @@ public final class CompactionParams
         CLASS,
         ENABLED,
         MIN_THRESHOLD,
-        MAX_THRESHOLD;
+        MAX_THRESHOLD,
+        PROVIDE_OVERLAPPING_TOMBSTONES;
 
         @Override
         public String toString()
@@ -54,27 +55,38 @@ public final class CompactionParams
         }
     }
 
+    public enum TombstoneOption
+    {
+        NONE,
+        ROW,
+        CELL;
+    }
+
     public static final int DEFAULT_MIN_THRESHOLD = 4;
     public static final int DEFAULT_MAX_THRESHOLD = 32;
 
     public static final boolean DEFAULT_ENABLED = true;
+    public static final TombstoneOption DEFAULT_PROVIDE_OVERLAPPING_TOMBSTONES =
+            TombstoneOption.valueOf(System.getProperty("default.provide.overlapping.tombstones", TombstoneOption.NONE.toString()).toUpperCase());
 
     public static final Map<String, String> DEFAULT_THRESHOLDS =
         ImmutableMap.of(Option.MIN_THRESHOLD.toString(), Integer.toString(DEFAULT_MIN_THRESHOLD),
                         Option.MAX_THRESHOLD.toString(), Integer.toString(DEFAULT_MAX_THRESHOLD));
 
     public static final CompactionParams DEFAULT =
-        new CompactionParams(SizeTieredCompactionStrategy.class, DEFAULT_THRESHOLDS, DEFAULT_ENABLED);
+        new CompactionParams(SizeTieredCompactionStrategy.class, DEFAULT_THRESHOLDS, DEFAULT_ENABLED, DEFAULT_PROVIDE_OVERLAPPING_TOMBSTONES);
 
     private final Class<? extends AbstractCompactionStrategy> klass;
     private final ImmutableMap<String, String> options;
     private final boolean isEnabled;
+    private final TombstoneOption tombstoneOption;
 
-    private CompactionParams(Class<? extends AbstractCompactionStrategy> klass, Map<String, String> options, boolean isEnabled)
+    private CompactionParams(Class<? extends AbstractCompactionStrategy> klass, Map<String, String> options, boolean isEnabled, TombstoneOption tombstoneOption)
     {
         this.klass = klass;
         this.options = ImmutableMap.copyOf(options);
         this.isEnabled = isEnabled;
+        this.tombstoneOption = tombstoneOption;
     }
 
     public static CompactionParams create(Class<? extends AbstractCompactionStrategy> klass, Map<String, String> options)
@@ -82,6 +94,8 @@ public final class CompactionParams
         boolean isEnabled = options.containsKey(Option.ENABLED.toString())
                           ? Boolean.parseBoolean(options.get(Option.ENABLED.toString()))
                           : DEFAULT_ENABLED;
+        TombstoneOption tombstoneOption = TombstoneOption.valueOf(options.getOrDefault(Option.PROVIDE_OVERLAPPING_TOMBSTONES.toString(),
+                                                                                       DEFAULT_PROVIDE_OVERLAPPING_TOMBSTONES.toString()).toUpperCase());
 
         Map<String, String> allOptions = new HashMap<>(options);
         if (supportsThresholdParams(klass))
@@ -90,7 +104,7 @@ public final class CompactionParams
             allOptions.putIfAbsent(Option.MAX_THRESHOLD.toString(), Integer.toString(DEFAULT_MAX_THRESHOLD));
         }
 
-        return new CompactionParams(klass, allOptions, isEnabled);
+        return new CompactionParams(klass, allOptions, isEnabled, tombstoneOption);
     }
 
     public static CompactionParams scts(Map<String, String> options)
@@ -119,6 +133,11 @@ public final class CompactionParams
              : Integer.parseInt(threshold);
     }
 
+    public TombstoneOption tombstoneOption()
+    {
+        return tombstoneOption;
+    }
+
     public void validate()
     {
         try
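
To see how the new schema option flows through, a sketch of requesting row-granularity tombstone collection via CompactionParams.create (illustration only, not code from this commit; it assumes Option.toString() yields the lower-cased name, as with the threshold keys):

    // Illustration only: enable the new option programmatically.
    Map<String, String> options = new HashMap<>();
    options.put("provide_overlapping_tombstones", "ROW");
    CompactionParams params = CompactionParams.create(SizeTieredCompactionStrategy.class, options);
    assert params.tombstoneOption() == CompactionParams.TombstoneOption.ROW;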

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d40ac784/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index d64fc04..0a35296 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -75,6 +75,7 @@ import org.apache.cassandra.metrics.StorageMetrics;
 import org.apache.cassandra.net.*;
 import org.apache.cassandra.repair.*;
 import org.apache.cassandra.repair.messages.RepairOption;
+import org.apache.cassandra.schema.CompactionParams.TombstoneOption;
 import org.apache.cassandra.schema.KeyspaceMetadata;
 import org.apache.cassandra.service.paxos.CommitVerbHandler;
 import org.apache.cassandra.service.paxos.PrepareVerbHandler;
@@ -2809,6 +2810,19 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         return status.statusCode;
     }
 
+    public int garbageCollect(String tombstoneOptionString, int jobs, String keyspaceName, String ... columnFamilies) throws IOException, ExecutionException, InterruptedException
+    {
+        TombstoneOption tombstoneOption = TombstoneOption.valueOf(tombstoneOptionString);
+        CompactionManager.AllSSTableOpStatus status = CompactionManager.AllSSTableOpStatus.SUCCESSFUL;
+        for (ColumnFamilyStore cfs : getValidColumnFamilies(false, false, keyspaceName, columnFamilies))
+        {
+            CompactionManager.AllSSTableOpStatus oneStatus = cfs.garbageCollect(tombstoneOption, jobs);
+            if (oneStatus != CompactionManager.AllSSTableOpStatus.SUCCESSFUL)
+                status = oneStatus;
+        }
+        return status.statusCode;
+    }
+
     /**
      * Takes the snapshot of a multiple column family from different keyspaces. A snapshot name must be specified.
      *

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d40ac784/src/java/org/apache/cassandra/service/StorageServiceMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
index f7da817..abb10c1 100644
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@ -290,6 +290,12 @@ public interface StorageServiceMBean extends NotificationEmitter
     public int upgradeSSTables(String keyspaceName, boolean excludeCurrentVersion, int jobs, String... tableNames) throws IOException, ExecutionException, InterruptedException;
 
     /**
+     * Rewrites all sstables from the given tables to remove deleted data.
+     * The tombstone option defines the granularity of the procedure: ROW removes deleted partitions and rows, CELL also removes overwritten or deleted cells.
+     */
+    public int garbageCollect(String tombstoneOption, int jobs, String keyspaceName, String... tableNames) throws IOException, ExecutionException, InterruptedException;
+
+    /**
      * Flush all memtables for the given column families, or all columnfamilies for the given keyspace
      * if none are explicitly listed.
      * @param keyspaceName

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d40ac784/src/java/org/apache/cassandra/tools/NodeProbe.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java
index 84eeb04..c33dfa4 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -73,6 +73,7 @@ import org.apache.cassandra.metrics.TableMetrics;
 import org.apache.cassandra.metrics.ThreadPoolMetrics;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.net.MessagingServiceMBean;
+import org.apache.cassandra.schema.CompactionParams.TombstoneOption;
 import org.apache.cassandra.service.CacheService;
 import org.apache.cassandra.service.CacheServiceMBean;
 import org.apache.cassandra.service.GCInspector;
@@ -259,6 +260,11 @@ public class NodeProbe implements AutoCloseable
         return ssProxy.upgradeSSTables(keyspaceName, excludeCurrentVersion, jobs, tableNames);
     }
 
+    public int garbageCollect(String tombstoneOption, int jobs, String keyspaceName, String... tableNames) throws IOException, ExecutionException, InterruptedException
+    {
+        return ssProxy.garbageCollect(tombstoneOption, jobs, keyspaceName, tableNames);
+    }
+
     private void checkJobs(PrintStream out, int jobs)
     {
         if (jobs > DatabaseDescriptor.getConcurrentCompactors())
@@ -301,7 +307,16 @@ public class NodeProbe implements AutoCloseable
         if (upgradeSSTables(keyspaceName, excludeCurrentVersion, jobs, tableNames) != 0)
         {
             failed = true;
-            out.println("Aborted upgrading sstables for atleast one table in 
keyspace "+keyspaceName+", check server logs for more information.");
+            out.println("Aborted upgrading sstables for at least one table in 
keyspace " + keyspaceName + ", check server logs for more information.");
+        }
+    }
+
+    public void garbageCollect(PrintStream out, String tombstoneOption, int jobs, String keyspaceName, String... tableNames) throws IOException, ExecutionException, InterruptedException
+    {
+        if (garbageCollect(tombstoneOption, jobs, keyspaceName, tableNames) != 0)
+        {
+            failed = true;
+            out.println("Aborted garbage collection for at least one table in 
keyspace " + keyspaceName + ", check server logs for more information.");
         }
     }
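
A sketch of driving the new operation over JMX (illustration only, not part of this diff; the host, port, and two-argument NodeProbe constructor are assumptions):

    // Illustration only: CELL-granularity garbage collection of one table,
    // using all available compaction threads (jobs = 0).
    try (NodeProbe probe = new NodeProbe("127.0.0.1", 7199))
    {
        probe.garbageCollect(System.out, "CELL", 0, "my_keyspace", "my_table");
    }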
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d40ac784/src/java/org/apache/cassandra/tools/NodeTool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeTool.java b/src/java/org/apache/cassandra/tools/NodeTool.java
index 8640b58..cde4ee5 100644
--- a/src/java/org/apache/cassandra/tools/NodeTool.java
+++ b/src/java/org/apache/cassandra/tools/NodeTool.java
@@ -64,6 +64,7 @@ public class NodeTool
                 Verify.class,
                 Flush.class,
                 UpgradeSSTable.class,
+                GarbageCollect.class,
                 DisableAutoCompaction.class,
                 EnableAutoCompaction.class,
                 CompactionStats.class,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d40ac784/src/java/org/apache/cassandra/tools/nodetool/GarbageCollect.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/nodetool/GarbageCollect.java b/src/java/org/apache/cassandra/tools/nodetool/GarbageCollect.java
new file mode 100644
index 0000000..37daf09
--- /dev/null
+++ b/src/java/org/apache/cassandra/tools/nodetool/GarbageCollect.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.tools.nodetool;
+
+import io.airlift.command.Arguments;
+import io.airlift.command.Command;
+import io.airlift.command.Option;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.cassandra.tools.NodeProbe;
+import org.apache.cassandra.tools.NodeTool.NodeToolCmd;
+
+@Command(name = "garbagecollect", description = "Remove deleted data from one 
or more tables")
+public class GarbageCollect extends NodeToolCmd
+{
+    @Arguments(usage = "[<keyspace> <tables>...]", description = "The keyspace 
followed by one or many tables")
+    private List<String> args = new ArrayList<>();
+
+    @Option(title = "granularity",
+        name = {"-g", "--granularity"},
+        allowedValues = {"ROW", "CELL"},
+        description = "Granularity of garbage removal. ROW (default) removes 
deleted partitions and rows, CELL also removes overwritten or deleted cells.")
+    private String tombstoneOption = "ROW";
+
+    @Option(title = "jobs",
+            name = {"-j", "--jobs"},
+            description = "Number of sstables to cleanup simultanously, set to 
0 to use all available compaction threads")
+    private int jobs = 2;
+
+    @Override
+    public void execute(NodeProbe probe)
+    {
+        List<String> keyspaces = parseOptionalKeyspace(args, probe);
+        String[] tableNames = parseOptionalTables(args);
+
+        for (String keyspace : keyspaces)
+        {
+            try
+            {
+                probe.garbageCollect(System.out, tombstoneOption, jobs, keyspace, tableNames);
+            } catch (Exception e)
+            {
+                throw new RuntimeException("Error occurred during garbage 
collection", e);
+            }
+        }
+    }
+}
\ No newline at end of file
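
Example invocation of the new command (keyspace and table names are placeholders):

    nodetool garbagecollect --granularity CELL --jobs 0 my_keyspace my_table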
