This is an automated email from the ASF dual-hosted git repository.

feiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 7605900db4 [KYUUBI #7176] Cleanup metadata with batch size and interval
7605900db4 is described below

commit 7605900db40e343569997f232af81dafc30ad79f
Author: Wang, Fei <fwan...@ebay.com>
AuthorDate: Mon Sep 15 10:10:49 2025 -0700

    [KYUUBI #7176] Cleanup metadata with batch size and interval
    
    ### Why are the changes needed?
    
    Clean up metadata in batches, pausing between batches, to avoid holding the lock for a 
long time when the amount of metadata is huge.
    ### How was this patch tested?
    
    GA.
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #7176 from turboFei/clean_batch.
    
    Closes #7176
    
    b4d6a264c [Wang, Fei] fix ut
    61786af3b [Wang, Fei] Merge branch 'master' into clean_batch
    11170756f [Cheng Pan] Update 
kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/MetadataManager.scala
    a5ffc9da9 [Wang, Fei] log for max loops
    b301b527b [Wang, Fei] remove unused method
    d172a61da [Wang, Fei] address all comments
    e4da02369 [Wang, Fei] Save
    c94d25451 [Wang, Fei] saev
    852e624c8 [Wang, Fei] save
    
    Lead-authored-by: Wang, Fei <fwan...@ebay.com>
    Co-authored-by: Cheng Pan <pan3...@gmail.com>
    Signed-off-by: Wang, Fei <fwan...@ebay.com>
---
 docs/configuration/settings.md                     |  1 +
 .../org/apache/kyuubi/config/KyuubiConf.scala      | 18 ++++++++++
 .../kyuubi/server/metadata/MetadataManager.scala   | 40 ++++++++++++++++++++--
 .../kyuubi/server/metadata/MetadataStore.scala     |  6 ++--
 .../server/metadata/jdbc/JDBCMetadataStore.scala   | 17 +++++----
 .../server/metadata/jdbc/JdbcDatabaseDialect.scala | 14 +++++++-
 .../server/metadata/MetadataManagerSuite.scala     | 18 +++++++++-
 .../metadata/jdbc/JDBCMetadataStoreSuite.scala     |  4 +--
 8 files changed, 104 insertions(+), 14 deletions(-)

diff --git a/docs/configuration/settings.md b/docs/configuration/settings.md
index 83485a5217..ab1966cde6 100644
--- a/docs/configuration/settings.md
+++ b/docs/configuration/settings.md
@@ -382,6 +382,7 @@ You can configure the Kyuubi properties in 
`$KYUUBI_HOME/conf/kyuubi-defaults.co
 
 |                       Key                       |                         
Default                          |                                              
                                                                                
                                                                                
                                                                                
           Meaning                                                              
                 [...]
 
|-------------------------------------------------|----------------------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 [...]
+| kyuubi.metadata.cleaner.batch.size              | 1000                       
                              | The batch size for cleaning expired metadata. 
This is used to avoid holding the delete lock for a long time when there are 
too many expired metadata to be cleaned.                                        
                                                                                
                                                                                
                   [...]
 | kyuubi.metadata.cleaner.enabled                 | true                       
                              | Whether to clean the metadata periodically. If 
it is enabled, Kyuubi will clean the metadata that is in the terminate state 
with max age limitation.                                                        
                                                                                
                                                                                
                  [...]
 | kyuubi.metadata.cleaner.interval                | PT30M                      
                              | The interval to check and clean expired 
metadata.                                                                       
                                                                                
                                                                                
                                                                                
                      [...]
 | kyuubi.metadata.max.age                         | PT72H                      
                              | The maximum age of metadata, the metadata 
exceeding the age will be cleaned.                                              
                                                                                
                                                                                
                                                                                
                    [...]
diff --git 
a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala 
b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index 06c8e7a9d5..6586498655 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -2077,6 +2077,24 @@ object KyuubiConf {
       .timeConf
       .createWithDefault(Duration.ofMinutes(30).toMillis)
 
+  val METADATA_CLEANER_BATCH_SIZE: ConfigEntry[Int] =
+    buildConf("kyuubi.metadata.cleaner.batch.size")
+      .serverOnly
+      .doc("The batch size for cleaning expired metadata. " +
+        "This is used to avoid holding the delete lock for a long time " +
+        "when there are too many expired metadata to be cleaned.")
+      .version("1.11.0")
+      .intConf
+      .createWithDefault(1000)
+
+  val METADATA_CLEANER_BATCH_INTERVAL: ConfigEntry[Long] =
+    buildConf("kyuubi.metadata.cleaner.batch.interval")
+      .serverOnly
+      .internal
+      .doc("The interval to wait before next batch of cleaning expired 
metadata.")
+      .timeConf
+      .createWithDefault(Duration.ofSeconds(3).toMillis)
+
   val METADATA_RECOVERY_THREADS: ConfigEntry[Int] =
     buildConf("kyuubi.metadata.recovery.threads")
       .serverOnly
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/MetadataManager.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/MetadataManager.scala
index 522ac32dd6..c5182979ee 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/MetadataManager.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/MetadataManager.scala
@@ -22,6 +22,8 @@ import java.util.concurrent.atomic.AtomicInteger
 
 import scala.collection.JavaConverters._
 
+import com.google.common.annotations.VisibleForTesting
+
 import org.apache.kyuubi.{KyuubiException, Logging}
 import org.apache.kyuubi.client.api.v1.dto.Batch
 import org.apache.kyuubi.config.KyuubiConf
@@ -236,10 +238,11 @@ class MetadataManager extends 
AbstractService("MetadataManager") {
   private def startMetadataCleaner(): Unit = {
     val stateMaxAge = conf.get(METADATA_MAX_AGE)
     val interval = conf.get(KyuubiConf.METADATA_CLEANER_INTERVAL)
+    val batchSize = conf.get(KyuubiConf.METADATA_CLEANER_BATCH_SIZE)
+    val batchInterval = conf.get(KyuubiConf.METADATA_CLEANER_BATCH_INTERVAL)
     val cleanerTask: Runnable = () => {
       try {
-        
withMetadataRequestMetrics(_metadataStore.cleanupMetadataByAge(stateMaxAge))
-        
withMetadataRequestMetrics(_metadataStore.cleanupKubernetesEngineInfoByAge(stateMaxAge))
+        cleanupMetadata(stateMaxAge, batchSize, batchInterval)
       } catch {
         case e: Throwable => error("Error cleaning up the metadata by age", e)
       }
@@ -253,6 +256,39 @@ class MetadataManager extends 
AbstractService("MetadataManager") {
       TimeUnit.MILLISECONDS)
   }
 
+  @VisibleForTesting
+  private[metadata] def cleanupMetadata(maxAge: Long, batchSize: Int, 
batchInterval: Long): Unit = {
+    var needToCleanMetadata = true
+    var needToCleanKubernetesInfo = true
+    var cleanupLoop = 0
+
+    val MAX_CLEANUP_LOOPS = 100 // a guard in case it runs into an infinite 
loop
+    while ((needToCleanMetadata || needToCleanKubernetesInfo) && cleanupLoop < 
MAX_CLEANUP_LOOPS) {
+      cleanupLoop += 1
+      if (needToCleanMetadata) {
+        needToCleanMetadata =
+          withMetadataRequestMetrics(_metadataStore.cleanupMetadataByAge(
+            maxAge,
+            batchSize)) >= batchSize
+      }
+      if (needToCleanKubernetesInfo) {
+        needToCleanKubernetesInfo =
+          
withMetadataRequestMetrics(_metadataStore.cleanupKubernetesEngineInfoByAge(
+            maxAge,
+            batchSize)) >= batchSize
+      }
+      if (needToCleanMetadata || needToCleanKubernetesInfo) {
+        if (cleanupLoop < MAX_CLEANUP_LOOPS) {
+          info(s"Sleep $batchInterval ms before next batch of metadata 
cleaning.")
+          Thread.sleep(batchInterval)
+        } else {
+          warn(s"Metadata cleaning reaches the maximum loop 
$MAX_CLEANUP_LOOPS, " +
+            s"will continue in the next round.")
+        }
+      }
+    }
+  }
+
   def addMetadataRetryRequest(request: MetadataRequest): Unit = {
     val maxRequestsAsyncRetryRefs: Int =
       conf.get(KyuubiConf.METADATA_REQUEST_ASYNC_RETRY_QUEUE_SIZE)
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/MetadataStore.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/MetadataStore.scala
index 46a5505afe..492b286e3b 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/MetadataStore.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/MetadataStore.scala
@@ -102,8 +102,9 @@ trait MetadataStore extends Closeable {
   /**
    * Check and cleanup the terminated batches information with maxAge 
limitation.
    * @param maxAge the batch state info maximum age.
+   * @param limit the maximum number of metadata to be cleaned up.
    */
-  def cleanupMetadataByAge(maxAge: Long): Unit
+  def cleanupMetadataByAge(maxAge: Long, limit: Int): Int
 
   /**
    * Cleanup kubernetes engine info by identifier.
@@ -113,6 +114,7 @@ trait MetadataStore extends Closeable {
   /**
    * Check and cleanup the kubernetes engine info with maxAge limitation.
    * @param maxAge the kubernetes engine info maximum age.
+   * @param limit the maximum number of kubernetes engine info to be cleaned 
up.
    */
-  def cleanupKubernetesEngineInfoByAge(maxAge: Long): Unit
+  def cleanupKubernetesEngineInfoByAge(maxAge: Long, limit: Int): Int
 }
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStore.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStore.scala
index f3c0b2d7de..62395fd644 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStore.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStore.scala
@@ -409,13 +409,15 @@ class JDBCMetadataStore(conf: KyuubiConf) extends 
MetadataStore with Logging {
     }
   }
 
-  override def cleanupMetadataByAge(maxAge: Long): Unit = {
+  override def cleanupMetadataByAge(maxAge: Long, limit: Int): Int = {
     val minEndTime = System.currentTimeMillis() - maxAge
     val query =
-      s"DELETE FROM $METADATA_TABLE WHERE end_time > 0 AND end_time < ? AND 
create_time < ?"
+      s"DELETE FROM $METADATA_TABLE WHERE end_time > 0 AND end_time < ? AND 
create_time < ?" +
+        s" ${dialect.deleteFromLimitClause(limit)}"
     JdbcUtils.withConnection { connection =>
       withUpdateCount(connection, query, minEndTime, minEndTime) { count =>
-        info(s"Cleaned up $count records older than $maxAge ms from 
$METADATA_TABLE.")
+        info(s"Cleaned up $count records older than $maxAge ms from 
$METADATA_TABLE limit:$limit.")
+        count
       }
     }
   }
@@ -462,12 +464,15 @@ class JDBCMetadataStore(conf: KyuubiConf) extends 
MetadataStore with Logging {
     }
   }
 
-  override def cleanupKubernetesEngineInfoByAge(maxAge: Long): Unit = {
+  override def cleanupKubernetesEngineInfoByAge(maxAge: Long, limit: Int): Int 
= {
     val minUpdateTime = System.currentTimeMillis() - maxAge
-    val query = s"DELETE FROM $KUBERNETES_ENGINE_INFO_TABLE WHERE update_time 
< ?"
+    val query = s"DELETE FROM $KUBERNETES_ENGINE_INFO_TABLE WHERE update_time 
< ?" +
+      s" ${dialect.deleteFromLimitClause(limit)}"
     JdbcUtils.withConnection { connection =>
       withUpdateCount(connection, query, minUpdateTime) { count =>
-        info(s"Cleaned up $count records older than $maxAge ms from 
$KUBERNETES_ENGINE_INFO_TABLE.")
+        info(s"Cleaned up $count records older than $maxAge ms from 
$KUBERNETES_ENGINE_INFO_TABLE" +
+          s" limit $limit.")
+        count
       }
     }
   }
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JdbcDatabaseDialect.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JdbcDatabaseDialect.scala
index 24408e1613..9d400a6f16 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JdbcDatabaseDialect.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JdbcDatabaseDialect.scala
@@ -17,8 +17,11 @@
 
 package org.apache.kyuubi.server.metadata.jdbc
 
+import org.apache.kyuubi.Logging
+
 trait JdbcDatabaseDialect {
   def limitClause(limit: Int, offset: Int): String
+  def deleteFromLimitClause(limit: Int): String
   def insertOrReplace(
       table: String,
       cols: Seq[String],
@@ -26,11 +29,16 @@ trait JdbcDatabaseDialect {
       keyVal: String): String
 }
 
-class GenericDatabaseDialect extends JdbcDatabaseDialect {
+class GenericDatabaseDialect extends JdbcDatabaseDialect with Logging {
   override def limitClause(limit: Int, offset: Int): String = {
     s"LIMIT $limit OFFSET $offset"
   }
 
+  override def deleteFromLimitClause(limit: Int): String = {
+    warn("Generic dialect does not support LIMIT in DELETE statements")
+    ""
+  }
+
   override def insertOrReplace(
       table: String,
       cols: Seq[String],
@@ -71,6 +79,10 @@ class MySQLDatabaseDialect extends GenericDatabaseDialect {
        |${cols.filterNot(_ == keyCol).map(c => s"$c = new.$c").mkString(",")}
        |""".stripMargin
   }
+
+  override def deleteFromLimitClause(limit: Int): String = {
+    s"LIMIT $limit"
+  }
 }
 class PostgreSQLDatabaseDialect extends GenericDatabaseDialect {
   override def insertOrReplace(
diff --git 
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/metadata/MetadataManagerSuite.scala
 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/metadata/MetadataManagerSuite.scala
index fe7fa58685..de2e651f44 100644
--- 
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/metadata/MetadataManagerSuite.scala
+++ 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/metadata/MetadataManagerSuite.scala
@@ -212,8 +212,24 @@ class MetadataManagerSuite extends KyuubiFunSuite {
       f(metadataManager)
     } finally {
       metadataManager.getBatches(MetadataFilter(), 0, Int.MaxValue).foreach { 
batch =>
-        metadataManager.cleanupMetadataById(batch.getId)
+        // close the batch if not ended
+        if (batch.getEndTime == 0) {
+          metadataManager.updateMetadata(
+            Metadata(
+              identifier = batch.getId,
+              state = OperationState.CLOSED.toString,
+              endTime = System.currentTimeMillis()),
+            false)
+        }
       }
+
+      metadataManager.cleanupMetadata(Int.MinValue, 1, 0)
+
+      // ensure all metadata are cleaned up
+      eventually(timeout(3.seconds), interval(200.milliseconds)) {
+        assert(metadataManager.getBatches(MetadataFilter(), 0, 
Int.MaxValue).isEmpty)
+      }
+
       // ensure no metadata request leak
       eventually(timeout(5.seconds), interval(200.milliseconds)) {
         
assert(MetricsSystem.counterValue(METADATA_REQUEST_OPENED).getOrElse(0L) === 0)
diff --git 
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStoreSuite.scala
 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStoreSuite.scala
index 77f4ca57d5..713739d5f5 100644
--- 
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStoreSuite.scala
+++ 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStoreSuite.scala
@@ -45,7 +45,7 @@ class JDBCMetadataStoreSuite extends KyuubiFunSuite {
       batch =>
         jdbcMetadataStore.cleanupMetadataByIdentifier(batch.identifier)
     }
-    jdbcMetadataStore.cleanupKubernetesEngineInfoByAge(0)
+    jdbcMetadataStore.cleanupKubernetesEngineInfoByAge(0, Int.MaxValue)
     jdbcMetadataStore.close()
   }
 
@@ -242,7 +242,7 @@ class JDBCMetadataStoreSuite extends KyuubiFunSuite {
       Int.MaxValue).isEmpty)
 
     eventually(Timeout(3.seconds)) {
-      jdbcMetadataStore.cleanupMetadataByAge(1000)
+      jdbcMetadataStore.cleanupMetadataByAge(1000, Int.MaxValue)
       assert(jdbcMetadataStore.getMetadata(batchId) == null)
     }
   }

Reply via email to