Repository: spark Updated Branches: refs/heads/master d62da642a -> 7c0ed13d2
[SPARK-4079] [CORE] Consolidates Errors if a CompressionCodec is not available This commit consolidates some of the exceptions thrown if compression codecs are not available. If a bad configuration string was passed in, a ClassNotFoundException was through. Also, if Snappy was not available, it would throw an InvocationTargetException when the codec was being used (not when it was being initialized). Now, an IllegalArgumentException is thrown when a codec is not available at creation time - either because the class does not exist or the codec itself is not available in the system. This will allow us to have a better message and fail faster. Author: Kostas Sakellis <[email protected]> Closes #3119 from ksakellis/kostas-spark-4079 and squashes the following commits: 9709c7c [Kostas Sakellis] Removed unnecessary Logging class 63bfdd0 [Kostas Sakellis] Removed isAvailable to preserve binary compatibility 1d0ef2f [Kostas Sakellis] [SPARK-4079] [CORE] Added more information to exception 64f3d27 [Kostas Sakellis] [SPARK-4079] [CORE] Code review feedback 52dfa8f [Kostas Sakellis] [SPARK-4079] [CORE] Default to LZF if Snappy not available Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7c0ed13d Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7c0ed13d Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7c0ed13d Branch: refs/heads/master Commit: 7c0ed13d298d9cf66842c667602e2dccb8f5605b Parents: d62da64 Author: Kostas Sakellis <[email protected]> Authored: Mon Dec 22 13:07:01 2014 -0800 Committer: Patrick Wendell <[email protected]> Committed: Mon Dec 22 13:07:01 2014 -0800 ---------------------------------------------------------------------- .../org/apache/spark/io/CompressionCodec.scala | 27 +++++++++++++++----- .../apache/spark/io/CompressionCodecSuite.scala | 6 +++++ 2 files changed, 27 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/7c0ed13d/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala index 1ac7f4e..f856890 100644 --- a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala +++ b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala @@ -21,11 +21,12 @@ import java.io.{InputStream, OutputStream} import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream} import net.jpountz.lz4.{LZ4BlockInputStream, LZ4BlockOutputStream} -import org.xerial.snappy.{SnappyInputStream, SnappyOutputStream} +import org.xerial.snappy.{Snappy, SnappyInputStream, SnappyOutputStream} import org.apache.spark.SparkConf import org.apache.spark.annotation.DeveloperApi import org.apache.spark.util.Utils +import org.apache.spark.Logging /** * :: DeveloperApi :: @@ -44,25 +45,33 @@ trait CompressionCodec { def compressedInputStream(s: InputStream): InputStream } - private[spark] object CompressionCodec { + private val configKey = "spark.io.compression.codec" private val shortCompressionCodecNames = Map( "lz4" -> classOf[LZ4CompressionCodec].getName, "lzf" -> classOf[LZFCompressionCodec].getName, "snappy" -> classOf[SnappyCompressionCodec].getName) def createCodec(conf: SparkConf): CompressionCodec = { - createCodec(conf, conf.get("spark.io.compression.codec", DEFAULT_COMPRESSION_CODEC)) + createCodec(conf, conf.get(configKey, DEFAULT_COMPRESSION_CODEC)) } def createCodec(conf: SparkConf, codecName: String): CompressionCodec = { val codecClass = shortCompressionCodecNames.getOrElse(codecName.toLowerCase, codecName) - val ctor = Class.forName(codecClass, true, Utils.getContextOrSparkClassLoader) - .getConstructor(classOf[SparkConf]) - ctor.newInstance(conf).asInstanceOf[CompressionCodec] + val codec = try { + val ctor = Class.forName(codecClass, true, Utils.getContextOrSparkClassLoader) + .getConstructor(classOf[SparkConf]) + Some(ctor.newInstance(conf).asInstanceOf[CompressionCodec]) + } catch { + case e: ClassNotFoundException => None + case e: IllegalArgumentException => None + } + codec.getOrElse(throw new IllegalArgumentException(s"Codec [$codecName] is not available. " + + s"Consider setting $configKey=$FALLBACK_COMPRESSION_CODEC")) } + val FALLBACK_COMPRESSION_CODEC = "lzf" val DEFAULT_COMPRESSION_CODEC = "snappy" val ALL_COMPRESSION_CODECS = shortCompressionCodecNames.values.toSeq } @@ -120,6 +129,12 @@ class LZFCompressionCodec(conf: SparkConf) extends CompressionCodec { @DeveloperApi class SnappyCompressionCodec(conf: SparkConf) extends CompressionCodec { + try { + Snappy.getNativeLibraryVersion + } catch { + case e: Error => throw new IllegalArgumentException + } + override def compressedOutputStream(s: OutputStream): OutputStream = { val blockSize = conf.getInt("spark.io.compression.snappy.block.size", 32768) new SnappyOutputStream(s, blockSize) http://git-wip-us.apache.org/repos/asf/spark/blob/7c0ed13d/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala b/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala index 25be7f2..8c6035f 100644 --- a/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala +++ b/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala @@ -85,4 +85,10 @@ class CompressionCodecSuite extends FunSuite { assert(codec.getClass === classOf[SnappyCompressionCodec]) testCodec(codec) } + + test("bad compression codec") { + intercept[IllegalArgumentException] { + CompressionCodec.createCodec(conf, "foobar") + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
