This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 9f4ae4b7834f [HUDI-9594] Allow Hudi to delegate catalog operations to Apache Polaris (#13558)
9f4ae4b7834f is described below

commit 9f4ae4b7834fccc0a3060b500be461489a68587b
Author: Rahil C <[email protected]>
AuthorDate: Fri Jul 25 06:36:23 2025 -0700

    [HUDI-9594] Allow Hudi to delegate catalog operations to Apache Polaris (#13558)
---
 .../scala/org/apache/hudi/DataSourceOptions.scala  |   7 +
 .../spark/sql/hudi/HoodieSqlCommonUtils.scala      |  15 ++
 .../spark/sql/hudi/catalog/HoodieCatalog.scala     |   6 +
 .../hudi/command/CreateHoodieTableCommand.scala    |  17 ++-
 .../sql/hudi/catalog/MockPolarisSparkCatalog.scala |  58 ++++++++
 .../TestPolarisHoodieCatalogDelegation.scala       | 151 +++++++++++++++++++++
 6 files changed, 247 insertions(+), 7 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index 5a0cf17893de..d5bacb78c980 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -254,6 +254,13 @@ object DataSourceReadOptions {
       .sinceVersion("1.0.0")
       .withDocumentation("A regex under the table's base path to get file 
system view information")
 
+  /**
+   * Catalog implementation class name that marks a Spark session as using Apache Polaris.
+   * A session is treated as Polaris-enabled when any `spark.sql.catalog.*` entry is set to
+   * exactly this class name (see HoodieSqlCommonUtils.isUsingPolarisCatalog). It can be
+   * overridden when a different catalog class should be recognized (e.g. a mock in tests).
+   */
+  val POLARIS_CATALOG_CLASS_NAME: ConfigProperty[String] = ConfigProperty
+    .key("hoodie.spark.polaris.catalog.class")
+    .defaultValue("org.apache.polaris.spark.SparkCatalog")
+    .markAdvanced()
+    .sinceVersion("1.1.0")
+    .withDocumentation("Fully qualified class name of the catalog that is used 
by the Polaris spark client.")
+
   /** @deprecated Use {@link QUERY_TYPE} and its methods instead */
   @Deprecated
   val QUERY_TYPE_OPT_KEY = QUERY_TYPE.key()
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala
index 942adda6ff9a..8adfc3ac770c 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala
@@ -29,6 +29,7 @@ import org.apache.hudi.common.table.timeline.TimelineUtils.parseDateFromInstantT
 import org.apache.hudi.common.util.PartitionPathEncodeUtils
 import org.apache.hudi.exception.HoodieException
 import org.apache.hudi.storage.{HoodieStorage, StoragePath, StoragePathInfo}
+import org.apache.hudi.util.SparkConfigUtils
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
@@ -378,4 +379,18 @@ object HoodieSqlCommonUtils extends SparkAdapterSupport {
       throw new HoodieException(s"Got an invalid instant ($queryInstant)")
     }
   }
+
+  /**
+   * Check if a Polaris catalog is configured in the Spark session.
+   *
+   * The expected catalog class name is resolved from the session configs through
+   * [[DataSourceReadOptions.POLARIS_CATALOG_CLASS_NAME]] via `SparkConfigUtils.getStringWithAltKeys`
+   * (presumably falling back to the property's default, `org.apache.polaris.spark.SparkCatalog`,
+   * when unset). The session counts as using Polaris when any `spark.sql.catalog.*` entry has
+   * exactly that value.
+   *
+   * @param sparkSession The Spark session whose runtime configs are inspected
+   * @return true if Polaris catalog is configured, false otherwise
+   */
+  def isUsingPolarisCatalog(sparkSession: SparkSession): Boolean = {
+    val sparkSessionConfigs = sparkSession.conf.getAll
+    val polarisCatalogClassName = SparkConfigUtils.getStringWithAltKeys(
+      sparkSessionConfigs, DataSourceReadOptions.POLARIS_CATALOG_CLASS_NAME)
+    // Single pass: check key prefix and value together instead of materializing a filtered map.
+    sparkSessionConfigs.exists { case (key, value) =>
+      key.startsWith("spark.sql.catalog.") && value == polarisCatalogClassName
+    }
+  }
 }
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieCatalog.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieCatalog.scala
index f57e148aef89..d71ec3cbb954 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieCatalog.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieCatalog.scala
@@ -328,6 +328,12 @@ class HoodieCatalog extends DelegatingCatalogExtension
       new CreateHoodieTableCommand(tableDesc, false).run(spark)
     }
 
+    // Check if Polaris catalog is enabled
+    if (HoodieSqlCommonUtils.isUsingPolarisCatalog(spark)) {
+      // if so then invoke the delegate catalog by calling super, in this case 
it should be PolarisSparkCatalog
+      // this should handle creation of table entry in catalog
+      super.createTable(ident, schema, partitions, allTableProperties)
+    }
     loadTable(ident)
   }
 
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableCommand.scala
index 0667a8294c47..7f0ef968eccf 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableCommand.scala
@@ -35,7 +35,7 @@ import org.apache.spark.sql.catalyst.expressions.Cast
 import org.apache.spark.sql.hive.HiveClientUtils
 import org.apache.spark.sql.hive.HiveExternalCatalog._
 import org.apache.spark.sql.hudi.{HoodieOptionConfig, HoodieSqlCommonUtils}
-import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.isUsingHiveCatalog
+import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.{isUsingHiveCatalog, 
isUsingPolarisCatalog}
 import 
org.apache.spark.sql.hudi.command.CreateHoodieTableCommand.validateTableSchema
 import 
org.apache.spark.sql.internal.StaticSQLConf.SCHEMA_STRING_LENGTH_THRESHOLD
 import org.apache.spark.sql.types.StructType
@@ -190,12 +190,15 @@ object CreateHoodieTableCommand {
       properties = newTblProperties
     )
 
-    // Create table in the catalog
-    val enableHive = isUsingHiveCatalog(sparkSession)
-    if (enableHive) {
-      createHiveDataSourceTable(sparkSession, newTable)
-    } else {
-      catalog.createTable(newTable, ignoreIfExists = false, validateLocation = 
false)
+    // If polaris is not enabled, we should create the table in hive or spark 
session catalog
+    // otherwise if enabled, hudi will use the delegate to create the table
+    if (!isUsingPolarisCatalog(sparkSession)) {
+      val enableHive = isUsingHiveCatalog(sparkSession)
+      if (enableHive) {
+        createHiveDataSourceTable(sparkSession, newTable)
+      } else {
+        catalog.createTable(newTable, ignoreIfExists = false, validateLocation 
= false)
+      }
     }
   }
 
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/catalog/MockPolarisSparkCatalog.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/catalog/MockPolarisSparkCatalog.scala
new file mode 100644
index 000000000000..4a6ee1c6a2d2
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/catalog/MockPolarisSparkCatalog.scala
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.catalog
+
+import org.apache.spark.sql.connector.catalog.{Identifier, Table, 
TableCatalog, TableChange}
+import org.apache.spark.sql.connector.expressions.Transform
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+import org.mockito.Mockito.mock
+
+import java.util
+
+/**
+ * Mock Polaris Spark Catalog for testing delegation behavior.
+ * Only implements essential methods: createTable and loadTable.
+ */
+class MockPolarisSparkCatalog extends TableCatalog {
+
+  /** Shared failure for every catalog operation this mock deliberately leaves out. */
+  private def notSupported: Nothing =
+    throw new UnsupportedOperationException("Not implemented in mock")
+
+  /** No-op: the mock holds no state, so the catalog name and options are ignored. */
+  override def initialize(name: String, options: CaseInsensitiveStringMap): Unit = ()
+
+  override def name(): String = "mock_polaris"
+
+  /** Hands back a fresh Mockito mock of [[Table]] regardless of `ident`. */
+  override def loadTable(ident: Identifier): Table = mock(classOf[Table])
+
+  /** Persists nothing; returns a fresh Mockito mock of [[Table]] for any arguments. */
+  override def createTable(
+      ident: Identifier,
+      schema: StructType,
+      partitions: Array[Transform],
+      properties: util.Map[String, String]): Table =
+    mock(classOf[Table])
+
+  override def listTables(namespace: Array[String]): Array[Identifier] = notSupported
+
+  override def alterTable(ident: Identifier, changes: TableChange*): Table = notSupported
+
+  override def dropTable(ident: Identifier): Boolean = notSupported
+
+  override def renameTable(oldIdent: Identifier, newIdent: Identifier): Unit = notSupported
+
+}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/catalog/TestPolarisHoodieCatalogDelegation.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/catalog/TestPolarisHoodieCatalogDelegation.scala
new file mode 100644
index 000000000000..f927b7f50086
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/catalog/TestPolarisHoodieCatalogDelegation.scala
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.catalog
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.connector.catalog.Identifier
+import org.apache.spark.sql.connector.expressions.Transform
+import org.apache.spark.sql.hudi.HoodieSqlCommonUtils
+import org.apache.spark.sql.types._
+import org.apache.spark.util.Utils
+import org.junit.jupiter.api.Assertions.{assertFalse, assertTrue}
+import org.mockito.ArgumentMatchers.any
+import org.mockito.Mockito.{never, spy, times, verify}
+import org.scalatest.funsuite.AnyFunSuite
+
+import java.io.File
+import java.util.UUID
+
+import scala.collection.JavaConverters._
+
+/**
+ * Test class dedicated to testing Polaris catalog delegation behavior in 
HoodieCatalog.
+ */
+class TestPolarisHoodieCatalogDelegation extends AnyFunSuite {
+
+  // Random UUID suffix (dashes replaced by underscores) keeps each table name unique.
+  private def generateTableName: String = 
s"hudi_test_table_${UUID.randomUUID().toString.replace("-", "_")}"
+
+  /** Runs `f` with a fresh temp directory and deletes it afterwards, even if `f` throws. */
+  private def withTempDir(f: File => Unit): Unit = {
+    val tempDir = Utils.createTempDir()
+    try {
+      f(tempDir)
+    } finally {
+      Utils.deleteRecursively(tempDir)
+    }
+  }
+
+  /**
+   * Builds a local SparkSession whose session catalog (`spark_catalog`) is [[HoodieCatalog]],
+   * with the warehouse rooted at `tempDir`.
+   *
+   * When `enablePolaris` is true, a catalog named `polaris_catalog` is registered with the
+   * [[MockPolarisSparkCatalog]] class, `hoodie.spark.polaris.catalog.class` is set to that same
+   * class name, and the spied mock is installed as the HoodieCatalog's delegate.
+   *
+   * NOTE(review): `getOrCreate()` reuses an already-active session if one exists, in which case
+   * static settings (e.g. `spark.sql.extensions`) presumably would not apply; the tests rely on
+   * stopping each session in `finally`.
+   *
+   * @return (session, its HoodieCatalog, the spied mock). The mock is returned in both modes but
+   *         is only wired in as the delegate when `enablePolaris` is true.
+   */
+  private def buildCustomSparkSession(tempDir: File, enablePolaris: Boolean = 
false): (SparkSession, HoodieCatalog, MockPolarisSparkCatalog) = {
+    // Spy rather than a bare mock: real method bodies run while calls remain verifiable.
+    val mockPolarisDelegate = spy(new MockPolarisSparkCatalog())
+
+    val sparkBuilder = SparkSession.builder()
+      .appName("TestPolarisHoodieCatalogDelegation")
+      .master("local[*]")
+      .config("spark.sql.warehouse.dir", tempDir.getCanonicalPath)
+      .config("spark.sql.extensions", 
"org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
+      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+      .config("spark.sql.catalog.spark_catalog", 
"org.apache.spark.sql.hudi.catalog.HoodieCatalog")
+
+    if (enablePolaris) {
+      // In production this would be org.apache.polaris.spark.SparkCatalog, the default of
+      // hoodie.spark.polaris.catalog.class. Here the mock class is registered both as a named
+      // catalog and as that config's value, so Polaris detection is exercised via the config.
+      val testPolarisCatalogClass = 
"org.apache.spark.sql.hudi.catalog.MockPolarisSparkCatalog"
+      sparkBuilder.config("spark.sql.catalog.polaris_catalog", 
testPolarisCatalogClass)
+      sparkBuilder.config("hoodie.spark.polaris.catalog.class", 
testPolarisCatalogClass)
+    }
+
+    // Create SparkSession first so it becomes the active session
+    val customSession = sparkBuilder.getOrCreate()
+
+    // Get the HoodieCatalog instance from the session
+    val hoodieCatalog = 
customSession.sessionState.catalogManager.v2SessionCatalog.asInstanceOf[HoodieCatalog]
+
+    if (enablePolaris) {
+      // Mimic the Polaris Spark client by making the spied mock HoodieCatalog's delegate,
+      // so HoodieCatalog's super.* calls land on it.
+      hoodieCatalog.setDelegateCatalog(mockPolarisDelegate)
+    }
+    (customSession, hoodieCatalog, mockPolarisDelegate)
+  }
+
+  test("Test Normal Hudi Catalog Route") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      val tablePath = s"${tmp.getCanonicalPath}/$tableName"
+
+      // Create custom session with HoodieCatalog (with Polaris not enabled)
+      val (customSession, hoodieCatalog, mockPolarisDelegate) = 
buildCustomSparkSession(tmp)
+
+      try {
+        // Verify Polaris is not detected
+        assertFalse(HoodieSqlCommonUtils.isUsingPolarisCatalog(customSession))
+
+        hoodieCatalog.createTable(
+          Identifier.of(Array("default"), tableName),
+          StructType(Seq(
+            StructField("id", IntegerType),
+            StructField("name", StringType),
+            StructField("ts", LongType)
+          )),
+          Array.empty[Transform],
+          Map("provider" -> "hudi", "primaryKey" -> "id", "preCombineField" -> 
"ts", "path" -> tablePath).asJava
+        )
+
+        // Verification via filesystem: Hudi's own table initialization must have written
+        // .hoodie/hoodie.properties under the explicit table path.
+        assertTrue(new File(s"$tablePath/.hoodie").exists(), "Hudi metadata 
directory should exist")
+        assertTrue(new File(s"$tablePath/.hoodie/hoodie.properties").exists(), 
"Hudi properties file should exist")
+
+        // Verify delegate was not called.
+        // NOTE(review): in this mode setDelegateCatalog(mockPolarisDelegate) is never invoked, so
+        // the spy cannot receive calls and this check passes vacuously; the meaningful assertions
+        // here are assertFalse and the filesystem checks.
+        verify(mockPolarisDelegate, never()).createTable(any(), any(), any(), 
any())
+
+      } finally {
+        customSession.stop()
+      }
+    }
+  }
+
+  test("Test Polaris Detection and Delegation") {
+    withTempDir { tmp =>
+      // Create custom session with HoodieCatalog (with Polaris enabled)
+      val (customSession, hoodieCatalog, mockPolarisDelegate) = 
buildCustomSparkSession(tmp, enablePolaris = true)
+
+      try {
+        // Verify Polaris is detected
+        assertTrue(HoodieSqlCommonUtils.isUsingPolarisCatalog(customSession), 
"Should detect Polaris with correct delegate and config")
+
+        // Verify delegation works by calling createTable API
+        val tableName = generateTableName
+        hoodieCatalog.createTable(
+          Identifier.of(Array("default"), tableName),
+          StructType(Seq(
+            StructField("id", IntegerType),
+            StructField("name", StringType),
+            StructField("ts", LongType)
+          )),
+          Array.empty[Transform],
+          Map("provider" -> "hudi", "primaryKey" -> "id", "preCombineField" -> 
"ts").asJava
+        )
+
+        // Verify delegate was called when Polaris is enabled: HoodieCatalog.createTable is
+        // expected to forward to super.createTable (i.e. the installed delegate) exactly once.
+        verify(mockPolarisDelegate, times(1)).createTable(any(), any(), any(), 
any())
+
+      } finally {
+        customSession.stop()
+      }
+    }
+  }
+}

Reply via email to