This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new 75ddcfc3f chore: Making comet native operators write spill files to 
spark local dir (#1581)
75ddcfc3f is described below

commit 75ddcfc3fc4393fa2b97ac90b7fb398f5833bf8f
Author: Kristin Cowalcijk <[email protected]>
AuthorDate: Sat Apr 5 02:17:52 2025 +0800

    chore: Making comet native operators write spill files to spark local dir 
(#1581)
    
    * Use spark local dirs in comet
    
    * Add unit test
---
 native/core/src/execution/jni_api.rs               | 22 ++++++++++--
 .../scala/org/apache/comet/CometExecIterator.scala |  2 ++
 spark/src/main/scala/org/apache/comet/Native.scala |  1 +
 .../scala/org/apache/comet/CometNativeSuite.scala  | 40 ++++++++++++++++++++--
 4 files changed, 60 insertions(+), 5 deletions(-)

diff --git a/native/core/src/execution/jni_api.rs 
b/native/core/src/execution/jni_api.rs
index dd4d031b9..0779bae9a 100644
--- a/native/core/src/execution/jni_api.rs
+++ b/native/core/src/execution/jni_api.rs
@@ -38,6 +38,7 @@ use jni::{
     sys::{jbyteArray, jint, jlong, jlongArray},
     JNIEnv,
 };
+use std::path::PathBuf;
 use std::time::{Duration, Instant};
 use std::{collections::HashMap, sync::Arc, task::Poll};
 
@@ -167,6 +168,7 @@ pub unsafe extern "system" fn 
Java_org_apache_comet_Native_createPlan(
     metrics_node: JObject,
     metrics_update_interval: jlong,
     comet_task_memory_manager_obj: JObject,
+    local_dirs: jobjectArray,
     batch_size: jint,
     off_heap_mode: jboolean,
     memory_pool_type: jstring,
@@ -208,6 +210,8 @@ pub unsafe extern "system" fn 
Java_org_apache_comet_Native_createPlan(
             let input_source = Arc::new(jni_new_global_ref!(env, 
input_source)?);
             input_sources.push(input_source);
         }
+
+        // Create DataFusion memory pool
         let task_memory_manager =
             Arc::new(jni_new_global_ref!(env, comet_task_memory_manager_obj)?);
 
@@ -221,10 +225,21 @@ pub unsafe extern "system" fn 
Java_org_apache_comet_Native_createPlan(
         let memory_pool =
             create_memory_pool(&memory_pool_config, task_memory_manager, 
task_attempt_id);
 
+        // Get local directories for storing spill files
+        let local_dirs_array = JObjectArray::from_raw(local_dirs);
+        let num_local_dirs = env.get_array_length(&local_dirs_array)?;
+        let mut local_dirs = vec![];
+        for i in 0..num_local_dirs {
+            let local_dir: JString = 
env.get_object_array_element(&local_dirs_array, i)?.into();
+            let local_dir = env.get_string(&local_dir)?;
+            local_dirs.push(local_dir.into());
+        }
+
         // We need to keep the session context alive. Some session state like 
temporary
         // dictionaries are stored in session context. If it is dropped, the 
temporary
         // dictionaries will be dropped as well.
-        let session = prepare_datafusion_session_context(batch_size as usize, 
memory_pool)?;
+        let session =
+            prepare_datafusion_session_context(batch_size as usize, 
memory_pool, local_dirs)?;
 
         let plan_creation_time = start.elapsed();
 
@@ -262,8 +277,11 @@ pub unsafe extern "system" fn 
Java_org_apache_comet_Native_createPlan(
 fn prepare_datafusion_session_context(
     batch_size: usize,
     memory_pool: Arc<dyn MemoryPool>,
+    local_dirs: Vec<String>,
 ) -> CometResult<SessionContext> {
-    let mut rt_config = 
RuntimeEnvBuilder::new().with_disk_manager(DiskManagerConfig::NewOs);
+    let disk_manager_config =
+        
DiskManagerConfig::NewSpecified(local_dirs.into_iter().map(PathBuf::from).collect());
+    let mut rt_config = 
RuntimeEnvBuilder::new().with_disk_manager(disk_manager_config);
     rt_config = rt_config.with_memory_pool(memory_pool);
 
     // Get Datafusion configuration from Spark Execution context
diff --git a/spark/src/main/scala/org/apache/comet/CometExecIterator.scala 
b/spark/src/main/scala/org/apache/comet/CometExecIterator.scala
index b9bb3b2b9..f409c79e3 100644
--- a/spark/src/main/scala/org/apache/comet/CometExecIterator.scala
+++ b/spark/src/main/scala/org/apache/comet/CometExecIterator.scala
@@ -64,6 +64,7 @@ class CometExecIterator(
   }.toArray
   private val plan = {
     val conf = SparkEnv.get.conf
+    val localDiskDirs = SparkEnv.get.blockManager.getLocalDiskDirs
 
     val offHeapMode = CometSparkSessionExtensions.isOffHeapEnabled(conf)
     val memoryLimit = if (offHeapMode) {
@@ -83,6 +84,7 @@ class CometExecIterator(
       nativeMetrics,
       metricsUpdateInterval = COMET_METRICS_UPDATE_INTERVAL.get(),
       new CometTaskMemoryManager(id),
+      localDiskDirs,
       batchSize = COMET_BATCH_SIZE.get(),
       offHeapMode,
       memoryPoolType = COMET_EXEC_MEMORY_POOL_TYPE.get(),
diff --git a/spark/src/main/scala/org/apache/comet/Native.scala 
b/spark/src/main/scala/org/apache/comet/Native.scala
index e466b2f4d..3a88622da 100644
--- a/spark/src/main/scala/org/apache/comet/Native.scala
+++ b/spark/src/main/scala/org/apache/comet/Native.scala
@@ -58,6 +58,7 @@ class Native extends NativeBase {
       metrics: CometMetricNode,
       metricsUpdateInterval: Long,
       taskMemoryManager: CometTaskMemoryManager,
+      localDirs: Array[String],
       batchSize: Int,
       offHeapMode: Boolean,
       memoryPoolType: String,
diff --git a/spark/src/test/scala/org/apache/comet/CometNativeSuite.scala 
b/spark/src/test/scala/org/apache/comet/CometNativeSuite.scala
index 2ff38eae8..325ef51f6 100644
--- a/spark/src/test/scala/org/apache/comet/CometNativeSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometNativeSuite.scala
@@ -19,7 +19,7 @@
 
 package org.apache.comet
 
-import org.apache.spark.SparkException
+import org.apache.spark.{SparkEnv, SparkException}
 import org.apache.spark.sql.CometTestBase
 import org.apache.spark.sql.catalyst.expressions.PrettyAttribute
 import org.apache.spark.sql.comet.{CometExec, CometExecUtils}
@@ -40,8 +40,11 @@ class CometNativeSuite extends CometTestBase {
         limitOp,
         1,
         0)
-      cometIter.next()
-      cometIter.close()
+      try {
+        cometIter.next()
+      } finally {
+        cometIter.close()
+      }
       value
     }
 
@@ -63,4 +66,35 @@ class CometNativeSuite extends CometTestBase {
     }
     assert(exception2.getMessage contains "null context handle")
   }
+
+  test("Comet native should use spark local dir as temp dir") {
+    withParquetTable((0 until 100000).map(i => (i, i + 1)), "table") {
+      val dirs = SparkEnv.get.blockManager.getLocalDiskDirs
+      dirs.foreach { dir =>
+        val files = new java.io.File(dir).listFiles()
+        assert(!files.exists(f => f.isDirectory && 
f.getName.startsWith("datafusion-")))
+      }
+
+      // Check that the DataFusion temporary directory exists in the Spark local
+      // dirs while a Spark job involving a Comet native operator is running.
+      val observedDataFusionDir = spark
+        .table("table")
+        .selectExpr("_1 + _2 as value")
+        .rdd
+        .mapPartitions { _ =>
+          dirs.map { dir =>
+            val files = new java.io.File(dir).listFiles()
+            files.count(f => f.isDirectory && 
f.getName.startsWith("datafusion-"))
+          }.iterator
+        }
+        .sum()
+      assert(observedDataFusionDir > 0)
+
+      // DataFusion temporary dir should be cleaned up after the job is done.
+      dirs.foreach { dir =>
+        val files = new java.io.File(dir).listFiles()
+        assert(!files.exists(f => f.isDirectory && 
f.getName.startsWith("datafusion-")))
+      }
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to