Expand range tombstone validation checks to multiple interim request stages
patch by Aleksey Yeschenko; reviewed by Blake Eggleston and Sam Tunnicliffe for CASSANDRA-14824 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5e969e9c Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5e969e9c Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5e969e9c Branch: refs/heads/trunk Commit: 5e969e9cfd7e776dadeb51d1003bbfe79544ca08 Parents: 300fff2 Author: Aleksey Yeshchenko <alek...@apple.com> Authored: Fri Oct 12 14:20:28 2018 +0100 Committer: Aleksey Yeshchenko <alek...@apple.com> Committed: Tue Oct 16 17:01:17 2018 +0100 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../cassandra/db/PartitionRangeReadCommand.java | 14 +++- .../org/apache/cassandra/db/ReadCommand.java | 11 +-- .../db/SinglePartitionReadCommand.java | 76 ++++++++++++++++---- .../cassandra/db/transform/RTBoundCloser.java | 16 +++++ .../db/transform/RTBoundValidator.java | 27 +++++-- .../db/transform/RTTransformationsTest.java | 48 ++++++------- 7 files changed, 142 insertions(+), 51 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/5e969e9c/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 3c6d3b5..6ca14a0 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.0.18 + * Expand range tombstone validation checks to multiple interim request stages (CASSANDRA-14824) * Reverse order reads can return incomplete results (CASSANDRA-14803) * Avoid calling iter.next() in a loop when notifying indexers about range tombstones (CASSANDRA-14794) * Fix purging semi-expired RT boundaries in reversed iterators (CASSANDRA-14672) http://git-wip-us.apache.org/repos/asf/cassandra/blob/5e969e9c/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java index 84e3c7d..4f936cc 100644 --- a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java +++ b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java @@ -30,6 +30,7 @@ import org.apache.cassandra.db.filter.*; import org.apache.cassandra.db.lifecycle.View; import org.apache.cassandra.db.partitions.*; import org.apache.cassandra.db.rows.BaseRowIterator; +import org.apache.cassandra.db.transform.RTBoundValidator; import org.apache.cassandra.db.transform.Transformation; import org.apache.cassandra.dht.AbstractBounds; import org.apache.cassandra.dht.Bounds; @@ -253,8 +254,12 @@ public class PartitionRangeReadCommand extends ReadCommand { @SuppressWarnings("resource") // We close on exception and on closing the result returned by this method Memtable.MemtableUnfilteredPartitionIterator iter = memtable.makePartitionIterator(columnFilter(), dataRange(), isForThrift()); + + @SuppressWarnings("resource") // We close on exception and on closing the result returned by this method + UnfilteredPartitionIterator iterator = isForThrift() ? ThriftResultsMerger.maybeWrap(iter, metadata(), nowInSec()) : iter; + iterators.add(RTBoundValidator.validate(iterator, RTBoundValidator.Stage.MEMTABLE, false)); + oldestUnrepairedTombstone = Math.min(oldestUnrepairedTombstone, iter.getMinLocalDeletionTime()); - iterators.add(isForThrift() ? ThriftResultsMerger.maybeWrap(iter, metadata(), nowInSec()) : iter); } SSTableReadsListener readCountUpdater = newReadCountUpdater(); @@ -262,7 +267,12 @@ public class PartitionRangeReadCommand extends ReadCommand { @SuppressWarnings("resource") // We close on exception and on closing the result returned by this method UnfilteredPartitionIterator iter = sstable.getScanner(columnFilter(), dataRange(), isForThrift(), readCountUpdater); - iterators.add(isForThrift() ? ThriftResultsMerger.maybeWrap(iter, metadata(), nowInSec()) : iter); + + if (isForThrift()) + iter = ThriftResultsMerger.maybeWrap(iter, metadata(), nowInSec()); + + iterators.add(RTBoundValidator.validate(iter, RTBoundValidator.Stage.SSTABLE, false)); + if (!sstable.isRepaired()) oldestUnrepairedTombstone = Math.min(oldestUnrepairedTombstone, sstable.getMinLocalDeletionTime()); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/5e969e9c/src/java/org/apache/cassandra/db/ReadCommand.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java index f8a0795..0135d1e 100644 --- a/src/java/org/apache/cassandra/db/ReadCommand.java +++ b/src/java/org/apache/cassandra/db/ReadCommand.java @@ -35,6 +35,7 @@ import org.apache.cassandra.db.partitions.*; import org.apache.cassandra.db.rows.*; import org.apache.cassandra.db.transform.RTBoundCloser; import org.apache.cassandra.db.transform.RTBoundValidator; +import org.apache.cassandra.db.transform.RTBoundValidator.Stage; import org.apache.cassandra.db.transform.Transformation; import org.apache.cassandra.dht.AbstractBounds; import org.apache.cassandra.index.Index; @@ -333,7 +334,7 @@ public abstract class ReadCommand implements ReadQuery { // validate that the sequence of RT markers is correct: open is followed by close, deletion times for both // ends equal, and there are no dangling RT bound in any partition. - iterator = Transformation.apply(iterator, new RTBoundValidator(true)); + iterator = RTBoundValidator.validate(iterator, Stage.PROCESSED, true); return isDigestQuery() ? ReadResponse.createDigestResponse(iterator, this) @@ -408,10 +409,12 @@ public abstract class ReadCommand implements ReadQuery } UnfilteredPartitionIterator iterator = (null == searcher) ? queryStorage(cfs, orderGroup) : searcher.search(orderGroup); + iterator = RTBoundValidator.validate(iterator, Stage.MERGED, false); try { - iterator = withoutPurgeableTombstones(iterator, cfs); + iterator = RTBoundValidator.validate(withoutPurgeableTombstones(iterator, cfs), Stage.PURGED, false); + iterator = withMetricsRecording(iterator, cfs.metric, startTimeNanos); // If we've used a 2ndary index, we know the result already satisfy the primary expression used, so @@ -431,9 +434,7 @@ public abstract class ReadCommand implements ReadQuery iterator = limits().filter(iterator, nowInSec(), selectsFullPartition()); // because of the above, we need to append an aritifical end bound if the source iterator was stopped short by a counter. - iterator = Transformation.apply(iterator, new RTBoundCloser()); - - return iterator; + return RTBoundCloser.close(iterator); } catch (RuntimeException | Error e) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/5e969e9c/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java index 4b10530..4c8e0bc 100644 --- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java +++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java @@ -36,6 +36,7 @@ import org.apache.cassandra.db.lifecycle.*; import org.apache.cassandra.db.filter.*; import org.apache.cassandra.db.partitions.*; import org.apache.cassandra.db.rows.*; +import org.apache.cassandra.db.transform.RTBoundValidator; import org.apache.cassandra.exceptions.RequestExecutionException; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.io.sstable.format.SSTableReadsListener; @@ -686,12 +687,19 @@ public class SinglePartitionReadCommand extends ReadCommand if (partition == null) continue; - @SuppressWarnings("resource") // 'iter' is added to iterators which is closed on exception, or through the closing of the final merged iterator + // 'iter' is added to iterators which is closed on exception, or through the closing of the final merged iterator + @SuppressWarnings("resource") UnfilteredRowIterator iter = filter.getUnfilteredRowIterator(columnFilter(), partition); - @SuppressWarnings("resource") // same as above - UnfilteredRowIterator maybeCopied = copyOnHeap ? UnfilteredRowIterators.cloningIterator(iter, HeapAllocator.instance) : iter; + + if (copyOnHeap) + iter = UnfilteredRowIterators.cloningIterator(iter, HeapAllocator.instance); + oldestUnrepairedTombstone = Math.min(oldestUnrepairedTombstone, partition.stats().minLocalDeletionTime); - iterators.add(isForThrift() ? ThriftResultsMerger.maybeWrap(maybeCopied, nowInSec()) : maybeCopied); + + if (isForThrift()) + iter = ThriftResultsMerger.maybeWrap(iter, nowInSec()); + + iterators.add(RTBoundValidator.validate(iter, RTBoundValidator.Stage.MEMTABLE, false)); } /* * We can't eliminate full sstables based on the timestamp of what we've already read like @@ -733,16 +741,24 @@ public class SinglePartitionReadCommand extends ReadCommand continue; } - @SuppressWarnings("resource") // 'iter' is added to iterators which is closed on exception, or through the closing of the final merged iterator - UnfilteredRowIterator iter = filter.filter(sstable.iterator(partitionKey(), - columnFilter(), - filter.isReversed(), - isForThrift(), - metricsCollector)); if (!sstable.isRepaired()) oldestUnrepairedTombstone = Math.min(oldestUnrepairedTombstone, sstable.getMinLocalDeletionTime()); - iterators.add(isForThrift() ? ThriftResultsMerger.maybeWrap(iter, nowInSec()) : iter); + // 'iter' is added to iterators which is closed on exception, or through the closing of the final merged iterator + @SuppressWarnings("resource") + UnfilteredRowIterator iter = filter.filter( + sstable.iterator(partitionKey(), + columnFilter(), + filter.isReversed(), + isForThrift(), + metricsCollector) + ); + + if (isForThrift()) + iter = ThriftResultsMerger.maybeWrap(iter, nowInSec()); + + iterators.add(RTBoundValidator.validate(iter, RTBoundValidator.Stage.SSTABLE, false)); + mostRecentPartitionTombstone = Math.max(mostRecentPartitionTombstone, iter.partitionLevelDeletion().markedForDeleteAt()); } @@ -862,7 +878,12 @@ public class SinglePartitionReadCommand extends ReadCommand UnfilteredRowIterator clonedFilter = copyOnHeap ? UnfilteredRowIterators.cloningIterator(iter, HeapAllocator.instance) : iter; - result = add(isForThrift() ? ThriftResultsMerger.maybeWrap(clonedFilter, nowInSec()) : clonedFilter, result, filter, false); + result = add( + RTBoundValidator.validate(isForThrift() ? ThriftResultsMerger.maybeWrap(clonedFilter, nowInSec()) : clonedFilter, RTBoundValidator.Stage.MEMTABLE, false), + result, + filter, + false + ); } } @@ -901,10 +922,29 @@ public class SinglePartitionReadCommand extends ReadCommand metricsCollector))) { if (!iter.partitionLevelDeletion().isLive()) - result = add(UnfilteredRowIterators.noRowsIterator(iter.metadata(), iter.partitionKey(), Rows.EMPTY_STATIC_ROW, iter.partitionLevelDeletion(), filter.isReversed()), result, filter, sstable.isRepaired()); + { + result = add( + UnfilteredRowIterators.noRowsIterator(iter.metadata(), + iter.partitionKey(), + Rows.EMPTY_STATIC_ROW, + iter.partitionLevelDeletion(), + filter.isReversed()), + result, + filter, + sstable.isRepaired() + ); + } else - result = add(iter, result, filter, sstable.isRepaired()); + { + result = add( + RTBoundValidator.validate(iter, RTBoundValidator.Stage.SSTABLE, false), + result, + filter, + sstable.isRepaired() + ); + } } + continue; } @@ -920,7 +960,13 @@ public class SinglePartitionReadCommand extends ReadCommand if (sstable.isRepaired()) onlyUnrepaired = false; - result = add(isForThrift() ? ThriftResultsMerger.maybeWrap(iter, nowInSec()) : iter, result, filter, sstable.isRepaired()); + + result = add( + RTBoundValidator.validate(isForThrift() ? ThriftResultsMerger.maybeWrap(iter, nowInSec()) : iter, RTBoundValidator.Stage.SSTABLE, false), + result, + filter, + sstable.isRepaired() + ); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/5e969e9c/src/java/org/apache/cassandra/db/transform/RTBoundCloser.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/transform/RTBoundCloser.java b/src/java/org/apache/cassandra/db/transform/RTBoundCloser.java index 11f0344..ee5401d 100644 --- a/src/java/org/apache/cassandra/db/transform/RTBoundCloser.java +++ b/src/java/org/apache/cassandra/db/transform/RTBoundCloser.java @@ -21,6 +21,7 @@ import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.db.Clustering; import org.apache.cassandra.db.DeletionTime; import org.apache.cassandra.db.ReadOrderGroup; +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; import org.apache.cassandra.db.rows.*; /** @@ -37,6 +38,21 @@ import org.apache.cassandra.db.rows.*; */ public final class RTBoundCloser extends Transformation<UnfilteredRowIterator> { + private RTBoundCloser() + { + } + + public static UnfilteredPartitionIterator close(UnfilteredPartitionIterator partitions) + { + return Transformation.apply(partitions, new RTBoundCloser()); + } + + public static UnfilteredRowIterator close(UnfilteredRowIterator partition) + { + RowsTransformation transformation = new RowsTransformation(partition); + return Transformation.apply(MoreRows.extend(partition, transformation, partition.columns()), transformation); + } + @Override public UnfilteredRowIterator applyToPartition(UnfilteredRowIterator partition) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/5e969e9c/src/java/org/apache/cassandra/db/transform/RTBoundValidator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/transform/RTBoundValidator.java b/src/java/org/apache/cassandra/db/transform/RTBoundValidator.java index 7866b14..1f675cf 100644 --- a/src/java/org/apache/cassandra/db/transform/RTBoundValidator.java +++ b/src/java/org/apache/cassandra/db/transform/RTBoundValidator.java @@ -19,6 +19,7 @@ package org.apache.cassandra.db.transform; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.db.DeletionTime; +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; import org.apache.cassandra.db.rows.RangeTombstoneMarker; import org.apache.cassandra.db.rows.UnfilteredRowIterator; @@ -32,29 +33,45 @@ import org.apache.cassandra.db.rows.UnfilteredRowIterator; */ public final class RTBoundValidator extends Transformation<UnfilteredRowIterator> { + public enum Stage { MEMTABLE, SSTABLE, MERGED, PURGED, PROCESSED } + + private final Stage stage; private final boolean enforceIsClosed; - public RTBoundValidator(boolean enforceIsClosed) + private RTBoundValidator(Stage stage, boolean enforceIsClosed) { + this.stage = stage; this.enforceIsClosed = enforceIsClosed; } + public static UnfilteredPartitionIterator validate(UnfilteredPartitionIterator partitions, Stage stage, boolean enforceIsClosed) + { + return Transformation.apply(partitions, new RTBoundValidator(stage, enforceIsClosed)); + } + + public static UnfilteredRowIterator validate(UnfilteredRowIterator partition, Stage stage, boolean enforceIsClosed) + { + return Transformation.apply(partition, new RowsTransformation(stage, partition.metadata(), partition.isReverseOrder(), enforceIsClosed)); + } + @Override public UnfilteredRowIterator applyToPartition(UnfilteredRowIterator partition) { - return Transformation.apply(partition, new RowsTransformation(partition.metadata(), partition.isReverseOrder(), enforceIsClosed)); + return Transformation.apply(partition, new RowsTransformation(stage, partition.metadata(), partition.isReverseOrder(), enforceIsClosed)); } private final static class RowsTransformation extends Transformation { + private final Stage stage; private final CFMetaData metadata; private final boolean isReverseOrder; private final boolean enforceIsClosed; private DeletionTime openMarkerDeletionTime; - private RowsTransformation(CFMetaData metadata, boolean isReverseOrder, boolean enforceIsClosed) + private RowsTransformation(Stage stage, CFMetaData metadata, boolean isReverseOrder, boolean enforceIsClosed) { + this.stage = stage; this.metadata = metadata; this.isReverseOrder = isReverseOrder; this.enforceIsClosed = enforceIsClosed; @@ -98,8 +115,8 @@ public final class RTBoundValidator extends Transformation<UnfilteredRowIterator private IllegalStateException ise(String why) { - String message = String.format("UnfilteredRowIterator for %s.%s has an illegal RT bounds sequence: %s", - metadata.ksName, metadata.cfName, why); + String message = String.format("%s UnfilteredRowIterator for %s.%s has an illegal RT bounds sequence: %s", + stage, metadata.ksName, metadata.cfName, why); throw new IllegalStateException(message); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/5e969e9c/test/unit/org/apache/cassandra/db/transform/RTTransformationsTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/transform/RTTransformationsTest.java b/test/unit/org/apache/cassandra/db/transform/RTTransformationsTest.java index 832c5a3..f79b9f3 100644 --- a/test/unit/org/apache/cassandra/db/transform/RTTransformationsTest.java +++ b/test/unit/org/apache/cassandra/db/transform/RTTransformationsTest.java @@ -33,13 +33,16 @@ import org.apache.cassandra.db.marshal.UTF8Type; import org.apache.cassandra.db.partitions.SingletonUnfilteredPartitionIterator; import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; import org.apache.cassandra.db.rows.*; +import org.apache.cassandra.db.transform.RTBoundValidator.Stage; import org.apache.cassandra.dht.Murmur3Partitioner; import org.apache.cassandra.utils.FBUtilities; +import static org.apache.cassandra.db.transform.RTBoundCloser.close; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import static org.apache.cassandra.db.transform.RTBoundValidator.validate; import static org.apache.cassandra.utils.ByteBufferUtil.bytes; public final class RTTransformationsTest @@ -80,8 +83,7 @@ public final class RTTransformationsTest , row(1, "a", "1", "") , bound(Kind.INCL_END_BOUND, 0, "a", "1") ); - extended = Transformation.apply(extended, new RTBoundCloser()); - assertIteratorsEqual(original, extended); + assertIteratorsEqual(original, close(extended)); } @Test @@ -98,8 +100,7 @@ public final class RTTransformationsTest , row(1, "a", "1", "") , bound(Kind.INCL_START_BOUND, 0, "a", "1") ); - extended = Transformation.apply(extended, new RTBoundCloser()); - assertIteratorsEqual(original, extended); + assertIteratorsEqual(original, close(extended)); } @Test @@ -109,7 +110,7 @@ public final class RTTransformationsTest , bound(Kind.INCL_START_BOUND, 0, "a", "1") , row(1, "a", "1", "") ); - UnfilteredPartitionIterator extended = Transformation.apply(original, new RTBoundCloser()); + UnfilteredPartitionIterator extended = close(original); UnfilteredPartitionIterator expected = iter(false , bound(Kind.INCL_START_BOUND, 0, "a", "1") @@ -127,7 +128,7 @@ public final class RTTransformationsTest , boundary(Kind.EXCL_END_INCL_START_BOUNDARY, 0, 1, "a", "0") , row(2, "a", "1", "") ); - UnfilteredPartitionIterator extended = Transformation.apply(original, new RTBoundCloser()); + UnfilteredPartitionIterator extended = close(original); UnfilteredPartitionIterator expected = iter(false , bound(Kind.INCL_START_BOUND, 0, "a") @@ -145,7 +146,7 @@ public final class RTTransformationsTest , bound(Kind.INCL_END_BOUND, 0, "a", "1") , row(1, "a", "1", "") ); - UnfilteredPartitionIterator extended = Transformation.apply(original, new RTBoundCloser()); + UnfilteredPartitionIterator extended = close(original); UnfilteredPartitionIterator expected = iter(true , bound(Kind.INCL_END_BOUND, 0, "a", "1") @@ -163,7 +164,7 @@ public final class RTTransformationsTest , boundary(Kind.INCL_END_EXCL_START_BOUNDARY, 1, 0, "a", "1") , row(2, "a", "0", "") ); - UnfilteredPartitionIterator extended = Transformation.apply(original, new RTBoundCloser()); + UnfilteredPartitionIterator extended = close(original); UnfilteredPartitionIterator expected = iter(true , bound(Kind.INCL_END_BOUND, 0, "a") @@ -181,8 +182,7 @@ public final class RTTransformationsTest UnfilteredPartitionIterator iterator = iter(false , bound(Kind.INCL_START_BOUND, 0, "a") ); - iterator = Transformation.apply(iterator, new RTBoundCloser()); - assertThrowsISEIterated(iterator); + assertThrowsISEIterated(close(iterator)); } @Test @@ -197,7 +197,7 @@ public final class RTTransformationsTest , row(1, "a", "2", "") , bound(Kind.INCL_END_BOUND, 0, "a", "2") ); - iterator = Transformation.apply(iterator, new RTBoundValidator(true)); + iterator = validate(iterator, Stage.PROCESSED, true); drain(iterator); } @@ -213,7 +213,7 @@ public final class RTTransformationsTest , row(1, "a", "1", "") , bound(Kind.INCL_START_BOUND, 0, "a", "1") ); - iterator = Transformation.apply(iterator, new RTBoundValidator(true)); + iterator = validate(iterator, Stage.PROCESSED, true); drain(iterator); } @@ -237,7 +237,7 @@ public final class RTTransformationsTest , bound(Kind.INCL_END_BOUND, 0, "a") ); - iterator = Transformation.apply(iterator, new RTBoundValidator(true)); + iterator = validate(iterator, Stage.PROCESSED, true); drain(iterator); } @@ -261,7 +261,7 @@ public final class RTTransformationsTest , bound(Kind.INCL_START_BOUND, 0, "a") ); - iterator = Transformation.apply(iterator, new RTBoundValidator(true)); + iterator = validate(iterator, Stage.PROCESSED, true); drain(iterator); } @@ -273,7 +273,7 @@ public final class RTTransformationsTest , row(1, "a", "1", "") , bound(Kind.INCL_END_BOUND, 1, "a", "1") ); - iterator = Transformation.apply(iterator, new RTBoundValidator(true)); + iterator = validate(iterator, Stage.PROCESSED, true); assertThrowsISEIterated(iterator); } @@ -285,7 +285,7 @@ public final class RTTransformationsTest , row(1, "a", "1", "") , bound(Kind.INCL_START_BOUND, 1, "a", "1") ); - iterator = Transformation.apply(iterator, new RTBoundValidator(true)); + iterator = validate(iterator, Stage.PROCESSED, true); assertThrowsISEIterated(iterator); } @@ -299,7 +299,7 @@ public final class RTTransformationsTest , row(1, "a", "1", "") , bound(Kind.INCL_END_BOUND, 0, "a", "1") ); - iterator = Transformation.apply(iterator, new RTBoundValidator(true)); + iterator = validate(iterator, Stage.PROCESSED, true); assertThrowsISEIterated(iterator); // duplicated end bound @@ -309,7 +309,7 @@ public final class RTTransformationsTest , bound(Kind.INCL_END_BOUND, 0, "a", "1") , bound(Kind.INCL_END_BOUND, 0, "a", "1") ); - iterator = Transformation.apply(iterator, new RTBoundValidator(true)); + iterator = validate(iterator, Stage.PROCESSED, true); assertThrowsISEIterated(iterator); // absent open bound @@ -317,7 +317,7 @@ public final class RTTransformationsTest , row(1, "a", "1", "") , bound(Kind.INCL_END_BOUND, 0, "a", "1") ); - iterator = Transformation.apply(iterator, new RTBoundValidator(true)); + iterator = validate(iterator, Stage.PROCESSED, true); assertThrowsISEIterated(iterator); // absent end bound @@ -325,7 +325,7 @@ public final class RTTransformationsTest , bound(Kind.INCL_START_BOUND, 0, "a", "1") , row(1, "a", "1", "") ); - iterator = Transformation.apply(iterator, new RTBoundValidator(true)); + iterator = validate(iterator, Stage.PROCESSED, true); assertThrowsISEIterated(iterator); } @@ -339,7 +339,7 @@ public final class RTTransformationsTest , bound(Kind.INCL_START_BOUND, 0, "a", "1") , bound(Kind.INCL_START_BOUND, 0, "a", "1") ); - iterator = Transformation.apply(iterator, new RTBoundValidator(true)); + iterator = validate(iterator, Stage.PROCESSED, true); assertThrowsISEIterated(iterator); // duplicated end bound @@ -349,7 +349,7 @@ public final class RTTransformationsTest , row(1, "a", "1", "") , bound(Kind.INCL_START_BOUND, 0, "a", "1") ); - iterator = Transformation.apply(iterator, new RTBoundValidator(true)); + iterator = validate(iterator, Stage.PROCESSED, true); assertThrowsISEIterated(iterator); // absent open bound @@ -357,7 +357,7 @@ public final class RTTransformationsTest , bound(Kind.INCL_END_BOUND, 0, "a", "1") , row(1, "a", "1", "") ); - iterator = Transformation.apply(iterator, new RTBoundValidator(true)); + iterator = validate(iterator, Stage.PROCESSED, true); assertThrowsISEIterated(iterator); // absent end bound @@ -365,7 +365,7 @@ public final class RTTransformationsTest , row(1, "a", "1", "") , bound(Kind.INCL_START_BOUND, 0, "a", "1") ); - iterator = Transformation.apply(iterator, new RTBoundValidator(true)); + iterator = validate(iterator, Stage.PROCESSED, true); assertThrowsISEIterated(iterator); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org