This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new 2c832b4a feat: Require offHeap memory to be enabled (always use 
unified memory) (#1062)
2c832b4a is described below

commit 2c832b4a56eafa3dacbe3ef31d99adabccb803bf
Author: Andy Grove <[email protected]>
AuthorDate: Thu Nov 14 12:55:57 2024 -0700

    feat: Require offHeap memory to be enabled (always use unified memory) 
(#1062)
    
    * Require offHeap memory
    
    * remove unused import
    
    * use off heap memory in stability tests
    
    * reorder imports
---
 docs/source/user-guide/tuning.md                   | 32 ++--------------------
 native/core/src/execution/jni_api.rs               | 24 ++--------------
 .../scala/org/apache/comet/CometExecIterator.scala | 11 +-------
 .../apache/comet/CometSparkSessionExtensions.scala |  7 +++++
 .../spark/sql/comet/CometPlanStabilitySuite.scala  |  3 ++
 5 files changed, 16 insertions(+), 61 deletions(-)

diff --git a/docs/source/user-guide/tuning.md b/docs/source/user-guide/tuning.md
index 30ada4c9..b1838ca8 100644
--- a/docs/source/user-guide/tuning.md
+++ b/docs/source/user-guide/tuning.md
@@ -23,40 +23,12 @@ Comet provides some tuning options to help you get the best 
performance from you
 
 ## Memory Tuning
 
-Comet provides two options for memory management:
-
-- **Unified Memory Management** shares an off-heap memory pool between Spark 
and Comet. This is the recommended option.
-- **Native Memory Management** leverages DataFusion's memory management for 
the native plans and allocates memory independently of Spark.
-
-### Unified Memory Management
-
-This option is automatically enabled when `spark.memory.offHeap.enabled=true`.
+Comet and Spark share an off-heap memory pool. This requires 
setting `spark.memory.offHeap.enabled=true`. 
+If this setting is not enabled, Comet will not accelerate queries and will 
fall back to Spark.
 
 Each executor will have a single memory pool which will be shared by all 
native plans being executed within that
 process, and by Spark itself. The size of the pool is specified by 
`spark.memory.offHeap.size`.
 
-### Native Memory Management
-
-This option is automatically enabled when `spark.memory.offHeap.enabled=false`.
-
-Each native plan has a dedicated memory pool.
-
-By default, the size of each pool is `spark.comet.memory.overhead.factor * 
spark.executor.memory`. The default value
-for `spark.comet.memory.overhead.factor` is `0.2`.
-
-It is important to take executor concurrency into account. The maximum number 
of concurrent plans in an executor can
-be calculated with `spark.executor.cores / spark.task.cpus`.
-
-For example, if the executor can execute 4 plans concurrently, then the total 
amount of memory allocated will be
-`4 * spark.comet.memory.overhead.factor * spark.executor.memory`.
-
-It is also possible to set `spark.comet.memoryOverhead` to the desired size 
for each pool, rather than calculating
-it based on `spark.comet.memory.overhead.factor`.
-
-If both `spark.comet.memoryOverhead` and `spark.comet.memory.overhead.factor` 
are set, the former will be used.
-
-Comet will allocate at least `spark.comet.memory.overhead.min` memory per pool.
-
 ### Determining How Much Memory to Allocate
 
 Generally, increasing memory overhead will improve query performance, 
especially for queries containing joins and
diff --git a/native/core/src/execution/jni_api.rs 
b/native/core/src/execution/jni_api.rs
index d7e8ccab..47d87fe1 100644
--- a/native/core/src/execution/jni_api.rs
+++ b/native/core/src/execution/jni_api.rs
@@ -202,27 +202,9 @@ fn prepare_datafusion_session_context(
 
     let mut rt_config = 
RuntimeConfig::new().with_disk_manager(DiskManagerConfig::NewOs);
 
-    // Check if we are using unified memory manager integrated with Spark. 
Default to false if not
-    // set.
-    let use_unified_memory_manager = parse_bool(conf, 
"use_unified_memory_manager")?;
-
-    if use_unified_memory_manager {
-        // Set Comet memory pool for native
-        let memory_pool = CometMemoryPool::new(comet_task_memory_manager);
-        rt_config = rt_config.with_memory_pool(Arc::new(memory_pool));
-    } else {
-        // Use the memory pool from DF
-        if conf.contains_key("memory_limit") {
-            let memory_limit = 
conf.get("memory_limit").unwrap().parse::<usize>()?;
-            let memory_fraction = conf
-                .get("memory_fraction")
-                .ok_or(CometError::Internal(
-                    "Config 'memory_fraction' is not specified from Comet JVM 
side".to_string(),
-                ))?
-                .parse::<f64>()?;
-            rt_config = rt_config.with_memory_limit(memory_limit, 
memory_fraction)
-        }
-    }
+    // Set Comet memory pool for native
+    let memory_pool = CometMemoryPool::new(comet_task_memory_manager);
+    rt_config = rt_config.with_memory_pool(Arc::new(memory_pool));
 
     // Get Datafusion configuration from Spark Execution context
     // can be configured in Comet Spark JVM using Spark --conf parameters
diff --git a/spark/src/main/scala/org/apache/comet/CometExecIterator.scala 
b/spark/src/main/scala/org/apache/comet/CometExecIterator.scala
index 07dd80c3..b1f22726 100644
--- a/spark/src/main/scala/org/apache/comet/CometExecIterator.scala
+++ b/spark/src/main/scala/org/apache/comet/CometExecIterator.scala
@@ -23,7 +23,7 @@ import org.apache.spark._
 import org.apache.spark.sql.comet.CometMetricNode
 import org.apache.spark.sql.vectorized._
 
-import org.apache.comet.CometConf.{COMET_BATCH_SIZE, COMET_BLOCKING_THREADS, 
COMET_DEBUG_ENABLED, COMET_EXEC_MEMORY_FRACTION, COMET_EXPLAIN_NATIVE_ENABLED, 
COMET_WORKER_THREADS}
+import org.apache.comet.CometConf.{COMET_BATCH_SIZE, COMET_BLOCKING_THREADS, 
COMET_DEBUG_ENABLED, COMET_EXPLAIN_NATIVE_ENABLED, COMET_WORKER_THREADS}
 import org.apache.comet.vector.NativeUtil
 
 /**
@@ -75,15 +75,6 @@ class CometExecIterator(
     val result = new java.util.HashMap[String, String]()
     val conf = SparkEnv.get.conf
 
-    val maxMemory = CometSparkSessionExtensions.getCometMemoryOverhead(conf)
-    // Only enable unified memory manager when off-heap mode is enabled. 
Otherwise,
-    // we'll use the built-in memory pool from DF, and initializes with 
`memory_limit`
-    // and `memory_fraction` below.
-    result.put(
-      "use_unified_memory_manager",
-      String.valueOf(conf.get("spark.memory.offHeap.enabled", "false")))
-    result.put("memory_limit", String.valueOf(maxMemory))
-    result.put("memory_fraction", 
String.valueOf(COMET_EXEC_MEMORY_FRACTION.get()))
     result.put("batch_size", String.valueOf(COMET_BATCH_SIZE.get()))
     result.put("debug_native", String.valueOf(COMET_DEBUG_ENABLED.get()))
     result.put("explain_native", 
String.valueOf(COMET_EXPLAIN_NATIVE_ENABLED.get()))
diff --git 
a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala 
b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
index 6a5c0efe..1c4ffcf3 100644
--- a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
+++ b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
@@ -915,6 +915,13 @@ class CometSparkSessionExtensions
     }
 
     override def apply(plan: SparkPlan): SparkPlan = {
+
+      // Comet requires off-heap memory to be enabled
+      if ("true" != conf.getConfString("spark.memory.offHeap.enabled", 
"false")) {
+        logInfo("Comet extension disabled because 
spark.memory.offHeap.enabled=false")
+        return plan
+      }
+
       // DataFusion doesn't have ANSI mode. For now we just disable CometExec 
if ANSI mode is
       // enabled.
       if (isANSIEnabled(conf)) {
diff --git 
a/spark/src/test/scala/org/apache/spark/sql/comet/CometPlanStabilitySuite.scala 
b/spark/src/test/scala/org/apache/spark/sql/comet/CometPlanStabilitySuite.scala
index a553e61c..16a7e533 100644
--- 
a/spark/src/test/scala/org/apache/spark/sql/comet/CometPlanStabilitySuite.scala
+++ 
b/spark/src/test/scala/org/apache/spark/sql/comet/CometPlanStabilitySuite.scala
@@ -26,6 +26,7 @@ import scala.collection.mutable
 
 import org.apache.commons.io.FileUtils
 import org.apache.spark.SparkContext
+import org.apache.spark.internal.config.{MEMORY_OFFHEAP_ENABLED, 
MEMORY_OFFHEAP_SIZE}
 import org.apache.spark.sql.TPCDSBase
 import org.apache.spark.sql.catalyst.expressions.AttributeSet
 import org.apache.spark.sql.catalyst.util.resourceToString
@@ -286,6 +287,8 @@ trait CometPlanStabilitySuite extends 
DisableAdaptiveExecutionSuite with TPCDSBa
     conf.set(
       "spark.shuffle.manager",
       "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
+    conf.set(MEMORY_OFFHEAP_ENABLED.key, "true")
+    conf.set(MEMORY_OFFHEAP_SIZE.key, "2g")
     conf.set(CometConf.COMET_ENABLED.key, "true")
     conf.set(CometConf.COMET_EXEC_ENABLED.key, "true")
     conf.set(CometConf.COMET_MEMORY_OVERHEAD.key, "1g")


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to