http://git-wip-us.apache.org/repos/asf/cassandra/blob/60949747/src/java/org/apache/cassandra/service/DataResolver.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/DataResolver.java b/src/java/org/apache/cassandra/service/DataResolver.java index f24c29f..2de02f6 100644 --- a/src/java/org/apache/cassandra/service/DataResolver.java +++ b/src/java/org/apache/cassandra/service/DataResolver.java @@ -23,6 +23,7 @@ import java.util.concurrent.TimeoutException; import org.apache.cassandra.concurrent.Stage; import org.apache.cassandra.concurrent.StageManager; +import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.ColumnDefinition; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.*; @@ -30,6 +31,8 @@ import org.apache.cassandra.db.filter.ClusteringIndexFilter; import org.apache.cassandra.db.filter.DataLimits; import org.apache.cassandra.db.partitions.*; import org.apache.cassandra.db.rows.*; +import org.apache.cassandra.db.transform.MoreRows; +import org.apache.cassandra.db.transform.Transformation; import org.apache.cassandra.exceptions.ReadTimeoutException; import org.apache.cassandra.net.*; import org.apache.cassandra.tracing.Tracing; @@ -67,7 +70,7 @@ public class DataResolver extends ResponseResolver // Even though every responses should honor the limit, we might have more than requested post reconciliation, // so ensure we're respecting the limit. DataLimits.Counter counter = command.limits().newCounter(command.nowInSec(), true); - return new CountingPartitionIterator(mergeWithShortReadProtection(iters, sources, counter), counter); + return counter.applyTo(mergeWithShortReadProtection(iters, sources, counter)); } private PartitionIterator mergeWithShortReadProtection(List<UnfilteredPartitionIterator> results, InetAddress[] sources, DataLimits.Counter resultCounter) @@ -80,11 +83,11 @@ public class DataResolver extends ResponseResolver // So-called "short reads" stems from nodes returning only a subset of the results they have for a partition due to the limit, // but that subset not being enough post-reconciliation. So if we don't have limit, don't bother. - if (command.limits().isUnlimited()) - return UnfilteredPartitionIterators.mergeAndFilter(results, command.nowInSec(), listener); - - for (int i = 0; i < results.size(); i++) - results.set(i, new ShortReadProtectedIterator(sources[i], results.get(i), resultCounter)); + if (!command.limits().isUnlimited()) + { + for (int i = 0; i < results.size(); i++) + results.set(i, Transformation.apply(results.get(i), new ShortReadProtection(sources[i], resultCounter))); + } return UnfilteredPartitionIterators.mergeAndFilter(results, command.nowInSec(), listener); } @@ -281,78 +284,53 @@ public class DataResolver extends ResponseResolver } } - private class ShortReadProtectedIterator extends CountingUnfilteredPartitionIterator + private class ShortReadProtection extends Transformation<UnfilteredRowIterator> { private final InetAddress source; + private final DataLimits.Counter counter; private final DataLimits.Counter postReconciliationCounter; - private ShortReadProtectedIterator(InetAddress source, UnfilteredPartitionIterator iterator, DataLimits.Counter postReconciliationCounter) + private ShortReadProtection(InetAddress source, DataLimits.Counter postReconciliationCounter) { - super(iterator, command.limits().newCounter(command.nowInSec(), false)); this.source = source; + this.counter = command.limits().newCounter(command.nowInSec(), false).onlyCount(); this.postReconciliationCounter = postReconciliationCounter; } @Override - public UnfilteredRowIterator next() + public UnfilteredRowIterator applyToPartition(UnfilteredRowIterator partition) { - return new ShortReadProtectedRowIterator(super.next()); + partition = Transformation.apply(partition, counter); + // must apply and extend with same protection instance + ShortReadRowProtection protection = new ShortReadRowProtection(partition.metadata(), partition.partitionKey()); + partition = MoreRows.extend(partition, protection); + partition = Transformation.apply(partition, protection); // apply after, so it is retained when we extend (in case we need to reextend) + return partition; } - private class ShortReadProtectedRowIterator extends WrappingUnfilteredRowIterator + private class ShortReadRowProtection extends Transformation implements MoreRows<UnfilteredRowIterator> { - private boolean initialReadIsDone; - private UnfilteredRowIterator shortReadContinuation; - private Clustering lastClustering; + final CFMetaData metadata; + final DecoratedKey partitionKey; + Clustering lastClustering; + int lastCount = 0; - ShortReadProtectedRowIterator(UnfilteredRowIterator iter) + private ShortReadRowProtection(CFMetaData metadata, DecoratedKey partitionKey) { - super(iter); - } - - @Override - public boolean hasNext() - { - if (super.hasNext()) - return true; - - initialReadIsDone = true; - - if (shortReadContinuation != null && shortReadContinuation.hasNext()) - return true; - - return checkForShortRead(); + this.metadata = metadata; + this.partitionKey = partitionKey; } @Override - public Unfiltered next() + public Row applyToRow(Row row) { - Unfiltered next = initialReadIsDone ? shortReadContinuation.next() : super.next(); - - if (next.kind() == Unfiltered.Kind.ROW) - lastClustering = ((Row)next).clustering(); - - return next; + lastClustering = row.clustering(); + return row; } @Override - public void close() + public UnfilteredRowIterator moreContents() { - try - { - super.close(); - } - finally - { - if (shortReadContinuation != null) - shortReadContinuation.close(); - } - } - - private boolean checkForShortRead() - { - assert shortReadContinuation == null || !shortReadContinuation.hasNext(); - // We have a short read if the node this is the result of has returned the requested number of // rows for that partition (i.e. it has stopped returning results due to the limit), but some of // those results haven't made it in the final result post-reconciliation due to other nodes @@ -363,8 +341,9 @@ public class DataResolver extends ResponseResolver // Also note that we only get here once all the results for this node have been returned, and so // if the node had returned the requested number but we still get there, it imply some results were // skipped during reconciliation. - if (!counter.isDoneForPartition()) - return false; + if (lastCount == counter.counted() || !counter.isDoneForPartition()) + return null; + lastCount = counter.counted(); assert !postReconciliationCounter.isDoneForPartition(); @@ -378,23 +357,20 @@ public class DataResolver extends ResponseResolver // counting iterator. int n = postReconciliationCounter.countedInCurrentPartition(); int x = counter.countedInCurrentPartition(); - int toQuery = x == 0 - ? n * 2 // We didn't got any answer, so (somewhat randomly) ask for twice as much - : Math.max(((n * n) / x) - n, 1); + int toQuery = Math.max(((n * n) / x) - n, 1); DataLimits retryLimits = command.limits().forShortReadRetry(toQuery); - ClusteringIndexFilter filter = command.clusteringIndexFilter(partitionKey()); - ClusteringIndexFilter retryFilter = lastClustering == null ? filter : filter.forPaging(metadata().comparator, lastClustering, false); + ClusteringIndexFilter filter = command.clusteringIndexFilter(partitionKey); + ClusteringIndexFilter retryFilter = lastClustering == null ? filter : filter.forPaging(metadata.comparator, lastClustering, false); SinglePartitionReadCommand<?> cmd = SinglePartitionReadCommand.create(command.metadata(), command.nowInSec(), command.columnFilter(), command.rowFilter(), retryLimits, - partitionKey(), + partitionKey, retryFilter); - shortReadContinuation = doShortReadRetry(cmd); - return shortReadContinuation.hasNext(); + return doShortReadRetry(cmd); } private UnfilteredRowIterator doShortReadRetry(SinglePartitionReadCommand<?> retryCommand) @@ -402,7 +378,7 @@ public class DataResolver extends ResponseResolver DataResolver resolver = new DataResolver(keyspace, retryCommand, ConsistencyLevel.ONE, 1); ReadCallback handler = new ReadCallback(resolver, ConsistencyLevel.ONE, retryCommand, Collections.singletonList(source)); if (StorageProxy.canDoLocalRequest(source)) - StageManager.getStage(Stage.READ).maybeExecuteImmediately(new StorageProxy.LocalReadRunnable(retryCommand, handler)); + StageManager.getStage(Stage.READ).maybeExecuteImmediately(new StorageProxy.LocalReadRunnable(retryCommand, handler)); else MessagingService.instance().sendRRWithFailure(retryCommand.createMessage(MessagingService.current_version), source, handler);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/60949747/src/java/org/apache/cassandra/service/StorageProxy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java index f210951..d7d6c63 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -1898,7 +1898,8 @@ public class StorageProxy implements StorageProxyMBean private final ConsistencyLevel consistency; private final long startTime; - private CountingPartitionIterator sentQueryIterator; + private DataLimits.Counter counter; + private PartitionIterator sentQueryIterator; private int concurrencyFactor; // The two following "metric" are maintained to improve the concurrencyFactor @@ -1928,7 +1929,7 @@ public class StorageProxy implements StorageProxyMBean // else, sends the next batch of concurrent queries (after having close the previous iterator) if (sentQueryIterator != null) { - liveReturned += sentQueryIterator.counter().counted(); + liveReturned += counter.counted(); sentQueryIterator.close(); // It's not the first batch of queries and we're not done, so we we can use what has been @@ -1989,7 +1990,7 @@ public class StorageProxy implements StorageProxyMBean return new SingleRangeResponse(handler); } - private CountingPartitionIterator sendNextRequests() + private PartitionIterator sendNextRequests() { List<PartitionIterator> concurrentQueries = new ArrayList<>(concurrencyFactor); for (int i = 0; i < concurrencyFactor && ranges.hasNext(); i++) @@ -2001,7 +2002,8 @@ public class StorageProxy implements StorageProxyMBean Tracing.trace("Submitted {} concurrent range requests", concurrentQueries.size()); // We want to count the results for the sake of updating the concurrency factor (see updateConcurrencyFactor) but we don't want to // enforce any particular limit at this point (this could break code than rely on postReconciliationProcessing), hence the DataLimits.NONE. - return new CountingPartitionIterator(PartitionIterators.concat(concurrentQueries), DataLimits.NONE, command.nowInSec()); + counter = DataLimits.NONE.newCounter(command.nowInSec(), true); + return counter.applyTo(PartitionIterators.concat(concurrentQueries)); } public void close() http://git-wip-us.apache.org/repos/asf/cassandra/blob/60949747/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java b/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java index bdebd43..2599b8d 100644 --- a/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java +++ b/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java @@ -17,12 +17,11 @@ */ package org.apache.cassandra.service.pager; -import java.util.NoSuchElementException; - import org.apache.cassandra.db.*; import org.apache.cassandra.db.rows.*; import org.apache.cassandra.db.partitions.*; import org.apache.cassandra.db.filter.DataLimits; +import org.apache.cassandra.db.transform.Transformation; import org.apache.cassandra.exceptions.RequestExecutionException; import org.apache.cassandra.exceptions.RequestValidationException; import org.apache.cassandra.service.ClientState; @@ -61,87 +60,65 @@ abstract class AbstractQueryPager implements QueryPager public PartitionIterator fetchPage(int pageSize, ConsistencyLevel consistency, ClientState clientState) throws RequestValidationException, RequestExecutionException { if (isExhausted()) - return PartitionIterators.EMPTY; + return EmptyIterators.partition(); pageSize = Math.min(pageSize, remaining); - return new PagerIterator(nextPageReadCommand(pageSize).execute(consistency, clientState), limits.forPaging(pageSize), command.nowInSec()); + Pager pager = new Pager(limits.forPaging(pageSize), command.nowInSec()); + return Transformation.apply(nextPageReadCommand(pageSize).execute(consistency, clientState), pager); } public PartitionIterator fetchPageInternal(int pageSize, ReadOrderGroup orderGroup) throws RequestValidationException, RequestExecutionException { if (isExhausted()) - return PartitionIterators.EMPTY; + return EmptyIterators.partition(); pageSize = Math.min(pageSize, remaining); - return new PagerIterator(nextPageReadCommand(pageSize).executeInternal(orderGroup), limits.forPaging(pageSize), command.nowInSec()); + Pager pager = new Pager(limits.forPaging(pageSize), command.nowInSec()); + return Transformation.apply(nextPageReadCommand(pageSize).executeInternal(orderGroup), pager); } - private class PagerIterator extends CountingPartitionIterator + private class Pager extends Transformation<RowIterator> { private final DataLimits pageLimits; - + private final DataLimits.Counter counter; private Row lastRow; - private boolean isFirstPartition = true; - private RowIterator nextPartition; - private PagerIterator(PartitionIterator iter, DataLimits pageLimits, int nowInSec) + private Pager(DataLimits pageLimits, int nowInSec) { - super(iter, pageLimits, nowInSec); + this.counter = pageLimits.newCounter(nowInSec, true); this.pageLimits = pageLimits; } @Override - @SuppressWarnings("resource") // iter is closed by closing the result or in close() - public boolean hasNext() + public RowIterator applyToPartition(RowIterator partition) { - while (nextPartition == null && super.hasNext()) + DecoratedKey key = partition.partitionKey(); + if (lastKey == null || !lastKey.equals(key)) + remainingInPartition = limits.perPartitionCount(); + lastKey = key; + + // If this is the first partition of this page, this could be the continuation of a partition we've started + // on the previous page. In which case, we could have the problem that the partition has no more "regular" + // rows (but the page size is such we didn't knew before) but it does has a static row. We should then skip + // the partition as returning it would means to the upper layer that the partition has "only" static columns, + // which is not the case (and we know the static results have been sent on the previous page). + if (isFirstPartition) { - if (nextPartition == null) - nextPartition = super.next(); - - DecoratedKey key = nextPartition.partitionKey(); - if (lastKey == null || !lastKey.equals(key)) - remainingInPartition = limits.perPartitionCount(); - - lastKey = key; - - // If this is the first partition of this page, this could be the continuation of a partition we've started - // on the previous page. In which case, we could have the problem that the partition has no more "regular" - // rows (but the page size is such we didn't knew before) but it does has a static row. We should then skip - // the partition as returning it would means to the upper layer that the partition has "only" static columns, - // which is not the case (and we know the static results have been sent on the previous page). - if (isFirstPartition && isPreviouslyReturnedPartition(key) && !nextPartition.hasNext()) + isFirstPartition = false; + if (isPreviouslyReturnedPartition(key) && !partition.hasNext()) { - nextPartition.close(); - nextPartition = null; + partition.close(); + return null; } - - isFirstPartition = false; } - return nextPartition != null; - } - - @Override - @SuppressWarnings("resource") // iter is closed by closing the result - public RowIterator next() - { - if (!hasNext()) - throw new NoSuchElementException(); - RowIterator toReturn = nextPartition; - nextPartition = null; - - return new RowPagerIterator(toReturn); + return Transformation.apply(counter.applyTo(partition), this); } @Override - public void close() + public void onClose() { - super.close(); - if (nextPartition != null) - nextPartition.close(); - recordLast(lastKey, lastRow); int counted = counter.counted(); @@ -159,28 +136,18 @@ abstract class AbstractQueryPager implements QueryPager exhausted = counted < pageLimits.count(); } - private class RowPagerIterator extends WrappingRowIterator + public Row applyToStatic(Row row) { - RowPagerIterator(RowIterator iter) - { - super(iter); - } - - @Override - public Row staticRow() - { - Row staticRow = super.staticRow(); - if (!staticRow.isEmpty()) - lastRow = staticRow; - return staticRow; - } + if (!row.isEmpty()) + lastRow = row; + return row; + } - @Override - public Row next() - { - lastRow = super.next(); - return lastRow; - } + @Override + public Row applyToRow(Row row) + { + lastRow = row; + return row; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/60949747/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java b/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java index 2a214a0..8caa14d 100644 --- a/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java +++ b/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java @@ -17,8 +17,6 @@ */ package org.apache.cassandra.service.pager; -import java.util.List; - import org.apache.cassandra.utils.AbstractIterator; import org.apache.cassandra.db.*; @@ -125,9 +123,9 @@ public class MultiPartitionPager implements QueryPager { int toQuery = Math.min(remaining, pageSize); PagersIterator iter = new PagersIterator(toQuery, consistency, clientState, null); - CountingPartitionIterator countingIter = new CountingPartitionIterator(iter, limit.forPaging(toQuery), nowInSec); - iter.setCounter(countingIter.counter()); - return countingIter; + DataLimits.Counter counter = limit.forPaging(toQuery).newCounter(nowInSec, true); + iter.setCounter(counter); + return counter.applyTo(iter); } @SuppressWarnings("resource") // iter closed via countingIter @@ -135,9 +133,9 @@ public class MultiPartitionPager implements QueryPager { int toQuery = Math.min(remaining, pageSize); PagersIterator iter = new PagersIterator(toQuery, null, null, orderGroup); - CountingPartitionIterator countingIter = new CountingPartitionIterator(iter, limit.forPaging(toQuery), nowInSec); - iter.setCounter(countingIter.counter()); - return countingIter; + DataLimits.Counter counter = limit.forPaging(toQuery).newCounter(nowInSec, true); + iter.setCounter(counter); + return counter.applyTo(iter); } private class PagersIterator extends AbstractIterator<RowIterator> implements PartitionIterator http://git-wip-us.apache.org/repos/asf/cassandra/blob/60949747/src/java/org/apache/cassandra/service/pager/QueryPager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/pager/QueryPager.java b/src/java/org/apache/cassandra/service/pager/QueryPager.java index a69335d..cdf2b97 100644 --- a/src/java/org/apache/cassandra/service/pager/QueryPager.java +++ b/src/java/org/apache/cassandra/service/pager/QueryPager.java @@ -18,9 +18,9 @@ package org.apache.cassandra.service.pager; import org.apache.cassandra.db.ConsistencyLevel; +import org.apache.cassandra.db.EmptyIterators; import org.apache.cassandra.db.ReadOrderGroup; import org.apache.cassandra.db.partitions.PartitionIterator; -import org.apache.cassandra.db.partitions.PartitionIterators; import org.apache.cassandra.exceptions.RequestExecutionException; import org.apache.cassandra.exceptions.RequestValidationException; import org.apache.cassandra.service.ClientState; @@ -55,12 +55,12 @@ public interface QueryPager public PartitionIterator fetchPage(int pageSize, ConsistencyLevel consistency, ClientState clientState) throws RequestValidationException, RequestExecutionException { - return PartitionIterators.EMPTY; + return EmptyIterators.partition(); } public PartitionIterator fetchPageInternal(int pageSize, ReadOrderGroup orderGroup) throws RequestValidationException, RequestExecutionException { - return PartitionIterators.EMPTY; + return EmptyIterators.partition(); } public boolean isExhausted() http://git-wip-us.apache.org/repos/asf/cassandra/blob/60949747/src/java/org/apache/cassandra/service/pager/QueryPagers.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/pager/QueryPagers.java b/src/java/org/apache/cassandra/service/pager/QueryPagers.java index eee94e6..02b5de2 100644 --- a/src/java/org/apache/cassandra/service/pager/QueryPagers.java +++ b/src/java/org/apache/cassandra/service/pager/QueryPagers.java @@ -53,10 +53,11 @@ public class QueryPagers int count = 0; while (!pager.isExhausted()) { - try (CountingPartitionIterator iter = new CountingPartitionIterator(pager.fetchPage(pageSize, consistencyLevel, state), limits, nowInSec)) + try (PartitionIterator iter = pager.fetchPage(pageSize, consistencyLevel, state)) { - PartitionIterators.consume(iter); - count += iter.counter().counted(); + DataLimits.Counter counter = limits.newCounter(nowInSec, true); + PartitionIterators.consume(counter.applyTo(iter)); + count += counter.counted(); } } return count; http://git-wip-us.apache.org/repos/asf/cassandra/blob/60949747/src/java/org/apache/cassandra/thrift/ThriftResultsMerger.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/thrift/ThriftResultsMerger.java b/src/java/org/apache/cassandra/thrift/ThriftResultsMerger.java index 72e4399..14c0dca 100644 --- a/src/java/org/apache/cassandra/thrift/ThriftResultsMerger.java +++ b/src/java/org/apache/cassandra/thrift/ThriftResultsMerger.java @@ -21,6 +21,7 @@ import java.util.Collections; import java.util.Iterator; import java.util.NoSuchElementException; +import org.apache.cassandra.db.transform.Transformation; import org.apache.cassandra.utils.AbstractIterator; import com.google.common.collect.Iterators; import com.google.common.collect.PeekingIterator; @@ -59,13 +60,12 @@ import org.apache.cassandra.db.partitions.*; * "c5": { value : 4 } * "c7": { value : 1 } */ -public class ThriftResultsMerger extends WrappingUnfilteredPartitionIterator +public class ThriftResultsMerger extends Transformation<UnfilteredRowIterator> { private final int nowInSec; - private ThriftResultsMerger(UnfilteredPartitionIterator wrapped, int nowInSec) + private ThriftResultsMerger(int nowInSec) { - super(wrapped); this.nowInSec = nowInSec; } @@ -74,7 +74,7 @@ public class ThriftResultsMerger extends WrappingUnfilteredPartitionIterator if (!metadata.isStaticCompactTable() && !metadata.isSuper()) return iterator; - return new ThriftResultsMerger(iterator, nowInSec); + return Transformation.apply(iterator, new ThriftResultsMerger(nowInSec)); } public static UnfilteredRowIterator maybeWrap(UnfilteredRowIterator iterator, int nowInSec) @@ -83,14 +83,15 @@ public class ThriftResultsMerger extends WrappingUnfilteredPartitionIterator return iterator; return iterator.metadata().isSuper() - ? new SuperColumnsPartitionMerger(iterator, nowInSec) + ? Transformation.apply(iterator, new SuperColumnsPartitionMerger(iterator, nowInSec)) : new PartitionMerger(iterator, nowInSec); } - protected UnfilteredRowIterator computeNext(UnfilteredRowIterator iter) + @Override + public UnfilteredRowIterator applyToPartition(UnfilteredRowIterator iter) { return iter.metadata().isSuper() - ? new SuperColumnsPartitionMerger(iter, nowInSec) + ? Transformation.apply(iter, new SuperColumnsPartitionMerger(iter, nowInSec)) : new PartitionMerger(iter, nowInSec); } @@ -204,20 +205,19 @@ public class ThriftResultsMerger extends WrappingUnfilteredPartitionIterator } } - private static class SuperColumnsPartitionMerger extends AlteringUnfilteredRowIterator + private static class SuperColumnsPartitionMerger extends Transformation { private final int nowInSec; private final Row.Builder builder; private final ColumnDefinition superColumnMapColumn; private final AbstractType<?> columnComparator; - private SuperColumnsPartitionMerger(UnfilteredRowIterator results, int nowInSec) + private SuperColumnsPartitionMerger(UnfilteredRowIterator applyTo, int nowInSec) { - super(results); - assert results.metadata().isSuper(); + assert applyTo.metadata().isSuper(); this.nowInSec = nowInSec; - this.superColumnMapColumn = results.metadata().compactValueColumn(); + this.superColumnMapColumn = applyTo.metadata().compactValueColumn(); assert superColumnMapColumn != null && superColumnMapColumn.type instanceof MapType; this.builder = BTreeRow.sortedBuilder(); @@ -225,7 +225,7 @@ public class ThriftResultsMerger extends WrappingUnfilteredPartitionIterator } @Override - protected Row computeNext(Row row) + public Row applyToRow(Row row) { PeekingIterator<Cell> staticCells = Iterators.peekingIterator(simpleCellsIterator(row)); if (!staticCells.hasNext()) http://git-wip-us.apache.org/repos/asf/cassandra/blob/60949747/src/java/org/apache/cassandra/utils/CloseableIterator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/CloseableIterator.java b/src/java/org/apache/cassandra/utils/CloseableIterator.java index 7474f3d..a7c4300 100644 --- a/src/java/org/apache/cassandra/utils/CloseableIterator.java +++ b/src/java/org/apache/cassandra/utils/CloseableIterator.java @@ -21,6 +21,7 @@ import java.io.Closeable; import java.util.Iterator; // so we can instantiate anonymous classes implementing both interfaces -public interface CloseableIterator<T> extends Iterator<T>, AutoCloseable, Closeable +public interface CloseableIterator<T> extends Iterator<T>, AutoCloseable { + public void close(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/60949747/src/java/org/apache/cassandra/utils/Throwables.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/Throwables.java b/src/java/org/apache/cassandra/utils/Throwables.java index 923b723..8ef6a63 100644 --- a/src/java/org/apache/cassandra/utils/Throwables.java +++ b/src/java/org/apache/cassandra/utils/Throwables.java @@ -76,13 +76,18 @@ public final class Throwables @SafeVarargs public static <E extends Exception> void perform(DiscreteAction<? extends E> ... actions) throws E { - perform(Arrays.stream(actions)); + perform(Stream.of(actions)); + } + + public static <E extends Exception> void perform(Stream<? extends DiscreteAction<? extends E>> stream, DiscreteAction<? extends E> ... extra) throws E + { + perform(Stream.concat(stream, Stream.of(extra))); } @SuppressWarnings("unchecked") public static <E extends Exception> void perform(Stream<DiscreteAction<? extends E>> actions) throws E { - Throwable fail = perform(null, actions); + Throwable fail = perform((Throwable) null, actions); if (failIfCanCast(fail, null)) throw (E) fail; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/60949747/test/data/legacy-sstables/jb/Keyspace1/Keyspace1-Standard1-jb-0-Summary.db ---------------------------------------------------------------------- diff --git a/test/data/legacy-sstables/jb/Keyspace1/Keyspace1-Standard1-jb-0-Summary.db b/test/data/legacy-sstables/jb/Keyspace1/Keyspace1-Standard1-jb-0-Summary.db index 83c68ce..1fbe040 100644 Binary files a/test/data/legacy-sstables/jb/Keyspace1/Keyspace1-Standard1-jb-0-Summary.db and b/test/data/legacy-sstables/jb/Keyspace1/Keyspace1-Standard1-jb-0-Summary.db differ http://git-wip-us.apache.org/repos/asf/cassandra/blob/60949747/test/unit/org/apache/cassandra/Util.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/Util.java b/test/unit/org/apache/cassandra/Util.java index 5e19d5e..ea0bd9b 100644 --- a/test/unit/org/apache/cassandra/Util.java +++ b/test/unit/org/apache/cassandra/Util.java @@ -103,9 +103,9 @@ public class Util return row.getCell(column); } - public static ClusteringPrefix clustering(ClusteringComparator comparator, Object... o) + public static Clustering clustering(ClusteringComparator comparator, Object... o) { - return comparator.make(o).clustering(); + return comparator.make(o); } public static Token token(String key) http://git-wip-us.apache.org/repos/asf/cassandra/blob/60949747/test/unit/org/apache/cassandra/db/TransformerTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/TransformerTest.java b/test/unit/org/apache/cassandra/db/TransformerTest.java new file mode 100644 index 0000000..d56d8cd --- /dev/null +++ b/test/unit/org/apache/cassandra/db/TransformerTest.java @@ -0,0 +1,325 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ +package org.apache.cassandra.db; + +import java.util.ArrayList; +import java.util.List; + +import org.junit.Test; + +import junit.framework.Assert; +import org.apache.cassandra.Util; +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.db.marshal.BytesType; +import org.apache.cassandra.db.marshal.Int32Type; +import org.apache.cassandra.db.rows.*; +import org.apache.cassandra.db.transform.FilteredRows; +import org.apache.cassandra.db.transform.MoreRows; +import org.apache.cassandra.db.transform.Transformation; +import org.apache.cassandra.dht.Murmur3Partitioner; +import org.apache.cassandra.utils.AbstractIterator; +import org.apache.cassandra.utils.ByteBufferUtil; + +public class TransformerTest +{ + + static final CFMetaData metadata = metadata(); + static final DecoratedKey partitionKey = new BufferDecoratedKey(new Murmur3Partitioner.LongToken(0L), ByteBufferUtil.EMPTY_BYTE_BUFFER); + static final Row staticRow = BTreeRow.singleCellRow(Clustering.STATIC_CLUSTERING, new BufferCell(metadata.partitionColumns().columns(true).getSimple(0), 0L, 0, 0, ByteBufferUtil.bytes(-1), null)); + + static CFMetaData metadata() + { + CFMetaData.Builder builder = CFMetaData.Builder.create("", ""); + builder.addPartitionKey("pk", BytesType.instance); + builder.addClusteringColumn("c", Int32Type.instance); + builder.addStaticColumn("s", Int32Type.instance); + builder.addRegularColumn("v", Int32Type.instance); + return builder.build(); + } + + // Mock Data + + static abstract class AbstractBaseRowIterator<U extends Unfiltered> extends AbstractIterator<U> implements BaseRowIterator<U> + { + private final int i; + private boolean returned; + + protected AbstractBaseRowIterator(int i) + { + this.i = i; + } + + protected U computeNext() + { + if (returned) + return endOfData(); + returned = true; + return (U) row(i); + } + + public CFMetaData metadata() + { + return metadata; + } + + public boolean isReverseOrder() + { + return false; + } + + public PartitionColumns columns() + { + return metadata.partitionColumns(); + } + + public DecoratedKey partitionKey() + { + return partitionKey; + } + + public Row staticRow() + { + return staticRow; + } + + public boolean isEmpty() + { + return false; + } + + public void close() + { + } + } + + private static UnfilteredRowIterator unfiltered(int i) + { + class Iter extends AbstractBaseRowIterator<Unfiltered> implements UnfilteredRowIterator + { + protected Iter(int i) + { + super(i); + } + + public DeletionTime partitionLevelDeletion() + { + return DeletionTime.LIVE; + } + + public EncodingStats stats() + { + return EncodingStats.NO_STATS; + } + } + return new Iter(i); + } + + private static RowIterator filtered(int i) + { + class Iter extends AbstractBaseRowIterator<Row> implements RowIterator + { + protected Iter(int i) + { + super(i); + } + } + return new Iter(i); + } + + private static Row row(int i) + { + return BTreeRow.singleCellRow(Util.clustering(metadata.comparator, i), + new BufferCell(metadata.partitionColumns().columns(false).getSimple(0), 1L, BufferCell.NO_TTL, BufferCell.NO_DELETION_TIME, ByteBufferUtil.bytes(i), null)); + } + + // Transformations that check mock data ranges + + private static Transformation expect(int from, int to, List<Check> checks) + { + Expect expect = new Expect(from, to); + checks.add(expect); + return expect; + } + + abstract static class Check extends Transformation + { + public abstract void check(); + } + + static class Expect extends Check + { + final int from, to; + int cur; + boolean closed; + + Expect(int from, int to) + { + this.from = from; + this.to = to; + this.cur = from; + } + + public Row applyToRow(Row row) + { + Assert.assertEquals(cur++, ByteBufferUtil.toInt(row.clustering().get(0))); + return row; + } + + public void onPartitionClose() + { + Assert.assertEquals(to, cur); + closed = true; + } + + public void check() + { + Assert.assertTrue(closed); + } + } + + // Combinations of mock data and checks for an empty, singleton, and extending (sequential) range + + private static enum Filter + { + INIT, APPLY_INNER, APPLY_OUTER, NONE + } + + private static BaseRowIterator<?> empty(Filter filter, List<Check> checks) + { + switch (filter) + { + case INIT: + return Transformation.apply(EmptyIterators.row(metadata, partitionKey, false), expect(0, 0, checks)); + case APPLY_INNER: + return Transformation.apply(FilteredRows.filter(Transformation.apply(EmptyIterators.unfilteredRow(metadata, partitionKey, false), expect(0, 0, checks)), Integer.MAX_VALUE), expect(0, 0, checks)); + case APPLY_OUTER: + case NONE: + return Transformation.apply(EmptyIterators.unfilteredRow(metadata, partitionKey, false), expect(0, 0, checks)); + default: + throw new IllegalStateException(); + } + } + + private static BaseRowIterator<?> singleton(Filter filter, int i, List<Check> checks) + { + switch (filter) + { + case INIT: + return Transformation.apply(filtered(i), expect(i, i + 1, checks)); + case APPLY_INNER: + return FilteredRows.filter(Transformation.apply(unfiltered(i), expect(i, i + 1, checks)), Integer.MAX_VALUE); + case APPLY_OUTER: + case NONE: + return Transformation.apply(unfiltered(i), expect(i, i + 1, checks)); + default: + throw new IllegalStateException(); + } + } + + private static BaseRowIterator<?> extendingIterator(int count, Filter filter, List<Check> checks) + { + class RefillNested extends Expect implements MoreRows<BaseRowIterator<?>> + { + boolean returnedEmpty, returnedSingleton, returnedNested; + RefillNested(int from) + { + super(from, count); + } + + public BaseRowIterator<?> moreContents() + { + // first call return an empty iterator, + // second call return a singleton iterator (with a function that expects to be around to receive just that item) + // third call return a nested version of ourselves, with a function that expects to receive all future values + // fourth call, return null, indicating no more iterators to return + + if (!returnedEmpty) + { + returnedEmpty = true; + return empty(filter, checks); + } + + if (!returnedSingleton) + { + returnedSingleton = true; + return singleton(filter, from, checks); + } + + if (from + 1 >= to) + return null; + + if (!returnedNested) + { + returnedNested = true; + + RefillNested refill = new RefillNested(from + 1); + checks.add(refill); + return refill.applyTo(empty(filter, checks)); + } + + return null; + } + + BaseRowIterator<?> applyTo(BaseRowIterator<?> iter) + { + if (iter instanceof UnfilteredRowIterator) + return Transformation.apply(MoreRows.extend((UnfilteredRowIterator) iter, this), this); + else + return Transformation.apply(MoreRows.extend((RowIterator) iter, this), this); + } + } + + RefillNested refill = new RefillNested(0); + checks.add(refill); + + BaseRowIterator<?> iter = empty(filter, checks); + switch (filter) + { + case APPLY_OUTER: + return FilteredRows.filter((UnfilteredRowIterator) refill.applyTo(iter), Integer.MAX_VALUE); + case APPLY_INNER: + case INIT: + case NONE: + return refill.applyTo(iter); + default: + throw new IllegalStateException(); + } + } + + @Test + public void testRowExtension() + { + for (Filter filter : Filter.values()) + { + List<Check> checks = new ArrayList<>(); + + BaseRowIterator<?> iter = extendingIterator(5, filter, checks); + for (int i = 0 ; i < 5 ; i++) + { + Unfiltered u = iter.next(); + assert u instanceof Row; + Assert.assertEquals(i, ByteBufferUtil.toInt(u.clustering().get(0))); + } + iter.close(); + + for (Check check : checks) + check.check(); + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/60949747/test/unit/org/apache/cassandra/repair/ValidatorTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/repair/ValidatorTest.java b/test/unit/org/apache/cassandra/repair/ValidatorTest.java index 8fe76c3..14f5707 100644 --- a/test/unit/org/apache/cassandra/repair/ValidatorTest.java +++ b/test/unit/org/apache/cassandra/repair/ValidatorTest.java @@ -20,8 +20,6 @@ package org.apache.cassandra.repair; import java.net.InetAddress; import java.util.Arrays; -import java.util.HashMap; -import java.util.Map; import java.util.UUID; import org.junit.After; @@ -32,8 +30,8 @@ import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.config.Schema; import org.apache.cassandra.db.BufferDecoratedKey; import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.EmptyIterators; import org.apache.cassandra.db.Keyspace; -import org.apache.cassandra.db.rows.UnfilteredRowIterators; import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; @@ -45,7 +43,6 @@ import org.apache.cassandra.repair.messages.RepairMessage; import org.apache.cassandra.repair.messages.ValidationComplete; import org.apache.cassandra.schema.KeyspaceParams; import org.apache.cassandra.utils.ByteBufferUtil; -import org.apache.cassandra.utils.MerkleTree; import org.apache.cassandra.utils.MerkleTrees; import org.apache.cassandra.utils.concurrent.SimpleCondition; @@ -126,7 +123,7 @@ public class ValidatorTest // add a row Token mid = partitioner.midpoint(range.left, range.right); - validator.add(UnfilteredRowIterators.emptyIterator(cfs.metadata, new BufferDecoratedKey(mid, ByteBufferUtil.bytes("inconceivable!")), false)); + validator.add(EmptyIterators.unfilteredRow(cfs.metadata, new BufferDecoratedKey(mid, ByteBufferUtil.bytes("inconceivable!")), false)); validator.complete(); // confirm that the tree was validated http://git-wip-us.apache.org/repos/asf/cassandra/blob/60949747/test/unit/org/apache/cassandra/service/DataResolverTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/DataResolverTest.java b/test/unit/org/apache/cassandra/service/DataResolverTest.java index b60a039..ecffbbd 100644 --- a/test/unit/org/apache/cassandra/service/DataResolverTest.java +++ b/test/unit/org/apache/cassandra/service/DataResolverTest.java @@ -327,7 +327,7 @@ public class DataResolverTest .add("c2", "v2") .buildUpdate()))); InetAddress peer2 = peer(); - resolver.preprocess(readResponseMessage(peer2, UnfilteredPartitionIterators.empty(cfm))); + resolver.preprocess(readResponseMessage(peer2, EmptyIterators.unfilteredPartition(cfm, false))); try(PartitionIterator data = resolver.resolve(); RowIterator rows = Iterators.getOnlyElement(data)) @@ -349,8 +349,8 @@ public class DataResolverTest public void testResolveWithBothEmpty() { DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 2); - resolver.preprocess(readResponseMessage(peer(), UnfilteredPartitionIterators.empty(cfm))); - resolver.preprocess(readResponseMessage(peer(), UnfilteredPartitionIterators.empty(cfm))); + resolver.preprocess(readResponseMessage(peer(), EmptyIterators.unfilteredPartition(cfm, false))); + resolver.preprocess(readResponseMessage(peer(), EmptyIterators.unfilteredPartition(cfm, false))); try(PartitionIterator data = resolver.resolve()) {
