This is an automated email from the ASF dual-hosted git repository.

feiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 56cbd582c [KYUUBI #5199] Read all columns of metadata to prevent 
column missing
56cbd582c is described below

commit 56cbd582c4ba86e9558957203527c50d2fe18882
Author: fwang12 <[email protected]>
AuthorDate: Fri Aug 25 16:12:17 2023 +0800

    [KYUUBI #5199] Read all columns of metadata to prevent column missing
    
    ### _Why are the changes needed?_
    
    Since https://github.com/apache/kyuubi/issues/4843, we need the 
`requestConf` column to get application manager info.
    
    As a result, the `stateOnly` option for metadata selection is no longer
    needed. Before this change, the full column list was defined as:
    ```
    private val METADATA_ALL_COLUMNS = Seq(
        METADATA_STATE_ONLY_COLUMNS,
        "resource",
        "class_name",
        "request_conf",
        "request_args").mkString(",")
    ```
    In this PR, I remove the `stateOnly` flag so that all metadata columns are
    always selected, which prevents missing columns and makes the code easier
    to maintain.
    ### _How was this patch tested?_
    Existing UT.
    
    ### _Was this patch authored or co-authored using generative AI tooling?_
    No
    
    Closes #5199 from turboFei/metadata_state_only.
    
    Closes #5199
    
    8e02e6a5e [fwang12] all
    
    Authored-by: fwang12 <[email protected]>
    Signed-off-by: fwang12 <[email protected]>
---
 .../kyuubi/server/metadata/MetadataManager.scala   |  8 +-
 .../kyuubi/server/metadata/MetadataStore.scala     | 10 +--
 .../server/metadata/jdbc/JDBCMetadataStore.scala   | 52 +++++--------
 .../metadata/jdbc/JDBCMetadataStoreSuite.scala     | 87 ++++++++--------------
 4 files changed, 52 insertions(+), 105 deletions(-)

diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/MetadataManager.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/MetadataManager.scala
index daa3451d9..1da9e1f31 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/MetadataManager.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/MetadataManager.scala
@@ -129,12 +129,12 @@ class MetadataManager extends 
AbstractService("MetadataManager") {
   }
 
   def getBatchSessionMetadata(batchId: String): Option[Metadata] = {
-    Option(withMetadataRequestMetrics(_metadataStore.getMetadata(batchId, 
true)))
+    Option(withMetadataRequestMetrics(_metadataStore.getMetadata(batchId)))
       .filter(_.sessionType == SessionType.BATCH)
   }
 
   def getBatches(filter: MetadataFilter, from: Int, size: Int): Seq[Batch] = {
-    withMetadataRequestMetrics(_metadataStore.getMetadataList(filter, from, 
size, true)).map(
+    withMetadataRequestMetrics(_metadataStore.getMetadataList(filter, from, 
size)).map(
       buildBatch)
   }
 
@@ -168,7 +168,7 @@ class MetadataManager extends 
AbstractService("MetadataManager") {
       sessionType = SessionType.BATCH,
       state = state,
       kyuubiInstance = kyuubiInstance)
-    withMetadataRequestMetrics(_metadataStore.getMetadataList(filter, from, 
size, false))
+    withMetadataRequestMetrics(_metadataStore.getMetadataList(filter, from, 
size))
   }
 
   def getPeerInstanceClosedBatchesMetadata(
@@ -181,7 +181,7 @@ class MetadataManager extends 
AbstractService("MetadataManager") {
       state = state,
       kyuubiInstance = kyuubiInstance,
       peerInstanceClosed = true)
-    withMetadataRequestMetrics(_metadataStore.getMetadataList(filter, from, 
size, true))
+    withMetadataRequestMetrics(_metadataStore.getMetadataList(filter, from, 
size))
   }
 
   def updateMetadata(metadata: Metadata, asyncRetryOnError: Boolean = true): 
Unit = {
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/MetadataStore.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/MetadataStore.scala
index 1eabb224d..d8258816a 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/MetadataStore.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/MetadataStore.scala
@@ -48,24 +48,18 @@ trait MetadataStore extends Closeable {
   /**
    * Get the persisted metadata by batch identifier.
    * @param identifier the identifier.
-   * @param stateOnly only return the state related column values.
    * @return selected metadata.
    */
-  def getMetadata(identifier: String, stateOnly: Boolean): Metadata
+  def getMetadata(identifier: String): Metadata
 
   /**
    * Get the metadata list with filter conditions, offset and size.
    * @param filter the metadata filter conditions.
    * @param from the metadata offset.
    * @param size the size to get.
-   * @param stateOnly only return the state related column values.
    * @return selected metadata list.
    */
-  def getMetadataList(
-      filter: MetadataFilter,
-      from: Int,
-      size: Int,
-      stateOnly: Boolean): Seq[Metadata]
+  def getMetadataList(filter: MetadataFilter, from: Int, size: Int): 
Seq[Metadata]
 
   /**
    * Count the metadata list with filter conditions.
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStore.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStore.scala
index a79832d38..d32cdc838 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStore.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStore.scala
@@ -215,7 +215,7 @@ class JDBCMetadataStore(conf: KyuubiConf) extends 
MetadataStore with Logging {
         stmt.setString(4, OperationState.INITIALIZED.toString)
       } == 1
     }.map { pickedBatchId =>
-      getMetadata(pickedBatchId, stateOnly = false)
+      getMetadata(pickedBatchId)
     }
   }
 
@@ -231,30 +231,21 @@ class JDBCMetadataStore(conf: KyuubiConf) extends 
MetadataStore with Logging {
     }
   }
 
-  override def getMetadata(identifier: String, stateOnly: Boolean): Metadata = 
{
-    val query =
-      if (stateOnly) {
-        s"SELECT $METADATA_STATE_ONLY_COLUMNS FROM $METADATA_TABLE WHERE 
identifier = ?"
-      } else {
-        s"SELECT $METADATA_ALL_COLUMNS FROM $METADATA_TABLE WHERE identifier = 
?"
-      }
+  override def getMetadata(identifier: String): Metadata = {
+    val query = s"SELECT $METADATA_COLUMNS FROM $METADATA_TABLE WHERE 
identifier = ?"
 
     JdbcUtils.withConnection { connection =>
       withResultSet(connection, query, identifier) { rs =>
-        buildMetadata(rs, stateOnly).headOption.orNull
+        buildMetadata(rs).headOption.orNull
       }
     }
   }
 
-  override def getMetadataList(
-      filter: MetadataFilter,
-      from: Int,
-      size: Int,
-      stateOnly: Boolean): Seq[Metadata] = {
+  override def getMetadataList(filter: MetadataFilter, from: Int, size: Int): 
Seq[Metadata] = {
     val queryBuilder = new StringBuilder
     val params = ListBuffer[Any]()
     queryBuilder.append("SELECT ")
-    queryBuilder.append(if (stateOnly) METADATA_STATE_ONLY_COLUMNS else 
METADATA_ALL_COLUMNS)
+    queryBuilder.append(METADATA_COLUMNS)
     queryBuilder.append(s" FROM $METADATA_TABLE")
     queryBuilder.append(s" ${assembleWhereClause(filter, params)}")
     queryBuilder.append(" ORDER BY key_id ")
@@ -262,7 +253,7 @@ class JDBCMetadataStore(conf: KyuubiConf) extends 
MetadataStore with Logging {
     val query = queryBuilder.toString
     JdbcUtils.withConnection { connection =>
       withResultSet(connection, query, params.toSeq: _*) { rs =>
-        buildMetadata(rs, stateOnly)
+        buildMetadata(rs)
       }
     }
   }
@@ -411,7 +402,7 @@ class JDBCMetadataStore(conf: KyuubiConf) extends 
MetadataStore with Logging {
     }
   }
 
-  private def buildMetadata(resultSet: ResultSet, stateOnly: Boolean): 
Seq[Metadata] = {
+  private def buildMetadata(resultSet: ResultSet): Seq[Metadata] = {
     try {
       val metadataList = ListBuffer[Metadata]()
       while (resultSet.next()) {
@@ -422,7 +413,11 @@ class JDBCMetadataStore(conf: KyuubiConf) extends 
MetadataStore with Logging {
         val ipAddress = resultSet.getString("ip_address")
         val kyuubiInstance = resultSet.getString("kyuubi_instance")
         val state = resultSet.getString("state")
+        val resource = resultSet.getString("resource")
+        val className = resultSet.getString("class_name")
         val requestName = resultSet.getString("request_name")
+        val requestConf = string2Map(resultSet.getString("request_conf"))
+        val requestArgs = string2Seq(resultSet.getString("request_args"))
         val createTime = resultSet.getLong("create_time")
         val engineType = resultSet.getString("engine_type")
         val clusterManager = Option(resultSet.getString("cluster_manager"))
@@ -434,17 +429,6 @@ class JDBCMetadataStore(conf: KyuubiConf) extends 
MetadataStore with Logging {
         val endTime = resultSet.getLong("end_time")
         val peerInstanceClosed = resultSet.getBoolean("peer_instance_closed")
 
-        var resource: String = null
-        var className: String = null
-        var requestConf: Map[String, String] = Map.empty
-        var requestArgs: Seq[String] = Seq.empty
-
-        if (!stateOnly) {
-          resource = resultSet.getString("resource")
-          className = resultSet.getString("class_name")
-          requestConf = string2Map(resultSet.getString("request_conf"))
-          requestArgs = string2Seq(resultSet.getString("request_args"))
-        }
         val metadata = Metadata(
           identifier = identifier,
           sessionType = sessionType,
@@ -583,7 +567,7 @@ class JDBCMetadataStore(conf: KyuubiConf) extends 
MetadataStore with Logging {
 object JDBCMetadataStore {
   private val SCHEMA_URL_PATTERN = 
"""^metadata-store-schema-(\d+)\.(\d+)\.(\d+)\.(.*)\.sql$""".r
   private val METADATA_TABLE = "metadata"
-  private val METADATA_STATE_ONLY_COLUMNS = Seq(
+  private val METADATA_COLUMNS = Seq(
     "identifier",
     "session_type",
     "real_user",
@@ -591,7 +575,11 @@ object JDBCMetadataStore {
     "ip_address",
     "kyuubi_instance",
     "state",
+    "resource",
+    "class_name",
     "request_name",
+    "request_conf",
+    "request_args",
     "create_time",
     "engine_type",
     "cluster_manager",
@@ -602,10 +590,4 @@ object JDBCMetadataStore {
     "engine_error",
     "end_time",
     "peer_instance_closed").mkString(",")
-  private val METADATA_ALL_COLUMNS = Seq(
-    METADATA_STATE_ONLY_COLUMNS,
-    "resource",
-    "class_name",
-    "request_conf",
-    "request_args").mkString(",")
 }
diff --git 
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStoreSuite.scala
 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStoreSuite.scala
index aee4497df..2ee082a1d 100644
--- 
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStoreSuite.scala
+++ 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStoreSuite.scala
@@ -39,11 +39,7 @@ class JDBCMetadataStoreSuite extends KyuubiFunSuite {
 
   override def afterAll(): Unit = {
     super.afterAll()
-    jdbcMetadataStore.getMetadataList(
-      MetadataFilter(),
-      0,
-      Int.MaxValue,
-      true).foreach {
+    jdbcMetadataStore.getMetadataList(MetadataFilter(), 0, 
Int.MaxValue).foreach {
       batch =>
         jdbcMetadataStore.cleanupMetadataByIdentifier(batch.identifier)
     }
@@ -82,28 +78,18 @@ class JDBCMetadataStoreSuite extends KyuubiFunSuite {
       engineType = "spark",
       clusterManager = Some("local"))
 
-    var batchStateOnlyMetadata = batchMetadata.copy(
-      resource = null,
-      className = null,
-      requestConf = Map.empty,
-      requestArgs = Seq.empty)
-
     jdbcMetadataStore.insertMetadata(batchMetadata)
-    assert(jdbcMetadataStore.getMetadata(batchId, true) != 
batchStateOnlyMetadata)
-    assert(jdbcMetadataStore.getMetadata(batchId, false) != batchMetadata)
 
     // the engine type is formatted with UPPER
     batchMetadata = batchMetadata.copy(engineType = "SPARK")
-    batchStateOnlyMetadata = batchStateOnlyMetadata.copy(engineType = "SPARK")
-    assert(jdbcMetadataStore.getMetadata(batchId, true) == 
batchStateOnlyMetadata)
-    assert(jdbcMetadataStore.getMetadata(batchId, false) == batchMetadata)
+    assert(jdbcMetadataStore.getMetadata(batchId) == batchMetadata)
 
     jdbcMetadataStore.cleanupMetadataByIdentifier(batchId)
-    assert(jdbcMetadataStore.getMetadata(batchId, true) == null)
+    assert(jdbcMetadataStore.getMetadata(batchId) == null)
 
     jdbcMetadataStore.insertMetadata(batchMetadata)
 
-    val batchState2 = batchStateOnlyMetadata.copy(identifier = 
UUID.randomUUID().toString)
+    val batchState2 = batchMetadata.copy(identifier = 
UUID.randomUUID().toString)
     jdbcMetadataStore.insertMetadata(batchState2)
 
     var batches =
@@ -112,9 +98,8 @@ class JDBCMetadataStoreSuite extends KyuubiFunSuite {
           sessionType = SessionType.BATCH,
           engineType = "Spark"),
         0,
-        1,
-        true)
-    assert(batches == Seq(batchStateOnlyMetadata))
+        1)
+    assert(batches == Seq(batchMetadata))
 
     batches = jdbcMetadataStore.getMetadataList(
       MetadataFilter(
@@ -122,9 +107,8 @@ class JDBCMetadataStoreSuite extends KyuubiFunSuite {
         engineType = "Spark",
         username = "kyuubi"),
       0,
-      Int.MaxValue,
-      true)
-    assert(batches == Seq(batchStateOnlyMetadata, batchState2))
+      Int.MaxValue)
+    assert(batches == Seq(batchMetadata, batchState2))
 
     jdbcMetadataStore.cleanupMetadataByIdentifier(batchState2.identifier)
 
@@ -135,8 +119,7 @@ class JDBCMetadataStoreSuite extends KyuubiFunSuite {
         username = "kyuubi",
         state = "PENDING"),
       0,
-      Int.MaxValue,
-      true)
+      Int.MaxValue)
     assert(batches.isEmpty)
 
     batches = jdbcMetadataStore.getMetadataList(
@@ -146,9 +129,8 @@ class JDBCMetadataStoreSuite extends KyuubiFunSuite {
         username = "kyuubi",
         state = "PENDING"),
       0,
-      Int.MaxValue,
-      true)
-    assert(batches == Seq(batchStateOnlyMetadata))
+      Int.MaxValue)
+    assert(batches == Seq(batchMetadata))
 
     batches = jdbcMetadataStore.getMetadataList(
       MetadataFilter(
@@ -157,8 +139,7 @@ class JDBCMetadataStoreSuite extends KyuubiFunSuite {
         username = "kyuubi",
         state = "RUNNING"),
       0,
-      Int.MaxValue,
-      true)
+      Int.MaxValue)
     assert(batches.isEmpty)
 
     batches = jdbcMetadataStore.getMetadataList(
@@ -168,8 +149,7 @@ class JDBCMetadataStoreSuite extends KyuubiFunSuite {
         username = "no_kyuubi",
         state = "PENDING"),
       0,
-      Int.MaxValue,
-      true)
+      Int.MaxValue)
     assert(batches.isEmpty)
 
     batches = jdbcMetadataStore.getMetadataList(
@@ -178,31 +158,27 @@ class JDBCMetadataStoreSuite extends KyuubiFunSuite {
         engineType = "SPARK",
         state = "PENDING"),
       0,
-      Int.MaxValue,
-      true)
-    assert(batches == Seq(batchStateOnlyMetadata))
+      Int.MaxValue)
+    assert(batches == Seq(batchMetadata))
 
     batches = jdbcMetadataStore.getMetadataList(
       MetadataFilter(sessionType = SessionType.BATCH),
       0,
-      Int.MaxValue,
-      true)
-    assert(batches == Seq(batchStateOnlyMetadata))
+      Int.MaxValue)
+    assert(batches == Seq(batchMetadata))
 
     batches = jdbcMetadataStore.getMetadataList(
       MetadataFilter(
         sessionType = SessionType.BATCH,
         peerInstanceClosed = true),
       0,
-      Int.MaxValue,
-      true)
+      Int.MaxValue)
     assert(batches.isEmpty)
 
     jdbcMetadataStore.updateMetadata(Metadata(
-      identifier = batchStateOnlyMetadata.identifier,
+      identifier = batchMetadata.identifier,
       peerInstanceClosed = true))
 
-    batchStateOnlyMetadata = batchStateOnlyMetadata.copy(peerInstanceClosed = 
true)
     batchMetadata = batchMetadata.copy(peerInstanceClosed = true)
 
     batches = jdbcMetadataStore.getMetadataList(
@@ -210,9 +186,8 @@ class JDBCMetadataStoreSuite extends KyuubiFunSuite {
         sessionType = SessionType.BATCH,
         peerInstanceClosed = true),
       0,
-      Int.MaxValue,
-      true)
-    assert(batches === Seq(batchStateOnlyMetadata))
+      Int.MaxValue)
+    assert(batches === Seq(batchMetadata))
 
     var batchesToRecover = jdbcMetadataStore.getMetadataList(
       MetadataFilter(
@@ -220,8 +195,7 @@ class JDBCMetadataStoreSuite extends KyuubiFunSuite {
         state = "PENDING",
         kyuubiInstance = kyuubiInstance),
       0,
-      Int.MaxValue,
-      false)
+      Int.MaxValue)
     assert(batchesToRecover == Seq(batchMetadata))
 
     batchesToRecover = jdbcMetadataStore.getMetadataList(
@@ -230,11 +204,10 @@ class JDBCMetadataStoreSuite extends KyuubiFunSuite {
         state = "RUNNING",
         kyuubiInstance = kyuubiInstance),
       0,
-      Int.MaxValue,
-      false)
+      Int.MaxValue)
     assert(batchesToRecover.isEmpty)
 
-    var newBatchState = batchStateOnlyMetadata.copy(
+    var newBatchState = batchMetadata.copy(
       state = "RUNNING",
       engineId = "app_id",
       engineName = "app_name",
@@ -242,12 +215,12 @@ class JDBCMetadataStoreSuite extends KyuubiFunSuite {
       engineState = "RUNNING",
       engineError = None)
     jdbcMetadataStore.updateMetadata(newBatchState)
-    assert(jdbcMetadataStore.getMetadata(batchId, true) == newBatchState)
+    assert(jdbcMetadataStore.getMetadata(batchId) == newBatchState)
 
     newBatchState = newBatchState.copy(state = "FINISHED", endTime = 
System.currentTimeMillis())
     jdbcMetadataStore.updateMetadata(newBatchState)
 
-    assert(jdbcMetadataStore.getMetadata(batchId, true) == newBatchState)
+    assert(jdbcMetadataStore.getMetadata(batchId) == newBatchState)
 
     assert(jdbcMetadataStore.getMetadataList(
       MetadataFilter(
@@ -255,8 +228,7 @@ class JDBCMetadataStoreSuite extends KyuubiFunSuite {
         state = "PENDING",
         kyuubiInstance = kyuubiInstance),
       0,
-      Int.MaxValue,
-      false).isEmpty)
+      Int.MaxValue).isEmpty)
 
     assert(jdbcMetadataStore.getMetadataList(
       MetadataFilter(
@@ -264,12 +236,11 @@ class JDBCMetadataStoreSuite extends KyuubiFunSuite {
         state = "RUNNING",
         kyuubiInstance = kyuubiInstance),
       0,
-      Int.MaxValue,
-      false).isEmpty)
+      Int.MaxValue).isEmpty)
 
     eventually(Timeout(3.seconds)) {
       jdbcMetadataStore.cleanupMetadataByAge(1000)
-      assert(jdbcMetadataStore.getMetadata(batchId, true) == null)
+      assert(jdbcMetadataStore.getMetadata(batchId) == null)
     }
   }
 

Reply via email to