This is an automated email from the ASF dual-hosted git repository.

shauryachats pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 40af9051d29 [pinot-server/ proactive-query-killing] (2/2) integrate 
query scan cost based instrumentation and killing in server operators (#18475)
40af9051d29 is described below

commit 40af9051d294f1304db65b79f4556e110d909f73
Author: Anurag Rai <[email protected]>
AuthorDate: Wed Jun 17 22:01:34 2026 +0530

    [pinot-server/ proactive-query-killing] (2/2) integrate query scan cost 
based instrumentation and killing in server operators (#18475)
    
    * add integration of query scan cost based killing in all server operators
    
    * fix issues found in Claude review
    
    * add entries scanned post filter instrumentation in query killing
    
    * add per-table scanKillingMode override to proactive query killing
    
    Extends QueryConfig with a scanKillingMode field (disabled/logOnly/enforce)
    that lets individual tables override the cluster-level scan killing mode.
    Stores the resolved mode as a volatile field on QueryExecutionContext during
    query init and applies it in QueryKillingManager.checkAndKillWithStrategy()
    ahead of the cluster config — enabling patterns like cluster=logOnly +
    table=enforce for targeted enforcement without a cluster-wide mode change.
    Invalid mode strings are rejected at QueryConfig construction time via
    Preconditions. Includes 4 new manager tests and 7 new QueryConfig tests.
    
    Co-Authored-By: Claude Sonnet 4.6 (1M context) <[email protected]>
    
    * fix dynamic ZK config changes not being picked up by QueryKillingManager
    
    QueryKillingManager.onChange() received raw ZK keys with the full
    "pinot.query.scheduler." prefix but passed them directly to
    QueryMonitorConfig's update constructor, which checks for keys
    without that prefix (e.g. "accounting.scan.based.killing.mode").
    The mismatch caused all dynamic config changes to be silently ignored,
    requiring a server restart for scan-killing config to take effect.
    
    Strip the prefix before passing to QueryMonitorConfig, matching the
    key space the init constructor uses. Add early return when no relevant
    keys changed, and log the applied config values after rebuild.
    
    Co-Authored-By: Claude Sonnet 4.6 (1M context) <[email protected]>
    
    * switch server metrics for query killing to table-specific, falling back 
to global when the table name is unavailable
    
    * make sure dry run metrics and logs are emitted per query only
    
    * reduce nesting in checkScanBasedKilling
    
    ---------
    
    Co-authored-by: Claude Sonnet 4.6 (1M context) <[email protected]>
---
 .../pinot/core/accounting/QueryMonitorConfig.java  |   3 +-
 .../apache/pinot/core/operator/BaseOperator.java   |  33 ++
 .../pinot/core/operator/DocIdSetOperator.java      |  12 +
 .../core/operator/query/AggregationOperator.java   |   7 +
 .../core/operator/query/DistinctOperator.java      |   7 +
 .../query/FilteredAggregationOperator.java         |   7 +
 .../operator/query/FilteredGroupByOperator.java    |   7 +
 .../pinot/core/operator/query/GroupByOperator.java |   7 +
 .../core/operator/query/SelectionOnlyOperator.java |   7 +
 .../operator/query/SelectionOrderByOperator.java   |  11 +
 .../SelectionPartiallyOrderedByDescOperation.java  |   7 +
 .../SelectionPartiallyOrderedByLinearOperator.java |   7 +
 .../streaming/StreamingSelectionOnlyOperator.java  |   7 +
 .../query/executor/ServerQueryExecutorV1Impl.java  |  55 ++
 .../killing/CompositeQueryKillingStrategy.java     |   4 +-
 .../pinot/core/query/killing/QueryKillReport.java  |  17 +-
 .../core/query/killing/QueryKillingManager.java    | 192 ++++++-
 .../core/query/killing/QueryKillingStrategy.java   |   2 +-
 .../strategy/ScanEntriesThresholdStrategy.java     |  46 +-
 .../query/OperatorScanCostTrackingTest.java        | 207 +++++++
 .../killing/CompositeQueryKillingStrategyTest.java |   2 +-
 .../core/query/killing/QueryKillReportTest.java    |   7 +
 .../query/killing/QueryKillingManagerTest.java     | 605 ++++++++++++++++++++-
 .../strategy/ScanEntriesThresholdStrategyTest.java |  62 ++-
 .../server/starter/helix/BaseServerStarter.java    |  11 +
 .../apache/pinot/spi/config/table/QueryConfig.java |  27 +-
 .../pinot/spi/query/QueryExecutionContext.java     |  81 +++
 .../config/table/QueryConfigScanKillingTest.java   |  68 +++
 28 files changed, 1457 insertions(+), 51 deletions(-)

diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/accounting/QueryMonitorConfig.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/accounting/QueryMonitorConfig.java
index 920648ca506..c5bfcde27ad 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/accounting/QueryMonitorConfig.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/accounting/QueryMonitorConfig.java
@@ -158,7 +158,8 @@ public class QueryMonitorConfig {
         (String) null);
   }
 
-  QueryMonitorConfig(QueryMonitorConfig oldConfig, Set<String> changedConfigs, 
Map<String, String> clusterConfigs) {
+  public QueryMonitorConfig(QueryMonitorConfig oldConfig, Set<String> 
changedConfigs,
+      Map<String, String> clusterConfigs) {
     _maxHeapSize = oldConfig._maxHeapSize;
 
     if 
(changedConfigs.contains(Accounting.Keys.MIN_MEMORY_FOOTPRINT_TO_KILL_RATIO)) {
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/BaseOperator.java 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/BaseOperator.java
index a99855a07a7..22413450a12 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/BaseOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/BaseOperator.java
@@ -25,6 +25,10 @@ import java.util.stream.Collectors;
 import org.apache.pinot.core.common.Block;
 import org.apache.pinot.core.common.Operator;
 import org.apache.pinot.core.plan.ExplainInfo;
+import org.apache.pinot.core.query.killing.QueryKillingManager;
+import org.apache.pinot.spi.exception.TerminationException;
+import org.apache.pinot.spi.query.QueryExecutionContext;
+import org.apache.pinot.spi.query.QueryScanCostContext;
 import org.apache.pinot.spi.query.QueryThreadContext;
 import org.apache.pinot.spi.trace.InvocationScope;
 import org.apache.pinot.spi.trace.Tracing;
@@ -55,10 +59,39 @@ public abstract class BaseOperator<T extends Block> 
implements Operator<T> {
 
   protected void checkTermination() {
     QueryThreadContext.checkTermination(this::getExplainName);
+    checkScanBasedKilling();
   }
 
   protected void checkTerminationAndSampleUsage() {
     QueryThreadContext.checkTerminationAndSampleUsage(this::getExplainName);
+    checkScanBasedKilling();
+  }
+
+  private void checkScanBasedKilling() {
+    QueryKillingManager killingManager = QueryKillingManager.getInstance();
+    if (killingManager == null) {
+      return;
+    }
+    QueryThreadContext ctx = QueryThreadContext.getIfAvailable();
+    if (ctx == null) {
+      return;
+    }
+    QueryExecutionContext execCtx = ctx.getExecutionContext();
+    QueryScanCostContext scanCost = execCtx.getQueryScanCostContext();
+    if (scanCost == null) {
+      return;
+    }
+    killingManager.checkAndKillIfNeeded(execCtx, scanCost);
+    TerminationException te = execCtx.getTerminateException();
+    if (te != null) {
+      throw te;
+    }
+  }
+
+  @javax.annotation.Nullable
+  protected static QueryScanCostContext getScanCostContext() {
+    QueryThreadContext ctx = QueryThreadContext.getIfAvailable();
+    return ctx != null ? ctx.getExecutionContext().getQueryScanCostContext() : 
null;
   }
 
   protected List<ExplainInfo> getChildrenExplainInfo() {
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/DocIdSetOperator.java 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/DocIdSetOperator.java
index f7d87453a0f..c96936c3682 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/DocIdSetOperator.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/DocIdSetOperator.java
@@ -28,6 +28,7 @@ import org.apache.pinot.core.operator.blocks.DocIdSetBlock;
 import org.apache.pinot.core.operator.filter.BaseFilterOperator;
 import org.apache.pinot.core.plan.DocIdSetPlanNode;
 import org.apache.pinot.segment.spi.Constants;
+import org.apache.pinot.spi.query.QueryScanCostContext;
 import org.apache.pinot.spi.query.QueryThreadContext;
 
 
@@ -49,6 +50,7 @@ public class DocIdSetOperator extends BaseDocIdSetOperator {
   private BlockDocIdSet _blockDocIdSet;
   private BlockDocIdIterator _blockDocIdIterator;
   private int _currentDocId = 0;
+  private long _lastReportedEntriesScanned = 0;
 
   public DocIdSetOperator(BaseFilterOperator filterOperator, int 
maxSizeOfDocIdSet) {
     Preconditions.checkArgument(maxSizeOfDocIdSet > 0 && maxSizeOfDocIdSet <= 
DocIdSetPlanNode.MAX_DOC_PER_CALL);
@@ -80,6 +82,16 @@ public class DocIdSetOperator extends BaseDocIdSetOperator {
       docIds[pos++] = _currentDocId;
     }
     if (pos > 0) {
+      // Push scan cost delta for proactive query killing
+      QueryScanCostContext scanCost = getScanCostContext();
+      if (scanCost != null) {
+        long currentTotal = _blockDocIdSet.getNumEntriesScannedInFilter();
+        long delta = currentTotal - _lastReportedEntriesScanned;
+        if (delta > 0) {
+          scanCost.addEntriesScannedInFilter(delta);
+          _lastReportedEntriesScanned = currentTotal;
+        }
+      }
       return new DocIdSetBlock(docIds, pos);
     } else {
       return null;
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/AggregationOperator.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/AggregationOperator.java
index 31ef246eb32..5035f6ea63c 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/AggregationOperator.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/AggregationOperator.java
@@ -35,6 +35,7 @@ import 
org.apache.pinot.core.query.aggregation.function.AggregationFunction;
 import 
org.apache.pinot.core.query.aggregation.function.AggregationFunctionUtils.AggregationInfo;
 import org.apache.pinot.core.query.request.context.QueryContext;
 import org.apache.pinot.core.startree.executor.StarTreeAggregationExecutor;
+import org.apache.pinot.spi.query.QueryScanCostContext;
 
 
 /**
@@ -72,6 +73,12 @@ public class AggregationOperator extends 
BaseOperator<AggregationResultsBlock> {
     ValueBlock valueBlock;
     while ((valueBlock = _projectOperator.nextBlock()) != null) {
       _numDocsScanned += valueBlock.getNumDocs();
+      QueryScanCostContext scanCost = getScanCostContext();
+      if (scanCost != null) {
+        scanCost.addDocsScanned(valueBlock.getNumDocs());
+        scanCost.addEntriesScannedPostFilter(
+            (long) valueBlock.getNumDocs() * 
_projectOperator.getNumColumnsProjected());
+      }
       aggregationExecutor.aggregate(valueBlock);
     }
 
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/DistinctOperator.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/DistinctOperator.java
index 70b51ca458d..93c00d76bde 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/DistinctOperator.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/DistinctOperator.java
@@ -33,6 +33,7 @@ import org.apache.pinot.core.query.distinct.DistinctExecutor;
 import org.apache.pinot.core.query.distinct.DistinctExecutorFactory;
 import org.apache.pinot.core.query.request.context.QueryContext;
 import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.spi.query.QueryScanCostContext;
 
 
 /**
@@ -60,6 +61,12 @@ public class DistinctOperator extends 
BaseOperator<DistinctResultsBlock> {
     ValueBlock valueBlock;
     while ((valueBlock = _projectOperator.nextBlock()) != null) {
       _numDocsScanned += valueBlock.getNumDocs();
+      QueryScanCostContext scanCost = getScanCostContext();
+      if (scanCost != null) {
+        scanCost.addDocsScanned(valueBlock.getNumDocs());
+        scanCost.addEntriesScannedPostFilter(
+            (long) valueBlock.getNumDocs() * 
_projectOperator.getNumColumnsProjected());
+      }
       if (executor.process(valueBlock)) {
         break;
       }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/FilteredAggregationOperator.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/FilteredAggregationOperator.java
index 21697367a5a..bfc56b02522 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/FilteredAggregationOperator.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/FilteredAggregationOperator.java
@@ -35,6 +35,7 @@ import 
org.apache.pinot.core.query.aggregation.function.AggregationFunction;
 import 
org.apache.pinot.core.query.aggregation.function.AggregationFunctionUtils.AggregationInfo;
 import org.apache.pinot.core.query.request.context.QueryContext;
 import org.apache.pinot.core.startree.executor.StarTreeAggregationExecutor;
+import org.apache.pinot.spi.query.QueryScanCostContext;
 
 
 /**
@@ -95,6 +96,12 @@ public class FilteredAggregationOperator extends 
BaseOperator<AggregationResults
         result[resultIndexMap.get(aggregationFunctions[i])] = 
filteredResult.get(i);
       }
       _numDocsScanned += numDocsScanned;
+      QueryScanCostContext scanCost = getScanCostContext();
+      if (scanCost != null) {
+        scanCost.addDocsScanned(numDocsScanned);
+        scanCost.addEntriesScannedPostFilter(
+            (long) numDocsScanned * projectOperator.getNumColumnsProjected());
+      }
       _numEntriesScannedInFilter += 
projectOperator.getExecutionStatistics().getNumEntriesScannedInFilter();
       _numEntriesScannedPostFilter += (long) numDocsScanned * 
projectOperator.getNumColumnsProjected();
     }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/FilteredGroupByOperator.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/FilteredGroupByOperator.java
index d0555b6e39c..0cb56dfe5eb 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/FilteredGroupByOperator.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/FilteredGroupByOperator.java
@@ -48,6 +48,7 @@ import 
org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
 import org.apache.pinot.core.query.aggregation.groupby.GroupKeyGenerator;
 import org.apache.pinot.core.query.request.context.QueryContext;
 import org.apache.pinot.core.startree.executor.StarTreeGroupByExecutor;
+import org.apache.pinot.spi.query.QueryScanCostContext;
 import org.apache.pinot.spi.trace.Tracing;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -160,6 +161,12 @@ public class FilteredGroupByOperator extends 
BaseOperator<GroupByResultsBlock> {
       }
 
       _numDocsScanned += numDocsScanned;
+      QueryScanCostContext scanCost = getScanCostContext();
+      if (scanCost != null) {
+        scanCost.addDocsScanned(numDocsScanned);
+        scanCost.addEntriesScannedPostFilter(
+            (long) numDocsScanned * projectOperator.getNumColumnsProjected());
+      }
       _numEntriesScannedInFilter += 
projectOperator.getExecutionStatistics().getNumEntriesScannedInFilter();
       _numEntriesScannedPostFilter += (long) numDocsScanned * 
projectOperator.getNumColumnsProjected();
       GroupByResultHolder[] filterGroupByResults = 
groupByExecutor.getGroupByResultHolders();
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/GroupByOperator.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/GroupByOperator.java
index bd6f58095f3..3465484b652 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/GroupByOperator.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/GroupByOperator.java
@@ -42,6 +42,7 @@ import 
org.apache.pinot.core.query.aggregation.groupby.DefaultGroupByExecutor;
 import org.apache.pinot.core.query.aggregation.groupby.GroupByExecutor;
 import org.apache.pinot.core.query.request.context.QueryContext;
 import org.apache.pinot.core.startree.executor.StarTreeGroupByExecutor;
+import org.apache.pinot.spi.query.QueryScanCostContext;
 import org.apache.pinot.spi.trace.Tracing;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -119,6 +120,12 @@ public class GroupByOperator extends 
BaseOperator<GroupByResultsBlock> {
 
     while ((valueBlock = _projectOperator.nextBlock()) != null) {
       _numDocsScanned += valueBlock.getNumDocs();
+      QueryScanCostContext scanCost = getScanCostContext();
+      if (scanCost != null) {
+        scanCost.addDocsScanned(valueBlock.getNumDocs());
+        scanCost.addEntriesScannedPostFilter(
+            (long) valueBlock.getNumDocs() * 
_projectOperator.getNumColumnsProjected());
+      }
       groupByExecutor.process(valueBlock);
     }
 
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOnlyOperator.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOnlyOperator.java
index 8527f9dc032..6d7d6bac9d1 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOnlyOperator.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOnlyOperator.java
@@ -38,6 +38,7 @@ import 
org.apache.pinot.core.operator.blocks.results.SelectionResultsBlock;
 import org.apache.pinot.core.query.request.context.QueryContext;
 import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
 import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.spi.query.QueryScanCostContext;
 import org.roaringbitmap.RoaringBitmap;
 
 
@@ -124,6 +125,12 @@ public class SelectionOnlyOperator extends 
BaseOperator<SelectionResultsBlock> {
       int numDocsToAdd = Math.min(_numRowsToKeep - _rows.size(), 
valueBlock.getNumDocs());
       _rows.ensureCapacity(_rows.size() + numDocsToAdd);
       _numDocsScanned += numDocsToAdd;
+      QueryScanCostContext scanCost = getScanCostContext();
+      if (scanCost != null) {
+        scanCost.addDocsScanned(numDocsToAdd);
+        scanCost.addEntriesScannedPostFilter(
+            (long) numDocsToAdd * _projectOperator.getNumColumnsProjected());
+      }
       if (_nullHandlingEnabled) {
         for (int i = 0; i < numExpressions; i++) {
           _nullBitmaps[i] = _blockValSets[i].getNullBitmap();
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOrderByOperator.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOrderByOperator.java
index 5db3e914930..d088d721abd 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOrderByOperator.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOrderByOperator.java
@@ -52,6 +52,7 @@ import 
org.apache.pinot.core.query.selection.SelectionOperatorUtils;
 import org.apache.pinot.core.query.utils.OrderByComparatorFactory;
 import org.apache.pinot.segment.spi.IndexSegment;
 import org.apache.pinot.segment.spi.datasource.DataSource;
+import org.apache.pinot.spi.query.QueryScanCostContext;
 import org.roaringbitmap.RoaringBitmap;
 
 
@@ -189,6 +190,11 @@ public class SelectionOrderByOperator extends 
BaseOperator<SelectionResultsBlock
         }
       }
       _numDocsScanned += numDocsFetched;
+      QueryScanCostContext scanCost = getScanCostContext();
+      if (scanCost != null) {
+        scanCost.addDocsScanned(numDocsFetched);
+        scanCost.addEntriesScannedPostFilter((long) numDocsFetched * 
numColumnsProjected);
+      }
     }
     _numEntriesScannedPostFilter = (long) _numDocsScanned * 
numColumnsProjected;
 
@@ -253,6 +259,11 @@ public class SelectionOrderByOperator extends 
BaseOperator<SelectionResultsBlock
         }
       }
       _numDocsScanned += numDocsFetched;
+      QueryScanCostContext scanCost2 = getScanCostContext();
+      if (scanCost2 != null) {
+        scanCost2.addDocsScanned(numDocsFetched);
+        scanCost2.addEntriesScannedPostFilter((long) numDocsFetched * 
numColumnsProjected);
+      }
     }
     _numEntriesScannedPostFilter = (long) _numDocsScanned * 
numColumnsProjected;
 
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionPartiallyOrderedByDescOperation.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionPartiallyOrderedByDescOperation.java
index fb318d85be3..df4f624dedf 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionPartiallyOrderedByDescOperation.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionPartiallyOrderedByDescOperation.java
@@ -30,6 +30,7 @@ import org.apache.pinot.core.operator.BaseProjectOperator;
 import org.apache.pinot.core.operator.blocks.ValueBlock;
 import org.apache.pinot.core.query.request.context.QueryContext;
 import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.spi.query.QueryScanCostContext;
 
 
 /**
@@ -71,6 +72,12 @@ public class SelectionPartiallyOrderedByDescOperation 
extends LinearSelectionOrd
       IntFunction<Object[]> rowFetcher = fetchBlock(valueBlock, blockValSets);
       int numDocsFetched = valueBlock.getNumDocs();
       _numDocsScanned += numDocsFetched;
+      QueryScanCostContext scanCost = getScanCostContext();
+      if (scanCost != null) {
+        scanCost.addDocsScanned(numDocsFetched);
+        scanCost.addEntriesScannedPostFilter(
+            (long) numDocsFetched * _projectOperator.getNumColumnsProjected());
+      }
       ListBuilder listBuilder = listBuilderSupplier.get();
 
       // first, calculate the best rows on this block
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionPartiallyOrderedByLinearOperator.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionPartiallyOrderedByLinearOperator.java
index eaedac0003c..57237fc1ebc 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionPartiallyOrderedByLinearOperator.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionPartiallyOrderedByLinearOperator.java
@@ -28,6 +28,7 @@ import org.apache.pinot.core.operator.DocIdOrderedOperator;
 import org.apache.pinot.core.operator.blocks.ValueBlock;
 import org.apache.pinot.core.query.request.context.QueryContext;
 import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.spi.query.QueryScanCostContext;
 
 
 /**
@@ -70,6 +71,12 @@ public class SelectionPartiallyOrderedByLinearOperator 
extends LinearSelectionOr
       IntFunction<Object[]> rowFetcher = fetchBlock(valueBlock, blockValSets);
       int numDocsFetched = valueBlock.getNumDocs();
       _numDocsScanned += numDocsFetched;
+      QueryScanCostContext scanCost = getScanCostContext();
+      if (scanCost != null) {
+        scanCost.addDocsScanned(numDocsFetched);
+        scanCost.addEntriesScannedPostFilter(
+            (long) numDocsFetched * _projectOperator.getNumColumnsProjected());
+      }
       for (int i = 0; i < numDocsFetched; i++) {
         if (listBuilder.add(rowFetcher.apply(i))) {
           return listBuilder.build();
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingSelectionOnlyOperator.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingSelectionOnlyOperator.java
index bb5171c44d7..9c498a26716 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingSelectionOnlyOperator.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingSelectionOnlyOperator.java
@@ -36,6 +36,7 @@ import org.apache.pinot.core.operator.blocks.ValueBlock;
 import org.apache.pinot.core.operator.blocks.results.SelectionResultsBlock;
 import org.apache.pinot.core.query.request.context.QueryContext;
 import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.spi.query.QueryScanCostContext;
 import org.roaringbitmap.RoaringBitmap;
 
 
@@ -122,6 +123,12 @@ public class StreamingSelectionOnlyOperator extends 
BaseOperator<SelectionResult
       }
     }
     _numDocsScanned += numDocs;
+    QueryScanCostContext scanCost = getScanCostContext();
+    if (scanCost != null) {
+      scanCost.addDocsScanned(numDocs);
+      scanCost.addEntriesScannedPostFilter(
+          (long) numDocs * _projectOperator.getNumColumnsProjected());
+    }
     return new SelectionResultsBlock(_dataSchema, rows, _queryContext);
   }
 
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
index 6e1ef3c1509..ca6c6958d5b 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
@@ -59,6 +59,8 @@ import org.apache.pinot.core.plan.Plan;
 import org.apache.pinot.core.plan.maker.PlanMaker;
 import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
 import org.apache.pinot.core.query.config.SegmentPrunerConfig;
+import org.apache.pinot.core.query.killing.QueryKillingManager;
+import org.apache.pinot.core.query.killing.QueryKillingStrategy;
 import org.apache.pinot.core.query.pruner.SegmentPrunerService;
 import org.apache.pinot.core.query.pruner.SegmentPrunerStatistics;
 import org.apache.pinot.core.query.request.ServerQueryRequest;
@@ -68,17 +70,23 @@ import 
org.apache.pinot.core.query.request.context.utils.QueryContextConverterUt
 import org.apache.pinot.core.query.utils.idset.IdSet;
 import org.apache.pinot.core.util.trace.TraceContext;
 import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
+import org.apache.pinot.segment.local.data.manager.TableDataManager;
 import org.apache.pinot.segment.spi.AggregationFunctionType;
 import org.apache.pinot.segment.spi.IndexSegment;
 import org.apache.pinot.segment.spi.SegmentContext;
+import org.apache.pinot.spi.config.table.QueryConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.exception.QueryCancelledException;
 import org.apache.pinot.spi.exception.QueryErrorCode;
 import org.apache.pinot.spi.exception.QueryException;
 import org.apache.pinot.spi.plugin.PluginManager;
+import org.apache.pinot.spi.query.QueryExecutionContext;
+import org.apache.pinot.spi.query.QueryScanCostContext;
 import org.apache.pinot.spi.query.QueryThreadContext;
 import org.apache.pinot.spi.trace.Tracer;
 import org.apache.pinot.spi.trace.Tracing;
+import org.apache.pinot.spi.utils.CommonConstants.Accounting.ScanKillingMode;
 import org.apache.pinot.spi.utils.CommonConstants.Server;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -176,6 +184,9 @@ public class ServerQueryExecutorV1Impl implements 
QueryExecutor {
 
     queryContext.setEnablePrefetch(_enablePrefetch);
 
+    // Initialize scan-based query killing for this query
+    initScanBasedKilling(queryRequest, tableNameWithType);
+
     // Query scheduler wait time already exceeds query timeout, directly return
     long querySchedulingTimeMs = System.currentTimeMillis() - 
queryArrivalTimeMs;
     if (querySchedulingTimeMs >= queryTimeoutMs) {
@@ -520,6 +531,50 @@ public class ServerQueryExecutorV1Impl implements 
QueryExecutor {
     }
   }
 
+  /**
+   * Initializes scan-based query killing for this query. Sets up a {@link 
QueryScanCostContext}
+   * on the current thread's {@link QueryExecutionContext} so operators can 
push scan deltas, and
+   * caches the resolved per-query strategy so table-level overrides are 
applied only once.
+   */
+  private void initScanBasedKilling(ServerQueryRequest queryRequest, String 
tableNameWithType) {
+    QueryKillingManager killingManager = QueryKillingManager.getInstance();
+    if (killingManager == null) {
+      return;
+    }
+    QueryThreadContext ctx = QueryThreadContext.getIfAvailable();
+    if (ctx == null) {
+      return;
+    }
+    QueryExecutionContext execCtx = ctx.getExecutionContext();
+    execCtx.setTableName(tableNameWithType);
+    execCtx.setQueryId(queryRequest.getQueryId());
+    execCtx.setQueryScanCostContext(new QueryScanCostContext());
+
+    // Resolve and cache per-query strategy (applies table-level threshold 
overrides)
+    QueryConfig queryConfig = null;
+    TableDataManager tableDataManager = 
_instanceDataManager.getTableDataManager(tableNameWithType);
+    if (tableDataManager != null) {
+      TableConfig tableConfig = 
tableDataManager.getCachedTableConfigAndSchema().getLeft();
+      if (tableConfig != null) {
+        queryConfig = tableConfig.getQueryConfig();
+      }
+    }
+    QueryKillingStrategy queryStrategy = 
killingManager.resolveQueryStrategy(queryConfig);
+    if (queryStrategy != null) {
+      execCtx.setCachedKillingStrategy(queryStrategy);
+    }
+    // Resolve and store per-table kill mode override (null = use cluster mode)
+    if (queryConfig != null && queryConfig.getScanKillingMode() != null) {
+      ScanKillingMode tableMode = 
ScanKillingMode.fromConfigValue(queryConfig.getScanKillingMode());
+      if (tableMode != null) {
+        execCtx.setEffectiveScanKillingMode(tableMode);
+      } else {
+        LOGGER.warn("Invalid scanKillingMode '{}' in QueryConfig for table {}, 
falling back to cluster mode",
+            queryConfig.getScanKillingMode(), tableNameWithType);
+      }
+    }
+  }
+
   private void addPrunerStats(InstanceResponseBlock instanceResponse, 
SegmentPrunerStatistics prunerStats) {
     
instanceResponse.addMetadata(MetadataKey.NUM_SEGMENTS_PRUNED_INVALID.getName(),
         String.valueOf(prunerStats.getInvalidSegments()));
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/killing/CompositeQueryKillingStrategy.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/killing/CompositeQueryKillingStrategy.java
index aa363d4d360..fd596ae86fa 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/killing/CompositeQueryKillingStrategy.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/killing/CompositeQueryKillingStrategy.java
@@ -73,10 +73,10 @@ public class CompositeQueryKillingStrategy implements 
QueryKillingStrategy {
 
   @Override
   public QueryKillReport buildKillReport(QueryScanCostContext ctx,
-      String queryId, String tableName, String configSource) {
+      long requestId, String queryId, String tableName, String configSource) {
     for (QueryKillingStrategy s : _strategies) {
       if (s.shouldTerminate(ctx)) {
-        return s.buildKillReport(ctx, queryId, tableName, configSource);
+        return s.buildKillReport(ctx, requestId, queryId, tableName, 
configSource);
       }
     }
     throw new IllegalStateException("buildKillReport called but no strategy 
triggered");
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/killing/QueryKillReport.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/killing/QueryKillReport.java
index d44d873188d..f9865831424 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/killing/QueryKillReport.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/killing/QueryKillReport.java
@@ -25,6 +25,7 @@ import org.apache.pinot.spi.query.QueryScanCostContext;
  * Immutable snapshot of a query-kill event
  */
 public final class QueryKillReport {
+  private final long _requestId;
   private final String _queryId;
   private final String _tableName;
   private final String _strategyName;
@@ -40,6 +41,7 @@ public final class QueryKillReport {
   /**
    * Creates a {@code QueryKillReport} by snapshotting the current state of 
{@code context}.
    *
+   * @param requestId        server request ID for tracing
    * @param queryId          unique identifier of the killed query
    * @param tableName        fully-qualified table name (e.g. {@code 
myTable_OFFLINE})
    * @param strategyName     name of the kill strategy that triggered the kill
@@ -49,8 +51,9 @@ public final class QueryKillReport {
    * @param configSource     source of the threshold config (e.g. {@code 
TABLE_CONFIG})
    * @param context          live scan-cost context; values are snapshotted 
immediately
    */
-  public QueryKillReport(String queryId, String tableName, String 
strategyName, String triggeringMetric,
+  public QueryKillReport(long requestId, String queryId, String tableName, 
String strategyName, String triggeringMetric,
       long actualValue, long thresholdValue, String configSource, 
QueryScanCostContext context) {
+    _requestId = requestId;
     _queryId = queryId;
     _tableName = tableName;
     _strategyName = strategyName;
@@ -67,6 +70,10 @@ public final class QueryKillReport {
 
   // ----- Getters -----
 
+  public long getRequestId() {
+    return _requestId;
+  }
+
   public String getQueryId() {
     return _queryId;
   }
@@ -121,13 +128,13 @@ public final class QueryKillReport {
    */
   public String toCustomerMessage() {
     return String.format(
-        "Query '%s' on table '%s' was killed because '%s' (%,d) exceeded the 
threshold (%,d) "
+        "Query '%s' (requestId=%d) on table '%s' was killed because '%s' (%,d) 
exceeded the threshold (%,d) "
             + "configured in %s. "
             + "At kill time: entriesScannedInFilter=%,d, docsScanned=%,d, "
             + "entriesScannedPostFilter=%,d, elapsedMs=%d. "
             + "To reduce scan cost, consider adding a missing index (e.g. 
inverted or range index) "
             + "on the filter columns.",
-        _queryId, _tableName, _triggeringMetric, _actualValue, 
_thresholdValue, _configSource,
+        _queryId, _requestId, _tableName, _triggeringMetric, _actualValue, 
_thresholdValue, _configSource,
         _snapshotEntriesScannedInFilter, _snapshotDocsScanned, 
_snapshotEntriesScannedPostFilter, _elapsedTimeMs);
   }
 
@@ -138,10 +145,10 @@ public final class QueryKillReport {
    */
   public String toInternalLogMessage() {
     return String.format(
-        "QUERY_KILLED queryId=%s table=%s strategy=%s metric=%s actual=%d 
threshold=%d "
+        "QUERY_KILLED requestId=%d queryId=%s table=%s strategy=%s metric=%s 
actual=%d threshold=%d "
             + "configSource=%s entriesScannedInFilter=%d docsScanned=%d "
             + "entriesScannedPostFilter=%d elapsedMs=%d",
-        _queryId, _tableName, _strategyName, _triggeringMetric, _actualValue, 
_thresholdValue,
+        _requestId, _queryId, _tableName, _strategyName, _triggeringMetric, 
_actualValue, _thresholdValue,
         _configSource, _snapshotEntriesScannedInFilter, _snapshotDocsScanned,
         _snapshotEntriesScannedPostFilter, _elapsedTimeMs);
   }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/killing/QueryKillingManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/killing/QueryKillingManager.java
index ee8f0678113..6fe5c8b9e14 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/killing/QueryKillingManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/killing/QueryKillingManager.java
@@ -18,15 +18,22 @@
  */
 package org.apache.pinot.core.query.killing;
 
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicReference;
 import javax.annotation.Nullable;
 import org.apache.pinot.common.metrics.ServerMeter;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.core.accounting.QueryMonitorConfig;
 import 
org.apache.pinot.core.query.killing.strategy.ScanEntriesThresholdStrategy;
+import org.apache.pinot.spi.config.provider.PinotClusterConfigChangeListener;
 import org.apache.pinot.spi.config.table.QueryConfig;
+import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.query.QueryExecutionContext;
 import org.apache.pinot.spi.query.QueryScanCostContext;
+import org.apache.pinot.spi.utils.CommonConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -35,12 +42,12 @@ import org.slf4j.LoggerFactory;
  * Central manager for scan-based query killing. Owns the guard rails and 
delegates the
  * actual kill decision to a {@link QueryKillingStrategy}.
  *
- * The default factory is {@link ScanEntriesThresholdStrategy.Factory}, which 
reads
- * scan thresholds from {@link QueryMonitorConfig}. Custom factories can be 
configured
- * via {@code accounting.scan.based.killing.strategy.factory.class.name}.
+ * <p>The strategy is built once at init via a {@link 
QueryKillingStrategyFactory} and rebuilt when
+ * cluster config changes via {@link #onChange}. The default factory is
+ * {@link ScanEntriesThresholdStrategy.Factory}.</p>
  *
  */
-public class QueryKillingManager {
+public class QueryKillingManager implements PinotClusterConfigChangeListener {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(QueryKillingManager.class);
 
   private static volatile QueryKillingManager _instance;
@@ -64,10 +71,14 @@ public class QueryKillingManager {
    * Initializes the singleton instance and builds the strategy from config.
    * Called once during server startup.
    */
-  public static void init(AtomicReference<QueryMonitorConfig> configRef, 
ServerMetrics serverMetrics) {
+  public static QueryKillingManager init(PinotConfiguration schedulerConfig, 
ServerMetrics serverMetrics) {
+    long maxHeapSize = Runtime.getRuntime().maxMemory();
+    QueryMonitorConfig config = new QueryMonitorConfig(schedulerConfig, 
maxHeapSize);
+    AtomicReference<QueryMonitorConfig> configRef = new 
AtomicReference<>(config);
     QueryKillingManager manager = new QueryKillingManager(configRef, 
serverMetrics);
     manager.rebuildStrategy();
     _instance = manager;
+    return manager;
   }
 
   @Nullable
@@ -129,6 +140,68 @@ public class QueryKillingManager {
     return _strategy;
   }
 
+  /**
+   * Handles ZK cluster config changes. Rebuilds the {@link 
QueryMonitorConfig} from the delta
+   * and rebuilds the killing strategy whenever any key under the scheduler prefix changed.
+   *
+   * <p>Raw ZK keys arrive with the full {@value 
CommonConstants#PINOT_QUERY_SCHEDULER_PREFIX}
+   * prefix. We strip it before passing to {@link QueryMonitorConfig}, 
matching the key space
+   * the init constructor uses (which reads from a config already subsetted to 
that prefix).</p>
+   */
+  @Override
+  public synchronized void onChange(Set<String> changedConfigs, Map<String, 
String> clusterConfigs) {
+    String prefix = CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX + ".";
+    int prefixLen = prefix.length();
+
+    Set<String> filteredChangedConfigs = new HashSet<>();
+    for (String key : changedConfigs) {
+      if (key.startsWith(prefix)) {
+        filteredChangedConfigs.add(key.substring(prefixLen));
+      }
+    }
+
+    if (filteredChangedConfigs.isEmpty()) {
+      return;
+    }
+
+    Map<String, String> filteredClusterConfigs = new HashMap<>();
+    for (Map.Entry<String, String> entry : clusterConfigs.entrySet()) {
+      if (entry.getKey().startsWith(prefix)) {
+        filteredClusterConfigs.put(entry.getKey().substring(prefixLen), 
entry.getValue());
+      }
+    }
+
+    QueryMonitorConfig oldConfig = _configRef.get();
+    QueryMonitorConfig newConfig = new QueryMonitorConfig(oldConfig, 
filteredChangedConfigs, filteredClusterConfigs);
+    _configRef.set(newConfig);
+    rebuildStrategy();
+    LOGGER.info("Scan-based killing config updated: mode={}, 
maxEntriesScannedInFilter={}, "
+            + "maxDocsScanned={}, maxEntriesScannedPostFilter={}",
+        newConfig.getScanBasedKillingMode(),
+        newConfig.getScanBasedKillingMaxEntriesScannedInFilter(),
+        newConfig.getScanBasedKillingMaxDocsScanned(),
+        newConfig.getScanBasedKillingMaxEntriesScannedPostFilter());
+  }
+
+  /**
+   * Convenience overload called from {@link 
org.apache.pinot.core.operator.BaseOperator#checkTermination()}.
+   * Reads query context (table name, query id, cached strategy) from the 
execution context.
+   */
+  public void checkAndKillIfNeeded(QueryExecutionContext executionContext, 
QueryScanCostContext scanCostContext) {
+    Object cached = executionContext.getCachedKillingStrategy();
+    QueryKillingStrategy cachedStrategy;
+    if (cached instanceof QueryKillingStrategy) {
+      cachedStrategy = (QueryKillingStrategy) cached;
+    } else {
+      if (cached != null) {
+        LOGGER.warn("Unexpected cached killing strategy type: {}", 
cached.getClass().getName());
+      }
+      cachedStrategy = null;
+    }
+    checkAndKillIfNeeded(executionContext, scanCostContext, cachedStrategy,
+        executionContext.getQueryId(), executionContext.getTableName());
+  }
+
   /**
    * Evaluates whether the query should be killed based on the active strategy.
    *
@@ -138,7 +211,6 @@ public class QueryKillingManager {
   public void checkAndKillIfNeeded(QueryExecutionContext executionContext,
       QueryScanCostContext scanCostContext, String queryId, String tableName,
       @Nullable QueryConfig queryConfig) {
-    // no strategy means killing is disabled or unconfigured
     QueryKillingStrategy strategy = _strategy;
     if (strategy == null) {
       return;
@@ -149,37 +221,109 @@ public class QueryKillingManager {
       return;
     }
 
-    // Prevent duplicate kills
     if (executionContext.getTerminateException() != null) {
       return;
     }
 
     try {
-      // Resolve per-query table overrides (returns same instance if no 
overrides)
       QueryKillingStrategy queryStrategy = strategy.forQuery(queryConfig, 
config);
-
       String configSource = (queryStrategy != strategy) ? "table:" + tableName 
: "cluster";
+      checkAndKillWithStrategy(executionContext, scanCostContext, 
queryStrategy, configSource, queryId, tableName,
+          config);
+    } catch (Exception e) {
+      LOGGER.error("Error in scan-based killing evaluation for query {}", 
queryId, e);
+      emitKillMetric(ServerMeter.QUERIES_KILLED_SCAN_ERROR, tableName);
+    }
+  }
 
-      // Delegate to strategy
-      if (!queryStrategy.shouldTerminate(scanCostContext)) {
-        return;
-      }
+  /**
+   * Resolves a per-query strategy (applying table-level overrides from {@code 
queryConfig}).
+   * Returns null if killing is disabled. This method only returns the resolved 
strategy; it does not itself store it on the execution context.
+   */
+  @Nullable
+  public QueryKillingStrategy resolveQueryStrategy(@Nullable QueryConfig 
queryConfig) {
+    QueryKillingStrategy strategy = _strategy;
+    if (strategy == null) {
+      return null;
+    }
+    QueryMonitorConfig config = _configRef.get();
+    if (config == null || !config.isScanBasedKillingEnabled()) {
+      return null;
+    }
+    return strategy.forQuery(queryConfig, config);
+  }
 
-      QueryKillReport report = queryStrategy.buildKillReport(
-          scanCostContext, queryId, tableName, configSource);
+  private void checkAndKillIfNeeded(QueryExecutionContext executionContext, 
QueryScanCostContext scanCostContext,
+      @Nullable QueryKillingStrategy cachedStrategy, @Nullable String queryId, 
@Nullable String tableName) {
+    QueryKillingStrategy currentStrategy = _strategy;
+    QueryKillingStrategy strategy = cachedStrategy != null ? cachedStrategy : 
currentStrategy;
+    if (strategy == null) {
+      return;
+    }
+    QueryMonitorConfig config = _configRef.get();
+    if (config == null || !config.isScanBasedKillingEnabled()) {
+      return;
+    }
+    if (executionContext.getTerminateException() != null) {
+      return;
+    }
+    String resolvedQueryId = queryId != null ? queryId : "unknown";
+    String resolvedTableName = tableName != null ? tableName : "unknown";
+    String configSource = (cachedStrategy != null && cachedStrategy != 
currentStrategy) ? "table:" + resolvedTableName
+        : "cluster";
+    try {
+      checkAndKillWithStrategy(executionContext, scanCostContext, strategy, 
configSource, resolvedQueryId,
+          resolvedTableName, config);
+    } catch (Exception e) {
+      LOGGER.error("Error in scan-based killing evaluation for query {}", 
resolvedQueryId, e);
+      emitKillMetric(ServerMeter.QUERIES_KILLED_SCAN_ERROR, resolvedTableName);
+    }
+  }
 
-      if (config.isScanBasedKillingLogOnly()) {
-        LOGGER.info("Query killed in LogOnly mode: {}", 
report.toInternalLogMessage());
-        
_serverMetrics.addMeteredGlobalValue(ServerMeter.QUERIES_KILLED_SCAN_DRY_RUN, 
1);
+  private void checkAndKillWithStrategy(QueryExecutionContext 
executionContext, QueryScanCostContext scanCostContext,
+      QueryKillingStrategy queryStrategy, String configSource, String queryId, 
String tableName,
+      QueryMonitorConfig config) {
+    if (!queryStrategy.shouldTerminate(scanCostContext)) {
+      return;
+    }
+    // Resolve effective mode: per-table override takes precedence over 
cluster config
+    CommonConstants.Accounting.ScanKillingMode effectiveMode = 
executionContext.getEffectiveScanKillingMode();
+    if (effectiveMode == CommonConstants.Accounting.ScanKillingMode.DISABLED) {
+      return;
+    }
+    boolean logOnly = effectiveMode == 
CommonConstants.Accounting.ScanKillingMode.LOG_ONLY
+        || (effectiveMode == null && config.isScanBasedKillingLogOnly());
+    if (logOnly) {
+      // only the first observer for this query logs the dry-run line and
+      // emits the metric; subsequent observers no-op
+      if (!executionContext.markScanKillingDryRunEmitted()) {
         return;
       }
-
+      long requestId = executionContext.getRequestId();
+      QueryKillReport report = queryStrategy.buildKillReport(scanCostContext, 
requestId, queryId, tableName,
+          configSource);
+      LOGGER.info("Query killed in LogOnly mode: {}", 
report.toInternalLogMessage());
+      emitKillMetric(ServerMeter.QUERIES_KILLED_SCAN_DRY_RUN, 
report.getTableName());
+      return;
+    }
+    long requestId = executionContext.getRequestId();
+    QueryKillReport report = queryStrategy.buildKillReport(scanCostContext, 
requestId, queryId, tableName,
+        configSource);
+    if (executionContext.terminate(queryStrategy.getErrorCode(), 
report.toCustomerMessage())) {
       LOGGER.warn("Query Killed in enforce mode: {}", 
report.toInternalLogMessage());
-      executionContext.terminate(queryStrategy.getErrorCode(), 
report.toCustomerMessage());
-      _serverMetrics.addMeteredGlobalValue(ServerMeter.QUERIES_KILLED_SCAN, 1);
-    } catch (Exception e) {
-      LOGGER.error("Error in scan-based killing evaluation for query {}", 
queryId, e);
-      
_serverMetrics.addMeteredGlobalValue(ServerMeter.QUERIES_KILLED_SCAN_ERROR, 1);
+      emitKillMetric(ServerMeter.QUERIES_KILLED_SCAN, report.getTableName());
+    }
+  }
+
+  /**
+   * Emits a kill metric per-table when the table name is known, falling back 
to global emission
+   * when it is not.
+   */
+  private void emitKillMetric(ServerMeter meter, @Nullable String tableName) {
+    if (tableName != null && !tableName.isEmpty() && 
!"unknown".equals(tableName)) {
+      _serverMetrics.addMeteredTableValue(tableName, meter, 1);
+    } else {
+      _serverMetrics.addMeteredGlobalValue(meter, 1);
     }
   }
 }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/killing/QueryKillingStrategy.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/killing/QueryKillingStrategy.java
index 4bf4390d59e..c5e1248689f 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/killing/QueryKillingStrategy.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/killing/QueryKillingStrategy.java
@@ -47,7 +47,7 @@ public interface QueryKillingStrategy {
    * Only called when {@link #shouldTerminate} returns true.
    */
   QueryKillReport buildKillReport(QueryScanCostContext context,
-      String queryId, String tableName, String configSource);
+      long requestId, String queryId, String tableName, String configSource);
 
   /** Error code for the termination response. */
   default QueryErrorCode getErrorCode() {
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/killing/strategy/ScanEntriesThresholdStrategy.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/killing/strategy/ScanEntriesThresholdStrategy.java
index 92b13dcdc60..12adba92f13 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/killing/strategy/ScanEntriesThresholdStrategy.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/killing/strategy/ScanEntriesThresholdStrategy.java
@@ -49,10 +49,17 @@ public class ScanEntriesThresholdStrategy implements 
QueryKillingStrategy {
 
   private final long _maxEntriesScannedInFilter;
   private final long _maxDocsScanned;
+  private final long _maxEntriesScannedPostFilter;
 
   public ScanEntriesThresholdStrategy(long maxEntriesScannedInFilter, long 
maxDocsScanned) {
+    this(maxEntriesScannedInFilter, maxDocsScanned, Long.MAX_VALUE);
+  }
+
+  public ScanEntriesThresholdStrategy(long maxEntriesScannedInFilter, long 
maxDocsScanned,
+      long maxEntriesScannedPostFilter) {
     _maxEntriesScannedInFilter = maxEntriesScannedInFilter;
     _maxDocsScanned = maxDocsScanned;
+    _maxEntriesScannedPostFilter = maxEntriesScannedPostFilter;
   }
 
   @Override
@@ -60,12 +67,14 @@ public class ScanEntriesThresholdStrategy implements 
QueryKillingStrategy {
     return (_maxEntriesScannedInFilter < Long.MAX_VALUE
             && ctx.getNumEntriesScannedInFilter() > _maxEntriesScannedInFilter)
         || (_maxDocsScanned < Long.MAX_VALUE
-            && ctx.getNumDocsScanned() > _maxDocsScanned);
+            && ctx.getNumDocsScanned() > _maxDocsScanned)
+        || (_maxEntriesScannedPostFilter < Long.MAX_VALUE
+            && ctx.getNumEntriesScannedPostFilter() > 
_maxEntriesScannedPostFilter);
   }
 
   @Override
   public QueryKillReport buildKillReport(QueryScanCostContext ctx,
-      String queryId, String tableName, String configSource) {
+      long requestId, String queryId, String tableName, String configSource) {
     String triggeringMetric;
     long actualValue;
     long thresholdValue;
@@ -74,12 +83,17 @@ public class ScanEntriesThresholdStrategy implements 
QueryKillingStrategy {
       triggeringMetric = "numEntriesScannedInFilter";
       actualValue = ctx.getNumEntriesScannedInFilter();
       thresholdValue = _maxEntriesScannedInFilter;
-    } else {
+    } else if (_maxDocsScanned < Long.MAX_VALUE
+        && ctx.getNumDocsScanned() > _maxDocsScanned) {
       triggeringMetric = "numDocsScanned";
       actualValue = ctx.getNumDocsScanned();
       thresholdValue = _maxDocsScanned;
+    } else {
+      triggeringMetric = "numEntriesScannedPostFilter";
+      actualValue = ctx.getNumEntriesScannedPostFilter();
+      thresholdValue = _maxEntriesScannedPostFilter;
     }
-    return new QueryKillReport(queryId, tableName, STRATEGY_NAME,
+    return new QueryKillReport(requestId, queryId, tableName, STRATEGY_NAME,
         triggeringMetric, actualValue, thresholdValue, configSource, ctx);
   }
 
@@ -101,12 +115,14 @@ public class ScanEntriesThresholdStrategy implements 
QueryKillingStrategy {
     }
     Long tableEntries = queryConfig.getMaxEntriesScannedInFilter();
     Long tableDocs = queryConfig.getMaxDocsScanned();
-    if (tableEntries == null && tableDocs == null) {
+    Long tablePostFilter = queryConfig.getMaxEntriesScannedPostFilter();
+    if (tableEntries == null && tableDocs == null && tablePostFilter == null) {
       return this;
     }
     return new ScanEntriesThresholdStrategy(
         tableEntries != null ? tableEntries : _maxEntriesScannedInFilter,
-        tableDocs != null ? tableDocs : _maxDocsScanned);
+        tableDocs != null ? tableDocs : _maxDocsScanned,
+        tablePostFilter != null ? tablePostFilter : 
_maxEntriesScannedPostFilter);
   }
 
   public long getMaxEntriesScannedInFilter() {
@@ -117,6 +133,10 @@ public class ScanEntriesThresholdStrategy implements 
QueryKillingStrategy {
     return _maxDocsScanned;
   }
 
+  public long getMaxEntriesScannedPostFilter() {
+    return _maxEntriesScannedPostFilter;
+  }
+
   /**
    * Factory that creates a {@link ScanEntriesThresholdStrategy} from
    * {@link QueryMonitorConfig}. This is the default factory used when no 
custom
@@ -134,19 +154,23 @@ public class ScanEntriesThresholdStrategy implements 
QueryKillingStrategy {
     public QueryKillingStrategy create(QueryMonitorConfig config) {
       long maxEntries = config.getScanBasedKillingMaxEntriesScannedInFilter();
       long maxDocs = config.getScanBasedKillingMaxDocsScanned();
+      long maxPostFilter = 
config.getScanBasedKillingMaxEntriesScannedPostFilter();
 
-      if (maxEntries == Long.MAX_VALUE && maxDocs == Long.MAX_VALUE) {
+      if (maxEntries == Long.MAX_VALUE && maxDocs == Long.MAX_VALUE && 
maxPostFilter == Long.MAX_VALUE) {
         LOGGER.warn("Scan-based killing is enabled but no thresholds are 
configured. "
             + "Set at least one of: 
accounting.scan.based.killing.max.entries.scanned.in.filter, "
-            + "accounting.scan.based.killing.max.docs.scanned. "
+            + "accounting.scan.based.killing.max.docs.scanned, "
+            + "accounting.scan.based.killing.max.entries.scanned.post.filter. "
             + "Scan-based killing will be effectively disabled until 
thresholds are set.");
         return null;
       }
 
-      LOGGER.info("Initialized ScanEntriesThresholdStrategy with 
maxEntriesScannedInFilter={}, maxDocsScanned={}",
+      LOGGER.info("Initialized ScanEntriesThresholdStrategy with 
maxEntriesScannedInFilter={}, "
+              + "maxDocsScanned={}, maxEntriesScannedPostFilter={}",
           maxEntries == Long.MAX_VALUE ? "disabled" : maxEntries,
-          maxDocs == Long.MAX_VALUE ? "disabled" : maxDocs);
-      return new ScanEntriesThresholdStrategy(maxEntries, maxDocs);
+          maxDocs == Long.MAX_VALUE ? "disabled" : maxDocs,
+          maxPostFilter == Long.MAX_VALUE ? "disabled" : maxPostFilter);
+      return new ScanEntriesThresholdStrategy(maxEntries, maxDocs, 
maxPostFilter);
     }
 
     @Override
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/operator/query/OperatorScanCostTrackingTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/operator/query/OperatorScanCostTrackingTest.java
new file mode 100644
index 00000000000..cf180bf85f2
--- /dev/null
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/operator/query/OperatorScanCostTrackingTest.java
@@ -0,0 +1,207 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.query;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.core.operator.BaseProjectOperator;
+import org.apache.pinot.core.plan.DocIdSetPlanNode;
+import org.apache.pinot.core.plan.ProjectPlanNode;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import 
org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
+import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
+import 
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
+import 
org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
+import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.segment.spi.SegmentContext;
+import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.query.QueryScanCostContext;
+import org.apache.pinot.spi.query.QueryThreadContext;
+import org.apache.pinot.spi.utils.ReadMode;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+
+/**
+ * Verifies that query operators correctly push scan cost metrics into {@link 
QueryScanCostContext}.
+ */
+public class OperatorScanCostTrackingTest {
+  private static final String RAW_TABLE_NAME = "scanCostTestTable";
+  private static final String SEGMENT_NAME = "scanCostTestSegment";
+  private static final int NUM_ROWS = 100;
+  private static final String COL_INT = "intCol";
+  private static final String COL_STRING = "stringCol";
+  private static final String COL_DOUBLE = "doubleCol";
+
+  private static final TableConfig TABLE_CONFIG =
+      new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).build();
+  private static final Schema SCHEMA = new Schema.SchemaBuilder()
+      .addSingleValueDimension(COL_INT, FieldSpec.DataType.INT)
+      .addSingleValueDimension(COL_STRING, FieldSpec.DataType.STRING)
+      .addMetric(COL_DOUBLE, FieldSpec.DataType.DOUBLE)
+      .build();
+
+  private File _tempDir;
+  private IndexSegment _segment;
+
+  @BeforeClass
+  public void setUp()
+      throws Exception {
+    _tempDir = 
Files.createTempDirectory("OperatorScanCostTrackingTest").toFile();
+
+    List<GenericRow> records = new ArrayList<>(NUM_ROWS);
+    for (int i = 0; i < NUM_ROWS; i++) {
+      GenericRow row = new GenericRow();
+      row.putValue(COL_INT, i);
+      row.putValue(COL_STRING, "val_" + (i % 10));
+      row.putValue(COL_DOUBLE, i * 1.5);
+      records.add(row);
+    }
+
+    SegmentGeneratorConfig config = new SegmentGeneratorConfig(TABLE_CONFIG, 
SCHEMA);
+    config.setTableName(RAW_TABLE_NAME);
+    config.setSegmentName(SEGMENT_NAME);
+    config.setOutDir(_tempDir.getPath());
+
+    SegmentIndexCreationDriverImpl driver = new 
SegmentIndexCreationDriverImpl();
+    driver.init(config, new GenericRowRecordReader(records));
+    driver.build();
+
+    _segment = ImmutableSegmentLoader.load(new File(_tempDir, SEGMENT_NAME), 
ReadMode.mmap);
+  }
+
+  @Test
+  public void testSelectionOnlyOperatorTracksScanCost() {
+    QueryContext queryContext = QueryContextConverterUtils.getQueryContext(
+        "SELECT intCol, stringCol, doubleCol FROM scanCostTestTable LIMIT 50");
+
+    try (QueryThreadContext ignore = QueryThreadContext.openForSseTest()) {
+      QueryScanCostContext scanCost = new QueryScanCostContext();
+      
QueryThreadContext.get().getExecutionContext().setQueryScanCostContext(scanCost);
+
+      List<ExpressionContext> expressions =
+          SelectionOperatorUtils.extractExpressions(queryContext, _segment);
+      BaseProjectOperator<?> projectOperator =
+          new ProjectPlanNode(new SegmentContext(_segment), queryContext, 
expressions,
+              DocIdSetPlanNode.MAX_DOC_PER_CALL).run();
+      int numColumnsProjected = projectOperator.getNumColumnsProjected();
+
+      SelectionOnlyOperator operator =
+          new SelectionOnlyOperator(_segment, queryContext, expressions, 
projectOperator);
+      operator.nextBlock();
+
+      long docsScanned = scanCost.getNumDocsScanned();
+      assertTrue(docsScanned > 0, "Should have scanned some docs");
+      assertEquals(scanCost.getNumEntriesScannedPostFilter(), docsScanned * 
numColumnsProjected);
+    }
+  }
+
+  @Test
+  public void testSelectionOrderByOperatorTracksScanCost() {
+    QueryContext queryContext = QueryContextConverterUtils.getQueryContext(
+        "SELECT intCol, stringCol FROM scanCostTestTable ORDER BY intCol LIMIT 
10");
+
+    try (QueryThreadContext ignore = QueryThreadContext.openForSseTest()) {
+      QueryScanCostContext scanCost = new QueryScanCostContext();
+      
QueryThreadContext.get().getExecutionContext().setQueryScanCostContext(scanCost);
+
+      List<ExpressionContext> expressions =
+          SelectionOperatorUtils.extractExpressions(queryContext, _segment);
+      BaseProjectOperator<?> projectOperator =
+          new ProjectPlanNode(new SegmentContext(_segment), queryContext, 
expressions,
+              DocIdSetPlanNode.MAX_DOC_PER_CALL).run();
+      int numColumnsProjected = projectOperator.getNumColumnsProjected();
+
+      SelectionOrderByOperator operator =
+          new SelectionOrderByOperator(_segment, queryContext, expressions, 
projectOperator);
+      operator.nextBlock();
+
+      long docsScanned = scanCost.getNumDocsScanned();
+      assertTrue(docsScanned > 0, "Should have scanned some docs");
+      assertEquals(scanCost.getNumEntriesScannedPostFilter(), docsScanned * 
numColumnsProjected);
+    }
+  }
+
+  @Test
+  public void testDistinctOperatorTracksScanCost() {
+    QueryContext queryContext = QueryContextConverterUtils.getQueryContext(
+        "SELECT DISTINCT stringCol FROM scanCostTestTable");
+
+    try (QueryThreadContext ignore = QueryThreadContext.openForSseTest()) {
+      QueryScanCostContext scanCost = new QueryScanCostContext();
+      
QueryThreadContext.get().getExecutionContext().setQueryScanCostContext(scanCost);
+
+      List<ExpressionContext> expressions = 
queryContext.getSelectExpressions();
+      BaseProjectOperator<?> projectOperator =
+          new ProjectPlanNode(new SegmentContext(_segment), queryContext, 
expressions,
+              DocIdSetPlanNode.MAX_DOC_PER_CALL).run();
+      int numColumnsProjected = projectOperator.getNumColumnsProjected();
+
+      DistinctOperator operator = new DistinctOperator(_segment, queryContext, 
projectOperator);
+      operator.nextBlock();
+
+      long docsScanned = scanCost.getNumDocsScanned();
+      assertTrue(docsScanned > 0, "Should have scanned some docs");
+      assertEquals(scanCost.getNumEntriesScannedPostFilter(), docsScanned * 
numColumnsProjected);
+    }
+  }
+
+  @Test
+  public void testScanCostIsZeroWhenContextNotSet() {
+    QueryContext queryContext = QueryContextConverterUtils.getQueryContext(
+        "SELECT intCol, stringCol FROM scanCostTestTable LIMIT 10");
+
+    try (QueryThreadContext ignore = QueryThreadContext.openForSseTest()) {
+      // Intentionally NOT setting QueryScanCostContext — getScanCostContext() 
returns null
+      List<ExpressionContext> expressions =
+          SelectionOperatorUtils.extractExpressions(queryContext, _segment);
+      BaseProjectOperator<?> projectOperator =
+          new ProjectPlanNode(new SegmentContext(_segment), queryContext, 
expressions,
+              DocIdSetPlanNode.MAX_DOC_PER_CALL).run();
+
+      SelectionOnlyOperator operator =
+          new SelectionOnlyOperator(_segment, queryContext, expressions, 
projectOperator);
+      operator.nextBlock();
+      // No exception — scan cost tracking is gracefully skipped when context 
is null
+    }
+  }
+
+  @AfterClass
+  public void tearDown()
+      throws IOException {
+    _segment.destroy();
+    FileUtils.deleteDirectory(_tempDir);
+  }
+}
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/query/killing/CompositeQueryKillingStrategyTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/query/killing/CompositeQueryKillingStrategyTest.java
index ea48ff49859..6a31a4b8d76 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/query/killing/CompositeQueryKillingStrategyTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/query/killing/CompositeQueryKillingStrategyTest.java
@@ -117,7 +117,7 @@ public class CompositeQueryKillingStrategyTest {
     ctx.addDocsScanned(501);
     assertTrue(composite.shouldTerminate(ctx));
 
-    QueryKillReport report = composite.buildKillReport(ctx, "q1", "t1", "cluster");
+    QueryKillReport report = composite.buildKillReport(ctx, 1L, "q1", "t1", "cluster");
     assertEquals(report.getTriggeringMetric(), "numDocsScanned");
     assertEquals(report.getActualValue(), 501L);
   }
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/query/killing/QueryKillReportTest.java b/pinot-core/src/test/java/org/apache/pinot/core/query/killing/QueryKillReportTest.java
index 054197aeaed..6819a6991be 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/query/killing/QueryKillReportTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/query/killing/QueryKillReportTest.java
@@ -38,6 +38,7 @@ public class QueryKillReportTest {
     context.addEntriesScannedPostFilter(200L);
 
     QueryKillReport report = new QueryKillReport(
+        101L,
         "queryId-1",
         "myTable_OFFLINE",
         "EntriesScannedInFilterStrategy",
@@ -66,6 +67,7 @@ public class QueryKillReportTest {
     context.addEntriesScannedPostFilter(50L);
 
     QueryKillReport report = new QueryKillReport(
+        202L,
         "queryId-abc",
         "salesTable_REALTIME",
         "EntriesScannedInFilterStrategy",
@@ -83,6 +85,7 @@ public class QueryKillReportTest {
     assertTrue(msg.contains("1,234,567"), "Should contain actual value with commas");
     assertTrue(msg.contains("1,000,000"), "Should contain threshold with commas");
     assertTrue(msg.contains("CLUSTER_CONFIG"), "Should contain config source");
+    assertTrue(msg.contains("requestId=202"), "Should contain requestId");
     assertTrue(msg.contains("missing index") || msg.contains("index"),
         "Should include advice about missing indexes");
   }
@@ -95,6 +98,7 @@ public class QueryKillReportTest {
     context.addEntriesScannedPostFilter(150L);
 
     QueryKillReport report = new QueryKillReport(
+        303L,
         "queryId-xyz",
         "ordersTable_OFFLINE",
         "DocsScannedStrategy",
@@ -108,6 +112,7 @@ public class QueryKillReportTest {
     String msg = report.toInternalLogMessage();
 
     assertTrue(msg.startsWith("QUERY_KILLED"), "Should start with QUERY_KILLED prefix");
+    assertTrue(msg.contains("requestId=303"), "Should have requestId key=value");
     assertTrue(msg.contains("queryId=queryId-xyz"), "Should have queryId key=value");
     assertTrue(msg.contains("table=ordersTable_OFFLINE"), "Should have table key=value");
     assertTrue(msg.contains("metric=numDocsScanned"), "Should have metric key=value");
@@ -124,6 +129,7 @@ public class QueryKillReportTest {
     context.addEntriesScannedPostFilter(25L);
 
     QueryKillReport report = new QueryKillReport(
+        404L,
         "queryId-getters",
         "testTable_OFFLINE",
         "TestStrategy",
@@ -134,6 +140,7 @@ public class QueryKillReportTest {
         context
     );
 
+    assertEquals(report.getRequestId(), 404L);
     assertEquals(report.getQueryId(), "queryId-getters");
     assertEquals(report.getTableName(), "testTable_OFFLINE");
     assertEquals(report.getStrategyName(), "TestStrategy");
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/query/killing/QueryKillingManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/query/killing/QueryKillingManagerTest.java
index 5bde697b5a3..394da6926e3 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/query/killing/QueryKillingManagerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/query/killing/QueryKillingManagerTest.java
@@ -19,8 +19,15 @@
 package org.apache.pinot.core.query.killing;
 
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
+import org.apache.pinot.common.metrics.ServerMeter;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.core.accounting.QueryMonitorConfig;
 import org.apache.pinot.core.query.killing.strategy.ScanEntriesThresholdStrategy;
@@ -30,10 +37,17 @@ import org.apache.pinot.spi.exception.QueryErrorCode;
 import org.apache.pinot.spi.query.QueryExecutionContext;
 import org.apache.pinot.spi.query.QueryScanCostContext;
 import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.CommonConstants.Accounting.ScanKillingMode;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
@@ -295,7 +309,592 @@ public class QueryKillingManagerTest {
     assertNotNull(manager.getActiveStrategy(), "After config update, strategy should be built");
   }
 
-  // --- Test fixtures for pluggable strategy ---
+  // --- onChange (dynamic config reload) ---
+
+  @Test
+  public void testOnChangeRebuildsStrategy() {
+    // Start with killing disabled
+    QueryMonitorConfig disabledConfig = buildConfig("disabled", 100L, Long.MAX_VALUE);
+    AtomicReference<QueryMonitorConfig> configRef = new AtomicReference<>(disabledConfig);
+    QueryKillingManager manager = new QueryKillingManager(configRef, _serverMetrics);
+    manager.rebuildStrategy();
+    assertNull(manager.getActiveStrategy(), "Strategy should be null when disabled");
+
+    // Simulate cluster config change enabling killing with enforce mode + threshold.
+    // Keys arrive from ZK with full "pinot.query.scheduler." prefix — onChange() strips it.
+    String prefix = CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX + ".";
+
+    Set<String> changedKeys = new HashSet<>();
+    changedKeys.add(prefix + CommonConstants.Accounting.CONFIG_OF_SCAN_BASED_KILLING_MODE);
+    changedKeys.add(prefix + CommonConstants.Accounting.CONFIG_OF_SCAN_BASED_KILLING_MAX_ENTRIES_SCANNED_IN_FILTER);
+
+    Map<String, String> clusterConfigs = new HashMap<>();
+    clusterConfigs.put(prefix + CommonConstants.Accounting.CONFIG_OF_SCAN_BASED_KILLING_MODE, "enforce");
+    clusterConfigs.put(
+        prefix + CommonConstants.Accounting.CONFIG_OF_SCAN_BASED_KILLING_MAX_ENTRIES_SCANNED_IN_FILTER, "500");
+
+    manager.onChange(changedKeys, clusterConfigs);
+    assertNotNull(manager.getActiveStrategy(),
+        "Strategy should be rebuilt after onChange enables killing");
+  }
+
+  @Test
+  public void testOnChangeDisablesStrategy() {
+    // Start with killing enabled
+    QueryMonitorConfig enabledConfig = buildConfig("enforce", 100L, Long.MAX_VALUE);
+    AtomicReference<QueryMonitorConfig> configRef = new AtomicReference<>(enabledConfig);
+    QueryKillingManager manager = new QueryKillingManager(configRef, _serverMetrics);
+    manager.rebuildStrategy();
+    assertNotNull(manager.getActiveStrategy(), "Strategy should be active when enabled");
+
+    // Simulate cluster config change to disable killing (full ZK-prefixed keys)
+    String prefix = CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX + ".";
+
+    Set<String> changedKeys = new HashSet<>();
+    changedKeys.add(prefix + CommonConstants.Accounting.CONFIG_OF_SCAN_BASED_KILLING_MODE);
+
+    Map<String, String> clusterConfigs = new HashMap<>();
+    clusterConfigs.put(prefix + CommonConstants.Accounting.CONFIG_OF_SCAN_BASED_KILLING_MODE, "disabled");
+
+    manager.onChange(changedKeys, clusterConfigs);
+    assertNull(manager.getActiveStrategy(),
+        "Strategy should be null after onChange disables killing");
+  }
+
+  @Test
+  public void testOnChangeIgnoresIrrelevantKeys() {
+    // Start with killing enabled
+    QueryMonitorConfig enabledConfig = buildConfig("enforce", 100L, Long.MAX_VALUE);
+    AtomicReference<QueryMonitorConfig> configRef = new AtomicReference<>(enabledConfig);
+    QueryKillingManager manager = new QueryKillingManager(configRef, _serverMetrics);
+    manager.rebuildStrategy();
+    assertNotNull(manager.getActiveStrategy(), "Strategy should be active when enabled");
+
+    // Simulate a ZK change that only touches non-scheduler keys — should be ignored
+    Set<String> changedKeys = new HashSet<>();
+    changedKeys.add("some.unrelated.config");
+    changedKeys.add("helix.rebalance.something");
+
+    Map<String, String> clusterConfigs = new HashMap<>();
+    clusterConfigs.put("some.unrelated.config", "value");
+
+    manager.onChange(changedKeys, clusterConfigs);
+    assertNotNull(manager.getActiveStrategy(),
+        "Strategy should remain unchanged when no scheduler keys changed");
+  }
+
+  // --- Convenience overload (2-arg checkAndKillIfNeeded) ---
+
+  @Test
+  public void testConvenienceOverloadKillsViaCachedStrategy() {
+    // Create a manager with killing enabled, threshold = 50 for entries scanned
+    QueryMonitorConfig config = buildConfig("enforce", 50L, Long.MAX_VALUE);
+    AtomicReference<QueryMonitorConfig> configRef = new AtomicReference<>(config);
+    QueryKillingManager manager = new QueryKillingManager(configRef, _serverMetrics);
+    manager.rebuildStrategy();
+
+    // Resolve the per-query strategy and cache it on the execution context
+    QueryKillingStrategy resolvedStrategy = manager.resolveQueryStrategy(null);
+    assertNotNull(resolvedStrategy);
+
+    QueryExecutionContext execCtx = QueryExecutionContext.forSseTest();
+    execCtx.setTableName("testTable_OFFLINE");
+    execCtx.setQueryId("conv-q1");
+    execCtx.setCachedKillingStrategy(resolvedStrategy);
+
+    // Create scan cost exceeding the threshold
+    QueryScanCostContext scanCtx = new QueryScanCostContext();
+    scanCtx.addEntriesScannedInFilter(100L); // Above threshold of 50
+
+    // Use the 2-arg convenience overload
+    manager.checkAndKillIfNeeded(execCtx, scanCtx);
+    assertNotNull(execCtx.getTerminateException(),
+        "Query should be terminated via cached strategy when threshold is exceeded");
+  }
+
+  @Test
+  public void testConvenienceOverloadNullScanCostContextNoOp() {
+    // When the manager's strategy is null (disabled), calling with null scanCostContext is a no-op.
+    // In production, BaseOperator guards against null scanCostContext before calling the manager.
+    // Here we verify the manager's early-return when disabled.
+    QueryMonitorConfig config = buildConfig("disabled", 100L, Long.MAX_VALUE);
+    AtomicReference<QueryMonitorConfig> configRef = new AtomicReference<>(config);
+    QueryKillingManager manager = new QueryKillingManager(configRef, _serverMetrics);
+    manager.rebuildStrategy();
+
+    QueryExecutionContext execCtx = QueryExecutionContext.forSseTest();
+    // No queryScanCostContext set on execCtx — mirrors the real scenario where
+    // scan-based killing is not initialized for this query.
+
+    // The 2-arg overload with null scanCostContext is safe when strategy is null (disabled)
+    manager.checkAndKillIfNeeded(execCtx, null);
+    assertNull(execCtx.getTerminateException(),
+        "No exception should be set when killing is disabled and scanCostContext is null");
+  }
+
+  // --- resolveQueryStrategy ---
+
+  @Test
+  public void testResolveQueryStrategyReturnsNullWhenDisabled() {
+    QueryMonitorConfig config = buildConfig("disabled", 100L, Long.MAX_VALUE);
+    AtomicReference<QueryMonitorConfig> configRef = new AtomicReference<>(config);
+    QueryKillingManager manager = new QueryKillingManager(configRef, _serverMetrics);
+    manager.rebuildStrategy();
+
+    QueryKillingStrategy resolved = manager.resolveQueryStrategy(null);
+    assertNull(resolved, "resolveQueryStrategy should return null when killing is disabled");
+  }
+
+  @Test
+  public void testResolveQueryStrategyReturnsStrategyWhenEnabled() {
+    QueryMonitorConfig config = buildConfig("enforce", 200L, Long.MAX_VALUE);
+    AtomicReference<QueryMonitorConfig> configRef = new AtomicReference<>(config);
+    QueryKillingManager manager = new QueryKillingManager(configRef, _serverMetrics);
+    manager.rebuildStrategy();
+
+    QueryKillingStrategy resolved = manager.resolveQueryStrategy(null);
+    assertNotNull(resolved, "resolveQueryStrategy should return a strategy when killing is enabled");
+    assertTrue(resolved instanceof ScanEntriesThresholdStrategy);
+  }
+
+  @Test
+  public void testTableModeEnforceOverridesClusterLogOnly() {
+    // Cluster is logOnly — normally no kills
+    QueryMonitorConfig config = buildConfig("logOnly", 50L, Long.MAX_VALUE);
+    AtomicReference<QueryMonitorConfig> configRef = new AtomicReference<>(config);
+    QueryKillingManager manager = new QueryKillingManager(configRef, _serverMetrics);
+    manager.rebuildStrategy();
+
+    QueryExecutionContext execCtx = QueryExecutionContext.forSseTest();
+    // Table override: enforce
+    execCtx.setEffectiveScanKillingMode(ScanKillingMode.ENFORCE);
+    execCtx.setTableName("testTable_OFFLINE");
+    execCtx.setQueryId("tbl-mode-q1");
+
+    QueryKillingStrategy strategy = manager.resolveQueryStrategy(null);
+    assertNotNull(strategy);
+    execCtx.setCachedKillingStrategy(strategy);
+
+    QueryScanCostContext scanCtx = new QueryScanCostContext();
+    scanCtx.addEntriesScannedInFilter(100L); // exceeds cluster threshold of 50
+
+    manager.checkAndKillIfNeeded(execCtx, scanCtx);
+    assertNotNull(execCtx.getTerminateException(),
+        "Table enforce override should cause real termination even when cluster is logOnly");
+    assertEquals(execCtx.getTerminateException().getErrorCode(), QueryErrorCode.QUERY_SCAN_LIMIT_EXCEEDED);
+  }
+
+  @Test
+  public void testTableModeLogOnlyOverridesClusterEnforce() {
+    // Cluster is enforce — normally kills
+    QueryMonitorConfig config = buildConfig("enforce", 50L, Long.MAX_VALUE);
+    AtomicReference<QueryMonitorConfig> configRef = new AtomicReference<>(config);
+    QueryKillingManager manager = new QueryKillingManager(configRef, _serverMetrics);
+    manager.rebuildStrategy();
+
+    QueryExecutionContext execCtx = QueryExecutionContext.forSseTest();
+    // Table override: logOnly — should downgrade to dry-run
+    execCtx.setEffectiveScanKillingMode(ScanKillingMode.LOG_ONLY);
+    execCtx.setTableName("testTable_OFFLINE");
+    execCtx.setQueryId("tbl-mode-q2");
+
+    QueryKillingStrategy strategy = manager.resolveQueryStrategy(null);
+    assertNotNull(strategy);
+    execCtx.setCachedKillingStrategy(strategy);
+
+    QueryScanCostContext scanCtx = new QueryScanCostContext();
+    scanCtx.addEntriesScannedInFilter(100L); // exceeds cluster threshold of 50
+
+    manager.checkAndKillIfNeeded(execCtx, scanCtx);
+    assertNull(execCtx.getTerminateException(),
+        "Table logOnly override should prevent real kill even when cluster is enforce");
+  }
+
+  @Test
+  public void testTableModeDisabledOverridesClusterEnforce() {
+    // Cluster is enforce — normally kills
+    QueryMonitorConfig config = buildConfig("enforce", 50L, Long.MAX_VALUE);
+    AtomicReference<QueryMonitorConfig> configRef = new AtomicReference<>(config);
+    QueryKillingManager manager = new QueryKillingManager(configRef, _serverMetrics);
+    manager.rebuildStrategy();
+
+    QueryExecutionContext execCtx = QueryExecutionContext.forSseTest();
+    // Table override: disabled — fully exempt
+    execCtx.setEffectiveScanKillingMode(ScanKillingMode.DISABLED);
+    execCtx.setTableName("testTable_OFFLINE");
+    execCtx.setQueryId("tbl-mode-q3");
+
+    QueryKillingStrategy strategy = manager.resolveQueryStrategy(null);
+    assertNotNull(strategy);
+    execCtx.setCachedKillingStrategy(strategy);
+
+    QueryScanCostContext scanCtx = new QueryScanCostContext();
+    scanCtx.addEntriesScannedInFilter(100L); // exceeds cluster threshold of 50
+
+    manager.checkAndKillIfNeeded(execCtx, scanCtx);
+    assertNull(execCtx.getTerminateException(),
+        "Table disabled override should fully exempt the table from killing");
+  }
+
+  @Test
+  public void testNoTableModeOverrideFallsBackToCluster() {
+    // Cluster is enforce, no table mode override
+    QueryMonitorConfig config = buildConfig("enforce", 50L, Long.MAX_VALUE);
+    AtomicReference<QueryMonitorConfig> configRef = new AtomicReference<>(config);
+    QueryKillingManager manager = new QueryKillingManager(configRef, _serverMetrics);
+    manager.rebuildStrategy();
+
+    QueryExecutionContext execCtx = QueryExecutionContext.forSseTest();
+    // No setEffectiveScanKillingMode call — should use cluster mode (enforce)
+    execCtx.setTableName("testTable_OFFLINE");
+    execCtx.setQueryId("tbl-mode-q4");
+
+    QueryKillingStrategy strategy = manager.resolveQueryStrategy(null);
+    assertNotNull(strategy);
+    execCtx.setCachedKillingStrategy(strategy);
+
+    QueryScanCostContext scanCtx = new QueryScanCostContext();
+    scanCtx.addEntriesScannedInFilter(100L); // exceeds cluster threshold of 50
+
+    manager.checkAndKillIfNeeded(execCtx, scanCtx);
+    assertNotNull(execCtx.getTerminateException(),
+        "Without table mode override, cluster enforce mode should kill");
+  }
+
+  // --- Per-table metric emission ---
+
+  @Test
+  public void testEnforceKillEmitsPerTableMetric() {
+    QueryMonitorConfig config = buildConfig("enforce", 100L, Long.MAX_VALUE);
+    AtomicReference<QueryMonitorConfig> configRef = new AtomicReference<>(config);
+    QueryKillingManager manager = new QueryKillingManager(configRef, _serverMetrics);
+    manager.rebuildStrategy();
+
+    QueryExecutionContext execCtx = QueryExecutionContext.forSseTest();
+    QueryScanCostContext scanCtx = new QueryScanCostContext();
+    scanCtx.addEntriesScannedInFilter(200L);
+
+    manager.checkAndKillIfNeeded(execCtx, scanCtx, "q-metric-1", "myTable_REALTIME", null);
+
+    verify(_serverMetrics).addMeteredTableValue("myTable_REALTIME", ServerMeter.QUERIES_KILLED_SCAN, 1L);
+    verify(_serverMetrics, never()).addMeteredGlobalValue(ServerMeter.QUERIES_KILLED_SCAN, 1L);
+  }
+
+  @Test
+  public void testLogOnlyKillEmitsPerTableDryRunMetric() {
+    QueryMonitorConfig config = buildConfig("logOnly", 100L, Long.MAX_VALUE);
+    AtomicReference<QueryMonitorConfig> configRef = new AtomicReference<>(config);
+    QueryKillingManager manager = new QueryKillingManager(configRef, _serverMetrics);
+    manager.rebuildStrategy();
+
+    QueryExecutionContext execCtx = QueryExecutionContext.forSseTest();
+    QueryScanCostContext scanCtx = new QueryScanCostContext();
+    scanCtx.addEntriesScannedInFilter(200L);
+
+    manager.checkAndKillIfNeeded(execCtx, scanCtx, "q-metric-2", "myTable_REALTIME", null);
+
+    verify(_serverMetrics).addMeteredTableValue("myTable_REALTIME", ServerMeter.QUERIES_KILLED_SCAN_DRY_RUN, 1L);
+    verify(_serverMetrics, never()).addMeteredGlobalValue(ServerMeter.QUERIES_KILLED_SCAN_DRY_RUN, 1L);
+    // logOnly should not actually terminate
+    assertNull(execCtx.getTerminateException());
+  }
+
+  @Test
+  public void testNullTableNameFallsBackToGlobalErrorMetric() {
+    // When the strategy throws inside checkAndKillIfNeeded, the catch block emits the error
+    // metric. With a null table name, the helper falls back to global emission so we do not
+    // silently drop the error signal.
+    QueryMonitorConfig config = buildConfig("enforce", 100L, Long.MAX_VALUE);
+    AtomicReference<QueryMonitorConfig> configRef = new AtomicReference<>(config);
+    QueryKillingManager manager = new QueryKillingManager(configRef, _serverMetrics);
+    manager.rebuildStrategy();
+
+    QueryExecutionContext execCtx = QueryExecutionContext.forSseTest();
+    // Force the catch path with a cached strategy that throws inside shouldTerminate.
+    execCtx.setCachedKillingStrategy(new QueryKillingStrategy() {
+      @Override
+      public boolean shouldTerminate(QueryScanCostContext context) {
+        throw new RuntimeException("boom");
+      }
+
+      @Override
+      public QueryKillReport buildKillReport(QueryScanCostContext context, long requestId,
+          String queryId, String tableName, String configSource) {
+        // unused — shouldTerminate throws first
+        throw new UnsupportedOperationException();
+      }
+    });
+    execCtx.setTableName(null);
+    execCtx.setQueryId("q-metric-null");
+
+    manager.checkAndKillIfNeeded(execCtx, new QueryScanCostContext());
+
+    // Null table name → global fallback emission, never per-table
+    verify(_serverMetrics).addMeteredGlobalValue(ServerMeter.QUERIES_KILLED_SCAN_ERROR, 1L);
+    verify(_serverMetrics, never()).addMeteredTableValue(anyString(),
+        eq(ServerMeter.QUERIES_KILLED_SCAN_ERROR), anyLong());
+  }
+
+  @Test
+  public void testNullTableNameInReportFallsBackToGlobalEnforceMetric() {
+    QueryMonitorConfig config = buildConfig("enforce", 100L, Long.MAX_VALUE);
+    AtomicReference<QueryMonitorConfig> configRef = new AtomicReference<>(config);
+    QueryKillingManager manager = new QueryKillingManager(configRef, _serverMetrics);
+    manager.rebuildStrategy();
+
+    QueryExecutionContext execCtx = QueryExecutionContext.forSseTest();
+    execCtx.setCachedKillingStrategy(new QueryKillingStrategy() {
+      @Override
+      public boolean shouldTerminate(QueryScanCostContext context) {
+        return true;
+      }
+
+      @Override
+      public QueryKillReport buildKillReport(QueryScanCostContext context, long requestId,
+          String queryId, String tableName, String configSource) {
+        // Intentionally drop the table name to exercise the null-fallback path
+        return new QueryKillReport(requestId, queryId, null, "TestStrategy", "test", 0, 0,
+            configSource, context);
+      }
+
+      @Override
+      public org.apache.pinot.spi.exception.QueryErrorCode getErrorCode() {
+        return org.apache.pinot.spi.exception.QueryErrorCode.QUERY_SCAN_LIMIT_EXCEEDED;
+      }
+      }
+    });
+    execCtx.setTableName(null);
+    execCtx.setQueryId("q-null-report");
+
+    manager.checkAndKillIfNeeded(execCtx, new QueryScanCostContext());
+
+    verify(_serverMetrics).addMeteredGlobalValue(ServerMeter.QUERIES_KILLED_SCAN, 1L);
+    verify(_serverMetrics, never()).addMeteredTableValue(anyString(),
+        eq(ServerMeter.QUERIES_KILLED_SCAN), anyLong());
+  }
+
+  @Test
+  public void testUnknownTableNameSentinelFallsBackToGlobalMetric() {
+    QueryMonitorConfig config = buildConfig("enforce", 100L, Long.MAX_VALUE);
+    AtomicReference<QueryMonitorConfig> configRef = new AtomicReference<>(config);
+    QueryKillingManager manager = new QueryKillingManager(configRef, _serverMetrics);
+    manager.rebuildStrategy();
+
+    QueryExecutionContext execCtx = QueryExecutionContext.forSseTest();
+    QueryScanCostContext scanCtx = new QueryScanCostContext();
+    scanCtx.addEntriesScannedInFilter(200L);
+
+    // Use the convenience overload with null tableName → routes through "unknown" sentinel
+    execCtx.setTableName(null);
+    execCtx.setQueryId("q-unknown");
+    execCtx.setCachedKillingStrategy(manager.resolveQueryStrategy(null));
+
+    manager.checkAndKillIfNeeded(execCtx, scanCtx);
+
+    verify(_serverMetrics).addMeteredGlobalValue(ServerMeter.QUERIES_KILLED_SCAN, 1L);
+    verify(_serverMetrics, never()).addMeteredTableValue(eq("unknown"),
+        eq(ServerMeter.QUERIES_KILLED_SCAN), anyLong());
+  }
+
+  // --- Dry-run emit-once guard ---
+
+  @Test
+  public void testLogOnlyEmitsExactlyOncePerQueryAcrossManyBlockChecks() {
+    // logOnly mode never terminates the query, so without a guard, every subsequent block-level
+    // termination check after the threshold is crossed re-builds a report, re-logs, and
+    // re-emits the metric. This test simulates 10,000 block checks and verifies exactly one
+    // emission — the CAS guard on QueryExecutionContext suppresses the duplicates.
+    QueryMonitorConfig config = buildConfig("logOnly", 100L, Long.MAX_VALUE);
+    AtomicReference<QueryMonitorConfig> configRef = new AtomicReference<>(config);
+    QueryKillingManager manager = new QueryKillingManager(configRef, _serverMetrics);
+    manager.rebuildStrategy();
+
+    QueryExecutionContext execCtx = QueryExecutionContext.forSseTest();
+    execCtx.setTableName("dryRunTable_REALTIME");
+    execCtx.setQueryId("q-dry-run-once");
+    execCtx.setCachedKillingStrategy(manager.resolveQueryStrategy(null));
+
+    QueryScanCostContext scanCtx = new QueryScanCostContext();
+    scanCtx.addEntriesScannedInFilter(200L); // permanently over threshold
+
+    for (int i = 0; i < 10_000; i++) {
+      manager.checkAndKillIfNeeded(execCtx, scanCtx);
+    }
+
+    // Exactly one per-table dry-run metric emission across 10k block checks
+    verify(_serverMetrics).addMeteredTableValue("dryRunTable_REALTIME",
+        ServerMeter.QUERIES_KILLED_SCAN_DRY_RUN, 1L);
+    verify(_serverMetrics, never()).addMeteredGlobalValue(ServerMeter.QUERIES_KILLED_SCAN_DRY_RUN, 1L);
+    // logOnly never terminates
+    assertNull(execCtx.getTerminateException());
+  }
+
+  @Test
+  public void testEnforceEmitsOncePerQueryAcrossManyBlockChecks() {
+    // Sanity check: enforce mode is already guarded by getTerminateException() != null, but the
+    // dry-run CAS must not regress that. Verify enforce still emits exactly once.
+    QueryMonitorConfig config = buildConfig("enforce", 100L, Long.MAX_VALUE);
+    AtomicReference<QueryMonitorConfig> configRef = new AtomicReference<>(config);
+    QueryKillingManager manager = new QueryKillingManager(configRef, _serverMetrics);
+
+    QueryExecutionContext execCtx = QueryExecutionContext.forSseTest();
+    execCtx.setTableName("enforceTable_REALTIME");
+    execCtx.setQueryId("q-enforce-once");
+    execCtx.setCachedKillingStrategy(manager.resolveQueryStrategy(null));
+
+    QueryScanCostContext scanCtx = new QueryScanCostContext();
+    scanCtx.addEntriesScannedInFilter(200L);
+
+    for (int i = 0; i < 10_000; i++) {
+      manager.checkAndKillIfNeeded(execCtx, scanCtx);
+    }
+
+    verify(_serverMetrics).addMeteredTableValue("enforceTable_REALTIME",
+        ServerMeter.QUERIES_KILLED_SCAN, 1L);
+    assertNotNull(execCtx.getTerminateException());
+  }
+
+  @Test
+  public void testDryRunGuardIsPerQueryNotGlobal() {
+    // Two independent queries in logOnly mode should each emit once — the CAS lives on
+    // QueryExecutionContext, not on the manager or a static.
+    QueryMonitorConfig config = buildConfig("logOnly", 100L, Long.MAX_VALUE);
+    AtomicReference<QueryMonitorConfig> configRef = new AtomicReference<>(config);
+    QueryKillingManager manager = new QueryKillingManager(configRef, _serverMetrics);
+    manager.rebuildStrategy();
+
+    QueryScanCostContext scanCtx = new QueryScanCostContext();
+    scanCtx.addEntriesScannedInFilter(200L);
+
+    QueryExecutionContext execCtxA = QueryExecutionContext.forSseTest();
+    execCtxA.setTableName("tableA_REALTIME");
+    execCtxA.setQueryId("qA");
+    execCtxA.setCachedKillingStrategy(manager.resolveQueryStrategy(null));
+
+    QueryExecutionContext execCtxB = QueryExecutionContext.forSseTest();
+    execCtxB.setTableName("tableB_REALTIME");
+    execCtxB.setQueryId("qB");
+    execCtxB.setCachedKillingStrategy(manager.resolveQueryStrategy(null));
+
+    for (int i = 0; i < 100; i++) {
+      manager.checkAndKillIfNeeded(execCtxA, scanCtx);
+      manager.checkAndKillIfNeeded(execCtxB, scanCtx);
+    }
+
+    verify(_serverMetrics).addMeteredTableValue("tableA_REALTIME",
+        ServerMeter.QUERIES_KILLED_SCAN_DRY_RUN, 1L);
+    verify(_serverMetrics).addMeteredTableValue("tableB_REALTIME",
+        ServerMeter.QUERIES_KILLED_SCAN_DRY_RUN, 1L);
+  }
+
+  @Test
+  public void testLogOnlyConcurrentCallersEmitExactlyOnce()
+      throws InterruptedException {
+    // Production reality: BaseOperator.checkTermination() is invoked from multiple worker
+    // threads in parallel (one per segment / per block). Without an atomic CAS, two threads
+    // racing past the threshold at the same instant could both pass the "not yet emitted"
+    // check and double-emit. AtomicBoolean.compareAndSet guarantees exactly one winner; this
+    // test fails if the guard is downgraded to a plain or volatile boolean.
+    QueryMonitorConfig config = buildConfig("logOnly", 100L, Long.MAX_VALUE);
+    AtomicReference<QueryMonitorConfig> configRef = new AtomicReference<>(config);
+    QueryKillingManager manager = new QueryKillingManager(configRef, _serverMetrics);
+    manager.rebuildStrategy();
+
+    QueryExecutionContext execCtx = QueryExecutionContext.forSseTest();
+    execCtx.setTableName("concurrentLogOnly_REALTIME");
+    execCtx.setQueryId("q-concurrent-dry-run");
+    execCtx.setCachedKillingStrategy(manager.resolveQueryStrategy(null));
+
+    QueryScanCostContext scanCtx = new QueryScanCostContext();
+    scanCtx.addEntriesScannedInFilter(200L);
+
+    int threadCount = 16;
+    int iterationsPerThread = 5_000;
+    ExecutorService pool = Executors.newFixedThreadPool(threadCount);
+    CountDownLatch startBarrier = new CountDownLatch(1);
+    CountDownLatch doneBarrier = new CountDownLatch(threadCount);
+
+    try {
+      for (int t = 0; t < threadCount; t++) {
+        pool.submit(() -> {
+          try {
+            startBarrier.await();
+            for (int i = 0; i < iterationsPerThread; i++) {
+              manager.checkAndKillIfNeeded(execCtx, scanCtx);
+            }
+          } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+          } finally {
+            doneBarrier.countDown();
+          }
+        });
+      }
+      startBarrier.countDown();
+      assertTrue(doneBarrier.await(30, TimeUnit.SECONDS), "Workers did not complete in time");
+    } finally {
+      pool.shutdownNow();
+    }
+
+    // Even with 16 threads × 5000 iterations = 80,000 concurrent attempts, the CAS guarantees
+    // exactly one emission.
+    verify(_serverMetrics, times(1)).addMeteredTableValue("concurrentLogOnly_REALTIME",
+        ServerMeter.QUERIES_KILLED_SCAN_DRY_RUN, 1L);
+    verify(_serverMetrics, never()).addMeteredGlobalValue(ServerMeter.QUERIES_KILLED_SCAN_DRY_RUN, 1L);
+    assertNull(execCtx.getTerminateException(), "logOnly must not terminate");
+  }
+
+  @Test
+  public void testEnforceConcurrentCallersEmitExactlyOnce()
+      throws InterruptedException {
+    // Enforce-mode duplicate-emit prevention: terminate() is internally a synchronized CAS that
+    // returns true only on the first transition. The manager gates the emit + warn log on the
+    // return value, so two threads that both observe (!isTerminated) and cross the threshold
+    // before either commits will still result in exactly one emission.
+    QueryMonitorConfig config = buildConfig("enforce", 100L, Long.MAX_VALUE);
+    AtomicReference<QueryMonitorConfig> configRef = new AtomicReference<>(config);
+    QueryKillingManager manager = new QueryKillingManager(configRef, _serverMetrics);
+    manager.rebuildStrategy();
+
+    QueryExecutionContext execCtx = QueryExecutionContext.forSseTest();
+    execCtx.setTableName("concurrentEnforce_REALTIME");
+    execCtx.setQueryId("q-concurrent-enforce");
+    execCtx.setCachedKillingStrategy(manager.resolveQueryStrategy(null));
+
+    QueryScanCostContext scanCtx = new QueryScanCostContext();
+    scanCtx.addEntriesScannedInFilter(200L);
+
+    int threadCount = 16;
+    int iterationsPerThread = 5_000;
+    ExecutorService pool = Executors.newFixedThreadPool(threadCount);
+    CountDownLatch startBarrier = new CountDownLatch(1);
+    CountDownLatch doneBarrier = new CountDownLatch(threadCount);
+
+    try {
+      for (int t = 0; t < threadCount; t++) {
+        pool.submit(() -> {
+          try {
+            startBarrier.await();
+            for (int i = 0; i < iterationsPerThread; i++) {
+              manager.checkAndKillIfNeeded(execCtx, scanCtx);
+            }
+          } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+          } finally {
+            doneBarrier.countDown();
+          }
+        });
+      }
+      startBarrier.countDown();
+      assertTrue(doneBarrier.await(30, TimeUnit.SECONDS), "Workers did not complete in time");
+    } finally {
+      pool.shutdownNow();
+    }
+
+    verify(_serverMetrics, times(1)).addMeteredTableValue("concurrentEnforce_REALTIME",
+        ServerMeter.QUERIES_KILLED_SCAN, 1L);
+    verify(_serverMetrics, never()).addMeteredGlobalValue(ServerMeter.QUERIES_KILLED_SCAN, 1L);
+    assertNotNull(execCtx.getTerminateException());
+  }
 
   /**
    * A test strategy that always kills — used to verify custom factory loading.
@@ -308,8 +907,8 @@ public class QueryKillingManagerTest {
 
     @Override
     public QueryKillReport buildKillReport(QueryScanCostContext context,
-        String queryId, String tableName, String configSource) {
-      return new QueryKillReport(queryId, tableName, "AlwaysKillStrategy",
+        long requestId, String queryId, String tableName, String configSource) 
{
+      return new QueryKillReport(requestId, queryId, tableName, 
"AlwaysKillStrategy",
           "always", 0, 0, configSource, context);
     }
   }
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/query/killing/strategy/ScanEntriesThresholdStrategyTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/query/killing/strategy/ScanEntriesThresholdStrategyTest.java
index 48785498162..e64c1985c90 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/query/killing/strategy/ScanEntriesThresholdStrategyTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/query/killing/strategy/ScanEntriesThresholdStrategyTest.java
@@ -81,7 +81,7 @@ public class ScanEntriesThresholdStrategyTest {
     QueryScanCostContext ctx = new QueryScanCostContext();
     ctx.addEntriesScannedInFilter(150_000_000);
 
-    QueryKillReport report = strategy.buildKillReport(ctx, "q1", "myTable", 
"cluster");
+    QueryKillReport report = strategy.buildKillReport(ctx, 1L, "q1", 
"myTable", "cluster");
     assertEquals(report.getTriggeringMetric(), "numEntriesScannedInFilter");
     assertEquals(report.getActualValue(), 150_000_000L);
     assertEquals(report.getThresholdValue(), 100_000_000L);
@@ -93,7 +93,7 @@ public class ScanEntriesThresholdStrategyTest {
     QueryScanCostContext ctx = new QueryScanCostContext();
     ctx.addDocsScanned(15_000_000);
 
-    QueryKillReport report = strategy.buildKillReport(ctx, "q2", "myTable", 
"table:myTable");
+    QueryKillReport report = strategy.buildKillReport(ctx, 2L, "q2", 
"myTable", "table:myTable");
     assertEquals(report.getTriggeringMetric(), "numDocsScanned");
     assertEquals(report.getActualValue(), 15_000_000L);
     assertEquals(report.getThresholdValue(), 10_000_000L);
@@ -122,6 +122,64 @@ public class ScanEntriesThresholdStrategyTest {
     assertTrue(strategy.shouldTerminate(ctx2));
   }
 
+  @Test
+  public void testAbovePostFilterThresholdKills() {
+    ScanEntriesThresholdStrategy strategy =
+        new ScanEntriesThresholdStrategy(Long.MAX_VALUE, Long.MAX_VALUE, 
1000L);
+    QueryScanCostContext ctx = new QueryScanCostContext();
+    ctx.addEntriesScannedPostFilter(1001);
+    assertTrue(strategy.shouldTerminate(ctx));
+  }
+
+  @Test
+  public void testBelowPostFilterThresholdDoesNotKill() {
+    ScanEntriesThresholdStrategy strategy =
+        new ScanEntriesThresholdStrategy(Long.MAX_VALUE, Long.MAX_VALUE, 
1000L);
+    QueryScanCostContext ctx = new QueryScanCostContext();
+    ctx.addEntriesScannedPostFilter(999);
+    assertFalse(strategy.shouldTerminate(ctx));
+  }
+
+  @Test
+  public void testBuildKillReportForPostFilter() {
+    ScanEntriesThresholdStrategy strategy =
+        new ScanEntriesThresholdStrategy(Long.MAX_VALUE, Long.MAX_VALUE, 
5000L);
+    QueryScanCostContext ctx = new QueryScanCostContext();
+    ctx.addEntriesScannedPostFilter(7500);
+
+    QueryKillReport report = strategy.buildKillReport(ctx, 3L, "q3", 
"myTable", "cluster");
+    assertEquals(report.getTriggeringMetric(), "numEntriesScannedPostFilter");
+    assertEquals(report.getActualValue(), 7500L);
+    assertEquals(report.getThresholdValue(), 5000L);
+  }
+
+  @Test
+  public void testPostFilterOverrideViaForQuery() {
+    ScanEntriesThresholdStrategy strategy =
+        new ScanEntriesThresholdStrategy(100L, 200L, 300L);
+    QueryConfig queryConfig = new QueryConfig(null, null, null, null, null, 
null, null, null, 600L);
+
+    QueryKillingStrategy result = strategy.forQuery(queryConfig, null);
+    assertTrue(result != strategy);
+    ScanEntriesThresholdStrategy overridden = (ScanEntriesThresholdStrategy) 
result;
+    assertEquals(overridden.getMaxEntriesScannedInFilter(), 100L);
+    assertEquals(overridden.getMaxDocsScanned(), 200L);
+    assertEquals(overridden.getMaxEntriesScannedPostFilter(), 600L);
+  }
+
+  @Test
+  public void testPostFilterPriorityInBuildKillReport() {
+    // When both entries-in-filter and post-filter exceed their thresholds, 
entries-in-filter takes priority
+    ScanEntriesThresholdStrategy strategy =
+        new ScanEntriesThresholdStrategy(100L, Long.MAX_VALUE, 500L);
+    QueryScanCostContext ctx = new QueryScanCostContext();
+    ctx.addEntriesScannedInFilter(200);
+    ctx.addEntriesScannedPostFilter(1000);
+
+    QueryKillReport report = strategy.buildKillReport(ctx, 4L, "q4", 
"myTable", "cluster");
+    assertEquals(report.getTriggeringMetric(), "numEntriesScannedInFilter");
+  }
+
   // --- forQuery() table override tests ---
 
   @Test
diff --git 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
index 71c4e6539f8..b22f220a4df 100644
--- 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
+++ 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
@@ -87,6 +87,7 @@ import org.apache.pinot.core.data.manager.InstanceDataManager;
 import 
org.apache.pinot.core.data.manager.realtime.RealtimeConsumptionRateManager;
 import 
org.apache.pinot.core.data.manager.realtime.ServerRateLimitConfigChangeListener;
 import org.apache.pinot.core.instance.context.ServerContext;
+import org.apache.pinot.core.query.killing.QueryKillingManager;
 import 
org.apache.pinot.core.query.scheduler.QuerySchedulerThreadPoolConfigChangeListener;
 import org.apache.pinot.core.query.scheduler.resources.ResourceManager;
 import org.apache.pinot.core.transport.ListenerConfig;
@@ -189,6 +190,7 @@ public abstract class BaseServerStarter implements 
ServiceStartable {
   protected PinotEnvironmentProvider _pinotEnvironmentProvider;
   protected SegmentOperationsThrottlerSet _segmentOperationsThrottlerSet;
   protected ThreadAccountant _threadAccountant;
+  protected QueryKillingManager _queryKillingManager;
   protected DefaultClusterConfigChangeHandler _clusterConfigChangeHandler;
   protected volatile boolean _isServerReadyToServeQueries = false;
   protected ScheduledExecutorService _helixMessageCountScheduler;
@@ -772,6 +774,10 @@ public abstract class BaseServerStarter implements 
ServiceStartable {
     _threadAccountant = 
ThreadAccountantUtils.createAccountant(accountingConfig, _instanceId,
         org.apache.pinot.spi.config.instance.InstanceType.SERVER);
 
+    // Initialize scan-based query killing
+    PinotConfiguration schedulerConfig = 
_serverConf.subset(CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX);
+    _queryKillingManager = QueryKillingManager.init(schedulerConfig, 
ServerMetrics.get());
+
     SendStatsPredicate sendStatsPredicate = 
SendStatsPredicate.create(_serverConf, _helixManager);
     KeepPipelineBreakerStatsPredicate keepPipelineBreakerStatsPredicate =
         KeepPipelineBreakerStatsPredicate.create(_serverConf);
@@ -790,6 +796,11 @@ public abstract class BaseServerStarter implements 
ServiceStartable {
         new ServerRateLimitConfigChangeListener(_serverMetrics);
     
_clusterConfigChangeHandler.registerClusterConfigChangeListener(serverRateLimitConfigChangeListener);
 
+    // Register query killing manager for dynamic config updates 
(threshold/mode changes via ZK)
+    if (_queryKillingManager != null) {
+      
_clusterConfigChangeHandler.registerClusterConfigChangeListener(_queryKillingManager);
+    }
+
     initSegmentFetcher(_serverConf);
     StateModelFactory<?> stateModelFactory =
         createSegmentOnlineOfflineStateModelFactory(instanceDataManager, 
_transitionThreadPoolManager);
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/QueryConfig.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/QueryConfig.java
index 093b7c340fa..92fb399f584 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/QueryConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/QueryConfig.java
@@ -24,6 +24,7 @@ import com.google.common.base.Preconditions;
 import java.util.Map;
 import javax.annotation.Nullable;
 import org.apache.pinot.spi.config.BaseJsonConfig;
+import org.apache.pinot.spi.utils.CommonConstants.Accounting.ScanKillingMode;
 
 
 /**
@@ -62,6 +63,10 @@ public class QueryConfig extends BaseJsonConfig {
 
   private final Long _maxEntriesScannedPostFilter;
 
+  // Per-table scan-based killing mode override. Null means use the 
cluster-level mode.
+  // Valid values: "disabled", "logOnly", "enforce" (case-insensitive).
+  private final String _scanKillingMode;
+
 
   public QueryConfig(@Nullable Long timeoutMs, @Nullable Boolean disableGroovy,
       @Nullable Boolean useApproximateFunction, @Nullable Map<String, String> 
expressionOverrideMap,
@@ -70,6 +75,16 @@ public class QueryConfig extends BaseJsonConfig {
         maxQueryResponseSizeBytes, maxServerResponseSizeBytes, null, null, 
null);
   }
 
+  public QueryConfig(@Nullable Long timeoutMs, @Nullable Boolean disableGroovy,
+      @Nullable Boolean useApproximateFunction, @Nullable Map<String, String> 
expressionOverrideMap,
+      @Nullable Long maxQueryResponseSizeBytes, @Nullable Long 
maxServerResponseSizeBytes,
+      @Nullable Long maxEntriesScannedInFilter, @Nullable Long maxDocsScanned,
+      @Nullable Long maxEntriesScannedPostFilter) {
+    this(timeoutMs, disableGroovy, useApproximateFunction, 
expressionOverrideMap,
+        maxQueryResponseSizeBytes, maxServerResponseSizeBytes, 
maxEntriesScannedInFilter, maxDocsScanned,
+        maxEntriesScannedPostFilter, null);
+  }
+
   @JsonCreator
   public QueryConfig(@JsonProperty("timeoutMs") @Nullable Long timeoutMs,
       @JsonProperty("disableGroovy") @Nullable Boolean disableGroovy,
@@ -79,7 +94,8 @@ public class QueryConfig extends BaseJsonConfig {
       @JsonProperty("maxServerResponseSizeBytes") @Nullable Long 
maxServerResponseSizeBytes,
       @JsonProperty("maxEntriesScannedInFilter") @Nullable Long 
maxEntriesScannedInFilter,
       @JsonProperty("maxDocsScanned") @Nullable Long maxDocsScanned,
-      @JsonProperty("maxEntriesScannedPostFilter") @Nullable Long 
maxEntriesScannedPostFilter) {
+      @JsonProperty("maxEntriesScannedPostFilter") @Nullable Long 
maxEntriesScannedPostFilter,
+      @JsonProperty("scanKillingMode") @Nullable String scanKillingMode) {
     Preconditions.checkArgument(timeoutMs == null || timeoutMs > 0, "Invalid 
'timeoutMs': %s", timeoutMs);
     Preconditions.checkArgument(maxQueryResponseSizeBytes == null || 
maxQueryResponseSizeBytes > 0,
         "Invalid 'maxQueryResponseSizeBytes': %s", maxQueryResponseSizeBytes);
@@ -91,6 +107,8 @@ public class QueryConfig extends BaseJsonConfig {
         "Invalid 'maxDocsScanned': %s", maxDocsScanned);
     Preconditions.checkArgument(maxEntriesScannedPostFilter == null || 
maxEntriesScannedPostFilter > 0,
         "Invalid 'maxEntriesScannedPostFilter': %s", 
maxEntriesScannedPostFilter);
+    Preconditions.checkArgument(scanKillingMode == null || 
ScanKillingMode.fromConfigValue(scanKillingMode) != null,
+        "Invalid 'scanKillingMode': %s. Valid values: disabled, logOnly, 
enforce", scanKillingMode);
 
     _timeoutMs = timeoutMs;
     _disableGroovy = disableGroovy;
@@ -101,6 +119,7 @@ public class QueryConfig extends BaseJsonConfig {
     _maxEntriesScannedInFilter = maxEntriesScannedInFilter;
     _maxDocsScanned = maxDocsScanned;
     _maxEntriesScannedPostFilter = maxEntriesScannedPostFilter;
+    _scanKillingMode = scanKillingMode;
   }
 
   @Nullable
@@ -156,4 +175,10 @@ public class QueryConfig extends BaseJsonConfig {
   public Long getMaxEntriesScannedPostFilter() {
     return _maxEntriesScannedPostFilter;
   }
+
+  @Nullable
+  @JsonProperty("scanKillingMode")
+  public String getScanKillingMode() {
+    return _scanKillingMode;
+  }
 }
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/query/QueryExecutionContext.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/query/QueryExecutionContext.java
index ca970cf5ff1..dd79faa2aff 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/query/QueryExecutionContext.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/query/QueryExecutionContext.java
@@ -24,6 +24,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.GuardedBy;
 import org.apache.pinot.spi.exception.QueryErrorCode;
@@ -65,6 +66,25 @@ public class QueryExecutionContext {
 
   private volatile TerminationException _terminateException;
 
+  /// Per-query scan cost accumulators for scan-based killing, tracking 
cumulative scan cost across all segments.
+  @Nullable
+  private volatile QueryScanCostContext _queryScanCostContext;
+
+  @Nullable
+  private volatile Object _cachedKillingStrategy;
+
+  @Nullable
+  private volatile String _tableName;
+
+  @Nullable
+  private volatile String _queryId;
+
+  @Nullable
+  private volatile Accounting.ScanKillingMode _effectiveScanKillingMode;
+
+  /// Guards single-emission of the scan-based killing dry-run log line and 
metric for this query
+  private final AtomicBoolean _scanKillingDryRunEmitted = new 
AtomicBoolean(false);
+
   public QueryExecutionContext(QueryType queryType, long requestId, String 
cid, String workloadName, long startTimeMs,
       long activeDeadlineMs, long passiveDeadlineMs, String brokerId, String 
instanceId, String queryHash) {
     _queryType = queryType;
@@ -193,4 +213,65 @@ public class QueryExecutionContext {
   public TerminationException getTerminateException() {
     return _terminateException;
   }
+
+  @Nullable
+  public QueryScanCostContext getQueryScanCostContext() {
+    return _queryScanCostContext;
+  }
+
+  public void setQueryScanCostContext(@Nullable QueryScanCostContext 
queryScanCostContext) {
+    _queryScanCostContext = queryScanCostContext;
+  }
+
+  @Nullable
+  public Object getCachedKillingStrategy() {
+    return _cachedKillingStrategy;
+  }
+
+  public void setCachedKillingStrategy(@Nullable Object cachedKillingStrategy) 
{
+    _cachedKillingStrategy = cachedKillingStrategy;
+  }
+
+  @Nullable
+  public String getTableName() {
+    return _tableName;
+  }
+
+  public void setTableName(@Nullable String tableName) {
+    _tableName = tableName;
+  }
+
+  @Nullable
+  public String getQueryId() {
+    return _queryId;
+  }
+
+  public void setQueryId(@Nullable String queryId) {
+    _queryId = queryId;
+  }
+
+  /**
+   * Returns the per-table scan killing mode override set for this query, or 
{@code null} if no
+   * table-level override is configured. When {@code null}, the cluster-level 
mode from
+   * {@link org.apache.pinot.spi.utils.CommonConstants.Accounting} applies.
+   */
+  @Nullable
+  public Accounting.ScanKillingMode getEffectiveScanKillingMode() {
+    return _effectiveScanKillingMode;
+  }
+
+  /**
+   * Sets the per-table scan killing mode for this query. Pass {@code null} to 
fall back to the
+   * cluster-level mode. Called once during query initialization; thread-safe 
via {@code volatile}.
+   */
+  public void setEffectiveScanKillingMode(@Nullable Accounting.ScanKillingMode 
effectiveScanKillingMode) {
+    _effectiveScanKillingMode = effectiveScanKillingMode;
+  }
+
+  /**
+   * Atomically marks the scan-based killing dry-run as emitted; returns 
{@code true} only to the caller that set it.
+   */
+  public boolean markScanKillingDryRunEmitted() {
+    return _scanKillingDryRunEmitted.compareAndSet(false, true);
+  }
 }
diff --git 
a/pinot-spi/src/test/java/org/apache/pinot/spi/config/table/QueryConfigScanKillingTest.java
 
b/pinot-spi/src/test/java/org/apache/pinot/spi/config/table/QueryConfigScanKillingTest.java
index 5d6ea187d2d..fce5748652c 100644
--- 
a/pinot-spi/src/test/java/org/apache/pinot/spi/config/table/QueryConfigScanKillingTest.java
+++ 
b/pinot-spi/src/test/java/org/apache/pinot/spi/config/table/QueryConfigScanKillingTest.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.spi.config.table;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.pinot.spi.utils.CommonConstants.Accounting.ScanKillingMode;
 import org.testng.annotations.Test;
 
 import static org.testng.Assert.assertEquals;
@@ -92,4 +93,71 @@ public class QueryConfigScanKillingTest {
   public void testZeroMaxDocsScannedThrows() {
     new QueryConfig(null, null, null, null, null, null, null, 0L, null);
   }
+
+  @Test
+  public void testScanKillingModeDefaultsToNull() {
+    QueryConfig config = new QueryConfig(null, null, null, null, null, null, 
null, null, null);
+    assertNull(config.getScanKillingMode());
+  }
+
+  @Test
+  public void testScanKillingModeSetExplicitly() {
+    QueryConfig config = new QueryConfig(null, null, null, null, null, null, 
null, null, null, "enforce");
+    assertEquals(config.getScanKillingMode(), "enforce");
+  }
+
+  @Test
+  public void testScanKillingModeJsonRoundTrip()
+      throws Exception {
+    QueryConfig config = new QueryConfig(null, null, null, null, null, null, 
500_000L, null, null, "logOnly");
+
+    String json = OBJECT_MAPPER.writeValueAsString(config);
+    QueryConfig deserialized = OBJECT_MAPPER.readValue(json, 
QueryConfig.class);
+
+    assertEquals(deserialized.getScanKillingMode(), "logOnly");
+    assertEquals(deserialized.getMaxEntriesScannedInFilter(), 
Long.valueOf(500_000L));
+  }
+
+  @Test
+  public void testScanKillingModeDeserializesFromJson()
+      throws Exception {
+    String json = "{\"maxDocsScanned\": 5000000, \"scanKillingMode\": 
\"disabled\"}";
+    QueryConfig config = OBJECT_MAPPER.readValue(json, QueryConfig.class);
+    assertEquals(config.getScanKillingMode(), "disabled");
+    assertEquals(config.getMaxDocsScanned(), Long.valueOf(5_000_000L));
+  }
+
+  @Test
+  public void testScanKillingModeAbsentInJsonDeserializesToNull()
+      throws Exception {
+    String json = "{\"maxDocsScanned\": 5000000}";
+    QueryConfig config = OBJECT_MAPPER.readValue(json, QueryConfig.class);
+    assertNull(config.getScanKillingMode());
+  }
+
+  @Test
+  public void testScanKillingModeStringParsesToCorrectEnum() {
+    // Verify the production parse path: getScanKillingMode() string → 
ScanKillingMode enum
+    QueryConfig enforce = new QueryConfig(null, null, null, null, null, null, 
null, null, null, "enforce");
+    
assertEquals(ScanKillingMode.fromConfigValue(enforce.getScanKillingMode()), 
ScanKillingMode.ENFORCE);
+
+    QueryConfig logOnly = new QueryConfig(null, null, null, null, null, null, 
null, null, null, "logOnly");
+    
assertEquals(ScanKillingMode.fromConfigValue(logOnly.getScanKillingMode()), 
ScanKillingMode.LOG_ONLY);
+
+    QueryConfig disabled = new QueryConfig(null, null, null, null, null, null, 
null, null, null, "disabled");
+    
assertEquals(ScanKillingMode.fromConfigValue(disabled.getScanKillingMode()), 
ScanKillingMode.DISABLED);
+  }
+
+  @Test
+  public void testScanKillingModeCaseInsensitiveParsing() {
+    // fromConfigValue is case-insensitive, so "Enforce" is valid
+    QueryConfig config = new QueryConfig(null, null, null, null, null, null, 
null, null, null, "Enforce");
+    assertEquals(ScanKillingMode.fromConfigValue(config.getScanKillingMode()), 
ScanKillingMode.ENFORCE);
+  }
+
+  @Test(expectedExceptions = IllegalArgumentException.class)
+  public void testInvalidScanKillingModeThrowsAtConstruction() {
+    // A completely unrecognized value fails at construction time, not 
silently at query time
+    new QueryConfig(null, null, null, null, null, null, null, null, null, 
"enforced");
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to