Kontinuation opened a new issue, #949:
URL: https://github.com/apache/datafusion-comet/issues/949

   ### What is the problem the feature request solves?
   
The DataFusion Comet documentation gained a [memory tuning 
section](https://datafusion.apache.org/comet/user-guide/tuning.html#memory-tuning) in the tuning guide after 
https://github.com/apache/datafusion-comet/issues/595 was addressed. It looks simple at first 
glance, but I found that the actual behavior is more complex than I thought.
   
1. `spark.comet.memory.overhead.factor` and `spark.comet.memoryOverhead` set a 
per-operator limit, not a per-executor/per-worker or per-core one. When a Comet 
plan is created, it [creates its own memory 
pool](https://github.com/apache/datafusion-comet/blob/0.2.0/native/core/src/execution/jni_api.rs#L217-L226)
 [sized 
`spark.comet.memoryOverhead`](https://github.com/apache/datafusion-comet/blob/0.2.0/spark/src/main/scala/org/apache/comet/CometExecIterator.scala#L77-L90).
 Since `spark.executor.cores` is usually set to the number of vCPUs, the 
actual amount of memory allocated for Comet on each worker instance will be (at 
least) `spark.executor.cores * spark.comet.memoryOverhead`.
2. We have `CometPlugin` for configuring the Comet memory overhead 
automatically, but `CometPlugin` [does not account for the existence of 
multiple executor 
cores](https://github.com/apache/datafusion-comet/blob/0.2.0/spark/src/main/scala/org/apache/spark/Plugins.scala#L63-L65).
 The actual per-instance Comet memory consumption will exceed the 
configured memory overhead when `spark.executor.cores` > 1.
3. Even assuming `spark.executor.cores = 1` and only a single task running 
on each executor instance, multiple Comet operators can still allocate 
multiple memory pools, so the actual memory limit can be a multiple of 
`spark.comet.memoryOverhead`. The following figure 
shows the DAG of a Spark job: Stage 205 has 3 `CometSort` 
nodes, and each may consume up to `spark.comet.memoryOverhead` of memory. 
This is a conservative estimate, since it assumes that no other node in this 
stage reserves a significant amount of memory.
   
![image](https://github.com/user-attachments/assets/9b646f6a-85d3-430e-8fd5-5ad79b3be204)
   
The conclusion is that the actual memory limit for Comet depends on the following factors (a worst-case sketch follows this list):
   * `spark.comet.memoryOverhead`
   * Number of cores per executor
   * The maximum number of memory-intensive Comet nodes in one stage
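   
   A back-of-the-envelope sketch of this worst case; this is a hypothetical helper with illustrative names, not an actual Comet API:
   
   ```rust
   /// Worst-case per-executor Comet memory footprint, assuming every
   /// memory-intensive native operator allocates its own pool sized at the
   /// full `spark.comet.memoryOverhead`. Illustrative only, not a Comet API.
   fn worst_case_comet_memory_bytes(
       memory_overhead_bytes: u64,  // spark.comet.memoryOverhead
       executor_cores: u64,         // spark.executor.cores (concurrent tasks)
       memory_intensive_nodes: u64, // e.g. the 3 CometSort nodes in Stage 205
   ) -> u64 {
       memory_overhead_bytes * executor_cores * memory_intensive_nodes
   }
   
   fn main() {
       // 1 GiB overhead, 4 cores, 3 CometSort nodes => 12 GiB worst case.
       let bytes = worst_case_comet_memory_bytes(1 << 30, 4, 3);
       println!("worst-case Comet memory: {} GiB", bytes >> 30);
   }
   ```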
   
This makes Comet hard to tune, and the behavior is hard to estimate because it 
depends on the actual queries. We should either make this clear in the tuning guide 
or revamp the memory-related configurations so that they are easier to tune and 
reason about.
   
   ### Describe the potential solution
   
Ideally, `spark.comet.memory.overhead.factor` and 
`spark.comet.memoryOverhead` would configure the per-executor-instance memory limit. 
I have the following ideas for achieving this:
   
1. Use the unified memory manager introduced by 
https://github.com/apache/datafusion-comet/pull/83. This requires enabling 
off-heap memory in Spark (a config sketch follows this list); I'm not sure why it 
does not appear in the tuning guide (perhaps due to its maturity). The downside is 
that Comet operators cannot trigger the spilling of other memory consumers, which 
makes it easy to run into issues similar to 
https://github.com/apache/datafusion-comet/issues/886 due to its greedy/unfair nature.
2. Make all Comet operators in the same task share the same 
[FairSpillPool](https://docs.rs/datafusion/latest/datafusion/execution/memory_pool/struct.FairSpillPool.html)
 (see the Rust sketch after this list). The memory limit of the fair spill pool 
could be `spark.comet.memoryOverhead / maxParallelism`, which ensures that each 
operator gets a minimum amount of memory, especially while we only support 
self-spilling. The downside is memory under-utilization when the memory 
requirements of the operators are very uneven 
(https://github.com/apache/datafusion/issues/2829).
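   
   For option 1, enabling Spark off-heap memory would look roughly like this (the size is an arbitrary placeholder):
   
   ```
   --conf spark.memory.offHeap.enabled=true
   --conf spark.memory.offHeap.size=8g
   ```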
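   
   And a minimal sketch of option 2, assuming (hypothetically) that Comet wires one `FairSpillPool` per task instead of one pool per native plan; the sizing and consumer names are illustrative:
   
   ```rust
   use std::sync::Arc;
   
   use datafusion::common::Result;
   use datafusion::execution::memory_pool::{FairSpillPool, MemoryConsumer, MemoryPool};
   
   /// One shared pool per task, sized spark.comet.memoryOverhead / maxParallelism.
   fn task_memory_pool(memory_overhead_bytes: usize, max_parallelism: usize) -> Arc<dyn MemoryPool> {
       Arc::new(FairSpillPool::new(memory_overhead_bytes / max_parallelism))
   }
   
   fn main() -> Result<()> {
       let pool = task_memory_pool(4 << 30, 4); // 4 GiB overhead, 4 concurrent tasks
   
       // The three CometSort nodes from Stage 205 all register against the same
       // pool; FairSpillPool divides it evenly among spillable consumers.
       let mut sorts: Vec<_> = (0..3)
           .map(|i| {
               MemoryConsumer::new(format!("CometSort-{i}"))
                   .with_can_spill(true)
                   .register(&pool)
           })
           .collect();
   
       // Each sort can grow up to its fair share (~1 GiB / 3 here); beyond
       // that, try_grow fails and the operator has to spill.
       for sort in sorts.iter_mut() {
           sort.try_grow(256 << 20)?; // reserve 256 MiB
       }
       Ok(())
   }
   ```
   
   This keeps the per-task limit independent of how many memory-intensive operators a stage happens to contain, at the cost of the under-utilization noted above.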
   
I'm not sure whether it is feasible to implement sophisticated [memory 
arbitration](https://facebookincubator.github.io/velox/develop/memory.html) on 
top of option 1 or 2, but I think it would help a lot in handling various kinds of 
workloads efficiently.
   
   ### Additional context
   
   _No response_

