This is an automated email from the ASF dual-hosted git repository.
feiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 56cbd582c [KYUUBI #5199] Read all columns of metadata to prevent
column missing
56cbd582c is described below
commit 56cbd582c4ba86e9558957203527c50d2fe18882
Author: fwang12 <[email protected]>
AuthorDate: Fri Aug 25 16:12:17 2023 +0800
[KYUUBI #5199] Read all columns of metadata to prevent column missing
### _Why are the changes needed?_
Since https://github.com/apache/kyuubi/issues/4843, we need the
`requestConf` column to get application manager info.
So now the stateOnly option for metadata select is not needed.
```
private val METADATA_ALL_COLUMNS = Seq(
METADATA_STATE_ONLY_COLUMNS,
"resource",
"class_name",
"request_conf",
"request_args").mkString(",")
```
In this pr, I remove the `stateOnly` flag to select all metadata columns to
prevent column missing and make it easy to maintain.
### _How was this patch tested?_
Existing UT.
### _Was this patch authored or co-authored using generative AI tooling?_
No
Closes #5199 from turboFei/metadata_state_only.
Closes #5199
8e02e6a5e [fwang12] all
Authored-by: fwang12 <[email protected]>
Signed-off-by: fwang12 <[email protected]>
---
.../kyuubi/server/metadata/MetadataManager.scala | 8 +-
.../kyuubi/server/metadata/MetadataStore.scala | 10 +--
.../server/metadata/jdbc/JDBCMetadataStore.scala | 52 +++++--------
.../metadata/jdbc/JDBCMetadataStoreSuite.scala | 87 ++++++++--------------
4 files changed, 52 insertions(+), 105 deletions(-)
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/MetadataManager.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/MetadataManager.scala
index daa3451d9..1da9e1f31 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/MetadataManager.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/MetadataManager.scala
@@ -129,12 +129,12 @@ class MetadataManager extends
AbstractService("MetadataManager") {
}
def getBatchSessionMetadata(batchId: String): Option[Metadata] = {
- Option(withMetadataRequestMetrics(_metadataStore.getMetadata(batchId,
true)))
+ Option(withMetadataRequestMetrics(_metadataStore.getMetadata(batchId)))
.filter(_.sessionType == SessionType.BATCH)
}
def getBatches(filter: MetadataFilter, from: Int, size: Int): Seq[Batch] = {
- withMetadataRequestMetrics(_metadataStore.getMetadataList(filter, from,
size, true)).map(
+ withMetadataRequestMetrics(_metadataStore.getMetadataList(filter, from,
size)).map(
buildBatch)
}
@@ -168,7 +168,7 @@ class MetadataManager extends
AbstractService("MetadataManager") {
sessionType = SessionType.BATCH,
state = state,
kyuubiInstance = kyuubiInstance)
- withMetadataRequestMetrics(_metadataStore.getMetadataList(filter, from,
size, false))
+ withMetadataRequestMetrics(_metadataStore.getMetadataList(filter, from,
size))
}
def getPeerInstanceClosedBatchesMetadata(
@@ -181,7 +181,7 @@ class MetadataManager extends
AbstractService("MetadataManager") {
state = state,
kyuubiInstance = kyuubiInstance,
peerInstanceClosed = true)
- withMetadataRequestMetrics(_metadataStore.getMetadataList(filter, from,
size, true))
+ withMetadataRequestMetrics(_metadataStore.getMetadataList(filter, from,
size))
}
def updateMetadata(metadata: Metadata, asyncRetryOnError: Boolean = true):
Unit = {
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/MetadataStore.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/MetadataStore.scala
index 1eabb224d..d8258816a 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/MetadataStore.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/MetadataStore.scala
@@ -48,24 +48,18 @@ trait MetadataStore extends Closeable {
/**
* Get the persisted metadata by batch identifier.
* @param identifier the identifier.
- * @param stateOnly only return the state related column values.
* @return selected metadata.
*/
- def getMetadata(identifier: String, stateOnly: Boolean): Metadata
+ def getMetadata(identifier: String): Metadata
/**
* Get the metadata list with filter conditions, offset and size.
* @param filter the metadata filter conditions.
* @param from the metadata offset.
* @param size the size to get.
- * @param stateOnly only return the state related column values.
* @return selected metadata list.
*/
- def getMetadataList(
- filter: MetadataFilter,
- from: Int,
- size: Int,
- stateOnly: Boolean): Seq[Metadata]
+ def getMetadataList(filter: MetadataFilter, from: Int, size: Int):
Seq[Metadata]
/**
* Count the metadata list with filter conditions.
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStore.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStore.scala
index a79832d38..d32cdc838 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStore.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStore.scala
@@ -215,7 +215,7 @@ class JDBCMetadataStore(conf: KyuubiConf) extends
MetadataStore with Logging {
stmt.setString(4, OperationState.INITIALIZED.toString)
} == 1
}.map { pickedBatchId =>
- getMetadata(pickedBatchId, stateOnly = false)
+ getMetadata(pickedBatchId)
}
}
@@ -231,30 +231,21 @@ class JDBCMetadataStore(conf: KyuubiConf) extends
MetadataStore with Logging {
}
}
- override def getMetadata(identifier: String, stateOnly: Boolean): Metadata =
{
- val query =
- if (stateOnly) {
- s"SELECT $METADATA_STATE_ONLY_COLUMNS FROM $METADATA_TABLE WHERE
identifier = ?"
- } else {
- s"SELECT $METADATA_ALL_COLUMNS FROM $METADATA_TABLE WHERE identifier =
?"
- }
+ override def getMetadata(identifier: String): Metadata = {
+ val query = s"SELECT $METADATA_COLUMNS FROM $METADATA_TABLE WHERE
identifier = ?"
JdbcUtils.withConnection { connection =>
withResultSet(connection, query, identifier) { rs =>
- buildMetadata(rs, stateOnly).headOption.orNull
+ buildMetadata(rs).headOption.orNull
}
}
}
- override def getMetadataList(
- filter: MetadataFilter,
- from: Int,
- size: Int,
- stateOnly: Boolean): Seq[Metadata] = {
+ override def getMetadataList(filter: MetadataFilter, from: Int, size: Int):
Seq[Metadata] = {
val queryBuilder = new StringBuilder
val params = ListBuffer[Any]()
queryBuilder.append("SELECT ")
- queryBuilder.append(if (stateOnly) METADATA_STATE_ONLY_COLUMNS else
METADATA_ALL_COLUMNS)
+ queryBuilder.append(METADATA_COLUMNS)
queryBuilder.append(s" FROM $METADATA_TABLE")
queryBuilder.append(s" ${assembleWhereClause(filter, params)}")
queryBuilder.append(" ORDER BY key_id ")
@@ -262,7 +253,7 @@ class JDBCMetadataStore(conf: KyuubiConf) extends
MetadataStore with Logging {
val query = queryBuilder.toString
JdbcUtils.withConnection { connection =>
withResultSet(connection, query, params.toSeq: _*) { rs =>
- buildMetadata(rs, stateOnly)
+ buildMetadata(rs)
}
}
}
@@ -411,7 +402,7 @@ class JDBCMetadataStore(conf: KyuubiConf) extends
MetadataStore with Logging {
}
}
- private def buildMetadata(resultSet: ResultSet, stateOnly: Boolean):
Seq[Metadata] = {
+ private def buildMetadata(resultSet: ResultSet): Seq[Metadata] = {
try {
val metadataList = ListBuffer[Metadata]()
while (resultSet.next()) {
@@ -422,7 +413,11 @@ class JDBCMetadataStore(conf: KyuubiConf) extends
MetadataStore with Logging {
val ipAddress = resultSet.getString("ip_address")
val kyuubiInstance = resultSet.getString("kyuubi_instance")
val state = resultSet.getString("state")
+ val resource = resultSet.getString("resource")
+ val className = resultSet.getString("class_name")
val requestName = resultSet.getString("request_name")
+ val requestConf = string2Map(resultSet.getString("request_conf"))
+ val requestArgs = string2Seq(resultSet.getString("request_args"))
val createTime = resultSet.getLong("create_time")
val engineType = resultSet.getString("engine_type")
val clusterManager = Option(resultSet.getString("cluster_manager"))
@@ -434,17 +429,6 @@ class JDBCMetadataStore(conf: KyuubiConf) extends
MetadataStore with Logging {
val endTime = resultSet.getLong("end_time")
val peerInstanceClosed = resultSet.getBoolean("peer_instance_closed")
- var resource: String = null
- var className: String = null
- var requestConf: Map[String, String] = Map.empty
- var requestArgs: Seq[String] = Seq.empty
-
- if (!stateOnly) {
- resource = resultSet.getString("resource")
- className = resultSet.getString("class_name")
- requestConf = string2Map(resultSet.getString("request_conf"))
- requestArgs = string2Seq(resultSet.getString("request_args"))
- }
val metadata = Metadata(
identifier = identifier,
sessionType = sessionType,
@@ -583,7 +567,7 @@ class JDBCMetadataStore(conf: KyuubiConf) extends
MetadataStore with Logging {
object JDBCMetadataStore {
private val SCHEMA_URL_PATTERN =
"""^metadata-store-schema-(\d+)\.(\d+)\.(\d+)\.(.*)\.sql$""".r
private val METADATA_TABLE = "metadata"
- private val METADATA_STATE_ONLY_COLUMNS = Seq(
+ private val METADATA_COLUMNS = Seq(
"identifier",
"session_type",
"real_user",
@@ -591,7 +575,11 @@ object JDBCMetadataStore {
"ip_address",
"kyuubi_instance",
"state",
+ "resource",
+ "class_name",
"request_name",
+ "request_conf",
+ "request_args",
"create_time",
"engine_type",
"cluster_manager",
@@ -602,10 +590,4 @@ object JDBCMetadataStore {
"engine_error",
"end_time",
"peer_instance_closed").mkString(",")
- private val METADATA_ALL_COLUMNS = Seq(
- METADATA_STATE_ONLY_COLUMNS,
- "resource",
- "class_name",
- "request_conf",
- "request_args").mkString(",")
}
diff --git
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStoreSuite.scala
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStoreSuite.scala
index aee4497df..2ee082a1d 100644
---
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStoreSuite.scala
+++
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStoreSuite.scala
@@ -39,11 +39,7 @@ class JDBCMetadataStoreSuite extends KyuubiFunSuite {
override def afterAll(): Unit = {
super.afterAll()
- jdbcMetadataStore.getMetadataList(
- MetadataFilter(),
- 0,
- Int.MaxValue,
- true).foreach {
+ jdbcMetadataStore.getMetadataList(MetadataFilter(), 0,
Int.MaxValue).foreach {
batch =>
jdbcMetadataStore.cleanupMetadataByIdentifier(batch.identifier)
}
@@ -82,28 +78,18 @@ class JDBCMetadataStoreSuite extends KyuubiFunSuite {
engineType = "spark",
clusterManager = Some("local"))
- var batchStateOnlyMetadata = batchMetadata.copy(
- resource = null,
- className = null,
- requestConf = Map.empty,
- requestArgs = Seq.empty)
-
jdbcMetadataStore.insertMetadata(batchMetadata)
- assert(jdbcMetadataStore.getMetadata(batchId, true) !=
batchStateOnlyMetadata)
- assert(jdbcMetadataStore.getMetadata(batchId, false) != batchMetadata)
// the engine type is formatted with UPPER
batchMetadata = batchMetadata.copy(engineType = "SPARK")
- batchStateOnlyMetadata = batchStateOnlyMetadata.copy(engineType = "SPARK")
- assert(jdbcMetadataStore.getMetadata(batchId, true) ==
batchStateOnlyMetadata)
- assert(jdbcMetadataStore.getMetadata(batchId, false) == batchMetadata)
+ assert(jdbcMetadataStore.getMetadata(batchId) == batchMetadata)
jdbcMetadataStore.cleanupMetadataByIdentifier(batchId)
- assert(jdbcMetadataStore.getMetadata(batchId, true) == null)
+ assert(jdbcMetadataStore.getMetadata(batchId) == null)
jdbcMetadataStore.insertMetadata(batchMetadata)
- val batchState2 = batchStateOnlyMetadata.copy(identifier =
UUID.randomUUID().toString)
+ val batchState2 = batchMetadata.copy(identifier =
UUID.randomUUID().toString)
jdbcMetadataStore.insertMetadata(batchState2)
var batches =
@@ -112,9 +98,8 @@ class JDBCMetadataStoreSuite extends KyuubiFunSuite {
sessionType = SessionType.BATCH,
engineType = "Spark"),
0,
- 1,
- true)
- assert(batches == Seq(batchStateOnlyMetadata))
+ 1)
+ assert(batches == Seq(batchMetadata))
batches = jdbcMetadataStore.getMetadataList(
MetadataFilter(
@@ -122,9 +107,8 @@ class JDBCMetadataStoreSuite extends KyuubiFunSuite {
engineType = "Spark",
username = "kyuubi"),
0,
- Int.MaxValue,
- true)
- assert(batches == Seq(batchStateOnlyMetadata, batchState2))
+ Int.MaxValue)
+ assert(batches == Seq(batchMetadata, batchState2))
jdbcMetadataStore.cleanupMetadataByIdentifier(batchState2.identifier)
@@ -135,8 +119,7 @@ class JDBCMetadataStoreSuite extends KyuubiFunSuite {
username = "kyuubi",
state = "PENDING"),
0,
- Int.MaxValue,
- true)
+ Int.MaxValue)
assert(batches.isEmpty)
batches = jdbcMetadataStore.getMetadataList(
@@ -146,9 +129,8 @@ class JDBCMetadataStoreSuite extends KyuubiFunSuite {
username = "kyuubi",
state = "PENDING"),
0,
- Int.MaxValue,
- true)
- assert(batches == Seq(batchStateOnlyMetadata))
+ Int.MaxValue)
+ assert(batches == Seq(batchMetadata))
batches = jdbcMetadataStore.getMetadataList(
MetadataFilter(
@@ -157,8 +139,7 @@ class JDBCMetadataStoreSuite extends KyuubiFunSuite {
username = "kyuubi",
state = "RUNNING"),
0,
- Int.MaxValue,
- true)
+ Int.MaxValue)
assert(batches.isEmpty)
batches = jdbcMetadataStore.getMetadataList(
@@ -168,8 +149,7 @@ class JDBCMetadataStoreSuite extends KyuubiFunSuite {
username = "no_kyuubi",
state = "PENDING"),
0,
- Int.MaxValue,
- true)
+ Int.MaxValue)
assert(batches.isEmpty)
batches = jdbcMetadataStore.getMetadataList(
@@ -178,31 +158,27 @@ class JDBCMetadataStoreSuite extends KyuubiFunSuite {
engineType = "SPARK",
state = "PENDING"),
0,
- Int.MaxValue,
- true)
- assert(batches == Seq(batchStateOnlyMetadata))
+ Int.MaxValue)
+ assert(batches == Seq(batchMetadata))
batches = jdbcMetadataStore.getMetadataList(
MetadataFilter(sessionType = SessionType.BATCH),
0,
- Int.MaxValue,
- true)
- assert(batches == Seq(batchStateOnlyMetadata))
+ Int.MaxValue)
+ assert(batches == Seq(batchMetadata))
batches = jdbcMetadataStore.getMetadataList(
MetadataFilter(
sessionType = SessionType.BATCH,
peerInstanceClosed = true),
0,
- Int.MaxValue,
- true)
+ Int.MaxValue)
assert(batches.isEmpty)
jdbcMetadataStore.updateMetadata(Metadata(
- identifier = batchStateOnlyMetadata.identifier,
+ identifier = batchMetadata.identifier,
peerInstanceClosed = true))
- batchStateOnlyMetadata = batchStateOnlyMetadata.copy(peerInstanceClosed =
true)
batchMetadata = batchMetadata.copy(peerInstanceClosed = true)
batches = jdbcMetadataStore.getMetadataList(
@@ -210,9 +186,8 @@ class JDBCMetadataStoreSuite extends KyuubiFunSuite {
sessionType = SessionType.BATCH,
peerInstanceClosed = true),
0,
- Int.MaxValue,
- true)
- assert(batches === Seq(batchStateOnlyMetadata))
+ Int.MaxValue)
+ assert(batches === Seq(batchMetadata))
var batchesToRecover = jdbcMetadataStore.getMetadataList(
MetadataFilter(
@@ -220,8 +195,7 @@ class JDBCMetadataStoreSuite extends KyuubiFunSuite {
state = "PENDING",
kyuubiInstance = kyuubiInstance),
0,
- Int.MaxValue,
- false)
+ Int.MaxValue)
assert(batchesToRecover == Seq(batchMetadata))
batchesToRecover = jdbcMetadataStore.getMetadataList(
@@ -230,11 +204,10 @@ class JDBCMetadataStoreSuite extends KyuubiFunSuite {
state = "RUNNING",
kyuubiInstance = kyuubiInstance),
0,
- Int.MaxValue,
- false)
+ Int.MaxValue)
assert(batchesToRecover.isEmpty)
- var newBatchState = batchStateOnlyMetadata.copy(
+ var newBatchState = batchMetadata.copy(
state = "RUNNING",
engineId = "app_id",
engineName = "app_name",
@@ -242,12 +215,12 @@ class JDBCMetadataStoreSuite extends KyuubiFunSuite {
engineState = "RUNNING",
engineError = None)
jdbcMetadataStore.updateMetadata(newBatchState)
- assert(jdbcMetadataStore.getMetadata(batchId, true) == newBatchState)
+ assert(jdbcMetadataStore.getMetadata(batchId) == newBatchState)
newBatchState = newBatchState.copy(state = "FINISHED", endTime =
System.currentTimeMillis())
jdbcMetadataStore.updateMetadata(newBatchState)
- assert(jdbcMetadataStore.getMetadata(batchId, true) == newBatchState)
+ assert(jdbcMetadataStore.getMetadata(batchId) == newBatchState)
assert(jdbcMetadataStore.getMetadataList(
MetadataFilter(
@@ -255,8 +228,7 @@ class JDBCMetadataStoreSuite extends KyuubiFunSuite {
state = "PENDING",
kyuubiInstance = kyuubiInstance),
0,
- Int.MaxValue,
- false).isEmpty)
+ Int.MaxValue).isEmpty)
assert(jdbcMetadataStore.getMetadataList(
MetadataFilter(
@@ -264,12 +236,11 @@ class JDBCMetadataStoreSuite extends KyuubiFunSuite {
state = "RUNNING",
kyuubiInstance = kyuubiInstance),
0,
- Int.MaxValue,
- false).isEmpty)
+ Int.MaxValue).isEmpty)
eventually(Timeout(3.seconds)) {
jdbcMetadataStore.cleanupMetadataByAge(1000)
- assert(jdbcMetadataStore.getMetadata(batchId, true) == null)
+ assert(jdbcMetadataStore.getMetadata(batchId) == null)
}
}