This is an automated email from the ASF dual-hosted git repository.

marcuse pushed a commit to branch cassandra-3.11
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 4b547f14192711f1da06b454dddcf4037f3d8fcd
Merge: 2d90e3c b40d79c
Author: Marcus Eriksson <[email protected]>
AuthorDate: Thu Oct 31 14:27:29 2019 +0100

    Merge branch 'cassandra-3.0' into cassandra-3.11

 CHANGES.txt                                        |   1 +
 .../apache/cassandra/cache/AutoSavingCache.java    |   5 +
 .../org/apache/cassandra/db/ColumnFamilyStore.java |  43 +++++++--
 .../cassandra/db/compaction/CompactionInfo.java    |   8 +-
 .../db/compaction/CompactionIterator.java          |   5 +
 .../cassandra/db/compaction/CompactionManager.java |  26 +++++
 .../cassandra/db/compaction/CompactionTask.java    |   9 +-
 .../apache/cassandra/db/compaction/Scrubber.java   |   5 +
 .../apache/cassandra/db/compaction/Verifier.java   |   5 +
 .../org/apache/cassandra/db/view/ViewBuilder.java  |  10 +-
 .../cassandra/index/SecondaryIndexBuilder.java     |   4 +
 .../cassandra/io/sstable/IndexSummaryManager.java  |   2 +
 .../io/sstable/IndexSummaryRedistribution.java     |   6 ++
 .../db/compaction/AntiCompactionTest.java          |   2 +-
 .../db/compaction/CancelCompactionsTest.java       | 105 +++++++++++++++++++++
 .../io/sstable/IndexSummaryManagerTest.java        |  40 ++++++++
 16 files changed, 259 insertions(+), 17 deletions(-)

diff --cc CHANGES.txt
index c21b2cf,9a44b27..4240841
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,7 -1,7 +1,8 @@@
 -3.0.20
 +3.11.6
 +Merged from 3.0:
+  * Make sure index summary redistribution does not start when compactions are paused (CASSANDRA-15265)
   * Ensure legacy rows have primary key livenessinfo when they contain illegal cells (CASSANDRA-15365)
 -Merged from 2.2
 +Merged from 2.2:
   * In-JVM DTest: Set correct internode message version for upgrade test (CASSANDRA-15371)
  
  
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionInfo.java
index 93bb4c9,2bae5f8..a6dbd9d
--- a/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java
@@@ -165,9 -169,34 +165,15 @@@ public final class CompactionInfo imple
              stopRequested = true;
          }
  
+         /**
+          * if this compaction involves several/all tables we can safely check globalCompactionsPaused
+          * in isStopRequested() below
+          */
+         public abstract boolean isGlobal();
+ 
          public boolean isStopRequested()
          {
-             return stopRequested;
+             return stopRequested || (isGlobal() && 
CompactionManager.instance.isGlobalCompactionPaused());
          }
 -
 -        /**
 -         * report event on the size of the compaction.
 -         */
 -        public void started()
 -        {
 -            reportedSeverity = getCompactionInfo().getTotal() / load;
 -            StorageService.instance.reportSeverity(reportedSeverity);
 -        }
 -
 -        /**
 -         * remove the event complete
 -         */
 -        public void finished()
 -        {
 -            if (reportedSeverity != 0d)
 -                StorageService.instance.reportSeverity(-(reportedSeverity));
 -            reportedSeverity = 0d;
 -        }
      }
  }
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 1a9da37,2b9ee50..a08d08b
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@@ -21,8 -21,8 +21,9 @@@ import java.io.File
  import java.io.IOException;
  import java.util.*;
  import java.util.concurrent.*;
+ import java.util.concurrent.atomic.AtomicInteger;
  import java.util.function.Predicate;
 +import java.util.stream.Collectors;
  import javax.management.openmbean.OpenDataException;
  import javax.management.openmbean.TabularData;
  
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index b2e9b8c,3437de7..2efcd11
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@@ -177,21 -177,18 +177,24 @@@ public class CompactionTask extends Abs
                   CompactionIterator ci = new 
CompactionIterator(compactionType, scanners.scanners, controller, nowInSec, 
taskId))
              {
                  long lastCheckObsoletion = start;
 +                inputSizeBytes = scanners.getTotalCompressedSize();
 +                double compressionRatio = scanners.getCompressionRatio();
 +                if (compressionRatio == 
MetadataCollector.NO_COMPRESSION_RATIO)
 +                    compressionRatio = 1.0;
 +
 +                long lastBytesScanned = 0;
  
-                 if (!controller.cfs.getCompactionStrategyManager().isActive())
-                     throw new 
CompactionInterruptedException(ci.getCompactionInfo());
- 
                  if (collector != null)
                      collector.beginCompaction(ci);
  
                  try (CompactionAwareWriter writer = 
getCompactionAwareWriter(cfs, getDirectories(), transaction, actuallyCompact))
                  {
+                     // Note that we need to re-check this flag after calling beginCompaction above to avoid a window
+                     // where the compaction does not exist in activeCompactions but the CSM gets paused.
+                     // We already have the sstables marked compacting here so CompactionManager#waitForCessation will
+                     // block until the below exception is thrown and the transaction is cancelled.
 -                    if 
(!controller.cfs.getCompactionStrategyManager().isActive)
++                    if 
(!controller.cfs.getCompactionStrategyManager().isActive())
+                         throw new 
CompactionInterruptedException(ci.getCompactionInfo());
                      estimatedKeys = writer.estimatedKeys();
                      while (ci.hasNext())
                      {
diff --cc src/java/org/apache/cassandra/db/compaction/Scrubber.java
index f8fa548,463cc9c..622e793
--- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java
+++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
@@@ -477,9 -479,14 +477,14 @@@ public class Scrubber implements Closea
              }
              catch (Exception e)
              {
 -                throw new RuntimeException();
 +                throw new RuntimeException(e);
              }
          }
+ 
+         public boolean isGlobal()
+         {
+             return false;
+         }
      }
  
      @VisibleForTesting
diff --cc src/java/org/apache/cassandra/db/view/ViewBuilder.java
index d9c9e71,57bba29..c4314f2
--- a/src/java/org/apache/cassandra/db/view/ViewBuilder.java
+++ b/src/java/org/apache/cassandra/db/view/ViewBuilder.java
@@@ -147,8 -141,7 +145,8 @@@ public class ViewBuilder extends Compac
          try (Refs<SSTableReader> sstables = 
baseCfs.selectAndReference(function).refs;
               ReducingKeyIterator iter = new ReducingKeyIterator(sstables))
          {
 +            SystemDistributedKeyspace.startViewBuild(ksname, viewName, 
localHostId);
-             while (!isStopped && iter.hasNext())
+             while (!isStopRequested() && iter.hasNext())
              {
                  DecoratedKey key = iter.next();
                  Token token = key.getToken();
diff --cc src/java/org/apache/cassandra/index/SecondaryIndexBuilder.java
index 9ec8a4e,907f65f..8276626
--- a/src/java/org/apache/cassandra/index/SecondaryIndexBuilder.java
+++ b/src/java/org/apache/cassandra/index/SecondaryIndexBuilder.java
@@@ -22,7 -31,51 +22,11 @@@ import org.apache.cassandra.db.compacti
  /**
   * Manages building an entire index from column family data. Runs on to compaction manager.
   */
 -public class SecondaryIndexBuilder extends CompactionInfo.Holder
 +public abstract class SecondaryIndexBuilder extends CompactionInfo.Holder
  {
 -    private final ColumnFamilyStore cfs;
 -    private final Set<Index> indexers;
 -    private final ReducingKeyIterator iter;
 -    private final UUID compactionId;
 -
 -    public SecondaryIndexBuilder(ColumnFamilyStore cfs, Set<Index> indexers, 
ReducingKeyIterator iter)
 -    {
 -        this.cfs = cfs;
 -        this.indexers = indexers;
 -        this.iter = iter;
 -        this.compactionId = UUIDGen.getTimeUUID();
 -    }
 -
 -    public CompactionInfo getCompactionInfo()
 -    {
 -        return new CompactionInfo(cfs.metadata,
 -                                  OperationType.INDEX_BUILD,
 -                                  iter.getBytesRead(),
 -                                  iter.getTotalBytes(),
 -                                  compactionId);
 -    }
 -
 -    public void build()
 -    {
 -        try
 -        {
 -            int pageSize = cfs.indexManager.calculateIndexingPageSize();
 -            while (iter.hasNext())
 -            {
 -                if (isStopRequested())
 -                    throw new 
CompactionInterruptedException(getCompactionInfo());
 -                DecoratedKey key = iter.next();
 -                cfs.indexManager.indexPartition(key, indexers, pageSize);
 -            }
 -        }
 -        finally
 -        {
 -            iter.close();
 -        }
 -    }
 -
 +    public abstract void build();
+     public boolean isGlobal()
+     {
+         return false;
+     }
  }
diff --cc 
test/unit/org/apache/cassandra/db/compaction/CancelCompactionsTest.java
index 0000000,68ba6bf..bcbe92d
mode 000000,100644..100644
--- a/test/unit/org/apache/cassandra/db/compaction/CancelCompactionsTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CancelCompactionsTest.java
@@@ -1,0 -1,98 +1,105 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.cassandra.db.compaction;
+ 
+ import java.util.Collections;
++import java.util.List;
+ import java.util.concurrent.CountDownLatch;
+ 
+ import com.google.common.util.concurrent.Uninterruptibles;
+ import org.junit.Test;
+ 
+ import org.apache.cassandra.config.CFMetaData;
+ import org.apache.cassandra.cql3.CQLTester;
+ import org.apache.cassandra.metrics.CompactionMetrics;
++
+ import static org.junit.Assert.assertNotNull;
+ import static org.junit.Assert.fail;
+ 
+ public class CancelCompactionsTest extends CQLTester
+ {
+     @Test
+     public void testStandardCompactionTaskCancellation() throws Throwable
+     {
+         createTable("create table %s (id int primary key, something int)");
+         getCurrentColumnFamilyStore().disableAutoCompaction();
+ 
+         for (int i = 0; i < 10; i++)
+         {
+             execute("insert into %s (id, something) values (?,?)", i, i);
+             getCurrentColumnFamilyStore().forceBlockingFlush();
+         }
+         AbstractCompactionTask ct = null;
+ 
 -        for (AbstractCompactionStrategy cs : 
getCurrentColumnFamilyStore().getCompactionStrategyManager().getStrategies())
++        for (List<AbstractCompactionStrategy> css : 
getCurrentColumnFamilyStore().getCompactionStrategyManager().getStrategies())
+         {
 -            ct = cs.getNextBackgroundTask(0);
++            for (AbstractCompactionStrategy cs : css)
++            {
++                ct = cs.getNextBackgroundTask(0);
++                if (ct != null)
++                    break;
++            }
+             if (ct != null)
+                 break;
+         }
+         assertNotNull(ct);
+ 
+         CountDownLatch waitForBeginCompaction = new CountDownLatch(1);
+         CountDownLatch waitForStart = new CountDownLatch(1);
+         Iterable<CFMetaData> metadatas = 
Collections.singleton(getCurrentColumnFamilyStore().metadata);
+         /*
+         Here we ask strategies to pause & interrupt compactions right before calling beginCompaction in CompactionTask
+         The code running in the separate thread below mimics CFS#runWithCompactionsDisabled but we only allow
+         the real beginCompaction to be called after pausing & interrupting.
+          */
+         Thread t = new Thread(() -> {
+             Uninterruptibles.awaitUninterruptibly(waitForBeginCompaction);
+             
getCurrentColumnFamilyStore().getCompactionStrategyManager().pause();
+             CompactionManager.instance.interruptCompactionFor(metadatas, 
false);
+             waitForStart.countDown();
+             
CompactionManager.instance.waitForCessation(Collections.singleton(getCurrentColumnFamilyStore()));
+             
getCurrentColumnFamilyStore().getCompactionStrategyManager().resume();
+         });
+         t.start();
+ 
+         try
+         {
+             ct.execute(new CompactionMetrics()
+             {
+                 @Override
+                 public void beginCompaction(CompactionInfo.Holder ci)
+                 {
+                     waitForBeginCompaction.countDown();
+                     Uninterruptibles.awaitUninterruptibly(waitForStart);
+                     super.beginCompaction(ci);
+                 }
+             });
+             fail("execute should throw CompactionInterruptedException");
+         }
+         catch (CompactionInterruptedException cie)
+         {
+             // expected
+         }
+         finally
+         {
+             ct.transaction.abort();
+             t.join();
+         }
+     }
+ }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to