This is an automated email from the ASF dual-hosted git repository.

FelixYBW pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new aa82d4aae6 [VL] Reduce Velox scan SQL metrics by default to mitigate 
driver OOM (#12127)
aa82d4aae6 is described below

commit aa82d4aae69a5bd4b09c36e8280d0fe14ada5a9e
Author: lifulong <[email protected]>
AuthorDate: Fri Jun 5 09:17:50 2026 +0800

    [VL] Reduce Velox scan SQL metrics by default to mitigate driver OOM 
(#12127)
    
    Add config to reduce velox scan metrics and avoid OOM error on driver.
---
 .../gluten/backendsapi/velox/VeloxMetricsApi.scala |  18 ++++
 .../gluten/metrics/BatchScanMetricsUpdater.scala   | 102 ++++++++++++------
 .../metrics/FileSourceScanMetricsUpdater.scala     | 116 ++++++++++-----------
 .../metrics/HiveTableScanMetricsUpdater.scala      | 113 ++++++++++----------
 docs/Configuration.md                              |   1 +
 .../org/apache/gluten/config/GlutenConfig.scala    |  14 +++
 .../execution/FileSourceScanExecTransformer.scala  |  13 ++-
 .../apache/gluten/metrics/ScanMetricsUtil.scala    |  81 ++++++++++++++
 .../sql/hive/HiveTableScanExecTransformer.scala    |   5 +-
 .../gluten/metrics/ScanMetricsUtilSuite.scala      |  70 +++++++++++++
 10 files changed, 380 insertions(+), 153 deletions(-)

diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala
index 800fd758e8..83e83555fa 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala
@@ -99,6 +99,12 @@ class VeloxMetricsApi extends MetricsApi with Logging {
   }
 
   override def genBatchScanTransformerMetrics(sparkContext: SparkContext): 
Map[String, SQLMetric] =
+    ScanMetricsUtil.filterScanMetrics(
+      genBatchScanTransformerMetricsFull(sparkContext),
+      ScanMetricsUtil.VELOX_BATCH_SCAN_MINIMAL_METRICS)
+
+  private def genBatchScanTransformerMetricsFull(
+      sparkContext: SparkContext): Map[String, SQLMetric] =
     Map(
       "numInputRows" -> SQLMetrics.createMetric(sparkContext, "number of input 
rows"),
       "inputVectors" -> SQLMetrics.createMetric(sparkContext, "number of input 
vectors"),
@@ -148,6 +154,12 @@ class VeloxMetricsApi extends MetricsApi with Logging {
 
   override def genHiveTableScanTransformerMetrics(
       sparkContext: SparkContext): Map[String, SQLMetric] =
+    ScanMetricsUtil.filterScanMetrics(
+      genHiveTableScanTransformerMetricsFull(sparkContext),
+      ScanMetricsUtil.VELOX_HIVE_SCAN_MINIMAL_METRICS)
+
+  private def genHiveTableScanTransformerMetricsFull(
+      sparkContext: SparkContext): Map[String, SQLMetric] =
     Map(
       "rawInputRows" -> SQLMetrics.createMetric(sparkContext, "number of raw 
input rows"),
       "rawInputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of 
raw input bytes"),
@@ -200,6 +212,12 @@ class VeloxMetricsApi extends MetricsApi with Logging {
 
   override def genFileSourceScanTransformerMetrics(
       sparkContext: SparkContext): Map[String, SQLMetric] =
+    ScanMetricsUtil.filterScanMetrics(
+      genFileSourceScanTransformerMetricsFull(sparkContext),
+      ScanMetricsUtil.VELOX_FILE_SCAN_MINIMAL_METRICS)
+
+  private def genFileSourceScanTransformerMetricsFull(
+      sparkContext: SparkContext): Map[String, SQLMetric] =
     Map(
       "rawInputRows" -> SQLMetrics.createMetric(sparkContext, "number of raw 
input rows"),
       "rawInputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of 
raw input bytes"),
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/metrics/BatchScanMetricsUpdater.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/metrics/BatchScanMetricsUpdater.scala
index 9e2b1d7c92..45767a2c53 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/metrics/BatchScanMetricsUpdater.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/metrics/BatchScanMetricsUpdater.scala
@@ -19,46 +19,82 @@ package org.apache.gluten.metrics
 import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.utils.SparkInputMetricsUtil.InputMetricsWrapper
 
-class BatchScanMetricsUpdater(val metrics: Map[String, SQLMetric]) extends 
MetricsUpdater {
+/**
+ * See [[FileSourceScanMetricsUpdater]]: @transient metrics map with 
per-metric fields captured on
+ * the driver for executor serialization.
+ */
+class BatchScanMetricsUpdater(@transient val metrics: Map[String, SQLMetric])
+  extends MetricsUpdater {
+
+  private def metric(key: String): Option[SQLMetric] = 
Option(metrics).flatMap(_.get(key))
+
+  private val numInputRows: Option[SQLMetric] = metric("numInputRows")
+  private val inputVectors: Option[SQLMetric] = metric("inputVectors")
+  private val inputBytes: Option[SQLMetric] = metric("inputBytes")
+  private val rawInputRows: Option[SQLMetric] = metric("rawInputRows")
+  private val rawInputBytes: Option[SQLMetric] = metric("rawInputBytes")
+  private val outputRows: Option[SQLMetric] = metric("numOutputRows")
+  private val outputVectors: Option[SQLMetric] = metric("outputVectors")
+  private val outputBytes: Option[SQLMetric] = metric("outputBytes")
+  private val cpuCount: Option[SQLMetric] = metric("cpuCount")
+  private val scanTime: Option[SQLMetric] = metric("scanTime")
+  private val wallNanos: Option[SQLMetric] = metric("wallNanos")
+  private val peakMemoryBytes: Option[SQLMetric] = metric("peakMemoryBytes")
+  private val numMemoryAllocations: Option[SQLMetric] = 
metric("numMemoryAllocations")
+  private val numDynamicFiltersAccepted: Option[SQLMetric] = 
metric("numDynamicFiltersAccepted")
+  private val skippedSplits: Option[SQLMetric] = metric("skippedSplits")
+  private val processedSplits: Option[SQLMetric] = metric("processedSplits")
+  private val skippedStrides: Option[SQLMetric] = metric("skippedStrides")
+  private val processedStrides: Option[SQLMetric] = metric("processedStrides")
+  private val remainingFilterTime: Option[SQLMetric] = 
metric("remainingFilterTime")
+  private val ioWaitTime: Option[SQLMetric] = metric("ioWaitTime")
+  private val storageReadBytes: Option[SQLMetric] = metric("storageReadBytes")
+  private val storageReads: Option[SQLMetric] = metric("storageReads")
+  private val localReadBytes: Option[SQLMetric] = metric("localReadBytes")
+  private val ramReadBytes: Option[SQLMetric] = metric("ramReadBytes")
+  private val preloadSplits: Option[SQLMetric] = metric("preloadSplits")
+  private val pageLoadTime: Option[SQLMetric] = metric("pageLoadTime")
+  private val dataSourceAddSplitTime: Option[SQLMetric] = 
metric("dataSourceAddSplitTime")
+  private val dataSourceReadTime: Option[SQLMetric] = 
metric("dataSourceReadTime")
+  private val loadLazyVectorTime: Option[SQLMetric] = 
metric("loadLazyVectorTime")
 
   override def updateInputMetrics(inputMetrics: InputMetricsWrapper): Unit = {
-    inputMetrics.bridgeIncBytesRead(metrics("rawInputBytes").value)
-    inputMetrics.bridgeIncRecordsRead(metrics("rawInputRows").value)
+    rawInputBytes.foreach(m => inputMetrics.bridgeIncBytesRead(m.value))
+    rawInputRows.foreach(m => inputMetrics.bridgeIncRecordsRead(m.value))
   }
 
   override def updateNativeMetrics(opMetrics: IOperatorMetrics): Unit = {
     if (opMetrics != null) {
       val operatorMetrics = opMetrics.asInstanceOf[OperatorMetrics]
-      metrics("numInputRows") += operatorMetrics.inputRows
-      metrics("inputVectors") += operatorMetrics.inputVectors
-      metrics("inputBytes") += operatorMetrics.inputBytes
-      metrics("rawInputRows") += operatorMetrics.rawInputRows
-      metrics("rawInputBytes") += operatorMetrics.rawInputBytes
-      metrics("numOutputRows") += operatorMetrics.outputRows
-      metrics("outputVectors") += operatorMetrics.outputVectors
-      metrics("outputBytes") += operatorMetrics.outputBytes
-      metrics("cpuCount") += operatorMetrics.cpuCount
-      metrics("scanTime") += operatorMetrics.scanTime
-      metrics("wallNanos") += operatorMetrics.wallNanos
-      metrics("peakMemoryBytes") += operatorMetrics.peakMemoryBytes
-      metrics("numMemoryAllocations") += operatorMetrics.numMemoryAllocations
-      // Number of dynamic filters received.
-      metrics("numDynamicFiltersAccepted") += 
operatorMetrics.numDynamicFiltersAccepted
-      metrics("skippedSplits") += operatorMetrics.skippedSplits
-      metrics("processedSplits") += operatorMetrics.processedSplits
-      metrics("skippedStrides") += operatorMetrics.skippedStrides
-      metrics("processedStrides") += operatorMetrics.processedStrides
-      metrics("remainingFilterTime") += operatorMetrics.remainingFilterTime
-      metrics("ioWaitTime") += operatorMetrics.ioWaitTime
-      metrics("storageReadBytes") += operatorMetrics.storageReadBytes
-      metrics("storageReads") += operatorMetrics.storageReads
-      metrics("localReadBytes") += operatorMetrics.localReadBytes
-      metrics("ramReadBytes") += operatorMetrics.ramReadBytes
-      metrics("preloadSplits") += operatorMetrics.preloadSplits
-      metrics("pageLoadTime") += operatorMetrics.pageLoadTime
-      metrics("dataSourceAddSplitTime") += 
operatorMetrics.dataSourceAddSplitTime
-      metrics("dataSourceReadTime") += operatorMetrics.dataSourceReadTime
-      metrics("loadLazyVectorTime") += operatorMetrics.loadLazyVectorTime
+      ScanMetricsUtil.inc(numInputRows, operatorMetrics.inputRows)
+      ScanMetricsUtil.inc(inputVectors, operatorMetrics.inputVectors)
+      ScanMetricsUtil.inc(inputBytes, operatorMetrics.inputBytes)
+      ScanMetricsUtil.inc(rawInputRows, operatorMetrics.rawInputRows)
+      ScanMetricsUtil.inc(rawInputBytes, operatorMetrics.rawInputBytes)
+      ScanMetricsUtil.inc(outputRows, operatorMetrics.outputRows)
+      ScanMetricsUtil.inc(outputVectors, operatorMetrics.outputVectors)
+      ScanMetricsUtil.inc(outputBytes, operatorMetrics.outputBytes)
+      ScanMetricsUtil.inc(cpuCount, operatorMetrics.cpuCount)
+      ScanMetricsUtil.inc(scanTime, operatorMetrics.scanTime)
+      ScanMetricsUtil.inc(wallNanos, operatorMetrics.wallNanos)
+      ScanMetricsUtil.inc(peakMemoryBytes, operatorMetrics.peakMemoryBytes)
+      ScanMetricsUtil.inc(numMemoryAllocations, 
operatorMetrics.numMemoryAllocations)
+      ScanMetricsUtil.inc(numDynamicFiltersAccepted, 
operatorMetrics.numDynamicFiltersAccepted)
+      ScanMetricsUtil.inc(skippedSplits, operatorMetrics.skippedSplits)
+      ScanMetricsUtil.inc(processedSplits, operatorMetrics.processedSplits)
+      ScanMetricsUtil.inc(skippedStrides, operatorMetrics.skippedStrides)
+      ScanMetricsUtil.inc(processedStrides, operatorMetrics.processedStrides)
+      ScanMetricsUtil.inc(remainingFilterTime, 
operatorMetrics.remainingFilterTime)
+      ScanMetricsUtil.inc(ioWaitTime, operatorMetrics.ioWaitTime)
+      ScanMetricsUtil.inc(storageReadBytes, operatorMetrics.storageReadBytes)
+      ScanMetricsUtil.inc(storageReads, operatorMetrics.storageReads)
+      ScanMetricsUtil.inc(localReadBytes, operatorMetrics.localReadBytes)
+      ScanMetricsUtil.inc(ramReadBytes, operatorMetrics.ramReadBytes)
+      ScanMetricsUtil.inc(preloadSplits, operatorMetrics.preloadSplits)
+      ScanMetricsUtil.inc(pageLoadTime, operatorMetrics.pageLoadTime)
+      ScanMetricsUtil.inc(dataSourceAddSplitTime, 
operatorMetrics.dataSourceAddSplitTime)
+      ScanMetricsUtil.inc(dataSourceReadTime, 
operatorMetrics.dataSourceReadTime)
+      ScanMetricsUtil.inc(loadLazyVectorTime, 
operatorMetrics.loadLazyVectorTime)
     }
   }
 }
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/metrics/FileSourceScanMetricsUpdater.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/metrics/FileSourceScanMetricsUpdater.scala
index 527f8d7bc5..4b62447aac 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/metrics/FileSourceScanMetricsUpdater.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/metrics/FileSourceScanMetricsUpdater.scala
@@ -20,76 +20,76 @@ import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.utils.SparkInputMetricsUtil.InputMetricsWrapper
 
 /**
- * Note: "val metrics" is made transient to avoid sending driver-side metrics 
to tasks, e.g.
- * "pruning time" from scan.
+ * Note: "metrics" is @transient to avoid sending the map to executors. 
SQLMetric references are
+ * captured into fields at construction time on the driver (when metrics are 
registered) and
+ * serialized to executors. Missing keys (minimal scan metrics mode) yield 
None and are skipped.
  */
 class FileSourceScanMetricsUpdater(@transient val metrics: Map[String, 
SQLMetric])
   extends MetricsUpdater {
 
-  val rawInputRows: SQLMetric = metrics("rawInputRows")
-  val rawInputBytes: SQLMetric = metrics("rawInputBytes")
-  val outputRows: SQLMetric = metrics("numOutputRows")
-  val outputVectors: SQLMetric = metrics("outputVectors")
-  val outputBytes: SQLMetric = metrics("outputBytes")
-  val wallNanos: SQLMetric = metrics("wallNanos")
-  val cpuCount: SQLMetric = metrics("cpuCount")
-  val scanTime: SQLMetric = metrics("scanTime")
-  val peakMemoryBytes: SQLMetric = metrics("peakMemoryBytes")
-  val numMemoryAllocations: SQLMetric = metrics("numMemoryAllocations")
+  private def metric(key: String): Option[SQLMetric] = 
Option(metrics).flatMap(_.get(key))
 
-  // Number of dynamic filters received.
-  val numDynamicFiltersAccepted: SQLMetric = 
metrics("numDynamicFiltersAccepted")
-  val skippedSplits: SQLMetric = metrics("skippedSplits")
-  val processedSplits: SQLMetric = metrics("processedSplits")
-  val preloadSplits: SQLMetric = metrics("preloadSplits")
-  val pageLoadTime: SQLMetric = metrics("pageLoadTime")
-  val dataSourceAddSplitTime: SQLMetric = metrics("dataSourceAddSplitTime")
-  val dataSourceReadTime: SQLMetric = metrics("dataSourceReadTime")
-  val skippedStrides: SQLMetric = metrics("skippedStrides")
-  val processedStrides: SQLMetric = metrics("processedStrides")
-  val remainingFilterTime: SQLMetric = metrics("remainingFilterTime")
-  val ioWaitTime: SQLMetric = metrics("ioWaitTime")
-  val storageReadBytes: SQLMetric = metrics("storageReadBytes")
-  val storageReads: SQLMetric = metrics("storageReads")
-  val localReadBytes: SQLMetric = metrics("localReadBytes")
-  val ramReadBytes: SQLMetric = metrics("ramReadBytes")
-  val loadLazyVectorTime: SQLMetric = metrics("loadLazyVectorTime")
+  private val rawInputRows: Option[SQLMetric] = metric("rawInputRows")
+  private val rawInputBytes: Option[SQLMetric] = metric("rawInputBytes")
+  private val outputRows: Option[SQLMetric] = metric("numOutputRows")
+  private val outputVectors: Option[SQLMetric] = metric("outputVectors")
+  private val outputBytes: Option[SQLMetric] = metric("outputBytes")
+  private val wallNanos: Option[SQLMetric] = metric("wallNanos")
+  private val cpuCount: Option[SQLMetric] = metric("cpuCount")
+  private val scanTime: Option[SQLMetric] = metric("scanTime")
+  private val peakMemoryBytes: Option[SQLMetric] = metric("peakMemoryBytes")
+  private val numMemoryAllocations: Option[SQLMetric] = 
metric("numMemoryAllocations")
+  private val numDynamicFiltersAccepted: Option[SQLMetric] = 
metric("numDynamicFiltersAccepted")
+  private val skippedSplits: Option[SQLMetric] = metric("skippedSplits")
+  private val processedSplits: Option[SQLMetric] = metric("processedSplits")
+  private val skippedStrides: Option[SQLMetric] = metric("skippedStrides")
+  private val processedStrides: Option[SQLMetric] = metric("processedStrides")
+  private val remainingFilterTime: Option[SQLMetric] = 
metric("remainingFilterTime")
+  private val ioWaitTime: Option[SQLMetric] = metric("ioWaitTime")
+  private val storageReadBytes: Option[SQLMetric] = metric("storageReadBytes")
+  private val storageReads: Option[SQLMetric] = metric("storageReads")
+  private val localReadBytes: Option[SQLMetric] = metric("localReadBytes")
+  private val ramReadBytes: Option[SQLMetric] = metric("ramReadBytes")
+  private val preloadSplits: Option[SQLMetric] = metric("preloadSplits")
+  private val pageLoadTime: Option[SQLMetric] = metric("pageLoadTime")
+  private val dataSourceAddSplitTime: Option[SQLMetric] = 
metric("dataSourceAddSplitTime")
+  private val dataSourceReadTime: Option[SQLMetric] = 
metric("dataSourceReadTime")
+  private val loadLazyVectorTime: Option[SQLMetric] = 
metric("loadLazyVectorTime")
 
   override def updateInputMetrics(inputMetrics: InputMetricsWrapper): Unit = {
-    inputMetrics.bridgeIncBytesRead(rawInputBytes.value)
-    inputMetrics.bridgeIncRecordsRead(rawInputRows.value)
+    rawInputBytes.foreach(m => inputMetrics.bridgeIncBytesRead(m.value))
+    rawInputRows.foreach(m => inputMetrics.bridgeIncRecordsRead(m.value))
   }
 
   override def updateNativeMetrics(opMetrics: IOperatorMetrics): Unit = {
     if (opMetrics != null) {
       val operatorMetrics = opMetrics.asInstanceOf[OperatorMetrics]
-      rawInputRows += operatorMetrics.rawInputRows
-      rawInputBytes += operatorMetrics.rawInputBytes
-      outputRows += operatorMetrics.outputRows
-      outputVectors += operatorMetrics.outputVectors
-      outputBytes += operatorMetrics.outputBytes
-      wallNanos += operatorMetrics.wallNanos
-      cpuCount += operatorMetrics.cpuCount
-      scanTime += operatorMetrics.scanTime
-      peakMemoryBytes += operatorMetrics.peakMemoryBytes
-      numMemoryAllocations += operatorMetrics.numMemoryAllocations
-      // Number of dynamic filters received.
-      numDynamicFiltersAccepted += operatorMetrics.numDynamicFiltersAccepted
-      skippedSplits += operatorMetrics.skippedSplits
-      processedSplits += operatorMetrics.processedSplits
-      skippedStrides += operatorMetrics.skippedStrides
-      processedStrides += operatorMetrics.processedStrides
-      remainingFilterTime += operatorMetrics.remainingFilterTime
-      ioWaitTime += operatorMetrics.ioWaitTime
-      storageReadBytes += operatorMetrics.storageReadBytes
-      storageReads += operatorMetrics.storageReads
-      localReadBytes += operatorMetrics.localReadBytes
-      ramReadBytes += operatorMetrics.ramReadBytes
-      preloadSplits += operatorMetrics.preloadSplits
-      pageLoadTime += operatorMetrics.pageLoadTime
-      dataSourceAddSplitTime += operatorMetrics.dataSourceAddSplitTime
-      dataSourceReadTime += operatorMetrics.dataSourceReadTime
-      loadLazyVectorTime += operatorMetrics.loadLazyVectorTime
+      ScanMetricsUtil.inc(rawInputRows, operatorMetrics.rawInputRows)
+      ScanMetricsUtil.inc(rawInputBytes, operatorMetrics.rawInputBytes)
+      ScanMetricsUtil.inc(outputRows, operatorMetrics.outputRows)
+      ScanMetricsUtil.inc(outputVectors, operatorMetrics.outputVectors)
+      ScanMetricsUtil.inc(outputBytes, operatorMetrics.outputBytes)
+      ScanMetricsUtil.inc(wallNanos, operatorMetrics.wallNanos)
+      ScanMetricsUtil.inc(cpuCount, operatorMetrics.cpuCount)
+      ScanMetricsUtil.inc(scanTime, operatorMetrics.scanTime)
+      ScanMetricsUtil.inc(peakMemoryBytes, operatorMetrics.peakMemoryBytes)
+      ScanMetricsUtil.inc(numMemoryAllocations, 
operatorMetrics.numMemoryAllocations)
+      ScanMetricsUtil.inc(numDynamicFiltersAccepted, 
operatorMetrics.numDynamicFiltersAccepted)
+      ScanMetricsUtil.inc(skippedSplits, operatorMetrics.skippedSplits)
+      ScanMetricsUtil.inc(processedSplits, operatorMetrics.processedSplits)
+      ScanMetricsUtil.inc(skippedStrides, operatorMetrics.skippedStrides)
+      ScanMetricsUtil.inc(processedStrides, operatorMetrics.processedStrides)
+      ScanMetricsUtil.inc(remainingFilterTime, 
operatorMetrics.remainingFilterTime)
+      ScanMetricsUtil.inc(ioWaitTime, operatorMetrics.ioWaitTime)
+      ScanMetricsUtil.inc(storageReadBytes, operatorMetrics.storageReadBytes)
+      ScanMetricsUtil.inc(storageReads, operatorMetrics.storageReads)
+      ScanMetricsUtil.inc(localReadBytes, operatorMetrics.localReadBytes)
+      ScanMetricsUtil.inc(ramReadBytes, operatorMetrics.ramReadBytes)
+      ScanMetricsUtil.inc(preloadSplits, operatorMetrics.preloadSplits)
+      ScanMetricsUtil.inc(pageLoadTime, operatorMetrics.pageLoadTime)
+      ScanMetricsUtil.inc(dataSourceAddSplitTime, 
operatorMetrics.dataSourceAddSplitTime)
+      ScanMetricsUtil.inc(dataSourceReadTime, 
operatorMetrics.dataSourceReadTime)
+      ScanMetricsUtil.inc(loadLazyVectorTime, 
operatorMetrics.loadLazyVectorTime)
     }
   }
 }
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/metrics/HiveTableScanMetricsUpdater.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/metrics/HiveTableScanMetricsUpdater.scala
index 3330f8b15a..7affd730aa 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/metrics/HiveTableScanMetricsUpdater.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/metrics/HiveTableScanMetricsUpdater.scala
@@ -19,72 +19,73 @@ package org.apache.gluten.metrics
 import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.utils.SparkInputMetricsUtil.InputMetricsWrapper
 
+/** See [[FileSourceScanMetricsUpdater]]. */
 class HiveTableScanMetricsUpdater(@transient val metrics: Map[String, 
SQLMetric])
   extends MetricsUpdater {
-  val rawInputRows: SQLMetric = metrics("rawInputRows")
-  val rawInputBytes: SQLMetric = metrics("rawInputBytes")
-  val outputRows: SQLMetric = metrics("numOutputRows")
-  val outputVectors: SQLMetric = metrics("outputVectors")
-  val outputBytes: SQLMetric = metrics("outputBytes")
-  val wallNanos: SQLMetric = metrics("wallNanos")
-  val cpuCount: SQLMetric = metrics("cpuCount")
-  val scanTime: SQLMetric = metrics("scanTime")
-  val peakMemoryBytes: SQLMetric = metrics("peakMemoryBytes")
-  val numMemoryAllocations: SQLMetric = metrics("numMemoryAllocations")
 
-  // Number of dynamic filters received.
-  val numDynamicFiltersAccepted: SQLMetric = 
metrics("numDynamicFiltersAccepted")
-  val skippedSplits: SQLMetric = metrics("skippedSplits")
-  val processedSplits: SQLMetric = metrics("processedSplits")
-  val preloadSplits: SQLMetric = metrics("preloadSplits")
-  val pageLoadTime: SQLMetric = metrics("pageLoadTime")
-  val dataSourceAddSplitTime: SQLMetric = metrics("dataSourceAddSplitTime")
-  val dataSourceReadTime: SQLMetric = metrics("dataSourceReadTime")
-  val skippedStrides: SQLMetric = metrics("skippedStrides")
-  val processedStrides: SQLMetric = metrics("processedStrides")
-  val remainingFilterTime: SQLMetric = metrics("remainingFilterTime")
-  val ioWaitTime: SQLMetric = metrics("ioWaitTime")
-  val storageReadBytes: SQLMetric = metrics("storageReadBytes")
-  val storageReads: SQLMetric = metrics("storageReads")
-  val localReadBytes: SQLMetric = metrics("localReadBytes")
-  val ramReadBytes: SQLMetric = metrics("ramReadBytes")
-  val loadLazyVectorTime: SQLMetric = metrics("loadLazyVectorTime")
+  private def metric(key: String): Option[SQLMetric] = 
Option(metrics).flatMap(_.get(key))
+
+  private val rawInputRows: Option[SQLMetric] = metric("rawInputRows")
+  private val rawInputBytes: Option[SQLMetric] = metric("rawInputBytes")
+  private val outputRows: Option[SQLMetric] = metric("numOutputRows")
+  private val outputVectors: Option[SQLMetric] = metric("outputVectors")
+  private val outputBytes: Option[SQLMetric] = metric("outputBytes")
+  private val wallNanos: Option[SQLMetric] = metric("wallNanos")
+  private val cpuCount: Option[SQLMetric] = metric("cpuCount")
+  private val scanTime: Option[SQLMetric] = metric("scanTime")
+  private val peakMemoryBytes: Option[SQLMetric] = metric("peakMemoryBytes")
+  private val numMemoryAllocations: Option[SQLMetric] = 
metric("numMemoryAllocations")
+  private val numDynamicFiltersAccepted: Option[SQLMetric] = 
metric("numDynamicFiltersAccepted")
+  private val skippedSplits: Option[SQLMetric] = metric("skippedSplits")
+  private val processedSplits: Option[SQLMetric] = metric("processedSplits")
+  private val skippedStrides: Option[SQLMetric] = metric("skippedStrides")
+  private val processedStrides: Option[SQLMetric] = metric("processedStrides")
+  private val remainingFilterTime: Option[SQLMetric] = 
metric("remainingFilterTime")
+  private val ioWaitTime: Option[SQLMetric] = metric("ioWaitTime")
+  private val storageReadBytes: Option[SQLMetric] = metric("storageReadBytes")
+  private val storageReads: Option[SQLMetric] = metric("storageReads")
+  private val localReadBytes: Option[SQLMetric] = metric("localReadBytes")
+  private val ramReadBytes: Option[SQLMetric] = metric("ramReadBytes")
+  private val preloadSplits: Option[SQLMetric] = metric("preloadSplits")
+  private val pageLoadTime: Option[SQLMetric] = metric("pageLoadTime")
+  private val dataSourceAddSplitTime: Option[SQLMetric] = 
metric("dataSourceAddSplitTime")
+  private val dataSourceReadTime: Option[SQLMetric] = 
metric("dataSourceReadTime")
+  private val loadLazyVectorTime: Option[SQLMetric] = 
metric("loadLazyVectorTime")
 
   override def updateInputMetrics(inputMetrics: InputMetricsWrapper): Unit = {
-    inputMetrics.bridgeIncBytesRead(rawInputBytes.value)
-    inputMetrics.bridgeIncRecordsRead(rawInputRows.value)
+    rawInputBytes.foreach(m => inputMetrics.bridgeIncBytesRead(m.value))
+    rawInputRows.foreach(m => inputMetrics.bridgeIncRecordsRead(m.value))
   }
 
   override def updateNativeMetrics(opMetrics: IOperatorMetrics): Unit = {
     if (opMetrics != null) {
       val operatorMetrics = opMetrics.asInstanceOf[OperatorMetrics]
-      rawInputRows += operatorMetrics.rawInputRows
-      rawInputBytes += operatorMetrics.rawInputBytes
-      outputRows += operatorMetrics.outputRows
-      outputVectors += operatorMetrics.outputVectors
-      outputBytes += operatorMetrics.outputBytes
-      wallNanos += operatorMetrics.wallNanos
-      cpuCount += operatorMetrics.cpuCount
-      scanTime += operatorMetrics.scanTime
-      peakMemoryBytes += operatorMetrics.peakMemoryBytes
-      numMemoryAllocations += operatorMetrics.numMemoryAllocations
-      // Number of dynamic filters received.
-      numDynamicFiltersAccepted += operatorMetrics.numDynamicFiltersAccepted
-      skippedSplits += operatorMetrics.skippedSplits
-      processedSplits += operatorMetrics.processedSplits
-      skippedStrides += operatorMetrics.skippedStrides
-      processedStrides += operatorMetrics.processedStrides
-      remainingFilterTime += operatorMetrics.remainingFilterTime
-      ioWaitTime += operatorMetrics.ioWaitTime
-      storageReadBytes += operatorMetrics.storageReadBytes
-      storageReads += operatorMetrics.storageReads
-      localReadBytes += operatorMetrics.localReadBytes
-      ramReadBytes += operatorMetrics.ramReadBytes
-      preloadSplits += operatorMetrics.preloadSplits
-      pageLoadTime += operatorMetrics.pageLoadTime
-      dataSourceAddSplitTime += operatorMetrics.dataSourceAddSplitTime
-      dataSourceReadTime += operatorMetrics.dataSourceReadTime
-      loadLazyVectorTime += operatorMetrics.loadLazyVectorTime
+      ScanMetricsUtil.inc(rawInputRows, operatorMetrics.rawInputRows)
+      ScanMetricsUtil.inc(rawInputBytes, operatorMetrics.rawInputBytes)
+      ScanMetricsUtil.inc(outputRows, operatorMetrics.outputRows)
+      ScanMetricsUtil.inc(outputVectors, operatorMetrics.outputVectors)
+      ScanMetricsUtil.inc(outputBytes, operatorMetrics.outputBytes)
+      ScanMetricsUtil.inc(wallNanos, operatorMetrics.wallNanos)
+      ScanMetricsUtil.inc(cpuCount, operatorMetrics.cpuCount)
+      ScanMetricsUtil.inc(scanTime, operatorMetrics.scanTime)
+      ScanMetricsUtil.inc(peakMemoryBytes, operatorMetrics.peakMemoryBytes)
+      ScanMetricsUtil.inc(numMemoryAllocations, 
operatorMetrics.numMemoryAllocations)
+      ScanMetricsUtil.inc(numDynamicFiltersAccepted, 
operatorMetrics.numDynamicFiltersAccepted)
+      ScanMetricsUtil.inc(skippedSplits, operatorMetrics.skippedSplits)
+      ScanMetricsUtil.inc(processedSplits, operatorMetrics.processedSplits)
+      ScanMetricsUtil.inc(skippedStrides, operatorMetrics.skippedStrides)
+      ScanMetricsUtil.inc(processedStrides, operatorMetrics.processedStrides)
+      ScanMetricsUtil.inc(remainingFilterTime, 
operatorMetrics.remainingFilterTime)
+      ScanMetricsUtil.inc(ioWaitTime, operatorMetrics.ioWaitTime)
+      ScanMetricsUtil.inc(storageReadBytes, operatorMetrics.storageReadBytes)
+      ScanMetricsUtil.inc(storageReads, operatorMetrics.storageReads)
+      ScanMetricsUtil.inc(localReadBytes, operatorMetrics.localReadBytes)
+      ScanMetricsUtil.inc(ramReadBytes, operatorMetrics.ramReadBytes)
+      ScanMetricsUtil.inc(preloadSplits, operatorMetrics.preloadSplits)
+      ScanMetricsUtil.inc(pageLoadTime, operatorMetrics.pageLoadTime)
+      ScanMetricsUtil.inc(dataSourceAddSplitTime, 
operatorMetrics.dataSourceAddSplitTime)
+      ScanMetricsUtil.inc(dataSourceReadTime, 
operatorMetrics.dataSourceReadTime)
+      ScanMetricsUtil.inc(loadLazyVectorTime, 
operatorMetrics.loadLazyVectorTime)
     }
   }
 }
diff --git a/docs/Configuration.md b/docs/Configuration.md
index 748a200cfb..60d555e7c9 100644
--- a/docs/Configuration.md
+++ b/docs/Configuration.md
@@ -137,6 +137,7 @@ nav_order: 15
 | spark.gluten.sql.pushAggregateThroughJoin.maxDepth                  | 🔄 
Dynamic    | 2147483647        | Maximum join traversal depth when applying the 
push-aggregate-through-join optimization. A value of 1 allows pushing an 
aggregate through a single join; larger values allow the rule to traverse and 
push through multiple consecutive joins.                                        
                                                                                
                            [...]
 | spark.gluten.sql.removeNativeWriteFilesSortAndProject               | 🔄 
Dynamic    | true              | When true, Gluten will remove the vanilla 
Spark V1Writes added sort and project for velox backend.                        
                                                                                
                                                                                
                                                                                
                        [...]
 | spark.gluten.sql.rewrite.dateTimestampComparison                    | 🔄 
Dynamic    | true              | Rewrite the comparision between date and 
timestamp to timestamp comparison.For example `from_unixtime(ts) > date` will 
be rewritten to `ts > to_unixtime(date)`                                        
                                                                                
                                                                                
                           [...]
+| spark.gluten.sql.scan.detailedMetrics.enabled                       | 🔄 
Dynamic    | true              | When true (default), Velox backend scan 
operators register all detailed SQL metrics. When false, only essential scan 
metrics are registered to reduce driver memory usage. Also enabled 
automatically when spark.gluten.sql.debug is true. Does not affect the 
ClickHouse backend.                                                             
                                                   [...]
 | spark.gluten.sql.scan.fileSchemeValidation.enabled                  | 🔄 
Dynamic    | true              | When true, enable file path scheme validation 
for scan. Validation will fail if file scheme is not supported by registered 
file systems, which will cause scan  operator fall back.                        
                                                                                
                                                                                
                       [...]
 | spark.gluten.sql.supported.flattenNestedFunctions                   | 🔄 
Dynamic    | and,or            | Flatten nested functions as one for 
optimization.                                                                   
                                                                                
                                                                                
                                                                                
                              [...]
 | spark.gluten.sql.text.input.empty.as.default                        | 🔄 
Dynamic    | false             | treat empty fields in CSV input as default 
values.                                                                         
                                                                                
                                                                                
                                                                                
                       [...]
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala 
b/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
index 821d747228..8656b3e2af 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
@@ -327,6 +327,10 @@ class GlutenConfig(conf: SQLConf) extends 
GlutenCoreConfig(conf) {
 
   def debug: Boolean = getConf(DEBUG_ENABLED)
 
+  /** Full scan SQL metrics; also enabled when [[debug]] is true. */
+  def detailedScanMetricsEnabled: Boolean =
+    getConf(SCAN_DETAILED_METRICS_ENABLED) || debug
+
   def collectUtStats: Boolean = getConf(UT_STATISTIC)
 
   def benchmarkStageId: Int = getConf(BENCHMARK_TASK_STAGEID)
@@ -1288,6 +1292,16 @@ object GlutenConfig extends ConfigRegistry {
       .booleanConf
       .createWithDefault(false)
 
+  val SCAN_DETAILED_METRICS_ENABLED =
+    buildConf("spark.gluten.sql.scan.detailedMetrics.enabled")
+      .doc(
+        "When true (default), Velox backend scan operators register all 
detailed SQL metrics. " +
+          "When false, only essential scan metrics are registered to reduce 
driver memory " +
+          "usage. Also enabled automatically when spark.gluten.sql.debug is 
true. " +
+          "Does not affect the ClickHouse backend.")
+      .booleanConf
+      .createWithDefault(true)
+
   val DEBUG_KEEP_JNI_WORKSPACE =
     buildStaticConf("spark.gluten.sql.debug.keepJniWorkspace")
       .internal()
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/FileSourceScanExecTransformer.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/FileSourceScanExecTransformer.scala
index 2278bd3a62..8fea175d7e 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/FileSourceScanExecTransformer.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/FileSourceScanExecTransformer.scala
@@ -109,11 +109,15 @@ abstract class FileSourceScanExecTransformerBase(
     disableBucketedScan)
   with DatasourceScanTransformer {
 
-  // Note: "metrics" is made transient to avoid sending driver-side metrics to 
tasks.
-  @transient override lazy val metrics: Map[String, SQLMetric] =
+  // Executor-side metrics only (excludes driverMetricsAlias).
+  @transient private lazy val executorSideScanMetrics: Map[String, SQLMetric] =
     BackendsApiManager.getMetricsApiInstance
       .genFileSourceScanTransformerMetrics(sparkContext)
-      .filter(m => !driverMetricsAlias.contains(m._1)) ++ driverMetricsAlias
+      .filter(m => !driverMetricsAlias.contains(m._1))
+
+  // Note: "metrics" is made transient to avoid sending driver-side metrics to 
tasks.
+  @transient override lazy val metrics: Map[String, SQLMetric] =
+    executorSideScanMetrics ++ driverMetricsAlias
 
   override def scanFilters: Seq[Expression] = dataFilters
 
@@ -177,7 +181,8 @@ abstract class FileSourceScanExecTransformerBase(
   }
 
   override def metricsUpdater(): MetricsUpdater =
-    
BackendsApiManager.getMetricsApiInstance.genFileSourceScanTransformerMetricsUpdater(metrics)
+    BackendsApiManager.getMetricsApiInstance
+      .genFileSourceScanTransformerMetricsUpdater(executorSideScanMetrics)
 
   override val nodeName: String = {
     s"${getClass.getSimpleName} $relation 
${tableIdentifier.map(_.unquotedString).getOrElse("")}"
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/metrics/ScanMetricsUtil.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/metrics/ScanMetricsUtil.scala
new file mode 100644
index 0000000000..1ff7e7efdb
--- /dev/null
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/metrics/ScanMetricsUtil.scala
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.metrics
+
+import org.apache.gluten.config.GlutenConfig
+
+import org.apache.spark.sql.execution.metric.SQLMetric
+
+/**
+ * Velox-only utilities to reduce scan operator SQL metrics under driver 
memory pressure.
+ *
+ * By default all detailed metrics are registered. Disable full collection via
+ * `spark.gluten.sql.scan.detailedMetrics.enabled=false` to keep only 
essential metrics. Also
+ * enabled automatically when `spark.gluten.sql.debug` is true.
+ */
+object ScanMetricsUtil {
+
+  def detailedScanMetricsEnabled: Boolean = 
GlutenConfig.get.detailedScanMetricsEnabled
+
+  /** Velox BatchScanExecTransformer - executor-side metrics (default subset). 
*/
+  val VELOX_BATCH_SCAN_MINIMAL_METRICS: Set[String] = Set(
+    "rawInputRows",
+    "rawInputBytes",
+    "numOutputRows",
+    "outputBytes",
+    "scanTime",
+    "wallNanos",
+    "peakMemoryBytes",
+    "ioWaitTime",
+    "storageReadBytes"
+  )
+
+  /** Velox FileSourceScan / HiveTableScan - executor + Spark-aligned driver 
metrics. */
+  val VELOX_FILE_SCAN_MINIMAL_METRICS: Set[String] = 
VELOX_BATCH_SCAN_MINIMAL_METRICS ++ Set(
+    "numFiles",
+    "metadataTime",
+    "filesSize",
+    "numPartitions",
+    "pruningTime"
+  )
+
+  val VELOX_HIVE_SCAN_MINIMAL_METRICS: Set[String] = 
VELOX_FILE_SCAN_MINIMAL_METRICS
+
+  /** Driver-only FileSource/Hive scan metrics (not passed to executor 
MetricsUpdater). */
+  val VELOX_FILE_SCAN_DRIVER_METRICS: Set[String] = Set(
+    "numFiles",
+    "metadataTime",
+    "filesSize",
+    "numPartitions",
+    "pruningTime"
+  )
+
+  def filterExecutorMetrics(metrics: Map[String, SQLMetric]): Map[String, 
SQLMetric] =
+    metrics.filterKeys(!VELOX_FILE_SCAN_DRIVER_METRICS.contains(_)).toMap
+
+  def filterScanMetrics(
+      metrics: Map[String, SQLMetric],
+      minimalKeys: Set[String]): Map[String, SQLMetric] = {
+    if (detailedScanMetricsEnabled) {
+      metrics
+    } else {
+      metrics.filterKeys(minimalKeys.contains).toMap
+    }
+  }
+
+  def inc(metric: Option[SQLMetric], value: Long): Unit = metric.foreach(_ += 
value)
+}
diff --git 
a/gluten-substrait/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala
 
b/gluten-substrait/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala
index a091d047c2..26613f5f80 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala
@@ -18,7 +18,7 @@ package org.apache.spark.sql.hive
 
 import org.apache.gluten.backendsapi.BackendsApiManager
 import org.apache.gluten.execution.BasicScanExecTransformer
-import org.apache.gluten.metrics.MetricsUpdater
+import org.apache.gluten.metrics.{MetricsUpdater, ScanMetricsUtil}
 import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
 
 import org.apache.spark.Partition
@@ -116,7 +116,8 @@ case class HiveTableScanExecTransformer(
   override def getRootPathsInternal: Seq[String] = Seq.empty
 
   override def metricsUpdater(): MetricsUpdater =
-    
BackendsApiManager.getMetricsApiInstance.genHiveTableScanTransformerMetricsUpdater(metrics)
+    
BackendsApiManager.getMetricsApiInstance.genHiveTableScanTransformerMetricsUpdater(
+      ScanMetricsUtil.filterExecutorMetrics(metrics))
 
   @transient private lazy val hivePartitionConverter =
     new HivePartitionConverter(session.sessionState.newHadoopConf(), session)
diff --git 
a/gluten-substrait/src/test/scala/org/apache/gluten/metrics/ScanMetricsUtilSuite.scala
 
b/gluten-substrait/src/test/scala/org/apache/gluten/metrics/ScanMetricsUtilSuite.scala
new file mode 100644
index 0000000000..00e13ed0c9
--- /dev/null
+++ 
b/gluten-substrait/src/test/scala/org/apache/gluten/metrics/ScanMetricsUtilSuite.scala
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.metrics
+
+import org.apache.gluten.config.GlutenConfig
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.plans.SQLHelper
+import org.apache.spark.sql.execution.metric.SQLMetrics
+import org.apache.spark.sql.test.SharedSparkSession
+
+class ScanMetricsUtilSuite extends SparkFunSuite with SharedSparkSession with 
SQLHelper {
+
+  test("filterScanMetrics keeps minimal Velox batch scan keys when disabled") {
+    withSQLConf(
+      GlutenConfig.SCAN_DETAILED_METRICS_ENABLED.key -> "false",
+      GlutenConfig.DEBUG_ENABLED.key -> "false") {
+      val full = Map(
+        "numInputRows" -> SQLMetrics.createMetric(sparkContext, "in"),
+        "rawInputRows" -> SQLMetrics.createMetric(sparkContext, "raw in rows"),
+        "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "out"),
+        "cpuCount" -> SQLMetrics.createMetric(sparkContext, "cpu")
+      )
+      val filtered =
+        ScanMetricsUtil.filterScanMetrics(full, 
ScanMetricsUtil.VELOX_BATCH_SCAN_MINIMAL_METRICS)
+      assert(!filtered.contains("numInputRows"))
+      assert(!filtered.contains("cpuCount"))
+      assert(filtered.contains("rawInputRows"))
+      assert(filtered.contains("numOutputRows"))
+    }
+  }
+
+  test("filterScanMetrics returns all keys by default") {
+    withSQLConf(
+      GlutenConfig.SCAN_DETAILED_METRICS_ENABLED.key -> "true",
+      GlutenConfig.DEBUG_ENABLED.key -> "false") {
+      val full = Map(
+        "numInputRows" -> SQLMetrics.createMetric(sparkContext, "in"),
+        "rawInputRows" -> SQLMetrics.createMetric(sparkContext, "raw in rows"))
+      val filtered =
+        ScanMetricsUtil.filterScanMetrics(full, 
ScanMetricsUtil.VELOX_BATCH_SCAN_MINIMAL_METRICS)
+      assert(filtered.size == full.size)
+    }
+  }
+
+  test("filterScanMetrics returns all keys when debug is enabled") {
+    withSQLConf(
+      GlutenConfig.SCAN_DETAILED_METRICS_ENABLED.key -> "false",
+      GlutenConfig.DEBUG_ENABLED.key -> "true") {
+      val full = Map("cpuCount" -> SQLMetrics.createMetric(sparkContext, 
"cpu"))
+      val filtered =
+        ScanMetricsUtil.filterScanMetrics(full, 
ScanMetricsUtil.VELOX_BATCH_SCAN_MINIMAL_METRICS)
+      assert(filtered.size == full.size)
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to