[ https://issues.apache.org/jira/browse/SPARK-4452?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Andrew Or updated SPARK-4452:
-----------------------------
    Description: 
When an Aggregator is used with ExternalSorter in a task, Spark will create 
many small files, which can cause a "too many open files" error during merging.

Currently, ShuffleMemoryManager does not work well when there are two spillable 
objects in the same thread, in this case ExternalSorter and ExternalAppendOnlyMap 
(used by Aggregator). Here is an example: because of map-side aggregation, 
ExternalAppendOnlyMap is created first to read the RDD. It may request as much 
memory as it can, which is totalMem/numberOfThreads. Later, when ExternalSorter 
is created in the same thread, ShuffleMemoryManager may refuse to allocate more 
memory to it, since the memory has already been given to the previously 
requesting object (ExternalAppendOnlyMap). As a result, ExternalSorter keeps 
spilling small files due to the lack of memory.
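
For illustration, here is a minimal, self-contained Scala sketch of the 
starvation. This is a toy model with made-up names, not the real 
ShuffleMemoryManager: it only tracks a per-thread total, so whichever object 
asks first in a thread can take the whole per-thread quota.

{code:scala}
object StarvationSketch {
  val totalMem: Long = 1024L * 1024 * 1024        // pretend executor shuffle memory
  val numberOfThreads = 8
  val perThreadCap: Long = totalMem / numberOfThreads

  // Memory granted so far, keyed by thread id only; there is no notion of
  // which object (map or sorter) actually holds it.
  private val granted =
    scala.collection.mutable.Map[Long, Long]().withDefaultValue(0L)

  def tryToAcquire(threadId: Long, numBytes: Long): Long = synchronized {
    val free = perThreadCap - granted(threadId)
    val toGrant = math.max(0L, math.min(numBytes, free))
    granted(threadId) += toGrant
    toGrant
  }

  def main(args: Array[String]): Unit = {
    val tid = Thread.currentThread().getId
    // ExternalAppendOnlyMap (map-side aggregation) requests first and gets the whole quota...
    val forMap = tryToAcquire(tid, perThreadCap)
    // ...so ExternalSorter, created later in the same thread, gets nothing and
    // has to keep spilling tiny files.
    val forSorter = tryToAcquire(tid, 64L * 1024 * 1024)
    println(s"map got $forMap bytes, sorter got $forSorter bytes")
  }
}
{code}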

I'm currently working on a PR to address these two issues. It will include the 
following changes:

1. ShuffleMemoryManager should track not only the memory usage of each thread, 
but also which object holds the memory.
2. ShuffleMemoryManager should be able to trigger the spilling of a spillable 
object. That way, when a new object in a thread requests memory, the old 
occupant can be evicted/spilled. Previously, spillable objects triggered 
spilling only by themselves, so one might not spill even when another object 
in the same thread needed more memory. After this change, ShuffleMemoryManager 
can trigger the spilling of an object whenever it needs to (see the sketch 
after this list).
3. Make the iterator of ExternalAppendOnlyMap spillable. Previously, 
ExternalAppendOnlyMap returns a destructive iterator and cannot be spilled 
once the iterator has been returned. This should be changed so that 
ShuffleMemoryManager can still spill the map even after the iterator is 
returned (also sketched below).
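
To make changes 1 and 2 more concrete, here is a rough sketch. The names 
(Spillable, forceSpill, OwnerAwareMemoryManager) are illustrative, not the 
actual Spark API: the manager keys allocations by (thread, owner) and may 
force an earlier occupant of the same thread to spill when a new requester 
cannot get memory.

{code:scala}
import scala.collection.mutable

// Hypothetical interface: anything the manager can force to spill.
trait Spillable {
  /** Spill in-memory data to disk and return the number of bytes freed. */
  def forceSpill(): Long
}

class OwnerAwareMemoryManager(maxMemory: Long, numThreads: Int) {
  private val perThreadCap: Long = maxMemory / numThreads

  // Change 1: memory is tracked per (threadId, owner), not just per thread.
  private val granted = mutable.Map[(Long, Spillable), Long]().withDefaultValue(0L)

  private def usedByThread(tid: Long): Long =
    granted.collect { case ((t, _), bytes) if t == tid => bytes }.sum

  def tryToAcquire(owner: Spillable, numBytes: Long): Long = synchronized {
    val tid = Thread.currentThread().getId
    var free = perThreadCap - usedByThread(tid)
    if (free < numBytes) {
      // Change 2: evict/spill the other occupants of this thread to make room,
      // instead of waiting for them to decide to spill on their own.
      val others = granted.keys.collect { case key @ (t, o) if t == tid && o != owner => key }
      others.foreach { key =>
        val freed = math.min(granted(key), key._2.forceSpill())
        granted(key) -= freed
        free += freed
      }
    }
    val toGrant = math.max(0L, math.min(numBytes, free))
    granted((tid, owner)) += toGrant
    toGrant
  }

  def release(owner: Spillable, numBytes: Long): Unit = synchronized {
    val key = (Thread.currentThread().getId, owner)
    granted(key) = math.max(0L, granted(key) - numBytes)
  }
}
{code}

Change 3 could look roughly like the following, reusing the hypothetical 
Spillable trait above. Again, this is only a sketch with made-up names; the 
real destructive iterator lives inside ExternalAppendOnlyMap, and spillToDisk 
stands in for its existing spill machinery.

{code:scala}
class SpillableIterator[K, C](
    private var current: Iterator[(K, C)],
    spillToDisk: Iterator[(K, C)] => Iterator[(K, C)])
  extends Iterator[(K, C)] with Spillable {

  // The memory manager may call this even after the iterator has been handed
  // out: the remaining in-memory elements are written to disk and reads
  // continue from the on-disk copy.
  override def forceSpill(): Long = synchronized {
    current = spillToDisk(current)
    0L  // a real implementation would report how many bytes were freed
  }

  override def hasNext: Boolean = synchronized { current.hasNext }
  override def next(): (K, C) = synchronized { current.next() }
}
{code}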

I currently have a working branch in progress: 
https://github.com/tsdeng/spark/tree/enhance_memory_manager. Change 3 is 
already made, and a prototype of changes 1 and 2 (evicting spillables from the 
memory manager) is still in progress. I will send a PR when it's done.

Any feedback or thoughts on this change are highly appreciated!


> Shuffle data structures can starve others on the same thread for memory 
> ------------------------------------------------------------------------
>
>                 Key: SPARK-4452
>                 URL: https://issues.apache.org/jira/browse/SPARK-4452
>             Project: Spark
>          Issue Type: Bug
>    Affects Versions: 1.1.0
>            Reporter: Tianshuo Deng
>            Assignee: Tianshuo Deng
>            Priority: Blocker
>


