http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacc75/test/unit/org/apache/cassandra/index/CustomIndexTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/index/CustomIndexTest.java index fa6700b,0000000..f02823c mode 100644,000000..100644 --- a/test/unit/org/apache/cassandra/index/CustomIndexTest.java +++ b/test/unit/org/apache/cassandra/index/CustomIndexTest.java @@@ -1,806 -1,0 +1,826 @@@ ++/* ++ * ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, ++ * software distributed under the License is distributed on an ++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY ++ * KIND, either express or implied. See the License for the ++ * specific language governing permissions and limitations ++ * under the License. ++ * ++ */ +package org.apache.cassandra.index; + +import java.util.*; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import org.junit.Test; + +import com.datastax.driver.core.exceptions.QueryValidationException; +import org.apache.cassandra.Util; +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.cql3.CQLTester; +import org.apache.cassandra.cql3.ColumnIdentifier; +import org.apache.cassandra.cql3.restrictions.IndexRestrictions; +import org.apache.cassandra.cql3.restrictions.StatementRestrictions; +import org.apache.cassandra.cql3.statements.IndexTarget; +import org.apache.cassandra.cql3.statements.ModificationStatement; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.ReadCommand; +import org.apache.cassandra.db.ReadOrderGroup; +import org.apache.cassandra.db.marshal.AbstractType; +import org.apache.cassandra.db.marshal.Int32Type; +import org.apache.cassandra.db.marshal.UTF8Type; +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; +import org.apache.cassandra.exceptions.InvalidRequestException; +import org.apache.cassandra.schema.IndexMetadata; +import org.apache.cassandra.schema.Indexes; +import org.apache.cassandra.transport.Server; +import org.apache.cassandra.utils.FBUtilities; + +import static org.apache.cassandra.Util.throwAssert; +import static org.apache.cassandra.cql3.statements.IndexTarget.CUSTOM_INDEX_OPTION_NAME; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +public class CustomIndexTest extends CQLTester +{ + @Test + public void testInsertsOnCfsBackedIndex() throws Throwable + { + // test to ensure that we don't deadlock when flushing CFS backed custom indexers + // see CASSANDRA-10181 + createTable("CREATE TABLE %s (a int, b int, c int, d int, PRIMARY KEY (a, b))"); + createIndex("CREATE CUSTOM INDEX myindex ON %s(c) USING 'org.apache.cassandra.index.internal.CustomCassandraIndex'"); + + execute("INSERT INTO %s (a, b, c, d) VALUES (?, ?, ?, ?)", 0, 0, 0, 2); + execute("INSERT INTO %s (a, b, c, d) VALUES (?, ?, ?, ?)", 0, 1, 0, 1); + execute("INSERT INTO %s (a, b, c, d) VALUES (?, ?, ?, ?)", 0, 2, 0, 0); + } + + @Test + public void testTruncateWithNonCfsCustomIndex() throws Throwable + { + // deadlocks and times out the test in the face of the synchronisation + // issues described in the comments on CASSANDRA-9669 + createTable("CREATE TABLE %s (a int, b int, c int, PRIMARY KEY (a))"); + createIndex("CREATE CUSTOM INDEX b_index ON %s(b) USING 'org.apache.cassandra.index.StubIndex'"); + execute("INSERT INTO %s (a, b, c) VALUES (?, ?, ?)", 0, 1, 2); + getCurrentColumnFamilyStore().truncateBlocking(); + } + + @Test + public void indexControlsIfIncludedInBuildOnNewSSTables() throws Throwable + { + createTable("CREATE TABLE %s (a int, b int, PRIMARY KEY (a))"); + String toInclude = "include"; + String toExclude = "exclude"; + createIndex(String.format("CREATE CUSTOM INDEX %s ON %%s(b) USING '%s'", + toInclude, IndexIncludedInBuild.class.getName())); + createIndex(String.format("CREATE CUSTOM INDEX %s ON %%s(b) USING '%s'", + toExclude, IndexExcludedFromBuild.class.getName())); + + execute("INSERT INTO %s (a, b) VALUES (?, ?)", 0, 0); + execute("INSERT INTO %s (a, b) VALUES (?, ?)", 1, 1); + execute("INSERT INTO %s (a, b) VALUES (?, ?)", 2, 2); + flush(); + + SecondaryIndexManager indexManager = getCurrentColumnFamilyStore().indexManager; + IndexIncludedInBuild included = (IndexIncludedInBuild)indexManager.getIndexByName(toInclude); + included.reset(); + assertTrue(included.rowsInserted.isEmpty()); + + IndexExcludedFromBuild excluded = (IndexExcludedFromBuild)indexManager.getIndexByName(toExclude); + excluded.reset(); + assertTrue(excluded.rowsInserted.isEmpty()); + + indexManager.buildAllIndexesBlocking(getCurrentColumnFamilyStore().getLiveSSTables()); + + assertEquals(3, included.rowsInserted.size()); + assertTrue(excluded.rowsInserted.isEmpty()); + } + + @Test + public void indexReceivesWriteTimeDeletionsCorrectly() throws Throwable + { + createTable("CREATE TABLE %s (a int, b int, c int, d int, PRIMARY KEY (a, b, c))"); + String indexName = "test_index"; + createIndex(String.format("CREATE CUSTOM INDEX %s ON %%s(d) USING '%s'", + indexName, StubIndex.class.getName())); + + execute("INSERT INTO %s (a, b, c, d) VALUES (?, ?, ?, ?)", 0, 0, 0, 0); + execute("INSERT INTO %s (a, b, c, d) VALUES (?, ?, ?, ?)", 0, 0, 1, 1); + execute("INSERT INTO %s (a, b, c, d) VALUES (?, ?, ?, ?)", 0, 0, 2, 2); + execute("INSERT INTO %s (a, b, c, d) VALUES (?, ?, ?, ?)", 0, 1, 3, 3); + + SecondaryIndexManager indexManager = getCurrentColumnFamilyStore().indexManager; + StubIndex index = (StubIndex)indexManager.getIndexByName(indexName); + assertEquals(4, index.rowsInserted.size()); + assertTrue(index.partitionDeletions.isEmpty()); + assertTrue(index.rangeTombstones.isEmpty()); + + execute("DELETE FROM %s WHERE a=0 AND b=0"); + assertTrue(index.partitionDeletions.isEmpty()); + assertEquals(1, index.rangeTombstones.size()); + + execute("DELETE FROM %s WHERE a=0"); + assertEquals(1, index.partitionDeletions.size()); + assertEquals(1, index.rangeTombstones.size()); + } + @Test + public void nonCustomIndexesRequireExactlyOneTargetColumn() throws Throwable + { + createTable("CREATE TABLE %s(k int, c int, v1 int, v2 int, PRIMARY KEY (k,c))"); + + assertInvalidMessage("Only CUSTOM indexes support multiple columns", "CREATE INDEX multi_idx on %s(v1,v2)"); + assertInvalidMessage("Only CUSTOM indexes can be created without specifying a target column", + "CREATE INDEX no_targets on %s()"); + + createIndex(String.format("CREATE CUSTOM INDEX multi_idx ON %%s(v1, v2) USING '%s'", StubIndex.class.getName())); + assertIndexCreated("multi_idx", "v1", "v2"); + } + + @Test + public void rejectDuplicateColumnsInTargetList() throws Throwable + { + createTable("CREATE TABLE %s(k int, c int, v1 int, v2 int, PRIMARY KEY (k,c))"); + + assertInvalidMessage("Duplicate column v1 in index target list", + String.format("CREATE CUSTOM INDEX ON %%s(v1, v1) USING '%s'", + StubIndex.class.getName())); + + assertInvalidMessage("Duplicate column v1 in index target list", + String.format("CREATE CUSTOM INDEX ON %%s(v1, v1, c, c) USING '%s'", + StubIndex.class.getName())); + } + + @Test + public void requireFullQualifierForFrozenCollectionTargets() throws Throwable + { + // this is really just to prove that we require the full modifier on frozen collection + // targets whether the index is multicolumn or not + createTable("CREATE TABLE %s(" + + " k int," + + " c int," + + " fmap frozen<map<int, text>>," + + " flist frozen<list<int>>," + + " fset frozen<set<int>>," + + " PRIMARY KEY(k,c))"); + + assertInvalidMessage("Cannot create keys() index on frozen column fmap. " + + "Frozen collections only support full() indexes", + String.format("CREATE CUSTOM INDEX ON %%s(c, keys(fmap)) USING'%s'", + StubIndex.class.getName())); + assertInvalidMessage("Cannot create entries() index on frozen column fmap. " + + "Frozen collections only support full() indexes", + String.format("CREATE CUSTOM INDEX ON %%s(c, entries(fmap)) USING'%s'", + StubIndex.class.getName())); + assertInvalidMessage("Cannot create values() index on frozen column fmap. " + + "Frozen collections only support full() indexes", + String.format("CREATE CUSTOM INDEX ON %%s(c, fmap) USING'%s'", StubIndex.class.getName())); + + assertInvalidMessage("Cannot create keys() index on frozen column flist. " + + "Frozen collections only support full() indexes", + String.format("CREATE CUSTOM INDEX ON %%s(c, keys(flist)) USING'%s'", + StubIndex.class.getName())); + assertInvalidMessage("Cannot create entries() index on frozen column flist. " + + "Frozen collections only support full() indexes", + String.format("CREATE CUSTOM INDEX ON %%s(c, entries(flist)) USING'%s'", + StubIndex.class.getName())); + assertInvalidMessage("Cannot create values() index on frozen column flist. " + + "Frozen collections only support full() indexes", + String.format("CREATE CUSTOM INDEX ON %%s(c, flist) USING'%s'", StubIndex.class.getName())); + + assertInvalidMessage("Cannot create keys() index on frozen column fset. " + + "Frozen collections only support full() indexes", + String.format("CREATE CUSTOM INDEX ON %%s(c, keys(fset)) USING'%s'", + StubIndex.class.getName())); + assertInvalidMessage("Cannot create entries() index on frozen column fset. " + + "Frozen collections only support full() indexes", + String.format("CREATE CUSTOM INDEX ON %%s(c, entries(fset)) USING'%s'", + StubIndex.class.getName())); + assertInvalidMessage("Cannot create values() index on frozen column fset. " + + "Frozen collections only support full() indexes", + String.format("CREATE CUSTOM INDEX ON %%s(c, fset) USING'%s'", StubIndex.class.getName())); + + createIndex(String.format("CREATE CUSTOM INDEX ON %%s(c, full(fmap)) USING'%s'", StubIndex.class.getName())); + createIndex(String.format("CREATE CUSTOM INDEX ON %%s(c, full(flist)) USING'%s'", StubIndex.class.getName())); + createIndex(String.format("CREATE CUSTOM INDEX ON %%s(c, full(fset)) USING'%s'", StubIndex.class.getName())); + } + + @Test + public void defaultIndexNameContainsTargetColumns() throws Throwable + { + createTable("CREATE TABLE %s(k int, c int, v1 int, v2 int, PRIMARY KEY(k,c))"); + + createIndex(String.format("CREATE CUSTOM INDEX ON %%s(v1, v2) USING '%s'", StubIndex.class.getName())); + assertEquals(1, getCurrentColumnFamilyStore().metadata.getIndexes().size()); + assertIndexCreated(currentTable() + "_idx", "v1", "v2"); + + createIndex(String.format("CREATE CUSTOM INDEX ON %%s(c, v1, v2) USING '%s'", StubIndex.class.getName())); + assertEquals(2, getCurrentColumnFamilyStore().metadata.getIndexes().size()); + assertIndexCreated(currentTable() + "_idx_1", "c", "v1", "v2"); + + createIndex(String.format("CREATE CUSTOM INDEX ON %%s(c, v2) USING '%s'", StubIndex.class.getName())); + assertEquals(3, getCurrentColumnFamilyStore().metadata.getIndexes().size()); + assertIndexCreated(currentTable() + "_idx_2", "c", "v2"); + + // duplicate the previous index with some additional options and check the name is generated as expected + createIndex(String.format("CREATE CUSTOM INDEX ON %%s(c, v2) USING '%s' WITH OPTIONS = {'foo':'bar'}", + StubIndex.class.getName())); + assertEquals(4, getCurrentColumnFamilyStore().metadata.getIndexes().size()); + Map<String, String> options = new HashMap<>(); + options.put("foo", "bar"); + assertIndexCreated(currentTable() + "_idx_3", options, "c", "v2"); + } + + @Test + public void createMultiColumnIndexes() throws Throwable + { + // smoke test for various permutations of multicolumn indexes + createTable("CREATE TABLE %s (" + + " pk1 int," + + " pk2 int," + + " c1 int," + + " c2 int," + + " v1 int," + + " v2 int," + + " mval map<text, int>," + + " lval list<int>," + + " sval set<int>," + + " fmap frozen<map<text,int>>," + + " flist frozen<list<int>>," + + " fset frozen<set<int>>," + + " PRIMARY KEY ((pk1, pk2), c1, c2))"); + + testCreateIndex("idx_1", "pk1", "pk2"); + testCreateIndex("idx_2", "pk1", "c1"); + testCreateIndex("idx_3", "pk1", "c2"); + testCreateIndex("idx_4", "c1", "c2"); + testCreateIndex("idx_5", "c2", "v1"); + testCreateIndex("idx_6", "v1", "v2"); + testCreateIndex("idx_7", "pk2", "c2", "v2"); + testCreateIndex("idx_8", "pk1", "c1", "v1", "mval", "sval", "lval"); + + createIndex(String.format("CREATE CUSTOM INDEX inc_frozen ON %%s(" + + " pk2, c2, v2, full(fmap), full(fset), full(flist)" + + ") USING '%s'", + StubIndex.class.getName())); + assertIndexCreated("inc_frozen", + new HashMap<>(), + ImmutableList.of(indexTarget("pk2", IndexTarget.Type.VALUES), + indexTarget("c2", IndexTarget.Type.VALUES), + indexTarget("v2", IndexTarget.Type.VALUES), + indexTarget("fmap", IndexTarget.Type.FULL), + indexTarget("fset", IndexTarget.Type.FULL), + indexTarget("flist", IndexTarget.Type.FULL))); + + createIndex(String.format("CREATE CUSTOM INDEX all_teh_things ON %%s(" + + " pk1, pk2, c1, c2, v1, v2, keys(mval), lval, sval, full(fmap), full(fset), full(flist)" + + ") USING '%s'", + StubIndex.class.getName())); + assertIndexCreated("all_teh_things", + new HashMap<>(), + ImmutableList.of(indexTarget("pk1", IndexTarget.Type.VALUES), + indexTarget("pk2", IndexTarget.Type.VALUES), + indexTarget("c1", IndexTarget.Type.VALUES), + indexTarget("c2", IndexTarget.Type.VALUES), + indexTarget("v1", IndexTarget.Type.VALUES), + indexTarget("v2", IndexTarget.Type.VALUES), + indexTarget("mval", IndexTarget.Type.KEYS), + indexTarget("lval", IndexTarget.Type.VALUES), + indexTarget("sval", IndexTarget.Type.VALUES), + indexTarget("fmap", IndexTarget.Type.FULL), + indexTarget("fset", IndexTarget.Type.FULL), + indexTarget("flist", IndexTarget.Type.FULL))); + } + + @Test + public void createMultiColumnIndexIncludingUserTypeColumn() throws Throwable + { + String myType = KEYSPACE + '.' + createType("CREATE TYPE %s (a int, b int)"); + createTable("CREATE TABLE %s (k int PRIMARY KEY, v1 int, v2 frozen<" + myType + ">)"); + testCreateIndex("udt_idx", "v1", "v2"); + Indexes indexes = getCurrentColumnFamilyStore().metadata.getIndexes(); + IndexMetadata expected = IndexMetadata.fromIndexTargets(getCurrentColumnFamilyStore().metadata, + ImmutableList.of(indexTarget("v1", IndexTarget.Type.VALUES), + indexTarget("v2", IndexTarget.Type.VALUES)), + "udt_idx", + IndexMetadata.Kind.CUSTOM, + ImmutableMap.of(CUSTOM_INDEX_OPTION_NAME, + StubIndex.class.getName())); + IndexMetadata actual = indexes.get("udt_idx").orElseThrow(throwAssert("Index udt_idx not found")); + assertEquals(expected, actual); + } + + @Test + public void createIndexWithoutTargets() throws Throwable + { + createTable("CREATE TABLE %s(k int, c int, v1 int, v2 int, PRIMARY KEY(k,c))"); + // only allowed for CUSTOM indexes + assertInvalidMessage("Only CUSTOM indexes can be created without specifying a target column", + "CREATE INDEX ON %s()"); + + // parentheses are mandatory + assertInvalidSyntax("CREATE CUSTOM INDEX ON %%s USING '%s'", StubIndex.class.getName()); + createIndex(String.format("CREATE CUSTOM INDEX no_targets ON %%s() USING '%s'", StubIndex.class.getName())); + assertIndexCreated("no_targets", new HashMap<>()); + } + + @Test + public void testCustomIndexExpressionSyntax() throws Throwable + { + Object[] row = row(0, 0, 0, 0); + createTable("CREATE TABLE %s (a int, b int, c int, d int, PRIMARY KEY (a, b))"); + execute("INSERT INTO %s (a, b, c, d) VALUES (?, ?, ?, ?)", row); + + assertInvalidMessage(String.format(IndexRestrictions.INDEX_NOT_FOUND, "custom_index", keyspace(), currentTable()), + "SELECT * FROM %s WHERE expr(custom_index, 'foo bar baz')"); + + createIndex(String.format("CREATE CUSTOM INDEX custom_index ON %%s(c) USING '%s'", StubIndex.class.getName())); + + assertInvalidThrowMessage(Server.CURRENT_VERSION, + String.format(IndexRestrictions.INDEX_NOT_FOUND, "no_such_index", keyspace(), currentTable()), + QueryValidationException.class, + "SELECT * FROM %s WHERE expr(no_such_index, 'foo bar baz ')"); + + // simple case + assertRows(execute("SELECT * FROM %s WHERE expr(custom_index, 'foo bar baz')"), row); + assertRows(execute("SELECT * FROM %s WHERE expr(\"custom_index\", 'foo bar baz')"), row); + assertRows(execute("SELECT * FROM %s WHERE expr(custom_index, $$foo \" ~~~ bar Baz$$)"), row); + + // multiple expressions on the same index + assertInvalidThrowMessage(Server.CURRENT_VERSION, + IndexRestrictions.MULTIPLE_EXPRESSIONS, + QueryValidationException.class, + "SELECT * FROM %s WHERE expr(custom_index, 'foo') AND expr(custom_index, 'bar')"); + + // multiple expressions on different indexes + createIndex(String.format("CREATE CUSTOM INDEX other_custom_index ON %%s(d) USING '%s'", StubIndex.class.getName())); + assertInvalidThrowMessage(Server.CURRENT_VERSION, + IndexRestrictions.MULTIPLE_EXPRESSIONS, + QueryValidationException.class, + "SELECT * FROM %s WHERE expr(custom_index, 'foo') AND expr(other_custom_index, 'bar')"); + + assertInvalidThrowMessage(Server.CURRENT_VERSION, + StatementRestrictions.REQUIRES_ALLOW_FILTERING_MESSAGE, + QueryValidationException.class, + "SELECT * FROM %s WHERE expr(custom_index, 'foo') AND d=0"); + assertRows(execute("SELECT * FROM %s WHERE expr(custom_index, 'foo') AND d=0 ALLOW FILTERING"), row); + } + + @Test + public void customIndexDoesntSupportCustomExpressions() throws Throwable + { + createTable("CREATE TABLE %s (a int, b int, c int, d int, PRIMARY KEY (a, b))"); + createIndex(String.format("CREATE CUSTOM INDEX custom_index ON %%s(c) USING '%s'", + NoCustomExpressionsIndex.class.getName())); + assertInvalidThrowMessage(Server.CURRENT_VERSION, + String.format( IndexRestrictions.CUSTOM_EXPRESSION_NOT_SUPPORTED, "custom_index"), + QueryValidationException.class, + "SELECT * FROM %s WHERE expr(custom_index, 'foo bar baz')"); + } + + @Test + public void customIndexRejectsExpressionSyntax() throws Throwable + { + createTable("CREATE TABLE %s (a int, b int, c int, d int, PRIMARY KEY (a, b))"); + createIndex(String.format("CREATE CUSTOM INDEX custom_index ON %%s(c) USING '%s'", + AlwaysRejectIndex.class.getName())); + assertInvalidThrowMessage(Server.CURRENT_VERSION, + "None shall pass", + QueryValidationException.class, + "SELECT * FROM %s WHERE expr(custom_index, 'foo bar baz')"); + } + + @Test + public void customExpressionsMustTargetCustomIndex() throws Throwable + { + createTable("CREATE TABLE %s (a int, b int, c int, d int, PRIMARY KEY (a, b))"); + createIndex("CREATE INDEX non_custom_index ON %s(c)"); + assertInvalidThrowMessage(Server.CURRENT_VERSION, + String.format(IndexRestrictions.NON_CUSTOM_INDEX_IN_EXPRESSION, "non_custom_index"), + QueryValidationException.class, + "SELECT * FROM %s WHERE expr(non_custom_index, 'c=0')"); + } + + @Test + public void customExpressionsDisallowedInModifications() throws Throwable + { + createTable("CREATE TABLE %s (a int, b int, c int, d int, PRIMARY KEY (a, b))"); + String indexName = currentTable() + "_custom_index"; + createIndex(String.format("CREATE CUSTOM INDEX %s ON %%s(c) USING '%s'", + indexName, StubIndex.class.getName())); + + assertInvalidThrowMessage(Server.CURRENT_VERSION, + ModificationStatement.CUSTOM_EXPRESSIONS_NOT_ALLOWED, + QueryValidationException.class, + String.format("DELETE FROM %%s WHERE expr(%s, 'foo bar baz ')", indexName)); + assertInvalidThrowMessage(Server.CURRENT_VERSION, + ModificationStatement.CUSTOM_EXPRESSIONS_NOT_ALLOWED, + QueryValidationException.class, + String.format("UPDATE %%s SET d=0 WHERE expr(%s, 'foo bar baz ')", indexName)); + } + + @Test + public void indexSelectionPrefersMostSelectiveIndex() throws Throwable + { + createTable("CREATE TABLE %s(a int, b int, c int, PRIMARY KEY (a))"); + createIndex(String.format("CREATE CUSTOM INDEX %s_more_selective ON %%s(b) USING '%s'", + currentTable(), + SettableSelectivityIndex.class.getName())); + createIndex(String.format("CREATE CUSTOM INDEX %s_less_selective ON %%s(c) USING '%s'", + currentTable(), + SettableSelectivityIndex.class.getName())); + SettableSelectivityIndex moreSelective = + (SettableSelectivityIndex)getCurrentColumnFamilyStore().indexManager.getIndexByName(currentTable() + "_more_selective"); + SettableSelectivityIndex lessSelective = + (SettableSelectivityIndex)getCurrentColumnFamilyStore().indexManager.getIndexByName(currentTable() + "_less_selective"); + assertEquals(0, moreSelective.searchersProvided); + assertEquals(0, lessSelective.searchersProvided); + + // the more selective index should be chosen + moreSelective.setEstimatedResultRows(1); + lessSelective.setEstimatedResultRows(1000); + execute("SELECT * FROM %s WHERE b=0 AND c=0 ALLOW FILTERING"); + assertEquals(1, moreSelective.searchersProvided); + assertEquals(0, lessSelective.searchersProvided); + + // and adjusting the selectivity should have an observable effect + moreSelective.setEstimatedResultRows(10000); + execute("SELECT * FROM %s WHERE b=0 AND c=0 ALLOW FILTERING"); + assertEquals(1, moreSelective.searchersProvided); + assertEquals(1, lessSelective.searchersProvided); + } + + @Test + public void customExpressionForcesIndexSelection() throws Throwable + { + createTable("CREATE TABLE %s(a int, b int, c int, PRIMARY KEY (a))"); + createIndex(String.format("CREATE CUSTOM INDEX %s_more_selective ON %%s(b) USING '%s'", + currentTable(), + SettableSelectivityIndex.class.getName())); + createIndex(String.format("CREATE CUSTOM INDEX %s_less_selective ON %%s(c) USING '%s'", + currentTable(), + SettableSelectivityIndex.class.getName())); + SettableSelectivityIndex moreSelective = + (SettableSelectivityIndex)getCurrentColumnFamilyStore().indexManager.getIndexByName(currentTable() + "_more_selective"); + SettableSelectivityIndex lessSelective = + (SettableSelectivityIndex)getCurrentColumnFamilyStore().indexManager.getIndexByName(currentTable() + "_less_selective"); + assertEquals(0, moreSelective.searchersProvided); + assertEquals(0, lessSelective.searchersProvided); + + // without a custom expression, the more selective index should be chosen + moreSelective.setEstimatedResultRows(1); + lessSelective.setEstimatedResultRows(1000); + execute("SELECT * FROM %s WHERE b=0 AND c=0 ALLOW FILTERING"); + assertEquals(1, moreSelective.searchersProvided); + assertEquals(0, lessSelective.searchersProvided); + + // when a custom expression is present, its target index should be preferred + execute(String.format("SELECT * FROM %%s WHERE b=0 AND expr(%s_less_selective, 'expression') ALLOW FILTERING", currentTable())); + assertEquals(1, moreSelective.searchersProvided); + assertEquals(1, lessSelective.searchersProvided); + } + + @Test + public void testCustomExpressionValueType() throws Throwable + { + // verify that the type of the expression value is determined by Index::customExpressionValueType + createTable("CREATE TABLE %s (k int, v1 uuid, v2 blob, PRIMARY KEY(k))"); + createIndex(String.format("CREATE CUSTOM INDEX int_index ON %%s() USING '%s'", + Int32ExpressionIndex.class.getName())); + createIndex(String.format("CREATE CUSTOM INDEX text_index ON %%s() USING '%s'", + UTF8ExpressionIndex.class.getName())); + + execute("SELECT * FROM %s WHERE expr(text_index, 'foo')"); + assertInvalidThrowMessage(Server.CURRENT_VERSION, + "Invalid INTEGER constant (99) for \"custom index expression\" of type text", + QueryValidationException.class, + "SELECT * FROM %s WHERE expr(text_index, 99)"); + + execute("SELECT * FROM %s WHERE expr(int_index, 99)"); + assertInvalidThrowMessage(Server.CURRENT_VERSION, + "Invalid STRING constant (foo) for \"custom index expression\" of type int", + QueryValidationException.class, + "SELECT * FROM %s WHERE expr(int_index, 'foo')"); + } + + @Test + public void reloadIndexMetadataOnBaseCfsReload() throws Throwable + { + // verify that whenever the base table CFMetadata is reloaded, a reload of the index + // metadata is performed + createTable("CREATE TABLE %s (k int, v1 int, PRIMARY KEY(k))"); + createIndex(String.format("CREATE CUSTOM INDEX reload_counter ON %%s() USING '%s'", + CountMetadataReloadsIndex.class.getName())); + ColumnFamilyStore cfs = getCurrentColumnFamilyStore(); + CountMetadataReloadsIndex index = (CountMetadataReloadsIndex)cfs.indexManager.getIndexByName("reload_counter"); + assertEquals(0, index.reloads.get()); + + // reloading the CFS, even without any metadata changes invokes the index's metadata reload task + cfs.reload(); + assertEquals(1, index.reloads.get()); + } + + @Test + public void notifyIndexersOfPartitionAndRowRemovalDuringCleanup() throws Throwable + { + createTable("CREATE TABLE %s (k int, c int, v int, PRIMARY KEY (k,c))"); + createIndex(String.format("CREATE CUSTOM INDEX cleanup_index ON %%s() USING '%s'", StubIndex.class.getName())); + ColumnFamilyStore cfs = getCurrentColumnFamilyStore(); + StubIndex index = (StubIndex)cfs.indexManager.getIndexByName("cleanup_index"); + + execute("INSERT INTO %s (k, c, v) VALUES (?, ?, ?)", 0, 0, 0); + execute("INSERT INTO %s (k, c, v) VALUES (?, ?, ?)", 0, 1, 1); + execute("INSERT INTO %s (k, c, v) VALUES (?, ?, ?)", 0, 2, 2); + execute("INSERT INTO %s (k, c, v) VALUES (?, ?, ?)", 3, 3, 3); + assertEquals(4, index.rowsInserted.size()); + assertEquals(0, index.partitionDeletions.size()); + + ReadCommand cmd = Util.cmd(cfs, 0).build(); + try (ReadOrderGroup orderGroup = cmd.startOrderGroup(); + UnfilteredPartitionIterator iterator = cmd.executeLocally(orderGroup)) + { + assertTrue(iterator.hasNext()); + cfs.indexManager.deletePartition(iterator.next(), FBUtilities.nowInSeconds()); + } + + assertEquals(1, index.partitionDeletions.size()); + assertEquals(3, index.rowsDeleted.size()); + for (int i = 0; i < 3; i++) + assertEquals(index.rowsDeleted.get(i).clustering(), index.rowsInserted.get(i).clustering()); + } + + @Test + public void notifyIndexersOfExpiredRowsDuringCompaction() throws Throwable + { + createTable("CREATE TABLE %s (k int, c int, PRIMARY KEY (k,c))"); + createIndex(String.format("CREATE CUSTOM INDEX row_ttl_test_index ON %%s() USING '%s'", StubIndex.class.getName())); + ColumnFamilyStore cfs = getCurrentColumnFamilyStore(); + StubIndex index = (StubIndex)cfs.indexManager.getIndexByName("row_ttl_test_index"); + + execute("INSERT INTO %s (k, c) VALUES (?, ?) USING TTL 1", 0, 0); + execute("INSERT INTO %s (k, c) VALUES (?, ?)", 0, 1); + execute("INSERT INTO %s (k, c) VALUES (?, ?)", 0, 2); + execute("INSERT INTO %s (k, c) VALUES (?, ?)", 3, 3); + assertEquals(4, index.rowsInserted.size()); + // flush so that we end up with an expiring row in the first sstable + flush(); + + // let the row with the ttl expire, then force a compaction + TimeUnit.SECONDS.sleep(2); + compact(); + + // the index should have been notified of the expired row + assertEquals(1, index.rowsDeleted.size()); + Integer deletedClustering = Int32Type.instance.compose(index.rowsDeleted.get(0).clustering().get(0)); + assertEquals(0, deletedClustering.intValue()); + } + + @Test + public void validateOptions() throws Throwable + { + createTable("CREATE TABLE %s(k int, c int, v1 int, v2 int, PRIMARY KEY(k,c))"); + createIndex(String.format("CREATE CUSTOM INDEX ON %%s(c, v2) USING '%s' WITH OPTIONS = {'foo':'bar'}", + IndexWithValidateOptions.class.getName())); + assertNotNull(IndexWithValidateOptions.options); + assertEquals("bar", IndexWithValidateOptions.options.get("foo")); + } + + @Test + public void validateOptionsWithCFMetaData() throws Throwable + { + createTable("CREATE TABLE %s(k int, c int, v1 int, v2 int, PRIMARY KEY(k,c))"); + createIndex(String.format("CREATE CUSTOM INDEX ON %%s(c, v2) USING '%s' WITH OPTIONS = {'foo':'bar'}", + IndexWithOverloadedValidateOptions.class.getName())); + CFMetaData cfm = getCurrentColumnFamilyStore().metadata; + assertEquals(cfm, IndexWithOverloadedValidateOptions.cfm); + assertNotNull(IndexWithOverloadedValidateOptions.options); + assertEquals("bar", IndexWithOverloadedValidateOptions.options.get("foo")); + } + + private void testCreateIndex(String indexName, String... targetColumnNames) throws Throwable + { + createIndex(String.format("CREATE CUSTOM INDEX %s ON %%s(%s) USING '%s'", + indexName, + Arrays.stream(targetColumnNames).collect(Collectors.joining(",")), + StubIndex.class.getName())); + assertIndexCreated(indexName, targetColumnNames); + } + + private void assertIndexCreated(String name, String... targetColumnNames) + { + assertIndexCreated(name, new HashMap<>(), targetColumnNames); + } + + private void assertIndexCreated(String name, Map<String, String> options, String... targetColumnNames) + { + List<IndexTarget> targets = Arrays.stream(targetColumnNames) + .map(s -> new IndexTarget(ColumnIdentifier.getInterned(s, true), + IndexTarget.Type.VALUES)) + .collect(Collectors.toList()); + assertIndexCreated(name, options, targets); + } + + private void assertIndexCreated(String name, Map<String, String> options, List<IndexTarget> targets) + { + // all tests here use StubIndex as the custom index class, + // so add that to the map of options + options.put(CUSTOM_INDEX_OPTION_NAME, StubIndex.class.getName()); + CFMetaData cfm = getCurrentColumnFamilyStore().metadata; + IndexMetadata expected = IndexMetadata.fromIndexTargets(cfm, targets, name, IndexMetadata.Kind.CUSTOM, options); + Indexes indexes = getCurrentColumnFamilyStore().metadata.getIndexes(); + for (IndexMetadata actual : indexes) + if (actual.equals(expected)) + return; + + fail(String.format("Index %s not found in CFMetaData", expected)); + } + + private static IndexTarget indexTarget(String name, IndexTarget.Type type) + { + return new IndexTarget(ColumnIdentifier.getInterned(name, true), type); + } + + public static final class CountMetadataReloadsIndex extends StubIndex + { + private final AtomicInteger reloads = new AtomicInteger(0); + + public CountMetadataReloadsIndex(ColumnFamilyStore baseCfs, IndexMetadata metadata) + { + super(baseCfs, metadata); + } + + public void reset() + { + super.reset(); + reloads.set(0); + } + + public Callable<?> getMetadataReloadTask(IndexMetadata indexMetadata) + { + return reloads::incrementAndGet; + } + } + + public static final class IndexIncludedInBuild extends StubIndex + { + public IndexIncludedInBuild(ColumnFamilyStore baseCfs, IndexMetadata metadata) + { + super(baseCfs, metadata); + } + + public boolean shouldBuildBlocking() + { + return true; + } + } + + public static final class UTF8ExpressionIndex extends StubIndex + { + public UTF8ExpressionIndex(ColumnFamilyStore baseCfs, IndexMetadata metadata) + { + super(baseCfs, metadata); + } + + public AbstractType<?> customExpressionValueType() + { + return UTF8Type.instance; + } + } + + public static final class Int32ExpressionIndex extends StubIndex + { + public Int32ExpressionIndex(ColumnFamilyStore baseCfs, IndexMetadata metadata) + { + super(baseCfs, metadata); + } + + public AbstractType<?> customExpressionValueType() + { + return Int32Type.instance; + } + } + + public static final class SettableSelectivityIndex extends StubIndex + { + private int searchersProvided = 0; + private long estimatedResultRows = 0; + + public SettableSelectivityIndex(ColumnFamilyStore baseCfs, IndexMetadata metadata) + { + super(baseCfs, metadata); + } + + public void setEstimatedResultRows(long estimate) + { + estimatedResultRows = estimate; + } + + public long getEstimatedResultRows() + { + return estimatedResultRows; + } + + public Searcher searcherFor(ReadCommand command) + { + searchersProvided++; + return super.searcherFor(command); + } + } + + public static final class IndexExcludedFromBuild extends StubIndex + { + public IndexExcludedFromBuild(ColumnFamilyStore baseCfs, IndexMetadata metadata) + { + super(baseCfs, metadata); + } + + public boolean shouldBuildBlocking() + { + return false; + } + } + + public static final class NoCustomExpressionsIndex extends StubIndex + { + public NoCustomExpressionsIndex(ColumnFamilyStore baseCfs, IndexMetadata metadata) + { + super(baseCfs, metadata); + } + + public AbstractType<?> customExpressionValueType() + { + return null; + } + } + + public static final class AlwaysRejectIndex extends StubIndex + { + public AlwaysRejectIndex(ColumnFamilyStore baseCfs, IndexMetadata metadata) + { + super(baseCfs, metadata); + } + + public void validate(ReadCommand command) throws InvalidRequestException + { + throw new InvalidRequestException("None shall pass"); + } + + public Searcher searcherFor(ReadCommand command) + { + throw new InvalidRequestException("None shall pass (though I'd have expected to fail faster)"); + } + } + + public static final class IndexWithValidateOptions extends StubIndex + { + public static Map<String, String> options; + + public IndexWithValidateOptions(ColumnFamilyStore baseCfs, IndexMetadata metadata) + { + super(baseCfs, metadata); + } + + public static Map<String, String> validateOptions(Map<String, String> options) + { + IndexWithValidateOptions.options = options; + return new HashMap<>(); + } + } + + public static final class IndexWithOverloadedValidateOptions extends StubIndex + { + public static CFMetaData cfm; + public static Map<String, String> options; + + public IndexWithOverloadedValidateOptions(ColumnFamilyStore baseCfs, IndexMetadata metadata) + { + super(baseCfs, metadata); + } + + public static Map<String, String> validateOptions(Map<String, String> options, CFMetaData cfm) + { + IndexWithOverloadedValidateOptions.options = options; + IndexWithOverloadedValidateOptions.cfm = cfm; + return new HashMap<>(); + } + } +}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacc75/test/unit/org/apache/cassandra/index/internal/CustomCassandraIndex.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/index/internal/CustomCassandraIndex.java index a30cf4e,0000000..6aaefb7 mode 100644,000000..100644 --- a/test/unit/org/apache/cassandra/index/internal/CustomCassandraIndex.java +++ b/test/unit/org/apache/cassandra/index/internal/CustomCassandraIndex.java @@@ -1,640 -1,0 +1,660 @@@ ++/* ++ * ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, ++ * software distributed under the License is distributed on an ++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY ++ * KIND, either express or implied. See the License for the ++ * specific language governing permissions and limitations ++ * under the License. ++ * ++ */ +package org.apache.cassandra.index.internal; + +import java.nio.ByteBuffer; +import java.util.*; +import java.util.concurrent.Callable; +import java.util.concurrent.Future; +import java.util.function.BiFunction; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.ColumnDefinition; +import org.apache.cassandra.cql3.Operator; +import org.apache.cassandra.cql3.statements.IndexTarget; +import org.apache.cassandra.db.*; +import org.apache.cassandra.db.compaction.CompactionManager; +import org.apache.cassandra.db.filter.RowFilter; +import org.apache.cassandra.db.lifecycle.SSTableSet; +import org.apache.cassandra.db.lifecycle.View; +import org.apache.cassandra.db.marshal.AbstractType; +import org.apache.cassandra.db.partitions.PartitionIterator; +import org.apache.cassandra.db.partitions.PartitionUpdate; +import org.apache.cassandra.db.rows.*; +import org.apache.cassandra.exceptions.InvalidRequestException; +import org.apache.cassandra.index.Index; +import org.apache.cassandra.index.IndexRegistry; +import org.apache.cassandra.index.SecondaryIndexBuilder; +import org.apache.cassandra.index.transactions.IndexTransaction; +import org.apache.cassandra.index.transactions.UpdateTransaction; +import org.apache.cassandra.io.sstable.ReducingKeyIterator; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.schema.IndexMetadata; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.Pair; +import org.apache.cassandra.utils.concurrent.OpOrder; +import org.apache.cassandra.utils.concurrent.Refs; + +import static org.apache.cassandra.index.internal.CassandraIndex.getFunctions; +import static org.apache.cassandra.index.internal.CassandraIndex.indexCfsMetadata; +import static org.apache.cassandra.index.internal.CassandraIndex.parseTarget; + +/** + * Clone of KeysIndex used in CassandraIndexTest#testCustomIndexWithCFS to verify + * behaviour of flushing CFS backed CUSTOM indexes + */ +public class CustomCassandraIndex implements Index +{ + private static final Logger logger = LoggerFactory.getLogger(CassandraIndex.class); + + public final ColumnFamilyStore baseCfs; + protected IndexMetadata metadata; + protected ColumnFamilyStore indexCfs; + protected ColumnDefinition indexedColumn; + protected CassandraIndexFunctions functions; + + public CustomCassandraIndex(ColumnFamilyStore baseCfs, IndexMetadata indexDef) + { + this.baseCfs = baseCfs; + setMetadata(indexDef); + } + + /** + * Returns true if an index of this type can support search predicates of the form [column] OPERATOR [value] + * @param indexedColumn + * @param operator + * @return + */ + protected boolean supportsOperator(ColumnDefinition indexedColumn, Operator operator) + { + return operator.equals(Operator.EQ); + } + + public ColumnDefinition getIndexedColumn() + { + return indexedColumn; + } + + public ClusteringComparator getIndexComparator() + { + return indexCfs.metadata.comparator; + } + + public ColumnFamilyStore getIndexCfs() + { + return indexCfs; + } + + public void register(IndexRegistry registry) + { + registry.registerIndex(this); + } + + public Callable<?> getInitializationTask() + { + // if we're just linking in the index on an already-built index post-restart + // or if the table is empty we've nothing to do. Otherwise, submit for building via SecondaryIndexBuilder + return isBuilt() || baseCfs.isEmpty() ? null : getBuildIndexTask(); + } + + public IndexMetadata getIndexMetadata() + { + return metadata; + } + + public Optional<ColumnFamilyStore> getBackingTable() + { + return indexCfs == null ? Optional.empty() : Optional.of(indexCfs); + } + + public Callable<Void> getBlockingFlushTask() + { + return () -> { + indexCfs.forceBlockingFlush(); + return null; + }; + } + + public Callable<?> getInvalidateTask() + { + return () -> { + invalidate(); + return null; + }; + } + + public Callable<?> getMetadataReloadTask(IndexMetadata indexDef) + { + setMetadata(indexDef); + return () -> { + indexCfs.metadata.reloadIndexMetadataProperties(baseCfs.metadata); + indexCfs.reload(); + return null; + }; + } + + private void setMetadata(IndexMetadata indexDef) + { + metadata = indexDef; + Pair<ColumnDefinition, IndexTarget.Type> target = parseTarget(baseCfs.metadata, indexDef); + functions = getFunctions(indexDef, target); + CFMetaData cfm = indexCfsMetadata(baseCfs.metadata, indexDef); + indexCfs = ColumnFamilyStore.createColumnFamilyStore(baseCfs.keyspace, + cfm.cfName, + cfm, + baseCfs.getTracker().loadsstables); + indexedColumn = target.left; + } + + public Callable<?> getTruncateTask(final long truncatedAt) + { + return () -> { + indexCfs.discardSSTables(truncatedAt); + return null; + }; + } + + public boolean shouldBuildBlocking() + { + return true; + } + + public boolean dependsOn(ColumnDefinition column) + { + return column.equals(indexedColumn); + } + + public boolean supportsExpression(ColumnDefinition column, Operator operator) + { + return indexedColumn.name.equals(column.name) + && supportsOperator(indexedColumn, operator); + } + + public AbstractType<?> customExpressionValueType() + { + return null; + } + + private boolean supportsExpression(RowFilter.Expression expression) + { + return supportsExpression(expression.column(), expression.operator()); + } + + public long getEstimatedResultRows() + { + return indexCfs.getMeanColumns(); + } + + /** + * No post processing of query results, just return them unchanged + */ + public BiFunction<PartitionIterator, ReadCommand, PartitionIterator> postProcessorFor(ReadCommand command) + { + return (partitionIterator, readCommand) -> partitionIterator; + } + + public RowFilter getPostIndexQueryFilter(RowFilter filter) + { + return getTargetExpression(filter.getExpressions()).map(filter::without) + .orElse(filter); + } + + private Optional<RowFilter.Expression> getTargetExpression(List<RowFilter.Expression> expressions) + { + return expressions.stream().filter(this::supportsExpression).findFirst(); + } + + public Index.Searcher searcherFor(ReadCommand command) + { + return null; + } + + public void validate(PartitionUpdate update) throws InvalidRequestException + { + switch (indexedColumn.kind) + { + case PARTITION_KEY: + validatePartitionKey(update.partitionKey()); + break; + case CLUSTERING: + validateClusterings(update); + break; + case REGULAR: + validateRows(update); + break; + case STATIC: + validateRows(Collections.singleton(update.staticRow())); + break; + } + } + + protected CBuilder buildIndexClusteringPrefix(ByteBuffer partitionKey, + ClusteringPrefix prefix, + CellPath path) + { + CBuilder builder = CBuilder.create(getIndexComparator()); + builder.add(partitionKey); + return builder; + } + + protected ByteBuffer getIndexedValue(ByteBuffer partitionKey, + Clustering clustering, + CellPath path, ByteBuffer cellValue) + { + return cellValue; + } + + public IndexEntry decodeEntry(DecoratedKey indexedValue, Row indexEntry) + { + throw new UnsupportedOperationException("KEYS indexes do not use a specialized index entry format"); + } + + public boolean isStale(Row row, ByteBuffer indexValue, int nowInSec) + { + if (row == null) + return true; + + Cell cell = row.getCell(indexedColumn); + + return (cell == null + || !cell.isLive(nowInSec) + || indexedColumn.type.compare(indexValue, cell.value()) != 0); + } + + public Indexer indexerFor(final DecoratedKey key, + final PartitionColumns columns, + final int nowInSec, + final OpOrder.Group opGroup, + final IndexTransaction.Type transactionType) + { + if (!isPrimaryKeyIndex() && !columns.contains(indexedColumn)) + return null; + + return new Indexer() + { + public void begin() + { + } + + public void partitionDelete(DeletionTime deletionTime) + { + } + + public void rangeTombstone(RangeTombstone tombstone) + { + } + + public void insertRow(Row row) + { + if (isPrimaryKeyIndex()) + { + indexPrimaryKey(row.clustering(), + getPrimaryKeyIndexLiveness(row), + row.deletion()); + } + else + { + if (indexedColumn.isComplex()) + indexCells(row.clustering(), row.getComplexColumnData(indexedColumn)); + else + indexCell(row.clustering(), row.getCell(indexedColumn)); + } + } + + public void removeRow(Row row) + { + if (isPrimaryKeyIndex()) + indexPrimaryKey(row.clustering(), row.primaryKeyLivenessInfo(), row.deletion()); + + if (indexedColumn.isComplex()) + removeCells(row.clustering(), row.getComplexColumnData(indexedColumn)); + else + removeCell(row.clustering(), row.getCell(indexedColumn)); + } + + + public void updateRow(Row oldRow, Row newRow) + { + if (isPrimaryKeyIndex()) + indexPrimaryKey(newRow.clustering(), + newRow.primaryKeyLivenessInfo(), + newRow.deletion()); + + if (indexedColumn.isComplex()) + { + indexCells(newRow.clustering(), newRow.getComplexColumnData(indexedColumn)); + removeCells(oldRow.clustering(), oldRow.getComplexColumnData(indexedColumn)); + } + else + { + indexCell(newRow.clustering(), newRow.getCell(indexedColumn)); + removeCell(oldRow.clustering(), oldRow.getCell(indexedColumn)); + } + } + + public void finish() + { + } + + private void indexCells(Clustering clustering, Iterable<Cell> cells) + { + if (cells == null) + return; + + for (Cell cell : cells) + indexCell(clustering, cell); + } + + private void indexCell(Clustering clustering, Cell cell) + { + if (cell == null || !cell.isLive(nowInSec)) + return; + + insert(key.getKey(), + clustering, + cell, + LivenessInfo.create(cell.timestamp(), cell.ttl(), cell.localDeletionTime()), + opGroup); + } + + private void removeCells(Clustering clustering, Iterable<Cell> cells) + { + if (cells == null) + return; + + for (Cell cell : cells) + removeCell(clustering, cell); + } + + private void removeCell(Clustering clustering, Cell cell) + { + if (cell == null || !cell.isLive(nowInSec)) + return; + + delete(key.getKey(), clustering, cell, opGroup, nowInSec); + } + + private void indexPrimaryKey(final Clustering clustering, + final LivenessInfo liveness, + final Row.Deletion deletion) + { + if (liveness.timestamp() != LivenessInfo.NO_TIMESTAMP) + insert(key.getKey(), clustering, null, liveness, opGroup); + + if (!deletion.isLive()) + delete(key.getKey(), clustering, deletion.time(), opGroup); + } + + private LivenessInfo getPrimaryKeyIndexLiveness(Row row) + { + long timestamp = row.primaryKeyLivenessInfo().timestamp(); + int ttl = row.primaryKeyLivenessInfo().ttl(); + for (Cell cell : row.cells()) + { + long cellTimestamp = cell.timestamp(); + if (cell.isLive(nowInSec)) + { + if (cellTimestamp > timestamp) + { + timestamp = cellTimestamp; + ttl = cell.ttl(); + } + } + } + return LivenessInfo.create(baseCfs.metadata, timestamp, ttl, nowInSec); + } + }; + } + + /** + * Specific to internal indexes, this is called by a + * searcher when it encounters a stale entry in the index + * @param indexKey the partition key in the index table + * @param indexClustering the clustering in the index table + * @param deletion deletion timestamp etc + * @param opGroup the operation under which to perform the deletion + */ + public void deleteStaleEntry(DecoratedKey indexKey, + Clustering indexClustering, + DeletionTime deletion, + OpOrder.Group opGroup) + { + doDelete(indexKey, indexClustering, deletion, opGroup); + logger.debug("Removed index entry for stale value {}", indexKey); + } + + /** + * Called when adding a new entry to the index + */ + private void insert(ByteBuffer rowKey, + Clustering clustering, + Cell cell, + LivenessInfo info, + OpOrder.Group opGroup) + { + DecoratedKey valueKey = getIndexKeyFor(getIndexedValue(rowKey, + clustering, + cell)); + Row row = BTreeRow.noCellLiveRow(buildIndexClustering(rowKey, clustering, cell), info); + PartitionUpdate upd = partitionUpdate(valueKey, row); + indexCfs.apply(upd, UpdateTransaction.NO_OP, opGroup, null); + logger.debug("Inserted entry into index for value {}", valueKey); + } + + /** + * Called when deleting entries on non-primary key columns + */ + private void delete(ByteBuffer rowKey, + Clustering clustering, + Cell cell, + OpOrder.Group opGroup, + int nowInSec) + { + DecoratedKey valueKey = getIndexKeyFor(getIndexedValue(rowKey, + clustering, + cell)); + doDelete(valueKey, + buildIndexClustering(rowKey, clustering, cell), + new DeletionTime(cell.timestamp(), nowInSec), + opGroup); + } + + /** + * Called when deleting entries from indexes on primary key columns + */ + private void delete(ByteBuffer rowKey, + Clustering clustering, + DeletionTime deletion, + OpOrder.Group opGroup) + { + DecoratedKey valueKey = getIndexKeyFor(getIndexedValue(rowKey, + clustering, + null)); + doDelete(valueKey, + buildIndexClustering(rowKey, clustering, null), + deletion, + opGroup); + } + + private void doDelete(DecoratedKey indexKey, + Clustering indexClustering, + DeletionTime deletion, + OpOrder.Group opGroup) + { + Row row = BTreeRow.emptyDeletedRow(indexClustering, Row.Deletion.regular(deletion)); + PartitionUpdate upd = partitionUpdate(indexKey, row); + indexCfs.apply(upd, UpdateTransaction.NO_OP, opGroup, null); + logger.debug("Removed index entry for value {}", indexKey); + } + + private void validatePartitionKey(DecoratedKey partitionKey) throws InvalidRequestException + { + assert indexedColumn.isPartitionKey(); + validateIndexedValue(getIndexedValue(partitionKey.getKey(), null, null )); + } + + private void validateClusterings(PartitionUpdate update) throws InvalidRequestException + { + assert indexedColumn.isClusteringColumn(); + for (Row row : update) + validateIndexedValue(getIndexedValue(null, row.clustering(), null)); + } + + private void validateRows(Iterable<Row> rows) + { + assert !indexedColumn.isPrimaryKeyColumn(); + for (Row row : rows) + { + if (indexedColumn.isComplex()) + { + ComplexColumnData data = row.getComplexColumnData(indexedColumn); + if (data != null) + { + for (Cell cell : data) + { + validateIndexedValue(getIndexedValue(null, null, cell.path(), cell.value())); + } + } + } + else + { + validateIndexedValue(getIndexedValue(null, null, row.getCell(indexedColumn))); + } + } + } + + private void validateIndexedValue(ByteBuffer value) + { + if (value != null && value.remaining() >= FBUtilities.MAX_UNSIGNED_SHORT) + throw new InvalidRequestException(String.format( + "Cannot index value of size %d for index %s on %s.%s(%s) (maximum allowed size=%d)", + value.remaining(), + metadata.name, + baseCfs.metadata.ksName, + baseCfs.metadata.cfName, + indexedColumn.name.toString(), + FBUtilities.MAX_UNSIGNED_SHORT)); + } + + private ByteBuffer getIndexedValue(ByteBuffer rowKey, + Clustering clustering, + Cell cell) + { + return getIndexedValue(rowKey, + clustering, + cell == null ? null : cell.path(), + cell == null ? null : cell.value() + ); + } + + private Clustering buildIndexClustering(ByteBuffer rowKey, + Clustering clustering, + Cell cell) + { + return buildIndexClusteringPrefix(rowKey, + clustering, + cell == null ? null : cell.path()).build(); + } + + private DecoratedKey getIndexKeyFor(ByteBuffer value) + { + return indexCfs.decorateKey(value); + } + + private PartitionUpdate partitionUpdate(DecoratedKey valueKey, Row row) + { + return PartitionUpdate.singleRowUpdate(indexCfs.metadata, valueKey, row); + } + + private void invalidate() + { + // interrupt in-progress compactions + Collection<ColumnFamilyStore> cfss = Collections.singleton(indexCfs); + CompactionManager.instance.interruptCompactionForCFs(cfss, true); + CompactionManager.instance.waitForCessation(cfss); + indexCfs.keyspace.writeOrder.awaitNewBarrier(); + indexCfs.forceBlockingFlush(); + indexCfs.readOrdering.awaitNewBarrier(); + indexCfs.invalidate(); + } + + private boolean isBuilt() + { + return SystemKeyspace.isIndexBuilt(baseCfs.keyspace.getName(), metadata.name); + } + + private boolean isPrimaryKeyIndex() + { + return indexedColumn.isPrimaryKeyColumn(); + } + + private Callable<?> getBuildIndexTask() + { + return () -> { + buildBlocking(); + return null; + }; + } + + private void buildBlocking() + { + baseCfs.forceBlockingFlush(); + + try (ColumnFamilyStore.RefViewFragment viewFragment = baseCfs.selectAndReference(View.select(SSTableSet.CANONICAL)); + Refs<SSTableReader> sstables = viewFragment.refs) + { + if (sstables.isEmpty()) + { + logger.info("No SSTable data for {}.{} to build index {} from, marking empty index as built", + baseCfs.metadata.ksName, + baseCfs.metadata.cfName, + metadata.name); + baseCfs.indexManager.markIndexBuilt(metadata.name); + return; + } + + logger.info("Submitting index build of {} for data in {}", + metadata.name, + getSSTableNames(sstables)); + + SecondaryIndexBuilder builder = new SecondaryIndexBuilder(baseCfs, + Collections.singleton(this), + new ReducingKeyIterator(sstables)); + Future<?> future = CompactionManager.instance.submitIndexBuild(builder); + FBUtilities.waitOnFuture(future); + indexCfs.forceBlockingFlush(); + baseCfs.indexManager.markIndexBuilt(metadata.name); + } + logger.info("Index build of {} complete", metadata.name); + } + + private static String getSSTableNames(Collection<SSTableReader> sstables) + { + return StreamSupport.stream(sstables.spliterator(), false) + .map(SSTableReader::toString) + .collect(Collectors.joining(", ")); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacc75/test/unit/org/apache/cassandra/io/util/BufferedDataOutputStreamTest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacc75/test/unit/org/apache/cassandra/io/util/NIODataInputStreamTest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacc75/test/unit/org/apache/cassandra/io/util/RandomAccessReaderTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/io/util/RandomAccessReaderTest.java index e83c015,0000000..aad5117 mode 100644,000000..100644 --- a/test/unit/org/apache/cassandra/io/util/RandomAccessReaderTest.java +++ b/test/unit/org/apache/cassandra/io/util/RandomAccessReaderTest.java @@@ -1,483 -1,0 +1,503 @@@ ++/* ++ * ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, ++ * software distributed under the License is distributed on an ++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY ++ * KIND, either express or implied. See the License for the ++ * specific language governing permissions and limitations ++ * under the License. ++ * ++ */ +package org.apache.cassandra.io.util; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.nio.MappedByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.channels.FileLock; +import java.nio.channels.ReadableByteChannel; +import java.nio.channels.WritableByteChannel; +import java.nio.charset.Charset; +import java.util.Arrays; +import java.util.Random; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import org.junit.Test; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.junit.Assert.*; + +import org.apache.cassandra.io.compress.BufferType; +import org.apache.cassandra.utils.ByteBufferUtil; + +public class RandomAccessReaderTest +{ + private static final Logger logger = LoggerFactory.getLogger(RandomAccessReaderTest.class); + + private static final class Parameters + { + public final long fileLength; + public final int bufferSize; + + public BufferType bufferType; + public int maxSegmentSize; + public boolean mmappedRegions; + public byte[] expected; + + Parameters(long fileLength, int bufferSize) + { + this.fileLength = fileLength; + this.bufferSize = bufferSize; + this.bufferType = BufferType.OFF_HEAP; + this.maxSegmentSize = MmappedRegions.MAX_SEGMENT_SIZE; + this.mmappedRegions = false; + this.expected = "The quick brown fox jumps over the lazy dog".getBytes(FileUtils.CHARSET); + } + + public Parameters mmappedRegions(boolean mmappedRegions) + { + this.mmappedRegions = mmappedRegions; + return this; + } + + public Parameters bufferType(BufferType bufferType) + { + this.bufferType = bufferType; + return this; + } + + public Parameters maxSegmentSize(int maxSegmentSize) + { + this.maxSegmentSize = maxSegmentSize; + return this; + } + + public Parameters expected(byte[] expected) + { + this.expected = expected; + return this; + } + } + + @Test + public void testBufferedOffHeap() throws IOException + { + testReadFully(new Parameters(8192, 4096).bufferType(BufferType.OFF_HEAP)); + } + + @Test + public void testBufferedOnHeap() throws IOException + { + testReadFully(new Parameters(8192, 4096).bufferType(BufferType.ON_HEAP)); + } + + @Test + public void testBigBufferSize() throws IOException + { + testReadFully(new Parameters(8192, 65536).bufferType(BufferType.ON_HEAP)); + } + + @Test + public void testTinyBufferSize() throws IOException + { + testReadFully(new Parameters(8192, 16).bufferType(BufferType.ON_HEAP)); + } + + @Test + public void testOneSegment() throws IOException + { + testReadFully(new Parameters(8192, 4096).mmappedRegions(true)); + } + + @Test + public void testMultipleSegments() throws IOException + { + testReadFully(new Parameters(8192, 4096).mmappedRegions(true).maxSegmentSize(1024)); + } + + @Test + public void testVeryLarge() throws IOException + { + final long SIZE = 1L << 32; // 2GB + Parameters params = new Parameters(SIZE, 1 << 20); // 1MB + + try(ChannelProxy channel = new ChannelProxy("abc", new FakeFileChannel(SIZE))) + { + RandomAccessReader.Builder builder = new RandomAccessReader.Builder(channel) + .bufferType(params.bufferType) + .bufferSize(params.bufferSize); + + try(RandomAccessReader reader = builder.build()) + { + assertEquals(channel.size(), reader.length()); + assertEquals(channel.size(), reader.bytesRemaining()); + assertEquals(Integer.MAX_VALUE, reader.available()); + + assertEquals(channel.size(), reader.skip(channel.size())); + + assertTrue(reader.isEOF()); + assertEquals(0, reader.bytesRemaining()); + } + } + } + + /** A fake file channel that simply increments the position and doesn't + * actually read anything. We use it to simulate very large files, > 2G. + */ + private static final class FakeFileChannel extends FileChannel + { + private final long size; + private long position; + + FakeFileChannel(long size) + { + this.size = size; + } + + public int read(ByteBuffer dst) + { + int ret = dst.remaining(); + position += ret; + dst.position(dst.limit()); + return ret; + } + + public long read(ByteBuffer[] dsts, int offset, int length) + { + throw new UnsupportedOperationException(); + } + + public int write(ByteBuffer src) + { + throw new UnsupportedOperationException(); + } + + public long write(ByteBuffer[] srcs, int offset, int length) + { + throw new UnsupportedOperationException(); + } + + public long position() + { + return position; + } + + public FileChannel position(long newPosition) + { + position = newPosition; + return this; + } + + public long size() + { + return size; + } + + public FileChannel truncate(long size) + { + throw new UnsupportedOperationException(); + } + + public void force(boolean metaData) + { + throw new UnsupportedOperationException(); + } + + public long transferTo(long position, long count, WritableByteChannel target) + { + throw new UnsupportedOperationException(); + } + + public long transferFrom(ReadableByteChannel src, long position, long count) + { + throw new UnsupportedOperationException(); + } + + public int read(ByteBuffer dst, long position) + { + int ret = dst.remaining(); + this.position = position + ret; + dst.position(dst.limit()); + return ret; + } + + public int write(ByteBuffer src, long position) + { + throw new UnsupportedOperationException(); + } + + public MappedByteBuffer map(MapMode mode, long position, long size) + { + throw new UnsupportedOperationException(); + } + + public FileLock lock(long position, long size, boolean shared) + { + throw new UnsupportedOperationException(); + } + + public FileLock tryLock(long position, long size, boolean shared) + { + throw new UnsupportedOperationException(); + } + + protected void implCloseChannel() + { + + } + } + + private static File writeFile(Parameters params) throws IOException + { + final File f = File.createTempFile("testReadFully", "1"); + f.deleteOnExit(); + + try(SequentialWriter writer = SequentialWriter.open(f)) + { + long numWritten = 0; + while (numWritten < params.fileLength) + { + writer.write(params.expected); + numWritten += params.expected.length; + } + + writer.finish(); + } + + assert f.exists(); + assert f.length() >= params.fileLength; + return f; + } + + private static void testReadFully(Parameters params) throws IOException + { + final File f = writeFile(params); + try(ChannelProxy channel = new ChannelProxy(f)) + { + RandomAccessReader.Builder builder = new RandomAccessReader.Builder(channel) + .bufferType(params.bufferType) + .bufferSize(params.bufferSize); + if (params.mmappedRegions) + builder.regions(MmappedRegions.map(channel, f.length())); + + try(RandomAccessReader reader = builder.build()) + { + assertEquals(f.getAbsolutePath(), reader.getPath()); + assertEquals(f.length(), reader.length()); + assertEquals(f.length(), reader.bytesRemaining()); + assertEquals(Math.min(Integer.MAX_VALUE, f.length()), reader.available()); + + byte[] b = new byte[params.expected.length]; + long numRead = 0; + while (numRead < params.fileLength) + { + reader.readFully(b); + assertTrue(Arrays.equals(params.expected, b)); + numRead += b.length; + } + + assertTrue(reader.isEOF()); + assertEquals(0, reader.bytesRemaining()); + } + + if (builder.regions != null) + assertNull(builder.regions.close(null)); + } + } + + @Test + public void testReadBytes() throws IOException + { + File f = File.createTempFile("testReadBytes", "1"); + final String expected = "The quick brown fox jumps over the lazy dog"; + + try(SequentialWriter writer = SequentialWriter.open(f)) + { + writer.write(expected.getBytes()); + writer.finish(); + } + + assert f.exists(); + + try(ChannelProxy channel = new ChannelProxy(f); + RandomAccessReader reader = new RandomAccessReader.Builder(channel).build()) + { + assertEquals(f.getAbsolutePath(), reader.getPath()); + assertEquals(expected.length(), reader.length()); + + ByteBuffer b = ByteBufferUtil.read(reader, expected.length()); + assertEquals(expected, new String(b.array(), Charset.forName("UTF-8"))); + + assertTrue(reader.isEOF()); + assertEquals(0, reader.bytesRemaining()); + } + } + + @Test + public void testReset() throws IOException + { + File f = File.createTempFile("testMark", "1"); + final String expected = "The quick brown fox jumps over the lazy dog"; + final int numIterations = 10; + + try(SequentialWriter writer = SequentialWriter.open(f)) + { + for (int i = 0; i < numIterations; i++) + writer.write(expected.getBytes()); + writer.finish(); + } + + assert f.exists(); + + try(ChannelProxy channel = new ChannelProxy(f); + RandomAccessReader reader = new RandomAccessReader.Builder(channel).build()) + { + assertEquals(expected.length() * numIterations, reader.length()); + + ByteBuffer b = ByteBufferUtil.read(reader, expected.length()); + assertEquals(expected, new String(b.array(), Charset.forName("UTF-8"))); + + assertFalse(reader.isEOF()); + assertEquals((numIterations - 1) * expected.length(), reader.bytesRemaining()); + + DataPosition mark = reader.mark(); + assertEquals(0, reader.bytesPastMark()); + assertEquals(0, reader.bytesPastMark(mark)); + + for (int i = 0; i < (numIterations - 1); i++) + { + b = ByteBufferUtil.read(reader, expected.length()); + assertEquals(expected, new String(b.array(), Charset.forName("UTF-8"))); + } + assertTrue(reader.isEOF()); + assertEquals(expected.length() * (numIterations - 1), reader.bytesPastMark()); + assertEquals(expected.length() * (numIterations - 1), reader.bytesPastMark(mark)); + + reader.reset(mark); + assertEquals(0, reader.bytesPastMark()); + assertEquals(0, reader.bytesPastMark(mark)); + assertFalse(reader.isEOF()); + for (int i = 0; i < (numIterations - 1); i++) + { + b = ByteBufferUtil.read(reader, expected.length()); + assertEquals(expected, new String(b.array(), Charset.forName("UTF-8"))); + } + + reader.reset(); + assertEquals(0, reader.bytesPastMark()); + assertEquals(0, reader.bytesPastMark(mark)); + assertFalse(reader.isEOF()); + for (int i = 0; i < (numIterations - 1); i++) + { + b = ByteBufferUtil.read(reader, expected.length()); + assertEquals(expected, new String(b.array(), Charset.forName("UTF-8"))); + } + + assertTrue(reader.isEOF()); + } + } + + @Test + public void testSeekSingleThread() throws IOException, InterruptedException + { + testSeek(1); + } + + @Test + public void testSeekMultipleThreads() throws IOException, InterruptedException + { + testSeek(10); + } + + private static void testSeek(int numThreads) throws IOException, InterruptedException + { + final File f = File.createTempFile("testMark", "1"); + final byte[] expected = new byte[1 << 16]; + + long seed = System.nanoTime(); + //seed = 365238103404423L; + logger.info("Seed {}", seed); + Random r = new Random(seed); + r.nextBytes(expected); + + try(SequentialWriter writer = SequentialWriter.open(f)) + { + writer.write(expected); + writer.finish(); + } + + assert f.exists(); + + try(final ChannelProxy channel = new ChannelProxy(f)) + { + final Runnable worker = () -> + { + try(RandomAccessReader reader = new RandomAccessReader.Builder(channel).build()) + { + assertEquals(expected.length, reader.length()); + + ByteBuffer b = ByteBufferUtil.read(reader, expected.length); + assertTrue(Arrays.equals(expected, b.array())); + assertTrue(reader.isEOF()); + assertEquals(0, reader.bytesRemaining()); + + reader.seek(0); + b = ByteBufferUtil.read(reader, expected.length); + assertTrue(Arrays.equals(expected, b.array())); + assertTrue(reader.isEOF()); + assertEquals(0, reader.bytesRemaining()); + + for (int i = 0; i < 10; i++) + { + int pos = r.nextInt(expected.length); + reader.seek(pos); + assertEquals(pos, reader.getPosition()); + + ByteBuffer buf = ByteBuffer.wrap(expected, pos, expected.length - pos) + .order(ByteOrder.BIG_ENDIAN); + + while (reader.bytesRemaining() > 4) + assertEquals(buf.getInt(), reader.readInt()); + } + + reader.close(); + } + catch (Exception ex) + { + ex.printStackTrace(); + fail(ex.getMessage()); + } + }; + + if (numThreads == 1) + { + worker.run(); + } + else + { + ExecutorService executor = Executors.newFixedThreadPool(numThreads); + for (int i = 0; i < numThreads; i++) + executor.submit(worker); + + executor.shutdown(); + executor.awaitTermination(1, TimeUnit.MINUTES); + } + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacc75/test/unit/org/apache/cassandra/service/RMIServerSocketFactoryImplTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/service/RMIServerSocketFactoryImplTest.java index 3459ec3,0000000..393dfe1 mode 100644,000000..100644 --- a/test/unit/org/apache/cassandra/service/RMIServerSocketFactoryImplTest.java +++ b/test/unit/org/apache/cassandra/service/RMIServerSocketFactoryImplTest.java @@@ -1,24 -1,0 +1,44 @@@ ++/* ++ * ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, ++ * software distributed under the License is distributed on an ++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY ++ * KIND, either express or implied. See the License for the ++ * specific language governing permissions and limitations ++ * under the License. ++ * ++ */ +package org.apache.cassandra.service; + +import java.io.IOException; +import java.net.ServerSocket; +import java.rmi.server.RMIServerSocketFactory; + +import org.junit.Test; + +import org.apache.cassandra.utils.RMIServerSocketFactoryImpl; + +import static org.junit.Assert.assertTrue; + + +public class RMIServerSocketFactoryImplTest +{ + @Test + public void testReusableAddrSocket() throws IOException + { + RMIServerSocketFactory serverFactory = new RMIServerSocketFactoryImpl(); + ServerSocket socket = serverFactory.createServerSocket(7199); + assertTrue(socket.getReuseAddress()); + } + +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacc75/test/unit/org/apache/cassandra/utils/TopKSamplerTest.java ----------------------------------------------------------------------
