This is an automated email from the ASF dual-hosted git repository.
ulyssesyou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new f65db03 [KYUUBI #2119] Support output progress bar in Spark engine
f65db03 is described below
commit f65db034013078ed0bc501b87470f06be5eb0f8e
Author: sychen <[email protected]>
AuthorDate: Fri Mar 18 13:40:17 2022 +0800
[KYUUBI #2119] Support output progress bar in Spark engine
### _Why are the changes needed?_
In the `ExecuteStatement#waitStatementComplete` method, only "Query is Running"
is written to the log, so the user cannot see the progress of the query.
close https://github.com/apache/incubator-kyuubi/issues/2119
### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including
negative and positive cases if possible
- [x] Add screenshots for manual tests if appropriate
- [x] [Run
test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests)
locally before making a pull request
Closes #2162 from cxzl25/KYUUBI-2119.
Closes #2119
692b3d10 [sychen] add progress bar
Authored-by: sychen <[email protected]>
Signed-off-by: ulysses-you <[email protected]>
---
.../apache/spark/kyuubi/SQLOperationListener.scala | 26 ++++-
.../spark/kyuubi/SparkConsoleProgressBar.scala | 129 +++++++++++++++++++++
.../org/apache/spark/kyuubi/StageStatus.scala | 27 +++++
3 files changed, 181 insertions(+), 1 deletion(-)
diff --git
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SQLOperationListener.scala
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SQLOperationListener.scala
index 7950db9..d59d8f5 100644
---
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SQLOperationListener.scala
+++
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SQLOperationListener.scala
@@ -18,6 +18,7 @@
package org.apache.spark.kyuubi
import java.util.Properties
+import java.util.concurrent.ConcurrentHashMap
import org.apache.spark.scheduler._
import org.apache.spark.sql.SparkSession
@@ -43,6 +44,11 @@ class SQLOperationListener(
private val activeJobs = new java.util.HashSet[Int]()
private val activeStages = new java.util.HashSet[Int]()
private var executionId: Option[Long] = None
+ // Task counters per stage attempt: an entry is added when a stage is submitted and removed
+ // when that stage completes; the task callbacks update the counters in place.
+ private val liveStages = new ConcurrentHashMap[StageAttempt, StageInfo]()
+
+ // Periodically renders liveStages into the operation log; finish() is called from
+ // onOtherEvent when the matching SparkListenerSQLExecutionEnd arrives.
+ // NOTE(review): constructing it starts a refresh timer thread for every listener — confirm
+ // the bar is also finished if the operation ends without that SQLExecutionEnd event.
+ private val consoleProgressBar = new SparkConsoleProgressBar(
+ operation,
+ liveStages)
def getExecutionId: Option[Long] = executionId
@@ -98,6 +104,9 @@ class SQLOperationListener(
val stageInfo = stageSubmitted.stageInfo
val stageId = stageInfo.stageId
activeStages.add(stageId)
+ liveStages.put(
+ StageAttempt(stageId, stageInfo.attemptNumber()),
+ new StageInfo(stageId, stageInfo.numTasks))
withOperationLog {
info(s"Query [$operationId]: Stage $stageId started with
${stageInfo.numTasks} tasks," +
s" ${activeStages.size()} active stages running")
@@ -111,13 +120,27 @@ class SQLOperationListener(
val stageId = stageInfo.stageId
activeStages.synchronized {
if (activeStages.remove(stageId)) {
+ liveStages.remove(StageAttempt(stageId, stageInfo.attemptNumber()))
withOperationLog(super.onStageCompleted(stageCompleted))
}
}
}
+  /**
+   * Counts a newly started task against its stage attempt, for tasks of stages this
+   * operation is tracking, then delegates to the parent listener.
+   */
+  override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = activeStages.synchronized {
+    if (activeStages.contains(taskStart.stageId)) {
+      // activeStages is keyed by stage id only while liveStages is keyed by (stage, attempt),
+      // so the lookup can miss (null) for an attempt that is not tracked; skip the counter
+      // update in that case instead of throwing a NullPointerException on the listener bus.
+      Option(liveStages.get(StageAttempt(taskStart.stageId, taskStart.stageAttemptId)))
+        .foreach(stage => stage.numActiveTasks += 1)
+      super.onTaskStart(taskStart)
+    }
+  }
+
override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit =
activeStages.synchronized {
- if (activeStages.contains(taskEnd.stageId)) super.onTaskEnd(taskEnd)
+    if (activeStages.contains(taskEnd.stageId)) {
+      // Look the attempt up once. The lookup can miss (null) for an attempt that is not
+      // tracked, because activeStages is keyed by stage id only; such events are skipped
+      // rather than dereferenced, which would throw a NullPointerException.
+      Option(liveStages.get(StageAttempt(taskEnd.stageId, taskEnd.stageAttemptId))).foreach {
+        stage =>
+          stage.numActiveTasks -= 1
+          // Only successful task ends advance the completed count shown in the progress bar.
+          if (taskEnd.reason == org.apache.spark.Success) {
+            stage.numCompleteTasks += 1
+          }
+      }
+      super.onTaskEnd(taskEnd)
+    }
}
override def onOtherEvent(event: SparkListenerEvent): Unit = {
@@ -125,6 +148,7 @@ class SQLOperationListener(
case sqlExecutionEnd: SparkListenerSQLExecutionEnd
if executionId.contains(sqlExecutionEnd.executionId) =>
spark.sparkContext.removeSparkListener(this)
+ consoleProgressBar.finish()
case _ =>
}
}
diff --git
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkConsoleProgressBar.scala
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkConsoleProgressBar.scala
new file mode 100644
index 0000000..eefdc3b
--- /dev/null
+++
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkConsoleProgressBar.scala
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.kyuubi
+
+import java.time.{Instant, ZoneId}
+import java.time.format.DateTimeFormatter
+import java.util.{Locale, Timer, TimerTask}
+import java.util.concurrent.ConcurrentHashMap
+
+import scala.collection.JavaConverters._
+
+import org.apache.kyuubi.Logging
+import org.apache.kyuubi.operation.Operation
+
+/**
+ * Writes a console-style progress bar for the live stages of one operation into that
+ * operation's log, refreshed by a daemon timer until [[finish]] is called.
+ *
+ * @param operation  the operation whose log receives the progress bar
+ * @param liveStages per-attempt task counters, updated concurrently by a listener
+ */
+class SparkConsoleProgressBar(
+    operation: Operation,
+    liveStages: ConcurrentHashMap[StageAttempt, StageInfo])
+  extends Logging {
+  // Carriage return
+  private val CR = '\r'
+  // Update period of progress bar, in milliseconds
+  private val updatePeriodMSec = 200L
+  // Delay to show up a progress bar, in milliseconds
+  private val firstDelayMSec = 500L
+
+  // The width of the terminal. Falls back to 80 columns when COLUMNS is unset, non-numeric or
+  // non-positive, instead of failing construction with a NumberFormatException.
+  private val TerminalWidth = sys.env.get("COLUMNS")
+    .flatMap(c => scala.util.Try(c.trim.toInt).toOption)
+    .filter(_ > 0)
+    .getOrElse(80)
+
+  // Rendering state; only read or written while holding this object's monitor.
+  private var lastUpdateTime = 0L
+  private var lastProgressBar = ""
+  // Set by finish(); stops a refresh that raced with finish() from redrawing the bar.
+  // Declared before the timer is scheduled so it is initialized before the first refresh.
+  private var finished = false
+
+  val dtFormatter: DateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")
+    .withLocale(Locale.getDefault).withZone(ZoneId.systemDefault)
+
+  // Schedule a daemon refresh thread to run periodically
+  private val timer = new Timer(
+    "refresh progress for " + operation.getHandle.identifier.toString,
+    true)
+  timer.schedule(
+    new TimerTask {
+      override def run(): Unit = {
+        refresh()
+      }
+    },
+    firstDelayMSec,
+    updatePeriodMSec)
+
+  /**
+   * Try to refresh the progress bar in every cycle. Runs on the timer thread and is
+   * synchronized with [[finish]], so the two never interleave writes to the operation log,
+   * and nothing is drawn once the bar has been finished.
+   */
+  private def refresh(): Unit = synchronized {
+    if (!finished) {
+      val now = System.currentTimeMillis()
+      val stages = liveStages.values.asScala.toList.sortBy(_.stageId)
+      if (stages.nonEmpty) {
+        show(now, stages.take(3)) // display at most 3 stages in same time
+      }
+    }
+  }
+
+  /**
+   * Show the progress bar in the operation log. The bar ends with a carriage return so that it
+   * keeps overwriting itself on one console line; output logged afterwards follows the bar, and
+   * the next bar is shown on a new line without overwriting those logs.
+   *
+   * @param now    current time in epoch milliseconds
+   * @param stages non-empty list of stages to render side by side
+   */
+  private def show(now: Long, stages: Seq[StageInfo]): Unit = {
+    val width = TerminalWidth / stages.size
+    val bar = stages.map { s =>
+      // Snapshot the counters once: they are updated concurrently by the listener thread.
+      val total = s.numTasks
+      val completed = s.numCompleteTasks
+      val active = s.numActiveTasks
+      val header = s"[Stage ${s.stageId}:"
+      val tailer = s"($completed + $active) / $total]"
+      val w = width - header.length - tailer.length
+      val filled =
+        if (w > 0) {
+          // A stage reporting zero tasks would otherwise cause a division by zero.
+          val percent = if (total > 0) w * completed / total else 0
+          (0 until w).map { i =>
+            if (i < percent) "=" else if (i == percent) ">" else " "
+          }.mkString("")
+        } else {
+          ""
+        }
+      header + filled + tailer
+    }.mkString("")
+
+    // only refresh if it's changed OR after 1 minute (or the ssh connection will be closed
+    // after idle some time)
+    if (bar != lastProgressBar || now - lastUpdateTime > 60 * 1000L) {
+      operation.getOperationLog.foreach(log => {
+        log.write(dtFormatter.format(Instant.ofEpochMilli(now)) + ' ' + bar + CR)
+      })
+      lastUpdateTime = now
+    }
+    lastProgressBar = bar
+  }
+
+  /**
+   * Clear the progress bar if shown, by overwriting it with a line of spaces.
+   */
+  private def clear(): Unit = {
+    if (lastProgressBar.nonEmpty) {
+      operation.getOperationLog.foreach(log => {
+        log.write(" " * TerminalWidth + CR)
+      })
+      lastProgressBar = ""
+    }
+  }
+
+  /**
+   * Mark all the stages as finished: stop further refreshes, clear the progress bar if shown,
+   * and cancel the refresh timer. Calling it more than once is harmless.
+   */
+  def finish(): Unit = synchronized {
+    finished = true
+    clear()
+    timer.cancel()
+  }
+}
diff --git
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/StageStatus.scala
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/StageStatus.scala
new file mode 100644
index 0000000..1445708
--- /dev/null
+++
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/StageStatus.scala
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.kyuubi
+
+/** Identifies a single attempt of a Spark stage by its stage id and attempt number. */
+case class StageAttempt(stageId: Int, stageAttemptId: Int) {
+  override def toString: String = "Stage " + stageId + " (Attempt " + stageAttemptId + ")"
+}
+
+/**
+ * Mutable task counters for one live stage attempt, rendered by the console progress bar.
+ *
+ * The counters are written by listener callbacks and read from the progress bar's refresh
+ * timer thread, so they are `@volatile` to make each update visible to that reader.
+ * NOTE(review): this shares its simple name with org.apache.spark.scheduler.StageInfo; code
+ * that wildcard-imports org.apache.spark.scheduler._ should confirm which one it resolves to.
+ *
+ * @param stageId  the Spark stage id
+ * @param numTasks total number of tasks in the stage attempt
+ */
+class StageInfo(val stageId: Int, val numTasks: Int) {
+  @volatile var numActiveTasks = 0
+  @volatile var numCompleteTasks = 0
+}