This is an automated email from the ASF dual-hosted git repository.
karan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 0e81a19add0 Fix two issues where CursorHolders were not closed on
failure of asCursor(). (#17915)
0e81a19add0 is described below
commit 0e81a19add0ccf4ed441d7c6549164b03cb901b0
Author: Gian Merlino <[email protected]>
AuthorDate: Tue Apr 15 02:59:39 2025 -0700
Fix two issues where CursorHolders were not closed on failure of
asCursor(). (#17915)
* Fix two issues where CursorHolders were not closed on failure of
asCursor().
One in TopNQueryEngine (native) and one in ScanQueryFrameProcessor (MSQ).
This is especially important because asCursor() itself can fail in ways that
leave the CursorHolder with resources that must be closed.
* Fix style.
---
.../msq/querykit/scan/ScanQueryFrameProcessor.java | 15 ++-
.../apache/druid/query/topn/TopNQueryEngine.java | 104 +++++++++++----------
2 files changed, 69 insertions(+), 50 deletions(-)
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java
index 1be191bddbe..3f42e6c8e5f 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java
@@ -79,6 +79,7 @@ import org.apache.druid.segment.VirtualColumn;
import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.timeline.SegmentId;
+import org.apache.druid.utils.CloseableUtils;
import javax.annotation.Nullable;
import javax.validation.constraints.NotNull;
@@ -192,7 +193,8 @@ public class ScanQueryFrameProcessor extends BaseLeafFrameProcessor
}
@Override
- protected ReturnOrAwait<SegmentsInputSlice> runWithDataServerQuery(final DataServerQueryHandler dataServerQueryHandler) throws IOException
+ protected ReturnOrAwait<SegmentsInputSlice> runWithDataServerQuery(final DataServerQueryHandler dataServerQueryHandler)
+ throws IOException
{
if (cursor == null) {
ScanQuery preparedQuery = prepareScanQueryForDataServer(query);
@@ -268,7 +270,16 @@ public class ScanQueryFrameProcessor extends BaseLeafFrameProcessor
null
)
);
- final Cursor nextCursor = nextCursorHolder.asCursor();
+
+ final Cursor nextCursor;
+
+ // If asCursor() fails, we need to close nextCursorHolder immediately.
+ try {
+ nextCursor = nextCursorHolder.asCursor();
+ }
+ catch (Throwable t) {
+ throw CloseableUtils.closeAndWrapInCatch(t, nextCursorHolder);
+ }
if (nextCursor == null) {
// No cursors!
diff --git a/processing/src/main/java/org/apache/druid/query/topn/TopNQueryEngine.java b/processing/src/main/java/org/apache/druid/query/topn/TopNQueryEngine.java
index c8aa3292158..2b1e5f35a3e 100644
--- a/processing/src/main/java/org/apache/druid/query/topn/TopNQueryEngine.java
+++ b/processing/src/main/java/org/apache/druid/query/topn/TopNQueryEngine.java
@@ -51,6 +51,7 @@ import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.column.Types;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.filter.Filters;
+import org.apache.druid.utils.CloseableUtils;
import javax.annotation.Nullable;
import java.nio.ByteBuffer;
@@ -89,61 +90,68 @@ public class TopNQueryEngine
final CursorBuildSpec buildSpec = makeCursorBuildSpec(query, queryMetrics);
final CursorHolder cursorHolder =
cursorFactory.makeCursorHolder(buildSpec);
- if (cursorHolder.isPreAggregated()) {
- query =
query.withAggregatorSpecs(Preconditions.checkNotNull(cursorHolder.getAggregatorsForPreAggregated()));
- }
- final Cursor cursor = cursorHolder.asCursor();
- if (cursor == null) {
- return Sequences.withBaggage(Sequences.empty(), cursorHolder);
- }
- final TimeBoundaryInspector timeBoundaryInspector =
segment.as(TimeBoundaryInspector.class);
+ // Once we have a cursorHolder, we need to either return a Sequence, or close it immediately.
+ try {
+ if (cursorHolder.isPreAggregated()) {
+ query =
query.withAggregatorSpecs(Preconditions.checkNotNull(cursorHolder.getAggregatorsForPreAggregated()));
+ }
+ final Cursor cursor = cursorHolder.asCursor();
+ if (cursor == null) {
+ return Sequences.withBaggage(Sequences.empty(), cursorHolder);
+ }
- final ColumnSelectorFactory factory = cursor.getColumnSelectorFactory();
+ final TimeBoundaryInspector timeBoundaryInspector =
segment.as(TimeBoundaryInspector.class);
- final ColumnSelectorPlus<TopNColumnAggregatesProcessor<?>> selectorPlus =
- DimensionHandlerUtils.createColumnSelectorPlus(
- new
TopNColumnAggregatesProcessorFactory(query.getDimensionSpec().getOutputType()),
- query.getDimensionSpec(),
- factory
- );
+ final ColumnSelectorFactory factory = cursor.getColumnSelectorFactory();
- final int cardinality;
- if (selectorPlus.getSelector() instanceof DimensionDictionarySelector) {
- cardinality = ((DimensionDictionarySelector)
selectorPlus.getSelector()).getValueCardinality();
- } else {
- cardinality = DimensionDictionarySelector.CARDINALITY_UNKNOWN;
- }
- final TopNCursorInspector cursorInspector = new TopNCursorInspector(
- factory,
- segment.as(TopNOptimizationInspector.class),
- segment.getDataInterval(),
- cardinality
- );
+ final ColumnSelectorPlus<TopNColumnAggregatesProcessor<?>> selectorPlus =
+ DimensionHandlerUtils.createColumnSelectorPlus(
+ new
TopNColumnAggregatesProcessorFactory(query.getDimensionSpec().getOutputType()),
+ query.getDimensionSpec(),
+ factory
+ );
- final CursorGranularizer granularizer = CursorGranularizer.create(
- cursor,
- timeBoundaryInspector,
- cursorHolder.getTimeOrder(),
- query.getGranularity(),
- buildSpec.getInterval()
- );
- if (granularizer == null || selectorPlus.getSelector() == null) {
- return Sequences.withBaggage(Sequences.empty(), cursorHolder);
- }
+ final int cardinality;
+ if (selectorPlus.getSelector() instanceof DimensionDictionarySelector) {
+ cardinality = ((DimensionDictionarySelector)
selectorPlus.getSelector()).getValueCardinality();
+ } else {
+ cardinality = DimensionDictionarySelector.CARDINALITY_UNKNOWN;
+ }
+ final TopNCursorInspector cursorInspector = new TopNCursorInspector(
+ factory,
+ segment.as(TopNOptimizationInspector.class),
+ segment.getDataInterval(),
+ cardinality
+ );
- if (queryMetrics != null) {
- queryMetrics.cursor(cursor);
+ final CursorGranularizer granularizer = CursorGranularizer.create(
+ cursor,
+ timeBoundaryInspector,
+ cursorHolder.getTimeOrder(),
+ query.getGranularity(),
+ buildSpec.getInterval()
+ );
+ if (granularizer == null || selectorPlus.getSelector() == null) {
+ return Sequences.withBaggage(Sequences.empty(), cursorHolder);
+ }
+
+ if (queryMetrics != null) {
+ queryMetrics.cursor(cursor);
+ }
+ final TopNMapFn mapFn = getMapFn(query, cursorInspector, queryMetrics);
+ return Sequences.filter(
+ Sequences.simple(granularizer.getBucketIterable())
+ .map(bucketInterval -> {
+ granularizer.advanceToBucket(bucketInterval);
+ return mapFn.apply(cursor, selectorPlus, granularizer,
queryMetrics);
+ }),
+ Predicates.notNull()
+ ).withBaggage(cursorHolder);
+ }
+ catch (Throwable t) {
+ throw CloseableUtils.closeAndWrapInCatch(t, cursorHolder);
}
- final TopNMapFn mapFn = getMapFn(query, cursorInspector, queryMetrics);
- return Sequences.filter(
- Sequences.simple(granularizer.getBucketIterable())
- .map(bucketInterval -> {
- granularizer.advanceToBucket(bucketInterval);
- return mapFn.apply(cursor, selectorPlus, granularizer,
queryMetrics);
- }),
- Predicates.notNull()
- ).withBaggage(cursorHolder);
}
/**
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]