This is an automated email from the ASF dual-hosted git repository. srowen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new c75c29d [SPARK-32598][SCHEDULER] Fix missing driver logs under UI App-Executors tab in standalone cluster mode c75c29d is described below commit c75c29dcaa9458a9ce0dd7a4d5fafbffb4b7f6a6 Author: KevinSmile <kevinwang...@hotmail.com> AuthorDate: Fri Jan 15 09:01:26 2021 -0600 [SPARK-32598][SCHEDULER] Fix missing driver logs under UI App-Executors tab in standalone cluster mode ### What changes were proposed in this pull request? Fix [SPARK-32598] (missing driver logs under UI-ApplicationDetails-Executors tab in standalone cluster mode). The direct bug is: the original author forgot to implement `getDriverLogUrls` in `StandaloneSchedulerBackend` https://github.com/apache/spark/blob/1de272f98d0ff22d0dd151797f22b8faf310963a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala#L70-L75 So we set DriverLogUrls as env in `DriverRunner`, and retrieve it in `StandaloneSchedulerBackend`. ### Why are the changes needed? Fix bug [SPARK-32598]. ### Does this PR introduce _any_ user-facing change? Yes. Users will see driver logs (standalone cluster mode) under the UI-ApplicationDetails-Executors tab now. Before: ![image](https://user-images.githubusercontent.com/17903517/93901055-b5de8600-fd28-11ea-879a-d97e6f70cc6e.png) After: ![image](https://user-images.githubusercontent.com/17903517/93901080-baa33a00-fd28-11ea-8895-3787c5efbf88.png) ### How was this patch tested? Re-checked the real case in [SPARK-32598] and confirmed this user-facing bug is fixed. Closes #29644 from KevinSmile/kw-dev-master. 
Authored-by: KevinSmile <kevinwang...@hotmail.com> Signed-off-by: Sean Owen <sro...@gmail.com> --- .../scala/org/apache/spark/deploy/worker/DriverRunner.scala | 11 +++++++++++ .../main/scala/org/apache/spark/deploy/worker/Worker.scala | 1 + .../spark/scheduler/cluster/StandaloneSchedulerBackend.scala | 8 ++++++++ .../test/scala/org/apache/spark/deploy/DeployTestUtils.scala | 1 + .../org/apache/spark/deploy/worker/DriverRunnerTest.scala | 3 ++- 5 files changed, 23 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala index 776d916..6945cb5 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala @@ -33,9 +33,11 @@ import org.apache.spark.deploy.master.DriverState import org.apache.spark.deploy.master.DriverState.DriverState import org.apache.spark.internal.Logging import org.apache.spark.internal.config.{DRIVER_RESOURCES_FILE, SPARK_DRIVER_PREFIX} +import org.apache.spark.internal.config.UI.UI_REVERSE_PROXY import org.apache.spark.internal.config.Worker.WORKER_DRIVER_TERMINATE_TIMEOUT import org.apache.spark.resource.ResourceInformation import org.apache.spark.rpc.RpcEndpointRef +import org.apache.spark.ui.UIUtils import org.apache.spark.util.{Clock, ShutdownHookManager, SystemClock, Utils} /** @@ -50,6 +52,7 @@ private[deploy] class DriverRunner( val driverDesc: DriverDescription, val worker: RpcEndpointRef, val workerUrl: String, + val workerWebUiUrl: String, val securityManager: SecurityManager, val resources: Map[String, ResourceInformation] = Map.empty) extends Logging { @@ -189,6 +192,14 @@ private[deploy] class DriverRunner( val builder = CommandUtils.buildProcessBuilder(driverDesc.command.copy(javaOpts = javaOpts), securityManager, driverDesc.mem, sparkHome.getAbsolutePath, substituteVariables) + // add WebUI driver log url to 
environment + val reverseProxy = conf.get(UI_REVERSE_PROXY) + val workerUrlRef = UIUtils.makeHref(reverseProxy, driverId, workerWebUiUrl) + builder.environment.put("SPARK_DRIVER_LOG_URL_STDOUT", + s"$workerUrlRef/logPage?driverId=$driverId&logType=stdout") + builder.environment.put("SPARK_DRIVER_LOG_URL_STDERR", + s"$workerUrlRef/logPage?driverId=$driverId&logType=stderr") + runDriver(builder, driverDir, driverDesc.supervise) } diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index a3c7375..cb36207 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -652,6 +652,7 @@ private[deploy] class Worker( driverDesc.copy(command = Worker.maybeUpdateSSLSettings(driverDesc.command, conf)), self, workerUri, + workerWebUiUrl, securityMgr, resources_) drivers(driverId) = driver diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala index c14b2d4..7a05569 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala @@ -17,6 +17,7 @@ package org.apache.spark.scheduler.cluster +import java.util.Locale import java.util.concurrent.Semaphore import java.util.concurrent.atomic.AtomicBoolean @@ -235,6 +236,13 @@ private[spark] class StandaloneSchedulerBackend( } } + override def getDriverLogUrls: Option[Map[String, String]] = { + val prefix = "SPARK_DRIVER_LOG_URL_" + val driverLogUrls = sys.env.filterKeys(_.startsWith(prefix)) + .map(e => (e._1.substring(prefix.length).toLowerCase(Locale.ROOT), e._2)).toMap + if (driverLogUrls.nonEmpty) Some(driverLogUrls) else None + } + private def waitForRegistration() = { 
registrationBarrier.acquire() } diff --git a/core/src/test/scala/org/apache/spark/deploy/DeployTestUtils.scala b/core/src/test/scala/org/apache/spark/deploy/DeployTestUtils.scala index 31f065e..b182b11 100644 --- a/core/src/test/scala/org/apache/spark/deploy/DeployTestUtils.scala +++ b/core/src/test/scala/org/apache/spark/deploy/DeployTestUtils.scala @@ -102,6 +102,7 @@ private[deploy] object DeployTestUtils { createDriverDesc(), null, "spark://worker", + "http://publicAddress:80", new SecurityManager(conf)) } diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala b/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala index c3b580e..e429ddf 100644 --- a/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala +++ b/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala @@ -40,7 +40,8 @@ class DriverRunnerTest extends SparkFunSuite { val worker = mock(classOf[RpcEndpointRef]) doNothing().when(worker).send(any()) spy(new DriverRunner(conf, "driverId", new File("workDir"), new File("sparkHome"), - driverDescription, worker, "spark://1.2.3.4/worker/", new SecurityManager(conf))) + driverDescription, worker, "spark://1.2.3.4/worker/", "http://publicAddress:80", + new SecurityManager(conf))) } private def createProcessBuilderAndProcess(): (ProcessBuilderLike, Process) = { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org