Fix streaming RangeTombstones at column index boundary; patch by slebresne; reviewed by yukim for CASSANDRA-5418
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0f1fb434 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0f1fb434 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0f1fb434 Branch: refs/heads/trunk Commit: 0f1fb4340ca1f6360487c76909883bfedc63e4ce Parents: 83ed1cb Author: Yuki Morishita <[email protected]> Authored: Thu Apr 11 10:57:42 2013 -0500 Committer: Yuki Morishita <[email protected]> Committed: Thu Apr 11 10:57:42 2013 -0500 ---------------------------------------------------------------------- CHANGES.txt | 3 +- src/java/org/apache/cassandra/db/ColumnIndex.java | 26 +++++++--- .../apache/cassandra/io/sstable/SSTableWriter.java | 2 +- .../cassandra/streaming/StreamingTransferTest.java | 36 +++++++++++++++ 4 files changed, 57 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/0f1fb434/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 5c26014..2124b15 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -2,7 +2,8 @@ * Include fatal errors in trace events (CASSANDRA-5447) * Ensure that PerRowSecondaryIndex is notified of row-level deletes (CASSANDRA-5445) - * Allow empty blob literals in CQL3 (CASSANDRA-5452) + * Allow empty blob literals in CQL3 (CASSANDRA-5452) + * Fix streaming RangeTombstones at column index boundary (CASSANDRA-5418) Merged from 1.1: * Fix trying to load deleted row into row cache on startup (CASSANDRA-4463) http://git-wip-us.apache.org/repos/asf/cassandra/blob/0f1fb434/src/java/org/apache/cassandra/db/ColumnIndex.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnIndex.java b/src/java/org/apache/cassandra/db/ColumnIndex.java index bd1c35a..bcd0eef 100644 --- 
a/src/java/org/apache/cassandra/db/ColumnIndex.java +++ b/src/java/org/apache/cassandra/db/ColumnIndex.java @@ -68,13 +68,22 @@ public class ColumnIndex public Builder(ColumnFamily cf, ByteBuffer key, int estimatedColumnCount, - DataOutput output) + DataOutput output, + boolean fromStream) { this.indexOffset = rowHeaderSize(key, cf.deletionInfo()); this.result = new ColumnIndex(estimatedColumnCount); this.output = output; this.atomSerializer = cf.getOnDiskSerializer(); - this.tombstoneTracker = new RangeTombstone.Tracker(cf.getComparator()); + this.tombstoneTracker = fromStream ? null : new RangeTombstone.Tracker(cf.getComparator()); + } + + public Builder(ColumnFamily cf, + ByteBuffer key, + int estimatedColumnCount, + DataOutput output) + { + this(cf, key, estimatedColumnCount, output, false); } /** @@ -99,7 +108,7 @@ public class ColumnIndex public int writtenAtomCount() { - return atomCount + tombstoneTracker.writtenAtom(); + return tombstoneTracker == null ? atomCount : atomCount + tombstoneTracker.writtenAtom(); } /** @@ -153,11 +162,11 @@ public class ColumnIndex { firstColumn = column; startPosition = endPosition; - // TODO: have that use the firstColumn as min + make sure we - // optimize that on read - endPosition += tombstoneTracker.writeOpenedMarker(firstColumn, output, atomSerializer); + // TODO: have that use the firstColumn as min + make sure we optimize that on read + if (tombstoneTracker != null) + endPosition += tombstoneTracker.writeOpenedMarker(firstColumn, output, atomSerializer); blockSize = 0; // We don't count repeated tombstone marker in the block size, to avoid a situation - // where we wouldn't make any problem because a block is filled by said marker + // where we wouldn't make any progress because a block is filled by said marker } long size = column.serializedSizeForSSTable(); @@ -177,7 +186,8 @@ public class ColumnIndex atomSerializer.serializeForSSTable(column, output); // TODO: Should deal with removing unneeded tombstones - 
tombstoneTracker.update(column); + if (tombstoneTracker != null) + tombstoneTracker.update(column); lastColumn = column; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/0f1fb434/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java index e05a34e..c64fd27 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java @@ -240,7 +240,7 @@ public class SSTableWriter extends SSTable ColumnFamily cf = ColumnFamily.create(metadata, ArrayBackedSortedColumns.factory()); cf.delete(deletionInfo); - ColumnIndex.Builder columnIndexer = new ColumnIndex.Builder(cf, key.key, columnCount, dataFile.stream); + ColumnIndex.Builder columnIndexer = new ColumnIndex.Builder(cf, key.key, columnCount, dataFile.stream, true); OnDiskAtom.Serializer atomSerializer = cf.getOnDiskSerializer(); for (int i = 0; i < columnCount; i++) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/0f1fb434/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java index 502e5d7..2befe45 100644 --- a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java +++ b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java @@ -26,9 +26,11 @@ import static org.apache.cassandra.Util.column; import static org.apache.cassandra.Util.addMutation; import java.net.InetAddress; +import java.nio.ByteBuffer; import java.util.*; import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.*; import 
org.apache.cassandra.db.columniterator.IdentityQueryFilter; import org.apache.cassandra.db.context.CounterContext; @@ -126,6 +128,40 @@ public class StreamingTransferTest extends SchemaLoader session.await(); } + /** + * Test to make sure RangeTombstones at column index boundary transferred correctly. + */ + @Test + public void testTransferRangeTombstones() throws Exception + { + String ks = "Keyspace1"; + String cfname = "StandardInteger1"; + Table table = Table.open(ks); + ColumnFamilyStore cfs = table.getColumnFamilyStore(cfname); + + String key = "key1"; + RowMutation rm = new RowMutation(ks, ByteBufferUtil.bytes(key)); + // add columns of size slightly less than column_index_size to force insert column index + rm.add(new QueryPath(cfname, null, ByteBufferUtil.bytes(1)), ByteBuffer.wrap(new byte[DatabaseDescriptor.getColumnIndexSize() - 64]), 2); + rm.add(new QueryPath(cfname, null, ByteBufferUtil.bytes(6)), ByteBuffer.wrap(new byte[DatabaseDescriptor.getColumnIndexSize()]), 2); + ColumnFamily cf = rm.addOrGet(cfname); + // add RangeTombstones + cf.delete(new DeletionInfo(ByteBufferUtil.bytes(2), ByteBufferUtil.bytes(3), cf.getComparator(), 1, (int) (System.currentTimeMillis() / 1000))); + cf.delete(new DeletionInfo(ByteBufferUtil.bytes(5), ByteBufferUtil.bytes(7), cf.getComparator(), 1, (int) (System.currentTimeMillis() / 1000))); + rm.apply(); + cfs.forceBlockingFlush(); + + SSTableReader sstable = cfs.getSSTables().iterator().next(); + cfs.clearUnsafe(); + transfer(table, sstable); + + // confirm that a single SSTable was transferred and registered + assertEquals(1, cfs.getSSTables().size()); + + List<Row> rows = Util.getRangeSlice(cfs); + assertEquals(1, rows.size()); + } + @Test public void testTransferTable() throws Exception {
