This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 331b018d0cfc feat(hive-sync): add Spark-catalog based metastore client implementation to avoid Hive-on-Spark classloader issues (#18203)
331b018d0cfc is described below

commit 331b018d0cfc06d1888d3c1de8498cb677441e90
Author: Surya Prasanna <[email protected]>
AuthorDate: Sun Mar 22 20:40:47 2026 -0700

    feat(hive-sync): add Spark-catalog based metastore client implementation to avoid Hive-on-Spark classloader issues (#18203)
    
    * Create SparkCatalogMetaStoreClient to support hive-sync in Hive-on-Spark environments
    * Add unit test for SparkCatalogMetaStoreClient class
---
 .../sql/hive/SparkCatalogMetaStoreClient.scala     | 380 +++++++++++++++++++++
 .../sql/hive/TestSparkCatalogMetaStoreClient.scala | 246 +++++++++++++
 .../spark/sql/hudi/ddl/TestSparkCatalogSync.scala  | 162 +++++++++
 .../java/org/apache/hudi/hive/HiveSyncConfig.java  |   4 +
 .../org/apache/hudi/hive/HiveSyncConfigHolder.java |   6 +
 .../org/apache/hudi/hive/HoodieHiveSyncClient.java |  18 +-
 6 files changed, 815 insertions(+), 1 deletion(-)
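
For anyone wiring this up from sync properties, a minimal sketch in Scala (assumptions: the
constants come from this patch and from existing HoodieSyncConfig keys; the database, table,
and path values are placeholders; an active Hive-enabled SparkSession and an existing Hudi
table at the base path are assumed):

    import org.apache.hadoop.hive.conf.HiveConf
    import org.apache.hudi.common.config.TypedProperties
    import org.apache.hudi.hive.{HiveSyncConfigHolder, HiveSyncTool}
    import org.apache.hudi.sync.common.HoodieSyncConfig

    object SparkCatalogSyncSketch extends App {
      val props = new TypedProperties()
      props.setProperty(HoodieSyncConfig.META_SYNC_DATABASE_NAME.key, "testdb")         // placeholder
      props.setProperty(HoodieSyncConfig.META_SYNC_TABLE_NAME.key, "my_table")          // placeholder
      props.setProperty(HoodieSyncConfig.META_SYNC_BASE_PATH.key, "/tmp/hudi/my_table") // placeholder
      props.setProperty(HiveSyncConfigHolder.HIVE_SYNC_MODE.key, "hms")
      // New flag from this commit: route metastore calls through the Spark catalog.
      props.setProperty(HiveSyncConfigHolder.HIVE_SYNC_USE_SPARK_CATALOG.key, "true")

      val syncTool = new HiveSyncTool(props, new HiveConf())
      try syncTool.syncHoodieTable() finally syncTool.close()
    }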

diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hive/SparkCatalogMetaStoreClient.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hive/SparkCatalogMetaStoreClient.scala
new file mode 100644
index 000000000000..d238341bed46
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hive/SparkCatalogMetaStoreClient.scala
@@ -0,0 +1,380 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import org.apache.hudi.hive.HiveSyncConfig
+
+import org.apache.hadoop.hive.metastore.IMetaStoreClient
+import org.apache.hadoop.hive.metastore.api.{Database, EnvironmentContext, FieldSchema, Partition, SerDeInfo, StorageDescriptor, Table}
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogStorageFormat, CatalogTable, CatalogTablePartition, CatalogTableType}
+import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
+import org.apache.spark.sql.types.{Metadata, StructField, StructType}
+
+import java.net.URI
+import java.util
+
+import scala.collection.JavaConverters._
+
+/**
+ * IMetaStoreClient implementation backed by Spark catalog/external-catalog APIs for
+ * methods used by HoodieHiveSyncClient/HMSDDLExecutor.
+ */
+class SparkCatalogMetaStoreClient(syncConfig: HiveSyncConfig)
+  extends IMetaStoreClient {
+
+  private val sparkSession = SparkSession.getActiveSession.getOrElse(SparkSession.builder()
+    .enableHiveSupport()
+    .getOrCreate())
+
+  private val externalCatalog = sparkSession.sessionState.catalog.externalCatalog
+
+  override def createDatabase(database: Database): Unit = {
+    val catalogDb = CatalogDatabase(
+      name = database.getName,
+      description = Option(database.getDescription).getOrElse(""),
+      locationUri = Option(database.getLocationUri).map(new URI(_))
+        .getOrElse(new URI(sparkSession.sessionState.conf.warehousePath)),
+      properties = Option(database.getParameters).map(_.asScala.toMap).getOrElse(Map.empty))
+    externalCatalog.createDatabase(catalogDb, ignoreIfExists = false)
+  }
+
+  override def createTable(table: Table): Unit = {
+    externalCatalog.createTable(toCatalogTable(table), ignoreIfExists = false)
+  }
+
+  override def getTable(dbName: String, tableName: String): Table = {
+    fromCatalogTable(externalCatalog.getTable(dbName, tableName))
+  }
+
+  // scalastyle:off method.name
+  override def alter_table(dbName: String, tableName: String, table: Table): Unit = {
+    val updated = toCatalogTable(table).copy(identifier = TableIdentifier(tableName, Some(dbName)))
+    externalCatalog.alterTable(updated)
+  }
+
+  override def alter_table_with_environmentContext(dbName: String,
+                                                   tableName: String,
+                                                   table: Table,
+                                                   environmentContext: EnvironmentContext): Unit = {
+    alter_table(dbName, tableName, table)
+  }
+
+  override def listPartitions(dbName: String, tableName: String, max: Short): util.List[Partition] = {
+    val table = getTable(dbName, tableName)
+    val partitionKeys = table.getPartitionKeys.asScala.map(_.getName).toList
+    externalCatalog.listPartitions(dbName, tableName, None).map(fromCatalogPartition(_, dbName, tableName, partitionKeys)).asJava
+  }
+
+  override def listPartitionsByFilter(dbName: String,
+                                      tableName: String,
+                                      filter: String,
+                                      max: Short): util.List[Partition] = {
+    // Spark external catalog does not expose Hive filter-string API; fall back to listing all.
+    listPartitions(dbName, tableName, max)
+  }
+
+  override def add_partitions(parts: util.List[Partition], ifNotExists: Boolean, needResults: Boolean): util.List[Partition] = {
+    if (parts == null || parts.isEmpty) {
+      new util.ArrayList[Partition]()
+    } else {
+      val first = parts.get(0)
+      val db = first.getDbName
+      val tbl = first.getTableName
+      val catalogParts = parts.asScala.map(toCatalogPartition).toSeq
+      externalCatalog.createPartitions(db, tbl, catalogParts, ignoreIfExists = ifNotExists)
+      if (needResults) parts else new util.ArrayList[Partition]()
+    }
+  }
+
+  override def alter_partitions(dbName: String,
+                                tableName: String,
+                                newParts: util.List[Partition],
+                                environmentContext: EnvironmentContext): Unit = {
+    externalCatalog.alterPartitions(dbName, tableName, newParts.asScala.map(toCatalogPartition).toSeq)
+  }
+
+  override def dropPartition(dbName: String, tableName: String, partName: String, deleteData: Boolean): Boolean = {
+    val spec = parsePartitionClause(partName)
+    externalCatalog.dropPartitions(dbName, tableName, Seq(spec), ignoreIfNotExists = true, purge = true, retainData = !deleteData)
+    true
+  }
+  // scalastyle:on method.name
+
+  override def tableExists(dbName: String, tableName: String): Boolean = {
+    sparkSession.catalog.tableExists(dbName, tableName)
+  }
+
+  override def getDatabase(dbName: String): Database = {
+    val db = externalCatalog.getDatabase(dbName)
+    new Database(db.name, db.description, db.locationUri.toString, db.properties.asJava)
+  }
+
+  override def getSchema(dbName: String, tableName: String): util.List[FieldSchema] = {
+    val table = externalCatalog.getTable(dbName, tableName)
+    val cols = table.schema.fields.map { f =>
+      new FieldSchema(f.name, f.dataType.catalogString, f.getComment().getOrElse(""))
+    }
+    val partitionCols = table.partitionColumnNames.map { name =>
+      val dt = table.partitionSchema.fields.find(_.name == name).map(_.dataType.catalogString).getOrElse("string")
+      new FieldSchema(name, dt, "")
+    }
+    (cols ++ partitionCols).toList.asJava
+  }
+
+  override def dropTable(dbName: String, tableName: String): Unit = {
+    externalCatalog.dropTable(dbName, tableName, ignoreIfNotExists = true, purge = true)
+  }
+
+  // scalastyle:off
+  private def unsupported[T](): T = {
+    throw new UnsupportedOperationException("Method is not supported in SparkCatalogMetaStoreClient")
+  }
+
+  override def isCompatibleWith(arg0: org.apache.hadoop.hive.conf.HiveConf): Boolean = unsupported[Boolean]()
+  override def setHiveAddedJars(arg0: String): Unit = unsupported[Unit]()
+  override def isLocalMetaStore(): Boolean = unsupported[Boolean]()
+  override def reconnect(): Unit = unsupported[Unit]()
+  override def close(): Unit = unsupported[Unit]()
+  override def setMetaConf(arg0: String, arg1: String): Unit = unsupported[Unit]()
+  override def getMetaConf(arg0: String): String = unsupported[String]()
+  override def getDatabases(arg0: String): java.util.List[String] = unsupported[java.util.List[String]]()
+  override def getAllDatabases(): java.util.List[String] = unsupported[java.util.List[String]]()
+  override def getTables(arg0: String, arg1: String): java.util.List[String] = unsupported[java.util.List[String]]()
+  override def getTables(arg0: String, arg1: String, arg2: org.apache.hadoop.hive.metastore.TableType): java.util.List[String] = unsupported[java.util.List[String]]()
+  override def getTableMeta(arg0: String, arg1: String, arg2: java.util.List[String]): java.util.List[org.apache.hadoop.hive.metastore.api.TableMeta] = unsupported[java.util.List[org.apache.hadoop.hive.metastore.api.TableMeta]]()
+  override def getAllTables(arg0: String): java.util.List[String] = unsupported[java.util.List[String]]()
+  override def listTableNamesByFilter(arg0: String, arg1: String, arg2: Short): java.util.List[String] = unsupported[java.util.List[String]]()
+  override def dropTable(arg0: String, arg1: String, arg2: Boolean, arg3: Boolean): Unit = unsupported[Unit]()
+  override def dropTable(arg0: String, arg1: String, arg2: Boolean, arg3: Boolean, arg4: Boolean): Unit = unsupported[Unit]()
+  override def dropTable(arg0: String, arg1: Boolean): Unit = unsupported[Unit]()
+  override def tableExists(arg0: String): Boolean = unsupported[Boolean]()
+  override def getTable(arg0: String): org.apache.hadoop.hive.metastore.api.Table = unsupported[org.apache.hadoop.hive.metastore.api.Table]()
+  override def getTableObjectsByName(arg0: String, arg1: java.util.List[String]): java.util.List[org.apache.hadoop.hive.metastore.api.Table] = unsupported[java.util.List[org.apache.hadoop.hive.metastore.api.Table]]()
+  override def appendPartition(arg0: String, arg1: String, arg2: java.util.List[String]): org.apache.hadoop.hive.metastore.api.Partition = unsupported[org.apache.hadoop.hive.metastore.api.Partition]()
+  override def appendPartition(arg0: String, arg1: String, arg2: String): org.apache.hadoop.hive.metastore.api.Partition = unsupported[org.apache.hadoop.hive.metastore.api.Partition]()
+  override def add_partition(arg0: org.apache.hadoop.hive.metastore.api.Partition): org.apache.hadoop.hive.metastore.api.Partition = unsupported[org.apache.hadoop.hive.metastore.api.Partition]()
+  override def add_partitions(arg0: java.util.List[org.apache.hadoop.hive.metastore.api.Partition]): Int = unsupported[Int]()
+  override def add_partitions_pspec(arg0: org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy): Int = unsupported[Int]()
+  override def getPartition(arg0: String, arg1: String, arg2: java.util.List[String]): org.apache.hadoop.hive.metastore.api.Partition = unsupported[org.apache.hadoop.hive.metastore.api.Partition]()
+  override def exchange_partition(arg0: java.util.Map[String, String], arg1: String, arg2: String, arg3: String, arg4: String): org.apache.hadoop.hive.metastore.api.Partition = unsupported[org.apache.hadoop.hive.metastore.api.Partition]()
+  override def exchange_partitions(arg0: java.util.Map[String, String], arg1: String, arg2: String, arg3: String, arg4: String): java.util.List[org.apache.hadoop.hive.metastore.api.Partition] = unsupported[java.util.List[org.apache.hadoop.hive.metastore.api.Partition]]()
+  override def getPartition(arg0: String, arg1: String, arg2: String): org.apache.hadoop.hive.metastore.api.Partition = unsupported[org.apache.hadoop.hive.metastore.api.Partition]()
+  override def getPartitionWithAuthInfo(arg0: String, arg1: String, arg2: java.util.List[String], arg3: String, arg4: java.util.List[String]): org.apache.hadoop.hive.metastore.api.Partition = unsupported[org.apache.hadoop.hive.metastore.api.Partition]()
+  override def listPartitionSpecs(arg0: String, arg1: String, arg2: Int): org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy = unsupported[org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy]()
+  override def listPartitions(arg0: String, arg1: String, arg2: java.util.List[String], arg3: Short): java.util.List[org.apache.hadoop.hive.metastore.api.Partition] = unsupported[java.util.List[org.apache.hadoop.hive.metastore.api.Partition]]()
+  override def listPartitionNames(arg0: String, arg1: String, arg2: Short): java.util.List[String] = unsupported[java.util.List[String]]()
+  override def listPartitionNames(arg0: String, arg1: String, arg2: java.util.List[String], arg3: Short): java.util.List[String] = unsupported[java.util.List[String]]()
+  override def listPartitionValues(arg0: org.apache.hadoop.hive.metastore.api.PartitionValuesRequest): org.apache.hadoop.hive.metastore.api.PartitionValuesResponse = unsupported[org.apache.hadoop.hive.metastore.api.PartitionValuesResponse]()
+  override def getNumPartitionsByFilter(arg0: String, arg1: String, arg2: String): Int = unsupported[Int]()
+  override def listPartitionSpecsByFilter(arg0: String, arg1: String, arg2: String, arg3: Int): org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy = unsupported[org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy]()
+  override def listPartitionsByExpr(arg0: String, arg1: String, arg2: Array[Byte], arg3: String, arg4: Short, arg5: java.util.List[org.apache.hadoop.hive.metastore.api.Partition]): Boolean = unsupported[Boolean]()
+  override def listPartitionsWithAuthInfo(arg0: String, arg1: String, arg2: Short, arg3: String, arg4: java.util.List[String]): java.util.List[org.apache.hadoop.hive.metastore.api.Partition] = unsupported[java.util.List[org.apache.hadoop.hive.metastore.api.Partition]]()
+  override def getPartitionsByNames(arg0: String, arg1: String, arg2: java.util.List[String]): java.util.List[org.apache.hadoop.hive.metastore.api.Partition] = unsupported[java.util.List[org.apache.hadoop.hive.metastore.api.Partition]]()
+  override def listPartitionsWithAuthInfo(arg0: String, arg1: String, arg2: java.util.List[String], arg3: Short, arg4: String, arg5: java.util.List[String]): java.util.List[org.apache.hadoop.hive.metastore.api.Partition] = unsupported[java.util.List[org.apache.hadoop.hive.metastore.api.Partition]]()
+  override def markPartitionForEvent(arg0: String, arg1: String, arg2: java.util.Map[String, String], arg3: org.apache.hadoop.hive.metastore.api.PartitionEventType): Unit = unsupported[Unit]()
+  override def isPartitionMarkedForEvent(arg0: String, arg1: String, arg2: java.util.Map[String, String], arg3: org.apache.hadoop.hive.metastore.api.PartitionEventType): Boolean = unsupported[Boolean]()
+  override def validatePartitionNameCharacters(arg0: java.util.List[String]): Unit = unsupported[Unit]()
+  override def alter_table(arg0: String, arg1: String, arg2: org.apache.hadoop.hive.metastore.api.Table, arg3: Boolean): Unit = unsupported[Unit]()
+  override def dropDatabase(arg0: String): Unit = unsupported[Unit]()
+  override def dropDatabase(arg0: String, arg1: Boolean, arg2: Boolean): Unit = unsupported[Unit]()
+  override def dropDatabase(arg0: String, arg1: Boolean, arg2: Boolean, arg3: Boolean): Unit = unsupported[Unit]()
+  override def alterDatabase(arg0: String, arg1: org.apache.hadoop.hive.metastore.api.Database): Unit = unsupported[Unit]()
+  override def dropPartition(arg0: String, arg1: String, arg2: java.util.List[String], arg3: Boolean): Boolean = unsupported[Boolean]()
+  override def dropPartition(arg0: String, arg1: String, arg2: java.util.List[String], arg3: org.apache.hadoop.hive.metastore.PartitionDropOptions): Boolean = unsupported[Boolean]()
+  override def dropPartitions(arg0: String, arg1: String, arg2: java.util.List[org.apache.hadoop.hive.common.ObjectPair[java.lang.Integer, Array[Byte]]], arg3: Boolean, arg4: Boolean): java.util.List[org.apache.hadoop.hive.metastore.api.Partition] = unsupported[java.util.List[org.apache.hadoop.hive.metastore.api.Partition]]()
+  override def dropPartitions(arg0: String, arg1: String, arg2: java.util.List[org.apache.hadoop.hive.common.ObjectPair[java.lang.Integer, Array[Byte]]], arg3: Boolean, arg4: Boolean, arg5: Boolean): java.util.List[org.apache.hadoop.hive.metastore.api.Partition] = unsupported[java.util.List[org.apache.hadoop.hive.metastore.api.Partition]]()
+  override def dropPartitions(arg0: String, arg1: String, arg2: java.util.List[org.apache.hadoop.hive.common.ObjectPair[java.lang.Integer, Array[Byte]]], arg3: org.apache.hadoop.hive.metastore.PartitionDropOptions): java.util.List[org.apache.hadoop.hive.metastore.api.Partition] = unsupported[java.util.List[org.apache.hadoop.hive.metastore.api.Partition]]()
+  override def alter_partition(arg0: String, arg1: String, arg2: org.apache.hadoop.hive.metastore.api.Partition): Unit = unsupported[Unit]()
+  override def alter_partition(arg0: String, arg1: String, arg2: org.apache.hadoop.hive.metastore.api.Partition, arg3: org.apache.hadoop.hive.metastore.api.EnvironmentContext): Unit = unsupported[Unit]()
+  override def alter_partitions(arg0: String, arg1: String, arg2: java.util.List[org.apache.hadoop.hive.metastore.api.Partition]): Unit = unsupported[Unit]()
+  override def renamePartition(arg0: String, arg1: String, arg2: java.util.List[String], arg3: org.apache.hadoop.hive.metastore.api.Partition): Unit = unsupported[Unit]()
+  override def getFields(arg0: String, arg1: String): java.util.List[org.apache.hadoop.hive.metastore.api.FieldSchema] = unsupported[java.util.List[org.apache.hadoop.hive.metastore.api.FieldSchema]]()
+  override def getConfigValue(arg0: String, arg1: String): String = unsupported[String]()
+  override def partitionNameToVals(arg0: String): java.util.List[String] = unsupported[java.util.List[String]]()
+  override def partitionNameToSpec(arg0: String): java.util.Map[String, String] = unsupported[java.util.Map[String, String]]()
+  override def createIndex(arg0: org.apache.hadoop.hive.metastore.api.Index, arg1: org.apache.hadoop.hive.metastore.api.Table): Unit = unsupported[Unit]()
+  override def alter_index(arg0: String, arg1: String, arg2: String, arg3: org.apache.hadoop.hive.metastore.api.Index): Unit = unsupported[Unit]()
+  override def getIndex(arg0: String, arg1: String, arg2: String): org.apache.hadoop.hive.metastore.api.Index = unsupported[org.apache.hadoop.hive.metastore.api.Index]()
+  override def listIndexes(arg0: String, arg1: String, arg2: Short): java.util.List[org.apache.hadoop.hive.metastore.api.Index] = unsupported[java.util.List[org.apache.hadoop.hive.metastore.api.Index]]()
+  override def listIndexNames(arg0: String, arg1: String, arg2: Short): java.util.List[String] = unsupported[java.util.List[String]]()
+  override def dropIndex(arg0: String, arg1: String, arg2: String, arg3: Boolean): Boolean = unsupported[Boolean]()
+  override def updateTableColumnStatistics(arg0: org.apache.hadoop.hive.metastore.api.ColumnStatistics): Boolean = unsupported[Boolean]()
+  override def updatePartitionColumnStatistics(arg0: org.apache.hadoop.hive.metastore.api.ColumnStatistics): Boolean = unsupported[Boolean]()
+  override def getTableColumnStatistics(arg0: String, arg1: String, arg2: java.util.List[String]): java.util.List[org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj] = unsupported[java.util.List[org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj]]()
+  override def getPartitionColumnStatistics(arg0: String, arg1: String, arg2: java.util.List[String], arg3: java.util.List[String]): java.util.Map[String, java.util.List[org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj]] = unsupported[java.util.Map[String, java.util.List[org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj]]]()
+  override def deletePartitionColumnStatistics(arg0: String, arg1: String, arg2: String, arg3: String): Boolean = unsupported[Boolean]()
+  override def deleteTableColumnStatistics(arg0: String, arg1: String, arg2: String): Boolean = unsupported[Boolean]()
+  override def create_role(arg0: org.apache.hadoop.hive.metastore.api.Role): Boolean = unsupported[Boolean]()
+  override def drop_role(arg0: String): Boolean = unsupported[Boolean]()
+  override def listRoleNames(): java.util.List[String] = unsupported[java.util.List[String]]()
+  override def grant_role(arg0: String, arg1: String, arg2: org.apache.hadoop.hive.metastore.api.PrincipalType, arg3: String, arg4: org.apache.hadoop.hive.metastore.api.PrincipalType, arg5: Boolean): Boolean = unsupported[Boolean]()
+  override def revoke_role(arg0: String, arg1: String, arg2: org.apache.hadoop.hive.metastore.api.PrincipalType, arg3: Boolean): Boolean = unsupported[Boolean]()
+  override def list_roles(arg0: String, arg1: org.apache.hadoop.hive.metastore.api.PrincipalType): java.util.List[org.apache.hadoop.hive.metastore.api.Role] = unsupported[java.util.List[org.apache.hadoop.hive.metastore.api.Role]]()
+  override def get_privilege_set(arg0: org.apache.hadoop.hive.metastore.api.HiveObjectRef, arg1: String, arg2: java.util.List[String]): org.apache.hadoop.hive.metastore.api.PrincipalPrivilegeSet = unsupported[org.apache.hadoop.hive.metastore.api.PrincipalPrivilegeSet]()
+  override def list_privileges(arg0: String, arg1: org.apache.hadoop.hive.metastore.api.PrincipalType, arg2: org.apache.hadoop.hive.metastore.api.HiveObjectRef): java.util.List[org.apache.hadoop.hive.metastore.api.HiveObjectPrivilege] = unsupported[java.util.List[org.apache.hadoop.hive.metastore.api.HiveObjectPrivilege]]()
+  override def grant_privileges(arg0: org.apache.hadoop.hive.metastore.api.PrivilegeBag): Boolean = unsupported[Boolean]()
+  override def revoke_privileges(arg0: org.apache.hadoop.hive.metastore.api.PrivilegeBag, arg1: Boolean): Boolean = unsupported[Boolean]()
+  override def getDelegationToken(arg0: String, arg1: String): String = unsupported[String]()
+  override def renewDelegationToken(arg0: String): Long = unsupported[Long]()
+  override def cancelDelegationToken(arg0: String): Unit = unsupported[Unit]()
+  override def getTokenStrForm(): String = unsupported[String]()
+  override def addToken(arg0: String, arg1: String): Boolean = unsupported[Boolean]()
+  override def removeToken(arg0: String): Boolean = unsupported[Boolean]()
+  override def getToken(arg0: String): String = unsupported[String]()
+  override def getAllTokenIdentifiers(): java.util.List[String] = unsupported[java.util.List[String]]()
+  override def addMasterKey(arg0: String): Int = unsupported[Int]()
+  override def updateMasterKey(arg0: java.lang.Integer, arg1: String): Unit = unsupported[Unit]()
+  override def removeMasterKey(arg0: java.lang.Integer): Boolean = unsupported[Boolean]()
+  override def getMasterKeys(): Array[String] = unsupported[Array[String]]()
+  override def createFunction(arg0: org.apache.hadoop.hive.metastore.api.Function): Unit = unsupported[Unit]()
+  override def alterFunction(arg0: String, arg1: String, arg2: org.apache.hadoop.hive.metastore.api.Function): Unit = unsupported[Unit]()
+  override def dropFunction(arg0: String, arg1: String): Unit = unsupported[Unit]()
+  override def getFunction(arg0: String, arg1: String): org.apache.hadoop.hive.metastore.api.Function = unsupported[org.apache.hadoop.hive.metastore.api.Function]()
+  override def getFunctions(arg0: String, arg1: String): java.util.List[String] = unsupported[java.util.List[String]]()
+  override def getAllFunctions(): org.apache.hadoop.hive.metastore.api.GetAllFunctionsResponse = unsupported[org.apache.hadoop.hive.metastore.api.GetAllFunctionsResponse]()
+  override def getValidTxns(): org.apache.hadoop.hive.common.ValidTxnList = unsupported[org.apache.hadoop.hive.common.ValidTxnList]()
+  override def getValidTxns(arg0: Long): org.apache.hadoop.hive.common.ValidTxnList = unsupported[org.apache.hadoop.hive.common.ValidTxnList]()
+  override def openTxn(arg0: String): Long = unsupported[Long]()
+  override def openTxns(arg0: String, arg1: Int): org.apache.hadoop.hive.metastore.api.OpenTxnsResponse = unsupported[org.apache.hadoop.hive.metastore.api.OpenTxnsResponse]()
+  override def rollbackTxn(arg0: Long): Unit = unsupported[Unit]()
+  override def commitTxn(arg0: Long): Unit = unsupported[Unit]()
+  override def abortTxns(arg0: java.util.List[java.lang.Long]): Unit = unsupported[Unit]()
+  override def showTxns(): org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse = unsupported[org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse]()
+  override def lock(arg0: org.apache.hadoop.hive.metastore.api.LockRequest): org.apache.hadoop.hive.metastore.api.LockResponse = unsupported[org.apache.hadoop.hive.metastore.api.LockResponse]()
+  override def checkLock(arg0: Long): org.apache.hadoop.hive.metastore.api.LockResponse = unsupported[org.apache.hadoop.hive.metastore.api.LockResponse]()
+  override def unlock(arg0: Long): Unit = unsupported[Unit]()
+  override def showLocks(): org.apache.hadoop.hive.metastore.api.ShowLocksResponse = unsupported[org.apache.hadoop.hive.metastore.api.ShowLocksResponse]()
+  override def showLocks(arg0: org.apache.hadoop.hive.metastore.api.ShowLocksRequest): org.apache.hadoop.hive.metastore.api.ShowLocksResponse = unsupported[org.apache.hadoop.hive.metastore.api.ShowLocksResponse]()
+  override def heartbeat(arg0: Long, arg1: Long): Unit = unsupported[Unit]()
+  override def heartbeatTxnRange(arg0: Long, arg1: Long): org.apache.hadoop.hive.metastore.api.HeartbeatTxnRangeResponse = unsupported[org.apache.hadoop.hive.metastore.api.HeartbeatTxnRangeResponse]()
+  override def compact(arg0: String, arg1: String, arg2: String, arg3: org.apache.hadoop.hive.metastore.api.CompactionType): Unit = unsupported[Unit]()
+  override def compact(arg0: String, arg1: String, arg2: String, arg3: org.apache.hadoop.hive.metastore.api.CompactionType, arg4: java.util.Map[String, String]): Unit = unsupported[Unit]()
+  override def compact2(arg0: String, arg1: String, arg2: String, arg3: org.apache.hadoop.hive.metastore.api.CompactionType, arg4: java.util.Map[String, String]): org.apache.hadoop.hive.metastore.api.CompactionResponse = unsupported[org.apache.hadoop.hive.metastore.api.CompactionResponse]()
+  override def showCompactions(): org.apache.hadoop.hive.metastore.api.ShowCompactResponse = unsupported[org.apache.hadoop.hive.metastore.api.ShowCompactResponse]()
+  override def addDynamicPartitions(arg0: Long, arg1: String, arg2: String, arg3: java.util.List[String]): Unit = unsupported[Unit]()
+  override def addDynamicPartitions(arg0: Long, arg1: String, arg2: String, arg3: java.util.List[String], arg4: org.apache.hadoop.hive.metastore.api.DataOperationType): Unit = unsupported[Unit]()
+  override def insertTable(arg0: org.apache.hadoop.hive.metastore.api.Table, arg1: Boolean): Unit = unsupported[Unit]()
+  override def getNextNotification(arg0: Long, arg1: Int, arg2: org.apache.hadoop.hive.metastore.IMetaStoreClient.NotificationFilter): org.apache.hadoop.hive.metastore.api.NotificationEventResponse = unsupported[org.apache.hadoop.hive.metastore.api.NotificationEventResponse]()
+  override def getCurrentNotificationEventId(): org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId = unsupported[org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId]()
+  override def fireListenerEvent(arg0: org.apache.hadoop.hive.metastore.api.FireEventRequest): org.apache.hadoop.hive.metastore.api.FireEventResponse = unsupported[org.apache.hadoop.hive.metastore.api.FireEventResponse]()
+  override def get_principals_in_role(arg0: org.apache.hadoop.hive.metastore.api.GetPrincipalsInRoleRequest): org.apache.hadoop.hive.metastore.api.GetPrincipalsInRoleResponse = unsupported[org.apache.hadoop.hive.metastore.api.GetPrincipalsInRoleResponse]()
+  override def get_role_grants_for_principal(arg0: org.apache.hadoop.hive.metastore.api.GetRoleGrantsForPrincipalRequest): org.apache.hadoop.hive.metastore.api.GetRoleGrantsForPrincipalResponse = unsupported[org.apache.hadoop.hive.metastore.api.GetRoleGrantsForPrincipalResponse]()
+  override def getAggrColStatsFor(arg0: String, arg1: String, arg2: java.util.List[String], arg3: java.util.List[String]): org.apache.hadoop.hive.metastore.api.AggrStats = unsupported[org.apache.hadoop.hive.metastore.api.AggrStats]()
+  override def setPartitionColumnStatistics(arg0: org.apache.hadoop.hive.metastore.api.SetPartitionsStatsRequest): Boolean = unsupported[Boolean]()
+  override def flushCache(): Unit = unsupported[Unit]()
+  override def getFileMetadata(arg0: java.util.List[java.lang.Long]): java.lang.Iterable[java.util.Map.Entry[java.lang.Long, java.nio.ByteBuffer]] = unsupported[java.lang.Iterable[java.util.Map.Entry[java.lang.Long, java.nio.ByteBuffer]]]()
+  override def getFileMetadataBySarg(arg0: java.util.List[java.lang.Long], arg1: java.nio.ByteBuffer, arg2: Boolean): java.lang.Iterable[java.util.Map.Entry[java.lang.Long, org.apache.hadoop.hive.metastore.api.MetadataPpdResult]] = unsupported[java.lang.Iterable[java.util.Map.Entry[java.lang.Long, org.apache.hadoop.hive.metastore.api.MetadataPpdResult]]]()
+  override def clearFileMetadata(arg0: java.util.List[java.lang.Long]): Unit = unsupported[Unit]()
+  override def putFileMetadata(arg0: java.util.List[java.lang.Long], arg1: java.util.List[java.nio.ByteBuffer]): Unit = unsupported[Unit]()
+  override def isSameConfObj(arg0: org.apache.hadoop.hive.conf.HiveConf): Boolean = unsupported[Boolean]()
+  override def cacheFileMetadata(arg0: String, arg1: String, arg2: String, arg3: Boolean): Boolean = unsupported[Boolean]()
+  override def getPrimaryKeys(arg0: org.apache.hadoop.hive.metastore.api.PrimaryKeysRequest): java.util.List[org.apache.hadoop.hive.metastore.api.SQLPrimaryKey] = unsupported[java.util.List[org.apache.hadoop.hive.metastore.api.SQLPrimaryKey]]()
+  override def getForeignKeys(arg0: org.apache.hadoop.hive.metastore.api.ForeignKeysRequest): java.util.List[org.apache.hadoop.hive.metastore.api.SQLForeignKey] = unsupported[java.util.List[org.apache.hadoop.hive.metastore.api.SQLForeignKey]]()
+  override def createTableWithConstraints(arg0: org.apache.hadoop.hive.metastore.api.Table, arg1: java.util.List[org.apache.hadoop.hive.metastore.api.SQLPrimaryKey], arg2: java.util.List[org.apache.hadoop.hive.metastore.api.SQLForeignKey]): Unit = unsupported[Unit]()
+  override def dropConstraint(arg0: String, arg1: String, arg2: String): Unit = unsupported[Unit]()
+  override def addPrimaryKey(arg0: java.util.List[org.apache.hadoop.hive.metastore.api.SQLPrimaryKey]): Unit = unsupported[Unit]()
+  override def addForeignKey(arg0: java.util.List[org.apache.hadoop.hive.metastore.api.SQLForeignKey]): Unit = unsupported[Unit]()
+  // scalastyle:on
+
+  private def toCatalogTable(table: Table): CatalogTable = {
+    val db = table.getDbName
+    val tbl = table.getTableName
+    val cols = Option(table.getSd).map(_.getCols).map(_.asScala.toList).getOrElse(Nil)
+    val partCols = Option(table.getPartitionKeys).map(_.asScala.toList).getOrElse(Nil)
+
+    val dataFields = cols.map(fs => StructField(fs.getName, CatalystSqlParser.parseDataType(fs.getType), nullable = true, Metadata.empty))
+    val partitionFields = partCols.map(fs => StructField(fs.getName, CatalystSqlParser.parseDataType(fs.getType), nullable = true, Metadata.empty))
+
+    CatalogTable(
+      identifier = TableIdentifier(tbl, Some(db)),
+      tableType = if ("EXTERNAL_TABLE".equalsIgnoreCase(table.getTableType)) CatalogTableType.EXTERNAL else CatalogTableType.MANAGED,
+      storage = CatalogStorageFormat(
+        locationUri = Option(table.getSd).map(_.getLocation).map(new URI(_)),
+        inputFormat = Option(table.getSd).map(_.getInputFormat),
+        outputFormat = Option(table.getSd).map(_.getOutputFormat),
+        serde = Option(table.getSd).flatMap(sd => Option(sd.getSerdeInfo)).map(_.getSerializationLib),
+        compressed = false,
+        properties = Option(table.getSd).flatMap(sd => Option(sd.getSerdeInfo)).flatMap(si => Option(si.getParameters)).map(_.asScala.toMap).getOrElse(Map.empty)),
+      schema = StructType(dataFields ++ partitionFields),
+      provider = Some("hudi"),
+      partitionColumnNames = partCols.map(_.getName),
+      properties = Option(table.getParameters).map(_.asScala.toMap).getOrElse(Map.empty))
+  }
+
+  private def fromCatalogTable(table: CatalogTable): Table = {
+    val t = new Table()
+    t.setDbName(table.identifier.database.getOrElse("default"))
+    t.setTableName(table.identifier.table)
+    t.setTableType(if (table.tableType == CatalogTableType.EXTERNAL) "EXTERNAL_TABLE" else "MANAGED_TABLE")
+    t.setParameters(new util.HashMap[String, String](table.properties.asJava))
+
+    val nonPartitionFields = table.schema.fields.filterNot(f => table.partitionColumnNames.contains(f.name))
+    val cols = nonPartitionFields.map(f => new FieldSchema(f.name, f.dataType.catalogString, f.getComment().orNull)).toList.asJava
+    val partCols = table.partitionColumnNames.map { name =>
+      val dt = table.partitionSchema.fields.find(_.name == name).map(_.dataType.catalogString).getOrElse("string")
+      new FieldSchema(name, dt, "")
+    }.toList.asJava
+
+    val serdeInfo = new SerDeInfo(null, table.storage.serde.orNull, new util.HashMap[String, String](table.storage.properties.asJava))
+    val sd = new StorageDescriptor(cols, table.storage.locationUri.map(_.toString).orNull,
+      table.storage.inputFormat.orNull, table.storage.outputFormat.orNull, false, 0, serdeInfo, null, null, null)
+    t.setSd(sd)
+    t.setPartitionKeys(partCols)
+    t
+  }
+
+  private def fromCatalogPartition(part: CatalogTablePartition, db: String, table: String, partitionKeys: List[String]): Partition = {
+    val values = partitionKeys.map(k => part.spec.getOrElse(k, "")).asJava
+    val serdeInfo = new SerDeInfo()
+    val sd = new StorageDescriptor()
+    sd.setLocation(part.storage.locationUri.map(_.toString).orNull)
+    sd.setInputFormat(part.storage.inputFormat.orNull)
+    sd.setOutputFormat(part.storage.outputFormat.orNull)
+    sd.setSerdeInfo(serdeInfo)
+    new Partition(values, db, table, 0, 0, sd, new util.HashMap[String, String]())
+  }
+
+  private def toCatalogPartition(part: Partition): CatalogTablePartition = {
+    val table = getTable(part.getDbName, part.getTableName)
+    val keys = Option(table.getPartitionKeys).map(_.asScala.toList).getOrElse(Nil)
+    val values = Option(part.getValues).map(_.asScala.toList).getOrElse(Nil)
+    val spec = keys.zip(values).map { case (k, v) => (k.getName, v) }.toMap
+    CatalogTablePartition(
+      spec = spec,
+      storage = CatalogStorageFormat(
+        locationUri = Option(part.getSd).map(_.getLocation).map(new URI(_)),
+        inputFormat = Option(part.getSd).map(_.getInputFormat),
+        outputFormat = Option(part.getSd).map(_.getOutputFormat),
+        serde = Option(part.getSd).flatMap(sd => Option(sd.getSerdeInfo)).map(_.getSerializationLib),
+        compressed = false,
+        properties = Option(part.getSd).flatMap(sd => Option(sd.getSerdeInfo)).flatMap(si => Option(si.getParameters)).map(_.asScala.toMap).getOrElse(Map.empty)),
+      parameters = Option(part.getParameters).map(_.asScala.toMap).getOrElse(Map.empty))
+  }
+
+  private def parsePartitionClause(partName: String): Map[String, String] = {
+    partName.split("/").flatMap { token =>
+      token.split("=").toList match {
+        case k :: v :: Nil =>
+          Some(k.trim -> v.trim.stripPrefix("'").stripSuffix("'"))
+        case _ => None
+      }
+    }.toMap
+  }
+}
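
A note on the partition-name handling above: dropPartition parses a Hive-style partition
name into a Spark partition spec. The sketch below only restates the private
parsePartitionClause helper so the mapping can be seen standalone (the sample partition
name is made up):

    object PartitionNameSketch extends App {
      // Same logic as parsePartitionClause: split on '/', then on '=', and strip
      // surrounding single quotes from values; malformed tokens are dropped.
      def parse(partName: String): Map[String, String] =
        partName.split("/").flatMap { token =>
          token.split("=").toList match {
            case k :: v :: Nil => Some(k.trim -> v.trim.stripPrefix("'").stripSuffix("'"))
            case _ => None
          }
        }.toMap

      println(parse("dt=2024-01-01/hh='00'")) // Map(dt -> 2024-01-01, hh -> 00)
    }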
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hive/TestSparkCatalogMetaStoreClient.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hive/TestSparkCatalogMetaStoreClient.scala
new file mode 100644
index 000000000000..e6385ee93528
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hive/TestSparkCatalogMetaStoreClient.scala
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import org.apache.hudi.common.config.TypedProperties
+import org.apache.hudi.hive.HiveSyncConfig
+import org.apache.hudi.testutils.HoodieClientTestUtils.getSparkConfForTest
+
+import org.apache.hadoop.hive.metastore.api.{Database, EnvironmentContext, FieldSchema, Partition, SerDeInfo, StorageDescriptor, Table}
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.util.Utils
+import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertNotNull, assertTrue}
+import org.scalactic.source
+import org.scalatest.{BeforeAndAfterAll, FunSuite}
+
+import java.io.File
+import java.nio.file.Files
+import java.util
+import java.util.concurrent.atomic.AtomicInteger
+
+import scala.collection.JavaConverters._
+
+class TestSparkCatalogMetaStoreClient extends FunSuite with BeforeAndAfterAll {
+
+  private val warehouseDir = Files.createTempDirectory("spark-catalog-metastore-client").toFile
+  private val nameId = new AtomicInteger(0)
+
+  private lazy val spark: SparkSession = {
+    val sparkConf = getSparkConfForTest("TestSparkCatalogMetaStoreClient")
+      .remove("spark.sql.catalog.spark_catalog")
+
+    SparkSession.builder()
+      .config("spark.sql.warehouse.dir", warehouseDir.getCanonicalPath)
+      .config("spark.sql.session.timeZone", "UTC")
+      .config(sparkConf)
+      .enableHiveSupport()
+      .getOrCreate()
+  }
+
+  override protected def beforeAll(): Unit = {
+    super.beforeAll()
+    SparkSession.setActiveSession(spark)
+    SparkSession.setDefaultSession(spark)
+  }
+
+  override protected def afterAll(): Unit = {
+    SparkSession.clearActiveSession()
+    SparkSession.clearDefaultSession()
+    if (!spark.sparkContext.isStopped) {
+      spark.stop()
+    }
+    Utils.deleteRecursively(warehouseDir)
+    super.afterAll()
+  }
+
+  override protected def test(testName: String, testTags: org.scalatest.Tag*)(testFun: => Any)(implicit pos: source.Position): Unit = {
+    super.test(testName, testTags: _*)(
+      try {
+        testFun
+      } finally {
+        spark.sessionState.catalog.listDatabases().filter(_.startsWith("db_")).foreach { db =>
+          spark.sql(s"drop database if exists $db cascade")
+        }
+      }
+    )
+  }
+
+  test("exercise supported database and table APIs") {
+    withTempDir { tmp =>
+      val client = newClient()
+      val databaseName = generateName("db")
+      val tableName = generateName("tbl")
+
+      client.createDatabase(new Database(databaseName, "test database", new File(tmp, databaseName).toURI.toString, new util.HashMap[String, String]()))
+      assertEquals(databaseName, client.getDatabase(databaseName).getName)
+
+      val createdTable = newTable(
+        databaseName,
+        tableName,
+        new File(tmp, tableName).toURI.toString,
+        Seq("id" -> "int", "name" -> "string"),
+        Seq("dt" -> "string"),
+        Map("comment" -> "v1"))
+
+      client.createTable(createdTable)
+
+      assertTrue(client.tableExists(databaseName, tableName))
+      assertEquals(Seq("id", "name", "dt"), client.getSchema(databaseName, 
tableName).asScala.map(_.getName).toSeq)
+      assertEquals("v1", client.getTable(databaseName, 
tableName).getParameters.get("comment"))
+
+      val alteredTable = newTable(
+        databaseName,
+        tableName,
+        new File(tmp, s"${tableName}_v2").toURI.toString,
+        Seq("id" -> "int", "name" -> "string", "age" -> "int"),
+        Seq("dt" -> "string"),
+        Map("comment" -> "v2"))
+
+      client.alter_table(databaseName, tableName, alteredTable)
+      assertEquals(Seq("id", "name", "age", "dt"), 
client.getSchema(databaseName, tableName).asScala.map(_.getName).toSeq)
+      assertEquals("v2", client.getTable(databaseName, 
tableName).getParameters.get("comment"))
+
+      val environmentAlteredTable = newTable(
+        databaseName,
+        tableName,
+        new File(tmp, s"${tableName}_v3").toURI.toString,
+        Seq("id" -> "int", "name" -> "string", "age" -> "int"),
+        Seq("dt" -> "string"),
+        Map("comment" -> "env-context"))
+
+      client.alter_table_with_environmentContext(databaseName, tableName, environmentAlteredTable, new EnvironmentContext())
+      assertEquals("env-context", client.getTable(databaseName, tableName).getParameters.get("comment"))
+    }
+  }
+
+  test("exercise supported partition and drop APIs") {
+    withTempDir { tmp =>
+      val client = newClient()
+      val databaseName = generateName("db")
+      val tableName = generateName("tbl")
+
+      client.createDatabase(new Database(databaseName, "test database", new File(tmp, databaseName).toURI.toString, new util.HashMap[String, String]()))
+      client.createTable(newTable(
+        databaseName,
+        tableName,
+        new File(tmp, tableName).toURI.toString,
+        Seq("id" -> "int"),
+        Seq("dt" -> "string")))
+
+      val partitionOne = newPartition(databaseName, tableName, Seq("2024-01-01"), new File(tmp, s"$tableName/dt=2024-01-01").toURI.toString)
+      val partitionTwo = newPartition(databaseName, tableName, Seq("2024-01-02"), new File(tmp, s"$tableName/dt=2024-01-02").toURI.toString)
+
+      val added = client.add_partitions(util.Arrays.asList(partitionOne, partitionTwo), false, true)
+      assertEquals(2, added.size())
+
+      val listedPartitions = client.listPartitions(databaseName, tableName, (-1).toShort).asScala.toSeq
+      assertEquals(Set("2024-01-01", "2024-01-02"), listedPartitions.map(_.getValues.get(0)).toSet)
+      assertNotNull(listedPartitions.find(_.getValues.get(0) == "2024-01-02").orNull)
+
+      val listedByFilter = client.listPartitionsByFilter(databaseName, tableName, "dt='2024-01-02'", (-1).toShort).asScala.toSeq
+      assertEquals(Set("2024-01-01", "2024-01-02"), listedByFilter.map(_.getValues.get(0)).toSet)
+
+      val alteredPartition = newPartition(
+        databaseName,
+        tableName,
+        Seq("2024-01-02"),
+        new File(tmp, s"$tableName/dt=2024-01-02-updated").toURI.toString)
+      client.alter_partitions(databaseName, tableName, util.Collections.singletonList(alteredPartition), new EnvironmentContext())
+
+      val updatedLocation = client.listPartitions(databaseName, tableName, (-1).toShort).asScala
+        .find(_.getValues.get(0) == "2024-01-02")
+        .map(_.getSd.getLocation)
+        .orNull
+      assertTrue(updatedLocation.endsWith("dt=2024-01-02-updated"))
+
+      assertTrue(client.dropPartition(databaseName, tableName, "dt=2024-01-01", false))
+      val remainingPartitions = client.listPartitions(databaseName, tableName, (-1).toShort).asScala.toSeq
+      assertEquals(Seq("2024-01-02"), remainingPartitions.map(_.getValues.get(0)))
+
+      client.dropTable(databaseName, tableName)
+      assertFalse(client.tableExists(databaseName, tableName))
+    }
+  }
+
+  private def newClient(): SparkCatalogMetaStoreClient = {
+    SparkSession.setActiveSession(spark)
+    SparkSession.setDefaultSession(spark)
+    new SparkCatalogMetaStoreClient(new HiveSyncConfig(new TypedProperties()))
+  }
+
+  private def newTable(databaseName: String,
+                       tableName: String,
+                       location: String,
+                       columns: Seq[(String, String)],
+                       partitionColumns: Seq[(String, String)],
+                       parameters: Map[String, String] = Map.empty): Table = {
+    val table = new Table()
+    table.setDbName(databaseName)
+    table.setTableName(tableName)
+    table.setTableType("EXTERNAL_TABLE")
+    table.setParameters(new util.HashMap[String, String](parameters.asJava))
+    table.setPartitionKeys(partitionColumns.map { case (name, dataType) => fieldSchema(name, dataType) }.asJava)
+
+    val serdeInfo = new SerDeInfo()
+    serdeInfo.setParameters(new util.HashMap[String, String]())
+
+    val storageDescriptor = new StorageDescriptor()
+    storageDescriptor.setCols(columns.map { case (name, dataType) => fieldSchema(name, dataType) }.asJava)
+    storageDescriptor.setLocation(location)
+    storageDescriptor.setSerdeInfo(serdeInfo)
+    table.setSd(storageDescriptor)
+    table
+  }
+
+  private def newPartition(databaseName: String,
+                           tableName: String,
+                           values: Seq[String],
+                           location: String): Partition = {
+    val partition = new Partition()
+    partition.setDbName(databaseName)
+    partition.setTableName(tableName)
+    partition.setValues(values.asJava)
+    partition.setParameters(new util.HashMap[String, String]())
+
+    val serdeInfo = new SerDeInfo()
+    serdeInfo.setParameters(new util.HashMap[String, String]())
+
+    val storageDescriptor = new StorageDescriptor()
+    storageDescriptor.setLocation(location)
+    storageDescriptor.setSerdeInfo(serdeInfo)
+    partition.setSd(storageDescriptor)
+    partition
+  }
+
+  private def fieldSchema(name: String, dataType: String): FieldSchema = {
+    new FieldSchema(name, dataType, "")
+  }
+
+  private def withTempDir(f: File => Unit): Unit = {
+    val tempDir = Utils.createTempDir()
+    try {
+      f(tempDir)
+    } finally {
+      Utils.deleteRecursively(tempDir)
+    }
+  }
+
+  private def generateName(prefix: String): String = {
+    s"${prefix}_${nameId.incrementAndGet()}"
+  }
+}
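
One conversion detail exercised by these tests is the type round-trip: Hive's FieldSchema
carries column types as plain strings, which the client parses into Spark DataTypes (and
maps back via catalogString). A tiny sketch, assuming only Spark on the classpath:

    import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
    import org.apache.spark.sql.types.{Metadata, StructField}

    object TypeMappingSketch extends App {
      // Hive type string -> Spark DataType (used when building a CatalogTable);
      // DataType.catalogString goes the other way when rebuilding a FieldSchema.
      val field = StructField("ts", CatalystSqlParser.parseDataType("bigint"), nullable = true, Metadata.empty)
      println(field.dataType.catalogString) // prints "bigint"
    }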
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestSparkCatalogSync.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestSparkCatalogSync.scala
new file mode 100644
index 000000000000..8f644d263a39
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestSparkCatalogSync.scala
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.ddl
+
+import org.apache.hudi.DataSourceWriteOptions
+import org.apache.hudi.common.config.TypedProperties
+import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.hive.{HiveStylePartitionValueExtractor, HiveSyncConfigHolder, HiveSyncTool}
+import org.apache.hudi.hive.ddl.HiveSyncMode
+import org.apache.hudi.sync.common.HoodieSyncConfig.{META_SYNC_BASE_PATH, META_SYNC_DATABASE_NAME, META_SYNC_PARTITION_EXTRACTOR_CLASS, META_SYNC_PARTITION_FIELDS, META_SYNC_TABLE_NAME}
+
+import org.apache.hadoop.hive.conf.HiveConf
+import org.apache.spark.sql.{DataFrame, SaveMode}
+import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase
+import org.junit.jupiter.api.Assertions.{assertFalse, assertTrue}
+
+class TestSparkCatalogSync extends HoodieSparkSqlTestBase {
+
+  private val sparkCatalogSyncKey = HiveSyncConfigHolder.HIVE_SYNC_USE_SPARK_CATALOG.key()
+
+  test("Test Spark catalog sync with partition lifecycle") {
+    withTempDir { tmp =>
+      import spark.implicits._
+
+      val tableName = generateTableName
+      val databaseName = "testdb"
+      val basePath = s"${tmp.getCanonicalPath}/$tableName"
+
+      val syncProps = buildSyncProps(databaseName, tableName, basePath)
+      syncProps.setProperty(sparkCatalogSyncKey, "true")
+
+      try {
+        spark.sql(s"create database if not exists $databaseName")
+        writeToHudi(
+          Seq((1, "a1", 1000L, "2024-01-01"), (2, "a2", 1001L, "2024-01-02"))
+            .toDF("id", "name", "ts", "dt"),
+          tableName,
+          basePath,
+          SaveMode.Overwrite)
+
+        assertFalse(spark.catalog.tableExists(databaseName, tableName), "Table should not exist before sync")
+        syncOnce(syncProps)
+        assertTrue(spark.catalog.tableExists(databaseName, tableName), "Table should exist after sync")
+        assertTrue(spark.sql(s"show partitions $databaseName.$tableName").count() == 2, "Initial partitions should be registered")
+
+        writeToHudi(
+          Seq((3, "a3", 1002L, "2024-01-03")).toDF("id", "name", "ts", "dt"),
+          tableName,
+          basePath,
+          SaveMode.Append)
+        syncOnce(syncProps)
+        assertTrue(spark.sql(s"show partitions 
$databaseName.$tableName").count() == 3, "New partition should be registered 
after append")
+
+        spark.sql(s"alter table $databaseName.$tableName drop partition 
(dt='2024-01-03')")
+        val partitionRows = spark.sql(s"show partitions 
$databaseName.$tableName").collect().map(_.getString(0))
+        assertFalse(partitionRows.contains("dt=2024-01-03"), "Dropped 
partition should not exist in catalog")
+      } finally {
+        spark.sql(s"drop table if exists $databaseName.$tableName")
+      }
+    }
+  }
+
+  test("Test Spark catalog sync with schema evolution") {
+    withTempDir { tmp =>
+      import spark.implicits._
+
+      val tableName = generateTableName
+      val databaseName = "testdb"
+      val basePath = s"${tmp.getCanonicalPath}/$tableName"
+
+      val syncProps = buildSyncProps(databaseName, tableName, basePath)
+      syncProps.setProperty(sparkCatalogSyncKey, "true")
+
+      try {
+        spark.sql(s"create database if not exists $databaseName")
+        writeToHudi(
+          Seq((1, "a1", 1000L, "2024-01-01")).toDF("id", "name", "ts", "dt"),
+          tableName,
+          basePath,
+          SaveMode.Overwrite)
+
+        assertFalse(spark.catalog.tableExists(databaseName, tableName), "Table should not exist before sync")
+        syncOnce(syncProps)
+        assertFalse(hasColumn(databaseName, tableName, "age"), "Initial schema should not contain age")
+
+        writeToHudi(
+          Seq((2, "a2", 1001L, "2024-01-02", 21)).toDF("id", "name", "ts", 
"dt", "age"),
+          tableName,
+          basePath,
+          SaveMode.Append,
+          reconcileSchema = true)
+
+        syncOnce(syncProps)
+        assertTrue(hasColumn(databaseName, tableName, "age"), "Catalog schema should be updated with new column age")
+      } finally {
+        spark.sql(s"drop table if exists $databaseName.$tableName")
+      }
+    }
+  }
+
+  private def syncOnce(syncProps: TypedProperties): Unit = {
+    val syncTool = new HiveSyncTool(syncProps, new HiveConf())
+    try {
+      syncTool.syncHoodieTable()
+    } finally {
+      syncTool.close()
+    }
+  }
+
+  private def buildSyncProps(databaseName: String, tableName: String, basePath: String): TypedProperties = {
+    val syncProps = new TypedProperties()
+    syncProps.setProperty(META_SYNC_DATABASE_NAME.key, databaseName)
+    syncProps.setProperty(META_SYNC_TABLE_NAME.key, tableName)
+    syncProps.setProperty(META_SYNC_BASE_PATH.key, basePath)
+    syncProps.setProperty(META_SYNC_PARTITION_FIELDS.key, "dt")
+    syncProps.setProperty(META_SYNC_PARTITION_EXTRACTOR_CLASS.key, classOf[HiveStylePartitionValueExtractor].getName)
+    syncProps.setProperty(org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_MODE.key(), HiveSyncMode.HMS.name())
+    syncProps.setProperty(sparkCatalogSyncKey, "true")
+    syncProps
+  }
+
+  private def writeToHudi(df: DataFrame,
+                          tableName: String,
+                          basePath: String,
+                          mode: SaveMode,
+                          reconcileSchema: Boolean = false): Unit = {
+    val writer = df.write
+      .format("hudi")
+      .option(HoodieWriteConfig.TBL_NAME.key, tableName)
+      .option(DataSourceWriteOptions.RECORDKEY_FIELD.key, "id")
+      .option(DataSourceWriteOptions.PRECOMBINE_FIELD.key, "ts")
+      .option(DataSourceWriteOptions.PARTITIONPATH_FIELD.key, "dt")
+      .option(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING.key, "true")
+      .option(DataSourceWriteOptions.TABLE_TYPE.key, "COPY_ON_WRITE")
+
+    val finalWriter = if (reconcileSchema) {
+      writer.option("hoodie.datasource.write.reconcile.schema", "true")
+    } else {
+      writer
+    }
+    finalWriter.mode(mode).save(basePath)
+  }
+
+  private def hasColumn(databaseName: String, tableName: String, columnName: String): Boolean = {
+    spark.catalog.listColumns(databaseName, tableName).collect().exists(_.name.equalsIgnoreCase(columnName))
+  }
+}
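
The test above drives HiveSyncTool directly; the same path can also be enabled from a
regular datasource write. A hedged sketch (the hoodie.datasource.hive_sync.* option keys
other than use_spark_catalog are the pre-existing ones; use_spark_catalog is assumed to be
spelled as in HiveSyncConfigHolder below, and the table, database, and path are placeholders):

    import org.apache.spark.sql.SparkSession

    object WriterSyncSketch extends App {
      val spark = SparkSession.builder().enableHiveSupport().getOrCreate()
      import spark.implicits._

      Seq((1, "a1", 1000L, "2024-01-01")).toDF("id", "name", "ts", "dt")
        .write.format("hudi")
        .option("hoodie.table.name", "my_table")
        .option("hoodie.datasource.write.recordkey.field", "id")
        .option("hoodie.datasource.write.precombine.field", "ts")
        .option("hoodie.datasource.write.partitionpath.field", "dt")
        .option("hoodie.datasource.hive_sync.enable", "true")
        .option("hoodie.datasource.hive_sync.mode", "hms")
        .option("hoodie.datasource.hive_sync.database", "testdb")
        // Flag introduced by this commit:
        .option("hoodie.datasource.hive_sync.use_spark_catalog", "true")
        .mode("append")
        .save("/tmp/hudi/my_table")
    }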
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java
index adcc27cc2357..af1d5e783976 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java
@@ -68,6 +68,7 @@ public class HiveSyncConfig extends HoodieSyncConfig {
   public static final ConfigProperty<Boolean> HIVE_SYNC_OMIT_METADATA_FIELDS = HiveSyncConfigHolder.HIVE_SYNC_OMIT_METADATA_FIELDS;
   public static final ConfigProperty<Integer> HIVE_BATCH_SYNC_PARTITION_NUM = HiveSyncConfigHolder.HIVE_BATCH_SYNC_PARTITION_NUM;
   public static final ConfigProperty<String> HIVE_SYNC_MODE = HiveSyncConfigHolder.HIVE_SYNC_MODE;
+  public static final ConfigProperty<Boolean> HIVE_SYNC_USE_SPARK_CATALOG = HiveSyncConfigHolder.HIVE_SYNC_USE_SPARK_CATALOG;
   public static final ConfigProperty<Boolean> HIVE_SYNC_BUCKET_SYNC = HiveSyncConfigHolder.HIVE_SYNC_BUCKET_SYNC;
   public static final ConfigProperty<String> HIVE_SYNC_BUCKET_SYNC_SPEC = HiveSyncConfigHolder.HIVE_SYNC_BUCKET_SYNC_SPEC;
   public static final ConfigProperty<String> HIVE_SYNC_COMMENT = HiveSyncConfigHolder.HIVE_SYNC_COMMENT;
@@ -145,6 +146,8 @@ public class HiveSyncConfig extends HoodieSyncConfig {
     public String metastoreUris;
     @Parameter(names = {"--sync-mode"}, description = "Mode to choose for Hive 
ops. Valid values are hms,glue,jdbc and hiveql")
     public String syncMode;
+    @Parameter(names = {"--use-spark-catalog"}, description = "Whether to use 
Spark catalog backed metastore client.")
+    public Boolean useSparkCatalog;
     @Parameter(names = {"--auto-create-database"}, description = "Auto create 
hive database")
     public Boolean autoCreateDatabase;
     @Parameter(names = {"--ignore-exceptions"}, description = "Ignore hive 
exceptions")
@@ -189,6 +192,7 @@ public class HiveSyncConfig extends HoodieSyncConfig {
       props.setPropertyIfNonNull(HIVE_URL.key(), jdbcUrl);
       props.setPropertyIfNonNull(HIVE_USE_JDBC.key(), useJdbc);
       props.setPropertyIfNonNull(HIVE_SYNC_MODE.key(), syncMode);
+      props.setPropertyIfNonNull(HIVE_SYNC_USE_SPARK_CATALOG.key(), useSparkCatalog);
       props.setPropertyIfNonNull(METASTORE_URIS.key(), metastoreUris);
       props.setPropertyIfNonNull(HIVE_AUTO_CREATE_DATABASE.key(), autoCreateDatabase);
       props.setPropertyIfNonNull(HIVE_IGNORE_EXCEPTIONS.key(), ignoreExceptions);
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfigHolder.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfigHolder.java
index 1eb95cf5529e..418676d59162 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfigHolder.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfigHolder.java
@@ -150,4 +150,10 @@ public class HiveSyncConfigHolder {
       .markAdvanced()
       .sinceVersion("0.13.0")
       .withDocumentation("Hive table synchronization strategy. Available 
option: RO, RT, ALL.");
+
+  public static final ConfigProperty<Boolean> HIVE_SYNC_USE_SPARK_CATALOG = ConfigProperty
+      .key("hoodie.datasource.hive_sync.use_spark_catalog")
+      .defaultValue(false)
+      .markAdvanced()
+      .withDocumentation("Use Spark catalog backed IMetaStoreClient implementation for hive sync.");
 }
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java
index 4da60290d597..892fe9350abe 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java
@@ -28,6 +28,7 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.ConfigUtils;
 import org.apache.hudi.common.util.MapUtils;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.VisibleForTesting;
 import org.apache.hudi.common.util.collection.Pair;
@@ -66,6 +67,7 @@ import static org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.getInputFormat
 import static org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.getOutputFormatClassName;
 import static org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.getSerDeClassName;
 import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_MODE;
+import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_USE_SPARK_CATALOG;
 import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_USE_JDBC;
 import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_BASE_FILE_FORMAT;
 import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_BASE_PATH;
@@ -113,7 +115,7 @@ public class HoodieHiveSyncClient extends HoodieSyncClient {
    // Support JDBC, HiveQL and metastore based implementations for backwards compatibility. Future users should
     // disable jdbc and depend on metastore client for all hive registrations
     try {
-      this.client = IMetaStoreClientUtil.getMSC(config.getHiveConf());
+      this.client = createMetaStoreClient(config);
       setMetaConf(config.getHiveConf());
       if (!StringUtils.isNullOrEmpty(config.getString(HIVE_SYNC_MODE))) {
        HiveSyncMode syncMode = HiveSyncMode.of(config.getString(HIVE_SYNC_MODE));
@@ -185,6 +187,20 @@ public class HoodieHiveSyncClient extends HoodieSyncClient {
     return false;
   }
 
+  private IMetaStoreClient createMetaStoreClient(HiveSyncConfig config) {
+    try {
+      if (config.getBooleanOrDefault(HIVE_SYNC_USE_SPARK_CATALOG)) {
+        return (IMetaStoreClient) ReflectionUtils.loadClass(
+            "org.apache.spark.sql.hive.SparkCatalogMetaStoreClient",
+            new Class<?>[] {HiveSyncConfig.class},
+            config);
+      }
+      return IMetaStoreClientUtil.getMSC(config.getHiveConf());
+    } catch (Exception e) {
+      throw new HoodieHiveSyncException("Failed to create 
HiveMetaStoreClient", e);
+    }
+  }
+
   private Table getInitialTable(String table) {
     return initialTableByName.computeIfAbsent(table, t -> {
       try {
