http://git-wip-us.apache.org/repos/asf/cassandra/blob/60949747/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java index 900b17a..afa731c 100644 --- a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java +++ b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java @@ -22,19 +22,16 @@ import java.io.IOException; import java.security.MessageDigest; import java.util.*; -import com.google.common.collect.AbstractIterator; -import com.google.common.collect.Lists; - import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.db.*; import org.apache.cassandra.db.filter.ColumnFilter; import org.apache.cassandra.db.rows.*; +import org.apache.cassandra.db.transform.FilteredPartitions; +import org.apache.cassandra.db.transform.Transformation; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.utils.MergeIterator; -import org.apache.cassandra.service.StorageService; -import org.apache.cassandra.utils.*; /** * Static methods to work with partition iterators. 
@@ -53,33 +50,6 @@ public abstract class UnfilteredPartitionIterators public void close(); } - - public static UnfilteredPartitionIterator empty(final CFMetaData metadata) - { - return new AbstractUnfilteredPartitionIterator() - { - public boolean isForThrift() - { - return false; - } - - public CFMetaData metadata() - { - return metadata; - } - - public boolean hasNext() - { - return false; - } - - public UnfilteredRowIterator next() - { - throw new NoSuchElementException(); - } - }; - } - @SuppressWarnings("resource") // The created resources are returned right away public static UnfilteredRowIterator getOnlyElement(final UnfilteredPartitionIterator iter, SinglePartitionReadCommand<?> command) { @@ -87,30 +57,24 @@ public abstract class UnfilteredPartitionIterators // want a RowIterator out of this method, so we return an empty one. UnfilteredRowIterator toReturn = iter.hasNext() ? iter.next() - : UnfilteredRowIterators.emptyIterator(command.metadata(), - command.partitionKey(), - command.clusteringIndexFilter().isReversed()); + : EmptyIterators.unfilteredRow(command.metadata(), + command.partitionKey(), + command.clusteringIndexFilter().isReversed()); // Note that in general, we should wrap the result so that it's close method actually // close the whole UnfilteredPartitionIterator. - return new WrappingUnfilteredRowIterator(toReturn) + class Close extends Transformation { - public void close() + public void onPartitionClose() { - try - { - super.close(); - } - finally - { - // asserting this only now because it bothers Serializer if hasNext() is called before - // the previously returned iterator hasn't been fully consumed. - assert !iter.hasNext(); - - iter.close(); - } + // asserting this only now because it bothers Serializer if hasNext() is called before + // the previously returned iterator hasn't been fully consumed. 
+ boolean hadNext = iter.hasNext(); + iter.close(); + assert !hadNext; } - }; + } + return Transformation.apply(toReturn, new Close()); } public static PartitionIterator mergeAndFilter(List<UnfilteredPartitionIterator> iterators, int nowInSec, MergeListener listener) @@ -121,55 +85,7 @@ public abstract class UnfilteredPartitionIterators public static PartitionIterator filter(final UnfilteredPartitionIterator iterator, final int nowInSec) { - return new PartitionIterator() - { - private RowIterator next; - - public boolean hasNext() - { - while (next == null && iterator.hasNext()) - { - @SuppressWarnings("resource") // closed either directly if empty, or, if assigned to next, by either - // the caller of next() or close() - UnfilteredRowIterator rowIterator = iterator.next(); - next = UnfilteredRowIterators.filter(rowIterator, nowInSec); - if (!iterator.isForThrift() && next.isEmpty()) - { - rowIterator.close(); - next = null; - } - } - return next != null; - } - - public RowIterator next() - { - if (next == null && !hasNext()) - throw new NoSuchElementException(); - - RowIterator toReturn = next; - next = null; - return toReturn; - } - - public void remove() - { - throw new UnsupportedOperationException(); - } - - public void close() - { - try - { - iterator.close(); - } - finally - { - if (next != null) - next.close(); - } - } - }; + return FilteredPartitions.filter(iterator, nowInSec); } public static UnfilteredPartitionIterator merge(final List<? 
extends UnfilteredPartitionIterator> iterators, final int nowInSec, final MergeListener listener) @@ -204,7 +120,7 @@ public abstract class UnfilteredPartitionIterators // Replace nulls by empty iterators for (int i = 0; i < toMerge.size(); i++) if (toMerge.get(i) == null) - toMerge.set(i, UnfilteredRowIterators.emptyIterator(metadata, partitionKey, isReverseOrder)); + toMerge.set(i, EmptyIterators.unfilteredRow(metadata, partitionKey, isReverseOrder)); return UnfilteredRowIterators.merge(toMerge, nowInSec, rowListener); } @@ -353,13 +269,14 @@ public abstract class UnfilteredPartitionIterators */ public static UnfilteredPartitionIterator loggingIterator(UnfilteredPartitionIterator iterator, final String id, final boolean fullDetails) { - return new WrappingUnfilteredPartitionIterator(iterator) + class Logging extends Transformation<UnfilteredRowIterator> { - public UnfilteredRowIterator next() + public UnfilteredRowIterator applyToPartition(UnfilteredRowIterator partition) { - return UnfilteredRowIterators.loggingIterator(super.next(), id, fullDetails); + return UnfilteredRowIterators.loggingIterator(partition, id, fullDetails); } - }; + } + return Transformation.apply(iterator, new Logging()); } /**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/60949747/src/java/org/apache/cassandra/db/partitions/WrappingPartitionIterator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/partitions/WrappingPartitionIterator.java b/src/java/org/apache/cassandra/db/partitions/WrappingPartitionIterator.java deleted file mode 100644 index 4d4be70..0000000 --- a/src/java/org/apache/cassandra/db/partitions/WrappingPartitionIterator.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.cassandra.db.partitions; - -import org.apache.cassandra.db.rows.RowIterator; - -public abstract class WrappingPartitionIterator implements PartitionIterator -{ - protected final PartitionIterator wrapped; - - protected WrappingPartitionIterator(PartitionIterator wrapped) - { - this.wrapped = wrapped; - } - - public boolean hasNext() - { - return wrapped.hasNext(); - } - - public RowIterator next() - { - return wrapped.next(); - } - - public void remove() - { - wrapped.remove(); - } - - public void close() - { - wrapped.close(); - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/60949747/src/java/org/apache/cassandra/db/partitions/WrappingUnfilteredPartitionIterator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/partitions/WrappingUnfilteredPartitionIterator.java b/src/java/org/apache/cassandra/db/partitions/WrappingUnfilteredPartitionIterator.java deleted file mode 100644 index ebf3c28..0000000 --- a/src/java/org/apache/cassandra/db/partitions/WrappingUnfilteredPartitionIterator.java +++ /dev/null @@ -1,126 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.cassandra.db.partitions; - -import org.apache.cassandra.config.CFMetaData; -import org.apache.cassandra.db.rows.UnfilteredRowIterator; -import org.apache.cassandra.db.rows.UnfilteredRowIterators; - -/** - * A utility class for writing partition iterators that filter/modify other - * partition iterators. - * - * This work a little bit like Guava's AbstractIterator in that you only need - * to implement the computeNext() method, though that method takes as argument - * the UnfilteredRowIterator to filter from the wrapped partition iterator. - */ -public abstract class WrappingUnfilteredPartitionIterator extends AbstractUnfilteredPartitionIterator -{ - protected final UnfilteredPartitionIterator wrapped; - - private UnfilteredRowIterator next; - - protected WrappingUnfilteredPartitionIterator(UnfilteredPartitionIterator wrapped) - { - this.wrapped = wrapped; - } - - public boolean isForThrift() - { - return wrapped.isForThrift(); - } - - public CFMetaData metadata() - { - return wrapped.metadata(); - } - - public boolean hasNext() - { - prepareNext(); - return next != null; - } - - public UnfilteredRowIterator next() - { - prepareNext(); - assert next != null; - - UnfilteredRowIterator toReturn = next; - next = null; - return toReturn; - } - - private void prepareNext() - { - while (next == null && wrapped.hasNext()) - { - @SuppressWarnings("resource") // Closed on exception, right away if empty or ignored by computeNext, or if assigned to 'next', - // either by the caller to next(), or in close(). - UnfilteredRowIterator wrappedNext = wrapped.next(); - try - { - UnfilteredRowIterator maybeNext = computeNext(wrappedNext); - - // As the wrappd iterator shouldn't return an empty iterator, if computeNext - // gave us back it's input we save the isEmpty check. 
- if (maybeNext != null && (isForThrift() || maybeNext == wrappedNext || !maybeNext.isEmpty())) - { - next = maybeNext; - return; - } - else - { - wrappedNext.close(); - } - } - catch (RuntimeException | Error e) - { - wrappedNext.close(); - throw e; - } - } - } - - /** - * Given the next UnfilteredRowIterator from the wrapped partition iterator, return - * the (potentially modified) UnfilteredRowIterator to return. Please note that the - * result will be skipped if it's either {@code null} of if it's empty. - * - * The default implementation return it's input unchanged to make it easier - * to write wrapping partition iterators that only change the close method. - */ - protected UnfilteredRowIterator computeNext(UnfilteredRowIterator iter) - { - return iter; - } - - @Override - public void close() - { - try - { - wrapped.close(); - } - finally - { - if (next != null) - next.close(); - } - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/60949747/src/java/org/apache/cassandra/db/rows/AlteringUnfilteredRowIterator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/AlteringUnfilteredRowIterator.java b/src/java/org/apache/cassandra/db/rows/AlteringUnfilteredRowIterator.java deleted file mode 100644 index a390bad..0000000 --- a/src/java/org/apache/cassandra/db/rows/AlteringUnfilteredRowIterator.java +++ /dev/null @@ -1,98 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.db.rows; - -import java.util.NoSuchElementException; - -import com.google.common.collect.UnmodifiableIterator; - -import org.apache.cassandra.config.CFMetaData; -import org.apache.cassandra.db.*; - -/** - * Class that makes it easier to write unfiltered iterators that filter or modify - * the returned unfiltered. - * - * The methods you want to override are {@code computeNextStatic} and the {@code computeNext} methods. - * All of these methods are allowed to return a {@code null} value with the meaning of ignoring - * the entry. - */ -public abstract class AlteringUnfilteredRowIterator extends WrappingUnfilteredRowIterator -{ - private Row staticRow; - private Unfiltered next; - - protected AlteringUnfilteredRowIterator(UnfilteredRowIterator wrapped) - { - super(wrapped); - } - - protected Row computeNextStatic(Row row) - { - return row; - } - - protected Row computeNext(Row row) - { - return row; - } - - protected RangeTombstoneMarker computeNext(RangeTombstoneMarker marker) - { - return marker; - } - - public Row staticRow() - { - if (staticRow == null) - { - Row row = computeNextStatic(super.staticRow()); - staticRow = row == null ? 
Rows.EMPTY_STATIC_ROW : row; - } - return staticRow; - } - - public boolean hasNext() - { - while (next == null && super.hasNext()) - { - Unfiltered unfiltered = super.next(); - if (unfiltered.isRow()) - { - Row row = computeNext((Row)unfiltered); - if (row != null && !row.isEmpty()) - next = row; - } - else - { - next = computeNext((RangeTombstoneMarker)unfiltered); - } - } - return next != null; - } - - public Unfiltered next() - { - if (!hasNext()) - throw new NoSuchElementException(); - - Unfiltered toReturn = next; - next = null; - return toReturn; - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/60949747/src/java/org/apache/cassandra/db/rows/BaseRowIterator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/BaseRowIterator.java b/src/java/org/apache/cassandra/db/rows/BaseRowIterator.java new file mode 100644 index 0000000..fb9e908 --- /dev/null +++ b/src/java/org/apache/cassandra/db/rows/BaseRowIterator.java @@ -0,0 +1,64 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. 
+*/ +package org.apache.cassandra.db.rows; + +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.PartitionColumns; +import org.apache.cassandra.utils.CloseableIterator; + +/** + * A common interface for Row and Unfiltered, that permits sharing of the (majority) common + * methods and functionality + */ +public interface BaseRowIterator<U extends Unfiltered> extends CloseableIterator<U> +{ + /** + * The metadata for the table this iterator on. + */ + public CFMetaData metadata(); + + /** + * Whether or not the rows returned by this iterator are in reversed + * clustering order. + */ + public boolean isReverseOrder(); + + /** + * A subset of the columns for the (static and regular) rows returned by this iterator. + * Every row returned by this iterator must guarantee that it has only those columns. + */ + public PartitionColumns columns(); + + /** + * The partition key of the partition this in an iterator over. + */ + public DecoratedKey partitionKey(); + + /** + * The static part corresponding to this partition (this can be an empty + * row). + */ + public Row staticRow(); + + /** + * Returns whether the provided iterator has no data. + */ + public boolean isEmpty(); +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/60949747/src/java/org/apache/cassandra/db/rows/RowIterator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/RowIterator.java b/src/java/org/apache/cassandra/db/rows/RowIterator.java index 69994dd..f0b4499 100644 --- a/src/java/org/apache/cassandra/db/rows/RowIterator.java +++ b/src/java/org/apache/cassandra/db/rows/RowIterator.java @@ -34,39 +34,9 @@ import org.apache.cassandra.db.*; * reverse clustering order if isReverseOrder is true), and the Row objects returned * by next() are only valid until the next call to hasNext() or next(). 
*/ -public interface RowIterator extends Iterator<Row>, AutoCloseable +public interface RowIterator extends BaseRowIterator<Row> { /** - * The metadata for the table this iterator on. - */ - public CFMetaData metadata(); - - /** - * Whether or not the rows returned by this iterator are in reversed - * clustering order. - */ - public boolean isReverseOrder(); - - /** - * A subset of the columns for the (static and regular) rows returned by this iterator. - * Every row returned by this iterator must guarantee that it has only those columns. - */ - public PartitionColumns columns(); - - /** - * The partition key of the partition this in an iterator over. - */ - public DecoratedKey partitionKey(); - - /** - * The static part corresponding to this partition (this can be an empty - * row). - */ - public Row staticRow(); - - public void close(); - - /** * Returns whether the provided iterator has no data. */ public default boolean isEmpty() http://git-wip-us.apache.org/repos/asf/cassandra/blob/60949747/src/java/org/apache/cassandra/db/rows/RowIterators.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/RowIterators.java b/src/java/org/apache/cassandra/db/rows/RowIterators.java index 30f5c50..551edb8 100644 --- a/src/java/org/apache/cassandra/db/rows/RowIterators.java +++ b/src/java/org/apache/cassandra/db/rows/RowIterators.java @@ -17,14 +17,13 @@ */ package org.apache.cassandra.db.rows; -import java.util.*; import java.security.MessageDigest; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.config.CFMetaData; -import org.apache.cassandra.db.*; +import org.apache.cassandra.db.transform.Transformation; import org.apache.cassandra.utils.FBUtilities; /** @@ -49,54 +48,6 @@ public abstract class RowIterators iterator.next().digest(digest); } - public static RowIterator emptyIterator(CFMetaData cfm, DecoratedKey partitionKey, boolean isReverseOrder) - { - return 
iterator(cfm, partitionKey, isReverseOrder, Collections.emptyIterator()); - } - - public static RowIterator iterator(CFMetaData cfm, DecoratedKey partitionKey, boolean isReverseOrder, Iterator<Row> iterator) - { - return new RowIterator() - { - public CFMetaData metadata() - { - return cfm; - } - - public boolean isReverseOrder() - { - return isReverseOrder; - } - - public PartitionColumns columns() - { - return PartitionColumns.NONE; - } - - public DecoratedKey partitionKey() - { - return partitionKey; - } - - public Row staticRow() - { - return Rows.EMPTY_STATIC_ROW; - } - - public void close() { } - - public boolean hasNext() - { - return iterator.hasNext(); - } - - public Row next() - { - return iterator.next(); - } - }; - } - /** * Wraps the provided iterator so it logs the returned rows for debugging purposes. * <p> @@ -113,24 +64,23 @@ public abstract class RowIterators metadata.getKeyValidator().getString(iterator.partitionKey().getKey()), iterator.isReverseOrder()); - return new WrappingRowIterator(iterator) + class Log extends Transformation { @Override - public Row staticRow() + public Row applyToStatic(Row row) { - Row row = super.staticRow(); if (!row.isEmpty()) - logger.info("[{}] {}", id, row.toString(metadata())); + logger.info("[{}] {}", id, row.toString(metadata)); return row; } @Override - public Row next() + public Row applyToRow(Row row) { - Row next = super.next(); - logger.info("[{}] {}", id, next.toString(metadata())); - return next; + logger.info("[{}] {}", id, row.toString(metadata)); + return row; } - }; + } + return Transformation.apply(iterator, new Log()); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/60949747/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterator.java b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterator.java index 649fd8b..a969858 100644 --- 
a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterator.java +++ b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterator.java @@ -46,50 +46,20 @@ import org.apache.cassandra.db.*; * the returned objects for longer than the iteration, it must make a copy of * it explicitly. */ -public interface UnfilteredRowIterator extends Iterator<Unfiltered>, AutoCloseable +public interface UnfilteredRowIterator extends BaseRowIterator<Unfiltered> { /** - * The metadata for the table this iterator on. - */ - public CFMetaData metadata(); - - /** - * A subset of the columns for the (static and regular) rows returned by this iterator. - * Every row returned by this iterator must guarantee that it has only those columns. - */ - public PartitionColumns columns(); - - /** - * Whether or not the atom returned by this iterator are in reversed - * clustering order. - */ - public boolean isReverseOrder(); - - /** - * The partition key of the partition this in an iterator over. - */ - public DecoratedKey partitionKey(); - - /** * The partition level deletion for the partition this iterate over. */ public DeletionTime partitionLevelDeletion(); /** - * The static part corresponding to this partition (this can be an empty - * row). - */ - public Row staticRow(); - - /** * Return "statistics" about what is returned by this iterator. Those are used for * performance reasons (for delta-encoding for instance) and code should not * expect those to be exact. */ public EncodingStats stats(); - public void close(); - /** * Returns whether this iterator has no data (including no deletion data). 
*/ http://git-wip-us.apache.org/repos/asf/cassandra/blob/60949747/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java index 3a0558e..932ca4c 100644 --- a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java +++ b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java @@ -202,7 +202,7 @@ public class UnfilteredRowIteratorSerializer public UnfilteredRowIterator deserialize(DataInputPlus in, int version, CFMetaData metadata, SerializationHelper.Flag flag, Header header) throws IOException { if (header.isEmpty) - return UnfilteredRowIterators.emptyIterator(metadata, header.key, header.isReversed); + return EmptyIterators.unfilteredRow(metadata, header.key, header.isReversed); final SerializationHelper helper = new SerializationHelper(metadata, version, flag); final SerializationHeader sHeader = header.sHeader; http://git-wip-us.apache.org/repos/asf/cassandra/blob/60949747/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java index 22628e2..ea929d7 100644 --- a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java +++ b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java @@ -22,10 +22,12 @@ import java.security.MessageDigest; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.collect.AbstractIterator; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.db.*; +import org.apache.cassandra.db.transform.FilteredRows; +import org.apache.cassandra.db.transform.MoreRows; 
+import org.apache.cassandra.db.transform.Transformation; import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.io.sstable.CorruptSSTableException; import org.apache.cassandra.serializers.MarshalException; @@ -63,8 +65,7 @@ public abstract class UnfilteredRowIterators */ public static RowIterator filter(UnfilteredRowIterator iter, int nowInSec) { - return new FilteringIterator(iter, nowInSec); - + return FilteredRows.filter(iter, nowInSec); } /** @@ -90,72 +91,12 @@ public abstract class UnfilteredRowIterators return UnfilteredRowMergeIterator.create(iterators, nowInSec, mergeListener); } - public static UnfilteredRowIterator emptyIterator(final CFMetaData cfm, final DecoratedKey partitionKey, final boolean isReverseOrder) - { - return noRowsIterator(cfm, partitionKey, Rows.EMPTY_STATIC_ROW, DeletionTime.LIVE, isReverseOrder); - } /** * Returns an empty atom iterator for a given partition. */ public static UnfilteredRowIterator noRowsIterator(final CFMetaData cfm, final DecoratedKey partitionKey, final Row staticRow, final DeletionTime partitionDeletion, final boolean isReverseOrder) { - PartitionColumns columns = staticRow == Rows.EMPTY_STATIC_ROW ? 
PartitionColumns.NONE - : new PartitionColumns(Columns.from(staticRow.columns()), Columns.NONE); - return new UnfilteredRowIterator() - { - public CFMetaData metadata() - { - return cfm; - } - - public boolean isReverseOrder() - { - return isReverseOrder; - } - - public PartitionColumns columns() - { - return columns; - } - - public DecoratedKey partitionKey() - { - return partitionKey; - } - - public DeletionTime partitionLevelDeletion() - { - return partitionDeletion; - } - - public Row staticRow() - { - return staticRow; - } - - public EncodingStats stats() - { - return EncodingStats.NO_STATS; - } - - public boolean hasNext() - { - return false; - } - - public Unfiltered next() - { - throw new NoSuchElementException(); - } - - public void remove() - { - } - - public void close() - { - } - }; + return EmptyIterators.unfilteredRow(cfm, partitionKey, isReverseOrder, staticRow, partitionDeletion); } /** @@ -201,65 +142,45 @@ public abstract class UnfilteredRowIterators && iter1.columns().equals(iter2.columns()) && iter1.staticRow().equals(iter2.staticRow()); - return new AbstractUnfilteredRowIterator(iter1.metadata(), - iter1.partitionKey(), - iter1.partitionLevelDeletion(), - iter1.columns(), - iter1.staticRow(), - iter1.isReverseOrder(), - iter1.stats()) + class Extend implements MoreRows<UnfilteredRowIterator> { - protected Unfiltered computeNext() + boolean returned = false; + public UnfilteredRowIterator moreContents() { - if (iter1.hasNext()) - return iter1.next(); - - return iter2.hasNext() ? 
iter2.next() : endOfData(); + if (returned) + return null; + returned = true; + return iter2; } + } - @Override - public void close() - { - try - { - iter1.close(); - } - finally - { - iter2.close(); - } - } - }; + return MoreRows.extend(iter1, new Extend()); } public static UnfilteredRowIterator cloningIterator(UnfilteredRowIterator iterator, final AbstractAllocator allocator) { - return new AlteringUnfilteredRowIterator(iterator) + class Cloner extends Transformation { - private Row.Builder regularBuilder; + private final Row.Builder builder = allocator.cloningBTreeRowBuilder(); - @Override - protected Row computeNextStatic(Row row) + public Row applyToStatic(Row row) { - Row.Builder staticBuilder = allocator.cloningBTreeRowBuilder(); - return Rows.copy(row, staticBuilder).build(); + return Rows.copy(row, builder).build(); } @Override - protected Row computeNext(Row row) + public Row applyToRow(Row row) { - if (regularBuilder == null) - regularBuilder = allocator.cloningBTreeRowBuilder(); - - return Rows.copy(row, regularBuilder).build(); + return Rows.copy(row, builder).build(); } @Override - protected RangeTombstoneMarker computeNext(RangeTombstoneMarker marker) + public RangeTombstoneMarker applyToMarker(RangeTombstoneMarker marker) { return marker.copy(allocator); } - }; + } + return Transformation.apply(iterator, new Cloner()); } /** @@ -277,24 +198,24 @@ public abstract class UnfilteredRowIterators */ public static UnfilteredRowIterator withValidation(UnfilteredRowIterator iterator, final String filename) { - return new AlteringUnfilteredRowIterator(iterator) + class Validator extends Transformation { @Override - protected Row computeNextStatic(Row row) + public Row applyToStatic(Row row) { validate(row); return row; } @Override - protected Row computeNext(Row row) + public Row applyToRow(Row row) { validate(row); return row; } @Override - protected RangeTombstoneMarker computeNext(RangeTombstoneMarker marker) + public RangeTombstoneMarker 
applyToMarker(RangeTombstoneMarker marker) { validate(marker); return marker; @@ -311,7 +232,8 @@ public abstract class UnfilteredRowIterators throw new CorruptSSTableException(me, filename); } } - }; + } + return Transformation.apply(iterator, new Validator()); } /** @@ -331,30 +253,31 @@ public abstract class UnfilteredRowIterators iterator.isReverseOrder(), iterator.partitionLevelDeletion().markedForDeleteAt()); - return new AlteringUnfilteredRowIterator(iterator) + class Logger extends Transformation { @Override - protected Row computeNextStatic(Row row) + public Row applyToStatic(Row row) { if (!row.isEmpty()) - logger.info("[{}] {}", id, row.toString(metadata(), fullDetails)); + logger.info("[{}] {}", id, row.toString(metadata, fullDetails)); return row; } @Override - protected Row computeNext(Row row) + public Row applyToRow(Row row) { - logger.info("[{}] {}", id, row.toString(metadata(), fullDetails)); + logger.info("[{}] {}", id, row.toString(metadata, fullDetails)); return row; } @Override - protected RangeTombstoneMarker computeNext(RangeTombstoneMarker marker) + public RangeTombstoneMarker applyToMarker(RangeTombstoneMarker marker) { - logger.info("[{}] {}", id, marker.toString(metadata())); + logger.info("[{}] {}", id, marker.toString(metadata)); return marker; } - }; + } + return Transformation.apply(iterator, new Logger()); } /** @@ -577,66 +500,4 @@ public abstract class UnfilteredRowIterators } } } - - private static class FilteringIterator extends AbstractIterator<Row> implements RowIterator - { - private final UnfilteredRowIterator iter; - private final int nowInSec; - - public FilteringIterator(UnfilteredRowIterator iter, int nowInSec) - { - this.iter = iter; - this.nowInSec = nowInSec; - } - - public CFMetaData metadata() - { - return iter.metadata(); - } - - public boolean isReverseOrder() - { - return iter.isReverseOrder(); - } - - public PartitionColumns columns() - { - return iter.columns(); - } - - public DecoratedKey partitionKey() - { - 
return iter.partitionKey(); - } - - public Row staticRow() - { - Row row = iter.staticRow(); - if (row.isEmpty()) - return Rows.EMPTY_STATIC_ROW; - - row = row.purge(DeletionPurger.PURGE_ALL, nowInSec); - return row == null ? Rows.EMPTY_STATIC_ROW : row; - } - - protected Row computeNext() - { - while (iter.hasNext()) - { - Unfiltered next = iter.next(); - if (next.isRangeTombstoneMarker()) - continue; - - Row row = ((Row)next).purge(DeletionPurger.PURGE_ALL, nowInSec); - if (row != null) - return row; - } - return endOfData(); - } - - public void close() - { - iter.close(); - } - } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/60949747/src/java/org/apache/cassandra/db/rows/WrappingRowIterator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/WrappingRowIterator.java b/src/java/org/apache/cassandra/db/rows/WrappingRowIterator.java deleted file mode 100644 index 8847a47..0000000 --- a/src/java/org/apache/cassandra/db/rows/WrappingRowIterator.java +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.cassandra.db.rows; - -import com.google.common.collect.UnmodifiableIterator; - -import org.apache.cassandra.config.CFMetaData; -import org.apache.cassandra.db.*; - -/** - * Abstract class to make writing atom iterators that wrap another iterator - * easier. By default, the wrapping iterator simply delegate every call to - * the wrapped iterator so concrete implementations will override some of the - * methods. - */ -public abstract class WrappingRowIterator extends UnmodifiableIterator<Row> implements RowIterator -{ - protected final RowIterator wrapped; - - protected WrappingRowIterator(RowIterator wrapped) - { - this.wrapped = wrapped; - } - - public CFMetaData metadata() - { - return wrapped.metadata(); - } - - public boolean isReverseOrder() - { - return wrapped.isReverseOrder(); - } - - public PartitionColumns columns() - { - return wrapped.columns(); - } - - public DecoratedKey partitionKey() - { - return wrapped.partitionKey(); - } - - public Row staticRow() - { - return wrapped.staticRow(); - } - - public boolean hasNext() - { - return wrapped.hasNext(); - } - - public Row next() - { - return wrapped.next(); - } - - public void close() - { - wrapped.close(); - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/60949747/src/java/org/apache/cassandra/db/rows/WrappingUnfilteredRowIterator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/WrappingUnfilteredRowIterator.java b/src/java/org/apache/cassandra/db/rows/WrappingUnfilteredRowIterator.java index 84713eb..8b18554 100644 --- a/src/java/org/apache/cassandra/db/rows/WrappingUnfilteredRowIterator.java +++ b/src/java/org/apache/cassandra/db/rows/WrappingUnfilteredRowIterator.java @@ -29,7 +29,7 @@ import org.apache.cassandra.db.*; * some of the methods. 
* <p> * Note that if most of what you want to do is modifying/filtering the returned - * {@code Unfiltered}, {@link AlteringUnfilteredRowIterator} can be a simpler option. + * {@code Unfiltered}, {@link org.apache.cassandra.db.transform.Transformation#apply} can be a simpler option. */ public abstract class WrappingUnfilteredRowIterator extends UnmodifiableIterator<Unfiltered> implements UnfilteredRowIterator { http://git-wip-us.apache.org/repos/asf/cassandra/blob/60949747/src/java/org/apache/cassandra/db/transform/BaseIterator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/transform/BaseIterator.java b/src/java/org/apache/cassandra/db/transform/BaseIterator.java new file mode 100644 index 0000000..9b95dfa --- /dev/null +++ b/src/java/org/apache/cassandra/db/transform/BaseIterator.java @@ -0,0 +1,129 @@ +package org.apache.cassandra.db.transform; + +import java.util.Iterator; +import java.util.NoSuchElementException; + +import net.nicoulaj.compilecommand.annotations.DontInline; +import org.apache.cassandra.utils.CloseableIterator; + +import static org.apache.cassandra.utils.Throwables.maybeFail; +import static org.apache.cassandra.utils.Throwables.merge; + +abstract class BaseIterator<V, I extends CloseableIterator<? extends V>, O extends V> extends Stack implements AutoCloseable, Iterator<O> +{ + I input; + V next; + Stop stop; // applies at the end of the current next() + + static class Stop + { + // TODO: consider moving "next" into here, so that a stop() when signalled outside of a function call (e.g. in attach) + // can take effect immediately; this doesn't seem to be necessary at the moment, but it might cause least surprise in future + boolean isSignalled; + } + + // responsibility for initialising next lies with the subclass + BaseIterator(BaseIterator<? extends V, ?
extends I, ?> copyFrom) + { + super(copyFrom); + this.input = copyFrom.input; + this.next = copyFrom.next; + this.stop = copyFrom.stop; + } + + BaseIterator(I input) + { + this.input = input; + this.stop = new Stop(); + } + + /** + * run the corresponding runOnClose method for the first length transformations. + * + * used in hasMoreContents to close the methods preceding the MoreContents + */ + protected abstract Throwable runOnClose(int length); + + /** + * apply the relevant method from the transformation to the value. + * + * used in hasMoreContents to apply the functions that follow the MoreContents + */ + protected abstract V applyOne(V value, Transformation transformation); + + public final void close() + { + Throwable fail = runOnClose(length); + if (next instanceof AutoCloseable) + { + try { ((AutoCloseable) next).close(); } + catch (Throwable t) { fail = merge(fail, t); } + } + try { input.close(); } + catch (Throwable t) { fail = merge(fail, t); } + maybeFail(fail); + } + + public final O next() + { + if (next == null && !hasNext()) + throw new NoSuchElementException(); + + O next = (O) this.next; + this.next = null; + return next; + } + + // may set next != null if the next contents are a transforming iterator that already has data to return, + // in which case we immediately have more contents to yield + protected final boolean hasMoreContents() + { + return moreContents.length > 0 && tryGetMoreContents(); + } + + @DontInline + private boolean tryGetMoreContents() + { + for (int i = 0 ; i < moreContents.length ; i++) + { + MoreContentsHolder holder = moreContents[i]; + MoreContents provider = holder.moreContents; + I newContents = (I) provider.moreContents(); + if (newContents == null) + continue; + + input.close(); + input = newContents; + Stack prefix = EMPTY; + if (newContents instanceof BaseIterator) + { + // we're refilling with transformed contents, so swap in its internals directly + // TODO: ensure that top-level data is consistent. i.e. 
staticRow, partitionlevelDeletion etc are same? + BaseIterator abstr = (BaseIterator) newContents; + prefix = abstr; + input = (I) abstr.input; + next = apply((V) abstr.next, holder.length); // must apply all remaining functions to the next, if any + } + + // since we're truncating our transformation stack to only those occurring after the extend transformation + // we have to run any prior runOnClose methods + maybeFail(runOnClose(holder.length)); + refill(prefix, holder, i); + + if (next != null || input.hasNext()) + return true; + + i = -1; + } + return false; + } + + // apply the functions [from..length) + private V apply(V next, int from) + { + while (next != null & from < length) + next = applyOne(next, stack[from++]); + return next; + } +} + http://git-wip-us.apache.org/repos/asf/cassandra/blob/60949747/src/java/org/apache/cassandra/db/transform/BasePartitions.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/transform/BasePartitions.java b/src/java/org/apache/cassandra/db/transform/BasePartitions.java new file mode 100644 index 0000000..e795760 --- /dev/null +++ b/src/java/org/apache/cassandra/db/transform/BasePartitions.java @@ -0,0 +1,100 @@ +package org.apache.cassandra.db.transform; + +import java.util.Collections; + +import org.apache.cassandra.db.partitions.BasePartitionIterator; +import org.apache.cassandra.db.rows.BaseRowIterator; +import org.apache.cassandra.utils.Throwables; + +import static org.apache.cassandra.utils.Throwables.merge; + +public abstract class BasePartitions<R extends BaseRowIterator<?>, I extends BasePartitionIterator<? extends BaseRowIterator<?>>> +extends BaseIterator<BaseRowIterator<?>, I, R> +implements BasePartitionIterator<R> +{ + + public BasePartitions(I input) + { + super(input); + } + + BasePartitions(BasePartitions<?, ? 
extends I> copyFrom) + { + super(copyFrom); + } + + + // ********************************* + + + protected BaseRowIterator<?> applyOne(BaseRowIterator<?> value, Transformation transformation) + { + return value == null ? null : transformation.applyToPartition(value); + } + + void add(Transformation transformation) + { + transformation.attachTo(this); + super.add(transformation); + next = applyOne(next, transformation); + } + + protected Throwable runOnClose(int length) + { + Throwable fail = null; + Transformation[] fs = stack; + for (int i = 0 ; i < length ; i++) + { + try + { + fs[i].onClose(); + } + catch (Throwable t) + { + fail = merge(fail, t); + } + } + return fail; + } + + public final boolean hasNext() + { + BaseRowIterator<?> next = null; + try + { + + Stop stop = this.stop; + while (this.next == null) + { + Transformation[] fs = stack; + int len = length; + + while (!stop.isSignalled && input.hasNext()) + { + next = input.next(); + for (int i = 0 ; next != null & i < len ; i++) + next = fs[i].applyToPartition(next); + + if (next != null) + { + this.next = next; + return true; + } + } + + if (stop.isSignalled || !hasMoreContents()) + return false; + } + return true; + + } + catch (Throwable t) + { + if (next != null) + Throwables.close(t, Collections.singleton(next)); + throw t; + } + } + +} + http://git-wip-us.apache.org/repos/asf/cassandra/blob/60949747/src/java/org/apache/cassandra/db/transform/BaseRows.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/transform/BaseRows.java b/src/java/org/apache/cassandra/db/transform/BaseRows.java new file mode 100644 index 0000000..78526e8 --- /dev/null +++ b/src/java/org/apache/cassandra/db/transform/BaseRows.java @@ -0,0 +1,139 @@ +package org.apache.cassandra.db.transform; + +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.PartitionColumns; +import 
org.apache.cassandra.db.rows.*; + +import static org.apache.cassandra.utils.Throwables.merge; + +public abstract class BaseRows<R extends Unfiltered, I extends BaseRowIterator<? extends Unfiltered>> +extends BaseIterator<Unfiltered, I, R> +implements BaseRowIterator<R> +{ + + private Row staticRow; + + public BaseRows(I input) + { + super(input); + staticRow = input.staticRow(); + } + + // swap parameter order to avoid casting errors + BaseRows(BaseRows<?, ? extends I> copyFrom) + { + super(copyFrom); + staticRow = copyFrom.staticRow; + } + + public CFMetaData metadata() + { + return input.metadata(); + } + + public boolean isReverseOrder() + { + return input.isReverseOrder(); + } + + public PartitionColumns columns() + { + return input.columns(); + } + + public DecoratedKey partitionKey() + { + return input.partitionKey(); + } + + public Row staticRow() + { + return staticRow; + } + + + // ************************** + + + @Override + protected Throwable runOnClose(int length) + { + Throwable fail = null; + Transformation[] fs = stack; + for (int i = 0 ; i < length ; i++) + { + try + { + fs[i].onPartitionClose(); + } + catch (Throwable t) + { + fail = merge(fail, t); + } + } + return fail; + } + + @Override + void add(Transformation transformation) + { + transformation.attachTo(this); + super.add(transformation); + + // transform any existing data + staticRow = transformation.applyToStatic(staticRow); + next = applyOne(next, transformation); + } + + @Override + protected Unfiltered applyOne(Unfiltered value, Transformation transformation) + { + return value == null + ? null + : value instanceof Row + ? 
transformation.applyToRow((Row) value) + : transformation.applyToMarker((RangeTombstoneMarker) value); + } + + @Override + public final boolean hasNext() + { + Stop stop = this.stop; + while (this.next == null) + { + Transformation[] fs = stack; + int len = length; + + while (!stop.isSignalled && input.hasNext()) + { + Unfiltered next = input.next(); + + if (next.isRow()) + { + Row row = (Row) next; + for (int i = 0 ; row != null && i < len ; i++) + row = fs[i].applyToRow(row); + next = row; + } + else + { + RangeTombstoneMarker rtm = (RangeTombstoneMarker) next; + for (int i = 0 ; rtm != null && i < len ; i++) + rtm = fs[i].applyToMarker(rtm); + next = rtm; + } + + if (next != null) + { + this.next = next; + return true; + } + } + + if (stop.isSignalled || !hasMoreContents()) + return false; + } + return true; + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/60949747/src/java/org/apache/cassandra/db/transform/Filter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/transform/Filter.java b/src/java/org/apache/cassandra/db/transform/Filter.java new file mode 100644 index 0000000..3bf831f --- /dev/null +++ b/src/java/org/apache/cassandra/db/transform/Filter.java @@ -0,0 +1,56 @@ +package org.apache.cassandra.db.transform; + +import org.apache.cassandra.db.DeletionPurger; +import org.apache.cassandra.db.rows.*; + +final class Filter extends Transformation +{ + private final boolean filterEmpty; // generally maps to !isForThrift, but also false for direct row filtration + private final int nowInSec; + public Filter(boolean filterEmpty, int nowInSec) + { + this.filterEmpty = filterEmpty; + this.nowInSec = nowInSec; + } + + public RowIterator applyToPartition(BaseRowIterator iterator) + { + RowIterator filtered = iterator instanceof UnfilteredRows + ? 
new FilteredRows(this, (UnfilteredRows) iterator) + : new FilteredRows((UnfilteredRowIterator) iterator, this); + + if (filterEmpty && closeIfEmpty(filtered)) + return null; + + return filtered; + } + + public Row applyToStatic(Row row) + { + if (row.isEmpty()) + return Rows.EMPTY_STATIC_ROW; + + row = row.purge(DeletionPurger.PURGE_ALL, nowInSec); + return row == null ? Rows.EMPTY_STATIC_ROW : row; + } + + public Row applyToRow(Row row) + { + return row.purge(DeletionPurger.PURGE_ALL, nowInSec); + } + + public RangeTombstoneMarker applyToMarker(RangeTombstoneMarker marker) + { + return null; + } + + private static boolean closeIfEmpty(BaseRowIterator<?> iter) + { + if (iter.isEmpty()) + { + iter.close(); + return true; + } + return false; + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/60949747/src/java/org/apache/cassandra/db/transform/FilteredPartitions.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/transform/FilteredPartitions.java b/src/java/org/apache/cassandra/db/transform/FilteredPartitions.java new file mode 100644 index 0000000..5a802dc --- /dev/null +++ b/src/java/org/apache/cassandra/db/transform/FilteredPartitions.java @@ -0,0 +1,40 @@ +package org.apache.cassandra.db.transform; + +import org.apache.cassandra.db.partitions.BasePartitionIterator; +import org.apache.cassandra.db.partitions.PartitionIterator; +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; +import org.apache.cassandra.db.rows.RowIterator; + +public final class FilteredPartitions extends BasePartitions<RowIterator, BasePartitionIterator<?>> implements PartitionIterator +{ + // wrap basic iterator for transformation + FilteredPartitions(PartitionIterator input) + { + super(input); + } + + // wrap basic unfiltered iterator for transformation, applying filter as first transformation + FilteredPartitions(UnfilteredPartitionIterator input, Filter filter) + { + super(input); + 
add(filter); + } + + // copy from an UnfilteredPartitions, applying a filter to convert it + FilteredPartitions(Filter filter, UnfilteredPartitions copyFrom) + { + super(copyFrom); + add(filter); + } + + /** + * Filter any RangeTombstoneMarker from the iterator's iterators, transforming it into a PartitionIterator. + */ + public static PartitionIterator filter(UnfilteredPartitionIterator iterator, int nowInSecs) + { + Filter filter = new Filter(!iterator.isForThrift(), nowInSecs); + if (iterator instanceof UnfilteredPartitions) + return new FilteredPartitions(filter, (UnfilteredPartitions) iterator); + return new FilteredPartitions(iterator, filter); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/60949747/src/java/org/apache/cassandra/db/transform/FilteredRows.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/transform/FilteredRows.java b/src/java/org/apache/cassandra/db/transform/FilteredRows.java new file mode 100644 index 0000000..b21b451 --- /dev/null +++ b/src/java/org/apache/cassandra/db/transform/FilteredRows.java @@ -0,0 +1,40 @@ +package org.apache.cassandra.db.transform; + +import org.apache.cassandra.db.rows.BaseRowIterator; +import org.apache.cassandra.db.rows.Row; +import org.apache.cassandra.db.rows.RowIterator; +import org.apache.cassandra.db.rows.UnfilteredRowIterator; + +public final class FilteredRows extends BaseRows<Row, BaseRowIterator<?>> implements RowIterator +{ + FilteredRows(RowIterator input) + { + super(input); + } + + FilteredRows(UnfilteredRowIterator input, Filter filter) + { + super(input); + add(filter); + } + + FilteredRows(Filter filter, UnfilteredRows input) + { + super(input); + add(filter); + } + + @Override + public boolean isEmpty() + { + return staticRow().isEmpty() && !hasNext(); + } + + /** + * Filter any RangeTombstoneMarker from the iterator, transforming it into a RowIterator. 
+ */ + public static RowIterator filter(UnfilteredRowIterator iterator, int nowInSecs) + { + return new Filter(false, nowInSecs).applyToPartition(iterator); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/60949747/src/java/org/apache/cassandra/db/transform/MoreContents.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/transform/MoreContents.java b/src/java/org/apache/cassandra/db/transform/MoreContents.java new file mode 100644 index 0000000..7e392ca --- /dev/null +++ b/src/java/org/apache/cassandra/db/transform/MoreContents.java @@ -0,0 +1,8 @@ +package org.apache.cassandra.db.transform; + +// a shared internal interface, that is hidden to provide type-safety to the user +interface MoreContents<I> +{ + public abstract I moreContents(); +} + http://git-wip-us.apache.org/repos/asf/cassandra/blob/60949747/src/java/org/apache/cassandra/db/transform/MorePartitions.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/transform/MorePartitions.java b/src/java/org/apache/cassandra/db/transform/MorePartitions.java new file mode 100644 index 0000000..5cfcc4c --- /dev/null +++ b/src/java/org/apache/cassandra/db/transform/MorePartitions.java @@ -0,0 +1,35 @@ +package org.apache.cassandra.db.transform; + +import org.apache.cassandra.db.partitions.BasePartitionIterator; +import org.apache.cassandra.db.partitions.PartitionIterator; +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; + +import static org.apache.cassandra.db.transform.Transformation.add; +import static org.apache.cassandra.db.transform.Transformation.mutable; + +/** + * An interface for providing new partitions for a partitions iterator. 
+ * + * The new contents are produced as a normal arbitrary PartitionIterator or UnfilteredPartitionIterator (as appropriate) + * + * The transforming iterator invokes this method when any current source is exhausted, then inserts the + * new contents as the new source. + * + * If the new source is itself a product of any transformations, the two transforming iterators are merged + * so that control flow always occurs at the outermost point + */ +public interface MorePartitions<I extends BasePartitionIterator<?>> extends MoreContents<I> +{ + + public static UnfilteredPartitionIterator extend(UnfilteredPartitionIterator iterator, MorePartitions<? super UnfilteredPartitionIterator> more) + { + return add(mutable(iterator), more); + } + + public static PartitionIterator extend(PartitionIterator iterator, MorePartitions<? super PartitionIterator> more) + { + return add(mutable(iterator), more); + } + +} + http://git-wip-us.apache.org/repos/asf/cassandra/blob/60949747/src/java/org/apache/cassandra/db/transform/MoreRows.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/transform/MoreRows.java b/src/java/org/apache/cassandra/db/transform/MoreRows.java new file mode 100644 index 0000000..f406a49 --- /dev/null +++ b/src/java/org/apache/cassandra/db/transform/MoreRows.java @@ -0,0 +1,36 @@ +package org.apache.cassandra.db.transform; + +import org.apache.cassandra.db.rows.BaseRowIterator; +import org.apache.cassandra.db.rows.RowIterator; +import org.apache.cassandra.db.rows.UnfilteredRowIterator; + +import static org.apache.cassandra.db.transform.Transformation.add; +import static org.apache.cassandra.db.transform.Transformation.mutable; + +/** + * An interface for providing new row contents for a partition. + * + * The new contents are produced as a normal arbitrary RowIterator or UnfilteredRowIterator (as appropriate), + * with matching staticRow, partitionKey and partitionLevelDeletion.
+ * + * The transforming iterator invokes this method when any current source is exhausted, then inserts the + * new contents as the new source. + * + * If the new source is itself a product of any transformations, the two transforming iterators are merged + * so that control flow always occurs at the outermost point + */ +public interface MoreRows<I extends BaseRowIterator<?>> extends MoreContents<I> +{ + + public static UnfilteredRowIterator extend(UnfilteredRowIterator iterator, MoreRows<? super UnfilteredRowIterator> more) + { + return add(mutable(iterator), more); + } + + public static RowIterator extend(RowIterator iterator, MoreRows<? super RowIterator> more) + { + return add(mutable(iterator), more); + } + +} + http://git-wip-us.apache.org/repos/asf/cassandra/blob/60949747/src/java/org/apache/cassandra/db/transform/Stack.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/transform/Stack.java b/src/java/org/apache/cassandra/db/transform/Stack.java new file mode 100644 index 0000000..aac1679 --- /dev/null +++ b/src/java/org/apache/cassandra/db/transform/Stack.java @@ -0,0 +1,81 @@ +package org.apache.cassandra.db.transform; + +import java.util.Arrays; + +class Stack +{ + static final Stack EMPTY = new Stack(); + + Transformation[] stack; + int length; // number of used stack entries + MoreContentsHolder[] moreContents; // stack of more contents providers (if any; usually zero or one) + + // an internal placeholder for a MoreContents, storing the associated stack length at time it was applied + static class MoreContentsHolder + { + final MoreContents moreContents; + int length; + private MoreContentsHolder(MoreContents moreContents, int length) + { + this.moreContents = moreContents; + this.length = length; + } + } + + Stack() + { + stack = new Transformation[0]; + moreContents = new MoreContentsHolder[0]; + } + + Stack(Stack copy) + { + stack = copy.stack; + length = copy.length; +
moreContents = copy.moreContents; + } + + void add(Transformation add) + { + if (length == stack.length) + stack = resize(stack); + stack[length++] = add; + } + + void add(MoreContents more) + { + this.moreContents = Arrays.copyOf(moreContents, moreContents.length + 1); + this.moreContents[moreContents.length - 1] = new MoreContentsHolder(more, length); + } + + private static <E> E[] resize(E[] array) + { + int newLen = array.length == 0 ? 5 : array.length * 2; + return Arrays.copyOf(array, newLen); + } + + // reinitialise the transformations after a moreContents applies + void refill(Stack prefix, MoreContentsHolder holder, int index) + { + // drop the transformations that were present when the MoreContents was attached, + // and prefix any transformations in the new contents (if it's a transformer) + moreContents = splice(prefix.moreContents, prefix.moreContents.length, moreContents, index, moreContents.length); + stack = splice(prefix.stack, prefix.length, stack, holder.length, length); + length += prefix.length - holder.length; + holder.length = prefix.length; + } + + private static <E> E[] splice(E[] prefix, int prefixCount, E[] keep, int keepFrom, int keepTo) + { + int keepCount = keepTo - keepFrom; + int newCount = prefixCount + keepCount; + if (newCount > keep.length) + keep = Arrays.copyOf(keep, newCount); + if (keepFrom != prefixCount) + System.arraycopy(keep, keepFrom, keep, prefixCount, keepCount); + if (prefixCount != 0) + System.arraycopy(prefix, 0, keep, 0, prefixCount); + return keep; + } +} + http://git-wip-us.apache.org/repos/asf/cassandra/blob/60949747/src/java/org/apache/cassandra/db/transform/StoppingTransformation.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/transform/StoppingTransformation.java b/src/java/org/apache/cassandra/db/transform/StoppingTransformation.java new file mode 100644 index 0000000..f3afdc0 --- /dev/null +++ 
b/src/java/org/apache/cassandra/db/transform/StoppingTransformation.java @@ -0,0 +1,60 @@ +package org.apache.cassandra.db.transform; + +import net.nicoulaj.compilecommand.annotations.DontInline; +import org.apache.cassandra.db.rows.BaseRowIterator; + +// A Transformation that can stop an iterator earlier than its natural exhaustion +public abstract class StoppingTransformation<I extends BaseRowIterator<?>> extends Transformation<I> +{ + private BaseIterator.Stop stop; + private BaseIterator.Stop stopInPartition; + + /** + * If invoked by a subclass, any partitions iterator this transformation has been applied to will terminate + * after any currently-processing item is returned, as will any row/unfiltered iterator + */ + @DontInline + protected void stop() + { + if (stop != null) + stop.isSignalled = true; + stopInPartition(); + } + + /** + * If invoked by a subclass, any rows/unfiltered iterator this transformation has been applied to will terminate + * after any currently-processing item is returned + */ + @DontInline + protected void stopInPartition() + { + if (stopInPartition != null) + stopInPartition.isSignalled = true; + } + + @Override + protected void attachTo(BasePartitions partitions) + { + assert this.stop == null; + this.stop = partitions.stop; + } + + @Override + protected void attachTo(BaseRows rows) + { + assert this.stopInPartition == null; + this.stopInPartition = rows.stop; + } + + @Override + protected void onClose() + { + stop = null; + } + + @Override + protected void onPartitionClose() + { + stopInPartition = null; + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/60949747/src/java/org/apache/cassandra/db/transform/Transformation.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/transform/Transformation.java b/src/java/org/apache/cassandra/db/transform/Transformation.java new file mode 100644 index 0000000..29e2e15 --- /dev/null +++ 
b/src/java/org/apache/cassandra/db/transform/Transformation.java @@ -0,0 +1,145 @@ +package org.apache.cassandra.db.transform; + +import org.apache.cassandra.db.DeletionTime; +import org.apache.cassandra.db.partitions.PartitionIterator; +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; +import org.apache.cassandra.db.rows.*; + +/** + * We have a single common superclass for all Transformations to make implementation efficient. + * We have a shared stack for all transformations, and can share the same transformation across partition and row + * iterators, reducing garbage. Internal code is also simplified by always having a basic no-op implementation to invoke. + * + * Only the necessary methods need be overridden. Early termination is provided by invoking the method's stop or stopInPartition + * methods, rather than having their own abstract method to invoke, as this is both more efficient and simpler to reason about. + */ +public abstract class Transformation<I extends BaseRowIterator<?>> +{ + // internal methods for StoppingTransformation only + void attachTo(BasePartitions partitions) { } + void attachTo(BaseRows rows) { } + + /** + * Run on the close of any (logical) partitions iterator this function was applied to + * + * We stipulate logical, because if applied to a transformed iterator the lifetime of the iterator + * object may be longer than the lifetime of the "logical" iterator it was applied to; if the iterator + * is refilled with MoreContents, for instance, the iterator may outlive this function + */ + protected void onClose() { } + + /** + * Run on the close of any (logical) rows iterator this function was applied to + * + * We stipulate logical, because if applied to a transformed iterator the lifetime of the iterator + * object may be longer than the lifetime of the "logical" iterator it was applied to; if the iterator + * is refilled with MoreContents, for instance, the iterator may outlive this function + */ + protected
void onPartitionClose() { } + + /** + * Applied to any rows iterator (partition) we encounter in a partitions iterator + */ + protected I applyToPartition(I partition) + { + return partition; + } + + /** + * Applied to any row we encounter in a rows iterator + */ + protected Row applyToRow(Row row) + { + return row; + } + + /** + * Applied to any range tombstone marker (RTM) we encounter in a rows/unfiltered iterator + */ + protected RangeTombstoneMarker applyToMarker(RangeTombstoneMarker marker) + { + return marker; + } + + /** + * Applied to the static row of any rows iterator. + * + * NOTE that this is only applied to the first iterator in any sequence of iterators filled by a MoreContents; + * the static data for such iterators is all expected to be equal + */ + protected Row applyToStatic(Row row) + { + return row; + } + + /** + * Applied to the partition-level deletion of any rows iterator. + * + * NOTE that this is only applied to the first iterator in any sequence of iterators filled by a MoreContents; + * the partition-level deletion for such iterators is all expected to be equal + */ + protected DeletionTime applyToDeletion(DeletionTime deletionTime) + { + return deletionTime; + } + + + //****************************************************** + // Static Application Methods + //****************************************************** + + + public static UnfilteredPartitionIterator apply(UnfilteredPartitionIterator iterator, Transformation<? super UnfilteredRowIterator> transformation) + { + return add(mutable(iterator), transformation); + } + public static PartitionIterator apply(PartitionIterator iterator, Transformation<?
super RowIterator> transformation) + { + return add(mutable(iterator), transformation); + } + public static UnfilteredRowIterator apply(UnfilteredRowIterator iterator, Transformation<?> transformation) + { + return add(mutable(iterator), transformation); + } + public static RowIterator apply(RowIterator iterator, Transformation<?> transformation) + { + return add(mutable(iterator), transformation); + } + + static UnfilteredPartitions mutable(UnfilteredPartitionIterator iterator) + { + return iterator instanceof UnfilteredPartitions + ? (UnfilteredPartitions) iterator + : new UnfilteredPartitions(iterator); + } + static FilteredPartitions mutable(PartitionIterator iterator) + { + return iterator instanceof FilteredPartitions + ? (FilteredPartitions) iterator + : new FilteredPartitions(iterator); + } + static UnfilteredRows mutable(UnfilteredRowIterator iterator) + { + return iterator instanceof UnfilteredRows + ? (UnfilteredRows) iterator + : new UnfilteredRows(iterator); + } + static FilteredRows mutable(RowIterator iterator) + { + return iterator instanceof FilteredRows + ? 
(FilteredRows) iterator + : new FilteredRows(iterator); + } + + static <E extends BaseIterator> E add(E to, Transformation add) + { + to.add(add); + return to; + } + static <E extends BaseIterator> E add(E to, MoreContents add) + { + to.add(add); + return to; + } + +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/60949747/src/java/org/apache/cassandra/db/transform/UnfilteredPartitions.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/transform/UnfilteredPartitions.java b/src/java/org/apache/cassandra/db/transform/UnfilteredPartitions.java new file mode 100644 index 0000000..4e40545 --- /dev/null +++ b/src/java/org/apache/cassandra/db/transform/UnfilteredPartitions.java @@ -0,0 +1,27 @@ +package org.apache.cassandra.db.transform; + +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; +import org.apache.cassandra.db.rows.UnfilteredRowIterator; + +final class UnfilteredPartitions extends BasePartitions<UnfilteredRowIterator, UnfilteredPartitionIterator> implements UnfilteredPartitionIterator +{ + final boolean isForThrift; + + // wrap an iterator for transformation + public UnfilteredPartitions(UnfilteredPartitionIterator input) + { + super(input); + this.isForThrift = input.isForThrift(); + } + + public boolean isForThrift() + { + return isForThrift; + } + + public CFMetaData metadata() + { + return input.metadata(); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/60949747/src/java/org/apache/cassandra/db/transform/UnfilteredRows.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/transform/UnfilteredRows.java b/src/java/org/apache/cassandra/db/transform/UnfilteredRows.java new file mode 100644 index 0000000..98640ae --- /dev/null +++ b/src/java/org/apache/cassandra/db/transform/UnfilteredRows.java @@ -0,0 +1,40 @@ +package 
org.apache.cassandra.db.transform; + +import org.apache.cassandra.db.DeletionTime; +import org.apache.cassandra.db.rows.EncodingStats; +import org.apache.cassandra.db.rows.Unfiltered; +import org.apache.cassandra.db.rows.UnfilteredRowIterator; + +final class UnfilteredRows extends BaseRows<Unfiltered, UnfilteredRowIterator> implements UnfilteredRowIterator +{ + private DeletionTime partitionLevelDeletion; + + public UnfilteredRows(UnfilteredRowIterator input) + { + super(input); + partitionLevelDeletion = input.partitionLevelDeletion(); + } + + @Override + void add(Transformation add) + { + super.add(add); + partitionLevelDeletion = add.applyToDeletion(partitionLevelDeletion); + } + + public DeletionTime partitionLevelDeletion() + { + return partitionLevelDeletion; + } + + public EncodingStats stats() + { + return input.stats(); + } + + @Override + public boolean isEmpty() + { + return staticRow().isEmpty() && partitionLevelDeletion().isLive() && !hasNext(); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/60949747/src/java/org/apache/cassandra/index/SecondaryIndexBuilder.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/index/SecondaryIndexBuilder.java b/src/java/org/apache/cassandra/index/SecondaryIndexBuilder.java index d6ae8e2..e66f0a3 100644 --- a/src/java/org/apache/cassandra/index/SecondaryIndexBuilder.java +++ b/src/java/org/apache/cassandra/index/SecondaryIndexBuilder.java @@ -71,14 +71,7 @@ public class SecondaryIndexBuilder extends CompactionInfo.Holder } finally { - try - { - iter.close(); - } - catch (IOException e) - { - throw new RuntimeException(e); - } + iter.close(); } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/60949747/src/java/org/apache/cassandra/index/internal/composites/CompositesSearcher.java ---------------------------------------------------------------------- diff --git 
a/src/java/org/apache/cassandra/index/internal/composites/CompositesSearcher.java b/src/java/org/apache/cassandra/index/internal/composites/CompositesSearcher.java index f1751f5..46701bc 100644 --- a/src/java/org/apache/cassandra/index/internal/composites/CompositesSearcher.java +++ b/src/java/org/apache/cassandra/index/internal/composites/CompositesSearcher.java @@ -31,6 +31,7 @@ import org.apache.cassandra.db.filter.DataLimits; import org.apache.cassandra.db.filter.RowFilter; import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; import org.apache.cassandra.db.rows.*; +import org.apache.cassandra.db.transform.Transformation; import org.apache.cassandra.index.internal.CassandraIndex; import org.apache.cassandra.index.internal.CassandraIndexSearcher; import org.apache.cassandra.index.internal.IndexEntry; @@ -203,18 +204,13 @@ public class CompositesSearcher extends CassandraIndexSearcher }); } - return new AlteringUnfilteredRowIterator(dataIter) + ClusteringComparator comparator = dataIter.metadata().comparator; + class Transform extends Transformation { private int entriesIdx; - public void close() - { - deleteAllEntries(staleEntries, writeOp, nowInSec); - super.close(); - } - @Override - protected Row computeNext(Row row) + public Row applyToRow(Row row) { IndexEntry entry = findEntry(row.clustering()); if (!index.isStale(row, indexValue, nowInSec)) @@ -234,7 +230,7 @@ public class CompositesSearcher extends CassandraIndexSearcher // next entry, the one at 'entriesIdx'. However, we can have stale entries, entries // that have no corresponding row in the base table typically because of a range // tombstone or partition level deletion. Delete such stale entries. 
- int cmp = metadata().comparator.compare(entry.indexedEntryClustering, clustering); + int cmp = comparator.compare(entry.indexedEntryClustering, clustering); assert cmp <= 0; // this would means entries are not in clustering order, which shouldn't happen if (cmp == 0) return entry; @@ -244,6 +240,14 @@ public class CompositesSearcher extends CassandraIndexSearcher // entries correspond to the rows we've queried, so we shouldn't have a row that has no corresponding entry. throw new AssertionError(); } - }; + + @Override + public void onClose() + { + deleteAllEntries(staleEntries, writeOp, nowInSec); + } + } + + return Transformation.apply(dataIter, new Transform()); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/60949747/src/java/org/apache/cassandra/io/sstable/ReducingKeyIterator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/ReducingKeyIterator.java b/src/java/org/apache/cassandra/io/sstable/ReducingKeyIterator.java index bbc56cc..6f395f8 100644 --- a/src/java/org/apache/cassandra/io/sstable/ReducingKeyIterator.java +++ b/src/java/org/apache/cassandra/io/sstable/ReducingKeyIterator.java @@ -17,7 +17,6 @@ */ package org.apache.cassandra.io.sstable; -import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.Iterator; @@ -70,12 +69,10 @@ public class ReducingKeyIterator implements CloseableIterator<DecoratedKey> } } - public void close() throws IOException + public void close() { if (mi != null) - { mi.close(); - } } public long getTotalBytes() http://git-wip-us.apache.org/repos/asf/cassandra/blob/60949747/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java index e02c919..b6077e0 
100644 --- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java @@ -22,6 +22,7 @@ import java.util.Map; import org.apache.cassandra.db.*; import org.apache.cassandra.db.lifecycle.LifecycleTransaction; +import org.apache.cassandra.db.transform.Transformation; import org.apache.cassandra.io.sstable.*; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.io.sstable.format.SSTableWriter; @@ -143,11 +144,11 @@ public class BigTableWriter extends SSTableWriter long startPosition = beforeAppend(key); - try (StatsCollector withStats = new StatsCollector(iterator, metadataCollector)) + try (UnfilteredRowIterator collecting = Transformation.apply(iterator, new StatsCollector(metadataCollector))) { - ColumnIndex index = ColumnIndex.writeAndBuildIndex(withStats, dataFile, header, descriptor.version); + ColumnIndex index = ColumnIndex.writeAndBuildIndex(collecting, dataFile, header, descriptor.version); - RowIndexEntry entry = RowIndexEntry.create(startPosition, iterator.partitionLevelDeletion(), index); + RowIndexEntry entry = RowIndexEntry.create(startPosition, collecting.partitionLevelDeletion(), index); long endPosition = dataFile.position(); long rowSize = endPosition - startPosition; @@ -171,20 +172,18 @@ public class BigTableWriter extends SSTableWriter } } - private static class StatsCollector extends AlteringUnfilteredRowIterator + private static class StatsCollector extends Transformation { private final MetadataCollector collector; private int cellCount; - StatsCollector(UnfilteredRowIterator iter, MetadataCollector collector) + StatsCollector(MetadataCollector collector) { - super(iter); this.collector = collector; - collector.update(iter.partitionLevelDeletion()); } @Override - protected Row computeNextStatic(Row row) + public Row applyToStatic(Row row) { if (!row.isEmpty()) cellCount += Rows.collectStats(row, collector); @@ -192,7 
+191,7 @@ public class BigTableWriter extends SSTableWriter } @Override - protected Row computeNext(Row row) + public Row applyToRow(Row row) { collector.updateClusteringValues(row.clustering()); cellCount += Rows.collectStats(row, collector); @@ -200,7 +199,7 @@ public class BigTableWriter extends SSTableWriter } @Override - protected RangeTombstoneMarker computeNext(RangeTombstoneMarker marker) + public RangeTombstoneMarker applyToMarker(RangeTombstoneMarker marker) { collector.updateClusteringValues(marker.clustering()); if (marker.isBoundary()) @@ -217,10 +216,16 @@ public class BigTableWriter extends SSTableWriter } @Override - public void close() + public void onPartitionClose() { collector.addCellPerPartitionCount(cellCount); - super.close(); + } + + @Override + public DeletionTime applyToDeletion(DeletionTime deletionTime) + { + collector.update(deletionTime); + return deletionTime; } }
