cryptoe commented on code in PR #14042:
URL: https://github.com/apache/druid/pull/14042#discussion_r1176541229
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/TaskDataSegmentProvider.java:
##########
@@ -60,56 +65,146 @@ public TaskDataSegmentProvider(
this.coordinatorClient = coordinatorClient;
this.segmentCacheManager = segmentCacheManager;
this.indexIO = indexIO;
+ this.holders = new ConcurrentHashMap<>();
}
@Override
- public LazyResourceHolder<Segment> fetchSegment(
+ public Supplier<ResourceHolder<Segment>> fetchSegment(
final SegmentId segmentId,
final ChannelCounters channelCounters
)
{
+ // Returns Supplier<ResourceHolder> instead of ResourceHolder, so the
Coordinator calls and segment downloads happen
+ // in processing threads, rather than the main thread. (They happen when
fetchSegmentInternal is called.)
+ return () -> {
+ ResourceHolder<Segment> holder = null;
+
+ while (holder == null) {
+ holder = holders.computeIfAbsent(
+ segmentId,
+ k -> new SegmentHolder(
+ () -> fetchSegmentInternal(segmentId, channelCounters),
+ () -> holders.remove(segmentId)
+ )
+ ).get();
+ }
+
+ return holder;
+ };
+ }
+
+ /**
+ * Helper used by {@link #fetchSegment(SegmentId, ChannelCounters)}. Does
the actual fetching of a segment, once it
+ * is determined that we definitely need to go out and get one.
+ */
+ private ReferenceCountingResourceHolder<Segment> fetchSegmentInternal(
+ final SegmentId segmentId,
+ final ChannelCounters channelCounters
+ )
+ {
+ final DataSegment dataSegment;
try {
- // Use LazyResourceHolder so Coordinator call and segment downloads
happen in processing threads,
- // rather than the main thread.
- return new LazyResourceHolder<>(
- () -> {
- final DataSegment dataSegment;
- try {
- dataSegment = FutureUtils.get(
- coordinatorClient.fetchUsedSegment(
- segmentId.getDataSource(),
- segmentId.toString()
- ),
- true
- );
- }
- catch (InterruptedException | ExecutionException e) {
- throw new RE(e, "Failed to fetch segment details from
Coordinator for [%s]", segmentId);
- }
+ dataSegment = FutureUtils.get(
+ coordinatorClient.fetchUsedSegment(
+ segmentId.getDataSource(),
+ segmentId.toString()
+ ),
+ true
+ );
+ }
+ catch (InterruptedException | ExecutionException e) {
+ throw new RE(e, "Failed to fetch segment details from Coordinator for
[%s]", segmentId);
+ }
- final Closer closer = Closer.create();
- try {
- final File segmentDir =
segmentCacheManager.getSegmentFiles(dataSegment);
- closer.register(() -> FileUtils.deleteDirectory(segmentDir));
+ final Closer closer = Closer.create();
+ try {
+ if (!segmentCacheManager.reserve(dataSegment)) {
+ throw new ISE("Could not reserve location for segment [%s]",
segmentId);
+ }
+ closer.register(() -> segmentCacheManager.cleanup(dataSegment));
+ final File segmentDir = segmentCacheManager.getSegmentFiles(dataSegment);
- final QueryableIndex index = indexIO.loadIndex(segmentDir);
- final int numRows = index.getNumRows();
- final long size = dataSegment.getSize();
- closer.register(() -> channelCounters.addFile(numRows, size));
- closer.register(index);
- return Pair.of(new QueryableIndexSegment(index,
dataSegment.getId()), closer);
+ final QueryableIndex index =
closer.register(indexIO.loadIndex(segmentDir));
+ final QueryableIndexSegment segment = new QueryableIndexSegment(index,
dataSegment.getId());
+ final int numRows = index.getNumRows();
+ final long size = dataSegment.getSize();
+ closer.register(() -> channelCounters.addFile(numRows, size));
+ return new ReferenceCountingResourceHolder<>(segment, closer);
+ }
+ catch (IOException | SegmentLoadingException e) {
+ throw CloseableUtils.closeInCatch(
+ new RE(e, "Failed to download segment [%s]", segmentId),
+ closer
+ );
+ }
+ }
+
+ private static class SegmentHolder implements
Supplier<ResourceHolder<Segment>>
+ {
+ private final Supplier<ResourceHolder<Segment>> holderSupplier;
+ private final Closeable cleanupFn;
+ private ReferenceCountingResourceHolder<Segment> holder;
+ private boolean closing;
Review Comment:
Nit: as a safety check, can we annotate lines 146 and 147 (the `holder` and `closing` fields) with `@GuardedBy("this")`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]