Repository: spark Updated Branches: refs/heads/master 7446f5ff9 -> dd95abada
[SPARK-2399] Add support for LZ4 compression. Based on Greg Bowyer's patch from JIRA https://issues.apache.org/jira/browse/SPARK-2399 Author: Reynold Xin <rxin@apache.org> Closes #1416 from rxin/lz4 and squashes the following commits: 6c8fefe [Reynold Xin] Fixed typo. 8a14d38 [Reynold Xin] [SPARK-2399] Add support for LZ4 compression. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/dd95abad Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/dd95abad Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/dd95abad Branch: refs/heads/master Commit: dd95abada78b4d0aec97dacda50fdfd74464b073 Parents: 7446f5f Author: Reynold Xin <rxin@apache.org> Authored: Tue Jul 15 01:46:57 2014 -0700 Committer: Reynold Xin <rxin@apache.org> Committed: Tue Jul 15 01:46:57 2014 -0700 ---------------------------------------------------------------------- core/pom.xml | 4 ++++ .../org/apache/spark/io/CompressionCodec.scala | 22 ++++++++++++++++++++ .../apache/spark/io/CompressionCodecSuite.scala | 6 ++++++ docs/configuration.md | 10 ++++++++- pom.xml | 5 +++++ 5 files changed, 46 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/dd95abad/core/pom.xml ---------------------------------------------------------------------- diff --git a/core/pom.xml b/core/pom.xml index 4ed920a..1054cec 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -115,6 +115,10 @@ <artifactId>snappy-java</artifactId> </dependency> <dependency> + <groupId>net.jpountz.lz4</groupId> + <artifactId>lz4</artifactId> + </dependency> + <dependency> <groupId>com.twitter</groupId> <artifactId>chill_${scala.binary.version}</artifactId> </dependency> http://git-wip-us.apache.org/repos/asf/spark/blob/dd95abad/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala ---------------------------------------------------------------------- diff --git 
a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala index 4b0fe1a..33402c9 100644 --- a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala +++ b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala @@ -20,6 +20,7 @@ package org.apache.spark.io import java.io.{InputStream, OutputStream} import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream} +import net.jpountz.lz4.{LZ4BlockInputStream, LZ4BlockOutputStream} import org.xerial.snappy.{SnappyInputStream, SnappyOutputStream} import org.apache.spark.SparkConf @@ -61,6 +62,27 @@ private[spark] object CompressionCodec { /** * :: DeveloperApi :: + * LZ4 implementation of [[org.apache.spark.io.CompressionCodec]]. + * Block size can be configured by `spark.io.compression.lz4.block.size`. + * + * Note: The wire protocol for this codec is not guaranteed to be compatible across versions + * of Spark. This is intended for use as an internal compression utility within a single Spark + * application. + */ +@DeveloperApi +class LZ4CompressionCodec(conf: SparkConf) extends CompressionCodec { + + override def compressedOutputStream(s: OutputStream): OutputStream = { + val blockSize = conf.getInt("spark.io.compression.lz4.block.size", 32768) + new LZ4BlockOutputStream(s, blockSize) + } + + override def compressedInputStream(s: InputStream): InputStream = new LZ4BlockInputStream(s) +} + + +/** + * :: DeveloperApi :: * LZF implementation of [[org.apache.spark.io.CompressionCodec]]. 
* * Note: The wire protocol for this codec is not guaranteed to be compatible across versions http://git-wip-us.apache.org/repos/asf/spark/blob/dd95abad/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala b/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala index 68a0ea3..42fc395 100644 --- a/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala +++ b/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala @@ -50,6 +50,12 @@ class CompressionCodecSuite extends FunSuite { testCodec(codec) } + test("lz4 compression codec") { + val codec = CompressionCodec.createCodec(conf, classOf[LZ4CompressionCodec].getName) + assert(codec.getClass === classOf[LZ4CompressionCodec]) + testCodec(codec) + } + test("lzf compression codec") { val codec = CompressionCodec.createCodec(conf, classOf[LZFCompressionCodec].getName) assert(codec.getClass === classOf[LZFCompressionCodec]) http://git-wip-us.apache.org/repos/asf/spark/blob/dd95abad/docs/configuration.md ---------------------------------------------------------------------- diff --git a/docs/configuration.md b/docs/configuration.md index 07aa4c0..19fd980 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -350,7 +350,15 @@ Apart from these, the following properties are also available, and may be useful <td>32768</td> <td> Block size (in bytes) used in Snappy compression, in the case when Snappy compression codec - is used. + is used. Lowering this block size will also lower shuffle memory usage when Snappy is used. + </td> +</tr> +<tr> + <td><code>spark.io.compression.lz4.block.size</code></td> + <td>32768</td> + <td> + Block size (in bytes) used in LZ4 compression, in the case when LZ4 compression codec + is used. Lowering this block size will also lower shuffle memory usage when LZ4 is used. 
</td> </tr> <tr> http://git-wip-us.apache.org/repos/asf/spark/blob/dd95abad/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index fa80707..d570f3e 100644 --- a/pom.xml +++ b/pom.xml @@ -298,6 +298,11 @@ <version>1.0.5</version> </dependency> <dependency> + <groupId>net.jpountz.lz4</groupId> + <artifactId>lz4</artifactId> + <version>1.2.0</version> + </dependency> + <dependency> <groupId>com.clearspring.analytics</groupId> <artifactId>stream</artifactId> <version>2.7.0</version>
