This is an automated email from the ASF dual-hosted git repository.

marcuse pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 8fdd6c74ede5ad30a517d372f08ac553b35e04cd
Merge: 9063cea 08018ab
Author: Marcus Eriksson <[email protected]>
AuthorDate: Tue Mar 26 10:12:48 2019 +0100

    Merge branch 'cassandra-3.11' into trunk

 CHANGES.txt                                        |   1 +
 .../cassandra/db/compaction/CompactionInfo.java    |   5 -
 .../cassandra/db/compaction/CompactionManager.java |   2 +-
 .../cassandra/io/sstable/IndexSummaryManager.java  |  38 +++++--
 .../io/sstable/IndexSummaryRedistribution.java     |  26 +++--
 .../org/apache/cassandra/utils/FBUtilities.java    |   2 +-
 test/unit/org/apache/cassandra/Util.java           |  27 +++++
 .../db/compaction/ActiveCompactionsTest.java       |   6 +-
 .../db/compaction/AntiCompactionBytemanTest.java   |   3 +-
 .../db/compaction/AntiCompactionTest.java          |  22 +---
 .../io/sstable/IndexSummaryManagerTest.java        | 111 ++++++++++++++-------
 11 files changed, 154 insertions(+), 89 deletions(-)

diff --cc CHANGES.txt
index 131bcb2,42fd101..a4c285f
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -364,8 -7,10 +364,9 @@@ Merged from 3.0
   * Add missing commands to nodetool_completion (CASSANDRA-14916)
   * Anti-compaction temporarily corrupts sstable state for readers (CASSANDRA-15004)
  Merged from 2.2:
+  * Fix index summary redistribution cancellation (CASSANDRA-15045)
   * Refactor Circle CI configuration (CASSANDRA-14806)
   * Fixing invalid CQL in security documentation (CASSANDRA-15020)
 - * Multi-version in-JVM dtests (CASSANDRA-14937)
  
  
  3.11.4
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionInfo.java
index 7c950c0,93bb4c9..09bed74
--- a/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java
@@@ -49,19 -34,41 +49,14 @@@ public final class CompactionInf
      private final long total;
      private final Unit unit;
      private final UUID compactionId;
 +    private final ImmutableSet<SSTableReader> sstables;
  
 -    public static enum Unit
 -    {
 -        BYTES("bytes"), RANGES("ranges"), KEYS("keys");
 -
 -        private final String name;
 -
 -        private Unit(String name)
 -        {
 -            this.name = name;
 -        }
 -
 -        @Override
 -        public String toString()
 -        {
 -            return name;
 -        }
 -
 -        public static boolean isFileSize(String unit)
 -        {
 -            return BYTES.toString().equals(unit);
 -        }
 -    }
 -
 -    public CompactionInfo(CFMetaData cfm, OperationType tasktype, long 
bytesComplete, long totalBytes, UUID compactionId)
 -    {
 -        this(cfm, tasktype, bytesComplete, totalBytes, Unit.BYTES, 
compactionId);
 -    }
 -
 -    public CompactionInfo(OperationType tasktype, long completed, long total, 
Unit unit, UUID compactionId)
 +    public CompactionInfo(TableMetadata metadata, OperationType tasktype, 
long bytesComplete, long totalBytes, UUID compactionId, 
Collection<SSTableReader> sstables)
      {
 -        this(null, tasktype, completed, total, unit, compactionId);
 +        this(metadata, tasktype, bytesComplete, totalBytes, Unit.BYTES, 
compactionId, sstables);
      }
  
-     public CompactionInfo(OperationType tasktype, long completed, long total, 
Unit unit, UUID compactionId, Collection<SSTableReader> sstables)
-     {
-         this(null, tasktype, completed, total, unit, compactionId, sstables);
-     }
- 
 -    public CompactionInfo(CFMetaData cfm, OperationType tasktype, long 
completed, long total, Unit unit, UUID compactionId)
 +    private CompactionInfo(TableMetadata metadata, OperationType tasktype, 
long completed, long total, Unit unit, UUID compactionId, 
Collection<SSTableReader> sstables)
      {
          this.tasktype = tasktype;
          this.completed = completed;
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index b388098,e0ec179..d387701
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@@ -2107,11 -2078,9 +2107,11 @@@ public class CompactionManager implemen
              if ((info.getTaskType() == OperationType.VALIDATION) && 
!interruptValidation)
                  continue;
  
-             if (Iterables.contains(columnFamilies, info.getTableMetadata()))
 -            // cfmetadata is null for index summary redistributions which are 'global' - they involve all keyspaces/tables
 -            if (info.getCFMetaData() == null || 
Iterables.contains(columnFamilies, info.getCFMetaData()))
 -                compactionHolder.stop(); // signal compaction to stop
++            if (info.getTableMetadata() == null || 
Iterables.contains(columnFamilies, info.getTableMetadata()))
 +            {
 +                if (info.shouldStop(sstablePredicate))
 +                    compactionHolder.stop();
 +            }
          }
      }
  
diff --cc src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
index 3630c2a,507b6fa..2d58cf8
--- a/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
@@@ -42,7 -44,7 +43,8 @@@ import org.apache.cassandra.db.lifecycl
  import org.apache.cassandra.db.lifecycle.SSTableSet;
  import org.apache.cassandra.db.lifecycle.View;
  import org.apache.cassandra.io.sstable.format.SSTableReader;
 +import org.apache.cassandra.schema.TableId;
+ import org.apache.cassandra.utils.FBUtilities;
  import org.apache.cassandra.utils.MBeanWrapper;
  import org.apache.cassandra.utils.Pair;
  import org.apache.cassandra.utils.WrappedRunnable;
@@@ -186,14 -188,14 +188,17 @@@ public class IndexSummaryManager implem
      }
  
      /**
--     * Returns a Pair of all compacting and non-compacting sstables.  
Non-compacting sstables will be marked as
--     * compacting.
++     * Marks the non-compacting sstables as compacting for index summary redistribution for all keyspaces/tables.
++     *
++     * @return Pair containing:
++     *          left: total size of the off heap index summaries for the sstables we were unable to mark compacting (they were involved in other compactions)
++     *          right: the transactions, keyed by table id.
       */
      @SuppressWarnings("resource")
-     private Pair<List<SSTableReader>, Map<TableId, LifecycleTransaction>> 
getCompactingAndNonCompactingSSTables()
 -    private Pair<List<SSTableReader>, Map<UUID, LifecycleTransaction>> 
getCompactingAndNonCompactingSSTables()
++    private Pair<Long, Map<TableId, LifecycleTransaction>> 
getRestributionTransactions()
      {
          List<SSTableReader> allCompacting = new ArrayList<>();
 -        Map<UUID, LifecycleTransaction> allNonCompacting = new HashMap<>();
 +        Map<TableId, LifecycleTransaction> allNonCompacting = new HashMap<>();
          for (Keyspace ks : Keyspace.all())
          {
              for (ColumnFamilyStore cfStore: ks.getColumnFamilyStores())
@@@ -212,22 -214,34 +217,37 @@@
                  allCompacting.addAll(Sets.difference(allSSTables, 
nonCompacting));
              }
          }
--        return Pair.create(allCompacting, allNonCompacting);
++        long nonRedistributingOffHeapSize = 
allCompacting.stream().mapToLong(SSTableReader::getIndexSummaryOffHeapSize).sum();
++        return Pair.create(nonRedistributingOffHeapSize, allNonCompacting);
      }
  
      public void redistributeSummaries() throws IOException
      {
-         Pair<List<SSTableReader>, Map<TableId, LifecycleTransaction>> 
compactingAndNonCompacting = getCompactingAndNonCompactingSSTables();
 -        Pair<List<SSTableReader>, Map<UUID, LifecycleTransaction>> 
compactingAndNonCompacting = getCompactingAndNonCompactingSSTables();
++        Pair<Long, Map<TableId, LifecycleTransaction>> 
redistributionTransactionInfo = getRestributionTransactions();
++        Map<TableId, LifecycleTransaction> transactions = 
redistributionTransactionInfo.right;
++        long nonRedistributingOffHeapSize = 
redistributionTransactionInfo.left;
          try
          {
--            redistributeSummaries(new 
IndexSummaryRedistribution(compactingAndNonCompacting.left,
--                                                                 
compactingAndNonCompacting.right,
++            redistributeSummaries(new IndexSummaryRedistribution(transactions,
++                                                                 
nonRedistributingOffHeapSize,
                                                                   
this.memoryPoolBytes));
          }
+         catch (Exception e)
+         {
+             if (!(e instanceof CompactionInterruptedException))
+                 logger.error("Got exception during index summary 
redistribution", e);
+             throw e;
+         }
          finally
          {
-             for (LifecycleTransaction modifier : 
compactingAndNonCompacting.right.values())
-                 modifier.close();
+             try
+             {
 -                
FBUtilities.closeAll(compactingAndNonCompacting.right.values());
++                FBUtilities.closeAll(transactions.values());
+             }
+             catch (Exception e)
+             {
+                 throw new RuntimeException(e);
+             }
          }
      }
  
diff --cc 
src/java/org/apache/cassandra/io/sstable/IndexSummaryRedistribution.java
index b4fca41,ab23ef3..a8fcad1
--- a/src/java/org/apache/cassandra/io/sstable/IndexSummaryRedistribution.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexSummaryRedistribution.java
@@@ -58,16 -58,16 +58,22 @@@ public class IndexSummaryRedistributio
      static final double UPSAMPLE_THRESHOLD = 1.5;
      static final double DOWNSAMPLE_THESHOLD = 0.75;
  
--    private final List<SSTableReader> compacting;
 -    private final Map<UUID, LifecycleTransaction> transactions;
 +    private final Map<TableId, LifecycleTransaction> transactions;
++    private final long nonRedistributingOffHeapSize;
      private final long memoryPoolBytes;
      private final UUID compactionId;
      private volatile long remainingSpace;
  
-     public IndexSummaryRedistribution(List<SSTableReader> compacting, 
Map<TableId, LifecycleTransaction> transactions, long memoryPoolBytes)
 -    public IndexSummaryRedistribution(List<SSTableReader> compacting, 
Map<UUID, LifecycleTransaction> transactions, long memoryPoolBytes)
++    /**
++     *
++     * @param transactions the transactions for the different keyspaces/tables we are to redistribute
++     * @param nonRedistributingOffHeapSize the total index summary off heap size for all sstables we were not able to mark compacting (due to them being involved in other compactions)
++     * @param memoryPoolBytes size of the memory pool
++     */
++    public IndexSummaryRedistribution(Map<TableId, LifecycleTransaction> 
transactions, long nonRedistributingOffHeapSize, long memoryPoolBytes)
      {
--        this.compacting = compacting;
          this.transactions = transactions;
++        this.nonRedistributingOffHeapSize = nonRedistributingOffHeapSize;
          this.memoryPoolBytes = memoryPoolBytes;
          this.compactionId = UUID.randomUUID();
      }
@@@ -81,8 -82,19 +87,8 @@@
              redistribute.addAll(txn.originals());
          }
  
--        long total = 0;
--        for (SSTableReader sstable : Iterables.concat(compacting, 
redistribute))
++        long total = nonRedistributingOffHeapSize;
++        for (SSTableReader sstable : redistribute)
              total += sstable.getIndexSummaryOffHeapSize();
  
          logger.trace("Beginning redistribution of index summaries for {} 
sstables with memory pool size {} MB; current spaced used is {} MB",
@@@ -108,9 -120,9 +114,7 @@@
          List<SSTableReader> sstablesByHotness = new ArrayList<>(redistribute);
          Collections.sort(sstablesByHotness, new 
ReadRateComparator(readRates));
  
--        long remainingBytes = memoryPoolBytes;
-         for (SSTableReader sstable : compacting)
 -        for (SSTableReader sstable : Iterables.concat(compacting, 
oldFormatSSTables))
--            remainingBytes -= sstable.getIndexSummaryOffHeapSize();
++        long remainingBytes = memoryPoolBytes - nonRedistributingOffHeapSize;
  
          logger.trace("Index summaries for compacting SSTables are using {} MB 
of space",
                       (memoryPoolBytes - remainingBytes) / 1024.0 / 1024.0);
@@@ -122,11 -134,10 +126,11 @@@
              for (LifecycleTransaction txn : transactions.values())
                  txn.finish();
          }
--        total = 0;
-         for (SSTableReader sstable : Iterables.concat(compacting, 
newSSTables))
 -        for (SSTableReader sstable : Iterables.concat(compacting, 
oldFormatSSTables, newSSTables))
++        total = nonRedistributingOffHeapSize;
++        for (SSTableReader sstable : newSSTables)
              total += sstable.getIndexSummaryOffHeapSize();
 -        logger.trace("Completed resizing of index summaries; current 
approximate memory used: {}",
 +        if (logger.isTraceEnabled())
 +            logger.trace("Completed resizing of index summaries; current 
approximate memory used: {}",
                       FBUtilities.prettyPrintMemory(total));
  
          return newSSTables;
@@@ -301,7 -311,7 +305,7 @@@
  
      public CompactionInfo getCompactionInfo()
      {
-         return new CompactionInfo(OperationType.INDEX_SUMMARY, 
(memoryPoolBytes - remainingSpace), memoryPoolBytes, Unit.BYTES, compactionId, 
compacting);
 -        return new CompactionInfo(OperationType.INDEX_SUMMARY, 
(memoryPoolBytes - remainingSpace), memoryPoolBytes, Unit.BYTES, compactionId);
++        return CompactionInfo.withoutSSTables(null, 
OperationType.INDEX_SUMMARY, (memoryPoolBytes - remainingSpace), 
memoryPoolBytes, Unit.BYTES, compactionId);
      }
  
      /** Utility class for sorting sstables by their read rates. */
diff --cc src/java/org/apache/cassandra/utils/FBUtilities.java
index 79f9605,266d428..129c0f5
--- a/src/java/org/apache/cassandra/utils/FBUtilities.java
+++ b/src/java/org/apache/cassandra/utils/FBUtilities.java
@@@ -919,7 -870,43 +919,7 @@@ public class FBUtilitie
          return historyDir;
      }
  
-     public static void closeAll(List<? extends AutoCloseable> l) throws 
Exception
 -    public static void updateWithShort(MessageDigest digest, int val)
 -    {
 -        digest.update((byte) ((val >> 8) & 0xFF));
 -        digest.update((byte) (val & 0xFF));
 -    }
 -
 -    public static void updateWithByte(MessageDigest digest, int val)
 -    {
 -        digest.update((byte) (val & 0xFF));
 -    }
 -
 -    public static void updateWithInt(MessageDigest digest, int val)
 -    {
 -        digest.update((byte) ((val >>> 24) & 0xFF));
 -        digest.update((byte) ((val >>> 16) & 0xFF));
 -        digest.update((byte) ((val >>>  8) & 0xFF));
 -        digest.update((byte) ((val >>> 0) & 0xFF));
 -    }
 -
 -    public static void updateWithLong(MessageDigest digest, long val)
 -    {
 -        digest.update((byte) ((val >>> 56) & 0xFF));
 -        digest.update((byte) ((val >>> 48) & 0xFF));
 -        digest.update((byte) ((val >>> 40) & 0xFF));
 -        digest.update((byte) ((val >>> 32) & 0xFF));
 -        digest.update((byte) ((val >>> 24) & 0xFF));
 -        digest.update((byte) ((val >>> 16) & 0xFF));
 -        digest.update((byte) ((val >>>  8) & 0xFF));
 -        digest.update((byte)  ((val >>> 0) & 0xFF));
 -    }
 -
 -    public static void updateWithBoolean(MessageDigest digest, boolean val)
 -    {
 -        updateWithByte(digest, val ? 0 : 1);
 -    }
 -
+     public static void closeAll(Collection<? extends AutoCloseable> l) throws 
Exception
      {
          Exception toThrow = null;
          for (AutoCloseable c : l)
diff --cc test/unit/org/apache/cassandra/Util.java
index 3054be6,a3ad653..ba5d4d3
--- a/test/unit/org/apache/cassandra/Util.java
+++ b/test/unit/org/apache/cassandra/Util.java
@@@ -21,7 -21,8 +21,8 @@@ package org.apache.cassandra
  
  import java.io.Closeable;
  import java.io.EOFException;
++import java.io.File;
  import java.io.IOError;
 -import java.net.InetAddress;
  import java.net.UnknownHostException;
  import java.nio.ByteBuffer;
  import java.util.*;
@@@ -29,6 -30,6 +30,7 @@@ import java.util.concurrent.Callable
  import java.util.concurrent.Future;
  import java.util.concurrent.atomic.AtomicBoolean;
  import java.util.function.Supplier;
++import java.util.stream.Collectors;
  
  import com.google.common.base.Function;
  import com.google.common.base.Preconditions;
@@@ -40,12 -39,8 +42,13 @@@ import org.junit.Assert
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  
 -import org.apache.cassandra.config.CFMetaData;
 -import org.apache.cassandra.config.ColumnDefinition;
 +import org.apache.cassandra.db.compaction.ActiveCompactionsTracker;
++import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 +import org.apache.cassandra.locator.InetAddressAndPort;
 +import org.apache.cassandra.locator.ReplicaCollection;
 +import org.apache.cassandra.schema.ColumnMetadata;
 +import org.apache.cassandra.schema.TableId;
 +import org.apache.cassandra.schema.TableMetadata;
  import org.apache.cassandra.config.DatabaseDescriptor;
  import org.apache.cassandra.cql3.ColumnIdentifier;
  
@@@ -722,13 -708,4 +725,37 @@@ public class Uti
          return new PagingState(pk, mark, 10, 0);
      }
  
 +    public static void assertRCEquals(ReplicaCollection<?> a, 
ReplicaCollection<?> b)
 +    {
 +        assertTrue(a + " not equal to " + b, Iterables.elementsEqual(a, b));
 +    }
 +
 +    public static void assertNotRCEquals(ReplicaCollection<?> a, 
ReplicaCollection<?> b)
 +    {
 +        assertFalse(a + " equal to " + b, Iterables.elementsEqual(a, b));
 +    }
++
++    /**
++     * Makes sure that the sstables on disk are the same ones as the cfs live sstables (that they have the same generation)
++     */
++    public static void assertOnDiskState(ColumnFamilyStore cfs, int 
expectedSSTableCount)
++    {
++        LifecycleTransaction.waitForDeletions();
++        assertEquals(expectedSSTableCount, cfs.getLiveSSTables().size());
++        Set<Integer> liveGenerations = 
cfs.getLiveSSTables().stream().map(sstable -> 
sstable.descriptor.generation).collect(Collectors.toSet());
++        int fileCount = 0;
++        for (File f : cfs.getDirectories().getCFDirectories())
++        {
++            for (File sst : f.listFiles())
++            {
++                if (sst.getName().contains("Data"))
++                {
++                    Descriptor d = 
Descriptor.fromFilename(sst.getAbsolutePath());
++                    assertTrue(liveGenerations.contains(d.generation));
++                    fileCount++;
++                }
++            }
++        }
++        assertEquals(expectedSSTableCount, fileCount);
++    }
  }
diff --cc 
test/unit/org/apache/cassandra/db/compaction/ActiveCompactionsTest.java
index 23e393d,0000000..be5e7df
mode 100644,000000..100644
--- a/test/unit/org/apache/cassandra/db/compaction/ActiveCompactionsTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/ActiveCompactionsTest.java
@@@ -1,194 -1,0 +1,196 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.db.compaction;
 +
 +import java.util.ArrayList;
 +import java.util.Collections;
 +import java.util.Map;
 +import java.util.Set;
 +import java.util.concurrent.ExecutionException;
 +
 +import com.google.common.collect.ImmutableMap;
 +import com.google.common.collect.Iterables;
 +import com.google.common.collect.Sets;
 +import org.junit.Test;
 +
 +import org.apache.cassandra.cache.AutoSavingCache;
 +import org.apache.cassandra.config.DatabaseDescriptor;
 +import org.apache.cassandra.cql3.CQLTester;
 +import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 +import org.apache.cassandra.db.view.View;
 +import org.apache.cassandra.db.view.ViewBuilderTask;
 +import org.apache.cassandra.dht.Range;
 +import org.apache.cassandra.dht.Token;
 +import org.apache.cassandra.index.Index;
 +import org.apache.cassandra.index.SecondaryIndexBuilder;
 +import org.apache.cassandra.io.sstable.IndexSummaryRedistribution;
 +import org.apache.cassandra.io.sstable.format.SSTableReader;
 +import org.apache.cassandra.schema.TableId;
 +import org.apache.cassandra.service.CacheService;
 +
 +import static org.junit.Assert.assertEquals;
 +import static org.junit.Assert.assertFalse;
 +import static org.junit.Assert.assertNotNull;
 +import static org.junit.Assert.assertTrue;
 +
 +public class ActiveCompactionsTest extends CQLTester
 +{
 +    @Test
 +    public void testSecondaryIndexTracking() throws Throwable
 +    {
 +        createTable("CREATE TABLE %s (pk int, ck int, a int, b int, PRIMARY 
KEY (pk, ck))");
 +        String idxName = createIndex("CREATE INDEX on %s(a)");
 +        getCurrentColumnFamilyStore().disableAutoCompaction();
 +        for (int i = 0; i < 5; i++)
 +        {
 +            execute("INSERT INTO %s (pk, ck, a, b) VALUES ("+i+", 2, 3, 4)");
 +            getCurrentColumnFamilyStore().forceBlockingFlush();
 +        }
 +
 +        Index idx = 
getCurrentColumnFamilyStore().indexManager.getIndexByName(idxName);
 +        Set<SSTableReader> sstables = 
getCurrentColumnFamilyStore().getLiveSSTables();
 +        SecondaryIndexBuilder builder = 
idx.getBuildTaskSupport().getIndexBuildTask(getCurrentColumnFamilyStore(), 
Collections.singleton(idx), sstables);
 +
 +        MockActiveCompactions mockActiveCompactions = new 
MockActiveCompactions();
 +        CompactionManager.instance.submitIndexBuild(builder, 
mockActiveCompactions).get();
 +
 +        assertTrue(mockActiveCompactions.finished);
 +        assertNotNull(mockActiveCompactions.holder);
 +        assertEquals(sstables, 
mockActiveCompactions.holder.getCompactionInfo().getSSTables());
 +    }
 +
 +    @Test
 +    public void testIndexSummaryRedistributionTracking() throws Throwable
 +    {
 +        createTable("CREATE TABLE %s (pk int, ck int, a int, b int, PRIMARY 
KEY (pk, ck))");
 +        getCurrentColumnFamilyStore().disableAutoCompaction();
 +        for (int i = 0; i < 5; i++)
 +        {
 +            execute("INSERT INTO %s (pk, ck, a, b) VALUES ("+i+", 2, 3, 4)");
 +            getCurrentColumnFamilyStore().forceBlockingFlush();
 +        }
 +        Set<SSTableReader> sstables = 
getCurrentColumnFamilyStore().getLiveSSTables();
 +        try (LifecycleTransaction txn = 
getCurrentColumnFamilyStore().getTracker().tryModify(sstables, 
OperationType.INDEX_SUMMARY))
 +        {
 +            Map<TableId, LifecycleTransaction> transactions = 
ImmutableMap.<TableId, 
LifecycleTransaction>builder().put(getCurrentColumnFamilyStore().metadata().id, 
txn).build();
-             IndexSummaryRedistribution isr = new 
IndexSummaryRedistribution(new ArrayList<>(sstables), transactions, 1000);
++            IndexSummaryRedistribution isr = new 
IndexSummaryRedistribution(transactions, 0, 1000);
 +            MockActiveCompactions mockActiveCompactions = new 
MockActiveCompactions();
 +            CompactionManager.instance.runIndexSummaryRedistribution(isr, 
mockActiveCompactions);
 +            assertTrue(mockActiveCompactions.finished);
 +            assertNotNull(mockActiveCompactions.holder);
-             assertEquals(sstables, 
mockActiveCompactions.holder.getCompactionInfo().getSSTables());
++            // index redistribution operates over all keyspaces/tables, we always cancel them
++            
assertTrue(mockActiveCompactions.holder.getCompactionInfo().getSSTables().isEmpty());
++            
assertTrue(mockActiveCompactions.holder.getCompactionInfo().shouldStop((sstable)
 -> false));
 +        }
 +    }
 +
 +    @Test
 +    public void testViewBuildTracking() throws Throwable
 +    {
 +        createTable("CREATE TABLE %s (k1 int, c1 int , val int, PRIMARY KEY 
(k1, c1))");
 +        getCurrentColumnFamilyStore().disableAutoCompaction();
 +        for (int i = 0; i < 5; i++)
 +        {
 +            execute("INSERT INTO %s (k1, c1, val) VALUES ("+i+", 2, 3)");
 +            getCurrentColumnFamilyStore().forceBlockingFlush();
 +        }
 +        execute(String.format("CREATE MATERIALIZED VIEW %s.view1 AS SELECT 
k1, c1, val FROM %s.%s WHERE k1 IS NOT NULL AND c1 IS NOT NULL AND val IS NOT 
NULL PRIMARY KEY (val, k1, c1)", keyspace(), keyspace(), currentTable()));
 +        View view = 
Iterables.getOnlyElement(getCurrentColumnFamilyStore().viewManager);
 +
 +        Token token = DatabaseDescriptor.getPartitioner().getMinimumToken();
 +        ViewBuilderTask vbt = new 
ViewBuilderTask(getCurrentColumnFamilyStore(), view, new Range<>(token, token), 
token, 0);
 +
 +        MockActiveCompactions mockActiveCompactions = new 
MockActiveCompactions();
 +        CompactionManager.instance.submitViewBuilder(vbt, 
mockActiveCompactions).get();
 +        assertTrue(mockActiveCompactions.finished);
 +        
assertTrue(mockActiveCompactions.holder.getCompactionInfo().getSSTables().isEmpty());
 +        // this should stop for all compactions, even if it doesn't pick any sstables;
 +        
assertTrue(mockActiveCompactions.holder.getCompactionInfo().shouldStop((sstable)
 -> false));
 +    }
 +
 +    @Test
 +    public void testScrubOne() throws Throwable
 +    {
 +        createTable("CREATE TABLE %s (pk int, ck int, a int, b int, PRIMARY 
KEY (pk, ck))");
 +        getCurrentColumnFamilyStore().disableAutoCompaction();
 +        for (int i = 0; i < 5; i++)
 +        {
 +            execute("INSERT INTO %s (pk, ck, a, b) VALUES (" + i + ", 2, 3, 
4)");
 +            getCurrentColumnFamilyStore().forceBlockingFlush();
 +        }
 +
 +        SSTableReader sstable = 
Iterables.getFirst(getCurrentColumnFamilyStore().getLiveSSTables(), null);
 +        try (LifecycleTransaction txn = 
getCurrentColumnFamilyStore().getTracker().tryModify(sstable, 
OperationType.SCRUB))
 +        {
 +            MockActiveCompactions mockActiveCompactions = new 
MockActiveCompactions();
 +            
CompactionManager.instance.scrubOne(getCurrentColumnFamilyStore(), txn, true, 
false, false, mockActiveCompactions);
 +
 +            assertTrue(mockActiveCompactions.finished);
 +            
assertEquals(mockActiveCompactions.holder.getCompactionInfo().getSSTables(), 
Sets.newHashSet(sstable));
 +            
assertFalse(mockActiveCompactions.holder.getCompactionInfo().shouldStop((s) -> 
false));
 +            
assertTrue(mockActiveCompactions.holder.getCompactionInfo().shouldStop((s) -> 
true));
 +        }
 +
 +    }
 +
 +    @Test
 +    public void testVerifyOne() throws Throwable
 +    {
 +        createTable("CREATE TABLE %s (pk int, ck int, a int, b int, PRIMARY 
KEY (pk, ck))");
 +        getCurrentColumnFamilyStore().disableAutoCompaction();
 +        for (int i = 0; i < 5; i++)
 +        {
 +            execute("INSERT INTO %s (pk, ck, a, b) VALUES (" + i + ", 2, 3, 
4)");
 +            getCurrentColumnFamilyStore().forceBlockingFlush();
 +        }
 +
 +        SSTableReader sstable = 
Iterables.getFirst(getCurrentColumnFamilyStore().getLiveSSTables(), null);
 +        MockActiveCompactions mockActiveCompactions = new 
MockActiveCompactions();
 +        CompactionManager.instance.verifyOne(getCurrentColumnFamilyStore(), 
sstable, new Verifier.Options.Builder().build(), mockActiveCompactions);
 +        assertTrue(mockActiveCompactions.finished);
 +        
assertEquals(mockActiveCompactions.holder.getCompactionInfo().getSSTables(), 
Sets.newHashSet(sstable));
 +        
assertFalse(mockActiveCompactions.holder.getCompactionInfo().shouldStop((s) -> 
false));
 +        
assertTrue(mockActiveCompactions.holder.getCompactionInfo().shouldStop((s) -> 
true));
 +    }
 +
 +    @Test
 +    public void testSubmitCacheWrite() throws ExecutionException, 
InterruptedException
 +    {
 +        AutoSavingCache.Writer writer = 
CacheService.instance.keyCache.getWriter(100);
 +        MockActiveCompactions mockActiveCompactions = new 
MockActiveCompactions();
 +        CompactionManager.instance.submitCacheWrite(writer, 
mockActiveCompactions).get();
 +        assertTrue(mockActiveCompactions.finished);
 +        
assertTrue(mockActiveCompactions.holder.getCompactionInfo().getSSTables().isEmpty());
 +    }
 +
 +    private static class MockActiveCompactions implements 
ActiveCompactionsTracker
 +    {
 +        public CompactionInfo.Holder holder;
 +        public boolean finished = false;
 +        public void beginCompaction(CompactionInfo.Holder ci)
 +        {
 +            holder = ci;
 +        }
 +
 +        public void finishCompaction(CompactionInfo.Holder ci)
 +        {
 +            finished = true;
 +        }
 +    }
 +}
diff --cc 
test/unit/org/apache/cassandra/db/compaction/AntiCompactionBytemanTest.java
index 1673d01,ba6f3a1..38d2607
--- 
a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionBytemanTest.java
+++ 
b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionBytemanTest.java
@@@ -32,6 -32,6 +32,7 @@@ import com.google.common.util.concurren
  import org.junit.Test;
  import org.junit.runner.RunWith;
  
++import org.apache.cassandra.Util;
  import org.apache.cassandra.cql3.CQLTester;
  import org.apache.cassandra.cql3.UntypedResultSet;
  import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
@@@ -131,6 -120,6 +132,6 @@@ public class AntiCompactionBytemanTest 
          t.join();
          assertFalse(failed.get());
          
assertFalse(getCurrentColumnFamilyStore().getLiveSSTables().contains(sstableBefore));
-         AntiCompactionTest.assertOnDiskState(getCurrentColumnFamilyStore(), 
3);
 -        AntiCompactionTest.assertOnDiskState(getCurrentColumnFamilyStore(), 
2);
++        Util.assertOnDiskState(getCurrentColumnFamilyStore(), 3);
      }
  }
diff --cc test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
index 7b5164a,e2b17e8..b2618e5
--- a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
@@@ -59,16 -52,10 +59,17 @@@ import org.apache.cassandra.io.sstable.
  import org.apache.cassandra.schema.KeyspaceParams;
  import org.apache.cassandra.service.ActiveRepairService;
  import org.apache.cassandra.SchemaLoader;
 +import org.apache.cassandra.streaming.PreviewKind;
  import org.apache.cassandra.utils.ByteBufferUtil;
 +import org.apache.cassandra.utils.FBUtilities;
 +import org.apache.cassandra.utils.UUIDGen;
  import org.apache.cassandra.utils.concurrent.Refs;
  import org.apache.cassandra.UpdateBuilder;
 +import org.apache.cassandra.utils.concurrent.Transactional;
  
 +import static 
org.apache.cassandra.service.ActiveRepairService.NO_PENDING_REPAIR;
 +import static 
org.apache.cassandra.service.ActiveRepairService.UNREPAIRED_SSTABLE;
++import static org.apache.cassandra.Util.assertOnDiskState;
  import static org.hamcrest.CoreMatchers.is;
  import static org.junit.Assert.assertEquals;
  import static org.junit.Assert.assertFalse;
@@@ -458,148 -449,25 +459,127 @@@ public class AntiCompactionTes
          return ImmutableSet.copyOf(cfs.getTracker().getView().sstables(SSTableSet.LIVE, (s) -> !s.isRepaired()));
      }
  
 -    static void assertOnDiskState(ColumnFamilyStore cfs, int expectedSSTableCount)
 +    /**
 +     * If the parent repair session is missing, we should still clean up
 +     */
 +    @Test
 +    public void missingParentRepairSession() throws Exception
      {
 -        LifecycleTransaction.waitForDeletions();
 -        assertEquals(expectedSSTableCount, cfs.getLiveSSTables().size());
 -        Set<Integer> liveGenerations = cfs.getLiveSSTables().stream().map(sstable -> sstable.descriptor.generation).collect(Collectors.toSet());
 -        int fileCount = 0;
 -        for (File f : cfs.getDirectories().getCFDirectories())
 +        Keyspace keyspace = Keyspace.open(KEYSPACE1);
 +        ColumnFamilyStore store = keyspace.getColumnFamilyStore(CF);
 +        store.disableAutoCompaction();
 +
 +        for (int table = 0; table < 10; table++)
 +        {
 +            generateSStable(store,Integer.toString(table));
 +        }
 +        Collection<SSTableReader> sstables = getUnrepairedSSTables(store);
 +        assertEquals(10, sstables.size());
 +
 +        Range<Token> range = new Range<Token>(new BytesToken("-1".getBytes()), new BytesToken("-10".getBytes()));
 +        List<Range<Token>> ranges = Arrays.asList(range);
 +
 +        UUID missingRepairSession = UUIDGen.getTimeUUID();
 +        try (LifecycleTransaction txn = store.getTracker().tryModify(sstables, OperationType.ANTICOMPACTION);
 +             Refs<SSTableReader> refs = Refs.ref(sstables))
          {
 -            for (File sst : f.listFiles())
 +            Assert.assertFalse(refs.isEmpty());
 +            try
              {
 -                if (sst.getName().contains("Data"))
 -                {
 -                    Descriptor d = Descriptor.fromFilename(sst.getAbsolutePath());
 -                    assertTrue(liveGenerations.contains(d.generation));
 -                    fileCount++;
 -                }
 +                CompactionManager.instance.performAnticompaction(store, atEndpoint(ranges, NO_RANGES), refs, txn, missingRepairSession, () -> false);
 +                Assert.fail("expected RuntimeException");
              }
 +            catch (RuntimeException e)
 +            {
 +                // expected
 +            }
 +            Assert.assertEquals(Transactional.AbstractTransactional.State.ABORTED, txn.state());
 +            Assert.assertTrue(refs.isEmpty());
          }
 -        assertEquals(expectedSSTableCount, fileCount);
      }
  
 +    @Test
 +    public void testSSTablesToInclude()
 +    {
 +        ColumnFamilyStore cfs = MockSchema.newCFS("anticomp");
 +        List<SSTableReader> sstables = new ArrayList<>();
 +        sstables.add(MockSchema.sstable(1, 10, 100, cfs));
 +        sstables.add(MockSchema.sstable(2, 100, 200, cfs));
 +
 +        Range<Token> r = new Range<>(t(10), t(100)); // should include sstable 1 and 2 above, but none is fully contained (Range is (x, y])
 +
 +        Iterator<SSTableReader> sstableIterator = sstables.iterator();
 +        Set<SSTableReader> fullyContainedSSTables = CompactionManager.findSSTablesToAnticompact(sstableIterator, Collections.singletonList(r), UUID.randomUUID());
 +        assertTrue(fullyContainedSSTables.isEmpty());
 +        assertEquals(2, sstables.size());
 +    }
 +
 +    @Test
 +    public void testSSTablesToInclude2()
 +    {
 +        ColumnFamilyStore cfs = MockSchema.newCFS("anticomp");
 +        List<SSTableReader> sstables = new ArrayList<>();
 +        SSTableReader sstable1 = MockSchema.sstable(1, 10, 100, cfs);
 +        SSTableReader sstable2 = MockSchema.sstable(2, 100, 200, cfs);
 +        sstables.add(sstable1);
 +        sstables.add(sstable2);
 +
 +        Range<Token> r = new Range<>(t(9), t(100)); // sstable 1 is fully contained
 +
 +        Iterator<SSTableReader> sstableIterator = sstables.iterator();
 +        Set<SSTableReader> fullyContainedSSTables = CompactionManager.findSSTablesToAnticompact(sstableIterator, Collections.singletonList(r), UUID.randomUUID());
 +        assertEquals(Collections.singleton(sstable1), fullyContainedSSTables);
 +        assertEquals(Collections.singletonList(sstable2), sstables);
 +    }
 +
 +    @Test(expected = IllegalStateException.class)
 +    public void testSSTablesToNotInclude()
 +    {
 +        ColumnFamilyStore cfs = MockSchema.newCFS("anticomp");
 +        List<SSTableReader> sstables = new ArrayList<>();
 +        SSTableReader sstable1 = MockSchema.sstable(1, 0, 5, cfs);
 +        sstables.add(sstable1);
 +
 +        Range<Token> r = new Range<>(t(9), t(100)); // sstable is not intersecting and should not be included
 +
 +        CompactionManager.validateSSTableBoundsForAnticompaction(UUID.randomUUID(), sstables, atEndpoint(Collections.singletonList(r), NO_RANGES));
 +    }
 +
 +    @Test(expected = IllegalStateException.class)
 +    public void testSSTablesToNotInclude2()
 +    {
 +        ColumnFamilyStore cfs = MockSchema.newCFS("anticomp");
 +        List<SSTableReader> sstables = new ArrayList<>();
 +        SSTableReader sstable1 = MockSchema.sstable(1, 10, 10, cfs);
 +        SSTableReader sstable2 = MockSchema.sstable(2, 100, 200, cfs);
 +        sstables.add(sstable1);
 +        sstables.add(sstable2);
 +
 +        Range<Token> r = new Range<>(t(10), t(11)); // no sstable included, throw
 +
 +        CompactionManager.validateSSTableBoundsForAnticompaction(UUID.randomUUID(), sstables, atEndpoint(Collections.singletonList(r), NO_RANGES));
 +    }
 +
 +    @Test
 +    public void testSSTablesToInclude4()
 +    {
 +        ColumnFamilyStore cfs = MockSchema.newCFS("anticomp");
 +        List<SSTableReader> sstables = new ArrayList<>();
 +        SSTableReader sstable1 = MockSchema.sstable(1, 10, 100, cfs);
 +        SSTableReader sstable2 = MockSchema.sstable(2, 100, 200, cfs);
 +        sstables.add(sstable1);
 +        sstables.add(sstable2);
 +
 +        Range<Token> r = new Range<>(t(9), t(200)); // sstable 2 is fully contained - last token is equal
 +
 +        Iterator<SSTableReader> sstableIterator = sstables.iterator();
 +        Set<SSTableReader> fullyContainedSSTables = CompactionManager.findSSTablesToAnticompact(sstableIterator, Collections.singletonList(r), UUID.randomUUID());
 +        assertEquals(Sets.newHashSet(sstable1, sstable2), fullyContainedSSTables);
 +        assertTrue(sstables.isEmpty());
 +    }
 +
 +    private Token t(long t)
 +    {
 +        return new Murmur3Partitioner.LongToken(t);
 +    }
- 
-     static void assertOnDiskState(ColumnFamilyStore cfs, int expectedSSTableCount)
-     {
-         LifecycleTransaction.waitForDeletions();
-         assertEquals(expectedSSTableCount, cfs.getLiveSSTables().size());
-         Set<Integer> liveGenerations = cfs.getLiveSSTables().stream().map(sstable -> sstable.descriptor.generation).collect(Collectors.toSet());
-         int fileCount = 0;
-         for (File f : cfs.getDirectories().getCFDirectories())
-         {
-             for (File sst : f.listFiles())
-             {
-                 if (sst.getName().contains("Data"))
-                 {
-                     Descriptor d = Descriptor.fromFilename(sst.getAbsolutePath());
-                     assertTrue(liveGenerations.contains(d.generation));
-                     fileCount++;
-                 }
-             }
-         }
-         assertEquals(expectedSSTableCount, fileCount);
-     }
  }
diff --cc test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
index ab8a710,4edf8af..68ee3e1
--- a/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
@@@ -22,6 -22,7 +22,8 @@@ import java.nio.ByteBuffer
  import java.util.*;
  import java.util.concurrent.*;
  import java.util.concurrent.atomic.AtomicReference;
+ import java.util.function.Consumer;
++import java.util.stream.Collectors;
  
  import com.google.common.base.Joiner;
  import com.google.common.collect.Sets;
@@@ -40,6 -41,6 +42,7 @@@ import org.apache.cassandra.concurrent.
  import org.apache.cassandra.db.ColumnFamilyStore;
  import org.apache.cassandra.db.Keyspace;
  import org.apache.cassandra.db.RowUpdateBuilder;
++import org.apache.cassandra.db.compaction.AntiCompactionTest;
  import org.apache.cassandra.db.compaction.CompactionInfo;
  import org.apache.cassandra.db.compaction.CompactionInterruptedException;
  import org.apache.cassandra.db.compaction.CompactionManager;
@@@ -604,15 -602,26 +607,30 @@@ public class IndexSummaryManagerTes
      @Test
      public void testCancelIndex() throws Exception
      {
+         testCancelIndexHelper((cfs) -> CompactionManager.instance.stopCompaction("INDEX_SUMMARY"));
+     }
+ 
+     @Test
+     public void testCancelIndexInterrupt() throws Exception
+     {
 -        testCancelIndexHelper((cfs) -> CompactionManager.instance.interruptCompactionFor(Collections.singleton(cfs.metadata), false));
++        testCancelIndexHelper((cfs) -> CompactionManager.instance.interruptCompactionFor(Collections.singleton(cfs.metadata()), (sstable) -> true, false));
+     }
+ 
+     public void testCancelIndexHelper(Consumer<ColumnFamilyStore> cancelFunction) throws Exception
+     {
          String ksname = KEYSPACE1;
          String cfname = CF_STANDARDLOWiINTERVAL; // index interval of 8, no key caching
          Keyspace keyspace = Keyspace.open(ksname);
          final ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cfname);
--        final int numSSTables = 4;
++        cfs.disableAutoCompaction();
++        final int numSSTables = 8;
          int numRows = 256;
          createSSTables(ksname, cfname, numSSTables, numRows);
  
--        final List<SSTableReader> sstables = new ArrayList<>(cfs.getLiveSSTables());
++        List<SSTableReader> allSSTables = new ArrayList<>(cfs.getLiveSSTables());
++        List<SSTableReader> sstables = allSSTables.subList(0, 4);
++        List<SSTableReader> compacting = allSSTables.subList(4, 8);
++
          for (SSTableReader sstable : sstables)
              sstable.overrideReadMeter(new RestorableMeter(100.0, 100.0));
  
@@@ -622,52 -631,52 +640,68 @@@
          final AtomicReference<CompactionInterruptedException> exception = new AtomicReference<>();
          // barrier to control when redistribution runs
          final CountDownLatch barrier = new CountDownLatch(1);
--
--        Thread t = NamedThreadFactory.createThread(new Runnable()
++        CompactionInfo.Holder ongoingCompaction = new CompactionInfo.Holder()
          {
--            public void run()
++            public CompactionInfo getCompactionInfo()
+             {
 -                try
++                return new CompactionInfo(cfs.metadata(), OperationType.UNKNOWN, 0, 0, UUID.randomUUID(), compacting);
++            }
++        };
++        try (LifecycleTransaction ignored = cfs.getTracker().tryModify(compacting, OperationType.UNKNOWN))
++        {
++            CompactionManager.instance.active.beginCompaction(ongoingCompaction);
++
++            Thread t = NamedThreadFactory.createThread(new Runnable()
 +            {
-                 try
++                public void run()
                  {
--                    // Don't leave enough space for even the minimal index summaries
--                    try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN))
++                    try
++                    {
++                        // Don't leave enough space for even the minimal index summaries
++                        try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN))
++                        {
++                            IndexSummaryManager.redistributeSummaries(new 
ObservableRedistribution(of(cfs.metadata.id, txn),
++                                                                              
                     0,
++                                                                              
                     singleSummaryOffHeapSpace,
++                                                                              
                     barrier));
++                        }
++                    }
++                    catch (CompactionInterruptedException ex)
++                    {
++                        exception.set(ex);
++                    }
++                    catch (IOException ignored)
                      {
--                        IndexSummaryManager.redistributeSummaries(new 
ObservableRedistribution(Collections.EMPTY_LIST,
-                                                                               
                 of(cfs.metadata.id, txn),
 -                                                                              
                 of(cfs.metadata.cfId, txn),
--                                                                              
                 singleSummaryOffHeapSpace,
--                                                                              
                 barrier));
                      }
                  }
--                catch (CompactionInterruptedException ex)
--                {
--                    exception.set(ex);
--                }
--                catch (IOException ignored)
--                {
--                }
--            }
--        });
--        t.start();
--        while (CompactionManager.instance.getActiveCompactions() == 0 && t.isAlive())
--            Thread.sleep(1);
--        // to ensure that the stop condition check in IndexSummaryRedistribution::redistributeSummaries
--        // is made *after* the halt request is made to the CompactionManager, don't allow the redistribution
--        // to proceed until stopCompaction has been called.
-         CompactionManager.instance.stopCompaction("INDEX_SUMMARY");
 -        cancelFunction.accept(cfs);
--        // allows the redistribution to proceed
--        barrier.countDown();
--        t.join();
++            });
++
++            t.start();
++            while (CompactionManager.instance.getActiveCompactions() < 2 && t.isAlive())
++                Thread.sleep(1);
++            // to ensure that the stop condition check in IndexSummaryRedistribution::redistributeSummaries
++            // is made *after* the halt request is made to the CompactionManager, don't allow the redistribution
++            // to proceed until stopCompaction has been called.
++            cancelFunction.accept(cfs);
++            // allows the redistribution to proceed
++            barrier.countDown();
++            t.join();
++        }
++        finally
++        {
++            CompactionManager.instance.active.finishCompaction(ongoingCompaction);
++        }
  
          assertNotNull("Expected compaction interrupted exception", exception.get());
 -        assertTrue("Expected no active compactions", CompactionMetrics.getCompactions().isEmpty());
 +        assertTrue("Expected no active compactions", CompactionManager.instance.active.getCompactions().isEmpty());
  
--        Set<SSTableReader> beforeRedistributionSSTables = new HashSet<>(sstables);
++        Set<SSTableReader> beforeRedistributionSSTables = new HashSet<>(allSSTables);
          Set<SSTableReader> afterCancelSSTables = new HashSet<>(cfs.getLiveSSTables());
          Set<SSTableReader> disjoint = Sets.symmetricDifference(beforeRedistributionSSTables, afterCancelSSTables);
          assertTrue(String.format("Mismatched files before and after cancelling redistribution: %s",
                                   Joiner.on(",").join(disjoint)),
                     disjoint.isEmpty());
--
++        Util.assertOnDiskState(cfs, 8);
          validateData(cfs, numRows);
      }
  
@@@ -676,8 -685,8 +710,9 @@@
                                                               long 
memoryPoolBytes)
      throws IOException
      {
--        return IndexSummaryManager.redistributeSummaries(new 
IndexSummaryRedistribution(compacting,
--                                                                              
          transactions,
++        long nonRedistributingOffHeapSize = compacting.stream().mapToLong(SSTableReader::getIndexSummaryOffHeapSize).sum();
++        return IndexSummaryManager.redistributeSummaries(new IndexSummaryRedistribution(transactions,
++                                                                              
          nonRedistributingOffHeapSize,
                                                                                
          memoryPoolBytes));
      }
  
@@@ -685,12 -694,12 +720,12 @@@
      {
          CountDownLatch barrier;
  
--        ObservableRedistribution(List<SSTableReader> compacting,
-                                  Map<TableId, LifecycleTransaction> transactions,
 -                                 Map<UUID, LifecycleTransaction> transactions,
++        ObservableRedistribution(Map<TableId, LifecycleTransaction> transactions,
++                                 long nonRedistributingOffHeapSize,
                                   long memoryPoolBytes,
                                   CountDownLatch barrier)
          {
--            super(compacting, transactions, memoryPoolBytes);
++            super(transactions, nonRedistributingOffHeapSize, memoryPoolBytes);
              this.barrier = barrier;
          }
  


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to