yaooqinn commented on a change in pull request #814:
URL: https://github.com/apache/incubator-kyuubi/pull/814#discussion_r670935458
##########
File path:
externals/kyuubi-spark-monitor/src/main/scala/org/apache/kyuubi/engine/spark/monitor/KyuubiStatementMonitor.scala
##########
@@ -55,12 +85,57 @@ object KyuubiStatementMonitor extends Logging{
}
/**
- * This function is used for removing kyuubiStatementInfo from
blockingQueue(statementQueue)
+ * This method is used for removing kyuubiStatementInfo from
blockingQueue(statementQueue)
* and dumping them to a file by threshold.
*/
// TODO: Need to ensure those items have finished. If not, we should put them
into this queue again.
private def removeAndDumpStatementInfoFromQueue(): Unit = {
// TODO: Just for test
kyuubiStatementQueue.clear()
}
+
+ /**
+ * This method is used for putting kyuubiJobInfo into blockingQueue(jobQueue)
+ * and storing the mapping relationship between jobId and jobInfo.
+ * The reason that we need to maintain a mapping relationship
+ * is we need to store endTime and jobResult
+ * when this job has finished but the object-jobEnd has nothing but jobId.
+ * @param kyuubiJobInfo
+ */
+ // TODO: Lack size type threshold and time type threshold
+ def putJobInfoIntoQueue(kyuubiJobInfo: KyuubiJobInfo): Unit = {
+ if (kyuubiJobQueue.size() >= maxSize) {
+ removeAndDumpJobInfoFromQueue()
+ }
+ // Put kyuubiJobInfo into kyuubiJobQueue
+ kyuubiJobQueue.add(kyuubiJobInfo)
+ // Store the relationship between jobID and jobInfo
+ jobIdToJobInfoMap.put(kyuubiJobInfo.jobId, kyuubiJobInfo)
+ }
+
+ /**
+ * This method is used for removing kyuubiJobInfo from
blockingQueue(jobQueue)
+ * and dumping them to a file by threshold.
+ */
+ // TODO: Need to ensure those items have finished. If not, we should put them
into this queue again.
+ private def removeAndDumpJobInfoFromQueue(): Unit = {
+ // TODO: Just for test
+ kyuubiJobQueue.clear()
+ }
+
+ /**
+ * This method is used for adding endTime and jobResult into jobInfo.
+ * Those fields can only be obtained once this job has finished.
+ *
+ * Notice:
+ * 1. When this job has finished, you should remove it from
jobIdToJobInfoMap.
+ * @param jobEnd
+ */
+ def addJobEndInfo(jobEnd: SparkListenerJobEnd): Unit = {
Review comment:
Why is this method named addXX when, per its doc comment, it also removes the job from jobIdToJobInfoMap?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]