This is an automated email from the ASF dual-hosted git repository.
shauryachats pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 5fd31838a1e [pinot-server/ proactive-query-killing] (1/2) add initial
SPI implementation for supporting query killing based on Scan Cost (#18102)
5fd31838a1e is described below
commit 5fd31838a1e7b818341ed01db6e6adfede87e410
Author: Anurag Rai <[email protected]>
AuthorDate: Tue May 12 06:01:10 2026 +0530
[pinot-server/ proactive-query-killing] (1/2) add initial SPI
implementation for supporting query killing based on Scan Cost (#18102)
---
.../apache/pinot/common/metrics/ServerMeter.java | 3 +
.../pinot/controller/helix/TableCacheTest.java | 3 +-
.../pinot/core/accounting/QueryMonitorConfig.java | 145 +++++++++
.../killing/CompositeQueryKillingStrategy.java | 84 ++++++
.../pinot/core/query/killing/QueryKillReport.java | 148 +++++++++
.../core/query/killing/QueryKillingManager.java | 185 ++++++++++++
.../core/query/killing/QueryKillingStrategy.java | 79 +++++
.../query/killing/QueryKillingStrategyFactory.java | 58 ++++
.../strategy/ScanEntriesThresholdStrategy.java | 157 ++++++++++
.../QueryMonitorConfigScanKillingTest.java | 191 ++++++++++++
.../killing/CompositeQueryKillingStrategyTest.java | 143 +++++++++
.../core/query/killing/QueryKillReportTest.java | 149 ++++++++++
.../query/killing/QueryKillingManagerTest.java | 331 +++++++++++++++++++++
.../strategy/ScanEntriesThresholdStrategyTest.java | 183 ++++++++++++
.../tests/OfflineClusterIntegrationTest.java | 3 +-
.../apache/pinot/spi/config/table/QueryConfig.java | 46 ++-
.../apache/pinot/spi/exception/QueryErrorCode.java | 1 +
.../pinot/spi/query/QueryScanCostContext.java | 81 +++++
.../apache/pinot/spi/utils/CommonConstants.java | 51 ++++
.../config/table/QueryConfigScanKillingTest.java | 95 ++++++
.../pinot/spi/query/QueryScanCostContextTest.java | 98 ++++++
21 files changed, 2230 insertions(+), 4 deletions(-)
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
index 2e40a399f98..49ea60bb56c 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
@@ -115,6 +115,9 @@ public enum ServerMeter implements AbstractMetrics.Meter {
READINESS_CHECK_OK_CALLS("readinessCheck", true),
READINESS_CHECK_BAD_CALLS("readinessCheck", true),
QUERIES_KILLED("query", true),
+ QUERIES_KILLED_SCAN("queriesKilledScan", true),
+ QUERIES_KILLED_SCAN_DRY_RUN("queriesKilledScanDryRun", true),
+ QUERIES_KILLED_SCAN_ERROR("queriesKilledScanError", true),
QUERIES_THROTTLED("query", true),
HEAP_CRITICAL_LEVEL_EXCEEDED("count", true),
HEAP_PANIC_LEVEL_EXCEEDED("count", true),
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/TableCacheTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/TableCacheTest.java
index 6fcf472a69d..b8242dff1ae 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/TableCacheTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/TableCacheTest.java
@@ -218,8 +218,7 @@ public class TableCacheTest {
logicalTableConfig =
ControllerTest.getDummyLogicalTableConfig(LOGICAL_TABLE_NAME,
List.of(OFFLINE_TABLE_NAME, ANOTHER_TABLE_OFFLINE), "DefaultTenant");
logicalTableConfig.setQueryConfig(new QueryConfig(
- 1L, false, false, Map.of("DaysSinceEpoch * 24",
"NewAddedDerivedHoursSinceEpoch"), 1L, 1L
- ));
+ 1L, false, false, Map.of("DaysSinceEpoch * 24",
"NewAddedDerivedHoursSinceEpoch"), 1L, 1L));
TEST_INSTANCE.getHelixResourceManager().updateLogicalTableConfig(logicalTableConfig);
TestUtils.waitForCondition(
aVoid ->
Objects.requireNonNull(tableCache.getLogicalTableConfig(LOGICAL_TABLE_NAME))
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/accounting/QueryMonitorConfig.java
b/pinot-core/src/main/java/org/apache/pinot/core/accounting/QueryMonitorConfig.java
index 280e989c3ca..920648ca506 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/accounting/QueryMonitorConfig.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/accounting/QueryMonitorConfig.java
@@ -18,13 +18,21 @@
*/
package org.apache.pinot.core.accounting;
+import java.util.Arrays;
import java.util.Map;
import java.util.Set;
+import javax.annotation.Nullable;
import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.CommonConstants.Accounting;
+import org.apache.pinot.spi.utils.CommonConstants.Accounting.ScanKillingMode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class QueryMonitorConfig {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(QueryMonitorConfig.class);
+
private final long _maxHeapSize;
// don't kill a query if its memory footprint is below some ratio of
_maxHeapSize
@@ -72,6 +80,16 @@ public class QueryMonitorConfig {
private final boolean _workloadCostEnforcementEnabled;
+ private final ScanKillingMode _scanBasedKillingMode;
+
+ private final long _scanBasedKillingMaxEntriesScannedInFilter;
+
+ private final long _scanBasedKillingMaxDocsScanned;
+
+ private final long _scanBasedKillingMaxEntriesScannedPostFilter;
+
+ private final String _scanBasedKillingStrategyFactoryClassName;
+
public QueryMonitorConfig(PinotConfiguration config, long maxHeapSize) {
_maxHeapSize = maxHeapSize;
@@ -122,6 +140,22 @@ public class QueryMonitorConfig {
_workloadCostEnforcementEnabled =
config.getProperty(Accounting.Keys.WORKLOAD_ENABLE_COST_ENFORCEMENT,
Accounting.DEFAULT_WORKLOAD_ENABLE_COST_ENFORCEMENT);
+
+ _scanBasedKillingMode = validateScanKillingMode(config.getProperty(
+ CommonConstants.Accounting.CONFIG_OF_SCAN_BASED_KILLING_MODE,
+
CommonConstants.Accounting.DEFAULT_SCAN_BASED_KILLING_MODE.getConfigValue()));
+ _scanBasedKillingMaxEntriesScannedInFilter = config.getProperty(
+
CommonConstants.Accounting.CONFIG_OF_SCAN_BASED_KILLING_MAX_ENTRIES_SCANNED_IN_FILTER,
+
CommonConstants.Accounting.DEFAULT_SCAN_BASED_KILLING_MAX_ENTRIES_SCANNED_IN_FILTER);
+ _scanBasedKillingMaxDocsScanned = config.getProperty(
+
CommonConstants.Accounting.CONFIG_OF_SCAN_BASED_KILLING_MAX_DOCS_SCANNED,
+
CommonConstants.Accounting.DEFAULT_SCAN_BASED_KILLING_MAX_DOCS_SCANNED);
+ _scanBasedKillingMaxEntriesScannedPostFilter = config.getProperty(
+
CommonConstants.Accounting.CONFIG_OF_SCAN_BASED_KILLING_MAX_ENTRIES_SCANNED_POST_FILTER,
+
CommonConstants.Accounting.DEFAULT_SCAN_BASED_KILLING_MAX_ENTRIES_SCANNED_POST_FILTER);
+ _scanBasedKillingStrategyFactoryClassName = config.getProperty(
+
CommonConstants.Accounting.CONFIG_OF_SCAN_BASED_KILLING_STRATEGY_FACTORY_CLASS_NAME,
+ (String) null);
}
QueryMonitorConfig(QueryMonitorConfig oldConfig, Set<String> changedConfigs,
Map<String, String> clusterConfigs) {
@@ -244,6 +278,88 @@ public class QueryMonitorConfig {
} else {
_workloadCostEnforcementEnabled =
oldConfig._workloadCostEnforcementEnabled;
}
+
+ if
(changedConfigs.contains(CommonConstants.Accounting.CONFIG_OF_SCAN_BASED_KILLING_MODE))
{
+ _scanBasedKillingMode =
validateScanKillingMode(clusterConfigs.getOrDefault(
+ CommonConstants.Accounting.CONFIG_OF_SCAN_BASED_KILLING_MODE,
+
CommonConstants.Accounting.DEFAULT_SCAN_BASED_KILLING_MODE.getConfigValue()));
+ } else {
+ _scanBasedKillingMode = oldConfig.getScanBasedKillingMode();
+ }
+
+ if (changedConfigs.contains(
+
CommonConstants.Accounting.CONFIG_OF_SCAN_BASED_KILLING_MAX_ENTRIES_SCANNED_IN_FILTER))
{
+ _scanBasedKillingMaxEntriesScannedInFilter = parseLongOrDefault(
+
clusterConfigs.get(CommonConstants.Accounting.CONFIG_OF_SCAN_BASED_KILLING_MAX_ENTRIES_SCANNED_IN_FILTER),
+
CommonConstants.Accounting.CONFIG_OF_SCAN_BASED_KILLING_MAX_ENTRIES_SCANNED_IN_FILTER,
+
CommonConstants.Accounting.DEFAULT_SCAN_BASED_KILLING_MAX_ENTRIES_SCANNED_IN_FILTER);
+ } else {
+ _scanBasedKillingMaxEntriesScannedInFilter =
+ oldConfig.getScanBasedKillingMaxEntriesScannedInFilter();
+ }
+
+ if (changedConfigs.contains(
+
CommonConstants.Accounting.CONFIG_OF_SCAN_BASED_KILLING_MAX_DOCS_SCANNED)) {
+ _scanBasedKillingMaxDocsScanned = parseLongOrDefault(
+
clusterConfigs.get(CommonConstants.Accounting.CONFIG_OF_SCAN_BASED_KILLING_MAX_DOCS_SCANNED),
+
CommonConstants.Accounting.CONFIG_OF_SCAN_BASED_KILLING_MAX_DOCS_SCANNED,
+
CommonConstants.Accounting.DEFAULT_SCAN_BASED_KILLING_MAX_DOCS_SCANNED);
+ } else {
+ _scanBasedKillingMaxDocsScanned =
oldConfig.getScanBasedKillingMaxDocsScanned();
+ }
+
+ if (changedConfigs.contains(
+
CommonConstants.Accounting.CONFIG_OF_SCAN_BASED_KILLING_MAX_ENTRIES_SCANNED_POST_FILTER))
{
+ _scanBasedKillingMaxEntriesScannedPostFilter = parseLongOrDefault(
+
clusterConfigs.get(CommonConstants.Accounting.CONFIG_OF_SCAN_BASED_KILLING_MAX_ENTRIES_SCANNED_POST_FILTER),
+
CommonConstants.Accounting.CONFIG_OF_SCAN_BASED_KILLING_MAX_ENTRIES_SCANNED_POST_FILTER,
+
CommonConstants.Accounting.DEFAULT_SCAN_BASED_KILLING_MAX_ENTRIES_SCANNED_POST_FILTER);
+ } else {
+ _scanBasedKillingMaxEntriesScannedPostFilter =
+ oldConfig.getScanBasedKillingMaxEntriesScannedPostFilter();
+ }
+
+ if (changedConfigs.contains(
+
CommonConstants.Accounting.CONFIG_OF_SCAN_BASED_KILLING_STRATEGY_FACTORY_CLASS_NAME))
{
+ _scanBasedKillingStrategyFactoryClassName = clusterConfigs.getOrDefault(
+
CommonConstants.Accounting.CONFIG_OF_SCAN_BASED_KILLING_STRATEGY_FACTORY_CLASS_NAME,
null);
+ } else {
+ _scanBasedKillingStrategyFactoryClassName =
oldConfig.getScanBasedKillingStrategyFactoryClassName();
+ }
+ }
+
+ /**
+ * Validates the scan-based killing mode. If the value is not one of the
recognized modes
+ * (disabled, logOnly, enforce), logs an error and falls back to {@link
ScanKillingMode#DISABLED}
+ * so the server continues to start normally.
+ */
+ private static ScanKillingMode validateScanKillingMode(String mode) {
+ ScanKillingMode parsed = ScanKillingMode.fromConfigValue(mode);
+ if (parsed != null) {
+ return parsed;
+ }
+ LOGGER.error("Invalid value '{}' for config '{}'. Valid values are: {}. "
+ + "Defaulting to '{}'. Scan-based killing will be disabled.",
+ mode, CommonConstants.Accounting.CONFIG_OF_SCAN_BASED_KILLING_MODE,
+ Arrays.toString(ScanKillingMode.values()),
ScanKillingMode.DISABLED.getConfigValue());
+ return ScanKillingMode.DISABLED;
+ }
+
+ /**
+ * Parses a long value from a config string, falling back to the default if
the value
+ * is null, empty, or not a valid number.
+ */
+ private static long parseLongOrDefault(@Nullable String value, String
configKey, long defaultValue) {
+ if (value == null || value.isEmpty()) {
+ return defaultValue;
+ }
+ try {
+ return Long.parseLong(value);
+ } catch (NumberFormatException e) {
+ LOGGER.error("Invalid numeric value '{}' for config '{}'. Defaulting to
{}.",
+ value, configKey, defaultValue);
+ return defaultValue;
+ }
}
public long getMaxHeapSize() {
@@ -313,4 +429,33 @@ public class QueryMonitorConfig {
public boolean isWorkloadCostEnforcementEnabled() {
return _workloadCostEnforcementEnabled;
}
+
+ public ScanKillingMode getScanBasedKillingMode() {
+ return _scanBasedKillingMode;
+ }
+
+ public boolean isScanBasedKillingEnabled() {
+ return _scanBasedKillingMode != ScanKillingMode.DISABLED;
+ }
+
+ public boolean isScanBasedKillingLogOnly() {
+ return _scanBasedKillingMode == ScanKillingMode.LOG_ONLY;
+ }
+
+ public long getScanBasedKillingMaxEntriesScannedInFilter() {
+ return _scanBasedKillingMaxEntriesScannedInFilter;
+ }
+
+ public long getScanBasedKillingMaxDocsScanned() {
+ return _scanBasedKillingMaxDocsScanned;
+ }
+
+ public long getScanBasedKillingMaxEntriesScannedPostFilter() {
+ return _scanBasedKillingMaxEntriesScannedPostFilter;
+ }
+
+ @Nullable
+ public String getScanBasedKillingStrategyFactoryClassName() {
+ return _scanBasedKillingStrategyFactoryClassName;
+ }
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/killing/CompositeQueryKillingStrategy.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/killing/CompositeQueryKillingStrategy.java
new file mode 100644
index 00000000000..aa363d4d360
--- /dev/null
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/killing/CompositeQueryKillingStrategy.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.killing;
+
+import java.util.Comparator;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.pinot.spi.query.QueryScanCostContext;
+
+
+/**
+ * Combines multiple {@link QueryKillingStrategy} instances with AND/OR
semantics.
+ *
+ * <p>Strategies are sorted by {@link QueryKillingStrategy#priority()} (lower
= checked first).
+ * In {@link Mode#ANY} mode, the first strategy that triggers produces the
kill report.
+ * In {@link Mode#ALL} mode, all strategies must trigger for a kill.</p>
+ *
+ */
+public class CompositeQueryKillingStrategy implements QueryKillingStrategy {
+
+ /** Composition mode for combining strategies. */
+ public enum Mode {
+ /** Kill if ANY strategy triggers (OR). */
+ ANY,
+ /** Kill only if ALL strategies trigger (AND). */
+ ALL
+ }
+
+ private final List<QueryKillingStrategy> _strategies;
+ private final Mode _mode;
+
+ public CompositeQueryKillingStrategy(List<QueryKillingStrategy> strategies,
Mode mode) {
+ _strategies = strategies.stream()
+ .sorted(Comparator.comparingInt(QueryKillingStrategy::priority))
+ .collect(Collectors.toList());
+ _mode = mode;
+ }
+
+ @Override
+ public boolean shouldTerminate(QueryScanCostContext ctx) {
+ if (_mode == Mode.ANY) {
+ for (QueryKillingStrategy s : _strategies) {
+ if (s.shouldTerminate(ctx)) {
+ return true;
+ }
+ }
+ return false;
+ } else {
+ for (QueryKillingStrategy s : _strategies) {
+ if (!s.shouldTerminate(ctx)) {
+ return false;
+ }
+ }
+ return !_strategies.isEmpty();
+ }
+ }
+
+ @Override
+ public QueryKillReport buildKillReport(QueryScanCostContext ctx,
+ String queryId, String tableName, String configSource) {
+ for (QueryKillingStrategy s : _strategies) {
+ if (s.shouldTerminate(ctx)) {
+ return s.buildKillReport(ctx, queryId, tableName, configSource);
+ }
+ }
+ throw new IllegalStateException("buildKillReport called but no strategy
triggered");
+ }
+}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/killing/QueryKillReport.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/killing/QueryKillReport.java
new file mode 100644
index 00000000000..d44d873188d
--- /dev/null
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/killing/QueryKillReport.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.killing;
+
+import org.apache.pinot.spi.query.QueryScanCostContext;
+
+
+/**
+ * Immutable snapshot of a query-kill event
+ */
+public final class QueryKillReport {
+ private final String _queryId;
+ private final String _tableName;
+ private final String _strategyName;
+ private final String _triggeringMetric;
+ private final long _actualValue;
+ private final long _thresholdValue;
+ private final String _configSource;
+ private final long _snapshotEntriesScannedInFilter;
+ private final long _snapshotDocsScanned;
+ private final long _snapshotEntriesScannedPostFilter;
+ private final long _elapsedTimeMs;
+
+ /**
+ * Creates a {@code QueryKillReport} by snapshotting the current state of
{@code context}.
+ *
+ * @param queryId unique identifier of the killed query
+ * @param tableName fully-qualified table name (e.g. {@code
myTable_OFFLINE})
+ * @param strategyName name of the kill strategy that triggered the kill
+ * @param triggeringMetric name of the metric that exceeded the threshold
+ * @param actualValue observed metric value at kill time
+ * @param thresholdValue configured threshold that was exceeded
+ * @param configSource source of the threshold config (e.g. {@code
TABLE_CONFIG})
+ * @param context live scan-cost context; values are snapshotted
immediately
+ */
+ public QueryKillReport(String queryId, String tableName, String
strategyName, String triggeringMetric,
+ long actualValue, long thresholdValue, String configSource,
QueryScanCostContext context) {
+ _queryId = queryId;
+ _tableName = tableName;
+ _strategyName = strategyName;
+ _triggeringMetric = triggeringMetric;
+ _actualValue = actualValue;
+ _thresholdValue = thresholdValue;
+ _configSource = configSource;
+ // Snapshot mutable LongAdder values immediately to decouple from live
context
+ _snapshotEntriesScannedInFilter = context.getNumEntriesScannedInFilter();
+ _snapshotDocsScanned = context.getNumDocsScanned();
+ _snapshotEntriesScannedPostFilter =
context.getNumEntriesScannedPostFilter();
+ _elapsedTimeMs = context.getElapsedTimeMs();
+ }
+
+ // ----- Getters -----
+
+ public String getQueryId() {
+ return _queryId;
+ }
+
+ public String getTableName() {
+ return _tableName;
+ }
+
+ public String getStrategyName() {
+ return _strategyName;
+ }
+
+ public String getTriggeringMetric() {
+ return _triggeringMetric;
+ }
+
+ public long getActualValue() {
+ return _actualValue;
+ }
+
+ public long getThresholdValue() {
+ return _thresholdValue;
+ }
+
+ public String getConfigSource() {
+ return _configSource;
+ }
+
+ public long getSnapshotEntriesScannedInFilter() {
+ return _snapshotEntriesScannedInFilter;
+ }
+
+ public long getSnapshotDocsScanned() {
+ return _snapshotDocsScanned;
+ }
+
+ public long getSnapshotEntriesScannedPostFilter() {
+ return _snapshotEntriesScannedPostFilter;
+ }
+
+ public long getElapsedTimeMs() {
+ return _elapsedTimeMs;
+ }
+
+ // ----- Message formatters -----
+
+ /**
+ * Returns a user-facing message describing why the query was killed and
what action to take.
+ *
+ * <p>Numbers are formatted with commas (e.g. {@code 1,234,567}) for
readability.
+ * Includes actionable advice about adding a missing index to reduce scan
cost.</p>
+ */
+ public String toCustomerMessage() {
+ return String.format(
+ "Query '%s' on table '%s' was killed because '%s' (%,d) exceeded the
threshold (%,d) "
+ + "configured in %s. "
+ + "At kill time: entriesScannedInFilter=%,d, docsScanned=%,d, "
+ + "entriesScannedPostFilter=%,d, elapsedMs=%d. "
+ + "To reduce scan cost, consider adding a missing index (e.g.
inverted or range index) "
+ + "on the filter columns.",
+ _queryId, _tableName, _triggeringMetric, _actualValue,
_thresholdValue, _configSource,
+ _snapshotEntriesScannedInFilter, _snapshotDocsScanned,
_snapshotEntriesScannedPostFilter, _elapsedTimeMs);
+ }
+
+ /**
+ * Returns a structured log line suitable for grep and alerting pipelines.
+ *
+ * <p>Format: {@code QUERY_KILLED key=value ...} with plain
(non-comma-formatted) numbers.</p>
+ */
+ public String toInternalLogMessage() {
+ return String.format(
+ "QUERY_KILLED queryId=%s table=%s strategy=%s metric=%s actual=%d
threshold=%d "
+ + "configSource=%s entriesScannedInFilter=%d docsScanned=%d "
+ + "entriesScannedPostFilter=%d elapsedMs=%d",
+ _queryId, _tableName, _strategyName, _triggeringMetric, _actualValue,
_thresholdValue,
+ _configSource, _snapshotEntriesScannedInFilter, _snapshotDocsScanned,
+ _snapshotEntriesScannedPostFilter, _elapsedTimeMs);
+ }
+}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/killing/QueryKillingManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/killing/QueryKillingManager.java
new file mode 100644
index 00000000000..ee8f0678113
--- /dev/null
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/killing/QueryKillingManager.java
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.killing;
+
+import java.util.concurrent.atomic.AtomicReference;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.metrics.ServerMeter;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.core.accounting.QueryMonitorConfig;
+import
org.apache.pinot.core.query.killing.strategy.ScanEntriesThresholdStrategy;
+import org.apache.pinot.spi.config.table.QueryConfig;
+import org.apache.pinot.spi.query.QueryExecutionContext;
+import org.apache.pinot.spi.query.QueryScanCostContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Central manager for scan-based query killing. Owns the guard rails and
delegates the
+ * actual kill decision to a {@link QueryKillingStrategy}.
+ *
+ * The default factory is {@link ScanEntriesThresholdStrategy.Factory}, which
reads
+ * scan thresholds from {@link QueryMonitorConfig}. Custom factories can be
configured
+ * via {@code accounting.scan.based.killing.strategy.factory.class.name}.
+ *
+ */
+public class QueryKillingManager {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(QueryKillingManager.class);
+
+ private static volatile QueryKillingManager _instance;
+
+ private final AtomicReference<QueryMonitorConfig> _configRef;
+ private final ServerMetrics _serverMetrics;
+
+ /**
+ * Null if: killing is disabled, config is insufficient, or factory failed
to load.
+ * Rebuilt when config changes via {@link #rebuildStrategy()}.
+ */
+ @Nullable
+ private volatile QueryKillingStrategy _strategy;
+
+ public QueryKillingManager(AtomicReference<QueryMonitorConfig> configRef,
ServerMetrics serverMetrics) {
+ _configRef = configRef;
+ _serverMetrics = serverMetrics;
+ }
+
+ /**
+ * Initializes the singleton instance and builds the strategy from config.
+ * Called once during server startup.
+ */
+ public static void init(AtomicReference<QueryMonitorConfig> configRef,
ServerMetrics serverMetrics) {
+ QueryKillingManager manager = new QueryKillingManager(configRef,
serverMetrics);
+ manager.rebuildStrategy();
+ _instance = manager;
+ }
+
+ @Nullable
+ public static QueryKillingManager getInstance() {
+ return _instance;
+ }
+
+ /**
+ * Rebuilds the strategy from the current config. Called at init and when
+ * cluster config changes (via the same onChange path that rebuilds
QueryMonitorConfig).
+ */
+ public void rebuildStrategy() {
+ QueryMonitorConfig config = _configRef.get();
+ if (config == null || !config.isScanBasedKillingEnabled()) {
+ _strategy = null;
+ return;
+ }
+
+ try {
+ QueryKillingStrategyFactory factory = loadFactory(config);
+ _strategy = factory.create(config);
+ if (_strategy == null) {
+ LOGGER.warn("Scan-based killing is enabled but strategy factory '{}'
returned null — "
+ + "required configuration may be missing. Scan-based killing will
be effectively disabled.",
+ factory.getName());
+ }
+ } catch (Exception e) {
+ LOGGER.error("Failed to initialize scan-based killing strategy. "
+ + "Scan-based killing will be disabled.", e);
+ _strategy = null;
+ }
+ }
+
+ /**
+ * Loads the strategy factory from config. If a custom factory class name is
configured,
+ * loads it by reflection (following the same pattern as {@code
ThreadAccountantUtils.createAccountant()}).
+ * Otherwise, returns the default {@link
ScanEntriesThresholdStrategy.Factory}.
+ */
+ private QueryKillingStrategyFactory loadFactory(QueryMonitorConfig config) {
+ String factoryClassName =
config.getScanBasedKillingStrategyFactoryClassName();
+ if (factoryClassName != null && !factoryClassName.isEmpty()) {
+ LOGGER.info("Loading custom query killing strategy factory: {}",
factoryClassName);
+ try {
+ return (QueryKillingStrategyFactory) Class.forName(factoryClassName)
+ .getDeclaredConstructor().newInstance();
+ } catch (Exception e) {
+ LOGGER.error("Failed to load custom strategy factory '{}', falling
back to default",
+ factoryClassName, e);
+ }
+ }
+ return new ScanEntriesThresholdStrategy.Factory();
+ }
+
+ /**
+ * Returns the active strategy. Visible for testing.
+ */
+ @Nullable
+ public QueryKillingStrategy getActiveStrategy() {
+ return _strategy;
+ }
+
+ /**
+ * Evaluates whether the query should be killed based on the active strategy.
+ *
+ * <p>Calls {@link QueryKillingStrategy#forQuery(QueryConfig,
QueryMonitorConfig)}
+ * to resolve table-level overrides before evaluating.</p>
+ */
+ public void checkAndKillIfNeeded(QueryExecutionContext executionContext,
+ QueryScanCostContext scanCostContext, String queryId, String tableName,
+ @Nullable QueryConfig queryConfig) {
+ // no strategy means killing is disabled or unconfigured
+ QueryKillingStrategy strategy = _strategy;
+ if (strategy == null) {
+ return;
+ }
+
+ QueryMonitorConfig config = _configRef.get();
+ if (config == null || !config.isScanBasedKillingEnabled()) {
+ return;
+ }
+
+ // Prevent duplicate kills
+ if (executionContext.getTerminateException() != null) {
+ return;
+ }
+
+ try {
+ // Resolve per-query table overrides (returns same instance if no
overrides)
+ QueryKillingStrategy queryStrategy = strategy.forQuery(queryConfig,
config);
+
+ String configSource = (queryStrategy != strategy) ? "table:" + tableName
: "cluster";
+
+ // Delegate to strategy
+ if (!queryStrategy.shouldTerminate(scanCostContext)) {
+ return;
+ }
+
+ QueryKillReport report = queryStrategy.buildKillReport(
+ scanCostContext, queryId, tableName, configSource);
+
+ if (config.isScanBasedKillingLogOnly()) {
+ LOGGER.info("Query killed in LogOnly mode: {}",
report.toInternalLogMessage());
+
_serverMetrics.addMeteredGlobalValue(ServerMeter.QUERIES_KILLED_SCAN_DRY_RUN,
1);
+ return;
+ }
+
+ LOGGER.warn("Query Killed in enforce mode: {}",
report.toInternalLogMessage());
+ executionContext.terminate(queryStrategy.getErrorCode(),
report.toCustomerMessage());
+ _serverMetrics.addMeteredGlobalValue(ServerMeter.QUERIES_KILLED_SCAN, 1);
+ } catch (Exception e) {
+ LOGGER.error("Error in scan-based killing evaluation for query {}",
queryId, e);
+
_serverMetrics.addMeteredGlobalValue(ServerMeter.QUERIES_KILLED_SCAN_ERROR, 1);
+ }
+ }
+}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/killing/QueryKillingStrategy.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/killing/QueryKillingStrategy.java
new file mode 100644
index 00000000000..4bf4390d59e
--- /dev/null
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/killing/QueryKillingStrategy.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.killing;
+
+import javax.annotation.Nullable;
+import org.apache.pinot.core.accounting.QueryMonitorConfig;
+import org.apache.pinot.spi.config.table.QueryConfig;
+import org.apache.pinot.spi.exception.QueryErrorCode;
+import org.apache.pinot.spi.query.QueryScanCostContext;
+
+
+/**
+ * Determines whether a query should be terminated based on accumulated scan
cost.
+ *
+ *
+ * <p>To create a custom strategy:
+ * <ol>
+ * <li>Implement this interface</li>
+ * <li>Implement {@link QueryKillingStrategyFactory} to create it from
config</li>
+ * <li>Set {@code accounting.scan.based.killing.strategy.factory.class.name}
+ * to your factory class name</li>
+ * </ol>
+ */
+public interface QueryKillingStrategy {
+
+ /** Returns true if the query should be terminated immediately. */
+ boolean shouldTerminate(QueryScanCostContext context);
+
+ /**
+ * Builds a structured kill report with full context.
+ * Only called when {@link #shouldTerminate} returns true.
+ */
+ QueryKillReport buildKillReport(QueryScanCostContext context,
+ String queryId, String tableName, String configSource);
+
+ /** Error code for the termination response. */
+ default QueryErrorCode getErrorCode() {
+ return QueryErrorCode.QUERY_SCAN_LIMIT_EXCEEDED;
+ }
+
+ /** Priority: lower number = checked first in composite strategies. */
+ default int priority() {
+ return 100;
+ }
+
+ /**
+ * Returns a query-specific variant of this strategy with table-level
overrides applied.
+ * If no table overrides are relevant, returns {@code this} (no new
allocation).
+ *
+ * <p>The default implementation returns {@code this}, meaning the strategy
does not
+ * support table-level overrides. Strategies that support per-table
thresholds
+ * (like {@link
org.apache.pinot.core.query.killing.strategy.ScanEntriesThresholdStrategy})
+ * should override this method.</p>
+ *
+ * @param queryConfig table-level query config (nullable — null means no
table overrides)
+ * @param clusterConfig cluster-level config for resolving fallback
thresholds
+ * @return this strategy or a new instance with resolved thresholds
+ */
+ default QueryKillingStrategy forQuery(@Nullable QueryConfig queryConfig,
+ QueryMonitorConfig clusterConfig) {
+ return this;
+ }
+}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/killing/QueryKillingStrategyFactory.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/killing/QueryKillingStrategyFactory.java
new file mode 100644
index 00000000000..cdab5a51de5
--- /dev/null
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/killing/QueryKillingStrategyFactory.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.killing;
+
+import javax.annotation.Nullable;
+import org.apache.pinot.core.accounting.QueryMonitorConfig;
+
+
+/**
+ * Factory for creating {@link QueryKillingStrategy} instances from config.
+ *
+ * <p>Implementations are loaded by class name via the config key
+ * {@code accounting.scan.based.killing.strategy.factory.class.name}.
+ * Each factory reads whatever config keys it needs from {@link
QueryMonitorConfig}
+ * and creates a strategy instance. If required config is missing, the factory
+ * returns {@code null} and logs a warning.</p>
+ *
+ * To add a new strategy:
+ * <ol>
+ * <li>Implement {@link QueryKillingStrategy} with your kill logic</li>
+ * <li>Implement this interface to create your strategy from config</li>
+ * <li>Set the config key to your factory class name</li>
+ * <li>Add your strategy-specific config keys to the server config</li>
+ * </ol>
+ *
+ * @see
org.apache.pinot.core.query.killing.strategy.ScanEntriesThresholdStrategy.Factory
+ */
+public interface QueryKillingStrategyFactory {
+
+ /**
+ * Creates a strategy from the current config. Returns {@code null} if
required
+ * configuration is not present (e.g., no thresholds set), in which case the
+ * manager will log a warning and scan-based killing will be effectively
disabled.
+ */
+ @Nullable
+ QueryKillingStrategy create(QueryMonitorConfig config);
+
+ /**
+ * Human-readable name of this factory for logging.
+ */
+ String getName();
+}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/killing/strategy/ScanEntriesThresholdStrategy.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/killing/strategy/ScanEntriesThresholdStrategy.java
new file mode 100644
index 00000000000..92b13dcdc60
--- /dev/null
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/killing/strategy/ScanEntriesThresholdStrategy.java
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.killing.strategy;
+
+import javax.annotation.Nullable;
+import org.apache.pinot.core.accounting.QueryMonitorConfig;
+import org.apache.pinot.core.query.killing.QueryKillReport;
+import org.apache.pinot.core.query.killing.QueryKillingStrategy;
+import org.apache.pinot.core.query.killing.QueryKillingStrategyFactory;
+import org.apache.pinot.spi.config.table.QueryConfig;
+import org.apache.pinot.spi.query.QueryScanCostContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Kills queries that exceed scan entry or doc thresholds.
+ *
+ * <p>Primary strategy for proactive query killing. Checks
+ * {@code numEntriesScannedInFilter} (primary signal — catches expensive filter
+ * predicates / missing indexes) and {@code numDocsScanned} (secondary signal —
+ * catches large aggregations).</p>
+ *
+ * <p>A threshold of {@link Long#MAX_VALUE} disables that metric's check.</p>
+ *
+ *
+ * <p>Supports table-level overrides via {@link #forQuery(QueryConfig,
QueryMonitorConfig)}.
+ * When a table has specific thresholds in its {@link QueryConfig}, a new
instance is
+ * created with the resolved values. Otherwise, the same instance is
returned.</p>
+ */
+public class ScanEntriesThresholdStrategy implements QueryKillingStrategy {
+ private static final String STRATEGY_NAME = "ScanEntriesThresholdStrategy";
+
+ private final long _maxEntriesScannedInFilter;
+ private final long _maxDocsScanned;
+
+ public ScanEntriesThresholdStrategy(long maxEntriesScannedInFilter, long
maxDocsScanned) {
+ _maxEntriesScannedInFilter = maxEntriesScannedInFilter;
+ _maxDocsScanned = maxDocsScanned;
+ }
+
+ @Override
+ public boolean shouldTerminate(QueryScanCostContext ctx) {
+ return (_maxEntriesScannedInFilter < Long.MAX_VALUE
+ && ctx.getNumEntriesScannedInFilter() > _maxEntriesScannedInFilter)
+ || (_maxDocsScanned < Long.MAX_VALUE
+ && ctx.getNumDocsScanned() > _maxDocsScanned);
+ }
+
+ @Override
+ public QueryKillReport buildKillReport(QueryScanCostContext ctx,
+ String queryId, String tableName, String configSource) {
+ String triggeringMetric;
+ long actualValue;
+ long thresholdValue;
+ if (_maxEntriesScannedInFilter < Long.MAX_VALUE
+ && ctx.getNumEntriesScannedInFilter() > _maxEntriesScannedInFilter) {
+ triggeringMetric = "numEntriesScannedInFilter";
+ actualValue = ctx.getNumEntriesScannedInFilter();
+ thresholdValue = _maxEntriesScannedInFilter;
+ } else {
+ triggeringMetric = "numDocsScanned";
+ actualValue = ctx.getNumDocsScanned();
+ thresholdValue = _maxDocsScanned;
+ }
+ return new QueryKillReport(queryId, tableName, STRATEGY_NAME,
+ triggeringMetric, actualValue, thresholdValue, configSource, ctx);
+ }
+
+ @Override
+ public int priority() {
+ return 10;
+ }
+
+ /**
+ * Returns a query-specific variant with table-level threshold overrides
applied.
+ * If the table's {@link QueryConfig} has non-null threshold fields, they
take precedence
+ * over this strategy's thresholds. Otherwise, returns {@code this} (no
allocation).
+ */
+ @Override
+ public QueryKillingStrategy forQuery(@Nullable QueryConfig queryConfig,
+ QueryMonitorConfig clusterConfig) {
+ if (queryConfig == null) {
+ return this;
+ }
+ Long tableEntries = queryConfig.getMaxEntriesScannedInFilter();
+ Long tableDocs = queryConfig.getMaxDocsScanned();
+ if (tableEntries == null && tableDocs == null) {
+ return this;
+ }
+ return new ScanEntriesThresholdStrategy(
+ tableEntries != null ? tableEntries : _maxEntriesScannedInFilter,
+ tableDocs != null ? tableDocs : _maxDocsScanned);
+ }
+
+ public long getMaxEntriesScannedInFilter() {
+ return _maxEntriesScannedInFilter;
+ }
+
+ public long getMaxDocsScanned() {
+ return _maxDocsScanned;
+ }
+
+ /**
+ * Factory that creates a {@link ScanEntriesThresholdStrategy} from
+ * {@link QueryMonitorConfig}. This is the default factory used when no
custom
+ * strategy factory is configured.
+ *
+ * <p>Returns {@code null} if no scan thresholds are configured (all are
+ * {@link Long#MAX_VALUE}), which causes the manager to log a warning that
+ * scan-based killing is enabled but effectively unconfigured.</p>
+ */
+ public static class Factory implements QueryKillingStrategyFactory {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(Factory.class);
+
+ @Override
+ @Nullable
+ public QueryKillingStrategy create(QueryMonitorConfig config) {
+ long maxEntries = config.getScanBasedKillingMaxEntriesScannedInFilter();
+ long maxDocs = config.getScanBasedKillingMaxDocsScanned();
+
+ if (maxEntries == Long.MAX_VALUE && maxDocs == Long.MAX_VALUE) {
+ LOGGER.warn("Scan-based killing is enabled but no thresholds are
configured. "
+ + "Set at least one of:
accounting.scan.based.killing.max.entries.scanned.in.filter, "
+ + "accounting.scan.based.killing.max.docs.scanned. "
+ + "Scan-based killing will be effectively disabled until
thresholds are set.");
+ return null;
+ }
+
+ LOGGER.info("Initialized ScanEntriesThresholdStrategy with
maxEntriesScannedInFilter={}, maxDocsScanned={}",
+ maxEntries == Long.MAX_VALUE ? "disabled" : maxEntries,
+ maxDocs == Long.MAX_VALUE ? "disabled" : maxDocs);
+ return new ScanEntriesThresholdStrategy(maxEntries, maxDocs);
+ }
+
+ @Override
+ public String getName() {
+ return "ScanEntriesThresholdStrategyFactory";
+ }
+ }
+}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/accounting/QueryMonitorConfigScanKillingTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/accounting/QueryMonitorConfigScanKillingTest.java
new file mode 100644
index 00000000000..4bf4b5ba0c5
--- /dev/null
+++
b/pinot-core/src/test/java/org/apache/pinot/core/accounting/QueryMonitorConfigScanKillingTest.java
@@ -0,0 +1,191 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.accounting;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.utils.CommonConstants.Accounting;
+import org.apache.pinot.spi.utils.CommonConstants.Accounting.ScanKillingMode;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+
+
+public class QueryMonitorConfigScanKillingTest {
+
+ private static final long MAX_HEAP = 1_000_000_000L;
+
+ @Test
+ public void testDefaultModeIsDisabled() {
+ PinotConfiguration config = new PinotConfiguration();
+ QueryMonitorConfig qmc = new QueryMonitorConfig(config, MAX_HEAP);
+ assertEquals(qmc.getScanBasedKillingMode(), ScanKillingMode.DISABLED);
+ assertFalse(qmc.isScanBasedKillingEnabled());
+ assertFalse(qmc.isScanBasedKillingLogOnly());
+ }
+
+ @Test
+ public void testEnforceMode() {
+ PinotConfiguration config = new PinotConfiguration();
+ config.setProperty(Accounting.CONFIG_OF_SCAN_BASED_KILLING_MODE,
"enforce");
+
config.setProperty(Accounting.CONFIG_OF_SCAN_BASED_KILLING_MAX_ENTRIES_SCANNED_IN_FILTER,
"500000000");
+
config.setProperty(Accounting.CONFIG_OF_SCAN_BASED_KILLING_MAX_DOCS_SCANNED,
"50000000");
+
+ QueryMonitorConfig qmc = new QueryMonitorConfig(config, MAX_HEAP);
+ assertEquals(qmc.getScanBasedKillingMode(), ScanKillingMode.ENFORCE);
+ assertTrue(qmc.isScanBasedKillingEnabled());
+ assertFalse(qmc.isScanBasedKillingLogOnly());
+ assertEquals(qmc.getScanBasedKillingMaxEntriesScannedInFilter(),
500_000_000L);
+ assertEquals(qmc.getScanBasedKillingMaxDocsScanned(), 50_000_000L);
+ }
+
+ @Test
+ public void testLogOnlyMode() {
+ PinotConfiguration config = new PinotConfiguration();
+ config.setProperty(Accounting.CONFIG_OF_SCAN_BASED_KILLING_MODE,
"logOnly");
+
config.setProperty(Accounting.CONFIG_OF_SCAN_BASED_KILLING_MAX_ENTRIES_SCANNED_IN_FILTER,
"500000000");
+
+ QueryMonitorConfig qmc = new QueryMonitorConfig(config, MAX_HEAP);
+ assertEquals(qmc.getScanBasedKillingMode(), ScanKillingMode.LOG_ONLY);
+ assertTrue(qmc.isScanBasedKillingEnabled());
+ assertTrue(qmc.isScanBasedKillingLogOnly());
+ }
+
+ @Test
+ public void testDynamicConfigUpdateToEnforce() {
+ PinotConfiguration config = new PinotConfiguration();
+ QueryMonitorConfig oldConfig = new QueryMonitorConfig(config, MAX_HEAP);
+ assertFalse(oldConfig.isScanBasedKillingEnabled());
+
+ Set<String> changedConfigs = new HashSet<>();
+ changedConfigs.add(Accounting.CONFIG_OF_SCAN_BASED_KILLING_MODE);
+
changedConfigs.add(Accounting.CONFIG_OF_SCAN_BASED_KILLING_MAX_ENTRIES_SCANNED_IN_FILTER);
+
+ Map<String, String> clusterConfigs = new HashMap<>();
+ clusterConfigs.put(Accounting.CONFIG_OF_SCAN_BASED_KILLING_MODE,
"enforce");
+
clusterConfigs.put(Accounting.CONFIG_OF_SCAN_BASED_KILLING_MAX_ENTRIES_SCANNED_IN_FILTER,
"200000000");
+
+ QueryMonitorConfig newConfig = new QueryMonitorConfig(oldConfig,
changedConfigs, clusterConfigs);
+ assertTrue(newConfig.isScanBasedKillingEnabled());
+ assertFalse(newConfig.isScanBasedKillingLogOnly());
+ assertEquals(newConfig.getScanBasedKillingMaxEntriesScannedInFilter(),
200_000_000L);
+ assertEquals(newConfig.getScanBasedKillingMaxDocsScanned(),
Long.MAX_VALUE);
+ }
+
+ @Test
+ public void testDynamicConfigUpdateToLogOnly() {
+ // Start in enforce mode
+ PinotConfiguration config = new PinotConfiguration();
+ config.setProperty(Accounting.CONFIG_OF_SCAN_BASED_KILLING_MODE,
"enforce");
+
config.setProperty(Accounting.CONFIG_OF_SCAN_BASED_KILLING_MAX_ENTRIES_SCANNED_IN_FILTER,
"100000000");
+ QueryMonitorConfig oldConfig = new QueryMonitorConfig(config, MAX_HEAP);
+ assertTrue(oldConfig.isScanBasedKillingEnabled());
+ assertFalse(oldConfig.isScanBasedKillingLogOnly());
+
+ // Switch to logOnly
+ Set<String> changedConfigs = new HashSet<>();
+ changedConfigs.add(Accounting.CONFIG_OF_SCAN_BASED_KILLING_MODE);
+
+ Map<String, String> clusterConfigs = new HashMap<>();
+ clusterConfigs.put(Accounting.CONFIG_OF_SCAN_BASED_KILLING_MODE,
"logOnly");
+
+ QueryMonitorConfig newConfig = new QueryMonitorConfig(oldConfig,
changedConfigs, clusterConfigs);
+ assertTrue(newConfig.isScanBasedKillingEnabled());
+ assertTrue(newConfig.isScanBasedKillingLogOnly());
+ // Threshold should carry over from old config
+ assertEquals(newConfig.getScanBasedKillingMaxEntriesScannedInFilter(),
100_000_000L);
+ }
+
+ @Test
+ public void testInvalidModeFallsBackToDisabled() {
+ PinotConfiguration config = new PinotConfiguration();
+ config.setProperty(Accounting.CONFIG_OF_SCAN_BASED_KILLING_MODE,
"invalidValue");
+
config.setProperty(Accounting.CONFIG_OF_SCAN_BASED_KILLING_MAX_ENTRIES_SCANNED_IN_FILTER,
"100000000");
+
+ QueryMonitorConfig qmc = new QueryMonitorConfig(config, MAX_HEAP);
+ assertEquals(qmc.getScanBasedKillingMode(), ScanKillingMode.DISABLED);
+ assertFalse(qmc.isScanBasedKillingEnabled());
+ assertFalse(qmc.isScanBasedKillingLogOnly());
+ }
+
+ @Test
+ public void testInvalidModeInDynamicUpdateFallsBackToDisabled() {
+ // Start in enforce mode
+ PinotConfiguration config = new PinotConfiguration();
+ config.setProperty(Accounting.CONFIG_OF_SCAN_BASED_KILLING_MODE,
"enforce");
+
config.setProperty(Accounting.CONFIG_OF_SCAN_BASED_KILLING_MAX_ENTRIES_SCANNED_IN_FILTER,
"100000000");
+ QueryMonitorConfig oldConfig = new QueryMonitorConfig(config, MAX_HEAP);
+ assertTrue(oldConfig.isScanBasedKillingEnabled());
+
+ // Dynamic update with invalid mode
+ Set<String> changedConfigs = new HashSet<>();
+ changedConfigs.add(Accounting.CONFIG_OF_SCAN_BASED_KILLING_MODE);
+
+ Map<String, String> clusterConfigs = new HashMap<>();
+ clusterConfigs.put(Accounting.CONFIG_OF_SCAN_BASED_KILLING_MODE,
"typoEnforce");
+
+ QueryMonitorConfig newConfig = new QueryMonitorConfig(oldConfig,
changedConfigs, clusterConfigs);
+ assertEquals(newConfig.getScanBasedKillingMode(),
ScanKillingMode.DISABLED);
+ assertFalse(newConfig.isScanBasedKillingEnabled());
+ }
+
+ @Test
+ public void testCaseInsensitiveMode() {
+ // "Enforce" (capital E) should parse correctly — mode values are
case-insensitive
+ PinotConfiguration config = new PinotConfiguration();
+ config.setProperty(Accounting.CONFIG_OF_SCAN_BASED_KILLING_MODE,
"Enforce");
+
+ QueryMonitorConfig qmc = new QueryMonitorConfig(config, MAX_HEAP);
+ assertEquals(qmc.getScanBasedKillingMode(), ScanKillingMode.ENFORCE);
+ assertTrue(qmc.isScanBasedKillingEnabled());
+ }
+
+ @Test
+ public void testInvalidThresholdValuesFallBackToDefaults() {
+ // Start with valid config
+ PinotConfiguration config = new PinotConfiguration();
+ config.setProperty(Accounting.CONFIG_OF_SCAN_BASED_KILLING_MODE,
"enforce");
+
config.setProperty(Accounting.CONFIG_OF_SCAN_BASED_KILLING_MAX_ENTRIES_SCANNED_IN_FILTER,
"100000000");
+
config.setProperty(Accounting.CONFIG_OF_SCAN_BASED_KILLING_MAX_DOCS_SCANNED,
"50000000");
+ QueryMonitorConfig oldConfig = new QueryMonitorConfig(config, MAX_HEAP);
+
+ // Dynamic update with invalid values (empty string, non-numeric)
+ Set<String> changedConfigs = new HashSet<>();
+
changedConfigs.add(Accounting.CONFIG_OF_SCAN_BASED_KILLING_MAX_ENTRIES_SCANNED_IN_FILTER);
+
changedConfigs.add(Accounting.CONFIG_OF_SCAN_BASED_KILLING_MAX_DOCS_SCANNED);
+
changedConfigs.add(Accounting.CONFIG_OF_SCAN_BASED_KILLING_MAX_ENTRIES_SCANNED_POST_FILTER);
+
+ Map<String, String> clusterConfigs = new HashMap<>();
+
clusterConfigs.put(Accounting.CONFIG_OF_SCAN_BASED_KILLING_MAX_ENTRIES_SCANNED_IN_FILTER,
"");
+
clusterConfigs.put(Accounting.CONFIG_OF_SCAN_BASED_KILLING_MAX_DOCS_SCANNED,
"notANumber");
+
clusterConfigs.put(Accounting.CONFIG_OF_SCAN_BASED_KILLING_MAX_ENTRIES_SCANNED_POST_FILTER,
" ");
+
+ QueryMonitorConfig newConfig = new QueryMonitorConfig(oldConfig,
changedConfigs, clusterConfigs);
+
+ // All should fall back to defaults instead of throwing
+ assertEquals(newConfig.getScanBasedKillingMaxEntriesScannedInFilter(),
Long.MAX_VALUE);
+ assertEquals(newConfig.getScanBasedKillingMaxDocsScanned(),
Long.MAX_VALUE);
+ assertEquals(newConfig.getScanBasedKillingMaxEntriesScannedPostFilter(),
Long.MAX_VALUE);
+ }
+}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/query/killing/CompositeQueryKillingStrategyTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/query/killing/CompositeQueryKillingStrategyTest.java
new file mode 100644
index 00000000000..ea48ff49859
--- /dev/null
+++
b/pinot-core/src/test/java/org/apache/pinot/core/query/killing/CompositeQueryKillingStrategyTest.java
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.killing;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import
org.apache.pinot.core.query.killing.strategy.ScanEntriesThresholdStrategy;
+import org.apache.pinot.spi.query.QueryScanCostContext;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+
+
+public class CompositeQueryKillingStrategyTest {
+
+ @Test
+ public void testAnyModeKillsWhenOneTriggered() {
+ List<QueryKillingStrategy> strategies = Arrays.asList(
+ new ScanEntriesThresholdStrategy(100L, Long.MAX_VALUE),
+ new ScanEntriesThresholdStrategy(Long.MAX_VALUE, 200L));
+ CompositeQueryKillingStrategy composite =
+ new CompositeQueryKillingStrategy(strategies,
CompositeQueryKillingStrategy.Mode.ANY);
+
+ QueryScanCostContext ctx = new QueryScanCostContext();
+ ctx.addEntriesScannedInFilter(101);
+ ctx.addDocsScanned(50);
+ assertTrue(composite.shouldTerminate(ctx));
+ }
+
+ @Test
+ public void testAnyModeNoKillWhenNoneTriggered() {
+ List<QueryKillingStrategy> strategies = Arrays.asList(
+ new ScanEntriesThresholdStrategy(100L, Long.MAX_VALUE),
+ new ScanEntriesThresholdStrategy(Long.MAX_VALUE, 200L));
+ CompositeQueryKillingStrategy composite =
+ new CompositeQueryKillingStrategy(strategies,
CompositeQueryKillingStrategy.Mode.ANY);
+
+ QueryScanCostContext ctx = new QueryScanCostContext();
+ ctx.addEntriesScannedInFilter(50);
+ ctx.addDocsScanned(100);
+ assertFalse(composite.shouldTerminate(ctx));
+ }
+
+ @Test
+ public void testAllModeNoKillWhenOnlyOneTriggered() {
+ List<QueryKillingStrategy> strategies = Arrays.asList(
+ new ScanEntriesThresholdStrategy(100L, Long.MAX_VALUE),
+ new ScanEntriesThresholdStrategy(Long.MAX_VALUE, 200L));
+ CompositeQueryKillingStrategy composite =
+ new CompositeQueryKillingStrategy(strategies,
CompositeQueryKillingStrategy.Mode.ALL);
+
+ QueryScanCostContext ctx = new QueryScanCostContext();
+ ctx.addEntriesScannedInFilter(101);
+ ctx.addDocsScanned(50);
+ assertFalse(composite.shouldTerminate(ctx));
+ }
+
+ @Test
+ public void testAllModeKillsWhenAllTriggered() {
+ List<QueryKillingStrategy> strategies = Arrays.asList(
+ new ScanEntriesThresholdStrategy(100L, Long.MAX_VALUE),
+ new ScanEntriesThresholdStrategy(Long.MAX_VALUE, 200L));
+ CompositeQueryKillingStrategy composite =
+ new CompositeQueryKillingStrategy(strategies,
CompositeQueryKillingStrategy.Mode.ALL);
+
+ QueryScanCostContext ctx = new QueryScanCostContext();
+ ctx.addEntriesScannedInFilter(101);
+ ctx.addDocsScanned(201);
+ assertTrue(composite.shouldTerminate(ctx));
+ }
+
+ @Test
+ public void testEmptyStrategiesNeverKills() {
+ CompositeQueryKillingStrategy composite =
+ new CompositeQueryKillingStrategy(Collections.emptyList(),
CompositeQueryKillingStrategy.Mode.ANY);
+ QueryScanCostContext ctx = new QueryScanCostContext();
+ ctx.addEntriesScannedInFilter(Long.MAX_VALUE - 1);
+ assertFalse(composite.shouldTerminate(ctx));
+ }
+
+ @Test
+ public void testEmptyStrategiesAllModeNeverKills() {
+ CompositeQueryKillingStrategy composite =
+ new CompositeQueryKillingStrategy(Collections.emptyList(),
CompositeQueryKillingStrategy.Mode.ALL);
+ QueryScanCostContext ctx = new QueryScanCostContext();
+ assertFalse(composite.shouldTerminate(ctx));
+ }
+
+ @Test
+ public void testBuildKillReportDelegatesToTriggeringStrategy() {
+ List<QueryKillingStrategy> strategies = Arrays.asList(
+ new ScanEntriesThresholdStrategy(1000L, Long.MAX_VALUE),
+ new ScanEntriesThresholdStrategy(Long.MAX_VALUE, 500L));
+ CompositeQueryKillingStrategy composite =
+ new CompositeQueryKillingStrategy(strategies,
CompositeQueryKillingStrategy.Mode.ANY);
+
+ QueryScanCostContext ctx = new QueryScanCostContext();
+ ctx.addDocsScanned(501);
+ assertTrue(composite.shouldTerminate(ctx));
+
+ QueryKillReport report = composite.buildKillReport(ctx, "q1", "t1",
"cluster");
+ assertEquals(report.getTriggeringMetric(), "numDocsScanned");
+ assertEquals(report.getActualValue(), 501L);
+ }
+
+ @Test
+ public void testStrategiesSortedByPriority() {
+ // ScanEntriesThresholdStrategy has priority 10
+ // Both trigger, but the first in priority order should produce the report
+ ScanEntriesThresholdStrategy entriesStrategy = new
ScanEntriesThresholdStrategy(100L, Long.MAX_VALUE);
+ ScanEntriesThresholdStrategy docsStrategy = new
ScanEntriesThresholdStrategy(Long.MAX_VALUE, 50L);
+
+ CompositeQueryKillingStrategy composite =
+ new CompositeQueryKillingStrategy(Arrays.asList(docsStrategy,
entriesStrategy),
+ CompositeQueryKillingStrategy.Mode.ANY);
+
+ QueryScanCostContext ctx = new QueryScanCostContext();
+ ctx.addEntriesScannedInFilter(101);
+ ctx.addDocsScanned(51);
+
+ // Both trigger, but result should come from the first that matches in
sorted order
+ assertTrue(composite.shouldTerminate(ctx));
+ }
+}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/query/killing/QueryKillReportTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/query/killing/QueryKillReportTest.java
new file mode 100644
index 00000000000..054197aeaed
--- /dev/null
+++
b/pinot-core/src/test/java/org/apache/pinot/core/query/killing/QueryKillReportTest.java
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.killing;
+
+import org.apache.pinot.spi.query.QueryScanCostContext;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+
+/**
+ * Unit tests for {@link QueryKillReport}.
+ */
+public class QueryKillReportTest {
+
+ @Test
+ public void testSnapshotsValuesAtCreationTime() {
+ QueryScanCostContext context = new QueryScanCostContext();
+ context.addEntriesScannedInFilter(1000L);
+ context.addDocsScanned(500L);
+ context.addEntriesScannedPostFilter(200L);
+
+ QueryKillReport report = new QueryKillReport(
+ "queryId-1",
+ "myTable_OFFLINE",
+ "EntriesScannedInFilterStrategy",
+ "numEntriesScannedInFilter",
+ 1000L,
+ 800L,
+ "TABLE_CONFIG",
+ context
+ );
+
+ // Mutate context after report creation — report must be unaffected
+ context.addEntriesScannedInFilter(99999L);
+ context.addDocsScanned(99999L);
+ context.addEntriesScannedPostFilter(99999L);
+
+ assertEquals(report.getSnapshotEntriesScannedInFilter(), 1000L);
+ assertEquals(report.getSnapshotDocsScanned(), 500L);
+ assertEquals(report.getSnapshotEntriesScannedPostFilter(), 200L);
+ }
+
+ @Test
+ public void testCustomerMessageContainsAllFields() {
+ QueryScanCostContext context = new QueryScanCostContext();
+ context.addEntriesScannedInFilter(1234567L);
+ context.addDocsScanned(100L);
+ context.addEntriesScannedPostFilter(50L);
+
+ QueryKillReport report = new QueryKillReport(
+ "queryId-abc",
+ "salesTable_REALTIME",
+ "EntriesScannedInFilterStrategy",
+ "numEntriesScannedInFilter",
+ 1234567L,
+ 1000000L,
+ "CLUSTER_CONFIG",
+ context
+ );
+
+ String msg = report.toCustomerMessage();
+
+ assertTrue(msg.contains("salesTable_REALTIME"), "Should contain table
name");
+ assertTrue(msg.contains("numEntriesScannedInFilter"), "Should contain
metric name");
+ assertTrue(msg.contains("1,234,567"), "Should contain actual value with
commas");
+ assertTrue(msg.contains("1,000,000"), "Should contain threshold with
commas");
+ assertTrue(msg.contains("CLUSTER_CONFIG"), "Should contain config source");
+ assertTrue(msg.contains("missing index") || msg.contains("index"),
+ "Should include advice about missing indexes");
+ }
+
+ @Test
+ public void testInternalLogMessageIsStructured() {
+ QueryScanCostContext context = new QueryScanCostContext();
+ context.addEntriesScannedInFilter(500L);
+ context.addDocsScanned(300L);
+ context.addEntriesScannedPostFilter(150L);
+
+ QueryKillReport report = new QueryKillReport(
+ "queryId-xyz",
+ "ordersTable_OFFLINE",
+ "DocsScannedStrategy",
+ "numDocsScanned",
+ 500L,
+ 400L,
+ "TABLE_CONFIG",
+ context
+ );
+
+ String msg = report.toInternalLogMessage();
+
+ assertTrue(msg.startsWith("QUERY_KILLED"), "Should start with QUERY_KILLED
prefix");
+ assertTrue(msg.contains("queryId=queryId-xyz"), "Should have queryId
key=value");
+ assertTrue(msg.contains("table=ordersTable_OFFLINE"), "Should have table
key=value");
+ assertTrue(msg.contains("metric=numDocsScanned"), "Should have metric
key=value");
+ // Internal log should use plain numbers, not comma-formatted
+ assertTrue(msg.contains("actual=500"), "Should have actual key=value
without commas");
+ assertTrue(msg.contains("threshold=400"), "Should have threshold key=value
without commas");
+ }
+
+ @Test
+ public void testGetters() {
+ QueryScanCostContext context = new QueryScanCostContext();
+ context.addEntriesScannedInFilter(100L);
+ context.addDocsScanned(50L);
+ context.addEntriesScannedPostFilter(25L);
+
+ QueryKillReport report = new QueryKillReport(
+ "queryId-getters",
+ "testTable_OFFLINE",
+ "TestStrategy",
+ "numEntriesScannedInFilter",
+ 100L,
+ 90L,
+ "SERVER_CONFIG",
+ context
+ );
+
+ assertEquals(report.getQueryId(), "queryId-getters");
+ assertEquals(report.getTableName(), "testTable_OFFLINE");
+ assertEquals(report.getStrategyName(), "TestStrategy");
+ assertEquals(report.getTriggeringMetric(), "numEntriesScannedInFilter");
+ assertEquals(report.getActualValue(), 100L);
+ assertEquals(report.getThresholdValue(), 90L);
+ assertEquals(report.getConfigSource(), "SERVER_CONFIG");
+ assertEquals(report.getSnapshotEntriesScannedInFilter(), 100L);
+ assertEquals(report.getSnapshotDocsScanned(), 50L);
+ assertEquals(report.getSnapshotEntriesScannedPostFilter(), 25L);
+ assertTrue(report.getElapsedTimeMs() >= 0L, "Elapsed time must be
non-negative");
+ }
+}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/query/killing/QueryKillingManagerTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/query/killing/QueryKillingManagerTest.java
new file mode 100644
index 00000000000..5bde697b5a3
--- /dev/null
+++
b/pinot-core/src/test/java/org/apache/pinot/core/query/killing/QueryKillingManagerTest.java
@@ -0,0 +1,331 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.killing;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.core.accounting.QueryMonitorConfig;
+import
org.apache.pinot.core.query.killing.strategy.ScanEntriesThresholdStrategy;
+import org.apache.pinot.spi.config.table.QueryConfig;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.exception.QueryErrorCode;
+import org.apache.pinot.spi.query.QueryExecutionContext;
+import org.apache.pinot.spi.query.QueryScanCostContext;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.mock;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
+
+
+/**
+ * Unit tests for {@link QueryKillingManager}.
+ */
+public class QueryKillingManagerTest {
+
+ private ServerMetrics _serverMetrics;
+
+ @BeforeMethod
+ public void setUp() {
+ _serverMetrics = mock(ServerMetrics.class);
+ }
+
+ private QueryMonitorConfig buildConfig(String mode, long maxEntriesInFilter,
+ long maxDocsScanned) {
+ Map<String, Object> props = new HashMap<>();
+ props.put(CommonConstants.Accounting.CONFIG_OF_SCAN_BASED_KILLING_MODE,
mode);
+
props.put(CommonConstants.Accounting.CONFIG_OF_SCAN_BASED_KILLING_MAX_ENTRIES_SCANNED_IN_FILTER,
+ maxEntriesInFilter);
+
props.put(CommonConstants.Accounting.CONFIG_OF_SCAN_BASED_KILLING_MAX_DOCS_SCANNED,
maxDocsScanned);
+
props.put(CommonConstants.Accounting.CONFIG_OF_SCAN_BASED_KILLING_MAX_ENTRIES_SCANNED_POST_FILTER,
Long.MAX_VALUE);
+ PinotConfiguration pinotConfig = new PinotConfiguration(props);
+ return new QueryMonitorConfig(pinotConfig,
Runtime.getRuntime().maxMemory());
+ }
+
+ // --- Strategy built from config (init-time validation) ---
+
+ @Test
+ public void testInitBuildsStrategyFromConfig() {
+ QueryMonitorConfig config = buildConfig("enforce", 100L, Long.MAX_VALUE);
+ AtomicReference<QueryMonitorConfig> configRef = new
AtomicReference<>(config);
+ QueryKillingManager manager = new QueryKillingManager(configRef,
_serverMetrics);
+ manager.rebuildStrategy();
+
+ assertNotNull(manager.getActiveStrategy(), "Strategy should be built when
thresholds are configured");
+ assertTrue(manager.getActiveStrategy() instanceof
ScanEntriesThresholdStrategy);
+ }
+
+ @Test
+ public void testInitWithNoThresholdsLogsWarningAndReturnsNullStrategy() {
+ // All thresholds are MAX_VALUE — factory should return null
+ QueryMonitorConfig config = buildConfig("enforce", Long.MAX_VALUE,
Long.MAX_VALUE);
+ AtomicReference<QueryMonitorConfig> configRef = new
AtomicReference<>(config);
+ QueryKillingManager manager = new QueryKillingManager(configRef,
_serverMetrics);
+ manager.rebuildStrategy();
+
+ assertNull(manager.getActiveStrategy(),
+ "Strategy should be null when no thresholds are configured");
+ }
+
+ @Test
+ public void testInitWithDisabledReturnsNullStrategy() {
+ QueryMonitorConfig config = buildConfig("disabled", 100L, 100L);
+ AtomicReference<QueryMonitorConfig> configRef = new
AtomicReference<>(config);
+ QueryKillingManager manager = new QueryKillingManager(configRef,
_serverMetrics);
+ manager.rebuildStrategy();
+
+ assertNull(manager.getActiveStrategy(),
+ "Strategy should be null when killing is disabled");
+ }
+
+ // --- Default strategy (ScanEntriesThresholdStrategy from config) ---
+
+ @Test
+ public void testDisabledDoesNotKill() {
+ QueryMonitorConfig config = buildConfig("disabled", 100L, 100L);
+ AtomicReference<QueryMonitorConfig> configRef = new
AtomicReference<>(config);
+ QueryKillingManager manager = new QueryKillingManager(configRef,
_serverMetrics);
+ manager.rebuildStrategy();
+
+ QueryExecutionContext execCtx = QueryExecutionContext.forSseTest();
+ QueryScanCostContext scanCtx = new QueryScanCostContext();
+ scanCtx.addEntriesScannedInFilter(500L);
+
+ manager.checkAndKillIfNeeded(execCtx, scanCtx, "q1", "testTable_OFFLINE",
null);
+ assertNull(execCtx.getTerminateException());
+ }
+
+ @Test
+ public void testEnabledKillsWhenThresholdExceeded() {
+ QueryMonitorConfig config = buildConfig("enforce", 100L, Long.MAX_VALUE);
+ AtomicReference<QueryMonitorConfig> configRef = new
AtomicReference<>(config);
+ QueryKillingManager manager = new QueryKillingManager(configRef,
_serverMetrics);
+ manager.rebuildStrategy();
+
+ QueryExecutionContext execCtx = QueryExecutionContext.forSseTest();
+ QueryScanCostContext scanCtx = new QueryScanCostContext();
+ scanCtx.addEntriesScannedInFilter(200L);
+
+ manager.checkAndKillIfNeeded(execCtx, scanCtx, "q2", "testTable_OFFLINE",
null);
+ assertNotNull(execCtx.getTerminateException());
+ assertEquals(execCtx.getTerminateException().getErrorCode(),
QueryErrorCode.QUERY_SCAN_LIMIT_EXCEEDED);
+ }
+
+ @Test
+ public void testLogOnlyDoesNotKill() {
+ QueryMonitorConfig config = buildConfig("logOnly", 100L, Long.MAX_VALUE);
+ AtomicReference<QueryMonitorConfig> configRef = new
AtomicReference<>(config);
+ QueryKillingManager manager = new QueryKillingManager(configRef,
_serverMetrics);
+ manager.rebuildStrategy();
+
+ QueryExecutionContext execCtx = QueryExecutionContext.forSseTest();
+ QueryScanCostContext scanCtx = new QueryScanCostContext();
+ scanCtx.addEntriesScannedInFilter(200L);
+
+ manager.checkAndKillIfNeeded(execCtx, scanCtx, "q3", "testTable_OFFLINE",
null);
+ assertNull(execCtx.getTerminateException());
+ }
+
+ @Test
+ public void testBelowThresholdDoesNotKill() {
+ QueryMonitorConfig config = buildConfig("enforce", 1000L, Long.MAX_VALUE);
+ AtomicReference<QueryMonitorConfig> configRef = new
AtomicReference<>(config);
+ QueryKillingManager manager = new QueryKillingManager(configRef,
_serverMetrics);
+ manager.rebuildStrategy();
+
+ QueryExecutionContext execCtx = QueryExecutionContext.forSseTest();
+ QueryScanCostContext scanCtx = new QueryScanCostContext();
+ scanCtx.addEntriesScannedInFilter(500L);
+
+ manager.checkAndKillIfNeeded(execCtx, scanCtx, "q4", "testTable_OFFLINE",
null);
+ assertNull(execCtx.getTerminateException());
+ }
+
+ @Test
+ public void testAlreadyTerminatedSkipsEvaluation() {
+ QueryMonitorConfig config = buildConfig("enforce", 100L, Long.MAX_VALUE);
+ AtomicReference<QueryMonitorConfig> configRef = new
AtomicReference<>(config);
+ QueryKillingManager manager = new QueryKillingManager(configRef,
_serverMetrics);
+ manager.rebuildStrategy();
+
+ QueryExecutionContext execCtx = QueryExecutionContext.forSseTest();
+ execCtx.terminate(QueryErrorCode.QUERY_CANCELLATION, "cancelled");
+
+ QueryScanCostContext scanCtx = new QueryScanCostContext();
+ scanCtx.addEntriesScannedInFilter(200L);
+
+ manager.checkAndKillIfNeeded(execCtx, scanCtx, "q5", "testTable_OFFLINE",
null);
+ assertNotNull(execCtx.getTerminateException());
+ }
+
+ @Test
+ public void testDocsScannedThreshold() {
+ QueryMonitorConfig config = buildConfig("enforce", Long.MAX_VALUE, 100L);
+ AtomicReference<QueryMonitorConfig> configRef = new
AtomicReference<>(config);
+ QueryKillingManager manager = new QueryKillingManager(configRef,
_serverMetrics);
+ manager.rebuildStrategy();
+
+ QueryExecutionContext execCtx = QueryExecutionContext.forSseTest();
+ QueryScanCostContext scanCtx = new QueryScanCostContext();
+ scanCtx.addDocsScanned(200L);
+
+ manager.checkAndKillIfNeeded(execCtx, scanCtx, "q8", "testTable_OFFLINE",
null);
+ assertNotNull(execCtx.getTerminateException());
+ }
+
+ // --- Table overrides via forQuery() ---
+
+ @Test
+ public void testTableOverrideRaisesThreshold() {
+ QueryMonitorConfig config = buildConfig("enforce", 100L, Long.MAX_VALUE);
+ AtomicReference<QueryMonitorConfig> configRef = new
AtomicReference<>(config);
+ QueryKillingManager manager = new QueryKillingManager(configRef,
_serverMetrics);
+ manager.rebuildStrategy();
+
+ QueryExecutionContext execCtx = QueryExecutionContext.forSseTest();
+ QueryScanCostContext scanCtx = new QueryScanCostContext();
+ scanCtx.addEntriesScannedInFilter(200L); // Above cluster (100), below
table (500)
+
+ QueryConfig queryConfig = new QueryConfig(null, null, null, null, null,
null, 500L, null, null);
+ manager.checkAndKillIfNeeded(execCtx, scanCtx, "q6", "testTable_OFFLINE",
queryConfig);
+ assertNull(execCtx.getTerminateException(),
+ "Table override should raise threshold, preventing kill");
+ }
+
+ @Test
+ public void testTableOverrideLowersThreshold() {
+ QueryMonitorConfig config = buildConfig("enforce", 1000L, Long.MAX_VALUE);
+ AtomicReference<QueryMonitorConfig> configRef = new
AtomicReference<>(config);
+ QueryKillingManager manager = new QueryKillingManager(configRef,
_serverMetrics);
+ manager.rebuildStrategy();
+
+ QueryExecutionContext execCtx = QueryExecutionContext.forSseTest();
+ QueryScanCostContext scanCtx = new QueryScanCostContext();
+ scanCtx.addEntriesScannedInFilter(100L); // Below cluster (1000), above
table (50)
+
+ QueryConfig queryConfig = new QueryConfig(null, null, null, null, null,
null, 50L, null, null);
+ manager.checkAndKillIfNeeded(execCtx, scanCtx, "q7", "testTable_OFFLINE",
queryConfig);
+ assertNotNull(execCtx.getTerminateException(),
+ "Table override should lower threshold, causing kill");
+ }
+
+ // --- Custom strategy factory pluggability ---
+
+ @Test
+ public void testCustomFactoryClassFromConfig() {
+ // Configure a custom factory class name
+ Map<String, Object> props = new HashMap<>();
+ props.put(CommonConstants.Accounting.CONFIG_OF_SCAN_BASED_KILLING_MODE,
+ "enforce");
+
props.put(CommonConstants.Accounting.CONFIG_OF_SCAN_BASED_KILLING_STRATEGY_FACTORY_CLASS_NAME,
+ AlwaysKillStrategyFactory.class.getName());
+ PinotConfiguration pinotConfig = new PinotConfiguration(props);
+ QueryMonitorConfig config = new QueryMonitorConfig(pinotConfig,
Runtime.getRuntime().maxMemory());
+
+ AtomicReference<QueryMonitorConfig> configRef = new
AtomicReference<>(config);
+ QueryKillingManager manager = new QueryKillingManager(configRef,
_serverMetrics);
+ manager.rebuildStrategy();
+
+ assertNotNull(manager.getActiveStrategy(), "Custom factory should create a
strategy");
+
+ // Should kill even with zero scan entries (AlwaysKillStrategy always
kills)
+ QueryExecutionContext execCtx = QueryExecutionContext.forSseTest();
+ QueryScanCostContext scanCtx = new QueryScanCostContext();
+ manager.checkAndKillIfNeeded(execCtx, scanCtx, "q10", "testTable_OFFLINE",
null);
+ assertNotNull(execCtx.getTerminateException(),
+ "Custom AlwaysKillStrategy should kill regardless of scan counts");
+ }
+
+ @Test
+ public void testInvalidFactoryClassFallsBackGracefully() {
+ Map<String, Object> props = new HashMap<>();
+ props.put(CommonConstants.Accounting.CONFIG_OF_SCAN_BASED_KILLING_MODE,
+ "enforce");
+
props.put(CommonConstants.Accounting.CONFIG_OF_SCAN_BASED_KILLING_STRATEGY_FACTORY_CLASS_NAME,
+ "com.nonexistent.FakeFactory");
+
props.put(CommonConstants.Accounting.CONFIG_OF_SCAN_BASED_KILLING_MAX_ENTRIES_SCANNED_IN_FILTER,
100L);
+ PinotConfiguration pinotConfig = new PinotConfiguration(props);
+ QueryMonitorConfig config = new QueryMonitorConfig(pinotConfig,
Runtime.getRuntime().maxMemory());
+
+ AtomicReference<QueryMonitorConfig> configRef = new
AtomicReference<>(config);
+ QueryKillingManager manager = new QueryKillingManager(configRef,
_serverMetrics);
+ manager.rebuildStrategy();
+
+ // Should not crash — falls back to default factory
+ assertNotNull(manager.getActiveStrategy(),
+ "Invalid factory should fall back to default
ScanEntriesThresholdStrategy");
+ assertTrue(manager.getActiveStrategy() instanceof
ScanEntriesThresholdStrategy);
+ }
+
+ @Test
+ public void testRebuildStrategyPicksUpConfigChanges() {
+ // Start with no thresholds
+ QueryMonitorConfig config1 = buildConfig("enforce", Long.MAX_VALUE,
Long.MAX_VALUE);
+ AtomicReference<QueryMonitorConfig> configRef = new
AtomicReference<>(config1);
+ QueryKillingManager manager = new QueryKillingManager(configRef,
_serverMetrics);
+ manager.rebuildStrategy();
+ assertNull(manager.getActiveStrategy(), "No thresholds = no strategy");
+
+ // Update config with thresholds
+ QueryMonitorConfig config2 = buildConfig("enforce", 100L, Long.MAX_VALUE);
+ configRef.set(config2);
+ manager.rebuildStrategy();
+ assertNotNull(manager.getActiveStrategy(), "After config update, strategy
should be built");
+ }
+
+ // --- Test fixtures for pluggable strategy ---
+
+ /**
+ * A test strategy that always kills — used to verify custom factory loading.
+ */
+ public static class AlwaysKillStrategy implements QueryKillingStrategy {
+ @Override
+ public boolean shouldTerminate(QueryScanCostContext context) {
+ return true;
+ }
+
+ @Override
+ public QueryKillReport buildKillReport(QueryScanCostContext context,
+ String queryId, String tableName, String configSource) {
+ return new QueryKillReport(queryId, tableName, "AlwaysKillStrategy",
+ "always", 0, 0, configSource, context);
+ }
+ }
+
+ /**
+ * A test factory that creates an AlwaysKillStrategy — loaded by class name
via config.
+ */
+ public static class AlwaysKillStrategyFactory implements
QueryKillingStrategyFactory {
+ @Override
+ public QueryKillingStrategy create(QueryMonitorConfig config) {
+ return new AlwaysKillStrategy();
+ }
+
+ @Override
+ public String getName() {
+ return "AlwaysKillStrategyFactory";
+ }
+ }
+}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/query/killing/strategy/ScanEntriesThresholdStrategyTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/query/killing/strategy/ScanEntriesThresholdStrategyTest.java
new file mode 100644
index 00000000000..48785498162
--- /dev/null
+++
b/pinot-core/src/test/java/org/apache/pinot/core/query/killing/strategy/ScanEntriesThresholdStrategyTest.java
@@ -0,0 +1,183 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.killing.strategy;
+
+import org.apache.pinot.core.query.killing.QueryKillReport;
+import org.apache.pinot.core.query.killing.QueryKillingStrategy;
+import org.apache.pinot.spi.config.table.QueryConfig;
+import org.apache.pinot.spi.query.QueryScanCostContext;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
+
+
+public class ScanEntriesThresholdStrategyTest {
+
+ @Test
+ public void testBelowThresholdDoesNotKill() {
+ ScanEntriesThresholdStrategy strategy = new
ScanEntriesThresholdStrategy(100_000_000L, 10_000_000L);
+ QueryScanCostContext ctx = new QueryScanCostContext();
+ ctx.addEntriesScannedInFilter(50_000_000);
+ ctx.addDocsScanned(5_000_000);
+ assertFalse(strategy.shouldTerminate(ctx));
+ }
+
+ @Test
+ public void testAtThresholdDoesNotKill() {
+ ScanEntriesThresholdStrategy strategy = new
ScanEntriesThresholdStrategy(100_000_000L, 10_000_000L);
+ QueryScanCostContext ctx = new QueryScanCostContext();
+ ctx.addEntriesScannedInFilter(100_000_000);
+ assertFalse(strategy.shouldTerminate(ctx));
+ }
+
+ @Test
+ public void testAboveEntriesScannedThresholdKills() {
+ ScanEntriesThresholdStrategy strategy = new
ScanEntriesThresholdStrategy(100_000_000L, Long.MAX_VALUE);
+ QueryScanCostContext ctx = new QueryScanCostContext();
+ ctx.addEntriesScannedInFilter(100_000_001);
+ assertTrue(strategy.shouldTerminate(ctx));
+ }
+
+ @Test
+ public void testAboveDocsScannedThresholdKills() {
+ ScanEntriesThresholdStrategy strategy = new
ScanEntriesThresholdStrategy(Long.MAX_VALUE, 10_000_000L);
+ QueryScanCostContext ctx = new QueryScanCostContext();
+ ctx.addDocsScanned(10_000_001);
+ assertTrue(strategy.shouldTerminate(ctx));
+ }
+
+ @Test
+ public void testMaxValueThresholdEffectivelyDisables() {
+ ScanEntriesThresholdStrategy strategy = new
ScanEntriesThresholdStrategy(Long.MAX_VALUE, Long.MAX_VALUE);
+ QueryScanCostContext ctx = new QueryScanCostContext();
+ ctx.addEntriesScannedInFilter(999_999_999_999L);
+ ctx.addDocsScanned(999_999_999_999L);
+ assertFalse(strategy.shouldTerminate(ctx));
+ }
+
+ @Test
+ public void testBuildKillReportForEntriesScanned() {
+ ScanEntriesThresholdStrategy strategy = new
ScanEntriesThresholdStrategy(100_000_000L, Long.MAX_VALUE);
+ QueryScanCostContext ctx = new QueryScanCostContext();
+ ctx.addEntriesScannedInFilter(150_000_000);
+
+ QueryKillReport report = strategy.buildKillReport(ctx, "q1", "myTable",
"cluster");
+ assertEquals(report.getTriggeringMetric(), "numEntriesScannedInFilter");
+ assertEquals(report.getActualValue(), 150_000_000L);
+ assertEquals(report.getThresholdValue(), 100_000_000L);
+ }
+
+ @Test
+ public void testBuildKillReportForDocsScanned() {
+ ScanEntriesThresholdStrategy strategy = new
ScanEntriesThresholdStrategy(Long.MAX_VALUE, 10_000_000L);
+ QueryScanCostContext ctx = new QueryScanCostContext();
+ ctx.addDocsScanned(15_000_000);
+
+ QueryKillReport report = strategy.buildKillReport(ctx, "q2", "myTable",
"table:myTable");
+ assertEquals(report.getTriggeringMetric(), "numDocsScanned");
+ assertEquals(report.getActualValue(), 15_000_000L);
+ assertEquals(report.getThresholdValue(), 10_000_000L);
+ }
+
+ @Test
+ public void testPriority() {
+ ScanEntriesThresholdStrategy strategy = new
ScanEntriesThresholdStrategy(100L, 100L);
+ assertEquals(strategy.priority(), 10);
+ }
+
+ @Test
+ public void testEitherMetricCanTrigger() {
+ ScanEntriesThresholdStrategy strategy = new
ScanEntriesThresholdStrategy(100L, 200L);
+
+ // Only entries exceeds
+ QueryScanCostContext ctx1 = new QueryScanCostContext();
+ ctx1.addEntriesScannedInFilter(101);
+ ctx1.addDocsScanned(50);
+ assertTrue(strategy.shouldTerminate(ctx1));
+
+ // Only docs exceeds
+ QueryScanCostContext ctx2 = new QueryScanCostContext();
+ ctx2.addEntriesScannedInFilter(50);
+ ctx2.addDocsScanned(201);
+ assertTrue(strategy.shouldTerminate(ctx2));
+ }
+
+ // --- forQuery() table override tests ---
+
+ @Test
+ public void testForQueryReturnsThisWhenNoOverrides() {
+ ScanEntriesThresholdStrategy strategy = new
ScanEntriesThresholdStrategy(100L, 200L);
+ QueryKillingStrategy result = strategy.forQuery(null, null);
+ assertTrue(result == strategy, "forQuery(null) should return same
instance");
+
+ QueryConfig emptyConfig = new QueryConfig(null, null, null, null, null,
null, null, null, null);
+ result = strategy.forQuery(emptyConfig, null);
+ assertTrue(result == strategy, "forQuery with no scan overrides should
return same instance");
+ }
+
+ @Test
+ public void testForQueryAppliesTableOverride() {
+ ScanEntriesThresholdStrategy strategy = new
ScanEntriesThresholdStrategy(100L, 200L);
+ QueryConfig queryConfig = new QueryConfig(null, null, null, null, null,
null, 500L, null, null);
+
+ QueryKillingStrategy result = strategy.forQuery(queryConfig, null);
+ assertTrue(result != strategy, "forQuery with override should return new
instance");
+ assertTrue(result instanceof ScanEntriesThresholdStrategy);
+
+ ScanEntriesThresholdStrategy overridden = (ScanEntriesThresholdStrategy)
result;
+ assertEquals(overridden.getMaxEntriesScannedInFilter(), 500L);
+ assertEquals(overridden.getMaxDocsScanned(), 200L); // inherited from
original
+ }
+
+ // --- Factory tests ---
+
+ @Test
+ public void testFactoryCreatesStrategyWithThresholds() {
+ java.util.Map<String, Object> props = new java.util.HashMap<>();
+ props.put("accounting.scan.based.killing.enabled", true);
+ props.put("accounting.scan.based.killing.max.entries.scanned.in.filter",
500_000_000L);
+ props.put("accounting.scan.based.killing.max.docs.scanned", 50_000_000L);
+ org.apache.pinot.spi.env.PinotConfiguration pinotConfig = new
org.apache.pinot.spi.env.PinotConfiguration(props);
+ org.apache.pinot.core.accounting.QueryMonitorConfig config =
+ new org.apache.pinot.core.accounting.QueryMonitorConfig(pinotConfig,
1_000_000_000L);
+
+ ScanEntriesThresholdStrategy.Factory factory = new
ScanEntriesThresholdStrategy.Factory();
+ QueryKillingStrategy strategy = factory.create(config);
+ assertNotNull(strategy);
+ assertTrue(strategy instanceof ScanEntriesThresholdStrategy);
+ }
+
+ @Test
+ public void testFactoryReturnsNullWhenNoThresholds() {
+ java.util.Map<String, Object> props = new java.util.HashMap<>();
+ props.put("accounting.scan.based.killing.enabled", true);
+ // No thresholds set — defaults are Long.MAX_VALUE
+ org.apache.pinot.spi.env.PinotConfiguration pinotConfig = new
org.apache.pinot.spi.env.PinotConfiguration(props);
+ org.apache.pinot.core.accounting.QueryMonitorConfig config =
+ new org.apache.pinot.core.accounting.QueryMonitorConfig(pinotConfig,
1_000_000_000L);
+
+ ScanEntriesThresholdStrategy.Factory factory = new
ScanEntriesThresholdStrategy.Factory();
+ QueryKillingStrategy strategy = factory.create(config);
+ assertNull(strategy, "Factory should return null when no thresholds are
configured");
+ }
+}
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
index eda5859962a..b5312fa72e3 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
@@ -2296,7 +2296,8 @@ public class OfflineClusterIntegrationTest extends
BaseClusterIntegrationTestSet
// Add expression override
TableConfig tableConfig = createOfflineTableConfig();
tableConfig.setQueryConfig(
- new QueryConfig(null, null, null, Map.of("DaysSinceEpoch * 24",
"NewAddedDerivedHoursSinceEpoch"), null, null));
+ new QueryConfig(null, null, null, Map.of("DaysSinceEpoch * 24",
"NewAddedDerivedHoursSinceEpoch"), null,
+ null));
updateTableConfig(tableConfig);
TestUtils.waitForCondition(aVoid -> {
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/QueryConfig.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/QueryConfig.java
index b2d6c28bead..093b7c340fa 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/QueryConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/QueryConfig.java
@@ -56,18 +56,41 @@ public class QueryConfig extends BaseJsonConfig {
// Indicates the maximum length of the serialized response per server for a
query.
private final Long _maxServerResponseSizeBytes;
+ private final Long _maxEntriesScannedInFilter;
+
+ private final Long _maxDocsScanned;
+
+ private final Long _maxEntriesScannedPostFilter;
+
+
+ public QueryConfig(@Nullable Long timeoutMs, @Nullable Boolean disableGroovy,
+ @Nullable Boolean useApproximateFunction, @Nullable Map<String, String>
expressionOverrideMap,
+ @Nullable Long maxQueryResponseSizeBytes, @Nullable Long
maxServerResponseSizeBytes) {
+ this(timeoutMs, disableGroovy, useApproximateFunction,
expressionOverrideMap,
+ maxQueryResponseSizeBytes, maxServerResponseSizeBytes, null, null,
null);
+ }
+
@JsonCreator
public QueryConfig(@JsonProperty("timeoutMs") @Nullable Long timeoutMs,
@JsonProperty("disableGroovy") @Nullable Boolean disableGroovy,
@JsonProperty("useApproximateFunction") @Nullable Boolean
useApproximateFunction,
@JsonProperty("expressionOverrideMap") @Nullable Map<String, String>
expressionOverrideMap,
@JsonProperty("maxQueryResponseSizeBytes") @Nullable Long
maxQueryResponseSizeBytes,
- @JsonProperty("maxServerResponseSizeBytes") @Nullable Long
maxServerResponseSizeBytes) {
+ @JsonProperty("maxServerResponseSizeBytes") @Nullable Long
maxServerResponseSizeBytes,
+ @JsonProperty("maxEntriesScannedInFilter") @Nullable Long
maxEntriesScannedInFilter,
+ @JsonProperty("maxDocsScanned") @Nullable Long maxDocsScanned,
+ @JsonProperty("maxEntriesScannedPostFilter") @Nullable Long
maxEntriesScannedPostFilter) {
Preconditions.checkArgument(timeoutMs == null || timeoutMs > 0, "Invalid
'timeoutMs': %s", timeoutMs);
Preconditions.checkArgument(maxQueryResponseSizeBytes == null ||
maxQueryResponseSizeBytes > 0,
"Invalid 'maxQueryResponseSizeBytes': %s", maxQueryResponseSizeBytes);
Preconditions.checkArgument(maxServerResponseSizeBytes == null ||
maxServerResponseSizeBytes > 0,
"Invalid 'maxServerResponseSizeBytes': %s",
maxServerResponseSizeBytes);
+ Preconditions.checkArgument(maxEntriesScannedInFilter == null ||
maxEntriesScannedInFilter > 0,
+ "Invalid 'maxEntriesScannedInFilter': %s", maxEntriesScannedInFilter);
+ Preconditions.checkArgument(maxDocsScanned == null || maxDocsScanned > 0,
+ "Invalid 'maxDocsScanned': %s", maxDocsScanned);
+ Preconditions.checkArgument(maxEntriesScannedPostFilter == null ||
maxEntriesScannedPostFilter > 0,
+ "Invalid 'maxEntriesScannedPostFilter': %s",
maxEntriesScannedPostFilter);
_timeoutMs = timeoutMs;
_disableGroovy = disableGroovy;
@@ -75,6 +98,9 @@ public class QueryConfig extends BaseJsonConfig {
_expressionOverrideMap = expressionOverrideMap;
_maxQueryResponseSizeBytes = maxQueryResponseSizeBytes;
_maxServerResponseSizeBytes = maxServerResponseSizeBytes;
+ _maxEntriesScannedInFilter = maxEntriesScannedInFilter;
+ _maxDocsScanned = maxDocsScanned;
+ _maxEntriesScannedPostFilter = maxEntriesScannedPostFilter;
}
@Nullable
@@ -112,4 +138,22 @@ public class QueryConfig extends BaseJsonConfig {
public Long getMaxServerResponseSizeBytes() {
return _maxServerResponseSizeBytes;
}
+
+ @Nullable
+ @JsonProperty("maxEntriesScannedInFilter")
+ public Long getMaxEntriesScannedInFilter() {
+ return _maxEntriesScannedInFilter;
+ }
+
+ @Nullable
+ @JsonProperty("maxDocsScanned")
+ public Long getMaxDocsScanned() {
+ return _maxDocsScanned;
+ }
+
+ @Nullable
+ @JsonProperty("maxEntriesScannedPostFilter")
+ public Long getMaxEntriesScannedPostFilter() {
+ return _maxEntriesScannedPostFilter;
+ }
}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/exception/QueryErrorCode.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/exception/QueryErrorCode.java
index ea2629b7488..1786dfc977c 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/exception/QueryErrorCode.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/exception/QueryErrorCode.java
@@ -47,6 +47,7 @@ public enum QueryErrorCode {
SERVER_SEGMENT_MISSING(235, "ServerSegmentMissing",
Response.Status.NOT_FOUND),
QUERY_SCHEDULING_TIMEOUT(240, "QuerySchedulingTimeoutError",
Response.Status.REQUEST_TIMEOUT),
SERVER_RESOURCE_LIMIT_EXCEEDED(245, "ServerResourceLimitExceededError",
Response.Status.SERVICE_UNAVAILABLE),
+ QUERY_SCAN_LIMIT_EXCEEDED(246, "QueryScanLimitExceededError",
Response.Status.BAD_REQUEST),
EXECUTION_TIMEOUT(250, "ExecutionTimeoutError",
Response.Status.REQUEST_TIMEOUT),
BROKER_SEGMENT_UNAVAILABLE(305, "", Response.Status.SERVICE_UNAVAILABLE),
BROKER_TIMEOUT(400, "BrokerTimeoutError", Response.Status.REQUEST_TIMEOUT),
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/query/QueryScanCostContext.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/query/QueryScanCostContext.java
new file mode 100644
index 00000000000..aa1e6b5e825
--- /dev/null
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/query/QueryScanCostContext.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.query;
+
+import java.util.concurrent.atomic.LongAdder;
+
+
+/**
+ * Lightweight, thread-safe accumulator for query scan cost metrics.
+ *
+ * <p>One instance is created per query when scan-based killing is enabled.
+ * Attached to {@link QueryExecutionContext} and shared across all segment
+ * worker threads executing that query.</p>
+ *
+ */
+public final class QueryScanCostContext {
+ private final LongAdder _numEntriesScannedInFilter = new LongAdder();
+ private final LongAdder _numDocsScanned = new LongAdder();
+ private final LongAdder _numEntriesScannedPostFilter = new LongAdder();
+ private final long _startTimeMs = System.currentTimeMillis();
+
+ /**
+ * Increment entries scanned during filter evaluation.
+ * Called from {@code DocIdSetOperator} after each block.
+ */
+ public void addEntriesScannedInFilter(long count) {
+ _numEntriesScannedInFilter.add(count);
+ }
+
+ /**
+ * Increment documents scanned (passed filter, processed by query operator).
+ * Called from {@code AggregationOperator}, {@code SelectionOnlyOperator},
+ * {@code GroupByOperator} after each value block.
+ */
+ public void addDocsScanned(long count) {
+ _numDocsScanned.add(count);
+ }
+
+ /**
+ * Increment entries scanned post-filter (docs * projected columns).
+ * Called from query operators after projection.
+ */
+ public void addEntriesScannedPostFilter(long count) {
+ _numEntriesScannedPostFilter.add(count);
+ }
+
+ public long getNumEntriesScannedInFilter() {
+ return _numEntriesScannedInFilter.sum();
+ }
+
+ public long getNumDocsScanned() {
+ return _numDocsScanned.sum();
+ }
+
+ public long getNumEntriesScannedPostFilter() {
+ return _numEntriesScannedPostFilter.sum();
+ }
+
+ /**
+ * Returns wall-clock time elapsed since this context was created.
+ */
+ public long getElapsedTimeMs() {
+ return System.currentTimeMillis() - _startTimeMs;
+ }
+}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 30c074a422c..fbe1fe743f3 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -1874,6 +1874,57 @@ public class CommonConstants {
@Deprecated(since = "1.6.0", forRemoval = true)
public static final String CONFIG_OF_SECONDARY_WORKLOAD_CPU_PERCENTAGE =
"accounting.secondary.workload.cpu.percentage";
+
+ // Scan-based query killing
+ public enum ScanKillingMode {
+ DISABLED("disabled"),
+ LOG_ONLY("logOnly"),
+ ENFORCE("enforce");
+
+ private final String _configValue;
+
+ ScanKillingMode(String configValue) {
+ _configValue = configValue;
+ }
+
+ public String getConfigValue() {
+ return _configValue;
+ }
+
+ /**
+ * Parses a config string into a {@link ScanKillingMode}.
Case-insensitive.
+ * Returns {@code null} if the value is not recognized.
+ */
+ public static ScanKillingMode fromConfigValue(String value) {
+ if (value == null) {
+ return null;
+ }
+ for (ScanKillingMode mode : values()) {
+ if (mode._configValue.equalsIgnoreCase(value)) {
+ return mode;
+ }
+ }
+ return null;
+ }
+ }
+
+ public static final String CONFIG_OF_SCAN_BASED_KILLING_MODE =
"accounting.scan.based.killing.mode";
+ public static final ScanKillingMode DEFAULT_SCAN_BASED_KILLING_MODE =
ScanKillingMode.DISABLED;
+
+ public static final String
CONFIG_OF_SCAN_BASED_KILLING_STRATEGY_FACTORY_CLASS_NAME =
+ "accounting.scan.based.killing.strategy.factory.class.name";
+
+ public static final String
CONFIG_OF_SCAN_BASED_KILLING_MAX_ENTRIES_SCANNED_IN_FILTER =
+ "accounting.scan.based.killing.max.entries.scanned.in.filter";
+ public static final long
DEFAULT_SCAN_BASED_KILLING_MAX_ENTRIES_SCANNED_IN_FILTER = Long.MAX_VALUE;
+
+ public static final String CONFIG_OF_SCAN_BASED_KILLING_MAX_DOCS_SCANNED =
+ "accounting.scan.based.killing.max.docs.scanned";
+ public static final long DEFAULT_SCAN_BASED_KILLING_MAX_DOCS_SCANNED =
Long.MAX_VALUE;
+
+ public static final String
CONFIG_OF_SCAN_BASED_KILLING_MAX_ENTRIES_SCANNED_POST_FILTER =
+ "accounting.scan.based.killing.max.entries.scanned.post.filter";
+ public static final long
DEFAULT_SCAN_BASED_KILLING_MAX_ENTRIES_SCANNED_POST_FILTER = Long.MAX_VALUE;
}
public static class ExecutorService {
diff --git
a/pinot-spi/src/test/java/org/apache/pinot/spi/config/table/QueryConfigScanKillingTest.java
b/pinot-spi/src/test/java/org/apache/pinot/spi/config/table/QueryConfigScanKillingTest.java
new file mode 100644
index 00000000000..5d6ea187d2d
--- /dev/null
+++
b/pinot-spi/src/test/java/org/apache/pinot/spi/config/table/QueryConfigScanKillingTest.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.config.table;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNull;
+
+
+public class QueryConfigScanKillingTest {
+
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+ @Test
+ public void testScanFieldsDefaultToNull() {
+ QueryConfig config = new QueryConfig(null, null, null, null, null, null,
null, null, null);
+ assertNull(config.getMaxEntriesScannedInFilter());
+ assertNull(config.getMaxDocsScanned());
+ assertNull(config.getMaxEntriesScannedPostFilter());
+ }
+
+ @Test
+ public void testScanFieldsSetExplicitly() {
+ QueryConfig config = new QueryConfig(null, null, null, null, null, null,
+ 500_000_000L, 50_000_000L, 100_000_000L);
+ assertEquals(config.getMaxEntriesScannedInFilter(),
Long.valueOf(500_000_000L));
+ assertEquals(config.getMaxDocsScanned(), Long.valueOf(50_000_000L));
+ assertEquals(config.getMaxEntriesScannedPostFilter(),
Long.valueOf(100_000_000L));
+ }
+
+ @Test
+ public void testJsonSerializationWithScanFields()
+ throws Exception {
+ QueryConfig config = new QueryConfig(30000L, null, null, null, null, null,
+ 500_000_000L, 50_000_000L, null);
+
+ String json = OBJECT_MAPPER.writeValueAsString(config);
+ QueryConfig deserialized = OBJECT_MAPPER.readValue(json,
QueryConfig.class);
+
+ assertEquals(deserialized.getTimeoutMs(), Long.valueOf(30000L));
+ assertEquals(deserialized.getMaxEntriesScannedInFilter(),
Long.valueOf(500_000_000L));
+ assertEquals(deserialized.getMaxDocsScanned(), Long.valueOf(50_000_000L));
+ assertNull(deserialized.getMaxEntriesScannedPostFilter());
+ }
+
+ @Test
+ public void testJsonDeserializationWithoutScanFields()
+ throws Exception {
+ String json = "{\"timeoutMs\": 5000}";
+ QueryConfig config = OBJECT_MAPPER.readValue(json, QueryConfig.class);
+ assertEquals(config.getTimeoutMs(), Long.valueOf(5000L));
+ assertNull(config.getMaxEntriesScannedInFilter());
+ assertNull(config.getMaxDocsScanned());
+ assertNull(config.getMaxEntriesScannedPostFilter());
+ }
+
+ @Test
+ public void testJsonDeserializationWithOnlyScanFields()
+ throws Exception {
+ String json = "{\"maxEntriesScannedInFilter\": 1000000000,
\"maxDocsScanned\": 100000000}";
+ QueryConfig config = OBJECT_MAPPER.readValue(json, QueryConfig.class);
+ assertNull(config.getTimeoutMs());
+ assertEquals(config.getMaxEntriesScannedInFilter(),
Long.valueOf(1_000_000_000L));
+ assertEquals(config.getMaxDocsScanned(), Long.valueOf(100_000_000L));
+ assertNull(config.getMaxEntriesScannedPostFilter());
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void testNegativeMaxEntriesScannedInFilterThrows() {
+ new QueryConfig(null, null, null, null, null, null, -1L, null, null);
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void testZeroMaxDocsScannedThrows() {
+ new QueryConfig(null, null, null, null, null, null, null, 0L, null);
+ }
+}
diff --git
a/pinot-spi/src/test/java/org/apache/pinot/spi/query/QueryScanCostContextTest.java
b/pinot-spi/src/test/java/org/apache/pinot/spi/query/QueryScanCostContextTest.java
new file mode 100644
index 00000000000..6f5a0f3da62
--- /dev/null
+++
b/pinot-spi/src/test/java/org/apache/pinot/spi/query/QueryScanCostContextTest.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.query;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+
+public class QueryScanCostContextTest {
+
+ @Test
+ public void testInitialValuesAreZero() {
+ QueryScanCostContext ctx = new QueryScanCostContext();
+ assertEquals(ctx.getNumEntriesScannedInFilter(), 0L);
+ assertEquals(ctx.getNumDocsScanned(), 0L);
+ assertEquals(ctx.getNumEntriesScannedPostFilter(), 0L);
+ assertTrue(ctx.getElapsedTimeMs() >= 0);
+ }
+
+ @Test
+ public void testSingleThreadIncrements() {
+ QueryScanCostContext ctx = new QueryScanCostContext();
+ ctx.addEntriesScannedInFilter(100);
+ ctx.addEntriesScannedInFilter(200);
+ assertEquals(ctx.getNumEntriesScannedInFilter(), 300L);
+
+ ctx.addDocsScanned(50);
+ ctx.addDocsScanned(25);
+ assertEquals(ctx.getNumDocsScanned(), 75L);
+
+ ctx.addEntriesScannedPostFilter(1000);
+ assertEquals(ctx.getNumEntriesScannedPostFilter(), 1000L);
+ }
+
+ @Test
+ public void testElapsedTime()
+ throws InterruptedException {
+ QueryScanCostContext ctx = new QueryScanCostContext();
+ Thread.sleep(50);
+ assertTrue(ctx.getElapsedTimeMs() >= 40, "Elapsed time should be at least
~50ms");
+ }
+
+ @Test
+ public void testConcurrentWritesAreCorrect()
+ throws InterruptedException {
+ QueryScanCostContext ctx = new QueryScanCostContext();
+ int numThreads = 16;
+ long incrementsPerThread = 100_000;
+
+ ExecutorService executor = Executors.newFixedThreadPool(numThreads);
+ CountDownLatch latch = new CountDownLatch(numThreads);
+
+ for (int i = 0; i < numThreads; i++) {
+ executor.submit(() -> {
+ for (long j = 0; j < incrementsPerThread; j++) {
+ ctx.addEntriesScannedInFilter(1);
+ ctx.addDocsScanned(1);
+ }
+ latch.countDown();
+ });
+ }
+ latch.await();
+ executor.shutdown();
+
+ long expected = numThreads * incrementsPerThread;
+ assertEquals(ctx.getNumEntriesScannedInFilter(), expected);
+ assertEquals(ctx.getNumDocsScanned(), expected);
+ }
+
+ @Test
+ public void testLargeIncrements() {
+ QueryScanCostContext ctx = new QueryScanCostContext();
+ ctx.addEntriesScannedInFilter(500_000_000L);
+ ctx.addEntriesScannedInFilter(500_000_000L);
+ assertEquals(ctx.getNumEntriesScannedInFilter(), 1_000_000_000L);
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]