ulysses-you commented on a change in pull request #814:
URL: https://github.com/apache/incubator-kyuubi/pull/814#discussion_r671966769
##########
File path:
externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/KyuubiStatementMonitorSuite.scala
##########
@@ -76,6 +78,46 @@ class KyuubiStatementMonitorSuite extends WithSparkSQLEngine
with HiveJDBCTests
}
}
+ test("add kyuubiJobInfo into queue and remove them when threshold reached") {
+ val sql = "select timestamp'2021-06-01'"
+ val getJobMap = PrivateMethod[
+ ConcurrentHashMap[Int,
KyuubiJobInfo]](Symbol("kyuubiJobIdToJobInfoMap"))()
+
+ val jobIdToJobInfoMap = KyuubiStatementMonitor.invokePrivate(getJobMap)
+ jobIdToJobInfoMap.clear()
+ withSessionHandle { (client, handle) =>
+ val req = new TExecuteStatementReq()
+ req.setSessionHandle(handle)
+ req.setStatement(sql)
+ val tExecuteStatementResp = client.ExecuteStatement(req)
+ val opHandle = tExecuteStatementResp.getOperationHandle
+
+ eventually(timeout(10.seconds), interval(100.milliseconds)) {
+ val elements = jobIdToJobInfoMap.elements()
+ while (elements.hasMoreElements) {
+ val kyuubiJobInfo = elements.nextElement()
+ assert(jobIdToJobInfoMap.size() === 1)
+ assert(kyuubiJobInfo.statementId ===
OperationHandle(opHandle).identifier.toString)
+ assert(kyuubiJobInfo.stageIds.length === 1)
+ assert(kyuubiJobInfo.jobResult === JobSucceeded)
+ assert(kyuubiJobInfo.endTime !== 0)
+ }
+ }
+
+ // Test for clear kyuubiJobIdToJobInfoMap when threshold reached
+ // This function is used for avoiding mem leak
+ for ( a <- 1 to 7 ) {
Review comment:
```
(1 to 7).forech { _ =>
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]