This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 135841f  [SPARK-38411][CORE] Use `UTF-8` when 
`doMergeApplicationListingInternal` reads event logs
135841f is described below

commit 135841f257fbb008aef211a5e38222940849cb26
Author: Cheng Pan <cheng...@apache.org>
AuthorDate: Sun Mar 6 15:41:20 2022 -0800

    [SPARK-38411][CORE] Use `UTF-8` when `doMergeApplicationListingInternal` 
reads event logs
    
    ### What changes were proposed in this pull request?
    
    Use UTF-8 instead of system default encoding to read event log
    
    ### Why are the changes needed?
    
    After SPARK-29160, we should always use UTF-8 to read event logs; otherwise, 
if the Spark History Server runs with a default charset other than UTF-8, it 
will encounter an error such as the following.
    
    ```
    2022-03-04 12:16:00,143 [3752440] - INFO  
[log-replay-executor-19:Logging57] - Parsing 
hdfs://hz-cluster11/spark2-history/application_1640597251469_2453817_1.lz4 for 
listing data...
    2022-03-04 12:16:00,145 [3752442] - ERROR 
[log-replay-executor-18:Logging94] - Exception while merging application 
listings
    java.nio.charset.MalformedInputException: Input length = 1
        at java.nio.charset.CoderResult.throwException(CoderResult.java:281)
        at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:339)
        at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
        at java.io.InputStreamReader.read(InputStreamReader.java:184)
        at java.io.BufferedReader.fill(BufferedReader.java:161)
        at java.io.BufferedReader.readLine(BufferedReader.java:324)
        at java.io.BufferedReader.readLine(BufferedReader.java:389)
        at 
scala.io.BufferedSource$BufferedLineIterator.hasNext(BufferedSource.scala:74)
        at scala.collection.Iterator$$anon$20.hasNext(Iterator.scala:884)
        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:511)
        at 
org.apache.spark.scheduler.ReplayListenerBus.replay(ReplayListenerBus.scala:82)
        at 
org.apache.spark.deploy.history.FsHistoryProvider.$anonfun$doMergeApplicationListing$4(FsHistoryProvider.scala:819)
        at 
org.apache.spark.deploy.history.FsHistoryProvider.$anonfun$doMergeApplicationListing$4$adapted(FsHistoryProvider.scala:801)
        at org.apache.spark.util.Utils$.tryWithResource(Utils.scala:2626)
        at 
org.apache.spark.deploy.history.FsHistoryProvider.doMergeApplicationListing(FsHistoryProvider.scala:801)
        at 
org.apache.spark.deploy.history.FsHistoryProvider.mergeApplicationListing(FsHistoryProvider.scala:715)
        at 
org.apache.spark.deploy.history.FsHistoryProvider.$anonfun$checkForLogs$15(FsHistoryProvider.scala:581)
        at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
    ```
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, bug fix.
    
    ### How was this patch tested?
    
    Verification steps in ubuntu:20.04
    
    1. build `spark-3.3.0-SNAPSHOT-bin-master.tgz` on commit `34618a7ef6` using 
`dev/make-distribution.sh --tgz --name master`
    2. build `spark-3.3.0-SNAPSHOT-bin-SPARK-38411.tgz` on commit `2a8f56038b` 
using `dev/make-distribution.sh --tgz --name SPARK-38411`
    3. switch to UTF-8 using `export LC_ALL=C.UTF-8 && bash`
    4. generate an event log that contains non-ASCII chars.
        ```
        bin/spark-submit \
            --master local[*] \
            --class org.apache.spark.examples.SparkPi \
            --conf spark.eventLog.enabled=true \
            --conf spark.user.key='计算圆周率' \
            examples/jars/spark-examples_2.12-3.3.0-SNAPSHOT.jar
        ```
    5. switch to POSIX using `export LC_ALL=POSIX && bash`
    6. run `spark-3.3.0-SNAPSHOT-bin-master/sbin/start-history-server.sh` and 
watch logs
        <details>
    
        ```
        Spark Command: /usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java -cp 
/spark-3.3.0-SNAPSHOT-bin-master/conf/:/spark-3.3.0-SNAPSHOT-bin-master/jars/* 
-Xmx1g org.apache.spark.deploy.history.HistoryServer
        ========================================
        Using Spark's default log4j profile: 
org/apache/spark/log4j2-defaults.properties
        22/03/06 13:37:19 INFO HistoryServer: Started daemon with process name: 
48729c3ffc10aa9
        22/03/06 13:37:19 INFO SignalUtils: Registering signal handler for TERM
        22/03/06 13:37:19 INFO SignalUtils: Registering signal handler for HUP
        22/03/06 13:37:19 INFO SignalUtils: Registering signal handler for INT
        22/03/06 13:37:21 WARN NativeCodeLoader: Unable to load native-hadoop 
library for your platform... using builtin-java classes where applicable
        22/03/06 13:37:21 INFO SecurityManager: Changing view acls to: root
        22/03/06 13:37:21 INFO SecurityManager: Changing modify acls to: root
        22/03/06 13:37:21 INFO SecurityManager: Changing view acls groups to:
        22/03/06 13:37:21 INFO SecurityManager: Changing modify acls groups to:
        22/03/06 13:37:21 INFO SecurityManager: SecurityManager: authentication 
disabled; ui acls disabled; users  with view permissions: Set(root); groups 
with view permissions: Set(); users  with modify permissions: Set(root); groups 
with modify permissions: Set()
        22/03/06 13:37:21 INFO FsHistoryProvider: History server ui acls 
disabled; users with admin permissions: ; groups with admin permissions:
        22/03/06 13:37:22 INFO Utils: Successfully started service 
'HistoryServerUI' on port 18080.
        22/03/06 13:37:23 INFO HistoryServer: Bound HistoryServer to 0.0.0.0, 
and started at http://29c3ffc10aa9:18080
        22/03/06 13:37:23 INFO FsHistoryProvider: Parsing 
file:/tmp/spark-events/local-1646573251839 for listing data...
        22/03/06 13:37:25 ERROR FsHistoryProvider: Exception while merging 
application listings
        java.nio.charset.MalformedInputException: Input length = 1
            at 
java.nio.charset.CoderResult.throwException(CoderResult.java:281) ~[?:1.8.0_312]
            at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:339) 
~[?:1.8.0_312]
            at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178) 
~[?:1.8.0_312]
            at java.io.InputStreamReader.read(InputStreamReader.java:184) 
~[?:1.8.0_312]
            at java.io.BufferedReader.fill(BufferedReader.java:161) 
~[?:1.8.0_312]
            at java.io.BufferedReader.readLine(BufferedReader.java:324) 
~[?:1.8.0_312]
            at java.io.BufferedReader.readLine(BufferedReader.java:389) 
~[?:1.8.0_312]
            at 
scala.io.BufferedSource$BufferedLineIterator.hasNext(BufferedSource.scala:74) 
~[scala-library-2.12.15.jar:?]
            at scala.collection.Iterator$$anon$20.hasNext(Iterator.scala:886) 
~[scala-library-2.12.15.jar:?]
            at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:513) 
~[scala-library-2.12.15.jar:?]
            at 
org.apache.spark.scheduler.ReplayListenerBus.replay(ReplayListenerBus.scala:82) 
~[spark-core_2.12-3.3.0-SNAPSHOT.jar:3.3.0-SNAPSHOT]
            at 
org.apache.spark.deploy.history.FsHistoryProvider.$anonfun$doMergeApplicationListingInternal$4(FsHistoryProvider.scala:830)
 ~[spark-core_2.12-3.3.0-SNAPSHOT.jar:3.3.0-SNAPSHOT]
            at 
org.apache.spark.deploy.history.FsHistoryProvider.$anonfun$doMergeApplicationListingInternal$4$adapted(FsHistoryProvider.scala:812)
 ~[spark-core_2.12-3.3.0-SNAPSHOT.jar:3.3.0-SNAPSHOT]
            at org.apache.spark.util.Utils$.tryWithResource(Utils.scala:2738) 
~[spark-core_2.12-3.3.0-SNAPSHOT.jar:3.3.0-SNAPSHOT]
            at 
org.apache.spark.deploy.history.FsHistoryProvider.doMergeApplicationListingInternal(FsHistoryProvider.scala:812)
 ~[spark-core_2.12-3.3.0-SNAPSHOT.jar:3.3.0-SNAPSHOT]
            at 
org.apache.spark.deploy.history.FsHistoryProvider.doMergeApplicationListing(FsHistoryProvider.scala:758)
 ~[spark-core_2.12-3.3.0-SNAPSHOT.jar:3.3.0-SNAPSHOT]
            at 
org.apache.spark.deploy.history.FsHistoryProvider.mergeApplicationListing(FsHistoryProvider.scala:718)
 ~[spark-core_2.12-3.3.0-SNAPSHOT.jar:3.3.0-SNAPSHOT]
            at 
org.apache.spark.deploy.history.FsHistoryProvider.$anonfun$checkForLogs$15(FsHistoryProvider.scala:584)
 ~[spark-core_2.12-3.3.0-SNAPSHOT.jar:3.3.0-SNAPSHOT]
            at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
[?:1.8.0_312]
            at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
[?:1.8.0_312]
            at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
[?:1.8.0_312]
            at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
[?:1.8.0_312]
            at java.lang.Thread.run(Thread.java:748) [?:1.8.0_312]
        ```
        </details>
    7. run `spark-3.3.0-SNAPSHOT-bin-master/sbin/stop-history-server.sh`
    8. run `spark-3.3.0-SNAPSHOT-bin-SPARK-38411/sbin/start-history-server.sh` 
and watch logs
        <details>
    
        ```
        Spark Command: /usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java -cp 
/spark-3.3.0-SNAPSHOT-bin-SPARK-38411/conf/:/spark-3.3.0-SNAPSHOT-bin-SPARK-38411/jars/*
 -Xmx1g org.apache.spark.deploy.history.HistoryServer
        ========================================
        Using Spark's default log4j profile: 
org/apache/spark/log4j2-defaults.properties
        22/03/06 13:30:54 INFO HistoryServer: Started daemon with process name: 
34729c3ffc10aa9
        22/03/06 13:30:54 INFO SignalUtils: Registering signal handler for TERM
        22/03/06 13:30:54 INFO SignalUtils: Registering signal handler for HUP
        22/03/06 13:30:54 INFO SignalUtils: Registering signal handler for INT
        22/03/06 13:30:55 WARN NativeCodeLoader: Unable to load native-hadoop 
library for your platform... using builtin-java classes where applicable
        22/03/06 13:30:56 INFO SecurityManager: Changing view acls to: root
        22/03/06 13:30:56 INFO SecurityManager: Changing modify acls to: root
        22/03/06 13:30:56 INFO SecurityManager: Changing view acls groups to:
        22/03/06 13:30:56 INFO SecurityManager: Changing modify acls groups to:
        22/03/06 13:30:56 INFO SecurityManager: SecurityManager: authentication 
disabled; ui acls disabled; users  with view permissions: Set(root); groups 
with view permissions: Set(); users  with modify permissions: Set(root); groups 
with modify permissions: Set()
        22/03/06 13:30:56 INFO FsHistoryProvider: History server ui acls 
disabled; users with admin permissions: ; groups with admin permissions:
        22/03/06 13:30:57 INFO Utils: Successfully started service 
'HistoryServerUI' on port 18080.
        22/03/06 13:30:57 INFO HistoryServer: Bound HistoryServer to 0.0.0.0, 
and started at http://29c3ffc10aa9:18080
        22/03/06 13:30:57 INFO FsHistoryProvider: Parsing 
file:/tmp/spark-events/local-1646573251839 for listing data...
        22/03/06 13:30:59 INFO FsHistoryProvider: Finished parsing 
file:/tmp/spark-events/local-1646573251839
        ```
        </details>
    
    Closes #35730 from pan3793/SPARK-38411.
    
    Authored-by: Cheng Pan <cheng...@apache.org>
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
---
 .../scala/org/apache/spark/deploy/history/FsHistoryProvider.scala     | 4 ++--
 core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala    | 4 ++--
 .../org/apache/spark/deploy/history/EventLogFileWritersSuite.scala    | 4 ++--
 .../scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala  | 4 ++--
 4 files changed, 8 insertions(+), 8 deletions(-)

diff --git 
a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala 
b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index a2494eb..a9adaed 100644
--- 
a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ 
b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -26,7 +26,7 @@ import java.util.zip.ZipOutputStream
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
-import scala.io.Source
+import scala.io.{Codec, Source}
 import scala.util.control.NonFatal
 import scala.xml.Node
 
@@ -819,7 +819,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, 
clock: Clock)
           }
         }
 
-        val source = Source.fromInputStream(in).getLines()
+        val source = Source.fromInputStream(in)(Codec.UTF8).getLines()
 
         // Because skipping may leave the stream in the middle of a line, read 
the next line
         // before replaying.
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala 
b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index aead72e..c5a72ef 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -23,7 +23,7 @@ import java.nio.charset.StandardCharsets
 import java.nio.file.{Files, Paths}
 
 import scala.collection.mutable.ArrayBuffer
-import scala.io.Source
+import scala.io.{Codec, Source}
 
 import com.google.common.io.ByteStreams
 import org.apache.commons.io.FileUtils
@@ -647,7 +647,7 @@ class SparkSubmitSuite
       runSparkSubmit(args)
       val listStatus = fileSystem.listStatus(testDirPath)
       val logData = EventLogFileReader.openEventLog(listStatus.last.getPath, 
fileSystem)
-      Source.fromInputStream(logData).getLines().foreach { line =>
+      Source.fromInputStream(logData)(Codec.UTF8).getLines().foreach { line =>
         assert(!line.contains("secret_password"))
       }
     }
diff --git 
a/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileWritersSuite.scala
 
b/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileWritersSuite.scala
index e6dd9ae..455e2e1 100644
--- 
a/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileWritersSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileWritersSuite.scala
@@ -21,7 +21,7 @@ import java.io.{File, FileOutputStream, IOException}
 import java.net.URI
 
 import scala.collection.mutable
-import scala.io.Source
+import scala.io.{Codec, Source}
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
@@ -114,7 +114,7 @@ abstract class EventLogFileWritersSuite extends 
SparkFunSuite with LocalSparkCon
   protected def readLinesFromEventLogFile(log: Path, fs: FileSystem): 
List[String] = {
     val logDataStream = EventLogFileReader.openEventLog(log, fs)
     try {
-      Source.fromInputStream(logDataStream).getLines().toList
+      Source.fromInputStream(logDataStream)(Codec.UTF8).getLines().toList
     } finally {
       logDataStream.close()
     }
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
 
b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
index b06e83e..edb2095 100644
--- 
a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
@@ -23,7 +23,7 @@ import java.util.{Arrays, Properties}
 import scala.collection.immutable.Map
 import scala.collection.mutable
 import scala.collection.mutable.Set
-import scala.io.Source
+import scala.io.{Codec, Source}
 
 import org.apache.hadoop.fs.Path
 import org.json4s.jackson.JsonMethods._
@@ -661,7 +661,7 @@ class EventLoggingListenerSuite extends SparkFunSuite with 
LocalSparkContext wit
   }
 
   private def readLines(in: InputStream): Seq[String] = {
-    Source.fromInputStream(in).getLines().toSeq
+    Source.fromInputStream(in)(Codec.UTF8).getLines().toSeq
   }
 
   /**

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to