This is an automated email from the ASF dual-hosted git repository.

chengchengjin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 617d69b84b [GLUTEN-10683][VL] Add lock to restrict single task for one GPU (#10684)
617d69b84b is described below

commit 617d69b84bf5b5793b65bf8d215ddb9445f22c24
Author: Jin Chengcheng <[email protected]>
AuthorDate: Thu Sep 18 10:38:32 2025 +0100

    [GLUTEN-10683][VL] Add lock to restrict single task for one GPU (#10684)
---
 .../gluten/extension/CudfNodeValidationRule.scala  | 20 ++++--------
 cpp/core/jni/JniWrapper.cc                         |  4 +--
 cpp/velox/compute/WholeStageResultIterator.cc      | 37 ++++++++++++++++++----
 cpp/velox/compute/WholeStageResultIterator.h       | 14 ++++++++
 4 files changed, 53 insertions(+), 22 deletions(-)

diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/extension/CudfNodeValidationRule.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/extension/CudfNodeValidationRule.scala
index aeb21dd049..9b63fb2c5b 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/extension/CudfNodeValidationRule.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/extension/CudfNodeValidationRule.scala
@@ -17,8 +17,7 @@
 package org.apache.gluten.extension
 
 import org.apache.gluten.config.GlutenConfig
-import org.apache.gluten.cudf.VeloxCudfPlanValidatorJniWrapper
-import org.apache.gluten.execution.{CudfTag, LeafTransformSupport, 
TransformSupport, WholeStageTransformer}
+import org.apache.gluten.execution.{CudfTag, LeafTransformSupport, 
WholeStageTransformer}
 
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.SparkPlan
@@ -32,17 +31,12 @@ case class CudfNodeValidationRule(glutenConf: GlutenConfig) 
extends Rule[SparkPl
     }
     plan.transformUp {
       case transformer: WholeStageTransformer =>
-        if (
-          VeloxCudfPlanValidatorJniWrapper.validate(
-            transformer.substraitPlan.toProtobuf.toByteArray)
-        ) {
-          transformer.foreach {
-            case _: LeafTransformSupport =>
-            case t: TransformSupport =>
-              t.setTagValue(CudfTag.CudfTag, true)
-            case _ =>
-          }
-        }
+        // Spark3.2 does not have exists
+        val hasLeaf = transformer.find {
+          case _: LeafTransformSupport => true
+          case _ => false
+        }.isDefined
+        transformer.setTagValue(CudfTag.CudfTag, !hasLeaf)
         transformer
     }
   }
diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc
index 0d047b9f63..543bc31597 100644
--- a/cpp/core/jni/JniWrapper.cc
+++ b/cpp/core/jni/JniWrapper.cc
@@ -454,9 +454,7 @@ 
Java_org_apache_gluten_vectorized_PlanEvaluatorJniWrapper_nativeCreateKernelWith
   auto ctx = getRuntime(env, wrapper);
   auto conf = ctx->getConfMap();
 #ifdef GLUTEN_ENABLE_GPU
-  if (enableCudf) {
-    conf[kCudfEnabled] = "true";
-  }
+  conf[kCudfEnabled] = std::to_string(enableCudf);
 #endif
 
   ctx->setSparkTaskInfo({stageId, partitionId, taskId});
diff --git a/cpp/velox/compute/WholeStageResultIterator.cc 
b/cpp/velox/compute/WholeStageResultIterator.cc
index 7e3af2a3fe..b70676a62e 100644
--- a/cpp/velox/compute/WholeStageResultIterator.cc
+++ b/cpp/velox/compute/WholeStageResultIterator.cc
@@ -22,6 +22,7 @@
 #include "velox/connectors/hive/HiveConnectorSplit.h"
 #include "velox/exec/PlanNodeStats.h"
 #ifdef GLUTEN_ENABLE_GPU
+#include <mutex>
 #include "velox/experimental/cudf/exec/ToCudf.h"
 #endif
 
@@ -70,6 +71,9 @@ WholeStageResultIterator::WholeStageResultIterator(
           
std::make_shared<facebook::velox::config::ConfigBase>(std::unordered_map<std::string,
 std::string>(confMap))),
       taskInfo_(taskInfo),
       veloxPlan_(planNode),
+#ifdef GLUTEN_ENABLE_GPU
+      lock_(mutex_, std::defer_lock),
+#endif
       scanNodeIds_(scanNodeIds),
       scanInfos_(scanInfos),
       streamIds_(streamIds) {
@@ -80,6 +84,13 @@ WholeStageResultIterator::WholeStageResultIterator(
   }
   getOrderedNodeIds(veloxPlan_, orderedNodeIds_);
 
+#ifdef GLUTEN_ENABLE_GPU
+  enableCudf_ = veloxCfg_->get<bool>(kCudfEnabled, kCudfEnabledDefault);
+  if (enableCudf_) {
+    lock_.lock();
+  }
+#endif
+
   // Create task instance.
   std::unordered_set<velox::core::PlanNodeId> emptySet;
   velox::core::PlanFragment planFragment{planNode, 
velox::core::ExecutionStrategy::kUngrouped, 1, emptySet};
@@ -176,6 +187,10 @@ WholeStageResultIterator::WholeStageResultIterator(
   }
 }
 
+#ifdef GLUTEN_ENABLE_GPU
+std::mutex WholeStageResultIterator::mutex_;
+#endif
+
 std::shared_ptr<velox::core::QueryCtx> 
WholeStageResultIterator::createNewVeloxQueryCtx() {
   std::unordered_map<std::string, std::shared_ptr<velox::config::ConfigBase>> 
connectorConfigs;
   connectorConfigs[kHiveConnectorId] = createConnectorConfig();
@@ -195,6 +210,17 @@ std::shared_ptr<velox::core::QueryCtx> 
WholeStageResultIterator::createNewVeloxQ
 }
 
 std::shared_ptr<ColumnarBatch> WholeStageResultIterator::next() {
+  auto result = nextInternal();
+#ifdef GLUTEN_ENABLE_GPU
+  if (result == nullptr && enableCudf_) {
+    lock_.unlock();
+  }
+#endif
+
+  return result;
+}
+
+std::shared_ptr<ColumnarBatch> WholeStageResultIterator::nextInternal() {
   tryAddSplitsToTask();
   if (task_->isFinished()) {
     return nullptr;
@@ -349,9 +375,11 @@ void WholeStageResultIterator::collectMetrics() {
       veloxCfg_->get<bool>(kShowTaskMetricsWhenFinished, 
kShowTaskMetricsWhenFinishedDefault)) {
     auto planWithStats = velox::exec::printPlanWithStats(*veloxPlan_.get(), 
taskStats, true);
     std::ostringstream oss;
-    oss << "Native Plan with stats for: " << taskInfo_;
+    oss << "Native Plan with stats for: " << taskInfo_ << "\n";
+    oss << "TaskStats: totalTime: " << taskStats.executionEndTimeMs - 
taskStats.executionStartTimeMs
+        << "; startTime: " << taskStats.executionStartTimeMs << "; endTime: " 
<< taskStats.executionEndTimeMs;
     oss << "\n" << planWithStats << std::endl;
-    LOG(INFO) << oss.str();
+    LOG(WARNING) << oss.str();
   }
 
   auto planStats = velox::exec::toPlanStats(taskStats);
@@ -591,10 +619,7 @@ std::unordered_map<std::string, std::string> 
WholeStageResultIterator::getQueryC
         std::to_string(veloxCfg_->get<bool>(kSparkJsonIgnoreNullFields, true));
 
 #ifdef GLUTEN_ENABLE_GPU
-    if (veloxCfg_->get<bool>(kCudfEnabled, false)) {
-      // TODO: wait for PR 
https://github.com/facebookincubator/velox/pull/13341
-      // configs[cudf_velox::kCudfEnabled] = "false";
-    }
+    configs[cudf_velox::kCudfEnabled] = 
std::to_string(veloxCfg_->get<bool>(kCudfEnabled, false));
 #endif
 
     const auto setIfExists = [&](const std::string& glutenKey, const 
std::string& veloxKey) {
diff --git a/cpp/velox/compute/WholeStageResultIterator.h 
b/cpp/velox/compute/WholeStageResultIterator.h
index d0dd47bfd1..f24817ca8b 100644
--- a/cpp/velox/compute/WholeStageResultIterator.h
+++ b/cpp/velox/compute/WholeStageResultIterator.h
@@ -47,6 +47,11 @@ class WholeStageResultIterator : public 
ColumnarBatchIterator {
       // calling .wait() may take no effect in single thread execution mode
       task_->requestCancel().wait();
     }
+#ifdef GLUTEN_ENABLE_GPU
+    if (enableCudf_ && lock_.owns_lock()) {
+      lock_.unlock();
+    }
+#endif
   }
 
   std::shared_ptr<ColumnarBatch> next() override;
@@ -70,6 +75,8 @@ class WholeStageResultIterator : public ColumnarBatchIterator 
{
   }
 
  private:
+  std::shared_ptr<ColumnarBatch> nextInternal();
+
   /// Get the Spark confs to Velox query context.
   std::unordered_map<std::string, std::string> getQueryContextConf();
 
@@ -117,6 +124,13 @@ class WholeStageResultIterator : public 
ColumnarBatchIterator {
   /// Metrics
   std::unique_ptr<Metrics> metrics_{};
 
+#ifdef GLUTEN_ENABLE_GPU
+  // Mutex for thread safety.
+  static std::mutex mutex_;
+  std::unique_lock<std::mutex> lock_;
+  bool enableCudf_;
+#endif
+
   /// All the children plan node ids with postorder traversal.
   std::vector<facebook::velox::core::PlanNodeId> orderedNodeIds_;
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to