http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/thrift/CassandraServer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/CassandraServer.java b/src/java/org/apache/cassandra/thrift/CassandraServer.java
index 04d3d13..d6f0757 100644
--- a/src/java/org/apache/cassandra/thrift/CassandraServer.java
+++ b/src/java/org/apache/cassandra/thrift/CassandraServer.java
@@ -39,10 +39,11 @@ import org.apache.cassandra.config.*;
 import org.apache.cassandra.cql3.QueryOptions;
 import org.apache.cassandra.cql3.statements.ParsedStatement;
 import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.composites.*;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.db.context.CounterContext;
-import org.apache.cassandra.db.filter.ColumnSlice;
 import org.apache.cassandra.db.filter.*;
+import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.TimeUUIDType;
 import org.apache.cassandra.dht.*;
 import org.apache.cassandra.dht.Range;
@@ -55,9 +56,7 @@ import org.apache.cassandra.serializers.MarshalException;
 import org.apache.cassandra.service.*;
 import org.apache.cassandra.service.pager.QueryPagers;
 import org.apache.cassandra.tracing.Tracing;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.Pair;
-import org.apache.cassandra.utils.UUIDGen;
+import org.apache.cassandra.utils.*;
 import org.apache.thrift.TException;

 public class CassandraServer implements Cassandra.Iface
@@ -84,19 +83,15 @@ public class CassandraServer implements Cassandra.Iface
         return ThriftSessionManager.instance.currentSession();
     }

-    protected Map<DecoratedKey, ColumnFamily> readColumnFamily(List<ReadCommand> commands, org.apache.cassandra.db.ConsistencyLevel consistency_level, ClientState cState)
+    protected PartitionIterator read(List<SinglePartitionReadCommand<?>> commands, org.apache.cassandra.db.ConsistencyLevel consistency_level, ClientState cState)
     throws org.apache.cassandra.exceptions.InvalidRequestException, UnavailableException, TimedOutException
     {
-        // TODO - Support multiple column families per row, right now row only contains 1 column family
-        Map<DecoratedKey, ColumnFamily> columnFamilyKeyMap = new HashMap<DecoratedKey, ColumnFamily>();
-
-        List<Row> rows = null;
         try
         {
             schedule(DatabaseDescriptor.getReadRpcTimeout());
             try
             {
-                rows = StorageProxy.read(commands, consistency_level, cState);
+                return StorageProxy.read(new SinglePartitionReadCommand.Group(commands, DataLimits.NONE), consistency_level, cState);
             }
             finally
             {
@@ -105,180 +100,176 @@ public class CassandraServer implements Cassandra.Iface
         }
         catch (RequestExecutionException e)
         {
-            ThriftConversion.rethrow(e);
-        }
-
-        for (Row row: rows)
-        {
-            columnFamilyKeyMap.put(row.key, row.cf);
+            throw ThriftConversion.rethrow(e);
         }
-        return columnFamilyKeyMap;
     }

-    public List<ColumnOrSuperColumn> thriftifyColumns(Collection<Cell> cells, boolean reverseOrder, long now)
+    public List<ColumnOrSuperColumn> thriftifyColumns(CFMetaData metadata, Iterator<LegacyLayout.LegacyCell> cells)
     {
-        ArrayList<ColumnOrSuperColumn> thriftColumns = new ArrayList<ColumnOrSuperColumn>(cells.size());
-        for (Cell cell : cells)
+        ArrayList<ColumnOrSuperColumn> thriftColumns = new ArrayList<ColumnOrSuperColumn>();
+        while (cells.hasNext())
         {
-            if (!cell.isLive(now))
-                continue;
-
-            thriftColumns.add(thriftifyColumnWithName(cell, cell.name().toByteBuffer()));
+            LegacyLayout.LegacyCell cell = cells.next();
+            thriftColumns.add(thriftifyColumnWithName(metadata, cell, cell.name.encode(metadata)));
         }
-
-        // we have to do the reversing here, since internally we pass results around in ColumnFamily
-        // objects, which always sort their cells in the "natural" order
-        // TODO this is inconvenient for direct users of StorageProxy
-        if (reverseOrder)
-            Collections.reverse(thriftColumns);
         return thriftColumns;
     }

-    private ColumnOrSuperColumn thriftifyColumnWithName(Cell cell, ByteBuffer newName)
+    private ColumnOrSuperColumn thriftifyColumnWithName(CFMetaData metadata, LegacyLayout.LegacyCell cell, ByteBuffer newName)
     {
-        if (cell instanceof CounterCell)
-            return new ColumnOrSuperColumn().setCounter_column(thriftifySubCounter(cell).setName(newName));
+        if (cell.isCounter())
+            return new ColumnOrSuperColumn().setCounter_column(thriftifySubCounter(metadata, cell).setName(newName));
         else
-            return new ColumnOrSuperColumn().setColumn(thriftifySubColumn(cell).setName(newName));
+            return new ColumnOrSuperColumn().setColumn(thriftifySubColumn(cell, newName));
     }

-    private Column thriftifySubColumn(Cell cell)
+    private Column thriftifySubColumn(CFMetaData metadata, LegacyLayout.LegacyCell cell)
     {
-        assert !(cell instanceof CounterCell);
+        return thriftifySubColumn(cell, cell.name.encode(metadata));
+    }

-        Column thrift_column = new Column(cell.name().toByteBuffer()).setValue(cell.value()).setTimestamp(cell.timestamp());
-        if (cell instanceof ExpiringCell)
-        {
-            thrift_column.setTtl(((ExpiringCell) cell).getTimeToLive());
-        }
+    private Column thriftifySubColumn(LegacyLayout.LegacyCell cell, ByteBuffer name)
+    {
+        assert !cell.isCounter();
+
+        Column thrift_column = new Column(name).setValue(cell.value).setTimestamp(cell.timestamp);
+        if (cell.isExpiring())
+            thrift_column.setTtl(cell.ttl);
         return thrift_column;
     }

-    private List<Column> thriftifyColumnsAsColumns(Collection<Cell> cells, long now)
+    private List<Column> thriftifyColumnsAsColumns(CFMetaData metadata, Iterator<LegacyLayout.LegacyCell> cells)
     {
-        List<Column> thriftColumns = new ArrayList<Column>(cells.size());
-        for (Cell cell : cells)
-        {
-            if (!cell.isLive(now))
-                continue;
-
-            thriftColumns.add(thriftifySubColumn(cell));
-        }
+        List<Column> thriftColumns = new ArrayList<Column>();
+        while (cells.hasNext())
+            thriftColumns.add(thriftifySubColumn(metadata, cells.next()));
         return thriftColumns;
     }

-    private CounterColumn thriftifySubCounter(Cell cell)
+    private CounterColumn thriftifySubCounter(CFMetaData metadata, LegacyLayout.LegacyCell cell)
     {
-        assert cell instanceof CounterCell;
-        return new CounterColumn(cell.name().toByteBuffer(), CounterContext.instance().total(cell.value()));
+        assert cell.isCounter();
+        return new CounterColumn(cell.name.encode(metadata), CounterContext.instance().total(cell.value));
     }

-    private List<ColumnOrSuperColumn> thriftifySuperColumns(Collection<Cell> cells,
-                                                            boolean reverseOrder,
-                                                            long now,
+    private List<ColumnOrSuperColumn> thriftifySuperColumns(CFMetaData metadata,
+                                                            Iterator<LegacyLayout.LegacyCell> cells,
                                                             boolean subcolumnsOnly,
-                                                            boolean isCounterCF)
+                                                            boolean isCounterCF,
+                                                            boolean reversed)
     {
         if (subcolumnsOnly)
         {
-            ArrayList<ColumnOrSuperColumn> thriftSuperColumns = new ArrayList<ColumnOrSuperColumn>(cells.size());
-            for (Cell cell : cells)
+            ArrayList<ColumnOrSuperColumn> thriftSuperColumns = new ArrayList<ColumnOrSuperColumn>();
+            while (cells.hasNext())
             {
-                if (!cell.isLive(now))
-                    continue;
-
-                thriftSuperColumns.add(thriftifyColumnWithName(cell, SuperColumns.subName(cell.name())));
+                LegacyLayout.LegacyCell cell = cells.next();
+                thriftSuperColumns.add(thriftifyColumnWithName(metadata, cell, cell.name.superColumnSubName()));
             }
-            if (reverseOrder)
+            // Generally, cells come reversed if the query is reversed. However, this is not the case within a super column because
+            // internally a super column is a map within a row and those are never returned reversed.
+            if (reversed)
                 Collections.reverse(thriftSuperColumns);
             return thriftSuperColumns;
         }
         else
         {
             if (isCounterCF)
-                return thriftifyCounterSuperColumns(cells, reverseOrder, now);
+                return thriftifyCounterSuperColumns(metadata, cells, reversed);
             else
-                return thriftifySuperColumns(cells, reverseOrder, now);
+                return thriftifySuperColumns(cells, reversed);
         }
     }

-    private List<ColumnOrSuperColumn> thriftifySuperColumns(Collection<Cell> cells, boolean reverseOrder, long now)
+    private List<ColumnOrSuperColumn> thriftifySuperColumns(Iterator<LegacyLayout.LegacyCell> cells, boolean reversed)
     {
-        ArrayList<ColumnOrSuperColumn> thriftSuperColumns = new ArrayList<ColumnOrSuperColumn>(cells.size());
+        ArrayList<ColumnOrSuperColumn> thriftSuperColumns = new ArrayList<ColumnOrSuperColumn>();
         SuperColumn current = null;
-        for (Cell cell : cells)
+        while (cells.hasNext())
         {
-            if (!cell.isLive(now))
-                continue;
-
-            ByteBuffer scName = SuperColumns.scName(cell.name());
+            LegacyLayout.LegacyCell cell = cells.next();
+            ByteBuffer scName = cell.name.superColumnName();
             if (current == null || !scName.equals(current.bufferForName()))
             {
+                // Generally, cells come reversed if the query is reversed. However, this is not the case within a super column because
+                // internally a super column is a map within a row and those are never returned reversed.
+                if (current != null && reversed)
+                    Collections.reverse(current.columns);
+
                 current = new SuperColumn(scName, new ArrayList<Column>());
                 thriftSuperColumns.add(new ColumnOrSuperColumn().setSuper_column(current));
             }
-            current.getColumns().add(thriftifySubColumn(cell).setName(SuperColumns.subName(cell.name())));
+            current.getColumns().add(thriftifySubColumn(cell, cell.name.superColumnSubName()));
         }
-        if (reverseOrder)
-            Collections.reverse(thriftSuperColumns);
+        if (current != null && reversed)
+            Collections.reverse(current.columns);
         return thriftSuperColumns;
     }

-    private List<ColumnOrSuperColumn> thriftifyCounterSuperColumns(Collection<Cell> cells, boolean reverseOrder, long now)
+    private List<ColumnOrSuperColumn> thriftifyCounterSuperColumns(CFMetaData metadata, Iterator<LegacyLayout.LegacyCell> cells, boolean reversed)
     {
-        ArrayList<ColumnOrSuperColumn> thriftSuperColumns = new ArrayList<ColumnOrSuperColumn>(cells.size());
+        ArrayList<ColumnOrSuperColumn> thriftSuperColumns = new ArrayList<ColumnOrSuperColumn>();
         CounterSuperColumn current = null;
-        for (Cell cell : cells)
+        while (cells.hasNext())
         {
-            if (!cell.isLive(now))
-                continue;
-
-            ByteBuffer scName = SuperColumns.scName(cell.name());
+            LegacyLayout.LegacyCell cell = cells.next();
+            ByteBuffer scName = cell.name.superColumnName();
            if (current == null || !scName.equals(current.bufferForName()))
            {
+                // Generally, cells come reversed if the query is reversed. However, this is not the case within a super column because
+                // internally a super column is a map within a row and those are never returned reversed.
+                if (current != null && reversed)
+                    Collections.reverse(current.columns);
+
                 current = new CounterSuperColumn(scName, new ArrayList<CounterColumn>());
                 thriftSuperColumns.add(new ColumnOrSuperColumn().setCounter_super_column(current));
             }
-            current.getColumns().add(thriftifySubCounter(cell).setName(SuperColumns.subName(cell.name())));
+            current.getColumns().add(thriftifySubCounter(metadata, cell).setName(cell.name.superColumnSubName()));
         }
-
-        if (reverseOrder)
-            Collections.reverse(thriftSuperColumns);
-
         return thriftSuperColumns;
     }

-    private Map<ByteBuffer, List<ColumnOrSuperColumn>> getSlice(List<ReadCommand> commands, boolean subColumnsOnly, org.apache.cassandra.db.ConsistencyLevel consistency_level, ClientState cState)
-    throws org.apache.cassandra.exceptions.InvalidRequestException, UnavailableException, TimedOutException
+    private List<ColumnOrSuperColumn> thriftifyPartition(RowIterator partition, boolean subcolumnsOnly, boolean reversed, int cellLimit)
     {
-        Map<DecoratedKey, ColumnFamily> columnFamilies = readColumnFamily(commands, consistency_level, cState);
-        Map<ByteBuffer, List<ColumnOrSuperColumn>> columnFamiliesMap = new HashMap<ByteBuffer, List<ColumnOrSuperColumn>>();
-        for (ReadCommand command: commands)
+        if (partition.isEmpty())
+            return EMPTY_COLUMNS;
+
+        Iterator<LegacyLayout.LegacyCell> cells = LegacyLayout.fromRowIterator(partition);
+        List<ColumnOrSuperColumn> result;
+        if (partition.metadata().isSuper())
         {
-            ColumnFamily cf = columnFamilies.get(StorageService.getPartitioner().decorateKey(command.key));
-            boolean reverseOrder = command instanceof SliceFromReadCommand && ((SliceFromReadCommand)command).filter.reversed;
-            List<ColumnOrSuperColumn> thriftifiedColumns = thriftifyColumnFamily(cf, subColumnsOnly, reverseOrder, command.timestamp);
-            columnFamiliesMap.put(command.key, thriftifiedColumns);
+            boolean isCounterCF = partition.metadata().isCounter();
+            result = thriftifySuperColumns(partition.metadata(), cells, subcolumnsOnly, isCounterCF, reversed);
+        }
+        else
+        {
+            result = thriftifyColumns(partition.metadata(), cells);
         }
-        return columnFamiliesMap;
+        // Thrift counts cells, but internally we only count them at "row" boundaries, which means that if the limit stops in the middle
+        // of an internal row we'll include a few additional cells. So trim it here.
+        return result.size() > cellLimit
+             ? result.subList(0, cellLimit)
+             : result;
     }

-    private List<ColumnOrSuperColumn> thriftifyColumnFamily(ColumnFamily cf, boolean subcolumnsOnly, boolean reverseOrder, long now)
+    private Map<ByteBuffer, List<ColumnOrSuperColumn>> getSlice(List<SinglePartitionReadCommand<?>> commands, boolean subColumnsOnly, int cellLimit, org.apache.cassandra.db.ConsistencyLevel consistency_level, ClientState cState)
+    throws org.apache.cassandra.exceptions.InvalidRequestException, UnavailableException, TimedOutException
     {
-        if (cf == null || !cf.hasColumns())
-            return EMPTY_COLUMNS;
-
-        if (cf.metadata().isSuper())
+        try (PartitionIterator results = read(commands, consistency_level, cState))
         {
-            boolean isCounterCF = cf.metadata().isCounter();
-            return thriftifySuperColumns(cf.getSortedColumns(), reverseOrder, now, subcolumnsOnly, isCounterCF);
-        }
-        else
-        {
-            return thriftifyColumns(cf.getSortedColumns(), reverseOrder, now);
+            Map<ByteBuffer, List<ColumnOrSuperColumn>> columnFamiliesMap = new HashMap<ByteBuffer, List<ColumnOrSuperColumn>>();
+            while (results.hasNext())
+            {
+                try (RowIterator iter = results.next())
+                {
+                    List<ColumnOrSuperColumn> thriftifiedColumns = thriftifyPartition(iter, subColumnsOnly, iter.isReverseOrder(), cellLimit);
+                    columnFamiliesMap.put(iter.partitionKey().getKey(), thriftifiedColumns);
+                }
+            }
+            return columnFamiliesMap;
         }
     }

@@ -303,7 +294,8 @@ public class CassandraServer implements Cassandra.Iface
             ClientState cState = state();
             String keyspace = cState.getKeyspace();
             state().hasColumnFamilyAccess(keyspace, column_parent.column_family, Permission.SELECT);
-            return getSliceInternal(keyspace, key, column_parent, System.currentTimeMillis(), predicate, consistency_level, cState);
+            List<ColumnOrSuperColumn> result = getSliceInternal(keyspace, key, column_parent, FBUtilities.nowInSeconds(), predicate, consistency_level, cState);
+            return result == null ? Collections.<ColumnOrSuperColumn>emptyList() : result;
         }
         catch (RequestValidationException e)
         {
@@ -318,13 +310,13 @@ public class CassandraServer implements Cassandra.Iface
     private List<ColumnOrSuperColumn> getSliceInternal(String keyspace,
                                                        ByteBuffer key,
                                                        ColumnParent column_parent,
-                                                       long timestamp,
+                                                       int nowInSec,
                                                        SlicePredicate predicate,
                                                        ConsistencyLevel consistency_level,
                                                        ClientState cState)
     throws org.apache.cassandra.exceptions.InvalidRequestException, UnavailableException, TimedOutException
     {
-        return multigetSliceInternal(keyspace, Collections.singletonList(key), column_parent, timestamp, predicate, consistency_level, cState).get(key);
+        return multigetSliceInternal(keyspace, Collections.singletonList(key), column_parent, nowInSec, predicate, consistency_level, cState).get(key);
     }

     public Map<ByteBuffer, List<ColumnOrSuperColumn>> multiget_slice(List<ByteBuffer> keys, ColumnParent column_parent, SlicePredicate predicate, ConsistencyLevel consistency_level)
@@ -351,7 +343,7 @@ public class CassandraServer implements Cassandra.Iface
             ClientState cState = state();
             String keyspace = cState.getKeyspace();
             cState.hasColumnFamilyAccess(keyspace, column_parent.column_family, Permission.SELECT);
-            return multigetSliceInternal(keyspace, keys, column_parent, System.currentTimeMillis(), predicate, consistency_level, cState);
+            return multigetSliceInternal(keyspace, keys, column_parent, FBUtilities.nowInSeconds(), predicate, consistency_level, cState);
         }
         catch (RequestValidationException e)
         {
@@ -363,55 +355,179 @@ public class CassandraServer implements Cassandra.Iface
         }
     }

-    private SliceQueryFilter toInternalFilter(CFMetaData metadata, ColumnParent parent, SliceRange range)
+    private ClusteringIndexFilter toInternalFilter(CFMetaData metadata, ColumnParent parent, SliceRange range)
+    {
+        if (metadata.isSuper() && parent.isSetSuper_column())
+            return new ClusteringIndexNamesFilter(FBUtilities.<Clustering>singleton(new SimpleClustering(parent.bufferForSuper_column()), metadata.comparator), range.reversed);
+        else
+            return new ClusteringIndexSliceFilter(makeSlices(metadata, range), range.reversed);
+    }
+
+    private Slices makeSlices(CFMetaData metadata, SliceRange range)
     {
-        if (metadata.isSuper())
+        // Note that in thrift, the bounds are reversed if the query is reversed, but not internally.
+        ByteBuffer start = range.reversed ? range.finish : range.start;
+        ByteBuffer finish = range.reversed ? range.start : range.finish;
+        return Slices.with(metadata.comparator, Slice.make(LegacyLayout.decodeBound(metadata, start, true).bound, LegacyLayout.decodeBound(metadata, finish, false).bound));
+    }
+
+    private ClusteringIndexFilter toInternalFilter(CFMetaData metadata, ColumnParent parent, SlicePredicate predicate)
+    throws org.apache.cassandra.exceptions.InvalidRequestException
+    {
+        try
         {
-            CellNameType columnType = new SimpleDenseCellNameType(metadata.comparator.subtype(parent.isSetSuper_column() ? 1 : 0));
-            Composite start = columnType.fromByteBuffer(range.start);
-            Composite finish = columnType.fromByteBuffer(range.finish);
-            SliceQueryFilter filter = new SliceQueryFilter(start, finish, range.reversed, range.count);
-            return SuperColumns.fromSCSliceFilter(metadata.comparator, parent.bufferForSuper_column(), filter);
+            if (predicate.column_names != null)
+            {
+                if (metadata.isSuper())
+                {
+                    if (parent.isSetSuper_column())
+                    {
+                        return new ClusteringIndexNamesFilter(FBUtilities.<Clustering>singleton(new SimpleClustering(parent.bufferForSuper_column()), metadata.comparator), false);
+                    }
+                    else
+                    {
+                        NavigableSet<Clustering> clusterings = new TreeSet<>(metadata.comparator);
+                        for (ByteBuffer bb : predicate.column_names)
+                            clusterings.add(new SimpleClustering(bb));
+                        return new ClusteringIndexNamesFilter(clusterings, false);
+                    }
+                }
+                else
+                {
+                    NavigableSet<Clustering> clusterings = new TreeSet<>(metadata.comparator);
+                    for (ByteBuffer bb : predicate.column_names)
+                    {
+                        LegacyLayout.LegacyCellName name = LegacyLayout.decodeCellName(metadata, parent.bufferForSuper_column(), bb);
+                        clusterings.add(name.clustering);
+                    }
+                    return new ClusteringIndexNamesFilter(clusterings, false);
+                }
+            }
+            else
+            {
+                return toInternalFilter(metadata, parent, predicate.slice_range);
+            }
         }
+        catch (UnknownColumnException e)
+        {
+            throw new org.apache.cassandra.exceptions.InvalidRequestException(e.getMessage());
+        }
+    }

-        Composite start = metadata.comparator.fromByteBuffer(range.start);
-        Composite finish = metadata.comparator.fromByteBuffer(range.finish);
-        return new SliceQueryFilter(start, finish, range.reversed, range.count);
+    private ColumnFilter makeColumnFilter(CFMetaData metadata, ColumnParent parent, SliceRange range)
+    {
+        if (metadata.isSuper() && parent.isSetSuper_column())
+        {
+            // We want a slice of the dynamic columns
+            ColumnFilter.Builder builder = ColumnFilter.selectionBuilder();
+            ColumnDefinition def = metadata.compactValueColumn();
+            ByteBuffer start = range.reversed ? range.finish : range.start;
+            ByteBuffer finish = range.reversed ? range.start : range.finish;
+            builder.slice(def, start.hasRemaining() ? CellPath.create(start) : CellPath.BOTTOM, finish.hasRemaining() ? CellPath.create(finish) : CellPath.TOP);
+
+            // We also want to add any statically defined column if it's within the range
+            AbstractType<?> cmp = metadata.thriftColumnNameType();
+            for (ColumnDefinition column : metadata.partitionColumns())
+            {
+                if (CompactTables.isSuperColumnMapColumn(column))
+                    continue;
+
+                ByteBuffer name = column.name.bytes;
+                if (cmp.compare(name, start) < 0 || cmp.compare(finish, name) > 0)
+                    continue;
+
+                builder.add(column);
+            }
+            return builder.build();
+        }
+        return makeColumnFilter(metadata, makeSlices(metadata, range));
     }

-    private IDiskAtomFilter toInternalFilter(CFMetaData metadata, ColumnParent parent, SlicePredicate predicate)
+    private ColumnFilter makeColumnFilter(CFMetaData metadata, Slices slices)
     {
-        IDiskAtomFilter filter;
+        PartitionColumns columns = metadata.partitionColumns();
+        if (metadata.isStaticCompactTable() && !columns.statics.isEmpty())
+        {
+            PartitionColumns.Builder builder = PartitionColumns.builder();
+            builder.addAll(columns.regulars);
+            // We only want to include the static columns that are selected by the slices
+            for (ColumnDefinition def : columns.statics)
+            {
+                if (slices.selects(new SimpleClustering(def.name.bytes)))
+                    builder.add(def);
+            }
+            columns = builder.build();
+        }
+        return ColumnFilter.selection(columns);
+    }

-        if (predicate.column_names != null)
+    private ColumnFilter makeColumnFilter(CFMetaData metadata, ColumnParent parent, SlicePredicate predicate)
+    throws org.apache.cassandra.exceptions.InvalidRequestException
+    {
+        try
         {
-            if (metadata.isSuper())
+            if (predicate.column_names != null)
             {
-                CellNameType columnType = new SimpleDenseCellNameType(metadata.comparator.subtype(parent.isSetSuper_column() ? 1 : 0));
-                SortedSet<CellName> s = new TreeSet<>(columnType);
-                for (ByteBuffer bb : predicate.column_names)
-                    s.add(columnType.cellFromByteBuffer(bb));
-                filter = SuperColumns.fromSCNamesFilter(metadata.comparator, parent.bufferForSuper_column(), new NamesQueryFilter(s));
+                if (metadata.isSuper())
+                {
+                    if (parent.isSetSuper_column())
+                    {
+                        ColumnFilter.Builder builder = ColumnFilter.selectionBuilder();
+                        ColumnDefinition dynamicDef = metadata.compactValueColumn();
+                        for (ByteBuffer bb : predicate.column_names)
+                        {
+                            ColumnDefinition staticDef = metadata.getColumnDefinition(bb);
+                            if (staticDef == null)
+                                builder.select(dynamicDef, CellPath.create(bb));
+                            else
+                                builder.add(staticDef);
+                        }
+                        return builder.build();
+                    }
+                    else
+                    {
+                        return ColumnFilter.all(metadata);
+                    }
+                }
+                else
+                {
+                    PartitionColumns.Builder builder = new PartitionColumns.Builder();
+                    for (ByteBuffer bb : predicate.column_names)
+                    {
+                        LegacyLayout.LegacyCellName name = LegacyLayout.decodeCellName(metadata, parent.bufferForSuper_column(), bb);
+                        builder.add(name.column);
+                    }
+                    return ColumnFilter.selection(builder.build());
+                }
             }
             else
             {
-                SortedSet<CellName> s = new TreeSet<CellName>(metadata.comparator);
-                for (ByteBuffer bb : predicate.column_names)
-                    s.add(metadata.comparator.cellFromByteBuffer(bb));
-                filter = new NamesQueryFilter(s);
+                return makeColumnFilter(metadata, parent, predicate.slice_range);
             }
         }
-        else
+        catch (UnknownColumnException e)
         {
-            filter = toInternalFilter(metadata, parent, predicate.slice_range);
+            throw new org.apache.cassandra.exceptions.InvalidRequestException(e.getMessage());
         }
-        return filter;
+    }
+
+    private DataLimits getLimits(int partitionLimit, boolean countSuperColumns, SlicePredicate predicate)
+    {
+        int cellsPerPartition = predicate.slice_range == null ? Integer.MAX_VALUE : predicate.slice_range.count;
+        return getLimits(partitionLimit, countSuperColumns, cellsPerPartition);
+    }
+
+    private DataLimits getLimits(int partitionLimit, boolean countSuperColumns, int perPartitionCount)
+    {
+        return countSuperColumns
+             ? DataLimits.superColumnCountingLimits(partitionLimit, perPartitionCount)
+             : DataLimits.thriftLimits(partitionLimit, perPartitionCount);
     }

     private Map<ByteBuffer, List<ColumnOrSuperColumn>> multigetSliceInternal(String keyspace,
                                                                              List<ByteBuffer> keys,
                                                                              ColumnParent column_parent,
-                                                                             long timestamp,
+                                                                             int nowInSec,
                                                                              SlicePredicate predicate,
                                                                              ConsistencyLevel consistency_level,
                                                                              ClientState cState)
@@ -424,18 +540,19 @@ public class CassandraServer implements Cassandra.Iface
         org.apache.cassandra.db.ConsistencyLevel consistencyLevel = ThriftConversion.fromThrift(consistency_level);
         consistencyLevel.validateForRead(keyspace);

-        List<ReadCommand> commands = new ArrayList<ReadCommand>(keys.size());
-        IDiskAtomFilter filter = toInternalFilter(metadata, column_parent, predicate);
+        List<SinglePartitionReadCommand<?>> commands = new ArrayList<>(keys.size());
+        ColumnFilter columnFilter = makeColumnFilter(metadata, column_parent, predicate);
+        ClusteringIndexFilter filter = toInternalFilter(metadata, column_parent, predicate);
+        DataLimits limits = getLimits(1, metadata.isSuper() && !column_parent.isSetSuper_column(), predicate);

         for (ByteBuffer key: keys)
         {
             ThriftValidation.validateKey(metadata, key);
-            // Note that we should not share a slice filter amongst the command, due to SliceQueryFilter not being immutable
-            // due to its columnCounter used by the lastCounted() method (also see SelectStatement.getSliceCommands)
-            commands.add(ReadCommand.create(keyspace, key, column_parent.getColumn_family(), timestamp, filter.cloneShallow()));
+            DecoratedKey dk = StorageService.getPartitioner().decorateKey(key);
+            commands.add(SinglePartitionReadCommand.create(true, metadata, nowInSec, columnFilter, RowFilter.NONE, limits, dk, filter));
         }

-        return getSlice(commands, column_parent.isSetSuper_column(), consistencyLevel, cState);
+        return getSlice(commands, column_parent.isSetSuper_column(), limits.perPartitionCount(), consistencyLevel, cState);
     }

     public ColumnOrSuperColumn get(ByteBuffer key, ColumnPath column_path, ConsistencyLevel consistency_level)
@@ -466,35 +583,58 @@ public class CassandraServer implements Cassandra.Iface

             ThriftValidation.validateKey(metadata, key);

-            IDiskAtomFilter filter;
+            ColumnFilter columns;
+            ClusteringIndexFilter filter;
             if (metadata.isSuper())
             {
-                CellNameType columnType = new SimpleDenseCellNameType(metadata.comparator.subtype(column_path.column == null ? 0 : 1));
-                SortedSet<CellName> names = new TreeSet<CellName>(columnType);
-                names.add(columnType.cellFromByteBuffer(column_path.column == null ? column_path.super_column : column_path.column));
-                filter = SuperColumns.fromSCNamesFilter(metadata.comparator, column_path.column == null ? null : column_path.bufferForSuper_column(), new NamesQueryFilter(names));
+                if (column_path.column == null)
+                {
+                    // Selects a full super column
+                    columns = ColumnFilter.all(metadata);
+                }
+                else
+                {
+                    // Selects a single column within a super column
+                    ColumnFilter.Builder builder = ColumnFilter.selectionBuilder();
+                    ColumnDefinition staticDef = metadata.getColumnDefinition(column_path.column);
+                    ColumnDefinition dynamicDef = metadata.compactValueColumn();
+
+                    if (staticDef != null)
+                        builder.add(staticDef);
+                    // Note that even if there is a staticDef, we still query the dynamicDef since we can't guarantee the static one hasn't
+                    // been created after data has been inserted for that definition
+                    builder.select(dynamicDef, CellPath.create(column_path.column));
+                    columns = builder.build();
+                }
+                filter = new ClusteringIndexNamesFilter(FBUtilities.<Clustering>singleton(new SimpleClustering(column_path.super_column), metadata.comparator),
+                                                        false);
             }
             else
             {
-                SortedSet<CellName> names = new TreeSet<CellName>(metadata.comparator);
-                names.add(metadata.comparator.cellFromByteBuffer(column_path.column));
-                filter = new NamesQueryFilter(names);
+                LegacyLayout.LegacyCellName cellname = LegacyLayout.decodeCellName(metadata, column_path.super_column, column_path.column);
+                columns = ColumnFilter.selection(PartitionColumns.of(cellname.column));
+                filter = new ClusteringIndexNamesFilter(FBUtilities.<Clustering>singleton(cellname.clustering, metadata.comparator), false);
             }

-            long now = System.currentTimeMillis();
-            ReadCommand command = ReadCommand.create(keyspace, key, column_path.column_family, now, filter);
-
-            Map<DecoratedKey, ColumnFamily> cfamilies = readColumnFamily(Arrays.asList(command), consistencyLevel, cState);
+            DecoratedKey dk = StorageService.getPartitioner().decorateKey(key);
+            SinglePartitionReadCommand<?> command = SinglePartitionReadCommand.create(true, metadata, FBUtilities.nowInSeconds(), columns, RowFilter.NONE, DataLimits.NONE, dk, filter);

-            ColumnFamily cf = cfamilies.get(StorageService.getPartitioner().decorateKey(command.key));
-
-            if (cf == null)
-                throw new NotFoundException();
-            List<ColumnOrSuperColumn> tcolumns = thriftifyColumnFamily(cf, metadata.isSuper() && column_path.column != null, false, now);
-            if (tcolumns.isEmpty())
-                throw new NotFoundException();
-            assert tcolumns.size() == 1;
-            return tcolumns.get(0);
+            try (RowIterator result = PartitionIterators.getOnlyElement(read(Arrays.<SinglePartitionReadCommand<?>>asList(command), consistencyLevel, cState), command))
+            {
+                if (!result.hasNext())
+                    throw new NotFoundException();
+
+                List<ColumnOrSuperColumn> tcolumns = thriftifyPartition(result, metadata.isSuper() && column_path.column != null, result.isReverseOrder(), 1);
+                if (tcolumns.isEmpty())
+                    throw new NotFoundException();
+                assert tcolumns.size() == 1;
+                return tcolumns.get(0);
+            }
+        }
+        catch (UnknownColumnException e)
+        {
+            throw new InvalidRequestException(e.getMessage());
         }
         catch (RequestValidationException e)
         {
@@ -529,10 +669,10 @@ public class CassandraServer implements Cassandra.Iface
             cState.hasColumnFamilyAccess(keyspace, column_parent.column_family, Permission.SELECT);
             Keyspace keyspaceName = Keyspace.open(keyspace);
             ColumnFamilyStore cfs = keyspaceName.getColumnFamilyStore(column_parent.column_family);
-            long timestamp = System.currentTimeMillis();
+            int nowInSec = FBUtilities.nowInSeconds();

             if (predicate.column_names != null)
-                return getSliceInternal(keyspace, key, column_parent, timestamp, predicate, consistency_level, cState).size();
+                return getSliceInternal(keyspace, key, column_parent, nowInSec, predicate, consistency_level, cState).size();

             int pageSize;
             // request by page if this is a large row
@@ -551,16 +691,34 @@ public class CassandraServer implements Cassandra.Iface
             SliceRange sliceRange = predicate.slice_range == null
                                   ? new SliceRange(ByteBufferUtil.EMPTY_BYTE_BUFFER, ByteBufferUtil.EMPTY_BYTE_BUFFER, false, Integer.MAX_VALUE)
                                   : predicate.slice_range;
-            SliceQueryFilter filter = toInternalFilter(cfs.metadata, column_parent, sliceRange);

-            return QueryPagers.countPaged(keyspace,
-                                          column_parent.column_family,
-                                          key,
+            ColumnFilter columnFilter;
+            ClusteringIndexFilter filter;
+            if (cfs.metadata.isSuper() && !column_parent.isSetSuper_column())
+            {
+                // If we count on a super column table without having set the super column name, we're in fact interested in the count of super columns
+                columnFilter = ColumnFilter.all(cfs.metadata);
+                filter = new ClusteringIndexSliceFilter(makeSlices(cfs.metadata, sliceRange), sliceRange.reversed);
+            }
+            else
+            {
+                columnFilter = makeColumnFilter(cfs.metadata, column_parent, sliceRange);
+                filter = toInternalFilter(cfs.metadata, column_parent, sliceRange);
+            }
+
+            DataLimits limits = getLimits(1, cfs.metadata.isSuper() && !column_parent.isSetSuper_column(), predicate);
+            DecoratedKey dk = StorageService.getPartitioner().decorateKey(key);
+
+            return QueryPagers.countPaged(cfs.metadata,
+                                          dk,
+                                          columnFilter,
                                           filter,
+                                          limits,
                                           ThriftConversion.fromThrift(consistency_level),
                                           cState,
                                           pageSize,
-                                          timestamp);
+                                          nowInSec,
+                                          true);
         }
         catch (IllegalArgumentException e)
         {
@@ -612,7 +770,7 @@ public class CassandraServer implements Cassandra.Iface
             Map<ByteBuffer, List<ColumnOrSuperColumn>> columnFamiliesMap = multigetSliceInternal(keyspace,
                                                                                                  keys,
                                                                                                  column_parent,
-                                                                                                 System.currentTimeMillis(),
+                                                                                                 FBUtilities.nowInSeconds(),
                                                                                                  predicate,
                                                                                                  consistency_level,
                                                                                                  cState);
@@ -642,25 +800,30 @@ public class CassandraServer implements Cassandra.Iface
             ThriftValidation.validateKey(metadata, key);
             ThriftValidation.validateColumnParent(metadata, column_parent);
             // SuperColumn field is usually optional, but not when we're inserting
-            if (metadata.cfType == ColumnFamilyType.Super && column_parent.super_column == null)
+            if (metadata.isSuper() && column_parent.super_column == null)
             {
                 throw new org.apache.cassandra.exceptions.InvalidRequestException("missing mandatory super column name for super CF " + column_parent.column_family);
             }

             ThriftValidation.validateColumnNames(metadata, column_parent, Arrays.asList(column.name));
-            ThriftValidation.validateColumnData(metadata, key, column_parent.super_column, column);
+            ThriftValidation.validateColumnData(metadata, column_parent.super_column, column);

             org.apache.cassandra.db.Mutation mutation;
             try
             {
-                CellName name = metadata.isSuper()
-                              ? metadata.comparator.makeCellName(column_parent.super_column, column.name)
-                              : metadata.comparator.cellFromByteBuffer(column.name);
+                LegacyLayout.LegacyCellName name = LegacyLayout.decodeCellName(metadata, column_parent.super_column, column.name);
+
+                DecoratedKey dk = StorageService.getPartitioner().decorateKey(key);
+                PartitionUpdate update = new PartitionUpdate(metadata, dk, PartitionColumns.of(name.column), 1);

-                ColumnFamily cf = ArrayBackedSortedColumns.factory.create(cState.getKeyspace(), column_parent.column_family);
-                cf.addColumn(name, column.value, column.timestamp, column.ttl);
-                mutation = new org.apache.cassandra.db.Mutation(cState.getKeyspace(), key, cf);
+                Row.Writer writer = name.column.isStatic() ? update.staticWriter() : update.writer();
+                name.clustering.writeTo(writer);
+                CellPath path = name.collectionElement == null ? null : CellPath.create(name.collectionElement);
+                writer.writeCell(name.column, false, column.value, SimpleLivenessInfo.forUpdate(column.timestamp, column.ttl, FBUtilities.nowInSeconds(), metadata), path);
+                writer.endOfRow();
+
+                mutation = new org.apache.cassandra.db.Mutation(update);
             }
-            catch (MarshalException e)
+            catch (MarshalException|UnknownColumnException e)
             {
                 throw new org.apache.cassandra.exceptions.InvalidRequestException(e.getMessage());
             }
@@ -728,7 +891,7 @@ public class CassandraServer implements Cassandra.Iface
             CFMetaData metadata = ThriftValidation.validateColumnFamily(keyspace, column_family, false);
             ThriftValidation.validateKey(metadata, key);

-            if (metadata.cfType == ColumnFamilyType.Super)
+            if (metadata.isSuper())
                 throw new org.apache.cassandra.exceptions.InvalidRequestException("CAS does not support supercolumns");

             Iterable<ByteBuffer> names = Iterables.transform(updates, new Function<Column, ByteBuffer>()
@@ -740,36 +903,36 @@ public class CassandraServer implements Cassandra.Iface
             });

             ThriftValidation.validateColumnNames(metadata, new ColumnParent(column_family), names);
             for (Column column : updates)
-                ThriftValidation.validateColumnData(metadata, key, null, column);
+                ThriftValidation.validateColumnData(metadata, null, column);

             CFMetaData cfm = Schema.instance.getCFMetaData(cState.getKeyspace(), column_family);
-            ColumnFamily cfUpdates = ArrayBackedSortedColumns.factory.create(cfm);
-            for (Column column : updates)
-                cfUpdates.addColumn(cfm.comparator.cellFromByteBuffer(column.name), column.value, column.timestamp);

-            ColumnFamily cfExpected;
-            if (expected.isEmpty())
-            {
-                cfExpected = null;
-            }
-            else
-            {
-                cfExpected = ArrayBackedSortedColumns.factory.create(cfm);
-                for (Column column : expected)
-                    cfExpected.addColumn(cfm.comparator.cellFromByteBuffer(column.name), column.value, column.timestamp);
-            }
+            DecoratedKey dk = StorageService.getPartitioner().decorateKey(key);
+            int nowInSec = FBUtilities.nowInSeconds();
+
+            PartitionUpdate partitionUpdates = RowIterators.toUpdate(LegacyLayout.toRowIterator(metadata, dk, toLegacyCells(metadata, updates, nowInSec).iterator(), nowInSec));
+
+            FilteredPartition partitionExpected = null;
+            if (!expected.isEmpty())
+                partitionExpected = FilteredPartition.create(LegacyLayout.toRowIterator(metadata, dk, toLegacyCells(metadata, expected, nowInSec).iterator(), nowInSec));

             schedule(DatabaseDescriptor.getWriteRpcTimeout());
-            ColumnFamily result = StorageProxy.cas(cState.getKeyspace(),
-                                                   column_family,
-                                                   key,
-                                                   new ThriftCASRequest(cfExpected, cfUpdates),
-                                                   ThriftConversion.fromThrift(serial_consistency_level),
-                                                   ThriftConversion.fromThrift(commit_consistency_level),
-                                                   cState);
-            return result == null
-                 ? new CASResult(true)
-                 : new CASResult(false).setCurrent_values(thriftifyColumnsAsColumns(result.getSortedColumns(), System.currentTimeMillis()));
+            try (RowIterator result = StorageProxy.cas(cState.getKeyspace(),
+                                                       column_family,
+                                                       dk,
+                                                       new ThriftCASRequest(partitionExpected, partitionUpdates),
+                                                       ThriftConversion.fromThrift(serial_consistency_level),
+                                                       ThriftConversion.fromThrift(commit_consistency_level),
+                                                       cState))
+            {
+                return result == null
+                     ? new CASResult(true)
+                     : new CASResult(false).setCurrent_values(thriftifyColumnsAsColumns(metadata, LegacyLayout.fromRowIterator(result)));
+            }
+        }
+        catch (UnknownColumnException e)
+        {
+            throw new InvalidRequestException(e.getMessage());
         }
         catch (RequestTimeoutException e)
         {
@@ -789,18 +952,129 @@ public class CassandraServer implements Cassandra.Iface
         }
     }

+    private LegacyLayout.LegacyCell toLegacyCell(CFMetaData metadata, Column column, int nowInSec) throws UnknownColumnException
+    {
+        return toLegacyCell(metadata, null, column, nowInSec);
+    }
+
+    private LegacyLayout.LegacyCell toLegacyCell(CFMetaData metadata, ByteBuffer superColumnName, Column column, int nowInSec)
+    throws UnknownColumnException
+    {
+        return column.ttl > 0
+             ? LegacyLayout.LegacyCell.expiring(metadata, superColumnName, column.name, column.value, column.timestamp, column.ttl, nowInSec)
+             : LegacyLayout.LegacyCell.regular(metadata, superColumnName, column.name, column.value, column.timestamp);
+    }
+
+    private LegacyLayout.LegacyCell toLegacyDeletion(CFMetaData metadata, ByteBuffer name, long timestamp, int nowInSec)
+    throws UnknownColumnException
+    {
+        return toLegacyDeletion(metadata, null, name, timestamp, nowInSec);
+    }
+
+    private LegacyLayout.LegacyCell toLegacyDeletion(CFMetaData metadata, ByteBuffer superColumnName, ByteBuffer name, long timestamp, int nowInSec)
+    throws UnknownColumnException
+    {
+        return LegacyLayout.LegacyCell.tombstone(metadata, superColumnName, name, timestamp, nowInSec);
+    }
+
+    private LegacyLayout.LegacyCell toCounterLegacyCell(CFMetaData metadata, CounterColumn column)
+    throws UnknownColumnException
+    {
+        return toCounterLegacyCell(metadata, null, column);
+    }
+
+    private LegacyLayout.LegacyCell toCounterLegacyCell(CFMetaData metadata, ByteBuffer superColumnName, CounterColumn column)
+    throws UnknownColumnException
+    {
+        return LegacyLayout.LegacyCell.counter(metadata, superColumnName, column.name, column.value);
+    }
+
+    private void sortAndMerge(CFMetaData metadata, List<LegacyLayout.LegacyCell> cells, int nowInSec)
+    {
+        Collections.sort(cells, LegacyLayout.legacyCellComparator(metadata));
+
+        // After sorting, if we have multiple cells for the same "cellname", we want to merge those together.
+        Comparator<LegacyLayout.LegacyCellName> comparator = LegacyLayout.legacyCellNameComparator(metadata, false);
+
+        int previous = 0; // The last element that was set
+        for (int current = 1; current < cells.size(); current++)
+        {
+            LegacyLayout.LegacyCell pc = cells.get(previous);
+            LegacyLayout.LegacyCell cc = cells.get(current);
+
+            // There are really only 2 possible comparisons: < 0 or == 0, since we've sorted already
+            int cmp = comparator.compare(pc.name, cc.name);
+            if (cmp == 0)
+            {
+                // current and previous are the same cell. Merge current into previous
+                // (and so previous + 1 will be "free").
+                Conflicts.Resolution res;
+                if (metadata.isCounter())
+                {
+                    res = Conflicts.resolveCounter(pc.timestamp, pc.isLive(nowInSec), pc.value,
+                                                   cc.timestamp, cc.isLive(nowInSec), cc.value);
+                }
+                else
+                {
+                    res = Conflicts.resolveRegular(pc.timestamp, pc.isLive(nowInSec), pc.localDeletionTime, pc.value,
+                                                   cc.timestamp, cc.isLive(nowInSec), cc.localDeletionTime, cc.value);
+                }
+
+                switch (res)
+                {
+                    case LEFT_WINS:
+                        // The previous cell wins, we'll just ignore current
+                        break;
+                    case RIGHT_WINS:
+                        cells.set(previous, cc);
+                        break;
+                    case MERGE:
+                        assert metadata.isCounter();
+                        ByteBuffer merged = Conflicts.mergeCounterValues(pc.value, cc.value);
+                        cells.set(previous, LegacyLayout.LegacyCell.counter(pc.name, merged));
+                        break;
+                }
+            }
+            else
+            {
+                // cells.get(previous) < cells.get(current), so move current just after previous if need be
+                ++previous;
+                if (previous != current)
+                    cells.set(previous, cc);
+            }
+        }
+
+        // The last element we want is previous, so trim anything after that
+        for (int i = cells.size() - 1; i > previous; i--)
+            cells.remove(i);
+    }
+
+    private List<LegacyLayout.LegacyCell> toLegacyCells(CFMetaData metadata, List<Column> columns, int nowInSec)
+    throws UnknownColumnException
+    {
+        List<LegacyLayout.LegacyCell> cells = new ArrayList<>(columns.size());
+        for (Column column : columns)
+            cells.add(toLegacyCell(metadata, column, nowInSec));
+
+        sortAndMerge(metadata, cells, nowInSec);
+        return cells;
+    }
+
     private List<IMutation> createMutationList(ConsistencyLevel consistency_level,
                                                Map<ByteBuffer,Map<String,List<Mutation>>> mutation_map,
                                                boolean allowCounterMutations)
-    throws RequestValidationException
+    throws RequestValidationException, InvalidRequestException
     {
         List<IMutation> mutations = new ArrayList<>();
         ThriftClientState cState = state();
         String keyspace = cState.getKeyspace();
+        int nowInSec = FBUtilities.nowInSeconds();

         for (Map.Entry<ByteBuffer, Map<String, List<Mutation>>> mutationEntry: mutation_map.entrySet())
         {
             ByteBuffer key = mutationEntry.getKey();
+            DecoratedKey dk = StorageService.getPartitioner().decorateKey(key);

             // We need to separate mutation for standard cf and counter cf (that will be encapsulated in a
             // CounterMutation) because it doesn't follow the same code path
@@ -811,38 +1085,46 @@ public class CassandraServer implements Cassandra.Iface
             for (Map.Entry<String, List<Mutation>> columnFamilyMutations : columnFamilyToMutations.entrySet())
             {
                 String cfName = columnFamilyMutations.getKey();
+                List<Mutation> muts = columnFamilyMutations.getValue();

                 cState.hasColumnFamilyAccess(keyspace, cfName, Permission.MODIFY);

                 CFMetaData metadata = ThriftValidation.validateColumnFamily(keyspace, cfName);
                 ThriftValidation.validateKey(metadata, key);
-
-                org.apache.cassandra.db.Mutation mutation;
                 if (metadata.isCounter())
-                {
                     ThriftConversion.fromThrift(consistency_level).validateCounterForWrite(metadata);
-                    counterMutation = counterMutation == null ? new org.apache.cassandra.db.Mutation(keyspace, key) : counterMutation;
-                    mutation = counterMutation;
-                }
-                else
-                {
-                    standardMutation = standardMutation == null ? new org.apache.cassandra.db.Mutation(keyspace, key) : standardMutation;
-                    mutation = standardMutation;
-                }

-                for (Mutation m : columnFamilyMutations.getValue())
+                DeletionInfo delInfo = DeletionInfo.live();
+                List<LegacyLayout.LegacyCell> cells = new ArrayList<>();
+                for (Mutation m : muts)
                 {
-                    ThriftValidation.validateMutation(metadata, key, m);
+                    ThriftValidation.validateMutation(metadata, m);

                     if (m.deletion != null)
                     {
-                        deleteColumnOrSuperColumn(mutation, metadata, m.deletion);
+                        deleteColumnOrSuperColumn(delInfo, cells, metadata, m.deletion, nowInSec);
                     }
                     if (m.column_or_supercolumn != null)
                     {
-                        addColumnOrSuperColumn(mutation, metadata, m.column_or_supercolumn);
+                        addColumnOrSuperColumn(cells, metadata, m.column_or_supercolumn, nowInSec);
                     }
                 }
+
+                sortAndMerge(metadata, cells, nowInSec);
+                PartitionUpdate update = UnfilteredRowIterators.toUpdate(LegacyLayout.toUnfilteredRowIterator(metadata, dk, delInfo, cells.iterator()));
+
+                org.apache.cassandra.db.Mutation mutation;
+                if (metadata.isCounter())
+                {
+                    counterMutation = counterMutation == null ? new org.apache.cassandra.db.Mutation(keyspace, dk) : counterMutation;
+                    mutation = counterMutation;
+                }
+                else
+                {
+                    standardMutation = standardMutation == null ? new org.apache.cassandra.db.Mutation(keyspace, dk) : standardMutation;
+                    mutation = standardMutation;
+                }
+                mutation.add(update);
             }
             if (standardMutation != null && !standardMutation.isEmpty())
                 mutations.add(standardMutation);
@@ -859,70 +1141,91 @@ public class CassandraServer implements Cassandra.Iface
         return mutations;
     }

-    private void addColumnOrSuperColumn(org.apache.cassandra.db.Mutation mutation, CFMetaData cfm, ColumnOrSuperColumn cosc)
+    private void addColumnOrSuperColumn(List<LegacyLayout.LegacyCell> cells, CFMetaData cfm, ColumnOrSuperColumn cosc, int nowInSec)
+    throws InvalidRequestException
     {
-        if (cosc.super_column != null)
+        try
         {
-            for (Column column : cosc.super_column.columns)
+            if (cosc.super_column != null)
             {
-                mutation.add(cfm.cfName, cfm.comparator.makeCellName(cosc.super_column.name, column.name), column.value, column.timestamp, column.ttl);
+                for (Column column : cosc.super_column.columns)
+                    cells.add(toLegacyCell(cfm, cosc.super_column.name, column, nowInSec));
             }
-        }
-        else if (cosc.column != null)
-        {
-            mutation.add(cfm.cfName, cfm.comparator.cellFromByteBuffer(cosc.column.name), cosc.column.value, cosc.column.timestamp, cosc.column.ttl);
-        }
-        else if (cosc.counter_super_column != null)
-        {
-            for (CounterColumn column : cosc.counter_super_column.columns)
+            else if (cosc.column != null)
+            {
+                cells.add(toLegacyCell(cfm, cosc.column, nowInSec));
+            }
+            else if (cosc.counter_super_column != null)
+            {
+                for (CounterColumn column : cosc.counter_super_column.columns)
+                    cells.add(toCounterLegacyCell(cfm, cosc.counter_super_column.name, column));
+            }
+            else // cosc.counter_column != null
             {
-                mutation.addCounter(cfm.cfName, cfm.comparator.makeCellName(cosc.counter_super_column.name, column.name), column.value);
+                cells.add(toCounterLegacyCell(cfm, cosc.counter_column));
             }
         }
-        else // cosc.counter_column != null
+        catch (UnknownColumnException e)
        {
-            mutation.addCounter(cfm.cfName, cfm.comparator.cellFromByteBuffer(cosc.counter_column.name), cosc.counter_column.value);
+            throw new InvalidRequestException(e.getMessage());
        }
    }

-    private void deleteColumnOrSuperColumn(org.apache.cassandra.db.Mutation mutation, CFMetaData cfm, Deletion del)
+    private void addRange(CFMetaData cfm, DeletionInfo delInfo, Slice.Bound start, Slice.Bound end, long timestamp, int nowInSec)
+    {
+        delInfo.add(new RangeTombstone(Slice.make(start, end), new SimpleDeletionTime(timestamp, nowInSec)), cfm.comparator);
+    }
+
+    private void deleteColumnOrSuperColumn(DeletionInfo delInfo, List<LegacyLayout.LegacyCell> cells, CFMetaData cfm, Deletion del, int nowInSec)
+    throws InvalidRequestException
    {
        if (del.predicate != null && del.predicate.column_names != null)
        {
            for (ByteBuffer c : del.predicate.column_names)
            {
-                if (del.super_column == null && cfm.isSuper())
-                    mutation.deleteRange(cfm.cfName, SuperColumns.startOf(c), SuperColumns.endOf(c), del.timestamp);
-                else if (del.super_column != null)
-                    mutation.delete(cfm.cfName, cfm.comparator.makeCellName(del.super_column, c), del.timestamp);
-                else
-                    mutation.delete(cfm.cfName, cfm.comparator.cellFromByteBuffer(c), del.timestamp);
+                try
+                {
+                    if (del.super_column == null && cfm.isSuper())
+                        addRange(cfm, delInfo, Slice.Bound.inclusiveStartOf(c), Slice.Bound.inclusiveEndOf(c), del.timestamp, nowInSec);
+                    else if (del.super_column != null)
+                        cells.add(toLegacyDeletion(cfm, del.super_column, c, del.timestamp, nowInSec));
+                    else
+                        cells.add(toLegacyDeletion(cfm, c, del.timestamp, nowInSec));
+                }
+                catch (UnknownColumnException e)
+                {
+                    throw new InvalidRequestException(e.getMessage());
+                }
            }
        }
        else if (del.predicate != null && del.predicate.slice_range != null)
        {
-            if (del.super_column == null && cfm.isSuper())
-                mutation.deleteRange(cfm.cfName,
-                                     SuperColumns.startOf(del.predicate.getSlice_range().start),
-                                     SuperColumns.endOf(del.predicate.getSlice_range().finish),
-                                     del.timestamp);
-            else if (del.super_column != null)
-                mutation.deleteRange(cfm.cfName,
-                                     cfm.comparator.makeCellName(del.super_column, del.predicate.getSlice_range().start),
-                                     cfm.comparator.makeCellName(del.super_column, del.predicate.getSlice_range().finish),
-                                     del.timestamp);
+            if (del.super_column == null)
+            {
+                addRange(cfm,
+                         delInfo,
+                         LegacyLayout.decodeBound(cfm, del.predicate.getSlice_range().start, true).bound,
+                         LegacyLayout.decodeBound(cfm, del.predicate.getSlice_range().finish, false).bound,
+                         del.timestamp,
+                         nowInSec);
+            }
            else
-                mutation.deleteRange(cfm.cfName,
-                                     cfm.comparator.fromByteBuffer(del.predicate.getSlice_range().start),
-                                     cfm.comparator.fromByteBuffer(del.predicate.getSlice_range().finish),
-                                     del.timestamp);
+            {
+                // Since we use a map for subcolumns, we would need range tombstones for collections to support this.
+                // And while we may want those some day, this requires a bit of additional work. And since super columns
+                // have been basically deprecated for a long time, and range tombstones on them were added only very recently,
+                // so that no Thrift driver actually supports them to the best of my knowledge, it's likely ok to
+                // discontinue support for this. If it turns out that this is blocking someone's upgrade, we can
+                // decide then if we want to tackle the addition of range tombstones for collections.
+                throw new InvalidRequestException("Cannot delete a range of subcolumns in a super column");
+            }
         }
         else
         {
             if (del.super_column != null)
-                mutation.deleteRange(cfm.cfName, SuperColumns.startOf(del.super_column), SuperColumns.endOf(del.super_column), del.timestamp);
+                addRange(cfm, delInfo, Slice.Bound.inclusiveStartOf(del.super_column), Slice.Bound.inclusiveEndOf(del.super_column), del.timestamp, nowInSec);
             else
-                mutation.delete(cfm.cfName, del.timestamp);
+                delInfo.add(new SimpleDeletionTime(del.timestamp, nowInSec));
         }
     }

@@ -1005,15 +1308,41 @@ public class CassandraServer implements Cassandra.Iface
             if (isCommutativeOp)
                 ThriftConversion.fromThrift(consistency_level).validateCounterForWrite(metadata);

-            org.apache.cassandra.db.Mutation mutation = new org.apache.cassandra.db.Mutation(keyspace, key);
+            DecoratedKey dk = StorageService.getPartitioner().decorateKey(key);
+
+            int nowInSec = FBUtilities.nowInSeconds();
+            PartitionUpdate update;
             if (column_path.super_column == null && column_path.column == null)
-                mutation.delete(column_path.column_family, timestamp);
-            else if (column_path.super_column == null)
-                mutation.delete(column_path.column_family, metadata.comparator.cellFromByteBuffer(column_path.column), timestamp);
-            else if (column_path.column == null)
-                mutation.deleteRange(column_path.column_family, SuperColumns.startOf(column_path.super_column), SuperColumns.endOf(column_path.super_column), timestamp);
+            {
+                update = PartitionUpdate.fullPartitionDelete(metadata, dk, timestamp, nowInSec);
+            }
+            else if (column_path.super_column != null && column_path.column == null)
+            {
+                update = new PartitionUpdate(metadata, dk, PartitionColumns.NONE, 1);
+                Row.Writer writer = update.writer();
+                writer.writeClusteringValue(column_path.super_column);
+                writer.writeRowDeletion(new SimpleDeletionTime(timestamp, nowInSec));
+                writer.endOfRow();
+            }
             else
-                mutation.delete(column_path.column_family, metadata.comparator.makeCellName(column_path.super_column, column_path.column), timestamp);
+            {
+                try
+                {
+                    LegacyLayout.LegacyCellName name = LegacyLayout.decodeCellName(metadata, column_path.super_column, column_path.column);
+                    update = new PartitionUpdate(metadata, dk, PartitionColumns.of(name.column), 1);
+                    Row.Writer writer = name.column.isStatic() ? update.staticWriter() : update.writer();
+                    name.clustering.writeTo(writer);
+                    CellPath path = name.collectionElement == null ? null : CellPath.create(name.collectionElement);
+                    writer.writeCell(name.column, false, ByteBufferUtil.EMPTY_BYTE_BUFFER, SimpleLivenessInfo.forDeletion(timestamp, nowInSec), path);
+                    writer.endOfRow();
+                }
+                catch (UnknownColumnException e)
+                {
+                    throw new org.apache.cassandra.exceptions.InvalidRequestException(e.getMessage());
+                }
+            }
+
+            org.apache.cassandra.db.Mutation mutation = new org.apache.cassandra.db.Mutation(update);

             if (isCommutativeOp)
                 doInsert(consistency_level, Arrays.asList(new CounterMutation(mutation, ThriftConversion.fromThrift(consistency_level))));
@@ -1138,10 +1467,8 @@ public class CassandraServer implements Cassandra.Iface
             org.apache.cassandra.db.ConsistencyLevel consistencyLevel = ThriftConversion.fromThrift(consistency_level);
             consistencyLevel.validateForRead(keyspace);

-            List<Row> rows = null;
-
             IPartitioner p = StorageService.getPartitioner();
-            AbstractBounds<RowPosition> bounds;
+            AbstractBounds<PartitionPosition> bounds;
             if (range.start_key == null)
             {
                 Token.TokenFactory tokenFactory = p.getTokenFactory();
@@ -1151,32 +1478,36 @@ public class CassandraServer implements Cassandra.Iface
             }
             else
             {
-                RowPosition end = range.end_key == null
+                PartitionPosition end = range.end_key == null
                                 ? p.getTokenFactory().fromString(range.end_token).maxKeyBound()
-                                : RowPosition.ForKey.get(range.end_key, p);
-                bounds = new Bounds<RowPosition>(RowPosition.ForKey.get(range.start_key, p), end);
+                                : PartitionPosition.ForKey.get(range.end_key, p);
+                bounds = new Bounds<PartitionPosition>(PartitionPosition.ForKey.get(range.start_key, p), end);
             }
-            long now = System.currentTimeMillis();
+            int nowInSec = FBUtilities.nowInSeconds();
             schedule(DatabaseDescriptor.getRangeRpcTimeout());
             try
             {
-                IDiskAtomFilter filter = ThriftValidation.asIFilter(predicate, metadata, column_parent.super_column);
-                rows = StorageProxy.getRangeSlice(new RangeSliceCommand(keyspace,
-                                                                        column_parent.column_family,
-                                                                        now,
-                                                                        filter,
-                                                                        bounds,
-                                                                        ThriftConversion.indexExpressionsFromThrift(range.row_filter),
-                                                                        range.count),
-                                                  consistencyLevel);
+                ColumnFilter columns = makeColumnFilter(metadata, column_parent, predicate);
+                ClusteringIndexFilter filter = toInternalFilter(metadata, column_parent, predicate);
+                DataLimits limits = getLimits(range.count, metadata.isSuper() && !column_parent.isSetSuper_column(), predicate);
+                PartitionRangeReadCommand cmd = new PartitionRangeReadCommand(false,
+                                                                              true,
+                                                                              metadata,
+                                                                              nowInSec,
+                                                                              columns,
+                                                                              ThriftConversion.rowFilterFromThrift(metadata, range.row_filter),
+                                                                              limits,
+                                                                              new DataRange(bounds, filter));
+                try (PartitionIterator results = StorageProxy.getRangeSlice(cmd, consistencyLevel))
+                {
+                    assert results != null;
+                    return thriftifyKeySlices(results, column_parent, limits.perPartitionCount());
+                }
             }
             finally
             {
                 release();
             }
-            assert rows != null;
-
-            return thriftifyKeySlices(rows, column_parent, predicate, now);
         }
         catch (RequestValidationException e)
         {
@@ -1221,10 +1552,8 @@ public class CassandraServer implements Cassandra.Iface
             org.apache.cassandra.db.ConsistencyLevel consistencyLevel = ThriftConversion.fromThrift(consistency_level);
             consistencyLevel.validateForRead(keyspace);

-            SlicePredicate predicate = new SlicePredicate().setSlice_range(new SliceRange(start_column, ByteBufferUtil.EMPTY_BYTE_BUFFER, false, -1));
-
             IPartitioner p = StorageService.getPartitioner();
-            AbstractBounds<RowPosition> bounds;
+            AbstractBounds<PartitionPosition> bounds;
             if (range.start_key == null)
             {
                 // (token, key) is unsupported, assume (token, token)
@@ -1235,30 +1564,45 @@ public class CassandraServer implements Cassandra.Iface
             }
             else
             {
-                RowPosition end = range.end_key == null
+                PartitionPosition end = range.end_key == null
                                 ? p.getTokenFactory().fromString(range.end_token).maxKeyBound()
-                                : RowPosition.ForKey.get(range.end_key, p);
-                bounds = new Bounds<RowPosition>(RowPosition.ForKey.get(range.start_key, p), end);
+                                : PartitionPosition.ForKey.get(range.end_key, p);
+                bounds = new Bounds<PartitionPosition>(PartitionPosition.ForKey.get(range.start_key, p), end);
             }

             if (range.row_filter != null && !range.row_filter.isEmpty())
                 throw new InvalidRequestException("Cross-row paging is not supported along with index clauses");

-            List<Row> rows;
-            long now = System.currentTimeMillis();
+            int nowInSec = FBUtilities.nowInSeconds();
             schedule(DatabaseDescriptor.getRangeRpcTimeout());
             try
             {
-                IDiskAtomFilter filter = ThriftValidation.asIFilter(predicate, metadata, null);
-                rows = StorageProxy.getRangeSlice(new RangeSliceCommand(keyspace, column_family, now, filter, bounds, null, range.count, true, true), consistencyLevel);
+                ClusteringIndexFilter filter = new ClusteringIndexSliceFilter(Slices.ALL, false);
+                DataLimits limits = getLimits(range.count, true, Integer.MAX_VALUE);
+                Clustering pageFrom = metadata.isSuper()
+                                    ? new SimpleClustering(start_column)
+                                    : LegacyLayout.decodeCellName(metadata, start_column).clustering;
+                PartitionRangeReadCommand cmd = new PartitionRangeReadCommand(false,
+                                                                              true,
+                                                                              metadata,
+                                                                              nowInSec,
+                                                                              ColumnFilter.all(metadata),
+                                                                              RowFilter.NONE,
+                                                                              limits,
+                                                                              new DataRange(bounds, filter).forPaging(bounds, metadata.comparator, pageFrom, true));
+                try (PartitionIterator results = StorageProxy.getRangeSlice(cmd, consistencyLevel))
+                {
+                    return thriftifyKeySlices(results, new ColumnParent(column_family), limits.perPartitionCount());
+                }
+            }
+            catch (UnknownColumnException e)
+            {
+                throw new InvalidRequestException(e.getMessage());
            }
            finally
            {
                release();
            }
-            assert rows != null;
-
-            return thriftifyKeySlices(rows, new ColumnParent(column_family), predicate, now);
        }
        catch (RequestValidationException e)
        {
@@ -1274,17 +1618,22 @@ public class CassandraServer implements Cassandra.Iface
         }
     }

-    private List<KeySlice> thriftifyKeySlices(List<Row> rows, ColumnParent column_parent, SlicePredicate predicate, long now)
+    private List<KeySlice> thriftifyKeySlices(PartitionIterator results, ColumnParent column_parent, int cellLimit)
     {
-        List<KeySlice> keySlices = new ArrayList<KeySlice>(rows.size());
-        boolean reversed = predicate.slice_range != null && predicate.slice_range.reversed;
-        for (Row row : rows)
+        try (PartitionIterator iter = results)
         {
-            List<ColumnOrSuperColumn> thriftifiedColumns = thriftifyColumnFamily(row.cf, column_parent.super_column != null, reversed, now);
-            keySlices.add(new KeySlice(row.key.getKey(), thriftifiedColumns));
-        }
+            List<KeySlice> keySlices = new ArrayList<KeySlice>();
+            while (iter.hasNext())
+            {
+                try (RowIterator partition = iter.next())
+                {
+                    List<ColumnOrSuperColumn> thriftifiedColumns = thriftifyPartition(partition, column_parent.super_column != null, partition.isReverseOrder(), cellLimit);
+                    keySlices.add(new KeySlice(partition.partitionKey().getKey(), thriftifiedColumns));
+                }
+            }

-        return keySlices;
+            return keySlices;
+        }
     }

     public List<KeySlice> get_indexed_slices(ColumnParent column_parent, IndexClause index_clause, SlicePredicate column_predicate, ConsistencyLevel consistency_level)
@@ -1316,21 +1665,25 @@ public class CassandraServer implements Cassandra.Iface
             consistencyLevel.validateForRead(keyspace);

             IPartitioner p = StorageService.getPartitioner();
-            AbstractBounds<RowPosition> bounds = new Bounds<RowPosition>(RowPosition.ForKey.get(index_clause.start_key, p),
-                                                                         p.getMinimumToken().minKeyBound());
+            AbstractBounds<PartitionPosition> bounds = new Bounds<PartitionPosition>(PartitionPosition.ForKey.get(index_clause.start_key, p),
+                                                                                     p.getMinimumToken().minKeyBound());

-            IDiskAtomFilter filter = ThriftValidation.asIFilter(column_predicate, metadata, column_parent.super_column);
-            long now = System.currentTimeMillis();
-            RangeSliceCommand command = new RangeSliceCommand(keyspace,
-                                                              column_parent.column_family,
-                                                              now,
-                                                              filter,
-                                                              bounds,
-                                                              ThriftConversion.indexExpressionsFromThrift(index_clause.expressions),
-                                                              index_clause.count);
-
-            List<Row> rows = StorageProxy.getRangeSlice(command, consistencyLevel);
-            return thriftifyKeySlices(rows, column_parent, column_predicate, now);
+            int nowInSec = FBUtilities.nowInSeconds();
+            ColumnFilter columns = makeColumnFilter(metadata, column_parent, column_predicate);
+            ClusteringIndexFilter filter = toInternalFilter(metadata, column_parent, column_predicate);
+            DataLimits limits = getLimits(index_clause.count, metadata.isSuper() && !column_parent.isSetSuper_column(), column_predicate);
+            PartitionRangeReadCommand cmd = new PartitionRangeReadCommand(false,
+                                                                          true,
+                                                                          metadata,
+                                                                          nowInSec,
+                                                                          columns,
+                                                                          ThriftConversion.rowFilterFromThrift(metadata, index_clause.expressions),
+                                                                          limits,
+                                                                          new DataRange(bounds, filter));
+            try (PartitionIterator results = StorageProxy.getRangeSlice(cmd, consistencyLevel))
+            {
+                return thriftifyKeySlices(results, column_parent, limits.perPartitionCount());
+            }
         }
         catch (RequestValidationException e)
         {
@@ -1751,24 +2104,34 @@ public class CassandraServer implements Cassandra.Iface
             ThriftConversion.fromThrift(consistency_level).validateCounterForWrite(metadata);
             ThriftValidation.validateColumnParent(metadata, column_parent);
             // SuperColumn field is usually optional, but not when we're adding
-            if (metadata.cfType == ColumnFamilyType.Super && column_parent.super_column == null)
+            if (metadata.isSuper() && column_parent.super_column == null)
                 throw new InvalidRequestException("missing mandatory super column name for super CF " + column_parent.column_family);

             ThriftValidation.validateColumnNames(metadata, column_parent, Arrays.asList(column.name));

-            org.apache.cassandra.db.Mutation mutation = new org.apache.cassandra.db.Mutation(keyspace, key);
             try
             {
-                if (metadata.isSuper())
-                    mutation.addCounter(column_parent.column_family, metadata.comparator.makeCellName(column_parent.super_column, column.name), column.value);
-                else
-                    mutation.addCounter(column_parent.column_family, metadata.comparator.cellFromByteBuffer(column.name), column.value);
+                LegacyLayout.LegacyCellName name = LegacyLayout.decodeCellName(metadata, column_parent.super_column, column.name);
+                DecoratedKey dk = StorageService.getPartitioner().decorateKey(key);
+                PartitionUpdate update = new PartitionUpdate(metadata, dk, PartitionColumns.of(name.column), 1);
+
+                Row.Writer writer = name.column.isStatic() ? update.staticWriter() : update.writer();
+                name.clustering.writeTo(writer);
+                CellPath path = name.collectionElement == null ? null : CellPath.create(name.collectionElement);
+
+                // See UpdateParameters.addCounter() for more details on this
+                ByteBuffer value = CounterContext.instance().createLocal(column.value);
+
+                writer.writeCell(name.column, true, value, SimpleLivenessInfo.forUpdate(FBUtilities.timestampMicros(), LivenessInfo.NO_TTL, FBUtilities.nowInSeconds(), metadata), path);
+                writer.endOfRow();
+
+                org.apache.cassandra.db.Mutation mutation = new org.apache.cassandra.db.Mutation(update);
+
+                doInsert(consistency_level, Arrays.asList(new CounterMutation(mutation, ThriftConversion.fromThrift(consistency_level))));
             }
-            catch (MarshalException e)
+            catch (MarshalException|UnknownColumnException e)
            {
                throw new InvalidRequestException(e.getMessage());
            }
-            doInsert(consistency_level, Arrays.asList(new CounterMutation(mutation, ThriftConversion.fromThrift(consistency_level))));
        }
        catch (RequestValidationException e)
        {
@@ -1797,7 +2160,7 @@ public class CassandraServer implements Cassandra.Iface

         try
         {
-            internal_remove(key, path, System.currentTimeMillis(), consistency_level, true);
+            internal_remove(key, path, FBUtilities.timestampMicros(), consistency_level, true);
         }
         catch (RequestValidationException e)
         {
@@ -1869,7 +2232,7 @@ public class CassandraServer implements Cassandra.Iface

     public CqlResult execute_cql_query(ByteBuffer query, Compression compression) throws TException
     {
-        throw new InvalidRequestException("CQL2 has been removed in Cassandra 2.2. Please use CQL3 instead");
+        throw new InvalidRequestException("CQL2 has been removed in Cassandra 3.0. Please use CQL3 instead");
     }

     public CqlResult execute_cql3_query(ByteBuffer query, Compression compression, ConsistencyLevel cLevel) throws TException
@@ -1890,7 +2253,8 @@ public class CassandraServer implements Cassandra.Iface
             ThriftClientState cState = state();
             return ClientState.getCQLQueryHandler().process(queryString,
                                                             cState.getQueryState(),
-                                                            QueryOptions.fromProtocolV2(ThriftConversion.fromThrift(cLevel), Collections.<ByteBuffer>emptyList()),
+                                                            QueryOptions.fromProtocolV2(ThriftConversion.fromThrift(cLevel),
+                                                                                        Collections.<ByteBuffer>emptyList()),
                                                             null).toThriftResult();
         }
         catch (RequestExecutionException e)
@@ -1909,7 +2273,7 @@ public class CassandraServer implements Cassandra.Iface

     public CqlPreparedResult prepare_cql_query(ByteBuffer query, Compression compression) throws TException
     {
-        throw new InvalidRequestException("CQL2 has been removed in Cassandra 2.2. Please use CQL3 instead");
+        throw new InvalidRequestException("CQL2 has been removed in Cassandra 3.0. Please use CQL3 instead");
     }

     public CqlPreparedResult prepare_cql3_query(ByteBuffer query, Compression compression) throws TException
@@ -1922,9 +2286,7 @@ public class CassandraServer implements Cassandra.Iface
         try
         {
             cState.validateLogin();
-            return ClientState.getCQLQueryHandler().prepare(queryString,
-                                                            cState.getQueryState(),
-                                                            null).toThriftPreparedResult();
+            return ClientState.getCQLQueryHandler().prepare(queryString, cState.getQueryState(), null).toThriftPreparedResult();
        }
        catch (RequestValidationException e)
        {
@@ -1934,7 +2296,7 @@ public class CassandraServer implements Cassandra.Iface

    public CqlResult execute_prepared_cql_query(int itemId, List<ByteBuffer> bindVariables) throws TException
    {
-        throw new InvalidRequestException("CQL2 has been removed in Cassandra 2.2. Please use CQL3 instead");
+        throw new InvalidRequestException("CQL2 has been removed in Cassandra 3.0. Please use CQL3 instead");
Please use CQL3 instead"); } public CqlResult execute_prepared_cql3_query(int itemId, List<ByteBuffer> bindVariables, ConsistencyLevel cLevel) throws TException @@ -2003,33 +2365,41 @@ public class CassandraServer implements Cassandra.Iface String keyspace = cState.getKeyspace(); state().hasColumnFamilyAccess(keyspace, request.getColumn_parent().column_family, Permission.SELECT); CFMetaData metadata = ThriftValidation.validateColumnFamily(keyspace, request.getColumn_parent().column_family); - if (metadata.cfType == ColumnFamilyType.Super) + if (metadata.isSuper()) throw new org.apache.cassandra.exceptions.InvalidRequestException("get_multi_slice does not support super columns"); ThriftValidation.validateColumnParent(metadata, request.getColumn_parent()); org.apache.cassandra.db.ConsistencyLevel consistencyLevel = ThriftConversion.fromThrift(request.getConsistency_level()); consistencyLevel.validateForRead(keyspace); - List<ReadCommand> commands = new ArrayList<>(1); - ColumnSlice[] slices = new ColumnSlice[request.getColumn_slices().size()]; + + Slices.Builder builder = new Slices.Builder(metadata.comparator, request.getColumn_slices().size()); for (int i = 0 ; i < request.getColumn_slices().size() ; i++) { fixOptionalSliceParameters(request.getColumn_slices().get(i)); - Composite start = metadata.comparator.fromByteBuffer(request.getColumn_slices().get(i).start); - Composite finish = metadata.comparator.fromByteBuffer(request.getColumn_slices().get(i).finish); - if (!start.isEmpty() && !finish.isEmpty()) - { - int compare = metadata.comparator.compare(start, finish); - if (!request.reversed && compare > 0) - throw new InvalidRequestException(String.format("Column slice at index %d had start greater than finish", i)); - else if (request.reversed && compare < 0) - throw new InvalidRequestException(String.format("Reversed column slice at index %d had start less than finish", i)); - } - slices[i] = new ColumnSlice(start, finish); + Slice.Bound start = LegacyLayout.decodeBound(metadata, request.getColumn_slices().get(i).start, true).bound; + Slice.Bound finish = LegacyLayout.decodeBound(metadata, request.getColumn_slices().get(i).finish, false).bound; + + int compare = metadata.comparator.compare(start, finish); + if (!request.reversed && compare > 0) + throw new InvalidRequestException(String.format("Column slice at index %d had start greater than finish", i)); + else if (request.reversed && compare < 0) + throw new InvalidRequestException(String.format("Reversed column slice at index %d had start less than finish", i)); + + builder.add(request.reversed ? Slice.make(finish, start) : Slice.make(start, finish)); } - ColumnSlice[] deoverlapped = ColumnSlice.deoverlapSlices(slices, request.reversed ? metadata.comparator.reverseComparator() : metadata.comparator); - SliceQueryFilter filter = new SliceQueryFilter(deoverlapped, request.reversed, request.count); +
<TRUNCATED>
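
A note on the thriftifyKeySlices() rewrite above: the new engine hands back lazily evaluated, closeable iterators (a PartitionIterator whose elements are themselves RowIterators), so the conversion loop must close both levels even if an exception escapes mid-iteration, hence the nested try-with-resources. Below is a minimal self-contained sketch of that pattern; the CloseableIterator interface and drain() method are illustrative stand-ins written for this note, not Cassandra types:

    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;

    public class NestedCloseableIterators
    {
        // Stand-in for PartitionIterator / RowIterator: an Iterator that must be closed.
        interface CloseableIterator<T> extends Iterator<T>, AutoCloseable
        {
            void close(); // narrowed so close() declares no checked exceptions
        }

        // Same shape as the new thriftifyKeySlices(): the outer iterator is closed by
        // try-with-resources, and each inner iterator is closed per element, so no
        // resources leak even if conversion throws partway through.
        static List<String> drain(CloseableIterator<CloseableIterator<String>> partitions)
        {
            try (CloseableIterator<CloseableIterator<String>> iter = partitions)
            {
                List<String> out = new ArrayList<>();
                while (iter.hasNext())
                {
                    try (CloseableIterator<String> partition = iter.next())
                    {
                        while (partition.hasNext())
                            out.add(partition.next());
                    }
                }
                return out;
            }
        }
    }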
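
The one-line change in remove() from System.currentTimeMillis() to FBUtilities.timestampMicros() appears to be a unit fix: cell timestamps are conventionally microseconds, and reconciliation keeps the cell with the larger timestamp, so a tombstone stamped in milliseconds would lose to any ordinary microsecond-stamped write. A plain-Java illustration of the mismatch (no Cassandra types; the micros value is assumed to be millis * 1000, which is how such server-side timestamps are commonly derived):

    public class TimestampUnits
    {
        public static void main(String[] args)
        {
            long millis = System.currentTimeMillis(); // what remove() used to pass as the timestamp
            long micros = millis * 1000;              // the microsecond convention used by writes
            // Reconciliation keeps the larger timestamp, so a deletion stamped in
            // milliseconds looks "older" than any update stamped in microseconds
            // and would be shadowed:
            System.out.println(micros > millis);      // true
        }
    }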
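
In the get_multi_slice hunk, each requested slice is validated against the table comparator and, when the request is reversed, stored as (finish, start) so every Slice handed to Slices.Builder ends up in forward comparator order. The validation rule in isolation, over a generic comparator (the validate() helper is hypothetical, written for this note, not Cassandra API):

    import java.util.Comparator;

    public class SliceValidation
    {
        // The rule from the get_multi_slice hunk: forward slices must be given as
        // start <= finish, reversed slices as start >= finish (and are then swapped
        // into forward order before being added to the builder).
        static <T> void validate(T start, T finish, boolean reversed, Comparator<T> cmp, int i)
        {
            int compare = cmp.compare(start, finish);
            if (!reversed && compare > 0)
                throw new IllegalArgumentException(String.format("Column slice at index %d had start greater than finish", i));
            if (reversed && compare < 0)
                throw new IllegalArgumentException(String.format("Reversed column slice at index %d had start less than finish", i));
        }

        public static void main(String[] args)
        {
            Comparator<Integer> cmp = Comparator.naturalOrder();
            validate(1, 5, false, cmp, 0); // ok: forward, start <= finish
            validate(5, 1, true, cmp, 1);  // ok: reversed, start >= finish
            validate(5, 1, false, cmp, 2); // throws: forward slice given backwards
        }
    }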
