This is an automated email from the ASF dual-hosted git repository.

chengchengjin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new fbe5350c34 [GLUTEN-8851][VL] cuDF: Validate the plan before execution 
(#10889)
fbe5350c34 is described below

commit fbe5350c34d3c566a363c3684f68d31b26314cdc
Author: Jin Chengcheng <[email protected]>
AuthorDate: Mon Oct 20 14:39:42 2025 +0100

    [GLUTEN-8851][VL] cuDF: Validate the plan before execution (#10889)
    
    The change that initializes the task in the Task constructor has been
    merged, so we can now get the operator information from the task. If
    validation succeeds, set the cuDF tag of the whole-stage transformer to
    true, which sets the session config kCudfEnabled to true, and set the cuDF
    tag of its child operators to true so that their names in the Spark UI are
    updated with "Cudf".
---
 .../scala/org/apache/gluten/config/VeloxConfig.scala | 10 ++++++++++
 .../gluten/extension/CudfNodeValidationRule.scala    | 20 ++++++++++++++++++--
 cpp/velox/cudf/CudfPlanValidator.cc                  | 17 +++++++++++++----
 docs/velox-configuration.md                          |  7 ++++---
 4 files changed, 45 insertions(+), 9 deletions(-)

diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala 
b/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala
index 093186f569..f68a23c350 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala
@@ -78,6 +78,8 @@ class VeloxConfig(conf: SQLConf) extends GlutenConfig(conf) {
 
   def cudfEnableTableScan: Boolean = getConf(CUDF_ENABLE_TABLE_SCAN)
 
+  def cudfEnableValidation: Boolean = getConf(CUDF_ENABLE_VALIDATION)
+
   def orcUseColumnNames: Boolean = getConf(ORC_USE_COLUMN_NAMES)
 
   def parquetUseColumnNames: Boolean = getConf(PARQUET_USE_COLUMN_NAMES)
@@ -624,6 +626,14 @@ object VeloxConfig extends ConfigRegistry {
       .booleanConf
       .createWithDefault(false)
 
+  val CUDF_ENABLE_VALIDATION =
+    
buildStaticConf("spark.gluten.sql.columnar.backend.velox.cudf.enableValidation")
+      .doc(
+        "Heuristics you can apply to validate a cuDF/GPU plan and only offload 
when " +
+          "the entire stage can be fully and profitably executed on GPU")
+      .booleanConf
+      .createWithDefault(true)
+
   val MEMORY_DUMP_ON_EXIT =
     buildConf("spark.gluten.monitor.memoryDumpOnExit")
       .internal()
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/extension/CudfNodeValidationRule.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/extension/CudfNodeValidationRule.scala
index 20e819e215..a092b984c8 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/extension/CudfNodeValidationRule.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/extension/CudfNodeValidationRule.scala
@@ -17,7 +17,8 @@
 package org.apache.gluten.extension
 
 import org.apache.gluten.config.{GlutenConfig, VeloxConfig}
-import org.apache.gluten.execution.{CudfTag, LeafTransformSupport, 
WholeStageTransformer}
+import org.apache.gluten.cudf.VeloxCudfPlanValidatorJniWrapper
+import org.apache.gluten.execution.{CudfTag, LeafTransformSupport, 
TransformSupport, WholeStageTransformer}
 
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.SparkPlan
@@ -37,7 +38,22 @@ case class CudfNodeValidationRule(glutenConf: GlutenConfig) 
extends Rule[SparkPl
             case _: LeafTransformSupport => true
             case _ => false
           }.isDefined
-          transformer.setTagValue(CudfTag.CudfTag, !hasLeaf)
+          if (!hasLeaf && VeloxConfig.get.cudfEnableValidation) {
+            if (
+              VeloxCudfPlanValidatorJniWrapper.validate(
+                transformer.substraitPlan.toProtobuf.toByteArray)
+            ) {
+              transformer.foreach {
+                case _: LeafTransformSupport =>
+                case t: TransformSupport =>
+                  t.setTagValue(CudfTag.CudfTag, true)
+                case _ =>
+              }
+              transformer.setTagValue(CudfTag.CudfTag, true)
+            }
+          } else {
+            transformer.setTagValue(CudfTag.CudfTag, !hasLeaf)
+          }
         } else {
           transformer.setTagValue(CudfTag.CudfTag, true)
         }
diff --git a/cpp/velox/cudf/CudfPlanValidator.cc 
b/cpp/velox/cudf/CudfPlanValidator.cc
index cad8aa5afe..49949ca100 100644
--- a/cpp/velox/cudf/CudfPlanValidator.cc
+++ b/cpp/velox/cudf/CudfPlanValidator.cc
@@ -24,11 +24,21 @@
 #include "velox/core/PlanNode.h"
 #include "velox/exec/Task.h"
 #include "velox/exec/TableScan.h"
+#include "velox/experimental/cudf/exec/NvtxHelper.h"
 #include "velox/experimental/cudf/exec/ToCudf.h"
 
 using namespace facebook;
 
 namespace gluten {
+
+namespace {
+
+bool isCudfOperator(const exec::Operator* op) {
+  return dynamic_cast<const velox::cudf_velox::NvtxHelper*>(op) != nullptr;
+}
+
+}
+
 bool CudfPlanValidator::validate(const ::substrait::Plan& substraitPlan) {
   auto veloxMemoryPool = gluten::defaultLeafVeloxMemoryPool();
   std::vector<::substrait::ReadRel_LocalFiles> localFiles;
@@ -64,10 +74,9 @@ bool CudfPlanValidator::validate(const ::substrait::Plan& 
substraitPlan) {
     if (dynamic_cast<const velox::exec::TableScan*>(op) != nullptr) {
       continue;
     }
-    // TODO: wait for PR https://github.com/facebookincubator/velox/pull/13341
-    // if (cudf_velox::isCudfOperator(op)) {
-    //   continue;
-    // }
+    if (isCudfOperator(op)) {
+      continue;
+    }
     if (dynamic_cast<const ValueStream*>(op) != nullptr) {
       continue;
     }
diff --git a/docs/velox-configuration.md b/docs/velox-configuration.md
index 44996a5cd4..fd7cd5cdca 100644
--- a/docs/velox-configuration.md
+++ b/docs/velox-configuration.md
@@ -9,7 +9,7 @@ nav_order: 16
 
 ## Gluten Velox backend configurations
 
-| Key                                                                          
    | Default           | Description                                           
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
+|                                       Key                                    
    |      Default      |                                                       
                                                                                
                                                                               
Description                                                                     
                                                                                
               [...]
 
|----------------------------------------------------------------------------------|-------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 [...]
 | spark.gluten.sql.columnar.backend.velox.IOThreads                            
    | &lt;undefined&gt; | The Size of the IO thread pool in the Connector. This 
thread pool is used for split preloading and DirectBufferedInput. By default, 
the value is the same as the maximum task slots per Spark executor.             
                                                                                
                                                                                
                [...]
 | spark.gluten.sql.columnar.backend.velox.SplitPreloadPerDriver                
    | 2                 | The split preload per task                            
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
@@ -23,6 +23,7 @@ nav_order: 16
 | spark.gluten.sql.columnar.backend.velox.cachePrefetchMinPct                  
    | 0                 | Set prefetch cache min pct for velox file scan        
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
 | spark.gluten.sql.columnar.backend.velox.checkUsageLeak                       
    | true              | Enable check memory usage leak.                       
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
 | spark.gluten.sql.columnar.backend.velox.cudf.enableTableScan                 
    | false             | Enable cudf table scan                                
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
+| spark.gluten.sql.columnar.backend.velox.cudf.enableValidation                
    | true              | Heuristics you can apply to validate a cuDF/GPU plan 
and only offload when the entire stage can be fully and profitably executed on 
GPU                                                                             
                                                                                
                                                                                
                [...]
 | spark.gluten.sql.columnar.backend.velox.cudf.memoryPercent                   
    | 50                | The initial percent of GPU memory to allocate for 
memory resource for one thread.                                                 
                                                                                
                                                                                
                                                                                
                  [...]
 | spark.gluten.sql.columnar.backend.velox.cudf.memoryResource                  
    | async             | GPU RMM memory resource.                              
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
 | spark.gluten.sql.columnar.backend.velox.directorySizeGuess                   
    | 32KB              | Deprecated, rename to 
spark.gluten.sql.columnar.backend.velox.footerEstimatedSize                     
                                                                                
                                                                                
                                                                                
                                              [...]
@@ -48,8 +49,8 @@ nav_order: 16
 | 
spark.gluten.sql.columnar.backend.velox.memoryPoolCapacityTransferAcrossTasks   
 | true              | Whether to allow memory capacity transfer between memory 
pools from different tasks.                                                     
                                                                                
                                                                                
                                                                                
           [...]
 | spark.gluten.sql.columnar.backend.velox.memoryUseHugePages                   
    | false             | Use explicit huge pages for Velox memory allocation.  
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
 | spark.gluten.sql.columnar.backend.velox.orc.scan.enabled                     
    | true              | Enable velox orc scan. If disabled, vanilla spark orc 
scan will be used.                                                              
                                                                                
                                                                                
                                                                                
              [...]
-| spark.gluten.sql.columnar.backend.velox.orcUseColumnNames                    
    | true              | Maps table field names to file field names using 
names, not indices for ORC files. If this is set to false Gluten will fallback 
to vanilla Spark if it does not support all column types present in any of the 
schemas of the tables being read, at this time unsupported types include 
TimestampNTZ and Char.                                                          
                            [...]
-| spark.gluten.sql.columnar.backend.velox.parquetUseColumnNames                
    | true              | Maps table field names to file field names using 
names, not indices for Parquet files. If this is set to false Gluten will 
fallback to vanilla Spark if it does not support all column types present in 
any of the schemas of the tables being read, at this time unsupported types 
include TimestampNTZ and Char.                                                  
                                [...]
+| spark.gluten.sql.columnar.backend.velox.orcUseColumnNames                    
    | true              | Maps table field names to file field names using 
names, not indices for ORC files.                                               
                                                                                
                                                                                
                                                                                
                   [...]
+| spark.gluten.sql.columnar.backend.velox.parquetUseColumnNames                
    | true              | Maps table field names to file field names using 
names, not indices for Parquet files.                                           
                                                                                
                                                                                
                                                                                
                   [...]
 | spark.gluten.sql.columnar.backend.velox.prefetchRowGroups                    
    | 1                 | Set the prefetch row groups for velox file scan       
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
 | spark.gluten.sql.columnar.backend.velox.queryTraceEnabled                    
    | false             | Enable query tracing flag.                            
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
 | spark.gluten.sql.columnar.backend.velox.reclaimMaxWaitMs                     
    | 3600000ms         | The max time in ms to wait for memory reclaim.        
                                                                                
                                                                                
                                                                                
                                                                                
              [...]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to