[
https://issues.apache.org/jira/browse/HIVE-16980?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16071875#comment-16071875
]
liyunzhang_intel commented on HIVE-16980:
-----------------------------------------
[~lirui]: the reason why we only use 1 reducer is that there is a bug in
SetSparkReducerParallelism#process at the 3TB scale. We use
[numberOfBytes|https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java#L129]
to collect the numberOfBytes of the siblings of the specified RS. We use the Long
type, and the addition overflows when the data is too big. Once that happens,
the parallelism is decided by
[sparkMemoryAndCores.getSecond()|https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java#L184].
If spark.dynamicAllocation.enabled is true, sparkMemoryAndCores.getSecond() is
a dynamic value decided by the Spark runtime. For example, the value of
sparkMemoryAndCores.getSecond() may be 5 or 15 at random, and there is a
possibility that the value is 1. The main problem here is the overflow of the Long
addition. You can reproduce the overflow problem with the following code:
{code}
import java.math.BigInteger;

public class OverflowDemo {
  public static void main(String[] args) {
    long a1 = 9223372036854775807L;   // Long.MAX_VALUE
    long a2 = 1022672L;
    long res = a1 + a2;
    System.out.println(res);          // -9223372036853753137 (wrapped around)
    BigInteger b1 = BigInteger.valueOf(a1);
    BigInteger b2 = BigInteger.valueOf(a2);
    BigInteger bigRes = b1.add(b2);
    System.out.println(bigRes);       // 9223372036855798479 (correct sum)
  }
}
{code}
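One possible way to guard the accumulation is to clamp the sum at Long.MAX_VALUE instead of letting it wrap to a negative value. The sketch below only illustrates that idea; the helper name saturatedAdd and the loop over sibling sizes are assumptions for the example, not the actual SetSparkReducerParallelism code.
{code}
// A minimal sketch, assuming sibling data sizes are accumulated as longs.
// Math.addExact throws on overflow, which we turn into a clamped value so
// the byte estimate stays at Long.MAX_VALUE instead of going negative.
public class SaturatedSum {
  static long saturatedAdd(long a, long b) {
    try {
      return Math.addExact(a, b);     // normal case: no overflow
    } catch (ArithmeticException e) {
      return Long.MAX_VALUE;          // overflow: clamp to the maximum
    }
  }

  public static void main(String[] args) {
    long[] siblingBytes = {9223372036854775807L, 1022672L};  // hypothetical sizes
    long numberOfBytes = 0L;
    for (long bytes : siblingBytes) {
      numberOfBytes = saturatedAdd(numberOfBytes, bytes);
    }
    System.out.println(numberOfBytes);  // 9223372036854775807, not negative
  }
}
{code}
With a clamped estimate the parallelism would still be driven by the (capped) byte count rather than falling through to the runtime value of sparkMemoryAndCores.getSecond().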
> The partition of join is not divided evenly in HoS
> ---------------------------------------------------
>
> Key: HIVE-16980
> URL: https://issues.apache.org/jira/browse/HIVE-16980
> Project: Hive
> Issue Type: Bug
> Reporter: liyunzhang_intel
> Attachments: HIVE-16980_screenshot.png, query17_explain.log
>
>
> In HoS, the join implementation is union + repartition sort. We use
> HashPartitioner to partition the result of the union.
> SortByShuffler.java
> {code}
> public JavaPairRDD<HiveKey, BytesWritable> shuffle(
>     JavaPairRDD<HiveKey, BytesWritable> input, int numPartitions) {
>   JavaPairRDD<HiveKey, BytesWritable> rdd;
>   if (totalOrder) {
>     if (numPartitions > 0) {
>       if (numPartitions > 1 && input.getStorageLevel() == StorageLevel.NONE()) {
>         input.persist(StorageLevel.DISK_ONLY());
>         sparkPlan.addCachedRDDId(input.id());
>       }
>       rdd = input.sortByKey(true, numPartitions);
>     } else {
>       rdd = input.sortByKey(true);
>     }
>   } else {
>     Partitioner partitioner = new HashPartitioner(numPartitions);
>     rdd = input.repartitionAndSortWithinPartitions(partitioner);
>   }
>   return rdd;
> }
> {code}
> In the Spark history server, I saw that there are 28 tasks in the repartition
> sort stage; 21 tasks finished in less than 1s while the remaining 7 tasks took
> a long time to execute. Is there any way to make the data evenly assigned to
> every partition?
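To illustrate why a hash-based repartition can end up this uneven, the toy example below mimics the rule Spark's HashPartitioner uses (a non-negative key.hashCode() modulo numPartitions) and counts records per partition when one join key is far more frequent than the others. The key values, record counts, and class names are made up for the illustration; this is not the Hive or Spark code itself.
{code}
// A toy sketch (not Hive/Spark code): all records with the same key hash to
// the same partition, so a heavily repeated join key makes one partition,
// and therefore one task, much larger than the rest.
import java.util.HashMap;
import java.util.Map;

public class SkewDemo {
  static int partitionFor(Object key, int numPartitions) {
    int raw = key.hashCode() % numPartitions;
    return raw < 0 ? raw + numPartitions : raw;   // normalize to non-negative
  }

  public static void main(String[] args) {
    int numPartitions = 28;                       // matches the 28 tasks in the description
    // Hypothetical keys: "hot_key" appears far more often than the others.
    String[] keys = new String[1000];
    for (int i = 0; i < keys.length; i++) {
      keys[i] = (i < 800) ? "hot_key" : ("key_" + i);
    }
    Map<Integer, Integer> counts = new HashMap<>();
    for (String key : keys) {
      counts.merge(partitionFor(key, numPartitions), 1, Integer::sum);
    }
    System.out.println(counts);                   // one partition holds 800+ records
  }
}
{code}
Under these assumptions, whichever partition the hot key hashes to does most of the work, which matches the pattern of a few long-running tasks among many short ones.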
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)