http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/SinglePartitionSliceCommand.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/SinglePartitionSliceCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionSliceCommand.java new file mode 100644 index 0000000..65b4e3f --- /dev/null +++ b/src/java/org/apache/cassandra/db/SinglePartitionSliceCommand.java @@ -0,0 +1,232 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db; + +import java.util.*; + +import com.google.common.collect.Iterables; + +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.db.filter.ClusteringIndexSliceFilter; +import org.apache.cassandra.db.rows.*; +import org.apache.cassandra.db.filter.*; +import org.apache.cassandra.db.partitions.Partition; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.metrics.ColumnFamilyMetrics.Sampler; +import org.apache.cassandra.thrift.ThriftResultsMerger; +import org.apache.cassandra.tracing.Tracing; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.memory.HeapAllocator; + +/** + * General interface for storage-engine read queries. + */ +public class SinglePartitionSliceCommand extends SinglePartitionReadCommand<ClusteringIndexSliceFilter> +{ + public SinglePartitionSliceCommand(boolean isDigest, + boolean isForThrift, + CFMetaData metadata, + int nowInSec, + ColumnFilter columnFilter, + RowFilter rowFilter, + DataLimits limits, + DecoratedKey partitionKey, + ClusteringIndexSliceFilter clusteringIndexFilter) + { + super(isDigest, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, partitionKey, clusteringIndexFilter); + } + + public SinglePartitionSliceCommand(CFMetaData metadata, + int nowInSec, + ColumnFilter columnFilter, + RowFilter rowFilter, + DataLimits limits, + DecoratedKey partitionKey, + ClusteringIndexSliceFilter clusteringIndexFilter) + { + this(false, false, metadata, nowInSec, columnFilter, rowFilter, limits, partitionKey, clusteringIndexFilter); + } + + /** + * Creates a new single partition slice command for the provided single slice. + * + * @param metadata the table to query. + * @param nowInSec the time in seconds to use are "now" for this query. + * @param key the partition key for the partition to query. + * @param slice the slice of rows to query. + * + * @return a newly created read command that queries {@code slice} in {@code key}. The returned query will + * query every columns for the table (without limit or row filtering) and be in forward order. 
+ */ + public static SinglePartitionReadCommand create(CFMetaData metadata, int nowInSec, DecoratedKey key, Slice slice) + { + return create(metadata, nowInSec, key, Slices.with(metadata.comparator, slice)); + } + + /** + * Creates a new single partition slice command for the provided slices. + * + * @param metadata the table to query. + * @param nowInSec the time in seconds to use are "now" for this query. + * @param key the partition key for the partition to query. + * @param slices the slices of rows to query. + * + * @return a newly created read command that queries the {@code slices} in {@code key}. The returned query will + * query every columns for the table (without limit or row filtering) and be in forward order. + */ + public static SinglePartitionReadCommand create(CFMetaData metadata, int nowInSec, DecoratedKey key, Slices slices) + { + ClusteringIndexSliceFilter filter = new ClusteringIndexSliceFilter(slices, false); + return new SinglePartitionSliceCommand(metadata, nowInSec, ColumnFilter.all(metadata), RowFilter.NONE, DataLimits.NONE, key, filter); + } + + public SinglePartitionSliceCommand copy() + { + return new SinglePartitionSliceCommand(isDigestQuery(), isForThrift(), metadata(), nowInSec(), columnFilter(), rowFilter(), limits(), partitionKey(), clusteringIndexFilter()); + } + + protected UnfilteredRowIterator queryMemtableAndDiskInternal(ColumnFamilyStore cfs, boolean copyOnHeap) + { + Tracing.trace("Acquiring sstable references"); + ColumnFamilyStore.ViewFragment view = cfs.select(cfs.viewFilter(partitionKey())); + + List<UnfilteredRowIterator> iterators = new ArrayList<>(Iterables.size(view.memtables) + view.sstables.size()); + ClusteringIndexSliceFilter filter = clusteringIndexFilter(); + + try + { + for (Memtable memtable : view.memtables) + { + Partition partition = memtable.getPartition(partitionKey()); + if (partition == null) + continue; + + @SuppressWarnings("resource") // 'iter' is added to iterators which is closed on exception, or through the closing of the final merged iterator + UnfilteredRowIterator iter = filter.getUnfilteredRowIterator(columnFilter(), partition); + @SuppressWarnings("resource") // same as above + UnfilteredRowIterator maybeCopied = copyOnHeap ? UnfilteredRowIterators.cloningIterator(iter, HeapAllocator.instance) : iter; + iterators.add(isForThrift() ? ThriftResultsMerger.maybeWrap(maybeCopied, nowInSec()) : maybeCopied); + } + + /* + * We can't eliminate full sstables based on the timestamp of what we've already read like + * in collectTimeOrderedData, but we still want to eliminate sstable whose maxTimestamp < mostRecentTombstone + * we've read. We still rely on the sstable ordering by maxTimestamp since if + * maxTimestamp_s1 > maxTimestamp_s0, + * we're guaranteed that s1 cannot have a row tombstone such that + * timestamp(tombstone) > maxTimestamp_s0 + * since we necessarily have + * timestamp(tombstone) <= maxTimestamp_s1 + * In other words, iterating in maxTimestamp order allow to do our mostRecentPartitionTombstone elimination + * in one pass, and minimize the number of sstables for which we read a partition tombstone. 
+ */ + int sstablesIterated = 0; + Collections.sort(view.sstables, SSTableReader.maxTimestampComparator); + List<SSTableReader> skippedSSTables = null; + long mostRecentPartitionTombstone = Long.MIN_VALUE; + long minTimestamp = Long.MAX_VALUE; + int nonIntersectingSSTables = 0; + + for (SSTableReader sstable : view.sstables) + { + minTimestamp = Math.min(minTimestamp, sstable.getMinTimestamp()); + // if we've already seen a partition tombstone with a timestamp greater + // than the most recent update to this sstable, we can skip it + if (sstable.getMaxTimestamp() < mostRecentPartitionTombstone) + break; + + if (!filter.shouldInclude(sstable)) + { + nonIntersectingSSTables++; + // sstable contains no tombstone if maxLocalDeletionTime == Integer.MAX_VALUE, so we can safely skip those entirely + if (sstable.getSSTableMetadata().maxLocalDeletionTime != Integer.MAX_VALUE) + { + if (skippedSSTables == null) + skippedSSTables = new ArrayList<>(); + skippedSSTables.add(sstable); + } + continue; + } + + sstable.incrementReadCount(); + @SuppressWarnings("resource") // 'iter' is added to iterators which is closed on exception, or through the closing of the final merged iterator + UnfilteredRowIterator iter = filter.filter(sstable.iterator(partitionKey(), columnFilter(), filter.isReversed(), isForThrift())); + iterators.add(isForThrift() ? ThriftResultsMerger.maybeWrap(iter, nowInSec()) : iter); + mostRecentPartitionTombstone = Math.max(mostRecentPartitionTombstone, iter.partitionLevelDeletion().markedForDeleteAt()); + sstablesIterated++; + } + + int includedDueToTombstones = 0; + // Check for partition tombstones in the skipped sstables + if (skippedSSTables != null) + { + for (SSTableReader sstable : skippedSSTables) + { + if (sstable.getMaxTimestamp() <= minTimestamp) + continue; + + sstable.incrementReadCount(); + @SuppressWarnings("resource") // 'iter' is either closed right away, or added to iterators which is close on exception, or through the closing of the final merged iterator + UnfilteredRowIterator iter = filter.filter(sstable.iterator(partitionKey(), columnFilter(), filter.isReversed(), isForThrift())); + if (iter.partitionLevelDeletion().markedForDeleteAt() > minTimestamp) + { + iterators.add(iter); + includedDueToTombstones++; + sstablesIterated++; + } + else + { + iter.close(); + } + } + } + if (Tracing.isTracing()) + Tracing.trace("Skipped {}/{} non-slice-intersecting sstables, included {} due to tombstones", + nonIntersectingSSTables, view.sstables.size(), includedDueToTombstones); + + cfs.metric.updateSSTableIterated(sstablesIterated); + + if (iterators.isEmpty()) + return UnfilteredRowIterators.emptyIterator(cfs.metadata, partitionKey(), filter.isReversed()); + + Tracing.trace("Merging data from memtables and {} sstables", sstablesIterated); + + @SuppressWarnings("resource") // Closed through the closing of the result of that method. + UnfilteredRowIterator merged = UnfilteredRowIterators.merge(iterators, nowInSec()); + if (!merged.isEmpty()) + { + DecoratedKey key = merged.partitionKey(); + cfs.metric.samplers.get(Sampler.READS).addSample(key.getKey(), key.hashCode(), 1); + } + + return merged; + } + catch (RuntimeException | Error e) + { + try + { + FBUtilities.closeAll(iterators); + } + catch (Exception suppressed) + { + e.addSuppressed(suppressed); + } + throw e; + } + } +}
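[Editor's note] The long comment in queryMemtableAndDiskInternal above explains why sstables are visited in maxTimestamp order: once a partition-level tombstone newer than the next sstable's maxTimestamp has been read, that sstable (and every later one in the iteration order) cannot contribute live data. The following standalone sketch illustrates only that elimination step; it deliberately ignores the slice-intersection and skipped-sstable handling also present above, and all class, field, and method names (TimestampEliminationSketch, FakeSSTable, selectSSTablesToRead) are hypothetical stand-ins, not Cassandra API.

import java.util.*;

/**
 * Illustrative sketch (not Cassandra API) of the maxTimestamp-based sstable
 * elimination performed in queryMemtableAndDiskInternal: sstables are visited in
 * decreasing maxTimestamp order, and once a partition tombstone newer than the next
 * sstable's maxTimestamp has been seen, the remaining sstables are skipped.
 */
public class TimestampEliminationSketch
{
    static final class FakeSSTable
    {
        final String name;
        final long maxTimestamp;            // newest timestamp written to this sstable
        final long partitionDeletionTime;   // partition tombstone timestamp, or Long.MIN_VALUE if none

        FakeSSTable(String name, long maxTimestamp, long partitionDeletionTime)
        {
            this.name = name;
            this.maxTimestamp = maxTimestamp;
            this.partitionDeletionTime = partitionDeletionTime;
        }
    }

    static List<FakeSSTable> selectSSTablesToRead(List<FakeSSTable> sstables)
    {
        // Newest-first ordering; the real code relies on the same property when it
        // breaks out of the loop on the first too-old sstable.
        sstables.sort((a, b) -> Long.compare(b.maxTimestamp, a.maxTimestamp));

        List<FakeSSTable> toRead = new ArrayList<>();
        long mostRecentPartitionTombstone = Long.MIN_VALUE;

        for (FakeSSTable sstable : sstables)
        {
            // Everything in this (and any later) sstable is older than a partition
            // tombstone we have already read, so it is fully shadowed.
            if (sstable.maxTimestamp < mostRecentPartitionTombstone)
                break;

            toRead.add(sstable);
            mostRecentPartitionTombstone = Math.max(mostRecentPartitionTombstone, sstable.partitionDeletionTime);
        }
        return toRead;
    }

    public static void main(String[] args)
    {
        List<FakeSSTable> sstables = new ArrayList<>(Arrays.asList(
            new FakeSSTable("s0", 100, Long.MIN_VALUE),
            new FakeSSTable("s1", 200, 150),   // carries a partition tombstone at t=150
            new FakeSSTable("s2", 120, Long.MIN_VALUE)));

        // Only "s1" is read: its tombstone at t=150 shadows s2 (max 120) and s0 (max 100).
        for (FakeSSTable s : selectSSTablesToRead(sstables))
            System.out.println("read " + s.name);
    }
}

Reading sstables newest-first therefore both bounds the number of partition tombstones fetched and lets the loop terminate early, which is the property the comment above calls doing "our mostRecentPartitionTombstone elimination in one pass".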
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/Slice.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/Slice.java b/src/java/org/apache/cassandra/db/Slice.java new file mode 100644 index 0000000..dae491e --- /dev/null +++ b/src/java/org/apache/cassandra/db/Slice.java @@ -0,0 +1,652 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db; + +import java.io.DataInput; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.security.MessageDigest; +import java.util.*; + +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.db.marshal.AbstractType; +import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.ObjectSizes; + +/** + * A slice represents the selection of a range of rows. + * <p> + * A slice has a start and an end bound that are both (potentially full) clustering prefixes. + * A slice selects every rows whose clustering is bigger than the slice start prefix but smaller + * than the end prefix. Both start and end can be either inclusive or exclusive. + */ +public class Slice +{ + public static final Serializer serializer = new Serializer(); + + /** The slice selecting all rows (of a given partition) */ + public static final Slice ALL = new Slice(Bound.BOTTOM, Bound.TOP) + { + @Override + public boolean selects(ClusteringComparator comparator, Clustering clustering) + { + return true; + } + + @Override + public boolean intersects(ClusteringComparator comparator, List<ByteBuffer> minClusteringValues, List<ByteBuffer> maxClusteringValues) + { + return true; + } + + @Override + public String toString(ClusteringComparator comparator) + { + return "ALL"; + } + }; + + private final Bound start; + private final Bound end; + + private Slice(Bound start, Bound end) + { + assert start.isStart() && end.isEnd(); + this.start = start.takeAlias(); + this.end = end.takeAlias(); + } + + public static Slice make(Bound start, Bound end) + { + if (start == Bound.BOTTOM && end == Bound.TOP) + return ALL; + + return new Slice(start, end); + } + + public static Slice make(ClusteringComparator comparator, Object... 
values) + { + CBuilder builder = CBuilder.create(comparator); + for (int i = 0; i < values.length; i++) + { + Object val = values[i]; + if (val instanceof ByteBuffer) + builder.add((ByteBuffer)val); + else + builder.add(val); + } + return new Slice(builder.buildBound(true, true), builder.buildBound(false, true)); + } + + public static Slice make(Clustering clustering) + { + // This doesn't give us what we want with the clustering prefix + assert clustering != Clustering.STATIC_CLUSTERING; + ByteBuffer[] values = extractValues(clustering); + return new Slice(Bound.inclusiveStartOf(values), Bound.inclusiveEndOf(values)); + } + + public static Slice make(Clustering start, Clustering end) + { + // This doesn't give us what we want with the clustering prefix + assert start != Clustering.STATIC_CLUSTERING && end != Clustering.STATIC_CLUSTERING; + + ByteBuffer[] startValues = extractValues(start); + ByteBuffer[] endValues = extractValues(end); + + return new Slice(Bound.inclusiveStartOf(startValues), Bound.inclusiveEndOf(endValues)); + } + + private static ByteBuffer[] extractValues(ClusteringPrefix clustering) + { + ByteBuffer[] values = new ByteBuffer[clustering.size()]; + for (int i = 0; i < clustering.size(); i++) + values[i] = clustering.get(i); + return values; + } + + public Bound start() + { + return start; + } + + public Bound end() + { + return end; + } + + public Bound open(boolean reversed) + { + return reversed ? end : start; + } + + public Bound close(boolean reversed) + { + return reversed ? start : end; + } + + /** + * Return whether the slice is empty. + * + * @param comparator the comparator to compare the bounds. + * @return whether the slice formed is empty or not. + */ + public boolean isEmpty(ClusteringComparator comparator) + { + return isEmpty(comparator, start(), end()); + } + + /** + * Return whether the slice formed by the two provided bound is empty or not. + * + * @param comparator the comparator to compare the bounds. + * @param start the start for the slice to consider. This must be a start bound. + * @param end the end for the slice to consider. This must be an end bound. + * @return whether the slice formed by {@code start} and {@code end} is + * empty or not. + */ + public static boolean isEmpty(ClusteringComparator comparator, Slice.Bound start, Slice.Bound end) + { + assert start.isStart() && end.isEnd(); + return comparator.compare(end, start) < 0; + } + + /** + * Returns whether a given clustering is selected by this slice. + * + * @param comparator the comparator for the table this is a slice of. + * @param clustering the clustering to test inclusion of. + * + * @return whether {@code clustering} is selected by this slice. + */ + public boolean selects(ClusteringComparator comparator, Clustering clustering) + { + return comparator.compare(start, clustering) <= 0 && comparator.compare(clustering, end) <= 0; + } + + /** + * Returns whether a given bound is included in this slice. + * + * @param comparator the comparator for the table this is a slice of. + * @param bound the bound to test inclusion of. + * + * @return whether {@code bound} is within the bounds of this slice. + */ + public boolean includes(ClusteringComparator comparator, Bound bound) + { + return comparator.compare(start, bound) <= 0 && comparator.compare(bound, end) <= 0; + } + + /** + * Returns a slice for continuing paging from the last returned clustering prefix. + * + * @param comparator the comparator for the table this is a filter for. 
+ * @param lastReturned the last clustering that was returned for the query we are paging for. The + * resulting slices will be such that only results coming stricly after {@code lastReturned} are returned + * (where coming after means "greater than" if {@code !reversed} and "lesser than" otherwise). + * @param inclusive whether or not we want to include the {@code lastReturned} in the newly returned page of results. + * @param reversed whether the query we're paging for is reversed or not. + * + * @return a new slice that selects results coming after {@code lastReturned}, or {@code null} if paging + * the resulting slice selects nothing (i.e. if it originally selects nothing coming after {@code lastReturned}). + */ + public Slice forPaging(ClusteringComparator comparator, Clustering lastReturned, boolean inclusive, boolean reversed) + { + if (reversed) + { + int cmp = comparator.compare(lastReturned, start); + if (cmp < 0 || (!inclusive && cmp == 0)) + return null; + + cmp = comparator.compare(end, lastReturned); + if (cmp < 0 || (inclusive && cmp == 0)) + return this; + + ByteBuffer[] values = extractValues(lastReturned); + return new Slice(start, inclusive ? Bound.inclusiveEndOf(values) : Bound.exclusiveEndOf(values)); + } + else + { + int cmp = comparator.compare(end, lastReturned); + if (cmp < 0 || (!inclusive && cmp == 0)) + return null; + + cmp = comparator.compare(lastReturned, start); + if (cmp < 0 || (inclusive && cmp == 0)) + return this; + + ByteBuffer[] values = extractValues(lastReturned); + return new Slice(inclusive ? Bound.inclusiveStartOf(values) : Bound.exclusiveStartOf(values), end); + } + } + + /** + * Given the per-clustering column minimum and maximum value a sstable contains, whether or not this slice potentially + * intersects that sstable or not. + * + * @param comparator the comparator for the table this is a slice of. + * @param minClusteringValues the smallest values for each clustering column that a sstable contains. + * @param maxClusteringValues the biggest values for each clustering column that a sstable contains. + * + * @return whether the slice might intersects with the sstable having {@code minClusteringValues} and + * {@code maxClusteringValues}. + */ + public boolean intersects(ClusteringComparator comparator, List<ByteBuffer> minClusteringValues, List<ByteBuffer> maxClusteringValues) + { + // If this slice start after max or end before min, it can't intersect + if (start.compareTo(comparator, maxClusteringValues) > 0 || end.compareTo(comparator, minClusteringValues) < 0) + return false; + + // We could safely return true here, but there's a minor optimization: if the first component + // of the slice is restricted to a single value (typically the slice is [4:5, 4:7]), we can + // check that the second component falls within the min/max for that component (and repeat for + // all components). + for (int j = 0; j < minClusteringValues.size() && j < maxClusteringValues.size(); j++) + { + ByteBuffer s = j < start.size() ? start.get(j) : null; + ByteBuffer f = j < end.size() ? 
end.get(j) : null; + + // we already know the first component falls within its min/max range (otherwise we wouldn't get here) + if (j > 0 && (j < end.size() && comparator.compareComponent(j, f, minClusteringValues.get(j)) < 0 || + j < start.size() && comparator.compareComponent(j, s, maxClusteringValues.get(j)) > 0)) + return false; + + // if this component isn't equal in the start and finish, we don't need to check any more + if (j >= start.size() || j >= end.size() || comparator.compareComponent(j, s, f) != 0) + break; + } + return true; + } + + public String toString(CFMetaData metadata) + { + return toString(metadata.comparator); + } + + public String toString(ClusteringComparator comparator) + { + StringBuilder sb = new StringBuilder(); + sb.append(start.isInclusive() ? "[" : "("); + for (int i = 0; i < start.size(); i++) + { + if (i > 0) + sb.append(":"); + sb.append(comparator.subtype(i).getString(start.get(i))); + } + sb.append(", "); + for (int i = 0; i < end.size(); i++) + { + if (i > 0) + sb.append(":"); + sb.append(comparator.subtype(i).getString(end.get(i))); + } + sb.append(end.isInclusive() ? "]" : ")"); + return sb.toString(); + } + + @Override + public boolean equals(Object other) + { + if(!(other instanceof Slice)) + return false; + + Slice that = (Slice)other; + return this.start().equals(that.start()) + && this.end().equals(that.end()); + } + + @Override + public int hashCode() + { + return Objects.hash(start(), end()); + } + + public static class Serializer + { + public void serialize(Slice slice, DataOutputPlus out, int version, List<AbstractType<?>> types) throws IOException + { + Bound.serializer.serialize(slice.start, out, version, types); + Bound.serializer.serialize(slice.end, out, version, types); + } + + public long serializedSize(Slice slice, int version, List<AbstractType<?>> types, TypeSizes sizes) + { + return Bound.serializer.serializedSize(slice.start, version, types, sizes) + + Bound.serializer.serializedSize(slice.end, version, types, sizes); + } + + public Slice deserialize(DataInput in, int version, List<AbstractType<?>> types) throws IOException + { + Bound start = Bound.serializer.deserialize(in, version, types); + Bound end = Bound.serializer.deserialize(in, version, types); + return new Slice(start, end); + } + } + + /** + * The bound of a slice. + * <p> + * This can be either a start or an end bound, and this can be either inclusive or exclusive. + */ + public static class Bound extends AbstractClusteringPrefix + { + private static final long EMPTY_SIZE = ObjectSizes.measure(new Bound(Kind.INCL_START_BOUND, new ByteBuffer[0])); + public static final Serializer serializer = new Serializer(); + + /** The smallest start bound, i.e. the one that starts before any row. */ + public static final Bound BOTTOM = inclusiveStartOf(); + /** The biggest end bound, i.e. the one that ends after any row. */ + public static final Bound TOP = inclusiveEndOf(); + + protected final Kind kind; + protected final ByteBuffer[] values; + + protected Bound(Kind kind, ByteBuffer[] values) + { + this.kind = kind; + this.values = values; + } + + public static Bound create(Kind kind, ByteBuffer[] values) + { + assert !kind.isBoundary(); + return new Bound(kind, values); + } + + public static Kind boundKind(boolean isStart, boolean isInclusive) + { + return isStart + ? (isInclusive ? Kind.INCL_START_BOUND : Kind.EXCL_START_BOUND) + : (isInclusive ? Kind.INCL_END_BOUND : Kind.EXCL_END_BOUND); + } + + public static Bound inclusiveStartOf(ByteBuffer... 
values) + { + return create(Kind.INCL_START_BOUND, values); + } + + public static Bound inclusiveEndOf(ByteBuffer... values) + { + return create(Kind.INCL_END_BOUND, values); + } + + public static Bound exclusiveStartOf(ByteBuffer... values) + { + return create(Kind.EXCL_START_BOUND, values); + } + + public static Bound exclusiveEndOf(ByteBuffer... values) + { + return create(Kind.EXCL_END_BOUND, values); + } + + public static Bound exclusiveStartOf(ClusteringPrefix prefix) + { + ByteBuffer[] values = new ByteBuffer[prefix.size()]; + for (int i = 0; i < prefix.size(); i++) + values[i] = prefix.get(i); + return exclusiveStartOf(values); + } + + public static Bound inclusiveEndOf(ClusteringPrefix prefix) + { + ByteBuffer[] values = new ByteBuffer[prefix.size()]; + for (int i = 0; i < prefix.size(); i++) + values[i] = prefix.get(i); + return inclusiveEndOf(values); + } + + public static Bound create(ClusteringComparator comparator, boolean isStart, boolean isInclusive, Object... values) + { + CBuilder builder = CBuilder.create(comparator); + for (int i = 0; i < values.length; i++) + { + Object val = values[i]; + if (val instanceof ByteBuffer) + builder.add((ByteBuffer)val); + else + builder.add(val); + } + return builder.buildBound(isStart, isInclusive); + } + + public Kind kind() + { + return kind; + } + + public int size() + { + return values.length; + } + + public ByteBuffer get(int i) + { + return values[i]; + } + + public Bound withNewKind(Kind kind) + { + assert !kind.isBoundary(); + return new Bound(kind, values); + } + + public boolean isStart() + { + return kind().isStart(); + } + + public boolean isEnd() + { + return !isStart(); + } + + public boolean isInclusive() + { + return kind == Kind.INCL_START_BOUND || kind == Kind.INCL_END_BOUND; + } + + public boolean isExclusive() + { + return kind == Kind.EXCL_START_BOUND || kind == Kind.EXCL_END_BOUND; + } + + /** + * Returns the inverse of the current bound. + * <p> + * This invert both start into end (and vice-versa) and inclusive into exclusive (and vice-versa). + * + * @return the invert of this bound. For instance, if this bound is an exlusive start, this return + * an inclusive end with the same values. + */ + public Slice.Bound invert() + { + return withNewKind(kind().invert()); + } + + public ByteBuffer[] getRawValues() + { + return values; + } + + public void digest(MessageDigest digest) + { + for (int i = 0; i < size(); i++) + digest.update(get(i).duplicate()); + FBUtilities.updateWithByte(digest, kind().ordinal()); + } + + public void writeTo(Slice.Bound.Writer writer) + { + super.writeTo(writer); + writer.writeBoundKind(kind()); + } + + // For use by intersects, it's called with the sstable bound opposite to the slice bound + // (so if the slice bound is a start, it's call with the max sstable bound) + private int compareTo(ClusteringComparator comparator, List<ByteBuffer> sstableBound) + { + for (int i = 0; i < sstableBound.size(); i++) + { + // Say the slice bound is a start. It means we're in the case where the max + // sstable bound is say (1:5) while the slice start is (1). So the start + // does start before the sstable end bound (and intersect it). It's the exact + // inverse with a end slice bound. + if (i >= size()) + return isStart() ? -1 : 1; + + int cmp = comparator.compareComponent(i, get(i), sstableBound.get(i)); + if (cmp != 0) + return cmp; + } + + // Say the slice bound is a start. I means we're in the case where the max + // sstable bound is say (1), while the slice start is (1:5). 
This again means + // that the slice start before the end bound. + if (size() > sstableBound.size()) + return isStart() ? -1 : 1; + + // The slice bound is equal to the sstable bound. Results depends on whether the slice is inclusive or not + return isInclusive() ? 0 : (isStart() ? 1 : -1); + } + + public String toString(CFMetaData metadata) + { + return toString(metadata.comparator); + } + + public String toString(ClusteringComparator comparator) + { + StringBuilder sb = new StringBuilder(); + sb.append(kind()).append("("); + for (int i = 0; i < size(); i++) + { + if (i > 0) + sb.append(", "); + sb.append(comparator.subtype(i).getString(get(i))); + } + return sb.append(")").toString(); + } + + // Overriding to get a more precise type + @Override + public Bound takeAlias() + { + return this; + } + + @Override + public long unsharedHeapSize() + { + return EMPTY_SIZE + ObjectSizes.sizeOnHeapOf(values); + } + + public long unsharedHeapSizeExcludingData() + { + return EMPTY_SIZE + ObjectSizes.sizeOnHeapExcludingData(values); + } + + public static Builder builder(int size) + { + return new Builder(size); + } + + public interface Writer extends ClusteringPrefix.Writer + { + public void writeBoundKind(Kind kind); + } + + public static class Builder implements Writer + { + private final ByteBuffer[] values; + private Kind kind; + private int idx; + + private Builder(int size) + { + this.values = new ByteBuffer[size]; + } + + public void writeClusteringValue(ByteBuffer value) + { + values[idx++] = value; + } + + public void writeBoundKind(Kind kind) + { + this.kind = kind; + } + + public Slice.Bound build() + { + assert idx == values.length; + return Slice.Bound.create(kind, values); + } + } + + /** + * Serializer for slice bounds. + * <p> + * Contrarily to {@code Clustering}, a slice bound can only be a true prefix of the full clustering, so we actually record + * its size. + */ + public static class Serializer + { + public void serialize(Slice.Bound bound, DataOutputPlus out, int version, List<AbstractType<?>> types) throws IOException + { + out.writeByte(bound.kind().ordinal()); + out.writeShort(bound.size()); + ClusteringPrefix.serializer.serializeValuesWithoutSize(bound, out, version, types); + } + + public long serializedSize(Slice.Bound bound, int version, List<AbstractType<?>> types, TypeSizes sizes) + { + return 1 // kind ordinal + + sizes.sizeof((short)bound.size()) + + ClusteringPrefix.serializer.valuesWithoutSizeSerializedSize(bound, version, types, sizes); + } + + public Slice.Bound deserialize(DataInput in, int version, List<AbstractType<?>> types) throws IOException + { + Kind kind = Kind.values()[in.readByte()]; + return deserializeValues(in, kind, version, types); + } + + public Slice.Bound deserializeValues(DataInput in, Kind kind, int version, List<AbstractType<?>> types) throws IOException + { + int size = in.readUnsignedShort(); + if (size == 0) + return kind.isStart() ? 
BOTTOM : TOP; + + Builder builder = builder(size); + ClusteringPrefix.serializer.deserializeValuesWithoutSize(in, size, version, types, builder); + builder.writeBoundKind(kind); + return builder.build(); + } + + public void deserializeValues(DataInput in, Bound.Kind kind, int version, List<AbstractType<?>> types, Writer writer) throws IOException + { + int size = in.readUnsignedShort(); + ClusteringPrefix.serializer.deserializeValuesWithoutSize(in, size, version, types, writer); + writer.writeBoundKind(kind); + } + + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java b/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java deleted file mode 100644 index 65eefaa..0000000 --- a/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java +++ /dev/null @@ -1,125 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.cassandra.db; - -import java.io.*; -import java.nio.ByteBuffer; - -import com.google.common.base.Objects; - -import org.apache.cassandra.config.CFMetaData; -import org.apache.cassandra.config.Schema; -import org.apache.cassandra.db.filter.*; -import org.apache.cassandra.io.IVersionedSerializer; -import org.apache.cassandra.io.util.DataOutputPlus; -import org.apache.cassandra.service.StorageService; -import org.apache.cassandra.utils.ByteBufferUtil; - -public class SliceByNamesReadCommand extends ReadCommand -{ - static final SliceByNamesReadCommandSerializer serializer = new SliceByNamesReadCommandSerializer(); - - public final NamesQueryFilter filter; - - public SliceByNamesReadCommand(String keyspaceName, ByteBuffer key, String cfName, long timestamp, NamesQueryFilter filter) - { - super(keyspaceName, key, cfName, timestamp, Type.GET_BY_NAMES); - this.filter = filter; - } - - public ReadCommand copy() - { - return new SliceByNamesReadCommand(ksName, key, cfName, timestamp, filter).setIsDigestQuery(isDigestQuery()); - } - - public Row getRow(Keyspace keyspace) - { - DecoratedKey dk = StorageService.getPartitioner().decorateKey(key); - return keyspace.getRow(new QueryFilter(dk, cfName, filter, timestamp)); - } - - @Override - public String toString() - { - return Objects.toStringHelper(this) - .add("ksName", ksName) - .add("cfName", cfName) - .add("key", ByteBufferUtil.bytesToHex(key)) - .add("filter", filter) - .add("timestamp", timestamp) - .toString(); - } - - public IDiskAtomFilter filter() - { - return filter; - } -} - -class SliceByNamesReadCommandSerializer implements IVersionedSerializer<ReadCommand> -{ - public void serialize(ReadCommand cmd, DataOutputPlus out, int version) throws IOException - { - SliceByNamesReadCommand command = (SliceByNamesReadCommand) cmd; - out.writeBoolean(command.isDigestQuery()); - out.writeUTF(command.ksName); - ByteBufferUtil.writeWithShortLength(command.key, out); - out.writeUTF(command.cfName); - out.writeLong(cmd.timestamp); - - CFMetaData metadata = Schema.instance.getCFMetaData(cmd.ksName, cmd.cfName); - metadata.comparator.namesQueryFilterSerializer().serialize(command.filter, out, version); - } - - public ReadCommand deserialize(DataInput in, int version) throws IOException - { - boolean isDigest = in.readBoolean(); - String keyspaceName = in.readUTF(); - ByteBuffer key = ByteBufferUtil.readWithShortLength(in); - String cfName = in.readUTF(); - long timestamp = in.readLong(); - CFMetaData metadata = Schema.instance.getCFMetaData(keyspaceName, cfName); - if (metadata == null) - { - String message = String.format("Got slice command for nonexistent table %s.%s. If the table was just " + - "created, this is likely due to the schema not being fully propagated. 
Please wait for schema " + - "agreement on table creation.", keyspaceName, cfName); - throw new UnknownColumnFamilyException(message, null); - } - NamesQueryFilter filter = metadata.comparator.namesQueryFilterSerializer().deserialize(in, version); - return new SliceByNamesReadCommand(keyspaceName, key, cfName, timestamp, filter).setIsDigestQuery(isDigest); - } - - public long serializedSize(ReadCommand cmd, int version) - { - TypeSizes sizes = TypeSizes.NATIVE; - SliceByNamesReadCommand command = (SliceByNamesReadCommand) cmd; - int size = sizes.sizeof(command.isDigestQuery()); - int keySize = command.key.remaining(); - - CFMetaData metadata = Schema.instance.getCFMetaData(cmd.ksName, cmd.cfName); - - size += sizes.sizeof(command.ksName); - size += sizes.sizeof((short)keySize) + keySize; - size += sizes.sizeof(command.cfName); - size += sizes.sizeof(cmd.timestamp); - size += metadata.comparator.namesQueryFilterSerializer().serializedSize(command.filter, version); - - return size; - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/SliceFromReadCommand.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/SliceFromReadCommand.java b/src/java/org/apache/cassandra/db/SliceFromReadCommand.java deleted file mode 100644 index 6995193..0000000 --- a/src/java/org/apache/cassandra/db/SliceFromReadCommand.java +++ /dev/null @@ -1,207 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.cassandra.db; - -import java.io.DataInput; -import java.io.IOException; -import java.nio.ByteBuffer; - -import com.google.common.base.Objects; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.cassandra.config.CFMetaData; -import org.apache.cassandra.config.Schema; -import org.apache.cassandra.db.filter.IDiskAtomFilter; -import org.apache.cassandra.db.filter.QueryFilter; -import org.apache.cassandra.db.filter.SliceQueryFilter; -import org.apache.cassandra.io.IVersionedSerializer; -import org.apache.cassandra.io.util.DataOutputPlus; -import org.apache.cassandra.service.RowDataResolver; -import org.apache.cassandra.service.StorageService; -import org.apache.cassandra.utils.ByteBufferUtil; -import org.apache.cassandra.utils.Pair; - -public class SliceFromReadCommand extends ReadCommand -{ - private static final Logger logger = LoggerFactory.getLogger(SliceFromReadCommand.class); - - static final SliceFromReadCommandSerializer serializer = new SliceFromReadCommandSerializer(); - - public final SliceQueryFilter filter; - - public SliceFromReadCommand(String keyspaceName, ByteBuffer key, String cfName, long timestamp, SliceQueryFilter filter) - { - super(keyspaceName, key, cfName, timestamp, Type.GET_SLICES); - this.filter = filter; - } - - public ReadCommand copy() - { - return new SliceFromReadCommand(ksName, key, cfName, timestamp, filter).setIsDigestQuery(isDigestQuery()); - } - - public Row getRow(Keyspace keyspace) - { - CFMetaData cfm = Schema.instance.getCFMetaData(ksName, cfName); - DecoratedKey dk = StorageService.getPartitioner().decorateKey(key); - - // If we're doing a reversed query and the filter includes static columns, we need to issue two separate - // reads in order to guarantee that the static columns are fetched. See CASSANDRA-8502 for more details. - if (filter.reversed && filter.hasStaticSlice(cfm)) - { - logger.debug("Splitting reversed slice with static columns into two reads"); - Pair<SliceQueryFilter, SliceQueryFilter> newFilters = filter.splitOutStaticSlice(cfm); - - Row normalResults = keyspace.getRow(new QueryFilter(dk, cfName, newFilters.right, timestamp)); - Row staticResults = keyspace.getRow(new QueryFilter(dk, cfName, newFilters.left, timestamp)); - - // add the static results to the start of the normal results - if (normalResults.cf == null) - return staticResults; - - if (staticResults.cf != null) - for (Cell cell : staticResults.cf.getReverseSortedColumns()) - normalResults.cf.addColumn(cell); - - return normalResults; - } - - return keyspace.getRow(new QueryFilter(dk, cfName, filter, timestamp)); - } - - @Override - public ReadCommand maybeGenerateRetryCommand(RowDataResolver resolver, Row row) - { - int maxLiveColumns = resolver.getMaxLiveCount(); - - int count = filter.count; - // We generate a retry if at least one node reply with count live columns but after merge we have less - // than the total number of column we are interested in (which may be < count on a retry). - // So in particular, if no host returned count live columns, we know it's not a short read. - if (maxLiveColumns < count) - return null; - - int liveCountInRow = row == null || row.cf == null ? 0 : filter.getLiveCount(row.cf, timestamp); - if (liveCountInRow < getOriginalRequestedCount()) - { - // We asked t (= count) live columns and got l (=liveCountInRow) ones. - // From that, we can estimate that on this row, for x requested - // columns, only l/t end up live after reconciliation. 
So for next - // round we want to ask x column so that x * (l/t) == t, i.e. x = t^2/l. - int retryCount = liveCountInRow == 0 ? count + 1 : ((count * count) / liveCountInRow) + 1; - SliceQueryFilter newFilter = filter.withUpdatedCount(retryCount); - return new RetriedSliceFromReadCommand(ksName, key, cfName, timestamp, newFilter, getOriginalRequestedCount()); - } - - return null; - } - - @Override - public void maybeTrim(Row row) - { - if ((row == null) || (row.cf == null)) - return; - - filter.trim(row.cf, getOriginalRequestedCount(), timestamp); - } - - public IDiskAtomFilter filter() - { - return filter; - } - - public SliceFromReadCommand withUpdatedFilter(SliceQueryFilter newFilter) - { - return new SliceFromReadCommand(ksName, key, cfName, timestamp, newFilter); - } - - /** - * The original number of columns requested by the user. - * This can be different from count when the slice command is a retry (see - * RetriedSliceFromReadCommand) - */ - protected int getOriginalRequestedCount() - { - return filter.count; - } - - @Override - public String toString() - { - return Objects.toStringHelper(this) - .add("ksName", ksName) - .add("cfName", cfName) - .add("key", ByteBufferUtil.bytesToHex(key)) - .add("filter", filter) - .add("timestamp", timestamp) - .toString(); - } -} - -class SliceFromReadCommandSerializer implements IVersionedSerializer<ReadCommand> -{ - public void serialize(ReadCommand rm, DataOutputPlus out, int version) throws IOException - { - SliceFromReadCommand realRM = (SliceFromReadCommand)rm; - out.writeBoolean(realRM.isDigestQuery()); - out.writeUTF(realRM.ksName); - ByteBufferUtil.writeWithShortLength(realRM.key, out); - out.writeUTF(realRM.cfName); - out.writeLong(realRM.timestamp); - CFMetaData metadata = Schema.instance.getCFMetaData(realRM.ksName, realRM.cfName); - metadata.comparator.sliceQueryFilterSerializer().serialize(realRM.filter, out, version); - } - - public ReadCommand deserialize(DataInput in, int version) throws IOException - { - boolean isDigest = in.readBoolean(); - String keyspaceName = in.readUTF(); - ByteBuffer key = ByteBufferUtil.readWithShortLength(in); - String cfName = in.readUTF(); - long timestamp = in.readLong(); - CFMetaData metadata = Schema.instance.getCFMetaData(keyspaceName, cfName); - if (metadata == null) - { - String message = String.format("Got slice command for nonexistent table %s.%s. If the table was just " + - "created, this is likely due to the schema not being fully propagated. 
Please wait for schema " + - "agreement on table creation.", keyspaceName, cfName); - throw new UnknownColumnFamilyException(message, null); - } - SliceQueryFilter filter = metadata.comparator.sliceQueryFilterSerializer().deserialize(in, version); - return new SliceFromReadCommand(keyspaceName, key, cfName, timestamp, filter).setIsDigestQuery(isDigest); - } - - public long serializedSize(ReadCommand cmd, int version) - { - TypeSizes sizes = TypeSizes.NATIVE; - SliceFromReadCommand command = (SliceFromReadCommand) cmd; - int keySize = command.key.remaining(); - - CFMetaData metadata = Schema.instance.getCFMetaData(cmd.ksName, cmd.cfName); - - int size = sizes.sizeof(cmd.isDigestQuery()); // boolean - size += sizes.sizeof(command.ksName); - size += sizes.sizeof((short) keySize) + keySize; - size += sizes.sizeof(command.cfName); - size += sizes.sizeof(cmd.timestamp); - size += metadata.comparator.sliceQueryFilterSerializer().serializedSize(command.filter, version); - - return size; - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/Slices.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/Slices.java b/src/java/org/apache/cassandra/db/Slices.java new file mode 100644 index 0000000..ec7797d --- /dev/null +++ b/src/java/org/apache/cassandra/db/Slices.java @@ -0,0 +1,898 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db; + +import java.io.DataInput; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.*; + +import com.google.common.collect.Iterators; + +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.ColumnDefinition; +import org.apache.cassandra.db.rows.*; +import org.apache.cassandra.db.marshal.AbstractType; +import org.apache.cassandra.io.util.DataOutputPlus; + +/** + * Represents the selection of multiple range of rows within a partition. + * <p> + * A {@code Slices} is basically a list of {@code Slice}, though those are guaranteed to be non-overlapping + * and always in clustering order. + */ +public abstract class Slices implements Iterable<Slice> +{ + public static final Serializer serializer = new Serializer(); + + /** Slices selecting all the rows of a partition. */ + public static final Slices ALL = new SelectAllSlices(); + /** Slices selecting no rows in a partition. */ + public static final Slices NONE = new SelectNoSlices(); + + protected Slices() + { + } + + /** + * Creates a {@code Slices} object that contains a single slice. + * + * @param comparator the comparator for the table {@code slice} is a slice of. + * @param slice the single slice that the return object should contains. 
+ * + * @return the newly created {@code Slices} object. + */ + public static Slices with(ClusteringComparator comparator, Slice slice) + { + if (slice.start() == Slice.Bound.BOTTOM && slice.end() == Slice.Bound.TOP) + return Slices.ALL; + + assert comparator.compare(slice.start(), slice.end()) <= 0; + return new ArrayBackedSlices(comparator, new Slice[]{ slice }); + } + + /** + * Whether the slices has a lower bound, that is whether it's first slice start is {@code Slice.BOTTOM}. + * + * @return whether the slices has a lower bound. + */ + public abstract boolean hasLowerBound(); + + /** + * Whether the slices has an upper bound, that is whether it's last slice end is {@code Slice.TOP}. + * + * @return whether the slices has an upper bound. + */ + public abstract boolean hasUpperBound(); + + /** + * The number of slice this object contains. + * + * @return the number of slice this object contains. + */ + public abstract int size(); + + /** + * Returns the ith slice of this {@code Slices} object. + * + * @return the ith slice of this object. + */ + public abstract Slice get(int i); + + /** + * Returns slices for continuing the paging of those slices given the last returned clustering prefix. + * + * @param comparator the comparator for the table this is a filter for. + * @param lastReturned the last clustering that was returned for the query we are paging for. The + * resulting slices will be such that only results coming stricly after {@code lastReturned} are returned + * (where coming after means "greater than" if {@code !reversed} and "lesser than" otherwise). + * @param inclusive whether or not we want to include the {@code lastReturned} in the newly returned page of results. + * @param reversed whether the query we're paging for is reversed or not. + * + * @return new slices that select results coming after {@code lastReturned}. + */ + public abstract Slices forPaging(ClusteringComparator comparator, Clustering lastReturned, boolean inclusive, boolean reversed); + + /** + * An object that allows to test whether rows are selected by this {@code Slices} objects assuming those rows + * are tested in clustering order. + * + * @param reversed if true, the rows passed to the returned object will be assumed to be in reversed clustering + * order, otherwise they should be in clustering order. + * + * @return an object that tests for selection of rows by this {@code Slices} object. + */ + public abstract InOrderTester inOrderTester(boolean reversed); + + /** + * Whether a given clustering (row) is selected by this {@code Slices} object. + * + * @param clustering the clustering to test for selection. + * + * @return whether a given clustering (row) is selected by this {@code Slices} object. + */ + public abstract boolean selects(Clustering clustering); + + + /** + * Given the per-clustering column minimum and maximum value a sstable contains, whether or not this slices potentially + * intersects that sstable or not. + * + * @param minClusteringValues the smallest values for each clustering column that a sstable contains. + * @param maxClusteringValues the biggest values for each clustering column that a sstable contains. + * + * @return whether the slices might intersects with the sstable having {@code minClusteringValues} and + * {@code maxClusteringValues}. 
+ */ + public abstract boolean intersects(List<ByteBuffer> minClusteringValues, List<ByteBuffer> maxClusteringValues); + + /** + * Given a sliceable row iterator, returns a row iterator that only return rows selected by the slice of + * this {@code Slices} object. + * + * @param iter the sliceable iterator to filter. + * + * @return an iterator that only returns the rows (or rather Unfiltered) of {@code iter} that are selected by those slices. + */ + public abstract UnfilteredRowIterator makeSliceIterator(SliceableUnfilteredRowIterator iter); + + public abstract String toCQLString(CFMetaData metadata); + + /** + * In simple object that allows to test the inclusion of rows in those slices assuming those rows + * are passed (to {@link #includes}) in clustering order (or reverse clustering ordered, depending + * of the argument passed to {@link #inOrderTester}). + */ + public interface InOrderTester + { + public boolean includes(Clustering value); + public boolean isDone(); + } + + /** + * Builder to create {@code Slices} objects. + */ + public static class Builder + { + private final ClusteringComparator comparator; + + private final List<Slice> slices; + + private boolean needsNormalizing; + + public Builder(ClusteringComparator comparator) + { + this.comparator = comparator; + this.slices = new ArrayList<>(); + } + + public Builder(ClusteringComparator comparator, int initialSize) + { + this.comparator = comparator; + this.slices = new ArrayList<>(initialSize); + } + + public Builder add(Slice.Bound start, Slice.Bound end) + { + return add(Slice.make(start, end)); + } + + public Builder add(Slice slice) + { + assert comparator.compare(slice.start(), slice.end()) <= 0; + if (slices.size() > 0 && comparator.compare(slices.get(slices.size()-1).end(), slice.start()) > 0) + needsNormalizing = true; + slices.add(slice); + return this; + } + + public int size() + { + return slices.size(); + } + + public Slices build() + { + if (slices.isEmpty()) + return NONE; + + if (slices.size() == 1 && slices.get(0) == Slice.ALL) + return ALL; + + List<Slice> normalized = needsNormalizing + ? normalize(slices) + : slices; + + return new ArrayBackedSlices(comparator, normalized.toArray(new Slice[normalized.size()])); + } + + /** + * Given an array of slices (potentially overlapping and in any order) and return an equivalent array + * of non-overlapping slices in clustering order. + * + * @param slices an array of slices. This may be modified by this method. + * @return the smallest possible array of non-overlapping slices in clustering order. If the original + * slices are already non-overlapping and in comparator order, this may or may not return the provided slices + * directly. 
+ */ + private List<Slice> normalize(List<Slice> slices) + { + if (slices.size() <= 1) + return slices; + + Collections.sort(slices, new Comparator<Slice>() + { + @Override + public int compare(Slice s1, Slice s2) + { + int c = comparator.compare(s1.start(), s2.start()); + if (c != 0) + return c; + + return comparator.compare(s1.end(), s2.end()); + } + }); + + List<Slice> slicesCopy = new ArrayList<>(slices.size()); + + Slice last = slices.get(0); + + for (int i = 1; i < slices.size(); i++) + { + Slice s2 = slices.get(i); + + boolean includesStart = last.includes(comparator, s2.start()); + boolean includesFinish = last.includes(comparator, s2.end()); + + if (includesStart && includesFinish) + continue; + + if (!includesStart && !includesFinish) + { + slicesCopy.add(last); + last = s2; + continue; + } + + if (includesStart) + { + last = Slice.make(last.start(), s2.end()); + continue; + } + + assert !includesFinish; + } + + slicesCopy.add(last); + return slicesCopy; + } + } + + public static class Serializer + { + public void serialize(Slices slices, DataOutputPlus out, int version) throws IOException + { + int size = slices.size(); + out.writeInt(size); + + if (size == 0) + return; + + List<AbstractType<?>> types = slices == ALL + ? Collections.<AbstractType<?>>emptyList() + : ((ArrayBackedSlices)slices).comparator.subtypes(); + + for (Slice slice : slices) + Slice.serializer.serialize(slice, out, version, types); + } + + public long serializedSize(Slices slices, int version, TypeSizes sizes) + { + long size = sizes.sizeof(slices.size()); + + if (slices.size() == 0) + return size; + + List<AbstractType<?>> types = slices instanceof SelectAllSlices + ? Collections.<AbstractType<?>>emptyList() + : ((ArrayBackedSlices)slices).comparator.subtypes(); + + for (Slice slice : slices) + size += Slice.serializer.serializedSize(slice, version, types, sizes); + + return size; + } + + public Slices deserialize(DataInput in, int version, CFMetaData metadata) throws IOException + { + int size = in.readInt(); + + if (size == 0) + return NONE; + + Slice[] slices = new Slice[size]; + for (int i = 0; i < size; i++) + slices[i] = Slice.serializer.deserialize(in, version, metadata.comparator.subtypes()); + + if (size == 1 && slices[0].start() == Slice.Bound.BOTTOM && slices[0].end() == Slice.Bound.TOP) + return ALL; + + return new ArrayBackedSlices(metadata.comparator, slices); + } + } + + /** + * Simple {@code Slices} implementation that stores its slices in an array. + */ + private static class ArrayBackedSlices extends Slices + { + private final ClusteringComparator comparator; + + private final Slice[] slices; + + private ArrayBackedSlices(ClusteringComparator comparator, Slice[] slices) + { + this.comparator = comparator; + this.slices = slices; + } + + public int size() + { + return slices.length; + } + + public boolean hasLowerBound() + { + return slices[0].start().size() != 0; + } + + public boolean hasUpperBound() + { + return slices[slices.length - 1].end().size() != 0; + } + + public Slice get(int i) + { + return slices[i]; + } + + public boolean selects(Clustering clustering) + { + for (int i = 0; i < slices.length; i++) + { + Slice slice = slices[i]; + if (comparator.compare(clustering, slice.start()) < 0) + return false; + + if (comparator.compare(clustering, slice.end()) <= 0) + return true; + } + return false; + } + + public InOrderTester inOrderTester(boolean reversed) + { + return reversed ? 
new InReverseOrderTester() : new InForwardOrderTester(); + } + + public Slices forPaging(ClusteringComparator comparator, Clustering lastReturned, boolean inclusive, boolean reversed) + { + return reversed ? forReversePaging(comparator, lastReturned, inclusive) : forForwardPaging(comparator, lastReturned, inclusive); + } + + private Slices forForwardPaging(ClusteringComparator comparator, Clustering lastReturned, boolean inclusive) + { + for (int i = 0; i < slices.length; i++) + { + Slice slice = slices[i]; + Slice newSlice = slice.forPaging(comparator, lastReturned, inclusive, false); + if (newSlice == null) + continue; + + if (slice == newSlice && i == 0) + return this; + + ArrayBackedSlices newSlices = new ArrayBackedSlices(comparator, Arrays.copyOfRange(slices, i, slices.length)); + newSlices.slices[0] = newSlice; + return newSlices; + } + return Slices.NONE; + } + + private Slices forReversePaging(ClusteringComparator comparator, Clustering lastReturned, boolean inclusive) + { + for (int i = slices.length - 1; i >= 0; i--) + { + Slice slice = slices[i]; + Slice newSlice = slice.forPaging(comparator, lastReturned, inclusive, true); + if (newSlice == null) + continue; + + if (slice == newSlice && i == slices.length - 1) + return this; + + ArrayBackedSlices newSlices = new ArrayBackedSlices(comparator, Arrays.copyOfRange(slices, 0, i + 1)); + newSlices.slices[i] = newSlice; + return newSlices; + } + return Slices.NONE; + } + + public boolean intersects(List<ByteBuffer> minClusteringValues, List<ByteBuffer> maxClusteringValues) + { + for (Slice slice : this) + { + if (slice.intersects(comparator, minClusteringValues, maxClusteringValues)) + return true; + } + return false; + } + + public UnfilteredRowIterator makeSliceIterator(final SliceableUnfilteredRowIterator iter) + { + return new WrappingUnfilteredRowIterator(iter) + { + private int nextSlice = iter.isReverseOrder() ? slices.length - 1 : 0; + private Iterator<Unfiltered> currentSliceIterator = Collections.emptyIterator(); + + private Unfiltered next; + + @Override + public boolean hasNext() + { + prepareNext(); + return next != null; + } + + @Override + public Unfiltered next() + { + prepareNext(); + Unfiltered toReturn = next; + next = null; + return toReturn; + } + + private boolean hasMoreSlice() + { + return isReverseOrder() + ? nextSlice >= 0 + : nextSlice < slices.length; + } + + private Slice popNextSlice() + { + return slices[isReverseOrder() ? 
+        public UnfilteredRowIterator makeSliceIterator(final SliceableUnfilteredRowIterator iter)
+        {
+            return new WrappingUnfilteredRowIterator(iter)
+            {
+                private int nextSlice = iter.isReverseOrder() ? slices.length - 1 : 0;
+                private Iterator<Unfiltered> currentSliceIterator = Collections.emptyIterator();
+
+                private Unfiltered next;
+
+                @Override
+                public boolean hasNext()
+                {
+                    prepareNext();
+                    return next != null;
+                }
+
+                @Override
+                public Unfiltered next()
+                {
+                    prepareNext();
+                    Unfiltered toReturn = next;
+                    next = null;
+                    return toReturn;
+                }
+
+                private boolean hasMoreSlice()
+                {
+                    return isReverseOrder()
+                         ? nextSlice >= 0
+                         : nextSlice < slices.length;
+                }
+
+                private Slice popNextSlice()
+                {
+                    return slices[isReverseOrder() ? nextSlice-- : nextSlice++];
+                }
+
+                private void prepareNext()
+                {
+                    if (next != null)
+                        return;
+
+                    while (true)
+                    {
+                        if (currentSliceIterator.hasNext())
+                        {
+                            next = currentSliceIterator.next();
+                            return;
+                        }
+
+                        if (!hasMoreSlice())
+                            return;
+
+                        currentSliceIterator = iter.slice(popNextSlice());
+                    }
+                }
+            };
+        }
+
+        public Iterator<Slice> iterator()
+        {
+            return Iterators.forArray(slices);
+        }
+
+        private class InForwardOrderTester implements InOrderTester
+        {
+            private int idx;
+            private boolean inSlice;
+
+            public boolean includes(Clustering value)
+            {
+                while (idx < slices.length)
+                {
+                    if (!inSlice)
+                    {
+                        int cmp = comparator.compare(value, slices[idx].start());
+                        // value < start
+                        if (cmp < 0)
+                            return false;
+
+                        inSlice = true;
+
+                        if (cmp == 0)
+                            return true;
+                    }
+
+                    // Here, start < value and inSlice
+                    if (comparator.compare(value, slices[idx].end()) <= 0)
+                        return true;
+
+                    ++idx;
+                    inSlice = false;
+                }
+                return false;
+            }
+
+            public boolean isDone()
+            {
+                return idx >= slices.length;
+            }
+        }
+
+        private class InReverseOrderTester implements InOrderTester
+        {
+            private int idx;
+            private boolean inSlice;
+
+            public InReverseOrderTester()
+            {
+                this.idx = slices.length - 1;
+            }
+
+            public boolean includes(Clustering value)
+            {
+                while (idx >= 0)
+                {
+                    if (!inSlice)
+                    {
+                        int cmp = comparator.compare(slices[idx].end(), value);
+                        // value > end
+                        if (cmp > 0)
+                            return false;
+
+                        inSlice = true;
+
+                        if (cmp == 0)
+                            return true;
+                    }
+
+                    // Here, value <= end and inSlice
+                    if (comparator.compare(slices[idx].start(), value) <= 0)
+                        return true;
+
+                    --idx;
+                    inSlice = false;
+                }
+                return false;
+            }
+
+            public boolean isDone()
+            {
+                return idx < 0;
+            }
+        }
+
+        @Override
+        public String toString()
+        {
+            StringBuilder sb = new StringBuilder();
+            sb.append("{");
+            for (int i = 0; i < slices.length; i++)
+            {
+                if (i > 0)
+                    sb.append(", ");
+                sb.append(slices[i].toString(comparator));
+            }
+            return sb.append("}").toString();
+        }
+
+        public String toCQLString(CFMetaData metadata)
+        {
+            StringBuilder sb = new StringBuilder();
+
+            // In CQL, conditions are expressed per column, so first group things that way,
+            // i.e. for each column, we create a list of what each slice contains on that column
+            int clusteringSize = metadata.clusteringColumns().size();
+            List<List<ComponentOfSlice>> columnComponents = new ArrayList<>(clusteringSize);
+            for (int i = 0; i < clusteringSize; i++)
+            {
+                List<ComponentOfSlice> perSlice = new ArrayList<>();
+                columnComponents.add(perSlice);
+
+                for (int j = 0; j < slices.length; j++)
+                {
+                    ComponentOfSlice c = ComponentOfSlice.fromSlice(i, slices[j]);
+                    if (c != null)
+                        perSlice.add(c);
+                }
+            }
+
+            boolean needAnd = false;
+            for (int i = 0; i < clusteringSize; i++)
+            {
+                ColumnDefinition column = metadata.clusteringColumns().get(i);
+                List<ComponentOfSlice> componentInfo = columnComponents.get(i);
+                if (componentInfo.isEmpty())
+                    break;
+
+                // For a given column, there are only 3 cases that CQL currently generates:
+                // 1) every slice is EQ with the same value: it's a simple '=' relation.
+                // 2) every slice is EQ but with different values: it's an IN relation.
+                // 3) no slice is EQ but they all have the same values: we have inequality relations.
+                // Note that this doesn't cover everything that ReadCommand can express, but
+                // as it's all that CQL supports for now, we'll ignore other cases (which would then
+                // display a bogus query, but that's not the end of the world).
+                // TODO: we should improve this at some point.
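+                // For example, for a clustering column c, these render respectively as
+                // "c = 3", "c IN (3, 4)" and "c >= 3 AND c < 5".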
+                ComponentOfSlice first = componentInfo.get(0);
+                if (first.isEQ())
+                {
+                    if (needAnd)
+                        sb.append(" AND ");
+                    needAnd = true;
+
+                    sb.append(column.name);
+
+                    Set<ByteBuffer> values = new LinkedHashSet<>();
+                    for (int j = 0; j < componentInfo.size(); j++)
+                        values.add(componentInfo.get(j).startValue);
+
+                    if (values.size() == 1)
+                    {
+                        sb.append(" = ").append(column.type.getString(first.startValue));
+                    }
+                    else
+                    {
+                        sb.append(" IN (");
+                        int j = 0;
+                        for (ByteBuffer value : values)
+                            sb.append(j++ == 0 ? "" : ", ").append(column.type.getString(value));
+                        sb.append(")");
+                    }
+                }
+                else
+                {
+                    // As said above, we assume (without checking) that this means all ComponentOfSlice for this column
+                    // are the same, so we only bother with the first.
+                    if (first.startValue != null)
+                    {
+                        if (needAnd)
+                            sb.append(" AND ");
+                        needAnd = true;
+                        sb.append(column.name).append(first.startInclusive ? " >= " : " > ").append(column.type.getString(first.startValue));
+                    }
+                    if (first.endValue != null)
+                    {
+                        if (needAnd)
+                            sb.append(" AND ");
+                        needAnd = true;
+                        sb.append(column.name).append(first.endInclusive ? " <= " : " < ").append(column.type.getString(first.endValue));
+                    }
+                }
+            }
+            return sb.toString();
+        }
+
+        // A somewhat ad hoc utility class only used by toCQLString
+        private static class ComponentOfSlice
+        {
+            public final boolean startInclusive;
+            public final ByteBuffer startValue;
+            public final boolean endInclusive;
+            public final ByteBuffer endValue;
+
+            private ComponentOfSlice(boolean startInclusive, ByteBuffer startValue, boolean endInclusive, ByteBuffer endValue)
+            {
+                this.startInclusive = startInclusive;
+                this.startValue = startValue;
+                this.endInclusive = endInclusive;
+                this.endValue = endValue;
+            }
+
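+            // Extracts the bounds of the given slice for a single clustering component; e.g.
+            // for the slice [(0, "a") .. (0, "c")], component 0 yields 0..0 (an EQ component)
+            // while component 1 yields "a".."c".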
+            public static ComponentOfSlice fromSlice(int component, Slice slice)
+            {
+                Slice.Bound start = slice.start();
+                Slice.Bound end = slice.end();
+
+                if (component >= start.size() && component >= end.size())
+                    return null;
+
+                boolean startInclusive = true, endInclusive = true;
+                ByteBuffer startValue = null, endValue = null;
+                if (component < start.size())
+                {
+                    startInclusive = start.isInclusive();
+                    startValue = start.get(component);
+                }
+                if (component < end.size())
+                {
+                    endInclusive = end.isInclusive();
+                    endValue = end.get(component);
+                }
+                return new ComponentOfSlice(startInclusive, startValue, endInclusive, endValue);
+            }
+
+            public boolean isEQ()
+            {
+                return startValue.equals(endValue);
+            }
+        }
+    }
+
+    /**
+     * Specialized implementation of {@code Slices} that selects all rows.
+     * <p>
+     * This is equivalent to having the single {@code Slice.ALL} slice, but is somewhat more efficient.
+     */
+    private static class SelectAllSlices extends Slices
+    {
+        private static final InOrderTester trivialTester = new InOrderTester()
+        {
+            public boolean includes(Clustering value)
+            {
+                return true;
+            }
+
+            public boolean isDone()
+            {
+                return false;
+            }
+        };
+
+        public int size()
+        {
+            return 1;
+        }
+
+        public Slice get(int i)
+        {
+            return Slice.ALL;
+        }
+
+        public boolean hasLowerBound()
+        {
+            return false;
+        }
+
+        public boolean hasUpperBound()
+        {
+            return false;
+        }
+
+        public boolean selects(Clustering clustering)
+        {
+            return true;
+        }
+
+        public Slices forPaging(ClusteringComparator comparator, Clustering lastReturned, boolean inclusive, boolean reversed)
+        {
+            return new ArrayBackedSlices(comparator, new Slice[]{ Slice.ALL.forPaging(comparator, lastReturned, inclusive, reversed) });
+        }
+
+        public InOrderTester inOrderTester(boolean reversed)
+        {
+            return trivialTester;
+        }
+
+        public boolean intersects(List<ByteBuffer> minClusteringValues, List<ByteBuffer> maxClusteringValues)
+        {
+            return true;
+        }
+
+        public UnfilteredRowIterator makeSliceIterator(SliceableUnfilteredRowIterator iter)
+        {
+            return iter;
+        }
+
+        public Iterator<Slice> iterator()
+        {
+            return Iterators.singletonIterator(Slice.ALL);
+        }
+
+        @Override
+        public String toString()
+        {
+            return "ALL";
+        }
+
+        public String toCQLString(CFMetaData metadata)
+        {
+            return "";
+        }
+    }
+
+    /**
+     * Specialized implementation of {@code Slices} that selects no rows.
+     */
+    private static class SelectNoSlices extends Slices
+    {
+        private static final InOrderTester trivialTester = new InOrderTester()
+        {
+            public boolean includes(Clustering value)
+            {
+                return false;
+            }
+
+            public boolean isDone()
+            {
+                return true;
+            }
+        };
+
+        public int size()
+        {
+            return 0;
+        }
+
+        public Slice get(int i)
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        public boolean hasLowerBound()
+        {
+            return false;
+        }
+
+        public boolean hasUpperBound()
+        {
+            return false;
+        }
+
+        public Slices forPaging(ClusteringComparator comparator, Clustering lastReturned, boolean inclusive, boolean reversed)
+        {
+            return this;
+        }
+
+        public boolean selects(Clustering clustering)
+        {
+            return false;
+        }
+
+        public InOrderTester inOrderTester(boolean reversed)
+        {
+            return trivialTester;
+        }
+
+        public boolean intersects(List<ByteBuffer> minClusteringValues, List<ByteBuffer> maxClusteringValues)
+        {
+            return false;
+        }
+
+        public UnfilteredRowIterator makeSliceIterator(SliceableUnfilteredRowIterator iter)
+        {
+            return UnfilteredRowIterators.emptyIterator(iter.metadata(), iter.partitionKey(), iter.isReverseOrder());
+        }
+
+        public Iterator<Slice> iterator()
+        {
+            return Iterators.emptyIterator();
+        }
+
+        @Override
+        public String toString()
+        {
+            return "NONE";
+        }
+
+        public String toCQLString(CFMetaData metadata)
+        {
+            return "";
+        }
+    }
+}
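
A note on the InOrderTester contract above: it assumes clusterings are tested in iteration order, which is what lets a single index sweep forward over the slices without ever restarting. The following standalone sketch (plain Java over int ranges, not Cassandra API; the RangeTester name and the ranges array are illustrative only) mirrors the InForwardOrderTester logic under that assumption:

    import java.util.Arrays;

    // Illustrative analogue of InForwardOrderTester: ranges are sorted, non-overlapping
    // [start, end] pairs, and values must be queried in non-decreasing order.
    public class RangeTester
    {
        private final int[][] ranges; // assumed sorted by start and non-overlapping
        private int idx;
        private boolean inRange;

        public RangeTester(int[][] ranges)
        {
            this.ranges = ranges;
        }

        public boolean includes(int value)
        {
            while (idx < ranges.length)
            {
                if (!inRange)
                {
                    if (value < ranges[idx][0]) // before the current range starts
                        return false;
                    inRange = true;
                }

                if (value <= ranges[idx][1])    // within the current range
                    return true;

                ++idx;                          // past the current range; try the next one
                inRange = false;
            }
            return false;
        }

        public static void main(String[] args)
        {
            RangeTester tester = new RangeTester(new int[][]{ {0, 4}, {8, 9} });
            System.out.println(Arrays.asList(tester.includes(2),   // true
                                             tester.includes(6),   // false
                                             tester.includes(8))); // true
        }
    }

Because the state (idx, inRange) only ever advances, each slice boundary is compared at most a constant number of times across a whole iteration, which is the same property the reverse tester provides for reversed iteration.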
