This is an automated email from the ASF dual-hosted git repository.
github-bot pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/asf-site by this push:
new bc52e95d1 Publish built docs triggered by
382ac938f2d10666eefc08ec5c1c82025ddf3726
bc52e95d1 is described below
commit bc52e95d1eb7a790e007291365492042dd106b84
Author: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
AuthorDate: Thu Mar 20 01:28:54 2025 +0000
Publish built docs triggered by 382ac938f2d10666eefc08ec5c1c82025ddf3726
---
_sources/user-guide/configs.md.txt | 6 +-
_sources/user-guide/tuning.md.txt | 156 +++++++++++++++++++----------
objects.inv | Bin 785 -> 786 bytes
searchindex.js | 2 +-
user-guide/compatibility.html | 4 +-
user-guide/configs.html | 6 +-
user-guide/metrics.html | 4 +-
user-guide/tuning.html | 198 ++++++++++++++++++++++++-------------
8 files changed, 246 insertions(+), 130 deletions(-)
diff --git a/_sources/user-guide/configs.md.txt
b/_sources/user-guide/configs.md.txt
index a208d211e..32c6a3db8 100644
--- a/_sources/user-guide/configs.md.txt
+++ b/_sources/user-guide/configs.md.txt
@@ -71,9 +71,9 @@ Comet provides the following configuration settings.
| spark.comet.explain.verbose.enabled | When this setting is enabled, Comet
will provide a verbose tree representation of the extended information. | false
|
| spark.comet.explainFallback.enabled | When this setting is enabled, Comet
will provide logging explaining the reason(s) why a query stage cannot be
executed natively. Set this to false to reduce the amount of logging. | false |
| spark.comet.expression.allowIncompatible | Comet is not currently fully
compatible with Spark for all expressions. Set this config to true to allow
them anyway. For more information, refer to the Comet Compatibility Guide
(https://datafusion.apache.org/comet/user-guide/compatibility.html). | false |
-| spark.comet.memory.overhead.factor | Fraction of executor memory to be
allocated as additional non-heap memory per executor process for Comet. | 0.2 |
-| spark.comet.memory.overhead.min | Minimum amount of additional memory to be
allocated per executor process for Comet, in MiB. | 402653184b |
-| spark.comet.memoryOverhead | The amount of additional memory to be allocated
per executor process for Comet, in MiB. This config is optional. If this is not
specified, it will be set to `spark.comet.memory.overhead.factor` *
`spark.executor.memory`. This is memory that accounts for things like Comet
native execution, Comet shuffle, etc. | |
+| spark.comet.memory.overhead.factor | Fraction of executor memory to be
allocated as additional memory for Comet when running in on-heap mode or when
using the `fair_unified` pool in off-heap mode. For more information, refer to
the Comet Tuning Guide
(https://datafusion.apache.org/comet/user-guide/tuning.html). | 0.2 |
+| spark.comet.memory.overhead.min | Minimum amount of additional memory to be
allocated per executor process for Comet, in MiB, when running in on-heap mode
or when using the `fair_unified` pool in off-heap mode. For more information,
refer to the Comet Tuning Guide
(https://datafusion.apache.org/comet/user-guide/tuning.html). | 402653184b |
+| spark.comet.memoryOverhead | The amount of additional memory to be allocated
per executor process for Comet, in MiB, when running in on-heap mode or when
using the `fair_unified` pool in off-heap mode. This config is optional. If
this is not specified, it will be set to `spark.comet.memory.overhead.factor` *
`spark.executor.memory`. For more information, refer to the Comet Tuning Guide
(https://datafusion.apache.org/comet/user-guide/tuning.html). | |
| spark.comet.metrics.updateInterval | The interval in milliseconds to update
metrics. If interval is negative, metrics will be updated upon task completion.
| 3000 |
| spark.comet.nativeLoadRequired | Whether to require Comet native library to
load successfully when Comet is enabled. If not, Comet will silently fallback
to Spark when it fails to load the native lib. Otherwise, an error will be
thrown and the Spark job will be aborted. | false |
| spark.comet.parquet.enable.directBuffer | Whether to use Java direct byte
buffer when reading Parquet. | false |
diff --git a/_sources/user-guide/tuning.md.txt
b/_sources/user-guide/tuning.md.txt
index a57bb3f80..1a17f4ccc 100644
--- a/_sources/user-guide/tuning.md.txt
+++ b/_sources/user-guide/tuning.md.txt
@@ -17,18 +17,96 @@ specific language governing permissions and limitations
under the License.
-->
-# Tuning Guide
+# Comet Tuning Guide
Comet provides some tuning options to help you get the best performance from
your queries.
## Memory Tuning
-### Unified Memory Management with Off-Heap Memory
+It is necessary to specify how much memory Comet can use in addition to memory
already allocated to Spark. In some
+cases, it may be possible to reduce the amount of memory allocated to Spark so
that overall memory allocation is
+the same or lower than the original configuration. In other cases, enabling
Comet may require allocating more memory
+than before. See the [Determining How Much Memory to Allocate] section for
more details.
-The recommended way to share memory between Spark and Comet is to set
`spark.memory.offHeap.enabled=true`. This allows
-Comet to share an off-heap memory pool with Spark. The size of the pool is
specified by `spark.memory.offHeap.size`. For more details about Spark off-heap
memory mode, please refer to Spark documentation:
https://spark.apache.org/docs/latest/configuration.html.
+[Determining How Much Memory to Allocate]:
#determining-how-much-memory-to-allocate
-The type of pool can be specified with `spark.comet.exec.memoryPool`.
+Comet supports Spark's on-heap (the default) and off-heap modes for allocating
memory. However, we strongly recommend
+using off-heap mode. Comet has some limitations when running in on-heap mode,
such as requiring more memory overall,
+and requiring shuffle memory to be separately configured.
+
+### Configuring Comet Memory in Off-Heap Mode
+
+The recommended way to allocate memory for Comet is to set
`spark.memory.offHeap.enabled=true`. This allows
+Comet to share an off-heap memory pool with Spark, reducing the overall memory
overhead. The size of the pool is
+specified by `spark.memory.offHeap.size`. For more details about Spark
off-heap memory mode, please refer to
+Spark documentation: https://spark.apache.org/docs/latest/configuration.html.
+
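+For example, a PySpark session could enable off-heap mode as follows; the
+16 GB pool size is only an example value and should be tuned per workload:
+
+```python
+from pyspark.sql import SparkSession
+
+# Enable Spark off-heap memory so that Comet can share the off-heap pool.
+# The pool size below is only an example value.
+spark = (
+    SparkSession.builder
+    .config("spark.memory.offHeap.enabled", "true")
+    .config("spark.memory.offHeap.size", "16g")
+    .getOrCreate()
+)
+```
+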
+### Configuring Comet Memory in On-Heap Mode
+
+When running in on-heap mode, Comet memory can be allocated by setting
`spark.comet.memoryOverhead`. If this setting
+is not provided, it will be calculated by multiplying the current Spark
executor memory by
+`spark.comet.memory.overhead.factor` (default value is `0.2`) which may or may
not result in enough memory for
+Comet to operate. It is not recommended to rely on this behavior. It is better
to specify `spark.comet.memoryOverhead`
+explicitly.
+
+Comet supports native shuffle and columnar shuffle (these terms are explained
in the [shuffle] section below).
+In on-heap mode, columnar shuffle memory must be separately allocated using
`spark.comet.columnar.shuffle.memorySize`.
+If this setting is not provided, it will be calculated by multiplying
`spark.comet.memoryOverhead` by
+`spark.comet.columnar.shuffle.memory.factor` (default value is `1.0`). If a
shuffle exceeds this amount of memory
+then the query will fail.
+
+[shuffle]: #shuffle
+
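+For example, both settings could be provided explicitly when building the
+session; the sizes below are arbitrary example values, not recommendations:
+
+```python
+from pyspark import SparkConf
+from pyspark.sql import SparkSession
+
+# Explicitly size Comet's on-heap memory overhead and the columnar shuffle
+# pool instead of relying on the factor-based defaults (example values only).
+conf = (
+    SparkConf()
+    .set("spark.comet.memoryOverhead", "4g")
+    .set("spark.comet.columnar.shuffle.memorySize", "2g")
+)
+spark = SparkSession.builder.config(conf=conf).getOrCreate()
+```
+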
+### Determining How Much Memory to Allocate
+
+Generally, increasing the amount of memory allocated to Comet will improve
query performance by reducing the
+amount of time spent spilling to disk, especially for aggregate, join, and
shuffle operations. Allocating insufficient
+memory can result in out-of-memory errors. This is no different from
allocating memory in Spark and the amount of
+memory will vary for different workloads, so some experimentation will be
required.
+
+Here is a real-world example, based on running benchmarks derived from TPC-H,
running on a single executor against
+local Parquet files using the 100 GB data set.
+
+Baseline Spark Performance
+
+- Spark completes the benchmark in 632 seconds with 8 cores and 8 GB RAM
+- With less than 8 GB RAM, performance degrades due to spilling
+- Spark can complete the benchmark with as little as 3 GB of RAM, but with
worse performance (744 seconds)
+
+Comet Performance
+
+- Comet requires at least 5 GB of RAM in off-heap mode and 6 GB RAM in on-heap
mode, but performance at this level
+ is around 340 seconds, which is significantly faster than Spark with any
amount of RAM
+- Comet running in off-heap with 8 cores completes the benchmark in 295
seconds, more than 2x faster than Spark
+- It is worth noting that running Comet with only 4 cores and 4 GB RAM
completes the benchmark in 520 seconds,
+ providing better performance than Spark with half the resources
+
+It may be possible to reduce Comet's memory overhead by reducing batch sizes
or increasing the number of partitions.
+
+### SortExec
+
+Comet's SortExec implementation spills to disk when under memory pressure, but
there are some known issues in the
+underlying DataFusion SortExec implementation that could cause out-of-memory
errors during spilling. See
+https://github.com/apache/datafusion/issues/14692 for more information.
+
+Workarounds for this problem include:
+
+- Allocating more off-heap memory
+- Disabling native sort by setting `spark.comet.exec.sort.enabled=false`
+
+## Advanced Memory Tuning
+
+### Configuring spark.executor.memoryOverhead in On-Heap Mode
+
+In some environments, such as Kubernetes and YARN, it is important to
correctly set `spark.executor.memoryOverhead` so
+that it is possible to allocate off-heap memory when running in on-heap mode.
+
+Comet will automatically set `spark.executor.memoryOverhead` based on the
`spark.comet.memory*` settings so that
+resource managers respect Apache Spark memory configuration before starting
the containers.
+
+### Configuring Off-Heap Memory Pools
+
+Comet provides multiple memory pool implementations. The type of pool can be
specified with `spark.comet.exec.memoryPool`.
The valid pool types are:
@@ -36,22 +114,24 @@ The valid pool types are:
- `fair_unified`
The `unified` pool type implements a greedy first-come first-serve limit. This
pool works well for queries that do not
-need to spill or have a single spillable operator.
+need to spill or have a single spillable operator. The size of the pool is
specified by `spark.memory.offHeap.size`
+and the pool interacts with Spark's memory pool, effectively sharing the
off-heap memory between Spark and Comet. This
+approach is sometimes referred to as unified memory management.
The `fair_unified` pool type prevents operators from using more than an even
fraction of the available memory
(i.e. `pool_size / num_reservations`). This pool works best when you know
beforehand
the query has multiple operators that will likely all need to spill. Sometimes
it will cause spills even
when there is sufficient memory in order to leave enough memory for other
operators.
-### Dedicated Comet Memory Pools
+The pool size configuration for the `fair_unified` pool is a little more
complex. The total pool size is computed by
+multiplying `spark.memory.offHeap.size` by
`spark.comet.memory.overhead.factor` with the minimum amount being
+`spark.comet.memory.overhead.min`. It is also possible to manually specify
`spark.comet.memoryOverhead` instead to
+override this default behavior. Note that the `fair_unified` pool does not use
unified memory management to interact
+with Spark's memory pools, which is why the allocation defaults to a fraction
of off-heap memory.
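+
+For example, the `fair_unified` pool could be selected with an explicit
+`spark.comet.memoryOverhead`; the sizes below are arbitrary example values:
+
+```python
+from pyspark.sql import SparkSession
+
+# Off-heap mode with the fair_unified pool. spark.comet.memoryOverhead
+# overrides the factor-based default pool size (example values only).
+spark = (
+    SparkSession.builder
+    .config("spark.memory.offHeap.enabled", "true")
+    .config("spark.memory.offHeap.size", "16g")
+    .config("spark.comet.exec.memoryPool", "fair_unified")
+    .config("spark.comet.memoryOverhead", "8g")
+    .getOrCreate()
+)
+```
+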
-Spark uses on-heap memory mode by default, i.e., the
`spark.memory.offHeap.enabled` setting is not enabled. If Spark is under
on-heap memory mode, Comet will use its own dedicated memory pools that
-are not shared with Spark. This requires additional configuration settings to
be specified to set the size and type of
-memory pool to use.
+### Configuring On-Heap Memory Pools
-The size of the pool can be set explicitly with `spark.comet.memoryOverhead`.
If this setting is not specified then
-the memory overhead will be calculated by multiplying the executor memory by
`spark.comet.memory.overhead.factor`
-(defaults to `0.2`).
+When running in on-heap mode, Comet will use its own dedicated memory pools
that are not shared with Spark.
The type of pool can be specified with `spark.comet.exec.memoryPool`. The
default setting is `greedy_task_shared`.
@@ -69,7 +149,7 @@ Pool types ending with `_global` use a single global memory
pool between all tas
Pool types ending with `_task_shared` share a single memory pool across all
attempts for a single task.
-Other pool types create a dedicated pool per native query plan using a
fraction of the available pool size based on number of cores
+Other pool types create a dedicated pool per native query plan using a
fraction of the available pool size based on number of cores
and cores per task.
The `greedy*` pool types use DataFusion's [GreedyMemoryPool], which implements
a greedy first-come first-serve limit. This
@@ -91,28 +171,6 @@ adjusting how much memory to allocate.
[FairSpillPool]:
https://docs.rs/datafusion/latest/datafusion/execution/memory_pool/struct.FairSpillPool.html
[UnboundedMemoryPool]:
https://docs.rs/datafusion/latest/datafusion/execution/memory_pool/struct.UnboundedMemoryPool.html
-
-### Determining How Much Memory to Allocate
-
-Generally, increasing memory overhead will improve query performance,
especially for queries containing joins and
-aggregates.
-
-Once a memory pool is exhausted, the native plan will start spilling to disk,
which will slow down the query.
-
-Insufficient memory allocation can also lead to out-of-memory (OOM) errors.
-
-## Configuring spark.executor.memoryOverhead
-
-In some environments, such as Kubernetes and YARN, it is important to
correctly set `spark.executor.memoryOverhead` so
-that it is possible to allocate off-heap memory.
-
-Comet will automatically set `spark.executor.memoryOverhead` based on the
`spark.comet.memory*` settings so that
-resource managers respect Apache Spark memory configuration before starting
the containers.
-
-Note that there is currently a known issue where this will be inaccurate when
using Native Memory Management because it
-does not take executor concurrency into account. The tracking issue for this is
-https://github.com/apache/datafusion-comet/issues/949.
-
## Optimizing Joins
Spark often chooses `SortMergeJoin` over `ShuffledHashJoin` for stability
reasons. If the build-side of a
@@ -141,30 +199,22 @@ It must be set before the Spark context is created. You
can enable or disable Co
at runtime by setting `spark.comet.exec.shuffle.enabled` to `true` or `false`.
Once it is disabled, Comet will fall back to the default Spark shuffle manager.
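+
+For example, the shuffle manager is configured before the session starts,
+while Comet shuffle can be toggled at runtime. The shuffle manager class
+name below follows the Comet installation guide and should be verified
+against your Comet version:
+
+```python
+from pyspark.sql import SparkSession
+
+# The Comet shuffle manager must be set before the Spark context is created.
+# Verify the class name against your installed Comet version.
+spark = (
+    SparkSession.builder
+    .config("spark.shuffle.manager",
+            "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
+    .config("spark.comet.exec.shuffle.enabled", "true")
+    .getOrCreate()
+)
+
+# Comet shuffle can later be disabled (or re-enabled) at runtime.
+spark.conf.set("spark.comet.exec.shuffle.enabled", "false")
+```
+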
-### Shuffle Mode
+### Shuffle Implementations
-Comet provides three shuffle modes: Columnar Shuffle, Native Shuffle and Auto
Mode.
+Comet provides two shuffle implementations: Native Shuffle and Columnar
Shuffle. Comet will first try to use Native
+Shuffle and if that is not possible it will try to use Columnar Shuffle. If
neither can be applied, it will fall
+back to Spark for shuffle operations.
-#### Auto Mode
+#### Native Shuffle
-`spark.comet.exec.shuffle.mode` to `auto` will let Comet choose the best
shuffle mode based on the query plan. This
-is the default.
+Comet provides a fully native shuffle implementation, which generally provides
the best performance. However,
+native shuffle currently only supports `HashPartitioning` and
`SinglePartitioning` and has some restrictions on
+supported data types.
#### Columnar (JVM) Shuffle
Comet Columnar shuffle is JVM-based and supports `HashPartitioning`,
`RoundRobinPartitioning`, `RangePartitioning`, and
-`SinglePartitioning`. This mode has the highest query coverage.
-
-Columnar shuffle can be enabled by setting `spark.comet.exec.shuffle.mode` to
`jvm`. If this mode is explicitly set,
-then any shuffle operations that cannot be supported in this mode will fall
back to Spark.
-
-#### Native Shuffle
-
-Comet also provides a fully native shuffle implementation, which generally
provides the best performance. However,
-native shuffle currently only supports `HashPartitioning` and
`SinglePartitioning`.
-
-To enable native shuffle, set `spark.comet.exec.shuffle.mode` to `native`. If
this mode is explicitly set,
-then any shuffle operations that cannot be supported in this mode will fall
back to Spark.
+`SinglePartitioning`. This shuffle implementation supports more data types
than native shuffle.
### Shuffle Compression
diff --git a/objects.inv b/objects.inv
index c34abdf69..ff80537dc 100644
Binary files a/objects.inv and b/objects.inv differ
diff --git a/searchindex.js b/searchindex.js
index 2699d7ad6..6917c25cb 100644
--- a/searchindex.js
+++ b/searchindex.js
@@ -1 +1 @@
-Search.setIndex({"alltitles": {"1. Install Comet": [[9, "install-comet"]], "2.
Clone Spark and Apply Diff": [[9, "clone-spark-and-apply-diff"]], "3. Run Spark
SQL Tests": [[9, "run-spark-sql-tests"]], "ANSI mode": [[11, "ansi-mode"]],
"API Differences Between Spark Versions": [[0,
"api-differences-between-spark-versions"]], "ASF Links": [[10, null]], "Adding
Spark-side Tests for the New Expression": [[0,
"adding-spark-side-tests-for-the-new-expression"]], "Adding a New Expression":
[[0, [...]
\ No newline at end of file
+Search.setIndex({"alltitles": {"1. Install Comet": [[9, "install-comet"]], "2.
Clone Spark and Apply Diff": [[9, "clone-spark-and-apply-diff"]], "3. Run Spark
SQL Tests": [[9, "run-spark-sql-tests"]], "ANSI mode": [[11, "ansi-mode"]],
"API Differences Between Spark Versions": [[0,
"api-differences-between-spark-versions"]], "ASF Links": [[10, null]], "Adding
Spark-side Tests for the New Expression": [[0,
"adding-spark-side-tests-for-the-new-expression"]], "Adding a New Expression":
[[0, [...]
\ No newline at end of file
diff --git a/user-guide/compatibility.html b/user-guide/compatibility.html
index e08b8e95f..de67ef5e7 100644
--- a/user-guide/compatibility.html
+++ b/user-guide/compatibility.html
@@ -53,7 +53,7 @@ under the License.
<script async="true" defer="true"
src="https://buttons.github.io/buttons.js"></script>
<link rel="index" title="Index" href="../genindex.html" />
<link rel="search" title="Search" href="../search.html" />
- <link rel="next" title="Tuning Guide" href="tuning.html" />
+ <link rel="next" title="Comet Tuning Guide" href="tuning.html" />
<link rel="prev" title="Comet Configuration Settings" href="configs.html"
/>
<meta name="viewport" content="width=device-width, initial-scale=1" />
<meta name="docsearch:language" content="en">
@@ -842,7 +842,7 @@ Spark.</p></li>
<a class='right-next' id="next-link" href="tuning.html" title="next page">
<div class="prev-next-info">
<p class="prev-next-subtitle">next</p>
- <p class="prev-next-title">Tuning Guide</p>
+ <p class="prev-next-title">Comet Tuning Guide</p>
</div>
<i class="fas fa-angle-right"></i>
</a>
diff --git a/user-guide/configs.html b/user-guide/configs.html
index 88a40ac4e..230363898 100644
--- a/user-guide/configs.html
+++ b/user-guide/configs.html
@@ -512,15 +512,15 @@ TO MODIFY THIS CONTENT MAKE SURE THAT YOU MAKE YOUR
CHANGES TO THE TEMPLATE FILE
<td><p>false</p></td>
</tr>
<tr class="row-even"><td><p>spark.comet.memory.overhead.factor</p></td>
-<td><p>Fraction of executor memory to be allocated as additional non-heap
memory per executor process for Comet.</p></td>
+<td><p>Fraction of executor memory to be allocated as additional memory for
Comet when running in on-heap mode or when using the <code class="docutils
literal notranslate"><span class="pre">fair_unified</span></code> pool in
off-heap mode. For more information, refer to the Comet Tuning Guide
(https://datafusion.apache.org/comet/user-guide/tuning.html).</p></td>
<td><p>0.2</p></td>
</tr>
<tr class="row-odd"><td><p>spark.comet.memory.overhead.min</p></td>
-<td><p>Minimum amount of additional memory to be allocated per executor
process for Comet, in MiB.</p></td>
+<td><p>Minimum amount of additional memory to be allocated per executor
process for Comet, in MiB, when running in on-heap mode or when using the <code
class="docutils literal notranslate"><span
class="pre">fair_unified</span></code> pool in off-heap mode. For more
information, refer to the Comet Tuning Guide
(https://datafusion.apache.org/comet/user-guide/tuning.html).</p></td>
<td><p>402653184b</p></td>
</tr>
<tr class="row-even"><td><p>spark.comet.memoryOverhead</p></td>
-<td><p>The amount of additional memory to be allocated per executor process
for Comet, in MiB. This config is optional. If this is not specified, it will
be set to <code class="docutils literal notranslate"><span
class="pre">spark.comet.memory.overhead.factor</span></code> * <code
class="docutils literal notranslate"><span
class="pre">spark.executor.memory</span></code>. This is memory that accounts
for things like Comet native execution, Comet shuffle, etc.</p></td>
+<td><p>The amount of additional memory to be allocated per executor process
for Comet, in MiB, when running in on-heap mode or when using the <code
class="docutils literal notranslate"><span
class="pre">fair_unified</span></code> pool in off-heap mode. This config is
optional. If this is not specified, it will be set to <code class="docutils
literal notranslate"><span
class="pre">spark.comet.memory.overhead.factor</span></code> * <code
class="docutils literal notranslate"><span class="pr [...]
<td><p></p></td>
</tr>
<tr class="row-odd"><td><p>spark.comet.metrics.updateInterval</p></td>
diff --git a/user-guide/metrics.html b/user-guide/metrics.html
index 8f3b30e17..36c178b30 100644
--- a/user-guide/metrics.html
+++ b/user-guide/metrics.html
@@ -54,7 +54,7 @@ under the License.
<link rel="index" title="Index" href="../genindex.html" />
<link rel="search" title="Search" href="../search.html" />
<link rel="next" title="Contributing to Apache DataFusion Comet"
href="../contributor-guide/contributing.html" />
- <link rel="prev" title="Tuning Guide" href="tuning.html" />
+ <link rel="prev" title="Comet Tuning Guide" href="tuning.html" />
<meta name="viewport" content="width=device-width, initial-scale=1" />
<meta name="docsearch:language" content="en">
@@ -478,7 +478,7 @@ logged for each native plan (and there is one plan per
task, so this is very ver
<i class="fas fa-angle-left"></i>
<div class="prev-next-info">
<p class="prev-next-subtitle">previous</p>
- <p class="prev-next-title">Tuning Guide</p>
+ <p class="prev-next-title">Comet Tuning Guide</p>
</div>
</a>
<a class='right-next' id="next-link"
href="../contributor-guide/contributing.html" title="next page">
diff --git a/user-guide/tuning.html b/user-guide/tuning.html
index d69ae235a..8df974620 100644
--- a/user-guide/tuning.html
+++ b/user-guide/tuning.html
@@ -24,7 +24,7 @@ under the License.
<meta charset="utf-8" />
<meta name="viewport" content="width=device-width, initial-scale=1.0"
/><meta name="viewport" content="width=device-width, initial-scale=1" />
- <title>Tuning Guide — Apache DataFusion Comet documentation</title>
+ <title>Comet Tuning Guide — Apache DataFusion Comet
documentation</title>
<link href="../_static/styles/theme.css?digest=1999514e3f237ded88cf"
rel="stylesheet">
<link
href="../_static/styles/pydata-sphinx-theme.css?digest=1999514e3f237ded88cf"
rel="stylesheet">
@@ -292,13 +292,13 @@ under the License.
</a>
<ul class="nav section-nav flex-column">
<li class="toc-h3 nav-item toc-entry">
- <a class="reference internal nav-link"
href="#unified-memory-management-with-off-heap-memory">
- Unified Memory Management with Off-Heap Memory
+ <a class="reference internal nav-link"
href="#configuring-comet-memory-in-off-heap-mode">
+ Configuring Comet Memory in Off-Heap Mode
</a>
</li>
<li class="toc-h3 nav-item toc-entry">
- <a class="reference internal nav-link"
href="#dedicated-comet-memory-pools">
- Dedicated Comet Memory Pools
+ <a class="reference internal nav-link"
href="#configuring-comet-memory-in-on-heap-mode">
+ Configuring Comet Memory in On-Heap Mode
</a>
</li>
<li class="toc-h3 nav-item toc-entry">
@@ -306,12 +306,34 @@ under the License.
Determining How Much Memory to Allocate
</a>
</li>
+ <li class="toc-h3 nav-item toc-entry">
+ <a class="reference internal nav-link" href="#sortexec">
+ SortExec
+ </a>
+ </li>
</ul>
</li>
<li class="toc-h2 nav-item toc-entry">
- <a class="reference internal nav-link"
href="#configuring-spark-executor-memoryoverhead">
- Configuring spark.executor.memoryOverhead
+ <a class="reference internal nav-link" href="#advanced-memory-tuning">
+ Advanced Memory Tuning
</a>
+ <ul class="nav section-nav flex-column">
+ <li class="toc-h3 nav-item toc-entry">
+ <a class="reference internal nav-link"
href="#configuring-spark-executor-memoryoverhead-in-on-heap-mode">
+ Configuring spark.executor.memoryOverhead in On-Heap Mode
+ </a>
+ </li>
+ <li class="toc-h3 nav-item toc-entry">
+ <a class="reference internal nav-link"
href="#configuring-off-heap-memory-pools">
+ Configuring Off-Heap Memory Pools
+ </a>
+ </li>
+ <li class="toc-h3 nav-item toc-entry">
+ <a class="reference internal nav-link"
href="#configuring-on-heap-memory-pools">
+ Configuring On-Heap Memory Pools
+ </a>
+ </li>
+ </ul>
</li>
<li class="toc-h2 nav-item toc-entry">
<a class="reference internal nav-link" href="#optimizing-joins">
@@ -324,13 +346,13 @@ under the License.
</a>
<ul class="nav section-nav flex-column">
<li class="toc-h3 nav-item toc-entry">
- <a class="reference internal nav-link" href="#shuffle-mode">
- Shuffle Mode
+ <a class="reference internal nav-link" href="#shuffle-implementations">
+ Shuffle Implementations
</a>
<ul class="nav section-nav flex-column">
<li class="toc-h4 nav-item toc-entry">
- <a class="reference internal nav-link" href="#auto-mode">
- Auto Mode
+ <a class="reference internal nav-link" href="#native-shuffle">
+ Native Shuffle
</a>
</li>
<li class="toc-h4 nav-item toc-entry">
@@ -338,11 +360,6 @@ under the License.
Columnar (JVM) Shuffle
</a>
</li>
- <li class="toc-h4 nav-item toc-entry">
- <a class="reference internal nav-link" href="#native-shuffle">
- Native Shuffle
- </a>
- </li>
</ul>
</li>
<li class="toc-h3 nav-item toc-entry">
@@ -410,36 +427,108 @@ KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
-<section id="tuning-guide">
-<h1>Tuning Guide<a class="headerlink" href="#tuning-guide" title="Link to this
heading">¶</a></h1>
+<section id="comet-tuning-guide">
+<h1>Comet Tuning Guide<a class="headerlink" href="#comet-tuning-guide"
title="Link to this heading">¶</a></h1>
<p>Comet provides some tuning options to help you get the best performance
from your queries.</p>
<section id="memory-tuning">
<h2>Memory Tuning<a class="headerlink" href="#memory-tuning" title="Link to
this heading">¶</a></h2>
-<section id="unified-memory-management-with-off-heap-memory">
-<h3>Unified Memory Management with Off-Heap Memory<a class="headerlink"
href="#unified-memory-management-with-off-heap-memory" title="Link to this
heading">¶</a></h3>
-<p>The recommended way to share memory between Spark and Comet is to set <code
class="docutils literal notranslate"><span
class="pre">spark.memory.offHeap.enabled=true</span></code>. This allows
-Comet to share an off-heap memory pool with Spark. The size of the pool is
specified by <code class="docutils literal notranslate"><span
class="pre">spark.memory.offHeap.size</span></code>. For more details about
Spark off-heap memory mode, please refer to Spark documentation:
https://spark.apache.org/docs/latest/configuration.html.</p>
-<p>The type of pool can be specified with <code class="docutils literal
notranslate"><span class="pre">spark.comet.exec.memoryPool</span></code>.</p>
+<p>It is necessary to specify how much memory Comet can use in addition to
memory already allocated to Spark. In some
+cases, it may be possible to reduce the amount of memory allocated to Spark so
that overall memory allocation is
+the same or lower than the original configuration. In other cases, enabling
Comet may require allocating more memory
+than before. See the <a class="reference internal"
href="#determining-how-much-memory-to-allocate">Determining How Much Memory to
Allocate</a> section for more details.</p>
+<p>Comet supports Spark’s on-heap (the default) and off-heap modes for
allocating memory. However, we strongly recommend
+using off-heap mode. Comet has some limitations when running in on-heap mode,
such as requiring more memory overall,
+and requiring shuffle memory to be separately configured.</p>
+<section id="configuring-comet-memory-in-off-heap-mode">
+<h3>Configuring Comet Memory in Off-Heap Mode<a class="headerlink"
href="#configuring-comet-memory-in-off-heap-mode" title="Link to this
heading">¶</a></h3>
+<p>The recommended way to allocate memory for Comet is to set <code
class="docutils literal notranslate"><span
class="pre">spark.memory.offHeap.enabled=true</span></code>. This allows
+Comet to share an off-heap memory pool with Spark, reducing the overall memory
overhead. The size of the pool is
+specified by <code class="docutils literal notranslate"><span
class="pre">spark.memory.offHeap.size</span></code>. For more details about
Spark off-heap memory mode, please refer to
+Spark documentation:
https://spark.apache.org/docs/latest/configuration.html.</p>
+</section>
+<section id="configuring-comet-memory-in-on-heap-mode">
+<h3>Configuring Comet Memory in On-Heap Mode<a class="headerlink"
href="#configuring-comet-memory-in-on-heap-mode" title="Link to this
heading">¶</a></h3>
+<p>When running in on-heap mode, Comet memory can be allocated by setting
<code class="docutils literal notranslate"><span
class="pre">spark.comet.memoryOverhead</span></code>. If this setting
+is not provided, it will be calculated by multiplying the current Spark
executor memory by
+<code class="docutils literal notranslate"><span
class="pre">spark.comet.memory.overhead.factor</span></code> (default value is
<code class="docutils literal notranslate"><span class="pre">0.2</span></code>)
which may or may not result in enough memory for
+Comet to operate. It is not recommended to rely on this behavior. It is better
to specify <code class="docutils literal notranslate"><span
class="pre">spark.comet.memoryOverhead</span></code>
+explicitly.</p>
+<p>Comet supports native shuffle and columnar shuffle (these terms are
explained in the <a class="reference internal" href="#shuffle">shuffle</a>
section below).
+In on-heap mode, columnar shuffle memory must be separately allocated using
<code class="docutils literal notranslate"><span
class="pre">spark.comet.columnar.shuffle.memorySize</span></code>.
+If this setting is not provided, it will be calculated by multiplying <code
class="docutils literal notranslate"><span
class="pre">spark.comet.memoryOverhead</span></code> by
+<code class="docutils literal notranslate"><span
class="pre">spark.comet.columnar.shuffle.memory.factor</span></code> (default
value is <code class="docutils literal notranslate"><span
class="pre">1.0</span></code>). If a shuffle exceeds this amount of memory
+then the query will fail.</p>
+</section>
+<section id="determining-how-much-memory-to-allocate">
+<h3>Determining How Much Memory to Allocate<a class="headerlink"
href="#determining-how-much-memory-to-allocate" title="Link to this
heading">¶</a></h3>
+<p>Generally, increasing the amount of memory allocated to Comet will improve
query performance by reducing the
+amount of time spent spilling to disk, especially for aggregate, join, and
shuffle operations. Allocating insufficient
+memory can result in out-of-memory errors. This is no different from
allocating memory in Spark and the amount of
+memory will vary for different workloads, so some experimentation will be
required.</p>
+<p>Here is a real-world example, based on running benchmarks derived from
TPC-H, running on a single executor against
+local Parquet files using the 100 GB data set.</p>
+<p>Baseline Spark Performance</p>
+<ul class="simple">
+<li><p>Spark completes the benchmark in 632 seconds with 8 cores and 8 GB
RAM</p></li>
+<li><p>With less than 8 GB RAM, performance degrades due to spilling</p></li>
+<li><p>Spark can complete the benchmark with as little as 3 GB of RAM, but
with worse performance (744 seconds)</p></li>
+</ul>
+<p>Comet Performance</p>
+<ul class="simple">
+<li><p>Comet requires at least 5 GB of RAM in off-heap mode and 6 GB RAM in
on-heap mode, but performance at this level
+is around 340 seconds, which is significantly faster than Spark with any
amount of RAM</p></li>
+<li><p>Comet running in off-heap with 8 cores completes the benchmark in 295
seconds, more than 2x faster than Spark</p></li>
+<li><p>It is worth noting that running Comet with only 4 cores and 4 GB RAM
completes the benchmark in 520 seconds,
+providing better performance than Spark with half the resources</p></li>
+</ul>
+<p>It may be possible to reduce Comet’s memory overhead by reducing batch
sizes or increasing the number of partitions.</p>
+</section>
+<section id="sortexec">
+<h3>SortExec<a class="headerlink" href="#sortexec" title="Link to this
heading">¶</a></h3>
+<p>Comet’s SortExec implementation spills to disk when under memory pressure,
but there are some known issues in the
+underlying DataFusion SortExec implementation that could cause out-of-memory
errors during spilling. See
+https://github.com/apache/datafusion/issues/14692 for more information.</p>
+<p>Workarounds for this problem include:</p>
+<ul class="simple">
+<li><p>Allocating more off-heap memory</p></li>
+<li><p>Disabling native sort by setting <code class="docutils literal
notranslate"><span
class="pre">spark.comet.exec.sort.enabled=false</span></code></p></li>
+</ul>
+</section>
+</section>
+<section id="advanced-memory-tuning">
+<h2>Advanced Memory Tuning<a class="headerlink" href="#advanced-memory-tuning"
title="Link to this heading">¶</a></h2>
+<section id="configuring-spark-executor-memoryoverhead-in-on-heap-mode">
+<h3>Configuring spark.executor.memoryOverhead in On-Heap Mode<a
class="headerlink"
href="#configuring-spark-executor-memoryoverhead-in-on-heap-mode" title="Link
to this heading">¶</a></h3>
+<p>In some environments, such as Kubernetes and YARN, it is important to
correctly set <code class="docutils literal notranslate"><span
class="pre">spark.executor.memoryOverhead</span></code> so
+that it is possible to allocate off-heap memory when running in on-heap
mode.</p>
+<p>Comet will automatically set <code class="docutils literal
notranslate"><span class="pre">spark.executor.memoryOverhead</span></code>
based on the <code class="docutils literal notranslate"><span
class="pre">spark.comet.memory*</span></code> settings so that
+resource managers respect Apache Spark memory configuration before starting
the containers.</p>
+</section>
+<section id="configuring-off-heap-memory-pools">
+<h3>Configuring Off-Heap Memory Pools<a class="headerlink"
href="#configuring-off-heap-memory-pools" title="Link to this
heading">¶</a></h3>
+<p>Comet provides multiple memory pool implementations. The type of pool can
be specified with <code class="docutils literal notranslate"><span
class="pre">spark.comet.exec.memoryPool</span></code>.</p>
<p>The valid pool types are:</p>
<ul class="simple">
<li><p><code class="docutils literal notranslate"><span
class="pre">unified</span></code> (default when <code class="docutils literal
notranslate"><span class="pre">spark.memory.offHeap.enabled=true</span></code>
is set)</p></li>
<li><p><code class="docutils literal notranslate"><span
class="pre">fair_unified</span></code></p></li>
</ul>
<p>The <code class="docutils literal notranslate"><span
class="pre">unified</span></code> pool type implements a greedy first-come
first-serve limit. This pool works well for queries that do not
-need to spill or have a single spillable operator.</p>
+need to spill or have a single spillable operator. The size of the pool is
specified by <code class="docutils literal notranslate"><span
class="pre">spark.memory.offHeap.size</span></code>
+and the pool interacts with Spark’s memory pool, effectively sharing the
off-heap memory between Spark and Comet. This
+approach is sometimes referred to as unified memory management.</p>
<p>The <code class="docutils literal notranslate"><span
class="pre">fair_unified</span></code> pool type prevents operators from using
more than an even fraction of the available memory
(i.e. <code class="docutils literal notranslate"><span
class="pre">pool_size</span> <span class="pre">/</span> <span
class="pre">num_reservations</span></code>). This pool works best when you know
beforehand
the query has multiple operators that will likely all need to spill. Sometimes
it will cause spills even
when there is sufficient memory in order to leave enough memory for other
operators.</p>
+<p>The pool size configuration for the <code class="docutils literal
notranslate"><span class="pre">fair_unified</span></code> pool is a little
more complex. The total pool size is computed by
+multiplying <code class="docutils literal notranslate"><span
class="pre">spark.memory.offHeap.size</span></code> by <code class="docutils
literal notranslate"><span
class="pre">spark.comet.memory.overhead.factor</span></code> with the minimum
amount being
+<code class="docutils literal notranslate"><span
class="pre">spark.comet.memory.overhead.min</span></code>. It is also possible
to manually specify <code class="docutils literal notranslate"><span
class="pre">spark.comet.memoryOverhead</span></code> instead to
+override this default behavior. Note that the <code class="docutils literal
notranslate"><span class="pre">fair_unified</span></code> pool does not use
unified memory management to interact
+with Spark’s memory pools, which is why the allocation defaults to a fraction
of off-heap memory.</p>
</section>
-<section id="dedicated-comet-memory-pools">
-<h3>Dedicated Comet Memory Pools<a class="headerlink"
href="#dedicated-comet-memory-pools" title="Link to this heading">¶</a></h3>
-<p>Spark uses on-heap memory mode by default, i.e., the <code class="docutils
literal notranslate"><span
class="pre">spark.memory.offHeap.enabled</span></code> setting is not enabled.
If Spark is under on-heap memory mode, Comet will use its own dedicated memory
pools that
-are not shared with Spark. This requires additional configuration settings to
be specified to set the size and type of
-memory pool to use.</p>
-<p>The size of the pool can be set explicitly with <code class="docutils
literal notranslate"><span
class="pre">spark.comet.memoryOverhead</span></code>. If this setting is not
specified then
-the memory overhead will be calculated by multiplying the executor memory by
<code class="docutils literal notranslate"><span
class="pre">spark.comet.memory.overhead.factor</span></code>
-(defaults to <code class="docutils literal notranslate"><span
class="pre">0.2</span></code>).</p>
+<section id="configuring-on-heap-memory-pools">
+<h3>Configuring On-Heap Memory Pools<a class="headerlink"
href="#configuring-on-heap-memory-pools" title="Link to this heading">¶</a></h3>
+<p>When running in on-heap mode, Comet will use its own dedicated memory pools
that are not shared with Spark.</p>
<p>The type of pool can be specified with <code class="docutils literal
notranslate"><span class="pre">spark.comet.exec.memoryPool</span></code>. The
default setting is <code class="docutils literal notranslate"><span
class="pre">greedy_task_shared</span></code>.</p>
<p>The valid pool types are:</p>
<ul class="simple">
@@ -468,23 +557,6 @@ development/testing purposes, where there is no room to
allow spilling and rathe
Spilling significantly slows down the job and this option is one way to
measure the best performance scenario without
adjusting how much memory to allocate.</p>
</section>
-<section id="determining-how-much-memory-to-allocate">
-<h3>Determining How Much Memory to Allocate<a class="headerlink"
href="#determining-how-much-memory-to-allocate" title="Link to this
heading">¶</a></h3>
-<p>Generally, increasing memory overhead will improve query performance,
especially for queries containing joins and
-aggregates.</p>
-<p>Once a memory pool is exhausted, the native plan will start spilling to
disk, which will slow down the query.</p>
-<p>Insufficient memory allocation can also lead to out-of-memory (OOM)
errors.</p>
-</section>
-</section>
-<section id="configuring-spark-executor-memoryoverhead">
-<h2>Configuring spark.executor.memoryOverhead<a class="headerlink"
href="#configuring-spark-executor-memoryoverhead" title="Link to this
heading">¶</a></h2>
-<p>In some environments, such as Kubernetes and YARN, it is important to
correctly set <code class="docutils literal notranslate"><span
class="pre">spark.executor.memoryOverhead</span></code> so
-that it is possible to allocate off-heap memory.</p>
-<p>Comet will automatically set <code class="docutils literal
notranslate"><span class="pre">spark.executor.memoryOverhead</span></code>
based on the <code class="docutils literal notranslate"><span
class="pre">spark.comet.memory*</span></code> settings so that
-resource managers respect Apache Spark memory configuration before starting
the containers.</p>
-<p>Note that there is currently a known issue where this will be inaccurate
when using Native Memory Management because it
-does not take executor concurrency into account. The tracking issue for this is
-https://github.com/apache/datafusion-comet/issues/949.</p>
</section>
<section id="optimizing-joins">
<h2>Optimizing Joins<a class="headerlink" href="#optimizing-joins" title="Link
to this heading">¶</a></h2>
@@ -508,27 +580,21 @@ to test with both for your specific workloads.</p>
It must be set before the Spark context is created. You can enable or disable
Comet shuffle
at runtime by setting <code class="docutils literal notranslate"><span
class="pre">spark.comet.exec.shuffle.enabled</span></code> to <code
class="docutils literal notranslate"><span class="pre">true</span></code> or
<code class="docutils literal notranslate"><span
class="pre">false</span></code>.
Once it is disabled, Comet will fall back to the default Spark shuffle
manager.</p>
-<section id="shuffle-mode">
-<h3>Shuffle Mode<a class="headerlink" href="#shuffle-mode" title="Link to this
heading">¶</a></h3>
-<p>Comet provides three shuffle modes: Columnar Shuffle, Native Shuffle and
Auto Mode.</p>
-<section id="auto-mode">
-<h4>Auto Mode<a class="headerlink" href="#auto-mode" title="Link to this
heading">¶</a></h4>
-<p><code class="docutils literal notranslate"><span
class="pre">spark.comet.exec.shuffle.mode</span></code> to <code
class="docutils literal notranslate"><span class="pre">auto</span></code> will
let Comet choose the best shuffle mode based on the query plan. This
-is the default.</p>
+<section id="shuffle-implementations">
+<h3>Shuffle Implementations<a class="headerlink"
href="#shuffle-implementations" title="Link to this heading">¶</a></h3>
+<p>Comet provides two shuffle implementations: Native Shuffle and Columnar
Shuffle. Comet will first try to use Native
+Shuffle and if that is not possible it will try to use Columnar Shuffle. If
neither can be applied, it will fall
+back to Spark for shuffle operations.</p>
+<section id="native-shuffle">
+<h4>Native Shuffle<a class="headerlink" href="#native-shuffle" title="Link to
this heading">¶</a></h4>
+<p>Comet provides a fully native shuffle implementation, which generally
provides the best performance. However,
+native shuffle currently only supports <code class="docutils literal
notranslate"><span class="pre">HashPartitioning</span></code> and <code
class="docutils literal notranslate"><span
class="pre">SinglePartitioning</span></code> and has some restrictions on
+supported data types.</p>
</section>
<section id="columnar-jvm-shuffle">
<h4>Columnar (JVM) Shuffle<a class="headerlink" href="#columnar-jvm-shuffle"
title="Link to this heading">¶</a></h4>
<p>Comet Columnar shuffle is JVM-based and supports <code class="docutils
literal notranslate"><span class="pre">HashPartitioning</span></code>, <code
class="docutils literal notranslate"><span
class="pre">RoundRobinPartitioning</span></code>, <code class="docutils literal
notranslate"><span class="pre">RangePartitioning</span></code>, and
-<code class="docutils literal notranslate"><span
class="pre">SinglePartitioning</span></code>. This mode has the highest query
coverage.</p>
-<p>Columnar shuffle can be enabled by setting <code class="docutils literal
notranslate"><span class="pre">spark.comet.exec.shuffle.mode</span></code> to
<code class="docutils literal notranslate"><span class="pre">jvm</span></code>.
If this mode is explicitly set,
-then any shuffle operations that cannot be supported in this mode will fall
back to Spark.</p>
-</section>
-<section id="native-shuffle">
-<h4>Native Shuffle<a class="headerlink" href="#native-shuffle" title="Link to
this heading">¶</a></h4>
-<p>Comet also provides a fully native shuffle implementation, which generally
provides the best performance. However,
-native shuffle currently only supports <code class="docutils literal
notranslate"><span class="pre">HashPartitioning</span></code> and <code
class="docutils literal notranslate"><span
class="pre">SinglePartitioning</span></code>.</p>
-<p>To enable native shuffle, set <code class="docutils literal
notranslate"><span class="pre">spark.comet.exec.shuffle.mode</span></code> to
<code class="docutils literal notranslate"><span
class="pre">native</span></code>. If this mode is explicitly set,
-then any shuffle operations that cannot be supported in this mode will fall
back to Spark.</p>
+<code class="docutils literal notranslate"><span
class="pre">SinglePartitioning</span></code>. This shuffle implementation
supports more data types than native shuffle.</p>
</section>
</section>
<section id="shuffle-compression">
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]