Repository: cassandra Updated Branches: refs/heads/trunk d0c2ab508 -> 97ada5bdf
Always close RT markers returned by ReadCommand#executeLocally() patch by Aleksey Yeschenko; reviewed by Blake Eggleston and Sam Tunnicliffe for CASSANDRA-14515 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/4e23c9e4 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/4e23c9e4 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/4e23c9e4 Branch: refs/heads/trunk Commit: 4e23c9e4dba6ee772531d82980f73234bd41869a Parents: eb91942 Author: Aleksey Yeshchenko <[email protected]> Authored: Wed Jun 20 00:01:10 2018 +0100 Committer: Aleksey Yeshchenko <[email protected]> Committed: Wed Jun 20 15:45:54 2018 +0100 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../org/apache/cassandra/db/ReadCommand.java | 42 +- .../db/rows/UnfilteredRowIterators.java | 26 + .../cassandra/db/transform/RTBoundCloser.java | 110 +++++ .../db/transform/RTBoundValidator.java | 106 ++++ .../apache/cassandra/service/DataResolver.java | 21 +- .../db/transform/RTTransformationsTest.java | 482 +++++++++++++++++++ 7 files changed, 758 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e23c9e4/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index ebf8764..aeeb0ae 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.0.17 + * Always close RT markers returned by ReadCommand#executeLocally() (CASSANDRA-14515) * Reverse order queries with range tombstones can cause data loss (CASSANDRA-14513) * Fix regression of lagging commitlog flush log message (CASSANDRA-14451) * Add Missing dependencies in pom-all (CASSANDRA-14422) http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e23c9e4/src/java/org/apache/cassandra/db/ReadCommand.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java index c93692a..f8a0795 100644 --- a/src/java/org/apache/cassandra/db/ReadCommand.java +++ b/src/java/org/apache/cassandra/db/ReadCommand.java @@ -33,6 +33,8 @@ import org.apache.cassandra.cql3.Operator; import org.apache.cassandra.db.filter.*; import org.apache.cassandra.db.partitions.*; import org.apache.cassandra.db.rows.*; +import org.apache.cassandra.db.transform.RTBoundCloser; +import org.apache.cassandra.db.transform.RTBoundValidator; import org.apache.cassandra.db.transform.Transformation; import org.apache.cassandra.dht.AbstractBounds; import org.apache.cassandra.index.Index; @@ -329,6 +331,10 @@ public abstract class ReadCommand implements ReadQuery public ReadResponse createResponse(UnfilteredPartitionIterator iterator) { + // validate that the sequence of RT markers is correct: open is followed by close, deletion times for both + // ends equal, and there are no dangling RT bound in any partition. + iterator = Transformation.apply(iterator, new RTBoundValidator(true)); + return isDigestQuery() ? ReadResponse.createDigestResponse(iterator, this) : ReadResponse.createDataResponse(iterator, this); @@ -401,29 +407,37 @@ public abstract class ReadCommand implements ReadQuery Tracing.trace("Executing read on {}.{} using index {}", cfs.metadata.ksName, cfs.metadata.cfName, index.getIndexMetadata().name); } - UnfilteredPartitionIterator resultIterator = searcher == null - ? queryStorage(cfs, orderGroup) - : searcher.search(orderGroup); + UnfilteredPartitionIterator iterator = (null == searcher) ? queryStorage(cfs, orderGroup) : searcher.search(orderGroup); try { - resultIterator = withMetricsRecording(withoutPurgeableTombstones(resultIterator, cfs), cfs.metric, startTimeNanos); + iterator = withoutPurgeableTombstones(iterator, cfs); + iterator = withMetricsRecording(iterator, cfs.metric, startTimeNanos); // If we've used a 2ndary index, we know the result already satisfy the primary expression used, so // no point in checking it again. - RowFilter updatedFilter = searcher == null - ? rowFilter() - : index.getPostIndexQueryFilter(rowFilter()); - - // TODO: We'll currently do filtering by the rowFilter here because it's convenient. However, - // we'll probably want to optimize by pushing it down the layer (like for dropped columns) as it - // would be more efficient (the sooner we discard stuff we know we don't care, the less useless - // processing we do on it). - return limits().filter(updatedFilter.filter(resultIterator, nowInSec()), nowInSec(), selectsFullPartition()); + RowFilter filter = (null == searcher) ? rowFilter() : index.getPostIndexQueryFilter(rowFilter()); + + /* + * TODO: We'll currently do filtering by the rowFilter here because it's convenient. However, + * we'll probably want to optimize by pushing it down the layer (like for dropped columns) as it + * would be more efficient (the sooner we discard stuff we know we don't care, the less useless + * processing we do on it). + */ + iterator = filter.filter(iterator, nowInSec()); + + // apply the limits/row counter; this transformation is stopping and would close the iterator as soon + // as the count is observed; if that happens in the middle of an open RT, its end bound will not be included. + iterator = limits().filter(iterator, nowInSec(), selectsFullPartition()); + + // because of the above, we need to append an aritifical end bound if the source iterator was stopped short by a counter. + iterator = Transformation.apply(iterator, new RTBoundCloser()); + + return iterator; } catch (RuntimeException | Error e) { - resultIterator.close(); + iterator.close(); throw e; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e23c9e4/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java index 5c27363..f42f675 100644 --- a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java +++ b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java @@ -140,6 +140,32 @@ public abstract class UnfilteredRowIterators return EmptyIterators.unfilteredRow(cfm, partitionKey, isReverseOrder, staticRow, partitionDeletion); } + public static UnfilteredRowIterator singleton(Unfiltered unfiltered, + CFMetaData metadata, + DecoratedKey partitionKey, + DeletionTime partitionLevelDeletion, + PartitionColumns columns, + Row staticRow, + boolean isReverseOrder, + EncodingStats encodingStats) + { + return new AbstractUnfilteredRowIterator(metadata, partitionKey, partitionLevelDeletion, columns, staticRow, isReverseOrder, encodingStats) + { + boolean isDone = false; + + protected Unfiltered computeNext() + { + if (!isDone) + { + isDone = true; + return unfiltered; + } + + return endOfData(); + } + }; + } + /** * Digests the partition represented by the provided iterator. * http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e23c9e4/src/java/org/apache/cassandra/db/transform/RTBoundCloser.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/transform/RTBoundCloser.java b/src/java/org/apache/cassandra/db/transform/RTBoundCloser.java new file mode 100644 index 0000000..11f0344 --- /dev/null +++ b/src/java/org/apache/cassandra/db/transform/RTBoundCloser.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.transform; + +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.db.Clustering; +import org.apache.cassandra.db.DeletionTime; +import org.apache.cassandra.db.ReadOrderGroup; +import org.apache.cassandra.db.rows.*; + +/** + * A transformation that appends an RT bound marker to row iterators in case they don't have one. + * + * This used to happen, for example, in {@link org.apache.cassandra.db.ReadCommand#executeLocally(ReadOrderGroup)}, + * if {@link org.apache.cassandra.db.filter.DataLimits} stopped the iterator on a live row that was enclosed in an + * older RT. + * + * If we don't do this, and send a response without the closing bound, we can break read/short read protection read + * isolation, and potentially cause data loss. + * + * See CASSANDRA-14515 for context. + */ +public final class RTBoundCloser extends Transformation<UnfilteredRowIterator> +{ + @Override + public UnfilteredRowIterator applyToPartition(UnfilteredRowIterator partition) + { + RowsTransformation transformation = new RowsTransformation(partition); + return Transformation.apply(MoreRows.extend(partition, transformation, partition.columns()), transformation); + } + + private final static class RowsTransformation extends Transformation implements MoreRows<UnfilteredRowIterator> + { + private final UnfilteredRowIterator partition; + + private Clustering lastRowClustering; + private DeletionTime openMarkerDeletionTime; + + private RowsTransformation(UnfilteredRowIterator partition) + { + this.partition = partition; + } + + @Override + public Row applyToRow(Row row) + { + lastRowClustering = row.clustering(); + return row; + } + + @Override + public RangeTombstoneMarker applyToMarker(RangeTombstoneMarker marker) + { + openMarkerDeletionTime = + marker.isOpen(partition.isReverseOrder()) ? marker.openDeletionTime(partition.isReverseOrder()) : null; + lastRowClustering = null; + return marker; + } + + @Override + public UnfilteredRowIterator moreContents() + { + // there is no open RT in the stream - nothing for us to do + if (null == openMarkerDeletionTime) + return null; + + /* + * there *is* an open RT in the stream, but there have been no rows after the opening bound - this must + * never happen in scenarios where RTBoundCloser is meant to be used; the last encountered clustering + * should be either a closing bound marker - if the iterator was exhausted fully - or a live row - if + * DataLimits stopped it short in the middle of an RT. + */ + if (null == lastRowClustering) + { + CFMetaData metadata = partition.metadata(); + String message = + String.format("UnfilteredRowIterator for %s.%s has an open RT bound as its last item", metadata.ksName, metadata.cfName); + throw new IllegalStateException(message); + } + + // create an artificial inclusive closing RT bound with bound matching last seen row's clustering + RangeTombstoneBoundMarker closingBound = + RangeTombstoneBoundMarker.inclusiveClose(partition.isReverseOrder(), lastRowClustering.getRawValues(), openMarkerDeletionTime); + + return UnfilteredRowIterators.singleton(closingBound, + partition.metadata(), + partition.partitionKey(), + partition.partitionLevelDeletion(), + partition.columns(), + partition.staticRow(), + partition.isReverseOrder(), + partition.stats()); + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e23c9e4/src/java/org/apache/cassandra/db/transform/RTBoundValidator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/transform/RTBoundValidator.java b/src/java/org/apache/cassandra/db/transform/RTBoundValidator.java new file mode 100644 index 0000000..7866b14 --- /dev/null +++ b/src/java/org/apache/cassandra/db/transform/RTBoundValidator.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.transform; + +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.db.DeletionTime; +import org.apache.cassandra.db.rows.RangeTombstoneMarker; +import org.apache.cassandra.db.rows.UnfilteredRowIterator; + +/** + * A validating transformation that sanity-checks the sequence of RT bounds and boundaries in every partition. + * + * What we validate, specifically: + * - that open markers are only followed by close markers + * - that open markers and close markers have equal deletion times + * - optionally, that the iterator closes its last RT marker + */ +public final class RTBoundValidator extends Transformation<UnfilteredRowIterator> +{ + private final boolean enforceIsClosed; + + public RTBoundValidator(boolean enforceIsClosed) + { + this.enforceIsClosed = enforceIsClosed; + } + + @Override + public UnfilteredRowIterator applyToPartition(UnfilteredRowIterator partition) + { + return Transformation.apply(partition, new RowsTransformation(partition.metadata(), partition.isReverseOrder(), enforceIsClosed)); + } + + private final static class RowsTransformation extends Transformation + { + private final CFMetaData metadata; + private final boolean isReverseOrder; + private final boolean enforceIsClosed; + + private DeletionTime openMarkerDeletionTime; + + private RowsTransformation(CFMetaData metadata, boolean isReverseOrder, boolean enforceIsClosed) + { + this.metadata = metadata; + this.isReverseOrder = isReverseOrder; + this.enforceIsClosed = enforceIsClosed; + } + + @Override + public RangeTombstoneMarker applyToMarker(RangeTombstoneMarker marker) + { + if (null == openMarkerDeletionTime) + { + // there is no open RT in the stream - we are expecting a *_START_BOUND + if (marker.isClose(isReverseOrder)) + throw ise("unexpected end bound or boundary " + marker.toString(metadata)); + } + else + { + // there is an open RT in the stream - we are expecting a *_BOUNDARY or an *_END_BOUND + if (!marker.isClose(isReverseOrder)) + throw ise("start bound followed by another start bound " + marker.toString(metadata)); + + // deletion times of open/close markers must match + DeletionTime deletionTime = marker.closeDeletionTime(isReverseOrder); + if (!deletionTime.equals(openMarkerDeletionTime)) + throw ise("open marker and close marker have different deletion times"); + + openMarkerDeletionTime = null; + } + + if (marker.isOpen(isReverseOrder)) + openMarkerDeletionTime = marker.openDeletionTime(isReverseOrder); + + return marker; + } + + @Override + public void onPartitionClose() + { + if (enforceIsClosed && null != openMarkerDeletionTime) + throw ise("expected all RTs to be closed, but the last one is open"); + } + + private IllegalStateException ise(String why) + { + String message = String.format("UnfilteredRowIterator for %s.%s has an illegal RT bounds sequence: %s", + metadata.ksName, metadata.cfName, why); + throw new IllegalStateException(message); + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e23c9e4/src/java/org/apache/cassandra/service/DataResolver.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/DataResolver.java b/src/java/org/apache/cassandra/service/DataResolver.java index 2252913..522c57b 100644 --- a/src/java/org/apache/cassandra/service/DataResolver.java +++ b/src/java/org/apache/cassandra/service/DataResolver.java @@ -398,23 +398,12 @@ public class DataResolver extends ResponseResolver if (markerToRepair[i] == null && currentDeletion.supersedes(partitionRepairDeletion)) { /* - * Since there is an ongoing merged deletion, the only two ways we don't have an open repair for - * this source are that: - * - * 1) it had a range open with the same deletion as current marker, and the marker is coming from - * a short read protection response - repeating the open RT bound, or - * 2) it had a range open with the same deletion as current marker, and the marker is closing it. + * Since there is an ongoing merged deletion, the only way we don't have an open repair for + * this source is that it had a range open with the same deletion as current marker, + * and the marker is closing it. */ - if (!marker.isBoundary() && marker.isOpen(isReversed)) // (1) - { - assert currentDeletion.equals(marker.openDeletionTime(isReversed)) - : String.format("currentDeletion=%s, marker=%s", currentDeletion, marker.toString(command.metadata())); - } - else // (2) - { - assert marker.isClose(isReversed) && currentDeletion.equals(marker.closeDeletionTime(isReversed)) - : String.format("currentDeletion=%s, marker=%s", currentDeletion, marker.toString(command.metadata())); - } + assert marker.isClose(isReversed) && currentDeletion.equals(marker.closeDeletionTime(isReversed)) + : String.format("currentDeletion=%s, marker=%s", currentDeletion, marker.toString(command.metadata())); // and so unless it's a boundary whose opening deletion time is still equal to the current // deletion (see comment above for why this can actually happen), we have to repair the source http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e23c9e4/test/unit/org/apache/cassandra/db/transform/RTTransformationsTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/transform/RTTransformationsTest.java b/test/unit/org/apache/cassandra/db/transform/RTTransformationsTest.java new file mode 100644 index 0000000..832c5a3 --- /dev/null +++ b/test/unit/org/apache/cassandra/db/transform/RTTransformationsTest.java @@ -0,0 +1,482 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.transform; + +import java.nio.ByteBuffer; +import java.util.*; + +import org.junit.Before; +import org.junit.Test; + +import com.google.common.collect.Iterators; + +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.db.*; +import org.apache.cassandra.db.ClusteringPrefix.Kind; +import org.apache.cassandra.db.marshal.AbstractType; +import org.apache.cassandra.db.marshal.UTF8Type; +import org.apache.cassandra.db.partitions.SingletonUnfilteredPartitionIterator; +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; +import org.apache.cassandra.db.rows.*; +import org.apache.cassandra.dht.Murmur3Partitioner; +import org.apache.cassandra.utils.FBUtilities; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import static org.apache.cassandra.utils.ByteBufferUtil.bytes; + +public final class RTTransformationsTest +{ + private static final String KEYSPACE = "RTBoundCloserTest"; + private static final String TABLE = "table"; + + private final int nowInSec = FBUtilities.nowInSeconds(); + + private CFMetaData metadata; + private DecoratedKey key; + + @Before + public void setUp() + { + metadata = + CFMetaData.Builder + .create(KEYSPACE, TABLE) + .addPartitionKey("pk", UTF8Type.instance) + .addClusteringColumn("ck0", UTF8Type.instance) + .addClusteringColumn("ck1", UTF8Type.instance) + .addClusteringColumn("ck2", UTF8Type.instance) + .build(); + key = Murmur3Partitioner.instance.decorateKey(bytes("key")); + } + + @Test + public void testAddsNothingWhenAlreadyClosed() + { + UnfilteredPartitionIterator original = iter(false + , bound(Kind.INCL_START_BOUND, 0, "a", "1") + , row(1, "a", "1", "") + , bound(Kind.INCL_END_BOUND, 0, "a", "1") + ); + + UnfilteredPartitionIterator extended = iter(false + , bound(Kind.INCL_START_BOUND, 0, "a", "1") + , row(1, "a", "1", "") + , bound(Kind.INCL_END_BOUND, 0, "a", "1") + ); + extended = Transformation.apply(extended, new RTBoundCloser()); + assertIteratorsEqual(original, extended); + } + + @Test + public void testAddsNothingWhenAlreadyClosedInReverseOrder() + { + UnfilteredPartitionIterator original = iter(true + , bound(Kind.INCL_END_BOUND, 0, "a", "1") + , row(1, "a", "1", "") + , bound(Kind.INCL_START_BOUND, 0, "a", "1") + ); + + UnfilteredPartitionIterator extended = iter(true + , bound(Kind.INCL_END_BOUND, 0, "a", "1") + , row(1, "a", "1", "") + , bound(Kind.INCL_START_BOUND, 0, "a", "1") + ); + extended = Transformation.apply(extended, new RTBoundCloser()); + assertIteratorsEqual(original, extended); + } + + @Test + public void testClosesUnclosedBound() + { + UnfilteredPartitionIterator original = iter(false + , bound(Kind.INCL_START_BOUND, 0, "a", "1") + , row(1, "a", "1", "") + ); + UnfilteredPartitionIterator extended = Transformation.apply(original, new RTBoundCloser()); + + UnfilteredPartitionIterator expected = iter(false + , bound(Kind.INCL_START_BOUND, 0, "a", "1") + , row(1, "a", "1", "") + , bound(Kind.INCL_END_BOUND, 0, "a", "1", "") + ); + assertIteratorsEqual(expected, extended); + } + + @Test + public void testClosesUnclosedBoundary() + { + UnfilteredPartitionIterator original = iter(false + , bound(Kind.INCL_START_BOUND, 0, "a") + , boundary(Kind.EXCL_END_INCL_START_BOUNDARY, 0, 1, "a", "0") + , row(2, "a", "1", "") + ); + UnfilteredPartitionIterator extended = Transformation.apply(original, new RTBoundCloser()); + + UnfilteredPartitionIterator expected = iter(false + , bound(Kind.INCL_START_BOUND, 0, "a") + , boundary(Kind.EXCL_END_INCL_START_BOUNDARY, 0, 1, "a", "0") + , row(2, "a", "1", "") + , bound(Kind.INCL_END_BOUND, 1, "a", "1", "") + ); + assertIteratorsEqual(expected, extended); + } + + @Test + public void testClosesUnclosedBoundInReverseOrder() + { + UnfilteredPartitionIterator original = iter(true + , bound(Kind.INCL_END_BOUND, 0, "a", "1") + , row(1, "a", "1", "") + ); + UnfilteredPartitionIterator extended = Transformation.apply(original, new RTBoundCloser()); + + UnfilteredPartitionIterator expected = iter(true + , bound(Kind.INCL_END_BOUND, 0, "a", "1") + , row(1, "a", "1", "") + , bound(Kind.INCL_START_BOUND, 0, "a", "1", "") + ); + assertIteratorsEqual(expected, extended); + } + + @Test + public void testClosesUnclosedBoundaryInReverseOrder() + { + UnfilteredPartitionIterator original = iter(true + , bound(Kind.INCL_END_BOUND, 0, "a") + , boundary(Kind.INCL_END_EXCL_START_BOUNDARY, 1, 0, "a", "1") + , row(2, "a", "0", "") + ); + UnfilteredPartitionIterator extended = Transformation.apply(original, new RTBoundCloser()); + + UnfilteredPartitionIterator expected = iter(true + , bound(Kind.INCL_END_BOUND, 0, "a") + , boundary(Kind.INCL_END_EXCL_START_BOUNDARY, 1, 0, "a", "1") + , row(2, "a", "0", "") + , bound(Kind.INCL_START_BOUND, 1, "a", "0", "") + ); + + assertIteratorsEqual(expected, extended); + } + + @Test + public void testFailsWithoutSeeingRows() + { + UnfilteredPartitionIterator iterator = iter(false + , bound(Kind.INCL_START_BOUND, 0, "a") + ); + iterator = Transformation.apply(iterator, new RTBoundCloser()); + assertThrowsISEIterated(iterator); + } + + @Test + public void testValidatesLegalBounds() + { + UnfilteredPartitionIterator iterator = iter(false + , bound(Kind.INCL_START_BOUND, 0, "a", "1") + , row(1, "a", "1", "") + , bound(Kind.INCL_END_BOUND, 0, "a", "1") + + , bound(Kind.INCL_START_BOUND, 0, "a", "2") + , row(1, "a", "2", "") + , bound(Kind.INCL_END_BOUND, 0, "a", "2") + ); + iterator = Transformation.apply(iterator, new RTBoundValidator(true)); + drain(iterator); + } + + @Test + public void testValidatesLegalBoundsInReverseOrder() + { + UnfilteredPartitionIterator iterator = iter(true + , bound(Kind.INCL_END_BOUND, 0, "a", "2") + , row(1, "a", "2", "") + , bound(Kind.INCL_START_BOUND, 0, "a", "2") + + , bound(Kind.INCL_END_BOUND, 0, "a", "1") + , row(1, "a", "1", "") + , bound(Kind.INCL_START_BOUND, 0, "a", "1") + ); + iterator = Transformation.apply(iterator, new RTBoundValidator(true)); + drain(iterator); + } + + @Test + public void testValidatesLegalBoundaries() + { + UnfilteredPartitionIterator iterator = iter(false + , bound(Kind.INCL_START_BOUND, 0, "a") + + , boundary(Kind.EXCL_END_INCL_START_BOUNDARY, 0, 1, "a", "1") + , row(2, "a", "1", "") + , boundary(Kind.INCL_END_EXCL_START_BOUNDARY, 1, 0, "a", "1") + + , boundary(Kind.EXCL_END_INCL_START_BOUNDARY, 0, 2, "a", "2") + , row(3, "a", "2", "") + , boundary(Kind.INCL_END_EXCL_START_BOUNDARY, 2, 0, "a", "2") + + , boundary(Kind.EXCL_END_INCL_START_BOUNDARY, 0, 3, "a", "3") + , row(4, "a", "3", "") + , boundary(Kind.INCL_END_EXCL_START_BOUNDARY, 3, 0, "a", "3") + + , bound(Kind.INCL_END_BOUND, 0, "a") + ); + iterator = Transformation.apply(iterator, new RTBoundValidator(true)); + drain(iterator); + } + + @Test + public void testValidatesLegalBoundariesInReverseOrder() + { + UnfilteredPartitionIterator iterator = iter(true + , bound(Kind.INCL_END_BOUND, 0, "a") + + , boundary(Kind.INCL_END_EXCL_START_BOUNDARY, 3, 0, "a", "3") + , row(4, "a", "3", "") + , boundary(Kind.EXCL_END_INCL_START_BOUNDARY, 0, 3, "a", "3") + + , boundary(Kind.INCL_END_EXCL_START_BOUNDARY, 2, 0, "a", "2") + , row(3, "a", "2", "") + , boundary(Kind.EXCL_END_INCL_START_BOUNDARY, 0, 2, "a", "2") + + , boundary(Kind.INCL_END_EXCL_START_BOUNDARY, 1, 0, "a", "1") + , row(2, "a", "1", "") + , boundary(Kind.EXCL_END_INCL_START_BOUNDARY, 0, 1, "a", "1") + + , bound(Kind.INCL_START_BOUND, 0, "a") + ); + iterator = Transformation.apply(iterator, new RTBoundValidator(true)); + drain(iterator); + } + + @Test + public void testComplainsAboutMismatchedTimestamps() + { + UnfilteredPartitionIterator iterator = iter(false + , bound(Kind.INCL_START_BOUND, 0, "a", "1") + , row(1, "a", "1", "") + , bound(Kind.INCL_END_BOUND, 1, "a", "1") + ); + iterator = Transformation.apply(iterator, new RTBoundValidator(true)); + assertThrowsISEIterated(iterator); + } + + @Test + public void testComplainsAboutMismatchedTimestampsInReverseOrder() + { + UnfilteredPartitionIterator iterator = iter(true + , bound(Kind.INCL_END_BOUND, 0, "a", "1") + , row(1, "a", "1", "") + , bound(Kind.INCL_START_BOUND, 1, "a", "1") + ); + iterator = Transformation.apply(iterator, new RTBoundValidator(true)); + assertThrowsISEIterated(iterator); + } + + @Test + public void testComplainsAboutInvalidSequence() + { + // duplicated start bound + UnfilteredPartitionIterator iterator = iter(false + , bound(Kind.INCL_START_BOUND, 0, "a", "1") + , bound(Kind.INCL_START_BOUND, 0, "a", "1") + , row(1, "a", "1", "") + , bound(Kind.INCL_END_BOUND, 0, "a", "1") + ); + iterator = Transformation.apply(iterator, new RTBoundValidator(true)); + assertThrowsISEIterated(iterator); + + // duplicated end bound + iterator = iter(false + , bound(Kind.INCL_START_BOUND, 0, "a", "1") + , row(1, "a", "1", "") + , bound(Kind.INCL_END_BOUND, 0, "a", "1") + , bound(Kind.INCL_END_BOUND, 0, "a", "1") + ); + iterator = Transformation.apply(iterator, new RTBoundValidator(true)); + assertThrowsISEIterated(iterator); + + // absent open bound + iterator = iter(false + , row(1, "a", "1", "") + , bound(Kind.INCL_END_BOUND, 0, "a", "1") + ); + iterator = Transformation.apply(iterator, new RTBoundValidator(true)); + assertThrowsISEIterated(iterator); + + // absent end bound + iterator = iter(false + , bound(Kind.INCL_START_BOUND, 0, "a", "1") + , row(1, "a", "1", "") + ); + iterator = Transformation.apply(iterator, new RTBoundValidator(true)); + assertThrowsISEIterated(iterator); + } + + @Test + public void testComplainsAboutInvalidSequenceInReveseOrder() + { + // duplicated start bound + UnfilteredPartitionIterator iterator = iter(true + , bound(Kind.INCL_END_BOUND, 0, "a", "1") + , row(1, "a", "1", "") + , bound(Kind.INCL_START_BOUND, 0, "a", "1") + , bound(Kind.INCL_START_BOUND, 0, "a", "1") + ); + iterator = Transformation.apply(iterator, new RTBoundValidator(true)); + assertThrowsISEIterated(iterator); + + // duplicated end bound + iterator = iter(true + , bound(Kind.INCL_END_BOUND, 0, "a", "1") + , bound(Kind.INCL_END_BOUND, 0, "a", "1") + , row(1, "a", "1", "") + , bound(Kind.INCL_START_BOUND, 0, "a", "1") + ); + iterator = Transformation.apply(iterator, new RTBoundValidator(true)); + assertThrowsISEIterated(iterator); + + // absent open bound + iterator = iter(true + , bound(Kind.INCL_END_BOUND, 0, "a", "1") + , row(1, "a", "1", "") + ); + iterator = Transformation.apply(iterator, new RTBoundValidator(true)); + assertThrowsISEIterated(iterator); + + // absent end bound + iterator = iter(true + , row(1, "a", "1", "") + , bound(Kind.INCL_START_BOUND, 0, "a", "1") + ); + iterator = Transformation.apply(iterator, new RTBoundValidator(true)); + assertThrowsISEIterated(iterator); + } + + private RangeTombstoneBoundMarker bound(ClusteringPrefix.Kind kind, long timestamp, Object... clusteringValues) + { + ByteBuffer[] clusteringByteBuffers = new ByteBuffer[clusteringValues.length]; + for (int i = 0; i < clusteringValues.length; i++) + clusteringByteBuffers[i] = decompose(metadata.clusteringColumns().get(i).type, clusteringValues[i]); + + return new RangeTombstoneBoundMarker(new RangeTombstone.Bound(kind, clusteringByteBuffers), new DeletionTime(timestamp, nowInSec)); + } + + private RangeTombstoneBoundaryMarker boundary(ClusteringPrefix.Kind kind, long closeTimestamp, long openTimestamp, Object... clusteringValues) + { + ByteBuffer[] clusteringByteBuffers = new ByteBuffer[clusteringValues.length]; + for (int i = 0; i < clusteringValues.length; i++) + clusteringByteBuffers[i] = decompose(metadata.clusteringColumns().get(i).type, clusteringValues[i]); + + return new RangeTombstoneBoundaryMarker(new RangeTombstone.Bound(kind, clusteringByteBuffers), + new DeletionTime(closeTimestamp, nowInSec), + new DeletionTime(openTimestamp, nowInSec)); + } + + private Row row(long timestamp, Object... clusteringValues) + { + ByteBuffer[] clusteringByteBuffers = new ByteBuffer[clusteringValues.length]; + for (int i = 0; i < clusteringValues.length; i++) + clusteringByteBuffers[i] = decompose(metadata.clusteringColumns().get(i).type, clusteringValues[i]); + + return BTreeRow.noCellLiveRow(new Clustering(clusteringByteBuffers), LivenessInfo.create(metadata, timestamp, nowInSec)); + } + + @SuppressWarnings("unchecked") + private static <T> ByteBuffer decompose(AbstractType<?> type, T value) + { + return ((AbstractType<T>) type).decompose(value); + } + + private UnfilteredPartitionIterator iter(boolean isReversedOrder, Unfiltered... unfiltereds) + { + Iterator<Unfiltered> iterator = Iterators.forArray(unfiltereds); + + UnfilteredRowIterator rowIter = + new AbstractUnfilteredRowIterator(metadata, + key, + DeletionTime.LIVE, + metadata.partitionColumns(), + Rows.EMPTY_STATIC_ROW, + isReversedOrder, + EncodingStats.NO_STATS) + { + protected Unfiltered computeNext() + { + return iterator.hasNext() ? iterator.next() : endOfData(); + } + }; + + return new SingletonUnfilteredPartitionIterator(rowIter, false); + } + + private void assertIteratorsEqual(UnfilteredPartitionIterator iter1, UnfilteredPartitionIterator iter2) + { + while (iter1.hasNext()) + { + assertTrue(iter2.hasNext()); + + try (UnfilteredRowIterator partition1 = iter1.next()) + { + try (UnfilteredRowIterator partition2 = iter2.next()) + { + assertIteratorsEqual(partition1, partition2); + } + } + } + assertFalse(iter2.hasNext()); + } + + private void assertIteratorsEqual(UnfilteredRowIterator iter1, UnfilteredRowIterator iter2) + { + while (iter1.hasNext()) + { + assertTrue(iter2.hasNext()); + + assertEquals(iter1.next(), iter2.next()); + } + assertFalse(iter2.hasNext()); + } + + private void assertThrowsISEIterated(UnfilteredPartitionIterator iterator) + { + Throwable t = null; + try + { + drain(iterator); + } + catch (Throwable e) + { + t = e; + } + assertTrue(t instanceof IllegalStateException); + } + + private void drain(UnfilteredPartitionIterator iter) + { + while (iter.hasNext()) + { + try (UnfilteredRowIterator partition = iter.next()) + { + while (partition.hasNext()) + partition.next(); + } + } + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
