This is an automated email from the ASF dual-hosted git repository.
lsm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 8056235ec [KYUUBI #6696] Fix Trino Status Printer to Prevent Thread
Leak
8056235ec is described below
commit 8056235ec150f24a539c831973e7f36fc8bb6c0b
Author: senmiaoliu <[email protected]>
AuthorDate: Thu Sep 19 11:02:20 2024 +0800
[KYUUBI #6696] Fix Trino Status Printer to Prevent Thread Leak
# :mag: Description
## Issue References ๐
## Describe Your Solution ๐ง
- use `newDaemonSingleThreadScheduledExecutor` avoid `timer` thread leak
- reduce same status info out
## Types of changes :bookmark:
- [x] Bugfix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing
functionality to change)
## Test Plan ๐งช
#### Behavior Without This Pull Request :coffin:
#### Behavior With This Pull Request :tada:
#### Related Unit Tests
---
# Checklist ๐
- [ ] This patch was not authored or co-authored using [Generative
Tooling](https://www.apache.org/legal/generative-tooling.html)
**Be nice. Be informative.**
Closes #6696 from lsm1/branch-fix-trino-printer.
Closes #6696
01f917cb7 [senmiaoliu] fix style
0d20fd1f9 [senmiaoliu] fix trino info printer thread leak
Authored-by: senmiaoliu <[email protected]>
Signed-off-by: senmiaoliu <[email protected]>
---
.../kyuubi/engine/trino/TrinoStatement.scala | 29 ++++++++++++++--------
.../kyuubi/engine/trino/TrinoStatusPrinter.scala | 12 +++++++--
.../engine/trino/operation/ExecuteStatement.scala | 1 +
3 files changed, 29 insertions(+), 13 deletions(-)
diff --git
a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/TrinoStatement.scala
b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/TrinoStatement.scala
index 51deee84a..b8d0bba31 100644
---
a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/TrinoStatement.scala
+++
b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/TrinoStatement.scala
@@ -17,8 +17,8 @@
package org.apache.kyuubi.engine.trino
-import java.util.{Timer, TimerTask}
-import java.util.concurrent.Executors
+import java.util.OptionalDouble
+import java.util.concurrent.{Executors, TimeUnit}
import scala.annotation.tailrec
import scala.collection.JavaConverters._
@@ -38,6 +38,7 @@ import
org.apache.kyuubi.config.KyuubiConf.ENGINE_TRINO_SHOW_PROGRESS
import org.apache.kyuubi.config.KyuubiConf.ENGINE_TRINO_SHOW_PROGRESS_DEBUG
import org.apache.kyuubi.engine.trino.TrinoConf.DATA_PROCESSING_POOL_SIZE
import org.apache.kyuubi.operation.log.OperationLog
+import org.apache.kyuubi.util.ThreadUtils
/**
* Trino client communicate with trino cluster.
@@ -58,7 +59,9 @@ class TrinoStatement(
private lazy val showProcess = kyuubiConf.get(ENGINE_TRINO_SHOW_PROGRESS)
private lazy val showDebug = kyuubiConf.get(ENGINE_TRINO_SHOW_PROGRESS_DEBUG)
- private lazy val timer = new Timer("refresh status info", true)
+ private val timer =
+ ThreadUtils.newDaemonSingleThreadScheduledExecutor("Trino-Status-Printer",
false)
+ private var lastStats: OptionalDouble = OptionalDouble.empty()
implicit val ec: ExecutionContext =
ExecutionContext.fromExecutor(Executors.newFixedThreadPool(dataProcessingPoolSize))
@@ -105,7 +108,7 @@ class TrinoStatement(
getData()
}
} else {
- timer.cancel()
+ timer.shutdown()
Verify.verify(trino.isFinished)
if (operationLog.isDefined && showProcess) {
TrinoStatusPrinter.printStatusInfo(trino, operationLog.get,
showDebug)
@@ -153,18 +156,22 @@ class TrinoStatement(
}
def printStatusInfo(): Unit = {
if (operationLog.isDefined && showProcess) {
- timer.schedule(
- new TimerTask {
- override def run(): Unit = {
- if (trino.isRunning) {
- TrinoStatusPrinter.printStatusInfo(trino, operationLog.get,
showDebug)
- }
+ timer.scheduleWithFixedDelay(
+ () => {
+ if (trino.isRunning) {
+ lastStats =
+ TrinoStatusPrinter.printStatusInfo(trino, operationLog.get,
showDebug, lastStats)
}
},
500L,
- kyuubiConf.get(KyuubiConf.ENGINE_TRINO_SHOW_PROGRESS_UPDATE_INTERVAL))
+ kyuubiConf.get(KyuubiConf.ENGINE_TRINO_SHOW_PROGRESS_UPDATE_INTERVAL),
+ TimeUnit.MILLISECONDS)
}
}
+
+ def stopPrinter(): Unit = {
+ timer.shutdown()
+ }
}
object TrinoStatement {
diff --git
a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/TrinoStatusPrinter.scala
b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/TrinoStatusPrinter.scala
index 2654f5413..70a4d88aa 100644
---
a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/TrinoStatusPrinter.scala
+++
b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/TrinoStatusPrinter.scala
@@ -17,6 +17,7 @@
package org.apache.kyuubi.engine.trino
+import java.util.OptionalDouble
import java.util.concurrent.TimeUnit._
import io.airlift.units.DataSize
@@ -35,7 +36,8 @@ object TrinoStatusPrinter {
def printStatusInfo(
client: StatementClient,
operationLog: OperationLog,
- debug: Boolean = false): Unit = {
+ debug: Boolean = false,
+ lastStats: OptionalDouble = null): OptionalDouble = {
val out = new TrinoConsoleProgressBar(operationLog)
val results =
if (client.isRunning) {
@@ -46,11 +48,16 @@ object TrinoStatusPrinter {
val stats = results.getStats
+ if (lastStats != null &&
+ stats.getProgressPercentage.equals(lastStats)) {
+ return lastStats
+ }
+
val wallTime = Duration.succinctDuration(stats.getElapsedTimeMillis(),
MILLISECONDS)
val nodes = stats.getNodes
if ((nodes == 0) || (stats.getTotalSplits == 0)) {
- return
+ return stats.getProgressPercentage
}
// Query 12, FINISHED, 1 node
@@ -122,6 +129,7 @@ object TrinoStatusPrinter {
s"[${formatCountRate(stats.getProcessedRows(), wallTime, false)} rows/s,
" +
s"${formatDataRate(DataSize.ofBytes(stats.getProcessedBytes()),
wallTime, true)}]"
out.printLine(statsLine)
+ stats.getProgressPercentage
}
def percentage(count: Double, total: Double): Int = {
diff --git
a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/ExecuteStatement.scala
b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/ExecuteStatement.scala
index 0ba2297c3..250b8d64b 100644
---
a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/ExecuteStatement.scala
+++
b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/ExecuteStatement.scala
@@ -127,6 +127,7 @@ class ExecuteStatement(
} catch {
onError(cancel = true)
} finally {
+ trinoStatement.stopPrinter()
shutdownTimeoutMonitor()
}
}