[ https://issues.apache.org/jira/browse/SPARK-24293?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16478406#comment-16478406 ]

Li Yuanjian commented on SPARK-24293:
-------------------------------------

{quote}
doing the map side combine manually with SQL operators, to get SQL 
optimizations like whole-stage-codegen.
{quote}
Thanks [~cloud_fan] for the detailed explanation; this resolved our confusion 
about why map-side combine is not supported in SerializedShuffle.
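
To make the SQL-operator approach concrete, here is a minimal sketch (the 
session setup and column names are just for illustration): a groupBy is planned 
as HashAggregate(partial) -> Exchange -> HashAggregate(final), so the map-side 
combine is done by the partial aggregate operator and benefits from whole-stage 
codegen, instead of happening inside the shuffle writer.
{code:java}
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.sum;

public class MapSideCombineViaSql {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("map-side-combine-via-sql")
        .master("local[*]")
        .getOrCreate();

    // Toy dataset with a low-cardinality key column.
    Dataset<Row> df = spark.range(0, 1_000_000)
        .withColumn("k", col("id").mod(100))
        .withColumn("v", col("id"));

    // Expected physical plan:
    //   HashAggregate(partial) -> Exchange hashpartitioning(k) -> HashAggregate(final)
    // The partial HashAggregate before the Exchange is the map-side combine.
    df.groupBy("k").agg(sum("v")).explain();

    spark.stop();
  }
}
{code}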


> Serialized shuffle supports mapSideCombine
> ------------------------------------------
>
>                 Key: SPARK-24293
>                 URL: https://issues.apache.org/jira/browse/SPARK-24293
>             Project: Spark
>          Issue Type: Brainstorming
>          Components: Shuffle
>    Affects Versions: 2.3.0
>            Reporter: Xianjin YE
>            Priority: Major
>
> While doing research on integrating my company's internal Shuffle Service 
> with Spark, I found it is possible to support mapSideCombine with serialized 
> shuffle.
> The idea is simple: when mapSideCombine is required, the `UnsafeShuffleWriter` 
> uses a `Combiner` to accumulate records before inserting them into the 
> `ShuffleExternalSorter`. The `Combiner` tracks its memory usage or the number 
> of elements accumulated, and is never spilled. Once the `Combiner` has 
> accumulated enough records (the threshold can vary by strategy), the 
> accumulated (K, C) pairs are inserted into the `ShuffleExternalSorter`, and 
> the `Combiner` is reset to an empty state.
> After this change, combined values are sent to the sorter segment by segment, 
> and the `BlockStoreShuffleReader` already handles this case: duplicate keys 
> across segments are merged again on the reduce side, as illustrated below.
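> Here is a toy, self-contained illustration (names are illustrative only, not 
> Spark APIs) of why emitting (K, C) pairs segment by segment is safe: the 
> reduce side re-merges duplicate keys across segments with the aggregator's 
> mergeCombiners, which for a sum aggregator is just addition.
> {code:java}
> import java.util.AbstractMap.SimpleEntry;
> import java.util.Arrays;
> import java.util.HashMap;
> import java.util.List;
> import java.util.Map;
> 
> public class SegmentMergeDemo {
>   public static void main(String[] args) {
>     // Two segments produced by separate flushes of the map-side combiner;
>     // note that key "a" appears in both segments.
>     List<Map.Entry<String, Long>> segment1 =
>         Arrays.asList(new SimpleEntry<>("a", 3L), new SimpleEntry<>("b", 2L));
>     List<Map.Entry<String, Long>> segment2 =
>         Arrays.asList(new SimpleEntry<>("a", 4L));
> 
>     // Reduce side: merge duplicate keys across segments with mergeCombiners.
>     Map<String, Long> reduced = new HashMap<>();
>     for (List<Map.Entry<String, Long>> segment : Arrays.asList(segment1, segment2)) {
>       for (Map.Entry<String, Long> kc : segment) {
>         reduced.merge(kc.getKey(), kc.getValue(), Long::sum);
>       }
>     }
>     System.out.println(reduced); // {a=7, b=2} -- same as a single global combine
>   }
> }
> {code}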
> I did a local POC, and it looks like I get the same result as with the normal 
> SortShuffle. The performance is not optimized yet. The most significant part 
> of the code is shown below:
> {code:java}
> // Write path of the POC: combine in memory first, then feed the sorter.
> while (records.hasNext()) {
>   Product2<K, V> record = records.next();
>   if (this.mapSideCombine) {
>     // Accumulate (K, V) records into (K, C) pairs instead of inserting directly.
>     this.aggregator.accumulateRecord(record);
>     if (this.aggregator.accumulatedKeyNum() >= 160_000) { // threshold hard-coded for the POC
>       // Flush the accumulated (K, C) pairs into the sorter as one segment.
>       scala.collection.Iterator<Tuple2<K, C>> combinedIterator =
>           this.aggregator.accumulatedIterator();
>       while (combinedIterator.hasNext()) {
>         insertRecordIntoSorter(combinedIterator.next());
>       }
>       this.aggregator.resetAccumulation();
>     }
>   } else {
>     insertRecordIntoSorter(record);
>   }
> }
> // Flush whatever is still buffered once the input is exhausted.
> if (this.mapSideCombine && this.aggregator.accumulatedKeyNum() > 0) {
>   scala.collection.Iterator<Tuple2<K, C>> combinedIterator =
>       this.aggregator.accumulatedIterator();
>   while (combinedIterator.hasNext()) {
>     insertRecordIntoSorter(combinedIterator.next());
>   }
>   this.aggregator.resetAccumulation();
> }
> {code}
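> For clarity, here is a rough sketch of the accumulator behind the POC 
> (`AccumulatingAggregator` and its method names are mine, not existing Spark 
> APIs; memory tracking is elided, and the combine functions are assumed to 
> come from the shuffle dependency's `Aggregator`):
> {code:java}
> import java.util.HashMap;
> import java.util.Map;
> 
> import scala.Function1;
> import scala.Function2;
> import scala.Product2;
> import scala.Tuple2;
> 
> // POC-only helper: buffers (K, V) records as combined (K, C) pairs in memory.
> final class AccumulatingAggregator<K, V, C> {
>   private final Function1<V, C> createCombiner;  // V -> C
>   private final Function2<C, V, C> mergeValue;   // (C, V) -> C
>   private final Map<K, C> buffer = new HashMap<>();
> 
>   AccumulatingAggregator(Function1<V, C> createCombiner,
>                          Function2<C, V, C> mergeValue) {
>     this.createCombiner = createCombiner;
>     this.mergeValue = mergeValue;
>   }
> 
>   void accumulateRecord(Product2<K, V> record) {
>     K key = record._1();
>     V value = record._2();
>     C existing = buffer.get(key);
>     buffer.put(key, existing == null
>         ? createCombiner.apply(value)
>         : mergeValue.apply(existing, value));
>   }
> 
>   int accumulatedKeyNum() {
>     return buffer.size();
>   }
> 
>   scala.collection.Iterator<Tuple2<K, C>> accumulatedIterator() {
>     java.util.Iterator<Tuple2<K, C>> it = buffer.entrySet().stream()
>         .map(e -> new Tuple2<K, C>(e.getKey(), e.getValue()))
>         .iterator();
>     return scala.collection.JavaConverters.asScalaIteratorConverter(it).asScala();
>   }
> 
>   void resetAccumulation() {
>     buffer.clear();
>   }
> }
> {code}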
>  
>  Is there something I am missing? cc [~joshrosen] [~cloud_fan] [~XuanYuan]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
