This is an automated email from the ASF dual-hosted git repository.
kazuyukitanimura pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new 4fe4f57da feat: Add unbounded memory pool (#1386)
4fe4f57da is described below
commit 4fe4f57da6339a5d66114750aee915525e601d4d
Author: KAZUYUKI TANIMURA <[email protected]>
AuthorDate: Wed Feb 12 10:17:02 2025 -0800
feat: Add unbounded memory pool (#1386)
## Which issue does this PR close?
## Rationale for this change
DataFusion provides an unbounded memory pool (`UnboundedMemoryPool`). I found
it useful for experimental purposes.
## What changes are included in this PR?
Added `unbounded` as a supported value of `spark.comet.exec.memoryPool`, backed by DataFusion's `UnboundedMemoryPool`.
## How are these changes tested?
Covered by existing tests.
---
common/src/main/scala/org/apache/comet/CometConf.scala | 4 ++--
docs/source/user-guide/configs.md | 2 +-
native/core/src/execution/jni_api.rs | 5 ++++-
3 files changed, 7 insertions(+), 4 deletions(-)
diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala
b/common/src/main/scala/org/apache/comet/CometConf.scala
index 8c099d167..a65da43a5 100644
--- a/common/src/main/scala/org/apache/comet/CometConf.scala
+++ b/common/src/main/scala/org/apache/comet/CometConf.scala
@@ -504,8 +504,8 @@ object CometConf extends ShimCometConf {
.doc(
"The type of memory pool to be used for Comet native execution. " +
"Available memory pool types are 'greedy', 'fair_spill',
'greedy_task_shared', " +
- "'fair_spill_task_shared', 'greedy_global' and 'fair_spill_global'.
For off-heap " +
- "types are 'unified' and `fair_unified`.")
+ "'fair_spill_task_shared', 'greedy_global', 'fair_spill_global', and
`unbounded`. " +
+ "For off-heap types are 'unified' and `fair_unified`.")
.stringConf
.createWithDefault("greedy_task_shared")
diff --git a/docs/source/user-guide/configs.md
b/docs/source/user-guide/configs.md
index 60e2e5dfc..6874b8d5f 100644
--- a/docs/source/user-guide/configs.md
+++ b/docs/source/user-guide/configs.md
@@ -48,7 +48,7 @@ Comet provides the following configuration settings.
| spark.comet.exec.hashJoin.enabled | Whether to enable hashJoin by default. |
true |
| spark.comet.exec.initCap.enabled | Whether to enable initCap by default. |
false |
| spark.comet.exec.localLimit.enabled | Whether to enable localLimit by
default. | true |
-| spark.comet.exec.memoryPool | The type of memory pool to be used for Comet
native execution. Available memory pool types are 'greedy', 'fair_spill',
'greedy_task_shared', 'fair_spill_task_shared', 'greedy_global' and
'fair_spill_global'. For off-heap types are 'unified' and `fair_unified`. |
greedy_task_shared |
+| spark.comet.exec.memoryPool | The type of memory pool to be used for Comet
native execution. Available memory pool types are 'greedy', 'fair_spill',
'greedy_task_shared', 'fair_spill_task_shared', 'greedy_global',
'fair_spill_global', and `unbounded`. For off-heap types are 'unified' and
`fair_unified`. | greedy_task_shared |
| spark.comet.exec.project.enabled | Whether to enable project by default. |
true |
| spark.comet.exec.replaceSortMergeJoin | Experimental feature to force Spark
to replace SortMergeJoin with ShuffledHashJoin for improved performance. This
feature is not stable yet. For more information, refer to the Comet Tuning
Guide (https://datafusion.apache.org/comet/user-guide/tuning.html). | false |
| spark.comet.exec.shuffle.compression.codec | The codec of Comet native
shuffle used to compress shuffle data. lz4, zstd, and snappy are supported.
Compression can be disabled by setting spark.shuffle.compress=false. | lz4 |
diff --git a/native/core/src/execution/jni_api.rs
b/native/core/src/execution/jni_api.rs
index fb0b62a72..1b11d9b25 100644
--- a/native/core/src/execution/jni_api.rs
+++ b/native/core/src/execution/jni_api.rs
@@ -26,7 +26,7 @@ use datafusion::{
prelude::{SessionConfig, SessionContext},
};
use datafusion_execution::memory_pool::{
- FairSpillPool, GreedyMemoryPool, MemoryPool, TrackConsumersPool,
+ FairSpillPool, GreedyMemoryPool, MemoryPool, TrackConsumersPool,
UnboundedMemoryPool,
};
use futures::poll;
use jni::{
@@ -118,6 +118,7 @@ enum MemoryPoolType {
FairSpillTaskShared,
GreedyGlobal,
FairSpillGlobal,
+ Unbounded,
}
struct MemoryPoolConfig {
@@ -319,6 +320,7 @@ fn parse_memory_pool_config(
"greedy_global" =>
MemoryPoolConfig::new(MemoryPoolType::GreedyGlobal, pool_size),
"fair_spill" => MemoryPoolConfig::new(MemoryPoolType::FairSpill,
pool_size_per_task),
"greedy" => MemoryPoolConfig::new(MemoryPoolType::Greedy,
pool_size_per_task),
+ "unbounded" => MemoryPoolConfig::new(MemoryPoolType::Unbounded, 0),
_ => {
return Err(CometError::Config(format!(
"Unsupported memory pool type: {}",
@@ -397,6 +399,7 @@ fn create_memory_pool(
per_task_memory_pool.num_plans += 1;
Arc::clone(&per_task_memory_pool.memory_pool)
}
+ MemoryPoolType::Unbounded => Arc::new(UnboundedMemoryPool::default()),
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]