This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 8491d6ac76f6 fix: Remove catalog access from SparkSQLWriter (#14083)
8491d6ac76f6 is described below
commit 8491d6ac76f6aba3d9c663b22705309c9a988ef5
Author: Lin Liu <[email protected]>
AuthorDate: Fri Oct 17 15:23:09 2025 -0700
fix: Remove catalog access from SparkSQLWriter (#14083)
---
.../org/apache/hudi/HoodieSparkSqlWriter.scala | 39 ++++++++--------------
.../command/InsertIntoHoodieTableCommand.scala | 8 +++--
.../hudi/command/MergeIntoHoodieTableCommand.scala | 5 ++-
.../command/InsertIntoHoodieTableCommand.scala | 8 +++--
.../hudi/command/MergeIntoHoodieTableCommand.scala | 5 ++-
5 files changed, 34 insertions(+), 31 deletions(-)
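Summary of the change: HoodieSparkSqlWriter no longer reaches into
spark.sessionState.catalog to resolve a fallback schema. The SQL commands
(INSERT INTO / MERGE INTO, in both the spark3 and spark4 modules) now convert
the catalog table's schema to Avro themselves and pass it through the new
optional schemaFromCatalog parameter; the writer still prefers the schema
from the latest commit metadata and only falls back to the supplied catalog
schema. A minimal self-contained sketch of the resulting resolution logic
(imports spelled out here for clarity; the method name and body mirror the
diff below):

    import org.apache.avro.Schema
    import org.apache.hudi.HoodieConversionUtils.toScalaOption
    import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}

    private def getLatestTableSchema(tableMetaClient: HoodieTableMetaClient,
                                     schemaFromCatalog: Option[Schema]): Option[Schema] = {
      val tableSchemaResolver = new TableSchemaResolver(tableMetaClient)
      // Prefer the schema recorded in the latest commit; fall back to the
      // schema the SQL layer derived from the catalog table, if any.
      toScalaOption(tableSchemaResolver.getTableAvroSchemaFromLatestCommit(false))
        .orElse(schemaFromCatalog)
    }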
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 2640f55177ec..02546c243458 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -126,9 +126,11 @@ object HoodieSparkSqlWriter {
optParams: Map[String, String],
sourceDf: DataFrame,
streamingWritesParamsOpt: Option[StreamingWriteParams] = Option.empty,
- hoodieWriteClient: Option[SparkRDDWriteClient[_]] = Option.empty):
+ hoodieWriteClient: Option[SparkRDDWriteClient[_]] = Option.empty,
+ schemaFromCatalog: Option[Schema] = Option.empty):
(Boolean, HOption[String], HOption[String], HOption[String], SparkRDDWriteClient[_], HoodieTableConfig) = {
- new HoodieSparkSqlWriterInternal().write(sqlContext, mode, optParams, sourceDf, streamingWritesParamsOpt, hoodieWriteClient)
+ new HoodieSparkSqlWriterInternal().write(sqlContext, mode, optParams, sourceDf, streamingWritesParamsOpt, hoodieWriteClient,
+ schemaFromCatalog = schemaFromCatalog)
}
def bootstrap(sqlContext: SQLContext,
@@ -175,7 +177,8 @@ class HoodieSparkSqlWriterInternal {
optParams: Map[String, String],
sourceDf: DataFrame,
streamingWritesParamsOpt: Option[StreamingWriteParams] = Option.empty,
- hoodieWriteClient: Option[SparkRDDWriteClient[_]] = Option.empty):
+ hoodieWriteClient: Option[SparkRDDWriteClient[_]] = Option.empty,
+ schemaFromCatalog: Option[Schema] = Option.empty):
(Boolean, HOption[String], HOption[String], HOption[String], SparkRDDWriteClient[_], HoodieTableConfig) = {
val retryWrite: () => (Boolean, HOption[String], HOption[String], HOption[String], SparkRDDWriteClient[_], HoodieTableConfig) = () => {
@@ -186,7 +189,8 @@ class HoodieSparkSqlWriterInternal {
while (counter <= maxRetry && !succeeded) {
try {
- toReturn = writeInternal(sqlContext, mode, optParams, sourceDf, streamingWritesParamsOpt, hoodieWriteClient)
+ toReturn = writeInternal(sqlContext, mode, optParams, sourceDf, streamingWritesParamsOpt, hoodieWriteClient,
+ schemaFromCatalog = schemaFromCatalog)
if (counter > 0) {
log.info(s"Write Succeeded after $counter attempts")
}
@@ -212,7 +216,8 @@ class HoodieSparkSqlWriterInternal {
optParams: Map[String, String],
sourceDf: DataFrame,
streamingWritesParamsOpt: Option[StreamingWriteParams] = Option.empty,
- hoodieWriteClient: Option[SparkRDDWriteClient[_]] = Option.empty):
+ hoodieWriteClient: Option[SparkRDDWriteClient[_]] = Option.empty,
+ schemaFromCatalog: Option[Schema] = Option.empty):
(Boolean, HOption[String], HOption[String], HOption[String], SparkRDDWriteClient[_], HoodieTableConfig) = {
assert(optParams.get("path").exists(!StringUtils.isNullOrEmpty(_)), "'path' must be set")
@@ -353,7 +358,7 @@ class HoodieSparkSqlWriterInternal {
classOf[Schema]))
val shouldReconcileSchema = parameters(DataSourceWriteOptions.RECONCILE_SCHEMA.key()).toBoolean
- val latestTableSchemaOpt = getLatestTableSchema(spark, tableIdentifier, tableMetaClient)
+ val latestTableSchemaOpt = getLatestTableSchema(tableMetaClient, schemaFromCatalog)
val df = if (preppedWriteOperation || preppedSparkSqlWrites || preppedSparkSqlMergeInto || sourceDf.isStreaming) {
sourceDf
} else {
@@ -669,26 +674,10 @@ class HoodieSparkSqlWriterInternal {
sparkContext.getConf.registerAvroSchemas(targetAvroSchemas: _*)
}
- private def getLatestTableSchema(spark: SparkSession,
- tableId: TableIdentifier,
- tableMetaClient: HoodieTableMetaClient): Option[Schema] = {
+ private def getLatestTableSchema(tableMetaClient: HoodieTableMetaClient, schemaFromCatalog: Option[Schema]): Option[Schema] = {
val tableSchemaResolver = new TableSchemaResolver(tableMetaClient)
- val latestTableSchemaFromCommitMetadata =
- toScalaOption(tableSchemaResolver.getTableAvroSchemaFromLatestCommit(false))
- latestTableSchemaFromCommitMetadata.orElse {
- getCatalogTable(spark, tableId).map { catalogTable =>
- val (structName, namespace) = getAvroRecordNameAndNamespace(tableId.table)
- convertStructTypeToAvroSchema(catalogTable.schema, structName, namespace)
- }
- }
- }
-
- private def getCatalogTable(spark: SparkSession, tableId: TableIdentifier): Option[CatalogTable] = {
- if (spark.sessionState.catalog.tableExists(tableId)) {
- Some(spark.sessionState.catalog.getTableMetadata(tableId))
- } else {
- None
- }
+ toScalaOption(tableSchemaResolver.getTableAvroSchemaFromLatestCommit(false))
+ .orElse(schemaFromCatalog)
}
def bootstrap(sqlContext: SQLContext,
diff --git a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
index da6285a47d0a..454f2a9efbca 100644
--- a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
@@ -17,7 +17,8 @@
package org.apache.spark.sql.hudi.command
-import org.apache.hudi.{HoodieSparkSqlWriter, SparkAdapterSupport}
+import org.apache.hudi.{AvroConversionUtils, HoodieSparkSqlWriter, SparkAdapterSupport}
+import org.apache.hudi.AvroConversionUtils.convertStructTypeToAvroSchema
import org.apache.hudi.exception.HoodieException
import org.apache.spark.internal.Logging
@@ -111,7 +112,10 @@ object InsertIntoHoodieTableCommand extends Logging with ProvidesHoodieConfig wi
val config = buildHoodieInsertConfig(catalogTable, sparkSession, isOverWritePartition, isOverWriteTable, partitionSpec, extraOptions, staticOverwritePartitionPathOpt)
val df = sparkSession.internalCreateDataFrame(query.execute(), query.schema)
- val (success, commitInstantTime, _, _, _, _) = HoodieSparkSqlWriter.write(sparkSession.sqlContext, mode, config, df)
+ val (structName, namespace) = AvroConversionUtils.getAvroRecordNameAndNamespace(catalogTable.tableName)
+ val schema = convertStructTypeToAvroSchema(catalogTable.tableSchema, structName, namespace)
+ val (success, commitInstantTime, _, _, _, _) = HoodieSparkSqlWriter.write(sparkSession.sqlContext, mode, config, df,
+ schemaFromCatalog = Option.apply(schema))
if (!success) {
throw new HoodieException("Insert Into to Hudi table failed")
diff --git a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
index ae8a5e054ed9..d286f97bcdd1 100644
--- a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
@@ -493,7 +493,10 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie
PAYLOAD_ORIGINAL_AVRO_PAYLOAD -> hoodieCatalogTable.tableConfig.getPayloadClass
)
- val (success, commitInstantTime, _, _, _, _) = HoodieSparkSqlWriter.write(sparkSession.sqlContext, SaveMode.Append, writeParams, sourceDF)
+ val (structName, namespace) = AvroConversionUtils.getAvroRecordNameAndNamespace(hoodieCatalogTable.tableName)
+ val schema = convertStructTypeToAvroSchema(hoodieCatalogTable.tableSchema, structName, namespace)
+ val (success, commitInstantTime, _, _, _, _) = HoodieSparkSqlWriter.write(sparkSession.sqlContext, SaveMode.Append, writeParams, sourceDF,
+ schemaFromCatalog = Option.apply(schema))
if (!success) {
throw new HoodieException("Merge into Hoodie table command failed")
}
diff --git a/hudi-spark-datasource/hudi-spark4-common/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark4-common/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
index 35e1dadbbe3a..b6585d097a95 100644
--- a/hudi-spark-datasource/hudi-spark4-common/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark4-common/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
@@ -17,7 +17,8 @@
package org.apache.spark.sql.hudi.command
-import org.apache.hudi.{HoodieSparkSqlWriter, SparkAdapterSupport}
+import org.apache.hudi.{AvroConversionUtils, HoodieSparkSqlWriter, SparkAdapterSupport}
+import org.apache.hudi.AvroConversionUtils.convertStructTypeToAvroSchema
import org.apache.hudi.exception.HoodieException
import org.apache.spark.internal.Logging
@@ -111,7 +112,10 @@ object InsertIntoHoodieTableCommand extends Logging with ProvidesHoodieConfig wi
val config = buildHoodieInsertConfig(catalogTable, sparkSession, isOverWritePartition, isOverWriteTable, partitionSpec, extraOptions, staticOverwritePartitionPathOpt)
val df = sparkSession.internalCreateDataFrame(query.execute(), query.schema)
- val (success, commitInstantTime, _, _, _, _) = HoodieSparkSqlWriter.write(sparkSession.sqlContext, mode, config, df)
+ val (structName, namespace) = AvroConversionUtils.getAvroRecordNameAndNamespace(catalogTable.tableName)
+ val schema = convertStructTypeToAvroSchema(catalogTable.tableSchema, structName, namespace)
+ val (success, commitInstantTime, _, _, _, _) = HoodieSparkSqlWriter.write(sparkSession.sqlContext, mode, config, df,
+ schemaFromCatalog = Option.apply(schema))
if (!success) {
throw new HoodieException("Insert Into to Hudi table failed")
diff --git a/hudi-spark-datasource/hudi-spark4-common/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark4-common/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
index 8a1928f93294..18950662b264 100644
--- a/hudi-spark-datasource/hudi-spark4-common/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark4-common/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
@@ -494,7 +494,10 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie
PAYLOAD_ORIGINAL_AVRO_PAYLOAD -> hoodieCatalogTable.tableConfig.getPayloadClass
)
- val (success, commitInstantTime, _, _, _, _) = HoodieSparkSqlWriter.write(sparkSession.sqlContext, SaveMode.Append, writeParams, sourceDF)
+ val (structName, namespace) = AvroConversionUtils.getAvroRecordNameAndNamespace(hoodieCatalogTable.tableName)
+ val schema = convertStructTypeToAvroSchema(hoodieCatalogTable.tableSchema, structName, namespace)
+ val (success, commitInstantTime, _, _, _, _) = HoodieSparkSqlWriter.write(sparkSession.sqlContext, SaveMode.Append, writeParams, sourceDF,
+ schemaFromCatalog = Option.apply(schema))
if (!success) {
throw new HoodieException("Merge into Hoodie table command failed")
}
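For reference, the schema-derivation step repeated in each of the four
command files above could be factored into a small helper. This is a
hypothetical sketch (the helper name catalogSchemaFor is not part of the
commit), built only from calls shown in the diff:

    import org.apache.avro.Schema
    import org.apache.hudi.AvroConversionUtils
    import org.apache.spark.sql.types.StructType

    // Derive the Avro schema the writer should treat as the catalog schema,
    // from the table name and its Spark SQL StructType.
    def catalogSchemaFor(tableName: String, tableSchema: StructType): Schema = {
      val (structName, namespace) =
        AvroConversionUtils.getAvroRecordNameAndNamespace(tableName)
      AvroConversionUtils.convertStructTypeToAvroSchema(tableSchema, structName, namespace)
    }

A command would then call HoodieSparkSqlWriter.write(..., schemaFromCatalog =
Some(catalogSchemaFor(name, schema))).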