This is an automated email from the ASF dual-hosted git repository.

kazuyukitanimura pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new 4fe4f57da feat: Add unbounded memory pool (#1386)
4fe4f57da is described below

commit 4fe4f57da6339a5d66114750aee915525e601d4d
Author: KAZUYUKI TANIMURA <[email protected]>
AuthorDate: Wed Feb 12 10:17:02 2025 -0800

    feat: Add unbounded memory pool (#1386)
    
    ## Which issue does this PR close?
    
    ## Rationale for this change
    
    DataFusion provides an unbounded memory pool, which is useful for 
experimental purposes.
    
    ## What changes are included in this PR?
    
    Added `unbounded` as a new option for the Comet memory pool type (`spark.comet.exec.memoryPool`).
    
    ## How are these changes tested?
    
    Covered by existing tests.
---
 common/src/main/scala/org/apache/comet/CometConf.scala | 4 ++--
 docs/source/user-guide/configs.md                      | 2 +-
 native/core/src/execution/jni_api.rs                   | 5 ++++-
 3 files changed, 7 insertions(+), 4 deletions(-)

diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala 
b/common/src/main/scala/org/apache/comet/CometConf.scala
index 8c099d167..a65da43a5 100644
--- a/common/src/main/scala/org/apache/comet/CometConf.scala
+++ b/common/src/main/scala/org/apache/comet/CometConf.scala
@@ -504,8 +504,8 @@ object CometConf extends ShimCometConf {
     .doc(
       "The type of memory pool to be used for Comet native execution. " +
         "Available memory pool types are 'greedy', 'fair_spill', 
'greedy_task_shared', " +
-        "'fair_spill_task_shared', 'greedy_global' and 'fair_spill_global'. 
For off-heap " +
-        "types are 'unified' and `fair_unified`.")
+        "'fair_spill_task_shared', 'greedy_global', 'fair_spill_global', and 
`unbounded`. " +
+        "For off-heap types are 'unified' and `fair_unified`.")
     .stringConf
     .createWithDefault("greedy_task_shared")
 
diff --git a/docs/source/user-guide/configs.md 
b/docs/source/user-guide/configs.md
index 60e2e5dfc..6874b8d5f 100644
--- a/docs/source/user-guide/configs.md
+++ b/docs/source/user-guide/configs.md
@@ -48,7 +48,7 @@ Comet provides the following configuration settings.
 | spark.comet.exec.hashJoin.enabled | Whether to enable hashJoin by default. | 
true |
 | spark.comet.exec.initCap.enabled | Whether to enable initCap by default. | 
false |
 | spark.comet.exec.localLimit.enabled | Whether to enable localLimit by 
default. | true |
-| spark.comet.exec.memoryPool | The type of memory pool to be used for Comet 
native execution. Available memory pool types are 'greedy', 'fair_spill', 
'greedy_task_shared', 'fair_spill_task_shared', 'greedy_global' and 
'fair_spill_global'. For off-heap types are 'unified' and `fair_unified`. | 
greedy_task_shared |
+| spark.comet.exec.memoryPool | The type of memory pool to be used for Comet 
native execution. Available memory pool types are 'greedy', 'fair_spill', 
'greedy_task_shared', 'fair_spill_task_shared', 'greedy_global', 
'fair_spill_global', and `unbounded`. For off-heap types are 'unified' and 
`fair_unified`. | greedy_task_shared |
 | spark.comet.exec.project.enabled | Whether to enable project by default. | 
true |
 | spark.comet.exec.replaceSortMergeJoin | Experimental feature to force Spark 
to replace SortMergeJoin with ShuffledHashJoin for improved performance. This 
feature is not stable yet. For more information, refer to the Comet Tuning 
Guide (https://datafusion.apache.org/comet/user-guide/tuning.html). | false |
 | spark.comet.exec.shuffle.compression.codec | The codec of Comet native 
shuffle used to compress shuffle data. lz4, zstd, and snappy are supported. 
Compression can be disabled by setting spark.shuffle.compress=false. | lz4 |
diff --git a/native/core/src/execution/jni_api.rs 
b/native/core/src/execution/jni_api.rs
index fb0b62a72..1b11d9b25 100644
--- a/native/core/src/execution/jni_api.rs
+++ b/native/core/src/execution/jni_api.rs
@@ -26,7 +26,7 @@ use datafusion::{
     prelude::{SessionConfig, SessionContext},
 };
 use datafusion_execution::memory_pool::{
-    FairSpillPool, GreedyMemoryPool, MemoryPool, TrackConsumersPool,
+    FairSpillPool, GreedyMemoryPool, MemoryPool, TrackConsumersPool, 
UnboundedMemoryPool,
 };
 use futures::poll;
 use jni::{
@@ -118,6 +118,7 @@ enum MemoryPoolType {
     FairSpillTaskShared,
     GreedyGlobal,
     FairSpillGlobal,
+    Unbounded,
 }
 
 struct MemoryPoolConfig {
@@ -319,6 +320,7 @@ fn parse_memory_pool_config(
             "greedy_global" => 
MemoryPoolConfig::new(MemoryPoolType::GreedyGlobal, pool_size),
             "fair_spill" => MemoryPoolConfig::new(MemoryPoolType::FairSpill, 
pool_size_per_task),
             "greedy" => MemoryPoolConfig::new(MemoryPoolType::Greedy, 
pool_size_per_task),
+            "unbounded" => MemoryPoolConfig::new(MemoryPoolType::Unbounded, 0),
             _ => {
                 return Err(CometError::Config(format!(
                     "Unsupported memory pool type: {}",
@@ -397,6 +399,7 @@ fn create_memory_pool(
             per_task_memory_pool.num_plans += 1;
             Arc::clone(&per_task_memory_pool.memory_pool)
         }
+        MemoryPoolType::Unbounded => Arc::new(UnboundedMemoryPool::default()),
     }
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to