abhishekagarwal87 commented on a change in pull request #11398:
URL: https://github.com/apache/druid/pull/11398#discussion_r669326519



##########
File path: 
server/src/main/java/org/apache/druid/segment/loading/StorageLocation.java
##########
@@ -154,7 +166,8 @@ public synchronized File reserve(String 
segmentFilePathToAdd, String segmentId,
   {
     final File segmentFileToAdd = new File(path, segmentFilePathToAdd);
     if (files.contains(segmentFileToAdd)) {
-      return null;
+      //TODO: is this change ok?
+      return segmentFileToAdd;

Review comment:
       > What's the concern that the TODO is referencing?
   
   While I think the change is in the right direction, wanted to draw 
reviewer's attention to this change to confirm I am not breaking something 
else. 
   > Your reasoning makes sense to me, but the callers currently depend on this 
behavior to avoid duplicate work. ​For example, this line checks whether 
storageDir is null before downloading the segment. I think this method should 
return something that is indicative of whether the call reserves some space 
successfully or the call returns immediately at this line.
   
   In your example, there are checks before `reserve` to find out if location 
is already reserved. But I see the utility in signalling to the caller that 
file was already reserved. Maybe I can throw a checked exception 
`FileAlreadyReservedException`  that callers can catch and do the needful. 
   
   

##########
File path: 
server/src/main/java/org/apache/druid/segment/loading/StorageLocation.java
##########
@@ -154,7 +166,8 @@ public synchronized File reserve(String 
segmentFilePathToAdd, String segmentId,
   {
     final File segmentFileToAdd = new File(path, segmentFilePathToAdd);
     if (files.contains(segmentFileToAdd)) {
-      return null;
+      //TODO: is this change ok?
+      return segmentFileToAdd;

Review comment:
       thinking more about this,  I realize that this change is not actually 
required. Since I have already added `isReserved` in `StorageLocation` to check 
if the location is pre-reserved. For my use case, the extension is going to 
reserve the space via SegmentLoader#reserve so this change is redundant and can 
be removed.

##########
File path: 
server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java
##########
@@ -203,8 +219,8 @@ public Segment getSegment(DataSegment segment, boolean 
lazy, SegmentLazyLoadFail
     } else {
       factory = new MMappedQueryableSegmentizerFactory(indexIO);
     }
-
-    return factory.factorize(segment, segmentFiles, lazy, loadFailed);
+    Segment baseSegment = factory.factorize(segment, segmentFiles, lazy, 
loadFailed);
+    return ReferenceCountingSegment.wrapSegment(baseSegment, 
segment.getShardSpec());

Review comment:
       It's not the `DataSegment` that we are re-creating. We are calling 
`factory#factorize` each time and that is going to create a new `Segment` 
object too. Though, technically, that is an immutable object too. there is 
another proposal here 
(https://github.com/apache/druid/pull/11398/files#r668987030) that allays your 
concern. 

##########
File path: 
server/src/main/java/org/apache/druid/segment/loading/SegmentLoader.java
##########
@@ -29,10 +30,45 @@
  * Loading segments from deep storage to local storage.
  * Implementations must be thread-safe.
  */
+@UnstableApi
 public interface SegmentLoader
 {
   boolean isSegmentLoaded(DataSegment segment);
-  Segment getSegment(DataSegment segment, boolean lazy, 
SegmentLazyLoadFailCallback loadFailed) throws SegmentLoadingException;
+
+  /**
+   * Returns a {@link ReferenceCountingSegment} that will be added by the 
{@link org.apache.druid.server.SegmentManager}
+   * to the {@link org.apache.druid.timeline.VersionedIntervalTimeline}. This 
method can be called multiple times
+   * by the {@link org.apache.druid.server.SegmentManager}
+   *
+   * Returning a {@code ReferenceCountingSegment} will let custom 
implementations keep track of reference count for
+   * segments that the custom implementations are creating. That way, custom 
implementations can know when the segment
+   * is in use or not.
+   * @param segment - {@link DataSegment} segment to download
+   * @param lazy - whether the loading is lazy
+   * @param loadFailed - Callback to invoke if the loading fails
+   * @return Segment object wrapped inside {@link ReferenceCountingSegment}.
+   * @throws SegmentLoadingException
+   */
+  ReferenceCountingSegment getSegment(DataSegment segment, boolean lazy, 
SegmentLazyLoadFailCallback loadFailed) throws SegmentLoadingException;
+
   File getSegmentFiles(DataSegment segment) throws SegmentLoadingException;
+
+  /**
+   * Tries to reserve the space for a segment on any location. We only return 
a boolean result instead of a pointer to
+   * {@link StorageLocation} since we don't want callers to operate on {@code 
StorageLocation} directly outside {@code SegmentLoader}.
+   * {@link SegmentLoader} operates on the {@code StorageLocation} objects in 
a thread-safe manner.
+   *
+   * @param segment - Segment to reserve
+   * @return True if enough space found to store the segment, false otherwise
+   */
+  boolean reserve(DataSegment segment);

Review comment:
       yeah.  I agree that those comments shouldn't be part of the javadoc 
itself. they add some context behind keeping the signature as it is. 
   >BTW, how do you expect these methods to be used? When should the caller use 
this method instead of getSegment()?
   
   when there is a need to de-couple the action of actually downloading the 
files vs reserving the space. With this function, extensions can try to reserve 
the space first and if not successful, make some space, clean-up segments, etc. 
There is also improved concurrency for extensions with the breakdown of 
`getSegment` into `reserve` and download operations since the former function 
is a cheap op and can be put inside a lock if required by the extensions. 
`getSegment` can't be put inside a lock since it is a time-consuming operation. 

##########
File path: 
server/src/main/java/org/apache/druid/segment/loading/SegmentLoader.java
##########
@@ -29,10 +30,45 @@
  * Loading segments from deep storage to local storage.
  * Implementations must be thread-safe.
  */
+@UnstableApi
 public interface SegmentLoader
 {
   boolean isSegmentLoaded(DataSegment segment);
-  Segment getSegment(DataSegment segment, boolean lazy, 
SegmentLazyLoadFailCallback loadFailed) throws SegmentLoadingException;
+
+  /**
+   * Returns a {@link ReferenceCountingSegment} that will be added by the 
{@link org.apache.druid.server.SegmentManager}
+   * to the {@link org.apache.druid.timeline.VersionedIntervalTimeline}. This 
method can be called multiple times
+   * by the {@link org.apache.druid.server.SegmentManager}
+   *
+   * Returning a {@code ReferenceCountingSegment} will let custom 
implementations keep track of reference count for
+   * segments that the custom implementations are creating. That way, custom 
implementations can know when the segment
+   * is in use or not.
+   * @param segment - {@link DataSegment} segment to download
+   * @param lazy - whether the loading is lazy
+   * @param loadFailed - Callback to invoke if the loading fails
+   * @return Segment object wrapped inside {@link ReferenceCountingSegment}.
+   * @throws SegmentLoadingException
+   */
+  ReferenceCountingSegment getSegment(DataSegment segment, boolean lazy, 
SegmentLazyLoadFailCallback loadFailed) throws SegmentLoadingException;
+
   File getSegmentFiles(DataSegment segment) throws SegmentLoadingException;

Review comment:
       Hmm. How about having 
   `SegmentLoader` and `SegmentFetcher` 
   
   - SegmentLoader also loads the segment from segment files and return 
`ReferenceCountingSegment`. It will be used in `SegmentManager`
   - SegmentFetcher just downloads the files. It would also have the other 
methods such as `isSegmentLoaded` (should be renamed to `isSegmentFetched`), 
`reserve` and `release`. `SegmentFetcher` would abstract away the details about 
`StorageLocation`(s). The implementations must be thread-safe. Ingestion will 
use `SegmentFetcher` directly and `SegmentFetcher` will also be used by 
`SegmentLoader`. 
   - `SegmentLoader` is given the `File` from `segmentFetcher` that 
`SegmentLoader` will load as `ReferencingCountingSegment`. It can be called 
once if `SegmentManager` will load it only if there is no existing entry in the 
timeline. But that may impact the concurrency of `SegmentManager` if we move 
`load` inside the `datasources.compute` inside `SegmentManager`.  We either let 
`load` be called multiple times or we add another method in `SegmentLoader` 
that wraps the base segment in `ReferenceCountingSegment`. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to