This is an automated email from the ASF dual-hosted git repository.
gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new fb7fdb4d112 SegmentCacheManager: acquireCachedSegment with SegmentId.
(#19044)
fb7fdb4d112 is described below
commit fb7fdb4d112b9abd714d5eea5843e3126c887427
Author: Gian Merlino <[email protected]>
AuthorDate: Mon Feb 23 13:40:16 2026 -0800
SegmentCacheManager: acquireCachedSegment with SegmentId. (#19044)
Prior to this patch, acquireCachedSegment required the entire DataSegment
object. This patch loosens the requirement to just the SegmentId.
This is useful under MSQ, where caching is fully on-demand (no load rules)
and the full DataSegment object is not necessarily available in the local
timeline, even if a segment is cached locally. See the change in
RegularLoadableSegment.
---
.../indexing/common/task/CompactionTaskTest.java | 14 ++++++---
.../AbstractMultiPhaseParallelIndexingTest.java | 2 +-
.../druid/msq/input/RegularLoadableSegment.java | 12 +++-----
.../druid/msq/exec/MSQCompactionTaskRunTest.java | 9 +-----
.../druid/segment/loading/SegmentCacheManager.java | 9 +++---
.../segment/loading/SegmentLocalCacheManager.java | 6 ++--
.../org/apache/druid/server/SegmentManager.java | 14 +++++++--
.../segment/loading/NoopSegmentCacheManager.java | 3 +-
.../SegmentLocalCacheManagerConcurrencyTest.java | 2 +-
.../loading/SegmentLocalCacheManagerTest.java | 12 ++++----
.../coordination/SegmentLoadDropHandlerTest.java | 4 +--
.../druid/test/utils/TestSegmentCacheManager.java | 36 ++++++++++++++--------
12 files changed, 70 insertions(+), 53 deletions(-)
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
index aeb5006a249..2b20fcfa825 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
@@ -145,6 +145,7 @@ import org.apache.druid.server.security.AuthTestUtils;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.hamcrest.CoreMatchers;
import org.joda.time.Interval;
@@ -1972,11 +1973,16 @@ public class CompactionTaskTest
}
@Override
- public Optional<Segment> acquireCachedSegment(DataSegment dataSegment)
+ public Optional<Segment> acquireCachedSegment(SegmentId segmentId)
{
- return Optional.of(
- new
QueryableIndexSegment(indexIO.loadIndex(segments.get(dataSegment)),
dataSegment.getId())
- );
+ for (Map.Entry<DataSegment, File> entry : segments.entrySet()) {
+ if (entry.getKey().getId().equals(segmentId)) {
+ return Optional.of(
+ new QueryableIndexSegment(indexIO.loadIndex(entry.getValue()),
segmentId)
+ );
+ }
+ }
+ return Optional.empty();
}
@Override
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractMultiPhaseParallelIndexingTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractMultiPhaseParallelIndexingTest.java
index a6ab29d2334..e02dd9324ab 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractMultiPhaseParallelIndexingTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractMultiPhaseParallelIndexingTest.java
@@ -299,7 +299,7 @@ abstract class AbstractMultiPhaseParallelIndexingTest
extends AbstractParallelIn
.manufacturate(tempSegmentDir, false);
try {
cacheManager.load(dataSegment);
- return cacheManager.acquireCachedSegment(dataSegment).orElseThrow();
+ return
cacheManager.acquireCachedSegment(dataSegment.getId()).orElseThrow();
}
catch (SegmentLoadingException e) {
throw new RuntimeException(e);
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/input/RegularLoadableSegment.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/input/RegularLoadableSegment.java
index a73bcb12032..33399e354a1 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/input/RegularLoadableSegment.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/input/RegularLoadableSegment.java
@@ -140,15 +140,11 @@ public class RegularLoadableSegment implements
LoadableSegment
throw DruidException.defensive("Segment with descriptor[%s] is already
acquired", descriptor);
}
- if (cachedDataSegment != null) {
- final Optional<Segment> cachedSegment =
segmentManager.acquireCachedSegment(cachedDataSegment);
- if (cachedSegment.isPresent()) {
- acquired = true;
- }
- return cachedSegment;
+ final Optional<Segment> cachedSegment =
segmentManager.acquireCachedSegment(segmentId);
+ if (cachedSegment.isPresent()) {
+ acquired = true;
}
-
- return Optional.empty();
+ return cachedSegment;
}
@Override
diff --git
a/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQCompactionTaskRunTest.java
b/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQCompactionTaskRunTest.java
index eeb24bb62f1..0193dd46331 100644
---
a/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQCompactionTaskRunTest.java
+++
b/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQCompactionTaskRunTest.java
@@ -212,14 +212,7 @@ public class MSQCompactionTaskRunTest extends
CompactionTaskRunBase
null
);
});
-
when(segmentCacheManager.acquireCachedSegment(any())).thenAnswer(invocation -> {
- DataSegment segment = invocation.getArgument(0);
- QueryableIndexSegment index = new QueryableIndexSegment(
- new TestUtils().getTestIndexIO().loadIndex(new File((String)
segment.getLoadSpec().get("path"))),
- segment.getId()
- );
- return Optional.of(index);
- });
+
when(segmentCacheManager.acquireCachedSegment(any())).thenReturn(Optional.empty());
GroupingEngine groupingEngine =
GroupByQueryRunnerTest.makeQueryRunnerFactory(
new GroupByQueryConfig(),
TestGroupByBuffers.createDefault()
diff --git
a/server/src/main/java/org/apache/druid/segment/loading/SegmentCacheManager.java
b/server/src/main/java/org/apache/druid/segment/loading/SegmentCacheManager.java
index 065c3d23922..922f0f17b0a 100644
---
a/server/src/main/java/org/apache/druid/segment/loading/SegmentCacheManager.java
+++
b/server/src/main/java/org/apache/druid/segment/loading/SegmentCacheManager.java
@@ -25,6 +25,7 @@ import org.apache.druid.segment.Segment;
import org.apache.druid.segment.SegmentLazyLoadFailCallback;
import org.apache.druid.segment.SegmentMapFunction;
import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.SegmentId;
import javax.annotation.Nullable;
import java.io.File;
@@ -70,7 +71,7 @@ public interface SegmentCacheManager
/**
* Given a {@link DataSegment}, which contains the instructions for where
and how to fetch a {@link Segment} from
* deep storage, this method tries to load and subsequently serve it to
callers via
- * {@link #acquireCachedSegment(DataSegment)} or {@link
#acquireSegment(DataSegment)}. If the segment
+ * {@link #acquireCachedSegment(SegmentId)} or {@link
#acquireSegment(DataSegment)}. If the segment
* cannot be loaded either due to error or insufficient storage space, this
method throws a
* {@link SegmentLoadingException}.
*
@@ -108,7 +109,7 @@ public interface SegmentCacheManager
* to be dropped until it has been closed. As such, the returned {@link
Segment} must be closed when the caller is
* finished doing segment things.
*/
- Optional<Segment> acquireCachedSegment(DataSegment dataSegment);
+ Optional<Segment> acquireCachedSegment(SegmentId segmentId);
/**
* Returns a {@link AcquireSegmentAction} for a given {@link DataSegment}
and {@link SegmentDescriptor}, which returns
@@ -121,8 +122,8 @@ public interface SegmentCacheManager
AcquireSegmentAction acquireSegment(DataSegment dataSegment);
/**
- * Alternative to {@link #acquireCachedSegment(DataSegment)}, to return the
{@link File} location of the segment files
- * stored in the cache, instead of a {@link Optional<Segment>}. Unlike
{@link #acquireCachedSegment(DataSegment)} and
+ * Alternative to {@link #acquireCachedSegment(SegmentId)}, to return the
{@link File} location of the segment files
+ * stored in the cache, instead of a {@link Optional<Segment>}. Unlike
{@link #acquireCachedSegment(SegmentId)} and
* {@link #acquireSegment(DataSegment)}, this method does not provide any
protections for callers,
* and should only be used by callers that are in control of when {@link
#drop(DataSegment)} is called. This method
* will not download the segment files from deep storage if they do not
already exist in the cache, callers should use
diff --git
a/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java
b/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java
index 650f799090a..9b5496fbadb 100644
---
a/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java
+++
b/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java
@@ -400,9 +400,9 @@ public class SegmentLocalCacheManager implements
SegmentCacheManager
}
@Override
- public Optional<Segment> acquireCachedSegment(final DataSegment dataSegment)
+ public Optional<Segment> acquireCachedSegment(final SegmentId segmentId)
{
- final SegmentCacheEntryIdentifier cacheEntryIdentifier = new
SegmentCacheEntryIdentifier(dataSegment.getId());
+ final SegmentCacheEntryIdentifier cacheEntryIdentifier = new
SegmentCacheEntryIdentifier(segmentId);
for (StorageLocation location : locations) {
final SegmentCacheEntry cacheEntry =
location.getStaticCacheEntry(cacheEntryIdentifier);
if (cacheEntry != null) {
@@ -729,7 +729,7 @@ public class SegmentLocalCacheManager implements
SegmentCacheManager
/**
* Testing use only please, any callers that want to do stuff with segments
should use
- * {@link #acquireCachedSegment(DataSegment)} or {@link
#acquireSegment(DataSegment)} instead. Does not hold locks
+ * {@link #acquireCachedSegment(SegmentId)} or {@link
#acquireSegment(DataSegment)} instead. Does not hold locks
* and so is not really safe to use while the cache manager is active
*/
@VisibleForTesting
diff --git a/server/src/main/java/org/apache/druid/server/SegmentManager.java
b/server/src/main/java/org/apache/druid/server/SegmentManager.java
index af95eafaad6..58c9254b246 100644
--- a/server/src/main/java/org/apache/druid/server/SegmentManager.java
+++ b/server/src/main/java/org/apache/druid/server/SegmentManager.java
@@ -182,9 +182,17 @@ public class SegmentManager
* download a {@link DataSegment} if it is not already present in {@link
#cacheManager}, use
* {@link #acquireSegment(DataSegment)} instead.
*/
+ public Optional<Segment> acquireCachedSegment(SegmentId segmentId)
+ {
+ return cacheManager.acquireCachedSegment(segmentId);
+ }
+
+ /**
+ * Convenience overload of {@link #acquireCachedSegment(SegmentId)} that
accepts a {@link DataSegment}.
+ */
public Optional<Segment> acquireCachedSegment(DataSegment dataSegment)
{
- return cacheManager.acquireCachedSegment(dataSegment);
+ return acquireCachedSegment(dataSegment.getId());
}
/**
@@ -311,7 +319,7 @@ public class SegmentManager
);
long numOfRows = 0;
- final Optional<Segment> loadedSegment =
cacheManager.acquireCachedSegment(dataSegment);
+ final Optional<Segment> loadedSegment =
cacheManager.acquireCachedSegment(dataSegment.getId());
if (loadedSegment.isPresent()) {
final Segment segment = loadedSegment.get();
final IndexedTable table = segment.as(IndexedTable.class);
@@ -378,7 +386,7 @@ public class SegmentManager
if (oldSegmentRef != null) {
try (final Closer closer = Closer.create()) {
- final Optional<Segment> oldSegment =
cacheManager.acquireCachedSegment(oldSegmentRef);
+ final Optional<Segment> oldSegment =
cacheManager.acquireCachedSegment(oldSegmentRef.getId());
long numberOfRows = oldSegment.map(segment -> {
closer.register(segment);
final PhysicalSegmentInspector countInspector =
segment.as(PhysicalSegmentInspector.class);
diff --git
a/server/src/test/java/org/apache/druid/segment/loading/NoopSegmentCacheManager.java
b/server/src/test/java/org/apache/druid/segment/loading/NoopSegmentCacheManager.java
index 6121dcaf656..bec367ddc9a 100644
---
a/server/src/test/java/org/apache/druid/segment/loading/NoopSegmentCacheManager.java
+++
b/server/src/test/java/org/apache/druid/segment/loading/NoopSegmentCacheManager.java
@@ -22,6 +22,7 @@ package org.apache.druid.segment.loading;
import org.apache.druid.segment.Segment;
import org.apache.druid.segment.SegmentLazyLoadFailCallback;
import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.SegmentId;
import javax.annotation.Nullable;
import java.io.File;
@@ -95,7 +96,7 @@ public class NoopSegmentCacheManager implements
SegmentCacheManager
}
@Override
- public Optional<Segment> acquireCachedSegment(DataSegment dataSegment)
+ public Optional<Segment> acquireCachedSegment(SegmentId segmentId)
{
throw new UnsupportedOperationException();
}
diff --git
a/server/src/test/java/org/apache/druid/segment/loading/SegmentLocalCacheManagerConcurrencyTest.java
b/server/src/test/java/org/apache/druid/segment/loading/SegmentLocalCacheManagerConcurrencyTest.java
index 1ba6969e934..89f87403c3a 100644
---
a/server/src/test/java/org/apache/druid/segment/loading/SegmentLocalCacheManagerConcurrencyTest.java
+++
b/server/src/test/java/org/apache/druid/segment/loading/SegmentLocalCacheManagerConcurrencyTest.java
@@ -993,7 +993,7 @@ class SegmentLocalCacheManagerConcurrencyTest
if (maxDelayBefore > 0) {
Thread.sleep(ThreadLocalRandom.current().nextInt(maxDelayBefore));
}
- final Optional<Segment> segmentReference =
segmentManager.acquireCachedSegment(segment).map(closer::register);
+ final Optional<Segment> segmentReference =
segmentManager.acquireCachedSegment(segment.getId()).map(closer::register);
if (segmentReference.isPresent()) {
PhysicalSegmentInspector gadget =
segmentReference.get().as(PhysicalSegmentInspector.class);
if (maxDelayAfter > 0) {
diff --git
a/server/src/test/java/org/apache/druid/segment/loading/SegmentLocalCacheManagerTest.java
b/server/src/test/java/org/apache/druid/segment/loading/SegmentLocalCacheManagerTest.java
index f6b27472361..55310348d21 100644
---
a/server/src/test/java/org/apache/druid/segment/loading/SegmentLocalCacheManagerTest.java
+++
b/server/src/test/java/org/apache/druid/segment/loading/SegmentLocalCacheManagerTest.java
@@ -832,7 +832,7 @@ public class SegmentLocalCacheManagerTest extends
InitializedNullHandlingTest
final DataSegment dataSegment = TestSegmentUtils.makeSegment("foo", "v1",
Intervals.of("2020/2021"));
manager.bootstrap(dataSegment, SegmentLazyLoadFailCallback.NOOP);
- Segment actualBootstrapSegment =
manager.acquireCachedSegment(dataSegment).orElse(null);
+ Segment actualBootstrapSegment =
manager.acquireCachedSegment(dataSegment.getId()).orElse(null);
Assert.assertNotNull(actualBootstrapSegment);
Assert.assertEquals(dataSegment.getId(), actualBootstrapSegment.getId());
Assert.assertEquals(dataSegment.getInterval(),
actualBootstrapSegment.getDataInterval());
@@ -869,7 +869,7 @@ public class SegmentLocalCacheManagerTest extends
InitializedNullHandlingTest
final DataSegment dataSegment = TestSegmentUtils.makeSegment("foo", "v1",
Intervals.of("2020/2021"));
manager.bootstrap(dataSegment, () -> {});
- Segment actualBootstrapSegment =
manager.acquireCachedSegment(dataSegment).orElse(null);
+ Segment actualBootstrapSegment =
manager.acquireCachedSegment(dataSegment.getId()).orElse(null);
Assert.assertNotNull(actualBootstrapSegment);
Assert.assertEquals(dataSegment.getId(), actualBootstrapSegment.getId());
Assert.assertEquals(dataSegment.getInterval(),
actualBootstrapSegment.getDataInterval());
@@ -918,7 +918,7 @@ public class SegmentLocalCacheManagerTest extends
InitializedNullHandlingTest
manager.load(segmentToLoad);
Assert.assertNull(manager.getSegmentFiles(segmentToLoad));
-
Assert.assertFalse(manager.acquireCachedSegment(segmentToLoad).isPresent());
+
Assert.assertFalse(manager.acquireCachedSegment(segmentToLoad.getId()).isPresent());
AcquireSegmentAction segmentAction = manager.acquireSegment(segmentToLoad);
AcquireSegmentResult result = segmentAction.getSegmentFuture().get();
Optional<Segment> theSegment =
result.getReferenceProvider().acquireReference();
@@ -988,7 +988,7 @@ public class SegmentLocalCacheManagerTest extends
InitializedNullHandlingTest
manager.bootstrap(segmentToBootstrap, SegmentLazyLoadFailCallback.NOOP);
Assert.assertNull(manager.getSegmentFiles(segmentToBootstrap));
-
Assert.assertFalse(manager.acquireCachedSegment(segmentToBootstrap).isPresent());
+
Assert.assertFalse(manager.acquireCachedSegment(segmentToBootstrap.getId()).isPresent());
AcquireSegmentAction segmentAction =
manager.acquireSegment(segmentToBootstrap);
AcquireSegmentResult result = segmentAction.getSegmentFuture().get();
Optional<Segment> theSegment =
result.getReferenceProvider().acquireReference();
@@ -1138,7 +1138,7 @@ public class SegmentLocalCacheManagerTest extends
InitializedNullHandlingTest
manager.load(segmentToLoad);
Assert.assertNull(manager.getSegmentFiles(segmentToLoad));
-
Assert.assertFalse(manager.acquireCachedSegment(segmentToLoad).isPresent());
+
Assert.assertFalse(manager.acquireCachedSegment(segmentToLoad.getId()).isPresent());
AcquireSegmentAction segmentAction = manager.acquireSegment(segmentToLoad);
// now drop it before we actually load it, but dropping a weakly held
reference does not remove the entry from the
@@ -1306,7 +1306,7 @@ public class SegmentLocalCacheManagerTest extends
InitializedNullHandlingTest
.build();
manager.load(tombstone);
- Segment segment = manager.acquireCachedSegment(tombstone).orElse(null);
+ Segment segment =
manager.acquireCachedSegment(tombstone.getId()).orElse(null);
Assert.assertEquals(tombstone.getId(), segment.getId());
Assert.assertEquals(interval, segment.getDataInterval());
diff --git
a/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java
b/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java
index 77d58837315..f66f4c5530d 100644
---
a/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java
+++
b/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java
@@ -159,7 +159,7 @@ public class SegmentLoadDropHandlerTest
Assert.assertEquals(ImmutableList.of(segment),
segmentAnnouncer.getObservedSegments());
Assert.assertFalse(
"segment files shouldn't be deleted",
- cacheManager.getObservedSegmentsRemovedFromCache().contains(segment)
+
cacheManager.getObservedSegmentsRemovedFromCache().contains(segment.getId())
);
}
@@ -210,7 +210,7 @@ public class SegmentLoadDropHandlerTest
Assert.assertTrue(segmentAnnouncer.getObservedSegments().contains(segment));
Assert.assertFalse(
"segment files shouldn't be deleted",
- cacheManager.getObservedSegmentsRemovedFromCache().contains(segment)
+
cacheManager.getObservedSegmentsRemovedFromCache().contains(segment.getId())
);
}
diff --git
a/server/src/test/java/org/apache/druid/test/utils/TestSegmentCacheManager.java
b/server/src/test/java/org/apache/druid/test/utils/TestSegmentCacheManager.java
index 67dff44e609..fe135005d19 100644
---
a/server/src/test/java/org/apache/druid/test/utils/TestSegmentCacheManager.java
+++
b/server/src/test/java/org/apache/druid/test/utils/TestSegmentCacheManager.java
@@ -32,6 +32,7 @@ import org.apache.druid.segment.loading.AcquireSegmentResult;
import org.apache.druid.segment.loading.NoopSegmentCacheManager;
import org.apache.druid.segment.loading.TombstoneSegmentizerFactory;
import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.SegmentId;
import org.joda.time.Interval;
import java.util.List;
@@ -50,11 +51,12 @@ import java.util.concurrent.atomic.AtomicInteger;
public class TestSegmentCacheManager extends NoopSegmentCacheManager
{
private final List<DataSegment> cachedSegments;
- private final Map<DataSegment, ReferenceCountedSegmentProvider>
referenceProviders;
+ private final Map<SegmentId, ReferenceCountedSegmentProvider>
referenceProviders;
+ private final Map<SegmentId, DataSegment> segmentLookup;
private final List<DataSegment> observedBootstrapSegments;
private final List<DataSegment> observedSegments;
- private final List<DataSegment> observedSegmentsRemovedFromCache;
+ private final Set<SegmentId> observedSegmentsRemovedFromCache;
private final AtomicInteger observedShutdownBootstrapCount;
public TestSegmentCacheManager()
@@ -66,12 +68,13 @@ public class TestSegmentCacheManager extends
NoopSegmentCacheManager
{
this.cachedSegments = ImmutableList.copyOf(segmentsToCache);
this.referenceProviders = new ConcurrentHashMap<>();
+ this.segmentLookup = new ConcurrentHashMap<>();
// While inneficient, these CopyOnWriteArrayList objects greatly simplify
meeting the thread
// safety mandate from SegmentCacheManager. For testing, this should be ok.
this.observedBootstrapSegments = new CopyOnWriteArrayList<>();
this.observedSegments = new CopyOnWriteArrayList<>();
- this.observedSegmentsRemovedFromCache = new CopyOnWriteArrayList<>();
+ this.observedSegmentsRemovedFromCache = ConcurrentHashMap.newKeySet();
this.observedShutdownBootstrapCount = new AtomicInteger(0);
}
@@ -82,7 +85,8 @@ public class TestSegmentCacheManager extends
NoopSegmentCacheManager
*/
public void registerSegment(final DataSegment dataSegment, final Segment
segment)
{
- referenceProviders.put(dataSegment,
ReferenceCountedSegmentProvider.of(segment));
+ segmentLookup.put(dataSegment.getId(), dataSegment);
+ referenceProviders.put(dataSegment.getId(),
ReferenceCountedSegmentProvider.of(segment));
}
@Override
@@ -101,20 +105,24 @@ public class TestSegmentCacheManager extends
NoopSegmentCacheManager
public void bootstrap(DataSegment segment, SegmentLazyLoadFailCallback
loadFailed)
{
observedBootstrapSegments.add(segment);
+ getSegmentInternal(segment);
}
@Override
public void load(final DataSegment segment)
{
observedSegments.add(segment);
+ getSegmentInternal(segment);
}
private ReferenceCountedSegmentProvider getSegmentInternal(final DataSegment
segment)
{
+ segmentLookup.putIfAbsent(segment.getId(), segment);
return referenceProviders.compute(
- segment,
- (s, existingProvider) -> {
+ segment.getId(),
+ (id, existingProvider) -> {
if (existingProvider == null) {
+ final DataSegment s = segmentLookup.get(id);
if (s.isTombstone()) {
return
ReferenceCountedSegmentProvider.of(TombstoneSegmentizerFactory.segmentForTombstone(s));
} else {
@@ -133,18 +141,22 @@ public class TestSegmentCacheManager extends
NoopSegmentCacheManager
}
@Override
- public Optional<Segment> acquireCachedSegment(DataSegment dataSegment)
+ public Optional<Segment> acquireCachedSegment(SegmentId segmentId)
{
- if (observedSegmentsRemovedFromCache.contains(dataSegment)) {
+ if (observedSegmentsRemovedFromCache.contains(segmentId)) {
return Optional.empty();
}
- return getSegmentInternal(dataSegment).acquireReference();
+ final ReferenceCountedSegmentProvider provider =
referenceProviders.get(segmentId);
+ if (provider == null) {
+ return Optional.empty();
+ }
+ return provider.acquireReference();
}
@Override
public AcquireSegmentAction acquireSegment(DataSegment dataSegment)
{
- if (observedSegmentsRemovedFromCache.contains(dataSegment)) {
+ if (observedSegmentsRemovedFromCache.contains(dataSegment.getId())) {
return AcquireSegmentAction.missingSegment();
}
return new AcquireSegmentAction(
@@ -181,7 +193,7 @@ public class TestSegmentCacheManager extends
NoopSegmentCacheManager
public void drop(DataSegment segment)
{
getSegmentInternal(segment).close();
- observedSegmentsRemovedFromCache.add(segment);
+ observedSegmentsRemovedFromCache.add(segment.getId());
}
public List<DataSegment> getObservedBootstrapSegments()
@@ -195,7 +207,7 @@ public class TestSegmentCacheManager extends
NoopSegmentCacheManager
}
- public List<DataSegment> getObservedSegmentsRemovedFromCache()
+ public Set<SegmentId> getObservedSegmentsRemovedFromCache()
{
return observedSegmentsRemovedFromCache;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]