This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 9f4ae4b7834f [HUDI-9594] Allow Hudi to delegate catalog operations to
Apache Polaris (#13558)
9f4ae4b7834f is described below
commit 9f4ae4b7834fccc0a3060b500be461489a68587b
Author: Rahil C <[email protected]>
AuthorDate: Fri Jul 25 06:36:23 2025 -0700
[HUDI-9594] Allow Hudi to delegate catalog operations to Apache Polaris
(#13558)
---
.../scala/org/apache/hudi/DataSourceOptions.scala | 7 +
.../spark/sql/hudi/HoodieSqlCommonUtils.scala | 15 ++
.../spark/sql/hudi/catalog/HoodieCatalog.scala | 6 +
.../hudi/command/CreateHoodieTableCommand.scala | 17 ++-
.../sql/hudi/catalog/MockPolarisSparkCatalog.scala | 58 ++++++++
.../TestPolarisHoodieCatalogDelegation.scala | 151 +++++++++++++++++++++
6 files changed, 247 insertions(+), 7 deletions(-)
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index 5a0cf17893de..d5bacb78c980 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -254,6 +254,13 @@ object DataSourceReadOptions {
.sinceVersion("1.0.0")
.withDocumentation("A regex under the table's base path to get file
system view information")
+ val POLARIS_CATALOG_CLASS_NAME: ConfigProperty[String] = ConfigProperty
+ .key("hoodie.spark.polaris.catalog.class")
+ .defaultValue("org.apache.polaris.spark.SparkCatalog")
+ .markAdvanced()
+ .sinceVersion("1.1.0")
+ .withDocumentation("Fully qualified class name of the catalog that is used
by the Polaris spark client.")
+
/** @deprecated Use {@link QUERY_TYPE} and its methods instead */
@Deprecated
val QUERY_TYPE_OPT_KEY = QUERY_TYPE.key()
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala
index 942adda6ff9a..8adfc3ac770c 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala
@@ -29,6 +29,7 @@ import
org.apache.hudi.common.table.timeline.TimelineUtils.parseDateFromInstantT
import org.apache.hudi.common.util.PartitionPathEncodeUtils
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.storage.{HoodieStorage, StoragePath, StoragePathInfo}
+import org.apache.hudi.util.SparkConfigUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
@@ -378,4 +379,18 @@ object HoodieSqlCommonUtils extends SparkAdapterSupport {
throw new HoodieException(s"Got an invalid instant ($queryInstant)")
}
}
+
+ /**
+ * Check if Polaris catalog is enabled in the Spark session.
+ * @param sparkSession The Spark session
+ * @return true if Polaris catalog is configured, false otherwise
+ */
+ def isUsingPolarisCatalog(sparkSession: SparkSession): Boolean = {
+ val sparkSessionConfigs = sparkSession.conf.getAll
+ val polarisCatalogClassName = SparkConfigUtils.getStringWithAltKeys(
+ sparkSessionConfigs, DataSourceReadOptions.POLARIS_CATALOG_CLASS_NAME)
+ sparkSessionConfigs
+ .filter(_._1.startsWith("spark.sql.catalog."))
+ .exists(_._2 == polarisCatalogClassName)
+ }
}
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieCatalog.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieCatalog.scala
index f57e148aef89..d71ec3cbb954 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieCatalog.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieCatalog.scala
@@ -328,6 +328,12 @@ class HoodieCatalog extends DelegatingCatalogExtension
new CreateHoodieTableCommand(tableDesc, false).run(spark)
}
+ // Check if Polaris catalog is enabled
+ if (HoodieSqlCommonUtils.isUsingPolarisCatalog(spark)) {
+ // If so, invoke the delegate catalog by calling super; in this case
it should be PolarisSparkCatalog.
+ // This should handle creation of the table entry in the catalog.
+ super.createTable(ident, schema, partitions, allTableProperties)
+ }
loadTable(ident)
}
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableCommand.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableCommand.scala
index 0667a8294c47..7f0ef968eccf 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableCommand.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableCommand.scala
@@ -35,7 +35,7 @@ import org.apache.spark.sql.catalyst.expressions.Cast
import org.apache.spark.sql.hive.HiveClientUtils
import org.apache.spark.sql.hive.HiveExternalCatalog._
import org.apache.spark.sql.hudi.{HoodieOptionConfig, HoodieSqlCommonUtils}
-import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.isUsingHiveCatalog
+import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.{isUsingHiveCatalog,
isUsingPolarisCatalog}
import
org.apache.spark.sql.hudi.command.CreateHoodieTableCommand.validateTableSchema
import
org.apache.spark.sql.internal.StaticSQLConf.SCHEMA_STRING_LENGTH_THRESHOLD
import org.apache.spark.sql.types.StructType
@@ -190,12 +190,15 @@ object CreateHoodieTableCommand {
properties = newTblProperties
)
- // Create table in the catalog
- val enableHive = isUsingHiveCatalog(sparkSession)
- if (enableHive) {
- createHiveDataSourceTable(sparkSession, newTable)
- } else {
- catalog.createTable(newTable, ignoreIfExists = false, validateLocation =
false)
+ // If Polaris is not enabled, create the table in the Hive or Spark
session catalog.
+ // Otherwise, if Polaris is enabled, Hudi will use the delegate to create the table.
+ if (!isUsingPolarisCatalog(sparkSession)) {
+ val enableHive = isUsingHiveCatalog(sparkSession)
+ if (enableHive) {
+ createHiveDataSourceTable(sparkSession, newTable)
+ } else {
+ catalog.createTable(newTable, ignoreIfExists = false, validateLocation
= false)
+ }
}
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/catalog/MockPolarisSparkCatalog.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/catalog/MockPolarisSparkCatalog.scala
new file mode 100644
index 000000000000..4a6ee1c6a2d2
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/catalog/MockPolarisSparkCatalog.scala
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.catalog
+
+import org.apache.spark.sql.connector.catalog.{Identifier, Table,
TableCatalog, TableChange}
+import org.apache.spark.sql.connector.expressions.Transform
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+import org.mockito.Mockito.mock
+
+import java.util
+
+/**
+ * Mock Polaris Spark Catalog for testing delegation behavior.
+ * Only implements essential methods: createTable and loadTable.
+ */
+class MockPolarisSparkCatalog extends TableCatalog {
+
+ override def initialize(name: String, options: CaseInsensitiveStringMap):
Unit = {}
+
+ override def name(): String = "mock_polaris"
+
+ override def listTables(namespace: Array[String]): Array[Identifier] =
+ throw new UnsupportedOperationException("Not implemented in mock")
+
+ override def loadTable(ident: Identifier): Table = {
+ mock(classOf[Table])
+ }
+
+ override def createTable(ident: Identifier, schema: StructType, partitions:
Array[Transform], properties: util.Map[String, String]): Table = {
+ mock(classOf[Table])
+ }
+
+ override def alterTable(ident: Identifier, changes: TableChange*): Table =
+ throw new UnsupportedOperationException("Not implemented in mock")
+
+ override def dropTable(ident: Identifier): Boolean =
+ throw new UnsupportedOperationException("Not implemented in mock")
+
+ override def renameTable(oldIdent: Identifier, newIdent: Identifier): Unit =
+ throw new UnsupportedOperationException("Not implemented in mock")
+
+}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/catalog/TestPolarisHoodieCatalogDelegation.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/catalog/TestPolarisHoodieCatalogDelegation.scala
new file mode 100644
index 000000000000..f927b7f50086
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/catalog/TestPolarisHoodieCatalogDelegation.scala
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.catalog
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.connector.catalog.Identifier
+import org.apache.spark.sql.connector.expressions.Transform
+import org.apache.spark.sql.hudi.HoodieSqlCommonUtils
+import org.apache.spark.sql.types._
+import org.apache.spark.util.Utils
+import org.junit.jupiter.api.Assertions.{assertFalse, assertTrue}
+import org.mockito.ArgumentMatchers.any
+import org.mockito.Mockito.{never, spy, times, verify}
+import org.scalatest.funsuite.AnyFunSuite
+
+import java.io.File
+import java.util.UUID
+
+import scala.collection.JavaConverters._
+
+/**
+ * Test class dedicated to testing Polaris catalog delegation behavior in
HoodieCatalog.
+ */
+class TestPolarisHoodieCatalogDelegation extends AnyFunSuite {
+
+ private def generateTableName: String =
s"hudi_test_table_${UUID.randomUUID().toString.replace("-", "_")}"
+
+ private def withTempDir(f: File => Unit): Unit = {
+ val tempDir = Utils.createTempDir()
+ try {
+ f(tempDir)
+ } finally {
+ Utils.deleteRecursively(tempDir)
+ }
+ }
+
+ private def buildCustomSparkSession(tempDir: File, enablePolaris: Boolean =
false): (SparkSession, HoodieCatalog, MockPolarisSparkCatalog) = {
+ val mockPolarisDelegate = spy(new MockPolarisSparkCatalog())
+
+ val sparkBuilder = SparkSession.builder()
+ .appName("TestPolarisHoodieCatalogDelegation")
+ .master("local[*]")
+ .config("spark.sql.warehouse.dir", tempDir.getCanonicalPath)
+ .config("spark.sql.extensions",
"org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
+ .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+ .config("spark.sql.catalog.spark_catalog",
"org.apache.spark.sql.hudi.catalog.HoodieCatalog")
+
+ if (enablePolaris) {
+ // In production the class should be
org.apache.polaris.spark.SparkCatalog
+ // (which is the default value of the config
hoodie.spark.polaris.catalog.class)
+ // However, in testing, we verify the config works by using a mock catalog.
+ val testPolarisCatalogClass =
"org.apache.spark.sql.hudi.catalog.MockPolarisSparkCatalog"
+ sparkBuilder.config("spark.sql.catalog.polaris_catalog",
testPolarisCatalogClass)
+ sparkBuilder.config("hoodie.spark.polaris.catalog.class",
testPolarisCatalogClass)
+ }
+
+ // Create SparkSession first so it becomes the active session
+ val customSession = sparkBuilder.getOrCreate()
+
+ // Get the HoodieCatalog instance from the session
+ val hoodieCatalog =
customSession.sessionState.catalogManager.v2SessionCatalog.asInstanceOf[HoodieCatalog]
+
+ if (enablePolaris) {
+ // If enabled, we mimic Polaris's Spark catalog behavior by setting the
delegate
+ hoodieCatalog.setDelegateCatalog(mockPolarisDelegate)
+ }
+ (customSession, hoodieCatalog, mockPolarisDelegate)
+ }
+
+ test("Test Normal Hudi Catalog Route") {
+ withTempDir { tmp =>
+ val tableName = generateTableName
+ val tablePath = s"${tmp.getCanonicalPath}/$tableName"
+
+ // Create custom session with HoodieCatalog (with Polaris not enabled)
+ val (customSession, hoodieCatalog, mockPolarisDelegate) =
buildCustomSparkSession(tmp)
+
+ try {
+ // Verify Polaris is not detected
+ assertFalse(HoodieSqlCommonUtils.isUsingPolarisCatalog(customSession))
+
+ hoodieCatalog.createTable(
+ Identifier.of(Array("default"), tableName),
+ StructType(Seq(
+ StructField("id", IntegerType),
+ StructField("name", StringType),
+ StructField("ts", LongType)
+ )),
+ Array.empty[Transform],
+ Map("provider" -> "hudi", "primaryKey" -> "id", "preCombineField" ->
"ts", "path" -> tablePath).asJava
+ )
+
+ // Verification via filesystem
+ assertTrue(new File(s"$tablePath/.hoodie").exists(), "Hudi metadata
directory should exist")
+ assertTrue(new File(s"$tablePath/.hoodie/hoodie.properties").exists(),
"Hudi properties file should exist")
+
+ // Verify delegate was not called
+ verify(mockPolarisDelegate, never()).createTable(any(), any(), any(),
any())
+
+ } finally {
+ customSession.stop()
+ }
+ }
+ }
+
+ test("Test Polaris Detection and Delegation") {
+ withTempDir { tmp =>
+ // Create custom session with HoodieCatalog (with Polaris enabled)
+ val (customSession, hoodieCatalog, mockPolarisDelegate) =
buildCustomSparkSession(tmp, enablePolaris = true)
+
+ try {
+ // Verify Polaris is detected
+ assertTrue(HoodieSqlCommonUtils.isUsingPolarisCatalog(customSession),
"Should detect Polaris with correct delegate and config")
+
+ // Verify delegation works by calling createTable API
+ val tableName = generateTableName
+ hoodieCatalog.createTable(
+ Identifier.of(Array("default"), tableName),
+ StructType(Seq(
+ StructField("id", IntegerType),
+ StructField("name", StringType),
+ StructField("ts", LongType)
+ )),
+ Array.empty[Transform],
+ Map("provider" -> "hudi", "primaryKey" -> "id", "preCombineField" ->
"ts").asJava
+ )
+
+ // Verify delegate was called when Polaris is enabled
+ verify(mockPolarisDelegate, times(1)).createTable(any(), any(), any(),
any())
+
+ } finally {
+ customSession.stop()
+ }
+ }
+ }
+}