This is an automated email from the ASF dual-hosted git repository.

cwylie pushed a commit to branch 26.0.0
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/26.0.0 by this push:
     new 5b9574dd3d Fix two concurrency issues with segment fetching. (#14042) (#14256)
5b9574dd3d is described below

commit 5b9574dd3dbc1d60d437fb8572d9acb5f0c2d405
Author: Gian Merlino <[email protected]>
AuthorDate: Thu May 11 05:28:54 2023 -0700

    Fix two concurrency issues with segment fetching. (#14042) (#14256)
    
    * Fix two concurrency issues with segment fetching. (#14042)
    
    * Fix two concurrency issues with segment fetching.
    
    1) SegmentLocalCacheManager: Fix a concurrency issue where certain directory
       cleanup happened outside of directoryWriteRemoveLock. This created the
       possibility that segments would be deleted by one thread, while being
       actively downloaded by another thread.
    
    2) TaskDataSegmentProvider (MSQ): Fix a concurrency issue when two stages
       in the same process both use the same segment. For example: a self-join
       using distributed sort-merge. Prior to this change, the two stages could
       delete each other's segments.
    
    3) ReferenceCountingResourceHolder: increment() returns a new ResourceHolder,
       rather than a Releaser. This allows it to be passed to callers without them
       having to hold on to both the original ResourceHolder *and* a Releaser.
    
    4) Simplify various interfaces and implementations by using ResourceHolder
       instead of Pair and instead of split-up fields.
    
    * Add test.
    
    * Fix style.
    
    * Remove Releaser.
    
    * Updates from master.
    
    * Add some GuardedBys.
    
    * Use the correct GuardedBy.
    
    * Adjustments.
    
    * Remove unnecessary classes.
---
 .../druid/msq/exec/TaskDataSegmentProvider.java    | 192 +++++++++---
 .../input/external/ExternalInputSliceReader.java   |   5 +-
 .../msq/input/table/SegmentWithDescriptor.java     |  42 +--
 .../msq/input/table/SegmentsInputSliceReader.java  |   8 +-
 .../druid/msq/querykit/BaseLeafFrameProcessor.java |  12 +-
 .../querykit/BaseLeafFrameProcessorFactory.java    |  37 ++-
 .../druid/msq/querykit/DataSegmentProvider.java    |  11 +-
 .../druid/msq/querykit/LazyResourceHolder.java     |  26 +-
 .../groupby/GroupByPreShuffleFrameProcessor.java   |   9 +-
 .../GroupByPreShuffleFrameProcessorFactory.java    |   4 +-
 .../msq/querykit/scan/ScanQueryFrameProcessor.java |   9 +-
 .../scan/ScanQueryFrameProcessorFactory.java       |   4 +-
 .../msq/exec/TaskDataSegmentProviderTest.java      | 342 +++++++++++++++++++++
 .../querykit/scan/ScanQueryFrameProcessorTest.java |   5 +-
 .../druid/msq/test/CalciteMSQTestsHelper.java      |  19 +-
 .../org/apache/druid/msq/test/MSQTestBase.java     |  12 +-
 ...rceHolder.java => CloseableResourceHolder.java} |  35 ++-
 .../ReferenceCountingResourceHolder.java           |  21 +-
 .../apache/druid/collections/ResourceHolder.java   |   6 +
 .../epinephelinae/GroupByMergingQueryRunnerV2.java |   6 +-
 .../groupby/epinephelinae/ParallelCombiner.java    |   5 +-
 .../collections/CloseableResourceHolderTest.java}  |  28 +-
 .../ReferenceCountingResourceHolderTest.java       |   2 +-
 .../segment/loading/SegmentLocalCacheManager.java  |  12 +-
 24 files changed, 674 insertions(+), 178 deletions(-)

diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/TaskDataSegmentProvider.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/TaskDataSegmentProvider.java
index 7c4dc2bb09..18d60f6e4d 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/TaskDataSegmentProvider.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/TaskDataSegmentProvider.java
@@ -19,14 +19,15 @@
 
 package org.apache.druid.msq.exec;
 
+import com.google.errorprone.annotations.concurrent.GuardedBy;
+import org.apache.druid.collections.ReferenceCountingResourceHolder;
+import org.apache.druid.collections.ResourceHolder;
 import org.apache.druid.common.guava.FutureUtils;
-import org.apache.druid.java.util.common.FileUtils;
-import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.RE;
 import org.apache.druid.java.util.common.io.Closer;
 import org.apache.druid.msq.counters.ChannelCounters;
 import org.apache.druid.msq.querykit.DataSegmentProvider;
-import org.apache.druid.msq.querykit.LazyResourceHolder;
 import org.apache.druid.msq.rpc.CoordinatorServiceClient;
 import org.apache.druid.segment.IndexIO;
 import org.apache.druid.segment.QueryableIndex;
@@ -38,9 +39,13 @@ import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.SegmentId;
 import org.apache.druid.utils.CloseableUtils;
 
+import javax.annotation.Nullable;
+import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
+import java.util.function.Supplier;
 
 /**
  * Production implementation of {@link DataSegmentProvider} using Coordinator APIs.
@@ -50,6 +55,7 @@ public class TaskDataSegmentProvider implements DataSegmentProvider
   private final CoordinatorServiceClient coordinatorClient;
   private final SegmentCacheManager segmentCacheManager;
   private final IndexIO indexIO;
+  private final ConcurrentHashMap<SegmentId, SegmentHolder> holders;
 
   public TaskDataSegmentProvider(
       CoordinatorServiceClient coordinatorClient,
@@ -60,56 +66,162 @@ public class TaskDataSegmentProvider implements DataSegmentProvider
     this.coordinatorClient = coordinatorClient;
     this.segmentCacheManager = segmentCacheManager;
     this.indexIO = indexIO;
+    this.holders = new ConcurrentHashMap<>();
   }
 
   @Override
-  public LazyResourceHolder<Segment> fetchSegment(
+  public Supplier<ResourceHolder<Segment>> fetchSegment(
       final SegmentId segmentId,
       final ChannelCounters channelCounters
   )
   {
+    // Returns Supplier<ResourceHolder> instead of ResourceHolder, so the Coordinator calls and segment downloads happen
+    // in processing threads, rather than the main thread. (They happen when fetchSegmentInternal is called.)
+    return () -> {
+      ResourceHolder<Segment> holder = null;
+
+      while (holder == null) {
+        holder = holders.computeIfAbsent(
+            segmentId,
+            k -> new SegmentHolder(
+                () -> fetchSegmentInternal(segmentId, channelCounters),
+                () -> holders.remove(segmentId)
+            )
+        ).get();
+      }
+
+      return holder;
+    };
+  }
+
+  /**
+   * Helper used by {@link #fetchSegment(SegmentId, ChannelCounters)}. Does the actual fetching of a segment, once it
+   * is determined that we definitely need to go out and get one.
+   */
+  private ReferenceCountingResourceHolder<Segment> fetchSegmentInternal(
+      final SegmentId segmentId,
+      final ChannelCounters channelCounters
+  )
+  {
+    final DataSegment dataSegment;
     try {
-      // Use LazyResourceHolder so Coordinator call and segment downloads 
happen in processing threads,
-      // rather than the main thread.
-      return new LazyResourceHolder<>(
-          () -> {
-            final DataSegment dataSegment;
-            try {
-              dataSegment = FutureUtils.get(
-                  coordinatorClient.fetchUsedSegment(
-                      segmentId.getDataSource(),
-                      segmentId.toString()
-                  ),
-                  true
-              );
-            }
-            catch (InterruptedException | ExecutionException e) {
-              throw new RE(e, "Failed to fetch segment details from 
Coordinator for [%s]", segmentId);
-            }
+      dataSegment = FutureUtils.get(
+          coordinatorClient.fetchUsedSegment(
+              segmentId.getDataSource(),
+              segmentId.toString()
+          ),
+          true
+      );
+    }
+    catch (InterruptedException | ExecutionException e) {
+      throw new RE(e, "Failed to fetch segment details from Coordinator for 
[%s]", segmentId);
+    }
+
+    final Closer closer = Closer.create();
+    try {
+      if (!segmentCacheManager.reserve(dataSegment)) {
+        throw new ISE("Could not reserve location for segment [%s]", 
segmentId);
+      }
+      closer.register(() -> segmentCacheManager.cleanup(dataSegment));
+      final File segmentDir = segmentCacheManager.getSegmentFiles(dataSegment);
 
-            final Closer closer = Closer.create();
+      final QueryableIndex index = 
closer.register(indexIO.loadIndex(segmentDir));
+      final QueryableIndexSegment segment = new QueryableIndexSegment(index, 
dataSegment.getId());
+      final int numRows = index.getNumRows();
+      final long size = dataSegment.getSize();
+      closer.register(() -> channelCounters.addFile(numRows, size));
+      return new ReferenceCountingResourceHolder<>(segment, closer);
+    }
+    catch (IOException | SegmentLoadingException e) {
+      throw CloseableUtils.closeInCatch(
+          new RE(e, "Failed to download segment [%s]", segmentId),
+          closer
+      );
+    }
+  }
+
+  private static class SegmentHolder implements 
Supplier<ResourceHolder<Segment>>
+  {
+    private final Supplier<ResourceHolder<Segment>> holderSupplier;
+    private final Closeable cleanupFn;
+
+    @GuardedBy("this")
+    private ReferenceCountingResourceHolder<Segment> holder;
+
+    @GuardedBy("this")
+    private boolean closing;
+
+    @GuardedBy("this")
+    private boolean closed;
+
+    public SegmentHolder(Supplier<ResourceHolder<Segment>> holderSupplier, 
Closeable cleanupFn)
+    {
+      this.holderSupplier = holderSupplier;
+      this.cleanupFn = cleanupFn;
+    }
+
+    @Override
+    @Nullable
+    public ResourceHolder<Segment> get()
+    {
+      synchronized (this) {
+        if (closing) {
+          // Wait until the holder is closed.
+          while (!closed) {
             try {
-              final File segmentDir = 
segmentCacheManager.getSegmentFiles(dataSegment);
-              closer.register(() -> FileUtils.deleteDirectory(segmentDir));
-
-              final QueryableIndex index = indexIO.loadIndex(segmentDir);
-              final int numRows = index.getNumRows();
-              final long size = dataSegment.getSize();
-              closer.register(() -> channelCounters.addFile(numRows, size));
-              closer.register(index);
-              return Pair.of(new QueryableIndexSegment(index, 
dataSegment.getId()), closer);
+              wait();
             }
-            catch (IOException | SegmentLoadingException e) {
-              throw CloseableUtils.closeInCatch(
-                  new RE(e, "Failed to download segment [%s]", segmentId),
-                  closer
-              );
+            catch (InterruptedException e) {
+              Thread.currentThread().interrupt();
+              throw new RuntimeException(e);
             }
           }
-      );
-    }
-    catch (Exception e) {
-      throw new RuntimeException(e);
+
+          // Then, return null so "fetchSegment" will try again.
+          return null;
+        } else if (holder == null) {
+          final ResourceHolder<Segment> segmentHolder = holderSupplier.get();
+          holder = new ReferenceCountingResourceHolder<>(
+              segmentHolder.get(),
+              () -> {
+                synchronized (this) {
+                  CloseableUtils.closeAll(
+                      () -> {
+                        // synchronized block not strictly needed here, but errorprone needs it since it doesn't
+                        // understand the lambda is immediately called. See https://errorprone.info/bugpattern/GuardedBy
+                        synchronized (this) {
+                          closing = true;
+                        }
+                      },
+                      segmentHolder,
+                      cleanupFn, // removes this holder from the "holders" map
+                      () -> {
+                        // synchronized block not strictly needed here, but errorprone needs it since it doesn't
+                        // understand the lambda is immediately called. See https://errorprone.info/bugpattern/GuardedBy
+                        synchronized (this) {
+                          closed = true;
+                          SegmentHolder.this.notifyAll();
+                        }
+                      }
+                  );
+                }
+              }
+          );
+          final ResourceHolder<Segment> retVal = holder.increment();
+          // Store already-closed holder, so it disappears when the last reference is closed.
+          holder.close();
+          return retVal;
+        } else {
+          try {
+            return holder.increment();
+          }
+          catch (IllegalStateException e) {
+            // Possible race: holder is in the process of closing. (This is the only reason "increment" can throw ISE.)
+            // Return null so "fetchSegment" will try again.
+            return null;
+          }
+        }
+      }
     }
   }
 }
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/external/ExternalInputSliceReader.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/external/ExternalInputSliceReader.java
index c609f8b4c1..8e70552847 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/external/ExternalInputSliceReader.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/external/ExternalInputSliceReader.java
@@ -20,6 +20,7 @@
 package org.apache.druid.msq.input.external;
 
 import com.google.common.collect.Iterators;
+import org.apache.druid.collections.ResourceHolder;
 import org.apache.druid.data.input.ColumnsFilter;
 import org.apache.druid.data.input.InputFormat;
 import org.apache.druid.data.input.InputRow;
@@ -32,7 +33,6 @@ import org.apache.druid.data.input.impl.InlineInputSource;
 import org.apache.druid.data.input.impl.TimestampSpec;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.ISE;
-import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.java.util.common.guava.BaseSequence;
 import org.apache.druid.java.util.common.parsers.CloseableIterator;
 import org.apache.druid.java.util.common.parsers.ParseException;
@@ -48,7 +48,6 @@ import org.apache.druid.msq.input.NilInputSource;
 import org.apache.druid.msq.input.ReadableInput;
 import org.apache.druid.msq.input.ReadableInputs;
 import org.apache.druid.msq.input.table.SegmentWithDescriptor;
-import org.apache.druid.msq.querykit.LazyResourceHolder;
 import org.apache.druid.msq.util.DimensionSchemaUtils;
 import org.apache.druid.segment.RowAdapters;
 import org.apache.druid.segment.RowBasedSegment;
@@ -234,7 +233,7 @@ public class ExternalInputSliceReader implements 
InputSliceReader
           );
 
           return new SegmentWithDescriptor(
-              new LazyResourceHolder<>(() -> Pair.of(segment, () -> {})),
+              () -> ResourceHolder.fromCloseable(segment),
               segmentId.toDescriptor()
           );
         }
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/SegmentWithDescriptor.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/SegmentWithDescriptor.java
index 94109bc4a7..020b9f2a5b 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/SegmentWithDescriptor.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/SegmentWithDescriptor.java
@@ -24,36 +24,45 @@ import org.apache.druid.collections.ResourceHolder;
 import org.apache.druid.query.SegmentDescriptor;
 import org.apache.druid.segment.Segment;
 
-import java.io.Closeable;
 import java.util.Objects;
+import java.util.function.Supplier;
 
 /**
- * A holder for a physical segment.
+ * A holder for a supplier of a physical segment.
  */
-public class SegmentWithDescriptor implements Closeable
+public class SegmentWithDescriptor
 {
-  private final ResourceHolder<? extends Segment> segmentHolder;
+  private final Supplier<? extends ResourceHolder<Segment>> segmentSupplier;
   private final SegmentDescriptor descriptor;
 
+  /**
+   * Create a new instance.
+   *
+   * @param segmentSupplier supplier of a {@link ResourceHolder} of segment. The {@link ResourceHolder#close()} logic
+   *                        must include a delegated call to {@link Segment#close()}.
+   * @param descriptor      segment descriptor
+   */
   public SegmentWithDescriptor(
-      final ResourceHolder<? extends Segment> segmentHolder,
+      final Supplier<? extends ResourceHolder<Segment>> segmentSupplier,
       final SegmentDescriptor descriptor
   )
   {
-    this.segmentHolder = Preconditions.checkNotNull(segmentHolder, "segment");
+    this.segmentSupplier = Preconditions.checkNotNull(segmentSupplier, 
"segment");
     this.descriptor = Preconditions.checkNotNull(descriptor, "descriptor");
   }
 
   /**
    * The physical segment.
    *
-   * Named "getOrLoad" because the segment may be held by an eager or lazy 
resource holder (i.e.
-   * {@link org.apache.druid.msq.querykit.LazyResourceHolder}). If the 
resource holder is lazy, the segment is acquired
+   * Named "getOrLoad" because the segment may be generated by a lazy 
supplier. In this case, the segment is acquired
    * as part of the call to this method.
+   *
+   * It is not necessary to call {@link Segment#close()} on the returned 
segment. Calling {@link ResourceHolder#close()}
+   * is enough.
    */
-  public Segment getOrLoadSegment()
+  public ResourceHolder<Segment> getOrLoad()
   {
-    return segmentHolder.get();
+    return segmentSupplier.get();
   }
 
   /**
@@ -64,15 +73,6 @@ public class SegmentWithDescriptor implements Closeable
     return descriptor;
   }
 
-  /**
-   * Release resources used by the physical segment.
-   */
-  @Override
-  public void close()
-  {
-    segmentHolder.close();
-  }
-
   @Override
   public boolean equals(Object o)
   {
@@ -83,12 +83,12 @@ public class SegmentWithDescriptor implements Closeable
       return false;
     }
     SegmentWithDescriptor that = (SegmentWithDescriptor) o;
-    return Objects.equals(segmentHolder, that.segmentHolder) && 
Objects.equals(descriptor, that.descriptor);
+    return Objects.equals(segmentSupplier, that.segmentSupplier) && 
Objects.equals(descriptor, that.descriptor);
   }
 
   @Override
   public int hashCode()
   {
-    return Objects.hash(segmentHolder, descriptor);
+    return Objects.hash(segmentSupplier, descriptor);
   }
 }
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/SegmentsInputSliceReader.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/SegmentsInputSliceReader.java
index 29236d7b49..f7efd287ae 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/SegmentsInputSliceReader.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/SegmentsInputSliceReader.java
@@ -20,7 +20,6 @@
 package org.apache.druid.msq.input.table;
 
 import com.google.common.collect.Iterators;
-import org.apache.druid.collections.ResourceHolder;
 import org.apache.druid.msq.counters.ChannelCounters;
 import org.apache.druid.msq.counters.CounterNames;
 import org.apache.druid.msq.counters.CounterTracker;
@@ -29,7 +28,6 @@ import org.apache.druid.msq.input.InputSliceReader;
 import org.apache.druid.msq.input.ReadableInput;
 import org.apache.druid.msq.input.ReadableInputs;
 import org.apache.druid.msq.querykit.DataSegmentProvider;
-import org.apache.druid.segment.Segment;
 import org.apache.druid.timeline.SegmentId;
 
 import java.util.Iterator;
@@ -92,8 +90,10 @@ public class SegmentsInputSliceReader implements 
InputSliceReader
               descriptor.getPartitionNumber()
           );
 
-          final ResourceHolder<Segment> segmentHolder = 
dataSegmentProvider.fetchSegment(segmentId, channelCounters);
-          return new SegmentWithDescriptor(segmentHolder, descriptor);
+          return new SegmentWithDescriptor(
+              dataSegmentProvider.fetchSegment(segmentId, channelCounters),
+              descriptor
+          );
         }
     ).iterator();
   }
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessor.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessor.java
index e8b761c800..ece5a6e145 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessor.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessor.java
@@ -55,7 +55,7 @@ public abstract class BaseLeafFrameProcessor implements 
FrameProcessor<Long>
   private final Query<?> query;
   private final ReadableInput baseInput;
   private final List<ReadableFrameChannel> inputChannels;
-  private final ResourceHolder<WritableFrameChannel> outputChannel;
+  private final ResourceHolder<WritableFrameChannel> outputChannelHolder;
   private final ResourceHolder<FrameWriterFactory> frameWriterFactoryHolder;
   private final BroadcastJoinHelper broadcastJoinHelper;
 
@@ -66,14 +66,14 @@ public abstract class BaseLeafFrameProcessor implements 
FrameProcessor<Long>
       final ReadableInput baseInput,
       final Int2ObjectMap<ReadableInput> sideChannels,
       final JoinableFactoryWrapper joinableFactory,
-      final ResourceHolder<WritableFrameChannel> outputChannel,
+      final ResourceHolder<WritableFrameChannel> outputChannelHolder,
       final ResourceHolder<FrameWriterFactory> frameWriterFactoryHolder,
       final long memoryReservedForBroadcastJoin
   )
   {
     this.query = query;
     this.baseInput = baseInput;
-    this.outputChannel = outputChannel;
+    this.outputChannelHolder = outputChannelHolder;
     this.frameWriterFactoryHolder = frameWriterFactoryHolder;
 
     final Pair<List<ReadableFrameChannel>, BroadcastJoinHelper> 
inputChannelsAndBroadcastJoinHelper =
@@ -150,7 +150,7 @@ public abstract class BaseLeafFrameProcessor implements 
FrameProcessor<Long>
   @Override
   public List<WritableFrameChannel> outputChannels()
   {
-    return Collections.singletonList(outputChannel.get());
+    return Collections.singletonList(outputChannelHolder.get());
   }
 
   @Override
@@ -171,9 +171,7 @@ public abstract class BaseLeafFrameProcessor implements 
FrameProcessor<Long>
   @Override
   public void cleanup() throws IOException
   {
-    // Don't close the output channel, because multiple workers write to the 
same channel.
-    // The channel should be closed by the caller.
-    FrameProcessors.closeAll(inputChannels(), Collections.emptyList(), 
outputChannel, frameWriterFactoryHolder);
+    FrameProcessors.closeAll(inputChannels(), Collections.emptyList(), 
outputChannelHolder, frameWriterFactoryHolder);
   }
 
   protected FrameWriterFactory getFrameWriterFactory()
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessorFactory.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessorFactory.java
index 9f5a7462b4..5305afa2ce 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessorFactory.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessorFactory.java
@@ -32,7 +32,6 @@ import org.apache.druid.frame.processor.OutputChannelFactory;
 import org.apache.druid.frame.processor.OutputChannels;
 import org.apache.druid.frame.write.FrameWriterFactory;
 import org.apache.druid.java.util.common.ISE;
-import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.java.util.common.guava.Sequence;
 import org.apache.druid.java.util.common.guava.Sequences;
 import org.apache.druid.java.util.common.logger.Logger;
@@ -254,7 +253,7 @@ public abstract class BaseLeafFrameProcessorFactory extends 
BaseFrameProcessorFa
   protected abstract FrameProcessor<Long> makeProcessor(
       ReadableInput baseInput,
       Int2ObjectMap<ReadableInput> sideChannels,
-      ResourceHolder<WritableFrameChannel> outputChannelSupplier,
+      ResourceHolder<WritableFrameChannel> outputChannel,
       ResourceHolder<FrameWriterFactory> frameWriterFactory,
       FrameContext providerThingy
   );
@@ -272,21 +271,29 @@ public abstract class BaseLeafFrameProcessorFactory 
extends BaseFrameProcessorFa
             resource = queueRef.get().poll();
           }
 
-          return Pair.of(
-              resource,
-              () -> {
-                synchronized (queueRef) {
-                  final Queue<T> queue = queueRef.get();
-                  if (queue != null) {
-                    queue.add(resource);
-                    return;
-                  }
-                }
+          return new ResourceHolder<T>()
+          {
+            @Override
+            public T get()
+            {
+              return resource;
+            }
 
-                // Queue was null
-                backupCloser.accept(resource);
+            @Override
+            public void close()
+            {
+              synchronized (queueRef) {
+                final Queue<T> queue = queueRef.get();
+                if (queue != null) {
+                  queue.add(resource);
+                  return;
+                }
               }
-          );
+
+              // Queue was null
+              backupCloser.accept(resource);
+            }
+          };
         }
     );
   }
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSegmentProvider.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSegmentProvider.java
index b5d95a4401..5f5f6099e5 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSegmentProvider.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSegmentProvider.java
@@ -24,13 +24,18 @@ import org.apache.druid.msq.counters.ChannelCounters;
 import org.apache.druid.segment.Segment;
 import org.apache.druid.timeline.SegmentId;
 
+import java.util.function.Supplier;
+
 public interface DataSegmentProvider
 {
   /**
-   * Fetches the segment corresponding to the provided segmentId from deep 
storage, 
-   * segment fetched.
+   * Returns a supplier that fetches the segment corresponding to the provided segmentId from deep storage. The segment
+   * is not actually fetched until you call {@link Supplier#get()}. Once you call this, make sure to also call
+   * {@link ResourceHolder#close()}.
+   *
+   * It is not necessary to call {@link ResourceHolder#close()} if you never call {@link Supplier#get()}.
    */
-  ResourceHolder<Segment> fetchSegment(
+  Supplier<ResourceHolder<Segment>> fetchSegment(
       SegmentId segmentId,
       ChannelCounters channelCounters
   );
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/LazyResourceHolder.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/LazyResourceHolder.java
index 4ae03c5dcf..e9a897cd16 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/LazyResourceHolder.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/LazyResourceHolder.java
@@ -21,11 +21,9 @@ package org.apache.druid.msq.querykit;
 
 import com.google.common.base.Preconditions;
 import org.apache.druid.collections.ResourceHolder;
-import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.java.util.common.logger.Logger;
 
 import javax.annotation.concurrent.NotThreadSafe;
-import java.io.Closeable;
 import java.util.function.Supplier;
 
 @NotThreadSafe
@@ -33,11 +31,10 @@ public class LazyResourceHolder<T> implements 
ResourceHolder<T>
 {
   private static final Logger log = new Logger(LazyResourceHolder.class);
 
-  private final Supplier<Pair<T, Closeable>> supplier;
-  private T resource = null;
-  private Closeable closer = null;
+  private final Supplier<ResourceHolder<T>> supplier;
+  private ResourceHolder<T> supplied = null;
 
-  public LazyResourceHolder(final Supplier<Pair<T, Closeable>> supplier)
+  public LazyResourceHolder(final Supplier<ResourceHolder<T>> supplier)
   {
     this.supplier = Preconditions.checkNotNull(supplier, "supplier");
   }
@@ -45,28 +42,25 @@ public class LazyResourceHolder<T> implements 
ResourceHolder<T>
   @Override
   public T get()
   {
-    if (resource == null) {
-      final Pair<T, Closeable> supplied = supplier.get();
-      resource = Preconditions.checkNotNull(supplied.lhs, "resource");
-      closer = Preconditions.checkNotNull(supplied.rhs, "closer");
+    if (supplied == null) {
+      supplied = supplier.get();
     }
 
-    return resource;
+    return supplied.get();
   }
 
   @Override
   public void close()
   {
-    if (resource != null) {
+    if (supplied != null) {
       try {
-        closer.close();
+        supplied.close();
       }
       catch (Throwable e) {
-        log.noStackTrace().warn(e, "Exception encountered while closing 
resource: %s", resource);
+        log.noStackTrace().warn(e, "Exception encountered while closing 
resource: %s", supplied.get());
       }
       finally {
-        resource = null;
-        closer = null;
+        supplied = null;
       }
     }
   }
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPreShuffleFrameProcessor.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPreShuffleFrameProcessor.java
index 5bcc4267b9..47427be85a 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPreShuffleFrameProcessor.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPreShuffleFrameProcessor.java
@@ -48,6 +48,7 @@ import 
org.apache.druid.query.groupby.strategy.GroupByStrategySelector;
 import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
 import org.apache.druid.query.spec.SpecificSegmentSpec;
 import org.apache.druid.segment.ColumnSelectorFactory;
+import org.apache.druid.segment.Segment;
 import org.apache.druid.segment.column.RowSignature;
 import org.apache.druid.segment.join.JoinableFactoryWrapper;
 import org.apache.druid.timeline.SegmentId;
@@ -76,7 +77,7 @@ public class GroupByPreShuffleFrameProcessor extends 
BaseLeafFrameProcessor
       final Int2ObjectMap<ReadableInput> sideChannels,
       final GroupByStrategySelector strategySelector,
       final JoinableFactoryWrapper joinableFactory,
-      final ResourceHolder<WritableFrameChannel> outputChannel,
+      final ResourceHolder<WritableFrameChannel> outputChannelHolder,
       final ResourceHolder<FrameWriterFactory> frameWriterFactoryHolder,
       final long memoryReservedForBroadcastJoin
   )
@@ -86,7 +87,7 @@ public class GroupByPreShuffleFrameProcessor extends 
BaseLeafFrameProcessor
         baseInput,
         sideChannels,
         joinableFactory,
-        outputChannel,
+        outputChannelHolder,
         frameWriterFactoryHolder,
         memoryReservedForBroadcastJoin
     );
@@ -103,13 +104,13 @@ public class GroupByPreShuffleFrameProcessor extends 
BaseLeafFrameProcessor
   protected ReturnOrAwait<Long> runWithSegment(final SegmentWithDescriptor 
segment) throws IOException
   {
     if (resultYielder == null) {
-      closer.register(segment);
+      final ResourceHolder<Segment> segmentHolder = 
closer.register(segment.getOrLoad());
 
       final Sequence<ResultRow> rowSequence =
           strategySelector.strategize(query)
                           .process(
                               query.withQuerySegmentSpec(new 
SpecificSegmentSpec(segment.getDescriptor())),
-                              
mapSegment(segment.getOrLoadSegment()).asStorageAdapter(),
+                              
mapSegment(segmentHolder.get()).asStorageAdapter(),
                               null
                           );
 
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPreShuffleFrameProcessorFactory.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPreShuffleFrameProcessorFactory.java
index 285e75eaa5..207a5ae6a2 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPreShuffleFrameProcessorFactory.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPreShuffleFrameProcessorFactory.java
@@ -28,11 +28,9 @@ import org.apache.druid.collections.ResourceHolder;
 import org.apache.druid.frame.channel.WritableFrameChannel;
 import org.apache.druid.frame.processor.FrameProcessor;
 import org.apache.druid.frame.write.FrameWriterFactory;
-import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.msq.input.ReadableInput;
 import org.apache.druid.msq.kernel.FrameContext;
 import org.apache.druid.msq.querykit.BaseLeafFrameProcessorFactory;
-import org.apache.druid.msq.querykit.LazyResourceHolder;
 import org.apache.druid.query.groupby.GroupByQuery;
 import org.apache.druid.segment.join.JoinableFactoryWrapper;
 
@@ -69,7 +67,7 @@ public class GroupByPreShuffleFrameProcessorFactory extends 
BaseLeafFrameProcess
         frameContext.groupByStrategySelector(),
         new JoinableFactoryWrapper(frameContext.joinableFactory()),
         outputChannelHolder,
-        new LazyResourceHolder<>(() -> Pair.of(frameWriterFactoryHolder.get(), 
frameWriterFactoryHolder)),
+        frameWriterFactoryHolder,
         frameContext.memoryParameters().getBroadcastJoinMemory()
     );
   }
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java
index 0482e2715d..f843c189e3 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java
@@ -54,6 +54,7 @@ import 
org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
 import org.apache.druid.query.spec.SpecificSegmentSpec;
 import org.apache.druid.segment.ColumnSelectorFactory;
 import org.apache.druid.segment.Cursor;
+import org.apache.druid.segment.Segment;
 import org.apache.druid.segment.StorageAdapter;
 import org.apache.druid.segment.VirtualColumn;
 import org.apache.druid.segment.VirtualColumns;
@@ -90,7 +91,7 @@ public class ScanQueryFrameProcessor extends 
BaseLeafFrameProcessor
       final ReadableInput baseInput,
       final Int2ObjectMap<ReadableInput> sideChannels,
       final JoinableFactoryWrapper joinableFactory,
-      final ResourceHolder<WritableFrameChannel> outputChannel,
+      final ResourceHolder<WritableFrameChannel> outputChannelHolder,
       final ResourceHolder<FrameWriterFactory> frameWriterFactoryHolder,
       @Nullable final AtomicLong runningCountForLimit,
       final long memoryReservedForBroadcastJoin,
@@ -102,7 +103,7 @@ public class ScanQueryFrameProcessor extends 
BaseLeafFrameProcessor
         baseInput,
         sideChannels,
         joinableFactory,
-        outputChannel,
+        outputChannelHolder,
         frameWriterFactoryHolder,
         memoryReservedForBroadcastJoin
     );
@@ -152,12 +153,12 @@ public class ScanQueryFrameProcessor extends 
BaseLeafFrameProcessor
   protected ReturnOrAwait<Long> runWithSegment(final SegmentWithDescriptor 
segment) throws IOException
   {
     if (cursor == null) {
-      closer.register(segment);
+      final ResourceHolder<Segment> segmentHolder = 
closer.register(segment.getOrLoad());
 
       final Yielder<Cursor> cursorYielder = Yielders.each(
           makeCursors(
               query.withQuerySegmentSpec(new 
SpecificSegmentSpec(segment.getDescriptor())),
-              mapSegment(segment.getOrLoadSegment()).asStorageAdapter()
+              mapSegment(segmentHolder.get()).asStorageAdapter()
           )
       );
 
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessorFactory.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessorFactory.java
index bc8ec9608e..1f1263ef76 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessorFactory.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessorFactory.java
@@ -28,11 +28,9 @@ import org.apache.druid.collections.ResourceHolder;
 import org.apache.druid.frame.channel.WritableFrameChannel;
 import org.apache.druid.frame.processor.FrameProcessor;
 import org.apache.druid.frame.write.FrameWriterFactory;
-import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.msq.input.ReadableInput;
 import org.apache.druid.msq.kernel.FrameContext;
 import org.apache.druid.msq.querykit.BaseLeafFrameProcessorFactory;
-import org.apache.druid.msq.querykit.LazyResourceHolder;
 import org.apache.druid.query.scan.ScanQuery;
 import org.apache.druid.segment.join.JoinableFactoryWrapper;
 
@@ -84,7 +82,7 @@ public class ScanQueryFrameProcessorFactory extends 
BaseLeafFrameProcessorFactor
         sideChannels,
         new JoinableFactoryWrapper(frameContext.joinableFactory()),
         outputChannelHolder,
-        new LazyResourceHolder<>(() -> Pair.of(frameWriterFactoryHolder.get(), 
frameWriterFactoryHolder)),
+        frameWriterFactoryHolder,
         runningCountForLimit,
         frameContext.memoryParameters().getBroadcastJoinMemory(),
         frameContext.jsonMapper()
diff --git 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/TaskDataSegmentProviderTest.java
 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/TaskDataSegmentProviderTest.java
new file mode 100644
index 0000000000..b1a97b3da0
--- /dev/null
+++ 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/TaskDataSegmentProviderTest.java
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.exec;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableList;
+import com.google.common.primitives.Ints;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import org.apache.druid.collections.ResourceHolder;
+import org.apache.druid.collections.bitmap.BitmapFactory;
+import org.apache.druid.collections.bitmap.RoaringBitmapFactory;
+import org.apache.druid.common.guava.FutureUtils;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.jackson.JacksonUtils;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.msq.counters.ChannelCounters;
+import org.apache.druid.msq.rpc.CoordinatorServiceClient;
+import org.apache.druid.rpc.ServiceRetryPolicy;
+import org.apache.druid.segment.DimensionHandler;
+import org.apache.druid.segment.IndexIO;
+import org.apache.druid.segment.Metadata;
+import org.apache.druid.segment.QueryableIndex;
+import org.apache.druid.segment.Segment;
+import org.apache.druid.segment.TestHelper;
+import org.apache.druid.segment.column.ColumnHolder;
+import org.apache.druid.segment.data.Indexed;
+import org.apache.druid.segment.data.ListIndexed;
+import org.apache.druid.segment.loading.DataSegmentPusher;
+import org.apache.druid.segment.loading.LoadSpec;
+import org.apache.druid.segment.loading.SegmentLoaderConfig;
+import org.apache.druid.segment.loading.SegmentLoadingException;
+import org.apache.druid.segment.loading.SegmentLocalCacheManager;
+import org.apache.druid.segment.loading.StorageLocationConfig;
+import org.apache.druid.server.metrics.NoopServiceEmitter;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.partition.NumberedShardSpec;
+import org.easymock.EasyMock;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+
+public class TaskDataSegmentProviderTest
+{
+  private static final String DATASOURCE = "foo";
+  private static final int NUM_SEGMENTS = 10;
+  private static final int THREADS = 8;
+  private static final String LOAD_SPEC_FILE_NAME = "data";
+
+  private List<DataSegment> segments;
+  private File cacheDir;
+  private SegmentLocalCacheManager cacheManager;
+  private TaskDataSegmentProvider provider;
+  private ListeningExecutorService exec;
+  private IndexIO indexIO = EasyMock.mock(IndexIO.class);
+
+  @Rule
+  public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+  @Before
+  public void setUp() throws Exception
+  {
+    EmittingLogger.registerEmitter(new NoopServiceEmitter());
+
+    EasyMock.reset(indexIO);
+    EasyMock.expect(indexIO.loadIndex(EasyMock.anyObject())).andReturn(new 
TestQueryableIndex()).anyTimes();
+    EasyMock.replay(indexIO);
+
+    final ObjectMapper jsonMapper = TestHelper.JSON_MAPPER;
+    jsonMapper.registerSubtypes(TestLoadSpec.class);
+
+    segments = new ArrayList<>();
+
+    for (int i = 0; i < NUM_SEGMENTS; i++) {
+      // Two segments per interval; helps verify that directory creation + 
deletion does not include races.
+      final DateTime startTime = DateTimes.of("2000").plusDays(i / 2);
+      final int partitionNum = i % 2;
+
+      segments.add(
+          DataSegment.builder()
+                     .dataSource(DATASOURCE)
+                     .interval(
+                         Intervals.utc(
+                             startTime.getMillis(),
+                             startTime.plusDays(1).getMillis()
+                         )
+                     )
+                     .version("0")
+                     .shardSpec(new NumberedShardSpec(partitionNum, 2))
+                     .loadSpec(
+                         jsonMapper.convertValue(
+                             new TestLoadSpec(i),
+                             JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT
+                         )
+                     )
+                     .size(1)
+                     .build()
+      );
+    }
+
+    cacheDir = temporaryFolder.newFolder();
+    cacheManager = new SegmentLocalCacheManager(
+        new SegmentLoaderConfig().withLocations(
+            ImmutableList.of(new StorageLocationConfig(cacheDir, 
10_000_000_000L, null))
+        ),
+        jsonMapper
+    );
+
+    provider = new TaskDataSegmentProvider(
+        new TestCoordinatorServiceClientImpl(),
+        cacheManager,
+        indexIO
+    );
+
+    exec = MoreExecutors.listeningDecorator(Execs.multiThreaded(THREADS, 
getClass().getSimpleName() + "-%s"));
+  }
+
+  @After
+  public void tearDown() throws Exception
+  {
+    if (indexIO != null) {
+      EasyMock.verify(indexIO);
+    }
+
+    if (exec != null) {
+      exec.shutdownNow();
+      exec.awaitTermination(1, TimeUnit.MINUTES);
+    }
+  }
+
+  @Test
+  public void testConcurrency()
+  {
+    final int iterations = 1000;
+    final List<ListenableFuture<Boolean>> testFutures = new ArrayList<>();
+
+    for (int i = 0; i < iterations; i++) {
+      final int expectedSegmentNumber = i % NUM_SEGMENTS;
+      final DataSegment segment = segments.get(expectedSegmentNumber);
+      final ListenableFuture<Supplier<ResourceHolder<Segment>>> f =
+          exec.submit(() -> provider.fetchSegment(segment.getId(), new 
ChannelCounters()));
+
+      testFutures.add(
+          FutureUtils.transform(
+              f,
+              holderSupplier -> {
+                try {
+                  final ResourceHolder<Segment> holder = holderSupplier.get();
+                  Assert.assertEquals(segment.getId(), holder.get().getId());
+
+                  final String expectedStorageDir = 
DataSegmentPusher.getDefaultStorageDir(segment, false);
+                  final File expectedFile = new File(
+                      StringUtils.format(
+                          "%s/%s/%s",
+                          cacheDir,
+                          expectedStorageDir,
+                          LOAD_SPEC_FILE_NAME
+                      )
+                  );
+
+                  Assert.assertTrue(expectedFile.exists());
+                  Assert.assertArrayEquals(
+                      Ints.toByteArray(expectedSegmentNumber),
+                      Files.readAllBytes(expectedFile.toPath())
+                  );
+
+                  holder.close();
+
+                  return true;
+                }
+                catch (IOException e) {
+                  throw new RuntimeException(e);
+                }
+              }
+          )
+      );
+    }
+
+    Assert.assertEquals(iterations, testFutures.size());
+    for (int i = 0; i < iterations; i++) {
+      ListenableFuture<Boolean> testFuture = testFutures.get(i);
+      Assert.assertTrue("Test iteration #" + i, 
FutureUtils.getUnchecked(testFuture, false));
+    }
+
+    // Cache dir should exist, but be empty, since we've closed all holders.
+    Assert.assertTrue(cacheDir.exists());
+    Assert.assertArrayEquals(new String[]{}, cacheDir.list());
+  }
+
+  private class TestCoordinatorServiceClientImpl implements 
CoordinatorServiceClient
+  {
+    @Override
+    public ListenableFuture<DataSegment> fetchUsedSegment(String dataSource, 
String segmentId)
+    {
+      for (final DataSegment segment : segments) {
+        if (segment.getDataSource().equals(dataSource) && 
segment.getId().toString().equals(segmentId)) {
+          return Futures.immediateFuture(segment);
+        }
+      }
+
+      return Futures.immediateFailedFuture(new ISE("No such segment[%s] for 
dataSource[%s]", segmentId, dataSource));
+    }
+
+    @Override
+    public CoordinatorServiceClient withRetryPolicy(ServiceRetryPolicy 
retryPolicy)
+    {
+      return this;
+    }
+  }
+
+  @JsonTypeName("test")
+  private static class TestLoadSpec implements LoadSpec
+  {
+    private final int uniqueId;
+
+    @JsonCreator
+    public TestLoadSpec(@JsonProperty("uniqueId") int uniqueId)
+    {
+      this.uniqueId = uniqueId;
+    }
+
+    @JsonProperty
+    public int getUniqueId()
+    {
+      return uniqueId;
+    }
+
+    @Override
+    public LoadSpecResult loadSegment(File destDir) throws 
SegmentLoadingException
+    {
+      try {
+        Files.write(new File(destDir, LOAD_SPEC_FILE_NAME).toPath(), 
Ints.toByteArray(uniqueId));
+        Files.write(new File(destDir, "version.bin").toPath(), 
Ints.toByteArray(1));
+        return new LoadSpecResult(1);
+      }
+      catch (IOException e) {
+        throw new SegmentLoadingException(e, "Failed to load segment in 
location [%s]", destDir);
+      }
+    }
+  }
+
+  private static class TestQueryableIndex implements QueryableIndex
+  {
+    @Override
+    public Interval getDataInterval()
+    {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public int getNumRows()
+    {
+      return 0;
+    }
+
+    @Override
+    public Indexed<String> getAvailableDimensions()
+    {
+      return new ListIndexed<>();
+    }
+
+    @Override
+    public BitmapFactory getBitmapFactoryForDimensions()
+    {
+      return RoaringBitmapFactory.INSTANCE;
+    }
+
+    @Nullable
+    @Override
+    public Metadata getMetadata()
+    {
+      return null;
+    }
+
+    @Override
+    public Map<String, DimensionHandler> getDimensionHandlers()
+    {
+      return Collections.emptyMap();
+    }
+
+    @Override
+    public List<String> getColumnNames()
+    {
+      return Collections.emptyList();
+    }
+
+    @Nullable
+    @Override
+    public ColumnHolder getColumnHolder(String columnName)
+    {
+      return null;
+    }
+
+    @Override
+    public void close()
+    {
+
+    }
+  }
+}
diff --git 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessorTest.java
 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessorTest.java
index c6c76ae2e9..7fde0083bc 100644
--- 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessorTest.java
+++ 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessorTest.java
@@ -22,6 +22,7 @@ package org.apache.druid.msq.querykit.scan;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.MoreExecutors;
 import it.unimi.dsi.fastutil.ints.Int2ObjectMaps;
+import org.apache.druid.collections.ReferenceCountingResourceHolder;
 import org.apache.druid.collections.ResourceHolder;
 import org.apache.druid.frame.Frame;
 import org.apache.druid.frame.FrameType;
@@ -38,13 +39,11 @@ import org.apache.druid.frame.write.FrameWriterFactory;
 import org.apache.druid.frame.write.FrameWriters;
 import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.java.util.common.Intervals;
-import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.common.guava.Sequence;
 import org.apache.druid.msq.input.ReadableInput;
 import org.apache.druid.msq.kernel.StageId;
 import org.apache.druid.msq.kernel.StagePartition;
-import org.apache.druid.msq.querykit.LazyResourceHolder;
 import org.apache.druid.msq.test.LimitedFrameWriterFactory;
 import org.apache.druid.query.Druids;
 import org.apache.druid.query.scan.ScanQuery;
@@ -150,7 +149,7 @@ public class ScanQueryFrameProcessorTest extends 
InitializedNullHandlingTest
             }
           }
         },
-        new LazyResourceHolder<>(() -> Pair.of(frameWriterFactory, () -> {})),
+        new ReferenceCountingResourceHolder<>(frameWriterFactory, () -> {}),
         null,
         0L,
         new DefaultObjectMapper()
diff --git 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteMSQTestsHelper.java
 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteMSQTestsHelper.java
index f08072d100..b11af677b5 100644
--- 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteMSQTestsHelper.java
+++ 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteMSQTestsHelper.java
@@ -25,6 +25,8 @@ import com.google.common.collect.ImmutableSet;
 import com.google.inject.Injector;
 import com.google.inject.Module;
 import com.google.inject.TypeLiteral;
+import org.apache.druid.collections.ReferenceCountingResourceHolder;
+import org.apache.druid.collections.ResourceHolder;
 import org.apache.druid.data.input.impl.DimensionsSpec;
 import org.apache.druid.data.input.impl.LongDimensionSchema;
 import org.apache.druid.data.input.impl.StringDimensionSchema;
@@ -35,14 +37,12 @@ import org.apache.druid.guice.JoinableFactoryModule;
 import org.apache.druid.guice.annotations.Self;
 import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
 import org.apache.druid.java.util.common.ISE;
-import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.common.io.Closer;
 import org.apache.druid.math.expr.ExprMacroTable;
 import org.apache.druid.msq.guice.MSQExternalDataSourceModule;
 import org.apache.druid.msq.guice.MSQIndexingModule;
 import org.apache.druid.msq.querykit.DataSegmentProvider;
-import org.apache.druid.msq.querykit.LazyResourceHolder;
 import org.apache.druid.query.DruidProcessingConfig;
 import org.apache.druid.query.ForwardingQueryProcessingPool;
 import org.apache.druid.query.QueryProcessingPool;
@@ -80,7 +80,6 @@ import org.joda.time.Interval;
 import org.junit.rules.TemporaryFolder;
 
 import javax.annotation.Nullable;
-import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
 import java.util.Collections;
@@ -168,8 +167,7 @@ public class CalciteMSQTestsHelper
           ));
           binder.bind(DataSegmentAnnouncer.class).toInstance(new 
NoopDataSegmentAnnouncer());
           binder.bind(DataSegmentProvider.class)
-                .toInstance((dataSegment, channelCounters) ->
-                                new 
LazyResourceHolder<>(getSupplierForSegment(dataSegment)));
+                .toInstance((dataSegment, channelCounters) -> 
getSupplierForSegment(dataSegment));
 
           GroupByQueryConfig groupByQueryConfig = new GroupByQueryConfig();
           binder.bind(GroupByStrategySelector.class)
@@ -185,7 +183,7 @@ public class CalciteMSQTestsHelper
     );
   }
 
-  private static Supplier<Pair<Segment, Closeable>> 
getSupplierForSegment(SegmentId segmentId)
+  private static Supplier<ResourceHolder<Segment>> 
getSupplierForSegment(SegmentId segmentId)
   {
     final TemporaryFolder temporaryFolder = new TemporaryFolder();
     try {
@@ -291,13 +289,6 @@ public class CalciteMSQTestsHelper
       {
       }
     };
-    return new Supplier<Pair<Segment, Closeable>>()
-    {
-      @Override
-      public Pair<Segment, Closeable> get()
-      {
-        return new Pair<>(segment, Closer.create());
-      }
-    };
+    return () -> new ReferenceCountingResourceHolder<>(segment, 
Closer.create());
   }
 }
diff --git 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
index cc97d4bd82..2e0e09ff19 100644
--- 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
+++ 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
@@ -36,6 +36,8 @@ import com.google.inject.Module;
 import com.google.inject.TypeLiteral;
 import com.google.inject.util.Modules;
 import com.google.inject.util.Providers;
+import org.apache.druid.collections.ReferenceCountingResourceHolder;
+import org.apache.druid.collections.ResourceHolder;
 import org.apache.druid.common.config.NullHandling;
 import org.apache.druid.data.input.impl.DimensionsSpec;
 import org.apache.druid.data.input.impl.LongDimensionSchema;
@@ -66,7 +68,6 @@ import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.java.util.common.granularity.Granularity;
 import org.apache.druid.java.util.common.guava.Yielder;
-import org.apache.druid.java.util.common.io.Closer;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.math.expr.ExprMacroTable;
 import org.apache.druid.metadata.input.InputSourceModule;
@@ -95,7 +96,6 @@ import org.apache.druid.msq.indexing.report.MSQResultsReport;
 import org.apache.druid.msq.indexing.report.MSQTaskReport;
 import org.apache.druid.msq.indexing.report.MSQTaskReportPayload;
 import org.apache.druid.msq.querykit.DataSegmentProvider;
-import org.apache.druid.msq.querykit.LazyResourceHolder;
 import org.apache.druid.msq.sql.MSQTaskQueryMaker;
 import org.apache.druid.msq.sql.MSQTaskSqlEngine;
 import org.apache.druid.msq.util.MultiStageQueryContext;
@@ -180,7 +180,6 @@ import org.mockito.Mockito;
 
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
-import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
@@ -387,8 +386,7 @@ public class MSQTestBase extends BaseCalciteQueryTest
           binder.bind(QueryProcessingPool.class)
                 .toInstance(new 
ForwardingQueryProcessingPool(Execs.singleThreaded("Test-runner-processing-pool")));
           binder.bind(DataSegmentProvider.class)
-                .toInstance((dataSegment, channelCounters) ->
-                                new 
LazyResourceHolder<>(getSupplierForSegment(dataSegment)));
+                .toInstance((dataSegment, channelCounters) -> 
getSupplierForSegment(dataSegment));
           binder.bind(IndexIO.class).toInstance(indexIO);
           
binder.bind(SpecificSegmentsQuerySegmentWalker.class).toInstance(qf.walker());
 
@@ -551,7 +549,7 @@ public class MSQTestBase extends BaseCalciteQueryTest
   }
 
   @Nonnull
-  private Supplier<Pair<Segment, Closeable>> getSupplierForSegment(SegmentId 
segmentId)
+  private Supplier<ResourceHolder<Segment>> getSupplierForSegment(SegmentId 
segmentId)
   {
     if (segmentManager.getSegment(segmentId) == null) {
       final QueryableIndex index;
@@ -651,7 +649,7 @@ public class MSQTestBase extends BaseCalciteQueryTest
       };
       segmentManager.addSegment(segment);
     }
-    return () -> new Pair<>(segmentManager.getSegment(segmentId), 
Closer.create());
+    return () -> 
ReferenceCountingResourceHolder.fromCloseable(segmentManager.getSegment(segmentId));
   }
 
   public SelectTester testSelectQuery()
diff --git 
a/processing/src/main/java/org/apache/druid/collections/ResourceHolder.java 
b/processing/src/main/java/org/apache/druid/collections/CloseableResourceHolder.java
similarity index 51%
copy from 
processing/src/main/java/org/apache/druid/collections/ResourceHolder.java
copy to 
processing/src/main/java/org/apache/druid/collections/CloseableResourceHolder.java
index 5f35bc9c44..e2c8234d38 100644
--- a/processing/src/main/java/org/apache/druid/collections/ResourceHolder.java
+++ 
b/processing/src/main/java/org/apache/druid/collections/CloseableResourceHolder.java
@@ -19,14 +19,39 @@
 
 package org.apache.druid.collections;
 
+import com.google.common.base.Preconditions;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.utils.CloseableUtils;
+
 import java.io.Closeable;
+import java.util.concurrent.atomic.AtomicReference;
 
-/**
- */
-public interface ResourceHolder<T> extends Closeable
+public class CloseableResourceHolder<T extends Closeable> implements 
ResourceHolder<T>
 {
-  T get();
+  private final AtomicReference<T> resource;
+
+  /**
+   * Use {@link ResourceHolder#fromCloseable}.
+   */
+  CloseableResourceHolder(T resource)
+  {
+    this.resource = new AtomicReference<>(Preconditions.checkNotNull(resource, 
"resource"));
+  }
+
+  @Override
+  public T get()
+  {
+    final T retVal = resource.get();
+    if (retVal == null) {
+      throw new ISE("Already closed");
+    }
+    return retVal;
+  }
 
   @Override
-  void close();
+  public void close()
+  {
+    final T oldResource = resource.getAndSet(null);
+    CloseableUtils.closeAndWrapExceptions(oldResource);
+  }
 }
diff --git 
a/processing/src/main/java/org/apache/druid/collections/ReferenceCountingResourceHolder.java
 
b/processing/src/main/java/org/apache/druid/collections/ReferenceCountingResourceHolder.java
index 808100dfb2..6d731923c5 100644
--- 
a/processing/src/main/java/org/apache/druid/collections/ReferenceCountingResourceHolder.java
+++ 
b/processing/src/main/java/org/apache/druid/collections/ReferenceCountingResourceHolder.java
@@ -77,13 +77,14 @@ public class ReferenceCountingResourceHolder<T> implements 
ResourceHolder<T>
   }
 
   /**
-   * Increments the reference count by 1 and returns a {@link Releaser}. The 
returned {@link Releaser} is used to
-   * decrement the reference count when the caller no longer needs the 
resource.
+   * Increments the reference count by 1 and returns a {@link ResourceHolder} 
representing the new reference.
+   * The "close" method of the returned {@link ResourceHolder} decrements the 
reference count when the caller no longer
+   * needs the resource.
    *
-   * {@link Releaser}s are not thread-safe. If multiple threads need 
references to the same holder, they should
-   * each acquire their own {@link Releaser}.
+   * Returned {@link ResourceHolder}s are not thread-safe. If multiple threads 
need references to the same resource, they
+   * should each call this method on the original object.
    */
-  public Releaser increment()
+  public ResourceHolder<T> increment()
   {
     while (true) {
       int count = this.refCount.get();
@@ -95,11 +96,17 @@ public class ReferenceCountingResourceHolder<T> implements 
ResourceHolder<T>
       }
     }
 
-    // This Releaser is supposed to be used from a single thread, so no 
synchronization/atomicity
-    return new Releaser()
+    // This ResourceHolder is supposed to be used from a single thread, so no 
synchronization/atomicity is needed.
+    return new ResourceHolder<T>()
     {
       boolean released = false;
 
+      @Override
+      public T get()
+      {
+        return object;
+      }
+
       @Override
       public void close()
       {
diff --git 
a/processing/src/main/java/org/apache/druid/collections/ResourceHolder.java 
b/processing/src/main/java/org/apache/druid/collections/ResourceHolder.java
index 5f35bc9c44..0eab667aeb 100644
--- a/processing/src/main/java/org/apache/druid/collections/ResourceHolder.java
+++ b/processing/src/main/java/org/apache/druid/collections/ResourceHolder.java
@@ -22,6 +22,7 @@ package org.apache.druid.collections;
 import java.io.Closeable;
 
 /**
+ *
  */
 public interface ResourceHolder<T> extends Closeable
 {
@@ -29,4 +30,9 @@ public interface ResourceHolder<T> extends Closeable
 
   @Override
   void close();
+
+  static <T extends Closeable> ResourceHolder<T> fromCloseable(final T 
resource)
+  {
+    return new CloseableResourceHolder<>(resource);
+  }
 }
diff --git 
a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java
 
b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java
index 6718dff9f8..1ad44bc84e 100644
--- 
a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java
+++ 
b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java
@@ -32,7 +32,6 @@ import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import org.apache.druid.collections.BlockingPool;
 import org.apache.druid.collections.ReferenceCountingResourceHolder;
-import org.apache.druid.collections.Releaser;
 import org.apache.druid.common.guava.GuavaUtils;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.Pair;
@@ -59,6 +58,7 @@ import org.apache.druid.query.groupby.GroupByQueryConfig;
 import org.apache.druid.query.groupby.ResultRow;
 import 
org.apache.druid.query.groupby.epinephelinae.RowBasedGrouperHelper.RowBasedKey;
 
+import java.io.Closeable;
 import java.io.File;
 import java.nio.ByteBuffer;
 import java.util.List;
@@ -243,9 +243,9 @@ public class GroupByMergingQueryRunnerV2 implements 
QueryRunner<ResultRow>
                                       try (
                                           // These variables are used to close 
releasers automatically.
                                           @SuppressWarnings("unused")
-                                          Releaser bufferReleaser = 
mergeBufferHolder.increment();
+                                          Closeable bufferReleaser = 
mergeBufferHolder.increment();
                                           @SuppressWarnings("unused")
-                                          Releaser grouperReleaser = 
grouperHolder.increment()
+                                          Closeable grouperReleaser = 
grouperHolder.increment()
                                       ) {
                                         // Return true if OK, false if 
resources were exhausted.
                                         return input.run(queryPlusForRunners, 
responseContext)
diff --git 
a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/ParallelCombiner.java
 
b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/ParallelCombiner.java
index 5d244b579a..d00f6f4785 100644
--- 
a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/ParallelCombiner.java
+++ 
b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/ParallelCombiner.java
@@ -28,7 +28,6 @@ import 
com.google.common.util.concurrent.ListeningExecutorService;
 import it.unimi.dsi.fastutil.objects.Object2IntArrayMap;
 import it.unimi.dsi.fastutil.objects.Object2IntMap;
 import org.apache.druid.collections.ReferenceCountingResourceHolder;
-import org.apache.druid.collections.Releaser;
 import org.apache.druid.java.util.common.CloseableIterators;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.Pair;
@@ -48,7 +47,7 @@ import org.apache.druid.segment.ObjectColumnSelector;
 import org.apache.druid.segment.column.ColumnCapabilities;
 
 import javax.annotation.Nullable;
-
+import java.io.Closeable;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -409,7 +408,7 @@ public class ParallelCombiner<KeyType>
                 );
                 // This variable is used to close releaser automatically.
                 @SuppressWarnings("unused")
-                final Releaser releaser = combineBufferHolder.increment()
+                final Closeable releaser = combineBufferHolder.increment()
             ) {
               while (mergedIterator.hasNext()) {
                 final Entry<KeyType> next = mergedIterator.next();
diff --git 
a/processing/src/main/java/org/apache/druid/collections/Releaser.java 
b/processing/src/test/java/org/apache/druid/collections/CloseableResourceHolderTest.java
similarity index 57%
rename from processing/src/main/java/org/apache/druid/collections/Releaser.java
rename to 
processing/src/test/java/org/apache/druid/collections/CloseableResourceHolderTest.java
index f9f2d77ec3..783e24889c 100644
--- a/processing/src/main/java/org/apache/druid/collections/Releaser.java
+++ 
b/processing/src/test/java/org/apache/druid/collections/CloseableResourceHolderTest.java
@@ -19,13 +19,29 @@
 
 package org.apache.druid.collections;
 
+import org.junit.Assert;
+import org.junit.Test;
+
 import java.io.Closeable;
+import java.util.concurrent.atomic.AtomicLong;
 
-/**
- * Releaser is like Closeable, but doesn't throw IOExceptions.
- */
-public interface Releaser extends Closeable
+public class CloseableResourceHolderTest
 {
-  @Override
-  void close();
+  @Test
+  public void testCloseableResourceHolder()
+  {
+    final AtomicLong closeCounter = new AtomicLong();
+    final Closeable closeable = closeCounter::incrementAndGet;
+    final ResourceHolder<Closeable> holder = 
ResourceHolder.fromCloseable(closeable);
+
+    Assert.assertSame(closeable, holder.get());
+
+    holder.close();
+    Assert.assertEquals(1, closeCounter.get());
+
+    holder.close();
+    Assert.assertEquals(1, closeCounter.get());
+
+    Assert.assertThrows(IllegalStateException.class, holder::get);
+  }
 }
diff --git 
a/processing/src/test/java/org/apache/druid/collections/ReferenceCountingResourceHolderTest.java
 
b/processing/src/test/java/org/apache/druid/collections/ReferenceCountingResourceHolderTest.java
index 29df2c3aba..9c9983450e 100644
--- 
a/processing/src/test/java/org/apache/druid/collections/ReferenceCountingResourceHolderTest.java
+++ 
b/processing/src/test/java/org/apache/druid/collections/ReferenceCountingResourceHolderTest.java
@@ -45,7 +45,7 @@ public class ReferenceCountingResourceHolderTest
     List<Thread> threads = new ArrayList<>();
     for (int i = 0; i < 100; i++) {
       Thread thread = new Thread(() -> {
-        try (Releaser r = resourceHolder.increment()) {
+        try (ResourceHolder<Closeable> r = resourceHolder.increment()) {
           try {
             Thread.sleep(1);
           }
diff --git 
a/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java
 
b/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java
index c6c216594e..5f7e71501c 100644
--- 
a/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java
+++ 
b/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java
@@ -519,13 +519,13 @@ public class SegmentLocalCacheManager implements 
SegmentCacheManager
       catch (Exception e) {
         log.error(e, "Unable to remove directory[%s]", cacheFile);
       }
-    }
 
-    File parent = cacheFile.getParentFile();
-    if (parent != null) {
-      File[] children = parent.listFiles();
-      if (children == null || children.length == 0) {
-        cleanupCacheFiles(baseFile, parent);
+      File parent = cacheFile.getParentFile();
+      if (parent != null) {
+        File[] children = parent.listFiles();
+        if (children == null || children.length == 0) {
+          cleanupCacheFiles(baseFile, parent);
+        }
       }
     }
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to