This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 898cf76  [SPARK-37373] Collecting LocalSparkContext worker logs in 
case of test failure
898cf76 is described below

commit 898cf76affeb9d12f9007038bf719d2b67a7ee61
Author: attilapiros <[email protected]>
AuthorDate: Wed Dec 15 16:58:05 2021 +0800

    [SPARK-37373] Collecting LocalSparkContext worker logs in case of test 
failure
    
    ### What changes were proposed in this pull request?
    
    Collecting `LocalSparkContext` worker logs in case of test failure.
    
    ### Why are the changes needed?
    
    About 50 test suites are using `LocalSparkContext` by specifying 
"local-cluster" as the cluster URL. In this case executor logs will be under 
the worker dir which is a temporary directory and as such will be deleted at 
shutdown (for details see 
https://github.com/apache/spark/blob/0a4961df29aab6912492e87e4e719865fe20d981/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala#L70).
    
    So when such a test fails and the error is on the executor side, the 
executor log is lost.
    
    This is only for local cluster tests and not for standalone tests, where 
logs are kept in the "<SPARK_HOME>/work" directory.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Manually, by adding temporary code to one of the tests to make it fail and 
then checking unit-tests.log:
    
    ```
    21/11/18 18:59:32.924 dag-scheduler-event-loop INFO TaskSchedulerImpl: 
Killing all running tasks in stage 0: Stage finished
    21/11/18 18:59:32.924 pool-1-thread-1-ScalaTest-running-DistributedSuite 
INFO DAGScheduler: Job 0 finished: $anonfun$new$13 at OutcomeOf.scala:85, took 
4.339006 s
    21/11/18 18:59:32.930 pool-1-thread-1-ScalaTest-running-DistributedSuite 
INFO DistributedSuite:
    
    ===== EXTRA LOGS FOR THE FAILED TEST
    
    21/11/18 18:59:32.930 pool-1-thread-1-ScalaTest-running-DistributedSuite 
INFO DistributedSuite:
    ----- Logfile: 
/Users/attilazsoltpiros/git/attilapiros/spark/core/target/tmp/org.apache.spark.DistributedSuite/worker-85d9c1f8-3dae-453d-b105-fc2087ef110c/app-20211118095928-0000/1/target/unit-tests.log
    21/11/18 18:59:32.939 pool-1-thread-1-ScalaTest-running-DistributedSuite 
INFO DistributedSuite: 21/11/18 18:59:29.877 main INFO 
CoarseGrainedExecutorBackend: Started daemon with process name: 62486Budlap-617
    21/11/18 18:59:29.885 main INFO SignalUtils: Registering signal handler for 
TERM
    21/11/18 18:59:29.886 main INFO SignalUtils: Registering signal handler for 
HUP
    21/11/18 18:59:29.886 main INFO SignalUtils: Registering signal handler for 
INT
    21/11/18 18:59:30.459 main WARN NativeCodeLoader: Unable to load 
native-hadoop library for your platform... using builtin-java classes where 
applicable
    ...
    21/11/18 18:59:32.208 Executor task launch worker for task 1.0 in stage 0.0 
(TID 1) INFO MemoryStore: Block broadcast_0 stored as values in memory 
(estimated size 6.2 KiB, free 546.3 MiB)
    21/11/18 18:59:32.792 Executor task launch worker for task 1.0 in stage 0.0 
(TID 1) INFO Executor: Finished task 1.0 in stage 0.0 (TID 1). 923 bytes result 
sent to driver
    
    21/11/18 18:59:32.940 pool-1-thread-1-ScalaTest-running-DistributedSuite 
INFO DistributedSuite:
    ----- Logfile: 
/Users/attilazsoltpiros/git/attilapiros/spark/core/target/tmp/org.apache.spark.DistributedSuite/worker-67734ffb-23d8-4aa5-85cf-7b1857961c48/app-20211118095928-0000/2/target/unit-tests.log
    21/11/18 18:59:32.940 pool-1-thread-1-ScalaTest-running-DistributedSuite 
INFO DistributedSuite: 21/11/18 18:59:30.005 main INFO 
CoarseGrainedExecutorBackend: Started daemon with process name: 62488Budlap-617
    21/11/18 18:59:30.014 main INFO SignalUtils: Registering signal handler for 
TERM
    21/11/18 18:59:30.015 main INFO SignalUtils: Registering signal handler for 
HUP
    21/11/18 18:59:30.015 main INFO SignalUtils: Registering signal handler for 
INT
    ...
    ```
    
    Here you can see the paths were:
    - 
spark/core/target/tmp/org.apache.spark.DistributedSuite/worker-85d9c1f8-3dae-453d-b105-fc2087ef110c/app-20211118095928-0000/1/target/unit-tests.log
    - 
spark/core/target/tmp/org.apache.spark.DistributedSuite/worker-0c97959f-f8df-464e-a9c4-941a1a40701b/app-20211118095928-0000/0/target/unit-tests.log
    
    Closes #34651 from attilapiros/localSparkClusterExecutorLogs.
    
    Lead-authored-by: attilapiros <[email protected]>
    Co-authored-by: Attila Zsolt Piros <[email protected]>
    Signed-off-by: Gengliang Wang <[email protected]>
---
 .../main/scala/org/apache/spark/SparkContext.scala |  2 +-
 .../apache/spark/deploy/LocalSparkCluster.scala    | 37 +++++++++++++++++++++-
 .../scala/org/apache/spark/SparkFunSuite.scala     | 14 +++++++-
 .../apache/spark/deploy/master/MasterSuite.scala   |  6 ++--
 4 files changed, 53 insertions(+), 6 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala 
b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 23f5f85..16ac744 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -2966,7 +2966,7 @@ object SparkContext extends Logging {
         sc.conf.setIfMissing(SHUFFLE_HOST_LOCAL_DISK_READING_ENABLED, false)
 
         val scheduler = new TaskSchedulerImpl(sc)
-        val localCluster = new LocalSparkCluster(
+        val localCluster = LocalSparkCluster(
           numWorkers.toInt, coresPerWorker.toInt, memoryPerWorkerInt, sc.conf)
         val masterUrls = localCluster.start()
         val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala 
b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
index b2ed912..9c57269 100644
--- a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.deploy
 
+import java.io.File
+
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.SparkConf
@@ -33,7 +35,7 @@ import org.apache.spark.util.Utils
  * fault recovery without spinning up a lot of processes.
  */
 private[spark]
-class LocalSparkCluster(
+class LocalSparkCluster private (
     numWorkers: Int,
     coresPerWorker: Int,
     memoryPerWorker: Int,
@@ -45,6 +47,8 @@ class LocalSparkCluster(
   private val workerRpcEnvs = ArrayBuffer[RpcEnv]()
   // exposed for testing
   var masterWebUIPort = -1
+  // for test only
+  private val workerDirs = ArrayBuffer[String]()
 
   def start(): Array[String] = {
     logInfo("Starting a local Spark cluster with " + numWorkers + " workers.")
@@ -66,6 +70,9 @@ class LocalSparkCluster(
       val workDir = if (Utils.isTesting) {
         Utils.createTempDir(namePrefix = "worker").getAbsolutePath
       } else null
+      if (Utils.isTesting) {
+        workerDirs += workDir
+      }
       val workerEnv = Worker.startRpcEnvAndEndpoint(localHostname, 0, 0, 
coresPerWorker,
         memoryPerWorker, masters, workDir, Some(workerNum), _conf,
         conf.get(config.Worker.SPARK_WORKER_RESOURCE_FILE))
@@ -75,6 +82,13 @@ class LocalSparkCluster(
     masters
   }
 
+  def workerLogfiles(): Seq[File] = {
+    workerDirs.toSeq.flatMap { dir =>
+      Utils.recursiveList(new File(dir))
+        .filter(f => f.isFile && 
""".*\.log$""".r.findFirstMatchIn(f.getName).isDefined)
+    }
+  }
+
   def stop(): Unit = {
     logInfo("Shutting down local Spark cluster.")
     // Stop the workers before the master so they don't get upset that it 
disconnected
@@ -84,5 +98,26 @@ class LocalSparkCluster(
     masterRpcEnvs.foreach(_.awaitTermination())
     masterRpcEnvs.clear()
     workerRpcEnvs.clear()
+    workerDirs.clear()
+    LocalSparkCluster.clear()
+  }
+}
+
+private[spark] object LocalSparkCluster {
+
+  private var localCluster: Option[LocalSparkCluster] = None
+
+  private[spark] def get: Option[LocalSparkCluster] = localCluster
+
+  private def clear(): Unit = localCluster = None
+
+  def apply(
+      numWorkers: Int,
+      coresPerWorker: Int,
+      memoryPerWorker: Int,
+      conf: SparkConf): LocalSparkCluster = {
+    localCluster =
+      Some(new LocalSparkCluster(numWorkers, coresPerWorker, memoryPerWorker, 
conf))
+    localCluster.get
   }
 }
diff --git a/core/src/test/scala/org/apache/spark/SparkFunSuite.scala 
b/core/src/test/scala/org/apache/spark/SparkFunSuite.scala
index 227c708..d2e08b7 100644
--- a/core/src/test/scala/org/apache/spark/SparkFunSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkFunSuite.scala
@@ -31,6 +31,7 @@ import org.apache.log4j.spi.LoggingEvent
 import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, BeforeAndAfterEach, 
Failed, Outcome}
 import org.scalatest.funsuite.AnyFunSuite
 
+import org.apache.spark.deploy.LocalSparkCluster
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.Tests.IS_TESTING
 import org.apache.spark.util.{AccumulatorContext, Utils}
@@ -172,7 +173,18 @@ abstract class SparkFunSuite
     }
   }
 
-  protected def logForFailedTest(): Unit = {}
+  protected def logForFailedTest(): Unit = {
+    LocalSparkCluster.get.foreach { localCluster =>
+      val workerLogfiles = localCluster.workerLogfiles
+      if (workerLogfiles.nonEmpty) {
+        logInfo("\n\n===== EXTRA LOGS FOR THE FAILED TEST\n")
+        workerLogfiles.foreach { logFile =>
+          logInfo(s"\n----- Logfile: ${logFile.getAbsolutePath()}")
+          logInfo(FileUtils.readFileToString(logFile, "UTF-8"))
+        }
+      }
+    }
+  }
 
   /**
    * Log the suite name and the test name before and after each test.
diff --git 
a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala 
b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
index 7fd9af1..3832e38 100644
--- a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
@@ -323,7 +323,7 @@ class MasterSuite extends SparkFunSuite
   test("master/worker web ui available") {
     implicit val formats = org.json4s.DefaultFormats
     val conf = new SparkConf()
-    val localCluster = new LocalSparkCluster(2, 2, 512, conf)
+    val localCluster = LocalSparkCluster(2, 2, 512, conf)
     localCluster.start()
     val masterUrl = s"http://localhost:${localCluster.masterWebUIPort}";
     try {
@@ -360,7 +360,7 @@ class MasterSuite extends SparkFunSuite
     implicit val formats = org.json4s.DefaultFormats
     val conf = new SparkConf()
     conf.set(UI_REVERSE_PROXY, true)
-    val localCluster = new LocalSparkCluster(2, 2, 512, conf)
+    val localCluster = LocalSparkCluster(2, 2, 512, conf)
     localCluster.start()
     val masterUrl = s"http://localhost:${localCluster.masterWebUIPort}";
     try {
@@ -398,7 +398,7 @@ class MasterSuite extends SparkFunSuite
     val conf = new SparkConf()
     conf.set(UI_REVERSE_PROXY, true)
     conf.set(UI_REVERSE_PROXY_URL, reverseProxyUrl)
-    val localCluster = new LocalSparkCluster(2, 2, 512, conf)
+    val localCluster = LocalSparkCluster(2, 2, 512, conf)
     localCluster.start()
     val masterUrl = s"http://localhost:${localCluster.masterWebUIPort}";
     try {

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to