This is an automated email from the ASF dual-hosted git repository.

bowenliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 00ddcd39b [KYUUBI #5184] [Improvement] Rename Kyuubi's StageInfo to SparkStageInfo to fix class mismatch
00ddcd39b is described below

commit 00ddcd39bb9062f79c26bb14dcb719fe31513a10
Author: liangbowen <[email protected]>
AuthorDate: Mon Aug 21 19:59:43 2023 +0800

    [KYUUBI #5184] [Improvement] Rename Kyuubi's StageInfo to SparkStageInfo to fix class mismatch
    
    ### _Why are the changes needed?_
    
    - Fix a class mismatch when compiling on Scala 2.13, caused by an unqualified class reference to `StageInfo`. The compilation fails with a type mismatch if the compiler loads Spark's `org.apache.spark.scheduler.StageInfo` ahead of Kyuubi's `org.apache.spark.kyuubi.StageInfo`.
    - Change the plain `var` integers `numActiveTasks` and `numCompleteTasks` to `AtomicInteger`, preventing possible concurrent inconsistency (see the sketch after the StageStatus.scala diff below).
    
    ```
    [ERROR] [Error] /Users/bw/dev/kyuubi/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SQLOperationListener.scala:56: type mismatch;
     found   : java.util.concurrent.ConcurrentHashMap[org.apache.spark.kyuubi.StageAttempt,org.apache.spark.scheduler.StageInfo]
     required: java.util.concurrent.ConcurrentHashMap[org.apache.spark.kyuubi.StageAttempt,org.apache.spark.kyuubi.StageInfo]
    [INFO] [Info] : java.util.concurrent.ConcurrentHashMap[org.apache.spark.kyuubi.StageAttempt,org.apache.spark.scheduler.StageInfo] <: java.util.concurrent.ConcurrentHashMap[org.apache.spark.kyuubi.StageAttempt,org.apache.spark.kyuubi.StageInfo]?
    [INFO] [Info] : false
    [ERROR] [Error] /Users/bw/dev/kyuubi/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SQLOperationListener.scala:126: not enough arguments for constructor StageInfo: (stageId: Int, attemptId: Int, name: String, numTasks: Int, rddInfos: Seq[org.apache.spark.storage.RDDInfo], parentIds: Seq[Int], details: String, taskMetrics: org.apache.spark.executor.TaskMetrics, taskLocalityPreferences: Seq[Seq[org.apache.spark.scheduler.TaskLocation]], shuffleDepId: Option[Int [...]
    Unspecified value parameters name, numTasks, rddInfos...
    [ERROR] [Error] /Users/bw/dev/kyuubi/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SQLOperationListener.scala:148: value numActiveTasks is not a member of org.apache.spark.scheduler.StageInfo
    [ERROR] [Error] /Users/bw/dev/kyuubi/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SQLOperationListener.scala:156: value numActiveTasks is not a member of org.apache.spark.scheduler.StageInfo
    [ERROR] [Error] /Users/bw/dev/kyuubi/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SQLOperationListener.scala:158: value numCompleteTasks is not a member of org.apache.spark.scheduler.StageInfo
    ```
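
    One way such an unqualified reference can flip to the wrong class (a hedged sketch, not necessarily the exact mechanism in this build): in Scala 2, a wildcard import takes precedence over a package member defined in a *different* source file. The two hypothetical files below (`MyListener` is an illustrative name, not part of the patch) reproduce the ambiguity:

    ```scala
    // File 1 (e.g. StageStatus.scala): Kyuubi-side helper in a Spark-owned package
    package org.apache.spark.kyuubi

    class StageInfo(val stageId: Int, val numTasks: Int)
    ```

    ```scala
    // File 2 (hypothetical MyListener.scala), compiled as a separate source file
    package org.apache.spark.kyuubi

    import java.util.concurrent.ConcurrentHashMap
    // The wildcard also brings org.apache.spark.scheduler.StageInfo into scope,
    // and wildcard imports outrank package members from other source files.
    import org.apache.spark.scheduler._

    class MyListener {
      // `StageInfo` binds to scheduler.StageInfo here, not the local helper, so
      // any code elsewhere that expects the Kyuubi helper sees a ConcurrentHashMap
      // type mismatch like the one logged above. Renaming the helper to
      // SparkStageInfo removes the collision entirely.
      val activeStages = new ConcurrentHashMap[(Int, Int), StageInfo]()
    }
    ```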
    
    ### _How was this patch tested?_
    - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [ ] [Run test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests) locally before making a pull request
    
    ### _Was this patch authored or co-authored using generative AI tooling?_
    
    No.
    
    Closes #5184 from bowenliang123/spark-stage-info.
    
    Closes #5184
    
    fd0b9b564 [liangbowen] update
    d410491f3 [liangbowen] rename Kyuubi's StageInfo to SparkStageInfo preventing class mismatch
    
    Authored-by: liangbowen <[email protected]>
    Signed-off-by: liangbowen <[email protected]>
---
 .../org/apache/spark/kyuubi/SQLOperationListener.scala | 18 +++++++++---------
 .../apache/spark/kyuubi/SparkConsoleProgressBar.scala  |  6 +++---
 .../scala/org/apache/spark/kyuubi/StageStatus.scala    | 10 ++++++----
 3 files changed, 18 insertions(+), 16 deletions(-)

diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SQLOperationListener.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SQLOperationListener.scala
index 1a57fcf29..4e4a940d2 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SQLOperationListener.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SQLOperationListener.scala
@@ -45,7 +45,7 @@ class SQLOperationListener(
 
   private val operationId: String = operation.getHandle.identifier.toString
   private lazy val activeJobs = new java.util.HashSet[Int]()
-  private lazy val activeStages = new ConcurrentHashMap[StageAttempt, StageInfo]()
+  private lazy val activeStages = new ConcurrentHashMap[SparkStageAttempt, SparkStageInfo]()
   private var executionId: Option[Long] = None
 
   private val conf: KyuubiConf = operation.getSession.sessionManager.getConf
@@ -120,10 +120,10 @@ class SQLOperationListener(
         val stageInfo = stageSubmitted.stageInfo
         val stageId = stageInfo.stageId
         val attemptNumber = stageInfo.attemptNumber()
-        val stageAttempt = StageAttempt(stageId, attemptNumber)
+        val stageAttempt = SparkStageAttempt(stageId, attemptNumber)
         activeStages.put(
           stageAttempt,
-          new StageInfo(stageId, stageInfo.numTasks))
+          new SparkStageInfo(stageId, stageInfo.numTasks))
         withOperationLog {
           info(s"Query [$operationId]: Stage $stageId.$attemptNumber started " 
+
             s"with ${stageInfo.numTasks} tasks, ${activeStages.size()} active 
stages running")
@@ -134,7 +134,7 @@ class SQLOperationListener(
 
  override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
     val stageInfo = stageCompleted.stageInfo
-    val stageAttempt = StageAttempt(stageInfo.stageId, stageInfo.attemptNumber())
+    val stageAttempt = SparkStageAttempt(stageInfo.stageId, stageInfo.attemptNumber())
     activeStages.synchronized {
       if (activeStages.remove(stageAttempt) != null) {
         withOperationLog(super.onStageCompleted(stageCompleted))
@@ -143,19 +143,19 @@ class SQLOperationListener(
   }
 
  override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = activeStages.synchronized {
-    val stageAttempt = StageAttempt(taskStart.stageId, taskStart.stageAttemptId)
+    val stageAttempt = SparkStageAttempt(taskStart.stageId, taskStart.stageAttemptId)
     if (activeStages.containsKey(stageAttempt)) {
-      activeStages.get(stageAttempt).numActiveTasks += 1
+      activeStages.get(stageAttempt).numActiveTasks.getAndIncrement()
       super.onTaskStart(taskStart)
     }
   }
 
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = activeStages.synchronized {
-    val stageAttempt = StageAttempt(taskEnd.stageId, taskEnd.stageAttemptId)
+    val stageAttempt = SparkStageAttempt(taskEnd.stageId, taskEnd.stageAttemptId)
     if (activeStages.containsKey(stageAttempt)) {
-      activeStages.get(stageAttempt).numActiveTasks -= 1
+      activeStages.get(stageAttempt).numActiveTasks.getAndDecrement()
       if (taskEnd.reason == org.apache.spark.Success) {
-        activeStages.get(stageAttempt).numCompleteTasks += 1
+        activeStages.get(stageAttempt).numCompleteTasks.getAndIncrement()
       }
       super.onTaskEnd(taskEnd)
     }
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkConsoleProgressBar.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkConsoleProgressBar.scala
index fc2ebd5f8..dc8b493cc 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkConsoleProgressBar.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkConsoleProgressBar.scala
@@ -29,7 +29,7 @@ import org.apache.kyuubi.operation.Operation
 
 class SparkConsoleProgressBar(
     operation: Operation,
-    liveStages: ConcurrentHashMap[StageAttempt, StageInfo],
+    liveStages: ConcurrentHashMap[SparkStageAttempt, SparkStageInfo],
     updatePeriodMSec: Long,
     timeFormat: String)
   extends Logging {
@@ -77,7 +77,7 @@ class SparkConsoleProgressBar(
   * after your last output, keeps overwriting itself to hold in one line. The logging will follow
   * the progress bar, then progress bar will be showed in next line without overwrite logs.
    */
-  private def show(now: Long, stages: Seq[StageInfo]): Unit = {
+  private def show(now: Long, stages: Seq[SparkStageInfo]): Unit = {
     val width = TerminalWidth / stages.size
     val bar = stages.map { s =>
       val total = s.numTasks
@@ -86,7 +86,7 @@ class SparkConsoleProgressBar(
       val w = width - header.length - tailer.length
       val bar =
         if (w > 0) {
-          val percent = w * s.numCompleteTasks / total
+          val percent = w * s.numCompleteTasks.get / total
           (0 until w).map { i =>
             if (i < percent) "=" else if (i == percent) ">" else " "
           }.mkString("")
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/StageStatus.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/StageStatus.scala
index 144570862..2ea9c3fda 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/StageStatus.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/StageStatus.scala
@@ -17,11 +17,13 @@
 
 package org.apache.spark.kyuubi
 
-case class StageAttempt(stageId: Int, stageAttemptId: Int) {
+import java.util.concurrent.atomic.AtomicInteger
+
+case class SparkStageAttempt(stageId: Int, stageAttemptId: Int) {
   override def toString: String = s"Stage $stageId (Attempt $stageAttemptId)"
 }
 
-class StageInfo(val stageId: Int, val numTasks: Int) {
-  var numActiveTasks = 0
-  var numCompleteTasks = 0
+class SparkStageInfo(val stageId: Int, val numTasks: Int) {
+  var numActiveTasks = new AtomicInteger(0)
+  var numCompleteTasks = new AtomicInteger(0)
 }
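
The move from plain `var Int` counters to `AtomicInteger` matters because `+= 1` on a `var` is a separate read, add, and write: two threads (e.g. the listener updating a counter while `SparkConsoleProgressBar` polls it) can interleave and lose an update. A minimal sketch of the difference, using a hypothetical `Counters` class that is not part of the patch:

```scala
import java.util.concurrent.atomic.AtomicInteger

class Counters {
  // Racy: `unsafe += 1` compiles to read-modify-write, so two concurrent
  // callers can both read the same value and one increment is silently lost.
  var unsafe = 0

  // Atomic: getAndIncrement()/getAndDecrement() update in one atomic step
  // (a CAS on the JVM); readers observe the current value via get().
  val numActiveTasks = new AtomicInteger(0)

  def taskStarted(): Unit = numActiveTasks.getAndIncrement()
  def taskEnded(): Unit = numActiveTasks.getAndDecrement()
}
```

In the patch the writes still happen inside `activeStages.synchronized`, so the atomics chiefly protect readers such as the progress bar, which reads `numCompleteTasks.get` outside that lock.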
