clintropolis commented on code in PR #19539:
URL: https://github.com/apache/druid/pull/19539#discussion_r3345909450


##########
processing/src/main/java/org/apache/druid/common/asyncresource/SettableAsyncResource.java:
##########
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.common.asyncresource;
+
+import com.google.common.base.Throwables;
+import com.google.common.util.concurrent.SettableFuture;
+import com.google.errorprone.annotations.concurrent.GuardedBy;
+import org.apache.druid.collections.ResourceHolder;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.java.util.common.Either;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.utils.CloseableUtils;
+
+import javax.annotation.Nullable;
+import java.io.Closeable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
+
+/**
+ * Basic utility that allows creating {@link AsyncResource} wrappers. 
Analogous to JDK {@link CompletableFuture}
+ * or Guava {@link SettableFuture}. See {@link AsyncResource} for details 
about why you would want to use this
+ * instead of {@link Future}.
+ *
+ * <p>In addition to {@link #get()}, there is also {@link #release()}. 
Releasing is not allowed by instances

Review Comment:
   It feels worth elaborating on the usage model for release, either here, to 
describe it a bit for someone reading the main javadocs, or I guess on the 
method itself would be fine too.
   
   I would describe it as a resource ownership transfer model mainly useful for 
callers whose `T` itself is `Closeable`, and whose producer that calls `set` 
constructs the `T` such that its close method will release all associated 
resources, so that it is self-contained. Or something like that.



##########
server/src/main/java/org/apache/druid/segment/loading/external/StorageLocationVirtualStorageManager.java:
##########
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.loading.external;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.inject.Inject;
+import org.apache.druid.collections.ResourceHolder;
+import org.apache.druid.common.asyncresource.AsyncResource;
+import org.apache.druid.common.asyncresource.AsyncResources;
+import org.apache.druid.common.asyncresource.SettableAsyncResource;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.io.FilePopulator;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.segment.loading.CacheEntry;
+import org.apache.druid.segment.loading.StorageLoadingThreadPool;
+import org.apache.druid.segment.loading.StorageLocation;
+import org.apache.druid.segment.loading.StorageLocationSelectorStrategy;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.LongSupplier;
+
+/**
+ * Default implementation of VirtualStorageManager that delegates to 
StorageLocation.
+ * Uses weak reservations for all cached files, making them eligible for 
eviction.
+ */
+public class StorageLocationVirtualStorageManager implements 
VirtualStorageManager
+{
+  private static final EmittingLogger log = new 
EmittingLogger(StorageLocationVirtualStorageManager.class);
+
+  private final StorageLocationSelectorStrategy strategy;
+  private final StorageLoadingThreadPool loadingThreadPool;
+
+  /**
+   * Per-identifier locks to ensure only one thread populates a given 
identifier.
+   * Once a location is resolved, it's stored in the lock so subsequent 
threads can check it.
+   */
+  private final ConcurrentHashMap<String, PopulationLock> populationLocks = 
new ConcurrentHashMap<>();
+
+  @Inject
+  public StorageLocationVirtualStorageManager(
+      List<StorageLocation> locations,
+      StorageLocationSelectorStrategy strategy,
+      StorageLoadingThreadPool loadingThreadPool
+  )
+  {
+    this.strategy = strategy;
+    this.loadingThreadPool = loadingThreadPool;
+    log.info("Initialized VirtualStorageManager with [%d] storage locations", 
locations.size());
+  }
+
+  @Nullable
+  @Override
+  public CachedFile get(String identifier)
+  {
+    PopulationLock lock = populationLocks.get(identifier);
+    if (lock == null) {
+      return null;
+    }
+
+    StorageLocation resolvedLocation = lock.getResolvedLocation();
+    if (resolvedLocation == null) {
+      // Population is still in flight: the lock exists but doesn't have a 
location yet.
+      // Return that the file doesn't exist instead of blocking.
+      return null;
+    }
+
+    StorageLocation.ReservationHold<CacheEntry> hold =
+        resolvedLocation.addWeakReservationHoldIfExists(new 
StringCacheEntryIdentifier(identifier));
+    return hold == null ? null : new CachedFile(hold, identifier);
+  }
+
+  @Override
+  public CachedFile reserveAndPopulate(
+      String identifier,
+      LongSupplier sizeSupplier,
+      FilePopulator populator
+  )
+  {
+    // Get or create lock for this identifier
+    final PopulationLock lock = populationLocks.computeIfAbsent(identifier, 
ignored -> new PopulationLock());
+
+    synchronized (lock) {
+      StringCacheEntryIdentifier cacheId = new 
StringCacheEntryIdentifier(identifier);
+
+      // If multiple threads are trying to reserve the same location, the 
first one will update the state on the lock
+      // so check that first.
+      StorageLocation resolvedLocation = lock.getResolvedLocation();
+      if (resolvedLocation == null) {
+        // Determining the size often requires an external system call, so we 
use this supplier to defer it until
+        // we absolutely need it
+        long sizeBytes = sizeSupplier.getAsLong();
+
+        // Try to reserve in each location according to strategy
+        Iterator<StorageLocation> locationIter = strategy.getLocations();
+        Throwable lastException = null;
+
+        while (locationIter.hasNext()) {
+          StorageLocation location = locationIter.next();
+
+          File locationFile = location.getPath();
+          try {
+            // Create cache entry that will call populator on mount
+            DownloadableCacheEntry entry = new DownloadableCacheEntry(cacheId, 
sizeBytes, populator, locationFile)
+            {
+              final AtomicBoolean mounted = new AtomicBoolean(false);
+
+              @Override
+              public void mount(StorageLocation location)
+              {
+                super.mount(location);
+                mounted.set(true);
+              }
+
+              @Override
+              public void unmount()
+              {
+                if (mounted.get()) {
+                  populationLocks.remove(identifier, lock);
+                }
+                super.unmount();
+              }
+            };
+
+            // Try to reserve weak space
+            if (!location.reserveWeak(entry)) {

Review Comment:
   Couldn't this call location.addWeakReservationHold, which either returns a 
hold on an existing entry or creates a new entry under a hold if it doesn't 
exist? Then this block and the later addWeakReservationHoldIfExists would be 
collapsed into a single call, and we would only mount if we have a hold on 
the entry.
   
   > #19539 adds StorageLocation#removeUnheldWeakEntry which I believe can be 
used to solve this problem. 
   
   `StorageLocation#removeUnheldWeakEntry` is more for being able to remove the 
unmounted entry if mount fails so there isn't an unmounted entry left hanging 
out. In this case the comment is talking about, the entry was removed from the 
location by some other thread that inserted its own different entry and had to 
do an eviction; this entry is eligible since there isn't a hold on it. Later, 
after mounting, when we try to acquire a hold for the existing entry, it fails 
and we explode, leaving the orphaned files on disk because the entry is no 
longer tracked and will never be unmounted (at least this is what I think 
happens).
   
   I guess we could also unmount the entry if we cannot acquire a hold on the 
existing thing, but that seems less ideal since it means this query has to fail 
even though it was originally able to reserve the location.
   
   I think it's probably unlikely to happen in practice unless the eviction 
'hand' happens to be close to the entry in the linked list, but it is possible 
and should be guarded against.



##########
server/src/main/java/org/apache/druid/segment/loading/external/StorageLocationVirtualStorageManager.java:
##########
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.loading.external;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.inject.Inject;
+import org.apache.druid.collections.ResourceHolder;
+import org.apache.druid.common.asyncresource.AsyncResource;
+import org.apache.druid.common.asyncresource.AsyncResources;
+import org.apache.druid.common.asyncresource.SettableAsyncResource;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.io.FilePopulator;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.segment.loading.CacheEntry;
+import org.apache.druid.segment.loading.StorageLoadingThreadPool;
+import org.apache.druid.segment.loading.StorageLocation;
+import org.apache.druid.segment.loading.StorageLocationSelectorStrategy;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.LongSupplier;
+
+/**
+ * Default implementation of VirtualStorageManager that delegates to 
StorageLocation.
+ * Uses weak reservations for all cached files, making them eligible for 
eviction.
+ */
+public class StorageLocationVirtualStorageManager implements 
VirtualStorageManager
+{
+  private static final EmittingLogger log = new 
EmittingLogger(StorageLocationVirtualStorageManager.class);
+
+  private final StorageLocationSelectorStrategy strategy;
+  private final StorageLoadingThreadPool loadingThreadPool;
+
+  /**
+   * Per-identifier locks to ensure only one thread populates a given 
identifier.
+   * Once a location is resolved, it's stored in the lock so subsequent 
threads can check it.
+   */
+  private final ConcurrentHashMap<String, PopulationLock> populationLocks = 
new ConcurrentHashMap<>();
+
+  @Inject
+  public StorageLocationVirtualStorageManager(
+      List<StorageLocation> locations,
+      StorageLocationSelectorStrategy strategy,
+      StorageLoadingThreadPool loadingThreadPool
+  )
+  {
+    this.strategy = strategy;
+    this.loadingThreadPool = loadingThreadPool;
+    log.info("Initialized VirtualStorageManager with [%d] storage locations", 
locations.size());
+  }
+
+  @Nullable
+  @Override
+  public CachedFile get(String identifier)
+  {
+    PopulationLock lock = populationLocks.get(identifier);
+    if (lock == null) {
+      return null;

Review Comment:
   Feels worth a comment that we are just using the lock to get the location. 
At first glance it seems like perhaps it is a mistake that nothing holds the 
lock, but the problem solves itself if the acquireExistingHold fails.



##########
server/src/main/java/org/apache/druid/segment/loading/external/DownloadableCacheEntry.java:
##########
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.loading.external;
+
+import com.google.common.hash.Hashing;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.io.FilePopulator;
+import org.apache.druid.java.util.common.FileUtils;
+import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.segment.loading.CacheEntry;
+import org.apache.druid.segment.loading.CacheEntryIdentifier;
+import org.apache.druid.segment.loading.StorageLocation;
+import org.apache.druid.utils.CloseableUtils;
+
+import javax.annotation.Nullable;
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Supplier;
+
+/**
+ * CacheEntry implementation that calls a FilePopulator lambda when mounted.
+ */
+class DownloadableCacheEntry implements CacheEntry
+{
+  private static final Logger log = new Logger(DownloadableCacheEntry.class);
+
+  private static final int MAX_PREFIX_SUFFIX_LENGTH = 50;
+
+  private final StringCacheEntryIdentifier identifier;
+  private final long sizeBytes;
+  private final FilePopulator populator;
+  private final File locationPath;
+
+  private volatile boolean mounted = false;
+  private volatile File mountedFile = null;
+
+  /**
+   * Map for keeping track of functionality added by {@link #extend}.
+   */
+  private final ConcurrentHashMap<Class<? extends Closeable>, Closeable> 
extendMap = new ConcurrentHashMap<>();
+
+  public DownloadableCacheEntry(
+      StringCacheEntryIdentifier identifier,
+      long sizeBytes,
+      FilePopulator populator,
+      File locationPath
+  )
+  {
+    this.identifier = identifier;
+    this.sizeBytes = sizeBytes;
+    this.populator = populator;
+    this.locationPath = locationPath;
+  }
+
+  @Override
+  public CacheEntryIdentifier getId()
+  {
+    return identifier;
+  }
+
+  @Override
+  public long getSize()
+  {
+    return sizeBytes;
+  }
+
+  @Override
+  public boolean isMounted()
+  {
+    return mounted;
+  }
+
+  @Override
+  public void mount(StorageLocation location)
+  {
+    if (mounted) {
+      return; // Already mounted
+    }
+
+    // Determine file path - delegate to the same sanitization logic
+    File file = getFileForIdentifier(locationPath, identifier.value());
+
+    // Ensure parent directory exists
+    File parentDir = file.getParentFile();
+    if (!parentDir.exists()) {
+      try {
+        FileUtils.mkdirp(parentDir);
+      }
+      catch (IOException e) {
+        throw DruidException.forPersona(DruidException.Persona.OPERATOR)
+                             
.ofCategory(DruidException.Category.RUNTIME_FAILURE)
+                             .build(e, "Failed to create parent directory 
[%s]", parentDir);
+      }
+    }
+
+    if (!(file.exists() && file.length() == sizeBytes)) {
+      // file doesn't exist (or is the wrong length), so we need populate it
+
+      if (file.exists()) {
+        // It wasn't the right length, let's delete it to avoid confusion for 
the populator.
+        if (!file.delete()) {
+          log.info("Problem deleting file [%s] before populating for VSM", 
file);
+        }
+      }
+
+      try {
+        populator.populate(file);
+      }
+      catch (Throwable e) {
+        // Clean up partial file on failure
+        if (file.exists()) {
+          file.delete();
+        }
+        throw DruidException.forPersona(DruidException.Persona.OPERATOR)
+                            
.ofCategory(DruidException.Category.RUNTIME_FAILURE)
+                            .build(e, "Failed to populate file [%s]", file);
+      }
+
+      // Verify the file was created
+      if (!file.exists()) {
+        throw DruidException.forPersona(DruidException.Persona.OPERATOR)
+                            
.ofCategory(DruidException.Category.RUNTIME_FAILURE)
+                            .build("Populator did not create file [%s]", file);
+      }
+    }
+
+    this.mountedFile = file;
+    this.mounted = true;
+  }
+
+  @Override
+  public void unmount()
+  {
+    final Closer extendCloser = CloseableUtils.forIterable(extendMap.values());
+    extendMap.clear();
+    CloseableUtils.closeAndSuppressExceptions(extendCloser, e -> log.warn(e, 
"Failed to close extended functionality"));
+
+    // StorageLocation will call this when it needs to reclaim space
+    if (mountedFile != null && mountedFile.exists()) {
+      if (!mountedFile.delete()) {
+        log.warn("Failed to delete file[%s] on unmount(). Leaving it.", 
mountedFile);
+      }
+    }
+    mountedFile = null;
+    mounted = false;
+  }
+
+  /**
+   * Get the mounted file.
+   *
+   * @return The file, or null if not mounted
+   */
+  @Nullable
+  File getFile()
+  {
+    return mountedFile;
+  }
+
+  /**
+   * Extend this cache entry with additional functionality. The lifecycle of 
the extended functionality is
+   * tracked along with the lifecycle of the cache entry. The provided 
supplier is called immediately unless
+   * there is already a mapping for the class. Either way, the class is 
returned.
+   */
+  @SuppressWarnings("unchecked")
+  public <T extends Closeable> T extend(Class<T> clazz, Supplier<T> supplier)
+  {
+    return (T) extendMap.computeIfAbsent(clazz, k -> supplier.get());
+  }
+
+  private File getFileForIdentifier(File locationPath, String identifier)
+  {
+    String sanitized = sanitizePath(identifier);
+    return new File(locationPath, sanitized);
+  }
+
+  static String sanitizePath(final String originalPath)

Review Comment:
   This method feels worth javadocs since it's got a lot going on, and looking 
at the tests it seems rather unobvious what the behavior is.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to