clintropolis commented on code in PR #18176:
URL: https://github.com/apache/druid/pull/18176#discussion_r2289048382


##########
server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java:
##########
@@ -209,382 +278,173 @@ public void removeInfoFile(DataSegment segment)
   }
 
   @Override
-  public ReferenceCountedSegmentProvider getSegment(final DataSegment 
dataSegment) throws SegmentLoadingException
-  {
-    final File segmentFiles = getSegmentFiles(dataSegment);
-    final SegmentizerFactory factory = getSegmentFactory(segmentFiles);
-
-    final Segment segment = factory.factorize(dataSegment, segmentFiles, 
false, SegmentLazyLoadFailCallback.NOOP);
-    return ReferenceCountedSegmentProvider.wrapSegment(segment, 
dataSegment.getShardSpec());
-  }
-
-  @Override
-  public ReferenceCountedSegmentProvider getBootstrapSegment(
-      final DataSegment dataSegment,
-      final SegmentLazyLoadFailCallback loadFailed
-  ) throws SegmentLoadingException
-  {
-    final File segmentFiles = getSegmentFiles(dataSegment);
-    final SegmentizerFactory factory = getSegmentFactory(segmentFiles);
-
-    final Segment segment = factory.factorize(dataSegment, segmentFiles, 
config.isLazyLoadOnStart(), loadFailed);
-    return ReferenceCountedSegmentProvider.wrapSegment(segment, 
dataSegment.getShardSpec());
-  }
-
-  private SegmentizerFactory getSegmentFactory(final File segmentFiles) throws 
SegmentLoadingException
-  {
-    final File factoryJson = new File(segmentFiles, "factory.json");
-    final SegmentizerFactory factory;
-
-    if (factoryJson.exists()) {
-      try {
-        factory = jsonMapper.readValue(factoryJson, SegmentizerFactory.class);
-      }
-      catch (IOException e) {
-        throw new SegmentLoadingException(e, "Failed to get segment facotry 
for %s", e.getMessage());
-      }
-    } else {
-      factory = new MMappedQueryableSegmentizerFactory(indexIO);
-    }
-    return factory;
-  }
-
-  /**
-   * Returns the effective segment info directory based on the configuration 
settings.
-   * The directory is selected based on the following configurations injected 
into this class:
-   * <ul>
-   *   <li>{@link SegmentLoaderConfig#getInfoDir()} - If {@code infoDir} is 
set, it is used as the info directory.</li>
-   *   <li>{@link SegmentLoaderConfig#getLocations()} - If the info directory 
is not set, the first location from this list is used.</li>
-   *   <li>List of {@link StorageLocation}s injected - If both the info 
directory and locations list are not set, the
-   *   first storage location is used.</li>
-   * </ul>
-   *
-   * @throws DruidException if none of the configurations are set, and the 
info directory cannot be determined.
-   */
-  private File getEffectiveInfoDir()
-  {
-    final File infoDir;
-    if (config.getInfoDir() != null) {
-      infoDir = config.getInfoDir();
-    } else if (!config.getLocations().isEmpty()) {
-      infoDir = new File(config.getLocations().get(0).getPath(), "info_dir");
-    } else if (!locations.isEmpty()) {
-      infoDir = new File(locations.get(0).getPath(), "info_dir");
-    } else {
-      throw DruidException.forPersona(DruidException.Persona.OPERATOR)
-          .ofCategory(DruidException.Category.NOT_FOUND)
-          .build("Could not determine infoDir. Make sure 
'druid.segmentCache.infoDir' "
-                 + "or 'druid.segmentCache.locations' is set correctly.");
-    }
-    return infoDir;
-  }
-
-  private static String getSegmentDir(DataSegment segment)
-  {
-    return DataSegmentPusher.getDefaultStorageDir(segment, false);
-  }
-
-  /**
-   * Checks whether a segment is already cached. It can return false even if 
{@link #reserve(DataSegment)}
-   * has been successful for a segment but is not downloaded yet.
-   */
-  boolean isSegmentCached(final DataSegment segment)
-  {
-    return findStoragePathIfCached(segment) != null;
-  }
-
-  /**
-   * This method will try to find if the segment is already downloaded on any 
location. If so, the segment path
-   * is returned. Along with that, location state is also updated with the 
segment location. Refer to
-   * {@link StorageLocation#maybeReserve(String, DataSegment)} for more 
details.
-   * If the segment files are damaged in any location, they are removed from 
the location.
-   * @param segment - Segment to check
-   * @return - Path corresponding to segment directory if found, null 
otherwise.
-   */
-  @Nullable
-  private File findStoragePathIfCached(final DataSegment segment)
+  public Optional<Segment> acquireSegment(final DataSegment dataSegment)
   {
+    final SegmentCacheEntryIdentifier cacheEntryIdentifier = new 
SegmentCacheEntryIdentifier(dataSegment.getId());
     for (StorageLocation location : locations) {
-      String storageDir = getSegmentDir(segment);
-      File localStorageDir = location.segmentDirectoryAsFile(storageDir);
-      if (localStorageDir.exists()) {
-        if (checkSegmentFilesIntact(localStorageDir)) {
-          log.warn(
-              "[%s] may be damaged. Delete all the segment files and pull from 
DeepStorage again.",
-              localStorageDir.getAbsolutePath()
-          );
-          cleanupCacheFiles(location.getPath(), localStorageDir);
-          location.removeSegmentDir(localStorageDir, segment);
-          break;
-        } else {
-          // Before returning, we also reserve the space. Refer to the 
StorageLocation#maybeReserve documentation for details.
-          location.maybeReserve(storageDir, segment);
-          return localStorageDir;
-        }
+      if (location.isReserved(cacheEntryIdentifier)) {
+        final SegmentCacheEntry cacheEntry = 
location.getCacheEntry(cacheEntryIdentifier);
+        return cacheEntry.referenceProvider.acquireReference();
       }
     }
-    return null;
+    return Optional.empty();
   }
 
-  /**
-   * check data intact.
-   * @param dir segments cache dir
-   * @return true means segment files may be damaged.
-   */
-  private boolean checkSegmentFilesIntact(File dir)
-  {
-    return checkSegmentFilesIntactWithStartMarker(dir);
-  }
-
-  /**
-   * If there is 'downloadStartMarker' existed in localStorageDir, the 
segments files might be damaged.
-   * Because each time, Druid will delete the 'downloadStartMarker' file after 
pulling and unzip the segments from DeepStorage.
-   * downloadStartMarker existed here may mean something error during download 
segments and the segment files may be damaged.
-   */
-  private boolean checkSegmentFilesIntactWithStartMarker(File localStorageDir)
-  {
-    final File downloadStartMarker = new File(localStorageDir.getPath(), 
DOWNLOAD_START_MARKER_FILE_NAME);
-    return downloadStartMarker.exists();
-  }
-
-  /**
-   * Make sure segments files in loc is intact, otherwise function like 
loadSegments will failed because of segment files is damaged.
-   * @param segment
-   * @return
-   * @throws SegmentLoadingException
-   */
   @Override
-  public File getSegmentFiles(DataSegment segment) throws 
SegmentLoadingException
+  public AcquireSegmentAction acquireSegment(
+      final DataSegment dataSegment,
+      final SegmentDescriptor descriptor
+  ) throws SegmentLoadingException
   {
-    final ReferenceCountingLock lock = createOrGetLock(segment);
-    synchronized (lock) {
+    final SegmentCacheEntryIdentifier identifier = new 
SegmentCacheEntryIdentifier(dataSegment.getId());
+    for (StorageLocation location : locations) {
+      final StorageLocation.ReservationHold<SegmentCacheEntry> hold =
+          location.addWeakReservationHoldIfExists(identifier);
       try {
-        File segmentDir = findStoragePathIfCached(segment);
-        if (segmentDir != null) {
-          return segmentDir;
+        if (hold != null) {
+          if (hold.getEntry().isMounted()) {
+            return new AcquireSegmentAction(
+                descriptor,
+                () -> 
Futures.immediateFuture(hold.getEntry().referenceProvider.acquireReference()),
+                hold
+            );
+          } else {
+            // go ahead and mount it, someone else is probably trying this as 
well, but mount is done under a segment
+            // lock and is a no-op if already mounted, and if we win we need 
it to be mounted
+            return new AcquireSegmentAction(
+                descriptor,
+                makeOnDemandLoadSupplier(dataSegment, hold.getEntry(), 
location),
+                hold
+            );
+          }
         }
-
-        return loadSegmentWithRetry(segment);
       }
-      finally {
-        unlock(segment, lock);
+      catch (Throwable t) {
+        throw CloseableUtils.closeAndWrapInCatch(t, hold);
       }
     }
-  }
-
-  /**
-   * If we have already reserved a location before, probably via {@link 
#reserve(DataSegment)}, then only that location
-   * should be tried. Otherwise, we would fetch locations using {@link 
StorageLocationSelectorStrategy} and try all
-   * of them one by one till there is success.
-   * Location may fail because of IO failure, most likely in two cases:<p>
-   * 1. druid don't have the write access to this location, most likely the 
administrator doesn't config it correctly<p>
-   * 2. disk failure, druid can't read/write to this disk anymore
-   * <p>
-   * Locations are fetched using {@link StorageLocationSelectorStrategy}.
-   */
-  private File loadSegmentWithRetry(DataSegment segment) throws 
SegmentLoadingException
-  {
-    String segmentDir = getSegmentDir(segment);
-
-    // Try the already reserved location. If location has been reserved 
outside, then we do not release the location
-    // here and simply delete any downloaded files. That is, we revert 
anything we do in this function and nothing else.
-    for (StorageLocation loc : locations) {
-      if (loc.isReserved(segmentDir)) {
-        File storageDir = loc.segmentDirectoryAsFile(segmentDir);
-        boolean success = loadInLocationWithStartMarkerQuietly(loc, segment, 
storageDir, false);
-        if (!success) {
-          throw new SegmentLoadingException(
-              "Failed to load segment[%s] in reserved location[%s]", 
segment.getId(), loc.getPath().getAbsolutePath()
+    final Iterator<StorageLocation> iterator = strategy.getLocations();

Review Comment:
   Good catch. I've repurposed the segment locks in `SegmentLocalCacheManager` 
to be used just for assigning a `StorageLocation`, pushing all other locking 
down to the `StorageLocation` itself, which I think simplifies things quite a bit.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to