Make sstable directory picking blacklist-aware again

Patch by Aleksey Yeschenko; reviewed by Jonathan Ellis for
CASSANDRA-5193


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/29915e84
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/29915e84
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/29915e84

Branch: refs/heads/trunk
Commit: 29915e845b96c7c10bfd861ea62fd457b97aceb1
Parents: 6206d8a
Author: Aleksey Yeschenko <[email protected]>
Authored: Sun Feb 3 17:15:25 2013 +0300
Committer: Aleksey Yeschenko <[email protected]>
Committed: Sun Feb 3 17:15:25 2013 +0300

----------------------------------------------------------------------
 CHANGES.txt                                        |    1 +
 src/java/org/apache/cassandra/db/Directories.java  |   50 +++++++++------
 src/java/org/apache/cassandra/db/Memtable.java     |   16 +++--
 .../db/compaction/AbstractCompactionTask.java      |    6 ++
 .../cassandra/db/compaction/CompactionTask.java    |   19 ++----
 .../cassandra/io/util/DiskAwareRunnable.java       |   16 +++--
 .../org/apache/cassandra/streaming/StreamIn.java   |    4 +-
 7 files changed, 66 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/29915e84/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 901f559..6b3a705 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -6,6 +6,7 @@
  * cli: remove default username and password (CASSANDRA-5208)
  * configure populate_io_cache_on_flush per-CF (CASSANDRA-4694)
  * allow configuration of internode socket buffer (CASSANDRA-3378)
+ * Make sstable directory picking blacklist-aware again (CASSANDRA-5193)
 
 1.2.1
  * stream undelivered hints on decommission (CASSANDRA-5128)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/29915e84/src/java/org/apache/cassandra/db/Directories.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Directories.java b/src/java/org/apache/cassandra/db/Directories.java
index f1db5ed..1f68f62 100644
--- a/src/java/org/apache/cassandra/db/Directories.java
+++ b/src/java/org/apache/cassandra/db/Directories.java
@@ -122,11 +122,11 @@ public class Directories
      * @param dataDirectory
      * @return SSTable location
      */
-    public File getLocationForDisk(File dataDirectory)
+    public File getLocationForDisk(DataDirectory dataDirectory)
     {
         for (File dir : sstableDirectories)
         {
-            if (dir.getAbsolutePath().startsWith(dataDirectory.getAbsolutePath()))
+            if (dir.getAbsolutePath().startsWith(dataDirectory.location.getAbsolutePath()))
                 return dir;
         }
         return null;
@@ -192,36 +192,46 @@ public class Directories
     }
 
     /**
-     * Find location which is capable of holding given {@code estimatedSize}.
-     * First it looks through for directory with no current task running and
-     * the most free space available.
-     * If no such directory is available, it just chose the one with the most
-     * free space available.
+     * Finds location which is capable of holding given {@code estimatedSize}.
+     * Picks a non-blacklisted directory with most free space and least current tasks.
      * If no directory can hold given {@code estimatedSize}, then returns null.
      *
      * @param estimatedSize estimated size you need to find location to fit
      * @return directory capable of given estimated size, or null if none found
      */
-    public static DataDirectory getLocationCapableOfSize(long estimatedSize)
+    public DataDirectory getLocationCapableOfSize(long estimatedSize)
     {
-        // sort by available disk space
-        SortedSet<DataDirectory> directories = ImmutableSortedSet.copyOf(dataFileLocations);
+        List<DataDirectory> candidates = new ArrayList<DataDirectory>();
 
-        // if there is disk with sufficient space and no activity running on it, then use it
-        for (DataDirectory directory : directories)
+        // pick directories with enough space and so that resulting sstable dirs aren't blacklisted for writes.
+        for (DataDirectory dataDir : dataFileLocations)
         {
-            long spaceAvailable = directory.getEstimatedAvailableSpace();
-            if (estimatedSize < spaceAvailable && directory.currentTasks.get() == 0)
-                return directory;
+            File sstableDir = getLocationForDisk(dataDir);
+
+            if (BlacklistedDirectories.isUnwritable(sstableDir))
+                continue;
+
+            // need a separate check for sstableDir itself - could be a mounted separate disk or SSD just for this CF.
+            if (dataDir.getEstimatedAvailableSpace() > estimatedSize && sstableDir.getUsableSpace() * 0.9 > estimatedSize)
+                candidates.add(dataDir);
         }
 
-        // if not, use the one that has largest free space
-        if (estimatedSize < directories.first().getEstimatedAvailableSpace())
-            return directories.first();
-        else
-            return null;
+        // sort directories by free space, in _descending_ order.
+        Collections.sort(candidates);
+
+        // sort directories by load, in _ascending_ order.
+        Collections.sort(candidates, new Comparator<DataDirectory>()
+        {
+            public int compare(DataDirectory a, DataDirectory b)
+            {
+                return a.currentTasks.get() - b.currentTasks.get();
+            }
+        });
+
+        return candidates.isEmpty() ? null : candidates.get(0);
     }
 
+
     public static File getSnapshotDirectory(Descriptor desc, String snapshotName)
     {
         return getOrCreate(desc.directory, SNAPSHOT_SUBDIR, snapshotName);

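The selection logic in the hunk above boils down to: filter out blacklisted or too-small locations, sort the survivors by free space in descending order, then stable-sort them by running task count in ascending order, so that among equally idle disks the roomiest wins. Below is a minimal standalone sketch of that idea; the Disk class, its fields, and the sample paths are hypothetical stand-ins rather than Cassandra's DataDirectory, and the extra getUsableSpace() * 0.9 check for per-CF mount points is omitted.

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

public class PickDiskSketch
{
    // Hypothetical stand-in for DataDirectory: free space, task count, blacklist flag.
    static class Disk
    {
        final String path;
        final long freeBytes;
        final int currentTasks;
        final boolean blacklisted;

        Disk(String path, long freeBytes, int currentTasks, boolean blacklisted)
        {
            this.path = path;
            this.freeBytes = freeBytes;
            this.currentTasks = currentTasks;
            this.blacklisted = blacklisted;
        }
    }

    // Least-loaded, non-blacklisted disk that can hold estimatedSize, or null if none fits.
    static Disk pick(List<Disk> disks, long estimatedSize)
    {
        List<Disk> candidates = new ArrayList<Disk>();
        for (Disk disk : disks)
            if (!disk.blacklisted && disk.freeBytes > estimatedSize)
                candidates.add(disk);

        // Sort by free space, in descending order.
        Collections.sort(candidates, new Comparator<Disk>()
        {
            public int compare(Disk a, Disk b)
            {
                return a.freeBytes < b.freeBytes ? 1 : (a.freeBytes > b.freeBytes ? -1 : 0);
            }
        });

        // Stable sort by task count, in ascending order; ties keep the free-space order.
        Collections.sort(candidates, new Comparator<Disk>()
        {
            public int compare(Disk a, Disk b)
            {
                return a.currentTasks - b.currentTasks;
            }
        });

        return candidates.isEmpty() ? null : candidates.get(0);
    }

    public static void main(String[] args)
    {
        List<Disk> disks = new ArrayList<Disk>();
        disks.add(new Disk("/data1", 100L << 30, 1, false)); // roomy but busy
        disks.add(new Disk("/data2",  80L << 30, 0, false)); // smaller but idle
        disks.add(new Disk("/data3", 200L << 30, 0, true));  // blacklisted, never considered
        Disk chosen = pick(disks, 10L << 30);
        System.out.println(chosen == null ? "no disk fits" : chosen.path); // prints /data2
    }
}
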
http://git-wip-us.apache.org/repos/asf/cassandra/blob/29915e84/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index 990ad84..301c297 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -433,22 +433,28 @@ public class Memtable
             return estimatedSize;
         }
 
-        protected void runWith(File dataDirectory) throws Exception
+        protected void runWith(File sstableDirectory) throws Exception
         {
-            assert dataDirectory != null : "Flush task is not bound to any disk";
+            assert sstableDirectory != null : "Flush task is not bound to any disk";
 
-            SSTableReader sstable = writeSortedContents(context, dataDirectory);
+            SSTableReader sstable = writeSortedContents(context, sstableDirectory);
             cfs.replaceFlushed(Memtable.this, sstable);
             latch.countDown();
         }
 
-        private SSTableReader writeSortedContents(Future<ReplayPosition> context, File dataDirectory) throws ExecutionException, InterruptedException
+        protected Directories getDirectories()
+        {
+            return cfs.directories;
+        }
+
+        private SSTableReader writeSortedContents(Future<ReplayPosition> context, File sstableDirectory)
+        throws ExecutionException, InterruptedException
         {
             logger.info("Writing " + Memtable.this.toString());
 
             SSTableReader ssTable;
             // errors when creating the writer that may leave empty temp files.
-            SSTableWriter writer = createFlushWriter(cfs.getTempSSTablePath(cfs.directories.getLocationForDisk(dataDirectory)));
+            SSTableWriter writer = createFlushWriter(cfs.getTempSSTablePath(sstableDirectory));
             try
             {
                 // (we can't clear out the map as-we-go to free up memory,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/29915e84/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java 
b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java
index 3d64785..0913765 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.db.compaction;
 
 import java.util.Collection;
 
+import org.apache.cassandra.db.Directories;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.compaction.CompactionManager.CompactionExecutorStatsCollector;
@@ -41,6 +42,11 @@ public abstract class AbstractCompactionTask extends DiskAwareRunnable
 
     public abstract int execute(CompactionExecutorStatsCollector collector);
 
+    protected Directories getDirectories()
+    {
+        return cfs.directories;
+    }
+
     public void unmarkSSTables()
     {
         cfs.getDataTracker().unmarkCompacting(sstables);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/29915e84/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index ca1e2da..1b1599b 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@ -21,22 +21,13 @@ import java.io.File;
 import java.io.IOException;
 import java.util.*;
 
-import javax.print.attribute.IntegerSyntax;
-
-import com.google.common.base.Predicates;
 import com.google.common.base.Throwables;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Iterators;
-import com.google.common.primitives.Longs;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.RowIndexEntry;
-import org.apache.cassandra.db.SystemTable;
+import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.compaction.CompactionManager.CompactionExecutorStatsCollector;
 import org.apache.cassandra.io.sstable.*;
 import org.apache.cassandra.utils.CloseableIterator;
@@ -99,11 +90,11 @@ public class CompactionTask extends AbstractCompactionTask
      * which are properly serialized.
      * Caller is in charge of marking/unmarking the sstables as compacting.
      */
-    protected void runWith(File dataDirectory) throws Exception
+    protected void runWith(File sstableDirectory) throws Exception
     {
         // The collection of sstables passed may be empty (but not null); even if
         // it is not empty, it may compact down to nothing if all rows are deleted.
-        assert sstables != null && dataDirectory != null;
+        assert sstables != null && sstableDirectory != null;
 
         if (DatabaseDescriptor.isSnapshotBeforeCompaction())
             cfs.snapshotWithoutFlush(System.currentTimeMillis() + "-compact-" + cfs.columnFamily);
@@ -156,7 +147,7 @@ public class CompactionTask extends AbstractCompactionTask
                 return;
             }
 
-            SSTableWriter writer = cfs.createCompactionWriter(keysPerSSTable, cfs.directories.getLocationForDisk(dataDirectory), toCompact);
+            SSTableWriter writer = cfs.createCompactionWriter(keysPerSSTable, sstableDirectory, toCompact);
             writers.add(writer);
             while (iter.hasNext())
             {
@@ -194,7 +185,7 @@ public class CompactionTask extends AbstractCompactionTask
                 {
                     // tmp = false because later we want to query it with descriptor from SSTableReader
                     cachedKeyMap.put(writer.descriptor.asTemporary(false), cachedKeys);
-                    writer = cfs.createCompactionWriter(keysPerSSTable, cfs.directories.getLocationForDisk(dataDirectory), toCompact);
+                    writer = cfs.createCompactionWriter(keysPerSSTable, sstableDirectory, toCompact);
                     writers.add(writer);
                     cachedKeys = new HashMap<DecoratedKey, RowIndexEntry>();
                 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/29915e84/src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java b/src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java
index 6c4d95a..1be4803 100644
--- a/src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java
+++ b/src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java
@@ -34,7 +34,7 @@ public abstract class DiskAwareRunnable extends WrappedRunnable
         while (true)
         {
             writeSize = getExpectedWriteSize();
-            directory = Directories.getLocationCapableOfSize(writeSize);
+            directory = getDirectories().getLocationCapableOfSize(writeSize);
             if (directory != null || !reduceScopeForLimitedSpace())
                 break;
         }
@@ -45,7 +45,7 @@ public abstract class DiskAwareRunnable extends WrappedRunnable
         directory.estimatedWorkingSize.addAndGet(writeSize);
         try
         {
-            runWith(directory.location);
+            runWith(getDirectories().getLocationForDisk(directory));
         }
         finally
         {
@@ -55,10 +55,16 @@ public abstract class DiskAwareRunnable extends WrappedRunnable
     }
 
     /**
-     * Executes this task on given {@code dataDirectory}.
-     * @param dataDirectory data directory to work on
+     * Get sstable directories for the CF.
+     * @return Directories instance for the CF.
      */
-    protected abstract void runWith(File dataDirectory) throws Exception;
+    protected abstract Directories getDirectories();
+
+    /**
+     * Executes this task on given {@code sstableDirectory}.
+     * @param sstableDirectory sstable directory to work on
+     */
+    protected abstract void runWith(File sstableDirectory) throws Exception;
 
     /**
      * Get expected write size to determine which disk to use for this task.

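With this change, DiskAwareRunnable asks its subclass for the per-CF Directories and then hands runWith() the already-resolved sstable directory, which is exactly what Memtable's FlushRunnable and AbstractCompactionTask now supply. The sketch below shows the subclass side of that contract with an invented task; it assumes getExpectedWriteSize() and reduceScopeForLimitedSpace() are the remaining abstract hooks, as the surrounding code suggests, and MyWriteTask is not part of this patch.

import java.io.File;

import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Directories;
import org.apache.cassandra.io.util.DiskAwareRunnable;

// Hypothetical subclass (not part of this patch) illustrating the contract that
// FlushRunnable and AbstractCompactionTask now fulfil.
public class MyWriteTask extends DiskAwareRunnable
{
    private final ColumnFamilyStore cfs;
    private final long expectedSize;

    public MyWriteTask(ColumnFamilyStore cfs, long expectedSize)
    {
        this.cfs = cfs;
        this.expectedSize = expectedSize;
    }

    // Per-CF Directories: lets the base class map the chosen DataDirectory to this
    // CF's sstable directory and skip anything in BlacklistedDirectories.
    protected Directories getDirectories()
    {
        return cfs.directories;
    }

    // Assumed hook: size estimate fed to getLocationCapableOfSize().
    protected long getExpectedWriteSize()
    {
        return expectedSize;
    }

    // Assumed hook: nothing to shrink here, so give up if no disk fits.
    protected boolean reduceScopeForLimitedSpace()
    {
        return false;
    }

    // Receives the concrete sstable directory picked by the base class.
    protected void runWith(File sstableDirectory) throws Exception
    {
        String tempPath = cfs.getTempSSTablePath(sstableDirectory);
        // ... write the new sstable under tempPath ...
    }
}
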
http://git-wip-us.apache.org/repos/asf/cassandra/blob/29915e84/src/java/org/apache/cassandra/streaming/StreamIn.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamIn.java b/src/java/org/apache/cassandra/streaming/StreamIn.java
index b8f6b90..740b430 100644
--- a/src/java/org/apache/cassandra/streaming/StreamIn.java
+++ b/src/java/org/apache/cassandra/streaming/StreamIn.java
@@ -80,10 +80,10 @@ public class StreamIn
         // new local sstable
         Table table = Table.open(remotedesc.ksname);
         ColumnFamilyStore cfStore = table.getColumnFamilyStore(remotedesc.cfname);
-        Directories.DataDirectory localDir = Directories.getLocationCapableOfSize(remote.size);
+        Directories.DataDirectory localDir = cfStore.directories.getLocationCapableOfSize(remote.size);
         if (localDir == null)
             throw new RuntimeException("Insufficient disk space to store " + remote.size + " bytes");
-        Descriptor localdesc = Descriptor.fromFilename(cfStore.getTempSSTablePath(cfStore.directories.getLocationForDisk(localDir.location)));
+        Descriptor localdesc = Descriptor.fromFilename(cfStore.getTempSSTablePath(cfStore.directories.getLocationForDisk(localDir)));
 
         return new PendingFile(localdesc, remote);
     }

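Taken together, the StreamIn hunk above is the caller-side pattern this patch establishes: pick a DataDirectory through the CF's own Directories instance, bail out if nothing can hold the data, then map the pick back to that CF's sstable directory before building a path. A hedged recap of that flow follows; the helper class and method names are illustrative, not part of the patch.

import java.io.File;

import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Directories;

public class PickTempPath
{
    // Illustrative helper (not in the patch): resolve a temp sstable path for roughly
    // estimatedSize bytes, or fail if no writable disk can hold it.
    public static String tempPathFor(ColumnFamilyStore cfs, long estimatedSize)
    {
        Directories.DataDirectory dataDir = cfs.directories.getLocationCapableOfSize(estimatedSize);
        if (dataDir == null)
            throw new RuntimeException("Insufficient disk space to store " + estimatedSize + " bytes");

        File sstableDir = cfs.directories.getLocationForDisk(dataDir);
        return cfs.getTempSSTablePath(sstableDir);
    }
}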