[ 
https://issues.apache.org/jira/browse/SPARK-8890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14628573#comment-14628573
 ] 

Yin Huai commented on SPARK-8890:
---------------------------------

[~ilganeli] Let me try to explain it. 

The background is that when we save data to the underlying filesystem (e.g. HDFS), 
we want to partition our data first, so that when we read the data back, we can 
take advantage of this physical layout and skip unnecessary partitions stored in 
the filesystem. For example, say we have a dataframe with two columns, 
{{key}} and {{value}}, and we want to save this dataframe with the saved data 
partitioned by the value of {{key}}. The layout of the saved data will be 
something like 
{code}
root_path/key=1/file1
root_path/key=1/file2
root_path/key=2/file1
root_path/key=2/file2
...
{code}

Because the data we get may not be clustered by the value of {{key}}, whenever a 
task sees a new value of {{key}}, it needs to create a new output writer, and 
each writer occupies a certain amount of memory to buffer data (e.g. every 
Parquet output writer uses about 128MB to buffer data). When there are many 
distinct values of {{key}}, we end up with many open output writers, which can 
cause OOM. What [~rxin] is suggesting here is to first try to create an output 
writer for every distinct value of {{key}}. Once we find there are too many 
distinct values (e.g. more than 50), we redirect all future input rows to a sort 
operation, which sorts rows by the value of {{key}}. With the help of this sort, 
all future input rows are clustered by the value of {{key}}, so the number of 
output writers open at the same time is bounded. 

> Reduce memory consumption for dynamic partition insert
> ------------------------------------------------------
>
>                 Key: SPARK-8890
>                 URL: https://issues.apache.org/jira/browse/SPARK-8890
>             Project: Spark
>          Issue Type: Sub-task
>          Components: SQL
>            Reporter: Reynold Xin
>            Priority: Critical
>
> Currently, InsertIntoHadoopFsRelation can run out of memory if the number of 
> table partitions is large. The problem is that we open one output writer for 
> each partition, and when data are randomized and when the number of 
> partitions is large, we open a large number of output writers, leading to OOM.
> The solution here is to inject a sorting operation once the number of active 
> partitions is beyond a certain point (e.g. 50?)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
