This is an automated email from the ASF dual-hosted git repository.
gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 22990f290f3 fix leaky segment reference on segment drop (#18782)
22990f290f3 is described below
commit 22990f290f3f5eba4bb19ead36ba02f92e8e5b23
Author: Clint Wylie <[email protected]>
AuthorDate: Fri Nov 28 16:17:59 2025 -0800
fix leaky segment reference on segment drop (#18782)
---
.../org/apache/druid/segment/TestSegmentUtils.java | 8 ++++++++
.../segment/loading/SegmentLocalCacheManager.java | 19 +++++++++++++++++++
.../java/org/apache/druid/server/SegmentManager.java | 2 +-
.../org/apache/druid/server/SegmentManagerTest.java | 20 +++++++++++++++++---
4 files changed, 45 insertions(+), 4 deletions(-)
diff --git a/processing/src/test/java/org/apache/druid/segment/TestSegmentUtils.java b/processing/src/test/java/org/apache/druid/segment/TestSegmentUtils.java
index f5762a16574..1a24d41adf8 100644
--- a/processing/src/test/java/org/apache/druid/segment/TestSegmentUtils.java
+++ b/processing/src/test/java/org/apache/druid/segment/TestSegmentUtils.java
@@ -126,6 +126,12 @@ public class TestSegmentUtils
{
return Cursors.ascendingTimeOrder();
}
+
+ @Override
+ public int getNumRows()
+ {
+ return 1234;
+ }
};
public static class SegmentForTesting extends QueryableIndexSegment
@@ -178,6 +184,8 @@ public class TestSegmentUtils
return (T) INDEX;
} else if (clazz.equals(CursorFactory.class)) {
return (T) new QueryableIndexCursorFactory(INDEX);
+ } else if (clazz.equals(PhysicalSegmentInspector.class)) {
+ return (T) new QueryableIndexPhysicalSegmentInspector(INDEX);
}
return null;
}
diff --git a/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java b/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java
index 3cdf2f1bf64..bc7f2d8d247 100644
--- a/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java
+++ b/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java
@@ -648,6 +648,25 @@ public class SegmentLocalCacheManager implements SegmentCacheManager
return false;
}
+ /**
+ * Testing use only please, any callers that want to do stuff with segments
should use
+ * {@link #acquireCachedSegment(DataSegment)} or {@link
#acquireSegment(DataSegment)} instead. Does not hold locks
+ * and so is not really safe to use while the cache manager is active
+ */
+ @VisibleForTesting
+ @Nullable
+ public ReferenceCountedSegmentProvider
getSegmentReferenceProvider(DataSegment segment)
+ {
+ final SegmentCacheEntry cacheEntry = new SegmentCacheEntry(segment);
+ for (StorageLocation location : locations) {
+ final SegmentCacheEntry entry = location.getCacheEntry(cacheEntry.id);
+ if (entry != null) {
+ return entry.referenceProvider;
+ }
+ }
+ return null;
+ }
+
/**
* Returns the effective segment info directory based on the configuration
settings.
* The directory is selected based on the following configurations injected
into this class:
diff --git a/server/src/main/java/org/apache/druid/server/SegmentManager.java b/server/src/main/java/org/apache/druid/server/SegmentManager.java
index cf8314088ec..82b4d44a4fe 100644
--- a/server/src/main/java/org/apache/druid/server/SegmentManager.java
+++ b/server/src/main/java/org/apache/druid/server/SegmentManager.java
@@ -380,11 +380,11 @@ public class SegmentManager
try (final Closer closer = Closer.create()) {
final Optional<Segment> oldSegment =
cacheManager.acquireCachedSegment(oldSegmentRef);
long numberOfRows = oldSegment.map(segment -> {
+ closer.register(segment);
final PhysicalSegmentInspector countInspector =
segment.as(PhysicalSegmentInspector.class);
if (countInspector != null) {
return countInspector.getNumRows();
}
- CloseableUtils.closeAndWrapExceptions(segment);
return 0;
}).orElse(0);
diff --git a/server/src/test/java/org/apache/druid/server/SegmentManagerTest.java b/server/src/test/java/org/apache/druid/server/SegmentManagerTest.java
index 3e05eb04396..214b52bad95 100644
--- a/server/src/test/java/org/apache/druid/server/SegmentManagerTest.java
+++ b/server/src/test/java/org/apache/druid/server/SegmentManagerTest.java
@@ -35,6 +35,7 @@ import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.expression.TestExprMacroTable;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.IndexSpec;
+import org.apache.druid.segment.ReferenceCountedSegmentProvider;
import org.apache.druid.segment.SegmentLazyLoadFailCallback;
import org.apache.druid.segment.SegmentMapFunction;
import org.apache.druid.segment.TestHelper;
@@ -45,7 +46,6 @@ import org.apache.druid.segment.loading.AcquireSegmentResult;
import
org.apache.druid.segment.loading.LeastBytesUsedStorageLocationSelectorStrategy;
import org.apache.druid.segment.loading.LocalDataSegmentPuller;
import org.apache.druid.segment.loading.LocalLoadSpec;
-import org.apache.druid.segment.loading.SegmentCacheManager;
import org.apache.druid.segment.loading.SegmentLoaderConfig;
import org.apache.druid.segment.loading.SegmentLoadingException;
import org.apache.druid.segment.loading.SegmentLocalCacheManager;
@@ -90,7 +90,9 @@ public class SegmentManagerTest extends InitializedNullHandlingTest
);
private ExecutorService executor;
+ private SegmentLocalCacheManager cacheManager;
private SegmentManager segmentManager;
+ private SegmentLocalCacheManager virtualCacheManager;
private SegmentManager virtualSegmentManager;
@Rule
@@ -160,7 +162,7 @@ public class SegmentManagerTest extends InitializedNullHandlingTest
);
final List<StorageLocation> storageLocations =
loaderConfig.toStorageLocations();
- final SegmentLocalCacheManager cacheManager = new SegmentLocalCacheManager(
+ cacheManager = new SegmentLocalCacheManager(
storageLocations,
loaderConfig,
new LeastBytesUsedStorageLocationSelectorStrategy(storageLocations),
@@ -170,7 +172,7 @@ public class SegmentManagerTest extends InitializedNullHandlingTest
segmentManager = new SegmentManager(cacheManager);
final List<StorageLocation> virtualStorageLocations =
virtualLoaderConfig.toStorageLocations();
- final SegmentCacheManager virtualCacheManager = new
SegmentLocalCacheManager(
+ virtualCacheManager = new SegmentLocalCacheManager(
virtualStorageLocations,
virtualLoaderConfig,
new
LeastBytesUsedStorageLocationSelectorStrategy(virtualStorageLocations),
@@ -238,8 +240,12 @@ public class SegmentManagerTest extends InitializedNullHandlingTest
@Test
public void testDropSegment() throws SegmentLoadingException,
ExecutionException, InterruptedException, IOException
{
+ List<ReferenceCountedSegmentProvider> referenceProviders = new
ArrayList<>();
for (DataSegment eachSegment : SEGMENTS) {
segmentManager.loadSegment(eachSegment);
+ ReferenceCountedSegmentProvider refProvider =
cacheManager.getSegmentReferenceProvider(eachSegment);
+ referenceProviders.add(refProvider);
+ Assert.assertFalse(refProvider.isClosed());
}
final List<Future<Void>> futures = ImmutableList.of(SEGMENTS.get(0),
SEGMENTS.get(2)).stream()
@@ -260,6 +266,14 @@ public class SegmentManagerTest extends InitializedNullHandlingTest
assertResult(
ImmutableList.of(SEGMENTS.get(1), SEGMENTS.get(3), SEGMENTS.get(4))
);
+ for (int i = 0; i < SEGMENTS.size(); i++) {
+ Assert.assertEquals(0, referenceProviders.get(i).getNumReferences());
+ if (i == 0 || i == 2) {
+ Assert.assertTrue(referenceProviders.get(i).isClosed());
+ } else {
+ Assert.assertFalse(referenceProviders.get(i).isClosed());
+ }
+ }
}
private Void loadSegmentOrFail(DataSegment segment)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]