This is an automated email from the ASF dual-hosted git repository.

gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new fb7fdb4d112 SegmentCacheManager: acquireCachedSegment with SegmentId. (#19044)
fb7fdb4d112 is described below

commit fb7fdb4d112b9abd714d5eea5843e3126c887427
Author: Gian Merlino <[email protected]>
AuthorDate: Mon Feb 23 13:40:16 2026 -0800

    SegmentCacheManager: acquireCachedSegment with SegmentId. (#19044)
    
    Prior to this patch, acquireCachedSegment required the entire DataSegment
    object. This patch loosens the requirement to just the SegmentId.
    
    This is useful under MSQ, where caching is fully on-demand (no load rules)
    and the full DataSegment object is not necessarily available in the local
    timeline, even if a segment is cached locally. See the change in
    RegularLoadableSegment.
---
 .../indexing/common/task/CompactionTaskTest.java   | 14 ++++++---
 .../AbstractMultiPhaseParallelIndexingTest.java    |  2 +-
 .../druid/msq/input/RegularLoadableSegment.java    | 12 +++-----
 .../druid/msq/exec/MSQCompactionTaskRunTest.java   |  9 +-----
 .../druid/segment/loading/SegmentCacheManager.java |  9 +++---
 .../segment/loading/SegmentLocalCacheManager.java  |  6 ++--
 .../org/apache/druid/server/SegmentManager.java    | 14 +++++++--
 .../segment/loading/NoopSegmentCacheManager.java   |  3 +-
 .../SegmentLocalCacheManagerConcurrencyTest.java   |  2 +-
 .../loading/SegmentLocalCacheManagerTest.java      | 12 ++++----
 .../coordination/SegmentLoadDropHandlerTest.java   |  4 +--
 .../druid/test/utils/TestSegmentCacheManager.java  | 36 ++++++++++++++--------
 12 files changed, 70 insertions(+), 53 deletions(-)

diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
index aeb5006a249..2b20fcfa825 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
@@ -145,6 +145,7 @@ import org.apache.druid.server.security.AuthTestUtils;
 import org.apache.druid.server.security.AuthorizerMapper;
 import org.apache.druid.server.security.ResourceAction;
 import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.SegmentId;
 import org.apache.druid.timeline.partition.NumberedShardSpec;
 import org.hamcrest.CoreMatchers;
 import org.joda.time.Interval;
@@ -1972,11 +1973,16 @@ public class CompactionTaskTest
       }
 
       @Override
-      public Optional<Segment> acquireCachedSegment(DataSegment dataSegment)
+      public Optional<Segment> acquireCachedSegment(SegmentId segmentId)
       {
-        return Optional.of(
-            new 
QueryableIndexSegment(indexIO.loadIndex(segments.get(dataSegment)), 
dataSegment.getId())
-        );
+        for (Map.Entry<DataSegment, File> entry : segments.entrySet()) {
+          if (entry.getKey().getId().equals(segmentId)) {
+            return Optional.of(
+                new QueryableIndexSegment(indexIO.loadIndex(entry.getValue()), 
segmentId)
+            );
+          }
+        }
+        return Optional.empty();
       }
 
       @Override
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractMultiPhaseParallelIndexingTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractMultiPhaseParallelIndexingTest.java
index a6ab29d2334..e02dd9324ab 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractMultiPhaseParallelIndexingTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractMultiPhaseParallelIndexingTest.java
@@ -299,7 +299,7 @@ abstract class AbstractMultiPhaseParallelIndexingTest 
extends AbstractParallelIn
         .manufacturate(tempSegmentDir, false);
     try {
       cacheManager.load(dataSegment);
-      return cacheManager.acquireCachedSegment(dataSegment).orElseThrow();
+      return 
cacheManager.acquireCachedSegment(dataSegment.getId()).orElseThrow();
     }
     catch (SegmentLoadingException e) {
       throw new RuntimeException(e);
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/input/RegularLoadableSegment.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/input/RegularLoadableSegment.java
index a73bcb12032..33399e354a1 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/input/RegularLoadableSegment.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/input/RegularLoadableSegment.java
@@ -140,15 +140,11 @@ public class RegularLoadableSegment implements 
LoadableSegment
       throw DruidException.defensive("Segment with descriptor[%s] is already 
acquired", descriptor);
     }
 
-    if (cachedDataSegment != null) {
-      final Optional<Segment> cachedSegment = 
segmentManager.acquireCachedSegment(cachedDataSegment);
-      if (cachedSegment.isPresent()) {
-        acquired = true;
-      }
-      return cachedSegment;
+    final Optional<Segment> cachedSegment = 
segmentManager.acquireCachedSegment(segmentId);
+    if (cachedSegment.isPresent()) {
+      acquired = true;
     }
-
-    return Optional.empty();
+    return cachedSegment;
   }
 
   @Override
diff --git 
a/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQCompactionTaskRunTest.java
 
b/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQCompactionTaskRunTest.java
index eeb24bb62f1..0193dd46331 100644
--- 
a/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQCompactionTaskRunTest.java
+++ 
b/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQCompactionTaskRunTest.java
@@ -212,14 +212,7 @@ public class MSQCompactionTaskRunTest extends 
CompactionTaskRunBase
           null
       );
     });
-    
when(segmentCacheManager.acquireCachedSegment(any())).thenAnswer(invocation -> {
-      DataSegment segment = invocation.getArgument(0);
-      QueryableIndexSegment index = new QueryableIndexSegment(
-          new TestUtils().getTestIndexIO().loadIndex(new File((String) 
segment.getLoadSpec().get("path"))),
-          segment.getId()
-      );
-      return Optional.of(index);
-    });
+    
when(segmentCacheManager.acquireCachedSegment(any())).thenReturn(Optional.empty());
     GroupingEngine groupingEngine = 
GroupByQueryRunnerTest.makeQueryRunnerFactory(
         new GroupByQueryConfig(),
         TestGroupByBuffers.createDefault()
diff --git 
a/server/src/main/java/org/apache/druid/segment/loading/SegmentCacheManager.java
 
b/server/src/main/java/org/apache/druid/segment/loading/SegmentCacheManager.java
index 065c3d23922..922f0f17b0a 100644
--- 
a/server/src/main/java/org/apache/druid/segment/loading/SegmentCacheManager.java
+++ 
b/server/src/main/java/org/apache/druid/segment/loading/SegmentCacheManager.java
@@ -25,6 +25,7 @@ import org.apache.druid.segment.Segment;
 import org.apache.druid.segment.SegmentLazyLoadFailCallback;
 import org.apache.druid.segment.SegmentMapFunction;
 import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.SegmentId;
 
 import javax.annotation.Nullable;
 import java.io.File;
@@ -70,7 +71,7 @@ public interface SegmentCacheManager
   /**
    * Given a {@link DataSegment}, which contains the instructions for where 
and how to fetch a {@link Segment} from
    * deep storage, this method tries to load and subsequently serve it to 
callers via
-   * {@link #acquireCachedSegment(DataSegment)} or {@link 
#acquireSegment(DataSegment)}. If the segment
+   * {@link #acquireCachedSegment(SegmentId)} or {@link 
#acquireSegment(DataSegment)}. If the segment
    * cannot be loaded either due to error or insufficient storage space, this 
method throws a
    * {@link SegmentLoadingException}.
    *
@@ -108,7 +109,7 @@ public interface SegmentCacheManager
    * to be dropped until it has been closed. As such, the returned {@link 
Segment} must be closed when the caller is
    * finished doing segment things.
    */
-  Optional<Segment> acquireCachedSegment(DataSegment dataSegment);
+  Optional<Segment> acquireCachedSegment(SegmentId segmentId);
 
   /**
    * Returns a {@link AcquireSegmentAction} for a given {@link DataSegment} 
and {@link SegmentDescriptor}, which returns
@@ -121,8 +122,8 @@ public interface SegmentCacheManager
   AcquireSegmentAction acquireSegment(DataSegment dataSegment);
 
   /**
-   * Alternative to {@link #acquireCachedSegment(DataSegment)}, to return the 
{@link File} location of the segment files
-   * stored in the cache, instead of a {@link Optional<Segment>}. Unlike 
{@link #acquireCachedSegment(DataSegment)} and
+   * Alternative to {@link #acquireCachedSegment(SegmentId)}, to return the 
{@link File} location of the segment files
+   * stored in the cache, instead of a {@link Optional<Segment>}. Unlike 
{@link #acquireCachedSegment(SegmentId)} and
    * {@link #acquireSegment(DataSegment)}, this method does not provide any 
protections for callers,
    * and should only be used by callers that are in control of when {@link 
#drop(DataSegment)} is called. This method
    * will not download the segment files from deep storage if they do not 
already exist in the cache, callers should use
diff --git 
a/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java
 
b/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java
index 650f799090a..9b5496fbadb 100644
--- 
a/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java
+++ 
b/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java
@@ -400,9 +400,9 @@ public class SegmentLocalCacheManager implements 
SegmentCacheManager
   }
 
   @Override
-  public Optional<Segment> acquireCachedSegment(final DataSegment dataSegment)
+  public Optional<Segment> acquireCachedSegment(final SegmentId segmentId)
   {
-    final SegmentCacheEntryIdentifier cacheEntryIdentifier = new 
SegmentCacheEntryIdentifier(dataSegment.getId());
+    final SegmentCacheEntryIdentifier cacheEntryIdentifier = new 
SegmentCacheEntryIdentifier(segmentId);
     for (StorageLocation location : locations) {
       final SegmentCacheEntry cacheEntry = 
location.getStaticCacheEntry(cacheEntryIdentifier);
       if (cacheEntry != null) {
@@ -729,7 +729,7 @@ public class SegmentLocalCacheManager implements 
SegmentCacheManager
 
   /**
    * Testing use only please, any callers that want to do stuff with segments 
should use
-   * {@link #acquireCachedSegment(DataSegment)} or {@link 
#acquireSegment(DataSegment)} instead. Does not hold locks
+   * {@link #acquireCachedSegment(SegmentId)} or {@link 
#acquireSegment(DataSegment)} instead. Does not hold locks
    * and so is not really safe to use while the cache manager is active
    */
   @VisibleForTesting
diff --git a/server/src/main/java/org/apache/druid/server/SegmentManager.java 
b/server/src/main/java/org/apache/druid/server/SegmentManager.java
index af95eafaad6..58c9254b246 100644
--- a/server/src/main/java/org/apache/druid/server/SegmentManager.java
+++ b/server/src/main/java/org/apache/druid/server/SegmentManager.java
@@ -182,9 +182,17 @@ public class SegmentManager
    * download a {@link DataSegment} if it is not already present in {@link 
#cacheManager}, use
    * {@link #acquireSegment(DataSegment)} instead.
    */
+  public Optional<Segment> acquireCachedSegment(SegmentId segmentId)
+  {
+    return cacheManager.acquireCachedSegment(segmentId);
+  }
+
+  /**
+   * Convenience overload of {@link #acquireCachedSegment(SegmentId)} that 
accepts a {@link DataSegment}.
+   */
   public Optional<Segment> acquireCachedSegment(DataSegment dataSegment)
   {
-    return cacheManager.acquireCachedSegment(dataSegment);
+    return acquireCachedSegment(dataSegment.getId());
   }
 
   /**
@@ -311,7 +319,7 @@ public class SegmentManager
             );
 
             long numOfRows = 0;
-            final Optional<Segment> loadedSegment = 
cacheManager.acquireCachedSegment(dataSegment);
+            final Optional<Segment> loadedSegment = 
cacheManager.acquireCachedSegment(dataSegment.getId());
             if (loadedSegment.isPresent()) {
               final Segment segment = loadedSegment.get();
               final IndexedTable table = segment.as(IndexedTable.class);
@@ -378,7 +386,7 @@ public class SegmentManager
 
             if (oldSegmentRef != null) {
               try (final Closer closer = Closer.create()) {
-                final Optional<Segment> oldSegment = 
cacheManager.acquireCachedSegment(oldSegmentRef);
+                final Optional<Segment> oldSegment = 
cacheManager.acquireCachedSegment(oldSegmentRef.getId());
                 long numberOfRows = oldSegment.map(segment -> {
                   closer.register(segment);
                   final PhysicalSegmentInspector countInspector = 
segment.as(PhysicalSegmentInspector.class);
diff --git 
a/server/src/test/java/org/apache/druid/segment/loading/NoopSegmentCacheManager.java
 
b/server/src/test/java/org/apache/druid/segment/loading/NoopSegmentCacheManager.java
index 6121dcaf656..bec367ddc9a 100644
--- 
a/server/src/test/java/org/apache/druid/segment/loading/NoopSegmentCacheManager.java
+++ 
b/server/src/test/java/org/apache/druid/segment/loading/NoopSegmentCacheManager.java
@@ -22,6 +22,7 @@ package org.apache.druid.segment.loading;
 import org.apache.druid.segment.Segment;
 import org.apache.druid.segment.SegmentLazyLoadFailCallback;
 import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.SegmentId;
 
 import javax.annotation.Nullable;
 import java.io.File;
@@ -95,7 +96,7 @@ public class NoopSegmentCacheManager implements 
SegmentCacheManager
   }
 
   @Override
-  public Optional<Segment> acquireCachedSegment(DataSegment dataSegment)
+  public Optional<Segment> acquireCachedSegment(SegmentId segmentId)
   {
     throw new UnsupportedOperationException();
   }
diff --git 
a/server/src/test/java/org/apache/druid/segment/loading/SegmentLocalCacheManagerConcurrencyTest.java
 
b/server/src/test/java/org/apache/druid/segment/loading/SegmentLocalCacheManagerConcurrencyTest.java
index 1ba6969e934..89f87403c3a 100644
--- 
a/server/src/test/java/org/apache/druid/segment/loading/SegmentLocalCacheManagerConcurrencyTest.java
+++ 
b/server/src/test/java/org/apache/druid/segment/loading/SegmentLocalCacheManagerConcurrencyTest.java
@@ -993,7 +993,7 @@ class SegmentLocalCacheManagerConcurrencyTest
         if (maxDelayBefore > 0) {
           Thread.sleep(ThreadLocalRandom.current().nextInt(maxDelayBefore));
         }
-        final Optional<Segment> segmentReference = 
segmentManager.acquireCachedSegment(segment).map(closer::register);
+        final Optional<Segment> segmentReference = 
segmentManager.acquireCachedSegment(segment.getId()).map(closer::register);
         if (segmentReference.isPresent()) {
           PhysicalSegmentInspector gadget = 
segmentReference.get().as(PhysicalSegmentInspector.class);
           if (maxDelayAfter > 0) {
diff --git 
a/server/src/test/java/org/apache/druid/segment/loading/SegmentLocalCacheManagerTest.java
 
b/server/src/test/java/org/apache/druid/segment/loading/SegmentLocalCacheManagerTest.java
index f6b27472361..55310348d21 100644
--- 
a/server/src/test/java/org/apache/druid/segment/loading/SegmentLocalCacheManagerTest.java
+++ 
b/server/src/test/java/org/apache/druid/segment/loading/SegmentLocalCacheManagerTest.java
@@ -832,7 +832,7 @@ public class SegmentLocalCacheManagerTest extends 
InitializedNullHandlingTest
     final DataSegment dataSegment = TestSegmentUtils.makeSegment("foo", "v1", 
Intervals.of("2020/2021"));
 
     manager.bootstrap(dataSegment, SegmentLazyLoadFailCallback.NOOP);
-    Segment actualBootstrapSegment = 
manager.acquireCachedSegment(dataSegment).orElse(null);
+    Segment actualBootstrapSegment = 
manager.acquireCachedSegment(dataSegment.getId()).orElse(null);
     Assert.assertNotNull(actualBootstrapSegment);
     Assert.assertEquals(dataSegment.getId(), actualBootstrapSegment.getId());
     Assert.assertEquals(dataSegment.getInterval(), 
actualBootstrapSegment.getDataInterval());
@@ -869,7 +869,7 @@ public class SegmentLocalCacheManagerTest extends 
InitializedNullHandlingTest
     final DataSegment dataSegment = TestSegmentUtils.makeSegment("foo", "v1", 
Intervals.of("2020/2021"));
 
     manager.bootstrap(dataSegment, () -> {});
-    Segment actualBootstrapSegment = 
manager.acquireCachedSegment(dataSegment).orElse(null);
+    Segment actualBootstrapSegment = 
manager.acquireCachedSegment(dataSegment.getId()).orElse(null);
     Assert.assertNotNull(actualBootstrapSegment);
     Assert.assertEquals(dataSegment.getId(), actualBootstrapSegment.getId());
     Assert.assertEquals(dataSegment.getInterval(), 
actualBootstrapSegment.getDataInterval());
@@ -918,7 +918,7 @@ public class SegmentLocalCacheManagerTest extends 
InitializedNullHandlingTest
 
     manager.load(segmentToLoad);
     Assert.assertNull(manager.getSegmentFiles(segmentToLoad));
-    
Assert.assertFalse(manager.acquireCachedSegment(segmentToLoad).isPresent());
+    
Assert.assertFalse(manager.acquireCachedSegment(segmentToLoad.getId()).isPresent());
     AcquireSegmentAction segmentAction = manager.acquireSegment(segmentToLoad);
     AcquireSegmentResult result = segmentAction.getSegmentFuture().get();
     Optional<Segment> theSegment = 
result.getReferenceProvider().acquireReference();
@@ -988,7 +988,7 @@ public class SegmentLocalCacheManagerTest extends 
InitializedNullHandlingTest
 
     manager.bootstrap(segmentToBootstrap, SegmentLazyLoadFailCallback.NOOP);
     Assert.assertNull(manager.getSegmentFiles(segmentToBootstrap));
-    
Assert.assertFalse(manager.acquireCachedSegment(segmentToBootstrap).isPresent());
+    
Assert.assertFalse(manager.acquireCachedSegment(segmentToBootstrap.getId()).isPresent());
     AcquireSegmentAction segmentAction = 
manager.acquireSegment(segmentToBootstrap);
     AcquireSegmentResult result = segmentAction.getSegmentFuture().get();
     Optional<Segment> theSegment = 
result.getReferenceProvider().acquireReference();
@@ -1138,7 +1138,7 @@ public class SegmentLocalCacheManagerTest extends 
InitializedNullHandlingTest
 
     manager.load(segmentToLoad);
     Assert.assertNull(manager.getSegmentFiles(segmentToLoad));
-    
Assert.assertFalse(manager.acquireCachedSegment(segmentToLoad).isPresent());
+    
Assert.assertFalse(manager.acquireCachedSegment(segmentToLoad.getId()).isPresent());
     AcquireSegmentAction segmentAction = manager.acquireSegment(segmentToLoad);
 
     // now drop it before we actually load it, but dropping a weakly held 
reference does not remove the entry from the
@@ -1306,7 +1306,7 @@ public class SegmentLocalCacheManagerTest extends 
InitializedNullHandlingTest
                                              .build();
 
     manager.load(tombstone);
-    Segment segment = manager.acquireCachedSegment(tombstone).orElse(null);
+    Segment segment = 
manager.acquireCachedSegment(tombstone.getId()).orElse(null);
 
     Assert.assertEquals(tombstone.getId(), segment.getId());
     Assert.assertEquals(interval, segment.getDataInterval());
diff --git 
a/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java
 
b/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java
index 77d58837315..f66f4c5530d 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java
@@ -159,7 +159,7 @@ public class SegmentLoadDropHandlerTest
     Assert.assertEquals(ImmutableList.of(segment), 
segmentAnnouncer.getObservedSegments());
     Assert.assertFalse(
         "segment files shouldn't be deleted",
-        cacheManager.getObservedSegmentsRemovedFromCache().contains(segment)
+        
cacheManager.getObservedSegmentsRemovedFromCache().contains(segment.getId())
     );
   }
 
@@ -210,7 +210,7 @@ public class SegmentLoadDropHandlerTest
     
Assert.assertTrue(segmentAnnouncer.getObservedSegments().contains(segment));
     Assert.assertFalse(
         "segment files shouldn't be deleted",
-        cacheManager.getObservedSegmentsRemovedFromCache().contains(segment)
+        
cacheManager.getObservedSegmentsRemovedFromCache().contains(segment.getId())
     );
   }
 
diff --git 
a/server/src/test/java/org/apache/druid/test/utils/TestSegmentCacheManager.java 
b/server/src/test/java/org/apache/druid/test/utils/TestSegmentCacheManager.java
index 67dff44e609..fe135005d19 100644
--- 
a/server/src/test/java/org/apache/druid/test/utils/TestSegmentCacheManager.java
+++ 
b/server/src/test/java/org/apache/druid/test/utils/TestSegmentCacheManager.java
@@ -32,6 +32,7 @@ import org.apache.druid.segment.loading.AcquireSegmentResult;
 import org.apache.druid.segment.loading.NoopSegmentCacheManager;
 import org.apache.druid.segment.loading.TombstoneSegmentizerFactory;
 import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.SegmentId;
 import org.joda.time.Interval;
 
 import java.util.List;
@@ -50,11 +51,12 @@ import java.util.concurrent.atomic.AtomicInteger;
 public class TestSegmentCacheManager extends NoopSegmentCacheManager
 {
   private final List<DataSegment> cachedSegments;
-  private final Map<DataSegment, ReferenceCountedSegmentProvider> 
referenceProviders;
+  private final Map<SegmentId, ReferenceCountedSegmentProvider> 
referenceProviders;
+  private final Map<SegmentId, DataSegment> segmentLookup;
 
   private final List<DataSegment> observedBootstrapSegments;
   private final List<DataSegment> observedSegments;
-  private final List<DataSegment> observedSegmentsRemovedFromCache;
+  private final Set<SegmentId> observedSegmentsRemovedFromCache;
   private final AtomicInteger observedShutdownBootstrapCount;
 
   public TestSegmentCacheManager()
@@ -66,12 +68,13 @@ public class TestSegmentCacheManager extends 
NoopSegmentCacheManager
   {
     this.cachedSegments = ImmutableList.copyOf(segmentsToCache);
     this.referenceProviders = new ConcurrentHashMap<>();
+    this.segmentLookup = new ConcurrentHashMap<>();
 
     // While inneficient, these CopyOnWriteArrayList objects greatly simplify 
meeting the thread
     // safety mandate from SegmentCacheManager. For testing, this should be ok.
     this.observedBootstrapSegments = new CopyOnWriteArrayList<>();
     this.observedSegments = new CopyOnWriteArrayList<>();
-    this.observedSegmentsRemovedFromCache = new CopyOnWriteArrayList<>();
+    this.observedSegmentsRemovedFromCache = ConcurrentHashMap.newKeySet();
 
     this.observedShutdownBootstrapCount = new AtomicInteger(0);
   }
@@ -82,7 +85,8 @@ public class TestSegmentCacheManager extends 
NoopSegmentCacheManager
    */
   public void registerSegment(final DataSegment dataSegment, final Segment 
segment)
   {
-    referenceProviders.put(dataSegment, 
ReferenceCountedSegmentProvider.of(segment));
+    segmentLookup.put(dataSegment.getId(), dataSegment);
+    referenceProviders.put(dataSegment.getId(), 
ReferenceCountedSegmentProvider.of(segment));
   }
 
   @Override
@@ -101,20 +105,24 @@ public class TestSegmentCacheManager extends 
NoopSegmentCacheManager
   public void bootstrap(DataSegment segment, SegmentLazyLoadFailCallback 
loadFailed)
   {
     observedBootstrapSegments.add(segment);
+    getSegmentInternal(segment);
   }
 
   @Override
   public void load(final DataSegment segment)
   {
     observedSegments.add(segment);
+    getSegmentInternal(segment);
   }
 
   private ReferenceCountedSegmentProvider getSegmentInternal(final DataSegment 
segment)
   {
+    segmentLookup.putIfAbsent(segment.getId(), segment);
     return referenceProviders.compute(
-        segment,
-        (s, existingProvider) -> {
+        segment.getId(),
+        (id, existingProvider) -> {
           if (existingProvider == null) {
+            final DataSegment s = segmentLookup.get(id);
             if (s.isTombstone()) {
               return 
ReferenceCountedSegmentProvider.of(TombstoneSegmentizerFactory.segmentForTombstone(s));
             } else {
@@ -133,18 +141,22 @@ public class TestSegmentCacheManager extends 
NoopSegmentCacheManager
   }
 
   @Override
-  public Optional<Segment> acquireCachedSegment(DataSegment dataSegment)
+  public Optional<Segment> acquireCachedSegment(SegmentId segmentId)
   {
-    if (observedSegmentsRemovedFromCache.contains(dataSegment)) {
+    if (observedSegmentsRemovedFromCache.contains(segmentId)) {
       return Optional.empty();
     }
-    return getSegmentInternal(dataSegment).acquireReference();
+    final ReferenceCountedSegmentProvider provider = 
referenceProviders.get(segmentId);
+    if (provider == null) {
+      return Optional.empty();
+    }
+    return provider.acquireReference();
   }
 
   @Override
   public AcquireSegmentAction acquireSegment(DataSegment dataSegment)
   {
-    if (observedSegmentsRemovedFromCache.contains(dataSegment)) {
+    if (observedSegmentsRemovedFromCache.contains(dataSegment.getId())) {
       return AcquireSegmentAction.missingSegment();
     }
     return new AcquireSegmentAction(
@@ -181,7 +193,7 @@ public class TestSegmentCacheManager extends 
NoopSegmentCacheManager
   public void drop(DataSegment segment)
   {
     getSegmentInternal(segment).close();
-    observedSegmentsRemovedFromCache.add(segment);
+    observedSegmentsRemovedFromCache.add(segment.getId());
   }
 
   public List<DataSegment> getObservedBootstrapSegments()
@@ -195,7 +207,7 @@ public class TestSegmentCacheManager extends 
NoopSegmentCacheManager
   }
 
 
-  public List<DataSegment> getObservedSegmentsRemovedFromCache()
+  public Set<SegmentId> getObservedSegmentsRemovedFromCache()
   {
     return observedSegmentsRemovedFromCache;
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to