Repository: cassandra
Updated Branches:
  refs/heads/trunk 3a6bcb5ae -> 8e115dac9


Fixes to index summary resampling on old sstable formats

Patch by Tyler Hobbs; reviewed by Benedict Elliot Smith for
CASSANDRA-8993


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/7ff25f0d
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/7ff25f0d
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/7ff25f0d

Branch: refs/heads/trunk
Commit: 7ff25f0df55bf492e741730473b94bcba8ac6c0b
Parents: 93156d7
Author: Tyler Hobbs <[email protected]>
Authored: Thu Mar 26 17:36:19 2015 -0500
Committer: Tyler Hobbs <[email protected]>
Committed: Thu Mar 26 17:36:19 2015 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  3 +
 .../cassandra/io/sstable/Downsampling.java      | 22 ++---
 .../cassandra/io/sstable/IndexSummary.java      |  1 +
 .../io/sstable/IndexSummaryManager.java         | 15 ++-
 .../cassandra/io/sstable/SSTableReader.java     | 96 +++++++++++++++++---
 .../cassandra/io/sstable/IndexSummaryTest.java  |  2 +-
 6 files changed, 108 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/7ff25f0d/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 9bc314d..dba397c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,7 @@
 2.1.4
+ * Avoid overwriting index summaries for sstables with an older format that
+   does not support downsampling; rebuild summaries on startup when this
+   is detected (CASSANDRA-8993)
  * Fix potential data loss in CompressedSequentialWriter (CASSANDRA-8949)
  * Make PasswordAuthenticator number of hashing rounds configurable 
(CASSANDRA-8085)
  * Fix AssertionError when binding nested collections in DELETE 
(CASSANDRA-8900)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7ff25f0d/src/java/org/apache/cassandra/io/sstable/Downsampling.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/Downsampling.java 
b/src/java/org/apache/cassandra/io/sstable/Downsampling.java
index 6842b25..8455d0b 100644
--- a/src/java/org/apache/cassandra/io/sstable/Downsampling.java
+++ b/src/java/org/apache/cassandra/io/sstable/Downsampling.java
@@ -79,8 +79,8 @@ public class Downsampling
      * Returns a list that can be used to translate current index summary 
indexes to their original index before
      * downsampling.  (This repeats every `samplingLevel`, so that's how many 
entries we return.)
      *
-     * For example, if [7, 15] is returned, the current index summary entry at 
index 0 was originally
-     * at index 7, and the current index 1 was originally at index 15.
+     * For example, if [0, 64] is returned, the current index summary entry at 
index 0 was originally
+     * at index 0, and the current index 1 was originally at index 64.
      *
      * @param samplingLevel the current sampling level for the index summary
      *
@@ -115,21 +115,11 @@ public class Downsampling
      */
     public static int getEffectiveIndexIntervalAfterIndex(int index, int 
samplingLevel, int minIndexInterval)
     {
-        assert index >= -1;
-        List<Integer> originalIndexes = getOriginalIndexes(samplingLevel);
-        if (index == -1)
-            return originalIndexes.get(0) * minIndexInterval;
-
+        assert index >= 0;
         index %= samplingLevel;
-        if (index == originalIndexes.size() - 1)
-        {
-            // account for partitions after the "last" entry as well as 
partitions before the "first" entry
-            return ((BASE_SAMPLING_LEVEL - originalIndexes.get(index)) + 
originalIndexes.get(0)) * minIndexInterval;
-        }
-        else
-        {
-            return (originalIndexes.get(index + 1) - 
originalIndexes.get(index)) * minIndexInterval;
-        }
+        List<Integer> originalIndexes = getOriginalIndexes(samplingLevel);
+        int nextEntryOriginalIndex = (index == originalIndexes.size() - 1) ? 
BASE_SAMPLING_LEVEL : originalIndexes.get(index + 1);
+        return (nextEntryOriginalIndex - originalIndexes.get(index)) * 
minIndexInterval;
     }
 
     public static int[] getStartPoints(int currentSamplingLevel, int 
newSamplingLevel)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7ff25f0d/src/java/org/apache/cassandra/io/sstable/IndexSummary.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/IndexSummary.java 
b/src/java/org/apache/cassandra/io/sstable/IndexSummary.java
index bad50b4..0ea0b48 100644
--- a/src/java/org/apache/cassandra/io/sstable/IndexSummary.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexSummary.java
@@ -86,6 +86,7 @@ public class IndexSummary extends WrappedSharedCloseable
         this.offsets = offsets;
         this.entries = entries;
         this.samplingLevel = samplingLevel;
+        assert samplingLevel > 0;
     }
 
     private IndexSummary(IndexSummary copy)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7ff25f0d/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java 
b/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
index 4144c32..0c196ff 100644
--- a/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
@@ -259,6 +259,17 @@ public class IndexSummaryManager implements 
IndexSummaryManagerMBean
         for (SSTableReader sstable : Iterables.concat(compacting, 
nonCompacting))
             total += sstable.getIndexSummaryOffHeapSize();
 
+        // SSTables in the old format can't have their sampling level changed, because that
+        // serialization format doesn't include the sampling level.  Set them aside so they
+        // are left as they are.  (See CASSANDRA-8993 for details.)
+        List<SSTableReader> oldFormatSSTables = new ArrayList<>();
+        for (SSTableReader sstable : nonCompacting)
+        {
+            if (!sstable.descriptor.version.hasSamplingLevel)
+            {
+                // log only for the sstables that are actually excluded from re-sampling
+                logger.trace("SSTable {} cannot be re-sampled due to old sstable format", sstable);
+                oldFormatSSTables.add(sstable);
+            }
+        }
+        nonCompacting.removeAll(oldFormatSSTables);
+
         logger.debug("Beginning redistribution of index summaries for {} 
sstables with memory pool size {} MB; current spaced used is {} MB",
                      nonCompacting.size(), memoryPoolBytes / 1024L / 1024L, 
total / 1024.0 / 1024.0);
 
@@ -280,7 +291,7 @@ public class IndexSummaryManager implements 
IndexSummaryManagerMBean
         Collections.sort(sstablesByHotness, new ReadRateComparator(readRates));
 
         long remainingBytes = memoryPoolBytes;
-        for (SSTableReader sstable : compacting)
+        for (SSTableReader sstable : Iterables.concat(compacting, 
oldFormatSSTables))
             remainingBytes -= sstable.getIndexSummaryOffHeapSize();
 
         logger.trace("Index summaries for compacting SSTables are using {} MB 
of space",
@@ -288,7 +299,7 @@ public class IndexSummaryManager implements 
IndexSummaryManagerMBean
         List<SSTableReader> newSSTables = 
adjustSamplingLevels(sstablesByHotness, totalReadsPerSec, remainingBytes);
 
         total = 0;
-        for (SSTableReader sstable : Iterables.concat(compacting, newSSTables))
+        for (SSTableReader sstable : Iterables.concat(compacting, 
oldFormatSSTables, newSSTables))
             total += sstable.getIndexSummaryOffHeapSize();
         logger.debug("Completed resizing of index summaries; current 
approximate memory used: {} MB",
                      total / 1024.0 / 1024.0);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7ff25f0d/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java 
b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
index f5eef09..8fd7b85 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
@@ -26,16 +26,7 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.EnumSet;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
@@ -708,13 +699,39 @@ public class SSTableReader extends SSTable implements 
SelfRefCounted<SSTableRead
                                          : 
SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode());
 
         boolean summaryLoaded = loadSummary(ibuilder, dbuilder);
+        boolean builtSummary = false;
         if (recreateBloomFilter || !summaryLoaded)
+        {
             buildSummary(recreateBloomFilter, ibuilder, dbuilder, 
summaryLoaded, Downsampling.BASE_SAMPLING_LEVEL);
+            builtSummary = true;
+        }
 
         ifile = 
ibuilder.complete(descriptor.filenameFor(Component.PRIMARY_INDEX));
         dfile = dbuilder.complete(descriptor.filenameFor(Component.DATA));
-        if (saveSummaryIfCreated && (recreateBloomFilter || !summaryLoaded)) 
// save summary information to disk
+
+        // Check for an index summary that was downsampled even though the 
serialization format doesn't support
+        // that.  If it was downsampled, rebuild it.  See CASSANDRA-8993 for 
details.
+        if (!descriptor.version.hasSamplingLevel && !builtSummary && 
!validateSummarySamplingLevel())
+        {
+            indexSummary.close();
+            ifile.close();
+            dfile.close();
+
+            logger.info("Detected erroneously downsampled index summary; will 
rebuild summary at full sampling");
+            FileUtils.deleteWithConfirm(new 
File(descriptor.filenameFor(Component.SUMMARY)));
+            ibuilder = 
SegmentedFile.getBuilder(DatabaseDescriptor.getIndexAccessMode());
+            dbuilder = compression
+                       ? SegmentedFile.getCompressedBuilder()
+                       : 
SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode());
+            buildSummary(false, ibuilder, dbuilder, false, 
Downsampling.BASE_SAMPLING_LEVEL);
+            ifile = 
ibuilder.complete(descriptor.filenameFor(Component.PRIMARY_INDEX));
+            dfile = dbuilder.complete(descriptor.filenameFor(Component.DATA));
+            saveSummary(ibuilder, dbuilder);
+        }
+        else if (saveSummaryIfCreated && builtSummary)
+        {
             saveSummary(ibuilder, dbuilder);
+        }
     }
 
     /**
@@ -800,7 +817,9 @@ public class SSTableReader extends SSTable implements 
SelfRefCounted<SSTableRead
         try
         {
             iStream = new DataInputStream(new FileInputStream(summariesFile));
-            indexSummary = IndexSummary.serializer.deserialize(iStream, 
partitioner, descriptor.version.hasSamplingLevel, 
metadata.getMinIndexInterval(), metadata.getMaxIndexInterval());
+            indexSummary = IndexSummary.serializer.deserialize(
+                    iStream, partitioner, descriptor.version.hasSamplingLevel,
+                    metadata.getMinIndexInterval(), 
metadata.getMaxIndexInterval());
             first = 
partitioner.decorateKey(ByteBufferUtil.readWithLength(iStream));
             last = 
partitioner.decorateKey(ByteBufferUtil.readWithLength(iStream));
             ibuilder.deserializeBounds(iStream);
@@ -826,6 +845,57 @@ public class SSTableReader extends SSTable implements 
SelfRefCounted<SSTableRead
     }
 
     /**
+     * Validates that an index summary has full sampling, as expected when the
+     * serialization format does not support persisting the sampling level.
+     *
+     * Walks the segments returned by ifile.iterator(0), reading index keys in
+     * order.  Every getMinIndexInterval()-th index key is compared with the
+     * summary key at the matching position.  The scan ends at the first
+     * mismatch, once Downsampling.BASE_SAMPLING_LEVEL summary entries have
+     * matched, or when the index segments are exhausted.
+     *
+     * @return true if the summary has full sampling, false otherwise (that is,
+     *         false as soon as a checked summary key differs from the index key)
+     * @throws CorruptSSTableException if reading the index raises an
+     *         IOException; markSuspect() is called first
+     */
+    private boolean validateSummarySamplingLevel()
+    {
+        // We need to check index summary entries against the index to verify 
that none of them were dropped due to
+        // downsampling.  Downsampling can drop any of the first 
BASE_SAMPLING_LEVEL entries (repeating that drop pattern
+        // for the remainder of the summary).  Unfortunately, the first entry 
to be dropped is the entry at
+        // index (BASE_SAMPLING_LEVEL - 1), so we need to check a full set of 
BASE_SAMPLING_LEVEL entries.
+        Iterator<FileDataInput> segments = ifile.iterator(0);
+        int i = 0; // number of index entries read so far, across all segments
+        int summaryEntriesChecked = 0; // summary entries compared and found equal
+        int expectedIndexInterval = getMinIndexInterval();
+        while (segments.hasNext())
+        {
+            FileDataInput in = segments.next();
+            try
+            {
+                while (!in.isEOF())
+                {
+                    ByteBuffer indexKey = 
ByteBufferUtil.readWithShortLength(in);
+                    if (i % expectedIndexInterval == 0) // this index entry should have a summary entry
+                    {
+                        ByteBuffer summaryKey = 
ByteBuffer.wrap(indexSummary.getKey(i / expectedIndexInterval));
+                        if (!summaryKey.equals(indexKey))
+                            return false; // summary entry missing or shifted: not fully sampled
+                        summaryEntriesChecked++;
+
+                        if (summaryEntriesChecked == 
Downsampling.BASE_SAMPLING_LEVEL)
+                            return true; // a full set of entries matched
+                    }
+                    RowIndexEntry.Serializer.skip(in); // presumably steps over the rest of this index entry -- confirm
+                    i++;
+                }
+            }
+            catch (IOException e)
+            {
+                markSuspect();
+                throw new CorruptSSTableException(e, in.getPath());
+            }
+            finally
+            {
+                FileUtils.closeQuietly(in); // runs on the early returns above as well
+            }
+        }
+
+        return true; // index exhausted without finding a mismatch
+    }
+
+    /**
      * Save index summary to Summary.db file.
      *
      * @param ibuilder
@@ -946,6 +1016,8 @@ public class SSTableReader extends SSTable implements 
SelfRefCounted<SSTableRead
      */
     public SSTableReader cloneWithNewSummarySamplingLevel(ColumnFamilyStore 
parent, int samplingLevel) throws IOException
     {
+        assert descriptor.version.hasSamplingLevel;
+
         synchronized (tidy.global)
         {
             assert openReason != OpenReason.EARLY;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7ff25f0d/test/unit/org/apache/cassandra/io/sstable/IndexSummaryTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/IndexSummaryTest.java 
b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryTest.java
index 0760aa3..9ed5b32 100644
--- a/test/unit/org/apache/cassandra/io/sstable/IndexSummaryTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryTest.java
@@ -257,7 +257,7 @@ public class IndexSummaryTest
         assertEquals(128, BASE_SAMPLING_LEVEL);
         assertEquals(Arrays.asList(0, 32, 64, 96), 
Downsampling.getOriginalIndexes(4));
         assertEquals(Arrays.asList(0, 64), Downsampling.getOriginalIndexes(2));
-        assertEquals(Arrays.asList(), Downsampling.getOriginalIndexes(0));
+        assertEquals(Arrays.asList(0), Downsampling.getOriginalIndexes(1));
     }
 
     @Test

Reply via email to