Xianjin YE created SPARK-24293:
----------------------------------
Summary: Serialized shuffle supports mapSideCombine
Key: SPARK-24293
URL: https://issues.apache.org/jira/browse/SPARK-24293
Project: Spark
Issue Type: Brainstorming
Components: Shuffle
Affects Versions: 2.3.0
Reporter: Xianjin YE
While doing research on integrating my company's internal Shuffle Service with
Spark, I found it is possible to support mapSideCombine with serialized shuffle.
The basic idea is that, when mapSideCombine is required, the
`UnsafeShuffleWriter` uses a `Combiner` to accumulate records before inserting
them into the `ShuffleExternalSorter`. The `Combiner` tracks its memory usage
or the number of elements accumulated, and is never spilled. When the
`Combiner` has accumulated enough records (the threshold varies by strategy),
the accumulated (K, C) pairs are inserted into the `ShuffleExternalSorter`,
and the `Combiner` is then reset to an empty state.
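A minimal, self-contained sketch of such a combiner, using a key-count threshold as the flush strategy. `BoundedCombiner`, `maxKeys` and `sink` are hypothetical names for illustration only (they are not Spark classes or the POC code); `sink` stands in for insertion into the `ShuffleExternalSorter`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Function;

// Hypothetical stand-in for the proposed Combiner; not a Spark class.
final class BoundedCombiner<K, V, C> {
    private final Function<V, C> createCombiner;   // V -> initial C
    private final BiFunction<C, V, C> mergeValue;  // (C, V) -> C
    private final int maxKeys;                     // flush threshold (count-based strategy)
    private final BiConsumer<K, C> sink;           // stands in for the sorter insert
    private final Map<K, C> buffer = new HashMap<>();

    BoundedCombiner(Function<V, C> createCombiner,
                    BiFunction<C, V, C> mergeValue,
                    int maxKeys,
                    BiConsumer<K, C> sink) {
        this.createCombiner = createCombiner;
        this.mergeValue = mergeValue;
        this.maxKeys = maxKeys;
        this.sink = sink;
    }

    // Combine one record in memory; flush once enough distinct keys are buffered.
    void insert(K key, V value) {
        C current = buffer.get(key);
        buffer.put(key, current == null
                ? createCombiner.apply(value)
                : mergeValue.apply(current, value));
        if (buffer.size() >= maxKeys) {
            flush();
        }
    }

    // Emit every buffered (K, C) pair to the sink, then reset to the empty state.
    void flush() {
        buffer.forEach(sink);
        buffer.clear();
    }
}
```

Because the buffer is emptied on every flush rather than spilled, its memory footprint is bounded by the threshold. The cost is that a key may be emitted more than once per map task.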
After this change, combined values are sent to the sorter segment by segment,
so the same key may appear in more than one segment of a map output. The
`BlockStoreShuffleReader` already handles this case.
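To illustrate why repeated keys are harmless on the reduce side, here is a small sketch that merges partial (K, C) pairs with a merge-combiners function, which is conceptually what the reader does when mapSideCombine is set. `PartialCombineMerge` and `mergeSegments` are hypothetical names for this illustration, not Spark APIs.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BinaryOperator;

// Hypothetical illustration of reduce-side merging; not Spark code.
final class PartialCombineMerge {
    // Fold partial combiners for the same key into one value per key.
    static <K, C> Map<K, C> mergeSegments(Iterable<Map.Entry<K, C>> partials,
                                          BinaryOperator<C> mergeCombiners) {
        Map<K, C> merged = new HashMap<>();
        for (Map.Entry<K, C> e : partials) {
            // Map.merge stores the value if the key is new, otherwise combines the two.
            merged.merge(e.getKey(), e.getValue(), mergeCombiners);
        }
        return merged;
    }
}
```

The result is independent of how many segments a key was split across, provided the merge function is associative and commutative, as aggregation already assumes.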
I did a local POC, and it looks like I get the same results as with the normal
SortShuffle. The performance is not optimized yet. The most significant part of
the code is shown below:
{code:java}
// Write loop in UnsafeShuffleWriter (POC): combine in memory, flush in segments.
while (records.hasNext()) {
  Product2<K, V> record = records.next();
  if (this.mapSideCombine) {
    this.aggregator.accumulateRecord(record);
    // Hard-coded threshold, for the POC only.
    if (this.aggregator.accumulatedKeyNum() >= 160_000) {
      scala.collection.Iterator<Tuple2<K, C>> combinedIterator =
          this.aggregator.accumulatedIterator();
      while (combinedIterator.hasNext()) {
        insertRecordIntoSorter(combinedIterator.next());
      }
      this.aggregator.resetAccumulation();
    }
  } else {
    insertRecordIntoSorter(record);
  }
}
// Flush whatever is still buffered after the last record.
if (this.mapSideCombine && this.aggregator.accumulatedKeyNum() > 0) {
  scala.collection.Iterator<Tuple2<K, C>> combinedIterator =
      this.aggregator.accumulatedIterator();
  while (combinedIterator.hasNext()) {
    insertRecordIntoSorter(combinedIterator.next());
  }
  this.aggregator.resetAccumulation(1);
}
{code}
Is there something I am missing? cc [~joshrosen] [~cloud_fan] [~XuanYuan]