Repository: cassandra
Updated Branches:
  refs/heads/cassandra-3.0 322392729 -> 5730e7b9f
  refs/heads/cassandra-3.1 88892af93 -> 00c31a24c
  refs/heads/trunk 34822301c -> d8ad0f0f2
Notify indexers of partition deletion during cleanup Patch by Sam Tunnicliffe; reviewed by Sergio Bossa for CASSANDRA-10685 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5730e7b9 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5730e7b9 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5730e7b9 Branch: refs/heads/cassandra-3.0 Commit: 5730e7b9f8dfc23b9001b9ad41a63f99f33ed31b Parents: 3223927 Author: Sam Tunnicliffe <[email protected]> Authored: Fri Nov 13 17:15:11 2015 +0000 Committer: Sam Tunnicliffe <[email protected]> Committed: Fri Nov 13 17:15:11 2015 +0000 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../cassandra/index/SecondaryIndexManager.java | 7 ++++- .../apache/cassandra/index/CustomIndexTest.java | 33 ++++++++++++++++++++ 3 files changed, 40 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/5730e7b9/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index f00b36d..f214200 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.0.1 + * Notify indexers of partition delete during cleanup (CASSANDRA-10685) * Keep the file open in trySkipCache (CASSANDRA-10669) * Updated trigger example (CASSANDRA-10257) Merged from 2.2: http://git-wip-us.apache.org/repos/asf/cassandra/blob/5730e7b9/src/java/org/apache/cassandra/index/SecondaryIndexManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java index 92b04fe..db53c25 100644 --- a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java +++ b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java @@ -530,7 +530,7 @@ 
public class SecondaryIndexManager implements IndexRegistry partition.columns(), nowInSec); indexTransaction.start(); - indexTransaction.onPartitionDeletion(partition.partitionLevelDeletion()); + indexTransaction.onPartitionDeletion(new DeletionTime(FBUtilities.timestampMicros(), nowInSec)); indexTransaction.commit(); while (partition.hasNext()) @@ -978,8 +978,13 @@ public class SecondaryIndexManager implements IndexRegistry { Index.Indexer indexer = index.indexerFor(key, nowInSec, opGroup, Type.CLEANUP); indexer.begin(); + + if (partitionDelete != null) + indexer.partitionDelete(partitionDelete); + if (row != null) indexer.removeRow(row); + indexer.finish(); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/5730e7b9/test/unit/org/apache/cassandra/index/CustomIndexTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/index/CustomIndexTest.java b/test/unit/org/apache/cassandra/index/CustomIndexTest.java index b372c59..b305868 100644 --- a/test/unit/org/apache/cassandra/index/CustomIndexTest.java +++ b/test/unit/org/apache/cassandra/index/CustomIndexTest.java @@ -9,6 +9,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import org.junit.Test; +import org.apache.cassandra.Util; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.cql3.CQLTester; import org.apache.cassandra.cql3.ColumnIdentifier; @@ -18,12 +19,15 @@ import org.apache.cassandra.cql3.statements.ModificationStatement; import org.apache.cassandra.cql3.statements.SelectStatement; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.ReadCommand; +import org.apache.cassandra.db.ReadOrderGroup; import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.db.marshal.Int32Type; import org.apache.cassandra.db.marshal.UTF8Type; +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; import 
org.apache.cassandra.exceptions.InvalidRequestException; import org.apache.cassandra.schema.IndexMetadata; import org.apache.cassandra.schema.Indexes; +import org.apache.cassandra.utils.FBUtilities; import static org.apache.cassandra.Util.throwAssert; import static org.apache.cassandra.cql3.statements.IndexTarget.CUSTOM_INDEX_OPTION_NAME; @@ -471,6 +475,35 @@ public class CustomIndexTest extends CQLTester assertEquals(1, index.reloads.get()); } + @Test + public void notifyIndexersOfPartitionAndRowRemovalDuringCleanup() throws Throwable + { + createTable("CREATE TABLE %s (k int, c int, v int, PRIMARY KEY (k,c))"); + createIndex(String.format("CREATE CUSTOM INDEX cleanup_index ON %%s() USING '%s'", StubIndex.class.getName())); + ColumnFamilyStore cfs = getCurrentColumnFamilyStore(); + StubIndex index = (StubIndex)cfs.indexManager.getIndexByName("cleanup_index"); + + execute("INSERT INTO %s (k, c, v) VALUES (?, ?, ?)", 0, 0, 0); + execute("INSERT INTO %s (k, c, v) VALUES (?, ?, ?)", 0, 1, 1); + execute("INSERT INTO %s (k, c, v) VALUES (?, ?, ?)", 0, 2, 2); + execute("INSERT INTO %s (k, c, v) VALUES (?, ?, ?)", 3, 3, 3); + assertEquals(4, index.rowsInserted.size()); + assertEquals(0, index.partitionDeletions.size()); + + ReadCommand cmd = Util.cmd(cfs, 0).build(); + try (ReadOrderGroup orderGroup = cmd.startOrderGroup(); + UnfilteredPartitionIterator iterator = cmd.executeLocally(orderGroup)) + { + assertTrue(iterator.hasNext()); + cfs.indexManager.deletePartition(iterator.next(), FBUtilities.nowInSeconds()); + } + + assertEquals(1, index.partitionDeletions.size()); + assertEquals(3, index.rowsDeleted.size()); + for (int i = 0; i < 3; i++) + assertEquals(index.rowsDeleted.get(i).clustering(), index.rowsInserted.get(i).clustering()); + } + private void testCreateIndex(String indexName, String... targetColumnNames) throws Throwable { createIndex(String.format("CREATE CUSTOM INDEX %s ON %%s(%s) USING '%s'",
