This is an automated email from the ASF dual-hosted git repository.

felixybw pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 89c4182ec7 [VL] Add metrics for lazy vector load (#10726)
89c4182ec7 is described below

commit 89c4182ec7758822e69b0332cf7759f3f44d1508
Author: Rong Ma <mar...@apache.org>
AuthorDate: Fri Sep 26 11:12:09 2025 +0800

    [VL] Add metrics for lazy vector load (#10726)
    
    This PR adds the metric "time of loading lazy vectors", measured in the
NextInternal function, to the last operator of the transformer.
---
 .../java/org/apache/gluten/metrics/Metrics.java    |  7 +-
 .../org/apache/gluten/metrics/OperatorMetrics.java |  6 +-
 .../gluten/backendsapi/velox/VeloxMetricsApi.scala | 80 +++++++++++++++++-----
 .../gluten/metrics/BatchScanMetricsUpdater.scala   |  1 +
 .../gluten/metrics/ExpandMetricsUpdater.scala      |  1 +
 .../metrics/FileSourceScanMetricsUpdater.scala     |  2 +
 .../gluten/metrics/FilterMetricsUpdater.scala      |  1 +
 .../gluten/metrics/GenerateMetricsUpdater.scala    |  1 +
 .../metrics/HashAggregateMetricsUpdater.scala      |  7 ++
 .../metrics/HiveTableScanMetricsUpdater.scala      |  2 +
 .../apache/gluten/metrics/JoinMetricsUpdater.scala |  6 ++
 .../gluten/metrics/LimitMetricsUpdater.scala       |  1 +
 .../org/apache/gluten/metrics/MetricsUtil.scala    |  5 +-
 .../metrics/NestedLoopJoinMetricsUpdater.scala     |  6 ++
 .../gluten/metrics/ProjectMetricsUpdater.scala     |  1 +
 .../gluten/metrics/SampleMetricsUpdater.scala      |  1 +
 .../apache/gluten/metrics/SortMetricsUpdater.scala |  1 +
 .../gluten/metrics/UnionMetricsUpdater.scala       |  3 +
 .../gluten/metrics/WindowMetricsUpdater.scala      |  1 +
 .../gluten/metrics/WriteFilesMetricsUpdater.scala  |  1 +
 cpp/core/jni/JniWrapper.cc                         |  3 +-
 cpp/core/utils/Metrics.h                           |  3 +
 cpp/velox/compute/WholeStageResultIterator.cc      | 13 +++-
 cpp/velox/compute/WholeStageResultIterator.h       |  2 +
 24 files changed, 133 insertions(+), 22 deletions(-)

diff --git 
a/backends-velox/src/main/java/org/apache/gluten/metrics/Metrics.java 
b/backends-velox/src/main/java/org/apache/gluten/metrics/Metrics.java
index d9b16aef98..5515dd9300 100644
--- a/backends-velox/src/main/java/org/apache/gluten/metrics/Metrics.java
+++ b/backends-velox/src/main/java/org/apache/gluten/metrics/Metrics.java
@@ -59,6 +59,8 @@ public class Metrics implements IMetrics {
   public long[] writeIOTime;
   public long[] numWrittenFiles;
 
+  public long[] loadLazyVectorTime;
+
   public SingleMetric singleMetric = new SingleMetric();
 
   public String taskStats;
@@ -104,6 +106,7 @@ public class Metrics implements IMetrics {
       long[] physicalWrittenBytes,
       long[] writeIOTime,
       long[] numWrittenFiles,
+      long[] loadLazyVectorTime,
       String taskStats) {
     this.inputRows = inputRows;
     this.inputVectors = inputVectors;
@@ -145,6 +148,7 @@ public class Metrics implements IMetrics {
     this.physicalWrittenBytes = physicalWrittenBytes;
     this.writeIOTime = writeIOTime;
     this.numWrittenFiles = numWrittenFiles;
+    this.loadLazyVectorTime = loadLazyVectorTime;
     this.taskStats = taskStats;
   }
 
@@ -191,7 +195,8 @@ public class Metrics implements IMetrics {
         dataSourceReadTime[index],
         physicalWrittenBytes[index],
         writeIOTime[index],
-        numWrittenFiles[index]);
+        numWrittenFiles[index],
+        loadLazyVectorTime[index]);
   }
 
   public SingleMetric getSingleMetrics() {
diff --git 
a/backends-velox/src/main/java/org/apache/gluten/metrics/OperatorMetrics.java 
b/backends-velox/src/main/java/org/apache/gluten/metrics/OperatorMetrics.java
index 24bedf0a46..9292caebeb 100644
--- 
a/backends-velox/src/main/java/org/apache/gluten/metrics/OperatorMetrics.java
+++ 
b/backends-velox/src/main/java/org/apache/gluten/metrics/OperatorMetrics.java
@@ -57,6 +57,8 @@ public class OperatorMetrics implements IOperatorMetrics {
   public long writeIOTime;
   public long numWrittenFiles;
 
+  public long loadLazyVectorTime;
+
   /** Create an instance for operator metrics. */
   public OperatorMetrics(
       long inputRows,
@@ -96,7 +98,8 @@ public class OperatorMetrics implements IOperatorMetrics {
       long dataSourceReadTime,
       long physicalWrittenBytes,
       long writeIOTime,
-      long numWrittenFiles) {
+      long numWrittenFiles,
+      long loadLazyVectorTime) {
     this.inputRows = inputRows;
     this.inputVectors = inputVectors;
     this.inputBytes = inputBytes;
@@ -135,5 +138,6 @@ public class OperatorMetrics implements IOperatorMetrics {
     this.physicalWrittenBytes = physicalWrittenBytes;
     this.writeIOTime = writeIOTime;
     this.numWrittenFiles = numWrittenFiles;
+    this.loadLazyVectorTime = loadLazyVectorTime;
   }
 }
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala
index 0feefe02bd..12c66a6a1a 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala
@@ -122,7 +122,10 @@ class VeloxMetricsApi extends MetricsApi with Logging {
       "ioWaitTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "io wait 
time"),
       "storageReadBytes" -> SQLMetrics.createSizeMetric(sparkContext, "storage 
read bytes"),
       "localReadBytes" -> SQLMetrics.createSizeMetric(sparkContext, "local ssd 
read bytes"),
-      "ramReadBytes" -> SQLMetrics.createSizeMetric(sparkContext, "ram read 
bytes")
+      "ramReadBytes" -> SQLMetrics.createSizeMetric(sparkContext, "ram read 
bytes"),
+      "loadLazyVectorTime" -> SQLMetrics.createNanoTimingMetric(
+        sparkContext,
+        "time of loading lazy vectors")
     )
 
   override def genBatchScanTransformerMetricsUpdater(
@@ -169,7 +172,10 @@ class VeloxMetricsApi extends MetricsApi with Logging {
       "ioWaitTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "io wait 
time"),
       "storageReadBytes" -> SQLMetrics.createSizeMetric(sparkContext, "storage 
read bytes"),
       "localReadBytes" -> SQLMetrics.createSizeMetric(sparkContext, "local ssd 
read bytes"),
-      "ramReadBytes" -> SQLMetrics.createSizeMetric(sparkContext, "ram read 
bytes")
+      "ramReadBytes" -> SQLMetrics.createSizeMetric(sparkContext, "ram read 
bytes"),
+      "loadLazyVectorTime" -> SQLMetrics.createNanoTimingMetric(
+        sparkContext,
+        "time of loading lazy vectors")
     )
 
   override def genHiveTableScanTransformerMetricsUpdater(
@@ -216,7 +222,10 @@ class VeloxMetricsApi extends MetricsApi with Logging {
       "ioWaitTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "io wait 
time"),
       "storageReadBytes" -> SQLMetrics.createSizeMetric(sparkContext, "storage 
read bytes"),
       "localReadBytes" -> SQLMetrics.createSizeMetric(sparkContext, "local ssd 
read bytes"),
-      "ramReadBytes" -> SQLMetrics.createSizeMetric(sparkContext, "ram read 
bytes")
+      "ramReadBytes" -> SQLMetrics.createSizeMetric(sparkContext, "ram read 
bytes"),
+      "loadLazyVectorTime" -> SQLMetrics.createNanoTimingMetric(
+        sparkContext,
+        "time of loading lazy vectors")
     )
 
   override def genFileSourceScanTransformerMetricsUpdater(
@@ -232,7 +241,10 @@ class VeloxMetricsApi extends MetricsApi with Logging {
       "peakMemoryBytes" -> SQLMetrics.createSizeMetric(sparkContext, "peak 
memory bytes"),
       "numMemoryAllocations" -> SQLMetrics.createMetric(
         sparkContext,
-        "number of memory allocations")
+        "number of memory allocations"),
+      "loadLazyVectorTime" -> SQLMetrics.createNanoTimingMetric(
+        sparkContext,
+        "time of loading lazy vectors")
     )
 
   override def genFilterTransformerMetricsUpdater(
@@ -250,7 +262,10 @@ class VeloxMetricsApi extends MetricsApi with Logging {
       "peakMemoryBytes" -> SQLMetrics.createSizeMetric(sparkContext, "peak 
memory bytes"),
       "numMemoryAllocations" -> SQLMetrics.createMetric(
         sparkContext,
-        "number of memory allocations")
+        "number of memory allocations"),
+      "loadLazyVectorTime" -> SQLMetrics.createNanoTimingMetric(
+        sparkContext,
+        "time of loading lazy vectors")
     )
 
   override def genProjectTransformerMetricsUpdater(
@@ -295,7 +310,10 @@ class VeloxMetricsApi extends MetricsApi with Logging {
       "finalOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of 
final output rows"),
       "finalOutputVectors" -> SQLMetrics.createMetric(
         sparkContext,
-        "number of final output vectors")
+        "number of final output vectors"),
+      "loadLazyVectorTime" -> SQLMetrics.createNanoTimingMetric(
+        sparkContext,
+        "time of loading lazy vectors")
     )
 
   override def genHashAggregateTransformerMetricsUpdater(
@@ -312,7 +330,10 @@ class VeloxMetricsApi extends MetricsApi with Logging {
       "peakMemoryBytes" -> SQLMetrics.createSizeMetric(sparkContext, "peak 
memory bytes"),
       "numMemoryAllocations" -> SQLMetrics.createMetric(
         sparkContext,
-        "number of memory allocations")
+        "number of memory allocations"),
+      "loadLazyVectorTime" -> SQLMetrics.createNanoTimingMetric(
+        sparkContext,
+        "time of loading lazy vectors")
     )
 
   override def genExpandTransformerMetricsUpdater(metrics: Map[String, 
SQLMetric]): MetricsUpdater =
@@ -381,7 +402,10 @@ class VeloxMetricsApi extends MetricsApi with Logging {
       "spilledBytes" -> SQLMetrics.createSizeMetric(sparkContext, "bytes 
written for spilling"),
       "spilledRows" -> SQLMetrics.createMetric(sparkContext, "total rows 
written for spilling"),
       "spilledPartitions" -> SQLMetrics.createMetric(sparkContext, "total 
spilled partitions"),
-      "spilledFiles" -> SQLMetrics.createMetric(sparkContext, "total spilled 
files")
+      "spilledFiles" -> SQLMetrics.createMetric(sparkContext, "total spilled 
files"),
+      "loadLazyVectorTime" -> SQLMetrics.createNanoTimingMetric(
+        sparkContext,
+        "time of loading lazy vectors")
     )
 
   override def genWindowTransformerMetricsUpdater(metrics: Map[String, 
SQLMetric]): MetricsUpdater =
@@ -411,7 +435,10 @@ class VeloxMetricsApi extends MetricsApi with Logging {
       "peakMemoryBytes" -> SQLMetrics.createSizeMetric(sparkContext, "peak 
memory bytes"),
       "numMemoryAllocations" -> SQLMetrics.createMetric(
         sparkContext,
-        "number of memory allocations")
+        "number of memory allocations"),
+      "loadLazyVectorTime" -> SQLMetrics.createNanoTimingMetric(
+        sparkContext,
+        "time of loading lazy vectors")
     )
 
   override def genLimitTransformerMetricsUpdater(metrics: Map[String, 
SQLMetric]): MetricsUpdater =
@@ -424,7 +451,10 @@ class VeloxMetricsApi extends MetricsApi with Logging {
         "number of written bytes"),
       "writeIONanos" -> SQLMetrics.createNanoTimingMetric(sparkContext, "time 
of write IO"),
       "wallNanos" -> SQLMetrics.createNanoTimingMetric(sparkContext, "time of 
write"),
-      "numWrittenFiles" -> SQLMetrics.createMetric(sparkContext, "number of 
written files")
+      "numWrittenFiles" -> SQLMetrics.createMetric(sparkContext, "number of 
written files"),
+      "loadLazyVectorTime" -> SQLMetrics.createNanoTimingMetric(
+        sparkContext,
+        "time of loading lazy vectors")
     )
 
   def genWriteFilesTransformerMetricsUpdater(metrics: Map[String, SQLMetric]): 
MetricsUpdater =
@@ -444,7 +474,10 @@ class VeloxMetricsApi extends MetricsApi with Logging {
       "spilledBytes" -> SQLMetrics.createSizeMetric(sparkContext, "bytes 
written for spilling"),
       "spilledRows" -> SQLMetrics.createMetric(sparkContext, "total rows 
written for spilling"),
       "spilledPartitions" -> SQLMetrics.createMetric(sparkContext, "total 
spilled partitions"),
-      "spilledFiles" -> SQLMetrics.createMetric(sparkContext, "total spilled 
files")
+      "spilledFiles" -> SQLMetrics.createMetric(sparkContext, "total spilled 
files"),
+      "loadLazyVectorTime" -> SQLMetrics.createNanoTimingMetric(
+        sparkContext,
+        "time of loading lazy vectors")
     )
 
   override def genSortTransformerMetricsUpdater(metrics: Map[String, 
SQLMetric]): MetricsUpdater =
@@ -479,7 +512,10 @@ class VeloxMetricsApi extends MetricsApi with Logging {
         "postProject cpu wall time count"),
       "postProjectionWallNanos" -> SQLMetrics.createNanoTimingMetric(
         sparkContext,
-        "time of postProjection")
+        "time of postProjection"),
+      "loadLazyVectorTime" -> SQLMetrics.createNanoTimingMetric(
+        sparkContext,
+        "time of loading lazy vectors")
     )
 
   override def genSortMergeJoinTransformerMetricsUpdater(
@@ -598,7 +634,10 @@ class VeloxMetricsApi extends MetricsApi with Logging {
         "time of postProjection"),
       "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of 
output rows"),
       "numOutputVectors" -> SQLMetrics.createMetric(sparkContext, "number of 
output vectors"),
-      "numOutputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of 
output bytes")
+      "numOutputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of 
output bytes"),
+      "loadLazyVectorTime" -> SQLMetrics.createNanoTimingMetric(
+        sparkContext,
+        "time of loading lazy vectors")
     )
 
   override def genHashJoinTransformerMetricsUpdater(
@@ -663,7 +702,10 @@ class VeloxMetricsApi extends MetricsApi with Logging {
         "time of postProjection"),
       "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of 
output rows"),
       "numOutputVectors" -> SQLMetrics.createMetric(sparkContext, "number of 
output vectors"),
-      "numOutputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of 
output bytes")
+      "numOutputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of 
output bytes"),
+      "loadLazyVectorTime" -> SQLMetrics.createNanoTimingMetric(
+        sparkContext,
+        "time of loading lazy vectors")
     )
 
   override def genNestedLoopJoinTransformerMetricsUpdater(
@@ -679,7 +721,10 @@ class VeloxMetricsApi extends MetricsApi with Logging {
       "peakMemoryBytes" -> SQLMetrics.createSizeMetric(sparkContext, "peak 
memory bytes"),
       "numMemoryAllocations" -> SQLMetrics.createMetric(
         sparkContext,
-        "number of memory allocations")
+        "number of memory allocations"),
+      "loadLazyVectorTime" -> SQLMetrics.createNanoTimingMetric(
+        sparkContext,
+        "time of loading lazy vectors")
     )
 
   override def genSampleTransformerMetricsUpdater(metrics: Map[String, 
SQLMetric]): MetricsUpdater =
@@ -690,7 +735,10 @@ class VeloxMetricsApi extends MetricsApi with Logging {
     "inputVectors" -> SQLMetrics.createMetric(sparkContext, "number of input 
vectors"),
     "inputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of input 
bytes"),
     "wallNanos" -> SQLMetrics.createNanoTimingMetric(sparkContext, "time of 
union"),
-    "cpuCount" -> SQLMetrics.createMetric(sparkContext, "cpu wall time count")
+    "cpuCount" -> SQLMetrics.createMetric(sparkContext, "cpu wall time count"),
+    "loadLazyVectorTime" -> SQLMetrics.createNanoTimingMetric(
+      sparkContext,
+      "time of loading lazy vectors")
   )
 
   override def genUnionTransformerMetricsUpdater(metrics: Map[String, 
SQLMetric]): MetricsUpdater =
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/metrics/BatchScanMetricsUpdater.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/metrics/BatchScanMetricsUpdater.scala
index 886e8353b8..07b5c30107 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/metrics/BatchScanMetricsUpdater.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/metrics/BatchScanMetricsUpdater.scala
@@ -56,6 +56,7 @@ class BatchScanMetricsUpdater(val metrics: Map[String, 
SQLMetric]) extends Metri
       metrics("preloadSplits") += operatorMetrics.preloadSplits
       metrics("dataSourceAddSplitTime") += 
operatorMetrics.dataSourceAddSplitTime
       metrics("dataSourceReadTime") += operatorMetrics.dataSourceReadTime
+      metrics("loadLazyVectorTime") += operatorMetrics.loadLazyVectorTime
     }
   }
 }
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/metrics/ExpandMetricsUpdater.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/metrics/ExpandMetricsUpdater.scala
index 68f6b4239b..6179ed3e1b 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/metrics/ExpandMetricsUpdater.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/metrics/ExpandMetricsUpdater.scala
@@ -30,6 +30,7 @@ class ExpandMetricsUpdater(val metrics: Map[String, 
SQLMetric]) extends MetricsU
       metrics("wallNanos") += operatorMetrics.wallNanos
       metrics("peakMemoryBytes") += operatorMetrics.peakMemoryBytes
       metrics("numMemoryAllocations") += operatorMetrics.numMemoryAllocations
+      metrics("loadLazyVectorTime") += operatorMetrics.loadLazyVectorTime
     }
   }
 }
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/metrics/FileSourceScanMetricsUpdater.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/metrics/FileSourceScanMetricsUpdater.scala
index b76525d7fb..4ba0195802 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/metrics/FileSourceScanMetricsUpdater.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/metrics/FileSourceScanMetricsUpdater.scala
@@ -51,6 +51,7 @@ class FileSourceScanMetricsUpdater(@transient val metrics: 
Map[String, SQLMetric
   val storageReadBytes: SQLMetric = metrics("storageReadBytes")
   val localReadBytes: SQLMetric = metrics("localReadBytes")
   val ramReadBytes: SQLMetric = metrics("ramReadBytes")
+  val loadLazyVectorTime: SQLMetric = metrics("loadLazyVectorTime")
 
   override def updateInputMetrics(inputMetrics: InputMetricsWrapper): Unit = {
     inputMetrics.bridgeIncBytesRead(rawInputBytes.value)
@@ -84,6 +85,7 @@ class FileSourceScanMetricsUpdater(@transient val metrics: 
Map[String, SQLMetric
       preloadSplits += operatorMetrics.preloadSplits
       dataSourceAddSplitTime += operatorMetrics.dataSourceAddSplitTime
       dataSourceReadTime += operatorMetrics.dataSourceReadTime
+      loadLazyVectorTime += operatorMetrics.loadLazyVectorTime
     }
   }
 }
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/metrics/FilterMetricsUpdater.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/metrics/FilterMetricsUpdater.scala
index e8c27f6a43..03a5f604e5 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/metrics/FilterMetricsUpdater.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/metrics/FilterMetricsUpdater.scala
@@ -33,6 +33,7 @@ class FilterMetricsUpdater(
       metrics("wallNanos") += operatorMetrics.wallNanos
       metrics("peakMemoryBytes") += operatorMetrics.peakMemoryBytes
       metrics("numMemoryAllocations") += operatorMetrics.numMemoryAllocations
+      metrics("loadLazyVectorTime") += operatorMetrics.loadLazyVectorTime
       extraMetrics.foreach {
         case (name, metric) =>
           name match {
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/metrics/GenerateMetricsUpdater.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/metrics/GenerateMetricsUpdater.scala
index 0a3dccd644..6d88e2bacc 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/metrics/GenerateMetricsUpdater.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/metrics/GenerateMetricsUpdater.scala
@@ -29,6 +29,7 @@ class GenerateMetricsUpdater(val metrics: Map[String, 
SQLMetric]) extends Metric
       metrics("wallNanos") += nativeMetrics.wallNanos
       metrics("peakMemoryBytes") += nativeMetrics.peakMemoryBytes
       metrics("numMemoryAllocations") += nativeMetrics.numMemoryAllocations
+      metrics("loadLazyVectorTime") += nativeMetrics.loadLazyVectorTime
     }
   }
 }
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/metrics/HashAggregateMetricsUpdater.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/metrics/HashAggregateMetricsUpdater.scala
index f81ab2708a..3843a34b83 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/metrics/HashAggregateMetricsUpdater.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/metrics/HashAggregateMetricsUpdater.scala
@@ -22,6 +22,8 @@ import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.utils.SparkMetricsUtil
 import org.apache.spark.task.TaskResources
 
+import scala.collection.JavaConverters._
+
 trait HashAggregateMetricsUpdater extends MetricsUpdater {
   def updateAggregationMetrics(
       aggregationMetrics: java.util.ArrayList[OperatorMetrics],
@@ -53,6 +55,8 @@ class HashAggregateMetricsUpdaterImpl(val metrics: 
Map[String, SQLMetric])
   val finalOutputRows: SQLMetric = metrics("finalOutputRows")
   val finalOutputVectors: SQLMetric = metrics("finalOutputVectors")
 
+  val loadLazyVectorTime: SQLMetric = metrics("loadLazyVectorTime")
+
   override def updateAggregationMetrics(
       aggregationMetrics: java.util.ArrayList[OperatorMetrics],
       aggParams: AggregationParams): Unit = {
@@ -85,6 +89,9 @@ class HashAggregateMetricsUpdaterImpl(val metrics: 
Map[String, SQLMetric])
       rowConstructionWallNanos += aggregationMetrics.get(idx).wallNanos
       idx += 1
     }
+
+    loadLazyVectorTime += aggregationMetrics.asScala.last.loadLazyVectorTime
+
     if (TaskResources.inSparkTask()) {
       SparkMetricsUtil.incMemoryBytesSpilled(
         TaskResources.getLocalTaskContext().taskMetrics(),
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/metrics/HiveTableScanMetricsUpdater.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/metrics/HiveTableScanMetricsUpdater.scala
index c8e4e98cdf..c061540ab6 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/metrics/HiveTableScanMetricsUpdater.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/metrics/HiveTableScanMetricsUpdater.scala
@@ -46,6 +46,7 @@ class HiveTableScanMetricsUpdater(@transient val metrics: 
Map[String, SQLMetric]
   val storageReadBytes: SQLMetric = metrics("storageReadBytes")
   val localReadBytes: SQLMetric = metrics("localReadBytes")
   val ramReadBytes: SQLMetric = metrics("ramReadBytes")
+  val loadLazyVectorTime: SQLMetric = metrics("loadLazyVectorTime")
 
   override def updateInputMetrics(inputMetrics: InputMetricsWrapper): Unit = {
     inputMetrics.bridgeIncBytesRead(rawInputBytes.value)
@@ -79,6 +80,7 @@ class HiveTableScanMetricsUpdater(@transient val metrics: 
Map[String, SQLMetric]
       preloadSplits += operatorMetrics.preloadSplits
       dataSourceAddSplitTime += operatorMetrics.dataSourceAddSplitTime
       dataSourceReadTime += operatorMetrics.dataSourceReadTime
+      loadLazyVectorTime += operatorMetrics.loadLazyVectorTime
     }
   }
 }
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/metrics/JoinMetricsUpdater.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/metrics/JoinMetricsUpdater.scala
index 9b8f03dd7f..103bd00fbf 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/metrics/JoinMetricsUpdater.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/metrics/JoinMetricsUpdater.scala
@@ -25,6 +25,8 @@ import org.apache.spark.task.TaskResources
 
 import java.util
 
+import scala.collection.JavaConverters._
+
 trait JoinMetricsUpdater extends MetricsUpdater {
   def updateJoinMetrics(
       joinMetrics: java.util.ArrayList[OperatorMetrics],
@@ -103,6 +105,8 @@ class HashJoinMetricsUpdater(override val metrics: 
Map[String, SQLMetric])
   val buildPreProjectionCpuCount: SQLMetric = 
metrics("buildPreProjectionCpuCount")
   val buildPreProjectionWallNanos: SQLMetric = 
metrics("buildPreProjectionWallNanos")
 
+  val loadLazyVectorTime: SQLMetric = metrics("loadLazyVectorTime")
+
   override protected def updateJoinMetricsInternal(
       joinMetrics: java.util.ArrayList[OperatorMetrics],
       joinParams: JoinParams): Unit = {
@@ -166,6 +170,8 @@ class HashJoinMetricsUpdater(override val metrics: 
Map[String, SQLMetric])
         TaskResources.getLocalTaskContext().taskMetrics(),
         hashBuildMetrics.spilledBytes)
     }
+
+    loadLazyVectorTime += joinMetrics.asScala.last.loadLazyVectorTime
   }
 }
 
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/metrics/LimitMetricsUpdater.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/metrics/LimitMetricsUpdater.scala
index f3c1b399f6..a2b0947a13 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/metrics/LimitMetricsUpdater.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/metrics/LimitMetricsUpdater.scala
@@ -30,6 +30,7 @@ class LimitMetricsUpdater(val metrics: Map[String, 
SQLMetric]) extends MetricsUp
       metrics("wallNanos") += operatorMetrics.wallNanos
       metrics("peakMemoryBytes") += operatorMetrics.peakMemoryBytes
       metrics("numMemoryAllocations") += operatorMetrics.numMemoryAllocations
+      metrics("loadLazyVectorTime") += operatorMetrics.loadLazyVectorTime
     }
   }
 }
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala 
b/backends-velox/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala
index cd3e0eafa0..3fd44456b1 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala
@@ -136,6 +136,7 @@ object MetricsUtil extends Logging {
     var dataSourceAddSplitTime: Long = 0
     var dataSourceReadTime: Long = 0
     var numWrittenFiles: Long = 0
+    var loadLazyVectorTime: Long = 0
 
     val metricsIterator = operatorMetrics.iterator()
     while (metricsIterator.hasNext) {
@@ -168,6 +169,7 @@ object MetricsUtil extends Logging {
       dataSourceAddSplitTime += metrics.dataSourceAddSplitTime
       dataSourceReadTime += metrics.dataSourceReadTime
       numWrittenFiles += metrics.numWrittenFiles
+      loadLazyVectorTime += metrics.loadLazyVectorTime
     }
 
     new OperatorMetrics(
@@ -208,7 +210,8 @@ object MetricsUtil extends Logging {
       dataSourceReadTime,
       physicalWrittenBytes,
       writeIOTime,
-      numWrittenFiles
+      numWrittenFiles,
+      loadLazyVectorTime
     )
   }
 
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/metrics/NestedLoopJoinMetricsUpdater.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/metrics/NestedLoopJoinMetricsUpdater.scala
index 2f55849889..f2a4dfff36 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/metrics/NestedLoopJoinMetricsUpdater.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/metrics/NestedLoopJoinMetricsUpdater.scala
@@ -22,6 +22,8 @@ import org.apache.spark.sql.execution.metric.SQLMetric
 
 import java.util
 
+import scala.collection.JavaConverters._
+
 class NestedLoopJoinMetricsUpdater(override val metrics: Map[String, 
SQLMetric])
   extends JoinMetricsUpdaterBase(metrics) {
 
@@ -45,6 +47,8 @@ class NestedLoopJoinMetricsUpdater(override val metrics: 
Map[String, SQLMetric])
   val nestedLoopJoinProbeNumMemoryAllocations: SQLMetric = metrics(
     "nestedLoopJoinProbeNumMemoryAllocations")
 
+  val loadLazyVectorTime: SQLMetric = metrics("loadLazyVectorTime")
+
   override protected def updateJoinMetricsInternal(
       joinMetrics: util.ArrayList[OperatorMetrics],
       joinParams: JoinParams): Unit = {
@@ -69,5 +73,7 @@ class NestedLoopJoinMetricsUpdater(override val metrics: 
Map[String, SQLMetric])
     nestedLoopJoinBuildWallNanos += nestedLoopJoinBuildMetrics.wallNanos
     nestedLoopJoinBuildPeakMemoryBytes += 
nestedLoopJoinBuildMetrics.peakMemoryBytes
     nestedLoopJoinBuildNumMemoryAllocations += 
nestedLoopJoinBuildMetrics.numMemoryAllocations
+
+    loadLazyVectorTime += joinMetrics.asScala.last.loadLazyVectorTime
   }
 }
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/metrics/ProjectMetricsUpdater.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/metrics/ProjectMetricsUpdater.scala
index cbd195bb80..e297bf356f 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/metrics/ProjectMetricsUpdater.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/metrics/ProjectMetricsUpdater.scala
@@ -33,6 +33,7 @@ class ProjectMetricsUpdater(
       metrics("wallNanos") += operatorMetrics.wallNanos
       metrics("peakMemoryBytes") += operatorMetrics.peakMemoryBytes
       metrics("numMemoryAllocations") += operatorMetrics.numMemoryAllocations
+      metrics("loadLazyVectorTime") += operatorMetrics.loadLazyVectorTime
       extraMetrics.foreach {
         case (name, metric) =>
           name match {
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/metrics/SampleMetricsUpdater.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/metrics/SampleMetricsUpdater.scala
index a108a5b797..e2f14577f8 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/metrics/SampleMetricsUpdater.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/metrics/SampleMetricsUpdater.scala
@@ -30,6 +30,7 @@ class SampleMetricsUpdater(val metrics: Map[String, 
SQLMetric]) extends MetricsU
       metrics("wallNanos") += operatorMetrics.wallNanos
       metrics("peakMemoryBytes") += operatorMetrics.peakMemoryBytes
       metrics("numMemoryAllocations") += operatorMetrics.numMemoryAllocations
+      metrics("loadLazyVectorTime") += operatorMetrics.loadLazyVectorTime
     }
   }
 }
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/metrics/SortMetricsUpdater.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/metrics/SortMetricsUpdater.scala
index cb01ae1510..57ed07a16c 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/metrics/SortMetricsUpdater.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/metrics/SortMetricsUpdater.scala
@@ -36,6 +36,7 @@ class SortMetricsUpdater(val metrics: Map[String, SQLMetric]) 
extends MetricsUpd
       metrics("spilledRows") += operatorMetrics.spilledRows
       metrics("spilledPartitions") += operatorMetrics.spilledPartitions
       metrics("spilledFiles") += operatorMetrics.spilledFiles
+      metrics("loadLazyVectorTime") += operatorMetrics.loadLazyVectorTime
       if (TaskResources.inSparkTask()) {
         SparkMetricsUtil.incMemoryBytesSpilled(
           TaskResources.getLocalTaskContext().taskMetrics(),
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/metrics/UnionMetricsUpdater.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/metrics/UnionMetricsUpdater.scala
index 9e91cf368c..4a3fc961eb 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/metrics/UnionMetricsUpdater.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/metrics/UnionMetricsUpdater.scala
@@ -18,6 +18,8 @@ package org.apache.gluten.metrics
 
 import org.apache.spark.sql.execution.metric.SQLMetric
 
+import scala.collection.JavaConverters._
+
 class UnionMetricsUpdater(val metrics: Map[String, SQLMetric]) extends 
MetricsUpdater {
   override def updateNativeMetrics(opMetrics: IOperatorMetrics): Unit = {
     throw new UnsupportedOperationException()
@@ -31,5 +33,6 @@ class UnionMetricsUpdater(val metrics: Map[String, 
SQLMetric]) extends MetricsUp
     metrics("inputBytes") += localExchangeMetrics.inputBytes
     metrics("cpuCount") += localExchangeMetrics.cpuCount
     metrics("wallNanos") += localExchangeMetrics.wallNanos
+    metrics("loadLazyVectorTime") += 
unionMetrics.asScala.last.loadLazyVectorTime
   }
 }
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/metrics/WindowMetricsUpdater.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/metrics/WindowMetricsUpdater.scala
index 4a098102b1..2f648bd448 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/metrics/WindowMetricsUpdater.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/metrics/WindowMetricsUpdater.scala
@@ -34,6 +34,7 @@ class WindowMetricsUpdater(val metrics: Map[String, 
SQLMetric]) extends MetricsU
       metrics("spilledRows") += operatorMetrics.spilledRows
       metrics("spilledPartitions") += operatorMetrics.spilledPartitions
       metrics("spilledFiles") += operatorMetrics.spilledFiles
+      metrics("loadLazyVectorTime") += operatorMetrics.loadLazyVectorTime
     }
   }
 }
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/metrics/WriteFilesMetricsUpdater.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/metrics/WriteFilesMetricsUpdater.scala
index 4a89691895..7dc0ca880c 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/metrics/WriteFilesMetricsUpdater.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/metrics/WriteFilesMetricsUpdater.scala
@@ -27,6 +27,7 @@ class WriteFilesMetricsUpdater(val metrics: Map[String, 
SQLMetric]) extends Metr
       metrics("writeIONanos") += operatorMetrics.writeIOTime
       metrics("wallNanos") += operatorMetrics.wallNanos
       metrics("numWrittenFiles") += operatorMetrics.numWrittenFiles
+      metrics("loadLazyVectorTime") += operatorMetrics.loadLazyVectorTime
     }
   }
 }
diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc
index 11e7ad15de..fe69708192 100644
--- a/cpp/core/jni/JniWrapper.cc
+++ b/cpp/core/jni/JniWrapper.cc
@@ -259,7 +259,7 @@ jint JNI_OnLoad(JavaVM* vm, void* reserved) {
       env,
       metricsBuilderClass,
       "<init>",
-      
"([J[J[J[J[J[J[J[J[J[JJ[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[JLjava/lang/String;)V");
+      
"([J[J[J[J[J[J[J[J[J[JJ[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[JLjava/lang/String;)V");
 
   nativeColumnarToRowInfoClass =
       createGlobalClassReferenceOrError(env, 
"Lorg/apache/gluten/vectorized/NativeColumnarToRowInfo;");
@@ -590,6 +590,7 @@ JNIEXPORT jobject JNICALL 
Java_org_apache_gluten_metrics_IteratorMetricsJniWrapp
       longArray[Metrics::kPhysicalWrittenBytes],
       longArray[Metrics::kWriteIOTime],
       longArray[Metrics::kNumWrittenFiles],
+      longArray[Metrics::kLoadLazyVectorTime],
       metrics && metrics->stats.has_value() ? 
env->NewStringUTF(metrics->stats->c_str()) : nullptr);
 
   JNI_METHOD_END(nullptr)
diff --git a/cpp/core/utils/Metrics.h b/cpp/core/utils/Metrics.h
index d3169eb69f..01d25182cd 100644
--- a/cpp/core/utils/Metrics.h
+++ b/cpp/core/utils/Metrics.h
@@ -88,6 +88,9 @@ struct Metrics {
     kWriteIOTime,
     kNumWrittenFiles,
 
+    // Load lazy vector.
+    kLoadLazyVectorTime,
+
     // The end of enum items.
     kEnd,
     kNum = kEnd - kBegin
diff --git a/cpp/velox/compute/WholeStageResultIterator.cc 
b/cpp/velox/compute/WholeStageResultIterator.cc
index 039e67bd4f..d2b130f629 100644
--- a/cpp/velox/compute/WholeStageResultIterator.cc
+++ b/cpp/velox/compute/WholeStageResultIterator.cc
@@ -250,8 +250,12 @@ std::shared_ptr<ColumnarBatch> 
WholeStageResultIterator::nextInternal() {
   if (numRows == 0) {
     return nullptr;
   }
-  for (auto& child : vector->children()) {
-    child->loadedVector();
+
+  {
+    ScopedTimer timer(&loadLazyVectorTime_);
+    for (auto& child : vector->children()) {
+      child->loadedVector();
+    }
   }
 
   return std::make_shared<VeloxColumnarBatch>(vector);
@@ -407,6 +411,8 @@ void WholeStageResultIterator::collectMetrics() {
 
   int metricIndex = 0;
   for (int idx = 0; idx < orderedNodeIds_.size(); idx++) {
+    metrics_->get(Metrics::kLoadLazyVectorTime)[metricIndex] = 0;
+
     const auto& nodeId = orderedNodeIds_[idx];
     if (planStats.find(nodeId) == planStats.end()) {
       // Special handing for Filter over Project case. Filter metrics are
@@ -481,6 +487,9 @@ void WholeStageResultIterator::collectMetrics() {
     }
   }
 
+  // Put the loadLazyVector time into the metrics of the last operator.
+  metrics_->get(Metrics::kLoadLazyVectorTime)[orderedNodeIds_.size() - 1] = 
loadLazyVectorTime_;
+
   // Populate the metrics with task stats for long running tasks.
   if (const int64_t collectTaskStatsThreshold =
           veloxCfg_->get<int64_t>(kTaskMetricsToEventLogThreshold, 
kTaskMetricsToEventLogThresholdDefault);
diff --git a/cpp/velox/compute/WholeStageResultIterator.h 
b/cpp/velox/compute/WholeStageResultIterator.h
index f24817ca8b..671016596b 100644
--- a/cpp/velox/compute/WholeStageResultIterator.h
+++ b/cpp/velox/compute/WholeStageResultIterator.h
@@ -141,6 +141,8 @@ class WholeStageResultIterator : public 
ColumnarBatchIterator {
   std::vector<facebook::velox::core::PlanNodeId> streamIds_;
   std::vector<std::vector<facebook::velox::exec::Split>> splits_;
   bool noMoreSplits_ = false;
+
+  int64_t loadLazyVectorTime_ = 0;
 };
 
 } // namespace gluten


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@gluten.apache.org
For additional commands, e-mail: commits-help@gluten.apache.org


Reply via email to