http://git-wip-us.apache.org/repos/asf/cassandra/blob/7b430eee/test/unit/org/apache/cassandra/cql3/validation/entities/SecondaryIndexTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/cql3/validation/entities/SecondaryIndexTest.java index d30937f,b8f6b9f..e8bf1fd --- a/test/unit/org/apache/cassandra/cql3/validation/entities/SecondaryIndexTest.java +++ b/test/unit/org/apache/cassandra/cql3/validation/entities/SecondaryIndexTest.java @@@ -17,38 -17,35 +17,44 @@@ */ package org.apache.cassandra.cql3.validation.entities; - import java.nio.ByteBuffer; - import java.util.*; - - import org.apache.cassandra.db.DeletionTime; - import org.apache.cassandra.utils.Pair; - import org.apache.commons.lang3.StringUtils; +import org.junit.Before; +import org.junit.Test; ++import static org.apache.cassandra.Util.throwAssert; ++import static org.junit.Assert.assertEquals; ++import static org.junit.Assert.assertFalse; ++import static org.junit.Assert.assertNotNull; + import static org.junit.Assert.assertTrue; + import static org.junit.Assert.fail; + + import java.nio.ByteBuffer; + import java.util.HashMap; -import java.util.List; + import java.util.Locale; + import java.util.Map; -import java.util.Set; + import java.util.UUID; ++import java.util.concurrent.Callable; + import java.util.concurrent.CountDownLatch; +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.ColumnDefinition; +import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.cql3.CQLTester; -import org.apache.cassandra.db.ColumnFamily; +import org.apache.cassandra.cql3.ColumnIdentifier; +import org.apache.cassandra.cql3.statements.IndexTarget; import org.apache.cassandra.db.ColumnFamilyStore; -import org.apache.cassandra.db.DecoratedKey; -import org.apache.cassandra.db.IndexExpression; -import org.apache.cassandra.db.composites.CellName; -import org.apache.cassandra.db.index.IndexNotAvailableException; -import org.apache.cassandra.db.index.PerRowSecondaryIndex; -import org.apache.cassandra.db.index.SecondaryIndexSearcher; -import org.apache.cassandra.db.index.composites.CompositesSearcher; ++import org.apache.cassandra.db.DeletionTime; +import org.apache.cassandra.db.marshal.AbstractType; +import org.apache.cassandra.db.rows.Cell; +import org.apache.cassandra.db.rows.Row; import org.apache.cassandra.exceptions.ConfigurationException; -import org.apache.cassandra.exceptions.InvalidRequestException; import org.apache.cassandra.exceptions.SyntaxException; -import org.apache.cassandra.utils.FBUtilities; -import org.apache.cassandra.utils.concurrent.OpOrder.Group; ++import org.apache.cassandra.index.IndexNotAvailableException; +import org.apache.cassandra.index.SecondaryIndexManager; +import org.apache.cassandra.index.StubIndex; ++import org.apache.cassandra.index.internal.CustomCassandraIndex; +import org.apache.cassandra.schema.IndexMetadata; +import org.apache.cassandra.utils.ByteBufferUtil; - - import static org.apache.cassandra.Util.throwAssert; - import static org.junit.Assert.assertEquals; - import static org.junit.Assert.assertFalse; - import static org.junit.Assert.assertNotNull; - import static org.junit.Assert.assertTrue; - import static org.junit.Assert.fail; ++import org.apache.cassandra.utils.Pair; + import org.apache.commons.lang3.StringUtils; -import org.junit.Test; public class SecondaryIndexTest extends CQLTester { @@@ -668,16 -575,9 +674,16 @@@ { createTable("CREATE TABLE %s(a int, b frozen<map<int, blob>>, PRIMARY KEY (a))"); createIndex("CREATE INDEX ON %s(full(b))"); - Map<Integer, ByteBuffer> map = new HashMap(); + Map<Integer, ByteBuffer> map = new HashMap<>(); map.put(0, ByteBuffer.allocate(1024 * 65)); failInsert("INSERT INTO %s (a, b) VALUES (0, ?)", map); + failInsert("INSERT INTO %s (a, b) VALUES (0, ?) IF NOT EXISTS", map); + failInsert("BEGIN BATCH\n" + + "INSERT INTO %s (a, b) VALUES (0, ?);\n" + + "APPLY BATCH", map); + failInsert("BEGIN BATCH\n" + + "INSERT INTO %s (a, b) VALUES (0, ?) IF NOT EXISTS;\n" + + "APPLY BATCH", map); } public void failInsert(String insertCQL, Object...args) throws Throwable @@@ -755,180 -655,134 +761,234 @@@ } @Test + public void testMultipleIndexesOnOneColumn() throws Throwable + { + String indexClassName = StubIndex.class.getName(); + createTable("CREATE TABLE %s (a int, b int, c int, PRIMARY KEY ((a), b))"); + // uses different options otherwise the two indexes are considered duplicates + createIndex(String.format("CREATE CUSTOM INDEX c_idx_1 ON %%s(c) USING '%s' WITH OPTIONS = {'foo':'a'}", indexClassName)); + createIndex(String.format("CREATE CUSTOM INDEX c_idx_2 ON %%s(c) USING '%s' WITH OPTIONS = {'foo':'b'}", indexClassName)); + + ColumnFamilyStore cfs = getCurrentColumnFamilyStore(); + CFMetaData cfm = cfs.metadata; + StubIndex index1 = (StubIndex)cfs.indexManager.getIndex(cfm.getIndexes() + .get("c_idx_1") + .orElseThrow(throwAssert("index not found"))); + StubIndex index2 = (StubIndex)cfs.indexManager.getIndex(cfm.getIndexes() + .get("c_idx_2") + .orElseThrow(throwAssert("index not found"))); + Object[] row1a = row(0, 0, 0); + Object[] row1b = row(0, 0, 1); + Object[] row2 = row(2, 2, 2); + execute("INSERT INTO %s (a, b, c) VALUES (?, ?, ?)", row1a); + execute("INSERT INTO %s (a, b, c) VALUES (?, ?, ?)", row1b); + execute("INSERT INTO %s (a, b, c) VALUES (?, ?, ?)", row2); + + assertEquals(2, index1.rowsInserted.size()); + assertColumnValue(0, "c", index1.rowsInserted.get(0), cfm); + assertColumnValue(2, "c", index1.rowsInserted.get(1), cfm); + + assertEquals(2, index2.rowsInserted.size()); + assertColumnValue(0, "c", index2.rowsInserted.get(0), cfm); + assertColumnValue(2, "c", index2.rowsInserted.get(1), cfm); + + assertEquals(1, index1.rowsUpdated.size()); + assertColumnValue(0, "c", index1.rowsUpdated.get(0).left, cfm); + assertColumnValue(1, "c", index1.rowsUpdated.get(0).right, cfm); + + assertEquals(1, index2.rowsUpdated.size()); + assertColumnValue(0, "c", index2.rowsUpdated.get(0).left, cfm); + assertColumnValue(1, "c", index2.rowsUpdated.get(0).right, cfm); + } + + @Test + public void testDeletions() throws Throwable + { + // Test for bugs like CASSANDRA-10694. These may not be readily visible with the built-in secondary index + // implementation because of the stale entry handling. + + String indexClassName = StubIndex.class.getName(); + createTable("CREATE TABLE %s (a int, b int, c int, PRIMARY KEY ((a), b))"); + createIndex(String.format("CREATE CUSTOM INDEX c_idx ON %%s(c) USING '%s'", indexClassName)); + + ColumnFamilyStore cfs = getCurrentColumnFamilyStore(); + CFMetaData cfm = cfs.metadata; + StubIndex index1 = (StubIndex) cfs.indexManager.getIndex(cfm.getIndexes() + .get("c_idx") + .orElseThrow(throwAssert("index not found"))); + + execute("INSERT INTO %s (a, b, c) VALUES (?, ?, ?) USING TIMESTAMP 1", 0, 0, 0); + assertEquals(1, index1.rowsInserted.size()); + + execute("DELETE FROM %s USING TIMESTAMP 2 WHERE a = ? AND b = ?", 0, 0); + assertEquals(1, index1.rowsUpdated.size()); + Pair<Row, Row> update = index1.rowsUpdated.get(0); + Row existingRow = update.left; + Row newRow = update.right; + + // check the existing row from the update call + assertTrue(existingRow.deletion().isLive()); + assertEquals(DeletionTime.LIVE, existingRow.deletion().time()); + assertEquals(1L, existingRow.primaryKeyLivenessInfo().timestamp()); + + // check the new row from the update call + assertFalse(newRow.deletion().isLive()); + assertEquals(2L, newRow.deletion().time().markedForDeleteAt()); + assertFalse(newRow.cells().iterator().hasNext()); + + // delete the same row again + execute("DELETE FROM %s USING TIMESTAMP 3 WHERE a = ? AND b = ?", 0, 0); + assertEquals(2, index1.rowsUpdated.size()); + update = index1.rowsUpdated.get(1); + existingRow = update.left; + newRow = update.right; + + // check the new row from the update call + assertFalse(existingRow.deletion().isLive()); + assertEquals(2L, existingRow.deletion().time().markedForDeleteAt()); + assertFalse(existingRow.cells().iterator().hasNext()); + + // check the new row from the update call + assertFalse(newRow.deletion().isLive()); + assertEquals(3L, newRow.deletion().time().markedForDeleteAt()); + assertFalse(newRow.cells().iterator().hasNext()); + } + + @Test + public void testUpdatesToMemtableData() throws Throwable + { + // verify the contract specified by Index.Indexer::updateRow(oldRowData, newRowData), + // when a row in the memtable is updated, the indexer should be informed of: + // * new columns + // * removed columns + // * columns whose value, timestamp or ttl have been modified. + // Any columns which are unchanged by the update are not passed to the Indexer + // Note that for simplicity this test resets the index between each scenario + createTable("CREATE TABLE %s (k int, c int, v1 int, v2 int, PRIMARY KEY (k,c))"); + createIndex(String.format("CREATE CUSTOM INDEX test_index ON %%s() USING '%s'", StubIndex.class.getName())); + execute("INSERT INTO %s (k, c, v1, v2) VALUES (0, 0, 0, 0) USING TIMESTAMP 0"); + + ColumnDefinition v1 = getCurrentColumnFamilyStore().metadata.getColumnDefinition(new ColumnIdentifier("v1", true)); + ColumnDefinition v2 = getCurrentColumnFamilyStore().metadata.getColumnDefinition(new ColumnIdentifier("v2", true)); + + StubIndex index = (StubIndex)getCurrentColumnFamilyStore().indexManager.getIndexByName("test_index"); + assertEquals(1, index.rowsInserted.size()); + + // Overwrite a single value, leaving the other untouched + execute("UPDATE %s USING TIMESTAMP 1 SET v1=1 WHERE k=0 AND c=0"); + assertEquals(1, index.rowsUpdated.size()); + Row oldRow = index.rowsUpdated.get(0).left; + assertEquals(1, oldRow.size()); + validateCell(oldRow.getCell(v1), v1, ByteBufferUtil.bytes(0), 0); + Row newRow = index.rowsUpdated.get(0).right; + assertEquals(1, newRow.size()); + validateCell(newRow.getCell(v1), v1, ByteBufferUtil.bytes(1), 1); + index.reset(); + + // Overwrite both values + execute("UPDATE %s USING TIMESTAMP 2 SET v1=2, v2=2 WHERE k=0 AND c=0"); + assertEquals(1, index.rowsUpdated.size()); + oldRow = index.rowsUpdated.get(0).left; + assertEquals(2, oldRow.size()); + validateCell(oldRow.getCell(v1), v1, ByteBufferUtil.bytes(1), 1); + validateCell(oldRow.getCell(v2), v2, ByteBufferUtil.bytes(0), 0); + newRow = index.rowsUpdated.get(0).right; + assertEquals(2, newRow.size()); + validateCell(newRow.getCell(v1), v1, ByteBufferUtil.bytes(2), 2); + validateCell(newRow.getCell(v2), v2, ByteBufferUtil.bytes(2), 2); + index.reset(); + + // Delete one value + execute("DELETE v1 FROM %s USING TIMESTAMP 3 WHERE k=0 AND c=0"); + assertEquals(1, index.rowsUpdated.size()); + oldRow = index.rowsUpdated.get(0).left; + assertEquals(1, oldRow.size()); + validateCell(oldRow.getCell(v1), v1, ByteBufferUtil.bytes(2), 2); + newRow = index.rowsUpdated.get(0).right; + assertEquals(1, newRow.size()); + Cell newCell = newRow.getCell(v1); + assertTrue(newCell.isTombstone()); + assertEquals(3, newCell.timestamp()); + index.reset(); + + // Modify the liveness of the primary key, the delta rows should contain + // no cell data as only the pk was altered, but it should illustrate the + // change to the liveness info + execute("INSERT INTO %s(k, c) VALUES (0, 0) USING TIMESTAMP 4"); + assertEquals(1, index.rowsUpdated.size()); + oldRow = index.rowsUpdated.get(0).left; + assertEquals(0, oldRow.size()); + assertEquals(0, oldRow.primaryKeyLivenessInfo().timestamp()); + newRow = index.rowsUpdated.get(0).right; + assertEquals(0, newRow.size()); + assertEquals(4, newRow.primaryKeyLivenessInfo().timestamp()); + } + ++ @Test + public void testIndexQueriesWithIndexNotReady() throws Throwable + { + createTable("CREATE TABLE %s (pk int, ck int, value int, PRIMARY KEY (pk, ck))"); + + for (int i = 0; i < 10; i++) + for (int j = 0; j < 10; j++) + execute("INSERT INTO %s (pk, ck, value) VALUES (?, ?, ?)", i, j, i + j); + - createIndex("CREATE CUSTOM INDEX testIndex ON %s (value) USING '" + IndexBlockingOnInitialization.class.getName() - + "'"); ++ createIndex("CREATE CUSTOM INDEX testIndex ON %s (value) USING '" + IndexBlockingOnInitialization.class.getName() + "'"); + try + { + execute("SELECT value FROM %s WHERE value = 2"); + fail(); + } + catch (IndexNotAvailableException e) + { + assertTrue(true); + } + finally + { + execute("DROP index " + KEYSPACE + ".testIndex"); + } + } + - /** - * Custom index used to test the behavior of the system when the index is not ready. - * As Custom indices cannot by <code>PerColumnSecondaryIndex</code> we use a <code>PerRowSecondaryIndex</code> - * to avoid the check but return a <code>CompositesSearcher</code>. - */ - public static class IndexBlockingOnInitialization extends PerRowSecondaryIndex + private void validateCell(Cell cell, ColumnDefinition def, ByteBuffer val, long timestamp) { - private volatile CountDownLatch latch = new CountDownLatch(1); - - @Override - public void index(ByteBuffer rowKey, ColumnFamily cf) - { - try - { - latch.await(); - } - catch (InterruptedException e) - { - Thread.interrupted(); - } - } - - @Override - public void delete(DecoratedKey key, Group opGroup) - { - } - - @Override - public void init() - { - } + assertNotNull(cell); + assertEquals(0, def.type.compare(cell.value(), val)); + assertEquals(timestamp, cell.timestamp()); + } - @Override - public void reload() - { - } + private static void assertColumnValue(int expected, String name, Row row, CFMetaData cfm) + { + ColumnDefinition col = cfm.getColumnDefinition(new ColumnIdentifier(name, true)); + AbstractType<?> type = col.type; + assertEquals(expected, type.compose(row.getCell(col).value())); + } + - @Override - public void validateOptions() throws ConfigurationException - { - } ++ /** ++ * <code>CassandraIndex</code> that blocks during the initialization. ++ */ ++ public static class IndexBlockingOnInitialization extends CustomCassandraIndex ++ { ++ private final CountDownLatch latch = new CountDownLatch(1); + - @Override - public String getIndexName() ++ public IndexBlockingOnInitialization(ColumnFamilyStore baseCfs, IndexMetadata indexDef) + { - return "testIndex"; ++ super(baseCfs, indexDef); + } + + @Override - protected SecondaryIndexSearcher createSecondaryIndexSearcher(Set<ByteBuffer> columns) ++ public Callable<?> getInitializationTask() + { - return new CompositesSearcher(baseCfs.indexManager, columns) - { - @Override - public boolean canHandleIndexClause(List<IndexExpression> clause) - { - return true; - } - - @Override - public void validate(IndexExpression indexExpression) throws InvalidRequestException - { - } ++ return () -> { ++ latch.await(); ++ return null; + }; + } + + @Override - public void forceBlockingFlush() - { - } - - @Override - public ColumnFamilyStore getIndexCfs() - { - return baseCfs; - } - - @Override - public void removeIndex(ByteBuffer columnName) ++ public Callable<?> getInvalidateTask() + { + latch.countDown(); - } - - @Override - public void invalidate() - { - } - - @Override - public void truncateBlocking(long truncatedAt) - { - } - - @Override - public boolean indexes(CellName name) - { - return false; - } - - @Override - public long estimateResultRows() - { - return 0; ++ return super.getInvalidateTask(); + } + } }
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7b430eee/test/unit/org/apache/cassandra/index/internal/CustomCassandraIndex.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/index/internal/CustomCassandraIndex.java index 0957f74,0000000..3bce683 mode 100644,000000..100644 --- a/test/unit/org/apache/cassandra/index/internal/CustomCassandraIndex.java +++ b/test/unit/org/apache/cassandra/index/internal/CustomCassandraIndex.java @@@ -1,642 -1,0 +1,642 @@@ +package org.apache.cassandra.index.internal; + +import java.nio.ByteBuffer; +import java.util.*; +import java.util.concurrent.Callable; +import java.util.concurrent.Future; +import java.util.function.BiFunction; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.ColumnDefinition; +import org.apache.cassandra.cql3.Operator; +import org.apache.cassandra.cql3.statements.IndexTarget; +import org.apache.cassandra.db.*; +import org.apache.cassandra.db.compaction.CompactionManager; +import org.apache.cassandra.db.filter.RowFilter; +import org.apache.cassandra.db.lifecycle.SSTableSet; +import org.apache.cassandra.db.lifecycle.View; +import org.apache.cassandra.db.marshal.AbstractType; +import org.apache.cassandra.db.partitions.PartitionIterator; +import org.apache.cassandra.db.partitions.PartitionUpdate; +import org.apache.cassandra.db.rows.*; +import org.apache.cassandra.exceptions.InvalidRequestException; +import org.apache.cassandra.index.Index; +import org.apache.cassandra.index.IndexRegistry; +import org.apache.cassandra.index.SecondaryIndexBuilder; +import org.apache.cassandra.index.transactions.IndexTransaction; +import org.apache.cassandra.index.transactions.UpdateTransaction; +import org.apache.cassandra.io.sstable.ReducingKeyIterator; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.schema.IndexMetadata; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.Pair; +import org.apache.cassandra.utils.concurrent.OpOrder; +import org.apache.cassandra.utils.concurrent.Refs; + +import static org.apache.cassandra.index.internal.CassandraIndex.getFunctions; +import static org.apache.cassandra.index.internal.CassandraIndex.indexCfsMetadata; +import static org.apache.cassandra.index.internal.CassandraIndex.parseTarget; + +/** + * Clone of KeysIndex used in CassandraIndexTest#testCustomIndexWithCFS to verify + * behaviour of flushing CFS backed CUSTOM indexes + */ +public class CustomCassandraIndex implements Index +{ + private static final Logger logger = LoggerFactory.getLogger(CassandraIndex.class); + + public final ColumnFamilyStore baseCfs; + protected IndexMetadata metadata; + protected ColumnFamilyStore indexCfs; + protected ColumnDefinition indexedColumn; + protected CassandraIndexFunctions functions; + + public CustomCassandraIndex(ColumnFamilyStore baseCfs, IndexMetadata indexDef) + { + this.baseCfs = baseCfs; + setMetadata(indexDef); + } + + /** + * Returns true if an index of this type can support search predicates of the form [column] OPERATOR [value] + * @param indexedColumn + * @param operator + * @return + */ + protected boolean supportsOperator(ColumnDefinition indexedColumn, Operator operator) + { + return operator.equals(Operator.EQ); + } + + public ColumnDefinition getIndexedColumn() + { + return indexedColumn; + } + + public ClusteringComparator getIndexComparator() + { + return indexCfs.metadata.comparator; + } + + public ColumnFamilyStore getIndexCfs() + { + return indexCfs; + } + + public void register(IndexRegistry registry) + { + registry.registerIndex(this); + } + + public Callable<?> getInitializationTask() + { + // if we're just linking in the index on an already-built index post-restart - // we've nothing to do. Otherwise, submit for building via SecondaryIndexBuilder - return isBuilt() ? null : getBuildIndexTask(); ++ // or if the table is empty we've nothing to do. Otherwise, submit for building via SecondaryIndexBuilder ++ return isBuilt() || baseCfs.isEmpty() ? null : getBuildIndexTask(); + } + + public IndexMetadata getIndexMetadata() + { + return metadata; + } + + public Optional<ColumnFamilyStore> getBackingTable() + { + return indexCfs == null ? Optional.empty() : Optional.of(indexCfs); + } + + public Callable<Void> getBlockingFlushTask() + { + return () -> { + indexCfs.forceBlockingFlush(); + return null; + }; + } + + public Callable<?> getInvalidateTask() + { + return () -> { + invalidate(); + return null; + }; + } + + public Callable<?> getMetadataReloadTask(IndexMetadata indexDef) + { + setMetadata(indexDef); + return () -> { + indexCfs.metadata.reloadIndexMetadataProperties(baseCfs.metadata); + indexCfs.reload(); + return null; + }; + } + + private void setMetadata(IndexMetadata indexDef) + { + metadata = indexDef; + Pair<ColumnDefinition, IndexTarget.Type> target = parseTarget(baseCfs.metadata, indexDef); + functions = getFunctions(indexDef, target); + CFMetaData cfm = indexCfsMetadata(baseCfs.metadata, indexDef); + indexCfs = ColumnFamilyStore.createColumnFamilyStore(baseCfs.keyspace, + cfm.cfName, + cfm, + baseCfs.getTracker().loadsstables); + indexedColumn = target.left; + } + + public Callable<?> getTruncateTask(final long truncatedAt) + { + return () -> { + indexCfs.discardSSTables(truncatedAt); + return null; + }; + } + + public boolean shouldBuildBlocking() + { + return true; + } + + public boolean indexes(PartitionColumns columns) + { + // if we have indexes on the partition key or clustering columns, return true + return isPrimaryKeyIndex() || columns.contains(indexedColumn); + } + + public boolean dependsOn(ColumnDefinition column) + { + return column.equals(indexedColumn); + } + + public boolean supportsExpression(ColumnDefinition column, Operator operator) + { + return indexedColumn.name.equals(column.name) + && supportsOperator(indexedColumn, operator); + } + + public AbstractType<?> customExpressionValueType() + { + return null; + } + + private boolean supportsExpression(RowFilter.Expression expression) + { + return supportsExpression(expression.column(), expression.operator()); + } + + public long getEstimatedResultRows() + { + return indexCfs.getMeanColumns(); + } + + /** + * No post processing of query results, just return them unchanged + */ + public BiFunction<PartitionIterator, ReadCommand, PartitionIterator> postProcessorFor(ReadCommand command) + { + return (partitionIterator, readCommand) -> partitionIterator; + } + + public RowFilter getPostIndexQueryFilter(RowFilter filter) + { + return getTargetExpression(filter.getExpressions()).map(filter::without) + .orElse(filter); + } + + private Optional<RowFilter.Expression> getTargetExpression(List<RowFilter.Expression> expressions) + { + return expressions.stream().filter(this::supportsExpression).findFirst(); + } + + public Index.Searcher searcherFor(ReadCommand command) + { + return null; + } + + public void validate(PartitionUpdate update) throws InvalidRequestException + { + switch (indexedColumn.kind) + { + case PARTITION_KEY: + validatePartitionKey(update.partitionKey()); + break; + case CLUSTERING: + validateClusterings(update); + break; + case REGULAR: + validateRows(update); + break; + case STATIC: + validateRows(Collections.singleton(update.staticRow())); + break; + } + } + + protected CBuilder buildIndexClusteringPrefix(ByteBuffer partitionKey, + ClusteringPrefix prefix, + CellPath path) + { + CBuilder builder = CBuilder.create(getIndexComparator()); + builder.add(partitionKey); + return builder; + } + + protected ByteBuffer getIndexedValue(ByteBuffer partitionKey, + Clustering clustering, + CellPath path, ByteBuffer cellValue) + { + return cellValue; + } + + public IndexEntry decodeEntry(DecoratedKey indexedValue, Row indexEntry) + { + throw new UnsupportedOperationException("KEYS indexes do not use a specialized index entry format"); + } + + public boolean isStale(Row row, ByteBuffer indexValue, int nowInSec) + { + if (row == null) + return true; + + Cell cell = row.getCell(indexedColumn); + + return (cell == null + || !cell.isLive(nowInSec) + || indexedColumn.type.compare(indexValue, cell.value()) != 0); + } + + public Indexer indexerFor(final DecoratedKey key, + final int nowInSec, + final OpOrder.Group opGroup, + final IndexTransaction.Type transactionType) + { + return new Indexer() + { + public void begin() + { + } + + public void partitionDelete(DeletionTime deletionTime) + { + } + + public void rangeTombstone(RangeTombstone tombstone) + { + } + + public void insertRow(Row row) + { + if (isPrimaryKeyIndex()) + { + indexPrimaryKey(row.clustering(), + getPrimaryKeyIndexLiveness(row), + row.deletion()); + } + else + { + if (indexedColumn.isComplex()) + indexCells(row.clustering(), row.getComplexColumnData(indexedColumn)); + else + indexCell(row.clustering(), row.getCell(indexedColumn)); + } + } + + public void removeRow(Row row) + { + if (isPrimaryKeyIndex()) + indexPrimaryKey(row.clustering(), row.primaryKeyLivenessInfo(), row.deletion()); + + if (indexedColumn.isComplex()) + removeCells(row.clustering(), row.getComplexColumnData(indexedColumn)); + else + removeCell(row.clustering(), row.getCell(indexedColumn)); + } + + + public void updateRow(Row oldRow, Row newRow) + { + if (isPrimaryKeyIndex()) + indexPrimaryKey(newRow.clustering(), + newRow.primaryKeyLivenessInfo(), + newRow.deletion()); + + if (indexedColumn.isComplex()) + { + indexCells(newRow.clustering(), newRow.getComplexColumnData(indexedColumn)); + removeCells(oldRow.clustering(), oldRow.getComplexColumnData(indexedColumn)); + } + else + { + indexCell(newRow.clustering(), newRow.getCell(indexedColumn)); + removeCell(oldRow.clustering(), oldRow.getCell(indexedColumn)); + } + } + + public void finish() + { + } + + private void indexCells(Clustering clustering, Iterable<Cell> cells) + { + if (cells == null) + return; + + for (Cell cell : cells) + indexCell(clustering, cell); + } + + private void indexCell(Clustering clustering, Cell cell) + { + if (cell == null || !cell.isLive(nowInSec)) + return; + + insert(key.getKey(), + clustering, + cell, + LivenessInfo.create(cell.timestamp(), cell.ttl(), cell.localDeletionTime()), + opGroup); + } + + private void removeCells(Clustering clustering, Iterable<Cell> cells) + { + if (cells == null) + return; + + for (Cell cell : cells) + removeCell(clustering, cell); + } + + private void removeCell(Clustering clustering, Cell cell) + { + if (cell == null || !cell.isLive(nowInSec)) + return; + + delete(key.getKey(), clustering, cell, opGroup, nowInSec); + } + + private void indexPrimaryKey(final Clustering clustering, + final LivenessInfo liveness, + final Row.Deletion deletion) + { + if (liveness.timestamp() != LivenessInfo.NO_TIMESTAMP) + insert(key.getKey(), clustering, null, liveness, opGroup); + + if (!deletion.isLive()) + delete(key.getKey(), clustering, deletion.time(), opGroup); + } + + private LivenessInfo getPrimaryKeyIndexLiveness(Row row) + { + long timestamp = row.primaryKeyLivenessInfo().timestamp(); + int ttl = row.primaryKeyLivenessInfo().ttl(); + for (Cell cell : row.cells()) + { + long cellTimestamp = cell.timestamp(); + if (cell.isLive(nowInSec)) + { + if (cellTimestamp > timestamp) + { + timestamp = cellTimestamp; + ttl = cell.ttl(); + } + } + } + return LivenessInfo.create(baseCfs.metadata, timestamp, ttl, nowInSec); + } + }; + } + + /** + * Specific to internal indexes, this is called by a + * searcher when it encounters a stale entry in the index + * @param indexKey the partition key in the index table + * @param indexClustering the clustering in the index table + * @param deletion deletion timestamp etc + * @param opGroup the operation under which to perform the deletion + */ + public void deleteStaleEntry(DecoratedKey indexKey, + Clustering indexClustering, + DeletionTime deletion, + OpOrder.Group opGroup) + { + doDelete(indexKey, indexClustering, deletion, opGroup); + logger.debug("Removed index entry for stale value {}", indexKey); + } + + /** + * Called when adding a new entry to the index + */ + private void insert(ByteBuffer rowKey, + Clustering clustering, + Cell cell, + LivenessInfo info, + OpOrder.Group opGroup) + { + DecoratedKey valueKey = getIndexKeyFor(getIndexedValue(rowKey, + clustering, + cell)); + Row row = BTreeRow.noCellLiveRow(buildIndexClustering(rowKey, clustering, cell), info); + PartitionUpdate upd = partitionUpdate(valueKey, row); + indexCfs.apply(upd, UpdateTransaction.NO_OP, opGroup, null); + logger.debug("Inserted entry into index for value {}", valueKey); + } + + /** + * Called when deleting entries on non-primary key columns + */ + private void delete(ByteBuffer rowKey, + Clustering clustering, + Cell cell, + OpOrder.Group opGroup, + int nowInSec) + { + DecoratedKey valueKey = getIndexKeyFor(getIndexedValue(rowKey, + clustering, + cell)); + doDelete(valueKey, + buildIndexClustering(rowKey, clustering, cell), + new DeletionTime(cell.timestamp(), nowInSec), + opGroup); + } + + /** + * Called when deleting entries from indexes on primary key columns + */ + private void delete(ByteBuffer rowKey, + Clustering clustering, + DeletionTime deletion, + OpOrder.Group opGroup) + { + DecoratedKey valueKey = getIndexKeyFor(getIndexedValue(rowKey, + clustering, + null)); + doDelete(valueKey, + buildIndexClustering(rowKey, clustering, null), + deletion, + opGroup); + } + + private void doDelete(DecoratedKey indexKey, + Clustering indexClustering, + DeletionTime deletion, + OpOrder.Group opGroup) + { + Row row = BTreeRow.emptyDeletedRow(indexClustering, Row.Deletion.regular(deletion)); + PartitionUpdate upd = partitionUpdate(indexKey, row); + indexCfs.apply(upd, UpdateTransaction.NO_OP, opGroup, null); + logger.debug("Removed index entry for value {}", indexKey); + } + + private void validatePartitionKey(DecoratedKey partitionKey) throws InvalidRequestException + { + assert indexedColumn.isPartitionKey(); + validateIndexedValue(getIndexedValue(partitionKey.getKey(), null, null )); + } + + private void validateClusterings(PartitionUpdate update) throws InvalidRequestException + { + assert indexedColumn.isClusteringColumn(); + for (Row row : update) + validateIndexedValue(getIndexedValue(null, row.clustering(), null)); + } + + private void validateRows(Iterable<Row> rows) + { + assert !indexedColumn.isPrimaryKeyColumn(); + for (Row row : rows) + { + if (indexedColumn.isComplex()) + { + ComplexColumnData data = row.getComplexColumnData(indexedColumn); + if (data != null) + { + for (Cell cell : data) + { + validateIndexedValue(getIndexedValue(null, null, cell.path(), cell.value())); + } + } + } + else + { + validateIndexedValue(getIndexedValue(null, null, row.getCell(indexedColumn))); + } + } + } + + private void validateIndexedValue(ByteBuffer value) + { + if (value != null && value.remaining() >= FBUtilities.MAX_UNSIGNED_SHORT) + throw new InvalidRequestException(String.format( + "Cannot index value of size %d for index %s on %s.%s(%s) (maximum allowed size=%d)", + value.remaining(), + metadata.name, + baseCfs.metadata.ksName, + baseCfs.metadata.cfName, + indexedColumn.name.toString(), + FBUtilities.MAX_UNSIGNED_SHORT)); + } + + private ByteBuffer getIndexedValue(ByteBuffer rowKey, + Clustering clustering, + Cell cell) + { + return getIndexedValue(rowKey, + clustering, + cell == null ? null : cell.path(), + cell == null ? null : cell.value() + ); + } + + private Clustering buildIndexClustering(ByteBuffer rowKey, + Clustering clustering, + Cell cell) + { + return buildIndexClusteringPrefix(rowKey, + clustering, + cell == null ? null : cell.path()).build(); + } + + private DecoratedKey getIndexKeyFor(ByteBuffer value) + { + return indexCfs.decorateKey(value); + } + + private PartitionUpdate partitionUpdate(DecoratedKey valueKey, Row row) + { + return PartitionUpdate.singleRowUpdate(indexCfs.metadata, valueKey, row); + } + + private void invalidate() + { + // interrupt in-progress compactions + Collection<ColumnFamilyStore> cfss = Collections.singleton(indexCfs); + CompactionManager.instance.interruptCompactionForCFs(cfss, true); + CompactionManager.instance.waitForCessation(cfss); + indexCfs.keyspace.writeOrder.awaitNewBarrier(); + indexCfs.forceBlockingFlush(); + indexCfs.readOrdering.awaitNewBarrier(); + indexCfs.invalidate(); + } + + private boolean isBuilt() + { + return SystemKeyspace.isIndexBuilt(baseCfs.keyspace.getName(), metadata.name); + } + + private boolean isPrimaryKeyIndex() + { + return indexedColumn.isPrimaryKeyColumn(); + } + + private Callable<?> getBuildIndexTask() + { + return () -> { + buildBlocking(); + return null; + }; + } + + private void buildBlocking() + { + baseCfs.forceBlockingFlush(); + + try (ColumnFamilyStore.RefViewFragment viewFragment = baseCfs.selectAndReference(View.select(SSTableSet.CANONICAL)); + Refs<SSTableReader> sstables = viewFragment.refs) + { + if (sstables.isEmpty()) + { + logger.info("No SSTable data for {}.{} to build index {} from, marking empty index as built", + baseCfs.metadata.ksName, + baseCfs.metadata.cfName, + metadata.name); + baseCfs.indexManager.markIndexBuilt(metadata.name); + return; + } + + logger.info("Submitting index build of {} for data in {}", + metadata.name, + getSSTableNames(sstables)); + + SecondaryIndexBuilder builder = new SecondaryIndexBuilder(baseCfs, + Collections.singleton(this), + new ReducingKeyIterator(sstables)); + Future<?> future = CompactionManager.instance.submitIndexBuild(builder); + FBUtilities.waitOnFuture(future); + indexCfs.forceBlockingFlush(); + baseCfs.indexManager.markIndexBuilt(metadata.name); + } + logger.info("Index build of {} complete", metadata.name); + } + + private static String getSSTableNames(Collection<SSTableReader> sstables) + { + return StreamSupport.stream(sstables.spliterator(), false) + .map(SSTableReader::toString) + .collect(Collectors.joining(", ")); + } +}
