Updated Branches: refs/heads/trunk 770c20127 -> 267690a14
fix Summary component and caches to use correct partitioner patch by Sam Tunnicliffe and Pavel Yaskevich; reviewed by Pavel Yaskevich for CASSANDRA-4289 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/267690a1 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/267690a1 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/267690a1 Branch: refs/heads/trunk Commit: 267690a140ae7d6dfba169448654f9d15ba2b98e Parents: 770c201 Author: Pavel Yaskevich <[email protected]> Authored: Fri Jun 1 18:10:21 2012 +0300 Committer: Pavel Yaskevich <[email protected]> Committed: Fri Jun 1 18:52:46 2012 +0300 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/cassandra/io/sstable/IndexSummary.java | 5 ++- .../apache/cassandra/io/sstable/SSTableReader.java | 6 ++-- .../org/apache/cassandra/service/CacheService.java | 9 +++-- test/data/serialization/1.2/db.RowMutation.bin | Bin 3602 -> 3410 bytes .../cassandra/io/sstable/SSTableReaderTest.java | 29 +++++++++++++++ 6 files changed, 41 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/267690a1/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 6dd4abc..121e46e 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -12,6 +12,7 @@ * Set thrift HSHA server thread limit to unlimet by default (CASSANDRA-4277) * Avoids double serialization of CF id in RowMutation messages (CASSANDRA-4293) + * fix Summary component and caches to use correct partitioner (CASSANDRA-4289) 1.1.1-dev http://git-wip-us.apache.org/repos/asf/cassandra/blob/267690a1/src/java/org/apache/cassandra/io/sstable/IndexSummary.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/IndexSummary.java b/src/java/org/apache/cassandra/io/sstable/IndexSummary.java index 3cac781..0721621 100644 --- a/src/java/org/apache/cassandra/io/sstable/IndexSummary.java +++ b/src/java/org/apache/cassandra/io/sstable/IndexSummary.java @@ -26,6 +26,7 @@ import java.util.List; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.ByteBufferUtil; @@ -110,7 +111,7 @@ public class IndexSummary } } - public IndexSummary deserialize(DataInput dis) throws IOException + public IndexSummary deserialize(DataInput dis, IPartitioner partitioner) throws IOException { IndexSummary summary = new IndexSummary(); if (dis.readInt() != DatabaseDescriptor.getIndexInterval()) @@ -121,7 +122,7 @@ public class IndexSummary { long location = dis.readLong(); ByteBuffer key = ByteBufferUtil.readWithLength(dis); - summary.addEntry(StorageService.getPartitioner().decorateKey(key), location); + summary.addEntry(partitioner.decorateKey(key), location); } return summary; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/267690a1/src/java/org/apache/cassandra/io/sstable/SSTableReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java index 362ce3c..71526e3 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java @@ -401,9 +401,9 @@ public class SSTableReader extends SSTable try { iStream = new DataInputStream(new FileInputStream(summariesFile)); - reader.indexSummary = IndexSummary.serializer.deserialize(iStream); - reader.first = decodeKey(StorageService.getPartitioner(), reader.descriptor, ByteBufferUtil.readWithLength(iStream)); - reader.last = decodeKey(StorageService.getPartitioner(), reader.descriptor, ByteBufferUtil.readWithLength(iStream)); + reader.indexSummary = IndexSummary.serializer.deserialize(iStream, reader.partitioner); + reader.first = decodeKey(reader.partitioner, reader.descriptor, ByteBufferUtil.readWithLength(iStream)); + reader.last = decodeKey(reader.partitioner, reader.descriptor, ByteBufferUtil.readWithLength(iStream)); ibuilder.deserializeBounds(iStream); dbuilder.deserializeBounds(iStream); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/267690a1/src/java/org/apache/cassandra/service/CacheService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/CacheService.java b/src/java/org/apache/cassandra/service/CacheService.java index e66d995..deda078 100644 --- a/src/java/org/apache/cassandra/service/CacheService.java +++ b/src/java/org/apache/cassandra/service/CacheService.java @@ -309,7 +309,7 @@ public class CacheService implements CacheServiceMBean public Pair<RowCacheKey, IRowCacheEntry> deserialize(DataInputStream in, ColumnFamilyStore store) throws IOException { ByteBuffer buffer = ByteBufferUtil.readWithLength(in); - DecoratedKey key = StorageService.getPartitioner().decorateKey(buffer); + DecoratedKey key = store.partitioner.decorateKey(buffer); ColumnFamily data = store.getTopLevelColumns(QueryFilter.getIdentityFilter(key, new QueryPath(store.columnFamily)), Integer.MIN_VALUE, true); return new Pair<RowCacheKey, IRowCacheEntry>(new RowCacheKey(store.metadata.cfId, key), data); } @@ -319,7 +319,7 @@ public class CacheService implements CacheServiceMBean { for (ByteBuffer key : buffers) { - DecoratedKey dk = StorageService.getPartitioner().decorateKey(key); + DecoratedKey dk = store.partitioner.decorateKey(key); ColumnFamily data = store.getTopLevelColumns(QueryFilter.getIdentityFilter(dk, new QueryPath(store.columnFamily)), Integer.MIN_VALUE, true); rowCache.put(new RowCacheKey(store.metadata.cfId, dk), data); } @@ -356,7 +356,7 @@ public class CacheService implements CacheServiceMBean if (input.readBoolean()) entry = RowIndexEntry.serializer.deserialize(input, reader.descriptor.version); else - entry = reader.getPosition(StorageService.getPartitioner().decorateKey(key), Operator.EQ); + entry = reader.getPosition(reader.partitioner.decorateKey(key), Operator.EQ); return new Pair<KeyCacheKey, RowIndexEntry>(new KeyCacheKey(reader.descriptor, key), entry); } @@ -375,7 +375,8 @@ public class CacheService implements CacheServiceMBean { for (ByteBuffer key : buffers) { - DecoratedKey dk = StorageService.getPartitioner().decorateKey(key); + DecoratedKey dk = store.partitioner.decorateKey(key); + for (SSTableReader sstable : store.getSSTables()) { RowIndexEntry entry = sstable.getPosition(dk, Operator.EQ); http://git-wip-us.apache.org/repos/asf/cassandra/blob/267690a1/test/data/serialization/1.2/db.RowMutation.bin ---------------------------------------------------------------------- diff --git a/test/data/serialization/1.2/db.RowMutation.bin b/test/data/serialization/1.2/db.RowMutation.bin index ed0aba5..83b5328 100644 Binary files a/test/data/serialization/1.2/db.RowMutation.bin and b/test/data/serialization/1.2/db.RowMutation.bin differ http://git-wip-us.apache.org/repos/asf/cassandra/blob/267690a1/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java index 544f9e6..0492fd2 100644 --- a/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java @@ -41,12 +41,15 @@ import org.apache.cassandra.db.columniterator.IdentityQueryFilter; import org.apache.cassandra.db.compaction.CompactionManager; import org.apache.cassandra.db.filter.QueryPath; import org.apache.cassandra.dht.IPartitioner; +import org.apache.cassandra.dht.LocalPartitioner; +import org.apache.cassandra.dht.LocalToken; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.service.CacheService; import org.apache.cassandra.io.util.FileDataInput; import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.io.util.MmappedSegmentedFile; +import org.apache.cassandra.io.util.SegmentedFile; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.thrift.IndexExpression; import org.apache.cassandra.thrift.IndexOperator; @@ -284,6 +287,32 @@ public class SSTableReaderTest extends SchemaLoader assert target.last.equals(lastKey); } + @Test + public void testLoadingSummaryUsesCorrectPartitioner() throws Exception + { + Table table = Table.open("Keyspace1"); + ColumnFamilyStore store = table.getColumnFamilyStore("Indexed1"); + ByteBuffer key = ByteBufferUtil.bytes(String.valueOf("k1")); + RowMutation rm = new RowMutation("Keyspace1", key); + rm.add(new QueryPath("Indexed1", null, ByteBufferUtil.bytes("birthdate")), ByteBufferUtil.bytes(1L), System.currentTimeMillis()); + rm.apply(); + store.forceBlockingFlush(); + + ColumnFamilyStore indexCfs = store.indexManager.getIndexForColumn(ByteBufferUtil.bytes("birthdate")).getIndexCfs(); + assert indexCfs.partitioner instanceof LocalPartitioner; + SSTableReader sstable = indexCfs.getSSTables().iterator().next(); + assert sstable.first.token instanceof LocalToken; + + SegmentedFile.Builder ibuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getIndexAccessMode()); + SegmentedFile.Builder dbuilder = sstable.compression + ? SegmentedFile.getCompressedBuilder() + : SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode()); + SSTableReader.saveSummary(sstable, ibuilder, dbuilder); + + SSTableReader reopened = SSTableReader.open(sstable.descriptor); + assert reopened.first.token instanceof LocalToken; + } + private void assertIndexQueryWorks(ColumnFamilyStore indexedCFS) throws IOException { assert "Indexed1".equals(indexedCFS.getColumnFamilyName());
