This is an automated email from the ASF dual-hosted git repository. cwylie pushed a commit to branch 35.0.1 in repository https://gitbox.apache.org/repos/asf/druid.git
commit 35de31f5918fc88f123ef94797ba8f91fe825ba7 Author: Clint Wylie <[email protected]> AuthorDate: Fri Nov 28 16:17:59 2025 -0800 fix leaky segment reference on segment drop (#18782) --- .../org/apache/druid/segment/TestSegmentUtils.java | 8 ++++++++ .../segment/loading/SegmentLocalCacheManager.java | 19 +++++++++++++++++++ .../java/org/apache/druid/server/SegmentManager.java | 2 +- .../org/apache/druid/server/SegmentManagerTest.java | 16 +++++++++++++++- 4 files changed, 43 insertions(+), 2 deletions(-) diff --git a/processing/src/test/java/org/apache/druid/segment/TestSegmentUtils.java b/processing/src/test/java/org/apache/druid/segment/TestSegmentUtils.java index f5762a16574..1a24d41adf8 100644 --- a/processing/src/test/java/org/apache/druid/segment/TestSegmentUtils.java +++ b/processing/src/test/java/org/apache/druid/segment/TestSegmentUtils.java @@ -126,6 +126,12 @@ public class TestSegmentUtils { return Cursors.ascendingTimeOrder(); } + + @Override + public int getNumRows() + { + return 1234; + } }; public static class SegmentForTesting extends QueryableIndexSegment @@ -178,6 +184,8 @@ public class TestSegmentUtils return (T) INDEX; } else if (clazz.equals(CursorFactory.class)) { return (T) new QueryableIndexCursorFactory(INDEX); + } else if (clazz.equals(PhysicalSegmentInspector.class)) { + return (T) new QueryableIndexPhysicalSegmentInspector(INDEX); } return null; } diff --git a/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java b/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java index 162e20a28d1..89a28f4c538 100644 --- a/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java +++ b/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java @@ -551,6 +551,25 @@ public class SegmentLocalCacheManager implements SegmentCacheManager return false; } + /** + * Testing use only please, any callers that want to do stuff with segments should use + * {@link #acquireCachedSegment(DataSegment)} or {@link #acquireSegment(DataSegment)} instead. Does not hold locks + * and so is not really safe to use while the cache manager is active + */ + @VisibleForTesting + @Nullable + public ReferenceCountedSegmentProvider getSegmentReferenceProvider(DataSegment segment) + { + final SegmentCacheEntry cacheEntry = new SegmentCacheEntry(segment); + for (StorageLocation location : locations) { + final SegmentCacheEntry entry = location.getCacheEntry(cacheEntry.id); + if (entry != null) { + return entry.referenceProvider; + } + } + return null; + } + /** * Returns the effective segment info directory based on the configuration settings. * The directory is selected based on the following configurations injected into this class: diff --git a/server/src/main/java/org/apache/druid/server/SegmentManager.java b/server/src/main/java/org/apache/druid/server/SegmentManager.java index 3ac628bbbd7..527715c9274 100644 --- a/server/src/main/java/org/apache/druid/server/SegmentManager.java +++ b/server/src/main/java/org/apache/druid/server/SegmentManager.java @@ -329,11 +329,11 @@ public class SegmentManager try (final Closer closer = Closer.create()) { final Optional<Segment> oldSegment = cacheManager.acquireCachedSegment(oldSegmentRef); long numberOfRows = oldSegment.map(segment -> { + closer.register(segment); final PhysicalSegmentInspector countInspector = segment.as(PhysicalSegmentInspector.class); if (countInspector != null) { return countInspector.getNumRows(); } - CloseableUtils.closeAndWrapExceptions(segment); return 0; }).orElse(0); diff --git a/server/src/test/java/org/apache/druid/server/SegmentManagerTest.java b/server/src/test/java/org/apache/druid/server/SegmentManagerTest.java index fbb63a2d38d..9b85588608e 100644 --- a/server/src/test/java/org/apache/druid/server/SegmentManagerTest.java +++ b/server/src/test/java/org/apache/druid/server/SegmentManagerTest.java @@ -26,6 +26,7 @@ import com.google.common.collect.Ordering; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.query.TableDataSource; +import org.apache.druid.segment.ReferenceCountedSegmentProvider; import org.apache.druid.segment.SegmentLazyLoadFailCallback; import org.apache.druid.segment.TestHelper; import org.apache.druid.segment.TestIndex; @@ -73,6 +74,7 @@ public class SegmentManagerTest ); private ExecutorService executor; + private SegmentLocalCacheManager cacheManager; private SegmentManager segmentManager; @Rule @@ -104,7 +106,7 @@ public class SegmentManagerTest objectMapper.registerSubtypes(TestSegmentUtils.TestSegmentizerFactory.class); final List<StorageLocation> storageLocations = loaderConfig.toStorageLocations(); - final SegmentLocalCacheManager cacheManager = new SegmentLocalCacheManager( + cacheManager = new SegmentLocalCacheManager( storageLocations, loaderConfig, new LeastBytesUsedStorageLocationSelectorStrategy(storageLocations), @@ -169,8 +171,12 @@ public class SegmentManagerTest @Test public void testDropSegment() throws SegmentLoadingException, ExecutionException, InterruptedException, IOException { + List<ReferenceCountedSegmentProvider> referenceProviders = new ArrayList<>(); for (DataSegment eachSegment : SEGMENTS) { segmentManager.loadSegment(eachSegment); + ReferenceCountedSegmentProvider refProvider = cacheManager.getSegmentReferenceProvider(eachSegment); + referenceProviders.add(refProvider); + Assert.assertFalse(refProvider.isClosed()); } final List<Future<Void>> futures = ImmutableList.of(SEGMENTS.get(0), SEGMENTS.get(2)).stream() @@ -191,6 +197,14 @@ public class SegmentManagerTest assertResult( ImmutableList.of(SEGMENTS.get(1), SEGMENTS.get(3), SEGMENTS.get(4)) ); + for (int i = 0; i < SEGMENTS.size(); i++) { + Assert.assertEquals(0, referenceProviders.get(i).getNumReferences()); + if (i == 0 || i == 2) { + Assert.assertTrue(referenceProviders.get(i).isClosed()); + } else { + Assert.assertFalse(referenceProviders.get(i).isClosed()); + } + } } private Void loadSegmentOrFail(DataSegment segment) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
