This is an automated email from the ASF dual-hosted git repository.
weitingchen pushed a commit to branch branch-1.2
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/branch-1.2 by this push:
new 015af6090e [VL][1.2] Port #6573 #7025 #7132 (#7973)
015af6090e is described below
commit 015af6090ee9f8322c42841e8756acd28de252fb
Author: Wei-Ting Chen <[email protected]>
AuthorDate: Wed Nov 20 15:45:15 2024 +0800
[VL][1.2] Port #6573 #7025 #7132 (#7973)
* [GLUTEN-7024][VL] Skip calling collectMetrics when the task has not called
next() (#7025)
* [GLUTEN-7130][CORE] Skip command execution when collecting the QE fallback
summary (#7132)
* [VL] Add config to show Velox task metrics when the task finishes (#6573)
---------
Co-authored-by: zhaokuo <[email protected]>
Co-authored-by: Zhen Wang <[email protected]>
Co-authored-by: Yang Zhang <[email protected]>
---
cpp/velox/compute/WholeStageResultIterator.cc | 13 ++++++++++---
cpp/velox/compute/WholeStageResultIterator.h | 4 +++-
cpp/velox/config/VeloxConfig.h | 3 +++
.../org/apache/spark/sql/execution/GlutenImplicits.scala | 2 +-
.../src/main/scala/org/apache/gluten/GlutenConfig.scala | 7 +++++++
5 files changed, 24 insertions(+), 5 deletions(-)
diff --git a/cpp/velox/compute/WholeStageResultIterator.cc
b/cpp/velox/compute/WholeStageResultIterator.cc
index c417202b9d..aa36315e37 100644
--- a/cpp/velox/compute/WholeStageResultIterator.cc
+++ b/cpp/velox/compute/WholeStageResultIterator.cc
@@ -311,15 +311,22 @@ void WholeStageResultIterator::collectMetrics() {
return;
}
- if (veloxCfg_->get<bool>(kDebugModeEnabled, false)) {
- auto planWithStats = velox::exec::printPlanWithStats(*veloxPlan_.get(),
task_->taskStats(), true);
+ const auto& taskStats = task_->taskStats();
+ if (taskStats.executionStartTimeMs == 0) {
+ LOG(INFO) << "Skip collect task metrics since task did not call next().";
+ return;
+ }
+
+ if (veloxCfg_->get<bool>(kDebugModeEnabled, false) ||
+ veloxCfg_->get<bool>(kShowTaskMetricsWhenFinished,
kShowTaskMetricsWhenFinishedDefault)) {
+ auto planWithStats = velox::exec::printPlanWithStats(*veloxPlan_.get(),
taskStats, true);
std::ostringstream oss;
oss << "Native Plan with stats for: " << taskInfo_;
oss << "\n" << planWithStats << std::endl;
LOG(INFO) << oss.str();
}
- auto planStats = velox::exec::toPlanStats(task_->taskStats());
+ auto planStats = velox::exec::toPlanStats(taskStats);
// Calculate the total number of metrics.
int statsNum = 0;
for (int idx = 0; idx < orderedNodeIds_.size(); idx++) {
diff --git a/cpp/velox/compute/WholeStageResultIterator.h
b/cpp/velox/compute/WholeStageResultIterator.h
index 5e661f4048..8d20ee6e9f 100644
--- a/cpp/velox/compute/WholeStageResultIterator.h
+++ b/cpp/velox/compute/WholeStageResultIterator.h
@@ -55,7 +55,9 @@ class WholeStageResultIterator : public ColumnarBatchIterator
{
Metrics* getMetrics(int64_t exportNanos) {
collectMetrics();
- metrics_->veloxToArrow = exportNanos;
+ if (metrics_) {
+ metrics_->veloxToArrow = exportNanos;
+ }
return metrics_.get();
}
diff --git a/cpp/velox/config/VeloxConfig.h b/cpp/velox/config/VeloxConfig.h
index 7a96f03f49..65c7cb61d9 100644
--- a/cpp/velox/config/VeloxConfig.h
+++ b/cpp/velox/config/VeloxConfig.h
@@ -59,6 +59,9 @@ const std::string kBloomFilterNumBits =
"spark.gluten.sql.columnar.backend.velox
const std::string kBloomFilterMaxNumBits =
"spark.gluten.sql.columnar.backend.velox.bloomFilter.maxNumBits";
const std::string kVeloxSplitPreloadPerDriver =
"spark.gluten.sql.columnar.backend.velox.SplitPreloadPerDriver";
+const std::string kShowTaskMetricsWhenFinished =
"spark.gluten.sql.columnar.backend.velox.showTaskMetricsWhenFinished";
+const bool kShowTaskMetricsWhenFinishedDefault = false;
+
const std::string kEnableUserExceptionStacktrace =
"spark.gluten.sql.columnar.backend.velox.enableUserExceptionStacktrace";
const bool kEnableUserExceptionStacktraceDefault = true;
diff --git
a/gluten-core/src/main/scala/org/apache/spark/sql/execution/GlutenImplicits.scala
b/gluten-core/src/main/scala/org/apache/spark/sql/execution/GlutenImplicits.scala
index 2e2af6517d..55664df71d 100644
---
a/gluten-core/src/main/scala/org/apache/spark/sql/execution/GlutenImplicits.scala
+++
b/gluten-core/src/main/scala/org/apache/spark/sql/execution/GlutenImplicits.scala
@@ -176,7 +176,7 @@ object GlutenImplicits {
withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
// AQE is not materialized, so the columnar rules are not applied.
// For this case, We apply columnar rules manually with disable AQE.
- val qe = spark.sessionState.executePlan(logicalPlan)
+ val qe = spark.sessionState.executePlan(logicalPlan,
CommandExecutionMode.SKIP)
processPlan(qe.executedPlan, concat.append, collectFallbackFunc)
}
} else {
diff --git a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
index ef4618fca1..fd1a4907cd 100644
--- a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
+++ b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
@@ -1685,6 +1685,13 @@ object GlutenConfig {
.booleanConf
.createWithDefault(true)
+ val COLUMNAR_VELOX_SHOW_TASK_METRICS_WHEN_FINISHED =
+
buildConf("spark.gluten.sql.columnar.backend.velox.showTaskMetricsWhenFinished")
+ .internal()
+ .doc("Show velox full task metrics when finished.")
+ .booleanConf
+ .createWithDefault(false)
+
val COLUMNAR_VELOX_MEMORY_USE_HUGE_PAGES =
buildConf("spark.gluten.sql.columnar.backend.velox.memoryUseHugePages")
.internal()
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]