Repository: cassandra
Updated Branches:
  refs/heads/trunk c04c50c95 -> caeef1740
Check for available disk space before starting a compaction. Patch by marcuse; reviewed by JoshuaMcKenzie for CASSANDRA-8562 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c20d4158 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c20d4158 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c20d4158 Branch: refs/heads/trunk Commit: c20d415833b785bfa5f49d3dd2a7468e111fb5d0 Parents: e4fc395 Author: Marcus Eriksson <[email protected]> Authored: Mon Jan 12 11:22:23 2015 +0100 Committer: Marcus Eriksson <[email protected]> Committed: Mon Jan 12 11:35:53 2015 +0100 ---------------------------------------------------------------------- CHANGES.txt | 1 + src/java/org/apache/cassandra/db/Directories.java | 18 ++++++++++++++++++ .../cassandra/db/compaction/CompactionTask.java | 16 +++++++++++++++- .../cassandra/io/util/DiskAwareRunnable.java | 17 +---------------- 4 files changed, 35 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/c20d4158/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index fc43dfa..9b20a06 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 2.0.12: + * Check for available disk space before starting a compaction (CASSANDRA-8562) * Fix DISTINCT queries with LIMITs or paging when some partitions contain only tombstones (CASSANDRA-8490) * Introduce background cache refreshing to permissions cache http://git-wip-us.apache.org/repos/asf/cassandra/blob/c20d4158/src/java/org/apache/cassandra/db/Directories.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/Directories.java b/src/java/org/apache/cassandra/db/Directories.java index 69c7a06..8fd1762 100644 --- a/src/java/org/apache/cassandra/db/Directories.java +++ 
b/src/java/org/apache/cassandra/db/Directories.java @@ -308,6 +308,24 @@ public class Directories Collections.sort(candidates); } + public boolean hasAvailableDiskSpace(long estimatedSSTables, long expectedTotalWriteSize) + { + long writeSize = expectedTotalWriteSize / estimatedSSTables; + long totalAvailable = 0L; + + for (DataDirectory dataDir : dataFileLocations) + { + if (BlacklistedDirectories.isUnwritable(getLocationForDisk(dataDir))) + continue; + DataDirectoryCandidate candidate = new DataDirectoryCandidate(dataDir); + // exclude directory if its total writeSize does not fit to data directory + if (candidate.availableSpace < writeSize) + continue; + totalAvailable += candidate.availableSpace; + } + return totalAvailable > expectedTotalWriteSize; + } + public static File getSnapshotDirectory(Descriptor desc, String snapshotName) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/c20d4158/src/java/org/apache/cassandra/db/compaction/CompactionTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java index 38de8a9..6c6d3a2 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java @@ -100,6 +100,11 @@ public class CompactionTask extends AbstractCompactionTask if (DatabaseDescriptor.isSnapshotBeforeCompaction()) cfs.snapshotWithoutFlush(System.currentTimeMillis() + "-compact-" + cfs.name); + // note that we need to do a rough estimate early if we can fit the compaction on disk - this is pessimistic, but + // since we might remove sstables from the compaction in checkAvailableDiskSpace it needs to be done here + long earlySSTableEstimate = Math.max(1, cfs.getExpectedCompactedFileSize(toCompact, compactionType) / strategy.getMaxSSTableBytes()); + checkAvailableDiskSpace(earlySSTableEstimate); + // sanity 
check: all sstables must belong to the same cfs for (SSTableReader sstable : toCompact) assert sstable.descriptor.cfname.equals(cfs.name); @@ -118,7 +123,7 @@ public class CompactionTask extends AbstractCompactionTask long totalkeysWritten = 0; long estimatedTotalKeys = Math.max(cfs.metadata.getIndexInterval(), SSTableReader.getApproximateKeyCount(actuallyCompact, cfs.metadata)); - long estimatedSSTables = Math.max(1, SSTable.getTotalBytes(actuallyCompact) / strategy.getMaxSSTableBytes()); + long estimatedSSTables = Math.max(1, cfs.getExpectedCompactedFileSize(actuallyCompact, compactionType) / strategy.getMaxSSTableBytes()); long keysPerSSTable = (long) Math.ceil((double) estimatedTotalKeys / estimatedSSTables); if (logger.isDebugEnabled()) logger.debug("Expected bloom filter size : " + keysPerSSTable); @@ -293,6 +298,15 @@ public class CompactionTask extends AbstractCompactionTask logger.debug(String.format("CF Total Bytes Compacted: %,d", CompactionTask.addToTotalBytesCompacted(endsize))); } + protected void checkAvailableDiskSpace(long estimatedSSTables) + { + while (!getDirectories().hasAvailableDiskSpace(estimatedSSTables, getExpectedWriteSize())) + { + if (!reduceScopeForLimitedSpace()) + throw new RuntimeException(String.format("Not enough space for compaction, estimated sstables = %d, expected write size = %d", estimatedSSTables, getExpectedWriteSize())); + } + } + private SSTableWriter createCompactionWriter(File sstableDirectory, long keysPerSSTable) { assert sstableDirectory != null; http://git-wip-us.apache.org/repos/asf/cassandra/blob/c20d4158/src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java b/src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java index 4188f6e..925efd6 100644 --- a/src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java +++ 
b/src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java @@ -24,13 +24,7 @@ public abstract class DiskAwareRunnable extends WrappedRunnable { protected Directories.DataDirectory getWriteDirectory(long writeSize) { - Directories.DataDirectory directory; - while (true) - { - directory = getDirectories().getWriteableLocation(writeSize); - if (directory != null || !reduceScopeForLimitedSpace()) - break; - } + Directories.DataDirectory directory = getDirectories().getWriteableLocation(writeSize); if (directory == null) throw new RuntimeException("Insufficient disk space to write " + writeSize + " bytes"); @@ -42,13 +36,4 @@ public abstract class DiskAwareRunnable extends WrappedRunnable * @return Directories instance for the CF. */ protected abstract Directories getDirectories(); - - /** - * Called if no disk is available with free space for the full write size. - * @return true if the scope of the task was successfully reduced. - */ - public boolean reduceScopeForLimitedSpace() - { - return false; - } }
