jihoonson commented on a change in pull request #11398:
URL: https://github.com/apache/druid/pull/11398#discussion_r674386509



##########
File path: server/src/main/java/org/apache/druid/segment/loading/StorageLocation.java
##########
@@ -117,6 +118,17 @@ public synchronized File reserve(String segmentDir, DataSegment segment)
     return reserve(segmentDir, segment.getId().toString(), segment.getSize());
   }
 
+  public synchronized boolean isReserved(String segmentDir)
+  {
+    final File segmentFile = new File(path, segmentDir);
+    return files.contains(segmentFile);
+  }
+
+  public File segmentDirectoryAsFile(String segmentDir)
+  {
+    return new File(path, segmentDir);
+  }
+

Review comment:
       The LGTM error in https://lgtm.com/projects/g/apache/druid/rev/pr-1ff2ba29372f1d2b44941bb55f75b5830f808401 seems like a false alarm. Perhaps we should suppress it for this change.

##########
File path: server/src/main/java/org/apache/druid/segment/loading/SegmentCacheManager.java
##########
@@ -40,6 +40,34 @@
    */
   File getSegmentFiles(DataSegment segment) throws SegmentLoadingException;
 
+  /**
+   * Tries to reserve the space for a segment on any location. When the space has been reserved,
+   * {@link #getSegmentFiles(DataSegment)} should download the segment on the reserved location or
+   * fail otherwise.
+   *
+   * This function is useful for custom extensions. Extensions can try to reserve the space first and
+   * if not successful, make some space by cleaning up other segments, etc. There is also improved
+   * concurrency for extensions with this function. Since reserve is a cheaper operation to invoke
+   * till the space has been reserved. Hence it can be put inside a lock if required by the extensions. getSegment
+   * can't be put inside a lock since it is a time-consuming operation, on account of downloading the files.
+   *
+   * @param segment - Segment to reserve
+   * @return True if enough space found to store the segment, false otherwise
+   */
+  /*
+   * We only return a boolean result instead of a pointer to
+   * {@link StorageLocation} since we don't want callers to operate on {@code StorageLocation} directly outside {@code SegmentLoader}.
+   * {@link SegmentLoader} operates on the {@code StorageLocation} objects in a thread-safe manner.
+   */
+  boolean reserve(DataSegment segment);
+
+  /**
+   * Reverts the effects of {@link #reserve(DataSegment)} (DataSegment)} by releasing the location reserved for this segment.
+   * @param segment - Segment to release the location for.
+   * @return - True if any location was reserved and released, false otherwise.
+   */
+  boolean release(DataSegment segment);

Review comment:
       Can you clarify the contract between this method and `getSegmentFiles`? For example, what should happen when `release` is called if `reserve` was not called but `getSegmentFiles` was called?
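To make the question concrete, here is a minimal sketch of one possible contract, assuming a single location with simple size accounting: `reserve` is idempotent, `getSegmentFiles` reserves implicitly when no reservation exists, and `release` frees the space however it was acquired, returning `false` only when nothing was held. `SketchLocation` and `SketchCacheManager` are hypothetical stand-ins, not Druid classes, and the actual download is elided.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of one storage location (not Druid's StorageLocation).
class SketchLocation
{
  private final long maxSize;
  private final Map<String, Long> reserved = new HashMap<>();
  private long currSize = 0;

  SketchLocation(long maxSize)
  {
    this.maxSize = maxSize;
  }

  // Idempotent: reserving an already-reserved segment succeeds without taking more space.
  synchronized boolean reserve(String segmentId, long size)
  {
    if (reserved.containsKey(segmentId)) {
      return true;
    }
    if (currSize + size > maxSize) {
      return false;
    }
    reserved.put(segmentId, size);
    currSize += size;
    return true;
  }

  // Returns false when nothing was held for this segment.
  synchronized boolean release(String segmentId)
  {
    Long size = reserved.remove(segmentId);
    if (size == null) {
      return false;
    }
    currSize -= size;
    return true;
  }

  synchronized long availableSize()
  {
    return maxSize - currSize;
  }
}

// One possible contract: every path that takes space goes through reserve(),
// so release() frees the space whether reserve() or getSegmentFiles() acquired it.
class SketchCacheManager
{
  private final SketchLocation location;

  SketchCacheManager(SketchLocation location)
  {
    this.location = location;
  }

  boolean reserve(String segmentId, long size)
  {
    return location.reserve(segmentId, size);
  }

  // Stands in for getSegmentFiles; the download itself is elided.
  // Because reserve() is idempotent, an earlier reservation is reused.
  boolean getSegmentFiles(String segmentId, long size)
  {
    return location.reserve(segmentId, size);
  }

  boolean release(String segmentId)
  {
    return location.release(segmentId);
  }
}
```

Under this contract, `release` without any prior `reserve` or `getSegmentFiles` returns `false`, and `release` after `getSegmentFiles` alone returns `true`. Whatever the PR settles on, spelling out these two cases in the javadoc would answer the question.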

##########
File path: server/src/main/java/org/apache/druid/segment/loading/SegmentCacheManager.java
##########
@@ -40,6 +40,34 @@
    */
   File getSegmentFiles(DataSegment segment) throws SegmentLoadingException;
 
+  /**
+   * Tries to reserve the space for a segment on any location. When the space has been reserved,
+   * {@link #getSegmentFiles(DataSegment)} should download the segment on the reserved location or
+   * fail otherwise.
+   *
+   * This function is useful for custom extensions. Extensions can try to reserve the space first and
+   * if not successful, make some space by cleaning up other segments, etc. There is also improved
+   * concurrency for extensions with this function. Since reserve is a cheaper operation to invoke
+   * till the space has been reserved. Hence it can be put inside a lock if required by the extensions. getSegment
+   * can't be put inside a lock since it is a time-consuming operation, on account of downloading the files.
+   *
+   * @param segment - Segment to reserve
+   * @return True if enough space found to store the segment, false otherwise
+   */
+  /*
+   * We only return a boolean result instead of a pointer to
+   * {@link StorageLocation} since we don't want callers to operate on {@code StorageLocation} directly outside {@code SegmentLoader}.
+   * {@link SegmentLoader} operates on the {@code StorageLocation} objects in a thread-safe manner.
+   */
+  boolean reserve(DataSegment segment);

Review comment:
       Should `isSegmentCached` still return false after `reserve` is called? It would be worth documenting.
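A minimal sketch of the semantics this would document, assuming `isSegmentCached` reports only fully downloaded segments, so a reservation alone does not count. `SketchSegmentCache` and `markDownloaded` are hypothetical names used for illustration; space accounting is omitted.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical model: reserve() only claims space; a segment is reported as cached
// only after markDownloaded() records that its files are complete.
class SketchSegmentCache
{
  private final Set<String> reserved = new HashSet<>();
  private final Set<String> downloaded = new HashSet<>();

  // Space accounting is omitted; every reservation succeeds in this sketch.
  synchronized boolean reserve(String segmentId)
  {
    reserved.add(segmentId);
    return true;
  }

  synchronized void markDownloaded(String segmentId)
  {
    if (!reserved.contains(segmentId)) {
      throw new IllegalStateException("no reservation for " + segmentId);
    }
    downloaded.add(segmentId);
  }

  // A reserved but not yet downloaded segment is NOT cached.
  synchronized boolean isSegmentCached(String segmentId)
  {
    return downloaded.contains(segmentId);
  }
}
```

With this reading, a caller that sees `isSegmentCached == false` cannot tell whether space is merely reserved, which is exactly the kind of detail the javadoc should state.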

##########
File path: server/src/main/java/org/apache/druid/segment/loading/SegmentLoader.java
##########
@@ -29,10 +30,45 @@
  * Loading segments from deep storage to local storage.
  * Implementations must be thread-safe.
  */
+@UnstableApi
 public interface SegmentLoader
 {
   boolean isSegmentLoaded(DataSegment segment);
-  Segment getSegment(DataSegment segment, boolean lazy, SegmentLazyLoadFailCallback loadFailed) throws SegmentLoadingException;
+
+  /**
+   * Returns a {@link ReferenceCountingSegment} that will be added by the {@link org.apache.druid.server.SegmentManager}
+   * to the {@link org.apache.druid.timeline.VersionedIntervalTimeline}. This method can be called multiple times
+   * by the {@link org.apache.druid.server.SegmentManager}
+   *
+   * Returning a {@code ReferenceCountingSegment} will let custom implementations keep track of reference count for
+   * segments that the custom implementations are creating. That way, custom implementations can know when the segment
+   * is in use or not.
+   * @param segment - {@link DataSegment} segment to download
+   * @param lazy - whether the loading is lazy
+   * @param loadFailed - Callback to invoke if the loading fails
+   * @return Segment object wrapped inside {@link ReferenceCountingSegment}.
+   * @throws SegmentLoadingException
+   */
+  ReferenceCountingSegment getSegment(DataSegment segment, boolean lazy, SegmentLazyLoadFailCallback loadFailed) throws SegmentLoadingException;

Review comment:
       I think this should be documented in the javadoc.
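For reference, a minimal sketch of the reference-counting idea the javadoc describes, i.e. how a custom loader could tell when a segment it created is no longer in use. `SketchRefCountedSegment` is a hypothetical, simplified stand-in; Druid's actual `ReferenceCountingSegment` has a different API.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in, NOT Druid's ReferenceCountingSegment.
class SketchRefCountedSegment
{
  private final String segmentId;
  private final AtomicInteger refs = new AtomicInteger(0);
  private volatile boolean closed = false;

  SketchRefCountedSegment(String segmentId)
  {
    this.segmentId = segmentId;
  }

  // Returns false once the segment is closed, so callers must not use it.
  boolean acquire()
  {
    refs.incrementAndGet();
    if (closed) {
      refs.decrementAndGet(); // roll back: no new readers after close()
      return false;
    }
    return true;
  }

  void release()
  {
    if (refs.decrementAndGet() < 0) {
      refs.incrementAndGet(); // restore the counter before reporting the misuse
      throw new IllegalStateException("release without acquire: " + segmentId);
    }
  }

  // Stops new acquisitions; existing holders keep their references.
  void close()
  {
    closed = true;
  }

  // A custom loader could free the local files once this returns true.
  boolean canBeDropped()
  {
    return closed && refs.get() == 0;
  }

  int referenceCount()
  {
    return refs.get();
  }
}
```

The increment-then-check order in `acquire` means a concurrent `close` can at worst make `canBeDropped` return a conservative `false` for a moment; it never lets a new reader in after close.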

##########
File path: server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java
##########
@@ -198,44 +216,83 @@ public File getSegmentFiles(DataSegment segment) throws SegmentLoadingException
   }
 
   /**
-   * location may fail because of IO failure, most likely in two cases:<p>
+   * If we have already reserved a location before, probably via {@link #reserve(DataSegment)}, then only that location
+   * should be tried. Otherwise, we would fetch locations using {@link StorageLocationSelectorStrategy} and try all
+   * of them one by one till there is success.
+   * Location may fail because of IO failure, most likely in two cases:<p>
    * 1. druid don't have the write access to this location, most likely the administrator doesn't config it correctly<p>
    * 2. disk failure, druid can't read/write to this disk anymore
-   *
+   * <p>
    * Locations are fetched using {@link StorageLocationSelectorStrategy}.
    */
-  private StorageLocation loadSegmentWithRetry(DataSegment segment, String storageDirStr) throws SegmentLoadingException
+  private File loadSegmentWithRetry(DataSegment segment) throws SegmentLoadingException
   {
-    Iterator<StorageLocation> locationsIterator = strategy.getLocations();
+    String segmentDir = getSegmentDir(segment);
+
+    // Try the already reserved location. If location has been reserved outside, then we do not release the location
+    // here and simply delete any downloaded files. That is, we revert anything we do in this function and nothing else.
+    for (StorageLocation loc : locations) {
+      if (loc.isReserved(segmentDir)) {
+        File storageDir = loc.segmentDirectoryAsFile(segmentDir);
+        boolean success = loadInLocationWithStartMarkerQuietly(loc, segment, storageDir, false);
+        if (!success) {
+          throw new SegmentLoadingException("Failed to load segment %s in reserved location [%s]", segment.getId(), loc.getPath().getAbsolutePath());
+        }
+        return storageDir;
+      }
+    }
 
+    // No location was reserved so we try all the locations
+    Iterator<StorageLocation> locationsIterator = strategy.getLocations();
     while (locationsIterator.hasNext()) {
 
       StorageLocation loc = locationsIterator.next();
 
-      File storageDir = loc.reserve(storageDirStr, segment);
+      // storageDir is the file path corresponding to segment dir
+      File storageDir = loc.reserve(segmentDir, segment);
       if (storageDir != null) {
-        try {
-          loadInLocationWithStartMarker(segment, storageDir);
-          return loc;
-        }
-        catch (SegmentLoadingException e) {
-          try {
-            log.makeAlert(
-                e,
-                "Failed to load segment in current location [%s], try next location if any",
-                loc.getPath().getAbsolutePath()
-            ).addData("location", loc.getPath().getAbsolutePath()).emit();
-          }
-          finally {
-            loc.removeSegmentDir(storageDir, segment);
-            cleanupCacheFiles(loc.getPath(), storageDir);
-          }
+        boolean success = loadInLocationWithStartMarkerQuietly(loc, segment, storageDir, true);
+        if (success) {

Review comment:
       Shouldn't `loc.release` be called when `success` is false? Please add a test to verify this behavior.
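A minimal sketch of the behavior I would expect, using hypothetical simplified types (`RetryLocation`, `RetryLoader`, and a `Predicate` standing in for `loadInLocationWithStartMarkerQuietly`) rather than the real Druid classes: a reservation made outside the method is kept on failure, while a reservation made inside the loop is released before moving on to the next location.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;

// Hypothetical simplified location (not Druid's StorageLocation); space accounting omitted.
class RetryLocation
{
  final String name;
  private final Set<String> reserved = new HashSet<>();

  RetryLocation(String name)
  {
    this.name = name;
  }

  synchronized boolean reserve(String segmentDir)
  {
    return reserved.add(segmentDir);
  }

  synchronized boolean release(String segmentDir)
  {
    return reserved.remove(segmentDir);
  }

  synchronized boolean isReserved(String segmentDir)
  {
    return reserved.contains(segmentDir);
  }
}

class RetryLoader
{
  private final List<RetryLocation> locations;
  // Stands in for loadInLocationWithStartMarkerQuietly: true if the download succeeded.
  private final Predicate<RetryLocation> download;

  RetryLoader(List<RetryLocation> locations, Predicate<RetryLocation> download)
  {
    this.locations = new ArrayList<>(locations);
    this.download = download;
  }

  RetryLocation loadSegmentWithRetry(String segmentDir)
  {
    // Reserved by the caller: try only that location and keep the reservation on
    // failure, since the caller is responsible for releasing it.
    for (RetryLocation loc : locations) {
      if (loc.isReserved(segmentDir)) {
        if (!download.test(loc)) {
          throw new IllegalStateException("Failed to load " + segmentDir + " in reserved location " + loc.name);
        }
        return loc;
      }
    }

    // No prior reservation: whatever this loop reserves, it releases on failure.
    for (RetryLocation loc : locations) {
      if (loc.reserve(segmentDir)) {
        if (download.test(loc)) {
          return loc;
        }
        loc.release(segmentDir);
      }
    }
    throw new IllegalStateException("Failed to load " + segmentDir + " in any location");
  }
}
```

A test along these lines could make the first location fail and then assert that it no longer holds a reservation, plus a second case asserting that an externally made reservation survives a failed load.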



