This is an automated email from the ASF dual-hosted git repository.
chengchengjin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 5f42737465 [GLUTEN-9785][VL] Pass additional configuration options programmatically from Java to C++ (#11032)
5f42737465 is described below
commit 5f4273746545c6dcfe7b732b44b3e02b85e43ea2
Author: Jin Chengcheng <[email protected]>
AuthorDate: Thu Nov 13 16:57:40 2025 +0000
[GLUTEN-9785][VL] Pass additional configuration options programmatically from Java to C++ (#11032)
---
.../backendsapi/velox/VeloxIteratorApi.scala | 16 +++++++--------
cpp/core/compute/Runtime.h | 3 +--
cpp/core/jni/JniWrapper.cc | 9 ++------
cpp/velox/benchmarks/GenericBenchmark.cc | 2 +-
cpp/velox/compute/VeloxRuntime.cc | 5 ++---
cpp/velox/compute/VeloxRuntime.h | 3 +--
cpp/velox/compute/WholeStageResultIterator.cc | 5 ++---
cpp/velox/compute/WholeStageResultIterator.h | 4 ++--
cpp/velox/tests/RuntimeTest.cc | 5 ++---
.../gluten/vectorized/NativePlanEvaluator.java | 13 ++++++++----
.../gluten/vectorized/PlanEvaluatorJniWrapper.java | 3 +--
.../scala/org/apache/gluten/runtime/Runtime.scala | 24 +++++++++++++++-------
.../scala/org/apache/gluten/runtime/Runtimes.scala | 18 ++++++++++++----
13 files changed, 62 insertions(+), 48 deletions(-)
diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
index 9bb670f10e..fdb859bfee 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
@@ -18,7 +18,7 @@ package org.apache.gluten.backendsapi.velox
import org.apache.gluten.backendsapi.{BackendsApiManager, IteratorApi}
import org.apache.gluten.backendsapi.velox.VeloxIteratorApi.unescapePathName
-import org.apache.gluten.config.VeloxConfig
+import org.apache.gluten.config.{GlutenConfig, VeloxConfig}
import org.apache.gluten.execution._
import org.apache.gluten.iterator.Iterators
import org.apache.gluten.metrics.{IMetrics, IteratorMetricsJniWrapper}
@@ -215,7 +215,9 @@ class VeloxIteratorApi extends IteratorApi with Logging {
new JArrayList[ColumnarBatchInIterator](inputIterators.map {
iter => new ColumnarBatchInIterator(BackendsApiManager.getBackendName,
iter.asJava)
}.asJava)
- val transKernel =
NativePlanEvaluator.create(BackendsApiManager.getBackendName)
+
+ val extraConf = Map(GlutenConfig.COLUMNAR_CUDF_ENABLED.key ->
enableCudf.toString).asJava
+ val transKernel =
NativePlanEvaluator.create(BackendsApiManager.getBackendName, extraConf)
val splitInfoByteArray = inputPartition
.asInstanceOf[GlutenPartition]
@@ -233,8 +235,7 @@ class VeloxIteratorApi extends IteratorApi with Logging {
splitInfoByteArray,
columnarNativeIterators,
partitionIndex,
-
BackendsApiManager.getSparkPlanExecApiInstance.rewriteSpillPath(spillDirPath),
- enableCudf
+
BackendsApiManager.getSparkPlanExecApiInstance.rewriteSpillPath(spillDirPath)
)
val itrMetrics = IteratorMetricsJniWrapper.create()
@@ -265,8 +266,8 @@ class VeloxIteratorApi extends IteratorApi with Logging {
partitionIndex: Int,
materializeInput: Boolean,
enableCudf: Boolean = false): Iterator[ColumnarBatch] = {
-
- val transKernel =
NativePlanEvaluator.create(BackendsApiManager.getBackendName)
+ val extraConf = Map(GlutenConfig.COLUMNAR_CUDF_ENABLED.key ->
enableCudf.toString).asJava
+ val transKernel =
NativePlanEvaluator.create(BackendsApiManager.getBackendName, extraConf)
val columnarNativeIterator =
inputIterators.map {
iter => new ColumnarBatchInIterator(BackendsApiManager.getBackendName,
iter.asJava)
@@ -283,8 +284,7 @@ class VeloxIteratorApi extends IteratorApi with Logging {
new Array[Array[Byte]](0),
columnarNativeIterator.asJava,
partitionIndex,
-
BackendsApiManager.getSparkPlanExecApiInstance.rewriteSpillPath(spillDirPath),
- enableCudf
+
BackendsApiManager.getSparkPlanExecApiInstance.rewriteSpillPath(spillDirPath)
)
val itrMetrics = IteratorMetricsJniWrapper.create()
diff --git a/cpp/core/compute/Runtime.h b/cpp/core/compute/Runtime.h
index 4eda64129a..c1f82a0c34 100644
--- a/cpp/core/compute/Runtime.h
+++ b/cpp/core/compute/Runtime.h
@@ -101,8 +101,7 @@ class Runtime : public std::enable_shared_from_this<Runtime> {
virtual std::shared_ptr<ResultIterator> createResultIterator(
const std::string& spillDir,
- const std::vector<std::shared_ptr<ResultIterator>>& inputs,
- const std::unordered_map<std::string, std::string>& sessionConf) {
+ const std::vector<std::shared_ptr<ResultIterator>>& inputs) {
throw GlutenException("Not implemented");
}
diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc
index 0420f5a6c3..5bc311f782 100644
--- a/cpp/core/jni/JniWrapper.cc
+++ b/cpp/core/jni/JniWrapper.cc
@@ -447,15 +447,10 @@ Java_org_apache_gluten_vectorized_PlanEvaluatorJniWrapper_nativeCreateKernelWith
jint partitionId,
jlong taskId,
jboolean enableDumping,
- jstring spillDir,
- jboolean enableCudf) {
+ jstring spillDir) {
JNI_METHOD_START
auto ctx = getRuntime(env, wrapper);
- auto conf = ctx->getConfMap();
-#ifdef GLUTEN_ENABLE_GPU
- conf[kCudfEnabled] = std::to_string(enableCudf);
-#endif
ctx->setSparkTaskInfo({stageId, partitionId, taskId});
@@ -489,7 +484,7 @@ Java_org_apache_gluten_vectorized_PlanEvaluatorJniWrapper_nativeCreateKernelWith
inputIters.push_back(std::move(resultIter));
}
- return ctx->saveObject(ctx->createResultIterator(spillDirStr, inputIters,
conf));
+ return ctx->saveObject(ctx->createResultIterator(spillDirStr, inputIters));
JNI_METHOD_END(kInvalidObjectHandle)
}
diff --git a/cpp/velox/benchmarks/GenericBenchmark.cc b/cpp/velox/benchmarks/GenericBenchmark.cc
index 1559c5c39f..b3f2747a52 100644
--- a/cpp/velox/benchmarks/GenericBenchmark.cc
+++ b/cpp/velox/benchmarks/GenericBenchmark.cc
@@ -449,7 +449,7 @@ auto BM_Generic = [](::benchmark::State& state,
runtime->parseSplitInfo(reinterpret_cast<uint8_t*>(split.data()),
split.size(), i);
}
- auto resultIter = runtime->createResultIterator(veloxSpillDir,
std::move(inputIters), runtime->getConfMap());
+ auto resultIter = runtime->createResultIterator(veloxSpillDir,
std::move(inputIters));
listenerPtr->setIterator(resultIter.get());
if (FLAGS_with_shuffle) {
diff --git a/cpp/velox/compute/VeloxRuntime.cc b/cpp/velox/compute/VeloxRuntime.cc
index bc858f85c6..c8290276a9 100644
--- a/cpp/velox/compute/VeloxRuntime.cc
+++ b/cpp/velox/compute/VeloxRuntime.cc
@@ -152,8 +152,7 @@ VeloxMemoryManager* VeloxRuntime::memoryManager() {
std::shared_ptr<ResultIterator> VeloxRuntime::createResultIterator(
const std::string& spillDir,
- const std::vector<std::shared_ptr<ResultIterator>>& inputs,
- const std::unordered_map<std::string, std::string>& sessionConf) {
+ const std::vector<std::shared_ptr<ResultIterator>>& inputs) {
LOG_IF(INFO, debugModeEnabled_) << "VeloxRuntime session config:" <<
printConfig(confMap_);
VeloxPlanConverter veloxPlanConverter(
@@ -182,7 +181,7 @@ std::shared_ptr<ResultIterator> VeloxRuntime::createResultIterator(
scanInfos,
streamIds,
spillDir,
- sessionConf,
+ veloxCfg_.get(),
taskInfo_.has_value() ? taskInfo_.value() : SparkTaskInfo{});
return std::make_shared<ResultIterator>(std::move(wholeStageIter), this);
}
diff --git a/cpp/velox/compute/VeloxRuntime.h b/cpp/velox/compute/VeloxRuntime.h
index 96e17e8980..59eb43028f 100644
--- a/cpp/velox/compute/VeloxRuntime.h
+++ b/cpp/velox/compute/VeloxRuntime.h
@@ -57,8 +57,7 @@ class VeloxRuntime final : public Runtime {
// FIXME This is not thread-safe?
std::shared_ptr<ResultIterator> createResultIterator(
const std::string& spillDir,
- const std::vector<std::shared_ptr<ResultIterator>>& inputs = {},
- const std::unordered_map<std::string, std::string>& sessionConf = {})
override;
+ const std::vector<std::shared_ptr<ResultIterator>>& inputs = {})
override;
std::shared_ptr<ColumnarToRowConverter> createColumnar2RowConverter(int64_t
column2RowMemThreshold) override;
diff --git a/cpp/velox/compute/WholeStageResultIterator.cc b/cpp/velox/compute/WholeStageResultIterator.cc
index cdb115578a..16a865ac31 100644
--- a/cpp/velox/compute/WholeStageResultIterator.cc
+++ b/cpp/velox/compute/WholeStageResultIterator.cc
@@ -71,11 +71,10 @@ WholeStageResultIterator::WholeStageResultIterator(
const std::vector<std::shared_ptr<SplitInfo>>& scanInfos,
const std::vector<facebook::velox::core::PlanNodeId>& streamIds,
const std::string spillDir,
- const std::unordered_map<std::string, std::string>& confMap,
+ const facebook::velox::config::ConfigBase* veloxCfg,
const SparkTaskInfo& taskInfo)
: memoryManager_(memoryManager),
- veloxCfg_(
-
std::make_shared<facebook::velox::config::ConfigBase>(std::unordered_map<std::string,
std::string>(confMap))),
+ veloxCfg_(veloxCfg),
#ifdef GLUTEN_ENABLE_GPU
enableCudf_(veloxCfg_->get<bool>(kCudfEnabled, kCudfEnabledDefault)),
#endif
diff --git a/cpp/velox/compute/WholeStageResultIterator.h b/cpp/velox/compute/WholeStageResultIterator.h
index 8bd8484a56..bca377cf19 100644
--- a/cpp/velox/compute/WholeStageResultIterator.h
+++ b/cpp/velox/compute/WholeStageResultIterator.h
@@ -42,7 +42,7 @@ class WholeStageResultIterator : public ColumnarBatchIterator {
const std::vector<std::shared_ptr<SplitInfo>>& scanInfos,
const std::vector<facebook::velox::core::PlanNodeId>& streamIds,
const std::string spillDir,
- const std::unordered_map<std::string, std::string>& confMap,
+ const facebook::velox::config::ConfigBase* veloxCfg,
const SparkTaskInfo& taskInfo);
virtual ~WholeStageResultIterator() {
@@ -113,7 +113,7 @@ class WholeStageResultIterator : public ColumnarBatchIterator {
VeloxMemoryManager* memoryManager_;
/// Config, task and plan.
- std::shared_ptr<config::ConfigBase> veloxCfg_;
+ const config::ConfigBase* veloxCfg_;
#ifdef GLUTEN_ENABLE_GPU
const bool enableCudf_;
#endif
diff --git a/cpp/velox/tests/RuntimeTest.cc b/cpp/velox/tests/RuntimeTest.cc
index 2241ddcdaa..8847fed685 100644
--- a/cpp/velox/tests/RuntimeTest.cc
+++ b/cpp/velox/tests/RuntimeTest.cc
@@ -60,8 +60,7 @@ class DummyRuntime final : public Runtime {
std::shared_ptr<ResultIterator> createResultIterator(
const std::string& spillDir,
- const std::vector<std::shared_ptr<ResultIterator>>& inputs,
- const std::unordered_map<std::string, std::string>& sessionConf)
override {
+ const std::vector<std::shared_ptr<ResultIterator>>& inputs) override {
auto resIter = std::make_unique<DummyResultIterator>();
auto iter = std::make_shared<ResultIterator>(std::move(resIter));
return iter;
@@ -150,7 +149,7 @@ TEST(TestRuntime, CreateVeloxRuntime) {
TEST(TestRuntime, GetResultIterator) {
DummyMemoryManager mm(kDummyBackendKind);
auto runtime = std::make_shared<DummyRuntime>(kDummyBackendKind, &mm,
std::unordered_map<std::string, std::string>());
- auto iter = runtime->createResultIterator("/tmp/test-spill", {}, {});
+ auto iter = runtime->createResultIterator("/tmp/test-spill", {});
ASSERT_TRUE(iter->hasNext());
auto next = iter->next();
ASSERT_NE(next, nullptr);
diff --git a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/NativePlanEvaluator.java b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/NativePlanEvaluator.java
index 58ca86b8c3..813578566b 100644
--- a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/NativePlanEvaluator.java
+++ b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/NativePlanEvaluator.java
@@ -29,6 +29,7 @@ import org.slf4j.LoggerFactory;
import java.nio.charset.StandardCharsets;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
public class NativePlanEvaluator {
@@ -43,6 +44,12 @@ public class NativePlanEvaluator {
this.jniWrapper = PlanEvaluatorJniWrapper.create(runtime);
}
+ public static NativePlanEvaluator create(String backendName, Map<String,
String> extraConf) {
+ return new NativePlanEvaluator(
+ Runtimes.contextInstance(
+ backendName, String.format("NativePlanEvaluator-%d",
id.getAndIncrement()), extraConf));
+ }
+
public static NativePlanEvaluator create(String backendName) {
return new NativePlanEvaluator(
Runtimes.contextInstance(
@@ -69,8 +76,7 @@ public class NativePlanEvaluator {
byte[][] splitInfo,
List<ColumnarBatchInIterator> iterList,
int partitionIndex,
- String spillDirPath,
- boolean enableCudf)
+ String spillDirPath)
throws RuntimeException {
final long itrHandle =
jniWrapper.nativeCreateKernelWithIterator(
@@ -81,8 +87,7 @@ public class NativePlanEvaluator {
partitionIndex, // TaskContext.getPartitionId(),
TaskContext.get().taskAttemptId(),
DebugUtil.isDumpingEnabledForTask(),
- spillDirPath,
- enableCudf);
+ spillDirPath);
final ColumnarBatchOutIterator out = createOutIterator(runtime, itrHandle);
runtime
.memoryManager()
diff --git a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/PlanEvaluatorJniWrapper.java b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/PlanEvaluatorJniWrapper.java
index 9ac1c0a62b..b8de4d63b5 100644
--- a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/PlanEvaluatorJniWrapper.java
+++ b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/PlanEvaluatorJniWrapper.java
@@ -74,7 +74,6 @@ public class PlanEvaluatorJniWrapper implements RuntimeAware {
int partitionId,
long taskId,
boolean enableDumping,
- String spillDir,
- boolean enableCudf)
+ String spillDir)
throws RuntimeException;
}
diff --git a/gluten-arrow/src/main/scala/org/apache/gluten/runtime/Runtime.scala b/gluten-arrow/src/main/scala/org/apache/gluten/runtime/Runtime.scala
index f4fb2fa42c..bd5ff1c185 100644
--- a/gluten-arrow/src/main/scala/org/apache/gluten/runtime/Runtime.scala
+++ b/gluten-arrow/src/main/scala/org/apache/gluten/runtime/Runtime.scala
@@ -24,10 +24,11 @@ import org.apache.gluten.utils.ConfigUtil
import org.apache.spark.sql.internal.{GlutenConfigUtil, SQLConf}
import org.apache.spark.task.TaskResource
-import org.slf4j.LoggerFactory
-
+import java.util
import java.util.concurrent.atomic.AtomicBoolean
+import scala.collection.JavaConverters._ // for 2.12
+
trait Runtime {
def memoryManager(): NativeMemoryManager
def getHandle(): Long
@@ -35,19 +36,28 @@ trait Runtime {
object Runtime {
private[runtime] def apply(backendName: String, name: String): Runtime with
TaskResource = {
- new RuntimeImpl(backendName, name)
+ new RuntimeImpl(backendName, name, new util.HashMap[String, String]())
+ }
+
+ private[runtime] def apply(
+ backendName: String,
+ name: String,
+ extraConf: util.Map[String, String]): Runtime with TaskResource = {
+ new RuntimeImpl(backendName, name, extraConf)
}
- private class RuntimeImpl(backendName: String, name: String) extends Runtime
with TaskResource {
- private val LOGGER = LoggerFactory.getLogger(classOf[Runtime])
+ private class RuntimeImpl(backendName: String, name: String, extraConf:
util.Map[String, String])
+ extends Runtime
+ with TaskResource {
private val nmm: NativeMemoryManager = NativeMemoryManager(backendName,
name)
private val handle = RuntimeJniWrapper.createRuntime(
backendName,
nmm.getHandle(),
ConfigUtil.serialize(
- GlutenConfig
- .getNativeSessionConf(backendName,
GlutenConfigUtil.parseConfig(SQLConf.get.getAllConfs)))
+ (GlutenConfig
+ .getNativeSessionConf(backendName,
GlutenConfigUtil.parseConfig(SQLConf.get.getAllConfs))
+ .asScala ++ extraConf.asScala).asJava)
)
private val released: AtomicBoolean = new AtomicBoolean(false)
diff --git a/gluten-arrow/src/main/scala/org/apache/gluten/runtime/Runtimes.scala b/gluten-arrow/src/main/scala/org/apache/gluten/runtime/Runtimes.scala
index f3e75757ab..b4c344ca98 100644
--- a/gluten-arrow/src/main/scala/org/apache/gluten/runtime/Runtimes.scala
+++ b/gluten-arrow/src/main/scala/org/apache/gluten/runtime/Runtimes.scala
@@ -18,15 +18,25 @@ package org.apache.gluten.runtime
import org.apache.spark.task.TaskResources
+import java.util
+
object Runtimes {
- /** Get or create the runtime which bound with Spark TaskContext. */
- def contextInstance(backendName: String, name: String): Runtime = {
+ def contextInstance(
+ backendName: String,
+ name: String,
+ extraConf: util.Map[String, String]): Runtime = {
if (!TaskResources.inSparkTask()) {
throw new IllegalStateException("This method must be called in a Spark
task.")
}
TaskResources.addResourceIfNotRegistered(
- s"$backendName:$name",
- () => Runtime(backendName, name))
+ s"$backendName:$name:$extraConf",
+ () => Runtime(backendName, name, extraConf))
+ }
+
+ /** Get or create the runtime which bound with Spark TaskContext. */
+ def contextInstance(backendName: String, name: String): Runtime = {
+ contextInstance(backendName, name, new util.HashMap[String, String]())
}
+
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]