jon-wei commented on a change in pull request #11398:
URL: https://github.com/apache/druid/pull/11398#discussion_r668265782
##########
File path:
server/src/main/java/org/apache/druid/segment/loading/SegmentLoader.java
##########
@@ -29,10 +30,45 @@
* Loading segments from deep storage to local storage.
* Implementations must be thread-safe.
*/
+@UnstableApi
public interface SegmentLoader
{
boolean isSegmentLoaded(DataSegment segment);
- Segment getSegment(DataSegment segment, boolean lazy, SegmentLazyLoadFailCallback loadFailed) throws SegmentLoadingException;
+
+ /**
+ * Returns a {@link ReferenceCountingSegment} that will be added by the {@link org.apache.druid.server.SegmentManager}
+ * to the {@link org.apache.druid.timeline.VersionedIntervalTimeline}. This method can be called multiple times
+ * by the {@link org.apache.druid.server.SegmentManager}
+ *
+ * Returning a {@code ReferenceCountingSegment} will let custom implementations keep track of reference count for
+ * segments that the custom implementations are creating. That way, custom implementations can know when the segment
+ * is in use or not.
+ * @param segment - {@link DataSegment} segment to download
+ * @param lazy - whether the loading is lazy
+ * @param loadFailed - Callback to invoke if the loading fails
+ * @return Segment object wrapped inside {@link ReferenceCountingSegment}.
+ * @throws SegmentLoadingException
+ */
+ ReferenceCountingSegment getSegment(DataSegment segment, boolean lazy, SegmentLazyLoadFailCallback loadFailed) throws SegmentLoadingException;
Review comment:
Is the SegmentLoader guaranteed to return the same
ReferenceCountingSegment instance across multiple calls to getSegment for the
same segment? Should it?
##########
File path:
server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java
##########
@@ -237,44 +249,83 @@ public File getSegmentFiles(DataSegment segment) throws SegmentLoadingException
}
/**
- * location may fail because of IO failure, most likely in two cases:<p>
+ * If we have already reserved a location before, probably via {@link #reserve(DataSegment)}, then only that location
+ * should be tried. Otherwise, we would fetch locations using {@link StorageLocationSelectorStrategy} and try all
+ * of them one by one till there is success.
+ * Location may fail because of IO failure, most likely in two cases:<p>
* 1. druid don't have the write access to this location, most likely the administrator doesn't config it correctly<p>
* 2. disk failure, druid can't read/write to this disk anymore
- *
+ * <p>
* Locations are fetched using {@link StorageLocationSelectorStrategy}.
*/
- private StorageLocation loadSegmentWithRetry(DataSegment segment, String storageDirStr) throws SegmentLoadingException
+ private File loadSegmentWithRetry(DataSegment segment) throws SegmentLoadingException
{
- Iterator<StorageLocation> locationsIterator = strategy.getLocations();
+ String segmentDir = getSegmentDir(segment);
+
+ // Try the already reserved location. If location has been reserved outside, then we do not release the location
+ // here and simply delete any downloaded files. That is, we revert anything we do in this function and nothing else.
+ for (StorageLocation loc : locations) {
+ if (loc.isReserved(segmentDir)) {
+ File storageDir = loc.segmentDirectoryAsFile(segmentDir);
+ boolean success = loadInLocationWithStartMarkerQuietly(loc, segment, storageDir, false);
+ if (!success) {
+ throw new SegmentLoadingException("Failed to load segment %s in reserved location [%s]", segment.getId(), loc.getPath().getAbsolutePath());
Review comment:
Should this emit an alert and clean up instead of failing with an exception?
##########
File path:
server/src/main/java/org/apache/druid/segment/loading/StorageLocation.java
##########
@@ -154,7 +166,8 @@ public synchronized File reserve(String segmentFilePathToAdd, String segmentId,
{
final File segmentFileToAdd = new File(path, segmentFilePathToAdd);
if (files.contains(segmentFileToAdd)) {
- return null;
+ //TODO: is this change ok?
+ return segmentFileToAdd;
Review comment:
What's the reasoning for this change?
##########
File path:
server/src/main/java/org/apache/druid/segment/loading/SegmentLoader.java
##########
@@ -29,10 +30,45 @@
* Loading segments from deep storage to local storage.
* Implementations must be thread-safe.
*/
+@UnstableApi
public interface SegmentLoader
{
boolean isSegmentLoaded(DataSegment segment);
- Segment getSegment(DataSegment segment, boolean lazy, SegmentLazyLoadFailCallback loadFailed) throws SegmentLoadingException;
+
+ /**
+ * Returns a {@link ReferenceCountingSegment} that will be added by the {@link org.apache.druid.server.SegmentManager}
+ * to the {@link org.apache.druid.timeline.VersionedIntervalTimeline}. This method can be called multiple times
+ * by the {@link org.apache.druid.server.SegmentManager}
+ *
+ * Returning a {@code ReferenceCountingSegment} will let custom implementations keep track of reference count for
+ * segments that the custom implementations are creating. That way, custom implementations can know when the segment
+ * is in use or not.
+ * @param segment - {@link DataSegment} segment to download
+ * @param lazy - whether the loading is lazy
+ * @param loadFailed - Callback to invoke if the loading fails
+ * @return Segment object wrapped inside {@link ReferenceCountingSegment}.
+ * @throws SegmentLoadingException
+ */
+ ReferenceCountingSegment getSegment(DataSegment segment, boolean lazy, SegmentLazyLoadFailCallback loadFailed) throws SegmentLoadingException;
+
File getSegmentFiles(DataSegment segment) throws SegmentLoadingException;
+
+ /**
+ * Tries to reserve the space for a segment on any location. We only return a boolean result instead of a pointer to
+ * {@link StorageLocation} since we don't want callers to operate on {@code StorageLocation} directly outside {@code SegmentLoader}.
+ * {@link SegmentLoader} operates on the {@code StorageLocation} objects in a thread-safe manner.
+ *
+ * @param segment - Segment to reserve
+ * @return True if enough space found to store the segment, false otherwise
+ */
+ boolean reserve(DataSegment segment);
+
+ /**
+ * Reverts the effects of {@link #reserve(DataSegment)} (DataSegment)} by releasing the location reserved for this segment.
+ * @param segment - Segment to release the location for.
+ * @return - True if any location was reserved and released, false otherwise.
+ */
+ boolean release(DataSegment segment);
Review comment:
What calls `release()`?
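For illustration, here is a minimal sketch of the kind of caller the question is looking for: something that pairs `reserve` with `release` when a later step fails. The `Reserver` interface, `InMemoryReserver`, and `loadWithReservation` are hypothetical stand-ins that only mirror the two method shapes in the hunk above; they are not Druid classes, and the PR may wire this differently.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-ins for illustration only; not Druid classes.
final class ReserveReleaseSketch
{
  /** Simplified mirror of reserve/release, keyed by a segment id string. */
  interface Reserver
  {
    boolean reserve(String segmentId);

    boolean release(String segmentId);
  }

  /** Tracks reservations in memory; this sketch never runs out of space. */
  static final class InMemoryReserver implements Reserver
  {
    private final Set<String> reserved = new HashSet<>();

    @Override
    public synchronized boolean reserve(String segmentId)
    {
      reserved.add(segmentId);
      return true;
    }

    @Override
    public synchronized boolean release(String segmentId)
    {
      // True only if something was actually reserved and is now released.
      return reserved.remove(segmentId);
    }

    synchronized boolean isReserved(String segmentId)
    {
      return reserved.contains(segmentId);
    }
  }

  /** Reserves first, then loads; a failed load releases the reservation. */
  static boolean loadWithReservation(Reserver reserver, String segmentId, Runnable load)
  {
    if (!reserver.reserve(segmentId)) {
      return false;
    }
    try {
      load.run();
      return true;
    }
    catch (RuntimeException e) {
      reserver.release(segmentId);
      return false;
    }
  }

  public static void main(String[] args)
  {
    InMemoryReserver reserver = new InMemoryReserver();
    boolean loaded = loadWithReservation(reserver, "seg-ok", () -> { });
    System.out.println(loaded + " " + reserver.isReserved("seg-ok"));

    boolean failed = loadWithReservation(
        reserver,
        "seg-bad",
        () -> { throw new IllegalStateException("simulated load failure"); }
    );
    System.out.println(failed + " " + reserver.isReserved("seg-bad"));
  }
}
```

If no code path like `loadWithReservation` exists in the PR, a reservation made for a segment that never loads would stay held.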
##########
File path:
server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java
##########
@@ -203,8 +219,8 @@ public Segment getSegment(DataSegment segment, boolean lazy, SegmentLazyLoadFail
} else {
factory = new MMappedQueryableSegmentizerFactory(indexIO);
}
-
- return factory.factorize(segment, segmentFiles, lazy, loadFailed);
+ Segment baseSegment = factory.factorize(segment, segmentFiles, lazy, loadFailed);
+ return ReferenceCountingSegment.wrapSegment(baseSegment, segment.getShardSpec());
Review comment:
Since this is a stateful object that carries a reference count, it seems a
little odd to me that each call returns a new instance, which would start
with a fresh reference count.
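To make the concern concrete, here is a minimal sketch. `CountedHandle` is a hypothetical stand-in for a reference-counted wrapper, and `freshPerCall` / `cached` are illustrative names; none of this is Druid's API. Wrapping on every call hands out independent counters, so a reference taken on one handle is invisible on the next. Caching the wrapper per segment id with `computeIfAbsent` is one possible way to share the state, not necessarily the right fix for this PR.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-ins for illustration only; not Druid classes.
final class SegmentCacheSketch
{
  /** Minimal stateful handle: nothing but a reference counter. */
  static final class CountedHandle
  {
    private final AtomicInteger refs = new AtomicInteger();

    void increment()
    {
      refs.incrementAndGet();
    }

    int references()
    {
      return refs.get();
    }
  }

  private static final Map<String, CountedHandle> CACHE = new ConcurrentHashMap<>();

  /** Returns a new handle on every call, like wrapping the base segment each time. */
  static CountedHandle freshPerCall(String segmentId)
  {
    return new CountedHandle();
  }

  /** Returns the same handle for the same segment id, so the count is shared. */
  static CountedHandle cached(String segmentId)
  {
    return CACHE.computeIfAbsent(segmentId, id -> new CountedHandle());
  }

  public static void main(String[] args)
  {
    // The increment lands on a handle that the second call never sees.
    freshPerCall("seg-1").increment();
    System.out.println(freshPerCall("seg-1").references()); // prints 0

    // Both calls resolve to the same handle, so the increment is visible.
    cached("seg-1").increment();
    System.out.println(cached("seg-1").references()); // prints 1
  }
}
```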