This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch branch-1.8
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/branch-1.8 by this push:
     new 160624d61 [KYUUBI #5328] Batch supports priority scheduling
160624d61 is described below

commit 160624d61d39bd499edcf48c50b57f201cb07afe
Author: zwangsheng <[email protected]>
AuthorDate: Mon Oct 16 22:43:02 2023 +0800

    [KYUUBI #5328] Batch supports priority scheduling
    
    ### _Why are the changes needed?_
    
    Follow #5329 and close #5328:
    
    1. Add new config `kyuubi.metadata.store.jdbc.priority.enabled` to control whether to enable priority scheduling, because users may experience performance issues when using MySQL 5.7 as the metastore backend with the Kyuubi batch v2 priority feature enabled.
    2. When priority scheduling is enabled, `KyuubiBatchService` picks the next metadata job with `ORDER BY priority DESC, create_time ASC`.
    3. Insert metadata with the `priority` field; the default priority value is `10`.
    4. Add new config `kyuubi.batch.priority` to set the priority of each batch.
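As a rough illustration of items 2 and 4, the pick order can be modeled in memory as taking the minimum over `(-priority, createTime)`. This is a sketch under assumptions, not Kyuubi code: `PendingBatch`, `BatchPicker`, `pickNext`, and `drainOrder` are hypothetical names, and the real selection is a SQL query against the JDBC metadata store.

```scala
// Hypothetical in-memory stand-in for metadata rows in the INITIALIZED state.
final case class PendingBatch(identifier: String, createTime: Long, priority: Int = 10)

object BatchPicker {
  // Mirrors `ORDER BY priority DESC, create_time ASC LIMIT 1` when priority
  // scheduling is enabled, and `ORDER BY create_time ASC LIMIT 1` otherwise.
  def pickNext(pending: Seq[PendingBatch], priorityEnabled: Boolean): Option[PendingBatch] =
    if (pending.isEmpty) None
    else if (priorityEnabled) Some(pending.minBy(b => (-b.priority, b.createTime)))
    else Some(pending.minBy(_.createTime))

  // Picks repeatedly until nothing is left, similar in spirit to calling
  // pickBatchForSubmitting once per pending job.
  def drainOrder(pending: Seq[PendingBatch], priorityEnabled: Boolean): Seq[String] = {
    @annotation.tailrec
    def loop(rest: Seq[PendingBatch], picked: Vector[String]): Vector[String] =
      pickNext(rest, priorityEnabled) match {
        case Some(next) => loop(rest.filterNot(_ == next), picked :+ next.identifier)
        case None       => picked
      }
    loop(pending, Vector.empty)
  }
}
```

With the three jobs from the unit test in this commit (job 1 at priority 20 but created last, jobs 2 and 3 at the default priority 10), `drainOrder` yields job 1, job 2, job 3 when priority is enabled, and job 2, job 3, job 1 when it is disabled.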
    
    ### _How was this patch tested?_
    - [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [ ] [Run test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests) locally before making a pull request
    
    ### _Was this patch authored or co-authored using generative AI tooling?_
    
    No
    
    Closes #5352 from zwangsheng/KYUUBI#5328.
    
    Closes #5328
    
    687ed1ed6 [Cheng Pan] Update kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStore.scala
    58621b557 [zwangsheng] fix comments
    1bf81e75c [zwangsheng] fix style
    7ed2551b3 [zwangsheng] update default priority desc & improve UT
    21ceccb01 [zwangsheng] fix doc
    27fc5e825 [zwangsheng] enrich desc
    c0bbc0dfd [zwangsheng] fix style
    6b8d0f091 [zwangsheng] fix comment
    67eb2524d [zwangsheng] fix comment
    e1705c34d [zwangsheng] Add config to control whether pick order by priority or not
    129a46729 [zwangsheng] Add unit test for pickBatchForSubmitting
    fcaf85d92 [zwangsheng] Fix unit test
    f7ca2219e [zwangsheng] Fix unit test
    8d4b276ff [wangsheng] fix code style
    4c6b99090 [wangsheng] fix comments
    654ad843a [zwangsheng] [KYUUBI #5328][V2] Kyuubi Server Pick Metadata job with priority
    
    Lead-authored-by: zwangsheng <[email protected]>
    Co-authored-by: wangsheng <[email protected]>
    Co-authored-by: Cheng Pan <[email protected]>
    Signed-off-by: Cheng Pan <[email protected]>
    (cherry picked from commit b24d94e74f3cc6648c09d0d44ead781b1b2e70b1)
    Signed-off-by: Cheng Pan <[email protected]>
---
 docs/configuration/settings.md                     |  1 +
 .../apache/kyuubi/config/KyuubiReservedKeys.scala  |  3 +
 .../kyuubi/server/metadata/api/Metadata.scala      |  3 +
 .../server/metadata/jdbc/JDBCMetadataStore.scala   | 12 ++--
 .../metadata/jdbc/JDBCMetadataStoreConf.scala      | 12 ++++
 .../apache/kyuubi/session/KyuubiBatchSession.scala |  4 +-
 .../kyuubi/session/KyuubiSessionManager.scala      |  5 +-
 .../server/metadata/MetadataManagerSuite.scala     | 67 ++++++++++++++++++++--
 8 files changed, 96 insertions(+), 11 deletions(-)

diff --git a/docs/configuration/settings.md b/docs/configuration/settings.md
index 5ab7ed90c..0dc4922f9 100644
--- a/docs/configuration/settings.md
+++ b/docs/configuration/settings.md
@@ -347,6 +347,7 @@ You can configure the Kyuubi properties in `$KYUUBI_HOME/conf/kyuubi-defaults.co
 | kyuubi.metadata.store.jdbc.database.type | SQLITE | The database type for server jdbc metadata store.<ul> <li>(Deprecated) DERBY: Apache Derby, JDBC driver `org.apache.derby.jdbc.AutoloadedDriver`.</li> <li>SQLITE: SQLite3, JDBC driver `org.sqlite.JDBC`.</li> <li>MYSQL: MySQL, JDBC driver `com.mysql.jdbc.Driver`.</li> <li>CUSTOM: User-defined database type, need to specify corresponding JDBC driver.</li> Note that: The JDBC datas [...]
 | kyuubi.metadata.store.jdbc.driver | &lt;undefined&gt; | JDBC driver class name for server jdbc metadata store. [...]
 | kyuubi.metadata.store.jdbc.password || The password for server JDBC metadata store. [...]
+| kyuubi.metadata.store.jdbc.priority.enabled | false | Whether to enable the priority scheduling for batch impl v2. When false, ignore kyuubi.batch.priority and use the FIFO ordering strategy for batch job scheduling. Note: this feature may cause significant performance issues when using MySQL 5.7 as the metastore backend due to the lack of support for mixed order index. See more details at KYUUBI #5329. [...]
 | kyuubi.metadata.store.jdbc.url | jdbc:sqlite:kyuubi_state_store.db | The JDBC url for server JDBC metadata store. By default, it is a SQLite database url, and the state information is not shared across kyuubi instances. To enable high availability for multiple kyuubi instances, please specify a production JDBC url. [...]
 | kyuubi.metadata.store.jdbc.user || The username for server JDBC metadata store. [...]
 
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiReservedKeys.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiReservedKeys.scala
index eb209caec..592425a4b 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiReservedKeys.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiReservedKeys.scala
@@ -26,6 +26,9 @@ object KyuubiReservedKeys {
   final val KYUUBI_SESSION_USER_SIGN = "kyuubi.session.user.sign"
   final val KYUUBI_SESSION_REAL_USER_KEY = "kyuubi.session.real.user"
   final val KYUUBI_SESSION_CONNECTION_URL_KEY = "kyuubi.session.connection.url"
+  // default priority is 10, higher priority will be scheduled first
+  // when enabled metadata store priority feature
+  final val KYUUBI_BATCH_PRIORITY = "kyuubi.batch.priority"
   final val KYUUBI_BATCH_RESOURCE_UPLOADED_KEY = "kyuubi.batch.resource.uploaded"
   final val KYUUBI_STATEMENT_ID_KEY = "kyuubi.statement.id"
   final val KYUUBI_ENGINE_ID = "kyuubi.engine.id"
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/api/Metadata.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/api/Metadata.scala
index 3e3d94828..0553cf90b 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/api/Metadata.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/api/Metadata.scala
@@ -78,6 +78,9 @@ case class Metadata(
     engineState: String = null,
     engineError: Option[String] = None,
     endTime: Long = 0L,
+    // keep consistent with table creation DDL
+    // find why we set 10 as default in KYUUBI #5329
+    priority: Int = 10,
     peerInstanceClosed: Boolean = false) {
   def appMgrInfo: ApplicationManagerInfo = {
     ApplicationManagerInfo(
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStore.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStore.scala
index dcb9c0f66..419fa8447 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStore.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStore.scala
@@ -61,6 +61,8 @@ class JDBCMetadataStore(conf: KyuubiConf) extends MetadataStore with Logging {
     case CUSTOM => new GenericDatabaseDialect
   }
 
+  private val priorityEnabled = conf.get(METADATA_STORE_JDBC_PRIORITY_ENABLED)
+
   private val datasourceProperties =
     JDBCMetadataStoreConf.getMetadataStoreJDBCDataSourceProperties(conf)
   private val hikariConfig = new HikariConfig(datasourceProperties)
@@ -167,9 +169,10 @@ class JDBCMetadataStore(conf: KyuubiConf) extends MetadataStore with Logging {
          |request_args,
          |create_time,
          |engine_type,
-         |cluster_manager
+         |cluster_manager,
+         |priority
          |)
-         |VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
+         |VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
          |""".stripMargin
 
     JdbcUtils.withConnection { connection =>
@@ -190,7 +193,8 @@ class JDBCMetadataStore(conf: KyuubiConf) extends MetadataStore with Logging {
         valueAsString(metadata.requestArgs),
         metadata.createTime,
         Option(metadata.engineType).map(_.toUpperCase(Locale.ROOT)).orNull,
-        metadata.clusterManager.orNull)
+        metadata.clusterManager.orNull,
+        metadata.priority)
     }
   }
 
@@ -198,7 +202,7 @@ class JDBCMetadataStore(conf: KyuubiConf) extends MetadataStore with Logging {
     JdbcUtils.executeQueryWithRowMapper(
       s"""SELECT identifier FROM $METADATA_TABLE
          |WHERE state=?
-         |ORDER BY create_time ASC LIMIT 1
+         |ORDER BY ${if (priorityEnabled) "priority DESC, " else ""}create_time ASC LIMIT 1
          |""".stripMargin) { stmt =>
       stmt.setString(1, OperationState.INITIALIZED.toString)
     } { resultSet =>
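The only behavioral switch in `JDBCMetadataStore` is the interpolation in the pick query of the hunk above. The sketch below isolates that interpolation so the two generated statements can be compared side by side; `PickQuery`, `pickSql`, and `insertSql` are hypothetical names, and the table name is passed in rather than taken from Kyuubi's `METADATA_TABLE` constant. `insertSql` is an assumed alternative, not part of this commit, that derives one `?` placeholder per column so the `VALUES` list cannot drift out of sync with the column list when a column such as `priority` is added.

```scala
object PickQuery {
  // Reproduces the interpolation from the diff: with priority scheduling enabled,
  // higher priority wins and create_time breaks ties (FIFO within a priority).
  def pickSql(table: String, priorityEnabled: Boolean): String =
    s"""SELECT identifier FROM $table
       |WHERE state=?
       |ORDER BY ${if (priorityEnabled) "priority DESC, " else ""}create_time ASC LIMIT 1
       |""".stripMargin

  // Hypothetical helper (not in this commit): one JDBC placeholder per column.
  def insertSql(table: String, columns: Seq[String]): String = {
    val cols = columns.mkString(", ")
    val placeholders = columns.map(_ => "?").mkString(", ")
    s"INSERT INTO $table($cols) VALUES($placeholders)"
  }
}
```

The mixed-direction `priority DESC, create_time ASC` ordering is what the MySQL 5.7 caveat refers to; as far as we know, MySQL only honors `DESC` in index definitions from 8.0 onward, which is consistent with the "mixed order index" note in the config description.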
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStoreConf.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStoreConf.scala
index dd5d74138..292cf4174 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStoreConf.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStoreConf.scala
@@ -93,4 +93,16 @@ object JDBCMetadataStoreConf {
       .serverOnly
       .stringConf
       .createWithDefault("")
+
+  val METADATA_STORE_JDBC_PRIORITY_ENABLED: ConfigEntry[Boolean] =
+    buildConf("kyuubi.metadata.store.jdbc.priority.enabled")
+      .doc("Whether to enable the priority scheduling for batch impl v2. " +
+        "When false, ignore kyuubi.batch.priority and use the FIFO ordering strategy " +
+        "for batch job scheduling. Note: this feature may cause significant performance issues " +
+        "when using MySQL 5.7 as the metastore backend due to the lack of support " +
+        "for mixed order index. See more details at KYUUBI #5329.")
+      .version("1.8.0")
+      .serverOnly
+      .booleanConf
+      .createWithDefault(false)
 }
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSession.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSession.scala
index 8e4c5137f..8489e6d30 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSession.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSession.scala
@@ -23,6 +23,7 @@ import org.apache.hive.service.rpc.thrift.TProtocolVersion
 
 import org.apache.kyuubi.client.util.BatchUtils._
 import org.apache.kyuubi.config.{KyuubiConf, KyuubiReservedKeys}
+import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_BATCH_PRIORITY
 import org.apache.kyuubi.engine.KyuubiApplicationManager
 import org.apache.kyuubi.engine.spark.SparkProcessBuilder
 import org.apache.kyuubi.events.{EventBus, KyuubiSessionEvent}
@@ -181,7 +182,8 @@ class KyuubiBatchSession(
           requestArgs = batchArgs,
           createTime = createTime,
           engineType = batchType,
-          clusterManager = batchJobSubmissionOp.builder.clusterManager())
+          clusterManager = batchJobSubmissionOp.builder.clusterManager(),
+          priority = conf.get(KYUUBI_BATCH_PRIORITY).map(_.toInt).getOrElse(10))
 
         // there is a chance that operation failed w/ duplicated key error
         sessionManager.insertMetadata(newMetadata)
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
index 8d3234699..02a3ee32c 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
@@ -30,7 +30,7 @@ import org.apache.kyuubi.client.api.v1.dto.{Batch, BatchRequest}
 import org.apache.kyuubi.client.util.BatchUtils.KYUUBI_BATCH_ID_KEY
 import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.config.KyuubiConf._
-import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_REAL_USER_KEY
+import org.apache.kyuubi.config.KyuubiReservedKeys.{KYUUBI_BATCH_PRIORITY, KYUUBI_SESSION_REAL_USER_KEY}
 import org.apache.kyuubi.credentials.HadoopCredentialsManager
 import org.apache.kyuubi.engine.KyuubiApplicationManager
 import org.apache.kyuubi.metrics.MetricsConstants._
@@ -237,7 +237,8 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) {
       requestConf = conf,
       requestArgs = batchRequest.getArgs.asScala.toSeq,
       createTime = System.currentTimeMillis(),
-      engineType = batchRequest.getBatchType)
+      engineType = batchRequest.getBatchType,
+      priority = conf.get(KYUUBI_BATCH_PRIORITY).map(_.toInt).getOrElse(10))
 
     // there is a chance that operation failed w/ duplicated key error
     metadataManager.foreach(_.insertMetadata(metadata, asyncRetryOnError = false))
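Both `KyuubiBatchSession` and `KyuubiSessionManager` resolve the priority with the same expression, `conf.get(KYUUBI_BATCH_PRIORITY).map(_.toInt).getOrElse(10)`, looking up the plain string key `kyuubi.batch.priority` in the batch's configuration. The sketch below reproduces that lookup standalone over a `Map[String, String]`; `BatchPriority`, `resolve`, and `resolveLenient` are hypothetical names. One consequence worth noting: `String#toInt` throws `NumberFormatException` on a non-numeric value, so a malformed `kyuubi.batch.priority` would raise an exception instead of falling back to `10`. `resolveLenient` shows an assumed alternative that falls back instead; it is not part of this commit.

```scala
import scala.util.Try

object BatchPriority {
  val PriorityKey = "kyuubi.batch.priority"
  // Same default as the Metadata case class field (kept consistent with the table DDL).
  val DefaultPriority = 10

  // Same lookup as the diff: missing key -> default; non-numeric value -> throws.
  def resolve(conf: Map[String, String]): Int =
    conf.get(PriorityKey).map(_.toInt).getOrElse(DefaultPriority)

  // Hypothetical lenient variant (not in this commit): fall back to the default
  // instead of throwing when the value does not parse as an Int.
  def resolveLenient(conf: Map[String, String]): Int =
    conf.get(PriorityKey).flatMap(v => Try(v.trim.toInt).toOption).getOrElse(DefaultPriority)
}
```

Whether failing fast or falling back is preferable is a design choice: throwing surfaces a typo to the submitter, while the lenient variant silently schedules the batch at the default priority.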
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/metadata/MetadataManagerSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/metadata/MetadataManagerSuite.scala
index 564b5ebe9..fe7fa5868 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/metadata/MetadataManagerSuite.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/metadata/MetadataManagerSuite.scala
@@ -28,7 +28,9 @@ import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.config.KyuubiConf._
 import org.apache.kyuubi.metrics.{MetricsConstants, MetricsSystem}
 import org.apache.kyuubi.metrics.MetricsConstants._
+import org.apache.kyuubi.operation.OperationState
 import org.apache.kyuubi.server.metadata.api.{Metadata, MetadataFilter}
+import org.apache.kyuubi.server.metadata.jdbc.JDBCMetadataStoreConf.METADATA_STORE_JDBC_PRIORITY_ENABLED
 import org.apache.kyuubi.session.SessionType
 
 class MetadataManagerSuite extends KyuubiFunSuite {
@@ -142,6 +144,58 @@ class MetadataManagerSuite extends KyuubiFunSuite {
     }
   }
 
+  test("[KYUUBI #5328] Test MetadataManager#pickBatchForSubmitting in order") {
+    // build mock batch jobs
+    val mockKyuubiInstance = "mock_kyuubi_instance"
+    val time = System.currentTimeMillis()
+    val mockBatchJob1 = newMetadata(
+      identifier = "mock_batch_job_1",
+      state = OperationState.INITIALIZED.toString,
+      createTime = time + 10000,
+      // larger than default priority 10
+      priority = 20)
+    val mockBatchJob2 = newMetadata(
+      identifier = "mock_batch_job_2",
+      state = OperationState.INITIALIZED.toString,
+      createTime = time)
+    val mockBatchJob3 = newMetadata(
+      identifier = "mock_batch_job_3",
+      state = OperationState.INITIALIZED.toString,
+      createTime = time + 5000)
+
+    withMetadataManager(Map(METADATA_STORE_JDBC_PRIORITY_ENABLED.key -> "true")) {
+      metadataManager =>
+        metadataManager.insertMetadata(mockBatchJob1, asyncRetryOnError = false)
+        metadataManager.insertMetadata(mockBatchJob2, asyncRetryOnError = false)
+        metadataManager.insertMetadata(mockBatchJob3, asyncRetryOnError = false)
+
+        // pick the highest priority batch job
+        val metadata1 = metadataManager.pickBatchForSubmitting(mockKyuubiInstance)
+        assert(metadata1.exists(m => m.identifier === "mock_batch_job_1"))
+
+        // pick the oldest batch job when same priority
+        val metadata2 = metadataManager.pickBatchForSubmitting(mockKyuubiInstance)
+        assert(metadata2.exists(m => m.identifier === "mock_batch_job_2"))
+
+        val metadata3 = metadataManager.pickBatchForSubmitting(mockKyuubiInstance)
+        assert(metadata3.exists(m => m.identifier === "mock_batch_job_3"))
+    }
+
+    withMetadataManager(Map(METADATA_STORE_JDBC_PRIORITY_ENABLED.key -> "false")) {
+      metadataManager =>
+        metadataManager.insertMetadata(mockBatchJob1, asyncRetryOnError = false)
+        metadataManager.insertMetadata(mockBatchJob2, asyncRetryOnError = false)
+        metadataManager.insertMetadata(mockBatchJob3, asyncRetryOnError = false)
+
+        // pick the oldest batch job
+        val metadata2 = metadataManager.pickBatchForSubmitting(mockKyuubiInstance)
+        assert(metadata2.exists(m => m.identifier === "mock_batch_job_2"))
+
+        val metadata3 = metadataManager.pickBatchForSubmitting(mockKyuubiInstance)
+        assert(metadata3.exists(m => m.identifier === "mock_batch_job_3"))
+    }
+  }
+
   private def withMetadataManager(
       confOverlay: Map[String, String],
       newMetadataMgr: () => MetadataManager = () => new MetadataManager())(
@@ -169,22 +223,27 @@ class MetadataManagerSuite extends KyuubiFunSuite {
     }
   }
 
-  private def newMetadata(): Metadata = {
+  private def newMetadata(
+      identifier: String = UUID.randomUUID().toString,
+      state: String = OperationState.PENDING.toString,
+      createTime: Long = System.currentTimeMillis(),
+      priority: Int = 10): Metadata = {
     Metadata(
-      identifier = UUID.randomUUID().toString,
+      identifier = identifier,
       sessionType = SessionType.BATCH,
       realUser = "kyuubi",
       username = "kyuubi",
       ipAddress = "127.0.0.1",
       kyuubiInstance = "localhost:10009",
-      state = "PENDING",
+      state = state,
       resource = "intern",
       className = "org.apache.kyuubi.SparkWC",
       requestName = "kyuubi_batch",
       requestConf = Map("spark.master" -> "local"),
       requestArgs = Seq("100"),
-      createTime = System.currentTimeMillis(),
+      createTime = createTime,
       engineType = "spark",
+      priority = priority,
       clusterManager = Some("local"))
   }
 }
