This is an automated email from the ASF dual-hosted git repository.
kgyrtkirk pushed a commit to branch 33.0.0
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/33.0.0 by this push:
new 5cd6b683bc4 Fix two issues where CursorHolders were not closed on
failure of asCursor(). (#17915) (#17925)
5cd6b683bc4 is described below
commit 5cd6b683bc4b3585f0f243b05eb5ab43834a3dbe
Author: Zoltan Haindrich <[email protected]>
AuthorDate: Wed Apr 16 06:47:22 2025 +0200
Fix two issues where CursorHolders were not closed on failure of
asCursor(). (#17915) (#17925)
* Fix two issues where CursorHolders were not closed on failure of
asCursor().
One in TopNQueryEngine (native) and one in ScanQueryFrameProcessor (MSQ).
This is especially important because asCursor() itself can fail in ways that
leave the CursorHolder with resources that must be closed.
* Fix style.
(cherry picked from commit 0e81a19add0ccf4ed441d7c6549164b03cb901b0)
Co-authored-by: Gian Merlino <[email protected]>
---
.../msq/querykit/scan/ScanQueryFrameProcessor.java | 15 ++-
.../apache/druid/query/topn/TopNQueryEngine.java | 104 +++++++++++----------
2 files changed, 69 insertions(+), 50 deletions(-)
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java
index 1be191bddbe..3f42e6c8e5f 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java
@@ -79,6 +79,7 @@ import org.apache.druid.segment.VirtualColumn;
import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.timeline.SegmentId;
+import org.apache.druid.utils.CloseableUtils;
import javax.annotation.Nullable;
import javax.validation.constraints.NotNull;
@@ -192,7 +193,8 @@ public class ScanQueryFrameProcessor extends
BaseLeafFrameProcessor
}
@Override
- protected ReturnOrAwait<SegmentsInputSlice> runWithDataServerQuery(final
DataServerQueryHandler dataServerQueryHandler) throws IOException
+ protected ReturnOrAwait<SegmentsInputSlice> runWithDataServerQuery(final
DataServerQueryHandler dataServerQueryHandler)
+ throws IOException
{
if (cursor == null) {
ScanQuery preparedQuery = prepareScanQueryForDataServer(query);
@@ -268,7 +270,16 @@ public class ScanQueryFrameProcessor extends
BaseLeafFrameProcessor
null
)
);
- final Cursor nextCursor = nextCursorHolder.asCursor();
+
+ final Cursor nextCursor;
+
+ // If asCursor() fails, we need to close nextCursorHolder immediately.
+ try {
+ nextCursor = nextCursorHolder.asCursor();
+ }
+ catch (Throwable t) {
+ throw CloseableUtils.closeAndWrapInCatch(t, nextCursorHolder);
+ }
if (nextCursor == null) {
// No cursors!
diff --git
a/processing/src/main/java/org/apache/druid/query/topn/TopNQueryEngine.java
b/processing/src/main/java/org/apache/druid/query/topn/TopNQueryEngine.java
index c8aa3292158..2b1e5f35a3e 100644
--- a/processing/src/main/java/org/apache/druid/query/topn/TopNQueryEngine.java
+++ b/processing/src/main/java/org/apache/druid/query/topn/TopNQueryEngine.java
@@ -51,6 +51,7 @@ import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.column.Types;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.filter.Filters;
+import org.apache.druid.utils.CloseableUtils;
import javax.annotation.Nullable;
import java.nio.ByteBuffer;
@@ -89,61 +90,68 @@ public class TopNQueryEngine
final CursorBuildSpec buildSpec = makeCursorBuildSpec(query, queryMetrics);
final CursorHolder cursorHolder =
cursorFactory.makeCursorHolder(buildSpec);
- if (cursorHolder.isPreAggregated()) {
- query =
query.withAggregatorSpecs(Preconditions.checkNotNull(cursorHolder.getAggregatorsForPreAggregated()));
- }
- final Cursor cursor = cursorHolder.asCursor();
- if (cursor == null) {
- return Sequences.withBaggage(Sequences.empty(), cursorHolder);
- }
- final TimeBoundaryInspector timeBoundaryInspector =
segment.as(TimeBoundaryInspector.class);
+ // Once we have a cursorHolder, we need to either return a Sequence, or
close it immediately.
+ try {
+ if (cursorHolder.isPreAggregated()) {
+ query =
query.withAggregatorSpecs(Preconditions.checkNotNull(cursorHolder.getAggregatorsForPreAggregated()));
+ }
+ final Cursor cursor = cursorHolder.asCursor();
+ if (cursor == null) {
+ return Sequences.withBaggage(Sequences.empty(), cursorHolder);
+ }
- final ColumnSelectorFactory factory = cursor.getColumnSelectorFactory();
+ final TimeBoundaryInspector timeBoundaryInspector =
segment.as(TimeBoundaryInspector.class);
- final ColumnSelectorPlus<TopNColumnAggregatesProcessor<?>> selectorPlus =
- DimensionHandlerUtils.createColumnSelectorPlus(
- new
TopNColumnAggregatesProcessorFactory(query.getDimensionSpec().getOutputType()),
- query.getDimensionSpec(),
- factory
- );
+ final ColumnSelectorFactory factory = cursor.getColumnSelectorFactory();
- final int cardinality;
- if (selectorPlus.getSelector() instanceof DimensionDictionarySelector) {
- cardinality = ((DimensionDictionarySelector)
selectorPlus.getSelector()).getValueCardinality();
- } else {
- cardinality = DimensionDictionarySelector.CARDINALITY_UNKNOWN;
- }
- final TopNCursorInspector cursorInspector = new TopNCursorInspector(
- factory,
- segment.as(TopNOptimizationInspector.class),
- segment.getDataInterval(),
- cardinality
- );
+ final ColumnSelectorPlus<TopNColumnAggregatesProcessor<?>> selectorPlus =
+ DimensionHandlerUtils.createColumnSelectorPlus(
+ new
TopNColumnAggregatesProcessorFactory(query.getDimensionSpec().getOutputType()),
+ query.getDimensionSpec(),
+ factory
+ );
- final CursorGranularizer granularizer = CursorGranularizer.create(
- cursor,
- timeBoundaryInspector,
- cursorHolder.getTimeOrder(),
- query.getGranularity(),
- buildSpec.getInterval()
- );
- if (granularizer == null || selectorPlus.getSelector() == null) {
- return Sequences.withBaggage(Sequences.empty(), cursorHolder);
- }
+ final int cardinality;
+ if (selectorPlus.getSelector() instanceof DimensionDictionarySelector) {
+ cardinality = ((DimensionDictionarySelector)
selectorPlus.getSelector()).getValueCardinality();
+ } else {
+ cardinality = DimensionDictionarySelector.CARDINALITY_UNKNOWN;
+ }
+ final TopNCursorInspector cursorInspector = new TopNCursorInspector(
+ factory,
+ segment.as(TopNOptimizationInspector.class),
+ segment.getDataInterval(),
+ cardinality
+ );
- if (queryMetrics != null) {
- queryMetrics.cursor(cursor);
+ final CursorGranularizer granularizer = CursorGranularizer.create(
+ cursor,
+ timeBoundaryInspector,
+ cursorHolder.getTimeOrder(),
+ query.getGranularity(),
+ buildSpec.getInterval()
+ );
+ if (granularizer == null || selectorPlus.getSelector() == null) {
+ return Sequences.withBaggage(Sequences.empty(), cursorHolder);
+ }
+
+ if (queryMetrics != null) {
+ queryMetrics.cursor(cursor);
+ }
+ final TopNMapFn mapFn = getMapFn(query, cursorInspector, queryMetrics);
+ return Sequences.filter(
+ Sequences.simple(granularizer.getBucketIterable())
+ .map(bucketInterval -> {
+ granularizer.advanceToBucket(bucketInterval);
+ return mapFn.apply(cursor, selectorPlus, granularizer,
queryMetrics);
+ }),
+ Predicates.notNull()
+ ).withBaggage(cursorHolder);
+ }
+ catch (Throwable t) {
+ throw CloseableUtils.closeAndWrapInCatch(t, cursorHolder);
}
- final TopNMapFn mapFn = getMapFn(query, cursorInspector, queryMetrics);
- return Sequences.filter(
- Sequences.simple(granularizer.getBucketIterable())
- .map(bucketInterval -> {
- granularizer.advanceToBucket(bucketInterval);
- return mapFn.apply(cursor, selectorPlus, granularizer,
queryMetrics);
- }),
- Predicates.notNull()
- ).withBaggage(cursorHolder);
}
/**
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]