This is an automated email from the ASF dual-hosted git repository.
chengchengjin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 617d69b84b [GLUTEN-10683][VL] Add lock to restrict single task for one
GPU (#10684)
617d69b84b is described below
commit 617d69b84bf5b5793b65bf8d215ddb9445f22c24
Author: Jin Chengcheng <[email protected]>
AuthorDate: Thu Sep 18 10:38:32 2025 +0100
[GLUTEN-10683][VL] Add lock to restrict single task for one GPU (#10684)
---
.../gluten/extension/CudfNodeValidationRule.scala | 20 ++++--------
cpp/core/jni/JniWrapper.cc | 4 +--
cpp/velox/compute/WholeStageResultIterator.cc | 37 ++++++++++++++++++----
cpp/velox/compute/WholeStageResultIterator.h | 14 ++++++++
4 files changed, 53 insertions(+), 22 deletions(-)
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/extension/CudfNodeValidationRule.scala
b/backends-velox/src/main/scala/org/apache/gluten/extension/CudfNodeValidationRule.scala
index aeb21dd049..9b63fb2c5b 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/extension/CudfNodeValidationRule.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/extension/CudfNodeValidationRule.scala
@@ -17,8 +17,7 @@
package org.apache.gluten.extension
import org.apache.gluten.config.GlutenConfig
-import org.apache.gluten.cudf.VeloxCudfPlanValidatorJniWrapper
-import org.apache.gluten.execution.{CudfTag, LeafTransformSupport,
TransformSupport, WholeStageTransformer}
+import org.apache.gluten.execution.{CudfTag, LeafTransformSupport,
WholeStageTransformer}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.SparkPlan
@@ -32,17 +31,12 @@ case class CudfNodeValidationRule(glutenConf: GlutenConfig)
extends Rule[SparkPl
}
plan.transformUp {
case transformer: WholeStageTransformer =>
- if (
- VeloxCudfPlanValidatorJniWrapper.validate(
- transformer.substraitPlan.toProtobuf.toByteArray)
- ) {
- transformer.foreach {
- case _: LeafTransformSupport =>
- case t: TransformSupport =>
- t.setTagValue(CudfTag.CudfTag, true)
- case _ =>
- }
- }
+ // Spark3.2 does not have exists
+ val hasLeaf = transformer.find {
+ case _: LeafTransformSupport => true
+ case _ => false
+ }.isDefined
+ transformer.setTagValue(CudfTag.CudfTag, !hasLeaf)
transformer
}
}
diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc
index 0d047b9f63..543bc31597 100644
--- a/cpp/core/jni/JniWrapper.cc
+++ b/cpp/core/jni/JniWrapper.cc
@@ -454,9 +454,7 @@
Java_org_apache_gluten_vectorized_PlanEvaluatorJniWrapper_nativeCreateKernelWith
auto ctx = getRuntime(env, wrapper);
auto conf = ctx->getConfMap();
#ifdef GLUTEN_ENABLE_GPU
- if (enableCudf) {
- conf[kCudfEnabled] = "true";
- }
+ conf[kCudfEnabled] = std::to_string(enableCudf);
#endif
ctx->setSparkTaskInfo({stageId, partitionId, taskId});
diff --git a/cpp/velox/compute/WholeStageResultIterator.cc
b/cpp/velox/compute/WholeStageResultIterator.cc
index 7e3af2a3fe..b70676a62e 100644
--- a/cpp/velox/compute/WholeStageResultIterator.cc
+++ b/cpp/velox/compute/WholeStageResultIterator.cc
@@ -22,6 +22,7 @@
#include "velox/connectors/hive/HiveConnectorSplit.h"
#include "velox/exec/PlanNodeStats.h"
#ifdef GLUTEN_ENABLE_GPU
+#include <mutex>
#include "velox/experimental/cudf/exec/ToCudf.h"
#endif
@@ -70,6 +71,9 @@ WholeStageResultIterator::WholeStageResultIterator(
std::make_shared<facebook::velox::config::ConfigBase>(std::unordered_map<std::string,
std::string>(confMap))),
taskInfo_(taskInfo),
veloxPlan_(planNode),
+#ifdef GLUTEN_ENABLE_GPU
+ lock_(mutex_, std::defer_lock),
+#endif
scanNodeIds_(scanNodeIds),
scanInfos_(scanInfos),
streamIds_(streamIds) {
@@ -80,6 +84,13 @@ WholeStageResultIterator::WholeStageResultIterator(
}
getOrderedNodeIds(veloxPlan_, orderedNodeIds_);
+#ifdef GLUTEN_ENABLE_GPU
+ enableCudf_ = veloxCfg_->get<bool>(kCudfEnabled, kCudfEnabledDefault);
+ if (enableCudf_) {
+ lock_.lock();
+ }
+#endif
+
// Create task instance.
std::unordered_set<velox::core::PlanNodeId> emptySet;
velox::core::PlanFragment planFragment{planNode,
velox::core::ExecutionStrategy::kUngrouped, 1, emptySet};
@@ -176,6 +187,10 @@ WholeStageResultIterator::WholeStageResultIterator(
}
}
+#ifdef GLUTEN_ENABLE_GPU
+std::mutex WholeStageResultIterator::mutex_;
+#endif
+
std::shared_ptr<velox::core::QueryCtx>
WholeStageResultIterator::createNewVeloxQueryCtx() {
std::unordered_map<std::string, std::shared_ptr<velox::config::ConfigBase>>
connectorConfigs;
connectorConfigs[kHiveConnectorId] = createConnectorConfig();
@@ -195,6 +210,17 @@ std::shared_ptr<velox::core::QueryCtx>
WholeStageResultIterator::createNewVeloxQ
}
std::shared_ptr<ColumnarBatch> WholeStageResultIterator::next() {
+ auto result = nextInternal();
+#ifdef GLUTEN_ENABLE_GPU
+ if (result == nullptr && enableCudf_) {
+ lock_.unlock();
+ }
+#endif
+
+ return result;
+}
+
+std::shared_ptr<ColumnarBatch> WholeStageResultIterator::nextInternal() {
tryAddSplitsToTask();
if (task_->isFinished()) {
return nullptr;
@@ -349,9 +375,11 @@ void WholeStageResultIterator::collectMetrics() {
veloxCfg_->get<bool>(kShowTaskMetricsWhenFinished,
kShowTaskMetricsWhenFinishedDefault)) {
auto planWithStats = velox::exec::printPlanWithStats(*veloxPlan_.get(),
taskStats, true);
std::ostringstream oss;
- oss << "Native Plan with stats for: " << taskInfo_;
+ oss << "Native Plan with stats for: " << taskInfo_ << "\n";
+ oss << "TaskStats: totalTime: " << taskStats.executionEndTimeMs -
taskStats.executionStartTimeMs
+ << "; startTime: " << taskStats.executionStartTimeMs << "; endTime: "
<< taskStats.executionEndTimeMs;
oss << "\n" << planWithStats << std::endl;
- LOG(INFO) << oss.str();
+ LOG(WARNING) << oss.str();
}
auto planStats = velox::exec::toPlanStats(taskStats);
@@ -591,10 +619,7 @@ std::unordered_map<std::string, std::string>
WholeStageResultIterator::getQueryC
std::to_string(veloxCfg_->get<bool>(kSparkJsonIgnoreNullFields, true));
#ifdef GLUTEN_ENABLE_GPU
- if (veloxCfg_->get<bool>(kCudfEnabled, false)) {
- // TODO: wait for PR
https://github.com/facebookincubator/velox/pull/13341
- // configs[cudf_velox::kCudfEnabled] = "false";
- }
+ configs[cudf_velox::kCudfEnabled] =
std::to_string(veloxCfg_->get<bool>(kCudfEnabled, false));
#endif
const auto setIfExists = [&](const std::string& glutenKey, const
std::string& veloxKey) {
diff --git a/cpp/velox/compute/WholeStageResultIterator.h
b/cpp/velox/compute/WholeStageResultIterator.h
index d0dd47bfd1..f24817ca8b 100644
--- a/cpp/velox/compute/WholeStageResultIterator.h
+++ b/cpp/velox/compute/WholeStageResultIterator.h
@@ -47,6 +47,11 @@ class WholeStageResultIterator : public
ColumnarBatchIterator {
// calling .wait() may take no effect in single thread execution mode
task_->requestCancel().wait();
}
+#ifdef GLUTEN_ENABLE_GPU
+ if (enableCudf_ && lock_.owns_lock()) {
+ lock_.unlock();
+ }
+#endif
}
std::shared_ptr<ColumnarBatch> next() override;
@@ -70,6 +75,8 @@ class WholeStageResultIterator : public ColumnarBatchIterator
{
}
private:
+ std::shared_ptr<ColumnarBatch> nextInternal();
+
/// Get the Spark confs to Velox query context.
std::unordered_map<std::string, std::string> getQueryContextConf();
@@ -117,6 +124,13 @@ class WholeStageResultIterator : public
ColumnarBatchIterator {
/// Metrics
std::unique_ptr<Metrics> metrics_{};
+#ifdef GLUTEN_ENABLE_GPU
+ // Mutex for thread safety.
+ static std::mutex mutex_;
+ std::unique_lock<std::mutex> lock_;
+ bool enableCudf_;
+#endif
+
/// All the children plan node ids with postorder traversal.
std::vector<facebook::velox::core::PlanNodeId> orderedNodeIds_;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]