Repository: cassandra Updated Branches: refs/heads/cassandra-3.0 2400d07bf -> 7251c9559 refs/heads/cassandra-3.11 ab640b212 -> 29db25116 refs/heads/trunk 86964da69 -> f48a319ac
Make concat work with iterators that have different subsets of columns Patch by Alex Petrov; reviewed by Sylvain Lebresne for CASSANDRA-13482. Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/7251c955 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/7251c955 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/7251c955 Branch: refs/heads/cassandra-3.0 Commit: 7251c9559805d83423ca5ddbe4f955ce668c3d9a Parents: 2400d07 Author: Alex Petrov <[email protected]> Authored: Thu Jun 8 16:59:24 2017 +0200 Committer: Alex Petrov <[email protected]> Committed: Wed Jul 12 15:44:06 2017 +0200 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../db/SinglePartitionReadCommand.java | 30 ++- .../db/rows/UnfilteredRowIterators.java | 3 +- .../cassandra/db/transform/BaseIterator.java | 13 +- .../cassandra/db/transform/BasePartitions.java | 2 +- .../apache/cassandra/db/transform/BaseRows.java | 4 +- .../apache/cassandra/db/transform/MoreRows.java | 6 + .../db/transform/StoppingTransformation.java | 31 ++-- .../cassandra/db/transform/Transformation.java | 24 +++ .../cassandra/db/transform/UnfilteredRows.java | 13 ++ .../apache/cassandra/db/RowCacheCQLTest.java | 70 +++++++ .../db/rows/UnfilteredRowIteratorsTest.java | 185 +++++++++++++++++++ 12 files changed, 362 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/7251c955/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index ce2324d..bf36769 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.0.15 + * Make concat work with iterators that have different subsets of columns (CASSANDRA-13482) * Set test.runners based on cores and memory size (CASSANDRA-13078) * Allow different NUMACTL_ARGS to be passed in (CASSANDRA-13557) 
* Allow native function calls in CQLSSTableWriter (CASSANDRA-12606) http://git-wip-us.apache.org/repos/asf/cassandra/blob/7251c955/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java index 72b4465..b4211bb 100644 --- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java +++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java @@ -429,13 +429,39 @@ public class SinglePartitionReadCommand extends ReadCommand try { - int rowsToCache = metadata().params.caching.rowsPerPartitionToCache(); + final int rowsToCache = metadata().params.caching.rowsPerPartitionToCache(); + @SuppressWarnings("resource") // we close on exception or upon closing the result of this method UnfilteredRowIterator iter = SinglePartitionReadCommand.fullPartitionRead(metadata(), nowInSec(), partitionKey()).queryMemtableAndDisk(cfs, readOp); try { + // Use a custom iterator instead of DataLimits to avoid stopping the original iterator + UnfilteredRowIterator toCacheIterator = new WrappingUnfilteredRowIterator(iter) + { + private int rowsCounted = 0; + + @Override + public boolean hasNext() + { + return rowsCounted < rowsToCache && super.hasNext(); + } + + @Override + public Unfiltered next() + { + Unfiltered unfiltered = super.next(); + if (unfiltered.isRow()) + { + Row row = (Row) unfiltered; + if (row.hasLiveData(nowInSec())) + rowsCounted++; + } + return unfiltered; + } + }; + // We want to cache only rowsToCache rows - CachedPartition toCache = CachedBTreePartition.create(DataLimits.cqlLimits(rowsToCache).filter(iter, nowInSec()), nowInSec()); + CachedPartition toCache = CachedBTreePartition.create(toCacheIterator, nowInSec()); if (sentinelSuccess && !toCache.isEmpty()) { Tracing.trace("Caching {} rows", toCache.rowCount()); 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7251c955/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java index 43653a9..5c27363 100644 --- a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java +++ b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java @@ -194,7 +194,6 @@ public abstract class UnfilteredRowIterators && iter1.partitionKey().equals(iter2.partitionKey()) && iter1.partitionLevelDeletion().equals(iter2.partitionLevelDeletion()) && iter1.isReverseOrder() == iter2.isReverseOrder() - && iter1.columns().equals(iter2.columns()) && iter1.staticRow().equals(iter2.staticRow()); class Extend implements MoreRows<UnfilteredRowIterator> @@ -209,7 +208,7 @@ public abstract class UnfilteredRowIterators } } - return MoreRows.extend(iter1, new Extend()); + return MoreRows.extend(iter1, new Extend(), iter1.columns().mergeTo(iter2.columns())); } /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/7251c955/src/java/org/apache/cassandra/db/transform/BaseIterator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/transform/BaseIterator.java b/src/java/org/apache/cassandra/db/transform/BaseIterator.java index dd928eb..d00e406 100644 --- a/src/java/org/apache/cassandra/db/transform/BaseIterator.java +++ b/src/java/org/apache/cassandra/db/transform/BaseIterator.java @@ -33,7 +33,15 @@ abstract class BaseIterator<V, I extends CloseableIterator<? extends V>, O exten { I input; V next; - Stop stop; // applies at the end of the current next() + + // We require two stop signals for correctness, since the `stop` reference of the base iterator can "leak" + // into the transformations stack. 
Using a single `stop` signal may result in an inconsistent state, + // since stopping transformation would stop only the child iterator. + + // Signals that the base iterator has been signalled to stop. Applies at the end of the current next(). + Stop stop; + // Signals that the current child iterator has been signalled to stop. + Stop stopChild; static class Stop { @@ -49,12 +57,14 @@ abstract class BaseIterator<V, I extends CloseableIterator<? extends V>, O exten this.input = copyFrom.input; this.next = copyFrom.next; this.stop = copyFrom.stop; + this.stopChild = copyFrom.stopChild; } BaseIterator(I input) { this.input = input; this.stop = new Stop(); + this.stopChild = this.stop; } /** @@ -122,6 +132,7 @@ abstract class BaseIterator<V, I extends CloseableIterator<? extends V>, O exten BaseIterator abstr = (BaseIterator) newContents; prefix = abstr; input = (I) abstr.input; + stopChild = abstr.stop; next = apply((V) abstr.next, holder.length); // must apply all remaining functions to the next, if any } http://git-wip-us.apache.org/repos/asf/cassandra/blob/7251c955/src/java/org/apache/cassandra/db/transform/BasePartitions.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/transform/BasePartitions.java b/src/java/org/apache/cassandra/db/transform/BasePartitions.java index 026a39d..2f76452 100644 --- a/src/java/org/apache/cassandra/db/transform/BasePartitions.java +++ b/src/java/org/apache/cassandra/db/transform/BasePartitions.java @@ -102,7 +102,7 @@ implements BasePartitionIterator<R> } } - if (stop.isSignalled || !hasMoreContents()) + if (stop.isSignalled || stopChild.isSignalled || !hasMoreContents()) return false; } return true; http://git-wip-us.apache.org/repos/asf/cassandra/blob/7251c955/src/java/org/apache/cassandra/db/transform/BaseRows.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/transform/BaseRows.java 
b/src/java/org/apache/cassandra/db/transform/BaseRows.java index 0586840..e6ce1da 100644 --- a/src/java/org/apache/cassandra/db/transform/BaseRows.java +++ b/src/java/org/apache/cassandra/db/transform/BaseRows.java @@ -126,7 +126,7 @@ implements BaseRowIterator<R> Transformation[] fs = stack; int len = length; - while (!stop.isSignalled && input.hasNext()) + while (!stop.isSignalled && !stopChild.isSignalled && input.hasNext()) { Unfiltered next = input.next(); @@ -152,7 +152,7 @@ implements BaseRowIterator<R> } } - if (stop.isSignalled || !hasMoreContents()) + if (stop.isSignalled || stopChild.isSignalled || !hasMoreContents()) return false; } return true; http://git-wip-us.apache.org/repos/asf/cassandra/blob/7251c955/src/java/org/apache/cassandra/db/transform/MoreRows.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/transform/MoreRows.java b/src/java/org/apache/cassandra/db/transform/MoreRows.java index 786e215..118739b 100644 --- a/src/java/org/apache/cassandra/db/transform/MoreRows.java +++ b/src/java/org/apache/cassandra/db/transform/MoreRows.java @@ -20,6 +20,7 @@ */ package org.apache.cassandra.db.transform; +import org.apache.cassandra.db.PartitionColumns; import org.apache.cassandra.db.rows.BaseRowIterator; import org.apache.cassandra.db.rows.RowIterator; import org.apache.cassandra.db.rows.UnfilteredRowIterator; @@ -47,6 +48,11 @@ public interface MoreRows<I extends BaseRowIterator<?>> extends MoreContents<I> return add(mutable(iterator), more); } + public static UnfilteredRowIterator extend(UnfilteredRowIterator iterator, MoreRows<? super UnfilteredRowIterator> more, PartitionColumns columns) + { + return add(Transformation.wrapIterator(iterator, columns), more); + } + public static RowIterator extend(RowIterator iterator, MoreRows<? 
super RowIterator> more) { return add(mutable(iterator), more); http://git-wip-us.apache.org/repos/asf/cassandra/blob/7251c955/src/java/org/apache/cassandra/db/transform/StoppingTransformation.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/transform/StoppingTransformation.java b/src/java/org/apache/cassandra/db/transform/StoppingTransformation.java index 534091e..79563e9 100644 --- a/src/java/org/apache/cassandra/db/transform/StoppingTransformation.java +++ b/src/java/org/apache/cassandra/db/transform/StoppingTransformation.java @@ -26,8 +26,8 @@ import org.apache.cassandra.db.rows.BaseRowIterator; // A Transformation that can stop an iterator earlier than its natural exhaustion public abstract class StoppingTransformation<I extends BaseRowIterator<?>> extends Transformation<I> { - private BaseIterator.Stop stop; - private BaseIterator.Stop stopInPartition; + private BaseIterator rows; + private BaseIterator partitions; /** * If invoked by a subclass, any partitions iterator this transformation has been applied to will terminate @@ -36,8 +36,12 @@ public abstract class StoppingTransformation<I extends BaseRowIterator<?>> exten @DontInline protected void stop() { - if (stop != null) - stop.isSignalled = true; + if (partitions != null) + { + partitions.stop.isSignalled = true; + partitions.stopChild.isSignalled = true; + } + stopInPartition(); } @@ -48,33 +52,36 @@ public abstract class StoppingTransformation<I extends BaseRowIterator<?>> exten @DontInline protected void stopInPartition() { - if (stopInPartition != null) - stopInPartition.isSignalled = true; + if (rows != null) + { + rows.stop.isSignalled = true; + rows.stopChild.isSignalled = true; + } } @Override protected void attachTo(BasePartitions partitions) { - assert this.stop == null; - this.stop = partitions.stop; + assert this.partitions == null; + this.partitions = partitions; } @Override protected void attachTo(BaseRows rows) { - 
assert this.stopInPartition == null; - this.stopInPartition = rows.stop; + assert this.rows == null; + this.rows = rows; } @Override protected void onClose() { - stop = null; + partitions = null; } @Override protected void onPartitionClose() { - stopInPartition = null; + rows = null; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/7251c955/src/java/org/apache/cassandra/db/transform/Transformation.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/transform/Transformation.java b/src/java/org/apache/cassandra/db/transform/Transformation.java index 6a31ece..06dd057 100644 --- a/src/java/org/apache/cassandra/db/transform/Transformation.java +++ b/src/java/org/apache/cassandra/db/transform/Transformation.java @@ -21,6 +21,7 @@ package org.apache.cassandra.db.transform; import org.apache.cassandra.db.DeletionTime; +import org.apache.cassandra.db.PartitionColumns; import org.apache.cassandra.db.partitions.PartitionIterator; import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; import org.apache.cassandra.db.rows.*; @@ -151,6 +152,29 @@ public abstract class Transformation<I extends BaseRowIterator<?>> : new FilteredRows(iterator); } + /** + * Even though this method is similar to `mutable`, it suppresses the optimisation of avoiding creating an additional + * wrapping iterator object (which both creates an extra object and grows the call stack during the iteration), so it + * should be used with caution. + * + * It is useful in cases when the input has to be checked for more contents rather than directly checking if it + * is stopped. For example, when concatenating two iterators (pseudocode): + * + * iter1 = [row(1), row(2), row(3)] + * iter2 = [row(4), row(5), row(6)] + * + * UnfilteredRowIterators.concat(DataLimits.cqlLimits(1).filter(iter1), DataLimits.cqlLimits(1).filter(iter2)) + * + * Which should yield two rows: [row(1), row(4)]. 
+ * + * Using stacked transformations instead of wrapping would result in returning a single row, since the first + * iterator will signal the iterator is stopped. + */ + static UnfilteredRows wrapIterator(UnfilteredRowIterator iterator, PartitionColumns columns) + { + return new UnfilteredRows(iterator, columns); + } + static <E extends BaseIterator> E add(E to, Transformation add) { to.add(add); http://git-wip-us.apache.org/repos/asf/cassandra/blob/7251c955/src/java/org/apache/cassandra/db/transform/UnfilteredRows.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/transform/UnfilteredRows.java b/src/java/org/apache/cassandra/db/transform/UnfilteredRows.java index f000fcf..c631f2e 100644 --- a/src/java/org/apache/cassandra/db/transform/UnfilteredRows.java +++ b/src/java/org/apache/cassandra/db/transform/UnfilteredRows.java @@ -21,20 +21,33 @@ package org.apache.cassandra.db.transform; import org.apache.cassandra.db.DeletionTime; +import org.apache.cassandra.db.PartitionColumns; import org.apache.cassandra.db.rows.EncodingStats; import org.apache.cassandra.db.rows.Unfiltered; import org.apache.cassandra.db.rows.UnfilteredRowIterator; final class UnfilteredRows extends BaseRows<Unfiltered, UnfilteredRowIterator> implements UnfilteredRowIterator { + private PartitionColumns columns; private DeletionTime partitionLevelDeletion; public UnfilteredRows(UnfilteredRowIterator input) { + this(input, input.columns()); + } + + public UnfilteredRows(UnfilteredRowIterator input, PartitionColumns columns) + { super(input); + this.columns = columns; partitionLevelDeletion = input.partitionLevelDeletion(); } + public PartitionColumns columns() + { + return columns; + } + + @Override void add(Transformation add) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/7251c955/test/unit/org/apache/cassandra/db/RowCacheCQLTest.java ---------------------------------------------------------------------- diff --git 
a/test/unit/org/apache/cassandra/db/RowCacheCQLTest.java b/test/unit/org/apache/cassandra/db/RowCacheCQLTest.java index a3c0e25..a8f7e3d 100644 --- a/test/unit/org/apache/cassandra/db/RowCacheCQLTest.java +++ b/test/unit/org/apache/cassandra/db/RowCacheCQLTest.java @@ -37,4 +37,74 @@ public class RowCacheCQLTest extends CQLTester assertEquals(1, res.size()); assertEmpty(execute("SELECT * FROM %s WHERE p1 = ? and c1 > ?", 123L, 1000)); } + + /** + * Test for CASSANDRA-13482 + */ + @Test + public void testPartialCache() throws Throwable + { + createTable("CREATE TABLE %s (pk int, ck1 int, v1 int, v2 int, primary key (pk, ck1))" + + "WITH CACHING = { 'keys': 'ALL', 'rows_per_partition': '1' }"); + assertEmpty(execute("select * from %s where pk = 10000")); + + execute("DELETE FROM %s WHERE pk = 1 AND ck1 = 0"); + execute("DELETE FROM %s WHERE pk = 1 AND ck1 = 1"); + execute("DELETE FROM %s WHERE pk = 1 AND ck1 = 2"); + execute("INSERT INTO %s (pk, ck1, v1, v2) VALUES (1, 1, 1, 1)"); + execute("INSERT INTO %s (pk, ck1, v1, v2) VALUES (1, 2, 2, 2)"); + execute("INSERT INTO %s (pk, ck1, v1, v2) VALUES (1, 3, 3, 3)"); + execute("DELETE FROM %s WHERE pk = 1 AND ck1 = 2"); + execute("DELETE FROM %s WHERE pk = 1 AND ck1 = 3"); + execute("INSERT INTO %s (pk, ck1, v1, v2) VALUES (1, 4, 4, 4)"); + execute("INSERT INTO %s (pk, ck1, v1, v2) VALUES (1, 5, 5, 5)"); + + assertRows(execute("select * from %s where pk = 1"), + row(1, 1, 1, 1), + row(1, 4, 4, 4), + row(1, 5, 5, 5)); + assertRows(execute("select * from %s where pk = 1 LIMIT 1"), + row(1, 1, 1, 1)); + + assertRows(execute("select * from %s where pk = 1 and ck1 >=2"), + row(1, 4, 4, 4), + row(1, 5, 5, 5)); + assertRows(execute("select * from %s where pk = 1 and ck1 >=2 LIMIT 1"), + row(1, 4, 4, 4)); + + assertRows(execute("select * from %s where pk = 1 and ck1 >=2"), + row(1, 4, 4, 4), + row(1, 5, 5, 5)); + assertRows(execute("select * from %s where pk = 1 and ck1 >=2 LIMIT 1"), + row(1, 4, 4, 4)); + } + + @Test + public 
void testPartialCacheWithStatic() throws Throwable + { + createTable("CREATE TABLE %s (pk int, ck1 int, s int static, v1 int, primary key (pk, ck1))" + + "WITH CACHING = { 'keys': 'ALL', 'rows_per_partition': '1' }"); + assertEmpty(execute("select * from %s where pk = 10000")); + + execute("INSERT INTO %s (pk, s) VALUES (1, 1)"); + execute("INSERT INTO %s (pk, ck1, v1) VALUES (1, 2, 2)"); + execute("INSERT INTO %s (pk, ck1, v1) VALUES (1, 3, 3)"); + + execute("DELETE FROM %s WHERE pk = 2 AND ck1 = 0"); + execute("DELETE FROM %s WHERE pk = 2 AND ck1 = 1"); + execute("DELETE FROM %s WHERE pk = 3 AND ck1 = 2"); + execute("INSERT INTO %s (pk, s) VALUES (2, 2)"); + execute("INSERT INTO %s (pk, ck1, v1) VALUES (2, 1, 1)"); + execute("INSERT INTO %s (pk, ck1, v1) VALUES (2, 2, 2)"); + execute("INSERT INTO %s (pk, ck1, v1) VALUES (2, 3, 3)"); + + assertRows(execute("select * from %s WHERE pk = 1"), + row(1, 2, 1, 2), + row(1, 3, 1, 3)); + + assertRows(execute("select * from %s WHERE pk = 2"), + row(2, 1, 2, 1), + row(2, 2, 2, 2), + row(2, 3, 2, 3)); + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/7251c955/test/unit/org/apache/cassandra/db/rows/UnfilteredRowIteratorsTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/rows/UnfilteredRowIteratorsTest.java b/test/unit/org/apache/cassandra/db/rows/UnfilteredRowIteratorsTest.java new file mode 100644 index 0000000..43b9549 --- /dev/null +++ b/test/unit/org/apache/cassandra/db/rows/UnfilteredRowIteratorsTest.java @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.db.rows; + +import java.util.Arrays; +import java.util.Iterator; + +import org.junit.Test; + +import junit.framework.Assert; +import org.apache.cassandra.Util; +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.ColumnDefinition; +import org.apache.cassandra.db.BufferDecoratedKey; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.DeletionTime; +import org.apache.cassandra.db.EmptyIterators; +import org.apache.cassandra.db.PartitionColumns; +import org.apache.cassandra.db.filter.DataLimits; +import org.apache.cassandra.db.marshal.Int32Type; +import org.apache.cassandra.dht.Murmur3Partitioner; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.FBUtilities; + +public class UnfilteredRowIteratorsTest +{ + static final CFMetaData metadata; + static final ColumnDefinition v1Metadata; + static final ColumnDefinition v2Metadata; + + static + { + metadata = CFMetaData.Builder.create("", "") + .addPartitionKey("pk", Int32Type.instance) + .addClusteringColumn("ck", Int32Type.instance) + .addRegularColumn("v1", Int32Type.instance) + .addRegularColumn("v2", Int32Type.instance) + .build(); + v1Metadata = metadata.partitionColumns().columns(false).getSimple(0); + v2Metadata = metadata.partitionColumns().columns(false).getSimple(1); + } + + + @Test + public void concatTest() + { + UnfilteredRowIterator iter1, iter2, iter3, concat; + // simple concatenation + iter1 = rows(metadata.partitionColumns(), 1, + row(1, cell(v1Metadata, 1), 
cell(v2Metadata, 1)), + row(2, cell(v1Metadata, 2), cell(v2Metadata, 2))); + iter2 = rows(metadata.partitionColumns(), 1, + row(3, cell(v1Metadata, 3), cell(v2Metadata, 3)), + row(4, cell(v1Metadata, 4), cell(v2Metadata, 4))); + concat = UnfilteredRowIterators.concat(iter1, iter2); + Assert.assertEquals(concat.columns(), metadata.partitionColumns()); + assertRows(concat, + row(1, cell(v1Metadata, 1), cell(v2Metadata, 1)), + row(2, cell(v1Metadata, 2), cell(v2Metadata, 2)), + row(3, cell(v1Metadata, 3), cell(v2Metadata, 3)), + row(4, cell(v1Metadata, 4), cell(v2Metadata, 4))); + + // concat with RHS empty iterator + iter1 = rows(metadata.partitionColumns(), 1, + row(1, cell(v1Metadata, 1), cell(v2Metadata, 1)), + row(2, cell(v1Metadata, 2), cell(v2Metadata, 2))); + Assert.assertEquals(concat.columns(), metadata.partitionColumns()); + assertRows(UnfilteredRowIterators.concat(iter1, EmptyIterators.unfilteredRow(metadata, dk(1), false, Rows.EMPTY_STATIC_ROW, DeletionTime.LIVE)), + row(1, cell(v1Metadata, 1), cell(v2Metadata, 1)), + row(2, cell(v1Metadata, 2), cell(v2Metadata, 2))); + + // concat with LHS empty iterator + iter1 = rows(metadata.partitionColumns(), 1, + row(1, cell(v1Metadata, 1), cell(v2Metadata, 1)), + row(2, cell(v1Metadata, 2), cell(v2Metadata, 2))); + Assert.assertEquals(concat.columns(), metadata.partitionColumns()); + assertRows(UnfilteredRowIterators.concat(EmptyIterators.unfilteredRow(metadata, dk(1), false, Rows.EMPTY_STATIC_ROW, DeletionTime.LIVE), iter1), + row(1, cell(v1Metadata, 1), cell(v2Metadata, 1)), + row(2, cell(v1Metadata, 2), cell(v2Metadata, 2))); + + // concat with different columns + iter1 = rows(metadata.partitionColumns().without(v1Metadata), 1, + row(1, cell(v2Metadata, 1)), row(2, cell(v2Metadata, 2))); + iter2 = rows(metadata.partitionColumns().without(v2Metadata), 1, + row(3, cell(v1Metadata, 3)), row(4, cell(v1Metadata, 4))); + concat = UnfilteredRowIterators.concat(iter1, iter2); + Assert.assertEquals(concat.columns(), 
PartitionColumns.of(v1Metadata).mergeTo(PartitionColumns.of(v2Metadata))); + assertRows(concat, + row(1, cell(v2Metadata, 1)), row(2, cell(v2Metadata, 2)), + row(3, cell(v1Metadata, 3)), row(4, cell(v1Metadata, 4))); + + // concat with CQL limits + iter1 = rows(metadata.partitionColumns(), 1, + row(1, cell(v1Metadata, 1), cell(v2Metadata, 1)), + row(2, cell(v1Metadata, 2), cell(v2Metadata, 2))); + iter2 = rows(metadata.partitionColumns(), 1, + row(3, cell(v1Metadata, 3), cell(v2Metadata, 3)), + row(4, cell(v1Metadata, 4), cell(v2Metadata, 4))); + concat = UnfilteredRowIterators.concat(DataLimits.cqlLimits(1).filter(iter1, FBUtilities.nowInSeconds()), + DataLimits.cqlLimits(1).filter(iter2, FBUtilities.nowInSeconds())); + Assert.assertEquals(concat.columns(), metadata.partitionColumns()); + assertRows(concat, + row(1, cell(v1Metadata, 1), cell(v2Metadata, 1)), + row(3, cell(v1Metadata, 3), cell(v2Metadata, 3))); + + // concat concatenated iterators + iter1 = rows(metadata.partitionColumns(), 1, + row(1, cell(v1Metadata, 1), cell(v2Metadata, 1)), + row(2, cell(v1Metadata, 2), cell(v2Metadata, 2))); + iter2 = rows(metadata.partitionColumns(), 1, + row(3, cell(v1Metadata, 3), cell(v2Metadata, 3)), + row(4, cell(v1Metadata, 4), cell(v2Metadata, 4))); + + concat = UnfilteredRowIterators.concat(DataLimits.cqlLimits(1).filter(iter1, FBUtilities.nowInSeconds()), + DataLimits.cqlLimits(1).filter(iter2, FBUtilities.nowInSeconds())); + + iter3 = rows(metadata.partitionColumns(), 1, + row(4, cell(v1Metadata, 4), cell(v2Metadata, 4)), + row(5, cell(v1Metadata, 5), cell(v2Metadata, 5))); + concat = UnfilteredRowIterators.concat(concat, DataLimits.cqlLimits(1).filter(iter3, FBUtilities.nowInSeconds())); + + Assert.assertEquals(concat.columns(), metadata.partitionColumns()); + assertRows(concat, + row(1, cell(v1Metadata, 1), cell(v2Metadata, 1)), + row(3, cell(v1Metadata, 3), cell(v2Metadata, 3)), + row(4, cell(v1Metadata, 4), cell(v2Metadata, 4))); + } + + public static void 
assertRows(UnfilteredRowIterator iterator, Row... rows) + { + Iterator<Row> rowsIterator = Arrays.asList(rows).iterator(); + + while (iterator.hasNext() && rowsIterator.hasNext()) + Assert.assertEquals(iterator.next(), rowsIterator.next()); + + Assert.assertTrue(iterator.hasNext() == rowsIterator.hasNext()); + } + + public static DecoratedKey dk(int pk) + { + return new BufferDecoratedKey(new Murmur3Partitioner.LongToken(pk), ByteBufferUtil.bytes(pk)); + } + + public static UnfilteredRowIterator rows(PartitionColumns columns, int pk, Row... rows) + { + Iterator<Row> rowsIterator = Arrays.asList(rows).iterator(); + return new AbstractUnfilteredRowIterator(metadata, dk(pk), DeletionTime.LIVE, columns, Rows.EMPTY_STATIC_ROW, false, EncodingStats.NO_STATS) { + protected Unfiltered computeNext() + { + return rowsIterator.hasNext() ? rowsIterator.next() : endOfData(); + } + }; + } + + public Row row(int ck, Cell... columns) + { + BTreeRow.Builder builder = new BTreeRow.Builder(true); + builder.newRow(Util.clustering(metadata.comparator, ck)); + for (Cell cell : columns) + builder.addCell(cell); + return builder.build(); + } + + public Cell cell(ColumnDefinition metadata, int v) + { + return new BufferCell(metadata, + 1L, BufferCell.NO_TTL, BufferCell.NO_DELETION_TIME, ByteBufferUtil.bytes(v), null); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
