This is an automated email from the ASF dual-hosted git repository. feiwang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push: new 7605900db4 [KYUUBI #7176] Cleanup metadata with batch size and interval 7605900db4 is described below commit 7605900db40e343569997f232af81dafc30ad79f Author: Wang, Fei <fwan...@ebay.com> AuthorDate: Mon Sep 15 10:10:49 2025 -0700 [KYUUBI #7176] Cleanup metadata with batch size and interval ### Why are the changes needed? Cleanup metadata with batch size and interval to prevent holding the lock for a long time if the metadata size is huge. ### How was this patch tested? GA. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #7176 from turboFei/clean_batch. Closes #7176 b4d6a264c [Wang, Fei] fix ut 61786af3b [Wang, Fei] Merge branch 'master' into clean_batch 11170756f [Cheng Pan] Update kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/MetadataManager.scala a5ffc9da9 [Wang, Fei] log for max loops b301b527b [Wang, Fei] remove unused method d172a61da [Wang, Fei] address all comments e4da02369 [Wang, Fei] Save c94d25451 [Wang, Fei] saev 852e624c8 [Wang, Fei] save Lead-authored-by: Wang, Fei <fwan...@ebay.com> Co-authored-by: Cheng Pan <pan3...@gmail.com> Signed-off-by: Wang, Fei <fwan...@ebay.com> --- docs/configuration/settings.md | 1 + .../org/apache/kyuubi/config/KyuubiConf.scala | 18 ++++++++++ .../kyuubi/server/metadata/MetadataManager.scala | 40 ++++++++++++++++++++-- .../kyuubi/server/metadata/MetadataStore.scala | 6 ++-- .../server/metadata/jdbc/JDBCMetadataStore.scala | 17 +++++---- .../server/metadata/jdbc/JdbcDatabaseDialect.scala | 14 +++++++- .../server/metadata/MetadataManagerSuite.scala | 18 +++++++++- .../metadata/jdbc/JDBCMetadataStoreSuite.scala | 4 +-- 8 files changed, 104 insertions(+), 14 deletions(-) diff --git a/docs/configuration/settings.md b/docs/configuration/settings.md index 83485a5217..ab1966cde6 100644 --- a/docs/configuration/settings.md +++ b/docs/configuration/settings.md @@ -382,6 +382,7 @@ You can configure the Kyuubi 
properties in `$KYUUBI_HOME/conf/kyuubi-defaults.co | Key | Default | Meaning [...] |-------------------------------------------------|----------------------------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- [...] +| kyuubi.metadata.cleaner.batch.size | 1000 | The batch size for cleaning expired metadata. This is used to avoid holding the delete lock for a long time when there are too many expired metadata to be cleaned. [...] | kyuubi.metadata.cleaner.enabled | true | Whether to clean the metadata periodically. If it is enabled, Kyuubi will clean the metadata that is in the terminate state with max age limitation. [...] | kyuubi.metadata.cleaner.interval | PT30M | The interval to check and clean expired metadata. [...] | kyuubi.metadata.max.age | PT72H | The maximum age of metadata, the metadata exceeding the age will be cleaned. [...] diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala index 06c8e7a9d5..6586498655 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala @@ -2077,6 +2077,24 @@ object KyuubiConf { .timeConf .createWithDefault(Duration.ofMinutes(30).toMillis) + val METADATA_CLEANER_BATCH_SIZE: ConfigEntry[Int] = + buildConf("kyuubi.metadata.cleaner.batch.size") + .serverOnly + .doc("The batch size for cleaning expired metadata. 
" + + "This is used to avoid holding the delete lock for a long time " + + "when there are too many expired metadata to be cleaned.") + .version("1.11.0") + .intConf + .createWithDefault(1000) + + val METADATA_CLEANER_BATCH_INTERVAL: ConfigEntry[Long] = + buildConf("kyuubi.metadata.cleaner.batch.interval") + .serverOnly + .internal + .doc("The interval to wait before next batch of cleaning expired metadata.") + .timeConf + .createWithDefault(Duration.ofSeconds(3).toMillis) + val METADATA_RECOVERY_THREADS: ConfigEntry[Int] = buildConf("kyuubi.metadata.recovery.threads") .serverOnly diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/MetadataManager.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/MetadataManager.scala index 522ac32dd6..c5182979ee 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/MetadataManager.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/MetadataManager.scala @@ -22,6 +22,8 @@ import java.util.concurrent.atomic.AtomicInteger import scala.collection.JavaConverters._ +import com.google.common.annotations.VisibleForTesting + import org.apache.kyuubi.{KyuubiException, Logging} import org.apache.kyuubi.client.api.v1.dto.Batch import org.apache.kyuubi.config.KyuubiConf @@ -236,10 +238,11 @@ class MetadataManager extends AbstractService("MetadataManager") { private def startMetadataCleaner(): Unit = { val stateMaxAge = conf.get(METADATA_MAX_AGE) val interval = conf.get(KyuubiConf.METADATA_CLEANER_INTERVAL) + val batchSize = conf.get(KyuubiConf.METADATA_CLEANER_BATCH_SIZE) + val batchInterval = conf.get(KyuubiConf.METADATA_CLEANER_BATCH_INTERVAL) val cleanerTask: Runnable = () => { try { - withMetadataRequestMetrics(_metadataStore.cleanupMetadataByAge(stateMaxAge)) - withMetadataRequestMetrics(_metadataStore.cleanupKubernetesEngineInfoByAge(stateMaxAge)) + cleanupMetadata(stateMaxAge, batchSize, batchInterval) } catch { case e: Throwable => error("Error 
cleaning up the metadata by age", e) } @@ -253,6 +256,39 @@ class MetadataManager extends AbstractService("MetadataManager") { TimeUnit.MILLISECONDS) } + @VisibleForTesting + private[metadata] def cleanupMetadata(maxAge: Long, batchSize: Int, batchInterval: Long): Unit = { + var needToCleanMetadata = true + var needToCleanKubernetesInfo = true + var cleanupLoop = 0 + + val MAX_CLEANUP_LOOPS = 100 // a guard in case it runs into an infinite loop + while ((needToCleanMetadata || needToCleanKubernetesInfo) && cleanupLoop < MAX_CLEANUP_LOOPS) { + cleanupLoop += 1 + if (needToCleanMetadata) { + needToCleanMetadata = + withMetadataRequestMetrics(_metadataStore.cleanupMetadataByAge( + maxAge, + batchSize)) >= batchSize + } + if (needToCleanKubernetesInfo) { + needToCleanKubernetesInfo = + withMetadataRequestMetrics(_metadataStore.cleanupKubernetesEngineInfoByAge( + maxAge, + batchSize)) >= batchSize + } + if (needToCleanMetadata || needToCleanKubernetesInfo) { + if (cleanupLoop < MAX_CLEANUP_LOOPS) { + info(s"Sleep $batchInterval ms before next batch of metadata cleaning.") + Thread.sleep(batchInterval) + } else { + warn(s"Metadata cleaning reaches the maximum loop $MAX_CLEANUP_LOOPS, " + + s"will continue in the next round.") + } + } + } + } + def addMetadataRetryRequest(request: MetadataRequest): Unit = { val maxRequestsAsyncRetryRefs: Int = conf.get(KyuubiConf.METADATA_REQUEST_ASYNC_RETRY_QUEUE_SIZE) diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/MetadataStore.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/MetadataStore.scala index 46a5505afe..492b286e3b 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/MetadataStore.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/MetadataStore.scala @@ -102,8 +102,9 @@ trait MetadataStore extends Closeable { /** * Check and cleanup the terminated batches information with maxAge limitation. 
* @param maxAge the batch state info maximum age. + * @param limit the maximum number of metadata to be cleaned up. */ - def cleanupMetadataByAge(maxAge: Long): Unit + def cleanupMetadataByAge(maxAge: Long, limit: Int): Int /** * Cleanup kubernetes engine info by identifier. @@ -113,6 +114,7 @@ trait MetadataStore extends Closeable { /** * Check and cleanup the kubernetes engine info with maxAge limitation. * @param maxAge the kubernetes engine info maximum age. + * @param limit the maximum number of kubernetes engine info to be cleaned up. */ - def cleanupKubernetesEngineInfoByAge(maxAge: Long): Unit + def cleanupKubernetesEngineInfoByAge(maxAge: Long, limit: Int): Int } diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStore.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStore.scala index f3c0b2d7de..62395fd644 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStore.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStore.scala @@ -409,13 +409,15 @@ class JDBCMetadataStore(conf: KyuubiConf) extends MetadataStore with Logging { } } - override def cleanupMetadataByAge(maxAge: Long): Unit = { + override def cleanupMetadataByAge(maxAge: Long, limit: Int): Int = { val minEndTime = System.currentTimeMillis() - maxAge val query = - s"DELETE FROM $METADATA_TABLE WHERE end_time > 0 AND end_time < ? AND create_time < ?" + s"DELETE FROM $METADATA_TABLE WHERE end_time > 0 AND end_time < ? AND create_time < ?" 
+ + s" ${dialect.deleteFromLimitClause(limit)}" JdbcUtils.withConnection { connection => withUpdateCount(connection, query, minEndTime, minEndTime) { count => - info(s"Cleaned up $count records older than $maxAge ms from $METADATA_TABLE.") + info(s"Cleaned up $count records older than $maxAge ms from $METADATA_TABLE limit:$limit.") + count } } } @@ -462,12 +464,15 @@ class JDBCMetadataStore(conf: KyuubiConf) extends MetadataStore with Logging { } } - override def cleanupKubernetesEngineInfoByAge(maxAge: Long): Unit = { + override def cleanupKubernetesEngineInfoByAge(maxAge: Long, limit: Int): Int = { val minUpdateTime = System.currentTimeMillis() - maxAge - val query = s"DELETE FROM $KUBERNETES_ENGINE_INFO_TABLE WHERE update_time < ?" + val query = s"DELETE FROM $KUBERNETES_ENGINE_INFO_TABLE WHERE update_time < ?" + + s" ${dialect.deleteFromLimitClause(limit)}" JdbcUtils.withConnection { connection => withUpdateCount(connection, query, minUpdateTime) { count => - info(s"Cleaned up $count records older than $maxAge ms from $KUBERNETES_ENGINE_INFO_TABLE.") + info(s"Cleaned up $count records older than $maxAge ms from $KUBERNETES_ENGINE_INFO_TABLE" + + s" limit $limit.") + count } } } diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JdbcDatabaseDialect.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JdbcDatabaseDialect.scala index 24408e1613..9d400a6f16 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JdbcDatabaseDialect.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JdbcDatabaseDialect.scala @@ -17,8 +17,11 @@ package org.apache.kyuubi.server.metadata.jdbc +import org.apache.kyuubi.Logging + trait JdbcDatabaseDialect { def limitClause(limit: Int, offset: Int): String + def deleteFromLimitClause(limit: Int): String def insertOrReplace( table: String, cols: Seq[String], @@ -26,11 +29,16 @@ trait JdbcDatabaseDialect { keyVal: String): String } 
-class GenericDatabaseDialect extends JdbcDatabaseDialect { +class GenericDatabaseDialect extends JdbcDatabaseDialect with Logging { override def limitClause(limit: Int, offset: Int): String = { s"LIMIT $limit OFFSET $offset" } + override def deleteFromLimitClause(limit: Int): String = { + warn("Generic dialect does not support LIMIT in DELETE statements") + "" + } + override def insertOrReplace( table: String, cols: Seq[String], @@ -71,6 +79,10 @@ class MySQLDatabaseDialect extends GenericDatabaseDialect { |${cols.filterNot(_ == keyCol).map(c => s"$c = new.$c").mkString(",")} |""".stripMargin } + + override def deleteFromLimitClause(limit: Int): String = { + s"LIMIT $limit" + } } class PostgreSQLDatabaseDialect extends GenericDatabaseDialect { override def insertOrReplace( diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/metadata/MetadataManagerSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/metadata/MetadataManagerSuite.scala index fe7fa58685..de2e651f44 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/metadata/MetadataManagerSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/metadata/MetadataManagerSuite.scala @@ -212,8 +212,24 @@ class MetadataManagerSuite extends KyuubiFunSuite { f(metadataManager) } finally { metadataManager.getBatches(MetadataFilter(), 0, Int.MaxValue).foreach { batch => - metadataManager.cleanupMetadataById(batch.getId) + // close the batch if not ended + if (batch.getEndTime == 0) { + metadataManager.updateMetadata( + Metadata( + identifier = batch.getId, + state = OperationState.CLOSED.toString, + endTime = System.currentTimeMillis()), + false) + } } + + metadataManager.cleanupMetadata(Int.MinValue, 1, 0) + + // ensure all metadata are cleaned up + eventually(timeout(3.seconds), interval(200.milliseconds)) { + assert(metadataManager.getBatches(MetadataFilter(), 0, Int.MaxValue).isEmpty) + } + // ensure no metadata request leak eventually(timeout(5.seconds), 
interval(200.milliseconds)) { assert(MetricsSystem.counterValue(METADATA_REQUEST_OPENED).getOrElse(0L) === 0) diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStoreSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStoreSuite.scala index 77f4ca57d5..713739d5f5 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStoreSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStoreSuite.scala @@ -45,7 +45,7 @@ class JDBCMetadataStoreSuite extends KyuubiFunSuite { batch => jdbcMetadataStore.cleanupMetadataByIdentifier(batch.identifier) } - jdbcMetadataStore.cleanupKubernetesEngineInfoByAge(0) + jdbcMetadataStore.cleanupKubernetesEngineInfoByAge(0, Int.MaxValue) jdbcMetadataStore.close() } @@ -242,7 +242,7 @@ class JDBCMetadataStoreSuite extends KyuubiFunSuite { Int.MaxValue).isEmpty) eventually(Timeout(3.seconds)) { - jdbcMetadataStore.cleanupMetadataByAge(1000) + jdbcMetadataStore.cleanupMetadataByAge(1000, Int.MaxValue) assert(jdbcMetadataStore.getMetadata(batchId) == null) } }