[ 
https://issues.apache.org/jira/browse/SPARK-19222?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sujith updated SPARK-19222:
---------------------------
    Description: 
When limit is being added in the middle of the physical plan there will 
be possibility of memory bottleneck 
if the limit value is too large and system will try to aggregate all the 
partition limit values as part of single partition. 
Description: 
Eg: 
create table src_temp as select * from src limit n;    (n=10000000) 

== Physical Plan  == 
ExecutedCommand 
   +- CreateHiveTableAsSelectCommand [Database:spark}, TableName: t2, 
InsertIntoHiveTable] 
         +- GlobalLimit 10000000 
            +- LocalLimit 10000000 
               +- Project [imei#101, age#102, task#103L, num#104, level#105, 
productdate#106, name#107, point#108] 
                  +- SubqueryAlias hive 
                     +- 
Relation[imei#101,age#102,task#103L,num#104,level#105,productdate#106,name#107,point#108]
 csv  |

As shown in above plan when the limit comes in middle,there can be two 
types of performance bottlenecks. 
scenario 1: when the partition count is very high and limit value is small 
scenario 2: when the limit value is very large 


Eg,current scenario based on following sample data of limit count is 10000000 
and partition count  5 

Local Limit -------- > |partition 1|   |partition 2|   |partition 3|   
|partition 4|   |partition 5|
                                take(n)          take(n)          take(n)       
   take(n)         take(n)
                                          
                                         Shuffle Exchange(single partition)     
                        
Global Limit -------- >                     take(n) (all the partition data 
will be grouped in single partition)                               
  
as the above scenario occurs where system will shuffle and try to group the 
limit data from all partition 
to single partition which will induce performance bottleneck. 

  was:
Performance/memory bottle neck occurs in the below mentioned query
case 1:
{code}
create table t1 as select * from dest1 limit 10000000;
{code}
case 2:
{code}
create table t1 as select * from dest1 limit 1000;
pre-condition : partition count >=10000
{code}
In above cases limit is being added in the terminal of the physical plan 

{code}
== Physical Plan  ==
ExecutedCommand
   +- CreateHiveTableAsSelectCommand [Database:spark}, TableName: t2, 
InsertIntoHiveTable]
         +- GlobalLimit 10000000
            +- LocalLimit 10000000
               +- Project [imei#101, age#102, task#103L, num#104, level#105, 
productdate#106, name#107, point#108]
                  +- SubqueryAlias hive
                     +- 
Relation[imei#101,age#102,task#103L,num#104,level#105,productdate#106,name#107,point#108]
 csv  |
{code}
Issue Hints: 

Possible Bottleneck snippet in limit.scala file under spark-sql package.
{code}
  protected override def doExecute(): RDD[InternalRow] = {
    val locallyLimited = child.execute().mapPartitionsInternal(_.take(limit))
    val shuffled = new ShuffledRowRDD(
      ShuffleExchange.prepareShuffleDependency(
        locallyLimited, child.output, SinglePartition, serializer))
    shuffled.mapPartitionsInternal(_.take(limit))
  }
{code}

As mentioned in above case 1  (where limit value is 10000000 or partition count 
is > 10000) and case 2(limit value is small(around 1000)), As per the above 
snippet when the {{ShuffledRowRDD}}
is created by grouping all the limit data from different partitions to a single 
partition in executer,  memory issue occurs since all the partition limit data 
will be collected and 
grouped  in a single partition for processing, in both former/later case the 
data count  can go very high which can create the memory bottleneck.

Proposed solution for case 2:
An accumulator value can be to send to all partitions, all executor will be 
updating the accumulator value based on the  data fetched , 
eg: Number of partition = 100, number of cores =10
Ideally tasks will be launched in a group of 10 task/core, once the first group 
finishes the tasks driver will check whether the accumulator value is been 
reached the limit value if its reached then no further tasks will be launched 
to executors and the result after applying limit will be returned.

Please let me now for any suggestions or solutions for the above mentioned 
problems

Thanks,
Sujith


corrected the descriptions

> Limit Query Performance issue
> -----------------------------
>
>                 Key: SPARK-19222
>                 URL: https://issues.apache.org/jira/browse/SPARK-19222
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.1.0
>         Environment: Linux/Windows
>            Reporter: Sujith
>            Priority: Minor
>
> When limit is being added in the middle of the physical plan there will 
> be possibility of memory bottleneck 
> if the limit value is too large and system will try to aggregate all the 
> partition limit values as part of single partition. 
> Description: 
> Eg: 
> create table src_temp as select * from src limit n;    (n=10000000) 
> == Physical Plan  == 
> ExecutedCommand 
>    +- CreateHiveTableAsSelectCommand [Database:spark}, TableName: t2, 
> InsertIntoHiveTable] 
>          +- GlobalLimit 10000000 
>             +- LocalLimit 10000000 
>                +- Project [imei#101, age#102, task#103L, num#104, level#105, 
> productdate#106, name#107, point#108] 
>                   +- SubqueryAlias hive 
>                      +- 
> Relation[imei#101,age#102,task#103L,num#104,level#105,productdate#106,name#107,point#108]
>  csv  |
> As shown in above plan when the limit comes in middle,there can be two 
> types of performance bottlenecks. 
> scenario 1: when the partition count is very high and limit value is small 
> scenario 2: when the limit value is very large 
> Eg,current scenario based on following sample data of limit count is 10000000 
> and partition count  5 
> Local Limit -------- > |partition 1|   |partition 2|   |partition 3|   
> |partition 4|   |partition 5|
>                                 take(n)          take(n)          take(n)     
>      take(n)         take(n)
>                                           
>                                          Shuffle Exchange(single partition)   
>   
>                         
> Global Limit -------- >                     take(n) (all the partition data 
> will be grouped in single partition)                               
>   
> as the above scenario occurs where system will shuffle and try to group the 
> limit data from all partition 
> to single partition which will induce performance bottleneck. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to