This is an automated email from the ASF dual-hosted git repository. sivabalan pushed a commit to branch release-0.12.2-shadow in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 245574326ccb5a8d0f21667ae1776b200254e41a Author: Sivabalan Narayanan <[email protected]> AuthorDate: Wed Dec 7 09:17:27 2022 -0800 [HUDI-5163] Fix failure handling with spark datasource write (#7140) --- .../src/main/scala/org/apache/hudi/DefaultSource.scala | 8 ++++++-- .../sql/hudi/command/AlterHoodieTableDropPartitionCommand.scala | 6 +++++- .../spark/sql/hudi/command/TruncateHoodieTableCommand.scala | 6 +++++- .../spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala | 4 ++++ .../spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala | 7 ++++++- 5 files changed, 26 insertions(+), 5 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala index 5641a622cbc..160e71a6ec4 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala @@ -140,11 +140,15 @@ class DefaultSource extends RelationProvider if (optParams.get(OPERATION.key).contains(BOOTSTRAP_OPERATION_OPT_VAL)) { HoodieSparkSqlWriter.bootstrap(sqlContext, mode, optParams, dfWithoutMetaCols) + HoodieSparkSqlWriter.cleanup() } else { - HoodieSparkSqlWriter.write(sqlContext, mode, optParams, dfWithoutMetaCols) + val (success, _, _, _, _, _) = HoodieSparkSqlWriter.write(sqlContext, mode, optParams, dfWithoutMetaCols) + HoodieSparkSqlWriter.cleanup() + if (!success) { + throw new HoodieException("Write to Hudi failed") + } } - HoodieSparkSqlWriter.cleanup() new HoodieEmptyRelation(sqlContext, dfWithoutMetaCols.schema) } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableDropPartitionCommand.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableDropPartitionCommand.scala index 628f383b690..c6aa2e7aeda 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableDropPartitionCommand.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableDropPartitionCommand.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.hudi.command import org.apache.hudi.HoodieSparkSqlWriter +import org.apache.hudi.exception.HoodieException import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable @@ -59,11 +60,14 @@ case class AlterHoodieTableDropPartitionCommand( // delete partition files by enabling cleaner and setting retention policies. val partitionsToDrop = getPartitionPathToDrop(hoodieCatalogTable, normalizedSpecs) val parameters = buildHoodieDropPartitionsConfig(sparkSession, hoodieCatalogTable, partitionsToDrop) - HoodieSparkSqlWriter.write( + val (success, _, _, _, _, _) = HoodieSparkSqlWriter.write( sparkSession.sqlContext, SaveMode.Append, parameters, sparkSession.emptyDataFrame) + if (!success) { + throw new HoodieException("Alter table command failed") + } sparkSession.catalog.refreshTable(tableIdentifier.unquotedString) logInfo(s"Finish execute alter table drop partition command for $fullTableName") diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/TruncateHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/TruncateHoodieTableCommand.scala index f5349ee5fee..05f96efdae5 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/TruncateHoodieTableCommand.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/TruncateHoodieTableCommand.scala @@ -22,6 +22,7 @@ import org.apache.hudi.HoodieSparkSqlWriter import org.apache.hudi.client.common.HoodieSparkEngineContext import org.apache.hudi.common.fs.FSUtils import org.apache.hudi.common.table.HoodieTableMetaClient +import org.apache.hudi.exception.HoodieException import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.catalog.{CatalogTableType, HoodieCatalogTable} @@ -85,11 +86,14 @@ case class TruncateHoodieTableCommand( // drop partitions to lazy clean val partitionsToDrop = getPartitionPathToDrop(hoodieCatalogTable, normalizedSpecs) val parameters = buildHoodieDropPartitionsConfig(sparkSession, hoodieCatalogTable, partitionsToDrop) - HoodieSparkSqlWriter.write( + val (success, _, _, _, _, _) = HoodieSparkSqlWriter.write( sparkSession.sqlContext, SaveMode.Append, parameters, sparkSession.emptyDataFrame) + if (!success) { + throw new HoodieException("Truncate Hoodie Table command failed") + } } // After deleting the data, refresh the table to make sure we don't keep around a stale diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala index 8bd81df3d27..125e8028020 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala @@ -100,6 +100,10 @@ object InsertIntoHoodieTableCommand extends Logging with ProvidesHoodieConfig wi val (success, _, _, _, _, _) = HoodieSparkSqlWriter.write(sparkSession.sqlContext, mode, config, Dataset.ofRows(sparkSession, alignedQuery)) + if (!success) { + throw new HoodieException("Insert Into to Hudi table failed") + } + if (success && refreshTable) { sparkSession.catalog.refreshTable(table.identifier.unquotedString) } diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala index 9919062cac7..73b20e6f993 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala @@ -21,7 +21,9 @@ import org.apache.avro.Schema import org.apache.hudi.DataSourceWriteOptions._ import org.apache.hudi.common.util.StringUtils import org.apache.hudi.config.HoodieWriteConfig +import org.apache.hudi.config.HoodieWriteConfig.{AVRO_SCHEMA_VALIDATE_ENABLE, TBL_NAME} import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME +import org.apache.hudi.exception.HoodieException import org.apache.hudi.hive.HiveSyncConfigHolder import org.apache.hudi.sync.common.HoodieSyncConfig import org.apache.hudi.{AvroConversionUtils, DataSourceWriteOptions, HoodieSparkSqlWriter, SparkAdapterSupport} @@ -348,7 +350,10 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie // Remove the meta fields from the sourceDF as we do not need these when writing. val sourceDFWithoutMetaFields = removeMetaFields(sourceDF) - HoodieSparkSqlWriter.write(sparkSession.sqlContext, SaveMode.Append, writeParams, sourceDFWithoutMetaFields) + val (success, _, _, _, _, _) = HoodieSparkSqlWriter.write(sparkSession.sqlContext, SaveMode.Append, writeParams, sourceDFWithoutMetaFields) + if (!success) { + throw new HoodieException("Merge into Hoodie table command failed") + } } private def checkUpdateAssignments(updateActions: Seq[UpdateAction]): Unit = {
