This is an automated email from the ASF dual-hosted git repository.

lsm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 8056235ec [KYUUBI #6696] Fix Trino Status Printer to Prevent Thread Leak
8056235ec is described below

commit 8056235ec150f24a539c831973e7f36fc8bb6c0b
Author: senmiaoliu <[email protected]>
AuthorDate: Thu Sep 19 11:02:20 2024 +0800

    [KYUUBI #6696] Fix Trino Status Printer to Prevent Thread Leak
    
    # :mag: Description
    ## Issue References 🔗
    
    ## Describe Your Solution ๐Ÿ”ง
    
    - use `newDaemonSingleThreadScheduledExecutor` to avoid leaking the `timer` thread
    - reduce duplicate output of unchanged status info
    
    ## Types of changes :bookmark:
    
    - [x] Bugfix (non-breaking change which fixes an issue)
    - [ ] New feature (non-breaking change which adds functionality)
    - [ ] Breaking change (fix or feature that would cause existing functionality to change)
    
    ## Test Plan 🧪
    
    #### Behavior Without This Pull Request :coffin:
    
    #### Behavior With This Pull Request :tada:
    
    #### Related Unit Tests
    
    ---
    
    # Checklist 📝
    
    - [ ] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html)
    
    **Be nice. Be informative.**
    
    Closes #6696 from lsm1/branch-fix-trino-printer.
    
    Closes #6696
    
    01f917cb7 [senmiaoliu] fix style
    0d20fd1f9 [senmiaoliu] fix trino info printer thread leak
    
    Authored-by: senmiaoliu <[email protected]>
    Signed-off-by: senmiaoliu <[email protected]>
---
 .../kyuubi/engine/trino/TrinoStatement.scala       | 29 ++++++++++++++--------
 .../kyuubi/engine/trino/TrinoStatusPrinter.scala   | 12 +++++++--
 .../engine/trino/operation/ExecuteStatement.scala  |  1 +
 3 files changed, 29 insertions(+), 13 deletions(-)

diff --git 
a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/TrinoStatement.scala
 
b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/TrinoStatement.scala
index 51deee84a..b8d0bba31 100644
--- 
a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/TrinoStatement.scala
+++ 
b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/TrinoStatement.scala
@@ -17,8 +17,8 @@
 
 package org.apache.kyuubi.engine.trino
 
-import java.util.{Timer, TimerTask}
-import java.util.concurrent.Executors
+import java.util.OptionalDouble
+import java.util.concurrent.{Executors, TimeUnit}
 
 import scala.annotation.tailrec
 import scala.collection.JavaConverters._
@@ -38,6 +38,7 @@ import 
org.apache.kyuubi.config.KyuubiConf.ENGINE_TRINO_SHOW_PROGRESS
 import org.apache.kyuubi.config.KyuubiConf.ENGINE_TRINO_SHOW_PROGRESS_DEBUG
 import org.apache.kyuubi.engine.trino.TrinoConf.DATA_PROCESSING_POOL_SIZE
 import org.apache.kyuubi.operation.log.OperationLog
+import org.apache.kyuubi.util.ThreadUtils
 
 /**
  * Trino client communicate with trino cluster.
@@ -58,7 +59,9 @@ class TrinoStatement(
   private lazy val showProcess = kyuubiConf.get(ENGINE_TRINO_SHOW_PROGRESS)
   private lazy val showDebug = kyuubiConf.get(ENGINE_TRINO_SHOW_PROGRESS_DEBUG)
 
-  private lazy val timer = new Timer("refresh status info", true)
+  private val timer =
+    ThreadUtils.newDaemonSingleThreadScheduledExecutor("Trino-Status-Printer", 
false)
+  private var lastStats: OptionalDouble = OptionalDouble.empty()
 
   implicit val ec: ExecutionContext =
     
ExecutionContext.fromExecutor(Executors.newFixedThreadPool(dataProcessingPoolSize))
@@ -105,7 +108,7 @@ class TrinoStatement(
             getData()
           }
         } else {
-          timer.cancel()
+          timer.shutdown()
           Verify.verify(trino.isFinished)
           if (operationLog.isDefined && showProcess) {
             TrinoStatusPrinter.printStatusInfo(trino, operationLog.get, 
showDebug)
@@ -153,18 +156,22 @@ class TrinoStatement(
   }
   def printStatusInfo(): Unit = {
     if (operationLog.isDefined && showProcess) {
-      timer.schedule(
-        new TimerTask {
-          override def run(): Unit = {
-            if (trino.isRunning) {
-              TrinoStatusPrinter.printStatusInfo(trino, operationLog.get, 
showDebug)
-            }
+      timer.scheduleWithFixedDelay(
+        () => {
+          if (trino.isRunning) {
+            lastStats =
+              TrinoStatusPrinter.printStatusInfo(trino, operationLog.get, 
showDebug, lastStats)
           }
         },
         500L,
-        kyuubiConf.get(KyuubiConf.ENGINE_TRINO_SHOW_PROGRESS_UPDATE_INTERVAL))
+        kyuubiConf.get(KyuubiConf.ENGINE_TRINO_SHOW_PROGRESS_UPDATE_INTERVAL),
+        TimeUnit.MILLISECONDS)
     }
   }
+
+  def stopPrinter(): Unit = {
+    timer.shutdown()
+  }
 }
 
 object TrinoStatement {
diff --git 
a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/TrinoStatusPrinter.scala
 
b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/TrinoStatusPrinter.scala
index 2654f5413..70a4d88aa 100644
--- 
a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/TrinoStatusPrinter.scala
+++ 
b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/TrinoStatusPrinter.scala
@@ -17,6 +17,7 @@
 
 package org.apache.kyuubi.engine.trino
 
+import java.util.OptionalDouble
 import java.util.concurrent.TimeUnit._
 
 import io.airlift.units.DataSize
@@ -35,7 +36,8 @@ object TrinoStatusPrinter {
   def printStatusInfo(
       client: StatementClient,
       operationLog: OperationLog,
-      debug: Boolean = false): Unit = {
+      debug: Boolean = false,
+      lastStats: OptionalDouble = null): OptionalDouble = {
     val out = new TrinoConsoleProgressBar(operationLog)
     val results =
       if (client.isRunning) {
@@ -46,11 +48,16 @@ object TrinoStatusPrinter {
 
     val stats = results.getStats
 
+    if (lastStats != null &&
+      stats.getProgressPercentage.equals(lastStats)) {
+      return lastStats
+    }
+
     val wallTime = Duration.succinctDuration(stats.getElapsedTimeMillis(), 
MILLISECONDS)
 
     val nodes = stats.getNodes
     if ((nodes == 0) || (stats.getTotalSplits == 0)) {
-      return
+      return stats.getProgressPercentage
     }
 
     // Query 12, FINISHED, 1 node
@@ -122,6 +129,7 @@ object TrinoStatusPrinter {
       s"[${formatCountRate(stats.getProcessedRows(), wallTime, false)} rows/s, 
" +
       s"${formatDataRate(DataSize.ofBytes(stats.getProcessedBytes()), 
wallTime, true)}]"
     out.printLine(statsLine)
+    stats.getProgressPercentage
   }
 
   def percentage(count: Double, total: Double): Int = {
diff --git 
a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/ExecuteStatement.scala
 
b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/ExecuteStatement.scala
index 0ba2297c3..250b8d64b 100644
--- 
a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/ExecuteStatement.scala
+++ 
b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/ExecuteStatement.scala
@@ -127,6 +127,7 @@ class ExecuteStatement(
     } catch {
       onError(cancel = true)
     } finally {
+      trinoStatement.stopPrinter()
       shutdownTimeoutMonitor()
     }
   }

Reply via email to