Copilot commented on code in PR #495:
URL: https://github.com/apache/incubator-livy/pull/495#discussion_r2595449096


##########
repl/src/main/scala/org/apache/livy/repl/Session.scala:
##########
@@ -161,18 +163,25 @@ class Session(
     _statements.synchronized { _statements(statementId) = statement }
 
     Future {
-      setJobGroup(tpe, statementId)
-      statement.compareAndTransit(StatementState.Waiting, StatementState.Running)
+      val currentThread = Thread.currentThread()
+      statementThreads.put(statementId, currentThread)
+      try {
+        setJobGroup(tpe, statementId)
+        statement.compareAndTransit(StatementState.Waiting, StatementState.Running)
 
-      if (statement.state.get() == StatementState.Running) {
-        statement.started = System.currentTimeMillis()
-        statement.output = executeCode(interpreter(tpe), statementId, code)
-      }
+        if (statement.state.get() == StatementState.Running) {
+          statement.started = System.currentTimeMillis()
+          statement.output = executeCode(interpreter(tpe), statementId, code)
+        }
 
-      statement.compareAndTransit(StatementState.Running, StatementState.Available)
-      statement.compareAndTransit(StatementState.Cancelling, StatementState.Cancelled)
-      statement.updateProgress(1.0)
-      statement.completed = System.currentTimeMillis()
+        statement.compareAndTransit(StatementState.Running, StatementState.Available)
+        statement.compareAndTransit(StatementState.Cancelling, StatementState.Cancelled)
+        statement.updateProgress(1.0)
+        statement.completed = System.currentTimeMillis()
+      } finally {
+        statementThreads.remove(statementId, currentThread)
+        Thread.interrupted()
+      }

Review Comment:
   The call to `Thread.interrupted()` clears the interrupt flag but discards the result, so there is no record of whether the statement was actually interrupted. Consider either:
   1. Checking the result and logging when the thread was interrupted, or
   2. Using `Thread.currentThread().isInterrupted()` if you only want to check the flag without clearing it.

   If the intent is to reset the interrupt flag unconditionally (for example, so a pooled `Future` thread does not carry a stale interrupt into its next task), add a comment explaining this design choice.
   ```suggestion
           // Clear the interrupt flag, but log if the thread was interrupted.
           if (Thread.interrupted()) {
             warn(s"Thread was interrupted during execution of statement $statementId; interrupt flag cleared.")
           }
   ```
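
   Here is a minimal Scala sketch of the two calls the comment contrasts. `isInterrupted()` only reads the flag, while the static `Thread.interrupted()` reads it and also clears it. The object name and the `log` callback are hypothetical stand-ins for illustration, not Livy code.

   ```scala
   object InterruptFlagDemo {
     /** Sets this thread's interrupt flag, then inspects it two ways.
       * Returns (peek, cleared, stillSet).
       */
     def demo(): (Boolean, Boolean, Boolean) = {
       Thread.currentThread().interrupt()
       // isInterrupted() only reads the flag; it stays set.
       val peek = Thread.currentThread().isInterrupted()
       // Thread.interrupted() returns the flag AND clears it.
       val cleared = Thread.interrupted()
       // After the clearing call, the flag is no longer set.
       val stillSet = Thread.currentThread().isInterrupted()
       (peek, cleared, stillSet)
     }

     /** The suggested `finally` pattern: always clear the flag, but report it.
       * `log` is a hypothetical callback standing in for Livy's logger.
       */
     def clearAndReport(statementId: Int, log: String => Unit): Unit = {
       if (Thread.interrupted()) {
         log(s"Thread was interrupted during execution of statement $statementId; interrupt flag cleared.")
       }
     }
   }
   ```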



##########
repl/src/main/scala/org/apache/livy/repl/Session.scala:
##########
@@ -161,18 +163,25 @@ class Session(
     _statements.synchronized { _statements(statementId) = statement }
 
     Future {
-      setJobGroup(tpe, statementId)
-      statement.compareAndTransit(StatementState.Waiting, StatementState.Running)
+      val currentThread = Thread.currentThread()
+      statementThreads.put(statementId, currentThread)
+      try {
+        setJobGroup(tpe, statementId)
+        statement.compareAndTransit(StatementState.Waiting, StatementState.Running)
 
-      if (statement.state.get() == StatementState.Running) {
-        statement.started = System.currentTimeMillis()
-        statement.output = executeCode(interpreter(tpe), statementId, code)
-      }
+        if (statement.state.get() == StatementState.Running) {
+          statement.started = System.currentTimeMillis()
+          statement.output = executeCode(interpreter(tpe), statementId, code)
+        }
 
-      statement.compareAndTransit(StatementState.Running, StatementState.Available)
-      statement.compareAndTransit(StatementState.Cancelling, StatementState.Cancelled)
-      statement.updateProgress(1.0)
-      statement.completed = System.currentTimeMillis()
+        statement.compareAndTransit(StatementState.Running, StatementState.Available)
+        statement.compareAndTransit(StatementState.Cancelling, StatementState.Cancelled)
+        statement.updateProgress(1.0)
+        statement.completed = System.currentTimeMillis()
+      } finally {
+        statementThreads.remove(statementId, currentThread)

Review Comment:
   `statementThreads.remove(statementId, currentThread)` uses the two-argument `remove`, which deletes the entry only if the key is still mapped to `currentThread`. This is correct. However, if another thread replaces the mapping between `put` and `remove`, this call leaves that entry in place. Consider adding a comment that documents this as intentional handling of that edge case. If the extra check isn't needed, use the simpler `statementThreads.remove(statementId)` instead.
   ```suggestion
           // Always remove the entry for this statementId to avoid leaks, even if the thread was replaced.
           statementThreads.remove(statementId)
   ```
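
   Here is a small sketch of the two `remove` overloads. It assumes `statementThreads` is a `java.util.concurrent.ConcurrentHashMap[Int, Thread]`; its declaration is not shown in this excerpt. `remove(key, value)` deletes the mapping only if it still points at the given thread, while `remove(key)` deletes whatever mapping is present.

   ```scala
   import java.util.concurrent.ConcurrentHashMap

   object ConditionalRemoveDemo {
     // Assumption: mirrors a ConcurrentHashMap-backed statementThreads; not Livy's declaration.
     val statementThreads = new ConcurrentHashMap[Int, Thread]()

     /** Returns (removedByStaleOwner, entryStillPointsToNewOwner, removedByNewOwner). */
     def conditional(): (Boolean, Boolean, Boolean) = {
       val oldOwner = new Thread() // never started; used only as a map value
       val newOwner = new Thread()
       statementThreads.put(1, oldOwner)
       statementThreads.put(1, newOwner) // mapping replaced before oldOwner cleans up
       // Two-argument remove: no-op, because the key now maps to newOwner.
       val removedByStale = statementThreads.remove(1, oldOwner)
       val stillNew = statementThreads.get(1) eq newOwner
       // newOwner's own cleanup does remove the entry.
       val removedByNew = statementThreads.remove(1, newOwner)
       (removedByStale, stillNew, removedByNew)
     }

     /** One-argument remove: evicts the current mapping regardless of owner. */
     def unconditional(): Boolean = {
       val owner = new Thread()
       statementThreads.put(2, owner)
       statementThreads.remove(2) eq owner
     }
   }
   ```

   With the one-argument form, `oldOwner`'s cleanup would have evicted `newOwner`'s live entry instead of leaving it for `newOwner` to remove.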



##########
repl/src/main/scala/org/apache/livy/repl/Session.scala:
##########
@@ -161,18 +163,25 @@ class Session(
     _statements.synchronized { _statements(statementId) = statement }
 
     Future {
-      setJobGroup(tpe, statementId)
-      statement.compareAndTransit(StatementState.Waiting, StatementState.Running)
+      val currentThread = Thread.currentThread()
+      statementThreads.put(statementId, currentThread)
+      try {

Review Comment:
   There's a potential race condition: the thread could be interrupted between `statementThreads.put()` (line 167) and entry into the `try` block. If `cancel()` is called during this window, the interrupt would be lost. Consider moving the `statementThreads.put()` call inside the `try` block, immediately before `setJobGroup()`, to minimize the race window.
   ```suggestion
         try {
           statementThreads.put(statementId, currentThread)
   ```
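
   Here is a minimal sketch of the suggested ordering. Registration becomes the first statement inside `try`, so the `finally` cleanup always pairs with it. `runRegistered` and the local map are hypothetical names used for illustration. In the PR, this logic lives inline inside the `Future` in `Session.scala`.

   ```scala
   import java.util.concurrent.ConcurrentHashMap

   object RegisterInsideTry {
     // Hypothetical stand-in for Session's statementThreads map.
     val statementThreads = new ConcurrentHashMap[Int, Thread]()

     /** Runs `body` with the current thread registered under `statementId`.
       * Hypothetical helper, not part of Livy.
       */
     def runRegistered[T](statementId: Int)(body: => T): T = {
       val currentThread = Thread.currentThread()
       try {
         // Register first thing inside `try`, as the suggestion proposes.
         statementThreads.put(statementId, currentThread)
         body
       } finally {
         // Remove only this thread's own entry, then clear any pending interrupt
         // so a pooled thread does not carry it into its next task.
         statementThreads.remove(statementId, currentThread)
         Thread.interrupted()
       }
     }
   }
   ```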



##########
repl/src/main/scala/org/apache/livy/repl/Session.scala:
##########
@@ -212,6 +221,7 @@ class Session(
           info(s"Failed to cancel statement $statementId.")
           statement.compareAndTransit(StatementState.Cancelling, StatementState.Cancelled)
         } else {
+          Option(statementThreads.get(statementId)).foreach(_.interrupt())

Review Comment:
   `interrupt()` is called on the same thread on every iteration of the loop (line 224), which is redundant. Once a thread has been interrupted, calling `interrupt()` again while its interrupt flag is still set has no additional effect. Consider moving the call outside the `while` loop, just before line 219, so the thread is interrupted once at the start of cancellation rather than repeatedly.
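
   Here is a sketch of the proposed shape: interrupt once, then poll. `cancelWorker`, `timeoutMs`, and `pollMs` are hypothetical names. The real loop in `Session.cancel()` is not shown in this excerpt, so any repeated cancellation request it makes is represented here only by a comment.

   ```scala
   object InterruptOnceCancel {
     /** Interrupts `worker` a single time, then waits (polling) until it exits
       * or `timeoutMs` elapses. Returns true if the worker stopped in time.
       */
     def cancelWorker(worker: Thread, timeoutMs: Long, pollMs: Long = 10L): Boolean = {
       worker.interrupt() // once, before the loop
       val deadline = System.currentTimeMillis() + timeoutMs
       while (worker.isAlive() && System.currentTimeMillis() < deadline) {
         // Any repeated, non-interrupt cancellation request would go here.
         Thread.sleep(pollMs)
       }
       !worker.isAlive()
     }
   }
   ```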



##########
repl/src/test/scala/org/apache/livy/repl/SparkSessionSpec.scala:
##########
@@ -236,6 +236,33 @@ class SparkSessionSpec extends BaseSessionSpec(Spark) {
     }
   }
 
+  it should "cancel driver code without spark jobs" in withSession { session =>
+    val stmtId = session.execute(
+      """
+        |Thread.sleep(60000)

Review Comment:
   [nitpick] Using a 60-second sleep in tests can make the test suite slow. Consider a shorter sleep (for example, 5 to 10 seconds), which should still be enough to validate the cancellation behavior while keeping tests fast. The timeout on line 247 is already set to 30 seconds, so a shorter sleep would be more appropriate.
   ```suggestion
           |Thread.sleep(5000)
   ```
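
   One related point: `Thread.sleep` throws `InterruptedException` as soon as its thread is interrupted. When cancellation works, the test therefore does not wait out the full sleep, and the sleep duration mainly bounds how long a failing run takes. Here is a small sketch of that behavior on a plain JVM thread, not the Livy REPL. `elapsedMs` is a hypothetical helper.

   ```scala
   object InterruptedSleepTiming {
     /** Starts a thread that sleeps for `sleepMs`, interrupts it after
       * `interruptAfterMs`, and returns the elapsed wall-clock time in ms.
       */
     def elapsedMs(sleepMs: Long, interruptAfterMs: Long): Long = {
       val start = System.nanoTime()
       // The worker exits early if it is interrupted during its sleep.
       val worker = new Thread(() => try Thread.sleep(sleepMs) catch { case _: InterruptedException => () })
       worker.start()
       Thread.sleep(interruptAfterMs)
       worker.interrupt()
       worker.join()
       (System.nanoTime() - start) / 1000000L
     }
   }
   ```

   With `sleepMs = 60000L`, the call returns after roughly `interruptAfterMs`, not 60 seconds.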



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
