Improve JBOD disk utilization
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2291a60e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2291a60e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2291a60e

Branch: refs/heads/trunk
Commit: 2291a60e9eded4486528acc0a8d12a062b21fc26
Parents: 4397c34
Author: Robert Stupp <[email protected]>
Authored: Wed Nov 19 16:17:05 2014 -0600
Committer: Yuki Morishita <[email protected]>
Committed: Wed Nov 19 17:17:40 2014 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../apache/cassandra/db/ColumnFamilyStore.java  |   5 -
 .../org/apache/cassandra/db/Directories.java    | 161 ++++++++++++-------
 .../db/compaction/CompactionManager.java        |  12 +-
 .../cassandra/db/compaction/Scrubber.java       |   5 +-
 .../cassandra/io/util/DiskAwareRunnable.java    |  14 +-
 .../cassandra/service/StorageService.java       |  20 ---
 .../cassandra/streaming/StreamReader.java       |   3 +-
 .../cassandra/streaming/StreamReceiveTask.java  |   8 +-
 .../apache/cassandra/db/DirectoriesTest.java    | 128 +++++++++++++++
 10 files changed, 252 insertions(+), 105 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/2291a60e/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 41a5aaf..e008ab9 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -10,6 +10,7 @@
  * Fix overflow on histogram computation (CASSANDRA-8028)
  * Have paxos reuse the timestamp generation of normal queries (CASSANDRA-7801)
  * Fix incremental repair not remove parent session on remote (CASSANDRA-8291)
+ * Improve JBOD disk utilization (CASSANDRA-7386)
 Merged from 2.0:
  * Fix some failing queries that use multi-column relations
    on COMPACT STORAGE tables (CASSANDRA-8264)


http://git-wip-us.apache.org/repos/asf/cassandra/blob/2291a60e/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 7e1dd18..dec5370 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -2240,11 +2240,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         return directories.getSnapshotDetails();
     }
 
-    public boolean hasUnreclaimedSpace()
-    {
-        return getLiveDiskSpaceUsed() < getTotalDiskSpaceUsed();
-    }
-
     public long getTotalDiskSpaceUsed()
     {
         return metric.totalDiskSpaceUsed.count();


http://git-wip-us.apache.org/repos/asf/cassandra/blob/2291a60e/src/java/org/apache/cassandra/db/Directories.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Directories.java b/src/java/org/apache/cassandra/db/Directories.java
index 4319481..eb33bd8 100644
--- a/src/java/org/apache/cassandra/db/Directories.java
+++ b/src/java/org/apache/cassandra/db/Directories.java
@@ -29,8 +29,7 @@ import java.nio.file.Path;
 import java.nio.file.SimpleFileVisitor;
 import java.nio.file.attribute.BasicFileAttributes;
 import java.util.*;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicLong;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -39,8 +38,6 @@ import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.ImmutableSet.Builder;
 import com.google.common.collect.Iterables;
-import com.google.common.primitives.Longs;
-import com.google.common.util.concurrent.Uninterruptibles;
 
 import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
@@ -96,7 +93,6 @@ public class Directories
             dataDirectories[i] = new DataDirectory(new File(locations[i]));
     }
 
-
     /**
      * Checks whether Cassandra has RWX permissions to the specified directory. Logs an error with
      * the details if it does not.
@@ -198,7 +194,7 @@ public class Directories
         for (int i = 0; i < dataDirectories.length; ++i)
         {
             // check if old SSTable directory exists
-            dataPaths[i] = new File(dataDirectories[i].location, join(metadata.ksName, this.metadata.cfName));
+            dataPaths[i] = new File(dataDirectories[i].location, join(metadata.ksName, metadata.cfName));
         }
         boolean olderDirectoryExists = Iterables.any(Arrays.asList(dataPaths), new Predicate<File>()
         {
@@ -237,11 +233,10 @@
      */
     public File getLocationForDisk(DataDirectory dataDirectory)
     {
-        for (File dir : dataPaths)
-        {
-            if (dir.getAbsolutePath().startsWith(dataDirectory.location.getAbsolutePath()))
-                return dir;
-        }
+        if (dataDirectory != null)
+            for (File dir : dataPaths)
+                if (dir.getAbsolutePath().startsWith(dataDirectory.location.getAbsolutePath()))
+                    return dir;
 
         return null;
     }
@@ -255,65 +250,96 @@
         return null;
     }
 
+    /**
+     * Basically the same as calling {@link #getWriteableLocationAsFile(long)} with an unknown size ({@code -1L}),
+     * which may return any non-blacklisted directory - even a data directory that has no usable space.
+     * Do not use this method in production code.
+     *
+     * @throws IOError if all directories are blacklisted.
+     */
     public File getDirectoryForNewSSTables()
     {
-        File path = getWriteableLocationAsFile();
-
-        // Requesting GC has a chance to free space only if we're using mmap and a non SUN jvm
-        if (path == null
-            && (DatabaseDescriptor.getDiskAccessMode() == Config.DiskAccessMode.mmap || DatabaseDescriptor.getIndexAccessMode() == Config.DiskAccessMode.mmap)
-            && !FileUtils.isCleanerAvailable())
-        {
-            logger.info("Forcing GC to free up disk space. Upgrade to the Oracle JVM to avoid this");
-            StorageService.instance.requestGC();
-            // retry after GCing has forced unmap of compacted SSTables so they can be deleted
-            // Note: GCInspector will do this already, but only sun JVM supports GCInspector so far
-            SSTableDeletingTask.rescheduleFailedTasks();
-            Uninterruptibles.sleepUninterruptibly(10, TimeUnit.SECONDS);
-            path = getWriteableLocationAsFile();
-        }
-
-        return path;
+        return getWriteableLocationAsFile(-1L);
    }
 
-    public File getWriteableLocationAsFile()
+    /**
+     * Returns a non-blacklisted data directory that _currently_ has at least {@code writeSize} bytes of usable space.
+     *
+     * @throws IOError if all directories are blacklisted.
+     */
+    public File getWriteableLocationAsFile(long writeSize)
     {
-        return getLocationForDisk(getWriteableLocation());
+        return getLocationForDisk(getWriteableLocation(writeSize));
     }
 
     /**
-     * @return a non-blacklisted directory with the most free space and least current tasks.
+     * Returns a non-blacklisted data directory that _currently_ has at least {@code writeSize} bytes of usable space.
      *
      * @throws IOError if all directories are blacklisted.
      */
-    public DataDirectory getWriteableLocation()
+    public DataDirectory getWriteableLocation(long writeSize)
     {
-        List<DataDirectory> candidates = new ArrayList<>();
+        List<DataDirectoryCandidate> candidates = new ArrayList<>();
+
+        long totalAvailable = 0L;
 
         // pick directories with enough space and so that resulting sstable dirs aren't blacklisted for writes.
+        boolean tooBig = false;
         for (DataDirectory dataDir : dataDirectories)
         {
             if (BlacklistedDirectories.isUnwritable(getLocationForDisk(dataDir)))
                 continue;
-            candidates.add(dataDir);
+            DataDirectoryCandidate candidate = new DataDirectoryCandidate(dataDir);
+            // exclude the directory if the requested writeSize does not fit into its available space
+            if (candidate.availableSpace < writeSize)
+            {
+                tooBig = true;
+                continue;
+            }
+            candidates.add(candidate);
+            totalAvailable += candidate.availableSpace;
         }
 
         if (candidates.isEmpty())
-            throw new IOError(new IOException("All configured data directories have been blacklisted as unwritable for erroring out"));
+            if (tooBig)
+                return null;
+            else
+                throw new IOError(new IOException("All configured data directories have been blacklisted as unwritable for erroring out"));
 
-        // sort directories by free space, in _descending_ order.
-        Collections.sort(candidates);
+        // shortcut for single data directory systems
+        if (candidates.size() == 1)
+            return candidates.get(0).dataDirectory;
+
+        sortWriteableCandidates(candidates, totalAvailable);
 
-        // sort directories by load, in _ascending_ order.
-        Collections.sort(candidates, new Comparator<DataDirectory>()
+        return pickWriteableDirectory(candidates);
+    }
+
+    // separated for unit testing
+    static DataDirectory pickWriteableDirectory(List<DataDirectoryCandidate> candidates)
+    {
+        // weighted random
+        double rnd = ThreadLocalRandom.current().nextDouble();
+        for (DataDirectoryCandidate candidate : candidates)
         {
-            public int compare(DataDirectory a, DataDirectory b)
-            {
-                return a.currentTasks.get() - b.currentTasks.get();
-            }
-        });
+            rnd -= candidate.perc;
+            if (rnd <= 0)
+                return candidate.dataDirectory;
+        }
+
+        // last resort
+        return candidates.get(0).dataDirectory;
+    }
+
+    // separated for unit testing
+    static void sortWriteableCandidates(List<DataDirectoryCandidate> candidates, long totalAvailable)
+    {
+        // calculate free-space-percentage
+        for (DataDirectoryCandidate candidate : candidates)
+            candidate.calcFreePerc(totalAvailable);
 
-        return candidates.get(0);
+        // sort directories by perc
+        Collections.sort(candidates);
     }
 
     public static File getSnapshotDirectory(Descriptor desc, String snapshotName)
@@ -336,31 +362,50 @@
         return new SSTableLister();
     }
 
-    public static class DataDirectory implements Comparable<DataDirectory>
+    public static class DataDirectory
     {
         public final File location;
 
-        public final AtomicInteger currentTasks = new AtomicInteger();
-        public final AtomicLong estimatedWorkingSize = new AtomicLong();
-
         public DataDirectory(File location)
         {
             this.location = location;
         }
 
-        /**
-         * @return estimated available disk space for bounded directory,
-         * excluding the expected size written by tasks in the queue.
-         */
-        public long getEstimatedAvailableSpace()
+        public long getAvailableSpace()
         {
-            // Load factor of 0.9 we do not want to use the entire disk that is too risky.
-            return location.getUsableSpace() - estimatedWorkingSize.get();
+            return location.getUsableSpace();
         }
+    }
+
+    static final class DataDirectoryCandidate implements Comparable<DataDirectoryCandidate>
+    {
+        final DataDirectory dataDirectory;
+        final long availableSpace;
+        double perc;
 
-        public int compareTo(DataDirectory o)
+        public DataDirectoryCandidate(DataDirectory dataDirectory)
         {
-            // we want to sort by free space in descending order
-            return -1 * Longs.compare(getEstimatedAvailableSpace(), o.getEstimatedAvailableSpace());
+            this.dataDirectory = dataDirectory;
+            this.availableSpace = dataDirectory.getAvailableSpace();
+        }
+
+        void calcFreePerc(long totalAvailableSpace)
+        {
+            double w = availableSpace;
+            w /= totalAvailableSpace;
+            perc = w;
+        }
+
+        public int compareTo(DataDirectoryCandidate o)
+        {
+            if (this == o)
+                return 0;
+
+            int r = Double.compare(perc, o.perc);
+            if (r != 0)
+                return -r;
+            // last resort
+            return System.identityHashCode(this) - System.identityHashCode(o);
         }
     }


http://git-wip-us.apache.org/repos/asf/cassandra/blob/2291a60e/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 272b533..61628ff 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -658,9 +658,11 @@ public class CompactionManager implements CompactionManagerMBean
     {
         assert !cfs.isIndex();
 
+        Set<SSTableReader> sstableSet = Collections.singleton(sstable);
+
         if (!hasIndexes && !new Bounds<>(sstable.first.getToken(), sstable.last.getToken()).intersects(ranges))
         {
-            cfs.getDataTracker().markCompactedSSTablesReplaced(Arrays.asList(sstable), Collections.<SSTableReader>emptyList(), OperationType.CLEANUP);
+            cfs.getDataTracker().markCompactedSSTablesReplaced(sstableSet, Collections.<SSTableReader>emptyList(), OperationType.CLEANUP);
             return;
         }
         if (!needsCleanup(sstable, ranges))
@@ -674,13 +676,13 @@ public class CompactionManager implements CompactionManagerMBean
         long totalkeysWritten = 0;
 
         int expectedBloomFilterSize = Math.max(cfs.metadata.getMinIndexInterval(),
-                                               (int) (SSTableReader.getApproximateKeyCount(Arrays.asList(sstable))));
+                                               (int) (SSTableReader.getApproximateKeyCount(sstableSet)));
         if (logger.isDebugEnabled())
             logger.debug("Expected bloom filter size : {}", expectedBloomFilterSize);
 
         logger.info("Cleaning up {}", sstable);
 
-        File compactionFileLocation = cfs.directories.getDirectoryForNewSSTables();
+        File compactionFileLocation = cfs.directories.getWriteableLocationAsFile(cfs.getExpectedCompactedFileSize(sstableSet, OperationType.CLEANUP));
         if (compactionFileLocation == null)
             throw new IOException("disk full");
 
@@ -691,7 +693,7 @@ public class CompactionManager implements CompactionManagerMBean
         Set<SSTableReader> oldSSTable = Sets.newHashSet(sstable);
         SSTableRewriter writer = new SSTableRewriter(cfs, oldSSTable, sstable.maxDataAge, false);
         List<SSTableReader> finished;
-        try (CompactionController controller = new CompactionController(cfs, Collections.singleton(sstable), getDefaultGcBefore(cfs)))
+        try (CompactionController controller = new CompactionController(cfs, sstableSet, getDefaultGcBefore(cfs)))
         {
             writer.switchWriter(createWriter(cfs, compactionFileLocation, expectedBloomFilterSize, sstable.getSSTableMetadata().repairedAt, sstable));
 
@@ -994,7 +996,7 @@ public class CompactionManager implements CompactionManagerMBean
         Set<SSTableReader> sstableAsSet = new HashSet<>();
         sstableAsSet.add(sstable);
 
-        File destination = cfs.directories.getDirectoryForNewSSTables();
+        File destination = cfs.directories.getWriteableLocationAsFile(cfs.getExpectedCompactedFileSize(sstableAsSet, OperationType.ANTICOMPACTION));
         SSTableRewriter repairedSSTableWriter = new SSTableRewriter(cfs, sstableAsSet, sstable.maxDataAge, false);
         SSTableRewriter unRepairedSSTableWriter = new SSTableRewriter(cfs, sstableAsSet, sstable.maxDataAge, false);


http://git-wip-us.apache.org/repos/asf/cassandra/blob/2291a60e/src/java/org/apache/cassandra/db/compaction/Scrubber.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Scrubber.java b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
index 0cd71f2..2f53ab9 100644
--- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java
+++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
@@ -81,12 +81,13 @@ public class Scrubber implements Closeable
         this.skipCorrupted = skipCorrupted;
         this.isOffline = isOffline;
 
+        List<SSTableReader> toScrub = Collections.singletonList(sstable);
+
         // Calculate the expected compacted filesize
-        this.destination = cfs.directories.getDirectoryForNewSSTables();
+        this.destination = cfs.directories.getWriteableLocationAsFile(cfs.getExpectedCompactedFileSize(toScrub, OperationType.SCRUB));
         if (destination == null)
             throw new IOException("disk full");
 
-        List<SSTableReader> toScrub = Collections.singletonList(sstable);
         // If we run scrub offline, we should never purge tombstone, as we cannot know if other sstable have data that the tombstone deletes.
         this.controller = isOffline
                         ? new ScrubController(cfs)


http://git-wip-us.apache.org/repos/asf/cassandra/blob/2291a60e/src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java b/src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java
index 198a88d..6d453e5 100644
--- a/src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java
+++ b/src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java
@@ -34,24 +34,14 @@ public abstract class DiskAwareRunnable extends WrappedRunnable
         while (true)
         {
             writeSize = getExpectedWriteSize();
-            directory = getDirectories().getWriteableLocation();
+            directory = getDirectories().getWriteableLocation(writeSize);
             if (directory != null || !reduceScopeForLimitedSpace())
                 break;
         }
         if (directory == null)
             throw new RuntimeException("Insufficient disk space to write " + writeSize + " bytes");
 
-        directory.currentTasks.incrementAndGet();
-        directory.estimatedWorkingSize.addAndGet(writeSize);
-        try
-        {
-            runWith(getDirectories().getLocationForDisk(directory));
-        }
-        finally
-        {
-            directory.estimatedWorkingSize.addAndGet(-1 * writeSize);
-            directory.currentTasks.decrementAndGet();
-        }
+        runWith(getDirectories().getLocationForDisk(directory));
     }
 
     /**


http://git-wip-us.apache.org/repos/asf/cassandra/blob/2291a60e/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index ae8c798..79cea8e 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -3537,26 +3537,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         return isClientMode;
     }
 
-    public synchronized void requestGC()
-    {
-        if (hasUnreclaimedSpace())
-        {
-            logger.info("requesting GC to free disk space");
-            System.gc();
-            Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
-        }
-    }
-
-    private boolean hasUnreclaimedSpace()
-    {
-        for (ColumnFamilyStore cfs : ColumnFamilyStore.all())
-        {
-            if (cfs.hasUnreclaimedSpace())
-                return true;
-        }
-        return false;
-    }
-
     public String getOperationMode()
     {
         return operationMode.toString();


http://git-wip-us.apache.org/repos/asf/cassandra/blob/2291a60e/src/java/org/apache/cassandra/streaming/StreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReader.java b/src/java/org/apache/cassandra/streaming/StreamReader.java
index 3014549..c96a925 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReader.java
@@ -40,7 +40,6 @@ import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.SSTableWriter;
-import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.streaming.messages.FileMessageHeader;
 import org.apache.cassandra.utils.ByteBufferUtil;
@@ -116,7 +115,7 @@ public class StreamReader
 
     protected SSTableWriter createWriter(ColumnFamilyStore cfs, long totalSize, long repairedAt) throws IOException
     {
-        Directories.DataDirectory localDir = cfs.directories.getWriteableLocation();
+        Directories.DataDirectory localDir = cfs.directories.getWriteableLocation(totalSize);
         if (localDir == null)
             throw new IOException("Insufficient disk space to store " + totalSize + " bytes");
         desc = Descriptor.fromFilename(cfs.getTempSSTablePath(cfs.directories.getLocationForDisk(localDir)));


http://git-wip-us.apache.org/repos/asf/cassandra/blob/2291a60e/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
index 33da3d1..aa18954 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
@@ -17,6 +17,9 @@
  */
 package org.apache.cassandra.streaming;
 
+import java.io.File;
+import java.io.IOError;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
@@ -113,7 +116,10 @@ public class StreamReceiveTask extends StreamTask
         }
         ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
 
-        StreamLockfile lockfile = new StreamLockfile(cfs.directories.getWriteableLocationAsFile(), UUID.randomUUID());
+        File lockfiledir = cfs.directories.getWriteableLocationAsFile(task.sstables.size() * 256);
+        if (lockfiledir == null)
+            throw new IOError(new IOException("All disks full"));
+        StreamLockfile lockfile = new StreamLockfile(lockfiledir, UUID.randomUUID());
         lockfile.create(task.sstables);
         List<SSTableReader> readers = new ArrayList<>();
         for (SSTableWriter writer : task.sstables)


http://git-wip-us.apache.org/repos/asf/cassandra/blob/2291a60e/test/unit/org/apache/cassandra/db/DirectoriesTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/DirectoriesTest.java b/test/unit/org/apache/cassandra/db/DirectoriesTest.java
index 9e6b26b..34d10d2 100644
--- a/test/unit/org/apache/cassandra/db/DirectoriesTest.java
+++ b/test/unit/org/apache/cassandra/db/DirectoriesTest.java
@@ -23,6 +23,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.IdentityHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -44,7 +45,10 @@ import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 public class DirectoriesTest
 {
@@ -238,4 +242,128 @@
             }
         }
     }
+
+    @Test
+    public void testDiskFreeSpace()
+    {
+        DataDirectory[] dataDirectories = new DataDirectory[]
+        {
+            new DataDirectory(new File("/nearlyFullDir1"))
+            {
+                public long getAvailableSpace()
+                {
+                    return 11L;
+                }
+            },
+            new DataDirectory(new File("/nearlyFullDir2"))
+            {
+                public long getAvailableSpace()
+                {
+                    return 10L;
+                }
+            },
+            new DataDirectory(new File("/uniformDir1"))
+            {
+                public long getAvailableSpace()
+                {
+                    return 1000L;
+                }
+            },
+            new DataDirectory(new File("/uniformDir2"))
+            {
+                public long getAvailableSpace()
+                {
+                    return 999L;
+                }
+            },
+            new DataDirectory(new File("/veryFullDir"))
+            {
+                public long getAvailableSpace()
+                {
+                    return 4L;
+                }
+            }
+        };
+
+        // directories should be sorted
+        // by their free space ratio, before weighted random is applied
+        List<Directories.DataDirectoryCandidate> candidates = getWriteableDirectories(dataDirectories, 0L);
+        assertSame(dataDirectories[2], candidates.get(0).dataDirectory); // available: 1000
+        assertSame(dataDirectories[3], candidates.get(1).dataDirectory); // available: 999
+        assertSame(dataDirectories[0], candidates.get(2).dataDirectory); // available: 11
+        assertSame(dataDirectories[1], candidates.get(3).dataDirectory); // available: 10
+
+        // check for writeSize == 5
+        Map<DataDirectory, DataDirectory> testMap = new IdentityHashMap<>();
+        for (int i=0; ; i++)
+        {
+            candidates = getWriteableDirectories(dataDirectories, 5L);
+            assertEquals(4, candidates.size());
+
+            DataDirectory dir = Directories.pickWriteableDirectory(candidates);
+            testMap.put(dir, dir);
+
+            assertFalse(testMap.size() > 4);
+            if (testMap.size() == 4)
+            {
+                // run at least (rule of thumb) 100 more iterations to see whether any other (wrong) directories are returned
+                if (i >= 100)
+                    break;
+            }
+
+            // fail if the weighted random algorithm has not returned every eligible directory after many tries
+            if (i >= 10000000)
+                fail();
+        }
+
+        // check for writeSize == 11
+        testMap.clear();
+        for (int i=0; ; i++)
+        {
+            candidates = getWriteableDirectories(dataDirectories, 11L);
+            assertEquals(3, candidates.size());
+            for (Directories.DataDirectoryCandidate candidate : candidates)
+                assertTrue(candidate.dataDirectory.getAvailableSpace() >= 11L);
+
+            DataDirectory dir = Directories.pickWriteableDirectory(candidates);
+            testMap.put(dir, dir);
+
+            assertFalse(testMap.size() > 3);
+            if (testMap.size() == 3)
+            {
+                // run at least (rule of thumb) 100 more iterations
+                if (i >= 100)
+                    break;
+            }
+
+            // fail if the weighted random algorithm has not returned every eligible directory after many tries
+            if (i >= 10000000)
+                fail();
+        }
+    }
+
+    private List<Directories.DataDirectoryCandidate> getWriteableDirectories(DataDirectory[] dataDirectories, long writeSize)
+    {
+        // copied from Directories.getWriteableLocation(long)
+        List<Directories.DataDirectoryCandidate> candidates = new ArrayList<>();
+
+        long totalAvailable = 0L;
+
+        for (DataDirectory dataDir : dataDirectories)
+        {
+            Directories.DataDirectoryCandidate candidate = new Directories.DataDirectoryCandidate(dataDir);
+            // exclude the directory if the requested writeSize does not fit into its available space
+            if (candidate.availableSpace < writeSize)
+                continue;
+            candidates.add(candidate);
+            totalAvailable += candidate.availableSpace;
+        }
+
+        Directories.sortWriteableCandidates(candidates, totalAvailable);
+
+        return candidates;
+    }
 }
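
----------------------------------------------------------------------

For readers skimming the patch: the heart of the change is in
Directories.getWriteableLocation(long). Instead of sorting data directories
by free space and pending-task count and always returning the first one, the
new code filters out blacklisted directories and directories that cannot hold
the expected write, weights each remaining candidate by its share of the
total usable space, sorts by that weight, and draws one candidate at random.
The self-contained Java sketch below mirrors that logic under simplified
assumptions; WeightedDiskPicker, Disk and pick() are illustrative names, not
identifiers from the patch.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

public class WeightedDiskPicker
{
    static final class Disk
    {
        final String path;
        final long availableSpace; // usable bytes right now
        double weight;             // availableSpace / totalAvailable, filled in by pick()

        Disk(String path, long availableSpace)
        {
            this.path = path;
            this.availableSpace = availableSpace;
        }
    }

    // Returns a disk that can hold writeSize bytes, chosen with probability
    // proportional to its share of the total free space, or null when no
    // disk is large enough (the patch returns null in the same situation).
    static Disk pick(List<Disk> disks, long writeSize)
    {
        List<Disk> candidates = new ArrayList<>();
        long totalAvailable = 0L;
        for (Disk disk : disks)
        {
            // same exclusion rule as getWriteableLocation(long)
            if (disk.availableSpace < writeSize)
                continue;
            candidates.add(disk);
            totalAvailable += disk.availableSpace;
        }
        if (candidates.isEmpty())
            return null;

        // weight each candidate by its fraction of the total free space
        // (cf. DataDirectoryCandidate.calcFreePerc), then sort the
        // candidates by weight in descending order (cf. sortWriteableCandidates)
        for (Disk disk : candidates)
            disk.weight = (double) disk.availableSpace / totalAvailable;
        Collections.sort(candidates, new Comparator<Disk>()
        {
            public int compare(Disk a, Disk b)
            {
                return Double.compare(b.weight, a.weight);
            }
        });

        // weighted random draw (cf. pickWriteableDirectory): rnd is uniform in
        // [0,1) and the weights sum to 1, so candidate i wins with probability
        // weight[i]; the fallback guards against floating-point rounding
        double rnd = ThreadLocalRandom.current().nextDouble();
        for (Disk disk : candidates)
        {
            rnd -= disk.weight;
            if (rnd <= 0)
                return disk;
        }
        return candidates.get(0);
    }

    public static void main(String[] args)
    {
        List<Disk> disks = Arrays.asList(new Disk("/disk1", 1000L),
                                         new Disk("/disk2", 999L),
                                         new Disk("/disk3", 10L));
        // expected pick rates: /disk1 ~ 49.8%, /disk2 ~ 49.7%, /disk3 ~ 0.5%
        Disk chosen = pick(disks, 5L);
        System.out.println(chosen == null ? "disk full" : chosen.path);
    }
}

The weighting keeps every disk in rotation while steering most new SSTables
toward the emptiest disks, which is what lets JBOD setups fill up evenly
instead of hammering whichever single disk currently has the most free space.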
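The flip side of the change is the call-site contract: the size-aware lookups
may now return null when no data directory is large enough, so each updated
caller (CompactionManager, Scrubber, StreamReceiveTask) passes its expected
write size and treats null as "disk full" rather than relying on the removed
GC-and-retry path. A minimal sketch of that contract follows; DirectoryProvider
and destinationFor are hypothetical names, and expectedSize stands in for a
value such as cfs.getExpectedCompactedFileSize(sstables, operationType).

import java.io.File;
import java.io.IOException;

final class CallSiteSketch
{
    interface DirectoryProvider
    {
        // may return null when no data directory has enough usable space
        File getWriteableLocationAsFile(long expectedSize);
    }

    static File destinationFor(DirectoryProvider directories, long expectedSize) throws IOException
    {
        File destination = directories.getWriteableLocationAsFile(expectedSize);
        if (destination == null)
            throw new IOException("disk full"); // mirrors the patched callers' null checks
        return destination;
    }
}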
