Repository: cassandra
Updated Branches:
  refs/heads/trunk 79f4bdb20 -> 220f253fb


Fix bug with range tombstones in reverse queries and add Partition test coverage

patch by blambov; reviewed by slebresne for CASSANDRA-10059


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/1e978df3
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/1e978df3
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/1e978df3

Branch: refs/heads/trunk
Commit: 1e978df3a80a8201ce95253e3fd822fd804993d7
Parents: 814bc70
Author: Branimir Lambov <branimir.lam...@datastax.com>
Authored: Wed Nov 11 10:43:54 2015 +0200
Committer: Sylvain Lebresne <sylv...@datastax.com>
Committed: Fri Dec 4 10:08:51 2015 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 .../apache/cassandra/db/RangeTombstoneList.java |  47 +-
 .../apache/cassandra/utils/SearchIterator.java  |   3 +-
 test/unit/org/apache/cassandra/Util.java        |  23 +
 .../partition/PartitionImplementationTest.java  | 524 +++++++++++++++++++
 5 files changed, 566 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e978df3/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index f6aed18..2527c43 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,6 @@
 3.0.1
+ * Fix bug with range tombstones on reverse queries and test coverage for
+   AbstractBTreePartition (CASSANDRA-10059)
  * Remove 64k limit on collection elements (CASSANDRA-10374)
  * Remove unclear Indexer.indexes() method (CASSANDRA-10690)
  * Fix NPE on stream read error (CASSANDRA-10771)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e978df3/src/java/org/apache/cassandra/db/RangeTombstoneList.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RangeTombstoneList.java b/src/java/org/apache/cassandra/db/RangeTombstoneList.java
index c92a296..c67ea33 100644
--- a/src/java/org/apache/cassandra/db/RangeTombstoneList.java
+++ b/src/java/org/apache/cassandra/db/RangeTombstoneList.java
@@ -17,21 +17,16 @@
  */
 package org.apache.cassandra.db;
 
-import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.Iterator;
 
 import org.apache.cassandra.utils.AbstractIterator;
 import com.google.common.collect.Iterators;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.cassandra.cache.IMeasurableMemory;
 import org.apache.cassandra.db.rows.*;
-import org.apache.cassandra.io.IVersionedSerializer;
-import org.apache.cassandra.io.util.DataInputPlus;
-import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.utils.ObjectSizes;
 import org.apache.cassandra.utils.memory.AbstractAllocator;
 
@@ -53,8 +48,6 @@ import org.apache.cassandra.utils.memory.AbstractAllocator;
  */
 public class RangeTombstoneList implements Iterable<RangeTombstone>, IMeasurableMemory
 {
-    private static final Logger logger = LoggerFactory.getLogger(RangeTombstoneList.class);
-
     private static long EMPTY_SIZE = ObjectSizes.measure(new RangeTombstoneList(null, 0));
 
     private final ClusteringComparator comparator;
@@ -265,6 +258,8 @@ public class RangeTombstoneList implements Iterable<RangeTombstone>, IMeasurable
     /*
      * Return is the index of the range covering name if name is covered. If the return idx is negative,
      * no range cover name and -idx-1 is the index of the first range whose start is greater than name.
+     *
+     * Note that bounds are not in the range if they fall on its boundary.
      */
     private int searchInternal(ClusteringPrefix name, int startIdx, int endIdx)
     {
@@ -274,7 +269,9 @@ public class RangeTombstoneList implements Iterable<RangeTombstone>, IMeasurable
         int pos = Arrays.binarySearch(starts, startIdx, endIdx, name, comparator);
         if (pos >= 0)
         {
-            return pos;
+            // Equality only happens for bounds (as used by forward/reverseIterator), and bounds are equal only if they
+            // are the same or complementary, in either case the bound itself is not part of the range.
+            return -pos - 1;
         }
         else
         {
@@ -283,7 +280,7 @@ public class RangeTombstoneList implements Iterable<RangeTombstone>, IMeasurable
             if (idx < 0)
                 return -1;
 
-            return comparator.compare(name, ends[idx]) <= 0 ? idx : -idx-2;
+            return comparator.compare(name, ends[idx]) < 0 ? idx : -idx-2;
         }
     }
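
A note on the encoding used here: searchInternal follows the usual binary-search convention, so a
non-negative result is the index of a range covering the name, while a negative result r encodes
-r-1, the index of the first range whose start is greater than the name. A minimal, self-contained
sketch of how a caller would decode such a result (the class and strings below are illustrative
only, not part of RangeTombstoneList):

    // Hypothetical decoding helper -- not Cassandra code.
    public class SearchResultSketch
    {
        static String describe(int result)
        {
            if (result >= 0)
                return "covered by the range at index " + result;
            return "not covered; first range starting after the name is at index " + (-result - 1);
        }

        public static void main(String[] args)
        {
            System.out.println(describe(2));   // covered by the range at index 2
            System.out.println(describe(-3));  // not covered; ... at index 2
        }
    }

With the change above, a name equal to a stored bound now takes the "not covered" branch, since
equal bounds mark the boundary of a range rather than its interior.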
 
@@ -387,14 +384,14 @@ public class RangeTombstoneList implements Iterable<RangeTombstone>, IMeasurable
         final int start = startIdx < 0 ? -startIdx-1 : startIdx;
 
         if (start >= size)
-            return Iterators.<RangeTombstone>emptyIterator();
+            return Collections.emptyIterator();
 
         int finishIdx = slice.end() == Slice.Bound.TOP ? size - 1 : searchInternal(slice.end(), start, size);
         // if stopIdx is the first range after 'slice.end()' we care only until the previous range
         final int finish = finishIdx < 0 ? -finishIdx-2 : finishIdx;
 
         if (start > finish)
-            return Iterators.<RangeTombstone>emptyIterator();
+            return Collections.emptyIterator();
 
         if (start == finish)
         {
@@ -428,19 +425,19 @@ public class RangeTombstoneList implements Iterable<RangeTombstone>, IMeasurable
 
     private Iterator<RangeTombstone> reverseIterator(final Slice slice)
     {
-        int startIdx = slice.end() == Slice.Bound.TOP ? 0 : searchInternal(slice.end(), 0, size);
+        int startIdx = slice.end() == Slice.Bound.TOP ? size - 1 : searchInternal(slice.end(), 0, size);
         // if startIdx is the first range after 'slice.end()' we care only until the previous range
         final int start = startIdx < 0 ? -startIdx-2 : startIdx;
 
-        if (start >= size)
-            return Iterators.<RangeTombstone>emptyIterator();
+        if (start < 0)
+            return Collections.emptyIterator();
 
-        int finishIdx = slice.start() == Slice.Bound.BOTTOM ? 0 : searchInternal(slice.start(), 0, start);
+        int finishIdx = slice.start() == Slice.Bound.BOTTOM ? 0 : searchInternal(slice.start(), 0, start + 1);  // include same as finish
         // if stopIdx is the first range after 'slice.end()' we care only until the previous range
         final int finish = finishIdx < 0 ? -finishIdx-1 : finishIdx;
 
         if (start < finish)
-            return Iterators.<RangeTombstone>emptyIterator();
+            return Collections.emptyIterator();
 
         if (start == finish)
         {
@@ -467,7 +464,7 @@ public class RangeTombstoneList implements Iterable<RangeTombstone>, IMeasurable
                     return rangeTombstoneWithNewEnd(idx--, slice.end());
                 if (idx == finish && comparator.compare(starts[idx], slice.start()) < 0)
                     return rangeTombstoneWithNewStart(idx--, slice.start());
-                return rangeTombstone(idx++);
+                return rangeTombstone(idx--);
             }
         };
     }
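
The net effect of the reverseIterator() changes: iteration now starts from the end-most range the
slice covers (size - 1 for Slice.Bound.TOP), the finish search includes a range starting exactly at
the slice start, and the iterator steps downwards with idx-- instead of idx++. A minimal,
self-contained sketch of that traversal direction over a plain list (names are illustrative, not
Cassandra APIs; start and finish are assumed to be inclusive indices):

    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;

    public class ReverseSliceSketch
    {
        // Reverse traversal between two inclusive indices of an ordered list,
        // mirroring the corrected direction of reverseIterator().
        static <T> Iterator<T> reverseSlice(List<T> items, int start, int finish)
        {
            return new Iterator<T>()
            {
                int idx = start;                                       // end-most covered index
                public boolean hasNext() { return idx >= finish; }
                public T next()          { return items.get(idx--); }  // step downwards
            };
        }

        public static void main(String[] args)
        {
            Iterator<String> it = reverseSlice(Arrays.asList("r0", "r1", "r2", "r3"), 2, 1);
            while (it.hasNext())
                System.out.println(it.next());                         // prints r2, then r1
        }
    }
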
@@ -665,20 +662,6 @@ public class RangeTombstoneList implements Iterable<RangeTombstone>, IMeasurable
         size++;
     }
 
-    private void removeInternal(int i)
-    {
-        assert i >= 0;
-
-        System.arraycopy(starts, i+1, starts, i, size - i - 1);
-        System.arraycopy(ends, i+1, ends, i, size - i - 1);
-        System.arraycopy(markedAts, i+1, markedAts, i, size - i - 1);
-        System.arraycopy(delTimes, i+1, delTimes, i, size - i - 1);
-
-        --size;
-        starts[size] = null;
-        ends[size] = null;
-    }
-
     /*
      * Grow the arrays, leaving index i "free" in the process.
      */

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e978df3/src/java/org/apache/cassandra/utils/SearchIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/SearchIterator.java b/src/java/org/apache/cassandra/utils/SearchIterator.java
index ca7b2fa..5309f4a 100644
--- a/src/java/org/apache/cassandra/utils/SearchIterator.java
+++ b/src/java/org/apache/cassandra/utils/SearchIterator.java
@@ -23,7 +23,8 @@ public interface SearchIterator<K, V>
 
     /**
      * Searches "forwards" (in direction of travel) in the iterator for the 
required key;
-     * if this or any key greater has already been returned by the iterator, 
null will be returned.
+     * if this or any key greater has already been returned by the iterator, 
the method may
+     * choose to return null, the correct or incorrect output, or fail an 
assertion.
      *
      * it is permitted to search past the end of the iterator, i.e. !hasNext() 
=> next(?) == null
      *
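
In practice the relaxed contract means callers must only ask for keys in the iterator's direction
of travel; revisiting an earlier key is no longer guaranteed to return null. A small usage sketch
under that assumption (only hasNext() and next(key) come from the interface; everything else is
illustrative):

    import java.util.List;

    import org.apache.cassandra.utils.SearchIterator;

    public class SearchIteratorUsageSketch
    {
        // Keys are supplied strictly in iteration order and never revisited.
        public static <K, V> int countPresent(SearchIterator<K, V> iter, List<K> keysInTravelOrder)
        {
            int found = 0;
            for (K key : keysInTravelOrder)
            {
                if (!iter.hasNext())
                    break;
                if (iter.next(key) != null)   // null when the key is absent
                    ++found;
            }
            return found;
        }
    }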

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e978df3/test/unit/org/apache/cassandra/Util.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/Util.java b/test/unit/org/apache/cassandra/Util.java
index ea0bd9b..91ec6b6 100644
--- a/test/unit/org/apache/cassandra/Util.java
+++ b/test/unit/org/apache/cassandra/Util.java
@@ -538,4 +538,27 @@ public class Util
     {
         return () -> new AssertionError(message);
     }
+
+    public static class UnfilteredSource extends AbstractUnfilteredRowIterator implements UnfilteredRowIterator
+    {
+        Iterator<Unfiltered> content;
+
+        public UnfilteredSource(CFMetaData cfm, DecoratedKey partitionKey, Row staticRow, Iterator<Unfiltered> content)
+        {
+            super(cfm,
+                  partitionKey,
+                  DeletionTime.LIVE,
+                  cfm.partitionColumns(),
+                  staticRow != null ? staticRow : Rows.EMPTY_STATIC_ROW,
+                  false,
+                  EncodingStats.NO_STATS);
+            this.content = content;
+        }
+
+        @Override
+        protected Unfiltered computeNext()
+        {
+            return content.hasNext() ? content.next() : endOfData();
+        }
+    }
 }
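
The helper is exercised further down in PartitionImplementationTest; the essential pattern, taken
from testIter() below (sortedContent is any clustering-ordered collection of Unfiltereds), is:

    // Wrap ordered test content as an UnfilteredRowIterator and build a partition from it.
    try (UnfilteredRowIterator iter = new Util.UnfilteredSource(cfm, Util.dk("pk"), staticRow,
                                                                sortedContent.stream().map(x -> (Unfiltered) x).iterator()))
    {
        partition = ImmutableBTreePartition.create(iter);
    }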

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e978df3/test/unit/org/apache/cassandra/db/partition/PartitionImplementationTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/partition/PartitionImplementationTest.java b/test/unit/org/apache/cassandra/db/partition/PartitionImplementationTest.java
new file mode 100644
index 0000000..f215331
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/partition/PartitionImplementationTest.java
@@ -0,0 +1,524 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.partition;
+
+import static org.junit.Assert.*;
+
+import java.util.*;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Iterators;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.cql3.ColumnIdentifier;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.Slice.Bound;
+import org.apache.cassandra.db.filter.ColumnFilter;
+import org.apache.cassandra.db.marshal.AsciiType;
+import org.apache.cassandra.db.partitions.AbstractBTreePartition;
+import org.apache.cassandra.db.partitions.ImmutableBTreePartition;
+import org.apache.cassandra.db.partitions.Partition;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.rows.Row.Deletion;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.SearchIterator;
+
+public class PartitionImplementationTest
+{
+    private static final String KEYSPACE = "PartitionImplementationTest";
+    private static final String CF = "Standard";
+
+    private static final int ENTRIES = 250;
+    private static final int TESTS = 1000;
+    private static final int KEY_RANGE = ENTRIES * 5;
+
+    private static final int TIMESTAMP = KEY_RANGE + 1;
+
+    private static CFMetaData cfm;
+    private Random rand = new Random(2);
+
+    @BeforeClass
+    public static void defineSchema() throws ConfigurationException
+    {
+        SchemaLoader.prepareServer();
+
+        cfm = CFMetaData.Builder.create(KEYSPACE, CF)
+                                        .addPartitionKey("pk", AsciiType.instance)
+                                        .addClusteringColumn("ck", AsciiType.instance)
+                                        .addRegularColumn("col", AsciiType.instance)
+                                        .addStaticColumn("static_col", AsciiType.instance)
+                                        .build();
+        SchemaLoader.createKeyspace(KEYSPACE,
+                                    KeyspaceParams.simple(1),
+                                    cfm);
+    }
+
+    private List<Row> generateRows()
+    {
+        List<Row> content = new ArrayList<>();
+        Set<Integer> keysUsed = new HashSet<>();
+        for (int i = 0; i < ENTRIES; ++i)
+        {
+            int rk;
+            do
+            {
+                rk = rand.nextInt(KEY_RANGE);
+            }
+            while (!keysUsed.add(rk));
+            content.add(makeRow(clustering(rk), "Col" + rk));
+        }
+        return content; // not sorted
+    }
+
+    Row makeRow(Clustering clustering, String colValue)
+    {
+        ColumnDefinition defCol = cfm.getColumnDefinition(new ColumnIdentifier("col", true));
+        Row.Builder row = BTreeRow.unsortedBuilder(TIMESTAMP);
+        row.newRow(clustering);
+        row.addCell(BufferCell.live(cfm, defCol, TIMESTAMP, ByteBufferUtil.bytes(colValue)));
+        return row.build();
+    }
+
+    Row makeStaticRow()
+    {
+        ColumnDefinition defCol = cfm.getColumnDefinition(new ColumnIdentifier("static_col", true));
+        Row.Builder row = BTreeRow.unsortedBuilder(TIMESTAMP);
+        row.newRow(Clustering.STATIC_CLUSTERING);
+        row.addCell(BufferCell.live(cfm, defCol, TIMESTAMP, ByteBufferUtil.bytes("static value")));
+        return row.build();
+    }
+
+    private List<Unfiltered> generateMarkersOnly()
+    {
+        return addMarkers(new ArrayList<>());
+    }
+
+    private List<Unfiltered> generateUnfiltereds()
+    {
+        List<Unfiltered> content = new ArrayList<>(generateRows());
+        return addMarkers(content);
+    }
+
+    List<Unfiltered> addMarkers(List<Unfiltered> content)
+    {
+        List<RangeTombstoneMarker> markers = new ArrayList<>();
+        Set<Integer> delTimes = new HashSet<>();
+        for (int i = 0; i < ENTRIES / 10; ++i)
+        {
+            int delTime;
+            do
+            {
+                delTime = rand.nextInt(KEY_RANGE);
+            }
+            while (!delTimes.add(delTime));
+
+            int start = rand.nextInt(KEY_RANGE);
+            DeletionTime dt = new DeletionTime(delTime, delTime);
+            RangeTombstoneMarker open = RangeTombstoneBoundMarker.inclusiveOpen(false, clustering(start).getRawValues(), dt);
+            int end = start + rand.nextInt((KEY_RANGE - start) / 4 + 1);
+            RangeTombstoneMarker close = RangeTombstoneBoundMarker.inclusiveClose(false, clustering(end).getRawValues(), dt);
+            markers.add(open);
+            markers.add(close);
+        }
+        markers.sort(cfm.comparator);
+
+        RangeTombstoneMarker toAdd = null;
+        Set<DeletionTime> open = new HashSet<>();
+        DeletionTime current = DeletionTime.LIVE;
+        for (RangeTombstoneMarker marker : markers)
+        {
+            if (marker.isOpen(false))
+            {
+                DeletionTime delTime = marker.openDeletionTime(false);
+                open.add(delTime);
+                if (delTime.supersedes(current))
+                {
+                    if (toAdd != null)
+                    {
+                        if (cfm.comparator.compare(toAdd, marker) != 0)
+                            content.add(toAdd);
+                        else
+                        {
+                            // gotta join
+                            current = toAdd.isClose(false) ? toAdd.closeDeletionTime(false) : DeletionTime.LIVE;
+                        }
+                    }
+                    if (current != DeletionTime.LIVE)
+                        marker = RangeTombstoneBoundaryMarker.makeBoundary(false, marker.openBound(false).invert(), marker.openBound(false), current, delTime);
+                    toAdd = marker;
+                    current = delTime;
+                }
+            }
+            else
+            {
+                assert marker.isClose(false);
+                DeletionTime delTime = marker.closeDeletionTime(false);
+                boolean removed = open.remove(delTime);
+                assert removed;
+                if (current.equals(delTime))
+                {
+                    if (toAdd != null)
+                    {
+                        if (cfm.comparator.compare(toAdd, marker) != 0)
+                            content.add(toAdd);
+                        else
+                        {
+                            // gotta join
+                            current = toAdd.closeDeletionTime(false);
+                            marker = new RangeTombstoneBoundMarker(marker.closeBound(false), current);
+                        }
+                    }
+                    DeletionTime best = open.stream().max(DeletionTime::compareTo).orElse(DeletionTime.LIVE);
+                    if (best != DeletionTime.LIVE)
+                        marker = RangeTombstoneBoundaryMarker.makeBoundary(false, marker.closeBound(false), marker.closeBound(false).invert(), current, best);
+                    toAdd = marker;
+                    current = best;
+                }
+            }
+        }
+        content.add(toAdd);
+        assert current == DeletionTime.LIVE;
+        assert open.isEmpty();
+        return content;
+    }
+
+    private Clustering clustering(int i)
+    {
+        return cfm.comparator.make(String.format("Row%06d", i));
+    }
+
+    private void test(Supplier<Collection<? extends Unfiltered>> content, Row staticRow)
+    {
+        for (int i = 0; i<TESTS; ++i)
+        {
+            try
+            {
+                rand = new Random(i);
+                testIter(content, staticRow);
+            }
+            catch (Throwable t)
+            {
+                throw new AssertionError("Test failed with seed " + i, t);
+            }
+        }
+    }
+
+    private void testIter(Supplier<Collection<? extends Unfiltered>> contentSupplier, Row staticRow)
+    {
+        NavigableSet<Clusterable> sortedContent = new TreeSet<Clusterable>(cfm.comparator);
+        sortedContent.addAll(contentSupplier.get());
+        AbstractBTreePartition partition;
+        try (UnfilteredRowIterator iter = new Util.UnfilteredSource(cfm, Util.dk("pk"), staticRow, sortedContent.stream().map(x -> (Unfiltered) x).iterator()))
+        {
+            partition = ImmutableBTreePartition.create(iter);
+        }
+
+        ColumnDefinition defCol = cfm.getColumnDefinition(new ColumnIdentifier("col", true));
+        ColumnFilter cf = ColumnFilter.selectionBuilder().add(defCol).build();
+        Function<? super Clusterable, ? extends Clusterable> colFilter = x -> x instanceof Row ? ((Row) x).filter(cf, cfm) : x;
+        Slices slices = Slices.with(cfm.comparator, Slice.make(clustering(KEY_RANGE / 4), clustering(KEY_RANGE * 3 / 4)));
+        Slices multiSlices = makeSlices();
+
+        // lastRow
+        assertRowsEqual((Row) get(sortedContent.descendingSet(), x -> x instanceof Row),
+                        partition.lastRow());
+        // get(static)
+        assertRowsEqual(staticRow,
+                        partition.getRow(Clustering.STATIC_CLUSTERING));
+
+        // get
+        for (int i=0; i < KEY_RANGE; ++i)
+        {
+            Clustering cl = clustering(i);
+            assertRowsEqual(getRow(sortedContent, cl),
+                            partition.getRow(cl));
+        }
+        // isEmpty
+        assertEquals(sortedContent.isEmpty() && staticRow == null,
+                     partition.isEmpty());
+        // hasRows
+        assertEquals(sortedContent.stream().anyMatch(x -> x instanceof Row),
+                     partition.hasRows());
+
+        // iterator
+        assertIteratorsEqual(sortedContent.stream().filter(x -> x instanceof Row).iterator(),
+                             partition.iterator());
+
+        // unfiltered iterator
+        assertIteratorsEqual(sortedContent.iterator(),
+                             partition.unfilteredIterator());
+
+        // unfiltered iterator
+        assertIteratorsEqual(sortedContent.iterator(),
+                             partition.unfilteredIterator(ColumnFilter.all(cfm), Slices.ALL, false));
+        // column-filtered
+        assertIteratorsEqual(sortedContent.stream().map(colFilter).iterator(),
+                             partition.unfilteredIterator(cf, Slices.ALL, false));
+        // sliced
+        assertIteratorsEqual(slice(sortedContent, slices.get(0)),
+                             partition.unfilteredIterator(ColumnFilter.all(cfm), slices, false));
+        assertIteratorsEqual(streamOf(slice(sortedContent, slices.get(0))).map(colFilter).iterator(),
+                             partition.unfilteredIterator(cf, slices, false));
+        // randomly multi-sliced
+        assertIteratorsEqual(slice(sortedContent, multiSlices),
+                             partition.unfilteredIterator(ColumnFilter.all(cfm), multiSlices, false));
+        assertIteratorsEqual(streamOf(slice(sortedContent, multiSlices)).map(colFilter).iterator(),
+                             partition.unfilteredIterator(cf, multiSlices, false));
+        // reversed
+        assertIteratorsEqual(sortedContent.descendingIterator(),
+                             partition.unfilteredIterator(ColumnFilter.all(cfm), Slices.ALL, true));
+        assertIteratorsEqual(sortedContent.descendingSet().stream().map(colFilter).iterator(),
+                             partition.unfilteredIterator(cf, Slices.ALL, true));
+        assertIteratorsEqual(invert(slice(sortedContent, slices.get(0))),
+                             partition.unfilteredIterator(ColumnFilter.all(cfm), slices, true));
+        assertIteratorsEqual(streamOf(invert(slice(sortedContent, slices.get(0)))).map(colFilter).iterator(),
+                             partition.unfilteredIterator(cf, slices, true));
+        assertIteratorsEqual(invert(slice(sortedContent, multiSlices)),
+                             partition.unfilteredIterator(ColumnFilter.all(cfm), multiSlices, true));
+        assertIteratorsEqual(streamOf(invert(slice(sortedContent, multiSlices))).map(colFilter).iterator(),
+                             partition.unfilteredIterator(cf, multiSlices, true));
+
+        // search iterator
+        testSearchIterator(sortedContent, partition, ColumnFilter.all(cfm), false);
+        testSearchIterator(sortedContent, partition, cf, false);
+        testSearchIterator(sortedContent, partition, ColumnFilter.all(cfm), true);
+        testSearchIterator(sortedContent, partition, cf, true);
+
+        // sliceable iter
+        testSliceableIterator(sortedContent, partition, ColumnFilter.all(cfm), false);
+        testSliceableIterator(sortedContent, partition, cf, false);
+        testSliceableIterator(sortedContent, partition, ColumnFilter.all(cfm), true);
+        testSliceableIterator(sortedContent, partition, cf, true);
+    }
+
+    void testSearchIterator(NavigableSet<Clusterable> sortedContent, Partition partition, ColumnFilter cf, boolean reversed)
+    {
+        SearchIterator<Clustering, Row> searchIter = partition.searchIterator(cf, reversed);
+        int pos = reversed ? KEY_RANGE : 0;
+        int mul = reversed ? -1 : 1;
+        boolean started = false;
+        while (searchIter.hasNext())
+        {
+            int skip = rand.nextInt(KEY_RANGE / 10);
+            pos += skip * mul;
+            Clustering cl = clustering(pos);
+            Row row = searchIter.next(cl);  // returns row with deletion, incl. empty row with deletion
+            if (row == null && skip == 0 && started)    // allowed to return null if already reported row
+                continue;
+            started = true;
+            Row expected = getRow(sortedContent, cl);
+            assertEquals(expected == null, row == null);
+            if (row == null)
+                continue;
+            assertRowsEqual(expected.filter(cf, cfm), row);
+        }
+    }
+
+    Slices makeSlices()
+    {
+        int pos = 0;
+        Slices.Builder builder = new Slices.Builder(cfm.comparator);
+        while (pos <= KEY_RANGE)
+        {
+            int skip = rand.nextInt(KEY_RANGE / 10) * (rand.nextInt(3) + 2 / 3); // increased chance of getting 0
+            pos += skip;
+            int sz = rand.nextInt(KEY_RANGE / 10) + (skip == 0 ? 1 : 0);    // if start is exclusive need at least sz 1
+            Clustering start = clustering(pos);
+            pos += sz;
+            Clustering end = clustering(pos);
+            Slice slice = Slice.make(skip == 0 ? Bound.exclusiveStartOf(start) : Bound.inclusiveStartOf(start), Bound.inclusiveEndOf(end));
+            builder.add(slice);
+        }
+        return builder.build();
+    }
+
+    void testSliceableIterator(NavigableSet<Clusterable> sortedContent, AbstractBTreePartition partition, ColumnFilter cf, boolean reversed)
+    {
+        Function<? super Clusterable, ? extends Clusterable> colFilter = x -> x instanceof Row ? ((Row) x).filter(cf, cfm) : x;
+        Slices slices = makeSlices();
+        try (SliceableUnfilteredRowIterator sliceableIter = partition.sliceableUnfilteredIterator(cf, reversed))
+        {
+            for (Slice slice : (Iterable<Slice>) () -> directed(slices, reversed))
+                assertIteratorsEqual(streamOf(directed(slice(sortedContent, slice), reversed)).map(colFilter).iterator(),
+                                     sliceableIter.slice(slice));
+        }
+
+        // Try using sliceable as unfiltered iterator
+        try (SliceableUnfilteredRowIterator sliceableIter = partition.sliceableUnfilteredIterator(cf, reversed))
+        {
+            assertIteratorsEqual((reversed ? sortedContent.descendingSet() : sortedContent).
+                                     stream().map(colFilter).iterator(),
+                                 sliceableIter);
+        }
+    }
+
+    private<T> Iterator<T> invert(Iterator<T> slice)
+    {
+        Deque<T> dest = new LinkedList<>();
+        Iterators.addAll(dest, slice);
+        return dest.descendingIterator();
+    }
+
+    private Iterator<Clusterable> slice(NavigableSet<Clusterable> sortedContent, Slices slices)
+    {
+        return Iterators.concat(streamOf(slices).map(slice -> slice(sortedContent, slice)).iterator());
+    }
+
+    private Iterator<Clusterable> slice(NavigableSet<Clusterable> sortedContent, Slice slice)
+    {
+        // Slice bounds are inclusive bounds, equal only to markers. Matched markers should be returned as one-sided boundaries.
+        RangeTombstoneMarker prev = (RangeTombstoneMarker) sortedContent.headSet(slice.start(), true).descendingSet().stream().filter(x -> x instanceof RangeTombstoneMarker).findFirst().orElse(null);
+        RangeTombstoneMarker next = (RangeTombstoneMarker) sortedContent.tailSet(slice.end(), true).stream().filter(x -> x instanceof RangeTombstoneMarker).findFirst().orElse(null);
+        Iterator<Clusterable> result = sortedContent.subSet(slice.start(), false, slice.end(), false).iterator();
+        if (prev != null && prev.isOpen(false))
+            result = Iterators.concat(Iterators.singletonIterator(new RangeTombstoneBoundMarker(slice.start(), prev.openDeletionTime(false))), result);
+        if (next != null && next.isClose(false))
+            result = Iterators.concat(result, Iterators.singletonIterator(new RangeTombstoneBoundMarker(slice.end(), next.closeDeletionTime(false))));
+        return result;
+    }
+
+    private Iterator<Slice> directed(Slices slices, boolean reversed)
+    {
+        return directed(slices.iterator(), reversed);
+    }
+
+    private <T> Iterator<T> directed(Iterator<T> iter, boolean reversed)
+    {
+        if (!reversed)
+            return iter;
+        return invert(iter);
+    }
+
+    private <T> Stream<T> streamOf(Iterator<T> iterator)
+    {
+        Iterable<T> iterable = () -> iterator;
+        return streamOf(iterable);
+    }
+
+    <T> Stream<T> streamOf(Iterable<T> iterable)
+    {
+        return StreamSupport.stream(iterable.spliterator(), false);
+    }
+
+    private void assertIteratorsEqual(Iterator<? extends Clusterable> it1, Iterator<? extends Clusterable> it2)
+    {
+        Clusterable[] a1 = (Clusterable[]) Iterators.toArray(it1, Clusterable.class);
+        Clusterable[] a2 = (Clusterable[]) Iterators.toArray(it2, Clusterable.class);
+        if (Arrays.equals(a1, a2))
+            return;
+        String a1s = Stream.of(a1).map(x -> "\n" + (x instanceof Unfiltered ? ((Unfiltered) x).toString(cfm) : x.toString())).collect(Collectors.toList()).toString();
+        String a2s = Stream.of(a2).map(x -> "\n" + (x instanceof Unfiltered ? ((Unfiltered) x).toString(cfm) : x.toString())).collect(Collectors.toList()).toString();
+        assertArrayEquals("Arrays differ. Expected " + a1s + " was " + a2s, a1, a2);
+    }
+
+    private Row getRow(NavigableSet<Clusterable> sortedContent, Clustering cl)
+    {
+        NavigableSet<Clusterable> nexts = sortedContent.tailSet(cl, true);
+        if (nexts.isEmpty())
+            return null;
+        Row row = nexts.first() instanceof Row && cfm.comparator.compare(cl, nexts.first()) == 0 ? (Row) nexts.first() : null;
+        for (Clusterable next : nexts)
+            if (next instanceof RangeTombstoneMarker)
+            {
+                RangeTombstoneMarker rt = (RangeTombstoneMarker) next;
+                if (!rt.isClose(false))
+                    return row;
+                DeletionTime delTime = rt.closeDeletionTime(false);
+                return row == null ? BTreeRow.emptyDeletedRow(cl, Deletion.regular(delTime)) : row.filter(ColumnFilter.all(cfm), delTime, true, cfm);
+            }
+        return row;
+    }
+
+    private void assertRowsEqual(Row expected, Row actual)
+    {
+        try
+        {
+            assertEquals(expected == null, actual == null);
+            if (expected == null)
+                return;
+            assertEquals(expected.clustering(), actual.clustering());
+            assertEquals(expected.deletion(), actual.deletion());
+            assertArrayEquals(Iterables.toArray(expected.cells(), Cell.class), Iterables.toArray(actual.cells(), Cell.class));
+        } catch (Throwable t)
+        {
+            throw new AssertionError(String.format("Row comparison failed, expected %s got %s", expected, actual), t);
+        }
+    }
+
+    private static<T> T get(NavigableSet<T> sortedContent, Predicate<T> test)
+    {
+        return sortedContent.stream().filter(test).findFirst().orElse(null);
+    }
+
+    @Test
+    public void testEmpty()
+    {
+        test(() -> Collections.<Row>emptyList(), null);
+    }
+
+    @Test
+    public void testStaticOnly()
+    {
+        test(() -> Collections.<Row>emptyList(), makeStaticRow());
+    }
+
+    @Test
+    public void testRows()
+    {
+        test(this::generateRows, null);
+    }
+
+    @Test
+    public void testRowsWithStatic()
+    {
+        test(this::generateRows, makeStaticRow());
+    }
+
+    @Test
+    public void testMarkersOnly()
+    {
+        test(this::generateMarkersOnly, null);
+    }
+
+    @Test
+    public void testMarkersWithStatic()
+    {
+        test(this::generateMarkersOnly, makeStaticRow());
+    }
+
+    @Test
+    public void testUnfiltereds()
+    {
+        test(this::generateUnfiltereds, makeStaticRow());
+    }
+
+}
