http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacc75/src/java/org/apache/cassandra/db/transform/UnfilteredRows.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/transform/UnfilteredRows.java index 98640ae,0000000..f000fcf mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/db/transform/UnfilteredRows.java +++ b/src/java/org/apache/cassandra/db/transform/UnfilteredRows.java @@@ -1,40 -1,0 +1,60 @@@ ++/* ++ * ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, ++ * software distributed under the License is distributed on an ++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY ++ * KIND, either express or implied. See the License for the ++ * specific language governing permissions and limitations ++ * under the License. 
++ * ++ */ +package org.apache.cassandra.db.transform; + +import org.apache.cassandra.db.DeletionTime; +import org.apache.cassandra.db.rows.EncodingStats; +import org.apache.cassandra.db.rows.Unfiltered; +import org.apache.cassandra.db.rows.UnfilteredRowIterator; + +/** Wrapper around an UnfilteredRowIterator that keeps its own copy of the partition-level deletion and re-derives it every time a Transformation is added. */ final class UnfilteredRows extends BaseRows<Unfiltered, UnfilteredRowIterator> implements UnfilteredRowIterator +{ + /** Current partition-level deletion: seeded from the input in the constructor, then replaced by the result of applyToDeletion in add(). */ private DeletionTime partitionLevelDeletion; + + /** Wraps {@code input} and captures its partition-level deletion as the starting value. */ public UnfilteredRows(UnfilteredRowIterator input) + { + super(input); + partitionLevelDeletion = input.partitionLevelDeletion(); + } + + /** Registers the transformation with the superclass, then passes the stored partition-level deletion through {@code add.applyToDeletion} and keeps the result. */ @Override + void add(Transformation add) + { + super.add(add); + partitionLevelDeletion = add.applyToDeletion(partitionLevelDeletion); + } + + /** Returns the stored (possibly transformed) partition-level deletion; the input is not consulted again. */ public DeletionTime partitionLevelDeletion() + { + return partitionLevelDeletion; + } + + /** Delegates to the wrapped input's stats(). */ public EncodingStats stats() + { + return input.stats(); + } + + /** True only when the static row is empty, the stored partition-level deletion is live and hasNext() reports no further element. NOTE(review): staticRow() and hasNext() are inherited and not visible here; hasNext() presumably consults the underlying iterator, confirm against BaseRows. */ @Override + public boolean isEmpty() + { + return staticRow().isEmpty() && partitionLevelDeletion().isLive() && !hasNext(); + } +}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacc75/src/java/org/apache/cassandra/index/Index.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/index/Index.java index ab6665d,0000000..469ef07 mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/index/Index.java +++ b/src/java/org/apache/cassandra/index/Index.java @@@ -1,452 -1,0 +1,472 @@@ ++/* ++ * ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, ++ * software distributed under the License is distributed on an ++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY ++ * KIND, either express or implied. See the License for the ++ * specific language governing permissions and limitations ++ * under the License. 
++ * ++ */ +package org.apache.cassandra.index; + +import java.util.Optional; +import java.util.concurrent.Callable; +import java.util.function.BiFunction; + +import org.apache.cassandra.config.ColumnDefinition; +import org.apache.cassandra.cql3.Operator; +import org.apache.cassandra.db.*; +import org.apache.cassandra.db.filter.RowFilter; +import org.apache.cassandra.db.marshal.AbstractType; +import org.apache.cassandra.db.partitions.PartitionIterator; +import org.apache.cassandra.db.partitions.PartitionUpdate; +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; +import org.apache.cassandra.db.rows.Row; +import org.apache.cassandra.exceptions.InvalidRequestException; +import org.apache.cassandra.index.transactions.IndexTransaction; +import org.apache.cassandra.schema.IndexMetadata; +import org.apache.cassandra.utils.concurrent.OpOrder; + +/** + * Consisting of a top level Index interface and two sub-interfaces which handle read and write operations, + * Searcher and Indexer respectively, this defines a secondary index implementation. + * Instantiation is done via reflection and implementations must provide a constructor which takes the base + * table's ColumnFamilyStore and the IndexMetadata which defines the Index as arguments. e.g: + * {@code MyCustomIndex( ColumnFamilyStore baseCfs, IndexMetadata indexDef )} + * + * The main interface defines methods for index management, index selection at both write and query time, + * as well as validation of values that will ultimately be indexed. + * Two sub-interfaces are also defined, which represent single use helpers for short lived tasks at read and write time. + * Indexer: an event listener which receives notifications at particular points during an update of a single partition + * in the base table. + * Searcher: performs queries against the index based on a predicate defined in a RowFilter. An instance + * is expected to be single use, being involved in the execution of a single ReadCommand.
+ * + * The main interface includes factory methods for obtaining instances of both of the sub-interfaces. + * + * The methods defined in the top level interface can be grouped into 3 categories: + * + * Management Tasks: + * This group of methods is primarily concerned with maintenance of secondary indexes and is mainly called from + * SecondaryIndexManager. It includes methods for registering and un-registering an index, performing maintenance + * tasks such as (re)building an index from SSTable data, flushing, invalidating and so forth, as well as some to + * retrieve general metadata about the index (index name, any internal tables used for persistence etc). + * Several of these maintenance functions have a return type of Callable<?>; the expectation for these methods is + * that any work required to be performed by the method be done inside the Callable so that the responsibility for + * scheduling its execution can rest with SecondaryIndexManager. For instance, a task like reloading index metadata + * following potential updates caused by modifications to the base table may be performed in a blocking way. In + * contrast, adding a new index may require it to be built from existing SSTable data, a potentially expensive task + * which should be performed asynchronously. + * + * Index Selection: + * There are two facets to index selection, write time and read time selection. The former is concerned with + * identifying whether an index should be informed about a particular write operation. The latter is about providing + * means to use the index for search during query execution. + * + * Validation: + * Values that may be written to an index are checked as part of input validation, prior to an update or insert + * operation being accepted. + * + * + * Sub-interfaces: + * + * Update processing: + * Indexes are subscribed to the stream of events generated by modifications to the base table.
Subscription is + * done via first registering the Index with the base table's SecondaryIndexManager. For each partition update, the set + * of registered indexes is then filtered based on the properties of the update using the selection methods on the main + * interface described above. Each of the indexes in the filtered set then provides an event listener to receive + * notifications about the update as it is processed. As such then, an event handler instance is scoped to a single + * partition update; SecondaryIndexManager obtains a new handler for every update it processes (via a call to the + * factory method, indexerFor). That handler will then receive all events for the update, before being + * discarded by the SecondaryIndexManager. Indexer instances are never re-used by SecondaryIndexManager and the + * expectation is that each call to indexerFor should return a unique instance, or at least if instances can + * be recycled, that a given instance is only used to process a single partition update at a time. + * + * Search: + * Each query (i.e. a single ReadCommand) that uses indexes will use a single instance of Index.Searcher. As with + * processing of updates, an Index must be registered with the primary table's SecondaryIndexManager to be able to + * support queries. During the processing of a ReadCommand, the Expressions in its RowFilter are examined to determine + * whether any of them are supported by a registered Index. supportsExpression is used to filter out Indexes which + * cannot support a given Expression. After filtering, the set of candidate indexes is ranked according to the result + * of getEstimatedResultRows and the most selective (i.e. the one expected to return the smallest number of results) is + * chosen. A Searcher instance is then obtained from the searcherFor method and used to perform the actual Index lookup.
+ * Finally, Indexes can define a post processing step to be performed on the coordinator, after results (partitions from + * the primary table) have been received from replicas and reconciled. This post processing is defined as a + * java.util.function.BiFunction<PartitionIterator, ReadCommand, PartitionIterator>, that is a function which takes as + * arguments a PartitionIterator (containing the reconciled result rows) and the ReadCommand being + * executed, and returns another iterator of partitions, possibly having transformed the initial results in some way. + * The post processing function is obtained from the Index's postProcessorFor method; the built-in indexes which ship + * with Cassandra return a no-op function here. + * + * An optional static method may be provided to validate custom index options (two variants are supported): + * + * <pre>{@code public static Map<String, String> validateOptions(Map<String, String> options);}</pre> + * + * The input is the map of index options supplied in the WITH clause of a CREATE INDEX statement. + * + * <pre>{@code public static Map<String, String> validateOptions(Map<String, String> options, CFMetaData cfm);}</pre> + * + * In this version, the base table's metadata is also supplied as an argument. + * If both overloaded methods are provided, only the one including the base table's metadata will be invoked. + * + * The validation method should return a map containing any of the supplied options which are not valid for the + * implementation. If the returned map is not empty, validation is considered failed and an error is raised. + * Alternatively, the implementation may choose to throw an org.apache.cassandra.exceptions.ConfigurationException + * if invalid options are encountered. + * + */ +public interface Index +{ + + /* + * Management functions + */ + + /** + * Return a task to perform any initialization work when a new index instance is created.
+ * This may involve costly operations such as (re)building the index, and is performed asynchronously + * by SecondaryIndexManager. + * @return a task to perform any necessary initialization work + */ + public Callable<?> getInitializationTask(); + + /** + * Returns the IndexMetadata which configures and defines the index instance. This should be the same + * object passed as the argument to setIndexMetadata. NOTE(review): no setIndexMetadata method is declared on this interface; this presumably means the IndexMetadata handed to the implementation's constructor, confirm. + * @return the index's metadata + */ + public IndexMetadata getIndexMetadata(); + + /** + * Return a task to reload the internal metadata of an index. + * Called when the base table metadata is modified or when the configuration of the Index is updated. + * Implementations should return a task which performs any necessary work to be done due to + * updating the configuration(s) such as (re)building etc. This task is performed asynchronously + * by SecondaryIndexManager. + * @return task to be executed by the index manager during a reload + */ + public Callable<?> getMetadataReloadTask(IndexMetadata indexMetadata); + + /** + * An index must be registered in order to be able to either subscribe to update events on the base + * table and/or to provide Searcher functionality for reads. The double dispatch involved here, where + * the Index actually performs its own registration by calling back to the supplied IndexRegistry's + * own registerIndex method, is to make the decision as to whether or not to register an index belong + * to the implementation, not the manager. + * @param registry the index registry to register the instance with + */ + public void register(IndexRegistry registry); + + /** + * If the index implementation uses a local table to store its index data this method should return a + * handle to it. If not, an empty Optional should be returned. Typically, this is useful for the built-in + * Index implementations. + * @return an Optional referencing the Index's backing storage table if it has one, or Optional.empty() if not.
+ */ + public Optional<ColumnFamilyStore> getBackingTable(); + + /** + * Return a task which performs a blocking flush of the index's data to persistent storage. + * @return task to be executed by the index manager to perform the flush. + */ + public Callable<?> getBlockingFlushTask(); + + /** + * Return a task which invalidates the index, indicating it should no longer be considered usable. + * This should include any clean up and releasing of resources required when dropping an index. + * @return task to be executed by the index manager to invalidate the index. + */ + public Callable<?> getInvalidateTask(); + + /** + * Return a task to truncate the index with the specified truncation timestamp. + * Called when the base table is truncated. + * @param truncatedAt timestamp of the truncation operation. This will be the same timestamp used + * in the truncation of the base table. + * @return task to be executed by the index manager when the base table is truncated. + */ + public Callable<?> getTruncateTask(long truncatedAt); + + /** + * Return true if this index can be built or rebuilt when the index manager determines it is necessary. Returning + * false enables the index implementation (or some other component) to control if and when SSTable data is + * incorporated into the index. + * + * This is called by SecondaryIndexManager in buildIndexBlocking, buildAllIndexesBlocking and rebuildIndexesBlocking + * where a return value of false causes the index to be excluded from the set of those which will process the + * SSTable data. + * @return true if the index should be included in the set which processes SSTable data, false otherwise. + */ + public boolean shouldBuildBlocking(); + + + /* + * Index selection + */ + + /** + * Called to determine whether this index targets a specific column. + * Used during schema operations such as when dropping or renaming a column, to check if + * the index will be affected by the change.
Typically, if an index answers that it does + * depend upon a column, then schema operations on that column are not permitted until the index + * is dropped or altered. + * + * @param column the column definition to check + * @return true if the index depends on the supplied column being present; false if the column may be + * safely dropped or modified without adversely affecting the index + */ + public boolean dependsOn(ColumnDefinition column); + + /** + * Called to determine whether this index can provide a searcher to execute a query on the + * supplied column using the specified operator. This forms part of the query validation done + * before a CQL select statement is executed. + * @param column the target column of a search query predicate + * @param operator the operator of a search query predicate + * @return true if this index is capable of supporting such expressions, false otherwise + */ + public boolean supportsExpression(ColumnDefinition column, Operator operator); + + /** + * If the index supports custom search expressions using the + * {@code SELECT * FROM table WHERE expr(index_name, expression)} syntax, this + * method should return the expected type of the expression argument. + * For example, if the index supports custom expressions as Strings, calls to this + * method should return {@code UTF8Type.instance}. + * If the index implementation does not support custom expressions, then it should + * return null. + * @return the type of custom index expressions supported by this index, or + * null if custom expressions are not supported. + */ + public AbstractType<?> customExpressionValueType(); + + /** + * Transform an initial RowFilter into the filter that will still need to be applied + * to a set of Rows after the index has performed its initial scan. + * Used in ReadCommand#executeLocal to reduce the amount of filtering performed on the + * results of the index query.
+ * + * @param filter the initial filter belonging to a ReadCommand + * @return the (hopefully) reduced filter that would still need to be applied after + * the index was used to narrow the initial result set + */ + public RowFilter getPostIndexQueryFilter(RowFilter filter); + + /** + * Return an estimate of the number of results this index is expected to return for any given + * query that it can be used to answer. Used in conjunction with indexes() and supportsExpression() + * to determine the most selective index for a given ReadCommand. Additionally, this is also used + * by StorageProxy.estimateResultsPerRange to calculate the initial concurrency factor for range requests. + * + * @return the estimated average number of results a Searcher may return for any given query + */ + public long getEstimatedResultRows(); + + /* + * Input validation + */ + + /** + * Called at write time to ensure that values present in the update + * are valid according to the rules of all registered indexes which + * will process it. The partition key as well as the clustering and + * cell values for each row in the update may be checked by index + * implementations. + * @param update PartitionUpdate containing the values to be validated by registered Index implementations + * @throws InvalidRequestException if the update contains values the index considers invalid + */ + public void validate(PartitionUpdate update) throws InvalidRequestException; + + /* + * Update processing + */ + + /** + * Creates a new {@code Indexer} object for updates to a given partition. + * + * @param key key of the partition being modified + * @param columns the regular and static columns the created indexer will have to deal with. + * This can be empty as an update might only contain partition, range and row deletions, but + * the indexer is guaranteed to not get any cells for a column that is not part of {@code columns}.
+ * @param nowInSec current time of the update operation + * @param opGroup operation group spanning the update operation + * @param transactionType indicates what kind of update is being performed on the base data + * i.e. a write time insert/update/delete or the result of compaction + * @return the newly created indexer or {@code null} if the index is not interested in the update + * (this could be because the index doesn't care about that particular partition, doesn't care about + * that type of transaction, ...). + */ + public Indexer indexerFor(DecoratedKey key, + PartitionColumns columns, + int nowInSec, + OpOrder.Group opGroup, + IndexTransaction.Type transactionType); + + /** + * Listener for processing events emitted during a single partition update. + * Instances of this are responsible for applying modifications to the index in response to a single update + * operation on a particular partition of the base table. + * + * That update may be generated by the normal write path, by iterating SSTables during streaming operations or when + * building or rebuilding an index from source. Updates also occur during compaction when multiple versions of a + * source partition from different SSTables are merged. + * + * Implementations should not make assumptions about resolution or filtering of the partition update being + * processed. That is to say that it is possible for an Indexer instance to receive notification of a + * PartitionDelete or RangeTombstones which shadow a Row it then receives via insertRow/updateRow. + * + * It is important to note that the only ordering guarantee made for the methods here is that the first call will + * be to begin() and the last call to finish(). The other methods may be called to process update events in any + * order. This can also include duplicate calls, in cases where a memtable partition is under contention from + * several updates.
In that scenario, the same set of events may be delivered to the Indexer as the memtable update + * which failed due to contention is re-applied. + */ + public interface Indexer + { + /** + * Notification of the start of a partition update. + * This event always occurs before any other during the update. + */ + public void begin(); + + /** + * Notification of a top level partition delete. + * @param deletionTime the deletion time of the top level partition delete + */ + public void partitionDelete(DeletionTime deletionTime); + + /** + * Notification of a RangeTombstone. + * An update of a single partition may contain multiple RangeTombstones, + * and a notification will be passed for each of them. + * @param tombstone one of the RangeTombstones contained in the update + */ + public void rangeTombstone(RangeTombstone tombstone); + + /** + * Notification that a new row was inserted into the Memtable holding the partition. + * This only implies that the inserted row was not already present in the Memtable, + * it *does not* guarantee that the row does not exist in an SSTable, potentially with + * additional column data. + * + * @param row the Row being inserted into the base table's Memtable. + */ + public void insertRow(Row row); + + /** + * Notification of a modification to a row in the base table's Memtable. + * This is to allow an Index implementation to clean up entries for base data which is + * never flushed to disk (and so will not be purged during compaction). + * It's important to note that the old and new rows supplied here may not represent + * the totality of the data for the Row with this particular Clustering. There may be + * additional column data in SSTables which is not present in either the old or new row, + * so implementations should be aware of that. + * The supplied rows contain only column data which has actually been updated. + * oldRowData contains only the columns which have been removed from the Row's + * representation in the Memtable, while newRowData includes only new columns + * which were not previously present.
Any column data which is unchanged by + * the update is not included. + * + * @param oldRowData data that was present in the existing row and which has been removed from + * the base table's Memtable + * @param newRowData data that was not present in the existing row and is being inserted + * into the base table's Memtable + */ + public void updateRow(Row oldRowData, Row newRowData); + + /** + * Notification that a row was removed from the partition. + * Note that this is only called as part of either a compaction or a cleanup. + * This context is indicated by the IndexTransaction.Type supplied to the indexerFor method. + * + * As with updateRow, it cannot be guaranteed that all data belonging to the Clustering + * of the supplied Row has been removed (although in the case of a cleanup, that is the + * ultimate intention). + * There may be data for the same row in other SSTables, so in this case Indexer implementations + * should *not* assume that all traces of the row have been removed. In particular, + * it is not safe to assert that all values associated with the Row's Clustering + * have been deleted, so implementations which index primary key columns should not + * purge those entries from their indexes. + * + * @param row data being removed from the base table + */ + public void removeRow(Row row); + + /** + * Notification of the end of the partition update. + * This event always occurs after all others for the particular update. + */ + public void finish(); + } + + /* + * Querying + */ + + /** + * Used to validate the various parameters of a supplied {@code ReadCommand}, + * this is called prior to execution. In theory, any command instance may be checked + * by any {@code Index} instance, but in practice the index will be the one + * returned by a call to the {@code getIndex(ColumnFamilyStore cfs)} method on + * the supplied command.
+ * + * Custom index implementations should perform any validation of query expressions here and throw a meaningful + * InvalidRequestException when any expression or other parameter is invalid. The default implementation has an empty body and so accepts every command. + * + * @param command a ReadCommand whose parameters are to be verified + * @throws InvalidRequestException if the details of the command fail to meet the + * index's validation rules + */ + default void validate(ReadCommand command) throws InvalidRequestException + { + } + + /** + * Return a function which performs post processing on the results of a partition range read command. + * In future, this may be used as a generalized mechanism for transforming results on the coordinator prior + * to returning them to the caller. + * + * This is used on the coordinator during execution of a range command to perform post + * processing of merged results obtained from the necessary replicas. This is the only way in which results are + * transformed in this way but this may change over time as usage is generalized. + * See CASSANDRA-8717 for further discussion. + * + * The function takes a PartitionIterator of the results from the replicas which has already been collated + * and reconciled, along with the command being executed. It returns another PartitionIterator containing the results + * of the transformation (which may be the same as the input if the transformation is a no-op). + */ + public BiFunction<PartitionIterator, ReadCommand, PartitionIterator> postProcessorFor(ReadCommand command); + + /** + * Factory method for query time search helper. + * + * @param command the read command being executed + * @return a Searcher with which to perform the supplied command + */ + public Searcher searcherFor(ReadCommand command); + + /** + * Performs the actual index lookup during execution of a ReadCommand.
+ * An instance performs its query according to the RowFilter.Expression it was created for (see searcherFor). + * An Expression is a predicate of the form [column] [operator] [value]. + */ + public interface Searcher + { + /** + * @param orderGroup the collection of OpOrder.Groups which the ReadCommand is being performed under. + * @return partitions from the base table matching the criteria of the search. + */ + public UnfilteredPartitionIterator search(ReadOrderGroup orderGroup); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacc75/src/java/org/apache/cassandra/index/IndexRegistry.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/index/IndexRegistry.java index 6a004fb,0000000..9f5ed02 mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/index/IndexRegistry.java +++ b/src/java/org/apache/cassandra/index/IndexRegistry.java @@@ -1,22 -1,0 +1,42 @@@ ++/* ++ * ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, ++ * software distributed under the License is distributed on an ++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY ++ * KIND, either express or implied. See the License for the ++ * specific language governing permissions and limitations ++ * under the License. ++ * ++ */ +package org.apache.cassandra.index; + +import java.util.Collection; + +import org.apache.cassandra.schema.IndexMetadata; + +/** + * The collection of all Index instances for a base table.
+ * The SecondaryIndexManager for a ColumnFamilyStore contains an IndexRegistry + * (actually it implements this interface at present) and Index implementations + * register in order to: + * i) subscribe to the stream of updates being applied to partitions in the base table + * ii) provide searchers to support queries with the relevant search predicates + */ +public interface IndexRegistry +{ + /** Registration entry point; per the Index.register documentation, an Index calls this back on the registry it was handed in order to register itself. */ void registerIndex(Index index); + void unregisterIndex(Index index); + + Index getIndex(IndexMetadata indexMetadata); + /** Presumably returns every Index currently held by this registry; confirm against the implementing class. */ Collection<Index> listIndexes(); +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacc75/src/java/org/apache/cassandra/index/internal/CassandraIndex.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/index/internal/CassandraIndex.java index 4bbf682,0000000..9d997a7 mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/index/internal/CassandraIndex.java +++ b/src/java/org/apache/cassandra/index/internal/CassandraIndex.java @@@ -1,862 -1,0 +1,882 @@@ ++/* ++ * ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, ++ * software distributed under the License is distributed on an ++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY ++ * KIND, either express or implied. See the License for the ++ * specific language governing permissions and limitations ++ * under the License.
++ * ++ */ +package org.apache.cassandra.index.internal; + +import java.nio.ByteBuffer; +import java.util.*; +import java.util.concurrent.Callable; +import java.util.concurrent.Future; +import java.util.function.BiFunction; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +import com.google.common.collect.ImmutableSet; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.ColumnDefinition; +import org.apache.cassandra.cql3.ColumnIdentifier; +import org.apache.cassandra.cql3.Operator; +import org.apache.cassandra.cql3.statements.IndexTarget; +import org.apache.cassandra.db.*; +import org.apache.cassandra.db.compaction.CompactionManager; +import org.apache.cassandra.db.filter.RowFilter; +import org.apache.cassandra.db.lifecycle.SSTableSet; +import org.apache.cassandra.db.lifecycle.View; +import org.apache.cassandra.db.marshal.AbstractType; +import org.apache.cassandra.db.marshal.CollectionType; +import org.apache.cassandra.db.marshal.EmptyType; +import org.apache.cassandra.db.partitions.PartitionIterator; +import org.apache.cassandra.db.partitions.PartitionUpdate; +import org.apache.cassandra.db.rows.*; +import org.apache.cassandra.dht.LocalPartitioner; +import org.apache.cassandra.exceptions.InvalidRequestException; +import org.apache.cassandra.index.Index; +import org.apache.cassandra.index.IndexRegistry; +import org.apache.cassandra.index.SecondaryIndexBuilder; +import org.apache.cassandra.index.internal.composites.CompositesSearcher; +import org.apache.cassandra.index.internal.keys.KeysSearcher; +import org.apache.cassandra.index.transactions.IndexTransaction; +import org.apache.cassandra.index.transactions.UpdateTransaction; +import org.apache.cassandra.io.sstable.ReducingKeyIterator; +import 
org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.schema.IndexMetadata; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.Pair; +import org.apache.cassandra.utils.concurrent.OpOrder; +import org.apache.cassandra.utils.concurrent.Refs; + +import static org.apache.cassandra.cql3.statements.RequestValidations.checkFalse; + +/** + * Index implementation which indexes the values for a single column in the base + * table and which stores its index data in a local, hidden table. + */ +public abstract class CassandraIndex implements Index +{ + private static final Logger logger = LoggerFactory.getLogger(CassandraIndex.class); + + public static final Pattern TARGET_REGEX = Pattern.compile("^(keys|entries|values|full)\\((.+)\\)$"); + + public final ColumnFamilyStore baseCfs; + protected IndexMetadata metadata; + protected ColumnFamilyStore indexCfs; + protected ColumnDefinition indexedColumn; + protected CassandraIndexFunctions functions; + + protected CassandraIndex(ColumnFamilyStore baseCfs, IndexMetadata indexDef) + { + this.baseCfs = baseCfs; + setMetadata(indexDef); + } + + /** + * Returns true if an index of this type can support search predicates of the form [column] OPERATOR [value] + * @param indexedColumn + * @param operator + * @return + */ + protected boolean supportsOperator(ColumnDefinition indexedColumn, Operator operator) + { + return operator == Operator.EQ; + } + + /** + * Used to construct an the clustering for an entry in the index table based on values from the base data. + * The clustering columns in the index table encode the values required to retrieve the correct data from the base + * table and varies depending on the kind of the indexed column. See indexCfsMetadata for more details + * Used whenever a row in the index table is written or deleted. 
+ * @param partitionKey from the base data being indexed + * @param prefix from the base data being indexed + * @param path from the base data being indexed + * @return a clustering prefix to be used to insert into the index table + */ + protected abstract CBuilder buildIndexClusteringPrefix(ByteBuffer partitionKey, + ClusteringPrefix prefix, + CellPath path); + + /** + * Used at search time to convert a row in the index table into a simple struct containing the values required + * to retrieve the corresponding row from the base table. + * @param indexedValue the partition key of the indexed table (i.e. the value that was indexed) + * @param indexEntry a row from the index table + * @return + */ + public abstract IndexEntry decodeEntry(DecoratedKey indexedValue, + Row indexEntry); + + /** + * Check whether a value retrieved from an index is still valid by comparing it to current row from the base table. + * Used at read time to identify out of date index entries so that they can be excluded from search results and + * repaired + * @param row the current row from the primary data table + * @param indexValue the value we retrieved from the index + * @param nowInSec + * @return true if the index is out of date and the entry should be dropped + */ + public abstract boolean isStale(Row row, ByteBuffer indexValue, int nowInSec); + + /** + * Extract the value to be inserted into the index from the components of the base data + * @param partitionKey from the primary data + * @param clustering from the primary data + * @param path from the primary data + * @param cellValue from the primary data + * @return a ByteBuffer containing the value to be inserted in the index. 
This will be used to make the partition + * key in the index table + */ + protected abstract ByteBuffer getIndexedValue(ByteBuffer partitionKey, + Clustering clustering, + CellPath path, + ByteBuffer cellValue); + + public ColumnDefinition getIndexedColumn() + { + return indexedColumn; + } + + public ClusteringComparator getIndexComparator() + { + return indexCfs.metadata.comparator; + } + + public ColumnFamilyStore getIndexCfs() + { + return indexCfs; + } + + public void register(IndexRegistry registry) + { + registry.registerIndex(this); + } + + public Callable<?> getInitializationTask() + { + // if we're just linking in the index on an already-built index post-restart or if the base + // table is empty we've nothing to do. Otherwise, submit for building via SecondaryIndexBuilder + return isBuilt() || baseCfs.isEmpty() ? null : getBuildIndexTask(); + } + + public IndexMetadata getIndexMetadata() + { + return metadata; + } + + public Optional<ColumnFamilyStore> getBackingTable() + { + return indexCfs == null ? 
Optional.empty() : Optional.of(indexCfs); + } + + public Callable<Void> getBlockingFlushTask() + { + return () -> { + indexCfs.forceBlockingFlush(); + return null; + }; + } + + public Callable<?> getInvalidateTask() + { + return () -> { + invalidate(); + return null; + }; + } + + public Callable<?> getMetadataReloadTask(IndexMetadata indexDef) + { + return () -> { + indexCfs.metadata.reloadIndexMetadataProperties(baseCfs.metadata); + indexCfs.reload(); + return null; + }; + } + + @Override + public void validate(ReadCommand command) throws InvalidRequestException + { + Optional<RowFilter.Expression> target = getTargetExpression(command.rowFilter().getExpressions()); + + if (target.isPresent()) + { + ByteBuffer indexValue = target.get().getIndexValue(); + checkFalse(indexValue.remaining() > FBUtilities.MAX_UNSIGNED_SHORT, + "Index expression values may not be larger than 64K"); + } + } + + private void setMetadata(IndexMetadata indexDef) + { + metadata = indexDef; + Pair<ColumnDefinition, IndexTarget.Type> target = parseTarget(baseCfs.metadata, indexDef); + functions = getFunctions(indexDef, target); + CFMetaData cfm = indexCfsMetadata(baseCfs.metadata, indexDef); + indexCfs = ColumnFamilyStore.createColumnFamilyStore(baseCfs.keyspace, + cfm.cfName, + cfm, + baseCfs.getTracker().loadsstables); + indexedColumn = target.left; + } + + public Callable<?> getTruncateTask(final long truncatedAt) + { + return () -> { + indexCfs.discardSSTables(truncatedAt); + return null; + }; + } + + public boolean shouldBuildBlocking() + { + // built-in indexes are always included in builds initiated from SecondaryIndexManager + return true; + } + + public boolean dependsOn(ColumnDefinition column) + { + return indexedColumn.name.equals(column.name); + } + + public boolean supportsExpression(ColumnDefinition column, Operator operator) + { + return indexedColumn.name.equals(column.name) + && supportsOperator(indexedColumn, operator); + } + + private boolean 
supportsExpression(RowFilter.Expression expression) + { + return supportsExpression(expression.column(), expression.operator()); + } + + public AbstractType<?> customExpressionValueType() + { + return null; + } + + public long getEstimatedResultRows() + { + return indexCfs.getMeanColumns(); + } + + /** + * No post processing of query results, just return them unchanged + */ + public BiFunction<PartitionIterator, ReadCommand, PartitionIterator> postProcessorFor(ReadCommand command) + { + return (partitionIterator, readCommand) -> partitionIterator; + } + + public RowFilter getPostIndexQueryFilter(RowFilter filter) + { + return getTargetExpression(filter.getExpressions()).map(filter::without) + .orElse(filter); + } + + private Optional<RowFilter.Expression> getTargetExpression(List<RowFilter.Expression> expressions) + { + return expressions.stream().filter(this::supportsExpression).findFirst(); + } + + public Index.Searcher searcherFor(ReadCommand command) + { + Optional<RowFilter.Expression> target = getTargetExpression(command.rowFilter().getExpressions()); + + if (target.isPresent()) + { + target.get().validateForIndexing(); + switch (getIndexMetadata().kind) + { + case COMPOSITES: + return new CompositesSearcher(command, target.get(), this); + case KEYS: + return new KeysSearcher(command, target.get(), this); + default: + throw new IllegalStateException(String.format("Unsupported index type %s for index %s on %s", + metadata.kind, + metadata.name, + indexedColumn.name.toString())); + } + } + + return null; + + } + + public void validate(PartitionUpdate update) throws InvalidRequestException + { + switch (indexedColumn.kind) + { + case PARTITION_KEY: + validatePartitionKey(update.partitionKey()); + break; + case CLUSTERING: + validateClusterings(update); + break; + case REGULAR: + if (update.columns().regulars.contains(indexedColumn)) + validateRows(update); + break; + case STATIC: + if (update.columns().statics.contains(indexedColumn)) + 
validateRows(Collections.singleton(update.staticRow())); + break; + } + } + + public Indexer indexerFor(final DecoratedKey key, + final PartitionColumns columns, + final int nowInSec, + final OpOrder.Group opGroup, + final IndexTransaction.Type transactionType) + { + /** + * Indexes on regular and static columns (the non primary-key ones) only care about updates with live + * data for the column they index. In particular, they don't care about having just row or range deletions + * as they don't know how to update the index table unless they know exactly the value that is deleted. + * + * Note that in practice this means that those indexes are only purged of stale entries on compaction, + * when we resolve both the deletion and the prior data it deletes. Of course, such stale entries are also + * filtered on read. + */ + if (!isPrimaryKeyIndex() && !columns.contains(indexedColumn)) + return null; + + return new Indexer() + { + public void begin() + { + } + + public void partitionDelete(DeletionTime deletionTime) + { + } + + public void rangeTombstone(RangeTombstone tombstone) + { + } + + public void insertRow(Row row) + { + if (row.isStatic() != indexedColumn.isStatic()) + return; + + if (isPrimaryKeyIndex()) + { + indexPrimaryKey(row.clustering(), + getPrimaryKeyIndexLiveness(row), + row.deletion()); + } + else + { + if (indexedColumn.isComplex()) + indexCells(row.clustering(), row.getComplexColumnData(indexedColumn)); + else + indexCell(row.clustering(), row.getCell(indexedColumn)); + } + } + + public void removeRow(Row row) + { + if (isPrimaryKeyIndex()) + return; + + if (indexedColumn.isComplex()) + removeCells(row.clustering(), row.getComplexColumnData(indexedColumn)); + else + removeCell(row.clustering(), row.getCell(indexedColumn)); + } + + public void updateRow(Row oldRow, Row newRow) + { + assert oldRow.isStatic() == newRow.isStatic(); + if (newRow.isStatic() != indexedColumn.isStatic()) + return; + + if (isPrimaryKeyIndex()) + 
indexPrimaryKey(newRow.clustering(), + newRow.primaryKeyLivenessInfo(), + newRow.deletion()); + + if (indexedColumn.isComplex()) + { + indexCells(newRow.clustering(), newRow.getComplexColumnData(indexedColumn)); + removeCells(oldRow.clustering(), oldRow.getComplexColumnData(indexedColumn)); + } + else + { + indexCell(newRow.clustering(), newRow.getCell(indexedColumn)); + removeCell(oldRow.clustering(), oldRow.getCell(indexedColumn)); + } + } + + public void finish() + { + } + + private void indexCells(Clustering clustering, Iterable<Cell> cells) + { + if (cells == null) + return; + + for (Cell cell : cells) + indexCell(clustering, cell); + } + + private void indexCell(Clustering clustering, Cell cell) + { + if (cell == null || !cell.isLive(nowInSec)) + return; + + insert(key.getKey(), + clustering, + cell, + LivenessInfo.create(cell.timestamp(), cell.ttl(), cell.localDeletionTime()), + opGroup); + } + + private void removeCells(Clustering clustering, Iterable<Cell> cells) + { + if (cells == null) + return; + + for (Cell cell : cells) + removeCell(clustering, cell); + } + + private void removeCell(Clustering clustering, Cell cell) + { + if (cell == null || !cell.isLive(nowInSec)) + return; + + delete(key.getKey(), clustering, cell, opGroup, nowInSec); + } + + private void indexPrimaryKey(final Clustering clustering, + final LivenessInfo liveness, + final Row.Deletion deletion) + { + if (liveness.timestamp() != LivenessInfo.NO_TIMESTAMP) + insert(key.getKey(), clustering, null, liveness, opGroup); + + if (!deletion.isLive()) + delete(key.getKey(), clustering, deletion.time(), opGroup); + } + + private LivenessInfo getPrimaryKeyIndexLiveness(Row row) + { + long timestamp = row.primaryKeyLivenessInfo().timestamp(); + int ttl = row.primaryKeyLivenessInfo().ttl(); + for (Cell cell : row.cells()) + { + long cellTimestamp = cell.timestamp(); + if (cell.isLive(nowInSec)) + { + if (cellTimestamp > timestamp) + { + timestamp = cellTimestamp; + ttl = cell.ttl(); + } + } + } + 
return LivenessInfo.create(baseCfs.metadata, timestamp, ttl, nowInSec); + } + }; + } + + /** + * Specific to internal indexes, this is called by a + * searcher when it encounters a stale entry in the index + * @param indexKey the partition key in the index table + * @param indexClustering the clustering in the index table + * @param deletion deletion timestamp etc + * @param opGroup the operation under which to perform the deletion + */ + public void deleteStaleEntry(DecoratedKey indexKey, + Clustering indexClustering, + DeletionTime deletion, + OpOrder.Group opGroup) + { + doDelete(indexKey, indexClustering, deletion, opGroup); + logger.trace("Removed index entry for stale value {}", indexKey); + } + + /** + * Called when adding a new entry to the index + */ + private void insert(ByteBuffer rowKey, + Clustering clustering, + Cell cell, + LivenessInfo info, + OpOrder.Group opGroup) + { + DecoratedKey valueKey = getIndexKeyFor(getIndexedValue(rowKey, + clustering, + cell)); + Row row = BTreeRow.noCellLiveRow(buildIndexClustering(rowKey, clustering, cell), info); + PartitionUpdate upd = partitionUpdate(valueKey, row); + indexCfs.apply(upd, UpdateTransaction.NO_OP, opGroup, null); + logger.trace("Inserted entry into index for value {}", valueKey); + } + + /** + * Called when deleting entries on non-primary key columns + */ + private void delete(ByteBuffer rowKey, + Clustering clustering, + Cell cell, + OpOrder.Group opGroup, + int nowInSec) + { + DecoratedKey valueKey = getIndexKeyFor(getIndexedValue(rowKey, + clustering, + cell)); + doDelete(valueKey, + buildIndexClustering(rowKey, clustering, cell), + new DeletionTime(cell.timestamp(), nowInSec), + opGroup); + } + + /** + * Called when deleting entries from indexes on primary key columns + */ + private void delete(ByteBuffer rowKey, + Clustering clustering, + DeletionTime deletion, + OpOrder.Group opGroup) + { + DecoratedKey valueKey = getIndexKeyFor(getIndexedValue(rowKey, + clustering, + null)); + 
doDelete(valueKey, + buildIndexClustering(rowKey, clustering, null), + deletion, + opGroup); + } + + private void doDelete(DecoratedKey indexKey, + Clustering indexClustering, + DeletionTime deletion, + OpOrder.Group opGroup) + { + Row row = BTreeRow.emptyDeletedRow(indexClustering, Row.Deletion.regular(deletion)); + PartitionUpdate upd = partitionUpdate(indexKey, row); + indexCfs.apply(upd, UpdateTransaction.NO_OP, opGroup, null); + logger.trace("Removed index entry for value {}", indexKey); + } + + private void validatePartitionKey(DecoratedKey partitionKey) throws InvalidRequestException + { + assert indexedColumn.isPartitionKey(); + validateIndexedValue(getIndexedValue(partitionKey.getKey(), null, null)); + } + + private void validateClusterings(PartitionUpdate update) throws InvalidRequestException + { + assert indexedColumn.isClusteringColumn(); + for (Row row : update) + validateIndexedValue(getIndexedValue(null, row.clustering(), null)); + } + + private void validateRows(Iterable<Row> rows) + { + assert !indexedColumn.isPrimaryKeyColumn(); + for (Row row : rows) + { + if (indexedColumn.isComplex()) + { + ComplexColumnData data = row.getComplexColumnData(indexedColumn); + if (data != null) + { + for (Cell cell : data) + { + validateIndexedValue(getIndexedValue(null, null, cell.path(), cell.value())); + } + } + } + else + { + validateIndexedValue(getIndexedValue(null, null, row.getCell(indexedColumn))); + } + } + } + + private void validateIndexedValue(ByteBuffer value) + { + if (value != null && value.remaining() >= FBUtilities.MAX_UNSIGNED_SHORT) + throw new InvalidRequestException(String.format( + "Cannot index value of size %d for index %s on %s.%s(%s) (maximum allowed size=%d)", + value.remaining(), + metadata.name, + baseCfs.metadata.ksName, + baseCfs.metadata.cfName, + indexedColumn.name.toString(), + FBUtilities.MAX_UNSIGNED_SHORT)); + } + + private ByteBuffer getIndexedValue(ByteBuffer rowKey, + Clustering clustering, + Cell cell) + { + return 
getIndexedValue(rowKey, + clustering, + cell == null ? null : cell.path(), + cell == null ? null : cell.value() + ); + } + + private Clustering buildIndexClustering(ByteBuffer rowKey, + Clustering clustering, + Cell cell) + { + return buildIndexClusteringPrefix(rowKey, + clustering, + cell == null ? null : cell.path()).build(); + } + + private DecoratedKey getIndexKeyFor(ByteBuffer value) + { + return indexCfs.decorateKey(value); + } + + private PartitionUpdate partitionUpdate(DecoratedKey valueKey, Row row) + { + return PartitionUpdate.singleRowUpdate(indexCfs.metadata, valueKey, row); + } + + private void invalidate() + { + // interrupt in-progress compactions + Collection<ColumnFamilyStore> cfss = Collections.singleton(indexCfs); + CompactionManager.instance.interruptCompactionForCFs(cfss, true); + CompactionManager.instance.waitForCessation(cfss); + Keyspace.writeOrder.awaitNewBarrier(); + indexCfs.forceBlockingFlush(); + indexCfs.readOrdering.awaitNewBarrier(); + indexCfs.invalidate(); + } + + private boolean isBuilt() + { + return SystemKeyspace.isIndexBuilt(baseCfs.keyspace.getName(), metadata.name); + } + + private boolean isPrimaryKeyIndex() + { + return indexedColumn.isPrimaryKeyColumn(); + } + + private Callable<?> getBuildIndexTask() + { + return () -> { + buildBlocking(); + return null; + }; + } + + private void buildBlocking() + { + baseCfs.forceBlockingFlush(); + + try (ColumnFamilyStore.RefViewFragment viewFragment = baseCfs.selectAndReference(View.select(SSTableSet.CANONICAL)); + Refs<SSTableReader> sstables = viewFragment.refs) + { + if (sstables.isEmpty()) + { + logger.info("No SSTable data for {}.{} to build index {} from, marking empty index as built", + baseCfs.metadata.ksName, + baseCfs.metadata.cfName, + metadata.name); + baseCfs.indexManager.markIndexBuilt(metadata.name); + return; + } + + logger.info("Submitting index build of {} for data in {}", + metadata.name, + getSSTableNames(sstables)); + + SecondaryIndexBuilder builder = new 
SecondaryIndexBuilder(baseCfs, + Collections.singleton(this), + new ReducingKeyIterator(sstables)); + Future<?> future = CompactionManager.instance.submitIndexBuild(builder); + FBUtilities.waitOnFuture(future); + indexCfs.forceBlockingFlush(); + baseCfs.indexManager.markIndexBuilt(metadata.name); + } + logger.info("Index build of {} complete", metadata.name); + } + + private static String getSSTableNames(Collection<SSTableReader> sstables) + { + return StreamSupport.stream(sstables.spliterator(), false) + .map(SSTableReader::toString) + .collect(Collectors.joining(", ")); + } + + /** + * Construct the CFMetadata for an index table, the clustering columns in the index table + * vary dependent on the kind of the indexed value. + * @param baseCfsMetadata + * @param indexMetadata + * @return + */ + public static final CFMetaData indexCfsMetadata(CFMetaData baseCfsMetadata, IndexMetadata indexMetadata) + { + Pair<ColumnDefinition, IndexTarget.Type> target = parseTarget(baseCfsMetadata, indexMetadata); + CassandraIndexFunctions utils = getFunctions(indexMetadata, target); + ColumnDefinition indexedColumn = target.left; + AbstractType<?> indexedValueType = utils.getIndexedValueType(indexedColumn); + + // Tables for legacy KEYS indexes are non-compound and dense + CFMetaData.Builder builder = indexMetadata.isKeys() + ? 
CFMetaData.Builder.create(baseCfsMetadata.ksName, + baseCfsMetadata.indexColumnFamilyName(indexMetadata), + true, false, false) + : CFMetaData.Builder.create(baseCfsMetadata.ksName, + baseCfsMetadata.indexColumnFamilyName(indexMetadata)); + + builder = builder.withId(baseCfsMetadata.cfId) + .withPartitioner(new LocalPartitioner(indexedValueType)) + .addPartitionKey(indexedColumn.name, indexedColumn.type) + .addClusteringColumn("partition_key", baseCfsMetadata.partitioner.partitionOrdering()); + + if (indexMetadata.isKeys()) + { + // A dense, compact table for KEYS indexes must have a compact + // value column defined, even though it is never used + CompactTables.DefaultNames names = + CompactTables.defaultNameGenerator(ImmutableSet.of(indexedColumn.name.toString(), "partition_key")); + builder = builder.addRegularColumn(names.defaultCompactValueName(), EmptyType.instance); + } + else + { + // The clustering columns for a table backing a COMPOSITES index are dependent + // on the specific type of index (there are specializations for indexes on collections) + builder = utils.addIndexClusteringColumns(builder, baseCfsMetadata, indexedColumn); + } + + return builder.build().reloadIndexMetadataProperties(baseCfsMetadata); + } + + /** + * Factory method for new CassandraIndex instances + * @param baseCfs + * @param indexMetadata + * @return + */ + public static CassandraIndex newIndex(ColumnFamilyStore baseCfs, IndexMetadata indexMetadata) + { + return getFunctions(indexMetadata, parseTarget(baseCfs.metadata, indexMetadata)).newIndexInstance(baseCfs, indexMetadata); + } + + // Public because it's also used to convert index metadata into a thrift-compatible format + public static Pair<ColumnDefinition, IndexTarget.Type> parseTarget(CFMetaData cfm, + IndexMetadata indexDef) + { + String target = indexDef.options.get("target"); + assert target != null : String.format("No target definition found for index %s", indexDef.name); + + // if the regex matches then the target is in 
the form "keys(foo)", "entries(bar)" etc + // if not, then it must be a simple column name and implictly its type is VALUES + Matcher matcher = TARGET_REGEX.matcher(target); + String columnName; + IndexTarget.Type targetType; + if (matcher.matches()) + { + targetType = IndexTarget.Type.fromString(matcher.group(1)); + columnName = matcher.group(2); + } + else + { + columnName = target; + targetType = IndexTarget.Type.VALUES; + } + + // in the case of a quoted column name the name in the target string + // will be enclosed in quotes, which we need to unwrap. It may also + // include quote characters internally, escaped like so: + // abc"def -> abc""def. + // Because the target string is stored in a CQL compatible form, we + // need to un-escape any such quotes to get the actual column name + if (columnName.startsWith("\"")) + { + columnName = StringUtils.substring(StringUtils.substring(columnName, 1), 0, -1); + columnName = columnName.replaceAll("\"\"", "\""); + } + + // if it's not a CQL table, we can't assume that the column name is utf8, so + // in that case we have to do a linear scan of the cfm's columns to get the matching one + if (cfm.isCQLTable()) + return Pair.create(cfm.getColumnDefinition(new ColumnIdentifier(columnName, true)), targetType); + else + for (ColumnDefinition column : cfm.allColumns()) + if (column.name.toString().equals(columnName)) + return Pair.create(column, targetType); + + throw new RuntimeException(String.format("Unable to parse targets for index %s (%s)", indexDef.name, target)); + } + + static CassandraIndexFunctions getFunctions(IndexMetadata indexDef, + Pair<ColumnDefinition, IndexTarget.Type> target) + { + if (indexDef.isKeys()) + return CassandraIndexFunctions.KEYS_INDEX_FUNCTIONS; + + ColumnDefinition indexedColumn = target.left; + if (indexedColumn.type.isCollection() && indexedColumn.type.isMultiCell()) + { + switch (((CollectionType)indexedColumn.type).kind) + { + case LIST: + return 
CassandraIndexFunctions.COLLECTION_VALUE_INDEX_FUNCTIONS; + case SET: + return CassandraIndexFunctions.COLLECTION_KEY_INDEX_FUNCTIONS; + case MAP: + switch (target.right) + { + case KEYS: + return CassandraIndexFunctions.COLLECTION_KEY_INDEX_FUNCTIONS; + case KEYS_AND_VALUES: + return CassandraIndexFunctions.COLLECTION_ENTRY_INDEX_FUNCTIONS; + case VALUES: + return CassandraIndexFunctions.COLLECTION_VALUE_INDEX_FUNCTIONS; + } + throw new AssertionError(); + } + } + + switch (indexedColumn.kind) + { + case CLUSTERING: + return CassandraIndexFunctions.CLUSTERING_COLUMN_INDEX_FUNCTIONS; + case REGULAR: + return CassandraIndexFunctions.REGULAR_COLUMN_INDEX_FUNCTIONS; + case PARTITION_KEY: + return CassandraIndexFunctions.PARTITION_KEY_INDEX_FUNCTIONS; + //case COMPACT_VALUE: + // return new CompositesIndexOnCompactValue(); + } + throw new AssertionError(); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacc75/src/java/org/apache/cassandra/index/internal/CassandraIndexSearcher.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/index/internal/CassandraIndexSearcher.java index 72d2528,0000000..d6b39e6 mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/index/internal/CassandraIndexSearcher.java +++ b/src/java/org/apache/cassandra/index/internal/CassandraIndexSearcher.java @@@ -1,172 -1,0 +1,192 @@@ ++/* ++ * ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. 
You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, ++ * software distributed under the License is distributed on an ++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY ++ * KIND, either express or implied. See the License for the ++ * specific language governing permissions and limitations ++ * under the License. ++ * ++ */ +package org.apache.cassandra.index.internal; + +import java.nio.ByteBuffer; +import java.util.NavigableSet; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.db.*; +import org.apache.cassandra.db.filter.*; +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; +import org.apache.cassandra.db.rows.RowIterator; +import org.apache.cassandra.db.rows.UnfilteredRowIterator; +import org.apache.cassandra.db.rows.UnfilteredRowIterators; +import org.apache.cassandra.dht.AbstractBounds; +import org.apache.cassandra.index.Index; +import org.apache.cassandra.utils.btree.BTreeSet; + +public abstract class CassandraIndexSearcher implements Index.Searcher +{ + private static final Logger logger = LoggerFactory.getLogger(CassandraIndexSearcher.class); + + private final RowFilter.Expression expression; + protected final CassandraIndex index; + protected final ReadCommand command; + + public CassandraIndexSearcher(ReadCommand command, + RowFilter.Expression expression, + CassandraIndex index) + { + this.command = command; + this.expression = expression; + this.index = index; + } + + @SuppressWarnings("resource") // Both the OpOrder and 'indexIter' are closed on exception, or through the closing of the result + // of this method. 
+ public UnfilteredPartitionIterator search(ReadOrderGroup orderGroup) + { + // the value of the index expression is the partition key in the index table + DecoratedKey indexKey = index.getBackingTable().get().decorateKey(expression.getIndexValue()); + UnfilteredRowIterator indexIter = queryIndex(indexKey, command, orderGroup); + try + { + return queryDataFromIndex(indexKey, UnfilteredRowIterators.filter(indexIter, command.nowInSec()), command, orderGroup); + } + catch (RuntimeException | Error e) + { + indexIter.close(); + throw e; + } + } + + private UnfilteredRowIterator queryIndex(DecoratedKey indexKey, ReadCommand command, ReadOrderGroup orderGroup) + { + ClusteringIndexFilter filter = makeIndexFilter(command); + ColumnFamilyStore indexCfs = index.getBackingTable().get(); + CFMetaData indexCfm = indexCfs.metadata; + return SinglePartitionReadCommand.create(indexCfm, command.nowInSec(), indexKey, ColumnFilter.all(indexCfm), filter) + .queryMemtableAndDisk(indexCfs, orderGroup.indexReadOpOrderGroup()); + } + + private ClusteringIndexFilter makeIndexFilter(ReadCommand command) + { + if (command instanceof SinglePartitionReadCommand) + { + // Note: as yet there's no route to get here - a 2i query *always* uses a + // PartitionRangeReadCommand. This is here in preparation for coming changes + // in SelectStatement. 
+ SinglePartitionReadCommand sprc = (SinglePartitionReadCommand)command; + ByteBuffer pk = sprc.partitionKey().getKey(); + ClusteringIndexFilter filter = sprc.clusteringIndexFilter(); + + if (filter instanceof ClusteringIndexNamesFilter) + { + NavigableSet<Clustering> requested = ((ClusteringIndexNamesFilter)filter).requestedRows(); + BTreeSet.Builder<Clustering> clusterings = BTreeSet.builder(index.getIndexComparator()); + for (Clustering c : requested) + clusterings.add(makeIndexClustering(pk, c)); + return new ClusteringIndexNamesFilter(clusterings.build(), filter.isReversed()); + } + else + { + Slices requested = ((ClusteringIndexSliceFilter)filter).requestedSlices(); + Slices.Builder builder = new Slices.Builder(index.getIndexComparator()); + for (Slice slice : requested) + builder.add(makeIndexBound(pk, slice.start()), makeIndexBound(pk, slice.end())); + return new ClusteringIndexSliceFilter(builder.build(), filter.isReversed()); + } + } + else + { + + DataRange dataRange = ((PartitionRangeReadCommand)command).dataRange(); + AbstractBounds<PartitionPosition> range = dataRange.keyRange(); + + Slice slice = Slice.ALL; + + /* + * XXX: If the range requested is a token range, we'll have to start at the beginning (and stop at the end) of + * the indexed row unfortunately (which will be inefficient), because we have no way to intuit the smallest possible + * key having a given token. A potential fix would be to actually store the token along the key in the indexed row. 
+ */ + if (range.left instanceof DecoratedKey) + { + // the right hand side of the range may not be a DecoratedKey (for instance if we're paging), + // but if it is, we can optimise slightly by restricting the slice + if (range.right instanceof DecoratedKey) + { + + DecoratedKey startKey = (DecoratedKey) range.left; + DecoratedKey endKey = (DecoratedKey) range.right; + + Slice.Bound start = Slice.Bound.BOTTOM; + Slice.Bound end = Slice.Bound.TOP; + + /* + * For index queries over a range, we can't do a whole lot better than querying everything for the key range, though for + * slice queries where we can slightly restrict the beginning and end. + */ + if (!dataRange.isNamesQuery()) + { + ClusteringIndexSliceFilter startSliceFilter = ((ClusteringIndexSliceFilter) dataRange.clusteringIndexFilter( + startKey)); + ClusteringIndexSliceFilter endSliceFilter = ((ClusteringIndexSliceFilter) dataRange.clusteringIndexFilter( + endKey)); + + // We can't effectively support reversed queries when we have a range, so we don't support it + // (or through post-query reordering) and shouldn't get there. 
+ assert !startSliceFilter.isReversed() && !endSliceFilter.isReversed(); + + Slices startSlices = startSliceFilter.requestedSlices(); + Slices endSlices = endSliceFilter.requestedSlices(); + + if (startSlices.size() > 0) + start = startSlices.get(0).start(); + + if (endSlices.size() > 0) + end = endSlices.get(endSlices.size() - 1).end(); + } + + slice = Slice.make(makeIndexBound(startKey.getKey(), start), + makeIndexBound(endKey.getKey(), end)); + } + else + { + // otherwise, just start the index slice from the key we do have + slice = Slice.make(makeIndexBound(((DecoratedKey)range.left).getKey(), Slice.Bound.BOTTOM), + Slice.Bound.TOP); + } + } + return new ClusteringIndexSliceFilter(Slices.with(index.getIndexComparator(), slice), false); + } + } + + private Slice.Bound makeIndexBound(ByteBuffer rowKey, Slice.Bound bound) + { + return index.buildIndexClusteringPrefix(rowKey, bound, null) + .buildBound(bound.isStart(), bound.isInclusive()); + } + + protected Clustering makeIndexClustering(ByteBuffer rowKey, Clustering clustering) + { + return index.buildIndexClusteringPrefix(rowKey, clustering, null).build(); + } + + protected abstract UnfilteredPartitionIterator queryDataFromIndex(DecoratedKey indexKey, + RowIterator indexHits, + ReadCommand command, + ReadOrderGroup orderGroup); +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacc75/src/java/org/apache/cassandra/index/internal/IndexEntry.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/index/internal/IndexEntry.java index 6f94ace,0000000..97525d6 mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/index/internal/IndexEntry.java +++ b/src/java/org/apache/cassandra/index/internal/IndexEntry.java @@@ -1,34 -1,0 +1,54 @@@ ++/* ++ * ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. 
See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, ++ * software distributed under the License is distributed on an ++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY ++ * KIND, either express or implied. See the License for the ++ * specific language governing permissions and limitations ++ * under the License. ++ * ++ */ +package org.apache.cassandra.index.internal; + +import java.nio.ByteBuffer; + +import org.apache.cassandra.db.Clustering; +import org.apache.cassandra.db.DecoratedKey; + +/** + * Entries in indexes on non-compact tables (tables with composite comparators) + * can be encapsulated as IndexEntry instances. These are not used when dealing + * with indexes on static/compact/thrift tables (i.e. KEYS indexes). 
+ */ +public final class IndexEntry +{ + /* + * All fields are public and final, and are set once by the constructor. + * NOTE(review): indexValue/indexClustering presumably locate the entry inside the index table, + * while indexedKey/indexedEntryClustering presumably identify the indexed base-table row; + * confirm against the code that creates these objects. + */ + public final DecoratedKey indexValue; + public final Clustering indexClustering; + public final long timestamp; + + public final ByteBuffer indexedKey; + public final Clustering indexedEntryClustering; + + /** + * Stores each argument in the public final field of the same name; no validation or copying is done. + */ + public IndexEntry(DecoratedKey indexValue, + Clustering indexClustering, + long timestamp, + ByteBuffer indexedKey, + Clustering indexedEntryClustering) + { + this.indexValue = indexValue; + this.indexClustering = indexClustering; + this.timestamp = timestamp; + this.indexedKey = indexedKey; + this.indexedEntryClustering = indexedEntryClustering; + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacc75/src/java/org/apache/cassandra/index/internal/keys/KeysIndex.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/index/internal/keys/KeysIndex.java index 53ecd01,0000000..d680253 mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/index/internal/keys/KeysIndex.java +++ b/src/java/org/apache/cassandra/index/internal/keys/KeysIndex.java @@@ -1,62 -1,0 +1,82 @@@ ++/* ++ * ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, ++ * software distributed under the License is distributed on an ++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY ++ * KIND, either express or implied. See the License for the ++ * specific language governing permissions and limitations ++ * under the License. 
++ * ++ */ +package org.apache.cassandra.index.internal.keys; + +import java.nio.ByteBuffer; + +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.ColumnDefinition; +import org.apache.cassandra.db.*; +import org.apache.cassandra.db.rows.Cell; +import org.apache.cassandra.db.rows.CellPath; +import org.apache.cassandra.db.rows.Row; +import org.apache.cassandra.index.internal.CassandraIndex; +import org.apache.cassandra.index.internal.IndexEntry; +import org.apache.cassandra.schema.IndexMetadata; + +/** + * {@link CassandraIndex} subclass for KEYS indexes. It adds no clustering columns to the index table, + * builds index clustering prefixes from the partition key alone, indexes the cell value as-is, + * and does not support decoding rows into {@link IndexEntry} objects. + */ +public class KeysIndex extends CassandraIndex +{ + /** + * Passes both arguments straight to the superclass constructor. + */ + public KeysIndex(ColumnFamilyStore baseCfs, IndexMetadata indexDef) + { + super(baseCfs, indexDef); + } + + /** + * Returns {@code builder} unchanged; {@code baseMetadata} and {@code cfDef} are not read. + */ + public CFMetaData.Builder addIndexClusteringColumns(CFMetaData.Builder builder, + CFMetaData baseMetadata, + ColumnDefinition cfDef) + { + // no additional clustering columns required + return builder; + } + + /** + * Creates a {@code CBuilder} over {@code getIndexComparator()} and adds only {@code partitionKey} to it. + * {@code prefix} and {@code path} are ignored. + */ + protected CBuilder buildIndexClusteringPrefix(ByteBuffer partitionKey, + ClusteringPrefix prefix, + CellPath path) + { + CBuilder builder = CBuilder.create(getIndexComparator()); + builder.add(partitionKey); + return builder; + } + + /** + * Returns {@code cellValue} unchanged; the other three arguments are ignored. + */ + protected ByteBuffer getIndexedValue(ByteBuffer partitionKey, + Clustering clustering, + CellPath path, ByteBuffer cellValue) + { + return cellValue; + } + + /** + * Always fails: this index type has no specialized entry format to decode. + * + * @throws UnsupportedOperationException on every call + */ + public IndexEntry decodeEntry(DecoratedKey indexedValue, Row indexEntry) + { + throw new UnsupportedOperationException("KEYS indexes do not use a specialized index entry format"); + } + + /** + * Reports whether an index value no longer matches the given row. + * + * @return {@code true} when {@code row} is null, when the row has no cell for {@code indexedColumn}, + * when that cell is not live at {@code nowInSec}, or when its value differs from + * {@code indexValue} according to {@code indexedColumn.type}; {@code false} otherwise + */ + public boolean isStale(Row row, ByteBuffer indexValue, int nowInSec) + { + if (row == null) + return true; + + Cell cell = row.getCell(indexedColumn); + + return (cell == null + || !cell.isLive(nowInSec) + || indexedColumn.type.compare(indexValue, cell.value()) != 0); + } +}
