This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 119ddd7  [SPARK-36737][BUILD][CORE][SQL][SS] Upgrade Apache commons-io to 2.11.0 and revert change of SPARK-36456
119ddd7 is described below

commit 119ddd7e9526ed899f88a944babb74af693297f5
Author: yangjie01 <yangji...@baidu.com>
AuthorDate: Tue Sep 14 21:16:58 2021 +0900

    [SPARK-36737][BUILD][CORE][SQL][SS] Upgrade Apache commons-io to 2.11.0 and revert change of SPARK-36456
    
    ### What changes were proposed in this pull request?
    SPARK-36456 changed the code to use `JavaUtils.closeQuietly` instead of `IOUtils.closeQuietly`, but the two methods differ slightly in default behavior: both swallow the `IOException`, but the former logs it at ERROR level while the latter does not log at all by default.
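
    For illustration, a minimal sketch of the difference (the always-failing `Closeable` is hypothetical, used only to surface the divergent handling):

    ```scala
    import java.io.{Closeable, IOException}

    import org.apache.commons.io.IOUtils
    import org.apache.spark.network.util.JavaUtils

    // A Closeable whose close() always fails, to make the handling visible.
    val failing: Closeable = () => throw new IOException("boom")

    IOUtils.closeQuietly(failing)   // exception swallowed, nothing logged by default
    JavaUtils.closeQuietly(failing) // exception swallowed, but logged at ERROR level
    ```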
    
    The `Apache commons-io` community decided to retain the `IOUtils.closeQuietly` method in the [new version](https://github.com/apache/commons-io/blob/75f20dca72656225d0dc8e7c982e40caa9277d42/src/main/java/org/apache/commons/io/IOUtils.java#L465-L467) and removed its deprecation annotation; the change was released in version 2.11.0.
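
    As a sketch of how callers can still opt in to visibility under 2.11.0 (the stream below is just a stand-in), the consumer overload that commons-io added in 2.7 accepts a handler for the swallowed exception:

    ```scala
    import java.io.{ByteArrayInputStream, IOException}
    import java.util.function.Consumer

    import org.apache.commons.io.IOUtils

    val in = new ByteArrayInputStream(Array[Byte](1, 2, 3))

    // One-arg form, kept and un-deprecated in 2.11.0: close() failures stay silent.
    IOUtils.closeQuietly(in)

    // Consumer overload (since 2.7): supply a handler when visibility is wanted.
    IOUtils.closeQuietly(in, new Consumer[IOException] {
      override def accept(e: IOException): Unit = Console.err.println(s"close failed: $e")
    })
    ```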
    
    So this PR upgrades `Apache commons-io` to 2.11.0 and reverts the change of SPARK-36456 to restore the original behavior (no error log is printed).
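
    At each call site the revert restores the pattern sketched below (`readAll` is a hypothetical helper, not code from this diff):

    ```scala
    import java.io.InputStream

    import org.apache.commons.io.IOUtils

    // Hypothetical helper: close in `finally` without emitting an ERROR log
    // when close() itself fails, matching the pre-SPARK-36456 behavior.
    def readAll(in: InputStream): Array[Byte] = {
      try {
        Iterator.continually(in.read()).takeWhile(_ != -1).map(_.toByte).toArray
      } finally {
        IOUtils.closeQuietly(in)
      }
    }
    ```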
    
    ### Why are the changes needed?
    
    1. Upgrade `Apache commons-io` to 2.11.0 to use the non-deprecated `closeQuietly` API; other changes in `Apache commons-io` are detailed in [commons-io/changes-report](https://commons.apache.org/proper/commons-io/changes-report.html#a2.11.0).
    
    2. Revert the change of SPARK-36456 to restore the original `IOUtils.closeQuietly` behavior (no error log is printed).
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Pass the Jenkins or GitHub Actions builds.
    
    Closes #33977 from LuciferYang/upgrade-commons-io.
    
    Authored-by: yangjie01 <yangji...@baidu.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 core/src/main/scala/org/apache/spark/storage/BlockManager.scala     | 5 +++--
 .../org/apache/spark/storage/ShuffleBlockFetcherIterator.scala      | 5 +++--
 core/src/main/scala/org/apache/spark/util/Utils.scala               | 4 ++--
 .../scala/org/apache/spark/util/logging/RollingFileAppender.scala   | 5 ++---
 core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala   | 3 +--
 core/src/test/scala/org/apache/spark/util/UtilsSuite.scala          | 6 +++---
 dev/deps/spark-deps-hadoop-3.2-hive-2.3                             | 2 +-
 pom.xml                                                             | 2 +-
 .../org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala  | 4 ++--
 .../org/apache/spark/sql/execution/streaming/StreamMetadata.scala   | 4 ++--
 .../execution/streaming/state/HDFSBackedStateStoreProvider.scala    | 4 ++--
 .../spark/sql/execution/streaming/state/RocksDBFileManager.scala    | 5 ++---
 12 files changed, 24 insertions(+), 25 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index b537060..cbb4e9c 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -35,6 +35,7 @@ import scala.util.control.NonFatal
 
 import com.codahale.metrics.{MetricRegistry, MetricSet}
 import com.google.common.cache.CacheBuilder
+import org.apache.commons.io.IOUtils
 
 import org.apache.spark._
 import org.apache.spark.errors.SparkCoreErrors
@@ -51,7 +52,7 @@ import org.apache.spark.network.netty.SparkTransportConf
 import org.apache.spark.network.shuffle._
 import org.apache.spark.network.shuffle.checksum.{Cause, ShuffleChecksumHelper}
 import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo
-import org.apache.spark.network.util.{JavaUtils, TransportConf}
+import org.apache.spark.network.util.TransportConf
 import org.apache.spark.rpc.RpcEnv
 import org.apache.spark.scheduler.ExecutorCacheTaskLocation
 import org.apache.spark.serializer.{SerializerInstance, SerializerManager}
@@ -341,7 +342,7 @@ private[spark] class BlockManager(
             false
         }
       } finally {
-        JavaUtils.closeQuietly(inputStream)
+        IOUtils.closeQuietly(inputStream)
       }
     }
 
diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
index b1713ec..eaecf65 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
@@ -29,6 +29,7 @@ import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Queue}
 import scala.util.{Failure, Success}
 
 import io.netty.util.internal.OutOfDirectMemoryError
+import org.apache.commons.io.IOUtils
 import org.roaringbitmap.RoaringBitmap
 
 import org.apache.spark.{MapOutputTracker, TaskContext}
@@ -38,7 +39,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer}
 import org.apache.spark.network.shuffle._
 import org.apache.spark.network.shuffle.checksum.{Cause, ShuffleChecksumHelper}
-import org.apache.spark.network.util.{JavaUtils, NettyUtils, TransportConf}
+import org.apache.spark.network.util.{NettyUtils, TransportConf}
 import org.apache.spark.shuffle.ShuffleReadMetricsReporter
 import org.apache.spark.util.{CompletionIterator, TaskCompletionListener, Utils}
 
@@ -1303,7 +1304,7 @@ private class BufferReleasingInputStream(
         val diagnosisResponse = checkedInOpt.map { checkedIn =>
           iterator.diagnoseCorruption(checkedIn, address, blockId)
         }
-        JavaUtils.closeQuietly(this)
+        IOUtils.closeQuietly(this)
         // We'd never retry the block whatever the cause is since the block has been
         // partially consumed by downstream RDDs.
         iterator.throwFetchFailedException(blockId, mapIndex, address, e, diagnosisResponse)
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index f894b83..f3fc90d 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -3149,8 +3149,8 @@ private[spark] object Utils extends Logging {
       logInfo(s"Unzipped from $dfsZipFile\n\t${files.mkString("\n\t")}")
     } finally {
       // Close everything no matter what happened
-      JavaUtils.closeQuietly(in)
-      JavaUtils.closeQuietly(out)
+      IOUtils.closeQuietly(in)
+      IOUtils.closeQuietly(out)
     }
     files.toSeq
   }
diff --git a/core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala b/core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala
index 10363a9..68a5923 100644
--- a/core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala
+++ b/core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala
@@ -25,7 +25,6 @@ import org.apache.commons.io.IOUtils
 
 import org.apache.spark.SparkConf
 import org.apache.spark.internal.config
-import org.apache.spark.network.util.JavaUtils
 
 /**
  * Continuously appends data from input stream into the given file, and rolls
@@ -95,8 +94,8 @@ private[spark] class RollingFileAppender(
         gzOutputStream.close()
         activeFile.delete()
       } finally {
-        JavaUtils.closeQuietly(inputStream)
-        JavaUtils.closeQuietly(gzOutputStream)
+        IOUtils.closeQuietly(inputStream)
+        IOUtils.closeQuietly(gzOutputStream)
       }
     } else {
       Files.move(activeFile, rolloverFile)
diff --git a/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala b/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala
index 1197bea..71010a1 100644
--- a/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala
@@ -35,7 +35,6 @@ import org.scalatest.BeforeAndAfter
 
 import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.apache.spark.internal.{config, Logging}
-import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.util.logging.{FileAppender, RollingFileAppender, SizeBasedRollingPolicy, TimeBasedRollingPolicy}
 
 class FileAppenderSuite extends SparkFunSuite with BeforeAndAfter with Logging {
@@ -381,7 +380,7 @@ class FileAppenderSuite extends SparkFunSuite with BeforeAndAfter with Logging {
         try {
           IOUtils.toString(inputStream, StandardCharsets.UTF_8)
         } finally {
-          JavaUtils.closeQuietly(inputStream)
+          IOUtils.closeQuietly(inputStream)
         }
       } else {
         Files.toString(file, StandardCharsets.UTF_8)
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index a4df5cd..f8607f1 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -42,7 +42,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
 import org.apache.spark.internal.config.Tests.IS_TESTING
 import org.apache.spark.launcher.SparkLauncher
-import org.apache.spark.network.util.{ByteUnit, JavaUtils}
+import org.apache.spark.network.util.ByteUnit
 import org.apache.spark.scheduler.SparkListener
 import org.apache.spark.util.io.ChunkedByteBufferInputStream
 
@@ -245,8 +245,8 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
         assert(mergedStream.read() === -1)
         assert(byteBufferInputStream.chunkedByteBuffer === null)
       } finally {
-        JavaUtils.closeQuietly(mergedStream)
-        JavaUtils.closeQuietly(in)
+        IOUtils.closeQuietly(mergedStream)
+        IOUtils.closeQuietly(in)
       }
     }
   }
diff --git a/dev/deps/spark-deps-hadoop-3.2-hive-2.3 b/dev/deps/spark-deps-hadoop-3.2-hive-2.3
index a2f5ce4..e8c7ad9 100644
--- a/dev/deps/spark-deps-hadoop-3.2-hive-2.3
+++ b/dev/deps/spark-deps-hadoop-3.2-hive-2.3
@@ -35,7 +35,7 @@ commons-compiler/3.0.16//commons-compiler-3.0.16.jar
 commons-compress/1.21//commons-compress-1.21.jar
 commons-crypto/1.1.0//commons-crypto-1.1.0.jar
 commons-dbcp/1.4//commons-dbcp-1.4.jar
-commons-io/2.8.0//commons-io-2.8.0.jar
+commons-io/2.11.0//commons-io-2.11.0.jar
 commons-lang/2.6//commons-lang-2.6.jar
 commons-lang3/3.12.0//commons-lang3-3.12.0.jar
 commons-logging/1.1.3//commons-logging-1.1.3.jar
diff --git a/pom.xml b/pom.xml
index c5cb837..02627b4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -173,7 +173,7 @@
     <netlib.ludovic.dev.version>2.2.0</netlib.ludovic.dev.version>
     <commons-codec.version>1.15</commons-codec.version>
     <commons-compress.version>1.21</commons-compress.version>
-    <commons-io.version>2.8.0</commons-io.version>
+    <commons-io.version>2.11.0</commons-io.version>
     <!-- org.apache.commons/commons-lang/-->
     <commons-lang2.version>2.6</commons-lang2.version>
     <!-- org.apache.commons/commons-lang3/-->
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
index 2a4e064..8a037b5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
@@ -22,12 +22,12 @@ import java.nio.charset.StandardCharsets
 
 import scala.reflect.ClassTag
 
+import org.apache.commons.io.IOUtils
 import org.apache.hadoop.fs._
 import org.json4s.NoTypeHints
 import org.json4s.jackson.Serialization
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.errors.QueryExecutionErrors
 
@@ -147,7 +147,7 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
           throw new IllegalStateException(
             s"Failed to read log file $batchMetadataFile. ${ise.getMessage}", 
ise)
       } finally {
-        JavaUtils.closeQuietly(input)
+        IOUtils.closeQuietly(input)
       }
     } else {
       throw QueryExecutionErrors.batchMetadataFileNotFoundError(batchMetadataFile)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetadata.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetadata.scala
index b46be4f..cb18988 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetadata.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetadata.scala
@@ -22,13 +22,13 @@ import java.nio.charset.StandardCharsets
 
 import scala.util.control.NonFatal
 
+import org.apache.commons.io.IOUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileAlreadyExistsException, FSDataInputStream, Path}
 import org.json4s.NoTypeHints
 import org.json4s.jackson.Serialization
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.execution.streaming.CheckpointFileManager.CancellableFSDataOutputStream
 
@@ -63,7 +63,7 @@ object StreamMetadata extends Logging {
           logError(s"Error reading stream metadata from $metadataFile", e)
           throw e
       } finally {
-        JavaUtils.closeQuietly(input)
+        IOUtils.closeQuietly(input)
       }
     } else None
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
index ce2bbe8..75b7dae 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
@@ -27,13 +27,13 @@ import scala.collection.mutable
 import scala.util.control.NonFatal
 
 import com.google.common.io.ByteStreams
+import org.apache.commons.io.IOUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs._
 
 import org.apache.spark.{SparkConf, SparkEnv}
 import org.apache.spark.internal.Logging
 import org.apache.spark.io.CompressionCodec
-import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.execution.streaming.CheckpointFileManager
@@ -542,7 +542,7 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
       rawStream: CancellableFSDataOutputStream): Unit = {
     try {
       if (rawStream != null) rawStream.cancel()
-      JavaUtils.closeQuietly(compressedStream)
+      IOUtils.closeQuietly(compressedStream)
     } catch {
       case e: FSError if e.getCause.isInstanceOf[IOException] =>
         // Closing the compressedStream causes the stream to write/flush flush data into the
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
index 3378064..23cdbd0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
@@ -38,7 +38,6 @@ import org.json4s.NoTypeHints
 import org.json4s.jackson.Serialization
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.sql.execution.streaming.CheckpointFileManager
 import org.apache.spark.util.Utils
 
@@ -459,8 +458,8 @@ class RocksDBFileManager(
         throw e
     } finally {
       // Close everything no matter what happened
-      JavaUtils.closeQuietly(in)
-      JavaUtils.closeQuietly(zout)
+      IOUtils.closeQuietly(in)
+      IOUtils.closeQuietly(zout)
     }
   }
 

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
