Repository: spark Updated Branches: refs/heads/branch-0.9 d68549e8f -> 8856076b5
SPARK-1284: Fix improper use of SimpleDateFormat `SimpleDateFormat` is not thread-safe, yet some places share a single SimpleDateFormat instance across multiple threads without any synchronization. This can cause the Web UI to display incorrect dates. This PR creates a new `SimpleDateFormat` each time one is needed. Another solution is to use a `ThreadLocal` to keep one `SimpleDateFormat` per thread; if this PR impacts performance, I can switch to that approach. (As merged, the patch uses both: `Master` and `Worker` create a fresh formatter on each call via `createDateFormat`, while `ui.WebUI` and `JobLogger` keep a per-thread formatter in a `ThreadLocal`.) Author: zsxwing <[email protected]> Closes #179 from zsxwing/SPARK-1278 and squashes the following commits: 21fabd3 [zsxwing] SPARK-1278: Fix improper use of SimpleDateFormat Conflicts: core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala core/src/main/scala/org/apache/spark/util/FileLogger.scala Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8856076b Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8856076b Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8856076b Branch: refs/heads/branch-0.9 Commit: 8856076b5870df64f21d400c67b17bda5a336627 Parents: d68549e Author: zsxwing <[email protected]> Authored: Fri Mar 21 16:07:22 2014 -0700 Committer: Patrick Wendell <[email protected]> Committed: Fri Mar 21 16:39:23 2014 -0700 ---------------------------------------------------------------------- .../scala/org/apache/spark/deploy/WebUI.scala | 47 ------------------ .../org/apache/spark/deploy/master/Master.scala | 6 +-- .../spark/deploy/master/ui/IndexPage.scala | 8 ++-- .../org/apache/spark/deploy/worker/Worker.scala | 4 +- .../org/apache/spark/scheduler/JobLogger.scala | 5 +- .../main/scala/org/apache/spark/ui/WebUI.scala | 50 ++++++++++++++++++++ .../apache/spark/ui/jobs/ExecutorTable.scala | 1 - 
.../apache/spark/ui/jobs/JobProgressUI.scala | 1 - .../org/apache/spark/ui/jobs/StagePage.scala | 4 +- .../org/apache/spark/ui/jobs/StageTable.scala | 5 +- 10 files changed, 67 insertions(+), 64 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/8856076b/core/src/main/scala/org/apache/spark/deploy/WebUI.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/deploy/WebUI.scala b/core/src/main/scala/org/apache/spark/deploy/WebUI.scala deleted file mode 100644 index ae258b5..0000000 --- a/core/src/main/scala/org/apache/spark/deploy/WebUI.scala +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.deploy - -import java.text.SimpleDateFormat -import java.util.Date - -/** - * Utilities used throughout the web UI. 
- */ -private[spark] object DeployWebUI { - val DATE_FORMAT = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss") - - def formatDate(date: Date): String = DATE_FORMAT.format(date) - - def formatDate(timestamp: Long): String = DATE_FORMAT.format(new Date(timestamp)) - - def formatDuration(milliseconds: Long): String = { - val seconds = milliseconds.toDouble / 1000 - if (seconds < 60) { - return "%.0f s".format(seconds) - } - val minutes = seconds / 60 - if (minutes < 10) { - return "%.1f min".format(minutes) - } else if (minutes < 60) { - return "%.0f min".format(minutes) - } - val hours = minutes / 60 - return "%.1f h".format(hours) - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/8856076b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 3897156..d72bb7a 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -45,7 +45,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act val conf = new SparkConf - val DATE_FORMAT = new SimpleDateFormat("yyyyMMddHHmmss") // For application IDs + def createDateFormat = new SimpleDateFormat("yyyyMMddHHmmss") // For application IDs val WORKER_TIMEOUT = conf.getLong("spark.worker.timeout", 60) * 1000 val RETAINED_APPLICATIONS = conf.getInt("spark.deploy.retainedApplications", 200) val REAPER_ITERATIONS = conf.getInt("spark.dead.worker.persistence", 15) @@ -621,7 +621,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act /** Generate a new app ID given a app's submission date */ def newApplicationId(submitDate: Date): String = { - val appId = "app-%s-%04d".format(DATE_FORMAT.format(submitDate), nextAppNumber) + val appId = 
"app-%s-%04d".format(createDateFormat.format(submitDate), nextAppNumber) nextAppNumber += 1 appId } @@ -644,7 +644,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act } def newDriverId(submitDate: Date): String = { - val appId = "driver-%s-%04d".format(DATE_FORMAT.format(submitDate), nextDriverNumber) + val appId = "driver-%s-%04d".format(createDateFormat.format(submitDate), nextDriverNumber) nextDriverNumber += 1 appId } http://git-wip-us.apache.org/repos/asf/spark/blob/8856076b/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala index a9af8df..b549825 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala @@ -25,10 +25,10 @@ import akka.pattern.ask import javax.servlet.http.HttpServletRequest import net.liftweb.json.JsonAST.JValue -import org.apache.spark.deploy.{DeployWebUI, JsonProtocol} +import org.apache.spark.deploy.JsonProtocol import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState} import org.apache.spark.deploy.master.{ApplicationInfo, DriverInfo, WorkerInfo} -import org.apache.spark.ui.UIUtils +import org.apache.spark.ui.{WebUI, UIUtils} import org.apache.spark.util.Utils private[spark] class IndexPage(parent: MasterWebUI) { @@ -164,10 +164,10 @@ private[spark] class IndexPage(parent: MasterWebUI) { <td sorttable_customkey={app.desc.memoryPerSlave.toString}> {Utils.megabytesToString(app.desc.memoryPerSlave)} </td> - <td>{DeployWebUI.formatDate(app.submitDate)}</td> + <td>{WebUI.formatDate(app.submitDate)}</td> <td>{app.desc.user}</td> <td>{app.state.toString}</td> - <td>{DeployWebUI.formatDuration(app.duration)}</td> + 
<td>{WebUI.formatDuration(app.duration)}</td> </tr> } http://git-wip-us.apache.org/repos/asf/spark/blob/8856076b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index 36bb289..cecb2c8 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -56,7 +56,7 @@ private[spark] class Worker( Utils.checkHost(host, "Expected hostname") assert (port > 0) - val DATE_FORMAT = new SimpleDateFormat("yyyyMMddHHmmss") // For worker and executor IDs + def createDateFormat = new SimpleDateFormat("yyyyMMddHHmmss") // For worker and executor IDs // Send a heartbeat every (heartbeat timeout) / 4 milliseconds val HEARTBEAT_MILLIS = conf.getLong("spark.worker.timeout", 60) * 1000 / 4 @@ -309,7 +309,7 @@ private[spark] class Worker( } def generateWorkerId(): String = { - "worker-%s-%s-%d".format(DATE_FORMAT.format(new Date), host, port) + "worker-%s-%s-%d".format(createDateFormat.format(new Date), host, port) } override def postStop() { http://git-wip-us.apache.org/repos/asf/spark/blob/8856076b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala index f8fa5a9..835e181 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala @@ -53,6 +53,9 @@ class JobLogger(val user: String, val logDirName: String) private val jobIDToPrintWriter = new HashMap[Int, PrintWriter] private val stageIDToJobID = new HashMap[Int, Int] private val jobIDToStages = new HashMap[Int, 
ListBuffer[Stage]] + private val dateFormat = new ThreadLocal[SimpleDateFormat]() { + override def initialValue() = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss") + } private val DATE_FORMAT = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss") private val eventQueue = new LinkedBlockingQueue[SparkListenerEvents] @@ -116,7 +119,7 @@ class JobLogger(val user: String, val logDirName: String) var writeInfo = info if (withTime) { val date = new Date(System.currentTimeMillis()) - writeInfo = DATE_FORMAT.format(date) + ": " +info + writeInfo = dateFormat.get.format(date) + ": " + info } jobIDToPrintWriter.get(jobID).foreach(_.println(writeInfo)) } http://git-wip-us.apache.org/repos/asf/spark/blob/8856076b/core/src/main/scala/org/apache/spark/ui/WebUI.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/ui/WebUI.scala b/core/src/main/scala/org/apache/spark/ui/WebUI.scala new file mode 100644 index 0000000..459d298 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/ui/WebUI.scala @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.ui + +import java.text.SimpleDateFormat +import java.util.Date + +/** + * Utilities used throughout the web UI. + */ +private[spark] object WebUI { + // SimpleDateFormat is not thread-safe. Don't expose it to avoid improper use. + private val dateFormat = new ThreadLocal[SimpleDateFormat]() { + override def initialValue() = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss") + } + + def formatDate(date: Date): String = dateFormat.get.format(date) + + def formatDate(timestamp: Long): String = dateFormat.get.format(new Date(timestamp)) + + def formatDuration(milliseconds: Long): String = { + val seconds = milliseconds.toDouble / 1000 + if (seconds < 60) { + return "%.0f s".format(seconds) + } + val minutes = seconds / 60 + if (minutes < 10) { + return "%.1f min".format(minutes) + } else if (minutes < 60) { + return "%.0f min".format(minutes) + } + val hours = minutes / 60 + return "%.1f h".format(hours) + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/8856076b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala index ab03eb5..4d30760 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala @@ -27,7 +27,6 @@ import scala.collection.mutable private[spark] class ExecutorTable(val parent: JobProgressUI, val stageId: Int) { val listener = parent.listener - val dateFmt = parent.dateFmt val isFairScheduler = listener.sc.getSchedulingMode == SchedulingMode.FAIR def toNodeSeq(): Seq[Node] = { http://git-wip-us.apache.org/repos/asf/spark/blob/8856076b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala ---------------------------------------------------------------------- diff --git 
a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala index c1ee2f3..1a2eb2a 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala @@ -40,7 +40,6 @@ import org.apache.spark.util.Utils private[spark] class JobProgressUI(val sc: SparkContext) { private var _listener: Option[JobProgressListener] = None def listener = _listener.get - val dateFmt = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss") private val indexPage = new IndexPage(this) private val stagePage = new StagePage(this) http://git-wip-us.apache.org/repos/asf/spark/blob/8856076b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala index cfaf121..6207ef1 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala @@ -26,6 +26,7 @@ import scala.xml.Node import org.apache.spark.{ExceptionFailure} import org.apache.spark.executor.TaskMetrics import org.apache.spark.ui.UIUtils._ +import org.apache.spark.ui.WebUI import org.apache.spark.ui.Page._ import org.apache.spark.util.{Utils, Distribution} import org.apache.spark.scheduler.TaskInfo @@ -33,7 +34,6 @@ import org.apache.spark.scheduler.TaskInfo /** Page showing statistics and task list for a given stage */ private[spark] class StagePage(parent: JobProgressUI) { def listener = parent.listener - val dateFmt = parent.dateFmt def render(request: HttpServletRequest): Seq[Node] = { listener.synchronized { @@ -248,7 +248,7 @@ private[spark] class StagePage(parent: JobProgressUI) { <td>{info.status}</td> <td>{info.taskLocality}</td> <td>{info.host}</td> - <td>{dateFmt.format(new Date(info.launchTime))}</td> + 
<td>{WebUI.formatDate(new Date(info.launchTime))}</td> <td sorttable_customkey={duration.toString}> {formatDuration} </td> http://git-wip-us.apache.org/repos/asf/spark/blob/8856076b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala index 8ea32db..45a0783 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala @@ -23,7 +23,7 @@ import scala.xml.Node import scala.collection.mutable.HashSet import org.apache.spark.scheduler.{SchedulingMode, StageInfo, TaskInfo} -import org.apache.spark.ui.UIUtils +import org.apache.spark.ui.{WebUI, UIUtils} import org.apache.spark.util.Utils @@ -31,7 +31,6 @@ import org.apache.spark.util.Utils private[spark] class StageTable(val stages: Seq[StageInfo], val parent: JobProgressUI) { val listener = parent.listener - val dateFmt = parent.dateFmt val isFairScheduler = listener.sc.getSchedulingMode == SchedulingMode.FAIR def toNodeSeq(): Seq[Node] = { @@ -75,7 +74,7 @@ private[spark] class StageTable(val stages: Seq[StageInfo], val parent: JobProgr private def stageRow(s: StageInfo): Seq[Node] = { val submissionTime = s.submissionTime match { - case Some(t) => dateFmt.format(new Date(t)) + case Some(t) => WebUI.formatDate(new Date(t)) case None => "Unknown" }
