Updated Branches: refs/heads/branch-0.8 8e9bd9371 -> f930dd4bf
Merge pull request #43 from mateiz/kryo-fix Don't allocate Kryo buffers unless needed I noticed that the Kryo serializer could be slower than the Java one by 2-3x on small shuffles because it spends a lot of time initializing Kryo Input and Output objects. This is because our default buffer size for them is very large. Since the serializer is often used on streams, I made the initialization lazy for that, and used a smaller buffer (auto-managed by Kryo) for input. (cherry picked from commit e67d5b962a2adddc073cfc9c99be9012fbb69838) Signed-off-by: Reynold Xin <r...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/f930dd4b Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/f930dd4b Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/f930dd4b Branch: refs/heads/branch-0.8 Commit: f930dd4bf6fcd32e2f7834752c3319078c89111b Parents: 8e9bd93 Author: Reynold Xin <r...@apache.org> Authored: Tue Oct 8 22:57:38 2013 -0700 Committer: Reynold Xin <r...@apache.org> Committed: Tue Oct 8 22:58:35 2013 -0700 ---------------------------------------------------------------------- .../scala/org/apache/spark/serializer/KryoSerializer.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/f930dd4b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala index 24ef204..6c500ba 100644 --- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala @@ -38,8 +38,6 @@ class KryoSerializer extends 
org.apache.spark.serializer.Serializer with Logging def newKryoOutput() = new KryoOutput(bufferSize) - def newKryoInput() = new KryoInput(bufferSize) - def newKryo(): Kryo = { val instantiator = new ScalaKryoInstantiator val kryo = instantiator.newKryo() @@ -118,8 +116,10 @@ class KryoDeserializationStream(kryo: Kryo, inStream: InputStream) extends Deser private[spark] class KryoSerializerInstance(ks: KryoSerializer) extends SerializerInstance { val kryo = ks.newKryo() - val output = ks.newKryoOutput() - val input = ks.newKryoInput() + + // Make these lazy vals to avoid creating a buffer unless we use them + lazy val output = ks.newKryoOutput() + lazy val input = new KryoInput() def serialize[T](t: T): ByteBuffer = { output.clear()