Updated Branches:
  refs/heads/trunk c3171085a -> 5ac567628

support streaming SSTables of older versions
patch by yukim; reviewed by slebresne for CASSANDRA-5772


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5ac56762
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5ac56762
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5ac56762

Branch: refs/heads/trunk
Commit: 5ac5676282eb7f6ac12ef0629e565ab4c983173b
Parents: c317108
Author: Yuki Morishita <[email protected]>
Authored: Fri Jul 19 10:09:15 2013 -0500
Committer: Yuki Morishita <[email protected]>
Committed: Fri Jul 19 10:09:15 2013 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cassandra/io/sstable/SSTableWriter.java     | 22 ++++++++++++++------
 .../cassandra/streaming/StreamReader.java       |  4 +++-
 3 files changed, 20 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/5ac56762/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index b47f03d..6b2e2ba 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -11,6 +11,7 @@
  * Add LZ4 compression to the native protocol (CASSANDRA-5765)
  * Fix bugs in the native protocol v2 (CASSANDRA-5770)
  * CAS on 'primary key only' table (CASSANDRA-5715)
+ * Support streaming SSTables of old versions (CASSANDRA-5772)
 
 
 2.0.0-beta1

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5ac56762/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java 
b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
index 9e5999d..6add286 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
@@ -29,13 +29,16 @@ import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.compaction.*;
+import org.apache.cassandra.db.compaction.AbstractCompactedRow;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.io.FSWriteError;
 import org.apache.cassandra.io.compress.CompressedSequentialWriter;
 import org.apache.cassandra.io.util.*;
 import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.utils.*;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FilterFactory;
+import org.apache.cassandra.utils.IFilter;
+import org.apache.cassandra.utils.StreamingHistogram;
 
 public class SSTableWriter extends SSTable
 {
@@ -205,7 +208,7 @@ public class SSTableWriter extends SSTable
      * @throws IOException if a read from the DataInput fails
      * @throws FSWriteError if a write to the dataFile fails
      */
-    public long appendFromStream(DecoratedKey key, CFMetaData metadata, 
DataInput in) throws IOException
+    public long appendFromStream(DecoratedKey key, CFMetaData metadata, 
DataInput in, Descriptor.Version version) throws IOException
     {
         long currentPosition = beforeAppend(key);
 
@@ -221,14 +224,21 @@ public class SSTableWriter extends SSTable
         cf.delete(DeletionTime.serializer.deserialize(in));
 
         ColumnIndex.Builder columnIndexer = new ColumnIndex.Builder(cf, 
key.key, dataFile.stream);
-        OnDiskAtom.Serializer atomSerializer = Column.onDiskSerializer();
+        int columnCount = Integer.MAX_VALUE;
+        if (version.hasRowSizeAndColumnCount)
+        {
+            // skip row size
+            in.skipBytes(8);
+            columnCount = in.readInt();
+        }
+        Iterator<OnDiskAtom> iter = metadata.getOnDiskIterator(in, 
columnCount, ColumnSerializer.Flag.PRESERVE_SIZE, Integer.MIN_VALUE, version);
         try
         {
-            while (true)
+            while (iter.hasNext())
             {
                 // deserialize column with PRESERVE_SIZE because we've written 
the dataSize based on the
                 // data size received, so we must reserialize the exact same 
data
-                OnDiskAtom atom = atomSerializer.deserializeFromSSTable(in, 
ColumnSerializer.Flag.PRESERVE_SIZE, Integer.MIN_VALUE, 
Descriptor.Version.CURRENT);
+                OnDiskAtom atom = iter.next();
                 if (atom == null)
                     break;
                 if (atom instanceof CounterColumn)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5ac56762/src/java/org/apache/cassandra/streaming/StreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReader.java 
b/src/java/org/apache/cassandra/streaming/StreamReader.java
index bad8445..5c19eb1 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReader.java
@@ -52,6 +52,7 @@ public class StreamReader
     protected final long estimatedKeys;
     protected final Collection<Pair<Long, Long>> sections;
     protected final StreamSession session;
+    protected final Descriptor.Version inputVersion;
 
     protected Descriptor desc;
 
@@ -61,6 +62,7 @@ public class StreamReader
         this.cfId = header.cfId;
         this.estimatedKeys = header.estimatedKeys;
         this.sections = header.sections;
+        this.inputVersion = new Descriptor.Version(header.version);
     }
 
     /**
@@ -114,7 +116,7 @@ public class StreamReader
     protected void writeRow(SSTableWriter writer, DataInput in, 
ColumnFamilyStore cfs) throws IOException
     {
         DecoratedKey key = 
StorageService.getPartitioner().decorateKey(ByteBufferUtil.readWithShortLength(in));
-        writer.appendFromStream(key, cfs.metadata, in);
+        writer.appendFromStream(key, cfs.metadata, in, inputVersion);
         cfs.invalidateCachedRow(key);
     }
 }

Reply via email to