This is an automated email from the ASF dual-hosted git repository.

chengchengjin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 5f42737465 [GLUTEN-9785][VL] Pass additional configuration options programmatically from Java to C++ (#11032)
5f42737465 is described below

commit 5f4273746545c6dcfe7b732b44b3e02b85e43ea2
Author: Jin Chengcheng <[email protected]>
AuthorDate: Thu Nov 13 16:57:40 2025 +0000

    [GLUTEN-9785][VL] Pass additional configuration options programmatically from Java to C++ (#11032)
---
 .../backendsapi/velox/VeloxIteratorApi.scala       | 16 +++++++--------
 cpp/core/compute/Runtime.h                         |  3 +--
 cpp/core/jni/JniWrapper.cc                         |  9 ++------
 cpp/velox/benchmarks/GenericBenchmark.cc           |  2 +-
 cpp/velox/compute/VeloxRuntime.cc                  |  5 ++---
 cpp/velox/compute/VeloxRuntime.h                   |  3 +--
 cpp/velox/compute/WholeStageResultIterator.cc      |  5 ++---
 cpp/velox/compute/WholeStageResultIterator.h       |  4 ++--
 cpp/velox/tests/RuntimeTest.cc                     |  5 ++---
 .../gluten/vectorized/NativePlanEvaluator.java     | 13 ++++++++----
 .../gluten/vectorized/PlanEvaluatorJniWrapper.java |  3 +--
 .../scala/org/apache/gluten/runtime/Runtime.scala  | 24 +++++++++++++++-------
 .../scala/org/apache/gluten/runtime/Runtimes.scala | 18 ++++++++++++----
 13 files changed, 62 insertions(+), 48 deletions(-)

diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
index 9bb670f10e..fdb859bfee 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
@@ -18,7 +18,7 @@ package org.apache.gluten.backendsapi.velox
 
 import org.apache.gluten.backendsapi.{BackendsApiManager, IteratorApi}
 import org.apache.gluten.backendsapi.velox.VeloxIteratorApi.unescapePathName
-import org.apache.gluten.config.VeloxConfig
+import org.apache.gluten.config.{GlutenConfig, VeloxConfig}
 import org.apache.gluten.execution._
 import org.apache.gluten.iterator.Iterators
 import org.apache.gluten.metrics.{IMetrics, IteratorMetricsJniWrapper}
@@ -215,7 +215,9 @@ class VeloxIteratorApi extends IteratorApi with Logging {
       new JArrayList[ColumnarBatchInIterator](inputIterators.map {
         iter => new ColumnarBatchInIterator(BackendsApiManager.getBackendName, 
iter.asJava)
       }.asJava)
-    val transKernel = 
NativePlanEvaluator.create(BackendsApiManager.getBackendName)
+
+    val extraConf = Map(GlutenConfig.COLUMNAR_CUDF_ENABLED.key -> 
enableCudf.toString).asJava
+    val transKernel = 
NativePlanEvaluator.create(BackendsApiManager.getBackendName, extraConf)
 
     val splitInfoByteArray = inputPartition
       .asInstanceOf[GlutenPartition]
@@ -233,8 +235,7 @@ class VeloxIteratorApi extends IteratorApi with Logging {
         splitInfoByteArray,
         columnarNativeIterators,
         partitionIndex,
-        
BackendsApiManager.getSparkPlanExecApiInstance.rewriteSpillPath(spillDirPath),
-        enableCudf
+        
BackendsApiManager.getSparkPlanExecApiInstance.rewriteSpillPath(spillDirPath)
       )
     val itrMetrics = IteratorMetricsJniWrapper.create()
 
@@ -265,8 +266,8 @@ class VeloxIteratorApi extends IteratorApi with Logging {
       partitionIndex: Int,
       materializeInput: Boolean,
       enableCudf: Boolean = false): Iterator[ColumnarBatch] = {
-
-    val transKernel = 
NativePlanEvaluator.create(BackendsApiManager.getBackendName)
+    val extraConf = Map(GlutenConfig.COLUMNAR_CUDF_ENABLED.key -> 
enableCudf.toString).asJava
+    val transKernel = 
NativePlanEvaluator.create(BackendsApiManager.getBackendName, extraConf)
     val columnarNativeIterator =
       inputIterators.map {
         iter => new ColumnarBatchInIterator(BackendsApiManager.getBackendName, 
iter.asJava)
@@ -283,8 +284,7 @@ class VeloxIteratorApi extends IteratorApi with Logging {
         new Array[Array[Byte]](0),
         columnarNativeIterator.asJava,
         partitionIndex,
-        
BackendsApiManager.getSparkPlanExecApiInstance.rewriteSpillPath(spillDirPath),
-        enableCudf
+        
BackendsApiManager.getSparkPlanExecApiInstance.rewriteSpillPath(spillDirPath)
       )
     val itrMetrics = IteratorMetricsJniWrapper.create()
 
diff --git a/cpp/core/compute/Runtime.h b/cpp/core/compute/Runtime.h
index 4eda64129a..c1f82a0c34 100644
--- a/cpp/core/compute/Runtime.h
+++ b/cpp/core/compute/Runtime.h
@@ -101,8 +101,7 @@ class Runtime : public 
std::enable_shared_from_this<Runtime> {
 
   virtual std::shared_ptr<ResultIterator> createResultIterator(
       const std::string& spillDir,
-      const std::vector<std::shared_ptr<ResultIterator>>& inputs,
-      const std::unordered_map<std::string, std::string>& sessionConf) {
+      const std::vector<std::shared_ptr<ResultIterator>>& inputs) {
     throw GlutenException("Not implemented");
   }
 
diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc
index 0420f5a6c3..5bc311f782 100644
--- a/cpp/core/jni/JniWrapper.cc
+++ b/cpp/core/jni/JniWrapper.cc
@@ -447,15 +447,10 @@ 
Java_org_apache_gluten_vectorized_PlanEvaluatorJniWrapper_nativeCreateKernelWith
     jint partitionId,
     jlong taskId,
     jboolean enableDumping,
-    jstring spillDir,
-    jboolean enableCudf) {
+    jstring spillDir) {
   JNI_METHOD_START
 
   auto ctx = getRuntime(env, wrapper);
-  auto conf = ctx->getConfMap();
-#ifdef GLUTEN_ENABLE_GPU
-  conf[kCudfEnabled] = std::to_string(enableCudf);
-#endif
 
   ctx->setSparkTaskInfo({stageId, partitionId, taskId});
 
@@ -489,7 +484,7 @@ 
Java_org_apache_gluten_vectorized_PlanEvaluatorJniWrapper_nativeCreateKernelWith
     inputIters.push_back(std::move(resultIter));
   }
 
-  return ctx->saveObject(ctx->createResultIterator(spillDirStr, inputIters, 
conf));
+  return ctx->saveObject(ctx->createResultIterator(spillDirStr, inputIters));
   JNI_METHOD_END(kInvalidObjectHandle)
 }
 
diff --git a/cpp/velox/benchmarks/GenericBenchmark.cc b/cpp/velox/benchmarks/GenericBenchmark.cc
index 1559c5c39f..b3f2747a52 100644
--- a/cpp/velox/benchmarks/GenericBenchmark.cc
+++ b/cpp/velox/benchmarks/GenericBenchmark.cc
@@ -449,7 +449,7 @@ auto BM_Generic = [](::benchmark::State& state,
         runtime->parseSplitInfo(reinterpret_cast<uint8_t*>(split.data()), 
split.size(), i);
       }
 
-      auto resultIter = runtime->createResultIterator(veloxSpillDir, 
std::move(inputIters), runtime->getConfMap());
+      auto resultIter = runtime->createResultIterator(veloxSpillDir, 
std::move(inputIters));
       listenerPtr->setIterator(resultIter.get());
 
       if (FLAGS_with_shuffle) {
diff --git a/cpp/velox/compute/VeloxRuntime.cc b/cpp/velox/compute/VeloxRuntime.cc
index bc858f85c6..c8290276a9 100644
--- a/cpp/velox/compute/VeloxRuntime.cc
+++ b/cpp/velox/compute/VeloxRuntime.cc
@@ -152,8 +152,7 @@ VeloxMemoryManager* VeloxRuntime::memoryManager() {
 
 std::shared_ptr<ResultIterator> VeloxRuntime::createResultIterator(
     const std::string& spillDir,
-    const std::vector<std::shared_ptr<ResultIterator>>& inputs,
-    const std::unordered_map<std::string, std::string>& sessionConf) {
+    const std::vector<std::shared_ptr<ResultIterator>>& inputs) {
   LOG_IF(INFO, debugModeEnabled_) << "VeloxRuntime session config:" << 
printConfig(confMap_);
 
   VeloxPlanConverter veloxPlanConverter(
@@ -182,7 +181,7 @@ std::shared_ptr<ResultIterator> 
VeloxRuntime::createResultIterator(
       scanInfos,
       streamIds,
       spillDir,
-      sessionConf,
+      veloxCfg_.get(),
       taskInfo_.has_value() ? taskInfo_.value() : SparkTaskInfo{});
   return std::make_shared<ResultIterator>(std::move(wholeStageIter), this);
 }
diff --git a/cpp/velox/compute/VeloxRuntime.h b/cpp/velox/compute/VeloxRuntime.h
index 96e17e8980..59eb43028f 100644
--- a/cpp/velox/compute/VeloxRuntime.h
+++ b/cpp/velox/compute/VeloxRuntime.h
@@ -57,8 +57,7 @@ class VeloxRuntime final : public Runtime {
   // FIXME This is not thread-safe?
   std::shared_ptr<ResultIterator> createResultIterator(
       const std::string& spillDir,
-      const std::vector<std::shared_ptr<ResultIterator>>& inputs = {},
-      const std::unordered_map<std::string, std::string>& sessionConf = {}) 
override;
+      const std::vector<std::shared_ptr<ResultIterator>>& inputs = {}) 
override;
 
   std::shared_ptr<ColumnarToRowConverter> createColumnar2RowConverter(int64_t 
column2RowMemThreshold) override;
 
diff --git a/cpp/velox/compute/WholeStageResultIterator.cc b/cpp/velox/compute/WholeStageResultIterator.cc
index cdb115578a..16a865ac31 100644
--- a/cpp/velox/compute/WholeStageResultIterator.cc
+++ b/cpp/velox/compute/WholeStageResultIterator.cc
@@ -71,11 +71,10 @@ WholeStageResultIterator::WholeStageResultIterator(
     const std::vector<std::shared_ptr<SplitInfo>>& scanInfos,
     const std::vector<facebook::velox::core::PlanNodeId>& streamIds,
     const std::string spillDir,
-    const std::unordered_map<std::string, std::string>& confMap,
+    const facebook::velox::config::ConfigBase* veloxCfg,
     const SparkTaskInfo& taskInfo)
     : memoryManager_(memoryManager),
-      veloxCfg_(
-          
std::make_shared<facebook::velox::config::ConfigBase>(std::unordered_map<std::string,
 std::string>(confMap))),
+      veloxCfg_(veloxCfg),
 #ifdef GLUTEN_ENABLE_GPU
       enableCudf_(veloxCfg_->get<bool>(kCudfEnabled, kCudfEnabledDefault)),
 #endif
diff --git a/cpp/velox/compute/WholeStageResultIterator.h b/cpp/velox/compute/WholeStageResultIterator.h
index 8bd8484a56..bca377cf19 100644
--- a/cpp/velox/compute/WholeStageResultIterator.h
+++ b/cpp/velox/compute/WholeStageResultIterator.h
@@ -42,7 +42,7 @@ class WholeStageResultIterator : public ColumnarBatchIterator 
{
       const std::vector<std::shared_ptr<SplitInfo>>& scanInfos,
       const std::vector<facebook::velox::core::PlanNodeId>& streamIds,
       const std::string spillDir,
-      const std::unordered_map<std::string, std::string>& confMap,
+      const facebook::velox::config::ConfigBase* veloxCfg,
       const SparkTaskInfo& taskInfo);
 
   virtual ~WholeStageResultIterator() {
@@ -113,7 +113,7 @@ class WholeStageResultIterator : public 
ColumnarBatchIterator {
   VeloxMemoryManager* memoryManager_;
 
   /// Config, task and plan.
-  std::shared_ptr<config::ConfigBase> veloxCfg_;
+  const config::ConfigBase* veloxCfg_;
 #ifdef GLUTEN_ENABLE_GPU
   const bool enableCudf_;
 #endif
diff --git a/cpp/velox/tests/RuntimeTest.cc b/cpp/velox/tests/RuntimeTest.cc
index 2241ddcdaa..8847fed685 100644
--- a/cpp/velox/tests/RuntimeTest.cc
+++ b/cpp/velox/tests/RuntimeTest.cc
@@ -60,8 +60,7 @@ class DummyRuntime final : public Runtime {
 
   std::shared_ptr<ResultIterator> createResultIterator(
       const std::string& spillDir,
-      const std::vector<std::shared_ptr<ResultIterator>>& inputs,
-      const std::unordered_map<std::string, std::string>& sessionConf) 
override {
+      const std::vector<std::shared_ptr<ResultIterator>>& inputs) override {
     auto resIter = std::make_unique<DummyResultIterator>();
     auto iter = std::make_shared<ResultIterator>(std::move(resIter));
     return iter;
@@ -150,7 +149,7 @@ TEST(TestRuntime, CreateVeloxRuntime) {
 TEST(TestRuntime, GetResultIterator) {
   DummyMemoryManager mm(kDummyBackendKind);
   auto runtime = std::make_shared<DummyRuntime>(kDummyBackendKind, &mm, 
std::unordered_map<std::string, std::string>());
-  auto iter = runtime->createResultIterator("/tmp/test-spill", {}, {});
+  auto iter = runtime->createResultIterator("/tmp/test-spill", {});
   ASSERT_TRUE(iter->hasNext());
   auto next = iter->next();
   ASSERT_NE(next, nullptr);
diff --git a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/NativePlanEvaluator.java b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/NativePlanEvaluator.java
index 58ca86b8c3..813578566b 100644
--- a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/NativePlanEvaluator.java
+++ b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/NativePlanEvaluator.java
@@ -29,6 +29,7 @@ import org.slf4j.LoggerFactory;
 
 import java.nio.charset.StandardCharsets;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 
 public class NativePlanEvaluator {
@@ -43,6 +44,12 @@ public class NativePlanEvaluator {
     this.jniWrapper = PlanEvaluatorJniWrapper.create(runtime);
   }
 
+  public static NativePlanEvaluator create(String backendName, Map<String, 
String> extraConf) {
+    return new NativePlanEvaluator(
+        Runtimes.contextInstance(
+            backendName, String.format("NativePlanEvaluator-%d", 
id.getAndIncrement()), extraConf));
+  }
+
   public static NativePlanEvaluator create(String backendName) {
     return new NativePlanEvaluator(
         Runtimes.contextInstance(
@@ -69,8 +76,7 @@ public class NativePlanEvaluator {
       byte[][] splitInfo,
       List<ColumnarBatchInIterator> iterList,
       int partitionIndex,
-      String spillDirPath,
-      boolean enableCudf)
+      String spillDirPath)
       throws RuntimeException {
     final long itrHandle =
         jniWrapper.nativeCreateKernelWithIterator(
@@ -81,8 +87,7 @@ public class NativePlanEvaluator {
             partitionIndex, // TaskContext.getPartitionId(),
             TaskContext.get().taskAttemptId(),
             DebugUtil.isDumpingEnabledForTask(),
-            spillDirPath,
-            enableCudf);
+            spillDirPath);
     final ColumnarBatchOutIterator out = createOutIterator(runtime, itrHandle);
     runtime
         .memoryManager()
diff --git a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/PlanEvaluatorJniWrapper.java b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/PlanEvaluatorJniWrapper.java
index 9ac1c0a62b..b8de4d63b5 100644
--- a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/PlanEvaluatorJniWrapper.java
+++ b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/PlanEvaluatorJniWrapper.java
@@ -74,7 +74,6 @@ public class PlanEvaluatorJniWrapper implements RuntimeAware {
       int partitionId,
       long taskId,
       boolean enableDumping,
-      String spillDir,
-      boolean enableCudf)
+      String spillDir)
       throws RuntimeException;
 }
diff --git a/gluten-arrow/src/main/scala/org/apache/gluten/runtime/Runtime.scala b/gluten-arrow/src/main/scala/org/apache/gluten/runtime/Runtime.scala
index f4fb2fa42c..bd5ff1c185 100644
--- a/gluten-arrow/src/main/scala/org/apache/gluten/runtime/Runtime.scala
+++ b/gluten-arrow/src/main/scala/org/apache/gluten/runtime/Runtime.scala
@@ -24,10 +24,11 @@ import org.apache.gluten.utils.ConfigUtil
 import org.apache.spark.sql.internal.{GlutenConfigUtil, SQLConf}
 import org.apache.spark.task.TaskResource
 
-import org.slf4j.LoggerFactory
-
+import java.util
 import java.util.concurrent.atomic.AtomicBoolean
 
+import scala.collection.JavaConverters._ // for 2.12
+
 trait Runtime {
   def memoryManager(): NativeMemoryManager
   def getHandle(): Long
@@ -35,19 +36,28 @@ trait Runtime {
 
 object Runtime {
   private[runtime] def apply(backendName: String, name: String): Runtime with 
TaskResource = {
-    new RuntimeImpl(backendName, name)
+    new RuntimeImpl(backendName, name, new util.HashMap[String, String]())
+  }
+
+  private[runtime] def apply(
+      backendName: String,
+      name: String,
+      extraConf: util.Map[String, String]): Runtime with TaskResource = {
+    new RuntimeImpl(backendName, name, extraConf)
   }
 
-  private class RuntimeImpl(backendName: String, name: String) extends Runtime 
with TaskResource {
-    private val LOGGER = LoggerFactory.getLogger(classOf[Runtime])
+  private class RuntimeImpl(backendName: String, name: String, extraConf: 
util.Map[String, String])
+    extends Runtime
+    with TaskResource {
 
     private val nmm: NativeMemoryManager = NativeMemoryManager(backendName, 
name)
     private val handle = RuntimeJniWrapper.createRuntime(
       backendName,
       nmm.getHandle(),
       ConfigUtil.serialize(
-        GlutenConfig
-          .getNativeSessionConf(backendName, 
GlutenConfigUtil.parseConfig(SQLConf.get.getAllConfs)))
+        (GlutenConfig
+          .getNativeSessionConf(backendName, 
GlutenConfigUtil.parseConfig(SQLConf.get.getAllConfs))
+          .asScala ++ extraConf.asScala).asJava)
     )
 
     private val released: AtomicBoolean = new AtomicBoolean(false)
diff --git a/gluten-arrow/src/main/scala/org/apache/gluten/runtime/Runtimes.scala b/gluten-arrow/src/main/scala/org/apache/gluten/runtime/Runtimes.scala
index f3e75757ab..b4c344ca98 100644
--- a/gluten-arrow/src/main/scala/org/apache/gluten/runtime/Runtimes.scala
+++ b/gluten-arrow/src/main/scala/org/apache/gluten/runtime/Runtimes.scala
@@ -18,15 +18,25 @@ package org.apache.gluten.runtime
 
 import org.apache.spark.task.TaskResources
 
+import java.util
+
 object Runtimes {
 
-  /** Get or create the runtime which bound with Spark TaskContext. */
-  def contextInstance(backendName: String, name: String): Runtime = {
+  def contextInstance(
+      backendName: String,
+      name: String,
+      extraConf: util.Map[String, String]): Runtime = {
     if (!TaskResources.inSparkTask()) {
       throw new IllegalStateException("This method must be called in a Spark 
task.")
     }
     TaskResources.addResourceIfNotRegistered(
-      s"$backendName:$name",
-      () => Runtime(backendName, name))
+      s"$backendName:$name:$extraConf",
+      () => Runtime(backendName, name, extraConf))
+  }
+
+  /** Get or create the runtime which bound with Spark TaskContext. */
+  def contextInstance(backendName: String, name: String): Runtime = {
+    contextInstance(backendName, name, new util.HashMap[String, String]())
   }
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to