http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/RowUpdateBuilder.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/RowUpdateBuilder.java b/src/java/org/apache/cassandra/db/RowUpdateBuilder.java new file mode 100644 index 0000000..c3f3d29 --- /dev/null +++ b/src/java/org/apache/cassandra/db/RowUpdateBuilder.java @@ -0,0 +1,316 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db; + +import java.nio.ByteBuffer; + +import org.apache.cassandra.cql3.ColumnIdentifier; +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.ColumnDefinition; +import org.apache.cassandra.db.rows.*; +import org.apache.cassandra.db.context.CounterContext; +import org.apache.cassandra.db.partitions.*; +import org.apache.cassandra.db.marshal.AbstractType; +import org.apache.cassandra.db.marshal.ListType; +import org.apache.cassandra.db.marshal.MapType; +import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.utils.*; + +/** + * Convenience object to create single row updates. + * + * This is meant for system table update, when performance is not of the utmost importance. + */ +public class RowUpdateBuilder +{ + private final PartitionUpdate update; + + private final LivenessInfo defaultLiveness; + private final LivenessInfo deletionLiveness; + private final DeletionTime deletionTime; + + private final Mutation mutation; + + private Row.Writer regularWriter; + private Row.Writer staticWriter; + + private boolean hasSetClustering; + private boolean useRowMarker = true; + + private RowUpdateBuilder(PartitionUpdate update, long timestamp, int ttl, int localDeletionTime, Mutation mutation) + { + this.update = update; + + this.defaultLiveness = SimpleLivenessInfo.forUpdate(timestamp, ttl, localDeletionTime, update.metadata()); + this.deletionLiveness = SimpleLivenessInfo.forDeletion(timestamp, localDeletionTime); + this.deletionTime = new SimpleDeletionTime(timestamp, localDeletionTime); + + // note that the created mutation may get further update later on, so we don't use the ctor that create a singletonMap + // underneath (this class if for convenience, not performance) + this.mutation = mutation == null ? 
new Mutation(update.metadata().ksName, update.partitionKey()).add(update) : mutation; + } + + private RowUpdateBuilder(PartitionUpdate update, long timestamp, int ttl, Mutation mutation) + { + this(update, timestamp, ttl, FBUtilities.nowInSeconds(), mutation); + } + + private Row.Writer writer() + { + assert staticWriter == null : "Cannot update both static and non-static columns with the same RowUpdateBuilder object"; + if (regularWriter == null) + { + regularWriter = update.writer(); + + // If a CQL table, add the "row marker" + if (update.metadata().isCQLTable() && useRowMarker) + regularWriter.writePartitionKeyLivenessInfo(defaultLiveness); + } + return regularWriter; + } + + private Row.Writer staticWriter() + { + assert regularWriter == null : "Cannot update both static and non-static columns with the same RowUpdateBuilder object"; + if (staticWriter == null) + staticWriter = update.staticWriter(); + return staticWriter; + } + + private Row.Writer writer(ColumnDefinition c) + { + return c.isStatic() ? staticWriter() : writer(); + } + + public RowUpdateBuilder(CFMetaData metadata, long timestamp, Object partitionKey) + { + this(metadata, FBUtilities.nowInSeconds(), timestamp, partitionKey); + } + + public RowUpdateBuilder(CFMetaData metadata, int localDeletionTime, long timestamp, Object partitionKey) + { + this(metadata, localDeletionTime, timestamp, metadata.getDefaultTimeToLive(), partitionKey); + } + + public RowUpdateBuilder(CFMetaData metadata, long timestamp, int ttl, Object partitionKey) + { + this(metadata, FBUtilities.nowInSeconds(), timestamp, ttl, partitionKey); + } + + public RowUpdateBuilder(CFMetaData metadata, int localDeletionTime, long timestamp, int ttl, Object partitionKey) + { + this(new PartitionUpdate(metadata, makeKey(metadata, partitionKey), metadata.partitionColumns(), 1), timestamp, ttl, localDeletionTime, null); + } + + public RowUpdateBuilder(CFMetaData metadata, long timestamp, Mutation mutation) + { + this(metadata, timestamp, LivenessInfo.NO_TTL, mutation); + } + + public RowUpdateBuilder(CFMetaData metadata, long timestamp, int ttl, Mutation mutation) + { + this(getOrAdd(metadata, mutation), timestamp, ttl, mutation); + } + + public RowUpdateBuilder(PartitionUpdate update, long timestamp, int ttl) + { + this(update, timestamp, ttl, null); + } + + // This must be called before any addition or deletion if used. + public RowUpdateBuilder noRowMarker() + { + this.useRowMarker = false; + return this; + } + + public RowUpdateBuilder clustering(Object... clusteringValues) + { + assert clusteringValues.length == update.metadata().comparator.size() + : "Invalid clustering values length. Expected: " + update.metadata().comparator.size() + " got: " + clusteringValues.length; + hasSetClustering = true; + if (clusteringValues.length > 0) + Rows.writeClustering(update.metadata().comparator.make(clusteringValues), writer()); + return this; + } + + public Mutation build() + { + Row.Writer writer = regularWriter == null ? staticWriter : regularWriter; + if (writer != null) + writer.endOfRow(); + return mutation; + } + + public PartitionUpdate buildUpdate() + { + build(); + return update; + } + + private static void deleteRow(PartitionUpdate update, long timestamp, Object...clusteringValues) + { + assert clusteringValues.length == update.metadata().comparator.size() || (clusteringValues.length == 0 && !update.columns().statics.isEmpty()); + + Row.Writer writer = clusteringValues.length == update.metadata().comparator.size() + ? 
update.writer()
+                                         : update.staticWriter();
+
+        if (clusteringValues.length > 0)
+            Rows.writeClustering(update.metadata().comparator.make(clusteringValues), writer);
+        writer.writeRowDeletion(new SimpleDeletionTime(timestamp, FBUtilities.nowInSeconds()));
+        writer.endOfRow();
+    }
+
+    public static Mutation deleteRow(CFMetaData metadata, long timestamp, Mutation mutation, Object... clusteringValues)
+    {
+        deleteRow(getOrAdd(metadata, mutation), timestamp, clusteringValues);
+        return mutation;
+    }
+
+    public static Mutation deleteRow(CFMetaData metadata, long timestamp, Object key, Object... clusteringValues)
+    {
+        PartitionUpdate update = new PartitionUpdate(metadata, makeKey(metadata, key), metadata.partitionColumns(), 0);
+        deleteRow(update, timestamp, clusteringValues);
+        // note that the created mutation may get further updated later on, so we don't use the ctor that creates a singletonMap
+        // underneath (this class is for convenience, not performance)
+        return new Mutation(update.metadata().ksName, update.partitionKey()).add(update);
+    }
+
+    private static DecoratedKey makeKey(CFMetaData metadata, Object... partitionKey)
+    {
+        if (partitionKey.length == 1 && partitionKey[0] instanceof DecoratedKey)
+            return (DecoratedKey)partitionKey[0];
+
+        ByteBuffer key = CFMetaData.serializePartitionKey(metadata.getKeyValidatorAsClusteringComparator().make(partitionKey));
+        return StorageService.getPartitioner().decorateKey(key);
+    }
+
+    private static PartitionUpdate getOrAdd(CFMetaData metadata, Mutation mutation)
+    {
+        PartitionUpdate upd = mutation.get(metadata);
+        if (upd == null)
+        {
+            upd = new PartitionUpdate(metadata, mutation.key(), metadata.partitionColumns(), 1);
+            mutation.add(upd);
+        }
+        return upd;
+    }
+
+    public RowUpdateBuilder resetCollection(String columnName)
+    {
+        ColumnDefinition c = getDefinition(columnName);
+        assert c != null : "Cannot find column " + columnName;
+        assert c.isStatic() || update.metadata().comparator.size() == 0 || hasSetClustering : "Cannot set non static column " + c + " since no clustering has been provided";
+        assert c.type.isCollection() && c.type.isMultiCell();
+        writer(c).writeComplexDeletion(c, new SimpleDeletionTime(defaultLiveness.timestamp() - 1, deletionTime.localDeletionTime()));
+        return this;
+    }
+
+    public RowUpdateBuilder addRangeTombstone(RangeTombstone rt)
+    {
+        update.addRangeTombstone(rt);
+        return this;
+    }
+
+    public RowUpdateBuilder addRangeTombstone(Slice slice)
+    {
+        update.addRangeTombstone(slice, deletionTime);
+        return this;
+    }
+
+    public RowUpdateBuilder addRangeTombstone(Object start, Object end)
+    {
+        ClusteringComparator cmp = update.metadata().comparator;
+        Slice slice = Slice.make(cmp.make(start), cmp.make(end));
+        return addRangeTombstone(slice);
+    }
+
+    public RowUpdateBuilder add(String columnName, Object value)
+    {
+        ColumnDefinition c = getDefinition(columnName);
+        assert c != null : "Cannot find column " + columnName;
+        return add(c, value);
+    }
+
+    public RowUpdateBuilder add(ColumnDefinition columnDefinition, Object value)
+    {
+        assert columnDefinition.isStatic() || update.metadata().comparator.size() == 0 || hasSetClustering : "Cannot set non static column " + columnDefinition + " since no clustering has been provided";
+        if (value == null)
+            writer(columnDefinition).writeCell(columnDefinition, false, ByteBufferUtil.EMPTY_BYTE_BUFFER, deletionLiveness, null);
+        else
+            writer(columnDefinition).writeCell(columnDefinition, false, bb(value, columnDefinition.type), defaultLiveness, null);
+        return this;
+    }
+
+    
public RowUpdateBuilder delete(String columnName)
+    {
+        ColumnDefinition c = getDefinition(columnName);
+        assert c != null : "Cannot find column " + columnName;
+        return delete(c);
+    }
+
+    public RowUpdateBuilder delete(ColumnDefinition columnDefinition)
+    {
+        return add(columnDefinition, null);
+    }
+
+    private ByteBuffer bb(Object value, AbstractType<?> type)
+    {
+        if (value instanceof ByteBuffer)
+            return (ByteBuffer)value;
+
+        if (type.isCounter())
+        {
+            // See UpdateParameters.addCounter()
+            assert value instanceof Long : "Attempted to adjust Counter cell with non-long value.";
+            return CounterContext.instance().createGlobal(CounterId.getLocalId(), 1, (Long)value);
+        }
+        return ((AbstractType)type).decompose(value);
+    }
+
+    public RowUpdateBuilder addMapEntry(String columnName, Object key, Object value)
+    {
+        ColumnDefinition c = getDefinition(columnName);
+        assert c.isStatic() || update.metadata().comparator.size() == 0 || hasSetClustering : "Cannot set non static column " + c + " since no clustering has been provided";
+        assert c.type instanceof MapType;
+        MapType mt = (MapType)c.type;
+        writer(c).writeCell(c, false, bb(value, mt.getValuesType()), defaultLiveness, CellPath.create(bb(key, mt.getKeysType())));
+        return this;
+    }
+
+    public RowUpdateBuilder addListEntry(String columnName, Object value)
+    {
+        ColumnDefinition c = getDefinition(columnName);
+        assert c.isStatic() || hasSetClustering : "Cannot set non static column " + c + " since no clustering has been provided";
+        assert c.type instanceof ListType;
+        ListType lt = (ListType)c.type;
+        writer(c).writeCell(c, false, bb(value, lt.getElementsType()), defaultLiveness, CellPath.create(ByteBuffer.wrap(UUIDGen.getTimeUUIDBytes())));
+        return this;
+    }
+
+    private ColumnDefinition getDefinition(String name)
+    {
+        return update.metadata().getColumnDefinition(new ColumnIdentifier(name, true));
+    }
+
+    public UnfilteredRowIterator unfilteredIterator()
+    {
+        return update.unfilteredIterator();
+    }
+}
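For context, a minimal sketch of how this builder is typically driven once the patch is applied. The keyspace, table and column names below are hypothetical, and the sketch assumes a schema with one clustering column and one regular int column has already been loaded; it is an illustration of the fluent API above, not code from this commit:

    // Hypothetical table: CREATE TABLE ks.tbl (k text, c text, v int, PRIMARY KEY (k, c))
    CFMetaData metadata = Schema.instance.getCFMetaData("ks", "tbl");

    // Insert one row (k='k1', c='c1', v=42) at the current timestamp
    Mutation insert = new RowUpdateBuilder(metadata, FBUtilities.timestampMicros(), "k1")
                      .clustering("c1")
                      .add("v", 42)
                      .build();
    insert.apply();

    // Delete that row again with a newer timestamp
    RowUpdateBuilder.deleteRow(metadata, FBUtilities.timestampMicros(), "k1", "c1").apply();

The chained clustering()/add()/build() calls map directly onto the methods added in this file; build() closes the pending Row.Writer and returns the underlying Mutation.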
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/SerializationHeader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/SerializationHeader.java b/src/java/org/apache/cassandra/db/SerializationHeader.java new file mode 100644 index 0000000..304332e --- /dev/null +++ b/src/java/org/apache/cassandra/db/SerializationHeader.java @@ -0,0 +1,488 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db; + +import java.io.DataInput; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.*; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import com.google.common.base.Function; + +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.ColumnDefinition; +import org.apache.cassandra.db.rows.*; +import org.apache.cassandra.db.marshal.AbstractType; +import org.apache.cassandra.db.marshal.BytesType; +import org.apache.cassandra.db.marshal.UTF8Type; +import org.apache.cassandra.db.marshal.TypeParser; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.sstable.format.Version; +import org.apache.cassandra.io.sstable.metadata.MetadataType; +import org.apache.cassandra.io.sstable.metadata.MetadataComponent; +import org.apache.cassandra.io.sstable.metadata.IMetadataComponentSerializer; +import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.utils.ByteBufferUtil; + +public class SerializationHeader +{ + private static final int DEFAULT_BASE_DELETION = computeDefaultBaseDeletion(); + + public static final Serializer serializer = new Serializer(); + + private final AbstractType<?> keyType; + private final List<AbstractType<?>> clusteringTypes; + + private final PartitionColumns columns; + private final RowStats stats; + + private final Map<ByteBuffer, AbstractType<?>> typeMap; + + private final long baseTimestamp; + public final int baseDeletionTime; + private final int baseTTL; + + // Whether or not to store cell in a sparse or dense way. See UnfilteredSerializer for details. + private final boolean useSparseColumnLayout; + + private SerializationHeader(AbstractType<?> keyType, + List<AbstractType<?>> clusteringTypes, + PartitionColumns columns, + RowStats stats, + Map<ByteBuffer, AbstractType<?>> typeMap) + { + this.keyType = keyType; + this.clusteringTypes = clusteringTypes; + this.columns = columns; + this.stats = stats; + this.typeMap = typeMap; + + // Not that if a given stats is unset, it means that either it's unused (there is + // no tombstone whatsoever for instance) or that we have no information on it. 
In
+        // the former case, it doesn't matter which base we use but in the latter, we use
+        // bases that are more likely to provide small encoded values than the default
+        // "unset" value.
+        this.baseTimestamp = stats.hasMinTimestamp() ? stats.minTimestamp : 0;
+        this.baseDeletionTime = stats.hasMinLocalDeletionTime() ? stats.minLocalDeletionTime : DEFAULT_BASE_DELETION;
+        this.baseTTL = stats.minTTL;
+
+        // For the dense layout, we have a 1 byte overhead for absent columns. For the sparse layout, it's a 1 byte
+        // overhead for present columns (in fact we use a 2 byte id, but assuming vint encoding, we'll pay 2 bytes
+        // only for the columns after the 128th one, and for simplicity we assume that once you have that many columns,
+        // you'll tend towards a clearly dense or clearly sparse case, so that the heuristic above shouldn't be
+        // too inappropriate). So if on average more than half of our columns are set per row, we'd better go for sparse.
+        this.useSparseColumnLayout = stats.avgColumnSetPerRow <= (columns.regulars.columnCount() / 2);
+    }
+
+    public boolean useSparseColumnLayout(boolean isStatic)
+    {
+        // We always use a dense layout for the static row. Having very many static columns with only a few set at
+        // any given time doesn't feel very common at all (and we already optimize the case where no statics at all
+        // are provided).
+        return isStatic ? false : useSparseColumnLayout;
+    }
+
+    public static SerializationHeader forKeyCache(CFMetaData metadata)
+    {
+        // We don't save type information in the key cache (we could change
+        // that but it's easier right now), so instead we simply use BytesType
+        // for both serialization and deserialization. Note that we also only
+        // serialize clustering prefixes in the key cache, so only the clusteringTypes
+        // really matter.
+        int size = metadata.clusteringColumns().size();
+        List<AbstractType<?>> clusteringTypes = new ArrayList<>(size);
+        for (int i = 0; i < size; i++)
+            clusteringTypes.add(BytesType.instance);
+        return new SerializationHeader(BytesType.instance,
+                                       clusteringTypes,
+                                       PartitionColumns.NONE,
+                                       RowStats.NO_STATS,
+                                       Collections.<ByteBuffer, AbstractType<?>>emptyMap());
+    }
+
+    public static SerializationHeader make(CFMetaData metadata, Collection<SSTableReader> sstables)
+    {
+        // The serialization header has to be computed before the start of compaction (since it's used to write
+        // the result). This means that when compacting multiple sources, we won't have perfectly accurate stats
+        // (for RowStats) since compaction may delete, purge and generally merge rows in unknown ways. This is
+        // kind of ok because those stats are only used for optimizing the underlying storage format and so we
+        // just have to strive for as good as possible. Currently, we stick to a relatively naive merge of existing
+        // global stats because it's simple and probably good enough in most situations, but we could probably
+        // improve our margin of inaccuracy through the use of more fine-grained stats in the future.
+        // Note however that to avoid seeing our accuracy degrade through successive compactions, we don't base
+        // our stats merging on the compacted files' headers, which as we just said can be somewhat inaccurate,
+        // but rather on their stats stored in StatsMetadata, which are fully accurate.
+ RowStats.Collector stats = new RowStats.Collector(); + PartitionColumns.Builder columns = PartitionColumns.builder(); + for (SSTableReader sstable : sstables) + { + stats.updateTimestamp(sstable.getMinTimestamp()); + stats.updateLocalDeletionTime(sstable.getMinLocalDeletionTime()); + stats.updateTTL(sstable.getMinTTL()); + stats.updateColumnSetPerRow(sstable.getTotalColumnsSet(), sstable.getTotalRows()); + if (sstable.header == null) + columns.addAll(metadata.partitionColumns()); + else + columns.addAll(sstable.header.columns()); + } + return new SerializationHeader(metadata, columns.build(), stats.get()); + } + + public SerializationHeader(CFMetaData metadata, + PartitionColumns columns, + RowStats stats) + { + this(metadata.getKeyValidator(), + typesOf(metadata.clusteringColumns()), + columns, + stats, + null); + } + + private static List<AbstractType<?>> typesOf(List<ColumnDefinition> columns) + { + return ImmutableList.copyOf(Lists.transform(columns, new Function<ColumnDefinition, AbstractType<?>>() + { + public AbstractType<?> apply(ColumnDefinition column) + { + return column.type; + } + })); + } + + public PartitionColumns columns() + { + return columns; + } + + public boolean hasStatic() + { + return !columns.statics.isEmpty(); + } + + private static int computeDefaultBaseDeletion() + { + // We need a fixed default, but one that is likely to provide small values (close to 0) when + // substracted to deletion times. Since deletion times are 'the current time in seconds', we + // use as base Jan 1, 2015 (in seconds). + Calendar c = Calendar.getInstance(TimeZone.getTimeZone("GMT-0"), Locale.US); + c.set(Calendar.YEAR, 2015); + c.set(Calendar.MONTH, Calendar.JANUARY); + c.set(Calendar.DAY_OF_MONTH, 1); + c.set(Calendar.HOUR_OF_DAY, 0); + c.set(Calendar.MINUTE, 0); + c.set(Calendar.SECOND, 0); + c.set(Calendar.MILLISECOND, 0); + return (int)(c.getTimeInMillis() / 1000); + } + + public RowStats stats() + { + return stats; + } + + public AbstractType<?> keyType() + { + return keyType; + } + + public List<AbstractType<?>> clusteringTypes() + { + return clusteringTypes; + } + + public Columns columns(boolean isStatic) + { + return isStatic ? columns.statics : columns.regulars; + } + + public AbstractType<?> getType(ColumnDefinition column) + { + return typeMap == null ? 
column.type : typeMap.get(column.name.bytes); + } + + public long encodeTimestamp(long timestamp) + { + return timestamp - baseTimestamp; + } + + public long decodeTimestamp(long timestamp) + { + return baseTimestamp + timestamp; + } + + public int encodeDeletionTime(int deletionTime) + { + return deletionTime - baseDeletionTime; + } + + public int decodeDeletionTime(int deletionTime) + { + return baseDeletionTime + deletionTime; + } + + public int encodeTTL(int ttl) + { + return ttl - baseTTL; + } + + public int decodeTTL(int ttl) + { + return baseTTL + ttl; + } + + public Component toComponent() + { + Map<ByteBuffer, AbstractType<?>> staticColumns = new LinkedHashMap<>(); + Map<ByteBuffer, AbstractType<?>> regularColumns = new LinkedHashMap<>(); + for (ColumnDefinition column : columns.statics) + staticColumns.put(column.name.bytes, column.type); + for (ColumnDefinition column : columns.regulars) + regularColumns.put(column.name.bytes, column.type); + return new Component(keyType, clusteringTypes, staticColumns, regularColumns, stats); + } + + @Override + public String toString() + { + return String.format("SerializationHeader[key=%s, cks=%s, columns=%s, stats=%s, typeMap=%s, baseTs=%d, baseDt=%s, baseTTL=%s]", + keyType, clusteringTypes, columns, stats, typeMap, baseTimestamp, baseDeletionTime, baseTTL); + } + + /** + * We need the CFMetadata to properly deserialize a SerializationHeader but it's clunky to pass that to + * a SSTable component, so we use this temporary object to delay the actual need for the metadata. + */ + public static class Component extends MetadataComponent + { + private final AbstractType<?> keyType; + private final List<AbstractType<?>> clusteringTypes; + private final Map<ByteBuffer, AbstractType<?>> staticColumns; + private final Map<ByteBuffer, AbstractType<?>> regularColumns; + private final RowStats stats; + + private Component(AbstractType<?> keyType, + List<AbstractType<?>> clusteringTypes, + Map<ByteBuffer, AbstractType<?>> staticColumns, + Map<ByteBuffer, AbstractType<?>> regularColumns, + RowStats stats) + { + this.keyType = keyType; + this.clusteringTypes = clusteringTypes; + this.staticColumns = staticColumns; + this.regularColumns = regularColumns; + this.stats = stats; + } + + public MetadataType getType() + { + return MetadataType.HEADER; + } + + public SerializationHeader toHeader(CFMetaData metadata) + { + Map<ByteBuffer, AbstractType<?>> typeMap = new HashMap<>(staticColumns.size() + regularColumns.size()); + typeMap.putAll(staticColumns); + typeMap.putAll(regularColumns); + + PartitionColumns.Builder builder = PartitionColumns.builder(); + for (ByteBuffer name : typeMap.keySet()) + { + ColumnDefinition column = metadata.getColumnDefinition(name); + if (column == null) + { + // TODO: this imply we don't read data for a column we don't yet know about, which imply this is theoretically + // racy with column addition. Currently, it is up to the user to not write data before the schema has propagated + // and this is far from being the only place that has such problem in practice. This doesn't mean we shouldn't + // improve this. + + // If we don't find the definition, it could be we have data for a dropped column, and we shouldn't + // fail deserialization because of that. So we grab a "fake" ColumnDefinition that ensure proper + // deserialization. The column will be ignore later on anyway. 
+ column = metadata.getDroppedColumnDefinition(name); + if (column == null) + throw new RuntimeException("Unknown column " + UTF8Type.instance.getString(name) + " during deserialization"); + } + builder.add(column); + } + return new SerializationHeader(keyType, clusteringTypes, builder.build(), stats, typeMap); + } + + @Override + public boolean equals(Object o) + { + if(!(o instanceof Component)) + return false; + + Component that = (Component)o; + return Objects.equals(this.keyType, that.keyType) + && Objects.equals(this.clusteringTypes, that.clusteringTypes) + && Objects.equals(this.staticColumns, that.staticColumns) + && Objects.equals(this.regularColumns, that.regularColumns) + && Objects.equals(this.stats, that.stats); + } + + @Override + public int hashCode() + { + return Objects.hash(keyType, clusteringTypes, staticColumns, regularColumns, stats); + } + + @Override + public String toString() + { + return String.format("SerializationHeader.Component[key=%s, cks=%s, statics=%s, regulars=%s, stats=%s]", + keyType, clusteringTypes, staticColumns, regularColumns, stats); + } + } + + public static class Serializer implements IMetadataComponentSerializer<Component> + { + public void serializeForMessaging(SerializationHeader header, DataOutputPlus out, boolean hasStatic) throws IOException + { + RowStats.serializer.serialize(header.stats, out); + + if (hasStatic) + Columns.serializer.serialize(header.columns.statics, out); + Columns.serializer.serialize(header.columns.regulars, out); + } + + public SerializationHeader deserializeForMessaging(DataInput in, CFMetaData metadata, boolean hasStatic) throws IOException + { + RowStats stats = RowStats.serializer.deserialize(in); + + AbstractType<?> keyType = metadata.getKeyValidator(); + List<AbstractType<?>> clusteringTypes = typesOf(metadata.clusteringColumns()); + + Columns statics = hasStatic ? 
Columns.serializer.deserialize(in, metadata) : Columns.NONE; + Columns regulars = Columns.serializer.deserialize(in, metadata); + + return new SerializationHeader(keyType, clusteringTypes, new PartitionColumns(statics, regulars), stats, null); + } + + public long serializedSizeForMessaging(SerializationHeader header, TypeSizes sizes, boolean hasStatic) + { + long size = RowStats.serializer.serializedSize(header.stats, sizes); + + if (hasStatic) + size += Columns.serializer.serializedSize(header.columns.statics, sizes); + size += Columns.serializer.serializedSize(header.columns.regulars, sizes); + return size; + } + + // For SSTables + public void serialize(Component header, DataOutputPlus out) throws IOException + { + RowStats.serializer.serialize(header.stats, out); + + writeType(header.keyType, out); + out.writeShort(header.clusteringTypes.size()); + for (AbstractType<?> type : header.clusteringTypes) + writeType(type, out); + + writeColumnsWithTypes(header.staticColumns, out); + writeColumnsWithTypes(header.regularColumns, out); + } + + // For SSTables + public Component deserialize(Version version, DataInput in) throws IOException + { + RowStats stats = RowStats.serializer.deserialize(in); + + AbstractType<?> keyType = readType(in); + int size = in.readUnsignedShort(); + List<AbstractType<?>> clusteringTypes = new ArrayList<>(size); + for (int i = 0; i < size; i++) + clusteringTypes.add(readType(in)); + + Map<ByteBuffer, AbstractType<?>> staticColumns = new LinkedHashMap<>(); + Map<ByteBuffer, AbstractType<?>> regularColumns = new LinkedHashMap<>(); + + readColumnsWithType(in, staticColumns); + readColumnsWithType(in, regularColumns); + + return new Component(keyType, clusteringTypes, staticColumns, regularColumns, stats); + } + + // For SSTables + public int serializedSize(Component header) + { + TypeSizes sizes = TypeSizes.NATIVE; + int size = RowStats.serializer.serializedSize(header.stats, sizes); + + size += sizeofType(header.keyType, sizes); + size += sizes.sizeof((short)header.clusteringTypes.size()); + for (AbstractType<?> type : header.clusteringTypes) + size += sizeofType(type, sizes); + + size += sizeofColumnsWithTypes(header.staticColumns, sizes); + size += sizeofColumnsWithTypes(header.regularColumns, sizes); + return size; + } + + private void writeColumnsWithTypes(Map<ByteBuffer, AbstractType<?>> columns, DataOutputPlus out) throws IOException + { + out.writeShort(columns.size()); + for (Map.Entry<ByteBuffer, AbstractType<?>> entry : columns.entrySet()) + { + ByteBufferUtil.writeWithShortLength(entry.getKey(), out); + writeType(entry.getValue(), out); + } + } + + private long sizeofColumnsWithTypes(Map<ByteBuffer, AbstractType<?>> columns, TypeSizes sizes) + { + long size = sizes.sizeof((short)columns.size()); + for (Map.Entry<ByteBuffer, AbstractType<?>> entry : columns.entrySet()) + { + size += sizes.sizeofWithShortLength(entry.getKey()); + size += sizeofType(entry.getValue(), sizes); + } + return size; + } + + private void readColumnsWithType(DataInput in, Map<ByteBuffer, AbstractType<?>> typeMap) throws IOException + { + int length = in.readUnsignedShort(); + for (int i = 0; i < length; i++) + { + ByteBuffer name = ByteBufferUtil.readWithShortLength(in); + typeMap.put(name, readType(in)); + } + } + + private void writeType(AbstractType<?> type, DataOutputPlus out) throws IOException + { + // TODO: we should have a terser serializaion format. 
Not a big deal though + ByteBufferUtil.writeWithLength(UTF8Type.instance.decompose(type.toString()), out); + } + + private AbstractType<?> readType(DataInput in) throws IOException + { + ByteBuffer raw = ByteBufferUtil.readWithLength(in); + return TypeParser.parse(UTF8Type.instance.compose(raw)); + } + + private int sizeofType(AbstractType<?> type, TypeSizes sizes) + { + return sizes.sizeofWithLength(UTF8Type.instance.decompose(type.toString())); + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/Serializers.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/Serializers.java b/src/java/org/apache/cassandra/db/Serializers.java new file mode 100644 index 0000000..862d02e --- /dev/null +++ b/src/java/org/apache/cassandra/db/Serializers.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db; + +import java.io.*; + +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.io.ISerializer; +import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.io.sstable.format.Version; + +import static org.apache.cassandra.io.sstable.IndexHelper.IndexInfo; + +/** + * Holds references on serializers that depend on the table definition. + */ +public class Serializers +{ + private final CFMetaData metadata; + + public Serializers(CFMetaData metadata) + { + this.metadata = metadata; + } + + public IndexInfo.Serializer indexSerializer(Version version) + { + return new IndexInfo.Serializer(metadata, version); + } + + // Note that for the old layout, this will actually discard the cellname parts that are not strictly + // part of the clustering prefix. Don't use this if that's not what you want. 
+ public ISerializer<ClusteringPrefix> clusteringPrefixSerializer(final Version version, final SerializationHeader header) + { + if (!version.storeRows()) + throw new UnsupportedOperationException(); + + return new ISerializer<ClusteringPrefix>() + { + public void serialize(ClusteringPrefix clustering, DataOutputPlus out) throws IOException + { + ClusteringPrefix.serializer.serialize(clustering, out, version.correspondingMessagingVersion(), header.clusteringTypes()); + } + + public ClusteringPrefix deserialize(DataInput in) throws IOException + { + return ClusteringPrefix.serializer.deserialize(in, version.correspondingMessagingVersion(), header.clusteringTypes()); + } + + public long serializedSize(ClusteringPrefix clustering, TypeSizes sizes) + { + return ClusteringPrefix.serializer.serializedSize(clustering, version.correspondingMessagingVersion(), header.clusteringTypes(), sizes); + } + }; + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/SimpleClustering.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/SimpleClustering.java b/src/java/org/apache/cassandra/db/SimpleClustering.java new file mode 100644 index 0000000..8b1cb7b --- /dev/null +++ b/src/java/org/apache/cassandra/db/SimpleClustering.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db; + +import java.nio.ByteBuffer; + +import org.apache.cassandra.utils.ObjectSizes; + +public class SimpleClustering extends Clustering +{ + private static final long EMPTY_SIZE = ObjectSizes.measure(new SimpleClustering(new ByteBuffer[0])); + + private final ByteBuffer[] values; + + public SimpleClustering(ByteBuffer... 
values) + { + this.values = values; + } + + public SimpleClustering(ByteBuffer value) + { + this(new ByteBuffer[]{ value }); + } + + public int size() + { + return values.length; + } + + public ByteBuffer get(int i) + { + return values[i]; + } + + public ByteBuffer[] getRawValues() + { + return values; + } + + @Override + public long unsharedHeapSize() + { + return EMPTY_SIZE + ObjectSizes.sizeOnHeapOf(values); + } + + @Override + public Clustering takeAlias() + { + return this; + } + + public static Builder builder(int size) + { + return new Builder(size); + } + + public static class Builder implements Writer + { + private final ByteBuffer[] values; + private int idx; + + private Builder(int size) + { + this.values = new ByteBuffer[size]; + } + + public void writeClusteringValue(ByteBuffer value) + { + values[idx++] = value; + } + + public SimpleClustering build() + { + assert idx == values.length; + return new SimpleClustering(values); + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/SimpleDeletionTime.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/SimpleDeletionTime.java b/src/java/org/apache/cassandra/db/SimpleDeletionTime.java new file mode 100644 index 0000000..738c5e6 --- /dev/null +++ b/src/java/org/apache/cassandra/db/SimpleDeletionTime.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db; + +import java.io.DataInput; +import java.io.IOException; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Objects; + +import org.apache.cassandra.cache.IMeasurableMemory; +import org.apache.cassandra.io.ISerializer; +import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.io.util.FileUtils; +import org.apache.cassandra.utils.ObjectSizes; + +/** + * Simple implementation of DeletionTime. 
+ */ +public class SimpleDeletionTime extends DeletionTime +{ + public final long markedForDeleteAt; + public final int localDeletionTime; + + @VisibleForTesting + public SimpleDeletionTime(long markedForDeleteAt, int localDeletionTime) + { + this.markedForDeleteAt = markedForDeleteAt; + this.localDeletionTime = localDeletionTime; + } + + public long markedForDeleteAt() + { + return markedForDeleteAt; + } + + public int localDeletionTime() + { + return localDeletionTime; + } + + public DeletionTime takeAlias() + { + return this; + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/SimpleLivenessInfo.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/SimpleLivenessInfo.java b/src/java/org/apache/cassandra/db/SimpleLivenessInfo.java new file mode 100644 index 0000000..fea1b86 --- /dev/null +++ b/src/java/org/apache/cassandra/db/SimpleLivenessInfo.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db; + +import java.util.Objects; +import java.security.MessageDigest; + +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.serializers.MarshalException; +import org.apache.cassandra.utils.FBUtilities; + +public class SimpleLivenessInfo extends AbstractLivenessInfo +{ + private final long timestamp; + private final int ttl; + private final int localDeletionTime; + + // Note that while some code use this ctor, the two following static creation methods + // are usually less error prone. + SimpleLivenessInfo(long timestamp, int ttl, int localDeletionTime) + { + this.timestamp = timestamp; + this.ttl = ttl; + this.localDeletionTime = localDeletionTime; + } + + public static SimpleLivenessInfo forUpdate(long timestamp, int ttl, int nowInSec, CFMetaData metadata) + { + if (ttl == NO_TTL) + ttl = metadata.getDefaultTimeToLive(); + + return new SimpleLivenessInfo(timestamp, ttl, ttl == NO_TTL ? 
NO_DELETION_TIME : nowInSec + ttl); + } + + public static SimpleLivenessInfo forDeletion(long timestamp, int localDeletionTime) + { + return new SimpleLivenessInfo(timestamp, NO_TTL, localDeletionTime); + } + + public long timestamp() + { + return timestamp; + } + + public int ttl() + { + return ttl; + } + + public int localDeletionTime() + { + return localDeletionTime; + } + + @Override + public LivenessInfo takeAlias() + { + return this; + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/SinglePartitionNamesCommand.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/SinglePartitionNamesCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionNamesCommand.java new file mode 100644 index 0000000..d359b2b --- /dev/null +++ b/src/java/org/apache/cassandra/db/SinglePartitionNamesCommand.java @@ -0,0 +1,249 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db; + +import java.util.*; + +import com.google.common.collect.Sets; + +import org.apache.cassandra.concurrent.Stage; +import org.apache.cassandra.concurrent.StageManager; +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.ColumnDefinition; +import org.apache.cassandra.db.rows.*; +import org.apache.cassandra.db.partitions.*; +import org.apache.cassandra.db.filter.*; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.metrics.ColumnFamilyMetrics.Sampler; +import org.apache.cassandra.thrift.ThriftResultsMerger; +import org.apache.cassandra.tracing.Tracing; +import org.apache.cassandra.utils.SearchIterator; +import org.apache.cassandra.utils.memory.HeapAllocator; + +/** + * General interface for storage-engine read queries. 
+ */ +public class SinglePartitionNamesCommand extends SinglePartitionReadCommand<ClusteringIndexNamesFilter> +{ + protected SinglePartitionNamesCommand(boolean isDigest, + boolean isForThrift, + CFMetaData metadata, + int nowInSec, + ColumnFilter columnFilter, + RowFilter rowFilter, + DataLimits limits, + DecoratedKey partitionKey, + ClusteringIndexNamesFilter clusteringIndexFilter) + { + super(isDigest, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, partitionKey, clusteringIndexFilter); + } + + public SinglePartitionNamesCommand(CFMetaData metadata, + int nowInSec, + ColumnFilter columnFilter, + RowFilter rowFilter, + DataLimits limits, + DecoratedKey partitionKey, + ClusteringIndexNamesFilter clusteringIndexFilter) + { + this(false, false, metadata, nowInSec, columnFilter, rowFilter, limits, partitionKey, clusteringIndexFilter); + } + + public SinglePartitionNamesCommand copy() + { + return new SinglePartitionNamesCommand(isDigestQuery(), isForThrift(), metadata(), nowInSec(), columnFilter(), rowFilter(), limits(), partitionKey(), clusteringIndexFilter()); + } + + protected UnfilteredRowIterator queryMemtableAndDiskInternal(ColumnFamilyStore cfs, boolean copyOnHeap) + { + Tracing.trace("Acquiring sstable references"); + ColumnFamilyStore.ViewFragment view = cfs.select(cfs.viewFilter(partitionKey())); + + ArrayBackedPartition result = null; + ClusteringIndexNamesFilter filter = clusteringIndexFilter(); + + Tracing.trace("Merging memtable contents"); + for (Memtable memtable : view.memtables) + { + Partition partition = memtable.getPartition(partitionKey()); + if (partition == null) + continue; + + try (UnfilteredRowIterator iter = filter.getUnfilteredRowIterator(columnFilter(), partition)) + { + if (iter.isEmpty()) + continue; + + UnfilteredRowIterator clonedFilter = copyOnHeap + ? UnfilteredRowIterators.cloningIterator(iter, HeapAllocator.instance) + : iter; + result = add(isForThrift() ? ThriftResultsMerger.maybeWrap(clonedFilter, nowInSec()) : clonedFilter, result); + } + } + + /* add the SSTables on disk */ + Collections.sort(view.sstables, SSTableReader.maxTimestampComparator); + int sstablesIterated = 0; + + // read sorted sstables + for (SSTableReader sstable : view.sstables) + { + // if we've already seen a partition tombstone with a timestamp greater + // than the most recent update to this sstable, we're done, since the rest of the sstables + // will also be older + if (result != null && sstable.getMaxTimestamp() < result.partitionLevelDeletion().markedForDeleteAt()) + break; + + long currentMaxTs = sstable.getMaxTimestamp(); + filter = reduceFilter(filter, result, currentMaxTs); + if (filter == null) + break; + + Tracing.trace("Merging data from sstable {}", sstable.descriptor.generation); + sstable.incrementReadCount(); + try (UnfilteredRowIterator iter = filter.filter(sstable.iterator(partitionKey(), columnFilter(), filter.isReversed(), isForThrift()))) + { + if (iter.isEmpty()) + continue; + + sstablesIterated++; + result = add(isForThrift() ? 
ThriftResultsMerger.maybeWrap(iter, nowInSec()) : iter, result); + } + } + + cfs.metric.updateSSTableIterated(sstablesIterated); + + if (result == null || result.isEmpty()) + return UnfilteredRowIterators.emptyIterator(metadata(), partitionKey(), false); + + DecoratedKey key = result.partitionKey(); + cfs.metric.samplers.get(Sampler.READS).addSample(key.getKey(), key.hashCode(), 1); + + // "hoist up" the requested data into a more recent sstable + if (sstablesIterated > cfs.getMinimumCompactionThreshold() + && !cfs.isAutoCompactionDisabled() + && cfs.getCompactionStrategyManager().shouldDefragment()) + { + // !!WARNING!! if we stop copying our data to a heap-managed object, + // we will need to track the lifetime of this mutation as well + Tracing.trace("Defragmenting requested data"); + + try (UnfilteredRowIterator iter = result.unfilteredIterator(columnFilter(), Slices.ALL, false)) + { + final Mutation mutation = new Mutation(UnfilteredRowIterators.toUpdate(iter)); + StageManager.getStage(Stage.MUTATION).execute(new Runnable() + { + public void run() + { + // skipping commitlog and index updates is fine since we're just de-fragmenting existing data + Keyspace.open(mutation.getKeyspaceName()).apply(mutation, false, false); + } + }); + } + } + + return result.unfilteredIterator(columnFilter(), Slices.ALL, clusteringIndexFilter().isReversed()); + } + + private ArrayBackedPartition add(UnfilteredRowIterator iter, ArrayBackedPartition result) + { + int maxRows = Math.max(clusteringIndexFilter().requestedRows().size(), 1); + if (result == null) + return ArrayBackedPartition.create(iter, maxRows); + + try (UnfilteredRowIterator merged = UnfilteredRowIterators.merge(Arrays.asList(iter, result.unfilteredIterator(columnFilter(), Slices.ALL, false)), nowInSec())) + { + return ArrayBackedPartition.create(merged, maxRows); + } + } + + private ClusteringIndexNamesFilter reduceFilter(ClusteringIndexNamesFilter filter, Partition result, long sstableTimestamp) + { + if (result == null) + return filter; + + SearchIterator<Clustering, Row> searchIter = result.searchIterator(columnFilter(), false); + + PartitionColumns columns = columnFilter().fetchedColumns(); + NavigableSet<Clustering> clusterings = filter.requestedRows(); + + // We want to remove rows for which we have values for all requested columns. We have to deal with both static and regular rows. + // TODO: we could also remove a selected column if we've found values for every requested row but we'll leave + // that for later. 
+ + boolean removeStatic = false; + if (!columns.statics.isEmpty()) + { + Row staticRow = searchIter.next(Clustering.STATIC_CLUSTERING); + removeStatic = staticRow != null && canRemoveRow(staticRow, columns.statics, sstableTimestamp); + } + + NavigableSet<Clustering> toRemove = null; + for (Clustering clustering : clusterings) + { + if (!searchIter.hasNext()) + break; + + Row row = searchIter.next(clustering); + if (row == null || !canRemoveRow(row, columns.regulars, sstableTimestamp)) + continue; + + if (toRemove == null) + toRemove = new TreeSet<>(result.metadata().comparator); + toRemove.add(clustering); + } + + if (!removeStatic && toRemove == null) + return filter; + + // Check if we have everything we need + boolean hasNoMoreStatic = columns.statics.isEmpty() || removeStatic; + boolean hasNoMoreClusterings = clusterings.isEmpty() || (toRemove != null && toRemove.size() == clusterings.size()); + if (hasNoMoreStatic && hasNoMoreClusterings) + return null; + + NavigableSet<Clustering> newClusterings = clusterings; + if (toRemove != null) + { + newClusterings = new TreeSet<>(result.metadata().comparator); + newClusterings.addAll(Sets.difference(clusterings, toRemove)); + } + return new ClusteringIndexNamesFilter(newClusterings, filter.isReversed()); + } + + private boolean canRemoveRow(Row row, Columns requestedColumns, long sstableTimestamp) + { + // We can remove a row if it has data that is more recent that the next sstable to consider for the data that the query + // cares about. And the data we care about is 1) the row timestamp (since every query cares if the row exists or not) + // and 2) the requested columns. + if (!row.primaryKeyLivenessInfo().hasTimestamp() || row.primaryKeyLivenessInfo().timestamp() <= sstableTimestamp) + return false; + + for (ColumnDefinition column : requestedColumns) + { + // We can never be sure we have all of a collection, so never remove rows in that case. + if (column.type.isCollection()) + return false; + + Cell cell = row.getCell(column); + if (cell == null || cell.livenessInfo().timestamp() <= sstableTimestamp) + return false; + } + return true; + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java new file mode 100644 index 0000000..38651c1 --- /dev/null +++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java @@ -0,0 +1,498 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.cassandra.db; + +import java.io.DataInput; +import java.io.IOException; +import java.util.*; + +import org.apache.cassandra.cache.*; +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.ColumnDefinition; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.rows.*; +import org.apache.cassandra.db.filter.*; +import org.apache.cassandra.db.partitions.*; +import org.apache.cassandra.exceptions.RequestExecutionException; +import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.metrics.ColumnFamilyMetrics; +import org.apache.cassandra.service.*; +import org.apache.cassandra.service.pager.*; +import org.apache.cassandra.tracing.Tracing; +import org.apache.cassandra.utils.concurrent.OpOrder; + +/** + * A read command that selects a (part of a) single partition. + */ +public abstract class SinglePartitionReadCommand<F extends ClusteringIndexFilter> extends ReadCommand +{ + protected static final SelectionDeserializer selectionDeserializer = new Deserializer(); + + private final DecoratedKey partitionKey; + private final F clusteringIndexFilter; + + protected SinglePartitionReadCommand(boolean isDigest, + boolean isForThrift, + CFMetaData metadata, + int nowInSec, + ColumnFilter columnFilter, + RowFilter rowFilter, + DataLimits limits, + DecoratedKey partitionKey, + F clusteringIndexFilter) + { + super(Kind.SINGLE_PARTITION, isDigest, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits); + this.partitionKey = partitionKey; + this.clusteringIndexFilter = clusteringIndexFilter; + } + + /** + * Creates a new read command on a single partition. + * + * @param metadata the table to query. + * @param nowInSec the time in seconds to use are "now" for this query. + * @param columnFilter the column filter to use for the query. + * @param rowFilter the row filter to use for the query. + * @param limits the limits to use for the query. + * @param partitionKey the partition key for the partition to query. + * @param clusteringIndexFilter the clustering index filter to use for the query. + * + * @return a newly created read command. + */ + public static SinglePartitionReadCommand<?> create(CFMetaData metadata, + int nowInSec, + ColumnFilter columnFilter, + RowFilter rowFilter, + DataLimits limits, + DecoratedKey partitionKey, + ClusteringIndexFilter clusteringIndexFilter) + { + return create(false, metadata, nowInSec, columnFilter, rowFilter, limits, partitionKey, clusteringIndexFilter); + } + + /** + * Creates a new read command on a single partition for thrift. + * + * @param isForThrift whether the query is for thrift or not. + * @param metadata the table to query. + * @param nowInSec the time in seconds to use are "now" for this query. + * @param columnFilter the column filter to use for the query. + * @param rowFilter the row filter to use for the query. + * @param limits the limits to use for the query. + * @param partitionKey the partition key for the partition to query. + * @param clusteringIndexFilter the clustering index filter to use for the query. + * + * @return a newly created read command. 
+ */ + public static SinglePartitionReadCommand<?> create(boolean isForThrift, + CFMetaData metadata, + int nowInSec, + ColumnFilter columnFilter, + RowFilter rowFilter, + DataLimits limits, + DecoratedKey partitionKey, + ClusteringIndexFilter clusteringIndexFilter) + { + if (clusteringIndexFilter instanceof ClusteringIndexSliceFilter) + return new SinglePartitionSliceCommand(false, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, partitionKey, (ClusteringIndexSliceFilter) clusteringIndexFilter); + + assert clusteringIndexFilter instanceof ClusteringIndexNamesFilter; + return new SinglePartitionNamesCommand(false, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, partitionKey, (ClusteringIndexNamesFilter) clusteringIndexFilter); + } + + /** + * Creates a new read command on a single partition. + * + * @param metadata the table to query. + * @param nowInSec the time in seconds to use are "now" for this query. + * @param key the partition key for the partition to query. + * @param columnFilter the column filter to use for the query. + * @param filter the clustering index filter to use for the query. + * + * @return a newly created read command. The returned command will use no row filter and have no limits. + */ + public static SinglePartitionReadCommand<?> create(CFMetaData metadata, int nowInSec, DecoratedKey key, ColumnFilter columnFilter, ClusteringIndexFilter filter) + { + return create(metadata, nowInSec, columnFilter, RowFilter.NONE, DataLimits.NONE, key, filter); + } + + /** + * Creates a new read command that queries a single partition in its entirety. + * + * @param metadata the table to query. + * @param nowInSec the time in seconds to use are "now" for this query. + * @param key the partition key for the partition to query. + * + * @return a newly created read command that queries all the rows of {@code key}. + */ + public static SinglePartitionReadCommand fullPartitionRead(CFMetaData metadata, int nowInSec, DecoratedKey key) + { + return SinglePartitionSliceCommand.create(metadata, nowInSec, key, Slices.ALL); + } + + public DecoratedKey partitionKey() + { + return partitionKey; + } + + public F clusteringIndexFilter() + { + return clusteringIndexFilter; + } + + public ClusteringIndexFilter clusteringIndexFilter(DecoratedKey key) + { + return clusteringIndexFilter; + } + + public long getTimeout() + { + return DatabaseDescriptor.getReadRpcTimeout(); + } + + public boolean selects(DecoratedKey partitionKey, Clustering clustering) + { + if (!partitionKey().equals(partitionKey)) + return false; + + if (clustering == Clustering.STATIC_CLUSTERING) + return !columnFilter().fetchedColumns().statics.isEmpty(); + + return clusteringIndexFilter().selects(clustering); + } + + /** + * Returns a new command suitable to paging from the last returned row. + * + * @param lastReturned the last row returned by the previous page. The newly created command + * will only query row that comes after this (in query order). This can be {@code null} if this + * is the first page. + * @param pageSize the size to use for the page to query. + * + * @return the newly create command. + */ + public SinglePartitionReadCommand forPaging(Clustering lastReturned, int pageSize) + { + // We shouldn't have set digest yet when reaching that point + assert !isDigestQuery(); + return create(isForThrift(), + metadata(), + nowInSec(), + columnFilter(), + rowFilter(), + limits().forPaging(pageSize), + partitionKey(), + lastReturned == null ? 
+
+    public PartitionIterator execute(ConsistencyLevel consistency, ClientState clientState) throws RequestExecutionException
+    {
+        return StorageProxy.read(Group.one(this), consistency, clientState);
+    }
+
+    public SinglePartitionPager getPager(PagingState pagingState)
+    {
+        return getPager(this, pagingState);
+    }
+
+    private static SinglePartitionPager getPager(SinglePartitionReadCommand command, PagingState pagingState)
+    {
+        return new SinglePartitionPager(command, pagingState);
+    }
+
+    protected void recordLatency(ColumnFamilyMetrics metric, long latencyNanos)
+    {
+        metric.readLatency.addNano(latencyNanos);
+    }
+
+    protected UnfilteredPartitionIterator queryStorage(final ColumnFamilyStore cfs, ReadOrderGroup orderGroup)
+    {
+        @SuppressWarnings("resource") // we close the created iterator through closing the result of this method (and SingletonUnfilteredPartitionIterator ctor cannot fail)
+        UnfilteredRowIterator partition = cfs.isRowCacheEnabled()
+                                        ? getThroughCache(cfs, orderGroup.baseReadOpOrderGroup())
+                                        : queryMemtableAndDisk(cfs, orderGroup.baseReadOpOrderGroup());
+        return new SingletonUnfilteredPartitionIterator(partition, isForThrift());
+    }
+
+    /**
+     * Fetches the requested rows from the cache if present; if not, reads them from disk and caches the result.
+     * <p>
+     * If the partition is cached, and the filter given is within its bounds, we return
+     * from cache, otherwise from disk.
+     * <p>
+     * If the partition is not cached, we figure out what filter is "biggest", read
+     * that from disk, then filter the result and either cache that or return it.
+     */
+    private UnfilteredRowIterator getThroughCache(ColumnFamilyStore cfs, OpOrder.Group readOp)
+    {
+        assert !cfs.isIndex(); // CASSANDRA-5732
+        assert cfs.isRowCacheEnabled() : String.format("Row cache is not enabled on table [%s]", cfs.name);
+
+        UUID cfId = metadata().cfId;
+        RowCacheKey key = new RowCacheKey(cfId, partitionKey());
+
+        // Attempt a sentinel-read-cache sequence. If a write invalidates our sentinel, we'll return our
+        // (now potentially obsolete) data, but won't cache it. See CASSANDRA-3862.
+        // TODO: don't evict entire partitions on writes (#2864)
+        IRowCacheEntry cached = CacheService.instance.rowCache.get(key);
+        if (cached != null)
+        {
+            if (cached instanceof RowCacheSentinel)
+            {
+                // Some other read is trying to cache the value, just do a normal non-caching read
+                Tracing.trace("Row cache miss (race)");
+                cfs.metric.rowCacheMiss.inc();
+                return queryMemtableAndDisk(cfs, readOp);
+            }
+
+            CachedPartition cachedPartition = (CachedPartition)cached;
+            if (cfs.isFilterFullyCoveredBy(clusteringIndexFilter(), limits(), cachedPartition, nowInSec()))
+            {
+                cfs.metric.rowCacheHit.inc();
+                Tracing.trace("Row cache hit");
+                return clusteringIndexFilter().getUnfilteredRowIterator(columnFilter(), cachedPartition);
+            }
+
+            cfs.metric.rowCacheHitOutOfRange.inc();
+            Tracing.trace("Ignoring row cache as cached value could not satisfy query");
+            return queryMemtableAndDisk(cfs, readOp);
+        }
+
+        cfs.metric.rowCacheMiss.inc();
+        Tracing.trace("Row cache miss");
+
+        boolean cacheFullPartitions = metadata().getCaching().rowCache.cacheFullPartitions();
+
+        // To be able to cache what we read, what we read must at least cover what the cache holds, that
+        // is the 'rowsToCache' first rows of the partition.
+        // We could read those 'rowsToCache' first rows
+        // systematically, but we'd have to "extend" that to whatever is needed for the user query that the
+        // 'rowsToCache' first rows don't cover, and that's not trivial with our existing filters. So currently
+        // we settle for caching what we read only if the user query does query the head of the partition, since
+        // that's the common case where we'll be able to use the cache anyway. One exception is if we cache
+        // full partitions, in which case we just always read it all and cache it.
+        if (cacheFullPartitions || clusteringIndexFilter().isHeadFilter())
+        {
+            RowCacheSentinel sentinel = new RowCacheSentinel();
+            boolean sentinelSuccess = CacheService.instance.rowCache.putIfAbsent(key, sentinel);
+            boolean sentinelReplaced = false;
+
+            try
+            {
+                int rowsToCache = cacheFullPartitions ? Integer.MAX_VALUE : metadata().getCaching().rowCache.rowsToCache;
+                @SuppressWarnings("resource") // we close on exception or upon closing the result of this method
+                UnfilteredRowIterator iter = SinglePartitionReadCommand.fullPartitionRead(metadata(), nowInSec(), partitionKey()).queryMemtableAndDisk(cfs, readOp);
+                try
+                {
+                    // We want to cache only rowsToCache rows
+                    CachedPartition toCache = ArrayBackedCachedPartition.create(DataLimits.cqlLimits(rowsToCache).filter(iter, nowInSec()), nowInSec());
+                    if (sentinelSuccess && !toCache.isEmpty())
+                    {
+                        Tracing.trace("Caching {} rows", toCache.rowCount());
+                        CacheService.instance.rowCache.replace(key, sentinel, toCache);
+                        // Whether or not the previous replace has worked, our sentinel is not in the cache anymore
+                        sentinelReplaced = true;
+                    }
+
+                    // We then re-filter out what this query wants.
+                    // Note that in the case where we don't cache full partitions, it's possible that the current query is interested in more
+                    // than what we've cached, so we can't just use toCache.
+                    UnfilteredRowIterator cacheIterator = clusteringIndexFilter().getUnfilteredRowIterator(columnFilter(), toCache);
+                    if (cacheFullPartitions)
+                    {
+                        // Everything is guaranteed to be in 'toCache', we're done with 'iter'
+                        assert !iter.hasNext();
+                        iter.close();
+                        return cacheIterator;
+                    }
+                    return UnfilteredRowIterators.concat(cacheIterator, clusteringIndexFilter().filterNotIndexed(columnFilter(), iter));
+                }
+                catch (RuntimeException | Error e)
+                {
+                    iter.close();
+                    throw e;
+                }
+            }
+            finally
+            {
+                if (sentinelSuccess && !sentinelReplaced)
+                    cfs.invalidateCachedPartition(key);
+            }
+        }
+
+        Tracing.trace("Fetching data but not populating cache as query does not query from the start of the partition");
+        return queryMemtableAndDisk(cfs, readOp);
+    }
+
+    /**
+     * Queries both memtable and sstables to fetch the result of this query.
+     * <p>
+     * Please note that this method:
+     *   1) does not check the row cache.
+     *   2) does not apply the query limit, nor the row filter (and so ignores secondary indexes).
+     *      Those are applied in {@link ReadCommand#executeLocally}.
+     *   3) does not record some of the read metrics (latency, scanned cells histograms) nor
+     *      throw TombstoneOverwhelmingException.
+     * It is publicly exposed because there are a few places where that is exactly what we want,
+     * but it should only be used where you know you don't need those things.
+     * <p>
+     * Also note that one must have "started" a {@code OpOrder.Group} on the queried table, and it is
+     * required as a parameter to enforce that fact, even though it's not explicitly used by the method.
+     */
+    public UnfilteredRowIterator queryMemtableAndDisk(ColumnFamilyStore cfs, OpOrder.Group readOp)
+    {
+        Tracing.trace("Executing single-partition query on {}", cfs.name);
+
+        boolean copyOnHeap = Memtable.MEMORY_POOL.needToCopyOnHeap();
+        return queryMemtableAndDiskInternal(cfs, copyOnHeap);
+    }
+
+    protected abstract UnfilteredRowIterator queryMemtableAndDiskInternal(ColumnFamilyStore cfs, boolean copyOnHeap);
+
+    @Override
+    public String toString()
+    {
+        return String.format("Read(%s.%s columns=%s rowFilter=%s limits=%s key=%s filter=%s, nowInSec=%d)",
+                             metadata().ksName,
+                             metadata().cfName,
+                             columnFilter(),
+                             rowFilter(),
+                             limits(),
+                             metadata().getKeyValidator().getString(partitionKey().getKey()),
+                             clusteringIndexFilter.toString(metadata()),
+                             nowInSec());
+    }
+
+    protected void appendCQLWhereClause(StringBuilder sb)
+    {
+        sb.append(" WHERE ");
+
+        sb.append(ColumnDefinition.toCQLString(metadata().partitionKeyColumns())).append(" = ");
+        DataRange.appendKeyString(sb, metadata().getKeyValidator(), partitionKey().getKey());
+
+        // We put the row filter first because the clustering index filter can end with "ORDER BY"
+        if (!rowFilter().isEmpty())
+            sb.append(" AND ").append(rowFilter());
+
+        String filterString = clusteringIndexFilter().toCQLString(metadata());
+        if (!filterString.isEmpty())
+            sb.append(" AND ").append(filterString);
+    }
+
+    protected void serializeSelection(DataOutputPlus out, int version) throws IOException
+    {
+        metadata().getKeyValidator().writeValue(partitionKey().getKey(), out);
+        ClusteringIndexFilter.serializer.serialize(clusteringIndexFilter(), out, version);
+    }
+
+    protected long selectionSerializedSize(int version)
+    {
+        TypeSizes sizes = TypeSizes.NATIVE;
+        return metadata().getKeyValidator().writtenLength(partitionKey().getKey(), sizes)
+             + ClusteringIndexFilter.serializer.serializedSize(clusteringIndexFilter(), version);
+    }
+
+    /**
+     * Groups multiple single partition read commands.
+     */
+    public static class Group implements ReadQuery
+    {
+        public final List<SinglePartitionReadCommand<?>> commands;
+        private final DataLimits limits;
+        private final int nowInSec;
+
+        public Group(List<SinglePartitionReadCommand<?>> commands, DataLimits limits)
+        {
+            assert !commands.isEmpty();
+            this.commands = commands;
+            this.limits = limits;
+            this.nowInSec = commands.get(0).nowInSec();
+            for (int i = 1; i < commands.size(); i++)
+                assert commands.get(i).nowInSec() == nowInSec;
+        }
+
+        public static Group one(SinglePartitionReadCommand<?> command)
+        {
+            return new Group(Collections.<SinglePartitionReadCommand<?>>singletonList(command), command.limits());
+        }
+
+        public PartitionIterator execute(ConsistencyLevel consistency, ClientState clientState) throws RequestExecutionException
+        {
+            return StorageProxy.read(this, consistency, clientState);
+        }
+
+        public int nowInSec()
+        {
+            return nowInSec;
+        }
+
+        public DataLimits limits()
+        {
+            return limits;
+        }
+
+        public CFMetaData metadata()
+        {
+            return commands.get(0).metadata();
+        }
+
+        public ReadOrderGroup startOrderGroup()
+        {
+            // Note that the only difference between the commands in a group must be the partition key to which
+            // they apply. So as far as ReadOrderGroup is concerned, we can use any of the commands to start one.
+            return commands.get(0).startOrderGroup();
+        }
+
+        public PartitionIterator executeInternal(ReadOrderGroup orderGroup)
+        {
+            List<PartitionIterator> partitions = new ArrayList<>(commands.size());
+            for (SinglePartitionReadCommand cmd : commands)
+                partitions.add(cmd.executeInternal(orderGroup));
+
+            // Because each command only enforces the limit per partition, we also need to enforce it globally.
+            return limits.filter(PartitionIterators.concat(partitions), nowInSec);
+        }
+
+        public QueryPager getPager(PagingState pagingState)
+        {
+            if (commands.size() == 1)
+                return SinglePartitionReadCommand.getPager(commands.get(0), pagingState);
+
+            return new MultiPartitionPager(this, pagingState);
+        }
+
+        @Override
+        public String toString()
+        {
+            return commands.toString();
+        }
+    }
+
+    private static class Deserializer extends SelectionDeserializer
+    {
+        public ReadCommand deserialize(DataInput in, int version, boolean isDigest, boolean isForThrift, CFMetaData metadata, int nowInSec, ColumnFilter columnFilter, RowFilter rowFilter, DataLimits limits)
+        throws IOException
+        {
+            DecoratedKey key = StorageService.getPartitioner().decorateKey(metadata.getKeyValidator().readValue(in));
+            ClusteringIndexFilter filter = ClusteringIndexFilter.serializer.deserialize(in, version, metadata);
+            if (filter instanceof ClusteringIndexNamesFilter)
+                return new SinglePartitionNamesCommand(isDigest, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, key, (ClusteringIndexNamesFilter)filter);
+            else
+                return new SinglePartitionSliceCommand(isDigest, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, key, (ClusteringIndexSliceFilter)filter);
+        }
+    };
+}
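For reviewers who want to try the new class out, here is a minimal usage sketch (not part of the patch) showing how a caller might read a whole partition at CL.ONE. The keyspace/table names, the serialized key and the wrapper class name are placeholders; the surrounding calls (Schema.instance.getCFMetaData, ClientState.forInternalCalls) come from the existing codebase rather than this commit, and the sketch assumes it runs inside an initialized node.

import java.nio.ByteBuffer;

import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.partitions.PartitionIterator;
import org.apache.cassandra.exceptions.RequestExecutionException;
import org.apache.cassandra.service.ClientState;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.FBUtilities;

public final class SinglePartitionReadExample
{
    // Illustrative only: 'keyspace', 'table' and 'serializedKey' are placeholders.
    public static PartitionIterator readWholePartition(String keyspace, String table, ByteBuffer serializedKey)
    throws RequestExecutionException
    {
        CFMetaData metadata = Schema.instance.getCFMetaData(keyspace, table);
        DecoratedKey key = StorageService.getPartitioner().decorateKey(serializedKey);

        // Query the partition in its entirety (no limits, no row filter).
        SinglePartitionReadCommand cmd =
            SinglePartitionReadCommand.fullPartitionRead(metadata, FBUtilities.nowInSeconds(), key);

        // execute() wraps the command in Group.one(this) and routes it through StorageProxy.read().
        return cmd.execute(ConsistencyLevel.ONE, ClientState.forInternalCalls());
    }
}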
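Similarly, a hedged sketch of grouping several single-partition commands so that one CQL row limit is enforced across all of them, which is what Group.executeInternal() describes. Again illustrative only; the key list, the metadata handle and the limit of 100 rows are arbitrary assumptions.

import java.util.ArrayList;
import java.util.List;

import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.filter.DataLimits;
import org.apache.cassandra.db.partitions.PartitionIterator;
import org.apache.cassandra.exceptions.RequestExecutionException;
import org.apache.cassandra.service.ClientState;
import org.apache.cassandra.utils.FBUtilities;

public final class GroupedReadExample
{
    // Reads several partitions as one group: the 100-row limit applies to the whole result,
    // not just to each command individually (see Group.executeInternal()).
    public static PartitionIterator readGrouped(CFMetaData metadata, List<DecoratedKey> keys)
    throws RequestExecutionException
    {
        int nowInSec = FBUtilities.nowInSeconds();
        List<SinglePartitionReadCommand<?>> commands = new ArrayList<>(keys.size());
        for (DecoratedKey key : keys)
            commands.add(SinglePartitionReadCommand.fullPartitionRead(metadata, nowInSec, key));

        SinglePartitionReadCommand.Group group =
            new SinglePartitionReadCommand.Group(commands, DataLimits.cqlLimits(100));
        return group.execute(ConsistencyLevel.QUORUM, ClientState.forInternalCalls());
    }
}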
