http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/db/compaction/AbstractCompactedRow.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactedRow.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactedRow.java index 1d2bf48..f88fe92 100644 --- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactedRow.java +++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactedRow.java @@ -25,6 +25,7 @@ import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.db.RowIndexEntry; import org.apache.cassandra.io.sstable.ColumnStats; import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.io.util.SequentialWriter; /** * a CompactedRow is an object that takes a bunch of rows (keys + columnfamilies) @@ -47,7 +48,7 @@ public abstract class AbstractCompactedRow implements Closeable * * @return index information for the written row, or null if the compaction resulted in only expired tombstones. */ - public abstract RowIndexEntry write(long currentPosition, DataOutputPlus out) throws IOException; + public abstract RowIndexEntry write(long currentPosition, SequentialWriter out) throws IOException; /** * update @param digest with the data bytes of the row (not including row key or row size).
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java index 6a0e0df..d228cba 100644 --- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java +++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java @@ -24,6 +24,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.base.Predicate; import com.google.common.collect.Iterables; import com.google.common.util.concurrent.RateLimiter; +import org.apache.cassandra.io.sstable.format.SSTableReader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,7 +34,6 @@ import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.io.sstable.Component; -import org.apache.cassandra.io.sstable.SSTableReader; /** * Pluggable compaction strategy determines how SSTables get merged. @@ -369,7 +369,7 @@ public abstract class AbstractCompactionStrategy long keys = sstable.estimatedKeys(); Set<Range<Token>> ranges = new HashSet<Range<Token>>(overlaps.size()); for (SSTableReader overlap : overlaps) - ranges.add(new Range<Token>(overlap.first.getToken(), overlap.last.getToken(), overlap.partitioner)); + ranges.add(new Range<>(overlap.first.getToken(), overlap.last.getToken(), overlap.partitioner)); long remainingKeys = keys - sstable.estimatedKeysForRanges(ranges); // next, calculate what percentage of columns we have within those keys long columns = sstable.getEstimatedColumnCount().mean() * remainingKeys; http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java index 59338f4..7631baa 100644 --- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java +++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java @@ -22,7 +22,7 @@ import java.util.Set; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.Directories; import org.apache.cassandra.db.compaction.CompactionManager.CompactionExecutorStatsCollector; -import org.apache.cassandra.io.sstable.SSTableReader; +import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.io.util.DiskAwareRunnable; public abstract class AbstractCompactionTask extends DiskAwareRunnable http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/db/compaction/CompactionController.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionController.java b/src/java/org/apache/cassandra/db/compaction/CompactionController.java index ef27805..e57d01d 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionController.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionController.java @@ -24,13 +24,13 @@ import java.util.Iterator; import java.util.List; import java.util.Set; +import org.apache.cassandra.io.sstable.format.SSTableReader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.DataTracker; import org.apache.cassandra.db.DecoratedKey; -import org.apache.cassandra.io.sstable.SSTableReader; import org.apache.cassandra.utils.AlwaysPresentFilter; /** @@ -112,7 +112,7 @@ public class CompactionController implements AutoCloseable if (compacting == null) return Collections.<SSTableReader>emptySet(); - List<SSTableReader> candidates = new ArrayList<SSTableReader>(); + List<SSTableReader> candidates = new ArrayList<>(); long minTimestamp = Long.MAX_VALUE; http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java b/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java index 0c9b52a..86918bc 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java @@ -24,11 +24,14 @@ import java.util.List; import com.google.common.collect.ImmutableList; import org.apache.cassandra.db.columniterator.OnDiskAtomIterator; +import org.apache.cassandra.io.sstable.format.SSTableFormat; import org.apache.cassandra.utils.CloseableIterator; import org.apache.cassandra.utils.MergeIterator; public class CompactionIterable extends AbstractCompactionIterable { + final SSTableFormat format; + private static final Comparator<OnDiskAtomIterator> comparator = new Comparator<OnDiskAtomIterator>() { public int compare(OnDiskAtomIterator i1, OnDiskAtomIterator i2) @@ -37,9 +40,10 @@ public class CompactionIterable extends AbstractCompactionIterable } }; - public CompactionIterable(OperationType type, List<ICompactionScanner> scanners, CompactionController controller) + public CompactionIterable(OperationType type, List<ICompactionScanner> scanners, CompactionController controller, SSTableFormat.Type formatType) { super(controller, type, scanners); + this.format = formatType.info; } public CloseableIterator<AbstractCompactedRow> iterator() @@ -71,7 +75,7 @@ public class CompactionIterable extends AbstractCompactionIterable // create a new container for rows, since we're going to clear ours for the next one, // and the AbstractCompactionRow code should be able to assume that the collection it receives // won't be pulled out from under it. - return new LazilyCompactedRow(controller, ImmutableList.copyOf(rows)); + return format.getCompactedRowWriter(controller, ImmutableList.copyOf(rows)); } finally { http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/db/compaction/CompactionManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java index b08668e..18ad7ae 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java @@ -52,7 +52,9 @@ import com.google.common.collect.Multimap; import com.google.common.collect.Multiset; import com.google.common.collect.Sets; import com.google.common.util.concurrent.RateLimiter; - +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.sstable.format.SSTableWriter; +import org.apache.cassandra.io.sstable.metadata.MetadataCollector; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -76,10 +78,7 @@ import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.io.sstable.Descriptor; import org.apache.cassandra.io.sstable.SSTableIdentityIterator; -import org.apache.cassandra.io.sstable.SSTableReader; import org.apache.cassandra.io.sstable.SSTableRewriter; -import org.apache.cassandra.io.sstable.SSTableWriter; -import org.apache.cassandra.io.sstable.metadata.MetadataCollector; import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.metrics.CompactionMetrics; import org.apache.cassandra.repair.Validator; @@ -327,7 +326,7 @@ public class CompactionManager implements CompactionManagerMBean @Override public boolean apply(SSTableReader sstable) { - return !(excludeCurrentVersion && sstable.descriptor.version.equals(Descriptor.Version.CURRENT)); + return !(excludeCurrentVersion && sstable.descriptor.version.equals(sstable.descriptor.getFormat().getLatestVersion())); } }); } @@ -502,7 +501,7 @@ public class CompactionManager implements CompactionManagerMBean { // look up the sstables now that we're on the compaction executor, so we don't try to re-compact // something that was already being compacted earlier. - Collection<SSTableReader> sstables = new ArrayList<SSTableReader>(dataFiles.size()); + Collection<SSTableReader> sstables = new ArrayList<>(dataFiles.size()); for (Descriptor desc : dataFiles) { // inefficient but not in a performance sensitive path @@ -845,12 +844,8 @@ public class CompactionManager implements CompactionManagerMBean SSTableReader sstable) { FileUtils.createDirectory(compactionFileLocation); - return new SSTableWriter(cfs.getTempSSTablePath(compactionFileLocation), - expectedBloomFilterSize, - repairedAt, - cfs.metadata, - cfs.partitioner, - new MetadataCollector(Collections.singleton(sstable), cfs.metadata.comparator, sstable.getSSTableLevel())); + + return SSTableWriter.create(Descriptor.fromFilename(cfs.getTempSSTablePath(compactionFileLocation)), expectedBloomFilterSize, repairedAt, sstable.getSSTableLevel()); } public static SSTableWriter createWriterForAntiCompaction(ColumnFamilyStore cfs, @@ -875,8 +870,8 @@ public class CompactionManager implements CompactionManagerMBean break; } } - return new SSTableWriter(cfs.getTempSSTablePath(compactionFileLocation), - expectedBloomFilterSize, + return SSTableWriter.create(Descriptor.fromFilename(cfs.getTempSSTablePath(compactionFileLocation)), + (long)expectedBloomFilterSize, repairedAt, cfs.metadata, cfs.partitioner, @@ -1007,6 +1002,7 @@ public class CompactionManager implements CompactionManagerMBean private void doAntiCompaction(ColumnFamilyStore cfs, Collection<Range<Token>> ranges, Collection<SSTableReader> repairedSSTables, long repairedAt) { + // TODO(5351): we can do better here: logger.info("Performing anticompaction on {} sstables", repairedSSTables.size()); @@ -1044,6 +1040,7 @@ public class CompactionManager implements CompactionManagerMBean groupMaxDataAge = sstable.maxDataAge; } + if (anticompactionGroup.size() == 0) { logger.info("No valid anticompactions for this group, All sstables were compacted and are no longer available"); @@ -1068,7 +1065,7 @@ public class CompactionManager implements CompactionManagerMBean repairedSSTableWriter.switchWriter(CompactionManager.createWriterForAntiCompaction(cfs, destination, expectedBloomFilterSize, repairedAt, sstableAsSet)); unRepairedSSTableWriter.switchWriter(CompactionManager.createWriterForAntiCompaction(cfs, destination, expectedBloomFilterSize, ActiveRepairService.UNREPAIRED_SSTABLE, sstableAsSet)); - CompactionIterable ci = new CompactionIterable(OperationType.ANTICOMPACTION, scanners.scanners, controller); + CompactionIterable ci = new CompactionIterable(OperationType.ANTICOMPACTION, scanners.scanners, controller, DatabaseDescriptor.getSSTableFormat()); Iterator<AbstractCompactedRow> iter = ci.iterator(); while(iter.hasNext()) { @@ -1175,7 +1172,7 @@ public class CompactionManager implements CompactionManagerMBean { public ValidationCompactionIterable(ColumnFamilyStore cfs, List<ICompactionScanner> scanners, int gcBefore) { - super(OperationType.VALIDATION, scanners, new ValidationCompactionController(cfs, gcBefore)); + super(OperationType.VALIDATION, scanners, new ValidationCompactionController(cfs, gcBefore), DatabaseDescriptor.getSSTableFormat()); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/db/compaction/CompactionTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java index bfb86c9..584ff38 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java @@ -31,6 +31,10 @@ import java.util.concurrent.TimeUnit; import com.google.common.base.Predicate; import com.google.common.collect.Iterables; import com.google.common.collect.Sets; +import org.apache.cassandra.io.sstable.Descriptor; +import org.apache.cassandra.io.sstable.format.SSTableFormat; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.sstable.format.SSTableWriter; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,9 +43,7 @@ import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.SystemKeyspace; import org.apache.cassandra.db.compaction.CompactionManager.CompactionExecutorStatsCollector; -import org.apache.cassandra.io.sstable.SSTableReader; import org.apache.cassandra.io.sstable.SSTableRewriter; -import org.apache.cassandra.io.sstable.SSTableWriter; import org.apache.cassandra.io.sstable.metadata.MetadataCollector; import org.apache.cassandra.service.ActiveRepairService; import org.apache.cassandra.utils.UUIDGen; @@ -150,11 +152,13 @@ public class CompactionTask extends AbstractCompactionTask long estimatedTotalKeys = Math.max(cfs.metadata.getMinIndexInterval(), SSTableReader.getApproximateKeyCount(actuallyCompact)); long estimatedSSTables = Math.max(1, SSTableReader.getTotalBytes(actuallyCompact) / strategy.getMaxSSTableBytes()); long keysPerSSTable = (long) Math.ceil((double) estimatedTotalKeys / estimatedSSTables); + SSTableFormat.Type sstableFormat = getFormatType(sstables); + logger.debug("Expected bloom filter size : {}", keysPerSSTable); try (AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(actuallyCompact)) { - AbstractCompactionIterable ci = new CompactionIterable(compactionType, scanners.scanners, controller); + AbstractCompactionIterable ci = new CompactionIterable(compactionType, scanners.scanners, controller, sstableFormat); Iterator<AbstractCompactedRow> iter = ci.iterator(); // we can't preheat until the tracker has been set. This doesn't happen until we tell the cfs to @@ -177,7 +181,7 @@ public class CompactionTask extends AbstractCompactionTask return; } - writer.switchWriter(createCompactionWriter(sstableDirectory, keysPerSSTable, minRepairedAt)); + writer.switchWriter(createCompactionWriter(sstableDirectory, keysPerSSTable, minRepairedAt, sstableFormat)); while (iter.hasNext()) { if (ci.isStopRequested()) @@ -189,7 +193,7 @@ public class CompactionTask extends AbstractCompactionTask totalKeysWritten++; if (newSSTableSegmentThresholdReached(writer.currentWriter())) { - writer.switchWriter(createCompactionWriter(sstableDirectory, keysPerSSTable, minRepairedAt)); + writer.switchWriter(createCompactionWriter(sstableDirectory, keysPerSSTable, minRepairedAt, sstableFormat)); } } @@ -271,14 +275,14 @@ public class CompactionTask extends AbstractCompactionTask return minRepairedAt; } - private SSTableWriter createCompactionWriter(File sstableDirectory, long keysPerSSTable, long repairedAt) + private SSTableWriter createCompactionWriter(File sstableDirectory, long keysPerSSTable, long repairedAt, SSTableFormat.Type type) { - return new SSTableWriter(cfs.getTempSSTablePath(sstableDirectory), - keysPerSSTable, - repairedAt, - cfs.metadata, - cfs.partitioner, - new MetadataCollector(sstables, cfs.metadata.comparator, getLevel())); + return SSTableWriter.create(Descriptor.fromFilename(cfs.getTempSSTablePath(sstableDirectory), type), + keysPerSSTable, + repairedAt, + cfs.metadata, + cfs.partitioner, + new MetadataCollector(sstables, cfs.metadata.comparator, getLevel())); } protected int getLevel() @@ -312,4 +316,13 @@ public class CompactionTask extends AbstractCompactionTask } return max; } + + public static SSTableFormat.Type getFormatType(Collection<SSTableReader> sstables) + { + if (sstables.isEmpty() || !SSTableFormat.enableSSTableDevelopmentTestMode) + return DatabaseDescriptor.getSSTableFormat(); + + //Allows us to test compaction of non-default formats + return sstables.iterator().next().descriptor.formatType; + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java index 8c997ed..3f2aed6 100644 --- a/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java +++ b/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java @@ -22,13 +22,13 @@ import java.util.*; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Predicate; import com.google.common.collect.*; +import org.apache.cassandra.io.sstable.format.SSTableReader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.cql3.statements.CFPropDefs; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.exceptions.ConfigurationException; -import org.apache.cassandra.io.sstable.SSTableReader; import org.apache.cassandra.utils.Pair; public class DateTieredCompactionStrategy extends AbstractCompactionStrategy http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java index cfdbd17..1b28179 100644 --- a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java +++ b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java @@ -30,12 +30,13 @@ import com.google.common.collect.Iterators; import org.apache.cassandra.db.*; import org.apache.cassandra.db.columniterator.OnDiskAtomIterator; import org.apache.cassandra.db.index.SecondaryIndexManager; +import org.apache.cassandra.io.sstable.format.big.BigTableWriter; import org.apache.cassandra.io.sstable.ColumnNameHelper; import org.apache.cassandra.io.sstable.ColumnStats; import org.apache.cassandra.io.sstable.SSTable; -import org.apache.cassandra.io.sstable.SSTableWriter; import org.apache.cassandra.io.util.DataOutputBuffer; import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.io.util.SequentialWriter; import org.apache.cassandra.utils.MergeIterator; import org.apache.cassandra.utils.StreamingHistogram; @@ -48,17 +49,17 @@ import org.apache.cassandra.utils.StreamingHistogram; */ public class LazilyCompactedRow extends AbstractCompactedRow { - private final List<? extends OnDiskAtomIterator> rows; - private final CompactionController controller; - private final long maxPurgeableTimestamp; - private final ColumnFamily emptyColumnFamily; - private ColumnStats columnStats; - private boolean closed; - private ColumnIndex.Builder indexBuilder; - private final SecondaryIndexManager.Updater indexer; - private final Reducer reducer; - private final Iterator<OnDiskAtom> merger; - private DeletionTime maxRowTombstone; + protected final List<? extends OnDiskAtomIterator> rows; + protected final CompactionController controller; + protected final long maxPurgeableTimestamp; + protected final ColumnFamily emptyColumnFamily; + protected ColumnStats columnStats; + protected boolean closed; + protected ColumnIndex.Builder indexBuilder; + protected final SecondaryIndexManager.Updater indexer; + protected final Reducer reducer; + protected final Iterator<OnDiskAtom> merger; + protected DeletionTime maxRowTombstone; public LazilyCompactedRow(CompactionController controller, List<? extends OnDiskAtomIterator> rows) { @@ -99,10 +100,12 @@ public class LazilyCompactedRow extends AbstractCompactedRow ColumnFamilyStore.removeDeletedColumnsOnly(cf, overriddenGCBefore, controller.cfs.indexManager.gcUpdaterFor(key)); } - public RowIndexEntry write(long currentPosition, DataOutputPlus out) throws IOException + public RowIndexEntry write(long currentPosition, SequentialWriter dataFile) throws IOException { assert !closed; + DataOutputPlus out = dataFile.stream; + ColumnIndex columnsIndex; try { @@ -130,7 +133,7 @@ public class LazilyCompactedRow extends AbstractCompactedRow // in case no columns were ever written, we may still need to write an empty header with a top-level tombstone indexBuilder.maybeWriteEmptyRowHeader(); - out.writeShort(SSTableWriter.END_OF_ROW); + out.writeShort(BigTableWriter.END_OF_ROW); close(); @@ -183,7 +186,7 @@ public class LazilyCompactedRow extends AbstractCompactedRow closed = true; } - private class Reducer extends MergeIterator.Reducer<OnDiskAtom, OnDiskAtom> + protected class Reducer extends MergeIterator.Reducer<OnDiskAtom, OnDiskAtom> { // all columns reduced together will have the same name, so there will only be one column // in the container; we just want to leverage the conflict resolution code from CF. @@ -192,9 +195,9 @@ public class LazilyCompactedRow extends AbstractCompactedRow // tombstone reference; will be reconciled w/ column during getReduced. Note that the top-level (row) tombstone // is held by LCR.deletionInfo. - RangeTombstone tombstone; + public RangeTombstone tombstone; - int columns = 0; + public int columns = 0; // if the row tombstone is 'live' we need to set timestamp to MAX_VALUE to be able to overwrite it later // markedForDeleteAt is MIN_VALUE for 'live' row tombstones (which we use to default maxTimestampSeen) @@ -204,10 +207,10 @@ public class LazilyCompactedRow extends AbstractCompactedRow // we are bound to have either a RangeTombstone or standard cells will set this properly: ColumnStats.MaxIntTracker maxDeletionTimeTracker = new ColumnStats.MaxIntTracker(Integer.MAX_VALUE); - StreamingHistogram tombstones = new StreamingHistogram(SSTable.TOMBSTONE_HISTOGRAM_BIN_SIZE); - List<ByteBuffer> minColumnNameSeen = Collections.emptyList(); - List<ByteBuffer> maxColumnNameSeen = Collections.emptyList(); - boolean hasLegacyCounterShards = false; + public StreamingHistogram tombstones = new StreamingHistogram(SSTable.TOMBSTONE_HISTOGRAM_BIN_SIZE); + public List<ByteBuffer> minColumnNameSeen = Collections.emptyList(); + public List<ByteBuffer> maxColumnNameSeen = Collections.emptyList(); + public boolean hasLegacyCounterShards = false; public Reducer() { http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java index 994e52d..7a1f883 100644 --- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java +++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java @@ -35,6 +35,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Joiner; import com.google.common.collect.*; import com.google.common.primitives.Doubles; +import org.apache.cassandra.io.sstable.format.SSTableReader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,7 +44,6 @@ import org.apache.cassandra.db.columniterator.OnDiskAtomIterator; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.exceptions.ConfigurationException; -import org.apache.cassandra.io.sstable.SSTableReader; import org.apache.cassandra.notifications.INotification; import org.apache.cassandra.notifications.INotificationConsumer; import org.apache.cassandra.notifications.SSTableAddedNotification; @@ -160,6 +160,7 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy implem } } + @Override public AbstractCompactionTask getUserDefinedTask(Collection<SSTableReader> sstables, int gcBefore) { throw new UnsupportedOperationException("LevelDB compaction strategy does not allow user-specified compactions"); @@ -319,7 +320,7 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy implem this.range = range; // add only sstables that intersect our range, and estimate how much data that involves - this.sstables = new ArrayList<SSTableReader>(sstables.size()); + this.sstables = new ArrayList<>(sstables.size()); long length = 0; for (SSTableReader sstable : sstables) { @@ -342,10 +343,10 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy implem public static List<SSTableReader> intersecting(Collection<SSTableReader> sstables, Range<Token> range) { - ArrayList<SSTableReader> filtered = new ArrayList<SSTableReader>(); + ArrayList<SSTableReader> filtered = new ArrayList<>(); for (SSTableReader sstable : sstables) { - Range<Token> sstableRange = new Range<Token>(sstable.first.getToken(), sstable.last.getToken(), sstable.partitioner); + Range<Token> sstableRange = new Range<>(sstable.first.getToken(), sstable.last.getToken(), sstable.partitioner); if (range == null || sstableRange.intersects(range)) filtered.add(sstable); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java index 2731b6d..9a33b49 100644 --- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java +++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java @@ -20,8 +20,8 @@ package org.apache.cassandra.db.compaction; import java.util.Collection; import org.apache.cassandra.db.ColumnFamilyStore; -import org.apache.cassandra.io.sstable.SSTableReader; -import org.apache.cassandra.io.sstable.SSTableWriter; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.sstable.format.SSTableWriter; public class LeveledCompactionTask extends CompactionTask { http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java index 6d3bf69..74be143 100644 --- a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java +++ b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java @@ -28,6 +28,7 @@ import com.google.common.collect.ImmutableSortedSet; import com.google.common.collect.Iterables; import com.google.common.collect.Sets; import com.google.common.primitives.Ints; +import org.apache.cassandra.io.sstable.format.SSTableReader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -574,7 +575,7 @@ public class LeveledManifest private static Set<SSTableReader> overlapping(Token start, Token end, Iterable<SSTableReader> sstables) { assert start.compareTo(end) <= 0; - Set<SSTableReader> overlapped = new HashSet<SSTableReader>(); + Set<SSTableReader> overlapped = new HashSet<>(); Bounds<Token> promotedBounds = new Bounds<Token>(start, end); for (SSTableReader candidate : sstables) { @@ -622,8 +623,8 @@ public class LeveledManifest // Note that we ignore suspect-ness of L1 sstables here, since if an L1 sstable is suspect we're // basically screwed, since we expect all or most L0 sstables to overlap with each L1 sstable. // So if an L1 sstable is suspect we can't do much besides try anyway and hope for the best. - Set<SSTableReader> candidates = new HashSet<SSTableReader>(); - Set<SSTableReader> remaining = new HashSet<SSTableReader>(); + Set<SSTableReader> candidates = new HashSet<>(); + Set<SSTableReader> remaining = new HashSet<>(); Iterables.addAll(remaining, Iterables.filter(getLevel(0), Predicates.not(suspectP))); for (SSTableReader sstable : ageSortedSSTables(remaining)) { @@ -699,7 +700,7 @@ public class LeveledManifest private List<SSTableReader> ageSortedSSTables(Collection<SSTableReader> candidates) { - List<SSTableReader> ageSortedCandidates = new ArrayList<SSTableReader>(candidates); + List<SSTableReader> ageSortedCandidates = new ArrayList<>(candidates); Collections.sort(ageSortedCandidates, SSTableReader.maxTimestampComparator); return ageSortedCandidates; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java b/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java index 6b9f161..87c82e0 100644 --- a/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java +++ b/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java @@ -20,7 +20,8 @@ package org.apache.cassandra.db.compaction; import java.util.*; import org.apache.cassandra.db.*; -import org.apache.cassandra.io.sstable.*; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.sstable.format.SSTableWriter; public class SSTableSplitter { http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/db/compaction/Scrubber.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/Scrubber.java b/src/java/org/apache/cassandra/db/compaction/Scrubber.java index b3d098d..1bea188 100644 --- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java +++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java @@ -25,6 +25,8 @@ import com.google.common.base.Throwables; import org.apache.cassandra.db.*; import org.apache.cassandra.io.sstable.*; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.sstable.format.SSTableWriter; import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.io.util.RandomAccessReader; import org.apache.cassandra.service.ActiveRepairService; @@ -46,6 +48,7 @@ public class Scrubber implements Closeable private final RandomAccessReader dataFile; private final RandomAccessReader indexFile; private final ScrubInfo scrubInfo; + private final RowIndexEntry.IndexSerializer rowIndexEntrySerializer; private final boolean isOffline; @@ -80,6 +83,8 @@ public class Scrubber implements Closeable this.skipCorrupted = skipCorrupted; this.isOffline = isOffline; + this.rowIndexEntrySerializer = sstable.descriptor.version.getSSTableFormat().getIndexSerializer(sstable.metadata); + // Calculate the expected compacted filesize this.destination = cfs.directories.getDirectoryForNewSSTables(); if (destination == null) @@ -113,7 +118,7 @@ public class Scrubber implements Closeable ByteBuffer nextIndexKey = ByteBufferUtil.readWithShortLength(indexFile); { // throw away variable so we don't have a side effect in the assert - long firstRowPositionFromIndex = sstable.metadata.comparator.rowIndexEntrySerializer().deserialize(indexFile, sstable.descriptor.version).position; + long firstRowPositionFromIndex = rowIndexEntrySerializer.deserialize(indexFile, sstable.descriptor.version).position; assert firstRowPositionFromIndex == 0 : firstRowPositionFromIndex; } @@ -147,7 +152,7 @@ public class Scrubber implements Closeable nextIndexKey = indexFile.isEOF() ? null : ByteBufferUtil.readWithShortLength(indexFile); nextRowPositionFromIndex = indexFile.isEOF() ? dataFile.length() - : sstable.metadata.comparator.rowIndexEntrySerializer().deserialize(indexFile, sstable.descriptor.version).position; + : rowIndexEntrySerializer.deserialize(indexFile, sstable.descriptor.version).position; } catch (Throwable th) { @@ -177,7 +182,7 @@ public class Scrubber implements Closeable if (dataSize > dataFile.length()) throw new IOError(new IOException("Impossible row size " + dataSize)); - SSTableIdentityIterator atoms = new SSTableIdentityIterator(sstable, dataFile, key, dataSize, true); + SSTableIdentityIterator atoms = new SSTableIdentityIterator(sstable, dataFile, key, true); if (prevKey != null && prevKey.compareTo(key) > 0) { saveOutOfOrderRow(prevKey, key, atoms); @@ -206,7 +211,7 @@ public class Scrubber implements Closeable key = sstable.partitioner.decorateKey(currentIndexKey); try { - SSTableIdentityIterator atoms = new SSTableIdentityIterator(sstable, dataFile, key, dataSize, true); + SSTableIdentityIterator atoms = new SSTableIdentityIterator(sstable, dataFile, key, true); if (prevKey != null && prevKey.compareTo(key) > 0) { saveOutOfOrderRow(prevKey, key, atoms); http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java index 2fa188c..0abb68d 100644 --- a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java +++ b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java @@ -24,13 +24,14 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Sets; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.sstable.format.big.BigTableReader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.cql3.statements.CFPropDefs; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.exceptions.ConfigurationException; -import org.apache.cassandra.io.sstable.SSTableReader; import org.apache.cassandra.utils.Pair; public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy @@ -311,7 +312,7 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy public static List<Pair<SSTableReader, Long>> createSSTableAndLengthPairs(Iterable<SSTableReader> sstables) { - List<Pair<SSTableReader, Long>> sstableLengthPairs = new ArrayList<Pair<SSTableReader, Long>>(Iterables.size(sstables)); + List<Pair<SSTableReader, Long>> sstableLengthPairs = new ArrayList<>(Iterables.size(sstables)); for(SSTableReader sstable : sstables) sstableLengthPairs.add(Pair.create(sstable, sstable.onDiskLength())); return sstableLengthPairs; http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/db/compaction/Upgrader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/Upgrader.java b/src/java/org/apache/cassandra/db/compaction/Upgrader.java index 2e33ac9..c9e7034 100644 --- a/src/java/org/apache/cassandra/db/compaction/Upgrader.java +++ b/src/java/org/apache/cassandra/db/compaction/Upgrader.java @@ -22,9 +22,12 @@ import java.util.*; import com.google.common.base.Throwables; +import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.io.sstable.*; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.sstable.format.SSTableWriter; import org.apache.cassandra.io.sstable.metadata.MetadataCollector; import org.apache.cassandra.utils.CloseableIterator; import org.apache.cassandra.utils.OutputHandler; @@ -78,7 +81,7 @@ public class Upgrader sstableMetadataCollector.sstableLevel(sstable.getSSTableLevel()); } - return new SSTableWriter(cfs.getTempSSTablePath(directory), estimatedRows, repairedAt, cfs.metadata, cfs.partitioner, sstableMetadataCollector); + return SSTableWriter.create(Descriptor.fromFilename(cfs.getTempSSTablePath(directory)), estimatedRows, repairedAt, cfs.metadata, cfs.partitioner, sstableMetadataCollector); } public void upgrade() @@ -88,7 +91,7 @@ public class Upgrader SSTableRewriter writer = new SSTableRewriter(cfs, toUpgrade, CompactionTask.getMaxDataAge(this.toUpgrade), OperationType.UPGRADE_SSTABLES, true); try (AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(this.toUpgrade)) { - Iterator<AbstractCompactedRow> iter = new CompactionIterable(compactionType, scanners.scanners, controller).iterator(); + Iterator<AbstractCompactedRow> iter = new CompactionIterable(compactionType, scanners.scanners, controller, DatabaseDescriptor.getSSTableFormat()).iterator(); writer.switchWriter(createCompactionWriter(sstable.getSSTableMetadata().repairedAt)); while (iter.hasNext()) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/db/composites/AbstractCType.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/composites/AbstractCType.java b/src/java/org/apache/cassandra/db/composites/AbstractCType.java index 5af7458..1df73fe 100644 --- a/src/java/org/apache/cassandra/db/composites/AbstractCType.java +++ b/src/java/org/apache/cassandra/db/composites/AbstractCType.java @@ -80,12 +80,10 @@ public abstract class AbstractCType implements CType private final Serializer serializer; - private final ISerializer<IndexInfo> indexSerializer; private final IVersionedSerializer<ColumnSlice> sliceSerializer; private final IVersionedSerializer<SliceQueryFilter> sliceQueryFilterSerializer; private final DeletionInfo.Serializer deletionInfoSerializer; private final RangeTombstone.Serializer rangeTombstoneSerializer; - private final RowIndexEntry.Serializer rowIndexEntrySerializer; protected final boolean isByteOrderComparable; @@ -115,12 +113,10 @@ public abstract class AbstractCType implements CType serializer = new Serializer(this); - indexSerializer = new IndexInfo.Serializer(this); sliceSerializer = new ColumnSlice.Serializer(this); sliceQueryFilterSerializer = new SliceQueryFilter.Serializer(this); deletionInfoSerializer = new DeletionInfo.Serializer(this); rangeTombstoneSerializer = new RangeTombstone.Serializer(this); - rowIndexEntrySerializer = new RowIndexEntry.Serializer(this); this.isByteOrderComparable = isByteOrderComparable; } @@ -295,11 +291,6 @@ public abstract class AbstractCType implements CType return indexReverseComparator; } - public ISerializer<IndexInfo> indexSerializer() - { - return indexSerializer; - } - public IVersionedSerializer<ColumnSlice> sliceSerializer() { return sliceSerializer; @@ -320,11 +311,6 @@ public abstract class AbstractCType implements CType return rangeTombstoneSerializer; } - public RowIndexEntry.Serializer rowIndexEntrySerializer() - { - return rowIndexEntrySerializer; - } - @Override public boolean equals(Object o) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/db/composites/CType.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/composites/CType.java b/src/java/org/apache/cassandra/db/composites/CType.java index d24277e..3844342 100644 --- a/src/java/org/apache/cassandra/db/composites/CType.java +++ b/src/java/org/apache/cassandra/db/composites/CType.java @@ -130,12 +130,10 @@ public interface CType extends Comparator<Composite> public Serializer serializer(); - public ISerializer<IndexInfo> indexSerializer(); public IVersionedSerializer<ColumnSlice> sliceSerializer(); public IVersionedSerializer<SliceQueryFilter> sliceQueryFilterSerializer(); public DeletionInfo.Serializer deletionInfoSerializer(); public RangeTombstone.Serializer rangeTombstoneSerializer(); - public RowIndexEntry.Serializer rowIndexEntrySerializer(); public interface Serializer extends ISerializer<Composite> { http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/db/filter/IDiskAtomFilter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/filter/IDiskAtomFilter.java b/src/java/org/apache/cassandra/db/filter/IDiskAtomFilter.java index 3750c75..16be34c 100644 --- a/src/java/org/apache/cassandra/db/filter/IDiskAtomFilter.java +++ b/src/java/org/apache/cassandra/db/filter/IDiskAtomFilter.java @@ -28,7 +28,7 @@ import org.apache.cassandra.db.composites.CellNameType; import org.apache.cassandra.db.composites.Composite; import org.apache.cassandra.db.composites.CType; import org.apache.cassandra.io.IVersionedSerializer; -import org.apache.cassandra.io.sstable.SSTableReader; +import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.io.util.FileDataInput; http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java b/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java index 0a34dfd..b5515bc 100644 --- a/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java +++ b/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java @@ -24,20 +24,19 @@ import java.util.Iterator; import java.util.SortedSet; import java.util.TreeSet; +import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.commons.lang3.StringUtils; import com.google.common.collect.AbstractIterator; import com.google.common.collect.Iterators; import org.apache.cassandra.db.*; import org.apache.cassandra.db.columniterator.OnDiskAtomIterator; -import org.apache.cassandra.db.columniterator.SSTableNamesIterator; import org.apache.cassandra.db.composites.CellName; import org.apache.cassandra.db.composites.CellNameType; import org.apache.cassandra.db.composites.Composite; import org.apache.cassandra.db.composites.CType; import org.apache.cassandra.io.ISerializer; import org.apache.cassandra.io.IVersionedSerializer; -import org.apache.cassandra.io.sstable.SSTableReader; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.io.util.FileDataInput; import org.apache.cassandra.utils.SearchIterator; @@ -87,12 +86,12 @@ public class NamesQueryFilter implements IDiskAtomFilter public OnDiskAtomIterator getSSTableColumnIterator(SSTableReader sstable, DecoratedKey key) { - return new SSTableNamesIterator(sstable, key, columns); + return sstable.iterator(key, columns); } public OnDiskAtomIterator getSSTableColumnIterator(SSTableReader sstable, FileDataInput file, DecoratedKey key, RowIndexEntry indexEntry) { - return new SSTableNamesIterator(sstable, file, key, columns, indexEntry); + return sstable.iterator(file, key, columns, indexEntry); } public void collectReducedColumns(ColumnFamily container, Iterator<Cell> reducedColumns, int gcBefore, long now) http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/db/filter/QueryFilter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/filter/QueryFilter.java b/src/java/org/apache/cassandra/db/filter/QueryFilter.java index f58fa9f..1914970 100644 --- a/src/java/org/apache/cassandra/db/filter/QueryFilter.java +++ b/src/java/org/apache/cassandra/db/filter/QueryFilter.java @@ -33,7 +33,7 @@ import org.apache.cassandra.db.columniterator.IdentityQueryFilter; import org.apache.cassandra.db.columniterator.OnDiskAtomIterator; import org.apache.cassandra.db.composites.CellName; import org.apache.cassandra.db.composites.Composite; -import org.apache.cassandra.io.sstable.SSTableReader; +import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.utils.MergeIterator; public class QueryFilter http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java b/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java index 71d9095..0887cbe 100644 --- a/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java +++ b/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java @@ -24,19 +24,18 @@ import java.util.*; import com.google.common.collect.AbstractIterator; import com.google.common.collect.Iterators; +import org.apache.cassandra.io.sstable.format.SSTableReader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.*; import org.apache.cassandra.db.columniterator.OnDiskAtomIterator; -import org.apache.cassandra.db.columniterator.SSTableSliceIterator; import org.apache.cassandra.db.composites.CType; import org.apache.cassandra.db.composites.CellName; import org.apache.cassandra.db.composites.CellNameType; import org.apache.cassandra.db.composites.Composite; import org.apache.cassandra.io.IVersionedSerializer; -import org.apache.cassandra.io.sstable.SSTableReader; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.io.util.FileDataInput; import org.apache.cassandra.tracing.Tracing; @@ -176,12 +175,12 @@ public class SliceQueryFilter implements IDiskAtomFilter public OnDiskAtomIterator getSSTableColumnIterator(SSTableReader sstable, DecoratedKey key) { - return new SSTableSliceIterator(sstable, key, slices, reversed); + return sstable.iterator(key, slices, reversed); } public OnDiskAtomIterator getSSTableColumnIterator(SSTableReader sstable, FileDataInput file, DecoratedKey key, RowIndexEntry indexEntry) { - return new SSTableSliceIterator(sstable, file, key, slices, reversed, indexEntry); + return sstable.iterator(file, key, slices, reversed, indexEntry); } public Comparator<Cell> getColumnComparator(CellNameType comparator) http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/db/index/SecondaryIndex.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndex.java b/src/java/org/apache/cassandra/db/index/SecondaryIndex.java index 529e82c..c50019a 100644 --- a/src/java/org/apache/cassandra/db/index/SecondaryIndex.java +++ b/src/java/org/apache/cassandra/db/index/SecondaryIndex.java @@ -26,6 +26,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Future; import java.util.concurrent.FutureTask; +import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,7 +50,6 @@ import org.apache.cassandra.db.marshal.LocalByPartionerType; import org.apache.cassandra.dht.LocalToken; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.io.sstable.ReducingKeyIterator; -import org.apache.cassandra.io.sstable.SSTableReader; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.FBUtilities; http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java b/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java index c2d481b..5640d23 100644 --- a/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java +++ b/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java @@ -33,6 +33,7 @@ import java.util.concurrent.ConcurrentNavigableMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.Future; +import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,7 +53,6 @@ import org.apache.cassandra.db.filter.ExtendedFilter; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.exceptions.InvalidRequestException; import org.apache.cassandra.io.sstable.ReducingKeyIterator; -import org.apache.cassandra.io.sstable.SSTableReader; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.concurrent.OpOrder; http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/dht/BytesToken.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/dht/BytesToken.java b/src/java/org/apache/cassandra/dht/BytesToken.java index f2f9eab..da965d2 100644 --- a/src/java/org/apache/cassandra/dht/BytesToken.java +++ b/src/java/org/apache/cassandra/dht/BytesToken.java @@ -49,7 +49,6 @@ public class BytesToken extends Token<byte[]> return FBUtilities.compareUnsigned(token, o.token, 0, 0, token.length, o.token.length); } - @Override public int hashCode() { http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/io/ISSTableSerializer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/ISSTableSerializer.java b/src/java/org/apache/cassandra/io/ISSTableSerializer.java index 20ee352..2834662 100644 --- a/src/java/org/apache/cassandra/io/ISSTableSerializer.java +++ b/src/java/org/apache/cassandra/io/ISSTableSerializer.java @@ -21,6 +21,7 @@ import java.io.DataInput; import java.io.IOException; import org.apache.cassandra.io.sstable.Descriptor; +import org.apache.cassandra.io.sstable.format.Version; import org.apache.cassandra.io.util.DataOutputPlus; public interface ISSTableSerializer<T> @@ -43,5 +44,5 @@ public interface ISSTableSerializer<T> * @throws IOException * @return the type that was deserialized */ - public T deserializeFromSSTable(DataInput in, Descriptor.Version version) throws IOException; + public T deserializeFromSSTable(DataInput in, Version version) throws IOException; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java index 3d6cb71..75fdf1b 100644 --- a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java +++ b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java @@ -39,6 +39,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.primitives.Longs; import org.apache.cassandra.cache.RefCountedMemory; +import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.TypeSizes; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.io.FSReadError; @@ -47,10 +48,12 @@ import org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.io.sstable.Component; import org.apache.cassandra.io.sstable.CorruptSSTableException; import org.apache.cassandra.io.sstable.Descriptor; +import org.apache.cassandra.io.sstable.format.Version; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.io.util.Memory; import org.apache.cassandra.utils.Pair; +import org.hsqldb.Database; /** * Holds metadata about compressed file @@ -79,7 +82,7 @@ public class CompressionMetadata public static CompressionMetadata create(String dataFilePath) { Descriptor desc = Descriptor.fromFilename(dataFilePath); - return new CompressionMetadata(desc.filenameFor(Component.COMPRESSION_INFO), new File(dataFilePath).length(), desc.version.hasPostCompressionAdlerChecksums); + return new CompressionMetadata(desc.filenameFor(Component.COMPRESSION_INFO), new File(dataFilePath).length(), desc.version.hasPostCompressionAdlerChecksums()); } @VisibleForTesting @@ -262,6 +265,8 @@ public class CompressionMetadata private int maxCount = 100; private RefCountedMemory offsets = new RefCountedMemory(maxCount * 8); private int count = 0; + private Version latestVersion = DatabaseDescriptor.getSSTableFormat().info.getLatestVersion(); + private Writer(CompressionParameters parameters, String path) { @@ -311,14 +316,14 @@ public class CompressionMetadata public CompressionMetadata openEarly(long dataLength, long compressedLength) { - return new CompressionMetadata(filePath, parameters, offsets, count * 8L, dataLength, compressedLength, Descriptor.Version.CURRENT.hasPostCompressionAdlerChecksums); + return new CompressionMetadata(filePath, parameters, offsets, count * 8L, dataLength, compressedLength, latestVersion.hasPostCompressionAdlerChecksums()); } public CompressionMetadata openAfterClose(long dataLength, long compressedLength) { RefCountedMemory newOffsets = offsets.copy(count * 8L); offsets.unreference(); - return new CompressionMetadata(filePath, parameters, newOffsets, count * 8L, dataLength, compressedLength, Descriptor.Version.CURRENT.hasPostCompressionAdlerChecksums); + return new CompressionMetadata(filePath, parameters, newOffsets, count * 8L, dataLength, compressedLength, latestVersion.hasPostCompressionAdlerChecksums()); } /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java index ad8fb3e..11d6d5e 100644 --- a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java @@ -30,7 +30,8 @@ import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.*; import org.apache.cassandra.db.context.CounterContext; import org.apache.cassandra.dht.IPartitioner; -import org.apache.cassandra.io.sstable.metadata.MetadataCollector; +import org.apache.cassandra.io.sstable.format.SSTableFormat; +import org.apache.cassandra.io.sstable.format.SSTableWriter; import org.apache.cassandra.service.ActiveRepairService; import org.apache.cassandra.utils.CounterId; import org.apache.cassandra.utils.Pair; @@ -43,6 +44,8 @@ public abstract class AbstractSSTableSimpleWriter implements Closeable protected ColumnFamily columnFamily; protected ByteBuffer currentSuperColumn; protected final CounterId counterid = CounterId.generate(); + private SSTableFormat.Type formatType = DatabaseDescriptor.getSSTableFormat(); + public AbstractSSTableSimpleWriter(File directory, CFMetaData metadata, IPartitioner partitioner) { @@ -51,19 +54,18 @@ public abstract class AbstractSSTableSimpleWriter implements Closeable DatabaseDescriptor.setPartitioner(partitioner); } + protected void setSSTableFormatType(SSTableFormat.Type type) + { + this.formatType = type; + } + protected SSTableWriter getWriter() { - return new SSTableWriter( - makeFilename(directory, metadata.ksName, metadata.cfName), - 0, // We don't care about the bloom filter - ActiveRepairService.UNREPAIRED_SSTABLE, - metadata, - DatabaseDescriptor.getPartitioner(), - new MetadataCollector(metadata.comparator)); + return SSTableWriter.create(Descriptor.fromFilename(makeFilename(directory, metadata.ksName, metadata.cfName, formatType)), 0, ActiveRepairService.UNREPAIRED_SSTABLE); } // find available generation and pick up filename from that - protected static String makeFilename(File directory, final String keyspace, final String columnFamily) + protected static String makeFilename(File directory, final String keyspace, final String columnFamily, final SSTableFormat.Type fmt) { final Set<Descriptor> existing = new HashSet<Descriptor>(); directory.list(new FilenameFilter() @@ -84,7 +86,7 @@ public abstract class AbstractSSTableSimpleWriter implements Closeable int maxGen = 0; for (Descriptor desc : existing) maxGen = Math.max(maxGen, desc.generation); - return new Descriptor(directory, keyspace, columnFamily, maxGen + 1, Descriptor.Type.TEMP).filenameFor(Component.DATA); + return new Descriptor(directory, keyspace, columnFamily, maxGen + 1, Descriptor.Type.TEMP, fmt).filenameFor(Component.DATA); } /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java index bf4da24..f7d467e 100644 --- a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java @@ -39,6 +39,7 @@ import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.dht.Murmur3Partitioner; import org.apache.cassandra.exceptions.InvalidRequestException; import org.apache.cassandra.exceptions.RequestValidationException; +import org.apache.cassandra.io.sstable.format.SSTableFormat; import org.apache.cassandra.locator.AbstractReplicationStrategy; import org.apache.cassandra.service.ClientState; import org.apache.cassandra.utils.Pair; @@ -272,6 +273,8 @@ public class CQLSSTableWriter implements Closeable private File directory; private IPartitioner partitioner = new Murmur3Partitioner(); + protected SSTableFormat.Type formatType = null; + private CFMetaData schema; private UpdateStatement insert; private List<ColumnSpecification> boundNames; @@ -279,7 +282,7 @@ public class CQLSSTableWriter implements Closeable private boolean sorted = false; private long bufferSizeInMB = 128; - private Builder() {} + protected Builder() {} /** * The directory where to write the sstables. @@ -484,6 +487,10 @@ public class CQLSSTableWriter implements Closeable AbstractSSTableSimpleWriter writer = sorted ? new SSTableSimpleWriter(directory, schema, partitioner) : new BufferedWriter(directory, schema, partitioner, bufferSizeInMB); + + if (formatType != null) + writer.setSSTableFormatType(formatType); + return new CQLSSTableWriter(writer, insert, boundNames); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/io/sstable/Descriptor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/Descriptor.java b/src/java/org/apache/cassandra/io/sstable/Descriptor.java index 6911b48..03522c3 100644 --- a/src/java/org/apache/cassandra/io/sstable/Descriptor.java +++ b/src/java/org/apache/cassandra/io/sstable/Descriptor.java @@ -22,13 +22,18 @@ import java.util.ArrayDeque; import java.util.Deque; import java.util.StringTokenizer; +import com.google.common.base.CharMatcher; import com.google.common.base.Objects; +import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.Directories; +import org.apache.cassandra.io.sstable.format.SSTableFormat; +import org.apache.cassandra.io.sstable.format.Version; import org.apache.cassandra.io.sstable.metadata.IMetadataSerializer; import org.apache.cassandra.io.sstable.metadata.LegacyMetadataSerializer; import org.apache.cassandra.io.sstable.metadata.MetadataSerializer; import org.apache.cassandra.utils.Pair; +import org.apache.commons.lang.StringUtils; import static org.apache.cassandra.io.sstable.Component.separator; @@ -41,92 +46,6 @@ import static org.apache.cassandra.io.sstable.Component.separator; */ public class Descriptor { - // versions are denoted as [major][minor]. Minor versions must be forward-compatible: - // new fields are allowed in e.g. the metadata component, but fields can't be removed - // or have their size changed. - // - // Minor versions were introduced with version "hb" for Cassandra 1.0.3; prior to that, - // we always incremented the major version. - public static class Version - { - // This needs to be at the begining for initialization sake - public static final String current_version = "la"; - - // ja (2.0.0): super columns are serialized as composites (note that there is no real format change, - // this is mostly a marker to know if we should expect super columns or not. We do need - // a major version bump however, because we should not allow streaming of super columns - // into this new format) - // tracks max local deletiontime in sstable metadata - // records bloom_filter_fp_chance in metadata component - // remove data size and column count from data file (CASSANDRA-4180) - // tracks max/min column values (according to comparator) - // jb (2.0.1): switch from crc32 to adler32 for compression checksums - // checksum the compressed data - // ka (2.1.0): new Statistics.db file format - // index summaries can be downsampled and the sampling level is persisted - // switch uncompressed checksums to adler32 - // tracks presense of legacy (local and remote) counter shards - // la (3.0.0): new file name format - - public static final Version CURRENT = new Version(current_version); - - private final String version; - - public final boolean isLatestVersion; - public final boolean hasPostCompressionAdlerChecksums; - public final boolean hasSamplingLevel; - public final boolean newStatsFile; - public final boolean hasAllAdlerChecksums; - public final boolean hasRepairedAt; - public final boolean tracksLegacyCounterShards; - public final boolean newFileName; - - public Version(String version) - { - this.version = version; - isLatestVersion = version.compareTo(current_version) == 0; - hasPostCompressionAdlerChecksums = version.compareTo("jb") >= 0; - hasSamplingLevel = version.compareTo("ka") >= 0; - newStatsFile = version.compareTo("ka") >= 0; - hasAllAdlerChecksums = version.compareTo("ka") >= 0; - hasRepairedAt = version.compareTo("ka") >= 0; - tracksLegacyCounterShards = version.compareTo("ka") >= 0; - newFileName = version.compareTo("la") >= 0; - } - - /** - * @param ver SSTable version - * @return True if the given version string matches the format. - * @see #version - */ - static boolean validate(String ver) - { - return ver != null && ver.matches("[a-z]+"); - } - - public boolean isCompatible() - { - return version.compareTo("ja") >= 0 && version.charAt(0) <= CURRENT.version.charAt(0); - } - - @Override - public String toString() - { - return version; - } - - @Override - public boolean equals(Object o) - { - return o == this || o instanceof Version && version.equals(((Version) o).version); - } - - @Override - public int hashCode() - { - return version.hashCode(); - } - } public static enum Type { @@ -140,6 +59,7 @@ public class Descriptor } } + public final File directory; /** version has the following format: <code>[a-z]+</code> */ public final Version version; @@ -147,6 +67,7 @@ public class Descriptor public final String cfname; public final int generation; public final Type type; + public final SSTableFormat.Type formatType; private final int hashCode; /** @@ -154,29 +75,41 @@ public class Descriptor */ public Descriptor(File directory, String ksname, String cfname, int generation, Type temp) { - this(Version.CURRENT, directory, ksname, cfname, generation, temp); + this(DatabaseDescriptor.getSSTableFormat().info.getLatestVersion(), directory, ksname, cfname, generation, temp, DatabaseDescriptor.getSSTableFormat()); } - public Descriptor(String version, File directory, String ksname, String cfname, int generation, Type temp) + public Descriptor(File directory, String ksname, String cfname, int generation, Type temp, SSTableFormat.Type formatType) { - this(new Version(version), directory, ksname, cfname, generation, temp); + this(formatType.info.getLatestVersion(), directory, ksname, cfname, generation, temp, formatType); } - public Descriptor(Version version, File directory, String ksname, String cfname, int generation, Type temp) + public Descriptor(String version, File directory, String ksname, String cfname, int generation, Type temp, SSTableFormat.Type formatType) { - assert version != null && directory != null && ksname != null && cfname != null; + this(formatType.info.getVersion(version), directory, ksname, cfname, generation, temp, formatType); + } + + public Descriptor(Version version, File directory, String ksname, String cfname, int generation, Type temp, SSTableFormat.Type formatType) + { + assert version != null && directory != null && ksname != null && cfname != null && formatType.info.getLatestVersion().getClass().equals(version.getClass()); this.version = version; this.directory = directory; this.ksname = ksname; this.cfname = cfname; this.generation = generation; - type = temp; - hashCode = Objects.hashCode(directory, generation, ksname, cfname, temp); + this.type = temp; + this.formatType = formatType; + + hashCode = Objects.hashCode(version, directory, generation, ksname, cfname, temp, formatType); } public Descriptor withGeneration(int newGeneration) { - return new Descriptor(version, directory, ksname, cfname, newGeneration, type); + return new Descriptor(version, directory, ksname, cfname, newGeneration, type, formatType); + } + + public Descriptor withFormatType(SSTableFormat.Type newType) + { + return new Descriptor(newType.info.getLatestVersion(), directory, ksname, cfname, generation, type, newType); } public String filenameFor(Component component) @@ -194,7 +127,7 @@ public class Descriptor private void appendFileName(StringBuilder buff) { - if (!version.newFileName) + if (!version.hasNewFileName()) { buff.append(ksname).append(separator); buff.append(cfname).append(separator); @@ -203,6 +136,8 @@ public class Descriptor buff.append(type.marker).append(separator); buff.append(version).append(separator); buff.append(generation); + if (formatType != SSTableFormat.Type.LEGACY) + buff.append(separator).append(formatType.name); } public String relativeFilenameFor(Component component) @@ -213,6 +148,11 @@ public class Descriptor return buff.toString(); } + public SSTableFormat getFormat() + { + return formatType.info; + } + /** * @param suffix A component suffix, such as 'Data.db'/'Index.db'/etc * @return A filename for this descriptor with the given suffix. @@ -233,6 +173,11 @@ public class Descriptor return fromFilename(file.getParentFile(), file.getName(), false).left; } + public static Descriptor fromFilename(String filename, SSTableFormat.Type formatType) + { + return fromFilename(filename).withFormatType(formatType); + } + public static Descriptor fromFilename(String filename, boolean skipComponent) { File file = new File(filename); @@ -249,7 +194,7 @@ public class Descriptor * * <ul> * <li><ksname>-<cfname>-(tmp-)?<version>-<gen>-<component> for cassandra 2.0 and before</li> - * <li>(<tmp marker>-)?<version>-<gen>-<component> for cassandra 2.1 and later</li> + * <li>(<tmp marker>-)?<version>-<gen>-<component> for cassandra 3.0 and later</li> * </ul> * * If this is for SSTable of secondary index, directory should ends with index name for 2.1+. @@ -278,32 +223,42 @@ public class Descriptor // component suffix String component = skipComponent ? null : tokenStack.pop(); + nexttok = tokenStack.pop(); + // generation OR Type + SSTableFormat.Type fmt = SSTableFormat.Type.LEGACY; + if (!CharMatcher.DIGIT.matchesAllOf(nexttok)) + { + fmt = SSTableFormat.Type.validate(nexttok); + nexttok = tokenStack.pop(); + } + // generation - int generation = Integer.parseInt(tokenStack.pop()); + int generation = Integer.parseInt(nexttok); // version nexttok = tokenStack.pop(); - if (!Version.validate(nexttok)) + Version version = fmt.info.getVersion(nexttok); + + if (!version.validate(nexttok)) throw new UnsupportedOperationException("SSTable " + name + " is too old to open. Upgrade to 2.0 first, and run upgradesstables"); - Version version = new Version(nexttok); // optional temporary marker - Type type = Type.FINAL; + Type type = Descriptor.Type.FINAL; nexttok = tokenStack.peek(); - if (Type.TEMP.marker.equals(nexttok)) + if (Descriptor.Type.TEMP.marker.equals(nexttok)) { - type = Type.TEMP; + type = Descriptor.Type.TEMP; tokenStack.pop(); } - else if (Type.TEMPLINK.marker.equals(nexttok)) + else if (Descriptor.Type.TEMPLINK.marker.equals(nexttok)) { - type = Type.TEMPLINK; + type = Descriptor.Type.TEMPLINK; tokenStack.pop(); } // ks/cf names String ksname, cfname; - if (version.newFileName) + if (version.hasNewFileName()) { // for 2.1+ read ks and cf names from directory File cfDirectory = parentDirectory; @@ -332,7 +287,7 @@ public class Descriptor } assert tokenStack.isEmpty() : "Invalid file name " + name + " in " + directory; - return Pair.create(new Descriptor(version, parentDirectory, ksname, cfname, generation, type), component); + return Pair.create(new Descriptor(version, parentDirectory, ksname, cfname, generation, type, fmt), component); } /** @@ -341,12 +296,12 @@ public class Descriptor */ public Descriptor asType(Type type) { - return new Descriptor(version, directory, ksname, cfname, generation, type); + return new Descriptor(version, directory, ksname, cfname, generation, type, formatType); } public IMetadataSerializer getMetadataSerializer() { - if (version.newStatsFile) + if (version.hasNewStatsFile()) return new MetadataSerializer(); else return new LegacyMetadataSerializer(); @@ -378,6 +333,7 @@ public class Descriptor && that.generation == this.generation && that.ksname.equals(this.ksname) && that.cfname.equals(this.cfname) + && that.formatType == this.formatType && that.type == this.type; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/io/sstable/IndexHelper.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/IndexHelper.java b/src/java/org/apache/cassandra/io/sstable/IndexHelper.java index b0bbfc4..4518e23 100644 --- a/src/java/org/apache/cassandra/io/sstable/IndexHelper.java +++ b/src/java/org/apache/cassandra/io/sstable/IndexHelper.java @@ -67,32 +67,6 @@ public class IndexHelper } /** - * Deserialize the index into a structure and return it - * - * @param in input source - * @param type the comparator type for the column family - * - * @return ArrayList<IndexInfo> - list of de-serialized indexes - * @throws IOException if an I/O error occurs. - */ - public static List<IndexInfo> deserializeIndex(FileDataInput in, CType type) throws IOException - { - int columnIndexSize = in.readInt(); - if (columnIndexSize == 0) - return Collections.<IndexInfo>emptyList(); - ArrayList<IndexInfo> indexList = new ArrayList<IndexInfo>(); - FileMark mark = in.mark(); - ISerializer<IndexInfo> serializer = type.indexSerializer(); - while (in.bytesPastMark(mark) < columnIndexSize) - { - indexList.add(serializer.deserialize(in)); - } - assert in.bytesPastMark(mark) == columnIndexSize; - - return indexList; - } - - /** * The index of the IndexInfo in which a scan starting with @name should begin. * * @param name http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java b/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java index e39d75d..468b54c 100644 --- a/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java +++ b/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java @@ -37,6 +37,7 @@ import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Multimap; import com.google.common.collect.Sets; +import org.apache.cassandra.io.sstable.format.SSTableReader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/io/sstable/ReducingKeyIterator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/ReducingKeyIterator.java b/src/java/org/apache/cassandra/io/sstable/ReducingKeyIterator.java index 6cb8653..a1fda57 100644 --- a/src/java/org/apache/cassandra/io/sstable/ReducingKeyIterator.java +++ b/src/java/org/apache/cassandra/io/sstable/ReducingKeyIterator.java @@ -23,6 +23,7 @@ import java.util.Collection; import java.util.Iterator; import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.utils.CloseableIterator; import org.apache.cassandra.utils.IMergeIterator; import org.apache.cassandra.utils.MergeIterator; http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/io/sstable/SSTable.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTable.java b/src/java/org/apache/cassandra/io/sstable/SSTable.java index dee024a..50b9545 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTable.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTable.java @@ -208,7 +208,7 @@ public abstract class SSTable } /** @return An estimate of the number of keys contained in the given index file. */ - long estimateRowsFromIndex(RandomAccessReader ifile) throws IOException + protected long estimateRowsFromIndex(RandomAccessReader ifile) throws IOException { // collect sizes for the first 10000 keys, or first 10 megabytes of data final int SAMPLES_CAP = 10000, BYTES_CAP = (int)Math.min(10000000, ifile.length()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/io/sstable/SSTableDeletingTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableDeletingTask.java b/src/java/org/apache/cassandra/io/sstable/SSTableDeletingTask.java index 785e23b..bb84e4c 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableDeletingTask.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableDeletingTask.java @@ -24,6 +24,7 @@ import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.TimeUnit; import com.google.common.collect.Sets; +import org.apache.cassandra.io.sstable.format.SSTableReader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java b/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java index b784a7e..6300749 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java @@ -21,16 +21,19 @@ import java.io.*; import java.util.Iterator; import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.*; import org.apache.cassandra.db.columniterator.OnDiskAtomIterator; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.sstable.format.Version; +import org.apache.cassandra.io.util.FileDataInput; import org.apache.cassandra.io.util.RandomAccessReader; import org.apache.cassandra.serializers.MarshalException; -public class SSTableIdentityIterator implements Comparable<SSTableIdentityIterator>, OnDiskAtomIterator + public class SSTableIdentityIterator implements Comparable<SSTableIdentityIterator>, OnDiskAtomIterator { private final DecoratedKey key; private final DataInput in; - public final long dataSize; // we [still] require this so compaction can tell if it's safe to read the row into memory public final ColumnSerializer.Flag flag; private final ColumnFamily columnFamily; @@ -43,11 +46,10 @@ public class SSTableIdentityIterator implements Comparable<SSTableIdentityIterat * @param sstable SSTable we are reading ffrom. * @param file Reading using this file. * @param key Key of this row. - * @param dataSize length of row data */ - public SSTableIdentityIterator(SSTableReader sstable, RandomAccessReader file, DecoratedKey key, long dataSize) + public SSTableIdentityIterator(SSTableReader sstable, RandomAccessReader file, DecoratedKey key) { - this(sstable, file, key, dataSize, false); + this(sstable, file, key, false); } /** @@ -55,21 +57,19 @@ public class SSTableIdentityIterator implements Comparable<SSTableIdentityIterat * @param sstable SSTable we are reading ffrom. * @param file Reading using this file. * @param key Key of this row. - * @param dataSize length of row data * @param checkData if true, do its best to deserialize and check the coherence of row data */ - public SSTableIdentityIterator(SSTableReader sstable, RandomAccessReader file, DecoratedKey key, long dataSize, boolean checkData) + public SSTableIdentityIterator(SSTableReader sstable, RandomAccessReader file, DecoratedKey key, boolean checkData) { - this(sstable.metadata, file, file.getPath(), key, dataSize, checkData, sstable, ColumnSerializer.Flag.LOCAL); + this(sstable.metadata, file, file.getPath(), key, checkData, sstable, ColumnSerializer.Flag.LOCAL); } // sstable may be null *if* checkData is false // If it is null, we assume the data is in the current file format private SSTableIdentityIterator(CFMetaData metadata, - DataInput in, + FileDataInput in, String filename, DecoratedKey key, - long dataSize, boolean checkData, SSTableReader sstable, ColumnSerializer.Flag flag) @@ -78,11 +78,10 @@ public class SSTableIdentityIterator implements Comparable<SSTableIdentityIterat this.in = in; this.filename = filename; this.key = key; - this.dataSize = dataSize; this.flag = flag; this.validateColumns = checkData; - Descriptor.Version dataVersion = sstable == null ? Descriptor.Version.CURRENT : sstable.descriptor.version; + Version dataVersion = sstable == null ? DatabaseDescriptor.getSSTableFormat().info.getLatestVersion() : sstable.descriptor.version; int expireBefore = (int) (System.currentTimeMillis() / 1000); columnFamily = ArrayBackedSortedColumns.factory.create(metadata); http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java index 6cb7f03..2a1b66f 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java @@ -33,6 +33,7 @@ import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.exceptions.ConfigurationException; +import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.service.ActiveRepairService; import org.apache.cassandra.streaming.*; import org.apache.cassandra.utils.FBUtilities;
