Repository: cassandra Updated Branches: refs/heads/trunk 79f4bdb20 -> 220f253fb
Fix bug with range tombstone in reverse queries and Partition test coverage patch by blambov; reviewed by slebresne for CASSANDRA-10059 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/1e978df3 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/1e978df3 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/1e978df3 Branch: refs/heads/trunk Commit: 1e978df3a80a8201ce95253e3fd822fd804993d7 Parents: 814bc70 Author: Branimir Lambov <branimir.lam...@datastax.com> Authored: Wed Nov 11 10:43:54 2015 +0200 Committer: Sylvain Lebresne <sylv...@datastax.com> Committed: Fri Dec 4 10:08:51 2015 +0100 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../apache/cassandra/db/RangeTombstoneList.java | 47 +- .../apache/cassandra/utils/SearchIterator.java | 3 +- test/unit/org/apache/cassandra/Util.java | 23 + .../partition/PartitionImplementationTest.java | 524 +++++++++++++++++++ 5 files changed, 566 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e978df3/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index f6aed18..2527c43 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,6 @@ 3.0.1 + * Fix bug with range tombstones on reverse queries and test coverage for + AbstractBTreePartition (CASSANDRA-10059) * Remove 64k limit on collection elements (CASSANDRA-10374) * Remove unclear Indexer.indexes() method (CASSANDRA-10690) * Fix NPE on stream read error (CASSANDRA-10771) http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e978df3/src/java/org/apache/cassandra/db/RangeTombstoneList.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/RangeTombstoneList.java b/src/java/org/apache/cassandra/db/RangeTombstoneList.java index c92a296..c67ea33 100644 --- a/src/java/org/apache/cassandra/db/RangeTombstoneList.java +++ b/src/java/org/apache/cassandra/db/RangeTombstoneList.java @@ -17,21 +17,16 @@ */ package org.apache.cassandra.db; -import java.io.IOException; import java.nio.ByteBuffer; import java.util.Arrays; +import java.util.Collections; import java.util.Iterator; import org.apache.cassandra.utils.AbstractIterator; import com.google.common.collect.Iterators; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.cassandra.cache.IMeasurableMemory; import org.apache.cassandra.db.rows.*; -import org.apache.cassandra.io.IVersionedSerializer; -import org.apache.cassandra.io.util.DataInputPlus; -import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.utils.ObjectSizes; import org.apache.cassandra.utils.memory.AbstractAllocator; @@ -53,8 +48,6 @@ import org.apache.cassandra.utils.memory.AbstractAllocator; */ public class RangeTombstoneList implements Iterable<RangeTombstone>, IMeasurableMemory { - private static final Logger logger = LoggerFactory.getLogger(RangeTombstoneList.class); - private static long EMPTY_SIZE = ObjectSizes.measure(new RangeTombstoneList(null, 0)); private final ClusteringComparator comparator; @@ -265,6 +258,8 @@ public class RangeTombstoneList implements Iterable<RangeTombstone>, IMeasurable /* * Return is the index of the range covering name if name is covered. If the return idx is negative, * no range cover name and -idx-1 is the index of the first range whose start is greater than name. + * + * Note that bounds are not in the range if they fall on its boundary. */ private int searchInternal(ClusteringPrefix name, int startIdx, int endIdx) { @@ -274,7 +269,9 @@ public class RangeTombstoneList implements Iterable<RangeTombstone>, IMeasurable int pos = Arrays.binarySearch(starts, startIdx, endIdx, name, comparator); if (pos >= 0) { - return pos; + // Equality only happens for bounds (as used by forward/reverseIterator), and bounds are equal only if they + // are the same or complementary, in either case the bound itself is not part of the range. + return -pos - 1; } else { @@ -283,7 +280,7 @@ public class RangeTombstoneList implements Iterable<RangeTombstone>, IMeasurable if (idx < 0) return -1; - return comparator.compare(name, ends[idx]) <= 0 ? idx : -idx-2; + return comparator.compare(name, ends[idx]) < 0 ? idx : -idx-2; } } @@ -387,14 +384,14 @@ public class RangeTombstoneList implements Iterable<RangeTombstone>, IMeasurable final int start = startIdx < 0 ? -startIdx-1 : startIdx; if (start >= size) - return Iterators.<RangeTombstone>emptyIterator(); + return Collections.emptyIterator(); int finishIdx = slice.end() == Slice.Bound.TOP ? size - 1 : searchInternal(slice.end(), start, size); // if stopIdx is the first range after 'slice.end()' we care only until the previous range final int finish = finishIdx < 0 ? -finishIdx-2 : finishIdx; if (start > finish) - return Iterators.<RangeTombstone>emptyIterator(); + return Collections.emptyIterator(); if (start == finish) { @@ -428,19 +425,19 @@ public class RangeTombstoneList implements Iterable<RangeTombstone>, IMeasurable private Iterator<RangeTombstone> reverseIterator(final Slice slice) { - int startIdx = slice.end() == Slice.Bound.TOP ? 0 : searchInternal(slice.end(), 0, size); + int startIdx = slice.end() == Slice.Bound.TOP ? size - 1 : searchInternal(slice.end(), 0, size); // if startIdx is the first range after 'slice.end()' we care only until the previous range final int start = startIdx < 0 ? -startIdx-2 : startIdx; - if (start >= size) - return Iterators.<RangeTombstone>emptyIterator(); + if (start < 0) + return Collections.emptyIterator(); - int finishIdx = slice.start() == Slice.Bound.BOTTOM ? 0 : searchInternal(slice.start(), 0, start); + int finishIdx = slice.start() == Slice.Bound.BOTTOM ? 0 : searchInternal(slice.start(), 0, start + 1); // include same as finish // if stopIdx is the first range after 'slice.end()' we care only until the previous range final int finish = finishIdx < 0 ? -finishIdx-1 : finishIdx; if (start < finish) - return Iterators.<RangeTombstone>emptyIterator(); + return Collections.emptyIterator(); if (start == finish) { @@ -467,7 +464,7 @@ public class RangeTombstoneList implements Iterable<RangeTombstone>, IMeasurable return rangeTombstoneWithNewEnd(idx--, slice.end()); if (idx == finish && comparator.compare(starts[idx], slice.start()) < 0) return rangeTombstoneWithNewStart(idx--, slice.start()); - return rangeTombstone(idx++); + return rangeTombstone(idx--); } }; } @@ -665,20 +662,6 @@ public class RangeTombstoneList implements Iterable<RangeTombstone>, IMeasurable size++; } - private void removeInternal(int i) - { - assert i >= 0; - - System.arraycopy(starts, i+1, starts, i, size - i - 1); - System.arraycopy(ends, i+1, ends, i, size - i - 1); - System.arraycopy(markedAts, i+1, markedAts, i, size - i - 1); - System.arraycopy(delTimes, i+1, delTimes, i, size - i - 1); - - --size; - starts[size] = null; - ends[size] = null; - } - /* * Grow the arrays, leaving index i "free" in the process. */ http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e978df3/src/java/org/apache/cassandra/utils/SearchIterator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/SearchIterator.java b/src/java/org/apache/cassandra/utils/SearchIterator.java index ca7b2fa..5309f4a 100644 --- a/src/java/org/apache/cassandra/utils/SearchIterator.java +++ b/src/java/org/apache/cassandra/utils/SearchIterator.java @@ -23,7 +23,8 @@ public interface SearchIterator<K, V> /** * Searches "forwards" (in direction of travel) in the iterator for the required key; - * if this or any key greater has already been returned by the iterator, null will be returned. + * if this or any key greater has already been returned by the iterator, the method may + * choose to return null, the correct or incorrect output, or fail an assertion. * * it is permitted to search past the end of the iterator, i.e. !hasNext() => next(?) == null * http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e978df3/test/unit/org/apache/cassandra/Util.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/Util.java b/test/unit/org/apache/cassandra/Util.java index ea0bd9b..91ec6b6 100644 --- a/test/unit/org/apache/cassandra/Util.java +++ b/test/unit/org/apache/cassandra/Util.java @@ -538,4 +538,27 @@ public class Util { return () -> new AssertionError(message); } + + public static class UnfilteredSource extends AbstractUnfilteredRowIterator implements UnfilteredRowIterator + { + Iterator<Unfiltered> content; + + public UnfilteredSource(CFMetaData cfm, DecoratedKey partitionKey, Row staticRow, Iterator<Unfiltered> content) + { + super(cfm, + partitionKey, + DeletionTime.LIVE, + cfm.partitionColumns(), + staticRow != null ? staticRow : Rows.EMPTY_STATIC_ROW, + false, + EncodingStats.NO_STATS); + this.content = content; + } + + @Override + protected Unfiltered computeNext() + { + return content.hasNext() ? content.next() : endOfData(); + } + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e978df3/test/unit/org/apache/cassandra/db/partition/PartitionImplementationTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/partition/PartitionImplementationTest.java b/test/unit/org/apache/cassandra/db/partition/PartitionImplementationTest.java new file mode 100644 index 0000000..f215331 --- /dev/null +++ b/test/unit/org/apache/cassandra/db/partition/PartitionImplementationTest.java @@ -0,0 +1,524 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.partition; + +import static org.junit.Assert.*; + +import java.util.*; +import java.util.function.Function; +import java.util.function.Predicate; +import java.util.function.Supplier; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; + +import com.google.common.collect.Iterables; +import com.google.common.collect.Iterators; + +import org.junit.BeforeClass; +import org.junit.Test; + +import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.Util; +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.ColumnDefinition; +import org.apache.cassandra.cql3.ColumnIdentifier; +import org.apache.cassandra.db.*; +import org.apache.cassandra.db.Slice.Bound; +import org.apache.cassandra.db.filter.ColumnFilter; +import org.apache.cassandra.db.marshal.AsciiType; +import org.apache.cassandra.db.partitions.AbstractBTreePartition; +import org.apache.cassandra.db.partitions.ImmutableBTreePartition; +import org.apache.cassandra.db.partitions.Partition; +import org.apache.cassandra.db.rows.*; +import org.apache.cassandra.db.rows.Row.Deletion; +import org.apache.cassandra.exceptions.ConfigurationException; +import org.apache.cassandra.schema.KeyspaceParams; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.SearchIterator; + +public class PartitionImplementationTest +{ + private static final String KEYSPACE = "PartitionImplementationTest"; + private static final String CF = "Standard"; + + private static final int ENTRIES = 250; + private static final int TESTS = 1000; + private static final int KEY_RANGE = ENTRIES * 5; + + private static final int TIMESTAMP = KEY_RANGE + 1; + + private static CFMetaData cfm; + private Random rand = new Random(2); + + @BeforeClass + public static void defineSchema() throws ConfigurationException + { + SchemaLoader.prepareServer(); + + cfm = CFMetaData.Builder.create(KEYSPACE, CF) + .addPartitionKey("pk", AsciiType.instance) + .addClusteringColumn("ck", AsciiType.instance) + .addRegularColumn("col", AsciiType.instance) + .addStaticColumn("static_col", AsciiType.instance) + .build(); + SchemaLoader.createKeyspace(KEYSPACE, + KeyspaceParams.simple(1), + cfm); + } + + private List<Row> generateRows() + { + List<Row> content = new ArrayList<>(); + Set<Integer> keysUsed = new HashSet<>(); + for (int i = 0; i < ENTRIES; ++i) + { + int rk; + do + { + rk = rand.nextInt(KEY_RANGE); + } + while (!keysUsed.add(rk)); + content.add(makeRow(clustering(rk), "Col" + rk)); + } + return content; // not sorted + } + + Row makeRow(Clustering clustering, String colValue) + { + ColumnDefinition defCol = cfm.getColumnDefinition(new ColumnIdentifier("col", true)); + Row.Builder row = BTreeRow.unsortedBuilder(TIMESTAMP); + row.newRow(clustering); + row.addCell(BufferCell.live(cfm, defCol, TIMESTAMP, ByteBufferUtil.bytes(colValue))); + return row.build(); + } + + Row makeStaticRow() + { + ColumnDefinition defCol = cfm.getColumnDefinition(new ColumnIdentifier("static_col", true)); + Row.Builder row = BTreeRow.unsortedBuilder(TIMESTAMP); + row.newRow(Clustering.STATIC_CLUSTERING); + row.addCell(BufferCell.live(cfm, defCol, TIMESTAMP, ByteBufferUtil.bytes("static value"))); + return row.build(); + } + + private List<Unfiltered> generateMarkersOnly() + { + return addMarkers(new ArrayList<>()); + } + + private List<Unfiltered> generateUnfiltereds() + { + List<Unfiltered> content = new ArrayList<>(generateRows()); + return addMarkers(content); + } + + List<Unfiltered> addMarkers(List<Unfiltered> content) + { + List<RangeTombstoneMarker> markers = new ArrayList<>(); + Set<Integer> delTimes = new HashSet<>(); + for (int i = 0; i < ENTRIES / 10; ++i) + { + int delTime; + do + { + delTime = rand.nextInt(KEY_RANGE); + } + while (!delTimes.add(delTime)); + + int start = rand.nextInt(KEY_RANGE); + DeletionTime dt = new DeletionTime(delTime, delTime); + RangeTombstoneMarker open = RangeTombstoneBoundMarker.inclusiveOpen(false, clustering(start).getRawValues(), dt); + int end = start + rand.nextInt((KEY_RANGE - start) / 4 + 1); + RangeTombstoneMarker close = RangeTombstoneBoundMarker.inclusiveClose(false, clustering(end).getRawValues(), dt); + markers.add(open); + markers.add(close); + } + markers.sort(cfm.comparator); + + RangeTombstoneMarker toAdd = null; + Set<DeletionTime> open = new HashSet<>(); + DeletionTime current = DeletionTime.LIVE; + for (RangeTombstoneMarker marker : markers) + { + if (marker.isOpen(false)) + { + DeletionTime delTime = marker.openDeletionTime(false); + open.add(delTime); + if (delTime.supersedes(current)) + { + if (toAdd != null) + { + if (cfm.comparator.compare(toAdd, marker) != 0) + content.add(toAdd); + else + { + // gotta join + current = toAdd.isClose(false) ? toAdd.closeDeletionTime(false) : DeletionTime.LIVE; + } + } + if (current != DeletionTime.LIVE) + marker = RangeTombstoneBoundaryMarker.makeBoundary(false, marker.openBound(false).invert(), marker.openBound(false), current, delTime); + toAdd = marker; + current = delTime; + } + } + else + { + assert marker.isClose(false); + DeletionTime delTime = marker.closeDeletionTime(false); + boolean removed = open.remove(delTime); + assert removed; + if (current.equals(delTime)) + { + if (toAdd != null) + { + if (cfm.comparator.compare(toAdd, marker) != 0) + content.add(toAdd); + else + { + // gotta join + current = toAdd.closeDeletionTime(false); + marker = new RangeTombstoneBoundMarker(marker.closeBound(false), current); + } + } + DeletionTime best = open.stream().max(DeletionTime::compareTo).orElse(DeletionTime.LIVE); + if (best != DeletionTime.LIVE) + marker = RangeTombstoneBoundaryMarker.makeBoundary(false, marker.closeBound(false), marker.closeBound(false).invert(), current, best); + toAdd = marker; + current = best; + } + } + } + content.add(toAdd); + assert current == DeletionTime.LIVE; + assert open.isEmpty(); + return content; + } + + private Clustering clustering(int i) + { + return cfm.comparator.make(String.format("Row%06d", i)); + } + + private void test(Supplier<Collection<? extends Unfiltered>> content, Row staticRow) + { + for (int i = 0; i<TESTS; ++i) + { + try + { + rand = new Random(i); + testIter(content, staticRow); + } + catch (Throwable t) + { + throw new AssertionError("Test failed with seed " + i, t); + } + } + } + + private void testIter(Supplier<Collection<? extends Unfiltered>> contentSupplier, Row staticRow) + { + NavigableSet<Clusterable> sortedContent = new TreeSet<Clusterable>(cfm.comparator); + sortedContent.addAll(contentSupplier.get()); + AbstractBTreePartition partition; + try (UnfilteredRowIterator iter = new Util.UnfilteredSource(cfm, Util.dk("pk"), staticRow, sortedContent.stream().map(x -> (Unfiltered) x).iterator())) + { + partition = ImmutableBTreePartition.create(iter); + } + + ColumnDefinition defCol = cfm.getColumnDefinition(new ColumnIdentifier("col", true)); + ColumnFilter cf = ColumnFilter.selectionBuilder().add(defCol).build(); + Function<? super Clusterable, ? extends Clusterable> colFilter = x -> x instanceof Row ? ((Row) x).filter(cf, cfm) : x; + Slices slices = Slices.with(cfm.comparator, Slice.make(clustering(KEY_RANGE / 4), clustering(KEY_RANGE * 3 / 4))); + Slices multiSlices = makeSlices(); + + // lastRow + assertRowsEqual((Row) get(sortedContent.descendingSet(), x -> x instanceof Row), + partition.lastRow()); + // get(static) + assertRowsEqual(staticRow, + partition.getRow(Clustering.STATIC_CLUSTERING)); + + // get + for (int i=0; i < KEY_RANGE; ++i) + { + Clustering cl = clustering(i); + assertRowsEqual(getRow(sortedContent, cl), + partition.getRow(cl)); + } + // isEmpty + assertEquals(sortedContent.isEmpty() && staticRow == null, + partition.isEmpty()); + // hasRows + assertEquals(sortedContent.stream().anyMatch(x -> x instanceof Row), + partition.hasRows()); + + // iterator + assertIteratorsEqual(sortedContent.stream().filter(x -> x instanceof Row).iterator(), + partition.iterator()); + + // unfiltered iterator + assertIteratorsEqual(sortedContent.iterator(), + partition.unfilteredIterator()); + + // unfiltered iterator + assertIteratorsEqual(sortedContent.iterator(), + partition.unfilteredIterator(ColumnFilter.all(cfm), Slices.ALL, false)); + // column-filtered + assertIteratorsEqual(sortedContent.stream().map(colFilter).iterator(), + partition.unfilteredIterator(cf, Slices.ALL, false)); + // sliced + assertIteratorsEqual(slice(sortedContent, slices.get(0)), + partition.unfilteredIterator(ColumnFilter.all(cfm), slices, false)); + assertIteratorsEqual(streamOf(slice(sortedContent, slices.get(0))).map(colFilter).iterator(), + partition.unfilteredIterator(cf, slices, false)); + // randomly multi-sliced + assertIteratorsEqual(slice(sortedContent, multiSlices), + partition.unfilteredIterator(ColumnFilter.all(cfm), multiSlices, false)); + assertIteratorsEqual(streamOf(slice(sortedContent, multiSlices)).map(colFilter).iterator(), + partition.unfilteredIterator(cf, multiSlices, false)); + // reversed + assertIteratorsEqual(sortedContent.descendingIterator(), + partition.unfilteredIterator(ColumnFilter.all(cfm), Slices.ALL, true)); + assertIteratorsEqual(sortedContent.descendingSet().stream().map(colFilter).iterator(), + partition.unfilteredIterator(cf, Slices.ALL, true)); + assertIteratorsEqual(invert(slice(sortedContent, slices.get(0))), + partition.unfilteredIterator(ColumnFilter.all(cfm), slices, true)); + assertIteratorsEqual(streamOf(invert(slice(sortedContent, slices.get(0)))).map(colFilter).iterator(), + partition.unfilteredIterator(cf, slices, true)); + assertIteratorsEqual(invert(slice(sortedContent, multiSlices)), + partition.unfilteredIterator(ColumnFilter.all(cfm), multiSlices, true)); + assertIteratorsEqual(streamOf(invert(slice(sortedContent, multiSlices))).map(colFilter).iterator(), + partition.unfilteredIterator(cf, multiSlices, true)); + + // search iterator + testSearchIterator(sortedContent, partition, ColumnFilter.all(cfm), false); + testSearchIterator(sortedContent, partition, cf, false); + testSearchIterator(sortedContent, partition, ColumnFilter.all(cfm), true); + testSearchIterator(sortedContent, partition, cf, true); + + // sliceable iter + testSliceableIterator(sortedContent, partition, ColumnFilter.all(cfm), false); + testSliceableIterator(sortedContent, partition, cf, false); + testSliceableIterator(sortedContent, partition, ColumnFilter.all(cfm), true); + testSliceableIterator(sortedContent, partition, cf, true); + } + + void testSearchIterator(NavigableSet<Clusterable> sortedContent, Partition partition, ColumnFilter cf, boolean reversed) + { + SearchIterator<Clustering, Row> searchIter = partition.searchIterator(cf, reversed); + int pos = reversed ? KEY_RANGE : 0; + int mul = reversed ? -1 : 1; + boolean started = false; + while (searchIter.hasNext()) + { + int skip = rand.nextInt(KEY_RANGE / 10); + pos += skip * mul; + Clustering cl = clustering(pos); + Row row = searchIter.next(cl); // returns row with deletion, incl. empty row with deletion + if (row == null && skip == 0 && started) // allowed to return null if already reported row + continue; + started = true; + Row expected = getRow(sortedContent, cl); + assertEquals(expected == null, row == null); + if (row == null) + continue; + assertRowsEqual(expected.filter(cf, cfm), row); + } + } + + Slices makeSlices() + { + int pos = 0; + Slices.Builder builder = new Slices.Builder(cfm.comparator); + while (pos <= KEY_RANGE) + { + int skip = rand.nextInt(KEY_RANGE / 10) * (rand.nextInt(3) + 2 / 3); // increased chance of getting 0 + pos += skip; + int sz = rand.nextInt(KEY_RANGE / 10) + (skip == 0 ? 1 : 0); // if start is exclusive need at least sz 1 + Clustering start = clustering(pos); + pos += sz; + Clustering end = clustering(pos); + Slice slice = Slice.make(skip == 0 ? Bound.exclusiveStartOf(start) : Bound.inclusiveStartOf(start), Bound.inclusiveEndOf(end)); + builder.add(slice); + } + return builder.build(); + } + + void testSliceableIterator(NavigableSet<Clusterable> sortedContent, AbstractBTreePartition partition, ColumnFilter cf, boolean reversed) + { + Function<? super Clusterable, ? extends Clusterable> colFilter = x -> x instanceof Row ? ((Row) x).filter(cf, cfm) : x; + Slices slices = makeSlices(); + try (SliceableUnfilteredRowIterator sliceableIter = partition.sliceableUnfilteredIterator(cf, reversed)) + { + for (Slice slice : (Iterable<Slice>) () -> directed(slices, reversed)) + assertIteratorsEqual(streamOf(directed(slice(sortedContent, slice), reversed)).map(colFilter).iterator(), + sliceableIter.slice(slice)); + } + + // Try using sliceable as unfiltered iterator + try (SliceableUnfilteredRowIterator sliceableIter = partition.sliceableUnfilteredIterator(cf, reversed)) + { + assertIteratorsEqual((reversed ? sortedContent.descendingSet() : sortedContent). + stream().map(colFilter).iterator(), + sliceableIter); + } + } + + private<T> Iterator<T> invert(Iterator<T> slice) + { + Deque<T> dest = new LinkedList<>(); + Iterators.addAll(dest, slice); + return dest.descendingIterator(); + } + + private Iterator<Clusterable> slice(NavigableSet<Clusterable> sortedContent, Slices slices) + { + return Iterators.concat(streamOf(slices).map(slice -> slice(sortedContent, slice)).iterator()); + } + + private Iterator<Clusterable> slice(NavigableSet<Clusterable> sortedContent, Slice slice) + { + // Slice bounds are inclusive bounds, equal only to markers. Matched markers should be returned as one-sided boundaries. + RangeTombstoneMarker prev = (RangeTombstoneMarker) sortedContent.headSet(slice.start(), true).descendingSet().stream().filter(x -> x instanceof RangeTombstoneMarker).findFirst().orElse(null); + RangeTombstoneMarker next = (RangeTombstoneMarker) sortedContent.tailSet(slice.end(), true).stream().filter(x -> x instanceof RangeTombstoneMarker).findFirst().orElse(null); + Iterator<Clusterable> result = sortedContent.subSet(slice.start(), false, slice.end(), false).iterator(); + if (prev != null && prev.isOpen(false)) + result = Iterators.concat(Iterators.singletonIterator(new RangeTombstoneBoundMarker(slice.start(), prev.openDeletionTime(false))), result); + if (next != null && next.isClose(false)) + result = Iterators.concat(result, Iterators.singletonIterator(new RangeTombstoneBoundMarker(slice.end(), next.closeDeletionTime(false)))); + return result; + } + + private Iterator<Slice> directed(Slices slices, boolean reversed) + { + return directed(slices.iterator(), reversed); + } + + private <T> Iterator<T> directed(Iterator<T> iter, boolean reversed) + { + if (!reversed) + return iter; + return invert(iter); + } + + private <T> Stream<T> streamOf(Iterator<T> iterator) + { + Iterable<T> iterable = () -> iterator; + return streamOf(iterable); + } + + <T> Stream<T> streamOf(Iterable<T> iterable) + { + return StreamSupport.stream(iterable.spliterator(), false); + } + + private void assertIteratorsEqual(Iterator<? extends Clusterable> it1, Iterator<? extends Clusterable> it2) + { + Clusterable[] a1 = (Clusterable[]) Iterators.toArray(it1, Clusterable.class); + Clusterable[] a2 = (Clusterable[]) Iterators.toArray(it2, Clusterable.class); + if (Arrays.equals(a1, a2)) + return; + String a1s = Stream.of(a1).map(x -> "\n" + (x instanceof Unfiltered ? ((Unfiltered) x).toString(cfm) : x.toString())).collect(Collectors.toList()).toString(); + String a2s = Stream.of(a2).map(x -> "\n" + (x instanceof Unfiltered ? ((Unfiltered) x).toString(cfm) : x.toString())).collect(Collectors.toList()).toString(); + assertArrayEquals("Arrays differ. Expected " + a1s + " was " + a2s, a1, a2); + } + + private Row getRow(NavigableSet<Clusterable> sortedContent, Clustering cl) + { + NavigableSet<Clusterable> nexts = sortedContent.tailSet(cl, true); + if (nexts.isEmpty()) + return null; + Row row = nexts.first() instanceof Row && cfm.comparator.compare(cl, nexts.first()) == 0 ? (Row) nexts.first() : null; + for (Clusterable next : nexts) + if (next instanceof RangeTombstoneMarker) + { + RangeTombstoneMarker rt = (RangeTombstoneMarker) next; + if (!rt.isClose(false)) + return row; + DeletionTime delTime = rt.closeDeletionTime(false); + return row == null ? BTreeRow.emptyDeletedRow(cl, Deletion.regular(delTime)) : row.filter(ColumnFilter.all(cfm), delTime, true, cfm); + } + return row; + } + + private void assertRowsEqual(Row expected, Row actual) + { + try + { + assertEquals(expected == null, actual == null); + if (expected == null) + return; + assertEquals(expected.clustering(), actual.clustering()); + assertEquals(expected.deletion(), actual.deletion()); + assertArrayEquals(Iterables.toArray(expected.cells(), Cell.class), Iterables.toArray(expected.cells(), Cell.class)); + } catch (Throwable t) + { + throw new AssertionError(String.format("Row comparison failed, expected %s got %s", expected, actual), t); + } + } + + private static<T> T get(NavigableSet<T> sortedContent, Predicate<T> test) + { + return sortedContent.stream().filter(test).findFirst().orElse(null); + } + + @Test + public void testEmpty() + { + test(() -> Collections.<Row>emptyList(), null); + } + + @Test + public void testStaticOnly() + { + test(() -> Collections.<Row>emptyList(), makeStaticRow()); + } + + @Test + public void testRows() + { + test(this::generateRows, null); + } + + @Test + public void testRowsWithStatic() + { + test(this::generateRows, makeStaticRow()); + } + + @Test + public void testMarkersOnly() + { + test(this::generateMarkersOnly, null); + } + + @Test + public void testMarkersWithStatic() + { + test(this::generateMarkersOnly, makeStaticRow()); + } + + @Test + public void testUnfiltereds() + { + test(this::generateUnfiltereds, makeStaticRow()); + } + +}