Updated Branches:
  refs/heads/trunk 0f52874ab -> e1fef7248

Make compaction, flush JBOD-aware

patch by yukim; reviewed by jbellis for CASSANDRA-4292


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e1fef724
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e1fef724
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e1fef724

Branch: refs/heads/trunk
Commit: e1fef724889a498e0297de897592276b9f86c5cd
Parents: 0f52874
Author: Jonathan Ellis <[email protected]>
Authored: Wed Aug 22 10:29:17 2012 -0500
Committer: Jonathan Ellis <[email protected]>
Committed: Wed Aug 22 10:29:28 2012 -0500

----------------------------------------------------------------------
 CHANGES.txt                                        |    1 +
 .../org/apache/cassandra/db/ColumnFamilyStore.java |   25 +--
 src/java/org/apache/cassandra/db/Directories.java  |  106 +++++++++--
 src/java/org/apache/cassandra/db/Memtable.java     |  153 +++++++++------
 .../db/compaction/AbstractCompactionTask.java      |    3 +-
 .../cassandra/db/compaction/CompactionTask.java    |   65 ++++---
 .../apache/cassandra/db/filter/ColumnCounter.java  |   29 ++-
 .../cassandra/db/filter/SliceQueryFilter.java      |   16 +-
 .../cassandra/io/util/DiskAwareRunnable.java       |   77 ++++++++
 .../apache/cassandra/service/CassandraDaemon.java  |    4 +-
 .../org/apache/cassandra/streaming/StreamIn.java   |    6 +-
 .../unit/org/apache/cassandra/config/DefsTest.java |    1 -
 12 files changed, 343 insertions(+), 143 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e1fef724/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 8fe1770..d9d67e4 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 1.2-dev
+ * Make compaction, flush JBOD-aware (CASSANDRA-4292)
  * run local range scans on the read stage (CASSANDRA-3687)
  * clean up ioexceptions (CASSANDRA-2116)
  * Introduce new json format with row level deletion (CASSANDRA-4054)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e1fef724/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 20b037e..c3c5f8e 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -33,6 +33,7 @@ import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.Futures;
+import org.cliffc.high_scale_lib.NonBlockingHashMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -66,7 +67,6 @@ import org.apache.cassandra.service.CacheService;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.thrift.IndexExpression;
 import org.apache.cassandra.utils.*;
-import org.cliffc.high_scale_lib.NonBlockingHashMap;
 
 import static org.apache.cassandra.config.CFMetaData.Caching;
 
@@ -538,20 +538,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         return columnFamily;
     }
 
-    /*
-     * @return a temporary file name for an sstable.
-     * When the sstable object is closed, it will be renamed to a non-temporary
-     * format, so incomplete sstables can be recognized and removed on startup.
-     */
-    public String getFlushPath(long estimatedSize, Descriptor.Version version)
-    {
-        File location = directories.getDirectoryForNewSSTables(estimatedSize);
-        if (location == null)
-            throw new RuntimeException("Insufficient disk space to flush " + estimatedSize + " bytes");
-        return getTempSSTablePath(location, version);
-    }
-
-    private String getTempSSTablePath(File directory)
+    public String getTempSSTablePath(File directory)
     {
         return getTempSSTablePath(directory, Descriptor.Version.CURRENT);
     }
@@ -889,6 +876,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
             return SSTable.getTotalBytes(sstables);
         }
 
+        // cleanup size estimation only counts bytes for keys local to this node
         long expectedFileSize = 0;
         Collection<Range<Token>> ranges = StorageService.instance.getLocalRanges(table.name);
         for (SSTableReader sstable : sstables)
@@ -1922,13 +1910,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         return intern(name);
     }
 
-    public SSTableWriter createFlushWriter(long estimatedRows, long estimatedSize, ReplayPosition context)
-    {
-        SSTableMetadata.Collector sstableMetadataCollector = SSTableMetadata.createCollector().replayPosition(context);
-        String filename = getFlushPath(estimatedSize, Descriptor.Version.CURRENT);
-        return new SSTableWriter(filename, estimatedRows, metadata, partitioner, sstableMetadataCollector);
-    }
-
     public SSTableWriter createCompactionWriter(long estimatedRows, File location, Collection<SSTableReader> sstables)
     {
         ReplayPosition rp = ReplayPosition.getReplayPosition(sstables);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e1fef724/src/java/org/apache/cassandra/db/Directories.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Directories.java b/src/java/org/apache/cassandra/db/Directories.java
index 29fb36c..a6fac8f 100644
--- a/src/java/org/apache/cassandra/db/Directories.java
+++ b/src/java/org/apache/cassandra/db/Directories.java
@@ -21,8 +21,12 @@ import java.io.File;
 import java.io.FileFilter;
 import java.io.IOException;
 import java.util.*;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSortedSet;
+import com.google.common.primitives.Longs;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -31,7 +35,6 @@ import org.apache.cassandra.config.*;
 import org.apache.cassandra.db.compaction.LeveledManifest;
 import org.apache.cassandra.io.FSWriteError;
 import org.apache.cassandra.io.util.FileUtils;
-import org.apache.cassandra.io.util.MmappedSegmentedFile;
 import org.apache.cassandra.io.sstable.*;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.Pair;
@@ -64,13 +67,13 @@ public class Directories
     public static final String SNAPSHOT_SUBDIR = "snapshots";
     public static final String SECONDARY_INDEX_NAME_SEPARATOR = ".";
 
-    public static final File[] dataFileLocations;
+    public static final DataDirectory[] dataFileLocations;
     static
     {
         String[] locations = DatabaseDescriptor.getAllDataFileLocations();
-        dataFileLocations = new File[locations.length];
+        dataFileLocations = new DataDirectory[locations.length];
         for (int i = 0; i < locations.length; ++i)
-            dataFileLocations[i] = new File(locations[i]);
+            dataFileLocations[i] = new DataDirectory(new File(locations[i]));
     }
 
     private final String tablename;
@@ -93,7 +96,7 @@ public class Directories
         this.cfname = cfname;
         this.sstableDirectories = new File[dataFileLocations.length];
         for (int i = 0; i < dataFileLocations.length; ++i)
-            sstableDirectories[i] = new File(dataFileLocations[i], join(tablename, directoryName));
+            sstableDirectories[i] = new File(dataFileLocations[i].location, join(tablename, directoryName));
 
         if (!StorageService.instance.isClientMode())
         {
@@ -102,6 +105,22 @@ public class Directories
         }
     }
 
+    /**
+     * Returns the SSTable location inside the given data directory.
+     *
+     * @param dataDirectory a data directory root
+     * @return SSTable location
+     */
+    public File getLocationForDisk(File dataDirectory)
+    {
+        for (File dir : sstableDirectories)
+        {
+            if (FileUtils.getCanonicalPath(dir).startsWith(FileUtils.getCanonicalPath(dataDirectory)))
+                return dir;
+        }
+        return null;
+    }
+
     public File getDirectoryForNewSSTables(long estimatedSize)
     {
         File path = getLocationWithMaximumAvailableSpace(estimatedSize);
@@ -160,6 +179,37 @@ public class Directories
         return null;
     }
 
+    /**
+     * Find a location capable of holding {@code estimatedSize} bytes.
+     * It first looks for a directory with no task currently running on it and
+     * the most free space available.
+     * If no such directory is available, it just chooses the one with the most
+     * free space available.
+     * If no directory can hold {@code estimatedSize}, returns null.
+     *
+     * @param estimatedSize estimated size the chosen location needs to fit
+     * @return directory capable of holding the given estimated size, or null if none found
+     */
+    public static DataDirectory getLocationCapableOfSize(long estimatedSize)
+    {
+        // sort by available disk space
+        SortedSet<DataDirectory> directories = ImmutableSortedSet.copyOf(dataFileLocations);
+
+        // if there is a disk with sufficient space and no activity running on it, then use it
+        for (DataDirectory directory : directories)
+        {
+            long spaceAvailable = directory.getEstimatedAvailableSpace();
+            if (estimatedSize < spaceAvailable && directory.currentTasks.get() == 0)
+                return directory;
+        }
+
+        // if not, use the one that has the largest free space
+        if (estimatedSize < directories.first().getEstimatedAvailableSpace())
+            return directories.first();
+        else
+            return null;
+    }
+
     public static File getSnapshotDirectory(Descriptor desc, String snapshotName)
     {
         return getOrCreate(desc.directory, SNAPSHOT_SUBDIR, snapshotName);
@@ -175,6 +225,34 @@ public class Directories
         return new SSTableLister();
     }
 
+    public static class DataDirectory implements Comparable<DataDirectory>
+    {
+        public final File location;
+        public final AtomicInteger currentTasks = new AtomicInteger();
+        public final AtomicLong estimatedWorkingSize = new AtomicLong();
+
+        public DataDirectory(File location)
+        {
+            this.location = location;
+        }
+
+        /**
+         * @return estimated available disk space for this directory,
+         * excluding the expected size to be written by tasks in the queue.
+         */
+        public long getEstimatedAvailableSpace()
+        {
+            // load factor of 0.9: we do not want to use the entire disk, that is too risky
+            return (long)(0.9 * location.getUsableSpace()) - estimatedWorkingSize.get();
+        }
+
+        public int compareTo(DataDirectory o)
+        {
+            // we want to sort by free space in descending order
+            return -1 * Longs.compare(getEstimatedAvailableSpace(), o.getEstimatedAvailableSpace());
+        }
+    }
+
     public class SSTableLister
     {
         private boolean skipTemporary;
@@ -389,9 +467,9 @@ public class Directories
             return false;
 
         boolean hasSystemKeyspace = false;
-        for (File location : dataFileLocations)
+        for (DataDirectory dir : dataFileLocations)
         {
-            File systemDir = new File(location, Table.SYSTEM_TABLE);
+            File systemDir = new File(dir.location, Table.SYSTEM_TABLE);
             hasSystemKeyspace |= (systemDir.exists() && systemDir.isDirectory());
             File statusCFDir = new File(systemDir, SystemTable.SCHEMA_KEYSPACES_CF);
             if (statusCFDir.exists())
@@ -403,8 +481,8 @@ public class Directories
 
         // Check whether the migration might create too long a filename
         int longestLocation = -1;
-        for (File loc : dataFileLocations)
-            longestLocation = Math.max(longestLocation, FileUtils.getCanonicalPath(loc).length());
+        for (DataDirectory loc : dataFileLocations)
+            longestLocation = Math.max(longestLocation, FileUtils.getCanonicalPath(loc.location).length());
 
         // Check that migration won't error out halfway through from too-long paths.  For Windows, we need to check
         // total path length <= 255 (see http://msdn.microsoft.com/en-us/library/aa365247.aspx and discussion on CASSANDRA-2749);
@@ -450,12 +528,12 @@ public class Directories
     {
         logger.info("Upgrade from pre-1.1 version detected: migrating sstables to new directory layout");
 
-        for (File location : dataFileLocations)
+        for (DataDirectory dir : dataFileLocations)
         {
-            if (!location.exists() || !location.isDirectory())
+            if (!dir.location.exists() || !dir.location.isDirectory())
                 continue;
 
-            for (File ksDir : location.listFiles())
+            for (File ksDir : dir.location.listFiles())
             {
                 if (!ksDir.isDirectory())
                     continue;
@@ -527,7 +605,7 @@ public class Directories
     static void overrideDataDirectoriesForTest(String loc)
     {
         for (int i = 0; i < dataFileLocations.length; ++i)
-            dataFileLocations[i] = new File(loc);
+            dataFileLocations[i] = new DataDirectory(new File(loc));
    }
 
     // Hack for tests, don't use otherwise
@@ -535,6 +613,6 @@ public class Directories
     {
         String[] locations = DatabaseDescriptor.getAllDataFileLocations();
         for (int i = 0; i < locations.length; ++i)
-            dataFileLocations[i] = new File(locations[i]);
+            dataFileLocations[i] = new DataDirectory(new File(locations[i]));
     }
 }
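
For illustration, this is how a caller can pick a disk and reserve space on it so that concurrent tasks see the reservation in getEstimatedAvailableSpace(); it is a minimal sketch of the pattern the new DiskAwareRunnable (further below) implements, and the reserve() helper is hypothetical:

    import java.io.File;
    import org.apache.cassandra.db.Directories;

    public class DiskSelectionExample
    {
        // hypothetical helper: pick a data directory for writeSize bytes
        // and account for the pending write
        public static File reserve(long writeSize)
        {
            Directories.DataDirectory dir = Directories.getLocationCapableOfSize(writeSize);
            if (dir == null)
                throw new RuntimeException("Insufficient disk space to write " + writeSize + " bytes");
            dir.currentTasks.incrementAndGet();            // marks the disk busy for the idle-disk check
            dir.estimatedWorkingSize.addAndGet(writeSize); // subtracted from the disk's available space
            // callers must undo both counters in a finally block once the write completes
            return dir.location;
        }
    }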

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e1fef724/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index 8d8ad89..aa6789d 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -25,6 +25,10 @@ import java.util.concurrent.atomic.AtomicLong;
 
 import com.google.common.base.Function;
 import com.google.common.base.Throwables;
+
+import org.apache.cassandra.io.util.DiskAwareRunnable;
+import org.cliffc.high_scale_lib.NonBlockingHashSet;
+import org.github.jamm.MemoryMeter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -36,13 +40,10 @@ import org.apache.cassandra.db.commitlog.ReplayPosition;
 import org.apache.cassandra.db.filter.AbstractColumnIterator;
 import org.apache.cassandra.db.filter.NamesQueryFilter;
 import org.apache.cassandra.db.filter.SliceQueryFilter;
+import org.apache.cassandra.io.sstable.SSTableMetadata;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.io.sstable.SSTableWriter;
-import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.SlabAllocator;
-import org.apache.cassandra.utils.WrappedRunnable;
-import org.cliffc.high_scale_lib.NonBlockingHashSet;
-import org.github.jamm.MemoryMeter;
 
 public class Memtable
 {
@@ -255,65 +256,9 @@ public class Memtable
         return builder.toString();
     }
 
-    private SSTableReader writeSortedContents(Future<ReplayPosition> context) throws ExecutionException, InterruptedException
-    {
-        logger.info("Writing " + this);
-
-        long keySize = 0;
-        for (RowPosition key : columnFamilies.keySet())
-        {
-            // make sure we don't write non-sensical keys
-            assert key instanceof DecoratedKey;
-            keySize += ((DecoratedKey)key).key.remaining();
-        }
-        long estimatedSize = (long) ((keySize // index entries
-                                      + keySize // keys in data file
-                                      + currentSize.get()) // data
-                                     * 1.2); // bloom filter and row index overhead
-        SSTableReader ssTable;
-        // errors when creating the writer that may leave empty temp files.
-        SSTableWriter writer = cfs.createFlushWriter(columnFamilies.size(), estimatedSize, context.get());
-        try
-        {
-            // (we can't clear out the map as-we-go to free up memory,
-            //  since the memtable is being used for queries in the "pending flush" category)
-            for (Map.Entry<RowPosition, ColumnFamily> entry : columnFamilies.entrySet())
-            {
-                ColumnFamily cf = entry.getValue();
-                if (cf.isMarkedForDelete())
-                {
-                    // Pedantically, you could purge column level tombstones that are past GcGRace when writing to the SSTable.
-                    // But it can result in unexpected behaviour where deletes never make it to disk,
-                    // as they are lost and so cannot override existing column values. So we only remove deleted columns if there
-                    // is a CF level tombstone to ensure the delete makes it into an SSTable.
-                    ColumnFamilyStore.removeDeletedColumnsOnly(cf, Integer.MIN_VALUE);
-                }
-                writer.append((DecoratedKey)entry.getKey(), cf);
-            }
-
-            ssTable = writer.closeAndOpenReader();
-            logger.info(String.format("Completed flushing %s (%d bytes) for commitlog position %s",
-                                      ssTable.getFilename(), new File(ssTable.getFilename()).length(), context.get()));
-            return ssTable;
-        }
-        catch (Throwable e)
-        {
-            writer.abort();
-            throw Throwables.propagate(e);
-        }
-    }
-
     public void flushAndSignal(final CountDownLatch latch, ExecutorService writer, final Future<ReplayPosition> context)
     {
-        writer.execute(new WrappedRunnable()
-        {
-            public void runMayThrow() throws Exception
-            {
-                SSTableReader sstable = writeSortedContents(context);
-                cfs.replaceFlushed(Memtable.this, sstable);
-                latch.countDown();
-            }
-        });
+        writer.execute(new FlushRunnable(latch, context));
     }
 
     public String toString()
@@ -439,4 +384,90 @@ public class Memtable
     {
         return creationTime;
     }
+
+    class FlushRunnable extends DiskAwareRunnable
+    {
+        private final CountDownLatch latch;
+        private final Future<ReplayPosition> context;
+        private final long estimatedSize;
+
+        FlushRunnable(CountDownLatch latch, Future<ReplayPosition> context)
+        {
+            this.latch = latch;
+            this.context = context;
+
+            long keySize = 0;
+            for (RowPosition key : columnFamilies.keySet())
+            {
+                // make sure we don't write nonsensical keys
+                assert key instanceof DecoratedKey;
+                keySize += ((DecoratedKey)key).key.remaining();
+            }
+            estimatedSize = (long) ((keySize // index entries
+                                     + keySize // keys in data file
+                                     + currentSize.get()) // data
+                                    * 1.2); // bloom filter and row index overhead
+        }
+
+        public long getExpectedWriteSize()
+        {
+            return estimatedSize;
+        }
+
+        protected void runWith(File dataDirectory) throws Exception
+        {
+            assert dataDirectory != null : "Flush task is not bound to any disk";
+
+            SSTableReader sstable = writeSortedContents(context, dataDirectory);
+            cfs.replaceFlushed(Memtable.this, sstable);
+            latch.countDown();
+        }
+
+        private SSTableReader writeSortedContents(Future<ReplayPosition> context, File dataDirectory) throws ExecutionException, InterruptedException
+        {
+            logger.info("Writing " + Memtable.this.toString());
+
+            SSTableReader ssTable;
+            // errors when creating the writer may leave empty temp files.
+            SSTableWriter writer = createFlushWriter(cfs.getTempSSTablePath(cfs.directories.getLocationForDisk(dataDirectory)));
+            try
+            {
+                // (we can't clear out the map as-we-go to free up memory,
+                //  since the memtable is being used for queries in the "pending flush" category)
+                for (Map.Entry<RowPosition, ColumnFamily> entry : columnFamilies.entrySet())
+                {
+                    ColumnFamily cf = entry.getValue();
+                    if (cf.isMarkedForDelete())
+                    {
+                        // Pedantically, you could purge column-level tombstones that are past gc grace when writing to the SSTable.
+                        // But it can result in unexpected behaviour where deletes never make it to disk,
+                        // as they are lost and so cannot override existing column values. So we only remove deleted columns if there
+                        // is a CF-level tombstone to ensure the delete makes it into an SSTable.
+                        ColumnFamilyStore.removeDeletedColumnsOnly(cf, Integer.MIN_VALUE);
+                    }
+                    writer.append((DecoratedKey)entry.getKey(), cf);
+                }
+
+                ssTable = writer.closeAndOpenReader();
+                logger.info(String.format("Completed flushing %s (%d bytes) for commitlog position %s",
+                                          ssTable.getFilename(), new File(ssTable.getFilename()).length(), context.get()));
+                return ssTable;
+            }
+            catch (Throwable e)
+            {
+                writer.abort();
+                throw Throwables.propagate(e);
+            }
+        }
+
+        public SSTableWriter createFlushWriter(String filename) throws ExecutionException, InterruptedException
+        {
+            SSTableMetadata.Collector sstableMetadataCollector = SSTableMetadata.createCollector().replayPosition(context.get());
+            return new SSTableWriter(filename,
+                                     columnFamilies.size(),
+                                     cfs.metadata,
+                                     cfs.partitioner,
+                                     sstableMetadataCollector);
+        }
+    }
 }
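
As a worked example of the estimate in FlushRunnable (numbers are made up): a memtable holding 500 MB of column data under 1,000,000 row keys of 20 bytes each yields (20 MB index entries + 20 MB keys in the data file + 500 MB data) * 1.2 = 648 MB, so the flush is only bound to a disk whose estimated available space is at least that much.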

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e1fef724/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java
index ae4c344..fa032d9 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java
@@ -23,8 +23,9 @@ import java.util.Set;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.compaction.CompactionManager.CompactionExecutorStatsCollector;
+import org.apache.cassandra.io.util.DiskAwareRunnable;
 
-public abstract class AbstractCompactionTask
+public abstract class AbstractCompactionTask extends DiskAwareRunnable
 {
     protected final ColumnFamilyStore cfs;
     protected Collection<SSTableReader> sstables;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e1fef724/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index 7a88c68..4d2a90f 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@ -43,11 +43,14 @@ public class CompactionTask extends AbstractCompactionTask
     protected static final Logger logger = LoggerFactory.getLogger(CompactionTask.class);
     protected final int gcBefore;
     protected static long totalBytesCompacted = 0;
+    private Set<SSTableReader> toCompact;
+    private CompactionExecutorStatsCollector collector;
 
     public CompactionTask(ColumnFamilyStore cfs, Collection<SSTableReader> sstables, final int gcBefore)
     {
         super(cfs, sstables);
         this.gcBefore = gcBefore;
+        toCompact = new HashSet<SSTableReader>(sstables);
     }
 
     public static synchronized long addToTotalBytesCompacted(long bytesCompacted)
@@ -62,37 +65,48 @@ public class CompactionTask extends AbstractCompactionTask
      */
     public int execute(CompactionExecutorStatsCollector collector)
     {
-        // The collection of sstables passed may be empty (but not null); even if
-        // it is not empty, it may compact down to nothing if all rows are deleted.
-        assert sstables != null;
+        this.collector = collector;
+        run();
+        return toCompact.size();
+    }
 
-        Set<SSTableReader> toCompact = new HashSet<SSTableReader>(sstables);
-        if (!isCompactionInteresting(toCompact))
-            return 0;
+    public long getExpectedWriteSize()
+    {
+        return cfs.getExpectedCompactedFileSize(toCompact, compactionType);
+    }
 
-        File compactionFileLocation = cfs.directories.getDirectoryForNewSSTables(cfs.getExpectedCompactedFileSize(toCompact, compactionType));
-        if (compactionFileLocation == null && partialCompactionsAcceptable())
+    public boolean reduceScopeForLimitedSpace()
+    {
+        if (partialCompactionsAcceptable() && toCompact.size() > 1)
         {
-            // If the compaction file path is null that means we have no space left for this compaction.
             // Try again w/o the largest one.
-            while (compactionFileLocation == null && toCompact.size() > 1)
-            {
-                logger.warn("insufficient space to compact all requested files " + StringUtils.join(toCompact, ", "));
-                // Note that we have removed files that are still marked as compacting.
-                // This suboptimal but ok since the caller will unmark all the sstables at the end.
-                toCompact.remove(cfs.getMaxSizeFile(toCompact));
-                compactionFileLocation = cfs.directories.getDirectoryForNewSSTables(cfs.getExpectedCompactedFileSize(toCompact, compactionType));
-            }
+            logger.warn("insufficient space to compact all requested files " + StringUtils.join(toCompact, ", "));
+            // Note that we have removed files that are still marked as compacting.
+            // This is suboptimal but ok since the caller will unmark all the sstables at the end.
+            return toCompact.remove(cfs.getMaxSizeFile(toCompact));
         }
-
-        if (compactionFileLocation == null)
+        else
         {
-            logger.warn("insufficient space to compact; aborting compaction");
-            return 0;
+            return false;
         }
+    }
+
+    /**
+     * For internal use and testing only.  The rest of the system should go through the submit* methods,
+     * which are properly serialized.
+     * Caller is in charge of marking/unmarking the sstables as compacting.
+     */
+    protected void runWith(File dataDirectory) throws Exception
+    {
+        // The collection of sstables passed may be empty (but not null); even if
+        // it is not empty, it may compact down to nothing if all rows are deleted.
+        assert sstables != null && dataDirectory != null;
+
+        if (!isCompactionInteresting(toCompact))
+            return;
 
         if (DatabaseDescriptor.isSnapshotBeforeCompaction())
-            cfs.snapshotWithoutFlush(System.currentTimeMillis() + "-" + "compact-" + cfs.columnFamily);
+            cfs.snapshotWithoutFlush(System.currentTimeMillis() + "-compact-" + cfs.columnFamily);
 
         // sanity check: all sstables must belong to the same cfs
         for (SSTableReader sstable : toCompact)
@@ -138,10 +152,10 @@ public class CompactionTask extends AbstractCompactionTask
             // we need to sync it (via closeAndOpen) first, so there is no period during which
             // a crash could cause data loss.
             cfs.markCompacted(toCompact, compactionType);
-            return 0;
+            return;
         }
 
-        SSTableWriter writer = cfs.createCompactionWriter(keysPerSSTable, compactionFileLocation, toCompact);
+        SSTableWriter writer = cfs.createCompactionWriter(keysPerSSTable, dataDirectory, toCompact);
         writers.add(writer);
         while (nni.hasNext())
         {
@@ -173,7 +187,7 @@ public class CompactionTask extends AbstractCompactionTask
                     sstables.add(toIndex);
                     if (nni.hasNext())
                     {
-                        writer = cfs.createCompactionWriter(keysPerSSTable, compactionFileLocation, toCompact);
+                        writer = cfs.createCompactionWriter(keysPerSSTable, dataDirectory, toCompact);
                         writers.add(writer);
                         cachedKeys = new HashMap<DecoratedKey, RowIndexEntry>();
                     }
@@ -225,7 +239,6 @@ public class CompactionTask extends AbstractCompactionTask
         logger.info(String.format("Compacted to %s.  %,d to %,d (~%d%% of original) bytes for %,d keys at %fMB/s.  Time: %,dms.",
                                   builder.toString(), startsize, endsize, (int) (ratio * 100), totalkeysWritten, mbps, dTime));
         logger.debug(String.format("CF Total Bytes Compacted: %,d", CompactionTask.addToTotalBytesCompacted(endsize)));
-        return toCompact.size();
     }
 
     protected boolean partialCompactionsAcceptable()
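
To make the new split concrete: execute() now delegates to run(), and the disk-selection loop in DiskAwareRunnable (new file further below) alternates between the two overridden methods. The following sketch is a paraphrase for illustration, not code added by the patch:

    import java.io.File;
    import org.apache.cassandra.db.Directories;

    public abstract class DiskSelectionLoopSketch
    {
        // the two hooks CompactionTask overrides
        public abstract long getExpectedWriteSize();
        public abstract boolean reduceScopeForLimitedSpace();

        // paraphrase of DiskAwareRunnable.runMayThrow() (full version later in this commit)
        protected File selectDisk()
        {
            while (true)
            {
                long writeSize = getExpectedWriteSize(); // shrinks as sstables are dropped
                Directories.DataDirectory directory = Directories.getLocationCapableOfSize(writeSize);
                if (directory != null)
                    return directory.location;           // found a disk that fits
                if (!reduceScopeForLimitedSpace())       // drops the largest sstable, if allowed
                    throw new RuntimeException("Insufficient disk space to write " + writeSize + " bytes");
            }
        }
    }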

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e1fef724/src/java/org/apache/cassandra/db/filter/ColumnCounter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/ColumnCounter.java b/src/java/org/apache/cassandra/db/filter/ColumnCounter.java
index c642b82..ccf83d9 100644
--- a/src/java/org/apache/cassandra/db/filter/ColumnCounter.java
+++ b/src/java/org/apache/cassandra/db/filter/ColumnCounter.java
@@ -29,12 +29,15 @@ import org.apache.cassandra.utils.ByteBufferUtil;
 
 public class ColumnCounter
 {
-    protected int count;
+    protected int live;
+    protected int ignored;
 
-    public void countColum(IColumn column, IColumnContainer container)
+    public void count(IColumn column, IColumnContainer container)
     {
-        if (isLive(column, container))
-            count++;
+        if (!isLive(column, container))
+            ignored++;
+        else
+            live++;
     }
 
     protected static boolean isLive(IColumn column, IColumnContainer container)
@@ -42,9 +45,14 @@ public class ColumnCounter
         return column.isLive() && (!container.deletionInfo().isDeleted(column));
     }
 
-    public int count()
+    public int live()
     {
-        return count;
+        return live;
+    }
+
+    public int ignored()
+    {
+        return ignored;
     }
 
     public static class GroupByPrefix extends ColumnCounter
@@ -71,14 +79,17 @@ public class ColumnCounter
             assert toGroup == 0 || type != null;
         }
 
-        public void countColum(IColumn column, IColumnContainer container)
+        public void count(IColumn column, IColumnContainer container)
         {
             if (!isLive(column, container))
+            {
+                ignored++;
                 return;
+            }
 
             if (toGroup == 0)
             {
-                count = 1;
+                live = 1;
                 return;
             }
 
@@ -101,7 +112,7 @@ public class ColumnCounter
                 return;
             }
 
-            count++;
+            live++;
             last = current;
         }
     }
 }
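
For illustration, how the renamed counter API reads at a call site; a minimal sketch where ColumnCounterExample and its tally() method are hypothetical:

    import org.apache.cassandra.db.ColumnFamily;
    import org.apache.cassandra.db.IColumn;
    import org.apache.cassandra.db.filter.ColumnCounter;

    public class ColumnCounterExample
    {
        // counts live vs. tombstoned columns in a column family
        public static void tally(ColumnFamily cf)
        {
            ColumnCounter counter = new ColumnCounter();
            for (IColumn column : cf.getSortedColumns())
                counter.count(column, cf);          // formerly countColum(...)
            int returned   = counter.live();        // columns a query would return
            int tombstoned = counter.ignored();     // columns scanned but deleted/expired
            System.out.println(returned + " live, " + tombstoned + " ignored");
        }
    }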

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e1fef724/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java b/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
index f868539..9b0e3ef 100644
--- a/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
@@ -148,15 +148,19 @@ public class SliceQueryFilter implements IFilter
 
         while (reducedColumns.hasNext())
         {
-            if (columnCounter.count() >= count)
+            if (columnCounter.live() >= count)
+            {
+                logger.debug("Read {} live columns and {} tombstoned",
+                             columnCounter.live(), columnCounter.ignored());
                 break;
+            }
 
             IColumn column = reducedColumns.next();
-            if (logger.isDebugEnabled())
-                logger.debug(String.format("collecting %s of %s: %s",
-                                           columnCounter.count(), count, column.getString(comparator)));
+            if (logger.isTraceEnabled())
+                logger.trace(String.format("collecting %s of %s: %s",
+                                           columnCounter.live(), count, column.getString(comparator)));
 
-            columnCounter.countColum(column, container);
+            columnCounter.count(column, container);
 
             // but we need to add all non-gc-able columns to the result for read repair:
             if (QueryFilter.isRelevant(column, container, gcBefore))
@@ -182,7 +186,7 @@ public class SliceQueryFilter implements IFilter
 
     public int lastCounted()
     {
-        return columnCounter == null ? 0 : columnCounter.count();
+        return columnCounter == null ? 0 : columnCounter.live();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e1fef724/src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java b/src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java
new file mode 100644
index 0000000..6c4d95a
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.util;
+
+import java.io.File;
+
+import org.apache.cassandra.db.Directories;
+import org.apache.cassandra.utils.WrappedRunnable;
+
+public abstract class DiskAwareRunnable extends WrappedRunnable
+{
+    /**
+     * Run this task after selecting the optimal disk for it
+     */
+    protected void runMayThrow() throws Exception
+    {
+        long writeSize;
+        Directories.DataDirectory directory;
+        while (true)
+        {
+            writeSize = getExpectedWriteSize();
+            directory = Directories.getLocationCapableOfSize(writeSize);
+            if (directory != null || !reduceScopeForLimitedSpace())
+                break;
+        }
+        if (directory == null)
+            throw new RuntimeException("Insufficient disk space to write " + writeSize + " bytes");
+
+        directory.currentTasks.incrementAndGet();
+        directory.estimatedWorkingSize.addAndGet(writeSize);
+        try
+        {
+            runWith(directory.location);
+        }
+        finally
+        {
+            directory.estimatedWorkingSize.addAndGet(-1 * writeSize);
+            directory.currentTasks.decrementAndGet();
+        }
+    }
+
+    /**
+     * Executes this task on the given {@code dataDirectory}.
+     * @param dataDirectory data directory to work on
+     */
+    protected abstract void runWith(File dataDirectory) throws Exception;
+
+    /**
+     * Get the expected write size to determine which disk to use for this task.
+     * @return expected size in bytes this task will write to disk
+     */
+    public abstract long getExpectedWriteSize();
+
+    /**
+     * Called if no disk is available with free space for the full write size.
+     * @return true if the scope of the task was successfully reduced
+     */
+    public boolean reduceScopeForLimitedSpace()
+    {
+        return false;
+    }
+}
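
For illustration, a minimal hypothetical subclass showing the contract a new disk-aware task has to satisfy; ExampleWriteTask is not part of the patch:

    import java.io.File;
    import org.apache.cassandra.io.util.DiskAwareRunnable;

    public class ExampleWriteTask extends DiskAwareRunnable
    {
        private final long bytesToWrite;

        public ExampleWriteTask(long bytesToWrite)
        {
            this.bytesToWrite = bytesToWrite;
        }

        public long getExpectedWriteSize()
        {
            return bytesToWrite; // consulted, possibly repeatedly, while a disk is selected
        }

        protected void runWith(File dataDirectory) throws Exception
        {
            // dataDirectory is the root of the chosen data file location; all writes
            // should stay under it so the space accounting in runMayThrow() holds
        }
    }

Tasks that can shrink their workload, like CompactionTask above, additionally override reduceScopeForLimitedSpace(); the default returns false, so a task that cannot shrink simply fails when no disk fits.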

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e1fef724/src/java/org/apache/cassandra/service/CassandraDaemon.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java
index af0cd4f..11e8a24 100644
--- a/src/java/org/apache/cassandra/service/CassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java
@@ -140,8 +140,8 @@ public class CassandraDaemon
 
         // check all directories(data, commitlog, saved cache) for existence and permission
         Iterable<String> dirs = Iterables.concat(Arrays.asList(DatabaseDescriptor.getAllDataFileLocations()),
-                                                 Arrays.asList(new String[] {DatabaseDescriptor.getCommitLogLocation(),
-                                                                             DatabaseDescriptor.getSavedCachesLocation()}));
+                                                 Arrays.asList(DatabaseDescriptor.getCommitLogLocation(),
+                                                               DatabaseDescriptor.getSavedCachesLocation()));
         for (String dataDir : dirs)
         {
             logger.debug("Checking directory {}", dataDir);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e1fef724/src/java/org/apache/cassandra/streaming/StreamIn.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamIn.java b/src/java/org/apache/cassandra/streaming/StreamIn.java
index eb00e20..a0d605d 100644
--- a/src/java/org/apache/cassandra/streaming/StreamIn.java
+++ b/src/java/org/apache/cassandra/streaming/StreamIn.java
@@ -26,6 +26,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Directories;
 import org.apache.cassandra.db.Table;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
@@ -79,7 +80,10 @@ public class StreamIn
         // new local sstable
         Table table = Table.open(remotedesc.ksname);
         ColumnFamilyStore cfStore = table.getColumnFamilyStore(remotedesc.cfname);
-        Descriptor localdesc = Descriptor.fromFilename(cfStore.getFlushPath(remote.size, Descriptor.Version.CURRENT));
+        Directories.DataDirectory localDir = Directories.getLocationCapableOfSize(remote.size);
+        if (localDir == null)
+            throw new RuntimeException("Insufficient disk space to store " + remote.size + " bytes");
+        Descriptor localdesc = Descriptor.fromFilename(cfStore.getTempSSTablePath(localDir.location));
 
         return new PendingFile(localdesc, remote);
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e1fef724/test/unit/org/apache/cassandra/config/DefsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/config/DefsTest.java b/test/unit/org/apache/cassandra/config/DefsTest.java
index 17595aa..cca87fb 100644
--- a/test/unit/org/apache/cassandra/config/DefsTest.java
+++ b/test/unit/org/apache/cassandra/config/DefsTest.java
@@ -217,7 +217,6 @@ public class DefsTest extends SchemaLoader
         ColumnFamilyStore store = Table.open(cfm.ksName).getColumnFamilyStore(cfm.cfName);
         assert store != null;
         store.forceBlockingFlush();
-        store.getFlushPath(1024, Descriptor.Version.CURRENT);
         assert store.directories.sstableLister().list().size() > 0;
 
         MigrationManager.announceColumnFamilyDrop(ks.name, cfm.cfName);
