[ https://issues.apache.org/jira/browse/HIVE-16980?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16071895#comment-16071895 ]

Rui Li commented on HIVE-16980:
-------------------------------

[~kellyzly], nice catch of the Long overflow!
As to data skew, it's usually due to some skewed keys in your data, e.g. when
you join A and B on A.key = B.key and most of the keys in A are 1. Since identical
keys have to go to the same reducer, you end up with one reducer handling most of
the data from A. {{hive.optimize.skewjoin}} detects skew at runtime and joins the
skewed data with a map join. It can mitigate the problem, but the skewed data
still has to be shuffled to that one reducer.
I don't think the hash code can lead to skew (it's possible in some extreme
cases though, like HIVE-14797).
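To illustrate why identical keys pile up on one reducer, here's a minimal,
self-contained sketch (plain Java, not HoS code; the key distribution and
partition count below are made up) that mirrors what a hash partitioner does:
{code}
// Minimal sketch of hash partitioning (mirrors the idea behind Spark's
// HashPartitioner). The keys and partition count are made-up sample data.
import java.util.HashMap;
import java.util.Map;

public class SkewSketch {
  // Non-negative modulo, so negative hash codes still map to a valid partition.
  static int partitionFor(Object key, int numPartitions) {
    int mod = key.hashCode() % numPartitions;
    return mod < 0 ? mod + numPartitions : mod;
  }

  public static void main(String[] args) {
    int numPartitions = 28;
    // Pretend most of the join keys are 1 (skewed) and the rest are spread out.
    int[] keys = {1, 1, 1, 1, 1, 1, 1, 1, 1, 2, 3, 4};
    Map<Integer, Integer> rowsPerPartition = new HashMap<>();
    for (int k : keys) {
      rowsPerPartition.merge(partitionFor(k, numPartitions), 1, Integer::sum);
    }
    // All the key=1 rows land in a single partition, no matter how many
    // partitions (reducers) there are.
    System.out.println(rowsPerPartition);
  }
}
{code}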

For your case, my suggestion is to force more reducers and verify if skew 
really exists. If so, enable {{hive.optimize.skewjoin}} and see if it helps.
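For reference, something along these lines (just a sketch; the reducer count is
a placeholder to experiment with, not a recommendation):
{code}
-- Force a fixed number of reducers, to check whether a few long-running
-- tasks remain even with more partitions (i.e. the skew is real).
set mapred.reduce.tasks=100;

-- Detect skewed keys at runtime and handle them with a map join.
set hive.optimize.skewjoin=true;
-- Rows-per-key threshold above which a key is treated as skewed
-- (default is 100000).
set hive.skewjoin.key=100000;
{code}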

> The partition of the join is not divided evenly in HoS
> ------------------------------------------------------
>
>                 Key: HIVE-16980
>                 URL: https://issues.apache.org/jira/browse/HIVE-16980
>             Project: Hive
>          Issue Type: Bug
>            Reporter: liyunzhang_intel
>         Attachments: HIVE-16980_screenshot.png, query17_explain.log
>
>
> In HoS, the join implementation is union + repartition sort. We use
> HashPartitioner to partition the result of the union.
> SortByShuffler.java
> {code}
>   public JavaPairRDD<HiveKey, BytesWritable> shuffle(
>       JavaPairRDD<HiveKey, BytesWritable> input, int numPartitions) {
>     JavaPairRDD<HiveKey, BytesWritable> rdd;
>     if (totalOrder) {
>       if (numPartitions > 0) {
>         if (numPartitions > 1 && input.getStorageLevel() == StorageLevel.NONE()) {
>           input.persist(StorageLevel.DISK_ONLY());
>           sparkPlan.addCachedRDDId(input.id());
>         }
>         rdd = input.sortByKey(true, numPartitions);
>       } else {
>         rdd = input.sortByKey(true);
>       }
>     } else {
>       Partitioner partitioner = new HashPartitioner(numPartitions);
>       rdd = input.repartitionAndSortWithinPartitions(partitioner);
>     }
>     return rdd;
>   }
> {code}
> In the Spark history server, I saw that there are 28 tasks in the
> repartition-sort stage: 21 of them finish in less than 1s, while the remaining
> 7 tasks take a long time to execute. Is there any way to make the data evenly
> assigned to every partition?



