http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java b/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java index ff685cf..b4d7853 100644 --- a/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java @@ -17,7 +17,6 @@ */ package org.apache.cassandra.cql3.statements; -import java.nio.ByteBuffer; import java.util.*; import com.google.common.collect.Iterators; @@ -27,7 +26,8 @@ import org.apache.cassandra.cql3.restrictions.Restriction; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.ColumnDefinition; import org.apache.cassandra.db.*; -import org.apache.cassandra.db.composites.Composite; +import org.apache.cassandra.db.rows.Row; +import org.apache.cassandra.db.partitions.*; import org.apache.cassandra.exceptions.*; import org.apache.cassandra.utils.Pair; @@ -46,44 +46,57 @@ public class DeleteStatement extends ModificationStatement return false; } - public void addUpdateForKey(ColumnFamily cf, ByteBuffer key, Composite prefix, UpdateParameters params) + public void addUpdateForKey(PartitionUpdate update, CBuilder cbuilder, UpdateParameters params) throws InvalidRequestException { - List<Operation> deletions = getOperations(); + List<Operation> regularDeletions = getRegularOperations(); + List<Operation> staticDeletions = getStaticOperations(); - if (prefix.size() < cfm.clusteringColumns().size() && !deletions.isEmpty()) + if (regularDeletions.isEmpty() && staticDeletions.isEmpty()) { - // In general, we can't delete specific columns if not all clustering columns have been specified. - // However, if we delete only static colums, it's fine since we won't really use the prefix anyway. 
- for (Operation deletion : deletions) - if (!deletion.column.isStatic()) - throw new InvalidRequestException(String.format("Primary key column '%s' must be specified in order to delete column '%s'", getFirstEmptyKey().name, deletion.column.name)); - } - - if (deletions.isEmpty()) - { - // We delete the slice selected by the prefix. - // However, for performance reasons, we distinguish 2 cases: - // - It's a full internal row delete - // - It's a full cell name (i.e it's a dense layout and the prefix is full) - if (prefix.isEmpty()) + // We're not deleting any specific columns so it's either a full partition deletion .... + if (cbuilder.count() == 0) { - // No columns specified, delete the row - cf.delete(new DeletionInfo(params.timestamp, params.localDeletionTime)); + update.addPartitionDeletion(params.deletionTime()); } - else if (cfm.comparator.isDense() && prefix.size() == cfm.clusteringColumns().size()) + // ... or a row deletion ... + else if (cbuilder.remainingCount() == 0) { - cf.addAtom(params.makeTombstone(cfm.comparator.create(prefix, null))); + Clustering clustering = cbuilder.build(); + Row.Writer writer = update.writer(); + params.writeClustering(clustering, writer); + params.writeRowDeletion(writer); + writer.endOfRow(); } + // ... or a range of rows deletion. else { - cf.addAtom(params.makeRangeTombstone(prefix.slice())); + update.addRangeTombstone(params.makeRangeTombstone(cbuilder)); } } else { - for (Operation op : deletions) - op.execute(key, cf, prefix, params); + if (!regularDeletions.isEmpty()) + { + // We can't delete specific (regular) columns if not all clustering columns have been specified. 
+ if (cbuilder.remainingCount() > 0) + throw new InvalidRequestException(String.format("Primary key column '%s' must be specified in order to delete column '%s'", getFirstEmptyKey().name, regularDeletions.get(0).column.name)); + + Clustering clustering = cbuilder.build(); + Row.Writer writer = update.writer(); + params.writeClustering(clustering, writer); + for (Operation op : regularDeletions) + op.execute(update.partitionKey(), clustering, writer, params); + writer.endOfRow(); + } + + if (!staticDeletions.isEmpty()) + { + Row.Writer writer = update.staticWriter(); + for (Operation op : staticDeletions) + op.execute(update.partitionKey(), Clustering.STATIC_CLUSTERING, writer, params); + writer.endOfRow(); + } } }
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/cql3/statements/DropTypeStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/DropTypeStatement.java b/src/java/org/apache/cassandra/cql3/statements/DropTypeStatement.java index ed10d00..8ad4f6c 100644 --- a/src/java/org/apache/cassandra/cql3/statements/DropTypeStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/DropTypeStatement.java @@ -119,12 +119,6 @@ public class DropTypeStatement extends SchemaAlteringStatement if (isUsedBy(subtype)) return true; } - else if (toCheck instanceof ColumnToCollectionType) - { - for (CollectionType collection : ((ColumnToCollectionType)toCheck).defined.values()) - if (isUsedBy(collection)) - return true; - } else if (toCheck instanceof CollectionType) { if (toCheck instanceof ListType) http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java index 888cdb7..6a6d186 100644 --- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java @@ -21,7 +21,8 @@ import java.nio.ByteBuffer; import java.util.*; import com.google.common.collect.Iterables; -import com.google.common.collect.Lists; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.apache.cassandra.auth.Permission; import org.apache.cassandra.config.CFMetaData; @@ -33,19 +34,20 @@ import org.apache.cassandra.cql3.restrictions.Restriction; import org.apache.cassandra.cql3.restrictions.SingleColumnRestriction; import org.apache.cassandra.cql3.selection.Selection; import 
org.apache.cassandra.db.*; -import org.apache.cassandra.db.composites.Composite; -import org.apache.cassandra.db.composites.CompositesBuilder; -import org.apache.cassandra.db.filter.ColumnSlice; -import org.apache.cassandra.db.filter.SliceQueryFilter; +import org.apache.cassandra.db.rows.*; +import org.apache.cassandra.db.partitions.*; +import org.apache.cassandra.db.filter.*; import org.apache.cassandra.db.marshal.BooleanType; import org.apache.cassandra.exceptions.*; import org.apache.cassandra.service.ClientState; import org.apache.cassandra.service.QueryState; import org.apache.cassandra.service.StorageProxy; +import org.apache.cassandra.service.StorageService; import org.apache.cassandra.service.paxos.Commit; import org.apache.cassandra.thrift.ThriftValidation; import org.apache.cassandra.transport.messages.ResultMessage; import org.apache.cassandra.triggers.TriggerExecutor; +import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.Pair; import org.apache.cassandra.utils.UUIDGen; @@ -58,6 +60,8 @@ import static org.apache.cassandra.cql3.statements.RequestValidations.invalidReq */ public abstract class ModificationStatement implements CQLStatement { + protected static final Logger logger = LoggerFactory.getLogger(ModificationStatement.class); + private static final ColumnIdentifier CAS_RESULT_COLUMN = new ColumnIdentifier("[applied]", false); public static enum StatementType { INSERT, UPDATE, DELETE } @@ -68,7 +72,15 @@ public abstract class ModificationStatement implements CQLStatement public final Attributes attrs; protected final Map<ColumnIdentifier, Restriction> processedKeys = new HashMap<>(); - private final List<Operation> columnOperations = new ArrayList<Operation>(); + private final List<Operation> regularOperations = new ArrayList<>(); + private final List<Operation> staticOperations = new ArrayList<>(); + + // TODO: If we had a builder for this statement, we could have updatedColumns/conditionColumns final and only have + // 
updatedColumnsBuilder/conditionColumnsBuilder in the builder ... + private PartitionColumns updatedColumns; + private PartitionColumns.Builder updatedColumnsBuilder = PartitionColumns.builder(); + private PartitionColumns conditionColumns; + private PartitionColumns.Builder conditionColumnsBuilder = PartitionColumns.builder(); // Separating normal and static conditions makes things somewhat easier private List<ColumnCondition> columnConditions; @@ -103,25 +115,24 @@ public abstract class ModificationStatement implements CQLStatement Iterable<Function> functions = attrs.getFunctions(); for (Restriction restriction : processedKeys.values()) - functions = Iterables.concat(functions, restriction.getFunctions()); - - if (columnOperations != null) - for (Operation operation : columnOperations) - functions = Iterables.concat(functions, operation.getFunctions()); + functions = Iterables.concat(functions, restriction.getFunctions()); - if (columnConditions != null) - for (ColumnCondition condition : columnConditions) - functions = Iterables.concat(functions, condition.getFunctions()); + for (Operation operation : allOperations()) + functions = Iterables.concat(functions, operation.getFunctions()); - if (staticConditions != null) - for (ColumnCondition condition : staticConditions) - functions = Iterables.concat(functions, condition.getFunctions()); + for (ColumnCondition condition : allConditions()) + functions = Iterables.concat(functions, condition.getFunctions()); return functions; } + public boolean hasNoClusteringColumns() + { + return hasNoClusteringColumns; + } + public abstract boolean requireFullClusteringKey(); - public abstract void addUpdateForKey(ColumnFamily updates, ByteBuffer key, Composite prefix, UpdateParameters params) throws InvalidRequestException; + public abstract void addUpdateForKey(PartitionUpdate update, CBuilder clusteringBuilder, UpdateParameters params) throws InvalidRequestException; public int getBoundTerms() { @@ -184,16 +195,75 @@ public 
abstract class ModificationStatement implements CQLStatement public void addOperation(Operation op) { + updatedColumnsBuilder.add(op.column); + // If the operation requires a read-before-write and we're doing a conditional read, we want to read + // the affected column as part of the read-for-conditions paxos phase (see #7499). + if (op.requiresRead()) + conditionColumnsBuilder.add(op.column); + if (op.column.isStatic()) + { setsStaticColumns = true; + staticOperations.add(op); + } else + { setsRegularColumns = true; - columnOperations.add(op); + regularOperations.add(op); + } + } + + public PartitionColumns updatedColumns() + { + return updatedColumns; + } + + public PartitionColumns conditionColumns() + { + return conditionColumns; } - public List<Operation> getOperations() + public boolean updatesRegularRows() { - return columnOperations; + // We're updating regular rows if all the clustering columns are provided. + // Note that the only case where we're allowed not to provide clustering + // columns is if we set some static columns, and in that case no clustering + // columns should be given. So in practice, it's enough to check whether + // the table has no clustering columns or at least one of them is set. + return cfm.clusteringColumns().isEmpty() || !hasNoClusteringColumns; + } + + public boolean updatesStaticRow() + { + return !staticOperations.isEmpty(); + } + + private void finishPreparation() + { + updatedColumns = updatedColumnsBuilder.build(); + // Compact tables have no row marker. So if we don't actually update any particular column, + // this means that we're only updating the PK, which we allow if only those were declared in + // the definition. In that case however, we do want to write the compactValueColumn (since again + // we can't use a "row marker") so add it automatically. 
+ if (cfm.isCompactTable() && updatedColumns.isEmpty() && updatesRegularRows()) + updatedColumns = cfm.partitionColumns(); + + conditionColumns = conditionColumnsBuilder.build(); + } + + public List<Operation> getRegularOperations() + { + return regularOperations; + } + + public List<Operation> getStaticOperations() + { + return staticOperations; + } + + public Iterable<Operation> allOperations() + { + return Iterables.concat(staticOperations, regularOperations); } public Iterable<ColumnDefinition> getColumnsWithConditions() @@ -205,8 +275,19 @@ public abstract class ModificationStatement implements CQLStatement staticConditions == null ? Collections.<ColumnDefinition>emptyList() : Iterables.transform(staticConditions, getColumnForCondition)); } + public Iterable<ColumnCondition> allConditions() + { + if (staticConditions == null) + return columnConditions == null ? Collections.<ColumnCondition>emptySet(): columnConditions; + if (columnConditions == null) + return staticConditions; + return Iterables.concat(staticConditions, columnConditions); + } + public void addCondition(ColumnCondition cond) { + conditionColumnsBuilder.add(cond.column); + List<ColumnCondition> conds = null; if (cond.column.isStatic()) { @@ -255,7 +336,7 @@ public abstract class ModificationStatement implements CQLStatement public void addKeyValue(ColumnDefinition def, Term value) throws InvalidRequestException { - addKeyValues(def, new SingleColumnRestriction.EQ(def, value)); + addKeyValues(def, new SingleColumnRestriction.EQRestriction(def, value)); } public void processWhereClause(List<Relation> whereClause, VariableSpecifications names) throws InvalidRequestException @@ -303,26 +384,25 @@ public abstract class ModificationStatement implements CQLStatement public List<ByteBuffer> buildPartitionKeyNames(QueryOptions options) throws InvalidRequestException { - CompositesBuilder keyBuilder = new CompositesBuilder(cfm.getKeyValidatorAsCType()); + MultiCBuilder keyBuilder = 
MultiCBuilder.create(cfm.getKeyValidatorAsClusteringComparator()); for (ColumnDefinition def : cfm.partitionKeyColumns()) { Restriction r = checkNotNull(processedKeys.get(def.name), "Missing mandatory PRIMARY KEY part %s", def.name); r.appendTo(keyBuilder, options); } - return Lists.transform(keyBuilder.build(), new com.google.common.base.Function<Composite, ByteBuffer>() + NavigableSet<Clustering> clusterings = keyBuilder.build(); + List<ByteBuffer> keys = new ArrayList<ByteBuffer>(clusterings.size()); + for (Clustering clustering : clusterings) { - @Override - public ByteBuffer apply(Composite composite) - { - ByteBuffer byteBuffer = composite.toByteBuffer(); - ThriftValidation.validateKey(cfm, byteBuffer); - return byteBuffer; - } - }); + ByteBuffer key = CFMetaData.serializePartitionKey(clustering); + ThriftValidation.validateKey(cfm, key); + keys.add(key); + } + return keys; } - public Composite createClusteringPrefix(QueryOptions options) + public CBuilder createClustering(QueryOptions options) throws InvalidRequestException { // If the only updated/deleted columns are static, then we don't need clustering columns. 
@@ -339,7 +419,7 @@ public abstract class ModificationStatement implements CQLStatement { // If we set no non-static columns, then it's fine not to have clustering columns if (hasNoClusteringColumns) - return cfm.comparator.staticPrefix(); + return CBuilder.STATIC_BUILDER; // If we do have clustering columns however, then either it's an INSERT and the query is valid // but we still need to build a proper prefix, or it's not an INSERT, and then we want to reject @@ -354,13 +434,15 @@ public abstract class ModificationStatement implements CQLStatement } } - return createClusteringPrefixBuilderInternal(options); + return createClusteringInternal(options); } - private Composite createClusteringPrefixBuilderInternal(QueryOptions options) + private CBuilder createClusteringInternal(QueryOptions options) throws InvalidRequestException { - CompositesBuilder builder = new CompositesBuilder(cfm.comparator); + CBuilder builder = CBuilder.create(cfm.comparator); + MultiCBuilder multiBuilder = MultiCBuilder.wrap(builder); + ColumnDefinition firstEmptyKey = null; for (ColumnDefinition def : cfm.clusteringColumns()) { @@ -368,7 +450,7 @@ public abstract class ModificationStatement implements CQLStatement if (r == null) { firstEmptyKey = def; - checkFalse(requireFullClusteringKey() && !cfm.comparator.isDense() && cfm.comparator.isCompound(), + checkFalse(requireFullClusteringKey() && !cfm.isDense() && cfm.isCompound(), "Missing mandatory PRIMARY KEY part %s", def.name); } else if (firstEmptyKey != null) @@ -377,10 +459,10 @@ public abstract class ModificationStatement implements CQLStatement } else { - r.appendTo(builder, options); + r.appendTo(multiBuilder, options); } } - return builder.build().get(0); // We only allow IN for row keys so far + return builder; } protected ColumnDefinition getFirstEmptyKey() @@ -396,14 +478,14 @@ public abstract class ModificationStatement implements CQLStatement public boolean requiresRead() { // Lists SET operation incurs a read. 
- for (Operation op : columnOperations) + for (Operation op : allOperations()) if (op.requiresRead()) return true; return false; } - protected Map<ByteBuffer, CQL3Row> readRequiredRows(Collection<ByteBuffer> partitionKeys, Composite clusteringPrefix, boolean local, ConsistencyLevel cl) + protected Map<DecoratedKey, Partition> readRequiredLists(Collection<ByteBuffer> partitionKeys, CBuilder cbuilder, boolean local, ConsistencyLevel cl) throws RequestExecutionException, RequestValidationException { if (!requiresRead()) @@ -418,32 +500,54 @@ public abstract class ModificationStatement implements CQLStatement throw new InvalidRequestException(String.format("Write operation require a read but consistency %s is not supported on reads", cl)); } - ColumnSlice[] slices = new ColumnSlice[]{ clusteringPrefix.slice() }; - List<ReadCommand> commands = new ArrayList<ReadCommand>(partitionKeys.size()); - long now = System.currentTimeMillis(); + // TODO: no point in recomputing that every time. Should move to preparation phase. + PartitionColumns.Builder builder = PartitionColumns.builder(); + for (Operation op : allOperations()) + if (op.requiresRead()) + builder.add(op.column); + + PartitionColumns toRead = builder.build(); + + NavigableSet<Clustering> clusterings = FBUtilities.singleton(cbuilder.build(), cfm.comparator); + List<SinglePartitionReadCommand<?>> commands = new ArrayList<>(partitionKeys.size()); + int nowInSec = FBUtilities.nowInSeconds(); for (ByteBuffer key : partitionKeys) - commands.add(new SliceFromReadCommand(keyspace(), - key, - columnFamily(), - now, - new SliceQueryFilter(slices, false, Integer.MAX_VALUE))); - - List<Row> rows = local - ? 
SelectStatement.readLocally(keyspace(), commands) - : StorageProxy.read(commands, cl); - - Map<ByteBuffer, CQL3Row> map = new HashMap<ByteBuffer, CQL3Row>(); - for (Row row : rows) + commands.add(new SinglePartitionNamesCommand(cfm, + nowInSec, + ColumnFilter.selection(toRead), + RowFilter.NONE, + DataLimits.NONE, + StorageService.getPartitioner().decorateKey(key), + new ClusteringIndexNamesFilter(clusterings, false))); + + Map<DecoratedKey, Partition> map = new HashMap(); + + SinglePartitionReadCommand.Group group = new SinglePartitionReadCommand.Group(commands, DataLimits.NONE); + + if (local) + { + try (ReadOrderGroup orderGroup = group.startOrderGroup(); PartitionIterator iter = group.executeInternal(orderGroup)) + { + return asMaterializedMap(iter); + } + } + else { - if (row.cf == null || row.cf.isEmpty()) - continue; + try (PartitionIterator iter = group.execute(cl, null)) + { + return asMaterializedMap(iter); + } + } + } - Iterator<CQL3Row> iter = cfm.comparator.CQL3RowBuilder(cfm, now).group(row.cf.getSortedColumns().iterator()); - if (iter.hasNext()) + private Map<DecoratedKey, Partition> asMaterializedMap(PartitionIterator iterator) + { + Map<DecoratedKey, Partition> map = new HashMap(); + while (iterator.hasNext()) + { + try (RowIterator partition = iterator.next()) { - map.put(row.key.getKey(), iter.next()); - // We can only update one CQ3Row per partition key at a time (we don't allow IN for clustering key) - assert !iter.hasNext(); + map.put(partition.partitionKey(), FilteredPartition.create(partition)); } } return map; @@ -492,14 +596,16 @@ public abstract class ModificationStatement implements CQLStatement { CQL3CasRequest request = makeCasRequest(queryState, options); - ColumnFamily result = StorageProxy.cas(keyspace(), - columnFamily(), - request.key, - request, - options.getSerialConsistency(), - options.getConsistency(), - queryState.getClientState()); - return new ResultMessage.Rows(buildCasResultSet(request.key, result, options)); + try 
(RowIterator result = StorageProxy.cas(keyspace(), + columnFamily(), + request.key, + request, + options.getSerialConsistency(), + options.getConsistency(), + queryState.getClientState())) + { + return new ResultMessage.Rows(buildCasResultSet(result, options)); + } } private CQL3CasRequest makeCasRequest(QueryState queryState, QueryOptions options) @@ -509,54 +615,54 @@ public abstract class ModificationStatement implements CQLStatement if (keys.size() > 1) throw new InvalidRequestException("IN on the partition key is not supported with conditional updates"); - ByteBuffer key = keys.get(0); + DecoratedKey key = StorageService.getPartitioner().decorateKey(keys.get(0)); long now = options.getTimestamp(queryState); - Composite prefix = createClusteringPrefix(options); + CBuilder cbuilder = createClustering(options); - CQL3CasRequest request = new CQL3CasRequest(cfm, key, false); - addConditions(prefix, request, options); - request.addRowUpdate(prefix, this, options, now); + CQL3CasRequest request = new CQL3CasRequest(cfm, key, false, conditionColumns(), updatesRegularRows(), updatesStaticRow()); + addConditions(cbuilder.build(), request, options); + request.addRowUpdate(cbuilder, this, options, now); return request; } - public void addConditions(Composite clusteringPrefix, CQL3CasRequest request, QueryOptions options) throws InvalidRequestException + public void addConditions(Clustering clustering, CQL3CasRequest request, QueryOptions options) throws InvalidRequestException { if (ifNotExists) { // If we use ifNotExists, if the statement applies to any non static columns, then the condition is on the row of the non-static - // columns and the prefix should be the clusteringPrefix. But if only static columns are set, then the ifNotExists apply to the existence + // columns and the prefix should be the clustering. 
But if only static columns are set, then the ifNotExists apply to the existence // of any static columns and we should use the prefix for the "static part" of the partition. - request.addNotExist(clusteringPrefix); + request.addNotExist(clustering); } else if (ifExists) { - request.addExist(clusteringPrefix); + request.addExist(clustering); } else { if (columnConditions != null) - request.addConditions(clusteringPrefix, columnConditions, options); + request.addConditions(clustering, columnConditions, options); if (staticConditions != null) - request.addConditions(cfm.comparator.staticPrefix(), staticConditions, options); + request.addConditions(Clustering.STATIC_CLUSTERING, staticConditions, options); } } - private ResultSet buildCasResultSet(ByteBuffer key, ColumnFamily cf, QueryOptions options) throws InvalidRequestException + private ResultSet buildCasResultSet(RowIterator partition, QueryOptions options) throws InvalidRequestException { - return buildCasResultSet(keyspace(), key, columnFamily(), cf, getColumnsWithConditions(), false, options); + return buildCasResultSet(keyspace(), columnFamily(), partition, getColumnsWithConditions(), false, options); } - public static ResultSet buildCasResultSet(String ksName, ByteBuffer key, String cfName, ColumnFamily cf, Iterable<ColumnDefinition> columnsWithConditions, boolean isBatch, QueryOptions options) + public static ResultSet buildCasResultSet(String ksName, String tableName, RowIterator partition, Iterable<ColumnDefinition> columnsWithConditions, boolean isBatch, QueryOptions options) throws InvalidRequestException { - boolean success = cf == null; + boolean success = partition == null; - ColumnSpecification spec = new ColumnSpecification(ksName, cfName, CAS_RESULT_COLUMN, BooleanType.instance); + ColumnSpecification spec = new ColumnSpecification(ksName, tableName, CAS_RESULT_COLUMN, BooleanType.instance); ResultSet.ResultMetadata metadata = new ResultSet.ResultMetadata(Collections.singletonList(spec)); 
List<List<ByteBuffer>> rows = Collections.singletonList(Collections.singletonList(BooleanType.instance.decompose(success))); ResultSet rs = new ResultSet(metadata, rows); - return success ? rs : merge(rs, buildCasFailureResultSet(key, cf, columnsWithConditions, isBatch, options)); + return success ? rs : merge(rs, buildCasFailureResultSet(partition, columnsWithConditions, isBatch, options)); } private static ResultSet merge(ResultSet left, ResultSet right) @@ -582,10 +688,10 @@ public abstract class ModificationStatement implements CQLStatement return new ResultSet(new ResultSet.ResultMetadata(specs), rows); } - private static ResultSet buildCasFailureResultSet(ByteBuffer key, ColumnFamily cf, Iterable<ColumnDefinition> columnsWithConditions, boolean isBatch, QueryOptions options) + private static ResultSet buildCasFailureResultSet(RowIterator partition, Iterable<ColumnDefinition> columnsWithConditions, boolean isBatch, QueryOptions options) throws InvalidRequestException { - CFMetaData cfm = cf.metadata(); + CFMetaData cfm = partition.metadata(); Selection selection; if (columnsWithConditions == null) { @@ -609,9 +715,8 @@ public abstract class ModificationStatement implements CQLStatement } - long now = System.currentTimeMillis(); - Selection.ResultSetBuilder builder = selection.resultSetBuilder(now, false); - SelectStatement.forSelection(cfm, selection).processColumnFamily(key, cf, options, now, builder); + Selection.ResultSetBuilder builder = selection.resultSetBuilder(false); + SelectStatement.forSelection(cfm, selection).processPartition(partition, options, builder, FBUtilities.nowInSeconds()); return builder.build(options.getProtocolVersion()); } @@ -640,31 +745,31 @@ public abstract class ModificationStatement implements CQLStatement public ResultMessage executeInternalWithCondition(QueryState state, QueryOptions options) throws RequestValidationException, RequestExecutionException { CQL3CasRequest request = makeCasRequest(state, options); - ColumnFamily 
result = casInternal(request, state); - return new ResultMessage.Rows(buildCasResultSet(request.key, result, options)); + try (RowIterator result = casInternal(request, state)) + { + return new ResultMessage.Rows(buildCasResultSet(result, options)); + } } - static ColumnFamily casInternal(CQL3CasRequest request, QueryState state) + static RowIterator casInternal(CQL3CasRequest request, QueryState state) { UUID ballot = UUIDGen.getTimeUUIDFromMicros(state.getTimestamp()); CFMetaData metadata = Schema.instance.getCFMetaData(request.cfm.ksName, request.cfm.cfName); - ReadCommand readCommand = ReadCommand.create(request.cfm.ksName, request.key, request.cfm.cfName, request.now, request.readFilter()); - Keyspace keyspace = Keyspace.open(request.cfm.ksName); - - Row row = readCommand.getRow(keyspace); - ColumnFamily current = row.cf; - if (!request.appliesTo(current)) + SinglePartitionReadCommand readCommand = request.readCommand(FBUtilities.nowInSeconds()); + FilteredPartition current; + try (ReadOrderGroup orderGroup = readCommand.startOrderGroup(); PartitionIterator iter = readCommand.executeInternal(orderGroup)) { - if (current == null) - current = ArrayBackedSortedColumns.factory.create(metadata); - return current; + current = FilteredPartition.create(PartitionIterators.getOnlyElement(iter, readCommand)); } - ColumnFamily updates = request.makeUpdates(current); - updates = TriggerExecutor.instance.execute(request.key, updates); + if (!request.appliesTo(current)) + return current.rowIterator(); + + PartitionUpdate updates = request.makeUpdates(current); + updates = TriggerExecutor.instance.execute(updates); - Commit proposal = Commit.newProposal(request.key, ballot, updates); + Commit proposal = Commit.newProposal(ballot, updates); proposal.makeMutation().apply(); return null; } @@ -683,17 +788,18 @@ public abstract class ModificationStatement implements CQLStatement throws RequestExecutionException, RequestValidationException { List<ByteBuffer> keys = 
buildPartitionKeyNames(options); - Composite clusteringPrefix = createClusteringPrefix(options); + CBuilder clustering = createClustering(options); - UpdateParameters params = makeUpdateParameters(keys, clusteringPrefix, options, local, now); + UpdateParameters params = makeUpdateParameters(keys, clustering, options, local, now); Collection<IMutation> mutations = new ArrayList<IMutation>(keys.size()); for (ByteBuffer key: keys) { ThriftValidation.validateKey(cfm, key); - ColumnFamily cf = ArrayBackedSortedColumns.factory.create(cfm); - addUpdateForKey(cf, key, clusteringPrefix, params); - Mutation mut = new Mutation(cfm.ksName, key, cf); + DecoratedKey dk = StorageService.getPartitioner().decorateKey(key); + PartitionUpdate upd = new PartitionUpdate(cfm, dk, updatedColumns(), 1); + addUpdateForKey(upd, clustering, params); + Mutation mut = new Mutation(upd); mutations.add(isCounter() ? new CounterMutation(mut, options.getConsistency()) : mut); } @@ -701,15 +807,15 @@ public abstract class ModificationStatement implements CQLStatement } public UpdateParameters makeUpdateParameters(Collection<ByteBuffer> keys, - Composite prefix, + CBuilder clustering, QueryOptions options, boolean local, long now) throws RequestExecutionException, RequestValidationException { // Some lists operation requires reading - Map<ByteBuffer, CQL3Row> rows = readRequiredRows(keys, prefix, local, options.getConsistency()); - return new UpdateParameters(cfm, options, getTimestamp(now, options), getTimeToLive(options), rows); + Map<DecoratedKey, Partition> lists = readRequiredLists(keys, clustering, local, options.getConsistency()); + return new UpdateParameters(cfm, options, getTimestamp(now, options), getTimeToLive(options), lists, true); } /** @@ -803,6 +909,8 @@ public abstract class ModificationStatement implements CQLStatement stmt.validateWhereClauseForConditions(); } + + stmt.finishPreparation(); return stmt; } 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java index e2708cd..6a7f429 100644 --- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java @@ -23,7 +23,8 @@ import java.util.*; import com.google.common.base.Objects; import com.google.common.base.Predicate; import com.google.common.collect.Iterables; -import com.google.common.collect.Iterators; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.apache.cassandra.auth.Permission; import org.apache.cassandra.config.CFMetaData; @@ -34,7 +35,8 @@ import org.apache.cassandra.cql3.restrictions.StatementRestrictions; import org.apache.cassandra.cql3.selection.RawSelector; import org.apache.cassandra.cql3.selection.Selection; import org.apache.cassandra.db.*; -import org.apache.cassandra.db.composites.*; +import org.apache.cassandra.db.rows.*; +import org.apache.cassandra.db.partitions.*; import org.apache.cassandra.db.filter.*; import org.apache.cassandra.db.index.SecondaryIndexManager; import org.apache.cassandra.db.marshal.CollectionType; @@ -43,12 +45,9 @@ import org.apache.cassandra.db.marshal.Int32Type; import org.apache.cassandra.dht.AbstractBounds; import org.apache.cassandra.exceptions.*; import org.apache.cassandra.serializers.MarshalException; -import org.apache.cassandra.service.ClientState; -import org.apache.cassandra.service.QueryState; -import org.apache.cassandra.service.StorageProxy; -import org.apache.cassandra.service.pager.Pageable; +import org.apache.cassandra.service.*; import org.apache.cassandra.service.pager.QueryPager; -import org.apache.cassandra.service.pager.QueryPagers; +import 
org.apache.cassandra.service.pager.PagingState; import org.apache.cassandra.thrift.ThriftValidation; import org.apache.cassandra.transport.messages.ResultMessage; import org.apache.cassandra.utils.ByteBufferUtil; @@ -71,6 +70,8 @@ import static org.apache.cassandra.utils.ByteBufferUtil.UNSET_BYTE_BUFFER; */ public class SelectStatement implements CQLStatement { + private static final Logger logger = LoggerFactory.getLogger(SelectStatement.class); + private static final int DEFAULT_COUNT_PAGE_SIZE = 10000; private final int boundTerms; @@ -88,6 +89,8 @@ public class SelectStatement implements CQLStatement */ private final Comparator<List<ByteBuffer>> orderingComparator; + private final ColumnFilter queriedColumns; + // Used by forSelection below private static final Parameters defaultParameters = new Parameters(Collections.<ColumnIdentifier.Raw, Boolean>emptyMap(), false, false, false); @@ -108,6 +111,7 @@ public class SelectStatement implements CQLStatement this.orderingComparator = orderingComparator; this.parameters = parameters; this.limit = limit; + this.queriedColumns = gatherQueriedColumns(); } public Iterable<Function> getFunctions() @@ -117,6 +121,23 @@ public class SelectStatement implements CQLStatement limit != null ? limit.getFunctions() : Collections.<Function>emptySet()); } + // Note that the queried columns internally is different from the one selected by the + // user as it also include any column for which we have a restriction on. 
+ private ColumnFilter gatherQueriedColumns() + { + if (selection.isWildcard()) + return ColumnFilter.all(cfm); + + ColumnFilter.Builder builder = ColumnFilter.allColumnsBuilder(cfm); + // Adds all selected columns + for (ColumnDefinition def : selection.getColumns()) + if (!def.isPrimaryKeyColumn()) + builder.add(def); + // as well as any restricted column (so we can actually apply the restriction) + builder.addAll(restrictions.nonPKRestrictedColumns()); + return builder.build(); + } + // Creates a simple select based on the given selection. // Note that the results select statement should not be used for actual queries, but only for processing already // queried data through processColumnFamily. @@ -161,31 +182,16 @@ public class SelectStatement implements CQLStatement cl.validateForRead(keyspace()); - int limit = getLimit(options); - long now = System.currentTimeMillis(); - Pageable command = getPageableCommand(options, limit, now); - int pageSize = getPageSize(options); + int nowInSec = FBUtilities.nowInSeconds(); + ReadQuery query = getQuery(options, nowInSec); - if (pageSize <= 0 || command == null || !QueryPagers.mayNeedPaging(command, pageSize)) - return execute(command, options, limit, now, state); - - QueryPager pager = QueryPagers.pager(command, cl, state.getClientState(), options.getPagingState()); - return execute(pager, options, limit, now, pageSize); - } + int pageSize = getPageSize(options); - private Pageable getPageableCommand(QueryOptions options, int limit, long now) throws RequestValidationException - { - int limitForQuery = updateLimitForQuery(limit); - if (restrictions.isKeyRange() || restrictions.usesSecondaryIndexing()) - return getRangeCommand(options, limitForQuery, now); + if (pageSize <= 0 || query.limits().count() <= pageSize) + return execute(query, options, state, nowInSec); - List<ReadCommand> commands = getSliceCommands(options, limitForQuery, now); - return commands == null ? 
null : new Pageable.ReadCommands(commands, limitForQuery); - } - - public Pageable getPageableCommand(QueryOptions options) throws RequestValidationException - { - return getPageableCommand(options, getLimit(options), System.currentTimeMillis()); + QueryPager pager = query.getPager(options.getPagingState()); + return execute(Pager.forDistributedQuery(pager, cl, state.getClientState()), options, pageSize, nowInSec); } private int getPageSize(QueryOptions options) @@ -201,103 +207,162 @@ public class SelectStatement implements CQLStatement return pageSize; } - private ResultMessage.Rows execute(Pageable command, QueryOptions options, int limit, long now, QueryState state) - throws RequestValidationException, RequestExecutionException + public ReadQuery getQuery(QueryOptions options, int nowInSec) throws RequestValidationException + { + DataLimits limit = getLimit(options); + if (restrictions.isKeyRange() || restrictions.usesSecondaryIndexing()) + return getRangeCommand(options, limit, nowInSec); + + return getSliceCommands(options, limit, nowInSec); + } + + private ResultMessage.Rows execute(ReadQuery query, QueryOptions options, QueryState state, int nowInSec) throws RequestValidationException, RequestExecutionException { - List<Row> rows; - if (command == null) + try (PartitionIterator data = query.execute(options.getConsistency(), state.getClientState())) { - rows = Collections.<Row>emptyList(); + return processResults(data, options, nowInSec); } - else + } + + // Simple wrapper class to avoid some code duplication + private static abstract class Pager + { + protected QueryPager pager; + + protected Pager(QueryPager pager) + { + this.pager = pager; + } + + public static Pager forInternalQuery(QueryPager pager, ReadOrderGroup orderGroup) + { + return new InternalPager(pager, orderGroup); + } + + public static Pager forDistributedQuery(QueryPager pager, ConsistencyLevel consistency, ClientState clientState) + { + return new NormalPager(pager, consistency, 
clientState); + } + + public boolean isExhausted() + { + return pager.isExhausted(); + } + + public PagingState state() + { + return pager.state(); + } + + public abstract PartitionIterator fetchPage(int pageSize); + + public static class NormalPager extends Pager { - rows = command instanceof Pageable.ReadCommands - ? StorageProxy.read(((Pageable.ReadCommands)command).commands, options.getConsistency(), state.getClientState()) - : StorageProxy.getRangeSlice((RangeSliceCommand)command, options.getConsistency()); + private final ConsistencyLevel consistency; + private final ClientState clientState; + + private NormalPager(QueryPager pager, ConsistencyLevel consistency, ClientState clientState) + { + super(pager); + this.consistency = consistency; + this.clientState = clientState; + } + + public PartitionIterator fetchPage(int pageSize) + { + return pager.fetchPage(pageSize, consistency, clientState); + } } - return processResults(rows, options, limit, now); + public static class InternalPager extends Pager + { + private final ReadOrderGroup orderGroup; + + private InternalPager(QueryPager pager, ReadOrderGroup orderGroup) + { + super(pager); + this.orderGroup = orderGroup; + } + + public PartitionIterator fetchPage(int pageSize) + { + return pager.fetchPageInternal(pageSize, orderGroup); + } + } } - private ResultMessage.Rows execute(QueryPager pager, QueryOptions options, int limit, long now, int pageSize) + private ResultMessage.Rows execute(Pager pager, QueryOptions options, int pageSize, int nowInSec) throws RequestValidationException, RequestExecutionException { if (selection.isAggregate()) - return pageAggregateQuery(pager, options, pageSize, now); + return pageAggregateQuery(pager, options, pageSize, nowInSec); // We can't properly do post-query ordering if we page (see #6722) checkFalse(needsPostQueryOrdering(), - "Cannot page queries with both ORDER BY and a IN restriction on the partition key;" - + " you must either remove the ORDER BY or the IN and sort 
client side, or disable paging for this query"); + "Cannot page queries with both ORDER BY and a IN restriction on the partition key;" + + " you must either remove the ORDER BY or the IN and sort client side, or disable paging for this query"); - List<Row> page = pager.fetchPage(pageSize); - ResultMessage.Rows msg = processResults(page, options, limit, now); + ResultMessage.Rows msg; + try (PartitionIterator page = pager.fetchPage(pageSize)) + { + msg = processResults(page, options, nowInSec); + } + // Please note that the isExhausted state of the pager only gets updated when we've closed the page, so this + // shouldn't be moved inside the 'try' above. if (!pager.isExhausted()) msg.result.metadata.setHasMorePages(pager.state()); return msg; } - private ResultMessage.Rows pageAggregateQuery(QueryPager pager, QueryOptions options, int pageSize, long now) - throws RequestValidationException, RequestExecutionException + private ResultMessage.Rows pageAggregateQuery(Pager pager, QueryOptions options, int pageSize, int nowInSec) + throws RequestValidationException, RequestExecutionException { - Selection.ResultSetBuilder result = selection.resultSetBuilder(now, parameters.isJson); + Selection.ResultSetBuilder result = selection.resultSetBuilder(parameters.isJson); while (!pager.isExhausted()) { - for (org.apache.cassandra.db.Row row : pager.fetchPage(pageSize)) + try (PartitionIterator iter = pager.fetchPage(pageSize)) { - // Not columns match the query, skip - if (row.cf == null) - continue; - - processColumnFamily(row.key.getKey(), row.cf, options, now, result); + while (iter.hasNext()) + processPartition(iter.next(), options, result, nowInSec); } } return new ResultMessage.Rows(result.build(options.getProtocolVersion())); } - public ResultMessage.Rows processResults(List<Row> rows, QueryOptions options, int limit, long now) throws RequestValidationException + private ResultMessage.Rows processResults(PartitionIterator partitions, QueryOptions options, int nowInSec) 
throws RequestValidationException { - ResultSet rset = process(rows, options, limit, now); + ResultSet rset = process(partitions, options, nowInSec); return new ResultMessage.Rows(rset); } - static List<Row> readLocally(String keyspaceName, List<ReadCommand> cmds) - { - Keyspace keyspace = Keyspace.open(keyspaceName); - List<Row> rows = new ArrayList<Row>(cmds.size()); - for (ReadCommand cmd : cmds) - rows.add(cmd.getRow(keyspace)); - return rows; - } - public ResultMessage.Rows executeInternal(QueryState state, QueryOptions options) throws RequestExecutionException, RequestValidationException { - int limit = getLimit(options); - long now = System.currentTimeMillis(); - Pageable command = getPageableCommand(options, limit, now); + int nowInSec = FBUtilities.nowInSeconds(); + ReadQuery query = getQuery(options, nowInSec); int pageSize = getPageSize(options); - if (pageSize <= 0 || command == null || !QueryPagers.mayNeedPaging(command, pageSize)) + try (ReadOrderGroup orderGroup = query.startOrderGroup()) { - List<Row> rows = command == null - ? Collections.<Row>emptyList() - : (command instanceof Pageable.ReadCommands - ? 
readLocally(keyspace(), ((Pageable.ReadCommands)command).commands) - : ((RangeSliceCommand)command).executeLocally()); - - return processResults(rows, options, limit, now); + if (pageSize <= 0 || query.limits().count() <= pageSize) + { + try (PartitionIterator data = query.executeInternal(orderGroup)) + { + return processResults(data, options, nowInSec); + } + } + else + { + QueryPager pager = query.getPager(options.getPagingState()); + return execute(Pager.forInternalQuery(pager, orderGroup), options, pageSize, nowInSec); + } } - - QueryPager pager = QueryPagers.localPager(command); - return execute(pager, options, limit, now, pageSize); } - public ResultSet process(List<Row> rows) throws InvalidRequestException + public ResultSet process(PartitionIterator partitions, int nowInSec) throws InvalidRequestException { - QueryOptions options = QueryOptions.DEFAULT; - return process(rows, options, getLimit(options), System.currentTimeMillis()); + return process(partitions, QueryOptions.DEFAULT, nowInSec); } public String keyspace() @@ -326,372 +391,239 @@ public class SelectStatement implements CQLStatement return restrictions; } - private List<ReadCommand> getSliceCommands(QueryOptions options, int limit, long now) throws RequestValidationException + private ReadQuery getSliceCommands(QueryOptions options, DataLimits limit, int nowInSec) throws RequestValidationException { Collection<ByteBuffer> keys = restrictions.getPartitionKeys(options); + if (keys.isEmpty()) + return ReadQuery.EMPTY; - List<ReadCommand> commands = new ArrayList<>(keys.size()); - - IDiskAtomFilter filter = makeFilter(options, limit); + ClusteringIndexFilter filter = makeClusteringIndexFilter(options); if (filter == null) - return null; + return ReadQuery.EMPTY; + + RowFilter rowFilter = getRowFilter(options); // Note that we use the total limit for every key, which is potentially inefficient. // However, IN + LIMIT is not a very sensible choice. 
+ List<SinglePartitionReadCommand<?>> commands = new ArrayList<>(keys.size()); for (ByteBuffer key : keys) { QueryProcessor.validateKey(key); - // We should not share the slice filter amongst the commands (hence the cloneShallow), due to - // SliceQueryFilter not being immutable due to its columnCounter used by the lastCounted() method - // (this is fairly ugly and we should change that but that's probably not a tiny refactor to do that cleanly) - commands.add(ReadCommand.create(keyspace(), ByteBufferUtil.clone(key), columnFamily(), now, filter.cloneShallow())); + DecoratedKey dk = StorageService.getPartitioner().decorateKey(ByteBufferUtil.clone(key)); + commands.add(SinglePartitionReadCommand.create(cfm, nowInSec, queriedColumns, rowFilter, limit, dk, filter)); } - return commands; + return new SinglePartitionReadCommand.Group(commands, limit); } - private RangeSliceCommand getRangeCommand(QueryOptions options, int limit, long now) throws RequestValidationException + private ReadQuery getRangeCommand(QueryOptions options, DataLimits limit, int nowInSec) throws RequestValidationException { - IDiskAtomFilter filter = makeFilter(options, limit); - if (filter == null) - return null; + ClusteringIndexFilter clusteringIndexFilter = makeClusteringIndexFilter(options); + if (clusteringIndexFilter == null) + return ReadQuery.EMPTY; - List<IndexExpression> expressions = getValidatedIndexExpressions(options); + RowFilter rowFilter = getRowFilter(options); // The LIMIT provided by the user is the number of CQL row he wants returned. // We want to have getRangeSlice to count the number of columns, not the number of keys. - AbstractBounds<RowPosition> keyBounds = restrictions.getPartitionKeyBounds(options); + AbstractBounds<PartitionPosition> keyBounds = restrictions.getPartitionKeyBounds(options); return keyBounds == null - ? 
null - : new RangeSliceCommand(keyspace(), columnFamily(), now, filter, keyBounds, expressions, limit, !parameters.isDistinct, false); - } - - private ColumnSlice makeStaticSlice() - { - // Note: we could use staticPrefix.start() for the start bound, but EMPTY gives us the - // same effect while saving a few CPU cycles. - return isReversed - ? new ColumnSlice(cfm.comparator.staticPrefix().end(), Composites.EMPTY) - : new ColumnSlice(Composites.EMPTY, cfm.comparator.staticPrefix().end()); + ? ReadQuery.EMPTY + : new PartitionRangeReadCommand(cfm, nowInSec, queriedColumns, rowFilter, limit, new DataRange(keyBounds, clusteringIndexFilter)); } - private IDiskAtomFilter makeFilter(QueryOptions options, int limit) + private ClusteringIndexFilter makeClusteringIndexFilter(QueryOptions options) throws InvalidRequestException { - int toGroup = cfm.comparator.isDense() ? -1 : cfm.clusteringColumns().size(); if (parameters.isDistinct) { - // For distinct, we only care about fetching the beginning of each partition. If we don't have - // static columns, we in fact only care about the first cell, so we query only that (we don't "group"). - // If we do have static columns, we do need to fetch the first full group (to have the static columns values). - - // See the comments on IGNORE_TOMBSTONED_PARTITIONS and CASSANDRA-8490 for why we use a special value for - // DISTINCT queries on the partition key only. - toGroup = selection.containsStaticColumns() ? toGroup : SliceQueryFilter.IGNORE_TOMBSTONED_PARTITIONS; - return new SliceQueryFilter(ColumnSlice.ALL_COLUMNS_ARRAY, false, 1, toGroup); + // We need to be able to distinguish between partition having live rows and those that don't. But + // doing so is not trivial since "having a live row" depends potentially on + // 1) when the query is performed, due to TTLs + // 2) how thing reconcile together between different nodes + // so that it's hard to really optimize properly internally. 
So to keep it simple, we simply query + // for the first row of the partition and hence uses Slices.ALL. We'll limit it to the first live + // row however in getLimit(). + return new ClusteringIndexSliceFilter(Slices.ALL, false); } - else if (restrictions.isColumnRange()) - { - List<Composite> startBounds = restrictions.getClusteringColumnsBoundsAsComposites(Bound.START, options); - List<Composite> endBounds = restrictions.getClusteringColumnsBoundsAsComposites(Bound.END, options); - assert startBounds.size() == endBounds.size(); - - // Handles fetching static columns. Note that for 2i, the filter is just used to restrict - // the part of the index to query so adding the static slice would be useless and confusing. - // For 2i, static columns are retrieve in CompositesSearcher with each index hit. - ColumnSlice staticSlice = selection.containsStaticColumns() && !restrictions.usesSecondaryIndexing() - ? makeStaticSlice() - : null; - - // The case where startBounds == 1 is common enough that it's worth optimizing - if (startBounds.size() == 1) - { - ColumnSlice slice = new ColumnSlice(startBounds.get(0), endBounds.get(0)); - if (slice.isAlwaysEmpty(cfm.comparator, isReversed)) - return staticSlice == null ? null : sliceFilter(staticSlice, limit, toGroup); - - if (staticSlice == null) - return sliceFilter(slice, limit, toGroup); - - if (isReversed) - return slice.includes(cfm.comparator.reverseComparator(), staticSlice.start) - ? sliceFilter(new ColumnSlice(slice.start, staticSlice.finish), limit, toGroup) - : sliceFilter(new ColumnSlice[]{ slice, staticSlice }, limit, toGroup); - else - return slice.includes(cfm.comparator, staticSlice.finish) - ? 
sliceFilter(new ColumnSlice(staticSlice.start, slice.finish), limit, toGroup) - : sliceFilter(new ColumnSlice[]{ staticSlice, slice }, limit, toGroup); - } - - List<ColumnSlice> l = new ArrayList<ColumnSlice>(startBounds.size()); - for (int i = 0; i < startBounds.size(); i++) - { - ColumnSlice slice = new ColumnSlice(startBounds.get(i), endBounds.get(i)); - if (!slice.isAlwaysEmpty(cfm.comparator, isReversed)) - l.add(slice); - } - if (l.isEmpty()) - return staticSlice == null ? null : sliceFilter(staticSlice, limit, toGroup); - if (staticSlice == null) - return sliceFilter(l.toArray(new ColumnSlice[l.size()]), limit, toGroup); + if (restrictions.isColumnRange()) + { + Slices slices = makeSlices(options); + if (slices == Slices.NONE && !selection.containsStaticColumns()) + return null; - // The slices should not overlap. We know the slices built from startBounds/endBounds don't, but if there is - // a static slice, it could overlap with the 2nd slice. Check for it and correct if that's the case - ColumnSlice[] slices; - if (isReversed) - { - if (l.get(l.size() - 1).includes(cfm.comparator.reverseComparator(), staticSlice.start)) - { - slices = l.toArray(new ColumnSlice[l.size()]); - slices[slices.length-1] = new ColumnSlice(slices[slices.length-1].start, Composites.EMPTY); - } - else - { - slices = l.toArray(new ColumnSlice[l.size()+1]); - slices[slices.length-1] = staticSlice; - } - } - else - { - if (l.get(0).includes(cfm.comparator, staticSlice.finish)) - { - slices = new ColumnSlice[l.size()]; - slices[0] = new ColumnSlice(Composites.EMPTY, l.get(0).finish); - for (int i = 1; i < l.size(); i++) - slices[i] = l.get(i); - } - else - { - slices = new ColumnSlice[l.size()+1]; - slices[0] = staticSlice; - for (int i = 0; i < l.size(); i++) - slices[i+1] = l.get(i); - } - } - return sliceFilter(slices, limit, toGroup); + return new ClusteringIndexSliceFilter(slices, isReversed); } else { - SortedSet<CellName> cellNames = getRequestedColumns(options); - if (cellNames 
== null) // in case of IN () for the last column of the key + NavigableSet<Clustering> clusterings = getRequestedRows(options); + if (clusterings.isEmpty() && !selection.containsStaticColumns()) // in case of IN () for the last column of the key return null; - QueryProcessor.validateCellNames(cellNames, cfm.comparator); - return new NamesQueryFilter(cellNames, true); + + return new ClusteringIndexNamesFilter(clusterings, isReversed); } } - private SliceQueryFilter sliceFilter(ColumnSlice slice, int limit, int toGroup) + private Slices makeSlices(QueryOptions options) + throws InvalidRequestException { - return sliceFilter(new ColumnSlice[]{ slice }, limit, toGroup); - } + SortedSet<Slice.Bound> startBounds = restrictions.getClusteringColumnsBounds(Bound.START, options); + SortedSet<Slice.Bound> endBounds = restrictions.getClusteringColumnsBounds(Bound.END, options); + assert startBounds.size() == endBounds.size(); - private SliceQueryFilter sliceFilter(ColumnSlice[] slices, int limit, int toGroup) - { - assert ColumnSlice.validateSlices(slices, cfm.comparator, isReversed) : String.format("Invalid slices: " + Arrays.toString(slices) + (isReversed ? " (reversed)" : "")); - return new SliceQueryFilter(slices, isReversed, limit, toGroup); + // The case where startBounds == 1 is common enough that it's worth optimizing + if (startBounds.size() == 1) + { + Slice.Bound start = startBounds.first(); + Slice.Bound end = endBounds.first(); + return cfm.comparator.compare(start, end) > 0 + ? 
Slices.NONE + : Slices.with(cfm.comparator, Slice.make(start, end)); + } + + Slices.Builder builder = new Slices.Builder(cfm.comparator, startBounds.size()); + Iterator<Slice.Bound> startIter = startBounds.iterator(); + Iterator<Slice.Bound> endIter = endBounds.iterator(); + while (startIter.hasNext() && endIter.hasNext()) + { + Slice.Bound start = startIter.next(); + Slice.Bound end = endIter.next(); + + // Ignore slices that are nonsensical + if (cfm.comparator.compare(start, end) > 0) + continue; + + builder.add(start, end); + } + + return builder.build(); } /** * May be used by custom QueryHandler implementations */ - public int getLimit(QueryOptions options) throws InvalidRequestException + public DataLimits getLimit(QueryOptions options) throws InvalidRequestException { - if (limit != null) + int userLimit = -1; + // If we aggregate, the limit really apply to the number of rows returned to the user, not to what is queried, and + // since in practice we currently only aggregate at top level (we have no GROUP BY support yet), we'll only ever + // return 1 result and can therefore basically ignore the user LIMIT in this case. + // Whenever we support GROUP BY, we'll have to add a new DataLimits kind that knows how things are grouped and is thus + // able to apply the user limit properly. 
+ if (limit != null && !selection.isAggregate()) { ByteBuffer b = checkNotNull(limit.bindAndGet(options), "Invalid null value of limit"); // treat UNSET limit value as 'unlimited' - if (b == UNSET_BYTE_BUFFER) - return Integer.MAX_VALUE; - try - { - Int32Type.instance.validate(b); - int l = Int32Type.instance.compose(b); - checkTrue(l > 0, "LIMIT must be strictly positive"); - return l; - } - catch (MarshalException e) + if (b != UNSET_BYTE_BUFFER) { - throw new InvalidRequestException("Invalid limit value"); + try + { + Int32Type.instance.validate(b); + userLimit = Int32Type.instance.compose(b); + checkTrue(userLimit > 0, "LIMIT must be strictly positive"); + } + catch (MarshalException e) + { + throw new InvalidRequestException("Invalid limit value"); + } } } - return Integer.MAX_VALUE; - } - private int updateLimitForQuery(int limit) - { - // Internally, we don't support exclusive bounds for slices. Instead, we query one more element if necessary - // and exclude it later (in processColumnFamily) - return restrictions.isNonCompositeSliceWithExclusiveBounds() && limit != Integer.MAX_VALUE - ? limit + 1 - : limit; + if (parameters.isDistinct) + return userLimit < 0 ? DataLimits.DISTINCT_NONE : DataLimits.distinctLimits(userLimit); + + return userLimit < 0 ? 
DataLimits.NONE : DataLimits.cqlLimits(userLimit); } - private SortedSet<CellName> getRequestedColumns(QueryOptions options) throws InvalidRequestException + private NavigableSet<Clustering> getRequestedRows(QueryOptions options) throws InvalidRequestException { // Note: getRequestedColumns don't handle static columns, but due to CASSANDRA-5762 // we always do a slice for CQL3 tables, so it's ok to ignore them here assert !restrictions.isColumnRange(); - SortedSet<CellName> columns = new TreeSet<CellName>(cfm.comparator); - for (Composite composite : restrictions.getClusteringColumnsAsComposites(options)) - columns.addAll(addSelectedColumns(composite)); - return columns; - } - - private SortedSet<CellName> addSelectedColumns(Composite prefix) - { - if (cfm.comparator.isDense()) - { - return FBUtilities.singleton(cfm.comparator.create(prefix, null), cfm.comparator); - } - else - { - SortedSet<CellName> columns = new TreeSet<CellName>(cfm.comparator); - - // We need to query the selected column as well as the marker - // column (for the case where the row exists but has no columns outside the PK) - // Two exceptions are "static CF" (non-composite non-compact CF) and "super CF" - // that don't have marker and for which we must query all columns instead - if (cfm.comparator.isCompound() && !cfm.isSuper()) - { - // marker - columns.add(cfm.comparator.rowMarker(prefix)); - - // selected columns - for (ColumnDefinition def : selection.getColumns()) - if (def.isRegular() || def.isStatic()) - columns.add(cfm.comparator.create(prefix, def)); - } - else - { - // We now that we're not composite so we can ignore static columns - for (ColumnDefinition def : cfm.regularColumns()) - columns.add(cfm.comparator.create(prefix, def)); - } - return columns; - } + return restrictions.getClusteringColumns(options); } /** * May be used by custom QueryHandler implementations */ - public List<IndexExpression> getValidatedIndexExpressions(QueryOptions options) throws InvalidRequestException 
+ public RowFilter getRowFilter(QueryOptions options) throws InvalidRequestException { - if (!restrictions.usesSecondaryIndexing()) - return Collections.emptyList(); - ColumnFamilyStore cfs = Keyspace.open(keyspace()).getColumnFamilyStore(columnFamily()); SecondaryIndexManager secondaryIndexManager = cfs.indexManager; - - List<IndexExpression> expressions = restrictions.getIndexExpressions(secondaryIndexManager, options); - - secondaryIndexManager.validateIndexSearchersForQuery(expressions); - - return expressions; - } - - private CellName makeExclusiveSliceBound(Bound bound, CellNameType type, QueryOptions options) throws InvalidRequestException - { - if (restrictions.areRequestedBoundsInclusive(bound)) - return null; - - return type.makeCellName(restrictions.getClusteringColumnsBounds(bound, options).get(0)); + RowFilter filter = restrictions.getRowFilter(secondaryIndexManager, options); + secondaryIndexManager.validateFilter(filter); + return filter; } - private Iterator<Cell> applySliceRestriction(final Iterator<Cell> cells, final QueryOptions options) throws InvalidRequestException + private ResultSet process(PartitionIterator partitions, QueryOptions options, int nowInSec) throws InvalidRequestException { - final CellNameType type = cfm.comparator; - - final CellName excludedStart = makeExclusiveSliceBound(Bound.START, type, options); - final CellName excludedEnd = makeExclusiveSliceBound(Bound.END, type, options); - - return Iterators.filter(cells, new Predicate<Cell>() + Selection.ResultSetBuilder result = selection.resultSetBuilder(parameters.isJson); + while (partitions.hasNext()) { - public boolean apply(Cell c) + try (RowIterator partition = partitions.next()) { - // For dynamic CF, the column could be out of the requested bounds (because we don't support strict bounds internally (unless - // the comparator is composite that is)), filter here - return !((excludedStart != null && type.compare(c.name(), excludedStart) == 0) - || (excludedEnd != null && 
type.compare(c.name(), excludedEnd) == 0)); + processPartition(partition, options, result, nowInSec); } - }); - } - - private ResultSet process(List<Row> rows, QueryOptions options, int limit, long now) throws InvalidRequestException - { - Selection.ResultSetBuilder result = selection.resultSetBuilder(now, parameters.isJson); - for (org.apache.cassandra.db.Row row : rows) - { - // Not columns match the query, skip - if (row.cf == null) - continue; - - processColumnFamily(row.key.getKey(), row.cf, options, now, result); } ResultSet cqlRows = result.build(options.getProtocolVersion()); orderResults(cqlRows); - // Internal calls always return columns in the comparator order, even when reverse was set - if (isReversed) - cqlRows.reverse(); - - // Trim result if needed to respect the user limit - cqlRows.trim(limit); return cqlRows; } - // Used by ModificationStatement for CAS operations - void processColumnFamily(ByteBuffer key, ColumnFamily cf, QueryOptions options, long now, Selection.ResultSetBuilder result) - throws InvalidRequestException + public static ByteBuffer[] getComponents(CFMetaData cfm, DecoratedKey dk) { - CFMetaData cfm = cf.metadata(); - ByteBuffer[] keyComponents = null; + ByteBuffer key = dk.getKey(); if (cfm.getKeyValidator() instanceof CompositeType) { - keyComponents = ((CompositeType)cfm.getKeyValidator()).split(key); + return ((CompositeType)cfm.getKeyValidator()).split(key); } else { - keyComponents = new ByteBuffer[]{ key }; + return new ByteBuffer[]{ key }; } + } - Iterator<Cell> cells = cf.getSortedColumns().iterator(); - if (restrictions.isNonCompositeSliceWithExclusiveBounds()) - cells = applySliceRestriction(cells, options); - + // Used by ModificationStatement for CAS operations + void processPartition(RowIterator partition, QueryOptions options, Selection.ResultSetBuilder result, int nowInSec) + throws InvalidRequestException + { int protocolVersion = options.getProtocolVersion(); - CQL3Row.RowIterator iter = 
cfm.comparator.CQL3RowBuilder(cfm, now).group(cells); - // If there is static columns but there is no non-static row, then provided the select was a full - // partition selection (i.e. not a 2ndary index search and there was no condition on clustering columns) - // then we want to include the static columns in the result set (and we're done). - CQL3Row staticRow = iter.getStaticRow(); - if (staticRow != null && !iter.hasNext() && !restrictions.usesSecondaryIndexing() && restrictions.hasNoClusteringColumnsRestriction()) + ByteBuffer[] keyComponents = getComponents(cfm, partition.partitionKey()); + + Row staticRow = partition.staticRow().takeAlias(); + // If there is no rows, then provided the select was a full partition selection + // (i.e. not a 2ndary index search and there was no condition on clustering columns), + // we want to include static columns and we're done. + if (!partition.hasNext()) { - result.newRow(protocolVersion); - for (ColumnDefinition def : selection.getColumns()) + if (!staticRow.isEmpty() && (!restrictions.usesSecondaryIndexing() || cfm.isStaticCompactTable()) && !restrictions.hasClusteringColumnsRestriction()) { - switch (def.kind) + result.newRow(protocolVersion); + for (ColumnDefinition def : selection.getColumns()) { - case PARTITION_KEY: - result.add(keyComponents[def.position()]); - break; - case STATIC: - addValue(result, def, staticRow, options); - break; - default: - result.add((ByteBuffer)null); + switch (def.kind) + { + case PARTITION_KEY: + result.add(keyComponents[def.position()]); + break; + case STATIC: + addValue(result, def, staticRow, nowInSec, protocolVersion); + break; + default: + result.add((ByteBuffer)null); + } } } return; } - while (iter.hasNext()) + while (partition.hasNext()) { - CQL3Row cql3Row = iter.next(); - - // Respect requested order + Row row = partition.next(); result.newRow(protocolVersion); // Respect selection order for (ColumnDefinition def : selection.getColumns()) @@ -702,41 +634,35 @@ public class 
SelectStatement implements CQLStatement result.add(keyComponents[def.position()]); break; case CLUSTERING_COLUMN: - result.add(cql3Row.getClusteringColumn(def.position())); - break; - case COMPACT_VALUE: - result.add(cql3Row.getColumn(null)); + result.add(row.clustering().get(def.position())); break; case REGULAR: - addValue(result, def, cql3Row, options); + addValue(result, def, row, nowInSec, protocolVersion); break; case STATIC: - addValue(result, def, staticRow, options); + addValue(result, def, staticRow, nowInSec, protocolVersion); break; } } } } - private static void addValue(Selection.ResultSetBuilder result, ColumnDefinition def, CQL3Row row, QueryOptions options) + private static void addValue(Selection.ResultSetBuilder result, ColumnDefinition def, Row row, int nowInSec, int protocolVersion) { - if (row == null) + if (def.isComplex()) { - result.add((ByteBuffer)null); - return; + // Collections are the only complex types we have so far + assert def.type.isCollection() && def.type.isMultiCell(); + Iterator<Cell> cells = row.getCells(def); + if (cells == null) + result.add((ByteBuffer)null); + else + result.add(((CollectionType)def.type).serializeForNativeProtocol(def, cells, protocolVersion)); } - - if (def.type.isMultiCell()) + else { - List<Cell> cells = row.getMultiCellColumn(def.name); - ByteBuffer buffer = cells == null - ? 
null - : ((CollectionType)def.type).serializeForNativeProtocol(def, cells, options.getProtocolVersion()); - result.add(buffer); - return; + result.add(row.getCell(def), nowInSec); } - - result.add(row.getColumn(def.name)); } private boolean needsPostQueryOrdering() @@ -796,9 +722,6 @@ public class SelectStatement implements CQLStatement isReversed = isReversed(cfm); } - if (isReversed) - restrictions.reverse(); - checkNeedsFiltering(restrictions); SelectStatement stmt = new SelectStatement(cfm, @@ -832,7 +755,8 @@ public class SelectStatement implements CQLStatement whereClause, boundNames, selection.containsOnlyStaticColumns(), - selection.containsACollection()); + selection.containsACollection(), + parameters.allowFiltering); } catch (UnrecognizedEntityException e) { @@ -980,47 +904,12 @@ public class SelectStatement implements CQLStatement { // We will potentially filter data if either: // - Have more than one IndexExpression - // - Have no index expression and the column filter is not the identity + // - Have no index expression and the row filter is not the identity checkFalse(restrictions.needFiltering(), "Cannot execute this query as it might involve data filtering and " + "thus may have unpredictable performance. If you want to execute " + "this query despite the performance unpredictability, use ALLOW FILTERING"); } - - // We don't internally support exclusive slice bounds on non-composite tables. To deal with it we do an - // inclusive slice and remove post-query the value that shouldn't be returned. One problem however is that - // if there is a user limit, that limit may make the query return before the end of the slice is reached, - // in which case, once we'll have removed bound post-query, we might end up with less results than - // requested which would be incorrect. For single-partition query, this is not a problem, we just ask for - // one more result (see updateLimitForQuery()) since that's enough to compensate for that problem. 
For key - // range however, each returned row may include one result that will have to be trimmed, so we would have - // to bump the query limit by N where N is the number of rows we will return, but we don't know that in - // advance. So, since we currently don't have a good way to handle such query, we refuse it (#7059) rather - // than answering with something that is wrong. - if (restrictions.isNonCompositeSliceWithExclusiveBounds() && restrictions.isKeyRange() && limit != null) - { - SingleColumnRelation rel = findInclusiveClusteringRelationForCompact(restrictions.cfm); - throw invalidRequest("The query requests a restriction of rows with a strict bound (%s) over a range of partitions. " - + "This is not supported by the underlying storage engine for COMPACT tables if a LIMIT is provided. " - + "Please either make the condition non strict (%s) or remove the user LIMIT", rel, rel.withNonStrictOperator()); - } - } - - private SingleColumnRelation findInclusiveClusteringRelationForCompact(CFMetaData cfm) - { - for (Relation r : whereClause) - { - // We only call this when sliceRestriction != null, i.e. for compact table with non composite comparator, - // so it can't be a MultiColumnRelation. - SingleColumnRelation rel = (SingleColumnRelation)r; - - if (cfm.getColumnDefinition(rel.getEntity().prepare(cfm)).isClusteringColumn() - && (rel.operator() == Operator.GT || rel.operator() == Operator.LT)) - return rel; - } - - // We're not supposed to call this method unless we know this can't happen - throw new AssertionError(); } private boolean containsAlias(final ColumnIdentifier name)
