This is an automated email from the ASF dual-hosted git repository.

smiklosovic pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d7258ac8f31 Add table metric PurgeableTombstoneScannedHistogram and a tracing event for scanned purgeable tombstones
d7258ac8f31 is described below

commit d7258ac8f31eedc9c28f8c5e381893d192209d48
Author: Dmitry Konstantinov <[email protected]>
AuthorDate: Sun Dec 8 16:10:01 2024 +0300

    Add table metric PurgeableTombstoneScannedHistogram and a tracing event for scanned purgeable tombstones

    patch by Dmitry Konstantinov; reviewed by Chris Lohfink, Stefan Miklosovic for CASSANDRA-20132
---
 CHANGES.txt                                        |   1 +
 conf/cassandra.yaml                                |  14 +
 conf/cassandra_latest.yaml                         |  14 +
 .../pages/managing/operating/metrics.adoc          |   3 +
 src/java/org/apache/cassandra/config/Config.java   |  30 ++
 .../cassandra/config/DatabaseDescriptor.java       |  11 +
 src/java/org/apache/cassandra/db/ReadCommand.java  | 139 ++++++
 .../cassandra/db/virtual/TableMetricTables.java    |   1 +
 .../apache/cassandra/metrics/KeyspaceMetrics.java  |   3 +
 .../org/apache/cassandra/metrics/TableMetrics.java |   3 +
 .../config/DatabaseDescriptorRefTest.java          |   1 +
 .../org/apache/cassandra/db/ReadCommandTest.java   | 465 +++++++++++++++++++--
 12 files changed, 641 insertions(+), 44 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 4a749676fb1..28ab975f79e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 5.1
+ * Add table metric PurgeableTombstoneScannedHistogram and a tracing event for scanned purgeable tombstones (CASSANDRA-20132)
  * Make sure we can parse the expanded CQL before writing it to the log or sending it to replicas (CASSANDRA-20218)
  * Add format_bytes and format_time functions (CASSANDRA-19546)
  * Fix error when trying to assign a tuple to target type not being a tuple (CASSANDRA-20237)
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 55fc50f6c6e..58f8945d653 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -1964,6 +1964,20 @@ transparent_data_encryption_options:
 tombstone_warn_threshold: 1000
 tombstone_failure_threshold: 100000
 
+# Controls the granularity of purgeable tombstones reported to the PurgeableTombstoneScannedHistogram table metric.
+# Possible values:
+# 'disabled' - do not collect the metric at all
+# 'row'      - track only partition/range/row level tombstones,
+#              a good compromise between overhead and usability.
+#              For CPU-bound workloads you may get less than 1% throughput overhead.
+#              For IO-bound workloads the overhead is negligible.
+# 'cell'     - track partition/range/row/cell level tombstones.
+#              This is the most granular option,
+#              but it has some performance overhead due to iteration over cells.
+#              For CPU-bound workloads you may get about 5% throughput overhead.
+#              For IO-bound workloads the overhead is almost negligible.
+# tombstone_read_purgeable_metric_granularity: disabled
+
 # Filtering and secondary index queries at read consistency levels above ONE/LOCAL_ONE use a
 # mechanism called replica filtering protection to ensure that results from stale replicas do
 # not violate consistency. (See CASSANDRA-8272 and CASSANDRA-15907 for more details.) This
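
For reference, enabling row-level tracking is a one-line change in cassandra.yaml; a minimal sketch using the values documented above:

    tombstone_read_purgeable_metric_granularity: row

The setting can later be widened to 'cell' if per-cell detail is worth the extra read-path cost described above.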
diff --git a/conf/cassandra_latest.yaml b/conf/cassandra_latest.yaml
index 387565fe185..ce1d97c476d 100644
--- a/conf/cassandra_latest.yaml
+++ b/conf/cassandra_latest.yaml
@@ -1838,6 +1838,20 @@ transparent_data_encryption_options:
 tombstone_warn_threshold: 1000
 tombstone_failure_threshold: 100000
 
+# Controls the granularity of purgeable tombstones reported to the PurgeableTombstoneScannedHistogram table metric.
+# Possible values:
+# 'disabled' - do not collect the metric at all
+# 'row'      - track only partition/range/row level tombstones,
+#              a good compromise between overhead and usability.
+#              For CPU-bound workloads you may get less than 1% throughput overhead.
+#              For IO-bound workloads the overhead is negligible.
+# 'cell'     - track partition/range/row/cell level tombstones.
+#              This is the most granular option,
+#              but it has some performance overhead due to iteration over cells.
+#              For CPU-bound workloads you may get about 5% throughput overhead.
+#              For IO-bound workloads the overhead is almost negligible.
+# tombstone_read_purgeable_metric_granularity: disabled
+
 # Filtering and secondary index queries at read consistency levels above ONE/LOCAL_ONE use a
 # mechanism called replica filtering protection to ensure that results from stale replicas do
 # not violate consistency. (See CASSANDRA-8272 and CASSANDRA-15907 for more details.) This
diff --git a/doc/modules/cassandra/pages/managing/operating/metrics.adoc b/doc/modules/cassandra/pages/managing/operating/metrics.adoc
index 78dbd165867..4f3d66652c2 100644
--- a/doc/modules/cassandra/pages/managing/operating/metrics.adoc
+++ b/doc/modules/cassandra/pages/managing/operating/metrics.adoc
@@ -181,6 +181,9 @@ by compression meta data.
 |TombstoneScannedHistogram |Histogram |Histogram of tombstones scanned
 in queries on this table.
 
+|PurgeableTombstoneScannedHistogram |Histogram |Histogram of purgeable tombstones scanned
+in queries on this table. Use the tombstone_read_purgeable_metric_granularity property in cassandra.yaml to enable it.
+
 |LiveScannedHistogram |Histogram |Histogram of live cells scanned in
 queries on this table.
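
Like TombstoneScannedHistogram, the new histogram should also be reachable over JMX; assuming the standard table-metric naming used for the other histograms on this page, the MBean would look like:

    org.apache.cassandra.metrics:type=Table,keyspace=<keyspace>,scope=<table>,name=PurgeableTombstoneScannedHistogram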
 
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 932423d2488..77f7d9a3417 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -544,6 +544,8 @@ public class Config
     public volatile int tombstone_warn_threshold = 1000;
     public volatile int tombstone_failure_threshold = 100000;
 
+    public TombstonesMetricGranularity tombstone_read_purgeable_metric_granularity = TombstonesMetricGranularity.disabled;
+
     public final ReplicaFilteringProtectionOptions replica_filtering_protection = new ReplicaFilteringProtectionOptions();

     @Replaces(oldName = "index_summary_capacity_in_mb", converter = Converters.MEBIBYTES_DATA_STORAGE_LONG, deprecated = true)
@@ -1327,6 +1329,34 @@ public class Config
         }
     }
 
+    /**
+     * Allows controlling the granularity of metrics related to tombstones.
+     * It is a trade-off between metric granularity and performance overhead.
+     * See CASSANDRA-20132 for more details.
+     */
+    public enum TombstonesMetricGranularity
+    {
+        /**
+         * Do not collect the metric at all.
+         */
+        disabled,
+        /**
+         * Track only partition/range/row level tombstones,
+         * a good compromise between overhead and usability.
+         * For CPU-bound workloads you may get less than 1% throughput overhead.
+         * For IO-bound workloads the overhead is negligible.
+         */
+        row,
+        /**
+         * Track partition/range/row/cell level tombstones.
+         * This is the most granular option,
+         * but it has some performance overhead due to iteration over cells.
+         * For CPU-bound workloads you may get about 5% throughput overhead.
+         * For IO-bound workloads the overhead is almost negligible.
+         */
+        cell
+    }
+
     private static final Set<String> SENSITIVE_KEYS = new HashSet<String>() {{
         add("client_encryption_options");
         add("server_encryption_options");
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 8a09f4fd76e..b784571d01f 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -5523,4 +5523,15 @@ public class DatabaseDescriptor
     {
         return conf.password_validator_reconfiguration_enabled;
     }
+
+    public static Config.TombstonesMetricGranularity getPurgeableTobmstonesMetricGranularity()
+    {
+        return conf.tombstone_read_purgeable_metric_granularity;
+    }
+
+    @VisibleForTesting
+    public static void setPurgeableTobmstonesMetricGranularity(Config.TombstonesMetricGranularity granularity)
+    {
+        conf.tombstone_read_purgeable_metric_granularity = granularity;
+    }
 }
diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java
index 3bb50cbcfae..e4ea5f12d74 100644
--- a/src/java/org/apache/cassandra/db/ReadCommand.java
+++ b/src/java/org/apache/cassandra/db/ReadCommand.java
@@ -466,6 +466,7 @@ public abstract class ReadCommand extends AbstractReadQuery
                 iterator = withQuerySizeTracking(iterator);
                 iterator = maybeSlowDownForTesting(iterator);
                 iterator = withQueryCancellation(iterator);
+                iterator = maybeRecordPurgeableTombstones(iterator, cfs);
                 iterator = RTBoundValidator.validate(withoutPurgeableTombstones(iterator, cfs, executionController), Stage.PURGED, false);
                 iterator = withMetricsRecording(iterator, cfs.metric, startTimeNanos);
 
@@ -894,6 +895,144 @@ public abstract class ReadCommand extends AbstractReadQuery
         return Transformation.apply(iterator, new WithoutPurgeableTombstones());
     }
 
+
+    /**
+     * Wraps the provided iterator so that metrics on the count of purgeable tombstones are tracked and traced.
+     * It tracks only tombstones with localDeletionTime < now - gc_grace_period.
+     * Other (non-purgeable) tombstones will be tracked by the regular Cassandra logic later.
+     */
+    private UnfilteredPartitionIterator maybeRecordPurgeableTombstones(UnfilteredPartitionIterator iter,
+                                                                       ColumnFamilyStore cfs)
+    {
+        class PurgeableTombstonesMetricRecording extends Transformation<UnfilteredRowIterator>
+        {
+            private int purgeableTombstones = 0;
+
+            @Override
+            public UnfilteredRowIterator applyToPartition(UnfilteredRowIterator iter)
+            {
+                if (!iter.partitionLevelDeletion().isLive())
+                    purgeableTombstones++;
+                return Transformation.apply(iter, this);
+            }
+
+            @Override
+            public Row applyToStatic(Row row)
+            {
+                return applyToRow(row);
+            }
+
+            @Override
+            public Row applyToRow(Row row)
+            {
+                final long nowInSec = nowInSec();
+                boolean hasTombstones = false;
+
+                if (isPurgeableCellTombstonesTrackingEnabled())
+                {
+                    for (Cell<?> cell : row.cells())
+                    {
+                        if (!cell.isLive(nowInSec) && isPurgeable(cell.localDeletionTime(), nowInSec))
+                        {
+                            purgeableTombstones++;
+                            hasTombstones = true; // avoids counting an extra tombstone if the whole row expired
+                        }
+                    }
+                }
+
+                // we replicate the logic used for the non-purgeable tombstones metric here
+                if (!row.primaryKeyLivenessInfo().isLive(nowInSec)
+                    && row.hasDeletion(nowInSec)
+                    && isPurgeable(row.deletion().time(), nowInSec)
+                    && !hasTombstones)
+                {
+                    // We're counting primary key deletions only here.
+                    purgeableTombstones++;
+                }
+
+                return row;
+            }
+
+            @Override
+            public RangeTombstoneMarker applyToMarker(RangeTombstoneMarker marker)
+            {
+                final long nowInSec = nowInSec();
+
+                // for boundary markers, increment the metric only if both the close and open markers are purgeable
+                if (marker.isBoundary())
+                {
+                    countIfBothPurgeable(marker.closeDeletionTime(false),
+                                         marker.openDeletionTime(false),
+                                         nowInSec);
+                }
+                // for bound markers, just increment if the marker is purgeable
+                else if (marker instanceof RangeTombstoneBoundMarker)
+                {
+                    countIfPurgeable(((RangeTombstoneBoundMarker) marker).deletionTime(), nowInSec);
+                }
+
+                return marker;
+            }
+
+            @Override
+            public void onClose()
+            {
+                cfs.metric.purgeableTombstoneScannedHistogram.update(purgeableTombstones);
+                if (purgeableTombstones > 0)
+                    Tracing.trace("Read {} purgeable tombstone cells", purgeableTombstones);
+            }
+
+            /**
+             * Increments if both the close and open deletion times are less than (now - gc_grace_period)
+             */
+            private void countIfBothPurgeable(DeletionTime closeDeletionTime,
+                                              DeletionTime openDeletionTime,
+                                              long nowInSec)
+            {
+                if (isPurgeable(closeDeletionTime, nowInSec) && isPurgeable(openDeletionTime, nowInSec))
+                    purgeableTombstones++;
+            }
+
+            /**
+             * Increments if the deletion time is less than (now - gc_grace_period)
+             */
+            private void countIfPurgeable(DeletionTime deletionTime,
+                                          long nowInSec)
+            {
+                if (isPurgeable(deletionTime, nowInSec))
+                    purgeableTombstones++;
+            }
+
+            /**
+             * Checks that the deletion time is less than (now - gc_grace_period)
+             */
+            private boolean isPurgeable(DeletionTime deletionTime,
+                                        long nowInSec)
+            {
+                return isPurgeable(deletionTime.localDeletionTime(), nowInSec);
+            }
+
+            /**
+             * Checks that the deletion time is less than (now - gc_grace_period)
+             */
+            private boolean isPurgeable(long localDeletionTime,
+                                        long nowInSec)
+            {
+                return localDeletionTime < cfs.gcBefore(nowInSec);
+            }
+
+            private boolean isPurgeableCellTombstonesTrackingEnabled()
+            {
+                return DatabaseDescriptor.getPurgeableTobmstonesMetricGranularity() == Config.TombstonesMetricGranularity.cell;
+            }
+        }
+
+        if (DatabaseDescriptor.getPurgeableTobmstonesMetricGranularity() != Config.TombstonesMetricGranularity.disabled)
+            return Transformation.apply(iter, new PurgeableTombstonesMetricRecording());
+        else
+            return iter;
+    }
+
     /**
      * Return the queried token(s) for logging
      */
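
The transformation above hinges on a single comparison: a tombstone counts as purgeable once its local deletion time is older than gcBefore(now), i.e. now minus the table's gc_grace_seconds. A minimal standalone sketch of that rule (hypothetical helper, not part of the patch):

    // Sketch of the purgeability rule used by the new transformation:
    // a tombstone whose local deletion time falls before (now - gc_grace_seconds)
    // is already eligible for purging and is counted by the new histogram.
    static boolean isPurgeableTombstone(long localDeletionTimeSec, long nowInSec, long gcGraceSeconds)
    {
        long gcBefore = nowInSec - gcGraceSeconds;
        return localDeletionTimeSec < gcBefore;
    }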
diff --git a/src/java/org/apache/cassandra/db/virtual/TableMetricTables.java b/src/java/org/apache/cassandra/db/virtual/TableMetricTables.java
index 5528c92011c..e1defe51e69 100644
--- a/src/java/org/apache/cassandra/db/virtual/TableMetricTables.java
+++ b/src/java/org/apache/cassandra/db/virtual/TableMetricTables.java
@@ -75,6 +75,7 @@ public class TableMetricTables
             new LatencyTableMetric(name, "local_write_latency", t -> t.writeLatency.latency),
             new LatencyTableMetric(name, "coordinator_write_latency", t -> t.coordinatorWriteLatency),
             new HistogramTableMetric(name, "tombstones_per_read", t -> t.tombstoneScannedHistogram.cf),
+            new HistogramTableMetric(name, "purgeable_tombstones_per_read", t -> t.purgeableTombstoneScannedHistogram.cf),
             new HistogramTableMetric(name, "rows_per_read", t -> t.liveScannedHistogram.cf),
             new StorageTableMetric(name, "disk_usage", (TableMetrics t) -> t.totalDiskSpaceUsed),
             new StorageTableMetric(name, "max_partition_size", (TableMetrics t) -> t.maxPartitionSize),
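
Assuming these per-table metric tables keep their usual home in the system_views virtual keyspace, the new histogram can then be inspected with plain CQL, for example:

    SELECT * FROM system_views.purgeable_tombstones_per_read;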
diff --git a/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java b/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java
index 237fd03e2d7..a1916bebd07 100644
--- a/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java
@@ -85,6 +85,8 @@ public class KeyspaceMetrics
     public final Histogram sstablesPerRangeReadHistogram;
     /** Tombstones scanned in queries on this Keyspace */
     public final Histogram tombstoneScannedHistogram;
+    /** Purgeable tombstones scanned in queries on this Keyspace */
+    public final Histogram purgeableTombstoneScannedHistogram;
     /** Live cells scanned in queries on this Keyspace */
     public final Histogram liveScannedHistogram;
     /** Column update time delta on this Keyspace */
@@ -234,6 +236,7 @@ public class KeyspaceMetrics
         sstablesPerReadHistogram = createKeyspaceHistogram("SSTablesPerReadHistogram", true);
         sstablesPerRangeReadHistogram = createKeyspaceHistogram("SSTablesPerRangeReadHistogram", true);
         tombstoneScannedHistogram = createKeyspaceHistogram("TombstoneScannedHistogram", false);
+        purgeableTombstoneScannedHistogram = createKeyspaceHistogram("PurgeableTombstoneScannedHistogram", false);
         liveScannedHistogram = createKeyspaceHistogram("LiveScannedHistogram", false);
         colUpdateTimeDeltaHistogram = createKeyspaceHistogram("ColUpdateTimeDeltaHistogram", false);
         viewLockAcquireTime = createKeyspaceTimer("ViewLockAcquireTime");
diff --git a/src/java/org/apache/cassandra/metrics/TableMetrics.java b/src/java/org/apache/cassandra/metrics/TableMetrics.java
index ab1ab6eb73a..fabb0814e49 100644
--- a/src/java/org/apache/cassandra/metrics/TableMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/TableMetrics.java
@@ -156,6 +156,8 @@ public class TableMetrics
     public final Gauge<Long> compressionMetadataOffHeapMemoryUsed;
     /** Tombstones scanned in queries on this CF */
     public final TableHistogram tombstoneScannedHistogram;
+    /** Purgeable tombstones scanned in queries on this CF */
+    public final TableHistogram purgeableTombstoneScannedHistogram;
     /** Live rows scanned in queries on this CF */
     public final TableHistogram liveScannedHistogram;
     /** Column update time delta on this CF */
@@ -768,6 +770,7 @@ public class TableMetrics
         additionalWriteLatencyNanos = createTableGauge("AdditionalWriteLatencyNanos", () -> MICROSECONDS.toNanos(cfs.additionalWriteLatencyMicros));

         tombstoneScannedHistogram = createTableHistogram("TombstoneScannedHistogram", cfs.keyspace.metric.tombstoneScannedHistogram, false);
+        purgeableTombstoneScannedHistogram = createTableHistogram("PurgeableTombstoneScannedHistogram", cfs.keyspace.metric.purgeableTombstoneScannedHistogram, true);
         liveScannedHistogram = createTableHistogram("LiveScannedHistogram", cfs.keyspace.metric.liveScannedHistogram, false);
         colUpdateTimeDeltaHistogram = createTableHistogram("ColUpdateTimeDeltaHistogram", cfs.keyspace.metric.colUpdateTimeDeltaHistogram, false);
         coordinatorReadLatency = createTableTimer("CoordinatorReadLatency");
diff --git a/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java b/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java
index 89af53b175a..34656eac55a 100644
--- a/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java
+++ b/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java
@@ -103,6 +103,7 @@ public class DatabaseDescriptorRefTest
     "org.apache.cassandra.config.ConfigurationLoader",
     "org.apache.cassandra.config.Config$CorruptedTombstoneStrategy",
     "org.apache.cassandra.config.Config$BatchlogEndpointStrategy",
+    "org.apache.cassandra.config.Config$TombstonesMetricGranularity",
     "org.apache.cassandra.config.DatabaseDescriptor$ByteUnit",
     "org.apache.cassandra.config.DataRateSpec",
     "org.apache.cassandra.config.DataRateSpec$DataRateUnit",
diff --git a/test/unit/org/apache/cassandra/db/ReadCommandTest.java b/test/unit/org/apache/cassandra/db/ReadCommandTest.java
index cc4ceee6952..51a49c43658 100644
--- a/test/unit/org/apache/cassandra/db/ReadCommandTest.java
+++ b/test/unit/org/apache/cassandra/db/ReadCommandTest.java
@@ -35,6 +35,7 @@ import org.junit.Test;
 
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.Util;
+import org.apache.cassandra.config.Config;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.filter.ClusteringIndexSliceFilter;
 import org.apache.cassandra.db.filter.ColumnFilter;
@@ -93,6 +94,9 @@ import static org.junit.Assert.fail;
 
 public class ReadCommandTest
 {
+    private static final String CREATE = "1";
+    private static final String DELETE = "-1";
+
     private static final String KEYSPACE = "ReadCommandTest";
     private static final String CF1 = "Standard1";
     private static final String CF2 = "Standard2";
@@ -103,6 +107,12 @@ public class ReadCommandTest
     private static final String CF7 = "Counter7";
     private static final String CF8 = "Standard8";
     private static final String CF9 = "Standard9";
+    private static final String CF10 = "Standard10";
+    private static final String CF11 = "Standard11";
+    private static final String CF12 = "Standard12";
+    private static final String CF13 = "Standard13";
+    private static final String CF14 = "Standard14";
+
 
     private static final InetAddressAndPort REPAIR_COORDINATOR;
     static {
@@ -194,6 +204,61 @@ public class ReadCommandTest
                      .addClusteringColumn("col", 
ReversedType.getInstance(Int32Type.instance))
                      .addRegularColumn("a", AsciiType.instance);
 
+        TableMetadata.Builder metadata10 =
+        TableMetadata.builder(KEYSPACE, CF10)
+                     .addPartitionKeyColumn("key", BytesType.instance)
+                     .addClusteringColumn("col", AsciiType.instance)
+                     .addRegularColumn("a", AsciiType.instance)
+                     .addRegularColumn("b", AsciiType.instance)
+                     .addRegularColumn("c", AsciiType.instance)
+                     .addRegularColumn("d", AsciiType.instance)
+                     .addRegularColumn("e", AsciiType.instance)
+                     .addRegularColumn("f", AsciiType.instance);
+
+        TableMetadata.Builder metadata11 =
+        TableMetadata.builder(KEYSPACE, CF11)
+                     .addPartitionKeyColumn("key", BytesType.instance)
+                     .addClusteringColumn("col", AsciiType.instance)
+                     .addRegularColumn("a", AsciiType.instance)
+                     .addRegularColumn("b", AsciiType.instance)
+                     .addRegularColumn("c", AsciiType.instance)
+                     .addRegularColumn("d", AsciiType.instance)
+                     .addRegularColumn("e", AsciiType.instance)
+                     .addRegularColumn("f", AsciiType.instance);
+
+        TableMetadata.Builder metadata12 =
+        TableMetadata.builder(KEYSPACE, CF12)
+                     .addPartitionKeyColumn("key", BytesType.instance)
+                     .addClusteringColumn("col", AsciiType.instance)
+                     .addRegularColumn("a", AsciiType.instance)
+                     .addRegularColumn("b", AsciiType.instance)
+                     .addRegularColumn("c", AsciiType.instance)
+                     .addRegularColumn("d", AsciiType.instance)
+                     .addRegularColumn("e", AsciiType.instance)
+                     .addRegularColumn("f", AsciiType.instance);
+
+        TableMetadata.Builder metadata13 =
+        TableMetadata.builder(KEYSPACE, CF13)
+                     .addPartitionKeyColumn("key", BytesType.instance)
+                     .addClusteringColumn("col", AsciiType.instance)
+                     .addRegularColumn("a", AsciiType.instance)
+                     .addRegularColumn("b", AsciiType.instance)
+                     .addRegularColumn("c", AsciiType.instance)
+                     .addRegularColumn("d", AsciiType.instance)
+                     .addRegularColumn("e", AsciiType.instance)
+                     .addRegularColumn("f", AsciiType.instance);
+
+        TableMetadata.Builder metadata14 =
+        TableMetadata.builder(KEYSPACE, CF14)
+                     .addPartitionKeyColumn("key", BytesType.instance)
+                     .addClusteringColumn("col", AsciiType.instance)
+                     .addRegularColumn("a", AsciiType.instance)
+                     .addRegularColumn("b", AsciiType.instance)
+                     .addRegularColumn("c", AsciiType.instance)
+                     .addRegularColumn("d", AsciiType.instance)
+                     .addRegularColumn("e", AsciiType.instance)
+                     .addRegularColumn("f", AsciiType.instance);
+
         SchemaLoader.prepareServer();
         SchemaLoader.createKeyspace(KEYSPACE,
                                     KeyspaceParams.simple(1),
@@ -205,7 +270,12 @@ public class ReadCommandTest
                                     metadata6,
                                     metadata7,
                                     metadata8,
-                                    metadata9);
+                                    metadata9,
+                                    metadata10,
+                                    metadata11,
+                                    metadata12,
+                                    metadata13,
+                                    metadata14);
 
         LocalSessionAccessor.startup();
     }
@@ -332,23 +402,23 @@ public class ReadCommandTest
 
         String[][][] groups = new String[][][] {
             new String[][] {
-                new String[] { "1", "key1", "aa", "a" }, // "1" indicates to 
create the data, "-1" to delete the row
-                new String[] { "1", "key2", "bb", "b" },
-                new String[] { "1", "key3", "cc", "c" }
+                new String[] { CREATE, "key1", "aa", "a" },
+                new String[] { CREATE, "key2", "bb", "b" },
+                new String[] { CREATE, "key3", "cc", "c" }
             },
             new String[][] {
-                new String[] { "1", "key3", "dd", "d" },
-                new String[] { "1", "key2", "ee", "e" },
-                new String[] { "1", "key1", "ff", "f" }
+                new String[] { CREATE, "key3", "dd", "d" },
+                new String[] { CREATE, "key2", "ee", "e" },
+                new String[] { CREATE, "key1", "ff", "f" }
             },
             new String[][] {
-                new String[] { "1", "key6", "aa", "a" },
-                new String[] { "1", "key5", "bb", "b" },
-                new String[] { "1", "key4", "cc", "c" }
+                new String[] { CREATE, "key6", "aa", "a" },
+                new String[] { CREATE, "key5", "bb", "b" },
+                new String[] { CREATE, "key4", "cc", "c" }
             },
             new String[][] {
-                new String[] { "-1", "key6", "aa", "a" },
-                new String[] { "-1", "key2", "bb", "b" }
+                new String[] { DELETE, "key6", "aa", "a" },
+                new String[] { DELETE, "key2", "bb", "b" }
             }
         };
 
@@ -371,7 +441,7 @@ public class ReadCommandTest
 
             for (String[] data : group)
             {
-                if (data[0].equals("1"))
+                if (data[0].equals(CREATE))
                 {
                     new RowUpdateBuilder(cfs.metadata(), 0, ByteBufferUtil.bytes(data[1]))
                     .clustering(data[2])
@@ -493,33 +563,32 @@ public class ReadCommandTest
 
         String[][][] groups = new String[][][] {
                 new String[][] {
-                        new String[] { "1", "key1", "aa", "a" }, // "1" 
indicates to create the data, "-1" to delete the
-                                                                 // row
-                        new String[] { "1", "key2", "bb", "b" },
-                        new String[] { "1", "key3", "cc", "c" }
+                        new String[] { CREATE, "key1", "aa", "a" },
+                        new String[] { CREATE, "key2", "bb", "b" },
+                        new String[] { CREATE, "key3", "cc", "c" }
                 },
                 new String[][] {
-                        new String[] { "1", "key3", "dd", "d" },
-                        new String[] { "1", "key2", "ee", "e" },
-                        new String[] { "1", "key1", "ff", "f" }
+                        new String[] { CREATE, "key3", "dd", "d" },
+                        new String[] { CREATE, "key2", "ee", "e" },
+                        new String[] { CREATE, "key1", "ff", "f" }
                 },
                 new String[][] {
-                        new String[] { "1", "key6", "aa", "a" },
-                        new String[] { "1", "key5", "bb", "b" },
-                        new String[] { "1", "key4", "cc", "c" }
+                        new String[] { CREATE, "key6", "aa", "a" },
+                        new String[] { CREATE, "key5", "bb", "b" },
+                        new String[] { CREATE, "key4", "cc", "c" }
                 },
                 new String[][] {
-                        new String[] { "1", "key2", "aa", "a" },
-                        new String[] { "1", "key2", "cc", "c" },
-                        new String[] { "1", "key2", "dd", "d" }
+                        new String[] { CREATE, "key2", "aa", "a" },
+                        new String[] { CREATE, "key2", "cc", "c" },
+                        new String[] { CREATE, "key2", "dd", "d" }
                 },
                 new String[][] {
-                        new String[] { "-1", "key6", "aa", "a" },
-                        new String[] { "-1", "key2", "bb", "b" },
-                        new String[] { "-1", "key2", "ee", "e" },
-                        new String[] { "-1", "key2", "aa", "a" },
-                        new String[] { "-1", "key2", "cc", "c" },
-                        new String[] { "-1", "key2", "dd", "d" }
+                        new String[] { DELETE, "key6", "aa", "a" },
+                        new String[] { DELETE, "key2", "bb", "b" },
+                        new String[] { DELETE, "key2", "ee", "e" },
+                        new String[] { DELETE, "key2", "aa", "a" },
+                        new String[] { DELETE, "key2", "cc", "c" },
+                        new String[] { DELETE, "key2", "dd", "d" }
                 }
         };
 
@@ -539,7 +608,7 @@ public class ReadCommandTest
 
             for (String[] data : group)
             {
-                if (data[0].equals("1"))
+                if (data[0].equals(CREATE))
                 {
                     new RowUpdateBuilder(cfs.metadata(), 0, ByteBufferUtil.bytes(data[1]))
                             .clustering(data[2])
@@ -582,20 +651,19 @@ public class ReadCommandTest
 
         String[][][] groups = new String[][][] {
                 new String[][] {
-                        new String[] { "1", "key1", "aa", "a" }, // "1" 
indicates to create the data, "-1" to delete the
-                                                                 // row
-                        new String[] { "1", "key2", "bb", "b" },
-                        new String[] { "1", "key3", "cc", "c" }
+                        new String[] { CREATE, "key1", "aa", "a" },
+                        new String[] { CREATE, "key2", "bb", "b" },
+                        new String[] { CREATE, "key3", "cc", "c" }
                 },
                 new String[][] {
-                        new String[] { "1", "key3", "dd", "d" },
-                        new String[] { "1", "key2", "ee", "e" },
-                        new String[] { "1", "key1", "ff", "f" }
+                        new String[] { CREATE, "key3", "dd", "d" },
+                        new String[] { CREATE, "key2", "ee", "e" },
+                        new String[] { CREATE, "key1", "ff", "f" }
                 },
                 new String[][] {
-                        new String[] { "1", "key6", "aa", "a" },
-                        new String[] { "1", "key5", "bb", "b" },
-                        new String[] { "1", "key4", "cc", "c" }
+                        new String[] { CREATE, "key6", "aa", "a" },
+                        new String[] { CREATE, "key5", "bb", "b" },
+                        new String[] { CREATE, "key4", "cc", "c" }
                 }
         };
 
@@ -615,7 +683,7 @@ public class ReadCommandTest
 
             for (String[] data : group)
             {
-                if (data[0].equals("1"))
+                if (data[0].equals(CREATE))
                 {
                     new RowUpdateBuilder(cfs.metadata(), 0, ByteBufferUtil.bytes(data[1]))
                             .clustering(data[2])
@@ -651,6 +719,315 @@ public class ReadCommandTest
         assertEquals(1, cfs.metric.tombstoneScannedHistogram.cf.getSnapshot().getMax());
     }
 
+    @Test
+    public void testCountPurgeableRowTombstones() throws Exception
+    {
+        ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF10);
+        TestWriteOperation[] operations = new TestWriteOperation[]
+        {
+            TestWriteOperation.insert("key1", "aa", "a"),
+            TestWriteOperation.insert("key1", "ff", "f"),
+
+            TestWriteOperation.insert("key2", "aa", "e"),
+            TestWriteOperation.deleteRow("key2", "aa", PURGEABLE_DELETION),
+            TestWriteOperation.deleteRow("key2", "bb", NEW_DELETION),
+            TestWriteOperation.deleteRow("key2", "cc", PURGEABLE_DELETION),
+            TestWriteOperation.deleteRow("key2", "dd", PURGEABLE_DELETION),
+            TestWriteOperation.deleteRow("key2", "ee", NEW_DELETION),
+        };
+
+        runTestWriteOperationsAndReadResults(cfs, operations, Config.TombstonesMetricGranularity.row);
+
+        assertEquals(2, cfs.metric.purgeableTombstoneScannedHistogram.cf.getCount());
+        assertEquals(0, cfs.metric.purgeableTombstoneScannedHistogram.cf.getSnapshot().getMin());
+        assertEquals(3, cfs.metric.purgeableTombstoneScannedHistogram.cf.getSnapshot().getMax());
+    }
+    @Test
+    public void testCountPurgeablePartitionTombstones() throws Exception
+    {
+        ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF11);
+        TestWriteOperation[] operations = new TestWriteOperation[]
+        {
+          TestWriteOperation.insert("key1", "aa", "a"),
+          TestWriteOperation.insert("key1", "ff", "f"),
+
+          TestWriteOperation.insert("key2", "aa", "a"),
+          TestWriteOperation.insert("key2", "cc", "c"),
+          TestWriteOperation.insert("key2", "dd", "d"),
+
+          TestWriteOperation.deletePartition("key2", PURGEABLE_DELETION),
+          TestWriteOperation.deletePartition("key3", NEW_DELETION)
+        };
+        runTestWriteOperationsAndReadResults(cfs, operations, Config.TombstonesMetricGranularity.row);
+
+        assertEquals(3, cfs.metric.purgeableTombstoneScannedHistogram.cf.getCount());
+        assertEquals(0, cfs.metric.purgeableTombstoneScannedHistogram.cf.getSnapshot().getMin());
+        assertEquals(1, cfs.metric.purgeableTombstoneScannedHistogram.cf.getSnapshot().getMax());
+    }
+
+    @Test
+    public void testCountPurgeableCellTombstones() throws Exception
+    {
+        DatabaseDescriptor.setPurgeableTobmstonesMetricGranularity(Config.TombstonesMetricGranularity.cell);
+        ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF12);
+        TestWriteOperation[] operations = new TestWriteOperation[]
+        {
+          TestWriteOperation.insert("key1", "aa", "a"),
+          TestWriteOperation.insert("key1", "ff", "f"),
+
+          TestWriteOperation.insert("key2", "aa", "a"),
+          TestWriteOperation.deleteCell("key2", "aa", "b", PURGEABLE_DELETION),
+          TestWriteOperation.deleteCell("key2", "aa", "f", NEW_DELETION),
+          TestWriteOperation.insert("key2", "cc", "c"),
+          TestWriteOperation.insert("key2", "dd", "d")
+        };
+        runTestWriteOperationsAndReadResults(cfs, operations, Config.TombstonesMetricGranularity.cell);
+
+        assertEquals(2, cfs.metric.purgeableTombstoneScannedHistogram.cf.getCount());
+        assertEquals(0, cfs.metric.purgeableTombstoneScannedHistogram.cf.getSnapshot().getMin());
+        assertEquals(1, cfs.metric.purgeableTombstoneScannedHistogram.cf.getSnapshot().getMax());
+
+    }
+
+    /**
+     * Test the purgeable tombstone count for range tombstones with non-overlapping ranges,
+     * i.e. only Bound (not Boundary) markers will be created and counted
+     */
+    @Test
+    public void testCountPurgeableRangeTombstones_nonOverlappingRanges() throws Exception
+    {
+        ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF13);
+        TestWriteOperation[] operations = new TestWriteOperation[]
+        {
+          TestWriteOperation.insert("key1", "aa", "a"),
+          TestWriteOperation.insert("key1", "ff", "f"),
+
+          TestWriteOperation.insert("key2", "aa", "a"),
+          TestWriteOperation.insert("key2", "cc", "c"),
+          TestWriteOperation.insert("key2", "dd", "d"),
+
+          TestWriteOperation.deleteRange("key2", "aa", "bb", NEW_DELETION),
+          TestWriteOperation.deleteRange("key2", "dd", "ee", PURGEABLE_DELETION),
+          TestWriteOperation.deleteRange("key2", "ff", "ff", PURGEABLE_DELETION)
+        };
+        runTestWriteOperationsAndReadResults(cfs, operations, Config.TombstonesMetricGranularity.row);
+
+        assertEquals(2, cfs.metric.purgeableTombstoneScannedHistogram.cf.getCount());
+        assertEquals(0, cfs.metric.purgeableTombstoneScannedHistogram.cf.getSnapshot().getMin());
+        assertEquals(4, cfs.metric.purgeableTombstoneScannedHistogram.cf.getSnapshot().getMax());
+    }
+
+    /**
+     * Test the purgeable tombstone count for range tombstones with overlapping ranges
+     */
+    @Test
+    public void testCountPurgeableRangeTombstones_overlappingRanges() throws Exception
+    {
+        ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF14);
+
+        TestWriteOperation[] operations = new TestWriteOperation[]
+        {
+          TestWriteOperation.insert("key1", "aa", "a"),
+          TestWriteOperation.insert("key1", "ff", "f"),
+
+          TestWriteOperation.insert("key2", "aa", "a"),
+          TestWriteOperation.insert("key2", "bb", "b"),
+          TestWriteOperation.insert("key2", "cc", "c"),
+          TestWriteOperation.insert("key2", "dd", "d"),
+          TestWriteOperation.insert("key2", "ee", "e"),
+
+          // this range tombstone is non-purgeable and overlaps with the next one,
+          // so it will create one non-purgeable bound marker
+          // and one non-purgeable boundary marker, i.e. TWO NON-PURGEABLE tombstones
+          TestWriteOperation.deleteRange("key2", "aa", "bb", NEW_DELETION),
+
+          // this range tombstone is purgeable and overlaps with the previous and next ones,
+          // so it will create one non-purgeable bound marker
+          // and one non-purgeable boundary marker, i.e. TWO non-purgeable tombstones will be counted
+          TestWriteOperation.deleteRange("key2", "bb", "ee", PURGEABLE_DELETION),
+
+          // this range tombstone is purgeable and overlaps with the previous one,
+          // it has a different deletion time so that it does not combine into a single range,
+          // so it will create one non-purgeable boundary marker (same as the previous one)
+          // and one purgeable bound marker, incrementing the purgeable tombstones counter by one,
+          // we expect TWO purgeable tombstones in total
+          TestWriteOperation.deleteRange("key2", "ee", "ff", PURGEABLE_DELETION - 1)
+        };
+
+        runTestWriteOperationsAndReadResults(cfs, operations, Config.TombstonesMetricGranularity.row);
+
+        assertEquals(0, cfs.metric.purgeableTombstoneScannedHistogram.cf.getSnapshot().getMin());
+        assertEquals(2, cfs.metric.purgeableTombstoneScannedHistogram.cf.getSnapshot().getMax());
+    }
+
+
+    private static void runTestWriteOperationsAndReadResults(ColumnFamilyStore cfs, TestWriteOperation[] operations, Config.TombstonesMetricGranularity granularity) throws IOException
+    {
+        Config.TombstonesMetricGranularity original = DatabaseDescriptor.getPurgeableTobmstonesMetricGranularity();
+        DatabaseDescriptor.setPurgeableTobmstonesMetricGranularity(granularity);
+        try
+        {
+            Set<String> usedPartitionKeys = runWriteOperations(cfs, operations);
+            runPartitionReadCommands(cfs, usedPartitionKeys);
+        }
+        finally
+        {
+            DatabaseDescriptor.setPurgeableTobmstonesMetricGranularity(original);
+            cfs.truncateBlocking();
+        }
+    }
+
+    private static void runPartitionReadCommands(ColumnFamilyStore cfs, Set<String> partitionKeys) throws IOException
+    {
+        List<SinglePartitionReadCommand> commands = new ArrayList<>(partitionKeys.size());
+        long nowInSeconds = FBUtilities.nowInSeconds(); // all reads within a group must have the same nowInSec
+        for (String partitionKey : partitionKeys)
+        {
+            commands.add(getWholePartitionReadCommand(cfs, partitionKey, nowInSeconds));
+        }
+        executeReadCommands(commands);
+    }
+
+    private static Set<String> runWriteOperations(ColumnFamilyStore cfs, 
TestWriteOperation[] operations)
+    {
+        Set<String> usedPartitionKeys = new HashSet<>();
+        for (TestWriteOperation operation : operations)
+        {
+            if (operation.type == OperationType.CREATE)
+            {
+                new RowUpdateBuilder(cfs.metadata(), 0, 
ByteBufferUtil.bytes(operation.partitionKey))
+                .clustering(operation.clusteringKey)
+                .add(operation.columnName, 
ByteBufferUtil.bytes(operation.columnValue))
+                .build()
+                .apply();
+            }
+            else if (operation.type == OperationType.DELETE_PARTITION)
+            {
+                new Mutation(PartitionUpdate.simpleBuilder(cfs.metadata(), 
ByteBufferUtil.bytes(operation.partitionKey))
+                                            .nowInSec(operation.deletionTime)
+                                            .delete()
+                                            .build()).apply();
+            }
+            else if (operation.type == OperationType.DELETE_RANGE)
+            {
+                new RowUpdateBuilder(cfs.metadata(), operation.deletionTime, 
0L, ByteBufferUtil.bytes(operation.partitionKey))
+                .addRangeTombstone(operation.clusteringRangeStart, 
operation.clusteringRangeEnd).build().apply();
+            }
+            else if (operation.type == OperationType.DELETE_ROW)
+            {
+                RowUpdateBuilder.deleteRowAt(cfs.metadata(), 0, 
operation.deletionTime,
+                                             
ByteBufferUtil.bytes(operation.partitionKey), operation.clusteringKey
+                ).apply();
+            }
+            else if (operation.type == OperationType.DELETE_CELL)
+            {
+                new RowUpdateBuilder(cfs.metadata(), operation.deletionTime, 
0L, ByteBufferUtil.bytes(operation.partitionKey))
+                .clustering(operation.clusteringKey)
+                .delete(operation.columnName)
+                .build().apply();
+            }
+
+            usedPartitionKeys.add(operation.partitionKey);
+        }
+        return usedPartitionKeys;
+    }
+
+    private static final long NEW_DELETION = FBUtilities.nowInSeconds();
+    private static final long PURGEABLE_DELETION = 42;
+
+    private enum OperationType
+    {
+        CREATE,
+        DELETE_PARTITION,
+        DELETE_RANGE,
+        DELETE_ROW,
+        DELETE_CELL
+    }
+
+    private static class TestWriteOperation
+    {
+        OperationType type;
+        String partitionKey;
+        String clusteringKey;
+
+        String clusteringRangeStart, clusteringRangeEnd;
+        String columnName;
+        String columnValue = "bla";
+
+        long deletionTime;
+
+        public TestWriteOperation(OperationType type, String partitionKey,
+                                  String clusteringKey, String clusteringRangeStart, String clusteringRangeEnd,
+                                  String columnName, long deletionTime)
+        {
+            this.type = type;
+            this.partitionKey = partitionKey;
+            this.clusteringKey = clusteringKey;
+            this.clusteringRangeStart = clusteringRangeStart;
+            this.clusteringRangeEnd = clusteringRangeEnd;
+            this.columnName = columnName;
+            this.deletionTime = deletionTime;
+        }
+
+        public static TestWriteOperation insert(String partitionKey, String clusteringKey,
+                                                String columnName)
+        {
+            return new TestWriteOperation(OperationType.CREATE, partitionKey, clusteringKey, null, null, columnName, 0);
+        }
+
+        public static TestWriteOperation deletePartition(String partitionKey, long deletionTime)
+        {
+            return new TestWriteOperation(OperationType.DELETE_PARTITION, partitionKey,
+                                          null, null, null, null, deletionTime);
+        }
+
+        public static TestWriteOperation deleteRange(String partitionKey, String clusteringRangeStart, String clusteringRangeEnd, long deletionTime)
+        {
+            return new TestWriteOperation(OperationType.DELETE_RANGE, partitionKey,
+                                          null, clusteringRangeStart, clusteringRangeEnd, null, deletionTime);
+        }
+
+        public static TestWriteOperation deleteRow(String partitionKey, String clusteringKey, long deletionTime)
+        {
+            return new TestWriteOperation(OperationType.DELETE_ROW, partitionKey, clusteringKey,
+                                          null, null, null, deletionTime);
+        }
+
+        public static TestWriteOperation deleteCell(String partitionKey, String clusteringKey, String columnName, long deletionTime)
+        {
+            return new TestWriteOperation(OperationType.DELETE_CELL, partitionKey, clusteringKey,
+                                          null, null, columnName, deletionTime);
+        }
+
+
+    }
+
+    private static void executeReadCommands(List<SinglePartitionReadCommand> 
commands) throws IOException
+    {
+        ReadQuery query = SinglePartitionReadCommand.Group.create(commands, 
DataLimits.NONE);
+
+        try (ReadExecutionController executionController = 
query.executionController();
+             UnfilteredPartitionIterator iter = 
query.executeLocally(executionController);
+             DataOutputBuffer buffer = new DataOutputBuffer())
+        {
+            
UnfilteredPartitionIterators.serializerForIntraNode().serialize(iter,
+                                                                            
query.columnFilter(),
+                                                                            
buffer,
+                                                                            
MessagingService.current_version);
+        }
+    }
+
+    private static SinglePartitionReadCommand getWholePartitionReadCommand(ColumnFamilyStore cfs, String partitionKey, long nowInSeconds)
+    {
+        ColumnFilter columnFilter = ColumnFilter.allRegularColumnsBuilder(cfs.metadata(), false).build();
+        RowFilter rowFilter = RowFilter.create(true);
+        Slice slice = Slice.make(BufferClusteringBound.BOTTOM, BufferClusteringBound.TOP);
+        ClusteringIndexSliceFilter sliceFilter = new ClusteringIndexSliceFilter(Slices.with(cfs.metadata().comparator, slice), false);
+        return SinglePartitionReadCommand.create(cfs.metadata(), nowInSeconds,
+                                                 columnFilter, rowFilter,
+                                                 DataLimits.NONE, Util.dk(partitionKey), sliceFilter);
+    }
+
     @Test
     public void testSinglePartitionSliceRepairedDataTracking() throws Exception
     {

