This is an automated email from the ASF dual-hosted git repository. yihua pushed a commit to branch release-1.1.0 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 3e8b56694bdb77190d139f94169f320a63ef04f6 Author: Lin Liu <[email protected]> AuthorDate: Fri Oct 17 15:23:09 2025 -0700 fix: Remove catalog access from SparkSQLWriter (#14083) --- .../org/apache/hudi/HoodieSparkSqlWriter.scala | 39 ++++++++-------------- .../command/InsertIntoHoodieTableCommand.scala | 8 +++-- .../hudi/command/MergeIntoHoodieTableCommand.scala | 5 ++- .../command/InsertIntoHoodieTableCommand.scala | 8 +++-- .../hudi/command/MergeIntoHoodieTableCommand.scala | 5 ++- 5 files changed, 34 insertions(+), 31 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala index 2640f55177ec..02546c243458 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala @@ -126,9 +126,11 @@ object HoodieSparkSqlWriter { optParams: Map[String, String], sourceDf: DataFrame, streamingWritesParamsOpt: Option[StreamingWriteParams] = Option.empty, - hoodieWriteClient: Option[SparkRDDWriteClient[_]] = Option.empty): + hoodieWriteClient: Option[SparkRDDWriteClient[_]] = Option.empty, + schemaFromCatalog: Option[Schema] = Option.empty): (Boolean, HOption[String], HOption[String], HOption[String], SparkRDDWriteClient[_], HoodieTableConfig) = { - new HoodieSparkSqlWriterInternal().write(sqlContext, mode, optParams, sourceDf, streamingWritesParamsOpt, hoodieWriteClient) + new HoodieSparkSqlWriterInternal().write(sqlContext, mode, optParams, sourceDf, streamingWritesParamsOpt, hoodieWriteClient, + schemaFromCatalog = schemaFromCatalog) } def bootstrap(sqlContext: SQLContext, @@ -175,7 +177,8 @@ class HoodieSparkSqlWriterInternal { optParams: Map[String, String], sourceDf: DataFrame, streamingWritesParamsOpt: Option[StreamingWriteParams] = Option.empty, - hoodieWriteClient: Option[SparkRDDWriteClient[_]] = Option.empty): + hoodieWriteClient: Option[SparkRDDWriteClient[_]] = Option.empty, + schemaFromCatalog: Option[Schema] = Option.empty): (Boolean, HOption[String], HOption[String], HOption[String], SparkRDDWriteClient[_], HoodieTableConfig) = { val retryWrite: () => (Boolean, HOption[String], HOption[String], HOption[String], SparkRDDWriteClient[_], HoodieTableConfig) = () => { @@ -186,7 +189,8 @@ class HoodieSparkSqlWriterInternal { while (counter <= maxRetry && !succeeded) { try { - toReturn = writeInternal(sqlContext, mode, optParams, sourceDf, streamingWritesParamsOpt, hoodieWriteClient) + toReturn = writeInternal(sqlContext, mode, optParams, sourceDf, streamingWritesParamsOpt, hoodieWriteClient, + schemaFromCatalog = schemaFromCatalog) if (counter > 0) { log.info(s"Write Succeeded after $counter attempts") } @@ -212,7 +216,8 @@ class HoodieSparkSqlWriterInternal { optParams: Map[String, String], sourceDf: DataFrame, streamingWritesParamsOpt: Option[StreamingWriteParams] = Option.empty, - hoodieWriteClient: Option[SparkRDDWriteClient[_]] = Option.empty): + hoodieWriteClient: Option[SparkRDDWriteClient[_]] = Option.empty, + schemaFromCatalog: Option[Schema] = Option.empty): (Boolean, HOption[String], HOption[String], HOption[String], SparkRDDWriteClient[_], HoodieTableConfig) = { assert(optParams.get("path").exists(!StringUtils.isNullOrEmpty(_)), "'path' must be set") @@ -353,7 +358,7 @@ class HoodieSparkSqlWriterInternal { classOf[Schema])) val shouldReconcileSchema = parameters(DataSourceWriteOptions.RECONCILE_SCHEMA.key()).toBoolean - val latestTableSchemaOpt = getLatestTableSchema(spark, tableIdentifier, tableMetaClient) + val latestTableSchemaOpt = getLatestTableSchema(tableMetaClient, schemaFromCatalog) val df = if (preppedWriteOperation || preppedSparkSqlWrites || preppedSparkSqlMergeInto || sourceDf.isStreaming) { sourceDf } else { @@ -669,26 +674,10 @@ class HoodieSparkSqlWriterInternal { sparkContext.getConf.registerAvroSchemas(targetAvroSchemas: _*) } - private def getLatestTableSchema(spark: SparkSession, - tableId: TableIdentifier, - tableMetaClient: HoodieTableMetaClient): Option[Schema] = { + private def getLatestTableSchema(tableMetaClient: HoodieTableMetaClient, schemaFromCatalog: Option[Schema]): Option[Schema] = { val tableSchemaResolver = new TableSchemaResolver(tableMetaClient) - val latestTableSchemaFromCommitMetadata = - toScalaOption(tableSchemaResolver.getTableAvroSchemaFromLatestCommit(false)) - latestTableSchemaFromCommitMetadata.orElse { - getCatalogTable(spark, tableId).map { catalogTable => - val (structName, namespace) = getAvroRecordNameAndNamespace(tableId.table) - convertStructTypeToAvroSchema(catalogTable.schema, structName, namespace) - } - } - } - - private def getCatalogTable(spark: SparkSession, tableId: TableIdentifier): Option[CatalogTable] = { - if (spark.sessionState.catalog.tableExists(tableId)) { - Some(spark.sessionState.catalog.getTableMetadata(tableId)) - } else { - None - } + toScalaOption(tableSchemaResolver.getTableAvroSchemaFromLatestCommit(false)) + .orElse(schemaFromCatalog) } def bootstrap(sqlContext: SQLContext, diff --git a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala index da6285a47d0a..454f2a9efbca 100644 --- a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala +++ b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala @@ -17,7 +17,8 @@ package org.apache.spark.sql.hudi.command -import org.apache.hudi.{HoodieSparkSqlWriter, SparkAdapterSupport} +import org.apache.hudi.{AvroConversionUtils, HoodieSparkSqlWriter, SparkAdapterSupport} +import org.apache.hudi.AvroConversionUtils.convertStructTypeToAvroSchema import org.apache.hudi.exception.HoodieException import org.apache.spark.internal.Logging @@ -111,7 +112,10 @@ object InsertIntoHoodieTableCommand extends Logging with ProvidesHoodieConfig wi val config = buildHoodieInsertConfig(catalogTable, sparkSession, isOverWritePartition, isOverWriteTable, partitionSpec, extraOptions, staticOverwritePartitionPathOpt) val df = sparkSession.internalCreateDataFrame(query.execute(), query.schema) - val (success, commitInstantTime, _, _, _, _) = HoodieSparkSqlWriter.write(sparkSession.sqlContext, mode, config, df) + val (structName, namespace) = AvroConversionUtils.getAvroRecordNameAndNamespace(catalogTable.tableName) + val schema = convertStructTypeToAvroSchema(catalogTable.tableSchema, structName, namespace) + val (success, commitInstantTime, _, _, _, _) = HoodieSparkSqlWriter.write(sparkSession.sqlContext, mode, config, df, + schemaFromCatalog = Option.apply(schema)) if (!success) { throw new HoodieException("Insert Into to Hudi table failed") diff --git a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala index ae8a5e054ed9..d286f97bcdd1 100644 --- a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala +++ b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala @@ -493,7 +493,10 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie PAYLOAD_ORIGINAL_AVRO_PAYLOAD -> hoodieCatalogTable.tableConfig.getPayloadClass ) - val (success, commitInstantTime, _, _, _, _) = HoodieSparkSqlWriter.write(sparkSession.sqlContext, SaveMode.Append, writeParams, sourceDF) + val (structName, namespace) = AvroConversionUtils.getAvroRecordNameAndNamespace(hoodieCatalogTable.tableName) + val schema = convertStructTypeToAvroSchema(hoodieCatalogTable.tableSchema, structName, namespace) + val (success, commitInstantTime, _, _, _, _) = HoodieSparkSqlWriter.write(sparkSession.sqlContext, SaveMode.Append, writeParams, sourceDF, + schemaFromCatalog = Option.apply(schema)) if (!success) { throw new HoodieException("Merge into Hoodie table command failed") } diff --git a/hudi-spark-datasource/hudi-spark4-common/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark4-common/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala index 35e1dadbbe3a..b6585d097a95 100644 --- a/hudi-spark-datasource/hudi-spark4-common/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala +++ b/hudi-spark-datasource/hudi-spark4-common/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala @@ -17,7 +17,8 @@ package org.apache.spark.sql.hudi.command -import org.apache.hudi.{HoodieSparkSqlWriter, SparkAdapterSupport} +import org.apache.hudi.{AvroConversionUtils, HoodieSparkSqlWriter, SparkAdapterSupport} +import org.apache.hudi.AvroConversionUtils.convertStructTypeToAvroSchema import org.apache.hudi.exception.HoodieException import org.apache.spark.internal.Logging @@ -111,7 +112,10 @@ object InsertIntoHoodieTableCommand extends Logging with ProvidesHoodieConfig wi val config = buildHoodieInsertConfig(catalogTable, sparkSession, isOverWritePartition, isOverWriteTable, partitionSpec, extraOptions, staticOverwritePartitionPathOpt) val df = sparkSession.internalCreateDataFrame(query.execute(), query.schema) - val (success, commitInstantTime, _, _, _, _) = HoodieSparkSqlWriter.write(sparkSession.sqlContext, mode, config, df) + val (structName, namespace) = AvroConversionUtils.getAvroRecordNameAndNamespace(catalogTable.tableName) + val schema = convertStructTypeToAvroSchema(catalogTable.tableSchema, structName, namespace) + val (success, commitInstantTime, _, _, _, _) = HoodieSparkSqlWriter.write(sparkSession.sqlContext, mode, config, df, + schemaFromCatalog = Option.apply(schema)) if (!success) { throw new HoodieException("Insert Into to Hudi table failed") diff --git a/hudi-spark-datasource/hudi-spark4-common/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark4-common/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala index 8a1928f93294..18950662b264 100644 --- a/hudi-spark-datasource/hudi-spark4-common/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala +++ b/hudi-spark-datasource/hudi-spark4-common/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala @@ -494,7 +494,10 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie PAYLOAD_ORIGINAL_AVRO_PAYLOAD -> hoodieCatalogTable.tableConfig.getPayloadClass ) - val (success, commitInstantTime, _, _, _, _) = HoodieSparkSqlWriter.write(sparkSession.sqlContext, SaveMode.Append, writeParams, sourceDF) + val (structName, namespace) = AvroConversionUtils.getAvroRecordNameAndNamespace(hoodieCatalogTable.tableName) + val schema = convertStructTypeToAvroSchema(hoodieCatalogTable.tableSchema, structName, namespace) + val (success, commitInstantTime, _, _, _, _) = HoodieSparkSqlWriter.write(sparkSession.sqlContext, SaveMode.Append, writeParams, sourceDF, + schemaFromCatalog = Option.apply(schema)) if (!success) { throw new HoodieException("Merge into Hoodie table command failed") }
