This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 8491d6ac76f6 fix: Remove catalog access from SparkSQLWriter (#14083)
8491d6ac76f6 is described below

commit 8491d6ac76f6aba3d9c663b22705309c9a988ef5
Author: Lin Liu <[email protected]>
AuthorDate: Fri Oct 17 15:23:09 2025 -0700

    fix: Remove catalog access from SparkSQLWriter (#14083)
---
 .../org/apache/hudi/HoodieSparkSqlWriter.scala     | 39 ++++++++--------------
 .../command/InsertIntoHoodieTableCommand.scala     |  8 +++--
 .../hudi/command/MergeIntoHoodieTableCommand.scala |  5 ++-
 .../command/InsertIntoHoodieTableCommand.scala     |  8 +++--
 .../hudi/command/MergeIntoHoodieTableCommand.scala |  5 ++-
 5 files changed, 34 insertions(+), 31 deletions(-)
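In short: the writer no longer reaches into spark.sessionState.catalog for a
fallback schema. The SQL commands, which already hold the catalog table,
convert its schema to Avro themselves and pass it in through the new optional
schemaFromCatalog parameter. A minimal sketch of the new call shape (the
values catalogTable, config, df, mode, and sparkSession stand in for what the
real commands already have in scope):

    import org.apache.avro.Schema
    import org.apache.hudi.{AvroConversionUtils, HoodieSparkSqlWriter}
    import org.apache.hudi.AvroConversionUtils.convertStructTypeToAvroSchema

    // Resolve the schema from the catalog on the caller's side.
    val (structName, namespace) =
      AvroConversionUtils.getAvroRecordNameAndNamespace(catalogTable.tableName)
    val schema: Schema =
      convertStructTypeToAvroSchema(catalogTable.tableSchema, structName, namespace)

    // Hand it to the writer; the writer itself never touches the catalog.
    val (success, commitInstantTime, _, _, _, _) =
      HoodieSparkSqlWriter.write(sparkSession.sqlContext, mode, config, df,
        schemaFromCatalog = Option.apply(schema))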

diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 2640f55177ec..02546c243458 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -126,9 +126,11 @@ object HoodieSparkSqlWriter {
             optParams: Map[String, String],
             sourceDf: DataFrame,
            streamingWritesParamsOpt: Option[StreamingWriteParams] = Option.empty,
-            hoodieWriteClient: Option[SparkRDDWriteClient[_]] = Option.empty):
+            hoodieWriteClient: Option[SparkRDDWriteClient[_]] = Option.empty,
+            schemaFromCatalog: Option[Schema] = Option.empty):
  (Boolean, HOption[String], HOption[String], HOption[String], SparkRDDWriteClient[_], HoodieTableConfig) = {
-    new HoodieSparkSqlWriterInternal().write(sqlContext, mode, optParams, sourceDf, streamingWritesParamsOpt, hoodieWriteClient)
+    new HoodieSparkSqlWriterInternal().write(sqlContext, mode, optParams, sourceDf, streamingWritesParamsOpt, hoodieWriteClient,
+      schemaFromCatalog = schemaFromCatalog)
   }
 
   def bootstrap(sqlContext: SQLContext,
@@ -175,7 +177,8 @@ class HoodieSparkSqlWriterInternal {
             optParams: Map[String, String],
             sourceDf: DataFrame,
            streamingWritesParamsOpt: Option[StreamingWriteParams] = Option.empty,
-            hoodieWriteClient: Option[SparkRDDWriteClient[_]] = Option.empty):
+            hoodieWriteClient: Option[SparkRDDWriteClient[_]] = Option.empty,
+            schemaFromCatalog: Option[Schema] = Option.empty):
  (Boolean, HOption[String], HOption[String], HOption[String], SparkRDDWriteClient[_], HoodieTableConfig) = {
 
    val retryWrite: () => (Boolean, HOption[String], HOption[String], HOption[String], SparkRDDWriteClient[_], HoodieTableConfig) = () => {
@@ -186,7 +189,8 @@ class HoodieSparkSqlWriterInternal {
 
       while (counter <= maxRetry && !succeeded) {
         try {
-          toReturn = writeInternal(sqlContext, mode, optParams, sourceDf, streamingWritesParamsOpt, hoodieWriteClient)
+          toReturn = writeInternal(sqlContext, mode, optParams, sourceDf, streamingWritesParamsOpt, hoodieWriteClient,
+            schemaFromCatalog = schemaFromCatalog)
           if (counter > 0) {
             log.info(s"Write Succeeded after $counter attempts")
           }
@@ -212,7 +216,8 @@ class HoodieSparkSqlWriterInternal {
                             optParams: Map[String, String],
                             sourceDf: DataFrame,
                            streamingWritesParamsOpt: Option[StreamingWriteParams] = Option.empty,
-                            hoodieWriteClient: Option[SparkRDDWriteClient[_]] = Option.empty):
+                            hoodieWriteClient: Option[SparkRDDWriteClient[_]] = Option.empty,
+                            schemaFromCatalog: Option[Schema] = Option.empty):
  (Boolean, HOption[String], HOption[String], HOption[String], SparkRDDWriteClient[_], HoodieTableConfig) = {
 
    assert(optParams.get("path").exists(!StringUtils.isNullOrEmpty(_)), "'path' must be set")
@@ -353,7 +358,7 @@ class HoodieSparkSqlWriterInternal {
           classOf[Schema]))
 
      val shouldReconcileSchema = parameters(DataSourceWriteOptions.RECONCILE_SCHEMA.key()).toBoolean
-      val latestTableSchemaOpt = getLatestTableSchema(spark, tableIdentifier, tableMetaClient)
+      val latestTableSchemaOpt = getLatestTableSchema(tableMetaClient, schemaFromCatalog)
      val df = if (preppedWriteOperation || preppedSparkSqlWrites || preppedSparkSqlMergeInto || sourceDf.isStreaming) {
         sourceDf
       } else {
@@ -669,26 +674,10 @@ class HoodieSparkSqlWriterInternal {
     sparkContext.getConf.registerAvroSchemas(targetAvroSchemas: _*)
   }
 
-  private def getLatestTableSchema(spark: SparkSession,
-                                   tableId: TableIdentifier,
-                                   tableMetaClient: HoodieTableMetaClient): Option[Schema] = {
+  private def getLatestTableSchema(tableMetaClient: HoodieTableMetaClient, schemaFromCatalog: Option[Schema]): Option[Schema] = {
     val tableSchemaResolver = new TableSchemaResolver(tableMetaClient)
-    val latestTableSchemaFromCommitMetadata =
-      toScalaOption(tableSchemaResolver.getTableAvroSchemaFromLatestCommit(false))
-    latestTableSchemaFromCommitMetadata.orElse {
-      getCatalogTable(spark, tableId).map { catalogTable =>
-        val (structName, namespace) = getAvroRecordNameAndNamespace(tableId.table)
-        convertStructTypeToAvroSchema(catalogTable.schema, structName, namespace)
-      }
-    }
-  }
-
-  private def getCatalogTable(spark: SparkSession, tableId: TableIdentifier): Option[CatalogTable] = {
-    if (spark.sessionState.catalog.tableExists(tableId)) {
-      Some(spark.sessionState.catalog.getTableMetadata(tableId))
-    } else {
-      None
-    }
+    toScalaOption(tableSchemaResolver.getTableAvroSchemaFromLatestCommit(false))
+      .orElse(schemaFromCatalog)
   }
 
   def bootstrap(sqlContext: SQLContext,
diff --git a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
index da6285a47d0a..454f2a9efbca 100644
--- a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
@@ -17,7 +17,8 @@
 
 package org.apache.spark.sql.hudi.command
 
-import org.apache.hudi.{HoodieSparkSqlWriter, SparkAdapterSupport}
+import org.apache.hudi.{AvroConversionUtils, HoodieSparkSqlWriter, SparkAdapterSupport}
+import org.apache.hudi.AvroConversionUtils.convertStructTypeToAvroSchema
 import org.apache.hudi.exception.HoodieException
 
 import org.apache.spark.internal.Logging
@@ -111,7 +112,10 @@ object InsertIntoHoodieTableCommand extends Logging with ProvidesHoodieConfig wi
    val config = buildHoodieInsertConfig(catalogTable, sparkSession, isOverWritePartition, isOverWriteTable, partitionSpec, extraOptions, staticOverwritePartitionPathOpt)
 
    val df = sparkSession.internalCreateDataFrame(query.execute(), query.schema)
-    val (success, commitInstantTime, _, _, _, _) = HoodieSparkSqlWriter.write(sparkSession.sqlContext, mode, config, df)
+    val (structName, namespace) = AvroConversionUtils.getAvroRecordNameAndNamespace(catalogTable.tableName)
+    val schema = convertStructTypeToAvroSchema(catalogTable.tableSchema, structName, namespace)
+    val (success, commitInstantTime, _, _, _, _) = HoodieSparkSqlWriter.write(sparkSession.sqlContext, mode, config, df,
+      schemaFromCatalog = Option.apply(schema))
 
     if (!success) {
       throw new HoodieException("Insert Into to Hudi table failed")
diff --git a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
index ae8a5e054ed9..d286f97bcdd1 100644
--- a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
@@ -493,7 +493,10 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie
      PAYLOAD_ORIGINAL_AVRO_PAYLOAD -> hoodieCatalogTable.tableConfig.getPayloadClass
     )
 
-    val (success, commitInstantTime, _, _, _, _) = HoodieSparkSqlWriter.write(sparkSession.sqlContext, SaveMode.Append, writeParams, sourceDF)
+    val (structName, namespace) = AvroConversionUtils.getAvroRecordNameAndNamespace(hoodieCatalogTable.tableName)
+    val schema = convertStructTypeToAvroSchema(hoodieCatalogTable.tableSchema, structName, namespace)
+    val (success, commitInstantTime, _, _, _, _) = HoodieSparkSqlWriter.write(sparkSession.sqlContext, SaveMode.Append, writeParams, sourceDF,
+      schemaFromCatalog = Option.apply(schema))
     if (!success) {
       throw new HoodieException("Merge into Hoodie table command failed")
     }
diff --git a/hudi-spark-datasource/hudi-spark4-common/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark4-common/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
index 35e1dadbbe3a..b6585d097a95 100644
--- a/hudi-spark-datasource/hudi-spark4-common/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark4-common/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
@@ -17,7 +17,8 @@
 
 package org.apache.spark.sql.hudi.command
 
-import org.apache.hudi.{HoodieSparkSqlWriter, SparkAdapterSupport}
+import org.apache.hudi.{AvroConversionUtils, HoodieSparkSqlWriter, SparkAdapterSupport}
+import org.apache.hudi.AvroConversionUtils.convertStructTypeToAvroSchema
 import org.apache.hudi.exception.HoodieException
 
 import org.apache.spark.internal.Logging
@@ -111,7 +112,10 @@ object InsertIntoHoodieTableCommand extends Logging with ProvidesHoodieConfig wi
    val config = buildHoodieInsertConfig(catalogTable, sparkSession, isOverWritePartition, isOverWriteTable, partitionSpec, extraOptions, staticOverwritePartitionPathOpt)
 
    val df = sparkSession.internalCreateDataFrame(query.execute(), query.schema)
-    val (success, commitInstantTime, _, _, _, _) = HoodieSparkSqlWriter.write(sparkSession.sqlContext, mode, config, df)
+    val (structName, namespace) = AvroConversionUtils.getAvroRecordNameAndNamespace(catalogTable.tableName)
+    val schema = convertStructTypeToAvroSchema(catalogTable.tableSchema, structName, namespace)
+    val (success, commitInstantTime, _, _, _, _) = HoodieSparkSqlWriter.write(sparkSession.sqlContext, mode, config, df,
+      schemaFromCatalog = Option.apply(schema))
 
     if (!success) {
       throw new HoodieException("Insert Into to Hudi table failed")
diff --git a/hudi-spark-datasource/hudi-spark4-common/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark4-common/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
index 8a1928f93294..18950662b264 100644
--- a/hudi-spark-datasource/hudi-spark4-common/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark4-common/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
@@ -494,7 +494,10 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie
      PAYLOAD_ORIGINAL_AVRO_PAYLOAD -> hoodieCatalogTable.tableConfig.getPayloadClass
     )
 
-    val (success, commitInstantTime, _, _, _, _) = HoodieSparkSqlWriter.write(sparkSession.sqlContext, SaveMode.Append, writeParams, sourceDF)
+    val (structName, namespace) = AvroConversionUtils.getAvroRecordNameAndNamespace(hoodieCatalogTable.tableName)
+    val schema = convertStructTypeToAvroSchema(hoodieCatalogTable.tableSchema, structName, namespace)
+    val (success, commitInstantTime, _, _, _, _) = HoodieSparkSqlWriter.write(sparkSession.sqlContext, SaveMode.Append, writeParams, sourceDF,
+      schemaFromCatalog = Option.apply(schema))
     if (!success) {
       throw new HoodieException("Merge into Hoodie table command failed")
     }

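Note the resolution order in the rewritten getLatestTableSchema: the Avro
schema from the latest commit still takes precedence, and the caller-supplied
catalog schema is consulted only when no commit schema exists (e.g. a table
with no completed commits yet). A sketch of the fallback, matching the first
hunk above:

    // Commit schema wins; schemaFromCatalog is only the fallback.
    toScalaOption(tableSchemaResolver.getTableAvroSchemaFromLatestCommit(false))
      .orElse(schemaFromCatalog)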