This is an automated email from the ASF dual-hosted git repository.
bowenliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new db57e9365d [KYUUBI #6587] Periodically expire temp files and operation logs on server to avoid memory leak by Files.deleteOnExit
db57e9365d is described below
commit db57e9365d7933942197936ac7ff711d58f7ea91
Author: Bowen Liang <[email protected]>
AuthorDate: Wed Aug 28 17:13:27 2024 +0800
[KYUUBI #6587] Periodically expire temp files and operation logs on server to avoid memory leak by Files.deleteOnExit
# :mag: Description
## Issue References
-
## Describe Your Solution
Fix the memory leak on the server caused by `File.deleteOnExit`.
In long-running Kyuubi server instances, operation log directories and uploaded batch job files are registered for deletion at JVM exit via `File.deleteOnExit`. However, the JDK's `DeleteOnExitHook` keeps every registered path in its internal `files` set until the JVM exits and provides no way to remove an entry, so the set grows with every session and upload, leaking memory.
This PR fixes the issue by:
1. Introducing a new utility, `TempFileCleanupUtils`, as a replacement for `File.deleteOnExit` that also exposes `cancelDeleteOnExit` to evict a path from its registry, so registered paths no longer accumulate.
2. Adding a `TempFileService` to the Kyuubi server that periodically cleans up expired temporary files, such as operation log directories and uploaded batch resources, and evicts each cleaned-up path from `TempFileCleanupUtils`.
3. Adding the config `kyuubi.server.tempFile.expireTime`, with a default value of `P14D` (14 days), which controls how long a temporary file is kept before it is cleaned up.
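The core of item 1 can be sketched with the JDK alone. This is a hypothetical illustration, not the `TempFileCleanupUtils` API shown in the diff: the object `CancellableDeleteOnExit` and its `register`, `cancel` and `pending` methods are assumed names. Like the real utility, it keeps registered paths in a concurrent set and installs a single shutdown hook on first use, but unlike `File.deleteOnExit` it lets a path be removed again.

```scala
import java.io.File
import java.nio.file.Path
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical sketch (not Kyuubi's API): a delete-on-exit registry that, unlike
// java.io.File.deleteOnExit, lets callers evict a path once it has been cleaned up.
object CancellableDeleteOnExit {
  // Thread-safe set of registered paths, stored as strings.
  private val targets = ConcurrentHashMap.newKeySet[String]()
  private val hookInstalled = new AtomicBoolean(false)

  // Install a single JVM shutdown hook lazily, on the first registration.
  private def ensureShutdownHook(): Unit =
    if (hookInstalled.compareAndSet(false, true)) {
      Runtime.getRuntime.addShutdownHook(new Thread(() => {
        targets.forEach((p: String) => deleteRecursively(new File(p)))
        targets.clear()
      }))
    }

  // Register a file or directory for deletion when the JVM exits.
  def register(path: Path): Unit = {
    ensureShutdownHook()
    targets.add(path.toString)
  }

  // Evict a path, e.g. after a periodic cleanup already deleted it,
  // so the registry does not keep growing for the lifetime of the process.
  def cancel(path: Path): Unit = targets.remove(path.toString)

  // Number of paths currently waiting for deletion at exit.
  def pending: Int = targets.size()

  private def deleteRecursively(f: File): Unit = {
    if (f.isDirectory) Option(f.listFiles).foreach(_.foreach(deleteRecursively))
    f.delete()
  }
}
```

Because every target is deleted recursively, an unordered set is sufficient here; `File.deleteOnExit` instead deletes paths one by one in reverse registration order and cannot remove a directory that is still non-empty.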
## Types of changes :bookmark:
- [ ] Bugfix (non-breaking change which fixes an issue)
- [x] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing functionality to change)
## Test Plan
#### Behavior Without This Pull Request :coffin:
#### Behavior With This Pull Request :tada:
#### Related Unit Tests
---
# Checklist
- [ ] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html)
**Be nice. Be informative.**
Closes #6587 from bowenliang123/file-expiration.
Closes #6587
e23b72e08 [liangbowen] change to P14D
acaf370e7 [liangbowen] change config name to kyuubi.server.tempFile.expireTime
6c7ddd527 [liangbowen] import
ed1e4d76f [liangbowen] comment: ConcurrentHashMap.newKeySet
fbf73ccb4 [liangbowen] update
34d3fc71c [liangbowen] add guava to common module's dep
49c10e5ef [Bowen Liang] file expiration
Lead-authored-by: Bowen Liang <[email protected]>
Co-authored-by: liangbowen <[email protected]>
Co-authored-by: Bowen Liang <[email protected]>
Signed-off-by: liangbowen <[email protected]>
---
.gitignore | 1 +
docs/configuration/settings.md | 37 ++++-----
.../engine/spark/operation/ExecutePython.scala | 3 +-
kyuubi-common/pom.xml | 5 ++
.../src/main/scala/org/apache/kyuubi/Utils.scala | 12 ++-
.../org/apache/kyuubi/config/KyuubiConf.scala | 8 ++
.../apache/kyuubi/operation/log/OperationLog.scala | 16 ++--
.../apache/kyuubi/service/TempFileService.scala | 91 ++++++++++++++++++++++
.../apache/kyuubi/session/AbstractSession.scala | 5 +-
.../apache/kyuubi/util/TempFileCleanupUtils.scala | 65 ++++++++++++++++
.../org/apache/kyuubi/server/KyuubiServer.scala | 7 +-
.../kyuubi/server/api/v1/BatchesResource.scala | 8 +-
.../apache/kyuubi/session/KyuubiSessionImpl.scala | 2 +
.../kyuubi/session/KyuubiSessionManager.scala | 4 +
.../kyuubi/server/TempFileServiceSuite.scala | 53 +++++++++++++
15 files changed, 284 insertions(+), 33 deletions(-)
diff --git a/.gitignore b/.gitignore
index 29bb61545c..9f9ba71548 100644
--- a/.gitignore
+++ b/.gitignore
@@ -67,6 +67,7 @@ embedded_zookeeper/
 /externals/kyuubi-spark-sql-engine/engine_operation_logs/
 /externals/kyuubi-spark-sql-engine/spark-warehouse/
 /work/
+/upload/
 /docs/_build/
 /kyuubi-common/metrics/
 /kyuubi-server/metrics/
diff --git a/docs/configuration/settings.md b/docs/configuration/settings.md
index 68014a5cf3..525316b8e0 100644
--- a/docs/configuration/settings.md
+++ b/docs/configuration/settings.md
@@ -433,24 +433,25 @@ You can configure the Kyuubi properties in `$KYUUBI_HOME/conf/kyuubi-defaults.co
 ### Server
-| Key | Default | Meaning | Type | Since |
-|----------------------------------------------------------|-------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|----------|-------|
-| kyuubi.server.administrators || Comma-separated list of Kyuubi service administrators. We use this config to grant admin permission to any service accounts when security mechanism is enabled. Note, when kyuubi.authentication is configured to NOSASL or NONE, everyone is treated as administrator. | set | 1.8.0 |
-| kyuubi.server.info.provider | ENGINE | The server information provider name, some clients may rely on this information to check the server compatibilities and functionalities. <li>SERVER: Return Kyuubi server information.</li> <li>ENGINE: Return Kyuubi engine information.</li> | string | 1.6.1 |
-| kyuubi.server.limit.batch.connections.per.ipaddress | <undefined> | Maximum kyuubi server batch connections per ipaddress. Any user exceeding this limit will not be allowed to connect. | int | 1.7.0 |
-| kyuubi.server.limit.batch.connections.per.user | <undefined> | Maximum kyuubi server batch connections per user. Any user exceeding this limit will not be allowed to connect. | int | 1.7.0 |
-| kyuubi.server.limit.batch.connections.per.user.ipaddress | <undefined> | Maximum kyuubi server batch connections per user:ipaddress combination. Any user-ipaddress exceeding this limit will not be allowed to connect. | int | 1.7.0 |
-| kyuubi.server.limit.client.fetch.max.rows | <undefined> | Max rows limit for getting result row set operation. If the max rows specified by client-side is larger than the limit, request will fail directly. | int | 1.8.0 |
-| kyuubi.server.limit.connections.ip.deny.list || The client ip in the deny list will be denied to connect to kyuubi server. | set | 1.9.1 |
-| kyuubi.server.limit.connections.per.ipaddress | <undefined> | Maximum kyuubi server connections per ipaddress. Any user exceeding this limit will not be allowed to connect. | int | 1.6.0 |
-| kyuubi.server.limit.connections.per.user | <undefined> | Maximum kyuubi server connections per user. Any user exceeding this limit will not be allowed to connect. | int | 1.6.0 |
-| kyuubi.server.limit.connections.per.user.ipaddress | <undefined> | Maximum kyuubi server connections per user:ipaddress combination. Any user-ipaddress exceeding this limit will not be allowed to connect. | int | 1.6.0 |
-| kyuubi.server.limit.connections.user.deny.list || The user in the deny list will be denied to connect to kyuubi server, if the user has configured both user.unlimited.list and user.deny.list, the priority of the latter is higher. | set | 1.8.0 |
-| kyuubi.server.limit.connections.user.unlimited.list || The maximum connections of the user in the white list will not be limited. | set | 1.7.0 |
-| kyuubi.server.name | <undefined> | The name of Kyuubi Server. | string | 1.5.0 |
-| kyuubi.server.periodicGC.interval | PT30M | How often to trigger a garbage collection. | duration | 1.7.0 |
-| kyuubi.server.redaction.regex | <undefined> | Regex to decide which Kyuubi contain sensitive information. When this regex matches a property key or value, the value is redacted from the various logs. || 1.6.0 |
-| kyuubi.server.thrift.resultset.default.fetch.size | 1000 | The number of rows sent in one Fetch RPC call by the server to the client, if not specified by the client. Respect `hive.server2.thrift.resultset.default.fetch.size` hive conf. | int | 1.9.1 |
+| Key | Default | Meaning | Type | Since |
+|----------------------------------------------------------|-------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|----------|--------|
+| kyuubi.server.administrators || Comma-separated list of Kyuubi service administrators. We use this config to grant admin permission to any service accounts when security mechanism is enabled. Note, when kyuubi.authentication is configured to NOSASL or NONE, everyone is treated as administrator. | set | 1.8.0 |
+| kyuubi.server.info.provider | ENGINE | The server information provider name, some clients may rely on this information to check the server compatibilities and functionalities. <li>SERVER: Return Kyuubi server information.</li> <li>ENGINE: Return Kyuubi engine information.</li> | string | 1.6.1 |
+| kyuubi.server.limit.batch.connections.per.ipaddress | <undefined> | Maximum kyuubi server batch connections per ipaddress. Any user exceeding this limit will not be allowed to connect. | int | 1.7.0 |
+| kyuubi.server.limit.batch.connections.per.user | <undefined> | Maximum kyuubi server batch connections per user. Any user exceeding this limit will not be allowed to connect. | int | 1.7.0 |
+| kyuubi.server.limit.batch.connections.per.user.ipaddress | <undefined> | Maximum kyuubi server batch connections per user:ipaddress combination. Any user-ipaddress exceeding this limit will not be allowed to connect. | int | 1.7.0 |
+| kyuubi.server.limit.client.fetch.max.rows | <undefined> | Max rows limit for getting result row set operation. If the max rows specified by client-side is larger than the limit, request will fail directly. | int | 1.8.0 |
+| kyuubi.server.limit.connections.ip.deny.list || The client ip in the deny list will be denied to connect to kyuubi server. | set | 1.9.1 |
+| kyuubi.server.limit.connections.per.ipaddress | <undefined> | Maximum kyuubi server connections per ipaddress. Any user exceeding this limit will not be allowed to connect. | int | 1.6.0 |
+| kyuubi.server.limit.connections.per.user | <undefined> | Maximum kyuubi server connections per user. Any user exceeding this limit will not be allowed to connect. | int | 1.6.0 |
+| kyuubi.server.limit.connections.per.user.ipaddress | <undefined> | Maximum kyuubi server connections per user:ipaddress combination. Any user-ipaddress exceeding this limit will not be allowed to connect. | int | 1.6.0 |
+| kyuubi.server.limit.connections.user.deny.list || The user in the deny list will be denied to connect to kyuubi server, if the user has configured both user.unlimited.list and user.deny.list, the priority of the latter is higher. | set | 1.8.0 |
+| kyuubi.server.limit.connections.user.unlimited.list || The maximum connections of the user in the white list will not be limited. | set | 1.7.0 |
+| kyuubi.server.name | <undefined> | The name of Kyuubi Server. | string | 1.5.0 |
+| kyuubi.server.periodicGC.interval | PT30M | How often to trigger a garbage collection. | duration | 1.7.0 |
+| kyuubi.server.redaction.regex | <undefined> | Regex to decide which Kyuubi contain sensitive information. When this regex matches a property key or value, the value is redacted from the various logs. || 1.6.0 |
+| kyuubi.server.tempFile.expireTime | P14D | Expiration timeout for cleanup server-side temporary files, e.g. operation logs. | duration | 1.10.0 |
+| kyuubi.server.thrift.resultset.default.fetch.size | 1000 | The number of rows sent in one Fetch RPC call by the server to the client, if not specified by the client. Respect `hive.server2.thrift.resultset.default.fetch.size` hive conf. | int | 1.9.1 |
 ### Session
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala
index d58a22e45a..771bb65ee2 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala
@@ -43,6 +43,7 @@ import org.apache.kyuubi.operation.{ArrayFetchIterator, OperationHandle, Operati
 import org.apache.kyuubi.operation.OperationState.OperationState
 import org.apache.kyuubi.operation.log.OperationLog
 import org.apache.kyuubi.session.Session
+import org.apache.kyuubi.util.TempFileCleanupUtils
 import org.apache.kyuubi.util.reflect.DynFields
 class ExecutePython(
@@ -398,7 +399,7 @@ object ExecutePython extends Logging {
     val source = getClass.getClassLoader.getResourceAsStream(s"python/$pyfile")
     val file = new File(pythonPath.toFile, pyfile)
-    file.deleteOnExit()
+    TempFileCleanupUtils.deleteOnExit(file)
     val sink = new FileOutputStream(file)
     val buf = new Array[Byte](1024)
diff --git a/kyuubi-common/pom.xml b/kyuubi-common/pom.xml
index 747351e41b..57c5a27fdf 100644
--- a/kyuubi-common/pom.xml
+++ b/kyuubi-common/pom.xml
@@ -123,6 +123,11 @@
             <artifactId>HikariCP</artifactId>
         </dependency>
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+        </dependency>
+
         <dependency>
             <groupId>org.apache.kyuubi</groupId>
             <artifactId>kyuubi-util-scala_${scala.binary.version}</artifactId>
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala
index 326b1601ff..f58bff3a37 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala
@@ -38,6 +38,7 @@ import org.apache.hadoop.util.ShutdownHookManager
 import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.config.internal.Tests.IS_TESTING
+import org.apache.kyuubi.util.TempFileCleanupUtils
 import org.apache.kyuubi.util.command.CommandLineUtils._
 object Utils extends Logging {
@@ -138,6 +139,10 @@ object Utils extends Logging {
    * Delete a directory recursively.
    */
   def deleteDirectoryRecursively(f: File, ignoreException: Boolean = true): Unit = {
+    if (f == null || !f.exists()) {
+      return
+    }
+
     if (f.isDirectory) {
       val files = f.listFiles
       if (files != null && files.nonEmpty) {
@@ -164,7 +169,7 @@ object Utils extends Logging {
       prefix: String = "kyuubi",
       root: String = System.getProperty("java.io.tmpdir")): Path = {
     val dir = createDirectory(root, prefix)
-    dir.toFile.deleteOnExit()
+    TempFileCleanupUtils.deleteOnExit(dir)
     dir
   }
@@ -211,9 +216,8 @@ object Utils extends Logging {
       } finally {
         source.close()
       }
-      val file = filePath.toFile
-      file.deleteOnExit()
-      file
+      TempFileCleanupUtils.deleteOnExit(filePath)
+      filePath.toFile
     } catch {
       case e: Exception =>
         error(
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index a88b5f615e..ebb28d4150 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -3113,6 +3113,14 @@ object KyuubiConf {
       .timeConf
       .createWithDefaultString("PT30M")
+  val SERVER_TEMP_FILE_EXPIRE_TIME: ConfigEntry[Long] =
+    buildConf("kyuubi.server.tempFile.expireTime")
+      .doc("Expiration timeout for cleanup server-side temporary files, e.g. operation logs.")
+      .version("1.10.0")
+      .serverOnly
+      .timeConf
+      .createWithDefaultString("P14D")
+
   val SERVER_ADMINISTRATORS: ConfigEntry[Set[String]] =
     buildConf("kyuubi.server.administrators")
       .doc("Comma-separated list of Kyuubi service administrators. " +
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/OperationLog.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/OperationLog.scala
index b3bd46d35a..e77a726d8c 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/OperationLog.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/OperationLog.scala
@@ -31,7 +31,7 @@ import org.apache.kyuubi.operation.FetchOrientation.{FETCH_FIRST, FETCH_NEXT, Fe
 import org.apache.kyuubi.operation.OperationHandle
 import org.apache.kyuubi.session.Session
 import org.apache.kyuubi.shaded.hive.service.rpc.thrift.{TColumn, TRow, TRowSet, TStringColumn}
-import org.apache.kyuubi.util.ThriftUtils
+import org.apache.kyuubi.util.{TempFileCleanupUtils, ThriftUtils}
 object OperationLog extends Logging {
   final private val OPERATION_LOG: InheritableThreadLocal[OperationLog] = {
@@ -49,19 +49,23 @@ object OperationLog extends Logging {
   def removeCurrentOperationLog(): Unit = OPERATION_LOG.remove()
   /**
-   * The operation log root directory, this directory will delete when JVM exit.
+   * The operation log root directory, this directory will be deleted
+   * either after the duration of `kyuubi.server.tempFile.expireTime`
+   * or when the JVM exits.
    */
-  def createOperationLogRootDirectory(session: Session): Unit = {
-    session.sessionManager.operationLogRoot.foreach { operationLogRoot =>
+  def createOperationLogRootDirectory(session: Session): Path = {
+    session.sessionManager.operationLogRoot.map { operationLogRoot =>
       val path = Paths.get(operationLogRoot, session.handle.identifier.toString)
       try {
         Files.createDirectories(path)
-        path.toFile.deleteOnExit()
+        TempFileCleanupUtils.deleteOnExit(path)
+        path
       } catch {
         case e: IOException =>
           error(s"Failed to create operation log root directory: $path", e)
+          null
       }
-    }
+    }.orNull
   }
   /**
/**
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/TempFileService.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/TempFileService.scala
new file mode 100644
index 0000000000..a53259e7a1
--- /dev/null
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/TempFileService.scala
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.service
+
+import java.nio.file.{Path, Paths}
+import java.util.concurrent.TimeUnit
+import java.util.concurrent.atomic.AtomicLong
+
+import com.google.common.cache.{Cache, CacheBuilder, RemovalNotification}
+
+import org.apache.kyuubi.Utils
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.service.TempFileService.tempFileCounter
+import org.apache.kyuubi.util.{TempFileCleanupUtils, ThreadUtils}
+
+class TempFileService(name: String) extends AbstractService(name) {
+  def this() = this(classOf[TempFileService].getSimpleName)
+
+  final private var expiringFiles: Cache[String, String] = _
+  private lazy val cleanupScheduler =
+    ThreadUtils.newDaemonSingleThreadScheduledExecutor(s"$name-cleanup-scheduler")
+
+  override def initialize(conf: KyuubiConf): Unit = {
+    super.initialize(conf)
+    val expireTimeInMs = conf.get(KyuubiConf.SERVER_TEMP_FILE_EXPIRE_TIME)
+    expiringFiles = CacheBuilder.newBuilder()
+      .expireAfterWrite(expireTimeInMs, TimeUnit.MILLISECONDS)
+      .removalListener((notification: RemovalNotification[String, String]) => {
+        val pathStr = notification.getValue
+        debug(s"Remove expired temp file: $pathStr")
+        cleanupFilePath(pathStr)
+      })
+      .build[String, String]()
+
+    cleanupScheduler.scheduleAtFixedRate(
+      () => expiringFiles.cleanUp(),
+      0,
+      Math.max(expireTimeInMs / 10, 100),
+      TimeUnit.MILLISECONDS)
+  }
+
+  override def stop(): Unit = {
+    expiringFiles.asMap().values().forEach(cleanupFilePath)
+    super.stop()
+  }
+
+  private def cleanupFilePath(pathStr: String): Unit = {
+    try {
+      val path = Paths.get(pathStr)
+      TempFileCleanupUtils.cancelDeleteOnExit(path)
+      Utils.deleteDirectoryRecursively(path.toFile)
+    } catch {
+      case e: Throwable => error(s"Failed to delete file $pathStr", e)
+    }
+  }
+
+  /**
+   * Add the file path to the expiration list,
+   * ensuring the path will be deleted
+   * either after the expiration time
+   * or on JVM exit.
+   *
+   * @param path the path of file or directory
+   */
+  def addPathToExpiration(path: Path): Unit = {
+    require(path != null)
+    expiringFiles.put(
+      s"${tempFileCounter.incrementAndGet()}-${System.currentTimeMillis()}",
+      path.toString)
+    TempFileCleanupUtils.deleteOnExit(path)
+  }
+}
+
+object TempFileService {
+  private lazy val tempFileCounter = new AtomicLong(0)
+}
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/session/AbstractSession.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/session/AbstractSession.scala
index 14e59078fb..1fe5188ad6 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/session/AbstractSession.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/session/AbstractSession.scala
@@ -17,6 +17,7 @@
 package org.apache.kyuubi.session
+import java.nio.file.Path
 import java.util.concurrent.ConcurrentHashMap
 import scala.collection.JavaConverters._
@@ -259,8 +260,10 @@ abstract class AbstractSession(
     }
   }
+  protected var operationalLogRootDir: Option[Path] = None
+
   override def open(): Unit = {
-    OperationLog.createOperationLogRootDirectory(this)
+    operationalLogRootDir = Option(OperationLog.createOperationLogRootDirectory(this))
   }
   val isForAliveProbe: Boolean =
val isForAliveProbe: Boolean =
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/util/TempFileCleanupUtils.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/util/TempFileCleanupUtils.scala
new file mode 100644
index 0000000000..9e27fe4269
--- /dev/null
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/util/TempFileCleanupUtils.scala
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.util
+
+import java.io.File
+import java.nio.file.{Path, Paths}
+import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.atomic.AtomicBoolean
+
+import org.apache.kyuubi.Utils
+
+object TempFileCleanupUtils {
+  private lazy val deleteTargets = ConcurrentHashMap.newKeySet[String]()
+
+  private lazy val isCleanupShutdownHookInstalled = {
+    installFilesCleanupOnExitShutdownHook()
+    new AtomicBoolean(true)
+  }
+
+  private def installFilesCleanupOnExitShutdownHook(): Unit = {
+    Utils.addShutdownHook(() => {
+      deleteTargets.forEach { pathStr =>
+        try {
+          Utils.deleteDirectoryRecursively(Paths.get(pathStr).toFile)
+        } catch {
+          case _: Exception =>
+        }
+      }
+      deleteTargets.clear()
+    })
+  }
+
+  def deleteOnExit(file: File): Unit = {
+    require(file != null)
+    deleteOnExit(file.toPath)
+  }
+
+  def deleteOnExit(path: Path): Unit = {
+    require(path != null)
+    isCleanupShutdownHookInstalled.get()
+    deleteTargets.add(path.toString)
+  }
+
+  def cancelDeleteOnExit(path: Path): Unit = {
+    require(path != null)
+    isCleanupShutdownHookInstalled.get()
+    deleteTargets.remove(path.toString)
+  }
+
+}
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala
index ace7ba9d46..f8c9ffa1b4 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala
@@ -32,7 +32,7 @@ import org.apache.kyuubi.ha.HighAvailabilityConf._
 import org.apache.kyuubi.ha.client.{AuthTypes, ServiceDiscovery}
 import org.apache.kyuubi.metrics.{MetricsConf, MetricsSystem}
 import org.apache.kyuubi.server.metadata.jdbc.JDBCMetadataStoreConf
-import org.apache.kyuubi.service.{AbstractBackendService, AbstractFrontendService, Serverable, ServiceState}
+import org.apache.kyuubi.service.{AbstractBackendService, AbstractFrontendService, Serverable, ServiceState, TempFileService}
 import org.apache.kyuubi.session.KyuubiSessionManager
 import org.apache.kyuubi.util.{KyuubiHadoopUtils, SignalRegister}
 import org.apache.kyuubi.zookeeper.EmbeddedZookeeper
@@ -202,6 +202,8 @@ class KyuubiServer(name: String) extends Serverable(name) {
         throw new UnsupportedOperationException(s"Frontend protocol $other is not supported yet.")
     }
+  final var tempFileService: TempFileService = _
+
   override def initialize(conf: KyuubiConf): Unit = synchronized {
     val kinit = new KinitAuxiliaryService()
     addService(kinit)
@@ -209,6 +211,9 @@ class KyuubiServer(name: String) extends Serverable(name) {
     val periodicGCService = new PeriodicGCService
     addService(periodicGCService)
+    tempFileService = new TempFileService
+    addService(tempFileService)
+
     if (conf.get(MetricsConf.METRICS_ENABLED)) {
       addService(new MetricsSystem)
     }
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
index b5e98845e6..de69cf3771 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
@@ -44,6 +44,7 @@ import org.apache.kyuubi.config.KyuubiReservedKeys._
 import org.apache.kyuubi.engine.{ApplicationInfo, ApplicationManagerInfo, ApplicationState, KillResponse, KyuubiApplicationManager}
 import org.apache.kyuubi.operation.{BatchJobSubmission, FetchOrientation, OperationState}
 import org.apache.kyuubi.server.KyuubiServer
+import org.apache.kyuubi.server.KyuubiServer.kyuubiServer
 import org.apache.kyuubi.server.api.ApiRequestContext
 import org.apache.kyuubi.server.api.v1.BatchesResource._
 import org.apache.kyuubi.server.metadata.MetadataManager
@@ -568,6 +569,7 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {
       uploadFileFolderPath: JPath): Unit = {
     try {
       val tempFile = Utils.writeToTempFile(inputStream, uploadFileFolderPath, fileName)
+      kyuubiServer.tempFileService.addPathToExpiration(tempFile.toPath)
       request.setResource(tempFile.getPath)
     } catch {
       case e: Exception =>
@@ -599,10 +601,12 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {
     val tempFilePaths = fileParts.map { filePart =>
       val fileName = filePart.getContentDisposition.getFileName
       try {
-        Utils.writeToTempFile(
+        val tempFile = Utils.writeToTempFile(
           filePart.getValueAs(classOf[InputStream]),
           uploadFileFolderPath,
-          fileName).getPath
+          fileName)
+        kyuubiServer.tempFileService.addPathToExpiration(tempFile.toPath)
+        tempFile.getPath
       } catch {
         case e: Exception =>
           throw new RuntimeException(
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
index d0e8e042f7..6b4e2a2e7d 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
@@ -118,6 +118,8 @@ class KyuubiSessionImpl(
     // we should call super.open before running launch engine operation
     super.open()
+    operationalLogRootDir.foreach(sessionManager.tempFileService.addPathToExpiration)
+
     runOperation(launchEngineOp)
   }
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
index 9edc8218eb..a20b4bc97c 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
@@ -36,8 +36,10 @@ import org.apache.kyuubi.metrics.MetricsConstants._
 import org.apache.kyuubi.metrics.MetricsSystem
 import org.apache.kyuubi.operation.{KyuubiOperationManager, OperationState}
 import org.apache.kyuubi.plugin.{GroupProvider, PluginLoader, SessionConfAdvisor}
+import org.apache.kyuubi.server.KyuubiServer.kyuubiServer
 import org.apache.kyuubi.server.metadata.{MetadataManager, MetadataRequestsRetryRef}
 import org.apache.kyuubi.server.metadata.api.{Metadata, MetadataFilter}
+import org.apache.kyuubi.service.TempFileService
 import org.apache.kyuubi.shaded.hive.service.rpc.thrift.TProtocolVersion
 import org.apache.kyuubi.sql.parser.server.KyuubiParser
 import org.apache.kyuubi.util.{SignUtils, ThreadUtils}
@@ -71,6 +73,8 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) {
   private val engineConnectionAliveChecker =
     ThreadUtils.newDaemonSingleThreadScheduledExecutor(s"$name-engine-alive-checker")
+  def tempFileService: TempFileService = kyuubiServer.tempFileService
+
   override def initialize(conf: KyuubiConf): Unit = {
     this.conf = conf
     addService(applicationManager)
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/TempFileServiceSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/TempFileServiceSuite.scala
new file mode 100644
index 0000000000..4b7568c1c7
--- /dev/null
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/TempFileServiceSuite.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.server
+
+import java.io.ByteArrayInputStream
+import java.time.Duration
+
+import org.scalatest.concurrent.PatienceConfiguration.Timeout
+import org.scalatest.time.SpanSugar.convertIntToGrainOfTime
+
+import org.apache.kyuubi.{Utils, WithKyuubiServer}
+import org.apache.kyuubi.Utils.writeToTempFile
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.config.KyuubiConf.SERVER_TEMP_FILE_EXPIRE_TIME
+
+class TempFileServiceSuite extends WithKyuubiServer {
+  private val expirationInMs = 100
+
+  override protected val conf: KyuubiConf = KyuubiConf()
+    .set(SERVER_TEMP_FILE_EXPIRE_TIME, Duration.ofMillis(expirationInMs).toMillis)
+
+  test("file cleaned up after expiration") {
+    val tempFileService = KyuubiServer.kyuubiServer.tempFileService
+    (0 until 3).map { i =>
+      val dir = Utils.createTempDir()
+      writeToTempFile(new ByteArrayInputStream(s"$i".getBytes()), dir, s"$i.txt")
+      dir.toFile
+    }.map { dirFile =>
+      assert(dirFile.exists())
+      tempFileService.addPathToExpiration(dirFile.toPath)
+      dirFile
+    }.foreach { f =>
+      eventually(Timeout((expirationInMs * 2).millis)) {
+        assert(!f.exists())
+      }
+    }
+  }
+}
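The expiration that `TempFileServiceSuite` exercises can be modelled without Guava. The sketch below is a simplified, hypothetical stand-in for `TempFileService`: the class `ExpiringPaths`, its `sweep()` method and the injectable clock are illustrative names, not Kyuubi APIs, and explicit per-path deadlines replace Guava's `expireAfterWrite` cache. It keeps the service's sweep interval of `max(expireTime / 10, 100)` milliseconds; with the default `P14D` (1,209,600,000 ms) that is 120,960,000 ms, i.e. a sweep roughly every 1.4 days.

```scala
import java.io.File
import java.nio.file.Path
import java.util.concurrent.ConcurrentHashMap

// Hypothetical model of TempFileService's expiration (not Kyuubi code): each
// registered path gets a deadline, and a periodic sweep deletes and evicts every
// path whose deadline has passed. The clock is injectable for deterministic tests.
class ExpiringPaths(expireTimeMs: Long, now: () => Long = () => System.currentTimeMillis()) {
  private val deadlines = new ConcurrentHashMap[Path, java.lang.Long]()

  // How often a scheduler would call sweep(), mirroring max(expireTime / 10, 100 ms).
  val sweepIntervalMs: Long = math.max(expireTimeMs / 10, 100L)

  // Register (or re-register) a path; its deadline becomes now + expireTimeMs.
  def add(path: Path): Unit = deadlines.put(path, java.lang.Long.valueOf(now() + expireTimeMs))

  def size: Int = deadlines.size()

  // Delete and evict every expired path, returning the paths that were removed.
  def sweep(): Seq[Path] = {
    val t = now()
    val removed = Seq.newBuilder[Path]
    deadlines.forEach { (p: Path, deadline: java.lang.Long) =>
      // remove(key, value) only succeeds if the deadline was not refreshed concurrently.
      if (deadline.longValue <= t && deadlines.remove(p, deadline)) {
        deleteRecursively(p.toFile)
        removed += p
      }
    }
    removed.result()
  }

  private def deleteRecursively(f: File): Unit = {
    if (f.isDirectory) Option(f.listFiles).foreach(_.foreach(deleteRecursively))
    f.delete()
  }
}
```

Keying by path means re-adding a path only refreshes its deadline; `addPathToExpiration` in the diff instead keys Guava's cache by a counter-plus-timestamp string, so each call creates an independent entry.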