This is an automated email from the ASF dual-hosted git repository.
hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 20f2d2020b [GLUTEN-7243][VL] Fix hanging by cross-task spilling (#7479)
20f2d2020b is described below
commit 20f2d2020b9f22094ffdd71084012613fc0203c7
Author: Hongze Zhang <[email protected]>
AuthorDate: Mon Oct 14 09:42:55 2024 +0800
[GLUTEN-7243][VL] Fix hanging by cross-task spilling (#7479)
Closes #7243
---
.../org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala | 3 +--
cpp/core/compute/Runtime.cc | 7 +++++++
cpp/core/compute/Runtime.h | 4 +---
cpp/core/jni/JniWrapper.cc | 7 ++-----
cpp/velox/benchmarks/GenericBenchmark.cc | 2 +-
cpp/velox/compute/VeloxRuntime.cc | 7 ++-----
cpp/velox/compute/VeloxRuntime.h | 2 --
cpp/velox/tests/RuntimeTest.cc | 3 ---
.../java/org/apache/gluten/vectorized/NativePlanEvaluator.java | 9 ++++++---
.../org/apache/gluten/vectorized/PlanEvaluatorJniWrapper.java | 4 ++--
10 files changed, 22 insertions(+), 26 deletions(-)
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
index 9871efd06f..061daaac0f 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
@@ -165,8 +165,7 @@ class VeloxIteratorApi extends IteratorApi with Logging {
}
override def injectWriteFilesTempPath(path: String, fileName: String): Unit
= {
- val transKernel = NativePlanEvaluator.create()
- transKernel.injectWriteFilesTempPath(path)
+ NativePlanEvaluator.injectWriteFilesTempPath(path)
}
/** Generate Iterator[ColumnarBatch] for first stage. */
diff --git a/cpp/core/compute/Runtime.cc b/cpp/core/compute/Runtime.cc
index c6bae1e7bc..49565fac69 100644
--- a/cpp/core/compute/Runtime.cc
+++ b/cpp/core/compute/Runtime.cc
@@ -68,4 +68,11 @@ void Runtime::release(Runtime* runtime) {
delete runtime;
}
+std::optional<std::string>* Runtime::localWriteFilesTempPath() {
+ // This is thread-local to conform to Java side ColumnarWriteFilesExec's design.
+ // FIXME: Pass the path through relevant member functions.
+ static thread_local std::optional<std::string> path;
+ return &path;
+}
+
} // namespace gluten
diff --git a/cpp/core/compute/Runtime.h b/cpp/core/compute/Runtime.h
index 4aa67dfe7b..2901a22b0b 100644
--- a/cpp/core/compute/Runtime.h
+++ b/cpp/core/compute/Runtime.h
@@ -62,6 +62,7 @@ class Runtime : public std::enable_shared_from_this<Runtime> {
std::unique_ptr<AllocationListener> listener,
const std::unordered_map<std::string, std::string>& sessionConf = {});
static void release(Runtime*);
+ static std::optional<std::string>* localWriteFilesTempPath();
Runtime(std::shared_ptr<MemoryManager> memoryManager, const
std::unordered_map<std::string, std::string>& confMap)
: memoryManager_(memoryManager), confMap_(confMap) {}
@@ -74,8 +75,6 @@ class Runtime : public std::enable_shared_from_this<Runtime> {
virtual std::string planString(bool details, const
std::unordered_map<std::string, std::string>& sessionConf) = 0;
- virtual void injectWriteFilesTempPath(const std::string& path) = 0;
-
// Just for benchmark
::substrait::Plan& getPlan() {
return substraitPlan_;
@@ -134,7 +133,6 @@ class Runtime : public
std::enable_shared_from_this<Runtime> {
::substrait::Plan substraitPlan_;
std::vector<::substrait::ReadRel_LocalFiles> localFiles_;
- std::optional<std::string> writeFilesTempPath_;
SparkTaskInfo taskInfo_;
};
} // namespace gluten
diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc
index 3077e9ed4d..e6c87b7bd0 100644
--- a/cpp/core/jni/JniWrapper.cc
+++ b/cpp/core/jni/JniWrapper.cc
@@ -311,16 +311,13 @@ JNIEXPORT jstring JNICALL
Java_org_apache_gluten_vectorized_PlanEvaluatorJniWrap
JNIEXPORT void JNICALL
Java_org_apache_gluten_vectorized_PlanEvaluatorJniWrapper_injectWriteFilesTempPath(
// NOLINT
JNIEnv* env,
- jobject wrapper,
+ jclass,
jbyteArray path) {
JNI_METHOD_START
-
auto len = env->GetArrayLength(path);
auto safeArray = gluten::getByteArrayElementsSafe(env, path);
std::string pathStr(reinterpret_cast<char*>(safeArray.elems()), len);
- auto ctx = gluten::getRuntime(env, wrapper);
- ctx->injectWriteFilesTempPath(pathStr);
-
+ *gluten::Runtime::localWriteFilesTempPath() = pathStr;
JNI_METHOD_END()
}
diff --git a/cpp/velox/benchmarks/GenericBenchmark.cc
b/cpp/velox/benchmarks/GenericBenchmark.cc
index 0c520b6bd3..ebbb0e0ea8 100644
--- a/cpp/velox/benchmarks/GenericBenchmark.cc
+++ b/cpp/velox/benchmarks/GenericBenchmark.cc
@@ -337,7 +337,7 @@ auto BM_Generic = [](::benchmark::State& state,
return static_cast<FileReaderIterator*>(iter->getInputIter());
});
}
- runtime->injectWriteFilesTempPath(FLAGS_write_path);
+ *Runtime::localWriteFilesTempPath() = FLAGS_write_path;
runtime->parsePlan(reinterpret_cast<uint8_t*>(plan.data()), plan.size(),
std::nullopt);
for (auto& split : splits) {
runtime->parseSplitInfo(reinterpret_cast<uint8_t*>(split.data()),
split.size(), std::nullopt);
diff --git a/cpp/velox/compute/VeloxRuntime.cc
b/cpp/velox/compute/VeloxRuntime.cc
index e93dc2367e..398597f9b4 100644
--- a/cpp/velox/compute/VeloxRuntime.cc
+++ b/cpp/velox/compute/VeloxRuntime.cc
@@ -128,10 +128,6 @@ std::string VeloxRuntime::planString(bool details, const
std::unordered_map<std:
return veloxPlan->toString(details, true);
}
-void VeloxRuntime::injectWriteFilesTempPath(const std::string& path) {
- writeFilesTempPath_ = path;
-}
-
VeloxMemoryManager* VeloxRuntime::memoryManager() {
return vmm_;
}
@@ -142,7 +138,8 @@ std::shared_ptr<ResultIterator>
VeloxRuntime::createResultIterator(
const std::unordered_map<std::string, std::string>& sessionConf) {
LOG_IF(INFO, debugModeEnabled_) << "VeloxRuntime session config:" <<
printConfig(confMap_);
- VeloxPlanConverter veloxPlanConverter(inputs,
vmm_->getLeafMemoryPool().get(), sessionConf, writeFilesTempPath_);
+ VeloxPlanConverter veloxPlanConverter(
+ inputs, vmm_->getLeafMemoryPool().get(), sessionConf,
*localWriteFilesTempPath());
veloxPlan_ = veloxPlanConverter.toVeloxPlan(substraitPlan_,
std::move(localFiles_));
// Scan node can be required.
diff --git a/cpp/velox/compute/VeloxRuntime.h b/cpp/velox/compute/VeloxRuntime.h
index 405bbae635..8101426859 100644
--- a/cpp/velox/compute/VeloxRuntime.h
+++ b/cpp/velox/compute/VeloxRuntime.h
@@ -78,8 +78,6 @@ class VeloxRuntime final : public Runtime {
std::string planString(bool details, const std::unordered_map<std::string,
std::string>& sessionConf) override;
- void injectWriteFilesTempPath(const std::string& path) override;
-
void dumpConf(const std::string& path) override;
std::shared_ptr<const facebook::velox::core::PlanNode> getVeloxPlan() {
diff --git a/cpp/velox/tests/RuntimeTest.cc b/cpp/velox/tests/RuntimeTest.cc
index 0135b35dcd..e978f2eec7 100644
--- a/cpp/velox/tests/RuntimeTest.cc
+++ b/cpp/velox/tests/RuntimeTest.cc
@@ -91,9 +91,6 @@ class DummyRuntime final : public Runtime {
std::string planString(bool details, const std::unordered_map<std::string,
std::string>& sessionConf) override {
throw GlutenException("Not yet implemented");
}
- void injectWriteFilesTempPath(const std::string& path) override {
- throw GlutenException("Not yet implemented");
- }
void dumpConf(const std::string& path) override {
throw GlutenException("Not yet implemented");
diff --git
a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/NativePlanEvaluator.java
b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/NativePlanEvaluator.java
index ef9dbc457d..2a3d6013a9 100644
---
a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/NativePlanEvaluator.java
+++
b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/NativePlanEvaluator.java
@@ -28,10 +28,13 @@ import org.apache.spark.TaskContext;
import java.nio.charset.StandardCharsets;
import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
public class NativePlanEvaluator {
+ private static final AtomicInteger id = new AtomicInteger(0);
+ private final Runtime runtime =
+ Runtimes.contextInstance(String.format("NativePlanEvaluator-%d",
id.getAndIncrement()));
- private final Runtime runtime =
Runtimes.contextInstance("WholeStageIterator");
private final PlanEvaluatorJniWrapper jniWrapper;
private NativePlanEvaluator() {
@@ -46,8 +49,8 @@ public class NativePlanEvaluator {
return jniWrapper.nativeValidateWithFailureReason(subPlan);
}
- public void injectWriteFilesTempPath(String path) {
- jniWrapper.injectWriteFilesTempPath(path.getBytes(StandardCharsets.UTF_8));
+ public static void injectWriteFilesTempPath(String path) {
+
PlanEvaluatorJniWrapper.injectWriteFilesTempPath(path.getBytes(StandardCharsets.UTF_8));
}
// Used by WholeStageTransform to create the native computing pipeline and
diff --git
a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/PlanEvaluatorJniWrapper.java
b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/PlanEvaluatorJniWrapper.java
index 915091a7db..f7e9a7b78f 100644
---
a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/PlanEvaluatorJniWrapper.java
+++
b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/PlanEvaluatorJniWrapper.java
@@ -41,6 +41,8 @@ public class PlanEvaluatorJniWrapper implements RuntimeAware {
return runtime.getHandle();
}
+ public static native void injectWriteFilesTempPath(byte[] path);
+
/**
* Validate the Substrait plan in native compute engine.
*
@@ -51,8 +53,6 @@ public class PlanEvaluatorJniWrapper implements RuntimeAware {
public native String nativePlanString(byte[] substraitPlan, Boolean details);
- public native void injectWriteFilesTempPath(byte[] path);
-
/**
* Create a native compute kernel and return a columnar result iterator.
*
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]