Updated Branches: refs/heads/cassandra-1.2 6b093b4ea -> ac9f4786e
Improve removal of gcable tombstones during minor compaction
patch by Vijay; reviewed by Yuki Morishita for CASSANDRA-4671

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/ac9f4786
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/ac9f4786
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/ac9f4786

Branch: refs/heads/cassandra-1.2
Commit: ac9f4786e4d7b3b1a414838eec465f8553bf43b2
Parents: 6b093b4
Author: Vijay Parthasarathy <[email protected]>
Authored: Thu Dec 27 18:45:21 2012 -0800
Committer: Vijay Parthasarathy <[email protected]>
Committed: Thu Dec 27 18:45:21 2012 -0800

----------------------------------------------------------------------
 src/java/org/apache/cassandra/db/Column.java        |    5 ++
 src/java/org/apache/cassandra/db/ColumnFamily.java  |    4 +-
 .../org/apache/cassandra/db/ColumnFamilyStore.java  |    1 +
 src/java/org/apache/cassandra/db/DeletionInfo.java  |   10 ++++
 src/java/org/apache/cassandra/db/OnDiskAtom.java    |    3 +-
 .../org/apache/cassandra/db/RangeTombstone.java     |    5 ++
 src/java/org/apache/cassandra/db/SuperColumn.java   |    8 +++
 .../db/compaction/CompactionController.java         |    4 +-
 .../cassandra/db/compaction/CompactionManager.java  |    2 +-
 .../db/compaction/LazilyCompactedRow.java           |   10 +++-
 .../cassandra/db/compaction/PrecompactedRow.java    |    4 +-
 .../apache/cassandra/db/compaction/Scrubber.java    |    2 +-
 .../apache/cassandra/io/sstable/ColumnStats.java    |    4 +-
 .../apache/cassandra/io/sstable/Descriptor.java     |    5 ++-
 .../cassandra/io/sstable/SSTableMetadata.java       |   21 +++++++-
 .../apache/cassandra/io/sstable/SSTableReader.java  |    5 ++
 .../apache/cassandra/io/sstable/SSTableWriter.java  |    3 +
 .../db/compaction/CompactionsPurgeTest.java         |   39 ++++++++++++++-
 .../cassandra/io/sstable/DescriptorTest.java        |    1 +
 .../io/sstable/SSTableMetadataSerializerTest.java   |    4 ++
 .../apache/cassandra/streaming/BootstrapTest.java   |    2 +-
 21 files changed, 125 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/ac9f4786/src/java/org/apache/cassandra/db/Column.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Column.java b/src/java/org/apache/cassandra/db/Column.java
index 23eaa7e..616f3c0 100644
--- a/src/java/org/apache/cassandra/db/Column.java
+++ b/src/java/org/apache/cassandra/db/Column.java
@@ -102,6 +102,11 @@ public class Column implements IColumn
         return timestamp;
     }

+    public long minTimestamp()
+    {
+        return timestamp;
+    }
+
     public long maxTimestamp()
     {
         return timestamp;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ac9f4786/src/java/org/apache/cassandra/db/ColumnFamily.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamily.java b/src/java/org/apache/cassandra/db/ColumnFamily.java
index c2dd118..6f19fc4 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamily.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamily.java
@@ -380,16 +380,18 @@ public class ColumnFamily extends AbstractColumnContainer implements IRowCacheEn

     public ColumnStats getColumnStats()
     {
+        long minTimestampSeen = deletionInfo() == DeletionInfo.LIVE ? Long.MAX_VALUE : deletionInfo().minTimestamp();
         long maxTimestampSeen = deletionInfo().maxTimestamp();
         StreamingHistogram tombstones = new StreamingHistogram(SSTable.TOMBSTONE_HISTOGRAM_BIN_SIZE);
         for (IColumn column : columns)
         {
+            minTimestampSeen = Math.min(minTimestampSeen, column.minTimestamp());
             maxTimestampSeen = Math.max(maxTimestampSeen, column.maxTimestamp());
             int deletionTime = column.getLocalDeletionTime();
             if (deletionTime < Integer.MAX_VALUE)
                 tombstones.update(deletionTime);
         }
-        return new ColumnStats(getColumnCount(), maxTimestampSeen, tombstones);
+        return new ColumnStats(getColumnCount(), minTimestampSeen, maxTimestampSeen, tombstones);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ac9f4786/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 7b8a149..44585ce 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -1947,6 +1947,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         // and adds generation of live ancestors
         for (SSTableReader sstable : sstables)
         {
+            sstableMetadataCollector.updateMinTimestamp(sstable.getMinTimestamp());
             sstableMetadataCollector.updateMaxTimestamp(sstable.getMaxTimestamp());
             sstableMetadataCollector.addAncestor(sstable.descriptor.generation);
             for (Integer i : sstable.getAncestors())

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ac9f4786/src/java/org/apache/cassandra/db/DeletionInfo.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DeletionInfo.java b/src/java/org/apache/cassandra/db/DeletionInfo.java
index b63686f..be64224 100644
--- a/src/java/org/apache/cassandra/db/DeletionInfo.java
+++ b/src/java/org/apache/cassandra/db/DeletionInfo.java
@@ -178,6 +178,16 @@ public class DeletionInfo
         }
     }

+    public long minTimestamp()
+    {
+        long minTimestamp = topLevel.markedForDeleteAt;
+        for (RangeTombstone i : ranges)
+        {
+            minTimestamp = Math.min(minTimestamp, i.data.markedForDeleteAt);
+        }
+        return minTimestamp;
+    }
+
     /**
      * The maximum timestamp mentioned by this DeletionInfo.
      */

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ac9f4786/src/java/org/apache/cassandra/db/OnDiskAtom.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/OnDiskAtom.java b/src/java/org/apache/cassandra/db/OnDiskAtom.java
index f504ce8..7501d83 100644
--- a/src/java/org/apache/cassandra/db/OnDiskAtom.java
+++ b/src/java/org/apache/cassandra/db/OnDiskAtom.java
@@ -34,8 +34,9 @@ public interface OnDiskAtom

     /**
      * For a standard column, this is the same as timestamp().
-     * For a super column, this is the max column timestamp of the sub columns.
+     * For a super column, this is the min/max column timestamp of the sub columns.
      */
+    public long minTimestamp();
    public long maxTimestamp();
    public int getLocalDeletionTime(); // for tombstone GC, so int is sufficient granularity

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ac9f4786/src/java/org/apache/cassandra/db/RangeTombstone.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RangeTombstone.java b/src/java/org/apache/cassandra/db/RangeTombstone.java
index 1b85c59..1d472c3 100644
--- a/src/java/org/apache/cassandra/db/RangeTombstone.java
+++ b/src/java/org/apache/cassandra/db/RangeTombstone.java
@@ -57,6 +57,11 @@ public class RangeTombstone extends Interval<ByteBuffer, DeletionTime> implement
         return data.localDeletionTime;
     }

+    public long minTimestamp()
+    {
+        return data.markedForDeleteAt;
+    }
+
     public long maxTimestamp()
     {
         return data.markedForDeleteAt;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ac9f4786/src/java/org/apache/cassandra/db/SuperColumn.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SuperColumn.java b/src/java/org/apache/cassandra/db/SuperColumn.java
index f95dc8f..57e87c4 100644
--- a/src/java/org/apache/cassandra/db/SuperColumn.java
+++ b/src/java/org/apache/cassandra/db/SuperColumn.java
@@ -154,6 +154,14 @@ public class SuperColumn extends AbstractColumnContainer implements IColumn
         throw new UnsupportedOperationException("This operation is not supported for Super Columns.");
     }

+    public long minTimestamp()
+    {
+        long minTimestamp = getMarkedForDeleteAt();
+        for (IColumn subColumn : getSubColumns())
+            minTimestamp = Math.min(minTimestamp, subColumn.minTimestamp());
+        return minTimestamp;
+    }
+
     public long maxTimestamp()
     {
         long maxTimestamp = getMarkedForDeleteAt();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ac9f4786/src/java/org/apache/cassandra/db/compaction/CompactionController.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionController.java b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
index 2c8ddba..6104486 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionController.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
@@ -109,12 +109,12 @@ public class CompactionController
      * @return true if it's okay to drop tombstones for the given row, i.e., if we know all the verisons of the row
      * are included in the compaction set
      */
-    public boolean shouldPurge(DecoratedKey key)
+    public boolean shouldPurge(DecoratedKey key, long maxDeletionTimestamp)
     {
         List<SSTableReader> filteredSSTables = overlappingTree.search(key);
         for (SSTableReader sstable : filteredSSTables)
         {
-            if (sstable.getBloomFilter().isPresent(key.key))
+            if (sstable.getBloomFilter().isPresent(key.key) && sstable.getMinTimestamp() >= maxDeletionTimestamp)
                 return false;
         }
         return true;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ac9f4786/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 36fa00f..0a31888 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -929,7 +929,7 @@ public class CompactionManager implements CompactionManagerMBean
         }

         @Override
-        public boolean shouldPurge(DecoratedKey key)
+        public boolean shouldPurge(DecoratedKey key, long delTimestamp)
         {
             /*
              * The main reason we always purge is that including gcable tombstone would mean that the

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ac9f4786/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
index 084af98..7981d88 100644
--- a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
+++ b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
@@ -71,18 +71,20 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
         super(rows.get(0).getKey());
         this.rows = rows;
         this.controller = controller;
-        this.shouldPurge = controller.shouldPurge(key);
         indexer = controller.cfs.indexManager.updaterFor(key, false);

+        long maxDelTimestamp = Long.MIN_VALUE;
         for (OnDiskAtomIterator row : rows)
         {
             ColumnFamily cf = row.getColumnFamily();
+            maxDelTimestamp = Math.max(maxDelTimestamp, cf.deletionInfo().maxTimestamp());

             if (emptyColumnFamily == null)
                 emptyColumnFamily = cf;
             else
                 emptyColumnFamily.delete(cf);
         }
+        this.shouldPurge = controller.shouldPurge(key, maxDelTimestamp);

         try
         {
@@ -94,7 +96,9 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
         }
         // reach into the reducer used during iteration to get column count, size, max column timestamp
         // (however, if there are zero columns, iterator() will not be called by ColumnIndexer and reducer will be null)
-        columnStats = new ColumnStats(reducer == null ? 0 : reducer.columns, reducer == null ? Long.MIN_VALUE : reducer.maxTimestampSeen,
+        columnStats = new ColumnStats(reducer == null ? 0 : reducer.columns,
+                                      reducer == null ? Long.MAX_VALUE : reducer.minTimestampSeen,
+                                      reducer == null ? Long.MIN_VALUE : reducer.maxTimestampSeen,
                                       reducer == null ? new StreamingHistogram(SSTable.TOMBSTONE_HISTOGRAM_BIN_SIZE) : reducer.tombstones
         );
         columnSerializedSize = reducer == null ? 0 : reducer.serializedSize;
@@ -236,6 +240,7 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
         long serializedSize = 4; // int for column count
         int columns = 0;
+        long minTimestampSeen = Long.MAX_VALUE;
         long maxTimestampSeen = Long.MIN_VALUE;
         StreamingHistogram tombstones = new StreamingHistogram(SSTable.TOMBSTONE_HISTOGRAM_BIN_SIZE);

@@ -290,6 +295,7 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable

             serializedSize += reduced.serializedSizeForSSTable();
             columns++;
+            minTimestampSeen = Math.min(minTimestampSeen, reduced.minTimestamp());
             maxTimestampSeen = Math.max(maxTimestampSeen, reduced.maxTimestamp());
             int deletionTime = reduced.getLocalDeletionTime();
             if (deletionTime < Integer.MAX_VALUE)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ac9f4786/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java b/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
index 2d7f55a..be4b20e 100644
--- a/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
+++ b/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
@@ -59,7 +59,7 @@ public class PrecompactedRow extends AbstractCompactedRow

         Boolean shouldPurge = null;
         if (cf.hasIrrelevantData(controller.gcBefore))
-            shouldPurge = controller.shouldPurge(key);
+            shouldPurge = controller.shouldPurge(key, cf.maxTimestamp());

         // We should only gc tombstone if shouldPurge == true. But otherwise,
         // it is still ok to collect column that shadowed by their (deleted)
@@ -69,7 +69,7 @@ public class PrecompactedRow extends AbstractCompactedRow
         if (compacted != null && compacted.metadata().getDefaultValidator().isCommutative())
         {
             if (shouldPurge == null)
-                shouldPurge = controller.shouldPurge(key);
+                shouldPurge = controller.shouldPurge(key, cf.deletionInfo().maxTimestamp());
             if (shouldPurge)
                 CounterColumn.mergeAndRemoveOldShards(key, compacted, controller.gcBefore, controller.mergeShardBefore);
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ac9f4786/src/java/org/apache/cassandra/db/compaction/Scrubber.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Scrubber.java b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
index c6855e8..0601857 100644
--- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java
+++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
@@ -353,7 +353,7 @@ public class Scrubber implements Closeable
         }

         @Override
-        public boolean shouldPurge(DecoratedKey key)
+        public boolean shouldPurge(DecoratedKey key, long delTimestamp)
         {
             return false;
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ac9f4786/src/java/org/apache/cassandra/io/sstable/ColumnStats.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/ColumnStats.java b/src/java/org/apache/cassandra/io/sstable/ColumnStats.java
index a7dcfec..12ef534 100644
--- a/src/java/org/apache/cassandra/io/sstable/ColumnStats.java
+++ b/src/java/org/apache/cassandra/io/sstable/ColumnStats.java
@@ -28,13 +28,15 @@ public class ColumnStats
     public final int columnCount;

     /** the largest (client-supplied) timestamp in the row */
+    public final long minTimestamp;
     public final long maxTimestamp;

     /** histogram of tombstone drop time */
    public final StreamingHistogram tombstoneHistogram;

-    public ColumnStats(int columnCount, long maxTimestamp, StreamingHistogram tombstoneHistogram)
+    public ColumnStats(int columnCount, long minTimestamp, long maxTimestamp, StreamingHistogram tombstoneHistogram)
     {
+        this.minTimestamp = minTimestamp;
         this.maxTimestamp = maxTimestamp;
         this.columnCount = columnCount;
         this.tombstoneHistogram = tombstoneHistogram;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ac9f4786/src/java/org/apache/cassandra/io/sstable/Descriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/Descriptor.java b/src/java/org/apache/cassandra/io/sstable/Descriptor.java
index 18c8a41..cf1907e 100644
--- a/src/java/org/apache/cassandra/io/sstable/Descriptor.java
+++ b/src/java/org/apache/cassandra/io/sstable/Descriptor.java
@@ -47,7 +47,7 @@ public class Descriptor
     public static class Version
     {
         // This needs to be at the begining for initialization sake
-        private static final String current_version = "ia";
+        private static final String current_version = "ib";

         public static final Version LEGACY = new Version("a"); // "pre-history"
         // b (0.7.0): added version to sstable filenames
@@ -65,6 +65,7 @@ public class Descriptor
         // ia (1.2.0): column indexes are promoted to the index file
         //             records estimated histogram of deletion times in tombstones
         //             bloom filter (keys and columns) upgraded to Murmur3
+        // ib (1.2.1): tracks min client timestamp in metadata component

         public static final Version CURRENT = new Version(current_version);

@@ -77,6 +78,7 @@ public class Descriptor
         public final boolean metadataIncludesReplayPosition;
         public final boolean metadataIncludesModernReplayPosition;
         public final boolean tracksMaxTimestamp;
+        public final boolean tracksMinTimestamp;
         public final boolean hasCompressionRatio;
         public final boolean hasPartitioner;
         public final boolean tracksTombstones;
@@ -94,6 +96,7 @@ public class Descriptor
             hasCompressionRatio = version.compareTo("hb") >= 0;
             hasPartitioner = version.compareTo("hc") >= 0;
             tracksMaxTimestamp = version.compareTo("hd") >= 0;
+            tracksMinTimestamp = version.compareTo("ib") >= 0;
             hasAncestors = version.compareTo("he") >= 0;
             metadataIncludesModernReplayPosition = version.compareTo("hf") >= 0;
             tracksTombstones = version.compareTo("ia") >= 0;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ac9f4786/src/java/org/apache/cassandra/io/sstable/SSTableMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableMetadata.java b/src/java/org/apache/cassandra/io/sstable/SSTableMetadata.java
index 7932c88..e53fdf9 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableMetadata.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableMetadata.java
@@ -51,6 +51,7 @@ public class SSTableMetadata
     public final EstimatedHistogram estimatedRowSize;
     public final EstimatedHistogram estimatedColumnCount;
     public final ReplayPosition replayPosition;
+    public final long minTimestamp;
     public final long maxTimestamp;
     public final double compressionRatio;
     public final String partitioner;
@@ -62,6 +63,7 @@ public class SSTableMetadata
         this(defaultRowSizeHistogram(),
              defaultColumnCountHistogram(),
              ReplayPosition.NONE,
+             Long.MAX_VALUE,
              Long.MIN_VALUE,
              NO_COMPRESSION_RATIO,
              null,
@@ -69,12 +71,13 @@ public class SSTableMetadata
              defaultTombstoneDropTimeHistogram());
     }

-    private SSTableMetadata(EstimatedHistogram rowSizes, EstimatedHistogram columnCounts, ReplayPosition replayPosition, long maxTimestamp,
-                            double cr, String partitioner, Set<Integer> ancestors, StreamingHistogram estimatedTombstoneDropTime)
+    private SSTableMetadata(EstimatedHistogram rowSizes, EstimatedHistogram columnCounts, ReplayPosition replayPosition, long minTimestamp,
+                            long maxTimestamp, double cr, String partitioner, Set<Integer> ancestors, StreamingHistogram estimatedTombstoneDropTime)
     {
         this.estimatedRowSize = rowSizes;
         this.estimatedColumnCount = columnCounts;
         this.replayPosition = replayPosition;
+        this.minTimestamp = minTimestamp;
         this.maxTimestamp = maxTimestamp;
         this.compressionRatio = cr;
         this.partitioner = partitioner;
@@ -129,6 +132,7 @@ public class SSTableMetadata
         protected EstimatedHistogram estimatedRowSize = defaultRowSizeHistogram();
         protected EstimatedHistogram estimatedColumnCount = defaultColumnCountHistogram();
         protected ReplayPosition replayPosition = ReplayPosition.NONE;
+        protected long minTimestamp = Long.MAX_VALUE;
         protected long maxTimestamp = Long.MIN_VALUE;
         protected double compressionRatio = NO_COMPRESSION_RATIO;
         protected Set<Integer> ancestors = new HashSet<Integer>();
@@ -158,6 +162,11 @@ public class SSTableMetadata
             compressionRatio = (double) compressed/uncompressed;
         }

+        public void updateMinTimestamp(long potentialMin)
+        {
+            minTimestamp = Math.min(minTimestamp, potentialMin);
+        }
+
         public void updateMaxTimestamp(long potentialMax)
         {
             maxTimestamp = Math.max(maxTimestamp, potentialMax);
@@ -168,6 +177,7 @@ public class SSTableMetadata
             return new SSTableMetadata(estimatedRowSize,
                                        estimatedColumnCount,
                                        replayPosition,
+                                       minTimestamp,
                                        maxTimestamp,
                                        compressionRatio,
                                        partitioner,
@@ -201,6 +211,7 @@ public class SSTableMetadata
         void update(long size, ColumnStats stats)
         {
+            updateMinTimestamp(stats.minTimestamp);
             /*
              * The max timestamp is not always collected here (more precisely, row.maxTimestamp() may return Long.MIN_VALUE),
              * to avoid deserializing an EchoedRow.
@@ -226,6 +237,7 @@ public class SSTableMetadata
             EstimatedHistogram.serializer.serialize(sstableStats.estimatedRowSize, dos);
             EstimatedHistogram.serializer.serialize(sstableStats.estimatedColumnCount, dos);
             ReplayPosition.serializer.serialize(sstableStats.replayPosition, dos);
+            dos.writeLong(sstableStats.minTimestamp);
             dos.writeLong(sstableStats.maxTimestamp);
             dos.writeDouble(sstableStats.compressionRatio);
             dos.writeUTF(sstableStats.partitioner);
@@ -269,6 +281,9 @@ public class SSTableMetadata
                 // make sure we don't omit replaying something that we should. see CASSANDRA-4782
                 replayPosition = ReplayPosition.NONE;
             }
+            long minTimestamp = desc.version.tracksMinTimestamp ? dis.readLong() : Long.MIN_VALUE;
+            if (!desc.version.tracksMinTimestamp)
+                minTimestamp = Long.MAX_VALUE;
             long maxTimestamp = desc.version.containsTimestamp() ? dis.readLong() : Long.MIN_VALUE;
             if (!desc.version.tracksMaxTimestamp) // see javadoc to Descriptor.containsTimestamp
                 maxTimestamp = Long.MIN_VALUE;
@@ -283,7 +298,7 @@ public class SSTableMetadata
             StreamingHistogram tombstoneHistogram = desc.version.tracksTombstones ? StreamingHistogram.serializer.deserialize(dis) : defaultTombstoneDropTimeHistogram();
-            return new SSTableMetadata(rowSizes, columnCounts, replayPosition, maxTimestamp, compressionRatio, partitioner, ancestors, tombstoneHistogram);
+            return new SSTableMetadata(rowSizes, columnCounts, replayPosition, minTimestamp, maxTimestamp, compressionRatio, partitioner, ancestors, tombstoneHistogram);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ac9f4786/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
index 6099ca8..42b4ae6 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
@@ -1096,6 +1096,11 @@ public class SSTableReader extends SSTable
         return sstableMetadata.replayPosition;
     }

+    public long getMinTimestamp()
+    {
+        return sstableMetadata.minTimestamp;
+    }
+
     public long getMaxTimestamp()
     {
         return sstableMetadata.maxTimestamp;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ac9f4786/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
index 3e4656a..a3a8fe9 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
@@ -234,6 +234,7 @@ public class SSTableWriter extends SSTable
         }

         // deserialize each column to obtain maxTimestamp and immediately serialize it.
+        long minTimestamp = Long.MAX_VALUE;
         long maxTimestamp = Long.MIN_VALUE;
         StreamingHistogram tombstones = new StreamingHistogram(TOMBSTONE_HISTOGRAM_BIN_SIZE);
         ColumnFamily cf = ColumnFamily.create(metadata, ArrayBackedSortedColumns.factory());
@@ -268,6 +269,7 @@ public class SSTableWriter extends SSTable
             {
                 tombstones.update(deletionTime);
             }
+            minTimestamp = Math.min(minTimestamp, atom.minTimestamp());
             maxTimestamp = Math.max(maxTimestamp, atom.maxTimestamp());
             try
             {
@@ -281,6 +283,7 @@ public class SSTableWriter extends SSTable
         assert dataSize == dataFile.getFilePointer() - (dataStart + 8)
                 : "incorrect row data size " + dataSize + " written to " + dataFile.getPath() + "; correct is " + (dataFile.getFilePointer() - (dataStart + 8));

+        sstableMetadataCollector.updateMinTimestamp(minTimestamp);
         sstableMetadataCollector.updateMaxTimestamp(maxTimestamp);
         sstableMetadataCollector.addRowSize(dataFile.getFilePointer() - currentPosition);
         sstableMetadataCollector.addColumnCount(columnCount);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ac9f4786/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
index e8ed58c..deac172 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
@@ -23,6 +23,8 @@ import java.nio.ByteBuffer;
 import java.util.Collection;
 import java.util.concurrent.ExecutionException;

+import junit.framework.Assert;
+
 import org.junit.Test;

 import org.apache.cassandra.SchemaLoader;
@@ -140,7 +142,7 @@ public class CompactionsPurgeTest extends SchemaLoader
         // verify that minor compaction does not GC when key is present
         // in a non-compacted sstable
         ColumnFamily cf = cfs.getColumnFamily(QueryFilter.getIdentityFilter(key1, new QueryPath(cfName)));
-        assert cf.getColumnCount() == 10;
+        Assert.assertEquals(10, cf.getColumnCount());

         // verify that minor compaction does GC when key is provably not
         // present in a non-compacted sstable
@@ -149,6 +151,41 @@ public class CompactionsPurgeTest extends SchemaLoader
     }

     @Test
+    public void testMinTimestampPurge() throws IOException, ExecutionException, InterruptedException
+    {
+        CompactionManager.instance.disableAutoCompaction();
+        Table table = Table.open(TABLE2);
+        String cfName = "Standard1";
+        ColumnFamilyStore cfs = table.getColumnFamilyStore(cfName);
+
+        RowMutation rm;
+        DecoratedKey key3 = Util.dk("key3");
+        // inserts
+        rm = new RowMutation(TABLE2, key3.key);
+        rm.add(new QueryPath(cfName, null, ByteBufferUtil.bytes("c1")), ByteBufferUtil.EMPTY_BYTE_BUFFER, 8);
+        rm.add(new QueryPath(cfName, null, ByteBufferUtil.bytes("c2")), ByteBufferUtil.EMPTY_BYTE_BUFFER, 8);
+        rm.apply();
+        cfs.forceBlockingFlush();
+        // deletes
+        rm = new RowMutation(TABLE2, key3.key);
+        rm.delete(new QueryPath(cfName, null, ByteBufferUtil.bytes("c1")), 10);
+        rm.apply();
+        cfs.forceBlockingFlush();
+        Collection<SSTableReader> sstablesIncomplete = cfs.getSSTables();
+
+        // delete so we have new delete in a diffrent SST.
+        rm = new RowMutation(TABLE2, key3.key);
+        rm.delete(new QueryPath(cfName, null, ByteBufferUtil.bytes("c2")), 9);
+        rm.apply();
+        cfs.forceBlockingFlush();
+        new CompactionTask(cfs, sstablesIncomplete, Integer.MAX_VALUE).execute(null);
+
+        ColumnFamily cf = cfs.getColumnFamily(QueryFilter.getIdentityFilter(key3, new QueryPath(cfName)));
+        Assert.assertTrue(!cf.getColumn(ByteBufferUtil.bytes("c2")).isLive());
+        Assert.assertEquals(1, cf.getColumnCount());
+    }
+
+    @Test
     public void testCompactionPurgeOneFile() throws IOException, ExecutionException, InterruptedException
     {
         CompactionManager.instance.disableAutoCompaction();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ac9f4786/test/unit/org/apache/cassandra/io/sstable/DescriptorTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/DescriptorTest.java b/test/unit/org/apache/cassandra/io/sstable/DescriptorTest.java
index 545a9ec..007e0ca 100644
--- a/test/unit/org/apache/cassandra/io/sstable/DescriptorTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/DescriptorTest.java
@@ -50,6 +50,7 @@ public class DescriptorTest
         desc = Descriptor.fromFilename("Keyspace1-Standard1-gz-1-Data.db");
         assert "gz".equals(desc.version.toString());
         assert !desc.version.tracksMaxTimestamp;
+        assert !desc.version.tracksMinTimestamp;
     }

     @Test

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ac9f4786/test/unit/org/apache/cassandra/io/sstable/SSTableMetadataSerializerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableMetadataSerializerTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableMetadataSerializerTest.java
index 562be07..ab96f89 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableMetadataSerializerTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableMetadataSerializerTest.java
@@ -43,12 +43,14 @@ public class SSTableMetadataSerializerTest
             new long[] { 6L, 7L }, new long[] { 8L, 9L, 10L });
         ReplayPosition rp = new ReplayPosition(11L, 12);
+        long minTimestamp = 2162517136L;
         long maxTimestamp = 4162517136L;

         SSTableMetadata.Collector collector = SSTableMetadata.createCollector()
                                                              .estimatedRowSize(rowSizes)
                                                              .estimatedColumnCount(columnCounts)
                                                              .replayPosition(rp);
+        collector.updateMinTimestamp(minTimestamp);
         collector.updateMaxTimestamp(maxTimestamp);
         SSTableMetadata originalMetadata = collector.finalizeMetadata(RandomPartitioner.class.getCanonicalName());

@@ -68,7 +70,9 @@ public class SSTableMetadataSerializerTest
         assert stats.estimatedColumnCount.equals(columnCounts);
         assert stats.replayPosition.equals(originalMetadata.replayPosition);
         assert stats.replayPosition.equals(rp);
+        assert stats.minTimestamp == minTimestamp;
         assert stats.maxTimestamp == maxTimestamp;
+        assert stats.minTimestamp == originalMetadata.minTimestamp;
         assert stats.maxTimestamp == originalMetadata.maxTimestamp;
         assert RandomPartitioner.class.getCanonicalName().equals(stats.partitioner);
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ac9f4786/test/unit/org/apache/cassandra/streaming/BootstrapTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/BootstrapTest.java b/test/unit/org/apache/cassandra/streaming/BootstrapTest.java
index 83e0470..1af3074 100644
--- a/test/unit/org/apache/cassandra/streaming/BootstrapTest.java
+++ b/test/unit/org/apache/cassandra/streaming/BootstrapTest.java
@@ -36,7 +36,7 @@ public class BootstrapTest extends SchemaLoader
     @Test
     public void testGetNewNames() throws IOException
     {
-        Descriptor desc = Descriptor.fromFilename(new File("Keyspace1", "Keyspace1-Standard1-ia-500-Data.db").toString());
+        Descriptor desc = Descriptor.fromFilename(new File("Keyspace1", "Keyspace1-Standard1-ib-500-Data.db").toString());
         // assert !desc.isLatestVersion; // minimum compatible version -- for now it is the latest as well
         PendingFile inContext = new PendingFile(null, desc, "Data.db", Arrays.asList(Pair.create(0L, 1L)), OperationType.BOOTSTRAP);
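----------------------------------------------------------------------
For readers skimming the metadata changes: the new minTimestamp field is maintained with the same running-aggregate pattern the collector already uses for maxTimestamp (identity values Long.MAX_VALUE / Long.MIN_VALUE, folded with Math.min / Math.max per column and per sstable). Below is a minimal, self-contained sketch of that pattern only; the class and method names are illustrative stand-ins, not Cassandra APIs.

// Illustrative sketch (not project code): the running min/max timestamp
// aggregation performed by SSTableMetadata.Collector in the diff above.
final class TimestampStatsSketch
{
    // Identity values for min/max, matching the defaults the patch gives
    // to minTimestamp and maxTimestamp.
    private long minTimestamp = Long.MAX_VALUE;
    private long maxTimestamp = Long.MIN_VALUE;

    void update(long columnTimestamp)
    {
        minTimestamp = Math.min(minTimestamp, columnTimestamp);
        maxTimestamp = Math.max(maxTimestamp, columnTimestamp);
    }

    long min() { return minTimestamp; }
    long max() { return maxTimestamp; }

    public static void main(String[] args)
    {
        TimestampStatsSketch stats = new TimestampStatsSketch();
        for (long ts : new long[] { 10L, 8L, 9L })
            stats.update(ts);
        // Prints "8..10": the per-sstable timestamp bounds a compaction can
        // consult when deciding whether a tombstone is safe to drop.
        System.out.println(stats.min() + ".." + stats.max());
    }
}

With these bounds persisted per sstable in the new "ib" format, CompactionController.shouldPurge(key, maxDeletionTimestamp) can take an overlapping sstable's minimum timestamp into account rather than relying on the bloom filter check alone.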
