[
https://issues.apache.org/jira/browse/HIVE-16980?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16069708#comment-16069708
]
liyunzhang_intel commented on HIVE-16980:
-----------------------------------------
[~lirui] and [~xuefuz]: attached is a screenshot of TPC-DS/query17.sql running
on the 3TB data set.
TPC-DS/query17.sql
{code}
select i_item_id
,i_item_desc
,s_state
,count(ss_quantity) as store_sales_quantitycount
,avg(ss_quantity) as store_sales_quantityave
,stddev_samp(ss_quantity) as store_sales_quantitystdev
,stddev_samp(ss_quantity)/avg(ss_quantity) as store_sales_quantitycov
,count(sr_return_quantity) as_store_returns_quantitycount
,avg(sr_return_quantity) as_store_returns_quantityave
,stddev_samp(sr_return_quantity) as_store_returns_quantitystdev
,stddev_samp(sr_return_quantity)/avg(sr_return_quantity) as store_returns_quantitycov
,count(cs_quantity) as catalog_sales_quantitycount
,avg(cs_quantity) as catalog_sales_quantityave
,stddev_samp(cs_quantity)/avg(cs_quantity) as catalog_sales_quantitystdev
,stddev_samp(cs_quantity)/avg(cs_quantity) as catalog_sales_quantitycov
from store_sales
,store_returns
,catalog_sales
,date_dim d1
,date_dim d2
,date_dim d3
,store
,item
where d1.d_quarter_name = '2000Q1'
and d1.d_date_sk = store_sales.ss_sold_date_sk
and item.i_item_sk = store_sales.ss_item_sk
and store.s_store_sk = store_sales.ss_store_sk
and store_sales.ss_customer_sk = store_returns.sr_customer_sk
and store_sales.ss_item_sk = store_returns.sr_item_sk
and store_sales.ss_ticket_number = store_returns.sr_ticket_number
and store_returns.sr_returned_date_sk = d2.d_date_sk
and d2.d_quarter_name in ('2000Q1','2000Q2','2000Q3')
and store_returns.sr_customer_sk = catalog_sales.cs_bill_customer_sk
and store_returns.sr_item_sk = catalog_sales.cs_item_sk
and catalog_sales.cs_sold_date_sk = d3.d_date_sk
and d3.d_quarter_name in ('2000Q1','2000Q2','2000Q3')
group by i_item_id
,i_item_desc
,s_state
order by i_item_id
,i_item_desc
,s_state
limit 100;
{code}
The explain plan is also attached. Let me explain the explain plan:
store, item, d1, d2 and d3 are small tables.
store_sales, store_returns and catalog_sales are big tables.
There are 7 stages in the job:
Stage-0: d2 union d3 union store union item (all these small tables are
converted to map joins; a sketch of the map-join idea follows the stage list.
The first strange thing is that d1 is also small, so why is d1 not in
Stage-0?)
Stage-1:
Reducer 2 <- Map 1 (store_sales), Map 7 (store_returns)
Reducer 3 <- Map 8 (catalog_sales), Reducer 2
Reducer 4 <- Map 9 (d1), Reducer 3
Reducer 5 <- Reducer 4 (GROUP)
Reducer 6 <- Reducer 5 (SORT)
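As an aside, here is a minimal sketch of the map-join idea from Stage-0
(hypothetical Row type and names, not Hive's actual MapJoinOperator): the
small table is loaded into an in-memory hash table and each mapper probes it
while streaming the big table, so these joins need no shuffle at all.
{code}
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch only: hypothetical Row type, not Hive's MapJoinOperator.
public class MapJoinSketch {
  static class Row {
    final int key;
    final String value;
    Row(int key, String value) { this.key = key; this.value = value; }
  }

  // Build a hash table from the small table once, then probe it for every
  // row of the big table inside the mapper -- no shuffle is needed.
  static List<String> mapJoin(List<Row> small, List<Row> big) {
    Map<Integer, String> hashTable = new HashMap<>();
    for (Row r : small) {
      hashTable.put(r.key, r.value);
    }
    List<String> joined = new ArrayList<>();
    for (Row r : big) {
      String match = hashTable.get(r.key);
      if (match != null) {
        joined.add(r.value + "," + match);
      }
    }
    return joined;
  }
}
{code}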
The screenshot is about the stage "Reducer 3 <- Map 8 (catalog_sales),
Reducer 2". In the Spark history server it shows 2178 finished tasks. The
median task duration is 4s, the 75th percentile is 20 min, and the max is 32
min. For Shuffle Read Size/Records, the median is 0.0B/0, the 75th percentile
is 274.9MB/8695090, and the max is 275.3MB/8709548. I don't understand these
metrics very well, but the variance between tasks seems far too big: some
tasks read a lot of shuffle data while others read none. Can you help to see
what is wrong?
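For context, here is a standalone sketch of how HashPartitioner-style
assignment (assuming Spark's hashCode-mod-numPartitions logic) turns a single
hot join key into one oversized partition, which would match the 0.0B-median
vs 275MB-max shuffle-read pattern above:
{code}
import java.util.Arrays;

// Standalone sketch (not Hive/Spark source). Assumption: HashPartitioner
// assigns partition = nonNegativeMod(key.hashCode(), numPartitions).
public class SkewDemo {
  static int getPartition(Object key, int numPartitions) {
    int mod = key.hashCode() % numPartitions;
    return mod < 0 ? mod + numPartitions : mod;
  }

  public static void main(String[] args) {
    int numPartitions = 4;
    int[] rowsPerPartition = new int[numPartitions];
    // Skewed input: one hot join key dominates.
    String[] joinKeys = {"hot", "hot", "hot", "hot", "hot", "hot", "k1", "k2"};
    for (String key : joinKeys) {
      rowsPerPartition[getPartition(key, numPartitions)]++;
    }
    // Every "hot" row hashes to the same partition, so one task gets almost
    // all of the shuffle read while the others get next to nothing.
    System.out.println(Arrays.toString(rowsPerPartition));
  }
}
{code}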
> The partition of join is not divided evenly in HoS
> ---------------------------------------------------
>
> Key: HIVE-16980
> URL: https://issues.apache.org/jira/browse/HIVE-16980
> Project: Hive
> Issue Type: Bug
> Reporter: liyunzhang_intel
>
> In HoS, the join is implemented as union + repartition sort. We use
> HashPartitioner to partition the result of the union.
> SortByShuffler.java
> {code}
>   public JavaPairRDD<HiveKey, BytesWritable> shuffle(
>       JavaPairRDD<HiveKey, BytesWritable> input, int numPartitions) {
>     JavaPairRDD<HiveKey, BytesWritable> rdd;
>     if (totalOrder) {
>       if (numPartitions > 0) {
>         if (numPartitions > 1 && input.getStorageLevel() == StorageLevel.NONE()) {
>           // Persist so sortByKey's sampling pass (for its range
>           // partitioner) does not recompute the input.
>           input.persist(StorageLevel.DISK_ONLY());
>           sparkPlan.addCachedRDDId(input.id());
>         }
>         rdd = input.sortByKey(true, numPartitions);
>       } else {
>         rdd = input.sortByKey(true);
>       }
>     } else {
>       // Join path: hash-partition rows by key, then sort within each
>       // partition.
>       Partitioner partitioner = new HashPartitioner(numPartitions);
>       rdd = input.repartitionAndSortWithinPartitions(partitioner);
>     }
>     return rdd;
>   }
> {code}
> In the Spark history server, I saw that there are 28 tasks in the
> repartition-sort stage; 21 of them finished in less than 1s while the
> remaining 7 took a long time to execute. Is there any way to make the data
> evenly assigned to every partition?
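One generic workaround for this kind of key skew (a sketch only; HoS does not
do this here) is key salting: append a bounded random suffix to the hot join
key so its rows spread across several partitions. All names below are
hypothetical.
{code}
import java.util.Arrays;
import java.util.Random;

// Sketch of key salting (a generic skew workaround, not an HoS feature).
// A bounded random suffix spreads one hot key over several partitions.
public class SaltingDemo {
  static int getPartition(Object key, int numPartitions) {
    int mod = key.hashCode() % numPartitions;
    return mod < 0 ? mod + numPartitions : mod;
  }

  public static void main(String[] args) {
    int numPartitions = 4;
    int saltRange = 4;              // how many suffixes each hot key gets
    Random random = new Random(42);
    int[] rowsPerPartition = new int[numPartitions];
    for (int i = 0; i < 8; i++) {
      // "hot#0" .. "hot#3" hash differently, so the rows spread out.
      String saltedKey = "hot#" + random.nextInt(saltRange);
      rowsPerPartition[getPartition(saltedKey, numPartitions)]++;
    }
    System.out.println(Arrays.toString(rowsPerPartition));
  }
}
{code}
The other join side has to be replicated across the same saltRange suffixes so
matching rows still meet, which is why salting only pays off when the skew is
severe.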