Repository: cassandra
Updated Branches:
  refs/heads/trunk 11c8ca6b5 -> 72790dc8e


http://git-wip-us.apache.org/repos/asf/cassandra/blob/72790dc8/test/unit/org/apache/cassandra/index/sasi/plan/OperationTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/index/sasi/plan/OperationTest.java 
b/test/unit/org/apache/cassandra/index/sasi/plan/OperationTest.java
new file mode 100644
index 0000000..92fbf69
--- /dev/null
+++ b/test/unit/org/apache/cassandra/index/sasi/plan/OperationTest.java
@@ -0,0 +1,645 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.index.sasi.plan;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.collect.ListMultimap;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Sets;
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.cql3.Operator;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.filter.RowFilter;
+import org.apache.cassandra.db.marshal.DoubleType;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.index.sasi.plan.Operation.OperationType;
+import org.apache.cassandra.db.marshal.Int32Type;
+import org.apache.cassandra.db.marshal.LongType;
+import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.schema.KeyspaceMetadata;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.schema.Tables;
+import org.apache.cassandra.service.MigrationManager;
+import org.apache.cassandra.utils.FBUtilities;
+
+import org.junit.*;
+
+public class OperationTest extends SchemaLoader
+{
+    private static final String KS_NAME = "sasi";
+    private static final String CF_NAME = "test_cf";
+    private static final String CLUSTERING_CF_NAME = "clustering_test_cf";
+
+    private static ColumnFamilyStore BACKEND;
+    private static ColumnFamilyStore CLUSTERING_BACKEND;
+
+    /**
+     * One-time fixture: loads the base test schema, announces the {@code sasi} keyspace containing
+     * the two tables produced by {@code SchemaLoader.sasiCFMD} and {@code SchemaLoader.clusteringSASICFMD},
+     * and caches the column family stores used by the tests.
+     */
+    @BeforeClass
+    public static void loadSchema() throws ConfigurationException
+    {
+        System.setProperty("cassandra.config", "cassandra-murmur.yaml");
+        SchemaLoader.loadSchema();
+
+        KeyspaceParams params = KeyspaceParams.simpleTransient(1);
+        Tables tables = Tables.of(SchemaLoader.sasiCFMD(KS_NAME, CF_NAME),
+                                  SchemaLoader.clusteringSASICFMD(KS_NAME, CLUSTERING_CF_NAME));
+        MigrationManager.announceNewKeyspace(KeyspaceMetadata.create(KS_NAME, params, tables));
+
+        BACKEND = Keyspace.open(KS_NAME).getColumnFamilyStore(CF_NAME);
+        CLUSTERING_BACKEND = Keyspace.open(KS_NAME).getColumnFamilyStore(CLUSTERING_CF_NAME);
+    }
+
+    private QueryController controller;
+
+    @Before
+    public void beforeTest()
+    {
+        controller = new QueryController(BACKEND,
+                                         
PartitionRangeReadCommand.allDataRead(BACKEND.metadata, 
FBUtilities.nowInSeconds()),
+                                         TimeUnit.SECONDS.toMillis(10));
+    }
+
+    /**
+     * Releases the per-test controller.
+     */
+    @After
+    public void afterTest()
+    {
+        // JUnit still invokes @After methods when @Before threw, in which case the controller was
+        // never assigned; guard so the original setup failure is not accompanied by an NPE.
+        if (controller != null)
+            controller.finish();
+    }
+
+    /**
+     * Checks the expressions {@code Operation.analyzeGroup} produces for several AND/OR groups of
+     * simple predicates over the {@code age}, {@code first_name} and {@code comment} columns.
+     */
+    @Test
+    public void testAnalyze() throws Exception
+    {
+        final ColumnDefinition firstName = getColumn(UTF8Type.instance.decompose("first_name"));
+        final ColumnDefinition age = getColumn(UTF8Type.instance.decompose("age"));
+        final ColumnDefinition comment = getColumn(UTF8Type.instance.decompose("comment"));
+
+        // age != 5 AND age > 1 AND age != 6 AND age <= 10
+        Map<Expression.Op, Expression> expressions = convert(Operation.analyzeGroup(controller, OperationType.AND,
+                Arrays.asList(new SimpleExpression(age, Operator.NEQ, Int32Type.instance.decompose(5)),
+                              new SimpleExpression(age, Operator.GT, Int32Type.instance.decompose(1)),
+                              new SimpleExpression(age, Operator.NEQ, Int32Type.instance.decompose(6)),
+                              new SimpleExpression(age, Operator.LTE, Int32Type.instance.decompose(10)))));
+
+        Expression expected = new Expression("age", Int32Type.instance)
+        {{
+            operation = Op.RANGE;
+            lower = new Bound(Int32Type.instance.decompose(1), false);
+            upper = new Bound(Int32Type.instance.decompose(10), true);
+
+            exclusions.add(Int32Type.instance.decompose(5));
+            exclusions.add(Int32Type.instance.decompose(6));
+        }};
+
+        Assert.assertEquals(1, expressions.size());
+        Assert.assertEquals(expected, expressions.get(Expression.Op.RANGE));
+
+        // age != 5 OR age >= 7
+        expressions = convert(Operation.analyzeGroup(controller, OperationType.OR,
+                Arrays.asList(new SimpleExpression(age, Operator.NEQ, Int32Type.instance.decompose(5)),
+                              new SimpleExpression(age, Operator.GTE, Int32Type.instance.decompose(7)))));
+        Assert.assertEquals(2, expressions.size());
+
+        Assert.assertEquals(new Expression("age", Int32Type.instance)
+                            {{
+                                    operation = Op.NOT_EQ;
+                                    lower = new Bound(Int32Type.instance.decompose(5), true);
+                                    upper = lower;
+                            }}, expressions.get(Expression.Op.NOT_EQ));
+
+        Assert.assertEquals(new Expression("age", Int32Type.instance)
+                            {{
+                                    operation = Op.RANGE;
+                                    lower = new Bound(Int32Type.instance.decompose(7), true);
+                            }}, expressions.get(Expression.Op.RANGE));
+
+        // age != 5 OR age < 7
+        expressions = convert(Operation.analyzeGroup(controller, OperationType.OR,
+                Arrays.asList(new SimpleExpression(age, Operator.NEQ, Int32Type.instance.decompose(5)),
+                              new SimpleExpression(age, Operator.LT, Int32Type.instance.decompose(7)))));
+
+        Assert.assertEquals(2, expressions.size());
+        Assert.assertEquals(new Expression("age", Int32Type.instance)
+                            {{
+                                    operation = Op.RANGE;
+                                    upper = new Bound(Int32Type.instance.decompose(7), false);
+                            }}, expressions.get(Expression.Op.RANGE));
+        Assert.assertEquals(new Expression("age", Int32Type.instance)
+                            {{
+                                    operation = Op.NOT_EQ;
+                                    lower = new Bound(Int32Type.instance.decompose(5), true);
+                                    upper = lower;
+                            }}, expressions.get(Expression.Op.NOT_EQ));
+
+        // age > 1 AND age < 7
+        expressions = convert(Operation.analyzeGroup(controller, OperationType.AND,
+                Arrays.asList(new SimpleExpression(age, Operator.GT, Int32Type.instance.decompose(1)),
+                              new SimpleExpression(age, Operator.LT, Int32Type.instance.decompose(7)))));
+
+        Assert.assertEquals(1, expressions.size());
+        Assert.assertEquals(new Expression("age", Int32Type.instance)
+                            {{
+                                    operation = Op.RANGE;
+                                    lower = new Bound(Int32Type.instance.decompose(1), false);
+                                    upper = new Bound(Int32Type.instance.decompose(7), false);
+                            }}, expressions.get(Expression.Op.RANGE));
+
+        // first_name = 'a' OR first_name != 'b'
+        expressions = convert(Operation.analyzeGroup(controller, OperationType.OR,
+                Arrays.asList(new SimpleExpression(firstName, Operator.EQ, UTF8Type.instance.decompose("a")),
+                              new SimpleExpression(firstName, Operator.NEQ, UTF8Type.instance.decompose("b")))));
+
+        Assert.assertEquals(2, expressions.size());
+        Assert.assertEquals(new Expression("first_name", UTF8Type.instance)
+                            {{
+                                    operation = Op.NOT_EQ;
+                                    lower = new Bound(UTF8Type.instance.decompose("b"), true);
+                                    upper = lower;
+                            }}, expressions.get(Expression.Op.NOT_EQ));
+        Assert.assertEquals(new Expression("first_name", UTF8Type.instance)
+                            {{
+                                    operation = Op.EQ;
+                                    lower = upper = new Bound(UTF8Type.instance.decompose("a"), true);
+                            }}, expressions.get(Expression.Op.EQ));
+
+        // comment = 'soft eng' OR comment != 'likes do'
+        // (the expected list below holds one expression per whitespace-separated term)
+        ListMultimap<ColumnDefinition, Expression> e = Operation.analyzeGroup(controller, OperationType.OR,
+                Arrays.asList(new SimpleExpression(comment, Operator.EQ, UTF8Type.instance.decompose("soft eng")),
+                              new SimpleExpression(comment, Operator.NEQ, UTF8Type.instance.decompose("likes do"))));
+
+        List<Expression> expectedExpressions = new ArrayList<Expression>(4)
+        {{
+                add(new Expression("comment", UTF8Type.instance)
+                {{
+                        operation = Op.EQ;
+                        lower = new Bound(UTF8Type.instance.decompose("soft"), true);
+                        upper = lower;
+                }});
+
+                add(new Expression("comment", UTF8Type.instance)
+                {{
+                        operation = Op.EQ;
+                        lower = new Bound(UTF8Type.instance.decompose("eng"), true);
+                        upper = lower;
+                }});
+
+                add(new Expression("comment", UTF8Type.instance)
+                {{
+                        operation = Op.NOT_EQ;
+                        lower = new Bound(UTF8Type.instance.decompose("likes"), true);
+                        upper = lower;
+                }});
+
+                add(new Expression("comment", UTF8Type.instance)
+                {{
+                        operation = Op.NOT_EQ;
+                        lower = new Bound(UTF8Type.instance.decompose("do"), true);
+                        upper = lower;
+                }});
+        }};
+
+        Assert.assertEquals(expectedExpressions, e.get(comment));
+
+        // comment != 'likes do' OR first_name = 'j'
+        e = Operation.analyzeGroup(controller, OperationType.OR,
+                Arrays.asList(new SimpleExpression(comment, Operator.NEQ, UTF8Type.instance.decompose("likes do")),
+                              new SimpleExpression(firstName, Operator.EQ, UTF8Type.instance.decompose("j"))));
+
+        expectedExpressions = new ArrayList<Expression>(2)
+        {{
+                add(new Expression("comment", UTF8Type.instance)
+                {{
+                        operation = Op.NOT_EQ;
+                        lower = new Bound(UTF8Type.instance.decompose("likes"), true);
+                        upper = lower;
+                }});
+
+                add(new Expression("comment", UTF8Type.instance)
+                {{
+                        operation = Op.NOT_EQ;
+                        lower = new Bound(UTF8Type.instance.decompose("do"), true);
+                        upper = lower;
+                }});
+        }};
+
+        Assert.assertEquals(expectedExpressions, e.get(comment));
+
+        // age != 27 OR first_name = 'j' OR age != 25
+        e = Operation.analyzeGroup(controller, OperationType.OR,
+                Arrays.asList(new SimpleExpression(age, Operator.NEQ, Int32Type.instance.decompose(27)),
+                              new SimpleExpression(firstName, Operator.EQ, UTF8Type.instance.decompose("j")),
+                              new SimpleExpression(age, Operator.NEQ, Int32Type.instance.decompose(25))));
+
+        expectedExpressions = new ArrayList<Expression>(2)
+        {{
+                add(new Expression("age", Int32Type.instance)
+                {{
+                        operation = Op.NOT_EQ;
+                        lower = new Bound(Int32Type.instance.decompose(27), true);
+                        upper = lower;
+                }});
+
+                add(new Expression("age", Int32Type.instance)
+                {{
+                        operation = Op.NOT_EQ;
+                        lower = new Bound(Int32Type.instance.decompose(25), true);
+                        upper = lower;
+                }});
+        }};
+
+        Assert.assertEquals(expectedExpressions, e.get(age));
+    }
+
+    /**
+     * Exercises {@code Operation.satisfiedBy} against hand-built rows: single predicates, ranges
+     * with exclusions, OR groups, multi-column AND groups, an operation with a right child, and
+     * finally null, deleted and empty rows.
+     */
+    @Test
+    public void testSatisfiedBy() throws Exception
+    {
+        final ColumnDefinition timestamp = getColumn(UTF8Type.instance.decompose("timestamp"));
+        final ColumnDefinition age = getColumn(UTF8Type.instance.decompose("age"));
+
+        // age != 5
+        Operation.Builder builder = new Operation.Builder(OperationType.AND, controller,
+                                                          new SimpleExpression(age, Operator.NEQ, Int32Type.instance.decompose(5)));
+        Operation op = builder.complete();
+
+        Unfiltered row = buildRow(buildCell(age, Int32Type.instance.decompose(6), System.currentTimeMillis()));
+
+        Assert.assertTrue(op.satisfiedBy(row, false));
+
+        row = buildRow(buildCell(age, Int32Type.instance.decompose(5), System.currentTimeMillis()));
+
+        // and reject incorrect value
+        Assert.assertFalse(op.satisfiedBy(row, false));
+
+        row = buildRow(buildCell(age, Int32Type.instance.decompose(6), System.currentTimeMillis()));
+
+        Assert.assertTrue(op.satisfiedBy(row, false));
+
+        // range with exclusions - age != 5 AND age > 1 AND age != 6 AND age <= 10
+        builder = new Operation.Builder(OperationType.AND, controller,
+                                        new SimpleExpression(age, Operator.NEQ, Int32Type.instance.decompose(5)),
+                                        new SimpleExpression(age, Operator.GT, Int32Type.instance.decompose(1)),
+                                        new SimpleExpression(age, Operator.NEQ, Int32Type.instance.decompose(6)),
+                                        new SimpleExpression(age, Operator.LTE, Int32Type.instance.decompose(10)));
+        op = builder.complete();
+
+        // values in this set must be rejected, every other value in the loop must be accepted
+        Set<Integer> exclusions = Sets.newHashSet(0, 1, 5, 6, 11);
+        for (int i = 0; i <= 11; i++)
+        {
+            row = buildRow(buildCell(age, Int32Type.instance.decompose(i), System.currentTimeMillis()));
+
+            boolean result = op.satisfiedBy(row, false);
+            Assert.assertTrue(exclusions.contains(i) != result);
+        }
+
+        // now let's do something more complex - age = 5 OR age = 6
+        builder = new Operation.Builder(OperationType.OR, controller,
+                                        new SimpleExpression(age, Operator.EQ, Int32Type.instance.decompose(5)),
+                                        new SimpleExpression(age, Operator.EQ, Int32Type.instance.decompose(6)));
+
+        op = builder.complete();
+
+        exclusions = Sets.newHashSet(0, 1, 2, 3, 4, 7, 8, 9, 10);
+        for (int i = 0; i <= 10; i++)
+        {
+            row = buildRow(buildCell(age, Int32Type.instance.decompose(i), System.currentTimeMillis()));
+
+            boolean result = op.satisfiedBy(row, false);
+            Assert.assertTrue(exclusions.contains(i) != result);
+        }
+
+        // now let's test aggregated AND commands
+        builder = new Operation.Builder(OperationType.AND, controller);
+
+        // logical should be ignored by analyzer, but we still want to make sure that it is
+        //IndexExpression logical = new IndexExpression(ByteBufferUtil.EMPTY_BYTE_BUFFER, IndexOperator.EQ, ByteBufferUtil.EMPTY_BYTE_BUFFER);
+        //logical.setLogicalOp(LogicalIndexOperator.AND);
+
+        //builder.add(logical);
+        builder.add(new SimpleExpression(age, Operator.GTE, Int32Type.instance.decompose(0)));
+        builder.add(new SimpleExpression(age, Operator.LT, Int32Type.instance.decompose(10)));
+        builder.add(new SimpleExpression(age, Operator.NEQ, Int32Type.instance.decompose(7)));
+
+        op = builder.complete();
+
+        exclusions = Sets.newHashSet(7);
+        for (int i = 0; i < 10; i++)
+        {
+            row = buildRow(buildCell(age, Int32Type.instance.decompose(i), System.currentTimeMillis()));
+
+            boolean result = op.satisfiedBy(row, false);
+            Assert.assertTrue(exclusions.contains(i) != result);
+        }
+
+        // multiple analyzed expressions in the Operation timestamp >= 10 AND age = 5
+        builder = new Operation.Builder(OperationType.AND, controller);
+        builder.add(new SimpleExpression(timestamp, Operator.GTE, LongType.instance.decompose(10L)));
+        builder.add(new SimpleExpression(age, Operator.EQ, Int32Type.instance.decompose(5)));
+
+        op = builder.complete();
+
+        row = buildRow(buildCell(age, Int32Type.instance.decompose(6), System.currentTimeMillis()),
+                       buildCell(timestamp, LongType.instance.decompose(11L), System.currentTimeMillis()));
+
+        Assert.assertFalse(op.satisfiedBy(row, false));
+
+        row = buildRow(buildCell(age, Int32Type.instance.decompose(5), System.currentTimeMillis()),
+                       buildCell(timestamp, LongType.instance.decompose(22L), System.currentTimeMillis()));
+
+        Assert.assertTrue(op.satisfiedBy(row, false));
+
+        row = buildRow(buildCell(age, Int32Type.instance.decompose(5), System.currentTimeMillis()),
+                       buildCell(timestamp, LongType.instance.decompose(9L), System.currentTimeMillis()));
+
+        Assert.assertFalse(op.satisfiedBy(row, false));
+
+        // operation with internal expressions and right child:
+        // timestamp > 10 OR (age > 0 AND age < 10)
+        builder = new Operation.Builder(OperationType.OR, controller,
+                                        new SimpleExpression(timestamp, Operator.GT, LongType.instance.decompose(10L)));
+        builder.setRight(new Operation.Builder(OperationType.AND, controller,
+                                               new SimpleExpression(age, Operator.GT, Int32Type.instance.decompose(0)),
+                                               new SimpleExpression(age, Operator.LT, Int32Type.instance.decompose(10))));
+        op = builder.complete();
+
+        row = buildRow(buildCell(age, Int32Type.instance.decompose(5), System.currentTimeMillis()),
+                       buildCell(timestamp, LongType.instance.decompose(9L), System.currentTimeMillis()));
+
+        Assert.assertTrue(op.satisfiedBy(row, false));
+
+        row = buildRow(buildCell(age, Int32Type.instance.decompose(20), System.currentTimeMillis()),
+                       buildCell(timestamp, LongType.instance.decompose(11L), System.currentTimeMillis()));
+
+        Assert.assertTrue(op.satisfiedBy(row, false));
+
+        row = buildRow(buildCell(age, Int32Type.instance.decompose(0), System.currentTimeMillis()),
+                       buildCell(timestamp, LongType.instance.decompose(9L), System.currentTimeMillis()));
+
+        Assert.assertFalse(op.satisfiedBy(row, false));
+
+        // and for dessert let's try out null and deleted rows etc.
+        builder = new Operation.Builder(OperationType.AND, controller);
+        builder.add(new SimpleExpression(age, Operator.EQ, Int32Type.instance.decompose(30)));
+        op = builder.complete();
+
+        Assert.assertFalse(op.satisfiedBy(null, false));
+        Assert.assertFalse(op.satisfiedBy(row, false));
+
+        long now = System.currentTimeMillis();
+
+        // row carrying a row-level deletion plus an age cell
+        row = OperationTest.buildRow(Row.Deletion.regular(new DeletionTime(now - 10, (int) (now / 1000))),
+                                     buildCell(age, Int32Type.instance.decompose(6), System.currentTimeMillis()));
+
+        Assert.assertFalse(op.satisfiedBy(row, false));
+
+        // row whose only cell is a tombstone for age
+        row = buildRow(deletedCell(age, System.currentTimeMillis(), FBUtilities.nowInSeconds()));
+
+        Assert.assertFalse(op.satisfiedBy(row, true));
+
+        // NOTE(review): this block passes both when the call returns false and when it throws
+        // IllegalStateException; if the exception is the required outcome, an Assert.fail() after
+        // the call would pin it -- confirm against Operation.satisfiedBy before tightening.
+        try
+        {
+            Assert.assertFalse(op.satisfiedBy(buildRow(), false));
+        }
+        catch (IllegalStateException e)
+        {
+            // expected
+        }
+
+        try
+        {
+            Assert.assertFalse(op.satisfiedBy(buildRow(), true));
+        }
+        catch (IllegalStateException e)
+        {
+            Assert.fail("IllegalStateException should not be thrown when missing column and allowMissingColumns=true");
+        }
+    }
+
+    /**
+     * Checks the expressions {@code Operation.analyzeGroup} produces for the {@code height}
+     * column when it is combined with an equality predicate on {@code first_name}.
+     */
+    @Test
+    public void testAnalyzeNotIndexedButDefinedColumn() throws Exception
+    {
+        final ColumnDefinition firstName = getColumn(UTF8Type.instance.decompose("first_name"));
+        final ColumnDefinition height = getColumn(UTF8Type.instance.decompose("height"));
+
+        // first_name = 'a' AND height != 5
+        Map<Expression.Op, Expression> expressions;
+        expressions = convert(Operation.analyzeGroup(controller, OperationType.AND,
+                Arrays.asList(new SimpleExpression(firstName, Operator.EQ, UTF8Type.instance.decompose("a")),
+                              new SimpleExpression(height, Operator.NEQ, Int32Type.instance.decompose(5)))));
+
+        Assert.assertEquals(2, expressions.size());
+
+        Assert.assertEquals(new Expression("height", Int32Type.instance)
+        {{
+                operation = Op.NOT_EQ;
+                lower = new Bound(Int32Type.instance.decompose(5), true);
+                upper = lower;
+        }}, expressions.get(Expression.Op.NOT_EQ));
+
+        // first_name = 'a' AND height > 0 AND height != 5
+        expressions = convert(Operation.analyzeGroup(controller, OperationType.AND,
+                Arrays.asList(new SimpleExpression(firstName, Operator.EQ, UTF8Type.instance.decompose("a")),
+                              new SimpleExpression(height, Operator.GT, Int32Type.instance.decompose(0)),
+                              new SimpleExpression(height, Operator.NEQ, Int32Type.instance.decompose(5)))));
+
+        Assert.assertEquals(2, expressions.size());
+
+        Assert.assertEquals(new Expression("height", Int32Type.instance)
+        {{
+            operation = Op.RANGE;
+            lower = new Bound(Int32Type.instance.decompose(0), false);
+            exclusions.add(Int32Type.instance.decompose(5));
+        }}, expressions.get(Expression.Op.RANGE));
+
+        // first_name = 'a' AND height != 5 AND height >= 0 AND height < 10
+        expressions = convert(Operation.analyzeGroup(controller, OperationType.AND,
+                Arrays.asList(new SimpleExpression(firstName, Operator.EQ, UTF8Type.instance.decompose("a")),
+                              new SimpleExpression(height, Operator.NEQ, Int32Type.instance.decompose(5)),
+                              new SimpleExpression(height, Operator.GTE, Int32Type.instance.decompose(0)),
+                              new SimpleExpression(height, Operator.LT, Int32Type.instance.decompose(10)))));
+
+        Assert.assertEquals(2, expressions.size());
+
+        Assert.assertEquals(new Expression("height", Int32Type.instance)
+        {{
+                operation = Op.RANGE;
+                lower = new Bound(Int32Type.instance.decompose(0), true);
+                upper = new Bound(Int32Type.instance.decompose(10), false);
+                exclusions.add(Int32Type.instance.decompose(5));
+        }}, expressions.get(Expression.Op.RANGE));
+    }
+
+    /**
+     * Verifies that multi-word EQ predicates on {@code comment} are satisfied by a row whose
+     * comment is "software engineer is working on a project".
+     */
+    @Test
+    public void testSatisfiedByWithMultipleTerms()
+    {
+        final ColumnDefinition comment = getColumn(UTF8Type.instance.decompose("comment"));
+
+        ByteBuffer commentText = UTF8Type.instance.decompose("software engineer is working on a project");
+        Unfiltered engineerRow = buildRow(buildCell(comment, commentText, System.currentTimeMillis()));
+
+        // comment = 'eng is a work'
+        Operation firstQuery = new Operation.Builder(OperationType.AND, controller,
+                                                     new SimpleExpression(comment, Operator.EQ, UTF8Type.instance.decompose("eng is a work")))
+                               .complete();
+
+        Assert.assertTrue(firstQuery.satisfiedBy(engineerRow, false));
+
+        // comment = 'soft works fine'
+        Operation secondQuery = new Operation.Builder(OperationType.AND, controller,
+                                                      new SimpleExpression(comment, Operator.EQ, UTF8Type.instance.decompose("soft works fine")))
+                                .complete();
+
+        Assert.assertTrue(secondQuery.satisfiedBy(engineerRow, false));
+    }
+
+    /**
+     * Evaluates AND operations that mix {@code location}/{@code age} predicates with regular
+     * column predicates against a single row built with clustering values ("US", 27), height 182
+     * and score 1.0.
+     *
+     * NOTE(review): the columns are resolved from CLUSTERING_BACKEND while the shared controller
+     * and buildCell are bound to BACKEND -- confirm this mismatch is intentional.
+     */
+    @Test
+    public void testSatisfiedByWithClustering()
+    {
+        ColumnDefinition location = getColumn(CLUSTERING_BACKEND, UTF8Type.instance.decompose("location"));
+        ColumnDefinition age = getColumn(CLUSTERING_BACKEND, UTF8Type.instance.decompose("age"));
+        ColumnDefinition height = getColumn(CLUSTERING_BACKEND, UTF8Type.instance.decompose("height"));
+        ColumnDefinition score = getColumn(CLUSTERING_BACKEND, UTF8Type.instance.decompose("score"));
+
+        Unfiltered usRow = buildRow(new Clustering(UTF8Type.instance.fromString("US"), Int32Type.instance.decompose(27)),
+                                    buildCell(height, Int32Type.instance.decompose(182), System.currentTimeMillis()),
+                                    buildCell(score, DoubleType.instance.decompose(1.0d), System.currentTimeMillis()));
+
+        // age = 27 AND height = 182
+        Operation.Builder query = new Operation.Builder(OperationType.AND, controller);
+        query.add(new SimpleExpression(age, Operator.EQ, Int32Type.instance.decompose(27)));
+        query.add(new SimpleExpression(height, Operator.EQ, Int32Type.instance.decompose(182)));
+
+        Assert.assertTrue(query.complete().satisfiedBy(usRow, false));
+
+        // age = 28 AND height = 182
+        query = new Operation.Builder(OperationType.AND, controller);
+
+        query.add(new SimpleExpression(age, Operator.EQ, Int32Type.instance.decompose(28)));
+        query.add(new SimpleExpression(height, Operator.EQ, Int32Type.instance.decompose(182)));
+
+        Assert.assertFalse(query.complete().satisfiedBy(usRow, false));
+
+        // location = 'US' AND age >= 27
+        query = new Operation.Builder(OperationType.AND, controller);
+        query.add(new SimpleExpression(location, Operator.EQ, UTF8Type.instance.decompose("US")));
+        query.add(new SimpleExpression(age, Operator.GTE, Int32Type.instance.decompose(27)));
+
+        Assert.assertTrue(query.complete().satisfiedBy(usRow, false));
+
+        // location = 'BY' AND age >= 28
+        query = new Operation.Builder(OperationType.AND, controller);
+        query.add(new SimpleExpression(location, Operator.EQ, UTF8Type.instance.decompose("BY")));
+        query.add(new SimpleExpression(age, Operator.GTE, Int32Type.instance.decompose(28)));
+
+        Assert.assertFalse(query.complete().satisfiedBy(usRow, false));
+
+        // location = 'US' AND age <= 27 AND height >= 182
+        query = new Operation.Builder(OperationType.AND, controller);
+        query.add(new SimpleExpression(location, Operator.EQ, UTF8Type.instance.decompose("US")));
+        query.add(new SimpleExpression(age, Operator.LTE, Int32Type.instance.decompose(27)));
+        query.add(new SimpleExpression(height, Operator.GTE, Int32Type.instance.decompose(182)));
+
+        Assert.assertTrue(query.complete().satisfiedBy(usRow, false));
+
+        // location = 'US' AND height >= 182 AND score = 1.0
+        query = new Operation.Builder(OperationType.AND, controller);
+        query.add(new SimpleExpression(location, Operator.EQ, UTF8Type.instance.decompose("US")));
+        query.add(new SimpleExpression(height, Operator.GTE, Int32Type.instance.decompose(182)));
+        query.add(new SimpleExpression(score, Operator.EQ, DoubleType.instance.decompose(1.0d)));
+
+        Assert.assertTrue(query.complete().satisfiedBy(usRow, false));
+
+        // height >= 182 AND score = 1.0
+        query = new Operation.Builder(OperationType.AND, controller);
+        query.add(new SimpleExpression(height, Operator.GTE, Int32Type.instance.decompose(182)));
+        query.add(new SimpleExpression(score, Operator.EQ, DoubleType.instance.decompose(1.0d)));
+
+        Assert.assertTrue(query.complete().satisfiedBy(usRow, false));
+    }
+
+    /**
+     * Re-keys the analyzed expressions by their operation type.
+     *
+     * @param expressions analyzed expressions grouped per column
+     * @return a map holding one expression per {@code Expression.Op}; with assertions enabled, a
+     *         second expression for the same op trips the sanity check
+     */
+    private Map<Expression.Op, Expression> convert(Multimap<ColumnDefinition, Expression> expressions)
+    {
+        Map<Expression.Op, Expression> byOp = new HashMap<>();
+        for (Expression current : expressions.values())
+        {
+            Expression previous = byOp.put(current.getOp(), current);
+            assert previous == null; // sanity check
+        }
+
+        return byOp;
+    }
+
+    /**
+     * Bare-bones {@code RowFilter.Expression} of kind {@code SIMPLE} used to hand
+     * (column, operator, value) triples to the code under test; it cannot be evaluated directly.
+     */
+    private static class SimpleExpression extends RowFilter.Expression
+    {
+        protected SimpleExpression(ColumnDefinition column, Operator operator, ByteBuffer value)
+        {
+            super(column, operator, value);
+        }
+
+        @Override
+        protected Kind kind()
+        {
+            return Kind.SIMPLE;
+        }
+
+        /** Always throws: these tests never evaluate the expression through this method. */
+        @Override
+        public boolean isSatisfiedBy(CFMetaData metadata, DecoratedKey partitionKey, Row row)
+        {
+            throw new UnsupportedOperationException();
+        }
+    }
+
+    /** Builds a row with the empty clustering and no row deletion from the given cells. */
+    private static Unfiltered buildRow(Cell... cells)
+    {
+        return buildRow(Clustering.EMPTY, null, cells);
+    }
+
+    /** Builds a row with the empty clustering, the given row deletion and the given cells. */
+    private static Row buildRow(Row.Deletion deletion, Cell... cells)
+    {
+        return buildRow(Clustering.EMPTY, deletion, cells);
+    }
+
+    /** Builds a row with the given clustering and no row deletion from the given cells. */
+    private static Row buildRow(Clustering clustering, Cell... cells)
+    {
+        return buildRow(clustering, null, cells);
+    }
+
+    /**
+     * Assembles a row through a sorted {@code BTreeRow} builder.
+     *
+     * @param clustering clustering of the new row
+     * @param deletion   row-level deletion to attach, or {@code null} for none
+     * @param cells      cells to add, in the order given
+     * @return the built row
+     */
+    private static Row buildRow(Clustering clustering, Row.Deletion deletion, Cell... cells)
+    {
+        Row.Builder builder = BTreeRow.sortedBuilder();
+        builder.newRow(clustering);
+
+        // cells are added before the deletion, same as the call order callers have relied on
+        for (Cell cell : cells)
+        {
+            builder.addCell(cell);
+        }
+
+        if (deletion != null)
+        {
+            builder.addRowDeletion(deletion);
+        }
+
+        return builder.build();
+    }
+
+    /**
+     * Creates a live cell for {@code column} holding {@code value} at {@code timestamp}.
+     * The cell is always created against {@code BACKEND.metadata}, whichever table the column came from.
+     */
+    private static Cell buildCell(ColumnDefinition column, ByteBuffer value, long timestamp)
+    {
+        return BufferCell.live(BACKEND.metadata, column, timestamp, value);
+    }
+
+    /** Creates a tombstone cell for {@code column} with the given timestamp and deletion time in seconds. */
+    private static Cell deletedCell(ColumnDefinition column, long timestamp, int nowInSeconds)
+    {
+        return BufferCell.tombstone(column, timestamp, nowInSeconds);
+    }
+
+    // Looks up a column definition by name in the BACKEND column family store.
+    private static ColumnDefinition getColumn(ByteBuffer name)
+    {
+        return getColumn(BACKEND, name);
+    }
+
+    // Looks up a column definition by name in the metadata of the given column family store.
+    private static ColumnDefinition getColumn(ColumnFamilyStore cfs, 
ByteBuffer name)
+    {
+        return cfs.metadata.getColumnDefinition(name);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/72790dc8/test/unit/org/apache/cassandra/index/sasi/utils/LongIterator.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/index/sasi/utils/LongIterator.java 
b/test/unit/org/apache/cassandra/index/sasi/utils/LongIterator.java
new file mode 100644
index 0000000..96e7610
--- /dev/null
+++ b/test/unit/org/apache/cassandra/index/sasi/utils/LongIterator.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.index.sasi.utils;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.index.sasi.disk.Token;
+
+/**
+ * Test helper: a {@link RangeIterator} over a fixed array of long tokens.
+ *
+ * The first element, the last element and the array length are handed to the superclass
+ * constructor (nulls for an empty array). The constructor does not sort or validate the
+ * input, so callers presumably pass the tokens in ascending order.
+ */
+public class LongIterator extends RangeIterator<Long, Token>
+{
+    private final List<LongToken> tokens;
+    private int currentIdx = 0;
+
+    public LongIterator(long[] tokens)
+    {
+        super(tokens.length == 0 ? null : tokens[0], tokens.length == 0 ? null : tokens[tokens.length - 1], tokens.length);
+        this.tokens = new ArrayList<>(tokens.length);
+        for (long token : tokens)
+            this.tokens.add(new LongToken(token));
+    }
+
+    /** Returns the token at the cursor and advances it, or signals end of data when exhausted. */
+    @Override
+    protected Token computeNext()
+    {
+        if (currentIdx >= tokens.size())
+            return endOfData();
+
+        return tokens.get(currentIdx++);
+    }
+
+    /**
+     * Moves the cursor to the first token that is >= nextToken. The scan starts one
+     * position before the cursor (or at 0); the cursor is left untouched if no such
+     * token exists.
+     */
+    @Override
+    protected void performSkipTo(Long nextToken)
+    {
+        for (int i = currentIdx == 0 ? 0 : currentIdx - 1; i < tokens.size(); i++)
+        {
+            LongToken token = tokens.get(i);
+            if (token.get().compareTo(nextToken) >= 0)
+            {
+                currentIdx = i;
+                break;
+            }
+        }
+    }
+
+    @Override
+    public void close() throws IOException
+    {}
+
+    /** Token carrying only its long value: merging is a no-op and it exposes no keys. */
+    public static class LongToken extends Token
+    {
+        public LongToken(long token)
+        {
+            super(token);
+        }
+
+        @Override
+        public void merge(CombinedValue<Long> other)
+        {
+            // no-op
+        }
+
+        @Override
+        public Iterator<DecoratedKey> iterator()
+        {
+            return Collections.emptyIterator();
+        }
+    }
+
+    /** Drains the iterator and returns the long value of every token in iteration order. */
+    public static List<Long> convert(RangeIterator<Long, Token> tokens)
+    {
+        List<Long> results = new ArrayList<>();
+        while (tokens.hasNext())
+            results.add(tokens.next().get());
+
+        return results;
+    }
+
+    /**
+     * Boxes the given values into a list, preserving order.
+     * Uses a plain ArrayList rather than double-brace initialization, which would
+     * create an anonymous ArrayList subclass.
+     */
+    public static List<Long> convert(final long... nums)
+    {
+        List<Long> results = new ArrayList<>(nums.length);
+        for (long n : nums)
+            results.add(n);
+
+        return results;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/72790dc8/test/unit/org/apache/cassandra/index/sasi/utils/MappedBufferTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/index/sasi/utils/MappedBufferTest.java 
b/test/unit/org/apache/cassandra/index/sasi/utils/MappedBufferTest.java
new file mode 100644
index 0000000..7ffebf1
--- /dev/null
+++ b/test/unit/org/apache/cassandra/index/sasi/utils/MappedBufferTest.java
@@ -0,0 +1,540 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.index.sasi.utils;
+
+import java.io.*;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+
+import org.apache.cassandra.db.marshal.LongType;
+import org.apache.cassandra.io.util.ChannelProxy;
+import org.apache.cassandra.io.util.FileUtils;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class MappedBufferTest
+{
+    @Test
+    public void testBasicWriteThenRead() throws Exception
+    {
+        long numLongs = 10000;
+        final MappedBuffer buffer = createTestFile(numLongs);
+
+        Assert.assertEquals(0, buffer.position());
+        for (long i = 0; i < numLongs; i++)
+        {
+            Assert.assertEquals(i * 8, buffer.position());
+            Assert.assertEquals(i, buffer.getLong());
+        }
+
+        buffer.position(0);
+        for (long i = 0; i < numLongs; i++)
+        {
+            Assert.assertEquals(i, buffer.getLong(i * 8));
+            Assert.assertEquals(0, buffer.position());
+        }
+
+        // read all the numbers as shorts (all numbers fit into four bytes)
+        for (long i = 0; i < Math.min(Integer.MAX_VALUE, numLongs); i++)
+            Assert.assertEquals(i, buffer.getInt((i * 8) + 4));
+
+        // read all the numbers as shorts (all numbers fit into two bytes)
+        for (long i = 0; i < Math.min(Short.MAX_VALUE, numLongs); i++) {
+            Assert.assertEquals(i, buffer.getShort((i * 8) + 6));
+        }
+
+        // read all the numbers that can be represented as a single byte
+        for (long i = 0; i < 128; i++)
+            Assert.assertEquals(i, buffer.get((i * 8) + 7));
+
+        buffer.close();
+    }
+
+    @Test
+    public void testDuplicate() throws Exception
+    {
+        long numLongs = 10;
+        final MappedBuffer buffer1 = createTestFile(numLongs);
+
+        Assert.assertEquals(0, buffer1.getLong());
+        Assert.assertEquals(1, buffer1.getLong());
+
+        final MappedBuffer buffer2 = buffer1.duplicate();
+
+        Assert.assertEquals(2, buffer1.getLong());
+        Assert.assertEquals(2, buffer2.getLong());
+
+        buffer2.position(0);
+        Assert.assertEquals(3, buffer1.getLong());
+        Assert.assertEquals(0, buffer2.getLong());
+    }
+
+    @Test
+    public void testLimit() throws Exception
+    {
+        long numLongs =  10;
+        final MappedBuffer buffer1 = createTestFile(numLongs);
+
+        MappedBuffer buffer2 = buffer1.duplicate().position(16).limit(32);
+        buffer1.position(0).limit(16);
+        List<Long> longs = new ArrayList<>(4);
+
+        while (buffer1.hasRemaining())
+            longs.add(buffer1.getLong());
+
+        while (buffer2.hasRemaining())
+            longs.add(buffer2.getLong());
+
+        Assert.assertArrayEquals(new Long[]{0L, 1L, 2L, 3L}, longs.toArray());
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testPositionGreaterThanLimit() throws Exception
+    {
+        final MappedBuffer buffer = createTestFile(1);
+
+        buffer.limit(4);
+
+        try
+        {
+            buffer.position(buffer.limit() + 1);
+        }
+        finally
+        {
+            buffer.close();
+        }
+    }
+
+    // A negative position must be rejected with IllegalArgumentException.
+    @Test(expected = IllegalArgumentException.class)
+    public void testNegativePosition() throws Exception
+    {
+        try (MappedBuffer buffer = createTestFile(1))
+        {
+            buffer.position(-1);
+        }
+    }
+
+    // A limit one past capacity() must be rejected with IllegalArgumentException.
+    @Test(expected = IllegalArgumentException.class)
+    public void testLimitGreaterThanCapacity() throws Exception
+    {
+        try (MappedBuffer buffer = createTestFile(1))
+        {
+            buffer.limit(buffer.capacity() + 1);
+        }
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testLimitLessThanPosition() throws Exception
+    {
+        final MappedBuffer buffer = createTestFile(1);
+
+        buffer.position(1);
+
+        try
+        {
+            buffer.limit(0);
+        }
+        finally
+        {
+            buffer.close();
+        }
+    }
+
+    @Test(expected = IndexOutOfBoundsException.class)
+    public void testGetRelativeUnderflow() throws Exception
+    {
+        final MappedBuffer buffer = createTestFile(1);
+
+        buffer.position(buffer.limit());
+        try
+        {
+            buffer.get();
+        }
+        finally
+        {
+            buffer.close();
+        }
+
+    }
+
+    // An absolute get() at index == limit() must throw IndexOutOfBoundsException.
+    // Note: despite the test name, the index is taken from limit(), not capacity().
+    @Test(expected = IndexOutOfBoundsException.class)
+    public void testGetAbsoluteGreaterThanCapacity() throws Exception
+    {
+        try (MappedBuffer buffer = createTestFile(1))
+        {
+            buffer.get(buffer.limit());
+        }
+    }
+
+    // An absolute get() at a negative index must throw IndexOutOfBoundsException.
+    @Test(expected = IndexOutOfBoundsException.class)
+    public void testGetAbsoluteNegativePosition() throws Exception
+    {
+        try (MappedBuffer buffer = createTestFile(1))
+        {
+            buffer.get(-1);
+        }
+    }
+
+
+    @Test(expected = IndexOutOfBoundsException.class)
+    public void testGetShortRelativeUnderflow() throws Exception
+    {
+        final MappedBuffer buffer = createTestFile(1);
+
+        buffer.position(buffer.capacity() - 1);
+        try
+        {
+            buffer.getShort();
+        }
+        finally
+        {
+            buffer.close();
+        }
+
+    }
+
+    @Test(expected = IndexOutOfBoundsException.class)
+    public void testGetShortAbsoluteGreaterThanCapacity() throws Exception
+    {
+        final MappedBuffer buffer = createTestFile(1);
+
+        Assert.assertEquals(8, buffer.capacity());
+        try
+        {
+            buffer.getShort(buffer.capacity() - 1);
+        }
+        finally
+        {
+            buffer.close();
+        }
+    }
+
+    // An absolute getShort() at a negative index must throw IndexOutOfBoundsException.
+    @Test(expected = IndexOutOfBoundsException.class)
+    public void testGetShortAbsoluteNegativePosition() throws Exception
+    {
+        try (MappedBuffer buffer = createTestFile(1))
+        {
+            buffer.getShort(-1);
+        }
+    }
+
+    @Test(expected = IndexOutOfBoundsException.class)
+    public void testGetIntRelativeUnderflow() throws Exception
+    {
+        final MappedBuffer buffer = createTestFile(1);
+
+        buffer.position(buffer.capacity() - 3);
+        try
+        {
+            buffer.getInt();
+        }
+        finally
+        {
+            buffer.close();
+        }
+    }
+
+    @Test(expected = IndexOutOfBoundsException.class)
+    public void testGetIntAbsoluteGreaterThanCapacity() throws Exception
+    {
+        final MappedBuffer buffer = createTestFile(1);
+
+        Assert.assertEquals(8, buffer.capacity());
+        try
+        {
+            buffer.getInt(buffer.capacity() - 3);
+        }
+        finally
+        {
+            buffer.close();
+        }
+    }
+
+    // An absolute getInt() at a negative index must throw IndexOutOfBoundsException.
+    @Test(expected = IndexOutOfBoundsException.class)
+    public void testGetIntAbsoluteNegativePosition() throws Exception
+    {
+        try (MappedBuffer buffer = createTestFile(1))
+        {
+            buffer.getInt(-1);
+        }
+    }
+
+    @Test(expected = IndexOutOfBoundsException.class)
+    public void testGetLongRelativeUnderflow() throws Exception
+    {
+        final MappedBuffer buffer = createTestFile(1);
+
+        buffer.position(buffer.capacity() - 7);
+        try
+        {
+            buffer.getLong();
+        }
+        finally
+        {
+            buffer.close();
+        }
+
+    }
+
+    @Test(expected = IndexOutOfBoundsException.class)
+    public void testGetLongAbsoluteGreaterThanCapacity() throws Exception
+    {
+        final MappedBuffer buffer = createTestFile(1);
+
+        Assert.assertEquals(8, buffer.capacity());
+        try
+        {
+            buffer.getLong(buffer.capacity() - 7);
+        }
+        finally
+        {
+            buffer.close();
+        }
+    }
+
+    // An absolute getLong() at a negative index must throw IndexOutOfBoundsException.
+    @Test(expected = IndexOutOfBoundsException.class)
+    public void testGetLongAbsoluteNegativePosition() throws Exception
+    {
+        try (MappedBuffer buffer = createTestFile(1))
+        {
+            buffer.getLong(-1);
+        }
+    }
+
+    // Requests 1000 random, long-aligned (offset, length) regions and checks that every
+    // long read from the returned region equals its index in the file.
+    @Test
+    public void testGetPageRegion() throws Exception
+    {
+        final int numLongs = 1000;
+        final int byteSize = 8;
+        final int capacity = numLongs * byteSize;
+        // clears the low bits so that values are aligned on sizeof(long)
+        final int alignMask = ~(byteSize - 1);
+        final ThreadLocalRandom random = ThreadLocalRandom.current();
+
+        try (MappedBuffer buffer = createTestFile(numLongs))
+        {
+            for (int attempt = 0; attempt < 1000; attempt++)
+            {
+                int offset = random.nextInt(0, 1000 * byteSize - byteSize) & alignMask;
+                int randomLength = random.nextInt(byteSize, capacity - offset) & alignMask;
+                int length = Math.min(capacity, randomLength);
+
+                ByteBuffer region = buffer.getPageRegion(offset, length);
+
+                int end = offset + length;
+                for (int pos = offset; pos < end; pos += 8)
+                    Assert.assertEquals(pos / 8, region.getLong(pos));
+            }
+        }
+    }
+
+    // Requesting the region (offset 13, length 27) from a buffer created with 4 page bits
+    // is expected to fail with IllegalArgumentException.
+    @Test (expected = IllegalArgumentException.class)
+    public void testMisalignedRegionAccess() throws Exception
+    {
+        try (MappedBuffer buffer = createTestFile(100, 8, 4, 0))
+        {
+            buffer.getPageRegion(13, 27);
+        }
+    }
+
+    // Absolute reads of padded short/int/long values, for page sizes of 2^3..2^6 bytes
+    // and several padding widths between values.
+    @Test
+    public void testSequentialIterationWithPadding() throws Exception
+    {
+        final long numValues = 1000;
+        final int maxPageBits = 6; // 64 bytes page
+        final int[] paddings = { 0, 3, 5, 7, 9, 11, 13 };
+
+        // test different page sizes, with different padding and types
+        for (int numPageBits = 3; numPageBits <= maxPageBits; numPageBits++)
+        {
+            for (int typeSize = 2; typeSize <= 8; typeSize *= 2)
+            {
+                for (int padding : paddings)
+                {
+                    // distance in bytes between the starts of two consecutive values
+                    final int stride = typeSize + padding;
+
+                    try (MappedBuffer buffer = createTestFile(numValues, typeSize, numPageBits, padding))
+                    {
+                        for (long j = 0; j < numValues; j++)
+                        {
+                            final long offset = j * stride;
+                            final long actual;
+
+                            if (typeSize == 2)
+                                actual = buffer.getShort(offset);
+                            else if (typeSize == 4)
+                                actual = buffer.getInt(offset);
+                            else if (typeSize == 8)
+                                actual = buffer.getLong(offset);
+                            else
+                                throw new AssertionError();
+
+                            Assert.assertEquals(j, actual);
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    // Relative reads of unpadded short/int/long values: the position must advance by
+    // typeSize per read and each value must equal its index.
+    @Test
+    public void testSequentialIteration() throws IOException
+    {
+        final long numValues = 1000;
+
+        for (int typeSize = 2; typeSize <= 8; typeSize *= 2)
+        {
+            try (MappedBuffer buffer = createTestFile(numValues, typeSize, 16, 0))
+            {
+                for (int j = 0; j < numValues; j++)
+                {
+                    Assert.assertEquals(j * typeSize, buffer.position());
+
+                    final long actual;
+                    if (typeSize == 2)
+                        actual = buffer.getShort();
+                    else if (typeSize == 4)
+                        actual = buffer.getInt();
+                    else if (typeSize == 8)
+                        actual = buffer.getLong();
+                    else
+                        throw new AssertionError();
+
+                    Assert.assertEquals(j, actual);
+                }
+            }
+        }
+    }
+
+    // Every 8-byte slot must compare equal (via LongType) to the serialized form of its index.
+    @Test
+    public void testCompareToPage() throws IOException
+    {
+        final int typeSize = 8;
+        final long numValues = 100;
+
+        try (MappedBuffer buffer = createTestFile(numValues))
+        {
+            for (long value = 0; value < numValues; value++)
+            {
+                long offset = value * typeSize;
+                Assert.assertEquals(0, buffer.comparePageTo(offset, typeSize, LongType.instance, LongType.instance.decompose(value)));
+            }
+        }
+    }
+
+    // Opens a MappedBuffer through the constructor that takes no page-bits argument and
+    // verifies limit, capacity and sequential long reads.
+    @Test
+    public void testOpenWithoutPageBits() throws IOException
+    {
+        File tmp = File.createTempFile("mapped-buffer", "tmp");
+        tmp.deleteOnExit();
+
+        RandomAccessFile file = new RandomAccessFile(tmp, "rw");
+
+        // populating the file happens inside the try so the handle is closed even if a write fails
+        try
+        {
+            long numValues = 1000;
+            for (long i = 0; i < numValues; i++)
+                file.writeLong(i);
+
+            file.getFD().sync();
+
+            try (MappedBuffer buffer = new MappedBuffer(new ChannelProxy(tmp.getAbsolutePath(), file.getChannel())))
+            {
+                Assert.assertEquals(numValues * 8, buffer.limit());
+                Assert.assertEquals(numValues * 8, buffer.capacity());
+
+                for (long i = 0; i < numValues; i++)
+                {
+                    Assert.assertEquals(i * 8, buffer.position());
+                    Assert.assertEquals(i, buffer.getLong());
+                }
+            }
+        }
+        finally
+        {
+            FileUtils.closeQuietly(file);
+        }
+    }
+
+    // Constructing a buffer with 33 page bits is expected to fail with IllegalArgumentException.
+    // NOTE(review): the null channel presumably is never touched because the page-bits check runs first - confirm.
+    @Test(expected = IllegalArgumentException.class)
+    public void testIncorrectPageSize() throws Exception
+    {
+        new MappedBuffer(null, 33);
+    }
+
+    // Convenience overload: numCount sequential 8-byte values, 16 page bits, no padding.
+    private MappedBuffer createTestFile(long numCount) throws IOException
+    {
+        return createTestFile(numCount, 8, 16, 0);
+    }
+
+    // Writes the values 0..numCount-1 to a temp file, each encoded in typeSize bytes
+    // (1, 2, 4 or 8) and followed by `padding` zero bytes, then maps the file with the
+    // given number of page bits. The file handle is closed before the buffer is returned.
+    // Throws IllegalArgumentException for any other typeSize.
+    private MappedBuffer createTestFile(long numCount, int typeSize, int numPageBits, int padding) throws IOException
+    {
+        final File testFile = File.createTempFile("mapped-buffer-test", "db");
+        testFile.deleteOnExit();
+
+        RandomAccessFile file = new RandomAccessFile(testFile, "rw");
+
+        // everything that can throw after the file is opened lives inside the try,
+        // so the handle is released even when a write or the sync fails
+        try
+        {
+            for (long i = 0; i < numCount; i++)
+            {
+                switch (typeSize)
+                {
+                    case 1:
+                        file.write((byte) i);
+                        break;
+
+                    case 2:
+                        file.writeShort((short) i);
+                        break;
+
+                    case 4:
+                        file.writeInt((int) i);
+                        break;
+
+                    case 8:
+                        // bunch of longs
+                        file.writeLong(i);
+                        break;
+
+                    default:
+                        throw new IllegalArgumentException("unknown byte size: " + typeSize);
+                }
+
+                for (int j = 0; j < padding; j++)
+                    file.write(0);
+            }
+
+            file.getFD().sync();
+
+            return new MappedBuffer(new ChannelProxy(testFile.getAbsolutePath(), file.getChannel()), numPageBits);
+        }
+        finally
+        {
+            FileUtils.closeQuietly(file);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/72790dc8/test/unit/org/apache/cassandra/index/sasi/utils/RangeIntersectionIteratorTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/index/sasi/utils/RangeIntersectionIteratorTest.java
 
b/test/unit/org/apache/cassandra/index/sasi/utils/RangeIntersectionIteratorTest.java
new file mode 100644
index 0000000..18b9dd7
--- /dev/null
+++ 
b/test/unit/org/apache/cassandra/index/sasi/utils/RangeIntersectionIteratorTest.java
@@ -0,0 +1,387 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.index.sasi.utils;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.ThreadLocalRandom;
+
+import org.apache.cassandra.index.sasi.disk.Token;
+import 
org.apache.cassandra.index.sasi.utils.RangeIntersectionIterator.Strategy;
+import 
org.apache.cassandra.index.sasi.utils.RangeIntersectionIterator.LookupIntersectionIterator;
+import 
org.apache.cassandra.index.sasi.utils.RangeIntersectionIterator.BounceIntersectionIterator;
+import org.apache.cassandra.io.util.FileUtils;
+
+import com.carrotsearch.hppc.LongOpenHashSet;
+import com.carrotsearch.hppc.LongSet;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.apache.cassandra.index.sasi.utils.LongIterator.convert;
+
+public class RangeIntersectionIteratorTest
+{
+    // Runs the no-overlap scenario once for every intersection Strategy.
+    @Test
+    public void testNoOverlappingValues()
+    {
+        for (Strategy strategy : Strategy.values())
+            testNoOverlappingValues(strategy);
+    }
+
+    // Intersections of ranges that share no value must come out empty (but not null).
+    private void testNoOverlappingValues(Strategy strategy)
+    {
+        // three ranges without a single value in common
+        RangeIterator.Builder<Long, Token> noCommonValue = RangeIntersectionIterator.builder(strategy);
+        noCommonValue.add(new LongIterator(new long[] { 2L, 3L, 5L, 6L }));
+        noCommonValue.add(new LongIterator(new long[] { 1L, 7L }));
+        noCommonValue.add(new LongIterator(new long[] { 4L, 8L, 9L, 10L }));
+
+        Assert.assertEquals(convert(), convert(noCommonValue.build()));
+
+        // both ranges overlap by min/max but not by value
+        RangeIterator.Builder<Long, Token> singleInside = RangeIntersectionIterator.builder(strategy);
+        singleInside.add(new LongIterator(new long[] { 1L, 5L, 7L, 9L }));
+        singleInside.add(new LongIterator(new long[] { 6L }));
+
+        RangeIterator<Long, Token> firstRange = singleInside.build();
+        Assert.assertNotNull(firstRange);
+        Assert.assertFalse(firstRange.hasNext());
+
+        // both ranges overlap by min/max but not by value
+        RangeIterator.Builder<Long, Token> enclosing = RangeIntersectionIterator.builder(strategy);
+        enclosing.add(new LongIterator(new long[] { 1L, 5L, 7L, 9L }));
+        enclosing.add(new LongIterator(new long[] { 0L, 10L, 12L }));
+
+        RangeIterator<Long, Token> secondRange = enclosing.build();
+        Assert.assertNotNull(secondRange);
+        Assert.assertFalse(secondRange.hasNext());
+    }
+
+    // Runs the overlapping-values scenario once for every intersection Strategy.
+    @Test
+    public void testOverlappingValues()
+    {
+        for (Strategy strategy : Strategy.values())
+            testOverlappingValues(strategy);
+    }
+
+    // Three ranges whose only common values are 4 and 6 must intersect to exactly those.
+    private void testOverlappingValues(Strategy strategy)
+    {
+        RangeIterator.Builder<Long, Token> intersection = RangeIntersectionIterator.builder(strategy);
+
+        intersection.add(new LongIterator(new long[] { 1L, 4L, 6L, 7L }));
+        intersection.add(new LongIterator(new long[] { 2L, 4L, 5L, 6L }));
+        intersection.add(new LongIterator(new long[] { 4L, 6L, 8L, 9L, 10L }));
+
+        List<Long> expected = convert(4L, 6L);
+        List<Long> actual = convert(intersection.build());
+
+        Assert.assertEquals(expected, actual);
+    }
+
+    // Runs the single-iterator scenario once for every intersection Strategy.
+    @Test
+    public void testSingleIterator()
+    {
+        for (Strategy strategy : Strategy.values())
+            testSingleIterator(strategy);
+    }
+
+    // An intersection built from a single range must yield that range's values unchanged.
+    private void testSingleIterator(Strategy strategy)
+    {
+        RangeIntersectionIterator.Builder<Long, Token> single = RangeIntersectionIterator.builder(strategy);
+        single.add(new LongIterator(new long[] { 1L, 2L, 4L, 9L }));
+
+        List<Long> expected = convert(1L, 2L, 4L, 9L);
+        List<Long> actual = convert(single.build());
+
+        Assert.assertEquals(expected, actual);
+    }
+
+    // Runs the skipTo scenario once for every intersection Strategy.
+    @Test
+    public void testSkipTo()
+    {
+        for (Strategy strategy : Strategy.values())
+            testSkipTo(strategy);
+    }
+
+    // Exercises skipTo() on the intersection of three ranges. The assertions below
+    // expect the intersection to yield 4, 6, 7 and 10; they are order-dependent
+    // because each call advances the shared iterator.
+    private void testSkipTo(Strategy strategy)
+    {
+        RangeIterator.Builder<Long, Token> builder = 
RangeIntersectionIterator.builder(strategy);
+
+        builder.add(new LongIterator(new long[] { 1L, 4L, 6L, 7L, 9L, 10L }));
+        builder.add(new LongIterator(new long[] { 2L, 4L, 5L, 6L, 7L, 10L, 12L 
}));
+        builder.add(new LongIterator(new long[] { 4L, 6L, 7L, 9L, 10L }));
+
+        RangeIterator<Long, Token> range = builder.build();
+        Assert.assertNotNull(range);
+
+        // first let's skipTo something before range
+        Assert.assertEquals(4L, (long) range.skipTo(3L).get());
+        Assert.assertEquals(4L, (long) range.getCurrent());
+
+        // now let's skip right to the second value
+        Assert.assertEquals(6L, (long) range.skipTo(5L).get());
+        Assert.assertEquals(6L, (long) range.getCurrent());
+
+        // now right to the element
+        Assert.assertEquals(7L, (long) range.skipTo(7L).get());
+        Assert.assertEquals(7L, (long) range.getCurrent());
+        Assert.assertEquals(7L, (long) range.next().get());
+
+        // after consuming 7 the current token is expected to be the next common value, 10
+        Assert.assertTrue(range.hasNext());
+        Assert.assertEquals(10L, (long) range.getCurrent());
+
+        // now right after the last element
+        Assert.assertNull(range.skipTo(11L));
+        Assert.assertFalse(range.hasNext());
+    }
+
+    // Runs the min/max/count scenario once for every intersection Strategy.
+    @Test
+    public void testMinMaxAndCount()
+    {
+        for (Strategy strategy : Strategy.values())
+            testMinMaxAndCount(strategy);
+    }
+
+    // Checks the statistics reported by the builder and by the built iterator for three
+    // 3-element ranges whose only common value is 9.
+    private void testMinMaxAndCount(Strategy strategy)
+    {
+        RangeIterator.Builder<Long, Token> builder = 
RangeIntersectionIterator.builder(strategy);
+
+        builder.add(new LongIterator(new long[]{1L, 2L, 9L}));
+        builder.add(new LongIterator(new long[]{4L, 5L, 9L}));
+        builder.add(new LongIterator(new long[]{7L, 8L, 9L}));
+
+        // the expected token count (9) equals the total number of tokens added (3 + 3 + 3)
+        Assert.assertEquals(9L, (long) builder.getMaximum());
+        Assert.assertEquals(9L, builder.getTokenCount());
+
+        RangeIterator<Long, Token> tokens = builder.build();
+
+        Assert.assertNotNull(tokens);
+        Assert.assertEquals(7L, (long) tokens.getMinimum());
+        Assert.assertEquals(9L, (long) tokens.getMaximum());
+        Assert.assertEquals(9L, tokens.getCount());
+
+        // note: build() is invoked a second time on the same builder here
+        Assert.assertEquals(convert(9L), convert(builder.build()));
+    }
+
+    // Runs the builder scenario once for every intersection Strategy.
+    @Test
+    public void testBuilder()
+    {
+        for (Strategy strategy : Strategy.values())
+            testBuilder(strategy);
+    }
+
+    // Walks the intersection builder through its states: empty, populated, drained and
+    // refilled, null/empty inputs, a single range, and disjoint ranges. The steps are
+    // order-dependent because they mutate the same builder.
+    private void testBuilder(Strategy strategy)
+    {
+        RangeIterator.Builder<Long, Token> builder = 
RangeIntersectionIterator.builder(strategy);
+
+        // a fresh builder reports no bounds and no content
+        Assert.assertNull(builder.getMinimum());
+        Assert.assertNull(builder.getMaximum());
+        Assert.assertEquals(0L, builder.getTokenCount());
+        Assert.assertEquals(0L, builder.rangeCount());
+
+        builder.add(new LongIterator(new long[] { 1L, 2L, 6L }));
+        builder.add(new LongIterator(new long[] { 4L, 5L, 6L }));
+        builder.add(new LongIterator(new long[] { 6L, 8L, 9L }));
+
+        Assert.assertEquals(6L, (long) builder.getMinimum());
+        Assert.assertEquals(6L, (long) builder.getMaximum());
+        Assert.assertEquals(9L, builder.getTokenCount());
+        Assert.assertEquals(3L, builder.rangeCount());
+        Assert.assertFalse(builder.statistics.isDisjoint());
+
+        // polling removes the ranges from the builder; they are expected in ascending order of minimum
+        Assert.assertEquals(1L, (long) builder.ranges.poll().getMinimum());
+        Assert.assertEquals(4L, (long) builder.ranges.poll().getMinimum());
+        Assert.assertEquals(6L, (long) builder.ranges.poll().getMinimum());
+
+        // re-add the same ranges, since the polls above drained them
+        builder.add(new LongIterator(new long[] { 1L, 2L, 6L }));
+        builder.add(new LongIterator(new long[] { 4L, 5L, 6L }));
+        builder.add(new LongIterator(new long[] { 6L, 8L, 9L }));
+
+        Assert.assertEquals(convert(6L), convert(builder.build()));
+
+        builder = RangeIntersectionIterator.builder(strategy);
+        builder.add(new LongIterator(new long[]{ 1L, 5L, 6L }));
+        builder.add(new LongIterator(new long[]{ 3L, 5L, 6L }));
+
+        RangeIterator<Long, Token> tokens = builder.build();
+
+        Assert.assertEquals(convert(5L, 6L), convert(tokens));
+
+        FileUtils.closeQuietly(tokens);
+
+        // building with no ranges at all is expected to return null
+        RangeIterator emptyTokens = 
RangeIntersectionIterator.builder(strategy).build();
+        Assert.assertNull(emptyTokens);
+
+        // null and empty inputs are expected to be ignored by add()
+        builder = RangeIntersectionIterator.builder(strategy);
+        Assert.assertEquals(0L, builder.add((RangeIterator<Long, Token>) 
null).rangeCount());
+        Assert.assertEquals(0L, builder.add((List<RangeIterator<Long, Token>>) 
null).getTokenCount());
+        Assert.assertEquals(0L, builder.add(new LongIterator(new long[] 
{})).rangeCount());
+
+        // note: from here on builder() is called without the strategy argument
+        RangeIterator<Long, Token> single = new LongIterator(new long[] { 1L, 
2L, 3L });
+        RangeIterator<Long, Token> range = RangeIntersectionIterator.<Long, 
Token>builder().add(single).build();
+
+        // because build should return first element if it's only one instead 
of building yet another iterator
+        Assert.assertEquals(range, single);
+
+        // disjoint case
+        builder = RangeIntersectionIterator.builder();
+        builder.add(new LongIterator(new long[] { 1L, 2L, 3L }));
+        builder.add(new LongIterator(new long[] { 4L, 5L, 6L }));
+
+        Assert.assertTrue(builder.statistics.isDisjoint());
+
+        // disjoint input must still build a non-null iterator, just an empty one
+        RangeIterator<Long, Token> disjointIntersection = builder.build();
+        Assert.assertNotNull(disjointIntersection);
+        Assert.assertFalse(disjointIntersection.hasNext());
+
+    }
+
+    // Runs the close scenario once for every intersection Strategy.
+    @Test
+    public void testClose() throws IOException
+    {
+        for (Strategy strategy : Strategy.values())
+            testClose(strategy);
+    }
+
+    // Builds an iterator from a single range and verifies it is non-null and that
+    // close() completes without throwing.
+    private void testClose(Strategy strategy) throws IOException
+    {
+        RangeIterator<Long, Token> tokens = RangeIntersectionIterator.<Long, 
Token>builder(strategy)
+                                            .add(new LongIterator(new long[] { 
1L, 2L, 3L }))
+                                            .build();
+
+        Assert.assertNotNull(tokens);
+        tokens.close();
+    }
+
+    // RangeIterator.isOverlapping must be true when the [min, max] bounds of two ranges
+    // touch or cross, and false when one range lies entirely before the other.
+    @Test
+    public void testIsOverlapping()
+    {
+        // bounds touch at a single shared value
+        RangeIterator<Long, Token> touchingA = new LongIterator(new long[] { 1L, 5L });
+        RangeIterator<Long, Token> touchingB = new LongIterator(new long[] { 5L, 9L });
+        Assert.assertTrue(RangeIterator.isOverlapping(touchingA, touchingB));
+
+        // bounds cross, with the later range passed first
+        RangeIterator<Long, Token> crossingA = new LongIterator(new long[] { 5L, 9L });
+        RangeIterator<Long, Token> crossingB = new LongIterator(new long[] { 1L, 6L });
+        Assert.assertTrue(RangeIterator.isOverlapping(crossingA, crossingB));
+
+        // identical bounds
+        RangeIterator<Long, Token> sameA = new LongIterator(new long[] { 5L, 9L });
+        RangeIterator<Long, Token> sameB = new LongIterator(new long[] { 5L, 9L });
+        Assert.assertTrue(RangeIterator.isOverlapping(sameA, sameB));
+
+        // first range ends before the second begins
+        RangeIterator<Long, Token> beforeA = new LongIterator(new long[] { 1L, 4L });
+        RangeIterator<Long, Token> beforeB = new LongIterator(new long[] { 5L, 9L });
+        Assert.assertFalse(RangeIterator.isOverlapping(beforeA, beforeB));
+
+        // first range begins after the second ends
+        RangeIterator<Long, Token> afterA = new LongIterator(new long[] { 6L, 9L });
+        RangeIterator<Long, Token> afterB = new LongIterator(new long[] { 1L, 4L });
+        Assert.assertFalse(RangeIterator.isOverlapping(afterA, afterB));
+    }
+
+    /**
+     * Runs the randomized intersection check once for every intersection {@link Strategy}.
+     */
+    @Test
+    public void testIntersectionOfRandomRanges()
+    {
+        for (Strategy candidate : Strategy.values())
+        {
+            testIntersectionOfRandomRanges(candidate);
+        }
+    }
+
+    /**
+     * Performs 16 randomized rounds. Each round generates between 2 and 15 sorted ranges of unique
+     * tokens drawn from [0, 100), computes by brute force the tokens present in every range, and
+     * checks that the intersection built with the given strategy yields exactly those tokens.
+     *
+     * @param strategy intersection strategy handed to the builder
+     */
+    private void testIntersectionOfRandomRanges(Strategy strategy)
+    {
+        for (int round = 0; round < 16; round++)
+        {
+            final ThreadLocalRandom rnd = ThreadLocalRandom.current();
+            final int rangeTotal = rnd.nextInt(2, 16);
+
+            // generate randomized ranges: duplicates are dropped by the set, then each range is sorted
+            long[][] ranges = new long[rangeTotal][];
+            for (int r = 0; r < rangeTotal; r++)
+            {
+                int draws = rnd.nextInt(16, 512);
+                LongSet unique = new LongOpenHashSet(draws);
+
+                for (int d = 0; d < draws; d++)
+                    unique.add(rnd.nextLong(0, 100));
+
+                long[] sorted = unique.toArray();
+                Arrays.sort(sorted);
+                ranges[r] = sorted;
+            }
+
+            // a token of the first range is expected only if binary search finds it in every other range
+            List<Long> expected = new ArrayList<>();
+            candidates:
+            for (long token : ranges[0])
+            {
+                for (int r = 1; r < rangeTotal; r++)
+                {
+                    if (Arrays.binarySearch(ranges[r], token) < 0)
+                        continue candidates;
+                }
+
+                expected.add(token);
+            }
+
+            RangeIterator.Builder<Long, Token> intersection = RangeIntersectionIterator.builder(strategy);
+            for (long[] tokens : ranges)
+                intersection.add(new LongIterator(tokens));
+
+            Assert.assertEquals(expected, convert(intersection.build()));
+        }
+    }
+
+    @Test
+    public void testIteratorPeeking()
+    {
+        RangeIterator.Builder<Long, Token> builder = 
RangeIntersectionIterator.builder();
+
+        // iterator with only one element
+        builder.add(new LongIterator(new long[] { 10L }));
+
+        // iterator with 150 elements (lookup is going to be advantageous over 
bound in this case)
+        long[] tokens = new long[150];
+        for (int i = 0; i < tokens.length; i++)
+            tokens[i] = i;
+
+        builder.add(new LongIterator(tokens));
+
+        RangeIterator<Long, Token> intersection = builder.build();
+
+        Assert.assertNotNull(intersection);
+        Assert.assertEquals(LookupIntersectionIterator.class, 
intersection.getClass());
+
+        Assert.assertTrue(intersection.hasNext());
+        Assert.assertEquals(convert(10L), convert(intersection));
+
+        builder = RangeIntersectionIterator.builder();
+
+        builder.add(new LongIterator(new long[] { 1L, 3L, 5L, 7L, 9L }));
+        builder.add(new LongIterator(new long[] { 1L, 2L, 5L, 6L }));
+
+        intersection = builder.build();
+
+        // in the situation when there is a similar number of elements inside 
ranges
+        // ping-pong (bounce) intersection is preferred as it covers gaps 
quicker then linear scan + lookup.
+        Assert.assertNotNull(intersection);
+        Assert.assertEquals(BounceIntersectionIterator.class, 
intersection.getClass());
+
+        Assert.assertTrue(intersection.hasNext());
+        Assert.assertEquals(convert(1L, 5L), convert(intersection));
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/72790dc8/test/unit/org/apache/cassandra/index/sasi/utils/RangeUnionIteratorTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/index/sasi/utils/RangeUnionIteratorTest.java 
b/test/unit/org/apache/cassandra/index/sasi/utils/RangeUnionIteratorTest.java
new file mode 100644
index 0000000..f69086b
--- /dev/null
+++ 
b/test/unit/org/apache/cassandra/index/sasi/utils/RangeUnionIteratorTest.java
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.index.sasi.utils;
+
+import java.util.*;
+import java.util.concurrent.ThreadLocalRandom;
+
+import org.apache.cassandra.index.sasi.disk.Token;
+import org.apache.cassandra.io.util.FileUtils;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.apache.cassandra.index.sasi.utils.LongIterator.convert;
+
+/**
+ * Unit tests for {@link RangeUnionIterator} and its builder, driven by in-memory
+ * {@link LongIterator} ranges. Also covers basic {@link LongIterator} navigation.
+ */
+public class RangeUnionIteratorTest
+{
+    /** Ranges that interleave without sharing any token must merge into one ascending sequence. */
+    @Test
+    public void testNoOverlappingValues()
+    {
+        RangeUnionIterator.Builder<Long, Token> builder = RangeUnionIterator.builder();
+
+        builder.add(new LongIterator(new long[] { 2L, 3L, 5L, 6L }));
+        builder.add(new LongIterator(new long[] { 1L, 7L }));
+        builder.add(new LongIterator(new long[] { 4L, 8L, 9L, 10L }));
+
+        Assert.assertEquals(convert(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L), convert(builder.build()));
+    }
+
+    /** A union built from a single range must yield exactly that range's tokens. */
+    @Test
+    public void testSingleIterator()
+    {
+        RangeUnionIterator.Builder<Long, Token> builder = RangeUnionIterator.builder();
+
+        builder.add(new LongIterator(new long[] { 1L, 2L, 4L, 9L }));
+
+        Assert.assertEquals(convert(1L, 2L, 4L, 9L), convert(builder.build()));
+    }
+
+    /** Tokens present in several ranges must appear only once in the union. */
+    @Test
+    public void testOverlappingValues()
+    {
+        RangeUnionIterator.Builder<Long, Token> builder = RangeUnionIterator.builder();
+
+        builder.add(new LongIterator(new long[] { 1L, 4L, 6L, 7L }));
+        builder.add(new LongIterator(new long[] { 2L, 3L, 5L, 6L }));
+        builder.add(new LongIterator(new long[] { 4L, 6L, 8L, 9L, 10L }));
+
+        List<Long> values = convert(builder.build());
+
+        // the actual values double as the failure message to ease debugging
+        Assert.assertEquals(values.toString(), convert(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L), values);
+    }
+
+    /** Ranges whose bounds do not overlap at all must simply be concatenated in order. */
+    @Test
+    public void testNoOverlappingRanges()
+    {
+        RangeUnionIterator.Builder<Long, Token> builder = RangeUnionIterator.builder();
+
+        builder.add(new LongIterator(new long[] { 1L, 2L, 3L }));
+        builder.add(new LongIterator(new long[] { 4L, 5L, 6L }));
+        builder.add(new LongIterator(new long[] { 7L, 8L, 9L }));
+
+        Assert.assertEquals(convert(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L), convert(builder.build()));
+    }
+
+    /** Two single-token ranges holding the same token must collapse into one result. */
+    @Test
+    public void testTwoIteratorsWithSingleValues()
+    {
+        RangeUnionIterator.Builder<Long, Token> builder = RangeUnionIterator.builder();
+
+        builder.add(new LongIterator(new long[] { 1L }));
+        builder.add(new LongIterator(new long[] { 1L }));
+
+        Assert.assertEquals(convert(1L), convert(builder.build()));
+    }
+
+    /** Ranges of different lengths, one of which shares a token (5), must merge correctly. */
+    @Test
+    public void testDifferentSizeIterators()
+    {
+        RangeUnionIterator.Builder<Long, Token> builder = RangeUnionIterator.builder();
+
+        builder.add(new LongIterator(new long[] { 2L, 3L, 5L, 6L, 12L, 13L }));
+        builder.add(new LongIterator(new long[] { 1L, 7L, 14L, 15L }));
+        builder.add(new LongIterator(new long[] { 4L, 5L, 8L, 9L, 10L }));
+
+        Assert.assertEquals(convert(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L, 12L, 13L, 14L, 15L), convert(builder.build()));
+    }
+
+    /**
+     * Merges a random number of randomly filled, individually sorted ranges and compares the union
+     * against a global sort of all generated values.
+     */
+    @Test
+    public void testRandomSequences()
+    {
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+
+        // the number of ranges is chosen once and re-used (re-filled) by every round
+        long[][] values = new long[random.nextInt(1, 20)][];
+        int numTests = random.nextInt(10, 20);
+
+        for (int tests = 0; tests < numTests; tests++)
+        {
+            RangeUnionIterator.Builder<Long, Token> builder = RangeUnionIterator.builder();
+            int totalCount = 0;
+
+            for (int i = 0; i < values.length; i++)
+            {
+                // values span the whole long range, so the count check below relies on
+                // collisions between generated values being practically impossible
+                long[] part = new long[random.nextInt(1, 500)];
+                for (int j = 0; j < part.length; j++)
+                    part[j] = random.nextLong();
+
+                // all of the parts have to be sorted to mimic SSTable
+                Arrays.sort(part);
+
+                values[i] = part;
+                builder.add(new LongIterator(part));
+                totalCount += part.length;
+            }
+
+            // flatten every part into one array and sort it to obtain the expected order
+            long[] totalOrdering = new long[totalCount];
+            int index = 0;
+
+            for (long[] part : values)
+            {
+                for (long value : part)
+                    totalOrdering[index++] = value;
+            }
+
+            Arrays.sort(totalOrdering);
+
+            int count = 0;
+            RangeIterator<Long, Token> tokens = builder.build();
+
+            Assert.assertNotNull(tokens);
+            while (tokens.hasNext())
+                Assert.assertEquals(totalOrdering[count++], (long) tokens.next().get());
+
+            Assert.assertEquals(totalCount, count);
+        }
+    }
+
+    /** Checks minimum, maximum and token count on the builder and on the iterator it produces. */
+    @Test
+    public void testMinMaxAndCount()
+    {
+        RangeUnionIterator.Builder<Long, Token> builder = RangeUnionIterator.builder();
+
+        builder.add(new LongIterator(new long[] { 1L, 2L, 3L }));
+        builder.add(new LongIterator(new long[] { 4L, 5L, 6L }));
+        builder.add(new LongIterator(new long[] { 7L, 8L, 9L }));
+
+        Assert.assertEquals(1L, (long) builder.getMinimum());
+        Assert.assertEquals(9L, (long) builder.getMaximum());
+        Assert.assertEquals(9L, builder.getTokenCount());
+
+        RangeIterator<Long, Token> tokens = builder.build();
+
+        Assert.assertNotNull(tokens);
+        Assert.assertEquals(1L, (long) tokens.getMinimum());
+        Assert.assertEquals(9L, (long) tokens.getMaximum());
+        Assert.assertEquals(9L, tokens.getCount());
+
+        for (long i = 1; i < 10; i++)
+        {
+            Assert.assertTrue(tokens.hasNext());
+            Assert.assertEquals(i, (long) tokens.next().get());
+        }
+
+        // the reported minimum must not move once the iterator is exhausted
+        Assert.assertFalse(tokens.hasNext());
+        Assert.assertEquals(1L, (long) tokens.getMinimum());
+    }
+
+    /**
+     * Covers builder bookkeeping: statistics of an empty and a populated builder, ordering of the
+     * queued ranges, the static {@code build} shortcut, and handling of empty, null and single inputs.
+     */
+    @Test
+    public void testBuilder()
+    {
+        RangeUnionIterator.Builder<Long, Token> builder = RangeUnionIterator.builder();
+
+        Assert.assertNull(builder.getMinimum());
+        Assert.assertNull(builder.getMaximum());
+        Assert.assertEquals(0L, builder.getTokenCount());
+        Assert.assertEquals(0L, builder.rangeCount());
+
+        builder.add(new LongIterator(new long[] { 1L, 2L, 3L }));
+        builder.add(new LongIterator(new long[] { 4L, 5L, 6L }));
+        builder.add(new LongIterator(new long[] { 7L, 8L, 9L }));
+
+        Assert.assertEquals(1L, (long) builder.getMinimum());
+        Assert.assertEquals(9L, (long) builder.getMaximum());
+        Assert.assertEquals(9L, builder.getTokenCount());
+        Assert.assertEquals(3L, builder.rangeCount());
+        Assert.assertFalse(builder.statistics.isDisjoint());
+
+        // queued ranges are expected to come out ordered by their minimum
+        Assert.assertEquals(1L, (long) builder.ranges.poll().getMinimum());
+        Assert.assertEquals(4L, (long) builder.ranges.poll().getMinimum());
+        Assert.assertEquals(7L, (long) builder.ranges.poll().getMinimum());
+
+        RangeIterator<Long, Token> tokens = RangeUnionIterator.build(Arrays.<RangeIterator<Long, Token>>asList(
+            new LongIterator(new long[] { 1L, 2L, 4L }),
+            new LongIterator(new long[] { 3L, 5L, 6L })));
+
+        Assert.assertEquals(convert(1L, 2L, 3L, 4L, 5L, 6L), convert(tokens));
+
+        FileUtils.closeQuietly(tokens);
+
+        // a builder without any range produces no iterator at all
+        RangeIterator<Long, Token> emptyTokens = RangeUnionIterator.<Long, Token>builder().build();
+        Assert.assertNull(emptyTokens);
+
+        // null and empty inputs must be ignored by the builder
+        builder = RangeUnionIterator.builder();
+        Assert.assertEquals(0L, builder.add((RangeIterator<Long, Token>) null).rangeCount());
+        Assert.assertEquals(0L, builder.add((List<RangeIterator<Long, Token>>) null).getTokenCount());
+        Assert.assertEquals(0L, builder.add(new LongIterator(new long[] {})).rangeCount());
+
+        RangeIterator<Long, Token> single = new LongIterator(new long[] { 1L, 2L, 3L });
+        RangeIterator<Long, Token> range = RangeUnionIterator.<Long, Token>builder().add(single).build();
+
+        // because build should return first element if it's only one instead of building yet another iterator
+        Assert.assertSame(single, range);
+    }
+
+    /** Checks that {@code skipTo} positions the union on the requested token or exhausts it. */
+    @Test
+    public void testSkipTo()
+    {
+        RangeUnionIterator.Builder<Long, Token> builder = RangeUnionIterator.builder();
+
+        builder.add(new LongIterator(new long[] { 1L, 2L, 3L }));
+        builder.add(new LongIterator(new long[] { 4L, 5L, 6L }));
+        builder.add(new LongIterator(new long[] { 7L, 8L, 9L }));
+
+        RangeIterator<Long, Token> tokens = builder.build();
+        Assert.assertNotNull(tokens);
+
+        tokens.skipTo(5L);
+        Assert.assertTrue(tokens.hasNext());
+        Assert.assertEquals(5L, (long) tokens.next().get());
+
+        tokens.skipTo(7L);
+        Assert.assertTrue(tokens.hasNext());
+        Assert.assertEquals(7L, (long) tokens.next().get());
+
+        // skipping past the last token exhausts the iterator but leaves its bounds untouched
+        tokens.skipTo(10L);
+        Assert.assertFalse(tokens.hasNext());
+        Assert.assertEquals(1L, (long) tokens.getMinimum());
+        Assert.assertEquals(9L, (long) tokens.getMaximum());
+    }
+
+    /** A union of two unions must behave like a union of all four underlying ranges. */
+    @Test
+    public void testMergingMultipleIterators()
+    {
+        RangeUnionIterator.Builder<Long, Token> builderA = RangeUnionIterator.builder();
+
+        builderA.add(new LongIterator(new long[] { 1L, 3L, 5L }));
+        builderA.add(new LongIterator(new long[] { 8L, 10L, 12L }));
+
+        RangeUnionIterator.Builder<Long, Token> builderB = RangeUnionIterator.builder();
+
+        builderB.add(new LongIterator(new long[] { 7L, 9L, 11L }));
+        builderB.add(new LongIterator(new long[] { 2L, 4L, 6L }));
+
+        RangeIterator<Long, Token> union = RangeUnionIterator.build(Arrays.asList(builderA.build(), builderB.build()));
+        Assert.assertEquals(convert(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L, 11L, 12L), convert(union));
+    }
+
+    /** Exercises the {@link LongIterator} helper itself: bounds, sequential iteration and skipping. */
+    @Test
+    public void testRangeIterator()
+    {
+        LongIterator tokens = new LongIterator(new long[] { 0L, 1L, 2L, 3L });
+
+        Assert.assertEquals(0L, (long) tokens.getMinimum());
+        Assert.assertEquals(3L, (long) tokens.getMaximum());
+
+        for (int i = 0; i <= 3; i++)
+        {
+            Assert.assertTrue(tokens.hasNext());
+            Assert.assertEquals(i, (long) tokens.getCurrent());
+            Assert.assertEquals(i, (long) tokens.next().get());
+        }
+
+        tokens = new LongIterator(new long[] { 0L, 1L, 3L, 5L });
+
+        // 2 is absent, so skipping to it is expected to land on the next larger token
+        Assert.assertEquals(3L, (long) tokens.skipTo(2L).get());
+        Assert.assertTrue(tokens.hasNext());
+        Assert.assertEquals(3L, (long) tokens.getCurrent());
+        Assert.assertEquals(3L, (long) tokens.next().get());
+
+        Assert.assertEquals(5L, (long) tokens.skipTo(5L).get());
+        Assert.assertTrue(tokens.hasNext());
+        Assert.assertEquals(5L, (long) tokens.getCurrent());
+        Assert.assertEquals(5L, (long) tokens.next().get());
+
+        LongIterator empty = new LongIterator(new long[0]);
+
+        Assert.assertNull(empty.skipTo(3L));
+        Assert.assertFalse(empty.hasNext());
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/72790dc8/test/unit/org/apache/cassandra/io/sstable/format/SSTableFlushObserverTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/io/sstable/format/SSTableFlushObserverTest.java
 
b/test/unit/org/apache/cassandra/io/sstable/format/SSTableFlushObserverTest.java
index 29ad387..9cac007 100644
--- 
a/test/unit/org/apache/cassandra/io/sstable/format/SSTableFlushObserverTest.java
+++ 
b/test/unit/org/apache/cassandra/io/sstable/format/SSTableFlushObserverTest.java
@@ -190,9 +190,10 @@ public class SSTableFlushObserverTest
         }
 
         @Override
-        public void nextCell(ColumnData cell)
+        public void nextUnfilteredCluster(Unfiltered row)
         {
-            rows.put(currentKey, (Cell) cell);
+            if (row.isRow())
+                ((Row) row).forEach((c) -> rows.put(currentKey, (Cell) c));
         }
 
         @Override

Reply via email to