ulysses-you commented on a change in pull request #814:
URL: https://github.com/apache/incubator-kyuubi/pull/814#discussion_r671965427



##########
File path: 
externals/kyuubi-spark-monitor/src/main/scala/org/apache/kyuubi/engine/spark/monitor/KyuubiStatementMonitor.scala
##########
@@ -55,12 +78,55 @@ object KyuubiStatementMonitor extends Logging{
   }
 
   /**
-   * This function is used for removing kyuubiStatementInfo from 
blockingQueue(statementQueue)
+   * This method is used for removing kyuubiStatementInfo from 
blockingQueue(statementQueue)
   * and dumping them to a file once a threshold is reached.
    */
   // TODO: Need ensure those items have finished. If not, we should put them 
into this queue again.
   private def removeAndDumpStatementInfoFromQueue(): Unit = {
     // TODO: Just for test
     kyuubiStatementQueue.clear()
   }
+
+  /**
+   * This method is used for putting kyuubiJobInfo into 
hashMap(kyuubiJobIdToJobInfoMap)
+   * and storing the mapping relationship between jobId and jobInfo.
+   * The reason we need to maintain this mapping relationship
+   * is that we need to store the endTime and jobResult
+   * when the job has finished, but the jobEnd object carries nothing but the jobId.
+   *
+   * @param kyuubiJobInfo
+   */
+  // TODO: Lacks both a size-based threshold and a time-based threshold
+  def putJobInfoIntoMap(kyuubiJobInfo: KyuubiJobInfo): Unit = {
+    if (kyuubiJobIdToJobInfoMap.size() >= maxSize) {
+      removeAndDumpJobInfoFromMap()
+    }
+    // Put kyuubiJobInfo into kyuubiJobIdToJobInfoMap
+    kyuubiJobIdToJobInfoMap.put(kyuubiJobInfo.jobId, kyuubiJobInfo)
+  }
+
+  /**
+   * This method is used for removing kyuubiJobInfo from 
hashMap(kyuubiJobIdToJobInfoMap)
+   * and dumping them to a file once a threshold is reached.
+   */
+  private def removeAndDumpJobInfoFromMap(): Unit = {
+    // TODO: Just for test
+    kyuubiJobIdToJobInfoMap.clear()
+  }
+
+  /**
+   * This method is used for inserting endTime and jobResult.
+   * Those fields can only be obtained after the job has finished.
+   *
+   * Notice:
+   *    1. Access to endTime and jobResult must be thread-safe.
+   *
+   * @param jobEnd
+   */
+  def insertEndTimeAndJobResult(jobEnd: SparkListenerJobEnd): Unit = {

Review comment:
       `insertEndTimeAndJobResult` -> `insertJobEndTimeAndResult`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to