Notify registered indexes of expired rows during compaction Patch by Sam Tunnicliffe; reviewed by Sylvain Lebresne for CASSANDRA-11329
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/42459320 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/42459320 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/42459320 Branch: refs/heads/trunk Commit: 42459320586636c6dcbec9f56544d8a5256a3412 Parents: 1681c18 Author: Sam Tunnicliffe <[email protected]> Authored: Wed Mar 9 18:58:47 2016 +0000 Committer: Sam Tunnicliffe <[email protected]> Committed: Wed Apr 6 15:18:23 2016 +0100 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../cassandra/index/SecondaryIndexManager.java | 19 +++++++++----- .../index/internal/CassandraIndex.java | 2 +- .../apache/cassandra/index/CustomIndexTest.java | 27 ++++++++++++++++++++ 4 files changed, 41 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/42459320/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 8f4329a..26ab66d 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.0.6 + * Notify indexers of expired rows during compaction (CASSANDRA-11329) * Properly respond with ProtocolError when a v1/v2 native protocol header is received (CASSANDRA-11464) * Validate that num_tokens and initial_token are consistent with one another (CASSANDRA-10120) http://git-wip-us.apache.org/repos/asf/cassandra/blob/42459320/src/java/org/apache/cassandra/index/SecondaryIndexManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java index 16cb9c4..0a2e128 100644 --- a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java +++ b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java @@ -902,6 +902,8 @@ public class SecondaryIndexManager implements IndexRegistry { public void onPrimaryKeyLivenessInfo(int i, Clustering clustering, LivenessInfo merged, LivenessInfo original) { + if (original != null && (merged == null || !merged.isLive(nowInSec))) + getBuilder(i, clustering).addPrimaryKeyLivenessInfo(original); } public void onDeletion(int i, Clustering clustering, Row.Deletion merged, Row.Deletion original) @@ -914,15 +916,18 @@ public class SecondaryIndexManager implements IndexRegistry public void onCell(int i, Clustering clustering, Cell merged, Cell original) { - if (original != null && merged == null) + if (original != null && (merged == null || !merged.isLive(nowInSec))) + getBuilder(i, clustering).addCell(original); + } + + private Row.Builder getBuilder(int index, Clustering clustering) + { + if (builders[index] == null) { - if (builders[i] == null) - { - builders[i] = BTreeRow.sortedBuilder(); - builders[i].newRow(clustering); - } - builders[i].addCell(original); + builders[index] = BTreeRow.sortedBuilder(); + builders[index].newRow(clustering); } + return builders[index]; } }; http://git-wip-us.apache.org/repos/asf/cassandra/blob/42459320/src/java/org/apache/cassandra/index/internal/CassandraIndex.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/index/internal/CassandraIndex.java b/src/java/org/apache/cassandra/index/internal/CassandraIndex.java index 74d3f5d..4bbf682 100644 --- a/src/java/org/apache/cassandra/index/internal/CassandraIndex.java +++ b/src/java/org/apache/cassandra/index/internal/CassandraIndex.java @@ -380,7 +380,7 @@ public abstract class CassandraIndex implements Index public void removeRow(Row row) { if (isPrimaryKeyIndex()) - indexPrimaryKey(row.clustering(), row.primaryKeyLivenessInfo(), row.deletion()); + return; if (indexedColumn.isComplex()) removeCells(row.clustering(), row.getComplexColumnData(indexedColumn)); http://git-wip-us.apache.org/repos/asf/cassandra/blob/42459320/test/unit/org/apache/cassandra/index/CustomIndexTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/index/CustomIndexTest.java b/test/unit/org/apache/cassandra/index/CustomIndexTest.java index 0b553f4..9de3606 100644 --- a/test/unit/org/apache/cassandra/index/CustomIndexTest.java +++ b/test/unit/org/apache/cassandra/index/CustomIndexTest.java @@ -2,6 +2,7 @@ package org.apache.cassandra.index; import java.util.*; import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; @@ -537,6 +538,32 @@ public class CustomIndexTest extends CQLTester } @Test + public void notifyIndexersOfExpiredRowsDuringCompaction() throws Throwable + { + createTable("CREATE TABLE %s (k int, c int, PRIMARY KEY (k,c))"); + createIndex(String.format("CREATE CUSTOM INDEX row_ttl_test_index ON %%s() USING '%s'", StubIndex.class.getName())); + ColumnFamilyStore cfs = getCurrentColumnFamilyStore(); + StubIndex index = (StubIndex)cfs.indexManager.getIndexByName("row_ttl_test_index"); + + execute("INSERT INTO %s (k, c) VALUES (?, ?) USING TTL 1", 0, 0); + execute("INSERT INTO %s (k, c) VALUES (?, ?)", 0, 1); + execute("INSERT INTO %s (k, c) VALUES (?, ?)", 0, 2); + execute("INSERT INTO %s (k, c) VALUES (?, ?)", 3, 3); + assertEquals(4, index.rowsInserted.size()); + // flush so that we end up with an expiring row in the first sstable + flush(); + + // let the row with the ttl expire, then force a compaction + TimeUnit.SECONDS.sleep(2); + compact(); + + // the index should have been notified of the expired row + assertEquals(1, index.rowsDeleted.size()); + Integer deletedClustering = Int32Type.instance.compose(index.rowsDeleted.get(0).clustering().get(0)); + assertEquals(0, deletedClustering.intValue()); + } + + @Test public void validateOptions() throws Throwable { createTable("CREATE TABLE %s(k int, c int, v1 int, v2 int, PRIMARY KEY(k,c))");
