[ 
https://issues.apache.org/jira/browse/HIVE-16840?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16056593#comment-16056593
 ] 

liyunzhang_intel commented on HIVE-16840:
-----------------------------------------

[~xuefuz],[~lirui],[~Ferd],[~csun]:  attached is HIVE-16840.1.patch.
Changes
1.  change physical plan at SetSparkReducerParallelism#process.  If 
needSetSparkReucerParallelism return false,this stands for current sink maybe 
an order by limit case. Add newSparkSortRS(actually its type is ReduceSink), 
newSel(actually its type is Sel),newLimit(actually its type is Limit) before 
sink.
original physical plan is 
{code} ...-RS-SEL-LIMIT....{code}
now physical plan is 
{code} ...-newSparkSortRS-newSel-newLimit-RS-LIMIT....{code}
currently i add SetSparkReducerParallelism#getNumReducerForSparkSortRS, it 
returns 10,this set the parallelism for newSparkSortRS. i will update the 
function in next patch.
2. add a property sortLimit in ReduceSinkOperator. If it is true. use partition 
sort  not global sort in GenSparkUtils#getEdgeProperty


Not fully test about the patch, just test a simple qfile, but I think we can 
parallel, you can review, i will start fully test.
{code}

select key,value from src order by key limit 10;

{code}

> Investigate the performance of order by limit in HoS
> ----------------------------------------------------
>
>                 Key: HIVE-16840
>                 URL: https://issues.apache.org/jira/browse/HIVE-16840
>             Project: Hive
>          Issue Type: Bug
>            Reporter: liyunzhang_intel
>            Assignee: liyunzhang_intel
>         Attachments: HIVE-16840.patch
>
>
> We found that on 1TB data of TPC-DS, q17 of TPC-DS hanged.
> {code}
>  select  i_item_id
>        ,i_item_desc
>        ,s_state
>        ,count(ss_quantity) as store_sales_quantitycount
>        ,avg(ss_quantity) as store_sales_quantityave
>        ,stddev_samp(ss_quantity) as store_sales_quantitystdev
>        ,stddev_samp(ss_quantity)/avg(ss_quantity) as store_sales_quantitycov
>        ,count(sr_return_quantity) as_store_returns_quantitycount
>        ,avg(sr_return_quantity) as_store_returns_quantityave
>        ,stddev_samp(sr_return_quantity) as_store_returns_quantitystdev
>        ,stddev_samp(sr_return_quantity)/avg(sr_return_quantity) as 
> store_returns_quantitycov
>        ,count(cs_quantity) as catalog_sales_quantitycount ,avg(cs_quantity) 
> as catalog_sales_quantityave
>        ,stddev_samp(cs_quantity)/avg(cs_quantity) as 
> catalog_sales_quantitystdev
>        ,stddev_samp(cs_quantity)/avg(cs_quantity) as catalog_sales_quantitycov
>  from store_sales
>      ,store_returns
>      ,catalog_sales
>      ,date_dim d1
>      ,date_dim d2
>      ,date_dim d3
>      ,store
>      ,item
>  where d1.d_quarter_name = '2000Q1'
>    and d1.d_date_sk = store_sales.ss_sold_date_sk
>    and item.i_item_sk = store_sales.ss_item_sk
>    and store.s_store_sk = store_sales.ss_store_sk
>    and store_sales.ss_customer_sk = store_returns.sr_customer_sk
>    and store_sales.ss_item_sk = store_returns.sr_item_sk
>    and store_sales.ss_ticket_number = store_returns.sr_ticket_number
>    and store_returns.sr_returned_date_sk = d2.d_date_sk
>    and d2.d_quarter_name in ('2000Q1','2000Q2','2000Q3')
>    and store_returns.sr_customer_sk = catalog_sales.cs_bill_customer_sk
>    and store_returns.sr_item_sk = catalog_sales.cs_item_sk
>    and catalog_sales.cs_sold_date_sk = d3.d_date_sk
>    and d3.d_quarter_name in ('2000Q1','2000Q2','2000Q3')
>  group by i_item_id
>          ,i_item_desc
>          ,s_state
>  order by i_item_id
>          ,i_item_desc
>          ,s_state
> limit 100;
> {code}
> the reason why the script hanged is because we only use 1 task to implement 
> sort.
> {code}
> STAGE PLANS:
>   Stage: Stage-1
>     Spark
>       Edges:
>         Reducer 10 <- Reducer 9 (SORT, 1)
>         Reducer 2 <- Map 1 (PARTITION-LEVEL SORT, 889), Map 11 
> (PARTITION-LEVEL SORT, 889)
>         Reducer 3 <- Map 12 (PARTITION-LEVEL SORT, 1009), Reducer 2 
> (PARTITION-LEVEL SORT, 1009)
>         Reducer 4 <- Map 13 (PARTITION-LEVEL SORT, 683), Reducer 3 
> (PARTITION-LEVEL SORT, 683)
>         Reducer 5 <- Map 14 (PARTITION-LEVEL SORT, 751), Reducer 4 
> (PARTITION-LEVEL SORT, 751)
>         Reducer 6 <- Map 15 (PARTITION-LEVEL SORT, 826), Reducer 5 
> (PARTITION-LEVEL SORT, 826)
>         Reducer 7 <- Map 16 (PARTITION-LEVEL SORT, 909), Reducer 6 
> (PARTITION-LEVEL SORT, 909)
>         Reducer 8 <- Map 17 (PARTITION-LEVEL SORT, 1001), Reducer 7 
> (PARTITION-LEVEL SORT, 1001)
>         Reducer 9 <- Reducer 8 (GROUP, 2)
> {code}
> The parallelism of Reducer 9 is 1. It is a orderby limit case so we use 1 
> task to execute to ensure the correctness. But the performance is poor.
> the reason why we use 1 task to implement order by limit is 
> [here|https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java#L207]



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to