This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new 382ac938f docs: various improvements to tuning guide (#1525)
382ac938f is described below

commit 382ac938f2d10666eefc08ec5c1c82025ddf3726
Author: Andy Grove <[email protected]>
AuthorDate: Wed Mar 19 19:28:20 2025 -0600

    docs: various improvements to tuning guide (#1525)
---
 .../main/scala/org/apache/comet/CometConf.scala    |  56 +++-----
 docs/source/user-guide/configs.md                  |   6 +-
 docs/source/user-guide/tuning.md                   | 156 ++++++++++++++-------
 native/core/src/execution/jni_api.rs               |  15 +-
 .../comet/CometBoundedShuffleMemoryAllocator.java  |  10 +-
 .../scala/org/apache/comet/CometExecIterator.scala |  11 +-
 .../apache/comet/CometSparkSessionExtensions.scala |  11 +-
 spark/src/main/scala/org/apache/comet/Native.scala |  10 +-
 .../src/main/scala/org/apache/spark/Plugins.scala  |  17 +--
 9 files changed, 170 insertions(+), 122 deletions(-)

diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala 
b/common/src/main/scala/org/apache/comet/CometConf.scala
index 33039b744..f354f3d15 100644
--- a/common/src/main/scala/org/apache/comet/CometConf.scala
+++ b/common/src/main/scala/org/apache/comet/CometConf.scala
@@ -235,27 +235,28 @@ object CometConf extends ShimCometConf {
 
   val COMET_MEMORY_OVERHEAD: OptionalConfigEntry[Long] = 
conf("spark.comet.memoryOverhead")
     .doc(
-      "The amount of additional memory to be allocated per executor process 
for Comet, in MiB. " +
+      "The amount of additional memory to be allocated per executor process 
for Comet, in MiB, " +
+        "when running in on-heap mode or when using the `fair_unified` pool in 
off-heap mode. " +
         "This config is optional. If this is not specified, it will be set to 
" +
-        "`spark.comet.memory.overhead.factor` * `spark.executor.memory`. " +
-        "This is memory that accounts for things like Comet native execution, 
Comet shuffle, etc.")
+        s"`spark.comet.memory.overhead.factor` * `spark.executor.memory`. 
$TUNING_GUIDE.")
     .bytesConf(ByteUnit.MiB)
     .createOptional
 
-  val COMET_MEMORY_OVERHEAD_FACTOR: ConfigEntry[Double] = conf(
-    "spark.comet.memory.overhead.factor")
-    .doc(
-      "Fraction of executor memory to be allocated as additional non-heap 
memory per executor " +
-        "process for Comet.")
-    .doubleConf
-    .checkValue(
-      factor => factor > 0,
-      "Ensure that Comet memory overhead factor is a double greater than 0")
-    .createWithDefault(0.2)
+  val COMET_MEMORY_OVERHEAD_FACTOR: ConfigEntry[Double] =
+    conf("spark.comet.memory.overhead.factor")
+      .doc("Fraction of executor memory to be allocated as additional memory 
for Comet " +
+        "when running in on-heap mode or when using the `fair_unified` pool in 
off-heap mode. " +
+        s"$TUNING_GUIDE.")
+      .doubleConf
+      .checkValue(
+        factor => factor > 0,
+        "Ensure that Comet memory overhead factor is a double greater than 0")
+      .createWithDefault(0.2)
 
   val COMET_MEMORY_OVERHEAD_MIN_MIB: ConfigEntry[Long] = 
conf("spark.comet.memory.overhead.min")
     .doc("Minimum amount of additional memory to be allocated per executor 
process for Comet, " +
-      "in MiB.")
+      "in MiB, when running in on-heap mode or when using the `fair_unified` 
pool in off-heap " +
+      s"mode. $TUNING_GUIDE.")
     .bytesConf(ByteUnit.MiB)
     .checkValue(
       _ >= 0,
@@ -274,11 +275,10 @@ object CometConf extends ShimCometConf {
       .createWithDefault(true)
 
   val COMET_SHUFFLE_MODE: ConfigEntry[String] = 
conf(s"$COMET_EXEC_CONFIG_PREFIX.shuffle.mode")
-    .doc("The mode of Comet shuffle. This config is only effective if Comet 
shuffle " +
-      "is enabled. Available modes are 'native', 'jvm', and 'auto'. " +
-      "'native' is for native shuffle which has best performance in general. " 
+
-      "'jvm' is for jvm-based columnar shuffle which has higher coverage than 
native shuffle. " +
-      "'auto' is for Comet to choose the best shuffle mode based on the query 
plan.")
+    .doc(
+      "This is test config to allow tests to force a particular shuffle 
implementation to be " +
+        "used. Valid values are `jvm` for Columnar Shuffle, `native` for 
Native Shuffle, " +
+        s"and `auto` to pick the best supported option (`native` has 
priority). $TUNING_GUIDE.")
     .internal()
     .stringConf
     .transform(_.toLowerCase(Locale.ROOT))
@@ -378,26 +378,16 @@ object CometConf extends ShimCometConf {
   val COMET_COLUMNAR_SHUFFLE_MEMORY_SIZE: OptionalConfigEntry[Long] =
     conf("spark.comet.columnar.shuffle.memorySize")
       .internal()
-      .doc(
-        "Test-only config. This is only used to test Comet shuffle with Spark 
tests. " +
-          "The optional maximum size of the memory used for Comet columnar 
shuffle, in MiB. " +
-          "Note that this config is only used when 
`spark.comet.exec.shuffle.mode` is " +
-          "`jvm`. Once allocated memory size reaches this config, the current 
batch will be " +
-          "flushed to disk immediately. If this is not configured, Comet will 
use " +
-          "`spark.comet.shuffle.memory.factor` * `spark.comet.memoryOverhead` 
as " +
-          "shuffle memory size. If final calculated value is larger than Comet 
memory " +
-          "overhead, Comet will use Comet memory overhead as shuffle memory 
size.")
+      .doc("Amount of memory to reserve for columnar shuffle when running in 
on-heap mode. " +
+        s"$TUNING_GUIDE.")
       .bytesConf(ByteUnit.MiB)
       .createOptional
 
   val COMET_COLUMNAR_SHUFFLE_MEMORY_FACTOR: ConfigEntry[Double] =
     conf("spark.comet.columnar.shuffle.memory.factor")
       .internal()
-      .doc(
-        "Test-only config. This is only used to test Comet shuffle with Spark 
tests. " +
-          "Fraction of Comet memory to be allocated per executor process for 
Comet shuffle. " +
-          "Comet memory size is specified by `spark.comet.memoryOverhead` or " 
+
-          "calculated by `spark.comet.memory.overhead.factor` * 
`spark.executor.memory`.")
+      .doc("Fraction of Comet memory to be allocated per executor process for 
columnar shuffle " +
+        s"when running in on-heap mode. $TUNING_GUIDE.")
       .doubleConf
       .checkValue(
         factor => factor > 0,
diff --git a/docs/source/user-guide/configs.md 
b/docs/source/user-guide/configs.md
index a208d211e..32c6a3db8 100644
--- a/docs/source/user-guide/configs.md
+++ b/docs/source/user-guide/configs.md
@@ -71,9 +71,9 @@ Comet provides the following configuration settings.
 | spark.comet.explain.verbose.enabled | When this setting is enabled, Comet 
will provide a verbose tree representation of the extended information. | false 
|
 | spark.comet.explainFallback.enabled | When this setting is enabled, Comet 
will provide logging explaining the reason(s) why a query stage cannot be 
executed natively. Set this to false to reduce the amount of logging. | false |
 | spark.comet.expression.allowIncompatible | Comet is not currently fully 
compatible with Spark for all expressions. Set this config to true to allow 
them anyway. For more information, refer to the Comet Compatibility Guide 
(https://datafusion.apache.org/comet/user-guide/compatibility.html). | false |
-| spark.comet.memory.overhead.factor | Fraction of executor memory to be 
allocated as additional non-heap memory per executor process for Comet. | 0.2 |
-| spark.comet.memory.overhead.min | Minimum amount of additional memory to be 
allocated per executor process for Comet, in MiB. | 402653184b |
-| spark.comet.memoryOverhead | The amount of additional memory to be allocated 
per executor process for Comet, in MiB. This config is optional. If this is not 
specified, it will be set to `spark.comet.memory.overhead.factor` * 
`spark.executor.memory`. This is memory that accounts for things like Comet 
native execution, Comet shuffle, etc. | |
+| spark.comet.memory.overhead.factor | Fraction of executor memory to be 
allocated as additional memory for Comet when running in on-heap mode or when 
using the `fair_unified` pool in off-heap mode. For more information, refer to 
the Comet Tuning Guide 
(https://datafusion.apache.org/comet/user-guide/tuning.html). | 0.2 |
+| spark.comet.memory.overhead.min | Minimum amount of additional memory to be 
allocated per executor process for Comet, in MiB, when running in on-heap mode 
or when using the `fair_unified` pool in off-heap mode. For more information, 
refer to the Comet Tuning Guide 
(https://datafusion.apache.org/comet/user-guide/tuning.html). | 402653184b |
+| spark.comet.memoryOverhead | The amount of additional memory to be allocated 
per executor process for Comet, in MiB, when running in on-heap mode or when 
using the `fair_unified` pool in off-heap mode. This config is optional. If 
this is not specified, it will be set to `spark.comet.memory.overhead.factor` * 
`spark.executor.memory`. For more information, refer to the Comet Tuning Guide 
(https://datafusion.apache.org/comet/user-guide/tuning.html). | |
 | spark.comet.metrics.updateInterval | The interval in milliseconds to update 
metrics. If interval is negative, metrics will be updated upon task completion. 
| 3000 |
 | spark.comet.nativeLoadRequired | Whether to require Comet native library to 
load successfully when Comet is enabled. If not, Comet will silently fallback 
to Spark when it fails to load the native lib. Otherwise, an error will be 
thrown and the Spark job will be aborted. | false |
 | spark.comet.parquet.enable.directBuffer | Whether to use Java direct byte 
buffer when reading Parquet. | false |
diff --git a/docs/source/user-guide/tuning.md b/docs/source/user-guide/tuning.md
index a57bb3f80..1a17f4ccc 100644
--- a/docs/source/user-guide/tuning.md
+++ b/docs/source/user-guide/tuning.md
@@ -17,18 +17,96 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-# Tuning Guide
+# Comet Tuning Guide
 
 Comet provides some tuning options to help you get the best performance from 
your queries.
 
 ## Memory Tuning
 
-### Unified Memory Management with Off-Heap Memory
+It is necessary to specify how much memory Comet can use in addition to memory 
already allocated to Spark. In some
+cases, it may be possible to reduce the amount of memory allocated to Spark so 
that overall memory allocation is
+the same or lower than the original configuration. In other cases, enabling 
Comet may require allocating more memory
+than before. See the [Determining How Much Memory to Allocate] section for 
more details.
 
-The recommended way to share memory between Spark and Comet is to set 
`spark.memory.offHeap.enabled=true`. This allows
-Comet to share an off-heap memory pool with Spark. The size of the pool is 
specified by `spark.memory.offHeap.size`. For more details about Spark off-heap 
memory mode, please refer to Spark documentation: 
https://spark.apache.org/docs/latest/configuration.html.
+[Determining How Much Memory to Allocate]: 
#determining-how-much-memory-to-allocate
 
-The type of pool can be specified with `spark.comet.exec.memoryPool`.
+Comet supports Spark's on-heap (the default) and off-heap modes for allocating 
memory. However, we strongly recommend
+using off-heap mode. Comet has some limitations when running in on-heap mode, 
such as requiring more memory overall,
+and requiring shuffle memory to be separately configured.
+
+### Configuring Comet Memory in Off-Heap Mode
+
+The recommended way to allocate memory for Comet is to set 
`spark.memory.offHeap.enabled=true`. This allows
+Comet to share an off-heap memory pool with Spark, reducing the overall memory 
overhead. The size of the pool is
+specified by `spark.memory.offHeap.size`. For more details about Spark 
off-heap memory mode, please refer to
+Spark documentation: https://spark.apache.org/docs/latest/configuration.html.
+
+### Configuring Comet Memory in On-Heap Mode
+
+When running in on-heap mode, Comet memory can be allocated by setting 
`spark.comet.memoryOverhead`. If this setting
+is not provided, it will be calculated by multiplying the current Spark 
executor memory by
+`spark.comet.memory.overhead.factor` (default value is `0.2`) which may or may 
not result in enough memory for 
+Comet to operate. It is not recommended to rely on this behavior. It is better 
to specify `spark.comet.memoryOverhead`
+explicitly.
+
+Comet supports native shuffle and columnar shuffle (these terms are explained 
in the [shuffle] section below). 
+In on-heap mode, columnar shuffle memory must be separately allocated using 
`spark.comet.columnar.shuffle.memorySize`. 
+If this setting is not provided, it will be calculated by multiplying 
`spark.comet.memoryOverhead` by
+`spark.comet.columnar.shuffle.memory.factor` (default value is `1.0`). If a 
shuffle exceeds this amount of memory 
+then the query will fail.
+
+[shuffle]: #shuffle
+
+### Determining How Much Memory to Allocate
+
+Generally, increasing the amount of memory allocated to Comet will improve 
query performance by reducing the
+amount of time spent spilling to disk, especially for aggregate, join, and 
shuffle operations. Allocating insufficient
+memory can result in out-of-memory errors. This is no different from 
allocating memory in Spark and the amount of
+memory will vary for different workloads, so some experimentation will be 
required.
+
+Here is a real-world example, based on running benchmarks derived from TPC-H, 
running on a single executor against 
+local Parquet files using the 100 GB data set.
+
+Baseline Spark Performance 
+
+- Spark completes the benchmark in 632 seconds with 8 cores and 8 GB RAM
+- With less than 8 GB RAM, performance degrades due to spilling
+- Spark can complete the benchmark with as little as 3 GB of RAM, but with 
worse performance (744 seconds)
+
+Comet Performance
+
+- Comet requires at least 5 GB of RAM in off-heap mode and 6 GB RAM in on-heap 
mode, but performance at this level 
+  is around 340 seconds, which is significantly faster than Spark with any 
amount of RAM
+- Comet running in off-heap mode with 8 cores completes the benchmark in 295 
seconds, more than 2x faster than Spark
+- It is worth noting that running Comet with only 4 cores and 4 GB RAM 
completes the benchmark in 520 seconds, 
+  providing better performance than Spark with half the resources
+
+It may be possible to reduce Comet's memory overhead by reducing batch sizes 
or increasing the number of partitions.
+
+### SortExec
+
+Comet's SortExec implementation spills to disk when under memory pressure, but 
there are some known issues in the 
+underlying DataFusion SortExec implementation that could cause out-of-memory 
errors during spilling. See
+https://github.com/apache/datafusion/issues/14692 for more information.
+
+Workarounds for this problem include:
+
+- Allocating more off-heap memory
+- Disabling native sort by setting `spark.comet.exec.sort.enabled=false` 
+
+## Advanced Memory Tuning
+
+### Configuring spark.executor.memoryOverhead in On-Heap Mode
+
+In some environments, such as Kubernetes and YARN, it is important to 
correctly set `spark.executor.memoryOverhead` so
+that it is possible to allocate off-heap memory when running in on-heap mode.
+
+Comet will automatically set `spark.executor.memoryOverhead` based on the 
`spark.comet.memory*` settings so that
+resource managers respect Apache Spark memory configuration before starting 
the containers.
+
+### Configuring Off-Heap Memory Pools
+
+Comet provides multiple memory pool implementations. The type of pool can be 
specified with `spark.comet.exec.memoryPool`.
 
 The valid pool types are:
 
@@ -36,22 +114,24 @@ The valid pool types are:
 - `fair_unified`
 
 The `unified` pool type implements a greedy first-come first-serve limit. This 
pool works well for queries that do not
-need to spill or have a single spillable operator.
+need to spill or have a single spillable operator. The size of the pool is 
specified by `spark.memory.offHeap.size` 
+and the pool interacts with Spark's memory pool, effectively sharing the 
off-heap memory between Spark and Comet. This 
+approach is sometimes referred to as unified memory management.
 
 The `fair_unified` pool type prevents operators from using more than an even 
fraction of the available memory
 (i.e. `pool_size / num_reservations`). This pool works best when you know 
beforehand
 the query has multiple operators that will likely all need to spill. Sometimes 
it will cause spills even
 when there is sufficient memory in order to leave enough memory for other 
operators.
 
-### Dedicated Comet Memory Pools
+The pool size configuration for the `fair_unified` pool is a little more 
complex. The total pool size is computed by 
+multiplying `spark.memory.offHeap.size` by 
`spark.comet.memory.overhead.factor` with the minimum amount being 
+`spark.comet.memory.overhead.min`. It is also possible to manually specify 
`spark.comet.memoryOverhead` instead to 
+override this default behavior. Note that the `fair_unified` pool does not use 
unified memory management to interact 
+with Spark's memory pools, which is why the allocation defaults to a fraction 
of off-heap memory.
 
-Spark uses on-heap memory mode by default, i.e., the 
`spark.memory.offHeap.enabled` setting is not enabled. If Spark is under 
on-heap memory mode, Comet will use its own dedicated memory pools that
-are not shared with Spark. This requires additional configuration settings to 
be specified to set the size and type of
-memory pool to use.
+### Configuring On-Heap Memory Pools
 
-The size of the pool can be set explicitly with `spark.comet.memoryOverhead`. 
If this setting is not specified then
-the memory overhead will be calculated by multiplying the executor memory by 
`spark.comet.memory.overhead.factor`
-(defaults to `0.2`).
+When running in on-heap mode, Comet will use its own dedicated memory pools 
that are not shared with Spark.
 
 The type of pool can be specified with `spark.comet.exec.memoryPool`. The 
default setting is `greedy_task_shared`.
 
@@ -69,7 +149,7 @@ Pool types ending with `_global` use a single global memory 
pool between all tas
 
 Pool types ending with `_task_shared` share a single memory pool across all 
attempts for a single task.
 
-Other pool types create a dedicated pool per native query plan using a 
fraction of the available pool size based on number of cores 
+Other pool types create a dedicated pool per native query plan using a 
fraction of the available pool size based on number of cores
 and cores per task.
 
 The `greedy*` pool types use DataFusion's [GreedyMemoryPool], which implements 
a greedy first-come first-serve limit. This
@@ -91,28 +171,6 @@ adjusting how much memory to allocate.
 [FairSpillPool]: 
https://docs.rs/datafusion/latest/datafusion/execution/memory_pool/struct.FairSpillPool.html
 [UnboundedMemoryPool]: 
https://docs.rs/datafusion/latest/datafusion/execution/memory_pool/struct.UnboundedMemoryPool.html
 
-
-### Determining How Much Memory to Allocate
-
-Generally, increasing memory overhead will improve query performance, 
especially for queries containing joins and
-aggregates.
-
-Once a memory pool is exhausted, the native plan will start spilling to disk, 
which will slow down the query.
-
-Insufficient memory allocation can also lead to out-of-memory (OOM) errors.
-
-## Configuring spark.executor.memoryOverhead
-
-In some environments, such as Kubernetes and YARN, it is important to 
correctly set `spark.executor.memoryOverhead` so
-that it is possible to allocate off-heap memory.
-
-Comet will automatically set `spark.executor.memoryOverhead` based on the 
`spark.comet.memory*` settings so that
-resource managers respect Apache Spark memory configuration before starting 
the containers.
-
-Note that there is currently a known issue where this will be inaccurate when 
using Native Memory Management because it
-does not take executor concurrency into account. The tracking issue for this is
-https://github.com/apache/datafusion-comet/issues/949.
-
 ## Optimizing Joins
 
 Spark often chooses `SortMergeJoin` over `ShuffledHashJoin` for stability 
reasons. If the build-side of a
@@ -141,30 +199,22 @@ It must be set before the Spark context is created. You 
can enable or disable Co
 at runtime by setting `spark.comet.exec.shuffle.enabled` to `true` or `false`.
 Once it is disabled, Comet will fall back to the default Spark shuffle manager.
 
-### Shuffle Mode
+### Shuffle Implementations
 
-Comet provides three shuffle modes: Columnar Shuffle, Native Shuffle and Auto 
Mode.
+Comet provides two shuffle implementations: Native Shuffle and Columnar 
Shuffle. Comet will first try to use Native
+Shuffle and if that is not possible it will try to use Columnar Shuffle. If 
neither can be applied, it will fall
+back to Spark for shuffle operations.
 
-#### Auto Mode
+#### Native Shuffle
 
-`spark.comet.exec.shuffle.mode` to `auto` will let Comet choose the best 
shuffle mode based on the query plan. This
-is the default.
+Comet provides a fully native shuffle implementation, which generally provides 
the best performance. However,
+native shuffle currently only supports `HashPartitioning` and 
`SinglePartitioning` and has some restrictions on
+supported data types.
 
 #### Columnar (JVM) Shuffle
 
 Comet Columnar shuffle is JVM-based and supports `HashPartitioning`, 
`RoundRobinPartitioning`, `RangePartitioning`, and
-`SinglePartitioning`. This mode has the highest query coverage.
-
-Columnar shuffle can be enabled by setting `spark.comet.exec.shuffle.mode` to 
`jvm`. If this mode is explicitly set,
-then any shuffle operations that cannot be supported in this mode will fall 
back to Spark.
-
-#### Native Shuffle
-
-Comet also provides a fully native shuffle implementation, which generally 
provides the best performance. However,
-native shuffle currently only supports `HashPartitioning` and 
`SinglePartitioning`.
-
-To enable native shuffle, set `spark.comet.exec.shuffle.mode` to `native`. If 
this mode is explicitly set,
-then any shuffle operations that cannot be supported in this mode will fall 
back to Spark.
+`SinglePartitioning`. This shuffle implementation supports more data types 
than native shuffle.
 
 ### Shuffle Compression
 
diff --git a/native/core/src/execution/jni_api.rs 
b/native/core/src/execution/jni_api.rs
index aeb3e3efd..0aa1b113d 100644
--- a/native/core/src/execution/jni_api.rs
+++ b/native/core/src/execution/jni_api.rs
@@ -168,7 +168,7 @@ pub unsafe extern "system" fn 
Java_org_apache_comet_Native_createPlan(
     metrics_update_interval: jlong,
     comet_task_memory_manager_obj: JObject,
     batch_size: jint,
-    use_unified_memory_manager: jboolean,
+    off_heap_mode: jboolean,
     memory_pool_type: jstring,
     memory_limit: jlong,
     memory_limit_per_task: jlong,
@@ -213,7 +213,7 @@ pub unsafe extern "system" fn 
Java_org_apache_comet_Native_createPlan(
 
         let memory_pool_type = 
env.get_string(&JString::from_raw(memory_pool_type))?.into();
         let memory_pool_config = parse_memory_pool_config(
-            use_unified_memory_manager != JNI_FALSE,
+            off_heap_mode != JNI_FALSE,
             memory_pool_type,
             memory_limit,
             memory_limit_per_task,
@@ -293,16 +293,21 @@ fn prepare_datafusion_session_context(
 }
 
 fn parse_memory_pool_config(
-    use_unified_memory_manager: bool,
+    off_heap_mode: bool,
     memory_pool_type: String,
     memory_limit: i64,
     memory_limit_per_task: i64,
 ) -> CometResult<MemoryPoolConfig> {
     let pool_size = memory_limit as usize;
-    let memory_pool_config = if use_unified_memory_manager {
+    let memory_pool_config = if off_heap_mode {
         match memory_pool_type.as_str() {
             "fair_unified" => 
MemoryPoolConfig::new(MemoryPoolType::FairUnified, pool_size),
-            _ => MemoryPoolConfig::new(MemoryPoolType::Unified, 0),
+            _ => {
+                // the Unified memory pool interacts with Spark's memory pool 
to allocate
+                // memory therefore does not need a size to be explicitly set. 
The pool size
+                // shared with Spark is set by `spark.memory.offHeap.size`.
+                MemoryPoolConfig::new(MemoryPoolType::Unified, 0)
+            }
         }
     } else {
         // Use the memory pool from DF
diff --git 
a/spark/src/main/java/org/apache/spark/shuffle/comet/CometBoundedShuffleMemoryAllocator.java
 
b/spark/src/main/java/org/apache/spark/shuffle/comet/CometBoundedShuffleMemoryAllocator.java
index bd6782a5d..051b1c6fa 100644
--- 
a/spark/src/main/java/org/apache/spark/shuffle/comet/CometBoundedShuffleMemoryAllocator.java
+++ 
b/spark/src/main/java/org/apache/spark/shuffle/comet/CometBoundedShuffleMemoryAllocator.java
@@ -43,10 +43,12 @@ import org.apache.comet.CometSparkSessionExtensions$;
  * <p>Some methods are copied from 
`org.apache.spark.unsafe.memory.TaskMemoryManager` with
  * modifications. Most modifications are to remove the dependency on the 
configured memory mode.
  *
- * <p>This allocator is test-only and should not be used in production. It is 
used to test Comet JVM
- * shuffle and execution with Spark tests which basically require on-heap 
memory configuration.
- * Thus, this allocator is used to allocate separate off-heap memory 
allocation for Comet JVM
- * shuffle and execution apart from Spark's on-heap memory configuration.
+ * <p>This allocator is only used by Comet Columnar Shuffle when running in 
on-heap mode. It is used
+ * when users run in on-heap mode as well as in the Spark tests which require 
on-heap memory
+ * configuration.
+ *
+ * <p>Thus, this allocator is used to allocate separate off-heap memory 
allocation for Comet
+ * Columnar Shuffle and execution apart from Spark's on-heap memory 
configuration.
  */
 public final class CometBoundedShuffleMemoryAllocator extends 
CometShuffleMemoryAllocatorTrait {
   private final UnsafeMemoryAllocator allocator = new UnsafeMemoryAllocator();
diff --git a/spark/src/main/scala/org/apache/comet/CometExecIterator.scala 
b/spark/src/main/scala/org/apache/comet/CometExecIterator.scala
index f88a89d98..e4befefa6 100644
--- a/spark/src/main/scala/org/apache/comet/CometExecIterator.scala
+++ b/spark/src/main/scala/org/apache/comet/CometExecIterator.scala
@@ -75,12 +75,11 @@ class CometExecIterator(
       metricsUpdateInterval = COMET_METRICS_UPDATE_INTERVAL.get(),
       new CometTaskMemoryManager(id),
       batchSize = COMET_BATCH_SIZE.get(),
-      use_unified_memory_manager =
-        CometSparkSessionExtensions.cometUnifiedMemoryManagerEnabled(conf),
-      memory_pool_type = COMET_EXEC_MEMORY_POOL_TYPE.get(),
-      memory_limit = CometSparkSessionExtensions.getCometMemoryOverhead(conf),
-      memory_limit_per_task = getMemoryLimitPerTask(conf),
-      task_attempt_id = TaskContext.get().taskAttemptId,
+      offHeapMode = CometSparkSessionExtensions.isOffHeapEnabled(conf),
+      memoryPoolType = COMET_EXEC_MEMORY_POOL_TYPE.get(),
+      memoryLimit = CometSparkSessionExtensions.getCometMemoryOverhead(conf),
+      memoryLimitPerTask = getMemoryLimitPerTask(conf),
+      taskAttemptId = TaskContext.get().taskAttemptId,
       debug = COMET_DEBUG_ENABLED.get(),
       explain = COMET_EXPLAIN_NATIVE_ENABLED.get(),
       workerThreads = COMET_WORKER_THREADS.get(),
diff --git 
a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala 
b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
index 105144e55..a669b36eb 100644
--- a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
+++ b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
@@ -1336,7 +1336,7 @@ object CometSparkSessionExtensions extends Logging {
 
   /** Calculates required memory overhead in MB per executor process for 
Comet. */
   def getCometMemoryOverheadInMiB(sparkConf: SparkConf): Long = {
-    val baseMemoryMiB = if (cometUnifiedMemoryManagerEnabled(sparkConf)) {
+    val baseMemoryMiB = if (isOffHeapEnabled(sparkConf)) {
       ConfigHelpers
         .byteFromString(sparkConf.get("spark.memory.offHeap.size"), 
ByteUnit.MiB)
     } else {
@@ -1367,8 +1367,13 @@ object CometSparkSessionExtensions extends Logging {
     ByteUnit.MiB.toBytes(getCometMemoryOverheadInMiB(sparkConf))
   }
 
-  /** Calculates required shuffle memory size in bytes per executor process 
for Comet. */
+  /**
+   * Calculates required shuffle memory size in bytes per executor process for 
Comet when running
+   * in on-heap mode.
+   */
   def getCometShuffleMemorySize(sparkConf: SparkConf, conf: SQLConf = 
SQLConf.get): Long = {
+    assert(!isOffHeapEnabled(sparkConf))
+
     val cometMemoryOverhead = getCometMemoryOverheadInMiB(sparkConf)
 
     val overheadFactor = COMET_COLUMNAR_SHUFFLE_MEMORY_FACTOR.get(conf)
@@ -1391,7 +1396,7 @@ object CometSparkSessionExtensions extends Logging {
     ByteUnit.BYTE.toMiB(getCometShuffleMemorySize(sparkConf, conf))
   }
 
-  def cometUnifiedMemoryManagerEnabled(sparkConf: SparkConf): Boolean = {
+  def isOffHeapEnabled(sparkConf: SparkConf): Boolean = {
     sparkConf.getBoolean("spark.memory.offHeap.enabled", false)
   }
 
diff --git a/spark/src/main/scala/org/apache/comet/Native.scala 
b/spark/src/main/scala/org/apache/comet/Native.scala
index 2be0bf006..e466b2f4d 100644
--- a/spark/src/main/scala/org/apache/comet/Native.scala
+++ b/spark/src/main/scala/org/apache/comet/Native.scala
@@ -59,11 +59,11 @@ class Native extends NativeBase {
       metricsUpdateInterval: Long,
       taskMemoryManager: CometTaskMemoryManager,
       batchSize: Int,
-      use_unified_memory_manager: Boolean,
-      memory_pool_type: String,
-      memory_limit: Long,
-      memory_limit_per_task: Long,
-      task_attempt_id: Long,
+      offHeapMode: Boolean,
+      memoryPoolType: String,
+      memoryLimit: Long,
+      memoryLimitPerTask: Long,
+      taskAttemptId: Long,
       debug: Boolean,
       explain: Boolean,
       workerThreads: Int,
diff --git a/spark/src/main/scala/org/apache/spark/Plugins.scala 
b/spark/src/main/scala/org/apache/spark/Plugins.scala
index 835fa515b..64426c716 100644
--- a/spark/src/main/scala/org/apache/spark/Plugins.scala
+++ b/spark/src/main/scala/org/apache/spark/Plugins.scala
@@ -63,13 +63,10 @@ class CometDriverPlugin extends DriverPlugin with Logging 
with ShimCometDriverPl
         Math.max((executorMemory * memoryOverheadFactor).toLong, 
memoryOverheadMinMib)
       }
 
-      val cometMemOverhead =
-        if 
(!CometSparkSessionExtensions.cometUnifiedMemoryManagerEnabled(sc.getConf)) {
-          CometSparkSessionExtensions.getCometMemoryOverheadInMiB(sc.getConf)
-        } else {
-          // comet shuffle unified memory manager is disabled, so we need to 
add overhead memory
-          
CometSparkSessionExtensions.getCometShuffleMemorySizeInMiB(sc.getConf)
-        }
+      // we should never reach this code in off-heap mode due to earlier check
+      // in `shouldOverrideMemoryConf`
+      assert(!CometSparkSessionExtensions.isOffHeapEnabled(sc.getConf))
+      val cometMemOverhead = 
CometSparkSessionExtensions.getCometMemoryOverheadInMiB(sc.getConf)
       sc.conf.set(EXECUTOR_MEMORY_OVERHEAD.key, s"${execMemOverhead + 
cometMemOverhead}M")
       val newExecMemOverhead = 
sc.getConf.getSizeAsMb(EXECUTOR_MEMORY_OVERHEAD.key)
 
@@ -98,7 +95,7 @@ class CometDriverPlugin extends DriverPlugin with Logging 
with ShimCometDriverPl
   /**
    * Whether we should override Spark memory configuration for Comet. This 
only returns true when
    * Comet native execution is enabled and/or Comet shuffle is enabled and 
Comet doesn't use
-   * unified memory manager.
+   * off-heap mode (unified memory manager).
    */
   private def shouldOverrideMemoryConf(conf: SparkConf): Boolean = {
     val cometEnabled =
@@ -109,9 +106,9 @@ class CometDriverPlugin extends DriverPlugin with Logging 
with ShimCometDriverPl
     val cometExec = conf.getBoolean(
       CometConf.COMET_EXEC_ENABLED.key,
       CometConf.COMET_EXEC_ENABLED.defaultValue.get)
-    val unifiedMemory = 
CometSparkSessionExtensions.cometUnifiedMemoryManagerEnabled(conf)
+    val offHeapMode = CometSparkSessionExtensions.isOffHeapEnabled(conf)
 
-    cometEnabled && (cometExecShuffle || cometExec) && !unifiedMemory
+    cometEnabled && (cometExecShuffle || cometExec) && !offHeapMode
   }
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to