This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 119ddd7  [SPARK-36737][BUILD][CORE][SQL][SS] Upgrade Apache commons-io to 2.11.0 and revert change of SPARK-36456
119ddd7 is described below

commit 119ddd7e9526ed899f88a944babb74af693297f5
Author: yangjie01 <yangji...@baidu.com>
AuthorDate: Tue Sep 14 21:16:58 2021 +0900

    [SPARK-36737][BUILD][CORE][SQL][SS] Upgrade Apache commons-io to 2.11.0 and revert change of SPARK-36456

    ### What changes were proposed in this pull request?
    SPARK-36456 changed Spark to use `JavaUtils.closeQuietly` instead of `IOUtils.closeQuietly`, but the two methods differ slightly in their default behavior: both swallow the `IOException` thrown by `close()`, but the former logs it at ERROR level while the latter does not log it at all.

    The `Apache commons-io` community decided to retain the `IOUtils.closeQuietly` method in the [new version](https://github.com/apache/commons-io/blob/75f20dca72656225d0dc8e7c982e40caa9277d42/src/main/java/org/apache/commons/io/IOUtils.java#L465-L467) and removed its deprecation annotation; this change was released in version 2.11.0. So this PR upgrades `Apache commons-io` to 2.11.0 and reverts the change of SPARK-36456 to restore the original behavior (no error log).

    ### Why are the changes needed?
    1. Upgrade `Apache commons-io` to 2.11.0 to use the non-deprecated `closeQuietly` API; other changes in `Apache commons-io` are detailed in [commons-io/changes-report](https://commons.apache.org/proper/commons-io/changes-report.html#a2.11.0).
    2. Revert the change of SPARK-36456 to restore the original `IOUtils.closeQuietly` behavior (no error log).

    ### Does this PR introduce _any_ user-facing change?
    No

    ### How was this patch tested?
    Pass the Jenkins or GitHub Actions builds.

    Closes #33977 from LuciferYang/upgrade-commons-io.

    Authored-by: yangjie01 <yangji...@baidu.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
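As a minimal, hypothetical sketch of the behavioral difference described above (the `CloseQuietlyDemo` object and the always-failing stream are illustrative only and not part of this patch; `JavaUtils` and `IOUtils` are the real classes touched by the diff below):

```scala
import java.io.{Closeable, IOException}

import org.apache.commons.io.IOUtils
import org.apache.spark.network.util.JavaUtils

// Illustrative only: contrasts the two closeQuietly variants when close() throws.
object CloseQuietlyDemo {
  def main(args: Array[String]): Unit = {
    // A Closeable whose close() always fails, to make the difference observable.
    val failing: Closeable = () => throw new IOException("boom")

    // Neither call rethrows the IOException, but:
    JavaUtils.closeQuietly(failing) // Spark's helper logs the swallowed exception at ERROR level
    IOUtils.closeQuietly(failing)   // commons-io 2.11.0 swallows it without logging by default
  }
}
```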
---
 core/src/main/scala/org/apache/spark/storage/BlockManager.scala      | 5 +++--
 .../org/apache/spark/storage/ShuffleBlockFetcherIterator.scala       | 5 +++--
 core/src/main/scala/org/apache/spark/util/Utils.scala                | 4 ++--
 .../scala/org/apache/spark/util/logging/RollingFileAppender.scala    | 5 ++---
 core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala    | 3 +--
 core/src/test/scala/org/apache/spark/util/UtilsSuite.scala           | 6 +++---
 dev/deps/spark-deps-hadoop-3.2-hive-2.3                              | 2 +-
 pom.xml                                                              | 2 +-
 .../org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala   | 4 ++--
 .../org/apache/spark/sql/execution/streaming/StreamMetadata.scala    | 4 ++--
 .../execution/streaming/state/HDFSBackedStateStoreProvider.scala     | 4 ++--
 .../spark/sql/execution/streaming/state/RocksDBFileManager.scala     | 5 ++---
 12 files changed, 24 insertions(+), 25 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index b537060..cbb4e9c 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -35,6 +35,7 @@ import scala.util.control.NonFatal
 
 import com.codahale.metrics.{MetricRegistry, MetricSet}
 import com.google.common.cache.CacheBuilder
+import org.apache.commons.io.IOUtils
 
 import org.apache.spark._
 import org.apache.spark.errors.SparkCoreErrors
@@ -51,7 +52,7 @@ import org.apache.spark.network.netty.SparkTransportConf
 import org.apache.spark.network.shuffle._
 import org.apache.spark.network.shuffle.checksum.{Cause, ShuffleChecksumHelper}
 import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo
-import org.apache.spark.network.util.{JavaUtils, TransportConf}
+import org.apache.spark.network.util.TransportConf
 import org.apache.spark.rpc.RpcEnv
 import org.apache.spark.scheduler.ExecutorCacheTaskLocation
 import org.apache.spark.serializer.{SerializerInstance, SerializerManager}
@@ -341,7 +342,7 @@ private[spark] class BlockManager(
         false
       }
     } finally {
-      JavaUtils.closeQuietly(inputStream)
+      IOUtils.closeQuietly(inputStream)
     }
   }
 
diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
index b1713ec..eaecf65 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
@@ -29,6 +29,7 @@ import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Queue}
 import scala.util.{Failure, Success}
 
 import io.netty.util.internal.OutOfDirectMemoryError
+import org.apache.commons.io.IOUtils
 import org.roaringbitmap.RoaringBitmap
 
 import org.apache.spark.{MapOutputTracker, TaskContext}
@@ -38,7 +39,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer}
 import org.apache.spark.network.shuffle._
 import org.apache.spark.network.shuffle.checksum.{Cause, ShuffleChecksumHelper}
-import org.apache.spark.network.util.{JavaUtils, NettyUtils, TransportConf}
+import org.apache.spark.network.util.{NettyUtils, TransportConf}
 import org.apache.spark.shuffle.ShuffleReadMetricsReporter
 import org.apache.spark.util.{CompletionIterator, TaskCompletionListener, Utils}
 
@@ -1303,7 +1304,7 @@ private class BufferReleasingInputStream(
           val diagnosisResponse = checkedInOpt.map { checkedIn =>
             iterator.diagnoseCorruption(checkedIn, address, blockId)
           }
-          JavaUtils.closeQuietly(this)
+          IOUtils.closeQuietly(this)
           // We'd never retry the block whatever the cause is since the block has been
           // partially consumed by downstream RDDs.
           iterator.throwFetchFailedException(blockId, mapIndex, address, e, diagnosisResponse)
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index f894b83..f3fc90d 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -3149,8 +3149,8 @@ private[spark] object Utils extends Logging {
       logInfo(s"Unzipped from $dfsZipFile\n\t${files.mkString("\n\t")}")
     } finally {
       // Close everything no matter what happened
-      JavaUtils.closeQuietly(in)
-      JavaUtils.closeQuietly(out)
+      IOUtils.closeQuietly(in)
+      IOUtils.closeQuietly(out)
     }
     files.toSeq
   }
diff --git a/core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala b/core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala
index 10363a9..68a5923 100644
--- a/core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala
+++ b/core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala
@@ -25,7 +25,6 @@ import org.apache.commons.io.IOUtils
 
 import org.apache.spark.SparkConf
 import org.apache.spark.internal.config
-import org.apache.spark.network.util.JavaUtils
 
 /**
  * Continuously appends data from input stream into the given file, and rolls
@@ -95,8 +94,8 @@ private[spark] class RollingFileAppender(
         gzOutputStream.close()
         activeFile.delete()
       } finally {
-        JavaUtils.closeQuietly(inputStream)
-        JavaUtils.closeQuietly(gzOutputStream)
+        IOUtils.closeQuietly(inputStream)
+        IOUtils.closeQuietly(gzOutputStream)
       }
     } else {
       Files.move(activeFile, rolloverFile)
diff --git a/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala b/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala
index 1197bea..71010a1 100644
--- a/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala
@@ -35,7 +35,6 @@ import org.scalatest.BeforeAndAfter
 
 import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.apache.spark.internal.{config, Logging}
-import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.util.logging.{FileAppender, RollingFileAppender, SizeBasedRollingPolicy, TimeBasedRollingPolicy}
 
 class FileAppenderSuite extends SparkFunSuite with BeforeAndAfter with Logging {
@@ -381,7 +380,7 @@ class FileAppenderSuite extends SparkFunSuite with BeforeAndAfter with Logging {
       try {
        IOUtils.toString(inputStream, StandardCharsets.UTF_8)
      } finally {
-        JavaUtils.closeQuietly(inputStream)
+        IOUtils.closeQuietly(inputStream)
      }
    } else {
      Files.toString(file, StandardCharsets.UTF_8)
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index a4df5cd..f8607f1 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -42,7 +42,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
 import org.apache.spark.internal.config.Tests.IS_TESTING
 import org.apache.spark.launcher.SparkLauncher
-import org.apache.spark.network.util.{ByteUnit, JavaUtils}
+import org.apache.spark.network.util.ByteUnit
 import org.apache.spark.scheduler.SparkListener
 import org.apache.spark.util.io.ChunkedByteBufferInputStream
 
@@ -245,8 +245,8 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
       assert(mergedStream.read() === -1)
       assert(byteBufferInputStream.chunkedByteBuffer === null)
     } finally {
-      JavaUtils.closeQuietly(mergedStream)
-      JavaUtils.closeQuietly(in)
+      IOUtils.closeQuietly(mergedStream)
+      IOUtils.closeQuietly(in)
     }
   }
 }
diff --git a/dev/deps/spark-deps-hadoop-3.2-hive-2.3 b/dev/deps/spark-deps-hadoop-3.2-hive-2.3
index a2f5ce4..e8c7ad9 100644
--- a/dev/deps/spark-deps-hadoop-3.2-hive-2.3
+++ b/dev/deps/spark-deps-hadoop-3.2-hive-2.3
@@ -35,7 +35,7 @@ commons-compiler/3.0.16//commons-compiler-3.0.16.jar
 commons-compress/1.21//commons-compress-1.21.jar
 commons-crypto/1.1.0//commons-crypto-1.1.0.jar
 commons-dbcp/1.4//commons-dbcp-1.4.jar
-commons-io/2.8.0//commons-io-2.8.0.jar
+commons-io/2.11.0//commons-io-2.11.0.jar
 commons-lang/2.6//commons-lang-2.6.jar
 commons-lang3/3.12.0//commons-lang3-3.12.0.jar
 commons-logging/1.1.3//commons-logging-1.1.3.jar
diff --git a/pom.xml b/pom.xml
index c5cb837..02627b4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -173,7 +173,7 @@
     <netlib.ludovic.dev.version>2.2.0</netlib.ludovic.dev.version>
     <commons-codec.version>1.15</commons-codec.version>
     <commons-compress.version>1.21</commons-compress.version>
-    <commons-io.version>2.8.0</commons-io.version>
+    <commons-io.version>2.11.0</commons-io.version>
     <!-- org.apache.commons/commons-lang/-->
     <commons-lang2.version>2.6</commons-lang2.version>
     <!-- org.apache.commons/commons-lang3/-->
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
index 2a4e064..8a037b5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
@@ -22,12 +22,12 @@ import java.nio.charset.StandardCharsets
 
 import scala.reflect.ClassTag
 
+import org.apache.commons.io.IOUtils
 import org.apache.hadoop.fs._
 import org.json4s.NoTypeHints
 import org.json4s.jackson.Serialization
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.errors.QueryExecutionErrors
 
@@ -147,7 +147,7 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
           throw new IllegalStateException(
             s"Failed to read log file $batchMetadataFile. ${ise.getMessage}", ise)
       } finally {
-        JavaUtils.closeQuietly(input)
+        IOUtils.closeQuietly(input)
       }
     } else {
       throw QueryExecutionErrors.batchMetadataFileNotFoundError(batchMetadataFile)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetadata.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetadata.scala
index b46be4f..cb18988 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetadata.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetadata.scala
@@ -22,13 +22,13 @@ import java.nio.charset.StandardCharsets
 
 import scala.util.control.NonFatal
 
+import org.apache.commons.io.IOUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileAlreadyExistsException, FSDataInputStream, Path}
 import org.json4s.NoTypeHints
 import org.json4s.jackson.Serialization
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.execution.streaming.CheckpointFileManager.CancellableFSDataOutputStream
 
@@ -63,7 +63,7 @@ object StreamMetadata extends Logging {
         logError(s"Error reading stream metadata from $metadataFile", e)
         throw e
       } finally {
-        JavaUtils.closeQuietly(input)
+        IOUtils.closeQuietly(input)
       }
     } else None
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
index ce2bbe8..75b7dae 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
@@ -27,13 +27,13 @@ import scala.collection.mutable
 import scala.util.control.NonFatal
 
 import com.google.common.io.ByteStreams
+import org.apache.commons.io.IOUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs._
 
 import org.apache.spark.{SparkConf, SparkEnv}
 import org.apache.spark.internal.Logging
 import org.apache.spark.io.CompressionCodec
-import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.execution.streaming.CheckpointFileManager
@@ -542,7 +542,7 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
       rawStream: CancellableFSDataOutputStream): Unit = {
     try {
       if (rawStream != null) rawStream.cancel()
-      JavaUtils.closeQuietly(compressedStream)
+      IOUtils.closeQuietly(compressedStream)
     } catch {
       case e: FSError if e.getCause.isInstanceOf[IOException] =>
         // Closing the compressedStream causes the stream to write/flush flush data into the
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
index 3378064..23cdbd0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
@@ -38,7 +38,6 @@ import org.json4s.NoTypeHints
 import org.json4s.jackson.Serialization
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.sql.execution.streaming.CheckpointFileManager
 import org.apache.spark.util.Utils
 
@@ -459,8 +458,8 @@ class RocksDBFileManager(
       throw e
     } finally {
       // Close everything no matter what happened
-      JavaUtils.closeQuietly(in)
-      JavaUtils.closeQuietly(zout)
+      IOUtils.closeQuietly(in)
+      IOUtils.closeQuietly(zout)
     }
   }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org