This is an automated email from the ASF dual-hosted git repository. marcuse pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit 8fdd6c74ede5ad30a517d372f08ac553b35e04cd Merge: 9063cea 08018ab Author: Marcus Eriksson <[email protected]> AuthorDate: Tue Mar 26 10:12:48 2019 +0100 Merge branch 'cassandra-3.11' into trunk CHANGES.txt | 1 + .../cassandra/db/compaction/CompactionInfo.java | 5 - .../cassandra/db/compaction/CompactionManager.java | 2 +- .../cassandra/io/sstable/IndexSummaryManager.java | 38 +++++-- .../io/sstable/IndexSummaryRedistribution.java | 26 +++-- .../org/apache/cassandra/utils/FBUtilities.java | 2 +- test/unit/org/apache/cassandra/Util.java | 27 +++++ .../db/compaction/ActiveCompactionsTest.java | 6 +- .../db/compaction/AntiCompactionBytemanTest.java | 3 +- .../db/compaction/AntiCompactionTest.java | 22 +--- .../io/sstable/IndexSummaryManagerTest.java | 111 ++++++++++++++------- 11 files changed, 154 insertions(+), 89 deletions(-) diff --cc CHANGES.txt index 131bcb2,42fd101..a4c285f --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -364,8 -7,10 +364,9 @@@ Merged from 3.0 * Add missing commands to nodetool_completion (CASSANDRA-14916) * Anti-compaction temporarily corrupts sstable state for readers (CASSANDRA-15004) Merged from 2.2: + * Fix index summary redistribution cancellation (CASSANDRA-15045) * Refactor Circle CI configuration (CASSANDRA-14806) * Fixing invalid CQL in security documentation (CASSANDRA-15020) - * Multi-version in-JVM dtests (CASSANDRA-14937) 3.11.4 diff --cc src/java/org/apache/cassandra/db/compaction/CompactionInfo.java index 7c950c0,93bb4c9..09bed74 --- a/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java @@@ -49,19 -34,41 +49,14 @@@ public final class CompactionInf private final long total; private final Unit unit; private final UUID compactionId; + private final ImmutableSet<SSTableReader> sstables; - public static enum Unit - { - BYTES("bytes"), RANGES("ranges"), KEYS("keys"); - - private final String name; - - private Unit(String name) - { - this.name = name; - } - - @Override - public String toString() - { - return name; - } - - public static boolean isFileSize(String unit) - { - return BYTES.toString().equals(unit); - } - } - - public CompactionInfo(CFMetaData cfm, OperationType tasktype, long bytesComplete, long totalBytes, UUID compactionId) - { - this(cfm, tasktype, bytesComplete, totalBytes, Unit.BYTES, compactionId); - } - - public CompactionInfo(OperationType tasktype, long completed, long total, Unit unit, UUID compactionId) + public CompactionInfo(TableMetadata metadata, OperationType tasktype, long bytesComplete, long totalBytes, UUID compactionId, Collection<SSTableReader> sstables) { - this(null, tasktype, completed, total, unit, compactionId); + this(metadata, tasktype, bytesComplete, totalBytes, Unit.BYTES, compactionId, sstables); } - public CompactionInfo(OperationType tasktype, long completed, long total, Unit unit, UUID compactionId, Collection<SSTableReader> sstables) - { - this(null, tasktype, completed, total, unit, compactionId, sstables); - } - - public CompactionInfo(CFMetaData cfm, OperationType tasktype, long completed, long total, Unit unit, UUID compactionId) + private CompactionInfo(TableMetadata metadata, OperationType tasktype, long completed, long total, Unit unit, UUID compactionId, Collection<SSTableReader> sstables) { this.tasktype = tasktype; this.completed = completed; diff --cc src/java/org/apache/cassandra/db/compaction/CompactionManager.java index b388098,e0ec179..d387701 --- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java @@@ -2107,11 -2078,9 +2107,11 @@@ public class CompactionManager implemen if ((info.getTaskType() == OperationType.VALIDATION) && !interruptValidation) continue; - if (Iterables.contains(columnFamilies, info.getTableMetadata())) - // cfmetadata is null for index summary redistributions which are 'global' - they involve all keyspaces/tables - if (info.getCFMetaData() == null || Iterables.contains(columnFamilies, info.getCFMetaData())) - compactionHolder.stop(); // signal compaction to stop ++ if (info.getTableMetadata() == null || Iterables.contains(columnFamilies, info.getTableMetadata())) + { + if (info.shouldStop(sstablePredicate)) + compactionHolder.stop(); + } } } diff --cc src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java index 3630c2a,507b6fa..2d58cf8 --- a/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java +++ b/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java @@@ -42,7 -44,7 +43,8 @@@ import org.apache.cassandra.db.lifecycl import org.apache.cassandra.db.lifecycle.SSTableSet; import org.apache.cassandra.db.lifecycle.View; import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.schema.TableId; + import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.MBeanWrapper; import org.apache.cassandra.utils.Pair; import org.apache.cassandra.utils.WrappedRunnable; @@@ -186,14 -188,14 +188,17 @@@ public class IndexSummaryManager implem } /** -- * Returns a Pair of all compacting and non-compacting sstables. Non-compacting sstables will be marked as -- * compacting. ++ * Marks the non-compacting sstables as compacting for index summary redistribution for all keyspaces/tables. ++ * ++ * @return Pair containing: ++ * left: total size of the off heap index summaries for the sstables we were unable to mark compacting (they were involved in other compactions) ++ * right: the transactions, keyed by table id. */ @SuppressWarnings("resource") - private Pair<List<SSTableReader>, Map<TableId, LifecycleTransaction>> getCompactingAndNonCompactingSSTables() - private Pair<List<SSTableReader>, Map<UUID, LifecycleTransaction>> getCompactingAndNonCompactingSSTables() ++ private Pair<Long, Map<TableId, LifecycleTransaction>> getRestributionTransactions() { List<SSTableReader> allCompacting = new ArrayList<>(); - Map<UUID, LifecycleTransaction> allNonCompacting = new HashMap<>(); + Map<TableId, LifecycleTransaction> allNonCompacting = new HashMap<>(); for (Keyspace ks : Keyspace.all()) { for (ColumnFamilyStore cfStore: ks.getColumnFamilyStores()) @@@ -212,22 -214,34 +217,37 @@@ allCompacting.addAll(Sets.difference(allSSTables, nonCompacting)); } } -- return Pair.create(allCompacting, allNonCompacting); ++ long nonRedistributingOffHeapSize = allCompacting.stream().mapToLong(SSTableReader::getIndexSummaryOffHeapSize).sum(); ++ return Pair.create(nonRedistributingOffHeapSize, allNonCompacting); } public void redistributeSummaries() throws IOException { - Pair<List<SSTableReader>, Map<TableId, LifecycleTransaction>> compactingAndNonCompacting = getCompactingAndNonCompactingSSTables(); - Pair<List<SSTableReader>, Map<UUID, LifecycleTransaction>> compactingAndNonCompacting = getCompactingAndNonCompactingSSTables(); ++ Pair<Long, Map<TableId, LifecycleTransaction>> redistributionTransactionInfo = getRestributionTransactions(); ++ Map<TableId, LifecycleTransaction> transactions = redistributionTransactionInfo.right; ++ long nonRedistributingOffHeapSize = redistributionTransactionInfo.left; try { -- redistributeSummaries(new IndexSummaryRedistribution(compactingAndNonCompacting.left, -- compactingAndNonCompacting.right, ++ redistributeSummaries(new IndexSummaryRedistribution(transactions, ++ nonRedistributingOffHeapSize, this.memoryPoolBytes)); } + catch (Exception e) + { + if (!(e instanceof CompactionInterruptedException)) + logger.error("Got exception during index summary redistribution", e); + throw e; + } finally { - for (LifecycleTransaction modifier : compactingAndNonCompacting.right.values()) - modifier.close(); + try + { - FBUtilities.closeAll(compactingAndNonCompacting.right.values()); ++ FBUtilities.closeAll(transactions.values()); + } + catch (Exception e) + { + throw new RuntimeException(e); + } } } diff --cc src/java/org/apache/cassandra/io/sstable/IndexSummaryRedistribution.java index b4fca41,ab23ef3..a8fcad1 --- a/src/java/org/apache/cassandra/io/sstable/IndexSummaryRedistribution.java +++ b/src/java/org/apache/cassandra/io/sstable/IndexSummaryRedistribution.java @@@ -58,16 -58,16 +58,22 @@@ public class IndexSummaryRedistributio static final double UPSAMPLE_THRESHOLD = 1.5; static final double DOWNSAMPLE_THESHOLD = 0.75; -- private final List<SSTableReader> compacting; - private final Map<UUID, LifecycleTransaction> transactions; + private final Map<TableId, LifecycleTransaction> transactions; ++ private final long nonRedistributingOffHeapSize; private final long memoryPoolBytes; private final UUID compactionId; private volatile long remainingSpace; - public IndexSummaryRedistribution(List<SSTableReader> compacting, Map<TableId, LifecycleTransaction> transactions, long memoryPoolBytes) - public IndexSummaryRedistribution(List<SSTableReader> compacting, Map<UUID, LifecycleTransaction> transactions, long memoryPoolBytes) ++ /** ++ * ++ * @param transactions the transactions for the different keyspaces/tables we are to redistribute ++ * @param nonRedistributingOffHeapSize the total index summary off heap size for all sstables we were not able to mark compacting (due to them being involved in other compactions) ++ * @param memoryPoolBytes size of the memory pool ++ */ ++ public IndexSummaryRedistribution(Map<TableId, LifecycleTransaction> transactions, long nonRedistributingOffHeapSize, long memoryPoolBytes) { -- this.compacting = compacting; this.transactions = transactions; ++ this.nonRedistributingOffHeapSize = nonRedistributingOffHeapSize; this.memoryPoolBytes = memoryPoolBytes; this.compactionId = UUID.randomUUID(); } @@@ -81,8 -82,19 +87,8 @@@ redistribute.addAll(txn.originals()); } -- long total = 0; -- for (SSTableReader sstable : Iterables.concat(compacting, redistribute)) ++ long total = nonRedistributingOffHeapSize; ++ for (SSTableReader sstable : redistribute) total += sstable.getIndexSummaryOffHeapSize(); logger.trace("Beginning redistribution of index summaries for {} sstables with memory pool size {} MB; current spaced used is {} MB", @@@ -108,9 -120,9 +114,7 @@@ List<SSTableReader> sstablesByHotness = new ArrayList<>(redistribute); Collections.sort(sstablesByHotness, new ReadRateComparator(readRates)); -- long remainingBytes = memoryPoolBytes; - for (SSTableReader sstable : compacting) - for (SSTableReader sstable : Iterables.concat(compacting, oldFormatSSTables)) -- remainingBytes -= sstable.getIndexSummaryOffHeapSize(); ++ long remainingBytes = memoryPoolBytes - nonRedistributingOffHeapSize; logger.trace("Index summaries for compacting SSTables are using {} MB of space", (memoryPoolBytes - remainingBytes) / 1024.0 / 1024.0); @@@ -122,11 -134,10 +126,11 @@@ for (LifecycleTransaction txn : transactions.values()) txn.finish(); } -- total = 0; - for (SSTableReader sstable : Iterables.concat(compacting, newSSTables)) - for (SSTableReader sstable : Iterables.concat(compacting, oldFormatSSTables, newSSTables)) ++ total = nonRedistributingOffHeapSize; ++ for (SSTableReader sstable : newSSTables) total += sstable.getIndexSummaryOffHeapSize(); - logger.trace("Completed resizing of index summaries; current approximate memory used: {}", + if (logger.isTraceEnabled()) + logger.trace("Completed resizing of index summaries; current approximate memory used: {}", FBUtilities.prettyPrintMemory(total)); return newSSTables; @@@ -301,7 -311,7 +305,7 @@@ public CompactionInfo getCompactionInfo() { - return new CompactionInfo(OperationType.INDEX_SUMMARY, (memoryPoolBytes - remainingSpace), memoryPoolBytes, Unit.BYTES, compactionId, compacting); - return new CompactionInfo(OperationType.INDEX_SUMMARY, (memoryPoolBytes - remainingSpace), memoryPoolBytes, Unit.BYTES, compactionId); ++ return CompactionInfo.withoutSSTables(null, OperationType.INDEX_SUMMARY, (memoryPoolBytes - remainingSpace), memoryPoolBytes, Unit.BYTES, compactionId); } /** Utility class for sorting sstables by their read rates. */ diff --cc src/java/org/apache/cassandra/utils/FBUtilities.java index 79f9605,266d428..129c0f5 --- a/src/java/org/apache/cassandra/utils/FBUtilities.java +++ b/src/java/org/apache/cassandra/utils/FBUtilities.java @@@ -919,7 -870,43 +919,7 @@@ public class FBUtilitie return historyDir; } - public static void closeAll(List<? extends AutoCloseable> l) throws Exception - public static void updateWithShort(MessageDigest digest, int val) - { - digest.update((byte) ((val >> 8) & 0xFF)); - digest.update((byte) (val & 0xFF)); - } - - public static void updateWithByte(MessageDigest digest, int val) - { - digest.update((byte) (val & 0xFF)); - } - - public static void updateWithInt(MessageDigest digest, int val) - { - digest.update((byte) ((val >>> 24) & 0xFF)); - digest.update((byte) ((val >>> 16) & 0xFF)); - digest.update((byte) ((val >>> 8) & 0xFF)); - digest.update((byte) ((val >>> 0) & 0xFF)); - } - - public static void updateWithLong(MessageDigest digest, long val) - { - digest.update((byte) ((val >>> 56) & 0xFF)); - digest.update((byte) ((val >>> 48) & 0xFF)); - digest.update((byte) ((val >>> 40) & 0xFF)); - digest.update((byte) ((val >>> 32) & 0xFF)); - digest.update((byte) ((val >>> 24) & 0xFF)); - digest.update((byte) ((val >>> 16) & 0xFF)); - digest.update((byte) ((val >>> 8) & 0xFF)); - digest.update((byte) ((val >>> 0) & 0xFF)); - } - - public static void updateWithBoolean(MessageDigest digest, boolean val) - { - updateWithByte(digest, val ? 0 : 1); - } - + public static void closeAll(Collection<? extends AutoCloseable> l) throws Exception { Exception toThrow = null; for (AutoCloseable c : l) diff --cc test/unit/org/apache/cassandra/Util.java index 3054be6,a3ad653..ba5d4d3 --- a/test/unit/org/apache/cassandra/Util.java +++ b/test/unit/org/apache/cassandra/Util.java @@@ -21,7 -21,8 +21,8 @@@ package org.apache.cassandra import java.io.Closeable; import java.io.EOFException; ++import java.io.File; import java.io.IOError; -import java.net.InetAddress; import java.net.UnknownHostException; import java.nio.ByteBuffer; import java.util.*; @@@ -29,6 -30,6 +30,7 @@@ import java.util.concurrent.Callable import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Supplier; ++import java.util.stream.Collectors; import com.google.common.base.Function; import com.google.common.base.Preconditions; @@@ -40,12 -39,8 +42,13 @@@ import org.junit.Assert import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.cassandra.config.CFMetaData; -import org.apache.cassandra.config.ColumnDefinition; +import org.apache.cassandra.db.compaction.ActiveCompactionsTracker; ++import org.apache.cassandra.db.lifecycle.LifecycleTransaction; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.locator.ReplicaCollection; +import org.apache.cassandra.schema.ColumnMetadata; +import org.apache.cassandra.schema.TableId; +import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.cql3.ColumnIdentifier; @@@ -722,13 -708,4 +725,37 @@@ public class Uti return new PagingState(pk, mark, 10, 0); } + public static void assertRCEquals(ReplicaCollection<?> a, ReplicaCollection<?> b) + { + assertTrue(a + " not equal to " + b, Iterables.elementsEqual(a, b)); + } + + public static void assertNotRCEquals(ReplicaCollection<?> a, ReplicaCollection<?> b) + { + assertFalse(a + " equal to " + b, Iterables.elementsEqual(a, b)); + } ++ ++ /** ++ * Makes sure that the sstables on disk are the same ones as the cfs live sstables (that they have the same generation) ++ */ ++ public static void assertOnDiskState(ColumnFamilyStore cfs, int expectedSSTableCount) ++ { ++ LifecycleTransaction.waitForDeletions(); ++ assertEquals(expectedSSTableCount, cfs.getLiveSSTables().size()); ++ Set<Integer> liveGenerations = cfs.getLiveSSTables().stream().map(sstable -> sstable.descriptor.generation).collect(Collectors.toSet()); ++ int fileCount = 0; ++ for (File f : cfs.getDirectories().getCFDirectories()) ++ { ++ for (File sst : f.listFiles()) ++ { ++ if (sst.getName().contains("Data")) ++ { ++ Descriptor d = Descriptor.fromFilename(sst.getAbsolutePath()); ++ assertTrue(liveGenerations.contains(d.generation)); ++ fileCount++; ++ } ++ } ++ } ++ assertEquals(expectedSSTableCount, fileCount); ++ } } diff --cc test/unit/org/apache/cassandra/db/compaction/ActiveCompactionsTest.java index 23e393d,0000000..be5e7df mode 100644,000000..100644 --- a/test/unit/org/apache/cassandra/db/compaction/ActiveCompactionsTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/ActiveCompactionsTest.java @@@ -1,194 -1,0 +1,196 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.db.compaction; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutionException; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; +import com.google.common.collect.Sets; +import org.junit.Test; + +import org.apache.cassandra.cache.AutoSavingCache; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.cql3.CQLTester; +import org.apache.cassandra.db.lifecycle.LifecycleTransaction; +import org.apache.cassandra.db.view.View; +import org.apache.cassandra.db.view.ViewBuilderTask; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.index.Index; +import org.apache.cassandra.index.SecondaryIndexBuilder; +import org.apache.cassandra.io.sstable.IndexSummaryRedistribution; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.schema.TableId; +import org.apache.cassandra.service.CacheService; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +public class ActiveCompactionsTest extends CQLTester +{ + @Test + public void testSecondaryIndexTracking() throws Throwable + { + createTable("CREATE TABLE %s (pk int, ck int, a int, b int, PRIMARY KEY (pk, ck))"); + String idxName = createIndex("CREATE INDEX on %s(a)"); + getCurrentColumnFamilyStore().disableAutoCompaction(); + for (int i = 0; i < 5; i++) + { + execute("INSERT INTO %s (pk, ck, a, b) VALUES ("+i+", 2, 3, 4)"); + getCurrentColumnFamilyStore().forceBlockingFlush(); + } + + Index idx = getCurrentColumnFamilyStore().indexManager.getIndexByName(idxName); + Set<SSTableReader> sstables = getCurrentColumnFamilyStore().getLiveSSTables(); + SecondaryIndexBuilder builder = idx.getBuildTaskSupport().getIndexBuildTask(getCurrentColumnFamilyStore(), Collections.singleton(idx), sstables); + + MockActiveCompactions mockActiveCompactions = new MockActiveCompactions(); + CompactionManager.instance.submitIndexBuild(builder, mockActiveCompactions).get(); + + assertTrue(mockActiveCompactions.finished); + assertNotNull(mockActiveCompactions.holder); + assertEquals(sstables, mockActiveCompactions.holder.getCompactionInfo().getSSTables()); + } + + @Test + public void testIndexSummaryRedistributionTracking() throws Throwable + { + createTable("CREATE TABLE %s (pk int, ck int, a int, b int, PRIMARY KEY (pk, ck))"); + getCurrentColumnFamilyStore().disableAutoCompaction(); + for (int i = 0; i < 5; i++) + { + execute("INSERT INTO %s (pk, ck, a, b) VALUES ("+i+", 2, 3, 4)"); + getCurrentColumnFamilyStore().forceBlockingFlush(); + } + Set<SSTableReader> sstables = getCurrentColumnFamilyStore().getLiveSSTables(); + try (LifecycleTransaction txn = getCurrentColumnFamilyStore().getTracker().tryModify(sstables, OperationType.INDEX_SUMMARY)) + { + Map<TableId, LifecycleTransaction> transactions = ImmutableMap.<TableId, LifecycleTransaction>builder().put(getCurrentColumnFamilyStore().metadata().id, txn).build(); - IndexSummaryRedistribution isr = new IndexSummaryRedistribution(new ArrayList<>(sstables), transactions, 1000); ++ IndexSummaryRedistribution isr = new IndexSummaryRedistribution(transactions, 0, 1000); + MockActiveCompactions mockActiveCompactions = new MockActiveCompactions(); + CompactionManager.instance.runIndexSummaryRedistribution(isr, mockActiveCompactions); + assertTrue(mockActiveCompactions.finished); + assertNotNull(mockActiveCompactions.holder); - assertEquals(sstables, mockActiveCompactions.holder.getCompactionInfo().getSSTables()); ++ // index redistribution operates over all keyspaces/tables, we always cancel them ++ assertTrue(mockActiveCompactions.holder.getCompactionInfo().getSSTables().isEmpty()); ++ assertTrue(mockActiveCompactions.holder.getCompactionInfo().shouldStop((sstable) -> false)); + } + } + + @Test + public void testViewBuildTracking() throws Throwable + { + createTable("CREATE TABLE %s (k1 int, c1 int , val int, PRIMARY KEY (k1, c1))"); + getCurrentColumnFamilyStore().disableAutoCompaction(); + for (int i = 0; i < 5; i++) + { + execute("INSERT INTO %s (k1, c1, val) VALUES ("+i+", 2, 3)"); + getCurrentColumnFamilyStore().forceBlockingFlush(); + } + execute(String.format("CREATE MATERIALIZED VIEW %s.view1 AS SELECT k1, c1, val FROM %s.%s WHERE k1 IS NOT NULL AND c1 IS NOT NULL AND val IS NOT NULL PRIMARY KEY (val, k1, c1)", keyspace(), keyspace(), currentTable())); + View view = Iterables.getOnlyElement(getCurrentColumnFamilyStore().viewManager); + + Token token = DatabaseDescriptor.getPartitioner().getMinimumToken(); + ViewBuilderTask vbt = new ViewBuilderTask(getCurrentColumnFamilyStore(), view, new Range<>(token, token), token, 0); + + MockActiveCompactions mockActiveCompactions = new MockActiveCompactions(); + CompactionManager.instance.submitViewBuilder(vbt, mockActiveCompactions).get(); + assertTrue(mockActiveCompactions.finished); + assertTrue(mockActiveCompactions.holder.getCompactionInfo().getSSTables().isEmpty()); + // this should stop for all compactions, even if it doesn't pick any sstables; + assertTrue(mockActiveCompactions.holder.getCompactionInfo().shouldStop((sstable) -> false)); + } + + @Test + public void testScrubOne() throws Throwable + { + createTable("CREATE TABLE %s (pk int, ck int, a int, b int, PRIMARY KEY (pk, ck))"); + getCurrentColumnFamilyStore().disableAutoCompaction(); + for (int i = 0; i < 5; i++) + { + execute("INSERT INTO %s (pk, ck, a, b) VALUES (" + i + ", 2, 3, 4)"); + getCurrentColumnFamilyStore().forceBlockingFlush(); + } + + SSTableReader sstable = Iterables.getFirst(getCurrentColumnFamilyStore().getLiveSSTables(), null); + try (LifecycleTransaction txn = getCurrentColumnFamilyStore().getTracker().tryModify(sstable, OperationType.SCRUB)) + { + MockActiveCompactions mockActiveCompactions = new MockActiveCompactions(); + CompactionManager.instance.scrubOne(getCurrentColumnFamilyStore(), txn, true, false, false, mockActiveCompactions); + + assertTrue(mockActiveCompactions.finished); + assertEquals(mockActiveCompactions.holder.getCompactionInfo().getSSTables(), Sets.newHashSet(sstable)); + assertFalse(mockActiveCompactions.holder.getCompactionInfo().shouldStop((s) -> false)); + assertTrue(mockActiveCompactions.holder.getCompactionInfo().shouldStop((s) -> true)); + } + + } + + @Test + public void testVerifyOne() throws Throwable + { + createTable("CREATE TABLE %s (pk int, ck int, a int, b int, PRIMARY KEY (pk, ck))"); + getCurrentColumnFamilyStore().disableAutoCompaction(); + for (int i = 0; i < 5; i++) + { + execute("INSERT INTO %s (pk, ck, a, b) VALUES (" + i + ", 2, 3, 4)"); + getCurrentColumnFamilyStore().forceBlockingFlush(); + } + + SSTableReader sstable = Iterables.getFirst(getCurrentColumnFamilyStore().getLiveSSTables(), null); + MockActiveCompactions mockActiveCompactions = new MockActiveCompactions(); + CompactionManager.instance.verifyOne(getCurrentColumnFamilyStore(), sstable, new Verifier.Options.Builder().build(), mockActiveCompactions); + assertTrue(mockActiveCompactions.finished); + assertEquals(mockActiveCompactions.holder.getCompactionInfo().getSSTables(), Sets.newHashSet(sstable)); + assertFalse(mockActiveCompactions.holder.getCompactionInfo().shouldStop((s) -> false)); + assertTrue(mockActiveCompactions.holder.getCompactionInfo().shouldStop((s) -> true)); + } + + @Test + public void testSubmitCacheWrite() throws ExecutionException, InterruptedException + { + AutoSavingCache.Writer writer = CacheService.instance.keyCache.getWriter(100); + MockActiveCompactions mockActiveCompactions = new MockActiveCompactions(); + CompactionManager.instance.submitCacheWrite(writer, mockActiveCompactions).get(); + assertTrue(mockActiveCompactions.finished); + assertTrue(mockActiveCompactions.holder.getCompactionInfo().getSSTables().isEmpty()); + } + + private static class MockActiveCompactions implements ActiveCompactionsTracker + { + public CompactionInfo.Holder holder; + public boolean finished = false; + public void beginCompaction(CompactionInfo.Holder ci) + { + holder = ci; + } + + public void finishCompaction(CompactionInfo.Holder ci) + { + finished = true; + } + } +} diff --cc test/unit/org/apache/cassandra/db/compaction/AntiCompactionBytemanTest.java index 1673d01,ba6f3a1..38d2607 --- a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionBytemanTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionBytemanTest.java @@@ -32,6 -32,6 +32,7 @@@ import com.google.common.util.concurren import org.junit.Test; import org.junit.runner.RunWith; ++import org.apache.cassandra.Util; import org.apache.cassandra.cql3.CQLTester; import org.apache.cassandra.cql3.UntypedResultSet; import org.apache.cassandra.db.lifecycle.LifecycleTransaction; @@@ -131,6 -120,6 +132,6 @@@ public class AntiCompactionBytemanTest t.join(); assertFalse(failed.get()); assertFalse(getCurrentColumnFamilyStore().getLiveSSTables().contains(sstableBefore)); - AntiCompactionTest.assertOnDiskState(getCurrentColumnFamilyStore(), 3); - AntiCompactionTest.assertOnDiskState(getCurrentColumnFamilyStore(), 2); ++ Util.assertOnDiskState(getCurrentColumnFamilyStore(), 3); } } diff --cc test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java index 7b5164a,e2b17e8..b2618e5 --- a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java @@@ -59,16 -52,10 +59,17 @@@ import org.apache.cassandra.io.sstable. import org.apache.cassandra.schema.KeyspaceParams; import org.apache.cassandra.service.ActiveRepairService; import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.streaming.PreviewKind; import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.UUIDGen; import org.apache.cassandra.utils.concurrent.Refs; import org.apache.cassandra.UpdateBuilder; +import org.apache.cassandra.utils.concurrent.Transactional; +import static org.apache.cassandra.service.ActiveRepairService.NO_PENDING_REPAIR; +import static org.apache.cassandra.service.ActiveRepairService.UNREPAIRED_SSTABLE; ++import static org.apache.cassandra.Util.assertOnDiskState; import static org.hamcrest.CoreMatchers.is; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@@ -458,148 -449,25 +459,127 @@@ public class AntiCompactionTes return ImmutableSet.copyOf(cfs.getTracker().getView().sstables(SSTableSet.LIVE, (s) -> !s.isRepaired())); } - static void assertOnDiskState(ColumnFamilyStore cfs, int expectedSSTableCount) + /** + * If the parent repair session is missing, we should still clean up + */ + @Test + public void missingParentRepairSession() throws Exception { - LifecycleTransaction.waitForDeletions(); - assertEquals(expectedSSTableCount, cfs.getLiveSSTables().size()); - Set<Integer> liveGenerations = cfs.getLiveSSTables().stream().map(sstable -> sstable.descriptor.generation).collect(Collectors.toSet()); - int fileCount = 0; - for (File f : cfs.getDirectories().getCFDirectories()) + Keyspace keyspace = Keyspace.open(KEYSPACE1); + ColumnFamilyStore store = keyspace.getColumnFamilyStore(CF); + store.disableAutoCompaction(); + + for (int table = 0; table < 10; table++) + { + generateSStable(store,Integer.toString(table)); + } + Collection<SSTableReader> sstables = getUnrepairedSSTables(store); + assertEquals(10, sstables.size()); + + Range<Token> range = new Range<Token>(new BytesToken("-1".getBytes()), new BytesToken("-10".getBytes())); + List<Range<Token>> ranges = Arrays.asList(range); + + UUID missingRepairSession = UUIDGen.getTimeUUID(); + try (LifecycleTransaction txn = store.getTracker().tryModify(sstables, OperationType.ANTICOMPACTION); + Refs<SSTableReader> refs = Refs.ref(sstables)) { - for (File sst : f.listFiles()) + Assert.assertFalse(refs.isEmpty()); + try { - if (sst.getName().contains("Data")) - { - Descriptor d = Descriptor.fromFilename(sst.getAbsolutePath()); - assertTrue(liveGenerations.contains(d.generation)); - fileCount++; - } + CompactionManager.instance.performAnticompaction(store, atEndpoint(ranges, NO_RANGES), refs, txn, missingRepairSession, () -> false); + Assert.fail("expected RuntimeException"); } + catch (RuntimeException e) + { + // expected + } + Assert.assertEquals(Transactional.AbstractTransactional.State.ABORTED, txn.state()); + Assert.assertTrue(refs.isEmpty()); } - assertEquals(expectedSSTableCount, fileCount); } + @Test + public void testSSTablesToInclude() + { + ColumnFamilyStore cfs = MockSchema.newCFS("anticomp"); + List<SSTableReader> sstables = new ArrayList<>(); + sstables.add(MockSchema.sstable(1, 10, 100, cfs)); + sstables.add(MockSchema.sstable(2, 100, 200, cfs)); + + Range<Token> r = new Range<>(t(10), t(100)); // should include sstable 1 and 2 above, but none is fully contained (Range is (x, y]) + + Iterator<SSTableReader> sstableIterator = sstables.iterator(); + Set<SSTableReader> fullyContainedSSTables = CompactionManager.findSSTablesToAnticompact(sstableIterator, Collections.singletonList(r), UUID.randomUUID()); + assertTrue(fullyContainedSSTables.isEmpty()); + assertEquals(2, sstables.size()); + } + + @Test + public void testSSTablesToInclude2() + { + ColumnFamilyStore cfs = MockSchema.newCFS("anticomp"); + List<SSTableReader> sstables = new ArrayList<>(); + SSTableReader sstable1 = MockSchema.sstable(1, 10, 100, cfs); + SSTableReader sstable2 = MockSchema.sstable(2, 100, 200, cfs); + sstables.add(sstable1); + sstables.add(sstable2); + + Range<Token> r = new Range<>(t(9), t(100)); // sstable 1 is fully contained + + Iterator<SSTableReader> sstableIterator = sstables.iterator(); + Set<SSTableReader> fullyContainedSSTables = CompactionManager.findSSTablesToAnticompact(sstableIterator, Collections.singletonList(r), UUID.randomUUID()); + assertEquals(Collections.singleton(sstable1), fullyContainedSSTables); + assertEquals(Collections.singletonList(sstable2), sstables); + } + + @Test(expected = IllegalStateException.class) + public void testSSTablesToNotInclude() + { + ColumnFamilyStore cfs = MockSchema.newCFS("anticomp"); + List<SSTableReader> sstables = new ArrayList<>(); + SSTableReader sstable1 = MockSchema.sstable(1, 0, 5, cfs); + sstables.add(sstable1); + + Range<Token> r = new Range<>(t(9), t(100)); // sstable is not intersecting and should not be included + + CompactionManager.validateSSTableBoundsForAnticompaction(UUID.randomUUID(), sstables, atEndpoint(Collections.singletonList(r), NO_RANGES)); + } + + @Test(expected = IllegalStateException.class) + public void testSSTablesToNotInclude2() + { + ColumnFamilyStore cfs = MockSchema.newCFS("anticomp"); + List<SSTableReader> sstables = new ArrayList<>(); + SSTableReader sstable1 = MockSchema.sstable(1, 10, 10, cfs); + SSTableReader sstable2 = MockSchema.sstable(2, 100, 200, cfs); + sstables.add(sstable1); + sstables.add(sstable2); + + Range<Token> r = new Range<>(t(10), t(11)); // no sstable included, throw + + CompactionManager.validateSSTableBoundsForAnticompaction(UUID.randomUUID(), sstables, atEndpoint(Collections.singletonList(r), NO_RANGES)); + } + + @Test + public void testSSTablesToInclude4() + { + ColumnFamilyStore cfs = MockSchema.newCFS("anticomp"); + List<SSTableReader> sstables = new ArrayList<>(); + SSTableReader sstable1 = MockSchema.sstable(1, 10, 100, cfs); + SSTableReader sstable2 = MockSchema.sstable(2, 100, 200, cfs); + sstables.add(sstable1); + sstables.add(sstable2); + + Range<Token> r = new Range<>(t(9), t(200)); // sstable 2 is fully contained - last token is equal + + Iterator<SSTableReader> sstableIterator = sstables.iterator(); + Set<SSTableReader> fullyContainedSSTables = CompactionManager.findSSTablesToAnticompact(sstableIterator, Collections.singletonList(r), UUID.randomUUID()); + assertEquals(Sets.newHashSet(sstable1, sstable2), fullyContainedSSTables); + assertTrue(sstables.isEmpty()); + } + + private Token t(long t) + { + return new Murmur3Partitioner.LongToken(t); + } - - static void assertOnDiskState(ColumnFamilyStore cfs, int expectedSSTableCount) - { - LifecycleTransaction.waitForDeletions(); - assertEquals(expectedSSTableCount, cfs.getLiveSSTables().size()); - Set<Integer> liveGenerations = cfs.getLiveSSTables().stream().map(sstable -> sstable.descriptor.generation).collect(Collectors.toSet()); - int fileCount = 0; - for (File f : cfs.getDirectories().getCFDirectories()) - { - for (File sst : f.listFiles()) - { - if (sst.getName().contains("Data")) - { - Descriptor d = Descriptor.fromFilename(sst.getAbsolutePath()); - assertTrue(liveGenerations.contains(d.generation)); - fileCount++; - } - } - } - assertEquals(expectedSSTableCount, fileCount); - } } diff --cc test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java index ab8a710,4edf8af..68ee3e1 --- a/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java @@@ -22,6 -22,7 +22,8 @@@ import java.nio.ByteBuffer import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicReference; + import java.util.function.Consumer; ++import java.util.stream.Collectors; import com.google.common.base.Joiner; import com.google.common.collect.Sets; @@@ -40,6 -41,6 +42,7 @@@ import org.apache.cassandra.concurrent. import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.db.RowUpdateBuilder; ++import org.apache.cassandra.db.compaction.AntiCompactionTest; import org.apache.cassandra.db.compaction.CompactionInfo; import org.apache.cassandra.db.compaction.CompactionInterruptedException; import org.apache.cassandra.db.compaction.CompactionManager; @@@ -604,15 -602,26 +607,30 @@@ public class IndexSummaryManagerTes @Test public void testCancelIndex() throws Exception { + testCancelIndexHelper((cfs) -> CompactionManager.instance.stopCompaction("INDEX_SUMMARY")); + } + + @Test + public void testCancelIndexInterrupt() throws Exception + { - testCancelIndexHelper((cfs) -> CompactionManager.instance.interruptCompactionFor(Collections.singleton(cfs.metadata), false)); ++ testCancelIndexHelper((cfs) -> CompactionManager.instance.interruptCompactionFor(Collections.singleton(cfs.metadata()), (sstable) -> true, false)); + } + + public void testCancelIndexHelper(Consumer<ColumnFamilyStore> cancelFunction) throws Exception + { String ksname = KEYSPACE1; String cfname = CF_STANDARDLOWiINTERVAL; // index interval of 8, no key caching Keyspace keyspace = Keyspace.open(ksname); final ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cfname); -- final int numSSTables = 4; ++ cfs.disableAutoCompaction(); ++ final int numSSTables = 8; int numRows = 256; createSSTables(ksname, cfname, numSSTables, numRows); -- final List<SSTableReader> sstables = new ArrayList<>(cfs.getLiveSSTables()); ++ List<SSTableReader> allSSTables = new ArrayList<>(cfs.getLiveSSTables()); ++ List<SSTableReader> sstables = allSSTables.subList(0, 4); ++ List<SSTableReader> compacting = allSSTables.subList(4, 8); ++ for (SSTableReader sstable : sstables) sstable.overrideReadMeter(new RestorableMeter(100.0, 100.0)); @@@ -622,52 -631,52 +640,68 @@@ final AtomicReference<CompactionInterruptedException> exception = new AtomicReference<>(); // barrier to control when redistribution runs final CountDownLatch barrier = new CountDownLatch(1); -- -- Thread t = NamedThreadFactory.createThread(new Runnable() ++ CompactionInfo.Holder ongoingCompaction = new CompactionInfo.Holder() { -- public void run() ++ public CompactionInfo getCompactionInfo() + { - try ++ return new CompactionInfo(cfs.metadata(), OperationType.UNKNOWN, 0, 0, UUID.randomUUID(), compacting); ++ } ++ }; ++ try (LifecycleTransaction ignored = cfs.getTracker().tryModify(compacting, OperationType.UNKNOWN)) ++ { ++ CompactionManager.instance.active.beginCompaction(ongoingCompaction); ++ ++ Thread t = NamedThreadFactory.createThread(new Runnable() + { - try ++ public void run() { -- // Don't leave enough space for even the minimal index summaries -- try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN)) ++ try ++ { ++ // Don't leave enough space for even the minimal index summaries ++ try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN)) ++ { ++ IndexSummaryManager.redistributeSummaries(new ObservableRedistribution(of(cfs.metadata.id, txn), ++ 0, ++ singleSummaryOffHeapSpace, ++ barrier)); ++ } ++ } ++ catch (CompactionInterruptedException ex) ++ { ++ exception.set(ex); ++ } ++ catch (IOException ignored) { -- IndexSummaryManager.redistributeSummaries(new ObservableRedistribution(Collections.EMPTY_LIST, - of(cfs.metadata.id, txn), - of(cfs.metadata.cfId, txn), -- singleSummaryOffHeapSpace, -- barrier)); } } -- catch (CompactionInterruptedException ex) -- { -- exception.set(ex); -- } -- catch (IOException ignored) -- { -- } -- } -- }); -- t.start(); -- while (CompactionManager.instance.getActiveCompactions() == 0 && t.isAlive()) -- Thread.sleep(1); -- // to ensure that the stop condition check in IndexSummaryRedistribution::redistributeSummaries -- // is made *after* the halt request is made to the CompactionManager, don't allow the redistribution -- // to proceed until stopCompaction has been called. - CompactionManager.instance.stopCompaction("INDEX_SUMMARY"); - cancelFunction.accept(cfs); -- // allows the redistribution to proceed -- barrier.countDown(); -- t.join(); ++ }); ++ ++ t.start(); ++ while (CompactionManager.instance.getActiveCompactions() < 2 && t.isAlive()) ++ Thread.sleep(1); ++ // to ensure that the stop condition check in IndexSummaryRedistribution::redistributeSummaries ++ // is made *after* the halt request is made to the CompactionManager, don't allow the redistribution ++ // to proceed until stopCompaction has been called. ++ cancelFunction.accept(cfs); ++ // allows the redistribution to proceed ++ barrier.countDown(); ++ t.join(); ++ } ++ finally ++ { ++ CompactionManager.instance.active.finishCompaction(ongoingCompaction); ++ } assertNotNull("Expected compaction interrupted exception", exception.get()); - assertTrue("Expected no active compactions", CompactionMetrics.getCompactions().isEmpty()); + assertTrue("Expected no active compactions", CompactionManager.instance.active.getCompactions().isEmpty()); -- Set<SSTableReader> beforeRedistributionSSTables = new HashSet<>(sstables); ++ Set<SSTableReader> beforeRedistributionSSTables = new HashSet<>(allSSTables); Set<SSTableReader> afterCancelSSTables = new HashSet<>(cfs.getLiveSSTables()); Set<SSTableReader> disjoint = Sets.symmetricDifference(beforeRedistributionSSTables, afterCancelSSTables); assertTrue(String.format("Mismatched files before and after cancelling redistribution: %s", Joiner.on(",").join(disjoint)), disjoint.isEmpty()); -- ++ Util.assertOnDiskState(cfs, 8); validateData(cfs, numRows); } @@@ -676,8 -685,8 +710,9 @@@ long memoryPoolBytes) throws IOException { -- return IndexSummaryManager.redistributeSummaries(new IndexSummaryRedistribution(compacting, -- transactions, ++ long nonRedistributingOffHeapSize = compacting.stream().mapToLong(SSTableReader::getIndexSummaryOffHeapSize).sum(); ++ return IndexSummaryManager.redistributeSummaries(new IndexSummaryRedistribution(transactions, ++ nonRedistributingOffHeapSize, memoryPoolBytes)); } @@@ -685,12 -694,12 +720,12 @@@ { CountDownLatch barrier; -- ObservableRedistribution(List<SSTableReader> compacting, - Map<TableId, LifecycleTransaction> transactions, - Map<UUID, LifecycleTransaction> transactions, ++ ObservableRedistribution(Map<TableId, LifecycleTransaction> transactions, ++ long nonRedistributingOffHeapSize, long memoryPoolBytes, CountDownLatch barrier) { -- super(compacting, transactions, memoryPoolBytes); ++ super(transactions, nonRedistributingOffHeapSize, memoryPoolBytes); this.barrier = barrier; } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
