Kontinuation commented on PR #1004: URL: https://github.com/apache/datafusion-comet/pull/1004#issuecomment-2401198781
This is better than using a greedy memory pool. It makes spillable operators work correctly under memory pressure, especially when running sort-merge-join, where multiple sort operators compete for resources. There are still some unresolved issues: each task may create multiple native plans, and we still do not make them share the same memory pool. I'd like to share the experiments I've done so we can sync on this topic.

I did some experiments [on my branch](https://github.com/Kontinuation/datafusion-comet/compare/fix-jvm-shuffle-allocator...Kontinuation:datafusion-comet:switch-to-fair-spill-pool#diff-6752f38b5d0b58f5cf8d7684fe4c562c00b9d11e2640e34419257fb31a975406) to try out various ways of using memory pools. A configuration option `spark.comet.exec.memoryPool` lets me run queries with different memory pools. All configurations were tested using the query mentioned in https://github.com/apache/datafusion-comet/issues/1003.

[**spark.comet.exec.memoryPool = greedy**](https://github.com/Kontinuation/datafusion-comet/compare/fix-jvm-shuffle-allocator...Kontinuation:datafusion-comet:switch-to-fair-spill-pool#diff-6752f38b5d0b58f5cf8d7684fe4c562c00b9d11e2640e34419257fb31a975406R282-R284)

This is the current mode when using native memory management. It could only run with `spark.comet.memoryOverhead = 8000m`; otherwise sort-merge-join fails with a memory reservation failure:

```
24/10/09 10:59:14 WARN TaskSetManager: Lost task 3.0 in stage 13.0 (TID 43) (bogon executor driver): org.apache.comet.CometNativeException: Additional allocation failed with top memory consumers (across reservations) as: ExternalSorterMerge[0] consumed 1164398840 bytes, GroupedHashAggregateStream[0] consumed 117699433 bytes, SMJStream[0] consumed 459160 bytes, HashJoinInput[0] consumed 1392 bytes. Error: Failed to allocate additional 993312 bytes for ExternalSorter[0] with 1321280 bytes already allocated for this reservation - 625495 bytes remain available for the total pool
```

[**spark.comet.exec.memoryPool = fair_spill**](https://github.com/Kontinuation/datafusion-comet/compare/fix-jvm-shuffle-allocator...Kontinuation:datafusion-comet:switch-to-fair-spill-pool#diff-6752f38b5d0b58f5cf8d7684fe4c562c00b9d11e2640e34419257fb31a975406R277-R281)

This is the same approach as this PR: simply use a FairSpillPool as the per-plan memory pool. It could run with `spark.comet.memoryOverhead = 3200m`.
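For context, here is roughly what selecting and wiring a per-plan memory pool into the DataFusion runtime looks like. This is a minimal sketch, assuming the pool size is derived from `spark.comet.memoryOverhead`; the helper name `runtime_with_pool` is illustrative, not actual Comet code:

```rust
use std::sync::Arc;

use datafusion::execution::memory_pool::{FairSpillPool, GreedyMemoryPool, MemoryPool};
use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};

/// Hypothetical helper: build a runtime whose memory pool is selected by
/// `spark.comet.exec.memoryPool` and capped at the per-plan budget derived
/// from `spark.comet.memoryOverhead`.
fn runtime_with_pool(pool_kind: &str, pool_size_bytes: usize) -> datafusion::error::Result<RuntimeEnv> {
    let pool: Arc<dyn MemoryPool> = match pool_kind {
        // `greedy`: reservations succeed until the limit is hit, then fail hard.
        "greedy" => Arc::new(GreedyMemoryPool::new(pool_size_bytes)),
        // `fair_spill`: spillable consumers are each limited to a fair share,
        // so concurrent sorts spill instead of starving one another.
        _ => Arc::new(FairSpillPool::new(pool_size_bytes)),
    };
    RuntimeEnv::new(RuntimeConfig::new().with_memory_pool(pool))
}
```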
Both sort operators could spill to cope with the memory limit:

```
24/10/09 11:03:11 INFO core/src/execution/jni_api.rs: Comet native query plan with metrics (stage: 13 task: 41):
AggregateExec: mode=Partial, gby=[col_0@0 as col_0, col_1@1 as col_1, col_4@4 as col_2, col_3@3 as col_3, col_8@8 as col_4, col_2@2 as col_5, col_5@5 as col_6], aggr=[sum], metrics=[output_rows=2791473, elapsed_compute=3.695235425s]
  ProjectionExec: expr=[col_0@0 as col_0, col_1@1 as col_1, col_2@2 as col_2, col_4@4 as col_3, col_5@5 as col_4, col_6@6 as col_5, col_7@7 as col_6, col_8@8 as col_7, col_1@10 as col_8], metrics=[output_rows=15002382, elapsed_compute=2.23923ms]
    ProjectionExec: expr=[col_0@2 as col_0, col_1@3 as col_1, col_2@4 as col_2, col_3@5 as col_3, col_4@6 as col_4, col_5@7 as col_5, col_6@8 as col_6, col_7@9 as col_7, col_8@10 as col_8, col_0@0 as col_0, col_1@1 as col_1], metrics=[output_rows=15002382, elapsed_compute=2.445133ms]
      HashJoinExec: mode=Partitioned, join_type=Inner, on=[(col_0@0, col_3@3)], metrics=[output_rows=15002382, input_batches=1832, input_rows=15002382, build_input_batches=1, output_batches=1832, build_input_rows=25, build_mem_used=1392, join_time=853.609662ms, build_time=42.125µs]
        CopyExec [UnpackOrDeepCopy], metrics=[output_rows=25, elapsed_compute=3.292µs]
          ScanExec: source=[BroadcastExchange (unknown)], schema=[col_0: Int64, col_1: Utf8], metrics=[output_rows=25, elapsed_compute=709ns, cast_time=1ns]
        CopyExec [UnpackOrClone], metrics=[output_rows=15002382, elapsed_compute=1.754617ms]
          ProjectionExec: expr=[col_0@0 as col_0, col_1@1 as col_1, col_2@2 as col_2, col_3@3 as col_3, col_4@4 as col_4, col_5@5 as col_5, col_6@6 as col_6, col_1@9 as col_7, col_2@10 as col_8], metrics=[output_rows=15002382, elapsed_compute=2.032586ms]
            SortMergeJoin: join_type=Inner, on=[(col_7@7, col_0@0)], metrics=[output_rows=15002382, spill_count=0, spilled_bytes=0, spilled_rows=0, input_batches=2290, input_rows=18752902, output_batches=1832, peak_mem_used=918320, join_time=4.976020762s]
              SortExec: expr=[col_7@7 ASC], preserve_partitioning=[false], metrics=[output_rows=3750520, elapsed_compute=1.678235876s, spill_count=3, spilled_bytes=572203168, spilled_rows=3066232]
                CopyExec [UnpackOrDeepCopy], metrics=[output_rows=3750520, elapsed_compute=65.265593ms]
                  ScanExec: source=[Exchange (unknown)], schema=[col_0: Int64, col_1: Utf8, col_2: Utf8, col_3: Int64, col_4: Utf8, col_5: Decimal128(12, 2), col_6: Utf8, col_7: Int64], metrics=[output_rows=3750520, elapsed_compute=450.456µs, cast_time=1ns]
              SortExec: expr=[col_0@0 ASC], preserve_partitioning=[false], metrics=[output_rows=15002382, elapsed_compute=2.424249133s, spill_count=4, spilled_bytes=547164360, spilled_rows=13667085]
                CopyExec [UnpackOrDeepCopy], metrics=[output_rows=15002382, elapsed_compute=40.672672ms]
                  ScanExec: source=[Exchange (unknown)], schema=[col_0: Int64, col_1: Decimal128(12, 2), col_2: Decimal128(12, 2)], metrics=[output_rows=15002382, elapsed_compute=531.627µs, cast_time=1ns]
```

But please note that each task may create 2 native plans, each of which has its own memory pool. Here is an example of a task creating 2 native plans, and these 2 plans run concurrently.
Plan 1:

```
24/10/08 13:34:46 INFO core/src/execution/jni_api.rs: Comet native query plan (stage: 13 task: 40):
AggregateExec: mode=Partial, gby=[col_0@0 as col_0, col_1@1 as col_1, col_4@4 as col_2, col_3@3 as col_3, col_8@8 as col_4, col_2@2 as col_5, col_5@5 as col_6], aggr=[sum]
  ProjectionExec: expr=[col_0@0 as col_0, col_1@1 as col_1, col_2@2 as col_2, col_4@4 as col_3, col_5@5 as col_4, col_6@6 as col_5, col_7@7 as col_6, col_8@8 as col_7, col_1@10 as col_8]
    ProjectionExec: expr=[col_0@2 as col_0, col_1@3 as col_1, col_2@4 as col_2, col_3@5 as col_3, col_4@6 as col_4, col_5@7 as col_5, col_6@8 as col_6, col_7@9 as col_7, col_8@10 as col_8, col_0@0 as col_0, col_1@1 as col_1]
      HashJoinExec: mode=Partitioned, join_type=Inner, on=[(col_0@0, col_3@3)]
        CopyExec [UnpackOrDeepCopy]
          ScanExec: source=[BroadcastExchange (unknown)], schema=[col_0: Int64, col_1: Utf8]
        CopyExec [UnpackOrClone]
          ProjectionExec: expr=[col_0@0 as col_0, col_1@1 as col_1, col_2@2 as col_2, col_3@3 as col_3, col_4@4 as col_4, col_5@5 as col_5, col_6@6 as col_6, col_1@9 as col_7, col_2@10 as col_8]
            SortMergeJoin: join_type=Inner, on=[(col_7@7, col_0@0)]
              SortExec: expr=[col_7@7 ASC], preserve_partitioning=[false]
                CopyExec [UnpackOrDeepCopy]
                  ScanExec: source=[Exchange (unknown)], schema=[col_0: Int64, col_1: Utf8, col_2: Utf8, col_3: Int64, col_4: Utf8, col_5: Decimal128(12, 2), col_6: Utf8, col_7: Int64]
              SortExec: expr=[col_0@0 ASC], preserve_partitioning=[false]
                CopyExec [UnpackOrDeepCopy]
                  ScanExec: source=[Exchange (unknown)], schema=[col_0: Int64, col_1: Decimal128(12, 2), col_2: Decimal128(12, 2)]
```

Plan 2:

```
24/10/08 13:34:52 INFO core/src/execution/jni_api.rs: Comet native query plan (stage: 13 task: 40):
ShuffleWriterExec: partitioning=Hash([Column { name: "col_0", index: 0 }, Column { name: "col_1", index: 1 }, Column { name: "col_2", index: 2 }, Column { name: "col_3", index: 3 }, Column { name: "col_4", index: 4 }, Column { name: "col_5", index: 5 }, Column { name: "col_6", index: 6 }], 4)
  ScanExec: source=[], schema=[col_0: Int64, col_1: Utf8, col_2: Decimal128(12, 2), col_3: Utf8, col_4: Utf8, col_5: Utf8, col_6: Utf8, col_7: Decimal128(36, 4), col_8: Boolean]
```

[**spark.comet.exec.memoryPool = fair_spill_shared**](https://github.com/Kontinuation/datafusion-comet/compare/fix-jvm-shuffle-allocator...Kontinuation:datafusion-comet:switch-to-fair-spill-pool#diff-6752f38b5d0b58f5cf8d7684fe4c562c00b9d11e2640e34419257fb31a975406R235-R242)

This approach allocates one FairSpillPool shared by all plans in the same task. In the above example, the sort-merge-join plan and the shuffle-write plan of the same task share the same memory pool. This strictly follows the conceptual model that Comet won't exceed `spark.comet.memoryOverhead`. It could run with `spark.comet.memoryOverhead = 4800m`.

I've added 2 additional JNI interfaces for creating a memory pool at the beginning of each task and releasing it at the end of each task. Actually, this is not necessary: we can create and track per-task memory pools entirely in the native code; all it needs is the task attempt id at native plan creation time.
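A minimal sketch of that per-task idea, assuming the native side keeps a registry keyed by the task attempt id it receives when a plan is created; `get_or_create_task_pool` and `release_task_pool` are hypothetical names, not existing Comet or DataFusion APIs:

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex, OnceLock};

use datafusion::execution::memory_pool::{FairSpillPool, MemoryPool};

/// Registry of per-task memory pools, keyed by Spark task attempt id.
/// Each entry also counts how many native plans still reference the pool.
static TASK_POOLS: OnceLock<Mutex<HashMap<i64, (Arc<dyn MemoryPool>, usize)>>> = OnceLock::new();

fn registry() -> &'static Mutex<HashMap<i64, (Arc<dyn MemoryPool>, usize)>> {
    TASK_POOLS.get_or_init(|| Mutex::new(HashMap::new()))
}

/// Called at native plan creation: all plans of the same task attempt get the
/// same FairSpillPool, so they compete for a single shared budget.
fn get_or_create_task_pool(task_attempt_id: i64, pool_size_bytes: usize) -> Arc<dyn MemoryPool> {
    let mut pools = registry().lock().unwrap();
    let entry = pools.entry(task_attempt_id).or_insert_with(|| {
        let pool: Arc<dyn MemoryPool> = Arc::new(FairSpillPool::new(pool_size_bytes));
        (pool, 0)
    });
    entry.1 += 1;
    Arc::clone(&entry.0)
}

/// Called when a native plan is released: drop the pool once the last plan of
/// the task is gone, so no extra "release task" JNI call is required.
fn release_task_pool(task_attempt_id: i64) {
    let mut pools = registry().lock().unwrap();
    if let Some(entry) = pools.get_mut(&task_attempt_id) {
        entry.1 -= 1;
        if entry.1 == 0 {
            pools.remove(&task_attempt_id);
        }
    }
}
```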
[**spark.comet.exec.memoryPool = fair_spill_global**](https://github.com/Kontinuation/datafusion-comet/compare/fix-jvm-shuffle-allocator...Kontinuation:datafusion-comet:switch-to-fair-spill-pool#diff-6752f38b5d0b58f5cf8d7684fe4c562c00b9d11e2640e34419257fb31a975406R259-R267)

This approach uses a singleton FairSpillPool for all tasks in the same executor instance. I expected this to be the optimal approach, but in practice it does not work well: it could only run with `spark.comet.memoryOverhead = 12000m`. I'll dive into this issue next week, since there is a lot of other work allocated for this week :(

[**spark.comet.exec.memoryPool = greedy_global**](https://github.com/Kontinuation/datafusion-comet/compare/fix-jvm-shuffle-allocator...Kontinuation:datafusion-comet:switch-to-fair-spill-pool#diff-6752f38b5d0b58f5cf8d7684fe4c562c00b9d11e2640e34419257fb31a975406R268-R276)

This approach uses a singleton GreedyMemoryPool for all tasks in the same executor instance. As expected, it does not work well: it could only run with `spark.comet.memoryOverhead = 9000m`.

The conclusion is that `fair_spill` and `fair_spill_shared` have lower memory requirements and are less likely to break when running memory-intensive queries. I also believe that Spark needs a more sophisticated memory management system from DataFusion to support large ETL use cases reliably, which is the use case where Spark shines.
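For completeness, the two global variants boil down to a single executor-wide pool shared by every task and every native plan. A rough sketch, assuming the pool size comes from the executor-level memory overhead and with `global_pool` being an illustrative name:

```rust
use std::sync::{Arc, OnceLock};

use datafusion::execution::memory_pool::{FairSpillPool, GreedyMemoryPool, MemoryPool};

/// One pool per executor process, shared by all tasks and all native plans.
static GLOBAL_POOL: OnceLock<Arc<dyn MemoryPool>> = OnceLock::new();

fn global_pool(pool_kind: &str, pool_size_bytes: usize) -> Arc<dyn MemoryPool> {
    let pool = GLOBAL_POOL.get_or_init(|| {
        // `greedy_global` vs `fair_spill_global`: the same single executor-wide
        // pool; only the sharing policy differs.
        if pool_kind == "greedy_global" {
            Arc::new(GreedyMemoryPool::new(pool_size_bytes)) as Arc<dyn MemoryPool>
        } else {
            Arc::new(FairSpillPool::new(pool_size_bytes)) as Arc<dyn MemoryPool>
        }
    });
    Arc::clone(pool)
}
```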
