This is an automated email from the ASF dual-hosted git repository.
shauryachats pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 40af9051d29 [pinot-server/ proactive-query-killing] (2/2) integrate
query scan cost based instrumentation and killing in server operators (#18475)
40af9051d29 is described below
commit 40af9051d294f1304db65b79f4556e110d909f73
Author: Anurag Rai <[email protected]>
AuthorDate: Wed Jun 17 22:01:34 2026 +0530
[pinot-server/ proactive-query-killing] (2/2) integrate query scan cost
based instrumentation and killing in server operators (#18475)
* integrate query-scan-cost-based killing into all server operators
* fix issues found in Claude review
* add entries-scanned-post-filter instrumentation to query killing
* add per-table scanKillingMode override to proactive query killing
Extends QueryConfig with a scanKillingMode field (disabled/logOnly/enforce)
that lets individual tables override the cluster-level scan killing mode.
Stores the resolved mode as a volatile field on QueryExecutionContext during
query init and applies it in QueryKillingManager.checkAndKillWithStrategy()
ahead of the cluster config — enabling patterns like cluster=logOnly +
table=enforce for targeted enforcement without a cluster-wide mode change.
Invalid mode strings are rejected at QueryConfig construction time via
Preconditions. Includes 4 new manager tests and 7 new QueryConfig tests.
Co-Authored-By: Claude Sonnet 4.6 (1M context) <[email protected]>
* fix dynamic ZK config changes not being picked up by QueryKillingManager
QueryKillingManager.onChange() received raw ZK keys with the full
"pinot.query.scheduler." prefix but passed them directly to
QueryMonitorConfig's update constructor, which checks for keys
without that prefix (e.g. "accounting.scan.based.killing.mode").
The mismatch caused all dynamic config changes to be silently ignored,
requiring a server restart for scan-killing config to take effect.
Strip the prefix before passing the keys to QueryMonitorConfig, matching the
key space the init constructor uses. Add an early return when no relevant
keys changed, and log the applied config values after the rebuild.
Co-Authored-By: Claude Sonnet 4.6 (1M context) <[email protected]>
* switch server metrics for query killing to table-specific metrics, falling
back to global metrics when the table name is unavailable
* make sure dry-run metrics and logs are emitted only once per query
* reduce bracket nesting in checkScanBasedKilling
---------
Co-authored-by: Claude Sonnet 4.6 (1M context) <[email protected]>
---
.../pinot/core/accounting/QueryMonitorConfig.java | 3 +-
.../apache/pinot/core/operator/BaseOperator.java | 33 ++
.../pinot/core/operator/DocIdSetOperator.java | 12 +
.../core/operator/query/AggregationOperator.java | 7 +
.../core/operator/query/DistinctOperator.java | 7 +
.../query/FilteredAggregationOperator.java | 7 +
.../operator/query/FilteredGroupByOperator.java | 7 +
.../pinot/core/operator/query/GroupByOperator.java | 7 +
.../core/operator/query/SelectionOnlyOperator.java | 7 +
.../operator/query/SelectionOrderByOperator.java | 11 +
.../SelectionPartiallyOrderedByDescOperation.java | 7 +
.../SelectionPartiallyOrderedByLinearOperator.java | 7 +
.../streaming/StreamingSelectionOnlyOperator.java | 7 +
.../query/executor/ServerQueryExecutorV1Impl.java | 55 ++
.../killing/CompositeQueryKillingStrategy.java | 4 +-
.../pinot/core/query/killing/QueryKillReport.java | 17 +-
.../core/query/killing/QueryKillingManager.java | 192 ++++++-
.../core/query/killing/QueryKillingStrategy.java | 2 +-
.../strategy/ScanEntriesThresholdStrategy.java | 46 +-
.../query/OperatorScanCostTrackingTest.java | 207 +++++++
.../killing/CompositeQueryKillingStrategyTest.java | 2 +-
.../core/query/killing/QueryKillReportTest.java | 7 +
.../query/killing/QueryKillingManagerTest.java | 605 ++++++++++++++++++++-
.../strategy/ScanEntriesThresholdStrategyTest.java | 62 ++-
.../server/starter/helix/BaseServerStarter.java | 11 +
.../apache/pinot/spi/config/table/QueryConfig.java | 27 +-
.../pinot/spi/query/QueryExecutionContext.java | 81 +++
.../config/table/QueryConfigScanKillingTest.java | 68 +++
28 files changed, 1457 insertions(+), 51 deletions(-)
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/accounting/QueryMonitorConfig.java
b/pinot-core/src/main/java/org/apache/pinot/core/accounting/QueryMonitorConfig.java
index 920648ca506..c5bfcde27ad 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/accounting/QueryMonitorConfig.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/accounting/QueryMonitorConfig.java
@@ -158,7 +158,8 @@ public class QueryMonitorConfig {
(String) null);
}
- QueryMonitorConfig(QueryMonitorConfig oldConfig, Set<String> changedConfigs,
Map<String, String> clusterConfigs) {
+ public QueryMonitorConfig(QueryMonitorConfig oldConfig, Set<String>
changedConfigs,
+ Map<String, String> clusterConfigs) {
_maxHeapSize = oldConfig._maxHeapSize;
if
(changedConfigs.contains(Accounting.Keys.MIN_MEMORY_FOOTPRINT_TO_KILL_RATIO)) {
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/BaseOperator.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/BaseOperator.java
index a99855a07a7..22413450a12 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/BaseOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/BaseOperator.java
@@ -25,6 +25,10 @@ import java.util.stream.Collectors;
import org.apache.pinot.core.common.Block;
import org.apache.pinot.core.common.Operator;
import org.apache.pinot.core.plan.ExplainInfo;
+import org.apache.pinot.core.query.killing.QueryKillingManager;
+import org.apache.pinot.spi.exception.TerminationException;
+import org.apache.pinot.spi.query.QueryExecutionContext;
+import org.apache.pinot.spi.query.QueryScanCostContext;
import org.apache.pinot.spi.query.QueryThreadContext;
import org.apache.pinot.spi.trace.InvocationScope;
import org.apache.pinot.spi.trace.Tracing;
@@ -55,10 +59,39 @@ public abstract class BaseOperator<T extends Block>
implements Operator<T> {
protected void checkTermination() {
QueryThreadContext.checkTermination(this::getExplainName);
+ checkScanBasedKilling();
}
protected void checkTerminationAndSampleUsage() {
QueryThreadContext.checkTerminationAndSampleUsage(this::getExplainName);
+ checkScanBasedKilling();
+ }
+
+ private void checkScanBasedKilling() {
+ QueryKillingManager killingManager = QueryKillingManager.getInstance();
+ if (killingManager == null) {
+ return;
+ }
+ QueryThreadContext ctx = QueryThreadContext.getIfAvailable();
+ if (ctx == null) {
+ return;
+ }
+ QueryExecutionContext execCtx = ctx.getExecutionContext();
+ QueryScanCostContext scanCost = execCtx.getQueryScanCostContext();
+ if (scanCost == null) {
+ return;
+ }
+ killingManager.checkAndKillIfNeeded(execCtx, scanCost);
+ TerminationException te = execCtx.getTerminateException();
+ if (te != null) {
+ throw te;
+ }
+ }
+
+ @javax.annotation.Nullable
+ protected static QueryScanCostContext getScanCostContext() {
+ QueryThreadContext ctx = QueryThreadContext.getIfAvailable();
+ return ctx != null ? ctx.getExecutionContext().getQueryScanCostContext() :
null;
}
protected List<ExplainInfo> getChildrenExplainInfo() {
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/DocIdSetOperator.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/DocIdSetOperator.java
index f7d87453a0f..c96936c3682 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/operator/DocIdSetOperator.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/DocIdSetOperator.java
@@ -28,6 +28,7 @@ import org.apache.pinot.core.operator.blocks.DocIdSetBlock;
import org.apache.pinot.core.operator.filter.BaseFilterOperator;
import org.apache.pinot.core.plan.DocIdSetPlanNode;
import org.apache.pinot.segment.spi.Constants;
+import org.apache.pinot.spi.query.QueryScanCostContext;
import org.apache.pinot.spi.query.QueryThreadContext;
@@ -49,6 +50,7 @@ public class DocIdSetOperator extends BaseDocIdSetOperator {
private BlockDocIdSet _blockDocIdSet;
private BlockDocIdIterator _blockDocIdIterator;
private int _currentDocId = 0;
+ private long _lastReportedEntriesScanned = 0;
public DocIdSetOperator(BaseFilterOperator filterOperator, int
maxSizeOfDocIdSet) {
Preconditions.checkArgument(maxSizeOfDocIdSet > 0 && maxSizeOfDocIdSet <=
DocIdSetPlanNode.MAX_DOC_PER_CALL);
@@ -80,6 +82,16 @@ public class DocIdSetOperator extends BaseDocIdSetOperator {
docIds[pos++] = _currentDocId;
}
if (pos > 0) {
+ // Push scan cost delta for proactive query killing
+ QueryScanCostContext scanCost = getScanCostContext();
+ if (scanCost != null) {
+ long currentTotal = _blockDocIdSet.getNumEntriesScannedInFilter();
+ long delta = currentTotal - _lastReportedEntriesScanned;
+ if (delta > 0) {
+ scanCost.addEntriesScannedInFilter(delta);
+ _lastReportedEntriesScanned = currentTotal;
+ }
+ }
return new DocIdSetBlock(docIds, pos);
} else {
return null;
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/AggregationOperator.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/AggregationOperator.java
index 31ef246eb32..5035f6ea63c 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/AggregationOperator.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/AggregationOperator.java
@@ -35,6 +35,7 @@ import
org.apache.pinot.core.query.aggregation.function.AggregationFunction;
import
org.apache.pinot.core.query.aggregation.function.AggregationFunctionUtils.AggregationInfo;
import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.core.startree.executor.StarTreeAggregationExecutor;
+import org.apache.pinot.spi.query.QueryScanCostContext;
/**
@@ -72,6 +73,12 @@ public class AggregationOperator extends
BaseOperator<AggregationResultsBlock> {
ValueBlock valueBlock;
while ((valueBlock = _projectOperator.nextBlock()) != null) {
_numDocsScanned += valueBlock.getNumDocs();
+ QueryScanCostContext scanCost = getScanCostContext();
+ if (scanCost != null) {
+ scanCost.addDocsScanned(valueBlock.getNumDocs());
+ scanCost.addEntriesScannedPostFilter(
+ (long) valueBlock.getNumDocs() *
_projectOperator.getNumColumnsProjected());
+ }
aggregationExecutor.aggregate(valueBlock);
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/DistinctOperator.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/DistinctOperator.java
index 70b51ca458d..93c00d76bde 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/DistinctOperator.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/DistinctOperator.java
@@ -33,6 +33,7 @@ import org.apache.pinot.core.query.distinct.DistinctExecutor;
import org.apache.pinot.core.query.distinct.DistinctExecutorFactory;
import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.spi.query.QueryScanCostContext;
/**
@@ -60,6 +61,12 @@ public class DistinctOperator extends
BaseOperator<DistinctResultsBlock> {
ValueBlock valueBlock;
while ((valueBlock = _projectOperator.nextBlock()) != null) {
_numDocsScanned += valueBlock.getNumDocs();
+ QueryScanCostContext scanCost = getScanCostContext();
+ if (scanCost != null) {
+ scanCost.addDocsScanned(valueBlock.getNumDocs());
+ scanCost.addEntriesScannedPostFilter(
+ (long) valueBlock.getNumDocs() *
_projectOperator.getNumColumnsProjected());
+ }
if (executor.process(valueBlock)) {
break;
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/FilteredAggregationOperator.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/FilteredAggregationOperator.java
index 21697367a5a..bfc56b02522 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/FilteredAggregationOperator.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/FilteredAggregationOperator.java
@@ -35,6 +35,7 @@ import
org.apache.pinot.core.query.aggregation.function.AggregationFunction;
import
org.apache.pinot.core.query.aggregation.function.AggregationFunctionUtils.AggregationInfo;
import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.core.startree.executor.StarTreeAggregationExecutor;
+import org.apache.pinot.spi.query.QueryScanCostContext;
/**
@@ -95,6 +96,12 @@ public class FilteredAggregationOperator extends
BaseOperator<AggregationResults
result[resultIndexMap.get(aggregationFunctions[i])] =
filteredResult.get(i);
}
_numDocsScanned += numDocsScanned;
+ QueryScanCostContext scanCost = getScanCostContext();
+ if (scanCost != null) {
+ scanCost.addDocsScanned(numDocsScanned);
+ scanCost.addEntriesScannedPostFilter(
+ (long) numDocsScanned * projectOperator.getNumColumnsProjected());
+ }
_numEntriesScannedInFilter +=
projectOperator.getExecutionStatistics().getNumEntriesScannedInFilter();
_numEntriesScannedPostFilter += (long) numDocsScanned *
projectOperator.getNumColumnsProjected();
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/FilteredGroupByOperator.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/FilteredGroupByOperator.java
index d0555b6e39c..0cb56dfe5eb 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/FilteredGroupByOperator.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/FilteredGroupByOperator.java
@@ -48,6 +48,7 @@ import
org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
import org.apache.pinot.core.query.aggregation.groupby.GroupKeyGenerator;
import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.core.startree.executor.StarTreeGroupByExecutor;
+import org.apache.pinot.spi.query.QueryScanCostContext;
import org.apache.pinot.spi.trace.Tracing;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -160,6 +161,12 @@ public class FilteredGroupByOperator extends
BaseOperator<GroupByResultsBlock> {
}
_numDocsScanned += numDocsScanned;
+ QueryScanCostContext scanCost = getScanCostContext();
+ if (scanCost != null) {
+ scanCost.addDocsScanned(numDocsScanned);
+ scanCost.addEntriesScannedPostFilter(
+ (long) numDocsScanned * projectOperator.getNumColumnsProjected());
+ }
_numEntriesScannedInFilter +=
projectOperator.getExecutionStatistics().getNumEntriesScannedInFilter();
_numEntriesScannedPostFilter += (long) numDocsScanned *
projectOperator.getNumColumnsProjected();
GroupByResultHolder[] filterGroupByResults =
groupByExecutor.getGroupByResultHolders();
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/GroupByOperator.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/GroupByOperator.java
index bd6f58095f3..3465484b652 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/GroupByOperator.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/GroupByOperator.java
@@ -42,6 +42,7 @@ import
org.apache.pinot.core.query.aggregation.groupby.DefaultGroupByExecutor;
import org.apache.pinot.core.query.aggregation.groupby.GroupByExecutor;
import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.core.startree.executor.StarTreeGroupByExecutor;
+import org.apache.pinot.spi.query.QueryScanCostContext;
import org.apache.pinot.spi.trace.Tracing;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -119,6 +120,12 @@ public class GroupByOperator extends
BaseOperator<GroupByResultsBlock> {
while ((valueBlock = _projectOperator.nextBlock()) != null) {
_numDocsScanned += valueBlock.getNumDocs();
+ QueryScanCostContext scanCost = getScanCostContext();
+ if (scanCost != null) {
+ scanCost.addDocsScanned(valueBlock.getNumDocs());
+ scanCost.addEntriesScannedPostFilter(
+ (long) valueBlock.getNumDocs() *
_projectOperator.getNumColumnsProjected());
+ }
groupByExecutor.process(valueBlock);
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOnlyOperator.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOnlyOperator.java
index 8527f9dc032..6d7d6bac9d1 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOnlyOperator.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOnlyOperator.java
@@ -38,6 +38,7 @@ import
org.apache.pinot.core.operator.blocks.results.SelectionResultsBlock;
import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.spi.query.QueryScanCostContext;
import org.roaringbitmap.RoaringBitmap;
@@ -124,6 +125,12 @@ public class SelectionOnlyOperator extends
BaseOperator<SelectionResultsBlock> {
int numDocsToAdd = Math.min(_numRowsToKeep - _rows.size(),
valueBlock.getNumDocs());
_rows.ensureCapacity(_rows.size() + numDocsToAdd);
_numDocsScanned += numDocsToAdd;
+ QueryScanCostContext scanCost = getScanCostContext();
+ if (scanCost != null) {
+ scanCost.addDocsScanned(numDocsToAdd);
+ scanCost.addEntriesScannedPostFilter(
+ (long) numDocsToAdd * _projectOperator.getNumColumnsProjected());
+ }
if (_nullHandlingEnabled) {
for (int i = 0; i < numExpressions; i++) {
_nullBitmaps[i] = _blockValSets[i].getNullBitmap();
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOrderByOperator.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOrderByOperator.java
index 5db3e914930..d088d721abd 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOrderByOperator.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOrderByOperator.java
@@ -52,6 +52,7 @@ import
org.apache.pinot.core.query.selection.SelectionOperatorUtils;
import org.apache.pinot.core.query.utils.OrderByComparatorFactory;
import org.apache.pinot.segment.spi.IndexSegment;
import org.apache.pinot.segment.spi.datasource.DataSource;
+import org.apache.pinot.spi.query.QueryScanCostContext;
import org.roaringbitmap.RoaringBitmap;
@@ -189,6 +190,11 @@ public class SelectionOrderByOperator extends
BaseOperator<SelectionResultsBlock
}
}
_numDocsScanned += numDocsFetched;
+ QueryScanCostContext scanCost = getScanCostContext();
+ if (scanCost != null) {
+ scanCost.addDocsScanned(numDocsFetched);
+ scanCost.addEntriesScannedPostFilter((long) numDocsFetched *
numColumnsProjected);
+ }
}
_numEntriesScannedPostFilter = (long) _numDocsScanned *
numColumnsProjected;
@@ -253,6 +259,11 @@ public class SelectionOrderByOperator extends
BaseOperator<SelectionResultsBlock
}
}
_numDocsScanned += numDocsFetched;
+ QueryScanCostContext scanCost2 = getScanCostContext();
+ if (scanCost2 != null) {
+ scanCost2.addDocsScanned(numDocsFetched);
+ scanCost2.addEntriesScannedPostFilter((long) numDocsFetched *
numColumnsProjected);
+ }
}
_numEntriesScannedPostFilter = (long) _numDocsScanned *
numColumnsProjected;
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionPartiallyOrderedByDescOperation.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionPartiallyOrderedByDescOperation.java
index fb318d85be3..df4f624dedf 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionPartiallyOrderedByDescOperation.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionPartiallyOrderedByDescOperation.java
@@ -30,6 +30,7 @@ import org.apache.pinot.core.operator.BaseProjectOperator;
import org.apache.pinot.core.operator.blocks.ValueBlock;
import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.spi.query.QueryScanCostContext;
/**
@@ -71,6 +72,12 @@ public class SelectionPartiallyOrderedByDescOperation
extends LinearSelectionOrd
IntFunction<Object[]> rowFetcher = fetchBlock(valueBlock, blockValSets);
int numDocsFetched = valueBlock.getNumDocs();
_numDocsScanned += numDocsFetched;
+ QueryScanCostContext scanCost = getScanCostContext();
+ if (scanCost != null) {
+ scanCost.addDocsScanned(numDocsFetched);
+ scanCost.addEntriesScannedPostFilter(
+ (long) numDocsFetched * _projectOperator.getNumColumnsProjected());
+ }
ListBuilder listBuilder = listBuilderSupplier.get();
// first, calculate the best rows on this block
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionPartiallyOrderedByLinearOperator.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionPartiallyOrderedByLinearOperator.java
index eaedac0003c..57237fc1ebc 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionPartiallyOrderedByLinearOperator.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionPartiallyOrderedByLinearOperator.java
@@ -28,6 +28,7 @@ import org.apache.pinot.core.operator.DocIdOrderedOperator;
import org.apache.pinot.core.operator.blocks.ValueBlock;
import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.spi.query.QueryScanCostContext;
/**
@@ -70,6 +71,12 @@ public class SelectionPartiallyOrderedByLinearOperator
extends LinearSelectionOr
IntFunction<Object[]> rowFetcher = fetchBlock(valueBlock, blockValSets);
int numDocsFetched = valueBlock.getNumDocs();
_numDocsScanned += numDocsFetched;
+ QueryScanCostContext scanCost = getScanCostContext();
+ if (scanCost != null) {
+ scanCost.addDocsScanned(numDocsFetched);
+ scanCost.addEntriesScannedPostFilter(
+ (long) numDocsFetched * _projectOperator.getNumColumnsProjected());
+ }
for (int i = 0; i < numDocsFetched; i++) {
if (listBuilder.add(rowFetcher.apply(i))) {
return listBuilder.build();
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingSelectionOnlyOperator.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingSelectionOnlyOperator.java
index bb5171c44d7..9c498a26716 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingSelectionOnlyOperator.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingSelectionOnlyOperator.java
@@ -36,6 +36,7 @@ import org.apache.pinot.core.operator.blocks.ValueBlock;
import org.apache.pinot.core.operator.blocks.results.SelectionResultsBlock;
import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.spi.query.QueryScanCostContext;
import org.roaringbitmap.RoaringBitmap;
@@ -122,6 +123,12 @@ public class StreamingSelectionOnlyOperator extends
BaseOperator<SelectionResult
}
}
_numDocsScanned += numDocs;
+ QueryScanCostContext scanCost = getScanCostContext();
+ if (scanCost != null) {
+ scanCost.addDocsScanned(numDocs);
+ scanCost.addEntriesScannedPostFilter(
+ (long) numDocs * _projectOperator.getNumColumnsProjected());
+ }
return new SelectionResultsBlock(_dataSchema, rows, _queryContext);
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
index 6e1ef3c1509..ca6c6958d5b 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
@@ -59,6 +59,8 @@ import org.apache.pinot.core.plan.Plan;
import org.apache.pinot.core.plan.maker.PlanMaker;
import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
import org.apache.pinot.core.query.config.SegmentPrunerConfig;
+import org.apache.pinot.core.query.killing.QueryKillingManager;
+import org.apache.pinot.core.query.killing.QueryKillingStrategy;
import org.apache.pinot.core.query.pruner.SegmentPrunerService;
import org.apache.pinot.core.query.pruner.SegmentPrunerStatistics;
import org.apache.pinot.core.query.request.ServerQueryRequest;
@@ -68,17 +70,23 @@ import
org.apache.pinot.core.query.request.context.utils.QueryContextConverterUt
import org.apache.pinot.core.query.utils.idset.IdSet;
import org.apache.pinot.core.util.trace.TraceContext;
import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
+import org.apache.pinot.segment.local.data.manager.TableDataManager;
import org.apache.pinot.segment.spi.AggregationFunctionType;
import org.apache.pinot.segment.spi.IndexSegment;
import org.apache.pinot.segment.spi.SegmentContext;
+import org.apache.pinot.spi.config.table.QueryConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.exception.QueryCancelledException;
import org.apache.pinot.spi.exception.QueryErrorCode;
import org.apache.pinot.spi.exception.QueryException;
import org.apache.pinot.spi.plugin.PluginManager;
+import org.apache.pinot.spi.query.QueryExecutionContext;
+import org.apache.pinot.spi.query.QueryScanCostContext;
import org.apache.pinot.spi.query.QueryThreadContext;
import org.apache.pinot.spi.trace.Tracer;
import org.apache.pinot.spi.trace.Tracing;
+import org.apache.pinot.spi.utils.CommonConstants.Accounting.ScanKillingMode;
import org.apache.pinot.spi.utils.CommonConstants.Server;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -176,6 +184,9 @@ public class ServerQueryExecutorV1Impl implements
QueryExecutor {
queryContext.setEnablePrefetch(_enablePrefetch);
+ // Initialize scan-based query killing for this query
+ initScanBasedKilling(queryRequest, tableNameWithType);
+
// Query scheduler wait time already exceeds query timeout, directly return
long querySchedulingTimeMs = System.currentTimeMillis() -
queryArrivalTimeMs;
if (querySchedulingTimeMs >= queryTimeoutMs) {
@@ -520,6 +531,50 @@ public class ServerQueryExecutorV1Impl implements
QueryExecutor {
}
}
+ /**
+ * Initializes scan-based query killing for this query. Sets up a {@link
QueryScanCostContext}
+ * on the current thread's {@link QueryExecutionContext} so operators can
push scan deltas, and
+ * caches the resolved per-query strategy so table-level overrides are
applied only once.
+ */
+ private void initScanBasedKilling(ServerQueryRequest queryRequest, String
tableNameWithType) {
+ QueryKillingManager killingManager = QueryKillingManager.getInstance();
+ if (killingManager == null) {
+ return;
+ }
+ QueryThreadContext ctx = QueryThreadContext.getIfAvailable();
+ if (ctx == null) {
+ return;
+ }
+ QueryExecutionContext execCtx = ctx.getExecutionContext();
+ execCtx.setTableName(tableNameWithType);
+ execCtx.setQueryId(queryRequest.getQueryId());
+ execCtx.setQueryScanCostContext(new QueryScanCostContext());
+
+ // Resolve and cache per-query strategy (applies table-level threshold
overrides)
+ QueryConfig queryConfig = null;
+ TableDataManager tableDataManager =
_instanceDataManager.getTableDataManager(tableNameWithType);
+ if (tableDataManager != null) {
+ TableConfig tableConfig =
tableDataManager.getCachedTableConfigAndSchema().getLeft();
+ if (tableConfig != null) {
+ queryConfig = tableConfig.getQueryConfig();
+ }
+ }
+ QueryKillingStrategy queryStrategy =
killingManager.resolveQueryStrategy(queryConfig);
+ if (queryStrategy != null) {
+ execCtx.setCachedKillingStrategy(queryStrategy);
+ }
+ // Resolve and store per-table kill mode override (null = use cluster mode)
+ if (queryConfig != null && queryConfig.getScanKillingMode() != null) {
+ ScanKillingMode tableMode =
ScanKillingMode.fromConfigValue(queryConfig.getScanKillingMode());
+ if (tableMode != null) {
+ execCtx.setEffectiveScanKillingMode(tableMode);
+ } else {
+ LOGGER.warn("Invalid scanKillingMode '{}' in QueryConfig for table {},
falling back to cluster mode",
+ queryConfig.getScanKillingMode(), tableNameWithType);
+ }
+ }
+ }
+
private void addPrunerStats(InstanceResponseBlock instanceResponse,
SegmentPrunerStatistics prunerStats) {
instanceResponse.addMetadata(MetadataKey.NUM_SEGMENTS_PRUNED_INVALID.getName(),
String.valueOf(prunerStats.getInvalidSegments()));
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/killing/CompositeQueryKillingStrategy.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/killing/CompositeQueryKillingStrategy.java
index aa363d4d360..fd596ae86fa 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/killing/CompositeQueryKillingStrategy.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/killing/CompositeQueryKillingStrategy.java
@@ -73,10 +73,10 @@ public class CompositeQueryKillingStrategy implements
QueryKillingStrategy {
@Override
public QueryKillReport buildKillReport(QueryScanCostContext ctx,
- String queryId, String tableName, String configSource) {
+ long requestId, String queryId, String tableName, String configSource) {
for (QueryKillingStrategy s : _strategies) {
if (s.shouldTerminate(ctx)) {
- return s.buildKillReport(ctx, queryId, tableName, configSource);
+ return s.buildKillReport(ctx, requestId, queryId, tableName,
configSource);
}
}
throw new IllegalStateException("buildKillReport called but no strategy
triggered");
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/killing/QueryKillReport.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/killing/QueryKillReport.java
index d44d873188d..f9865831424 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/killing/QueryKillReport.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/killing/QueryKillReport.java
@@ -25,6 +25,7 @@ import org.apache.pinot.spi.query.QueryScanCostContext;
* Immutable snapshot of a query-kill event
*/
public final class QueryKillReport {
+ private final long _requestId;
private final String _queryId;
private final String _tableName;
private final String _strategyName;
@@ -40,6 +41,7 @@ public final class QueryKillReport {
/**
* Creates a {@code QueryKillReport} by snapshotting the current state of
{@code context}.
*
+ * @param requestId server request ID for tracing
* @param queryId unique identifier of the killed query
* @param tableName fully-qualified table name (e.g. {@code
myTable_OFFLINE})
* @param strategyName name of the kill strategy that triggered the kill
@@ -49,8 +51,9 @@ public final class QueryKillReport {
* @param configSource source of the threshold config (e.g. {@code
TABLE_CONFIG})
* @param context live scan-cost context; values are snapshotted
immediately
*/
- public QueryKillReport(String queryId, String tableName, String
strategyName, String triggeringMetric,
+ public QueryKillReport(long requestId, String queryId, String tableName,
String strategyName, String triggeringMetric,
long actualValue, long thresholdValue, String configSource,
QueryScanCostContext context) {
+ _requestId = requestId;
_queryId = queryId;
_tableName = tableName;
_strategyName = strategyName;
@@ -67,6 +70,10 @@ public final class QueryKillReport {
// ----- Getters -----
+ public long getRequestId() {
+ return _requestId;
+ }
+
public String getQueryId() {
return _queryId;
}
@@ -121,13 +128,13 @@ public final class QueryKillReport {
*/
public String toCustomerMessage() {
return String.format(
- "Query '%s' on table '%s' was killed because '%s' (%,d) exceeded the
threshold (%,d) "
+ "Query '%s' (requestId=%d) on table '%s' was killed because '%s' (%,d)
exceeded the threshold (%,d) "
+ "configured in %s. "
+ "At kill time: entriesScannedInFilter=%,d, docsScanned=%,d, "
+ "entriesScannedPostFilter=%,d, elapsedMs=%d. "
+ "To reduce scan cost, consider adding a missing index (e.g.
inverted or range index) "
+ "on the filter columns.",
- _queryId, _tableName, _triggeringMetric, _actualValue,
_thresholdValue, _configSource,
+ _queryId, _requestId, _tableName, _triggeringMetric, _actualValue,
_thresholdValue, _configSource,
_snapshotEntriesScannedInFilter, _snapshotDocsScanned,
_snapshotEntriesScannedPostFilter, _elapsedTimeMs);
}
@@ -138,10 +145,10 @@ public final class QueryKillReport {
*/
public String toInternalLogMessage() {
return String.format(
- "QUERY_KILLED queryId=%s table=%s strategy=%s metric=%s actual=%d
threshold=%d "
+ "QUERY_KILLED requestId=%d queryId=%s table=%s strategy=%s metric=%s
actual=%d threshold=%d "
+ "configSource=%s entriesScannedInFilter=%d docsScanned=%d "
+ "entriesScannedPostFilter=%d elapsedMs=%d",
- _queryId, _tableName, _strategyName, _triggeringMetric, _actualValue,
_thresholdValue,
+ _requestId, _queryId, _tableName, _strategyName, _triggeringMetric,
_actualValue, _thresholdValue,
_configSource, _snapshotEntriesScannedInFilter, _snapshotDocsScanned,
_snapshotEntriesScannedPostFilter, _elapsedTimeMs);
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/killing/QueryKillingManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/killing/QueryKillingManager.java
index ee8f0678113..6fe5c8b9e14 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/killing/QueryKillingManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/killing/QueryKillingManager.java
@@ -18,15 +18,22 @@
*/
package org.apache.pinot.core.query.killing;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
import org.apache.pinot.common.metrics.ServerMeter;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.core.accounting.QueryMonitorConfig;
import
org.apache.pinot.core.query.killing.strategy.ScanEntriesThresholdStrategy;
+import org.apache.pinot.spi.config.provider.PinotClusterConfigChangeListener;
import org.apache.pinot.spi.config.table.QueryConfig;
+import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.query.QueryExecutionContext;
import org.apache.pinot.spi.query.QueryScanCostContext;
+import org.apache.pinot.spi.utils.CommonConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -35,12 +42,12 @@ import org.slf4j.LoggerFactory;
* Central manager for scan-based query killing. Owns the guard rails and
delegates the
* actual kill decision to a {@link QueryKillingStrategy}.
*
- * The default factory is {@link ScanEntriesThresholdStrategy.Factory}, which
reads
- * scan thresholds from {@link QueryMonitorConfig}. Custom factories can be
configured
- * via {@code accounting.scan.based.killing.strategy.factory.class.name}.
+ * <p>The strategy is built once at init via a {@link
QueryKillingStrategyFactory} and rebuilt when
+ * cluster config changes via {@link #onChange}. The default factory is
+ * {@link ScanEntriesThresholdStrategy.Factory}.</p>
*
*/
-public class QueryKillingManager {
+public class QueryKillingManager implements PinotClusterConfigChangeListener {
private static final Logger LOGGER =
LoggerFactory.getLogger(QueryKillingManager.class);
private static volatile QueryKillingManager _instance;
@@ -64,10 +71,14 @@ public class QueryKillingManager {
* Initializes the singleton instance and builds the strategy from config.
* Called once during server startup.
*/
- public static void init(AtomicReference<QueryMonitorConfig> configRef,
ServerMetrics serverMetrics) {
+ public static QueryKillingManager init(PinotConfiguration schedulerConfig,
ServerMetrics serverMetrics) {
+ long maxHeapSize = Runtime.getRuntime().maxMemory();
+ QueryMonitorConfig config = new QueryMonitorConfig(schedulerConfig,
maxHeapSize);
+ AtomicReference<QueryMonitorConfig> configRef = new
AtomicReference<>(config);
QueryKillingManager manager = new QueryKillingManager(configRef,
serverMetrics);
manager.rebuildStrategy();
_instance = manager;
+ return manager;
}
@Nullable
@@ -129,6 +140,68 @@ public class QueryKillingManager {
return _strategy;
}
+ /**
+ * Handles ZK cluster config changes. Rebuilds the {@link
QueryMonitorConfig} from the delta
+ * and rebuilds the killing strategy whenever any key under the scheduler prefix changed.
+ *
+ * <p>Raw ZK keys arrive with the full {@value
CommonConstants#PINOT_QUERY_SCHEDULER_PREFIX}
+ * prefix. We strip it before passing to {@link QueryMonitorConfig},
matching the key space
+ * the init constructor uses (which reads from a config already subsetted to
that prefix).</p>
+ */
+ @Override
+ public synchronized void onChange(Set<String> changedConfigs, Map<String,
String> clusterConfigs) {
+ String prefix = CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX + ".";
+ int prefixLen = prefix.length();
+
+ Set<String> filteredChangedConfigs = new HashSet<>();
+ for (String key : changedConfigs) {
+ if (key.startsWith(prefix)) {
+ filteredChangedConfigs.add(key.substring(prefixLen));
+ }
+ }
+
+ if (filteredChangedConfigs.isEmpty()) {
+ return;
+ }
+
+ Map<String, String> filteredClusterConfigs = new HashMap<>();
+ for (Map.Entry<String, String> entry : clusterConfigs.entrySet()) {
+ if (entry.getKey().startsWith(prefix)) {
+ filteredClusterConfigs.put(entry.getKey().substring(prefixLen),
entry.getValue());
+ }
+ }
+
+ QueryMonitorConfig oldConfig = _configRef.get();
+ QueryMonitorConfig newConfig = new QueryMonitorConfig(oldConfig,
filteredChangedConfigs, filteredClusterConfigs);
+ _configRef.set(newConfig);
+ rebuildStrategy();
+ LOGGER.info("Scan-based killing config updated: mode={},
maxEntriesScannedInFilter={}, "
+ + "maxDocsScanned={}, maxEntriesScannedPostFilter={}",
+ newConfig.getScanBasedKillingMode(),
+ newConfig.getScanBasedKillingMaxEntriesScannedInFilter(),
+ newConfig.getScanBasedKillingMaxDocsScanned(),
+ newConfig.getScanBasedKillingMaxEntriesScannedPostFilter());
+ }
+
+ /**
+ * Convenience overload called from {@link
org.apache.pinot.core.operator.BaseOperator#checkTermination()}.
+ * Reads query context (table name, query id, cached strategy) from the
execution context.
+ */
+ public void checkAndKillIfNeeded(QueryExecutionContext executionContext,
QueryScanCostContext scanCostContext) {
+ Object cached = executionContext.getCachedKillingStrategy();
+ QueryKillingStrategy cachedStrategy;
+ if (cached instanceof QueryKillingStrategy) {
+ cachedStrategy = (QueryKillingStrategy) cached;
+ } else {
+ if (cached != null) {
+ LOGGER.warn("Unexpected cached killing strategy type: {}",
cached.getClass().getName());
+ }
+ cachedStrategy = null;
+ }
+ checkAndKillIfNeeded(executionContext, scanCostContext, cachedStrategy,
+ executionContext.getQueryId(), executionContext.getTableName());
+ }
+
/**
* Evaluates whether the query should be killed based on the active strategy.
*
@@ -138,7 +211,6 @@ public class QueryKillingManager {
public void checkAndKillIfNeeded(QueryExecutionContext executionContext,
QueryScanCostContext scanCostContext, String queryId, String tableName,
@Nullable QueryConfig queryConfig) {
- // no strategy means killing is disabled or unconfigured
QueryKillingStrategy strategy = _strategy;
if (strategy == null) {
return;
@@ -149,37 +221,109 @@ public class QueryKillingManager {
return;
}
- // Prevent duplicate kills
if (executionContext.getTerminateException() != null) {
return;
}
try {
- // Resolve per-query table overrides (returns same instance if no
overrides)
QueryKillingStrategy queryStrategy = strategy.forQuery(queryConfig,
config);
-
String configSource = (queryStrategy != strategy) ? "table:" + tableName
: "cluster";
+ checkAndKillWithStrategy(executionContext, scanCostContext,
queryStrategy, configSource, queryId, tableName,
+ config);
+ } catch (Exception e) {
+ LOGGER.error("Error in scan-based killing evaluation for query {}",
queryId, e);
+ emitKillMetric(ServerMeter.QUERIES_KILLED_SCAN_ERROR, tableName);
+ }
+ }
- // Delegate to strategy
- if (!queryStrategy.shouldTerminate(scanCostContext)) {
- return;
- }
+ /**
+ * Resolves a per-query strategy (applying table-level overrides from {@code
queryConfig}).
+ * Returns null if no strategy is active or killing is disabled. Does not cache the result;
any caching on the execution context is left to the caller.
+ */
+ @Nullable
+ public QueryKillingStrategy resolveQueryStrategy(@Nullable QueryConfig
queryConfig) {
+ QueryKillingStrategy strategy = _strategy;
+ if (strategy == null) {
+ return null;
+ }
+ QueryMonitorConfig config = _configRef.get();
+ if (config == null || !config.isScanBasedKillingEnabled()) {
+ return null;
+ }
+ return strategy.forQuery(queryConfig, config);
+ }
- QueryKillReport report = queryStrategy.buildKillReport(
- scanCostContext, queryId, tableName, configSource);
+ private void checkAndKillIfNeeded(QueryExecutionContext executionContext,
QueryScanCostContext scanCostContext,
+ @Nullable QueryKillingStrategy cachedStrategy, @Nullable String queryId,
@Nullable String tableName) {
+ QueryKillingStrategy currentStrategy = _strategy;
+ QueryKillingStrategy strategy = cachedStrategy != null ? cachedStrategy :
currentStrategy;
+ if (strategy == null) {
+ return;
+ }
+ QueryMonitorConfig config = _configRef.get();
+ if (config == null || !config.isScanBasedKillingEnabled()) {
+ return;
+ }
+ if (executionContext.getTerminateException() != null) {
+ return;
+ }
+ String resolvedQueryId = queryId != null ? queryId : "unknown";
+ String resolvedTableName = tableName != null ? tableName : "unknown";
+ String configSource = (cachedStrategy != null && cachedStrategy !=
currentStrategy) ? "table:" + resolvedTableName
+ : "cluster";
+ try {
+ checkAndKillWithStrategy(executionContext, scanCostContext, strategy,
configSource, resolvedQueryId,
+ resolvedTableName, config);
+ } catch (Exception e) {
+ LOGGER.error("Error in scan-based killing evaluation for query {}",
resolvedQueryId, e);
+ emitKillMetric(ServerMeter.QUERIES_KILLED_SCAN_ERROR, resolvedTableName);
+ }
+ }
- if (config.isScanBasedKillingLogOnly()) {
- LOGGER.info("Query killed in LogOnly mode: {}",
report.toInternalLogMessage());
-
_serverMetrics.addMeteredGlobalValue(ServerMeter.QUERIES_KILLED_SCAN_DRY_RUN,
1);
+ private void checkAndKillWithStrategy(QueryExecutionContext
executionContext, QueryScanCostContext scanCostContext,
+ QueryKillingStrategy queryStrategy, String configSource, String queryId,
String tableName,
+ QueryMonitorConfig config) {
+ if (!queryStrategy.shouldTerminate(scanCostContext)) {
+ return;
+ }
+ // Resolve effective mode: per-table override takes precedence over
cluster config
+ CommonConstants.Accounting.ScanKillingMode effectiveMode =
executionContext.getEffectiveScanKillingMode();
+ if (effectiveMode == CommonConstants.Accounting.ScanKillingMode.DISABLED) {
+ return;
+ }
+ boolean logOnly = effectiveMode ==
CommonConstants.Accounting.ScanKillingMode.LOG_ONLY
+ || (effectiveMode == null && config.isScanBasedKillingLogOnly());
+ if (logOnly) {
+ // only the first observer for this query logs the dry-run line and
+ // emits the metric; subsequent observers no-op
+ if (!executionContext.markScanKillingDryRunEmitted()) {
return;
}
-
+ long requestId = executionContext.getRequestId();
+ QueryKillReport report = queryStrategy.buildKillReport(scanCostContext,
requestId, queryId, tableName,
+ configSource);
+ LOGGER.info("Query killed in LogOnly mode: {}",
report.toInternalLogMessage());
+ emitKillMetric(ServerMeter.QUERIES_KILLED_SCAN_DRY_RUN,
report.getTableName());
+ return;
+ }
+ long requestId = executionContext.getRequestId();
+ QueryKillReport report = queryStrategy.buildKillReport(scanCostContext,
requestId, queryId, tableName,
+ configSource);
+ if (executionContext.terminate(queryStrategy.getErrorCode(),
report.toCustomerMessage())) {
LOGGER.warn("Query Killed in enforce mode: {}",
report.toInternalLogMessage());
- executionContext.terminate(queryStrategy.getErrorCode(),
report.toCustomerMessage());
- _serverMetrics.addMeteredGlobalValue(ServerMeter.QUERIES_KILLED_SCAN, 1);
- } catch (Exception e) {
- LOGGER.error("Error in scan-based killing evaluation for query {}",
queryId, e);
-
_serverMetrics.addMeteredGlobalValue(ServerMeter.QUERIES_KILLED_SCAN_ERROR, 1);
+ emitKillMetric(ServerMeter.QUERIES_KILLED_SCAN, report.getTableName());
+ }
+ }
+
+ /**
+ * Emits a kill metric per-table when the table name is known (non-null, non-empty,
and not the {@code "unknown"} placeholder),
+ * falling back to global emission otherwise.
+ */
+ private void emitKillMetric(ServerMeter meter, @Nullable String tableName) {
+ if (tableName != null && !tableName.isEmpty() &&
!"unknown".equals(tableName)) {
+ _serverMetrics.addMeteredTableValue(tableName, meter, 1);
+ } else {
+ _serverMetrics.addMeteredGlobalValue(meter, 1);
}
}
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/killing/QueryKillingStrategy.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/killing/QueryKillingStrategy.java
index 4bf4390d59e..c5e1248689f 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/killing/QueryKillingStrategy.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/killing/QueryKillingStrategy.java
@@ -47,7 +47,7 @@ public interface QueryKillingStrategy {
* Only called when {@link #shouldTerminate} returns true.
*/
QueryKillReport buildKillReport(QueryScanCostContext context,
- String queryId, String tableName, String configSource);
+ long requestId, String queryId, String tableName, String configSource);
/** Error code for the termination response. */
default QueryErrorCode getErrorCode() {
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/killing/strategy/ScanEntriesThresholdStrategy.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/killing/strategy/ScanEntriesThresholdStrategy.java
index 92b13dcdc60..12adba92f13 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/killing/strategy/ScanEntriesThresholdStrategy.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/killing/strategy/ScanEntriesThresholdStrategy.java
@@ -49,10 +49,17 @@ public class ScanEntriesThresholdStrategy implements
QueryKillingStrategy {
private final long _maxEntriesScannedInFilter;
private final long _maxDocsScanned;
+ private final long _maxEntriesScannedPostFilter;
public ScanEntriesThresholdStrategy(long maxEntriesScannedInFilter, long
maxDocsScanned) {
+ this(maxEntriesScannedInFilter, maxDocsScanned, Long.MAX_VALUE);
+ }
+
+ public ScanEntriesThresholdStrategy(long maxEntriesScannedInFilter, long
maxDocsScanned,
+ long maxEntriesScannedPostFilter) {
_maxEntriesScannedInFilter = maxEntriesScannedInFilter;
_maxDocsScanned = maxDocsScanned;
+ _maxEntriesScannedPostFilter = maxEntriesScannedPostFilter;
}
@Override
@@ -60,12 +67,14 @@ public class ScanEntriesThresholdStrategy implements
QueryKillingStrategy {
return (_maxEntriesScannedInFilter < Long.MAX_VALUE
&& ctx.getNumEntriesScannedInFilter() > _maxEntriesScannedInFilter)
|| (_maxDocsScanned < Long.MAX_VALUE
- && ctx.getNumDocsScanned() > _maxDocsScanned);
+ && ctx.getNumDocsScanned() > _maxDocsScanned)
+ || (_maxEntriesScannedPostFilter < Long.MAX_VALUE
+ && ctx.getNumEntriesScannedPostFilter() >
_maxEntriesScannedPostFilter);
}
@Override
public QueryKillReport buildKillReport(QueryScanCostContext ctx,
- String queryId, String tableName, String configSource) {
+ long requestId, String queryId, String tableName, String configSource) {
String triggeringMetric;
long actualValue;
long thresholdValue;
@@ -74,12 +83,17 @@ public class ScanEntriesThresholdStrategy implements
QueryKillingStrategy {
triggeringMetric = "numEntriesScannedInFilter";
actualValue = ctx.getNumEntriesScannedInFilter();
thresholdValue = _maxEntriesScannedInFilter;
- } else {
+ } else if (_maxDocsScanned < Long.MAX_VALUE
+ && ctx.getNumDocsScanned() > _maxDocsScanned) {
triggeringMetric = "numDocsScanned";
actualValue = ctx.getNumDocsScanned();
thresholdValue = _maxDocsScanned;
+ } else {
+ triggeringMetric = "numEntriesScannedPostFilter";
+ actualValue = ctx.getNumEntriesScannedPostFilter();
+ thresholdValue = _maxEntriesScannedPostFilter;
}
- return new QueryKillReport(queryId, tableName, STRATEGY_NAME,
+ return new QueryKillReport(requestId, queryId, tableName, STRATEGY_NAME,
triggeringMetric, actualValue, thresholdValue, configSource, ctx);
}
@@ -101,12 +115,14 @@ public class ScanEntriesThresholdStrategy implements
QueryKillingStrategy {
}
Long tableEntries = queryConfig.getMaxEntriesScannedInFilter();
Long tableDocs = queryConfig.getMaxDocsScanned();
- if (tableEntries == null && tableDocs == null) {
+ Long tablePostFilter = queryConfig.getMaxEntriesScannedPostFilter();
+ if (tableEntries == null && tableDocs == null && tablePostFilter == null) {
return this;
}
return new ScanEntriesThresholdStrategy(
tableEntries != null ? tableEntries : _maxEntriesScannedInFilter,
- tableDocs != null ? tableDocs : _maxDocsScanned);
+ tableDocs != null ? tableDocs : _maxDocsScanned,
+ tablePostFilter != null ? tablePostFilter :
_maxEntriesScannedPostFilter);
}
public long getMaxEntriesScannedInFilter() {
@@ -117,6 +133,10 @@ public class ScanEntriesThresholdStrategy implements
QueryKillingStrategy {
return _maxDocsScanned;
}
+ public long getMaxEntriesScannedPostFilter() {
+ return _maxEntriesScannedPostFilter;
+ }
+
/**
* Factory that creates a {@link ScanEntriesThresholdStrategy} from
* {@link QueryMonitorConfig}. This is the default factory used when no
custom
@@ -134,19 +154,23 @@ public class ScanEntriesThresholdStrategy implements
QueryKillingStrategy {
public QueryKillingStrategy create(QueryMonitorConfig config) {
long maxEntries = config.getScanBasedKillingMaxEntriesScannedInFilter();
long maxDocs = config.getScanBasedKillingMaxDocsScanned();
+ long maxPostFilter =
config.getScanBasedKillingMaxEntriesScannedPostFilter();
- if (maxEntries == Long.MAX_VALUE && maxDocs == Long.MAX_VALUE) {
+ if (maxEntries == Long.MAX_VALUE && maxDocs == Long.MAX_VALUE &&
maxPostFilter == Long.MAX_VALUE) {
LOGGER.warn("Scan-based killing is enabled but no thresholds are
configured. "
+ "Set at least one of:
accounting.scan.based.killing.max.entries.scanned.in.filter, "
- + "accounting.scan.based.killing.max.docs.scanned. "
+ + "accounting.scan.based.killing.max.docs.scanned, "
+ + "accounting.scan.based.killing.max.entries.scanned.post.filter. "
+ "Scan-based killing will be effectively disabled until
thresholds are set.");
return null;
}
- LOGGER.info("Initialized ScanEntriesThresholdStrategy with
maxEntriesScannedInFilter={}, maxDocsScanned={}",
+ LOGGER.info("Initialized ScanEntriesThresholdStrategy with
maxEntriesScannedInFilter={}, "
+ + "maxDocsScanned={}, maxEntriesScannedPostFilter={}",
maxEntries == Long.MAX_VALUE ? "disabled" : maxEntries,
- maxDocs == Long.MAX_VALUE ? "disabled" : maxDocs);
- return new ScanEntriesThresholdStrategy(maxEntries, maxDocs);
+ maxDocs == Long.MAX_VALUE ? "disabled" : maxDocs,
+ maxPostFilter == Long.MAX_VALUE ? "disabled" : maxPostFilter);
+ return new ScanEntriesThresholdStrategy(maxEntries, maxDocs,
maxPostFilter);
}
@Override
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/operator/query/OperatorScanCostTrackingTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/operator/query/OperatorScanCostTrackingTest.java
new file mode 100644
index 00000000000..cf180bf85f2
--- /dev/null
+++
b/pinot-core/src/test/java/org/apache/pinot/core/operator/query/OperatorScanCostTrackingTest.java
@@ -0,0 +1,207 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.query;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.core.operator.BaseProjectOperator;
+import org.apache.pinot.core.plan.DocIdSetPlanNode;
+import org.apache.pinot.core.plan.ProjectPlanNode;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import
org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
+import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
+import
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
+import
org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
+import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.segment.spi.SegmentContext;
+import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.query.QueryScanCostContext;
+import org.apache.pinot.spi.query.QueryThreadContext;
+import org.apache.pinot.spi.utils.ReadMode;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+
+/**
+ * Verifies that query operators correctly push scan cost metrics into {@link
QueryScanCostContext}.
+ */
+public class OperatorScanCostTrackingTest {
+ private static final String RAW_TABLE_NAME = "scanCostTestTable";
+ private static final String SEGMENT_NAME = "scanCostTestSegment";
+ private static final int NUM_ROWS = 100;
+ private static final String COL_INT = "intCol";
+ private static final String COL_STRING = "stringCol";
+ private static final String COL_DOUBLE = "doubleCol";
+
+ private static final TableConfig TABLE_CONFIG =
+ new
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).build();
+ private static final Schema SCHEMA = new Schema.SchemaBuilder()
+ .addSingleValueDimension(COL_INT, FieldSpec.DataType.INT)
+ .addSingleValueDimension(COL_STRING, FieldSpec.DataType.STRING)
+ .addMetric(COL_DOUBLE, FieldSpec.DataType.DOUBLE)
+ .build();
+
+ private File _tempDir;
+ private IndexSegment _segment;
+
+ @BeforeClass
+ public void setUp()
+ throws Exception {
+ _tempDir =
Files.createTempDirectory("OperatorScanCostTrackingTest").toFile();
+
+ List<GenericRow> records = new ArrayList<>(NUM_ROWS);
+ for (int i = 0; i < NUM_ROWS; i++) {
+ GenericRow row = new GenericRow();
+ row.putValue(COL_INT, i);
+ row.putValue(COL_STRING, "val_" + (i % 10));
+ row.putValue(COL_DOUBLE, i * 1.5);
+ records.add(row);
+ }
+
+ SegmentGeneratorConfig config = new SegmentGeneratorConfig(TABLE_CONFIG,
SCHEMA);
+ config.setTableName(RAW_TABLE_NAME);
+ config.setSegmentName(SEGMENT_NAME);
+ config.setOutDir(_tempDir.getPath());
+
+ SegmentIndexCreationDriverImpl driver = new
SegmentIndexCreationDriverImpl();
+ driver.init(config, new GenericRowRecordReader(records));
+ driver.build();
+
+ _segment = ImmutableSegmentLoader.load(new File(_tempDir, SEGMENT_NAME),
ReadMode.mmap);
+ }
+
+ @Test
+ public void testSelectionOnlyOperatorTracksScanCost() {
+ QueryContext queryContext = QueryContextConverterUtils.getQueryContext(
+ "SELECT intCol, stringCol, doubleCol FROM scanCostTestTable LIMIT 50");
+
+ try (QueryThreadContext ignore = QueryThreadContext.openForSseTest()) {
+ QueryScanCostContext scanCost = new QueryScanCostContext();
+
QueryThreadContext.get().getExecutionContext().setQueryScanCostContext(scanCost);
+
+ List<ExpressionContext> expressions =
+ SelectionOperatorUtils.extractExpressions(queryContext, _segment);
+ BaseProjectOperator<?> projectOperator =
+ new ProjectPlanNode(new SegmentContext(_segment), queryContext,
expressions,
+ DocIdSetPlanNode.MAX_DOC_PER_CALL).run();
+ int numColumnsProjected = projectOperator.getNumColumnsProjected();
+
+ SelectionOnlyOperator operator =
+ new SelectionOnlyOperator(_segment, queryContext, expressions,
projectOperator);
+ operator.nextBlock();
+
+ long docsScanned = scanCost.getNumDocsScanned();
+ assertTrue(docsScanned > 0, "Should have scanned some docs");
+ assertEquals(scanCost.getNumEntriesScannedPostFilter(), docsScanned *
numColumnsProjected);
+ }
+ }
+
+ @Test
+ public void testSelectionOrderByOperatorTracksScanCost() {
+ QueryContext queryContext = QueryContextConverterUtils.getQueryContext(
+ "SELECT intCol, stringCol FROM scanCostTestTable ORDER BY intCol LIMIT
10");
+
+ try (QueryThreadContext ignore = QueryThreadContext.openForSseTest()) {
+ QueryScanCostContext scanCost = new QueryScanCostContext();
+
QueryThreadContext.get().getExecutionContext().setQueryScanCostContext(scanCost);
+
+ List<ExpressionContext> expressions =
+ SelectionOperatorUtils.extractExpressions(queryContext, _segment);
+ BaseProjectOperator<?> projectOperator =
+ new ProjectPlanNode(new SegmentContext(_segment), queryContext,
expressions,
+ DocIdSetPlanNode.MAX_DOC_PER_CALL).run();
+ int numColumnsProjected = projectOperator.getNumColumnsProjected();
+
+ SelectionOrderByOperator operator =
+ new SelectionOrderByOperator(_segment, queryContext, expressions,
projectOperator);
+ operator.nextBlock();
+
+ long docsScanned = scanCost.getNumDocsScanned();
+ assertTrue(docsScanned > 0, "Should have scanned some docs");
+ assertEquals(scanCost.getNumEntriesScannedPostFilter(), docsScanned *
numColumnsProjected);
+ }
+ }
+
+ @Test
+ public void testDistinctOperatorTracksScanCost() {
+ QueryContext queryContext = QueryContextConverterUtils.getQueryContext(
+ "SELECT DISTINCT stringCol FROM scanCostTestTable");
+
+ try (QueryThreadContext ignore = QueryThreadContext.openForSseTest()) {
+ QueryScanCostContext scanCost = new QueryScanCostContext();
+
QueryThreadContext.get().getExecutionContext().setQueryScanCostContext(scanCost);
+
+ List<ExpressionContext> expressions =
queryContext.getSelectExpressions();
+ BaseProjectOperator<?> projectOperator =
+ new ProjectPlanNode(new SegmentContext(_segment), queryContext,
expressions,
+ DocIdSetPlanNode.MAX_DOC_PER_CALL).run();
+ int numColumnsProjected = projectOperator.getNumColumnsProjected();
+
+ DistinctOperator operator = new DistinctOperator(_segment, queryContext,
projectOperator);
+ operator.nextBlock();
+
+ long docsScanned = scanCost.getNumDocsScanned();
+ assertTrue(docsScanned > 0, "Should have scanned some docs");
+ assertEquals(scanCost.getNumEntriesScannedPostFilter(), docsScanned *
numColumnsProjected);
+ }
+ }
+
+ @Test
+ public void testScanCostIsZeroWhenContextNotSet() {
+ QueryContext queryContext = QueryContextConverterUtils.getQueryContext(
+ "SELECT intCol, stringCol FROM scanCostTestTable LIMIT 10");
+
+ try (QueryThreadContext ignore = QueryThreadContext.openForSseTest()) {
+ // Intentionally NOT setting QueryScanCostContext — getScanCostContext()
returns null
+ List<ExpressionContext> expressions =
+ SelectionOperatorUtils.extractExpressions(queryContext, _segment);
+ BaseProjectOperator<?> projectOperator =
+ new ProjectPlanNode(new SegmentContext(_segment), queryContext,
expressions,
+ DocIdSetPlanNode.MAX_DOC_PER_CALL).run();
+
+ SelectionOnlyOperator operator =
+ new SelectionOnlyOperator(_segment, queryContext, expressions,
projectOperator);
+ operator.nextBlock();
+ // No exception — scan cost tracking is gracefully skipped when context
is null
+ }
+ }
+
+ @AfterClass
+ public void tearDown()
+ throws IOException {
+ _segment.destroy();
+ FileUtils.deleteDirectory(_tempDir);
+ }
+}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/query/killing/CompositeQueryKillingStrategyTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/query/killing/CompositeQueryKillingStrategyTest.java
index ea48ff49859..6a31a4b8d76 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/query/killing/CompositeQueryKillingStrategyTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/query/killing/CompositeQueryKillingStrategyTest.java
@@ -117,7 +117,7 @@ public class CompositeQueryKillingStrategyTest {
ctx.addDocsScanned(501);
assertTrue(composite.shouldTerminate(ctx));
- QueryKillReport report = composite.buildKillReport(ctx, "q1", "t1",
"cluster");
+ QueryKillReport report = composite.buildKillReport(ctx, 1L, "q1", "t1",
"cluster");
assertEquals(report.getTriggeringMetric(), "numDocsScanned");
assertEquals(report.getActualValue(), 501L);
}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/query/killing/QueryKillReportTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/query/killing/QueryKillReportTest.java
index 054197aeaed..6819a6991be 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/query/killing/QueryKillReportTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/query/killing/QueryKillReportTest.java
@@ -38,6 +38,7 @@ public class QueryKillReportTest {
context.addEntriesScannedPostFilter(200L);
QueryKillReport report = new QueryKillReport(
+ 101L,
"queryId-1",
"myTable_OFFLINE",
"EntriesScannedInFilterStrategy",
@@ -66,6 +67,7 @@ public class QueryKillReportTest {
context.addEntriesScannedPostFilter(50L);
QueryKillReport report = new QueryKillReport(
+ 202L,
"queryId-abc",
"salesTable_REALTIME",
"EntriesScannedInFilterStrategy",
@@ -83,6 +85,7 @@ public class QueryKillReportTest {
assertTrue(msg.contains("1,234,567"), "Should contain actual value with
commas");
assertTrue(msg.contains("1,000,000"), "Should contain threshold with
commas");
assertTrue(msg.contains("CLUSTER_CONFIG"), "Should contain config source");
+ assertTrue(msg.contains("requestId=202"), "Should contain requestId");
assertTrue(msg.contains("missing index") || msg.contains("index"),
"Should include advice about missing indexes");
}
@@ -95,6 +98,7 @@ public class QueryKillReportTest {
context.addEntriesScannedPostFilter(150L);
QueryKillReport report = new QueryKillReport(
+ 303L,
"queryId-xyz",
"ordersTable_OFFLINE",
"DocsScannedStrategy",
@@ -108,6 +112,7 @@ public class QueryKillReportTest {
String msg = report.toInternalLogMessage();
assertTrue(msg.startsWith("QUERY_KILLED"), "Should start with QUERY_KILLED
prefix");
+ assertTrue(msg.contains("requestId=303"), "Should have requestId
key=value");
assertTrue(msg.contains("queryId=queryId-xyz"), "Should have queryId
key=value");
assertTrue(msg.contains("table=ordersTable_OFFLINE"), "Should have table
key=value");
assertTrue(msg.contains("metric=numDocsScanned"), "Should have metric
key=value");
@@ -124,6 +129,7 @@ public class QueryKillReportTest {
context.addEntriesScannedPostFilter(25L);
QueryKillReport report = new QueryKillReport(
+ 404L,
"queryId-getters",
"testTable_OFFLINE",
"TestStrategy",
@@ -134,6 +140,7 @@ public class QueryKillReportTest {
context
);
+ assertEquals(report.getRequestId(), 404L);
assertEquals(report.getQueryId(), "queryId-getters");
assertEquals(report.getTableName(), "testTable_OFFLINE");
assertEquals(report.getStrategyName(), "TestStrategy");
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/query/killing/QueryKillingManagerTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/query/killing/QueryKillingManagerTest.java
index 5bde697b5a3..394da6926e3 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/query/killing/QueryKillingManagerTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/query/killing/QueryKillingManagerTest.java
@@ -19,8 +19,15 @@
package org.apache.pinot.core.query.killing;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
+import org.apache.pinot.common.metrics.ServerMeter;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.core.accounting.QueryMonitorConfig;
import
org.apache.pinot.core.query.killing.strategy.ScanEntriesThresholdStrategy;
@@ -30,10 +37,17 @@ import org.apache.pinot.spi.exception.QueryErrorCode;
import org.apache.pinot.spi.query.QueryExecutionContext;
import org.apache.pinot.spi.query.QueryScanCostContext;
import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.CommonConstants.Accounting.ScanKillingMode;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
@@ -295,7 +309,592 @@ public class QueryKillingManagerTest {
assertNotNull(manager.getActiveStrategy(), "After config update, strategy
should be built");
}
- // --- Test fixtures for pluggable strategy ---
+ // --- onChange (dynamic config reload) ---
+
+ @Test
+ public void testOnChangeRebuildsStrategy() {
+ // Start with killing disabled
+ QueryMonitorConfig disabledConfig = buildConfig("disabled", 100L,
Long.MAX_VALUE);
+ AtomicReference<QueryMonitorConfig> configRef = new
AtomicReference<>(disabledConfig);
+ QueryKillingManager manager = new QueryKillingManager(configRef,
_serverMetrics);
+ manager.rebuildStrategy();
+ assertNull(manager.getActiveStrategy(), "Strategy should be null when
disabled");
+
+ // Simulate cluster config change enabling killing with enforce mode +
threshold.
+ // Keys arrive from ZK with full "pinot.query.scheduler." prefix —
onChange() strips it.
+ String prefix = CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX + ".";
+
+ Set<String> changedKeys = new HashSet<>();
+ changedKeys.add(prefix +
CommonConstants.Accounting.CONFIG_OF_SCAN_BASED_KILLING_MODE);
+ changedKeys.add(prefix +
CommonConstants.Accounting.CONFIG_OF_SCAN_BASED_KILLING_MAX_ENTRIES_SCANNED_IN_FILTER);
+
+ Map<String, String> clusterConfigs = new HashMap<>();
+ clusterConfigs.put(prefix +
CommonConstants.Accounting.CONFIG_OF_SCAN_BASED_KILLING_MODE, "enforce");
+ clusterConfigs.put(
+ prefix +
CommonConstants.Accounting.CONFIG_OF_SCAN_BASED_KILLING_MAX_ENTRIES_SCANNED_IN_FILTER,
"500");
+
+ manager.onChange(changedKeys, clusterConfigs);
+ assertNotNull(manager.getActiveStrategy(),
+ "Strategy should be rebuilt after onChange enables killing");
+ }
+
+ @Test
+ public void testOnChangeDisablesStrategy() {
+ // Start with killing enabled
+ QueryMonitorConfig enabledConfig = buildConfig("enforce", 100L,
Long.MAX_VALUE);
+ AtomicReference<QueryMonitorConfig> configRef = new
AtomicReference<>(enabledConfig);
+ QueryKillingManager manager = new QueryKillingManager(configRef,
_serverMetrics);
+ manager.rebuildStrategy();
+ assertNotNull(manager.getActiveStrategy(), "Strategy should be active when
enabled");
+
+ // Simulate cluster config change to disable killing (full ZK-prefixed
keys)
+ String prefix = CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX + ".";
+
+ Set<String> changedKeys = new HashSet<>();
+ changedKeys.add(prefix +
CommonConstants.Accounting.CONFIG_OF_SCAN_BASED_KILLING_MODE);
+
+ Map<String, String> clusterConfigs = new HashMap<>();
+ clusterConfigs.put(prefix +
CommonConstants.Accounting.CONFIG_OF_SCAN_BASED_KILLING_MODE, "disabled");
+
+ manager.onChange(changedKeys, clusterConfigs);
+ assertNull(manager.getActiveStrategy(),
+ "Strategy should be null after onChange disables killing");
+ }
+
+ @Test
+ public void testOnChangeIgnoresIrrelevantKeys() {
+ // Start with killing enabled
+ QueryMonitorConfig enabledConfig = buildConfig("enforce", 100L,
Long.MAX_VALUE);
+ AtomicReference<QueryMonitorConfig> configRef = new
AtomicReference<>(enabledConfig);
+ QueryKillingManager manager = new QueryKillingManager(configRef,
_serverMetrics);
+ manager.rebuildStrategy();
+ assertNotNull(manager.getActiveStrategy(), "Strategy should be active when
enabled");
+
+ // Simulate a ZK change that only touches non-scheduler keys — should be
ignored
+ Set<String> changedKeys = new HashSet<>();
+ changedKeys.add("some.unrelated.config");
+ changedKeys.add("helix.rebalance.something");
+
+ Map<String, String> clusterConfigs = new HashMap<>();
+ clusterConfigs.put("some.unrelated.config", "value");
+
+ manager.onChange(changedKeys, clusterConfigs);
+ assertNotNull(manager.getActiveStrategy(),
+ "Strategy should remain unchanged when no scheduler keys changed");
+ }
+
+ // --- Convenience overload (2-arg checkAndKillIfNeeded) ---
+
+ @Test
+ public void testConvenienceOverloadKillsViaCachedStrategy() {
+ // Create a manager with killing enabled, threshold = 50 for entries
scanned
+ QueryMonitorConfig config = buildConfig("enforce", 50L, Long.MAX_VALUE);
+ AtomicReference<QueryMonitorConfig> configRef = new
AtomicReference<>(config);
+ QueryKillingManager manager = new QueryKillingManager(configRef,
_serverMetrics);
+ manager.rebuildStrategy();
+
+ // Resolve the per-query strategy and cache it on the execution context
+ QueryKillingStrategy resolvedStrategy = manager.resolveQueryStrategy(null);
+ assertNotNull(resolvedStrategy);
+
+ QueryExecutionContext execCtx = QueryExecutionContext.forSseTest();
+ execCtx.setTableName("testTable_OFFLINE");
+ execCtx.setQueryId("conv-q1");
+ execCtx.setCachedKillingStrategy(resolvedStrategy);
+
+ // Create scan cost exceeding the threshold
+ QueryScanCostContext scanCtx = new QueryScanCostContext();
+ scanCtx.addEntriesScannedInFilter(100L); // Above threshold of 50
+
+ // Use the 2-arg convenience overload
+ manager.checkAndKillIfNeeded(execCtx, scanCtx);
+ assertNotNull(execCtx.getTerminateException(),
+ "Query should be terminated via cached strategy when threshold is
exceeded");
+ }
+
+ @Test
+ public void testConvenienceOverloadNullScanCostContextNoOp() {
+ // When the manager's strategy is null (disabled), calling with null
scanCostContext is a no-op.
+ // In production, BaseOperator guards against null scanCostContext before
calling the manager.
+ // Here we verify the manager's early-return when disabled.
+ QueryMonitorConfig config = buildConfig("disabled", 100L, Long.MAX_VALUE);
+ AtomicReference<QueryMonitorConfig> configRef = new
AtomicReference<>(config);
+ QueryKillingManager manager = new QueryKillingManager(configRef,
_serverMetrics);
+ manager.rebuildStrategy();
+
+ QueryExecutionContext execCtx = QueryExecutionContext.forSseTest();
+ // No queryScanCostContext set on execCtx — mirrors the real scenario where
+ // scan-based killing is not initialized for this query.
+
+ // The 2-arg overload with null scanCostContext is safe when strategy is
null (disabled)
+ manager.checkAndKillIfNeeded(execCtx, null);
+ assertNull(execCtx.getTerminateException(),
+ "No exception should be set when killing is disabled and
scanCostContext is null");
+ }
+
+ // --- resolveQueryStrategy ---
+
+ @Test
+ public void testResolveQueryStrategyReturnsNullWhenDisabled() {
+ QueryMonitorConfig config = buildConfig("disabled", 100L, Long.MAX_VALUE);
+ AtomicReference<QueryMonitorConfig> configRef = new
AtomicReference<>(config);
+ QueryKillingManager manager = new QueryKillingManager(configRef,
_serverMetrics);
+ manager.rebuildStrategy();
+
+ QueryKillingStrategy resolved = manager.resolveQueryStrategy(null);
+ assertNull(resolved, "resolveQueryStrategy should return null when killing
is disabled");
+ }
+
+ @Test
+ public void testResolveQueryStrategyReturnsStrategyWhenEnabled() {
+ QueryMonitorConfig config = buildConfig("enforce", 200L, Long.MAX_VALUE);
+ AtomicReference<QueryMonitorConfig> configRef = new
AtomicReference<>(config);
+ QueryKillingManager manager = new QueryKillingManager(configRef,
_serverMetrics);
+ manager.rebuildStrategy();
+
+ QueryKillingStrategy resolved = manager.resolveQueryStrategy(null);
+ assertNotNull(resolved, "resolveQueryStrategy should return a strategy
when killing is enabled");
+ assertTrue(resolved instanceof ScanEntriesThresholdStrategy);
+ }
+
+ @Test
+ public void testTableModeEnforceOverridesClusterLogOnly() {
+ // Cluster is logOnly — normally no kills
+ QueryMonitorConfig config = buildConfig("logOnly", 50L, Long.MAX_VALUE);
+ AtomicReference<QueryMonitorConfig> configRef = new
AtomicReference<>(config);
+ QueryKillingManager manager = new QueryKillingManager(configRef,
_serverMetrics);
+ manager.rebuildStrategy();
+
+ QueryExecutionContext execCtx = QueryExecutionContext.forSseTest();
+ // Table override: enforce
+ execCtx.setEffectiveScanKillingMode(ScanKillingMode.ENFORCE);
+ execCtx.setTableName("testTable_OFFLINE");
+ execCtx.setQueryId("tbl-mode-q1");
+
+ QueryKillingStrategy strategy = manager.resolveQueryStrategy(null);
+ assertNotNull(strategy);
+ execCtx.setCachedKillingStrategy(strategy);
+
+ QueryScanCostContext scanCtx = new QueryScanCostContext();
+ scanCtx.addEntriesScannedInFilter(100L); // exceeds cluster threshold of 50
+
+ manager.checkAndKillIfNeeded(execCtx, scanCtx);
+ assertNotNull(execCtx.getTerminateException(),
+ "Table enforce override should cause real termination even when
cluster is logOnly");
+ assertEquals(execCtx.getTerminateException().getErrorCode(),
QueryErrorCode.QUERY_SCAN_LIMIT_EXCEEDED);
+ }
+
+ @Test
+ public void testTableModeLogOnlyOverridesClusterEnforce() {
+ // Cluster is enforce — normally kills
+ QueryMonitorConfig config = buildConfig("enforce", 50L, Long.MAX_VALUE);
+ AtomicReference<QueryMonitorConfig> configRef = new
AtomicReference<>(config);
+ QueryKillingManager manager = new QueryKillingManager(configRef,
_serverMetrics);
+ manager.rebuildStrategy();
+
+ QueryExecutionContext execCtx = QueryExecutionContext.forSseTest();
+ // Table override: logOnly — should downgrade to dry-run
+ execCtx.setEffectiveScanKillingMode(ScanKillingMode.LOG_ONLY);
+ execCtx.setTableName("testTable_OFFLINE");
+ execCtx.setQueryId("tbl-mode-q2");
+
+ QueryKillingStrategy strategy = manager.resolveQueryStrategy(null);
+ assertNotNull(strategy);
+ execCtx.setCachedKillingStrategy(strategy);
+
+ QueryScanCostContext scanCtx = new QueryScanCostContext();
+ scanCtx.addEntriesScannedInFilter(100L); // exceeds cluster threshold of 50
+
+ manager.checkAndKillIfNeeded(execCtx, scanCtx);
+ assertNull(execCtx.getTerminateException(),
+ "Table logOnly override should prevent real kill even when cluster is
enforce");
+ }
+
+ @Test
+ public void testTableModeDisabledOverridesClusterEnforce() {
+ // Cluster is enforce — normally kills
+ QueryMonitorConfig config = buildConfig("enforce", 50L, Long.MAX_VALUE);
+ AtomicReference<QueryMonitorConfig> configRef = new
AtomicReference<>(config);
+ QueryKillingManager manager = new QueryKillingManager(configRef,
_serverMetrics);
+ manager.rebuildStrategy();
+
+ QueryExecutionContext execCtx = QueryExecutionContext.forSseTest();
+ // Table override: disabled — fully exempt
+ execCtx.setEffectiveScanKillingMode(ScanKillingMode.DISABLED);
+ execCtx.setTableName("testTable_OFFLINE");
+ execCtx.setQueryId("tbl-mode-q3");
+
+ QueryKillingStrategy strategy = manager.resolveQueryStrategy(null);
+ assertNotNull(strategy);
+ execCtx.setCachedKillingStrategy(strategy);
+
+ QueryScanCostContext scanCtx = new QueryScanCostContext();
+ scanCtx.addEntriesScannedInFilter(100L); // exceeds cluster threshold of 50
+
+ manager.checkAndKillIfNeeded(execCtx, scanCtx);
+ assertNull(execCtx.getTerminateException(),
+ "Table disabled override should fully exempt the table from killing");
+ }
+
+ @Test
+ public void testNoTableModeOverrideFallsBackToCluster() {
+ // Cluster is enforce, no table mode override
+ QueryMonitorConfig config = buildConfig("enforce", 50L, Long.MAX_VALUE);
+ AtomicReference<QueryMonitorConfig> configRef = new
AtomicReference<>(config);
+ QueryKillingManager manager = new QueryKillingManager(configRef,
_serverMetrics);
+ manager.rebuildStrategy();
+
+ QueryExecutionContext execCtx = QueryExecutionContext.forSseTest();
+ // No setEffectiveScanKillingMode call — should use cluster mode (enforce)
+ execCtx.setTableName("testTable_OFFLINE");
+ execCtx.setQueryId("tbl-mode-q4");
+
+ QueryKillingStrategy strategy = manager.resolveQueryStrategy(null);
+ assertNotNull(strategy);
+ execCtx.setCachedKillingStrategy(strategy);
+
+ QueryScanCostContext scanCtx = new QueryScanCostContext();
+ scanCtx.addEntriesScannedInFilter(100L); // exceeds cluster threshold of 50
+
+ manager.checkAndKillIfNeeded(execCtx, scanCtx);
+ assertNotNull(execCtx.getTerminateException(),
+ "Without table mode override, cluster enforce mode should kill");
+ }
+
+ // --- Per-table metric emission ---
+
+ @Test
+ public void testEnforceKillEmitsPerTableMetric() {
+ QueryMonitorConfig config = buildConfig("enforce", 100L, Long.MAX_VALUE);
+ AtomicReference<QueryMonitorConfig> configRef = new
AtomicReference<>(config);
+ QueryKillingManager manager = new QueryKillingManager(configRef,
_serverMetrics);
+ manager.rebuildStrategy();
+
+ QueryExecutionContext execCtx = QueryExecutionContext.forSseTest();
+ QueryScanCostContext scanCtx = new QueryScanCostContext();
+ scanCtx.addEntriesScannedInFilter(200L);
+
+ manager.checkAndKillIfNeeded(execCtx, scanCtx, "q-metric-1",
"myTable_REALTIME", null);
+
+ verify(_serverMetrics).addMeteredTableValue("myTable_REALTIME",
ServerMeter.QUERIES_KILLED_SCAN, 1L);
+ verify(_serverMetrics,
never()).addMeteredGlobalValue(ServerMeter.QUERIES_KILLED_SCAN, 1L);
+ }
+
+ @Test
+ public void testLogOnlyKillEmitsPerTableDryRunMetric() {
+ QueryMonitorConfig config = buildConfig("logOnly", 100L, Long.MAX_VALUE);
+ AtomicReference<QueryMonitorConfig> configRef = new
AtomicReference<>(config);
+ QueryKillingManager manager = new QueryKillingManager(configRef,
_serverMetrics);
+ manager.rebuildStrategy();
+
+ QueryExecutionContext execCtx = QueryExecutionContext.forSseTest();
+ QueryScanCostContext scanCtx = new QueryScanCostContext();
+ scanCtx.addEntriesScannedInFilter(200L);
+
+ manager.checkAndKillIfNeeded(execCtx, scanCtx, "q-metric-2",
"myTable_REALTIME", null);
+
+ verify(_serverMetrics).addMeteredTableValue("myTable_REALTIME",
ServerMeter.QUERIES_KILLED_SCAN_DRY_RUN, 1L);
+ verify(_serverMetrics,
never()).addMeteredGlobalValue(ServerMeter.QUERIES_KILLED_SCAN_DRY_RUN, 1L);
+ // logOnly should not actually terminate
+ assertNull(execCtx.getTerminateException());
+ }
+
+ @Test
+ public void testNullTableNameFallsBackToGlobalErrorMetric() {
+ // When the strategy throws inside checkAndKillIfNeeded, the catch block
emits the error
+ // metric. With a null table name, the helper falls back to global
emission so we do not
+ // silently drop the error signal.
+ QueryMonitorConfig config = buildConfig("enforce", 100L, Long.MAX_VALUE);
+ AtomicReference<QueryMonitorConfig> configRef = new
AtomicReference<>(config);
+ QueryKillingManager manager = new QueryKillingManager(configRef,
_serverMetrics);
+ manager.rebuildStrategy();
+
+ QueryExecutionContext execCtx = QueryExecutionContext.forSseTest();
+ // Force the catch path with a cached strategy that throws inside
shouldTerminate.
+ execCtx.setCachedKillingStrategy(new QueryKillingStrategy() {
+ @Override
+ public boolean shouldTerminate(QueryScanCostContext context) {
+ throw new RuntimeException("boom");
+ }
+
+ @Override
+ public QueryKillReport buildKillReport(QueryScanCostContext context,
long requestId,
+ String queryId, String tableName, String configSource) {
+ // unused — shouldTerminate throws first
+ throw new UnsupportedOperationException();
+ }
+ });
+ execCtx.setTableName(null);
+ execCtx.setQueryId("q-metric-null");
+
+ manager.checkAndKillIfNeeded(execCtx, new QueryScanCostContext());
+
+ // Null table name → global fallback emission, never per-table
+
verify(_serverMetrics).addMeteredGlobalValue(ServerMeter.QUERIES_KILLED_SCAN_ERROR,
1L);
+ verify(_serverMetrics, never()).addMeteredTableValue(anyString(),
+ eq(ServerMeter.QUERIES_KILLED_SCAN_ERROR), anyLong());
+ }
+
+ @Test
+ public void testNullTableNameInReportFallsBackToGlobalEnforceMetric() {
+ QueryMonitorConfig config = buildConfig("enforce", 100L, Long.MAX_VALUE);
+ AtomicReference<QueryMonitorConfig> configRef = new
AtomicReference<>(config);
+ QueryKillingManager manager = new QueryKillingManager(configRef,
_serverMetrics);
+ manager.rebuildStrategy();
+
+ QueryExecutionContext execCtx = QueryExecutionContext.forSseTest();
+ execCtx.setCachedKillingStrategy(new QueryKillingStrategy() {
+ @Override
+ public boolean shouldTerminate(QueryScanCostContext context) {
+ return true;
+ }
+
+ @Override
+ public QueryKillReport buildKillReport(QueryScanCostContext context,
long requestId,
+ String queryId, String tableName, String configSource) {
+ // Intentionally drop the table name to exercise the null-fallback path
+ return new QueryKillReport(requestId, queryId, null, "TestStrategy",
"test", 0, 0,
+ configSource, context);
+ }
+
+ @Override
+ public org.apache.pinot.spi.exception.QueryErrorCode getErrorCode() {
+ return
org.apache.pinot.spi.exception.QueryErrorCode.QUERY_SCAN_LIMIT_EXCEEDED;
+ }
+ });
+ execCtx.setTableName(null);
+ execCtx.setQueryId("q-null-report");
+
+ manager.checkAndKillIfNeeded(execCtx, new QueryScanCostContext());
+
+
verify(_serverMetrics).addMeteredGlobalValue(ServerMeter.QUERIES_KILLED_SCAN,
1L);
+ verify(_serverMetrics, never()).addMeteredTableValue(anyString(),
+ eq(ServerMeter.QUERIES_KILLED_SCAN), anyLong());
+ }
+
+ @Test
+ public void testUnknownTableNameSentinelFallsBackToGlobalMetric() {
+ QueryMonitorConfig config = buildConfig("enforce", 100L, Long.MAX_VALUE);
+ AtomicReference<QueryMonitorConfig> configRef = new
AtomicReference<>(config);
+ QueryKillingManager manager = new QueryKillingManager(configRef,
_serverMetrics);
+ manager.rebuildStrategy();
+
+ QueryExecutionContext execCtx = QueryExecutionContext.forSseTest();
+ QueryScanCostContext scanCtx = new QueryScanCostContext();
+ scanCtx.addEntriesScannedInFilter(200L);
+
+ // Use the convenience overload with null tableName → routes through
"unknown" sentinel
+ execCtx.setTableName(null);
+ execCtx.setQueryId("q-unknown");
+ execCtx.setCachedKillingStrategy(manager.resolveQueryStrategy(null));
+
+ manager.checkAndKillIfNeeded(execCtx, scanCtx);
+
+
verify(_serverMetrics).addMeteredGlobalValue(ServerMeter.QUERIES_KILLED_SCAN,
1L);
+ verify(_serverMetrics, never()).addMeteredTableValue(eq("unknown"),
+ eq(ServerMeter.QUERIES_KILLED_SCAN), anyLong());
+ }
+
+ // --- Dry-run emit-once guard ---
+
+ @Test
+ public void testLogOnlyEmitsExactlyOncePerQueryAcrossManyBlockChecks() {
+ // logOnly mode never terminates the query, so without a guard, every
subsequent block-level
+ // termination check after the threshold is crossed re-builds a report,
re-logs, and
+ // re-emits the metric. This test simulates 10,000 block checks and
verifies exactly one
+ // emission — the CAS guard on QueryExecutionContext suppresses the
duplicates.
+ QueryMonitorConfig config = buildConfig("logOnly", 100L, Long.MAX_VALUE);
+ AtomicReference<QueryMonitorConfig> configRef = new
AtomicReference<>(config);
+ QueryKillingManager manager = new QueryKillingManager(configRef,
_serverMetrics);
+ manager.rebuildStrategy();
+
+ QueryExecutionContext execCtx = QueryExecutionContext.forSseTest();
+ execCtx.setTableName("dryRunTable_REALTIME");
+ execCtx.setQueryId("q-dry-run-once");
+ execCtx.setCachedKillingStrategy(manager.resolveQueryStrategy(null));
+
+ QueryScanCostContext scanCtx = new QueryScanCostContext();
+ scanCtx.addEntriesScannedInFilter(200L); // permanently over threshold
+
+ for (int i = 0; i < 10_000; i++) {
+ manager.checkAndKillIfNeeded(execCtx, scanCtx);
+ }
+
+ // Exactly one per-table dry-run metric emission across 10k block checks
+ verify(_serverMetrics).addMeteredTableValue("dryRunTable_REALTIME",
+ ServerMeter.QUERIES_KILLED_SCAN_DRY_RUN, 1L);
+ verify(_serverMetrics,
never()).addMeteredGlobalValue(ServerMeter.QUERIES_KILLED_SCAN_DRY_RUN, 1L);
+ // logOnly never terminates
+ assertNull(execCtx.getTerminateException());
+ }
+
+ @Test
+ public void testEnforceEmitsOncePerQueryAcrossManyBlockChecks() {
+ // Sanity check: enforce mode is already guarded by
getTerminateException() != null, but the
+ // dry-run CAS must not regress that. Verify enforce still emits exactly
once.
+ QueryMonitorConfig config = buildConfig("enforce", 100L, Long.MAX_VALUE);
+ AtomicReference<QueryMonitorConfig> configRef = new
AtomicReference<>(config);
+ QueryKillingManager manager = new QueryKillingManager(configRef,
_serverMetrics);
+ manager.rebuildStrategy();
+
+ QueryExecutionContext execCtx = QueryExecutionContext.forSseTest();
+ execCtx.setTableName("enforceTable_REALTIME");
+ execCtx.setQueryId("q-enforce-once");
+ execCtx.setCachedKillingStrategy(manager.resolveQueryStrategy(null));
+
+ QueryScanCostContext scanCtx = new QueryScanCostContext();
+ scanCtx.addEntriesScannedInFilter(200L);
+
+ for (int i = 0; i < 10_000; i++) {
+ manager.checkAndKillIfNeeded(execCtx, scanCtx);
+ }
+
+ verify(_serverMetrics).addMeteredTableValue("enforceTable_REALTIME",
+ ServerMeter.QUERIES_KILLED_SCAN, 1L);
+ assertNotNull(execCtx.getTerminateException());
+ }
+
+ @Test
+ public void testDryRunGuardIsPerQueryNotGlobal() {
+ // Two independent queries in logOnly mode should each emit once — the CAS
lives on
+ // QueryExecutionContext, not on the manager or a static.
+ QueryMonitorConfig config = buildConfig("logOnly", 100L, Long.MAX_VALUE);
+ AtomicReference<QueryMonitorConfig> configRef = new
AtomicReference<>(config);
+ QueryKillingManager manager = new QueryKillingManager(configRef,
_serverMetrics);
+ manager.rebuildStrategy();
+
+ QueryScanCostContext scanCtx = new QueryScanCostContext();
+ scanCtx.addEntriesScannedInFilter(200L);
+
+ QueryExecutionContext execCtxA = QueryExecutionContext.forSseTest();
+ execCtxA.setTableName("tableA_REALTIME");
+ execCtxA.setQueryId("qA");
+ execCtxA.setCachedKillingStrategy(manager.resolveQueryStrategy(null));
+
+ QueryExecutionContext execCtxB = QueryExecutionContext.forSseTest();
+ execCtxB.setTableName("tableB_REALTIME");
+ execCtxB.setQueryId("qB");
+ execCtxB.setCachedKillingStrategy(manager.resolveQueryStrategy(null));
+
+ for (int i = 0; i < 100; i++) {
+ manager.checkAndKillIfNeeded(execCtxA, scanCtx);
+ manager.checkAndKillIfNeeded(execCtxB, scanCtx);
+ }
+
+ verify(_serverMetrics).addMeteredTableValue("tableA_REALTIME",
+ ServerMeter.QUERIES_KILLED_SCAN_DRY_RUN, 1L);
+ verify(_serverMetrics).addMeteredTableValue("tableB_REALTIME",
+ ServerMeter.QUERIES_KILLED_SCAN_DRY_RUN, 1L);
+ }
+
+ @Test
+ public void testLogOnlyConcurrentCallersEmitExactlyOnce()
+ throws InterruptedException {
+ // Production reality: BaseOperator.checkTermination() is invoked from
multiple worker
+ // threads in parallel (one per segment / per block). Without an atomic
CAS, two threads
+ // racing past the threshold at the same instant could both pass the "not
yet emitted"
+ // check and double-emit. AtomicBoolean.compareAndSet guarantees exactly
one winner; this
+ // test fails if the guard is downgraded to a plain or volatile boolean.
+ QueryMonitorConfig config = buildConfig("logOnly", 100L, Long.MAX_VALUE);
+ AtomicReference<QueryMonitorConfig> configRef = new
AtomicReference<>(config);
+ QueryKillingManager manager = new QueryKillingManager(configRef,
_serverMetrics);
+ manager.rebuildStrategy();
+
+ QueryExecutionContext execCtx = QueryExecutionContext.forSseTest();
+ execCtx.setTableName("concurrentLogOnly_REALTIME");
+ execCtx.setQueryId("q-concurrent-dry-run");
+ execCtx.setCachedKillingStrategy(manager.resolveQueryStrategy(null));
+
+ QueryScanCostContext scanCtx = new QueryScanCostContext();
+ scanCtx.addEntriesScannedInFilter(200L);
+
+ int threadCount = 16;
+ int iterationsPerThread = 5_000;
+ ExecutorService pool = Executors.newFixedThreadPool(threadCount);
+ CountDownLatch startBarrier = new CountDownLatch(1);
+ CountDownLatch doneBarrier = new CountDownLatch(threadCount);
+
+ try {
+ for (int t = 0; t < threadCount; t++) {
+ pool.submit(() -> {
+ try {
+ startBarrier.await();
+ for (int i = 0; i < iterationsPerThread; i++) {
+ manager.checkAndKillIfNeeded(execCtx, scanCtx);
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } finally {
+ doneBarrier.countDown();
+ }
+ });
+ }
+ startBarrier.countDown();
+ assertTrue(doneBarrier.await(30, TimeUnit.SECONDS), "Workers did not
complete in time");
+ } finally {
+ pool.shutdownNow();
+ }
+
+ // Even with 16 threads × 5000 iterations = 80,000 concurrent attempts,
the CAS guarantees
+ // exactly one emission.
+ verify(_serverMetrics,
times(1)).addMeteredTableValue("concurrentLogOnly_REALTIME",
+ ServerMeter.QUERIES_KILLED_SCAN_DRY_RUN, 1L);
+ verify(_serverMetrics,
never()).addMeteredGlobalValue(ServerMeter.QUERIES_KILLED_SCAN_DRY_RUN, 1L);
+ assertNull(execCtx.getTerminateException(), "logOnly must not terminate");
+ }
+
+ @Test
+ public void testEnforceConcurrentCallersEmitExactlyOnce()
+ throws InterruptedException {
+ // Enforce-mode duplicate-emit prevention: terminate() is internally a
synchronized CAS that
+ // returns true only on the first transition. The manager gates the emit +
warn log on the
+ // return value, so two threads that both observe (!isTerminated) and
cross the threshold
+ // before either commits will still result in exactly one emission.
+ QueryMonitorConfig config = buildConfig("enforce", 100L, Long.MAX_VALUE);
+ AtomicReference<QueryMonitorConfig> configRef = new
AtomicReference<>(config);
+ QueryKillingManager manager = new QueryKillingManager(configRef,
_serverMetrics);
+ manager.rebuildStrategy();
+
+ QueryExecutionContext execCtx = QueryExecutionContext.forSseTest();
+ execCtx.setTableName("concurrentEnforce_REALTIME");
+ execCtx.setQueryId("q-concurrent-enforce");
+ execCtx.setCachedKillingStrategy(manager.resolveQueryStrategy(null));
+
+ QueryScanCostContext scanCtx = new QueryScanCostContext();
+ scanCtx.addEntriesScannedInFilter(200L);
+
+ int threadCount = 16;
+ int iterationsPerThread = 5_000;
+ ExecutorService pool = Executors.newFixedThreadPool(threadCount);
+ CountDownLatch startBarrier = new CountDownLatch(1);
+ CountDownLatch doneBarrier = new CountDownLatch(threadCount);
+
+ try {
+ for (int t = 0; t < threadCount; t++) {
+ pool.submit(() -> {
+ try {
+ startBarrier.await();
+ for (int i = 0; i < iterationsPerThread; i++) {
+ manager.checkAndKillIfNeeded(execCtx, scanCtx);
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } finally {
+ doneBarrier.countDown();
+ }
+ });
+ }
+ startBarrier.countDown();
+ assertTrue(doneBarrier.await(30, TimeUnit.SECONDS), "Workers did not
complete in time");
+ } finally {
+ pool.shutdownNow();
+ }
+
+ verify(_serverMetrics,
times(1)).addMeteredTableValue("concurrentEnforce_REALTIME",
+ ServerMeter.QUERIES_KILLED_SCAN, 1L);
+ verify(_serverMetrics,
never()).addMeteredGlobalValue(ServerMeter.QUERIES_KILLED_SCAN, 1L);
+ assertNotNull(execCtx.getTerminateException());
+ }
/**
* A test strategy that always kills — used to verify custom factory loading.
@@ -308,8 +907,8 @@ public class QueryKillingManagerTest {
@Override
public QueryKillReport buildKillReport(QueryScanCostContext context,
- String queryId, String tableName, String configSource) {
- return new QueryKillReport(queryId, tableName, "AlwaysKillStrategy",
+ long requestId, String queryId, String tableName, String configSource)
{
+ return new QueryKillReport(requestId, queryId, tableName,
"AlwaysKillStrategy",
"always", 0, 0, configSource, context);
}
}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/query/killing/strategy/ScanEntriesThresholdStrategyTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/query/killing/strategy/ScanEntriesThresholdStrategyTest.java
index 48785498162..e64c1985c90 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/query/killing/strategy/ScanEntriesThresholdStrategyTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/query/killing/strategy/ScanEntriesThresholdStrategyTest.java
@@ -81,7 +81,7 @@ public class ScanEntriesThresholdStrategyTest {
QueryScanCostContext ctx = new QueryScanCostContext();
ctx.addEntriesScannedInFilter(150_000_000);
- QueryKillReport report = strategy.buildKillReport(ctx, "q1", "myTable",
"cluster");
+ QueryKillReport report = strategy.buildKillReport(ctx, 1L, "q1",
"myTable", "cluster");
assertEquals(report.getTriggeringMetric(), "numEntriesScannedInFilter");
assertEquals(report.getActualValue(), 150_000_000L);
assertEquals(report.getThresholdValue(), 100_000_000L);
@@ -93,7 +93,7 @@ public class ScanEntriesThresholdStrategyTest {
QueryScanCostContext ctx = new QueryScanCostContext();
ctx.addDocsScanned(15_000_000);
- QueryKillReport report = strategy.buildKillReport(ctx, "q2", "myTable",
"table:myTable");
+ QueryKillReport report = strategy.buildKillReport(ctx, 2L, "q2",
"myTable", "table:myTable");
assertEquals(report.getTriggeringMetric(), "numDocsScanned");
assertEquals(report.getActualValue(), 15_000_000L);
assertEquals(report.getThresholdValue(), 10_000_000L);
@@ -122,6 +122,64 @@ public class ScanEntriesThresholdStrategyTest {
assertTrue(strategy.shouldTerminate(ctx2));
}
+ @Test
+ public void testAbovePostFilterThresholdKills() {
+ ScanEntriesThresholdStrategy strategy =
+ new ScanEntriesThresholdStrategy(Long.MAX_VALUE, Long.MAX_VALUE,
1000L);
+ QueryScanCostContext ctx = new QueryScanCostContext();
+ ctx.addEntriesScannedPostFilter(1001);
+ assertTrue(strategy.shouldTerminate(ctx));
+ }
+
+ @Test
+ public void testBelowPostFilterThresholdDoesNotKill() {
+ ScanEntriesThresholdStrategy strategy =
+ new ScanEntriesThresholdStrategy(Long.MAX_VALUE, Long.MAX_VALUE,
1000L);
+ QueryScanCostContext ctx = new QueryScanCostContext();
+ ctx.addEntriesScannedPostFilter(999);
+ assertFalse(strategy.shouldTerminate(ctx));
+ }
+
+ @Test
+ public void testBuildKillReportForPostFilter() {
+ ScanEntriesThresholdStrategy strategy =
+ new ScanEntriesThresholdStrategy(Long.MAX_VALUE, Long.MAX_VALUE,
5000L);
+ QueryScanCostContext ctx = new QueryScanCostContext();
+ ctx.addEntriesScannedPostFilter(7500);
+
+ QueryKillReport report = strategy.buildKillReport(ctx, 3L, "q3",
"myTable", "cluster");
+ assertEquals(report.getTriggeringMetric(), "numEntriesScannedPostFilter");
+ assertEquals(report.getActualValue(), 7500L);
+ assertEquals(report.getThresholdValue(), 5000L);
+ }
+
+ @Test
+ public void testPostFilterOverrideViaForQuery() {
+ ScanEntriesThresholdStrategy strategy =
+ new ScanEntriesThresholdStrategy(100L, 200L, 300L);
+ QueryConfig queryConfig = new QueryConfig(null, null, null, null, null,
null, null, null, 600L);
+
+ QueryKillingStrategy result = strategy.forQuery(queryConfig, null);
+ assertTrue(result != strategy);
+ ScanEntriesThresholdStrategy overridden = (ScanEntriesThresholdStrategy)
result;
+ assertEquals(overridden.getMaxEntriesScannedInFilter(), 100L);
+ assertEquals(overridden.getMaxDocsScanned(), 200L);
+ assertEquals(overridden.getMaxEntriesScannedPostFilter(), 600L);
+ }
+
+ @Test
+ public void testPostFilterPriorityInBuildKillReport() {
+ // When both entries-in-filter and post-filter exceed, entries-in-filter
takes priority
+ ScanEntriesThresholdStrategy strategy =
+ new ScanEntriesThresholdStrategy(100L, Long.MAX_VALUE, 500L);
+ QueryScanCostContext ctx = new QueryScanCostContext();
+ ctx.addEntriesScannedInFilter(200);
+ ctx.addEntriesScannedPostFilter(1000);
+
+ QueryKillReport report = strategy.buildKillReport(ctx, 4L, "q4",
"myTable", "cluster");
+ assertEquals(report.getTriggeringMetric(), "numEntriesScannedInFilter");
+ }
+
// --- forQuery() table override tests ---
@Test
diff --git
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
index 71c4e6539f8..b22f220a4df 100644
---
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
+++
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
@@ -87,6 +87,7 @@ import org.apache.pinot.core.data.manager.InstanceDataManager;
import
org.apache.pinot.core.data.manager.realtime.RealtimeConsumptionRateManager;
import
org.apache.pinot.core.data.manager.realtime.ServerRateLimitConfigChangeListener;
import org.apache.pinot.core.instance.context.ServerContext;
+import org.apache.pinot.core.query.killing.QueryKillingManager;
import
org.apache.pinot.core.query.scheduler.QuerySchedulerThreadPoolConfigChangeListener;
import org.apache.pinot.core.query.scheduler.resources.ResourceManager;
import org.apache.pinot.core.transport.ListenerConfig;
@@ -189,6 +190,7 @@ public abstract class BaseServerStarter implements
ServiceStartable {
protected PinotEnvironmentProvider _pinotEnvironmentProvider;
protected SegmentOperationsThrottlerSet _segmentOperationsThrottlerSet;
protected ThreadAccountant _threadAccountant;
+ protected QueryKillingManager _queryKillingManager;
protected DefaultClusterConfigChangeHandler _clusterConfigChangeHandler;
protected volatile boolean _isServerReadyToServeQueries = false;
protected ScheduledExecutorService _helixMessageCountScheduler;
@@ -772,6 +774,10 @@ public abstract class BaseServerStarter implements
ServiceStartable {
_threadAccountant =
ThreadAccountantUtils.createAccountant(accountingConfig, _instanceId,
org.apache.pinot.spi.config.instance.InstanceType.SERVER);
+ // Initialize scan-based query killing
+ PinotConfiguration schedulerConfig =
_serverConf.subset(CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX);
+ _queryKillingManager = QueryKillingManager.init(schedulerConfig,
ServerMetrics.get());
+
SendStatsPredicate sendStatsPredicate =
SendStatsPredicate.create(_serverConf, _helixManager);
KeepPipelineBreakerStatsPredicate keepPipelineBreakerStatsPredicate =
KeepPipelineBreakerStatsPredicate.create(_serverConf);
@@ -790,6 +796,11 @@ public abstract class BaseServerStarter implements
ServiceStartable {
new ServerRateLimitConfigChangeListener(_serverMetrics);
_clusterConfigChangeHandler.registerClusterConfigChangeListener(serverRateLimitConfigChangeListener);
+ // Register query killing manager for dynamic config updates
(threshold/mode changes via ZK)
+ if (_queryKillingManager != null) {
+
_clusterConfigChangeHandler.registerClusterConfigChangeListener(_queryKillingManager);
+ }
+
initSegmentFetcher(_serverConf);
StateModelFactory<?> stateModelFactory =
createSegmentOnlineOfflineStateModelFactory(instanceDataManager,
_transitionThreadPoolManager);
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/QueryConfig.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/QueryConfig.java
index 093b7c340fa..92fb399f584 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/QueryConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/QueryConfig.java
@@ -24,6 +24,7 @@ import com.google.common.base.Preconditions;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.pinot.spi.config.BaseJsonConfig;
+import org.apache.pinot.spi.utils.CommonConstants.Accounting.ScanKillingMode;
/**
@@ -62,6 +63,10 @@ public class QueryConfig extends BaseJsonConfig {
private final Long _maxEntriesScannedPostFilter;
+ // Per-table scan-based killing mode override. Null means use the
cluster-level mode.
+ // Valid values: "disabled", "logOnly", "enforce" (case-insensitive).
+ private final String _scanKillingMode;
+
public QueryConfig(@Nullable Long timeoutMs, @Nullable Boolean disableGroovy,
@Nullable Boolean useApproximateFunction, @Nullable Map<String, String>
expressionOverrideMap,
@@ -70,6 +75,16 @@ public class QueryConfig extends BaseJsonConfig {
maxQueryResponseSizeBytes, maxServerResponseSizeBytes, null, null,
null);
}
+ public QueryConfig(@Nullable Long timeoutMs, @Nullable Boolean disableGroovy,
+ @Nullable Boolean useApproximateFunction, @Nullable Map<String, String>
expressionOverrideMap,
+ @Nullable Long maxQueryResponseSizeBytes, @Nullable Long
maxServerResponseSizeBytes,
+ @Nullable Long maxEntriesScannedInFilter, @Nullable Long maxDocsScanned,
+ @Nullable Long maxEntriesScannedPostFilter) {
+ this(timeoutMs, disableGroovy, useApproximateFunction,
expressionOverrideMap,
+ maxQueryResponseSizeBytes, maxServerResponseSizeBytes,
maxEntriesScannedInFilter, maxDocsScanned,
+ maxEntriesScannedPostFilter, null);
+ }
+
@JsonCreator
public QueryConfig(@JsonProperty("timeoutMs") @Nullable Long timeoutMs,
@JsonProperty("disableGroovy") @Nullable Boolean disableGroovy,
@@ -79,7 +94,8 @@ public class QueryConfig extends BaseJsonConfig {
@JsonProperty("maxServerResponseSizeBytes") @Nullable Long
maxServerResponseSizeBytes,
@JsonProperty("maxEntriesScannedInFilter") @Nullable Long
maxEntriesScannedInFilter,
@JsonProperty("maxDocsScanned") @Nullable Long maxDocsScanned,
- @JsonProperty("maxEntriesScannedPostFilter") @Nullable Long
maxEntriesScannedPostFilter) {
+ @JsonProperty("maxEntriesScannedPostFilter") @Nullable Long
maxEntriesScannedPostFilter,
+ @JsonProperty("scanKillingMode") @Nullable String scanKillingMode) {
Preconditions.checkArgument(timeoutMs == null || timeoutMs > 0, "Invalid
'timeoutMs': %s", timeoutMs);
Preconditions.checkArgument(maxQueryResponseSizeBytes == null ||
maxQueryResponseSizeBytes > 0,
"Invalid 'maxQueryResponseSizeBytes': %s", maxQueryResponseSizeBytes);
@@ -91,6 +107,8 @@ public class QueryConfig extends BaseJsonConfig {
"Invalid 'maxDocsScanned': %s", maxDocsScanned);
Preconditions.checkArgument(maxEntriesScannedPostFilter == null ||
maxEntriesScannedPostFilter > 0,
"Invalid 'maxEntriesScannedPostFilter': %s",
maxEntriesScannedPostFilter);
+ Preconditions.checkArgument(scanKillingMode == null ||
ScanKillingMode.fromConfigValue(scanKillingMode) != null,
+ "Invalid 'scanKillingMode': %s. Valid values: disabled, logOnly,
enforce", scanKillingMode);
_timeoutMs = timeoutMs;
_disableGroovy = disableGroovy;
@@ -101,6 +119,7 @@ public class QueryConfig extends BaseJsonConfig {
_maxEntriesScannedInFilter = maxEntriesScannedInFilter;
_maxDocsScanned = maxDocsScanned;
_maxEntriesScannedPostFilter = maxEntriesScannedPostFilter;
+ _scanKillingMode = scanKillingMode;
}
@Nullable
@@ -156,4 +175,10 @@ public class QueryConfig extends BaseJsonConfig {
public Long getMaxEntriesScannedPostFilter() {
return _maxEntriesScannedPostFilter;
}
+
+ @Nullable
+ @JsonProperty("scanKillingMode")
+ public String getScanKillingMode() {
+ return _scanKillingMode;
+ }
}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/query/QueryExecutionContext.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/query/QueryExecutionContext.java
index ca970cf5ff1..dd79faa2aff 100644
---
a/pinot-spi/src/main/java/org/apache/pinot/spi/query/QueryExecutionContext.java
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/query/QueryExecutionContext.java
@@ -24,6 +24,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import org.apache.pinot.spi.exception.QueryErrorCode;
@@ -65,6 +66,25 @@ public class QueryExecutionContext {
private volatile TerminationException _terminateException;
+ /// Per-query scan cost accumulators for scan-based killing, tracking
cumulative scan cost across all segments.
+ @Nullable
+ private volatile QueryScanCostContext _queryScanCostContext;
+
+ @Nullable
+ private volatile Object _cachedKillingStrategy;
+
+ @Nullable
+ private volatile String _tableName;
+
+ @Nullable
+ private volatile String _queryId;
+
+ @Nullable
+ private volatile Accounting.ScanKillingMode _effectiveScanKillingMode;
+
+ /// Guards single-emission of the scan-based killing dry-run log line and
metric for this query
+ private final AtomicBoolean _scanKillingDryRunEmitted = new
AtomicBoolean(false);
+
public QueryExecutionContext(QueryType queryType, long requestId, String
cid, String workloadName, long startTimeMs,
long activeDeadlineMs, long passiveDeadlineMs, String brokerId, String
instanceId, String queryHash) {
_queryType = queryType;
@@ -193,4 +213,65 @@ public class QueryExecutionContext {
public TerminationException getTerminateException() {
return _terminateException;
}
+
+ @Nullable
+ public QueryScanCostContext getQueryScanCostContext() {
+ return _queryScanCostContext;
+ }
+
+ public void setQueryScanCostContext(@Nullable QueryScanCostContext
queryScanCostContext) {
+ _queryScanCostContext = queryScanCostContext;
+ }
+
+ @Nullable
+ public Object getCachedKillingStrategy() {
+ return _cachedKillingStrategy;
+ }
+
+ public void setCachedKillingStrategy(@Nullable Object cachedKillingStrategy)
{
+ _cachedKillingStrategy = cachedKillingStrategy;
+ }
+
+ @Nullable
+ public String getTableName() {
+ return _tableName;
+ }
+
+ public void setTableName(@Nullable String tableName) {
+ _tableName = tableName;
+ }
+
+ @Nullable
+ public String getQueryId() {
+ return _queryId;
+ }
+
+ public void setQueryId(@Nullable String queryId) {
+ _queryId = queryId;
+ }
+
+ /**
+ * Returns the per-table scan killing mode override set for this query, or
{@code null} if no
+ * table-level override is configured. When {@code null}, the cluster-level
mode from
+ * {@link org.apache.pinot.spi.utils.CommonConstants.Accounting} applies.
+ */
+ @Nullable
+ public Accounting.ScanKillingMode getEffectiveScanKillingMode() {
+ return _effectiveScanKillingMode;
+ }
+
+ /**
+ * Sets the per-table scan killing mode for this query. Pass {@code null} to
fall back to the
+ * cluster-level mode. Called once during query initialization; thread-safe
via {@code volatile}.
+ */
+ public void setEffectiveScanKillingMode(@Nullable Accounting.ScanKillingMode
effectiveScanKillingMode) {
+ _effectiveScanKillingMode = effectiveScanKillingMode;
+ }
+
+ /**
+ * Atomically marks that the scan-based killing dry-run signal has been
emitted for this query. Returns {@code true} only for the call that flips the flag.
+ */
+ public boolean markScanKillingDryRunEmitted() {
+ return _scanKillingDryRunEmitted.compareAndSet(false, true);
+ }
}
diff --git
a/pinot-spi/src/test/java/org/apache/pinot/spi/config/table/QueryConfigScanKillingTest.java
b/pinot-spi/src/test/java/org/apache/pinot/spi/config/table/QueryConfigScanKillingTest.java
index 5d6ea187d2d..fce5748652c 100644
---
a/pinot-spi/src/test/java/org/apache/pinot/spi/config/table/QueryConfigScanKillingTest.java
+++
b/pinot-spi/src/test/java/org/apache/pinot/spi/config/table/QueryConfigScanKillingTest.java
@@ -19,6 +19,7 @@
package org.apache.pinot.spi.config.table;
import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.pinot.spi.utils.CommonConstants.Accounting.ScanKillingMode;
import org.testng.annotations.Test;
import static org.testng.Assert.assertEquals;
@@ -92,4 +93,71 @@ public class QueryConfigScanKillingTest {
public void testZeroMaxDocsScannedThrows() {
new QueryConfig(null, null, null, null, null, null, null, 0L, null);
}
+
+ @Test
+ public void testScanKillingModeDefaultsToNull() {
+ QueryConfig config = new QueryConfig(null, null, null, null, null, null,
null, null, null);
+ assertNull(config.getScanKillingMode());
+ }
+
+ @Test
+ public void testScanKillingModeSetExplicitly() {
+ QueryConfig config = new QueryConfig(null, null, null, null, null, null,
null, null, null, "enforce");
+ assertEquals(config.getScanKillingMode(), "enforce");
+ }
+
+ @Test
+ public void testScanKillingModeJsonRoundTrip()
+ throws Exception {
+ QueryConfig config = new QueryConfig(null, null, null, null, null, null,
500_000L, null, null, "logOnly");
+
+ String json = OBJECT_MAPPER.writeValueAsString(config);
+ QueryConfig deserialized = OBJECT_MAPPER.readValue(json,
QueryConfig.class);
+
+ assertEquals(deserialized.getScanKillingMode(), "logOnly");
+ assertEquals(deserialized.getMaxEntriesScannedInFilter(),
Long.valueOf(500_000L));
+ }
+
+ @Test
+ public void testScanKillingModeDeserializesFromJson()
+ throws Exception {
+ String json = "{\"maxDocsScanned\": 5000000, \"scanKillingMode\":
\"disabled\"}";
+ QueryConfig config = OBJECT_MAPPER.readValue(json, QueryConfig.class);
+ assertEquals(config.getScanKillingMode(), "disabled");
+ assertEquals(config.getMaxDocsScanned(), Long.valueOf(5_000_000L));
+ }
+
+ @Test
+ public void testScanKillingModeAbsentInJsonDeserializesToNull()
+ throws Exception {
+ String json = "{\"maxDocsScanned\": 5000000}";
+ QueryConfig config = OBJECT_MAPPER.readValue(json, QueryConfig.class);
+ assertNull(config.getScanKillingMode());
+ }
+
+ @Test
+ public void testScanKillingModeStringParsesToCorrectEnum() {
+ // Verify the production parse path: getScanKillingMode() string →
ScanKillingMode enum
+ QueryConfig enforce = new QueryConfig(null, null, null, null, null, null,
null, null, null, "enforce");
+
assertEquals(ScanKillingMode.fromConfigValue(enforce.getScanKillingMode()),
ScanKillingMode.ENFORCE);
+
+ QueryConfig logOnly = new QueryConfig(null, null, null, null, null, null,
null, null, null, "logOnly");
+
assertEquals(ScanKillingMode.fromConfigValue(logOnly.getScanKillingMode()),
ScanKillingMode.LOG_ONLY);
+
+ QueryConfig disabled = new QueryConfig(null, null, null, null, null, null,
null, null, null, "disabled");
+
assertEquals(ScanKillingMode.fromConfigValue(disabled.getScanKillingMode()),
ScanKillingMode.DISABLED);
+ }
+
+ @Test
+ public void testScanKillingModeCaseInsensitiveParsing() {
+ // fromConfigValue is case-insensitive, so "Enforce" is valid
+ QueryConfig config = new QueryConfig(null, null, null, null, null, null,
null, null, null, "Enforce");
+ assertEquals(ScanKillingMode.fromConfigValue(config.getScanKillingMode()),
ScanKillingMode.ENFORCE);
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void testInvalidScanKillingModeThrowsAtConstruction() {
+ // A completely unrecognized value fails at construction time, not
silently at query time
+ new QueryConfig(null, null, null, null, null, null, null, null, null,
"enforced");
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]