Repository: spark
Updated Branches:
  refs/heads/master d62da642a -> 7c0ed13d2


[SPARK-4079] [CORE] Consolidates Errors if a CompressionCodec is not available

This commit consolidates some of the exceptions thrown if compression codecs 
are not available. If a bad configuration string was passed in, a 
ClassNotFoundException was through. Also, if Snappy was not available, it would 
throw an InvocationTargetException when the codec was being used (not when it 
was being initialized). Now, an IllegalArgumentException is thrown when a codec 
is not available at creation time - either because the class does not exist or 
the codec itself is not available in the system. This will allow us to have a 
better message and fail faster.

Author: Kostas Sakellis <[email protected]>

Closes #3119 from ksakellis/kostas-spark-4079 and squashes the following 
commits:

9709c7c [Kostas Sakellis] Removed unnecessary Logging class
63bfdd0 [Kostas Sakellis] Removed isAvailable to preserve binary compatibility
1d0ef2f [Kostas Sakellis] [SPARK-4079] [CORE] Added more information to 
exception
64f3d27 [Kostas Sakellis] [SPARK-4079] [CORE] Code review feedback
52dfa8f [Kostas Sakellis] [SPARK-4079] [CORE] Default to LZF if Snappy not 
available


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7c0ed13d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7c0ed13d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7c0ed13d

Branch: refs/heads/master
Commit: 7c0ed13d298d9cf66842c667602e2dccb8f5605b
Parents: d62da64
Author: Kostas Sakellis <[email protected]>
Authored: Mon Dec 22 13:07:01 2014 -0800
Committer: Patrick Wendell <[email protected]>
Committed: Mon Dec 22 13:07:01 2014 -0800

----------------------------------------------------------------------
 .../org/apache/spark/io/CompressionCodec.scala  | 27 +++++++++++++++-----
 .../apache/spark/io/CompressionCodecSuite.scala |  6 +++++
 2 files changed, 27 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/7c0ed13d/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala 
b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
index 1ac7f4e..f856890 100644
--- a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
+++ b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
@@ -21,11 +21,12 @@ import java.io.{InputStream, OutputStream}
 
 import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream}
 import net.jpountz.lz4.{LZ4BlockInputStream, LZ4BlockOutputStream}
-import org.xerial.snappy.{SnappyInputStream, SnappyOutputStream}
+import org.xerial.snappy.{Snappy, SnappyInputStream, SnappyOutputStream}
 
 import org.apache.spark.SparkConf
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.util.Utils
+import org.apache.spark.Logging
 
 /**
  * :: DeveloperApi ::
@@ -44,25 +45,33 @@ trait CompressionCodec {
   def compressedInputStream(s: InputStream): InputStream
 }
 
-
 private[spark] object CompressionCodec {
 
+  private val configKey = "spark.io.compression.codec"
   private val shortCompressionCodecNames = Map(
     "lz4" -> classOf[LZ4CompressionCodec].getName,
     "lzf" -> classOf[LZFCompressionCodec].getName,
     "snappy" -> classOf[SnappyCompressionCodec].getName)
 
   def createCodec(conf: SparkConf): CompressionCodec = {
-    createCodec(conf, conf.get("spark.io.compression.codec", 
DEFAULT_COMPRESSION_CODEC))
+    createCodec(conf, conf.get(configKey, DEFAULT_COMPRESSION_CODEC))
   }
 
   def createCodec(conf: SparkConf, codecName: String): CompressionCodec = {
     val codecClass = 
shortCompressionCodecNames.getOrElse(codecName.toLowerCase, codecName)
-    val ctor = Class.forName(codecClass, true, 
Utils.getContextOrSparkClassLoader)
-      .getConstructor(classOf[SparkConf])
-    ctor.newInstance(conf).asInstanceOf[CompressionCodec]
+    val codec = try {
+      val ctor = Class.forName(codecClass, true, 
Utils.getContextOrSparkClassLoader)
+        .getConstructor(classOf[SparkConf])
+      Some(ctor.newInstance(conf).asInstanceOf[CompressionCodec])
+    } catch {
+      case e: ClassNotFoundException => None
+      case e: IllegalArgumentException => None
+    }
+    codec.getOrElse(throw new IllegalArgumentException(s"Codec [$codecName] is 
not available. " +
+      s"Consider setting $configKey=$FALLBACK_COMPRESSION_CODEC"))
   }
 
+  val FALLBACK_COMPRESSION_CODEC = "lzf"
   val DEFAULT_COMPRESSION_CODEC = "snappy"
   val ALL_COMPRESSION_CODECS = shortCompressionCodecNames.values.toSeq
 }
@@ -120,6 +129,12 @@ class LZFCompressionCodec(conf: SparkConf) extends 
CompressionCodec {
 @DeveloperApi
 class SnappyCompressionCodec(conf: SparkConf) extends CompressionCodec {
 
+  try {
+    Snappy.getNativeLibraryVersion
+  } catch {
+    case e: Error => throw new IllegalArgumentException
+  }
+
   override def compressedOutputStream(s: OutputStream): OutputStream = {
     val blockSize = conf.getInt("spark.io.compression.snappy.block.size", 
32768)
     new SnappyOutputStream(s, blockSize)

http://git-wip-us.apache.org/repos/asf/spark/blob/7c0ed13d/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala 
b/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala
index 25be7f2..8c6035f 100644
--- a/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala
+++ b/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala
@@ -85,4 +85,10 @@ class CompressionCodecSuite extends FunSuite {
     assert(codec.getClass === classOf[SnappyCompressionCodec])
     testCodec(codec)
   }
+
+  test("bad compression codec") {
+    intercept[IllegalArgumentException] {
+      CompressionCodec.createCodec(conf, "foobar")
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to