This is an automated email from the ASF dual-hosted git repository.
smiklosovic pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new d7258ac8f31 Add table metric PurgeableTombstoneScannedHistogram and a
tracing event for scanned purgeable tombstones
d7258ac8f31 is described below
commit d7258ac8f31eedc9c28f8c5e381893d192209d48
Author: Dmitry Konstantinov <[email protected]>
AuthorDate: Sun Dec 8 16:10:01 2024 +0300
Add table metric PurgeableTombstoneScannedHistogram and a tracing event for
scanned purgeable tombstones
patch by Dmitry Konstantinov; reviewed by Chris Lohfink, Stefan Miklosovic
for CASSANDRA-20132
---
CHANGES.txt | 1 +
conf/cassandra.yaml | 14 +
conf/cassandra_latest.yaml | 14 +
.../pages/managing/operating/metrics.adoc | 3 +
src/java/org/apache/cassandra/config/Config.java | 30 ++
.../cassandra/config/DatabaseDescriptor.java | 11 +
src/java/org/apache/cassandra/db/ReadCommand.java | 139 ++++++
.../cassandra/db/virtual/TableMetricTables.java | 1 +
.../apache/cassandra/metrics/KeyspaceMetrics.java | 3 +
.../org/apache/cassandra/metrics/TableMetrics.java | 3 +
.../config/DatabaseDescriptorRefTest.java | 1 +
.../org/apache/cassandra/db/ReadCommandTest.java | 465 +++++++++++++++++++--
12 files changed, 641 insertions(+), 44 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 4a749676fb1..28ab975f79e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
5.1
+ * Add table metric PurgeableTombstoneScannedHistogram and a tracing event for scanned purgeable tombstones (CASSANDRA-20132)
 * Make sure we can parse the expanded CQL before writing it to the log or sending it to replicas (CASSANDRA-20218)
 * Add format_bytes and format_time functions (CASSANDRA-19546)
 * Fix error when trying to assign a tuple to target type not being a tuple (CASSANDRA-20237)
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 55fc50f6c6e..58f8945d653 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -1964,6 +1964,20 @@ transparent_data_encryption_options:
tombstone_warn_threshold: 1000
tombstone_failure_threshold: 100000
+# Controls the granularity of purgeable tombstones reported to the PurgeableTombstoneScannedHistogram table metric.
+# Possible values:
+# 'disabled' - do not collect the metric at all
+# 'row'      - track only partition/range/row level tombstones,
+#              a good compromise between overhead and usability.
+#              For a CPU-bound workload you may see less than 1% throughput overhead.
+#              For an IO-bound workload the overhead is negligible.
+# 'cell'     - track partition/range/row/cell level tombstones.
+#              This is the most granular option, but it has some performance overhead
+#              due to iteration over cells.
+#              For a CPU-bound workload you may see about 5% throughput overhead.
+#              For an IO-bound workload the overhead is almost negligible.
+# tombstone_read_purgeable_metric_granularity: disabled
+
# Filtering and secondary index queries at read consistency levels above ONE/LOCAL_ONE use a
# mechanism called replica filtering protection to ensure that results from stale replicas do
# not violate consistency. (See CASSANDRA-8272 and CASSANDRA-15907 for more details.) This
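For operators, turning the new histogram on is a one-line override of the commented-out default shown above; a minimal sketch ('row' being the cheaper of the two non-default granularities):

    # enable purgeable-tombstone counting at partition/range/row granularity
    tombstone_read_purgeable_metric_granularity: row

The same commented-out default is added to conf/cassandra_latest.yaml below.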
diff --git a/conf/cassandra_latest.yaml b/conf/cassandra_latest.yaml
index 387565fe185..ce1d97c476d 100644
--- a/conf/cassandra_latest.yaml
+++ b/conf/cassandra_latest.yaml
@@ -1838,6 +1838,20 @@ transparent_data_encryption_options:
tombstone_warn_threshold: 1000
tombstone_failure_threshold: 100000
+# Controls the granularity of purgeable tombstones reported to the PurgeableTombstoneScannedHistogram table metric.
+# Possible values:
+# 'disabled' - do not collect the metric at all
+# 'row'      - track only partition/range/row level tombstones,
+#              a good compromise between overhead and usability.
+#              For a CPU-bound workload you may see less than 1% throughput overhead.
+#              For an IO-bound workload the overhead is negligible.
+# 'cell'     - track partition/range/row/cell level tombstones.
+#              This is the most granular option, but it has some performance overhead
+#              due to iteration over cells.
+#              For a CPU-bound workload you may see about 5% throughput overhead.
+#              For an IO-bound workload the overhead is almost negligible.
+# tombstone_read_purgeable_metric_granularity: disabled
+
# Filtering and secondary index queries at read consistency levels above ONE/LOCAL_ONE use a
# mechanism called replica filtering protection to ensure that results from stale replicas do
# not violate consistency. (See CASSANDRA-8272 and CASSANDRA-15907 for more details.) This
diff --git a/doc/modules/cassandra/pages/managing/operating/metrics.adoc b/doc/modules/cassandra/pages/managing/operating/metrics.adoc
index 78dbd165867..4f3d66652c2 100644
--- a/doc/modules/cassandra/pages/managing/operating/metrics.adoc
+++ b/doc/modules/cassandra/pages/managing/operating/metrics.adoc
@@ -181,6 +181,9 @@ by compression meta data.
|TombstoneScannedHistogram |Histogram |Histogram of tombstones scanned
in queries on this table.
+|PurgeableTombstoneScannedHistogram |Histogram |Histogram of purgeable tombstones scanned
+in queries on this table. Use the tombstone_read_purgeable_metric_granularity property in cassandra.yaml to enable it.
+
|LiveScannedHistogram |Histogram |Histogram of live cells scanned in
queries on this table.
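Like the other table metrics documented on this page, the new histogram should be reachable over JMX via the Table metric naming pattern already described in metrics.adoc; for a hypothetical table ks.tbl that would be something along the lines of:

    org.apache.cassandra.metrics:type=Table,keyspace=ks,scope=tbl,name=PurgeableTombstoneScannedHistogram

(the exact ObjectName is assumed from the existing pattern, not verified against this patch).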
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 932423d2488..77f7d9a3417 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -544,6 +544,8 @@ public class Config
public volatile int tombstone_warn_threshold = 1000;
public volatile int tombstone_failure_threshold = 100000;
+    public TombstonesMetricGranularity tombstone_read_purgeable_metric_granularity = TombstonesMetricGranularity.disabled;
+
     public final ReplicaFilteringProtectionOptions replica_filtering_protection = new ReplicaFilteringProtectionOptions();
     @Replaces(oldName = "index_summary_capacity_in_mb", converter = Converters.MEBIBYTES_DATA_STORAGE_LONG, deprecated = true)
@@ -1327,6 +1329,34 @@ public class Config
}
}
+    /**
+     * Allows controlling the granularity of metrics related to tombstones.
+     * It is a trade-off between the granularity of the metric and performance overhead.
+     * See CASSANDRA-20132 for more details.
+     */
+    public enum TombstonesMetricGranularity
+    {
+        /**
+         * Do not collect the metric at all.
+         */
+        disabled,
+        /**
+         * Track only partition/range/row level tombstones,
+         * a good compromise between overhead and usability.
+         * For a CPU-bound workload you may see less than 1% throughput overhead.
+         * For an IO-bound workload the overhead is negligible.
+         */
+        row,
+        /**
+         * Track partition/range/row/cell level tombstones.
+         * This is the most granular option, but it has some performance overhead
+         * due to iteration over cells.
+         * For a CPU-bound workload you may see about 5% throughput overhead.
+         * For an IO-bound workload the overhead is almost negligible.
+         */
+        cell
+    }
+
private static final Set<String> SENSITIVE_KEYS = new HashSet<String>() {{
add("client_encryption_options");
add("server_encryption_options");
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 8a09f4fd76e..b784571d01f 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -5523,4 +5523,15 @@ public class DatabaseDescriptor
{
return conf.password_validator_reconfiguration_enabled;
}
+
+    public static Config.TombstonesMetricGranularity getPurgeableTobmstonesMetricGranularity()
+    {
+        return conf.tombstone_read_purgeable_metric_granularity;
+    }
+
+    @VisibleForTesting
+    public static void setPurgeableTobmstonesMetricGranularity(Config.TombstonesMetricGranularity granularity)
+    {
+        conf.tombstone_read_purgeable_metric_granularity = granularity;
+    }
}
diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java
index 3bb50cbcfae..e4ea5f12d74 100644
--- a/src/java/org/apache/cassandra/db/ReadCommand.java
+++ b/src/java/org/apache/cassandra/db/ReadCommand.java
@@ -466,6 +466,7 @@ public abstract class ReadCommand extends AbstractReadQuery
iterator = withQuerySizeTracking(iterator);
iterator = maybeSlowDownForTesting(iterator);
iterator = withQueryCancellation(iterator);
+ iterator = maybeRecordPurgeableTombstones(iterator, cfs);
             iterator = RTBoundValidator.validate(withoutPurgeableTombstones(iterator, cfs, executionController), Stage.PURGED, false);
             iterator = withMetricsRecording(iterator, cfs.metric, startTimeNanos);
@@ -894,6 +895,144 @@ public abstract class ReadCommand extends AbstractReadQuery
         return Transformation.apply(iterator, new WithoutPurgeableTombstones());
}
+
+    /**
+     * Wraps the provided iterator so that metrics on the count of purgeable tombstones are tracked and traced.
+     * It tracks only tombstones with localDeletionTime < now - gc_grace_period.
+     * Other (non-purgeable) tombstones will be tracked by the regular Cassandra logic later.
+     */
+    private UnfilteredPartitionIterator maybeRecordPurgeableTombstones(UnfilteredPartitionIterator iter,
+                                                                       ColumnFamilyStore cfs)
+    {
+        class PurgeableTombstonesMetricRecording extends Transformation<UnfilteredRowIterator>
+        {
+            private int purgeableTombstones = 0;
+
+            @Override
+            public UnfilteredRowIterator applyToPartition(UnfilteredRowIterator iter)
+            {
+                if (!iter.partitionLevelDeletion().isLive())
+                    purgeableTombstones++;
+                return Transformation.apply(iter, this);
+            }
+
+            @Override
+            public Row applyToStatic(Row row)
+            {
+                return applyToRow(row);
+            }
+
+            @Override
+            public Row applyToRow(Row row)
+            {
+                final long nowInSec = nowInSec();
+                boolean hasTombstones = false;
+
+                if (isPurgeableCellTombstonesTrackingEnabled())
+                {
+                    for (Cell<?> cell : row.cells())
+                    {
+                        if (!cell.isLive(nowInSec) && isPurgeable(cell.localDeletionTime(), nowInSec))
+                        {
+                            purgeableTombstones++;
+                            hasTombstones = true; // allows us to avoid counting an extra tombstone if the whole row expired
+                        }
+                    }
+                }
+
+                // we replicate the logic used for the non-purged tombstones metric here
+                if (!row.primaryKeyLivenessInfo().isLive(nowInSec)
+                    && row.hasDeletion(nowInSec)
+                    && isPurgeable(row.deletion().time(), nowInSec)
+                    && !hasTombstones)
+                {
+                    // We're counting primary key deletions only here.
+                    purgeableTombstones++;
+                }
+
+                return row;
+            }
+
+            @Override
+            public RangeTombstoneMarker applyToMarker(RangeTombstoneMarker marker)
+            {
+                final long nowInSec = nowInSec();
+
+                // for boundary markers - increment the metric only if both - close and open - markers are purgeable
+                if (marker.isBoundary())
+                {
+                    countIfBothPurgeable(marker.closeDeletionTime(false),
+                                         marker.openDeletionTime(false),
+                                         nowInSec);
+                }
+                // for bound markers - just increment if it is purgeable
+                else if (marker instanceof RangeTombstoneBoundMarker)
+                {
+                    countIfPurgeable(((RangeTombstoneBoundMarker) marker).deletionTime(), nowInSec);
+                }
+
+                return marker;
+            }
+
+            @Override
+            public void onClose()
+            {
+                cfs.metric.purgeableTombstoneScannedHistogram.update(purgeableTombstones);
+                if (purgeableTombstones > 0)
+                    Tracing.trace("Read {} purgeable tombstone cells", purgeableTombstones);
+            }
+
+            /**
+             * Increments if both - close and open - deletion times are less than (now - gc_grace_period)
+             */
+            private void countIfBothPurgeable(DeletionTime closeDeletionTime,
+                                              DeletionTime openDeletionTime,
+                                              long nowInSec)
+            {
+                if (isPurgeable(closeDeletionTime, nowInSec) && isPurgeable(openDeletionTime, nowInSec))
+                    purgeableTombstones++;
+            }
+
+            /**
+             * Increments if the deletion time is less than (now - gc_grace_period)
+             */
+            private void countIfPurgeable(DeletionTime deletionTime, long nowInSec)
+            {
+                if (isPurgeable(deletionTime, nowInSec))
+                    purgeableTombstones++;
+            }
+
+            /**
+             * Checks that deletion time < now - gc_grace_period
+             */
+            private boolean isPurgeable(DeletionTime deletionTime, long nowInSec)
+            {
+                return isPurgeable(deletionTime.localDeletionTime(), nowInSec);
+            }
+
+            /**
+             * Checks that deletion time < now - gc_grace_period
+             */
+            private boolean isPurgeable(long localDeletionTime, long nowInSec)
+            {
+                return localDeletionTime < cfs.gcBefore(nowInSec);
+            }
+
+            private boolean isPurgeableCellTombstonesTrackingEnabled()
+            {
+                return DatabaseDescriptor.getPurgeableTobmstonesMetricGranularity() == Config.TombstonesMetricGranularity.cell;
+            }
+        }
+
+        if (DatabaseDescriptor.getPurgeableTobmstonesMetricGranularity() != Config.TombstonesMetricGranularity.disabled)
+            return Transformation.apply(iter, new PurgeableTombstonesMetricRecording());
+        else
+            return iter;
+    }
+
/**
* Return the queried token(s) for logging
*/
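The counting above hinges on a single predicate: a tombstone is considered purgeable when its local deletion time falls before cfs.gcBefore(nowInSec), i.e. roughly before now minus the table's gc_grace_seconds. A minimal illustrative sketch of that check (names here are hypothetical; the patch itself delegates to ColumnFamilyStore.gcBefore):

    // Illustrative only: mirrors the isPurgeable() helpers added above,
    // with gcGraceSeconds standing in for the table's gc_grace_seconds.
    static boolean isPurgeable(long localDeletionTimeSec, long nowInSec, long gcGraceSeconds)
    {
        return localDeletionTimeSec < nowInSec - gcGraceSeconds;
    }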
diff --git a/src/java/org/apache/cassandra/db/virtual/TableMetricTables.java b/src/java/org/apache/cassandra/db/virtual/TableMetricTables.java
index 5528c92011c..e1defe51e69 100644
--- a/src/java/org/apache/cassandra/db/virtual/TableMetricTables.java
+++ b/src/java/org/apache/cassandra/db/virtual/TableMetricTables.java
@@ -75,6 +75,7 @@ public class TableMetricTables
new LatencyTableMetric(name, "local_write_latency", t ->
t.writeLatency.latency),
new LatencyTableMetric(name, "coordinator_write_latency", t ->
t.coordinatorWriteLatency),
new HistogramTableMetric(name, "tombstones_per_read", t ->
t.tombstoneScannedHistogram.cf),
+ new HistogramTableMetric(name, "purgeable_tombstones_per_read", t
-> t.purgeableTombstoneScannedHistogram.cf),
new HistogramTableMetric(name, "rows_per_read", t ->
t.liveScannedHistogram.cf),
new StorageTableMetric(name, "disk_usage", (TableMetrics t) ->
t.totalDiskSpaceUsed),
new StorageTableMetric(name, "max_partition_size", (TableMetrics
t) -> t.maxPartitionSize),
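Because TableMetricTables backs the per-table metric virtual tables, the new histogram also becomes queryable per table; assuming it is exposed alongside the existing tombstones_per_read table in the system_views virtual keyspace, a quick check could look like:

    SELECT * FROM system_views.purgeable_tombstones_per_read;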
diff --git a/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java b/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java
index 237fd03e2d7..a1916bebd07 100644
--- a/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java
@@ -85,6 +85,8 @@ public class KeyspaceMetrics
public final Histogram sstablesPerRangeReadHistogram;
/** Tombstones scanned in queries on this Keyspace */
public final Histogram tombstoneScannedHistogram;
+ /** Purgeable tombstones scanned in queries on this Keyspace */
+ public final Histogram purgeableTombstoneScannedHistogram;
/** Live cells scanned in queries on this Keyspace */
public final Histogram liveScannedHistogram;
/** Column update time delta on this Keyspace */
@@ -234,6 +236,7 @@ public class KeyspaceMetrics
         sstablesPerReadHistogram = createKeyspaceHistogram("SSTablesPerReadHistogram", true);
         sstablesPerRangeReadHistogram = createKeyspaceHistogram("SSTablesPerRangeReadHistogram", true);
         tombstoneScannedHistogram = createKeyspaceHistogram("TombstoneScannedHistogram", false);
+        purgeableTombstoneScannedHistogram = createKeyspaceHistogram("PurgeableTombstoneScannedHistogram", false);
         liveScannedHistogram = createKeyspaceHistogram("LiveScannedHistogram", false);
         colUpdateTimeDeltaHistogram = createKeyspaceHistogram("ColUpdateTimeDeltaHistogram", false);
viewLockAcquireTime = createKeyspaceTimer("ViewLockAcquireTime");
diff --git a/src/java/org/apache/cassandra/metrics/TableMetrics.java b/src/java/org/apache/cassandra/metrics/TableMetrics.java
index ab1ab6eb73a..fabb0814e49 100644
--- a/src/java/org/apache/cassandra/metrics/TableMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/TableMetrics.java
@@ -156,6 +156,8 @@ public class TableMetrics
public final Gauge<Long> compressionMetadataOffHeapMemoryUsed;
/** Tombstones scanned in queries on this CF */
public final TableHistogram tombstoneScannedHistogram;
+ /** Purgeable tombstones scanned in queries on this CF */
+ public final TableHistogram purgeableTombstoneScannedHistogram;
/** Live rows scanned in queries on this CF */
public final TableHistogram liveScannedHistogram;
/** Column update time delta on this CF */
@@ -768,6 +770,7 @@ public class TableMetrics
         additionalWriteLatencyNanos = createTableGauge("AdditionalWriteLatencyNanos", () -> MICROSECONDS.toNanos(cfs.additionalWriteLatencyMicros));
         tombstoneScannedHistogram = createTableHistogram("TombstoneScannedHistogram", cfs.keyspace.metric.tombstoneScannedHistogram, false);
+        purgeableTombstoneScannedHistogram = createTableHistogram("PurgeableTombstoneScannedHistogram", cfs.keyspace.metric.purgeableTombstoneScannedHistogram, true);
         liveScannedHistogram = createTableHistogram("LiveScannedHistogram", cfs.keyspace.metric.liveScannedHistogram, false);
         colUpdateTimeDeltaHistogram = createTableHistogram("ColUpdateTimeDeltaHistogram", cfs.keyspace.metric.colUpdateTimeDeltaHistogram, false);
coordinatorReadLatency = createTableTimer("CoordinatorReadLatency");
diff --git a/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java b/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java
index 89af53b175a..34656eac55a 100644
--- a/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java
+++ b/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java
@@ -103,6 +103,7 @@ public class DatabaseDescriptorRefTest
"org.apache.cassandra.config.ConfigurationLoader",
"org.apache.cassandra.config.Config$CorruptedTombstoneStrategy",
"org.apache.cassandra.config.Config$BatchlogEndpointStrategy",
+ "org.apache.cassandra.config.Config$TombstonesMetricGranularity",
"org.apache.cassandra.config.DatabaseDescriptor$ByteUnit",
"org.apache.cassandra.config.DataRateSpec",
"org.apache.cassandra.config.DataRateSpec$DataRateUnit",
diff --git a/test/unit/org/apache/cassandra/db/ReadCommandTest.java b/test/unit/org/apache/cassandra/db/ReadCommandTest.java
index cc4ceee6952..51a49c43658 100644
--- a/test/unit/org/apache/cassandra/db/ReadCommandTest.java
+++ b/test/unit/org/apache/cassandra/db/ReadCommandTest.java
@@ -35,6 +35,7 @@ import org.junit.Test;
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.Util;
+import org.apache.cassandra.config.Config;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.filter.ClusteringIndexSliceFilter;
import org.apache.cassandra.db.filter.ColumnFilter;
@@ -93,6 +94,9 @@ import static org.junit.Assert.fail;
public class ReadCommandTest
{
+ private static final String CREATE = "1";
+ private static final String DELETE = "-1";
+
private static final String KEYSPACE = "ReadCommandTest";
private static final String CF1 = "Standard1";
private static final String CF2 = "Standard2";
@@ -103,6 +107,12 @@ public class ReadCommandTest
private static final String CF7 = "Counter7";
private static final String CF8 = "Standard8";
private static final String CF9 = "Standard9";
+ private static final String CF10 = "Standard10";
+ private static final String CF11 = "Standard11";
+ private static final String CF12 = "Standard12";
+ private static final String CF13 = "Standard13";
+ private static final String CF14 = "Standard14";
+
private static final InetAddressAndPort REPAIR_COORDINATOR;
static {
@@ -194,6 +204,61 @@ public class ReadCommandTest
                         .addClusteringColumn("col", ReversedType.getInstance(Int32Type.instance))
.addRegularColumn("a", AsciiType.instance);
+ TableMetadata.Builder metadata10 =
+ TableMetadata.builder(KEYSPACE, CF10)
+ .addPartitionKeyColumn("key", BytesType.instance)
+ .addClusteringColumn("col", AsciiType.instance)
+ .addRegularColumn("a", AsciiType.instance)
+ .addRegularColumn("b", AsciiType.instance)
+ .addRegularColumn("c", AsciiType.instance)
+ .addRegularColumn("d", AsciiType.instance)
+ .addRegularColumn("e", AsciiType.instance)
+ .addRegularColumn("f", AsciiType.instance);
+
+ TableMetadata.Builder metadata11 =
+ TableMetadata.builder(KEYSPACE, CF11)
+ .addPartitionKeyColumn("key", BytesType.instance)
+ .addClusteringColumn("col", AsciiType.instance)
+ .addRegularColumn("a", AsciiType.instance)
+ .addRegularColumn("b", AsciiType.instance)
+ .addRegularColumn("c", AsciiType.instance)
+ .addRegularColumn("d", AsciiType.instance)
+ .addRegularColumn("e", AsciiType.instance)
+ .addRegularColumn("f", AsciiType.instance);
+
+ TableMetadata.Builder metadata12 =
+ TableMetadata.builder(KEYSPACE, CF12)
+ .addPartitionKeyColumn("key", BytesType.instance)
+ .addClusteringColumn("col", AsciiType.instance)
+ .addRegularColumn("a", AsciiType.instance)
+ .addRegularColumn("b", AsciiType.instance)
+ .addRegularColumn("c", AsciiType.instance)
+ .addRegularColumn("d", AsciiType.instance)
+ .addRegularColumn("e", AsciiType.instance)
+ .addRegularColumn("f", AsciiType.instance);
+
+ TableMetadata.Builder metadata13 =
+ TableMetadata.builder(KEYSPACE, CF13)
+ .addPartitionKeyColumn("key", BytesType.instance)
+ .addClusteringColumn("col", AsciiType.instance)
+ .addRegularColumn("a", AsciiType.instance)
+ .addRegularColumn("b", AsciiType.instance)
+ .addRegularColumn("c", AsciiType.instance)
+ .addRegularColumn("d", AsciiType.instance)
+ .addRegularColumn("e", AsciiType.instance)
+ .addRegularColumn("f", AsciiType.instance);
+
+ TableMetadata.Builder metadata14 =
+ TableMetadata.builder(KEYSPACE, CF14)
+ .addPartitionKeyColumn("key", BytesType.instance)
+ .addClusteringColumn("col", AsciiType.instance)
+ .addRegularColumn("a", AsciiType.instance)
+ .addRegularColumn("b", AsciiType.instance)
+ .addRegularColumn("c", AsciiType.instance)
+ .addRegularColumn("d", AsciiType.instance)
+ .addRegularColumn("e", AsciiType.instance)
+ .addRegularColumn("f", AsciiType.instance);
+
SchemaLoader.prepareServer();
SchemaLoader.createKeyspace(KEYSPACE,
KeyspaceParams.simple(1),
@@ -205,7 +270,12 @@ public class ReadCommandTest
metadata6,
metadata7,
metadata8,
- metadata9);
+ metadata9,
+ metadata10,
+ metadata11,
+ metadata12,
+ metadata13,
+ metadata14);
LocalSessionAccessor.startup();
}
@@ -332,23 +402,23 @@ public class ReadCommandTest
String[][][] groups = new String[][][] {
new String[][] {
- new String[] { "1", "key1", "aa", "a" }, // "1" indicates to
create the data, "-1" to delete the row
- new String[] { "1", "key2", "bb", "b" },
- new String[] { "1", "key3", "cc", "c" }
+ new String[] { CREATE, "key1", "aa", "a" },
+ new String[] { CREATE, "key2", "bb", "b" },
+ new String[] { CREATE, "key3", "cc", "c" }
},
new String[][] {
- new String[] { "1", "key3", "dd", "d" },
- new String[] { "1", "key2", "ee", "e" },
- new String[] { "1", "key1", "ff", "f" }
+ new String[] { CREATE, "key3", "dd", "d" },
+ new String[] { CREATE, "key2", "ee", "e" },
+ new String[] { CREATE, "key1", "ff", "f" }
},
new String[][] {
- new String[] { "1", "key6", "aa", "a" },
- new String[] { "1", "key5", "bb", "b" },
- new String[] { "1", "key4", "cc", "c" }
+ new String[] { CREATE, "key6", "aa", "a" },
+ new String[] { CREATE, "key5", "bb", "b" },
+ new String[] { CREATE, "key4", "cc", "c" }
},
new String[][] {
- new String[] { "-1", "key6", "aa", "a" },
- new String[] { "-1", "key2", "bb", "b" }
+ new String[] { DELETE, "key6", "aa", "a" },
+ new String[] { DELETE, "key2", "bb", "b" }
}
};
@@ -371,7 +441,7 @@ public class ReadCommandTest
for (String[] data : group)
{
- if (data[0].equals("1"))
+ if (data[0].equals(CREATE))
{
                    new RowUpdateBuilder(cfs.metadata(), 0, ByteBufferUtil.bytes(data[1]))
.clustering(data[2])
@@ -493,33 +563,32 @@ public class ReadCommandTest
String[][][] groups = new String[][][] {
new String[][] {
- new String[] { "1", "key1", "aa", "a" }, // "1"
indicates to create the data, "-1" to delete the
- // row
- new String[] { "1", "key2", "bb", "b" },
- new String[] { "1", "key3", "cc", "c" }
+ new String[] { CREATE, "key1", "aa", "a" },
+ new String[] { CREATE, "key2", "bb", "b" },
+ new String[] { CREATE, "key3", "cc", "c" }
},
new String[][] {
- new String[] { "1", "key3", "dd", "d" },
- new String[] { "1", "key2", "ee", "e" },
- new String[] { "1", "key1", "ff", "f" }
+ new String[] { CREATE, "key3", "dd", "d" },
+ new String[] { CREATE, "key2", "ee", "e" },
+ new String[] { CREATE, "key1", "ff", "f" }
},
new String[][] {
- new String[] { "1", "key6", "aa", "a" },
- new String[] { "1", "key5", "bb", "b" },
- new String[] { "1", "key4", "cc", "c" }
+ new String[] { CREATE, "key6", "aa", "a" },
+ new String[] { CREATE, "key5", "bb", "b" },
+ new String[] { CREATE, "key4", "cc", "c" }
},
new String[][] {
- new String[] { "1", "key2", "aa", "a" },
- new String[] { "1", "key2", "cc", "c" },
- new String[] { "1", "key2", "dd", "d" }
+ new String[] { CREATE, "key2", "aa", "a" },
+ new String[] { CREATE, "key2", "cc", "c" },
+ new String[] { CREATE, "key2", "dd", "d" }
},
new String[][] {
- new String[] { "-1", "key6", "aa", "a" },
- new String[] { "-1", "key2", "bb", "b" },
- new String[] { "-1", "key2", "ee", "e" },
- new String[] { "-1", "key2", "aa", "a" },
- new String[] { "-1", "key2", "cc", "c" },
- new String[] { "-1", "key2", "dd", "d" }
+ new String[] { DELETE, "key6", "aa", "a" },
+ new String[] { DELETE, "key2", "bb", "b" },
+ new String[] { DELETE, "key2", "ee", "e" },
+ new String[] { DELETE, "key2", "aa", "a" },
+ new String[] { DELETE, "key2", "cc", "c" },
+ new String[] { DELETE, "key2", "dd", "d" }
}
};
@@ -539,7 +608,7 @@ public class ReadCommandTest
for (String[] data : group)
{
- if (data[0].equals("1"))
+ if (data[0].equals(CREATE))
{
                    new RowUpdateBuilder(cfs.metadata(), 0, ByteBufferUtil.bytes(data[1]))
.clustering(data[2])
@@ -582,20 +651,19 @@ public class ReadCommandTest
String[][][] groups = new String[][][] {
new String[][] {
- new String[] { "1", "key1", "aa", "a" }, // "1"
indicates to create the data, "-1" to delete the
- // row
- new String[] { "1", "key2", "bb", "b" },
- new String[] { "1", "key3", "cc", "c" }
+ new String[] { CREATE, "key1", "aa", "a" },
+ new String[] { CREATE, "key2", "bb", "b" },
+ new String[] { CREATE, "key3", "cc", "c" }
},
new String[][] {
- new String[] { "1", "key3", "dd", "d" },
- new String[] { "1", "key2", "ee", "e" },
- new String[] { "1", "key1", "ff", "f" }
+ new String[] { CREATE, "key3", "dd", "d" },
+ new String[] { CREATE, "key2", "ee", "e" },
+ new String[] { CREATE, "key1", "ff", "f" }
},
new String[][] {
- new String[] { "1", "key6", "aa", "a" },
- new String[] { "1", "key5", "bb", "b" },
- new String[] { "1", "key4", "cc", "c" }
+ new String[] { CREATE, "key6", "aa", "a" },
+ new String[] { CREATE, "key5", "bb", "b" },
+ new String[] { CREATE, "key4", "cc", "c" }
}
};
@@ -615,7 +683,7 @@ public class ReadCommandTest
for (String[] data : group)
{
- if (data[0].equals("1"))
+ if (data[0].equals(CREATE))
{
                    new RowUpdateBuilder(cfs.metadata(), 0, ByteBufferUtil.bytes(data[1]))
.clustering(data[2])
@@ -651,6 +719,315 @@ public class ReadCommandTest
        assertEquals(1, cfs.metric.tombstoneScannedHistogram.cf.getSnapshot().getMax());
}
+    @Test
+    public void testCountPurgeableRowTombstones() throws Exception
+    {
+        ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF10);
+        TestWriteOperation[] operations = new TestWriteOperation[]
+        {
+            TestWriteOperation.insert("key1", "aa", "a"),
+            TestWriteOperation.insert("key1", "ff", "f"),
+
+            TestWriteOperation.insert("key2", "aa", "e"),
+            TestWriteOperation.deleteRow("key2", "aa", PURGEABLE_DELETION),
+            TestWriteOperation.deleteRow("key2", "bb", NEW_DELETION),
+            TestWriteOperation.deleteRow("key2", "cc", PURGEABLE_DELETION),
+            TestWriteOperation.deleteRow("key2", "dd", PURGEABLE_DELETION),
+            TestWriteOperation.deleteRow("key2", "ee", NEW_DELETION),
+        };
+
+        runTestWriteOperationsAndReadResults(cfs, operations, Config.TombstonesMetricGranularity.row);
+
+        assertEquals(2, cfs.metric.purgeableTombstoneScannedHistogram.cf.getCount());
+        assertEquals(0, cfs.metric.purgeableTombstoneScannedHistogram.cf.getSnapshot().getMin());
+        assertEquals(3, cfs.metric.purgeableTombstoneScannedHistogram.cf.getSnapshot().getMax());
+    }
+
+    @Test
+    public void testCountPurgeablePartitionTombstones() throws Exception
+    {
+        ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF11);
+        TestWriteOperation[] operations = new TestWriteOperation[]
+        {
+            TestWriteOperation.insert("key1", "aa", "a"),
+            TestWriteOperation.insert("key1", "ff", "f"),
+
+            TestWriteOperation.insert("key2", "aa", "a"),
+            TestWriteOperation.insert("key2", "cc", "c"),
+            TestWriteOperation.insert("key2", "dd", "d"),
+
+            TestWriteOperation.deletePartition("key2", PURGEABLE_DELETION),
+            TestWriteOperation.deletePartition("key3", NEW_DELETION)
+        };
+        runTestWriteOperationsAndReadResults(cfs, operations, Config.TombstonesMetricGranularity.row);
+
+        assertEquals(3, cfs.metric.purgeableTombstoneScannedHistogram.cf.getCount());
+        assertEquals(0, cfs.metric.purgeableTombstoneScannedHistogram.cf.getSnapshot().getMin());
+        assertEquals(1, cfs.metric.purgeableTombstoneScannedHistogram.cf.getSnapshot().getMax());
+    }
+
+    @Test
+    public void testCountPurgeableCellTombstones() throws Exception
+    {
+        DatabaseDescriptor.setPurgeableTobmstonesMetricGranularity(Config.TombstonesMetricGranularity.cell);
+        ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF12);
+        TestWriteOperation[] operations = new TestWriteOperation[]
+        {
+            TestWriteOperation.insert("key1", "aa", "a"),
+            TestWriteOperation.insert("key1", "ff", "f"),
+
+            TestWriteOperation.insert("key2", "aa", "a"),
+            TestWriteOperation.deleteCell("key2", "aa", "b", PURGEABLE_DELETION),
+            TestWriteOperation.deleteCell("key2", "aa", "f", NEW_DELETION),
+            TestWriteOperation.insert("key2", "cc", "c"),
+            TestWriteOperation.insert("key2", "dd", "d")
+        };
+        runTestWriteOperationsAndReadResults(cfs, operations, Config.TombstonesMetricGranularity.cell);
+
+        assertEquals(2, cfs.metric.purgeableTombstoneScannedHistogram.cf.getCount());
+        assertEquals(0, cfs.metric.purgeableTombstoneScannedHistogram.cf.getSnapshot().getMin());
+        assertEquals(1, cfs.metric.purgeableTombstoneScannedHistogram.cf.getSnapshot().getMax());
+    }
+
+    /**
+     * Test purgeable tombstones count for range tombstones with non-overlapping ranges,
+     * i.e. only Bound (not Boundary) Markers will be created and counted
+     */
+    @Test
+    public void testCountPurgeableRangeTombstones_nonOverlappingRanges() throws Exception
+    {
+        ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF13);
+        TestWriteOperation[] operations = new TestWriteOperation[]
+        {
+            TestWriteOperation.insert("key1", "aa", "a"),
+            TestWriteOperation.insert("key1", "ff", "f"),
+
+            TestWriteOperation.insert("key2", "aa", "a"),
+            TestWriteOperation.insert("key2", "cc", "c"),
+            TestWriteOperation.insert("key2", "dd", "d"),
+
+            TestWriteOperation.deleteRange("key2", "aa", "bb", NEW_DELETION),
+            TestWriteOperation.deleteRange("key2", "dd", "ee", PURGEABLE_DELETION),
+            TestWriteOperation.deleteRange("key2", "ff", "ff", PURGEABLE_DELETION)
+        };
+        runTestWriteOperationsAndReadResults(cfs, operations, Config.TombstonesMetricGranularity.row);
+
+        assertEquals(2, cfs.metric.purgeableTombstoneScannedHistogram.cf.getCount());
+        assertEquals(0, cfs.metric.purgeableTombstoneScannedHistogram.cf.getSnapshot().getMin());
+        assertEquals(4, cfs.metric.purgeableTombstoneScannedHistogram.cf.getSnapshot().getMax());
+    }
+
+    /**
+     * Test purgeable tombstones count for range tombstones with overlapping ranges
+     */
+    @Test
+    public void testCountPurgeableRangeTombstones_overlappingRanges() throws Exception
+    {
+        ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF14);
+
+        TestWriteOperation[] operations = new TestWriteOperation[]
+        {
+            TestWriteOperation.insert("key1", "aa", "a"),
+            TestWriteOperation.insert("key1", "ff", "f"),
+
+            TestWriteOperation.insert("key2", "aa", "a"),
+            TestWriteOperation.insert("key2", "bb", "b"),
+            TestWriteOperation.insert("key2", "cc", "c"),
+            TestWriteOperation.insert("key2", "dd", "d"),
+            TestWriteOperation.insert("key2", "ee", "e"),
+
+            // this range tombstone is non-purgeable and overlaps with the next one,
+            // so it will create one non-purgeable bound marker
+            // and one non-purgeable boundary marker, i.e. TWO NON-PURGEABLE tombstones
+            TestWriteOperation.deleteRange("key2", "aa", "bb", NEW_DELETION),
+
+            // this range tombstone is purgeable and overlaps with the previous and next ones,
+            // so it will create one non-purgeable bound marker
+            // and one non-purgeable boundary marker, i.e. TWO non-purgeable tombstones will be counted
+            TestWriteOperation.deleteRange("key2", "bb", "ee", PURGEABLE_DELETION),
+
+            // this range tombstone is purgeable and overlaps with the previous one;
+            // it has a different deletion time so that it does not combine into a single range,
+            // so it will create one non-purgeable boundary marker (same as the previous one)
+            // and one purgeable bound marker, which increments the purgeable tombstones counter by one;
+            // we expect TWO purgeable tombstones in total
+            TestWriteOperation.deleteRange("key2", "ee", "ff", PURGEABLE_DELETION - 1)
+        };
+
+        runTestWriteOperationsAndReadResults(cfs, operations, Config.TombstonesMetricGranularity.row);
+
+        assertEquals(0, cfs.metric.purgeableTombstoneScannedHistogram.cf.getSnapshot().getMin());
+        assertEquals(2, cfs.metric.purgeableTombstoneScannedHistogram.cf.getSnapshot().getMax());
+    }
+
+
+    private static void runTestWriteOperationsAndReadResults(ColumnFamilyStore cfs, TestWriteOperation[] operations, Config.TombstonesMetricGranularity granularity) throws IOException
+    {
+        Config.TombstonesMetricGranularity original = DatabaseDescriptor.getPurgeableTobmstonesMetricGranularity();
+        DatabaseDescriptor.setPurgeableTobmstonesMetricGranularity(granularity);
+        try
+        {
+            Set<String> usedPartitionKeys = runWriteOperations(cfs, operations);
+            runPartitionReadCommands(cfs, usedPartitionKeys);
+        }
+        finally
+        {
+            DatabaseDescriptor.setPurgeableTobmstonesMetricGranularity(original);
+            cfs.truncateBlocking();
+        }
+    }
+
+    private static void runPartitionReadCommands(ColumnFamilyStore cfs, Set<String> partitionKeys) throws IOException
+    {
+        List<SinglePartitionReadCommand> commands = new ArrayList<>(partitionKeys.size());
+        long nowInSeconds = FBUtilities.nowInSeconds(); // all reads within a group must have the same nowInSec
+        for (String partitionKey : partitionKeys)
+        {
+            commands.add(getWholePartitionReadCommand(cfs, partitionKey, nowInSeconds));
+        }
+        executeReadCommands(commands);
+    }
+
+    private static Set<String> runWriteOperations(ColumnFamilyStore cfs, TestWriteOperation[] operations)
+    {
+        Set<String> usedPartitionKeys = new HashSet<>();
+        for (TestWriteOperation operation : operations)
+        {
+            if (operation.type == OperationType.CREATE)
+            {
+                new RowUpdateBuilder(cfs.metadata(), 0, ByteBufferUtil.bytes(operation.partitionKey))
+                    .clustering(operation.clusteringKey)
+                    .add(operation.columnName, ByteBufferUtil.bytes(operation.columnValue))
+                    .build()
+                    .apply();
+            }
+            else if (operation.type == OperationType.DELETE_PARTITION)
+            {
+                new Mutation(PartitionUpdate.simpleBuilder(cfs.metadata(), ByteBufferUtil.bytes(operation.partitionKey))
+                                            .nowInSec(operation.deletionTime)
+                                            .delete()
+                                            .build()).apply();
+            }
+            else if (operation.type == OperationType.DELETE_RANGE)
+            {
+                new RowUpdateBuilder(cfs.metadata(), operation.deletionTime, 0L, ByteBufferUtil.bytes(operation.partitionKey))
+                    .addRangeTombstone(operation.clusteringRangeStart, operation.clusteringRangeEnd).build().apply();
+            }
+            else if (operation.type == OperationType.DELETE_ROW)
+            {
+                RowUpdateBuilder.deleteRowAt(cfs.metadata(), 0, operation.deletionTime,
+                                             ByteBufferUtil.bytes(operation.partitionKey), operation.clusteringKey
+                ).apply();
+            }
+            else if (operation.type == OperationType.DELETE_CELL)
+            {
+                new RowUpdateBuilder(cfs.metadata(), operation.deletionTime, 0L, ByteBufferUtil.bytes(operation.partitionKey))
+                    .clustering(operation.clusteringKey)
+                    .delete(operation.columnName)
+                    .build().apply();
+            }
+
+            usedPartitionKeys.add(operation.partitionKey);
+        }
+        return usedPartitionKeys;
+    }
+
+ private static final long NEW_DELETION = FBUtilities.nowInSeconds();
+ private static final long PURGEABLE_DELETION = 42;
+
+ private enum OperationType
+ {
+ CREATE,
+ DELETE_PARTITION,
+ DELETE_RANGE,
+ DELETE_ROW,
+ DELETE_CELL
+ }
+
+ private static class TestWriteOperation
+ {
+ OperationType type;
+ String partitionKey;
+ String clusteringKey;
+
+ String clusteringRangeStart, clusteringRangeEnd;
+ String columnName;
+ String columnValue = "bla";
+
+ long deletionTime;
+
+        public TestWriteOperation(OperationType type, String partitionKey,
+                                  String clusteringKey, String clusteringRangeStart, String clusteringRangeEnd,
+                                  String columnName, long deletionTime)
+        {
+            this.type = type;
+            this.partitionKey = partitionKey;
+            this.clusteringKey = clusteringKey;
+            this.clusteringRangeStart = clusteringRangeStart;
+            this.clusteringRangeEnd = clusteringRangeEnd;
+            this.columnName = columnName;
+            this.deletionTime = deletionTime;
+        }
+
+        public static TestWriteOperation insert(String partitionKey, String clusteringKey, String columnName)
+        {
+            return new TestWriteOperation(OperationType.CREATE, partitionKey, clusteringKey, null, null, columnName, 0);
+        }
+
+        public static TestWriteOperation deletePartition(String partitionKey, long deletionTime)
+        {
+            return new TestWriteOperation(OperationType.DELETE_PARTITION, partitionKey,
+                                          null, null, null, null, deletionTime);
+        }
+
+        public static TestWriteOperation deleteRange(String partitionKey, String clusteringRangeStart, String clusteringRangeEnd, long deletionTime)
+        {
+            return new TestWriteOperation(OperationType.DELETE_RANGE, partitionKey,
+                                          null, clusteringRangeStart, clusteringRangeEnd, null, deletionTime);
+        }
+
+        public static TestWriteOperation deleteRow(String partitionKey, String clusteringKey, long deletionTime)
+        {
+            return new TestWriteOperation(OperationType.DELETE_ROW, partitionKey, clusteringKey,
+                                          null, null, null, deletionTime);
+        }
+
+        public static TestWriteOperation deleteCell(String partitionKey, String clusteringKey, String columnName, long deletionTime)
+        {
+            return new TestWriteOperation(OperationType.DELETE_CELL, partitionKey, clusteringKey,
+                                          null, null, columnName, deletionTime);
+        }
+    }
+
+    private static void executeReadCommands(List<SinglePartitionReadCommand> commands) throws IOException
+    {
+        ReadQuery query = SinglePartitionReadCommand.Group.create(commands, DataLimits.NONE);
+
+        try (ReadExecutionController executionController = query.executionController();
+             UnfilteredPartitionIterator iter = query.executeLocally(executionController);
+             DataOutputBuffer buffer = new DataOutputBuffer())
+        {
+            UnfilteredPartitionIterators.serializerForIntraNode().serialize(iter,
+                                                                            query.columnFilter(),
+                                                                            buffer,
+                                                                            MessagingService.current_version);
+        }
+    }
+
+    private static SinglePartitionReadCommand getWholePartitionReadCommand(ColumnFamilyStore cfs, String partitionKey, long nowInSeconds)
+    {
+        ColumnFilter columnFilter = ColumnFilter.allRegularColumnsBuilder(cfs.metadata(), false).build();
+        RowFilter rowFilter = RowFilter.create(true);
+        Slice slice = Slice.make(BufferClusteringBound.BOTTOM, BufferClusteringBound.TOP);
+        ClusteringIndexSliceFilter sliceFilter = new ClusteringIndexSliceFilter(Slices.with(cfs.metadata().comparator, slice), false);
+        return SinglePartitionReadCommand.create(cfs.metadata(), nowInSeconds,
+                                                 columnFilter, rowFilter,
+                                                 DataLimits.NONE, Util.dk(partitionKey), sliceFilter);
+    }
+
@Test
public void testSinglePartitionSliceRepairedDataTracking() throws Exception
{
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]