Make sstable directory picking blacklist-aware again. Patch by Aleksey Yeschenko; reviewed by Jonathan Ellis for CASSANDRA-5193.
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/29915e84 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/29915e84 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/29915e84 Branch: refs/heads/trunk Commit: 29915e845b96c7c10bfd861ea62fd457b97aceb1 Parents: 6206d8a Author: Aleksey Yeschenko <[email protected]> Authored: Sun Feb 3 17:15:25 2013 +0300 Committer: Aleksey Yeschenko <[email protected]> Committed: Sun Feb 3 17:15:25 2013 +0300 ---------------------------------------------------------------------- CHANGES.txt | 1 + src/java/org/apache/cassandra/db/Directories.java | 50 +++++++++------ src/java/org/apache/cassandra/db/Memtable.java | 16 +++-- .../db/compaction/AbstractCompactionTask.java | 6 ++ .../cassandra/db/compaction/CompactionTask.java | 19 ++---- .../cassandra/io/util/DiskAwareRunnable.java | 16 +++-- .../org/apache/cassandra/streaming/StreamIn.java | 4 +- 7 files changed, 66 insertions(+), 46 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/29915e84/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 901f559..6b3a705 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -6,6 +6,7 @@ * cli: remove default username and password (CASSANDRA-5208) * configure populate_io_cache_on_flush per-CF (CASSANDRA-4694) * allow configuration of internode socket buffer (CASSANDRA-3378) + * Make sstable directory picking blacklist-aware again (CASSANDRA-5193) 1.2.1 * stream undelivered hints on decommission (CASSANDRA-5128) http://git-wip-us.apache.org/repos/asf/cassandra/blob/29915e84/src/java/org/apache/cassandra/db/Directories.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/Directories.java 
b/src/java/org/apache/cassandra/db/Directories.java index f1db5ed..1f68f62 100644 --- a/src/java/org/apache/cassandra/db/Directories.java +++ b/src/java/org/apache/cassandra/db/Directories.java @@ -122,11 +122,11 @@ public class Directories * @param dataDirectory * @return SSTable location */ - public File getLocationForDisk(File dataDirectory) + public File getLocationForDisk(DataDirectory dataDirectory) { for (File dir : sstableDirectories) { - if (dir.getAbsolutePath().startsWith(dataDirectory.getAbsolutePath())) + if (dir.getAbsolutePath().startsWith(dataDirectory.location.getAbsolutePath())) return dir; } return null; @@ -192,36 +192,46 @@ public class Directories } /** - * Find location which is capable of holding given {@code estimatedSize}. - * First it looks through for directory with no current task running and - * the most free space available. - * If no such directory is available, it just chose the one with the most - * free space available. + * Finds location which is capable of holding given {@code estimatedSize}. + * Picks a non-blacklisted directory with most free space and least current tasks. * If no directory can hold given {@code estimatedSize}, then returns null. * * @param estimatedSize estimated size you need to find location to fit * @return directory capable of given estimated size, or null if none found */ - public static DataDirectory getLocationCapableOfSize(long estimatedSize) + public DataDirectory getLocationCapableOfSize(long estimatedSize) { - // sort by available disk space - SortedSet<DataDirectory> directories = ImmutableSortedSet.copyOf(dataFileLocations); + List<DataDirectory> candidates = new ArrayList<DataDirectory>(); - // if there is disk with sufficient space and no activity running on it, then use it - for (DataDirectory directory : directories) + // pick directories with enough space and so that resulting sstable dirs aren't blacklisted for writes. 
+ for (DataDirectory dataDir : dataFileLocations) { - long spaceAvailable = directory.getEstimatedAvailableSpace(); - if (estimatedSize < spaceAvailable && directory.currentTasks.get() == 0) - return directory; + File sstableDir = getLocationForDisk(dataDir); + + if (BlacklistedDirectories.isUnwritable(sstableDir)) + continue; + + // need a separate check for sstableDir itself - could be a mounted separate disk or SSD just for this CF. + if (dataDir.getEstimatedAvailableSpace() > estimatedSize && sstableDir.getUsableSpace() * 0.9 > estimatedSize) + candidates.add(dataDir); } - // if not, use the one that has largest free space - if (estimatedSize < directories.first().getEstimatedAvailableSpace()) - return directories.first(); - else - return null; + // sort directories by free space, in _descending_ order. + Collections.sort(candidates); + + // sort directories by load, in _ascending_ order. + Collections.sort(candidates, new Comparator<DataDirectory>() + { + public int compare(DataDirectory a, DataDirectory b) + { + return a.currentTasks.get() - b.currentTasks.get(); + } + }); + + return candidates.isEmpty() ? 
null : candidates.get(0); } + public static File getSnapshotDirectory(Descriptor desc, String snapshotName) { return getOrCreate(desc.directory, SNAPSHOT_SUBDIR, snapshotName); http://git-wip-us.apache.org/repos/asf/cassandra/blob/29915e84/src/java/org/apache/cassandra/db/Memtable.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java index 990ad84..301c297 100644 --- a/src/java/org/apache/cassandra/db/Memtable.java +++ b/src/java/org/apache/cassandra/db/Memtable.java @@ -433,22 +433,28 @@ public class Memtable return estimatedSize; } - protected void runWith(File dataDirectory) throws Exception + protected void runWith(File sstableDirectory) throws Exception { - assert dataDirectory != null : "Flush task is not bound to any disk"; + assert sstableDirectory != null : "Flush task is not bound to any disk"; - SSTableReader sstable = writeSortedContents(context, dataDirectory); + SSTableReader sstable = writeSortedContents(context, sstableDirectory); cfs.replaceFlushed(Memtable.this, sstable); latch.countDown(); } - private SSTableReader writeSortedContents(Future<ReplayPosition> context, File dataDirectory) throws ExecutionException, InterruptedException + protected Directories getDirectories() + { + return cfs.directories; + } + + private SSTableReader writeSortedContents(Future<ReplayPosition> context, File sstableDirectory) + throws ExecutionException, InterruptedException { logger.info("Writing " + Memtable.this.toString()); SSTableReader ssTable; // errors when creating the writer that may leave empty temp files. 
- SSTableWriter writer = createFlushWriter(cfs.getTempSSTablePath(cfs.directories.getLocationForDisk(dataDirectory))); + SSTableWriter writer = createFlushWriter(cfs.getTempSSTablePath(sstableDirectory)); try { // (we can't clear out the map as-we-go to free up memory, http://git-wip-us.apache.org/repos/asf/cassandra/blob/29915e84/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java index 3d64785..0913765 100644 --- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java +++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java @@ -19,6 +19,7 @@ package org.apache.cassandra.db.compaction; import java.util.Collection; +import org.apache.cassandra.db.Directories; import org.apache.cassandra.io.sstable.SSTableReader; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.compaction.CompactionManager.CompactionExecutorStatsCollector; @@ -41,6 +42,11 @@ public abstract class AbstractCompactionTask extends DiskAwareRunnable public abstract int execute(CompactionExecutorStatsCollector collector); + protected Directories getDirectories() + { + return cfs.directories; + } + public void unmarkSSTables() { cfs.getDataTracker().unmarkCompacting(sstables); http://git-wip-us.apache.org/repos/asf/cassandra/blob/29915e84/src/java/org/apache/cassandra/db/compaction/CompactionTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java index ca1e2da..1b1599b 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java @@ -21,22 
+21,13 @@ import java.io.File; import java.io.IOException; import java.util.*; -import javax.print.attribute.IntegerSyntax; - -import com.google.common.base.Predicates; import com.google.common.base.Throwables; -import com.google.common.collect.Iterables; -import com.google.common.collect.Iterators; -import com.google.common.primitives.Longs; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.db.ColumnFamilyStore; -import org.apache.cassandra.db.DecoratedKey; -import org.apache.cassandra.db.RowIndexEntry; -import org.apache.cassandra.db.SystemTable; +import org.apache.cassandra.db.*; import org.apache.cassandra.db.compaction.CompactionManager.CompactionExecutorStatsCollector; import org.apache.cassandra.io.sstable.*; import org.apache.cassandra.utils.CloseableIterator; @@ -99,11 +90,11 @@ public class CompactionTask extends AbstractCompactionTask * which are properly serialized. * Caller is in charge of marking/unmarking the sstables as compacting. */ - protected void runWith(File dataDirectory) throws Exception + protected void runWith(File sstableDirectory) throws Exception { // The collection of sstables passed may be empty (but not null); even if // it is not empty, it may compact down to nothing if all rows are deleted. 
- assert sstables != null && dataDirectory != null; + assert sstables != null && sstableDirectory != null; if (DatabaseDescriptor.isSnapshotBeforeCompaction()) cfs.snapshotWithoutFlush(System.currentTimeMillis() + "-compact-" + cfs.columnFamily); @@ -156,7 +147,7 @@ public class CompactionTask extends AbstractCompactionTask return; } - SSTableWriter writer = cfs.createCompactionWriter(keysPerSSTable, cfs.directories.getLocationForDisk(dataDirectory), toCompact); + SSTableWriter writer = cfs.createCompactionWriter(keysPerSSTable, sstableDirectory, toCompact); writers.add(writer); while (iter.hasNext()) { @@ -194,7 +185,7 @@ public class CompactionTask extends AbstractCompactionTask { // tmp = false because later we want to query it with descriptor from SSTableReader cachedKeyMap.put(writer.descriptor.asTemporary(false), cachedKeys); - writer = cfs.createCompactionWriter(keysPerSSTable, cfs.directories.getLocationForDisk(dataDirectory), toCompact); + writer = cfs.createCompactionWriter(keysPerSSTable, sstableDirectory, toCompact); writers.add(writer); cachedKeys = new HashMap<DecoratedKey, RowIndexEntry>(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/29915e84/src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java b/src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java index 6c4d95a..1be4803 100644 --- a/src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java +++ b/src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java @@ -34,7 +34,7 @@ public abstract class DiskAwareRunnable extends WrappedRunnable while (true) { writeSize = getExpectedWriteSize(); - directory = Directories.getLocationCapableOfSize(writeSize); + directory = getDirectories().getLocationCapableOfSize(writeSize); if (directory != null || !reduceScopeForLimitedSpace()) break; } @@ -45,7 +45,7 @@ public abstract class 
DiskAwareRunnable extends WrappedRunnable directory.estimatedWorkingSize.addAndGet(writeSize); try { - runWith(directory.location); + runWith(getDirectories().getLocationForDisk(directory)); } finally { @@ -55,10 +55,16 @@ public abstract class DiskAwareRunnable extends WrappedRunnable } /** - * Executes this task on given {@code dataDirectory}. - * @param dataDirectory data directory to work on + * Get sstable directories for the CF. + * @return Directories instance for the CF. */ - protected abstract void runWith(File dataDirectory) throws Exception; + protected abstract Directories getDirectories(); + + /** + * Executes this task on given {@code sstableDirectory}. + * @param sstableDirectory sstable directory to work on + */ + protected abstract void runWith(File sstableDirectory) throws Exception; /** * Get expected write size to determine which disk to use for this task. http://git-wip-us.apache.org/repos/asf/cassandra/blob/29915e84/src/java/org/apache/cassandra/streaming/StreamIn.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamIn.java b/src/java/org/apache/cassandra/streaming/StreamIn.java index b8f6b90..740b430 100644 --- a/src/java/org/apache/cassandra/streaming/StreamIn.java +++ b/src/java/org/apache/cassandra/streaming/StreamIn.java @@ -80,10 +80,10 @@ public class StreamIn // new local sstable Table table = Table.open(remotedesc.ksname); ColumnFamilyStore cfStore = table.getColumnFamilyStore(remotedesc.cfname); - Directories.DataDirectory localDir = Directories.getLocationCapableOfSize(remote.size); + Directories.DataDirectory localDir = cfStore.directories.getLocationCapableOfSize(remote.size); if (localDir == null) throw new RuntimeException("Insufficient disk space to store " + remote.size + " bytes"); - Descriptor localdesc = Descriptor.fromFilename(cfStore.getTempSSTablePath(cfStore.directories.getLocationForDisk(localDir.location))); + Descriptor localdesc = 
Descriptor.fromFilename(cfStore.getTempSSTablePath(cfStore.directories.getLocationForDisk(localDir))); return new PendingFile(localdesc, remote); }
