Extend Descriptor to include a format value and refactor reader/writer APIs
patch by tjake; reviewed by Marcus Eriksson for CASSANDRA-7443 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0368e97e Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0368e97e Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0368e97e Branch: refs/heads/trunk Commit: 0368e97ee4a807cb832a90c590ae5c65a98730c1 Parents: 7e53db0 Author: Jake Luciani <[email protected]> Authored: Tue Sep 2 12:49:01 2014 -0400 Committer: T Jake Luciani <[email protected]> Committed: Thu Oct 23 11:10:55 2014 -0400 ---------------------------------------------------------------------- CHANGES.txt | 1 + build.xml | 12 +- .../org/apache/cassandra/config/CFMetaData.java | 10 +- .../cassandra/config/DatabaseDescriptor.java | 9 + .../org/apache/cassandra/db/AbstractCell.java | 3 +- .../apache/cassandra/db/AtomDeserializer.java | 5 +- .../apache/cassandra/db/BatchlogManager.java | 2 +- .../cassandra/db/CollationController.java | 2 +- .../cassandra/db/ColumnFamilySerializer.java | 3 +- .../apache/cassandra/db/ColumnFamilyStore.java | 31 +- .../org/apache/cassandra/db/DataTracker.java | 2 +- .../org/apache/cassandra/db/DeletionInfo.java | 2 +- src/java/org/apache/cassandra/db/Keyspace.java | 4 +- src/java/org/apache/cassandra/db/Memtable.java | 13 +- .../org/apache/cassandra/db/OnDiskAtom.java | 5 +- .../org/apache/cassandra/db/RangeTombstone.java | 7 +- .../org/apache/cassandra/db/RowIndexEntry.java | 60 +- .../apache/cassandra/db/RowIteratorFactory.java | 2 +- .../org/apache/cassandra/db/SystemKeyspace.java | 2 +- .../db/columniterator/IndexedSliceReader.java | 499 ----- .../db/columniterator/SSTableNamesIterator.java | 249 --- .../db/columniterator/SSTableSliceIterator.java | 101 - .../db/columniterator/SimpleSliceReader.java | 107 - .../db/commitlog/CommitLogDescriptor.java | 7 +- .../cassandra/db/commitlog/ReplayPosition.java | 2 +- .../db/compaction/AbstractCompactedRow.java | 3 +- 
.../compaction/AbstractCompactionStrategy.java | 4 +- .../db/compaction/AbstractCompactionTask.java | 2 +- .../db/compaction/CompactionController.java | 4 +- .../db/compaction/CompactionIterable.java | 8 +- .../db/compaction/CompactionManager.java | 29 +- .../cassandra/db/compaction/CompactionTask.java | 37 +- .../DateTieredCompactionStrategy.java | 2 +- .../db/compaction/LazilyCompactedRow.java | 45 +- .../compaction/LeveledCompactionStrategy.java | 9 +- .../db/compaction/LeveledCompactionTask.java | 4 +- .../db/compaction/LeveledManifest.java | 9 +- .../db/compaction/SSTableSplitter.java | 3 +- .../cassandra/db/compaction/Scrubber.java | 13 +- .../SizeTieredCompactionStrategy.java | 5 +- .../cassandra/db/compaction/Upgrader.java | 7 +- .../cassandra/db/composites/AbstractCType.java | 14 - .../apache/cassandra/db/composites/CType.java | 2 - .../cassandra/db/filter/IDiskAtomFilter.java | 2 +- .../cassandra/db/filter/NamesQueryFilter.java | 7 +- .../apache/cassandra/db/filter/QueryFilter.java | 2 +- .../cassandra/db/filter/SliceQueryFilter.java | 7 +- .../cassandra/db/index/SecondaryIndex.java | 2 +- .../db/index/SecondaryIndexManager.java | 2 +- .../org/apache/cassandra/dht/BytesToken.java | 1 - .../apache/cassandra/io/ISSTableSerializer.java | 3 +- .../io/compress/CompressionMetadata.java | 11 +- .../io/sstable/AbstractSSTableSimpleWriter.java | 22 +- .../cassandra/io/sstable/CQLSSTableWriter.java | 9 +- .../apache/cassandra/io/sstable/Descriptor.java | 172 +- .../cassandra/io/sstable/IndexHelper.java | 26 - .../io/sstable/IndexSummaryManager.java | 1 + .../io/sstable/ReducingKeyIterator.java | 1 + .../apache/cassandra/io/sstable/SSTable.java | 2 +- .../io/sstable/SSTableDeletingTask.java | 1 + .../io/sstable/SSTableIdentityIterator.java | 23 +- .../cassandra/io/sstable/SSTableLoader.java | 1 + .../cassandra/io/sstable/SSTableReader.java | 2055 ------------------ .../cassandra/io/sstable/SSTableRewriter.java | 2 + .../cassandra/io/sstable/SSTableScanner.java | 294 
--- .../io/sstable/SSTableSimpleUnsortedWriter.java | 7 +- .../io/sstable/SSTableSimpleWriter.java | 1 + .../cassandra/io/sstable/SSTableWriter.java | 641 ------ .../io/sstable/format/SSTableFormat.java | 90 + .../io/sstable/format/SSTableReader.java | 1881 ++++++++++++++++ .../io/sstable/format/SSTableWriter.java | 202 ++ .../cassandra/io/sstable/format/Version.java | 104 + .../io/sstable/format/big/BigFormat.java | 224 ++ .../io/sstable/format/big/BigTableReader.java | 256 +++ .../io/sstable/format/big/BigTableScanner.java | 299 +++ .../io/sstable/format/big/BigTableWriter.java | 541 +++++ .../sstable/format/big/IndexedSliceReader.java | 500 +++++ .../format/big/SSTableNamesIterator.java | 250 +++ .../format/big/SSTableSliceIterator.java | 102 + .../sstable/format/big/SimpleSliceReader.java | 108 + .../io/sstable/metadata/CompactionMetadata.java | 3 +- .../metadata/IMetadataComponentSerializer.java | 3 +- .../io/sstable/metadata/MetadataCollector.java | 2 +- .../io/sstable/metadata/StatsMetadata.java | 7 +- .../io/sstable/metadata/ValidationMetadata.java | 3 +- .../cassandra/io/util/AbstractDataInput.java | 6 +- .../io/util/DataIntegrityMetadata.java | 2 +- .../apache/cassandra/io/util/FileDataInput.java | 1 + .../org/apache/cassandra/io/util/FileUtils.java | 29 +- .../cassandra/io/util/MappedFileDataInput.java | 4 +- .../cassandra/io/util/MemoryInputStream.java | 6 +- .../cassandra/metrics/ColumnFamilyMetrics.java | 2 +- .../apache/cassandra/net/MessagingService.java | 3 +- .../notifications/SSTableAddedNotification.java | 2 +- .../SSTableDeletingNotification.java | 2 +- .../SSTableListChangedNotification.java | 3 +- .../SSTableRepairStatusChanged.java | 2 +- .../repair/RepairMessageVerbHandler.java | 2 +- .../cassandra/service/ActiveRepairService.java | 3 +- .../apache/cassandra/service/CacheService.java | 7 +- .../cassandra/streaming/StreamLockfile.java | 2 +- .../cassandra/streaming/StreamReader.java | 33 +- .../cassandra/streaming/StreamReceiveTask.java | 6 
+- .../cassandra/streaming/StreamSession.java | 2 +- .../cassandra/streaming/StreamTransferTask.java | 2 +- .../cassandra/streaming/StreamWriter.java | 2 +- .../compress/CompressedStreamReader.java | 16 +- .../compress/CompressedStreamWriter.java | 2 +- .../streaming/messages/FileMessageHeader.java | 27 +- .../streaming/messages/IncomingFileMessage.java | 2 +- .../streaming/messages/OutgoingFileMessage.java | 3 +- .../streaming/messages/StreamMessage.java | 4 +- .../apache/cassandra/tools/SSTableExport.java | 8 +- .../apache/cassandra/tools/SSTableImport.java | 7 +- .../tools/SSTableRepairedAtSetter.java | 2 +- .../cassandra/tools/StandaloneScrubber.java | 3 +- .../cassandra/tools/StandaloneSplitter.java | 3 +- .../cassandra/tools/StandaloneUpgrader.java | 5 +- .../utils/vint/EncodedDataInputStream.java | 6 +- .../db/compaction/LongCompactionsTest.java | 4 +- .../LongLeveledCompactionStrategyTest.java | 2 +- test/unit/org/apache/cassandra/Util.java | 2 +- .../cassandra/cache/AutoSavingCacheTest.java | 2 +- .../org/apache/cassandra/db/CleanupTest.java | 2 +- .../cassandra/db/ColumnFamilyStoreTest.java | 33 +- .../org/apache/cassandra/db/KeyCacheTest.java | 5 +- .../org/apache/cassandra/db/KeyspaceTest.java | 2 +- .../apache/cassandra/db/RangeTombstoneTest.java | 3 +- .../apache/cassandra/db/RowIndexEntryTest.java | 7 +- .../unit/org/apache/cassandra/db/ScrubTest.java | 5 +- .../db/compaction/AntiCompactionTest.java | 15 +- .../compaction/BlacklistingCompactionsTest.java | 2 +- .../db/compaction/CompactionsPurgeTest.java | 2 +- .../db/compaction/CompactionsTest.java | 26 +- .../DateTieredCompactionStrategyTest.java | 3 +- .../LeveledCompactionStrategyTest.java | 2 +- .../SizeTieredCompactionStrategyTest.java | 2 +- .../cassandra/db/compaction/TTLExpiryTest.java | 5 +- .../cassandra/io/sstable/DescriptorTest.java | 52 +- .../io/sstable/IndexSummaryManagerTest.java | 1 + .../cassandra/io/sstable/LegacySSTableTest.java | 15 +- .../io/sstable/SSTableMetadataTest.java | 1 
+ .../cassandra/io/sstable/SSTableReaderTest.java | 1 + .../io/sstable/SSTableScannerTest.java | 11 +- .../cassandra/io/sstable/SSTableUtils.java | 9 +- .../metadata/MetadataSerializerTest.java | 2 +- .../apache/cassandra/repair/ValidatorTest.java | 3 +- .../streaming/StreamTransferTaskTest.java | 2 +- .../streaming/StreamingTransferTest.java | 2 +- .../cassandra/tools/SSTableExportTest.java | 20 +- .../cassandra/tools/SSTableImportTest.java | 2 +- tools/cqlstress-example.yaml | 9 +- tools/cqlstress-insanity-example.yaml | 2 - 153 files changed, 5202 insertions(+), 4473 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 0dae098..524e776 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.0 + * Extend Descriptor to include a format value and refactor reader/writer apis (CASSANDRA-7443) * Integrate JMH for microbenchmarks (CASSANDRA-8151) * Keep sstable levels when bootstrapping (CASSANDRA-7460) * Add Sigar library and perform basic OS settings check on startup (CASSANDRA-7838) http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/build.xml ---------------------------------------------------------------------- diff --git a/build.xml b/build.xml index 9cd80c2..7a6476c 100644 --- a/build.xml +++ b/build.xml @@ -334,7 +334,7 @@ <dependency groupId="com.googlecode.json-simple" artifactId="json-simple" version="1.1"/> <dependency groupId="com.boundary" artifactId="high-scale-lib" version="1.0.6"/> <dependency groupId="com.github.jbellis" artifactId="jamm" version="0.2.6"/> - <dependency groupId="com.thinkaurelius.thrift" artifactId="thrift-server" version="0.3.7"> + <dependency groupId="com.thinkaurelius.thrift" artifactId="thrift-server" version="0.3.5"> <exclusion groupId="org.slf4j" artifactId="slf4j-log4j12"/> </dependency> 
<dependency groupId="org.yaml" artifactId="snakeyaml" version="1.11"/> @@ -418,10 +418,11 @@ <dependency groupId="com.google.code.findbugs" artifactId="jsr305"/> <dependency groupId="org.antlr" artifactId="antlr"/> <dependency groupId="com.datastax.cassandra" artifactId="cassandra-driver-core"/> - <dependency groupId="net.ju-n.compile-command-annotations" artifactId="compile-command-annotations"/> <dependency groupId="org.javassist" artifactId="javassist"/> <dependency groupId="org.openjdk.jmh" artifactId="jmh-core"/> <dependency groupId="org.openjdk.jmh" artifactId="jmh-generator-annprocess"/> + <dependency groupId="net.ju-n.compile-command-annotations" artifactId="compile-command-annotations"/> + <dependency groupId="org.javassist" artifactId="javassist" /> </artifact:pom> <artifact:pom id="coverage-deps-pom" @@ -475,10 +476,9 @@ <dependency groupId="org.mindrot" artifactId="jbcrypt"/> <dependency groupId="com.yammer.metrics" artifactId="metrics-core"/> <dependency groupId="com.addthis.metrics" artifactId="reporter-config"/> - <dependency groupId="com.thinkaurelius.thrift" artifactId="thrift-server" version="0.3.7"/> + <dependency groupId="com.thinkaurelius.thrift" artifactId="thrift-server" version="0.3.5"/> <dependency groupId="com.clearspring.analytics" artifactId="stream" version="2.5.2" /> <dependency groupId="net.sf.supercsv" artifactId="super-csv" version="2.1.0" /> - <dependency groupId="org.javassist" artifactId="javassist"/> <dependency groupId="ch.qos.logback" artifactId="logback-core"/> <dependency groupId="ch.qos.logback" artifactId="logback-classic"/> @@ -492,6 +492,7 @@ <dependency groupId="org.apache.pig" artifactId="pig" optional="true"/> <dependency groupId="com.datastax.cassandra" artifactId="cassandra-driver-core" optional="true"/> + <!-- don't need jna to run, but nice to have --> <dependency groupId="net.java.dev.jna" artifactId="jna" version="4.0.0"/> @@ -1110,7 +1111,8 @@ <jvmarg value="-Xss256k"/> <jvmarg 
value="-Dcassandra.memtable_row_overhead_computation_step=100"/> <jvmarg value="-Dcassandra.test.use_prepared=${cassandra.test.use_prepared}"/> - <jvmarg value="-Dcassandra.test.offsetseed=@{poffset}"/> + <jvmarg value="-Dcassandra.test.offsetseed=@{poffset}"/> + <jvmarg value="-Dcassandra.test.sstableformatdevelopment=true"/> <optjvmargs/> <classpath> <path refid="cassandra.classpath" /> http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/config/CFMetaData.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/CFMetaData.java b/src/java/org/apache/cassandra/config/CFMetaData.java index 1e2a2e1..cf4d761 100644 --- a/src/java/org/apache/cassandra/config/CFMetaData.java +++ b/src/java/org/apache/cassandra/config/CFMetaData.java @@ -44,6 +44,8 @@ import com.google.common.collect.AbstractIterator; import com.google.common.collect.Iterables; import com.google.common.collect.MapDifference; import com.google.common.collect.Maps; +import org.apache.cassandra.io.sstable.format.Version; +import org.apache.cassandra.io.util.FileDataInput; import org.apache.commons.lang3.ArrayUtils; import org.apache.commons.lang3.builder.HashCodeBuilder; import org.apache.commons.lang3.builder.ToStringBuilder; @@ -1460,17 +1462,17 @@ public final class CFMetaData return (cfName + "_" + columnName + "_idx").replaceAll("\\W", ""); } - public Iterator<OnDiskAtom> getOnDiskIterator(DataInput in, Descriptor.Version version) + public Iterator<OnDiskAtom> getOnDiskIterator(FileDataInput in, Version version) { return getOnDiskIterator(in, ColumnSerializer.Flag.LOCAL, Integer.MIN_VALUE, version); } - public Iterator<OnDiskAtom> getOnDiskIterator(DataInput in, ColumnSerializer.Flag flag, int expireBefore, Descriptor.Version version) + public Iterator<OnDiskAtom> getOnDiskIterator(FileDataInput in, ColumnSerializer.Flag flag, int expireBefore, Version version) { - return 
AbstractCell.onDiskIterator(in, flag, expireBefore, version, comparator); + return version.getSSTableFormat().getOnDiskIterator(in, flag, expireBefore, this, version); } - public AtomDeserializer getOnDiskDeserializer(DataInput in, Descriptor.Version version) + public AtomDeserializer getOnDiskDeserializer(DataInput in, Version version) { return new AtomDeserializer(comparator, in, ColumnSerializer.Flag.LOCAL, Integer.MIN_VALUE, version); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/config/DatabaseDescriptor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java index 8659c94..00e875b 100644 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@ -38,6 +38,7 @@ import java.util.UUID; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableSet; import com.google.common.primitives.Longs; +import org.apache.cassandra.io.sstable.format.SSTableFormat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.auth.AllowAllAuthenticator; @@ -98,6 +99,8 @@ public class DatabaseDescriptor private static Config conf; + private static SSTableFormat.Type sstable_format = SSTableFormat.Type.BIG; + private static IAuthenticator authenticator = new AllowAllAuthenticator(); private static IAuthorizer authorizer = new AllowAllAuthorizer(); @@ -1544,6 +1547,12 @@ public class DatabaseDescriptor return conf.inter_dc_tcp_nodelay; } + + public static SSTableFormat.Type getSSTableFormat() + { + return sstable_format; + } + public static MemtablePool getMemtableAllocatorPool() { long heapLimit = ((long) conf.memtable_heap_space_in_mb) << 20; 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/db/AbstractCell.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/AbstractCell.java b/src/java/org/apache/cassandra/db/AbstractCell.java index f27871f..de86126 100644 --- a/src/java/org/apache/cassandra/db/AbstractCell.java +++ b/src/java/org/apache/cassandra/db/AbstractCell.java @@ -31,6 +31,7 @@ import org.apache.cassandra.db.composites.CellNameType; import org.apache.cassandra.db.context.CounterContext; import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.io.sstable.Descriptor; +import org.apache.cassandra.io.sstable.format.Version; import org.apache.cassandra.serializers.MarshalException; import org.apache.cassandra.utils.FBUtilities; @@ -39,7 +40,7 @@ public abstract class AbstractCell implements Cell public static Iterator<OnDiskAtom> onDiskIterator(final DataInput in, final ColumnSerializer.Flag flag, final int expireBefore, - final Descriptor.Version version, + final Version version, final CellNameType type) { return new AbstractIterator<OnDiskAtom>() http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/db/AtomDeserializer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/AtomDeserializer.java b/src/java/org/apache/cassandra/db/AtomDeserializer.java index 799ed0e..0c43422 100644 --- a/src/java/org/apache/cassandra/db/AtomDeserializer.java +++ b/src/java/org/apache/cassandra/db/AtomDeserializer.java @@ -24,6 +24,7 @@ import org.apache.cassandra.db.composites.CellName; import org.apache.cassandra.db.composites.CellNameType; import org.apache.cassandra.db.composites.Composite; import org.apache.cassandra.io.sstable.Descriptor; +import org.apache.cassandra.io.sstable.format.Version; /** * Helper class to deserialize OnDiskAtom efficiently. 
@@ -40,9 +41,9 @@ public class AtomDeserializer private final DataInput in; private final ColumnSerializer.Flag flag; private final int expireBefore; - private final Descriptor.Version version; + private final Version version; - public AtomDeserializer(CellNameType type, DataInput in, ColumnSerializer.Flag flag, int expireBefore, Descriptor.Version version) + public AtomDeserializer(CellNameType type, DataInput in, ColumnSerializer.Flag flag, int expireBefore, Version version) { this.type = type; this.nameDeserializer = type.newDeserializer(in); http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/db/BatchlogManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/BatchlogManager.java b/src/java/org/apache/cassandra/db/BatchlogManager.java index 18d9a17..7f04cba 100644 --- a/src/java/org/apache/cassandra/db/BatchlogManager.java +++ b/src/java/org/apache/cassandra/db/BatchlogManager.java @@ -31,6 +31,7 @@ import javax.management.ObjectName; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.*; import com.google.common.util.concurrent.RateLimiter; +import org.apache.cassandra.io.sstable.format.SSTableReader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,7 +45,6 @@ import org.apache.cassandra.dht.Token; import org.apache.cassandra.exceptions.WriteTimeoutException; import org.apache.cassandra.gms.FailureDetector; import org.apache.cassandra.io.sstable.Descriptor; -import org.apache.cassandra.io.sstable.SSTableReader; import org.apache.cassandra.io.util.DataOutputBuffer; import org.apache.cassandra.net.MessageIn; import org.apache.cassandra.net.MessageOut; http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/db/CollationController.java ---------------------------------------------------------------------- diff --git 
a/src/java/org/apache/cassandra/db/CollationController.java b/src/java/org/apache/cassandra/db/CollationController.java index ff5fe88..922cbfe 100644 --- a/src/java/org/apache/cassandra/db/CollationController.java +++ b/src/java/org/apache/cassandra/db/CollationController.java @@ -35,7 +35,7 @@ import org.apache.cassandra.db.composites.CellName; import org.apache.cassandra.db.filter.NamesQueryFilter; import org.apache.cassandra.db.filter.QueryFilter; import org.apache.cassandra.db.marshal.CounterColumnType; -import org.apache.cassandra.io.sstable.SSTableReader; +import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.tracing.Tracing; import org.apache.cassandra.utils.SearchIterator; http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java b/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java index e2aeb6c..29866d6 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java @@ -25,6 +25,7 @@ import org.apache.cassandra.config.Schema; import org.apache.cassandra.io.ISSTableSerializer; import org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.io.sstable.Descriptor; +import org.apache.cassandra.io.sstable.format.Version; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.utils.UUIDSerializer; @@ -146,7 +147,7 @@ public class ColumnFamilySerializer implements IVersionedSerializer<ColumnFamily throw new UnsupportedOperationException(); } - public ColumnFamily deserializeFromSSTable(DataInput in, Descriptor.Version version) + public ColumnFamily deserializeFromSSTable(DataInput in, Version 
version) { throw new UnsupportedOperationException(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/db/ColumnFamilyStore.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index aa266be..a23b2ad 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -34,6 +34,10 @@ import com.google.common.util.concurrent.*; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.Uninterruptibles; import org.apache.cassandra.io.FSWriteError; +import org.apache.cassandra.io.sstable.format.SSTableFormat; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.sstable.format.SSTableWriter; +import org.apache.cassandra.io.sstable.format.Version; import org.json.simple.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -441,7 +445,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean generations.add(desc.generation); if (!desc.isCompatible()) throw new RuntimeException(String.format("Incompatible SSTable found. Current version %s is unable to read file: %s. Please run upgradesstables.", - Descriptor.Version.CURRENT, desc)); + desc.getFormat().getLatestVersion(), desc)); } Collections.sort(generations); int value = (generations.size() > 0) ? 
(generations.get(generations.size() - 1)) : 0; @@ -672,7 +676,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean Set<Descriptor> currentDescriptors = new HashSet<Descriptor>(); for (SSTableReader sstable : data.getView().sstables) currentDescriptors.add(sstable.descriptor); - Set<SSTableReader> newSSTables = new HashSet<SSTableReader>(); + Set<SSTableReader> newSSTables = new HashSet<>(); Directories.SSTableLister lister = directories.sstableLister().skipTemporary(true); for (Map.Entry<Descriptor, Set<Component>> entry : lister.list().entrySet()) @@ -686,8 +690,8 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean if (!descriptor.isCompatible()) throw new RuntimeException(String.format("Can't open incompatible SSTable! Current version %s, found file: %s", - Descriptor.Version.CURRENT, - descriptor)); + descriptor.getFormat().getLatestVersion(), + descriptor)); // force foreign sstables to level 0 try @@ -711,7 +715,8 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean descriptor.ksname, descriptor.cfname, fileIndexGenerator.incrementAndGet(), - Descriptor.Type.FINAL); + Descriptor.Type.FINAL, + descriptor.formatType); } while (new File(newDescriptor.filenameFor(Component.DATA)).exists()); @@ -780,17 +785,23 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean public String getTempSSTablePath(File directory) { - return getTempSSTablePath(directory, Descriptor.Version.CURRENT); + return getTempSSTablePath(directory, DatabaseDescriptor.getSSTableFormat().info.getLatestVersion(), DatabaseDescriptor.getSSTableFormat()); } - private String getTempSSTablePath(File directory, Descriptor.Version version) + public String getTempSSTablePath(File directory, SSTableFormat.Type format) + { + return getTempSSTablePath(directory, format.info.getLatestVersion(), format); + } + + private String getTempSSTablePath(File directory, Version version, SSTableFormat.Type format) { Descriptor desc = new Descriptor(version, 
directory, keyspace.getName(), name, fileIndexGenerator.incrementAndGet(), - Descriptor.Type.TEMP); + Descriptor.Type.TEMP, + format); return desc.filenameFor(Component.DATA); } @@ -2193,7 +2204,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean public List<SSTableReader> getSnapshotSSTableReader(String tag) throws IOException { Map<Descriptor, Set<Component>> snapshots = directories.sstableLister().snapshots(tag).list(); - List<SSTableReader> readers = new ArrayList<SSTableReader>(snapshots.size()); + List<SSTableReader> readers = new ArrayList<>(snapshots.size()); for (Map.Entry<Descriptor, Set<Component>> entries : snapshots.entrySet()) readers.add(SSTableReader.open(entries.getKey(), entries.getValue(), metadata, partitioner)); return readers; @@ -2779,7 +2790,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean { assert data.getCompacting().isEmpty() : data.getCompacting(); - List<SSTableReader> truncatedSSTables = new ArrayList<SSTableReader>(); + List<SSTableReader> truncatedSSTables = new ArrayList<>(); for (SSTableReader sstable : getSSTables()) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/db/DataTracker.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/DataTracker.java b/src/java/org/apache/cassandra/db/DataTracker.java index 7393323..d106190 100644 --- a/src/java/org/apache/cassandra/db/DataTracker.java +++ b/src/java/org/apache/cassandra/db/DataTracker.java @@ -24,13 +24,13 @@ import java.util.concurrent.atomic.AtomicReference; import com.google.common.base.Predicate; import com.google.common.collect.*; +import org.apache.cassandra.io.sstable.format.SSTableReader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.compaction.OperationType; import org.apache.cassandra.dht.AbstractBounds; -import 
org.apache.cassandra.io.sstable.SSTableReader; import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.metrics.StorageMetrics; import org.apache.cassandra.notifications.*; http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/db/DeletionInfo.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/DeletionInfo.java b/src/java/org/apache/cassandra/db/DeletionInfo.java index 3e5e845..048324a 100644 --- a/src/java/org/apache/cassandra/db/DeletionInfo.java +++ b/src/java/org/apache/cassandra/db/DeletionInfo.java @@ -147,7 +147,7 @@ public class DeletionInfo implements IMeasurableMemory /** * Returns a new {@link InOrderTester} in forward order. */ - InOrderTester inOrderTester() + public InOrderTester inOrderTester() { return inOrderTester(false); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/db/Keyspace.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/Keyspace.java b/src/java/org/apache/cassandra/db/Keyspace.java index ca43df6..8986154 100644 --- a/src/java/org/apache/cassandra/db/Keyspace.java +++ b/src/java/org/apache/cassandra/db/Keyspace.java @@ -32,6 +32,7 @@ import java.util.concurrent.Future; import com.google.common.base.Function; import com.google.common.collect.Iterables; +import org.apache.cassandra.io.sstable.format.SSTableReader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,7 +45,6 @@ import org.apache.cassandra.db.commitlog.ReplayPosition; import org.apache.cassandra.db.filter.QueryFilter; import org.apache.cassandra.db.index.SecondaryIndex; import org.apache.cassandra.db.index.SecondaryIndexManager; -import org.apache.cassandra.io.sstable.SSTableReader; import org.apache.cassandra.locator.AbstractReplicationStrategy; import org.apache.cassandra.service.StorageService; import 
org.apache.cassandra.service.pager.QueryPagers; @@ -260,7 +260,7 @@ public class Keyspace */ public List<SSTableReader> getAllSSTables() { - List<SSTableReader> list = new ArrayList<SSTableReader>(columnFamilyStores.size()); + List<SSTableReader> list = new ArrayList<>(columnFamilyStores.size()); for (ColumnFamilyStore cfStore : columnFamilyStores.values()) list.addAll(cfStore.getSSTables()); return list; http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/db/Memtable.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java index a711833..80376f7 100644 --- a/src/java/org/apache/cassandra/db/Memtable.java +++ b/src/java/org/apache/cassandra/db/Memtable.java @@ -28,6 +28,9 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import com.google.common.base.Throwables; +import org.apache.cassandra.io.sstable.Descriptor; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.sstable.format.SSTableWriter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,8 +40,6 @@ import org.apache.cassandra.db.commitlog.ReplayPosition; import org.apache.cassandra.db.composites.CellNameType; import org.apache.cassandra.db.index.SecondaryIndexManager; import org.apache.cassandra.dht.LongToken; -import org.apache.cassandra.io.sstable.SSTableReader; -import org.apache.cassandra.io.sstable.SSTableWriter; import org.apache.cassandra.io.sstable.metadata.MetadataCollector; import org.apache.cassandra.io.util.DiskAwareRunnable; import org.apache.cassandra.service.ActiveRepairService; @@ -384,12 +385,8 @@ public class Memtable public SSTableWriter createFlushWriter(String filename) { MetadataCollector sstableMetadataCollector = new MetadataCollector(cfs.metadata.comparator).replayPosition(context); - return new 
SSTableWriter(filename, - rows.size(), - ActiveRepairService.UNREPAIRED_SSTABLE, - cfs.metadata, - cfs.partitioner, - sstableMetadataCollector); + + return SSTableWriter.create(Descriptor.fromFilename(filename), (long) rows.size(), ActiveRepairService.UNREPAIRED_SSTABLE, cfs.metadata, cfs.partitioner, sstableMetadataCollector); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/db/OnDiskAtom.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/OnDiskAtom.java b/src/java/org/apache/cassandra/db/OnDiskAtom.java index b53e43b..f97ca42 100644 --- a/src/java/org/apache/cassandra/db/OnDiskAtom.java +++ b/src/java/org/apache/cassandra/db/OnDiskAtom.java @@ -26,6 +26,7 @@ import org.apache.cassandra.db.composites.CellNameType; import org.apache.cassandra.db.composites.Composite; import org.apache.cassandra.io.ISSTableSerializer; import org.apache.cassandra.io.sstable.Descriptor; +import org.apache.cassandra.io.sstable.format.Version; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.serializers.MarshalException; @@ -65,12 +66,12 @@ public interface OnDiskAtom } } - public OnDiskAtom deserializeFromSSTable(DataInput in, Descriptor.Version version) throws IOException + public OnDiskAtom deserializeFromSSTable(DataInput in, Version version) throws IOException { return deserializeFromSSTable(in, ColumnSerializer.Flag.LOCAL, Integer.MIN_VALUE, version); } - public OnDiskAtom deserializeFromSSTable(DataInput in, ColumnSerializer.Flag flag, int expireBefore, Descriptor.Version version) throws IOException + public OnDiskAtom deserializeFromSSTable(DataInput in, ColumnSerializer.Flag flag, int expireBefore, Version version) throws IOException { Composite name = type.serializer().deserialize(in); if (name.isEmpty()) http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/db/RangeTombstone.java 
---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/RangeTombstone.java b/src/java/org/apache/cassandra/db/RangeTombstone.java index 6a246f9..4a0037b 100644 --- a/src/java/org/apache/cassandra/db/RangeTombstone.java +++ b/src/java/org/apache/cassandra/db/RangeTombstone.java @@ -27,6 +27,7 @@ import org.apache.cassandra.db.composites.CType; import org.apache.cassandra.db.composites.Composite; import org.apache.cassandra.io.ISSTableSerializer; import org.apache.cassandra.io.sstable.Descriptor; +import org.apache.cassandra.io.sstable.format.Version; import org.apache.cassandra.io.util.DataOutputBuffer; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.serializers.MarshalException; @@ -259,7 +260,7 @@ public class RangeTombstone extends Interval<Composite, DeletionTime> implements DeletionTime.serializer.serialize(t.data, out); } - public RangeTombstone deserializeFromSSTable(DataInput in, Descriptor.Version version) throws IOException + public RangeTombstone deserializeFromSSTable(DataInput in, Version version) throws IOException { Composite min = type.serializer().deserialize(in); @@ -268,14 +269,14 @@ public class RangeTombstone extends Interval<Composite, DeletionTime> implements return deserializeBody(in, min, version); } - public RangeTombstone deserializeBody(DataInput in, Composite min, Descriptor.Version version) throws IOException + public RangeTombstone deserializeBody(DataInput in, Composite min, Version version) throws IOException { Composite max = type.serializer().deserialize(in); DeletionTime dt = DeletionTime.serializer.deserialize(in); return new RangeTombstone(min, max, dt); } - public void skipBody(DataInput in, Descriptor.Version version) throws IOException + public void skipBody(DataInput in, Version version) throws IOException { type.serializer().skip(in); DeletionTime.serializer.skip(in); 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/db/RowIndexEntry.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/RowIndexEntry.java b/src/java/org/apache/cassandra/db/RowIndexEntry.java index 01035c4..4ff61ce 100644 --- a/src/java/org/apache/cassandra/db/RowIndexEntry.java +++ b/src/java/org/apache/cassandra/db/RowIndexEntry.java @@ -27,15 +27,14 @@ import java.util.List; import com.google.common.primitives.Ints; import org.apache.cassandra.cache.IMeasurableMemory; -import org.apache.cassandra.db.composites.CType; import org.apache.cassandra.io.ISerializer; -import org.apache.cassandra.io.sstable.Descriptor; import org.apache.cassandra.io.sstable.IndexHelper; +import org.apache.cassandra.io.sstable.format.Version; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.utils.ObjectSizes; -public class RowIndexEntry implements IMeasurableMemory +public class RowIndexEntry<T> implements IMeasurableMemory { private static final long EMPTY_SIZE = ObjectSizes.measure(new RowIndexEntry(0)); @@ -46,12 +45,12 @@ public class RowIndexEntry implements IMeasurableMemory this.position = position; } - protected int promotedSize(CType type) + public int promotedSize(ISerializer<T> idxSerializer) { return 0; } - public static RowIndexEntry create(long position, DeletionTime deletionTime, ColumnIndex index) + public static RowIndexEntry<IndexHelper.IndexInfo> create(long position, DeletionTime deletionTime, ColumnIndex index) { assert index != null; assert deletionTime != null; @@ -62,7 +61,7 @@ public class RowIndexEntry implements IMeasurableMemory if (index.columnsIndex.size() > 1) return new IndexedEntry(position, deletionTime, index.columnsIndex); else - return new RowIndexEntry(position); + return new RowIndexEntry<>(position); } /** @@ -79,7 +78,16 @@ public class RowIndexEntry 
implements IMeasurableMemory throw new UnsupportedOperationException(); } - public List<IndexHelper.IndexInfo> columnsIndex() + /** + * @return the offset to the start of the header information for this row. + * For some formats this may not be the start of the row. + */ + public long headerOffset() + { + return 0; + } + + public List<T> columnsIndex() { return Collections.emptyList(); } @@ -89,31 +97,37 @@ public class RowIndexEntry implements IMeasurableMemory return EMPTY_SIZE; } - public static class Serializer + public static interface IndexSerializer<T> { - private final CType type; + void serialize(RowIndexEntry<T> rie, DataOutputPlus out) throws IOException; + RowIndexEntry<T> deserialize(DataInput in, Version version) throws IOException; + public int serializedSize(RowIndexEntry<T> rie); + } - public Serializer(CType type) + public static class Serializer implements IndexSerializer<IndexHelper.IndexInfo> + { + private final ISerializer<IndexHelper.IndexInfo> idxSerializer; + + public Serializer(ISerializer<IndexHelper.IndexInfo> idxSerializer) { - this.type = type; + this.idxSerializer = idxSerializer; } - public void serialize(RowIndexEntry rie, DataOutputPlus out) throws IOException + public void serialize(RowIndexEntry<IndexHelper.IndexInfo> rie, DataOutputPlus out) throws IOException { out.writeLong(rie.position); - out.writeInt(rie.promotedSize(type)); + out.writeInt(rie.promotedSize(idxSerializer)); if (rie.isIndexed()) { DeletionTime.serializer.serialize(rie.deletionTime(), out); out.writeInt(rie.columnsIndex().size()); - ISerializer<IndexHelper.IndexInfo> idxSerializer = type.indexSerializer(); for (IndexHelper.IndexInfo info : rie.columnsIndex()) idxSerializer.serialize(info, out); } } - public RowIndexEntry deserialize(DataInput in, Descriptor.Version version) throws IOException + public RowIndexEntry<IndexHelper.IndexInfo> deserialize(DataInput in, Version version) throws IOException { long position = in.readLong(); @@ -123,8 +137,7 @@ public 
class RowIndexEntry implements IMeasurableMemory DeletionTime deletionTime = DeletionTime.serializer.deserialize(in); int entries = in.readInt(); - ISerializer<IndexHelper.IndexInfo> idxSerializer = type.indexSerializer(); - List<IndexHelper.IndexInfo> columnsIndex = new ArrayList<IndexHelper.IndexInfo>(entries); + List<IndexHelper.IndexInfo> columnsIndex = new ArrayList<>(entries); for (int i = 0; i < entries; i++) columnsIndex.add(idxSerializer.deserialize(in)); @@ -132,7 +145,7 @@ public class RowIndexEntry implements IMeasurableMemory } else { - return new RowIndexEntry(position); + return new RowIndexEntry<>(position); } } @@ -151,9 +164,9 @@ public class RowIndexEntry implements IMeasurableMemory FileUtils.skipBytesFully(in, size); } - public int serializedSize(RowIndexEntry rie) + public int serializedSize(RowIndexEntry<IndexHelper.IndexInfo> rie) { - int size = TypeSizes.NATIVE.sizeof(rie.position) + TypeSizes.NATIVE.sizeof(rie.promotedSize(type)); + int size = TypeSizes.NATIVE.sizeof(rie.position) + TypeSizes.NATIVE.sizeof(rie.promotedSize(idxSerializer)); if (rie.isIndexed()) { @@ -162,11 +175,11 @@ public class RowIndexEntry implements IMeasurableMemory size += DeletionTime.serializer.serializedSize(rie.deletionTime(), TypeSizes.NATIVE); size += TypeSizes.NATIVE.sizeof(index.size()); - ISerializer<IndexHelper.IndexInfo> idxSerializer = type.indexSerializer(); for (IndexHelper.IndexInfo info : index) size += idxSerializer.serializedSize(info, TypeSizes.NATIVE); } + return size; } } @@ -174,7 +187,7 @@ public class RowIndexEntry implements IMeasurableMemory /** * An entry in the row index for a row whose columns are indexed. 
*/ - private static class IndexedEntry extends RowIndexEntry + private static class IndexedEntry extends RowIndexEntry<IndexHelper.IndexInfo> { private final DeletionTime deletionTime; private final List<IndexHelper.IndexInfo> columnsIndex; @@ -204,12 +217,11 @@ public class RowIndexEntry implements IMeasurableMemory } @Override - public int promotedSize(CType type) + public int promotedSize(ISerializer<IndexHelper.IndexInfo> idxSerializer) { TypeSizes typeSizes = TypeSizes.NATIVE; long size = DeletionTime.serializer.serializedSize(deletionTime, typeSizes); size += typeSizes.sizeof(columnsIndex.size()); // number of entries - ISerializer<IndexHelper.IndexInfo> idxSerializer = type.indexSerializer(); for (IndexHelper.IndexInfo info : columnsIndex) size += idxSerializer.serializedSize(info, typeSizes); http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/db/RowIteratorFactory.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/RowIteratorFactory.java b/src/java/org/apache/cassandra/db/RowIteratorFactory.java index 5bd2d9b..6ac74ae 100644 --- a/src/java/org/apache/cassandra/db/RowIteratorFactory.java +++ b/src/java/org/apache/cassandra/db/RowIteratorFactory.java @@ -26,7 +26,7 @@ import org.apache.cassandra.db.columniterator.LazyColumnIterator; import org.apache.cassandra.db.columniterator.OnDiskAtomIterator; import org.apache.cassandra.db.filter.QueryFilter; import org.apache.cassandra.db.filter.IDiskAtomFilter; -import org.apache.cassandra.io.sstable.SSTableReader; +import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.utils.CloseableIterator; import org.apache.cassandra.utils.MergeIterator; http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/db/SystemKeyspace.java ---------------------------------------------------------------------- diff --git 
a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java index bfd92e9..5c0d935 100644 --- a/src/java/org/apache/cassandra/db/SystemKeyspace.java +++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java @@ -29,6 +29,7 @@ import com.google.common.collect.HashMultimap; import com.google.common.collect.Iterables; import com.google.common.collect.SetMultimap; import com.google.common.collect.Sets; +import org.apache.cassandra.io.sstable.format.SSTableReader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,7 +49,6 @@ import org.apache.cassandra.db.marshal.*; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.exceptions.ConfigurationException; -import org.apache.cassandra.io.sstable.SSTableReader; import org.apache.cassandra.io.util.DataOutputBuffer; import org.apache.cassandra.locator.IEndpointSnitch; import org.apache.cassandra.metrics.RestorableMeter; http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/db/columniterator/IndexedSliceReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/columniterator/IndexedSliceReader.java b/src/java/org/apache/cassandra/db/columniterator/IndexedSliceReader.java deleted file mode 100644 index 7012321..0000000 --- a/src/java/org/apache/cassandra/db/columniterator/IndexedSliceReader.java +++ /dev/null @@ -1,499 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.db.columniterator; - -import java.io.IOException; -import java.util.ArrayDeque; -import java.util.Deque; -import java.util.List; - -import com.google.common.collect.AbstractIterator; - -import org.apache.cassandra.db.*; -import org.apache.cassandra.db.composites.CellNameType; -import org.apache.cassandra.db.composites.Composite; -import org.apache.cassandra.db.filter.ColumnSlice; -import org.apache.cassandra.io.sstable.CorruptSSTableException; -import org.apache.cassandra.io.sstable.IndexHelper; -import org.apache.cassandra.io.sstable.IndexHelper.IndexInfo; -import org.apache.cassandra.io.sstable.SSTableReader; -import org.apache.cassandra.io.util.FileDataInput; -import org.apache.cassandra.io.util.FileMark; -import org.apache.cassandra.tracing.Tracing; -import org.apache.cassandra.utils.ByteBufferUtil; - -/** - * This is a reader that finds the block for a starting column and returns blocks before/after it for each next call. - * This function assumes that the CF is sorted by name and exploits the name index. 
- */ -class IndexedSliceReader extends AbstractIterator<OnDiskAtom> implements OnDiskAtomIterator -{ - private final ColumnFamily emptyColumnFamily; - - private final SSTableReader sstable; - private final List<IndexHelper.IndexInfo> indexes; - private final FileDataInput originalInput; - private FileDataInput file; - private final boolean reversed; - private final ColumnSlice[] slices; - private final BlockFetcher fetcher; - private final Deque<OnDiskAtom> blockColumns = new ArrayDeque<OnDiskAtom>(); - private final CellNameType comparator; - - // Holds range tombstone in reverse queries. See addColumn() - private final Deque<OnDiskAtom> rangeTombstonesReversed; - - /** - * This slice reader assumes that slices are sorted correctly, e.g. that for forward lookup slices are in - * lexicographic order of start elements and that for reverse lookup they are in reverse lexicographic order of - * finish (reverse start) elements. i.e. forward: [a,b],[d,e],[g,h] reverse: [h,g],[e,d],[b,a]. This reader also - * assumes that validation has been performed in terms of intervals (no overlapping intervals). - */ - public IndexedSliceReader(SSTableReader sstable, RowIndexEntry indexEntry, FileDataInput input, ColumnSlice[] slices, boolean reversed) - { - Tracing.trace("Seeking to partition indexed section in data file"); - this.sstable = sstable; - this.originalInput = input; - this.reversed = reversed; - this.slices = slices; - this.comparator = sstable.metadata.comparator; - this.rangeTombstonesReversed = reversed ? 
new ArrayDeque<OnDiskAtom>() : null; - - try - { - this.indexes = indexEntry.columnsIndex(); - emptyColumnFamily = ArrayBackedSortedColumns.factory.create(sstable.metadata); - if (indexes.isEmpty()) - { - setToRowStart(indexEntry, input); - emptyColumnFamily.delete(DeletionTime.serializer.deserialize(file)); - fetcher = new SimpleBlockFetcher(); - } - else - { - emptyColumnFamily.delete(indexEntry.deletionTime()); - fetcher = new IndexedBlockFetcher(indexEntry.position); - } - } - catch (IOException e) - { - sstable.markSuspect(); - throw new CorruptSSTableException(e, file.getPath()); - } - } - - /** - * Sets the seek position to the start of the row for column scanning. - */ - private void setToRowStart(RowIndexEntry rowEntry, FileDataInput in) throws IOException - { - if (in == null) - { - this.file = sstable.getFileDataInput(rowEntry.position); - } - else - { - this.file = in; - in.seek(rowEntry.position); - } - sstable.partitioner.decorateKey(ByteBufferUtil.readWithShortLength(file)); - } - - public ColumnFamily getColumnFamily() - { - return emptyColumnFamily; - } - - public DecoratedKey getKey() - { - throw new UnsupportedOperationException(); - } - - protected OnDiskAtom computeNext() - { - while (true) - { - if (reversed) - { - // Return all tombstone for the block first (see addColumn() below) - OnDiskAtom column = rangeTombstonesReversed.poll(); - if (column != null) - return column; - } - - OnDiskAtom column = blockColumns.poll(); - if (column == null) - { - if (!fetcher.fetchMoreData()) - return endOfData(); - } - else - { - return column; - } - } - } - - public void close() throws IOException - { - if (originalInput == null && file != null) - file.close(); - } - - protected void addColumn(OnDiskAtom col) - { - if (reversed) - { - /* - * We put range tomstone markers at the beginning of the range they delete. But for reversed queries, - * the caller still need to know about a RangeTombstone before it sees any column that it covers. 
- * To make that simple, we keep said tombstones separate and return them all before any column for - * a given block. - */ - if (col instanceof RangeTombstone) - rangeTombstonesReversed.addFirst(col); - else - blockColumns.addFirst(col); - } - else - { - blockColumns.addLast(col); - } - } - - private abstract class BlockFetcher - { - protected int currentSliceIdx; - - protected BlockFetcher(int sliceIdx) - { - this.currentSliceIdx = sliceIdx; - } - - /* - * Return the smallest key selected by the current ColumnSlice. - */ - protected Composite currentStart() - { - return reversed ? slices[currentSliceIdx].finish : slices[currentSliceIdx].start; - } - - /* - * Return the biggest key selected by the current ColumnSlice. - */ - protected Composite currentFinish() - { - return reversed ? slices[currentSliceIdx].start : slices[currentSliceIdx].finish; - } - - protected abstract boolean setNextSlice(); - - protected abstract boolean fetchMoreData(); - - protected boolean isColumnBeforeSliceStart(OnDiskAtom column) - { - return isBeforeSliceStart(column.name()); - } - - protected boolean isBeforeSliceStart(Composite name) - { - Composite start = currentStart(); - return !start.isEmpty() && comparator.compare(name, start) < 0; - } - - protected boolean isColumnBeforeSliceFinish(OnDiskAtom column) - { - Composite finish = currentFinish(); - return finish.isEmpty() || comparator.compare(column.name(), finish) <= 0; - } - - protected boolean isAfterSliceFinish(Composite name) - { - Composite finish = currentFinish(); - return !finish.isEmpty() && comparator.compare(name, finish) > 0; - } - } - - private class IndexedBlockFetcher extends BlockFetcher - { - // where this row starts - private final long columnsStart; - - // the index entry for the next block to deserialize - private int nextIndexIdx = -1; - - // index of the last block we've read from disk; - private int lastDeserializedBlock = -1; - - // For reversed, keep columns at the beginning of the last deserialized 
block that - // may still match a slice - private final Deque<OnDiskAtom> prefetched; - - public IndexedBlockFetcher(long columnsStart) - { - super(-1); - this.columnsStart = columnsStart; - this.prefetched = reversed ? new ArrayDeque<OnDiskAtom>() : null; - setNextSlice(); - } - - protected boolean setNextSlice() - { - while (++currentSliceIdx < slices.length) - { - nextIndexIdx = IndexHelper.indexFor(slices[currentSliceIdx].start, indexes, comparator, reversed, nextIndexIdx); - if (nextIndexIdx < 0 || nextIndexIdx >= indexes.size()) - // no index block for that slice - continue; - - // Check if we can exclude this slice entirely from the index - IndexInfo info = indexes.get(nextIndexIdx); - if (reversed) - { - if (!isBeforeSliceStart(info.lastName)) - return true; - } - else - { - if (!isAfterSliceFinish(info.firstName)) - return true; - } - } - nextIndexIdx = -1; - return false; - } - - protected boolean hasMoreSlice() - { - return currentSliceIdx < slices.length; - } - - protected boolean fetchMoreData() - { - if (!hasMoreSlice()) - return false; - - // If we read blocks in reversed disk order, we may have columns from the previous block to handle. - // Note that prefetched keeps columns in reversed disk order. 
- if (reversed && !prefetched.isEmpty()) - { - boolean gotSome = false; - // Avoids some comparison when we know it's not useful - boolean inSlice = false; - - OnDiskAtom prefetchedCol; - while ((prefetchedCol = prefetched.peek() ) != null) - { - // col is before slice, we update the slice - if (isColumnBeforeSliceStart(prefetchedCol)) - { - inSlice = false; - if (!setNextSlice()) - return false; - } - // col is within slice, all columns - // (we go in reverse, so as soon as we are in a slice, no need to check - // we're after the slice until we change slice) - else if (inSlice || isColumnBeforeSliceFinish(prefetchedCol)) - { - blockColumns.addLast(prefetched.poll()); - gotSome = true; - inSlice = true; - } - // if col is after slice, ignore - else - { - prefetched.poll(); - } - } - if (gotSome) - return true; - } - try - { - return getNextBlock(); - } - catch (IOException e) - { - throw new CorruptSSTableException(e, file.getPath()); - } - } - - private boolean getNextBlock() throws IOException - { - if (lastDeserializedBlock == nextIndexIdx) - { - if (reversed) - nextIndexIdx--; - else - nextIndexIdx++; - } - lastDeserializedBlock = nextIndexIdx; - - // Are we done? - if (lastDeserializedBlock < 0 || lastDeserializedBlock >= indexes.size()) - return false; - - IndexInfo currentIndex = indexes.get(lastDeserializedBlock); - - /* seek to the correct offset to the data, and calculate the data size */ - long positionToSeek = columnsStart + currentIndex.offset; - - // With new promoted indexes, our first seek in the data file will happen at that point. - if (file == null) - file = originalInput == null ? 
sstable.getFileDataInput(positionToSeek) : originalInput; - - AtomDeserializer deserializer = emptyColumnFamily.metadata().getOnDiskDeserializer(file, sstable.descriptor.version); - - file.seek(positionToSeek); - FileMark mark = file.mark(); - - // We remenber when we are whithin a slice to avoid some comparison - boolean inSlice = false; - - // scan from index start - while (file.bytesPastMark(mark) < currentIndex.width || deserializer.hasUnprocessed()) - { - // col is before slice - // (If in slice, don't bother checking that until we change slice) - Composite start = currentStart(); - if (!inSlice && !start.isEmpty() && deserializer.compareNextTo(start) < 0) - { - if (reversed) - { - // the next slice select columns that are before the current one, so it may - // match this column, so keep it around. - prefetched.addFirst(deserializer.readNext()); - } - else - { - deserializer.skipNext(); - } - } - // col is within slice - else - { - Composite finish = currentFinish(); - if (finish.isEmpty() || deserializer.compareNextTo(finish) <= 0) - { - inSlice = true; - addColumn(deserializer.readNext()); - } - // col is after slice. - else - { - // When reading forward, if we hit a column that sorts after the current slice, it means we're done with this slice. - // For reversed, this may either mean that we're done with the current slice, or that we need to read the previous - // index block. However, we can be sure that we are in the first case though (the current slice is done) if the first - // columns of the block were not part of the current slice, i.e. if we have columns in prefetched. - if (reversed && prefetched.isEmpty()) - break; - - if (!setNextSlice()) - break; - - inSlice = false; - - // The next index block now corresponds to the first block that may have columns for the newly set slice. - // So if it's different from the current block, we're done with this block. And in that case, we know - // that our prefetched columns won't match. 
- if (nextIndexIdx != lastDeserializedBlock) - { - if (reversed) - prefetched.clear(); - break; - } - - // Even if the next slice may have column in this blocks, if we're reversed, those columns have been - // prefetched and we're done with that block - if (reversed) - break; - - // otherwise, we will deal with that column at the next iteration - } - } - } - return true; - } - } - - private class SimpleBlockFetcher extends BlockFetcher - { - public SimpleBlockFetcher() throws IOException - { - // Since we have to deserialize in order and will read all slices might as well reverse the slices and - // behave as if it was not reversed - super(reversed ? slices.length - 1 : 0); - - // We remenber when we are whithin a slice to avoid some comparison - boolean inSlice = false; - - AtomDeserializer deserializer = emptyColumnFamily.metadata().getOnDiskDeserializer(file, sstable.descriptor.version); - while (deserializer.hasNext()) - { - // col is before slice - // (If in slice, don't bother checking that until we change slice) - Composite start = currentStart(); - if (!inSlice && !start.isEmpty() && deserializer.compareNextTo(start) < 0) - { - deserializer.skipNext(); - continue; - } - - // col is within slice - Composite finish = currentFinish(); - if (finish.isEmpty() || deserializer.compareNextTo(finish) <= 0) - { - inSlice = true; - addColumn(deserializer.readNext()); - } - // col is after slice. more slices? 
- else - { - inSlice = false; - if (!setNextSlice()) - break; - } - } - } - - protected boolean setNextSlice() - { - if (reversed) - { - if (currentSliceIdx <= 0) - return false; - - currentSliceIdx--; - } - else - { - if (currentSliceIdx >= slices.length - 1) - return false; - - currentSliceIdx++; - } - return true; - } - - protected boolean fetchMoreData() - { - return false; - } - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/db/columniterator/SSTableNamesIterator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/columniterator/SSTableNamesIterator.java b/src/java/org/apache/cassandra/db/columniterator/SSTableNamesIterator.java deleted file mode 100644 index 224b63f..0000000 --- a/src/java/org/apache/cassandra/db/columniterator/SSTableNamesIterator.java +++ /dev/null @@ -1,249 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.cassandra.db.columniterator; - -import java.io.IOException; -import java.util.*; - -import com.google.common.collect.AbstractIterator; - -import org.apache.cassandra.config.CFMetaData; -import org.apache.cassandra.db.*; -import org.apache.cassandra.db.composites.CellName; -import org.apache.cassandra.db.composites.CellNameType; -import org.apache.cassandra.io.sstable.CorruptSSTableException; -import org.apache.cassandra.io.sstable.IndexHelper; -import org.apache.cassandra.io.sstable.SSTableReader; -import org.apache.cassandra.io.util.FileDataInput; -import org.apache.cassandra.io.util.FileMark; -import org.apache.cassandra.io.util.FileUtils; -import org.apache.cassandra.utils.ByteBufferUtil; - -public class SSTableNamesIterator extends AbstractIterator<OnDiskAtom> implements OnDiskAtomIterator -{ - private ColumnFamily cf; - private final SSTableReader sstable; - private FileDataInput fileToClose; - private Iterator<OnDiskAtom> iter; - public final SortedSet<CellName> columns; - public final DecoratedKey key; - - public SSTableNamesIterator(SSTableReader sstable, DecoratedKey key, SortedSet<CellName> columns) - { - assert columns != null; - this.sstable = sstable; - this.columns = columns; - this.key = key; - - RowIndexEntry indexEntry = sstable.getPosition(key, SSTableReader.Operator.EQ); - if (indexEntry == null) - return; - - try - { - read(sstable, null, indexEntry); - } - catch (IOException e) - { - sstable.markSuspect(); - throw new CorruptSSTableException(e, sstable.getFilename()); - } - finally - { - if (fileToClose != null) - FileUtils.closeQuietly(fileToClose); - } - } - - public SSTableNamesIterator(SSTableReader sstable, FileDataInput file, DecoratedKey key, SortedSet<CellName> columns, RowIndexEntry indexEntry) - { - assert columns != null; - this.sstable = sstable; - this.columns = columns; - this.key = key; - - try - { - read(sstable, file, indexEntry); - } - catch (IOException e) - { - sstable.markSuspect(); - throw new 
CorruptSSTableException(e, sstable.getFilename()); - } - } - - private FileDataInput createFileDataInput(long position) - { - fileToClose = sstable.getFileDataInput(position); - return fileToClose; - } - - private void read(SSTableReader sstable, FileDataInput file, RowIndexEntry indexEntry) - throws IOException - { - List<IndexHelper.IndexInfo> indexList; - - // If the entry is not indexed or the index is not promoted, read from the row start - if (!indexEntry.isIndexed()) - { - if (file == null) - file = createFileDataInput(indexEntry.position); - else - file.seek(indexEntry.position); - - DecoratedKey keyInDisk = sstable.partitioner.decorateKey(ByteBufferUtil.readWithShortLength(file)); - assert keyInDisk.equals(key) : String.format("%s != %s in %s", keyInDisk, key, file.getPath()); - } - - indexList = indexEntry.columnsIndex(); - - if (!indexEntry.isIndexed()) - { - ColumnFamilySerializer serializer = ColumnFamily.serializer; - try - { - cf = ArrayBackedSortedColumns.factory.create(sstable.metadata); - cf.delete(DeletionTime.serializer.deserialize(file)); - } - catch (Exception e) - { - throw new IOException(serializer + " failed to deserialize " + sstable.getColumnFamilyName() + " with " + sstable.metadata + " from " + file, e); - } - } - else - { - cf = ArrayBackedSortedColumns.factory.create(sstable.metadata); - cf.delete(indexEntry.deletionTime()); - } - - List<OnDiskAtom> result = new ArrayList<OnDiskAtom>(); - if (indexList.isEmpty()) - { - readSimpleColumns(file, columns, result); - } - else - { - readIndexedColumns(sstable.metadata, file, columns, indexList, indexEntry.position, result); - } - - // create an iterator view of the columns we read - iter = result.iterator(); - } - - private void readSimpleColumns(FileDataInput file, SortedSet<CellName> columnNames, List<OnDiskAtom> result) - { - Iterator<OnDiskAtom> atomIterator = cf.metadata().getOnDiskIterator(file, sstable.descriptor.version); - int n = 0; - while (atomIterator.hasNext()) - { - 
OnDiskAtom column = atomIterator.next(); - if (column instanceof Cell) - { - if (columnNames.contains(column.name())) - { - result.add(column); - if (++n >= columns.size()) - break; - } - } - else - { - result.add(column); - } - } - } - - private void readIndexedColumns(CFMetaData metadata, - FileDataInput file, - SortedSet<CellName> columnNames, - List<IndexHelper.IndexInfo> indexList, - long basePosition, - List<OnDiskAtom> result) - throws IOException - { - /* get the various column ranges we have to read */ - CellNameType comparator = metadata.comparator; - List<IndexHelper.IndexInfo> ranges = new ArrayList<IndexHelper.IndexInfo>(); - int lastIndexIdx = -1; - for (CellName name : columnNames) - { - int index = IndexHelper.indexFor(name, indexList, comparator, false, lastIndexIdx); - if (index < 0 || index == indexList.size()) - continue; - IndexHelper.IndexInfo indexInfo = indexList.get(index); - // Check the index block does contain the column names and that we haven't inserted this block yet. - if (comparator.compare(name, indexInfo.firstName) < 0 || index == lastIndexIdx) - continue; - - ranges.add(indexInfo); - lastIndexIdx = index; - } - - if (ranges.isEmpty()) - return; - - Iterator<CellName> toFetch = columnNames.iterator(); - CellName nextToFetch = toFetch.next(); - for (IndexHelper.IndexInfo indexInfo : ranges) - { - long positionToSeek = basePosition + indexInfo.offset; - - // With new promoted indexes, our first seek in the data file will happen at that point. - if (file == null) - file = createFileDataInput(positionToSeek); - - AtomDeserializer deserializer = cf.metadata().getOnDiskDeserializer(file, sstable.descriptor.version); - file.seek(positionToSeek); - FileMark mark = file.mark(); - while (file.bytesPastMark(mark) < indexInfo.width && nextToFetch != null) - { - int cmp = deserializer.compareNextTo(nextToFetch); - if (cmp == 0) - { - nextToFetch = toFetch.hasNext() ? 
toFetch.next() : null; - result.add(deserializer.readNext()); - continue; - } - - deserializer.skipNext(); - if (cmp > 0) - nextToFetch = toFetch.hasNext() ? toFetch.next() : null; - } - } - } - - public DecoratedKey getKey() - { - return key; - } - - public ColumnFamily getColumnFamily() - { - return cf; - } - - protected OnDiskAtom computeNext() - { - if (iter == null || !iter.hasNext()) - return endOfData(); - return iter.next(); - } - - public void close() throws IOException { } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/db/columniterator/SSTableSliceIterator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/columniterator/SSTableSliceIterator.java b/src/java/org/apache/cassandra/db/columniterator/SSTableSliceIterator.java deleted file mode 100644 index 0057d52..0000000 --- a/src/java/org/apache/cassandra/db/columniterator/SSTableSliceIterator.java +++ /dev/null @@ -1,101 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.cassandra.db.columniterator; - -import java.io.IOException; - -import org.apache.cassandra.db.ColumnFamily; -import org.apache.cassandra.db.DecoratedKey; -import org.apache.cassandra.db.OnDiskAtom; -import org.apache.cassandra.db.RowIndexEntry; -import org.apache.cassandra.db.filter.ColumnSlice; -import org.apache.cassandra.io.sstable.SSTableReader; -import org.apache.cassandra.io.util.FileDataInput; - -/** - * A Cell Iterator over SSTable - */ -public class SSTableSliceIterator implements OnDiskAtomIterator -{ - private final OnDiskAtomIterator reader; - private final DecoratedKey key; - - public SSTableSliceIterator(SSTableReader sstable, DecoratedKey key, ColumnSlice[] slices, boolean reversed) - { - this.key = key; - RowIndexEntry indexEntry = sstable.getPosition(key, SSTableReader.Operator.EQ); - this.reader = indexEntry == null ? null : createReader(sstable, indexEntry, null, slices, reversed); - } - - /** - * An iterator for a slice within an SSTable - * @param sstable Keyspace for the CFS we are reading from - * @param file Optional parameter that input is read from. If null is passed, this class creates an appropriate one automatically. - * If this class creates, it will close the underlying file when #close() is called. - * If a caller passes a non-null argument, this class will NOT close the underlying file when the iterator is closed (i.e. the caller is responsible for closing the file) - * In all cases the caller should explicitly #close() this iterator. - * @param key The key the requested slice resides under - * @param slices the column slices - * @param reversed Results are returned in reverse order iff reversed is true. 
- * @param indexEntry position of the row - */ - public SSTableSliceIterator(SSTableReader sstable, FileDataInput file, DecoratedKey key, ColumnSlice[] slices, boolean reversed, RowIndexEntry indexEntry) - { - this.key = key; - reader = createReader(sstable, indexEntry, file, slices, reversed); - } - - private static OnDiskAtomIterator createReader(SSTableReader sstable, RowIndexEntry indexEntry, FileDataInput file, ColumnSlice[] slices, boolean reversed) - { - return slices.length == 1 && slices[0].start.isEmpty() && !reversed - ? new SimpleSliceReader(sstable, indexEntry, file, slices[0].finish) - : new IndexedSliceReader(sstable, indexEntry, file, slices, reversed); - } - - public DecoratedKey getKey() - { - return key; - } - - public ColumnFamily getColumnFamily() - { - return reader == null ? null : reader.getColumnFamily(); - } - - public boolean hasNext() - { - return reader != null && reader.hasNext(); - } - - public OnDiskAtom next() - { - return reader.next(); - } - - public void remove() - { - throw new UnsupportedOperationException(); - } - - public void close() throws IOException - { - if (reader != null) - reader.close(); - } - -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/db/columniterator/SimpleSliceReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/columniterator/SimpleSliceReader.java b/src/java/org/apache/cassandra/db/columniterator/SimpleSliceReader.java deleted file mode 100644 index bdbf4bd..0000000 --- a/src/java/org/apache/cassandra/db/columniterator/SimpleSliceReader.java +++ /dev/null @@ -1,107 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.db.columniterator; - -import java.io.IOException; -import java.util.Iterator; - -import com.google.common.collect.AbstractIterator; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.cassandra.db.*; -import org.apache.cassandra.db.composites.CellNameType; -import org.apache.cassandra.db.composites.Composite; -import org.apache.cassandra.io.sstable.CorruptSSTableException; -import org.apache.cassandra.io.sstable.SSTableReader; -import org.apache.cassandra.io.util.FileDataInput; -import org.apache.cassandra.tracing.Tracing; -import org.apache.cassandra.utils.ByteBufferUtil; - -class SimpleSliceReader extends AbstractIterator<OnDiskAtom> implements OnDiskAtomIterator -{ - private static final Logger logger = LoggerFactory.getLogger(SimpleSliceReader.class); - - private final FileDataInput file; - private final boolean needsClosing; - private final Composite finishColumn; - private final CellNameType comparator; - private final ColumnFamily emptyColumnFamily; - private final Iterator<OnDiskAtom> atomIterator; - - public SimpleSliceReader(SSTableReader sstable, RowIndexEntry indexEntry, FileDataInput input, Composite finishColumn) - { - Tracing.trace("Seeking to partition beginning in data file"); - this.finishColumn = finishColumn; - this.comparator = sstable.metadata.comparator; - try - { - if (input == null) - { - this.file = 
sstable.getFileDataInput(indexEntry.position); - this.needsClosing = true; - } - else - { - this.file = input; - input.seek(indexEntry.position); - this.needsClosing = false; - } - - // Skip key and data size - ByteBufferUtil.skipShortLength(file); - - emptyColumnFamily = ArrayBackedSortedColumns.factory.create(sstable.metadata); - emptyColumnFamily.delete(DeletionTime.serializer.deserialize(file)); - atomIterator = emptyColumnFamily.metadata().getOnDiskIterator(file, sstable.descriptor.version); - } - catch (IOException e) - { - sstable.markSuspect(); - throw new CorruptSSTableException(e, sstable.getFilename()); - } - } - - protected OnDiskAtom computeNext() - { - if (!atomIterator.hasNext()) - return endOfData(); - - OnDiskAtom column = atomIterator.next(); - if (!finishColumn.isEmpty() && comparator.compare(column.name(), finishColumn) > 0) - return endOfData(); - - return column; - } - - public ColumnFamily getColumnFamily() - { - return emptyColumnFamily; - } - - public void close() throws IOException - { - if (needsClosing) - file.close(); - } - - public DecoratedKey getKey() - { - throw new UnsupportedOperationException(); - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java index e50a585..f914c2c 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java @@ -28,6 +28,7 @@ import java.nio.ByteBuffer; import java.util.regex.Matcher; import java.util.regex.Pattern; +import com.google.common.annotations.VisibleForTesting; import org.apache.cassandra.io.FSReadError; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.utils.PureJavaCrc32; @@ 
-43,11 +44,13 @@ public class CommitLogDescriptor public static final int VERSION_12 = 2; public static final int VERSION_20 = 3; public static final int VERSION_21 = 4; + public static final int VERSION_30 = 5; /** * Increment this number if there is a changes in the commit log disc layout or MessagingVersion changes. * Note: make sure to handle {@link #getMessagingVersion()} */ - public static final int current_version = VERSION_21; + @VisibleForTesting + public static final int current_version = VERSION_30; // [version, id, checksum] static final int HEADER_SIZE = 4 + 8 + 4; @@ -126,6 +129,8 @@ public class CommitLogDescriptor return MessagingService.VERSION_20; case VERSION_21: return MessagingService.VERSION_21; + case VERSION_30: + return MessagingService.VERSION_30; default: throw new IllegalStateException("Unknown commitlog version " + version); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/db/commitlog/ReplayPosition.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/ReplayPosition.java b/src/java/org/apache/cassandra/db/commitlog/ReplayPosition.java index 31fc28e..ca1969f 100644 --- a/src/java/org/apache/cassandra/db/commitlog/ReplayPosition.java +++ b/src/java/org/apache/cassandra/db/commitlog/ReplayPosition.java @@ -27,7 +27,7 @@ import com.google.common.collect.Ordering; import org.apache.cassandra.db.TypeSizes; import org.apache.cassandra.io.ISerializer; -import org.apache.cassandra.io.sstable.SSTableReader; +import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.io.util.DataOutputPlus; public class ReplayPosition implements Comparable<ReplayPosition>
