Merge branch 'cassandra-2.1' into trunk
Conflicts:
src/java/org/apache/cassandra/db/compaction/CompactionTask.java
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/caeef174
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/caeef174
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/caeef174
Branch: refs/heads/trunk
Commit: caeef1740703a309a28c8621e5b4e8107dc1edb7
Parents: c04c50c 75378c2
Author: Marcus Eriksson <[email protected]>
Authored: Mon Jan 12 18:45:22 2015 +0100
Committer: Marcus Eriksson <[email protected]>
Committed: Mon Jan 12 18:45:22 2015 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/cassandra/db/Directories.java | 18 ++++++++++++
.../cassandra/db/compaction/CompactionTask.java | 29 ++++++++++++++++----
.../cassandra/io/util/DiskAwareRunnable.java | 17 +-----------
4 files changed, 43 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/caeef174/CHANGES.txt
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/caeef174/src/java/org/apache/cassandra/db/Directories.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/Directories.java
index 4c4078c,6af3082..fa9b320
--- a/src/java/org/apache/cassandra/db/Directories.java
+++ b/src/java/org/apache/cassandra/db/Directories.java
@@@ -345,21 -342,27 +345,39 @@@ public class Directorie
Collections.sort(candidates);
}
+ public boolean hasAvailableDiskSpace(long estimatedSSTables, long
expectedTotalWriteSize)
+ {
+ long writeSize = expectedTotalWriteSize / estimatedSSTables;
+ long totalAvailable = 0L;
+
+ for (DataDirectory dataDir : dataDirectories)
+ {
+ if
(BlacklistedDirectories.isUnwritable(getLocationForDisk(dataDir)))
+ continue;
+ DataDirectoryCandidate candidate = new
DataDirectoryCandidate(dataDir);
+ // exclude directory if its total writeSize does not fit to data
directory
+ if (candidate.availableSpace < writeSize)
+ continue;
+ totalAvailable += candidate.availableSpace;
+ }
+ return totalAvailable > expectedTotalWriteSize;
+ }
+
public static File getSnapshotDirectory(Descriptor desc, String
snapshotName)
{
- return getOrCreate(desc.directory, SNAPSHOT_SUBDIR, snapshotName);
+ return getSnapshotDirectory(desc.directory, snapshotName);
+ }
+
+ public static File getSnapshotDirectory(File location, String
snapshotName)
+ {
+ if (location.getName().startsWith(SECONDARY_INDEX_NAME_SEPARATOR))
+ {
+ return getOrCreate(location.getParentFile(), SNAPSHOT_SUBDIR,
snapshotName, location.getName());
+ }
+ else
+ {
+ return getOrCreate(location, SNAPSHOT_SUBDIR, snapshotName);
+ }
}
public File getSnapshotManifestFile(String snapshotName)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/caeef174/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index 8b5058b,eda09c0..dfbdc22
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@@ -19,7 -19,9 +19,8 @@@ package org.apache.cassandra.db.compact
import java.io.File;
import java.io.IOException;
+ import java.util.Arrays;
import java.util.Collection;
-import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@@@ -149,10 -151,8 +157,10 @@@ public class CompactionTask extends Abs
Set<SSTableReader> actuallyCompact = Sets.difference(sstables,
controller.getFullyExpiredSSTables());
long estimatedTotalKeys =
Math.max(cfs.metadata.getMinIndexInterval(),
SSTableReader.getApproximateKeyCount(actuallyCompact));
- long estimatedSSTables = Math.max(1,
SSTableReader.getTotalBytes(actuallyCompact) / strategy.getMaxSSTableBytes());
+ long estimatedSSTables = Math.max(1,
cfs.getExpectedCompactedFileSize(actuallyCompact, compactionType) /
strategy.getMaxSSTableBytes());
long keysPerSSTable = (long) Math.ceil((double)
estimatedTotalKeys / estimatedSSTables);
+ SSTableFormat.Type sstableFormat = getFormatType(sstables);
+
long expectedSSTableSize = Math.min(getExpectedWriteSize(),
strategy.getMaxSSTableBytes());
logger.debug("Expected bloom filter size : {}", keysPerSSTable);
@@@ -278,14 -278,24 +286,23 @@@
return minRepairedAt;
}
+ protected void checkAvailableDiskSpace(long estimatedSSTables)
+ {
+ while (!getDirectories().hasAvailableDiskSpace(estimatedSSTables,
getExpectedWriteSize()))
+ {
+ if (!reduceScopeForLimitedSpace())
+ throw new RuntimeException(String.format("Not enough space
for compaction, estimated sstables = %d, expected write size = %d",
estimatedSSTables, getExpectedWriteSize()));
+ }
+ }
+
- private SSTableWriter createCompactionWriter(File sstableDirectory, long
keysPerSSTable, long repairedAt)
+ private SSTableWriter createCompactionWriter(File sstableDirectory, long
keysPerSSTable, long repairedAt, SSTableFormat.Type type)
{
- assert sstableDirectory != null;
- return new SSTableWriter(cfs.getTempSSTablePath(sstableDirectory),
- keysPerSSTable,
- repairedAt,
- cfs.metadata,
- cfs.partitioner,
- new MetadataCollector(sstables,
cfs.metadata.comparator, getLevel()));
+ return
SSTableWriter.create(Descriptor.fromFilename(cfs.getTempSSTablePath(sstableDirectory),
type),
+ keysPerSSTable,
+ repairedAt,
+ cfs.metadata,
+ cfs.partitioner,
+ new MetadataCollector(sstables, cfs.metadata.comparator,
getLevel()));
}
protected int getLevel()