Expand range tombstone validation checks to multiple interim request stages

patch by Aleksey Yeschenko; reviewed by Blake Eggleston and Sam
Tunnicliffe for CASSANDRA-14824


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5e969e9c
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5e969e9c
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5e969e9c

Branch: refs/heads/trunk
Commit: 5e969e9cfd7e776dadeb51d1003bbfe79544ca08
Parents: 300fff2
Author: Aleksey Yeshchenko <alek...@apple.com>
Authored: Fri Oct 12 14:20:28 2018 +0100
Committer: Aleksey Yeshchenko <alek...@apple.com>
Committed: Tue Oct 16 17:01:17 2018 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cassandra/db/PartitionRangeReadCommand.java | 14 +++-
 .../org/apache/cassandra/db/ReadCommand.java    | 11 +--
 .../db/SinglePartitionReadCommand.java          | 76 ++++++++++++++++----
 .../cassandra/db/transform/RTBoundCloser.java   | 16 +++++
 .../db/transform/RTBoundValidator.java          | 27 +++++--
 .../db/transform/RTTransformationsTest.java     | 48 ++++++-------
 7 files changed, 142 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/5e969e9c/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 3c6d3b5..6ca14a0 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.18
+ * Expand range tombstone validation checks to multiple interim request stages 
(CASSANDRA-14824)
  * Reverse order reads can return incomplete results (CASSANDRA-14803)
  * Avoid calling iter.next() in a loop when notifying indexers about range 
tombstones (CASSANDRA-14794)
  * Fix purging semi-expired RT boundaries in reversed iterators 
(CASSANDRA-14672)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5e969e9c/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java 
b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
index 84e3c7d..4f936cc 100644
--- a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
+++ b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
@@ -30,6 +30,7 @@ import org.apache.cassandra.db.filter.*;
 import org.apache.cassandra.db.lifecycle.View;
 import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.db.rows.BaseRowIterator;
+import org.apache.cassandra.db.transform.RTBoundValidator;
 import org.apache.cassandra.db.transform.Transformation;
 import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.dht.Bounds;
@@ -253,8 +254,12 @@ public class PartitionRangeReadCommand extends ReadCommand
             {
                 @SuppressWarnings("resource") // We close on exception and on 
closing the result returned by this method
                 Memtable.MemtableUnfilteredPartitionIterator iter = 
memtable.makePartitionIterator(columnFilter(), dataRange(), isForThrift());
+
+                @SuppressWarnings("resource") // We close on exception and on 
closing the result returned by this method
+                UnfilteredPartitionIterator iterator = isForThrift() ? 
ThriftResultsMerger.maybeWrap(iter, metadata(), nowInSec()) : iter;
+                iterators.add(RTBoundValidator.validate(iterator, 
RTBoundValidator.Stage.MEMTABLE, false));
+
                 oldestUnrepairedTombstone = 
Math.min(oldestUnrepairedTombstone, iter.getMinLocalDeletionTime());
-                iterators.add(isForThrift() ? 
ThriftResultsMerger.maybeWrap(iter, metadata(), nowInSec()) : iter);
             }
 
             SSTableReadsListener readCountUpdater = newReadCountUpdater();
@@ -262,7 +267,12 @@ public class PartitionRangeReadCommand extends ReadCommand
             {
                 @SuppressWarnings("resource") // We close on exception and on 
closing the result returned by this method
                 UnfilteredPartitionIterator iter = 
sstable.getScanner(columnFilter(), dataRange(), isForThrift(), 
readCountUpdater);
-                iterators.add(isForThrift() ? 
ThriftResultsMerger.maybeWrap(iter, metadata(), nowInSec()) : iter);
+
+                if (isForThrift())
+                    iter = ThriftResultsMerger.maybeWrap(iter, metadata(), 
nowInSec());
+
+                iterators.add(RTBoundValidator.validate(iter, 
RTBoundValidator.Stage.SSTABLE, false));
+
                 if (!sstable.isRepaired())
                     oldestUnrepairedTombstone = 
Math.min(oldestUnrepairedTombstone, sstable.getMinLocalDeletionTime());
             }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5e969e9c/src/java/org/apache/cassandra/db/ReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java 
b/src/java/org/apache/cassandra/db/ReadCommand.java
index f8a0795..0135d1e 100644
--- a/src/java/org/apache/cassandra/db/ReadCommand.java
+++ b/src/java/org/apache/cassandra/db/ReadCommand.java
@@ -35,6 +35,7 @@ import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.db.transform.RTBoundCloser;
 import org.apache.cassandra.db.transform.RTBoundValidator;
+import org.apache.cassandra.db.transform.RTBoundValidator.Stage;
 import org.apache.cassandra.db.transform.Transformation;
 import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.index.Index;
@@ -333,7 +334,7 @@ public abstract class ReadCommand implements ReadQuery
     {
         // validate that the sequence of RT markers is correct: open is 
followed by close, deletion times for both
         // ends equal, and there are no dangling RT bound in any partition.
-        iterator = Transformation.apply(iterator, new RTBoundValidator(true));
+        iterator = RTBoundValidator.validate(iterator, Stage.PROCESSED, true);
 
         return isDigestQuery()
              ? ReadResponse.createDigestResponse(iterator, this)
@@ -408,10 +409,12 @@ public abstract class ReadCommand implements ReadQuery
         }
 
         UnfilteredPartitionIterator iterator = (null == searcher) ? 
queryStorage(cfs, orderGroup) : searcher.search(orderGroup);
+        iterator = RTBoundValidator.validate(iterator, Stage.MERGED, false);
 
         try
         {
-            iterator = withoutPurgeableTombstones(iterator, cfs);
+            iterator = 
RTBoundValidator.validate(withoutPurgeableTombstones(iterator, cfs), 
Stage.PURGED, false);
+
             iterator = withMetricsRecording(iterator, cfs.metric, 
startTimeNanos);
 
             // If we've used a 2ndary index, we know the result already 
satisfy the primary expression used, so
@@ -431,9 +434,7 @@ public abstract class ReadCommand implements ReadQuery
             iterator = limits().filter(iterator, nowInSec(), 
selectsFullPartition());
 
             // because of the above, we need to append an aritifical end bound 
if the source iterator was stopped short by a counter.
-            iterator = Transformation.apply(iterator, new RTBoundCloser());
-
-            return iterator;
+            return RTBoundCloser.close(iterator);
         }
         catch (RuntimeException | Error e)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5e969e9c/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java 
b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
index 4b10530..4c8e0bc 100644
--- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
@@ -36,6 +36,7 @@ import org.apache.cassandra.db.lifecycle.*;
 import org.apache.cassandra.db.filter.*;
 import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.transform.RTBoundValidator;
 import org.apache.cassandra.exceptions.RequestExecutionException;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.sstable.format.SSTableReadsListener;
@@ -686,12 +687,19 @@ public class SinglePartitionReadCommand extends 
ReadCommand
                 if (partition == null)
                     continue;
 
-                @SuppressWarnings("resource") // 'iter' is added to iterators 
which is closed on exception, or through the closing of the final merged 
iterator
+                // 'iter' is added to iterators which is closed on exception, 
or through the closing of the final merged iterator
+                @SuppressWarnings("resource")
                 UnfilteredRowIterator iter = 
filter.getUnfilteredRowIterator(columnFilter(), partition);
-                @SuppressWarnings("resource") // same as above
-                UnfilteredRowIterator maybeCopied = copyOnHeap ? 
UnfilteredRowIterators.cloningIterator(iter, HeapAllocator.instance) : iter;
+
+                if (copyOnHeap)
+                    iter = UnfilteredRowIterators.cloningIterator(iter, 
HeapAllocator.instance);
+
                 oldestUnrepairedTombstone = 
Math.min(oldestUnrepairedTombstone, partition.stats().minLocalDeletionTime);
-                iterators.add(isForThrift() ? 
ThriftResultsMerger.maybeWrap(maybeCopied, nowInSec()) : maybeCopied);
+
+                if (isForThrift())
+                    iter = ThriftResultsMerger.maybeWrap(iter, nowInSec());
+
+                iterators.add(RTBoundValidator.validate(iter, 
RTBoundValidator.Stage.MEMTABLE, false));
             }
             /*
              * We can't eliminate full sstables based on the timestamp of what 
we've already read like
@@ -733,16 +741,24 @@ public class SinglePartitionReadCommand extends 
ReadCommand
                     continue;
                 }
 
-                @SuppressWarnings("resource") // 'iter' is added to iterators 
which is closed on exception, or through the closing of the final merged 
iterator
-                UnfilteredRowIterator iter = 
filter.filter(sstable.iterator(partitionKey(),
-                                                                            
columnFilter(),
-                                                                            
filter.isReversed(),
-                                                                            
isForThrift(),
-                                                                            
metricsCollector));
                 if (!sstable.isRepaired())
                     oldestUnrepairedTombstone = 
Math.min(oldestUnrepairedTombstone, sstable.getMinLocalDeletionTime());
 
-                iterators.add(isForThrift() ? 
ThriftResultsMerger.maybeWrap(iter, nowInSec()) : iter);
+                // 'iter' is added to iterators which is closed on exception, 
or through the closing of the final merged iterator
+                @SuppressWarnings("resource")
+                UnfilteredRowIterator iter = filter.filter(
+                    sstable.iterator(partitionKey(),
+                                     columnFilter(),
+                                     filter.isReversed(),
+                                     isForThrift(),
+                                     metricsCollector)
+                );
+
+                if (isForThrift())
+                    iter = ThriftResultsMerger.maybeWrap(iter, nowInSec());
+
+                iterators.add(RTBoundValidator.validate(iter, 
RTBoundValidator.Stage.SSTABLE, false));
+
                 mostRecentPartitionTombstone = 
Math.max(mostRecentPartitionTombstone, 
iter.partitionLevelDeletion().markedForDeleteAt());
             }
 
@@ -862,7 +878,12 @@ public class SinglePartitionReadCommand extends ReadCommand
                 UnfilteredRowIterator clonedFilter = copyOnHeap
                                                    ? 
UnfilteredRowIterators.cloningIterator(iter, HeapAllocator.instance)
                                                    : iter;
-                result = add(isForThrift() ? 
ThriftResultsMerger.maybeWrap(clonedFilter, nowInSec()) : clonedFilter, result, 
filter, false);
+                result = add(
+                    RTBoundValidator.validate(isForThrift() ? 
ThriftResultsMerger.maybeWrap(clonedFilter, nowInSec()) : clonedFilter, 
RTBoundValidator.Stage.MEMTABLE, false),
+                    result,
+                    filter,
+                    false
+                );
             }
         }
 
@@ -901,10 +922,29 @@ public class SinglePartitionReadCommand extends 
ReadCommand
                                                                                
  metricsCollector)))
                 {
                     if (!iter.partitionLevelDeletion().isLive())
-                        result = 
add(UnfilteredRowIterators.noRowsIterator(iter.metadata(), iter.partitionKey(), 
Rows.EMPTY_STATIC_ROW, iter.partitionLevelDeletion(), filter.isReversed()), 
result, filter, sstable.isRepaired());
+                    {
+                        result = add(
+                            
UnfilteredRowIterators.noRowsIterator(iter.metadata(),
+                                                                  
iter.partitionKey(),
+                                                                  
Rows.EMPTY_STATIC_ROW,
+                                                                  
iter.partitionLevelDeletion(),
+                                                                  
filter.isReversed()),
+                            result,
+                            filter,
+                            sstable.isRepaired()
+                        );
+                    }
                     else
-                        result = add(iter, result, filter, 
sstable.isRepaired());
+                    {
+                        result = add(
+                            RTBoundValidator.validate(iter, 
RTBoundValidator.Stage.SSTABLE, false),
+                            result,
+                            filter,
+                            sstable.isRepaired()
+                        );
+                    }
                 }
+
                 continue;
             }
 
@@ -920,7 +960,13 @@ public class SinglePartitionReadCommand extends ReadCommand
 
                 if (sstable.isRepaired())
                     onlyUnrepaired = false;
-                result = add(isForThrift() ? 
ThriftResultsMerger.maybeWrap(iter, nowInSec()) : iter, result, filter, 
sstable.isRepaired());
+
+                result = add(
+                    RTBoundValidator.validate(isForThrift() ? 
ThriftResultsMerger.maybeWrap(iter, nowInSec()) : iter, 
RTBoundValidator.Stage.SSTABLE, false),
+                    result,
+                    filter,
+                    sstable.isRepaired()
+                );
             }
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5e969e9c/src/java/org/apache/cassandra/db/transform/RTBoundCloser.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/transform/RTBoundCloser.java 
b/src/java/org/apache/cassandra/db/transform/RTBoundCloser.java
index 11f0344..ee5401d 100644
--- a/src/java/org/apache/cassandra/db/transform/RTBoundCloser.java
+++ b/src/java/org/apache/cassandra/db/transform/RTBoundCloser.java
@@ -21,6 +21,7 @@ import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.Clustering;
 import org.apache.cassandra.db.DeletionTime;
 import org.apache.cassandra.db.ReadOrderGroup;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
 import org.apache.cassandra.db.rows.*;
 
 /**
@@ -37,6 +38,21 @@ import org.apache.cassandra.db.rows.*;
  */
 public final class RTBoundCloser extends Transformation<UnfilteredRowIterator>
 {
+    private RTBoundCloser()
+    {
+    }
+
+    public static UnfilteredPartitionIterator 
close(UnfilteredPartitionIterator partitions)
+    {
+        return Transformation.apply(partitions, new RTBoundCloser());
+    }
+
+    public static UnfilteredRowIterator close(UnfilteredRowIterator partition)
+    {
+        RowsTransformation transformation = new RowsTransformation(partition);
+        return Transformation.apply(MoreRows.extend(partition, transformation, 
partition.columns()), transformation);
+    }
+
     @Override
     public UnfilteredRowIterator applyToPartition(UnfilteredRowIterator 
partition)
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5e969e9c/src/java/org/apache/cassandra/db/transform/RTBoundValidator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/transform/RTBoundValidator.java 
b/src/java/org/apache/cassandra/db/transform/RTBoundValidator.java
index 7866b14..1f675cf 100644
--- a/src/java/org/apache/cassandra/db/transform/RTBoundValidator.java
+++ b/src/java/org/apache/cassandra/db/transform/RTBoundValidator.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.db.transform;
 
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.DeletionTime;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
 import org.apache.cassandra.db.rows.RangeTombstoneMarker;
 import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 
@@ -32,29 +33,45 @@ import org.apache.cassandra.db.rows.UnfilteredRowIterator;
  */
 public final class RTBoundValidator extends 
Transformation<UnfilteredRowIterator>
 {
+    public enum Stage { MEMTABLE, SSTABLE, MERGED, PURGED, PROCESSED }
+
+    private final Stage stage;
     private final boolean enforceIsClosed;
 
-    public RTBoundValidator(boolean enforceIsClosed)
+    private RTBoundValidator(Stage stage, boolean enforceIsClosed)
     {
+        this.stage = stage;
         this.enforceIsClosed = enforceIsClosed;
     }
 
+    public static UnfilteredPartitionIterator 
validate(UnfilteredPartitionIterator partitions, Stage stage, boolean 
enforceIsClosed)
+    {
+        return Transformation.apply(partitions, new RTBoundValidator(stage, 
enforceIsClosed));
+    }
+
+    public static UnfilteredRowIterator validate(UnfilteredRowIterator 
partition, Stage stage, boolean enforceIsClosed)
+    {
+        return Transformation.apply(partition, new RowsTransformation(stage, 
partition.metadata(), partition.isReverseOrder(), enforceIsClosed));
+    }
+
     @Override
     public UnfilteredRowIterator applyToPartition(UnfilteredRowIterator 
partition)
     {
-        return Transformation.apply(partition, new 
RowsTransformation(partition.metadata(), partition.isReverseOrder(), 
enforceIsClosed));
+        return Transformation.apply(partition, new RowsTransformation(stage, 
partition.metadata(), partition.isReverseOrder(), enforceIsClosed));
     }
 
     private final static class RowsTransformation extends Transformation
     {
+        private final Stage stage;
         private final CFMetaData metadata;
         private final boolean isReverseOrder;
         private final boolean enforceIsClosed;
 
         private DeletionTime openMarkerDeletionTime;
 
-        private RowsTransformation(CFMetaData metadata, boolean 
isReverseOrder, boolean enforceIsClosed)
+        private RowsTransformation(Stage stage, CFMetaData metadata, boolean 
isReverseOrder, boolean enforceIsClosed)
         {
+            this.stage = stage;
             this.metadata = metadata;
             this.isReverseOrder = isReverseOrder;
             this.enforceIsClosed = enforceIsClosed;
@@ -98,8 +115,8 @@ public final class RTBoundValidator extends 
Transformation<UnfilteredRowIterator
 
         private IllegalStateException ise(String why)
         {
-            String message = String.format("UnfilteredRowIterator for %s.%s 
has an illegal RT bounds sequence: %s",
-                                           metadata.ksName, metadata.cfName, 
why);
+            String message = String.format("%s UnfilteredRowIterator for %s.%s 
has an illegal RT bounds sequence: %s",
+                                           stage, metadata.ksName, 
metadata.cfName, why);
             throw new IllegalStateException(message);
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5e969e9c/test/unit/org/apache/cassandra/db/transform/RTTransformationsTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/db/transform/RTTransformationsTest.java 
b/test/unit/org/apache/cassandra/db/transform/RTTransformationsTest.java
index 832c5a3..f79b9f3 100644
--- a/test/unit/org/apache/cassandra/db/transform/RTTransformationsTest.java
+++ b/test/unit/org/apache/cassandra/db/transform/RTTransformationsTest.java
@@ -33,13 +33,16 @@ import org.apache.cassandra.db.marshal.UTF8Type;
 import org.apache.cassandra.db.partitions.SingletonUnfilteredPartitionIterator;
 import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
 import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.transform.RTBoundValidator.Stage;
 import org.apache.cassandra.dht.Murmur3Partitioner;
 import org.apache.cassandra.utils.FBUtilities;
 
+import static org.apache.cassandra.db.transform.RTBoundCloser.close;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
+import static org.apache.cassandra.db.transform.RTBoundValidator.validate;
 import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
 
 public final class RTTransformationsTest
@@ -80,8 +83,7 @@ public final class RTTransformationsTest
         , row(1, "a", "1", "")
         , bound(Kind.INCL_END_BOUND, 0, "a", "1")
         );
-        extended = Transformation.apply(extended, new RTBoundCloser());
-        assertIteratorsEqual(original, extended);
+        assertIteratorsEqual(original, close(extended));
     }
 
     @Test
@@ -98,8 +100,7 @@ public final class RTTransformationsTest
         , row(1, "a", "1", "")
         , bound(Kind.INCL_START_BOUND, 0, "a", "1")
         );
-        extended = Transformation.apply(extended, new RTBoundCloser());
-        assertIteratorsEqual(original, extended);
+        assertIteratorsEqual(original, close(extended));
     }
 
     @Test
@@ -109,7 +110,7 @@ public final class RTTransformationsTest
         , bound(Kind.INCL_START_BOUND, 0, "a", "1")
         , row(1, "a", "1", "")
         );
-        UnfilteredPartitionIterator extended = Transformation.apply(original, 
new RTBoundCloser());
+        UnfilteredPartitionIterator extended = close(original);
 
         UnfilteredPartitionIterator expected = iter(false
         , bound(Kind.INCL_START_BOUND, 0, "a", "1")
@@ -127,7 +128,7 @@ public final class RTTransformationsTest
         , boundary(Kind.EXCL_END_INCL_START_BOUNDARY, 0, 1, "a", "0")
         , row(2, "a", "1", "")
         );
-        UnfilteredPartitionIterator extended = Transformation.apply(original, 
new RTBoundCloser());
+        UnfilteredPartitionIterator extended = close(original);
 
         UnfilteredPartitionIterator expected = iter(false
         , bound(Kind.INCL_START_BOUND, 0, "a")
@@ -145,7 +146,7 @@ public final class RTTransformationsTest
         , bound(Kind.INCL_END_BOUND, 0, "a", "1")
         , row(1, "a", "1", "")
         );
-        UnfilteredPartitionIterator extended = Transformation.apply(original, 
new RTBoundCloser());
+        UnfilteredPartitionIterator extended = close(original);
 
         UnfilteredPartitionIterator expected = iter(true
         , bound(Kind.INCL_END_BOUND, 0, "a", "1")
@@ -163,7 +164,7 @@ public final class RTTransformationsTest
         , boundary(Kind.INCL_END_EXCL_START_BOUNDARY, 1, 0, "a", "1")
         , row(2, "a", "0", "")
         );
-        UnfilteredPartitionIterator extended = Transformation.apply(original, 
new RTBoundCloser());
+        UnfilteredPartitionIterator extended = close(original);
 
         UnfilteredPartitionIterator expected = iter(true
         , bound(Kind.INCL_END_BOUND, 0, "a")
@@ -181,8 +182,7 @@ public final class RTTransformationsTest
         UnfilteredPartitionIterator iterator = iter(false
         , bound(Kind.INCL_START_BOUND, 0, "a")
         );
-        iterator = Transformation.apply(iterator, new RTBoundCloser());
-        assertThrowsISEIterated(iterator);
+        assertThrowsISEIterated(close(iterator));
     }
 
     @Test
@@ -197,7 +197,7 @@ public final class RTTransformationsTest
         , row(1, "a", "2", "")
         , bound(Kind.INCL_END_BOUND, 0, "a", "2")
         );
-        iterator = Transformation.apply(iterator, new RTBoundValidator(true));
+        iterator = validate(iterator, Stage.PROCESSED, true);
         drain(iterator);
     }
 
@@ -213,7 +213,7 @@ public final class RTTransformationsTest
         , row(1, "a", "1", "")
         , bound(Kind.INCL_START_BOUND, 0, "a", "1")
         );
-        iterator = Transformation.apply(iterator, new RTBoundValidator(true));
+        iterator = validate(iterator, Stage.PROCESSED, true);
         drain(iterator);
     }
 
@@ -237,7 +237,7 @@ public final class RTTransformationsTest
 
         , bound(Kind.INCL_END_BOUND, 0, "a")
         );
-        iterator = Transformation.apply(iterator, new RTBoundValidator(true));
+        iterator = validate(iterator, Stage.PROCESSED, true);
         drain(iterator);
     }
 
@@ -261,7 +261,7 @@ public final class RTTransformationsTest
 
         , bound(Kind.INCL_START_BOUND, 0, "a")
         );
-        iterator = Transformation.apply(iterator, new RTBoundValidator(true));
+        iterator = validate(iterator, Stage.PROCESSED, true);
         drain(iterator);
     }
 
@@ -273,7 +273,7 @@ public final class RTTransformationsTest
         , row(1, "a", "1", "")
         , bound(Kind.INCL_END_BOUND, 1, "a", "1")
         );
-        iterator = Transformation.apply(iterator, new RTBoundValidator(true));
+        iterator = validate(iterator, Stage.PROCESSED, true);
         assertThrowsISEIterated(iterator);
     }
 
@@ -285,7 +285,7 @@ public final class RTTransformationsTest
         , row(1, "a", "1", "")
         , bound(Kind.INCL_START_BOUND, 1, "a", "1")
         );
-        iterator = Transformation.apply(iterator, new RTBoundValidator(true));
+        iterator = validate(iterator, Stage.PROCESSED, true);
         assertThrowsISEIterated(iterator);
     }
 
@@ -299,7 +299,7 @@ public final class RTTransformationsTest
         , row(1, "a", "1", "")
         , bound(Kind.INCL_END_BOUND, 0, "a", "1")
         );
-        iterator = Transformation.apply(iterator, new RTBoundValidator(true));
+        iterator = validate(iterator, Stage.PROCESSED, true);
         assertThrowsISEIterated(iterator);
 
         // duplicated end bound
@@ -309,7 +309,7 @@ public final class RTTransformationsTest
         , bound(Kind.INCL_END_BOUND, 0, "a", "1")
         , bound(Kind.INCL_END_BOUND, 0, "a", "1")
         );
-        iterator = Transformation.apply(iterator, new RTBoundValidator(true));
+        iterator = validate(iterator, Stage.PROCESSED, true);
         assertThrowsISEIterated(iterator);
 
         // absent open bound
@@ -317,7 +317,7 @@ public final class RTTransformationsTest
         , row(1, "a", "1", "")
         , bound(Kind.INCL_END_BOUND, 0, "a", "1")
         );
-        iterator = Transformation.apply(iterator, new RTBoundValidator(true));
+        iterator = validate(iterator, Stage.PROCESSED, true);
         assertThrowsISEIterated(iterator);
 
         // absent end bound
@@ -325,7 +325,7 @@ public final class RTTransformationsTest
         , bound(Kind.INCL_START_BOUND, 0, "a", "1")
         , row(1, "a", "1", "")
         );
-        iterator = Transformation.apply(iterator, new RTBoundValidator(true));
+        iterator = validate(iterator, Stage.PROCESSED, true);
         assertThrowsISEIterated(iterator);
     }
 
@@ -339,7 +339,7 @@ public final class RTTransformationsTest
         , bound(Kind.INCL_START_BOUND, 0, "a", "1")
         , bound(Kind.INCL_START_BOUND, 0, "a", "1")
         );
-        iterator = Transformation.apply(iterator, new RTBoundValidator(true));
+        iterator = validate(iterator, Stage.PROCESSED, true);
         assertThrowsISEIterated(iterator);
 
         // duplicated end bound
@@ -349,7 +349,7 @@ public final class RTTransformationsTest
         , row(1, "a", "1", "")
         , bound(Kind.INCL_START_BOUND, 0, "a", "1")
         );
-        iterator = Transformation.apply(iterator, new RTBoundValidator(true));
+        iterator = validate(iterator, Stage.PROCESSED, true);
         assertThrowsISEIterated(iterator);
 
         // absent open bound
@@ -357,7 +357,7 @@ public final class RTTransformationsTest
         , bound(Kind.INCL_END_BOUND, 0, "a", "1")
         , row(1, "a", "1", "")
         );
-        iterator = Transformation.apply(iterator, new RTBoundValidator(true));
+        iterator = validate(iterator, Stage.PROCESSED, true);
         assertThrowsISEIterated(iterator);
 
         // absent end bound
@@ -365,7 +365,7 @@ public final class RTTransformationsTest
         , row(1, "a", "1", "")
         , bound(Kind.INCL_START_BOUND, 0, "a", "1")
         );
-        iterator = Transformation.apply(iterator, new RTBoundValidator(true));
+        iterator = validate(iterator, Stage.PROCESSED, true);
         assertThrowsISEIterated(iterator);
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to