9975: Flatten Iterator Transformation Hierarchy. To improve clarity of control flow, all iterator transformations are applied via a single class that manages an explicit stack of named transformation objects.
patch by benedict; reviewed by branimir for CASSANDRA-9975 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/60949747 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/60949747 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/60949747 Branch: refs/heads/trunk Commit: 609497471441273367013c09a1e0e1c990726ec7 Parents: a4f32c5 Author: Benedict Elliott Smith <[email protected]> Authored: Thu Jul 30 13:31:42 2015 +0100 Committer: Benedict Elliott Smith <[email protected]> Committed: Mon Oct 26 20:59:06 2015 +0000 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../org/apache/cassandra/db/EmptyIterators.java | 214 ++++++++++++ .../cassandra/db/PartitionRangeReadCommand.java | 10 +- .../org/apache/cassandra/db/ReadCommand.java | 119 +++---- src/java/org/apache/cassandra/db/ReadQuery.java | 4 +- .../db/SinglePartitionNamesCommand.java | 2 +- .../db/SinglePartitionSliceCommand.java | 2 +- .../db/compaction/CompactionIterator.java | 17 +- .../db/filter/ClusteringIndexNamesFilter.java | 10 +- .../db/filter/ClusteringIndexSliceFilter.java | 19 +- .../apache/cassandra/db/filter/DataLimits.java | 185 ++++++++--- .../apache/cassandra/db/filter/RowFilter.java | 45 +-- .../AlteringUnfilteredPartitionIterator.java | 72 ---- .../db/partitions/BasePartitionIterator.java | 27 ++ .../partitions/CountingPartitionIterator.java | 58 ---- .../db/partitions/CountingRowIterator.java | 58 ---- .../CountingUnfilteredPartitionIterator.java | 52 --- .../CountingUnfilteredRowIterator.java | 64 ---- .../db/partitions/PartitionIterator.java | 3 +- .../db/partitions/PartitionIterators.java | 102 ++---- .../cassandra/db/partitions/PurgeFunction.java | 120 +++++++ .../db/partitions/PurgingPartitionIterator.java | 156 --------- .../partitions/UnfilteredPartitionIterator.java | 6 +- .../UnfilteredPartitionIterators.java | 125 ++----- 
.../partitions/WrappingPartitionIterator.java | 50 --- .../WrappingUnfilteredPartitionIterator.java | 126 ------- .../db/rows/AlteringUnfilteredRowIterator.java | 98 ------ .../cassandra/db/rows/BaseRowIterator.java | 64 ++++ .../apache/cassandra/db/rows/RowIterator.java | 32 +- .../apache/cassandra/db/rows/RowIterators.java | 68 +--- .../db/rows/UnfilteredRowIterator.java | 32 +- .../rows/UnfilteredRowIteratorSerializer.java | 2 +- .../db/rows/UnfilteredRowIterators.java | 215 +++--------- .../cassandra/db/rows/WrappingRowIterator.java | 79 ----- .../db/rows/WrappingUnfilteredRowIterator.java | 2 +- .../cassandra/db/transform/BaseIterator.java | 129 ++++++++ .../cassandra/db/transform/BasePartitions.java | 100 ++++++ .../apache/cassandra/db/transform/BaseRows.java | 139 ++++++++ .../apache/cassandra/db/transform/Filter.java | 56 ++++ .../db/transform/FilteredPartitions.java | 40 +++ .../cassandra/db/transform/FilteredRows.java | 40 +++ .../cassandra/db/transform/MoreContents.java | 8 + .../cassandra/db/transform/MorePartitions.java | 35 ++ .../apache/cassandra/db/transform/MoreRows.java | 36 ++ .../apache/cassandra/db/transform/Stack.java | 81 +++++ .../db/transform/StoppingTransformation.java | 60 ++++ .../cassandra/db/transform/Transformation.java | 145 +++++++++ .../db/transform/UnfilteredPartitions.java | 27 ++ .../cassandra/db/transform/UnfilteredRows.java | 40 +++ .../cassandra/index/SecondaryIndexBuilder.java | 9 +- .../internal/composites/CompositesSearcher.java | 24 +- .../io/sstable/ReducingKeyIterator.java | 5 +- .../io/sstable/format/big/BigTableWriter.java | 29 +- .../apache/cassandra/service/DataResolver.java | 106 +++--- .../apache/cassandra/service/StorageProxy.java | 10 +- .../service/pager/AbstractQueryPager.java | 111 +++---- .../service/pager/MultiPartitionPager.java | 14 +- .../cassandra/service/pager/QueryPager.java | 6 +- .../cassandra/service/pager/QueryPagers.java | 7 +- .../cassandra/thrift/ThriftResultsMerger.java | 26 +- 
.../cassandra/utils/CloseableIterator.java | 3 +- .../org/apache/cassandra/utils/Throwables.java | 9 +- .../Keyspace1-Standard1-jb-0-Summary.db | Bin 202 -> 162 bytes test/unit/org/apache/cassandra/Util.java | 4 +- .../apache/cassandra/db/TransformerTest.java | 325 +++++++++++++++++++ .../apache/cassandra/repair/ValidatorTest.java | 7 +- .../cassandra/service/DataResolverTest.java | 6 +- 67 files changed, 2201 insertions(+), 1675 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/60949747/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 0457917..12f62f7 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.0 + * Flatten Iterator Transformation Hierarchy (CASSANDRA-9975) * Remove token generator (CASSANDRA-5261) * RolesCache should not be created for any authenticator that does not requireAuthentication (CASSANDRA-10562) * Fix LogTransaction checking only a single directory for files (CASSANDRA-10421) http://git-wip-us.apache.org/repos/asf/cassandra/blob/60949747/src/java/org/apache/cassandra/db/EmptyIterators.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/EmptyIterators.java b/src/java/org/apache/cassandra/db/EmptyIterators.java new file mode 100644 index 0000000..6bf8fff --- /dev/null +++ b/src/java/org/apache/cassandra/db/EmptyIterators.java @@ -0,0 +1,214 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. 
You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ +package org.apache.cassandra.db; + +import java.util.NoSuchElementException; + +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.db.partitions.BasePartitionIterator; +import org.apache.cassandra.db.partitions.PartitionIterator; +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; +import org.apache.cassandra.db.rows.*; + +public class EmptyIterators +{ + + private static class EmptyBasePartitionIterator<R extends BaseRowIterator<?>> implements BasePartitionIterator<R> + { + EmptyBasePartitionIterator() + { + } + + public void close() + { + } + + public boolean hasNext() + { + return false; + } + + public R next() + { + throw new NoSuchElementException(); + } + } + + private static class EmptyUnfilteredPartitionIterator extends EmptyBasePartitionIterator<UnfilteredRowIterator> implements UnfilteredPartitionIterator + { + final CFMetaData metadata; + final boolean isForThrift; + + public EmptyUnfilteredPartitionIterator(CFMetaData metadata, boolean isForThrift) + { + this.metadata = metadata; + this.isForThrift = isForThrift; + } + + public boolean isForThrift() + { + return isForThrift; + } + + public CFMetaData metadata() + { + return metadata; + } + } + + private static class EmptyPartitionIterator extends EmptyBasePartitionIterator<RowIterator> implements PartitionIterator + { + public static final EmptyPartitionIterator instance = new EmptyPartitionIterator(); + private EmptyPartitionIterator() + { + super(); + } + } + + private static class EmptyBaseRowIterator<U extends Unfiltered> implements 
BaseRowIterator<U> + { + final PartitionColumns columns; + final CFMetaData metadata; + final DecoratedKey partitionKey; + final boolean isReverseOrder; + final Row staticRow; + + EmptyBaseRowIterator(PartitionColumns columns, CFMetaData metadata, DecoratedKey partitionKey, boolean isReverseOrder, Row staticRow) + { + this.columns = columns; + this.metadata = metadata; + this.partitionKey = partitionKey; + this.isReverseOrder = isReverseOrder; + this.staticRow = staticRow; + } + + public CFMetaData metadata() + { + return metadata; + } + + public boolean isReverseOrder() + { + return isReverseOrder; + } + + public PartitionColumns columns() + { + return columns; + } + + public DecoratedKey partitionKey() + { + return partitionKey; + } + + public Row staticRow() + { + return staticRow; + } + + public void close() + { + } + + public boolean isEmpty() + { + return staticRow == Rows.EMPTY_STATIC_ROW; + } + + public boolean hasNext() + { + return false; + } + + public U next() + { + throw new NoSuchElementException(); + } + } + + private static class EmptyUnfilteredRowIterator extends EmptyBaseRowIterator<Unfiltered> implements UnfilteredRowIterator + { + final DeletionTime partitionLevelDeletion; + public EmptyUnfilteredRowIterator(PartitionColumns columns, CFMetaData metadata, DecoratedKey partitionKey, + boolean isReverseOrder, Row staticRow, DeletionTime partitionLevelDeletion) + { + super(columns, metadata, partitionKey, isReverseOrder, staticRow); + this.partitionLevelDeletion = partitionLevelDeletion; + } + + public boolean isEmpty() + { + return partitionLevelDeletion == DeletionTime.LIVE && super.isEmpty(); + } + + public DeletionTime partitionLevelDeletion() + { + return partitionLevelDeletion; + } + + public EncodingStats stats() + { + return EncodingStats.NO_STATS; + } + } + + private static class EmptyRowIterator extends EmptyBaseRowIterator<Row> implements RowIterator + { + public EmptyRowIterator(CFMetaData metadata, DecoratedKey partitionKey, boolean 
isReverseOrder, Row staticRow) + { + super(PartitionColumns.NONE, metadata, partitionKey, isReverseOrder, staticRow); + } + } + + public static UnfilteredPartitionIterator unfilteredPartition(CFMetaData metadata, boolean isForThrift) + { + return new EmptyUnfilteredPartitionIterator(metadata, isForThrift); + } + + public static PartitionIterator partition() + { + return EmptyPartitionIterator.instance; + } + + // this method is the only one that can return a non-empty iterator, but it still has no rows, so it seems cleanest to keep it here + public static UnfilteredRowIterator unfilteredRow(CFMetaData metadata, DecoratedKey partitionKey, boolean isReverseOrder, Row staticRow, DeletionTime partitionDeletion) + { + PartitionColumns columns = PartitionColumns.NONE; + if (!staticRow.isEmpty()) + columns = new PartitionColumns(Columns.from(staticRow.columns()), Columns.NONE); + else + staticRow = Rows.EMPTY_STATIC_ROW; + + if (partitionDeletion.isLive()) + partitionDeletion = DeletionTime.LIVE; + + return new EmptyUnfilteredRowIterator(columns, metadata, partitionKey, isReverseOrder, staticRow, partitionDeletion); + } + + public static UnfilteredRowIterator unfilteredRow(CFMetaData metadata, DecoratedKey partitionKey, boolean isReverseOrder) + { + return new EmptyUnfilteredRowIterator(PartitionColumns.NONE, metadata, partitionKey, isReverseOrder, Rows.EMPTY_STATIC_ROW, DeletionTime.LIVE); + } + + public static RowIterator row(CFMetaData metadata, DecoratedKey partitionKey, boolean isReverseOrder) + { + return new EmptyRowIterator(metadata, partitionKey, isReverseOrder, Rows.EMPTY_STATIC_ROW); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/60949747/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java index f17f3e3..9fce15e 100644 --- 
a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java +++ b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java @@ -30,7 +30,8 @@ import org.apache.cassandra.db.filter.*; import org.apache.cassandra.db.lifecycle.SSTableSet; import org.apache.cassandra.db.lifecycle.View; import org.apache.cassandra.db.partitions.*; -import org.apache.cassandra.db.rows.UnfilteredRowIterator; +import org.apache.cassandra.db.rows.BaseRowIterator; +import org.apache.cassandra.db.transform.Transformation; import org.apache.cassandra.dht.AbstractBounds; import org.apache.cassandra.exceptions.RequestExecutionException; import org.apache.cassandra.index.Index; @@ -226,10 +227,10 @@ public class PartitionRangeReadCommand extends ReadCommand private UnfilteredPartitionIterator checkCacheFilter(UnfilteredPartitionIterator iter, final ColumnFamilyStore cfs) { - return new WrappingUnfilteredPartitionIterator(iter) + class CacheFilter extends Transformation { @Override - public UnfilteredRowIterator computeNext(UnfilteredRowIterator iter) + public BaseRowIterator applyToPartition(BaseRowIterator iter) { // Note that we rely on the fact that until we actually advance 'iter', no really costly operation is actually done // (except for reading the partition key from the index file) due to the call to mergeLazily in queryStorage. 
@@ -249,7 +250,8 @@ public class PartitionRangeReadCommand extends ReadCommand return iter; } - }; + } + return Transformation.apply(iter, new CacheFilter()); } public MessageOut<ReadCommand> createMessage(int version) http://git-wip-us.apache.org/repos/asf/cassandra/blob/60949747/src/java/org/apache/cassandra/db/ReadCommand.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java index 3b1f8a8..4d9b65b 100644 --- a/src/java/org/apache/cassandra/db/ReadCommand.java +++ b/src/java/org/apache/cassandra/db/ReadCommand.java @@ -30,6 +30,7 @@ import org.apache.cassandra.cql3.Operator; import org.apache.cassandra.db.filter.*; import org.apache.cassandra.db.partitions.*; import org.apache.cassandra.db.rows.*; +import org.apache.cassandra.db.transform.Transformation; import org.apache.cassandra.dht.AbstractBounds; import org.apache.cassandra.index.Index; import org.apache.cassandra.io.IVersionedSerializer; @@ -383,7 +384,7 @@ public abstract class ReadCommand implements ReadQuery */ private UnfilteredPartitionIterator withMetricsRecording(UnfilteredPartitionIterator iter, final TableMetrics metric, final long startTimeNanos) { - return new WrappingUnfilteredPartitionIterator(iter) + class MetricRecording extends Transformation<UnfilteredRowIterator> { private final int failureThreshold = DatabaseDescriptor.getTombstoneFailureThreshold(); private final int warningThreshold = DatabaseDescriptor.getTombstoneWarnThreshold(); @@ -396,78 +397,71 @@ public abstract class ReadCommand implements ReadQuery private DecoratedKey currentKey; @Override - public UnfilteredRowIterator computeNext(UnfilteredRowIterator iter) + public UnfilteredRowIterator applyToPartition(UnfilteredRowIterator iter) { currentKey = iter.partitionKey(); + return Transformation.apply(iter, this); + } - return new AlteringUnfilteredRowIterator(iter) - { - @Override - protected 
Row computeNextStatic(Row row) - { - return computeNext(row); - } - - @Override - protected Row computeNext(Row row) - { - if (row.hasLiveData(ReadCommand.this.nowInSec())) - ++liveRows; - - for (Cell cell : row.cells()) - { - if (!cell.isLive(ReadCommand.this.nowInSec())) - countTombstone(row.clustering()); - } - return row; - } - - @Override - protected RangeTombstoneMarker computeNext(RangeTombstoneMarker marker) - { - countTombstone(marker.clustering()); - return marker; - } - - private void countTombstone(ClusteringPrefix clustering) - { - ++tombstones; - if (tombstones > failureThreshold && respectTombstoneThresholds) - { - String query = ReadCommand.this.toCQLString(); - Tracing.trace("Scanned over {} tombstones for query {}; query aborted (see tombstone_failure_threshold)", failureThreshold, query); - throw new TombstoneOverwhelmingException(tombstones, query, ReadCommand.this.metadata(), currentKey, clustering); - } - } - }; + @Override + public Row applyToStatic(Row row) + { + return applyToRow(row); } @Override - public void close() + public Row applyToRow(Row row) { - try + if (row.hasLiveData(ReadCommand.this.nowInSec())) + ++liveRows; + + for (Cell cell : row.cells()) { - super.close(); + if (!cell.isLive(ReadCommand.this.nowInSec())) + countTombstone(row.clustering()); } - finally + return row; + } + + @Override + public RangeTombstoneMarker applyToMarker(RangeTombstoneMarker marker) + { + countTombstone(marker.clustering()); + return marker; + } + + private void countTombstone(ClusteringPrefix clustering) + { + ++tombstones; + if (tombstones > failureThreshold && respectTombstoneThresholds) { - recordLatency(metric, System.nanoTime() - startTimeNanos); + String query = ReadCommand.this.toCQLString(); + Tracing.trace("Scanned over {} tombstones for query {}; query aborted (see tombstone_failure_threshold)", failureThreshold, query); + throw new TombstoneOverwhelmingException(tombstones, query, ReadCommand.this.metadata(), currentKey, clustering); + } 
+ } - metric.tombstoneScannedHistogram.update(tombstones); - metric.liveScannedHistogram.update(liveRows); + @Override + public void onClose() + { + recordLatency(metric, System.nanoTime() - startTimeNanos); - boolean warnTombstones = tombstones > warningThreshold && respectTombstoneThresholds; - if (warnTombstones) - { - String msg = String.format("Read %d live rows and %d tombstone cells for query %1.512s (see tombstone_warn_threshold)", liveRows, tombstones, ReadCommand.this.toCQLString()); - ClientWarn.warn(msg); - logger.warn(msg); - } + metric.tombstoneScannedHistogram.update(tombstones); + metric.liveScannedHistogram.update(liveRows); - Tracing.trace("Read {} live and {} tombstone cells{}", liveRows, tombstones, (warnTombstones ? " (see tombstone_warn_threshold)" : "")); + boolean warnTombstones = tombstones > warningThreshold && respectTombstoneThresholds; + if (warnTombstones) + { + String msg = String.format("Read %d live rows and %d tombstone cells for query %1.512s (see tombstone_warn_threshold)", liveRows, tombstones, ReadCommand.this.toCQLString()); + ClientWarn.warn(msg); + logger.warn(msg); } + + Tracing.trace("Read {} live and {} tombstone cells{}", liveRows, tombstones, (warnTombstones ? " (see tombstone_warn_threshold)" : "")); } }; + + return Transformation.apply(iter, new MetricRecording()); } /** @@ -482,13 +476,20 @@ public abstract class ReadCommand implements ReadQuery // are to some extend an artefact of compaction lagging behind and hence counting them is somewhat unintuitive). 
protected UnfilteredPartitionIterator withoutPurgeableTombstones(UnfilteredPartitionIterator iterator, ColumnFamilyStore cfs) { - return new PurgingPartitionIterator(iterator, cfs.gcBefore(nowInSec()), oldestUnrepairedTombstone(), cfs.getCompactionStrategyManager().onlyPurgeRepairedTombstones()) + final boolean isForThrift = iterator.isForThrift(); + class WithoutPurgeableTombstones extends PurgeFunction { + public WithoutPurgeableTombstones() + { + super(isForThrift, cfs.gcBefore(nowInSec()), oldestUnrepairedTombstone(), cfs.getCompactionStrategyManager().onlyPurgeRepairedTombstones()); + } + protected long getMaxPurgeableTimestamp() { return Long.MAX_VALUE; } - }; + } + return Transformation.apply(iterator, new WithoutPurgeableTombstones()); } /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/60949747/src/java/org/apache/cassandra/db/ReadQuery.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ReadQuery.java b/src/java/org/apache/cassandra/db/ReadQuery.java index d1f5272..178ca7c 100644 --- a/src/java/org/apache/cassandra/db/ReadQuery.java +++ b/src/java/org/apache/cassandra/db/ReadQuery.java @@ -42,12 +42,12 @@ public interface ReadQuery public PartitionIterator execute(ConsistencyLevel consistency, ClientState clientState) throws RequestExecutionException { - return PartitionIterators.EMPTY; + return EmptyIterators.partition(); } public PartitionIterator executeInternal(ReadOrderGroup orderGroup) { - return PartitionIterators.EMPTY; + return EmptyIterators.partition(); } public DataLimits limits() http://git-wip-us.apache.org/repos/asf/cassandra/blob/60949747/src/java/org/apache/cassandra/db/SinglePartitionNamesCommand.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/SinglePartitionNamesCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionNamesCommand.java index 430e4a1..763919e 100644 --- 
a/src/java/org/apache/cassandra/db/SinglePartitionNamesCommand.java +++ b/src/java/org/apache/cassandra/db/SinglePartitionNamesCommand.java @@ -153,7 +153,7 @@ public class SinglePartitionNamesCommand extends SinglePartitionReadCommand<Clus cfs.metric.updateSSTableIterated(sstablesIterated); if (result == null || result.isEmpty()) - return UnfilteredRowIterators.emptyIterator(metadata(), partitionKey(), false); + return EmptyIterators.unfilteredRow(metadata(), partitionKey(), false); DecoratedKey key = result.partitionKey(); cfs.metric.samplers.get(TableMetrics.Sampler.READS).addSample(key.getKey(), key.hashCode(), 1); http://git-wip-us.apache.org/repos/asf/cassandra/blob/60949747/src/java/org/apache/cassandra/db/SinglePartitionSliceCommand.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/SinglePartitionSliceCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionSliceCommand.java index 27aab62..f4e7af1 100644 --- a/src/java/org/apache/cassandra/db/SinglePartitionSliceCommand.java +++ b/src/java/org/apache/cassandra/db/SinglePartitionSliceCommand.java @@ -235,7 +235,7 @@ public class SinglePartitionSliceCommand extends SinglePartitionReadCommand<Clus cfs.metric.updateSSTableIterated(sstablesIterated); if (iterators.isEmpty()) - return UnfilteredRowIterators.emptyIterator(cfs.metadata, partitionKey(), filter.isReversed()); + return EmptyIterators.unfilteredRow(cfs.metadata, partitionKey(), filter.isReversed()); Tracing.trace("Merging data from memtables and {} sstables", sstablesIterated); http://git-wip-us.apache.org/repos/asf/cassandra/blob/60949747/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java index fe18c04..8a3b24b 100644 --- 
a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java @@ -25,10 +25,11 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.db.*; -import org.apache.cassandra.db.partitions.PurgingPartitionIterator; +import org.apache.cassandra.db.partitions.PurgeFunction; import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators; import org.apache.cassandra.db.rows.*; +import org.apache.cassandra.db.transform.Transformation; import org.apache.cassandra.index.transactions.CompactionTransaction; import org.apache.cassandra.io.sstable.ISSTableScanner; import org.apache.cassandra.metrics.CompactionMetrics; @@ -98,9 +99,11 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte if (metrics != null) metrics.beginCompaction(this); - this.compacted = scanners.isEmpty() - ? UnfilteredPartitionIterators.empty(controller.cfs.metadata) - : new PurgeIterator(UnfilteredPartitionIterators.merge(scanners, nowInSec, listener()), controller); + UnfilteredPartitionIterator merged = scanners.isEmpty() + ? 
EmptyIterators.unfilteredPartition(controller.cfs.metadata, false) + : UnfilteredPartitionIterators.merge(scanners, nowInSec, listener()); + boolean isForThrift = merged.isForThrift(); // to stop capture of iterator in Purger, which is confusing for debug + this.compacted = Transformation.apply(merged, new Purger(isForThrift, controller)); } public boolean isForThrift() @@ -251,7 +254,7 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte return this.getCompactionInfo().toString(); } - private class PurgeIterator extends PurgingPartitionIterator + private class Purger extends PurgeFunction { private final CompactionController controller; @@ -261,9 +264,9 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte private long compactedUnfiltered; - private PurgeIterator(UnfilteredPartitionIterator toPurge, CompactionController controller) + private Purger(boolean isForThrift, CompactionController controller) { - super(toPurge, controller.gcBefore, controller.compactingRepaired() ? Integer.MIN_VALUE : Integer.MAX_VALUE, controller.cfs.getCompactionStrategyManager().onlyPurgeRepairedTombstones()); + super(isForThrift, controller.gcBefore, controller.compactingRepaired() ? 
Integer.MIN_VALUE : Integer.MAX_VALUE, controller.cfs.getCompactionStrategyManager().onlyPurgeRepairedTombstones()); this.controller = controller; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/60949747/src/java/org/apache/cassandra/db/filter/ClusteringIndexNamesFilter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/filter/ClusteringIndexNamesFilter.java b/src/java/org/apache/cassandra/db/filter/ClusteringIndexNamesFilter.java index d3a289a..388cd50 100644 --- a/src/java/org/apache/cassandra/db/filter/ClusteringIndexNamesFilter.java +++ b/src/java/org/apache/cassandra/db/filter/ClusteringIndexNamesFilter.java @@ -25,6 +25,7 @@ import org.apache.cassandra.config.ColumnDefinition; import org.apache.cassandra.db.*; import org.apache.cassandra.db.rows.*; import org.apache.cassandra.db.partitions.*; +import org.apache.cassandra.db.transform.Transformation; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; @@ -107,20 +108,21 @@ public class ClusteringIndexNamesFilter extends AbstractClusteringIndexFilter { // Note that we don't filter markers because that's a bit trickier (we don't know in advance until when // the range extend) and it's harmless to left them. - return new AlteringUnfilteredRowIterator(iterator) + class FilterNotIndexed extends Transformation { @Override - public Row computeNextStatic(Row row) + public Row applyToStatic(Row row) { return columnFilter.fetchedColumns().statics.isEmpty() ? null : row.filter(columnFilter, iterator.metadata()); } @Override - public Row computeNext(Row row) + public Row applyToRow(Row row) { return clusterings.contains(row.clustering()) ? 
row.filter(columnFilter, iterator.metadata()) : null; } - }; + } + return Transformation.apply(iterator, new FilterNotIndexed()); } public UnfilteredRowIterator filter(final SliceableUnfilteredRowIterator iter) http://git-wip-us.apache.org/repos/asf/cassandra/blob/60949747/src/java/org/apache/cassandra/db/filter/ClusteringIndexSliceFilter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/filter/ClusteringIndexSliceFilter.java b/src/java/org/apache/cassandra/db/filter/ClusteringIndexSliceFilter.java index b2d529c..7a174ee 100644 --- a/src/java/org/apache/cassandra/db/filter/ClusteringIndexSliceFilter.java +++ b/src/java/org/apache/cassandra/db/filter/ClusteringIndexSliceFilter.java @@ -26,6 +26,7 @@ import org.apache.cassandra.db.*; import org.apache.cassandra.db.rows.*; import org.apache.cassandra.db.partitions.CachedPartition; import org.apache.cassandra.db.partitions.Partition; +import org.apache.cassandra.db.transform.Transformation; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; @@ -91,26 +92,26 @@ public class ClusteringIndexSliceFilter extends AbstractClusteringIndexFilter // Note that we don't filter markers because that's a bit trickier (we don't know in advance until when // the range extend) and it's harmless to leave them. - return new AlteringUnfilteredRowIterator(iterator) + class FilterNotIndexed extends Transformation { - @Override - public boolean hasNext() + public boolean isDoneForPartition() { - return !tester.isDone() && super.hasNext(); + return tester.isDone(); } @Override - public Row computeNextStatic(Row row) + public Row applyToRow(Row row) { - return columnFilter.fetchedColumns().statics.isEmpty() ? null : row.filter(columnFilter, iterator.metadata()); + return tester.includes(row.clustering()) ? 
row.filter(columnFilter, iterator.metadata()) : null; } @Override - public Row computeNext(Row row) + public Row applyToStatic(Row row) { - return tester.includes(row.clustering()) ? row.filter(columnFilter, iterator.metadata()) : null; + return columnFilter.fetchedColumns().statics.isEmpty() ? Rows.EMPTY_STATIC_ROW : row.filter(columnFilter, iterator.metadata()); } - }; + } + return Transformation.apply(iterator, new FilterNotIndexed()); } public UnfilteredRowIterator filter(SliceableUnfilteredRowIterator iterator) http://git-wip-us.apache.org/repos/asf/cassandra/blob/60949747/src/java/org/apache/cassandra/db/filter/DataLimits.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/filter/DataLimits.java b/src/java/org/apache/cassandra/db/filter/DataLimits.java index d5eefe3..130c6ba 100644 --- a/src/java/org/apache/cassandra/db/filter/DataLimits.java +++ b/src/java/org/apache/cassandra/db/filter/DataLimits.java @@ -23,6 +23,10 @@ import java.nio.ByteBuffer; import org.apache.cassandra.db.*; import org.apache.cassandra.db.rows.*; import org.apache.cassandra.db.partitions.*; +import org.apache.cassandra.db.transform.BasePartitions; +import org.apache.cassandra.db.transform.BaseRows; +import org.apache.cassandra.db.transform.StoppingTransformation; +import org.apache.cassandra.db.transform.Transformation; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.utils.ByteBufferUtil; @@ -125,17 +129,17 @@ public abstract class DataLimits public UnfilteredPartitionIterator filter(UnfilteredPartitionIterator iter, int nowInSec) { - return new CountingUnfilteredPartitionIterator(iter, newCounter(nowInSec, false)); + return this.newCounter(nowInSec, false).applyTo(iter); } public UnfilteredRowIterator filter(UnfilteredRowIterator iter, int nowInSec) { - return new CountingUnfilteredRowIterator(iter, newCounter(nowInSec, false)); + 
return this.newCounter(nowInSec, false).applyTo(iter); } public PartitionIterator filter(PartitionIterator iter, int nowInSec) { - return new CountingPartitionIterator(iter, this, nowInSec); + return this.newCounter(nowInSec, true).applyTo(iter); } /** @@ -144,11 +148,36 @@ public abstract class DataLimits */ public abstract float estimateTotalResults(ColumnFamilyStore cfs); - public interface Counter + public static abstract class Counter extends StoppingTransformation<BaseRowIterator<?>> { - public void newPartition(DecoratedKey partitionKey, Row staticRow); - public void newRow(Row row); - public void endOfPartition(); + // false means we do not propagate our stop signals onto the iterator, we only count + private boolean enforceLimits = true; + + public Counter onlyCount() + { + this.enforceLimits = false; + return this; + } + + public PartitionIterator applyTo(PartitionIterator partitions) + { + return Transformation.apply(partitions, this); + } + + public UnfilteredPartitionIterator applyTo(UnfilteredPartitionIterator partitions) + { + return Transformation.apply(partitions, this); + } + + public UnfilteredRowIterator applyTo(UnfilteredRowIterator partition) + { + return (UnfilteredRowIterator) applyToPartition(partition); + } + + public RowIterator applyTo(RowIterator partition) + { + return (RowIterator) applyToPartition(partition); + } /** * The number of results counted. @@ -157,12 +186,40 @@ public abstract class DataLimits * * @return the number of results counted. */ - public int counted(); + public abstract int counted(); + public abstract int countedInCurrentPartition(); - public int countedInCurrentPartition(); + public abstract boolean isDone(); + public abstract boolean isDoneForPartition(); - public boolean isDone(); - public boolean isDoneForPartition(); + @Override + protected BaseRowIterator<?> applyToPartition(BaseRowIterator<?> partition) + { + return partition instanceof UnfilteredRowIterator ? 
Transformation.apply((UnfilteredRowIterator) partition, this) + : Transformation.apply((RowIterator) partition, this); + } + + // called before we process a given partition + protected abstract void applyToPartition(DecoratedKey partitionKey, Row staticRow); + + @Override + protected void attachTo(BasePartitions partitions) + { + if (enforceLimits) + super.attachTo(partitions); + if (isDone()) + stop(); + } + + @Override + protected void attachTo(BaseRows rows) + { + if (enforceLimits) + super.attachTo(rows); + applyToPartition(rows.partitionKey(), rows.staticRow()); + if (isDoneForPartition()) + stopInPartition(); + } } /** @@ -241,13 +298,15 @@ public abstract class DataLimits return false; // Otherwise, we need to re-count + + DataLimits.Counter counter = newCounter(nowInSec, false); try (UnfilteredRowIterator cacheIter = cached.unfilteredIterator(ColumnFilter.selection(cached.columns()), Slices.ALL, false); - CountingUnfilteredRowIterator iter = new CountingUnfilteredRowIterator(cacheIter, newCounter(nowInSec, false))) + UnfilteredRowIterator iter = counter.applyTo(cacheIter)) { // Consume the iterator until we've counted enough - while (iter.hasNext() && !iter.counter().isDone()) + while (iter.hasNext()) iter.next(); - return iter.counter().isDone(); + return counter.isDone(); } } @@ -274,7 +333,7 @@ public abstract class DataLimits return rowsPerPartition * (cfs.estimateKeys()); } - protected class CQLCounter implements Counter + protected class CQLCounter extends Counter { protected final int nowInSec; protected final boolean assumeLiveData; @@ -290,23 +349,39 @@ public abstract class DataLimits this.assumeLiveData = assumeLiveData; } - public void newPartition(DecoratedKey partitionKey, Row staticRow) + @Override + public void applyToPartition(DecoratedKey partitionKey, Row staticRow) { rowInCurrentPartition = 0; if (!staticRow.isEmpty() && (assumeLiveData || staticRow.hasLiveData(nowInSec))) hasLiveStaticRow = true; } - public void endOfPartition() + 
@Override + public Row applyToRow(Row row) + { + if (assumeLiveData || row.hasLiveData(nowInSec)) + incrementRowCount(); + return row; + } + + @Override + public void onPartitionClose() { // Normally, we don't count static rows as from a CQL point of view, it will be merge with other // rows in the partition. However, if we only have the static row, it will be returned as one row // so count it. if (hasLiveStaticRow && rowInCurrentPartition == 0) - { - ++rowCounted; - ++rowInCurrentPartition; - } + incrementRowCount(); + super.onPartitionClose(); + } + + private void incrementRowCount() + { + if (++rowCounted >= rowLimit) + stop(); + if (++rowInCurrentPartition >= perPartitionLimit) + stopInPartition(); } public int counted() @@ -328,15 +403,6 @@ public abstract class DataLimits { return isDone() || rowInCurrentPartition >= perPartitionLimit; } - - public void newRow(Row row) - { - if (assumeLiveData || row.hasLiveData(nowInSec)) - { - ++rowCounted; - ++rowInCurrentPartition; - } - } } @Override @@ -402,7 +468,7 @@ public abstract class DataLimits } @Override - public void newPartition(DecoratedKey partitionKey, Row staticRow) + public void applyToPartition(DecoratedKey partitionKey, Row staticRow) { if (partitionKey.getKey().equals(lastReturnedKey)) { @@ -415,7 +481,7 @@ public abstract class DataLimits } else { - super.newPartition(partitionKey, staticRow); + super.applyToPartition(partitionKey, staticRow); } } } @@ -481,13 +547,14 @@ public abstract class DataLimits return false; // Otherwise, we need to re-count + DataLimits.Counter counter = newCounter(nowInSec, false); try (UnfilteredRowIterator cacheIter = cached.unfilteredIterator(ColumnFilter.selection(cached.columns()), Slices.ALL, false); - CountingUnfilteredRowIterator iter = new CountingUnfilteredRowIterator(cacheIter, newCounter(nowInSec, false))) + UnfilteredRowIterator iter = counter.applyTo(cacheIter)) { // Consume the iterator until we've counted enough - while (iter.hasNext() && 
!iter.counter().isDone()) + while (iter.hasNext()) iter.next(); - return iter.counter().isDone(); + return counter.isDone(); } } @@ -513,7 +580,7 @@ public abstract class DataLimits return cellsPerPartition * cfs.estimateKeys(); } - protected class ThriftCounter implements Counter + protected class ThriftCounter extends Counter { protected final int nowInSec; protected final boolean assumeLiveData; @@ -528,16 +595,35 @@ public abstract class DataLimits this.assumeLiveData = assumeLiveData; } - public void newPartition(DecoratedKey partitionKey, Row staticRow) + @Override + public void applyToPartition(DecoratedKey partitionKey, Row staticRow) { cellsInCurrentPartition = 0; if (!staticRow.isEmpty()) - newRow(staticRow); + applyToRow(staticRow); } - public void endOfPartition() + @Override + public Row applyToRow(Row row) { - ++partitionsCounted; + for (Cell cell : row.cells()) + { + if (assumeLiveData || cell.isLive(nowInSec)) + { + ++cellsCounted; + if (++cellsInCurrentPartition >= cellPerPartitionLimit) + stopInPartition(); + } + } + return row; + } + + @Override + public void onPartitionClose() + { + if (++partitionsCounted >= partitionLimit) + stop(); + super.onPartitionClose(); } public int counted() @@ -559,18 +645,6 @@ public abstract class DataLimits { return isDone() || cellsInCurrentPartition >= cellPerPartitionLimit; } - - public void newRow(Row row) - { - for (Cell cell : row.cells()) - { - if (assumeLiveData || cell.isLive(nowInSec)) - { - ++cellsCounted; - ++cellsInCurrentPartition; - } - } - } } @Override @@ -625,14 +699,17 @@ public abstract class DataLimits super(nowInSec, assumeLiveData); } - public void newRow(Row row) + @Override + public Row applyToRow(Row row) { // In the internal format, a row == a super column, so that's what we want to count. 
if (assumeLiveData || row.hasLiveData(nowInSec)) { ++cellsCounted; - ++cellsInCurrentPartition; + if (++cellsInCurrentPartition >= cellPerPartitionLimit) + stopInPartition(); } + return row; } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/60949747/src/java/org/apache/cassandra/db/filter/RowFilter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/filter/RowFilter.java b/src/java/org/apache/cassandra/db/filter/RowFilter.java index 0ff30af..09dc342 100644 --- a/src/java/org/apache/cassandra/db/filter/RowFilter.java +++ b/src/java/org/apache/cassandra/db/filter/RowFilter.java @@ -22,7 +22,6 @@ import java.nio.ByteBuffer; import java.util.*; import com.google.common.base.Objects; -import org.apache.commons.lang3.builder.ToStringBuilder; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.ColumnDefinition; @@ -31,6 +30,7 @@ import org.apache.cassandra.db.*; import org.apache.cassandra.db.marshal.*; import org.apache.cassandra.db.partitions.*; import org.apache.cassandra.db.rows.*; +import org.apache.cassandra.db.transform.Transformation; import org.apache.cassandra.exceptions.InvalidRequestException; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; @@ -222,28 +222,29 @@ public abstract class RowFilter implements Iterable<RowFilter.Expression> if (expressions.isEmpty()) return iter; - return new AlteringUnfilteredPartitionIterator(iter) + class IsSatisfiedFilter extends Transformation<UnfilteredRowIterator> { - protected Row computeNext(DecoratedKey partitionKey, Row row) + DecoratedKey pk; + public UnfilteredRowIterator applyToPartition(UnfilteredRowIterator partition) { - // We filter tombstones when passing the row to isSatisfiedBy so that the method doesn't have to bother with them. 
- Row purged = row.purge(DeletionPurger.PURGE_ALL, nowInSec); - return purged != null && CQLFilter.this.isSatisfiedBy(partitionKey, purged) ? row : null; + pk = partition.partitionKey(); + return Transformation.apply(partition, this); } - }; - } - /** - * Returns whether the provided row (with it's partition key) satisfies - * this row filter or not (that is, if it satisfies all of its expressions). - */ - private boolean isSatisfiedBy(DecoratedKey partitionKey, Row row) - { - for (Expression e : expressions) - if (!e.isSatisfiedBy(partitionKey, row)) - return false; + public Row applyToRow(Row row) + { + Row purged = row.purge(DeletionPurger.PURGE_ALL, nowInSec); + if (purged == null) + return null; - return true; + for (Expression e : expressions) + if (!e.isSatisfiedBy(pk, purged)) + return null; + return row; + } + } + + return Transformation.apply(iter, new IsSatisfiedFilter()); } protected RowFilter withNewExpressions(List<Expression> expressions) @@ -264,16 +265,17 @@ public abstract class RowFilter implements Iterable<RowFilter.Expression> if (expressions.isEmpty()) return iter; - return new WrappingUnfilteredPartitionIterator(iter) + class IsSatisfiedThriftFilter extends Transformation<UnfilteredRowIterator> { @Override - public UnfilteredRowIterator computeNext(final UnfilteredRowIterator iter) + public UnfilteredRowIterator applyToPartition(UnfilteredRowIterator iter) { // Thrift does not filter rows, it filters entire partition if any of the expression is not // satisfied, which forces us to materialize the result (in theory we could materialize only // what we need which might or might not be everything, but we keep it simple since in practice // it's not worth that it has ever been). ImmutableBTreePartition result = ImmutableBTreePartition.create(iter); + iter.close(); // The partition needs to have a row for every expression, and the expression needs to be valid. 
for (Expression expr : expressions) @@ -286,7 +288,8 @@ public abstract class RowFilter implements Iterable<RowFilter.Expression> // If we get there, it means all expressions where satisfied, so return the original result return result.unfilteredIterator(); } - }; + } + return Transformation.apply(iter, new IsSatisfiedThriftFilter()); } protected RowFilter withNewExpressions(List<Expression> expressions) http://git-wip-us.apache.org/repos/asf/cassandra/blob/60949747/src/java/org/apache/cassandra/db/partitions/AlteringUnfilteredPartitionIterator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/partitions/AlteringUnfilteredPartitionIterator.java b/src/java/org/apache/cassandra/db/partitions/AlteringUnfilteredPartitionIterator.java deleted file mode 100644 index f7d7222..0000000 --- a/src/java/org/apache/cassandra/db/partitions/AlteringUnfilteredPartitionIterator.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.cassandra.db.partitions; - -import org.apache.cassandra.db.DecoratedKey; -import org.apache.cassandra.db.rows.*; - -/** - * A partition iterator that allows to filter/modify the unfiltered from the - * underlying iterators. - */ -public abstract class AlteringUnfilteredPartitionIterator extends WrappingUnfilteredPartitionIterator -{ - protected AlteringUnfilteredPartitionIterator(UnfilteredPartitionIterator wrapped) - { - super(wrapped); - } - - protected Row computeNextStatic(DecoratedKey partitionKey, Row row) - { - return row; - } - - protected Row computeNext(DecoratedKey partitionKey, Row row) - { - return row; - } - - protected RangeTombstoneMarker computeNext(DecoratedKey partitionKey, RangeTombstoneMarker marker) - { - return marker; - } - - @Override - protected UnfilteredRowIterator computeNext(UnfilteredRowIterator iter) - { - final DecoratedKey partitionKey = iter.partitionKey(); - return new AlteringUnfilteredRowIterator(iter) - { - protected Row computeNextStatic(Row row) - { - return AlteringUnfilteredPartitionIterator.this.computeNextStatic(partitionKey, row); - } - - protected Row computeNext(Row row) - { - return AlteringUnfilteredPartitionIterator.this.computeNext(partitionKey, row); - } - - protected RangeTombstoneMarker computeNext(RangeTombstoneMarker marker) - { - return AlteringUnfilteredPartitionIterator.this.computeNext(partitionKey, marker); - } - }; - } -} - http://git-wip-us.apache.org/repos/asf/cassandra/blob/60949747/src/java/org/apache/cassandra/db/partitions/BasePartitionIterator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/partitions/BasePartitionIterator.java b/src/java/org/apache/cassandra/db/partitions/BasePartitionIterator.java new file mode 100644 index 0000000..214f416 --- /dev/null +++ b/src/java/org/apache/cassandra/db/partitions/BasePartitionIterator.java @@ -0,0 +1,27 @@ +/* +* Licensed to the Apache Software Foundation 
(ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ +package org.apache.cassandra.db.partitions; + +import org.apache.cassandra.db.rows.BaseRowIterator; +import org.apache.cassandra.utils.CloseableIterator; + +public interface BasePartitionIterator<I extends BaseRowIterator<?>> extends CloseableIterator<I> +{ + public void close(); +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/60949747/src/java/org/apache/cassandra/db/partitions/CountingPartitionIterator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/partitions/CountingPartitionIterator.java b/src/java/org/apache/cassandra/db/partitions/CountingPartitionIterator.java deleted file mode 100644 index 16445e7..0000000 --- a/src/java/org/apache/cassandra/db/partitions/CountingPartitionIterator.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.db.partitions; - -import org.apache.cassandra.db.rows.*; -import org.apache.cassandra.db.filter.DataLimits; - -public class CountingPartitionIterator extends WrappingPartitionIterator -{ - protected final DataLimits.Counter counter; - - public CountingPartitionIterator(PartitionIterator result, DataLimits.Counter counter) - { - super(result); - this.counter = counter; - } - - public CountingPartitionIterator(PartitionIterator result, DataLimits limits, int nowInSec) - { - this(result, limits.newCounter(nowInSec, true)); - } - - public DataLimits.Counter counter() - { - return counter; - } - - @Override - public boolean hasNext() - { - if (counter.isDone()) - return false; - - return super.hasNext(); - } - - @Override - @SuppressWarnings("resource") // Close through the closing of the returned 'CountingRowIterator' (and CountingRowIterator shouldn't throw) - public RowIterator next() - { - return new CountingRowIterator(super.next(), counter); - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/60949747/src/java/org/apache/cassandra/db/partitions/CountingRowIterator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/partitions/CountingRowIterator.java b/src/java/org/apache/cassandra/db/partitions/CountingRowIterator.java deleted file mode 100644 index 4ad321e..0000000 --- a/src/java/org/apache/cassandra/db/partitions/CountingRowIterator.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - 
* or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.db.partitions; - -import org.apache.cassandra.db.rows.*; -import org.apache.cassandra.db.filter.DataLimits; - -public class CountingRowIterator extends WrappingRowIterator -{ - protected final DataLimits.Counter counter; - - public CountingRowIterator(RowIterator iter, DataLimits.Counter counter) - { - super(iter); - this.counter = counter; - - counter.newPartition(iter.partitionKey(), iter.staticRow()); - } - - @Override - public boolean hasNext() - { - if (counter.isDoneForPartition()) - return false; - - return super.hasNext(); - } - - @Override - public Row next() - { - Row row = super.next(); - counter.newRow(row); - return row; - } - - @Override - public void close() - { - super.close(); - counter.endOfPartition(); - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/60949747/src/java/org/apache/cassandra/db/partitions/CountingUnfilteredPartitionIterator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/partitions/CountingUnfilteredPartitionIterator.java b/src/java/org/apache/cassandra/db/partitions/CountingUnfilteredPartitionIterator.java deleted file mode 100644 index 52eedd4..0000000 --- 
a/src/java/org/apache/cassandra/db/partitions/CountingUnfilteredPartitionIterator.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.db.partitions; - -import org.apache.cassandra.db.rows.*; -import org.apache.cassandra.db.filter.DataLimits; - -public class CountingUnfilteredPartitionIterator extends WrappingUnfilteredPartitionIterator -{ - protected final DataLimits.Counter counter; - - public CountingUnfilteredPartitionIterator(UnfilteredPartitionIterator result, DataLimits.Counter counter) - { - super(result); - this.counter = counter; - } - - public DataLimits.Counter counter() - { - return counter; - } - - @Override - public boolean hasNext() - { - if (counter.isDone()) - return false; - - return super.hasNext(); - } - - @Override - public UnfilteredRowIterator computeNext(UnfilteredRowIterator iter) - { - return new CountingUnfilteredRowIterator(iter, counter); - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/60949747/src/java/org/apache/cassandra/db/partitions/CountingUnfilteredRowIterator.java ---------------------------------------------------------------------- diff --git 
a/src/java/org/apache/cassandra/db/partitions/CountingUnfilteredRowIterator.java b/src/java/org/apache/cassandra/db/partitions/CountingUnfilteredRowIterator.java deleted file mode 100644 index e5d1e75..0000000 --- a/src/java/org/apache/cassandra/db/partitions/CountingUnfilteredRowIterator.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.cassandra.db.partitions; - -import org.apache.cassandra.db.rows.*; -import org.apache.cassandra.db.filter.DataLimits; - -public class CountingUnfilteredRowIterator extends WrappingUnfilteredRowIterator -{ - private final DataLimits.Counter counter; - - public CountingUnfilteredRowIterator(UnfilteredRowIterator iter, DataLimits.Counter counter) - { - super(iter); - this.counter = counter; - - counter.newPartition(iter.partitionKey(), iter.staticRow()); - } - - public DataLimits.Counter counter() - { - return counter; - } - - @Override - public boolean hasNext() - { - if (counter.isDoneForPartition()) - return false; - - return super.hasNext(); - } - - @Override - public Unfiltered next() - { - Unfiltered next = super.next(); - if (next.isRow()) - counter.newRow((Row)next); - return next; - } - - @Override - public void close() - { - super.close(); - counter.endOfPartition(); - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/60949747/src/java/org/apache/cassandra/db/partitions/PartitionIterator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/partitions/PartitionIterator.java b/src/java/org/apache/cassandra/db/partitions/PartitionIterator.java index 36358fc..529a9e2 100644 --- a/src/java/org/apache/cassandra/db/partitions/PartitionIterator.java +++ b/src/java/org/apache/cassandra/db/partitions/PartitionIterator.java @@ -33,7 +33,6 @@ import org.apache.cassandra.db.rows.*; * reference on the returned objects for longer than the iteration, it must * make a copy of it explicitely. 
*/ -public interface PartitionIterator extends Iterator<RowIterator>, AutoCloseable +public interface PartitionIterator extends BasePartitionIterator<RowIterator> { - public void close(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/60949747/src/java/org/apache/cassandra/db/partitions/PartitionIterators.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/partitions/PartitionIterators.java b/src/java/org/apache/cassandra/db/partitions/PartitionIterators.java index eeb6a4b..0b43c19 100644 --- a/src/java/org/apache/cassandra/db/partitions/PartitionIterators.java +++ b/src/java/org/apache/cassandra/db/partitions/PartitionIterators.java @@ -20,37 +20,18 @@ package org.apache.cassandra.db.partitions; import java.util.*; import java.security.MessageDigest; +import org.apache.cassandra.db.EmptyIterators; +import org.apache.cassandra.db.transform.MorePartitions; +import org.apache.cassandra.db.transform.Transformation; import org.apache.cassandra.utils.AbstractIterator; import org.apache.cassandra.db.SinglePartitionReadCommand; import org.apache.cassandra.db.rows.*; -import org.apache.cassandra.io.util.FileUtils; public abstract class PartitionIterators { private PartitionIterators() {} - public static final PartitionIterator EMPTY = new PartitionIterator() - { - public boolean hasNext() - { - return false; - } - - public RowIterator next() - { - throw new NoSuchElementException(); - } - - public void remove() - { - } - - public void close() - { - } - }; - @SuppressWarnings("resource") // The created resources are returned right away public static RowIterator getOnlyElement(final PartitionIterator iter, SinglePartitionReadCommand<?> command) { @@ -58,30 +39,24 @@ public abstract class PartitionIterators // want a RowIterator out of this method, so we return an empty one. RowIterator toReturn = iter.hasNext() ? 
iter.next() - : RowIterators.emptyIterator(command.metadata(), - command.partitionKey(), - command.clusteringIndexFilter().isReversed()); + : EmptyIterators.row(command.metadata(), + command.partitionKey(), + command.clusteringIndexFilter().isReversed()); // Note that in general, we should wrap the result so that it's close method actually // close the whole PartitionIterator. - return new WrappingRowIterator(toReturn) + class Close extends Transformation { - public void close() + public void onPartitionClose() { - try - { - super.close(); - } - finally - { - // asserting this only now because it bothers UnfilteredPartitionIterators.Serializer (which might be used - // under the provided DataIter) if hasNext() is called before the previously returned iterator hasn't been fully consumed. - assert !iter.hasNext(); - - iter.close(); - } + // asserting this only now because it bothers UnfilteredPartitionIterators.Serializer (which might be used + // under the provided DataIter) if hasNext() is called before the previously returned iterator hasn't been fully consumed. 
+ boolean hadNext = iter.hasNext(); + iter.close(); + assert !hadNext; } - }; + } + return Transformation.apply(toReturn, new Close()); } @SuppressWarnings("resource") // The created resources are returned right away @@ -90,39 +65,17 @@ public abstract class PartitionIterators if (iterators.size() == 1) return iterators.get(0); - return new PartitionIterator() + class Extend implements MorePartitions<PartitionIterator> { - private int idx = 0; - - public boolean hasNext() + int i = 1; + public PartitionIterator moreContents() { - while (idx < iterators.size()) - { - if (iterators.get(idx).hasNext()) - return true; - - ++idx; - } - return false; + if (i >= iterators.size()) + return null; + return iterators.get(i++); } - - public RowIterator next() - { - if (!hasNext()) - throw new NoSuchElementException(); - return iterators.get(idx).next(); - } - - public void remove() - { - throw new UnsupportedOperationException(); - } - - public void close() - { - FileUtils.closeQuietly(iterators); - } - }; + } + return MorePartitions.extend(iterators.get(0), new Extend()); } public static void digest(PartitionIterator iterator, MessageDigest digest) @@ -162,13 +115,14 @@ public abstract class PartitionIterators @SuppressWarnings("resource") // The created resources are returned right away public static PartitionIterator loggingIterator(PartitionIterator iterator, final String id) { - return new WrappingPartitionIterator(iterator) + class Logger extends Transformation<RowIterator> { - public RowIterator next() + public RowIterator applyToPartition(RowIterator partition) { - return RowIterators.loggingIterator(super.next(), id); + return RowIterators.loggingIterator(partition, id); } - }; + } + return Transformation.apply(iterator, new Logger()); } private static class SingletonPartitionIterator extends AbstractIterator<RowIterator> implements PartitionIterator 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/60949747/src/java/org/apache/cassandra/db/partitions/PurgeFunction.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/partitions/PurgeFunction.java b/src/java/org/apache/cassandra/db/partitions/PurgeFunction.java new file mode 100644 index 0000000..b7b01d6 --- /dev/null +++ b/src/java/org/apache/cassandra/db/partitions/PurgeFunction.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.cassandra.db.partitions; + +import org.apache.cassandra.db.*; +import org.apache.cassandra.db.rows.*; +import org.apache.cassandra.db.transform.Transformation; + +public abstract class PurgeFunction extends Transformation<UnfilteredRowIterator> +{ + private final boolean isForThrift; + private final DeletionPurger purger; + private final int gcBefore; + private boolean isReverseOrder; + + public PurgeFunction(boolean isForThrift, int gcBefore, int oldestUnrepairedTombstone, boolean onlyPurgeRepairedTombstones) + { + this.isForThrift = isForThrift; + this.gcBefore = gcBefore; + this.purger = (timestamp, localDeletionTime) -> + !(onlyPurgeRepairedTombstones && localDeletionTime >= oldestUnrepairedTombstone) + && localDeletionTime < gcBefore + && timestamp < getMaxPurgeableTimestamp(); + } + + protected abstract long getMaxPurgeableTimestamp(); + + // Called at the beginning of each new partition + protected void onNewPartition(DecoratedKey partitionKey) + { + } + + // Called for each partition that had only purged infos and are empty post-purge. + protected void onEmptyPartitionPostPurge(DecoratedKey partitionKey) + { + } + + // Called for every unfiltered. Meant for CompactionIterator to update progress + protected void updateProgress() + { + } + + public UnfilteredRowIterator applyToPartition(UnfilteredRowIterator partition) + { + onNewPartition(partition.partitionKey()); + + isReverseOrder = partition.isReverseOrder(); + UnfilteredRowIterator purged = Transformation.apply(partition, this); + if (!isForThrift && purged.isEmpty()) + { + onEmptyPartitionPostPurge(purged.partitionKey()); + purged.close(); + return null; + } + + return purged; + } + + public DeletionTime applyToDeletion(DeletionTime deletionTime) + { + return purger.shouldPurge(deletionTime) ? 
DeletionTime.LIVE : deletionTime; + } + + public Row applyToStatic(Row row) + { + updateProgress(); + return row.purge(purger, gcBefore); + } + + public Row applyToRow(Row row) + { + updateProgress(); + return row.purge(purger, gcBefore); + } + + public RangeTombstoneMarker applyToMarker(RangeTombstoneMarker marker) + { + updateProgress(); + boolean reversed = isReverseOrder; + if (marker.isBoundary()) + { + // We can only skip the whole marker if both deletion time are purgeable. + // If only one of them is, filterTombstoneMarker will deal with it. + RangeTombstoneBoundaryMarker boundary = (RangeTombstoneBoundaryMarker)marker; + boolean shouldPurgeClose = purger.shouldPurge(boundary.closeDeletionTime(reversed)); + boolean shouldPurgeOpen = purger.shouldPurge(boundary.openDeletionTime(reversed)); + + if (shouldPurgeClose) + { + if (shouldPurgeOpen) + return null; + + return boundary.createCorrespondingOpenMarker(reversed); + } + + return shouldPurgeOpen + ? boundary.createCorrespondingCloseMarker(reversed) + : marker; + } + else + { + return purger.shouldPurge(((RangeTombstoneBoundMarker)marker).deletionTime()) ? null : marker; + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/cassandra/blob/60949747/src/java/org/apache/cassandra/db/partitions/PurgingPartitionIterator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/partitions/PurgingPartitionIterator.java b/src/java/org/apache/cassandra/db/partitions/PurgingPartitionIterator.java deleted file mode 100644 index 2093f53..0000000 --- a/src/java/org/apache/cassandra/db/partitions/PurgingPartitionIterator.java +++ /dev/null @@ -1,156 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.db.partitions; - -import org.apache.cassandra.db.*; -import org.apache.cassandra.db.rows.*; - -public abstract class PurgingPartitionIterator extends WrappingUnfilteredPartitionIterator -{ - private final DeletionPurger purger; - private final int gcBefore; - - private UnfilteredRowIterator next; - - public PurgingPartitionIterator(UnfilteredPartitionIterator iterator, int gcBefore, int oldestUnrepairedTombstone, boolean onlyPurgeRepairedTombstones) - { - super(iterator); - this.gcBefore = gcBefore; - this.purger = new DeletionPurger() - { - public boolean shouldPurge(long timestamp, int localDeletionTime) - { - if (onlyPurgeRepairedTombstones && localDeletionTime >= oldestUnrepairedTombstone) - return false; - - return timestamp < getMaxPurgeableTimestamp() && localDeletionTime < gcBefore; - } - }; - } - - protected abstract long getMaxPurgeableTimestamp(); - - // Called at the beginning of each new partition - protected void onNewPartition(DecoratedKey partitionKey) - { - } - - // Called for each partition that had only purged infos and are empty post-purge. - protected void onEmptyPartitionPostPurge(DecoratedKey partitionKey) - { - } - - // Called for every unfiltered. 
Meant for CompactionIterator to update progress - protected void updateProgress() - { - } - - @Override - @SuppressWarnings("resource") // 'purged' closes wrapped 'iterator' - public boolean hasNext() - { - while (next == null && super.hasNext()) - { - UnfilteredRowIterator iterator = super.next(); - onNewPartition(iterator.partitionKey()); - - UnfilteredRowIterator purged = purge(iterator); - if (isForThrift() || !purged.isEmpty()) - { - next = purged; - return true; - } - - onEmptyPartitionPostPurge(purged.partitionKey()); - purged.close(); - } - return next != null; - } - - @Override - public UnfilteredRowIterator next() - { - UnfilteredRowIterator toReturn = next; - next = null; - updateProgress(); - return toReturn; - } - - private UnfilteredRowIterator purge(final UnfilteredRowIterator iter) - { - return new AlteringUnfilteredRowIterator(iter) - { - @Override - public DeletionTime partitionLevelDeletion() - { - DeletionTime dt = iter.partitionLevelDeletion(); - return purger.shouldPurge(dt) ? DeletionTime.LIVE : dt; - } - - @Override - public Row computeNextStatic(Row row) - { - return row.purge(purger, gcBefore); - } - - @Override - public Row computeNext(Row row) - { - return row.purge(purger, gcBefore); - } - - @Override - public RangeTombstoneMarker computeNext(RangeTombstoneMarker marker) - { - boolean reversed = isReverseOrder(); - if (marker.isBoundary()) - { - // We can only skip the whole marker if both deletion time are purgeable. - // If only one of them is, filterTombstoneMarker will deal with it. - RangeTombstoneBoundaryMarker boundary = (RangeTombstoneBoundaryMarker)marker; - boolean shouldPurgeClose = purger.shouldPurge(boundary.closeDeletionTime(reversed)); - boolean shouldPurgeOpen = purger.shouldPurge(boundary.openDeletionTime(reversed)); - - if (shouldPurgeClose) - { - if (shouldPurgeOpen) - return null; - - return boundary.createCorrespondingOpenMarker(reversed); - } - - return shouldPurgeOpen - ? 
boundary.createCorrespondingCloseMarker(reversed) - : marker; - } - else - { - return purger.shouldPurge(((RangeTombstoneBoundMarker)marker).deletionTime()) ? null : marker; - } - } - - @Override - public Unfiltered next() - { - Unfiltered next = super.next(); - updateProgress(); - return next; - } - }; - } -}; http://git-wip-us.apache.org/repos/asf/cassandra/blob/60949747/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterator.java b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterator.java index 10989df..201c934 100644 --- a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterator.java +++ b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterator.java @@ -17,8 +17,6 @@ */ package org.apache.cassandra.db.partitions; -import java.util.Iterator; - import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.db.rows.UnfilteredRowIterator; @@ -30,7 +28,7 @@ import org.apache.cassandra.db.rows.UnfilteredRowIterator; * reference on the returned objects for longer than the iteration, it must * make a copy of it explicitely. */ -public interface UnfilteredPartitionIterator extends Iterator<UnfilteredRowIterator>, AutoCloseable +public interface UnfilteredPartitionIterator extends BasePartitionIterator<UnfilteredRowIterator> { /** * Whether that partition iterator is for a thrift queries. @@ -44,6 +42,4 @@ public interface UnfilteredPartitionIterator extends Iterator<UnfilteredRowItera public boolean isForThrift(); public CFMetaData metadata(); - - public void close(); }
