Repository: cassandra
Updated Branches:
  refs/heads/trunk f21202e83 -> 67247394d

Improve calculation of available disk space for compaction

Patch by Krishna Dattu Koneru and Lerh Chuan Low; reviewed by marcuse
for CASSANDRA-13068

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/67247394
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/67247394
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/67247394

Branch: refs/heads/trunk
Commit: 67247394d846cf3463d5441b836a47611ec28656
Parents: f21202e
Author: Krishna Koneru <[email protected]>
Authored: Thu Jun 8 10:04:17 2017 +1000
Committer: Marcus Eriksson <[email protected]>
Committed: Wed Jun 21 10:34:16 2017 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../cassandra/db/compaction/CompactionTask.java | 117 ++++++++++--------
 .../db/compaction/CompactionsBytemanTest.java   | 122 +++++++++++++++++++
 3 files changed, 188 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/67247394/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 0968de9..cc1bdb2 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0
+ * Improve calculation of available disk space for compaction (CASSANDRA-13068)
  * Change the accessibility of RowCacheSerializer for third party row cache plugins (CASSANDRA-13579)
  * Allow sub-range repairs for a preview of repaired data (CASSANDRA-13570)
  * NPE in IR cleanup when columnfamily has no sstables (CASSANDRA-13585)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/67247394/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index 789de1e..3d3cd3d 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@ -86,7 +86,7 @@ public class CompactionTask extends AbstractCompactionTask
         return transaction.originals().size();
     }
 
-    public boolean reduceScopeForLimitedSpace(long expectedSize)
+    public boolean reduceScopeForLimitedSpace(Set<SSTableReader> nonExpiredSSTables, long expectedSize)
     {
         if (partialCompactionsAcceptable() && transaction.originals().size() > 1)
         {
@@ -97,7 +97,7 @@ public class CompactionTask extends AbstractCompactionTask
 
             // Note that we have removed files that are still marked as compacting.
             // This is suboptimal but ok since the caller will unmark all the sstables at the end.
-            SSTableReader removedSSTable = cfs.getMaxSizeFile(transaction.originals());
+            SSTableReader removedSSTable = cfs.getMaxSizeFile(nonExpiredSSTables);
             transaction.cancel(removedSSTable);
             return true;
         }
@@ -125,44 +125,46 @@ public class CompactionTask extends AbstractCompactionTask
         if (DatabaseDescriptor.isSnapshotBeforeCompaction())
             cfs.snapshotWithoutFlush(System.currentTimeMillis() + "-compact-" + cfs.name);
 
-        // note that we need to do a rough estimate early if we can fit the compaction on disk - this is pessimistic, but
-        // since we might remove sstables from the compaction in checkAvailableDiskSpace it needs to be done here
+        try (CompactionController controller = getCompactionController(transaction.originals()))
+        {
-        checkAvailableDiskSpace();
+            final Set<SSTableReader> fullyExpiredSSTables = controller.getFullyExpiredSSTables();
 
-        // sanity check: all sstables must belong to the same cfs
-        assert !Iterables.any(transaction.originals(), new Predicate<SSTableReader>()
-        {
-            @Override
-            public boolean apply(SSTableReader sstable)
+            // select SSTables to compact based on available disk space.
+            buildCompactionCandidatesForAvailableDiskSpace(fullyExpiredSSTables);
+
+            // sanity check: all sstables must belong to the same cfs
+            assert !Iterables.any(transaction.originals(), new Predicate<SSTableReader>()
             {
-                return !sstable.descriptor.cfname.equals(cfs.name);
-            }
-        });
+                @Override
+                public boolean apply(SSTableReader sstable)
+                {
+                    return !sstable.descriptor.cfname.equals(cfs.name);
+                }
+            });
 
-        UUID taskId = transaction.opId();
+            UUID taskId = transaction.opId();
 
-        // new sstables from flush can be added during a compaction, but only the compaction can remove them,
-        // so in our single-threaded compaction world this is a valid way of determining if we're compacting
-        // all the sstables (that existed when we started)
-        StringBuilder ssTableLoggerMsg = new StringBuilder("[");
-        for (SSTableReader sstr : transaction.originals())
-        {
-            ssTableLoggerMsg.append(String.format("%s:level=%d, ", sstr.getFilename(), sstr.getSSTableLevel()));
-        }
-        ssTableLoggerMsg.append("]");
+            // new sstables from flush can be added during a compaction, but only the compaction can remove them,
+            // so in our single-threaded compaction world this is a valid way of determining if we're compacting
+            // all the sstables (that existed when we started)
+            StringBuilder ssTableLoggerMsg = new StringBuilder("[");
+            for (SSTableReader sstr : transaction.originals())
+            {
+                ssTableLoggerMsg.append(String.format("%s:level=%d, ", sstr.getFilename(), sstr.getSSTableLevel()));
+            }
+            ssTableLoggerMsg.append("]");
 
-        logger.debug("Compacting ({}) {}", taskId, ssTableLoggerMsg);
+            logger.debug("Compacting ({}) {}", taskId, ssTableLoggerMsg);
 
-        RateLimiter limiter = CompactionManager.instance.getRateLimiter();
-        long start = System.nanoTime();
-        long startTime = System.currentTimeMillis();
-        long totalKeysWritten = 0;
-        long estimatedKeys = 0;
-        long inputSizeBytes;
-        try (CompactionController controller = getCompactionController(transaction.originals()))
-        {
-            Set<SSTableReader> actuallyCompact = Sets.difference(transaction.originals(), controller.getFullyExpiredSSTables());
+            RateLimiter limiter = CompactionManager.instance.getRateLimiter();
+            long start = System.nanoTime();
+            long startTime = System.currentTimeMillis();
+            long totalKeysWritten = 0;
+            long estimatedKeys = 0;
+            long inputSizeBytes;
+
+            Set<SSTableReader> actuallyCompact = Sets.difference(transaction.originals(), fullyExpiredSSTables);
 
             Collection<SSTableReader> newSStables;
             long[] mergedRowCounts;
 
@@ -335,52 +337,63 @@ public class CompactionTask extends AbstractCompactionTask
         return ids.iterator().next();
     }
 
+    /*
-    Checks if we have enough disk space to execute the compaction.  Drops the largest sstable out of the Task until
-    there's enough space (in theory) to handle the compaction.  Does not take into account space that will be taken by
-    other compactions.
+     * Checks if we have enough disk space to execute the compaction.  Drops the largest sstable out of the Task until
+     * there's enough space (in theory) to handle the compaction.  Does not take into account space that will be taken by
+     * other compactions.
      */
-    protected void checkAvailableDiskSpace()
+    protected void buildCompactionCandidatesForAvailableDiskSpace(final Set<SSTableReader> fullyExpiredSSTables)
     {
         if(!cfs.isCompactionDiskSpaceCheckEnabled() && compactionType == OperationType.COMPACTION)
         {
             logger.info("Compaction space check is disabled");
-            return;
+            return; // try to compact all SSTables
         }
 
+        final Set<SSTableReader> nonExpiredSSTables = Sets.difference(transaction.originals(), fullyExpiredSSTables);
         CompactionStrategyManager strategy = cfs.getCompactionStrategyManager();
         int sstablesRemoved = 0;
-        while(true)
+
+        while(!nonExpiredSSTables.isEmpty())
        {
-            long expectedWriteSize = cfs.getExpectedCompactedFileSize(transaction.originals(), compactionType);
+            // Only consider write size of non expired SSTables
+            long expectedWriteSize = cfs.getExpectedCompactedFileSize(nonExpiredSSTables, compactionType);
             long estimatedSSTables = Math.max(1, expectedWriteSize / strategy.getMaxSSTableBytes());
 
             if(cfs.getDirectories().hasAvailableDiskSpace(estimatedSSTables, expectedWriteSize))
-            {
-                // we're ok now on space so now we track the failures, if any
-                if(sstablesRemoved > 0)
-                {
-                    CompactionManager.instance.incrementCompactionsReduced();
-                    CompactionManager.instance.incrementSstablesDropppedFromCompactions(sstablesRemoved);
-                }
                 break;
-            }
 
-            if (!reduceScopeForLimitedSpace(expectedWriteSize))
+            if (!reduceScopeForLimitedSpace(nonExpiredSSTables, expectedWriteSize))
             {
                 // we end up here if we can't take any more sstables out of the compaction.
                 // usually means we've run out of disk space
-                String msg = String.format("Not enough space for compaction, estimated sstables = %d, expected write size = %d", estimatedSSTables, expectedWriteSize);
+
+                // but we can still compact expired SSTables
+                if(partialCompactionsAcceptable() && fullyExpiredSSTables.size() > 0 )
+                {
+                    // sanity check to make sure we compact only fully expired SSTables.
+                    assert transaction.originals().equals(fullyExpiredSSTables);
+                    break;
+                }
+
+                String msg = String.format("Not enough space for compaction, estimated sstables = %d, expected write size = %d", estimatedSSTables, expectedWriteSize);
                 logger.warn(msg);
                 CompactionManager.instance.incrementAborted();
                 throw new RuntimeException(msg);
             }
+
             sstablesRemoved++;
             logger.warn("Not enough space for compaction, {}MB estimated.  Reducing scope.",
-                            (float) expectedWriteSize / 1024 / 1024);
+                        (float) expectedWriteSize / 1024 / 1024);
         }
+
+        if(sstablesRemoved > 0)
+        {
+            CompactionManager.instance.incrementCompactionsReduced();
+            CompactionManager.instance.incrementSstablesDropppedFromCompactions(sstablesRemoved);
+        }
+
     }
 
     protected int getLevel()
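
The control flow introduced by buildCompactionCandidatesForAvailableDiskSpace() above is: estimate the write size of the non-expired inputs only, then shrink the candidate set one largest sstable at a time until the estimate fits on disk; fully expired sstables stay in scope throughout, since deleting them consumes no space. The following is a minimal standalone sketch of that loop under simplified assumptions: the SSTable class, the freeDiskBytes figure, and the input-size sum are hypothetical stand-ins for SSTableReader, Directories.hasAvailableDiskSpace() and getExpectedCompactedFileSize(), so it illustrates the shape of the algorithm rather than the real APIs.

    import java.util.Collections;
    import java.util.Comparator;
    import java.util.HashSet;
    import java.util.Set;
    import java.util.stream.Collectors;

    final class ReduceScopeSketch
    {
        // Hypothetical stand-in for SSTableReader: just a size and an expiry flag.
        static final class SSTable
        {
            final long sizeBytes;
            final boolean fullyExpired;

            SSTable(long sizeBytes, boolean fullyExpired)
            {
                this.sizeBytes = sizeBytes;
                this.fullyExpired = fullyExpired;
            }
        }

        static Set<SSTable> buildCandidates(Set<SSTable> originals, long freeDiskBytes)
        {
            Set<SSTable> expired = originals.stream().filter(s -> s.fullyExpired).collect(Collectors.toSet());
            Set<SSTable> nonExpired = new HashSet<>(originals);
            nonExpired.removeAll(expired);

            while (!nonExpired.isEmpty())
            {
                // Crude stand-in for getExpectedCompactedFileSize(): expired sstables
                // produce no output, so only non-expired inputs count.
                long expectedWriteSize = nonExpired.stream().mapToLong(s -> s.sizeBytes).sum();
                if (expectedWriteSize <= freeDiskBytes) // stand-in for hasAvailableDiskSpace()
                    break; // what is left fits, compact it

                if (nonExpired.size() == 1) // cannot reduce the scope any further
                {
                    if (!expired.isEmpty())
                    {
                        nonExpired.clear(); // fall back to compacting only the expired sstables
                        break;
                    }
                    throw new RuntimeException("Not enough space for compaction");
                }

                // Drop the largest non-expired sstable, mirroring reduceScopeForLimitedSpace().
                nonExpired.remove(Collections.max(nonExpired, Comparator.comparingLong(s -> s.sizeBytes)));
            }

            Set<SSTable> candidates = new HashSet<>(nonExpired);
            candidates.addAll(expired); // expired sstables always stay in scope
            return candidates;
        }
    }

As in the patch, the failure path is reached only when a single non-expired sstable still does not fit and there are no expired sstables to fall back on; that is when the task aborts with the "Not enough space for compaction" RuntimeException.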

http://git-wip-us.apache.org/repos/asf/cassandra/blob/67247394/test/unit/org/apache/cassandra/db/compaction/CompactionsBytemanTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsBytemanTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsBytemanTest.java
new file mode 100644
index 0000000..3ca01c1
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsBytemanTest.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.compaction;
+
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.jboss.byteman.contrib.bmunit.BMRule;
+import org.jboss.byteman.contrib.bmunit.BMRules;
+import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
+
+import static org.junit.Assert.assertEquals;
+
+@RunWith(BMUnitRunner.class)
+public class CompactionsBytemanTest extends CQLTester
+{
+    /*
+    Return false the first time hasAvailableDiskSpace is called, i.e. the first SSTable is too big.
+    Create 5 SSTables. After compaction, there should be 2 left: 1 from the 4 SSTables that were merged,
+    and the other the SSTable that was 'too large' and failed the hasAvailableDiskSpace check
+    */
+    @Test
+    @BMRules(rules = { @BMRule(name = "One SSTable too big for remaining disk space test",
+                               targetClass = "Directories",
+                               targetMethod = "hasAvailableDiskSpace",
+                               condition = "not flagged(\"done\")",
+                               action = "flag(\"done\"); return false;") } )
+    public void testSSTableNotEnoughDiskSpaceForCompactionGetsDropped() throws Throwable
+    {
+        createLowGCGraceTable();
+        final ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
+        for (int i = 0; i < 5; i++)
+        {
+            createPossiblyExpiredSSTable(cfs, false);
+        }
+        assertEquals(5, getCurrentColumnFamilyStore().getLiveSSTables().size());
+        cfs.forceMajorCompaction(false);
+        assertEquals(2, getCurrentColumnFamilyStore().getLiveSSTables().size());
+        dropTable("DROP TABLE %s");
+    }
+
+    /*
+    Always return false for hasAvailableDiskSpace, i.e. the node has no more space.
+    Create 2 expired SSTables and 1 long-lived one. After compaction, there should only be 1 left,
+    as the 2 expired ones would have been compacted away.
+    */
+    @Test
+    @BMRules(rules = { @BMRule(name = "No disk space with expired SSTables test",
+                               targetClass = "Directories",
+                               targetMethod = "hasAvailableDiskSpace",
+                               action = "return false;") } )
+    public void testExpiredSSTablesStillGetDroppedWithNoDiskSpace() throws Throwable
+    {
+        createLowGCGraceTable();
+        final ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
+        createPossiblyExpiredSSTable(cfs, true);
+        createPossiblyExpiredSSTable(cfs, true);
+        createPossiblyExpiredSSTable(cfs, false);
+        assertEquals(3, cfs.getLiveSSTables().size());
+        Thread.sleep(TimeUnit.SECONDS.toMillis(2)); // give the TTL'd cells time to expire
+        cfs.forceMajorCompaction(false);
+        assertEquals(1, cfs.getLiveSSTables().size());
+        dropTable("DROP TABLE %s");
+    }
+
+    /*
+    Always return false for hasAvailableDiskSpace, i.e. the node has no more space.
+    Create 2 SSTables. Compaction will not succeed and will throw a RuntimeException.
+    */
+    @Test(expected = RuntimeException.class)
+    @BMRules(rules = { @BMRule(name = "No disk space and no expired SSTables test",
+                               targetClass = "Directories",
+                               targetMethod = "hasAvailableDiskSpace",
+                               action = "return false;") } )
+    public void testRuntimeExceptionWhenNoDiskSpaceForCompaction() throws Throwable
+    {
+        createLowGCGraceTable();
+        final ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
+        createPossiblyExpiredSSTable(cfs, false);
+        createPossiblyExpiredSSTable(cfs, false);
+        cfs.forceMajorCompaction(false);
+        dropTable("DROP TABLE %s");
+    }
+
+    private void createPossiblyExpiredSSTable(final ColumnFamilyStore cfs, final boolean expired) throws Throwable
+    {
+        if (expired)
+        {
+            execute("INSERT INTO %s (id, val) values (1, 'expired') USING TTL 1");
+            Thread.sleep(TimeUnit.SECONDS.toMillis(2)); // wait out the 1 second TTL
+        }
+        else
+        {
+            execute("INSERT INTO %s (id, val) values (2, 'immortal')");
+        }
+        cfs.forceBlockingFlush();
+    }
+
+    private void createLowGCGraceTable(){
+        createTable("CREATE TABLE %s (id int PRIMARY KEY, val text) with compaction = {'class':'SizeTieredCompactionStrategy', 'enabled': 'false'} AND gc_grace_seconds=0");
+    }
+}
\ No newline at end of file
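
A note on the Byteman mechanics for readers new to BMUnit: each @BMRule above rewrites Directories.hasAvailableDiskSpace at run time, and the flag()/flagged() helpers make the first test's rule fire exactly once. In plain Java, the behaviour injected by that first rule is roughly equivalent to the following hypothetical wrapper (a sketch of the injected effect, not of how Byteman is implemented):

    import java.util.concurrent.atomic.AtomicBoolean;

    // Sketch of what the first @BMRule effectively does to hasAvailableDiskSpace():
    // fail the disk space check on the first call only, so exactly one sstable is
    // dropped from the compaction scope before the check starts passing again.
    final class OneShotFailingDiskSpaceCheck
    {
        private final AtomicBoolean done = new AtomicBoolean(false); // Byteman's "done" flag

        boolean hasAvailableDiskSpace(long estimatedSSTables, long expectedWriteSize)
        {
            if (done.compareAndSet(false, true)) // condition = "not flagged(\"done\")"
                return false;                    // action = "flag(\"done\"); return false;"
            return true;                         // later calls behave as if space is available
        }
    }

The rules on the other two tests have no condition, so the check fails on every call; the only difference between those two tests is whether fully expired sstables exist for the compaction to fall back on.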
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]