Updated Branches:
  refs/heads/master ff4473217 -> 3d6e75419

Fix bug on read-side of external sort when using Snappy.

The read side of the external-sort spill path wrapped the spill-file stream with blockManager.wrapForCompression, which applies whatever codec is configured; with Snappy configured the spilled data was not read back correctly. This patch has DiskMapIterator check blockManager.shouldCompress for the block and, when compression is enabled, decompress the buffered stream with the LZF codec.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/0213b403
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/0213b403
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/0213b403

Branch: refs/heads/master
Commit: 0213b4032a78d621405105365119677edc663b1b
Parents: 6285513
Author: Patrick Wendell <pwend...@gmail.com>
Authored: Thu Jan 23 17:59:42 2014 -0800
Committer: Patrick Wendell <pwend...@gmail.com>
Committed: Thu Jan 23 18:04:55 2014 -0800

----------------------------------------------------------------------
 .../spark/util/collection/ExternalAppendOnlyMap.scala     | 10 +++++++++-
 1 file changed, 9 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0213b403/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index 64e9b43..eb3acf4 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -331,7 +331,15 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
   private class DiskMapIterator(file: File, blockId: BlockId) extends Iterator[(K, C)] {
     val fileStream = new FileInputStream(file)
     val bufferedStream = new FastBufferedInputStream(fileStream, fileBufferSize)
-    val compressedStream = blockManager.wrapForCompression(blockId, bufferedStream)
+
+    val shouldCompress = blockManager.shouldCompress(blockId)
+    val compressionCodec = new LZFCompressionCodec(sparkConf)
+    val compressedStream =
+      if (shouldCompress) {
+        compressionCodec.compressedInputStream(bufferedStream)
+      } else {
+        bufferedStream
+      }
     var deserializeStream = ser.deserializeStream(compressedStream)
     var objectsRead = 0
 

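For readers skimming the diff: the essence of the fix is that the spill-file read path must mirror the write path's compression decision and codec. Below is a minimal, self-contained sketch of that pattern in plain Scala, not the Spark code itself; it uses java.util.zip GZIP streams as a stand-in codec (the actual patch uses Spark's LZFCompressionCodec and blockManager.shouldCompress), and names such as SpillStreamSketch, writeSpill and readSpill are illustrative only.

import java.io._
import java.util.zip.{GZIPInputStream, GZIPOutputStream}

object SpillStreamSketch {
  // Stand-in for Spark's per-block compression check; in the patch this is
  // blockManager.shouldCompress(blockId).
  val shouldCompress = true

  // Write side: wrap the file stream with the codec only when compression
  // is enabled, then serialize records through it.
  def writeSpill(file: File, records: Seq[String]): Unit = {
    val raw: OutputStream = new BufferedOutputStream(new FileOutputStream(file))
    val out: OutputStream = if (shouldCompress) new GZIPOutputStream(raw) else raw
    val data = new DataOutputStream(out)
    records.foreach(data.writeUTF)
    data.close()
  }

  // Read side: must make the same decision with the same codec, otherwise
  // deserialization reads compressed bytes as if they were plain records.
  def readSpill(file: File): Seq[String] = {
    val raw: InputStream = new BufferedInputStream(new FileInputStream(file))
    val in: InputStream = if (shouldCompress) new GZIPInputStream(raw) else raw
    val data = new DataInputStream(in)
    val records = scala.collection.mutable.ArrayBuffer.empty[String]
    try {
      while (true) records += data.readUTF()
    } catch {
      case _: EOFException => // end of spill file
    } finally {
      data.close()
    }
    records.toSeq
  }

  def main(args: Array[String]): Unit = {
    val file = File.createTempFile("spill-sketch", ".bin")
    writeSpill(file, Seq("pandas -> 1", "broccoli -> 2"))
    println(readSpill(file).mkString(", "))
    file.delete()
  }
}

If the read side wraps the stream with a different codec than the write side, or skips decompression entirely, readUTF sees raw compressed bytes and fails or returns garbage, which is the shape of the bug fixed above.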