gianm commented on code in PR #18176:
URL: https://github.com/apache/druid/pull/18176#discussion_r2246600666
##########
processing/src/main/java/org/apache/druid/java/util/common/concurrent/Execs.java:
##########
@@ -173,6 +175,97 @@ public void rejectedExecution(Runnable r, ThreadPoolExecutor executor)
);
}
+ public static ExecutorService newBlockingCached(
Review Comment:
How about folding these parameters into the existing `newBlockingThreaded`?
It's only used internally by `Execs` so it should be fine to change it.
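For context on what folding would consolidate: both factory methods rely on the same blocking-submit trick, where a `RejectedExecutionHandler` puts the task back on the bounded queue instead of throwing. A standalone sketch of that pattern (class and method names here are illustrative, not Druid's actual `Execs` code):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class BlockingSubmitPool {
  /** Fixed-size pool whose submitters block when the bounded queue is full. */
  public static ThreadPoolExecutor create(int threads, int queueCapacity) {
    return new ThreadPoolExecutor(
        threads, threads,
        0L, TimeUnit.MILLISECONDS,
        new ArrayBlockingQueue<>(queueCapacity),
        (r, executor) -> {
          // Invoked when the queue is full: block until space frees up
          // rather than rejecting the task outright.
          if (executor.isShutdown()) {
            throw new RejectedExecutionException("Executor is shutdown, rejecting task");
          }
          try {
            executor.getQueue().put(r);
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RejectedExecutionException("Interrupted while enqueueing", e);
          }
        }
    );
  }
}
```

Folding would mean `newBlockingThreaded` gains the min/max/keep-alive parameters rather than a second near-identical factory method existing alongside it.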
##########
processing/src/main/java/org/apache/druid/segment/SegmentMapFunction.java:
##########
@@ -23,24 +23,22 @@
import java.util.function.Function;
/**
- * Functional interface that captures the process of acquiring a {@link Segment} from a
- * {@link ReferenceCountedObjectProvider<Segment>} and performing any transformations done on top of this leaf {@link Segment}
- * before it is available to query processing engines.
+ * Functional interface that captures the process of transforming a {@link Segment} to another {@link Segment} if
+ * possible.
 * <p>
- * The {@link Segment} returned by this method, if present, must always be closed to return the reference to the base
- * {@link ReferenceCountedObjectProvider<Segment>} that it came from.
+ * The {@link Segment} returned by this method, if present, must always be closed by the caller.
*/
@FunctionalInterface
-public interface SegmentMapFunction extends Function<ReferenceCountedObjectProvider<Segment>, Optional<Segment>>
+public interface SegmentMapFunction extends Function<Optional<Segment>, Optional<Segment>>
Review Comment:
when can the input be absent? Is there anything smart a SegmentMapFunction
can do with absent input other than return absent output? I'm asking because
I'm wondering if this should be `Function<Segment, Optional<Segment>>` and the
caller would just not call it for absent input.
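To illustrate the suggested alternative shape: with `Function<Segment, Optional<Segment>>`, a caller holding an `Optional` gets absent-in/absent-out behavior for free via `flatMap`. A self-contained sketch (the `Segment` stand-in here is a placeholder, not Druid's interface):

```java
import java.util.Optional;
import java.util.function.Function;

public class SegmentMapSketch {
  /** Minimal stand-in for Druid's Segment, just enough to illustrate. */
  public interface Segment {
    String id();
  }

  /** The suggested shape: the function never sees an absent input. */
  @FunctionalInterface
  public interface SegmentMapFunction extends Function<Segment, Optional<Segment>> {}

  /** Callers with an Optional in hand get absent handling from flatMap. */
  public static Optional<Segment> apply(Optional<Segment> input, SegmentMapFunction fn) {
    return input.flatMap(fn);
  }
}
```

With this shape the function body only ever deals with a real `Segment`, and empty-input propagation lives in one place at the call site.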
##########
server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java:
##########
@@ -209,382 +278,173 @@ public void removeInfoFile(DataSegment segment)
}
@Override
- public ReferenceCountedSegmentProvider getSegment(final DataSegment dataSegment) throws SegmentLoadingException
- {
- final File segmentFiles = getSegmentFiles(dataSegment);
- final SegmentizerFactory factory = getSegmentFactory(segmentFiles);
-
- final Segment segment = factory.factorize(dataSegment, segmentFiles, false, SegmentLazyLoadFailCallback.NOOP);
- return ReferenceCountedSegmentProvider.wrapSegment(segment, dataSegment.getShardSpec());
- }
-
- @Override
- public ReferenceCountedSegmentProvider getBootstrapSegment(
- final DataSegment dataSegment,
- final SegmentLazyLoadFailCallback loadFailed
- ) throws SegmentLoadingException
- {
- final File segmentFiles = getSegmentFiles(dataSegment);
- final SegmentizerFactory factory = getSegmentFactory(segmentFiles);
-
- final Segment segment = factory.factorize(dataSegment, segmentFiles, config.isLazyLoadOnStart(), loadFailed);
- return ReferenceCountedSegmentProvider.wrapSegment(segment, dataSegment.getShardSpec());
- }
-
- private SegmentizerFactory getSegmentFactory(final File segmentFiles) throws SegmentLoadingException
- {
- final File factoryJson = new File(segmentFiles, "factory.json");
- final SegmentizerFactory factory;
-
- if (factoryJson.exists()) {
- try {
- factory = jsonMapper.readValue(factoryJson, SegmentizerFactory.class);
- }
- catch (IOException e) {
- throw new SegmentLoadingException(e, "Failed to get segment facotry for %s", e.getMessage());
- }
- } else {
- factory = new MMappedQueryableSegmentizerFactory(indexIO);
- }
- return factory;
- }
-
- /**
- * Returns the effective segment info directory based on the configuration settings.
- * The directory is selected based on the following configurations injected into this class:
- * <ul>
- * <li>{@link SegmentLoaderConfig#getInfoDir()} - If {@code infoDir} is set, it is used as the info directory.</li>
- * <li>{@link SegmentLoaderConfig#getLocations()} - If the info directory is not set, the first location from this list is used.</li>
- * <li>List of {@link StorageLocation}s injected - If both the info directory and locations list are not set, the
- * first storage location is used.</li>
- * </ul>
- *
- * @throws DruidException if none of the configurations are set, and the info directory cannot be determined.
- */
- private File getEffectiveInfoDir()
- {
- final File infoDir;
- if (config.getInfoDir() != null) {
- infoDir = config.getInfoDir();
- } else if (!config.getLocations().isEmpty()) {
- infoDir = new File(config.getLocations().get(0).getPath(), "info_dir");
- } else if (!locations.isEmpty()) {
- infoDir = new File(locations.get(0).getPath(), "info_dir");
- } else {
- throw DruidException.forPersona(DruidException.Persona.OPERATOR)
- .ofCategory(DruidException.Category.NOT_FOUND)
- .build("Could not determine infoDir. Make sure 'druid.segmentCache.infoDir' "
- + "or 'druid.segmentCache.locations' is set correctly.");
- }
- return infoDir;
- }
-
- private static String getSegmentDir(DataSegment segment)
- {
- return DataSegmentPusher.getDefaultStorageDir(segment, false);
- }
-
- /**
- * Checks whether a segment is already cached. It can return false even if {@link #reserve(DataSegment)}
- * has been successful for a segment but is not downloaded yet.
- */
- boolean isSegmentCached(final DataSegment segment)
- {
- return findStoragePathIfCached(segment) != null;
- }
-
- /**
- * This method will try to find if the segment is already downloaded on any location. If so, the segment path
- * is returned. Along with that, location state is also updated with the segment location. Refer to
- * {@link StorageLocation#maybeReserve(String, DataSegment)} for more details.
- * If the segment files are damaged in any location, they are removed from the location.
- * @param segment - Segment to check
- * @return - Path corresponding to segment directory if found, null otherwise.
- */
- @Nullable
- private File findStoragePathIfCached(final DataSegment segment)
+ public Optional<Segment> acquireSegment(final DataSegment dataSegment)
{
+ final SegmentCacheEntryIdentifier cacheEntryIdentifier = new SegmentCacheEntryIdentifier(dataSegment.getId());
for (StorageLocation location : locations) {
- String storageDir = getSegmentDir(segment);
- File localStorageDir = location.segmentDirectoryAsFile(storageDir);
- if (localStorageDir.exists()) {
- if (checkSegmentFilesIntact(localStorageDir)) {
- log.warn(
- "[%s] may be damaged. Delete all the segment files and pull from DeepStorage again.",
- localStorageDir.getAbsolutePath()
- );
- cleanupCacheFiles(location.getPath(), localStorageDir);
- location.removeSegmentDir(localStorageDir, segment);
- break;
- } else {
- // Before returning, we also reserve the space. Refer to the StorageLocation#maybeReserve documentation for details.
- location.maybeReserve(storageDir, segment);
- return localStorageDir;
- }
+ if (location.isReserved(cacheEntryIdentifier)) {
+ final SegmentCacheEntry cacheEntry = location.getCacheEntry(cacheEntryIdentifier);
+ return cacheEntry.referenceProvider.acquireReference();
}
}
- return null;
+ return Optional.empty();
}
- /**
- * check data intact.
- * @param dir segments cache dir
- * @return true means segment files may be damaged.
- */
- private boolean checkSegmentFilesIntact(File dir)
- {
- return checkSegmentFilesIntactWithStartMarker(dir);
- }
-
- /**
- * If there is 'downloadStartMarker' existed in localStorageDir, the segments files might be damaged.
- * Because each time, Druid will delete the 'downloadStartMarker' file after pulling and unzip the segments from DeepStorage.
- * downloadStartMarker existed here may mean something error during download segments and the segment files may be damaged.
- */
- private boolean checkSegmentFilesIntactWithStartMarker(File localStorageDir)
- {
- final File downloadStartMarker = new File(localStorageDir.getPath(), DOWNLOAD_START_MARKER_FILE_NAME);
- return downloadStartMarker.exists();
- }
-
- /**
- * Make sure segments files in loc is intact, otherwise function like loadSegments will failed because of segment files is damaged.
- * @param segment
- * @return
- * @throws SegmentLoadingException
- */
@Override
- public File getSegmentFiles(DataSegment segment) throws SegmentLoadingException
+ public AcquireSegmentAction acquireSegment(
+ final DataSegment dataSegment,
+ final SegmentDescriptor descriptor
+ ) throws SegmentLoadingException
{
- final ReferenceCountingLock lock = createOrGetLock(segment);
- synchronized (lock) {
+ final SegmentCacheEntryIdentifier identifier = new SegmentCacheEntryIdentifier(dataSegment.getId());
+ for (StorageLocation location : locations) {
+ final StorageLocation.ReservationHold<SegmentCacheEntry> hold = location.addWeakReservationHoldIfExists(identifier);
try {
- File segmentDir = findStoragePathIfCached(segment);
- if (segmentDir != null) {
- return segmentDir;
+ if (hold != null) {
+ if (hold.getEntry().isMounted()) {
+ return new AcquireSegmentAction(
+ descriptor,
+ () -> Futures.immediateFuture(hold.getEntry().referenceProvider.acquireReference()),
+ hold
+ );
+ } else {
+ // go ahead and mount it, someone else is probably trying this as well, but mount is done under a segment
+ // lock and is a no-op if already mounted, and if we win we need it to be mounted
+ return new AcquireSegmentAction(
+ descriptor,
+ makeOnDemandLoadSupplier(dataSegment, hold.getEntry(), location),
+ hold
+ );
+ }
}
-
- return loadSegmentWithRetry(segment);
}
- finally {
- unlock(segment, lock);
+ catch (Throwable t) {
+ throw CloseableUtils.closeAndWrapInCatch(t, hold);
}
}
- }
-
- /**
- * If we have already reserved a location before, probably via {@link #reserve(DataSegment)}, then only that location
- * should be tried. Otherwise, we would fetch locations using {@link StorageLocationSelectorStrategy} and try all
- * of them one by one till there is success.
- * Location may fail because of IO failure, most likely in two cases:<p>
- * 1. druid don't have the write access to this location, most likely the administrator doesn't config it correctly<p>
- * 2. disk failure, druid can't read/write to this disk anymore
- * <p>
- * Locations are fetched using {@link StorageLocationSelectorStrategy}.
- */
- private File loadSegmentWithRetry(DataSegment segment) throws SegmentLoadingException
- {
- String segmentDir = getSegmentDir(segment);
-
- // Try the already reserved location. If location has been reserved outside, then we do not release the location
- // here and simply delete any downloaded files. That is, we revert anything we do in this function and nothing else.
- for (StorageLocation loc : locations) {
- if (loc.isReserved(segmentDir)) {
- File storageDir = loc.segmentDirectoryAsFile(segmentDir);
- boolean success = loadInLocationWithStartMarkerQuietly(loc, segment, storageDir, false);
- if (!success) {
- throw new SegmentLoadingException(
- "Failed to load segment[%s] in reserved location[%s]", segment.getId(), loc.getPath().getAbsolutePath()
+ final Iterator<StorageLocation> iterator = strategy.getLocations();
+ while (iterator.hasNext()) {
+ final StorageLocation location = iterator.next();
+ final StorageLocation.ReservationHold<SegmentCacheEntry> hold = location.addWeakReservationHold(
+ identifier,
+ () -> new SegmentCacheEntry(dataSegment)
+ );
+ try {
+ if (hold != null) {
+ return new AcquireSegmentAction(
+ descriptor,
+ makeOnDemandLoadSupplier(dataSegment, hold.getEntry(), location),
+ hold
);
}
- return storageDir;
}
- }
-
- // No location was reserved so we try all the locations
- Iterator<StorageLocation> locationsIterator = strategy.getLocations();
- while (locationsIterator.hasNext()) {
-
- StorageLocation loc = locationsIterator.next();
-
- // storageDir is the file path corresponding to segment dir
- File storageDir = loc.reserve(segmentDir, segment);
- if (storageDir != null) {
- boolean success = loadInLocationWithStartMarkerQuietly(loc, segment, storageDir, true);
- if (success) {
- return storageDir;
- }
+ catch (Throwable t) {
+ throw CloseableUtils.closeAndWrapInCatch(t, hold);
}
}
- throw new SegmentLoadingException("Failed to load segment[%s] in all locations.", segment.getId());
+ throw new SegmentLoadingException(
+ "Unable to load segment[%s] on demand, ensure enough disk space has been allocated to load all segments involved in the query",
+ dataSegment.getId()
+ );
}
- /**
- * A helper method over {@link #loadInLocationWithStartMarker(DataSegment, File)} that catches the {@link SegmentLoadingException}
- * and emits alerts.
- * @param loc - {@link StorageLocation} where segment is to be downloaded in.
- * @param segment - {@link DataSegment} to download
- * @param storageDir - {@link File} pointing to segment directory
- * @param releaseLocation - Whether to release the location in case of failures
- * @return - True if segment was downloaded successfully, false otherwise.
- */
- private boolean loadInLocationWithStartMarkerQuietly(StorageLocation loc, DataSegment segment, File storageDir, boolean releaseLocation)
+ @Override
+ public void load(final DataSegment dataSegment) throws SegmentLoadingException
{
- try {
- loadInLocationWithStartMarker(segment, storageDir);
- return true;
+ if (config.isVirtualStorageFabric()) {
Review Comment:
Make it an error to enable vsf and also set
`numThreadsToLoadSegmentsIntoPageCacheOnDownload`? Seems like in vsf mode, that
config is ignored.
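A fail-fast check along the suggested lines might look like this (a sketch only; how the two settings are surfaced on `SegmentLoaderConfig` is an assumption):

```java
public class VsfConfigCheck {
  /**
   * Rejects the conflicting combination up front instead of silently
   * ignoring the page-cache loading threads in virtual-storage-fabric mode.
   */
  public static void validate(boolean virtualStorageFabric, int numThreadsToLoadSegmentsIntoPageCacheOnDownload) {
    if (virtualStorageFabric && numThreadsToLoadSegmentsIntoPageCacheOnDownload > 0) {
      throw new IllegalStateException(
          "'numThreadsToLoadSegmentsIntoPageCacheOnDownload' has no effect when virtual storage fabric is enabled; "
          + "unset one of the two options"
      );
    }
  }
}
```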
##########
server/src/main/java/org/apache/druid/segment/loading/SegmentCacheManager.java:
##########
@@ -39,116 +43,96 @@ public interface SegmentCacheManager
boolean canHandleSegments();
/**
- * Return a list of cached segments from local disk, if any. This should be called only
- * when {@link #canHandleSegments()} is true.
+ * Return a list of cached segments from local disk, if any. This should be called only when
+ * {@link #canHandleSegments()} is true.
*/
List<DataSegment> getCachedSegments() throws IOException;
/**
- * Store a segment info file for the supplied segment on disk. This operation is idempotent when called
- * multiple times for a given segment.
+ * Store a segment info file for the supplied segment on disk. This operation is idempotent when called multiple
+ * times for a given segment.
*/
void storeInfoFile(DataSegment segment) throws IOException;
/**
- * Remove the segment info file for the supplied segment from disk. If the file cannot be
- * deleted, do nothing.
+ * Remove the segment info file for the supplied segment from disk. If the file cannot be deleted, do nothing.
*
- * @see SegmentCacheManager#cleanup(DataSegment)
+ * @see SegmentCacheManager#drop(DataSegment)
*/
void removeInfoFile(DataSegment segment);
+
/**
- * Returns a {@link ReferenceCountedSegmentProvider} that will be added by the {@link org.apache.druid.server.SegmentManager}
- * to the {@link org.apache.druid.timeline.VersionedIntervalTimeline}. This method can be called multiple times
- * by the {@link org.apache.druid.server.SegmentManager} and implementation can either return same {@link ReferenceCountedSegmentProvider}
- * or a different {@link ReferenceCountedSegmentProvider}. Caller should not assume any particular behavior.
- * <p>
- * Returning a {@code ReferenceCountingSegment} will let custom implementations keep track of reference count for
- * segments that the custom implementations are creating. That way, custom implementations can know when the segment
- * is in use or not.
- * </p>
- * @param segment Segment to get on each download after service bootstrap
- * @throws SegmentLoadingException If there is an error in loading the segment
- * @see SegmentCacheManager#getBootstrapSegment(DataSegment, SegmentLazyLoadFailCallback)
+ * Given a {@link DataSegment}, which contains the instructions for where and how to fetch a {@link Segment} from
+ * deep storage, this method tries to load and subsequently serve it to callers via
+ * {@link #acquireSegment(DataSegment)} or {@link #acquireSegment(DataSegment, SegmentDescriptor)}. If the segment
+ * cannot be loaded either due to error or insufficient storage space, this method throws a
+ * {@link SegmentLoadingException}.
+ *
+ * @param segment Segment to get on each download (after service bootstrap)
+ * @throws SegmentLoadingException If there is an error in loading the segment or insufficient storage space
+ * @see SegmentCacheManager#bootstrap(DataSegment, SegmentLazyLoadFailCallback)
 */
- ReferenceCountedSegmentProvider getSegment(DataSegment segment) throws SegmentLoadingException;
+ void load(DataSegment segment) throws SegmentLoadingException;
/**
- * Similar to {@link #getSegment(DataSegment)}, this method returns a {@link ReferenceCountedSegmentProvider} that will be
- * added by the {@link org.apache.druid.server.SegmentManager} to the {@link org.apache.druid.timeline.VersionedIntervalTimeline}
- * during startup on data nodes.
- * @param segment Segment to retrieve during service bootstrap
+ * Similar to {@link #load(DataSegment)}, this method loads segments during startup on data nodes. Implementations of
+ * this method may be configured to use a larger than normal work pool that only exists during startup and is shutdown
+ * after startup by calling {@link #shutdownBootstrap()}
+ *
+ * @param segment Segment to retrieve during service bootstrap
 * @param loadFailed Callback to execute when segment lazy load failed. This applies only when
 * {@code lazyLoadOnStart} is enabled
- * @throws SegmentLoadingException - If there is an error in loading the segment
- * @see SegmentCacheManager#getSegment(DataSegment)
+ * @throws SegmentLoadingException - If there is an error in loading the segment or insufficient storage space
+ * @see SegmentCacheManager#load(DataSegment)
+ * @see SegmentCacheManager#shutdownBootstrap()
 */
- ReferenceCountedSegmentProvider getBootstrapSegment(
- DataSegment segment,
- SegmentLazyLoadFailCallback loadFailed
- ) throws SegmentLoadingException;
+ void bootstrap(DataSegment segment, SegmentLazyLoadFailCallback loadFailed) throws SegmentLoadingException;
/**
- * This method fetches the files for the given segment if the segment is not downloaded already. It
- * is not required to {@link #reserve(DataSegment)} before calling this method. If caller has not reserved
- * the space explicitly via {@link #reserve(DataSegment)}, the implementation should reserve space on caller's
- * behalf.
- * If the space has been explicitly reserved already
- * - implementation should use only the reserved space to store segment files.
- * - implementation should not release the location in case of download erros and leave it to the caller.
- * @throws SegmentLoadingException if there is an error in downloading files
- */
- File getSegmentFiles(DataSegment segment) throws SegmentLoadingException;
-
- /**
- * Asynchronously load the supplied segment into the page cache on each download after the service finishes bootstrapping.
- * Equivalent to `cat segment_files > /dev/null` to force loading the segment index files into page cache so that
- * later when the segment is queried, they are already in page cache and only a minor page fault needs to be triggered
- * instead of a major page fault to make the query latency more consistent.
+ * Cleanup the segment files cache space used by the segment, releasing the {@link StorageLocation} reservation
 *
- * @see SegmentCacheManager#loadSegmentIntoPageCacheOnBootstrap(DataSegment)
+ * @see SegmentCacheManager#removeInfoFile(DataSegment)
 */
- void loadSegmentIntoPageCache(DataSegment segment);
+ void drop(DataSegment segment);
 /**
- * Similar to {@link #loadSegmentIntoPageCache(DataSegment)}, but asynchronously load the supplied segment into the
- * page cache during service bootstrap.
- *
- * @see SegmentCacheManager#loadSegmentIntoPageCache(DataSegment)
+ * Applies a {@link SegmentMapFunction} to a {@link Segment} if it is available in the cache. If not present in any
+ * storage location, this method will not attempt to download the {@link DataSegment} from deep storage. The
+ * {@link Segment} returned by this method is considered an open reference, cache implementations must not allow it
+ * to be dropped until it has been closed. As such, the returned {@link Segment} must be closed when the caller is
+ * finished doing segment things.
 */
- void loadSegmentIntoPageCacheOnBootstrap(DataSegment segment);
+ Optional<Segment> acquireSegment(DataSegment dataSegment);
 /**
- * Shutdown any previously set up bootstrap executor to save resources.
- * This should be called after loading bootstrap segments into the page cache.
+ * Returns a {@link AcquireSegmentAction} for a given {@link DataSegment} and {@link SegmentDescriptor}, which returns
+ * the {@link Segment} if already present in the cache, or tries to fetch from deep storage and map if not. The
+ * {@link Segment} returned by this method is considered an open reference, cache implementations must not allow it
+ * to be dropped until it has been closed. As such, the returned {@link Segment} must be closed when the caller is
+ * finished doing segment things.
 */
- void shutdownBootstrap();
-
- boolean reserve(DataSegment segment);
+ AcquireSegmentAction acquireSegment(
+ DataSegment dataSegment,
+ SegmentDescriptor descriptor
Review Comment:
It's kind of weird that this method takes a `SegmentDescriptor`. All it does
with it is return it back to the caller in the `AcquireSegmentAction`, so the
caller can have it bundled together with the segment future. I suppose it's
fine but I wonder if it would make things better to remove it.
##########
processing/src/main/java/org/apache/druid/query/metadata/SegmentAnalyzer.java:
##########
@@ -364,4 +374,85 @@ private ColumnAnalysis analyzeArrayColumn(final ColumnCapabilities capabilities)
{
return ColumnAnalysis.builder().withCapabilities(capabilities).build();
}
+
+ public static Segment makeVirtualPlaceholderSegment(DataSegment dataSegment)
Review Comment:
IMO, even though this is only needed by `SegmentAnalyzer`, it would be best
to move it to its own file. It's only tangentially related to the main
functionality of `SegmentAnalyzer`.
##########
processing/src/main/java/org/apache/druid/java/util/common/concurrent/Execs.java:
##########
@@ -173,6 +175,97 @@ public void rejectedExecution(Runnable r, ThreadPoolExecutor executor)
);
}
+ public static ExecutorService newBlockingCached(
+ final String nameFormat,
+ int minThreads,
+ int maxThreads,
+ long keepAliveTime,
+ TimeUnit keepAliveTimeUnit,
+ final Integer priority
+ )
+ {
+ final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
+ if (minThreads == maxThreads) {
+ return new ThreadPoolExecutor(
+ minThreads,
+ maxThreads,
+ keepAliveTime,
+ keepAliveTimeUnit,
+ queue,
+ makeThreadFactory(nameFormat, priority),
+ (r, executor) -> {
+ if (executor.isShutdown()) {
+ throw new RejectedExecutionException("Executor is shutdown, rejecting task");
+ }
+ try {
+ executor.getQueue().put(r);
+ }
+ catch (InterruptedException e) {
+ throw new RejectedExecutionException("Got Interrupted while adding to the Queue", e);
+ }
+ }
+ );
+ }
+ return new ThreadPoolExecutor(
+ minThreads,
+ maxThreads,
+ keepAliveTime,
+ keepAliveTimeUnit,
+ queue,
+ makeThreadFactory(nameFormat, priority),
+ (r, executor) -> {
+ if (executor.isShutdown()) {
+ throw new RejectedExecutionException("Executor is shutdown, rejecting task");
+ }
+ try {
+ executor.getQueue().put(r);
+ }
+ catch (InterruptedException e) {
+ throw new RejectedExecutionException("Got Interrupted while adding to the Queue", e);
+ }
+ }
+ )
+ {
+ private int running = 0;
+
+ @Override
+ public void execute(Runnable command)
+ {
+ synchronized (this) {
+ running++;
+ growIfNeeded();
Review Comment:
what's all this for? Don't the builtin caching thread pools handle this sort
of thing on their own?
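One plausible answer (an assumption about the PR's intent, not confirmed by it): `ThreadPoolExecutor` only creates threads beyond `corePoolSize` when `workQueue.offer` fails, so a pool built on an unbounded `LinkedBlockingQueue` never grows toward `maxThreads` on its own; `Executors.newCachedThreadPool` sidesteps that with a `SynchronousQueue`, but then has no queue to buffer work. The following demonstrates the queue behavior that drives thread creation:

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;

public class QueueGrowthDemo {
  public static void main(String[] args) {
    // An unbounded LinkedBlockingQueue accepts every offer, so a
    // ThreadPoolExecutor using it never sees a failed offer and never
    // spawns threads beyond corePoolSize.
    System.out.println(new LinkedBlockingQueue<Runnable>().offer(() -> {})); // true

    // A SynchronousQueue rejects the offer unless a consumer is already
    // waiting, which is what lets cached pools grow instead of queueing.
    System.out.println(new SynchronousQueue<Runnable>().offer(() -> {})); // false
  }
}
```

That mismatch would explain manual `growIfNeeded()` bookkeeping: it is one way to get both a minimum of live threads with a real work queue and growth up to a bounded maximum.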
##########
processing/src/main/java/org/apache/druid/java/util/common/concurrent/Execs.java:
##########
@@ -173,6 +175,97 @@ public void rejectedExecution(Runnable r, ThreadPoolExecutor executor)
);
}
+ public static ExecutorService newBlockingCached(
+ final String nameFormat,
+ int minThreads,
+ int maxThreads,
+ long keepAliveTime,
+ TimeUnit keepAliveTimeUnit,
+ final Integer priority
+ )
+ {
+ final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
+ if (minThreads == maxThreads) {
+ return new ThreadPoolExecutor(
+ minThreads,
+ maxThreads,
+ keepAliveTime,
+ keepAliveTimeUnit,
+ queue,
+ makeThreadFactory(nameFormat, priority),
+ (r, executor) -> {
+ if (executor.isShutdown()) {
+ throw new RejectedExecutionException("Executor is shutdown, rejecting task");
+ }
+ try {
+ executor.getQueue().put(r);
+ }
+ catch (InterruptedException e) {
+ throw new RejectedExecutionException("Got Interrupted while adding to the Queue", e);
+ }
+ }
+ );
+ }
+ return new ThreadPoolExecutor(
+ minThreads,
+ maxThreads,
+ keepAliveTime,
+ keepAliveTimeUnit,
+ queue,
+ makeThreadFactory(nameFormat, priority),
+ (r, executor) -> {
+ if (executor.isShutdown()) {
+ throw new RejectedExecutionException("Executor is shutdown, rejecting task");
+ }
+ try {
+ executor.getQueue().put(r);
+ }
+ catch (InterruptedException e) {
+ throw new RejectedExecutionException("Got Interrupted while adding to the Queue", e);
+ }
+ }
+ )
+ {
+ private int running = 0;
Review Comment:
`@GuardedBy("this")`
##########
server/src/main/java/org/apache/druid/segment/loading/SegmentCacheManager.java:
##########
@@ -39,116 +43,96 @@ public interface SegmentCacheManager
boolean canHandleSegments();
/**
- * Return a list of cached segments from local disk, if any. This should be called only
- * when {@link #canHandleSegments()} is true.
+ * Return a list of cached segments from local disk, if any. This should be called only when
+ * {@link #canHandleSegments()} is true.
*/
List<DataSegment> getCachedSegments() throws IOException;
/**
- * Store a segment info file for the supplied segment on disk. This operation is idempotent when called
- * multiple times for a given segment.
+ * Store a segment info file for the supplied segment on disk. This operation is idempotent when called multiple
+ * times for a given segment.
*/
void storeInfoFile(DataSegment segment) throws IOException;
/**
- * Remove the segment info file for the supplied segment from disk. If the file cannot be
- * deleted, do nothing.
+ * Remove the segment info file for the supplied segment from disk. If the file cannot be deleted, do nothing.
*
- * @see SegmentCacheManager#cleanup(DataSegment)
+ * @see SegmentCacheManager#drop(DataSegment)
*/
void removeInfoFile(DataSegment segment);
+
/**
- * Returns a {@link ReferenceCountedSegmentProvider} that will be added by the {@link org.apache.druid.server.SegmentManager}
- * to the {@link org.apache.druid.timeline.VersionedIntervalTimeline}. This method can be called multiple times
- * by the {@link org.apache.druid.server.SegmentManager} and implementation can either return same {@link ReferenceCountedSegmentProvider}
- * or a different {@link ReferenceCountedSegmentProvider}. Caller should not assume any particular behavior.
- * <p>
- * Returning a {@code ReferenceCountingSegment} will let custom implementations keep track of reference count for
- * segments that the custom implementations are creating. That way, custom implementations can know when the segment
- * is in use or not.
- * </p>
- * @param segment Segment to get on each download after service bootstrap
- * @throws SegmentLoadingException If there is an error in loading the segment
- * @see SegmentCacheManager#getBootstrapSegment(DataSegment, SegmentLazyLoadFailCallback)
+ * Given a {@link DataSegment}, which contains the instructions for where and how to fetch a {@link Segment} from
+ * deep storage, this method tries to load and subsequently serve it to callers via
+ * {@link #acquireSegment(DataSegment)} or {@link #acquireSegment(DataSegment, SegmentDescriptor)}. If the segment
+ * cannot be loaded either due to error or insufficient storage space, this method throws a
+ * {@link SegmentLoadingException}.
+ *
+ * @param segment Segment to get on each download (after service bootstrap)
+ * @throws SegmentLoadingException If there is an error in loading the segment or insufficient storage space
+ * @see SegmentCacheManager#bootstrap(DataSegment, SegmentLazyLoadFailCallback)
 */
- ReferenceCountedSegmentProvider getSegment(DataSegment segment) throws SegmentLoadingException;
+ void load(DataSegment segment) throws SegmentLoadingException;
/**
- * Similar to {@link #getSegment(DataSegment)}, this method returns a {@link ReferenceCountedSegmentProvider} that will be
- * added by the {@link org.apache.druid.server.SegmentManager} to the {@link org.apache.druid.timeline.VersionedIntervalTimeline}
- * during startup on data nodes.
- * @param segment Segment to retrieve during service bootstrap
+ * Similar to {@link #load(DataSegment)}, this method loads segments during startup on data nodes. Implementations of
+ * this method may be configured to use a larger than normal work pool that only exists during startup and is shutdown
+ * after startup by calling {@link #shutdownBootstrap()}
+ *
+ * @param segment Segment to retrieve during service bootstrap
 * @param loadFailed Callback to execute when segment lazy load failed. This applies only when
 * {@code lazyLoadOnStart} is enabled
- * @throws SegmentLoadingException - If there is an error in loading the segment
- * @see SegmentCacheManager#getSegment(DataSegment)
+ * @throws SegmentLoadingException - If there is an error in loading the segment or insufficient storage space
+ * @see SegmentCacheManager#load(DataSegment)
+ * @see SegmentCacheManager#shutdownBootstrap()
 */
- ReferenceCountedSegmentProvider getBootstrapSegment(
- DataSegment segment,
- SegmentLazyLoadFailCallback loadFailed
- ) throws SegmentLoadingException;
+ void bootstrap(DataSegment segment, SegmentLazyLoadFailCallback loadFailed) throws SegmentLoadingException;
Review Comment:
`loadOnBootstrap`? I was thinking that having both method names start with `load` would emphasize their similarity.
##########
server/src/main/java/org/apache/druid/segment/loading/CacheEntry.java:
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.loading;
+
+import java.io.File;
+import java.io.IOException;
+
+/**
+ * Common interface for items stored in a {@link StorageLocation}
+ */
+public interface CacheEntry
+{
+ /**
+ * Unique identifier for the cache entry
+ */
+ CacheEntryIdentifier getId();
+
+ /**
+ * Size in bytes of the cache entry
+ */
+ long getSize();
+
+ boolean isMounted();
+
+ /**
+ * Materializes the cache entry into the assigned {@link StorageLocation}. If a cache entry is already mounted in the
Review Comment:
Does this arrange for it to be materialized in the background, or does it
block?
##########
embedded-tests/src/test/java/org/apache/druid/testing/embedded/query/QueryVirtualStorageFabricTest.java:
##########
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.embedded.query;
+
+import org.apache.druid.common.guava.FutureUtils;
+import org.apache.druid.common.utils.IdUtils;
+import org.apache.druid.data.input.impl.LocalInputSource;
+import org.apache.druid.indexer.TaskState;
+import org.apache.druid.java.util.common.HumanReadableBytes;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.msq.dart.guice.DartControllerMemoryManagementModule;
+import org.apache.druid.msq.dart.guice.DartControllerModule;
+import org.apache.druid.msq.dart.guice.DartWorkerMemoryManagementModule;
+import org.apache.druid.msq.dart.guice.DartWorkerModule;
+import org.apache.druid.msq.guice.IndexerMemoryManagementModule;
+import org.apache.druid.msq.guice.MSQDurableStorageModule;
+import org.apache.druid.msq.guice.MSQExternalDataSourceModule;
+import org.apache.druid.msq.guice.MSQIndexingModule;
+import org.apache.druid.msq.guice.MSQSqlModule;
+import org.apache.druid.msq.guice.SqlTaskModule;
+import org.apache.druid.msq.indexing.report.MSQTaskReportPayload;
+import org.apache.druid.query.DruidProcessingConfigTest;
+import org.apache.druid.query.http.ClientSqlQuery;
+import org.apache.druid.sql.calcite.planner.Calcites;
+import org.apache.druid.sql.http.ResultFormat;
+import org.apache.druid.testing.embedded.EmbeddedBroker;
+import org.apache.druid.testing.embedded.EmbeddedCoordinator;
+import org.apache.druid.testing.embedded.EmbeddedDruidCluster;
+import org.apache.druid.testing.embedded.EmbeddedHistorical;
+import org.apache.druid.testing.embedded.EmbeddedIndexer;
+import org.apache.druid.testing.embedded.EmbeddedOverlord;
+import org.apache.druid.testing.embedded.EmbeddedRouter;
+import org.apache.druid.testing.embedded.junit5.EmbeddedClusterTestBase;
+import org.apache.druid.testing.embedded.minio.MinIOStorageResource;
+import org.apache.druid.testing.embedded.msq.EmbeddedDurableShuffleStorageTest;
+import org.apache.druid.testing.embedded.msq.EmbeddedMSQApis;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.shaded.com.google.common.io.ByteStreams;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.Collections;
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * Virtual storage fabric mode tests for classic native JSON queries
+ */
+class QueryVirtualStorageFabricTest extends EmbeddedClusterTestBase
+{
+ private final EmbeddedBroker broker = new EmbeddedBroker();
+ private final EmbeddedIndexer indexer = new EmbeddedIndexer();
+ private final EmbeddedOverlord overlord = new EmbeddedOverlord();
+ private final EmbeddedHistorical historical = new EmbeddedHistorical();
+ private final EmbeddedCoordinator coordinator = new EmbeddedCoordinator();
+ private final EmbeddedRouter router = new EmbeddedRouter();
+ private final MinIOStorageResource storageResource = new MinIOStorageResource();
+
+ private EmbeddedMSQApis msqApis;
+
+ @Override
+ public EmbeddedDruidCluster createCluster()
+ {
+ historical.addProperty("druid.segmentCache.isVirtualStorageFabric", "true")
Review Comment:
The property name seems oddly verbose to me. How about `druid.segmentCache.vsf`
or `druid.segmentCache.virtualStorage`? The same goes for
`minVirtualStorageFabricLoadThreads`; I'd suggest something like
`minVsfLoadThreads` or `minVirtualStorageLoadThreads`.
##########
server/src/main/java/org/apache/druid/segment/loading/CacheEntry.java:
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.loading;
+
+import java.io.File;
+import java.io.IOException;
+
+/**
+ * Common interface for items stored in a {@link StorageLocation}
+ */
+public interface CacheEntry
+{
+ /**
+ * Unique identifier for the cache entry
+ */
+ CacheEntryIdentifier getId();
+
+ /**
+ * Size in bytes of the cache entry
+ */
+ long getSize();
+
+ boolean isMounted();
+
+ /**
+ * Materializes the cache entry into the assigned {@link StorageLocation}. If a cache entry is already mounted in the
+ * location, calling this method should be a no-op. If the cache entry is mounted in a different location, this method
+ * will unmount the item from the other location and mount in the new location.
+ */
+ void mount(File location) throws IOException, SegmentLoadingException;
+
+ /**
+ * Removes the physical artifacts of a cache entry from the location it is currently mounted
Review Comment:
Does this arrange for background removal, or does it block?
##########
processing/src/main/java/org/apache/druid/segment/loading/AcquireSegmentAction.java:
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.loading;
+
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.query.SegmentDescriptor;
+import org.apache.druid.segment.Segment;
+import org.apache.druid.segment.SegmentMapFunction;
+import org.apache.druid.segment.SegmentReference;
+import org.apache.druid.utils.CloseableUtils;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+
+/**
+ * This class represents an intent to acquire a reference to a {@link Segment} and then use it to do stuff, finally
+ * closing when done. When an {@link AcquireSegmentAction} is created, segment cache implementations will create a
+ * 'hold' to ensure it cannot be removed from the cache until this action has been closed. {@link #getSegmentFuture()}
+ * will return the segment if already cached, or attempt to download from deep storage to load into the cache if not.
+ * The {@link Segment} returned by the future places a separate hold on the cache until the segment itself is closed,
+ * and MUST be closed when the caller is finished doing segment things with it. The caller must also call
+ * {@link #close()} on this object to clean up the hold that exists while possibly loading the segment, and may do so
+ * as soon as the {@link Segment} is acquired (or can do so earlier to abort the load and release the hold).
+ */
+public class AcquireSegmentAction implements Closeable
+{
+ public static final Closeable NOOP_CLEANUP = () -> {};
+
+ public static List<SegmentReference> mapAllSegments(
+ List<AcquireSegmentAction> acquireSegmentActions,
+ SegmentMapFunction segmentMapFunction,
+ long timeoutAt
+ )
+ {
+ final Closer safetyNet = Closer.create();
+ Throwable failure = null;
+
+ // getting the future kicks off any background action, so materialize them all to a list to get things started
+ final List<Future<Optional<Segment>>> futures = new ArrayList<>(acquireSegmentActions.size());
+ for (AcquireSegmentAction acquireSegmentAction : acquireSegmentActions) {
+ safetyNet.register(acquireSegmentAction);
+ // if we haven't failed yet, keep collecing futures (we always want to collect the actions themselves though
Review Comment:
collecting (spelling)
##########
server/src/main/java/org/apache/druid/segment/loading/CacheEntry.java:
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.loading;
+
+import java.io.File;
+import java.io.IOException;
+
+/**
+ * Common interface for items stored in a {@link StorageLocation}
+ */
+public interface CacheEntry
+{
+ /**
+ * Unique identifier for the cache entry
+ */
+ CacheEntryIdentifier getId();
+
+ /**
+ * Size in bytes of the cache entry
+ */
+ long getSize();
+
+ boolean isMounted();
+
+ /**
+ * Materializes the cache entry into the assigned {@link StorageLocation}. If a cache entry is already mounted in the
+ * location, calling this method should be a no-op. If the cache entry is mounted in a different location, this method
+ * will unmount the item from the other location and mount in the new location.
Review Comment:
> If the cache entry is mounted in a different location, this method will
unmount the item from the other location and mount in the new location
This seems interesting, I'm wondering what's the use case for it?
##########
processing/src/main/java/org/apache/druid/segment/loading/AcquireSegmentAction.java:
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.loading;
+
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.query.SegmentDescriptor;
+import org.apache.druid.segment.Segment;
+import org.apache.druid.segment.SegmentMapFunction;
+import org.apache.druid.segment.SegmentReference;
+import org.apache.druid.utils.CloseableUtils;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+
+/**
+ * This class represents an intent to acquire a reference to a {@link Segment} and then use it to do stuff, finally
+ * closing when done. When an {@link AcquireSegmentAction} is created, segment cache implementations will create a
+ * 'hold' to ensure it cannot be removed from the cache until this action has been closed. {@link #getSegmentFuture()}
+ * will return the segment if already cached, or attempt to download from deep storage to load into the cache if not.
+ * The {@link Segment} returned by the future places a separate hold on the cache until the segment itself is closed,
+ * and MUST be closed when the caller is finished doing segment things with it. The caller must also call
+ * {@link #close()} on this object to clean up the hold that exists while possibly loading the segment, and may do so
+ * as soon as the {@link Segment} is acquired (or can do so earlier to abort the load and release the hold).
+ */
+public class AcquireSegmentAction implements Closeable
+{
+ public static final Closeable NOOP_CLEANUP = () -> {};
+
+ public static List<SegmentReference> mapAllSegments(
+ List<AcquireSegmentAction> acquireSegmentActions,
+ SegmentMapFunction segmentMapFunction,
+ long timeoutAt
+ )
+ {
+ final Closer safetyNet = Closer.create();
+ Throwable failure = null;
+
+ // getting the future kicks off any background action, so materialize them all to a list to get things started
+ final List<Future<Optional<Segment>>> futures = new ArrayList<>(acquireSegmentActions.size());
+ for (AcquireSegmentAction acquireSegmentAction : acquireSegmentActions) {
+ safetyNet.register(acquireSegmentAction);
+ // if we haven't failed yet, keep collecing futures (we always want to collect the actions themselves though
+ // to close
Review Comment:
comment appears to be cut off
##########
processing/src/main/java/org/apache/druid/segment/loading/AcquireSegmentAction.java:
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.loading;
+
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.query.SegmentDescriptor;
+import org.apache.druid.segment.Segment;
+import org.apache.druid.segment.SegmentMapFunction;
+import org.apache.druid.segment.SegmentReference;
+import org.apache.druid.utils.CloseableUtils;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+
+/**
+ * This class represents an intent to acquire a reference to a {@link Segment} and then use it to do stuff, finally
+ * closing when done. When an {@link AcquireSegmentAction} is created, segment cache implementations will create a
+ * 'hold' to ensure it cannot be removed from the cache until this action has been closed. {@link #getSegmentFuture()}
+ * will return the segment if already cached, or attempt to download from deep storage to load into the cache if not.
+ * The {@link Segment} returned by the future places a separate hold on the cache until the segment itself is closed,
+ * and MUST be closed when the caller is finished doing segment things with it. The caller must also call
+ * {@link #close()} on this object to clean up the hold that exists while possibly loading the segment, and may do so
+ * as soon as the {@link Segment} is acquired (or can do so earlier to abort the load and release the hold).
+ */
+public class AcquireSegmentAction implements Closeable
+{
+ public static final Closeable NOOP_CLEANUP = () -> {};
+
+ public static List<SegmentReference> mapAllSegments(
+ List<AcquireSegmentAction> acquireSegmentActions,
+ SegmentMapFunction segmentMapFunction,
+ long timeoutAt
+ )
+ {
+ final Closer safetyNet = Closer.create();
+ Throwable failure = null;
+
+ // getting the future kicks off any background action, so materialize them all to a list to get things started
+ final List<Future<Optional<Segment>>> futures = new ArrayList<>(acquireSegmentActions.size());
+ for (AcquireSegmentAction acquireSegmentAction : acquireSegmentActions) {
+ safetyNet.register(acquireSegmentAction);
+ // if we haven't failed yet, keep collecing futures (we always want to collect the actions themselves though
+ // to close
+ if (failure == null) {
+ try {
+ futures.add(acquireSegmentAction.getSegmentFuture());
+ }
+ catch (Throwable t) {
+ failure = t;
+ }
+ } else {
+ futures.add(Futures.immediateFuture(Optional.empty()));
+ }
+ }
+
+
+ final List<SegmentReference> segmentReferences = new ArrayList<>(acquireSegmentActions.size());
+ for (int i = 0; i < acquireSegmentActions.size(); i++) {
+ // if anything fails, want to ignore it initially so we can collect all additional futures to properly clean up
+ // all references before rethrowing the error
+ try {
+ final AcquireSegmentAction action = acquireSegmentActions.get(i);
+ final Future<Optional<Segment>> future = futures.get(i);
+ final Optional<Segment> segment = future.get(timeoutAt - System.currentTimeMillis(), TimeUnit.MILLISECONDS)
Review Comment:
It's possible that the `Segment` gets created but this `future.get` throws
an error (maybe the `Segment` gets created a split second after the
`future.get` times out; maybe the `future.get` is interrupted; etc). In this
case the `Segment` won't be registered with the `safetyNet`. Is that bad?
If it is bad, it could be fixed by catching timeout exceptions from
`future.get` and, in that case, adding a listener to the `future` that closes
the segment if it ever does end up being generated.
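For illustration, a standalone sketch of that fix, using `CompletableFuture` in place of Guava's `ListenableFuture` (the `Resource` type and method names here are hypothetical stand-ins, not the PR's API): if the timed `get` throws, a listener is attached so a late-arriving segment still gets closed.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class TimeoutCleanupSketch
{
  // hypothetical stand-in for Segment
  static class Resource implements AutoCloseable
  {
    volatile boolean closed;

    @Override
    public void close()
    {
      closed = true;
    }
  }

  /**
   * Waits for the future up to the timeout; if the wait throws, attaches a
   * listener that closes the resource if it is ever produced, so a resource
   * created a split second after the timeout does not leak.
   */
  static Optional<Resource> getWithCleanup(CompletableFuture<Optional<Resource>> future, long timeoutMillis)
      throws Exception
  {
    try {
      return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
    }
    catch (Throwable t) {
      future.thenAccept(opt -> opt.ifPresent(Resource::close));
      throw t;
    }
  }

  public static void main(String[] args) throws Exception
  {
    final CompletableFuture<Optional<Resource>> slow = new CompletableFuture<>();
    final Resource resource = new Resource();
    try {
      getWithCleanup(slow, 10);
      throw new AssertionError("expected a timeout");
    }
    catch (TimeoutException expected) {
      // the "segment" finishes loading only after the caller gave up waiting
      slow.complete(Optional.of(resource));
    }
    if (!resource.closed) {
      throw new AssertionError("resource leaked after timeout");
    }
    System.out.println("late resource was closed");
  }
}
```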
##########
processing/src/main/java/org/apache/druid/segment/loading/AcquireSegmentAction.java:
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.loading;
+
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.query.SegmentDescriptor;
+import org.apache.druid.segment.Segment;
+import org.apache.druid.segment.SegmentMapFunction;
+import org.apache.druid.segment.SegmentReference;
+import org.apache.druid.utils.CloseableUtils;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+
+/**
+ * This class represents an intent to acquire a reference to a {@link Segment} and then use it to do stuff, finally
+ * closing when done. When an {@link AcquireSegmentAction} is created, segment cache implementations will create a
+ * 'hold' to ensure it cannot be removed from the cache until this action has been closed. {@link #getSegmentFuture()}
+ * will return the segment if already cached, or attempt to download from deep storage to load into the cache if not.
+ * The {@link Segment} returned by the future places a separate hold on the cache until the segment itself is closed,
+ * and MUST be closed when the caller is finished doing segment things with it. The caller must also call
+ * {@link #close()} on this object to clean up the hold that exists while possibly loading the segment, and may do so
+ * as soon as the {@link Segment} is acquired (or can do so earlier to abort the load and release the hold).
+ */
+public class AcquireSegmentAction implements Closeable
+{
+ public static final Closeable NOOP_CLEANUP = () -> {};
+
+ public static List<SegmentReference> mapAllSegments(
+ List<AcquireSegmentAction> acquireSegmentActions,
+ SegmentMapFunction segmentMapFunction,
+ long timeoutAt
+ )
+ {
+ final Closer safetyNet = Closer.create();
+ Throwable failure = null;
+
+ // getting the future kicks off any background action, so materialize them all to a list to get things started
+ final List<Future<Optional<Segment>>> futures = new ArrayList<>(acquireSegmentActions.size());
+ for (AcquireSegmentAction acquireSegmentAction : acquireSegmentActions) {
+ safetyNet.register(acquireSegmentAction);
+ // if we haven't failed yet, keep collecing futures (we always want to collect the actions themselves though
+ // to close
+ if (failure == null) {
+ try {
+ futures.add(acquireSegmentAction.getSegmentFuture());
+ }
+ catch (Throwable t) {
+ failure = t;
+ }
+ } else {
+ futures.add(Futures.immediateFuture(Optional.empty()));
+ }
+ }
+
+
+ final List<SegmentReference> segmentReferences = new ArrayList<>(acquireSegmentActions.size());
+ for (int i = 0; i < acquireSegmentActions.size(); i++) {
+ // if anything fails, want to ignore it initially so we can collect all additional futures to properly clean up
+ // all references before rethrowing the error
+ try {
+ final AcquireSegmentAction action = acquireSegmentActions.get(i);
+ final Future<Optional<Segment>> future = futures.get(i);
+ final Optional<Segment> segment = future.get(timeoutAt - System.currentTimeMillis(), TimeUnit.MILLISECONDS)
+ .map(safetyNet::register);
+ segmentReferences.add(
+ new SegmentReference(
+ action.getDescriptor(),
+ segmentMapFunction.apply(segment),
+ action
+ )
+ );
+ }
+ catch (Throwable t) {
Review Comment:
Can one of these `Throwable` be an `InterruptedException`? If that's
possible, should check for it and re-set the interrupt flag. Same question for
the other `catch (Throwable t)` higher up in the method.
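For reference, the usual shape of that fix, sketched standalone with hypothetical names: the blanket `catch (Throwable t)` checks for `InterruptedException` and restores the thread's interrupt flag before continuing.

```java
import java.util.concurrent.Callable;

public class InterruptHandlingSketch
{
  /**
   * Runs a task, collecting the failure instead of rethrowing, while making
   * sure an interrupt is not silently swallowed by the blanket catch.
   */
  static String runCollectingFailure(Callable<String> task)
  {
    try {
      return task.call();
    }
    catch (Throwable t) {
      if (t instanceof InterruptedException) {
        // re-set the flag so callers up the stack can observe the interrupt
        Thread.currentThread().interrupt();
      }
      return "failed: " + t.getClass().getSimpleName();
    }
  }

  public static void main(String[] args)
  {
    final String result = runCollectingFailure(() -> {
      throw new InterruptedException();
    });
    if (!Thread.interrupted()) {  // reads and clears the restored flag
      throw new AssertionError("interrupt flag was not restored");
    }
    System.out.println(result);
  }
}
```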
##########
server/src/main/java/org/apache/druid/server/ServerManager.java:
##########
@@ -233,23 +287,37 @@ protected <T> FunctionalIterable<QueryRunner<T>> getQueryRunnersForSegments(
final Closer closer
)
{
+ final List<SegmentReference> segmentReferences;
+ // this is kind of gross, but otherwise we're going to be loading weak assignments more or less as soon as they
+ // are assigned instead of on demand at query time
+ if (query instanceof SegmentMetadataQuery) {
+ segmentReferences = getSegmentMetadataSegmentReferences(timeline, specs, segmentMapFn);
+ } else {
+ segmentReferences = getAndLoadAllSegmentReferences(
+ timeline,
+ specs,
+ segmentMapFn,
+ query.context().getTimeout()
+ );
+ }
return FunctionalIterable
- .create(acquireAllSegments(timeline, specs, segmentMapFn, closer))
+ .create(segmentReferences)
.transform(
ref ->
- ref.getSegmentReference()
- .map(segment ->
- buildQueryRunnerForSegment(
- ref.getSegmentDescriptor(),
- segment,
- factory,
- toolChest,
- cpuTimeAccumulator,
- cacheKeyPrefix
- )
- ).orElse(
- new ReportTimelineMissingSegmentQueryRunner<>(ref.getSegmentDescriptor())
- )
+ closer.register(ref)
Review Comment:
I think this doesn't register `ref` with `closer` until the query runner for
`ref` is actually created. This might not happen immediately; it depends on
what `QueryRunnerFactory#mergeRunners` does. Also, if runner creation for some
`ref` throws an error for some reason, future ones won't get registered at all.
Or runner creation may be intentionally skipped, which I think is possible for
`ScanQueryRunnerFactory` with `limit` (IIRC, it will skip creating runners
after reaching the `limit`).
Does any of this cause problems?
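If it does, one option is to register every reference with the closer eagerly, before any runner is built, so that a runner that is never created (lazy, skipped, or aborted by an error) still has its reference released. A toy sketch with stand-in types, not the PR's classes:

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class EagerRegistrationSketch
{
  static final List<String> closeOrder = new ArrayList<>();

  // hypothetical stand-in for SegmentReference
  record Ref(String name) implements Closeable
  {
    @Override
    public void close()
    {
      closeOrder.add(name);
    }
  }

  public static void main(String[] args) throws IOException
  {
    final List<Ref> refs = List.of(new Ref("a"), new Ref("b"), new Ref("c"));
    final List<Closeable> closer = new ArrayList<>();

    // register every reference up front, before building any runners, so the
    // refs are released even if runner creation is lazy, throws, or is skipped
    refs.forEach(closer::add);

    // ... runners would be built lazily here; suppose only "a" ever gets one ...

    // close in LIFO order, mimicking Closer semantics
    for (int i = closer.size() - 1; i >= 0; i--) {
      closer.get(i).close();
    }
    if (!closeOrder.equals(List.of("c", "b", "a"))) {
      throw new AssertionError("expected all refs closed LIFO, got " + closeOrder);
    }
    System.out.println(closeOrder);
  }
}
```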
##########
processing/src/main/java/org/apache/druid/segment/loading/AcquireSegmentAction.java:
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.loading;
+
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.query.SegmentDescriptor;
+import org.apache.druid.segment.Segment;
+import org.apache.druid.segment.SegmentMapFunction;
+import org.apache.druid.segment.SegmentReference;
+import org.apache.druid.utils.CloseableUtils;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+
+/**
+ * This class represents an intent to acquire a reference to a {@link Segment} and then use it to do stuff, finally
+ * closing when done. When an {@link AcquireSegmentAction} is created, segment cache implementations will create a
+ * 'hold' to ensure it cannot be removed from the cache until this action has been closed. {@link #getSegmentFuture()}
+ * will return the segment if already cached, or attempt to download from deep storage to load into the cache if not.
+ * The {@link Segment} returned by the future places a separate hold on the cache until the segment itself is closed,
+ * and MUST be closed when the caller is finished doing segment things with it. The caller must also call
+ * {@link #close()} on this object to clean up the hold that exists while possibly loading the segment, and may do so
+ * as soon as the {@link Segment} is acquired (or can do so earlier to abort the load and release the hold).
+ */
+public class AcquireSegmentAction implements Closeable
+{
+ public static final Closeable NOOP_CLEANUP = () -> {};
+
+ public static List<SegmentReference> mapAllSegments(
+ List<AcquireSegmentAction> acquireSegmentActions,
+ SegmentMapFunction segmentMapFunction,
+ long timeoutAt
+ )
+ {
+ final Closer safetyNet = Closer.create();
+ Throwable failure = null;
+
+ // getting the future kicks off any background action, so materialize them all to a list to get things started
+ final List<Future<Optional<Segment>>> futures = new ArrayList<>(acquireSegmentActions.size());
+ for (AcquireSegmentAction acquireSegmentAction : acquireSegmentActions) {
+ safetyNet.register(acquireSegmentAction);
+ // if we haven't failed yet, keep collecing futures (we always want to collect the actions themselves though
+ // to close
+ if (failure == null) {
+ try {
+ futures.add(acquireSegmentAction.getSegmentFuture());
+ }
+ catch (Throwable t) {
+ failure = t;
+ }
+ } else {
+ futures.add(Futures.immediateFuture(Optional.empty()));
+ }
+ }
+
+
+ final List<SegmentReference> segmentReferences = new ArrayList<>(acquireSegmentActions.size());
+ for (int i = 0; i < acquireSegmentActions.size(); i++) {
+ // if anything fails, want to ignore it initially so we can collect all additional futures to properly clean up
+ // all references before rethrowing the error
+ try {
+ final AcquireSegmentAction action = acquireSegmentActions.get(i);
+ final Future<Optional<Segment>> future = futures.get(i);
+ final Optional<Segment> segment = future.get(timeoutAt - System.currentTimeMillis(), TimeUnit.MILLISECONDS)
+ .map(safetyNet::register);
+ segmentReferences.add(
+ new SegmentReference(
+ action.getDescriptor(),
+ segmentMapFunction.apply(segment),
+ action
+ )
+ );
+ }
+ catch (Throwable t) {
+ if (failure == null) {
+ failure = t;
+ } else {
+ failure.addSuppressed(t);
+ }
+ }
+ }
+ if (failure != null) {
+ throw CloseableUtils.closeAndWrapInCatch(failure, safetyNet);
+ }
+ return segmentReferences;
+ }
+
+ public static AcquireSegmentAction missingSegment(final SegmentDescriptor descriptor)
+ {
+ return new AcquireSegmentAction(descriptor, () -> Futures.immediateFuture(Optional.empty()), NOOP_CLEANUP);
+ }
+
+ private final SegmentDescriptor segmentDescriptor;
+ private final Supplier<ListenableFuture<Optional<Segment>>> segmentFutureSupplier;
+ private final Closeable loadCleanup;
+
+ public AcquireSegmentAction(
+ SegmentDescriptor segmentDescriptor,
+ Supplier<ListenableFuture<Optional<Segment>>> segmentFutureSupplier,
+ Closeable loadCleanup
+ )
+ {
+ this.segmentDescriptor = segmentDescriptor;
+ this.segmentFutureSupplier = segmentFutureSupplier;
+ this.loadCleanup = loadCleanup;
+ }
+
+ public SegmentDescriptor getDescriptor()
+ {
+ return segmentDescriptor;
+ }
+
+ public ListenableFuture<Optional<Segment>> getSegmentFuture()
+ {
+ return segmentFutureSupplier.get();
+ }
+
+ @Override
+ public void close() throws IOException
+ {
+ loadCleanup.close();
Review Comment:
Any concern about accidental double-close? Could put in something defensive
here, if so.
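For instance, an `AtomicBoolean` guard makes a double-close (accidental or concurrent) a harmless no-op. A sketch with hypothetical names:

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class IdempotentCloseSketch
{
  /** Wraps a cleanup action so it runs at most once, even under concurrent double-close. */
  static class GuardedCleanup implements Closeable
  {
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final Closeable delegate;

    GuardedCleanup(Closeable delegate)
    {
      this.delegate = delegate;
    }

    @Override
    public void close() throws IOException
    {
      // compareAndSet succeeds for exactly one caller; everyone else no-ops
      if (closed.compareAndSet(false, true)) {
        delegate.close();
      }
    }
  }

  public static void main(String[] args) throws IOException
  {
    final AtomicInteger calls = new AtomicInteger();
    final GuardedCleanup cleanup = new GuardedCleanup(calls::incrementAndGet);
    cleanup.close();
    cleanup.close();  // accidental second close is a silent no-op
    if (calls.get() != 1) {
      throw new AssertionError("delegate ran " + calls.get() + " times");
    }
    System.out.println("closed once");
  }
}
```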
##########
processing/src/main/java/org/apache/druid/segment/loading/AcquireSegmentAction.java:
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.loading;
+
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.query.SegmentDescriptor;
+import org.apache.druid.segment.Segment;
+import org.apache.druid.segment.SegmentMapFunction;
+import org.apache.druid.segment.SegmentReference;
+import org.apache.druid.utils.CloseableUtils;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+
+/**
+ * This class represents an intent to acquire a reference to a {@link Segment}
and then use it to do stuff, finally
+ * closing when done. When an {@link AcquireSegmentAction} is created, segment
cache implementations will create a
+ * 'hold' to ensure it cannot be removed from the cache until this action has
been closed. {@link #getSegmentFuture()}
+ * will return the segment if already cached, or attempt to download from deep
storage to load into the cache if not.
+ * The {@link Segment} returned by the future places a separate hold on the
cache until the segment itself is closed,
+ * and MUST be closed when the caller is finished doing segment things with
it. The caller must also call
+ * {@link #close()} on this object to clean up the hold that exists while
possibly loading the segment, and may do so
+ * as soon as the {@link Segment} is acquired (or can do so earlier to abort
the load and release the hold).
+ */
+public class AcquireSegmentAction implements Closeable
+{
+ public static final Closeable NOOP_CLEANUP = () -> {};
+
+ public static List<SegmentReference> mapAllSegments(
+ List<AcquireSegmentAction> acquireSegmentActions,
+ SegmentMapFunction segmentMapFunction,
+ long timeoutAt
+ )
+ {
+ final Closer safetyNet = Closer.create();
+ Throwable failure = null;
+
+ // getting the future kicks off any background action, so materialize them
all to a list to get things started
+ final List<Future<Optional<Segment>>> futures = new
ArrayList<>(acquireSegmentActions.size());
+ for (AcquireSegmentAction acquireSegmentAction : acquireSegmentActions) {
+ safetyNet.register(acquireSegmentAction);
+ // if we haven't failed yet, keep collecting futures (we always want to
collect the actions themselves, though,
+ // so they can be closed)
+ if (failure == null) {
+ try {
+ futures.add(acquireSegmentAction.getSegmentFuture());
+ }
+ catch (Throwable t) {
+ failure = t;
+ }
+ } else {
+ futures.add(Futures.immediateFuture(Optional.empty()));
+ }
+ }
+
+
+ final List<SegmentReference> segmentReferences = new
ArrayList<>(acquireSegmentActions.size());
+ for (int i = 0; i < acquireSegmentActions.size(); i++) {
+ // if anything fails, we want to ignore it initially so we can collect all
additional futures to properly clean up
+ // all references before rethrowing the error
+ try {
+ final AcquireSegmentAction action = acquireSegmentActions.get(i);
+ final Future<Optional<Segment>> future = futures.get(i);
+ final Optional<Segment> segment = future.get(timeoutAt -
System.currentTimeMillis(), TimeUnit.MILLISECONDS)
+ .map(safetyNet::register);
+ segmentReferences.add(
+ new SegmentReference(
+ action.getDescriptor(),
+ segmentMapFunction.apply(segment),
+ action
+ )
+ );
+ }
+ catch (Throwable t) {
+ if (failure == null) {
+ failure = t;
+ } else {
+ failure.addSuppressed(t);
+ }
+ }
+ }
+ if (failure != null) {
+ throw CloseableUtils.closeAndWrapInCatch(failure, safetyNet);
+ }
+ return segmentReferences;
+ }
+
+ public static AcquireSegmentAction missingSegment(final SegmentDescriptor
descriptor)
+ {
+ return new AcquireSegmentAction(descriptor, () ->
Futures.immediateFuture(Optional.empty()), NOOP_CLEANUP);
+ }
+
+ private final SegmentDescriptor segmentDescriptor;
+ private final Supplier<ListenableFuture<Optional<Segment>>>
segmentFutureSupplier;
+ private final Closeable loadCleanup;
+
+ public AcquireSegmentAction(
+ SegmentDescriptor segmentDescriptor,
+ Supplier<ListenableFuture<Optional<Segment>>> segmentFutureSupplier,
+ Closeable loadCleanup
+ )
+ {
+ this.segmentDescriptor = segmentDescriptor;
+ this.segmentFutureSupplier = segmentFutureSupplier;
+ this.loadCleanup = loadCleanup;
+ }
+
+ public SegmentDescriptor getDescriptor()
+ {
+ return segmentDescriptor;
+ }
+
+ public ListenableFuture<Optional<Segment>> getSegmentFuture()
+ {
+ return segmentFutureSupplier.get();
Review Comment:
Seems odd that the supplier isn't memoized, since it means any time someone
calls `getSegmentFuture()` on an `AcquireSegmentAction`, this thing will get
resubmitted. For a method named `get` I would expect repeated calls to return
the same future.
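A memoizing wrapper would be one way to get that behavior. Plain-JDK sketch below (names illustrative; Guava's `Suppliers.memoize` would do the same for the real `ListenableFuture<Optional<Segment>>` supplier):

```java
import java.util.function.Supplier;

// Sketch: cache the first result of the delegate so repeated get() calls
// return the same object and the underlying load is kicked off at most once.
class MemoizingSupplier<T> implements Supplier<T>
{
  private final Supplier<T> delegate;
  private T value;

  MemoizingSupplier(Supplier<T> delegate)
  {
    this.delegate = delegate;
  }

  @Override
  public synchronized T get()
  {
    if (value == null) {
      value = delegate.get();  // runs only on the first call
    }
    return value;
  }
}
```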
##########
server/src/main/java/org/apache/druid/server/ServerManager.java:
##########
@@ -233,23 +287,37 @@ protected <T> FunctionalIterable<QueryRunner<T>>
getQueryRunnersForSegments(
final Closer closer
)
{
+ final List<SegmentReference> segmentReferences;
+ // this is kind of gross, but otherwise we're going to be loading weak
assignments more or less as soon as they
Review Comment:
Will need to be documented.
##########
processing/src/main/java/org/apache/druid/segment/loading/AcquireSegmentAction.java:
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.loading;
+
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.query.SegmentDescriptor;
+import org.apache.druid.segment.Segment;
+import org.apache.druid.segment.SegmentMapFunction;
+import org.apache.druid.segment.SegmentReference;
+import org.apache.druid.utils.CloseableUtils;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+
+/**
+ * This class represents an intent to acquire a reference to a {@link Segment}
and then use it to do stuff, finally
+ * closing when done. When an {@link AcquireSegmentAction} is created, segment
cache implementations will create a
+ * 'hold' to ensure it cannot be removed from the cache until this action has
been closed. {@link #getSegmentFuture()}
+ * will return the segment if already cached, or attempt to download from deep
storage to load into the cache if not.
+ * The {@link Segment} returned by the future places a separate hold on the
cache until the segment itself is closed,
+ * and MUST be closed when the caller is finished doing segment things with
it. The caller must also call
+ * {@link #close()} on this object to clean up the hold that exists while
possibly loading the segment, and may do so
+ * as soon as the {@link Segment} is acquired (or can do so earlier to abort
the load and release the hold).
+ */
+public class AcquireSegmentAction implements Closeable
+{
+ public static final Closeable NOOP_CLEANUP = () -> {};
+
+ public static List<SegmentReference> mapAllSegments(
+ List<AcquireSegmentAction> acquireSegmentActions,
+ SegmentMapFunction segmentMapFunction,
+ long timeoutAt
+ )
+ {
+ final Closer safetyNet = Closer.create();
+ Throwable failure = null;
+
+ // getting the future kicks off any background action, so materialize them
all to a list to get things started
+ final List<Future<Optional<Segment>>> futures = new
ArrayList<>(acquireSegmentActions.size());
+ for (AcquireSegmentAction acquireSegmentAction : acquireSegmentActions) {
+ safetyNet.register(acquireSegmentAction);
+ // if we haven't failed yet, keep collecting futures (we always want to
collect the actions themselves, though,
+ // so they can be closed)
+ if (failure == null) {
+ try {
+ futures.add(acquireSegmentAction.getSegmentFuture());
+ }
+ catch (Throwable t) {
+ failure = t;
+ }
+ } else {
+ futures.add(Futures.immediateFuture(Optional.empty()));
+ }
+ }
+
+
+ final List<SegmentReference> segmentReferences = new
ArrayList<>(acquireSegmentActions.size());
+ for (int i = 0; i < acquireSegmentActions.size(); i++) {
+ // if anything fails, we want to ignore it initially so we can collect all
additional futures to properly clean up
+ // all references before rethrowing the error
+ try {
+ final AcquireSegmentAction action = acquireSegmentActions.get(i);
+ final Future<Optional<Segment>> future = futures.get(i);
+ final Optional<Segment> segment = future.get(timeoutAt -
System.currentTimeMillis(), TimeUnit.MILLISECONDS)
+ .map(safetyNet::register);
+ segmentReferences.add(
+ new SegmentReference(
+ action.getDescriptor(),
+ segmentMapFunction.apply(segment),
+ action
+ )
+ );
+ }
+ catch (Throwable t) {
+ if (failure == null) {
+ failure = t;
+ } else {
+ failure.addSuppressed(t);
Review Comment:
This could end up being quite a lot of exceptions if 100k segments fail to
fetch. Limit the number?
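One way to bound it — sketch only, and the cap of 32 is an arbitrary illustrative choice, not something from the PR:

```java
// Sketch of bounding the suppressed-exception chain: keep the first failure
// as primary and attach at most MAX_SUPPRESSED of the rest; anything beyond
// the cap is dropped (a real implementation might log a dropped count).
final class BoundedFailure
{
  static final int MAX_SUPPRESSED = 32;  // illustrative cap

  private Throwable failure;
  private int suppressed;

  void add(Throwable t)
  {
    if (failure == null) {
      failure = t;
    } else if (suppressed < MAX_SUPPRESSED) {
      failure.addSuppressed(t);
      suppressed++;
    }
  }

  Throwable get()
  {
    return failure;
  }
}
```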
##########
processing/src/main/java/org/apache/druid/segment/SegmentReference.java:
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment;
+
+import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.query.SegmentDescriptor;
+import org.apache.druid.segment.loading.AcquireSegmentAction;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Optional;
+
+/**
+ * Wrapper for a {@link SegmentDescriptor} and {@link Optional<Segment>}, the
latter being created by a
+ * {@link SegmentMapFunction} being applied to a {@link
ReferenceCountedSegmentProvider}. Closing this object closes
+ * both the {@link #segmentReference} and any closeables attached from the
process of creating this object, such as
+ * from {@link AcquireSegmentAction}
+ */
+public class SegmentReference implements Closeable
+{
+ public static SegmentReference missing(SegmentDescriptor segmentDescriptor)
+ {
+ return new SegmentReference(segmentDescriptor, Optional.empty(),
AcquireSegmentAction.NOOP_CLEANUP);
+ }
+
+ private final SegmentDescriptor segmentDescriptor;
+ private final Optional<Segment> segmentReference;
+ private final Closer closer = Closer.create();
+
+ public SegmentReference(
+ SegmentDescriptor segmentDescriptor,
+ Optional<Segment> segmentReference,
+ Closeable cleanupHold
+ )
+ {
+ this.segmentDescriptor = segmentDescriptor;
+ this.segmentReference = segmentReference.map(closer::register);
+ closer.register(cleanupHold);
Review Comment:
This will close `cleanupHold` first, then `segmentReference`. Is that the
right order? Seems possibly backwards.
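For reference, `Closer` closes registered resources in LIFO order, which is why the hold (registered second) goes first here. A minimal stand-in demonstrates the ordering:

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Deque;

// Minimal stand-in for Guava's Closer, just to show its LIFO close order:
// the last Closeable registered is the first one closed. Registering the
// segment first and the hold second means the hold is released before the
// segment is closed.
class MiniCloser implements Closeable
{
  private final Deque<Closeable> stack = new ArrayDeque<>();

  <C extends Closeable> C register(C closeable)
  {
    stack.push(closeable);
    return closeable;
  }

  @Override
  public void close() throws IOException
  {
    while (!stack.isEmpty()) {
      stack.pop().close();  // reverse registration order
    }
  }
}
```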
##########
server/src/main/java/org/apache/druid/segment/loading/SegmentCacheManager.java:
##########
@@ -39,116 +43,96 @@ public interface SegmentCacheManager
boolean canHandleSegments();
/**
- * Return a list of cached segments from local disk, if any. This should be
called only
- * when {@link #canHandleSegments()} is true.
+ * Return a list of cached segments from local disk, if any. This should be
called only when
+ * {@link #canHandleSegments()} is true.
*/
List<DataSegment> getCachedSegments() throws IOException;
/**
- * Store a segment info file for the supplied segment on disk. This
operation is idempotent when called
- * multiple times for a given segment.
+ * Store a segment info file for the supplied segment on disk. This
operation is idempotent when called multiple
+ * times for a given segment.
*/
void storeInfoFile(DataSegment segment) throws IOException;
/**
- * Remove the segment info file for the supplied segment from disk. If the
file cannot be
- * deleted, do nothing.
+ * Remove the segment info file for the supplied segment from disk. If the
file cannot be deleted, do nothing.
*
- * @see SegmentCacheManager#cleanup(DataSegment)
+ * @see SegmentCacheManager#drop(DataSegment)
*/
void removeInfoFile(DataSegment segment);
+
/**
- * Returns a {@link ReferenceCountedSegmentProvider} that will be added by
the {@link org.apache.druid.server.SegmentManager}
- * to the {@link org.apache.druid.timeline.VersionedIntervalTimeline}. This
method can be called multiple times
- * by the {@link org.apache.druid.server.SegmentManager} and implementation
can either return same {@link ReferenceCountedSegmentProvider}
- * or a different {@link ReferenceCountedSegmentProvider}. Caller should not
assume any particular behavior.
- * <p>
- * Returning a {@code ReferenceCountingSegment} will let custom
implementations keep track of reference count for
- * segments that the custom implementations are creating. That way, custom
implementations can know when the segment
- * is in use or not.
- * </p>
- * @param segment Segment to get on each download after service bootstrap
- * @throws SegmentLoadingException If there is an error in loading the
segment
- * @see SegmentCacheManager#getBootstrapSegment(DataSegment,
SegmentLazyLoadFailCallback)
+ * Given a {@link DataSegment}, which contains the instructions for where
and how to fetch a {@link Segment} from
+ * deep storage, this method tries to load and subsequently serve it to
callers via
+ * {@link #acquireSegment(DataSegment)} or {@link
#acquireSegment(DataSegment, SegmentDescriptor)}. If the segment
+ * cannot be loaded either due to error or insufficient storage space, this
method throws a
+ * {@link SegmentLoadingException}.
+ *
+ * @param segment Segment to get on each download (after service bootstrap)
+ * @throws SegmentLoadingException If there is an error in loading the
segment or insufficient storage space
+ * @see SegmentCacheManager#bootstrap(DataSegment,
SegmentLazyLoadFailCallback)
*/
- ReferenceCountedSegmentProvider getSegment(DataSegment segment) throws
SegmentLoadingException;
+ void load(DataSegment segment) throws SegmentLoadingException;
/**
- * Similar to {@link #getSegment(DataSegment)}, this method returns a {@link
ReferenceCountedSegmentProvider} that will be
- * added by the {@link org.apache.druid.server.SegmentManager} to the {@link
org.apache.druid.timeline.VersionedIntervalTimeline}
- * during startup on data nodes.
- * @param segment Segment to retrieve during service bootstrap
+ * Similar to {@link #load(DataSegment)}, this method loads segments during
startup on data nodes. Implementations of
+ * this method may be configured to use a larger-than-normal work pool that
only exists during startup and is shut down
+ * afterwards by calling {@link #shutdownBootstrap()}.
+ *
+ * @param segment Segment to retrieve during service bootstrap
* @param loadFailed Callback to execute when segment lazy load failed. This
applies only when
* {@code lazyLoadOnStart} is enabled
- * @throws SegmentLoadingException - If there is an error in loading the
segment
- * @see SegmentCacheManager#getSegment(DataSegment)
+ * @throws SegmentLoadingException - If there is an error in loading the
segment or insufficient storage space
+ * @see SegmentCacheManager#load(DataSegment)
+ * @see SegmentCacheManager#shutdownBootstrap()
*/
- ReferenceCountedSegmentProvider getBootstrapSegment(
- DataSegment segment,
- SegmentLazyLoadFailCallback loadFailed
- ) throws SegmentLoadingException;
+ void bootstrap(DataSegment segment, SegmentLazyLoadFailCallback loadFailed)
throws SegmentLoadingException;
/**
- * This method fetches the files for the given segment if the segment is not
downloaded already. It
- * is not required to {@link #reserve(DataSegment)} before calling this
method. If caller has not reserved
- * the space explicitly via {@link #reserve(DataSegment)}, the
implementation should reserve space on caller's
- * behalf.
- * If the space has been explicitly reserved already
- * - implementation should use only the reserved space to store segment
files.
- * - implementation should not release the location in case of download
erros and leave it to the caller.
- * @throws SegmentLoadingException if there is an error in downloading files
- */
- File getSegmentFiles(DataSegment segment) throws SegmentLoadingException;
-
- /**
- * Asynchronously load the supplied segment into the page cache on each
download after the service finishes bootstrapping.
- * Equivalent to `cat segment_files > /dev/null` to force loading the
segment index files into page cache so that
- * later when the segment is queried, they are already in page cache and
only a minor page fault needs to be triggered
- * instead of a major page fault to make the query latency more consistent.
+ * Clean up the segment file cache space used by the segment, releasing the
{@link StorageLocation} reservation
*
- * @see SegmentCacheManager#loadSegmentIntoPageCacheOnBootstrap(DataSegment)
+ * @see SegmentCacheManager#removeInfoFile(DataSegment)
*/
- void loadSegmentIntoPageCache(DataSegment segment);
+ void drop(DataSegment segment);
/**
- * Similar to {@link #loadSegmentIntoPageCache(DataSegment)}, but
asynchronously load the supplied segment into the
- * page cache during service bootstrap.
- *
- * @see SegmentCacheManager#loadSegmentIntoPageCache(DataSegment)
+ * Applies a {@link SegmentMapFunction} to a {@link Segment} if it is
available in the cache. If not present in any
+ * storage location, this method will not attempt to download the {@link
DataSegment} from deep storage. The
+ * {@link Segment} returned by this method is considered an open reference,
cache implementations must not allow it
+ * to be dropped until it has been closed. As such, the returned {@link
Segment} must be closed when the caller is
+ * finished doing segment things.
*/
- void loadSegmentIntoPageCacheOnBootstrap(DataSegment segment);
+ Optional<Segment> acquireSegment(DataSegment dataSegment);
Review Comment:
The name of these two methods being the same is confusing, esp. since one
may fetch and the other won't. How about calling them `acquireSegmentIfCached`
and `acquireSegment`?
Btw, looking at the implementation of `SegmentLocalCacheManager`, it seems
like it doesn't quite do what this says. It looks like it returns segments with
static references only. It doesn't look like it returns weakly held segments,
even if they are currently in the cache. Is that intentional? It doesn't match
the javadoc here.
##########
server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java:
##########
@@ -209,382 +278,173 @@ public void removeInfoFile(DataSegment segment)
}
@Override
- public ReferenceCountedSegmentProvider getSegment(final DataSegment
dataSegment) throws SegmentLoadingException
- {
- final File segmentFiles = getSegmentFiles(dataSegment);
- final SegmentizerFactory factory = getSegmentFactory(segmentFiles);
-
- final Segment segment = factory.factorize(dataSegment, segmentFiles,
false, SegmentLazyLoadFailCallback.NOOP);
- return ReferenceCountedSegmentProvider.wrapSegment(segment,
dataSegment.getShardSpec());
- }
-
- @Override
- public ReferenceCountedSegmentProvider getBootstrapSegment(
- final DataSegment dataSegment,
- final SegmentLazyLoadFailCallback loadFailed
- ) throws SegmentLoadingException
- {
- final File segmentFiles = getSegmentFiles(dataSegment);
- final SegmentizerFactory factory = getSegmentFactory(segmentFiles);
-
- final Segment segment = factory.factorize(dataSegment, segmentFiles,
config.isLazyLoadOnStart(), loadFailed);
- return ReferenceCountedSegmentProvider.wrapSegment(segment,
dataSegment.getShardSpec());
- }
-
- private SegmentizerFactory getSegmentFactory(final File segmentFiles) throws
SegmentLoadingException
- {
- final File factoryJson = new File(segmentFiles, "factory.json");
- final SegmentizerFactory factory;
-
- if (factoryJson.exists()) {
- try {
- factory = jsonMapper.readValue(factoryJson, SegmentizerFactory.class);
- }
- catch (IOException e) {
- throw new SegmentLoadingException(e, "Failed to get segment facotry
for %s", e.getMessage());
- }
- } else {
- factory = new MMappedQueryableSegmentizerFactory(indexIO);
- }
- return factory;
- }
-
- /**
- * Returns the effective segment info directory based on the configuration
settings.
- * The directory is selected based on the following configurations injected
into this class:
- * <ul>
- * <li>{@link SegmentLoaderConfig#getInfoDir()} - If {@code infoDir} is
set, it is used as the info directory.</li>
- * <li>{@link SegmentLoaderConfig#getLocations()} - If the info directory
is not set, the first location from this list is used.</li>
- * <li>List of {@link StorageLocation}s injected - If both the info
directory and locations list are not set, the
- * first storage location is used.</li>
- * </ul>
- *
- * @throws DruidException if none of the configurations are set, and the
info directory cannot be determined.
- */
- private File getEffectiveInfoDir()
- {
- final File infoDir;
- if (config.getInfoDir() != null) {
- infoDir = config.getInfoDir();
- } else if (!config.getLocations().isEmpty()) {
- infoDir = new File(config.getLocations().get(0).getPath(), "info_dir");
- } else if (!locations.isEmpty()) {
- infoDir = new File(locations.get(0).getPath(), "info_dir");
- } else {
- throw DruidException.forPersona(DruidException.Persona.OPERATOR)
- .ofCategory(DruidException.Category.NOT_FOUND)
- .build("Could not determine infoDir. Make sure
'druid.segmentCache.infoDir' "
- + "or 'druid.segmentCache.locations' is set correctly.");
- }
- return infoDir;
- }
-
- private static String getSegmentDir(DataSegment segment)
- {
- return DataSegmentPusher.getDefaultStorageDir(segment, false);
- }
-
- /**
- * Checks whether a segment is already cached. It can return false even if
{@link #reserve(DataSegment)}
- * has been successful for a segment but is not downloaded yet.
- */
- boolean isSegmentCached(final DataSegment segment)
- {
- return findStoragePathIfCached(segment) != null;
- }
-
- /**
- * This method will try to find if the segment is already downloaded on any
location. If so, the segment path
- * is returned. Along with that, location state is also updated with the
segment location. Refer to
- * {@link StorageLocation#maybeReserve(String, DataSegment)} for more
details.
- * If the segment files are damaged in any location, they are removed from
the location.
- * @param segment - Segment to check
- * @return - Path corresponding to segment directory if found, null
otherwise.
- */
- @Nullable
- private File findStoragePathIfCached(final DataSegment segment)
+ public Optional<Segment> acquireSegment(final DataSegment dataSegment)
{
+ final SegmentCacheEntryIdentifier cacheEntryIdentifier = new
SegmentCacheEntryIdentifier(dataSegment.getId());
for (StorageLocation location : locations) {
- String storageDir = getSegmentDir(segment);
- File localStorageDir = location.segmentDirectoryAsFile(storageDir);
- if (localStorageDir.exists()) {
- if (checkSegmentFilesIntact(localStorageDir)) {
- log.warn(
- "[%s] may be damaged. Delete all the segment files and pull from
DeepStorage again.",
- localStorageDir.getAbsolutePath()
- );
- cleanupCacheFiles(location.getPath(), localStorageDir);
- location.removeSegmentDir(localStorageDir, segment);
- break;
- } else {
- // Before returning, we also reserve the space. Refer to the
StorageLocation#maybeReserve documentation for details.
- location.maybeReserve(storageDir, segment);
- return localStorageDir;
- }
+ if (location.isReserved(cacheEntryIdentifier)) {
+ final SegmentCacheEntry cacheEntry =
location.getCacheEntry(cacheEntryIdentifier);
+ return cacheEntry.referenceProvider.acquireReference();
}
}
- return null;
+ return Optional.empty();
}
- /**
- * check data intact.
- * @param dir segments cache dir
- * @return true means segment files may be damaged.
- */
- private boolean checkSegmentFilesIntact(File dir)
- {
- return checkSegmentFilesIntactWithStartMarker(dir);
- }
-
- /**
- * If there is 'downloadStartMarker' existed in localStorageDir, the
segments files might be damaged.
- * Because each time, Druid will delete the 'downloadStartMarker' file after
pulling and unzip the segments from DeepStorage.
- * downloadStartMarker existed here may mean something error during download
segments and the segment files may be damaged.
- */
- private boolean checkSegmentFilesIntactWithStartMarker(File localStorageDir)
- {
- final File downloadStartMarker = new File(localStorageDir.getPath(),
DOWNLOAD_START_MARKER_FILE_NAME);
- return downloadStartMarker.exists();
- }
-
- /**
- * Make sure segments files in loc is intact, otherwise function like
loadSegments will failed because of segment files is damaged.
- * @param segment
- * @return
- * @throws SegmentLoadingException
- */
@Override
- public File getSegmentFiles(DataSegment segment) throws
SegmentLoadingException
+ public AcquireSegmentAction acquireSegment(
+ final DataSegment dataSegment,
+ final SegmentDescriptor descriptor
+ ) throws SegmentLoadingException
{
- final ReferenceCountingLock lock = createOrGetLock(segment);
- synchronized (lock) {
+ final SegmentCacheEntryIdentifier identifier = new
SegmentCacheEntryIdentifier(dataSegment.getId());
+ for (StorageLocation location : locations) {
+ final StorageLocation.ReservationHold<SegmentCacheEntry> hold =
+ location.addWeakReservationHoldIfExists(identifier);
try {
- File segmentDir = findStoragePathIfCached(segment);
- if (segmentDir != null) {
- return segmentDir;
+ if (hold != null) {
+ if (hold.getEntry().isMounted()) {
+ return new AcquireSegmentAction(
+ descriptor,
+ () ->
Futures.immediateFuture(hold.getEntry().referenceProvider.acquireReference()),
+ hold
+ );
+ } else {
+ // go ahead and mount it; someone else is probably trying this as
well, but mount is done under a segment
+ // lock and is a no-op if already mounted, and if we win we need
+ // it to be mounted
+ return new AcquireSegmentAction(
+ descriptor,
+ makeOnDemandLoadSupplier(dataSegment, hold.getEntry(),
location),
+ hold
+ );
+ }
}
-
- return loadSegmentWithRetry(segment);
}
- finally {
- unlock(segment, lock);
+ catch (Throwable t) {
+ throw CloseableUtils.closeAndWrapInCatch(t, hold);
}
}
- }
-
- /**
- * If we have already reserved a location before, probably via {@link
#reserve(DataSegment)}, then only that location
- * should be tried. Otherwise, we would fetch locations using {@link
StorageLocationSelectorStrategy} and try all
- * of them one by one till there is success.
- * Location may fail because of IO failure, most likely in two cases:<p>
- * 1. druid don't have the write access to this location, most likely the
administrator doesn't config it correctly<p>
- * 2. disk failure, druid can't read/write to this disk anymore
- * <p>
- * Locations are fetched using {@link StorageLocationSelectorStrategy}.
- */
- private File loadSegmentWithRetry(DataSegment segment) throws
SegmentLoadingException
- {
- String segmentDir = getSegmentDir(segment);
-
- // Try the already reserved location. If location has been reserved
outside, then we do not release the location
- // here and simply delete any downloaded files. That is, we revert
anything we do in this function and nothing else.
- for (StorageLocation loc : locations) {
- if (loc.isReserved(segmentDir)) {
- File storageDir = loc.segmentDirectoryAsFile(segmentDir);
- boolean success = loadInLocationWithStartMarkerQuietly(loc, segment,
storageDir, false);
- if (!success) {
- throw new SegmentLoadingException(
- "Failed to load segment[%s] in reserved location[%s]",
segment.getId(), loc.getPath().getAbsolutePath()
+ final Iterator<StorageLocation> iterator = strategy.getLocations();
Review Comment:
Is it possible two callers start loading the same segment in two different
locations? Presumably `strategy.getLocations()` might give different locations
to different callers. And I don't see locking here.
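If that race is real, a per-identifier lock is one common pattern — sketch only, all names hypothetical:

```java
import java.util.concurrent.ConcurrentHashMap;

// Sketch of a per-identifier lock: computeIfAbsent hands every caller the
// same lock object for a given identifier, so two callers racing to load the
// same segment serialize instead of mounting it in two different locations.
// (A real implementation would also need to reap entries to bound growth.)
class PerIdentifierLocks<K>
{
  private final ConcurrentHashMap<K, Object> locks = new ConcurrentHashMap<>();

  Object lockFor(K identifier)
  {
    return locks.computeIfAbsent(identifier, k -> new Object());
  }
}
```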
##########
server/src/main/java/org/apache/druid/segment/loading/StorageLocation.java:
##########
@@ -47,17 +85,30 @@ public class StorageLocation
private final long maxSizeBytes;
private final long freeSpaceToKeep;
+ private final ConcurrentHashMap<CacheEntryIdentifier, CacheEntry>
staticCacheEntries = new ConcurrentHashMap<>();
+ private final ConcurrentHashMap<CacheEntryIdentifier, WeakCacheEntry>
weakCacheEntries = new ConcurrentHashMap<>();
+
+ @GuardedBy("lock")
+ private WeakCacheEntry head;
+ @GuardedBy("lock")
+ private WeakCacheEntry tail;
+ @GuardedBy("lock")
+ private WeakCacheEntry hand;
+
/**
- * Set of files stored under the {@link #path}.
+ * Current total size of files in bytes, including weak entries.
*/
- @GuardedBy("this")
- private final Set<File> files = new HashSet<>();
+ private final AtomicLong currSizeBytes = new AtomicLong(0);
+
+ @GuardedBy("lock")
+ private long currWeakSizeBytes = 0;
Review Comment:
appears unused? is this for later?
##########
server/src/main/java/org/apache/druid/segment/loading/SegmentCacheEntryIdentifier.java:
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.loading;
+
+import org.apache.druid.timeline.SegmentId;
+
+import java.util.Objects;
+
+/**
+ * Use a {@link SegmentId} as a {@link CacheEntryIdentifier}
+ */
+public final class SegmentCacheEntryIdentifier implements CacheEntryIdentifier
Review Comment:
Wondering how partial fetches could work in the future. Will the
`CacheEntryIdentifier` become `SegmentId` + projection or `SegmentId` +
projection + column?
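e.g., a composite key along these lines — purely speculative sketch, with `String` standing in for the real `SegmentId` and projection types:

```java
import java.util.Objects;

// Speculative sketch of a composite cache-entry identifier for partial
// fetches: segment id plus an optional projection (null means the whole
// segment). Only equals/hashCode matter for use as a cache key.
final class CompositeCacheEntryIdentifier
{
  private final String segmentId;
  private final String projection;

  CompositeCacheEntryIdentifier(String segmentId, String projection)
  {
    this.segmentId = segmentId;
    this.projection = projection;
  }

  @Override
  public boolean equals(Object o)
  {
    if (this == o) {
      return true;
    }
    if (!(o instanceof CompositeCacheEntryIdentifier)) {
      return false;
    }
    final CompositeCacheEntryIdentifier that = (CompositeCacheEntryIdentifier) o;
    return segmentId.equals(that.segmentId) && Objects.equals(projection, that.projection);
  }

  @Override
  public int hashCode()
  {
    return Objects.hash(segmentId, projection);
  }
}
```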
##########
processing/src/main/java/org/apache/druid/segment/loading/AcquireSegmentAction.java:
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.loading;
+
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.query.SegmentDescriptor;
+import org.apache.druid.segment.Segment;
+import org.apache.druid.segment.SegmentMapFunction;
+import org.apache.druid.segment.SegmentReference;
+import org.apache.druid.utils.CloseableUtils;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+
+/**
+ * This class represents an intent to acquire a reference to a {@link Segment} and then use it to do stuff, finally
+ * closing when done. When an {@link AcquireSegmentAction} is created, segment cache implementations will create a
+ * 'hold' to ensure it cannot be removed from the cache until this action has been closed. {@link #getSegmentFuture()}
+ * will return the segment if already cached, or attempt to download from deep storage to load into the cache if not.
+ * The {@link Segment} returned by the future places a separate hold on the cache until the segment itself is closed,
+ * and MUST be closed when the caller is finished doing segment things with it. The caller must also call
+ * {@link #close()} on this object to clean up the hold that exists while possibly loading the segment, and may do so
+ * as soon as the {@link Segment} is acquired (or can do so earlier to abort the load and release the hold).
+ */
+public class AcquireSegmentAction implements Closeable
+{
+ public static final Closeable NOOP_CLEANUP = () -> {};
+
+ public static List<SegmentReference> mapAllSegments(
+ List<AcquireSegmentAction> acquireSegmentActions,
+ SegmentMapFunction segmentMapFunction,
+ long timeoutAt
+ )
+ {
+ final Closer safetyNet = Closer.create();
+ Throwable failure = null;
+
+ // getting the future kicks off any background action, so materialize them all to a list to get things started
+ final List<Future<Optional<Segment>>> futures = new ArrayList<>(acquireSegmentActions.size());
+ for (AcquireSegmentAction acquireSegmentAction : acquireSegmentActions) {
+ safetyNet.register(acquireSegmentAction);
+ // if we haven't failed yet, keep collecting futures (we always want to collect the actions themselves, though,
+ // so they can be closed)
+ if (failure == null) {
+ try {
+ futures.add(acquireSegmentAction.getSegmentFuture());
+ }
+ catch (Throwable t) {
+ failure = t;
+ }
+ } else {
+ futures.add(Futures.immediateFuture(Optional.empty()));
+ }
+ }
+
+
+ final List<SegmentReference> segmentReferences = new ArrayList<>(acquireSegmentActions.size());
+ for (int i = 0; i < acquireSegmentActions.size(); i++) {
+ // if anything fails, want to ignore it initially so we can collect all additional futures to properly clean up
+ // all references before rethrowing the error
+ try {
+ final AcquireSegmentAction action = acquireSegmentActions.get(i);
+ final Future<Optional<Segment>> future = futures.get(i);
+ final Optional<Segment> segment = future.get(timeoutAt - System.currentTimeMillis(), TimeUnit.MILLISECONDS)
+     .map(safetyNet::register);
+ segmentReferences.add(
+ new SegmentReference(
+ action.getDescriptor(),
+ segmentMapFunction.apply(segment),
+ action
+ )
+ );
+ }
+ catch (Throwable t) {
+ if (failure == null) {
+ failure = t;
+ } else {
+ failure.addSuppressed(t);
+ }
+ }
+ }
+ if (failure != null) {
+ throw CloseableUtils.closeAndWrapInCatch(failure, safetyNet);
+ }
+ return segmentReferences;
+ }
+
+ public static AcquireSegmentAction missingSegment(final SegmentDescriptor descriptor)
+ {
+ return new AcquireSegmentAction(descriptor, () -> Futures.immediateFuture(Optional.empty()), NOOP_CLEANUP);
+ }
+
+ private final SegmentDescriptor segmentDescriptor;
+ private final Supplier<ListenableFuture<Optional<Segment>>> segmentFutureSupplier;
+ private final Closeable loadCleanup;
+
+ public AcquireSegmentAction(
+ SegmentDescriptor segmentDescriptor,
+ Supplier<ListenableFuture<Optional<Segment>>> segmentFutureSupplier,
+ Closeable loadCleanup
+ )
+ {
+ this.segmentDescriptor = segmentDescriptor;
+ this.segmentFutureSupplier = segmentFutureSupplier;
+ this.loadCleanup = loadCleanup;
+ }
+
+ public SegmentDescriptor getDescriptor()
+ {
+ return segmentDescriptor;
+ }
+
+ public ListenableFuture<Optional<Segment>> getSegmentFuture()
Review Comment:
Seems that in some cases, calling this method actually triggers the download
to start. That isn't totally obvious so it should be called out in javadoc here.
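The lazy-start behavior the reviewer is pointing at can be illustrated with a minimal sketch: nothing happens at construction time, the first call to `getSegmentFuture()` invokes the supplier (which may kick off a download), and later calls return the same future. This is an assumption about the intended semantics, using `CompletableFuture` instead of Guava's `ListenableFuture` to stay self-contained; the class and memoization are hypothetical, not the PR's implementation.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Hypothetical sketch of lazily-started loading: the supplier (e.g. a deep-storage
// download) runs only when getSegmentFuture() is first called, and its future is
// memoized so repeated calls do not start the work again.
final class LazySegmentLoad<T>
{
  private final Supplier<CompletableFuture<T>> loader;
  private CompletableFuture<T> future;

  LazySegmentLoad(Supplier<CompletableFuture<T>> loader)
  {
    this.loader = loader;
  }

  synchronized CompletableFuture<T> getSegmentFuture()
  {
    // first call triggers the load; later calls reuse the same future
    if (future == null) {
      future = loader.get();
    }
    return future;
  }
}
```

Spelling this out in the javadoc ("calling this method may start the download") would address the comment, since a getter-looking method with side effects is easy to misread.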
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]