Repository: cassandra
Updated Branches:
  refs/heads/trunk 473e8dfd7 -> a4cf29fe9


Count deleted rows scanned during reads for tracing and tombstone thresholds.

If a row is read but is no longer live (which happens with row-level
tombstones), it is not counted in the metrics nor reported in tracing.
Row tombstones themselves are not reported anywhere. As a consequence,
some delete-heavy workloads show no tombstone reads yet endure severe
performance issues. This commit counts deleted rows as standard
tombstone cells.

Patch by Alexander Dejanovski; reviewed by Jon Haddad for CASSANDRA-8527
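
For context, a minimal sketch of the two delete shapes involved (the keyspace,
table and column names are hypothetical, and QueryProcessor.executeInternal
from org.apache.cassandra.cql3 is used only for brevity). Only the first shape
produced countable tombstones before this patch:

    // Deleting a single column leaves a cell tombstone, which was already
    // counted in metrics and tracing before this patch.
    QueryProcessor.executeInternal("DELETE v FROM ks.tbl WHERE pk = 'key1' AND ck = 'aa'");

    // Deleting the whole row by primary key leaves a row-level tombstone with no
    // cells. Before this patch such rows were invisible to tracing and to the
    // tombstone_warn_threshold/tombstone_failure_threshold checks; they are now
    // counted as one tombstone each.
    QueryProcessor.executeInternal("DELETE FROM ks.tbl WHERE pk = 'key1' AND ck = 'aa'");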


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9d649d69
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9d649d69
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9d649d69

Branch: refs/heads/trunk
Commit: 9d649d69a56a91fcb06a3582b22606f0fe361f49
Parents: d369190
Author: Jon Haddad <j...@jonhaddad.com>
Authored: Thu Feb 8 11:01:38 2018 -0800
Committer: Jon Haddad <j...@jonhaddad.com>
Committed: Thu Feb 8 11:01:38 2018 -0800

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 .../org/apache/cassandra/db/ReadCommand.java    |  34 ++-
 .../apache/cassandra/metrics/TableMetrics.java  |   2 +-
 .../apache/cassandra/db/ReadCommandTest.java    | 206 ++++++++++++++-----
 4 files changed, 185 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/9d649d69/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 8c0d8f0..4eb03e5 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -19,6 +19,8 @@
  * Round buffer size to powers of 2 for the chunk cache (CASSANDRA-13897)
  * Update jackson JSON jars (CASSANDRA-13949)
  * Avoid locks when checking LCS fanout and if we should defrag (CASSANDRA-13930)
+ * Correctly count range tombstones in traces and tombstone thresholds (CASSANDRA-8527)
+
 Merged from 3.0:
  * Handle failure when mutating repaired status in Verifier (CASSANDRA-13933)
  * Set encoding for javadoc generation (CASSANDRA-14154)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9d649d69/src/java/org/apache/cassandra/db/ReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java
index ab8779e..c8b256a 100644
--- a/src/java/org/apache/cassandra/db/ReadCommand.java
+++ b/src/java/org/apache/cassandra/db/ReadCommand.java
@@ -485,17 +485,37 @@ public abstract class ReadCommand extends MonitorableImpl implements ReadQuery
                 return applyToRow(row);
             }
 
+            /**
+             * Counts the number of live rows returned by the read command and the number of tombstones.
+             *
+             * Tombstones come in two forms on rows:
+             * - cells that aren't live anymore (either expired through TTL or deleted): 1 tombstone per cell
+             * - rows that aren't live and have no cells (DELETEs performed on the primary key): 1 tombstone per row
+             * We avoid counting rows as tombstones if they contain nothing but expired cells.
+             */
             @Override
             public Row applyToRow(Row row)
             {
-                if (row.hasLiveData(ReadCommand.this.nowInSec(), enforceStrictLiveness))
-                    ++liveRows;
-
+                boolean hasTombstones = false;
                 for (Cell cell : row.cells())
                 {
                     if (!cell.isLive(ReadCommand.this.nowInSec()))
+                    {
                         countTombstone(row.clustering());
+                        hasTombstones = true; // avoids counting an extra tombstone if the whole row expired
+                    }
                 }
+
+                if (row.hasLiveData(ReadCommand.this.nowInSec(), enforceStrictLiveness))
+                    ++liveRows;
+                else if (!row.primaryKeyLivenessInfo().isLive(ReadCommand.this.nowInSec())
+                        && row.hasDeletion(ReadCommand.this.nowInSec())
+                        && !hasTombstones)
+                {
+                    // We're counting primary key deletions only here.
+                    countTombstone(row.clustering());
+                }
+
                 return row;
             }
 
@@ -528,12 +548,16 @@ public abstract class ReadCommand extends MonitorableImpl implements ReadQuery
                 boolean warnTombstones = tombstones > warningThreshold && respectTombstoneThresholds;
                 if (warnTombstones)
                 {
-                    String msg = String.format("Read %d live rows and %d tombstone cells for query %1.512s (see tombstone_warn_threshold)", liveRows, tombstones, ReadCommand.this.toCQLString());
+                    String msg = String.format(
+                            "Read %d live rows and %d tombstone cells for query %1.512s (see tombstone_warn_threshold)",
+                            liveRows, tombstones, ReadCommand.this.toCQLString());
                     ClientWarn.instance.warn(msg);
                     logger.warn(msg);
                 }
 
-                    Tracing.trace("Read {} live and {} tombstone cells{}", liveRows, tombstones, (warnTombstones ? " (see tombstone_warn_threshold)" : ""));
+                    Tracing.trace("Read {} live rows and {} tombstone cells{}",
+                        liveRows, tombstones,
+                        (warnTombstones ? " (see tombstone_warn_threshold)" : ""));
             }
         };
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9d649d69/src/java/org/apache/cassandra/metrics/TableMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/TableMetrics.java b/src/java/org/apache/cassandra/metrics/TableMetrics.java
index e78bb66..620ef72 100644
--- a/src/java/org/apache/cassandra/metrics/TableMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/TableMetrics.java
@@ -122,7 +122,7 @@ public class TableMetrics
     public final Gauge<Double> keyCacheHitRate;
     /** Tombstones scanned in queries on this CF */
     public final TableHistogram tombstoneScannedHistogram;
-    /** Live cells scanned in queries on this CF */
+    /** Live rows scanned in queries on this CF */
     public final TableHistogram liveScannedHistogram;
     /** Column update time delta on this CF */
     public final TableHistogram colUpdateTimeDeltaHistogram;

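The row-level tombstones counted by this change feed the existing per-table
tombstoneScannedHistogram, which is how the new unit tests below observe them.
A minimal sketch of that check, mirroring the assertions in ReadCommandTest
(KEYSPACE and CF3 are the test class's own constants):

    // After reading partitions whose rows were deleted by primary key, the
    // per-table histogram now records those rows as scanned tombstones.
    ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF3);
    long maxTombstonesScanned = cfs.metric.tombstoneScannedHistogram.cf.getSnapshot().getMax();
    assertEquals(5, maxTombstonesScanned);
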
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9d649d69/test/unit/org/apache/cassandra/db/ReadCommandTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ReadCommandTest.java b/test/unit/org/apache/cassandra/db/ReadCommandTest.java
index 9264297..960539c 100644
--- a/test/unit/org/apache/cassandra/db/ReadCommandTest.java
+++ b/test/unit/org/apache/cassandra/db/ReadCommandTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.cassandra.db;
 
+import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
@@ -210,13 +211,144 @@ public class ReadCommandTest
         // Given the data above, when the keys are sorted and the deletions removed, we should
         // get these clustering rows in this order
         String[] expectedRows = new String[] { "aa", "ff", "ee", "cc", "dd", "cc", "bb"};
+        int nowInSeconds = FBUtilities.nowInSeconds();
 
-        List<ByteBuffer> buffers = new ArrayList<>(groups.length);
+        List<UnfilteredPartitionIterator> iterators = writeAndThenReadPartitions(cfs, groups, nowInSeconds);
+        UnfilteredPartitionIterators.MergeListener listener =
+            new UnfilteredPartitionIterators.MergeListener()
+            {
+                public UnfilteredRowIterators.MergeListener getRowMergeListener(DecoratedKey partitionKey, List<UnfilteredRowIterator> versions)
+                {
+                    return null;
+                }
+
+                public void close()
+                {
+
+                }
+            };
+
+        try (PartitionIterator partitionIterator = UnfilteredPartitionIterators.filter(UnfilteredPartitionIterators.merge(iterators, nowInSeconds, listener), nowInSeconds))
+        {
+
+            int i = 0;
+            int numPartitions = 0;
+            while (partitionIterator.hasNext())
+            {
+                numPartitions++;
+                try(RowIterator rowIterator = partitionIterator.next())
+                {
+                    while (rowIterator.hasNext())
+                    {
+                        Row row = rowIterator.next();
+                        assertEquals("col=" + expectedRows[i++], row.clustering().toString(cfs.metadata));
+                        //System.out.print(row.toString(cfs.metadata, true));
+                    }
+                }
+            }
+
+            assertEquals(5, numPartitions);
+            assertEquals(expectedRows.length, i);
+        }
+    }
+
+    /**
+     * This test creates several partitions with several rows each, then performs up to 5 row deletions on
+     * some partitions. We check that when reading the partitions, the maximum number of tombstones reported in the
+     * metrics is 5.
+     */
+    @Test
+    public void testCountDeletedRows() throws Exception
+    {
+        ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF3);
+
+        String[][][] groups = new String[][][] {
+                new String[][] {
+                        new String[] { "1", "key1", "aa", "a" }, // "1" indicates to create the data, "-1" to delete the
+                                                                 // row
+                        new String[] { "1", "key2", "bb", "b" },
+                        new String[] { "1", "key3", "cc", "c" }
+                },
+                new String[][] {
+                        new String[] { "1", "key3", "dd", "d" },
+                        new String[] { "1", "key2", "ee", "e" },
+                        new String[] { "1", "key1", "ff", "f" }
+                },
+                new String[][] {
+                        new String[] { "1", "key6", "aa", "a" },
+                        new String[] { "1", "key5", "bb", "b" },
+                        new String[] { "1", "key4", "cc", "c" }
+                },
+                new String[][] {
+                        new String[] { "1", "key2", "aa", "a" },
+                        new String[] { "1", "key2", "cc", "c" },
+                        new String[] { "1", "key2", "dd", "d" }
+                },
+                new String[][] {
+                        new String[] { "-1", "key6", "aa", "a" },
+                        new String[] { "-1", "key2", "bb", "b" },
+                        new String[] { "-1", "key2", "ee", "e" },
+                        new String[] { "-1", "key2", "aa", "a" },
+                        new String[] { "-1", "key2", "cc", "c" },
+                        new String[] { "-1", "key2", "dd", "d" }
+                }
+        };
         int nowInSeconds = FBUtilities.nowInSeconds();
+
+        writeAndThenReadPartitions(cfs, groups, nowInSeconds);
+
+        assertEquals(5, cfs.metric.tombstoneScannedHistogram.cf.getSnapshot().getMax());
+    }
+
+    /**
+     * This test creates several partitions with several rows each and no deletions. We check that when reading the
+     * partitions, the maximum number of tombstones reported in the metrics is 1, which is apparently the
+     * default max value for histograms in the metrics lib (equivalent to having no element reported).
+     */
+    @Test
+    public void testCountWithNoDeletedRow() throws Exception
+    {
+        ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF3);
+
+        String[][][] groups = new String[][][] {
+                new String[][] {
+                        new String[] { "1", "key1", "aa", "a" }, // "1" indicates to create the data, "-1" to delete the
+                                                                 // row
+                        new String[] { "1", "key2", "bb", "b" },
+                        new String[] { "1", "key3", "cc", "c" }
+                },
+                new String[][] {
+                        new String[] { "1", "key3", "dd", "d" },
+                        new String[] { "1", "key2", "ee", "e" },
+                        new String[] { "1", "key1", "ff", "f" }
+                },
+                new String[][] {
+                        new String[] { "1", "key6", "aa", "a" },
+                        new String[] { "1", "key5", "bb", "b" },
+                        new String[] { "1", "key4", "cc", "c" }
+                }
+        };
+
+        int nowInSeconds = FBUtilities.nowInSeconds();
+
+        writeAndThenReadPartitions(cfs, groups, nowInSeconds);
+
+        assertEquals(1, cfs.metric.tombstoneScannedHistogram.cf.getSnapshot().getMax());
+    }
+
+    /**
+     * Writes rows to the column family store using the groups as input and then reads them. Returns the iterators from
+     * the read.
+     */
+    private List<UnfilteredPartitionIterator> writeAndThenReadPartitions(ColumnFamilyStore cfs, String[][][] groups,
+            int nowInSeconds) throws IOException
+    {
+        List<ByteBuffer> buffers = new ArrayList<>(groups.length);
         ColumnFilter columnFilter = ColumnFilter.allColumnsBuilder(cfs.metadata).build();
         RowFilter rowFilter = RowFilter.create();
         Slice slice = Slice.make(ClusteringBound.BOTTOM, ClusteringBound.TOP);
-        ClusteringIndexSliceFilter sliceFilter = new ClusteringIndexSliceFilter(Slices.with(cfs.metadata.comparator, slice), false);
+        ClusteringIndexSliceFilter sliceFilter = new ClusteringIndexSliceFilter(
+                Slices.with(cfs.metadata.comparator, slice), false);
 
         for (String[][] group : groups)
         {
@@ -229,16 +361,18 @@ public class ReadCommandTest
                 if (data[0].equals("1"))
                 {
                     new RowUpdateBuilder(cfs.metadata, 0, ByteBufferUtil.bytes(data[1]))
-                    .clustering(data[2])
-                    .add(data[3], ByteBufferUtil.bytes("blah"))
-                    .build()
-                    .apply();
+                            .clustering(data[2])
+                            .add(data[3], ByteBufferUtil.bytes("blah"))
+                            .build()
+                            .apply();
                 }
                 else
                 {
-                    RowUpdateBuilder.deleteRow(cfs.metadata, FBUtilities.timestampMicros(), ByteBufferUtil.bytes(data[1]), data[2]).apply();
+                    RowUpdateBuilder.deleteRow(cfs.metadata, FBUtilities.timestampMicros(),
+                            ByteBufferUtil.bytes(data[1]), data[2]).apply();
                 }
-                commands.add(SinglePartitionReadCommand.create(cfs.metadata, nowInSeconds, columnFilter, rowFilter, DataLimits.NONE, Util.dk(data[1]), sliceFilter));
+                commands.add(SinglePartitionReadCommand.create(cfs.metadata, nowInSeconds, columnFilter, rowFilter,
+                        DataLimits.NONE, Util.dk(data[1]), sliceFilter));
             }
 
             cfs.forceBlockingFlush();
@@ -246,13 +380,13 @@ public class ReadCommandTest
             ReadQuery query = new SinglePartitionReadCommand.Group(commands, DataLimits.NONE);
 
             try (ReadExecutionController executionController = query.executionController();
-                 UnfilteredPartitionIterator iter = query.executeLocally(executionController);
-                 DataOutputBuffer buffer = new DataOutputBuffer())
+                    UnfilteredPartitionIterator iter = query.executeLocally(executionController);
+                    DataOutputBuffer buffer = new DataOutputBuffer())
             {
                 UnfilteredPartitionIterators.serializerForIntraNode().serialize(iter,
-                                                                                columnFilter,
-                                                                                buffer,
-                                                                                MessagingService.current_version);
+                        columnFilter,
+                        buffer,
+                        MessagingService.current_version);
                 buffers.add(buffer.buffer());
             }
         }
@@ -265,48 +399,14 @@ public class ReadCommandTest
             try (DataInputBuffer in = new DataInputBuffer(buffer, true))
             {
                 iterators.add(UnfilteredPartitionIterators.serializerForIntraNode().deserialize(in,
-                                                                                                MessagingService.current_version,
-                                                                                                cfs.metadata,
-                                                                                                columnFilter,
-                                                                                                SerializationHelper.Flag.LOCAL));
+                        MessagingService.current_version,
+                        cfs.metadata,
+                        columnFilter,
+                        SerializationHelper.Flag.LOCAL));
             }
         }
 
-        UnfilteredPartitionIterators.MergeListener listener =
-            new UnfilteredPartitionIterators.MergeListener()
-            {
-                public UnfilteredRowIterators.MergeListener getRowMergeListener(DecoratedKey partitionKey, List<UnfilteredRowIterator> versions)
-                {
-                    return null;
-                }
-
-                public void close()
-                {
-
-                }
-            };
-
-        try (PartitionIterator partitionIterator = UnfilteredPartitionIterators.filter(UnfilteredPartitionIterators.merge(iterators, nowInSeconds, listener), nowInSeconds))
-        {
-
-            int i = 0;
-            int numPartitions = 0;
-            while (partitionIterator.hasNext())
-            {
-                numPartitions++;
-                try(RowIterator rowIterator = partitionIterator.next())
-                {
-                    while (rowIterator.hasNext())
-                    {
-                        Row row = rowIterator.next();
-                        assertEquals("col=" + expectedRows[i++], row.clustering().toString(cfs.metadata));
-                        //System.out.print(row.toString(cfs.metadata, true));
-                    }
-                }
-            }
-
-            assertEquals(5, numPartitions);
-            assertEquals(expectedRows.length, i);
-        }
+        return iterators;
     }
+
 }

