gianm commented on code in PR #19539: URL: https://github.com/apache/druid/pull/19539#discussion_r3345059195
########## server/src/main/java/org/apache/druid/segment/loading/external/StorageLocationVirtualStorageManager.java: ########## @@ -0,0 +1,308 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment.loading.external; + +import com.google.common.util.concurrent.ListenableFuture; +import com.google.inject.Inject; +import org.apache.druid.collections.ResourceHolder; +import org.apache.druid.common.asyncresource.AsyncResource; +import org.apache.druid.common.asyncresource.AsyncResources; +import org.apache.druid.common.asyncresource.SettableAsyncResource; +import org.apache.druid.error.DruidException; +import org.apache.druid.io.FilePopulator; +import org.apache.druid.java.util.emitter.EmittingLogger; +import org.apache.druid.segment.loading.CacheEntry; +import org.apache.druid.segment.loading.StorageLoadingThreadPool; +import org.apache.druid.segment.loading.StorageLocation; +import org.apache.druid.segment.loading.StorageLocationSelectorStrategy; + +import javax.annotation.Nullable; +import java.io.File; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Semaphore; +import 
java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.LongSupplier; + +/** + * Default implementation of VirtualStorageManager that delegates to StorageLocation. + * Uses weak reservations for all cached files, making them eligible for eviction. + */ +public class StorageLocationVirtualStorageManager implements VirtualStorageManager +{ + private static final EmittingLogger log = new EmittingLogger(StorageLocationVirtualStorageManager.class); + + private final StorageLocationSelectorStrategy strategy; + private final StorageLoadingThreadPool loadingThreadPool; + + /** + * Per-identifier locks to ensure only one thread populates a given identifier. + * Once a location is resolved, it's stored in the lock so subsequent threads can check it. + */ + private final ConcurrentHashMap<String, PopulationLock> populationLocks = new ConcurrentHashMap<>(); + + @Inject + public StorageLocationVirtualStorageManager( + List<StorageLocation> locations, + StorageLocationSelectorStrategy strategy, + StorageLoadingThreadPool loadingThreadPool + ) + { + this.strategy = strategy; + this.loadingThreadPool = loadingThreadPool; + log.info("Initialized VirtualStorageManager with [%d] storage locations", locations.size()); + } + + @Nullable + @Override + public CachedFile get(String identifier) + { + PopulationLock lock = populationLocks.get(identifier); + if (lock == null) { + return null; + } + + StorageLocation resolvedLocation = lock.getResolvedLocation(); + if (resolvedLocation == null) { + // Population is still in flight: the lock exists but doesn't have a location yet. + // Return that the file doesn't exist instead of blocking. + return null; + } + + StorageLocation.ReservationHold<CacheEntry> hold = + resolvedLocation.addWeakReservationHoldIfExists(new StringCacheEntryIdentifier(identifier)); + return hold == null ? 
null : new CachedFile(hold, identifier); + } + + @Override + public CachedFile reserveAndPopulate( + String identifier, + LongSupplier sizeSupplier, + FilePopulator populator + ) + { + // Get or create lock for this identifier + final PopulationLock lock = populationLocks.computeIfAbsent(identifier, ignored -> new PopulationLock()); + + synchronized (lock) { + StringCacheEntryIdentifier cacheId = new StringCacheEntryIdentifier(identifier); + + // If multiple threads are trying to reserve the same location, the first one will update the state on the lock + // so check that first. + StorageLocation resolvedLocation = lock.getResolvedLocation(); + if (resolvedLocation == null) { + // Determining the size often requires an external system call, so we use this supplier to defer it until + // we absolutely need it + long sizeBytes = sizeSupplier.getAsLong(); + + // Try to reserve in each location according to strategy + Iterator<StorageLocation> locationIter = strategy.getLocations(); + Throwable lastException = null; + + while (locationIter.hasNext()) { + StorageLocation location = locationIter.next(); + + File locationFile = location.getPath(); + try { + // Create cache entry that will call populator on mount + DownloadableCacheEntry entry = new DownloadableCacheEntry(cacheId, sizeBytes, populator, locationFile) + { + final AtomicBoolean mounted = new AtomicBoolean(false); + + @Override + public void mount(StorageLocation location) + { + super.mount(location); + mounted.set(true); + } + + @Override + public void unmount() + { + if (mounted.get()) { + populationLocks.remove(identifier, lock); + } + super.unmount(); + } + }; + + // Try to reserve weak space + if (!location.reserveWeak(entry)) { Review Comment: #19539 adds `StorageLocation#removeUnheldWeakEntry`, which I believe can be used to solve this problem. If that PR is merged first, I can update this to use it. If not, the update can be done in a follow-up PR. 
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
