This is an automated email from the ASF dual-hosted git repository.
changchen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 1e6d973b8 [GLUTEN-6053][CH] Move collect native metrics from last
hasNext to close and cancel (#6069)
1e6d973b8 is described below
commit 1e6d973b8408c003d32a96a859972b3a16d54090
Author: Wenzheng Liu <[email protected]>
AuthorDate: Tue Jun 18 14:30:13 2024 +0800
[GLUTEN-6053][CH] Move collect native metrics from last hasNext to close
and cancel (#6069)
---
.../backendsapi/clickhouse/CHIteratorApi.scala | 128 ++++++++++-----------
.../metrics/GlutenClickHouseTPCHMetricsSuite.scala | 35 +++++-
2 files changed, 93 insertions(+), 70 deletions(-)
diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
index 5f13b96a3..941237629 100644
--- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
@@ -25,10 +25,11 @@ import org.apache.gluten.substrait.plan.PlanNode
import org.apache.gluten.substrait.rel._
import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
import org.apache.gluten.utils.LogLevelUtil
-import org.apache.gluten.vectorized.{CHNativeExpressionEvaluator, CloseableCHColumnBatchIterator, GeneralInIterator}
+import org.apache.gluten.vectorized.{BatchIterator, CHNativeExpressionEvaluator, CloseableCHColumnBatchIterator, GeneralInIterator}
import org.apache.spark.{InterruptibleIterator, SparkConf, TaskContext}
import org.apache.spark.affinity.CHAffinity
+import org.apache.spark.executor.InputMetrics
import org.apache.spark.internal.Logging
import org.apache.spark.sql.connector.read.InputPartition
import org.apache.spark.sql.execution.datasources.FilePartition
@@ -209,46 +210,26 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil {
val splitInfoByteArray = inputPartition
.asInstanceOf[GlutenPartition]
.splitInfosByteArray
- val resIter =
+ val nativeIter =
transKernel.createKernelWithBatchIterator(
inputPartition.plan,
splitInfoByteArray,
inBatchIters,
false)
+ val iter = new CollectMetricIterator(
+ nativeIter,
+ updateNativeMetrics,
+ updateInputMetrics,
+ context.taskMetrics().inputMetrics)
+
context.addTaskFailureListener(
(ctx, _) => {
if (ctx.isInterrupted()) {
- resIter.cancel()
+ iter.cancel()
}
})
- context.addTaskCompletionListener[Unit](_ => resIter.close())
- val iter = new Iterator[Any] {
- private val inputMetrics = context.taskMetrics().inputMetrics
- private var outputRowCount = 0L
- private var outputVectorCount = 0L
- private var metricsUpdated = false
-
- override def hasNext: Boolean = {
- val res = resIter.hasNext
- // avoid to collect native metrics more than once, 'hasNext' is a idempotent operation
- if (!res && !metricsUpdated) {
- val nativeMetrics = resIter.getMetrics.asInstanceOf[NativeMetrics]
- nativeMetrics.setFinalOutputMetrics(outputRowCount, outputVectorCount)
- updateNativeMetrics(nativeMetrics)
- updateInputMetrics(inputMetrics)
- metricsUpdated = true
- }
- res
- }
-
- override def next(): Any = {
- val cb = resIter.next()
- outputVectorCount += 1
- outputRowCount += cb.numRows()
- cb
- }
- }
+ context.addTaskCompletionListener[Unit](_ => iter.close())
// TODO: SPARK-25083 remove the type erasure hack in data source scan
new InterruptibleIterator(
@@ -288,51 +269,16 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil {
materializeInput
)
- val resIter = new Iterator[ColumnarBatch] {
- private var outputRowCount = 0L
- private var outputVectorCount = 0L
- private var metricsUpdated = false
-
- override def hasNext: Boolean = {
- val res = nativeIterator.hasNext
- // avoid to collect native metrics more than once, 'hasNext' is a idempotent operation
- if (!res && !metricsUpdated) {
- val nativeMetrics = nativeIterator.getMetrics.asInstanceOf[NativeMetrics]
- nativeMetrics.setFinalOutputMetrics(outputRowCount, outputVectorCount)
- updateNativeMetrics(nativeMetrics)
- metricsUpdated = true
- }
- res
- }
-
- override def next(): ColumnarBatch = {
- val cb = nativeIterator.next()
- outputVectorCount += 1
- outputRowCount += cb.numRows()
- cb
- }
- }
- var closed = false
- val cancelled = false
-
- def close(): Unit = {
- closed = true
- nativeIterator.close()
- // relationHolder.clear()
- }
-
- def cancel(): Unit = {
- nativeIterator.cancel()
- }
+ val iter = new CollectMetricIterator(nativeIterator, updateNativeMetrics, null, null)
context.addTaskFailureListener(
(ctx, _) => {
if (ctx.isInterrupted()) {
- cancel()
+ iter.cancel()
}
})
- context.addTaskCompletionListener[Unit](_ => close())
- new CloseableCHColumnBatchIterator(resIter, Some(pipelineTime))
+ context.addTaskCompletionListener[Unit](_ => iter.close())
+ new CloseableCHColumnBatchIterator(iter, Some(pipelineTime))
}
}
@@ -346,3 +292,47 @@ object CHIteratorApi {
}
}
}
+
+class CollectMetricIterator(
+ val nativeIterator: BatchIterator,
+ val updateNativeMetrics: IMetrics => Unit,
+ val updateInputMetrics: InputMetricsWrapper => Unit,
+ val inputMetrics: InputMetrics
+) extends Iterator[ColumnarBatch] {
+ private var outputRowCount = 0L
+ private var outputVectorCount = 0L
+ private var metricsUpdated = false
+
+ override def hasNext: Boolean = {
+ nativeIterator.hasNext
+ }
+
+ override def next(): ColumnarBatch = {
+ val cb = nativeIterator.next()
+ outputVectorCount += 1
+ outputRowCount += cb.numRows()
+ cb
+ }
+
+ def close(): Unit = {
+ collectStageMetrics()
+ nativeIterator.close()
+ }
+
+ def cancel(): Unit = {
+ collectStageMetrics()
+ nativeIterator.cancel()
+ }
+
+ private def collectStageMetrics(): Unit = {
+ if (!metricsUpdated) {
+ val nativeMetrics = nativeIterator.getMetrics.asInstanceOf[NativeMetrics]
+ nativeMetrics.setFinalOutputMetrics(outputRowCount, outputVectorCount)
+ updateNativeMetrics(nativeMetrics)
+ if (updateInputMetrics != null) {
+ updateInputMetrics(inputMetrics)
+ }
+ metricsUpdated = true
+ }
+ }
+}
diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/metrics/GlutenClickHouseTPCHMetricsSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/metrics/GlutenClickHouseTPCHMetricsSuite.scala
index 1484d4653..09fa3ff10 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/metrics/GlutenClickHouseTPCHMetricsSuite.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/metrics/GlutenClickHouseTPCHMetricsSuite.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.execution.InputIteratorTransformer
import scala.collection.JavaConverters._
class GlutenClickHouseTPCHMetricsSuite extends GlutenClickHouseTPCHAbstractSuite {
-
+ private val parquetMaxBlockSize = 4096;
override protected val needCopyParquetToTablePath = true
override protected val tablesPath: String = basePath + "/tpch-data"
@@ -38,6 +38,7 @@ class GlutenClickHouseTPCHMetricsSuite extends GlutenClickHouseTPCHAbstractSuite
protected val metricsJsonFilePath: String = rootPath + "metrics-json"
protected val substraitPlansDatPath: String = rootPath + "substrait-plans"
+ // scalastyle:off line.size.limit
/** Run Gluten + ClickHouse Backend with SortShuffleManager */
override protected def sparkConf: SparkConf = {
super.sparkConf
@@ -45,10 +46,15 @@ class GlutenClickHouseTPCHMetricsSuite extends GlutenClickHouseTPCHAbstractSuite
.set("spark.io.compression.codec", "LZ4")
.set("spark.sql.shuffle.partitions", "1")
.set("spark.sql.autoBroadcastJoinThreshold", "10MB")
+ .set("spark.gluten.sql.columnar.backend.ch.runtime_config.logger.level", "DEBUG")
+ .set(
+ "spark.gluten.sql.columnar.backend.ch.runtime_settings.input_format_parquet_max_block_size",
+ s"$parquetMaxBlockSize")
.set(
"spark.gluten.sql.columnar.backend.ch.runtime_config.enable_streaming_aggregating",
"true")
}
+ // scalastyle:on line.size.limit
override protected def createTPCHNotNullTables(): Unit = {
createNotNullTPCHTablesInParquet(tablesPath)
@@ -76,6 +82,33 @@ class GlutenClickHouseTPCHMetricsSuite extends GlutenClickHouseTPCHAbstractSuite
}
}
+ test("test simple limit query scan metrics") {
+ val sql = "select * from nation limit 5"
+ runSql(sql) {
+ df =>
+ val plans = df.queryExecution.executedPlan.collect {
+ case scanExec: BasicScanExecTransformer => scanExec
+ }
+ assert(plans.size == 1)
+ assert(plans.head.metrics("numOutputRows").value === 25)
+ assert(plans.head.metrics("outputVectors").value === 1)
+ assert(plans.head.metrics("outputBytes").value > 0)
+ }
+
+ val sql2 = "select * from lineitem limit 3"
+ runSql(sql2) {
+ df =>
+ val plans = df.queryExecution.executedPlan.collect {
+ case scanExec: BasicScanExecTransformer => scanExec
+ }
+ assert(plans.size == 1)
+ // 1 block keep in SubstraitFileStep, and 4 blocks keep in other steps
+ assert(plans.head.metrics("numOutputRows").value === 5 * parquetMaxBlockSize)
+ assert(plans.head.metrics("outputVectors").value === 1)
+ assert(plans.head.metrics("outputBytes").value > 0)
+ }
+ }
+
test("test Generate metrics") {
val sql =
"""
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]