This is an automated email from the ASF dual-hosted git repository.

ulyssesyou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new f65db03  [KYUUBI #2119] Support output progress bar in Spark engine
f65db03 is described below

commit f65db034013078ed0bc501b87470f06be5eb0f8e
Author: sychen <[email protected]>
AuthorDate: Fri Mar 18 13:40:17 2022 +0800

    [KYUUBI #2119] Support output progress bar in Spark engine
    
    ### _Why are the changes needed?_
    The `ExecuteStatement#waitStatementComplete` method only writes "Query is Running" to the
    log, so the user cannot see the progress of the query.
    
    close https://github.com/apache/incubator-kyuubi/issues/2119
    
    ### _How was this patch tested?_
    - [ ] Add some test cases that check the changes thoroughly, including negative and
      positive cases if possible
    
    - [x] Add screenshots for manual tests if appropriate
    
    - [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests)
      locally before making a pull request
    
    Closes #2162 from cxzl25/KYUUBI-2119.
    
    Closes #2119
    
    692b3d10 [sychen] add progress bar
    
    Authored-by: sychen <[email protected]>
    Signed-off-by: ulysses-you <[email protected]>
---
 .../apache/spark/kyuubi/SQLOperationListener.scala |  26 ++++-
 .../spark/kyuubi/SparkConsoleProgressBar.scala     | 129 +++++++++++++++++++++
 .../org/apache/spark/kyuubi/StageStatus.scala      |  27 +++++
 3 files changed, 181 insertions(+), 1 deletion(-)

diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SQLOperationListener.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SQLOperationListener.scala
index 7950db9..d59d8f5 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SQLOperationListener.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SQLOperationListener.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.kyuubi
 
 import java.util.Properties
+import java.util.concurrent.ConcurrentHashMap
 
 import org.apache.spark.scheduler._
 import org.apache.spark.sql.SparkSession
@@ -43,6 +44,11 @@ class SQLOperationListener(
   private val activeJobs = new java.util.HashSet[Int]()
   private val activeStages = new java.util.HashSet[Int]()
   private var executionId: Option[Long] = None
+  // Task counters of this operation's live stage attempts, keyed by (stage id, attempt).
+  // Entries are added/removed by the stage callbacks and updated by the task callbacks of this
+  // listener, and read concurrently by consoleProgressBar's refresh timer thread.
+  private val liveStages = new ConcurrentHashMap[StageAttempt, StageInfo]()
+
+  // Renders liveStages into the operation log. Constructing it immediately starts its daemon
+  // refresh timer; the timer is stopped by finish() when this operation's
+  // SparkListenerSQLExecutionEnd is observed in onOtherEvent.
+  // NOTE(review): if that event never reaches this listener (e.g. the statement fails or is
+  // cancelled first), finish() is not called from here and the timer keeps running — confirm
+  // that cleanup happens elsewhere.
+  private val consoleProgressBar = new SparkConsoleProgressBar(
+    operation,
+    liveStages)
 
   def getExecutionId: Option[Long] = executionId
 
@@ -98,6 +104,9 @@ class SQLOperationListener(
         val stageInfo = stageSubmitted.stageInfo
         val stageId = stageInfo.stageId
         activeStages.add(stageId)
+        liveStages.put(
+          StageAttempt(stageId, stageInfo.attemptNumber()),
+          new StageInfo(stageId, stageInfo.numTasks))
         withOperationLog {
           info(s"Query [$operationId]: Stage $stageId started with 
${stageInfo.numTasks} tasks," +
             s" ${activeStages.size()} active stages running")
@@ -111,13 +120,27 @@ class SQLOperationListener(
     val stageId = stageInfo.stageId
     activeStages.synchronized {
       if (activeStages.remove(stageId)) {
+        liveStages.remove(StageAttempt(stageId, stageInfo.attemptNumber()))
         withOperationLog(super.onStageCompleted(stageCompleted))
       }
     }
   }
 
+  /**
+   * Counts a newly started task as active for its stage attempt, for stages of this operation
+   * only (those currently in `activeStages`), then delegates to the parent listener.
+   *
+   * `activeStages` is keyed by stage id only while `liveStages` is keyed by (stage id, attempt),
+   * so the attempt of this task may have no entry; in that case the counter update is skipped
+   * instead of throwing a NullPointerException from the listener callback.
+   */
+  override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = activeStages.synchronized {
+    if (activeStages.contains(taskStart.stageId)) {
+      val stageAttempt = StageAttempt(taskStart.stageId, taskStart.stageAttemptId)
+      Option(liveStages.get(stageAttempt)).foreach(stage => stage.numActiveTasks += 1)
+      super.onTaskStart(taskStart)
+    }
+  }
+
   override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = 
activeStages.synchronized {
-    if (activeStages.contains(taskEnd.stageId)) super.onTaskEnd(taskEnd)
+    if (activeStages.contains(taskEnd.stageId)) {
+      liveStages.get(StageAttempt(taskEnd.stageId, 
taskEnd.stageAttemptId)).numActiveTasks -= 1
+      if (taskEnd.reason == org.apache.spark.Success) {
+        liveStages.get(StageAttempt(taskEnd.stageId, 
taskEnd.stageAttemptId)).numCompleteTasks += 1
+      }
+      super.onTaskEnd(taskEnd)
+    }
   }
 
   override def onOtherEvent(event: SparkListenerEvent): Unit = {
@@ -125,6 +148,7 @@ class SQLOperationListener(
       case sqlExecutionEnd: SparkListenerSQLExecutionEnd
           if executionId.contains(sqlExecutionEnd.executionId) =>
         spark.sparkContext.removeSparkListener(this)
+        consoleProgressBar.finish()
       case _ =>
     }
   }
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkConsoleProgressBar.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkConsoleProgressBar.scala
new file mode 100644
index 0000000..eefdc3b
--- /dev/null
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkConsoleProgressBar.scala
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.kyuubi
+
+import java.time.{Instant, ZoneId}
+import java.time.format.DateTimeFormatter
+import java.util.{Locale, Timer, TimerTask}
+import java.util.concurrent.ConcurrentHashMap
+
+import scala.collection.JavaConverters._
+
+import org.apache.kyuubi.Logging
+import org.apache.kyuubi.operation.Operation
+
+/**
+ * Renders a text progress bar for the live stages of one operation and appends it to that
+ * operation's log.
+ *
+ * Construction starts a daemon `java.util.Timer` that refreshes the bar every
+ * `updatePeriodMSec` ms after an initial delay of `firstDelayMSec` ms. Call [[finish]] when the
+ * operation's SQL execution has ended to clear the bar and cancel the timer.
+ *
+ * @param operation  operation whose operation log (if present) receives the progress bar
+ * @param liveStages live stage attempts and their task counters; only read here (from the timer
+ *                   thread) while the caller adds, updates and removes entries
+ */
+class SparkConsoleProgressBar(
+    operation: Operation,
+    liveStages: ConcurrentHashMap[StageAttempt, StageInfo])
+  extends Logging {
+  // Carriage return
+  private val CR = '\r'
+  // Update period of progress bar, in milliseconds
+  private val updatePeriodMSec = 200L
+  // Delay to show up a progress bar, in milliseconds
+  private val firstDelayMSec = 500L
+
+  // The width of terminal. Falls back to 80 when COLUMNS is unset, not an integer or not
+  // positive, instead of failing construction with a NumberFormatException.
+  private val TerminalWidth =
+    scala.util.Try(sys.env.getOrElse("COLUMNS", "80").trim.toInt).filter(_ > 0).getOrElse(80)
+
+  // The three fields below are only accessed while holding this object's monitor
+  // (refresh/show on the timer thread, finish/clear on the caller's thread).
+  private var lastUpdateTime = 0L
+  private var lastProgressBar = ""
+  // Set by finish(); stops a refresh that was already waiting for the monitor from redrawing.
+  private var finished = false
+
+  val dtFormatter: DateTimeFormatter =
+    DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")
+      .withLocale(Locale.getDefault).withZone(ZoneId.systemDefault)
+
+  // Schedule a daemon refresh thread to run periodically
+  private val timer = new Timer(
+    "refresh progress for " + operation.getHandle.identifier.toString,
+    true)
+  timer.schedule(
+    new TimerTask {
+      override def run(): Unit = refresh()
+    },
+    firstDelayMSec,
+    updatePeriodMSec)
+
+  /**
+   * Try to refresh the progress bar in every cycle.
+   *
+   * Runs on the timer thread, so it holds this object's monitor to avoid racing with
+   * [[finish]], and does nothing once [[finish]] has been called.
+   */
+  private def refresh(): Unit = synchronized {
+    if (!finished) {
+      val now = System.currentTimeMillis()
+      val stages = liveStages.values.asScala.toList.sortBy(_.stageId)
+      if (stages.nonEmpty) {
+        show(now, stages.take(3)) // display at most 3 stages in same time
+      }
+    }
+  }
+
+  /**
+   * Write the progress bar of `stages` to the operation log.
+   *
+   * Each stage gets an equal share of `TerminalWidth`, rendered as
+   * `[Stage <id>:<bar>(<completed> + <active>) / <total>]`. The line is prefixed with a
+   * timestamp and terminated by a carriage return rather than a newline, so a terminal that
+   * renders the log can draw the next update over it. The line is only written when it changed
+   * or when the last write is more than one minute old.
+   *
+   * @param now    current time in epoch milliseconds
+   * @param stages non-empty list of stages to display
+   */
+  private def show(now: Long, stages: Seq[StageInfo]): Unit = {
+    val width = TerminalWidth / stages.size
+    val bar = stages.map { s =>
+      val total = s.numTasks
+      // Read each counter once so the bar and the tailer agree even if they change meanwhile
+      val completed = s.numCompleteTasks
+      val active = s.numActiveTasks
+      val header = s"[Stage ${s.stageId}:"
+      val tailer = s"($completed + $active) / $total]"
+      val w = width - header.length - tailer.length
+      // Skip the bar when there is no room for it or the stage has no tasks (avoids / by zero)
+      val bar =
+        if (w > 0 && total > 0) {
+          val percent = w * completed / total
+          (0 until w).map { i =>
+            if (i < percent) "=" else if (i == percent) ">" else " "
+          }.mkString("")
+        } else {
+          ""
+        }
+      header + bar + tailer
+    }.mkString("")
+
+    // only refresh if it's changed OR after 1 minute (or the ssh connection will be closed
+    // after idle some time)
+    if (bar != lastProgressBar || now - lastUpdateTime > 60 * 1000L) {
+      operation.getOperationLog.foreach { log =>
+        log.write(dtFormatter.format(Instant.ofEpochMilli(now)) + ' ' + bar + CR)
+      }
+      lastUpdateTime = now
+    }
+    lastProgressBar = bar
+  }
+
+  /**
+   * Clear the progress bar if showed, by overwriting it with `TerminalWidth` spaces.
+   */
+  private def clear(): Unit = {
+    if (lastProgressBar.nonEmpty) {
+      operation.getOperationLog.foreach { log =>
+        log.write(" " * TerminalWidth + CR)
+      }
+      lastProgressBar = ""
+    }
+  }
+
+  /**
+   * Mark all the stages as finished, clear the progress bar if showed and cancel the refresh
+   * timer. Any refresh that runs afterwards (including one already waiting for the monitor)
+   * writes nothing.
+   */
+  def finish(): Unit = synchronized {
+    finished = true
+    clear()
+    timer.cancel()
+  }
+}
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/StageStatus.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/StageStatus.scala
new file mode 100644
index 0000000..1445708
--- /dev/null
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/StageStatus.scala
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.kyuubi
+
+/**
+ * Identifies one attempt of a Spark stage; used as the key of the live-stage map.
+ *
+ * @param stageId        Spark stage id
+ * @param stageAttemptId attempt number of that stage
+ */
+case class StageAttempt(stageId: Int, stageAttemptId: Int) {
+  override def toString: String =
+    "Stage " + stageId + " (Attempt " + stageAttemptId + ")"
+}
+
+/**
+ * Mutable task counters of one stage attempt, used to render the progress bar.
+ *
+ * The counters are written from the Spark listener callbacks and read from the progress-bar
+ * timer thread, so they are `@volatile` to make each update visible to the reader. Increments
+ * (`+=`) are still not atomic.
+ * NOTE(review): assumes a single writer at a time (the listener updates them while holding its
+ * `activeStages` lock) — confirm before adding other writers.
+ *
+ * @param stageId  Spark stage id
+ * @param numTasks total number of tasks of the stage attempt
+ */
+class StageInfo(val stageId: Int, val numTasks: Int) {
+  @volatile var numActiveTasks = 0
+  @volatile var numCompleteTasks = 0
+}

Reply via email to