Updated Branches: refs/heads/master ff4473217 -> 3d6e75419
Fix bug on read-side of external sort when using Snappy. This case wasn't
handled correctly and this patch fixes it.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/0213b403
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/0213b403
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/0213b403

Branch: refs/heads/master
Commit: 0213b4032a78d621405105365119677edc663b1b
Parents: 6285513
Author: Patrick Wendell <pwend...@gmail.com>
Authored: Thu Jan 23 17:59:42 2014 -0800
Committer: Patrick Wendell <pwend...@gmail.com>
Committed: Thu Jan 23 18:04:55 2014 -0800

----------------------------------------------------------------------
 .../spark/util/collection/ExternalAppendOnlyMap.scala | 10 +++++++++-
 1 file changed, 9 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0213b403/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index 64e9b43..eb3acf4 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -331,7 +331,15 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
   private class DiskMapIterator(file: File, blockId: BlockId) extends Iterator[(K, C)] {
     val fileStream = new FileInputStream(file)
     val bufferedStream = new FastBufferedInputStream(fileStream, fileBufferSize)
-    val compressedStream = blockManager.wrapForCompression(blockId, bufferedStream)
+
+    val shouldCompress = blockManager.shouldCompress(blockId)
+    val compressionCodec = new LZFCompressionCodec(sparkConf)
+    val compressedStream =
+      if (shouldCompress) {
+        compressionCodec.compressedInputStream(bufferedStream)
+      } else {
+        bufferedStream
+      }
     var deserializeStream = ser.deserializeStream(compressedStream)
     var objectsRead = 0
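
The essence of the patch is that the stream used to read a spilled map file back must mirror how the file was written: the read side now wraps the buffered stream with an explicit codec only when compression is enabled, instead of relying on the block manager's configured codec. Below is a minimal, self-contained sketch (not Spark code) of that write/read symmetry; GZIP stands in for the LZF codec, and the names SpillStreamSketch and shouldCompress are made up for illustration.

    import java.io._
    import java.util.zip.{GZIPInputStream, GZIPOutputStream}

    object SpillStreamSketch {
      // Stand-in for blockManager.shouldCompress(blockId) in the real code.
      val shouldCompress = true

      // Write side: wrap the buffered stream only when compression is enabled.
      def writeStream(file: File): OutputStream = {
        val buffered = new BufferedOutputStream(new FileOutputStream(file))
        if (shouldCompress) new GZIPOutputStream(buffered) else buffered
      }

      // Read side: must make the same decision with the same codec, otherwise
      // deserialization sees compressed (or wrongly decompressed) bytes.
      def readStream(file: File): InputStream = {
        val buffered = new BufferedInputStream(new FileInputStream(file))
        if (shouldCompress) new GZIPInputStream(buffered) else buffered
      }

      def main(args: Array[String]): Unit = {
        val file = File.createTempFile("spill", ".tmp")
        val out = writeStream(file)
        out.write("k1,v1\n".getBytes("UTF-8"))
        out.close()

        val in = new BufferedReader(new InputStreamReader(readStream(file), "UTF-8"))
        println(in.readLine())  // "k1,v1" only because both sides agree on the codec
        in.close()
        file.delete()
      }
    }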
