Repository: spark Updated Branches: refs/heads/master 65f75db61 -> 1bb63ae51
[SPARK-24109][CORE] Remove class SnappyOutputStreamWrapper ## What changes were proposed in this pull request? Remove SnappyOutputStreamWrapper and the other workarounds now that the new Snappy release fixes the issues they guarded against. See also https://github.com/apache/spark/pull/21176 and the comments it links to. ## How was this patch tested? Existing tests Closes #22691 from srowen/SPARK-24109. Authored-by: Sean Owen <[email protected]> Signed-off-by: Sean Owen <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1bb63ae5 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1bb63ae5 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1bb63ae5 Branch: refs/heads/master Commit: 1bb63ae5127609bd71748450c7c99287f98c72c8 Parents: 65f75db Author: Sean Owen <[email protected]> Authored: Thu Oct 11 14:04:44 2018 -0700 Committer: Sean Owen <[email protected]> Committed: Thu Oct 11 14:04:44 2018 -0700 ---------------------------------------------------------------------- .../org/apache/spark/io/CompressionCodec.scala | 63 ++------------------ project/MimaExcludes.scala | 1 + 2 files changed, 6 insertions(+), 58 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/1bb63ae5/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala index 7722db5..0664c5a 100644 --- a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala +++ b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala @@ -154,72 +154,19 @@ class LZFCompressionCodec(conf: SparkConf) extends CompressionCodec { */ @DeveloperApi class SnappyCompressionCodec(conf: SparkConf) extends CompressionCodec { - val version = SnappyCompressionCodec.version - override 
def compressedOutputStream(s: OutputStream): OutputStream = { - val blockSize = conf.getSizeAsBytes("spark.io.compression.snappy.blockSize", "32k").toInt - new SnappyOutputStreamWrapper(new SnappyOutputStream(s, blockSize)) - } - - override def compressedInputStream(s: InputStream): InputStream = new SnappyInputStream(s) -} - -/** - * Object guards against memory leak bug in snappy-java library: - * (https://github.com/xerial/snappy-java/issues/131). - * Before a new version of the library, we only call the method once and cache the result. - */ -private final object SnappyCompressionCodec { - private lazy val version: String = try { + try { Snappy.getNativeLibraryVersion } catch { case e: Error => throw new IllegalArgumentException(e) } -} -/** - * Wrapper over `SnappyOutputStream` which guards against write-after-close and double-close - * issues. See SPARK-7660 for more details. This wrapping can be removed if we upgrade to a version - * of snappy-java that contains the fix for https://github.com/xerial/snappy-java/issues/107. 
- */ -private final class SnappyOutputStreamWrapper(os: SnappyOutputStream) extends OutputStream { - - private[this] var closed: Boolean = false - - override def write(b: Int): Unit = { - if (closed) { - throw new IOException("Stream is closed") - } - os.write(b) - } - - override def write(b: Array[Byte]): Unit = { - if (closed) { - throw new IOException("Stream is closed") - } - os.write(b) - } - - override def write(b: Array[Byte], off: Int, len: Int): Unit = { - if (closed) { - throw new IOException("Stream is closed") - } - os.write(b, off, len) - } - - override def flush(): Unit = { - if (closed) { - throw new IOException("Stream is closed") - } - os.flush() + override def compressedOutputStream(s: OutputStream): OutputStream = { + val blockSize = conf.getSizeAsBytes("spark.io.compression.snappy.blockSize", "32k").toInt + new SnappyOutputStream(s, blockSize) } - override def close(): Unit = { - if (!closed) { - closed = true - os.close() - } - } + override def compressedInputStream(s: InputStream): InputStream = new SnappyInputStream(s) } /** http://git-wip-us.apache.org/repos/asf/spark/blob/1bb63ae5/project/MimaExcludes.scala ---------------------------------------------------------------------- diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index 0b074fb..bf85fe0 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -36,6 +36,7 @@ object MimaExcludes { // Exclude rules for 3.0.x lazy val v30excludes = v24excludes ++ Seq( + ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.io.SnappyCompressionCodec.version") ) // Exclude rules for 2.4.x --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
