This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new 382ac938f docs: various improvements to tuning guide (#1525)
382ac938f is described below
commit 382ac938f2d10666eefc08ec5c1c82025ddf3726
Author: Andy Grove <[email protected]>
AuthorDate: Wed Mar 19 19:28:20 2025 -0600
docs: various improvements to tuning guide (#1525)
---
.../main/scala/org/apache/comet/CometConf.scala | 56 +++-----
docs/source/user-guide/configs.md | 6 +-
docs/source/user-guide/tuning.md | 156 ++++++++++++++-------
native/core/src/execution/jni_api.rs | 15 +-
.../comet/CometBoundedShuffleMemoryAllocator.java | 10 +-
.../scala/org/apache/comet/CometExecIterator.scala | 11 +-
.../apache/comet/CometSparkSessionExtensions.scala | 11 +-
spark/src/main/scala/org/apache/comet/Native.scala | 10 +-
.../src/main/scala/org/apache/spark/Plugins.scala | 17 +--
9 files changed, 170 insertions(+), 122 deletions(-)
diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala
b/common/src/main/scala/org/apache/comet/CometConf.scala
index 33039b744..f354f3d15 100644
--- a/common/src/main/scala/org/apache/comet/CometConf.scala
+++ b/common/src/main/scala/org/apache/comet/CometConf.scala
@@ -235,27 +235,28 @@ object CometConf extends ShimCometConf {
val COMET_MEMORY_OVERHEAD: OptionalConfigEntry[Long] =
conf("spark.comet.memoryOverhead")
.doc(
- "The amount of additional memory to be allocated per executor process
for Comet, in MiB. " +
+ "The amount of additional memory to be allocated per executor process
for Comet, in MiB, " +
+ "when running in on-heap mode or when using the `fair_unified` pool in
off-heap mode. " +
"This config is optional. If this is not specified, it will be set to
" +
- "`spark.comet.memory.overhead.factor` * `spark.executor.memory`. " +
- "This is memory that accounts for things like Comet native execution,
Comet shuffle, etc.")
+ s"`spark.comet.memory.overhead.factor` * `spark.executor.memory`.
$TUNING_GUIDE.")
.bytesConf(ByteUnit.MiB)
.createOptional
- val COMET_MEMORY_OVERHEAD_FACTOR: ConfigEntry[Double] = conf(
- "spark.comet.memory.overhead.factor")
- .doc(
- "Fraction of executor memory to be allocated as additional non-heap
memory per executor " +
- "process for Comet.")
- .doubleConf
- .checkValue(
- factor => factor > 0,
- "Ensure that Comet memory overhead factor is a double greater than 0")
- .createWithDefault(0.2)
+ val COMET_MEMORY_OVERHEAD_FACTOR: ConfigEntry[Double] =
+ conf("spark.comet.memory.overhead.factor")
+ .doc("Fraction of executor memory to be allocated as additional memory
for Comet " +
+ "when running in on-heap mode or when using the `fair_unified` pool in
off-heap mode. " +
+ s"$TUNING_GUIDE.")
+ .doubleConf
+ .checkValue(
+ factor => factor > 0,
+ "Ensure that Comet memory overhead factor is a double greater than 0")
+ .createWithDefault(0.2)
val COMET_MEMORY_OVERHEAD_MIN_MIB: ConfigEntry[Long] =
conf("spark.comet.memory.overhead.min")
.doc("Minimum amount of additional memory to be allocated per executor
process for Comet, " +
- "in MiB.")
+ "in MiB, when running in on-heap mode or when using the `fair_unified`
pool in off-heap " +
+ s"mode. $TUNING_GUIDE.")
.bytesConf(ByteUnit.MiB)
.checkValue(
_ >= 0,
@@ -274,11 +275,10 @@ object CometConf extends ShimCometConf {
.createWithDefault(true)
val COMET_SHUFFLE_MODE: ConfigEntry[String] =
conf(s"$COMET_EXEC_CONFIG_PREFIX.shuffle.mode")
- .doc("The mode of Comet shuffle. This config is only effective if Comet
shuffle " +
- "is enabled. Available modes are 'native', 'jvm', and 'auto'. " +
- "'native' is for native shuffle which has best performance in general. "
+
- "'jvm' is for jvm-based columnar shuffle which has higher coverage than
native shuffle. " +
- "'auto' is for Comet to choose the best shuffle mode based on the query
plan.")
+ .doc(
"This is a test config to allow tests to force a particular shuffle
implementation to be " +
+ "used. Valid values are `jvm` for Columnar Shuffle, `native` for
Native Shuffle, " +
+ s"and `auto` to pick the best supported option (`native` has
priority). $TUNING_GUIDE.")
.internal()
.stringConf
.transform(_.toLowerCase(Locale.ROOT))
@@ -378,26 +378,16 @@ object CometConf extends ShimCometConf {
val COMET_COLUMNAR_SHUFFLE_MEMORY_SIZE: OptionalConfigEntry[Long] =
conf("spark.comet.columnar.shuffle.memorySize")
.internal()
- .doc(
- "Test-only config. This is only used to test Comet shuffle with Spark
tests. " +
- "The optional maximum size of the memory used for Comet columnar
shuffle, in MiB. " +
- "Note that this config is only used when
`spark.comet.exec.shuffle.mode` is " +
- "`jvm`. Once allocated memory size reaches this config, the current
batch will be " +
- "flushed to disk immediately. If this is not configured, Comet will
use " +
- "`spark.comet.shuffle.memory.factor` * `spark.comet.memoryOverhead`
as " +
- "shuffle memory size. If final calculated value is larger than Comet
memory " +
- "overhead, Comet will use Comet memory overhead as shuffle memory
size.")
+ .doc("Amount of memory to reserve for columnar shuffle when running in
on-heap mode. " +
+ s"$TUNING_GUIDE.")
.bytesConf(ByteUnit.MiB)
.createOptional
val COMET_COLUMNAR_SHUFFLE_MEMORY_FACTOR: ConfigEntry[Double] =
conf("spark.comet.columnar.shuffle.memory.factor")
.internal()
- .doc(
- "Test-only config. This is only used to test Comet shuffle with Spark
tests. " +
- "Fraction of Comet memory to be allocated per executor process for
Comet shuffle. " +
- "Comet memory size is specified by `spark.comet.memoryOverhead` or "
+
- "calculated by `spark.comet.memory.overhead.factor` *
`spark.executor.memory`.")
+ .doc("Fraction of Comet memory to be allocated per executor process for
columnar shuffle " +
+ s"when running in on-heap mode. $TUNING_GUIDE.")
.doubleConf
.checkValue(
factor => factor > 0,
diff --git a/docs/source/user-guide/configs.md
b/docs/source/user-guide/configs.md
index a208d211e..32c6a3db8 100644
--- a/docs/source/user-guide/configs.md
+++ b/docs/source/user-guide/configs.md
@@ -71,9 +71,9 @@ Comet provides the following configuration settings.
| spark.comet.explain.verbose.enabled | When this setting is enabled, Comet
will provide a verbose tree representation of the extended information. | false
|
| spark.comet.explainFallback.enabled | When this setting is enabled, Comet
will provide logging explaining the reason(s) why a query stage cannot be
executed natively. Set this to false to reduce the amount of logging. | false |
| spark.comet.expression.allowIncompatible | Comet is not currently fully
compatible with Spark for all expressions. Set this config to true to allow
them anyway. For more information, refer to the Comet Compatibility Guide
(https://datafusion.apache.org/comet/user-guide/compatibility.html). | false |
-| spark.comet.memory.overhead.factor | Fraction of executor memory to be
allocated as additional non-heap memory per executor process for Comet. | 0.2 |
-| spark.comet.memory.overhead.min | Minimum amount of additional memory to be
allocated per executor process for Comet, in MiB. | 402653184b |
-| spark.comet.memoryOverhead | The amount of additional memory to be allocated
per executor process for Comet, in MiB. This config is optional. If this is not
specified, it will be set to `spark.comet.memory.overhead.factor` *
`spark.executor.memory`. This is memory that accounts for things like Comet
native execution, Comet shuffle, etc. | |
+| spark.comet.memory.overhead.factor | Fraction of executor memory to be
allocated as additional memory for Comet when running in on-heap mode or when
using the `fair_unified` pool in off-heap mode. For more information, refer to
the Comet Tuning Guide
(https://datafusion.apache.org/comet/user-guide/tuning.html). | 0.2 |
+| spark.comet.memory.overhead.min | Minimum amount of additional memory to be
allocated per executor process for Comet, in MiB, when running in on-heap mode
or when using the `fair_unified` pool in off-heap mode. For more information,
refer to the Comet Tuning Guide
(https://datafusion.apache.org/comet/user-guide/tuning.html). | 402653184b |
+| spark.comet.memoryOverhead | The amount of additional memory to be allocated
per executor process for Comet, in MiB, when running in on-heap mode or when
using the `fair_unified` pool in off-heap mode. This config is optional. If
this is not specified, it will be set to `spark.comet.memory.overhead.factor` *
`spark.executor.memory`. For more information, refer to the Comet Tuning Guide
(https://datafusion.apache.org/comet/user-guide/tuning.html). | |
| spark.comet.metrics.updateInterval | The interval in milliseconds to update
metrics. If interval is negative, metrics will be updated upon task completion.
| 3000 |
| spark.comet.nativeLoadRequired | Whether to require Comet native library to
load successfully when Comet is enabled. If not, Comet will silently fallback
to Spark when it fails to load the native lib. Otherwise, an error will be
thrown and the Spark job will be aborted. | false |
| spark.comet.parquet.enable.directBuffer | Whether to use Java direct byte
buffer when reading Parquet. | false |
diff --git a/docs/source/user-guide/tuning.md b/docs/source/user-guide/tuning.md
index a57bb3f80..1a17f4ccc 100644
--- a/docs/source/user-guide/tuning.md
+++ b/docs/source/user-guide/tuning.md
@@ -17,18 +17,96 @@ specific language governing permissions and limitations
under the License.
-->
-# Tuning Guide
+# Comet Tuning Guide
Comet provides some tuning options to help you get the best performance from
your queries.
## Memory Tuning
-### Unified Memory Management with Off-Heap Memory
+It is necessary to specify how much memory Comet can use in addition to memory
already allocated to Spark. In some
+cases, it may be possible to reduce the amount of memory allocated to Spark so
that overall memory allocation is
+the same or lower than the original configuration. In other cases, enabling
Comet may require allocating more memory
+than before. See the [Determining How Much Memory to Allocate] section for
more details.
-The recommended way to share memory between Spark and Comet is to set
`spark.memory.offHeap.enabled=true`. This allows
-Comet to share an off-heap memory pool with Spark. The size of the pool is
specified by `spark.memory.offHeap.size`. For more details about Spark off-heap
memory mode, please refer to Spark documentation:
https://spark.apache.org/docs/latest/configuration.html.
+[Determining How Much Memory to Allocate]:
#determining-how-much-memory-to-allocate
-The type of pool can be specified with `spark.comet.exec.memoryPool`.
+Comet supports Spark's on-heap (the default) and off-heap mode for allocating
memory. However, we strongly recommend
+using off-heap mode. Comet has some limitations when running in on-heap mode,
such as requiring more memory overall,
+and requiring shuffle memory to be separately configured.
+
+### Configuring Comet Memory in Off-Heap Mode
+
+The recommended way to allocate memory for Comet is to set
`spark.memory.offHeap.enabled=true`. This allows
+Comet to share an off-heap memory pool with Spark, reducing the overall memory
overhead. The size of the pool is
+specified by `spark.memory.offHeap.size`. For more details about Spark
off-heap memory mode, please refer to
+Spark documentation: https://spark.apache.org/docs/latest/configuration.html.
+
+### Configuring Comet Memory in On-Heap Mode
+
+When running in on-heap mode, Comet memory can be allocated by setting
`spark.comet.memoryOverhead`. If this setting
+is not provided, it will be calculated by multiplying the current Spark
executor memory by
+`spark.comet.memory.overhead.factor` (default value is `0.2`) which may or may
not result in enough memory for
+Comet to operate. It is not recommended to rely on this behavior. It is better
to specify `spark.comet.memoryOverhead`
+explicitly.
+
+Comet supports native shuffle and columnar shuffle (these terms are explained
in the [shuffle] section below).
+In on-heap mode, columnar shuffle memory must be separately allocated using
`spark.comet.columnar.shuffle.memorySize`.
+If this setting is not provided, it will be calculated by multiplying
`spark.comet.memoryOverhead` by
+`spark.comet.columnar.shuffle.memory.factor` (default value is `1.0`). If a
shuffle exceeds this amount of memory
+then the query will fail.
+
+[shuffle]: #shuffle
+
+### Determining How Much Memory to Allocate
+
+Generally, increasing the amount of memory allocated to Comet will improve
query performance by reducing the
+amount of time spent spilling to disk, especially for aggregate, join, and
shuffle operations. Allocating insufficient
+memory can result in out-of-memory errors. This is no different from
allocating memory in Spark and the amount of
+memory will vary for different workloads, so some experimentation will be
required.
+
+Here is a real-world example, based on running benchmarks derived from TPC-H,
running on a single executor against
+local Parquet files using the 100 GB data set.
+
+Baseline Spark Performance
+
+- Spark completes the benchmark in 632 seconds with 8 cores and 8 GB RAM
+- With less than 8 GB RAM, performance degrades due to spilling
+- Spark can complete the benchmark with as little as 3 GB of RAM, but with
worse performance (744 seconds)
+
+Comet Performance
+
+- Comet requires at least 5 GB of RAM in off-heap mode and 6 GB RAM in on-heap
mode, but performance at this level
+ is around 340 seconds, which is significantly faster than Spark with any
amount of RAM
+- Comet running in off-heap with 8 cores completes the benchmark in 295
seconds, more than 2x faster than Spark
+- It is worth noting that running Comet with only 4 cores and 4 GB RAM
completes the benchmark in 520 seconds,
+ providing better performance than Spark for half the resources
+
+It may be possible to reduce Comet's memory overhead by reducing batch sizes
or increasing the number of partitions.
+
+### SortExec
+
+Comet's SortExec implementation spills to disk when under memory pressure, but
there are some known issues in the
+underlying DataFusion SortExec implementation that could cause out-of-memory
errors during spilling. See
+https://github.com/apache/datafusion/issues/14692 for more information.
+
+Workarounds for this problem include:
+
+- Allocating more off-heap memory
+- Disabling native sort by setting `spark.comet.exec.sort.enabled=false`
+
+## Advanced Memory Tuning
+
+### Configuring spark.executor.memoryOverhead in On-Heap Mode
+
+In some environments, such as Kubernetes and YARN, it is important to
correctly set `spark.executor.memoryOverhead` so
+that it is possible to allocate off-heap memory when running in on-heap mode.
+
+Comet will automatically set `spark.executor.memoryOverhead` based on the
`spark.comet.memory*` settings so that
+resource managers respect Apache Spark memory configuration before starting
the containers.
+
+### Configuring Off-Heap Memory Pools
+
+Comet implements multiple memory pool implementations. The type of pool can be
specified with `spark.comet.exec.memoryPool`.
The valid pool types are:
@@ -36,22 +114,24 @@ The valid pool types are:
- `fair_unified`
The `unified` pool type implements a greedy first-come first-serve limit. This
pool works well for queries that do not
-need to spill or have a single spillable operator.
+need to spill or have a single spillable operator. The size of the pool is
specified by `spark.memory.offHeap.size`
+and the pool interacts with Spark's memory pool, effectively sharing the
off-heap memory between Spark and Comet. This
+approach is sometimes referred to as unified memory management.
The `fair_unified` pool type prevents operators from using more than an even
fraction of the available memory
(i.e. `pool_size / num_reservations`). This pool works best when you know
beforehand
the query has multiple operators that will likely all need to spill. Sometimes
it will cause spills even
when there is sufficient memory in order to leave enough memory for other
operators.
-### Dedicated Comet Memory Pools
+The pool size configuration for the `fair_unified` pool is a little more
complex. The total pool size is computed by
+multiplying `spark.memory.offHeap.size` by
`spark.comet.memory.overhead.factor` with the minimum amount being
+`spark.comet.memory.overhead.min`. It is also possible to manually specify
`spark.comet.memoryOverhead` instead to
+override this default behavior. Note that the `fair_unified` pool does not use
unified memory management to interact
+with Spark's memory pools, which is why the allocation defaults to a fraction
of off-heap memory.
-Spark uses on-heap memory mode by default, i.e., the
`spark.memory.offHeap.enabled` setting is not enabled. If Spark is under
on-heap memory mode, Comet will use its own dedicated memory pools that
-are not shared with Spark. This requires additional configuration settings to
be specified to set the size and type of
-memory pool to use.
+### Configuring On-Heap Memory Pools
-The size of the pool can be set explicitly with `spark.comet.memoryOverhead`.
If this setting is not specified then
-the memory overhead will be calculated by multiplying the executor memory by
`spark.comet.memory.overhead.factor`
-(defaults to `0.2`).
+When running in on-heap mode, Comet will use its own dedicated memory pools
that are not shared with Spark.
The type of pool can be specified with `spark.comet.exec.memoryPool`. The
default setting is `greedy_task_shared`.
@@ -69,7 +149,7 @@ Pool types ending with `_global` use a single global memory
pool between all tas
Pool types ending with `_task_shared` share a single memory pool across all
attempts for a single task.
-Other pool types create a dedicated pool per native query plan using a
fraction of the available pool size based on number of cores
+Other pool types create a dedicated pool per native query plan using a
fraction of the available pool size based on number of cores
and cores per task.
The `greedy*` pool types use DataFusion's [GreedyMemoryPool], which implements
a greedy first-come first-serve limit. This
@@ -91,28 +171,6 @@ adjusting how much memory to allocate.
[FairSpillPool]:
https://docs.rs/datafusion/latest/datafusion/execution/memory_pool/struct.FairSpillPool.html
[UnboundedMemoryPool]:
https://docs.rs/datafusion/latest/datafusion/execution/memory_pool/struct.UnboundedMemoryPool.html
-
-### Determining How Much Memory to Allocate
-
-Generally, increasing memory overhead will improve query performance,
especially for queries containing joins and
-aggregates.
-
-Once a memory pool is exhausted, the native plan will start spilling to disk,
which will slow down the query.
-
-Insufficient memory allocation can also lead to out-of-memory (OOM) errors.
-
-## Configuring spark.executor.memoryOverhead
-
-In some environments, such as Kubernetes and YARN, it is important to
correctly set `spark.executor.memoryOverhead` so
-that it is possible to allocate off-heap memory.
-
-Comet will automatically set `spark.executor.memoryOverhead` based on the
`spark.comet.memory*` settings so that
-resource managers respect Apache Spark memory configuration before starting
the containers.
-
-Note that there is currently a known issue where this will be inaccurate when
using Native Memory Management because it
-does not take executor concurrency into account. The tracking issue for this is
-https://github.com/apache/datafusion-comet/issues/949.
-
## Optimizing Joins
Spark often chooses `SortMergeJoin` over `ShuffledHashJoin` for stability
reasons. If the build-side of a
@@ -141,30 +199,22 @@ It must be set before the Spark context is created. You
can enable or disable Co
at runtime by setting `spark.comet.exec.shuffle.enabled` to `true` or `false`.
Once it is disabled, Comet will fall back to the default Spark shuffle manager.
-### Shuffle Mode
+### Shuffle Implementations
-Comet provides three shuffle modes: Columnar Shuffle, Native Shuffle and Auto
Mode.
+Comet provides two shuffle implementations: Native Shuffle and Columnar
Shuffle. Comet will first try to use Native
+Shuffle and if that is not possible it will try to use Columnar Shuffle. If
neither can be applied, it will fall
+back to Spark for shuffle operations.
-#### Auto Mode
+#### Native Shuffle
-`spark.comet.exec.shuffle.mode` to `auto` will let Comet choose the best
shuffle mode based on the query plan. This
-is the default.
+Comet provides a fully native shuffle implementation, which generally provides
the best performance. However,
+native shuffle currently only supports `HashPartitioning` and
`SinglePartitioning` and has some restrictions on
+supported data types.
#### Columnar (JVM) Shuffle
Comet Columnar shuffle is JVM-based and supports `HashPartitioning`,
`RoundRobinPartitioning`, `RangePartitioning`, and
-`SinglePartitioning`. This mode has the highest query coverage.
-
-Columnar shuffle can be enabled by setting `spark.comet.exec.shuffle.mode` to
`jvm`. If this mode is explicitly set,
-then any shuffle operations that cannot be supported in this mode will fall
back to Spark.
-
-#### Native Shuffle
-
-Comet also provides a fully native shuffle implementation, which generally
provides the best performance. However,
-native shuffle currently only supports `HashPartitioning` and
`SinglePartitioning`.
-
-To enable native shuffle, set `spark.comet.exec.shuffle.mode` to `native`. If
this mode is explicitly set,
-then any shuffle operations that cannot be supported in this mode will fall
back to Spark.
+`SinglePartitioning`. This shuffle implementation supports more data types
than native shuffle.
### Shuffle Compression
diff --git a/native/core/src/execution/jni_api.rs
b/native/core/src/execution/jni_api.rs
index aeb3e3efd..0aa1b113d 100644
--- a/native/core/src/execution/jni_api.rs
+++ b/native/core/src/execution/jni_api.rs
@@ -168,7 +168,7 @@ pub unsafe extern "system" fn
Java_org_apache_comet_Native_createPlan(
metrics_update_interval: jlong,
comet_task_memory_manager_obj: JObject,
batch_size: jint,
- use_unified_memory_manager: jboolean,
+ off_heap_mode: jboolean,
memory_pool_type: jstring,
memory_limit: jlong,
memory_limit_per_task: jlong,
@@ -213,7 +213,7 @@ pub unsafe extern "system" fn
Java_org_apache_comet_Native_createPlan(
let memory_pool_type =
env.get_string(&JString::from_raw(memory_pool_type))?.into();
let memory_pool_config = parse_memory_pool_config(
- use_unified_memory_manager != JNI_FALSE,
+ off_heap_mode != JNI_FALSE,
memory_pool_type,
memory_limit,
memory_limit_per_task,
@@ -293,16 +293,21 @@ fn prepare_datafusion_session_context(
}
fn parse_memory_pool_config(
- use_unified_memory_manager: bool,
+ off_heap_mode: bool,
memory_pool_type: String,
memory_limit: i64,
memory_limit_per_task: i64,
) -> CometResult<MemoryPoolConfig> {
let pool_size = memory_limit as usize;
- let memory_pool_config = if use_unified_memory_manager {
+ let memory_pool_config = if off_heap_mode {
match memory_pool_type.as_str() {
"fair_unified" =>
MemoryPoolConfig::new(MemoryPoolType::FairUnified, pool_size),
- _ => MemoryPoolConfig::new(MemoryPoolType::Unified, 0),
+ _ => {
+ // the Unified memory pool interacts with Spark's memory pool
to allocate
+ // memory therefore does not need a size to be explicitly set.
The pool size
+ // shared with Spark is set by `spark.memory.offHeap.size`.
+ MemoryPoolConfig::new(MemoryPoolType::Unified, 0)
+ }
}
} else {
// Use the memory pool from DF
diff --git
a/spark/src/main/java/org/apache/spark/shuffle/comet/CometBoundedShuffleMemoryAllocator.java
b/spark/src/main/java/org/apache/spark/shuffle/comet/CometBoundedShuffleMemoryAllocator.java
index bd6782a5d..051b1c6fa 100644
---
a/spark/src/main/java/org/apache/spark/shuffle/comet/CometBoundedShuffleMemoryAllocator.java
+++
b/spark/src/main/java/org/apache/spark/shuffle/comet/CometBoundedShuffleMemoryAllocator.java
@@ -43,10 +43,12 @@ import org.apache.comet.CometSparkSessionExtensions$;
* <p>Some methods are copied from
`org.apache.spark.unsafe.memory.TaskMemoryManager` with
* modifications. Most modifications are to remove the dependency on the
configured memory mode.
*
- * <p>This allocator is test-only and should not be used in production. It is
used to test Comet JVM
- * shuffle and execution with Spark tests which basically require on-heap
memory configuration.
- * Thus, this allocator is used to allocate separate off-heap memory
allocation for Comet JVM
- * shuffle and execution apart from Spark's on-heap memory configuration.
+ * <p>This allocator is only used by Comet Columnar Shuffle when running in
on-heap mode. It is used
+ * when users run in on-heap mode as well as in the Spark tests which require
on-heap memory
+ * configuration.
+ *
+ * <p>Thus, this allocator is used to allocate separate off-heap memory
allocation for Comet
+ * Columnar Shuffle and execution apart from Spark's on-heap memory
configuration.
*/
public final class CometBoundedShuffleMemoryAllocator extends
CometShuffleMemoryAllocatorTrait {
private final UnsafeMemoryAllocator allocator = new UnsafeMemoryAllocator();
diff --git a/spark/src/main/scala/org/apache/comet/CometExecIterator.scala
b/spark/src/main/scala/org/apache/comet/CometExecIterator.scala
index f88a89d98..e4befefa6 100644
--- a/spark/src/main/scala/org/apache/comet/CometExecIterator.scala
+++ b/spark/src/main/scala/org/apache/comet/CometExecIterator.scala
@@ -75,12 +75,11 @@ class CometExecIterator(
metricsUpdateInterval = COMET_METRICS_UPDATE_INTERVAL.get(),
new CometTaskMemoryManager(id),
batchSize = COMET_BATCH_SIZE.get(),
- use_unified_memory_manager =
- CometSparkSessionExtensions.cometUnifiedMemoryManagerEnabled(conf),
- memory_pool_type = COMET_EXEC_MEMORY_POOL_TYPE.get(),
- memory_limit = CometSparkSessionExtensions.getCometMemoryOverhead(conf),
- memory_limit_per_task = getMemoryLimitPerTask(conf),
- task_attempt_id = TaskContext.get().taskAttemptId,
+ offHeapMode = CometSparkSessionExtensions.isOffHeapEnabled(conf),
+ memoryPoolType = COMET_EXEC_MEMORY_POOL_TYPE.get(),
+ memoryLimit = CometSparkSessionExtensions.getCometMemoryOverhead(conf),
+ memoryLimitPerTask = getMemoryLimitPerTask(conf),
+ taskAttemptId = TaskContext.get().taskAttemptId,
debug = COMET_DEBUG_ENABLED.get(),
explain = COMET_EXPLAIN_NATIVE_ENABLED.get(),
workerThreads = COMET_WORKER_THREADS.get(),
diff --git
a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
index 105144e55..a669b36eb 100644
--- a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
+++ b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
@@ -1336,7 +1336,7 @@ object CometSparkSessionExtensions extends Logging {
/** Calculates required memory overhead in MB per executor process for
Comet. */
def getCometMemoryOverheadInMiB(sparkConf: SparkConf): Long = {
- val baseMemoryMiB = if (cometUnifiedMemoryManagerEnabled(sparkConf)) {
+ val baseMemoryMiB = if (isOffHeapEnabled(sparkConf)) {
ConfigHelpers
.byteFromString(sparkConf.get("spark.memory.offHeap.size"),
ByteUnit.MiB)
} else {
@@ -1367,8 +1367,13 @@ object CometSparkSessionExtensions extends Logging {
ByteUnit.MiB.toBytes(getCometMemoryOverheadInMiB(sparkConf))
}
- /** Calculates required shuffle memory size in bytes per executor process
for Comet. */
+ /**
+ * Calculates required shuffle memory size in bytes per executor process for
Comet when running
+ * in on-heap mode.
+ */
def getCometShuffleMemorySize(sparkConf: SparkConf, conf: SQLConf =
SQLConf.get): Long = {
+ assert(!isOffHeapEnabled(sparkConf))
+
val cometMemoryOverhead = getCometMemoryOverheadInMiB(sparkConf)
val overheadFactor = COMET_COLUMNAR_SHUFFLE_MEMORY_FACTOR.get(conf)
@@ -1391,7 +1396,7 @@ object CometSparkSessionExtensions extends Logging {
ByteUnit.BYTE.toMiB(getCometShuffleMemorySize(sparkConf, conf))
}
- def cometUnifiedMemoryManagerEnabled(sparkConf: SparkConf): Boolean = {
+ def isOffHeapEnabled(sparkConf: SparkConf): Boolean = {
sparkConf.getBoolean("spark.memory.offHeap.enabled", false)
}
diff --git a/spark/src/main/scala/org/apache/comet/Native.scala
b/spark/src/main/scala/org/apache/comet/Native.scala
index 2be0bf006..e466b2f4d 100644
--- a/spark/src/main/scala/org/apache/comet/Native.scala
+++ b/spark/src/main/scala/org/apache/comet/Native.scala
@@ -59,11 +59,11 @@ class Native extends NativeBase {
metricsUpdateInterval: Long,
taskMemoryManager: CometTaskMemoryManager,
batchSize: Int,
- use_unified_memory_manager: Boolean,
- memory_pool_type: String,
- memory_limit: Long,
- memory_limit_per_task: Long,
- task_attempt_id: Long,
+ offHeapMode: Boolean,
+ memoryPoolType: String,
+ memoryLimit: Long,
+ memoryLimitPerTask: Long,
+ taskAttemptId: Long,
debug: Boolean,
explain: Boolean,
workerThreads: Int,
diff --git a/spark/src/main/scala/org/apache/spark/Plugins.scala
b/spark/src/main/scala/org/apache/spark/Plugins.scala
index 835fa515b..64426c716 100644
--- a/spark/src/main/scala/org/apache/spark/Plugins.scala
+++ b/spark/src/main/scala/org/apache/spark/Plugins.scala
@@ -63,13 +63,10 @@ class CometDriverPlugin extends DriverPlugin with Logging
with ShimCometDriverPl
Math.max((executorMemory * memoryOverheadFactor).toLong,
memoryOverheadMinMib)
}
- val cometMemOverhead =
- if
(!CometSparkSessionExtensions.cometUnifiedMemoryManagerEnabled(sc.getConf)) {
- CometSparkSessionExtensions.getCometMemoryOverheadInMiB(sc.getConf)
- } else {
- // comet shuffle unified memory manager is disabled, so we need to
add overhead memory
-
CometSparkSessionExtensions.getCometShuffleMemorySizeInMiB(sc.getConf)
- }
+ // we should never reach this code in off-heap mode due to earlier check
+ // in `shouldOverrideMemoryConf`
+ assert(!CometSparkSessionExtensions.isOffHeapEnabled(sc.getConf))
+ val cometMemOverhead =
CometSparkSessionExtensions.getCometMemoryOverheadInMiB(sc.getConf)
sc.conf.set(EXECUTOR_MEMORY_OVERHEAD.key, s"${execMemOverhead +
cometMemOverhead}M")
val newExecMemOverhead =
sc.getConf.getSizeAsMb(EXECUTOR_MEMORY_OVERHEAD.key)
@@ -98,7 +95,7 @@ class CometDriverPlugin extends DriverPlugin with Logging
with ShimCometDriverPl
/**
* Whether we should override Spark memory configuration for Comet. This
only returns true when
* Comet native execution is enabled and/or Comet shuffle is enabled and
Comet doesn't use
- * unified memory manager.
+ * off-heap mode (unified memory manager).
*/
private def shouldOverrideMemoryConf(conf: SparkConf): Boolean = {
val cometEnabled =
@@ -109,9 +106,9 @@ class CometDriverPlugin extends DriverPlugin with Logging
with ShimCometDriverPl
val cometExec = conf.getBoolean(
CometConf.COMET_EXEC_ENABLED.key,
CometConf.COMET_EXEC_ENABLED.defaultValue.get)
- val unifiedMemory =
CometSparkSessionExtensions.cometUnifiedMemoryManagerEnabled(conf)
+ val offHeapMode = CometSparkSessionExtensions.isOffHeapEnabled(conf)
- cometEnabled && (cometExecShuffle || cometExec) && !unifiedMemory
+ cometEnabled && (cometExecShuffle || cometExec) && !offHeapMode
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]