Repository: spark
Updated Branches:
  refs/heads/master 6cb06e871 -> 75c60bf4b


[SPARK-12074] Avoid memory copy involving ByteBuffer.wrap(ByteArrayOutputStream.toByteArray)

SPARK-12060 fixed JavaSerializerInstance.serialize to avoid the extra memory copy.
This PR applies the same technique to two other classes, Task and BlockManager.

zsxwing

Author: tedyu <yuzhih...@gmail.com>

Closes #10177 from tedyu/master.
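
For context, the copy being avoided: ByteArrayOutputStream.toByteArray allocates a
fresh array and copies every written byte into it, so ByteBuffer.wrap(out.toByteArray)
pays for a full duplicate of the serialized data. Because ByteArrayOutputStream keeps
its buffer and length in the protected fields buf and count, a subclass can instead
hand out a ByteBuffer that wraps the existing array. A minimal Scala sketch of the two
paths (WrapVsCopy and ZeroCopyStream are illustrative names, not part of the patch):

    import java.io.ByteArrayOutputStream
    import java.nio.ByteBuffer

    object WrapVsCopy {
      // Copying path: toByteArray duplicates the internal array before wrapping.
      def viaToByteArray(out: ByteArrayOutputStream): ByteBuffer =
        ByteBuffer.wrap(out.toByteArray)
    }

    // Zero-copy path: a subclass can wrap the protected buf/count fields in place,
    // which is the approach ByteBufferOutputStream takes in the diff below.
    class ZeroCopyStream(capacity: Int) extends ByteArrayOutputStream(capacity) {
      def toByteBuffer: ByteBuffer = ByteBuffer.wrap(buf, 0, count)
    }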


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/75c60bf4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/75c60bf4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/75c60bf4

Branch: refs/heads/master
Commit: 75c60bf4ba91e45e76a6e27f054a1c550eb6ff94
Parents: 6cb06e8
Author: tedyu <yuzhih...@gmail.com>
Authored: Tue Dec 8 10:01:44 2015 -0800
Committer: Shixiong Zhu <shixi...@databricks.com>
Committed: Tue Dec 8 10:01:44 2015 -0800

----------------------------------------------------------------------
 core/src/main/scala/org/apache/spark/scheduler/Task.scala     | 7 +++----
 .../main/scala/org/apache/spark/storage/BlockManager.scala    | 4 ++--
 .../scala/org/apache/spark/util/ByteBufferOutputStream.scala  | 4 +++-
 3 files changed, 8 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/75c60bf4/core/src/main/scala/org/apache/spark/scheduler/Task.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
index 5fe5ae8..d4bc3a5 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -27,8 +27,7 @@ import org.apache.spark.{Accumulator, SparkEnv, TaskContextImpl, TaskContext}
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.memory.TaskMemoryManager
 import org.apache.spark.serializer.SerializerInstance
-import org.apache.spark.util.ByteBufferInputStream
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{ByteBufferInputStream, ByteBufferOutputStream, Utils}
 
 
 /**
@@ -172,7 +171,7 @@ private[spark] object Task {
       serializer: SerializerInstance)
     : ByteBuffer = {
 
-    val out = new ByteArrayOutputStream(4096)
+    val out = new ByteBufferOutputStream(4096)
     val dataOut = new DataOutputStream(out)
 
     // Write currentFiles
@@ -193,7 +192,7 @@ private[spark] object Task {
     dataOut.flush()
     val taskBytes = serializer.serialize(task)
     Utils.writeByteBuffer(taskBytes, out)
-    ByteBuffer.wrap(out.toByteArray)
+    out.toByteBuffer
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/75c60bf4/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index ab0007f..ed05143 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -1202,9 +1202,9 @@ private[spark] class BlockManager(
       blockId: BlockId,
       values: Iterator[Any],
       serializer: Serializer = defaultSerializer): ByteBuffer = {
-    val byteStream = new ByteArrayOutputStream(4096)
+    val byteStream = new ByteBufferOutputStream(4096)
     dataSerializeStream(blockId, byteStream, values, serializer)
-    ByteBuffer.wrap(byteStream.toByteArray)
+    byteStream.toByteBuffer
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/75c60bf4/core/src/main/scala/org/apache/spark/util/ByteBufferOutputStream.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/ByteBufferOutputStream.scala b/core/src/main/scala/org/apache/spark/util/ByteBufferOutputStream.scala
index 92e4522..8527e3a 100644
--- a/core/src/main/scala/org/apache/spark/util/ByteBufferOutputStream.scala
+++ b/core/src/main/scala/org/apache/spark/util/ByteBufferOutputStream.scala
@@ -23,7 +23,9 @@ import java.nio.ByteBuffer
 /**
  * Provide a zero-copy way to convert data in ByteArrayOutputStream to ByteBuffer
  */
-private[spark] class ByteBufferOutputStream extends ByteArrayOutputStream {
+private[spark] class ByteBufferOutputStream(capacity: Int) extends ByteArrayOutputStream(capacity) {
+
+  def this() = this(32)
 
   def toByteBuffer: ByteBuffer = {
     return ByteBuffer.wrap(buf, 0, count)
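
A hypothetical caller-side sketch of the patched class (the values written here are
illustrative, not taken from the patch):

    import java.io.DataOutputStream
    import org.apache.spark.util.ByteBufferOutputStream

    val out = new ByteBufferOutputStream(4096)
    val dataOut = new DataOutputStream(out)
    dataOut.writeUTF("payload")    // write through any OutputStream wrapper
    dataOut.flush()
    val buffer = out.toByteBuffer  // wraps buf directly; no toByteArray copy

Since the returned ByteBuffer aliases the stream's internal array, it should only be
taken once writing is finished, which matches the Task and BlockManager call sites in
the diff above.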

