This is an automated email from the ASF dual-hosted git repository.

hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 20f2d2020b [GLUTEN-7243][VL] Fix hanging by cross-task spilling (#7479)
20f2d2020b is described below

commit 20f2d2020b9f22094ffdd71084012613fc0203c7
Author: Hongze Zhang <[email protected]>
AuthorDate: Mon Oct 14 09:42:55 2024 +0800

    [GLUTEN-7243][VL] Fix hanging by cross-task spilling (#7479)
    
    Closes #7243
---
 .../org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala   | 3 +--
 cpp/core/compute/Runtime.cc                                      | 7 +++++++
 cpp/core/compute/Runtime.h                                       | 4 +---
 cpp/core/jni/JniWrapper.cc                                       | 7 ++-----
 cpp/velox/benchmarks/GenericBenchmark.cc                         | 2 +-
 cpp/velox/compute/VeloxRuntime.cc                                | 7 ++-----
 cpp/velox/compute/VeloxRuntime.h                                 | 2 --
 cpp/velox/tests/RuntimeTest.cc                                   | 3 ---
 .../java/org/apache/gluten/vectorized/NativePlanEvaluator.java   | 9 ++++++---
 .../org/apache/gluten/vectorized/PlanEvaluatorJniWrapper.java    | 4 ++--
 10 files changed, 22 insertions(+), 26 deletions(-)

diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
index 9871efd06f..061daaac0f 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
@@ -165,8 +165,7 @@ class VeloxIteratorApi extends IteratorApi with Logging {
   }
 
   override def injectWriteFilesTempPath(path: String, fileName: String): Unit = {
-    val transKernel = NativePlanEvaluator.create()
-    transKernel.injectWriteFilesTempPath(path)
+    NativePlanEvaluator.injectWriteFilesTempPath(path)
   }
 
   /** Generate Iterator[ColumnarBatch] for first stage. */
diff --git a/cpp/core/compute/Runtime.cc b/cpp/core/compute/Runtime.cc
index c6bae1e7bc..49565fac69 100644
--- a/cpp/core/compute/Runtime.cc
+++ b/cpp/core/compute/Runtime.cc
@@ -68,4 +68,11 @@ void Runtime::release(Runtime* runtime) {
   delete runtime;
 }
 
+std::optional<std::string>* Runtime::localWriteFilesTempPath() {
+  // This is thread-local to conform to Java side ColumnarWriteFilesExec's design.
+  // FIXME: Pass the path through relevant member functions.
+  static thread_local std::optional<std::string> path;
+  return &path;
+}
+
 } // namespace gluten
diff --git a/cpp/core/compute/Runtime.h b/cpp/core/compute/Runtime.h
index 4aa67dfe7b..2901a22b0b 100644
--- a/cpp/core/compute/Runtime.h
+++ b/cpp/core/compute/Runtime.h
@@ -62,6 +62,7 @@ class Runtime : public std::enable_shared_from_this<Runtime> {
       std::unique_ptr<AllocationListener> listener,
       const std::unordered_map<std::string, std::string>& sessionConf = {});
   static void release(Runtime*);
+  static std::optional<std::string>* localWriteFilesTempPath();
 
   Runtime(std::shared_ptr<MemoryManager> memoryManager, const std::unordered_map<std::string, std::string>& confMap)
       : memoryManager_(memoryManager), confMap_(confMap) {}
@@ -74,8 +75,6 @@ class Runtime : public std::enable_shared_from_this<Runtime> {
 
   virtual std::string planString(bool details, const std::unordered_map<std::string, std::string>& sessionConf) = 0;
 
-  virtual void injectWriteFilesTempPath(const std::string& path) = 0;
-
   // Just for benchmark
   ::substrait::Plan& getPlan() {
     return substraitPlan_;
@@ -134,7 +133,6 @@ class Runtime : public std::enable_shared_from_this<Runtime> {
 
   ::substrait::Plan substraitPlan_;
   std::vector<::substrait::ReadRel_LocalFiles> localFiles_;
-  std::optional<std::string> writeFilesTempPath_;
   SparkTaskInfo taskInfo_;
 };
 } // namespace gluten
diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc
index 3077e9ed4d..e6c87b7bd0 100644
--- a/cpp/core/jni/JniWrapper.cc
+++ b/cpp/core/jni/JniWrapper.cc
@@ -311,16 +311,13 @@ JNIEXPORT jstring JNICALL Java_org_apache_gluten_vectorized_PlanEvaluatorJniWrap
 
 JNIEXPORT void JNICALL Java_org_apache_gluten_vectorized_PlanEvaluatorJniWrapper_injectWriteFilesTempPath( // NOLINT
     JNIEnv* env,
-    jobject wrapper,
+    jclass,
     jbyteArray path) {
   JNI_METHOD_START
-
   auto len = env->GetArrayLength(path);
   auto safeArray = gluten::getByteArrayElementsSafe(env, path);
   std::string pathStr(reinterpret_cast<char*>(safeArray.elems()), len);
-  auto ctx = gluten::getRuntime(env, wrapper);
-  ctx->injectWriteFilesTempPath(pathStr);
-
+  *gluten::Runtime::localWriteFilesTempPath() = pathStr;
   JNI_METHOD_END()
 }
 
diff --git a/cpp/velox/benchmarks/GenericBenchmark.cc b/cpp/velox/benchmarks/GenericBenchmark.cc
index 0c520b6bd3..ebbb0e0ea8 100644
--- a/cpp/velox/benchmarks/GenericBenchmark.cc
+++ b/cpp/velox/benchmarks/GenericBenchmark.cc
@@ -337,7 +337,7 @@ auto BM_Generic = [](::benchmark::State& state,
               return static_cast<FileReaderIterator*>(iter->getInputIter());
             });
       }
-      runtime->injectWriteFilesTempPath(FLAGS_write_path);
+      *Runtime::localWriteFilesTempPath() = FLAGS_write_path;
       runtime->parsePlan(reinterpret_cast<uint8_t*>(plan.data()), plan.size(), std::nullopt);
       for (auto& split : splits) {
         runtime->parseSplitInfo(reinterpret_cast<uint8_t*>(split.data()), split.size(), std::nullopt);
diff --git a/cpp/velox/compute/VeloxRuntime.cc b/cpp/velox/compute/VeloxRuntime.cc
index e93dc2367e..398597f9b4 100644
--- a/cpp/velox/compute/VeloxRuntime.cc
+++ b/cpp/velox/compute/VeloxRuntime.cc
@@ -128,10 +128,6 @@ std::string VeloxRuntime::planString(bool details, const std::unordered_map<std:
   return veloxPlan->toString(details, true);
 }
 
-void VeloxRuntime::injectWriteFilesTempPath(const std::string& path) {
-  writeFilesTempPath_ = path;
-}
-
 VeloxMemoryManager* VeloxRuntime::memoryManager() {
   return vmm_;
 }
@@ -142,7 +138,8 @@ std::shared_ptr<ResultIterator> VeloxRuntime::createResultIterator(
     const std::unordered_map<std::string, std::string>& sessionConf) {
   LOG_IF(INFO, debugModeEnabled_) << "VeloxRuntime session config:" << printConfig(confMap_);
 
-  VeloxPlanConverter veloxPlanConverter(inputs, vmm_->getLeafMemoryPool().get(), sessionConf, writeFilesTempPath_);
+  VeloxPlanConverter veloxPlanConverter(
+      inputs, vmm_->getLeafMemoryPool().get(), sessionConf, *localWriteFilesTempPath());
   veloxPlan_ = veloxPlanConverter.toVeloxPlan(substraitPlan_, std::move(localFiles_));
 
   // Scan node can be required.
diff --git a/cpp/velox/compute/VeloxRuntime.h b/cpp/velox/compute/VeloxRuntime.h
index 405bbae635..8101426859 100644
--- a/cpp/velox/compute/VeloxRuntime.h
+++ b/cpp/velox/compute/VeloxRuntime.h
@@ -78,8 +78,6 @@ class VeloxRuntime final : public Runtime {
 
   std::string planString(bool details, const std::unordered_map<std::string, std::string>& sessionConf) override;
 
-  void injectWriteFilesTempPath(const std::string& path) override;
-
   void dumpConf(const std::string& path) override;
 
   std::shared_ptr<const facebook::velox::core::PlanNode> getVeloxPlan() {
diff --git a/cpp/velox/tests/RuntimeTest.cc b/cpp/velox/tests/RuntimeTest.cc
index 0135b35dcd..e978f2eec7 100644
--- a/cpp/velox/tests/RuntimeTest.cc
+++ b/cpp/velox/tests/RuntimeTest.cc
@@ -91,9 +91,6 @@ class DummyRuntime final : public Runtime {
   std::string planString(bool details, const std::unordered_map<std::string, std::string>& sessionConf) override {
     throw GlutenException("Not yet implemented");
   }
-  void injectWriteFilesTempPath(const std::string& path) override {
-    throw GlutenException("Not yet implemented");
-  }
 
   void dumpConf(const std::string& path) override {
     throw GlutenException("Not yet implemented");
diff --git a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/NativePlanEvaluator.java b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/NativePlanEvaluator.java
index ef9dbc457d..2a3d6013a9 100644
--- a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/NativePlanEvaluator.java
+++ b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/NativePlanEvaluator.java
@@ -28,10 +28,13 @@ import org.apache.spark.TaskContext;
 
 import java.nio.charset.StandardCharsets;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
 
 public class NativePlanEvaluator {
+  private static final AtomicInteger id = new AtomicInteger(0);
+  private final Runtime runtime =
+      Runtimes.contextInstance(String.format("NativePlanEvaluator-%d", id.getAndIncrement()));
 
-  private final Runtime runtime = Runtimes.contextInstance("WholeStageIterator");
   private final PlanEvaluatorJniWrapper jniWrapper;
 
   private NativePlanEvaluator() {
@@ -46,8 +49,8 @@ public class NativePlanEvaluator {
     return jniWrapper.nativeValidateWithFailureReason(subPlan);
   }
 
-  public void injectWriteFilesTempPath(String path) {
-    jniWrapper.injectWriteFilesTempPath(path.getBytes(StandardCharsets.UTF_8));
+  public static void injectWriteFilesTempPath(String path) {
+    PlanEvaluatorJniWrapper.injectWriteFilesTempPath(path.getBytes(StandardCharsets.UTF_8));
   }
 
   // Used by WholeStageTransform to create the native computing pipeline and
diff --git a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/PlanEvaluatorJniWrapper.java b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/PlanEvaluatorJniWrapper.java
index 915091a7db..f7e9a7b78f 100644
--- a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/PlanEvaluatorJniWrapper.java
+++ b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/PlanEvaluatorJniWrapper.java
@@ -41,6 +41,8 @@ public class PlanEvaluatorJniWrapper implements RuntimeAware {
     return runtime.getHandle();
   }
 
+  public static native void injectWriteFilesTempPath(byte[] path);
+
   /**
    * Validate the Substrait plan in native compute engine.
    *
@@ -51,8 +53,6 @@ public class PlanEvaluatorJniWrapper implements RuntimeAware {
 
   public native String nativePlanString(byte[] substraitPlan, Boolean details);
 
-  public native void injectWriteFilesTempPath(byte[] path);
-
   /**
    * Create a native compute kernel and return a columnar result iterator.
    *


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to