This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch branch-1.3
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git
The following commit(s) were added to refs/heads/branch-1.3 by this push:
new 3c8cce4 [KYUUBI #985] Add logCaptureThreadReleased flag
3c8cce4 is described below
commit 3c8cce414de774f89a6007ebfd89aafa5fb13e22
Author: ulysses-you <[email protected]>
AuthorDate: Wed Aug 25 19:21:35 2021 +0800
[KYUUBI #985] Add logCaptureThreadReleased flag
<!--
Thanks for sending a pull request!
Here are some tips for you:
1. If this is your first time, please read our contributor guidelines:
https://kyuubi.readthedocs.io/en/latest/community/contributions.html
2. If the PR is related to an issue in
https://github.com/apache/incubator-kyuubi/issues, add '[KYUUBI #XXXX]' in your
PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'.
3. If the PR is unfinished, add '[WIP]' in your PR title, e.g.,
'[WIP][KYUUBI #XXXX] Your PR title ...'.
-->
### _Why are the changes needed?_
<!--
Please clarify why the changes are needed. For instance,
1. If you add a feature, you can talk about the use case of it.
2. If you fix a bug, you can clarify why it is a bug.
-->
`isInterrupted` always returns false once an `InterruptedException` has been thrown, because throwing the exception clears the thread's interrupt status.
So we should use a new flag to check whether the log capture thread has been released.
### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including
negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [ ] [Run
test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests)
locally before making a pull request
Closes #985 from ulysses-you/flaky-test.
Closes #985
1fcc0a99 [ulysses-you] empty
463a996f [ulysses-you] empty
d1f2ea54 [ulysses-you] eventually
f9f37343 [ulysses-you] fix
Authored-by: ulysses-you <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
(cherry picked from commit 3afd20e03aaafff106da596208c3a2a89b2cab24)
Signed-off-by: Cheng Pan <[email protected]>
---
.../src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala | 5 ++++-
.../org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala | 7 +++++--
2 files changed, 9 insertions(+), 3 deletions(-)
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala
index 41d7f87..6e48170 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala
@@ -65,7 +65,8 @@ trait ProcBuilder {
@volatile private var error: Throwable = UNCAUGHT_ERROR
@volatile private var lastRowOfLog: String = "unknown"
// Visible for test
- private[kyuubi] var logCaptureThread: Thread = _
+ @volatile private[kyuubi] var logCaptureThreadReleased: Boolean = true
+ private var logCaptureThread: Thread = _
private[kyuubi] lazy val engineLog: File = ProcBuilder.synchronized {
val engineLogTimeout = conf.get(KyuubiConf.ENGINE_LOG_TIMEOUT)
@@ -136,10 +137,12 @@ trait ProcBuilder {
case _: IOException =>
case _: InterruptedException =>
} finally {
+ logCaptureThreadReleased = true
reader.close()
}
}
+ logCaptureThreadReleased = false
logCaptureThread = PROC_BUILD_LOGGER.newThread(redirect)
logCaptureThread.start()
proc
diff --git
a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala
b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala
index 6f2b2e1..5928229 100644
---
a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala
+++
b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala
@@ -126,13 +126,16 @@ class SparkProcessBuilderSuite extends
KerberizedTestHelper {
test("log capture should release after close") {
val process = new FakeSparkProcessBuilder(KyuubiConf())
try {
+ assert(process.logCaptureThreadReleased)
val subProcess = process.start
- assert(!process.logCaptureThread.isInterrupted)
+ assert(!process.logCaptureThreadReleased)
subProcess.waitFor(3, TimeUnit.SECONDS)
} finally {
process.close()
}
- assert(process.logCaptureThread.isInterrupted)
+ eventually(timeout(3.seconds), interval(100.milliseconds)) {
+ assert(process.logCaptureThreadReleased)
+ }
}
test(s"sub process log should be overwritten") {