Updated Branches:
  refs/heads/trunk 0f52874ab -> e1fef7248

Make compaction, flush JBOD-aware
patch by yukim; reviewed by jbellis for CASSANDRA-4292


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e1fef724
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e1fef724
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e1fef724

Branch: refs/heads/trunk
Commit: e1fef724889a498e0297de897592276b9f86c5cd
Parents: 0f52874
Author: Jonathan Ellis <[email protected]>
Authored: Wed Aug 22 10:29:17 2012 -0500
Committer: Jonathan Ellis <[email protected]>
Committed: Wed Aug 22 10:29:28 2012 -0500

----------------------------------------------------------------------
 CHANGES.txt                                        |    1 +
 .../org/apache/cassandra/db/ColumnFamilyStore.java |   25 +--
 src/java/org/apache/cassandra/db/Directories.java  |  106 +++++++++--
 src/java/org/apache/cassandra/db/Memtable.java     |  153 +++++++++------
 .../db/compaction/AbstractCompactionTask.java      |    3 +-
 .../cassandra/db/compaction/CompactionTask.java    |   65 ++++---
 .../apache/cassandra/db/filter/ColumnCounter.java  |   29 ++-
 .../cassandra/db/filter/SliceQueryFilter.java      |   16 +-
 .../cassandra/io/util/DiskAwareRunnable.java       |   77 ++++++++
 .../apache/cassandra/service/CassandraDaemon.java  |    4 +-
 .../org/apache/cassandra/streaming/StreamIn.java   |    6 +-
 .../unit/org/apache/cassandra/config/DefsTest.java |    1 -
 12 files changed, 343 insertions(+), 143 deletions(-)
----------------------------------------------------------------------
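
Patch overview: each configured data file location is now wrapped in a
Directories.DataDirectory that tracks how many flush/compaction tasks are
currently writing to it (currentTasks) and how many bytes they expect to write
(estimatedWorkingSize). Free space is estimated with a 10% safety margin, minus
the bytes already promised to running tasks, and flush, compaction, and
streaming pick an idle disk with enough estimated free space, falling back to
the disk with the most free space. The snippet below only illustrates the
formula added in Directories.java; the byte counts are invented for the example.

    // illustration of DataDirectory.getEstimatedAvailableSpace(); values are hypothetical
    long usable  = 100L * 1024 * 1024 * 1024;          // 100 GiB reported by File.getUsableSpace()
    long working =  10L * 1024 * 1024 * 1024;          // 10 GiB claimed by in-flight flushes/compactions
    long available = (long) (0.9 * usable) - working;  // 90 GiB safety cap minus 10 GiB reserved = 80 GiB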


http://git-wip-us.apache.org/repos/asf/cassandra/blob/e1fef724/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 8fe1770..d9d67e4 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 1.2-dev
+ * Make compaction, flush JBOD-aware (CASSANDRA-4292)
  * run local range scans on the read stage (CASSANDRA-3687)
  * clean up ioexceptions (CASSANDRA-2116)
  * Introduce new json format with row level deletion (CASSANDRA-4054)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e1fef724/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 20b037e..c3c5f8e 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -33,6 +33,7 @@ import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.Futures;
+import org.cliffc.high_scale_lib.NonBlockingHashMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -66,7 +67,6 @@ import org.apache.cassandra.service.CacheService;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.thrift.IndexExpression;
 import org.apache.cassandra.utils.*;
-import org.cliffc.high_scale_lib.NonBlockingHashMap;
 
 import static org.apache.cassandra.config.CFMetaData.Caching;
 
@@ -538,20 +538,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         return columnFamily;
     }
 
-    /*
-     * @return a temporary file name for an sstable.
-     * When the sstable object is closed, it will be renamed to a non-temporary
-     * format, so incomplete sstables can be recognized and removed on startup.
-     */
-    public String getFlushPath(long estimatedSize, Descriptor.Version version)
-    {
-        File location = directories.getDirectoryForNewSSTables(estimatedSize);
-        if (location == null)
-            throw new RuntimeException("Insufficient disk space to flush " + estimatedSize + " bytes");
-        return getTempSSTablePath(location, version);
-    }
-
-    private String getTempSSTablePath(File directory)
+    public String getTempSSTablePath(File directory)
     {
         return getTempSSTablePath(directory, Descriptor.Version.CURRENT);
     }
@@ -889,6 +876,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
             return SSTable.getTotalBytes(sstables);
         }
 
+        // cleanup size estimation only counts bytes for keys local to this node
         long expectedFileSize = 0;
         Collection<Range<Token>> ranges = StorageService.instance.getLocalRanges(table.name);
         for (SSTableReader sstable : sstables)
@@ -1922,13 +1910,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         return intern(name);
     }
 
-    public SSTableWriter createFlushWriter(long estimatedRows, long estimatedSize, ReplayPosition context)
-    {
-        SSTableMetadata.Collector sstableMetadataCollector = SSTableMetadata.createCollector().replayPosition(context);
-        String filename = getFlushPath(estimatedSize, Descriptor.Version.CURRENT);
-        return new SSTableWriter(filename, estimatedRows, metadata, partitioner, sstableMetadataCollector);
-    }
-
     public SSTableWriter createCompactionWriter(long estimatedRows, File location, Collection<SSTableReader> sstables)
     {
         ReplayPosition rp = ReplayPosition.getReplayPosition(sstables);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e1fef724/src/java/org/apache/cassandra/db/Directories.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Directories.java b/src/java/org/apache/cassandra/db/Directories.java
index 29fb36c..a6fac8f 100644
--- a/src/java/org/apache/cassandra/db/Directories.java
+++ b/src/java/org/apache/cassandra/db/Directories.java
@@ -21,8 +21,12 @@ import java.io.File;
 import java.io.FileFilter;
 import java.io.IOException;
 import java.util.*;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSortedSet;
+import com.google.common.primitives.Longs;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -31,7 +35,6 @@ import org.apache.cassandra.config.*;
 import org.apache.cassandra.db.compaction.LeveledManifest;
 import org.apache.cassandra.io.FSWriteError;
 import org.apache.cassandra.io.util.FileUtils;
-import org.apache.cassandra.io.util.MmappedSegmentedFile;
 import org.apache.cassandra.io.sstable.*;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.Pair;
@@ -64,13 +67,13 @@ public class Directories
     public static final String SNAPSHOT_SUBDIR = "snapshots";
     public static final String SECONDARY_INDEX_NAME_SEPARATOR = ".";
 
-    public static final File[] dataFileLocations;
+    public static final DataDirectory[] dataFileLocations;
     static
     {
         String[] locations = DatabaseDescriptor.getAllDataFileLocations();
-        dataFileLocations = new File[locations.length];
+        dataFileLocations = new DataDirectory[locations.length];
         for (int i = 0; i < locations.length; ++i)
-            dataFileLocations[i] = new File(locations[i]);
+            dataFileLocations[i] = new DataDirectory(new File(locations[i]));
     }
 
     private final String tablename;
@@ -93,7 +96,7 @@ public class Directories
         this.cfname = cfname;
         this.sstableDirectories = new File[dataFileLocations.length];
         for (int i = 0; i < dataFileLocations.length; ++i)
-            sstableDirectories[i] = new File(dataFileLocations[i], join(tablename, directoryName));
+            sstableDirectories[i] = new File(dataFileLocations[i].location, join(tablename, directoryName));
 
         if (!StorageService.instance.isClientMode())
         {
@@ -102,6 +105,22 @@ public class Directories
         }
     }
 
+    /**
+     * Returns the SSTable location inside the given data directory.
+     *
+     * @param dataDirectory data directory to look in
+     * @return SSTable location, or null if no SSTable location lies under the given directory
+     */
+    public File getLocationForDisk(File dataDirectory)
+    {
+        for (File dir : sstableDirectories)
+        {
+            if (FileUtils.getCanonicalPath(dir).startsWith(FileUtils.getCanonicalPath(dataDirectory)))
+                return dir;
+        }
+        return null;
+    }
+
     public File getDirectoryForNewSSTables(long estimatedSize)
     {
         File path = getLocationWithMaximumAvailableSpace(estimatedSize);
@@ -160,6 +179,37 @@ public class Directories
         return null;
     }
 
+    /**
+     * Find a location capable of holding the given {@code estimatedSize}.
+     * It first looks for a directory with no task currently running on it and
+     * the most free space available.
+     * If no such directory exists, it chooses the one with the most free
+     * space available.
+     * If no directory can hold the given {@code estimatedSize}, it returns null.
+     *
+     * @param estimatedSize estimated size the chosen location must be able to hold
+     * @return directory capable of holding the given estimated size, or null if none found
+     */
+    public static DataDirectory getLocationCapableOfSize(long estimatedSize)
+    {
+        // sort by available disk space
+        SortedSet<DataDirectory> directories = ImmutableSortedSet.copyOf(dataFileLocations);
+
+        // if there is a disk with sufficient space and no activity running on it, use it
+        for (DataDirectory directory : directories)
+        {
+            long spaceAvailable = directory.getEstimatedAvailableSpace();
+            if (estimatedSize < spaceAvailable && directory.currentTasks.get() == 0)
+                return directory;
+        }
+
+        // if not, use the one that has largest free space
+        if (estimatedSize < directories.first().getEstimatedAvailableSpace())
+            return directories.first();
+        else
+            return null;
+    }
+
     public static File getSnapshotDirectory(Descriptor desc, String snapshotName)
     {
         return getOrCreate(desc.directory, SNAPSHOT_SUBDIR, snapshotName);
@@ -175,6 +225,34 @@ public class Directories
         return new SSTableLister();
     }
 
+    public static class DataDirectory implements Comparable<DataDirectory>
+    {
+        public final File location;
+        public final AtomicInteger currentTasks = new AtomicInteger();
+        public final AtomicLong estimatedWorkingSize = new AtomicLong();
+
+        public DataDirectory(File location)
+        {
+            this.location = location;
+        }
+
+        /**
+         * @return estimated available disk space for this data directory,
+         * excluding the space expected to be written by tasks already in the queue.
+         */
+        public long getEstimatedAvailableSpace()
+        {
+            // Load factor of 0.9: we do not want to use the entire disk, that is too risky.
+            return (long)(0.9 * location.getUsableSpace()) - estimatedWorkingSize.get();
+        }
+
+        public int compareTo(DataDirectory o)
+        {
+            // we want to sort by free space in descending order
+            return -1 * Longs.compare(getEstimatedAvailableSpace(), o.getEstimatedAvailableSpace());
+        }
+    }
+
     public class SSTableLister
     {
         private boolean skipTemporary;
@@ -389,9 +467,9 @@ public class Directories
             return false;
 
         boolean hasSystemKeyspace = false;
-        for (File location : dataFileLocations)
+        for (DataDirectory dir : dataFileLocations)
         {
-            File systemDir = new File(location, Table.SYSTEM_TABLE);
+            File systemDir = new File(dir.location, Table.SYSTEM_TABLE);
             hasSystemKeyspace |= (systemDir.exists() && systemDir.isDirectory());
             File statusCFDir = new File(systemDir, SystemTable.SCHEMA_KEYSPACES_CF);
             if (statusCFDir.exists())
@@ -403,8 +481,8 @@ public class Directories
 
         // Check whether the migration might create too long a filename
         int longestLocation = -1;
-        for (File loc : dataFileLocations)
-            longestLocation = Math.max(longestLocation, FileUtils.getCanonicalPath(loc).length());
+        for (DataDirectory loc : dataFileLocations)
+            longestLocation = Math.max(longestLocation, FileUtils.getCanonicalPath(loc.location).length());
 
         // Check that migration won't error out halfway through from too-long paths.  For Windows, we need to check
         // total path length <= 255 (see http://msdn.microsoft.com/en-us/library/aa365247.aspx and discussion on CASSANDRA-2749);
@@ -450,12 +528,12 @@ public class Directories
     {
         logger.info("Upgrade from pre-1.1 version detected: migrating sstables to new directory layout");
 
-        for (File location : dataFileLocations)
+        for (DataDirectory dir : dataFileLocations)
         {
-            if (!location.exists() || !location.isDirectory())
+            if (!dir.location.exists() || !dir.location.isDirectory())
                 continue;
 
-            for (File ksDir : location.listFiles())
+            for (File ksDir : dir.location.listFiles())
             {
                 if (!ksDir.isDirectory())
                     continue;
@@ -527,7 +605,7 @@ public class Directories
     static void overrideDataDirectoriesForTest(String loc)
     {
         for (int i = 0; i < dataFileLocations.length; ++i)
-            dataFileLocations[i] = new File(loc);
+            dataFileLocations[i] = new DataDirectory(new File(loc));
     }
 
     // Hack for tests, don't use otherwise
@@ -535,6 +613,6 @@ public class Directories
     {
         String[] locations = DatabaseDescriptor.getAllDataFileLocations();
         for (int i = 0; i < locations.length; ++i)
-            dataFileLocations[i] = new File(locations[i]);
+            dataFileLocations[i] = new DataDirectory(new File(locations[i]));
     }
 }
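
For context: flush, compaction, and streaming now obtain a DataDirectory via
getLocationCapableOfSize() and build sstable paths under it, instead of calling
the removed ColumnFamilyStore.getFlushPath(). A minimal, illustrative call
sequence (mirroring the StreamIn change later in this patch; the size value and
the cfs variable are assumptions for the example):

    long estimatedSize = 64L * 1024 * 1024; // hypothetical size of the data about to be written
    Directories.DataDirectory dir = Directories.getLocationCapableOfSize(estimatedSize);
    if (dir == null)
        throw new RuntimeException("Insufficient disk space to store " + estimatedSize + " bytes");
    String tempPath = cfs.getTempSSTablePath(dir.location); // cfs is the target ColumnFamilyStore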

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e1fef724/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index 8d8ad89..aa6789d 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -25,6 +25,10 @@ import java.util.concurrent.atomic.AtomicLong;
 
 import com.google.common.base.Function;
 import com.google.common.base.Throwables;
+
+import org.apache.cassandra.io.util.DiskAwareRunnable;
+import org.cliffc.high_scale_lib.NonBlockingHashSet;
+import org.github.jamm.MemoryMeter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -36,13 +40,10 @@ import org.apache.cassandra.db.commitlog.ReplayPosition;
 import org.apache.cassandra.db.filter.AbstractColumnIterator;
 import org.apache.cassandra.db.filter.NamesQueryFilter;
 import org.apache.cassandra.db.filter.SliceQueryFilter;
+import org.apache.cassandra.io.sstable.SSTableMetadata;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.io.sstable.SSTableWriter;
-import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.SlabAllocator;
-import org.apache.cassandra.utils.WrappedRunnable;
-import org.cliffc.high_scale_lib.NonBlockingHashSet;
-import org.github.jamm.MemoryMeter;
 
 public class Memtable
 {
@@ -255,65 +256,9 @@ public class Memtable
         return builder.toString();
     }
 
-    private SSTableReader writeSortedContents(Future<ReplayPosition> context) throws ExecutionException, InterruptedException
-    {
-        logger.info("Writing " + this);
-
-        long keySize = 0;
-        for (RowPosition key : columnFamilies.keySet())
-        {
-            //  make sure we don't write non-sensical keys
-            assert key instanceof DecoratedKey;
-            keySize += ((DecoratedKey)key).key.remaining();
-        }
-        long estimatedSize = (long) ((keySize // index entries
-                                      + keySize // keys in data file
-                                      + currentSize.get()) // data
-                                     * 1.2); // bloom filter and row index overhead
-        SSTableReader ssTable;
-        // errors when creating the writer that may leave empty temp files.
-        SSTableWriter writer = cfs.createFlushWriter(columnFamilies.size(), estimatedSize, context.get());
-        try
-        {
-            // (we can't clear out the map as-we-go to free up memory,
-            //  since the memtable is being used for queries in the "pending flush" category)
-            for (Map.Entry<RowPosition, ColumnFamily> entry : columnFamilies.entrySet())
-            {
-                ColumnFamily cf = entry.getValue();
-                if (cf.isMarkedForDelete())
-                {
-                    // Pedantically, you could purge column level tombstones that are past GcGRace when writing to the SSTable.
-                    // But it can result in unexpected behaviour where deletes never make it to disk,
-                    // as they are lost and so cannot override existing column values. So we only remove deleted columns if there
-                    // is a CF level tombstone to ensure the delete makes it into an SSTable.
-                    ColumnFamilyStore.removeDeletedColumnsOnly(cf, Integer.MIN_VALUE);
-                }
-                writer.append((DecoratedKey)entry.getKey(), cf);
-            }
-
-            ssTable = writer.closeAndOpenReader();
-            logger.info(String.format("Completed flushing %s (%d bytes) for commitlog position %s",
-                        ssTable.getFilename(), new File(ssTable.getFilename()).length(), context.get()));
-            return ssTable;
-        }
-        catch (Throwable e)
-        {
-            writer.abort();
-            throw Throwables.propagate(e);
-        }
-    }
-
     public void flushAndSignal(final CountDownLatch latch, ExecutorService writer, final Future<ReplayPosition> context)
     {
-        writer.execute(new WrappedRunnable()
-        {
-            public void runMayThrow() throws Exception
-            {
-                SSTableReader sstable = writeSortedContents(context);
-                cfs.replaceFlushed(Memtable.this, sstable);
-                latch.countDown();
-            }
-        });
+        writer.execute(new FlushRunnable(latch, context));
     }
 
     public String toString()
@@ -439,4 +384,90 @@ public class Memtable
     {
         return creationTime;
     }
+
+    class FlushRunnable extends DiskAwareRunnable
+    {
+        private final CountDownLatch latch;
+        private final Future<ReplayPosition> context;
+        private final long estimatedSize;
+
+        FlushRunnable(CountDownLatch latch, Future<ReplayPosition> context)
+        {
+            this.latch = latch;
+            this.context = context;
+
+            long keySize = 0;
+            for (RowPosition key : columnFamilies.keySet())
+            {
+                //  make sure we don't write nonsensical keys
+                assert key instanceof DecoratedKey;
+                keySize += ((DecoratedKey)key).key.remaining();
+            }
+            estimatedSize = (long) ((keySize // index entries
+                                    + keySize // keys in data file
+                                    + currentSize.get()) // data
+                                    * 1.2); // bloom filter and row index overhead
+        }
+
+        public long getExpectedWriteSize()
+        {
+            return estimatedSize;
+        }
+
+        protected void runWith(File dataDirectory) throws Exception
+        {
+            assert dataDirectory != null : "Flush task is not bound to any disk";
+
+            SSTableReader sstable = writeSortedContents(context, dataDirectory);
+            cfs.replaceFlushed(Memtable.this, sstable);
+            latch.countDown();
+        }
+
+        private SSTableReader writeSortedContents(Future<ReplayPosition> context, File dataDirectory) throws ExecutionException, InterruptedException
+        {
+            logger.info("Writing " + Memtable.this.toString());
+
+            SSTableReader ssTable;
+            // errors when creating the writer that may leave empty temp files.
+            SSTableWriter writer = createFlushWriter(cfs.getTempSSTablePath(cfs.directories.getLocationForDisk(dataDirectory)));
+            try
+            {
+                // (we can't clear out the map as-we-go to free up memory,
+                //  since the memtable is being used for queries in the "pending flush" category)
+                for (Map.Entry<RowPosition, ColumnFamily> entry : columnFamilies.entrySet())
+                {
+                    ColumnFamily cf = entry.getValue();
+                    if (cf.isMarkedForDelete())
+                    {
+                        // Pedantically, you could purge column level tombstones that are past GcGRace when writing to the SSTable.
+                        // But it can result in unexpected behaviour where deletes never make it to disk,
+                        // as they are lost and so cannot override existing column values. So we only remove deleted columns if there
+                        // is a CF level tombstone to ensure the delete makes it into an SSTable.
+                        ColumnFamilyStore.removeDeletedColumnsOnly(cf, Integer.MIN_VALUE);
+                    }
+                    writer.append((DecoratedKey)entry.getKey(), cf);
+                }
+
+                ssTable = writer.closeAndOpenReader();
+                logger.info(String.format("Completed flushing %s (%d bytes) for commitlog position %s",
+                            ssTable.getFilename(), new File(ssTable.getFilename()).length(), context.get()));
+                return ssTable;
+            }
+            catch (Throwable e)
+            {
+                writer.abort();
+                throw Throwables.propagate(e);
+            }
+        }
+
+        public SSTableWriter createFlushWriter(String filename) throws ExecutionException, InterruptedException
+        {
+            SSTableMetadata.Collector sstableMetadataCollector = SSTableMetadata.createCollector().replayPosition(context.get());
+            return new SSTableWriter(filename,
+                                     columnFamilies.size(),
+                                     cfs.metadata,
+                                     cfs.partitioner,
+                                     sstableMetadataCollector);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e1fef724/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java
index ae4c344..fa032d9 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java
@@ -23,8 +23,9 @@ import java.util.Set;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.compaction.CompactionManager.CompactionExecutorStatsCollector;
+import org.apache.cassandra.io.util.DiskAwareRunnable;
 
-public abstract class AbstractCompactionTask
+public abstract class AbstractCompactionTask extends DiskAwareRunnable
 {
     protected final ColumnFamilyStore cfs;
     protected Collection<SSTableReader> sstables;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e1fef724/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index 7a88c68..4d2a90f 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@ -43,11 +43,14 @@ public class CompactionTask extends AbstractCompactionTask
     protected static final Logger logger = LoggerFactory.getLogger(CompactionTask.class);
     protected final int gcBefore;
     protected static long totalBytesCompacted = 0;
+    private Set<SSTableReader> toCompact;
+    private CompactionExecutorStatsCollector collector;
 
     public CompactionTask(ColumnFamilyStore cfs, Collection<SSTableReader> sstables, final int gcBefore)
     {
         super(cfs, sstables);
         this.gcBefore = gcBefore;
+        toCompact = new HashSet<SSTableReader>(sstables);
     }
 
     public static synchronized long addToTotalBytesCompacted(long bytesCompacted)
@@ -62,37 +65,48 @@ public class CompactionTask extends AbstractCompactionTask
      */
     public int execute(CompactionExecutorStatsCollector collector)
     {
-        // The collection of sstables passed may be empty (but not null); even if
-        // it is not empty, it may compact down to nothing if all rows are deleted.
-        assert sstables != null;
+        this.collector = collector;
+        run();
+        return toCompact.size();
+    }
 
-        Set<SSTableReader> toCompact = new HashSet<SSTableReader>(sstables);
-        if (!isCompactionInteresting(toCompact))
-            return 0;
+    public long getExpectedWriteSize()
+    {
+        return cfs.getExpectedCompactedFileSize(toCompact, compactionType);
+    }
 
-        File compactionFileLocation = cfs.directories.getDirectoryForNewSSTables(cfs.getExpectedCompactedFileSize(toCompact, compactionType));
-        if (compactionFileLocation == null && partialCompactionsAcceptable())
+    public boolean reduceScopeForLimitedSpace()
+    {
+        if (partialCompactionsAcceptable() && toCompact.size() > 1)
         {
-            // If the compaction file path is null that means we have no space left for this compaction.
             // Try again w/o the largest one.
-            while (compactionFileLocation == null && toCompact.size() > 1)
-            {
-                logger.warn("insufficient space to compact all requested files " + StringUtils.join(toCompact, ", "));
-                // Note that we have removed files that are still marked as compacting.
-                // This suboptimal but ok since the caller will unmark all the sstables at the end.
-                toCompact.remove(cfs.getMaxSizeFile(toCompact));
-                compactionFileLocation = cfs.directories.getDirectoryForNewSSTables(cfs.getExpectedCompactedFileSize(toCompact, compactionType));
-            }
+            logger.warn("insufficient space to compact all requested files " + StringUtils.join(toCompact, ", "));
+            // Note that we have removed files that are still marked as compacting.
+            // This is suboptimal but ok since the caller will unmark all the sstables at the end.
+            return toCompact.remove(cfs.getMaxSizeFile(toCompact));
         }
-
-        if (compactionFileLocation == null)
+        else
         {
-            logger.warn("insufficient space to compact; aborting compaction");
-            return 0;
+            return false;
         }
+    }
+
+    /**
+     * For internal use and testing only.  The rest of the system should go through the submit* methods,
+     * which are properly serialized.
+     * Caller is in charge of marking/unmarking the sstables as compacting.
+     */
+    protected void runWith(File dataDirectory) throws Exception
+    {
+        // The collection of sstables passed may be empty (but not null); even if
+        // it is not empty, it may compact down to nothing if all rows are deleted.
+        assert sstables != null && dataDirectory != null;
+
+        if (!isCompactionInteresting(toCompact))
+            return;
 
         if (DatabaseDescriptor.isSnapshotBeforeCompaction())
-            cfs.snapshotWithoutFlush(System.currentTimeMillis() + "-" + "compact-" + cfs.columnFamily);
+            cfs.snapshotWithoutFlush(System.currentTimeMillis() + "-compact-" + cfs.columnFamily);
 
         // sanity check: all sstables must belong to the same cfs
         for (SSTableReader sstable : toCompact)
@@ -138,10 +152,10 @@ public class CompactionTask extends AbstractCompactionTask
                 // we need to sync it (via closeAndOpen) first, so there is no period during which
                 // a crash could cause data loss.
                 cfs.markCompacted(toCompact, compactionType);
-                return 0;
+                return;
             }
 
-            SSTableWriter writer = cfs.createCompactionWriter(keysPerSSTable, compactionFileLocation, toCompact);
+            SSTableWriter writer = cfs.createCompactionWriter(keysPerSSTable, dataDirectory, toCompact);
             writers.add(writer);
             while (nni.hasNext())
             {
@@ -173,7 +187,7 @@ public class CompactionTask extends AbstractCompactionTask
                     sstables.add(toIndex);
                     if (nni.hasNext())
                     {
-                        writer = cfs.createCompactionWriter(keysPerSSTable, compactionFileLocation, toCompact);
+                        writer = cfs.createCompactionWriter(keysPerSSTable, dataDirectory, toCompact);
                         writers.add(writer);
                         cachedKeys = new HashMap<DecoratedKey, RowIndexEntry>();
                     }
@@ -225,7 +239,6 @@ public class CompactionTask extends AbstractCompactionTask
         logger.info(String.format("Compacted to %s.  %,d to %,d (~%d%% of original) bytes for %,d keys at %fMB/s.  Time: %,dms.",
                                   builder.toString(), startsize, endsize, (int) (ratio * 100), totalkeysWritten, mbps, dTime));
         logger.debug(String.format("CF Total Bytes Compacted: %,d", CompactionTask.addToTotalBytesCompacted(endsize)));
-        return toCompact.size();
     }
 
     protected boolean partialCompactionsAcceptable()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e1fef724/src/java/org/apache/cassandra/db/filter/ColumnCounter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/ColumnCounter.java b/src/java/org/apache/cassandra/db/filter/ColumnCounter.java
index c642b82..ccf83d9 100644
--- a/src/java/org/apache/cassandra/db/filter/ColumnCounter.java
+++ b/src/java/org/apache/cassandra/db/filter/ColumnCounter.java
@@ -29,12 +29,15 @@ import org.apache.cassandra.utils.ByteBufferUtil;
 
 public class ColumnCounter
 {
-    protected int count;
+    protected int live;
+    protected int ignored;
 
-    public void countColum(IColumn column, IColumnContainer container)
+    public void count(IColumn column, IColumnContainer container)
     {
-        if (isLive(column, container))
-            count++;
+        if (!isLive(column, container))
+            ignored++;
+        else
+            live++;
     }
 
     protected static boolean isLive(IColumn column, IColumnContainer container)
@@ -42,9 +45,14 @@ public class ColumnCounter
         return column.isLive() && (!container.deletionInfo().isDeleted(column));
     }
 
-    public int count()
+    public int live()
     {
-        return count;
+        return live;
+    }
+
+    public int ignored()
+    {
+        return ignored;
     }
 
     public static class GroupByPrefix extends ColumnCounter
@@ -71,14 +79,17 @@ public class ColumnCounter
             assert toGroup == 0 || type != null;
         }
 
-        public void countColum(IColumn column, IColumnContainer container)
+        public void count(IColumn column, IColumnContainer container)
         {
             if (!isLive(column, container))
+            {
+                ignored++;
                 return;
+            }
 
             if (toGroup == 0)
             {
-                count = 1;
+                live = 1;
                 return;
             }
 
@@ -101,7 +112,7 @@ public class ColumnCounter
                     return;
             }
 
-            count++;
+            live++;
             last = current;
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e1fef724/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java b/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
index f868539..9b0e3ef 100644
--- a/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
@@ -148,15 +148,19 @@ public class SliceQueryFilter implements IFilter
 
         while (reducedColumns.hasNext())
         {
-            if (columnCounter.count() >= count)
+            if (columnCounter.live() >= count)
+            {
+                logger.debug("Read {} live columns and {} tombstoned",
+                             columnCounter.live(), columnCounter.ignored());
                 break;
+            }
 
             IColumn column = reducedColumns.next();
-            if (logger.isDebugEnabled())
-                logger.debug(String.format("collecting %s of %s: %s",
-                                           columnCounter.count(), count, column.getString(comparator)));
+            if (logger.isTraceEnabled())
+                logger.trace(String.format("collecting %s of %s: %s",
+                                           columnCounter.live(), count, column.getString(comparator)));
 
-            columnCounter.countColum(column, container);
+            columnCounter.count(column, container);
 
             // but we need to add all non-gc-able columns to the result for read repair:
             if (QueryFilter.isRelevant(column, container, gcBefore))
@@ -182,7 +186,7 @@ public class SliceQueryFilter implements IFilter
 
     public int lastCounted()
     {
-        return columnCounter == null ? 0 : columnCounter.count();
+        return columnCounter == null ? 0 : columnCounter.live();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e1fef724/src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java b/src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java
new file mode 100644
index 0000000..6c4d95a
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.util;
+
+import java.io.File;
+
+import org.apache.cassandra.db.Directories;
+import org.apache.cassandra.utils.WrappedRunnable;
+
+public abstract class DiskAwareRunnable extends WrappedRunnable
+{
+    /**
+     * Run this task after selecting the optimal disk for it
+     */
+    protected void runMayThrow() throws Exception
+    {
+        long writeSize;
+        Directories.DataDirectory directory;
+        while (true)
+        {
+            writeSize = getExpectedWriteSize();
+            directory = Directories.getLocationCapableOfSize(writeSize);
+            if (directory != null || !reduceScopeForLimitedSpace())
+                break;
+        }
+        if (directory == null)
+            throw new RuntimeException("Insufficient disk space to write " + writeSize + " bytes");
+
+        directory.currentTasks.incrementAndGet();
+        directory.estimatedWorkingSize.addAndGet(writeSize);
+        try
+        {
+            runWith(directory.location);
+        }
+        finally
+        {
+            directory.estimatedWorkingSize.addAndGet(-1 * writeSize);
+            directory.currentTasks.decrementAndGet();
+        }
+    }
+
+    /**
+     * Executes this task on given {@code dataDirectory}.
+     * @param dataDirectory data directory to work on
+     */
+    protected abstract void runWith(File dataDirectory) throws Exception;
+
+    /**
+     * Get expected write size to determine which disk to use for this task.
+     * @return expected size in bytes this task will write to disk.
+     */
+    public abstract long getExpectedWriteSize();
+
+    /**
+     * Called if no disk is available with free space for the full write size.
+     * @return true if the scope of the task was successfully reduced.
+     */
+    public boolean reduceScopeForLimitedSpace()
+    {
+        return false;
+    }
+}
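
To make the DiskAwareRunnable contract concrete, here is a minimal,
hypothetical subclass (not part of this patch; the real implementations are
Memtable.FlushRunnable and CompactionTask above):

    class ExampleWriteTask extends DiskAwareRunnable
    {
        private final long expectedBytes;

        ExampleWriteTask(long expectedBytes)
        {
            this.expectedBytes = expectedBytes;
        }

        public long getExpectedWriteSize()
        {
            // used by runMayThrow() to pick a data directory with enough estimated free space
            return expectedBytes;
        }

        protected void runWith(java.io.File dataDirectory) throws Exception
        {
            // write output under the chosen directory; while this runs, the base class
            // counts expectedBytes against that directory's estimatedWorkingSize
        }
    }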

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e1fef724/src/java/org/apache/cassandra/service/CassandraDaemon.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java
index af0cd4f..11e8a24 100644
--- a/src/java/org/apache/cassandra/service/CassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java
@@ -140,8 +140,8 @@ public class CassandraDaemon
 
         // check all directories(data, commitlog, saved cache) for existence and permission
         Iterable<String> dirs = Iterables.concat(Arrays.asList(DatabaseDescriptor.getAllDataFileLocations()),
-                                                 Arrays.asList(new String[] {DatabaseDescriptor.getCommitLogLocation(),
-                                                                             DatabaseDescriptor.getSavedCachesLocation()}));
+                                                 Arrays.asList(DatabaseDescriptor.getCommitLogLocation(),
+                                                               DatabaseDescriptor.getSavedCachesLocation()));
         for (String dataDir : dirs)
         {
             logger.debug("Checking directory {}", dataDir);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e1fef724/src/java/org/apache/cassandra/streaming/StreamIn.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamIn.java b/src/java/org/apache/cassandra/streaming/StreamIn.java
index eb00e20..a0d605d 100644
--- a/src/java/org/apache/cassandra/streaming/StreamIn.java
+++ b/src/java/org/apache/cassandra/streaming/StreamIn.java
@@ -26,6 +26,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Directories;
 import org.apache.cassandra.db.Table;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
@@ -79,7 +80,10 @@ public class StreamIn
         // new local sstable
         Table table = Table.open(remotedesc.ksname);
         ColumnFamilyStore cfStore = table.getColumnFamilyStore(remotedesc.cfname);
-        Descriptor localdesc = Descriptor.fromFilename(cfStore.getFlushPath(remote.size, Descriptor.Version.CURRENT));
+        Directories.DataDirectory localDir = Directories.getLocationCapableOfSize(remote.size);
+        if (localDir == null)
+            throw new RuntimeException("Insufficient disk space to store " + remote.size + " bytes");
+        Descriptor localdesc = Descriptor.fromFilename(cfStore.getTempSSTablePath(localDir.location));
 
         return new PendingFile(localdesc, remote);
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e1fef724/test/unit/org/apache/cassandra/config/DefsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/config/DefsTest.java b/test/unit/org/apache/cassandra/config/DefsTest.java
index 17595aa..cca87fb 100644
--- a/test/unit/org/apache/cassandra/config/DefsTest.java
+++ b/test/unit/org/apache/cassandra/config/DefsTest.java
@@ -217,7 +217,6 @@ public class DefsTest extends SchemaLoader
         ColumnFamilyStore store = Table.open(cfm.ksName).getColumnFamilyStore(cfm.cfName);
         assert store != null;
         store.forceBlockingFlush();
-        store.getFlushPath(1024, Descriptor.Version.CURRENT);
         assert store.directories.sstableLister().list().size() > 0;
 
         MigrationManager.announceColumnFamilyDrop(ks.name, cfm.cfName);

Reply via email to