This is an automated email from the ASF dual-hosted git repository.
jwest pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new 2d323cb565 Add row,tombstone,and sstable count to profileload
2d323cb565 is described below
commit 2d323cb56572e867b13b6d102a61aaff8bd66c86
Author: Jordan West <[email protected]>
AuthorDate: Wed Dec 21 12:10:42 2022 -0800
Add row,tombstone,and sstable count to profileload
Patch by Jordan West; Reviewed by David Capwell for CASSANDRA-18022
---
CHANGES.txt | 1 +
src/java/org/apache/cassandra/db/ReadCommand.java | 18 ++++
.../cassandra/db/SinglePartitionReadCommand.java | 2 +
src/java/org/apache/cassandra/metrics/Sampler.java | 17 ++++
.../org/apache/cassandra/metrics/TableMetrics.java | 34 +++++++
.../apache/cassandra/tools/TopPartitionsTest.java | 108 +++++++++++++++++++++
6 files changed, 180 insertions(+)
diff --git a/CHANGES.txt b/CHANGES.txt
index bf760aa8dc..84136b255d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.2
+ * Add row, tombstone, and sstable count to nodetool profileload
(CASSANDRA-18022)
* Coordinator level metrics for read response and mutation row and column
counts (CASSANDRA-18155)
* Add CQL functions for dynamic data masking (CASSANDRA-17941)
* Print friendly error when nodetool attempts to connect to uninitialized
server (CASSANDRA-11537)
diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java
b/src/java/org/apache/cassandra/db/ReadCommand.java
index d03650ff36..95fa95da91 100644
--- a/src/java/org/apache/cassandra/db/ReadCommand.java
+++ b/src/java/org/apache/cassandra/db/ReadCommand.java
@@ -501,7 +501,9 @@ public abstract class ReadCommand extends AbstractReadQuery
private final boolean enforceStrictLiveness =
metadata().enforceStrictLiveness();
private int liveRows = 0;
+ private int lastReportedLiveRows = 0;
private int tombstones = 0;
+ private int lastReportedTombstones = 0;
private DecoratedKey currentKey;
@@ -568,6 +570,22 @@ public abstract class ReadCommand extends AbstractReadQuery
}
}
+ @Override
+ protected void onPartitionClose()
+ {
+ int lr = liveRows - lastReportedLiveRows;
+ int ts = tombstones - lastReportedTombstones;
+
+ if (lr > 0)
+
metric.topReadPartitionRowCount.addSample(currentKey.getKey(), lr);
+
+ if (ts > 0)
+
metric.topReadPartitionTombstoneCount.addSample(currentKey.getKey(), ts);
+
+ lastReportedLiveRows = liveRows;
+ lastReportedTombstones = tombstones;
+ }
+
@Override
public void onClose()
{
diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
index 963b9fee1c..34414bbea6 100644
--- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
@@ -820,6 +820,7 @@ public class SinglePartitionReadCommand extends ReadCommand
implements SinglePar
{
DecoratedKey key = merged.partitionKey();
metrics.topReadPartitionFrequency.addSample(key.getKey(), 1);
+ metrics.topReadPartitionSSTableCount.addSample(key.getKey(),
metricsCollector.getMergedSSTables());
}
class UpdateSstablesIterated extends
Transformation<UnfilteredRowIterator>
@@ -963,6 +964,7 @@ public class SinglePartitionReadCommand extends ReadCommand
implements SinglePar
DecoratedKey key = result.partitionKey();
cfs.metric.topReadPartitionFrequency.addSample(key.getKey(), 1);
+ cfs.metric.topReadPartitionSSTableCount.addSample(key.getKey(),
metricsCollector.getMergedSSTables());
StorageHook.instance.reportRead(cfs.metadata.id, partitionKey());
return result.unfilteredIterator(columnFilter(), Slices.ALL,
clusteringIndexFilter().isReversed());
diff --git a/src/java/org/apache/cassandra/metrics/Sampler.java
b/src/java/org/apache/cassandra/metrics/Sampler.java
index de7f0b2e1a..6eb8508b9f 100644
--- a/src/java/org/apache/cassandra/metrics/Sampler.java
+++ b/src/java/org/apache/cassandra/metrics/Sampler.java
@@ -54,6 +54,23 @@ public abstract class Sampler<T>
resultBuilder.forType(samplerType, samplerType.description)
.addColumn("Query", "value")
.addColumn("Microseconds", "count"))),
+ READ_ROW_COUNT("Partitions read with the most rows", ((samplerType,
resultBuilder) ->
+
resultBuilder.forType(samplerType, samplerType.description)
+ .addColumn("Table",
"table")
+ .addColumn("Partition",
"value")
+ .addColumn("Rows",
"count"))),
+
+ READ_TOMBSTONE_COUNT("Partitions read with the most tombstones",
((samplerType, resultBuilder) ->
+
resultBuilder.forType(samplerType, samplerType.description)
+
.addColumn("Table", "table")
+
.addColumn("Partition", "value")
+
.addColumn("Tombstones", "count"))),
+
+ READ_SSTABLE_COUNT("Partitions read with the most sstables",
((samplerType, resultBuilder) ->
+
resultBuilder.forType(samplerType, samplerType.description)
+
.addColumn("Table", "table")
+
.addColumn("Partition", "value")
+
.addColumn("SSTables", "count"))),
WRITE_SIZE("Max mutation size by partition", ((samplerType,
resultBuilder) ->
resultBuilder.forType(samplerType, samplerType.description)
.addColumn("Table", "table")
diff --git a/src/java/org/apache/cassandra/metrics/TableMetrics.java
b/src/java/org/apache/cassandra/metrics/TableMetrics.java
index 8f3645dd1e..04102f7995 100644
--- a/src/java/org/apache/cassandra/metrics/TableMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/TableMetrics.java
@@ -262,6 +262,12 @@ public class TableMetrics
public final Sampler<ByteBuffer> topCasPartitionContention;
/** When sampler activated, will track the slowest local reads **/
public final Sampler<String> topLocalReadQueryTime;
+ /** When sampler activated, will track partitions read with the most rows
**/
+ public final Sampler<ByteBuffer> topReadPartitionRowCount;
+ /** When sampler activated, will track partitions read with the most
tombstones **/
+ public final Sampler<ByteBuffer> topReadPartitionTombstoneCount;
+ /** When sampler activated, will track partitions read with the most merged
sstables **/
+ public final Sampler<ByteBuffer> topReadPartitionSSTableCount;
public final TableMeter clientTombstoneWarnings;
public final TableMeter clientTombstoneAborts;
@@ -442,11 +448,39 @@ public class TableMetrics
}
};
+ topReadPartitionRowCount = new MaxSampler<ByteBuffer>()
+ {
+ public String toString(ByteBuffer value)
+ {
+ return cfs.metadata().partitionKeyType.getString(value);
+ }
+ };
+
+ topReadPartitionTombstoneCount = new MaxSampler<ByteBuffer>()
+ {
+ public String toString(ByteBuffer value)
+ {
+ return cfs.metadata().partitionKeyType.getString(value);
+ }
+ };
+
+ topReadPartitionSSTableCount = new MaxSampler<ByteBuffer>()
+ {
+ @Override
+ public String toString(ByteBuffer value)
+ {
+ return cfs.metadata().partitionKeyType.getString(value);
+ }
+ };
+
samplers.put(SamplerType.READS, topReadPartitionFrequency);
samplers.put(SamplerType.WRITES, topWritePartitionFrequency);
samplers.put(SamplerType.WRITE_SIZE, topWritePartitionSize);
samplers.put(SamplerType.CAS_CONTENTIONS, topCasPartitionContention);
samplers.put(SamplerType.LOCAL_READ_TIME, topLocalReadQueryTime);
+ samplers.put(SamplerType.READ_ROW_COUNT, topReadPartitionRowCount);
+ samplers.put(SamplerType.READ_TOMBSTONE_COUNT,
topReadPartitionTombstoneCount);
+ samplers.put(SamplerType.READ_SSTABLE_COUNT,
topReadPartitionSSTableCount);
memtableColumnsCount = createTableGauge("MemtableColumnsCount",
() ->
cfs.getTracker().getView().getCurrentMemtable().operationCount());
diff --git a/test/unit/org/apache/cassandra/tools/TopPartitionsTest.java
b/test/unit/org/apache/cassandra/tools/TopPartitionsTest.java
index 934d4d814c..d9cfdeecef 100644
--- a/test/unit/org/apache/cassandra/tools/TopPartitionsTest.java
+++ b/test/unit/org/apache/cassandra/tools/TopPartitionsTest.java
@@ -41,6 +41,7 @@ import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.metrics.Sampler;
+import org.apache.cassandra.schema.KeyspaceParams;
import org.apache.cassandra.service.StorageService;
import static java.lang.String.format;
@@ -54,10 +55,15 @@ import static org.junit.Assert.assertTrue;
*/
public class TopPartitionsTest
{
+ public static String KEYSPACE =
TopPartitionsTest.class.getSimpleName().toLowerCase();
+ public static String TABLE = "test";
+
@BeforeClass
public static void loadSchema() throws ConfigurationException
{
SchemaLoader.prepareServer();
+ SchemaLoader.createKeyspace(KEYSPACE, KeyspaceParams.simple(1));
+ executeInternal(format("CREATE TABLE %s.%s (k text, c text, v text,
PRIMARY KEY (k, c))", KEYSPACE, TABLE));
}
@Test
@@ -93,6 +99,108 @@ public class TopPartitionsTest
assertEquals("If this failed you probably have to raise the
beginLocalSampling duration", 1, result.size());
}
+ @Test
+ public void testTopPartitionsRowTombstoneAndSSTableCount() throws Exception
+ {
+ int count = 10;
+ ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(KEYSPACE, TABLE);
+ cfs.disableAutoCompaction();
+
+ executeInternal(format("INSERT INTO %s.%s(k,c,v) VALUES ('a', 'a',
'a')", KEYSPACE, TABLE));
+ executeInternal(format("INSERT INTO %s.%s(k,c,v) VALUES ('a', 'b',
'a')", KEYSPACE, TABLE));
+ cfs.forceBlockingFlush(ColumnFamilyStore.FlushReason.UNIT_TESTS);
+
+ executeInternal(format("INSERT INTO %s.%s(k,c,v) VALUES ('a', 'c',
'a')", KEYSPACE, TABLE));
+ executeInternal(format("INSERT INTO %s.%s(k,c,v) VALUES ('b', 'b',
'b')", KEYSPACE, TABLE));
+ executeInternal(format("INSERT INTO %s.%s(k,c,v) VALUES ('c', 'c',
'c')", KEYSPACE, TABLE));
+ executeInternal(format("INSERT INTO %s.%s(k,c,v) VALUES ('c', 'd',
'a')", KEYSPACE, TABLE));
+ executeInternal(format("INSERT INTO %s.%s(k,c,v) VALUES ('c', 'e',
'a')", KEYSPACE, TABLE));
+ executeInternal(format("DELETE FROM %s.%s WHERE k='a' AND c='a'",
KEYSPACE, TABLE));
+ cfs.forceBlockingFlush(ColumnFamilyStore.FlushReason.UNIT_TESTS);
+
+ // test multi-partition read
+ cfs.beginLocalSampling("READ_ROW_COUNT", count, 240000);
+ cfs.beginLocalSampling("READ_TOMBSTONE_COUNT", count, 240000);
+ cfs.beginLocalSampling("READ_SSTABLE_COUNT", count, 240000);
+
+ executeInternal(format("SELECT * FROM %s.%s", KEYSPACE, TABLE));
+ Thread.sleep(2000); // simulate waiting before finishing sampling
+
+ List<CompositeData> rowCounts =
cfs.finishLocalSampling("READ_ROW_COUNT", count);
+ List<CompositeData> tsCounts =
cfs.finishLocalSampling("READ_TOMBSTONE_COUNT", count);
+ List<CompositeData> sstCounts =
cfs.finishLocalSampling("READ_SSTABLE_COUNT", count);
+
+ assertEquals(0, sstCounts.size()); // not tracked on range reads
+ assertEquals(3, rowCounts.size()); // 3 partitions read (a, b, c)
+ assertEquals(1, tsCounts.size()); // 1 partition with tombstones (a)
+
+ for (CompositeData data : rowCounts)
+ {
+ String partitionKey = (String) data.get("value");
+ long numRows = (long) data.get("count");
+ if (partitionKey.equalsIgnoreCase("a"))
+ {
+ assertEquals(2, numRows);
+ }
+ else if (partitionKey.equalsIgnoreCase("b"))
+ assertEquals(1, numRows);
+ else if (partitionKey.equalsIgnoreCase("c"))
+ assertEquals(3, numRows);
+ }
+
+ assertEquals("a", tsCounts.get(0).get("value"));
+ assertEquals(1, (long) tsCounts.get(0).get("count"));
+
+ // test single partition read
+ cfs.beginLocalSampling("READ_ROW_COUNT", count, 240000);
+ cfs.beginLocalSampling("READ_TOMBSTONE_COUNT", count, 240000);
+ cfs.beginLocalSampling("READ_SSTABLE_COUNT", count, 240000);
+
+ executeInternal(format("SELECT * FROM %s.%s WHERE k='a'", KEYSPACE,
TABLE));
+ executeInternal(format("SELECT * FROM %s.%s WHERE k='b'", KEYSPACE,
TABLE));
+ executeInternal(format("SELECT * FROM %s.%s WHERE k='c'", KEYSPACE,
TABLE));
+ Thread.sleep(2000); // simulate waiting before finishing sampling
+
+ rowCounts = cfs.finishLocalSampling("READ_ROW_COUNT", count);
+ tsCounts = cfs.finishLocalSampling("READ_TOMBSTONE_COUNT", count);
+ sstCounts = cfs.finishLocalSampling("READ_SSTABLE_COUNT", count);
+
+ assertEquals(3, sstCounts.size()); // 3 partitions read
+ assertEquals(3, rowCounts.size()); // 3 partitions read
+ assertEquals(1, tsCounts.size()); // 3 partitions read, only one
containing tombstones
+
+ for (CompositeData data : sstCounts)
+ {
+ String partitionKey = (String) data.get("value");
+ long numRows = (long) data.get("count");
+ if (partitionKey.equalsIgnoreCase("a"))
+ {
+ assertEquals(2, numRows);
+ }
+ else if (partitionKey.equalsIgnoreCase("b"))
+ assertEquals(1, numRows);
+ else if (partitionKey.equalsIgnoreCase("c"))
+ assertEquals(1, numRows);
+ }
+
+ for (CompositeData data : rowCounts)
+ {
+ String partitionKey = (String) data.get("value");
+ long numRows = (long) data.get("count");
+ if (partitionKey.equalsIgnoreCase("a"))
+ {
+ assertEquals(2, numRows);
+ }
+ else if (partitionKey.equalsIgnoreCase("b"))
+ assertEquals(1, numRows);
+ else if (partitionKey.equalsIgnoreCase("c"))
+ assertEquals(3, numRows);
+ }
+
+ assertEquals("a", tsCounts.get(0).get("value"));
+ assertEquals(1, (long) tsCounts.get(0).get("count"));
+ }
+
@Test
public void testStartAndStopScheduledSampling()
{
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]