This is an automated email from the ASF dual-hosted git repository.

marcuse pushed a commit to branch cassandra-3.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cassandra-3.0 by this push:
     new b40d79c  Make sure index summary redistributions don't start when compactions are paused
b40d79c is described below

commit b40d79cf1cc6b4210edfa86d52546fd3880e22f0
Author: Marcus Eriksson <[email protected]>
AuthorDate: Wed Aug 14 14:11:57 2019 +0200

    Make sure index summary redistributions don't start when compactions are paused
    
    Patch by marcuse; reviewed by Benedict Elliott Smith for CASSANDRA-15265
---
 CHANGES.txt                                        |  1 +
 .../apache/cassandra/cache/AutoSavingCache.java    |  5 ++
 .../org/apache/cassandra/db/ColumnFamilyStore.java | 43 ++++++++--
 .../cassandra/db/compaction/CompactionInfo.java    |  9 +-
 .../db/compaction/CompactionIterator.java          |  5 ++
 .../cassandra/db/compaction/CompactionManager.java | 27 +++++-
 .../cassandra/db/compaction/CompactionTask.java    |  9 +-
 .../apache/cassandra/db/compaction/Scrubber.java   |  5 ++
 .../apache/cassandra/db/compaction/Verifier.java   |  5 ++
 .../org/apache/cassandra/db/view/ViewBuilder.java  | 10 +--
 .../cassandra/index/SecondaryIndexBuilder.java     |  5 ++
 .../cassandra/io/sstable/IndexSummaryManager.java  |  2 +
 .../io/sstable/IndexSummaryRedistribution.java     |  6 ++
 .../db/compaction/AntiCompactionTest.java          |  2 +-
 .../db/compaction/CancelCompactionsTest.java       | 98 ++++++++++++++++++++++
 .../io/sstable/IndexSummaryManagerTest.java        | 40 +++++++++
 16 files changed, 254 insertions(+), 18 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index d58a199..9a44b27 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.20
+ * Make sure index summary redistribution does not start when compactions are 
paused (CASSANDRA-15265)
  * Ensure legacy rows have primary key livenessinfo when they contain illegal 
cells (CASSANDRA-15365)
 Merged from 2.2
  * In-JVM DTest: Set correct internode message version for upgrade test 
(CASSANDRA-15371)
diff --git a/src/java/org/apache/cassandra/cache/AutoSavingCache.java 
b/src/java/org/apache/cassandra/cache/AutoSavingCache.java
index 00431b9..3da6352 100644
--- a/src/java/org/apache/cassandra/cache/AutoSavingCache.java
+++ b/src/java/org/apache/cassandra/cache/AutoSavingCache.java
@@ -421,6 +421,11 @@ public class AutoSavingCache<K extends CacheKey, V> 
extends InstrumentingCache<K
                 logger.warn("Could not list files in {}", savedCachesDir);
             }
         }
+
+        public boolean isGlobal()
+        {
+            return false;
+        }
     }
 
     public interface CacheSerializer<K extends CacheKey, V>
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java 
b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index c5e81f0..a9bd54a 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -84,6 +84,7 @@ import org.json.simple.JSONObject;
 import static org.apache.cassandra.utils.ExecutorUtils.awaitTermination;
 import static org.apache.cassandra.utils.ExecutorUtils.shutdown;
 import static org.apache.cassandra.utils.Throwables.maybeFail;
+import static org.apache.cassandra.utils.Throwables.merge;
 
 public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 {
@@ -2088,9 +2089,8 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
                                                                ? 
Iterables.concat(concatWithIndexes(), viewManager.allViewsCfs())
                                                                : 
concatWithIndexes();
 
-            for (ColumnFamilyStore cfs : selfWithAuxiliaryCfs)
-                cfs.getCompactionStrategyManager().pause();
-            try
+            try (CompactionManager.CompactionPauser pause = 
CompactionManager.instance.pauseGlobalCompaction();
+                 CompactionManager.CompactionPauser pausedStrategies = 
pauseCompactionStrategies(selfWithAuxiliaryCfs))
             {
                 // interrupt in-progress compactions
                 
CompactionManager.instance.interruptCompactionForCFs(selfWithAuxiliaryCfs, 
interruptValidation);
@@ -2117,12 +2117,43 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
                     throw new RuntimeException(e);
                 }
             }
-            finally
+        }
+    }
+
+    private static CompactionManager.CompactionPauser 
pauseCompactionStrategies(Iterable<ColumnFamilyStore> toPause)
+    {
+        ArrayList<ColumnFamilyStore> successfullyPaused = new ArrayList<>();
+        try
+        {
+            for (ColumnFamilyStore cfs : toPause)
             {
-                for (ColumnFamilyStore cfs : selfWithAuxiliaryCfs)
-                    cfs.getCompactionStrategyManager().resume();
+                successfullyPaused.ensureCapacity(successfullyPaused.size() + 
1); // to avoid OOM:ing after pausing the strategies
+                cfs.getCompactionStrategyManager().pause();
+                successfullyPaused.add(cfs);
+            }
+            return () -> maybeFail(resumeAll(null, toPause));
+        }
+        catch (Throwable t)
+        {
+            resumeAll(t, successfullyPaused);
+            throw t;
+        }
+    }
+
+    private static Throwable resumeAll(Throwable accumulate, 
Iterable<ColumnFamilyStore> cfss)
+    {
+        for (ColumnFamilyStore cfs : cfss)
+        {
+            try
+            {
+                cfs.getCompactionStrategyManager().resume();
+            }
+            catch (Throwable t)
+            {
+                accumulate = merge(accumulate, t);
             }
         }
+        return accumulate;
     }
 
     public LifecycleTransaction markAllCompacting(final OperationType 
operationType)
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java 
b/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java
index 404c07f..2bae5f8 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java
@@ -169,10 +169,17 @@ public final class CompactionInfo implements Serializable
             stopRequested = true;
         }
 
+        /**
+         * if this compaction involves several/all tables we can safely check 
globalCompactionsPaused
+         * in isStopRequested() below
+         */
+        public abstract boolean isGlobal();
+
         public boolean isStopRequested()
         {
-            return stopRequested;
+            return stopRequested || (isGlobal() && 
CompactionManager.instance.isGlobalCompactionPaused());
         }
+
         /**
          * report event on the size of the compaction.
          */
diff --git 
a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java 
b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
index bea365c..39cb2df 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
@@ -126,6 +126,11 @@ public class CompactionIterator extends 
CompactionInfo.Holder implements Unfilte
                                   compactionId);
     }
 
+    public boolean isGlobal()
+    {
+        return false;
+    }
+
     private void updateCounterFor(int rows)
     {
         assert rows > 0 && rows - 1 < mergeCounters.length;
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java 
b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 694ad62..2b9ee50 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -21,8 +21,8 @@ import java.io.File;
 import java.io.IOException;
 import java.util.*;
 import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Predicate;
-import java.util.stream.Collectors;
 import javax.management.openmbean.OpenDataException;
 import javax.management.openmbean.TabularData;
 
@@ -114,6 +114,9 @@ public class CompactionManager implements 
CompactionManagerMBean
     @VisibleForTesting
     final Multiset<ColumnFamilyStore> compactingCF = 
ConcurrentHashMultiset.create();
 
+    // used to temporarily pause non-strategy managed compactions (like index 
summary redistribution)
+    private final AtomicInteger globalCompactionPauseCount = new 
AtomicInteger(0);
+
     private final RateLimiter compactionRateLimiter = 
RateLimiter.create(Double.MAX_VALUE);
 
     /**
@@ -1854,4 +1857,26 @@ public class CompactionManager implements 
CompactionManagerMBean
                 break;
         }
     }
+
+    /**
+     * Return whether "global" compactions should be paused, used by 
ColumnFamilyStore#runWithCompactionsDisabled
+     *
+     * a global compaction is one that includes several/all tables, currently 
only IndexSummaryBuilder
+     */
+    public boolean isGlobalCompactionPaused()
+    {
+        return globalCompactionPauseCount.get() > 0;
+    }
+
+    public CompactionPauser pauseGlobalCompaction()
+    {
+        CompactionPauser pauser = globalCompactionPauseCount::decrementAndGet;
+        globalCompactionPauseCount.incrementAndGet();
+        return pauser;
+    }
+
+    public interface CompactionPauser extends AutoCloseable
+    {
+        public void close();
+    }
 }
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java 
b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index 1636ab9..3437de7 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@ -178,14 +178,17 @@ public class CompactionTask extends AbstractCompactionTask
             {
                 long lastCheckObsoletion = start;
 
-                if (!controller.cfs.getCompactionStrategyManager().isActive)
-                    throw new 
CompactionInterruptedException(ci.getCompactionInfo());
-
                 if (collector != null)
                     collector.beginCompaction(ci);
 
                 try (CompactionAwareWriter writer = 
getCompactionAwareWriter(cfs, getDirectories(), transaction, actuallyCompact))
                 {
+                    // Note that we need to re-check this flag after calling 
beginCompaction above to avoid a window
+                    // where the compaction does not exist in 
activeCompactions but the CSM gets paused.
+                    // We already have the sstables marked compacting here so 
CompactionManager#waitForCessation will
+                    // block until the below exception is thrown and the 
transaction is cancelled.
+                    if 
(!controller.cfs.getCompactionStrategyManager().isActive)
+                        throw new 
CompactionInterruptedException(ci.getCompactionInfo());
                     estimatedKeys = writer.estimatedKeys();
                     while (ci.hasNext())
                     {
diff --git a/src/java/org/apache/cassandra/db/compaction/Scrubber.java 
b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
index bc11504..463cc9c 100644
--- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java
+++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
@@ -482,6 +482,11 @@ public class Scrubber implements Closeable
                 throw new RuntimeException();
             }
         }
+
+        public boolean isGlobal()
+        {
+            return false;
+        }
     }
 
     @VisibleForTesting
diff --git a/src/java/org/apache/cassandra/db/compaction/Verifier.java 
b/src/java/org/apache/cassandra/db/compaction/Verifier.java
index 586c754..82eabd0 100644
--- a/src/java/org/apache/cassandra/db/compaction/Verifier.java
+++ b/src/java/org/apache/cassandra/db/compaction/Verifier.java
@@ -303,6 +303,11 @@ public class Verifier implements Closeable
                 throw new RuntimeException();
             }
         }
+
+        public boolean isGlobal()
+        {
+            return false;
+        }
     }
 
     private static class VerifyController extends CompactionController
diff --git a/src/java/org/apache/cassandra/db/view/ViewBuilder.java 
b/src/java/org/apache/cassandra/db/view/ViewBuilder.java
index 68cb265..57bba29 100644
--- a/src/java/org/apache/cassandra/db/view/ViewBuilder.java
+++ b/src/java/org/apache/cassandra/db/view/ViewBuilder.java
@@ -65,8 +65,6 @@ public class ViewBuilder extends CompactionInfo.Holder
 
     private static final Logger logger = 
LoggerFactory.getLogger(ViewBuilder.class);
 
-    private volatile boolean isStopped = false;
-
     public ViewBuilder(ColumnFamilyStore baseCfs, View view)
     {
         this.baseCfs = baseCfs;
@@ -143,7 +141,7 @@ public class ViewBuilder extends CompactionInfo.Holder
         try (Refs<SSTableReader> sstables = 
baseCfs.selectAndReference(function).refs;
              ReducingKeyIterator iter = new ReducingKeyIterator(sstables))
         {
-            while (!isStopped && iter.hasNext())
+            while (!isStopRequested() && iter.hasNext())
             {
                 DecoratedKey key = iter.next();
                 Token token = key.getToken();
@@ -168,7 +166,7 @@ public class ViewBuilder extends CompactionInfo.Holder
                 }
             }
 
-            if (!isStopped)
+            if (!isStopRequested())
             {
                 logger.debug("Marking view({}.{}) as built covered {} keys ", 
ksname, viewName, keysBuilt);
                 SystemKeyspace.finishViewBuildStatus(ksname, viewName);
@@ -207,8 +205,8 @@ public class ViewBuilder extends CompactionInfo.Holder
          return new CompactionInfo(baseCfs.metadata, OperationType.VIEW_BUILD, 
rangesCompleted, rangesTotal, Unit.RANGES, compactionId);
     }
 
-    public void stop()
+    public boolean isGlobal()
     {
-        isStopped = true;
+        return false;
     }
 }
diff --git a/src/java/org/apache/cassandra/index/SecondaryIndexBuilder.java 
b/src/java/org/apache/cassandra/index/SecondaryIndexBuilder.java
index c627b2d..907f65f 100644
--- a/src/java/org/apache/cassandra/index/SecondaryIndexBuilder.java
+++ b/src/java/org/apache/cassandra/index/SecondaryIndexBuilder.java
@@ -73,4 +73,9 @@ public class SecondaryIndexBuilder extends 
CompactionInfo.Holder
             iter.close();
         }
     }
+
+    public boolean isGlobal()
+    {
+        return false;
+    }
 }
diff --git a/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java 
b/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
index ae79217..dea1cd6 100644
--- a/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
@@ -224,6 +224,8 @@ public class IndexSummaryManager implements 
IndexSummaryManagerMBean
 
     public void redistributeSummaries() throws IOException
     {
+        if (CompactionManager.instance.isGlobalCompactionPaused())
+            return;
         Pair<List<SSTableReader>, Map<UUID, LifecycleTransaction>> 
compactingAndNonCompacting = getCompactingAndNonCompactingSSTables();
         try
         {
diff --git 
a/src/java/org/apache/cassandra/io/sstable/IndexSummaryRedistribution.java 
b/src/java/org/apache/cassandra/io/sstable/IndexSummaryRedistribution.java
index ff362e8..189ee2d 100644
--- a/src/java/org/apache/cassandra/io/sstable/IndexSummaryRedistribution.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexSummaryRedistribution.java
@@ -37,6 +37,7 @@ import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.compaction.CompactionInfo;
 import org.apache.cassandra.db.compaction.CompactionInterruptedException;
+import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.compaction.OperationType;
 import org.apache.cassandra.db.compaction.CompactionInfo.Unit;
 import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
@@ -312,6 +313,11 @@ public class IndexSummaryRedistribution extends 
CompactionInfo.Holder
         return new CompactionInfo(OperationType.INDEX_SUMMARY, 
(memoryPoolBytes - remainingSpace), memoryPoolBytes, Unit.BYTES, compactionId);
     }
 
+    public boolean isGlobal()
+    {
+        return true;
+    }
+
     /** Utility class for sorting sstables by their read rates. */
     private static class ReadRateComparator implements 
Comparator<SSTableReader>
     {
diff --git 
a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java 
b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
index e2b17e8..a85be24 100644
--- a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
@@ -449,7 +449,7 @@ public class AntiCompactionTest
         return 
ImmutableSet.copyOf(cfs.getTracker().getView().sstables(SSTableSet.LIVE, (s) -> 
!s.isRepaired()));
     }
 
-    static void assertOnDiskState(ColumnFamilyStore cfs, int 
expectedSSTableCount)
+    public static void assertOnDiskState(ColumnFamilyStore cfs, int 
expectedSSTableCount)
     {
         LifecycleTransaction.waitForDeletions();
         assertEquals(expectedSSTableCount, cfs.getLiveSSTables().size());
diff --git 
a/test/unit/org/apache/cassandra/db/compaction/CancelCompactionsTest.java 
b/test/unit/org/apache/cassandra/db/compaction/CancelCompactionsTest.java
new file mode 100644
index 0000000..68ba6bf
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/compaction/CancelCompactionsTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.compaction;
+
+import java.util.Collections;
+import java.util.concurrent.CountDownLatch;
+
+import com.google.common.util.concurrent.Uninterruptibles;
+import org.junit.Test;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.metrics.CompactionMetrics;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+
+public class CancelCompactionsTest extends CQLTester
+{
+    @Test
+    public void testStandardCompactionTaskCancellation() throws Throwable
+    {
+        createTable("create table %s (id int primary key, something int)");
+        getCurrentColumnFamilyStore().disableAutoCompaction();
+
+        for (int i = 0; i < 10; i++)
+        {
+            execute("insert into %s (id, something) values (?,?)", i, i);
+            getCurrentColumnFamilyStore().forceBlockingFlush();
+        }
+        AbstractCompactionTask ct = null;
+
+        for (AbstractCompactionStrategy cs : 
getCurrentColumnFamilyStore().getCompactionStrategyManager().getStrategies())
+        {
+            ct = cs.getNextBackgroundTask(0);
+            if (ct != null)
+                break;
+        }
+        assertNotNull(ct);
+
+        CountDownLatch waitForBeginCompaction = new CountDownLatch(1);
+        CountDownLatch waitForStart = new CountDownLatch(1);
+        Iterable<CFMetaData> metadatas = 
Collections.singleton(getCurrentColumnFamilyStore().metadata);
+        /*
+        Here we ask strategies to pause & interrupt compactions right before 
calling beginCompaction in CompactionTask
+        The code running in the separate thread below mimics 
CFS#runWithCompactionsDisabled but we only allow
+        the real beginCompaction to be called after pausing & interrupting.
+         */
+        Thread t = new Thread(() -> {
+            Uninterruptibles.awaitUninterruptibly(waitForBeginCompaction);
+            
getCurrentColumnFamilyStore().getCompactionStrategyManager().pause();
+            CompactionManager.instance.interruptCompactionFor(metadatas, 
false);
+            waitForStart.countDown();
+            
CompactionManager.instance.waitForCessation(Collections.singleton(getCurrentColumnFamilyStore()));
+            
getCurrentColumnFamilyStore().getCompactionStrategyManager().resume();
+        });
+        t.start();
+
+        try
+        {
+            ct.execute(new CompactionMetrics()
+            {
+                @Override
+                public void beginCompaction(CompactionInfo.Holder ci)
+                {
+                    waitForBeginCompaction.countDown();
+                    Uninterruptibles.awaitUninterruptibly(waitForStart);
+                    super.beginCompaction(ci);
+                }
+            });
+            fail("execute should throw CompactionInterruptedException");
+        }
+        catch (CompactionInterruptedException cie)
+        {
+            // expected
+        }
+        finally
+        {
+            ct.transaction.abort();
+            t.join();
+        }
+    }
+}
diff --git 
a/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java 
b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
index e86348b..9eb63c5 100644
--- a/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
@@ -56,6 +56,7 @@ import org.apache.cassandra.utils.ByteBufferUtil;
 
 import static com.google.common.collect.ImmutableMap.of;
 import static java.util.Arrays.asList;
+import static 
org.apache.cassandra.db.compaction.AntiCompactionTest.assertOnDiskState;
 import static org.apache.cassandra.io.sstable.Downsampling.BASE_SAMPLING_LEVEL;
 import static 
org.apache.cassandra.io.sstable.IndexSummaryRedistribution.DOWNSAMPLE_THESHOLD;
 import static 
org.apache.cassandra.io.sstable.IndexSummaryRedistribution.UPSAMPLE_THRESHOLD;
@@ -63,6 +64,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 
 @RunWith(OrderedJUnit4ClassRunner.class)
@@ -676,9 +678,47 @@ public class IndexSummaryManagerTest
                                  Joiner.on(",").join(disjoint)),
                    disjoint.isEmpty());
 
+        assertOnDiskState(cfs, numSSTables);
         validateData(cfs, numRows);
     }
 
+    @Test
+    public void testPauseIndexSummaryManager() throws Exception
+    {
+        String ksname = KEYSPACE1;
+        String cfname = CF_STANDARDLOWiINTERVAL; // index interval of 8, no 
key caching
+        Keyspace keyspace = Keyspace.open(ksname);
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cfname);
+        int numSSTables = 4;
+        int numRows = 256;
+        createSSTables(ksname, cfname, numSSTables, numRows);
+
+        List<SSTableReader> sstables = new ArrayList<>(cfs.getLiveSSTables());
+        for (SSTableReader sstable : sstables)
+            sstable.overrideReadMeter(new RestorableMeter(100.0, 100.0));
+
+        long singleSummaryOffHeapSpace = 
sstables.get(0).getIndexSummaryOffHeapSize();
+
+        // everything should get cut in half
+        assert sstables.size() == numSSTables;
+        try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, 
OperationType.UNKNOWN))
+        {
+            try (AutoCloseable toresume = 
CompactionManager.instance.pauseGlobalCompaction())
+            {
+                sstables = redistributeSummaries(Collections.emptyList(), 
of(cfs.metadata.cfId, txn), (singleSummaryOffHeapSpace * (numSSTables / 2)));
+                fail("The redistribution should fail - we got paused before 
adding to active compactions, but after marking compacting");
+            }
+        }
+        catch (CompactionInterruptedException e)
+        {
+            // expected
+        }
+        for (SSTableReader sstable : sstables)
+            assertEquals(BASE_SAMPLING_LEVEL, 
sstable.getIndexSummarySamplingLevel());
+        validateData(cfs, numRows);
+        assertOnDiskState(cfs, numSSTables);
+    }
+
     private static List<SSTableReader> 
redistributeSummaries(List<SSTableReader> compacting,
                                                              Map<UUID, 
LifecycleTransaction> transactions,
                                                              long 
memoryPoolBytes)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to