Avoid assertion error when IndexSummary > 2G

patch by Stefania Alborghetti; reviewed by krummas for CASSANDRA-12014


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/ae88fd6c
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/ae88fd6c
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/ae88fd6c

Branch: refs/heads/cassandra-3.11
Commit: ae88fd6c79b066f12ad76c2c1bfc1620d86bdbc5
Parents: d34f479
Author: Stefania Alborghetti <stefania.alborghe...@datastax.com>
Authored: Fri Sep 8 09:33:20 2017 +0800
Committer: Stefania Alborghetti <stefania.alborghe...@datastax.com>
Committed: Fri Sep 8 12:37:45 2017 +0800

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../io/sstable/IndexSummaryBuilder.java         |  70 ++++++++---
 .../cassandra/io/sstable/IndexSummaryTest.java  | 121 ++++++++++++++++++-
 3 files changed, 176 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/ae88fd6c/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 4302fdf..f4360be 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -95,6 +95,7 @@ Merged from 2.2:
  * Legacy caching options can prevent 3.0 upgrade (CASSANDRA-13384)
  * Nodetool upgradesstables/scrub/compact ignores system tables 
(CASSANDRA-13410)
  * Fix NPE issue in StorageService (CASSANDRA-13060)
+ * Avoid assertion error when IndexSummary > 2G (CASSANDRA-12014)
 Merged from 2.2:
  * Avoid starting gossiper in RemoveTest (CASSANDRA-13407)
  * Fix weightedSize() for row-cache reported by JMX and NodeTool 
(CASSANDRA-13393)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ae88fd6c/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java 
b/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java
index 6110afe..e3006b3 100644
--- a/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java
@@ -25,7 +25,9 @@ import java.util.TreeMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.config.Config;
 import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.io.util.Memory;
 import org.apache.cassandra.io.util.SafeMemoryWriter;
@@ -36,6 +38,9 @@ public class IndexSummaryBuilder implements AutoCloseable
 {
     private static final Logger logger = 
LoggerFactory.getLogger(IndexSummaryBuilder.class);
 
+    // Name of the system property (Config.PROPERTY_PREFIX + "index_summary_expected_key_size")
+    // that overrides the expected average key size declared below.
+    static final String defaultExpectedKeySizeName = Config.PROPERTY_PREFIX + 
"index_summary_expected_key_size";
+    // Expected average key size in bytes (64 unless the property above is set). The constructor
+    // uses it to estimate the entries' memory footprint, to raise the effective min index interval
+    // when that estimate exceeds Integer.MAX_VALUE bytes, and to pre-size the entries buffer.
+    // Not final: IndexSummaryTest overrides it temporarily.
+    static long defaultExpectedKeySize = 
Long.valueOf(System.getProperty(defaultExpectedKeySizeName, "64"));
+
     // the offset in the keys memory region to look for a given summary 
boundary
     private final SafeMemoryWriter offsets;
     private final SafeMemoryWriter entries;
@@ -85,20 +90,30 @@ public class IndexSummaryBuilder implements AutoCloseable
         }
     }
 
+    /**
+     * Build an index summary builder.
+     *
+     * @param expectedKeys - the number of keys we expect in the sstable
+     * @param minIndexInterval - the minimum interval between entries selected 
for sampling
+     * @param samplingLevel - the level at which entries are sampled
+     */
     public IndexSummaryBuilder(long expectedKeys, int minIndexInterval, int 
samplingLevel)
     {
         this.samplingLevel = samplingLevel;
         this.startPoints = Downsampling.getStartPoints(BASE_SAMPLING_LEVEL, 
samplingLevel);
 
+        long expectedEntrySize = getEntrySize(defaultExpectedKeySize);
         long maxExpectedEntries = expectedKeys / minIndexInterval;
-        if (maxExpectedEntries > Integer.MAX_VALUE)
+        long maxExpectedEntriesSize = maxExpectedEntries * expectedEntrySize;
+        if (maxExpectedEntriesSize > Integer.MAX_VALUE)
         {
             // that's a _lot_ of keys, and a very low min index interval
-            int effectiveMinInterval = (int) Math.ceil((double) 
Integer.MAX_VALUE / expectedKeys);
+            int effectiveMinInterval = (int) Math.ceil((double)(expectedKeys * 
expectedEntrySize) / Integer.MAX_VALUE);
             maxExpectedEntries = expectedKeys / effectiveMinInterval;
-            assert maxExpectedEntries <= Integer.MAX_VALUE : 
maxExpectedEntries;
-            logger.warn("min_index_interval of {} is too low for {} expected 
keys; using interval of {} instead",
-                        minIndexInterval, expectedKeys, effectiveMinInterval);
+            maxExpectedEntriesSize = maxExpectedEntries * expectedEntrySize;
+            assert maxExpectedEntriesSize <= Integer.MAX_VALUE : 
maxExpectedEntriesSize;
+            logger.warn("min_index_interval of {} is too low for {} expected 
keys of avg size {}; using interval of {} instead",
+                        minIndexInterval, expectedKeys, 
defaultExpectedKeySize, effectiveMinInterval);
             this.minIndexInterval = effectiveMinInterval;
         }
         else
@@ -109,13 +124,30 @@ public class IndexSummaryBuilder implements AutoCloseable
         // for initializing data structures, adjust our estimates based on the 
sampling level
         maxExpectedEntries = Math.max(1, (maxExpectedEntries * samplingLevel) 
/ BASE_SAMPLING_LEVEL);
         offsets = new SafeMemoryWriter(4 * 
maxExpectedEntries).order(ByteOrder.nativeOrder());
-        entries = new SafeMemoryWriter(40 * 
maxExpectedEntries).order(ByteOrder.nativeOrder());
+        entries = new SafeMemoryWriter(expectedEntrySize * 
maxExpectedEntries).order(ByteOrder.nativeOrder());
 
         // the summary will always contain the first index entry (downsampling 
will never remove it)
         nextSamplePosition = 0;
         indexIntervalMatches++;
     }
 
+    /**
+     * Given a key, return how many bytes its serialized index summary entry will occupy:
+     * the remaining bytes of the key buffer plus the index position stored after it.
+     */
+    private static long getEntrySize(DecoratedKey key)
+    {
+        return getEntrySize(key.getKey().remaining());
+    }
+
+    /**
+     * Given a key size, return how long the serialized index summary entry will be, that is,
+     * add the size of a long (8 bytes) to accommodate the index position written after the key.
+     */
+    private static long getEntrySize(long keySize)
+    {
+        return keySize + TypeSizes.sizeof(0L);
+    }
+
+
     // the index file has been flushed to the provided position; stash it and 
use that to recalculate our max readable boundary
     public void markIndexSynced(long upToPosition)
     {
@@ -169,21 +201,29 @@ public class IndexSummaryBuilder implements AutoCloseable
     {
         if (keysWritten == nextSamplePosition)
         {
-            assert entries.length() <= Integer.MAX_VALUE;
-            offsets.writeInt((int) entries.length());
-            entries.write(decoratedKey.getKey());
-            entries.writeLong(indexStart);
-            setNextSamplePosition(keysWritten);
+            if ((entries.length() + getEntrySize(decoratedKey)) <= 
Integer.MAX_VALUE)
+            {
+                offsets.writeInt((int) entries.length());
+                entries.write(decoratedKey.getKey());
+                entries.writeLong(indexStart);
+                setNextSamplePosition(keysWritten);
+            }
+            else
+            {
+                // we cannot fully sample this sstable due to too much memory 
in the index summary, so let's tell the user
+                logger.error("Memory capacity of index summary exceeded (2GB), 
index summary will not cover full sstable, " +
+                             "you should increase min_sampling_level");
+            }
         }
         else if (dataEnd != 0 && keysWritten + 1 == nextSamplePosition)
         {
             // this is the last key in this summary interval, so stash it
-            ReadableBoundary boundary = new ReadableBoundary(decoratedKey, 
indexEnd, dataEnd, (int)(offsets.length() / 4), entries.length());
+            ReadableBoundary boundary = new ReadableBoundary(decoratedKey, 
indexEnd, dataEnd, (int) (offsets.length() / 4), entries.length());
             lastReadableByData.put(dataEnd, boundary);
             lastReadableByIndex.put(indexEnd, boundary);
         }
-        keysWritten++;
 
+        keysWritten++;
         return this;
     }
 
@@ -251,12 +291,12 @@ public class IndexSummaryBuilder implements AutoCloseable
         return accumulate;
     }
 
-    public static int entriesAtSamplingLevel(int samplingLevel, int 
maxSummarySize)
+    static int entriesAtSamplingLevel(int samplingLevel, int maxSummarySize)
     {
         return (int) Math.ceil((samplingLevel * maxSummarySize) / (double) 
BASE_SAMPLING_LEVEL);
     }
 
-    public static int calculateSamplingLevel(int currentSamplingLevel, int 
currentNumEntries, long targetNumEntries, int minIndexInterval, int 
maxIndexInterval)
+    static int calculateSamplingLevel(int currentSamplingLevel, int 
currentNumEntries, long targetNumEntries, int minIndexInterval, int 
maxIndexInterval)
     {
         // effective index interval == (BASE_SAMPLING_LEVEL / samplingLevel) * 
minIndexInterval
         // so we can just solve for minSamplingLevel here:

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ae88fd6c/test/unit/org/apache/cassandra/io/sstable/IndexSummaryTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/IndexSummaryTest.java 
b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryTest.java
index baa6fad..ad08ba0 100644
--- a/test/unit/org/apache/cassandra/io/sstable/IndexSummaryTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryTest.java
@@ -24,6 +24,7 @@ import java.nio.ByteBuffer;
 import java.util.*;
 
 import com.google.common.collect.Lists;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
 import org.apache.cassandra.Util;
@@ -43,7 +44,125 @@ import static org.junit.Assert.*;
 
 public class IndexSummaryTest
 {
-    IPartitioner partitioner = Util.testPartitioner();
+    // Shared source of random key bytes; reseeded once per test class in setup().
+    private final static Random random = new Random();
+    private final static IPartitioner partitioner = Util.testPartitioner();
+
+    /**
+     * Seeds the shared random generator from the clock and prints the seed, so that a
+     * failing run can be replayed with the same random keys.
+     */
+    @BeforeClass
+    public static void setup()
+    {
+        final long seed = System.nanoTime();
+        System.out.println("Using seed: " + seed);
+        random.setSeed(seed);
+    }
+
+    /**
+     * Checks full-resolution summaries of 100 keys for key sizes from 32 to 10000 bytes,
+     * i.e. both below and well above the 64-byte default expected key size.
+     */
+    @Test
+    public void testIndexSummaryKeySizes() throws IOException
+    {
+        testIndexSummaryProperties(32, 100);
+        testIndexSummaryProperties(64, 100);
+        testIndexSummaryProperties(100, 100);
+        testIndexSummaryProperties(1000, 100);
+        testIndexSummaryProperties(10000, 100);
+    }
+
+    /**
+     * Builds an index summary over {@code numKeys} random keys of {@code keySize} bytes with a
+     * min index interval of 1 at BASE_SAMPLING_LEVEL, then checks the summary's interval, entry
+     * count and estimated key count, and that every key can be read back in insertion order.
+     *
+     * @param keySize - the size in bytes of each random key
+     * @param numKeys - the number of keys added to the builder
+     */
+    private void testIndexSummaryProperties(int keySize, int numKeys) throws 
IOException
+    {
+        final int minIndexInterval = 1;
+        final List<DecoratedKey> keys = new ArrayList<>(numKeys);
+
+        try (IndexSummaryBuilder builder = new IndexSummaryBuilder(numKeys, 
minIndexInterval, BASE_SAMPLING_LEVEL))
+        {
+            for (int i = 0; i < numKeys; i++)
+            {
+                byte[] randomBytes = new byte[keySize];
+                random.nextBytes(randomBytes);
+                DecoratedKey key = 
partitioner.decorateKey(ByteBuffer.wrap(randomBytes));
+                keys.add(key);
+                builder.maybeAddEntry(key, i);
+            }
+
+            try(IndexSummary indexSummary = builder.build(partitioner))
+            {
+                assertEquals(numKeys, keys.size());
+                assertEquals(minIndexInterval, 
indexSummary.getMinIndexInterval());
+                assertEquals(numKeys, indexSummary.getMaxNumberOfEntries());
+                assertEquals(numKeys + 1, indexSummary.getEstimatedKeyCount());
+
+                // with interval 1 and full sampling every key is sampled, so entry i must be key i
+                for (int i = 0; i < numKeys; i++)
+                    assertEquals(keys.get(i).getKey(), 
ByteBuffer.wrap(indexSummary.getKey(i)));
+            }
+        }
+    }
+
+    /**
+     * Test an index summary whose total size is bigger than 2GB: the index summary builder
+     * should log an error but still create an index summary, albeit one that does not cover
+     * the entire sstable.
+     *
+     * 1,000,000 keys of 3000 bytes need about 3GB of summary entries. Assuming the 64-byte
+     * default expected key size is not overridden, the constructor estimates only ~72MB, keeps
+     * the requested min index interval of 1, and the builder stops sampling once the next entry
+     * would push the entries buffer past Integer.MAX_VALUE bytes.
+     * NOTE(review): the method name lacks the "t" of "test" (JUnit still runs it via @Test).
+     * NOTE(review): this generates ~3GB of random key bytes and fills an entries buffer of
+     * nearly 2GB, so it is slow and memory hungry for a unit test.
+     */
+    @Test
+    public void tesLargeIndexSummary() throws IOException
+    {
+        final int numKeys = 1000000;
+        final int keySize = 3000;
+        final int minIndexInterval = 1;
+
+        try (IndexSummaryBuilder builder = new IndexSummaryBuilder(numKeys, 
minIndexInterval, BASE_SAMPLING_LEVEL))
+        {
+            for (int i = 0; i < numKeys; i++)
+            {
+                byte[] randomBytes = new byte[keySize];
+                random.nextBytes(randomBytes);
+                DecoratedKey key = 
partitioner.decorateKey(ByteBuffer.wrap(randomBytes));
+                builder.maybeAddEntry(key, i);
+            }
+
+            try (IndexSummary indexSummary = builder.build(partitioner))
+            {
+                assertNotNull(indexSummary);
+                assertEquals(numKeys, indexSummary.getMaxNumberOfEntries());
+                assertEquals(numKeys + 1, indexSummary.getEstimatedKeyCount());
+            }
+        }
+    }
+
+    /**
+     * Test an index summary whose total size is bigger than 2GB, having updated
+     * IndexSummaryBuilder.defaultExpectedKeySize to match the key size: the builder should
+     * then raise the effective min index interval up front instead of truncating the summary.
+     *
+     * Expected values: each entry takes 3000 + 8 = 3008 bytes, so 1,000,000 entries need
+     * 3,008,000,000 bytes > Integer.MAX_VALUE; the effective interval becomes
+     * ceil(3,008,000,000 / Integer.MAX_VALUE) = 2, giving numKeys / 2 summary entries.
+     * NOTE(review): the method name lacks the "t" of "test" (JUnit still runs it via @Test).
+     */
+    @Test
+    public void tesLargeIndexSummaryWithExpectedSizeMatching() throws 
IOException
+    {
+        final int numKeys = 1000000;
+        final int keySize = 3000;
+        final int minIndexInterval = 1;
+
+        // Temporarily size the builder for 3000-byte keys; the old value is restored in finally.
+        long oldExpectedKeySize = IndexSummaryBuilder.defaultExpectedKeySize;
+        IndexSummaryBuilder.defaultExpectedKeySize = 3000;
+
+        try (IndexSummaryBuilder builder = new IndexSummaryBuilder(numKeys, 
minIndexInterval, BASE_SAMPLING_LEVEL))
+        {
+            for (int i = 0; i < numKeys; i++)
+            {
+                byte[] randomBytes = new byte[keySize];
+                random.nextBytes(randomBytes);
+                DecoratedKey key = 
partitioner.decorateKey(ByteBuffer.wrap(randomBytes));
+                builder.maybeAddEntry(key, i);
+            }
+
+            try (IndexSummary indexSummary = builder.build(partitioner))
+            {
+                assertNotNull(indexSummary);
+                assertEquals(minIndexInterval * 2, 
indexSummary.getMinIndexInterval());
+                assertEquals(numKeys / 2, 
indexSummary.getMaxNumberOfEntries());
+                assertEquals(numKeys + 2, indexSummary.getEstimatedKeyCount());
+            }
+        }
+        finally
+        {
+            IndexSummaryBuilder.defaultExpectedKeySize = oldExpectedKeySize;
+        }
+    }
 
     @Test
     public void testGetKey()


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org

Reply via email to