This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch branch-1.8
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/branch-1.8 by this push:
new 160624d61 [KYUUBI #5328] Batch supports priority scheduling
160624d61 is described below
commit 160624d61d39bd499edcf48c50b57f201cb07afe
Author: zwangsheng <[email protected]>
AuthorDate: Mon Oct 16 22:43:02 2023 +0800
[KYUUBI #5328] Batch supports priority scheduling
### _Why are the changes needed?_
Follow #5329 and close #5328:
1. Add new config `kyuubi.metadata.store.jdbc.priority.enabled` to control
whether priority scheduling is enabled, because users may experience performance
issues when using MySQL 5.7 as the metastore backend with the Kyuubi batch v2
priority feature enabled.
2. When priority scheduling is enabled, `KyuubiBatchService` picks the next metadata
job using `ORDER BY priority DESC, create_time ASC`.
3. Insert metadata with a priority field; the default priority value is `10`.
4. Add new config `kyuubi.batch.priority` to set the priority of each batch.
### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly, including
negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [ ] [Run
test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests)
locally before making a pull request
### _Was this patch authored or co-authored using generative AI tooling?_
No
Closes #5352 from zwangsheng/KYUUBI#5328.
Closes #5328
687ed1ed6 [Cheng Pan] Update
kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStore.scala
58621b557 [zwangsheng] fix comments
1bf81e75c [zwangsheng] fix style
7ed2551b3 [zwangsheng] update default priority desc & improve UT
21ceccb01 [zwangsheng] fix doc
27fc5e825 [zwangsheng] enrich desc
c0bbc0dfd [zwangsheng] fix style
6b8d0f091 [zwangsheng] fix comment
67eb2524d [zwangsheng] fix comment
e1705c34d [zwangsheng] Add config to control whether pick order by priority
or not
129a46729 [zwangsheng] Add unit test for pickBatchForSubmitting
fcaf85d92 [zwangsheng] Fix unit test
f7ca2219e [zwangsheng] Fix unit test
8d4b276ff [wangsheng] fix code style
4c6b99090 [wangsheng] fix comments
654ad843a [zwangsheng] [KYUUBI #5328][V2] Kyuubi Server Pick Metadata job
with priority
Lead-authored-by: zwangsheng <[email protected]>
Co-authored-by: wangsheng <[email protected]>
Co-authored-by: Cheng Pan <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
(cherry picked from commit b24d94e74f3cc6648c09d0d44ead781b1b2e70b1)
Signed-off-by: Cheng Pan <[email protected]>
---
docs/configuration/settings.md | 1 +
.../apache/kyuubi/config/KyuubiReservedKeys.scala | 3 +
.../kyuubi/server/metadata/api/Metadata.scala | 3 +
.../server/metadata/jdbc/JDBCMetadataStore.scala | 12 ++--
.../metadata/jdbc/JDBCMetadataStoreConf.scala | 12 ++++
.../apache/kyuubi/session/KyuubiBatchSession.scala | 4 +-
.../kyuubi/session/KyuubiSessionManager.scala | 5 +-
.../server/metadata/MetadataManagerSuite.scala | 67 ++++++++++++++++++++--
8 files changed, 96 insertions(+), 11 deletions(-)
diff --git a/docs/configuration/settings.md b/docs/configuration/settings.md
index 5ab7ed90c..0dc4922f9 100644
--- a/docs/configuration/settings.md
+++ b/docs/configuration/settings.md
@@ -347,6 +347,7 @@ You can configure the Kyuubi properties in
`$KYUUBI_HOME/conf/kyuubi-defaults.co
| kyuubi.metadata.store.jdbc.database.type | SQLITE
| The database type for server jdbc metadata
store.<ul> <li>(Deprecated) DERBY: Apache Derby, JDBC driver
`org.apache.derby.jdbc.AutoloadedDriver`.</li> <li>SQLITE: SQLite3, JDBC driver
`org.sqlite.JDBC`.</li> <li>MYSQL: MySQL, JDBC driver
`com.mysql.jdbc.Driver`.</li> <li>CUSTOM: User-defined database type, need to
specify corresponding JDBC driver.</li> Note that: The JDBC datas [...]
| kyuubi.metadata.store.jdbc.driver | <undefined>
| JDBC driver class name for server jdbc metadata
store.
[...]
| kyuubi.metadata.store.jdbc.password
|| The password for server JDBC metadata store.
[...]
+| kyuubi.metadata.store.jdbc.priority.enabled | false
| Whether to enable the priority scheduling for
batch impl v2. When false, ignore kyuubi.batch.priority and use the FIFO
ordering strategy for batch job scheduling. Note: this feature may cause
significant performance issues when using MySQL 5.7 as the metastore backend
due to the lack of support for mixed order index. See more details at KYUUBI
#5329. [...]
| kyuubi.metadata.store.jdbc.url |
jdbc:sqlite:kyuubi_state_store.db | The JDBC url for
server JDBC metadata store. By default, it is a SQLite database url, and the
state information is not shared across kyuubi instances. To enable high
availability for multiple kyuubi instances, please specify a production JDBC
url.
[...]
| kyuubi.metadata.store.jdbc.user
|| The username for server JDBC metadata store.
[...]
diff --git
a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiReservedKeys.scala
b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiReservedKeys.scala
index eb209caec..592425a4b 100644
---
a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiReservedKeys.scala
+++
b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiReservedKeys.scala
@@ -26,6 +26,9 @@ object KyuubiReservedKeys {
final val KYUUBI_SESSION_USER_SIGN = "kyuubi.session.user.sign"
final val KYUUBI_SESSION_REAL_USER_KEY = "kyuubi.session.real.user"
final val KYUUBI_SESSION_CONNECTION_URL_KEY = "kyuubi.session.connection.url"
+ // default priority is 10; a higher priority is scheduled first
+ // when the metadata store priority feature is enabled
+ final val KYUUBI_BATCH_PRIORITY = "kyuubi.batch.priority"
final val KYUUBI_BATCH_RESOURCE_UPLOADED_KEY =
"kyuubi.batch.resource.uploaded"
final val KYUUBI_STATEMENT_ID_KEY = "kyuubi.statement.id"
final val KYUUBI_ENGINE_ID = "kyuubi.engine.id"
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/api/Metadata.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/api/Metadata.scala
index 3e3d94828..0553cf90b 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/api/Metadata.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/api/Metadata.scala
@@ -78,6 +78,9 @@ case class Metadata(
engineState: String = null,
engineError: Option[String] = None,
endTime: Long = 0L,
+ // keep consistent with the table creation DDL
+ // see KYUUBI #5329 for why 10 is the default
+ priority: Int = 10,
peerInstanceClosed: Boolean = false) {
def appMgrInfo: ApplicationManagerInfo = {
ApplicationManagerInfo(
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStore.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStore.scala
index dcb9c0f66..419fa8447 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStore.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStore.scala
@@ -61,6 +61,8 @@ class JDBCMetadataStore(conf: KyuubiConf) extends
MetadataStore with Logging {
case CUSTOM => new GenericDatabaseDialect
}
+ private val priorityEnabled = conf.get(METADATA_STORE_JDBC_PRIORITY_ENABLED)
+
private val datasourceProperties =
JDBCMetadataStoreConf.getMetadataStoreJDBCDataSourceProperties(conf)
private val hikariConfig = new HikariConfig(datasourceProperties)
@@ -167,9 +169,10 @@ class JDBCMetadataStore(conf: KyuubiConf) extends
MetadataStore with Logging {
|request_args,
|create_time,
|engine_type,
- |cluster_manager
+ |cluster_manager,
+ |priority
|)
- |VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
+ |VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|""".stripMargin
JdbcUtils.withConnection { connection =>
@@ -190,7 +193,8 @@ class JDBCMetadataStore(conf: KyuubiConf) extends
MetadataStore with Logging {
valueAsString(metadata.requestArgs),
metadata.createTime,
Option(metadata.engineType).map(_.toUpperCase(Locale.ROOT)).orNull,
- metadata.clusterManager.orNull)
+ metadata.clusterManager.orNull,
+ metadata.priority)
}
}
@@ -198,7 +202,7 @@ class JDBCMetadataStore(conf: KyuubiConf) extends
MetadataStore with Logging {
JdbcUtils.executeQueryWithRowMapper(
s"""SELECT identifier FROM $METADATA_TABLE
|WHERE state=?
- |ORDER BY create_time ASC LIMIT 1
+ |ORDER BY ${if (priorityEnabled) "priority DESC, " else
""}create_time ASC LIMIT 1
|""".stripMargin) { stmt =>
stmt.setString(1, OperationState.INITIALIZED.toString)
} { resultSet =>
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStoreConf.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStoreConf.scala
index dd5d74138..292cf4174 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStoreConf.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStoreConf.scala
@@ -93,4 +93,16 @@ object JDBCMetadataStoreConf {
.serverOnly
.stringConf
.createWithDefault("")
+
+ val METADATA_STORE_JDBC_PRIORITY_ENABLED: ConfigEntry[Boolean] =
+ buildConf("kyuubi.metadata.store.jdbc.priority.enabled")
+ .doc("Whether to enable the priority scheduling for batch impl v2. " +
+ "When false, ignore kyuubi.batch.priority and use the FIFO ordering
strategy " +
+ "for batch job scheduling. Note: this feature may cause significant
performance issues " +
+ "when using MySQL 5.7 as the metastore backend due to the lack of
support " +
+ "for mixed order index. See more details at KYUUBI #5329.")
+ .version("1.8.0")
+ .serverOnly
+ .booleanConf
+ .createWithDefault(false)
}
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSession.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSession.scala
index 8e4c5137f..8489e6d30 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSession.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSession.scala
@@ -23,6 +23,7 @@ import org.apache.hive.service.rpc.thrift.TProtocolVersion
import org.apache.kyuubi.client.util.BatchUtils._
import org.apache.kyuubi.config.{KyuubiConf, KyuubiReservedKeys}
+import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_BATCH_PRIORITY
import org.apache.kyuubi.engine.KyuubiApplicationManager
import org.apache.kyuubi.engine.spark.SparkProcessBuilder
import org.apache.kyuubi.events.{EventBus, KyuubiSessionEvent}
@@ -181,7 +182,8 @@ class KyuubiBatchSession(
requestArgs = batchArgs,
createTime = createTime,
engineType = batchType,
- clusterManager = batchJobSubmissionOp.builder.clusterManager())
+ clusterManager = batchJobSubmissionOp.builder.clusterManager(),
+ priority =
conf.get(KYUUBI_BATCH_PRIORITY).map(_.toInt).getOrElse(10))
// there is a chance that operation failed w/ duplicated key error
sessionManager.insertMetadata(newMetadata)
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
index 8d3234699..02a3ee32c 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
@@ -30,7 +30,7 @@ import org.apache.kyuubi.client.api.v1.dto.{Batch,
BatchRequest}
import org.apache.kyuubi.client.util.BatchUtils.KYUUBI_BATCH_ID_KEY
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf._
-import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_REAL_USER_KEY
+import org.apache.kyuubi.config.KyuubiReservedKeys.{KYUUBI_BATCH_PRIORITY,
KYUUBI_SESSION_REAL_USER_KEY}
import org.apache.kyuubi.credentials.HadoopCredentialsManager
import org.apache.kyuubi.engine.KyuubiApplicationManager
import org.apache.kyuubi.metrics.MetricsConstants._
@@ -237,7 +237,8 @@ class KyuubiSessionManager private (name: String) extends
SessionManager(name) {
requestConf = conf,
requestArgs = batchRequest.getArgs.asScala.toSeq,
createTime = System.currentTimeMillis(),
- engineType = batchRequest.getBatchType)
+ engineType = batchRequest.getBatchType,
+ priority = conf.get(KYUUBI_BATCH_PRIORITY).map(_.toInt).getOrElse(10))
// there is a chance that operation failed w/ duplicated key error
metadataManager.foreach(_.insertMetadata(metadata, asyncRetryOnError =
false))
diff --git
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/metadata/MetadataManagerSuite.scala
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/metadata/MetadataManagerSuite.scala
index 564b5ebe9..fe7fa5868 100644
---
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/metadata/MetadataManagerSuite.scala
+++
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/metadata/MetadataManagerSuite.scala
@@ -28,7 +28,9 @@ import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.metrics.{MetricsConstants, MetricsSystem}
import org.apache.kyuubi.metrics.MetricsConstants._
+import org.apache.kyuubi.operation.OperationState
import org.apache.kyuubi.server.metadata.api.{Metadata, MetadataFilter}
+import
org.apache.kyuubi.server.metadata.jdbc.JDBCMetadataStoreConf.METADATA_STORE_JDBC_PRIORITY_ENABLED
import org.apache.kyuubi.session.SessionType
class MetadataManagerSuite extends KyuubiFunSuite {
@@ -142,6 +144,58 @@ class MetadataManagerSuite extends KyuubiFunSuite {
}
}
+ test("[KYUUBI #5328] Test MetadataManager#pickBatchForSubmitting in order") {
+ // build mock batch jobs
+ val mockKyuubiInstance = "mock_kyuubi_instance"
+ val time = System.currentTimeMillis()
+ val mockBatchJob1 = newMetadata(
+ identifier = "mock_batch_job_1",
+ state = OperationState.INITIALIZED.toString,
+ createTime = time + 10000,
+ // larger than default priority 10
+ priority = 20)
+ val mockBatchJob2 = newMetadata(
+ identifier = "mock_batch_job_2",
+ state = OperationState.INITIALIZED.toString,
+ createTime = time)
+ val mockBatchJob3 = newMetadata(
+ identifier = "mock_batch_job_3",
+ state = OperationState.INITIALIZED.toString,
+ createTime = time + 5000)
+
+ withMetadataManager(Map(METADATA_STORE_JDBC_PRIORITY_ENABLED.key ->
"true")) {
+ metadataManager =>
+ metadataManager.insertMetadata(mockBatchJob1, asyncRetryOnError =
false)
+ metadataManager.insertMetadata(mockBatchJob2, asyncRetryOnError =
false)
+ metadataManager.insertMetadata(mockBatchJob3, asyncRetryOnError =
false)
+
+ // pick the highest priority batch job
+ val metadata1 =
metadataManager.pickBatchForSubmitting(mockKyuubiInstance)
+ assert(metadata1.exists(m => m.identifier === "mock_batch_job_1"))
+
+ // pick the oldest batch job when same priority
+ val metadata2 =
metadataManager.pickBatchForSubmitting(mockKyuubiInstance)
+ assert(metadata2.exists(m => m.identifier === "mock_batch_job_2"))
+
+ val metadata3 =
metadataManager.pickBatchForSubmitting(mockKyuubiInstance)
+ assert(metadata3.exists(m => m.identifier === "mock_batch_job_3"))
+ }
+
+ withMetadataManager(Map(METADATA_STORE_JDBC_PRIORITY_ENABLED.key ->
"false")) {
+ metadataManager =>
+ metadataManager.insertMetadata(mockBatchJob1, asyncRetryOnError =
false)
+ metadataManager.insertMetadata(mockBatchJob2, asyncRetryOnError =
false)
+ metadataManager.insertMetadata(mockBatchJob3, asyncRetryOnError =
false)
+
+ // pick the oldest batch job
+ val metadata2 =
metadataManager.pickBatchForSubmitting(mockKyuubiInstance)
+ assert(metadata2.exists(m => m.identifier === "mock_batch_job_2"))
+
+ val metadata3 =
metadataManager.pickBatchForSubmitting(mockKyuubiInstance)
+ assert(metadata3.exists(m => m.identifier === "mock_batch_job_3"))
+ }
+ }
+
private def withMetadataManager(
confOverlay: Map[String, String],
newMetadataMgr: () => MetadataManager = () => new MetadataManager())(
@@ -169,22 +223,27 @@ class MetadataManagerSuite extends KyuubiFunSuite {
}
}
- private def newMetadata(): Metadata = {
+ private def newMetadata(
+ identifier: String = UUID.randomUUID().toString,
+ state: String = OperationState.PENDING.toString,
+ createTime: Long = System.currentTimeMillis(),
+ priority: Int = 10): Metadata = {
Metadata(
- identifier = UUID.randomUUID().toString,
+ identifier = identifier,
sessionType = SessionType.BATCH,
realUser = "kyuubi",
username = "kyuubi",
ipAddress = "127.0.0.1",
kyuubiInstance = "localhost:10009",
- state = "PENDING",
+ state = state,
resource = "intern",
className = "org.apache.kyuubi.SparkWC",
requestName = "kyuubi_batch",
requestConf = Map("spark.master" -> "local"),
requestArgs = Seq("100"),
- createTime = System.currentTimeMillis(),
+ createTime = createTime,
engineType = "spark",
+ priority = priority,
clusterManager = Some("local"))
}
}