Repository: spark
Updated Branches:
  refs/heads/master 65f75db61 -> 1bb63ae51


[SPARK-24109][CORE] Remove class SnappyOutputStreamWrapper

## What changes were proposed in this pull request?

Remove SnappyOutputStreamWrapper and the other workarounds now that the new
Snappy version fixes the underlying issues.
See also https://github.com/apache/spark/pull/21176 and comments it links to.

## How was this patch tested?

Existing tests

Closes #22691 from srowen/SPARK-24109.

Authored-by: Sean Owen <[email protected]>
Signed-off-by: Sean Owen <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1bb63ae5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1bb63ae5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1bb63ae5

Branch: refs/heads/master
Commit: 1bb63ae5127609bd71748450c7c99287f98c72c8
Parents: 65f75db
Author: Sean Owen <[email protected]>
Authored: Thu Oct 11 14:04:44 2018 -0700
Committer: Sean Owen <[email protected]>
Committed: Thu Oct 11 14:04:44 2018 -0700

----------------------------------------------------------------------
 .../org/apache/spark/io/CompressionCodec.scala  | 63 ++------------------
 project/MimaExcludes.scala                      |  1 +
 2 files changed, 6 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/1bb63ae5/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala 
b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
index 7722db5..0664c5a 100644
--- a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
+++ b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
@@ -154,72 +154,19 @@ class LZFCompressionCodec(conf: SparkConf) extends 
CompressionCodec {
  */
 @DeveloperApi
 class SnappyCompressionCodec(conf: SparkConf) extends CompressionCodec {
-  val version = SnappyCompressionCodec.version
 
-  override def compressedOutputStream(s: OutputStream): OutputStream = {
-    val blockSize = 
conf.getSizeAsBytes("spark.io.compression.snappy.blockSize", "32k").toInt
-    new SnappyOutputStreamWrapper(new SnappyOutputStream(s, blockSize))
-  }
-
-  override def compressedInputStream(s: InputStream): InputStream = new 
SnappyInputStream(s)
-}
-
-/**
- * Object guards against memory leak bug in snappy-java library:
- * (https://github.com/xerial/snappy-java/issues/131).
- * Before a new version of the library, we only call the method once and cache 
the result.
- */
-private final object SnappyCompressionCodec {
-  private lazy val version: String = try {
+  try {
     Snappy.getNativeLibraryVersion
   } catch {
     case e: Error => throw new IllegalArgumentException(e)
   }
-}
 
-/**
- * Wrapper over `SnappyOutputStream` which guards against write-after-close 
and double-close
- * issues. See SPARK-7660 for more details. This wrapping can be removed if we 
upgrade to a version
- * of snappy-java that contains the fix for 
https://github.com/xerial/snappy-java/issues/107.
- */
-private final class SnappyOutputStreamWrapper(os: SnappyOutputStream) extends 
OutputStream {
-
-  private[this] var closed: Boolean = false
-
-  override def write(b: Int): Unit = {
-    if (closed) {
-      throw new IOException("Stream is closed")
-    }
-    os.write(b)
-  }
-
-  override def write(b: Array[Byte]): Unit = {
-    if (closed) {
-      throw new IOException("Stream is closed")
-    }
-    os.write(b)
-  }
-
-  override def write(b: Array[Byte], off: Int, len: Int): Unit = {
-    if (closed) {
-      throw new IOException("Stream is closed")
-    }
-    os.write(b, off, len)
-  }
-
-  override def flush(): Unit = {
-    if (closed) {
-      throw new IOException("Stream is closed")
-    }
-    os.flush()
+  override def compressedOutputStream(s: OutputStream): OutputStream = {
+    val blockSize = 
conf.getSizeAsBytes("spark.io.compression.snappy.blockSize", "32k").toInt
+    new SnappyOutputStream(s, blockSize)
   }
 
-  override def close(): Unit = {
-    if (!closed) {
-      closed = true
-      os.close()
-    }
-  }
+  override def compressedInputStream(s: InputStream): InputStream = new 
SnappyInputStream(s)
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/1bb63ae5/project/MimaExcludes.scala
----------------------------------------------------------------------
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 0b074fb..bf85fe0 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -36,6 +36,7 @@ object MimaExcludes {
 
   // Exclude rules for 3.0.x
   lazy val v30excludes = v24excludes ++ Seq(
+    
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.io.SnappyCompressionCodec.version")
   )
 
   // Exclude rules for 2.4.x


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to