[
https://issues.apache.org/jira/browse/SPARK-5581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14525907#comment-14525907
]
Josh Rosen commented on SPARK-5581:
-----------------------------------
I also noticed this while working on some new shuffle code.
One way to address this might be to de-couple the commit and close steps.
Maybe we could add a {{commit()}} operation that returns a {{FileSegment}}.
There may be some subtleties to watch out for in how we handle compression and
serialization output streams. The current code opens new serialization and
compression streams for each partition, and we might have to do the same thing
even if we re-use the underlying FileOutputStream. It's possible that we could
"reset" the compression/serialization streams without closing them, but some
implementations might not support this (for example, SnappyOutputStream writes
a magic number at the beginning of the stream, so we'd need to make sure that
this gets written to the start of each partition). This means that we might not
gain the full benefits of being able to re-use the buffers in the compression
output streams.
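To make the idea concrete, here is a minimal, self-contained sketch of decoupling commit from close. This is not Spark's actual DiskBlockObjectWriter; the names (SegmentWriter, the local FileSegment case class) are hypothetical stand-ins. The writer keeps one FileOutputStream open across partitions and commit() just records the segment boundary:

```scala
import java.io.{File, FileOutputStream}

// Stand-in for Spark's o.a.s.storage.FileSegment.
case class FileSegment(file: File, offset: Long, length: Long)

// Hypothetical writer: one FileOutputStream shared across partitions,
// with commit() marking segment boundaries instead of closing the file.
class SegmentWriter(file: File) {
  private val out = new FileOutputStream(file)
  private var committedPos = 0L // end of the last committed segment

  def write(bytes: Array[Byte]): Unit = out.write(bytes)

  // Flush and return the segment written since the previous commit,
  // without closing the underlying stream.
  def commit(): FileSegment = {
    out.flush()
    val pos = out.getChannel.position()
    val segment = FileSegment(file, committedPos, pos - committedPos)
    committedPos = pos
    segment
  }

  def close(): Unit = out.close()
}
```

Each partition would write its records and then call commit(); close() happens once, after the last partition.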
If the cost of opening an output stream is sufficiently high, though, then even
just re-using the FileOutputStream could be a win, especially in cases where we
have thousands of small partitions (since this would be an order-of-magnitude
reduction in the number of file handles opened).
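As a sketch of that middle ground: re-use one FileOutputStream but open a fresh compression stream per partition, so each partition still gets its own header (as SnappyOutputStream's magic number requires). This is a hypothetical, self-contained illustration using GZIPOutputStream as a stand-in for Spark's compression codec, with a small wrapper that swallows close() so finishing a partition's compression stream doesn't close the shared file:

```scala
import java.io.{File, FileOutputStream, FilterOutputStream, OutputStream}
import java.util.zip.GZIPOutputStream

// Swallows close() so per-partition compression streams can be finished
// without closing the shared FileOutputStream underneath.
class NonClosingStream(out: OutputStream) extends FilterOutputStream(out) {
  override def write(b: Array[Byte], off: Int, len: Int): Unit =
    out.write(b, off, len)
  override def close(): Unit = out.flush() // flush, but keep the stream open
}

// Writes each partition as its own compressed segment into one file,
// returning the per-partition segment lengths.
def writePartitions(file: File, partitions: Seq[Array[Byte]]): Seq[Long] = {
  val fileOut = new FileOutputStream(file)
  val lengths = partitions.map { data =>
    val start = fileOut.getChannel.position()
    // Fresh compression stream per partition: each writes its own header,
    // so segments remain independently decompressible.
    val compressed = new GZIPOutputStream(new NonClosingStream(fileOut))
    compressed.write(data)
    compressed.close() // finishes this partition's trailer; file stays open
    fileOut.getChannel.position() - start
  }
  fileOut.close()
  lengths
}
```

Only one file handle is opened here regardless of the number of partitions, though the per-partition compression buffers are still re-allocated each time.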
> When writing sorted map output file, avoid open / close between each partition
> ------------------------------------------------------------------------------
>
> Key: SPARK-5581
> URL: https://issues.apache.org/jira/browse/SPARK-5581
> Project: Spark
> Issue Type: Improvement
> Components: Shuffle
> Affects Versions: 1.3.0
> Reporter: Sandy Ryza
>
> {code}
> // Bypassing merge-sort; get an iterator by partition and just write
> // everything directly.
> for ((id, elements) <- this.partitionedIterator) {
>   if (elements.hasNext) {
>     val writer = blockManager.getDiskWriter(
>       blockId, outputFile, ser, fileBufferSize,
>       context.taskMetrics.shuffleWriteMetrics.get)
>     for (elem <- elements) {
>       writer.write(elem)
>     }
>     writer.commitAndClose()
>     val segment = writer.fileSegment()
>     lengths(id) = segment.length
>   }
> }
> {code}
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)