Enrico,
The solution below works, but there is a small glitch.
It works fine in spark-shell but fails for skewed keys
when run through spark-submit.
Looking at the execution plan, the number of partitions is the same
for both repartition and groupByKey, and is driven by the value of
"spark.sql.shuffle.partitions", for example:
Exchange hashpartitioning(value#143, 200)
Any idea why skewed keys give wrong output under spark-submit while the
same code gives correct output in spark-shell?
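One way to compare the two environments is to count the shuffles in the physical plan in each of them. The following is only a rough sketch: `PlanSketch` and `hashExchanges` are made-up names, and it simply searches the plan text for the same "Exchange hashpartitioning" string quoted above.

```scala
import org.apache.spark.sql.Dataset

object PlanSketch {
  // Counts hash-partitioned shuffles by searching the physical plan text.
  // This is a plain string match on the plan output, not a Spark API
  // for inspecting exchanges.
  def hashExchanges(ds: Dataset[_]): Int =
    "Exchange hashpartitioning".r
      .findAllIn(ds.queryExecution.executedPlan.toString)
      .size
}
```

Printing `PlanSketch.hashExchanges(result)` together with `spark.conf.get("spark.sql.shuffle.partitions")` in both spark-shell and spark-submit would show whether the two runs plan the same number of shuffles. If the grouping step adds a second Exchange after the sort, the rows were shuffled again after sortWithinPartitions.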
--Abhinav
On 26/03/20 10:54 pm, Enrico Minack wrote:
Abhinav,
you can repartition by your key, then sortWithinPartitions, and then
groupByKey. Since the data are already hash-partitioned by key, Spark
should not shuffle them again, and so should not change the sort order
within each partition:
ds.repartition($"key").sortWithinPartitions($"code").groupBy($"key")
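A fuller sketch of this idea, under some assumptions: `KeyClass`, `SortedGroupsSketch` and `isSortedPerKey` are illustrative names, the sort uses both key and code (a variation on the one-liner above, so that the ordering needed for grouping is already in place), and `groupBy($"key").as[String, KeyClass]` requires Spark 3.0 or later. Instead of real custom logic, each group is walked once to check whether its rows arrive in `code` order.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}

// Hypothetical record type mirroring the columns in this thread.
case class KeyClass(key: String, code: String, code_value: String)

object SortedGroupsSketch {
  // For each key, reports whether its rows arrived in non-decreasing `code` order.
  def isSortedPerKey(spark: SparkSession, ds: Dataset[KeyClass]): Dataset[(String, Boolean)] = {
    import spark.implicits._
    ds.repartition($"key")
      .sortWithinPartitions($"key", $"code")
      .groupBy($"key").as[String, KeyClass] // assumption: Spark 3.0 or later
      .mapGroups { (key, rows) =>
        // Single pass over the iterator; the group is never materialized.
        var prev: Option[String] = None
        var sorted = true
        rows.foreach { r =>
          if (prev.exists(_ > r.code)) sorted = false
          prev = Some(r.code)
        }
        (key, sorted)
      }
  }
}
```

Any key reported as `false` received its rows out of order, which would suggest that the data were re-shuffled or re-sorted between sortWithinPartitions and the grouping.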
Enrico
On 26.03.20 at 17:53, Ranjan, Abhinav wrote:
Hi,
I have a dataframe which has data like:
key | code | code_value
1 | c1 | 11
1 | c2 | 12
1 | c2 | 9
1 | c3 | 12
1 | c2 | 13
1 | c2 | 14
1 | c4 | 12
1 | c2 | 15
1 | c1 | 12
I need to group the data by key and then apply some custom
logic to each value in the group. So I did this:
Let's suppose the data is in a dataframe df.
case class key_class(key: String, code: String, code_value: String)
df
  .as[key_class]
  .groupByKey(_.key)
  .mapGroups { (x, groupedValues) =>
    // return one status per row of the group
    groupedValues.map { row =>
      // do some custom logic on row
      "SUCCESS"
    }.toList
  }.toDF("status")
The issue with the above approach is that the values I get after applying
groupByKey are not sorted. I want the values to be sorted by
the column 'code'.
There are a few ways to do this:
1. Collect them into a list and then sort ==> this will result in OOM
if the iterator is too big.
2. Apply a secondary sort somehow, but the problem with
that approach is that I have to keep track of the key change.
3. sortWithinPartitions cannot be applied because groupBy will mess
up the order.
4. Another approach is:
df
  .as[key_class]
  .sort("key", "code")
  .map {
    // do stuff here
  }
but here also I have to keep track of the key change within the map
function, and sometimes this also overflows if the keys are skewed.
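The key-change tracking mentioned in approaches 2 and 4 can be sketched in plain Scala as below. This is a minimal illustration, not a full solution: `Rec`, `KeyChangeSketch` and `countPerKey` are made-up names, and a row count stands in for the custom logic.

```scala
// Hypothetical record type mirroring the columns above.
final case class Rec(key: String, code: String, codeValue: String)

object KeyChangeSketch {
  // Walks an iterator already sorted by (key, code) and emits one
  // (key, rowCount) pair each time the key changes. Only the current
  // key and a counter are held in memory, never a whole group.
  def countPerKey(sorted: Iterator[Rec]): Iterator[(String, Int)] =
    new Iterator[(String, Int)] {
      private val it = sorted.buffered
      def hasNext: Boolean = it.hasNext
      def next(): (String, Int) = {
        val key = it.head.key
        var n = 0
        // Consume rows until the key changes or the input ends.
        while (it.hasNext && it.head.key == key) { it.next(); n += 1 }
        (key, n)
      }
    }
}
```

In Spark, a function of this shape could presumably be passed to mapPartitions after repartitioning by key and sorting within partitions by key and code, since all rows of one key would then sit next to each other in a single partition.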
So is there any way in which I can get the values sorted after
grouping them by a key?
Thanks,
Abhinav