http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/ReadCommand.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java index dedff6f..bad096f 100644 --- a/src/java/org/apache/cassandra/db/ReadCommand.java +++ b/src/java/org/apache/cassandra/db/ReadCommand.java @@ -19,158 +19,500 @@ package org.apache.cassandra.db; import java.io.DataInput; import java.io.IOException; -import java.nio.ByteBuffer; +import com.google.common.collect.Iterables; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.ColumnDefinition; import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.db.filter.IDiskAtomFilter; -import org.apache.cassandra.db.filter.NamesQueryFilter; -import org.apache.cassandra.db.filter.SliceQueryFilter; +import org.apache.cassandra.db.index.SecondaryIndexSearcher; +import org.apache.cassandra.db.filter.*; +import org.apache.cassandra.db.rows.*; +import org.apache.cassandra.db.partitions.*; import org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.metrics.ColumnFamilyMetrics; import org.apache.cassandra.net.MessageOut; import org.apache.cassandra.net.MessagingService; -import org.apache.cassandra.service.IReadCommand; -import org.apache.cassandra.service.RowDataResolver; -import org.apache.cassandra.service.pager.Pageable; +import org.apache.cassandra.service.ClientWarn; +import org.apache.cassandra.tracing.Tracing; -public abstract class ReadCommand implements IReadCommand, Pageable +/** + * General interface for storage-engine read commands (common to both range and + * single partition commands). + * <p> + * This contains all the information needed to do a local read. + */ +public abstract class ReadCommand implements ReadQuery { - public enum Type + protected static final Logger logger = LoggerFactory.getLogger(ReadCommand.class); + + public static final IVersionedSerializer<ReadCommand> serializer = new Serializer(); + + private final Kind kind; + private final CFMetaData metadata; + private final int nowInSec; + + private final ColumnFilter columnFilter; + private final RowFilter rowFilter; + private final DataLimits limits; + + private boolean isDigestQuery; + private final boolean isForThrift; + + protected static abstract class SelectionDeserializer { - GET_BY_NAMES((byte)1), - GET_SLICES((byte)2); + public abstract ReadCommand deserialize(DataInput in, int version, boolean isDigest, boolean isForThrift, CFMetaData metadata, int nowInSec, ColumnFilter columnFilter, RowFilter rowFilter, DataLimits limits) throws IOException; + } - public final byte serializedValue; + protected enum Kind + { + SINGLE_PARTITION (SinglePartitionReadCommand.selectionDeserializer), + PARTITION_RANGE (PartitionRangeReadCommand.selectionDeserializer); - private Type(byte b) - { - this.serializedValue = b; - } + private SelectionDeserializer selectionDeserializer; - public static Type fromSerializedValue(byte b) + private Kind(SelectionDeserializer selectionDeserializer) { - return b == 1 ?
GET_BY_NAMES : GET_SLICES; + this.selectionDeserializer = selectionDeserializer; } } - public static final ReadCommandSerializer serializer = new ReadCommandSerializer(); + protected ReadCommand(Kind kind, + boolean isDigestQuery, + boolean isForThrift, + CFMetaData metadata, + int nowInSec, + ColumnFilter columnFilter, + RowFilter rowFilter, + DataLimits limits) + { + this.kind = kind; + this.isDigestQuery = isDigestQuery; + this.isForThrift = isForThrift; + this.metadata = metadata; + this.nowInSec = nowInSec; + this.columnFilter = columnFilter; + this.rowFilter = rowFilter; + this.limits = limits; + } - public MessageOut<ReadCommand> createMessage() + protected abstract void serializeSelection(DataOutputPlus out, int version) throws IOException; + protected abstract long selectionSerializedSize(int version); + + /** + * The metadata for the table queried. + * + * @return the metadata for the table queried. + */ + public CFMetaData metadata() { - return new MessageOut<>(MessagingService.Verb.READ, this, serializer); + return metadata; } - public final String ksName; - public final String cfName; - public final ByteBuffer key; - public final long timestamp; - private boolean isDigestQuery = false; - protected final Type commandType; + /** + * The time in seconds to use as "now" for this query. + * <p> + * We use the same time as "now" for the whole query to avoid considering different + * values as expired during the query, which would be buggy (would throw off counting amongst other + * things). + * + * @return the time (in seconds) to use as "now". + */ + public int nowInSec() + { + return nowInSec; + } + + /** + * The configured timeout for this command. + * + * @return the configured timeout for this command. + */ + public abstract long getTimeout(); - protected ReadCommand(String ksName, ByteBuffer key, String cfName, long timestamp, Type cmdType) + /** + * A filter on which (non-PK) columns must be returned by the query. + * + * @return which columns must be fetched by this query. + */ + public ColumnFilter columnFilter() { - this.ksName = ksName; - this.key = key; - this.cfName = cfName; - this.timestamp = timestamp; - this.commandType = cmdType; + return columnFilter; } - public static ReadCommand create(String ksName, ByteBuffer key, String cfName, long timestamp, IDiskAtomFilter filter) + /** + * Filters/Restrictions on CQL rows. + * <p> + * This contains the restrictions that are not directly handled by the + * {@code ClusteringIndexFilter}. More specifically, this includes any non-PK column + * restrictions and can include some PK column restrictions when those can't be + * satisfied entirely by the clustering index filter (because not all clustering columns + * have been restricted for instance). If there are 2ndary indexes on the table, + * one of these restrictions might be handled by a 2ndary index. + * + * @return the filter holding the expression that rows must satisfy. + */ + public RowFilter rowFilter() { - if (filter instanceof SliceQueryFilter) - return new SliceFromReadCommand(ksName, key, cfName, timestamp, (SliceQueryFilter)filter); - else - return new SliceByNamesReadCommand(ksName, key, cfName, timestamp, (NamesQueryFilter)filter); + return rowFilter; } + /** + * The limits set on this query. + * + * @return the limits set on this query. + */ + public DataLimits limits() + { + return limits; + } + + /** + * Whether this query is a digest one or not. + * + * @return Whether this query is a digest query.
+ */ public boolean isDigestQuery() { return isDigestQuery; } + /** + * Sets whether this command should be a digest one or not. + * + * @param isDigestQuery whether the command should be set as a digest one or not. + * @return this read command. + */ public ReadCommand setIsDigestQuery(boolean isDigestQuery) { this.isDigestQuery = isDigestQuery; return this; } - public String getColumnFamilyName() + /** + * Whether this query is for thrift or not. + * + * @return whether this query is for thrift. + */ + public boolean isForThrift() { - return cfName; + return isForThrift; } + /** + * The clustering index filter this command uses for the provided key. + * <p> + * Note that this method should only be called on a key actually queried by this command; + * in practice, this will almost always return the same filter, but for the sake of + * paging, the filter on the first key of a range command might be slightly different. + * + * @param key a partition key queried by this command. + * + * @return the {@code ClusteringIndexFilter} to use for the partition of key {@code key}. + */ + public abstract ClusteringIndexFilter clusteringIndexFilter(DecoratedKey key); + + /** + * Returns a copy of this command. + * + * @return a copy of this command. + */ public abstract ReadCommand copy(); - public abstract Row getRow(Keyspace keyspace); + /** + * Whether the provided row, identified by its primary key components, is selected by + * this read command. + * + * @param partitionKey the partition key for the row to test. + * @param clustering the clustering for the row to test. + * + * @return whether the row of partition key {@code partitionKey} and clustering + * {@code clustering} is selected by this command. + */ + public abstract boolean selects(DecoratedKey partitionKey, Clustering clustering); - public abstract IDiskAtomFilter filter(); + protected abstract UnfilteredPartitionIterator queryStorage(ColumnFamilyStore cfs, ReadOrderGroup orderGroup); - public String getKeyspace() + public ReadResponse createResponse(UnfilteredPartitionIterator iterator) { - return ksName; + return isDigestQuery() + ? ReadResponse.createDigestResponse(iterator) + : ReadResponse.createDataResponse(iterator); } - // maybeGenerateRetryCommand is used to generate a retry for short reads - public ReadCommand maybeGenerateRetryCommand(RowDataResolver resolver, Row row) + protected SecondaryIndexSearcher getIndexSearcher(ColumnFamilyStore cfs) { - return null; + return cfs.indexManager.getBestIndexSearcherFor(this); } - // maybeTrim removes columns from a response that is too long - public void maybeTrim(Row row) + /** + * Executes this command on the local host. + * + * @param cfs the store for the table queried by this command. + * + * @return an iterator over the result of executing this command locally. + */ + @SuppressWarnings("resource") // The result iterator is closed upon exceptions (we know it's fine to potentially not close the intermediary + // iterators created inside the try as long as we do close the original resultIterator), or by closing the result. + public UnfilteredPartitionIterator executeLocally(ReadOrderGroup orderGroup) { - // noop + long startTimeNanos = System.nanoTime(); + + ColumnFamilyStore cfs = Keyspace.openAndGetStore(metadata()); + SecondaryIndexSearcher searcher = getIndexSearcher(cfs); + UnfilteredPartitionIterator resultIterator = searcher == null + ?
queryStorage(cfs, orderGroup) + : searcher.search(this, orderGroup); + + try + { + resultIterator = UnfilteredPartitionIterators.convertExpiredCellsToTombstones(resultIterator, nowInSec); + resultIterator = withMetricsRecording(withoutExpiredTombstones(resultIterator, cfs), cfs.metric, startTimeNanos); + + // TODO: we should push the dropping of columns down the layers because + // 1) it'll be more efficient + // 2) it could help us solve #6276 + // But there is no reason not to do this as a follow-up, so keeping it here for now (we'll have + // to be wary of cached rows if we move this down the layers) + if (!metadata().getDroppedColumns().isEmpty()) + resultIterator = UnfilteredPartitionIterators.removeDroppedColumns(resultIterator, metadata().getDroppedColumns()); + + // If we've used a 2ndary index, we know the result already satisfies the primary expression used, so + // there is no point in checking it again. + RowFilter updatedFilter = searcher == null + ? rowFilter() + : rowFilter().without(searcher.primaryClause(this)); + + // TODO: We'll currently do filtering by the rowFilter here because it's convenient. However, + // we'll probably want to optimize by pushing it down the layer (like for dropped columns) as it + // would be more efficient (the sooner we discard stuff we know we don't care about, the less useless + // processing we do on it). + return limits().filter(updatedFilter.filter(resultIterator, nowInSec()), nowInSec()); + } + catch (RuntimeException | Error e) + { + resultIterator.close(); + throw e; + } } - public long getTimeout() + protected abstract void recordLatency(ColumnFamilyMetrics metric, long latencyNanos); + + public PartitionIterator executeInternal(ReadOrderGroup orderGroup) { - return DatabaseDescriptor.getReadRpcTimeout(); + return UnfilteredPartitionIterators.filter(executeLocally(orderGroup), nowInSec()); } -} -class ReadCommandSerializer implements IVersionedSerializer<ReadCommand> -{ - public void serialize(ReadCommand command, DataOutputPlus out, int version) throws IOException + public ReadOrderGroup startOrderGroup() + { + return ReadOrderGroup.forCommand(this); + } + + /** + * Wraps the provided iterator so that metrics on what is scanned by the command are recorded. + * This also logs a warning and throws TombstoneOverwhelmingException if appropriate.
+ */ + private UnfilteredPartitionIterator withMetricsRecording(UnfilteredPartitionIterator iter, final ColumnFamilyMetrics metric, final long startTimeNanos) { - out.writeByte(command.commandType.serializedValue); - switch (command.commandType) + return new WrappingUnfilteredPartitionIterator(iter) { - case GET_BY_NAMES: - SliceByNamesReadCommand.serializer.serialize(command, out, version); - break; - case GET_SLICES: - SliceFromReadCommand.serializer.serialize(command, out, version); - break; - default: - throw new AssertionError(); - } + private final int failureThreshold = DatabaseDescriptor.getTombstoneFailureThreshold(); + private final int warningThreshold = DatabaseDescriptor.getTombstoneWarnThreshold(); + + private final boolean respectTombstoneThresholds = !ReadCommand.this.metadata().ksName.equals(SystemKeyspace.NAME); + + private int liveRows = 0; + private int tombstones = 0; + + private DecoratedKey currentKey; + + @Override + public UnfilteredRowIterator computeNext(UnfilteredRowIterator iter) + { + currentKey = iter.partitionKey(); + + return new WrappingUnfilteredRowIterator(iter) + { + public Unfiltered next() + { + Unfiltered unfiltered = super.next(); + if (unfiltered.kind() == Unfiltered.Kind.ROW) + { + Row row = (Row) unfiltered; + if (row.hasLiveData(ReadCommand.this.nowInSec())) + ++liveRows; + for (Cell cell : row) + if (!cell.isLive(ReadCommand.this.nowInSec())) + countTombstone(row.clustering()); + } + else + { + countTombstone(unfiltered.clustering()); + } + + return unfiltered; + } + + private void countTombstone(ClusteringPrefix clustering) + { + ++tombstones; + if (tombstones > failureThreshold && respectTombstoneThresholds) + { + String query = ReadCommand.this.toCQLString(); + Tracing.trace("Scanned over {} tombstones for query {}; query aborted (see tombstone_failure_threshold)", failureThreshold, query); + throw new TombstoneOverwhelmingException(tombstones, query, ReadCommand.this.metadata(), currentKey, clustering); + } + } + }; + } + + @Override + public void close() + { + try + { + super.close(); + } + finally + { + recordLatency(metric, System.nanoTime() - startTimeNanos); + + metric.tombstoneScannedHistogram.update(tombstones); + metric.liveScannedHistogram.update(liveRows); + + boolean warnTombstones = tombstones > warningThreshold && respectTombstoneThresholds; + if (warnTombstones) + { + String msg = String.format("Read %d live rows and %d tombstone cells for query %1.512s (see tombstone_warn_threshold)", liveRows, tombstones, ReadCommand.this.toCQLString()); + ClientWarn.warn(msg); + logger.warn(msg); + } + + Tracing.trace("Read {} live and {} tombstone cells{}", new Object[]{ liveRows, tombstones, (warnTombstones ? " (see tombstone_warn_threshold)" : "") }); + } + } + }; + } + + /** + * Creates a message for this command. + */ + public MessageOut<ReadCommand> createMessage() + { + // TODO: we should use different verbs for old messages (RANGE_SLICE, PAGED_RANGE) + return new MessageOut<>(MessagingService.Verb.READ, this, serializer); } - public ReadCommand deserialize(DataInput in, int version) throws IOException + protected abstract void appendCQLWhereClause(StringBuilder sb); + + // Skip expired tombstones. We do this because it's safe to do (post-merge of the memtable and sstable at least), it + // can save us some bandwidth, and avoid making us throw a TombstoneOverwhelmingException for expired tombstones (which + // are to some extent an artifact of compaction lagging behind and hence counting them is somewhat unintuitive).
+ protected UnfilteredPartitionIterator withoutExpiredTombstones(UnfilteredPartitionIterator iterator, ColumnFamilyStore cfs) { - ReadCommand.Type msgType = ReadCommand.Type.fromSerializedValue(in.readByte()); - switch (msgType) + return new TombstonePurgingPartitionIterator(iterator, cfs.gcBefore(nowInSec())) { - case GET_BY_NAMES: - return SliceByNamesReadCommand.serializer.deserialize(in, version); - case GET_SLICES: - return SliceFromReadCommand.serializer.deserialize(in, version); - default: - throw new AssertionError(); - } + protected long getMaxPurgeableTimestamp() + { + return Long.MAX_VALUE; + } + }; } - public long serializedSize(ReadCommand command, int version) + /** + * Recreate the CQL string corresponding to this query. + * <p> + * Note that in general the returned string will not be exactly the original user string, first + * because there isn't always a single syntax for a given query, but also because we don't have + * all the information needed (we know the non-PK columns queried but not the PK ones as internally + * we query them all). So this shouldn't be relied on too strongly, but it should be good enough for + * debugging purposes, which is what this is for. + */ + public String toCQLString() { - switch (command.commandType) + StringBuilder sb = new StringBuilder(); + sb.append("SELECT ").append(columnFilter()); + sb.append(" FROM ").append(metadata().ksName).append(".").append(metadata.cfName); + appendCQLWhereClause(sb); + + if (limits() != DataLimits.NONE) + sb.append(" ").append(limits()); + return sb.toString(); + } + + private static class Serializer implements IVersionedSerializer<ReadCommand> + { + private static int digestFlag(boolean isDigest) + { + return isDigest ? 0x01 : 0; + } + + private static boolean isDigest(int flags) { - case GET_BY_NAMES: - return 1 + SliceByNamesReadCommand.serializer.serializedSize(command, version); - case GET_SLICES: - return 1 + SliceFromReadCommand.serializer.serializedSize(command, version); - default: - throw new AssertionError(); + return (flags & 0x01) != 0; + } + + private static int thriftFlag(boolean isForThrift) + { + return isForThrift ?
0x02 : 0; + } + + private static boolean isForThrift(int flags) + { + return (flags & 0x02) != 0; + } + + public void serialize(ReadCommand command, DataOutputPlus out, int version) throws IOException + { + if (version < MessagingService.VERSION_30) + throw new UnsupportedOperationException(); + + out.writeByte(command.kind.ordinal()); + out.writeByte(digestFlag(command.isDigestQuery()) | thriftFlag(command.isForThrift())); + CFMetaData.serializer.serialize(command.metadata(), out, version); + out.writeInt(command.nowInSec()); + ColumnFilter.serializer.serialize(command.columnFilter(), out, version); + RowFilter.serializer.serialize(command.rowFilter(), out, version); + DataLimits.serializer.serialize(command.limits(), out, version); + + command.serializeSelection(out, version); + } + + public ReadCommand deserialize(DataInput in, int version) throws IOException + { + if (version < MessagingService.VERSION_30) + throw new UnsupportedOperationException(); + + Kind kind = Kind.values()[in.readByte()]; + int flags = in.readByte(); + boolean isDigest = isDigest(flags); + boolean isForThrift = isForThrift(flags); + CFMetaData metadata = CFMetaData.serializer.deserialize(in, version); + int nowInSec = in.readInt(); + ColumnFilter columnFilter = ColumnFilter.serializer.deserialize(in, version, metadata); + RowFilter rowFilter = RowFilter.serializer.deserialize(in, version, metadata); + DataLimits limits = DataLimits.serializer.deserialize(in, version); + + return kind.selectionDeserializer.deserialize(in, version, isDigest, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits); + } + + public long serializedSize(ReadCommand command, int version) + { + if (version < MessagingService.VERSION_30) + throw new UnsupportedOperationException(); + + TypeSizes sizes = TypeSizes.NATIVE; + + return 2 // kind + flags + + CFMetaData.serializer.serializedSize(command.metadata(), version, sizes) + + sizes.sizeof(command.nowInSec()) + + ColumnFilter.serializer.serializedSize(command.columnFilter(), version, sizes) + + RowFilter.serializer.serializedSize(command.rowFilter(), version) + + DataLimits.serializer.serializedSize(command.limits(), version) + + command.selectionSerializedSize(version); } } }
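As an aside on the wire format above: the 3.0 serializer writes the command's two booleans packed into a single flags byte, with bit 0x01 marking a digest query and bit 0x02 marking a Thrift query. The following standalone Java sketch (class and method names here are illustrative, not part of this commit) shows the round-trip of that encoding:

public final class ReadCommandFlags
{
    // Bit masks mirroring digestFlag()/thriftFlag() in ReadCommand.Serializer above.
    private static final int FLAG_DIGEST = 0x01;
    private static final int FLAG_FOR_THRIFT = 0x02;

    // Pack both booleans into one byte-sized int, as the serializer does before writeByte().
    static int encode(boolean isDigest, boolean isForThrift)
    {
        return (isDigest ? FLAG_DIGEST : 0) | (isForThrift ? FLAG_FOR_THRIFT : 0);
    }

    static boolean isDigest(int flags)
    {
        return (flags & FLAG_DIGEST) != 0;
    }

    static boolean isForThrift(int flags)
    {
        return (flags & FLAG_FOR_THRIFT) != 0;
    }

    public static void main(String[] args)
    {
        int flags = encode(true, false);
        System.out.println(isDigest(flags));    // prints: true
        System.out.println(isForThrift(flags)); // prints: false
    }
}

Writing one flags byte rather than two booleans keeps the serialized command compact and leaves the remaining six bits free for future command options without a format change.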
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java b/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java new file mode 100644 index 0000000..f85d406 --- /dev/null +++ b/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db; + +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; +import org.apache.cassandra.net.IVerbHandler; +import org.apache.cassandra.net.MessageIn; +import org.apache.cassandra.net.MessageOut; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.tracing.Tracing; + +public class ReadCommandVerbHandler implements IVerbHandler<ReadCommand> +{ + public void doVerb(MessageIn<ReadCommand> message, int id) + { + if (StorageService.instance.isBootstrapMode()) + { + throw new RuntimeException("Cannot service reads while bootstrapping!"); + } + + ReadCommand command = message.payload; + ReadResponse response; + try (ReadOrderGroup opGroup = command.startOrderGroup(); UnfilteredPartitionIterator iterator = command.executeLocally(opGroup)) + { + response = command.createResponse(iterator); + } + + MessageOut<ReadResponse> reply = new MessageOut<>(MessagingService.Verb.REQUEST_RESPONSE, response, ReadResponse.serializer); + + Tracing.trace("Enqueuing response to {}", message.from); + MessagingService.instance().sendReply(reply, id, message.from); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/ReadOrderGroup.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ReadOrderGroup.java b/src/java/org/apache/cassandra/db/ReadOrderGroup.java new file mode 100644 index 0000000..0a5bee8 --- /dev/null +++ b/src/java/org/apache/cassandra/db/ReadOrderGroup.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db; + +import org.apache.cassandra.db.index.*; +import org.apache.cassandra.utils.concurrent.OpOrder; + +public class ReadOrderGroup implements AutoCloseable +{ + // For all reads + private final OpOrder.Group baseOp; + + // For index reads + private final OpOrder.Group indexOp; + private final OpOrder.Group writeOp; + + private ReadOrderGroup(OpOrder.Group baseOp, OpOrder.Group indexOp, OpOrder.Group writeOp) + { + this.baseOp = baseOp; + this.indexOp = indexOp; + this.writeOp = writeOp; + } + + public OpOrder.Group baseReadOpOrderGroup() + { + return baseOp; + } + + public OpOrder.Group indexReadOpOrderGroup() + { + return indexOp; + } + + public OpOrder.Group writeOpOrderGroup() + { + return writeOp; + } + + public static ReadOrderGroup emptyGroup() + { + return new ReadOrderGroup(null, null, null); + } + + public static ReadOrderGroup forCommand(ReadCommand command) + { + ColumnFamilyStore baseCfs = Keyspace.openAndGetStore(command.metadata()); + ColumnFamilyStore indexCfs = maybeGetIndexCfs(baseCfs, command); + + if (indexCfs == null) + { + return new ReadOrderGroup(baseCfs.readOrdering.start(), null, null); + } + else + { + OpOrder.Group baseOp = null, indexOp = null, writeOp; + // OpOrder.start() shouldn't fail, but better safe than sorry. + try + { + baseOp = baseCfs.readOrdering.start(); + indexOp = indexCfs.readOrdering.start(); + // TODO: this should perhaps not open and maintain a writeOp for the full duration, but instead only *try* to delete stale entries, without blocking if there's no room. + // As it stands, we open a writeOp and keep it open for the duration to ensure that, should this CF get flushed to make room, we don't block the reclamation of any room being made + writeOp = baseCfs.keyspace.writeOrder.start(); + return new ReadOrderGroup(baseOp, indexOp, writeOp); + } + catch (RuntimeException e) + { + // Note that we must have writeOp == null since the ReadOrderGroup ctor can't fail + try + { + if (baseOp != null) + baseOp.close(); + } + finally + { + if (indexOp != null) + indexOp.close(); + } + throw e; + } + } + } + + private static ColumnFamilyStore maybeGetIndexCfs(ColumnFamilyStore baseCfs, ReadCommand command) + { + SecondaryIndexSearcher searcher = command.getIndexSearcher(baseCfs); + if (searcher == null) + return null; + + SecondaryIndex index = searcher.highestSelectivityIndex(command.rowFilter()); + return index == null || !(index instanceof AbstractSimplePerColumnSecondaryIndex) + ?
null + : ((AbstractSimplePerColumnSecondaryIndex)index).getIndexCfs(); + } + + public void close() + { + try + { + if (baseOp != null) + baseOp.close(); + } + finally + { + if (indexOp != null) + { + try + { + indexOp.close(); + } + finally + { + writeOp.close(); + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/ReadQuery.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ReadQuery.java b/src/java/org/apache/cassandra/db/ReadQuery.java new file mode 100644 index 0000000..3ad0f82 --- /dev/null +++ b/src/java/org/apache/cassandra/db/ReadQuery.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db; + +import org.apache.cassandra.db.filter.DataLimits; +import org.apache.cassandra.db.partitions.*; +import org.apache.cassandra.exceptions.RequestExecutionException; +import org.apache.cassandra.service.ClientState; +import org.apache.cassandra.service.pager.QueryPager; +import org.apache.cassandra.service.pager.PagingState; + +/** + * Generic abstraction for read queries. + * <p> + * The main implementation of this is {@link ReadCommand} but we have this interface because + * {@link SinglePartitionReadCommand.Group} is also considered a "read query" but is not a + * {@code ReadCommand}. + */ +public interface ReadQuery +{ + public static final ReadQuery EMPTY = new ReadQuery() + { + public ReadOrderGroup startOrderGroup() + { + return ReadOrderGroup.emptyGroup(); + } + + public PartitionIterator execute(ConsistencyLevel consistency, ClientState clientState) throws RequestExecutionException + { + return PartitionIterators.EMPTY; + } + + public PartitionIterator executeInternal(ReadOrderGroup orderGroup) + { + return PartitionIterators.EMPTY; + } + + public DataLimits limits() + { + // What we return here doesn't matter much in practice. However, returning DataLimits.NONE means + // "no particular limit", which makes SelectStatement.execute() take the slightly more complex "paging" + // path. Not a big deal but it's easy enough to return a limit of 0 rows which avoids this. + return DataLimits.cqlLimits(0); + } + + public QueryPager getPager(PagingState state) + { + return QueryPager.EMPTY; + } + + public QueryPager getLocalPager() + { + return QueryPager.EMPTY; + } + }; + + /** + * Starts a new read operation. + * <p> + * This must be called before {@link executeInternal} and passed to it to protect the read. + * The returned object <b>must</b> be closed on all paths, and it is thus strongly advised to + * use it in a try-with-resources construction. + * + * @return a newly started order group for this {@code ReadQuery}.
+ */ + public ReadOrderGroup startOrderGroup(); + + /** + * Executes the query at the provided consistency level. + * + * @param consistency the consistency level to achieve for the query. + * @param clientState the {@code ClientState} for the query. In practice, this can be null unless + * {@code consistency} is a serial consistency. + * + * @return the result of the query. + */ + public PartitionIterator execute(ConsistencyLevel consistency, ClientState clientState) throws RequestExecutionException; + + /** + * Execute the query for internal queries (that is, it basically executes the query locally). + * + * @param orderGroup the {@code ReadOrderGroup} protecting the read. + * @return the result of the query. + */ + public PartitionIterator executeInternal(ReadOrderGroup orderGroup); + + /** + * Returns a pager for the query. + * + * @param pagingState the {@code PagingState} to start from if this is a paging continuation. This can be + * {@code null} if this is the start of paging. + * + * @return a pager for the query. + */ + public QueryPager getPager(PagingState pagingState); + + /** + * The limits for the query. + * + * @return The limits for the query. + */ + public DataLimits limits(); +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/ReadResponse.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ReadResponse.java b/src/java/org/apache/cassandra/db/ReadResponse.java index 39022a4..6453077 100644 --- a/src/java/org/apache/cassandra/db/ReadResponse.java +++ b/src/java/org/apache/cassandra/db/ReadResponse.java @@ -19,96 +19,219 @@ package org.apache.cassandra.db; import java.io.*; import java.nio.ByteBuffer; +import java.security.MessageDigest; +import java.util.ArrayList; +import java.util.List; +import org.apache.cassandra.db.rows.*; +import org.apache.cassandra.db.partitions.*; import org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.io.util.DataOutputBuffer; +import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.FBUtilities; -/* - * The read response message is sent by the server when reading data - * this encapsulates the keyspacename and the row that has been read. - * The keyspace name is needed so that we can use it to create repairs. 
- */ -public class ReadResponse +public abstract class ReadResponse { - public static final IVersionedSerializer<ReadResponse> serializer = new ReadResponseSerializer(); + public static final IVersionedSerializer<ReadResponse> serializer = new Serializer(); + public static final IVersionedSerializer<ReadResponse> legacyRangeSliceReplySerializer = new LegacyRangeSliceReplySerializer(); - private final Row row; - private final ByteBuffer digest; - - public ReadResponse(ByteBuffer digest) + public static ReadResponse createDataResponse(UnfilteredPartitionIterator data) { - assert digest != null; - this.digest= digest; - this.row = null; + return new DataResponse(data); } - public ReadResponse(Row row) + public static ReadResponse createDigestResponse(UnfilteredPartitionIterator data) { - assert row != null; - this.row = row; - this.digest = null; + return new DigestResponse(makeDigest(data)); } - public Row row() - { - return row; - } + public abstract UnfilteredPartitionIterator makeIterator(); + public abstract ByteBuffer digest(); + public abstract boolean isDigestQuery(); - public ByteBuffer digest() + protected static ByteBuffer makeDigest(UnfilteredPartitionIterator iterator) { - return digest; + MessageDigest digest = FBUtilities.threadLocalMD5Digest(); + UnfilteredPartitionIterators.digest(iterator, digest); + return ByteBuffer.wrap(digest.digest()); } - public boolean isDigestQuery() + private static class DigestResponse extends ReadResponse { - return digest != null; + private final ByteBuffer digest; + + private DigestResponse(ByteBuffer digest) + { + assert digest.hasRemaining(); + this.digest = digest; + } + + public UnfilteredPartitionIterator makeIterator() + { + throw new UnsupportedOperationException(); + } + + public ByteBuffer digest() + { + return digest; + } + + public boolean isDigestQuery() + { + return true; + } } -} -class ReadResponseSerializer implements IVersionedSerializer<ReadResponse> -{ - public void serialize(ReadResponse response, DataOutputPlus out, int version) throws IOException + private static class DataResponse extends ReadResponse { - out.writeInt(response.isDigestQuery() ? response.digest().remaining() : 0); - ByteBuffer buffer = response.isDigestQuery() ? 
response.digest() : ByteBufferUtil.EMPTY_BYTE_BUFFER; - out.write(buffer); - out.writeBoolean(response.isDigestQuery()); - if (!response.isDigestQuery()) - Row.serializer.serialize(response.row(), out, version); + // The response, serialized in the current messaging version + private final ByteBuffer data; + private final SerializationHelper.Flag flag; + + private DataResponse(ByteBuffer data) + { + this.data = data; + this.flag = SerializationHelper.Flag.FROM_REMOTE; + } + + private DataResponse(UnfilteredPartitionIterator iter) + { + try (DataOutputBuffer buffer = new DataOutputBuffer()) + { + UnfilteredPartitionIterators.serializerForIntraNode().serialize(iter, buffer, MessagingService.current_version); + this.data = buffer.buffer(); + this.flag = SerializationHelper.Flag.LOCAL; + } + catch (IOException e) + { + // We're serializing in memory so this shouldn't happen + throw new RuntimeException(e); + } + } + + public UnfilteredPartitionIterator makeIterator() + { + try + { + DataInput in = new DataInputStream(ByteBufferUtil.inputStream(data)); + return UnfilteredPartitionIterators.serializerForIntraNode().deserialize(in, MessagingService.current_version, flag); + } + catch (IOException e) + { + // We're deserializing in memory so this shouldn't happen + throw new RuntimeException(e); + } + } + + public ByteBuffer digest() + { + try (UnfilteredPartitionIterator iterator = makeIterator()) + { + return makeDigest(iterator); + } + } + + public boolean isDigestQuery() + { + return false; + } } - public ReadResponse deserialize(DataInput in, int version) throws IOException + private static class Serializer implements IVersionedSerializer<ReadResponse> { - byte[] digest = null; - int digestSize = in.readInt(); - if (digestSize > 0) + public void serialize(ReadResponse response, DataOutputPlus out, int version) throws IOException { - digest = new byte[digestSize]; - in.readFully(digest, 0, digestSize); + if (version < MessagingService.VERSION_30) + { + // TODO + throw new UnsupportedOperationException(); + } + + boolean isDigest = response.isDigestQuery(); + ByteBufferUtil.writeWithShortLength(isDigest ? response.digest() : ByteBufferUtil.EMPTY_BYTE_BUFFER, out); + if (!isDigest) + { + // Note that we can only get there if version == 3.0, which is the current_version. When we'll change the + // version, we'll have to deserialize/re-serialize the data to be in the proper version. + assert version == MessagingService.VERSION_30; + ByteBuffer data = ((DataResponse)response).data; + ByteBufferUtil.writeWithLength(data, out); + } } - boolean isDigest = in.readBoolean(); - assert isDigest == digestSize > 0; - Row row = null; - if (!isDigest) + public ReadResponse deserialize(DataInput in, int version) throws IOException { - // This is coming from a remote host - row = Row.serializer.deserialize(in, version, ColumnSerializer.Flag.FROM_REMOTE); + if (version < MessagingService.VERSION_30) + { + // TODO + throw new UnsupportedOperationException(); + } + + ByteBuffer digest = ByteBufferUtil.readWithShortLength(in); + if (digest.hasRemaining()) + return new DigestResponse(digest); + + assert version == MessagingService.VERSION_30; + ByteBuffer data = ByteBufferUtil.readWithLength(in); + return new DataResponse(data); } - return isDigest ? 
new ReadResponse(ByteBuffer.wrap(digest)) : new ReadResponse(row); + public long serializedSize(ReadResponse response, int version) + { + if (version < MessagingService.VERSION_30) + { + // TODO + throw new UnsupportedOperationException(); + } + + TypeSizes sizes = TypeSizes.NATIVE; + boolean isDigest = response.isDigestQuery(); + long size = ByteBufferUtil.serializedSizeWithShortLength(isDigest ? response.digest() : ByteBufferUtil.EMPTY_BYTE_BUFFER, sizes); + + if (!isDigest) + { + // Note that we can only get there if version == 3.0, which is the current_version. When we'll change the + // version, we'll have to deserialize/re-serialize the data to be in the proper version. + assert version == MessagingService.VERSION_30; + ByteBuffer data = ((DataResponse)response).data; + size += ByteBufferUtil.serializedSizeWithLength(data, sizes); + } + return size; + } } - public long serializedSize(ReadResponse response, int version) + private static class LegacyRangeSliceReplySerializer implements IVersionedSerializer<ReadResponse> { - TypeSizes typeSizes = TypeSizes.NATIVE; - ByteBuffer buffer = response.isDigestQuery() ? response.digest() : ByteBufferUtil.EMPTY_BYTE_BUFFER; - int size = typeSizes.sizeof(buffer.remaining()); - size += buffer.remaining(); - size += typeSizes.sizeof(response.isDigestQuery()); - if (!response.isDigestQuery()) - size += Row.serializer.serializedSize(response.row(), version); - return size; + public void serialize(ReadResponse response, DataOutputPlus out, int version) throws IOException + { + // TODO + throw new UnsupportedOperationException(); + // out.writeInt(rsr.rows.size()); + // for (Row row : rsr.rows) + // Row.serializer.serialize(row, out, version); + } + + public ReadResponse deserialize(DataInput in, int version) throws IOException + { + // TODO + throw new UnsupportedOperationException(); + // int rowCount = in.readInt(); + // List<Row> rows = new ArrayList<Row>(rowCount); + // for (int i = 0; i < rowCount; i++) + // rows.add(Row.serializer.deserialize(in, version)); + // return new RangeSliceReply(rows); + } + + public long serializedSize(ReadResponse response, int version) + { + // TODO + throw new UnsupportedOperationException(); + // int size = TypeSizes.NATIVE.sizeof(rsr.rows.size()); + // for (Row row : rsr.rows) + // size += Row.serializer.serializedSize(row, version); + // return size; + } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/ReadVerbHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ReadVerbHandler.java b/src/java/org/apache/cassandra/db/ReadVerbHandler.java deleted file mode 100644 index 8c167ed..0000000 --- a/src/java/org/apache/cassandra/db/ReadVerbHandler.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.db; - -import org.apache.cassandra.net.IVerbHandler; -import org.apache.cassandra.net.MessageIn; -import org.apache.cassandra.net.MessageOut; -import org.apache.cassandra.net.MessagingService; -import org.apache.cassandra.service.StorageService; -import org.apache.cassandra.tracing.Tracing; - -public class ReadVerbHandler implements IVerbHandler<ReadCommand> -{ - public void doVerb(MessageIn<ReadCommand> message, int id) - { - if (StorageService.instance.isBootstrapMode()) - { - throw new RuntimeException("Cannot service reads while bootstrapping!"); - } - - ReadCommand command = message.payload; - Keyspace keyspace = Keyspace.open(command.ksName); - Row row = command.getRow(keyspace); - - MessageOut<ReadResponse> reply = new MessageOut<ReadResponse>(MessagingService.Verb.REQUEST_RESPONSE, - getResponse(command, row), - ReadResponse.serializer); - Tracing.trace("Enqueuing response to {}", message.from); - MessagingService.instance().sendReply(reply, id, message.from); - } - - public static ReadResponse getResponse(ReadCommand command, Row row) - { - if (command.isDigestQuery()) - { - return new ReadResponse(ColumnFamily.digest(row.cf)); - } - else - { - return new ReadResponse(row); - } - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/RetriedSliceFromReadCommand.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/RetriedSliceFromReadCommand.java b/src/java/org/apache/cassandra/db/RetriedSliceFromReadCommand.java deleted file mode 100644 index 41f5a50..0000000 --- a/src/java/org/apache/cassandra/db/RetriedSliceFromReadCommand.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.cassandra.db; - -import java.nio.ByteBuffer; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.cassandra.db.filter.SliceQueryFilter; - -public class RetriedSliceFromReadCommand extends SliceFromReadCommand -{ - static final Logger logger = LoggerFactory.getLogger(RetriedSliceFromReadCommand.class); - public final int originalCount; - - public RetriedSliceFromReadCommand(String keyspaceName, ByteBuffer key, String cfName, long timestamp, SliceQueryFilter filter, int originalCount) - { - super(keyspaceName, key, cfName, timestamp, filter); - this.originalCount = originalCount; - } - - @Override - public ReadCommand copy() - { - return new RetriedSliceFromReadCommand(ksName, key, cfName, timestamp, filter, originalCount).setIsDigestQuery(isDigestQuery()); - } - - @Override - public int getOriginalRequestedCount() - { - return originalCount; - } - - @Override - public String toString() - { - return "RetriedSliceFromReadCommand(" + "cmd=" + super.toString() + ", originalCount=" + originalCount + ")"; - } - -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/ReusableClustering.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ReusableClustering.java b/src/java/org/apache/cassandra/db/ReusableClustering.java new file mode 100644 index 0000000..e2760aa --- /dev/null +++ b/src/java/org/apache/cassandra/db/ReusableClustering.java @@ -0,0 +1,82 @@ + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.cassandra.db; + +import java.nio.ByteBuffer; +import java.util.Arrays; + +import org.apache.cassandra.utils.ObjectSizes; + +public class ReusableClustering extends Clustering +{ + private static final long EMPTY_SIZE = ObjectSizes.measure(new ReusableClustering(0)); + + protected final ByteBuffer[] values; + + protected ReusableWriter writer; + + public ReusableClustering(int size) + { + this.values = new ByteBuffer[size]; + } + + public int size() + { + return values.length; + } + + public ByteBuffer get(int i) + { + return values[i]; + } + + public ByteBuffer[] getRawValues() + { + return values; + } + + public Writer writer() + { + if (writer == null) + writer = new ReusableWriter(); + return writer; + } + + public void reset() + { + Arrays.fill(values, null); + if (writer != null) + writer.reset(); + } + + protected class ReusableWriter implements Writer + { + int idx; + + public void writeClusteringValue(ByteBuffer value) + { + values[idx++] = value; + } + + private void reset() + { + idx = 0; + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/ReusableClusteringPrefix.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ReusableClusteringPrefix.java b/src/java/org/apache/cassandra/db/ReusableClusteringPrefix.java new file mode 100644 index 0000000..d2f19f7 --- /dev/null +++ b/src/java/org/apache/cassandra/db/ReusableClusteringPrefix.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db; + +import java.nio.ByteBuffer; +import java.util.Arrays; + +import org.apache.cassandra.utils.ObjectSizes; + +// Note that we abuse a bit ReusableClustering to store Slice.Bound infos, but it's convenient so ... +public class ReusableClusteringPrefix extends ReusableClustering +{ + private Kind kind; + private int size; + + public ReusableClusteringPrefix(int size) + { + super(size); + } + + public ClusteringPrefix get() + { + // We use ReusableClusteringPrefix when writing sstables (in ColumnIndex) and we + // don't write static clustering there. 
+ assert kind != Kind.STATIC_CLUSTERING; + if (kind == Kind.CLUSTERING) + { + assert values.length == size; + return this; + } + + return Slice.Bound.create(kind, Arrays.copyOfRange(values, 0, size)); + } + + public void copy(ClusteringPrefix clustering) + { + kind = clustering.kind(); + for (int i = 0; i < clustering.size(); i++) + values[i] = clustering.get(i); + size = clustering.size(); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/ReusableLivenessInfo.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ReusableLivenessInfo.java b/src/java/org/apache/cassandra/db/ReusableLivenessInfo.java new file mode 100644 index 0000000..43530b0 --- /dev/null +++ b/src/java/org/apache/cassandra/db/ReusableLivenessInfo.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db; + +public class ReusableLivenessInfo extends AbstractLivenessInfo +{ + private long timestamp; + private int ttl; + private int localDeletionTime; + + public ReusableLivenessInfo() + { + reset(); + } + + public LivenessInfo setTo(LivenessInfo info) + { + return setTo(info.timestamp(), info.ttl(), info.localDeletionTime()); + } + + public LivenessInfo setTo(long timestamp, int ttl, int localDeletionTime) + { + this.timestamp = timestamp; + this.ttl = ttl; + this.localDeletionTime = localDeletionTime; + return this; + } + + public long timestamp() + { + return timestamp; + } + + public int ttl() + { + return ttl; + } + + public int localDeletionTime() + { + return localDeletionTime; + } + + public void reset() + { + this.timestamp = LivenessInfo.NO_TIMESTAMP; + this.ttl = LivenessInfo.NO_TTL; + this.localDeletionTime = LivenessInfo.NO_DELETION_TIME; + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/Row.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/Row.java b/src/java/org/apache/cassandra/db/Row.java deleted file mode 100644 index a826894..0000000 --- a/src/java/org/apache/cassandra/db/Row.java +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.db; - -import java.io.*; -import java.nio.ByteBuffer; - -import org.apache.cassandra.db.filter.IDiskAtomFilter; -import org.apache.cassandra.io.IVersionedSerializer; -import org.apache.cassandra.io.util.DataOutputPlus; -import org.apache.cassandra.service.StorageService; -import org.apache.cassandra.utils.ByteBufferUtil; - -public class Row -{ - public static final RowSerializer serializer = new RowSerializer(); - - public final DecoratedKey key; - public final ColumnFamily cf; - - public Row(DecoratedKey key, ColumnFamily cf) - { - assert key != null; - // cf may be null, indicating no data - this.key = key; - this.cf = cf; - } - - public Row(ByteBuffer key, ColumnFamily updates) - { - this(StorageService.getPartitioner().decorateKey(key), updates); - } - - @Override - public String toString() - { - return "Row(" + - "key=" + key + - ", cf=" + cf + - ')'; - } - - public int getLiveCount(IDiskAtomFilter filter, long now) - { - return cf == null ? 0 : filter.getLiveCount(cf, now); - } - - public static class RowSerializer implements IVersionedSerializer<Row> - { - public void serialize(Row row, DataOutputPlus out, int version) throws IOException - { - ByteBufferUtil.writeWithShortLength(row.key.getKey(), out); - ColumnFamily.serializer.serialize(row.cf, out, version); - } - - public Row deserialize(DataInput in, int version, ColumnSerializer.Flag flag) throws IOException - { - return new Row(StorageService.getPartitioner().decorateKey(ByteBufferUtil.readWithShortLength(in)), - ColumnFamily.serializer.deserialize(in, flag, version)); - } - - public Row deserialize(DataInput in, int version) throws IOException - { - return deserialize(in, version, ColumnSerializer.Flag.LOCAL); - } - - public long serializedSize(Row row, int version) - { - int keySize = row.key.getKey().remaining(); - return TypeSizes.NATIVE.sizeof((short) keySize) + keySize + ColumnFamily.serializer.serializedSize(row.cf, TypeSizes.NATIVE, version); - } - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/RowIndexEntry.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/RowIndexEntry.java b/src/java/org/apache/cassandra/db/RowIndexEntry.java index 4ff61ce..016e26e 100644 --- a/src/java/org/apache/cassandra/db/RowIndexEntry.java +++ b/src/java/org/apache/cassandra/db/RowIndexEntry.java @@ -26,6 +26,7 @@ import java.util.List; import com.google.common.primitives.Ints; +import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.cache.IMeasurableMemory; import org.apache.cassandra.io.ISerializer; import org.apache.cassandra.io.sstable.IndexHelper; @@ -45,7 +46,7 @@ public class RowIndexEntry<T> implements IMeasurableMemory this.position = position; } - public int promotedSize(ISerializer<T> idxSerializer) + public int promotedSize(CFMetaData metadata, Version version, SerializationHeader header) { return 0; } @@ -100,34 +101,39 @@ public class RowIndexEntry<T> implements IMeasurableMemory public static interface IndexSerializer<T> { void 
serialize(RowIndexEntry<T> rie, DataOutputPlus out) throws IOException; - RowIndexEntry<T> deserialize(DataInput in, Version version) throws IOException; + RowIndexEntry<T> deserialize(DataInput in) throws IOException; public int serializedSize(RowIndexEntry<T> rie); } public static class Serializer implements IndexSerializer<IndexHelper.IndexInfo> { - private final ISerializer<IndexHelper.IndexInfo> idxSerializer; + private final CFMetaData metadata; + private final Version version; + private final SerializationHeader header; - public Serializer(ISerializer<IndexHelper.IndexInfo> idxSerializer) + public Serializer(CFMetaData metadata, Version version, SerializationHeader header) { - this.idxSerializer = idxSerializer; + this.metadata = metadata; + this.version = version; + this.header = header; } public void serialize(RowIndexEntry<IndexHelper.IndexInfo> rie, DataOutputPlus out) throws IOException { out.writeLong(rie.position); - out.writeInt(rie.promotedSize(idxSerializer)); + out.writeInt(rie.promotedSize(metadata, version, header)); if (rie.isIndexed()) { DeletionTime.serializer.serialize(rie.deletionTime(), out); out.writeInt(rie.columnsIndex().size()); + IndexHelper.IndexInfo.Serializer idxSerializer = metadata.serializers().indexSerializer(version); for (IndexHelper.IndexInfo info : rie.columnsIndex()) - idxSerializer.serialize(info, out); + idxSerializer.serialize(info, out, header); } } - public RowIndexEntry<IndexHelper.IndexInfo> deserialize(DataInput in, Version version) throws IOException + public RowIndexEntry<IndexHelper.IndexInfo> deserialize(DataInput in) throws IOException { long position = in.readLong(); @@ -137,9 +143,10 @@ public class RowIndexEntry<T> implements IMeasurableMemory DeletionTime deletionTime = DeletionTime.serializer.deserialize(in); int entries = in.readInt(); + IndexHelper.IndexInfo.Serializer idxSerializer = metadata.serializers().indexSerializer(version); List<IndexHelper.IndexInfo> columnsIndex = new ArrayList<>(entries); for (int i = 0; i < entries; i++) - columnsIndex.add(idxSerializer.deserialize(in)); + columnsIndex.add(idxSerializer.deserialize(in, header)); return new IndexedEntry(position, deletionTime, columnsIndex); } @@ -166,7 +173,7 @@ public class RowIndexEntry<T> implements IMeasurableMemory public int serializedSize(RowIndexEntry<IndexHelper.IndexInfo> rie) { - int size = TypeSizes.NATIVE.sizeof(rie.position) + TypeSizes.NATIVE.sizeof(rie.promotedSize(idxSerializer)); + int size = TypeSizes.NATIVE.sizeof(rie.position) + TypeSizes.NATIVE.sizeof(rie.promotedSize(metadata, version, header)); if (rie.isIndexed()) { @@ -175,8 +182,9 @@ public class RowIndexEntry<T> implements IMeasurableMemory size += DeletionTime.serializer.serializedSize(rie.deletionTime(), TypeSizes.NATIVE); size += TypeSizes.NATIVE.sizeof(index.size()); + IndexHelper.IndexInfo.Serializer idxSerializer = metadata.serializers().indexSerializer(version); for (IndexHelper.IndexInfo info : index) - size += idxSerializer.serializedSize(info, TypeSizes.NATIVE); + size += idxSerializer.serializedSize(info, header, TypeSizes.NATIVE); } @@ -217,13 +225,14 @@ public class RowIndexEntry<T> implements IMeasurableMemory } @Override - public int promotedSize(ISerializer<IndexHelper.IndexInfo> idxSerializer) + public int promotedSize(CFMetaData metadata, Version version, SerializationHeader header) { TypeSizes typeSizes = TypeSizes.NATIVE; long size = DeletionTime.serializer.serializedSize(deletionTime, typeSizes); size += typeSizes.sizeof(columnsIndex.size()); // number of entries + 
+            IndexHelper.IndexInfo.Serializer idxSerializer = metadata.serializers().indexSerializer(version);
             for (IndexHelper.IndexInfo info : columnsIndex)
-                size += idxSerializer.serializedSize(info, typeSizes);
+                size += idxSerializer.serializedSize(info, header, typeSizes);
 
             return Ints.checkedCast(size);
         }
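The shape of this RowIndexEntry change is worth calling out: serialization context that used to be threaded through every call (an ISerializer, or a Version) is now captured once when the Serializer is constructed, and the per-version IndexInfo serializer is resolved internally via metadata.serializers().indexSerializer(version). A minimal, self-contained sketch of that pattern with simplified stand-in types (this is not the Cassandra API itself):

import java.io.*;

// Constructor-injected context: the version/header that used to travel with
// each serialize/deserialize call are fixed at construction time, so call
// sites stay version-agnostic.
interface EntrySerializer<T>
{
    void serialize(T entry, DataOutput out) throws IOException;
    T deserialize(DataInput in) throws IOException; // note: no per-call version
}

final class VersionedEntrySerializer implements EntrySerializer<Long>
{
    private final int version; // stands in for Version + SerializationHeader

    VersionedEntrySerializer(int version)
    {
        this.version = version;
    }

    public void serialize(Long entry, DataOutput out) throws IOException
    {
        out.writeLong(entry); // real code would pick an encoding based on this.version
    }

    public Long deserialize(DataInput in) throws IOException
    {
        return in.readLong();
    }
}

One consequence visible in the diff: deserialize(DataInput) drops its Version parameter, because a Serializer instance is now tied to a single sstable version.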
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/RowIteratorFactory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RowIteratorFactory.java b/src/java/org/apache/cassandra/db/RowIteratorFactory.java
deleted file mode 100644
index 3473e96..0000000
--- a/src/java/org/apache/cassandra/db/RowIteratorFactory.java
+++ /dev/null
@@ -1,172 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db;
-
-import java.util.*;
-
-import com.google.common.collect.Iterables;
-
-import org.apache.cassandra.db.columniterator.IColumnIteratorFactory;
-import org.apache.cassandra.db.columniterator.LazyColumnIterator;
-import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
-import org.apache.cassandra.db.filter.QueryFilter;
-import org.apache.cassandra.db.filter.IDiskAtomFilter;
-import org.apache.cassandra.db.filter.TombstoneOverwhelmingException;
-import org.apache.cassandra.io.sstable.format.SSTableReader;
-import org.apache.cassandra.utils.CloseableIterator;
-import org.apache.cassandra.utils.MergeIterator;
-
-public class RowIteratorFactory
-{
-
-    private static final Comparator<OnDiskAtomIterator> COMPARE_BY_KEY = new Comparator<OnDiskAtomIterator>()
-    {
-        public int compare(OnDiskAtomIterator o1, OnDiskAtomIterator o2)
-        {
-            return DecoratedKey.comparator.compare(o1.getKey(), o2.getKey());
-        }
-    };
-
-
-    /**
-     * Get a row iterator over the provided memtables and sstables, between the provided keys
-     * and filtered by the queryfilter.
-     * @param memtables Memtables pending flush.
-     * @param sstables SStables to scan through.
-     * @param range The data range to fetch
-     * @param cfs
-     * @return A row iterator following all the given restrictions
-     */
-    public static CloseableIterator<Row> getIterator(final Iterable<Memtable> memtables,
-                                                     final Collection<SSTableReader> sstables,
-                                                     final DataRange range,
-                                                     final ColumnFamilyStore cfs,
-                                                     final long now)
-    {
-        // fetch data from current memtable, historical memtables, and SSTables in the correct order.
-        final List<CloseableIterator<OnDiskAtomIterator>> iterators = new ArrayList<>(Iterables.size(memtables) + sstables.size());
-
-        for (Memtable memtable : memtables)
-            iterators.add(new ConvertToColumnIterator(range, memtable.getEntryIterator(range.startKey(), range.stopKey())));
-
-        for (SSTableReader sstable : sstables)
-            iterators.add(sstable.getScanner(range));
-
-        // reduce rows from all sources into a single row
-        return MergeIterator.get(iterators, COMPARE_BY_KEY, new MergeIterator.Reducer<OnDiskAtomIterator, Row>()
-        {
-            private final int gcBefore = cfs.gcBefore(now);
-            private final List<OnDiskAtomIterator> colIters = new ArrayList<>();
-            private DecoratedKey key;
-            private ColumnFamily returnCF;
-
-            @Override
-            protected void onKeyChange()
-            {
-                this.returnCF = ArrayBackedSortedColumns.factory.create(cfs.metadata, range.columnFilter.isReversed());
-            }
-
-            public void reduce(OnDiskAtomIterator current)
-            {
-                this.colIters.add(current);
-                this.key = current.getKey();
-                this.returnCF.delete(current.getColumnFamily());
-            }
-
-            protected Row getReduced()
-            {
-                // First check if this row is in the rowCache. If it is and it covers our filter, we can skip the rest
-                ColumnFamily cached = cfs.getRawCachedRow(key);
-                IDiskAtomFilter filter = range.columnFilter(key.getKey());
-
-                try
-                {
-                    if (cached == null || !cfs.isFilterFullyCoveredBy(filter, cached, now))
-                    {
-                        // not cached: collate
-                        QueryFilter.collateOnDiskAtom(returnCF, colIters, filter, key, gcBefore, now);
-                    }
-                    else
-                    {
-                        QueryFilter keyFilter = new QueryFilter(key, cfs.name, filter, now);
-                        returnCF = cfs.filterColumnFamily(cached, keyFilter);
-                    }
-                }
-                catch (TombstoneOverwhelmingException e)
-                {
-                    e.setKey(key);
-                    throw e;
-                }
-
-                Row rv = new Row(key, returnCF);
-                colIters.clear();
-                key = null;
-                return rv;
-            }
-        });
-    }
-
-    /**
-     * Get a ColumnIterator for a specific key in the memtable.
-     */
-    private static class ConvertToColumnIterator implements CloseableIterator<OnDiskAtomIterator>
-    {
-        private final DataRange range;
-        private final Iterator<Map.Entry<DecoratedKey, ColumnFamily>> iter;
-
-        public ConvertToColumnIterator(DataRange range, Iterator<Map.Entry<DecoratedKey, ColumnFamily>> iter)
-        {
-            this.range = range;
-            this.iter = iter;
-        }
-
-        public boolean hasNext()
-        {
-            return iter.hasNext();
-        }
-
-        /*
-         * Note that when doing get_paged_slice, we reset the start of the queryFilter after we've fetched the
-         * first row. This means that this iterator should not use in any way the filter to fetch a row before
-         * we call next(). Which prevents us for using guava AbstractIterator.
-         * This is obviously rather fragile and we should consider refactoring that code, but such refactor will go
-         * deep into the storage engine code so this will have to do until then.
-         */
-        public OnDiskAtomIterator next()
-        {
-            final Map.Entry<DecoratedKey, ColumnFamily> entry = iter.next();
-            return new LazyColumnIterator(entry.getKey(), new IColumnIteratorFactory()
-            {
-                public OnDiskAtomIterator create()
-                {
-                    return range.columnFilter(entry.getKey().getKey()).getColumnIterator(entry.getKey(), entry.getValue());
-                }
-            });
-        }
-
-        public void remove()
-        {
-            throw new UnsupportedOperationException();
-        }
-
-        public void close()
-        {
-            // pass
-        }
-    }
-}
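For readers who have not seen the deleted code before: getIterator was a classic k-way merge. Key-ordered iterators from memtables and sstables are merged, and every input sharing a partition key is reduced into a single Row (onKeyChange / reduce / getReduced above). A self-contained sketch of that merge/reduce shape using only JDK types, with Guava's MergeIterator replaced by a PriorityQueue for illustration:

import java.util.*;
import java.util.AbstractMap.SimpleEntry;

final class MergeByKey
{
    // Merges key-sorted sources and groups values that share a key: the same
    // shape as the deleted reducer, which emitted one Row per DecoratedKey.
    static <K extends Comparable<K>, V> List<Map.Entry<K, List<V>>> merge(List<Iterator<Map.Entry<K, V>>> sources)
    {
        // Heap ordered by the head key of each source (cf. COMPARE_BY_KEY).
        PriorityQueue<PeekingSource<K, V>> heap = new PriorityQueue<>();
        for (Iterator<Map.Entry<K, V>> it : sources)
            if (it.hasNext())
                heap.add(new PeekingSource<>(it));

        List<Map.Entry<K, List<V>>> out = new ArrayList<>();
        while (!heap.isEmpty())
        {
            K key = heap.peek().head.getKey();
            List<V> reduced = new ArrayList<>();
            // Drain every source whose head matches the current key ("reduce").
            while (!heap.isEmpty() && heap.peek().head.getKey().compareTo(key) == 0)
            {
                PeekingSource<K, V> s = heap.poll();
                reduced.add(s.head.getValue());
                if (s.advance())
                    heap.add(s);
            }
            out.add(new SimpleEntry<>(key, reduced)); // "getReduced"
        }
        return out;
    }

    private static final class PeekingSource<K extends Comparable<K>, V> implements Comparable<PeekingSource<K, V>>
    {
        final Iterator<Map.Entry<K, V>> it;
        Map.Entry<K, V> head;

        PeekingSource(Iterator<Map.Entry<K, V>> it) { this.it = it; this.head = it.next(); }

        boolean advance()
        {
            if (!it.hasNext()) return false;
            head = it.next();
            return true;
        }

        public int compareTo(PeekingSource<K, V> other) { return head.getKey().compareTo(other.head.getKey()); }
    }
}

The real code layered row-cache checks, tombstone GC and lazy column iteration on top of this skeleton; the new storage engine replaces it with the partition/row iterators seen elsewhere in this commit.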
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/RowPosition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RowPosition.java b/src/java/org/apache/cassandra/db/RowPosition.java
deleted file mode 100644
index 3fa0465..0000000
--- a/src/java/org/apache/cassandra/db/RowPosition.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db;
-
-import java.io.DataInput;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import org.apache.cassandra.dht.*;
-import org.apache.cassandra.io.ISerializer;
-import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.utils.ByteBufferUtil;
-
-public interface RowPosition extends RingPosition<RowPosition>
-{
-    public static enum Kind
-    {
-        // Only add new values to the end of the enum, the ordinal is used
-        // during serialization
-        ROW_KEY, MIN_BOUND, MAX_BOUND;
-
-        private static final Kind[] allKinds = Kind.values();
-
-        static Kind fromOrdinal(int ordinal)
-        {
-            return allKinds[ordinal];
-        }
-    }
-
-    public static final class ForKey
-    {
-        public static RowPosition get(ByteBuffer key, IPartitioner p)
-        {
-            return key == null || key.remaining() == 0 ? p.getMinimumToken().minKeyBound() : p.decorateKey(key);
-        }
-    }
-
-    public static final RowPositionSerializer serializer = new RowPositionSerializer();
-
-    public Kind kind();
-    public boolean isMinimum();
-
-    public static class RowPositionSerializer implements IPartitionerDependentSerializer<RowPosition>
-    {
-        /*
-         * We need to be able to serialize both Token.KeyBound and
-         * DecoratedKey. To make this compact, we first write a byte whose
-         * meaning is:
-         *   - 0: DecoratedKey
-         *   - 1: a 'minimum' Token.KeyBound
-         *   - 2: a 'maximum' Token.KeyBound
-         * In the case of the DecoratedKey, we then serialize the key (the
-         * token is recreated on the other side). In the other cases, we then
-         * serialize the token.
-         */
-        public void serialize(RowPosition pos, DataOutputPlus out, int version) throws IOException
-        {
-            Kind kind = pos.kind();
-            out.writeByte(kind.ordinal());
-            if (kind == Kind.ROW_KEY)
-                ByteBufferUtil.writeWithShortLength(((DecoratedKey)pos).getKey(), out);
-            else
-                Token.serializer.serialize(pos.getToken(), out, version);
-        }
-
-        public RowPosition deserialize(DataInput in, IPartitioner p, int version) throws IOException
-        {
-            Kind kind = Kind.fromOrdinal(in.readByte());
-            if (kind == Kind.ROW_KEY)
-            {
-                ByteBuffer k = ByteBufferUtil.readWithShortLength(in);
-                return StorageService.getPartitioner().decorateKey(k);
-            }
-            else
-            {
-                Token t = Token.serializer.deserialize(in, p, version);
-                return kind == Kind.MIN_BOUND ? t.minKeyBound() : t.maxKeyBound();
-            }
-        }
-
-        public long serializedSize(RowPosition pos, int version)
-        {
-            Kind kind = pos.kind();
-            int size = 1; // 1 byte for enum
-            if (kind == Kind.ROW_KEY)
-            {
-                int keySize = ((DecoratedKey)pos).getKey().remaining();
-                size += TypeSizes.NATIVE.sizeof((short) keySize) + keySize;
-            }
-            else
-            {
-                size += Token.serializer.serializedSize(pos.getToken(), version);
-            }
-            return size;
-        }
-    }
-}
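The serializer comment in the deleted RowPosition fully specifies its wire format: one kind byte (the enum ordinal: 0 = ROW_KEY, 1 = MIN_BOUND, 2 = MAX_BOUND), followed by either a short-length-prefixed key or a serialized token. A hedged sketch of the ROW_KEY case only, with token encoding elided and illustrative names (not the Cassandra API):

import java.nio.ByteBuffer;

final class RowPositionFraming
{
    // Frames a partition key as RowPositionSerializer did: kind byte, then a
    // 2-byte length prefix, then the key bytes. The token is not stored; it
    // is recomputed by the partitioner on deserialization.
    // Total size = 1 (kind) + 2 (length) + keySize, matching serializedSize above.
    static ByteBuffer encodeRowKey(ByteBuffer key)
    {
        ByteBuffer out = ByteBuffer.allocate(1 + 2 + key.remaining());
        out.put((byte) 0);                      // Kind.ROW_KEY.ordinal()
        out.putShort((short) key.remaining());  // short length prefix
        out.put(key.duplicate());               // key bytes
        return (ByteBuffer) out.flip();
    }
}

For the bound kinds the payload is the token itself, and the kind byte disambiguates min from max key bound on the receiving side.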
