http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java b/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java new file mode 100644 index 0000000..0e18d4a --- /dev/null +++ b/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java @@ -0,0 +1,388 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.columniterator; + +import java.io.IOException; +import java.util.*; + +import com.google.common.collect.AbstractIterator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.db.*; +import org.apache.cassandra.db.filter.ColumnFilter; +import org.apache.cassandra.db.rows.*; +import org.apache.cassandra.db.partitions.AbstractPartitionData; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.sstable.CorruptSSTableException; +import org.apache.cassandra.io.sstable.IndexHelper; +import org.apache.cassandra.io.util.FileDataInput; +import org.apache.cassandra.io.util.FileMark; +import org.apache.cassandra.utils.ByteBufferUtil; + +/** + * A Cell Iterator in reversed clustering order over an SSTable + */ +public class SSTableReversedIterator extends AbstractSSTableIterator +{ + private static final Logger logger = LoggerFactory.getLogger(SSTableReversedIterator.class); + + public SSTableReversedIterator(SSTableReader sstable, DecoratedKey key, ColumnFilter columns, boolean isForThrift) + { + this(sstable, null, key, sstable.getPosition(key, SSTableReader.Operator.EQ), columns, isForThrift); + } + + public SSTableReversedIterator(SSTableReader sstable, + FileDataInput file, + DecoratedKey key, + RowIndexEntry indexEntry, + ColumnFilter columns, + boolean isForThrift) + { + super(sstable, file, key, indexEntry, columns, isForThrift); + } + + protected Reader createReader(RowIndexEntry indexEntry, FileDataInput file, boolean isAtPartitionStart, boolean shouldCloseFile) + { + return indexEntry.isIndexed() + ?
new ReverseIndexedReader(indexEntry, file, isAtPartitionStart, shouldCloseFile) + : new ReverseReader(file, isAtPartitionStart, shouldCloseFile); + } + + public boolean isReverseOrder() + { + return true; + } + + private ReusablePartitionData createBuffer(int blocksCount) + { + int estimatedRowCount = 16; + int columnCount = metadata().partitionColumns().regulars.columnCount(); + if (columnCount == 0 || metadata().clusteringColumns().size() == 0) + { + estimatedRowCount = 1; + } + else + { + try + { + // To avoid wasted resizing we guess-estimate the number of rows we're likely to read. For that + // we use the stats on the number of rows per partition for that sstable. + // FIXME: so far we only keep stats on cells, so to get a rough estimate on the number of rows, + // we divide by the number of regular columns the table has. We should fix this once we collect the + // stats on rows + int estimatedRowsPerPartition = (int)(sstable.getEstimatedColumnCount().percentile(0.75) / columnCount); + estimatedRowCount = Math.max(estimatedRowsPerPartition / blocksCount, 1); + } + catch (IllegalStateException e) + { + // The EstimatedHistogram percentile() method can throw this (if the histogram has overflowed). While such overflow + // shouldn't happen, it's not worth taking the risk of letting the exception bubble up. + } + } + return new ReusablePartitionData(metadata(), partitionKey(), DeletionTime.LIVE, columns(), estimatedRowCount); + } + + private class ReverseReader extends Reader + { + private ReusablePartitionData partition; + private UnfilteredRowIterator iterator; + + private ReverseReader(FileDataInput file, boolean isAtPartitionStart, boolean shouldCloseFile) + { + super(file, shouldCloseFile); + assert isAtPartitionStart; + } + + public boolean hasNext() throws IOException + { + if (partition == null) + { + partition = createBuffer(1); + partition.populateFrom(this, null, null, new Tester() + { + public boolean isDone() + { + return false; + } + }); + iterator = partition.unfilteredIterator(columns, Slices.ALL, true); + } + return iterator.hasNext(); + } + + public Unfiltered next() throws IOException + { + if (!hasNext()) + throw new NoSuchElementException(); + return iterator.next(); + } + + public Iterator<Unfiltered> slice(final Slice slice) throws IOException + { + if (partition == null) + { + partition = createBuffer(1); + partition.populateFrom(this, slice.start(), slice.end(), new Tester() + { + public boolean isDone() + { + return false; + } + }); + } + + return partition.unfilteredIterator(columns, Slices.with(metadata().comparator, slice), true); + } + } + + private class ReverseIndexedReader extends IndexedReader + { + private ReusablePartitionData partition; + private UnfilteredRowIterator iterator; + + private ReverseIndexedReader(RowIndexEntry indexEntry, FileDataInput file, boolean isAtPartitionStart, boolean shouldCloseFile) + { + super(file, shouldCloseFile, indexEntry, isAtPartitionStart); + this.currentIndexIdx = indexEntry.columnsIndex().size(); + } + + public boolean hasNext() throws IOException + { + // If it's called before we've created the file, create it. This then means + // we're reading from the end of the partition.
+ if (!isInit) + { + seekToPosition(indexEntry.position); + ByteBufferUtil.skipShortLength(file); // partition key + DeletionTime.serializer.skip(file); // partition deletion + if (sstable.header.hasStatic()) + UnfilteredSerializer.serializer.skipStaticRow(file, sstable.header, helper); + isInit = true; + } + + if (partition == null) + { + partition = createBuffer(indexes.size()); + partition.populateFrom(this, null, null, new Tester() + { + public boolean isDone() + { + return false; + } + }); + iterator = partition.unfilteredIterator(columns, Slices.ALL, true); + } + + return iterator.hasNext(); + } + + public Unfiltered next() throws IOException + { + if (!hasNext()) + throw new NoSuchElementException(); + return iterator.next(); + } + + private void prepareBlock(int blockIdx, Slice.Bound start, Slice.Bound end) throws IOException + { + updateBlock(blockIdx); + + if (partition == null) + partition = createBuffer(indexes.size()); + else + partition.clear(); + + final FileMark fileMark = mark; + final long width = currentIndex().width; + + partition.populateFrom(this, start, end, new Tester() + { + public boolean isDone() + { + return file.bytesPastMark(fileMark) >= width; + } + }); + } + + @Override + public Iterator<Unfiltered> slice(final Slice slice) throws IOException + { + // if our previous slicing already got us the smallest row in the sstable, we're done + if (currentIndexIdx < 0) + return Collections.emptyIterator(); + + final List<IndexHelper.IndexInfo> indexes = indexEntry.columnsIndex(); + + // Find the first index block we'll need to read for the slice. + final int startIdx = IndexHelper.indexFor(slice.end(), indexes, sstable.metadata.comparator, true, currentIndexIdx); + if (startIdx < 0) + return Collections.emptyIterator(); + + // Find the last index block we'll need to read for the slice. + int lastIdx = IndexHelper.indexFor(slice.start(), indexes, sstable.metadata.comparator, true, startIdx); + + // The index search is by firstname and so lastIdx is such that + // indexes[lastIdx].firstName < slice.start <= indexes[lastIdx + 1].firstName + // However, if indexes[lastIdx].lastName < slice.start we can bump lastIdx. + if (lastIdx >= 0 && metadata().comparator.compare(indexes.get(lastIdx).lastName, slice.start()) < 0) + ++lastIdx; + + final int endIdx = lastIdx; + + // Because we're reversed, even if it is our current block, we should re-prepare the block since we would + // have skipped anything not in the previous slice. 
+ prepareBlock(startIdx, slice.start(), slice.end()); + + return new AbstractIterator<Unfiltered>() + { + private Iterator<Unfiltered> currentBlockIterator = partition.unfilteredIterator(columns, Slices.with(metadata().comparator, slice), true); + + protected Unfiltered computeNext() + { + try + { + if (currentBlockIterator.hasNext()) + return currentBlockIterator.next(); + + --currentIndexIdx; + if (currentIndexIdx < 0 || currentIndexIdx < endIdx) + return endOfData(); + + // Note that since we know we're reading blocks backward, there is no point in checking the slice end, so we pass null + prepareBlock(currentIndexIdx, slice.start(), null); + currentBlockIterator = partition.unfilteredIterator(columns, Slices.with(metadata().comparator, slice), true); + return computeNext(); + } + catch (IOException e) + { + try + { + close(); + } + catch (IOException suppressed) + { + e.addSuppressed(suppressed); + } + sstable.markSuspect(); + throw new CorruptSSTableException(e, file.getPath()); + } + } + }; + } + } + + private abstract class Tester + { + public abstract boolean isDone(); + } + + private class ReusablePartitionData extends AbstractPartitionData + { + private final Writer rowWriter; + private final RangeTombstoneCollector markerWriter; + + private ReusablePartitionData(CFMetaData metadata, + DecoratedKey partitionKey, + DeletionTime deletionTime, + PartitionColumns columns, + int initialRowCapacity) + { + super(metadata, partitionKey, deletionTime, columns, initialRowCapacity, false); + + this.rowWriter = new Writer(true); + // Note that even though the iterator handles the reverse case, this object holds the data for a single index block, and we read index blocks in + // forward clustering order. + this.markerWriter = new RangeTombstoneCollector(false); + } + + // Note that this method is here rather than in the readers because we want to use it for both readers and they + // don't extend one another + private void populateFrom(Reader reader, Slice.Bound start, Slice.Bound end, Tester tester) throws IOException + { + // If we have a start bound, skip everything that comes before it. + while (reader.deserializer.hasNext() && start != null && reader.deserializer.compareNextTo(start) <= 0 && !tester.isDone()) + { + if (reader.deserializer.nextIsRow()) + reader.deserializer.skipNext(); + else + reader.updateOpenMarker((RangeTombstoneMarker)reader.deserializer.readNext()); + } + + // If we have an open marker, it's either one from what we just skipped (if start != null), or it's from the previous index block.
+ if (reader.openMarker != null) + { + // If we have no start but still an openMarker, this means we're indexed and it's coming from the previous block + Slice.Bound markerStart = start; + if (start == null) + { + ClusteringPrefix c = ((IndexedReader)reader).previousIndex().lastName; + markerStart = Slice.Bound.exclusiveStartOf(c); + } + writeMarker(markerStart, reader.openMarker); + } + + // Now deserialize everything until we reach our requested end (if we have one) + while (reader.deserializer.hasNext() + && (end == null || reader.deserializer.compareNextTo(end) <= 0) + && !tester.isDone()) + { + Unfiltered unfiltered = reader.deserializer.readNext(); + if (unfiltered.kind() == Unfiltered.Kind.ROW) + { + ((Row) unfiltered).copyTo(rowWriter); + } + else + { + RangeTombstoneMarker marker = (RangeTombstoneMarker) unfiltered; + reader.updateOpenMarker(marker); + marker.copyTo(markerWriter); + } + } + + // If we have an open marker, we should close it before finishing + if (reader.openMarker != null) + { + // If we have no end but still an openMarker, this means we're indexed and the marker can be closed using the block's end + Slice.Bound markerEnd = end; + if (end == null) + { + ClusteringPrefix c = ((IndexedReader)reader).currentIndex().lastName; + markerEnd = Slice.Bound.inclusiveEndOf(c); + } + writeMarker(markerEnd, reader.getAndClearOpenMarker()); + } + } + + private void writeMarker(Slice.Bound bound, DeletionTime dt) + { + bound.writeTo(markerWriter); + markerWriter.writeBoundDeletion(dt); + markerWriter.endOfMarker(); + } + + @Override + public void clear() + { + super.clear(); + rowWriter.reset(); + markerWriter.reset(); + } + } +}
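The reversed iterator above never seeks backwards through the data file: populateFrom() reads each index block forward into an in-memory ReusablePartitionData buffer, that buffer is replayed in reverse clustering order, and the blocks themselves are visited last-to-first (prepareBlock() / --currentIndexIdx). Below is a minimal standalone sketch of that buffering pattern in plain Java; ReverseBlockIterator, its list-of-lists "blocks", and the integer "rows" are illustrative stand-ins, not types from this patch:

    import java.util.*;

    // Sketch: reverse iteration over data that can only be read front-to-back.
    // Each inner list stands in for an sstable index block; we buffer one block
    // at a time and replay it backwards, walking the blocks from last to first.
    public class ReverseBlockIterator implements Iterator<Integer>
    {
        private final List<List<Integer>> blocks;
        private int blockIdx;                  // walked from the last block down to 0
        private ListIterator<Integer> current; // cursor parked at the end of the buffered block

        public ReverseBlockIterator(List<List<Integer>> blocks)
        {
            this.blocks = blocks;
            this.blockIdx = blocks.size();     // one past the last block
            this.current = Collections.<Integer>emptyList().listIterator();
        }

        public boolean hasNext()
        {
            while (!current.hasPrevious())
            {
                if (--blockIdx < 0)
                    return false;
                // "populateFrom": read the whole block in its natural forward order...
                List<Integer> buffer = new ArrayList<>(blocks.get(blockIdx));
                // ...then park the cursor at its end so we can replay it backwards
                current = buffer.listIterator(buffer.size());
            }
            return true;
        }

        public Integer next()
        {
            if (!hasNext())
                throw new NoSuchElementException();
            return current.previous();
        }

        public void remove()
        {
            throw new UnsupportedOperationException();
        }

        public static void main(String[] args)
        {
            List<List<Integer>> blocks = Arrays.asList(
                Arrays.asList(1, 2, 3), Arrays.asList(4, 5), Arrays.asList(6, 7, 8));
            ReverseBlockIterator it = new ReverseBlockIterator(blocks);
            while (it.hasNext())
                System.out.print(it.next() + " "); // prints: 8 7 6 5 4 3 2 1
        }
    }

The trade-off is the same one the patch makes: memory proportional to a single block is spent to avoid backward seeks, which is why ReverseIndexedReader re-prepares a block per slice rather than trying to step backwards through the file.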
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java index 02072de..ec270dd 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java @@ -206,7 +206,7 @@ public class CommitLogArchiver descriptor = fromHeader; else descriptor = fromName; - if (descriptor.version > CommitLogDescriptor.VERSION_22) + if (descriptor.version > CommitLogDescriptor.current_version) throw new IllegalStateException("Unsupported commit log version: " + descriptor.version); if (descriptor.compression != null) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java index c4728fd..1eb640e 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java @@ -57,12 +57,13 @@ public class CommitLogDescriptor public static final int VERSION_20 = 3; public static final int VERSION_21 = 4; public static final int VERSION_22 = 5; + public static final int VERSION_30 = 6; /** * Increment this number if there is a change in the commit log disc layout or the MessagingVersion changes. * Note: make sure to handle {@link #getMessagingVersion()} */ @VisibleForTesting - public static final int current_version = VERSION_22; + public static final int current_version = VERSION_30; final int version; public final long id; @@ -195,6 +196,8 @@ public class CommitLogDescriptor return MessagingService.VERSION_21; case VERSION_22: return MessagingService.VERSION_22; + case VERSION_30: + return MessagingService.VERSION_30; default: throw new IllegalStateException("Unknown commitlog version " + version); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java index 176f64b..902f1c4 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java @@ -47,6 +47,8 @@ import org.apache.cassandra.concurrent.StageManager; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.Schema; import org.apache.cassandra.db.*; +import org.apache.cassandra.db.rows.SerializationHelper; +import org.apache.cassandra.db.partitions.PartitionUpdate; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.io.compress.CompressionParameters; import org.apache.cassandra.io.compress.ICompressor; @@ -55,7 +57,6 @@ import org.apache.cassandra.io.util.FastByteArrayInputStream; import org.apache.cassandra.io.util.FileDataInput; import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.io.util.RandomAccessReader; -import org.apache.cassandra.utils.ByteBufferUtil; import
org.apache.cassandra.utils.CRC32Factory; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.JVMStabilityInspector; @@ -195,7 +196,7 @@ public class CommitLogReplayer abstract static class ReplayFilter { - public abstract Iterable<ColumnFamily> filter(Mutation mutation); + public abstract Iterable<PartitionUpdate> filter(Mutation mutation); public abstract boolean includes(CFMetaData metadata); @@ -227,9 +228,9 @@ public class CommitLogReplayer private static class AlwaysReplayFilter extends ReplayFilter { - public Iterable<ColumnFamily> filter(Mutation mutation) + public Iterable<PartitionUpdate> filter(Mutation mutation) { - return mutation.getColumnFamilies(); + return mutation.getPartitionUpdates(); } public boolean includes(CFMetaData metadata) @@ -247,17 +248,17 @@ public class CommitLogReplayer this.toReplay = toReplay; } - public Iterable<ColumnFamily> filter(Mutation mutation) + public Iterable<PartitionUpdate> filter(Mutation mutation) { final Collection<String> cfNames = toReplay.get(mutation.getKeyspaceName()); if (cfNames == null) return Collections.emptySet(); - return Iterables.filter(mutation.getColumnFamilies(), new Predicate<ColumnFamily>() + return Iterables.filter(mutation.getPartitionUpdates(), new Predicate<PartitionUpdate>() { - public boolean apply(ColumnFamily cf) + public boolean apply(PartitionUpdate upd) { - return cfNames.contains(cf.metadata().cfName); + return cfNames.contains(upd.metadata().cfName); } }); } @@ -330,7 +331,8 @@ public class CommitLogReplayer { int uncompressedLength = reader.readInt(); replayEnd = replayPos + uncompressedLength; - } else + } + else { replayEnd = end; } @@ -478,11 +480,10 @@ public class CommitLogReplayer { mutation = Mutation.serializer.deserialize(new DataInputStream(bufIn), desc.getMessagingVersion(), - ColumnSerializer.Flag.LOCAL); + SerializationHelper.Flag.LOCAL); // double-check that what we read is [still] valid for the current schema - for (ColumnFamily cf : mutation.getColumnFamilies()) - for (Cell cell : cf) - cf.getComparator().validate(cell.name()); + for (PartitionUpdate upd : mutation.getPartitionUpdates()) + upd.validate(); } catch (UnknownColumnFamilyException ex) { @@ -515,7 +516,7 @@ public class CommitLogReplayer } if (logger.isDebugEnabled()) - logger.debug("replaying mutation for {}.{}: {}", mutation.getKeyspaceName(), ByteBufferUtil.bytesToHex(mutation.key()), "{" + StringUtils.join(mutation.getColumnFamilies().iterator(), ", ") + "}"); + logger.debug("replaying mutation for {}.{}: {}", mutation.getKeyspaceName(), mutation.key(), "{" + StringUtils.join(mutation.getPartitionUpdates().iterator(), ", ") + "}"); Runnable runnable = new WrappedRunnable() { @@ -534,12 +535,12 @@ public class CommitLogReplayer // or c) are part of a cf that was dropped. // Keep in mind that the cf.name() is suspect. Do everything based on the cfId instead.
Mutation newMutation = null; - for (ColumnFamily columnFamily : replayFilter.filter(mutation)) + for (PartitionUpdate update : replayFilter.filter(mutation)) { - if (Schema.instance.getCF(columnFamily.id()) == null) + if (Schema.instance.getCF(update.metadata().cfId) == null) continue; // dropped - ReplayPosition rp = cfPositions.get(columnFamily.id()); + ReplayPosition rp = cfPositions.get(update.metadata().cfId); // replay if current segment is newer than last flushed one or, // if it is the last known segment, if we are after the replay position @@ -547,7 +548,7 @@ public class CommitLogReplayer { if (newMutation == null) newMutation = new Mutation(mutation.getKeyspaceName(), mutation.key()); - newMutation.add(columnFamily); + newMutation.add(update); replayedCount.incrementAndGet(); } } @@ -571,9 +572,9 @@ public class CommitLogReplayer { long restoreTarget = CommitLog.instance.archiver.restorePointInTime; - for (ColumnFamily families : fm.getColumnFamilies()) + for (PartitionUpdate upd : fm.getPartitionUpdates()) { - if (CommitLog.instance.archiver.precision.toMillis(families.maxTimestamp()) > restoreTarget) + if (CommitLog.instance.archiver.precision.toMillis(upd.maxTimestamp()) > restoreTarget) return true; } return false; http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java index d748006..7473845 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java @@ -45,8 +45,8 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.config.Schema; -import org.apache.cassandra.db.ColumnFamily; import org.apache.cassandra.db.Mutation; +import org.apache.cassandra.db.partitions.PartitionUpdate; import org.apache.cassandra.io.FSWriteError; import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.utils.CLibrary; @@ -398,12 +398,12 @@ public abstract class CommitLogSegment void markDirty(Mutation mutation, int allocatedPosition) { - for (ColumnFamily columnFamily : mutation.getColumnFamilies()) + for (PartitionUpdate update : mutation.getPartitionUpdates()) { // check for deleted CFS - CFMetaData cfm = columnFamily.metadata(); + CFMetaData cfm = update.metadata(); if (cfm.isPurged()) - logger.error("Attempted to write commit log entry for unrecognized table: {}", columnFamily.id()); + logger.error("Attempted to write commit log entry for unrecognized table: {}", cfm.cfId); else ensureAtleast(cfDirty, cfm.cfId, allocatedPosition); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/compaction/AbstractCompactedRow.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactedRow.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactedRow.java deleted file mode 100644 index 16b5fac..0000000 --- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactedRow.java +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.db.compaction; - -import java.io.Closeable; -import java.io.IOException; -import java.security.MessageDigest; - -import org.apache.cassandra.db.DecoratedKey; -import org.apache.cassandra.db.RowIndexEntry; -import org.apache.cassandra.io.sstable.ColumnStats; -import org.apache.cassandra.io.util.SequentialWriter; - -/** - * a CompactedRow is an object that takes a bunch of rows (keys + columnfamilies) - * and can write a compacted version of those rows to an output stream. It does - * NOT necessarily require creating a merged CF object in memory. - */ -public abstract class AbstractCompactedRow implements Closeable -{ - public final DecoratedKey key; - - public AbstractCompactedRow(DecoratedKey key) - { - this.key = key; - } - - /** - * write the row (size + column index + filter + column data, but NOT row key) to @param out. - * - * write() may change internal state; it is NOT valid to call write() or update() a second time. - * - * @return index information for the written row, or null if the compaction resulted in only expired tombstones. - */ - public abstract RowIndexEntry write(long currentPosition, SequentialWriter out) throws IOException; - - /** - * update @param digest with the data bytes of the row (not including row key or row size). - * May be called even if empty. - * - * update() may change internal state; it is NOT valid to call write() or update() a second time. - */ - public abstract void update(MessageDigest digest); - - /** - * @return aggregate information about the columns in this row. Some fields may - * contain default values if computing them value would require extra effort we're not willing to make. - */ - public abstract ColumnStats columnStats(); -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/compaction/AbstractCompactionIterable.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionIterable.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionIterable.java deleted file mode 100644 index 9fe8fd9..0000000 --- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionIterable.java +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.db.compaction; - -import java.util.List; -import java.util.UUID; -import java.util.concurrent.atomic.AtomicLong; - -import org.apache.cassandra.io.sstable.ISSTableScanner; -import org.apache.cassandra.utils.CloseableIterator; - -public abstract class AbstractCompactionIterable extends CompactionInfo.Holder implements Iterable<AbstractCompactedRow> -{ - protected final OperationType type; - protected final CompactionController controller; - protected final long totalBytes; - protected volatile long bytesRead; - protected final List<ISSTableScanner> scanners; - protected final UUID compactionId; - /* - * counters for merged rows. - * array index represents (number of merged rows - 1), so index 0 is counter for no merge (1 row), - * index 1 is counter for 2 rows merged, and so on. - */ - protected final AtomicLong[] mergeCounters; - - public AbstractCompactionIterable(CompactionController controller, OperationType type, List<ISSTableScanner> scanners, UUID compactionId) - { - this.controller = controller; - this.type = type; - this.scanners = scanners; - this.bytesRead = 0; - this.compactionId = compactionId; - - long bytes = 0; - for (ISSTableScanner scanner : scanners) - bytes += scanner.getLengthInBytes(); - this.totalBytes = bytes; - mergeCounters = new AtomicLong[scanners.size()]; - for (int i = 0; i < mergeCounters.length; i++) - mergeCounters[i] = new AtomicLong(); - } - - public CompactionInfo getCompactionInfo() - { - return new CompactionInfo(controller.cfs.metadata, - type, - bytesRead, - totalBytes, - compactionId); - } - - protected void updateCounterFor(int rows) - { - assert rows > 0 && rows - 1 < mergeCounters.length; - mergeCounters[rows - 1].incrementAndGet(); - } - - public long[] getMergedRowCounts() - { - long[] counters = new long[mergeCounters.length]; - for (int i = 0; i < counters.length; i++) - counters[i] = mergeCounters[i].get(); - return counters; - } - - public abstract CloseableIterator<AbstractCompactedRow> iterator(); -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java index 28144ca..d8499ea 100644 --- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java +++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java @@ -36,6 +36,7 @@ import org.apache.cassandra.dht.Token; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.io.sstable.Component; import org.apache.cassandra.io.sstable.ISSTableScanner; +import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.JVMStabilityInspector; /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/compaction/CompactionController.java ---------------------------------------------------------------------- 
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionController.java b/src/java/org/apache/cassandra/db/compaction/CompactionController.java index 81d8b7c..303de15 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionController.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionController.java @@ -27,7 +27,7 @@ import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.lifecycle.SSTableIntervalTree; import org.apache.cassandra.db.lifecycle.Tracker; import org.apache.cassandra.db.DecoratedKey; -import org.apache.cassandra.db.RowPosition; +import org.apache.cassandra.db.PartitionPosition; import org.apache.cassandra.utils.AlwaysPresentFilter; import org.apache.cassandra.utils.OverlapIterator; @@ -44,7 +44,7 @@ public class CompactionController implements AutoCloseable public final ColumnFamilyStore cfs; private Refs<SSTableReader> overlappingSSTables; - private OverlapIterator<RowPosition, SSTableReader> overlapIterator; + private OverlapIterator<PartitionPosition, SSTableReader> overlapIterator; private final Iterable<SSTableReader> compacting; public final int gcBefore; @@ -189,9 +189,9 @@ public class CompactionController implements AutoCloseable return min; } - public void invalidateCachedRow(DecoratedKey key) + public void invalidateCachedPartition(DecoratedKey key) { - cfs.invalidateCachedRow(key); + cfs.invalidateCachedPartition(key); } public void close() http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java b/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java deleted file mode 100644 index 23d8a4a..0000000 --- a/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java +++ /dev/null @@ -1,96 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.cassandra.db.compaction; - -import java.util.ArrayList; -import java.util.Comparator; -import java.util.List; -import java.util.UUID; - -import com.google.common.collect.ImmutableList; - -import org.apache.cassandra.db.columniterator.OnDiskAtomIterator; -import org.apache.cassandra.io.sstable.format.SSTableFormat; -import org.apache.cassandra.io.sstable.ISSTableScanner; -import org.apache.cassandra.utils.CloseableIterator; -import org.apache.cassandra.utils.MergeIterator; - -public class CompactionIterable extends AbstractCompactionIterable -{ - final SSTableFormat format; - - private static final Comparator<OnDiskAtomIterator> comparator = new Comparator<OnDiskAtomIterator>() - { - public int compare(OnDiskAtomIterator i1, OnDiskAtomIterator i2) - { - return i1.getKey().compareTo(i2.getKey()); - } - }; - - public CompactionIterable(OperationType type, - List<ISSTableScanner> scanners, - CompactionController controller, - SSTableFormat.Type formatType, - UUID compactionId) - { - super(controller, type, scanners, compactionId); - this.format = formatType.info; - } - - public CloseableIterator<AbstractCompactedRow> iterator() - { - return MergeIterator.get(scanners, comparator, new Reducer()); - } - - public String toString() - { - return this.getCompactionInfo().toString(); - } - - protected class Reducer extends MergeIterator.Reducer<OnDiskAtomIterator, AbstractCompactedRow> - { - protected final List<OnDiskAtomIterator> rows = new ArrayList<>(); - - public void reduce(OnDiskAtomIterator current) - { - rows.add(current); - } - - protected AbstractCompactedRow getReduced() - { - assert !rows.isEmpty(); - - CompactionIterable.this.updateCounterFor(rows.size()); - try - { - // create a new container for rows, since we're going to clear ours for the next one, - // and the AbstractCompactionRow code should be able to assume that the collection it receives - // won't be pulled out from under it. - return format.getCompactedRowWriter(controller, ImmutableList.copyOf(rows)); - } - finally - { - rows.clear(); - long n = 0; - for (ISSTableScanner scanner : scanners) - n += scanner.getCurrentPosition(); - bytesRead = n; - } - } - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java new file mode 100644 index 0000000..b3cb370 --- /dev/null +++ b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java @@ -0,0 +1,299 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.cassandra.db.compaction; + +import java.util.UUID; +import java.util.List; + +import org.apache.cassandra.config.ColumnDefinition; +import org.apache.cassandra.db.*; +import org.apache.cassandra.db.index.SecondaryIndexManager; +import org.apache.cassandra.db.partitions.*; +import org.apache.cassandra.db.rows.*; +import org.apache.cassandra.io.sstable.ISSTableScanner; +import org.apache.cassandra.io.sstable.format.SSTableFormat; +import org.apache.cassandra.metrics.CompactionMetrics; + +/** + * Merge multiple iterators over the content of sstables into a "compacted" iterator. + * <p> + * On top of actually merging the source iterators, this class: + * <ul> + * <li>purges gc-able tombstones if possible (see PurgingPartitionIterator below).</li> + * <li>updates 2ndary indexes if necessary (as we don't read-before-write on index updates, index entries are + * not deleted on deletion of the base table data, which is ok because we'll fix index inconsistencies + * on reads. This however means that potentially obsolete index entries could be kept a long time for + * data that is not read often, so compaction "pro-actively" fixes such index entries. This is mainly + * an optimization).</li> + * <li>invalidates cached partitions that are empty post-compaction. This avoids keeping partitions with + * only purgeable tombstones in the row cache.</li> + * <li>keeps track of the compaction progress.</li> + * </ul> + */ +public class CompactionIterator extends CompactionInfo.Holder implements UnfilteredPartitionIterator +{ + private static final long UNFILTERED_TO_UPDATE_PROGRESS = 100; + + private final OperationType type; + private final CompactionController controller; + private final List<ISSTableScanner> scanners; + private final int nowInSec; + private final UUID compactionId; + + private final long totalBytes; + private long bytesRead; + + /* + * counters for merged rows. + * array index represents (number of merged rows - 1), so index 0 is counter for no merge (1 row), + * index 1 is counter for 2 rows merged, and so on. + */ + private final long[] mergeCounters; + + private final UnfilteredPartitionIterator mergedIterator; + private final CompactionMetrics metrics; + + // The number of rows/RTs merged by the iterator + private int merged; + + public CompactionIterator(OperationType type, List<ISSTableScanner> scanners, CompactionController controller, int nowInSec, UUID compactionId) + { + this(type, scanners, controller, nowInSec, compactionId, null); + } + + @SuppressWarnings("resource") // We make sure to close mergedIterator in close() and CompactionIterator is itself an AutoCloseable + public CompactionIterator(OperationType type, List<ISSTableScanner> scanners, CompactionController controller, int nowInSec, UUID compactionId, CompactionMetrics metrics) + { + this.controller = controller; + this.type = type; + this.scanners = scanners; + this.nowInSec = nowInSec; + this.compactionId = compactionId; + this.bytesRead = 0; + + long bytes = 0; + for (ISSTableScanner scanner : scanners) + bytes += scanner.getLengthInBytes(); + this.totalBytes = bytes; + this.mergeCounters = new long[scanners.size()]; + this.metrics = metrics; + + if (metrics != null) + metrics.beginCompaction(this); + + this.mergedIterator = scanners.isEmpty() + ?
UnfilteredPartitionIterators.EMPTY + : UnfilteredPartitionIterators.convertExpiredCellsToTombstones(new PurgingPartitionIterator(UnfilteredPartitionIterators.merge(scanners, nowInSec, listener()), controller), nowInSec); + } + + public boolean isForThrift() + { + return false; + } + + public CompactionInfo getCompactionInfo() + { + return new CompactionInfo(controller.cfs.metadata, + type, + bytesRead, + totalBytes, + compactionId); + } + + private void updateCounterFor(int rows) + { + assert rows > 0 && rows - 1 < mergeCounters.length; + mergeCounters[rows - 1] += 1; + } + + public long[] getMergedRowCounts() + { + return mergeCounters; + } + + private UnfilteredPartitionIterators.MergeListener listener() + { + return new UnfilteredPartitionIterators.MergeListener() + { + public UnfilteredRowIterators.MergeListener getRowMergeListener(DecoratedKey partitionKey, List<UnfilteredRowIterator> versions) + { + int merged = 0; + for (UnfilteredRowIterator iter : versions) + { + if (iter != null) + merged++; + } + + assert merged > 0; + + CompactionIterator.this.updateCounterFor(merged); + + /* + * The row level listener does 2 things: + * - It updates 2ndary indexes for deleted/shadowed cells + * - It updates progress regularly (every UNFILTERED_TO_UPDATE_PROGRESS) + */ + final SecondaryIndexManager.Updater indexer = type == OperationType.COMPACTION + ? controller.cfs.indexManager.gcUpdaterFor(partitionKey, nowInSec) + : SecondaryIndexManager.nullUpdater; + + return new UnfilteredRowIterators.MergeListener() + { + private Clustering clustering; + + public void onMergePartitionLevelDeletion(DeletionTime mergedDeletion, DeletionTime[] versions) + { + } + + public void onMergingRows(Clustering clustering, LivenessInfo mergedInfo, DeletionTime mergedDeletion, Row[] versions) + { + this.clustering = clustering; + } + + public void onMergedComplexDeletion(ColumnDefinition c, DeletionTime mergedCompositeDeletion, DeletionTime[] versions) + { + } + + public void onMergedCells(Cell mergedCell, Cell[] versions) + { + if (indexer == SecondaryIndexManager.nullUpdater) + return; + + for (int i = 0; i < versions.length; i++) + { + Cell version = versions[i]; + if (version != null && (mergedCell == null || !mergedCell.equals(version))) + indexer.remove(clustering, version); + } + } + + public void onRowDone() + { + int merged = ++CompactionIterator.this.merged; + if (merged % UNFILTERED_TO_UPDATE_PROGRESS == 0) + updateBytesRead(); + } + + public void onMergedRangeTombstoneMarkers(RangeTombstoneMarker mergedMarker, RangeTombstoneMarker[] versions) + { + int merged = ++CompactionIterator.this.merged; + if (merged % UNFILTERED_TO_UPDATE_PROGRESS == 0) + updateBytesRead(); + } + + public void close() + { + } + }; + } + + public void close() + { + } + }; + } + + private void updateBytesRead() + { + long n = 0; + for (ISSTableScanner scanner : scanners) + n += scanner.getCurrentPosition(); + bytesRead = n; + } + + public boolean hasNext() + { + return mergedIterator.hasNext(); + } + + public UnfilteredRowIterator next() + { + return mergedIterator.next(); + } + + public void remove() + { + throw new UnsupportedOperationException(); + } + + public void close() + { + try + { + mergedIterator.close(); + } + finally + { + if (metrics != null) + metrics.finishCompaction(this); + } + } + + public String toString() + { + return this.getCompactionInfo().toString(); + } + + private class PurgingPartitionIterator extends TombstonePurgingPartitionIterator + { + private final CompactionController controller; + + private 
DecoratedKey currentKey; + private long maxPurgeableTimestamp; + private boolean hasCalculatedMaxPurgeableTimestamp; + + private PurgingPartitionIterator(UnfilteredPartitionIterator toPurge, CompactionController controller) + { + super(toPurge, controller.gcBefore); + this.controller = controller; + } + + @Override + protected void onEmpty(DecoratedKey key) + { + if (type == OperationType.COMPACTION) + controller.cfs.invalidateCachedPartition(key); + } + + @Override + protected boolean shouldFilter(UnfilteredRowIterator iterator) + { + currentKey = iterator.partitionKey(); + hasCalculatedMaxPurgeableTimestamp = false; + + // TODO: we might be able to skip filtering if UnfilteredRowIterator gave us some stats + // (like the smallest local deletion time). + return true; + } + + /* + * Tombstones with a localDeletionTime before this can be purged. This is the minimum timestamp for any sstable + * containing `currentKey` outside of the set of sstables involved in this compaction. This is computed lazily + * on demand as we only need it if there are tombstones, and it is a bit expensive (see #8914). + */ + protected long getMaxPurgeableTimestamp() + { + if (!hasCalculatedMaxPurgeableTimestamp) + { + hasCalculatedMaxPurgeableTimestamp = true; + maxPurgeableTimestamp = controller.maxPurgeableTimestamp(currentKey); + } + return maxPurgeableTimestamp; + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/compaction/CompactionManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java index 763871e..a6c3d8c 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java @@ -41,7 +41,6 @@ import javax.management.ObjectName; import javax.management.openmbean.OpenDataException; import javax.management.openmbean.TabularData; -import com.google.common.base.Predicate; import com.google.common.base.Throwables; import com.google.common.collect.*; import com.google.common.util.concurrent.*; @@ -57,6 +56,7 @@ import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.config.Schema; import org.apache.cassandra.db.*; import org.apache.cassandra.db.compaction.CompactionInfo.Holder; +import org.apache.cassandra.db.rows.*; import org.apache.cassandra.db.index.SecondaryIndexBuilder; import org.apache.cassandra.db.lifecycle.LifecycleTransaction; import org.apache.cassandra.dht.Bounds; @@ -71,7 +71,6 @@ import org.apache.cassandra.metrics.CompactionMetrics; import org.apache.cassandra.repair.Validator; import org.apache.cassandra.service.ActiveRepairService; import org.apache.cassandra.service.StorageService; -import org.apache.cassandra.utils.CloseableIterator; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.JVMStabilityInspector; import org.apache.cassandra.utils.MerkleTree; @@ -235,7 +234,7 @@ public class CompactionManager implements CompactionManagerMBean } CompactionStrategyManager strategy = cfs.getCompactionStrategyManager(); - AbstractCompactionTask task = strategy.getNextBackgroundTask(getDefaultGcBefore(cfs)); + AbstractCompactionTask task = strategy.getNextBackgroundTask(getDefaultGcBefore(cfs, FBUtilities.nowInSeconds())); if (task == null) { logger.debug("No tasks available"); @@ -426,7 +425,7 @@ public class CompactionManager
implements CompactionManagerMBean @Override public void execute(LifecycleTransaction txn) throws IOException { - CleanupStrategy cleanupStrategy = CleanupStrategy.get(cfStore, ranges); + CleanupStrategy cleanupStrategy = CleanupStrategy.get(cfStore, ranges, FBUtilities.nowInSeconds()); doCleanupOne(cfStore, txn, cleanupStrategy, ranges, hasIndexes); } }, OperationType.CLEANUP); @@ -541,7 +540,7 @@ public class CompactionManager implements CompactionManagerMBean public void performMaximal(final ColumnFamilyStore cfStore, boolean splitOutput) { - FBUtilities.waitOnFutures(submitMaximal(cfStore, getDefaultGcBefore(cfStore), splitOutput)); + FBUtilities.waitOnFutures(submitMaximal(cfStore, getDefaultGcBefore(cfStore, FBUtilities.nowInSeconds()), splitOutput)); } public List<Future<?>> submitMaximal(final ColumnFamilyStore cfStore, final int gcBefore, boolean splitOutput) @@ -595,8 +594,9 @@ public class CompactionManager implements CompactionManagerMBean } List<Future<?>> futures = new ArrayList<>(); + int nowInSec = FBUtilities.nowInSeconds(); for (ColumnFamilyStore cfs : descriptors.keySet()) - futures.add(submitUserDefined(cfs, descriptors.get(cfs), getDefaultGcBefore(cfs))); + futures.add(submitUserDefined(cfs, descriptors.get(cfs), getDefaultGcBefore(cfs, nowInSec))); FBUtilities.waitOnFutures(futures); } @@ -817,29 +817,29 @@ public class CompactionManager implements CompactionManagerMBean if (compactionFileLocation == null) throw new IOException("disk full"); - ISSTableScanner scanner = cleanupStrategy.getScanner(sstable, getRateLimiter()); - CleanupInfo ci = new CleanupInfo(sstable, scanner); - - metrics.beginCompaction(ci); List<SSTableReader> finished; + int nowInSec = FBUtilities.nowInSeconds(); try (SSTableRewriter writer = new SSTableRewriter(cfs, txn, sstable.maxDataAge, false); - CompactionController controller = new CompactionController(cfs, txn.originals(), getDefaultGcBefore(cfs))) + ISSTableScanner scanner = cleanupStrategy.getScanner(sstable, getRateLimiter()); + CompactionController controller = new CompactionController(cfs, txn.originals(), getDefaultGcBefore(cfs, nowInSec)); + CompactionIterator ci = new CompactionIterator(OperationType.CLEANUP, Collections.singletonList(scanner), controller, nowInSec, UUIDGen.getTimeUUID(), metrics)) { writer.switchWriter(createWriter(cfs, compactionFileLocation, expectedBloomFilterSize, sstable.getSSTableMetadata().repairedAt, sstable)); - while (scanner.hasNext()) + while (ci.hasNext()) { if (ci.isStopRequested()) throw new CompactionInterruptedException(ci.getCompactionInfo()); - @SuppressWarnings("resource") - SSTableIdentityIterator row = cleanupStrategy.cleanup((SSTableIdentityIterator) scanner.next()); - if (row == null) - continue; - @SuppressWarnings("resource") - AbstractCompactedRow compactedRow = new LazilyCompactedRow(controller, Collections.singletonList(row)); - if (writer.append(compactedRow) != null) - totalkeysWritten++; + try (UnfilteredRowIterator partition = ci.next(); + UnfilteredRowIterator notCleaned = cleanupStrategy.cleanup(partition)) + { + if (notCleaned == null) + continue; + + if (writer.append(notCleaned) != null) + totalkeysWritten++; + } } // flush to ensure we don't lose the tombstones on a restart, since they are not commitlog'd @@ -847,11 +847,6 @@ public class CompactionManager implements CompactionManagerMBean finished = writer.finish(); } - finally - { - scanner.close(); - metrics.finishCompaction(ci); - } if (!finished.isEmpty()) { @@ -869,23 +864,30 @@ public class CompactionManager implements 
CompactionManagerMBean private static abstract class CleanupStrategy { - public static CleanupStrategy get(ColumnFamilyStore cfs, Collection<Range<Token>> ranges) + protected final Collection<Range<Token>> ranges; + protected final int nowInSec; + + protected CleanupStrategy(Collection<Range<Token>> ranges, int nowInSec) + { + this.ranges = ranges; + this.nowInSec = nowInSec; + } + + public static CleanupStrategy get(ColumnFamilyStore cfs, Collection<Range<Token>> ranges, int nowInSec) { return cfs.indexManager.hasIndexes() - ? new Full(cfs, ranges) - : new Bounded(cfs, ranges); + ? new Full(cfs, ranges, nowInSec) + : new Bounded(cfs, ranges, nowInSec); } public abstract ISSTableScanner getScanner(SSTableReader sstable, RateLimiter limiter); - public abstract SSTableIdentityIterator cleanup(SSTableIdentityIterator row); + public abstract UnfilteredRowIterator cleanup(UnfilteredRowIterator partition); private static final class Bounded extends CleanupStrategy { - private final Collection<Range<Token>> ranges; - - public Bounded(final ColumnFamilyStore cfs, Collection<Range<Token>> ranges) + public Bounded(final ColumnFamilyStore cfs, Collection<Range<Token>> ranges, int nowInSec) { - this.ranges = ranges; + super(ranges, nowInSec); cacheCleanupExecutor.submit(new Runnable() { @Override @@ -894,8 +896,8 @@ public class CompactionManager implements CompactionManagerMBean cfs.cleanupCache(); } }); - } + @Override public ISSTableScanner getScanner(SSTableReader sstable, RateLimiter limiter) { @@ -903,23 +905,20 @@ public class CompactionManager implements CompactionManagerMBean } @Override - public SSTableIdentityIterator cleanup(SSTableIdentityIterator row) + public UnfilteredRowIterator cleanup(UnfilteredRowIterator partition) { - return row; + return partition; } } private static final class Full extends CleanupStrategy { - private final Collection<Range<Token>> ranges; private final ColumnFamilyStore cfs; - private List<Cell> indexedColumnsInRow; - public Full(ColumnFamilyStore cfs, Collection<Range<Token>> ranges) + public Full(ColumnFamilyStore cfs, Collection<Range<Token>> ranges, int nowInSec) { + super(ranges, nowInSec); this.cfs = cfs; - this.ranges = ranges; - this.indexedColumnsInRow = null; } @Override @@ -929,36 +928,17 @@ public class CompactionManager implements CompactionManagerMBean } @Override - public SSTableIdentityIterator cleanup(SSTableIdentityIterator row) + public UnfilteredRowIterator cleanup(UnfilteredRowIterator partition) { - if (Range.isInRanges(row.getKey().getToken(), ranges)) - return row; + if (Range.isInRanges(partition.partitionKey().getToken(), ranges)) + return partition; - cfs.invalidateCachedRow(row.getKey()); + cfs.invalidateCachedPartition(partition.partitionKey()); - if (indexedColumnsInRow != null) - indexedColumnsInRow.clear(); - - while (row.hasNext()) - { - OnDiskAtom column = row.next(); - - if (column instanceof Cell && cfs.indexManager.indexes((Cell) column)) - { - if (indexedColumnsInRow == null) - indexedColumnsInRow = new ArrayList<>(); - - indexedColumnsInRow.add((Cell) column); - } - } - - if (indexedColumnsInRow != null && !indexedColumnsInRow.isEmpty()) + // acquire memtable lock here because secondary index deletion may cause a race. See CASSANDRA-3712 + try (OpOrder.Group opGroup = cfs.keyspace.writeOrder.start()) { - // acquire memtable lock here because secondary index deletion may cause a race. 
See CASSANDRA-3712 - try (OpOrder.Group opGroup = cfs.keyspace.writeOrder.start()) - { - cfs.indexManager.deleteFromIndexes(row.getKey(), indexedColumnsInRow, opGroup); - } + cfs.indexManager.deleteFromIndexes(partition, opGroup, nowInSec); } return null; } @@ -978,14 +958,15 @@ public class CompactionManager implements CompactionManagerMBean expectedBloomFilterSize, repairedAt, sstable.getSSTableLevel(), - cfs.partitioner); + cfs.partitioner, + sstable.header); } public static SSTableWriter createWriterForAntiCompaction(ColumnFamilyStore cfs, - File compactionFileLocation, - int expectedBloomFilterSize, - long repairedAt, - Collection<SSTableReader> sstables) + File compactionFileLocation, + int expectedBloomFilterSize, + long repairedAt, + Collection<SSTableReader> sstables) { FileUtils.createDirectory(compactionFileLocation); int minLevel = Integer.MAX_VALUE; @@ -1008,7 +989,8 @@ public class CompactionManager implements CompactionManagerMBean repairedAt, cfs.metadata, cfs.partitioner, - new MetadataCollector(sstables, cfs.metadata.comparator, minLevel)); + new MetadataCollector(sstables, cfs.metadata.comparator, minLevel), + SerializationHeader.make(cfs.metadata, sstables)); } @@ -1033,6 +1015,7 @@ public class CompactionManager implements CompactionManagerMBean String snapshotName = validator.desc.sessionId.toString(); int gcBefore; + int nowInSec = FBUtilities.nowInSeconds(); boolean isSnapshotValidation = cfs.snapshotExists(snapshotName); if (isSnapshotValidation) { @@ -1046,7 +1029,7 @@ public class CompactionManager implements CompactionManagerMBean // this at a different time (that's the whole purpose of repair with snapshot). So instead we take the creation // time of the snapshot, which should give us roughly the same time on each replica (roughly being in that case // 'as good as' in the non-snapshot case) - gcBefore = cfs.gcBefore(cfs.getSnapshotCreationTime(snapshotName)); + gcBefore = cfs.gcBefore((int)(cfs.getSnapshotCreationTime(snapshotName) / 1000)); } else { @@ -1084,7 +1067,7 @@ public class CompactionManager implements CompactionManagerMBean if (validator.gcBefore > 0) gcBefore = validator.gcBefore; else - gcBefore = getDefaultGcBefore(cfs); + gcBefore = getDefaultGcBefore(cfs, nowInSec); } // Create Merkle tree suitable to hold estimated partitions for given range.
@@ -1099,32 +1082,28 @@ public class CompactionManager implements CompactionManagerMBean MerkleTree tree = new MerkleTree(cfs.partitioner, validator.desc.range, MerkleTree.RECOMMENDED_DEPTH, (int) Math.pow(2, depth)); long start = System.nanoTime(); - try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategyManager().getScanners(sstables, validator.desc.range)) + try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategyManager().getScanners(sstables, validator.desc.range); + ValidationCompactionController controller = new ValidationCompactionController(cfs, gcBefore); + CompactionIterator ci = new ValidationCompactionIterator(scanners.scanners, controller, nowInSec, metrics)) { - CompactionIterable ci = new ValidationCompactionIterable(cfs, scanners.scanners, gcBefore); - Iterator<AbstractCompactedRow> iter = ci.iterator(); - metrics.beginCompaction(ci); - try + // validate the CF as we iterate over it + validator.prepare(cfs, tree); + while (ci.hasNext()) { - // validate the CF as we iterate over it - validator.prepare(cfs, tree); - while (iter.hasNext()) + if (ci.isStopRequested()) + throw new CompactionInterruptedException(ci.getCompactionInfo()); + try (UnfilteredRowIterator partition = ci.next()) { - if (ci.isStopRequested()) - throw new CompactionInterruptedException(ci.getCompactionInfo()); - AbstractCompactedRow row = iter.next(); - validator.add(row); + validator.add(partition); } - validator.complete(); } - finally + validator.complete(); + } + finally + { + if (isSnapshotValidation) { - if (isSnapshotValidation) - { - cfs.clearSnapshot(snapshotName); - } - - metrics.finishCompaction(ci); + cfs.clearSnapshot(snapshotName); } } @@ -1209,45 +1188,38 @@ public class CompactionManager implements CompactionManagerMBean File destination = cfs.directories.getWriteableLocationAsFile(cfs.getExpectedCompactedFileSize(sstableAsSet, OperationType.ANTICOMPACTION)); long repairedKeyCount = 0; long unrepairedKeyCount = 0; + int nowInSec = FBUtilities.nowInSeconds(); + CompactionStrategyManager strategy = cfs.getCompactionStrategyManager(); try (SSTableRewriter repairedSSTableWriter = new SSTableRewriter(cfs, anticompactionGroup, groupMaxDataAge, false, false); SSTableRewriter unRepairedSSTableWriter = new SSTableRewriter(cfs, anticompactionGroup, groupMaxDataAge, false, false); AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(anticompactionGroup.originals()); - CompactionController controller = new CompactionController(cfs, sstableAsSet, getDefaultGcBefore(cfs))) + CompactionController controller = new CompactionController(cfs, sstableAsSet, getDefaultGcBefore(cfs, nowInSec)); + CompactionIterator ci = new CompactionIterator(OperationType.ANTICOMPACTION, scanners.scanners, controller, nowInSec, UUIDGen.getTimeUUID(), metrics)) { int expectedBloomFilterSize = Math.max(cfs.metadata.getMinIndexInterval(), (int)(SSTableReader.getApproximateKeyCount(sstableAsSet))); repairedSSTableWriter.switchWriter(CompactionManager.createWriterForAntiCompaction(cfs, destination, expectedBloomFilterSize, repairedAt, sstableAsSet)); unRepairedSSTableWriter.switchWriter(CompactionManager.createWriterForAntiCompaction(cfs, destination, expectedBloomFilterSize, ActiveRepairService.UNREPAIRED_SSTABLE, sstableAsSet)); - CompactionIterable ci = new CompactionIterable(OperationType.ANTICOMPACTION, scanners.scanners, controller, DatabaseDescriptor.getSSTableFormat(), UUIDGen.getTimeUUID()); - metrics.beginCompaction(ci); - try + while (ci.hasNext()) { - 
@SuppressWarnings("resource") - CloseableIterator<AbstractCompactedRow> iter = ci.iterator(); - while (iter.hasNext()) + try (UnfilteredRowIterator partition = ci.next()) { - @SuppressWarnings("resource") - AbstractCompactedRow row = iter.next(); // if current range from sstable is repaired, save it into the new repaired sstable - if (Range.isInRanges(row.key.getToken(), ranges)) + if (Range.isInRanges(partition.partitionKey().getToken(), ranges)) { - repairedSSTableWriter.append(row); + repairedSSTableWriter.append(partition); repairedKeyCount++; } // otherwise save into the new 'non-repaired' table else { - unRepairedSSTableWriter.append(row); + unRepairedSSTableWriter.append(partition); unrepairedKeyCount++; } } } - finally - { - metrics.finishCompaction(ci); - } List<SSTableReader> anticompactedSSTables = new ArrayList<>(); // since both writers are operating over the same Transaction, we cannot use the convenience Transactional.finish() method, @@ -1342,19 +1314,18 @@ public class CompactionManager implements CompactionManagerMBean return executor.submit(runnable); } - public static int getDefaultGcBefore(ColumnFamilyStore cfs) + public static int getDefaultGcBefore(ColumnFamilyStore cfs, int nowInSec) { // 2ndary indexes have ExpiringColumns too, so we need to purge tombstones deleted before now. We do not need to // add any GcGrace however since 2ndary indexes are local to a node. - return cfs.isIndex() ? (int) (System.currentTimeMillis() / 1000) : cfs.gcBefore(System.currentTimeMillis()); + return cfs.isIndex() ? nowInSec : cfs.gcBefore(nowInSec); } - private static class ValidationCompactionIterable extends CompactionIterable + private static class ValidationCompactionIterator extends CompactionIterator { - @SuppressWarnings("resource") - public ValidationCompactionIterable(ColumnFamilyStore cfs, List<ISSTableScanner> scanners, int gcBefore) + public ValidationCompactionIterator(List<ISSTableScanner> scanners, ValidationCompactionController controller, int nowInSec, CompactionMetrics metrics) { - super(OperationType.VALIDATION, scanners, new ValidationCompactionController(cfs, gcBefore), DatabaseDescriptor.getSSTableFormat(), UUIDGen.getTimeUUID()); + super(OperationType.VALIDATION, scanners, controller, nowInSec, UUIDGen.getTimeUUID(), metrics); } } @@ -1518,36 +1489,6 @@ public class CompactionManager implements CompactionManagerMBean return metrics.completedTasks.getValue(); } - private static class CleanupInfo extends CompactionInfo.Holder - { - private final SSTableReader sstable; - private final ISSTableScanner scanner; - private final UUID cleanupCompactionId; - - public CleanupInfo(SSTableReader sstable, ISSTableScanner scanner) - { - this.sstable = sstable; - this.scanner = scanner; - cleanupCompactionId = UUIDGen.getTimeUUID(); - } - - public CompactionInfo getCompactionInfo() - { - try - { - return new CompactionInfo(sstable.metadata, - OperationType.CLEANUP, - scanner.getCurrentPosition(), - scanner.getLengthInBytes(), - cleanupCompactionId); - } - catch (Exception e) - { - throw new RuntimeException(); - } - } - } - public void stopCompaction(String type) { OperationType operation = OperationType.valueOf(type); http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
index dd6261c..cfe28e8 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
@@ -339,11 +339,11 @@ public class CompactionStrategyManager implements INotificationConsumer
      * @param range
      * @return
      */
+    @SuppressWarnings("resource")
     public synchronized AbstractCompactionStrategy.ScannerList getScanners(Collection<SSTableReader> sstables, Range<Token> range)
     {
         List<SSTableReader> repairedSSTables = new ArrayList<>();
         List<SSTableReader> unrepairedSSTables = new ArrayList<>();
-
         for (SSTableReader sstable : sstables)
         {
             if (sstable.isRepaired())
@@ -352,24 +352,24 @@ public class CompactionStrategyManager implements INotificationConsumer
                 unrepairedSSTables.add(sstable);
         }

-        List<ISSTableScanner> scanners = new ArrayList<>();
-        if (!repairedSSTables.isEmpty())
-            scanners.addAll(repaired.getScanners(repairedSSTables, range).scanners);
-        if (!unrepairedSSTables.isEmpty())
-            scanners.addAll(unrepaired.getScanners(unrepairedSSTables, range).scanners);
+        AbstractCompactionStrategy.ScannerList repairedScanners = repaired.getScanners(repairedSSTables, range);
+        AbstractCompactionStrategy.ScannerList unrepairedScanners = unrepaired.getScanners(unrepairedSSTables, range);
+        List<ISSTableScanner> scanners = new ArrayList<>(repairedScanners.scanners.size() + unrepairedScanners.scanners.size());
+        scanners.addAll(repairedScanners.scanners);
+        scanners.addAll(unrepairedScanners.scanners);
         return new AbstractCompactionStrategy.ScannerList(scanners);
     }

-    public Collection<Collection<SSTableReader>> groupSSTablesForAntiCompaction(Collection<SSTableReader> sstablesToGroup)
+    public synchronized AbstractCompactionStrategy.ScannerList getScanners(Collection<SSTableReader> sstables)
     {
-        return unrepaired.groupSSTablesForAntiCompaction(sstablesToGroup);
+        return getScanners(sstables, null);
     }

-    public synchronized AbstractCompactionStrategy.ScannerList getScanners(Collection<SSTableReader> sstables)
+    public Collection<Collection<SSTableReader>> groupSSTablesForAntiCompaction(Collection<SSTableReader> sstablesToGroup)
     {
-        return getScanners(sstables, null);
+        return unrepaired.groupSSTablesForAntiCompaction(sstablesToGroup);
     }

     public long getMaxSSTableBytes()
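The rewritten getScanners drops the isEmpty() guards, calls both strategies unconditionally, and pre-sizes the combined list so addAll never triggers a resize; the added @SuppressWarnings("resource") appears to document that ownership of the scanners passes to the returned ScannerList rather than being closed in this method. The list-merging shape in isolation, with a generic element type instead of ISSTableScanner:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class MergeScannersSketch
    {
        static <T> List<T> merge(List<T> repaired, List<T> unrepaired)
        {
            // Pre-size to the exact final size so neither addAll causes a resize.
            List<T> merged = new ArrayList<>(repaired.size() + unrepaired.size());
            merged.addAll(repaired);
            merged.addAll(unrepaired);
            return merged;
        }

        public static void main(String[] args)
        {
            System.out.println(merge(Arrays.asList("r1", "r2"), Arrays.asList("u1"))); // [r1, r2, u1]
        }
    }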
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index 7dbeb44..911b4af 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@ -45,6 +45,7 @@ import org.apache.cassandra.db.compaction.CompactionManager.CompactionExecutorSt
 import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.utils.CloseableIterator;
+import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.UUIDGen;
 import org.apache.cassandra.utils.concurrent.Refs;

@@ -142,70 +143,63 @@ public class CompactionTask extends AbstractCompactionTask
         logger.info("Compacting ({}) {}", taskIdLoggerMsg, ssTableLoggerMsg);

         long start = System.nanoTime();
-        long totalKeysWritten = 0;
-        long estimatedKeys = 0;
         try (CompactionController controller = getCompactionController(transaction.originals()))
         {
             Set<SSTableReader> actuallyCompact = Sets.difference(transaction.originals(), controller.getFullyExpiredSSTables());

-            SSTableFormat.Type sstableFormat = getFormatType(transaction.originals());
-
             List<SSTableReader> newSStables;
-            AbstractCompactionIterable ci;
+
+            long[] mergedRowCounts;

             // SSTableScanners need to be closed before markCompactedSSTablesReplaced call as scanners contain references
             // to both ifile and dfile and SSTR will throw deletion errors on Windows if it tries to delete before scanner is closed.
             // See CASSANDRA-8019 and CASSANDRA-8399
+            int nowInSec = FBUtilities.nowInSeconds();
             try (Refs<SSTableReader> refs = Refs.ref(actuallyCompact);
-                 AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(actuallyCompact))
+                 AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(actuallyCompact);
+                 CompactionIterator ci = new CompactionIterator(compactionType, scanners.scanners, controller, nowInSec, taskId))
             {
+                if (collector != null)
+                    collector.beginCompaction(ci);
+                long lastCheckObsoletion = start;
+
+                if (!controller.cfs.getCompactionStrategyManager().isActive)
+                    throw new CompactionInterruptedException(ci.getCompactionInfo());
-                ci = new CompactionIterable(compactionType, scanners.scanners, controller, sstableFormat, taskId);
-                try (CloseableIterator<AbstractCompactedRow> iter = ci.iterator())
+                try (CompactionAwareWriter writer = getCompactionAwareWriter(cfs, transaction, actuallyCompact))
                 {
-                    if (collector != null)
-                        collector.beginCompaction(ci);
-                    long lastCheckObsoletion = start;
+                    estimatedKeys = writer.estimatedKeys();
+                    while (ci.hasNext())
+                    {
+                        if (ci.isStopRequested())
+                            throw new CompactionInterruptedException(ci.getCompactionInfo());

-                    if (!controller.cfs.getCompactionStrategyManager().isActive)
-                        throw new CompactionInterruptedException(ci.getCompactionInfo());
+                        if (writer.append(ci.next()))
+                            totalKeysWritten++;

-                    try (CompactionAwareWriter writer = getCompactionAwareWriter(cfs, transaction, actuallyCompact))
-                    {
-                        estimatedKeys = writer.estimatedKeys();
-                        while (iter.hasNext())
+                        if (System.nanoTime() - lastCheckObsoletion > TimeUnit.MINUTES.toNanos(1L))
                         {
-                            if (ci.isStopRequested())
-                                throw new CompactionInterruptedException(ci.getCompactionInfo());
-
-                            try (AbstractCompactedRow row = iter.next())
-                            {
-                                if (writer.append(row))
-                                    totalKeysWritten++;
-
-                                if (System.nanoTime() - lastCheckObsoletion > TimeUnit.MINUTES.toNanos(1L))
-                                {
-                                    controller.maybeRefreshOverlaps();
-                                    lastCheckObsoletion = System.nanoTime();
-                                }
-                            }
+                            controller.maybeRefreshOverlaps();
+                            lastCheckObsoletion = System.nanoTime();
                         }
-
-                        // don't replace old sstables yet, as we need to mark the compaction finished in the system table
-                        newSStables = writer.finish();
                     }
-                    finally
-                    {
-                        // point of no return -- the new sstables are live on disk; next we'll start deleting the old ones
-                        // (in replaceCompactedSSTables)
-                        if (taskId != null)
-                            SystemKeyspace.finishCompaction(taskId);
-                        if (collector != null)
-                            collector.finishCompaction(ci);
-                    }
+                    // don't replace old sstables yet, as we need to mark the compaction finished in the system table
+                    newSStables = writer.finish();
+                }
+                finally
+                {
+                    // point of no return -- the new sstables are live on disk; next we'll start deleting the old ones
+                    // (in replaceCompactedSSTables)
+                    if (taskId != null)
+                        SystemKeyspace.finishCompaction(taskId);
+
+                    if (collector != null)
+                        collector.finishCompaction(ci);
+
+                    mergedRowCounts = ci.getMergedRowCounts();
                 }
             }
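Inside the rewritten loop, maybeRefreshOverlaps() is deliberately not invoked per partition; a nanoTime checkpoint limits it to roughly once a minute no matter how fast partitions are consumed. The throttling idiom in isolation, with the per-item work and the refresh call reduced to comments:

    import java.util.concurrent.TimeUnit;

    public class ThrottledCheckSketch
    {
        public static void main(String[] args)
        {
            long lastCheck = System.nanoTime();
            for (int i = 0; i < 1_000_000; i++)
            {
                // ... per-item work (writer.append(ci.next()) in the real code) ...

                // Cheap monotonic-clock comparison on every iteration; the
                // expensive refresh runs at most about once per minute.
                if (System.nanoTime() - lastCheck > TimeUnit.MINUTES.toNanos(1L))
                {
                    // controller.maybeRefreshOverlaps() in the real code
                    lastCheck = System.nanoTime();
                }
            }
        }
    }

Using the monotonic nanoTime clock rather than currentTimeMillis keeps the throttle immune to wall-clock adjustments during a long compaction.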
@@ -221,7 +215,7 @@ public class CompactionTask extends AbstractCompactionTask
         double mbps = dTime > 0 ? (double) endsize / (1024 * 1024) / ((double) dTime / 1000) : 0;

         long totalSourceRows = 0;
-        String mergeSummary = updateCompactionHistory(cfs.keyspace.getName(), cfs.getColumnFamilyName(), ci, startsize, endsize);
+        String mergeSummary = updateCompactionHistory(cfs.keyspace.getName(), cfs.getColumnFamilyName(), mergedRowCounts, startsize, endsize);
         logger.info(String.format("Compacted (%s) %d sstables to [%s] to level=%d. %,d bytes to %,d (~%d%% of original) in %,dms = %fMB/s. %,d total partitions merged to %,d. Partition merge counts were {%s}",
                                   taskIdLoggerMsg, transaction.originals().size(), newSSTableNames.toString(), getLevel(), startsize, endsize, (int) (ratio * 100), dTime, mbps, totalSourceRows, totalKeysWritten, mergeSummary));
         logger.debug(String.format("CF Total Bytes Compacted: %,d", CompactionTask.addToTotalBytesCompacted(endsize)));
@@ -242,14 +236,13 @@ public class CompactionTask extends AbstractCompactionTask
     }

-    public static String updateCompactionHistory(String keyspaceName, String columnFamilyName, AbstractCompactionIterable ci, long startSize, long endSize)
+    public static String updateCompactionHistory(String keyspaceName, String columnFamilyName, long[] mergedRowCounts, long startSize, long endSize)
     {
-        long[] counts = ci.getMergedRowCounts();
-        StringBuilder mergeSummary = new StringBuilder(counts.length * 10);
+        StringBuilder mergeSummary = new StringBuilder(mergedRowCounts.length * 10);
         Map<Integer, Long> mergedRows = new HashMap<>();
-        for (int i = 0; i < counts.length; i++)
+        for (int i = 0; i < mergedRowCounts.length; i++)
         {
-            long count = counts[i];
+            long count = mergedRowCounts[i];
             if (count == 0)
                 continue;

@@ -305,13 +298,4 @@ public class CompactionTask extends AbstractCompactionTask
         }
         return max;
     }
-
-    public static SSTableFormat.Type getFormatType(Collection<SSTableReader> sstables)
-    {
-        if (sstables.isEmpty() || !SSTableFormat.enableSSTableDevelopmentTestMode)
-            return DatabaseDescriptor.getSSTableFormat();
-
-        //Allows us to test compaction of non-default formats
-        return sstables.iterator().next().descriptor.formatType;
-    }
 }
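updateCompactionHistory now takes the raw long[] of merged-row counts, where index i holds how many output partitions were produced by merging i+1 input sstables. A sketch of how such an array folds into the "{n:count, ...}" merge summary seen in the compaction log line; the formatting details are assumed for illustration rather than lifted from the real method:

    public class MergeSummarySketch
    {
        static String summarize(long[] mergedRowCounts)
        {
            StringBuilder sb = new StringBuilder(mergedRowCounts.length * 10);
            for (int i = 0; i < mergedRowCounts.length; i++)
            {
                long count = mergedRowCounts[i];
                if (count == 0)
                    continue;
                int rows = i + 1; // index i means the output was merged from i+1 source sstables
                sb.append(String.format("%d:%d, ", rows, count));
            }
            return sb.toString();
        }

        public static void main(String[] args)
        {
            // e.g. 100 partitions existed in a single sstable, 5 were merged from two
            System.out.println(summarize(new long[]{ 100, 5 })); // prints "1:100, 2:5, "
        }
    }

Passing the array instead of the whole iterable also lets the counts be captured in the finally block above, before the CompactionIterator is closed.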
