This is an automated email from the ASF dual-hosted git repository.
bowenliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 00ddcd39b [KYUUBI #5184] [Improvement] Rename Kyuubi's StageInfo to
SparkStageInfo to fix class mismatch
00ddcd39b is described below
commit 00ddcd39bb9062f79c26bb14dcb719fe31513a10
Author: liangbowen <[email protected]>
AuthorDate: Mon Aug 21 19:59:43 2023 +0800
[KYUUBI #5184] [Improvement] Rename Kyuubi's StageInfo to SparkStageInfo to
fix class mismatch
### _Why are the changes needed?_
- Fix class mismatch when trying to compile on Scala 2.13, due to
implicit class reference to `StageInfo`. The compilation fails by type
mismatching, if the compiler classloader loads Spark's
`org.apache.spark.scheduler.StageInfo` ahead of Kyuubi's
`org.apache.spark.kyuubi.StageInfo`.
- Change the `var` integer fields `numActiveTasks` and `numCompleteTasks` to
`AtomicInteger`, preventing possible concurrency inconsistencies
```
[ERROR] [Error]
/Users/bw/dev/kyuubi/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SQLOperationListener.scala:56:
type mismatch;
found :
java.util.concurrent.ConcurrentHashMap[org.apache.spark.kyuubi.StageAttempt,org.apache.spark.scheduler.StageInfo]
required:
java.util.concurrent.ConcurrentHashMap[org.apache.spark.kyuubi.StageAttempt,org.apache.spark.kyuubi.StageInfo]
[INFO] [Info] :
java.util.concurrent.ConcurrentHashMap[org.apache.spark.kyuubi.StageAttempt,org.apache.spark.scheduler.StageInfo]
<:
java.util.concurrent.ConcurrentHashMap[org.apache.spark.kyuubi.StageAttempt,org.apache.spark.kyuubi.StageInfo]?
[INFO] [Info] : false
[ERROR] [Error]
/Users/bw/dev/kyuubi/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SQLOperationListener.scala:126:
not enough arguments for constructor StageInfo: (stageId: Int, attemptId: Int,
name: String, numTasks: Int, rddInfos: Seq[org.apache.spark.storage.RDDInfo],
parentIds: Seq[Int], details: String, taskMetrics:
org.apache.spark.executor.TaskMetrics, taskLocalityPreferences:
Seq[Seq[org.apache.spark.scheduler.TaskLocation]], shuffleDepId: Option[Int
[...]
Unspecified value parameters name, numTasks, rddInfos...
[ERROR] [Error]
/Users/bw/dev/kyuubi/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SQLOperationListener.scala:148:
value numActiveTasks is not a member of org.apache.spark.scheduler.StageInfo
[ERROR] [Error]
/Users/bw/dev/kyuubi/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SQLOperationListener.scala:156:
value numActiveTasks is not a member of org.apache.spark.scheduler.StageInfo
[ERROR] [Error]
/Users/bw/dev/kyuubi/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SQLOperationListener.scala:158:
value numCompleteTasks is not a member of org.apache.spark.scheduler.StageInfo
```
### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including
negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [ ] [Run
test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests)
locally before making a pull request
### _Was this patch authored or co-authored using generative AI tooling?_
No.
Closes #5184 from bowenliang123/spark-stage-info.
Closes #5184
fd0b9b564 [liangbowen] update
d410491f3 [liangbowen] rename Kyuubi's StageInfo to SparkStageInfo
preventing class mismatch
Authored-by: liangbowen <[email protected]>
Signed-off-by: liangbowen <[email protected]>
---
.../org/apache/spark/kyuubi/SQLOperationListener.scala | 18 +++++++++---------
.../apache/spark/kyuubi/SparkConsoleProgressBar.scala | 6 +++---
.../scala/org/apache/spark/kyuubi/StageStatus.scala | 10 ++++++----
3 files changed, 18 insertions(+), 16 deletions(-)
diff --git
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SQLOperationListener.scala
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SQLOperationListener.scala
index 1a57fcf29..4e4a940d2 100644
---
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SQLOperationListener.scala
+++
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SQLOperationListener.scala
@@ -45,7 +45,7 @@ class SQLOperationListener(
private val operationId: String = operation.getHandle.identifier.toString
private lazy val activeJobs = new java.util.HashSet[Int]()
- private lazy val activeStages = new ConcurrentHashMap[StageAttempt,
StageInfo]()
+ private lazy val activeStages = new ConcurrentHashMap[SparkStageAttempt,
SparkStageInfo]()
private var executionId: Option[Long] = None
private val conf: KyuubiConf = operation.getSession.sessionManager.getConf
@@ -120,10 +120,10 @@ class SQLOperationListener(
val stageInfo = stageSubmitted.stageInfo
val stageId = stageInfo.stageId
val attemptNumber = stageInfo.attemptNumber()
- val stageAttempt = StageAttempt(stageId, attemptNumber)
+ val stageAttempt = SparkStageAttempt(stageId, attemptNumber)
activeStages.put(
stageAttempt,
- new StageInfo(stageId, stageInfo.numTasks))
+ new SparkStageInfo(stageId, stageInfo.numTasks))
withOperationLog {
info(s"Query [$operationId]: Stage $stageId.$attemptNumber started "
+
s"with ${stageInfo.numTasks} tasks, ${activeStages.size()} active
stages running")
@@ -134,7 +134,7 @@ class SQLOperationListener(
override def onStageCompleted(stageCompleted: SparkListenerStageCompleted):
Unit = {
val stageInfo = stageCompleted.stageInfo
- val stageAttempt = StageAttempt(stageInfo.stageId,
stageInfo.attemptNumber())
+ val stageAttempt = SparkStageAttempt(stageInfo.stageId,
stageInfo.attemptNumber())
activeStages.synchronized {
if (activeStages.remove(stageAttempt) != null) {
withOperationLog(super.onStageCompleted(stageCompleted))
@@ -143,19 +143,19 @@ class SQLOperationListener(
}
override def onTaskStart(taskStart: SparkListenerTaskStart): Unit =
activeStages.synchronized {
- val stageAttempt = StageAttempt(taskStart.stageId,
taskStart.stageAttemptId)
+ val stageAttempt = SparkStageAttempt(taskStart.stageId,
taskStart.stageAttemptId)
if (activeStages.containsKey(stageAttempt)) {
- activeStages.get(stageAttempt).numActiveTasks += 1
+ activeStages.get(stageAttempt).numActiveTasks.getAndIncrement()
super.onTaskStart(taskStart)
}
}
override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit =
activeStages.synchronized {
- val stageAttempt = StageAttempt(taskEnd.stageId, taskEnd.stageAttemptId)
+ val stageAttempt = SparkStageAttempt(taskEnd.stageId,
taskEnd.stageAttemptId)
if (activeStages.containsKey(stageAttempt)) {
- activeStages.get(stageAttempt).numActiveTasks -= 1
+ activeStages.get(stageAttempt).numActiveTasks.getAndDecrement()
if (taskEnd.reason == org.apache.spark.Success) {
- activeStages.get(stageAttempt).numCompleteTasks += 1
+ activeStages.get(stageAttempt).numCompleteTasks.getAndIncrement()
}
super.onTaskEnd(taskEnd)
}
diff --git
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkConsoleProgressBar.scala
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkConsoleProgressBar.scala
index fc2ebd5f8..dc8b493cc 100644
---
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkConsoleProgressBar.scala
+++
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkConsoleProgressBar.scala
@@ -29,7 +29,7 @@ import org.apache.kyuubi.operation.Operation
class SparkConsoleProgressBar(
operation: Operation,
- liveStages: ConcurrentHashMap[StageAttempt, StageInfo],
+ liveStages: ConcurrentHashMap[SparkStageAttempt, SparkStageInfo],
updatePeriodMSec: Long,
timeFormat: String)
extends Logging {
@@ -77,7 +77,7 @@ class SparkConsoleProgressBar(
* after your last output, keeps overwriting itself to hold in one line. The
logging will follow
* the progress bar, then progress bar will be showed in next line without
overwrite logs.
*/
- private def show(now: Long, stages: Seq[StageInfo]): Unit = {
+ private def show(now: Long, stages: Seq[SparkStageInfo]): Unit = {
val width = TerminalWidth / stages.size
val bar = stages.map { s =>
val total = s.numTasks
@@ -86,7 +86,7 @@ class SparkConsoleProgressBar(
val w = width - header.length - tailer.length
val bar =
if (w > 0) {
- val percent = w * s.numCompleteTasks / total
+ val percent = w * s.numCompleteTasks.get / total
(0 until w).map { i =>
if (i < percent) "=" else if (i == percent) ">" else " "
}.mkString("")
diff --git
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/StageStatus.scala
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/StageStatus.scala
index 144570862..2ea9c3fda 100644
---
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/StageStatus.scala
+++
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/StageStatus.scala
@@ -17,11 +17,13 @@
package org.apache.spark.kyuubi
-case class StageAttempt(stageId: Int, stageAttemptId: Int) {
+import java.util.concurrent.atomic.AtomicInteger
+
+case class SparkStageAttempt(stageId: Int, stageAttemptId: Int) {
override def toString: String = s"Stage $stageId (Attempt $stageAttemptId)"
}
-class StageInfo(val stageId: Int, val numTasks: Int) {
- var numActiveTasks = 0
- var numCompleteTasks = 0
+class SparkStageInfo(val stageId: Int, val numTasks: Int) {
+ var numActiveTasks = new AtomicInteger(0)
+ var numCompleteTasks = new AtomicInteger(0)
}