This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch release-0.12.2-shadow
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 245574326ccb5a8d0f21667ae1776b200254e41a
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Wed Dec 7 09:17:27 2022 -0800

    [HUDI-5163] Fix failure handling with spark datasource write (#7140)
---
 .../src/main/scala/org/apache/hudi/DefaultSource.scala            | 8 ++++++--
 .../sql/hudi/command/AlterHoodieTableDropPartitionCommand.scala   | 6 +++++-
 .../spark/sql/hudi/command/TruncateHoodieTableCommand.scala       | 6 +++++-
 .../spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala     | 4 ++++
 .../spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala      | 7 ++++++-
 5 files changed, 26 insertions(+), 5 deletions(-)
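
For readers skimming the patch, every call site below adopts the same
failure-handling pattern. Here is a minimal, self-contained sketch of it
(sqlContext, mode, optParams and df stand in for the locals at each call
site; the six-element tuple mirrors HoodieSparkSqlWriter.write's return
value as seen in the hunks, and the cleanup() call applies only to the
DefaultSource path):

    import org.apache.hudi.HoodieSparkSqlWriter
    import org.apache.hudi.exception.HoodieException

    // Destructure the success flag from write()'s result tuple, run
    // cleanup() regardless of the outcome, and surface a failed write
    // as an exception instead of returning silently.
    val (success, _, _, _, _, _) =
      HoodieSparkSqlWriter.write(sqlContext, mode, optParams, df)
    HoodieSparkSqlWriter.cleanup()
    if (!success) {
      throw new HoodieException("Write to Hudi failed")
    }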

diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
index 5641a622cbc..160e71a6ec4 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
@@ -140,11 +140,15 @@ class DefaultSource extends RelationProvider
 
     if (optParams.get(OPERATION.key).contains(BOOTSTRAP_OPERATION_OPT_VAL)) {
       HoodieSparkSqlWriter.bootstrap(sqlContext, mode, optParams, dfWithoutMetaCols)
+      HoodieSparkSqlWriter.cleanup()
     } else {
-      HoodieSparkSqlWriter.write(sqlContext, mode, optParams, dfWithoutMetaCols)
+      val (success, _, _, _, _, _) = HoodieSparkSqlWriter.write(sqlContext, mode, optParams, dfWithoutMetaCols)
+      HoodieSparkSqlWriter.cleanup()
+      if (!success) {
+        throw new HoodieException("Write to Hudi failed")
+      }
     }
 
-    HoodieSparkSqlWriter.cleanup()
     new HoodieEmptyRelation(sqlContext, dfWithoutMetaCols.schema)
   }
 
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableDropPartitionCommand.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableDropPartitionCommand.scala
index 628f383b690..c6aa2e7aeda 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableDropPartitionCommand.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableDropPartitionCommand.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.hudi.command
 
 import org.apache.hudi.HoodieSparkSqlWriter
+import org.apache.hudi.exception.HoodieException
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
@@ -59,11 +60,14 @@ case class AlterHoodieTableDropPartitionCommand(
     // delete partition files by enabling cleaner and setting retention policies.
     val partitionsToDrop = getPartitionPathToDrop(hoodieCatalogTable, normalizedSpecs)
     val parameters = buildHoodieDropPartitionsConfig(sparkSession, hoodieCatalogTable, partitionsToDrop)
-    HoodieSparkSqlWriter.write(
+    val (success, _, _, _, _, _) = HoodieSparkSqlWriter.write(
       sparkSession.sqlContext,
       SaveMode.Append,
       parameters,
       sparkSession.emptyDataFrame)
+    if (!success) {
+      throw new HoodieException("Alter table command failed")
+    }
 
     sparkSession.catalog.refreshTable(tableIdentifier.unquotedString)
     logInfo(s"Finish execute alter table drop partition command for $fullTableName")
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/TruncateHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/TruncateHoodieTableCommand.scala
index f5349ee5fee..05f96efdae5 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/TruncateHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/TruncateHoodieTableCommand.scala
@@ -22,6 +22,7 @@ import org.apache.hudi.HoodieSparkSqlWriter
 import org.apache.hudi.client.common.HoodieSparkEngineContext
 import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.exception.HoodieException
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.catalog.{CatalogTableType, HoodieCatalogTable}
@@ -85,11 +86,14 @@ case class TruncateHoodieTableCommand(
       // drop partitions to lazy clean
       val partitionsToDrop = getPartitionPathToDrop(hoodieCatalogTable, normalizedSpecs)
       val parameters = buildHoodieDropPartitionsConfig(sparkSession, hoodieCatalogTable, partitionsToDrop)
-      HoodieSparkSqlWriter.write(
+      val (success, _, _, _, _, _) = HoodieSparkSqlWriter.write(
         sparkSession.sqlContext,
         SaveMode.Append,
         parameters,
         sparkSession.emptyDataFrame)
+      if (!success) {
+        throw new HoodieException("Truncate Hoodie Table command failed")
+      }
     }
 
     // After deleting the data, refresh the table to make sure we don't keep around a stale
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
index 8bd81df3d27..125e8028020 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
@@ -100,6 +100,10 @@ object InsertIntoHoodieTableCommand extends Logging with ProvidesHoodieConfig wi
 
     val (success, _, _, _, _, _) = HoodieSparkSqlWriter.write(sparkSession.sqlContext, mode, config, Dataset.ofRows(sparkSession, alignedQuery))
 
+    if (!success) {
+      throw new HoodieException("Insert Into to Hudi table failed")
+    }
+
     if (success && refreshTable) {
       sparkSession.catalog.refreshTable(table.identifier.unquotedString)
     }
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
index 9919062cac7..73b20e6f993 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
@@ -21,7 +21,9 @@ import org.apache.avro.Schema
 import org.apache.hudi.DataSourceWriteOptions._
 import org.apache.hudi.common.util.StringUtils
 import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.config.HoodieWriteConfig.{AVRO_SCHEMA_VALIDATE_ENABLE, TBL_NAME}
 import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME
+import org.apache.hudi.exception.HoodieException
 import org.apache.hudi.hive.HiveSyncConfigHolder
 import org.apache.hudi.sync.common.HoodieSyncConfig
 import org.apache.hudi.{AvroConversionUtils, DataSourceWriteOptions, 
HoodieSparkSqlWriter, SparkAdapterSupport}
@@ -348,7 +350,10 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie
 
     // Remove the meta fields from the sourceDF as we do not need these when writing.
     val sourceDFWithoutMetaFields = removeMetaFields(sourceDF)
-    HoodieSparkSqlWriter.write(sparkSession.sqlContext, SaveMode.Append, writeParams, sourceDFWithoutMetaFields)
+    val (success, _, _, _, _, _) = HoodieSparkSqlWriter.write(sparkSession.sqlContext, SaveMode.Append, writeParams, sourceDFWithoutMetaFields)
+    if (!success) {
+      throw new HoodieException("Merge into Hoodie table command failed")
+    }
   }
 
   private def checkUpdateAssignments(updateActions: Seq[UpdateAction]): Unit = {

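From the caller's perspective, the visible effect is that a failed
spark datasource write now propagates as a HoodieException instead of
passing silently. A hypothetical caller-side sketch (df, hudiOptions
and basePath are placeholders, not part of this patch):

    import org.apache.hudi.exception.HoodieException
    import org.apache.spark.sql.SaveMode

    try {
      df.write.format("hudi")
        .options(hudiOptions)   // placeholder: table and write configs
        .mode(SaveMode.Append)
        .save(basePath)         // placeholder: table base path
    } catch {
      case e: HoodieException =>
        // Before this fix the failure could go unnoticed; now it can
        // be handled or rethrown here.
        throw e
    }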