This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch branch-1.3
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git


The following commit(s) were added to refs/heads/branch-1.3 by this push:
     new 3c8cce4  [KYUUBI #985] Add logCaptureThreadReleased flag
3c8cce4 is described below

commit 3c8cce414de774f89a6007ebfd89aafa5fb13e22
Author: ulysses-you <[email protected]>
AuthorDate: Wed Aug 25 19:21:35 2021 +0800

    [KYUUBI #985] Add logCaptureThreadReleased flag
    
    <!--
    Thanks for sending a pull request!
    
    Here are some tips for you:
      1. If this is your first time, please read our contributor guidelines: 
https://kyuubi.readthedocs.io/en/latest/community/contributions.html
      2. If the PR is related to an issue in 
https://github.com/apache/incubator-kyuubi/issues, add '[KYUUBI #XXXX]' in your 
PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'.
      3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., 
'[WIP][KYUUBI #XXXX] Your PR title ...'.
    -->
    
    ### _Why are the changes needed?_
    <!--
    Please clarify why the changes are needed. For instance,
      1. If you add a feature, you can talk about the use case of it.
      2. If you fix a bug, you can clarify why it is a bug.
    -->
    `isInterrupted` always returns false once an `InterruptedException` has been thrown, 
because throwing it clears the thread's interrupt status. So we should use a new flag to check whether the log capture thread has been released.
    
    ### _How was this patch tested?_
    - [x] Add some test cases that check the changes thoroughly including 
negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [ ] [Run 
test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests)
 locally before making a pull request
    
    Closes #985 from ulysses-you/flaky-test.
    
    Closes #985
    
    1fcc0a99 [ulysses-you] empty
    463a996f [ulysses-you] empty
    d1f2ea54 [ulysses-you] eventually
    f9f37343 [ulysses-you] fix
    
    Authored-by: ulysses-you <[email protected]>
    Signed-off-by: Cheng Pan <[email protected]>
    (cherry picked from commit 3afd20e03aaafff106da596208c3a2a89b2cab24)
    Signed-off-by: Cheng Pan <[email protected]>
---
 .../src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala      | 5 ++++-
 .../org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala  | 7 +++++--
 2 files changed, 9 insertions(+), 3 deletions(-)

diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala
index 41d7f87..6e48170 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala
@@ -65,7 +65,8 @@ trait ProcBuilder {
   @volatile private var error: Throwable = UNCAUGHT_ERROR
   @volatile private var lastRowOfLog: String = "unknown"
   // Visible for test
-  private[kyuubi] var logCaptureThread: Thread = _
+  @volatile private[kyuubi] var logCaptureThreadReleased: Boolean = true
+  private var logCaptureThread: Thread = _
 
   private[kyuubi] lazy val engineLog: File = ProcBuilder.synchronized {
     val engineLogTimeout = conf.get(KyuubiConf.ENGINE_LOG_TIMEOUT)
@@ -136,10 +137,12 @@ trait ProcBuilder {
         case _: IOException =>
         case _: InterruptedException =>
       } finally {
+        logCaptureThreadReleased = true
         reader.close()
       }
     }
 
+    logCaptureThreadReleased = false
     logCaptureThread = PROC_BUILD_LOGGER.newThread(redirect)
     logCaptureThread.start()
     proc
diff --git 
a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala
 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala
index 6f2b2e1..5928229 100644
--- 
a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala
+++ 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala
@@ -126,13 +126,16 @@ class SparkProcessBuilderSuite extends 
KerberizedTestHelper {
   test("log capture should release after close") {
     val process = new FakeSparkProcessBuilder(KyuubiConf())
     try {
+      assert(process.logCaptureThreadReleased)
       val subProcess = process.start
-      assert(!process.logCaptureThread.isInterrupted)
+      assert(!process.logCaptureThreadReleased)
       subProcess.waitFor(3, TimeUnit.SECONDS)
     } finally {
       process.close()
     }
-    assert(process.logCaptureThread.isInterrupted)
+    eventually(timeout(3.seconds), interval(100.milliseconds)) {
+      assert(process.logCaptureThreadReleased)
+    }
   }
 
   test(s"sub process log should be overwritten") {

Reply via email to