This is an automated email from the ASF dual-hosted git repository.

leventov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/master by this push:
     new adf7baf  Fix race between canHandle() and addSegment() in 
StorageLocation (#8114)
adf7baf is described below

commit adf7bafb9fda4ce8be47f47b5fbc554e92b4608b
Author: Jihoon Son <[email protected]>
AuthorDate: Sat Jul 27 01:11:06 2019 -0700

    Fix race between canHandle() and addSegment() in StorageLocation (#8114)
    
    * Fix race between canHandle() and addSegment() in StorageLocation
    
    * add comment
    
    * add comments
    
    * fix test
    
    * address comments
    
    * remove <p> tag from javadoc
    
    * address comments
    
    * comparingLong
---
 .../indexing/worker/IntermediaryDataManager.java   |  76 +++++----
 .../loading/SegmentLoaderLocalCacheManager.java    |  41 +++--
 .../druid/segment/loading/StorageLocation.java     | 109 ++++++++-----
 ...mentLoaderLocalCacheManagerConcurrencyTest.java | 181 +++++++++++++++++++++
 .../druid/segment/loading/StorageLocationTest.java |  34 ++--
 5 files changed, 338 insertions(+), 103 deletions(-)

diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/worker/IntermediaryDataManager.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/worker/IntermediaryDataManager.java
index bfd202e..c47425a 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/worker/IntermediaryDataManager.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/worker/IntermediaryDataManager.java
@@ -45,6 +45,7 @@ import org.joda.time.Period;
 import javax.annotation.Nullable;
 import java.io.File;
 import java.io.IOException;
+import java.nio.file.Paths;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -231,7 +232,6 @@ public class IntermediaryDataManager
    * addSegment method.
    */
   public void addSegment(String supervisorTaskId, String subTaskId, 
DataSegment segment, File segmentFile)
-      throws IOException
   {
     final Iterator<StorageLocation> iterator = 
locationIterators.computeIfAbsent(
         supervisorTaskId,
@@ -243,7 +243,7 @@ public class IntermediaryDataManager
   public List<File> findPartitionFiles(String supervisorTaskId, Interval 
interval, int partitionId)
   {
     for (StorageLocation location : shuffleDataLocations) {
-      final File partitionDir = getPartitionDir(location, supervisorTaskId, 
interval, partitionId);
+      final File partitionDir = new File(location.getPath(), 
getPartitionDir(supervisorTaskId, interval, partitionId));
       if (partitionDir.exists()) {
         supervisorTaskCheckTimes.put(supervisorTaskId, DateTimes.nowUtc());
         final File[] segmentFiles = partitionDir.listFiles();
@@ -279,53 +279,65 @@ public class IntermediaryDataManager
       String subTaskId,
       DataSegment segment,
       File segmentFile
-  ) throws IOException
-  {
-    final StorageLocation location = findLocationForSegment(cyclicIterator, 
numLocations, segment);
-    final File destFile = new File(
-        getPartitionDir(location, supervisorTaskId, segment.getInterval(), 
segment.getShardSpec().getPartitionNum()),
-        subTaskId
-    );
-    FileUtils.forceMkdirParent(destFile);
-    final long copiedBytes = 
Files.asByteSource(segmentFile).copyTo(Files.asByteSink(destFile));
-    if (copiedBytes == 0) {
-      throw new IOE(
-          "0 bytes copied after copying a segment file from [%s] to [%s]",
-          segmentFile.getAbsolutePath(),
-          destFile.getAbsolutePath()
-      );
-    }
-    location.addFile(destFile);
-  }
-
-  private static StorageLocation findLocationForSegment(
-      Iterator<StorageLocation> cyclicIterator,
-      int numLocations,
-      DataSegment segment
   )
   {
     for (int i = 0; i < numLocations; i++) {
       final StorageLocation location = cyclicIterator.next();
-      if (location.canHandle(segment)) {
-        return location;
+      final File destFile = location.reserve(
+          getPartitionFilePath(
+              supervisorTaskId,
+              subTaskId,
+              segment.getInterval(),
+              segment.getShardSpec().getPartitionNum()
+          ),
+          segment.getId(),
+          segmentFile.length()
+      );
+      if (destFile != null) {
+        try {
+          FileUtils.forceMkdirParent(destFile);
+          final long copiedBytes = 
Files.asByteSource(segmentFile).copyTo(Files.asByteSink(destFile));
+          if (copiedBytes == 0) {
+            throw new IOE(
+                "0 bytes copied after copying a segment file from [%s] to 
[%s]",
+                segmentFile.getAbsolutePath(),
+                destFile.getAbsolutePath()
+            );
+          } else {
+            return;
+          }
+        }
+        catch (IOException e) {
+          // Only log here to try other locations as well.
+          log.warn(e, "Failed to write segmentFile at [%s]", destFile);
+          location.removeFile(segmentFile);
+        }
       }
     }
     throw new ISE("Can't find location to handle segment[%s]", segment);
   }
 
-  private static File getPartitionDir(
-      StorageLocation location,
+  private static String getPartitionFilePath(
+      String supervisorTaskId,
+      String subTaskId,
+      Interval interval,
+      int partitionId
+  )
+  {
+    return Paths.get(getPartitionDir(supervisorTaskId, interval, partitionId), 
subTaskId).toString();
+  }
+
+  private static String getPartitionDir(
       String supervisorTaskId,
       Interval interval,
       int partitionId
   )
   {
-    return FileUtils.getFile(
-        location.getPath(),
+    return Paths.get(
         supervisorTaskId,
         interval.getStart().toString(),
         interval.getEnd().toString(),
         String.valueOf(partitionId)
-    );
+    ).toString();
   }
 }
diff --git 
a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java
 
b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java
index b25c216..e0f0f0b 100644
--- 
a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java
+++ 
b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java
@@ -21,7 +21,6 @@ package org.apache.druid.segment.loading;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.primitives.Longs;
 import com.google.inject.Inject;
 import org.apache.commons.io.FileUtils;
 import org.apache.druid.guice.annotations.Json;
@@ -43,8 +42,9 @@ import java.util.concurrent.ConcurrentHashMap;
 public class SegmentLoaderLocalCacheManager implements SegmentLoader
 {
   private static final EmittingLogger log = new 
EmittingLogger(SegmentLoaderLocalCacheManager.class);
-  private static final Comparator<StorageLocation> COMPARATOR = (left, right) 
->
-      Longs.compare(right.available(), left.available());
+  private static final Comparator<StorageLocation> COMPARATOR = Comparator
+      .comparingLong(StorageLocation::availableSizeBytes)
+      .reversed();
 
   private final IndexIO indexIO;
   private final SegmentLoaderConfig config;
@@ -163,9 +163,7 @@ public class SegmentLoaderLocalCacheManager implements 
SegmentLoader
         if (loc == null) {
           loc = loadSegmentWithRetry(segment, storageDir);
         }
-        final File localStorageDir = new File(loc.getPath(), storageDir);
-        loc.addSegmentDir(localStorageDir, segment);
-        return localStorageDir;
+        return new File(loc.getPath(), storageDir);
       }
       finally {
         unlock(segment, lock);
@@ -181,23 +179,24 @@ public class SegmentLoaderLocalCacheManager implements 
SegmentLoader
   private StorageLocation loadSegmentWithRetry(DataSegment segment, String 
storageDirStr) throws SegmentLoadingException
   {
     for (StorageLocation loc : locations) {
-      if (loc.canHandle(segment)) {
-        File storageDir = new File(loc.getPath(), storageDirStr);
-
+      File storageDir = loc.reserve(storageDirStr, segment);
+      if (storageDir != null) {
         try {
           loadInLocationWithStartMarker(segment, storageDir);
           return loc;
         }
         catch (SegmentLoadingException e) {
-          log.makeAlert(
-              e,
-              "Failed to load segment in current location %s, try next 
location if any",
-              loc.getPath().getAbsolutePath()
-          )
-             .addData("location", loc.getPath().getAbsolutePath())
-             .emit();
-
-          cleanupCacheFiles(loc.getPath(), storageDir);
+          try {
+            log.makeAlert(
+                e,
+                "Failed to load segment in current location [%s], try next 
location if any",
+                loc.getPath().getAbsolutePath()
+            ).addData("location", loc.getPath().getAbsolutePath()).emit();
+          }
+          finally {
+            loc.removeSegmentDir(storageDir, segment);
+            cleanupCacheFiles(loc.getPath(), storageDir);
+          }
         }
       }
     }
@@ -366,4 +365,10 @@ public class SegmentLoaderLocalCacheManager implements 
SegmentLoader
   {
     return segmentLocks;
   }
+
+  @VisibleForTesting
+  public List<StorageLocation> getLocations()
+  {
+    return locations;
+  }
 }
diff --git 
a/server/src/main/java/org/apache/druid/segment/loading/StorageLocation.java 
b/server/src/main/java/org/apache/druid/segment/loading/StorageLocation.java
index 4fb4c4e..842295e 100644
--- a/server/src/main/java/org/apache/druid/segment/loading/StorageLocation.java
+++ b/server/src/main/java/org/apache/druid/segment/loading/StorageLocation.java
@@ -19,33 +19,51 @@
 
 package org.apache.druid.segment.loading;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.io.FileUtils;
-import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.SegmentId;
 
 import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
 import java.io.File;
 import java.util.HashSet;
 import java.util.Set;
 
 /**
+ * This class is a very simple logical representation of a local path. It 
keeps track of files stored under the
+ * {@link #path} via {@link #reserve}, so that the total size of stored files 
doesn't exceed the {@link #maxSizeBytes}
+ * and free space on the disk stays at or above {@link #freeSpaceToKeep}.
+ *
+ * This class is thread-safe, so that multiple threads can update its state at 
the same time.
+ * One example usage is that a historical can use multiple threads to load 
different segments in parallel
+ * from deep storage.
 */
 public class StorageLocation
 {
   private static final Logger log = new Logger(StorageLocation.class);
 
   private final File path;
-  private final long maxSize;
+  private final long maxSizeBytes;
   private final long freeSpaceToKeep;
+
+  /**
+   * Set of files stored under the {@link #path}.
+   */
+  @GuardedBy("this")
   private final Set<File> files = new HashSet<>();
 
-  private volatile long currSize = 0;
+  /**
+   * Current total size of files in bytes.
+   */
+  @GuardedBy("this")
+  private long currSizeBytes = 0;
 
-  public StorageLocation(File path, long maxSize, @Nullable Double 
freeSpacePercent)
+  public StorageLocation(File path, long maxSizeBytes, @Nullable Double 
freeSpacePercent)
   {
     this.path = path;
-    this.maxSize = maxSize;
+    this.maxSizeBytes = maxSizeBytes;
 
     if (freeSpacePercent != null) {
       long totalSpaceInPartition = path.getTotalSpace();
@@ -66,73 +84,86 @@ public class StorageLocation
     return path;
   }
 
-  public long getMaxSize()
-  {
-    return maxSize;
-  }
-
   /**
-   * Add a new file to this location. The given file argument must be a file 
rather than directory.
+   * Remove a segment file from this location. The given file argument must be 
a file rather than directory.
    */
-  public synchronized void addFile(File file)
+  public synchronized void removeFile(File file)
   {
-    if (file.isDirectory()) {
-      throw new ISE("[%s] must be a file. Use a");
-    }
-    if (files.add(file)) {
-      currSize += FileUtils.sizeOf(file);
+    if (files.remove(file)) {
+      currSizeBytes -= FileUtils.sizeOf(file);
+    } else {
+      log.warn("File[%s] is not found under this location[%s]", file, path);
     }
   }
 
   /**
-   * Add a new segment dir to this location. The segment size is added to 
currSize.
+   * Remove a segment dir from this location. The segment size is subtracted 
from currSizeBytes.
    */
-  public synchronized void addSegmentDir(File segmentDir, DataSegment segment)
+  public synchronized void removeSegmentDir(File segmentDir, DataSegment 
segment)
   {
-    if (files.add(segmentDir)) {
-      currSize += segment.getSize();
+    if (files.remove(segmentDir)) {
+      currSizeBytes -= segment.getSize();
+    } else {
+      log.warn("SegmentDir[%s] is not found under this location[%s]", 
segmentDir, path);
     }
   }
 
   /**
-   * Remove a segment file from this location. The given file argument must be 
a file rather than directory.
+   * Reserves space to store the given segment. The segment size is added to 
currSizeBytes.
+   * If it succeeds, it returns a file for the given segmentDir in this 
storage location. Returns null otherwise.
    */
-  public synchronized void removeFile(File file)
+  @Nullable
+  public synchronized File reserve(String segmentDir, DataSegment segment)
   {
-    if (files.remove(file)) {
-      currSize -= FileUtils.sizeOf(file);
-    }
+    return reserve(segmentDir, segment.getId(), segment.getSize());
   }
 
   /**
-   * Remove a segment dir from this location. The segment size is subtracted 
from currSize.
+   * Reserves space to store the given segment.
+   * If it succeeds, it returns a file for the given segmentFilePathToAdd in 
this storage location.
+   * Returns null otherwise.
    */
-  public synchronized void removeSegmentDir(File segmentDir, DataSegment 
segment)
+  @Nullable
+  public synchronized File reserve(String segmentFilePathToAdd, SegmentId 
segmentId, long segmentSize)
   {
-    if (files.remove(segmentDir)) {
-      currSize -= segment.getSize();
+    final File segmentFileToAdd = new File(path, segmentFilePathToAdd);
+    if (files.contains(segmentFileToAdd)) {
+      return null;
+    }
+    if (canHandle(segmentId, segmentSize)) {
+      files.add(segmentFileToAdd);
+      currSizeBytes += segmentSize;
+      return segmentFileToAdd;
+    } else {
+      return null;
     }
   }
 
-  public boolean canHandle(DataSegment segment)
+  /**
+   * This method is only package-private to use it in unit tests. Production 
code must not call this method directly.
+   * Use {@link #reserve} instead.
+   */
+  @VisibleForTesting
+  @GuardedBy("this")
+  boolean canHandle(SegmentId segmentId, long segmentSize)
   {
-    if (available() < segment.getSize()) {
+    if (availableSizeBytes() < segmentSize) {
       log.warn(
           "Segment[%s:%,d] too large for storage[%s:%,d]. Check your 
druid.segmentCache.locations maxSize param",
-          segment.getId(), segment.getSize(), getPath(), available()
+          segmentId, segmentSize, getPath(), availableSizeBytes()
       );
       return false;
     }
 
     if (freeSpaceToKeep > 0) {
       long currFreeSpace = path.getFreeSpace();
-      if ((freeSpaceToKeep + segment.getSize()) > currFreeSpace) {
+      if ((freeSpaceToKeep + segmentSize) > currFreeSpace) {
         log.warn(
             "Segment[%s:%,d] too large for storage[%s:%,d] to maintain 
suggested freeSpace[%d], current freeSpace is [%d].",
-            segment.getId(),
-            segment.getSize(),
+            segmentId,
+            segmentSize,
             getPath(),
-            available(),
+            availableSizeBytes(),
             freeSpaceToKeep,
             currFreeSpace
         );
@@ -143,8 +174,8 @@ public class StorageLocation
     return true;
   }
 
-  public synchronized long available()
+  public synchronized long availableSizeBytes()
   {
-    return maxSize - currSize;
+    return maxSizeBytes - currSizeBytes;
   }
 }
diff --git 
a/server/src/test/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManagerConcurrencyTest.java
 
b/server/src/test/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManagerConcurrencyTest.java
new file mode 100644
index 0000000..cb08217
--- /dev/null
+++ 
b/server/src/test/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManagerConcurrencyTest.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.loading;
+
+import com.fasterxml.jackson.databind.InjectableValues;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.jsontype.NamedType;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.segment.TestHelper;
+import org.apache.druid.server.metrics.NoopServiceEmitter;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.partition.NumberedShardSpec;
+import org.hamcrest.CoreMatchers;
+import org.joda.time.Interval;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.stream.Collectors;
+
+public class SegmentLoaderLocalCacheManagerConcurrencyTest
+{
+  @Rule
+  public final TemporaryFolder tmpFolder = new TemporaryFolder();
+
+  @Rule
+  public final ExpectedException expectedException = ExpectedException.none();
+
+  private final ObjectMapper jsonMapper;
+  private final String dataSource = "test_ds";
+  private final String segmentVersion;
+
+  private File localSegmentCacheFolder;
+  private SegmentLoaderLocalCacheManager manager;
+  private ExecutorService executorService;
+
+  public SegmentLoaderLocalCacheManagerConcurrencyTest()
+  {
+    jsonMapper = new DefaultObjectMapper();
+    jsonMapper.registerSubtypes(new NamedType(LocalLoadSpec.class, "local"));
+    jsonMapper.setInjectableValues(
+        new InjectableValues.Std().addValue(
+            LocalDataSegmentPuller.class,
+            new LocalDataSegmentPuller()
+        )
+    );
+    segmentVersion = DateTimes.nowUtc().toString();
+  }
+
+  @Before
+  public void setUp() throws Exception
+  {
+    EmittingLogger.registerEmitter(new NoopServiceEmitter());
+    localSegmentCacheFolder = tmpFolder.newFolder("segment_cache_folder");
+
+    final List<StorageLocationConfig> locations = new ArrayList<>();
+    // Each segment has the size of 1000 bytes. This cache location is capable 
of storing up to 2 segments.
+    final StorageLocationConfig locationConfig = new 
StorageLocationConfig(localSegmentCacheFolder, 2000L, null);
+    locations.add(locationConfig);
+
+    manager = new SegmentLoaderLocalCacheManager(
+        TestHelper.getTestIndexIO(),
+        new SegmentLoaderConfig().withLocations(locations),
+        jsonMapper
+    );
+    executorService = Execs.multiThreaded(4, 
"segment-loader-local-cache-manager-concurrency-test-%d");
+  }
+
+  @After
+  public void tearDown()
+  {
+    executorService.shutdownNow();
+  }
+
+  @Test
+  public void testGetSegment() throws IOException, ExecutionException, 
InterruptedException
+  {
+    final File localStorageFolder = 
tmpFolder.newFolder("local_storage_folder");
+    final List<DataSegment> segmentsToLoad = new ArrayList<>(4);
+
+    final Interval interval = Intervals.of("2019-01-01/P1D");
+    for (int partitionId = 0; partitionId < 4; partitionId++) {
+      final String segmentPath = Paths.get(
+          localStorageFolder.getCanonicalPath(),
+          dataSource,
+          StringUtils.format("%s_%s", interval.getStart().toString(), 
interval.getEnd().toString()),
+          segmentVersion,
+          String.valueOf(partitionId)
+      ).toString();
+      // manually create a local segment under localStorageFolder
+      final File localSegmentFile = new File(
+          localStorageFolder,
+          segmentPath
+      );
+      localSegmentFile.mkdirs();
+      final File indexZip = new File(localSegmentFile, "index.zip");
+      indexZip.createNewFile();
+
+      final DataSegment segment = newSegment(interval, 
partitionId).withLoadSpec(
+          ImmutableMap.of(
+              "type",
+              "local",
+              "path",
+              localSegmentFile.getAbsolutePath()
+          )
+      );
+      segmentsToLoad.add(segment);
+    }
+
+    final List<Future> futures = segmentsToLoad
+        .stream()
+        .map(segment -> executorService.submit(() -> 
manager.getSegmentFiles(segment)))
+        .collect(Collectors.toList());
+
+    expectedException.expect(ExecutionException.class);
+    
expectedException.expectCause(CoreMatchers.instanceOf(SegmentLoadingException.class));
+    expectedException.expectMessage("Failed to load segment");
+    for (Future future : futures) {
+      future.get();
+    }
+
+    System.out.println(manager.getLocations().get(0).availableSizeBytes());
+  }
+
+  private DataSegment newSegment(Interval interval, int partitionId)
+  {
+    return DataSegment.builder()
+                      .dataSource(dataSource)
+                      .interval(interval)
+                      .loadSpec(
+                          ImmutableMap.of(
+                              "type",
+                              "local",
+                              "path",
+                              "somewhere"
+                          )
+                      )
+                      .version(segmentVersion)
+                      .dimensions(ImmutableList.of())
+                      .metrics(ImmutableList.of())
+                      .shardSpec(new NumberedShardSpec(partitionId, 0))
+                      .binaryVersion(9)
+                      .size(1000L)
+                      .build();
+  }
+}
diff --git 
a/server/src/test/java/org/apache/druid/segment/loading/StorageLocationTest.java
 
b/server/src/test/java/org/apache/druid/segment/loading/StorageLocationTest.java
index cdfcd47..23da422 100644
--- 
a/server/src/test/java/org/apache/druid/segment/loading/StorageLocationTest.java
+++ 
b/server/src/test/java/org/apache/druid/segment/loading/StorageLocationTest.java
@@ -22,6 +22,7 @@ package org.apache.druid.segment.loading;
 import com.google.common.collect.ImmutableMap;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.SegmentId;
 import org.easymock.EasyMock;
 import org.junit.Assert;
 import org.junit.Test;
@@ -38,18 +39,18 @@ public class StorageLocationTest
   {
     // free space ignored only maxSize matters
     StorageLocation locationPlain = fakeLocation(100_000, 5_000, 10_000, null);
-    Assert.assertTrue(locationPlain.canHandle(makeSegment("2012/2013", 
9_000)));
-    Assert.assertFalse(locationPlain.canHandle(makeSegment("2012/2013", 
11_000)));
+    Assert.assertTrue(locationPlain.canHandle(newSegmentId("2012/2013"), 
9_000));
+    Assert.assertFalse(locationPlain.canHandle(newSegmentId("2012/2013"), 
11_000));
 
     // enough space available maxSize is the limit
     StorageLocation locationFree = fakeLocation(100_000, 25_000, 10_000, 10.0);
-    Assert.assertTrue(locationFree.canHandle(makeSegment("2012/2013", 9_000)));
-    Assert.assertFalse(locationFree.canHandle(makeSegment("2012/2013", 
11_000)));
+    Assert.assertTrue(locationFree.canHandle(newSegmentId("2012/2013"), 
9_000));
+    Assert.assertFalse(locationFree.canHandle(newSegmentId("2012/2013"), 
11_000));
 
     // disk almost full percentage is the limit
     StorageLocation locationFull = fakeLocation(100_000, 15_000, 10_000, 10.0);
-    Assert.assertTrue(locationFull.canHandle(makeSegment("2012/2013", 4_000)));
-    Assert.assertFalse(locationFull.canHandle(makeSegment("2012/2013", 
6_000)));
+    Assert.assertTrue(locationFull.canHandle(newSegmentId("2012/2013"), 
4_000));
+    Assert.assertFalse(locationFull.canHandle(newSegmentId("2012/2013"), 
6_000));
   }
 
   private StorageLocation fakeLocation(long total, long free, long max, Double 
percent)
@@ -71,34 +72,34 @@ public class StorageLocationTest
 
     final DataSegment secondSegment = makeSegment("2012-01-02/2012-01-03", 23);
 
-    loc.addSegmentDir(new File("test1"), makeSegment("2012-01-01/2012-01-02", 
10));
+    loc.reserve("test1", makeSegment("2012-01-01/2012-01-02", 10));
     expectedAvail -= 10;
     verifyLoc(expectedAvail, loc);
 
-    loc.addSegmentDir(new File("test1"), makeSegment("2012-01-01/2012-01-02", 
10));
+    loc.reserve("test1", makeSegment("2012-01-01/2012-01-02", 10));
     verifyLoc(expectedAvail, loc);
 
-    loc.addSegmentDir(new File("test2"), secondSegment);
+    loc.reserve("test2", secondSegment);
     expectedAvail -= 23;
     verifyLoc(expectedAvail, loc);
 
-    loc.removeSegmentDir(new File("test1"), 
makeSegment("2012-01-01/2012-01-02", 10));
+    loc.removeSegmentDir(new File("/tmp/test1"), 
makeSegment("2012-01-01/2012-01-02", 10));
     expectedAvail += 10;
     verifyLoc(expectedAvail, loc);
 
-    loc.removeSegmentDir(new File("test1"), 
makeSegment("2012-01-01/2012-01-02", 10));
+    loc.removeSegmentDir(new File("/tmp/test1"), 
makeSegment("2012-01-01/2012-01-02", 10));
     verifyLoc(expectedAvail, loc);
 
-    loc.removeSegmentDir(new File("test2"), secondSegment);
+    loc.removeSegmentDir(new File("/tmp/test2"), secondSegment);
     expectedAvail += 23;
     verifyLoc(expectedAvail, loc);
   }
 
   private void verifyLoc(long maxSize, StorageLocation loc)
   {
-    Assert.assertEquals(maxSize, loc.available());
+    Assert.assertEquals(maxSize, loc.availableSizeBytes());
     for (int i = 0; i <= maxSize; ++i) {
-      Assert.assertTrue(String.valueOf(i), 
loc.canHandle(makeSegment("2013/2014", i)));
+      Assert.assertTrue(String.valueOf(i), 
loc.canHandle(newSegmentId("2013/2014"), i));
     }
   }
 
@@ -116,4 +117,9 @@ public class StorageLocationTest
         size
     );
   }
+
+  private SegmentId newSegmentId(String intervalString)
+  {
+    return SegmentId.of("test", Intervals.of(intervalString), "1", 0);
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to