This is an automated email from the ASF dual-hosted git repository.
leventov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/master by this push:
new adf7baf Fix race between canHandle() and addSegment() in
StorageLocation (#8114)
adf7baf is described below
commit adf7bafb9fda4ce8be47f47b5fbc554e92b4608b
Author: Jihoon Son <[email protected]>
AuthorDate: Sat Jul 27 01:11:06 2019 -0700
Fix race between canHandle() and addSegment() in StorageLocation (#8114)
* Fix race between canHandle() and addSegment() in StorageLocation
* add comment
* add comments
* fix test
* address comments
* remove <p> tag from javadoc
* address comments
* comparingLong
---
.../indexing/worker/IntermediaryDataManager.java | 76 +++++----
.../loading/SegmentLoaderLocalCacheManager.java | 41 +++--
.../druid/segment/loading/StorageLocation.java | 109 ++++++++-----
...mentLoaderLocalCacheManagerConcurrencyTest.java | 181 +++++++++++++++++++++
.../druid/segment/loading/StorageLocationTest.java | 34 ++--
5 files changed, 338 insertions(+), 103 deletions(-)
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/worker/IntermediaryDataManager.java
b/indexing-service/src/main/java/org/apache/druid/indexing/worker/IntermediaryDataManager.java
index bfd202e..c47425a 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/worker/IntermediaryDataManager.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/worker/IntermediaryDataManager.java
@@ -45,6 +45,7 @@ import org.joda.time.Period;
import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
+import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
@@ -231,7 +232,6 @@ public class IntermediaryDataManager
* addSegment method.
*/
public void addSegment(String supervisorTaskId, String subTaskId,
DataSegment segment, File segmentFile)
- throws IOException
{
final Iterator<StorageLocation> iterator =
locationIterators.computeIfAbsent(
supervisorTaskId,
@@ -243,7 +243,7 @@ public class IntermediaryDataManager
public List<File> findPartitionFiles(String supervisorTaskId, Interval
interval, int partitionId)
{
for (StorageLocation location : shuffleDataLocations) {
- final File partitionDir = getPartitionDir(location, supervisorTaskId,
interval, partitionId);
+ final File partitionDir = new File(location.getPath(),
getPartitionDir(supervisorTaskId, interval, partitionId));
if (partitionDir.exists()) {
supervisorTaskCheckTimes.put(supervisorTaskId, DateTimes.nowUtc());
final File[] segmentFiles = partitionDir.listFiles();
@@ -279,53 +279,65 @@ public class IntermediaryDataManager
String subTaskId,
DataSegment segment,
File segmentFile
- ) throws IOException
- {
- final StorageLocation location = findLocationForSegment(cyclicIterator,
numLocations, segment);
- final File destFile = new File(
- getPartitionDir(location, supervisorTaskId, segment.getInterval(),
segment.getShardSpec().getPartitionNum()),
- subTaskId
- );
- FileUtils.forceMkdirParent(destFile);
- final long copiedBytes =
Files.asByteSource(segmentFile).copyTo(Files.asByteSink(destFile));
- if (copiedBytes == 0) {
- throw new IOE(
- "0 bytes copied after copying a segment file from [%s] to [%s]",
- segmentFile.getAbsolutePath(),
- destFile.getAbsolutePath()
- );
- }
- location.addFile(destFile);
- }
-
- private static StorageLocation findLocationForSegment(
- Iterator<StorageLocation> cyclicIterator,
- int numLocations,
- DataSegment segment
)
{
for (int i = 0; i < numLocations; i++) {
final StorageLocation location = cyclicIterator.next();
- if (location.canHandle(segment)) {
- return location;
+ final File destFile = location.reserve(
+ getPartitionFilePath(
+ supervisorTaskId,
+ subTaskId,
+ segment.getInterval(),
+ segment.getShardSpec().getPartitionNum()
+ ),
+ segment.getId(),
+ segmentFile.length()
+ );
+ if (destFile != null) {
+ try {
+ FileUtils.forceMkdirParent(destFile);
+ final long copiedBytes =
Files.asByteSource(segmentFile).copyTo(Files.asByteSink(destFile));
+ if (copiedBytes == 0) {
+ throw new IOE(
+ "0 bytes copied after copying a segment file from [%s] to
[%s]",
+ segmentFile.getAbsolutePath(),
+ destFile.getAbsolutePath()
+ );
+ } else {
+ return;
+ }
+ }
+ catch (IOException e) {
+ // Only log here to try other locations as well.
+ log.warn(e, "Failed to write segmentFile at [%s]", destFile);
+          location.removeFile(destFile);
+ }
}
}
throw new ISE("Can't find location to handle segment[%s]", segment);
}
- private static File getPartitionDir(
- StorageLocation location,
+ private static String getPartitionFilePath(
+ String supervisorTaskId,
+ String subTaskId,
+ Interval interval,
+ int partitionId
+ )
+ {
+ return Paths.get(getPartitionDir(supervisorTaskId, interval, partitionId),
subTaskId).toString();
+ }
+
+ private static String getPartitionDir(
String supervisorTaskId,
Interval interval,
int partitionId
)
{
- return FileUtils.getFile(
- location.getPath(),
+ return Paths.get(
supervisorTaskId,
interval.getStart().toString(),
interval.getEnd().toString(),
String.valueOf(partitionId)
- );
+ ).toString();
}
}
diff --git
a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java
b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java
index b25c216..e0f0f0b 100644
---
a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java
+++
b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java
@@ -21,7 +21,6 @@ package org.apache.druid.segment.loading;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.primitives.Longs;
import com.google.inject.Inject;
import org.apache.commons.io.FileUtils;
import org.apache.druid.guice.annotations.Json;
@@ -43,8 +42,9 @@ import java.util.concurrent.ConcurrentHashMap;
public class SegmentLoaderLocalCacheManager implements SegmentLoader
{
private static final EmittingLogger log = new
EmittingLogger(SegmentLoaderLocalCacheManager.class);
- private static final Comparator<StorageLocation> COMPARATOR = (left, right)
->
- Longs.compare(right.available(), left.available());
+ private static final Comparator<StorageLocation> COMPARATOR = Comparator
+ .comparingLong(StorageLocation::availableSizeBytes)
+ .reversed();
private final IndexIO indexIO;
private final SegmentLoaderConfig config;
@@ -163,9 +163,7 @@ public class SegmentLoaderLocalCacheManager implements
SegmentLoader
if (loc == null) {
loc = loadSegmentWithRetry(segment, storageDir);
}
- final File localStorageDir = new File(loc.getPath(), storageDir);
- loc.addSegmentDir(localStorageDir, segment);
- return localStorageDir;
+ return new File(loc.getPath(), storageDir);
}
finally {
unlock(segment, lock);
@@ -181,23 +179,24 @@ public class SegmentLoaderLocalCacheManager implements
SegmentLoader
private StorageLocation loadSegmentWithRetry(DataSegment segment, String
storageDirStr) throws SegmentLoadingException
{
for (StorageLocation loc : locations) {
- if (loc.canHandle(segment)) {
- File storageDir = new File(loc.getPath(), storageDirStr);
-
+ File storageDir = loc.reserve(storageDirStr, segment);
+ if (storageDir != null) {
try {
loadInLocationWithStartMarker(segment, storageDir);
return loc;
}
catch (SegmentLoadingException e) {
- log.makeAlert(
- e,
- "Failed to load segment in current location %s, try next
location if any",
- loc.getPath().getAbsolutePath()
- )
- .addData("location", loc.getPath().getAbsolutePath())
- .emit();
-
- cleanupCacheFiles(loc.getPath(), storageDir);
+ try {
+ log.makeAlert(
+ e,
+ "Failed to load segment in current location [%s], try next
location if any",
+ loc.getPath().getAbsolutePath()
+ ).addData("location", loc.getPath().getAbsolutePath()).emit();
+ }
+ finally {
+ loc.removeSegmentDir(storageDir, segment);
+ cleanupCacheFiles(loc.getPath(), storageDir);
+ }
}
}
}
@@ -366,4 +365,10 @@ public class SegmentLoaderLocalCacheManager implements
SegmentLoader
{
return segmentLocks;
}
+
+ @VisibleForTesting
+ public List<StorageLocation> getLocations()
+ {
+ return locations;
+ }
}
diff --git
a/server/src/main/java/org/apache/druid/segment/loading/StorageLocation.java
b/server/src/main/java/org/apache/druid/segment/loading/StorageLocation.java
index 4fb4c4e..842295e 100644
--- a/server/src/main/java/org/apache/druid/segment/loading/StorageLocation.java
+++ b/server/src/main/java/org/apache/druid/segment/loading/StorageLocation.java
@@ -19,33 +19,51 @@
package org.apache.druid.segment.loading;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.io.FileUtils;
-import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.SegmentId;
import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
import java.io.File;
import java.util.HashSet;
import java.util.Set;
/**
+ * This class is a very simple logical representation of a local path. It
keeps track of files stored under the
+ * {@link #path} via {@link #reserve}, so that the total size of stored files
doesn't exceed the {@link #maxSizeBytes}
+ * and available space is always kept larger than {@link #freeSpaceToKeep}.
+ *
+ * This class is thread-safe, so that multiple threads can update its state at
the same time.
+ * One example usage is that a historical can use multiple threads to load
different segments in parallel
+ * from deep storage.
*/
public class StorageLocation
{
private static final Logger log = new Logger(StorageLocation.class);
private final File path;
- private final long maxSize;
+ private final long maxSizeBytes;
private final long freeSpaceToKeep;
+
+ /**
+ * Set of files stored under the {@link #path}.
+ */
+ @GuardedBy("this")
private final Set<File> files = new HashSet<>();
- private volatile long currSize = 0;
+ /**
+ * Current total size of files in bytes.
+ */
+ @GuardedBy("this")
+ private long currSizeBytes = 0;
- public StorageLocation(File path, long maxSize, @Nullable Double
freeSpacePercent)
+ public StorageLocation(File path, long maxSizeBytes, @Nullable Double
freeSpacePercent)
{
this.path = path;
- this.maxSize = maxSize;
+ this.maxSizeBytes = maxSizeBytes;
if (freeSpacePercent != null) {
long totalSpaceInPartition = path.getTotalSpace();
@@ -66,73 +84,86 @@ public class StorageLocation
return path;
}
- public long getMaxSize()
- {
- return maxSize;
- }
-
/**
- * Add a new file to this location. The given file argument must be a file
rather than directory.
+ * Remove a segment file from this location. The given file argument must be
a file rather than directory.
*/
- public synchronized void addFile(File file)
+ public synchronized void removeFile(File file)
{
- if (file.isDirectory()) {
- throw new ISE("[%s] must be a file. Use a");
- }
- if (files.add(file)) {
- currSize += FileUtils.sizeOf(file);
+ if (files.remove(file)) {
+ currSizeBytes -= FileUtils.sizeOf(file);
+ } else {
+ log.warn("File[%s] is not found under this location[%s]", file, path);
}
}
/**
- * Add a new segment dir to this location. The segment size is added to
currSize.
+ * Remove a segment dir from this location. The segment size is subtracted
from currSizeBytes.
*/
- public synchronized void addSegmentDir(File segmentDir, DataSegment segment)
+ public synchronized void removeSegmentDir(File segmentDir, DataSegment
segment)
{
- if (files.add(segmentDir)) {
- currSize += segment.getSize();
+ if (files.remove(segmentDir)) {
+ currSizeBytes -= segment.getSize();
+ } else {
+ log.warn("SegmentDir[%s] is not found under this location[%s]",
segmentDir, path);
}
}
/**
- * Remove a segment file from this location. The given file argument must be
a file rather than directory.
+ * Reserves space to store the given segment. The segment size is added to
currSizeBytes.
+ * If it succeeds, it returns a file for the given segmentDir in this
storage location. Returns null otherwise.
*/
- public synchronized void removeFile(File file)
+ @Nullable
+ public synchronized File reserve(String segmentDir, DataSegment segment)
{
- if (files.remove(file)) {
- currSize -= FileUtils.sizeOf(file);
- }
+ return reserve(segmentDir, segment.getId(), segment.getSize());
}
/**
- * Remove a segment dir from this location. The segment size is subtracted
from currSize.
+ * Reserves space to store the given segment.
+ * If it succeeds, it returns a file for the given segmentFilePathToAdd in
this storage location.
+ * Returns null otherwise.
*/
- public synchronized void removeSegmentDir(File segmentDir, DataSegment
segment)
+ @Nullable
+ public synchronized File reserve(String segmentFilePathToAdd, SegmentId
segmentId, long segmentSize)
{
- if (files.remove(segmentDir)) {
- currSize -= segment.getSize();
+ final File segmentFileToAdd = new File(path, segmentFilePathToAdd);
+ if (files.contains(segmentFileToAdd)) {
+ return null;
+ }
+ if (canHandle(segmentId, segmentSize)) {
+ files.add(segmentFileToAdd);
+ currSizeBytes += segmentSize;
+ return segmentFileToAdd;
+ } else {
+ return null;
}
}
- public boolean canHandle(DataSegment segment)
+ /**
+ * This method is package-private only so that it can be used in unit tests.
Production code must not call this method directly.
+ * Use {@link #reserve} instead.
+ */
+ @VisibleForTesting
+ @GuardedBy("this")
+ boolean canHandle(SegmentId segmentId, long segmentSize)
{
- if (available() < segment.getSize()) {
+ if (availableSizeBytes() < segmentSize) {
log.warn(
"Segment[%s:%,d] too large for storage[%s:%,d]. Check your
druid.segmentCache.locations maxSize param",
- segment.getId(), segment.getSize(), getPath(), available()
+ segmentId, segmentSize, getPath(), availableSizeBytes()
);
return false;
}
if (freeSpaceToKeep > 0) {
long currFreeSpace = path.getFreeSpace();
- if ((freeSpaceToKeep + segment.getSize()) > currFreeSpace) {
+ if ((freeSpaceToKeep + segmentSize) > currFreeSpace) {
log.warn(
"Segment[%s:%,d] too large for storage[%s:%,d] to maintain
suggested freeSpace[%d], current freeSpace is [%d].",
- segment.getId(),
- segment.getSize(),
+ segmentId,
+ segmentSize,
getPath(),
- available(),
+ availableSizeBytes(),
freeSpaceToKeep,
currFreeSpace
);
@@ -143,8 +174,8 @@ public class StorageLocation
return true;
}
- public synchronized long available()
+ public synchronized long availableSizeBytes()
{
- return maxSize - currSize;
+ return maxSizeBytes - currSizeBytes;
}
}
diff --git
a/server/src/test/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManagerConcurrencyTest.java
b/server/src/test/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManagerConcurrencyTest.java
new file mode 100644
index 0000000..cb08217
--- /dev/null
+++
b/server/src/test/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManagerConcurrencyTest.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.loading;
+
+import com.fasterxml.jackson.databind.InjectableValues;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.jsontype.NamedType;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.segment.TestHelper;
+import org.apache.druid.server.metrics.NoopServiceEmitter;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.partition.NumberedShardSpec;
+import org.hamcrest.CoreMatchers;
+import org.joda.time.Interval;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.stream.Collectors;
+
+public class SegmentLoaderLocalCacheManagerConcurrencyTest
+{
+ @Rule
+ public final TemporaryFolder tmpFolder = new TemporaryFolder();
+
+ @Rule
+ public final ExpectedException expectedException = ExpectedException.none();
+
+ private final ObjectMapper jsonMapper;
+ private final String dataSource = "test_ds";
+ private final String segmentVersion;
+
+ private File localSegmentCacheFolder;
+ private SegmentLoaderLocalCacheManager manager;
+ private ExecutorService executorService;
+
+ public SegmentLoaderLocalCacheManagerConcurrencyTest()
+ {
+ jsonMapper = new DefaultObjectMapper();
+ jsonMapper.registerSubtypes(new NamedType(LocalLoadSpec.class, "local"));
+ jsonMapper.setInjectableValues(
+ new InjectableValues.Std().addValue(
+ LocalDataSegmentPuller.class,
+ new LocalDataSegmentPuller()
+ )
+ );
+ segmentVersion = DateTimes.nowUtc().toString();
+ }
+
+ @Before
+ public void setUp() throws Exception
+ {
+ EmittingLogger.registerEmitter(new NoopServiceEmitter());
+ localSegmentCacheFolder = tmpFolder.newFolder("segment_cache_folder");
+
+ final List<StorageLocationConfig> locations = new ArrayList<>();
+ // Each segment has the size of 1000 bytes. This deep storage is capable
of storing up to 2 segments.
+ final StorageLocationConfig locationConfig = new
StorageLocationConfig(localSegmentCacheFolder, 2000L, null);
+ locations.add(locationConfig);
+
+ manager = new SegmentLoaderLocalCacheManager(
+ TestHelper.getTestIndexIO(),
+ new SegmentLoaderConfig().withLocations(locations),
+ jsonMapper
+ );
+ executorService = Execs.multiThreaded(4,
"segment-loader-local-cache-manager-concurrency-test-%d");
+ }
+
+ @After
+ public void tearDown()
+ {
+ executorService.shutdownNow();
+ }
+
+ @Test
+ public void testGetSegment() throws IOException, ExecutionException,
InterruptedException
+ {
+ final File localStorageFolder =
tmpFolder.newFolder("local_storage_folder");
+ final List<DataSegment> segmentsToLoad = new ArrayList<>(4);
+
+ final Interval interval = Intervals.of("2019-01-01/P1D");
+ for (int partitionId = 0; partitionId < 4; partitionId++) {
+ final String segmentPath = Paths.get(
+ localStorageFolder.getCanonicalPath(),
+ dataSource,
+ StringUtils.format("%s_%s", interval.getStart().toString(),
interval.getEnd().toString()),
+ segmentVersion,
+ String.valueOf(partitionId)
+ ).toString();
+ // manually create a local segment under localStorageFolder
+ final File localSegmentFile = new File(
+ localStorageFolder,
+ segmentPath
+ );
+ localSegmentFile.mkdirs();
+ final File indexZip = new File(localSegmentFile, "index.zip");
+ indexZip.createNewFile();
+
+ final DataSegment segment = newSegment(interval,
partitionId).withLoadSpec(
+ ImmutableMap.of(
+ "type",
+ "local",
+ "path",
+ localSegmentFile.getAbsolutePath()
+ )
+ );
+ segmentsToLoad.add(segment);
+ }
+
+ final List<Future> futures = segmentsToLoad
+ .stream()
+ .map(segment -> executorService.submit(() ->
manager.getSegmentFiles(segment)))
+ .collect(Collectors.toList());
+
+ expectedException.expect(ExecutionException.class);
+
expectedException.expectCause(CoreMatchers.instanceOf(SegmentLoadingException.class));
+ expectedException.expectMessage("Failed to load segment");
+ for (Future future : futures) {
+ future.get();
+ }
+
+ System.out.println(manager.getLocations().get(0).availableSizeBytes());
+ }
+
+ private DataSegment newSegment(Interval interval, int partitionId)
+ {
+ return DataSegment.builder()
+ .dataSource(dataSource)
+ .interval(interval)
+ .loadSpec(
+ ImmutableMap.of(
+ "type",
+ "local",
+ "path",
+ "somewhere"
+ )
+ )
+ .version(segmentVersion)
+ .dimensions(ImmutableList.of())
+ .metrics(ImmutableList.of())
+ .shardSpec(new NumberedShardSpec(partitionId, 0))
+ .binaryVersion(9)
+ .size(1000L)
+ .build();
+ }
+}
diff --git
a/server/src/test/java/org/apache/druid/segment/loading/StorageLocationTest.java
b/server/src/test/java/org/apache/druid/segment/loading/StorageLocationTest.java
index cdfcd47..23da422 100644
---
a/server/src/test/java/org/apache/druid/segment/loading/StorageLocationTest.java
+++
b/server/src/test/java/org/apache/druid/segment/loading/StorageLocationTest.java
@@ -22,6 +22,7 @@ package org.apache.druid.segment.loading;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.SegmentId;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Test;
@@ -38,18 +39,18 @@ public class StorageLocationTest
{
// free space ignored only maxSize matters
StorageLocation locationPlain = fakeLocation(100_000, 5_000, 10_000, null);
- Assert.assertTrue(locationPlain.canHandle(makeSegment("2012/2013",
9_000)));
- Assert.assertFalse(locationPlain.canHandle(makeSegment("2012/2013",
11_000)));
+ Assert.assertTrue(locationPlain.canHandle(newSegmentId("2012/2013"),
9_000));
+ Assert.assertFalse(locationPlain.canHandle(newSegmentId("2012/2013"),
11_000));
// enough space available maxSize is the limit
StorageLocation locationFree = fakeLocation(100_000, 25_000, 10_000, 10.0);
- Assert.assertTrue(locationFree.canHandle(makeSegment("2012/2013", 9_000)));
- Assert.assertFalse(locationFree.canHandle(makeSegment("2012/2013",
11_000)));
+ Assert.assertTrue(locationFree.canHandle(newSegmentId("2012/2013"),
9_000));
+ Assert.assertFalse(locationFree.canHandle(newSegmentId("2012/2013"),
11_000));
// disk almost full percentage is the limit
StorageLocation locationFull = fakeLocation(100_000, 15_000, 10_000, 10.0);
- Assert.assertTrue(locationFull.canHandle(makeSegment("2012/2013", 4_000)));
- Assert.assertFalse(locationFull.canHandle(makeSegment("2012/2013",
6_000)));
+ Assert.assertTrue(locationFull.canHandle(newSegmentId("2012/2013"),
4_000));
+ Assert.assertFalse(locationFull.canHandle(newSegmentId("2012/2013"),
6_000));
}
private StorageLocation fakeLocation(long total, long free, long max, Double
percent)
@@ -71,34 +72,34 @@ public class StorageLocationTest
final DataSegment secondSegment = makeSegment("2012-01-02/2012-01-03", 23);
- loc.addSegmentDir(new File("test1"), makeSegment("2012-01-01/2012-01-02",
10));
+ loc.reserve("test1", makeSegment("2012-01-01/2012-01-02", 10));
expectedAvail -= 10;
verifyLoc(expectedAvail, loc);
- loc.addSegmentDir(new File("test1"), makeSegment("2012-01-01/2012-01-02",
10));
+ loc.reserve("test1", makeSegment("2012-01-01/2012-01-02", 10));
verifyLoc(expectedAvail, loc);
- loc.addSegmentDir(new File("test2"), secondSegment);
+ loc.reserve("test2", secondSegment);
expectedAvail -= 23;
verifyLoc(expectedAvail, loc);
- loc.removeSegmentDir(new File("test1"),
makeSegment("2012-01-01/2012-01-02", 10));
+ loc.removeSegmentDir(new File("/tmp/test1"),
makeSegment("2012-01-01/2012-01-02", 10));
expectedAvail += 10;
verifyLoc(expectedAvail, loc);
- loc.removeSegmentDir(new File("test1"),
makeSegment("2012-01-01/2012-01-02", 10));
+ loc.removeSegmentDir(new File("/tmp/test1"),
makeSegment("2012-01-01/2012-01-02", 10));
verifyLoc(expectedAvail, loc);
- loc.removeSegmentDir(new File("test2"), secondSegment);
+ loc.removeSegmentDir(new File("/tmp/test2"), secondSegment);
expectedAvail += 23;
verifyLoc(expectedAvail, loc);
}
private void verifyLoc(long maxSize, StorageLocation loc)
{
- Assert.assertEquals(maxSize, loc.available());
+ Assert.assertEquals(maxSize, loc.availableSizeBytes());
for (int i = 0; i <= maxSize; ++i) {
- Assert.assertTrue(String.valueOf(i),
loc.canHandle(makeSegment("2013/2014", i)));
+ Assert.assertTrue(String.valueOf(i),
loc.canHandle(newSegmentId("2013/2014"), i));
}
}
@@ -116,4 +117,9 @@ public class StorageLocationTest
size
);
}
+
+ private SegmentId newSegmentId(String intervalString)
+ {
+ return SegmentId.of("test", Intervals.of(intervalString), "1", 0);
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]