Repository: spark Updated Branches: refs/heads/branch-1.1 837bf60fd -> be674b34b
Use transferTo when copy merge files in ExternalSorter Since this is a file to file copy, using transferTo should be faster. Author: Raymond Liu <[email protected]> Closes #1884 from colorant/externalSorter and squashes the following commits: 6e42f3c [Raymond Liu] More code into copyStream bfb496b [Raymond Liu] Use transferTo when copy merge files in ExternalSorter (cherry picked from commit 246cb3f158686348a698d1c0da3001c314727129) Signed-off-by: Reynold Xin <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/be674b34 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/be674b34 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/be674b34 Branch: refs/heads/branch-1.1 Commit: be674b34bed93eafeb621cbac5d5bb5f3a60e8f4 Parents: 837bf60 Author: Raymond Liu <[email protected]> Authored: Tue Aug 12 23:19:35 2014 -0700 Committer: Reynold Xin <[email protected]> Committed: Tue Aug 12 23:19:45 2014 -0700 ---------------------------------------------------------------------- .../scala/org/apache/spark/util/Utils.scala | 29 +++++++++++++++----- .../spark/util/collection/ExternalSorter.scala | 7 ++--- 2 files changed, 25 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/be674b34/core/src/main/scala/org/apache/spark/util/Utils.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index c60be4f..8cac5da 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -284,17 +284,32 @@ private[spark] object Utils extends Logging { /** Copy all data from an InputStream to an OutputStream */ def copyStream(in: InputStream, out: OutputStream, - closeStreams: Boolean = false) + closeStreams: Boolean = false): Long = { + var count = 0L try { - val buf = new Array[Byte](8192) - var n = 0 - while (n != -1) { - n = in.read(buf) - if (n != -1) { - out.write(buf, 0, n) + if (in.isInstanceOf[FileInputStream] && out.isInstanceOf[FileOutputStream]) { + // When both streams are File stream, use transferTo to improve copy performance. + val inChannel = in.asInstanceOf[FileInputStream].getChannel() + val outChannel = out.asInstanceOf[FileOutputStream].getChannel() + val size = inChannel.size() + + // In case transferTo method transferred less data than we have required. + while (count < size) { + count += inChannel.transferTo(count, size - count, outChannel) + } + } else { + val buf = new Array[Byte](8192) + var n = 0 + while (n != -1) { + n = in.read(buf) + if (n != -1) { + out.write(buf, 0, n) + count += n + } } } + count } finally { if (closeStreams) { try { http://git-wip-us.apache.org/repos/asf/spark/blob/be674b34/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala index b73d5e0..5d8a648 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala @@ -745,12 +745,11 @@ private[spark] class ExternalSorter[K, V, C]( try { out = new FileOutputStream(outputFile) for (i <- 0 until numPartitions) { - val file = partitionWriters(i).fileSegment().file - in = new FileInputStream(file) - org.apache.spark.util.Utils.copyStream(in, out) + in = new FileInputStream(partitionWriters(i).fileSegment().file) + val size = org.apache.spark.util.Utils.copyStream(in, out, false) in.close() in = null - lengths(i) = file.length() + lengths(i) = size offsets(i + 1) = offsets(i) + lengths(i) } } finally { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
