This is an automated email from the ASF dual-hosted git repository.
gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 5e7266cd09a SegmentManager: Close references on failure of
getSegmentsBundle. (#19070)
5e7266cd09a is described below
commit 5e7266cd09af819d5a1cd7f5d1fc5b3aa8fd0ae8
Author: Gian Merlino <[email protected]>
AuthorDate: Mon Mar 2 10:43:55 2026 -0800
SegmentManager: Close references on failure of getSegmentsBundle. (#19070)
Similar to the approach in ServerManager#getOrLoadBundleSegments, we
use a safetyNet to close references if bundle creation fails.
---
.../org/apache/druid/server/SegmentManager.java | 62 ++++++++++++++--------
1 file changed, 39 insertions(+), 23 deletions(-)
diff --git a/server/src/main/java/org/apache/druid/server/SegmentManager.java
b/server/src/main/java/org/apache/druid/server/SegmentManager.java
index 58c9254b246..a3fcbe4907a 100644
--- a/server/src/main/java/org/apache/druid/server/SegmentManager.java
+++ b/server/src/main/java/org/apache/druid/server/SegmentManager.java
@@ -149,31 +149,47 @@ public class SegmentManager
SegmentMapFunction segmentMapFunction
)
{
- final ArrayList<SegmentReference> segmentReferences = new ArrayList<>();
- final ArrayList<SegmentDescriptor> missingSegments = new ArrayList<>();
- final ArrayList<DataSegmentAndDescriptor> loadableSegments = new
ArrayList<>();
- for (DataSegmentAndDescriptor segment : segments) {
- final DataSegment dataSegment = segment.getDataSegment();
- if (dataSegment == null) {
- missingSegments.add(segment.getDescriptor());
- continue;
- }
- Optional<Segment> ref = acquireCachedSegment(dataSegment);
- if (ref.isPresent()) {
- segmentReferences.add(
- new SegmentReference(
- segment.getDescriptor(),
- segmentMapFunction.apply(ref),
- null
- )
- );
- } else if (canLoadSegmentOnDemand(dataSegment)) {
- loadableSegments.add(segment);
- } else {
- missingSegments.add(segment.getDescriptor());
+ // Closer to collect everything that needs to be cleaned up in the event
of failure. If we make it
+ // out of this function, closing the segment references is the caller's
responsibility.
+ final Closer safetyNet = Closer.create();
+ try {
+ final ArrayList<SegmentReference> segmentReferences = new ArrayList<>();
+ final ArrayList<SegmentDescriptor> missingSegments = new ArrayList<>();
+ final ArrayList<DataSegmentAndDescriptor> loadableSegments = new
ArrayList<>();
+ for (final DataSegmentAndDescriptor segment : segments) {
+ final DataSegment dataSegment = segment.getDataSegment();
+ if (dataSegment == null) {
+ missingSegments.add(segment.getDescriptor());
+ continue;
+ }
+ final Optional<Segment> ref = acquireCachedSegment(dataSegment);
+ if (ref.isPresent()) {
+ try {
+ final Optional<Segment> mapped =
segmentMapFunction.apply(ref).map(safetyNet::register);
+ segmentReferences.add(
+ new SegmentReference(
+ segment.getDescriptor(),
+ mapped,
+ null
+ )
+ );
+ }
+ catch (Throwable t) {
+ // If applying the mapFn failed, attach the base segment to the
closer and rethrow
+ ref.ifPresent(safetyNet::register);
+ throw t;
+ }
+ } else if (canLoadSegmentOnDemand(dataSegment)) {
+ loadableSegments.add(segment);
+ } else {
+ missingSegments.add(segment.getDescriptor());
+ }
}
+ return new LeafSegmentsBundle(segmentReferences, loadableSegments,
missingSegments);
+ }
+ catch (Throwable t) {
+ throw CloseableUtils.closeAndWrapInCatch(t, safetyNet);
}
- return new LeafSegmentsBundle(segmentReferences, loadableSegments,
missingSegments);
}
/**
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]