Xianjin YE created SPARK-24293:
----------------------------------

             Summary: Serialized shuffle supports mapSideCombine
                 Key: SPARK-24293
                 URL: https://issues.apache.org/jira/browse/SPARK-24293
             Project: Spark
          Issue Type: Brainstorming
          Components: Shuffle
    Affects Versions: 2.3.0
            Reporter: Xianjin YE


While doing research on integrating my company's internal Shuffle Service with 
Spark, I found it is possible to support mapSideCombine with serialized shuffle.

The basic idea is that `UnsafeShuffleWriter` uses a `Combiner` to 
accumulate records before inserting them into `ShuffleExternalSorter` when 
mapSideCombine is required. The `Combiner` tracks its memory usage or the 
number of elements accumulated and is never spilled. When the `Combiner` has 
accumulated enough records (the threshold can vary by strategy), the 
accumulated (K, C) pairs are inserted into the `ShuffleExternalSorter`, and 
the `Combiner` is then reset to an empty state.
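A minimal sketch of such an accumulate-and-flush `Combiner` could look like the following. The class name `PartialCombiner` and the method names (`accumulateRecord`, `accumulatedKeyNum`, `shouldFlush`, `drain`) are hypothetical, chosen only to illustrate the idea; the `createCombiner`/`mergeValue` functions mirror the ones carried by Spark's `Aggregator`:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;

// Hypothetical sketch of the accumulate-and-flush Combiner described above.
// createCombiner and mergeValue mirror the functions carried by Spark's
// Aggregator; the class and method names here are illustrative only.
class PartialCombiner<K, V, C> {
  private final Function<V, C> createCombiner;   // V -> initial C
  private final BiFunction<C, V, C> mergeValue;  // fold a V into an existing C
  private final int flushThreshold;              // e.g. 160_000 in the POC
  private final Map<K, C> accumulated = new HashMap<>();

  PartialCombiner(Function<V, C> createCombiner,
                  BiFunction<C, V, C> mergeValue,
                  int flushThreshold) {
    this.createCombiner = createCombiner;
    this.mergeValue = mergeValue;
    this.flushThreshold = flushThreshold;
  }

  /** Folds one (K, V) record into the in-memory map; never spills. */
  void accumulateRecord(K key, V value) {
    C existing = accumulated.get(key);
    accumulated.put(key, existing == null
        ? createCombiner.apply(value)
        : mergeValue.apply(existing, value));
  }

  int accumulatedKeyNum() {
    return accumulated.size();
  }

  /** True once enough distinct keys are buffered to flush into the sorter. */
  boolean shouldFlush() {
    return accumulated.size() >= flushThreshold;
  }

  /** Returns the accumulated (K, C) pairs and resets to an empty state. */
  Map<K, C> drain() {
    Map<K, C> out = new HashMap<>(accumulated);
    accumulated.clear();
    return out;
  }
}
```

With word-count style functions (`createCombiner = v -> v`, `mergeValue = Integer::sum`), repeated keys collapse into a single (K, C) pair per flush, which is what bounds the writer's memory footprint between flushes.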

After this change, combined values are sent to the sorter segment by segment, 
and the `BlockStoreShuffleReader` already handles this case.

I did a local POC, and it looks like I get the same result as normal 
SortShuffle. The performance is not optimized yet. The most significant part 
of the code is shown below:
{code:java}
while (records.hasNext()) {
  final Product2<K, V> record = records.next();
  if (this.mapSideCombine) {
    this.aggregator.accumulateRecord(record);
    if (this.aggregator.accumulatedKeyNum() >= 160_000) { // flush threshold, hard-coded for the POC
      final scala.collection.Iterator<Tuple2<K, C>> combinedIterator =
          this.aggregator.accumulatedIterator();
      while (combinedIterator.hasNext()) {
        insertRecordIntoSorter(combinedIterator.next());
      }
      this.aggregator.resetAccumulation();
    }
  } else {
    insertRecordIntoSorter(record);
  }
}
// Flush any remaining accumulated pairs after the last record.
if (this.mapSideCombine && this.aggregator.accumulatedKeyNum() > 0) {
  final scala.collection.Iterator<Tuple2<K, C>> combinedIterator =
      this.aggregator.accumulatedIterator();
  while (combinedIterator.hasNext()) {
    insertRecordIntoSorter(combinedIterator.next());
  }
  this.aggregator.resetAccumulation();
}
{code}
 

 Is there something I am missing? cc [~joshrosen] [~cloud_fan] [~XuanYuan]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
