This is an automated email from the ASF dual-hosted git repository.

shauryachats pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 5fd31838a1e [pinot-server/ proactive-query-killing] (1/2) add initial 
SPI implementation for supporting query killing based on Scan Cost (#18102)
5fd31838a1e is described below

commit 5fd31838a1e7b818341ed01db6e6adfede87e410
Author: Anurag Rai <[email protected]>
AuthorDate: Tue May 12 06:01:10 2026 +0530

    [pinot-server/ proactive-query-killing] (1/2) add initial SPI 
implementation for supporting query killing based on Scan Cost (#18102)
---
 .../apache/pinot/common/metrics/ServerMeter.java   |   3 +
 .../pinot/controller/helix/TableCacheTest.java     |   3 +-
 .../pinot/core/accounting/QueryMonitorConfig.java  | 145 +++++++++
 .../killing/CompositeQueryKillingStrategy.java     |  84 ++++++
 .../pinot/core/query/killing/QueryKillReport.java  | 148 +++++++++
 .../core/query/killing/QueryKillingManager.java    | 185 ++++++++++++
 .../core/query/killing/QueryKillingStrategy.java   |  79 +++++
 .../query/killing/QueryKillingStrategyFactory.java |  58 ++++
 .../strategy/ScanEntriesThresholdStrategy.java     | 157 ++++++++++
 .../QueryMonitorConfigScanKillingTest.java         | 191 ++++++++++++
 .../killing/CompositeQueryKillingStrategyTest.java | 143 +++++++++
 .../core/query/killing/QueryKillReportTest.java    | 149 ++++++++++
 .../query/killing/QueryKillingManagerTest.java     | 331 +++++++++++++++++++++
 .../strategy/ScanEntriesThresholdStrategyTest.java | 183 ++++++++++++
 .../tests/OfflineClusterIntegrationTest.java       |   3 +-
 .../apache/pinot/spi/config/table/QueryConfig.java |  46 ++-
 .../apache/pinot/spi/exception/QueryErrorCode.java |   1 +
 .../pinot/spi/query/QueryScanCostContext.java      |  81 +++++
 .../apache/pinot/spi/utils/CommonConstants.java    |  51 ++++
 .../config/table/QueryConfigScanKillingTest.java   |  95 ++++++
 .../pinot/spi/query/QueryScanCostContextTest.java  |  98 ++++++
 21 files changed, 2230 insertions(+), 4 deletions(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
index 2e40a399f98..49ea60bb56c 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
@@ -115,6 +115,9 @@ public enum ServerMeter implements AbstractMetrics.Meter {
   READINESS_CHECK_OK_CALLS("readinessCheck", true),
   READINESS_CHECK_BAD_CALLS("readinessCheck", true),
   QUERIES_KILLED("query", true),
+  QUERIES_KILLED_SCAN("queriesKilledScan", true),
+  QUERIES_KILLED_SCAN_DRY_RUN("queriesKilledScanDryRun", true),
+  QUERIES_KILLED_SCAN_ERROR("queriesKilledScanError", true),
   QUERIES_THROTTLED("query", true),
   HEAP_CRITICAL_LEVEL_EXCEEDED("count", true),
   HEAP_PANIC_LEVEL_EXCEEDED("count", true),
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/TableCacheTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/TableCacheTest.java
index 6fcf472a69d..b8242dff1ae 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/TableCacheTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/TableCacheTest.java
@@ -218,8 +218,7 @@ public class TableCacheTest {
     logicalTableConfig = 
ControllerTest.getDummyLogicalTableConfig(LOGICAL_TABLE_NAME,
         List.of(OFFLINE_TABLE_NAME, ANOTHER_TABLE_OFFLINE), "DefaultTenant");
     logicalTableConfig.setQueryConfig(new QueryConfig(
-        1L, false, false, Map.of("DaysSinceEpoch * 24", 
"NewAddedDerivedHoursSinceEpoch"), 1L, 1L
-    ));
+        1L, false, false, Map.of("DaysSinceEpoch * 24", 
"NewAddedDerivedHoursSinceEpoch"), 1L, 1L));
     
TEST_INSTANCE.getHelixResourceManager().updateLogicalTableConfig(logicalTableConfig);
     TestUtils.waitForCondition(
         aVoid -> 
Objects.requireNonNull(tableCache.getLogicalTableConfig(LOGICAL_TABLE_NAME))
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/accounting/QueryMonitorConfig.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/accounting/QueryMonitorConfig.java
index 280e989c3ca..920648ca506 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/accounting/QueryMonitorConfig.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/accounting/QueryMonitorConfig.java
@@ -18,13 +18,21 @@
  */
 package org.apache.pinot.core.accounting;
 
+import java.util.Arrays;
 import java.util.Map;
 import java.util.Set;
+import javax.annotation.Nullable;
 import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.CommonConstants.Accounting;
+import org.apache.pinot.spi.utils.CommonConstants.Accounting.ScanKillingMode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 public class QueryMonitorConfig {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(QueryMonitorConfig.class);
+
   private final long _maxHeapSize;
 
   // don't kill a query if its memory footprint is below some ratio of 
_maxHeapSize
@@ -72,6 +80,16 @@ public class QueryMonitorConfig {
 
   private final boolean _workloadCostEnforcementEnabled;
 
+  private final ScanKillingMode _scanBasedKillingMode;
+
+  private final long _scanBasedKillingMaxEntriesScannedInFilter;
+
+  private final long _scanBasedKillingMaxDocsScanned;
+
+  private final long _scanBasedKillingMaxEntriesScannedPostFilter;
+
+  private final String _scanBasedKillingStrategyFactoryClassName;
+
   public QueryMonitorConfig(PinotConfiguration config, long maxHeapSize) {
     _maxHeapSize = maxHeapSize;
 
@@ -122,6 +140,22 @@ public class QueryMonitorConfig {
 
     _workloadCostEnforcementEnabled = 
config.getProperty(Accounting.Keys.WORKLOAD_ENABLE_COST_ENFORCEMENT,
         Accounting.DEFAULT_WORKLOAD_ENABLE_COST_ENFORCEMENT);
+
+    _scanBasedKillingMode = validateScanKillingMode(config.getProperty(
+        CommonConstants.Accounting.CONFIG_OF_SCAN_BASED_KILLING_MODE,
+        
CommonConstants.Accounting.DEFAULT_SCAN_BASED_KILLING_MODE.getConfigValue()));
+    _scanBasedKillingMaxEntriesScannedInFilter = config.getProperty(
+        
CommonConstants.Accounting.CONFIG_OF_SCAN_BASED_KILLING_MAX_ENTRIES_SCANNED_IN_FILTER,
+        
CommonConstants.Accounting.DEFAULT_SCAN_BASED_KILLING_MAX_ENTRIES_SCANNED_IN_FILTER);
+    _scanBasedKillingMaxDocsScanned = config.getProperty(
+        
CommonConstants.Accounting.CONFIG_OF_SCAN_BASED_KILLING_MAX_DOCS_SCANNED,
+        
CommonConstants.Accounting.DEFAULT_SCAN_BASED_KILLING_MAX_DOCS_SCANNED);
+    _scanBasedKillingMaxEntriesScannedPostFilter = config.getProperty(
+        
CommonConstants.Accounting.CONFIG_OF_SCAN_BASED_KILLING_MAX_ENTRIES_SCANNED_POST_FILTER,
+        
CommonConstants.Accounting.DEFAULT_SCAN_BASED_KILLING_MAX_ENTRIES_SCANNED_POST_FILTER);
+    _scanBasedKillingStrategyFactoryClassName = config.getProperty(
+        
CommonConstants.Accounting.CONFIG_OF_SCAN_BASED_KILLING_STRATEGY_FACTORY_CLASS_NAME,
+        (String) null);
   }
 
   QueryMonitorConfig(QueryMonitorConfig oldConfig, Set<String> changedConfigs, 
Map<String, String> clusterConfigs) {
@@ -244,6 +278,88 @@ public class QueryMonitorConfig {
     } else {
       _workloadCostEnforcementEnabled = 
oldConfig._workloadCostEnforcementEnabled;
     }
+
+    if 
(changedConfigs.contains(CommonConstants.Accounting.CONFIG_OF_SCAN_BASED_KILLING_MODE))
 {
+      _scanBasedKillingMode = 
validateScanKillingMode(clusterConfigs.getOrDefault(
+          CommonConstants.Accounting.CONFIG_OF_SCAN_BASED_KILLING_MODE,
+          
CommonConstants.Accounting.DEFAULT_SCAN_BASED_KILLING_MODE.getConfigValue()));
+    } else {
+      _scanBasedKillingMode = oldConfig.getScanBasedKillingMode();
+    }
+
+    if (changedConfigs.contains(
+        
CommonConstants.Accounting.CONFIG_OF_SCAN_BASED_KILLING_MAX_ENTRIES_SCANNED_IN_FILTER))
 {
+      _scanBasedKillingMaxEntriesScannedInFilter = parseLongOrDefault(
+          
clusterConfigs.get(CommonConstants.Accounting.CONFIG_OF_SCAN_BASED_KILLING_MAX_ENTRIES_SCANNED_IN_FILTER),
+          
CommonConstants.Accounting.CONFIG_OF_SCAN_BASED_KILLING_MAX_ENTRIES_SCANNED_IN_FILTER,
+          
CommonConstants.Accounting.DEFAULT_SCAN_BASED_KILLING_MAX_ENTRIES_SCANNED_IN_FILTER);
+    } else {
+      _scanBasedKillingMaxEntriesScannedInFilter =
+          oldConfig.getScanBasedKillingMaxEntriesScannedInFilter();
+    }
+
+    if (changedConfigs.contains(
+        
CommonConstants.Accounting.CONFIG_OF_SCAN_BASED_KILLING_MAX_DOCS_SCANNED)) {
+      _scanBasedKillingMaxDocsScanned = parseLongOrDefault(
+          
clusterConfigs.get(CommonConstants.Accounting.CONFIG_OF_SCAN_BASED_KILLING_MAX_DOCS_SCANNED),
+          
CommonConstants.Accounting.CONFIG_OF_SCAN_BASED_KILLING_MAX_DOCS_SCANNED,
+          
CommonConstants.Accounting.DEFAULT_SCAN_BASED_KILLING_MAX_DOCS_SCANNED);
+    } else {
+      _scanBasedKillingMaxDocsScanned = 
oldConfig.getScanBasedKillingMaxDocsScanned();
+    }
+
+    if (changedConfigs.contains(
+        
CommonConstants.Accounting.CONFIG_OF_SCAN_BASED_KILLING_MAX_ENTRIES_SCANNED_POST_FILTER))
 {
+      _scanBasedKillingMaxEntriesScannedPostFilter = parseLongOrDefault(
+          
clusterConfigs.get(CommonConstants.Accounting.CONFIG_OF_SCAN_BASED_KILLING_MAX_ENTRIES_SCANNED_POST_FILTER),
+          
CommonConstants.Accounting.CONFIG_OF_SCAN_BASED_KILLING_MAX_ENTRIES_SCANNED_POST_FILTER,
+          
CommonConstants.Accounting.DEFAULT_SCAN_BASED_KILLING_MAX_ENTRIES_SCANNED_POST_FILTER);
+    } else {
+      _scanBasedKillingMaxEntriesScannedPostFilter =
+          oldConfig.getScanBasedKillingMaxEntriesScannedPostFilter();
+    }
+
+    if (changedConfigs.contains(
+        
CommonConstants.Accounting.CONFIG_OF_SCAN_BASED_KILLING_STRATEGY_FACTORY_CLASS_NAME))
 {
+      _scanBasedKillingStrategyFactoryClassName = clusterConfigs.getOrDefault(
+          
CommonConstants.Accounting.CONFIG_OF_SCAN_BASED_KILLING_STRATEGY_FACTORY_CLASS_NAME,
 null);
+    } else {
+      _scanBasedKillingStrategyFactoryClassName = 
oldConfig.getScanBasedKillingStrategyFactoryClassName();
+    }
+  }
+
+  /**
+   * Validates the scan-based killing mode. If the value is not one of the 
recognized modes
+   * (disabled, logOnly, enforce), logs an error and falls back to {@link 
ScanKillingMode#DISABLED}
+   * so the server continues to start normally.
+   */
+  private static ScanKillingMode validateScanKillingMode(String mode) {
+    ScanKillingMode parsed = ScanKillingMode.fromConfigValue(mode);
+    if (parsed != null) {
+      return parsed;
+    }
+    LOGGER.error("Invalid value '{}' for config '{}'. Valid values are: {}. "
+            + "Defaulting to '{}'. Scan-based killing will be disabled.",
+        mode, CommonConstants.Accounting.CONFIG_OF_SCAN_BASED_KILLING_MODE,
+        Arrays.toString(ScanKillingMode.values()), 
ScanKillingMode.DISABLED.getConfigValue());
+    return ScanKillingMode.DISABLED;
+  }
+
+  /**
+   * Parses a long value from a config string, falling back to the default if 
the value
+   * is null, empty, or not a valid number.
+   */
+  private static long parseLongOrDefault(@Nullable String value, String 
configKey, long defaultValue) {
+    if (value == null || value.isEmpty()) {
+      return defaultValue;
+    }
+    try {
+      return Long.parseLong(value);
+    } catch (NumberFormatException e) {
+      LOGGER.error("Invalid numeric value '{}' for config '{}'. Defaulting to 
{}.",
+          value, configKey, defaultValue);
+      return defaultValue;
+    }
   }
 
   public long getMaxHeapSize() {
@@ -313,4 +429,33 @@ public class QueryMonitorConfig {
   public boolean isWorkloadCostEnforcementEnabled() {
     return _workloadCostEnforcementEnabled;
   }
+
+  public ScanKillingMode getScanBasedKillingMode() {
+    return _scanBasedKillingMode;
+  }
+
+  public boolean isScanBasedKillingEnabled() {
+    return _scanBasedKillingMode != ScanKillingMode.DISABLED;
+  }
+
+  public boolean isScanBasedKillingLogOnly() {
+    return _scanBasedKillingMode == ScanKillingMode.LOG_ONLY;
+  }
+
+  public long getScanBasedKillingMaxEntriesScannedInFilter() {
+    return _scanBasedKillingMaxEntriesScannedInFilter;
+  }
+
+  public long getScanBasedKillingMaxDocsScanned() {
+    return _scanBasedKillingMaxDocsScanned;
+  }
+
+  public long getScanBasedKillingMaxEntriesScannedPostFilter() {
+    return _scanBasedKillingMaxEntriesScannedPostFilter;
+  }
+
+  @Nullable
+  public String getScanBasedKillingStrategyFactoryClassName() {
+    return _scanBasedKillingStrategyFactoryClassName;
+  }
 }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/killing/CompositeQueryKillingStrategy.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/killing/CompositeQueryKillingStrategy.java
new file mode 100644
index 00000000000..aa363d4d360
--- /dev/null
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/killing/CompositeQueryKillingStrategy.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.killing;
+
+import java.util.Comparator;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.pinot.spi.query.QueryScanCostContext;
+
+
+/**
+ * Combines multiple {@link QueryKillingStrategy} instances with AND/OR 
semantics.
+ *
+ * <p>Strategies are sorted by {@link QueryKillingStrategy#priority()} (lower 
= checked first).
+ * In {@link Mode#ANY} mode, the first strategy that triggers produces the 
kill report.
+ * In {@link Mode#ALL} mode, all strategies must trigger for a kill.</p>
+ *
+ */
+public class CompositeQueryKillingStrategy implements QueryKillingStrategy {
+
+  /** Composition mode for combining strategies. */
+  public enum Mode {
+    /** Kill if ANY strategy triggers (OR). */
+    ANY,
+    /** Kill only if ALL strategies trigger (AND). */
+    ALL
+  }
+
+  private final List<QueryKillingStrategy> _strategies;
+  private final Mode _mode;
+
+  public CompositeQueryKillingStrategy(List<QueryKillingStrategy> strategies, 
Mode mode) {
+    _strategies = strategies.stream()
+        .sorted(Comparator.comparingInt(QueryKillingStrategy::priority))
+        .collect(Collectors.toList());
+    _mode = mode;
+  }
+
+  @Override
+  public boolean shouldTerminate(QueryScanCostContext ctx) {
+    if (_mode == Mode.ANY) {
+      for (QueryKillingStrategy s : _strategies) {
+        if (s.shouldTerminate(ctx)) {
+          return true;
+        }
+      }
+      return false;
+    } else {
+      for (QueryKillingStrategy s : _strategies) {
+        if (!s.shouldTerminate(ctx)) {
+          return false;
+        }
+      }
+      return !_strategies.isEmpty();
+    }
+  }
+
+  @Override
+  public QueryKillReport buildKillReport(QueryScanCostContext ctx,
+      String queryId, String tableName, String configSource) {
+    for (QueryKillingStrategy s : _strategies) {
+      if (s.shouldTerminate(ctx)) {
+        return s.buildKillReport(ctx, queryId, tableName, configSource);
+      }
+    }
+    throw new IllegalStateException("buildKillReport called but no strategy 
triggered");
+  }
+}
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/killing/QueryKillReport.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/killing/QueryKillReport.java
new file mode 100644
index 00000000000..d44d873188d
--- /dev/null
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/killing/QueryKillReport.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.killing;
+
+import org.apache.pinot.spi.query.QueryScanCostContext;
+
+
+/**
+ * Immutable snapshot of a query-kill event
+ */
+public final class QueryKillReport {
+  private final String _queryId;
+  private final String _tableName;
+  private final String _strategyName;
+  private final String _triggeringMetric;
+  private final long _actualValue;
+  private final long _thresholdValue;
+  private final String _configSource;
+  private final long _snapshotEntriesScannedInFilter;
+  private final long _snapshotDocsScanned;
+  private final long _snapshotEntriesScannedPostFilter;
+  private final long _elapsedTimeMs;
+
+  /**
+   * Creates a {@code QueryKillReport} by snapshotting the current state of 
{@code context}.
+   *
+   * @param queryId          unique identifier of the killed query
+   * @param tableName        fully-qualified table name (e.g. {@code 
myTable_OFFLINE})
+   * @param strategyName     name of the kill strategy that triggered the kill
+   * @param triggeringMetric name of the metric that exceeded the threshold
+   * @param actualValue      observed metric value at kill time
+   * @param thresholdValue   configured threshold that was exceeded
+   * @param configSource     source of the threshold config (e.g. {@code 
TABLE_CONFIG})
+   * @param context          live scan-cost context; values are snapshotted 
immediately
+   */
+  public QueryKillReport(String queryId, String tableName, String 
strategyName, String triggeringMetric,
+      long actualValue, long thresholdValue, String configSource, 
QueryScanCostContext context) {
+    _queryId = queryId;
+    _tableName = tableName;
+    _strategyName = strategyName;
+    _triggeringMetric = triggeringMetric;
+    _actualValue = actualValue;
+    _thresholdValue = thresholdValue;
+    _configSource = configSource;
+    // Snapshot mutable LongAdder values immediately to decouple from live 
context
+    _snapshotEntriesScannedInFilter = context.getNumEntriesScannedInFilter();
+    _snapshotDocsScanned = context.getNumDocsScanned();
+    _snapshotEntriesScannedPostFilter = 
context.getNumEntriesScannedPostFilter();
+    _elapsedTimeMs = context.getElapsedTimeMs();
+  }
+
+  // ----- Getters -----
+
+  public String getQueryId() {
+    return _queryId;
+  }
+
+  public String getTableName() {
+    return _tableName;
+  }
+
+  public String getStrategyName() {
+    return _strategyName;
+  }
+
+  public String getTriggeringMetric() {
+    return _triggeringMetric;
+  }
+
+  public long getActualValue() {
+    return _actualValue;
+  }
+
+  public long getThresholdValue() {
+    return _thresholdValue;
+  }
+
+  public String getConfigSource() {
+    return _configSource;
+  }
+
+  public long getSnapshotEntriesScannedInFilter() {
+    return _snapshotEntriesScannedInFilter;
+  }
+
+  public long getSnapshotDocsScanned() {
+    return _snapshotDocsScanned;
+  }
+
+  public long getSnapshotEntriesScannedPostFilter() {
+    return _snapshotEntriesScannedPostFilter;
+  }
+
+  public long getElapsedTimeMs() {
+    return _elapsedTimeMs;
+  }
+
+  // ----- Message formatters -----
+
+  /**
+   * Returns a user-facing message describing why the query was killed and 
what action to take.
+   *
+   * <p>Numbers are formatted with commas (e.g. {@code 1,234,567}) for 
readability.
+   * Includes actionable advice about adding a missing index to reduce scan 
cost.</p>
+   */
+  public String toCustomerMessage() {
+    return String.format(
+        "Query '%s' on table '%s' was killed because '%s' (%,d) exceeded the 
threshold (%,d) "
+            + "configured in %s. "
+            + "At kill time: entriesScannedInFilter=%,d, docsScanned=%,d, "
+            + "entriesScannedPostFilter=%,d, elapsedMs=%d. "
+            + "To reduce scan cost, consider adding a missing index (e.g. 
inverted or range index) "
+            + "on the filter columns.",
+        _queryId, _tableName, _triggeringMetric, _actualValue, 
_thresholdValue, _configSource,
+        _snapshotEntriesScannedInFilter, _snapshotDocsScanned, 
_snapshotEntriesScannedPostFilter, _elapsedTimeMs);
+  }
+
+  /**
+   * Returns a structured log line suitable for grep and alerting pipelines.
+   *
+   * <p>Format: {@code QUERY_KILLED key=value ...} with plain 
(non-comma-formatted) numbers.</p>
+   */
+  public String toInternalLogMessage() {
+    return String.format(
+        "QUERY_KILLED queryId=%s table=%s strategy=%s metric=%s actual=%d 
threshold=%d "
+            + "configSource=%s entriesScannedInFilter=%d docsScanned=%d "
+            + "entriesScannedPostFilter=%d elapsedMs=%d",
+        _queryId, _tableName, _strategyName, _triggeringMetric, _actualValue, 
_thresholdValue,
+        _configSource, _snapshotEntriesScannedInFilter, _snapshotDocsScanned,
+        _snapshotEntriesScannedPostFilter, _elapsedTimeMs);
+  }
+}
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/killing/QueryKillingManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/killing/QueryKillingManager.java
new file mode 100644
index 00000000000..ee8f0678113
--- /dev/null
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/killing/QueryKillingManager.java
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.killing;
+
+import java.util.concurrent.atomic.AtomicReference;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.metrics.ServerMeter;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.core.accounting.QueryMonitorConfig;
+import 
org.apache.pinot.core.query.killing.strategy.ScanEntriesThresholdStrategy;
+import org.apache.pinot.spi.config.table.QueryConfig;
+import org.apache.pinot.spi.query.QueryExecutionContext;
+import org.apache.pinot.spi.query.QueryScanCostContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Central manager for scan-based query killing. Owns the guard rails and 
delegates the
+ * actual kill decision to a {@link QueryKillingStrategy}.
+ *
+ * The default factory is {@link ScanEntriesThresholdStrategy.Factory}, which 
reads
+ * scan thresholds from {@link QueryMonitorConfig}. Custom factories can be 
configured
+ * via {@code accounting.scan.based.killing.strategy.factory.class.name}.
+ *
+ */
+public class QueryKillingManager {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(QueryKillingManager.class);
+
+  private static volatile QueryKillingManager _instance;
+
+  private final AtomicReference<QueryMonitorConfig> _configRef;
+  private final ServerMetrics _serverMetrics;
+
+  /**
+   * Null if: killing is disabled, config is insufficient, or factory failed 
to load.
+   * Rebuilt when config changes via {@link #rebuildStrategy()}.
+   */
+  @Nullable
+  private volatile QueryKillingStrategy _strategy;
+
+  public QueryKillingManager(AtomicReference<QueryMonitorConfig> configRef, 
ServerMetrics serverMetrics) {
+    _configRef = configRef;
+    _serverMetrics = serverMetrics;
+  }
+
+  /**
+   * Initializes the singleton instance and builds the strategy from config.
+   * Called once during server startup.
+   */
+  public static void init(AtomicReference<QueryMonitorConfig> configRef, 
ServerMetrics serverMetrics) {
+    QueryKillingManager manager = new QueryKillingManager(configRef, 
serverMetrics);
+    manager.rebuildStrategy();
+    _instance = manager;
+  }
+
+  @Nullable
+  public static QueryKillingManager getInstance() {
+    return _instance;
+  }
+
+  /**
+   * Rebuilds the strategy from the current config. Called at init and when
+   * cluster config changes (via the same onChange path that rebuilds 
QueryMonitorConfig).
+   */
+  public void rebuildStrategy() {
+    QueryMonitorConfig config = _configRef.get();
+    if (config == null || !config.isScanBasedKillingEnabled()) {
+      _strategy = null;
+      return;
+    }
+
+    try {
+      QueryKillingStrategyFactory factory = loadFactory(config);
+      _strategy = factory.create(config);
+      if (_strategy == null) {
+        LOGGER.warn("Scan-based killing is enabled but strategy factory '{}' 
returned null — "
+            + "required configuration may be missing. Scan-based killing will 
be effectively disabled.",
+            factory.getName());
+      }
+    } catch (Exception e) {
+      LOGGER.error("Failed to initialize scan-based killing strategy. "
+          + "Scan-based killing will be disabled.", e);
+      _strategy = null;
+    }
+  }
+
+  /**
+   * Loads the strategy factory from config. If a custom factory class name is 
configured,
+   * loads it by reflection (following the same pattern as {@code 
ThreadAccountantUtils.createAccountant()}).
+   * Otherwise, returns the default {@link 
ScanEntriesThresholdStrategy.Factory}.
+   */
+  private QueryKillingStrategyFactory loadFactory(QueryMonitorConfig config) {
+    String factoryClassName = 
config.getScanBasedKillingStrategyFactoryClassName();
+    if (factoryClassName != null && !factoryClassName.isEmpty()) {
+      LOGGER.info("Loading custom query killing strategy factory: {}", 
factoryClassName);
+      try {
+        return (QueryKillingStrategyFactory) Class.forName(factoryClassName)
+            .getDeclaredConstructor().newInstance();
+      } catch (Exception e) {
+        LOGGER.error("Failed to load custom strategy factory '{}', falling 
back to default",
+            factoryClassName, e);
+      }
+    }
+    return new ScanEntriesThresholdStrategy.Factory();
+  }
+
+  /**
+   * Returns the active strategy. Visible for testing.
+   */
+  @Nullable
+  public QueryKillingStrategy getActiveStrategy() {
+    return _strategy;
+  }
+
+  /**
+   * Evaluates whether the query should be killed based on the active strategy.
+   *
+   * <p>Calls {@link QueryKillingStrategy#forQuery(QueryConfig, 
QueryMonitorConfig)}
+   * to resolve table-level overrides before evaluating.</p>
+   */
+  public void checkAndKillIfNeeded(QueryExecutionContext executionContext,
+      QueryScanCostContext scanCostContext, String queryId, String tableName,
+      @Nullable QueryConfig queryConfig) {
+    // no strategy means killing is disabled or unconfigured
+    QueryKillingStrategy strategy = _strategy;
+    if (strategy == null) {
+      return;
+    }
+
+    QueryMonitorConfig config = _configRef.get();
+    if (config == null || !config.isScanBasedKillingEnabled()) {
+      return;
+    }
+
+    // Prevent duplicate kills
+    if (executionContext.getTerminateException() != null) {
+      return;
+    }
+
+    try {
+      // Resolve per-query table overrides (returns same instance if no 
overrides)
+      QueryKillingStrategy queryStrategy = strategy.forQuery(queryConfig, 
config);
+
+      String configSource = (queryStrategy != strategy) ? "table:" + tableName 
: "cluster";
+
+      // Delegate to strategy
+      if (!queryStrategy.shouldTerminate(scanCostContext)) {
+        return;
+      }
+
+      QueryKillReport report = queryStrategy.buildKillReport(
+          scanCostContext, queryId, tableName, configSource);
+
+      if (config.isScanBasedKillingLogOnly()) {
+        LOGGER.info("Query killed in LogOnly mode: {}", 
report.toInternalLogMessage());
+        
_serverMetrics.addMeteredGlobalValue(ServerMeter.QUERIES_KILLED_SCAN_DRY_RUN, 
1);
+        return;
+      }
+
+      LOGGER.warn("Query Killed in enforce mode: {}", 
report.toInternalLogMessage());
+      executionContext.terminate(queryStrategy.getErrorCode(), 
report.toCustomerMessage());
+      _serverMetrics.addMeteredGlobalValue(ServerMeter.QUERIES_KILLED_SCAN, 1);
+    } catch (Exception e) {
+      LOGGER.error("Error in scan-based killing evaluation for query {}", 
queryId, e);
+      
_serverMetrics.addMeteredGlobalValue(ServerMeter.QUERIES_KILLED_SCAN_ERROR, 1);
+    }
+  }
+}
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/killing/QueryKillingStrategy.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/killing/QueryKillingStrategy.java
new file mode 100644
index 00000000000..4bf4390d59e
--- /dev/null
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/killing/QueryKillingStrategy.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.killing;
+
+import javax.annotation.Nullable;
+import org.apache.pinot.core.accounting.QueryMonitorConfig;
+import org.apache.pinot.spi.config.table.QueryConfig;
+import org.apache.pinot.spi.exception.QueryErrorCode;
+import org.apache.pinot.spi.query.QueryScanCostContext;
+
+
+/**
+ * Determines whether a query should be terminated based on accumulated scan 
cost.
+ *
+ *
+ * <p>To create a custom strategy:
+ * <ol>
+ *   <li>Implement this interface</li>
+ *   <li>Implement {@link QueryKillingStrategyFactory} to create it from 
config</li>
+ *   <li>Set {@code accounting.scan.based.killing.strategy.factory.class.name}
+ *       to your factory class name</li>
+ * </ol>
+ */
+public interface QueryKillingStrategy {
+
+  /** Returns true if the query should be terminated immediately. */
+  boolean shouldTerminate(QueryScanCostContext context);
+
+  /**
+   * Builds a structured kill report with full context.
+   * Only called when {@link #shouldTerminate} returns true.
+   */
+  QueryKillReport buildKillReport(QueryScanCostContext context,
+      String queryId, String tableName, String configSource);
+
+  /** Error code for the termination response. */
+  default QueryErrorCode getErrorCode() {
+    return QueryErrorCode.QUERY_SCAN_LIMIT_EXCEEDED;
+  }
+
+  /** Priority: lower number = checked first in composite strategies. */
+  default int priority() {
+    return 100;
+  }
+
+  /**
+   * Returns a query-specific variant of this strategy with table-level 
overrides applied.
+   * If no table overrides are relevant, returns {@code this} (no new 
allocation).
+   *
+   * <p>The default implementation returns {@code this}, meaning the strategy 
does not
+   * support table-level overrides. Strategies that support per-table 
thresholds
+   * (like {@link 
org.apache.pinot.core.query.killing.strategy.ScanEntriesThresholdStrategy})
+   * should override this method.</p>
+   *
+   * @param queryConfig table-level query config (nullable — null means no 
table overrides)
+   * @param clusterConfig cluster-level config for resolving fallback 
thresholds
+   * @return this strategy or a new instance with resolved thresholds
+   */
+  default QueryKillingStrategy forQuery(@Nullable QueryConfig queryConfig,
+      QueryMonitorConfig clusterConfig) {
+    return this;
+  }
+}
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/killing/QueryKillingStrategyFactory.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/killing/QueryKillingStrategyFactory.java
new file mode 100644
index 00000000000..cdab5a51de5
--- /dev/null
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/killing/QueryKillingStrategyFactory.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.killing;
+
+import javax.annotation.Nullable;
+import org.apache.pinot.core.accounting.QueryMonitorConfig;
+
+
+/**
+ * Factory for creating {@link QueryKillingStrategy} instances from config.
+ *
+ * <p>Implementations are loaded by class name via the config key
+ * {@code accounting.scan.based.killing.strategy.factory.class.name}.
+ * Each factory reads whatever config keys it needs from {@link 
QueryMonitorConfig}
+ * and creates a strategy instance. If required config is missing, the factory
+ * returns {@code null} and logs a warning.</p>
+ *
+ * To add a new strategy:
+ * <ol>
+ *   <li>Implement {@link QueryKillingStrategy} with your kill logic</li>
+ *   <li>Implement this interface to create your strategy from config</li>
+ *   <li>Set the config key to your factory class name</li>
+ *   <li>Add your strategy-specific config keys to the server config</li>
+ * </ol>
+ *
+ * @see 
org.apache.pinot.core.query.killing.strategy.ScanEntriesThresholdStrategy.Factory
+ */
+public interface QueryKillingStrategyFactory {
+
+  /**
+   * Creates a strategy from the current config. Returns {@code null} if 
required
+   * configuration is not present (e.g., no thresholds set), in which case the
+   * manager will log a warning and scan-based killing will be effectively 
disabled.
+   */
+  @Nullable
+  QueryKillingStrategy create(QueryMonitorConfig config);
+
+  /**
+   * Human-readable name of this factory for logging.
+   */
+  String getName();
+}
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/killing/strategy/ScanEntriesThresholdStrategy.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/killing/strategy/ScanEntriesThresholdStrategy.java
new file mode 100644
index 00000000000..92b13dcdc60
--- /dev/null
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/killing/strategy/ScanEntriesThresholdStrategy.java
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.killing.strategy;
+
+import javax.annotation.Nullable;
+import org.apache.pinot.core.accounting.QueryMonitorConfig;
+import org.apache.pinot.core.query.killing.QueryKillReport;
+import org.apache.pinot.core.query.killing.QueryKillingStrategy;
+import org.apache.pinot.core.query.killing.QueryKillingStrategyFactory;
+import org.apache.pinot.spi.config.table.QueryConfig;
+import org.apache.pinot.spi.query.QueryScanCostContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Kills queries that exceed scan entry or doc thresholds.
+ *
+ * <p>Primary strategy for proactive query killing. Checks
+ * {@code numEntriesScannedInFilter} (primary signal — catches expensive filter
+ * predicates / missing indexes) and {@code numDocsScanned} (secondary signal —
+ * catches large aggregations).</p>
+ *
+ * <p>A threshold of {@link Long#MAX_VALUE} disables that metric's check.</p>
+ *
+ *
+ * <p>Supports table-level overrides via {@link #forQuery(QueryConfig, 
QueryMonitorConfig)}.
+ * When a table has specific thresholds in its {@link QueryConfig}, a new 
instance is
+ * created with the resolved values. Otherwise, the same instance is 
returned.</p>
+ */
+public class ScanEntriesThresholdStrategy implements QueryKillingStrategy {
+  private static final String STRATEGY_NAME = "ScanEntriesThresholdStrategy";
+
+  private final long _maxEntriesScannedInFilter;
+  private final long _maxDocsScanned;
+
+  public ScanEntriesThresholdStrategy(long maxEntriesScannedInFilter, long 
maxDocsScanned) {
+    _maxEntriesScannedInFilter = maxEntriesScannedInFilter;
+    _maxDocsScanned = maxDocsScanned;
+  }
+
+  @Override
+  public boolean shouldTerminate(QueryScanCostContext ctx) {
+    return (_maxEntriesScannedInFilter < Long.MAX_VALUE
+            && ctx.getNumEntriesScannedInFilter() > _maxEntriesScannedInFilter)
+        || (_maxDocsScanned < Long.MAX_VALUE
+            && ctx.getNumDocsScanned() > _maxDocsScanned);
+  }
+
+  @Override
+  public QueryKillReport buildKillReport(QueryScanCostContext ctx,
+      String queryId, String tableName, String configSource) {
+    String triggeringMetric;
+    long actualValue;
+    long thresholdValue;
+    if (_maxEntriesScannedInFilter < Long.MAX_VALUE
+        && ctx.getNumEntriesScannedInFilter() > _maxEntriesScannedInFilter) {
+      triggeringMetric = "numEntriesScannedInFilter";
+      actualValue = ctx.getNumEntriesScannedInFilter();
+      thresholdValue = _maxEntriesScannedInFilter;
+    } else {
+      triggeringMetric = "numDocsScanned";
+      actualValue = ctx.getNumDocsScanned();
+      thresholdValue = _maxDocsScanned;
+    }
+    return new QueryKillReport(queryId, tableName, STRATEGY_NAME,
+        triggeringMetric, actualValue, thresholdValue, configSource, ctx);
+  }
+
+  @Override
+  public int priority() {
+    return 10;
+  }
+
+  /**
+   * Returns a query-specific variant with table-level threshold overrides 
applied.
+   * If the table's {@link QueryConfig} has non-null threshold fields, they 
take precedence
+   * over this strategy's thresholds. Otherwise, returns {@code this} (no 
allocation).
+   */
+  @Override
+  public QueryKillingStrategy forQuery(@Nullable QueryConfig queryConfig,
+      QueryMonitorConfig clusterConfig) {
+    if (queryConfig == null) {
+      return this;
+    }
+    Long tableEntries = queryConfig.getMaxEntriesScannedInFilter();
+    Long tableDocs = queryConfig.getMaxDocsScanned();
+    if (tableEntries == null && tableDocs == null) {
+      return this;
+    }
+    return new ScanEntriesThresholdStrategy(
+        tableEntries != null ? tableEntries : _maxEntriesScannedInFilter,
+        tableDocs != null ? tableDocs : _maxDocsScanned);
+  }
+
+  public long getMaxEntriesScannedInFilter() {
+    return _maxEntriesScannedInFilter;
+  }
+
+  public long getMaxDocsScanned() {
+    return _maxDocsScanned;
+  }
+
+  /**
+   * Factory that creates a {@link ScanEntriesThresholdStrategy} from
+   * {@link QueryMonitorConfig}. This is the default factory used when no 
custom
+   * strategy factory is configured.
+   *
+   * <p>Returns {@code null} if no scan thresholds are configured (all are
+   * {@link Long#MAX_VALUE}), which causes the manager to log a warning that
+   * scan-based killing is enabled but effectively unconfigured.</p>
+   */
+  public static class Factory implements QueryKillingStrategyFactory {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(Factory.class);
+
+    @Override
+    @Nullable
+    public QueryKillingStrategy create(QueryMonitorConfig config) {
+      long maxEntries = config.getScanBasedKillingMaxEntriesScannedInFilter();
+      long maxDocs = config.getScanBasedKillingMaxDocsScanned();
+
+      if (maxEntries == Long.MAX_VALUE && maxDocs == Long.MAX_VALUE) {
+        LOGGER.warn("Scan-based killing is enabled but no thresholds are 
configured. "
+            + "Set at least one of: 
accounting.scan.based.killing.max.entries.scanned.in.filter, "
+            + "accounting.scan.based.killing.max.docs.scanned. "
+            + "Scan-based killing will be effectively disabled until 
thresholds are set.");
+        return null;
+      }
+
+      LOGGER.info("Initialized ScanEntriesThresholdStrategy with 
maxEntriesScannedInFilter={}, maxDocsScanned={}",
+          maxEntries == Long.MAX_VALUE ? "disabled" : maxEntries,
+          maxDocs == Long.MAX_VALUE ? "disabled" : maxDocs);
+      return new ScanEntriesThresholdStrategy(maxEntries, maxDocs);
+    }
+
+    @Override
+    public String getName() {
+      return "ScanEntriesThresholdStrategyFactory";
+    }
+  }
+}
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/accounting/QueryMonitorConfigScanKillingTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/accounting/QueryMonitorConfigScanKillingTest.java
new file mode 100644
index 00000000000..4bf4b5ba0c5
--- /dev/null
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/accounting/QueryMonitorConfigScanKillingTest.java
@@ -0,0 +1,191 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.accounting;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.utils.CommonConstants.Accounting;
+import org.apache.pinot.spi.utils.CommonConstants.Accounting.ScanKillingMode;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+
+
+public class QueryMonitorConfigScanKillingTest {
+
+  private static final long MAX_HEAP = 1_000_000_000L;
+
+  @Test
+  public void testDefaultModeIsDisabled() {
+    PinotConfiguration config = new PinotConfiguration();
+    QueryMonitorConfig qmc = new QueryMonitorConfig(config, MAX_HEAP);
+    assertEquals(qmc.getScanBasedKillingMode(), ScanKillingMode.DISABLED);
+    assertFalse(qmc.isScanBasedKillingEnabled());
+    assertFalse(qmc.isScanBasedKillingLogOnly());
+  }
+
+  @Test
+  public void testEnforceMode() {
+    PinotConfiguration config = new PinotConfiguration();
+    config.setProperty(Accounting.CONFIG_OF_SCAN_BASED_KILLING_MODE, 
"enforce");
+    
config.setProperty(Accounting.CONFIG_OF_SCAN_BASED_KILLING_MAX_ENTRIES_SCANNED_IN_FILTER,
 "500000000");
+    
config.setProperty(Accounting.CONFIG_OF_SCAN_BASED_KILLING_MAX_DOCS_SCANNED, 
"50000000");
+
+    QueryMonitorConfig qmc = new QueryMonitorConfig(config, MAX_HEAP);
+    assertEquals(qmc.getScanBasedKillingMode(), ScanKillingMode.ENFORCE);
+    assertTrue(qmc.isScanBasedKillingEnabled());
+    assertFalse(qmc.isScanBasedKillingLogOnly());
+    assertEquals(qmc.getScanBasedKillingMaxEntriesScannedInFilter(), 
500_000_000L);
+    assertEquals(qmc.getScanBasedKillingMaxDocsScanned(), 50_000_000L);
+  }
+
+  @Test
+  public void testLogOnlyMode() {
+    PinotConfiguration config = new PinotConfiguration();
+    config.setProperty(Accounting.CONFIG_OF_SCAN_BASED_KILLING_MODE, 
"logOnly");
+    
config.setProperty(Accounting.CONFIG_OF_SCAN_BASED_KILLING_MAX_ENTRIES_SCANNED_IN_FILTER,
 "500000000");
+
+    QueryMonitorConfig qmc = new QueryMonitorConfig(config, MAX_HEAP);
+    assertEquals(qmc.getScanBasedKillingMode(), ScanKillingMode.LOG_ONLY);
+    assertTrue(qmc.isScanBasedKillingEnabled());
+    assertTrue(qmc.isScanBasedKillingLogOnly());
+  }
+
+  @Test
+  public void testDynamicConfigUpdateToEnforce() {
+    PinotConfiguration config = new PinotConfiguration();
+    QueryMonitorConfig oldConfig = new QueryMonitorConfig(config, MAX_HEAP);
+    assertFalse(oldConfig.isScanBasedKillingEnabled());
+
+    Set<String> changedConfigs = new HashSet<>();
+    changedConfigs.add(Accounting.CONFIG_OF_SCAN_BASED_KILLING_MODE);
+    
changedConfigs.add(Accounting.CONFIG_OF_SCAN_BASED_KILLING_MAX_ENTRIES_SCANNED_IN_FILTER);
+
+    Map<String, String> clusterConfigs = new HashMap<>();
+    clusterConfigs.put(Accounting.CONFIG_OF_SCAN_BASED_KILLING_MODE, 
"enforce");
+    
clusterConfigs.put(Accounting.CONFIG_OF_SCAN_BASED_KILLING_MAX_ENTRIES_SCANNED_IN_FILTER,
 "200000000");
+
+    QueryMonitorConfig newConfig = new QueryMonitorConfig(oldConfig, 
changedConfigs, clusterConfigs);
+    assertTrue(newConfig.isScanBasedKillingEnabled());
+    assertFalse(newConfig.isScanBasedKillingLogOnly());
+    assertEquals(newConfig.getScanBasedKillingMaxEntriesScannedInFilter(), 
200_000_000L);
+    assertEquals(newConfig.getScanBasedKillingMaxDocsScanned(), 
Long.MAX_VALUE);
+  }
+
+  @Test
+  public void testDynamicConfigUpdateToLogOnly() {
+    // Start in enforce mode
+    PinotConfiguration config = new PinotConfiguration();
+    config.setProperty(Accounting.CONFIG_OF_SCAN_BASED_KILLING_MODE, 
"enforce");
+    
config.setProperty(Accounting.CONFIG_OF_SCAN_BASED_KILLING_MAX_ENTRIES_SCANNED_IN_FILTER,
 "100000000");
+    QueryMonitorConfig oldConfig = new QueryMonitorConfig(config, MAX_HEAP);
+    assertTrue(oldConfig.isScanBasedKillingEnabled());
+    assertFalse(oldConfig.isScanBasedKillingLogOnly());
+
+    // Switch to logOnly
+    Set<String> changedConfigs = new HashSet<>();
+    changedConfigs.add(Accounting.CONFIG_OF_SCAN_BASED_KILLING_MODE);
+
+    Map<String, String> clusterConfigs = new HashMap<>();
+    clusterConfigs.put(Accounting.CONFIG_OF_SCAN_BASED_KILLING_MODE, 
"logOnly");
+
+    QueryMonitorConfig newConfig = new QueryMonitorConfig(oldConfig, 
changedConfigs, clusterConfigs);
+    assertTrue(newConfig.isScanBasedKillingEnabled());
+    assertTrue(newConfig.isScanBasedKillingLogOnly());
+    // Threshold should carry over from old config
+    assertEquals(newConfig.getScanBasedKillingMaxEntriesScannedInFilter(), 
100_000_000L);
+  }
+
+  @Test
+  public void testInvalidModeFallsBackToDisabled() {
+    PinotConfiguration config = new PinotConfiguration();
+    config.setProperty(Accounting.CONFIG_OF_SCAN_BASED_KILLING_MODE, 
"invalidValue");
+    
config.setProperty(Accounting.CONFIG_OF_SCAN_BASED_KILLING_MAX_ENTRIES_SCANNED_IN_FILTER,
 "100000000");
+
+    QueryMonitorConfig qmc = new QueryMonitorConfig(config, MAX_HEAP);
+    assertEquals(qmc.getScanBasedKillingMode(), ScanKillingMode.DISABLED);
+    assertFalse(qmc.isScanBasedKillingEnabled());
+    assertFalse(qmc.isScanBasedKillingLogOnly());
+  }
+
+  @Test
+  public void testInvalidModeInDynamicUpdateFallsBackToDisabled() {
+    // Start in enforce mode
+    PinotConfiguration config = new PinotConfiguration();
+    config.setProperty(Accounting.CONFIG_OF_SCAN_BASED_KILLING_MODE, 
"enforce");
+    
config.setProperty(Accounting.CONFIG_OF_SCAN_BASED_KILLING_MAX_ENTRIES_SCANNED_IN_FILTER,
 "100000000");
+    QueryMonitorConfig oldConfig = new QueryMonitorConfig(config, MAX_HEAP);
+    assertTrue(oldConfig.isScanBasedKillingEnabled());
+
+    // Dynamic update with invalid mode
+    Set<String> changedConfigs = new HashSet<>();
+    changedConfigs.add(Accounting.CONFIG_OF_SCAN_BASED_KILLING_MODE);
+
+    Map<String, String> clusterConfigs = new HashMap<>();
+    clusterConfigs.put(Accounting.CONFIG_OF_SCAN_BASED_KILLING_MODE, 
"typoEnforce");
+
+    QueryMonitorConfig newConfig = new QueryMonitorConfig(oldConfig, 
changedConfigs, clusterConfigs);
+    assertEquals(newConfig.getScanBasedKillingMode(), 
ScanKillingMode.DISABLED);
+    assertFalse(newConfig.isScanBasedKillingEnabled());
+  }
+
+  @Test
+  public void testCaseInsensitiveMode() {
+    // "Enforce" (capital E) should parse correctly — mode values are 
case-insensitive
+    PinotConfiguration config = new PinotConfiguration();
+    config.setProperty(Accounting.CONFIG_OF_SCAN_BASED_KILLING_MODE, 
"Enforce");
+
+    QueryMonitorConfig qmc = new QueryMonitorConfig(config, MAX_HEAP);
+    assertEquals(qmc.getScanBasedKillingMode(), ScanKillingMode.ENFORCE);
+    assertTrue(qmc.isScanBasedKillingEnabled());
+  }
+
+  @Test
+  public void testInvalidThresholdValuesFallBackToDefaults() {
+    // Start with valid config
+    PinotConfiguration config = new PinotConfiguration();
+    config.setProperty(Accounting.CONFIG_OF_SCAN_BASED_KILLING_MODE, 
"enforce");
+    
config.setProperty(Accounting.CONFIG_OF_SCAN_BASED_KILLING_MAX_ENTRIES_SCANNED_IN_FILTER,
 "100000000");
+    
config.setProperty(Accounting.CONFIG_OF_SCAN_BASED_KILLING_MAX_DOCS_SCANNED, 
"50000000");
+    QueryMonitorConfig oldConfig = new QueryMonitorConfig(config, MAX_HEAP);
+
+    // Dynamic update with invalid values (empty string, non-numeric)
+    Set<String> changedConfigs = new HashSet<>();
+    
changedConfigs.add(Accounting.CONFIG_OF_SCAN_BASED_KILLING_MAX_ENTRIES_SCANNED_IN_FILTER);
+    
changedConfigs.add(Accounting.CONFIG_OF_SCAN_BASED_KILLING_MAX_DOCS_SCANNED);
+    
changedConfigs.add(Accounting.CONFIG_OF_SCAN_BASED_KILLING_MAX_ENTRIES_SCANNED_POST_FILTER);
+
+    Map<String, String> clusterConfigs = new HashMap<>();
+    
clusterConfigs.put(Accounting.CONFIG_OF_SCAN_BASED_KILLING_MAX_ENTRIES_SCANNED_IN_FILTER,
 "");
+    
clusterConfigs.put(Accounting.CONFIG_OF_SCAN_BASED_KILLING_MAX_DOCS_SCANNED, 
"notANumber");
+    
clusterConfigs.put(Accounting.CONFIG_OF_SCAN_BASED_KILLING_MAX_ENTRIES_SCANNED_POST_FILTER,
 "  ");
+
+    QueryMonitorConfig newConfig = new QueryMonitorConfig(oldConfig, 
changedConfigs, clusterConfigs);
+
+    // All should fall back to defaults instead of throwing
+    assertEquals(newConfig.getScanBasedKillingMaxEntriesScannedInFilter(), 
Long.MAX_VALUE);
+    assertEquals(newConfig.getScanBasedKillingMaxDocsScanned(), 
Long.MAX_VALUE);
+    assertEquals(newConfig.getScanBasedKillingMaxEntriesScannedPostFilter(), 
Long.MAX_VALUE);
+  }
+}
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/query/killing/CompositeQueryKillingStrategyTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/query/killing/CompositeQueryKillingStrategyTest.java
new file mode 100644
index 00000000000..ea48ff49859
--- /dev/null
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/query/killing/CompositeQueryKillingStrategyTest.java
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.killing;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import 
org.apache.pinot.core.query.killing.strategy.ScanEntriesThresholdStrategy;
+import org.apache.pinot.spi.query.QueryScanCostContext;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+
+
+public class CompositeQueryKillingStrategyTest {
+
+  @Test
+  public void testAnyModeKillsWhenOneTriggered() {
+    List<QueryKillingStrategy> strategies = Arrays.asList(
+        new ScanEntriesThresholdStrategy(100L, Long.MAX_VALUE),
+        new ScanEntriesThresholdStrategy(Long.MAX_VALUE, 200L));
+    CompositeQueryKillingStrategy composite =
+        new CompositeQueryKillingStrategy(strategies, 
CompositeQueryKillingStrategy.Mode.ANY);
+
+    QueryScanCostContext ctx = new QueryScanCostContext();
+    ctx.addEntriesScannedInFilter(101);
+    ctx.addDocsScanned(50);
+    assertTrue(composite.shouldTerminate(ctx));
+  }
+
+  @Test
+  public void testAnyModeNoKillWhenNoneTriggered() {
+    List<QueryKillingStrategy> strategies = Arrays.asList(
+        new ScanEntriesThresholdStrategy(100L, Long.MAX_VALUE),
+        new ScanEntriesThresholdStrategy(Long.MAX_VALUE, 200L));
+    CompositeQueryKillingStrategy composite =
+        new CompositeQueryKillingStrategy(strategies, 
CompositeQueryKillingStrategy.Mode.ANY);
+
+    QueryScanCostContext ctx = new QueryScanCostContext();
+    ctx.addEntriesScannedInFilter(50);
+    ctx.addDocsScanned(100);
+    assertFalse(composite.shouldTerminate(ctx));
+  }
+
+  @Test
+  public void testAllModeNoKillWhenOnlyOneTriggered() {
+    List<QueryKillingStrategy> strategies = Arrays.asList(
+        new ScanEntriesThresholdStrategy(100L, Long.MAX_VALUE),
+        new ScanEntriesThresholdStrategy(Long.MAX_VALUE, 200L));
+    CompositeQueryKillingStrategy composite =
+        new CompositeQueryKillingStrategy(strategies, 
CompositeQueryKillingStrategy.Mode.ALL);
+
+    QueryScanCostContext ctx = new QueryScanCostContext();
+    ctx.addEntriesScannedInFilter(101);
+    ctx.addDocsScanned(50);
+    assertFalse(composite.shouldTerminate(ctx));
+  }
+
+  @Test
+  public void testAllModeKillsWhenAllTriggered() {
+    List<QueryKillingStrategy> strategies = Arrays.asList(
+        new ScanEntriesThresholdStrategy(100L, Long.MAX_VALUE),
+        new ScanEntriesThresholdStrategy(Long.MAX_VALUE, 200L));
+    CompositeQueryKillingStrategy composite =
+        new CompositeQueryKillingStrategy(strategies, 
CompositeQueryKillingStrategy.Mode.ALL);
+
+    QueryScanCostContext ctx = new QueryScanCostContext();
+    ctx.addEntriesScannedInFilter(101);
+    ctx.addDocsScanned(201);
+    assertTrue(composite.shouldTerminate(ctx));
+  }
+
+  @Test
+  public void testEmptyStrategiesNeverKills() {
+    CompositeQueryKillingStrategy composite =
+        new CompositeQueryKillingStrategy(Collections.emptyList(), 
CompositeQueryKillingStrategy.Mode.ANY);
+    QueryScanCostContext ctx = new QueryScanCostContext();
+    ctx.addEntriesScannedInFilter(Long.MAX_VALUE - 1);
+    assertFalse(composite.shouldTerminate(ctx));
+  }
+
+  @Test
+  public void testEmptyStrategiesAllModeNeverKills() {
+    CompositeQueryKillingStrategy composite =
+        new CompositeQueryKillingStrategy(Collections.emptyList(), 
CompositeQueryKillingStrategy.Mode.ALL);
+    QueryScanCostContext ctx = new QueryScanCostContext();
+    assertFalse(composite.shouldTerminate(ctx));
+  }
+
+  @Test
+  public void testBuildKillReportDelegatesToTriggeringStrategy() {
+    List<QueryKillingStrategy> strategies = Arrays.asList(
+        new ScanEntriesThresholdStrategy(1000L, Long.MAX_VALUE),
+        new ScanEntriesThresholdStrategy(Long.MAX_VALUE, 500L));
+    CompositeQueryKillingStrategy composite =
+        new CompositeQueryKillingStrategy(strategies, 
CompositeQueryKillingStrategy.Mode.ANY);
+
+    QueryScanCostContext ctx = new QueryScanCostContext();
+    ctx.addDocsScanned(501);
+    assertTrue(composite.shouldTerminate(ctx));
+
+    QueryKillReport report = composite.buildKillReport(ctx, "q1", "t1", 
"cluster");
+    assertEquals(report.getTriggeringMetric(), "numDocsScanned");
+    assertEquals(report.getActualValue(), 501L);
+  }
+
+  @Test
+  public void testStrategiesSortedByPriority() {
+    // ScanEntriesThresholdStrategy has priority 10
+    // Both trigger, but the first in priority order should produce the report
+    ScanEntriesThresholdStrategy entriesStrategy = new 
ScanEntriesThresholdStrategy(100L, Long.MAX_VALUE);
+    ScanEntriesThresholdStrategy docsStrategy = new 
ScanEntriesThresholdStrategy(Long.MAX_VALUE, 50L);
+
+    CompositeQueryKillingStrategy composite =
+        new CompositeQueryKillingStrategy(Arrays.asList(docsStrategy, 
entriesStrategy),
+            CompositeQueryKillingStrategy.Mode.ANY);
+
+    QueryScanCostContext ctx = new QueryScanCostContext();
+    ctx.addEntriesScannedInFilter(101);
+    ctx.addDocsScanned(51);
+
+    // Both trigger, but result should come from the first that matches in 
sorted order
+    assertTrue(composite.shouldTerminate(ctx));
+  }
+}
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/query/killing/QueryKillReportTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/query/killing/QueryKillReportTest.java
new file mode 100644
index 00000000000..054197aeaed
--- /dev/null
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/query/killing/QueryKillReportTest.java
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.killing;
+
+import org.apache.pinot.spi.query.QueryScanCostContext;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+
+/**
+ * Unit tests for {@link QueryKillReport}.
+ */
+public class QueryKillReportTest {
+
+  @Test
+  public void testSnapshotsValuesAtCreationTime() {
+    QueryScanCostContext context = new QueryScanCostContext();
+    context.addEntriesScannedInFilter(1000L);
+    context.addDocsScanned(500L);
+    context.addEntriesScannedPostFilter(200L);
+
+    QueryKillReport report = new QueryKillReport(
+        "queryId-1",
+        "myTable_OFFLINE",
+        "EntriesScannedInFilterStrategy",
+        "numEntriesScannedInFilter",
+        1000L,
+        800L,
+        "TABLE_CONFIG",
+        context
+    );
+
+    // Mutate context after report creation — report must be unaffected
+    context.addEntriesScannedInFilter(99999L);
+    context.addDocsScanned(99999L);
+    context.addEntriesScannedPostFilter(99999L);
+
+    assertEquals(report.getSnapshotEntriesScannedInFilter(), 1000L);
+    assertEquals(report.getSnapshotDocsScanned(), 500L);
+    assertEquals(report.getSnapshotEntriesScannedPostFilter(), 200L);
+  }
+
+  @Test
+  public void testCustomerMessageContainsAllFields() {
+    QueryScanCostContext context = new QueryScanCostContext();
+    context.addEntriesScannedInFilter(1234567L);
+    context.addDocsScanned(100L);
+    context.addEntriesScannedPostFilter(50L);
+
+    QueryKillReport report = new QueryKillReport(
+        "queryId-abc",
+        "salesTable_REALTIME",
+        "EntriesScannedInFilterStrategy",
+        "numEntriesScannedInFilter",
+        1234567L,
+        1000000L,
+        "CLUSTER_CONFIG",
+        context
+    );
+
+    String msg = report.toCustomerMessage();
+
+    assertTrue(msg.contains("salesTable_REALTIME"), "Should contain table 
name");
+    assertTrue(msg.contains("numEntriesScannedInFilter"), "Should contain 
metric name");
+    assertTrue(msg.contains("1,234,567"), "Should contain actual value with 
commas");
+    assertTrue(msg.contains("1,000,000"), "Should contain threshold with 
commas");
+    assertTrue(msg.contains("CLUSTER_CONFIG"), "Should contain config source");
+    assertTrue(msg.contains("missing index") || msg.contains("index"),
+        "Should include advice about missing indexes");
+  }
+
+  @Test
+  public void testInternalLogMessageIsStructured() {
+    QueryScanCostContext context = new QueryScanCostContext();
+    context.addEntriesScannedInFilter(500L);
+    context.addDocsScanned(300L);
+    context.addEntriesScannedPostFilter(150L);
+
+    QueryKillReport report = new QueryKillReport(
+        "queryId-xyz",
+        "ordersTable_OFFLINE",
+        "DocsScannedStrategy",
+        "numDocsScanned",
+        500L,
+        400L,
+        "TABLE_CONFIG",
+        context
+    );
+
+    String msg = report.toInternalLogMessage();
+
+    assertTrue(msg.startsWith("QUERY_KILLED"), "Should start with QUERY_KILLED 
prefix");
+    assertTrue(msg.contains("queryId=queryId-xyz"), "Should have queryId 
key=value");
+    assertTrue(msg.contains("table=ordersTable_OFFLINE"), "Should have table 
key=value");
+    assertTrue(msg.contains("metric=numDocsScanned"), "Should have metric 
key=value");
+    // Internal log should use plain numbers, not comma-formatted
+    assertTrue(msg.contains("actual=500"), "Should have actual key=value 
without commas");
+    assertTrue(msg.contains("threshold=400"), "Should have threshold key=value 
without commas");
+  }
+
+  @Test
+  public void testGetters() {
+    QueryScanCostContext context = new QueryScanCostContext();
+    context.addEntriesScannedInFilter(100L);
+    context.addDocsScanned(50L);
+    context.addEntriesScannedPostFilter(25L);
+
+    QueryKillReport report = new QueryKillReport(
+        "queryId-getters",
+        "testTable_OFFLINE",
+        "TestStrategy",
+        "numEntriesScannedInFilter",
+        100L,
+        90L,
+        "SERVER_CONFIG",
+        context
+    );
+
+    assertEquals(report.getQueryId(), "queryId-getters");
+    assertEquals(report.getTableName(), "testTable_OFFLINE");
+    assertEquals(report.getStrategyName(), "TestStrategy");
+    assertEquals(report.getTriggeringMetric(), "numEntriesScannedInFilter");
+    assertEquals(report.getActualValue(), 100L);
+    assertEquals(report.getThresholdValue(), 90L);
+    assertEquals(report.getConfigSource(), "SERVER_CONFIG");
+    assertEquals(report.getSnapshotEntriesScannedInFilter(), 100L);
+    assertEquals(report.getSnapshotDocsScanned(), 50L);
+    assertEquals(report.getSnapshotEntriesScannedPostFilter(), 25L);
+    assertTrue(report.getElapsedTimeMs() >= 0L, "Elapsed time must be 
non-negative");
+  }
+}
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/query/killing/QueryKillingManagerTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/query/killing/QueryKillingManagerTest.java
new file mode 100644
index 00000000000..5bde697b5a3
--- /dev/null
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/query/killing/QueryKillingManagerTest.java
@@ -0,0 +1,331 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.killing;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.core.accounting.QueryMonitorConfig;
+import 
org.apache.pinot.core.query.killing.strategy.ScanEntriesThresholdStrategy;
+import org.apache.pinot.spi.config.table.QueryConfig;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.exception.QueryErrorCode;
+import org.apache.pinot.spi.query.QueryExecutionContext;
+import org.apache.pinot.spi.query.QueryScanCostContext;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.mock;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
+
+
+/**
+ * Unit tests for {@link QueryKillingManager}.
+ */
+public class QueryKillingManagerTest {
+
+  private ServerMetrics _serverMetrics;
+
+  @BeforeMethod
+  public void setUp() {
+    _serverMetrics = mock(ServerMetrics.class);
+  }
+
+  private QueryMonitorConfig buildConfig(String mode, long maxEntriesInFilter,
+      long maxDocsScanned) {
+    Map<String, Object> props = new HashMap<>();
+    props.put(CommonConstants.Accounting.CONFIG_OF_SCAN_BASED_KILLING_MODE, 
mode);
+    
props.put(CommonConstants.Accounting.CONFIG_OF_SCAN_BASED_KILLING_MAX_ENTRIES_SCANNED_IN_FILTER,
+        maxEntriesInFilter);
+    
props.put(CommonConstants.Accounting.CONFIG_OF_SCAN_BASED_KILLING_MAX_DOCS_SCANNED,
 maxDocsScanned);
+    
props.put(CommonConstants.Accounting.CONFIG_OF_SCAN_BASED_KILLING_MAX_ENTRIES_SCANNED_POST_FILTER,
 Long.MAX_VALUE);
+    PinotConfiguration pinotConfig = new PinotConfiguration(props);
+    return new QueryMonitorConfig(pinotConfig, 
Runtime.getRuntime().maxMemory());
+  }
+
+  // --- Strategy built from config (init-time validation) ---
+
+  @Test
+  public void testInitBuildsStrategyFromConfig() {
+    QueryMonitorConfig config = buildConfig("enforce", 100L, Long.MAX_VALUE);
+    AtomicReference<QueryMonitorConfig> configRef = new 
AtomicReference<>(config);
+    QueryKillingManager manager = new QueryKillingManager(configRef, 
_serverMetrics);
+    manager.rebuildStrategy();
+
+    assertNotNull(manager.getActiveStrategy(), "Strategy should be built when 
thresholds are configured");
+    assertTrue(manager.getActiveStrategy() instanceof 
ScanEntriesThresholdStrategy);
+  }
+
+  @Test
+  public void testInitWithNoThresholdsLogsWarningAndReturnsNullStrategy() {
+    // All thresholds are MAX_VALUE — factory should return null
+    QueryMonitorConfig config = buildConfig("enforce", Long.MAX_VALUE, 
Long.MAX_VALUE);
+    AtomicReference<QueryMonitorConfig> configRef = new 
AtomicReference<>(config);
+    QueryKillingManager manager = new QueryKillingManager(configRef, 
_serverMetrics);
+    manager.rebuildStrategy();
+
+    assertNull(manager.getActiveStrategy(),
+        "Strategy should be null when no thresholds are configured");
+  }
+
+  @Test
+  public void testInitWithDisabledReturnsNullStrategy() {
+    QueryMonitorConfig config = buildConfig("disabled", 100L, 100L);
+    AtomicReference<QueryMonitorConfig> configRef = new 
AtomicReference<>(config);
+    QueryKillingManager manager = new QueryKillingManager(configRef, 
_serverMetrics);
+    manager.rebuildStrategy();
+
+    assertNull(manager.getActiveStrategy(),
+        "Strategy should be null when killing is disabled");
+  }
+
+  // --- Default strategy (ScanEntriesThresholdStrategy from config) ---
+
+  @Test
+  public void testDisabledDoesNotKill() {
+    QueryMonitorConfig config = buildConfig("disabled", 100L, 100L);
+    AtomicReference<QueryMonitorConfig> configRef = new 
AtomicReference<>(config);
+    QueryKillingManager manager = new QueryKillingManager(configRef, 
_serverMetrics);
+    manager.rebuildStrategy();
+
+    QueryExecutionContext execCtx = QueryExecutionContext.forSseTest();
+    QueryScanCostContext scanCtx = new QueryScanCostContext();
+    scanCtx.addEntriesScannedInFilter(500L);
+
+    manager.checkAndKillIfNeeded(execCtx, scanCtx, "q1", "testTable_OFFLINE", 
null);
+    assertNull(execCtx.getTerminateException());
+  }
+
+  @Test
+  public void testEnabledKillsWhenThresholdExceeded() {
+    QueryMonitorConfig config = buildConfig("enforce", 100L, Long.MAX_VALUE);
+    AtomicReference<QueryMonitorConfig> configRef = new 
AtomicReference<>(config);
+    QueryKillingManager manager = new QueryKillingManager(configRef, 
_serverMetrics);
+    manager.rebuildStrategy();
+
+    QueryExecutionContext execCtx = QueryExecutionContext.forSseTest();
+    QueryScanCostContext scanCtx = new QueryScanCostContext();
+    scanCtx.addEntriesScannedInFilter(200L);
+
+    manager.checkAndKillIfNeeded(execCtx, scanCtx, "q2", "testTable_OFFLINE", 
null);
+    assertNotNull(execCtx.getTerminateException());
+    assertEquals(execCtx.getTerminateException().getErrorCode(), 
QueryErrorCode.QUERY_SCAN_LIMIT_EXCEEDED);
+  }
+
+  @Test
+  public void testLogOnlyDoesNotKill() {
+    QueryMonitorConfig config = buildConfig("logOnly", 100L, Long.MAX_VALUE);
+    AtomicReference<QueryMonitorConfig> configRef = new 
AtomicReference<>(config);
+    QueryKillingManager manager = new QueryKillingManager(configRef, 
_serverMetrics);
+    manager.rebuildStrategy();
+
+    QueryExecutionContext execCtx = QueryExecutionContext.forSseTest();
+    QueryScanCostContext scanCtx = new QueryScanCostContext();
+    scanCtx.addEntriesScannedInFilter(200L);
+
+    manager.checkAndKillIfNeeded(execCtx, scanCtx, "q3", "testTable_OFFLINE", 
null);
+    assertNull(execCtx.getTerminateException());
+  }
+
+  @Test
+  public void testBelowThresholdDoesNotKill() {
+    QueryMonitorConfig config = buildConfig("enforce", 1000L, Long.MAX_VALUE);
+    AtomicReference<QueryMonitorConfig> configRef = new 
AtomicReference<>(config);
+    QueryKillingManager manager = new QueryKillingManager(configRef, 
_serverMetrics);
+    manager.rebuildStrategy();
+
+    QueryExecutionContext execCtx = QueryExecutionContext.forSseTest();
+    QueryScanCostContext scanCtx = new QueryScanCostContext();
+    scanCtx.addEntriesScannedInFilter(500L);
+
+    manager.checkAndKillIfNeeded(execCtx, scanCtx, "q4", "testTable_OFFLINE", 
null);
+    assertNull(execCtx.getTerminateException());
+  }
+
+  @Test
+  public void testAlreadyTerminatedSkipsEvaluation() {
+    QueryMonitorConfig config = buildConfig("enforce", 100L, Long.MAX_VALUE);
+    AtomicReference<QueryMonitorConfig> configRef = new 
AtomicReference<>(config);
+    QueryKillingManager manager = new QueryKillingManager(configRef, 
_serverMetrics);
+    manager.rebuildStrategy();
+
+    QueryExecutionContext execCtx = QueryExecutionContext.forSseTest();
+    execCtx.terminate(QueryErrorCode.QUERY_CANCELLATION, "cancelled");
+
+    QueryScanCostContext scanCtx = new QueryScanCostContext();
+    scanCtx.addEntriesScannedInFilter(200L);
+
+    manager.checkAndKillIfNeeded(execCtx, scanCtx, "q5", "testTable_OFFLINE", 
null);
+    assertNotNull(execCtx.getTerminateException());
+  }
+
+  @Test
+  public void testDocsScannedThreshold() {
+    QueryMonitorConfig config = buildConfig("enforce", Long.MAX_VALUE, 100L);
+    AtomicReference<QueryMonitorConfig> configRef = new 
AtomicReference<>(config);
+    QueryKillingManager manager = new QueryKillingManager(configRef, 
_serverMetrics);
+    manager.rebuildStrategy();
+
+    QueryExecutionContext execCtx = QueryExecutionContext.forSseTest();
+    QueryScanCostContext scanCtx = new QueryScanCostContext();
+    scanCtx.addDocsScanned(200L);
+
+    manager.checkAndKillIfNeeded(execCtx, scanCtx, "q8", "testTable_OFFLINE", 
null);
+    assertNotNull(execCtx.getTerminateException());
+  }
+
+  // --- Table overrides via forQuery() ---
+
+  @Test
+  public void testTableOverrideRaisesThreshold() {
+    QueryMonitorConfig config = buildConfig("enforce", 100L, Long.MAX_VALUE);
+    AtomicReference<QueryMonitorConfig> configRef = new 
AtomicReference<>(config);
+    QueryKillingManager manager = new QueryKillingManager(configRef, 
_serverMetrics);
+    manager.rebuildStrategy();
+
+    QueryExecutionContext execCtx = QueryExecutionContext.forSseTest();
+    QueryScanCostContext scanCtx = new QueryScanCostContext();
+    scanCtx.addEntriesScannedInFilter(200L); // Above cluster (100), below 
table (500)
+
+    QueryConfig queryConfig = new QueryConfig(null, null, null, null, null, 
null, 500L, null, null);
+    manager.checkAndKillIfNeeded(execCtx, scanCtx, "q6", "testTable_OFFLINE", 
queryConfig);
+    assertNull(execCtx.getTerminateException(),
+        "Table override should raise threshold, preventing kill");
+  }
+
+  @Test
+  public void testTableOverrideLowersThreshold() {
+    QueryMonitorConfig config = buildConfig("enforce", 1000L, Long.MAX_VALUE);
+    AtomicReference<QueryMonitorConfig> configRef = new 
AtomicReference<>(config);
+    QueryKillingManager manager = new QueryKillingManager(configRef, 
_serverMetrics);
+    manager.rebuildStrategy();
+
+    QueryExecutionContext execCtx = QueryExecutionContext.forSseTest();
+    QueryScanCostContext scanCtx = new QueryScanCostContext();
+    scanCtx.addEntriesScannedInFilter(100L); // Below cluster (1000), above 
table (50)
+
+    QueryConfig queryConfig = new QueryConfig(null, null, null, null, null, 
null, 50L, null, null);
+    manager.checkAndKillIfNeeded(execCtx, scanCtx, "q7", "testTable_OFFLINE", 
queryConfig);
+    assertNotNull(execCtx.getTerminateException(),
+        "Table override should lower threshold, causing kill");
+  }
+
+  // --- Custom strategy factory pluggability ---
+
+  @Test
+  public void testCustomFactoryClassFromConfig() {
+    // Configure a custom factory class name
+    Map<String, Object> props = new HashMap<>();
+    props.put(CommonConstants.Accounting.CONFIG_OF_SCAN_BASED_KILLING_MODE,
+        "enforce");
+    
props.put(CommonConstants.Accounting.CONFIG_OF_SCAN_BASED_KILLING_STRATEGY_FACTORY_CLASS_NAME,
+        AlwaysKillStrategyFactory.class.getName());
+    PinotConfiguration pinotConfig = new PinotConfiguration(props);
+    QueryMonitorConfig config = new QueryMonitorConfig(pinotConfig, 
Runtime.getRuntime().maxMemory());
+
+    AtomicReference<QueryMonitorConfig> configRef = new 
AtomicReference<>(config);
+    QueryKillingManager manager = new QueryKillingManager(configRef, 
_serverMetrics);
+    manager.rebuildStrategy();
+
+    assertNotNull(manager.getActiveStrategy(), "Custom factory should create a 
strategy");
+
+    // Should kill even with zero scan entries (AlwaysKillStrategy always 
kills)
+    QueryExecutionContext execCtx = QueryExecutionContext.forSseTest();
+    QueryScanCostContext scanCtx = new QueryScanCostContext();
+    manager.checkAndKillIfNeeded(execCtx, scanCtx, "q10", "testTable_OFFLINE", 
null);
+    assertNotNull(execCtx.getTerminateException(),
+        "Custom AlwaysKillStrategy should kill regardless of scan counts");
+  }
+
+  @Test
+  public void testInvalidFactoryClassFallsBackGracefully() {
+    Map<String, Object> props = new HashMap<>();
+    props.put(CommonConstants.Accounting.CONFIG_OF_SCAN_BASED_KILLING_MODE,
+        "enforce");
+    
props.put(CommonConstants.Accounting.CONFIG_OF_SCAN_BASED_KILLING_STRATEGY_FACTORY_CLASS_NAME,
+        "com.nonexistent.FakeFactory");
+    
props.put(CommonConstants.Accounting.CONFIG_OF_SCAN_BASED_KILLING_MAX_ENTRIES_SCANNED_IN_FILTER,
 100L);
+    PinotConfiguration pinotConfig = new PinotConfiguration(props);
+    QueryMonitorConfig config = new QueryMonitorConfig(pinotConfig, 
Runtime.getRuntime().maxMemory());
+
+    AtomicReference<QueryMonitorConfig> configRef = new 
AtomicReference<>(config);
+    QueryKillingManager manager = new QueryKillingManager(configRef, 
_serverMetrics);
+    manager.rebuildStrategy();
+
+    // Should not crash — falls back to default factory
+    assertNotNull(manager.getActiveStrategy(),
+        "Invalid factory should fall back to default 
ScanEntriesThresholdStrategy");
+    assertTrue(manager.getActiveStrategy() instanceof 
ScanEntriesThresholdStrategy);
+  }
+
+  @Test
+  public void testRebuildStrategyPicksUpConfigChanges() {
+    // Start with no thresholds
+    QueryMonitorConfig config1 = buildConfig("enforce", Long.MAX_VALUE, 
Long.MAX_VALUE);
+    AtomicReference<QueryMonitorConfig> configRef = new 
AtomicReference<>(config1);
+    QueryKillingManager manager = new QueryKillingManager(configRef, 
_serverMetrics);
+    manager.rebuildStrategy();
+    assertNull(manager.getActiveStrategy(), "No thresholds = no strategy");
+
+    // Update config with thresholds
+    QueryMonitorConfig config2 = buildConfig("enforce", 100L, Long.MAX_VALUE);
+    configRef.set(config2);
+    manager.rebuildStrategy();
+    assertNotNull(manager.getActiveStrategy(), "After config update, strategy 
should be built");
+  }
+
+  // --- Test fixtures for pluggable strategy ---
+
+  /**
+   * A test strategy that always kills — used to verify custom factory loading.
+   */
+  public static class AlwaysKillStrategy implements QueryKillingStrategy {
+    @Override
+    public boolean shouldTerminate(QueryScanCostContext context) {
+      return true;
+    }
+
+    @Override
+    public QueryKillReport buildKillReport(QueryScanCostContext context,
+        String queryId, String tableName, String configSource) {
+      return new QueryKillReport(queryId, tableName, "AlwaysKillStrategy",
+          "always", 0, 0, configSource, context);
+    }
+  }
+
+  /**
+   * A test factory that creates an AlwaysKillStrategy — loaded by class name 
via config.
+   */
+  public static class AlwaysKillStrategyFactory implements 
QueryKillingStrategyFactory {
+    @Override
+    public QueryKillingStrategy create(QueryMonitorConfig config) {
+      return new AlwaysKillStrategy();
+    }
+
+    @Override
+    public String getName() {
+      return "AlwaysKillStrategyFactory";
+    }
+  }
+}
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/query/killing/strategy/ScanEntriesThresholdStrategyTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/query/killing/strategy/ScanEntriesThresholdStrategyTest.java
new file mode 100644
index 00000000000..48785498162
--- /dev/null
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/query/killing/strategy/ScanEntriesThresholdStrategyTest.java
@@ -0,0 +1,183 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.killing.strategy;
+
+import org.apache.pinot.core.query.killing.QueryKillReport;
+import org.apache.pinot.core.query.killing.QueryKillingStrategy;
+import org.apache.pinot.spi.config.table.QueryConfig;
+import org.apache.pinot.spi.query.QueryScanCostContext;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
+
+
+public class ScanEntriesThresholdStrategyTest {
+
+  @Test
+  public void testBelowThresholdDoesNotKill() {
+    ScanEntriesThresholdStrategy strategy = new 
ScanEntriesThresholdStrategy(100_000_000L, 10_000_000L);
+    QueryScanCostContext ctx = new QueryScanCostContext();
+    ctx.addEntriesScannedInFilter(50_000_000);
+    ctx.addDocsScanned(5_000_000);
+    assertFalse(strategy.shouldTerminate(ctx));
+  }
+
+  @Test
+  public void testAtThresholdDoesNotKill() {
+    ScanEntriesThresholdStrategy strategy = new 
ScanEntriesThresholdStrategy(100_000_000L, 10_000_000L);
+    QueryScanCostContext ctx = new QueryScanCostContext();
+    ctx.addEntriesScannedInFilter(100_000_000);
+    assertFalse(strategy.shouldTerminate(ctx));
+  }
+
+  @Test
+  public void testAboveEntriesScannedThresholdKills() {
+    ScanEntriesThresholdStrategy strategy = new 
ScanEntriesThresholdStrategy(100_000_000L, Long.MAX_VALUE);
+    QueryScanCostContext ctx = new QueryScanCostContext();
+    ctx.addEntriesScannedInFilter(100_000_001);
+    assertTrue(strategy.shouldTerminate(ctx));
+  }
+
+  @Test
+  public void testAboveDocsScannedThresholdKills() {
+    ScanEntriesThresholdStrategy strategy = new 
ScanEntriesThresholdStrategy(Long.MAX_VALUE, 10_000_000L);
+    QueryScanCostContext ctx = new QueryScanCostContext();
+    ctx.addDocsScanned(10_000_001);
+    assertTrue(strategy.shouldTerminate(ctx));
+  }
+
+  @Test
+  public void testMaxValueThresholdEffectivelyDisables() {
+    ScanEntriesThresholdStrategy strategy = new 
ScanEntriesThresholdStrategy(Long.MAX_VALUE, Long.MAX_VALUE);
+    QueryScanCostContext ctx = new QueryScanCostContext();
+    ctx.addEntriesScannedInFilter(999_999_999_999L);
+    ctx.addDocsScanned(999_999_999_999L);
+    assertFalse(strategy.shouldTerminate(ctx));
+  }
+
+  @Test
+  public void testBuildKillReportForEntriesScanned() {
+    ScanEntriesThresholdStrategy strategy = new 
ScanEntriesThresholdStrategy(100_000_000L, Long.MAX_VALUE);
+    QueryScanCostContext ctx = new QueryScanCostContext();
+    ctx.addEntriesScannedInFilter(150_000_000);
+
+    QueryKillReport report = strategy.buildKillReport(ctx, "q1", "myTable", 
"cluster");
+    assertEquals(report.getTriggeringMetric(), "numEntriesScannedInFilter");
+    assertEquals(report.getActualValue(), 150_000_000L);
+    assertEquals(report.getThresholdValue(), 100_000_000L);
+  }
+
+  @Test
+  public void testBuildKillReportForDocsScanned() {
+    ScanEntriesThresholdStrategy strategy = new 
ScanEntriesThresholdStrategy(Long.MAX_VALUE, 10_000_000L);
+    QueryScanCostContext ctx = new QueryScanCostContext();
+    ctx.addDocsScanned(15_000_000);
+
+    QueryKillReport report = strategy.buildKillReport(ctx, "q2", "myTable", 
"table:myTable");
+    assertEquals(report.getTriggeringMetric(), "numDocsScanned");
+    assertEquals(report.getActualValue(), 15_000_000L);
+    assertEquals(report.getThresholdValue(), 10_000_000L);
+  }
+
+  @Test
+  public void testPriority() {
+    ScanEntriesThresholdStrategy strategy = new 
ScanEntriesThresholdStrategy(100L, 100L);
+    assertEquals(strategy.priority(), 10);
+  }
+
+  @Test
+  public void testEitherMetricCanTrigger() {
+    ScanEntriesThresholdStrategy strategy = new 
ScanEntriesThresholdStrategy(100L, 200L);
+
+    // Only entries exceeds
+    QueryScanCostContext ctx1 = new QueryScanCostContext();
+    ctx1.addEntriesScannedInFilter(101);
+    ctx1.addDocsScanned(50);
+    assertTrue(strategy.shouldTerminate(ctx1));
+
+    // Only docs exceeds
+    QueryScanCostContext ctx2 = new QueryScanCostContext();
+    ctx2.addEntriesScannedInFilter(50);
+    ctx2.addDocsScanned(201);
+    assertTrue(strategy.shouldTerminate(ctx2));
+  }
+
+  // --- forQuery() table override tests ---
+
+  @Test
+  public void testForQueryReturnsThisWhenNoOverrides() {
+    ScanEntriesThresholdStrategy strategy = new 
ScanEntriesThresholdStrategy(100L, 200L);
+    QueryKillingStrategy result = strategy.forQuery(null, null);
+    assertTrue(result == strategy, "forQuery(null) should return same 
instance");
+
+    QueryConfig emptyConfig = new QueryConfig(null, null, null, null, null, 
null, null, null, null);
+    result = strategy.forQuery(emptyConfig, null);
+    assertTrue(result == strategy, "forQuery with no scan overrides should 
return same instance");
+  }
+
+  @Test
+  public void testForQueryAppliesTableOverride() {
+    ScanEntriesThresholdStrategy strategy = new 
ScanEntriesThresholdStrategy(100L, 200L);
+    QueryConfig queryConfig = new QueryConfig(null, null, null, null, null, 
null, 500L, null, null);
+
+    QueryKillingStrategy result = strategy.forQuery(queryConfig, null);
+    assertTrue(result != strategy, "forQuery with override should return new 
instance");
+    assertTrue(result instanceof ScanEntriesThresholdStrategy);
+
+    ScanEntriesThresholdStrategy overridden = (ScanEntriesThresholdStrategy) 
result;
+    assertEquals(overridden.getMaxEntriesScannedInFilter(), 500L);
+    assertEquals(overridden.getMaxDocsScanned(), 200L); // inherited from 
original
+  }
+
+  // --- Factory tests ---
+
+  @Test
+  public void testFactoryCreatesStrategyWithThresholds() {
+    java.util.Map<String, Object> props = new java.util.HashMap<>();
+    props.put("accounting.scan.based.killing.enabled", true);
+    props.put("accounting.scan.based.killing.max.entries.scanned.in.filter", 
500_000_000L);
+    props.put("accounting.scan.based.killing.max.docs.scanned", 50_000_000L);
+    org.apache.pinot.spi.env.PinotConfiguration pinotConfig = new 
org.apache.pinot.spi.env.PinotConfiguration(props);
+    org.apache.pinot.core.accounting.QueryMonitorConfig config =
+        new org.apache.pinot.core.accounting.QueryMonitorConfig(pinotConfig, 
1_000_000_000L);
+
+    ScanEntriesThresholdStrategy.Factory factory = new 
ScanEntriesThresholdStrategy.Factory();
+    QueryKillingStrategy strategy = factory.create(config);
+    assertNotNull(strategy);
+    assertTrue(strategy instanceof ScanEntriesThresholdStrategy);
+  }
+
+  @Test
+  public void testFactoryReturnsNullWhenNoThresholds() {
+    java.util.Map<String, Object> props = new java.util.HashMap<>();
+    props.put("accounting.scan.based.killing.enabled", true);
+    // No thresholds set — defaults are Long.MAX_VALUE
+    org.apache.pinot.spi.env.PinotConfiguration pinotConfig = new 
org.apache.pinot.spi.env.PinotConfiguration(props);
+    org.apache.pinot.core.accounting.QueryMonitorConfig config =
+        new org.apache.pinot.core.accounting.QueryMonitorConfig(pinotConfig, 
1_000_000_000L);
+
+    ScanEntriesThresholdStrategy.Factory factory = new 
ScanEntriesThresholdStrategy.Factory();
+    QueryKillingStrategy strategy = factory.create(config);
+    assertNull(strategy, "Factory should return null when no thresholds are 
configured");
+  }
+}
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
index eda5859962a..b5312fa72e3 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
@@ -2296,7 +2296,8 @@ public class OfflineClusterIntegrationTest extends 
BaseClusterIntegrationTestSet
     // Add expression override
     TableConfig tableConfig = createOfflineTableConfig();
     tableConfig.setQueryConfig(
-        new QueryConfig(null, null, null, Map.of("DaysSinceEpoch * 24", 
"NewAddedDerivedHoursSinceEpoch"), null, null));
+        new QueryConfig(null, null, null, Map.of("DaysSinceEpoch * 24", 
"NewAddedDerivedHoursSinceEpoch"), null,
+            null));
     updateTableConfig(tableConfig);
 
     TestUtils.waitForCondition(aVoid -> {
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/QueryConfig.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/QueryConfig.java
index b2d6c28bead..093b7c340fa 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/QueryConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/QueryConfig.java
@@ -56,18 +56,41 @@ public class QueryConfig extends BaseJsonConfig {
   // Indicates the maximum length of the serialized response per server for a 
query.
   private final Long _maxServerResponseSizeBytes;
 
+  private final Long _maxEntriesScannedInFilter;
+
+  private final Long _maxDocsScanned;
+
+  private final Long _maxEntriesScannedPostFilter;
+
+
+  public QueryConfig(@Nullable Long timeoutMs, @Nullable Boolean disableGroovy,
+      @Nullable Boolean useApproximateFunction, @Nullable Map<String, String> 
expressionOverrideMap,
+      @Nullable Long maxQueryResponseSizeBytes, @Nullable Long 
maxServerResponseSizeBytes) {
+    this(timeoutMs, disableGroovy, useApproximateFunction, 
expressionOverrideMap,
+        maxQueryResponseSizeBytes, maxServerResponseSizeBytes, null, null, 
null);
+  }
+
   @JsonCreator
   public QueryConfig(@JsonProperty("timeoutMs") @Nullable Long timeoutMs,
       @JsonProperty("disableGroovy") @Nullable Boolean disableGroovy,
       @JsonProperty("useApproximateFunction") @Nullable Boolean 
useApproximateFunction,
       @JsonProperty("expressionOverrideMap") @Nullable Map<String, String> 
expressionOverrideMap,
       @JsonProperty("maxQueryResponseSizeBytes") @Nullable Long 
maxQueryResponseSizeBytes,
-      @JsonProperty("maxServerResponseSizeBytes") @Nullable Long 
maxServerResponseSizeBytes) {
+      @JsonProperty("maxServerResponseSizeBytes") @Nullable Long 
maxServerResponseSizeBytes,
+      @JsonProperty("maxEntriesScannedInFilter") @Nullable Long 
maxEntriesScannedInFilter,
+      @JsonProperty("maxDocsScanned") @Nullable Long maxDocsScanned,
+      @JsonProperty("maxEntriesScannedPostFilter") @Nullable Long 
maxEntriesScannedPostFilter) {
     Preconditions.checkArgument(timeoutMs == null || timeoutMs > 0, "Invalid 
'timeoutMs': %s", timeoutMs);
     Preconditions.checkArgument(maxQueryResponseSizeBytes == null || 
maxQueryResponseSizeBytes > 0,
         "Invalid 'maxQueryResponseSizeBytes': %s", maxQueryResponseSizeBytes);
     Preconditions.checkArgument(maxServerResponseSizeBytes == null || 
maxServerResponseSizeBytes > 0,
         "Invalid 'maxServerResponseSizeBytes': %s", 
maxServerResponseSizeBytes);
+    Preconditions.checkArgument(maxEntriesScannedInFilter == null || 
maxEntriesScannedInFilter > 0,
+        "Invalid 'maxEntriesScannedInFilter': %s", maxEntriesScannedInFilter);
+    Preconditions.checkArgument(maxDocsScanned == null || maxDocsScanned > 0,
+        "Invalid 'maxDocsScanned': %s", maxDocsScanned);
+    Preconditions.checkArgument(maxEntriesScannedPostFilter == null || 
maxEntriesScannedPostFilter > 0,
+        "Invalid 'maxEntriesScannedPostFilter': %s", 
maxEntriesScannedPostFilter);
 
     _timeoutMs = timeoutMs;
     _disableGroovy = disableGroovy;
@@ -75,6 +98,9 @@ public class QueryConfig extends BaseJsonConfig {
     _expressionOverrideMap = expressionOverrideMap;
     _maxQueryResponseSizeBytes = maxQueryResponseSizeBytes;
     _maxServerResponseSizeBytes = maxServerResponseSizeBytes;
+    _maxEntriesScannedInFilter = maxEntriesScannedInFilter;
+    _maxDocsScanned = maxDocsScanned;
+    _maxEntriesScannedPostFilter = maxEntriesScannedPostFilter;
   }
 
   @Nullable
@@ -112,4 +138,22 @@ public class QueryConfig extends BaseJsonConfig {
   public Long getMaxServerResponseSizeBytes() {
     return _maxServerResponseSizeBytes;
   }
+
+  @Nullable
+  @JsonProperty("maxEntriesScannedInFilter")
+  public Long getMaxEntriesScannedInFilter() {
+    return _maxEntriesScannedInFilter;
+  }
+
+  @Nullable
+  @JsonProperty("maxDocsScanned")
+  public Long getMaxDocsScanned() {
+    return _maxDocsScanned;
+  }
+
+  @Nullable
+  @JsonProperty("maxEntriesScannedPostFilter")
+  public Long getMaxEntriesScannedPostFilter() {
+    return _maxEntriesScannedPostFilter;
+  }
 }
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/exception/QueryErrorCode.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/exception/QueryErrorCode.java
index ea2629b7488..1786dfc977c 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/exception/QueryErrorCode.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/exception/QueryErrorCode.java
@@ -47,6 +47,7 @@ public enum QueryErrorCode {
   SERVER_SEGMENT_MISSING(235, "ServerSegmentMissing", 
Response.Status.NOT_FOUND),
   QUERY_SCHEDULING_TIMEOUT(240, "QuerySchedulingTimeoutError", 
Response.Status.REQUEST_TIMEOUT),
   SERVER_RESOURCE_LIMIT_EXCEEDED(245, "ServerResourceLimitExceededError", 
Response.Status.SERVICE_UNAVAILABLE),
+  QUERY_SCAN_LIMIT_EXCEEDED(246, "QueryScanLimitExceededError", 
Response.Status.BAD_REQUEST),
   EXECUTION_TIMEOUT(250, "ExecutionTimeoutError", 
Response.Status.REQUEST_TIMEOUT),
   BROKER_SEGMENT_UNAVAILABLE(305, "", Response.Status.SERVICE_UNAVAILABLE),
   BROKER_TIMEOUT(400, "BrokerTimeoutError", Response.Status.REQUEST_TIMEOUT),
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/query/QueryScanCostContext.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/query/QueryScanCostContext.java
new file mode 100644
index 00000000000..aa1e6b5e825
--- /dev/null
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/query/QueryScanCostContext.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.query;
+
+import java.util.concurrent.atomic.LongAdder;
+
+
+/**
+ * Lightweight, thread-safe accumulator for query scan cost metrics.
+ *
+ * <p>One instance is created per query when scan-based killing is enabled.
+ * Attached to {@link QueryExecutionContext} and shared across all segment
+ * worker threads executing that query.</p>
+ *
+ */
+public final class QueryScanCostContext {
+  private final LongAdder _numEntriesScannedInFilter = new LongAdder();
+  private final LongAdder _numDocsScanned = new LongAdder();
+  private final LongAdder _numEntriesScannedPostFilter = new LongAdder();
+  private final long _startTimeMs = System.currentTimeMillis();
+
+  /**
+   * Increment entries scanned during filter evaluation.
+   * Called from {@code DocIdSetOperator} after each block.
+   */
+  public void addEntriesScannedInFilter(long count) {
+    _numEntriesScannedInFilter.add(count);
+  }
+
+  /**
+   * Increment documents scanned (passed filter, processed by query operator).
+   * Called from {@code AggregationOperator}, {@code SelectionOnlyOperator},
+   * {@code GroupByOperator} after each value block.
+   */
+  public void addDocsScanned(long count) {
+    _numDocsScanned.add(count);
+  }
+
+  /**
+   * Increment entries scanned post-filter (docs * projected columns).
+   * Called from query operators after projection.
+   */
+  public void addEntriesScannedPostFilter(long count) {
+    _numEntriesScannedPostFilter.add(count);
+  }
+
+  public long getNumEntriesScannedInFilter() {
+    return _numEntriesScannedInFilter.sum();
+  }
+
+  public long getNumDocsScanned() {
+    return _numDocsScanned.sum();
+  }
+
+  public long getNumEntriesScannedPostFilter() {
+    return _numEntriesScannedPostFilter.sum();
+  }
+
+  /**
+   * Returns wall-clock time elapsed since this context was created.
+   */
+  public long getElapsedTimeMs() {
+    return System.currentTimeMillis() - _startTimeMs;
+  }
+}
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 30c074a422c..fbe1fe743f3 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -1874,6 +1874,57 @@ public class CommonConstants {
     @Deprecated(since = "1.6.0", forRemoval = true)
     public static final String CONFIG_OF_SECONDARY_WORKLOAD_CPU_PERCENTAGE =
         "accounting.secondary.workload.cpu.percentage";
+
+    // Scan-based query killing
+    public enum ScanKillingMode {
+      DISABLED("disabled"),
+      LOG_ONLY("logOnly"),
+      ENFORCE("enforce");
+
+      private final String _configValue;
+
+      ScanKillingMode(String configValue) {
+        _configValue = configValue;
+      }
+
+      public String getConfigValue() {
+        return _configValue;
+      }
+
+      /**
+       * Parses a config string into a {@link ScanKillingMode}. 
Case-insensitive.
+       * Returns {@code null} if the value is not recognized.
+       */
+      public static ScanKillingMode fromConfigValue(String value) {
+        if (value == null) {
+          return null;
+        }
+        for (ScanKillingMode mode : values()) {
+          if (mode._configValue.equalsIgnoreCase(value)) {
+            return mode;
+          }
+        }
+        return null;
+      }
+    }
+
+    public static final String CONFIG_OF_SCAN_BASED_KILLING_MODE = 
"accounting.scan.based.killing.mode";
+    public static final ScanKillingMode DEFAULT_SCAN_BASED_KILLING_MODE = 
ScanKillingMode.DISABLED;
+
+    public static final String 
CONFIG_OF_SCAN_BASED_KILLING_STRATEGY_FACTORY_CLASS_NAME =
+        "accounting.scan.based.killing.strategy.factory.class.name";
+
+    public static final String 
CONFIG_OF_SCAN_BASED_KILLING_MAX_ENTRIES_SCANNED_IN_FILTER =
+        "accounting.scan.based.killing.max.entries.scanned.in.filter";
+    public static final long 
DEFAULT_SCAN_BASED_KILLING_MAX_ENTRIES_SCANNED_IN_FILTER = Long.MAX_VALUE;
+
+    public static final String CONFIG_OF_SCAN_BASED_KILLING_MAX_DOCS_SCANNED =
+        "accounting.scan.based.killing.max.docs.scanned";
+    public static final long DEFAULT_SCAN_BASED_KILLING_MAX_DOCS_SCANNED = 
Long.MAX_VALUE;
+
+    public static final String 
CONFIG_OF_SCAN_BASED_KILLING_MAX_ENTRIES_SCANNED_POST_FILTER =
+        "accounting.scan.based.killing.max.entries.scanned.post.filter";
+    public static final long 
DEFAULT_SCAN_BASED_KILLING_MAX_ENTRIES_SCANNED_POST_FILTER = Long.MAX_VALUE;
   }
 
   public static class ExecutorService {
diff --git 
a/pinot-spi/src/test/java/org/apache/pinot/spi/config/table/QueryConfigScanKillingTest.java
 
b/pinot-spi/src/test/java/org/apache/pinot/spi/config/table/QueryConfigScanKillingTest.java
new file mode 100644
index 00000000000..5d6ea187d2d
--- /dev/null
+++ 
b/pinot-spi/src/test/java/org/apache/pinot/spi/config/table/QueryConfigScanKillingTest.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.config.table;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNull;
+
+
+public class QueryConfigScanKillingTest {
+
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+  @Test
+  public void testScanFieldsDefaultToNull() {
+    QueryConfig config = new QueryConfig(null, null, null, null, null, null, 
null, null, null);
+    assertNull(config.getMaxEntriesScannedInFilter());
+    assertNull(config.getMaxDocsScanned());
+    assertNull(config.getMaxEntriesScannedPostFilter());
+  }
+
+  @Test
+  public void testScanFieldsSetExplicitly() {
+    QueryConfig config = new QueryConfig(null, null, null, null, null, null,
+        500_000_000L, 50_000_000L, 100_000_000L);
+    assertEquals(config.getMaxEntriesScannedInFilter(), 
Long.valueOf(500_000_000L));
+    assertEquals(config.getMaxDocsScanned(), Long.valueOf(50_000_000L));
+    assertEquals(config.getMaxEntriesScannedPostFilter(), 
Long.valueOf(100_000_000L));
+  }
+
+  @Test
+  public void testJsonSerializationWithScanFields()
+      throws Exception {
+    QueryConfig config = new QueryConfig(30000L, null, null, null, null, null,
+        500_000_000L, 50_000_000L, null);
+
+    String json = OBJECT_MAPPER.writeValueAsString(config);
+    QueryConfig deserialized = OBJECT_MAPPER.readValue(json, 
QueryConfig.class);
+
+    assertEquals(deserialized.getTimeoutMs(), Long.valueOf(30000L));
+    assertEquals(deserialized.getMaxEntriesScannedInFilter(), 
Long.valueOf(500_000_000L));
+    assertEquals(deserialized.getMaxDocsScanned(), Long.valueOf(50_000_000L));
+    assertNull(deserialized.getMaxEntriesScannedPostFilter());
+  }
+
+  @Test
+  public void testJsonDeserializationWithoutScanFields()
+      throws Exception {
+    String json = "{\"timeoutMs\": 5000}";
+    QueryConfig config = OBJECT_MAPPER.readValue(json, QueryConfig.class);
+    assertEquals(config.getTimeoutMs(), Long.valueOf(5000L));
+    assertNull(config.getMaxEntriesScannedInFilter());
+    assertNull(config.getMaxDocsScanned());
+    assertNull(config.getMaxEntriesScannedPostFilter());
+  }
+
+  @Test
+  public void testJsonDeserializationWithOnlyScanFields()
+      throws Exception {
+    String json = "{\"maxEntriesScannedInFilter\": 1000000000, 
\"maxDocsScanned\": 100000000}";
+    QueryConfig config = OBJECT_MAPPER.readValue(json, QueryConfig.class);
+    assertNull(config.getTimeoutMs());
+    assertEquals(config.getMaxEntriesScannedInFilter(), 
Long.valueOf(1_000_000_000L));
+    assertEquals(config.getMaxDocsScanned(), Long.valueOf(100_000_000L));
+    assertNull(config.getMaxEntriesScannedPostFilter());
+  }
+
+  @Test(expectedExceptions = IllegalArgumentException.class)
+  public void testNegativeMaxEntriesScannedInFilterThrows() {
+    new QueryConfig(null, null, null, null, null, null, -1L, null, null);
+  }
+
+  @Test(expectedExceptions = IllegalArgumentException.class)
+  public void testZeroMaxDocsScannedThrows() {
+    new QueryConfig(null, null, null, null, null, null, null, 0L, null);
+  }
+}
diff --git 
a/pinot-spi/src/test/java/org/apache/pinot/spi/query/QueryScanCostContextTest.java
 
b/pinot-spi/src/test/java/org/apache/pinot/spi/query/QueryScanCostContextTest.java
new file mode 100644
index 00000000000..6f5a0f3da62
--- /dev/null
+++ 
b/pinot-spi/src/test/java/org/apache/pinot/spi/query/QueryScanCostContextTest.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.query;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+
+public class QueryScanCostContextTest {
+
+  @Test
+  public void testInitialValuesAreZero() {
+    QueryScanCostContext ctx = new QueryScanCostContext();
+    assertEquals(ctx.getNumEntriesScannedInFilter(), 0L);
+    assertEquals(ctx.getNumDocsScanned(), 0L);
+    assertEquals(ctx.getNumEntriesScannedPostFilter(), 0L);
+    assertTrue(ctx.getElapsedTimeMs() >= 0);
+  }
+
+  @Test
+  public void testSingleThreadIncrements() {
+    QueryScanCostContext ctx = new QueryScanCostContext();
+    ctx.addEntriesScannedInFilter(100);
+    ctx.addEntriesScannedInFilter(200);
+    assertEquals(ctx.getNumEntriesScannedInFilter(), 300L);
+
+    ctx.addDocsScanned(50);
+    ctx.addDocsScanned(25);
+    assertEquals(ctx.getNumDocsScanned(), 75L);
+
+    ctx.addEntriesScannedPostFilter(1000);
+    assertEquals(ctx.getNumEntriesScannedPostFilter(), 1000L);
+  }
+
+  @Test
+  public void testElapsedTime()
+      throws InterruptedException {
+    QueryScanCostContext ctx = new QueryScanCostContext();
+    Thread.sleep(50);
+    assertTrue(ctx.getElapsedTimeMs() >= 40, "Elapsed time should be at least 
~50ms");
+  }
+
+  @Test
+  public void testConcurrentWritesAreCorrect()
+      throws InterruptedException {
+    QueryScanCostContext ctx = new QueryScanCostContext();
+    int numThreads = 16;
+    long incrementsPerThread = 100_000;
+
+    ExecutorService executor = Executors.newFixedThreadPool(numThreads);
+    CountDownLatch latch = new CountDownLatch(numThreads);
+
+    for (int i = 0; i < numThreads; i++) {
+      executor.submit(() -> {
+        for (long j = 0; j < incrementsPerThread; j++) {
+          ctx.addEntriesScannedInFilter(1);
+          ctx.addDocsScanned(1);
+        }
+        latch.countDown();
+      });
+    }
+    latch.await();
+    executor.shutdown();
+
+    long expected = numThreads * incrementsPerThread;
+    assertEquals(ctx.getNumEntriesScannedInFilter(), expected);
+    assertEquals(ctx.getNumDocsScanned(), expected);
+  }
+
+  @Test
+  public void testLargeIncrements() {
+    QueryScanCostContext ctx = new QueryScanCostContext();
+    ctx.addEntriesScannedInFilter(500_000_000L);
+    ctx.addEntriesScannedInFilter(500_000_000L);
+    assertEquals(ctx.getNumEntriesScannedInFilter(), 1_000_000_000L);
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to