This is an automated email from the ASF dual-hosted git repository. felixybw pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push: new 89c4182ec7 [VL] Add metrics for lazy vector load (#10726) 89c4182ec7 is described below commit 89c4182ec7758822e69b0332cf7759f3f44d1508 Author: Rong Ma <mar...@apache.org> AuthorDate: Fri Sep 26 11:12:09 2025 +0800 [VL] Add metrics for lazy vector load (#10726) This PR adds the metric "time of loading lazy vectors", measured in the nextInternal function, to the last operator of the transformer. --- .../java/org/apache/gluten/metrics/Metrics.java | 7 +- .../org/apache/gluten/metrics/OperatorMetrics.java | 6 +- .../gluten/backendsapi/velox/VeloxMetricsApi.scala | 80 +++++++++++++++++----- .../gluten/metrics/BatchScanMetricsUpdater.scala | 1 + .../gluten/metrics/ExpandMetricsUpdater.scala | 1 + .../metrics/FileSourceScanMetricsUpdater.scala | 2 + .../gluten/metrics/FilterMetricsUpdater.scala | 1 + .../gluten/metrics/GenerateMetricsUpdater.scala | 1 + .../metrics/HashAggregateMetricsUpdater.scala | 7 ++ .../metrics/HiveTableScanMetricsUpdater.scala | 2 + .../apache/gluten/metrics/JoinMetricsUpdater.scala | 6 ++ .../gluten/metrics/LimitMetricsUpdater.scala | 1 + .../org/apache/gluten/metrics/MetricsUtil.scala | 5 +- .../metrics/NestedLoopJoinMetricsUpdater.scala | 6 ++ .../gluten/metrics/ProjectMetricsUpdater.scala | 1 + .../gluten/metrics/SampleMetricsUpdater.scala | 1 + .../apache/gluten/metrics/SortMetricsUpdater.scala | 1 + .../gluten/metrics/UnionMetricsUpdater.scala | 3 + .../gluten/metrics/WindowMetricsUpdater.scala | 1 + .../gluten/metrics/WriteFilesMetricsUpdater.scala | 1 + cpp/core/jni/JniWrapper.cc | 3 +- cpp/core/utils/Metrics.h | 3 + cpp/velox/compute/WholeStageResultIterator.cc | 13 +++- cpp/velox/compute/WholeStageResultIterator.h | 2 + 24 files changed, 133 insertions(+), 22 deletions(-) diff --git a/backends-velox/src/main/java/org/apache/gluten/metrics/Metrics.java b/backends-velox/src/main/java/org/apache/gluten/metrics/Metrics.java index d9b16aef98..5515dd9300 100644 --- 
a/backends-velox/src/main/java/org/apache/gluten/metrics/Metrics.java +++ b/backends-velox/src/main/java/org/apache/gluten/metrics/Metrics.java @@ -59,6 +59,8 @@ public class Metrics implements IMetrics { public long[] writeIOTime; public long[] numWrittenFiles; + public long[] loadLazyVectorTime; + public SingleMetric singleMetric = new SingleMetric(); public String taskStats; @@ -104,6 +106,7 @@ public class Metrics implements IMetrics { long[] physicalWrittenBytes, long[] writeIOTime, long[] numWrittenFiles, + long[] loadLazyVectorTime, String taskStats) { this.inputRows = inputRows; this.inputVectors = inputVectors; @@ -145,6 +148,7 @@ public class Metrics implements IMetrics { this.physicalWrittenBytes = physicalWrittenBytes; this.writeIOTime = writeIOTime; this.numWrittenFiles = numWrittenFiles; + this.loadLazyVectorTime = loadLazyVectorTime; this.taskStats = taskStats; } @@ -191,7 +195,8 @@ public class Metrics implements IMetrics { dataSourceReadTime[index], physicalWrittenBytes[index], writeIOTime[index], - numWrittenFiles[index]); + numWrittenFiles[index], + loadLazyVectorTime[index]); } public SingleMetric getSingleMetrics() { diff --git a/backends-velox/src/main/java/org/apache/gluten/metrics/OperatorMetrics.java b/backends-velox/src/main/java/org/apache/gluten/metrics/OperatorMetrics.java index 24bedf0a46..9292caebeb 100644 --- a/backends-velox/src/main/java/org/apache/gluten/metrics/OperatorMetrics.java +++ b/backends-velox/src/main/java/org/apache/gluten/metrics/OperatorMetrics.java @@ -57,6 +57,8 @@ public class OperatorMetrics implements IOperatorMetrics { public long writeIOTime; public long numWrittenFiles; + public long loadLazyVectorTime; + /** Create an instance for operator metrics. 
*/ public OperatorMetrics( long inputRows, @@ -96,7 +98,8 @@ public class OperatorMetrics implements IOperatorMetrics { long dataSourceReadTime, long physicalWrittenBytes, long writeIOTime, - long numWrittenFiles) { + long numWrittenFiles, + long loadLazyVectorTime) { this.inputRows = inputRows; this.inputVectors = inputVectors; this.inputBytes = inputBytes; @@ -135,5 +138,6 @@ public class OperatorMetrics implements IOperatorMetrics { this.physicalWrittenBytes = physicalWrittenBytes; this.writeIOTime = writeIOTime; this.numWrittenFiles = numWrittenFiles; + this.loadLazyVectorTime = loadLazyVectorTime; } } diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala index 0feefe02bd..12c66a6a1a 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala @@ -122,7 +122,10 @@ class VeloxMetricsApi extends MetricsApi with Logging { "ioWaitTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "io wait time"), "storageReadBytes" -> SQLMetrics.createSizeMetric(sparkContext, "storage read bytes"), "localReadBytes" -> SQLMetrics.createSizeMetric(sparkContext, "local ssd read bytes"), - "ramReadBytes" -> SQLMetrics.createSizeMetric(sparkContext, "ram read bytes") + "ramReadBytes" -> SQLMetrics.createSizeMetric(sparkContext, "ram read bytes"), + "loadLazyVectorTime" -> SQLMetrics.createNanoTimingMetric( + sparkContext, + "time of loading lazy vectors") ) override def genBatchScanTransformerMetricsUpdater( @@ -169,7 +172,10 @@ class VeloxMetricsApi extends MetricsApi with Logging { "ioWaitTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "io wait time"), "storageReadBytes" -> SQLMetrics.createSizeMetric(sparkContext, "storage read bytes"), "localReadBytes" -> SQLMetrics.createSizeMetric(sparkContext, 
"local ssd read bytes"), - "ramReadBytes" -> SQLMetrics.createSizeMetric(sparkContext, "ram read bytes") + "ramReadBytes" -> SQLMetrics.createSizeMetric(sparkContext, "ram read bytes"), + "loadLazyVectorTime" -> SQLMetrics.createNanoTimingMetric( + sparkContext, + "time of loading lazy vectors") ) override def genHiveTableScanTransformerMetricsUpdater( @@ -216,7 +222,10 @@ class VeloxMetricsApi extends MetricsApi with Logging { "ioWaitTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "io wait time"), "storageReadBytes" -> SQLMetrics.createSizeMetric(sparkContext, "storage read bytes"), "localReadBytes" -> SQLMetrics.createSizeMetric(sparkContext, "local ssd read bytes"), - "ramReadBytes" -> SQLMetrics.createSizeMetric(sparkContext, "ram read bytes") + "ramReadBytes" -> SQLMetrics.createSizeMetric(sparkContext, "ram read bytes"), + "loadLazyVectorTime" -> SQLMetrics.createNanoTimingMetric( + sparkContext, + "time of loading lazy vectors") ) override def genFileSourceScanTransformerMetricsUpdater( @@ -232,7 +241,10 @@ class VeloxMetricsApi extends MetricsApi with Logging { "peakMemoryBytes" -> SQLMetrics.createSizeMetric(sparkContext, "peak memory bytes"), "numMemoryAllocations" -> SQLMetrics.createMetric( sparkContext, - "number of memory allocations") + "number of memory allocations"), + "loadLazyVectorTime" -> SQLMetrics.createNanoTimingMetric( + sparkContext, + "time of loading lazy vectors") ) override def genFilterTransformerMetricsUpdater( @@ -250,7 +262,10 @@ class VeloxMetricsApi extends MetricsApi with Logging { "peakMemoryBytes" -> SQLMetrics.createSizeMetric(sparkContext, "peak memory bytes"), "numMemoryAllocations" -> SQLMetrics.createMetric( sparkContext, - "number of memory allocations") + "number of memory allocations"), + "loadLazyVectorTime" -> SQLMetrics.createNanoTimingMetric( + sparkContext, + "time of loading lazy vectors") ) override def genProjectTransformerMetricsUpdater( @@ -295,7 +310,10 @@ class VeloxMetricsApi extends MetricsApi 
with Logging { "finalOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of final output rows"), "finalOutputVectors" -> SQLMetrics.createMetric( sparkContext, - "number of final output vectors") + "number of final output vectors"), + "loadLazyVectorTime" -> SQLMetrics.createNanoTimingMetric( + sparkContext, + "time of loading lazy vectors") ) override def genHashAggregateTransformerMetricsUpdater( @@ -312,7 +330,10 @@ class VeloxMetricsApi extends MetricsApi with Logging { "peakMemoryBytes" -> SQLMetrics.createSizeMetric(sparkContext, "peak memory bytes"), "numMemoryAllocations" -> SQLMetrics.createMetric( sparkContext, - "number of memory allocations") + "number of memory allocations"), + "loadLazyVectorTime" -> SQLMetrics.createNanoTimingMetric( + sparkContext, + "time of loading lazy vectors") ) override def genExpandTransformerMetricsUpdater(metrics: Map[String, SQLMetric]): MetricsUpdater = @@ -381,7 +402,10 @@ class VeloxMetricsApi extends MetricsApi with Logging { "spilledBytes" -> SQLMetrics.createSizeMetric(sparkContext, "bytes written for spilling"), "spilledRows" -> SQLMetrics.createMetric(sparkContext, "total rows written for spilling"), "spilledPartitions" -> SQLMetrics.createMetric(sparkContext, "total spilled partitions"), - "spilledFiles" -> SQLMetrics.createMetric(sparkContext, "total spilled files") + "spilledFiles" -> SQLMetrics.createMetric(sparkContext, "total spilled files"), + "loadLazyVectorTime" -> SQLMetrics.createNanoTimingMetric( + sparkContext, + "time of loading lazy vectors") ) override def genWindowTransformerMetricsUpdater(metrics: Map[String, SQLMetric]): MetricsUpdater = @@ -411,7 +435,10 @@ class VeloxMetricsApi extends MetricsApi with Logging { "peakMemoryBytes" -> SQLMetrics.createSizeMetric(sparkContext, "peak memory bytes"), "numMemoryAllocations" -> SQLMetrics.createMetric( sparkContext, - "number of memory allocations") + "number of memory allocations"), + "loadLazyVectorTime" -> SQLMetrics.createNanoTimingMetric( 
+ sparkContext, + "time of loading lazy vectors") ) override def genLimitTransformerMetricsUpdater(metrics: Map[String, SQLMetric]): MetricsUpdater = @@ -424,7 +451,10 @@ class VeloxMetricsApi extends MetricsApi with Logging { "number of written bytes"), "writeIONanos" -> SQLMetrics.createNanoTimingMetric(sparkContext, "time of write IO"), "wallNanos" -> SQLMetrics.createNanoTimingMetric(sparkContext, "time of write"), - "numWrittenFiles" -> SQLMetrics.createMetric(sparkContext, "number of written files") + "numWrittenFiles" -> SQLMetrics.createMetric(sparkContext, "number of written files"), + "loadLazyVectorTime" -> SQLMetrics.createNanoTimingMetric( + sparkContext, + "time of loading lazy vectors") ) def genWriteFilesTransformerMetricsUpdater(metrics: Map[String, SQLMetric]): MetricsUpdater = @@ -444,7 +474,10 @@ class VeloxMetricsApi extends MetricsApi with Logging { "spilledBytes" -> SQLMetrics.createSizeMetric(sparkContext, "bytes written for spilling"), "spilledRows" -> SQLMetrics.createMetric(sparkContext, "total rows written for spilling"), "spilledPartitions" -> SQLMetrics.createMetric(sparkContext, "total spilled partitions"), - "spilledFiles" -> SQLMetrics.createMetric(sparkContext, "total spilled files") + "spilledFiles" -> SQLMetrics.createMetric(sparkContext, "total spilled files"), + "loadLazyVectorTime" -> SQLMetrics.createNanoTimingMetric( + sparkContext, + "time of loading lazy vectors") ) override def genSortTransformerMetricsUpdater(metrics: Map[String, SQLMetric]): MetricsUpdater = @@ -479,7 +512,10 @@ class VeloxMetricsApi extends MetricsApi with Logging { "postProject cpu wall time count"), "postProjectionWallNanos" -> SQLMetrics.createNanoTimingMetric( sparkContext, - "time of postProjection") + "time of postProjection"), + "loadLazyVectorTime" -> SQLMetrics.createNanoTimingMetric( + sparkContext, + "time of loading lazy vectors") ) override def genSortMergeJoinTransformerMetricsUpdater( @@ -598,7 +634,10 @@ class VeloxMetricsApi extends 
MetricsApi with Logging { "time of postProjection"), "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"), "numOutputVectors" -> SQLMetrics.createMetric(sparkContext, "number of output vectors"), - "numOutputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of output bytes") + "numOutputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of output bytes"), + "loadLazyVectorTime" -> SQLMetrics.createNanoTimingMetric( + sparkContext, + "time of loading lazy vectors") ) override def genHashJoinTransformerMetricsUpdater( @@ -663,7 +702,10 @@ class VeloxMetricsApi extends MetricsApi with Logging { "time of postProjection"), "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"), "numOutputVectors" -> SQLMetrics.createMetric(sparkContext, "number of output vectors"), - "numOutputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of output bytes") + "numOutputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of output bytes"), + "loadLazyVectorTime" -> SQLMetrics.createNanoTimingMetric( + sparkContext, + "time of loading lazy vectors") ) override def genNestedLoopJoinTransformerMetricsUpdater( @@ -679,7 +721,10 @@ class VeloxMetricsApi extends MetricsApi with Logging { "peakMemoryBytes" -> SQLMetrics.createSizeMetric(sparkContext, "peak memory bytes"), "numMemoryAllocations" -> SQLMetrics.createMetric( sparkContext, - "number of memory allocations") + "number of memory allocations"), + "loadLazyVectorTime" -> SQLMetrics.createNanoTimingMetric( + sparkContext, + "time of loading lazy vectors") ) override def genSampleTransformerMetricsUpdater(metrics: Map[String, SQLMetric]): MetricsUpdater = @@ -690,7 +735,10 @@ class VeloxMetricsApi extends MetricsApi with Logging { "inputVectors" -> SQLMetrics.createMetric(sparkContext, "number of input vectors"), "inputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of input bytes"), "wallNanos" -> 
SQLMetrics.createNanoTimingMetric(sparkContext, "time of union"), - "cpuCount" -> SQLMetrics.createMetric(sparkContext, "cpu wall time count") + "cpuCount" -> SQLMetrics.createMetric(sparkContext, "cpu wall time count"), + "loadLazyVectorTime" -> SQLMetrics.createNanoTimingMetric( + sparkContext, + "time of loading lazy vectors") ) override def genUnionTransformerMetricsUpdater(metrics: Map[String, SQLMetric]): MetricsUpdater = diff --git a/backends-velox/src/main/scala/org/apache/gluten/metrics/BatchScanMetricsUpdater.scala b/backends-velox/src/main/scala/org/apache/gluten/metrics/BatchScanMetricsUpdater.scala index 886e8353b8..07b5c30107 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/metrics/BatchScanMetricsUpdater.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/metrics/BatchScanMetricsUpdater.scala @@ -56,6 +56,7 @@ class BatchScanMetricsUpdater(val metrics: Map[String, SQLMetric]) extends Metri metrics("preloadSplits") += operatorMetrics.preloadSplits metrics("dataSourceAddSplitTime") += operatorMetrics.dataSourceAddSplitTime metrics("dataSourceReadTime") += operatorMetrics.dataSourceReadTime + metrics("loadLazyVectorTime") += operatorMetrics.loadLazyVectorTime } } } diff --git a/backends-velox/src/main/scala/org/apache/gluten/metrics/ExpandMetricsUpdater.scala b/backends-velox/src/main/scala/org/apache/gluten/metrics/ExpandMetricsUpdater.scala index 68f6b4239b..6179ed3e1b 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/metrics/ExpandMetricsUpdater.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/metrics/ExpandMetricsUpdater.scala @@ -30,6 +30,7 @@ class ExpandMetricsUpdater(val metrics: Map[String, SQLMetric]) extends MetricsU metrics("wallNanos") += operatorMetrics.wallNanos metrics("peakMemoryBytes") += operatorMetrics.peakMemoryBytes metrics("numMemoryAllocations") += operatorMetrics.numMemoryAllocations + metrics("loadLazyVectorTime") += operatorMetrics.loadLazyVectorTime } } } diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/metrics/FileSourceScanMetricsUpdater.scala b/backends-velox/src/main/scala/org/apache/gluten/metrics/FileSourceScanMetricsUpdater.scala index b76525d7fb..4ba0195802 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/metrics/FileSourceScanMetricsUpdater.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/metrics/FileSourceScanMetricsUpdater.scala @@ -51,6 +51,7 @@ class FileSourceScanMetricsUpdater(@transient val metrics: Map[String, SQLMetric val storageReadBytes: SQLMetric = metrics("storageReadBytes") val localReadBytes: SQLMetric = metrics("localReadBytes") val ramReadBytes: SQLMetric = metrics("ramReadBytes") + val loadLazyVectorTime: SQLMetric = metrics("loadLazyVectorTime") override def updateInputMetrics(inputMetrics: InputMetricsWrapper): Unit = { inputMetrics.bridgeIncBytesRead(rawInputBytes.value) @@ -84,6 +85,7 @@ class FileSourceScanMetricsUpdater(@transient val metrics: Map[String, SQLMetric preloadSplits += operatorMetrics.preloadSplits dataSourceAddSplitTime += operatorMetrics.dataSourceAddSplitTime dataSourceReadTime += operatorMetrics.dataSourceReadTime + loadLazyVectorTime += operatorMetrics.loadLazyVectorTime } } } diff --git a/backends-velox/src/main/scala/org/apache/gluten/metrics/FilterMetricsUpdater.scala b/backends-velox/src/main/scala/org/apache/gluten/metrics/FilterMetricsUpdater.scala index e8c27f6a43..03a5f604e5 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/metrics/FilterMetricsUpdater.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/metrics/FilterMetricsUpdater.scala @@ -33,6 +33,7 @@ class FilterMetricsUpdater( metrics("wallNanos") += operatorMetrics.wallNanos metrics("peakMemoryBytes") += operatorMetrics.peakMemoryBytes metrics("numMemoryAllocations") += operatorMetrics.numMemoryAllocations + metrics("loadLazyVectorTime") += operatorMetrics.loadLazyVectorTime extraMetrics.foreach { case (name, metric) => name match { diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/metrics/GenerateMetricsUpdater.scala b/backends-velox/src/main/scala/org/apache/gluten/metrics/GenerateMetricsUpdater.scala index 0a3dccd644..6d88e2bacc 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/metrics/GenerateMetricsUpdater.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/metrics/GenerateMetricsUpdater.scala @@ -29,6 +29,7 @@ class GenerateMetricsUpdater(val metrics: Map[String, SQLMetric]) extends Metric metrics("wallNanos") += nativeMetrics.wallNanos metrics("peakMemoryBytes") += nativeMetrics.peakMemoryBytes metrics("numMemoryAllocations") += nativeMetrics.numMemoryAllocations + metrics("loadLazyVectorTime") += nativeMetrics.loadLazyVectorTime } } } diff --git a/backends-velox/src/main/scala/org/apache/gluten/metrics/HashAggregateMetricsUpdater.scala b/backends-velox/src/main/scala/org/apache/gluten/metrics/HashAggregateMetricsUpdater.scala index f81ab2708a..3843a34b83 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/metrics/HashAggregateMetricsUpdater.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/metrics/HashAggregateMetricsUpdater.scala @@ -22,6 +22,8 @@ import org.apache.spark.sql.execution.metric.SQLMetric import org.apache.spark.sql.utils.SparkMetricsUtil import org.apache.spark.task.TaskResources +import scala.collection.JavaConverters._ + trait HashAggregateMetricsUpdater extends MetricsUpdater { def updateAggregationMetrics( aggregationMetrics: java.util.ArrayList[OperatorMetrics], @@ -53,6 +55,8 @@ class HashAggregateMetricsUpdaterImpl(val metrics: Map[String, SQLMetric]) val finalOutputRows: SQLMetric = metrics("finalOutputRows") val finalOutputVectors: SQLMetric = metrics("finalOutputVectors") + val loadLazyVectorTime: SQLMetric = metrics("loadLazyVectorTime") + override def updateAggregationMetrics( aggregationMetrics: java.util.ArrayList[OperatorMetrics], aggParams: AggregationParams): Unit = { @@ -85,6 +89,9 @@ class 
HashAggregateMetricsUpdaterImpl(val metrics: Map[String, SQLMetric]) rowConstructionWallNanos += aggregationMetrics.get(idx).wallNanos idx += 1 } + + loadLazyVectorTime += aggregationMetrics.asScala.last.loadLazyVectorTime + if (TaskResources.inSparkTask()) { SparkMetricsUtil.incMemoryBytesSpilled( TaskResources.getLocalTaskContext().taskMetrics(), diff --git a/backends-velox/src/main/scala/org/apache/gluten/metrics/HiveTableScanMetricsUpdater.scala b/backends-velox/src/main/scala/org/apache/gluten/metrics/HiveTableScanMetricsUpdater.scala index c8e4e98cdf..c061540ab6 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/metrics/HiveTableScanMetricsUpdater.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/metrics/HiveTableScanMetricsUpdater.scala @@ -46,6 +46,7 @@ class HiveTableScanMetricsUpdater(@transient val metrics: Map[String, SQLMetric] val storageReadBytes: SQLMetric = metrics("storageReadBytes") val localReadBytes: SQLMetric = metrics("localReadBytes") val ramReadBytes: SQLMetric = metrics("ramReadBytes") + val loadLazyVectorTime: SQLMetric = metrics("loadLazyVectorTime") override def updateInputMetrics(inputMetrics: InputMetricsWrapper): Unit = { inputMetrics.bridgeIncBytesRead(rawInputBytes.value) @@ -79,6 +80,7 @@ class HiveTableScanMetricsUpdater(@transient val metrics: Map[String, SQLMetric] preloadSplits += operatorMetrics.preloadSplits dataSourceAddSplitTime += operatorMetrics.dataSourceAddSplitTime dataSourceReadTime += operatorMetrics.dataSourceReadTime + loadLazyVectorTime += operatorMetrics.loadLazyVectorTime } } } diff --git a/backends-velox/src/main/scala/org/apache/gluten/metrics/JoinMetricsUpdater.scala b/backends-velox/src/main/scala/org/apache/gluten/metrics/JoinMetricsUpdater.scala index 9b8f03dd7f..103bd00fbf 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/metrics/JoinMetricsUpdater.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/metrics/JoinMetricsUpdater.scala @@ -25,6 +25,8 @@ import 
org.apache.spark.task.TaskResources import java.util +import scala.collection.JavaConverters._ + trait JoinMetricsUpdater extends MetricsUpdater { def updateJoinMetrics( joinMetrics: java.util.ArrayList[OperatorMetrics], @@ -103,6 +105,8 @@ class HashJoinMetricsUpdater(override val metrics: Map[String, SQLMetric]) val buildPreProjectionCpuCount: SQLMetric = metrics("buildPreProjectionCpuCount") val buildPreProjectionWallNanos: SQLMetric = metrics("buildPreProjectionWallNanos") + val loadLazyVectorTime: SQLMetric = metrics("loadLazyVectorTime") + override protected def updateJoinMetricsInternal( joinMetrics: java.util.ArrayList[OperatorMetrics], joinParams: JoinParams): Unit = { @@ -166,6 +170,8 @@ class HashJoinMetricsUpdater(override val metrics: Map[String, SQLMetric]) TaskResources.getLocalTaskContext().taskMetrics(), hashBuildMetrics.spilledBytes) } + + loadLazyVectorTime += joinMetrics.asScala.last.loadLazyVectorTime } } diff --git a/backends-velox/src/main/scala/org/apache/gluten/metrics/LimitMetricsUpdater.scala b/backends-velox/src/main/scala/org/apache/gluten/metrics/LimitMetricsUpdater.scala index f3c1b399f6..a2b0947a13 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/metrics/LimitMetricsUpdater.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/metrics/LimitMetricsUpdater.scala @@ -30,6 +30,7 @@ class LimitMetricsUpdater(val metrics: Map[String, SQLMetric]) extends MetricsUp metrics("wallNanos") += operatorMetrics.wallNanos metrics("peakMemoryBytes") += operatorMetrics.peakMemoryBytes metrics("numMemoryAllocations") += operatorMetrics.numMemoryAllocations + metrics("loadLazyVectorTime") += operatorMetrics.loadLazyVectorTime } } } diff --git a/backends-velox/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala b/backends-velox/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala index cd3e0eafa0..3fd44456b1 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala +++ 
b/backends-velox/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala @@ -136,6 +136,7 @@ object MetricsUtil extends Logging { var dataSourceAddSplitTime: Long = 0 var dataSourceReadTime: Long = 0 var numWrittenFiles: Long = 0 + var loadLazyVectorTime: Long = 0 val metricsIterator = operatorMetrics.iterator() while (metricsIterator.hasNext) { @@ -168,6 +169,7 @@ object MetricsUtil extends Logging { dataSourceAddSplitTime += metrics.dataSourceAddSplitTime dataSourceReadTime += metrics.dataSourceReadTime numWrittenFiles += metrics.numWrittenFiles + loadLazyVectorTime += metrics.loadLazyVectorTime } new OperatorMetrics( @@ -208,7 +210,8 @@ object MetricsUtil extends Logging { dataSourceReadTime, physicalWrittenBytes, writeIOTime, - numWrittenFiles + numWrittenFiles, + loadLazyVectorTime ) } diff --git a/backends-velox/src/main/scala/org/apache/gluten/metrics/NestedLoopJoinMetricsUpdater.scala b/backends-velox/src/main/scala/org/apache/gluten/metrics/NestedLoopJoinMetricsUpdater.scala index 2f55849889..f2a4dfff36 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/metrics/NestedLoopJoinMetricsUpdater.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/metrics/NestedLoopJoinMetricsUpdater.scala @@ -22,6 +22,8 @@ import org.apache.spark.sql.execution.metric.SQLMetric import java.util +import scala.collection.JavaConverters._ + class NestedLoopJoinMetricsUpdater(override val metrics: Map[String, SQLMetric]) extends JoinMetricsUpdaterBase(metrics) { @@ -45,6 +47,8 @@ class NestedLoopJoinMetricsUpdater(override val metrics: Map[String, SQLMetric]) val nestedLoopJoinProbeNumMemoryAllocations: SQLMetric = metrics( "nestedLoopJoinProbeNumMemoryAllocations") + val loadLazyVectorTime: SQLMetric = metrics("loadLazyVectorTime") + override protected def updateJoinMetricsInternal( joinMetrics: util.ArrayList[OperatorMetrics], joinParams: JoinParams): Unit = { @@ -69,5 +73,7 @@ class NestedLoopJoinMetricsUpdater(override val metrics: Map[String, SQLMetric]) 
nestedLoopJoinBuildWallNanos += nestedLoopJoinBuildMetrics.wallNanos nestedLoopJoinBuildPeakMemoryBytes += nestedLoopJoinBuildMetrics.peakMemoryBytes nestedLoopJoinBuildNumMemoryAllocations += nestedLoopJoinBuildMetrics.numMemoryAllocations + + loadLazyVectorTime += joinMetrics.asScala.last.loadLazyVectorTime } } diff --git a/backends-velox/src/main/scala/org/apache/gluten/metrics/ProjectMetricsUpdater.scala b/backends-velox/src/main/scala/org/apache/gluten/metrics/ProjectMetricsUpdater.scala index cbd195bb80..e297bf356f 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/metrics/ProjectMetricsUpdater.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/metrics/ProjectMetricsUpdater.scala @@ -33,6 +33,7 @@ class ProjectMetricsUpdater( metrics("wallNanos") += operatorMetrics.wallNanos metrics("peakMemoryBytes") += operatorMetrics.peakMemoryBytes metrics("numMemoryAllocations") += operatorMetrics.numMemoryAllocations + metrics("loadLazyVectorTime") += operatorMetrics.loadLazyVectorTime extraMetrics.foreach { case (name, metric) => name match { diff --git a/backends-velox/src/main/scala/org/apache/gluten/metrics/SampleMetricsUpdater.scala b/backends-velox/src/main/scala/org/apache/gluten/metrics/SampleMetricsUpdater.scala index a108a5b797..e2f14577f8 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/metrics/SampleMetricsUpdater.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/metrics/SampleMetricsUpdater.scala @@ -30,6 +30,7 @@ class SampleMetricsUpdater(val metrics: Map[String, SQLMetric]) extends MetricsU metrics("wallNanos") += operatorMetrics.wallNanos metrics("peakMemoryBytes") += operatorMetrics.peakMemoryBytes metrics("numMemoryAllocations") += operatorMetrics.numMemoryAllocations + metrics("loadLazyVectorTime") += operatorMetrics.loadLazyVectorTime } } } diff --git a/backends-velox/src/main/scala/org/apache/gluten/metrics/SortMetricsUpdater.scala 
b/backends-velox/src/main/scala/org/apache/gluten/metrics/SortMetricsUpdater.scala index cb01ae1510..57ed07a16c 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/metrics/SortMetricsUpdater.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/metrics/SortMetricsUpdater.scala @@ -36,6 +36,7 @@ class SortMetricsUpdater(val metrics: Map[String, SQLMetric]) extends MetricsUpd metrics("spilledRows") += operatorMetrics.spilledRows metrics("spilledPartitions") += operatorMetrics.spilledPartitions metrics("spilledFiles") += operatorMetrics.spilledFiles + metrics("loadLazyVectorTime") += operatorMetrics.loadLazyVectorTime if (TaskResources.inSparkTask()) { SparkMetricsUtil.incMemoryBytesSpilled( TaskResources.getLocalTaskContext().taskMetrics(), diff --git a/backends-velox/src/main/scala/org/apache/gluten/metrics/UnionMetricsUpdater.scala b/backends-velox/src/main/scala/org/apache/gluten/metrics/UnionMetricsUpdater.scala index 9e91cf368c..4a3fc961eb 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/metrics/UnionMetricsUpdater.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/metrics/UnionMetricsUpdater.scala @@ -18,6 +18,8 @@ package org.apache.gluten.metrics import org.apache.spark.sql.execution.metric.SQLMetric +import scala.collection.JavaConverters._ + class UnionMetricsUpdater(val metrics: Map[String, SQLMetric]) extends MetricsUpdater { override def updateNativeMetrics(opMetrics: IOperatorMetrics): Unit = { throw new UnsupportedOperationException() @@ -31,5 +33,6 @@ class UnionMetricsUpdater(val metrics: Map[String, SQLMetric]) extends MetricsUp metrics("inputBytes") += localExchangeMetrics.inputBytes metrics("cpuCount") += localExchangeMetrics.cpuCount metrics("wallNanos") += localExchangeMetrics.wallNanos + metrics("loadLazyVectorTime") += unionMetrics.asScala.last.loadLazyVectorTime } } diff --git a/backends-velox/src/main/scala/org/apache/gluten/metrics/WindowMetricsUpdater.scala 
b/backends-velox/src/main/scala/org/apache/gluten/metrics/WindowMetricsUpdater.scala index 4a098102b1..2f648bd448 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/metrics/WindowMetricsUpdater.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/metrics/WindowMetricsUpdater.scala @@ -34,6 +34,7 @@ class WindowMetricsUpdater(val metrics: Map[String, SQLMetric]) extends MetricsU metrics("spilledRows") += operatorMetrics.spilledRows metrics("spilledPartitions") += operatorMetrics.spilledPartitions metrics("spilledFiles") += operatorMetrics.spilledFiles + metrics("loadLazyVectorTime") += operatorMetrics.loadLazyVectorTime } } } diff --git a/backends-velox/src/main/scala/org/apache/gluten/metrics/WriteFilesMetricsUpdater.scala b/backends-velox/src/main/scala/org/apache/gluten/metrics/WriteFilesMetricsUpdater.scala index 4a89691895..7dc0ca880c 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/metrics/WriteFilesMetricsUpdater.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/metrics/WriteFilesMetricsUpdater.scala @@ -27,6 +27,7 @@ class WriteFilesMetricsUpdater(val metrics: Map[String, SQLMetric]) extends Metr metrics("writeIONanos") += operatorMetrics.writeIOTime metrics("wallNanos") += operatorMetrics.wallNanos metrics("numWrittenFiles") += operatorMetrics.numWrittenFiles + metrics("loadLazyVectorTime") += operatorMetrics.loadLazyVectorTime } } } diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc index 11e7ad15de..fe69708192 100644 --- a/cpp/core/jni/JniWrapper.cc +++ b/cpp/core/jni/JniWrapper.cc @@ -259,7 +259,7 @@ jint JNI_OnLoad(JavaVM* vm, void* reserved) { env, metricsBuilderClass, "<init>", - "([J[J[J[J[J[J[J[J[J[JJ[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[JLjava/lang/String;)V"); + "([J[J[J[J[J[J[J[J[J[JJ[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[JLjava/lang/String;)V"); nativeColumnarToRowInfoClass = createGlobalClassReferenceOrError(env, 
"Lorg/apache/gluten/vectorized/NativeColumnarToRowInfo;"); @@ -590,6 +590,7 @@ JNIEXPORT jobject JNICALL Java_org_apache_gluten_metrics_IteratorMetricsJniWrapp longArray[Metrics::kPhysicalWrittenBytes], longArray[Metrics::kWriteIOTime], longArray[Metrics::kNumWrittenFiles], + longArray[Metrics::kLoadLazyVectorTime], metrics && metrics->stats.has_value() ? env->NewStringUTF(metrics->stats->c_str()) : nullptr); JNI_METHOD_END(nullptr) diff --git a/cpp/core/utils/Metrics.h b/cpp/core/utils/Metrics.h index d3169eb69f..01d25182cd 100644 --- a/cpp/core/utils/Metrics.h +++ b/cpp/core/utils/Metrics.h @@ -88,6 +88,9 @@ struct Metrics { kWriteIOTime, kNumWrittenFiles, + // Load lazy vector. + kLoadLazyVectorTime, + // The end of enum items. kEnd, kNum = kEnd - kBegin diff --git a/cpp/velox/compute/WholeStageResultIterator.cc b/cpp/velox/compute/WholeStageResultIterator.cc index 039e67bd4f..d2b130f629 100644 --- a/cpp/velox/compute/WholeStageResultIterator.cc +++ b/cpp/velox/compute/WholeStageResultIterator.cc @@ -250,8 +250,12 @@ std::shared_ptr<ColumnarBatch> WholeStageResultIterator::nextInternal() { if (numRows == 0) { return nullptr; } - for (auto& child : vector->children()) { - child->loadedVector(); + + { + ScopedTimer timer(&loadLazyVectorTime_); + for (auto& child : vector->children()) { + child->loadedVector(); + } } return std::make_shared<VeloxColumnarBatch>(vector); @@ -407,6 +411,8 @@ void WholeStageResultIterator::collectMetrics() { int metricIndex = 0; for (int idx = 0; idx < orderedNodeIds_.size(); idx++) { + metrics_->get(Metrics::kLoadLazyVectorTime)[metricIndex] = 0; + const auto& nodeId = orderedNodeIds_[idx]; if (planStats.find(nodeId) == planStats.end()) { // Special handing for Filter over Project case. Filter metrics are @@ -481,6 +487,9 @@ void WholeStageResultIterator::collectMetrics() { } } + // Put the loadLazyVector time into the metrics of the last operator. 
+ metrics_->get(Metrics::kLoadLazyVectorTime)[orderedNodeIds_.size() - 1] = loadLazyVectorTime_; + // Populate the metrics with task stats for long running tasks. if (const int64_t collectTaskStatsThreshold = veloxCfg_->get<int64_t>(kTaskMetricsToEventLogThreshold, kTaskMetricsToEventLogThresholdDefault); diff --git a/cpp/velox/compute/WholeStageResultIterator.h b/cpp/velox/compute/WholeStageResultIterator.h index f24817ca8b..671016596b 100644 --- a/cpp/velox/compute/WholeStageResultIterator.h +++ b/cpp/velox/compute/WholeStageResultIterator.h @@ -141,6 +141,8 @@ class WholeStageResultIterator : public ColumnarBatchIterator { std::vector<facebook::velox::core::PlanNodeId> streamIds_; std::vector<std::vector<facebook::velox::exec::Split>> splits_; bool noMoreSplits_ = false; + + int64_t loadLazyVectorTime_ = 0; }; } // namespace gluten --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@gluten.apache.org For additional commands, e-mail: commits-h...@gluten.apache.org