This is an automated email from the ASF dual-hosted git repository.
chengchengjin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new fbe5350c34 [GLUTEN-8851][VL] cuDF: Validate the plan before execution
(#10889)
fbe5350c34 is described below
commit fbe5350c34d3c566a363c3684f68d31b26314cdc
Author: Jin Chengcheng <[email protected]>
AuthorDate: Mon Oct 20 14:39:42 2025 +0100
[GLUTEN-8851][VL] cuDF: Validate the plan before execution (#10889)
Initializing the task in the Task constructor has been merged, so we can now
get the operator information from the task. If validation succeeds, set the
whole-stage transformer's cuDF tag to true so that the session config
kCudfEnabled is set to true, and set the child operators' cuDF tag to true so
that their names in the Spark UI are updated with "Cudf".
---
.../scala/org/apache/gluten/config/VeloxConfig.scala | 10 ++++++++++
.../gluten/extension/CudfNodeValidationRule.scala | 20 ++++++++++++++++++--
cpp/velox/cudf/CudfPlanValidator.cc | 17 +++++++++++++----
docs/velox-configuration.md | 7 ++++---
4 files changed, 45 insertions(+), 9 deletions(-)
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala
b/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala
index 093186f569..f68a23c350 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala
@@ -78,6 +78,8 @@ class VeloxConfig(conf: SQLConf) extends GlutenConfig(conf) {
def cudfEnableTableScan: Boolean = getConf(CUDF_ENABLE_TABLE_SCAN)
+ def cudfEnableValidation: Boolean = getConf(CUDF_ENABLE_VALIDATION)
+
def orcUseColumnNames: Boolean = getConf(ORC_USE_COLUMN_NAMES)
def parquetUseColumnNames: Boolean = getConf(PARQUET_USE_COLUMN_NAMES)
@@ -624,6 +626,14 @@ object VeloxConfig extends ConfigRegistry {
.booleanConf
.createWithDefault(false)
+ val CUDF_ENABLE_VALIDATION =
+
buildStaticConf("spark.gluten.sql.columnar.backend.velox.cudf.enableValidation")
+ .doc(
+ "Heuristics you can apply to validate a cuDF/GPU plan and only offload
when " +
+ "the entire stage can be fully and profitably executed on GPU")
+ .booleanConf
+ .createWithDefault(true)
+
val MEMORY_DUMP_ON_EXIT =
buildConf("spark.gluten.monitor.memoryDumpOnExit")
.internal()
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/extension/CudfNodeValidationRule.scala
b/backends-velox/src/main/scala/org/apache/gluten/extension/CudfNodeValidationRule.scala
index 20e819e215..a092b984c8 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/extension/CudfNodeValidationRule.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/extension/CudfNodeValidationRule.scala
@@ -17,7 +17,8 @@
package org.apache.gluten.extension
import org.apache.gluten.config.{GlutenConfig, VeloxConfig}
-import org.apache.gluten.execution.{CudfTag, LeafTransformSupport,
WholeStageTransformer}
+import org.apache.gluten.cudf.VeloxCudfPlanValidatorJniWrapper
+import org.apache.gluten.execution.{CudfTag, LeafTransformSupport,
TransformSupport, WholeStageTransformer}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.SparkPlan
@@ -37,7 +38,22 @@ case class CudfNodeValidationRule(glutenConf: GlutenConfig)
extends Rule[SparkPl
case _: LeafTransformSupport => true
case _ => false
}.isDefined
- transformer.setTagValue(CudfTag.CudfTag, !hasLeaf)
+ if (!hasLeaf && VeloxConfig.get.cudfEnableValidation) {
+ if (
+ VeloxCudfPlanValidatorJniWrapper.validate(
+ transformer.substraitPlan.toProtobuf.toByteArray)
+ ) {
+ transformer.foreach {
+ case _: LeafTransformSupport =>
+ case t: TransformSupport =>
+ t.setTagValue(CudfTag.CudfTag, true)
+ case _ =>
+ }
+ transformer.setTagValue(CudfTag.CudfTag, true)
+ }
+ } else {
+ transformer.setTagValue(CudfTag.CudfTag, !hasLeaf)
+ }
} else {
transformer.setTagValue(CudfTag.CudfTag, true)
}
diff --git a/cpp/velox/cudf/CudfPlanValidator.cc
b/cpp/velox/cudf/CudfPlanValidator.cc
index cad8aa5afe..49949ca100 100644
--- a/cpp/velox/cudf/CudfPlanValidator.cc
+++ b/cpp/velox/cudf/CudfPlanValidator.cc
@@ -24,11 +24,21 @@
#include "velox/core/PlanNode.h"
#include "velox/exec/Task.h"
#include "velox/exec/TableScan.h"
+#include "velox/experimental/cudf/exec/NvtxHelper.h"
#include "velox/experimental/cudf/exec/ToCudf.h"
using namespace facebook;
namespace gluten {
+
+namespace {
+
+bool isCudfOperator(const exec::Operator* op) {
+ return dynamic_cast<const velox::cudf_velox::NvtxHelper*>(op) != nullptr;
+}
+
+}
+
bool CudfPlanValidator::validate(const ::substrait::Plan& substraitPlan) {
auto veloxMemoryPool = gluten::defaultLeafVeloxMemoryPool();
std::vector<::substrait::ReadRel_LocalFiles> localFiles;
@@ -64,10 +74,9 @@ bool CudfPlanValidator::validate(const ::substrait::Plan&
substraitPlan) {
if (dynamic_cast<const velox::exec::TableScan*>(op) != nullptr) {
continue;
}
- // TODO: wait for PR https://github.com/facebookincubator/velox/pull/13341
- // if (cudf_velox::isCudfOperator(op)) {
- // continue;
- // }
+ if (isCudfOperator(op)) {
+ continue;
+ }
if (dynamic_cast<const ValueStream*>(op) != nullptr) {
continue;
}
diff --git a/docs/velox-configuration.md b/docs/velox-configuration.md
index 44996a5cd4..fd7cd5cdca 100644
--- a/docs/velox-configuration.md
+++ b/docs/velox-configuration.md
@@ -9,7 +9,7 @@ nav_order: 16
## Gluten Velox backend configurations
-| Key
| Default | Description
[...]
+| Key
| Default |
Description
[...]
|----------------------------------------------------------------------------------|-------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
[...]
| spark.gluten.sql.columnar.backend.velox.IOThreads
| <undefined> | The Size of the IO thread pool in the Connector. This
thread pool is used for split preloading and DirectBufferedInput. By default,
the value is the same as the maximum task slots per Spark executor.
[...]
| spark.gluten.sql.columnar.backend.velox.SplitPreloadPerDriver
| 2 | The split preload per task
[...]
@@ -23,6 +23,7 @@ nav_order: 16
| spark.gluten.sql.columnar.backend.velox.cachePrefetchMinPct
| 0 | Set prefetch cache min pct for velox file scan
[...]
| spark.gluten.sql.columnar.backend.velox.checkUsageLeak
| true | Enable check memory usage leak.
[...]
| spark.gluten.sql.columnar.backend.velox.cudf.enableTableScan
| false | Enable cudf table scan
[...]
+| spark.gluten.sql.columnar.backend.velox.cudf.enableValidation
| true | Heuristics you can apply to validate a cuDF/GPU plan
and only offload when the entire stage can be fully and profitably executed on
GPU
[...]
| spark.gluten.sql.columnar.backend.velox.cudf.memoryPercent
| 50 | The initial percent of GPU memory to allocate for
memory resource for one thread.
[...]
| spark.gluten.sql.columnar.backend.velox.cudf.memoryResource
| async | GPU RMM memory resource.
[...]
| spark.gluten.sql.columnar.backend.velox.directorySizeGuess
| 32KB | Deprecated, rename to
spark.gluten.sql.columnar.backend.velox.footerEstimatedSize
[...]
@@ -48,8 +49,8 @@ nav_order: 16
|
spark.gluten.sql.columnar.backend.velox.memoryPoolCapacityTransferAcrossTasks
| true | Whether to allow memory capacity transfer between memory
pools from different tasks.
[...]
| spark.gluten.sql.columnar.backend.velox.memoryUseHugePages
| false | Use explicit huge pages for Velox memory allocation.
[...]
| spark.gluten.sql.columnar.backend.velox.orc.scan.enabled
| true | Enable velox orc scan. If disabled, vanilla spark orc
scan will be used.
[...]
-| spark.gluten.sql.columnar.backend.velox.orcUseColumnNames
| true | Maps table field names to file field names using
names, not indices for ORC files. If this is set to false Gluten will fallback
to vanilla Spark if it does not support all column types present in any of the
schemas of the tables being read, at this time unsupported types include
TimestampNTZ and Char.
[...]
-| spark.gluten.sql.columnar.backend.velox.parquetUseColumnNames
| true | Maps table field names to file field names using
names, not indices for Parquet files. If this is set to false Gluten will
fallback to vanilla Spark if it does not support all column types present in
any of the schemas of the tables being read, at this time unsupported types
include TimestampNTZ and Char.
[...]
+| spark.gluten.sql.columnar.backend.velox.orcUseColumnNames
| true | Maps table field names to file field names using
names, not indices for ORC files.
[...]
+| spark.gluten.sql.columnar.backend.velox.parquetUseColumnNames
| true | Maps table field names to file field names using
names, not indices for Parquet files.
[...]
| spark.gluten.sql.columnar.backend.velox.prefetchRowGroups
| 1 | Set the prefetch row groups for velox file scan
[...]
| spark.gluten.sql.columnar.backend.velox.queryTraceEnabled
| false | Enable query tracing flag.
[...]
| spark.gluten.sql.columnar.backend.velox.reclaimMaxWaitMs
| 3600000ms | The max time in ms to wait for memory reclaim.
[...]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]