Merge branch 'cassandra-2.1' into cassandra-2.2

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f1667494
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f1667494
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f1667494

Branch: refs/heads/cassandra-3.0
Commit: f16674949592b518ba9da837b70665df10832e9b
Parents: 7dd6b7d fc7075a
Author: Marcus Eriksson <marc...@apache.org>
Authored: Fri Dec 11 17:32:31 2015 +0100
Committer: Marcus Eriksson <marc...@apache.org>
Committed: Fri Dec 11 17:32:31 2015 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../cassandra/db/compaction/CompactionInfo.java |  14 +-
 .../db/compaction/CompactionManager.java        |  14 +
 .../cassandra/db/compaction/OperationType.java  |   3 +-
 .../io/sstable/IndexSummaryManager.java         | 279 +--------------
 .../io/sstable/IndexSummaryRedistribution.java  | 349 +++++++++++++++++++
 .../io/sstable/IndexSummaryManagerTest.java     |  83 ++++-
 7 files changed, 459 insertions(+), 284 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/f1667494/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 30a76a9,2ee8b07..5da0d42
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,31 -1,10 +1,32 @@@
 -2.1.13
 - * Allow cancellation of index summary redistribution (CASSANDRA-8805)
 +2.2.5
 + * Fix regression in split size on CqlInputFormat (CASSANDRA-10835)
 + * Better handling of SSL connection errors inter-node (CASSANDRA-10816)
   * Disable reloading of GossipingPropertyFileSnitch (CASSANDRA-9474)
 + * Verify tables in pseudo-system keyspaces at startup (CASSANDRA-10761)
 +Merged from 2.1:
++ * Allow cancellation of index summary redistribution (CASSANDRA-8805)
   * Fix Stress profile parsing on Windows (CASSANDRA-10808)
  
 -
 -2.1.12
 +2.2.4
 + * Show CQL help in cqlsh in web browser (CASSANDRA-7225)
 + * Serialize on disk the proper SSTable compression ratio (CASSANDRA-10775)
 + * Reject index queries while the index is building (CASSANDRA-8505)
 + * CQL.textile syntax incorrectly includes optional keyspace for aggregate 
SFUNC and FINALFUNC (CASSANDRA-10747)
 + * Fix JSON update with prepared statements (CASSANDRA-10631)
 + * Don't do anticompaction after subrange repair (CASSANDRA-10422)
 + * Fix SimpleDateType type compatibility (CASSANDRA-10027)
 + * (Hadoop) fix splits calculation (CASSANDRA-10640)
 + * (Hadoop) ensure that Cluster instances are always closed (CASSANDRA-10058)
 + * (cqlsh) show partial trace if incomplete after max_trace_wait 
(CASSANDRA-7645)
 + * Use most up-to-date version of schema for system tables (CASSANDRA-10652)
 + * Deprecate memory_allocator in cassandra.yaml (CASSANDRA-10581,10628)
 + * Expose phi values from failure detector via JMX and tweak debug
 +   and trace logging (CASSANDRA-9526)
 + * Fix RangeNamesQueryPager (CASSANDRA-10509)
 + * Deprecate Pig support (CASSANDRA-10542)
 + * Reduce contention getting instances of CompositeType (CASSANDRA-10433)
 + * Fix IllegalArgumentException in DataOutputBuffer.reallocate for large 
buffers (CASSANDRA-10592)
 +Merged from 2.1:
   * Fix incremental repair hang when replica is down (CASSANDRA-10288)
   * Avoid writing range tombstones after END_OF_ROW marker (CASSANDRA-10791)
   * Optimize the way we check if a token is repaired in anticompaction 
(CASSANDRA-10768)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f1667494/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f1667494/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 65f93c0,9bddaf5..ba9c25e
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@@ -1345,7 -1222,21 +1345,21 @@@ public class CompactionManager implemen
          return executor.submit(runnable);
      }
  
+     public List<SSTableReader> 
runIndexSummaryRedistribution(IndexSummaryRedistribution redistribution) throws 
IOException
+     {
+         metrics.beginCompaction(redistribution);
+ 
+         try
+         {
+             return redistribution.redistributeSummaries();
+         }
+         finally
+         {
+             metrics.finishCompaction(redistribution);
+         }
+     }
+ 
 -    static int getDefaultGcBefore(ColumnFamilyStore cfs)
 +    public static int getDefaultGcBefore(ColumnFamilyStore cfs)
      {
          // 2ndary indexes have ExpiringColumns too, so we need to purge 
tombstones deleted before now. We do not need to
          // add any GcGrace however since 2ndary indexes are local to a node.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f1667494/src/java/org/apache/cassandra/db/compaction/OperationType.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/OperationType.java
index a14f13f,475b591..6b66ded
--- a/src/java/org/apache/cassandra/db/compaction/OperationType.java
+++ b/src/java/org/apache/cassandra/db/compaction/OperationType.java
@@@ -32,7 -32,7 +32,8 @@@ public enum OperationTyp
      TOMBSTONE_COMPACTION("Tombstone Compaction"),
      UNKNOWN("Unknown compaction type"),
      ANTICOMPACTION("Anticompaction after repair"),
-     VERIFY("Verify");
++    VERIFY("Verify"),
+     INDEX_SUMMARY("Index summary redistribution");
  
      private final String type;
  

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f1667494/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
index 1dd3a4e,be5cc3c..4438dc1
--- a/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
@@@ -26,19 -30,19 +26,20 @@@ import javax.management.MBeanServer
  import javax.management.ObjectName;
  
  import com.google.common.annotations.VisibleForTesting;
- import com.google.common.collect.*;
- 
- import org.apache.cassandra.io.sstable.format.SSTableReader;
 -import com.google.common.collect.HashMultimap;
 -import com.google.common.collect.Lists;
 -import com.google.common.collect.Multimap;
++import com.google.common.collect.ImmutableSet;
+ import com.google.common.collect.Sets;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  
  import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
  import org.apache.cassandra.config.DatabaseDescriptor;
  import org.apache.cassandra.db.ColumnFamilyStore;
 -import org.apache.cassandra.db.DataTracker;
 +import org.apache.cassandra.db.compaction.OperationType;
  import org.apache.cassandra.db.Keyspace;
 +import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 +import org.apache.cassandra.db.lifecycle.View;
+ import org.apache.cassandra.db.compaction.CompactionManager;
++import org.apache.cassandra.io.sstable.format.SSTableReader;
  import org.apache.cassandra.utils.Pair;
  import org.apache.cassandra.utils.WrappedRunnable;
  
@@@ -57,13 -59,13 +56,6 @@@ public class IndexSummaryManager implem
      private int resizeIntervalInMinutes = 0;
      private long memoryPoolBytes;
  
--    // The target (or ideal) number of index summary entries must differ from 
the actual number of
--    // entries by this ratio in order to trigger an upsample or downsample of 
the summary.  Because
--    // upsampling requires reading the primary index in order to rebuild the 
summary, the threshold
--    // for upsampling is is higher.
--    static final double UPSAMPLE_THRESHOLD = 1.5;
--    static final double DOWNSAMPLE_THESHOLD = 0.75;
--
      private final DebuggableScheduledThreadPoolExecutor executor;
  
      // our next scheduled resizing run
@@@ -251,267 -249,8 +243,8 @@@
       * @return a list of new SSTableReader instances
       */
      @VisibleForTesting
 -    public static List<SSTableReader> 
redistributeSummaries(List<SSTableReader> compacting, List<SSTableReader> 
nonCompacting, long memoryPoolBytes) throws IOException
 +    public static List<SSTableReader> 
redistributeSummaries(List<SSTableReader> compacting, Map<UUID, 
LifecycleTransaction> transactions, long memoryPoolBytes) throws IOException
      {
-         logger.info("Redistributing index summaries");
-         List<SSTableReader> oldFormatSSTables = new ArrayList<>();
-         List<SSTableReader> redistribute = new ArrayList<>();
-         for (LifecycleTransaction txn : transactions.values())
-         {
-             for (SSTableReader sstable : 
ImmutableList.copyOf(txn.originals()))
-             {
-                 // We can't change the sampling level of sstables with the 
old format, because the serialization format
-                 // doesn't include the sampling level.  Leave this one as it 
is.  (See CASSANDRA-8993 for details.)
-                 logger.trace("SSTable {} cannot be re-sampled due to old 
sstable format", sstable);
-                 if (!sstable.descriptor.version.hasSamplingLevel())
-                 {
-                     oldFormatSSTables.add(sstable);
-                     txn.cancel(sstable);
-                 }
-             }
-             redistribute.addAll(txn.originals());
-         }
- 
-         long total = 0;
-         for (SSTableReader sstable : Iterables.concat(compacting, 
redistribute))
-             total += sstable.getIndexSummaryOffHeapSize();
- 
-         logger.trace("Beginning redistribution of index summaries for {} 
sstables with memory pool size {} MB; current spaced used is {} MB",
-                      redistribute.size(), memoryPoolBytes / 1024L / 1024L, 
total / 1024.0 / 1024.0);
- 
-         final Map<SSTableReader, Double> readRates = new 
HashMap<>(redistribute.size());
-         double totalReadsPerSec = 0.0;
-         for (SSTableReader sstable : redistribute)
-         {
-             if (sstable.getReadMeter() != null)
-             {
-                 Double readRate = sstable.getReadMeter().fifteenMinuteRate();
-                 totalReadsPerSec += readRate;
-                 readRates.put(sstable, readRate);
-             }
-         }
-         logger.trace("Total reads/sec across all sstables in index summary 
resize process: {}", totalReadsPerSec);
- 
-         // copy and sort by read rates (ascending)
-         List<SSTableReader> sstablesByHotness = new ArrayList<>(redistribute);
-         Collections.sort(sstablesByHotness, new 
ReadRateComparator(readRates));
- 
-         long remainingBytes = memoryPoolBytes;
-         for (SSTableReader sstable : Iterables.concat(compacting, 
oldFormatSSTables))
-             remainingBytes -= sstable.getIndexSummaryOffHeapSize();
- 
-         logger.trace("Index summaries for compacting SSTables are using {} MB 
of space",
-                      (memoryPoolBytes - remainingBytes) / 1024.0 / 1024.0);
-         List<SSTableReader> newSSTables = 
adjustSamplingLevels(sstablesByHotness, transactions, totalReadsPerSec, 
remainingBytes);
- 
-         for (LifecycleTransaction txn : transactions.values())
-             txn.finish();
- 
-         total = 0;
-         for (SSTableReader sstable : Iterables.concat(compacting, 
oldFormatSSTables, newSSTables))
-             total += sstable.getIndexSummaryOffHeapSize();
-         logger.trace("Completed resizing of index summaries; current 
approximate memory used: {} MB",
-                      total / 1024.0 / 1024.0);
- 
-         return newSSTables;
-     }
- 
-     private static List<SSTableReader> 
adjustSamplingLevels(List<SSTableReader> sstables, Map<UUID, 
LifecycleTransaction> transactions,
-                                                             double 
totalReadsPerSec, long memoryPoolCapacity) throws IOException
-     {
- 
-         List<ResampleEntry> toDownsample = new ArrayList<>(sstables.size() / 
4);
-         List<ResampleEntry> toUpsample = new ArrayList<>(sstables.size() / 4);
-         List<ResampleEntry> forceResample = new ArrayList<>();
-         List<ResampleEntry> forceUpsample = new ArrayList<>();
-         List<SSTableReader> newSSTables = new ArrayList<>(sstables.size());
- 
-         // Going from the coldest to the hottest sstables, try to give each 
sstable an amount of space proportional
-         // to the number of total reads/sec it handles.
-         long remainingSpace = memoryPoolCapacity;
-         for (SSTableReader sstable : sstables)
-         {
-             int minIndexInterval = sstable.metadata.getMinIndexInterval();
-             int maxIndexInterval = sstable.metadata.getMaxIndexInterval();
- 
-             double readsPerSec = sstable.getReadMeter() == null ? 0.0 : 
sstable.getReadMeter().fifteenMinuteRate();
-             long idealSpace = Math.round(remainingSpace * (readsPerSec / 
totalReadsPerSec));
- 
-             // figure out how many entries our idealSpace would buy us, and 
pick a new sampling level based on that
-             int currentNumEntries = sstable.getIndexSummarySize();
-             double avgEntrySize = sstable.getIndexSummaryOffHeapSize() / 
(double) currentNumEntries;
-             long targetNumEntries = Math.max(1, Math.round(idealSpace / 
avgEntrySize));
-             int currentSamplingLevel = sstable.getIndexSummarySamplingLevel();
-             int maxSummarySize = sstable.getMaxIndexSummarySize();
- 
-             // if the min_index_interval changed, calculate what our current 
sampling level would be under the new min
-             if (sstable.getMinIndexInterval() != minIndexInterval)
-             {
-                 int effectiveSamplingLevel = (int) 
Math.round(currentSamplingLevel * (minIndexInterval / (double) 
sstable.getMinIndexInterval()));
-                 maxSummarySize = (int) Math.round(maxSummarySize * 
(sstable.getMinIndexInterval() / (double) minIndexInterval));
-                 logger.trace("min_index_interval changed from {} to {}, so 
the current sampling level for {} is effectively now {} (was {})",
-                              sstable.getMinIndexInterval(), minIndexInterval, 
sstable, effectiveSamplingLevel, currentSamplingLevel);
-                 currentSamplingLevel = effectiveSamplingLevel;
-             }
- 
-             int newSamplingLevel = 
IndexSummaryBuilder.calculateSamplingLevel(currentSamplingLevel, 
currentNumEntries, targetNumEntries,
-                     minIndexInterval, maxIndexInterval);
-             int numEntriesAtNewSamplingLevel = 
IndexSummaryBuilder.entriesAtSamplingLevel(newSamplingLevel, maxSummarySize);
-             double effectiveIndexInterval = 
sstable.getEffectiveIndexInterval();
- 
-             logger.trace("{} has {} reads/sec; ideal space for index summary: 
{} bytes ({} entries); considering moving " +
-                     "from level {} ({} entries, {} bytes) to level {} ({} 
entries, {} bytes)",
-                     sstable.getFilename(), readsPerSec, idealSpace, 
targetNumEntries, currentSamplingLevel, currentNumEntries,
-                     currentNumEntries * avgEntrySize, newSamplingLevel, 
numEntriesAtNewSamplingLevel,
-                     numEntriesAtNewSamplingLevel * avgEntrySize);
- 
-             if (effectiveIndexInterval < minIndexInterval)
-             {
-                 // The min_index_interval was changed; re-sample to match it.
-                 logger.trace("Forcing resample of {} because the current 
index interval ({}) is below min_index_interval ({})",
-                         sstable, effectiveIndexInterval, minIndexInterval);
-                 long spaceUsed = (long) Math.ceil(avgEntrySize * 
numEntriesAtNewSamplingLevel);
-                 forceResample.add(new ResampleEntry(sstable, spaceUsed, 
newSamplingLevel));
-                 remainingSpace -= spaceUsed;
-             }
-             else if (effectiveIndexInterval > maxIndexInterval)
-             {
-                 // The max_index_interval was lowered; force an upsample to 
the effective minimum sampling level
-                 logger.trace("Forcing upsample of {} because the current 
index interval ({}) is above max_index_interval ({})",
-                         sstable, effectiveIndexInterval, maxIndexInterval);
-                 newSamplingLevel = Math.max(1, (BASE_SAMPLING_LEVEL * 
minIndexInterval) / maxIndexInterval);
-                 numEntriesAtNewSamplingLevel = 
IndexSummaryBuilder.entriesAtSamplingLevel(newSamplingLevel, 
sstable.getMaxIndexSummarySize());
-                 long spaceUsed = (long) Math.ceil(avgEntrySize * 
numEntriesAtNewSamplingLevel);
-                 forceUpsample.add(new ResampleEntry(sstable, spaceUsed, 
newSamplingLevel));
-                 remainingSpace -= avgEntrySize * numEntriesAtNewSamplingLevel;
-             }
-             else if (targetNumEntries >= currentNumEntries * 
UPSAMPLE_THRESHOLD && newSamplingLevel > currentSamplingLevel)
-             {
-                 long spaceUsed = (long) Math.ceil(avgEntrySize * 
numEntriesAtNewSamplingLevel);
-                 toUpsample.add(new ResampleEntry(sstable, spaceUsed, 
newSamplingLevel));
-                 remainingSpace -= avgEntrySize * numEntriesAtNewSamplingLevel;
-             }
-             else if (targetNumEntries < currentNumEntries * 
DOWNSAMPLE_THESHOLD && newSamplingLevel < currentSamplingLevel)
-             {
-                 long spaceUsed = (long) Math.ceil(avgEntrySize * 
numEntriesAtNewSamplingLevel);
-                 toDownsample.add(new ResampleEntry(sstable, spaceUsed, 
newSamplingLevel));
-                 remainingSpace -= spaceUsed;
-             }
-             else
-             {
-                 // keep the same sampling level
-                 logger.trace("SSTable {} is within thresholds of ideal 
sampling", sstable);
-                 remainingSpace -= sstable.getIndexSummaryOffHeapSize();
-                 newSSTables.add(sstable);
-                 transactions.get(sstable.metadata.cfId).cancel(sstable);
-             }
-             totalReadsPerSec -= readsPerSec;
-         }
- 
-         if (remainingSpace > 0)
-         {
-             Pair<List<SSTableReader>, List<ResampleEntry>> result = 
distributeRemainingSpace(toDownsample, remainingSpace);
-             toDownsample = result.right;
-             newSSTables.addAll(result.left);
-             for (SSTableReader sstable : result.left)
-                 transactions.get(sstable.metadata.cfId).cancel(sstable);
-         }
- 
-         // downsample first, then upsample
-         toDownsample.addAll(forceResample);
-         toDownsample.addAll(toUpsample);
-         toDownsample.addAll(forceUpsample);
-         for (ResampleEntry entry : toDownsample)
-         {
-             SSTableReader sstable = entry.sstable;
-             logger.trace("Re-sampling index summary for {} from {}/{} to 
{}/{} of the original number of entries",
-                          sstable, sstable.getIndexSummarySamplingLevel(), 
Downsampling.BASE_SAMPLING_LEVEL,
-                          entry.newSamplingLevel, 
Downsampling.BASE_SAMPLING_LEVEL);
-             ColumnFamilyStore cfs = 
Keyspace.open(sstable.metadata.ksName).getColumnFamilyStore(sstable.metadata.cfId);
-             SSTableReader replacement = 
sstable.cloneWithNewSummarySamplingLevel(cfs, entry.newSamplingLevel);
-             newSSTables.add(replacement);
-             transactions.get(sstable.metadata.cfId).update(replacement, true);
-         }
- 
-         return newSSTables;
-     }
- 
-     @VisibleForTesting
-     static Pair<List<SSTableReader>, List<ResampleEntry>> 
distributeRemainingSpace(List<ResampleEntry> toDownsample, long remainingSpace)
-     {
-         // sort by the amount of space regained by doing the downsample 
operation; we want to try to avoid operations
-         // that will make little difference.
-         Collections.sort(toDownsample, new Comparator<ResampleEntry>()
-         {
-             public int compare(ResampleEntry o1, ResampleEntry o2)
-             {
-                 return Double.compare(o1.sstable.getIndexSummaryOffHeapSize() 
- o1.newSpaceUsed,
-                                       o2.sstable.getIndexSummaryOffHeapSize() 
- o2.newSpaceUsed);
-             }
-         });
- 
-         int noDownsampleCutoff = 0;
-         List<SSTableReader> willNotDownsample = new ArrayList<>();
-         while (remainingSpace > 0 && noDownsampleCutoff < toDownsample.size())
-         {
-             ResampleEntry entry = toDownsample.get(noDownsampleCutoff);
- 
-             long extraSpaceRequired = 
entry.sstable.getIndexSummaryOffHeapSize() - entry.newSpaceUsed;
-             // see if we have enough leftover space to keep the current 
sampling level
-             if (extraSpaceRequired <= remainingSpace)
-             {
-                 logger.trace("Using leftover space to keep {} at the current 
sampling level ({})",
-                              entry.sstable, 
entry.sstable.getIndexSummarySamplingLevel());
-                 willNotDownsample.add(entry.sstable);
-                 remainingSpace -= extraSpaceRequired;
-             }
-             else
-             {
-                 break;
-             }
- 
-             noDownsampleCutoff++;
-         }
-         return Pair.create(willNotDownsample, 
toDownsample.subList(noDownsampleCutoff, toDownsample.size()));
-     }
- 
-     private static class ResampleEntry
-     {
-         public final SSTableReader sstable;
-         public final long newSpaceUsed;
-         public final int newSamplingLevel;
- 
-         public ResampleEntry(SSTableReader sstable, long newSpaceUsed, int 
newSamplingLevel)
-         {
-             this.sstable = sstable;
-             this.newSpaceUsed = newSpaceUsed;
-             this.newSamplingLevel = newSamplingLevel;
-         }
-     }
- 
-     /** Utility class for sorting sstables by their read rates. */
-     private static class ReadRateComparator implements 
Comparator<SSTableReader>
-     {
-         private final Map<SSTableReader, Double> readRates;
- 
-         public ReadRateComparator(Map<SSTableReader, Double> readRates)
-         {
-             this.readRates = readRates;
-         }
- 
-         @Override
-         public int compare(SSTableReader o1, SSTableReader o2)
-         {
-             Double readRate1 = readRates.get(o1);
-             Double readRate2 = readRates.get(o2);
-             if (readRate1 == null && readRate2 == null)
-                 return 0;
-             else if (readRate1 == null)
-                 return -1;
-             else if (readRate2 == null)
-                 return 1;
-             else
-                 return Double.compare(readRate1, readRate2);
-         }
 -        return CompactionManager.instance.runIndexSummaryRedistribution(new 
IndexSummaryRedistribution(compacting, nonCompacting, memoryPoolBytes));
++        return CompactionManager.instance.runIndexSummaryRedistribution(new 
IndexSummaryRedistribution(compacting, transactions, memoryPoolBytes));
      }
- }
+ }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f1667494/src/java/org/apache/cassandra/io/sstable/IndexSummaryRedistribution.java
----------------------------------------------------------------------
diff --cc 
src/java/org/apache/cassandra/io/sstable/IndexSummaryRedistribution.java
index 0000000,adb3e4e..aad479b
mode 000000,100644..100644
--- a/src/java/org/apache/cassandra/io/sstable/IndexSummaryRedistribution.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexSummaryRedistribution.java
@@@ -1,0 -1,338 +1,349 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.cassandra.io.sstable;
+ 
+ import java.io.IOException;
+ import java.util.ArrayList;
+ import java.util.Collections;
+ import java.util.Comparator;
+ import java.util.HashMap;
+ import java.util.List;
+ import java.util.Map;
++import java.util.UUID;
+ 
+ import com.google.common.annotations.VisibleForTesting;
 -import com.google.common.collect.HashMultimap;
++import com.google.common.collect.ImmutableList;
+ import com.google.common.collect.Iterables;
 -import com.google.common.collect.Multimap;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+ 
+ import org.apache.cassandra.db.ColumnFamilyStore;
 -import org.apache.cassandra.db.DataTracker;
+ import org.apache.cassandra.db.Keyspace;
+ import org.apache.cassandra.db.compaction.CompactionInfo;
+ import org.apache.cassandra.db.compaction.CompactionInterruptedException;
+ import org.apache.cassandra.db.compaction.OperationType;
++import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
++import org.apache.cassandra.io.sstable.format.SSTableReader;
+ import org.apache.cassandra.utils.Pair;
+ 
+ import static 
org.apache.cassandra.io.sstable.Downsampling.BASE_SAMPLING_LEVEL;
+ 
+ public class IndexSummaryRedistribution extends CompactionInfo.Holder
+ {
+     private static final Logger logger = 
LoggerFactory.getLogger(IndexSummaryRedistribution.class);
+ 
++    // The target (or ideal) number of index summary entries must differ from 
the actual number of
++    // entries by this ratio in order to trigger an upsample or downsample of 
the summary.  Because
++    // upsampling requires reading the primary index in order to rebuild the 
summary, the threshold
++    // for upsampling is higher.
++    static final double UPSAMPLE_THRESHOLD = 1.5;
++    static final double DOWNSAMPLE_THESHOLD = 0.75;
++
+     private final List<SSTableReader> compacting;
 -    private final List<SSTableReader> nonCompacting;
++    private final Map<UUID, LifecycleTransaction> transactions;
+     private final long memoryPoolBytes;
++    private final UUID compactionId;
+     private volatile long remainingSpace;
+ 
 -    public IndexSummaryRedistribution(List<SSTableReader> compacting, 
List<SSTableReader> nonCompacting, long memoryPoolBytes)
++    public IndexSummaryRedistribution(List<SSTableReader> compacting, 
Map<UUID, LifecycleTransaction> transactions, long memoryPoolBytes)
+     {
+         this.compacting = compacting;
 -        this.nonCompacting = nonCompacting;
++        this.transactions = transactions;
+         this.memoryPoolBytes = memoryPoolBytes;
++        this.compactionId = UUID.randomUUID();
+     }
+ 
+     public List<SSTableReader> redistributeSummaries() throws IOException
+     {
 -        long total = 0;
 -        for (SSTableReader sstable : Iterables.concat(compacting, 
nonCompacting))
 -            total += sstable.getIndexSummaryOffHeapSize();
 -
++        logger.info("Redistributing index summaries");
+         List<SSTableReader> oldFormatSSTables = new ArrayList<>();
 -        for (SSTableReader sstable : nonCompacting)
++        List<SSTableReader> redistribute = new ArrayList<>();
++        for (LifecycleTransaction txn : transactions.values())
+         {
 -            // We can't change the sampling level of sstables with the old 
format, because the serialization format
 -            // doesn't include the sampling level.  Leave this one as it is.  
(See CASSANDRA-8993 for details.)
 -            logger.trace("SSTable {} cannot be re-sampled due to old sstable 
format", sstable);
 -            if (!sstable.descriptor.version.hasSamplingLevel)
 -                oldFormatSSTables.add(sstable);
++            for (SSTableReader sstable : 
ImmutableList.copyOf(txn.originals()))
++            {
++                // We can't change the sampling level of sstables with the 
old format, because the serialization format
++                // doesn't include the sampling level.  Leave this one as it 
is.  (See CASSANDRA-8993 for details.)
++                logger.trace("SSTable {} cannot be re-sampled due to old 
sstable format", sstable);
++                if (!sstable.descriptor.version.hasSamplingLevel())
++                {
++                    oldFormatSSTables.add(sstable);
++                    txn.cancel(sstable);
++                }
++            }
++            redistribute.addAll(txn.originals());
+         }
 -        nonCompacting.removeAll(oldFormatSSTables);
+ 
 -        logger.debug("Beginning redistribution of index summaries for {} 
sstables with memory pool size {} MB; current spaced used is {} MB",
 -                     nonCompacting.size(), memoryPoolBytes / 1024L / 1024L, 
total / 1024.0 / 1024.0);
++        long total = 0;
++        for (SSTableReader sstable : Iterables.concat(compacting, 
redistribute))
++            total += sstable.getIndexSummaryOffHeapSize();
+ 
 -        final Map<SSTableReader, Double> readRates = new 
HashMap<>(nonCompacting.size());
++        logger.trace("Beginning redistribution of index summaries for {} 
sstables with memory pool size {} MB; current spaced used is {} MB",
++                     redistribute.size(), memoryPoolBytes / 1024L / 1024L, 
total / 1024.0 / 1024.0);
++
++        final Map<SSTableReader, Double> readRates = new 
HashMap<>(redistribute.size());
+         double totalReadsPerSec = 0.0;
 -        for (SSTableReader sstable : nonCompacting)
++        for (SSTableReader sstable : redistribute)
+         {
+             if (isStopRequested())
+                 throw new CompactionInterruptedException(getCompactionInfo());
+ 
+             if (sstable.getReadMeter() != null)
+             {
+                 Double readRate = sstable.getReadMeter().fifteenMinuteRate();
+                 totalReadsPerSec += readRate;
+                 readRates.put(sstable, readRate);
+             }
+         }
+         logger.trace("Total reads/sec across all sstables in index summary 
resize process: {}", totalReadsPerSec);
+ 
+         // copy and sort by read rates (ascending)
 -        List<SSTableReader> sstablesByHotness = new 
ArrayList<>(nonCompacting);
++        List<SSTableReader> sstablesByHotness = new ArrayList<>(redistribute);
+         Collections.sort(sstablesByHotness, new 
ReadRateComparator(readRates));
+ 
+         long remainingBytes = memoryPoolBytes;
+         for (SSTableReader sstable : Iterables.concat(compacting, 
oldFormatSSTables))
+             remainingBytes -= sstable.getIndexSummaryOffHeapSize();
+ 
+         logger.trace("Index summaries for compacting SSTables are using {} MB 
of space",
+                      (memoryPoolBytes - remainingBytes) / 1024.0 / 1024.0);
 -        List<SSTableReader> newSSTables = 
adjustSamplingLevels(sstablesByHotness, totalReadsPerSec, remainingBytes);
++        List<SSTableReader> newSSTables = 
adjustSamplingLevels(sstablesByHotness, transactions, totalReadsPerSec, 
remainingBytes);
++
++        for (LifecycleTransaction txn : transactions.values())
++            txn.finish();
+ 
+         total = 0;
+         for (SSTableReader sstable : Iterables.concat(compacting, oldFormatSSTables, newSSTables))
+             total += sstable.getIndexSummaryOffHeapSize();
 -        logger.debug("Completed resizing of index summaries; current approximate memory used: {} MB",
++        logger.trace("Completed resizing of index summaries; current approximate memory used: {} MB",
+                      total / 1024.0 / 1024.0);
+ 
+         return newSSTables;
+     }
+ 
+     private List<SSTableReader> adjustSamplingLevels(List<SSTableReader> sstables,
++                                                     Map<UUID, LifecycleTransaction> transactions,
+                                                      double totalReadsPerSec, long memoryPoolCapacity) throws IOException
+     {
 -
+         List<ResampleEntry> toDownsample = new ArrayList<>(sstables.size() / 4);
+         List<ResampleEntry> toUpsample = new ArrayList<>(sstables.size() / 4);
+         List<ResampleEntry> forceResample = new ArrayList<>();
+         List<ResampleEntry> forceUpsample = new ArrayList<>();
+         List<SSTableReader> newSSTables = new ArrayList<>(sstables.size());
+ 
+         // Going from the coldest to the hottest sstables, try to give each sstable an amount of space proportional
+         // to the number of total reads/sec it handles.
+         remainingSpace = memoryPoolCapacity;
+         for (SSTableReader sstable : sstables)
+         {
+             if (isStopRequested())
+                 throw new CompactionInterruptedException(getCompactionInfo());
+ 
+             int minIndexInterval = sstable.metadata.getMinIndexInterval();
+             int maxIndexInterval = sstable.metadata.getMaxIndexInterval();
+ 
+             double readsPerSec = sstable.getReadMeter() == null ? 0.0 : sstable.getReadMeter().fifteenMinuteRate();
+             long idealSpace = Math.round(remainingSpace * (readsPerSec / totalReadsPerSec));
+ 
+             // figure out how many entries our idealSpace would buy us, and pick a new sampling level based on that
+             int currentNumEntries = sstable.getIndexSummarySize();
+             double avgEntrySize = sstable.getIndexSummaryOffHeapSize() / (double) currentNumEntries;
+             long targetNumEntries = Math.max(1, Math.round(idealSpace / avgEntrySize));
+             int currentSamplingLevel = sstable.getIndexSummarySamplingLevel();
+             int maxSummarySize = sstable.getMaxIndexSummarySize();
+ 
+             // if the min_index_interval changed, calculate what our current sampling level would be under the new min
+             if (sstable.getMinIndexInterval() != minIndexInterval)
+             {
+                 int effectiveSamplingLevel = (int) Math.round(currentSamplingLevel * (minIndexInterval / (double) sstable.getMinIndexInterval()));
+                 maxSummarySize = (int) Math.round(maxSummarySize * (sstable.getMinIndexInterval() / (double) minIndexInterval));
+                 logger.trace("min_index_interval changed from {} to {}, so the current sampling level for {} is effectively now {} (was {})",
+                              sstable.getMinIndexInterval(), minIndexInterval, sstable, effectiveSamplingLevel, currentSamplingLevel);
+                 currentSamplingLevel = effectiveSamplingLevel;
+             }
+ 
+             int newSamplingLevel = IndexSummaryBuilder.calculateSamplingLevel(currentSamplingLevel, currentNumEntries, targetNumEntries,
 -                                                                              minIndexInterval, maxIndexInterval);
++                    minIndexInterval, maxIndexInterval);
+             int numEntriesAtNewSamplingLevel = IndexSummaryBuilder.entriesAtSamplingLevel(newSamplingLevel, maxSummarySize);
+             double effectiveIndexInterval = sstable.getEffectiveIndexInterval();
+ 
+             logger.trace("{} has {} reads/sec; ideal space for index summary: {} bytes ({} entries); considering moving " +
 -                         "from level {} ({} entries, {} bytes) to level {} ({} entries, {} bytes)",
 -                         sstable.getFilename(), readsPerSec, idealSpace, targetNumEntries, currentSamplingLevel, currentNumEntries,
 -                         currentNumEntries * avgEntrySize, newSamplingLevel, numEntriesAtNewSamplingLevel,
 -                         numEntriesAtNewSamplingLevel * avgEntrySize);
++                    "from level {} ({} entries, {} bytes) to level {} ({} entries, {} bytes)",
++                    sstable.getFilename(), readsPerSec, idealSpace, targetNumEntries, currentSamplingLevel, currentNumEntries,
++                    currentNumEntries * avgEntrySize, newSamplingLevel, numEntriesAtNewSamplingLevel,
++                    numEntriesAtNewSamplingLevel * avgEntrySize);
+ 
+             if (effectiveIndexInterval < minIndexInterval)
+             {
+                 // The min_index_interval was changed; re-sample to match it.
 -                logger.debug("Forcing resample of {} because the current index interval ({}) is below min_index_interval ({})",
 -                             sstable, effectiveIndexInterval, minIndexInterval);
++                logger.trace("Forcing resample of {} because the current index interval ({}) is below min_index_interval ({})",
++                        sstable, effectiveIndexInterval, minIndexInterval);
+                 long spaceUsed = (long) Math.ceil(avgEntrySize * numEntriesAtNewSamplingLevel);
+                 forceResample.add(new ResampleEntry(sstable, spaceUsed, newSamplingLevel));
+                 remainingSpace -= spaceUsed;
+             }
+             else if (effectiveIndexInterval > maxIndexInterval)
+             {
+                 // The max_index_interval was lowered; force an upsample to the effective minimum sampling level
 -                logger.debug("Forcing upsample of {} because the current index interval ({}) is above max_index_interval ({})",
 -                             sstable, effectiveIndexInterval, maxIndexInterval);
++                logger.trace("Forcing upsample of {} because the current index interval ({}) is above max_index_interval ({})",
++                        sstable, effectiveIndexInterval, maxIndexInterval);
+                 newSamplingLevel = Math.max(1, (BASE_SAMPLING_LEVEL * minIndexInterval) / maxIndexInterval);
+                 numEntriesAtNewSamplingLevel = IndexSummaryBuilder.entriesAtSamplingLevel(newSamplingLevel, sstable.getMaxIndexSummarySize());
+                 long spaceUsed = (long) Math.ceil(avgEntrySize * numEntriesAtNewSamplingLevel);
+                 forceUpsample.add(new ResampleEntry(sstable, spaceUsed, newSamplingLevel));
+                 remainingSpace -= avgEntrySize * numEntriesAtNewSamplingLevel;
+             }
 -            else if (targetNumEntries >= currentNumEntries * IndexSummaryManager.UPSAMPLE_THRESHOLD && newSamplingLevel > currentSamplingLevel)
++            else if (targetNumEntries >= currentNumEntries * UPSAMPLE_THRESHOLD && newSamplingLevel > currentSamplingLevel)
+             {
+                 long spaceUsed = (long) Math.ceil(avgEntrySize * numEntriesAtNewSamplingLevel);
+                 toUpsample.add(new ResampleEntry(sstable, spaceUsed, newSamplingLevel));
+                 remainingSpace -= avgEntrySize * numEntriesAtNewSamplingLevel;
+             }
 -            else if (targetNumEntries < currentNumEntries * IndexSummaryManager.DOWNSAMPLE_THESHOLD && newSamplingLevel < currentSamplingLevel)
++            else if (targetNumEntries < currentNumEntries * DOWNSAMPLE_THESHOLD && newSamplingLevel < currentSamplingLevel)
+             {
+                 long spaceUsed = (long) Math.ceil(avgEntrySize * numEntriesAtNewSamplingLevel);
+                 toDownsample.add(new ResampleEntry(sstable, spaceUsed, newSamplingLevel));
+                 remainingSpace -= spaceUsed;
+             }
+             else
+             {
+                 // keep the same sampling level
+                 logger.trace("SSTable {} is within thresholds of ideal sampling", sstable);
+                 remainingSpace -= sstable.getIndexSummaryOffHeapSize();
+                 newSSTables.add(sstable);
++                transactions.get(sstable.metadata.cfId).cancel(sstable);
+             }
+             totalReadsPerSec -= readsPerSec;
+         }
+ 
+         if (remainingSpace > 0)
+         {
+             Pair<List<SSTableReader>, List<ResampleEntry>> result = distributeRemainingSpace(toDownsample, remainingSpace);
+             toDownsample = result.right;
+             newSSTables.addAll(result.left);
++            for (SSTableReader sstable : result.left)
++                transactions.get(sstable.metadata.cfId).cancel(sstable);
+         }
+ 
+         // downsample first, then upsample
+         toDownsample.addAll(forceResample);
+         toDownsample.addAll(toUpsample);
+         toDownsample.addAll(forceUpsample);
 -        Multimap<DataTracker, SSTableReader> replacedByTracker = HashMultimap.create();
 -        Multimap<DataTracker, SSTableReader> replacementsByTracker = HashMultimap.create();
 -
 -        try
++        for (ResampleEntry entry : toDownsample)
+         {
 -            for (ResampleEntry entry : toDownsample)
 -            {
 -                if (isStopRequested())
 -                    throw new CompactionInterruptedException(getCompactionInfo());
 -
 -                SSTableReader sstable = entry.sstable;
 -                logger.debug("Re-sampling index summary for {} from {}/{} to {}/{} of the original number of entries",
 -                             sstable, sstable.getIndexSummarySamplingLevel(), Downsampling.BASE_SAMPLING_LEVEL,
 -                             entry.newSamplingLevel, Downsampling.BASE_SAMPLING_LEVEL);
 -                ColumnFamilyStore cfs = Keyspace.open(sstable.getKeyspaceName()).getColumnFamilyStore(sstable.getColumnFamilyName());
 -                DataTracker tracker = cfs.getDataTracker();
 -                SSTableReader replacement = sstable.cloneWithNewSummarySamplingLevel(cfs, entry.newSamplingLevel);
 -                newSSTables.add(replacement);
 -                replacedByTracker.put(tracker, sstable);
 -                replacementsByTracker.put(tracker, replacement);
 -            }
 -        }
 -        finally
 -        {
 -            for (DataTracker tracker : replacedByTracker.keySet())
 -                tracker.replaceWithNewInstances(replacedByTracker.get(tracker), replacementsByTracker.get(tracker));
++            if (isStopRequested())
++                throw new CompactionInterruptedException(getCompactionInfo());
++
++            SSTableReader sstable = entry.sstable;
++            logger.trace("Re-sampling index summary for {} from {}/{} to {}/{} of the original number of entries",
++                         sstable, sstable.getIndexSummarySamplingLevel(), Downsampling.BASE_SAMPLING_LEVEL,
++                         entry.newSamplingLevel, Downsampling.BASE_SAMPLING_LEVEL);
++            ColumnFamilyStore cfs = Keyspace.open(sstable.metadata.ksName).getColumnFamilyStore(sstable.metadata.cfId);
++            SSTableReader replacement = sstable.cloneWithNewSummarySamplingLevel(cfs, entry.newSamplingLevel);
++            newSSTables.add(replacement);
++            transactions.get(sstable.metadata.cfId).update(replacement, true);
+         }
+ 
+         return newSSTables;
+     }
+ 
+     @VisibleForTesting
+     static Pair<List<SSTableReader>, List<ResampleEntry>> distributeRemainingSpace(List<ResampleEntry> toDownsample, long remainingSpace)
+     {
+         // sort by the amount of space regained by doing the downsample operation; we want to try to avoid operations
+         // that will make little difference.
+         Collections.sort(toDownsample, new Comparator<ResampleEntry>()
+         {
+             public int compare(ResampleEntry o1, ResampleEntry o2)
+             {
+                 return Double.compare(o1.sstable.getIndexSummaryOffHeapSize() - o1.newSpaceUsed,
+                                       o2.sstable.getIndexSummaryOffHeapSize() - o2.newSpaceUsed);
+             }
+         });
+ 
+         int noDownsampleCutoff = 0;
+         List<SSTableReader> willNotDownsample = new ArrayList<>();
+         while (remainingSpace > 0 && noDownsampleCutoff < toDownsample.size())
+         {
+             ResampleEntry entry = toDownsample.get(noDownsampleCutoff);
+ 
+             long extraSpaceRequired = entry.sstable.getIndexSummaryOffHeapSize() - entry.newSpaceUsed;
+             // see if we have enough leftover space to keep the current sampling level
+             if (extraSpaceRequired <= remainingSpace)
+             {
+                 logger.trace("Using leftover space to keep {} at the current sampling level ({})",
+                              entry.sstable, entry.sstable.getIndexSummarySamplingLevel());
+                 willNotDownsample.add(entry.sstable);
+                 remainingSpace -= extraSpaceRequired;
+             }
+             else
+             {
+                 break;
+             }
+ 
+             noDownsampleCutoff++;
+         }
+         return Pair.create(willNotDownsample, toDownsample.subList(noDownsampleCutoff, toDownsample.size()));
+     }
+ 
+     public CompactionInfo getCompactionInfo()
+     {
 -        return new CompactionInfo(OperationType.INDEX_SUMMARY, (remainingSpace - memoryPoolBytes), memoryPoolBytes, "bytes");
++        return new CompactionInfo(OperationType.INDEX_SUMMARY, (memoryPoolBytes - remainingSpace), memoryPoolBytes, "bytes", compactionId);
+     }
+ 
+     /** Utility class for sorting sstables by their read rates. */
+     private static class ReadRateComparator implements Comparator<SSTableReader>
+     {
+         private final Map<SSTableReader, Double> readRates;
+ 
+         ReadRateComparator(Map<SSTableReader, Double> readRates)
+         {
+             this.readRates = readRates;
+         }
+ 
+         @Override
+         public int compare(SSTableReader o1, SSTableReader o2)
+         {
+             Double readRate1 = readRates.get(o1);
+             Double readRate2 = readRates.get(o2);
+             if (readRate1 == null && readRate2 == null)
+                 return 0;
+             else if (readRate1 == null)
+                 return -1;
+             else if (readRate2 == null)
+                 return 1;
+             else
+                 return Double.compare(readRate1, readRate2);
+         }
+     }
+ 
+     private static class ResampleEntry
+     {
+         public final SSTableReader sstable;
+         public final long newSpaceUsed;
+         public final int newSamplingLevel;
+ 
+         ResampleEntry(SSTableReader sstable, long newSpaceUsed, int newSamplingLevel)
+         {
+             this.sstable = sstable;
+             this.newSpaceUsed = newSpaceUsed;
+             this.newSamplingLevel = newSamplingLevel;
+         }
+     }
+ }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f1667494/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
index 5e46b8e,63928e2..6935680
--- a/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
@@@ -21,38 -21,33 +21,41 @@@ import java.io.IOException
  import java.nio.ByteBuffer;
  import java.util.*;
  import java.util.concurrent.*;
--import java.util.concurrent.atomic.AtomicBoolean;
+ import java.util.concurrent.atomic.AtomicReference;
  
- import org.apache.cassandra.io.sstable.format.SSTableReader;
+ import com.google.common.base.Joiner;
+ import com.google.common.collect.Sets;
  import org.junit.After;
  import org.junit.Before;
 +import org.junit.BeforeClass;
  import org.junit.Test;
  import org.junit.runner.RunWith;
- 
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  
--import junit.framework.Assert;
  import org.apache.cassandra.OrderedJUnit4ClassRunner;
  import org.apache.cassandra.SchemaLoader;
  import org.apache.cassandra.Util;
 +import org.apache.cassandra.cache.CachingOptions;
 +import org.apache.cassandra.config.KSMetaData;
  import org.apache.cassandra.db.*;
+ import org.apache.cassandra.db.compaction.CompactionInfo;
+ import org.apache.cassandra.db.compaction.CompactionInterruptedException;
  import org.apache.cassandra.db.compaction.CompactionManager;
 +import org.apache.cassandra.db.compaction.OperationType;
  import org.apache.cassandra.db.filter.QueryFilter;
++import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 +import org.apache.cassandra.exceptions.ConfigurationException;
++import org.apache.cassandra.io.sstable.format.SSTableReader;
 +import org.apache.cassandra.locator.SimpleStrategy;
- import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+ import org.apache.cassandra.metrics.CompactionMetrics;
  import org.apache.cassandra.metrics.RestorableMeter;
  
 +import static com.google.common.collect.ImmutableMap.of;
 +import static java.util.Arrays.asList;
  import static org.apache.cassandra.io.sstable.Downsampling.BASE_SAMPLING_LEVEL;
--import static org.apache.cassandra.io.sstable.IndexSummaryManager.DOWNSAMPLE_THESHOLD;
--import static org.apache.cassandra.io.sstable.IndexSummaryManager.UPSAMPLE_THRESHOLD;
++import static org.apache.cassandra.io.sstable.IndexSummaryRedistribution.DOWNSAMPLE_THESHOLD;
++import static org.apache.cassandra.io.sstable.IndexSummaryRedistribution.UPSAMPLE_THRESHOLD;
  import static org.apache.cassandra.io.sstable.IndexSummaryManager.redistributeSummaries;
  import static org.junit.Assert.assertEquals;
  import static org.junit.Assert.assertNotNull;
@@@ -106,8 -78,13 +109,13 @@@ public class IndexSummaryManagerTes
      @After
      public void afterTest()
      {
+         for (CompactionInfo.Holder holder: CompactionMetrics.getCompactions())
+         {
+             holder.stop();
+         }
+ 
 -        String ksname = "Keyspace1";
 -        String cfname = "StandardLowIndexInterval"; // index interval of 8, no key caching
 +        String ksname = KEYSPACE1;
 +        String cfname = CF_STANDARDLOWiINTERVAL; // index interval of 8, no key caching
          Keyspace keyspace = Keyspace.open(ksname);
          ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cfname);
          cfs.metadata.minIndexInterval(originalMinIndexInterval);
@@@ -581,4 -507,59 +589,65 @@@
                  assertTrue(entry.getValue() >= cfs.metadata.getMinIndexInterval());
          }
      }
+ 
+     @Test
+     public void testCancelIndex() throws Exception
+     {
 -        String ksname = "Keyspace1";
 -        String cfname = "StandardLowIndexInterval"; // index interval of 8, no key caching
++        String ksname = KEYSPACE1;
++        String cfname = CF_STANDARDLOWiINTERVAL; // index interval of 8, no key caching
+         Keyspace keyspace = Keyspace.open(ksname);
 -        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cfname);
++        final ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cfname);
+         final int numSSTables = 4;
++        final int numTries = 4;
+         int numRows = 256;
+         createSSTables(ksname, cfname, numSSTables, numRows);
+ 
+         final List<SSTableReader> sstables = new ArrayList<>(cfs.getSSTables());
+         for (SSTableReader sstable : sstables)
+             sstable.overrideReadMeter(new RestorableMeter(100.0, 100.0));
+ 
+         final long singleSummaryOffHeapSpace = sstables.get(0).getIndexSummaryOffHeapSize();
+ 
+         // everything should get cut in half
+         final AtomicReference<CompactionInterruptedException> exception = new AtomicReference<>();
++
+         Thread t = new Thread(new Runnable()
+         {
+             public void run()
+             {
+                 try
+                 {
 -                    redistributeSummaries(Collections.<SSTableReader>emptyList(), sstables, (singleSummaryOffHeapSpace * (numSSTables / 2)));
++                    // Don't leave enough space for even the minimal index summaries
++                    try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN))
++                    {
++                        redistributeSummaries(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), singleSummaryOffHeapSpace);
++                    }
+                 }
+                 catch (CompactionInterruptedException ex)
+                 {
+                     exception.set(ex);
+                 }
+                 catch (IOException ignored)
+                 {
+                 }
+             }
+         });
+         t.start();
+         while (CompactionManager.instance.getActiveCompactions() == 0 && t.isAlive())
 -            Thread.sleep(1);
++            Thread.yield();
+         CompactionManager.instance.stopCompaction("INDEX_SUMMARY");
+         t.join();
+ 
+         assertNotNull("Expected compaction interrupted exception", exception.get());
+         assertTrue("Expected no active compactions", CompactionMetrics.getCompactions().isEmpty());
+ 
+         Set<SSTableReader> beforeRedistributionSSTables = new HashSet<>(sstables);
+         Set<SSTableReader> afterCancelSSTables = new HashSet<>(cfs.getSSTables());
+         Set<SSTableReader> disjoint = Sets.symmetricDifference(beforeRedistributionSSTables, afterCancelSSTables);
+         assertTrue(String.format("Mismatched files before and after cancelling redistribution: %s",
+                                  Joiner.on(",").join(disjoint)),
+                    disjoint.isEmpty());
+ 
+         validateData(cfs, numRows);
+     }
  }

Reply via email to