This is an automated email from the ASF dual-hosted git repository.
bowenliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new a0fdead6d [KYUUBI #5853] Ensure closing the operation log for batch
submission in fast-failed case
a0fdead6d is described below
commit a0fdead6d257be54127d84f526918e4fd52a18ce
Author: Bowen Liang <[email protected]>
AuthorDate: Thu Dec 14 20:24:48 2023 +0800
[KYUUBI #5853] Ensure closing the operation log for batch submission in
fast-failed case
# :mag: Description
## Issue References 🔗
This pull request addresses the review comment in PR
[#5733](https://github.com/apache/kyuubi/pull/5733#issuecomment-1855055891)
## Describe Your Solution 🔧
Ensure the operation log is closed even in the fast-failed case.
## Types of changes :bookmark:
- [ ] Bugfix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing
functionality to change)
## Test Plan 🧪
#### Behavior Without This Pull Request :coffin:
#### Behavior With This Pull Request :tada:
#### Related Unit Tests
---
# Checklists
## 📝 Author Self Checklist
- [x] My code follows the [style
guidelines](https://kyuubi.readthedocs.io/en/master/contributing/code/style.html)
of this project
- [x] I have performed a self-review
- [x] I have commented my code, particularly in hard-to-understand areas
- [ ] I have made corresponding changes to the documentation
- [x] My changes generate no new warnings
- [ ] I have added tests that prove my fix is effective or that my feature
works
- [x] New and existing unit tests pass locally with my changes
- [x] This patch was not authored or co-authored using [Generative
Tooling](https://www.apache.org/legal/generative-tooling.html)
## 📝 Committer Pre-Merge Checklist
- [ ] Pull request title is okay.
- [ ] No license issues.
- [ ] Milestone correctly set?
- [ ] Test coverage is ok
- [ ] Assignees are selected.
- [ ] Minimum number of approvals
- [ ] No changes are requested
**Be nice. Be informative.**
Closes #5853 from bowenliang123/close-oplog.
Closes #5853
34192fb6b [Bowen Liang] import
4fe004613 [Bowen Liang] withClosingOperationLog
Authored-by: Bowen Liang <[email protected]>
Signed-off-by: liangbowen <[email protected]>
---
.../org/apache/kyuubi/operation/AbstractOperation.scala | 16 ++++++++++++++++
.../org/apache/kyuubi/operation/BatchJobSubmission.scala | 11 ++---------
2 files changed, 18 insertions(+), 9 deletions(-)
diff --git
a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala
b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala
index 202112f59..9d5621a82 100644
---
a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala
+++
b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala
@@ -17,6 +17,7 @@
package org.apache.kyuubi.operation
+import java.io.IOException
import java.util.concurrent.{Future, ScheduledExecutorService, TimeUnit}
import java.util.concurrent.locks.ReentrantLock
@@ -247,4 +248,19 @@ abstract class AbstractOperation(session: Session) extends
Operation with Loggin
ok.setInfoMessages(hints.asJava)
ok
}
+
+ /**
+ * Close the OperationLog, after running the block
+ */
+ def withClosingOperationLog[T](f: => T): T = {
+ try {
+ f
+ } finally {
+ try {
+ getOperationLog.foreach(_.close())
+ } catch {
+ case e: IOException => error(e.getMessage, e)
+ }
+ }
+ }
}
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
index 9778b2040..276fe3446 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
@@ -17,7 +17,6 @@
package org.apache.kyuubi.operation
-import java.io.IOException
import java.nio.file.{Files, Paths}
import java.util.Locale
import java.util.concurrent.TimeUnit
@@ -336,7 +335,7 @@ class BatchJobSubmission(
}
}
- override def close(): Unit = withLockRequired {
+ override def close(): Unit = withLockRequired(withClosingOperationLog {
if (!isClosedOrCanceled) {
MetricsSystem.tracing(_.decCount(MetricRegistry.name(OPERATION_OPEN,
opType)))
@@ -373,13 +372,7 @@ class BatchJobSubmission(
}
}
}
-
- try {
- getOperationLog.foreach(_.close())
- } catch {
- case e: IOException => error(e.getMessage, e)
- }
- }
+ })
override def cancel(): Unit = {
throw new IllegalStateException("Use close instead.")