Updated Branches:
  refs/heads/trunk c3171085a -> 5ac567628


support streaming SSTables of older versions

patch by yukim; reviewed by slebresne for CASSANDRA-5772


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5ac56762
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5ac56762
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5ac56762

Branch: refs/heads/trunk
Commit: 5ac5676282eb7f6ac12ef0629e565ab4c983173b
Parents: c317108
Author: Yuki Morishita <[email protected]>
Authored: Fri Jul 19 10:09:15 2013 -0500
Committer: Yuki Morishita <[email protected]>
Committed: Fri Jul 19 10:09:15 2013 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cassandra/io/sstable/SSTableWriter.java     | 22 ++++++++++++++------
 .../cassandra/streaming/StreamReader.java       |  4 +++-
 3 files changed, 20 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/5ac56762/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index b47f03d..6b2e2ba 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -11,6 +11,7 @@
  * Add LZ4 compression to the native protocol (CASSANDRA-5765)
  * Fix bugs in the native protocol v2 (CASSANDRA-5770)
  * CAS on 'primary key only' table (CASSANDRA-5715)
+ * Support streaming SSTables of old versions (CASSANDRA-5772)


 2.0.0-beta1


http://git-wip-us.apache.org/repos/asf/cassandra/blob/5ac56762/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
index 9e5999d..6add286 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
@@ -29,13 +29,16 @@ import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.compaction.*;
+import org.apache.cassandra.db.compaction.AbstractCompactedRow;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.io.FSWriteError;
 import org.apache.cassandra.io.compress.CompressedSequentialWriter;
 import org.apache.cassandra.io.util.*;
 import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.utils.*;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FilterFactory;
+import org.apache.cassandra.utils.IFilter;
+import org.apache.cassandra.utils.StreamingHistogram;
 
 public class SSTableWriter extends SSTable
 {
@@ -205,7 +208,7 @@ public class SSTableWriter extends SSTable
      * @throws IOException if a read from the DataInput fails
      * @throws FSWriteError if a write to the dataFile fails
      */
-    public long appendFromStream(DecoratedKey key, CFMetaData metadata, DataInput in) throws IOException
+    public long appendFromStream(DecoratedKey key, CFMetaData metadata, DataInput in, Descriptor.Version version) throws IOException
     {
         long currentPosition = beforeAppend(key);
 
@@ -221,14 +224,21 @@ public class SSTableWriter extends SSTable
         cf.delete(DeletionTime.serializer.deserialize(in));
         ColumnIndex.Builder columnIndexer = new ColumnIndex.Builder(cf, key.key, dataFile.stream);
 
-        OnDiskAtom.Serializer atomSerializer = Column.onDiskSerializer();
+        int columnCount = Integer.MAX_VALUE;
+        if (version.hasRowSizeAndColumnCount)
+        {
+            // skip row size
+            in.skipBytes(8);
+            columnCount = in.readInt();
+        }
+        Iterator<OnDiskAtom> iter = metadata.getOnDiskIterator(in, columnCount, ColumnSerializer.Flag.PRESERVE_SIZE, Integer.MIN_VALUE, version);
         try
         {
-            while (true)
+            while (iter.hasNext())
             {
                 // deserialize column with PRESERVE_SIZE because we've written the dataSize based on the
                 // data size received, so we must reserialize the exact same data
-                OnDiskAtom atom = atomSerializer.deserializeFromSSTable(in, ColumnSerializer.Flag.PRESERVE_SIZE, Integer.MIN_VALUE, Descriptor.Version.CURRENT);
+                OnDiskAtom atom = iter.next();
                 if (atom == null)
                     break;
                 if (atom instanceof CounterColumn)


http://git-wip-us.apache.org/repos/asf/cassandra/blob/5ac56762/src/java/org/apache/cassandra/streaming/StreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReader.java b/src/java/org/apache/cassandra/streaming/StreamReader.java
index bad8445..5c19eb1 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReader.java
@@ -52,6 +52,7 @@ public class StreamReader
     protected final long estimatedKeys;
     protected final Collection<Pair<Long, Long>> sections;
     protected final StreamSession session;
+    protected final Descriptor.Version inputVersion;
 
     protected Descriptor desc;
 
@@ -61,6 +62,7 @@
         this.cfId = header.cfId;
         this.estimatedKeys = header.estimatedKeys;
         this.sections = header.sections;
+        this.inputVersion = new Descriptor.Version(header.version);
     }
 
     /**
@@ -114,7 +116,7 @@
     protected void writeRow(SSTableWriter writer, DataInput in, ColumnFamilyStore cfs) throws IOException
    {
         DecoratedKey key = StorageService.getPartitioner().decorateKey(ByteBufferUtil.readWithShortLength(in));
-        writer.appendFromStream(key, cfs.metadata, in);
+        writer.appendFromStream(key, cfs.metadata, in, inputVersion);
         cfs.invalidateCachedRow(key);
     }
 }
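
For readers following the change, here is a small standalone Java sketch of the pattern the patch introduces. It does not use Cassandra's classes: VersionedRowHeaderSketch, StreamVersion and readColumnCount are hypothetical stand-ins for Descriptor.Version and the new appendFromStream logic. The idea is that rows streamed in older SSTable formats begin with an 8-byte row size and a 4-byte column count, while newer formats omit both, so the reader substitutes Integer.MAX_VALUE and instead stops when the iterator yields a null (end-of-row) atom, as in the loop above.

import java.io.ByteArrayInputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.IOException;

// Standalone sketch, not Cassandra's API: version-gated reading of the legacy
// per-row header (row size + column count) from a streamed SSTable row.
public class VersionedRowHeaderSketch
{
    // Hypothetical stand-in for Descriptor.Version#hasRowSizeAndColumnCount.
    static final class StreamVersion
    {
        final boolean hasRowSizeAndColumnCount;
        StreamVersion(boolean hasRowSizeAndColumnCount) { this.hasRowSizeAndColumnCount = hasRowSizeAndColumnCount; }
    }

    // Mirrors the shape of the new appendFromStream code: consume the legacy
    // header fields only when the sender's format says they are present.
    static int readColumnCount(DataInput in, StreamVersion version) throws IOException
    {
        int columnCount = Integer.MAX_VALUE; // "unbounded": rely on the end-of-row marker instead
        if (version.hasRowSizeAndColumnCount)
        {
            in.skipBytes(8);            // legacy 8-byte row size, not needed when re-serializing
            columnCount = in.readInt(); // legacy 4-byte per-row column count
        }
        return columnCount;
    }

    public static void main(String[] args) throws IOException
    {
        // A fake legacy row header: 8-byte row size followed by a column count of 3.
        byte[] legacyHeader = { 0, 0, 0, 0, 0, 0, 0, 42, 0, 0, 0, 3 };
        DataInput oldStream = new DataInputStream(new ByteArrayInputStream(legacyHeader));
        System.out.println(readColumnCount(oldStream, new StreamVersion(true)));  // prints 3

        // A newer-format stream carries no per-row header at all.
        DataInput newStream = new DataInputStream(new ByteArrayInputStream(new byte[0]));
        System.out.println(readColumnCount(newStream, new StreamVersion(false))); // prints 2147483647
    }
}

This is also why StreamReader now records the sender's SSTable version from the stream header (inputVersion) and threads it through writeRow to appendFromStream, rather than assuming Descriptor.Version.CURRENT.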
