This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 331b018d0cfc feat(hive-sync): add Spark-catalog based metastore client implementation to avoid Hive-on-Spark classloader issues (#18203)
331b018d0cfc is described below
commit 331b018d0cfc06d1888d3c1de8498cb677441e90
Author: Surya Prasanna <[email protected]>
AuthorDate: Sun Mar 22 20:40:47 2026 -0700
feat(hive-sync): add Spark-catalog based metastore client implementation to avoid Hive-on-Spark classloader issues (#18203)

* Create SparkCatalogMetaStoreClient to support hive-sync in Hive-on-Spark environments
* Add unit test for the SparkCatalogMetaStoreClient class
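
Usage sketch (not part of this patch; a spark-shell style example mirroring the
added TestSparkCatalogSync, assuming an active Spark session with Hive support
and an existing partitioned Hudi table; the database/table/path names below are
placeholders):

    import org.apache.hadoop.hive.conf.HiveConf
    import org.apache.hudi.common.config.TypedProperties
    import org.apache.hudi.hive.{HiveStylePartitionValueExtractor, HiveSyncConfigHolder, HiveSyncTool}
    import org.apache.hudi.hive.ddl.HiveSyncMode
    import org.apache.hudi.sync.common.HoodieSyncConfig._

    val basePath = "file:///tmp/hudi/my_table" // placeholder: base path of an existing Hudi table
    val props = new TypedProperties()
    props.setProperty(META_SYNC_DATABASE_NAME.key, "testdb")  // placeholder database
    props.setProperty(META_SYNC_TABLE_NAME.key, "my_table")   // placeholder table
    props.setProperty(META_SYNC_BASE_PATH.key, basePath)
    props.setProperty(META_SYNC_PARTITION_FIELDS.key, "dt")
    props.setProperty(META_SYNC_PARTITION_EXTRACTOR_CLASS.key, classOf[HiveStylePartitionValueExtractor].getName)
    props.setProperty(HiveSyncConfigHolder.HIVE_SYNC_MODE.key(), HiveSyncMode.HMS.name())
    // New flag from this patch: back the HMS sync path with the Spark catalog.
    props.setProperty(HiveSyncConfigHolder.HIVE_SYNC_USE_SPARK_CATALOG.key(), "true")

    val syncTool = new HiveSyncTool(props, new HiveConf())
    try syncTool.syncHoodieTable() finally syncTool.close()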
---
.../sql/hive/SparkCatalogMetaStoreClient.scala | 380 +++++++++++++++++++++
.../sql/hive/TestSparkCatalogMetaStoreClient.scala | 246 +++++++++++++
.../spark/sql/hudi/ddl/TestSparkCatalogSync.scala | 162 +++++++++
.../java/org/apache/hudi/hive/HiveSyncConfig.java | 4 +
.../org/apache/hudi/hive/HiveSyncConfigHolder.java | 6 +
.../org/apache/hudi/hive/HoodieHiveSyncClient.java | 18 +-
6 files changed, 815 insertions(+), 1 deletion(-)
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hive/SparkCatalogMetaStoreClient.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hive/SparkCatalogMetaStoreClient.scala
new file mode 100644
index 000000000000..d238341bed46
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hive/SparkCatalogMetaStoreClient.scala
@@ -0,0 +1,380 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import org.apache.hudi.hive.HiveSyncConfig
+
+import org.apache.hadoop.hive.metastore.IMetaStoreClient
+import org.apache.hadoop.hive.metastore.api.{Database, EnvironmentContext, FieldSchema, Partition, SerDeInfo, StorageDescriptor, Table}
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogStorageFormat, CatalogTable, CatalogTablePartition, CatalogTableType}
+import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
+import org.apache.spark.sql.types.{Metadata, StructField, StructType}
+
+import java.net.URI
+import java.util
+
+import scala.collection.JavaConverters._
+
+/**
+ * IMetaStoreClient implementation backed by Spark catalog/external-catalog APIs for
+ * methods used by HoodieHiveSyncClient/HMSDDLExecutor.
+ */
+class SparkCatalogMetaStoreClient(syncConfig: HiveSyncConfig)
+ extends IMetaStoreClient {
+
+  private val sparkSession = SparkSession.getActiveSession.getOrElse(SparkSession.builder()
+ .enableHiveSupport()
+ .getOrCreate())
+
+  private val externalCatalog = sparkSession.sessionState.catalog.externalCatalog
+
+ override def createDatabase(database: Database): Unit = {
+ val catalogDb = CatalogDatabase(
+ name = database.getName,
+ description = Option(database.getDescription).getOrElse(""),
+ locationUri = Option(database.getLocationUri).map(new URI(_))
+ .getOrElse(new URI(sparkSession.sessionState.conf.warehousePath)),
+      properties = Option(database.getParameters).map(_.asScala.toMap).getOrElse(Map.empty))
+ externalCatalog.createDatabase(catalogDb, ignoreIfExists = false)
+ }
+
+ override def createTable(table: Table): Unit = {
+ externalCatalog.createTable(toCatalogTable(table), ignoreIfExists = false)
+ }
+
+ override def getTable(dbName: String, tableName: String): Table = {
+ fromCatalogTable(externalCatalog.getTable(dbName, tableName))
+ }
+
+ // scalastyle:off method.name
+  override def alter_table(dbName: String, tableName: String, table: Table): Unit = {
+    val updated = toCatalogTable(table).copy(identifier = TableIdentifier(tableName, Some(dbName)))
+ externalCatalog.alterTable(updated)
+ }
+
+ override def alter_table_with_environmentContext(dbName: String,
+ tableName: String,
+ table: Table,
+                                                   environmentContext: EnvironmentContext): Unit = {
+ alter_table(dbName, tableName, table)
+ }
+
+  override def listPartitions(dbName: String, tableName: String, max: Short): util.List[Partition] = {
+ val table = getTable(dbName, tableName)
+ val partitionKeys = table.getPartitionKeys.asScala.map(_.getName).toList
+    externalCatalog.listPartitions(dbName, tableName, None).map(fromCatalogPartition(_, dbName, tableName, partitionKeys)).asJava
+ }
+
+ override def listPartitionsByFilter(dbName: String,
+ tableName: String,
+ filter: String,
+ max: Short): util.List[Partition] = {
+    // Spark external catalog does not expose Hive filter-string API; fall back to listing all.
+ listPartitions(dbName, tableName, max)
+ }
+
+  override def add_partitions(parts: util.List[Partition], ifNotExists: Boolean, needResults: Boolean): util.List[Partition] = {
+ if (parts == null || parts.isEmpty) {
+ new util.ArrayList[Partition]()
+ } else {
+ val first = parts.get(0)
+ val db = first.getDbName
+ val tbl = first.getTableName
+ val catalogParts = parts.asScala.map(toCatalogPartition).toSeq
+      externalCatalog.createPartitions(db, tbl, catalogParts, ignoreIfExists = ifNotExists)
+ if (needResults) parts else new util.ArrayList[Partition]()
+ }
+ }
+
+ override def alter_partitions(dbName: String,
+ tableName: String,
+ newParts: util.List[Partition],
+                                environmentContext: EnvironmentContext): Unit = {
+    externalCatalog.alterPartitions(dbName, tableName, newParts.asScala.map(toCatalogPartition).toSeq)
+ }
+
+  override def dropPartition(dbName: String, tableName: String, partName: String, deleteData: Boolean): Boolean = {
+ val spec = parsePartitionClause(partName)
+    externalCatalog.dropPartitions(dbName, tableName, Seq(spec), ignoreIfNotExists = true, purge = true, retainData = !deleteData)
+ true
+ }
+ // scalastyle:on method.name
+
+ override def tableExists(dbName: String, tableName: String): Boolean = {
+ sparkSession.catalog.tableExists(dbName, tableName)
+ }
+
+ override def getDatabase(dbName: String): Database = {
+ val db = externalCatalog.getDatabase(dbName)
+    new Database(db.name, db.description, db.locationUri.toString, db.properties.asJava)
+ }
+
+  override def getSchema(dbName: String, tableName: String): util.List[FieldSchema] = {
+ val table = externalCatalog.getTable(dbName, tableName)
+ val cols = table.schema.fields.map { f =>
+      new FieldSchema(f.name, f.dataType.catalogString, f.getComment().getOrElse(""))
+ }
+ val partitionCols = table.partitionColumnNames.map { name =>
+      val dt = table.partitionSchema.fields.find(_.name == name).map(_.dataType.catalogString).getOrElse("string")
+ new FieldSchema(name, dt, "")
+ }
+ (cols ++ partitionCols).toList.asJava
+ }
+
+ override def dropTable(dbName: String, tableName: String): Unit = {
+    externalCatalog.dropTable(dbName, tableName, ignoreIfNotExists = true, purge = true)
+ }
+
+ // scalastyle:off
+ private def unsupported[T](): T = {
+    throw new UnsupportedOperationException("Method is not supported in SparkCatalogMetaStoreClient")
+ }
+
+  override def isCompatibleWith(arg0: org.apache.hadoop.hive.conf.HiveConf): Boolean = unsupported[Boolean]()
+  override def setHiveAddedJars(arg0: String): Unit = unsupported[Unit]()
+  override def isLocalMetaStore(): Boolean = unsupported[Boolean]()
+  override def reconnect(): Unit = unsupported[Unit]()
+  override def close(): Unit = unsupported[Unit]()
+  override def setMetaConf(arg0: String, arg1: String): Unit = unsupported[Unit]()
+  override def getMetaConf(arg0: String): String = unsupported[String]()
+  override def getDatabases(arg0: String): java.util.List[String] = unsupported[java.util.List[String]]()
+  override def getAllDatabases(): java.util.List[String] = unsupported[java.util.List[String]]()
+  override def getTables(arg0: String, arg1: String): java.util.List[String] = unsupported[java.util.List[String]]()
+  override def getTables(arg0: String, arg1: String, arg2: org.apache.hadoop.hive.metastore.TableType): java.util.List[String] = unsupported[java.util.List[String]]()
+  override def getTableMeta(arg0: String, arg1: String, arg2: java.util.List[String]): java.util.List[org.apache.hadoop.hive.metastore.api.TableMeta] = unsupported[java.util.List[org.apache.hadoop.hive.metastore.api.TableMeta]]()
+  override def getAllTables(arg0: String): java.util.List[String] = unsupported[java.util.List[String]]()
+  override def listTableNamesByFilter(arg0: String, arg1: String, arg2: Short): java.util.List[String] = unsupported[java.util.List[String]]()
+  override def dropTable(arg0: String, arg1: String, arg2: Boolean, arg3: Boolean): Unit = unsupported[Unit]()
+  override def dropTable(arg0: String, arg1: String, arg2: Boolean, arg3: Boolean, arg4: Boolean): Unit = unsupported[Unit]()
+  override def dropTable(arg0: String, arg1: Boolean): Unit = unsupported[Unit]()
+  override def tableExists(arg0: String): Boolean = unsupported[Boolean]()
+  override def getTable(arg0: String): org.apache.hadoop.hive.metastore.api.Table = unsupported[org.apache.hadoop.hive.metastore.api.Table]()
+  override def getTableObjectsByName(arg0: String, arg1: java.util.List[String]): java.util.List[org.apache.hadoop.hive.metastore.api.Table] = unsupported[java.util.List[org.apache.hadoop.hive.metastore.api.Table]]()
+  override def appendPartition(arg0: String, arg1: String, arg2: java.util.List[String]): org.apache.hadoop.hive.metastore.api.Partition = unsupported[org.apache.hadoop.hive.metastore.api.Partition]()
+  override def appendPartition(arg0: String, arg1: String, arg2: String): org.apache.hadoop.hive.metastore.api.Partition = unsupported[org.apache.hadoop.hive.metastore.api.Partition]()
+  override def add_partition(arg0: org.apache.hadoop.hive.metastore.api.Partition): org.apache.hadoop.hive.metastore.api.Partition = unsupported[org.apache.hadoop.hive.metastore.api.Partition]()
+  override def add_partitions(arg0: java.util.List[org.apache.hadoop.hive.metastore.api.Partition]): Int = unsupported[Int]()
+  override def add_partitions_pspec(arg0: org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy): Int = unsupported[Int]()
+  override def getPartition(arg0: String, arg1: String, arg2: java.util.List[String]): org.apache.hadoop.hive.metastore.api.Partition = unsupported[org.apache.hadoop.hive.metastore.api.Partition]()
+  override def exchange_partition(arg0: java.util.Map[String, String], arg1: String, arg2: String, arg3: String, arg4: String): org.apache.hadoop.hive.metastore.api.Partition = unsupported[org.apache.hadoop.hive.metastore.api.Partition]()
+  override def exchange_partitions(arg0: java.util.Map[String, String], arg1: String, arg2: String, arg3: String, arg4: String): java.util.List[org.apache.hadoop.hive.metastore.api.Partition] = unsupported[java.util.List[org.apache.hadoop.hive.metastore.api.Partition]]()
+  override def getPartition(arg0: String, arg1: String, arg2: String): org.apache.hadoop.hive.metastore.api.Partition = unsupported[org.apache.hadoop.hive.metastore.api.Partition]()
+  override def getPartitionWithAuthInfo(arg0: String, arg1: String, arg2: java.util.List[String], arg3: String, arg4: java.util.List[String]): org.apache.hadoop.hive.metastore.api.Partition = unsupported[org.apache.hadoop.hive.metastore.api.Partition]()
+  override def listPartitionSpecs(arg0: String, arg1: String, arg2: Int): org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy = unsupported[org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy]()
+  override def listPartitions(arg0: String, arg1: String, arg2: java.util.List[String], arg3: Short): java.util.List[org.apache.hadoop.hive.metastore.api.Partition] = unsupported[java.util.List[org.apache.hadoop.hive.metastore.api.Partition]]()
+  override def listPartitionNames(arg0: String, arg1: String, arg2: Short): java.util.List[String] = unsupported[java.util.List[String]]()
+  override def listPartitionNames(arg0: String, arg1: String, arg2: java.util.List[String], arg3: Short): java.util.List[String] = unsupported[java.util.List[String]]()
+  override def listPartitionValues(arg0: org.apache.hadoop.hive.metastore.api.PartitionValuesRequest): org.apache.hadoop.hive.metastore.api.PartitionValuesResponse = unsupported[org.apache.hadoop.hive.metastore.api.PartitionValuesResponse]()
+  override def getNumPartitionsByFilter(arg0: String, arg1: String, arg2: String): Int = unsupported[Int]()
+  override def listPartitionSpecsByFilter(arg0: String, arg1: String, arg2: String, arg3: Int): org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy = unsupported[org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy]()
+  override def listPartitionsByExpr(arg0: String, arg1: String, arg2: Array[Byte], arg3: String, arg4: Short, arg5: java.util.List[org.apache.hadoop.hive.metastore.api.Partition]): Boolean = unsupported[Boolean]()
+  override def listPartitionsWithAuthInfo(arg0: String, arg1: String, arg2: Short, arg3: String, arg4: java.util.List[String]): java.util.List[org.apache.hadoop.hive.metastore.api.Partition] = unsupported[java.util.List[org.apache.hadoop.hive.metastore.api.Partition]]()
+  override def getPartitionsByNames(arg0: String, arg1: String, arg2: java.util.List[String]): java.util.List[org.apache.hadoop.hive.metastore.api.Partition] = unsupported[java.util.List[org.apache.hadoop.hive.metastore.api.Partition]]()
+  override def listPartitionsWithAuthInfo(arg0: String, arg1: String, arg2: java.util.List[String], arg3: Short, arg4: String, arg5: java.util.List[String]): java.util.List[org.apache.hadoop.hive.metastore.api.Partition] = unsupported[java.util.List[org.apache.hadoop.hive.metastore.api.Partition]]()
+  override def markPartitionForEvent(arg0: String, arg1: String, arg2: java.util.Map[String, String], arg3: org.apache.hadoop.hive.metastore.api.PartitionEventType): Unit = unsupported[Unit]()
+  override def isPartitionMarkedForEvent(arg0: String, arg1: String, arg2: java.util.Map[String, String], arg3: org.apache.hadoop.hive.metastore.api.PartitionEventType): Boolean = unsupported[Boolean]()
+  override def validatePartitionNameCharacters(arg0: java.util.List[String]): Unit = unsupported[Unit]()
+  override def alter_table(arg0: String, arg1: String, arg2: org.apache.hadoop.hive.metastore.api.Table, arg3: Boolean): Unit = unsupported[Unit]()
+  override def dropDatabase(arg0: String): Unit = unsupported[Unit]()
+  override def dropDatabase(arg0: String, arg1: Boolean, arg2: Boolean): Unit = unsupported[Unit]()
+  override def dropDatabase(arg0: String, arg1: Boolean, arg2: Boolean, arg3: Boolean): Unit = unsupported[Unit]()
+  override def alterDatabase(arg0: String, arg1: org.apache.hadoop.hive.metastore.api.Database): Unit = unsupported[Unit]()
+  override def dropPartition(arg0: String, arg1: String, arg2: java.util.List[String], arg3: Boolean): Boolean = unsupported[Boolean]()
+  override def dropPartition(arg0: String, arg1: String, arg2: java.util.List[String], arg3: org.apache.hadoop.hive.metastore.PartitionDropOptions): Boolean = unsupported[Boolean]()
+  override def dropPartitions(arg0: String, arg1: String, arg2: java.util.List[org.apache.hadoop.hive.common.ObjectPair[java.lang.Integer, Array[Byte]]], arg3: Boolean, arg4: Boolean): java.util.List[org.apache.hadoop.hive.metastore.api.Partition] = unsupported[java.util.List[org.apache.hadoop.hive.metastore.api.Partition]]()
+  override def dropPartitions(arg0: String, arg1: String, arg2: java.util.List[org.apache.hadoop.hive.common.ObjectPair[java.lang.Integer, Array[Byte]]], arg3: Boolean, arg4: Boolean, arg5: Boolean): java.util.List[org.apache.hadoop.hive.metastore.api.Partition] = unsupported[java.util.List[org.apache.hadoop.hive.metastore.api.Partition]]()
+  override def dropPartitions(arg0: String, arg1: String, arg2: java.util.List[org.apache.hadoop.hive.common.ObjectPair[java.lang.Integer, Array[Byte]]], arg3: org.apache.hadoop.hive.metastore.PartitionDropOptions): java.util.List[org.apache.hadoop.hive.metastore.api.Partition] = unsupported[java.util.List[org.apache.hadoop.hive.metastore.api.Partition]]()
+  override def alter_partition(arg0: String, arg1: String, arg2: org.apache.hadoop.hive.metastore.api.Partition): Unit = unsupported[Unit]()
+  override def alter_partition(arg0: String, arg1: String, arg2: org.apache.hadoop.hive.metastore.api.Partition, arg3: org.apache.hadoop.hive.metastore.api.EnvironmentContext): Unit = unsupported[Unit]()
+  override def alter_partitions(arg0: String, arg1: String, arg2: java.util.List[org.apache.hadoop.hive.metastore.api.Partition]): Unit = unsupported[Unit]()
+  override def renamePartition(arg0: String, arg1: String, arg2: java.util.List[String], arg3: org.apache.hadoop.hive.metastore.api.Partition): Unit = unsupported[Unit]()
+  override def getFields(arg0: String, arg1: String): java.util.List[org.apache.hadoop.hive.metastore.api.FieldSchema] = unsupported[java.util.List[org.apache.hadoop.hive.metastore.api.FieldSchema]]()
+  override def getConfigValue(arg0: String, arg1: String): String = unsupported[String]()
+  override def partitionNameToVals(arg0: String): java.util.List[String] = unsupported[java.util.List[String]]()
+  override def partitionNameToSpec(arg0: String): java.util.Map[String, String] = unsupported[java.util.Map[String, String]]()
+  override def createIndex(arg0: org.apache.hadoop.hive.metastore.api.Index, arg1: org.apache.hadoop.hive.metastore.api.Table): Unit = unsupported[Unit]()
+  override def alter_index(arg0: String, arg1: String, arg2: String, arg3: org.apache.hadoop.hive.metastore.api.Index): Unit = unsupported[Unit]()
+  override def getIndex(arg0: String, arg1: String, arg2: String): org.apache.hadoop.hive.metastore.api.Index = unsupported[org.apache.hadoop.hive.metastore.api.Index]()
+  override def listIndexes(arg0: String, arg1: String, arg2: Short): java.util.List[org.apache.hadoop.hive.metastore.api.Index] = unsupported[java.util.List[org.apache.hadoop.hive.metastore.api.Index]]()
+  override def listIndexNames(arg0: String, arg1: String, arg2: Short): java.util.List[String] = unsupported[java.util.List[String]]()
+  override def dropIndex(arg0: String, arg1: String, arg2: String, arg3: Boolean): Boolean = unsupported[Boolean]()
+  override def updateTableColumnStatistics(arg0: org.apache.hadoop.hive.metastore.api.ColumnStatistics): Boolean = unsupported[Boolean]()
+  override def updatePartitionColumnStatistics(arg0: org.apache.hadoop.hive.metastore.api.ColumnStatistics): Boolean = unsupported[Boolean]()
+  override def getTableColumnStatistics(arg0: String, arg1: String, arg2: java.util.List[String]): java.util.List[org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj] = unsupported[java.util.List[org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj]]()
+  override def getPartitionColumnStatistics(arg0: String, arg1: String, arg2: java.util.List[String], arg3: java.util.List[String]): java.util.Map[String, java.util.List[org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj]] = unsupported[java.util.Map[String, java.util.List[org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj]]]()
+  override def deletePartitionColumnStatistics(arg0: String, arg1: String, arg2: String, arg3: String): Boolean = unsupported[Boolean]()
+  override def deleteTableColumnStatistics(arg0: String, arg1: String, arg2: String): Boolean = unsupported[Boolean]()
+  override def create_role(arg0: org.apache.hadoop.hive.metastore.api.Role): Boolean = unsupported[Boolean]()
+  override def drop_role(arg0: String): Boolean = unsupported[Boolean]()
+  override def listRoleNames(): java.util.List[String] = unsupported[java.util.List[String]]()
+  override def grant_role(arg0: String, arg1: String, arg2: org.apache.hadoop.hive.metastore.api.PrincipalType, arg3: String, arg4: org.apache.hadoop.hive.metastore.api.PrincipalType, arg5: Boolean): Boolean = unsupported[Boolean]()
+  override def revoke_role(arg0: String, arg1: String, arg2: org.apache.hadoop.hive.metastore.api.PrincipalType, arg3: Boolean): Boolean = unsupported[Boolean]()
+  override def list_roles(arg0: String, arg1: org.apache.hadoop.hive.metastore.api.PrincipalType): java.util.List[org.apache.hadoop.hive.metastore.api.Role] = unsupported[java.util.List[org.apache.hadoop.hive.metastore.api.Role]]()
+  override def get_privilege_set(arg0: org.apache.hadoop.hive.metastore.api.HiveObjectRef, arg1: String, arg2: java.util.List[String]): org.apache.hadoop.hive.metastore.api.PrincipalPrivilegeSet = unsupported[org.apache.hadoop.hive.metastore.api.PrincipalPrivilegeSet]()
+  override def list_privileges(arg0: String, arg1: org.apache.hadoop.hive.metastore.api.PrincipalType, arg2: org.apache.hadoop.hive.metastore.api.HiveObjectRef): java.util.List[org.apache.hadoop.hive.metastore.api.HiveObjectPrivilege] = unsupported[java.util.List[org.apache.hadoop.hive.metastore.api.HiveObjectPrivilege]]()
+  override def grant_privileges(arg0: org.apache.hadoop.hive.metastore.api.PrivilegeBag): Boolean = unsupported[Boolean]()
+  override def revoke_privileges(arg0: org.apache.hadoop.hive.metastore.api.PrivilegeBag, arg1: Boolean): Boolean = unsupported[Boolean]()
+  override def getDelegationToken(arg0: String, arg1: String): String = unsupported[String]()
+  override def renewDelegationToken(arg0: String): Long = unsupported[Long]()
+  override def cancelDelegationToken(arg0: String): Unit = unsupported[Unit]()
+  override def getTokenStrForm(): String = unsupported[String]()
+  override def addToken(arg0: String, arg1: String): Boolean = unsupported[Boolean]()
+  override def removeToken(arg0: String): Boolean = unsupported[Boolean]()
+  override def getToken(arg0: String): String = unsupported[String]()
+  override def getAllTokenIdentifiers(): java.util.List[String] = unsupported[java.util.List[String]]()
+  override def addMasterKey(arg0: String): Int = unsupported[Int]()
+  override def updateMasterKey(arg0: java.lang.Integer, arg1: String): Unit = unsupported[Unit]()
+  override def removeMasterKey(arg0: java.lang.Integer): Boolean = unsupported[Boolean]()
+  override def getMasterKeys(): Array[String] = unsupported[Array[String]]()
+  override def createFunction(arg0: org.apache.hadoop.hive.metastore.api.Function): Unit = unsupported[Unit]()
+  override def alterFunction(arg0: String, arg1: String, arg2: org.apache.hadoop.hive.metastore.api.Function): Unit = unsupported[Unit]()
+  override def dropFunction(arg0: String, arg1: String): Unit = unsupported[Unit]()
+  override def getFunction(arg0: String, arg1: String): org.apache.hadoop.hive.metastore.api.Function = unsupported[org.apache.hadoop.hive.metastore.api.Function]()
+  override def getFunctions(arg0: String, arg1: String): java.util.List[String] = unsupported[java.util.List[String]]()
+  override def getAllFunctions(): org.apache.hadoop.hive.metastore.api.GetAllFunctionsResponse = unsupported[org.apache.hadoop.hive.metastore.api.GetAllFunctionsResponse]()
+  override def getValidTxns(): org.apache.hadoop.hive.common.ValidTxnList = unsupported[org.apache.hadoop.hive.common.ValidTxnList]()
+  override def getValidTxns(arg0: Long): org.apache.hadoop.hive.common.ValidTxnList = unsupported[org.apache.hadoop.hive.common.ValidTxnList]()
+  override def openTxn(arg0: String): Long = unsupported[Long]()
+  override def openTxns(arg0: String, arg1: Int): org.apache.hadoop.hive.metastore.api.OpenTxnsResponse = unsupported[org.apache.hadoop.hive.metastore.api.OpenTxnsResponse]()
+  override def rollbackTxn(arg0: Long): Unit = unsupported[Unit]()
+  override def commitTxn(arg0: Long): Unit = unsupported[Unit]()
+  override def abortTxns(arg0: java.util.List[java.lang.Long]): Unit = unsupported[Unit]()
+  override def showTxns(): org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse = unsupported[org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse]()
+  override def lock(arg0: org.apache.hadoop.hive.metastore.api.LockRequest): org.apache.hadoop.hive.metastore.api.LockResponse = unsupported[org.apache.hadoop.hive.metastore.api.LockResponse]()
+  override def checkLock(arg0: Long): org.apache.hadoop.hive.metastore.api.LockResponse = unsupported[org.apache.hadoop.hive.metastore.api.LockResponse]()
+  override def unlock(arg0: Long): Unit = unsupported[Unit]()
+  override def showLocks(): org.apache.hadoop.hive.metastore.api.ShowLocksResponse = unsupported[org.apache.hadoop.hive.metastore.api.ShowLocksResponse]()
+  override def showLocks(arg0: org.apache.hadoop.hive.metastore.api.ShowLocksRequest): org.apache.hadoop.hive.metastore.api.ShowLocksResponse = unsupported[org.apache.hadoop.hive.metastore.api.ShowLocksResponse]()
+  override def heartbeat(arg0: Long, arg1: Long): Unit = unsupported[Unit]()
+  override def heartbeatTxnRange(arg0: Long, arg1: Long): org.apache.hadoop.hive.metastore.api.HeartbeatTxnRangeResponse = unsupported[org.apache.hadoop.hive.metastore.api.HeartbeatTxnRangeResponse]()
+  override def compact(arg0: String, arg1: String, arg2: String, arg3: org.apache.hadoop.hive.metastore.api.CompactionType): Unit = unsupported[Unit]()
+  override def compact(arg0: String, arg1: String, arg2: String, arg3: org.apache.hadoop.hive.metastore.api.CompactionType, arg4: java.util.Map[String, String]): Unit = unsupported[Unit]()
+  override def compact2(arg0: String, arg1: String, arg2: String, arg3: org.apache.hadoop.hive.metastore.api.CompactionType, arg4: java.util.Map[String, String]): org.apache.hadoop.hive.metastore.api.CompactionResponse = unsupported[org.apache.hadoop.hive.metastore.api.CompactionResponse]()
+  override def showCompactions(): org.apache.hadoop.hive.metastore.api.ShowCompactResponse = unsupported[org.apache.hadoop.hive.metastore.api.ShowCompactResponse]()
+  override def addDynamicPartitions(arg0: Long, arg1: String, arg2: String, arg3: java.util.List[String]): Unit = unsupported[Unit]()
+  override def addDynamicPartitions(arg0: Long, arg1: String, arg2: String, arg3: java.util.List[String], arg4: org.apache.hadoop.hive.metastore.api.DataOperationType): Unit = unsupported[Unit]()
+  override def insertTable(arg0: org.apache.hadoop.hive.metastore.api.Table, arg1: Boolean): Unit = unsupported[Unit]()
+  override def getNextNotification(arg0: Long, arg1: Int, arg2: org.apache.hadoop.hive.metastore.IMetaStoreClient.NotificationFilter): org.apache.hadoop.hive.metastore.api.NotificationEventResponse = unsupported[org.apache.hadoop.hive.metastore.api.NotificationEventResponse]()
+  override def getCurrentNotificationEventId(): org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId = unsupported[org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId]()
+  override def fireListenerEvent(arg0: org.apache.hadoop.hive.metastore.api.FireEventRequest): org.apache.hadoop.hive.metastore.api.FireEventResponse = unsupported[org.apache.hadoop.hive.metastore.api.FireEventResponse]()
+  override def get_principals_in_role(arg0: org.apache.hadoop.hive.metastore.api.GetPrincipalsInRoleRequest): org.apache.hadoop.hive.metastore.api.GetPrincipalsInRoleResponse = unsupported[org.apache.hadoop.hive.metastore.api.GetPrincipalsInRoleResponse]()
+  override def get_role_grants_for_principal(arg0: org.apache.hadoop.hive.metastore.api.GetRoleGrantsForPrincipalRequest): org.apache.hadoop.hive.metastore.api.GetRoleGrantsForPrincipalResponse = unsupported[org.apache.hadoop.hive.metastore.api.GetRoleGrantsForPrincipalResponse]()
+  override def getAggrColStatsFor(arg0: String, arg1: String, arg2: java.util.List[String], arg3: java.util.List[String]): org.apache.hadoop.hive.metastore.api.AggrStats = unsupported[org.apache.hadoop.hive.metastore.api.AggrStats]()
+  override def setPartitionColumnStatistics(arg0: org.apache.hadoop.hive.metastore.api.SetPartitionsStatsRequest): Boolean = unsupported[Boolean]()
+  override def flushCache(): Unit = unsupported[Unit]()
+  override def getFileMetadata(arg0: java.util.List[java.lang.Long]): java.lang.Iterable[java.util.Map.Entry[java.lang.Long, java.nio.ByteBuffer]] = unsupported[java.lang.Iterable[java.util.Map.Entry[java.lang.Long, java.nio.ByteBuffer]]]()
+  override def getFileMetadataBySarg(arg0: java.util.List[java.lang.Long], arg1: java.nio.ByteBuffer, arg2: Boolean): java.lang.Iterable[java.util.Map.Entry[java.lang.Long, org.apache.hadoop.hive.metastore.api.MetadataPpdResult]] = unsupported[java.lang.Iterable[java.util.Map.Entry[java.lang.Long, org.apache.hadoop.hive.metastore.api.MetadataPpdResult]]]()
+  override def clearFileMetadata(arg0: java.util.List[java.lang.Long]): Unit = unsupported[Unit]()
+  override def putFileMetadata(arg0: java.util.List[java.lang.Long], arg1: java.util.List[java.nio.ByteBuffer]): Unit = unsupported[Unit]()
+  override def isSameConfObj(arg0: org.apache.hadoop.hive.conf.HiveConf): Boolean = unsupported[Boolean]()
+  override def cacheFileMetadata(arg0: String, arg1: String, arg2: String, arg3: Boolean): Boolean = unsupported[Boolean]()
+  override def getPrimaryKeys(arg0: org.apache.hadoop.hive.metastore.api.PrimaryKeysRequest): java.util.List[org.apache.hadoop.hive.metastore.api.SQLPrimaryKey] = unsupported[java.util.List[org.apache.hadoop.hive.metastore.api.SQLPrimaryKey]]()
+  override def getForeignKeys(arg0: org.apache.hadoop.hive.metastore.api.ForeignKeysRequest): java.util.List[org.apache.hadoop.hive.metastore.api.SQLForeignKey] = unsupported[java.util.List[org.apache.hadoop.hive.metastore.api.SQLForeignKey]]()
+  override def createTableWithConstraints(arg0: org.apache.hadoop.hive.metastore.api.Table, arg1: java.util.List[org.apache.hadoop.hive.metastore.api.SQLPrimaryKey], arg2: java.util.List[org.apache.hadoop.hive.metastore.api.SQLForeignKey]): Unit = unsupported[Unit]()
+  override def dropConstraint(arg0: String, arg1: String, arg2: String): Unit = unsupported[Unit]()
+  override def addPrimaryKey(arg0: java.util.List[org.apache.hadoop.hive.metastore.api.SQLPrimaryKey]): Unit = unsupported[Unit]()
+  override def addForeignKey(arg0: java.util.List[org.apache.hadoop.hive.metastore.api.SQLForeignKey]): Unit = unsupported[Unit]()
+ // scalastyle:on
+
+ private def toCatalogTable(table: Table): CatalogTable = {
+ val db = table.getDbName
+ val tbl = table.getTableName
+    val cols = Option(table.getSd).map(_.getCols).map(_.asScala.toList).getOrElse(Nil)
+    val partCols = Option(table.getPartitionKeys).map(_.asScala.toList).getOrElse(Nil)
+
+    val dataFields = cols.map(fs => StructField(fs.getName, CatalystSqlParser.parseDataType(fs.getType), nullable = true, Metadata.empty))
+    val partitionFields = partCols.map(fs => StructField(fs.getName, CatalystSqlParser.parseDataType(fs.getType), nullable = true, Metadata.empty))
+
+    CatalogTable(
+      identifier = TableIdentifier(tbl, Some(db)),
+      tableType = if ("EXTERNAL_TABLE".equalsIgnoreCase(table.getTableType)) CatalogTableType.EXTERNAL else CatalogTableType.MANAGED,
+      storage = CatalogStorageFormat(
+        locationUri = Option(table.getSd).map(_.getLocation).map(new URI(_)),
+        inputFormat = Option(table.getSd).map(_.getInputFormat),
+        outputFormat = Option(table.getSd).map(_.getOutputFormat),
+        serde = Option(table.getSd).flatMap(sd => Option(sd.getSerdeInfo)).map(_.getSerializationLib),
+        compressed = false,
+        properties = Option(table.getSd).flatMap(sd => Option(sd.getSerdeInfo)).flatMap(si => Option(si.getParameters)).map(_.asScala.toMap).getOrElse(Map.empty)),
+      schema = StructType(dataFields ++ partitionFields),
+      provider = Some("hudi"),
+      partitionColumnNames = partCols.map(_.getName),
+      properties = Option(table.getParameters).map(_.asScala.toMap).getOrElse(Map.empty))
+ }
+
+ private def fromCatalogTable(table: CatalogTable): Table = {
+ val t = new Table()
+ t.setDbName(table.identifier.database.getOrElse("default"))
+ t.setTableName(table.identifier.table)
+    t.setTableType(if (table.tableType == CatalogTableType.EXTERNAL) "EXTERNAL_TABLE" else "MANAGED_TABLE")
+    t.setParameters(new util.HashMap[String, String](table.properties.asJava))
+
+    val nonPartitionFields = table.schema.fields.filterNot(f => table.partitionColumnNames.contains(f.name))
+    val cols = nonPartitionFields.map(f => new FieldSchema(f.name, f.dataType.catalogString, f.getComment().orNull)).toList.asJava
+    val partCols = table.partitionColumnNames.map { name =>
+      val dt = table.partitionSchema.fields.find(_.name == name).map(_.dataType.catalogString).getOrElse("string")
+      new FieldSchema(name, dt, "")
+    }.toList.asJava
+
+    val serdeInfo = new SerDeInfo(null, table.storage.serde.orNull, new util.HashMap[String, String](table.storage.properties.asJava))
+    val sd = new StorageDescriptor(cols, table.storage.locationUri.map(_.toString).orNull,
+      table.storage.inputFormat.orNull, table.storage.outputFormat.orNull, false, 0, serdeInfo, null, null, null)
+ t.setSd(sd)
+ t.setPartitionKeys(partCols)
+ t
+ }
+
+  private def fromCatalogPartition(part: CatalogTablePartition, db: String, table: String, partitionKeys: List[String]): Partition = {
+ val values = partitionKeys.map(k => part.spec.getOrElse(k, "")).asJava
+ val serdeInfo = new SerDeInfo()
+ val sd = new StorageDescriptor()
+ sd.setLocation(part.storage.locationUri.map(_.toString).orNull)
+ sd.setInputFormat(part.storage.inputFormat.orNull)
+ sd.setOutputFormat(part.storage.outputFormat.orNull)
+ sd.setSerdeInfo(serdeInfo)
+    new Partition(values, db, table, 0, 0, sd, new util.HashMap[String, String]())
+ }
+
+ private def toCatalogPartition(part: Partition): CatalogTablePartition = {
+ val table = getTable(part.getDbName, part.getTableName)
+    val keys = Option(table.getPartitionKeys).map(_.asScala.toList).getOrElse(Nil)
+ val values = Option(part.getValues).map(_.asScala.toList).getOrElse(Nil)
+ val spec = keys.zip(values).map { case (k, v) => (k.getName, v) }.toMap
+ CatalogTablePartition(
+ spec = spec,
+ storage = CatalogStorageFormat(
+ locationUri = Option(part.getSd).map(_.getLocation).map(new URI(_)),
+ inputFormat = Option(part.getSd).map(_.getInputFormat),
+ outputFormat = Option(part.getSd).map(_.getOutputFormat),
+        serde = Option(part.getSd).flatMap(sd => Option(sd.getSerdeInfo)).map(_.getSerializationLib),
+        compressed = false,
+        properties = Option(part.getSd).flatMap(sd => Option(sd.getSerdeInfo)).flatMap(si => Option(si.getParameters)).map(_.asScala.toMap).getOrElse(Map.empty)),
+      parameters = Option(part.getParameters).map(_.asScala.toMap).getOrElse(Map.empty))
+ }
+
+ private def parsePartitionClause(partName: String): Map[String, String] = {
+ partName.split("/").flatMap { token =>
+ token.split("=").toList match {
+ case k :: v :: Nil =>
+ Some(k.trim -> v.trim.stripPrefix("'").stripSuffix("'"))
+ case _ => None
+ }
+ }.toMap
+ }
+}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hive/TestSparkCatalogMetaStoreClient.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hive/TestSparkCatalogMetaStoreClient.scala
new file mode 100644
index 000000000000..e6385ee93528
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hive/TestSparkCatalogMetaStoreClient.scala
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import org.apache.hudi.common.config.TypedProperties
+import org.apache.hudi.hive.HiveSyncConfig
+import org.apache.hudi.testutils.HoodieClientTestUtils.getSparkConfForTest
+
+import org.apache.hadoop.hive.metastore.api.{Database, EnvironmentContext, FieldSchema, Partition, SerDeInfo, StorageDescriptor, Table}
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.util.Utils
+import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertNotNull, assertTrue}
+import org.scalactic.source
+import org.scalatest.{BeforeAndAfterAll, FunSuite}
+
+import java.io.File
+import java.nio.file.Files
+import java.util
+import java.util.concurrent.atomic.AtomicInteger
+
+import scala.collection.JavaConverters._
+
+class TestSparkCatalogMetaStoreClient extends FunSuite with BeforeAndAfterAll {
+
+  private val warehouseDir = Files.createTempDirectory("spark-catalog-metastore-client").toFile
+ private val nameId = new AtomicInteger(0)
+
+ private lazy val spark: SparkSession = {
+ val sparkConf = getSparkConfForTest("TestSparkCatalogMetaStoreClient")
+ .remove("spark.sql.catalog.spark_catalog")
+
+ SparkSession.builder()
+ .config("spark.sql.warehouse.dir", warehouseDir.getCanonicalPath)
+ .config("spark.sql.session.timeZone", "UTC")
+ .config(sparkConf)
+ .enableHiveSupport()
+ .getOrCreate()
+ }
+
+ override protected def beforeAll(): Unit = {
+ super.beforeAll()
+ SparkSession.setActiveSession(spark)
+ SparkSession.setDefaultSession(spark)
+ }
+
+ override protected def afterAll(): Unit = {
+ SparkSession.clearActiveSession()
+ SparkSession.clearDefaultSession()
+ if (!spark.sparkContext.isStopped) {
+ spark.stop()
+ }
+ Utils.deleteRecursively(warehouseDir)
+ super.afterAll()
+ }
+
+  override protected def test(testName: String, testTags: org.scalatest.Tag*)(testFun: => Any)(implicit pos: source.Position): Unit = {
+ super.test(testName, testTags: _*)(
+ try {
+ testFun
+ } finally {
+        spark.sessionState.catalog.listDatabases().filter(_.startsWith("db_")).foreach { db =>
+ spark.sql(s"drop database if exists $db cascade")
+ }
+ }
+ )
+ }
+
+ test("exercise supported database and table APIs") {
+ withTempDir { tmp =>
+ val client = newClient()
+ val databaseName = generateName("db")
+ val tableName = generateName("tbl")
+
+      client.createDatabase(new Database(databaseName, "test database", new File(tmp, databaseName).toURI.toString, new util.HashMap[String, String]()))
+ assertEquals(databaseName, client.getDatabase(databaseName).getName)
+
+ val createdTable = newTable(
+ databaseName,
+ tableName,
+ new File(tmp, tableName).toURI.toString,
+ Seq("id" -> "int", "name" -> "string"),
+ Seq("dt" -> "string"),
+ Map("comment" -> "v1"))
+
+ client.createTable(createdTable)
+
+ assertTrue(client.tableExists(databaseName, tableName))
+      assertEquals(Seq("id", "name", "dt"), client.getSchema(databaseName, tableName).asScala.map(_.getName).toSeq)
+      assertEquals("v1", client.getTable(databaseName, tableName).getParameters.get("comment"))
+
+ val alteredTable = newTable(
+ databaseName,
+ tableName,
+ new File(tmp, s"${tableName}_v2").toURI.toString,
+ Seq("id" -> "int", "name" -> "string", "age" -> "int"),
+ Seq("dt" -> "string"),
+ Map("comment" -> "v2"))
+
+ client.alter_table(databaseName, tableName, alteredTable)
+      assertEquals(Seq("id", "name", "age", "dt"), client.getSchema(databaseName, tableName).asScala.map(_.getName).toSeq)
+      assertEquals("v2", client.getTable(databaseName, tableName).getParameters.get("comment"))
+
+ val environmentAlteredTable = newTable(
+ databaseName,
+ tableName,
+ new File(tmp, s"${tableName}_v3").toURI.toString,
+ Seq("id" -> "int", "name" -> "string", "age" -> "int"),
+ Seq("dt" -> "string"),
+ Map("comment" -> "env-context"))
+
+      client.alter_table_with_environmentContext(databaseName, tableName, environmentAlteredTable, new EnvironmentContext())
+      assertEquals("env-context", client.getTable(databaseName, tableName).getParameters.get("comment"))
+ }
+ }
+
+ test("exercise supported partition and drop APIs") {
+ withTempDir { tmp =>
+ val client = newClient()
+ val databaseName = generateName("db")
+ val tableName = generateName("tbl")
+
+      client.createDatabase(new Database(databaseName, "test database", new File(tmp, databaseName).toURI.toString, new util.HashMap[String, String]()))
+ client.createTable(newTable(
+ databaseName,
+ tableName,
+ new File(tmp, tableName).toURI.toString,
+ Seq("id" -> "int"),
+ Seq("dt" -> "string")))
+
+      val partitionOne = newPartition(databaseName, tableName, Seq("2024-01-01"), new File(tmp, s"$tableName/dt=2024-01-01").toURI.toString)
+      val partitionTwo = newPartition(databaseName, tableName, Seq("2024-01-02"), new File(tmp, s"$tableName/dt=2024-01-02").toURI.toString)
+
+      val added = client.add_partitions(util.Arrays.asList(partitionOne, partitionTwo), false, true)
+ assertEquals(2, added.size())
+
+      val listedPartitions = client.listPartitions(databaseName, tableName, (-1).toShort).asScala.toSeq
+      assertEquals(Set("2024-01-01", "2024-01-02"), listedPartitions.map(_.getValues.get(0)).toSet)
+      assertNotNull(listedPartitions.find(_.getValues.get(0) == "2024-01-02").orNull)
+
+      val listedByFilter = client.listPartitionsByFilter(databaseName, tableName, "dt='2024-01-02'", (-1).toShort).asScala.toSeq
+      assertEquals(Set("2024-01-01", "2024-01-02"), listedByFilter.map(_.getValues.get(0)).toSet)
+
+ val alteredPartition = newPartition(
+ databaseName,
+ tableName,
+ Seq("2024-01-02"),
+ new File(tmp, s"$tableName/dt=2024-01-02-updated").toURI.toString)
+      client.alter_partitions(databaseName, tableName, util.Collections.singletonList(alteredPartition), new EnvironmentContext())
+
+      val updatedLocation = client.listPartitions(databaseName, tableName, (-1).toShort).asScala
+ .find(_.getValues.get(0) == "2024-01-02")
+ .map(_.getSd.getLocation)
+ .orNull
+ assertTrue(updatedLocation.endsWith("dt=2024-01-02-updated"))
+
+      assertTrue(client.dropPartition(databaseName, tableName, "dt=2024-01-01", false))
+      val remainingPartitions = client.listPartitions(databaseName, tableName, (-1).toShort).asScala.toSeq
+      assertEquals(Seq("2024-01-02"), remainingPartitions.map(_.getValues.get(0)))
+
+ client.dropTable(databaseName, tableName)
+ assertFalse(client.tableExists(databaseName, tableName))
+ }
+ }
+
+ private def newClient(): SparkCatalogMetaStoreClient = {
+ SparkSession.setActiveSession(spark)
+ SparkSession.setDefaultSession(spark)
+ new SparkCatalogMetaStoreClient(new HiveSyncConfig(new TypedProperties()))
+ }
+
+ private def newTable(databaseName: String,
+ tableName: String,
+ location: String,
+ columns: Seq[(String, String)],
+ partitionColumns: Seq[(String, String)],
+ parameters: Map[String, String] = Map.empty): Table = {
+ val table = new Table()
+ table.setDbName(databaseName)
+ table.setTableName(tableName)
+ table.setTableType("EXTERNAL_TABLE")
+ table.setParameters(new util.HashMap[String, String](parameters.asJava))
+    table.setPartitionKeys(partitionColumns.map { case (name, dataType) => fieldSchema(name, dataType) }.asJava)
+
+ val serdeInfo = new SerDeInfo()
+ serdeInfo.setParameters(new util.HashMap[String, String]())
+
+ val storageDescriptor = new StorageDescriptor()
+    storageDescriptor.setCols(columns.map { case (name, dataType) => fieldSchema(name, dataType) }.asJava)
+ storageDescriptor.setLocation(location)
+ storageDescriptor.setSerdeInfo(serdeInfo)
+ table.setSd(storageDescriptor)
+ table
+ }
+
+ private def newPartition(databaseName: String,
+ tableName: String,
+ values: Seq[String],
+ location: String): Partition = {
+ val partition = new Partition()
+ partition.setDbName(databaseName)
+ partition.setTableName(tableName)
+ partition.setValues(values.asJava)
+ partition.setParameters(new util.HashMap[String, String]())
+
+ val serdeInfo = new SerDeInfo()
+ serdeInfo.setParameters(new util.HashMap[String, String]())
+
+ val storageDescriptor = new StorageDescriptor()
+ storageDescriptor.setLocation(location)
+ storageDescriptor.setSerdeInfo(serdeInfo)
+ partition.setSd(storageDescriptor)
+ partition
+ }
+
+ private def fieldSchema(name: String, dataType: String): FieldSchema = {
+ new FieldSchema(name, dataType, "")
+ }
+
+ private def withTempDir(f: File => Unit): Unit = {
+ val tempDir = Utils.createTempDir()
+ try {
+ f(tempDir)
+ } finally {
+ Utils.deleteRecursively(tempDir)
+ }
+ }
+
+ private def generateName(prefix: String): String = {
+ s"${prefix}_${nameId.incrementAndGet()}"
+ }
+}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestSparkCatalogSync.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestSparkCatalogSync.scala
new file mode 100644
index 000000000000..8f644d263a39
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestSparkCatalogSync.scala
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.ddl
+
+import org.apache.hudi.DataSourceWriteOptions
+import org.apache.hudi.common.config.TypedProperties
+import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.hive.{HiveStylePartitionValueExtractor, HiveSyncConfigHolder, HiveSyncTool}
+import org.apache.hudi.hive.ddl.HiveSyncMode
+import org.apache.hudi.sync.common.HoodieSyncConfig.{META_SYNC_BASE_PATH, META_SYNC_DATABASE_NAME, META_SYNC_PARTITION_EXTRACTOR_CLASS, META_SYNC_PARTITION_FIELDS, META_SYNC_TABLE_NAME}
+
+import org.apache.hadoop.hive.conf.HiveConf
+import org.apache.spark.sql.{DataFrame, SaveMode}
+import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase
+import org.junit.jupiter.api.Assertions.{assertFalse, assertTrue}
+
+class TestSparkCatalogSync extends HoodieSparkSqlTestBase {
+
+  private val sparkCatalogSyncKey = HiveSyncConfigHolder.HIVE_SYNC_USE_SPARK_CATALOG.key()
+
+ test("Test Spark catalog sync with partition lifecycle") {
+ withTempDir { tmp =>
+ import spark.implicits._
+
+ val tableName = generateTableName
+ val databaseName = "testdb"
+ val basePath = s"${tmp.getCanonicalPath}/$tableName"
+
+ val syncProps = buildSyncProps(databaseName, tableName, basePath)
+ syncProps.setProperty(sparkCatalogSyncKey, "true")
+
+ try {
+ spark.sql(s"create database if not exists $databaseName")
+ writeToHudi(
+ Seq((1, "a1", 1000L, "2024-01-01"), (2, "a2", 1001L, "2024-01-02"))
+ .toDF("id", "name", "ts", "dt"),
+ tableName,
+ basePath,
+ SaveMode.Overwrite)
+
+        assertFalse(spark.catalog.tableExists(databaseName, tableName), "Table should not exist before sync")
+        syncOnce(syncProps)
+        assertTrue(spark.catalog.tableExists(databaseName, tableName), "Table should exist after sync")
+        assertTrue(spark.sql(s"show partitions $databaseName.$tableName").count() == 2, "Initial partitions should be registered")
+
+ writeToHudi(
+ Seq((3, "a3", 1002L, "2024-01-03")).toDF("id", "name", "ts", "dt"),
+ tableName,
+ basePath,
+ SaveMode.Append)
+ syncOnce(syncProps)
+        assertTrue(spark.sql(s"show partitions $databaseName.$tableName").count() == 3, "New partition should be registered after append")
+
+        spark.sql(s"alter table $databaseName.$tableName drop partition (dt='2024-01-03')")
+        val partitionRows = spark.sql(s"show partitions $databaseName.$tableName").collect().map(_.getString(0))
+        assertFalse(partitionRows.contains("dt=2024-01-03"), "Dropped partition should not exist in catalog")
+ } finally {
+ spark.sql(s"drop table if exists $databaseName.$tableName")
+ }
+ }
+ }
+
+ test("Test Spark catalog sync with schema evolution") {
+ withTempDir { tmp =>
+ import spark.implicits._
+
+ val tableName = generateTableName
+ val databaseName = "testdb"
+ val basePath = s"${tmp.getCanonicalPath}/$tableName"
+
+ val syncProps = buildSyncProps(databaseName, tableName, basePath)
+ syncProps.setProperty(sparkCatalogSyncKey, "true")
+
+ try {
+ spark.sql(s"create database if not exists $databaseName")
+ writeToHudi(
+ Seq((1, "a1", 1000L, "2024-01-01")).toDF("id", "name", "ts", "dt"),
+ tableName,
+ basePath,
+ SaveMode.Overwrite)
+
+        assertFalse(spark.catalog.tableExists(databaseName, tableName), "Table should not exist before sync")
+        syncOnce(syncProps)
+        assertFalse(hasColumn(databaseName, tableName, "age"), "Initial schema should not contain age")
+
+ writeToHudi(
+          Seq((2, "a2", 1001L, "2024-01-02", 21)).toDF("id", "name", "ts", "dt", "age"),
+ tableName,
+ basePath,
+ SaveMode.Append,
+ reconcileSchema = true)
+
+ syncOnce(syncProps)
+        assertTrue(hasColumn(databaseName, tableName, "age"), "Catalog schema should be updated with new column age")
+ } finally {
+ spark.sql(s"drop table if exists $databaseName.$tableName")
+ }
+ }
+ }
+
+ private def syncOnce(syncProps: TypedProperties): Unit = {
+ val syncTool = new HiveSyncTool(syncProps, new HiveConf())
+ try {
+ syncTool.syncHoodieTable()
+ } finally {
+ syncTool.close()
+ }
+ }
+
+  private def buildSyncProps(databaseName: String, tableName: String, basePath: String): TypedProperties = {
+ val syncProps = new TypedProperties()
+ syncProps.setProperty(META_SYNC_DATABASE_NAME.key, databaseName)
+ syncProps.setProperty(META_SYNC_TABLE_NAME.key, tableName)
+ syncProps.setProperty(META_SYNC_BASE_PATH.key, basePath)
+ syncProps.setProperty(META_SYNC_PARTITION_FIELDS.key, "dt")
+    syncProps.setProperty(META_SYNC_PARTITION_EXTRACTOR_CLASS.key, classOf[HiveStylePartitionValueExtractor].getName)
+    syncProps.setProperty(org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_MODE.key(), HiveSyncMode.HMS.name())
+ syncProps.setProperty(sparkCatalogSyncKey, "true")
+ syncProps
+ }
+
+ private def writeToHudi(df: DataFrame,
+ tableName: String,
+ basePath: String,
+ mode: SaveMode,
+ reconcileSchema: Boolean = false): Unit = {
+ val writer = df.write
+ .format("hudi")
+ .option(HoodieWriteConfig.TBL_NAME.key, tableName)
+ .option(DataSourceWriteOptions.RECORDKEY_FIELD.key, "id")
+ .option(DataSourceWriteOptions.PRECOMBINE_FIELD.key, "ts")
+ .option(DataSourceWriteOptions.PARTITIONPATH_FIELD.key, "dt")
+ .option(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING.key, "true")
+ .option(DataSourceWriteOptions.TABLE_TYPE.key, "COPY_ON_WRITE")
+
+ val finalWriter = if (reconcileSchema) {
+ writer.option("hoodie.datasource.write.reconcile.schema", "true")
+ } else {
+ writer
+ }
+ finalWriter.mode(mode).save(basePath)
+ }
+
+  private def hasColumn(databaseName: String, tableName: String, columnName: String): Boolean = {
+    spark.catalog.listColumns(databaseName, tableName).collect().exists(_.name.equalsIgnoreCase(columnName))
+ }
+}
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java
index adcc27cc2357..af1d5e783976 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java
@@ -68,6 +68,7 @@ public class HiveSyncConfig extends HoodieSyncConfig {
   public static final ConfigProperty<Boolean> HIVE_SYNC_OMIT_METADATA_FIELDS = HiveSyncConfigHolder.HIVE_SYNC_OMIT_METADATA_FIELDS;
   public static final ConfigProperty<Integer> HIVE_BATCH_SYNC_PARTITION_NUM = HiveSyncConfigHolder.HIVE_BATCH_SYNC_PARTITION_NUM;
   public static final ConfigProperty<String> HIVE_SYNC_MODE = HiveSyncConfigHolder.HIVE_SYNC_MODE;
+  public static final ConfigProperty<Boolean> HIVE_SYNC_USE_SPARK_CATALOG = HiveSyncConfigHolder.HIVE_SYNC_USE_SPARK_CATALOG;
   public static final ConfigProperty<Boolean> HIVE_SYNC_BUCKET_SYNC = HiveSyncConfigHolder.HIVE_SYNC_BUCKET_SYNC;
   public static final ConfigProperty<String> HIVE_SYNC_BUCKET_SYNC_SPEC = HiveSyncConfigHolder.HIVE_SYNC_BUCKET_SYNC_SPEC;
   public static final ConfigProperty<String> HIVE_SYNC_COMMENT = HiveSyncConfigHolder.HIVE_SYNC_COMMENT;
@@ -145,6 +146,8 @@ public class HiveSyncConfig extends HoodieSyncConfig {
public String metastoreUris;
   @Parameter(names = {"--sync-mode"}, description = "Mode to choose for Hive ops. Valid values are hms,glue,jdbc and hiveql")
   public String syncMode;
+  @Parameter(names = {"--use-spark-catalog"}, description = "Whether to use Spark catalog backed metastore client.")
+  public Boolean useSparkCatalog;
   @Parameter(names = {"--auto-create-database"}, description = "Auto create hive database")
   public Boolean autoCreateDatabase;
   @Parameter(names = {"--ignore-exceptions"}, description = "Ignore hive exceptions")
@@ -189,6 +192,7 @@ public class HiveSyncConfig extends HoodieSyncConfig {
props.setPropertyIfNonNull(HIVE_URL.key(), jdbcUrl);
props.setPropertyIfNonNull(HIVE_USE_JDBC.key(), useJdbc);
props.setPropertyIfNonNull(HIVE_SYNC_MODE.key(), syncMode);
+    props.setPropertyIfNonNull(HIVE_SYNC_USE_SPARK_CATALOG.key(), useSparkCatalog);
     props.setPropertyIfNonNull(METASTORE_URIS.key(), metastoreUris);
     props.setPropertyIfNonNull(HIVE_AUTO_CREATE_DATABASE.key(), autoCreateDatabase);
     props.setPropertyIfNonNull(HIVE_IGNORE_EXCEPTIONS.key(), ignoreExceptions);
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfigHolder.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfigHolder.java
index 1eb95cf5529e..418676d59162 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfigHolder.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfigHolder.java
@@ -150,4 +150,10 @@ public class HiveSyncConfigHolder {
.markAdvanced()
.sinceVersion("0.13.0")
       .withDocumentation("Hive table synchronization strategy. Available option: RO, RT, ALL.");
+
+  public static final ConfigProperty<Boolean> HIVE_SYNC_USE_SPARK_CATALOG = ConfigProperty
+      .key("hoodie.datasource.hive_sync.use_spark_catalog")
+      .defaultValue(false)
+      .markAdvanced()
+      .withDocumentation("Use Spark catalog backed IMetaStoreClient implementation for hive sync.");
}
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java
index 4da60290d597..892fe9350abe 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java
@@ -28,6 +28,7 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.MapUtils;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.VisibleForTesting;
import org.apache.hudi.common.util.collection.Pair;
@@ -66,6 +67,7 @@ import static org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.getInputFormat
 import static org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.getOutputFormatClassName;
 import static org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.getSerDeClassName;
 import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_MODE;
+import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_USE_SPARK_CATALOG;
 import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_USE_JDBC;
 import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_BASE_FILE_FORMAT;
import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_BASE_PATH;
@@ -113,7 +115,7 @@ public class HoodieHiveSyncClient extends HoodieSyncClient {
     // Support JDBC, HiveQL and metastore based implementations for backwards compatibility. Future users should
// disable jdbc and depend on metastore client for all hive registrations
try {
- this.client = IMetaStoreClientUtil.getMSC(config.getHiveConf());
+ this.client = createMetaStoreClient(config);
setMetaConf(config.getHiveConf());
if (!StringUtils.isNullOrEmpty(config.getString(HIVE_SYNC_MODE))) {
         HiveSyncMode syncMode = HiveSyncMode.of(config.getString(HIVE_SYNC_MODE));
@@ -185,6 +187,20 @@ public class HoodieHiveSyncClient extends HoodieSyncClient {
return false;
}
+ private IMetaStoreClient createMetaStoreClient(HiveSyncConfig config) {
+ try {
+ if (config.getBooleanOrDefault(HIVE_SYNC_USE_SPARK_CATALOG)) {
+ return (IMetaStoreClient) ReflectionUtils.loadClass(
+ "org.apache.spark.sql.hive.SparkCatalogMetaStoreClient",
+ new Class<?>[] {HiveSyncConfig.class},
+ config);
+ }
+ return IMetaStoreClientUtil.getMSC(config.getHiveConf());
+ } catch (Exception e) {
+      throw new HoodieHiveSyncException("Failed to create HiveMetaStoreClient", e);
+ }
+ }
+
private Table getInitialTable(String table) {
return initialTableByName.computeIfAbsent(table, t -> {
try {