[ 
https://issues.apache.org/jira/browse/TEZ-4211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17172161#comment-17172161
 ] 

László Bodor commented on TEZ-4211:
-----------------------------------

[~rajesh.balamohan]: I took a look at [^TEZ-4211.2.patch]. I'm not very 
familiar with MergeManager.finalMerge, but as far as I can see, the major 
difference before/after the patch is that the new logic is much simpler (which 
I personally like). If I understood it correctly (please find my notes below), 
it just creates in-memory segments first, then disk segments for the merge. It 
does indeed prevent memory outputs from being spilled when there are no onDisk 
outputs, but regardless of onDisk outputs, it prevents inMemory outputs from 
being spilled at all. I'm not sure that was the intention, as before the patch 
some inMemory outputs were spilled in order to respect a memory setting 
(postMergeMemLimit). Could this become an issue later?




My notes about the patch:

BEFORE:
https://github.com/apache/tez/blob/7659726a1ed877d1f5303fc3673e4399bab33b65/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java

1. create memory segments from inMemoryMapOutputs (using the 
this.postMergeMemLimit threshold to keep some of them in memory)
2. merge (some of) them to disk -> add to onDiskMapOutputs
3. create disk segments from onDiskMapOutputs (which now also contains outputs 
that were originally in memory and have been spilled to disk)
4. create memory segments from the rest of inMemoryMapOutputs (no threshold, 
process all of them) -> finalSegments
5. if there are diskSegments, add them to finalSegments, or return them 
directly if finalSegments is empty (no memory map output left)

AFTER:
https://github.com/abstractdog/tez/blob/55e0cbd20fac5b0b4ffa47032c76b3bcd88fc421/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java

1. create memory segments from all of inMemoryMapOutputs (process all, no 
threshold) -> finalSegments
2. if there are elements in onDiskMapOutputs, create diskSegments from them 
(~ step 3 in the old logic)
3. if there are diskSegments, add them to finalSegments, or return them 
directly if finalSegments is empty (~ there was no memory map output at all) 
(~ step 5 in the old logic)
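
To make the difference concrete, here is a rough sketch of the two flows as I 
read them from the notes above. It is not the real MergeManager code: outputs 
are modelled as plain byte sizes, and FinalMergeSketch, Plan, planBefore and 
planAfter are hypothetical names I made up for illustration. The old flow is 
assumed to spill outputs from the front of the list until the remainder fits 
under postMergeMemLimit, and to write them as a single spill file.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of finalMerge planning; not the Tez implementation.
final class FinalMergeSketch {

    /** Which outputs end up being merged from memory and which from disk. */
    static final class Plan {
        final List<Long> memorySegments = new ArrayList<>();
        final List<Long> diskSegments = new ArrayList<>();
    }

    /** BEFORE (assumed): spill in-memory outputs until the rest fits under the limit. */
    static Plan planBefore(List<Long> inMemory, List<Long> onDisk, long postMergeMemLimit) {
        Plan plan = new Plan();
        List<Long> remaining = new ArrayList<>(inMemory);
        long remainingBytes = 0;
        for (long size : remaining) {
            remainingBytes += size;
        }
        plan.diskSegments.addAll(onDisk);

        // Steps 1-2: take outputs from the front and merge them into one spill file.
        long spilledBytes = 0;
        while (remainingBytes > postMergeMemLimit && !remaining.isEmpty()) {
            long size = remaining.remove(0);
            remainingBytes -= size;
            spilledBytes += size;
        }
        // Step 3: the spill file is treated like any other on-disk output.
        if (spilledBytes > 0) {
            plan.diskSegments.add(spilledBytes);
        }
        // Step 4: whatever is left stays in memory.
        plan.memorySegments.addAll(remaining);
        return plan;
    }

    /** AFTER: every in-memory output stays in memory, disk outputs are used as they are. */
    static Plan planAfter(List<Long> inMemory, List<Long> onDisk) {
        Plan plan = new Plan();
        plan.memorySegments.addAll(inMemory);
        plan.diskSegments.addAll(onDisk);
        return plan;
    }

    public static void main(String[] args) {
        List<Long> inMemory = List.of(400L, 300L, 200L);
        List<Long> onDisk = List.of();

        Plan before = planBefore(inMemory, onDisk, 250L);
        Plan after = planAfter(inMemory, onDisk);

        // prints "before: memory=[200], disk=[700]"
        System.out.println("before: memory=" + before.memorySegments + ", disk=" + before.diskSegments);
        // prints "after: memory=[400, 300, 200], disk=[]"
        System.out.println("after: memory=" + after.memorySegments + ", disk=" + after.diskSegments);
    }
}
```

In this toy model the new flow never consults postMergeMemLimit, which is 
exactly the behaviour change I'm asking about.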




> Optimise MergeManager final merge
> ---------------------------------
>
>                 Key: TEZ-4211
>                 URL: https://issues.apache.org/jira/browse/TEZ-4211
>             Project: Apache Tez
>          Issue Type: Bug
>            Reporter: Rajesh Balamohan
>            Priority: Major
>         Attachments: TEZ-4211.2.patch, TEZ-4211.wip.patch
>
>
> There are cases when the entire data is held in memory and no disk segments 
> are present in MergeManager. Currently, MergeManager spills memory segments 
> to disk before proceeding.
>  
> [https://github.com/apache/tez/blob/master/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java#L1184]
>  
> {code:java}
> if (numMemDiskSegments > 0 && ioSortFactor > onDiskMapOutputs.size()) {
> ...
> ..
> TezMerger.writeFile(rIter, writer, progressable, 
> TezRuntimeConfiguration.TEZ_RUNTIME_RECORDS_BEFORE_PROGRESS_DEFAULT);
> ...
> ..
>  {code}
> This can be optimised to avoid spilling to disk when only memory segments 
> are present.
> Snippet from the logs of one of the apps (Q78):
> {noformat}
>  [ShuffleAndMergeRunner {Map_1} ()] 
> org.apache.tez.runtime.library.common.shuffle.orderedgrouped.MergeManager: 
> finalMerge with #inMemoryOutputs=4112, size=839646500 and #onDiskOutputs=0, 
> size=0
>  [ShuffleAndMergeRunner {Map_1} ()] 
> org.apache.tez.runtime.library.common.shuffle.orderedgrouped.MergeManager: 
> finalMerge with #inMemoryOutputs=4112, size=859378362 and #onDiskOutputs=0, 
> size=0
>  [ShuffleAndMergeRunner {Map_1} ()] 
> org.apache.tez.runtime.library.common.shuffle.orderedgrouped.MergeManager: 
> finalMerge with #inMemoryOutputs=4112, size=856145179 and #onDiskOutputs=0, 
> size=0
>  [ShuffleAndMergeRunner {Map_1} ()] 
> org.apache.tez.runtime.library.common.shuffle.orderedgrouped.MergeManager: 
> finalMerge with #inMemoryOutputs=4112, size=849878734 and #onDiskOutputs=0, 
> size=0
>  [ShuffleAndMergeRunner {Map_1} ()] 
> org.apache.tez.runtime.library.common.shuffle.orderedgrouped.MergeManager: 
> finalMerge with #inMemoryOutputs=4112, size=842666749 and #onDiskOutputs=0, 
> size=0
>  [ShuffleAndMergeRunner {Map_1} ()] 
> org.apache.tez.runtime.library.common.shuffle.orderedgrouped.MergeManager: 
> finalMerge with #inMemoryOutputs=4112, size=839533127 and #onDiskOutputs=0, 
> size=0
>  [ShuffleAndMergeRunner {Map_1} ()] 
> org.apache.tez.runtime.library.common.shuffle.orderedgrouped.MergeManager: 
> finalMerge with #inMemoryOutputs=4112, size=860448335 and #onDiskOutputs=0, 
> size=0
>  [ShuffleAndMergeRunner {Map_1} ()] 
> org.apache.tez.runtime.library.common.shuffle.orderedgrouped.MergeManager: 
> finalMerge with #inMemoryOutputs=4112, size=844468505 and #onDiskOutputs=0, 
> size=0
>  [ShuffleAndMergeRunner {Map_1} ()] 
> org.apache.tez.runtime.library.common.shuffle.orderedgrouped.MergeManager: 
> finalMerge with #inMemoryOutputs=4112, size=850099810 and #onDiskOutputs=0, 
> size=0
>  [ShuffleAndMergeRunner {Map_1} ()] 
> org.apache.tez.runtime.library.common.shuffle.orderedgrouped.MergeManager: 
> finalMerge with #inMemoryOutputs=4112, size=849206236 and #onDiskOutputs=0, 
> size=0
>  [ShuffleAndMergeRunner {Map_1} ()] 
> org.apache.tez.runtime.library.common.shuffle.orderedgrouped.MergeManager: 
> finalMerge with #inMemoryOutputs=4112, size=840238680 and #onDiskOutputs=0, 
> size=0
> {noformat}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
