This is an automated email from the ASF dual-hosted git repository. marcuse pushed a commit to branch cassandra-3.11 in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit 4b547f14192711f1da06b454dddcf4037f3d8fcd Merge: 2d90e3c b40d79c Author: Marcus Eriksson <[email protected]> AuthorDate: Thu Oct 31 14:27:29 2019 +0100 Merge branch 'cassandra-3.0' into cassandra-3.11 CHANGES.txt | 1 + .../apache/cassandra/cache/AutoSavingCache.java | 5 + .../org/apache/cassandra/db/ColumnFamilyStore.java | 43 +++++++-- .../cassandra/db/compaction/CompactionInfo.java | 8 +- .../db/compaction/CompactionIterator.java | 5 + .../cassandra/db/compaction/CompactionManager.java | 26 +++++ .../cassandra/db/compaction/CompactionTask.java | 9 +- .../apache/cassandra/db/compaction/Scrubber.java | 5 + .../apache/cassandra/db/compaction/Verifier.java | 5 + .../org/apache/cassandra/db/view/ViewBuilder.java | 10 +- .../cassandra/index/SecondaryIndexBuilder.java | 4 + .../cassandra/io/sstable/IndexSummaryManager.java | 2 + .../io/sstable/IndexSummaryRedistribution.java | 6 ++ .../db/compaction/AntiCompactionTest.java | 2 +- .../db/compaction/CancelCompactionsTest.java | 105 +++++++++++++++++++++ .../io/sstable/IndexSummaryManagerTest.java | 40 ++++++++ 16 files changed, 259 insertions(+), 17 deletions(-) diff --cc CHANGES.txt index c21b2cf,9a44b27..4240841 --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -1,7 -1,7 +1,8 @@@ -3.0.20 +3.11.6 +Merged from 3.0: + * Make sure index summary redistribution does not start when compactions are paused (CASSANDRA-15265) * Ensure legacy rows have primary key livenessinfo when they contain illegal cells (CASSANDRA-15365) -Merged from 2.2 +Merged from 2.2: * In-JVM DTest: Set correct internode message version for upgrade test (CASSANDRA-15371) diff --cc src/java/org/apache/cassandra/db/compaction/CompactionInfo.java index 93bb4c9,2bae5f8..a6dbd9d --- a/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java @@@ -165,9 -169,34 +165,15 @@@ public final class CompactionInfo imple stopRequested = true; } + /** + * if this compaction involves several/all tables we can safely check globalCompactionsPaused + * in isStopRequested() below + */ + public abstract boolean isGlobal(); + public boolean isStopRequested() { - return stopRequested; + return stopRequested || (isGlobal() && CompactionManager.instance.isGlobalCompactionPaused()); } - - /** - * report event on the size of the compaction. - */ - public void started() - { - reportedSeverity = getCompactionInfo().getTotal() / load; - StorageService.instance.reportSeverity(reportedSeverity); - } - - /** - * remove the event complete - */ - public void finished() - { - if (reportedSeverity != 0d) - StorageService.instance.reportSeverity(-(reportedSeverity)); - reportedSeverity = 0d; - } } } diff --cc src/java/org/apache/cassandra/db/compaction/CompactionManager.java index 1a9da37,2b9ee50..a08d08b --- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java @@@ -21,8 -21,8 +21,9 @@@ import java.io.File import java.io.IOException; import java.util.*; import java.util.concurrent.*; + import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Predicate; +import java.util.stream.Collectors; import javax.management.openmbean.OpenDataException; import javax.management.openmbean.TabularData; diff --cc src/java/org/apache/cassandra/db/compaction/CompactionTask.java index b2e9b8c,3437de7..2efcd11 --- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java @@@ -177,21 -177,18 +177,24 @@@ public class CompactionTask extends Abs CompactionIterator ci = new CompactionIterator(compactionType, scanners.scanners, controller, nowInSec, taskId)) { long lastCheckObsoletion = start; + inputSizeBytes = scanners.getTotalCompressedSize(); + double compressionRatio = scanners.getCompressionRatio(); + if (compressionRatio == MetadataCollector.NO_COMPRESSION_RATIO) + compressionRatio = 1.0; + + long lastBytesScanned = 0; - if (!controller.cfs.getCompactionStrategyManager().isActive()) - throw new CompactionInterruptedException(ci.getCompactionInfo()); - if (collector != null) collector.beginCompaction(ci); try (CompactionAwareWriter writer = getCompactionAwareWriter(cfs, getDirectories(), transaction, actuallyCompact)) { + // Note that we need to re-check this flag after calling beginCompaction above to avoid a window + // where the compaction does not exist in activeCompactions but the CSM gets paused. + // We already have the sstables marked compacting here so CompactionManager#waitForCessation will + // block until the below exception is thrown and the transaction is cancelled. - if (!controller.cfs.getCompactionStrategyManager().isActive) ++ if (!controller.cfs.getCompactionStrategyManager().isActive()) + throw new CompactionInterruptedException(ci.getCompactionInfo()); estimatedKeys = writer.estimatedKeys(); while (ci.hasNext()) { diff --cc src/java/org/apache/cassandra/db/compaction/Scrubber.java index f8fa548,463cc9c..622e793 --- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java +++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java @@@ -477,9 -479,14 +477,14 @@@ public class Scrubber implements Closea } catch (Exception e) { - throw new RuntimeException(); + throw new RuntimeException(e); } } + + public boolean isGlobal() + { + return false; + } } @VisibleForTesting diff --cc src/java/org/apache/cassandra/db/view/ViewBuilder.java index d9c9e71,57bba29..c4314f2 --- a/src/java/org/apache/cassandra/db/view/ViewBuilder.java +++ b/src/java/org/apache/cassandra/db/view/ViewBuilder.java @@@ -147,8 -141,7 +145,8 @@@ public class ViewBuilder extends Compac try (Refs<SSTableReader> sstables = baseCfs.selectAndReference(function).refs; ReducingKeyIterator iter = new ReducingKeyIterator(sstables)) { + SystemDistributedKeyspace.startViewBuild(ksname, viewName, localHostId); - while (!isStopped && iter.hasNext()) + while (!isStopRequested() && iter.hasNext()) { DecoratedKey key = iter.next(); Token token = key.getToken(); diff --cc src/java/org/apache/cassandra/index/SecondaryIndexBuilder.java index 9ec8a4e,907f65f..8276626 --- a/src/java/org/apache/cassandra/index/SecondaryIndexBuilder.java +++ b/src/java/org/apache/cassandra/index/SecondaryIndexBuilder.java @@@ -22,7 -31,51 +22,11 @@@ import org.apache.cassandra.db.compacti /** * Manages building an entire index from column family data. Runs on to compaction manager. */ -public class SecondaryIndexBuilder extends CompactionInfo.Holder +public abstract class SecondaryIndexBuilder extends CompactionInfo.Holder { - private final ColumnFamilyStore cfs; - private final Set<Index> indexers; - private final ReducingKeyIterator iter; - private final UUID compactionId; - - public SecondaryIndexBuilder(ColumnFamilyStore cfs, Set<Index> indexers, ReducingKeyIterator iter) - { - this.cfs = cfs; - this.indexers = indexers; - this.iter = iter; - this.compactionId = UUIDGen.getTimeUUID(); - } - - public CompactionInfo getCompactionInfo() - { - return new CompactionInfo(cfs.metadata, - OperationType.INDEX_BUILD, - iter.getBytesRead(), - iter.getTotalBytes(), - compactionId); - } - - public void build() - { - try - { - int pageSize = cfs.indexManager.calculateIndexingPageSize(); - while (iter.hasNext()) - { - if (isStopRequested()) - throw new CompactionInterruptedException(getCompactionInfo()); - DecoratedKey key = iter.next(); - cfs.indexManager.indexPartition(key, indexers, pageSize); - } - } - finally - { - iter.close(); - } - } - + public abstract void build(); + public boolean isGlobal() + { + return false; + } } diff --cc test/unit/org/apache/cassandra/db/compaction/CancelCompactionsTest.java index 0000000,68ba6bf..bcbe92d mode 000000,100644..100644 --- a/test/unit/org/apache/cassandra/db/compaction/CancelCompactionsTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/CancelCompactionsTest.java @@@ -1,0 -1,98 +1,105 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.cassandra.db.compaction; + + import java.util.Collections; ++import java.util.List; + import java.util.concurrent.CountDownLatch; + + import com.google.common.util.concurrent.Uninterruptibles; + import org.junit.Test; + + import org.apache.cassandra.config.CFMetaData; + import org.apache.cassandra.cql3.CQLTester; + import org.apache.cassandra.metrics.CompactionMetrics; ++ + import static org.junit.Assert.assertNotNull; + import static org.junit.Assert.fail; + + public class CancelCompactionsTest extends CQLTester + { + @Test + public void testStandardCompactionTaskCancellation() throws Throwable + { + createTable("create table %s (id int primary key, something int)"); + getCurrentColumnFamilyStore().disableAutoCompaction(); + + for (int i = 0; i < 10; i++) + { + execute("insert into %s (id, something) values (?,?)", i, i); + getCurrentColumnFamilyStore().forceBlockingFlush(); + } + AbstractCompactionTask ct = null; + - for (AbstractCompactionStrategy cs : getCurrentColumnFamilyStore().getCompactionStrategyManager().getStrategies()) ++ for (List<AbstractCompactionStrategy> css : getCurrentColumnFamilyStore().getCompactionStrategyManager().getStrategies()) + { - ct = cs.getNextBackgroundTask(0); ++ for (AbstractCompactionStrategy cs : css) ++ { ++ ct = cs.getNextBackgroundTask(0); ++ if (ct != null) ++ break; ++ } + if (ct != null) + break; + } + assertNotNull(ct); + + CountDownLatch waitForBeginCompaction = new CountDownLatch(1); + CountDownLatch waitForStart = new CountDownLatch(1); + Iterable<CFMetaData> metadatas = Collections.singleton(getCurrentColumnFamilyStore().metadata); + /* + Here we ask strategies to pause & interrupt compactions right before calling beginCompaction in CompactionTask + The code running in the separate thread below mimics CFS#runWithCompactionsDisabled but we only allow + the real beginCompaction to be called after pausing & interrupting. + */ + Thread t = new Thread(() -> { + Uninterruptibles.awaitUninterruptibly(waitForBeginCompaction); + getCurrentColumnFamilyStore().getCompactionStrategyManager().pause(); + CompactionManager.instance.interruptCompactionFor(metadatas, false); + waitForStart.countDown(); + CompactionManager.instance.waitForCessation(Collections.singleton(getCurrentColumnFamilyStore())); + getCurrentColumnFamilyStore().getCompactionStrategyManager().resume(); + }); + t.start(); + + try + { + ct.execute(new CompactionMetrics() + { + @Override + public void beginCompaction(CompactionInfo.Holder ci) + { + waitForBeginCompaction.countDown(); + Uninterruptibles.awaitUninterruptibly(waitForStart); + super.beginCompaction(ci); + } + }); + fail("execute should throw CompactionInterruptedException"); + } + catch (CompactionInterruptedException cie) + { + // expected + } + finally + { + ct.transaction.abort(); + t.join(); + } + } + } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
