This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new e72beb1fa chore: Follow-on PR to fully enable onheap memory usage 
(#1210)
e72beb1fa is described below

commit e72beb1faaba39d45e05e537d9b84151db7e73ff
Author: Andy Grove <[email protected]>
AuthorDate: Mon Jan 6 16:28:31 2025 -0700

    chore: Follow-on PR to fully enable onheap memory usage (#1210)
    
    * Make datafusion's native memory pool configurable
    
    * save
    
    * fix
    
    * Update memory calculation and add draft documentation
    
    * ready for review
    
    * ready for review
    
    * address feedback
    
    * Update docs/source/user-guide/tuning.md
    
    Co-authored-by: Liang-Chi Hsieh <[email protected]>
    
    * Update docs/source/user-guide/tuning.md
    
    Co-authored-by: Kristin Cowalcijk <[email protected]>
    
    * Update docs/source/user-guide/tuning.md
    
    Co-authored-by: Liang-Chi Hsieh <[email protected]>
    
    * Update docs/source/user-guide/tuning.md
    
    Co-authored-by: Liang-Chi Hsieh <[email protected]>
    
    * remove unused config
    
    ---------
    
    Co-authored-by: Kristin Cowalcijk <[email protected]>
    Co-authored-by: Liang-Chi Hsieh <[email protected]>
---
 .../main/scala/org/apache/comet/CometConf.scala    |  9 ----
 docs/source/user-guide/configs.md                  |  1 -
 docs/source/user-guide/tuning.md                   | 57 +++++++++++++++++++---
 native/core/src/execution/jni_api.rs               |  7 +--
 .../shuffle/comet/CometShuffleMemoryAllocator.java |  2 +-
 .../scala/org/apache/comet/CometExecIterator.scala | 12 +++--
 .../apache/comet/CometSparkSessionExtensions.scala | 21 +-------
 spark/src/main/scala/org/apache/comet/Native.scala |  1 -
 8 files changed, 64 insertions(+), 46 deletions(-)

diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala 
b/common/src/main/scala/org/apache/comet/CometConf.scala
index 2fff0a04c..4d63de75a 100644
--- a/common/src/main/scala/org/apache/comet/CometConf.scala
+++ b/common/src/main/scala/org/apache/comet/CometConf.scala
@@ -452,15 +452,6 @@ object CometConf extends ShimCometConf {
     .intConf
     .createWithDefault(8192)
 
-  val COMET_EXEC_MEMORY_FRACTION: ConfigEntry[Double] = 
conf("spark.comet.exec.memoryFraction")
-    .doc(
-      "The fraction of memory from Comet memory overhead that the native 
memory " +
-        "manager can use for execution. The purpose of this config is to set 
aside memory for " +
-        "untracked data structures, as well as imprecise size estimation 
during memory " +
-        "acquisition.")
-    .doubleConf
-    .createWithDefault(0.7)
-
   val COMET_PARQUET_ENABLE_DIRECT_BUFFER: ConfigEntry[Boolean] =
     conf("spark.comet.parquet.enable.directBuffer")
       .doc("Whether to use Java direct byte buffer when reading Parquet.")
diff --git a/docs/source/user-guide/configs.md 
b/docs/source/user-guide/configs.md
index ecea70254..20923b93a 100644
--- a/docs/source/user-guide/configs.md
+++ b/docs/source/user-guide/configs.md
@@ -47,7 +47,6 @@ Comet provides the following configuration settings.
 | spark.comet.exec.globalLimit.enabled | Whether to enable globalLimit by 
default. | true |
 | spark.comet.exec.hashJoin.enabled | Whether to enable hashJoin by default. | 
true |
 | spark.comet.exec.localLimit.enabled | Whether to enable localLimit by 
default. | true |
-| spark.comet.exec.memoryFraction | The fraction of memory from Comet memory 
overhead that the native memory manager can use for execution. The purpose of 
this config is to set aside memory for untracked data structures, as well as 
imprecise size estimation during memory acquisition. | 0.7 |
 | spark.comet.exec.memoryPool | The type of memory pool to be used for Comet 
native execution. Available memory pool types are 'greedy', 'fair_spill', 
'greedy_task_shared', 'fair_spill_task_shared', 'greedy_global' and 
'fair_spill_global', By default, this config is 'greedy_task_shared'. | 
greedy_task_shared |
 | spark.comet.exec.project.enabled | Whether to enable project by default. | 
true |
 | spark.comet.exec.replaceSortMergeJoin | Experimental feature to force Spark 
to replace SortMergeJoin with ShuffledHashJoin for improved performance. This 
feature is not stable yet. For more information, refer to the Comet Tuning 
Guide (https://datafusion.apache.org/comet/user-guide/tuning.html). | false |
diff --git a/docs/source/user-guide/tuning.md b/docs/source/user-guide/tuning.md
index e04e750b4..30b6d0f46 100644
--- a/docs/source/user-guide/tuning.md
+++ b/docs/source/user-guide/tuning.md
@@ -23,11 +23,52 @@ Comet provides some tuning options to help you get the best 
performance from you
 
 ## Memory Tuning
 
-Comet shares an off-heap memory pool between Spark and Comet. This requires 
setting `spark.memory.offHeap.enabled=true`.
-If this setting is not enabled, Comet will not accelerate queries and will 
fall back to Spark.
+### Unified Memory Management with Off-Heap Memory
+
+The recommended way to share memory between Spark and Comet is to set 
`spark.memory.offHeap.enabled=true`. This allows
+Comet to share an off-heap memory pool with Spark. The size of the pool is 
specified by `spark.memory.offHeap.size`. For more details about Spark off-heap 
memory mode, please refer to Spark documentation: 
https://spark.apache.org/docs/latest/configuration.html.
+
+### Dedicated Comet Memory Pools
+
+Spark uses on-heap memory mode by default, i.e., the 
`spark.memory.offHeap.enabled` setting is not enabled. If Spark is under 
on-heap memory mode, Comet will use its own dedicated memory pools that
+are not shared with Spark. This requires additional configuration settings to 
be specified to set the size and type of
+memory pool to use.
+
+The size of the pool can be set explicitly with `spark.comet.memoryOverhead`. 
If this setting is not specified then
+the memory overhead will be calculated by multiplying the executor memory by 
`spark.comet.memory.overhead.factor`
+(defaults to `0.2`).
+
+The type of pool can be specified with `spark.comet.exec.memoryPool`. The 
default setting is `greedy_task_shared`.
+
+The valid pool types are:
+
+- `greedy`
+- `greedy_global`
+- `greedy_task_shared`
+- `fair_spill`
+- `fair_spill_global`
+- `fair_spill_task_shared`
+
+Pool types ending with `_global` use a single global memory pool between all 
tasks on same executor.
+
+Pool types ending with `_task_shared` share a single memory pool across all 
attempts for a single task.
+
+Other pool types create a dedicated pool per native query plan using a 
fraction of the available pool size based on number of cores 
+and cores per task.
+
+The `greedy*` pool types use DataFusion's [GreedyMemoryPool], which implements 
a greedy first-come first-serve limit. This
+pool works well for queries that do not need to spill or have a single 
spillable operator.
+
+The `fair_spill*` pool types use DataFusion's [FairSpillPool], which prevents 
spillable reservations from using more
+than an even fraction of the available memory sans any unspillable reservations
+(i.e. `(pool_size - unspillable_memory) / num_spillable_reservations)`). This 
pool works best when you know beforehand
+the query has multiple spillable operators that will likely all need to spill. 
Sometimes it will cause spills even
+when there was sufficient memory (reserved for other operators) to avoid doing 
so. Unspillable memory is allocated in
+a first-come, first-serve fashion
+
+[GreedyMemoryPool]: 
https://docs.rs/datafusion/latest/datafusion/execution/memory_pool/struct.GreedyMemoryPool.html
+[FairSpillPool]: 
https://docs.rs/datafusion/latest/datafusion/execution/memory_pool/struct.FairSpillPool.html
 
-Each executor will have a single memory pool which will be shared by all 
native plans being executed within that
-process, and by Spark itself. The size of the pool is specified by 
`spark.memory.offHeap.size`.
 
 ### Determining How Much Memory to Allocate
 
@@ -106,15 +147,19 @@ then any shuffle operations that cannot be supported in 
this mode will fall back
 ### Shuffle Compression
 
 By default, Spark compresses shuffle files using LZ4 compression. Comet 
overrides this behavior with ZSTD compression.
-Compression can be disabled by setting `spark.shuffle.compress=false`, which 
may result in faster shuffle times in 
+Compression can be disabled by setting `spark.shuffle.compress=false`, which 
may result in faster shuffle times in
 certain environments, such as single-node setups with fast NVMe drives, at the 
expense of increased disk space usage.
 
 ## Explain Plan
+
 ### Extended Explain
+
 With Spark 4.0.0 and newer, Comet can provide extended explain plan 
information in the Spark UI. Currently this lists
 reasons why Comet may not have been enabled for specific operations.
 To enable this, in the Spark configuration, set the following:
+
 ```shell
 -c spark.sql.extendedExplainProviders=org.apache.comet.ExtendedExplainInfo
 ```
-This will add a section to the detailed plan displayed in the Spark SQL UI 
page.
\ No newline at end of file
+
+This will add a section to the detailed plan displayed in the Spark SQL UI 
page.
diff --git a/native/core/src/execution/jni_api.rs 
b/native/core/src/execution/jni_api.rs
index b1190d905..7d8d577fe 100644
--- a/native/core/src/execution/jni_api.rs
+++ b/native/core/src/execution/jni_api.rs
@@ -162,7 +162,6 @@ pub unsafe extern "system" fn 
Java_org_apache_comet_Native_createPlan(
     memory_pool_type: jstring,
     memory_limit: jlong,
     memory_limit_per_task: jlong,
-    memory_fraction: jdouble,
     task_attempt_id: jlong,
     debug_native: jboolean,
     explain_native: jboolean,
@@ -208,7 +207,6 @@ pub unsafe extern "system" fn 
Java_org_apache_comet_Native_createPlan(
             memory_pool_type,
             memory_limit,
             memory_limit_per_task,
-            memory_fraction,
         )?;
         let memory_pool =
             create_memory_pool(&memory_pool_config, task_memory_manager, 
task_attempt_id);
@@ -281,14 +279,13 @@ fn parse_memory_pool_config(
     memory_pool_type: String,
     memory_limit: i64,
     memory_limit_per_task: i64,
-    memory_fraction: f64,
 ) -> CometResult<MemoryPoolConfig> {
     let memory_pool_config = if use_unified_memory_manager {
         MemoryPoolConfig::new(MemoryPoolType::Unified, 0)
     } else {
         // Use the memory pool from DF
-        let pool_size = (memory_limit as f64 * memory_fraction) as usize;
-        let pool_size_per_task = (memory_limit_per_task as f64 * 
memory_fraction) as usize;
+        let pool_size = memory_limit as usize;
+        let pool_size_per_task = memory_limit_per_task as usize;
         match memory_pool_type.as_str() {
             "fair_spill_task_shared" => {
                 MemoryPoolConfig::new(MemoryPoolType::FairSpillTaskShared, 
pool_size_per_task)
diff --git 
a/spark/src/main/java/org/apache/spark/shuffle/comet/CometShuffleMemoryAllocator.java
 
b/spark/src/main/java/org/apache/spark/shuffle/comet/CometShuffleMemoryAllocator.java
index 54e349c13..f6e6ca96a 100644
--- 
a/spark/src/main/java/org/apache/spark/shuffle/comet/CometShuffleMemoryAllocator.java
+++ 
b/spark/src/main/java/org/apache/spark/shuffle/comet/CometShuffleMemoryAllocator.java
@@ -52,7 +52,7 @@ public final class CometShuffleMemoryAllocator extends 
CometShuffleMemoryAllocat
         (boolean)
             
CometConf$.MODULE$.COMET_COLUMNAR_SHUFFLE_UNIFIED_MEMORY_ALLOCATOR_IN_TEST().get();
 
-    if (isSparkTesting && !useUnifiedMemAllocator) {
+    if (!useUnifiedMemAllocator) {
       synchronized (CometShuffleMemoryAllocator.class) {
         if (INSTANCE == null) {
           // CometTestShuffleMemoryAllocator handles pages by itself so it can 
be a singleton.
diff --git a/spark/src/main/scala/org/apache/comet/CometExecIterator.scala 
b/spark/src/main/scala/org/apache/comet/CometExecIterator.scala
index 76da6c2d9..6066c890d 100644
--- a/spark/src/main/scala/org/apache/comet/CometExecIterator.scala
+++ b/spark/src/main/scala/org/apache/comet/CometExecIterator.scala
@@ -20,10 +20,11 @@
 package org.apache.comet
 
 import org.apache.spark._
+import org.apache.spark.internal.Logging
 import org.apache.spark.sql.comet.CometMetricNode
 import org.apache.spark.sql.vectorized._
 
-import org.apache.comet.CometConf.{COMET_BATCH_SIZE, COMET_BLOCKING_THREADS, 
COMET_DEBUG_ENABLED, COMET_EXEC_MEMORY_FRACTION, COMET_EXEC_MEMORY_POOL_TYPE, 
COMET_EXPLAIN_NATIVE_ENABLED, COMET_WORKER_THREADS}
+import org.apache.comet.CometConf.{COMET_BATCH_SIZE, COMET_BLOCKING_THREADS, 
COMET_DEBUG_ENABLED, COMET_EXEC_MEMORY_POOL_TYPE, COMET_EXPLAIN_NATIVE_ENABLED, 
COMET_WORKER_THREADS}
 import org.apache.comet.vector.NativeUtil
 
 /**
@@ -52,7 +53,8 @@ class CometExecIterator(
     nativeMetrics: CometMetricNode,
     numParts: Int,
     partitionIndex: Int)
-    extends Iterator[ColumnarBatch] {
+    extends Iterator[ColumnarBatch]
+    with Logging {
 
   private val nativeLib = new Native()
   private val nativeUtil = new NativeUtil()
@@ -75,7 +77,6 @@ class CometExecIterator(
       memory_pool_type = COMET_EXEC_MEMORY_POOL_TYPE.get(),
       memory_limit = CometSparkSessionExtensions.getCometMemoryOverhead(conf),
       memory_limit_per_task = getMemoryLimitPerTask(conf),
-      memory_fraction = COMET_EXEC_MEMORY_FRACTION.get(),
       task_attempt_id = TaskContext.get().taskAttemptId,
       debug = COMET_DEBUG_ENABLED.get(),
       explain = COMET_EXPLAIN_NATIVE_ENABLED.get(),
@@ -94,7 +95,10 @@ class CometExecIterator(
     val coresPerTask = conf.get("spark.task.cpus", "1").toFloat
     // example 16GB maxMemory * 16 cores with 4 cores per task results
     // in memory_limit_per_task = 16 GB * 4 / 16 = 16 GB / 4 = 4GB
-    (maxMemory.toFloat * coresPerTask / numCores).toLong
+    val limit = (maxMemory.toFloat * coresPerTask / numCores).toLong
+    logInfo(
+      s"Calculated per-task memory limit of $limit ($maxMemory * $coresPerTask 
/ $numCores)")
+    limit
   }
 
   private def numDriverOrExecutorCores(conf: SparkConf): Int = {
diff --git 
a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala 
b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
index 4f5a57e9b..cd8ac7882 100644
--- a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
+++ b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
@@ -53,7 +53,7 @@ import org.apache.spark.sql.types.{DoubleType, FloatType}
 
 import org.apache.comet.CometConf._
 import org.apache.comet.CometExplainInfo.getActualPlan
-import org.apache.comet.CometSparkSessionExtensions.{createMessage, 
getCometBroadcastNotEnabledReason, getCometShuffleNotEnabledReason, 
isANSIEnabled, isCometBroadCastForceEnabled, isCometEnabled, 
isCometExecEnabled, isCometJVMShuffleMode, isCometNativeShuffleMode, 
isCometScan, isCometScanEnabled, isCometShuffleEnabled, isOffHeapEnabled, 
isSpark34Plus, isSpark40Plus, isTesting, shouldApplySparkToColumnar, withInfo, 
withInfos}
+import org.apache.comet.CometSparkSessionExtensions.{createMessage, 
getCometBroadcastNotEnabledReason, getCometShuffleNotEnabledReason, 
isANSIEnabled, isCometBroadCastForceEnabled, isCometEnabled, 
isCometExecEnabled, isCometJVMShuffleMode, isCometNativeShuffleMode, 
isCometScan, isCometScanEnabled, isCometShuffleEnabled, isSpark34Plus, 
isSpark40Plus, shouldApplySparkToColumnar, withInfo, withInfos}
 import org.apache.comet.parquet.{CometParquetScan, SupportsComet}
 import org.apache.comet.rules.RewriteJoin
 import org.apache.comet.serde.OperatorOuterClass.Operator
@@ -919,14 +919,6 @@ class CometSparkSessionExtensions
     }
 
     override def apply(plan: SparkPlan): SparkPlan = {
-
-      // Comet required off-heap memory to be enabled
-      if (!isOffHeapEnabled(conf) && !isTesting) {
-        logWarning("Comet native exec disabled because 
spark.memory.offHeap.enabled=false")
-        withInfo(plan, "Comet native exec disabled because 
spark.memory.offHeap.enabled=false")
-        return plan
-      }
-
       // DataFusion doesn't have ANSI mode. For now we just disable CometExec 
if ANSI mode is
       // enabled.
       if (isANSIEnabled(conf)) {
@@ -1187,21 +1179,12 @@ object CometSparkSessionExtensions extends Logging {
     }
   }
 
-  private[comet] def isOffHeapEnabled(conf: SQLConf): Boolean =
-    conf.getConfString("spark.memory.offHeap.enabled", "false").toBoolean
-
-  // Copied from org.apache.spark.util.Utils which is private to Spark.
-  private[comet] def isTesting: Boolean = {
-    System.getenv("SPARK_TESTING") != null || 
System.getProperty("spark.testing") != null
-  }
-
   // Check whether Comet shuffle is enabled:
   // 1. `COMET_EXEC_SHUFFLE_ENABLED` is true
   // 2. `spark.shuffle.manager` is set to `CometShuffleManager`
   // 3. Off-heap memory is enabled || Spark/Comet unit testing
   private[comet] def isCometShuffleEnabled(conf: SQLConf): Boolean =
-    COMET_EXEC_SHUFFLE_ENABLED.get(conf) && isCometShuffleManagerEnabled(conf) 
&&
-      (isOffHeapEnabled(conf) || isTesting)
+    COMET_EXEC_SHUFFLE_ENABLED.get(conf) && isCometShuffleManagerEnabled(conf)
 
   private[comet] def getCometShuffleNotEnabledReason(conf: SQLConf): 
Option[String] = {
     if (!COMET_EXEC_SHUFFLE_ENABLED.get(conf)) {
diff --git a/spark/src/main/scala/org/apache/comet/Native.scala 
b/spark/src/main/scala/org/apache/comet/Native.scala
index 5fd84989b..e5728009e 100644
--- a/spark/src/main/scala/org/apache/comet/Native.scala
+++ b/spark/src/main/scala/org/apache/comet/Native.scala
@@ -55,7 +55,6 @@ class Native extends NativeBase {
       memory_pool_type: String,
       memory_limit: Long,
       memory_limit_per_task: Long,
-      memory_fraction: Double,
       task_attempt_id: Long,
       debug: Boolean,
       explain: Boolean,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to