http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/Conflicts.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/Conflicts.java b/src/java/org/apache/cassandra/db/Conflicts.java new file mode 100644 index 0000000..fa0e819 --- /dev/null +++ b/src/java/org/apache/cassandra/db/Conflicts.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db; + +import java.nio.ByteBuffer; + +import org.apache.cassandra.db.context.CounterContext; + +public abstract class Conflicts +{ + private Conflicts() {} + + public enum Resolution { LEFT_WINS, MERGE, RIGHT_WINS }; + + public static Resolution resolveRegular(long leftTimestamp, + boolean leftLive, + int leftLocalDeletionTime, + ByteBuffer leftValue, + long rightTimestamp, + boolean rightLive, + int rightLocalDeletionTime, + ByteBuffer rightValue) + { + if (leftTimestamp != rightTimestamp) + return leftTimestamp < rightTimestamp ? Resolution.RIGHT_WINS : Resolution.LEFT_WINS; + + if (leftLive != rightLive) + return leftLive ? Resolution.RIGHT_WINS : Resolution.LEFT_WINS; + + int c = leftValue.compareTo(rightValue); + if (c < 0) + return Resolution.RIGHT_WINS; + else if (c > 0) + return Resolution.LEFT_WINS; + + // Prefer the longest ttl if relevant + return leftLocalDeletionTime < rightLocalDeletionTime ? Resolution.RIGHT_WINS : Resolution.LEFT_WINS; + } + + public static Resolution resolveCounter(long leftTimestamp, + boolean leftLive, + ByteBuffer leftValue, + long rightTimestamp, + boolean rightLive, + ByteBuffer rightValue) + { + // No matter what the counter cell's timestamp is, a tombstone always takes precedence. See CASSANDRA-7346. + if (!leftLive) + // left is a tombstone: it has precedence over right if either right is not a tombstone, or left has a greater timestamp + return rightLive || leftTimestamp > rightTimestamp ? Resolution.LEFT_WINS : Resolution.RIGHT_WINS; + + // If right is a tombstone, since left isn't one, it has precedence + if (!rightLive) + return Resolution.RIGHT_WINS; + + return Resolution.MERGE; + } + + public static ByteBuffer mergeCounterValues(ByteBuffer left, ByteBuffer right) + { + return CounterContext.instance().merge(left, right); + } + +}
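For illustration (this sketch is not part of commit a991b648): the new Conflicts class centralizes cell reconciliation, and resolveRegular breaks ties in the order the code above checks them — higher timestamp first, then tombstones over live cells, then the lexically larger value, then the larger localDeletionTime. The class name ConflictsDemo and the variable names below are illustrative only; the Conflicts calls match the signatures shown in the diff. Run with -ea to enable the assertions.

import java.nio.ByteBuffer;
import org.apache.cassandra.db.Conflicts;
import org.apache.cassandra.db.Conflicts.Resolution;

public class ConflictsDemo
{
    public static void main(String[] args)
    {
        ByteBuffer small = ByteBuffer.wrap(new byte[]{ 1 });
        ByteBuffer big = ByteBuffer.wrap(new byte[]{ 2 });

        // Different timestamps: the newer cell wins outright.
        assert Conflicts.resolveRegular(2L, true, 100, small, 1L, true, 100, big) == Resolution.LEFT_WINS;

        // Same timestamp, one side is a tombstone (live == false): the tombstone wins.
        assert Conflicts.resolveRegular(1L, true, 100, small, 1L, false, 100, small) == Resolution.RIGHT_WINS;

        // Same timestamp and liveness: the lexically larger value wins, so replicas converge deterministically.
        assert Conflicts.resolveRegular(1L, true, 100, small, 1L, true, 100, big) == Resolution.RIGHT_WINS;

        // Identical values too: the larger localDeletionTime (longest effective TTL) wins.
        assert Conflicts.resolveRegular(1L, true, 100, small, 1L, true, 200, small) == Resolution.RIGHT_WINS;

        // Counters never pick a winner when both sides are live: their contexts must be merged.
        assert Conflicts.resolveCounter(1L, true, small, 2L, true, big) == Resolution.MERGE;
    }
}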
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/CounterCell.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/CounterCell.java b/src/java/org/apache/cassandra/db/CounterCell.java deleted file mode 100644 index cda1200..0000000 --- a/src/java/org/apache/cassandra/db/CounterCell.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.db; - -import org.apache.cassandra.config.CFMetaData; -import org.apache.cassandra.db.context.CounterContext; -import org.apache.cassandra.utils.concurrent.OpOrder; -import org.apache.cassandra.utils.memory.AbstractAllocator; -import org.apache.cassandra.utils.memory.MemtableAllocator; - -/** - * A column that represents a partitioned counter. - */ -public interface CounterCell extends Cell -{ - static final CounterContext contextManager = CounterContext.instance(); - - public long timestampOfLastDelete(); - - public long total(); - - public boolean hasLegacyShards(); - - public Cell markLocalToBeCleared(); - - CounterCell localCopy(CFMetaData metadata, AbstractAllocator allocator); - - CounterCell localCopy(CFMetaData metaData, MemtableAllocator allocator, OpOrder.Group opGroup); -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/CounterMutation.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/CounterMutation.java b/src/java/org/apache/cassandra/db/CounterMutation.java index 58717b4..f87c66c 100644 --- a/src/java/org/apache/cassandra/db/CounterMutation.java +++ b/src/java/org/apache/cassandra/db/CounterMutation.java @@ -19,7 +19,6 @@ package org.apache.cassandra.db; import java.io.DataInput; import java.io.IOException; -import java.nio.ByteBuffer; import java.util.*; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; @@ -27,12 +26,15 @@ import java.util.concurrent.locks.Lock; import com.google.common.base.Function; import com.google.common.base.Objects; import com.google.common.collect.Iterables; +import com.google.common.collect.Iterators; +import com.google.common.collect.PeekingIterator; import com.google.common.util.concurrent.Striped; import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.db.composites.CellName; +import org.apache.cassandra.db.rows.*; +import org.apache.cassandra.db.filter.*; +import org.apache.cassandra.db.partitions.*; import org.apache.cassandra.db.context.CounterContext; -import org.apache.cassandra.db.filter.NamesQueryFilter; import org.apache.cassandra.exceptions.WriteTimeoutException; import 
org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.io.util.DataOutputPlus; @@ -41,6 +43,7 @@ import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.service.CacheService; import org.apache.cassandra.tracing.Tracing; import org.apache.cassandra.utils.*; +import org.apache.cassandra.utils.concurrent.OpOrder; public class CounterMutation implements IMutation { @@ -67,9 +70,9 @@ public class CounterMutation implements IMutation return mutation.getColumnFamilyIds(); } - public Collection<ColumnFamily> getColumnFamilies() + public Collection<PartitionUpdate> getPartitionUpdates() { - return mutation.getColumnFamilies(); + return mutation.getPartitionUpdates(); } public Mutation getMutation() @@ -77,7 +80,7 @@ public class CounterMutation implements IMutation return mutation; } - public ByteBuffer key() + public DecoratedKey key() { return mutation.key(); } @@ -111,19 +114,14 @@ public class CounterMutation implements IMutation Mutation result = new Mutation(getKeyspaceName(), key()); Keyspace keyspace = Keyspace.open(getKeyspaceName()); - int count = 0; - for (ColumnFamily cf : getColumnFamilies()) - count += cf.getColumnCount(); - - List<Lock> locks = new ArrayList<>(count); - Tracing.trace("Acquiring {} counter locks", count); + List<Lock> locks = new ArrayList<>(); + Tracing.trace("Acquiring counter locks"); try { grabCounterLocks(keyspace, locks); - for (ColumnFamily cf : getColumnFamilies()) - result.add(processModifications(cf)); + for (PartitionUpdate upd : getPartitionUpdates()) + result.add(processModifications(upd)); result.apply(); - updateCounterCache(result, keyspace); return result; } finally @@ -160,141 +158,144 @@ public class CounterMutation implements IMutation */ private Iterable<Object> getCounterLockKeys() { - return Iterables.concat(Iterables.transform(getColumnFamilies(), new Function<ColumnFamily, Iterable<Object>>() + return Iterables.concat(Iterables.transform(getPartitionUpdates(), new Function<PartitionUpdate, Iterable<Object>>() { - public Iterable<Object> apply(final ColumnFamily cf) + public Iterable<Object> apply(final PartitionUpdate update) { - return Iterables.transform(cf, new Function<Cell, Object>() + return Iterables.concat(Iterables.transform(update, new Function<Row, Iterable<Object>>() { - public Object apply(Cell cell) + public Iterable<Object> apply(final Row row) { - return Objects.hashCode(cf.id(), key(), cell.name()); + return Iterables.concat(Iterables.transform(row, new Function<Cell, Object>() + { + public Object apply(final Cell cell) + { + return Objects.hashCode(update.metadata().cfId, key(), row.clustering(), cell.column(), cell.path()); + } + })); } - }); + })); } })); } - // Replaces all the CounterUpdateCell-s with updated regular CounterCell-s - private ColumnFamily processModifications(ColumnFamily changesCF) + private PartitionUpdate processModifications(PartitionUpdate changes) { - ColumnFamilyStore cfs = Keyspace.open(getKeyspaceName()).getColumnFamilyStore(changesCF.id()); + ColumnFamilyStore cfs = Keyspace.open(getKeyspaceName()).getColumnFamilyStore(changes.metadata().cfId); - ColumnFamily resultCF = changesCF.cloneMeShallow(); + List<PartitionUpdate.CounterMark> marks = changes.collectCounterMarks(); - List<CounterUpdateCell> counterUpdateCells = new ArrayList<>(changesCF.getColumnCount()); - for (Cell cell : changesCF) + if (CacheService.instance.counterCache.getCapacity() != 0) { - if (cell instanceof CounterUpdateCell) - counterUpdateCells.add((CounterUpdateCell)cell); - else - 
resultCF.addColumn(cell); + Tracing.trace("Fetching {} counter values from cache", marks.size()); + updateWithCurrentValuesFromCache(marks, cfs); + if (marks.isEmpty()) + return changes; } - if (counterUpdateCells.isEmpty()) - return resultCF; // only DELETEs - - ClockAndCount[] currentValues = getCurrentValues(counterUpdateCells, cfs); - for (int i = 0; i < counterUpdateCells.size(); i++) - { - ClockAndCount currentValue = currentValues[i]; - CounterUpdateCell update = counterUpdateCells.get(i); + Tracing.trace("Reading {} counter values from the CF", marks.size()); + updateWithCurrentValuesFromCFS(marks, cfs); - long clock = currentValue.clock + 1L; - long count = currentValue.count + update.delta(); + // What remains are new counters + for (PartitionUpdate.CounterMark mark : marks) + updateWithCurrentValue(mark, ClockAndCount.BLANK, cfs); - resultCF.addColumn(new BufferCounterCell(update.name(), - CounterContext.instance().createGlobal(CounterId.getLocalId(), clock, count), - update.timestamp())); - } - - return resultCF; + return changes; } - // Attempt to load the current values(s) from cache. If that fails, read the rest from the cfs. - private ClockAndCount[] getCurrentValues(List<CounterUpdateCell> counterUpdateCells, ColumnFamilyStore cfs) + private void updateWithCurrentValue(PartitionUpdate.CounterMark mark, ClockAndCount currentValue, ColumnFamilyStore cfs) { - ClockAndCount[] currentValues = new ClockAndCount[counterUpdateCells.size()]; - int remaining = counterUpdateCells.size(); + long clock = currentValue.clock + 1L; + long count = currentValue.count + CounterContext.instance().total(mark.value()); - if (CacheService.instance.counterCache.getCapacity() != 0) - { - Tracing.trace("Fetching {} counter values from cache", counterUpdateCells.size()); - remaining = getCurrentValuesFromCache(counterUpdateCells, cfs, currentValues); - if (remaining == 0) - return currentValues; - } - - Tracing.trace("Reading {} counter values from the CF", remaining); - getCurrentValuesFromCFS(counterUpdateCells, cfs, currentValues); + mark.setValue(CounterContext.instance().createGlobal(CounterId.getLocalId(), clock, count)); - return currentValues; + // Cache the newly updated value + cfs.putCachedCounter(key().getKey(), mark.clustering(), mark.column(), mark.path(), ClockAndCount.create(clock, count)); } // Returns the count of cache misses. - private int getCurrentValuesFromCache(List<CounterUpdateCell> counterUpdateCells, - ColumnFamilyStore cfs, - ClockAndCount[] currentValues) + private void updateWithCurrentValuesFromCache(List<PartitionUpdate.CounterMark> marks, ColumnFamilyStore cfs) { - int cacheMisses = 0; - for (int i = 0; i < counterUpdateCells.size(); i++) + Iterator<PartitionUpdate.CounterMark> iter = marks.iterator(); + while (iter.hasNext()) { - ClockAndCount cached = cfs.getCachedCounter(key(), counterUpdateCells.get(i).name()); + PartitionUpdate.CounterMark mark = iter.next(); + ClockAndCount cached = cfs.getCachedCounter(key().getKey(), mark.clustering(), mark.column(), mark.path()); if (cached != null) - currentValues[i] = cached; - else - cacheMisses++; + { + updateWithCurrentValue(mark, cached, cfs); + iter.remove(); + } } - return cacheMisses; } // Reads the missing current values from the CFS.
- private void getCurrentValuesFromCFS(List<CounterUpdateCell> counterUpdateCells, - ColumnFamilyStore cfs, - ClockAndCount[] currentValues) + private void updateWithCurrentValuesFromCFS(List<PartitionUpdate.CounterMark> marks, ColumnFamilyStore cfs) { - SortedSet<CellName> names = new TreeSet<>(cfs.metadata.comparator); - for (int i = 0; i < currentValues.length; i++) - if (currentValues[i] == null) - names.add(counterUpdateCells.get(i).name()); - - ReadCommand cmd = new SliceByNamesReadCommand(getKeyspaceName(), key(), cfs.metadata.cfName, Long.MIN_VALUE, new NamesQueryFilter(names)); - Row row = cmd.getRow(cfs.keyspace); - ColumnFamily cf = row == null ? null : row.cf; + ColumnFilter.Builder builder = ColumnFilter.selectionBuilder(); + NavigableSet<Clustering> names = new TreeSet<>(cfs.metadata.comparator); + for (PartitionUpdate.CounterMark mark : marks) + { + names.add(mark.clustering().takeAlias()); + if (mark.path() == null) + builder.add(mark.column()); + else + builder.select(mark.column(), mark.path()); + } - for (int i = 0; i < currentValues.length; i++) + int nowInSec = FBUtilities.nowInSeconds(); + ClusteringIndexNamesFilter filter = new ClusteringIndexNamesFilter(names, false); + SinglePartitionReadCommand cmd = SinglePartitionReadCommand.create(cfs.metadata, nowInSec, key(), builder.build(), filter); + PeekingIterator<PartitionUpdate.CounterMark> markIter = Iterators.peekingIterator(marks.iterator()); + try (OpOrder.Group op = cfs.readOrdering.start(); RowIterator partition = UnfilteredRowIterators.filter(cmd.queryMemtableAndDisk(cfs, op), nowInSec)) { - if (currentValues[i] != null) - continue; + updateForRow(markIter, partition.staticRow(), cfs); - Cell cell = cf == null ? null : cf.getColumn(counterUpdateCells.get(i).name()); - if (cell == null || !cell.isLive()) // absent or a tombstone. - currentValues[i] = ClockAndCount.BLANK; - else - currentValues[i] = CounterContext.instance().getLocalClockAndCount(cell.value()); + while (partition.hasNext()) + { + if (!markIter.hasNext()) + return; + + updateForRow(markIter, partition.next(), cfs); + } } } - private void updateCounterCache(Mutation applied, Keyspace keyspace) + private int compare(Clustering c1, Clustering c2, ColumnFamilyStore cfs) { - if (CacheService.instance.counterCache.getCapacity() == 0) - return; + if (c1 == Clustering.STATIC_CLUSTERING) + return c2 == Clustering.STATIC_CLUSTERING ? 0 : -1; + if (c2 == Clustering.STATIC_CLUSTERING) + return 1; - for (ColumnFamily cf : applied.getColumnFamilies()) - { - ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cf.id()); - for (Cell cell : cf) - if (cell instanceof CounterCell) - cfs.putCachedCounter(key(), cell.name(), CounterContext.instance().getLocalClockAndCount(cell.value())); - } + return cfs.getComparator().compare(c1, c2); } - public void addAll(IMutation m) + private void updateForRow(PeekingIterator<PartitionUpdate.CounterMark> markIter, Row row, ColumnFamilyStore cfs) { - if (!(m instanceof CounterMutation)) - throw new IllegalArgumentException(); - CounterMutation cm = (CounterMutation)m; - mutation.addAll(cm.mutation); + int cmp = 0; + // If the mark is before the row, we have no value for this mark, just consume it + while (markIter.hasNext() && (cmp = compare(markIter.peek().clustering(), row.clustering(), cfs)) < 0) + markIter.next(); + + if (!markIter.hasNext()) + return; + + while (cmp == 0) + { + PartitionUpdate.CounterMark mark = markIter.next(); + Cell cell = mark.path() == null ? 
row.getCell(mark.column()) : row.getCell(mark.column(), mark.path()); + if (cell != null) + { + updateWithCurrentValue(mark, CounterContext.instance().getLocalClockAndCount(cell.value()), cfs); + markIter.remove(); + } + if (!markIter.hasNext()) + return; + + cmp = compare(markIter.peek().clustering(), row.clustering(), cfs); + } } public long getTimeout() http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/CounterUpdateCell.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/CounterUpdateCell.java b/src/java/org/apache/cassandra/db/CounterUpdateCell.java deleted file mode 100644 index 58ac365..0000000 --- a/src/java/org/apache/cassandra/db/CounterUpdateCell.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.db; - -/** - * A counter update while it hasn't been applied yet by the leader replica. - * - * Contains a single counter update. When applied by the leader replica, this - * is transformed to a relevant CounterCell. This Cell is a temporary data - * structure that should never be stored inside a memtable or an sstable. - */ -public interface CounterUpdateCell extends Cell -{ - public long delta(); -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/DataRange.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/DataRange.java b/src/java/org/apache/cassandra/db/DataRange.java index 1e6f8c8..909d6ed 100644 --- a/src/java/org/apache/cassandra/db/DataRange.java +++ b/src/java/org/apache/cassandra/db/DataRange.java @@ -6,7 +6,6 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. 
You may obtain a copy of the License at - * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software @@ -17,302 +16,391 @@ */ package org.apache.cassandra.db; +import java.io.DataInput; +import java.io.IOException; import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Comparator; -import java.util.List; -import com.google.common.base.Objects; import org.apache.cassandra.config.CFMetaData; -import org.apache.cassandra.db.columniterator.IdentityQueryFilter; -import org.apache.cassandra.db.composites.Composite; -import org.apache.cassandra.db.composites.Composites; +import org.apache.cassandra.config.ColumnDefinition; import org.apache.cassandra.db.filter.*; +import org.apache.cassandra.db.marshal.AbstractType; +import org.apache.cassandra.db.marshal.CompositeType; import org.apache.cassandra.dht.*; +import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.net.MessagingService; /** - * Groups key range and column filter for range queries. - * - * The main "trick" of this class is that the column filter can only - * be obtained by providing the row key on which the column filter will - * be applied (which we always know before actually querying the columns). - * - * This allows the paging DataRange to return a filter for most rows but a - * potentially different ones for the starting and stopping key. Could - * allow more fancy stuff in the future too, like column filters that - * depend on the actual key value :) + * Groups both the range of partitions to query, and the clustering index filter to + * apply for each partition (for a (partition) range query). + * <p> + * The main "trick" is that the clustering index filter can only be obtained by + * providing the partition key on which the filter will be applied. This is + * necessary when paging range queries, as we might need a different filter + * for the starting key than for other keys (because the previous page we had + * queried may have ended in the middle of a partition). */ public class DataRange { - protected final AbstractBounds<RowPosition> keyRange; - protected IDiskAtomFilter columnFilter; - protected final boolean selectFullRow; + public static final Serializer serializer = new Serializer(); - public DataRange(AbstractBounds<RowPosition> range, IDiskAtomFilter columnFilter) + private final AbstractBounds<PartitionPosition> keyRange; + protected final ClusteringIndexFilter clusteringIndexFilter; + + /** + * Creates a {@code DataRange} given a range of partition keys and a clustering index filter. The + * returned {@code DataRange} will return the same filter for all keys. + * + * @param range the range over partition keys to use. + * @param clusteringIndexFilter the clustering index filter to use. + */ + public DataRange(AbstractBounds<PartitionPosition> range, ClusteringIndexFilter clusteringIndexFilter) { this.keyRange = range; - this.columnFilter = columnFilter; - this.selectFullRow = columnFilter instanceof SliceQueryFilter - ? isFullRowSlice((SliceQueryFilter)columnFilter) - : false; + this.clusteringIndexFilter = clusteringIndexFilter; } - public static boolean isFullRowSlice(SliceQueryFilter filter) + /** + * Creates a {@code DataRange} to query all data (over the whole ring). + * + * @param partitioner the partitioner in use for the table. + * + * @return the newly created {@code DataRange}.
+ */ + public static DataRange allData(IPartitioner partitioner) { - return filter.slices.length == 1 - && filter.start().isEmpty() - && filter.finish().isEmpty() - && filter.count == Integer.MAX_VALUE; + return forTokenRange(new Range<Token>(partitioner.getMinimumToken(), partitioner.getMinimumToken())); } - public static DataRange allData(IPartitioner partitioner) + /** + * Creates a {@code DataRange} to query all rows over the provided token range. + * + * @param tokenRange the (partition key) token range to query. + * + * @return the newly created {@code DataRange}. + */ + public static DataRange forTokenRange(Range<Token> tokenRange) { - return forTokenRange(new Range<Token>(partitioner.getMinimumToken(), partitioner.getMinimumToken())); + return forKeyRange(Range.makeRowRange(tokenRange)); } - public static DataRange forTokenRange(Range<Token> keyRange) + /** + * Creates a {@code DataRange} to query all rows over the provided key range. + * + * @param keyRange the (partition key) range to query. + * + * @return the newly created {@code DataRange}. + */ + public static DataRange forKeyRange(Range<PartitionPosition> keyRange) { - return forKeyRange(Range.makeRowRange(keyRange)); + return new DataRange(keyRange, new ClusteringIndexSliceFilter(Slices.ALL, false)); } - public static DataRange forKeyRange(Range<RowPosition> keyRange) + /** + * Creates a {@code DataRange} to query all partitions of the ring using the provided + * clustering index filter. + * + * @param partitioner the partitioner in use for the table queried. + * @param filter the clustering index filter to use. + * + * @return the newly created {@code DataRange}. + */ + public static DataRange allData(IPartitioner partitioner, ClusteringIndexFilter filter) { - return new DataRange(keyRange, new IdentityQueryFilter()); + return new DataRange(Range.makeRowRange(new Range<Token>(partitioner.getMinimumToken(), partitioner.getMinimumToken())), filter); } - public AbstractBounds<RowPosition> keyRange() + /** + * The range of partition keys queried by this {@code DataRange}. + * + * @return the range of partition keys queried by this {@code DataRange}. + */ + public AbstractBounds<PartitionPosition> keyRange() { return keyRange; } - public RowPosition startKey() + /** + * The start of the partition key range queried by this {@code DataRange}. + * + * @return the start of the partition key range queried by this {@code DataRange}. + */ + public PartitionPosition startKey() { return keyRange.left; } - public RowPosition stopKey() + /** + * The end of the partition key range queried by this {@code DataRange}. + * + * @return the end of the partition key range queried by this {@code DataRange}. + */ + public PartitionPosition stopKey() { return keyRange.right; } /** - * Returns true if tombstoned partitions should not be included in results or count towards the limit. - * See CASSANDRA-8490 for more details on why this is needed (and done this way). - * */ - public boolean ignoredTombstonedPartitions() + * Whether the underlying clustering index filter is a names filter or not. + * + * @return Whether the underlying clustering index filter is a names filter or not. + */ + public boolean isNamesQuery() { - if (!(columnFilter instanceof SliceQueryFilter)) - return false; - - return ((SliceQueryFilter) columnFilter).compositesToGroup == SliceQueryFilter.IGNORE_TOMBSTONED_PARTITIONS; + return clusteringIndexFilter instanceof ClusteringIndexNamesFilter; } - // Whether the bounds of this DataRange actually wraps around.
+ /** + * Whether the range queried by this {@code DataRange} actually wraps around. + * + * @return whether the range queried by this {@code DataRange} actually wraps around. + */ public boolean isWrapAround() { - // On range can ever wrap + // Only range can ever wrap return keyRange instanceof Range && ((Range<?>)keyRange).isWrapAround(); } - public boolean contains(RowPosition pos) + /** + * Whether the provided ring position is covered by this {@code DataRange}. + * + * @return whether the provided ring position is covered by this {@code DataRange}. + */ + public boolean contains(PartitionPosition pos) { return keyRange.contains(pos); } - public int getLiveCount(ColumnFamily data, long now) + /** + * Whether this {@code DataRange} queries everything (has no restriction, neither on the + * partitions queried, nor within the queried partitions). + * + * @return Whether this {@code DataRange} queries everything. + */ + public boolean isUnrestricted() { - return columnFilter instanceof SliceQueryFilter - ? ((SliceQueryFilter)columnFilter).lastCounted() - : columnFilter.getLiveCount(data, now); + return startKey().isMinimum() && stopKey().isMinimum() && clusteringIndexFilter.selectsAllPartition(); } - public boolean selectsFullRowFor(ByteBuffer rowKey) + /** + * The clustering index filter to use for the provided key. + * <p> + * This may or may not be the same filter for all keys (that is, paging ranges + * use a different filter for their start key). + * + * @param key the partition key for which we want the clustering index filter. + * + * @return the clustering filter to use for {@code key}. + */ + public ClusteringIndexFilter clusteringIndexFilter(DecoratedKey key) { - return selectFullRow; + return clusteringIndexFilter; } /** - * Returns a column filter that should be used for a particular row key. Note that in the case of paging, - * slice starts and ends may change depending on the row key. + * Returns a new {@code DataRange} for use when paging {@code this} range. + * + * @param range the range of partition keys to query. + * @param comparator the comparator for the table queried. + * @param lastReturned the clustering for the last result returned by the previous page, i.e. the result we want to start our new page + * from. This last returned result <b>must</b> correspond to the left bound of {@code range} (in other words, {@code range.left} must be the + * partition key for that {@code lastReturned} result). + * @param inclusive whether or not we want to include the {@code lastReturned} in the newly returned page of results. + * + * @return a new {@code DataRange} suitable for paging {@code this} range given the {@code lastReturned} result of the previous page. */ - public IDiskAtomFilter columnFilter(ByteBuffer rowKey) + public DataRange forPaging(AbstractBounds<PartitionPosition> range, ClusteringComparator comparator, Clustering lastReturned, boolean inclusive) { - return columnFilter; + return new Paging(range, clusteringIndexFilter, comparator, lastReturned, inclusive); } /** - * Sets a new limit on the number of (grouped) cells to fetch. This is currently only used when the query limit applies - * to CQL3 rows. + * Returns a new {@code DataRange} equivalent to {@code this} one but restricted to the provided sub-range. + * + * @param range the sub-range to use for the newly returned data range. Note that this assumes that {@code range} is a proper + * sub-range of the initial range but doesn't validate it.
You should make sure to only provide sub-ranges, however, or this + * might throw off the paging case (see Paging.forSubRange()). + * + * @return a new {@code DataRange} using {@code range} as partition key range and the clustering index filter from {@code this}. */ - public void updateColumnsLimit(int count) + public DataRange forSubRange(AbstractBounds<PartitionPosition> range) { - columnFilter.updateColumnsLimit(count); + return new DataRange(range, clusteringIndexFilter); } - public static class Paging extends DataRange + public String toString(CFMetaData metadata) { - // The slice of columns that we want to fetch for each row, ignoring page start/end issues. - private final SliceQueryFilter sliceFilter; + return String.format("range=%s pfilter=%s", keyRange.getString(metadata.getKeyValidator()), clusteringIndexFilter.toString(metadata)); } - private final CFMetaData cfm; + public String toCQLString(CFMetaData metadata) + { + if (isUnrestricted()) + return "UNRESTRICTED"; - private final Comparator<Composite> comparator; + StringBuilder sb = new StringBuilder(); - // used to restrict the start of the slice for the first partition in the range - private final Composite firstPartitionColumnStart; + boolean needAnd = false; + if (!startKey().isMinimum()) + { + appendClause(startKey(), sb, metadata, true, keyRange.isStartInclusive()); + needAnd = true; + } + if (!stopKey().isMinimum()) + { + if (needAnd) + sb.append(" AND "); + appendClause(stopKey(), sb, metadata, false, keyRange.isEndInclusive()); + needAnd = true; + } - // used to restrict the end of the slice for the last partition in the range - private final Composite lastPartitionColumnFinish; + String filterString = clusteringIndexFilter.toCQLString(metadata); + if (!filterString.isEmpty()) + sb.append(needAnd ? " AND " : "").append(filterString); - // tracks the last key that we updated the filter for to avoid duplicating work - private ByteBuffer lastKeyFilterWasUpdatedFor; + return sb.toString(); + } - private Paging(AbstractBounds<RowPosition> range, SliceQueryFilter filter, Composite firstPartitionColumnStart, - Composite lastPartitionColumnFinish, CFMetaData cfm, Comparator<Composite> comparator) + private void appendClause(PartitionPosition pos, StringBuilder sb, CFMetaData metadata, boolean isStart, boolean isInclusive) + { + sb.append("token("); + sb.append(ColumnDefinition.toCQLString(metadata.partitionKeyColumns())); + sb.append(") ").append(getOperator(isStart, isInclusive)).append(" "); + if (pos instanceof DecoratedKey) + { + sb.append("token("); + appendKeyString(sb, metadata.getKeyValidator(), ((DecoratedKey)pos).getKey()); + sb.append(")"); + } + else + { + sb.append(((Token.KeyBound)pos).getToken()); + } + } + + private static String getOperator(boolean isStart, boolean isInclusive) + { + return isStart + ? (isInclusive ? ">=" : ">") + : (isInclusive ? "<=" : "<"); + } + + // TODO: this is reused in SinglePartitionReadCommand but this should not really be here. Ideally + // we need a more "native" handling of composite partition keys. + public static void appendKeyString(StringBuilder sb, AbstractType<?> type, ByteBuffer key) + { + if (type instanceof CompositeType) + { + CompositeType ct = (CompositeType)type; + ByteBuffer[] values = ct.split(key); + for (int i = 0; i < ct.types.size(); i++) + sb.append(i == 0 ? "" : ", ").append(ct.types.get(i).getString(values[i])); + } + else + { + sb.append(type.getString(key)); + } + } + + /** + * Specialized {@code DataRange} used for the paging case.
+ * <p> + * It uses the clustering of the last result of the previous page to restrict the filter on the + * first queried partition (the one for that last result) so it only fetches results that follow that + * last result. In other words, this makes sure paging resumes where we left off. + */ + private static class Paging extends DataRange + { + private final ClusteringComparator comparator; + private final Clustering lastReturned; + private final boolean inclusive; + + private Paging(AbstractBounds<PartitionPosition> range, + ClusteringIndexFilter filter, + ClusteringComparator comparator, + Clustering lastReturned, + boolean inclusive) { super(range, filter); // When using a paging range, we don't allow wrapped ranges, as it's unclear how to handle them properly. - // This is ok for now since we only need this in range slice queries, and the range are "unwrapped" in that case. + // This is ok for now since we only need this in range queries, and the ranges are "unwrapped" in that case. assert !(range instanceof Range) || !((Range<?>)range).isWrapAround() || range.right.isMinimum() : range; + assert lastReturned != null; - this.sliceFilter = filter; - this.cfm = cfm; this.comparator = comparator; - this.firstPartitionColumnStart = firstPartitionColumnStart; - this.lastPartitionColumnFinish = lastPartitionColumnFinish; - this.lastKeyFilterWasUpdatedFor = null; + this.lastReturned = lastReturned; + this.inclusive = inclusive; } - public Paging(AbstractBounds<RowPosition> range, SliceQueryFilter filter, Composite columnStart, Composite columnFinish, CFMetaData cfm) + @Override + public ClusteringIndexFilter clusteringIndexFilter(DecoratedKey key) { - this(range, filter, columnStart, columnFinish, cfm, filter.isReversed() ? cfm.comparator.reverseComparator() : cfm.comparator); + return key.equals(startKey()) + ? clusteringIndexFilter.forPaging(comparator, lastReturned, inclusive) + : clusteringIndexFilter; } @Override - public boolean selectsFullRowFor(ByteBuffer rowKey) + public DataRange forSubRange(AbstractBounds<PartitionPosition> range) { - // If we initial filter is not the full filter, don't bother - if (!selectFullRow) - return false; - - if (!equals(startKey(), rowKey) && !equals(stopKey(), rowKey)) - return true; + // This is called for a sub-range of the initial range. So either it's the beginning of the initial range, + // and we need to preserve lastReturned, or it's not, and we don't care about it anymore. + return range.left.equals(keyRange().left) + ?
new Paging(range, clusteringIndexFilter, comparator, lastReturned, inclusive) + : new DataRange(range, clusteringIndexFilter); + } - return isFullRowSlice((SliceQueryFilter)columnFilter(rowKey)); + @Override + public boolean isUnrestricted() + { + return false; } + } - private boolean equals(RowPosition pos, ByteBuffer rowKey) + public static class Serializer + { + public void serialize(DataRange range, DataOutputPlus out, int version, CFMetaData metadata) throws IOException { - return pos instanceof DecoratedKey && ((DecoratedKey)pos).getKey().equals(rowKey); + AbstractBounds.rowPositionSerializer.serialize(range.keyRange, out, version); + ClusteringIndexFilter.serializer.serialize(range.clusteringIndexFilter, out, version); + boolean isPaging = range instanceof Paging; + out.writeBoolean(isPaging); + if (isPaging) + { + Clustering.serializer.serialize(((Paging)range).lastReturned, out, version, metadata.comparator.subtypes()); + out.writeBoolean(((Paging)range).inclusive); + } } - @Override - public IDiskAtomFilter columnFilter(ByteBuffer rowKey) + public DataRange deserialize(DataInput in, int version, CFMetaData metadata) throws IOException { - /* - * We have that ugly hack that for slice queries, when we ask for - * the live count, we reach into the query filter to get the last - * counter number of columns to avoid recounting. - * Maybe we should just remove that hack, but in the meantime, we - * need to keep a reference the last returned filter. - */ - if (equals(startKey(), rowKey) || equals(stopKey(), rowKey)) + AbstractBounds<PartitionPosition> range = AbstractBounds.rowPositionSerializer.deserialize(in, MessagingService.globalPartitioner(), version); + ClusteringIndexFilter filter = ClusteringIndexFilter.serializer.deserialize(in, version, metadata); + if (in.readBoolean()) { - if (!rowKey.equals(lastKeyFilterWasUpdatedFor)) - { - this.lastKeyFilterWasUpdatedFor = rowKey; - columnFilter = sliceFilter.withUpdatedSlices(slicesForKey(rowKey)); - } + ClusteringComparator comparator = metadata.comparator; + Clustering lastReturned = Clustering.serializer.deserialize(in, version, comparator.subtypes()); + boolean inclusive = in.readBoolean(); + return new Paging(range, filter, comparator, lastReturned, inclusive); } else { - columnFilter = sliceFilter; + return new DataRange(range, filter); } - - return columnFilter; } - /** Returns true if the slice includes static columns, false otherwise. */ - private boolean sliceIncludesStatics(ColumnSlice slice, boolean reversed, CFMetaData cfm) + public long serializedSize(DataRange range, int version, CFMetaData metadata) { - return cfm.hasStaticColumns() && - slice.includes(reversed ? cfm.comparator.reverseComparator() : cfm.comparator, cfm.comparator.staticPrefix().end()); - } + long size = AbstractBounds.rowPositionSerializer.serializedSize(range.keyRange, version) + + ClusteringIndexFilter.serializer.serializedSize(range.clusteringIndexFilter, version) + + 1; // isPaging boolean - private ColumnSlice[] slicesForKey(ByteBuffer key) - { - // Also note that firstPartitionColumnStart and lastPartitionColumnFinish, when used, only "restrict" the filter slices, - // it doesn't expand on them. As such, we can ignore the case where they are empty and we do - // as it screw up with the logic below (see #6592) - Composite newStart = equals(startKey(), key) && !firstPartitionColumnStart.isEmpty() ? firstPartitionColumnStart : null; - Composite newFinish = equals(stopKey(), key) && !lastPartitionColumnFinish.isEmpty() ? 
lastPartitionColumnFinish : null; - - // in the common case, we'll have the same number of slices - List<ColumnSlice> newSlices = new ArrayList<>(sliceFilter.slices.length); - - // Check our slices to see if any fall before the page start (in which case they can be removed) or - // if they contain the page start (in which case they should start from the page start). However, if the - // slices would include static columns, we need to ensure they are also fetched, and so a separate - // slice for the static columns may be required. - // Note that if the query is reversed, we can't handle statics by simply adding a separate slice here, so - // the reversed case is handled by SliceFromReadCommand instead. See CASSANDRA-8502 for more details. - for (ColumnSlice slice : sliceFilter.slices) + if (range instanceof Paging) { - if (newStart != null) - { - if (slice.isBefore(comparator, newStart)) - { - if (!sliceFilter.reversed && sliceIncludesStatics(slice, false, cfm)) - newSlices.add(new ColumnSlice(Composites.EMPTY, cfm.comparator.staticPrefix().end())); - - continue; - } - - if (slice.includes(comparator, newStart)) - { - if (!sliceFilter.reversed && sliceIncludesStatics(slice, false, cfm) && !newStart.equals(Composites.EMPTY)) - newSlices.add(new ColumnSlice(Composites.EMPTY, cfm.comparator.staticPrefix().end())); - - slice = new ColumnSlice(newStart, slice.finish); - } - - // once we see a slice that either includes the page start or is after it, we can stop checking - // against the page start (because the slices are ordered) - newStart = null; - } - - assert newStart == null; - if (newFinish != null && !slice.isBefore(comparator, newFinish)) - { - if (slice.includes(comparator, newFinish)) - newSlices.add(new ColumnSlice(slice.start, newFinish)); - // In any case, we're done - break; - } - newSlices.add(slice); + size += Clustering.serializer.serializedSize(((Paging)range).lastReturned, version, metadata.comparator.subtypes(), TypeSizes.NATIVE); + size += 1; // inclusive boolean } - - return newSlices.toArray(new ColumnSlice[newSlices.size()]); - } - - @Override - public void updateColumnsLimit(int count) - { - columnFilter.updateColumnsLimit(count); - sliceFilter.updateColumnsLimit(count); - } - - @Override - public String toString() - { - return Objects.toStringHelper(this) - .add("keyRange", keyRange) - .add("sliceFilter", sliceFilter) - .add("columnFilter", columnFilter) - .add("firstPartitionColumnStart", firstPartitionColumnStart == null ? "null" : cfm.comparator.getString(firstPartitionColumnStart)) - .add("lastPartitionColumnFinish", lastPartitionColumnFinish == null ? "null" : cfm.comparator.getString(lastPartitionColumnFinish)) - .toString(); + return size; } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/DecoratedKey.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/DecoratedKey.java b/src/java/org/apache/cassandra/db/DecoratedKey.java index cc62a15..92d6414 100644 --- a/src/java/org/apache/cassandra/db/DecoratedKey.java +++ b/src/java/org/apache/cassandra/db/DecoratedKey.java @@ -36,7 +36,7 @@ import org.apache.cassandra.utils.IFilter.FilterKey; * if this matters, you can subclass RP to use a stronger hash, or use a non-lossy tokenization scheme (as in the * OrderPreservingPartitioner classes). 
*/ -public abstract class DecoratedKey implements RowPosition, FilterKey +public abstract class DecoratedKey implements PartitionPosition, FilterKey { public static final Comparator<DecoratedKey> comparator = new Comparator<DecoratedKey>() { @@ -72,7 +72,7 @@ public abstract class DecoratedKey implements RowPosition, FilterKey return ByteBufferUtil.compareUnsigned(getKey(), other.getKey()) == 0; // we compare faster than BB.equals for array backed BB } - public int compareTo(RowPosition pos) + public int compareTo(PartitionPosition pos) { if (this == pos) return 0; @@ -86,7 +86,7 @@ public abstract class DecoratedKey implements RowPosition, FilterKey return cmp == 0 ? ByteBufferUtil.compareUnsigned(getKey(), otherKey.getKey()) : cmp; } - public static int compareTo(IPartitioner partitioner, ByteBuffer key, RowPosition position) + public static int compareTo(IPartitioner partitioner, ByteBuffer key, PartitionPosition position) { // delegate to Token.KeyBound if needed if (!(position instanceof DecoratedKey)) @@ -113,9 +113,9 @@ public abstract class DecoratedKey implements RowPosition, FilterKey return false; } - public RowPosition.Kind kind() + public PartitionPosition.Kind kind() { - return RowPosition.Kind.ROW_KEY; + return PartitionPosition.Kind.ROW_KEY; } @Override http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/DeletedCell.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/DeletedCell.java b/src/java/org/apache/cassandra/db/DeletedCell.java deleted file mode 100644 index 998c409..0000000 --- a/src/java/org/apache/cassandra/db/DeletedCell.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.cassandra.db; - -import org.apache.cassandra.config.CFMetaData; -import org.apache.cassandra.utils.concurrent.OpOrder; -import org.apache.cassandra.utils.memory.AbstractAllocator; -import org.apache.cassandra.utils.memory.MemtableAllocator; - -public interface DeletedCell extends Cell -{ - DeletedCell localCopy(CFMetaData metadata, AbstractAllocator allocator); - - DeletedCell localCopy(CFMetaData metaData, MemtableAllocator allocator, OpOrder.Group opGroup); -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/DeletionInfo.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/DeletionInfo.java b/src/java/org/apache/cassandra/db/DeletionInfo.java index 048324a..e54d6b1 100644 --- a/src/java/org/apache/cassandra/db/DeletionInfo.java +++ b/src/java/org/apache/cassandra/db/DeletionInfo.java @@ -17,40 +17,32 @@ */ package org.apache.cassandra.db; -import java.io.DataInput; -import java.io.IOException; -import java.security.MessageDigest; -import java.util.Comparator; import java.util.Iterator; import com.google.common.base.Objects; import com.google.common.collect.Iterators; import org.apache.cassandra.cache.IMeasurableMemory; -import org.apache.cassandra.db.composites.CType; -import org.apache.cassandra.db.composites.Composite; -import org.apache.cassandra.io.IVersionedSerializer; -import org.apache.cassandra.io.util.DataOutputPlus; -import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.db.rows.Cell; import org.apache.cassandra.utils.ObjectSizes; import org.apache.cassandra.utils.memory.AbstractAllocator; /** - * A combination of a top-level (or row) tombstone and range tombstones describing the deletions - * within a {@link ColumnFamily} (or row). + * A combination of a top-level (partition) tombstone and range tombstones describing the deletions + * within a partition. */ public class DeletionInfo implements IMeasurableMemory { private static final long EMPTY_SIZE = ObjectSizes.measure(new DeletionInfo(0, 0)); /** - * This represents a deletion of the entire row. We can't represent this within the RangeTombstoneList, so it's - * kept separately. This also slightly optimizes the common case of a full row deletion. + * This represents a deletion of the entire partition. We can't represent this within the RangeTombstoneList, so it's + * kept separately. This also slightly optimizes the common case of a full partition deletion. */ - private DeletionTime topLevel; + private DeletionTime partitionDeletion; /** - * A list of range tombstones within the row. This is left as null if there are no range tombstones + * A list of range tombstones within the partition. This is left as null if there are no range tombstones * (to save an allocation (since it's a common case). */ private RangeTombstoneList ranges; @@ -65,28 +57,23 @@ public class DeletionInfo implements IMeasurableMemory { // Pre-1.1 node may return MIN_VALUE for non-deleted container, but the new default is MAX_VALUE // (see CASSANDRA-3872) - this(new DeletionTime(markedForDeleteAt, localDeletionTime == Integer.MIN_VALUE ? Integer.MAX_VALUE : localDeletionTime)); + this(new SimpleDeletionTime(markedForDeleteAt, localDeletionTime == Integer.MIN_VALUE ? 
Integer.MAX_VALUE : localDeletionTime)); } - public DeletionInfo(DeletionTime topLevel) + public DeletionInfo(DeletionTime partitionDeletion) { - this(topLevel, null); + this(partitionDeletion, null); } - public DeletionInfo(Composite start, Composite end, Comparator<Composite> comparator, long markedForDeleteAt, int localDeletionTime) + public DeletionInfo(ClusteringComparator comparator, Slice slice, long markedForDeleteAt, int localDeletionTime) { this(DeletionTime.LIVE, new RangeTombstoneList(comparator, 1)); - ranges.add(start, end, markedForDeleteAt, localDeletionTime); + ranges.add(slice.start(), slice.end(), markedForDeleteAt, localDeletionTime); } - public DeletionInfo(RangeTombstone rangeTombstone, Comparator<Composite> comparator) + public DeletionInfo(DeletionTime partitionDeletion, RangeTombstoneList ranges) { - this(rangeTombstone.min, rangeTombstone.max, comparator, rangeTombstone.data.markedForDeleteAt, rangeTombstone.data.localDeletionTime); - } - - private DeletionInfo(DeletionTime topLevel, RangeTombstoneList ranges) - { - this.topLevel = topLevel; + this.partitionDeletion = partitionDeletion.takeAlias(); this.ranges = ranges; } @@ -100,17 +87,16 @@ public class DeletionInfo implements IMeasurableMemory public DeletionInfo copy() { - return new DeletionInfo(topLevel, ranges == null ? null : ranges.copy()); + return new DeletionInfo(partitionDeletion, ranges == null ? null : ranges.copy()); } public DeletionInfo copy(AbstractAllocator allocator) { - RangeTombstoneList rangesCopy = null; if (ranges != null) rangesCopy = ranges.copy(allocator); - return new DeletionInfo(topLevel, rangesCopy); + return new DeletionInfo(partitionDeletion, rangesCopy); } /** @@ -118,106 +104,31 @@ public class DeletionInfo implements IMeasurableMemory */ public boolean isLive() { - return topLevel.isLive() && (ranges == null || ranges.isEmpty()); + return partitionDeletion.isLive() && (ranges == null || ranges.isEmpty()); } /** - * Return whether a given cell is deleted by the container having this deletion info. + * Return whether a given cell is deleted by this deletion info. * + * @param clustering the clustering for the cell to check. * @param cell the cell to check. * @return true if the cell is deleted, false otherwise */ - public boolean isDeleted(Cell cell) + private boolean isDeleted(Clustering clustering, Cell cell) { - // We do rely on this test: if topLevel.markedForDeleteAt is MIN_VALUE, we should not - // consider the column deleted even if timestamp=MIN_VALUE, otherwise this break QueryFilter.isRelevant + // If we're live, don't consider anything deleted, even if the cell ends up having Long.MIN_VALUE as its timestamp + // (which shouldn't happen in practice, but it would be invalid to consider it deleted if it does). if (isLive()) return false; - if (cell.timestamp() <= topLevel.markedForDeleteAt) + if (cell.livenessInfo().timestamp() <= partitionDeletion.markedForDeleteAt()) return true; // No matter what the counter cell's timestamp is, a tombstone always takes precedence. See CASSANDRA-7346.
- */ - public InOrderTester inOrderTester(boolean reversed) - { - return new InOrderTester(reversed); - } - - /** - * Purge every tombstones that are older than {@code gcbefore}. - * - * @param gcBefore timestamp (in seconds) before which tombstones should be purged - */ - public void purge(int gcBefore) - { - topLevel = topLevel.localDeletionTime < gcBefore ? DeletionTime.LIVE : topLevel; - - if (ranges != null) - { - ranges.purge(gcBefore); - if (ranges.isEmpty()) - ranges = null; - } - } - - /** - * Evaluates difference between this deletion info and superset for read repair - * - * @return the difference between the two, or LIVE if no difference - */ - public DeletionInfo diff(DeletionInfo superset) - { - RangeTombstoneList rangeDiff = superset.ranges == null || superset.ranges.isEmpty() - ? null - : ranges == null ? superset.ranges : ranges.diff(superset.ranges); - - return topLevel.markedForDeleteAt != superset.topLevel.markedForDeleteAt || rangeDiff != null - ? new DeletionInfo(superset.topLevel, rangeDiff) - : DeletionInfo.live(); - } - - - /** - * Digests deletion info. Used to trigger read repair on mismatch. - */ - public void updateDigest(MessageDigest digest) - { - if (topLevel.markedForDeleteAt != Long.MIN_VALUE) - digest.update(ByteBufferUtil.bytes(topLevel.markedForDeleteAt)); - - if (ranges != null) - ranges.updateDigest(digest); - } - - /** - * Returns true if {@code purge} would remove the top-level tombstone or any of the range - * tombstones, false otherwise. - * @param gcBefore timestamp (in seconds) before which tombstones should be purged - */ - public boolean hasPurgeableTombstones(int gcBefore) - { - if (topLevel.localDeletionTime < gcBefore) + if (!partitionDeletion.isLive() && cell.isCounterCell()) return true; - return ranges != null && ranges.hasPurgeableTombstones(gcBefore); + return ranges != null && ranges.isDeleted(clustering, cell); } /** @@ -227,11 +138,11 @@ public class DeletionInfo implements IMeasurableMemory */ public void add(DeletionTime newInfo) { - if (topLevel.markedForDeleteAt < newInfo.markedForDeleteAt) - topLevel = newInfo; + if (newInfo.supersedes(partitionDeletion)) + partitionDeletion = newInfo; } - public void add(RangeTombstone tombstone, Comparator<Composite> comparator) + public void add(RangeTombstone tombstone, ClusteringComparator comparator) { if (ranges == null) ranges = new RangeTombstoneList(comparator, 1); @@ -248,7 +159,7 @@ public class DeletionInfo implements IMeasurableMemory */ public DeletionInfo add(DeletionInfo newInfo) { - add(newInfo.topLevel); + add(newInfo.partitionDeletion); if (ranges == null) ranges = newInfo.ranges == null ? null : newInfo.ranges.copy(); @@ -258,53 +169,30 @@ public class DeletionInfo implements IMeasurableMemory return this; } - /** - * Returns the minimum timestamp in any of the range tombstones or the top-level tombstone. - */ - public long minTimestamp() - { - return ranges == null - ? topLevel.markedForDeleteAt - : Math.min(topLevel.markedForDeleteAt, ranges.minMarkedAt()); - } - - /** - * Returns the maximum timestamp in any of the range tombstones or the top-level tombstone. - */ - public long maxTimestamp() - { - return ranges == null - ? topLevel.markedForDeleteAt - : Math.max(topLevel.markedForDeleteAt, ranges.maxMarkedAt()); - } - - /** - * Returns the top-level (or "row") tombstone. 
- */ - public DeletionTime getTopLevelDeletion() + public DeletionTime getPartitionDeletion() { - return topLevel; + return partitionDeletion; } // Use sparingly, not the most efficient thing - public Iterator<RangeTombstone> rangeIterator() + public Iterator<RangeTombstone> rangeIterator(boolean reversed) { - return ranges == null ? Iterators.<RangeTombstone>emptyIterator() : ranges.iterator(); + return ranges == null ? Iterators.<RangeTombstone>emptyIterator() : ranges.iterator(reversed); } - public Iterator<RangeTombstone> rangeIterator(Composite start, Composite finish) + public Iterator<RangeTombstone> rangeIterator(Slice slice, boolean reversed) { - return ranges == null ? Iterators.<RangeTombstone>emptyIterator() : ranges.iterator(start, finish); + return ranges == null ? Iterators.<RangeTombstone>emptyIterator() : ranges.iterator(slice, reversed); } - public RangeTombstone rangeCovering(Composite name) + public RangeTombstone rangeCovering(Clustering name) { return ranges == null ? null : ranges.search(name); } public int dataSize() { - int size = TypeSizes.NATIVE.sizeof(topLevel.markedForDeleteAt); + int size = TypeSizes.NATIVE.sizeof(partitionDeletion.markedForDeleteAt()); return size + (ranges == null ? 0 : ranges.dataSize()); } @@ -323,45 +211,43 @@ public class DeletionInfo implements IMeasurableMemory */ public boolean mayModify(DeletionInfo delInfo) { - return topLevel.compareTo(delInfo.topLevel) > 0 || hasRanges(); + return partitionDeletion.compareTo(delInfo.partitionDeletion) > 0 || hasRanges(); } @Override public String toString() { if (ranges == null || ranges.isEmpty()) - return String.format("{%s}", topLevel); + return String.format("{%s}", partitionDeletion); else - return String.format("{%s, ranges=%s}", topLevel, rangesAsString()); + return String.format("{%s, ranges=%s}", partitionDeletion, rangesAsString()); } private String rangesAsString() { assert !ranges.isEmpty(); StringBuilder sb = new StringBuilder(); - CType type = (CType)ranges.comparator(); - assert type != null; - Iterator<RangeTombstone> iter = rangeIterator(); + ClusteringComparator cc = ranges.comparator(); + Iterator<RangeTombstone> iter = rangeIterator(false); while (iter.hasNext()) { RangeTombstone i = iter.next(); - sb.append("["); - sb.append(type.getString(i.min)).append("-"); - sb.append(type.getString(i.max)).append(", "); - sb.append(i.data); - sb.append("]"); + sb.append(i.deletedSlice().toString(cc)); + sb.append("@"); + sb.append(i.deletionTime()); } return sb.toString(); } // Updates all the timestamp of the deletion contained in this DeletionInfo to be {@code timestamp}. 
     // Updates all the timestamps of the deletions contained in this DeletionInfo to be {@code timestamp}.
-    public void updateAllTimestamp(long timestamp)
+    public DeletionInfo updateAllTimestamp(long timestamp)
     {
-        if (topLevel.markedForDeleteAt != Long.MIN_VALUE)
-            topLevel = new DeletionTime(timestamp, topLevel.localDeletionTime);
+        if (partitionDeletion.markedForDeleteAt() != Long.MIN_VALUE)
+            partitionDeletion = new SimpleDeletionTime(timestamp, partitionDeletion.localDeletionTime());
 
         if (ranges != null)
             ranges.updateAllTimestamp(timestamp);
+        return this;
     }
 
     @Override
@@ -370,100 +256,18 @@ public class DeletionInfo implements IMeasurableMemory
         if(!(o instanceof DeletionInfo))
             return false;
         DeletionInfo that = (DeletionInfo)o;
-        return topLevel.equals(that.topLevel) && Objects.equal(ranges, that.ranges);
+        return partitionDeletion.equals(that.partitionDeletion) && Objects.equal(ranges, that.ranges);
     }
 
     @Override
     public final int hashCode()
     {
-        return Objects.hashCode(topLevel, ranges);
+        return Objects.hashCode(partitionDeletion, ranges);
     }
 
     @Override
     public long unsharedHeapSize()
     {
-        return EMPTY_SIZE + topLevel.unsharedHeapSize() + (ranges == null ? 0 : ranges.unsharedHeapSize());
-    }
-
-    public static class Serializer implements IVersionedSerializer<DeletionInfo>
-    {
-        private final RangeTombstoneList.Serializer rtlSerializer;
-
-        public Serializer(CType type)
-        {
-            this.rtlSerializer = new RangeTombstoneList.Serializer(type);
-        }
-
-        public void serialize(DeletionInfo info, DataOutputPlus out, int version) throws IOException
-        {
-            DeletionTime.serializer.serialize(info.topLevel, out);
-            rtlSerializer.serialize(info.ranges, out, version);
-        }
-
-        public DeletionInfo deserialize(DataInput in, int version) throws IOException
-        {
-            DeletionTime topLevel = DeletionTime.serializer.deserialize(in);
-            RangeTombstoneList ranges = rtlSerializer.deserialize(in, version);
-            return new DeletionInfo(topLevel, ranges);
-        }
-
-        public long serializedSize(DeletionInfo info, TypeSizes typeSizes, int version)
-        {
-            long size = DeletionTime.serializer.serializedSize(info.topLevel, typeSizes);
-            return size + rtlSerializer.serializedSize(info.ranges, typeSizes, version);
-        }
-
-        public long serializedSize(DeletionInfo info, int version)
-        {
-            return serializedSize(info, TypeSizes.NATIVE, version);
-        }
-    }
-
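The versioned DeletionInfo.Serializer deleted above wrote the partition-level tombstone first and the range tombstone list second; it is presumably replaced by the row-oriented serialization introduced elsewhere in this commit. For reference, the layout it produced (field order taken from the code above; DeletionTime's own serializer appears below in DeletionTime.java):

    // Wire layout written by the removed DeletionInfo.Serializer:
    //   int  localDeletionTime   -- via DeletionTime.serializer
    //   long markedForDeleteAt   -- (Integer.MAX_VALUE, Long.MIN_VALUE) encodes LIVE
    //   ...  RangeTombstoneList  -- version-dependent, via RangeTombstoneList.Serializer
    // A fully-live DeletionInfo therefore still costs 12 bytes up front,
    // matching the new DeletionTime.dataSize() of 12 below.
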
-    /**
-     * This object allow testing whether a given column (name/timestamp) is deleted
-     * or not by this DeletionInfo, assuming that the columns given to this
-     * object are passed in forward or reversed comparator sorted order.
-     *
-     * This is more efficient that calling DeletionInfo.isDeleted() repeatedly
-     * in that case.
-     */
-    public class InOrderTester
-    {
-        /*
-         * Note that because because range tombstone are added to this DeletionInfo while we iterate,
-         * `ranges` may be null initially and we need to wait for the first range to create the tester (once
-         * created the test will pick up new tombstones however). We are guaranteed that a range tombstone
-         * will be added *before* we test any column that it may delete, so this is ok.
-         */
-        private RangeTombstoneList.InOrderTester tester;
-        private final boolean reversed;
-
-        private InOrderTester(boolean reversed)
-        {
-            this.reversed = reversed;
-        }
-
-        public boolean isDeleted(Cell cell)
-        {
-            if (cell.timestamp() <= topLevel.markedForDeleteAt)
-                return true;
-
-            // No matter what the counter cell's timestamp is, a tombstone always takes precedence. See CASSANDRA-7346.
-            if (!topLevel.isLive() && cell instanceof CounterCell)
-                return true;
-
-            /*
-             * We don't optimize the reversed case for now because RangeTombstoneList
-             * is always in forward sorted order.
-             */
-            if (reversed)
-                return DeletionInfo.this.isDeleted(cell);
-
-            // Maybe create the tester if we hadn't yet and we now have some ranges (see above).
-            if (tester == null && ranges != null)
-                tester = ranges.inOrderTester();
-
-            return tester != null && tester.isDeleted(cell);
-        }
+        return EMPTY_SIZE + partitionDeletion.unsharedHeapSize() + (ranges == null ? 0 : ranges.unsharedHeapSize());
     }
 }
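
Both the removed InOrderTester above and the new isDeleted() logic earlier in this file encode the same special case from CASSANDRA-7346: a partition-level tombstone shadows a counter cell no matter how the timestamps compare. A condensed sketch of the check (simplified signature, not the patch's API):

    // Simplified: a cell is shadowed if the deletion's timestamp reaches it,
    // or unconditionally when it is a counter cell under any non-live deletion.
    static boolean isShadowed(long cellTimestamp, boolean isCounterCell,
                              long deletionTimestamp, boolean deletionIsLive)
    {
        if (cellTimestamp <= deletionTimestamp)
            return true; // normal timestamp rule
        // CASSANDRA-7346: a tombstone always takes precedence over a counter
        // cell, regardless of the counter cell's timestamp.
        return isCounterCell && !deletionIsLive;
    }
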
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/DeletionTime.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DeletionTime.java b/src/java/org/apache/cassandra/db/DeletionTime.java
index 7165417..f070778 100644
--- a/src/java/org/apache/cassandra/db/DeletionTime.java
+++ b/src/java/org/apache/cassandra/db/DeletionTime.java
@@ -19,58 +19,56 @@ package org.apache.cassandra.db;
 
 import java.io.DataInput;
 import java.io.IOException;
+import java.security.MessageDigest;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Objects;
 
 import org.apache.cassandra.cache.IMeasurableMemory;
 import org.apache.cassandra.io.ISerializer;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.ObjectSizes;
-import org.codehaus.jackson.annotate.JsonIgnore;
 
 /**
- * A top-level (row) tombstone.
+ * Information on deletion of a storage engine object.
  */
-public class DeletionTime implements Comparable<DeletionTime>, IMeasurableMemory
+public abstract class DeletionTime implements Comparable<DeletionTime>, IMeasurableMemory, Aliasable<DeletionTime>
 {
-    private static final long EMPTY_SIZE = ObjectSizes.measure(new DeletionTime(0, 0));
+    private static final long EMPTY_SIZE = ObjectSizes.measure(new SimpleDeletionTime(0, 0));
 
     /**
      * A special DeletionTime that signifies that there is no top-level (row) tombstone.
      */
-    public static final DeletionTime LIVE = new DeletionTime(Long.MIN_VALUE, Integer.MAX_VALUE);
+    public static final DeletionTime LIVE = new SimpleDeletionTime(Long.MIN_VALUE, Integer.MAX_VALUE);
+
+    public static final Serializer serializer = new Serializer();
 
     /**
      * A timestamp (typically in microseconds since the unix epoch, although this is not enforced) after which
      * data should be considered deleted. If set to Long.MIN_VALUE, this implies that the data has not been marked
      * for deletion at all.
      */
-    public final long markedForDeleteAt;
+    public abstract long markedForDeleteAt();
 
     /**
      * The local server timestamp, in seconds since the unix epoch, at which this tombstone was created. This is
      * only used for purposes of purging the tombstone after gc_grace_seconds have elapsed.
      */
-    public final int localDeletionTime;
-
-    public static final Serializer serializer = new Serializer();
-
-    @VisibleForTesting
-    public DeletionTime(long markedForDeleteAt, int localDeletionTime)
-    {
-        this.markedForDeleteAt = markedForDeleteAt;
-        this.localDeletionTime = localDeletionTime;
-    }
+    public abstract int localDeletionTime();
 
     /**
      * Returns whether this DeletionTime is live, that is deletes no columns.
     */
-    @JsonIgnore
     public boolean isLive()
     {
-        return markedForDeleteAt == Long.MIN_VALUE && localDeletionTime == Integer.MAX_VALUE;
+        return markedForDeleteAt() == Long.MIN_VALUE && localDeletionTime() == Integer.MAX_VALUE;
+    }
+
+    public void digest(MessageDigest digest)
+    {
+        FBUtilities.updateWithLong(digest, markedForDeleteAt());
+        FBUtilities.updateWithInt(digest, localDeletionTime());
     }
 
     @Override
@@ -79,48 +77,58 @@ public class DeletionTime implements Comparable<DeletionTime>, IMeasurableMemory
         if(!(o instanceof DeletionTime))
             return false;
         DeletionTime that = (DeletionTime)o;
-        return markedForDeleteAt == that.markedForDeleteAt && localDeletionTime == that.localDeletionTime;
+        return markedForDeleteAt() == that.markedForDeleteAt() && localDeletionTime() == that.localDeletionTime();
     }
 
     @Override
     public final int hashCode()
     {
-        return Objects.hashCode(markedForDeleteAt, localDeletionTime);
+        return Objects.hashCode(markedForDeleteAt(), localDeletionTime());
     }
 
     @Override
     public String toString()
     {
-        return String.format("deletedAt=%d, localDeletion=%d", markedForDeleteAt, localDeletionTime);
+        return String.format("deletedAt=%d, localDeletion=%d", markedForDeleteAt(), localDeletionTime());
     }
 
     public int compareTo(DeletionTime dt)
     {
-        if (markedForDeleteAt < dt.markedForDeleteAt)
+        if (markedForDeleteAt() < dt.markedForDeleteAt())
             return -1;
-        else if (markedForDeleteAt > dt.markedForDeleteAt)
+        else if (markedForDeleteAt() > dt.markedForDeleteAt())
             return 1;
-        else if (localDeletionTime < dt.localDeletionTime)
+        else if (localDeletionTime() < dt.localDeletionTime())
             return -1;
-        else if (localDeletionTime > dt.localDeletionTime)
+        else if (localDeletionTime() > dt.localDeletionTime())
             return 1;
         else
             return 0;
     }
 
-    public boolean isGcAble(int gcBefore)
+    public boolean supersedes(DeletionTime dt)
     {
-        return localDeletionTime < gcBefore;
+        return markedForDeleteAt() > dt.markedForDeleteAt() || (markedForDeleteAt() == dt.markedForDeleteAt() && localDeletionTime() > dt.localDeletionTime());
     }
 
-    public boolean isDeleted(OnDiskAtom atom)
+    public boolean isPurgeable(long maxPurgeableTimestamp, int gcBefore)
     {
-        return atom.timestamp() <= markedForDeleteAt;
+        return markedForDeleteAt() < maxPurgeableTimestamp && localDeletionTime() < gcBefore;
     }
 
-    public boolean supersedes(DeletionTime dt)
+    public boolean deletes(LivenessInfo info)
+    {
+        return deletes(info.timestamp());
+    }
+
+    public boolean deletes(long timestamp)
+    {
+        return timestamp <= markedForDeleteAt();
+    }
+
+    public int dataSize()
     {
-        return this.markedForDeleteAt > dt.markedForDeleteAt;
+        return 12;
     }
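
A quick worked example of the predicates above (SimpleDeletionTime is the concrete subclass this commit introduces elsewhere; its (markedForDeleteAt, localDeletionTime) constructor is inferred from its uses in this diff, and the values are illustrative):

    DeletionTime dt = new SimpleDeletionTime(10L, 1000); // deletes anything written at timestamp <= 10

    boolean a = dt.deletes(10L);                                 // true: timestamp <= markedForDeleteAt
    boolean b = dt.deletes(11L);                                 // false
    boolean c = dt.supersedes(new SimpleDeletionTime(10L, 999)); // true: equal timestamps, newer localDeletionTime
    boolean d = dt.isPurgeable(11L, 1001);                       // true: both bounds strictly exceeded
    boolean e = dt.isPurgeable(10L, 1001);                       // false: markedForDeleteAt must be < maxPurgeableTimestamp
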
 
     public long unsharedHeapSize()
@@ -132,8 +140,8 @@ public class DeletionTime implements Comparable<DeletionTime>, IMeasurableMemory
     {
         public void serialize(DeletionTime delTime, DataOutputPlus out) throws IOException
         {
-            out.writeInt(delTime.localDeletionTime);
-            out.writeLong(delTime.markedForDeleteAt);
+            out.writeInt(delTime.localDeletionTime());
+            out.writeLong(delTime.markedForDeleteAt());
         }
 
         public DeletionTime deserialize(DataInput in) throws IOException
         {
             int ldt = in.readInt();
             long mfda = in.readLong();
             return mfda == Long.MIN_VALUE && ldt == Integer.MAX_VALUE
                  ? LIVE
-                 : new DeletionTime(mfda, ldt);
+                 : new SimpleDeletionTime(mfda, ldt);
         }
 
         public void skip(DataInput in) throws IOException
@@ -152,8 +160,8 @@ public class DeletionTime implements Comparable<DeletionTime>, IMeasurableMemory
 
         public long serializedSize(DeletionTime delTime, TypeSizes typeSizes)
         {
-            return typeSizes.sizeof(delTime.localDeletionTime)
-                 + typeSizes.sizeof(delTime.markedForDeleteAt);
+            return typeSizes.sizeof(delTime.localDeletionTime())
+                 + typeSizes.sizeof(delTime.markedForDeleteAt());
         }
     }
 }
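
The serializer above is deliberately version-independent: a DeletionTime is always an int plus a long, and the LIVE sentinel round-trips back to the shared constant rather than a fresh object. A hypothetical round trip (the 'out' and 'in' streams are assumed DataOutputPlus/DataInput instances wrapping the same bytes):

    DeletionTime.serializer.serialize(DeletionTime.LIVE, out); // writes Integer.MAX_VALUE, then Long.MIN_VALUE
    DeletionTime roundTripped = DeletionTime.serializer.deserialize(in);
    assert roundTripped == DeletionTime.LIVE; // same instance, not merely equal
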
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/DeletionTimeArray.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DeletionTimeArray.java b/src/java/org/apache/cassandra/db/DeletionTimeArray.java
new file mode 100644
index 0000000..77eb953
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/DeletionTimeArray.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.util.Arrays;
+
+import org.apache.cassandra.utils.ObjectSizes;
+
+/**
+ * Utility class to store an array of deletion times a bit efficiently.
+ */
+public class DeletionTimeArray
+{
+    private long[] markedForDeleteAts;
+    private int[] delTimes;
+
+    public DeletionTimeArray(int initialCapacity)
+    {
+        this.markedForDeleteAts = new long[initialCapacity];
+        this.delTimes = new int[initialCapacity];
+        clear();
+    }
+
+    public void clear(int i)
+    {
+        markedForDeleteAts[i] = Long.MIN_VALUE;
+        delTimes[i] = Integer.MAX_VALUE;
+    }
+
+    public void set(int i, DeletionTime dt)
+    {
+        this.markedForDeleteAts[i] = dt.markedForDeleteAt();
+        this.delTimes[i] = dt.localDeletionTime();
+    }
+
+    public int size()
+    {
+        return markedForDeleteAts.length;
+    }
+
+    public void resize(int newSize)
+    {
+        int prevSize = size();
+
+        markedForDeleteAts = Arrays.copyOf(markedForDeleteAts, newSize);
+        delTimes = Arrays.copyOf(delTimes, newSize);
+
+        Arrays.fill(markedForDeleteAts, prevSize, newSize, Long.MIN_VALUE);
+        Arrays.fill(delTimes, prevSize, newSize, Integer.MAX_VALUE);
+    }
+
+    public boolean supersedes(int i, DeletionTime dt)
+    {
+        return markedForDeleteAts[i] > dt.markedForDeleteAt();
+    }
+
+    public boolean supersedes(int i, int j)
+    {
+        return markedForDeleteAts[i] > markedForDeleteAts[j];
+    }
+
+    public void swap(int i, int j)
+    {
+        long m = markedForDeleteAts[j];
+        int l = delTimes[j];
+
+        move(i, j);
+
+        markedForDeleteAts[i] = m;
+        delTimes[i] = l;
+    }
+
+    public void move(int i, int j)
+    {
+        markedForDeleteAts[j] = markedForDeleteAts[i];
+        delTimes[j] = delTimes[i];
+    }
+
+    public boolean isLive(int i)
+    {
+        return markedForDeleteAts[i] > Long.MIN_VALUE;
+    }
+
+    public void clear()
+    {
+        Arrays.fill(markedForDeleteAts, Long.MIN_VALUE);
+        Arrays.fill(delTimes, Integer.MAX_VALUE);
+    }
+
+    public int dataSize()
+    {
+        return 12 * markedForDeleteAts.length;
+    }
+
+    public long unsharedHeapSize()
+    {
+        return ObjectSizes.sizeOfArray(markedForDeleteAts)
+             + ObjectSizes.sizeOfArray(delTimes);
+    }
+
+    public void copy(DeletionTimeArray other)
+    {
+        assert size() == other.size();
+        for (int i = 0; i < size(); i++)
+        {
+            markedForDeleteAts[i] = other.markedForDeleteAts[i];
+            delTimes[i] = other.delTimes[i];
+        }
+    }
+
+    public static class Cursor extends DeletionTime
+    {
+        private DeletionTimeArray array;
+        private int i;
+
+        public Cursor setTo(DeletionTimeArray array, int i)
+        {
+            this.array = array;
+            this.i = i;
+            return this;
+        }
+
+        public long markedForDeleteAt()
+        {
+            return array.markedForDeleteAts[i];
+        }
+
+        public int localDeletionTime()
+        {
+            return array.delTimes[i];
+        }
+
+        public DeletionTime takeAlias()
+        {
+            return new SimpleDeletionTime(markedForDeleteAt(), localDeletionTime());
+        }
+    }
+}
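
DeletionTimeArray packs deletion times into two parallel primitive arrays (12 bytes of data per slot, per dataSize()) instead of allocating one object per entry, and the nested Cursor is a mutable flyweight that exposes slot i through the DeletionTime interface without allocating. A hypothetical scan using it ('candidate' and process() are illustrative, not part of the patch):

    DeletionTimeArray array = new DeletionTimeArray(16);
    DeletionTimeArray.Cursor cursor = new DeletionTimeArray.Cursor();

    for (int i = 0; i < array.size(); i++)
    {
        // supersedes(i, dt) compares markedForDeleteAt only, avoiding any allocation
        if (array.supersedes(i, candidate))
            process(cursor.setTo(array, i)); // flyweight view of slot i
    }

    // takeAlias() copies the slot out when a caller needs a value that outlives the cursor
    DeletionTime stable = cursor.setTo(array, 0).takeAlias();
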
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/ExpiringCell.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ExpiringCell.java b/src/java/org/apache/cassandra/db/ExpiringCell.java
deleted file mode 100644
index 5fc0f94..0000000
--- a/src/java/org/apache/cassandra/db/ExpiringCell.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db;
-
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.utils.concurrent.OpOrder;
-import org.apache.cassandra.utils.memory.AbstractAllocator;
-import org.apache.cassandra.utils.memory.MemtableAllocator;
-
-/**
- * Alternative to Cell that have an expiring time.
- * ExpiringCell is immutable (as Cell is).
- *
- * Note that ExpiringCell does not override Cell.getMarkedForDeleteAt,
- * which means that it's in the somewhat unintuitive position of being deleted (after its expiration)
- * without having a time-at-which-it-became-deleted.  (Because ttl is a server-side measurement,
- * we can't mix it with the timestamp field, which is client-supplied and whose resolution we
- * can't assume anything about.)
- */
-public interface ExpiringCell extends Cell
-{
-    public static final int MAX_TTL = 20 * 365 * 24 * 60 * 60; // 20 years in seconds
-
-    public int getTimeToLive();
-
-    ExpiringCell localCopy(CFMetaData metadata, AbstractAllocator allocator);
-
-    ExpiringCell localCopy(CFMetaData metaData, MemtableAllocator allocator, OpOrder.Group opGroup);
-}
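
ExpiringCell disappears here, apparently because expiration stops being a distinct cell type in the rewritten engine and TTL'd cells carry their expiration directly. A hedged sketch of the arithmetic the interface implied, mirroring the removed MAX_TTL constant and the usual TTL-to-local-deletion-time conversion (the helper name is illustrative):

    // Illustrative only: a TTL is capped at MAX_TTL and converts to a local
    // expiration point at write time (server time in seconds + TTL).
    static final int MAX_TTL = 20 * 365 * 24 * 60 * 60; // 20 years in seconds

    static int localExpirationTime(int nowInSec, int ttlInSec)
    {
        assert ttlInSec > 0 && ttlInSec <= MAX_TTL : "ttl out of range";
        return nowInSec + ttlInSec; // once this passes, the cell reads as deleted
    }
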
