This is an automated email from the ASF dual-hosted git repository.
gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 898cf76 [SPARK-37373] Collecting LocalSparkContext worker logs in
case of test failure
898cf76 is described below
commit 898cf76affeb9d12f9007038bf719d2b67a7ee61
Author: attilapiros <[email protected]>
AuthorDate: Wed Dec 15 16:58:05 2021 +0800
[SPARK-37373] Collecting LocalSparkContext worker logs in case of test
failure
### What changes were proposed in this pull request?
Collecting `LocalSparkContext` worker logs in case of test failure.
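Concretely, `LocalSparkCluster` now remembers the temporary worker work dirs it creates, and `SparkFunSuite.logForFailedTest()` dumps every `*.log` file found under them into the test log when a test fails. A condensed sketch of the new hook (taken from the diff below):
```scala
// Condensed from the diff below: if the failed test used a local cluster,
// append every worker *.log file to the test log.
protected def logForFailedTest(): Unit = {
  LocalSparkCluster.get.foreach { localCluster =>
    val workerLogfiles = localCluster.workerLogfiles
    if (workerLogfiles.nonEmpty) {
      logInfo("\n\n===== EXTRA LOGS FOR THE FAILED TEST\n")
      workerLogfiles.foreach { logFile =>
        logInfo(s"\n----- Logfile: ${logFile.getAbsolutePath()}")
        logInfo(FileUtils.readFileToString(logFile, "UTF-8"))
      }
    }
  }
}
```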
### Why are the changes needed?
About 50 test suites use `LocalSparkContext` and specify "local-cluster" as the cluster URL. In this case the executor logs end up under the worker dir, which is a temporary directory and as such is deleted at shutdown (for details see
https://github.com/apache/spark/blob/0a4961df29aab6912492e87e4e719865fe20d981/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala#L70).
So when such a test fails and the error happened on the executor side, the log is lost.
This affects only local-cluster tests and not standalone tests, where the logs are kept under "<SPARK_HOME>/work". A minimal example of such a local-cluster suite is sketched below.
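For illustration only, here is a minimal sketch of such a suite (the suite name and the job are made up; `local-cluster[2,1,1024]` asks for 2 workers with 1 core and 1024 MB each):
```scala
package org.apache.spark

// Hypothetical example: the map runs on the executors, so if it threw,
// the relevant log would sit under a temporary worker dir.
class ExampleLocalClusterSuite extends SparkFunSuite with LocalSparkContext {
  test("sums on a local cluster") {
    sc = new SparkContext("local-cluster[2,1,1024]", "example", new SparkConf())
    val doubledSum = sc.parallelize(1 to 10, 2).map(_ * 2).reduce(_ + _)
    assert(doubledSum === 110)
  }
}
```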
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Manually, by adding temporary code to one of the tests to make it fail, then checking the unit-tests.log:
```
21/11/18 18:59:32.924 dag-scheduler-event-loop INFO TaskSchedulerImpl:
Killing all running tasks in stage 0: Stage finished
21/11/18 18:59:32.924 pool-1-thread-1-ScalaTest-running-DistributedSuite
INFO DAGScheduler: Job 0 finished: $anonfun$new$13 at OutcomeOf.scala:85, took
4.339006 s
21/11/18 18:59:32.930 pool-1-thread-1-ScalaTest-running-DistributedSuite
INFO DistributedSuite:
===== EXTRA LOGS FOR THE FAILED TEST
21/11/18 18:59:32.930 pool-1-thread-1-ScalaTest-running-DistributedSuite
INFO DistributedSuite:
----- Logfile:
/Users/attilazsoltpiros/git/attilapiros/spark/core/target/tmp/org.apache.spark.DistributedSuite/worker-85d9c1f8-3dae-453d-b105-fc2087ef110c/app-20211118095928-0000/1/target/unit-tests.log
21/11/18 18:59:32.939 pool-1-thread-1-ScalaTest-running-DistributedSuite
INFO DistributedSuite: 21/11/18 18:59:29.877 main INFO
CoarseGrainedExecutorBackend: Started daemon with process name: 62486Budlap-617
21/11/18 18:59:29.885 main INFO SignalUtils: Registering signal handler for
TERM
21/11/18 18:59:29.886 main INFO SignalUtils: Registering signal handler for
HUP
21/11/18 18:59:29.886 main INFO SignalUtils: Registering signal handler for
INT
21/11/18 18:59:30.459 main WARN NativeCodeLoader: Unable to load
native-hadoop library for your platform... using builtin-java classes where
applicable
...
21/11/18 18:59:32.208 Executor task launch worker for task 1.0 in stage 0.0
(TID 1) INFO MemoryStore: Block broadcast_0 stored as values in memory
(estimated size 6.2 KiB, free 546.3 MiB)
21/11/18 18:59:32.792 Executor task launch worker for task 1.0 in stage 0.0
(TID 1) INFO Executor: Finished task 1.0 in stage 0.0 (TID 1). 923 bytes result
sent to driver
21/11/18 18:59:32.940 pool-1-thread-1-ScalaTest-running-DistributedSuite
INFO DistributedSuite:
----- Logfile:
/Users/attilazsoltpiros/git/attilapiros/spark/core/target/tmp/org.apache.spark.DistributedSuite/worker-67734ffb-23d8-4aa5-85cf-7b1857961c48/app-20211118095928-0000/2/target/unit-tests.log
21/11/18 18:59:32.940 pool-1-thread-1-ScalaTest-running-DistributedSuite
INFO DistributedSuite: 21/11/18 18:59:30.005 main INFO
CoarseGrainedExecutorBackend: Started daemon with process name: 62488Budlap-617
21/11/18 18:59:30.014 main INFO SignalUtils: Registering signal handler for
TERM
21/11/18 18:59:30.015 main INFO SignalUtils: Registering signal handler for
HUP
21/11/18 18:59:30.015 main INFO SignalUtils: Registering signal handler for
INT
...
```
Here you can see the logfile paths:
- spark/core/target/tmp/org.apache.spark.DistributedSuite/worker-85d9c1f8-3dae-453d-b105-fc2087ef110c/app-20211118095928-0000/1/target/unit-tests.log
- spark/core/target/tmp/org.apache.spark.DistributedSuite/worker-0c97959f-f8df-464e-a9c4-941a1a40701b/app-20211118095928-0000/0/target/unit-tests.log
Closes #34651 from attilapiros/localSparkClusterExecutorLogs.
Lead-authored-by: attilapiros <[email protected]>
Co-authored-by: Attila Zsolt Piros <[email protected]>
Signed-off-by: Gengliang Wang <[email protected]>
---
.../main/scala/org/apache/spark/SparkContext.scala | 2 +-
.../apache/spark/deploy/LocalSparkCluster.scala | 37 +++++++++++++++++++++-
.../scala/org/apache/spark/SparkFunSuite.scala | 14 +++++++-
.../apache/spark/deploy/master/MasterSuite.scala | 6 ++--
4 files changed, 53 insertions(+), 6 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 23f5f85..16ac744 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -2966,7 +2966,7 @@ object SparkContext extends Logging {
sc.conf.setIfMissing(SHUFFLE_HOST_LOCAL_DISK_READING_ENABLED, false)
val scheduler = new TaskSchedulerImpl(sc)
- val localCluster = new LocalSparkCluster(
+ val localCluster = LocalSparkCluster(
numWorkers.toInt, coresPerWorker.toInt, memoryPerWorkerInt, sc.conf)
val masterUrls = localCluster.start()
val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
diff --git a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
index b2ed912..9c57269 100644
--- a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
@@ -17,6 +17,8 @@
package org.apache.spark.deploy
+import java.io.File
+
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.SparkConf
@@ -33,7 +35,7 @@ import org.apache.spark.util.Utils
* fault recovery without spinning up a lot of processes.
*/
private[spark]
-class LocalSparkCluster(
+class LocalSparkCluster private (
numWorkers: Int,
coresPerWorker: Int,
memoryPerWorker: Int,
@@ -45,6 +47,8 @@ class LocalSparkCluster(
private val workerRpcEnvs = ArrayBuffer[RpcEnv]()
// exposed for testing
var masterWebUIPort = -1
+ // for test only
+ private val workerDirs = ArrayBuffer[String]()
def start(): Array[String] = {
logInfo("Starting a local Spark cluster with " + numWorkers + " workers.")
@@ -66,6 +70,9 @@ class LocalSparkCluster(
val workDir = if (Utils.isTesting) {
Utils.createTempDir(namePrefix = "worker").getAbsolutePath
} else null
+ if (Utils.isTesting) {
+ workerDirs += workDir
+ }
val workerEnv = Worker.startRpcEnvAndEndpoint(localHostname, 0, 0, coresPerWorker,
memoryPerWorker, masters, workDir, Some(workerNum), _conf,
conf.get(config.Worker.SPARK_WORKER_RESOURCE_FILE))
@@ -75,6 +82,13 @@ class LocalSparkCluster(
masters
}
+ def workerLogfiles(): Seq[File] = {
+ workerDirs.toSeq.flatMap { dir =>
+ Utils.recursiveList(new File(dir))
+ .filter(f => f.isFile && """.*\.log$""".r.findFirstMatchIn(f.getName).isDefined)
+ }
+ }
+
def stop(): Unit = {
logInfo("Shutting down local Spark cluster.")
// Stop the workers before the master so they don't get upset that it disconnected
@@ -84,5 +98,26 @@ class LocalSparkCluster(
masterRpcEnvs.foreach(_.awaitTermination())
masterRpcEnvs.clear()
workerRpcEnvs.clear()
+ workerDirs.clear()
+ LocalSparkCluster.clear()
+ }
+}
+
+private[spark] object LocalSparkCluster {
+
+ private var localCluster: Option[LocalSparkCluster] = None
+
+ private[spark] def get: Option[LocalSparkCluster] = localCluster
+
+ private def clear(): Unit = localCluster = None
+
+ def apply(
+ numWorkers: Int,
+ coresPerWorker: Int,
+ memoryPerWorker: Int,
+ conf: SparkConf): LocalSparkCluster = {
+ localCluster =
+ Some(new LocalSparkCluster(numWorkers, coresPerWorker, memoryPerWorker, conf))
+ localCluster.get
}
}
diff --git a/core/src/test/scala/org/apache/spark/SparkFunSuite.scala b/core/src/test/scala/org/apache/spark/SparkFunSuite.scala
index 227c708..d2e08b7 100644
--- a/core/src/test/scala/org/apache/spark/SparkFunSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkFunSuite.scala
@@ -31,6 +31,7 @@ import org.apache.log4j.spi.LoggingEvent
import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, BeforeAndAfterEach, Failed, Outcome}
import org.scalatest.funsuite.AnyFunSuite
+import org.apache.spark.deploy.LocalSparkCluster
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.Tests.IS_TESTING
import org.apache.spark.util.{AccumulatorContext, Utils}
@@ -172,7 +173,18 @@ abstract class SparkFunSuite
}
}
- protected def logForFailedTest(): Unit = {}
+ protected def logForFailedTest(): Unit = {
+ LocalSparkCluster.get.foreach { localCluster =>
+ val workerLogfiles = localCluster.workerLogfiles
+ if (workerLogfiles.nonEmpty) {
+ logInfo("\n\n===== EXTRA LOGS FOR THE FAILED TEST\n")
+ workerLogfiles.foreach { logFile =>
+ logInfo(s"\n----- Logfile: ${logFile.getAbsolutePath()}")
+ logInfo(FileUtils.readFileToString(logFile, "UTF-8"))
+ }
+ }
+ }
+ }
/**
* Log the suite name and the test name before and after each test.
diff --git a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
index 7fd9af1..3832e38 100644
--- a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
@@ -323,7 +323,7 @@ class MasterSuite extends SparkFunSuite
test("master/worker web ui available") {
implicit val formats = org.json4s.DefaultFormats
val conf = new SparkConf()
- val localCluster = new LocalSparkCluster(2, 2, 512, conf)
+ val localCluster = LocalSparkCluster(2, 2, 512, conf)
localCluster.start()
val masterUrl = s"http://localhost:${localCluster.masterWebUIPort}"
try {
@@ -360,7 +360,7 @@ class MasterSuite extends SparkFunSuite
implicit val formats = org.json4s.DefaultFormats
val conf = new SparkConf()
conf.set(UI_REVERSE_PROXY, true)
- val localCluster = new LocalSparkCluster(2, 2, 512, conf)
+ val localCluster = LocalSparkCluster(2, 2, 512, conf)
localCluster.start()
val masterUrl = s"http://localhost:${localCluster.masterWebUIPort}"
try {
@@ -398,7 +398,7 @@ class MasterSuite extends SparkFunSuite
val conf = new SparkConf()
conf.set(UI_REVERSE_PROXY, true)
conf.set(UI_REVERSE_PROXY_URL, reverseProxyUrl)
- val localCluster = new LocalSparkCluster(2, 2, 512, conf)
+ val localCluster = LocalSparkCluster(2, 2, 512, conf)
localCluster.start()
val masterUrl = s"http://localhost:${localCluster.masterWebUIPort}"
try {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]