Repository: cassandra Updated Branches: refs/heads/cassandra-3.0 604c9df88 -> d4e6f08d4
Always persist upsampled index summaries Patch by Ariel Weisberg; reviewed by Tyler Hobbs for CASSANDRA-10512 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d5c83f49 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d5c83f49 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d5c83f49 Branch: refs/heads/cassandra-3.0 Commit: d5c83f49148ad5f515b19364945260594dc3d27c Parents: af6bd1b Author: Ariel Weisberg <[email protected]> Authored: Thu Feb 11 17:53:02 2016 -0600 Committer: Tyler Hobbs <[email protected]> Committed: Thu Feb 11 17:53:02 2016 -0600 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../io/sstable/format/SSTableReader.java | 18 +- .../cassandra/io/util/MmappedSegmentedFile.java | 10 + .../apache/cassandra/io/util/SegmentedFile.java | 10 + .../cassandra/io/sstable/SSTableReaderTest.java | 541 ---------------- .../io/sstable/format/SSTableReaderTest.java | 648 +++++++++++++++++++ 6 files changed, 681 insertions(+), 547 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/d5c83f49/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 1a4717d..fa25980 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 2.2.6 + * Always persist upsampled index summaries (CASSANDRA-10512) * (cqlsh) Fix inconsistent auto-complete (CASSANDRA-10733) * Make SELECT JSON and toJson() threadsafe (CASSANDRA-11048) * Fix SELECT on tuple relations for mixed ASC/DESC clustering order (CASSANDRA-7281) http://git-wip-us.apache.org/repos/asf/cassandra/blob/d5c83f49/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java index 27ac87c..e81e4e9 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java +++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java @@ -1155,12 +1155,6 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS { // we can use the existing index summary to make a smaller one newSummary = IndexSummaryBuilder.downsample(indexSummary, samplingLevel, minIndexInterval, partitioner); - - try(SegmentedFile.Builder ibuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getIndexAccessMode(), false); - SegmentedFile.Builder dbuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode(), compression)) - { - saveSummary(ibuilder, dbuilder, newSummary); - } } else { @@ -1168,6 +1162,18 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS "no adjustments to min/max_index_interval"); } + //Always save the resampled index + try(SegmentedFile.Builder ibuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getIndexAccessMode(), false); + SegmentedFile.Builder dbuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode(), compression)) + { + for (long boundry : dfile.copyReadableBounds()) + dbuilder.addPotentialBoundary(boundry); + for (long boundry : ifile.copyReadableBounds()) + ibuilder.addPotentialBoundary(boundry); + + saveSummary(ibuilder, dbuilder, newSummary); + } + long newSize = bytesOnDisk(); StorageMetrics.load.inc(newSize - oldSize); parent.metric.liveDiskSpaceUsed.inc(newSize - oldSize); http://git-wip-us.apache.org/repos/asf/cassandra/blob/d5c83f49/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java b/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java index 01f8370..70ac77a 100644 --- a/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java +++ b/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java @@ -102,6 +102,16 @@ public class MmappedSegmentedFile extends SegmentedFile return file; } + @Override + public long[] copyReadableBounds() + { + long[] bounds = new long[segments.length + 1]; + for (int i = 0; i < segments.length; i++) + bounds[i] = segments[i].left; + bounds[segments.length] = length; + return bounds; + } + private static final class Cleanup extends SegmentedFile.Cleanup { final Segment[] segments; http://git-wip-us.apache.org/repos/asf/cassandra/blob/d5c83f49/src/java/org/apache/cassandra/io/util/SegmentedFile.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/SegmentedFile.java b/src/java/org/apache/cassandra/io/util/SegmentedFile.java index 553cc0d..cb331de 100644 --- a/src/java/org/apache/cassandra/io/util/SegmentedFile.java +++ b/src/java/org/apache/cassandra/io/util/SegmentedFile.java @@ -155,6 +155,16 @@ public abstract class SegmentedFile extends SharedCloseableImpl } /** + * Retrieve the readable bounds if any so they can be cloned into other files such + * as when downsampling an index summary. Readable bounds are in between record locations in a file + * that are good positions for mapping the file such that records don't cross mappings. + */ + public long[] copyReadableBounds() + { + return new long[0]; + } + + /** * Collects potential segmentation points in an underlying file, and builds a SegmentedFile to represent it. */ public static abstract class Builder implements AutoCloseable http://git-wip-us.apache.org/repos/asf/cassandra/blob/d5c83f49/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java deleted file mode 100644 index 682d999..0000000 --- a/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java +++ /dev/null @@ -1,541 +0,0 @@ -package org.apache.cassandra.io.sstable; -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.List; -import java.util.Set; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.ThreadPoolExecutor; - -import com.google.common.collect.Sets; -import org.apache.cassandra.cache.CachingOptions; -import org.apache.cassandra.config.KSMetaData; -import org.apache.cassandra.io.sstable.format.SSTableReader; -import org.apache.cassandra.locator.SimpleStrategy; -import org.junit.Assert; -import org.junit.BeforeClass; -import org.junit.Test; -import org.junit.runner.RunWith; - -import org.apache.cassandra.OrderedJUnit4ClassRunner; -import org.apache.cassandra.SchemaLoader; -import org.apache.cassandra.Util; -import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.cql3.Operator; -import org.apache.cassandra.db.BufferDecoratedKey; -import org.apache.cassandra.db.ColumnFamily; -import org.apache.cassandra.db.ColumnFamilyStore; -import org.apache.cassandra.db.DecoratedKey; -import org.apache.cassandra.db.IndexExpression; -import org.apache.cassandra.db.Keyspace; -import org.apache.cassandra.db.Mutation; -import org.apache.cassandra.db.Row; -import org.apache.cassandra.db.RowPosition; -import org.apache.cassandra.db.columniterator.IdentityQueryFilter; -import org.apache.cassandra.db.compaction.CompactionManager; -import org.apache.cassandra.db.compaction.OperationType; -import org.apache.cassandra.db.composites.Composites; -import org.apache.cassandra.db.lifecycle.LifecycleTransaction; -import org.apache.cassandra.dht.LocalPartitioner; -import org.apache.cassandra.dht.LocalPartitioner.LocalToken; -import org.apache.cassandra.dht.Range; -import org.apache.cassandra.dht.Token; -import org.apache.cassandra.io.util.FileDataInput; -import org.apache.cassandra.io.util.MmappedSegmentedFile; -import org.apache.cassandra.io.util.SegmentedFile; -import org.apache.cassandra.service.CacheService; -import org.apache.cassandra.service.StorageService; -import org.apache.cassandra.utils.ByteBufferUtil; -import org.apache.cassandra.utils.Pair; -import static org.apache.cassandra.Util.cellname; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - -@RunWith(OrderedJUnit4ClassRunner.class) -public class SSTableReaderTest -{ - public static final String KEYSPACE1 = "SSTableReaderTest"; - public static final String CF_STANDARD = "Standard1"; - public static final String CF_STANDARD2 = "Standard2"; - public static final String CF_INDEXED = "Indexed1"; - public static final String CF_STANDARDLOWINDEXINTERVAL = "StandardLowIndexInterval"; - - static Token t(int i) - { - return StorageService.getPartitioner().getToken(ByteBufferUtil.bytes(String.valueOf(i))); - } - - @BeforeClass - public static void defineSchema() throws Exception - { - SchemaLoader.prepareServer(); - SchemaLoader.createKeyspace(KEYSPACE1, - SimpleStrategy.class, - KSMetaData.optsWithRF(1), - SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD), - SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD2), - SchemaLoader.indexCFMD(KEYSPACE1, CF_INDEXED, true), - SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARDLOWINDEXINTERVAL) - .minIndexInterval(8) - .maxIndexInterval(256) - .caching(CachingOptions.NONE)); - } - - @Test - public void testGetPositionsForRanges() - { - Keyspace keyspace = Keyspace.open(KEYSPACE1); - ColumnFamilyStore store = keyspace.getColumnFamilyStore("Standard2"); - - // insert data and compact to a single sstable - CompactionManager.instance.disableAutoCompaction(); - for (int j = 0; j < 10; j++) - { - ByteBuffer key = ByteBufferUtil.bytes(String.valueOf(j)); - Mutation rm = new Mutation(KEYSPACE1, key); - rm.add("Standard2", cellname("0"), ByteBufferUtil.EMPTY_BYTE_BUFFER, j); - rm.applyUnsafe(); - } - store.forceBlockingFlush(); - CompactionManager.instance.performMaximal(store, false); - - List<Range<Token>> ranges = new ArrayList<Range<Token>>(); - // 1 key - ranges.add(new Range<Token>(t(0), t(1))); - // 2 keys - ranges.add(new Range<Token>(t(2), t(4))); - // wrapping range from key to end - ranges.add(new Range<Token>(t(6), StorageService.getPartitioner().getMinimumToken())); - // empty range (should be ignored) - ranges.add(new Range<Token>(t(9), t(91))); - - // confirm that positions increase continuously - SSTableReader sstable = store.getSSTables().iterator().next(); - long previous = -1; - for (Pair<Long,Long> section : sstable.getPositionsForRanges(ranges)) - { - assert previous <= section.left : previous + " ! < " + section.left; - assert section.left < section.right : section.left + " ! < " + section.right; - previous = section.right; - } - } - - @Test - public void testSpannedIndexPositions() throws IOException - { - MmappedSegmentedFile.MAX_SEGMENT_SIZE = 40; // each index entry is ~11 bytes, so this will generate lots of segments - - Keyspace keyspace = Keyspace.open(KEYSPACE1); - ColumnFamilyStore store = keyspace.getColumnFamilyStore("Standard1"); - - // insert a bunch of data and compact to a single sstable - CompactionManager.instance.disableAutoCompaction(); - for (int j = 0; j < 100; j += 2) - { - ByteBuffer key = ByteBufferUtil.bytes(String.valueOf(j)); - Mutation rm = new Mutation(KEYSPACE1, key); - rm.add("Standard1", cellname("0"), ByteBufferUtil.EMPTY_BYTE_BUFFER, j); - rm.applyUnsafe(); - } - store.forceBlockingFlush(); - CompactionManager.instance.performMaximal(store, false); - - // check that all our keys are found correctly - SSTableReader sstable = store.getSSTables().iterator().next(); - for (int j = 0; j < 100; j += 2) - { - DecoratedKey dk = Util.dk(String.valueOf(j)); - FileDataInput file = sstable.getFileDataInput(sstable.getPosition(dk, SSTableReader.Operator.EQ).position); - DecoratedKey keyInDisk = sstable.partitioner.decorateKey(ByteBufferUtil.readWithShortLength(file)); - assert keyInDisk.equals(dk) : String.format("%s != %s in %s", keyInDisk, dk, file.getPath()); - } - - // check no false positives - for (int j = 1; j < 110; j += 2) - { - DecoratedKey dk = Util.dk(String.valueOf(j)); - assert sstable.getPosition(dk, SSTableReader.Operator.EQ) == null; - } - } - - @Test - public void testPersistentStatistics() - { - - Keyspace keyspace = Keyspace.open(KEYSPACE1); - ColumnFamilyStore store = keyspace.getColumnFamilyStore("Standard1"); - - for (int j = 0; j < 100; j += 2) - { - ByteBuffer key = ByteBufferUtil.bytes(String.valueOf(j)); - Mutation rm = new Mutation(KEYSPACE1, key); - rm.add("Standard1", cellname("0"), ByteBufferUtil.EMPTY_BYTE_BUFFER, j); - rm.applyUnsafe(); - } - store.forceBlockingFlush(); - - clearAndLoad(store); - assert store.metric.maxRowSize.getValue() != 0; - } - - private void clearAndLoad(ColumnFamilyStore cfs) - { - cfs.clearUnsafe(); - cfs.loadNewSSTables(); - } - - @Test - public void testReadRateTracking() - { - // try to make sure CASSANDRA-8239 never happens again - Keyspace keyspace = Keyspace.open(KEYSPACE1); - ColumnFamilyStore store = keyspace.getColumnFamilyStore("Standard1"); - - for (int j = 0; j < 10; j++) - { - ByteBuffer key = ByteBufferUtil.bytes(String.valueOf(j)); - Mutation rm = new Mutation(KEYSPACE1, key); - rm.add("Standard1", cellname("0"), ByteBufferUtil.EMPTY_BYTE_BUFFER, j); - rm.apply(); - } - store.forceBlockingFlush(); - - SSTableReader sstable = store.getSSTables().iterator().next(); - assertEquals(0, sstable.getReadMeter().count()); - - DecoratedKey key = sstable.partitioner.decorateKey(ByteBufferUtil.bytes("4")); - store.getColumnFamily(key, Composites.EMPTY, Composites.EMPTY, false, 100, 100); - assertEquals(1, sstable.getReadMeter().count()); - store.getColumnFamily(key, cellname("0"), cellname("0"), false, 100, 100); - assertEquals(2, sstable.getReadMeter().count()); - store.getColumnFamily(Util.namesQueryFilter(store, key, cellname("0"))); - assertEquals(3, sstable.getReadMeter().count()); - } - - @Test - public void testGetPositionsForRangesWithKeyCache() - { - Keyspace keyspace = Keyspace.open(KEYSPACE1); - ColumnFamilyStore store = keyspace.getColumnFamilyStore("Standard2"); - CacheService.instance.keyCache.setCapacity(100); - - // insert data and compact to a single sstable - CompactionManager.instance.disableAutoCompaction(); - for (int j = 0; j < 10; j++) - { - ByteBuffer key = ByteBufferUtil.bytes(String.valueOf(j)); - Mutation rm = new Mutation(KEYSPACE1, key); - rm.add("Standard2", cellname("0"), ByteBufferUtil.EMPTY_BYTE_BUFFER, j); - rm.applyUnsafe(); - } - store.forceBlockingFlush(); - CompactionManager.instance.performMaximal(store, false); - - SSTableReader sstable = store.getSSTables().iterator().next(); - long p2 = sstable.getPosition(k(2), SSTableReader.Operator.EQ).position; - long p3 = sstable.getPosition(k(3), SSTableReader.Operator.EQ).position; - long p6 = sstable.getPosition(k(6), SSTableReader.Operator.EQ).position; - long p7 = sstable.getPosition(k(7), SSTableReader.Operator.EQ).position; - - Pair<Long, Long> p = sstable.getPositionsForRanges(makeRanges(t(2), t(6))).get(0); - - // range are start exclusive so we should start at 3 - assert p.left == p3; - - // to capture 6 we have to stop at the start of 7 - assert p.right == p7; - } - - @Test - public void testPersistentStatisticsWithSecondaryIndex() - { - // Create secondary index and flush to disk - Keyspace keyspace = Keyspace.open(KEYSPACE1); - ColumnFamilyStore store = keyspace.getColumnFamilyStore("Indexed1"); - ByteBuffer key = ByteBufferUtil.bytes(String.valueOf("k1")); - Mutation rm = new Mutation(KEYSPACE1, key); - rm.add("Indexed1", cellname("birthdate"), ByteBufferUtil.bytes(1L), System.currentTimeMillis()); - rm.applyUnsafe(); - store.forceBlockingFlush(); - - // check if opening and querying works - assertIndexQueryWorks(store); - } - public void testGetPositionsKeyCacheStats() - { - Keyspace keyspace = Keyspace.open(KEYSPACE1); - ColumnFamilyStore store = keyspace.getColumnFamilyStore("Standard2"); - CacheService.instance.keyCache.setCapacity(1000); - - // insert data and compact to a single sstable - CompactionManager.instance.disableAutoCompaction(); - for (int j = 0; j < 10; j++) - { - ByteBuffer key = ByteBufferUtil.bytes(String.valueOf(j)); - Mutation rm = new Mutation("Keyspace1", key); - rm.add("Standard2", cellname("0"), ByteBufferUtil.EMPTY_BYTE_BUFFER, j); - rm.apply(); - } - store.forceBlockingFlush(); - CompactionManager.instance.performMaximal(store, false); - - SSTableReader sstable = store.getSSTables().iterator().next(); - sstable.getPosition(k(2), SSTableReader.Operator.EQ); - assertEquals(0, sstable.getKeyCacheHit()); - assertEquals(1, sstable.getBloomFilterTruePositiveCount()); - sstable.getPosition(k(2), SSTableReader.Operator.EQ); - assertEquals(1, sstable.getKeyCacheHit()); - assertEquals(2, sstable.getBloomFilterTruePositiveCount()); - sstable.getPosition(k(15), SSTableReader.Operator.EQ); - assertEquals(1, sstable.getKeyCacheHit()); - assertEquals(2, sstable.getBloomFilterTruePositiveCount()); - - } - - - @Test - public void testOpeningSSTable() throws Exception - { - String ks = KEYSPACE1; - String cf = "Standard1"; - - // clear and create just one sstable for this test - Keyspace keyspace = Keyspace.open(ks); - ColumnFamilyStore store = keyspace.getColumnFamilyStore(cf); - store.clearUnsafe(); - store.disableAutoCompaction(); - - DecoratedKey firstKey = null, lastKey = null; - long timestamp = System.currentTimeMillis(); - for (int i = 0; i < store.metadata.getMinIndexInterval(); i++) - { - DecoratedKey key = Util.dk(String.valueOf(i)); - if (firstKey == null) - firstKey = key; - if (lastKey == null) - lastKey = key; - if (store.metadata.getKeyValidator().compare(lastKey.getKey(), key.getKey()) < 0) - lastKey = key; - Mutation rm = new Mutation(ks, key.getKey()); - rm.add(cf, cellname("col"), - ByteBufferUtil.EMPTY_BYTE_BUFFER, timestamp); - rm.applyUnsafe(); - } - store.forceBlockingFlush(); - - SSTableReader sstable = store.getSSTables().iterator().next(); - Descriptor desc = sstable.descriptor; - - // test to see if sstable can be opened as expected - SSTableReader target = SSTableReader.open(desc); - Assert.assertEquals(target.getIndexSummarySize(), 1); - Assert.assertArrayEquals(ByteBufferUtil.getArray(firstKey.getKey()), target.getIndexSummaryKey(0)); - assert target.first.equals(firstKey); - assert target.last.equals(lastKey); - target.selfRef().release(); - } - - @Test - public void testLoadingSummaryUsesCorrectPartitioner() throws Exception - { - Keyspace keyspace = Keyspace.open(KEYSPACE1); - ColumnFamilyStore store = keyspace.getColumnFamilyStore("Indexed1"); - ByteBuffer key = ByteBufferUtil.bytes(String.valueOf("k1")); - Mutation rm = new Mutation(KEYSPACE1, key); - rm.add("Indexed1", cellname("birthdate"), ByteBufferUtil.bytes(1L), System.currentTimeMillis()); - rm.applyUnsafe(); - store.forceBlockingFlush(); - - ColumnFamilyStore indexCfs = store.indexManager.getIndexForColumn(ByteBufferUtil.bytes("birthdate")).getIndexCfs(); - assert indexCfs.partitioner instanceof LocalPartitioner; - SSTableReader sstable = indexCfs.getSSTables().iterator().next(); - assert sstable.first.getToken() instanceof LocalToken; - - try(SegmentedFile.Builder ibuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getIndexAccessMode(), false); - SegmentedFile.Builder dbuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode(), sstable.compression)) - { - sstable.saveSummary(ibuilder, dbuilder); - } - SSTableReader reopened = SSTableReader.open(sstable.descriptor); - assert reopened.first.getToken() instanceof LocalToken; - reopened.selfRef().release(); - } - - /** see CASSANDRA-5407 */ - @Test - public void testGetScannerForNoIntersectingRanges() throws Exception - { - Keyspace keyspace = Keyspace.open(KEYSPACE1); - ColumnFamilyStore store = keyspace.getColumnFamilyStore("Standard1"); - ByteBuffer key = ByteBufferUtil.bytes(String.valueOf("k1")); - Mutation rm = new Mutation(KEYSPACE1, key); - rm.add("Standard1", cellname("xyz"), ByteBufferUtil.bytes("abc"), 0); - rm.applyUnsafe(); - store.forceBlockingFlush(); - boolean foundScanner = false; - for (SSTableReader s : store.getSSTables()) - { - try (ISSTableScanner scanner = s.getScanner(new Range<Token>(t(0), t(1)), null)) - { - scanner.next(); // throws exception pre 5407 - foundScanner = true; - } - } - assertTrue(foundScanner); - } - - @Test - public void testGetPositionsForRangesFromTableOpenedForBulkLoading() throws IOException - { - Keyspace keyspace = Keyspace.open(KEYSPACE1); - ColumnFamilyStore store = keyspace.getColumnFamilyStore("Standard2"); - - // insert data and compact to a single sstable. The - // number of keys inserted is greater than index_interval - // to ensure multiple segments in the index file - CompactionManager.instance.disableAutoCompaction(); - for (int j = 0; j < 130; j++) - { - ByteBuffer key = ByteBufferUtil.bytes(String.valueOf(j)); - Mutation rm = new Mutation(KEYSPACE1, key); - rm.add("Standard2", cellname("0"), ByteBufferUtil.EMPTY_BYTE_BUFFER, j); - rm.applyUnsafe(); - } - store.forceBlockingFlush(); - CompactionManager.instance.performMaximal(store, false); - - // construct a range which is present in the sstable, but whose - // keys are not found in the first segment of the index. - List<Range<Token>> ranges = new ArrayList<Range<Token>>(); - ranges.add(new Range<Token>(t(98), t(99))); - - SSTableReader sstable = store.getSSTables().iterator().next(); - List<Pair<Long,Long>> sections = sstable.getPositionsForRanges(ranges); - assert sections.size() == 1 : "Expected to find range in sstable" ; - - // re-open the same sstable as it would be during bulk loading - Set<Component> components = Sets.newHashSet(Component.DATA, Component.PRIMARY_INDEX); - if (sstable.components.contains(Component.COMPRESSION_INFO)) - components.add(Component.COMPRESSION_INFO); - SSTableReader bulkLoaded = SSTableReader.openForBatch(sstable.descriptor, components, store.metadata, sstable.partitioner); - sections = bulkLoaded.getPositionsForRanges(ranges); - assert sections.size() == 1 : "Expected to find range in sstable opened for bulk loading"; - bulkLoaded.selfRef().release(); - } - - @Test - public void testIndexSummaryReplacement() throws IOException, ExecutionException, InterruptedException - { - Keyspace keyspace = Keyspace.open(KEYSPACE1); - final ColumnFamilyStore store = keyspace.getColumnFamilyStore("StandardLowIndexInterval"); // index interval of 8, no key caching - CompactionManager.instance.disableAutoCompaction(); - - final int NUM_ROWS = 512; - for (int j = 0; j < NUM_ROWS; j++) - { - ByteBuffer key = ByteBufferUtil.bytes(String.format("%3d", j)); - Mutation rm = new Mutation(KEYSPACE1, key); - rm.add("StandardLowIndexInterval", Util.cellname("0"), ByteBufferUtil.bytes(String.format("%3d", j)), j); - rm.applyUnsafe(); - } - store.forceBlockingFlush(); - CompactionManager.instance.performMaximal(store, false); - - Collection<SSTableReader> sstables = store.getSSTables(); - assert sstables.size() == 1; - final SSTableReader sstable = sstables.iterator().next(); - - ThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(5); - List<Future> futures = new ArrayList<>(NUM_ROWS * 2); - for (int i = 0; i < NUM_ROWS; i++) - { - final ByteBuffer key = ByteBufferUtil.bytes(String.format("%3d", i)); - final int index = i; - - futures.add(executor.submit(new Runnable() - { - public void run() - { - ColumnFamily result = store.getColumnFamily(sstable.partitioner.decorateKey(key), Composites.EMPTY, Composites.EMPTY, false, 100, 100); - assertFalse(result.isEmpty()); - assertEquals(0, ByteBufferUtil.compare(String.format("%3d", index).getBytes(), result.getColumn(Util.cellname("0")).value())); - } - })); - - futures.add(executor.submit(new Runnable() - { - public void run() - { - Iterable<DecoratedKey> results = store.keySamples( - new Range<>(sstable.partitioner.getMinimumToken(), sstable.partitioner.getToken(key))); - assertTrue(results.iterator().hasNext()); - } - })); - } - - SSTableReader replacement; - try (LifecycleTransaction txn = store.getTracker().tryModify(Arrays.asList(sstable), OperationType.UNKNOWN)) - { - replacement = sstable.cloneWithNewSummarySamplingLevel(store, 1); - txn.update(replacement, true); - txn.finish(); - } - for (Future future : futures) - future.get(); - - assertEquals(sstable.estimatedKeys(), replacement.estimatedKeys(), 1); - } - - private void assertIndexQueryWorks(ColumnFamilyStore indexedCFS) - { - assert "Indexed1".equals(indexedCFS.name); - - // make sure all sstables including 2ary indexes load from disk - for (ColumnFamilyStore cfs : indexedCFS.concatWithIndexes()) - clearAndLoad(cfs); - - // query using index to see if sstable for secondary index opens - IndexExpression expr = new IndexExpression(ByteBufferUtil.bytes("birthdate"), Operator.EQ, ByteBufferUtil.bytes(1L)); - List<IndexExpression> clause = Arrays.asList(expr); - Range<RowPosition> range = Util.range("", ""); - List<Row> rows = indexedCFS.search(range, clause, new IdentityQueryFilter(), 100); - assert rows.size() == 1; - } - - private List<Range<Token>> makeRanges(Token left, Token right) - { - return Arrays.asList(new Range<>(left, right)); - } - - private DecoratedKey k(int i) - { - return new BufferDecoratedKey(t(i), ByteBufferUtil.bytes(String.valueOf(i))); - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/d5c83f49/test/unit/org/apache/cassandra/io/sstable/format/SSTableReaderTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/sstable/format/SSTableReaderTest.java b/test/unit/org/apache/cassandra/io/sstable/format/SSTableReaderTest.java new file mode 100644 index 0000000..6d07f1c --- /dev/null +++ b/test/unit/org/apache/cassandra/io/sstable/format/SSTableReaderTest.java @@ -0,0 +1,648 @@ +package org.apache.cassandra.io.sstable.format; +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadPoolExecutor; + +import com.google.common.collect.Sets; +import org.apache.cassandra.cache.CachingOptions; +import org.apache.cassandra.config.KSMetaData; +import org.apache.cassandra.io.sstable.Component; +import org.apache.cassandra.io.sstable.Descriptor; +import org.apache.cassandra.io.sstable.ISSTableScanner; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.locator.SimpleStrategy; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; + +import org.apache.cassandra.OrderedJUnit4ClassRunner; +import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.Util; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.cql3.Operator; +import org.apache.cassandra.db.BufferDecoratedKey; +import org.apache.cassandra.db.ColumnFamily; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.IndexExpression; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.Mutation; +import org.apache.cassandra.db.Row; +import org.apache.cassandra.db.RowPosition; +import org.apache.cassandra.db.columniterator.IdentityQueryFilter; +import org.apache.cassandra.db.compaction.CompactionManager; +import org.apache.cassandra.db.compaction.OperationType; +import org.apache.cassandra.db.composites.Composites; +import org.apache.cassandra.db.lifecycle.LifecycleTransaction; +import org.apache.cassandra.dht.LocalPartitioner; +import org.apache.cassandra.dht.LocalPartitioner.LocalToken; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.io.util.FileDataInput; +import org.apache.cassandra.io.util.MmappedSegmentedFile; +import org.apache.cassandra.io.util.SegmentedFile; +import org.apache.cassandra.service.CacheService; +import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.Pair; +import static org.apache.cassandra.Util.cellname; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +@RunWith(OrderedJUnit4ClassRunner.class) +public class SSTableReaderTest +{ + public static final String KEYSPACE1 = "SSTableReaderTest"; + public static final String CF_STANDARD = "Standard1"; + public static final String CF_STANDARD2 = "Standard2"; + public static final String CF_INDEXED = "Indexed1"; + public static final String CF_STANDARDLOWINDEXINTERVAL = "StandardLowIndexInterval"; + + static Token t(int i) + { + return StorageService.getPartitioner().getToken(ByteBufferUtil.bytes(String.valueOf(i))); + } + + @BeforeClass + public static void defineSchema() throws Exception + { + SchemaLoader.prepareServer(); + SchemaLoader.createKeyspace(KEYSPACE1, + SimpleStrategy.class, + KSMetaData.optsWithRF(1), + SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD), + SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD2), + SchemaLoader.indexCFMD(KEYSPACE1, CF_INDEXED, true), + SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARDLOWINDEXINTERVAL) + .minIndexInterval(8) + .maxIndexInterval(256) + .caching(CachingOptions.NONE)); + } + + @Test + public void testGetPositionsForRanges() + { + Keyspace keyspace = Keyspace.open(KEYSPACE1); + ColumnFamilyStore store = keyspace.getColumnFamilyStore("Standard2"); + + // insert data and compact to a single sstable + CompactionManager.instance.disableAutoCompaction(); + for (int j = 0; j < 10; j++) + { + ByteBuffer key = ByteBufferUtil.bytes(String.valueOf(j)); + Mutation rm = new Mutation(KEYSPACE1, key); + rm.add("Standard2", cellname("0"), ByteBufferUtil.EMPTY_BYTE_BUFFER, j); + rm.applyUnsafe(); + } + store.forceBlockingFlush(); + CompactionManager.instance.performMaximal(store, false); + + List<Range<Token>> ranges = new ArrayList<Range<Token>>(); + // 1 key + ranges.add(new Range<Token>(t(0), t(1))); + // 2 keys + ranges.add(new Range<Token>(t(2), t(4))); + // wrapping range from key to end + ranges.add(new Range<Token>(t(6), StorageService.getPartitioner().getMinimumToken())); + // empty range (should be ignored) + ranges.add(new Range<Token>(t(9), t(91))); + + // confirm that positions increase continuously + SSTableReader sstable = store.getSSTables().iterator().next(); + long previous = -1; + for (Pair<Long,Long> section : sstable.getPositionsForRanges(ranges)) + { + assert previous <= section.left : previous + " ! < " + section.left; + assert section.left < section.right : section.left + " ! < " + section.right; + previous = section.right; + } + } + + @Test + public void testSpannedIndexPositions() throws IOException + { + long originalMaxSegmentSize = MmappedSegmentedFile.MAX_SEGMENT_SIZE; + MmappedSegmentedFile.MAX_SEGMENT_SIZE = 40; // each index entry is ~11 bytes, so this will generate lots of segments + + try + { + Keyspace keyspace = Keyspace.open(KEYSPACE1); + ColumnFamilyStore store = keyspace.getColumnFamilyStore("Standard1"); + + // insert a bunch of data and compact to a single sstable + CompactionManager.instance.disableAutoCompaction(); + for (int j = 0; j < 100; j += 2) + { + ByteBuffer key = ByteBufferUtil.bytes(String.valueOf(j)); + Mutation rm = new Mutation(KEYSPACE1, key); + rm.add("Standard1", cellname("0"), ByteBufferUtil.EMPTY_BYTE_BUFFER, j); + rm.applyUnsafe(); + } + store.forceBlockingFlush(); + CompactionManager.instance.performMaximal(store, false); + + // check that all our keys are found correctly + SSTableReader sstable = store.getSSTables().iterator().next(); + for (int j = 0; j < 100; j += 2) + { + DecoratedKey dk = Util.dk(String.valueOf(j)); + FileDataInput file = sstable.getFileDataInput(sstable.getPosition(dk, SSTableReader.Operator.EQ).position); + DecoratedKey keyInDisk = sstable.partitioner.decorateKey(ByteBufferUtil.readWithShortLength(file)); + assert keyInDisk.equals(dk) : String.format("%s != %s in %s", keyInDisk, dk, file.getPath()); + } + + // check no false positives + for (int j = 1; j < 110; j += 2) + { + DecoratedKey dk = Util.dk(String.valueOf(j)); + assert sstable.getPosition(dk, SSTableReader.Operator.EQ) == null; + } + } + finally + { + MmappedSegmentedFile.MAX_SEGMENT_SIZE = originalMaxSegmentSize; + } + } + + @Test + public void testPersistentStatistics() + { + + Keyspace keyspace = Keyspace.open(KEYSPACE1); + ColumnFamilyStore store = keyspace.getColumnFamilyStore("Standard1"); + + for (int j = 0; j < 100; j += 2) + { + ByteBuffer key = ByteBufferUtil.bytes(String.valueOf(j)); + Mutation rm = new Mutation(KEYSPACE1, key); + rm.add("Standard1", cellname("0"), ByteBufferUtil.EMPTY_BYTE_BUFFER, j); + rm.applyUnsafe(); + } + store.forceBlockingFlush(); + + clearAndLoad(store); + assert store.metric.maxRowSize.getValue() != 0; + } + + private void clearAndLoad(ColumnFamilyStore cfs) + { + cfs.clearUnsafe(); + cfs.loadNewSSTables(); + } + + @Test + public void testReadRateTracking() + { + // try to make sure CASSANDRA-8239 never happens again + Keyspace keyspace = Keyspace.open(KEYSPACE1); + ColumnFamilyStore store = keyspace.getColumnFamilyStore("Standard1"); + + for (int j = 0; j < 10; j++) + { + ByteBuffer key = ByteBufferUtil.bytes(String.valueOf(j)); + Mutation rm = new Mutation(KEYSPACE1, key); + rm.add("Standard1", cellname("0"), ByteBufferUtil.EMPTY_BYTE_BUFFER, j); + rm.apply(); + } + store.forceBlockingFlush(); + + SSTableReader sstable = store.getSSTables().iterator().next(); + assertEquals(0, sstable.getReadMeter().count()); + + DecoratedKey key = sstable.partitioner.decorateKey(ByteBufferUtil.bytes("4")); + store.getColumnFamily(key, Composites.EMPTY, Composites.EMPTY, false, 100, 100); + assertEquals(1, sstable.getReadMeter().count()); + store.getColumnFamily(key, cellname("0"), cellname("0"), false, 100, 100); + assertEquals(2, sstable.getReadMeter().count()); + store.getColumnFamily(Util.namesQueryFilter(store, key, cellname("0"))); + assertEquals(3, sstable.getReadMeter().count()); + } + + @Test + public void testGetPositionsForRangesWithKeyCache() + { + Keyspace keyspace = Keyspace.open(KEYSPACE1); + ColumnFamilyStore store = keyspace.getColumnFamilyStore("Standard2"); + CacheService.instance.keyCache.setCapacity(100); + + // insert data and compact to a single sstable + CompactionManager.instance.disableAutoCompaction(); + for (int j = 0; j < 10; j++) + { + ByteBuffer key = ByteBufferUtil.bytes(String.valueOf(j)); + Mutation rm = new Mutation(KEYSPACE1, key); + rm.add("Standard2", cellname("0"), ByteBufferUtil.EMPTY_BYTE_BUFFER, j); + rm.applyUnsafe(); + } + store.forceBlockingFlush(); + CompactionManager.instance.performMaximal(store, false); + + SSTableReader sstable = store.getSSTables().iterator().next(); + long p2 = sstable.getPosition(k(2), SSTableReader.Operator.EQ).position; + long p3 = sstable.getPosition(k(3), SSTableReader.Operator.EQ).position; + long p6 = sstable.getPosition(k(6), SSTableReader.Operator.EQ).position; + long p7 = sstable.getPosition(k(7), SSTableReader.Operator.EQ).position; + + Pair<Long, Long> p = sstable.getPositionsForRanges(makeRanges(t(2), t(6))).get(0); + + // range are start exclusive so we should start at 3 + assert p.left == p3; + + // to capture 6 we have to stop at the start of 7 + assert p.right == p7; + } + + @Test + public void testPersistentStatisticsWithSecondaryIndex() + { + // Create secondary index and flush to disk + Keyspace keyspace = Keyspace.open(KEYSPACE1); + ColumnFamilyStore store = keyspace.getColumnFamilyStore("Indexed1"); + ByteBuffer key = ByteBufferUtil.bytes(String.valueOf("k1")); + Mutation rm = new Mutation(KEYSPACE1, key); + rm.add("Indexed1", cellname("birthdate"), ByteBufferUtil.bytes(1L), System.currentTimeMillis()); + rm.applyUnsafe(); + store.forceBlockingFlush(); + + // check if opening and querying works + assertIndexQueryWorks(store); + } + public void testGetPositionsKeyCacheStats() + { + Keyspace keyspace = Keyspace.open(KEYSPACE1); + ColumnFamilyStore store = keyspace.getColumnFamilyStore("Standard2"); + CacheService.instance.keyCache.setCapacity(1000); + + // insert data and compact to a single sstable + CompactionManager.instance.disableAutoCompaction(); + for (int j = 0; j < 10; j++) + { + ByteBuffer key = ByteBufferUtil.bytes(String.valueOf(j)); + Mutation rm = new Mutation("Keyspace1", key); + rm.add("Standard2", cellname("0"), ByteBufferUtil.EMPTY_BYTE_BUFFER, j); + rm.apply(); + } + store.forceBlockingFlush(); + CompactionManager.instance.performMaximal(store, false); + + SSTableReader sstable = store.getSSTables().iterator().next(); + sstable.getPosition(k(2), SSTableReader.Operator.EQ); + assertEquals(0, sstable.getKeyCacheHit()); + assertEquals(1, sstable.getBloomFilterTruePositiveCount()); + sstable.getPosition(k(2), SSTableReader.Operator.EQ); + assertEquals(1, sstable.getKeyCacheHit()); + assertEquals(2, sstable.getBloomFilterTruePositiveCount()); + sstable.getPosition(k(15), SSTableReader.Operator.EQ); + assertEquals(1, sstable.getKeyCacheHit()); + assertEquals(2, sstable.getBloomFilterTruePositiveCount()); + + } + + + @Test + public void testOpeningSSTable() throws Exception + { + String ks = KEYSPACE1; + String cf = "Standard1"; + + // clear and create just one sstable for this test + Keyspace keyspace = Keyspace.open(ks); + ColumnFamilyStore store = keyspace.getColumnFamilyStore(cf); + store.clearUnsafe(); + store.disableAutoCompaction(); + + DecoratedKey firstKey = null, lastKey = null; + long timestamp = System.currentTimeMillis(); + for (int i = 0; i < store.metadata.getMinIndexInterval(); i++) + { + DecoratedKey key = Util.dk(String.valueOf(i)); + if (firstKey == null) + firstKey = key; + if (lastKey == null) + lastKey = key; + if (store.metadata.getKeyValidator().compare(lastKey.getKey(), key.getKey()) < 0) + lastKey = key; + Mutation rm = new Mutation(ks, key.getKey()); + rm.add(cf, cellname("col"), + ByteBufferUtil.EMPTY_BYTE_BUFFER, timestamp); + rm.applyUnsafe(); + } + store.forceBlockingFlush(); + + SSTableReader sstable = store.getSSTables().iterator().next(); + Descriptor desc = sstable.descriptor; + + // test to see if sstable can be opened as expected + SSTableReader target = SSTableReader.open(desc); + Assert.assertEquals(target.getIndexSummarySize(), 1); + Assert.assertArrayEquals(ByteBufferUtil.getArray(firstKey.getKey()), target.getIndexSummaryKey(0)); + assert target.first.equals(firstKey); + assert target.last.equals(lastKey); + target.selfRef().release(); + } + + @Test + public void testLoadingSummaryUsesCorrectPartitioner() throws Exception + { + Keyspace keyspace = Keyspace.open(KEYSPACE1); + ColumnFamilyStore store = keyspace.getColumnFamilyStore("Indexed1"); + ByteBuffer key = ByteBufferUtil.bytes(String.valueOf("k1")); + Mutation rm = new Mutation(KEYSPACE1, key); + rm.add("Indexed1", cellname("birthdate"), ByteBufferUtil.bytes(1L), System.currentTimeMillis()); + rm.applyUnsafe(); + store.forceBlockingFlush(); + + ColumnFamilyStore indexCfs = store.indexManager.getIndexForColumn(ByteBufferUtil.bytes("birthdate")).getIndexCfs(); + assert indexCfs.partitioner instanceof LocalPartitioner; + SSTableReader sstable = indexCfs.getSSTables().iterator().next(); + assert sstable.first.getToken() instanceof LocalToken; + + try(SegmentedFile.Builder ibuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getIndexAccessMode(), false); + SegmentedFile.Builder dbuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode(), sstable.compression)) + { + sstable.saveSummary(ibuilder, dbuilder); + } + SSTableReader reopened = SSTableReader.open(sstable.descriptor); + assert reopened.first.getToken() instanceof LocalToken; + reopened.selfRef().release(); + } + + /** see CASSANDRA-5407 */ + @Test + public void testGetScannerForNoIntersectingRanges() throws Exception + { + Keyspace keyspace = Keyspace.open(KEYSPACE1); + ColumnFamilyStore store = keyspace.getColumnFamilyStore("Standard1"); + ByteBuffer key = ByteBufferUtil.bytes(String.valueOf("k1")); + Mutation rm = new Mutation(KEYSPACE1, key); + rm.add("Standard1", cellname("xyz"), ByteBufferUtil.bytes("abc"), 0); + rm.applyUnsafe(); + store.forceBlockingFlush(); + boolean foundScanner = false; + for (SSTableReader s : store.getSSTables()) + { + try (ISSTableScanner scanner = s.getScanner(new Range<Token>(t(0), t(1)), null)) + { + scanner.next(); // throws exception pre 5407 + foundScanner = true; + } + } + assertTrue(foundScanner); + } + + @Test + public void testGetPositionsForRangesFromTableOpenedForBulkLoading() throws IOException + { + Keyspace keyspace = Keyspace.open(KEYSPACE1); + ColumnFamilyStore store = keyspace.getColumnFamilyStore("Standard2"); + + // insert data and compact to a single sstable. The + // number of keys inserted is greater than index_interval + // to ensure multiple segments in the index file + CompactionManager.instance.disableAutoCompaction(); + for (int j = 0; j < 130; j++) + { + ByteBuffer key = ByteBufferUtil.bytes(String.valueOf(j)); + Mutation rm = new Mutation(KEYSPACE1, key); + rm.add("Standard2", cellname("0"), ByteBufferUtil.EMPTY_BYTE_BUFFER, j); + rm.applyUnsafe(); + } + store.forceBlockingFlush(); + CompactionManager.instance.performMaximal(store, false); + + // construct a range which is present in the sstable, but whose + // keys are not found in the first segment of the index. + List<Range<Token>> ranges = new ArrayList<Range<Token>>(); + ranges.add(new Range<Token>(t(98), t(99))); + + SSTableReader sstable = store.getSSTables().iterator().next(); + List<Pair<Long,Long>> sections = sstable.getPositionsForRanges(ranges); + assert sections.size() == 1 : "Expected to find range in sstable" ; + + // re-open the same sstable as it would be during bulk loading + Set<Component> components = Sets.newHashSet(Component.DATA, Component.PRIMARY_INDEX); + if (sstable.compression) + components.add(Component.COMPRESSION_INFO); + SSTableReader bulkLoaded = SSTableReader.openForBatch(sstable.descriptor, components, store.metadata, sstable.partitioner); + sections = bulkLoaded.getPositionsForRanges(ranges); + assert sections.size() == 1 : "Expected to find range in sstable opened for bulk loading"; + bulkLoaded.selfRef().release(); + } + + @Test + public void testIndexSummaryReplacement() throws IOException, ExecutionException, InterruptedException + { + Keyspace keyspace = Keyspace.open(KEYSPACE1); + final ColumnFamilyStore store = keyspace.getColumnFamilyStore("StandardLowIndexInterval"); // index interval of 8, no key caching + CompactionManager.instance.disableAutoCompaction(); + + final int NUM_ROWS = 512; + for (int j = 0; j < NUM_ROWS; j++) + { + ByteBuffer key = ByteBufferUtil.bytes(String.format("%3d", j)); + Mutation rm = new Mutation(KEYSPACE1, key); + rm.add("StandardLowIndexInterval", Util.cellname("0"), ByteBufferUtil.bytes(String.format("%3d", j)), j); + rm.applyUnsafe(); + } + store.forceBlockingFlush(); + CompactionManager.instance.performMaximal(store, false); + + Collection<SSTableReader> sstables = store.getSSTables(); + assert sstables.size() == 1; + final SSTableReader sstable = sstables.iterator().next(); + + ThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(5); + List<Future> futures = new ArrayList<>(NUM_ROWS * 2); + for (int i = 0; i < NUM_ROWS; i++) + { + final ByteBuffer key = ByteBufferUtil.bytes(String.format("%3d", i)); + final int index = i; + + futures.add(executor.submit(new Runnable() + { + public void run() + { + ColumnFamily result = store.getColumnFamily(sstable.partitioner.decorateKey(key), Composites.EMPTY, Composites.EMPTY, false, 100, 100); + assertFalse(result.isEmpty()); + assertEquals(0, ByteBufferUtil.compare(String.format("%3d", index).getBytes(), result.getColumn(Util.cellname("0")).value())); + } + })); + + futures.add(executor.submit(new Runnable() + { + public void run() + { + Iterable<DecoratedKey> results = store.keySamples( + new Range<>(sstable.partitioner.getMinimumToken(), sstable.partitioner.getToken(key))); + assertTrue(results.iterator().hasNext()); + } + })); + } + + SSTableReader replacement; + try (LifecycleTransaction txn = store.getTracker().tryModify(Arrays.asList(sstable), OperationType.UNKNOWN)) + { + replacement = sstable.cloneWithNewSummarySamplingLevel(store, 1); + txn.update(replacement, true); + txn.finish(); + } + for (Future future : futures) + future.get(); + + assertEquals(sstable.estimatedKeys(), replacement.estimatedKeys(), 1); + } + + @Test + public void testIndexSummaryUpsampleAndReload() throws Exception + { + long originalMaxSegmentSize = MmappedSegmentedFile.MAX_SEGMENT_SIZE; + MmappedSegmentedFile.MAX_SEGMENT_SIZE = 40; // each index entry is ~11 bytes, so this will generate lots of segments + + try + { + testIndexSummaryUpsampleAndReload0(); + } + finally + { + MmappedSegmentedFile.MAX_SEGMENT_SIZE = originalMaxSegmentSize; + } + } + + private void testIndexSummaryUpsampleAndReload0() throws Exception + { + Keyspace keyspace = Keyspace.open(KEYSPACE1); + final ColumnFamilyStore store = keyspace.getColumnFamilyStore("StandardLowIndexInterval"); // index interval of 8, no key caching + CompactionManager.instance.disableAutoCompaction(); + + final int NUM_ROWS = 512; + for (int j = 0; j < NUM_ROWS; j++) + { + ByteBuffer key = ByteBufferUtil.bytes(String.format("%3d", j)); + Mutation rm = new Mutation(KEYSPACE1, key); + rm.add("StandardLowIndexInterval", Util.cellname("0"), ByteBufferUtil.bytes(String.format("%3d", j)), j); + rm.applyUnsafe(); + } + store.forceBlockingFlush(); + CompactionManager.instance.performMaximal(store, false); + + Collection<SSTableReader> sstables = store.getSSTables(); + assert sstables.size() == 1; + final SSTableReader sstable = sstables.iterator().next(); + + try (LifecycleTransaction txn = store.getTracker().tryModify(Arrays.asList(sstable), OperationType.UNKNOWN)) + { + SSTableReader replacement = sstable.cloneWithNewSummarySamplingLevel(store, sstable.getIndexSummarySamplingLevel() + 1); + txn.update(replacement, true); + txn.finish(); + } + SSTableReader reopen = SSTableReader.open(sstable.descriptor); + assert reopen.getIndexSummarySamplingLevel() == sstable.getIndexSummarySamplingLevel() + 1; + } + + @Test + public void testIndexSummaryDownsampleAndReload() throws Exception + { + long originalMaxSegmentSize = MmappedSegmentedFile.MAX_SEGMENT_SIZE; + MmappedSegmentedFile.MAX_SEGMENT_SIZE = 40; // each index entry is ~11 bytes, so this will generate lots of segments + + try + { + testIndexSummaryDownsampleAndReload0(); + } + finally + { + MmappedSegmentedFile.MAX_SEGMENT_SIZE = originalMaxSegmentSize; + } + } + + private void testIndexSummaryDownsampleAndReload0() throws Exception + { + Keyspace keyspace = Keyspace.open(KEYSPACE1); + final ColumnFamilyStore store = keyspace.getColumnFamilyStore("StandardLowIndexInterval"); // index interval of 8, no key caching + CompactionManager.instance.disableAutoCompaction(); + + final int NUM_ROWS = 512; + for (int j = 0; j < NUM_ROWS; j++) + { + ByteBuffer key = ByteBufferUtil.bytes(String.format("%3d", j)); + Mutation rm = new Mutation(KEYSPACE1, key); + rm.add("StandardLowIndexInterval", Util.cellname("0"), ByteBufferUtil.bytes(String.format("%3d", j)), j); + rm.applyUnsafe(); + } + store.forceBlockingFlush(); + CompactionManager.instance.performMaximal(store, false); + + Collection<SSTableReader> sstables = store.getSSTables(); + assert sstables.size() == 1; + final SSTableReader sstable = sstables.iterator().next(); + + try (LifecycleTransaction txn = store.getTracker().tryModify(Arrays.asList(sstable), OperationType.UNKNOWN)) + { + SSTableReader replacement = sstable.cloneWithNewSummarySamplingLevel(store, sstable.getIndexSummarySamplingLevel() / 2); + txn.update(replacement, true); + txn.finish(); + } + SSTableReader reopen = SSTableReader.open(sstable.descriptor); + assert Arrays.equals(sstable.ifile.copyReadableBounds(), reopen.ifile.copyReadableBounds()); + assert Arrays.equals(sstable.dfile.copyReadableBounds(), reopen.dfile.copyReadableBounds()); + } + + + private void assertIndexQueryWorks(ColumnFamilyStore indexedCFS) + { + assert "Indexed1".equals(indexedCFS.name); + + // make sure all sstables including 2ary indexes load from disk + for (ColumnFamilyStore cfs : indexedCFS.concatWithIndexes()) + clearAndLoad(cfs); + + // query using index to see if sstable for secondary index opens + IndexExpression expr = new IndexExpression(ByteBufferUtil.bytes("birthdate"), Operator.EQ, ByteBufferUtil.bytes(1L)); + List<IndexExpression> clause = Arrays.asList(expr); + Range<RowPosition> range = Util.range("", ""); + List<Row> rows = indexedCFS.search(range, clause, new IdentityQueryFilter(), 100); + assert rows.size() == 1; + } + + private List<Range<Token>> makeRanges(Token left, Token right) + { + return Arrays.asList(new Range<>(left, right)); + } + + private DecoratedKey k(int i) + { + return new BufferDecoratedKey(t(i), ByteBufferUtil.bytes(String.valueOf(i))); + } +}
