Updated Branches: refs/heads/cassandra-2.0 1365749e4 -> 2111a20b4 refs/heads/trunk 30afda368 -> 67f9eba79
Fix streaming older SSTable yields row tombstones patch by yukim; reviewed by jbellis for CASSANDRA-6527 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2111a20b Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2111a20b Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2111a20b Branch: refs/heads/cassandra-2.0 Commit: 2111a20b4b44e557357f81146ead6cf7493a8d31 Parents: 1365749 Author: Yuki Morishita <[email protected]> Authored: Thu Dec 26 15:52:03 2013 -0600 Committer: Yuki Morishita <[email protected]> Committed: Thu Dec 26 15:52:03 2013 -0600 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../cassandra/io/sstable/SSTableWriter.java | 11 +++-- .../cassandra/io/sstable/LegacySSTableTest.java | 49 +++++++++++++++++++- 3 files changed, 56 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/2111a20b/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index f38b58f..93cdd81 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -17,6 +17,7 @@ * cqlsh: handle symlinks properly (CASSANDRA-6425) * Fix potential infinite loop when paging query with IN (CASSANDRA-6464) * Fix assertion error in AbstractQueryPager.discardFirst (CASSANDRA-6447) + * Fix streaming older SSTable yields unnecessary tombstones (CASSANDRA-6527) Merged from 1.2: * Improved error message on bad properties in DDL queries (CASSANDRA-6453) * Randomize batchlog candidates selection (CASSANDRA-6481) http://git-wip-us.apache.org/repos/asf/cassandra/blob/2111a20b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java index 70c0b42..3d19d83 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java @@ -222,16 +222,19 @@ public class SSTableWriter extends SSTable StreamingHistogram tombstones = new StreamingHistogram(TOMBSTONE_HISTOGRAM_BIN_SIZE); ColumnFamily cf = ArrayBackedSortedColumns.factory.create(metadata); + // skip row size for version < ja + if (version.hasRowSizeAndColumnCount) + FileUtils.skipBytesFully(in, 8); + cf.delete(DeletionTime.serializer.deserialize(in)); ColumnIndex.Builder columnIndexer = new ColumnIndex.Builder(cf, key.key, dataFile.stream); + + // read column count for version < ja int columnCount = Integer.MAX_VALUE; if (version.hasRowSizeAndColumnCount) - { - // skip row size - FileUtils.skipBytesFully(in, 8); columnCount = in.readInt(); - } + Iterator<OnDiskAtom> iter = metadata.getOnDiskIterator(in, columnCount, ColumnSerializer.Flag.PRESERVE_SIZE, Integer.MIN_VALUE, version); try { http://git-wip-us.apache.org/repos/asf/cassandra/blob/2111a20b/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java b/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java index 0b0ecf8..e508a55 100644 --- a/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java @@ -25,8 +25,15 @@ import java.nio.ByteBuffer; import java.util.*; import org.apache.cassandra.SchemaLoader; -import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.Util; +import org.apache.cassandra.db.*; import org.apache.cassandra.db.columniterator.SSTableNamesIterator; +import org.apache.cassandra.dht.IPartitioner; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.streaming.StreamPlan; +import org.apache.cassandra.streaming.StreamSession; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.ByteBufferUtil; import org.junit.BeforeClass; @@ -85,6 +92,46 @@ public class LegacySSTableTest extends SchemaLoader */ @Test + public void testStreaming() throws Throwable + { + StorageService.instance.initServer(); + + for (File version : LEGACY_SSTABLE_ROOT.listFiles()) + if (Descriptor.Version.validate(version.getName())) + testStreaming(version.getName()); + } + + private void testStreaming(String version) throws Exception + { + SSTableReader sstable = SSTableReader.open(getDescriptor(version)); + IPartitioner p = StorageService.getPartitioner(); + List<Range<Token>> ranges = new ArrayList<>(); + ranges.add(new Range<>(p.getMinimumToken(), p.getToken(ByteBufferUtil.bytes("100")))); + ranges.add(new Range<>(p.getToken(ByteBufferUtil.bytes("100")), p.getMinimumToken())); + ArrayList<StreamSession.SSTableStreamingSections> details = new ArrayList<>(); + details.add(new StreamSession.SSTableStreamingSections(sstable, + sstable.getPositionsForRanges(ranges), + sstable.estimatedKeysForRanges(ranges))); + new StreamPlan("LegacyStreamingTest").transferFiles(FBUtilities.getBroadcastAddress(), details) + .execute().get(); + sstable.close(); + + ColumnFamilyStore cfs = Keyspace.open(KSNAME).getColumnFamilyStore(CFNAME); + assert cfs.getSSTables().size() == 1; + sstable = cfs.getSSTables().iterator().next(); + for (String keystring : TEST_DATA) + { + ByteBuffer key = ByteBufferUtil.bytes(keystring); + SSTableNamesIterator iter = new SSTableNamesIterator(sstable, Util.dk(key), FBUtilities.singleton(key)); + ColumnFamily cf = iter.getColumnFamily(); + + // check not deleted (CASSANDRA-6527) + assert cf.deletionInfo().equals(DeletionInfo.live()); + assert iter.next().name().equals(key); + } + } + + @Test public void testVersions() throws Throwable { for (File version : LEGACY_SSTABLE_ROOT.listFiles())
