This is an automated email from the ASF dual-hosted git repository.

jonvex pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new bf723f56cd0 [HUDI-7486] Classify schema exceptions when converting from avro to spark row representation (#10778)
bf723f56cd0 is described below

commit bf723f56cd0d379f951a5a2d535502f326d1bc78
Author: Jon Vexler <[email protected]>
AuthorDate: Wed Apr 3 08:50:12 2024 -0400

    [HUDI-7486] Classify schema exceptions when converting from avro to spark row representation (#10778)
    
    * make exceptions more specific
    
    * use hudi avro exception
    
    * Address review comments
    
    * fix unnecessary changes
    
    * add exception wrapping
    
    * style
    
    * address review comments
    
    * remove . from config
    
    * address review comments
    
    * fix merge
    
    * fix checkstyle
    
    * Update hudi-common/src/main/java/org/apache/hudi/exception/HoodieRecordCreationException.java
    
    Co-authored-by: Y Ethan Guo <[email protected]>
    
    * Update hudi-common/src/main/java/org/apache/hudi/exception/HoodieAvroSchemaException.java
    
    Co-authored-by: Y Ethan Guo <[email protected]>
    
    * add javadoc to exception wrapper
    
    ---------
    
    Co-authored-by: Jonathan Vexler <=>
    Co-authored-by: Y Ethan Guo <[email protected]>
---
 .../org/apache/hudi/AvroConversionUtils.scala      | 14 +++++--
 .../scala/org/apache/hudi/HoodieSparkUtils.scala   | 20 ++++++---
 .../hudi/util/ExceptionWrappingIterator.scala      | 44 +++++++++++++++++++
 .../java/org/apache/hudi/avro/AvroSchemaUtils.java | 10 ++---
 .../java/org/apache/hudi/avro/HoodieAvroUtils.java | 25 +++++++----
 .../hudi/exception/HoodieAvroSchemaException.java  | 31 ++++++++++++++
 .../exception/HoodieRecordCreationException.java   | 32 ++++++++++++++
 .../org/apache/hudi/HoodieSparkSqlWriter.scala     | 14 ++++---
 .../utilities/config/HoodieStreamerConfig.java     |  7 ++++
 .../apache/hudi/utilities/sources/RowSource.java   |  9 +++-
 .../utilities/streamer/HoodieStreamerUtils.java    | 24 +++++++----
 .../utilities/streamer/SourceFormatAdapter.java    |  9 +++-
 .../hudi/utilities/sources/TestAvroDFSSource.java  |  3 +-
 .../hudi/utilities/sources/TestCsvDFSSource.java   |  3 +-
 .../hudi/utilities/sources/TestJsonDFSSource.java  | 49 +++++++++++++++++++++-
 .../utilities/sources/TestParquetDFSSource.java    |  3 +-
 .../sources/AbstractDFSSourceTestBase.java         |  7 +++-
 17 files changed, 257 insertions(+), 47 deletions(-)
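
Taken together, the patch routes schema violations into SchemaCompatibilityException subtypes and wraps other record-construction failures in HoodieRecordCreationException. A minimal sketch of the catch pattern this enables at a write call site (the failing write below is a hypothetical stand-in, not an API from this patch):

    import org.apache.hudi.exception.{HoodieRecordCreationException, SchemaCompatibilityException}

    object ClassifiedWriteFailures {
      // hypothetical stand-in for a Hudi write that fails on a bad schema
      def writeToHudi(): Unit = throw new SchemaCompatibilityException("example failure")

      def main(args: Array[String]): Unit = {
        try writeToHudi()
        catch {
          // schema violations detected while converting between row and avro representations
          case e: SchemaCompatibilityException => println(s"schema problem: ${e.getMessage}")
          // any other failure raised while constructing HoodieRecords
          case e: HoodieRecordCreationException => println(s"record creation failed: ${e.getCause}")
        }
      }
    }
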

diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
index 55877938f8c..95962d1ca44 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
@@ -23,6 +23,7 @@ import org.apache.avro.generic.GenericRecord
 import org.apache.avro.{JsonProperties, Schema}
 import org.apache.hudi.HoodieSparkUtils.sparkAdapter
 import org.apache.hudi.avro.AvroSchemaUtils
+import org.apache.hudi.exception.SchemaCompatibilityException
 import org.apache.hudi.internal.schema.HoodieSchemaException
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
@@ -58,9 +59,16 @@ object AvroConversionUtils {
    */
   def createInternalRowToAvroConverter(rootCatalystType: StructType, rootAvroType: Schema, nullable: Boolean): InternalRow => GenericRecord = {
     val serializer = sparkAdapter.createAvroSerializer(rootCatalystType, rootAvroType, nullable)
-    row => serializer
-      .serialize(row)
-      .asInstanceOf[GenericRecord]
+    row => {
+      try {
+        serializer
+          .serialize(row)
+          .asInstanceOf[GenericRecord]
+      } catch {
+        case e: HoodieSchemaException => throw e
+        case e => throw new SchemaCompatibilityException("Failed to convert spark record into avro record", e)
+      }
+    }
   }
 
   /**
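
After this change, a failure inside the returned converter surfaces as a SchemaCompatibilityException (or a more specific HoodieSchemaException) instead of a raw serializer error. A hedged sketch of a call site, with an illustrative schema and row:

    import org.apache.avro.Schema
    import org.apache.hudi.AvroConversionUtils
    import org.apache.hudi.exception.SchemaCompatibilityException
    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.types.{StringType, StructField, StructType}
    import org.apache.spark.unsafe.types.UTF8String

    val catalystType = StructType(Seq(StructField("id", StringType, nullable = false)))
    val avroSchema = new Schema.Parser().parse(
      """{"type":"record","name":"rec","fields":[{"name":"id","type":"string"}]}""")
    val toAvro = AvroConversionUtils.createInternalRowToAvroConverter(catalystType, avroSchema, nullable = false)
    try toAvro(InternalRow.fromSeq(Seq(UTF8String.fromString("a"))))
    catch {
      // raised when the row cannot be serialized against the avro schema
      case e: SchemaCompatibilityException => println(e.getMessage)
    }
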
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
index 03d977f6fc9..6de5de8842e 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
@@ -18,25 +18,25 @@
 
 package org.apache.hudi
 
+import org.apache.avro.Schema
+import org.apache.avro.generic.GenericRecord
+import org.apache.hadoop.fs.Path
 import org.apache.hudi.HoodieConversionUtils.toScalaOption
 import org.apache.hudi.avro.{AvroSchemaUtils, HoodieAvroUtils}
 import org.apache.hudi.client.utils.SparkRowSerDe
 import org.apache.hudi.common.model.HoodieRecord
 import org.apache.hudi.hadoop.fs.CachingPath
-
-import org.apache.avro.Schema
-import org.apache.avro.generic.GenericRecord
-import org.apache.hadoop.fs.Path
+import org.apache.hudi.util.ExceptionWrappingIterator
 import org.apache.spark.SPARK_VERSION
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.util.DateTimeUtils.getTimeZone
 import org.apache.spark.sql.execution.SQLConfInjectingRDD
 import org.apache.spark.sql.execution.datasources.SparkParsePartitionUtil
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{StringType, StructField, StructType}
+import org.apache.spark.sql.{DataFrame, HoodieUnsafeUtils}
 import org.apache.spark.unsafe.types.UTF8String
 
 import scala.collection.JavaConverters._
@@ -131,6 +131,16 @@ object HoodieSparkUtils extends SparkAdapterSupport with SparkVersionsSupport wi
   def injectSQLConf[T: ClassTag](rdd: RDD[T], conf: SQLConf): RDD[T] =
     new SQLConfInjectingRDD(rdd, conf)
 
+  def maybeWrapDataFrameWithException(df: DataFrame, exceptionClass: String, msg: String, shouldWrap: Boolean): DataFrame = {
+    if (shouldWrap) {
+      HoodieUnsafeUtils.createDataFrameFromRDD(df.sparkSession, injectSQLConf(df.queryExecution.toRdd.mapPartitions {
+        rows => new ExceptionWrappingIterator[InternalRow](rows, exceptionClass, msg)
+      }, SQLConf.get), df.schema)
+    } else {
+      df
+    }
+  }
+
   def safeCreateRDD(df: DataFrame, structName: String, recordNamespace: String, reconcileToLatestSchema: Boolean,
                    latestTableSchema: org.apache.hudi.common.util.Option[Schema] = org.apache.hudi.common.util.Option.empty()):
   Tuple2[RDD[GenericRecord], RDD[String]] = {
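
The new helper is a no-op unless shouldWrap is true, so existing plans are untouched by default. A hedged usage sketch (df stands in for any DataFrame already in scope; the message is illustrative):

    import org.apache.hudi.HoodieSparkUtils
    import org.apache.hudi.exception.SchemaCompatibilityException

    // per-partition iteration failures are rethrown as the named exception
    // class, which must expose a (String, Throwable) constructor
    val wrapped = HoodieSparkUtils.maybeWrapDataFrameWithException(
      df, classOf[SchemaCompatibilityException].getName, "Failed while iterating source rows", shouldWrap = true)
    wrapped.count() // any iterator failure now surfaces as SchemaCompatibilityException
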
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/ExceptionWrappingIterator.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/ExceptionWrappingIterator.scala
new file mode 100644
index 00000000000..994e6f0eea2
--- /dev/null
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/ExceptionWrappingIterator.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.util
+
+import org.apache.hudi.common.util.ReflectionUtils
+
+/**
+ * Used to catch exceptions from an iterator
+ * @param in iterator to catch exceptions from
+ * @param exceptionClass name of exception class to throw when an exception is thrown during iteration
+ * @param msg message the thrown exception should have
+ */
+class ExceptionWrappingIterator[T](val in: Iterator[T], val exceptionClass: String, val msg: String) extends Iterator[T] {
+  override def hasNext: Boolean = try in.hasNext
+  catch {
+    case e: Throwable => throw createException(e)
+  }
+
+  override def next: T = try in.next
+  catch {
+    case e: Throwable => throw createException(e)
+  }
+
+  private def createException(e: Throwable): Throwable = {
+    ReflectionUtils.loadClass(exceptionClass, Array(classOf[String], classOf[Throwable]).asInstanceOf[Array[Class[_]]], msg, e).asInstanceOf[Throwable]
+  }
+}
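
A self-contained sketch of the wrapping behavior, reusing an exception type touched elsewhere in this patch (the failing iterator is illustrative):

    import org.apache.hudi.exception.SchemaCompatibilityException
    import org.apache.hudi.util.ExceptionWrappingIterator

    val failing: Iterator[Int] = Iterator(1, 2).map { i =>
      if (i == 2) throw new IllegalStateException("bad element") else i
    }
    val wrapped = new ExceptionWrappingIterator[Int](
      failing, classOf[SchemaCompatibilityException].getName, "iteration failed")
    wrapped.next() // returns 1
    // the second next() throws SchemaCompatibilityException caused by the IllegalStateException
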
diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java
index 027f6ccb37d..ba747a63cbc 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java
@@ -19,12 +19,12 @@
 package org.apache.hudi.avro;
 
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieAvroSchemaException;
+import org.apache.hudi.exception.InvalidUnionTypeException;
 import org.apache.hudi.exception.MissingSchemaFieldException;
 import org.apache.hudi.exception.SchemaBackwardsCompatibilityException;
 import org.apache.hudi.exception.SchemaCompatibilityException;
-import org.apache.hudi.exception.InvalidUnionTypeException;
 
-import org.apache.avro.AvroRuntimeException;
 import org.apache.avro.Schema;
 import org.apache.avro.SchemaCompatibility;
 
@@ -317,7 +317,7 @@ public class AvroSchemaUtils {
             .orElse(null);
 
     if (nonNullType == null) {
-      throw new AvroRuntimeException(
+      throw new HoodieAvroSchemaException(
           String.format("Unsupported Avro UNION type %s: Only UNION of a null type and a non-null type is supported", schema));
     }
 
@@ -349,14 +349,14 @@ public class AvroSchemaUtils {
     List<Schema> innerTypes = schema.getTypes();
 
     if (innerTypes.size() != 2) {
-      throw new AvroRuntimeException(
+      throw new HoodieAvroSchemaException(
           String.format("Unsupported Avro UNION type %s: Only UNION of a null type and a non-null type is supported", schema));
     }
     Schema firstInnerType = innerTypes.get(0);
     Schema secondInnerType = innerTypes.get(1);
     if ((firstInnerType.getType() != Schema.Type.NULL && secondInnerType.getType() != Schema.Type.NULL)
         || (firstInnerType.getType() == Schema.Type.NULL && secondInnerType.getType() == Schema.Type.NULL)) {
-      throw new AvroRuntimeException(
+      throw new HoodieAvroSchemaException(
           String.format("Unsupported Avro UNION type %s: Only UNION of a null type and a non-null type is supported", schema));
     }
     return firstInnerType.getType() == Schema.Type.NULL ? secondInnerType : firstInnerType;
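
For reference, a union this validation rejects, now with the Hudi-specific exception. The schema literal is illustrative, and the enclosing method name is an assumption (resolveNullableSchema) inferred from the hunk context:

    import org.apache.avro.Schema
    import org.apache.hudi.avro.AvroSchemaUtils
    import org.apache.hudi.exception.HoodieAvroSchemaException

    // a three-branch union: rejected because only (null, non-null) pairs are supported
    val union = new Schema.Parser().parse("""["null", "string", "int"]""")
    try AvroSchemaUtils.resolveNullableSchema(union)
    catch { case e: HoodieAvroSchemaException => println(e.getMessage) }
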
diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
index b352099cb1e..a7b3f5ae197 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
@@ -36,6 +36,7 @@ import org.apache.hudi.common.util.SpillableMapUtils;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieAvroSchemaException;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.SchemaCompatibilityException;
@@ -931,7 +932,9 @@ public class HoodieAvroUtils {
   private static Object rewriteRecordWithNewSchemaInternal(Object oldRecord, Schema oldSchema, Schema newSchema, Map<String, String> renameCols, Deque<String> fieldNames) {
     switch (newSchema.getType()) {
       case RECORD:
-        ValidationUtils.checkArgument(oldRecord instanceof IndexedRecord, "cannot rewrite record with different type");
+        if (!(oldRecord instanceof IndexedRecord)) {
+          throw new SchemaCompatibilityException("cannot rewrite record with different type");
+        }
         IndexedRecord indexedRecord = (IndexedRecord) oldRecord;
         List<Schema.Field> fields = newSchema.getFields();
         GenericData.Record newRecord = new GenericData.Record(newSchema);
@@ -963,15 +966,17 @@ public class HoodieAvroUtils {
         }
         return newRecord;
       case ENUM:
-        ValidationUtils.checkArgument(
-            oldSchema.getType() == Schema.Type.STRING || oldSchema.getType() == Schema.Type.ENUM,
-            "Only ENUM or STRING type can be converted ENUM type");
+        if (oldSchema.getType() != Schema.Type.STRING && oldSchema.getType() != Schema.Type.ENUM) {
+          throw new SchemaCompatibilityException(String.format("Only ENUM or STRING type can be converted ENUM type. Schema type was %s", oldSchema.getType().getName()));
+        }
         if (oldSchema.getType() == Schema.Type.STRING) {
           return new GenericData.EnumSymbol(newSchema, oldRecord);
         }
         return oldRecord;
       case ARRAY:
-        ValidationUtils.checkArgument(oldRecord instanceof Collection, "cannot rewrite record with different type");
+        if (!(oldRecord instanceof Collection)) {
+          throw new SchemaCompatibilityException(String.format("Cannot rewrite %s as an array", oldRecord.getClass().getName()));
+        }
         Collection array = (Collection) oldRecord;
         List<Object> newArray = new ArrayList<>(array.size());
         fieldNames.push("element");
@@ -981,7 +986,9 @@ public class HoodieAvroUtils {
         fieldNames.pop();
         return newArray;
       case MAP:
-        ValidationUtils.checkArgument(oldRecord instanceof Map, "cannot rewrite record with different type");
+        if (!(oldRecord instanceof Map)) {
+          throw new SchemaCompatibilityException(String.format("Cannot rewrite %s as a map", oldRecord.getClass().getName()));
+        }
         Map<Object, Object> map = (Map<Object, Object>) oldRecord;
         Map<Object, Object> newMap = new HashMap<>(map.size(), 1.0f);
         fieldNames.push("value");
@@ -1029,7 +1036,7 @@ public class HoodieAvroUtils {
               BigDecimal bd = new BigDecimal(new BigInteger(bytes), decimal.getScale()).setScale(((Decimal) newSchema.getLogicalType()).getScale());
               return DECIMAL_CONVERSION.toFixed(bd, newSchema, newSchema.getLogicalType());
             } else {
-              throw new UnsupportedOperationException("Fixed type size change is not currently supported");
+              throw new HoodieAvroSchemaException("Fixed type size change is not currently supported");
             }
           }
 
@@ -1045,7 +1052,7 @@ public class HoodieAvroUtils {
           }
 
         default:
-          throw new AvroRuntimeException("Unknown schema type: " + newSchema.getType());
+          throw new HoodieAvroSchemaException("Unknown schema type: " + newSchema.getType());
       }
     } else {
       return rewritePrimaryTypeWithDiffSchemaType(oldValue, oldSchema, newSchema);
@@ -1130,7 +1137,7 @@ public class HoodieAvroUtils {
         break;
       default:
     }
-    throw new AvroRuntimeException(String.format("cannot support rewrite value for schema type: %s since the old schema type is: %s", newSchema, oldSchema));
+    throw new HoodieAvroSchemaException(String.format("cannot support rewrite value for schema type: %s since the old schema type is: %s", newSchema, oldSchema));
   }
 
   /**
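
A hedged illustration of the sharpened failure mode: rewriting a field to a type with no conversion rule now raises the Hudi exception rather than AvroRuntimeException. The schemas are illustrative, and boolean-to-int is assumed to have no rewrite rule:

    import org.apache.avro.Schema
    import org.apache.avro.generic.GenericData
    import org.apache.hudi.avro.HoodieAvroUtils
    import org.apache.hudi.exception.HoodieAvroSchemaException

    val oldSchema = new Schema.Parser().parse(
      """{"type":"record","name":"rec","fields":[{"name":"flag","type":"boolean"}]}""")
    val newSchema = new Schema.Parser().parse(
      """{"type":"record","name":"rec","fields":[{"name":"flag","type":"int"}]}""")
    val rec = new GenericData.Record(oldSchema)
    rec.put("flag", true)
    try HoodieAvroUtils.rewriteRecordWithNewSchema(rec, newSchema)
    catch { case e: HoodieAvroSchemaException => println(e.getMessage) }
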
diff --git a/hudi-common/src/main/java/org/apache/hudi/exception/HoodieAvroSchemaException.java b/hudi-common/src/main/java/org/apache/hudi/exception/HoodieAvroSchemaException.java
new file mode 100644
index 00000000000..c19c88c15c8
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/exception/HoodieAvroSchemaException.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.exception;
+
+/**
+ * Thrown when we detect in Hudi code that a record schema
+ * violates Avro rules. This can happen even when using Spark
+ * because we use Avro schema internally
+ */
+public class HoodieAvroSchemaException extends SchemaCompatibilityException {
+  public HoodieAvroSchemaException(String message) {
+    super(message);
+  }
+}
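
Because the new class extends SchemaCompatibilityException, handlers written against the parent type keep catching these failures; a short sketch:

    import org.apache.hudi.exception.{HoodieAvroSchemaException, SchemaCompatibilityException}

    try throw new HoodieAvroSchemaException("union with three branches")
    catch {
      // the subtype is matched by a handler for the parent type
      case e: SchemaCompatibilityException => println(s"handled: ${e.getMessage}")
    }
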
diff --git a/hudi-common/src/main/java/org/apache/hudi/exception/HoodieRecordCreationException.java b/hudi-common/src/main/java/org/apache/hudi/exception/HoodieRecordCreationException.java
new file mode 100644
index 00000000000..dec70b369da
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/exception/HoodieRecordCreationException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.exception;
+
+/**
+ * Exception thrown during HoodieRecord construction for any failure
+ * that is not a KeyGeneration failure. An example of a failure would be if the
+ * record is malformed.
+ */
+public class HoodieRecordCreationException extends HoodieException {
+
+  public HoodieRecordCreationException(String message, Throwable t) {
+    super(message, t);
+  }
+}
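
The class carries the original failure as its cause; a sketch of the wrap-and-rethrow pattern it supports (the failing builder is a hypothetical stand-in):

    import org.apache.hudi.exception.HoodieRecordCreationException

    // hypothetical stand-in for record construction that hits a malformed record
    def buildRecord(): Nothing = throw new RuntimeException("malformed record")

    try buildRecord()
    catch {
      case e: Exception => throw new HoodieRecordCreationException("Failed to create Hoodie Record", e)
    }
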
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index d9ac8ae798f..98c7c5d29f5 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -51,7 +51,7 @@ import org.apache.hudi.common.util.{CommitUtils, StringUtils, Option => HOption}
 import org.apache.hudi.config.HoodieBootstrapConfig.{BASE_PATH, INDEX_CLASS_NAME}
 import org.apache.hudi.config.HoodieWriteConfig.SPARK_SQL_MERGE_INTO_PREPPED_KEY
 import org.apache.hudi.config.{HoodieCompactionConfig, HoodieInternalConfig, HoodieWriteConfig}
-import org.apache.hudi.exception.{HoodieException, HoodieWriteConflictException}
+import org.apache.hudi.exception.{HoodieException, HoodieRecordCreationException, HoodieWriteConflictException}
 import org.apache.hudi.hive.{HiveSyncConfigHolder, HiveSyncTool}
 import org.apache.hudi.internal.schema.InternalSchema
 import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter
@@ -78,6 +78,7 @@ import java.util.function.BiConsumer
 import scala.collection.JavaConversions._
 import scala.collection.JavaConverters.setAsJavaSetConverter
 import scala.collection.mutable
+import scala.util.{Failure, Success, Try}
 
 object HoodieSparkSqlWriter {
 
@@ -478,10 +479,13 @@ class HoodieSparkSqlWriterInternal {
             }
             instantTime = client.createNewInstantTime()
             // Convert to RDD[HoodieRecord]
-            val hoodieRecords =
-              HoodieCreateRecordUtils.createHoodieRecordRdd(HoodieCreateRecordUtils.createHoodieRecordRddArgs(df,
-                writeConfig, parameters, avroRecordName, avroRecordNamespace, writerSchema,
-                processedDataSchema, operation, instantTime, preppedSparkSqlWrites, preppedSparkSqlMergeInto, preppedWriteOperation))
+            val hoodieRecords = Try(HoodieCreateRecordUtils.createHoodieRecordRdd(
+              HoodieCreateRecordUtils.createHoodieRecordRddArgs(df, writeConfig, parameters, avroRecordName,
+                avroRecordNamespace, writerSchema, processedDataSchema, operation, instantTime, preppedSparkSqlWrites,
+                preppedSparkSqlMergeInto, preppedWriteOperation))) match {
+              case Success(recs) => recs
+              case Failure(e) => throw new HoodieRecordCreationException("Failed to create Hoodie Spark Record", e)
+            }
 
             val dedupedHoodieRecords =
               if (hoodieConfig.getBoolean(INSERT_DROP_DUPS) && operation != WriteOperationType.INSERT_OVERWRITE_TABLE && operation != WriteOperationType.INSERT_OVERWRITE) {
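
Callers of the write path now see one wrapper type for record-creation failures, with the original error preserved as the cause. A hedged sketch of the consuming side (the failing write is a hypothetical stand-in):

    import org.apache.hudi.exception.HoodieRecordCreationException

    // hypothetical stand-in for a write that fails while creating records
    def runHudiWrite(): Unit = throw new HoodieRecordCreationException(
      "Failed to create Hoodie Spark Record", new RuntimeException("bad row"))

    try runHudiWrite()
    catch {
      case e: HoodieRecordCreationException =>
        println(s"record creation failed, root cause: ${e.getCause.getMessage}")
    }
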
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/HoodieStreamerConfig.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/HoodieStreamerConfig.java
index b3b64cff905..e50e7fa0612 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/HoodieStreamerConfig.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/HoodieStreamerConfig.java
@@ -132,4 +132,11 @@ public class HoodieStreamerConfig extends HoodieConfig {
       .sinceVersion("0.14.0")
       .withDocumentation("Number of records to sample from the first write. To 
improve the estimation's accuracy, "
           + "for smaller or more compressable record size, set the sample size 
bigger. For bigger or less compressable record size, set smaller.");
+
+  public static final ConfigProperty<Boolean> ROW_THROW_EXPLICIT_EXCEPTIONS = 
ConfigProperty
+      .key(STREAMER_CONFIG_PREFIX + "row.throw.explicit.exceptions")
+      .defaultValue(false)
+      .markAdvanced()
+      .sinceVersion("0.15.0")
+      .withDocumentation("When enabled, the dataframe generated from reading source data is wrapped with an exception handler to explicitly surface exceptions.");
 }
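
The flag is off by default; enabling it follows the usual TypedProperties pattern, the same one the new test below uses:

    import org.apache.hudi.common.config.TypedProperties
    import org.apache.hudi.utilities.config.HoodieStreamerConfig

    val props = new TypedProperties()
    // resolves to STREAMER_CONFIG_PREFIX + "row.throw.explicit.exceptions"
    props.setProperty(HoodieStreamerConfig.ROW_THROW_EXPLICIT_EXCEPTIONS.key(), "true")
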
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/RowSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/RowSource.java
index f2cc48f280c..1c7e9d99098 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/RowSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/RowSource.java
@@ -18,10 +18,13 @@
 
 package org.apache.hudi.utilities.sources;
 
+import org.apache.hudi.HoodieSparkUtils;
 import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.ConfigUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.utilities.UtilHelpers;
+import org.apache.hudi.utilities.exception.HoodieReadFromSourceException;
 import org.apache.hudi.utilities.schema.SchemaProvider;
 
 import org.apache.hudi.utilities.sources.helpers.SanitizationUtils;
@@ -30,6 +33,8 @@ import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
 
+import static org.apache.hudi.utilities.config.HoodieStreamerConfig.ROW_THROW_EXPLICIT_EXCEPTIONS;
+
 public abstract class RowSource extends Source<Dataset<Row>> {
 
   public RowSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
@@ -46,7 +51,9 @@ public abstract class RowSource extends Source<Dataset<Row>> {
       Dataset<Row> sanitizedRows = 
SanitizationUtils.sanitizeColumnNamesForAvro(dsr, props);
       SchemaProvider rowSchemaProvider =
           UtilHelpers.createRowBasedSchemaProvider(sanitizedRows.schema(), 
props, sparkContext);
-      return new InputBatch<>(Option.of(sanitizedRows), res.getValue(), 
rowSchemaProvider);
+      Dataset<Row> wrappedDf = 
HoodieSparkUtils.maybeWrapDataFrameWithException(sanitizedRows, 
HoodieReadFromSourceException.class.getName(),
+          "Failed to read from row source", 
ConfigUtils.getBooleanWithAltKeys(props, ROW_THROW_EXPLICIT_EXCEPTIONS));
+      return new InputBatch<>(Option.of(wrappedDf), res.getValue(), 
rowSchemaProvider);
     }).orElseGet(() -> new InputBatch<>(res.getKey(), res.getValue()));
   }
 }
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamerUtils.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamerUtils.java
index 90315bc9764..61d7793e6ad 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamerUtils.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamerUtils.java
@@ -36,6 +36,9 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.common.util.collection.CloseableMappingIterator;
 import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieKeyException;
+import org.apache.hudi.exception.HoodieKeyGeneratorException;
+import org.apache.hudi.exception.HoodieRecordCreationException;
 import org.apache.hudi.keygen.BuiltinKeyGenerator;
 import org.apache.hudi.keygen.KeyGenUtils;
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
@@ -104,10 +107,7 @@ public class HoodieStreamerUtils {
                       : DataSourceUtils.createPayload(cfg.payloadClassName, gr);
                   avroRecords.add(Either.left(new HoodieAvroRecord<>(hoodieKey, payload)));
                 } catch (Exception e) {
-                  if (!shouldErrorTable) {
-                    throw e;
-                  }
-                  avroRecords.add(generateErrorRecord(genRec));
+                  avroRecords.add(generateErrorRecordOrThrowException(genRec, e, shouldErrorTable));
                 }
               }
               return avroRecords.iterator();
@@ -135,10 +135,7 @@ public class HoodieStreamerUtils {
               return Either.left(new HoodieSparkRecord(new HoodieKey(recordKey, partitionPath),
                   HoodieInternalRowUtils.getCachedUnsafeProjection(baseStructType, targetStructType).apply(row), targetStructType, false));
             } catch (Exception e) {
-              if (!shouldErrorTable) {
-                throw e;
-              }
-              return generateErrorRecord(rec);
+              return generateErrorRecordOrThrowException(rec, e, shouldErrorTable);
             }
           });
 
@@ -159,7 +156,16 @@ public class HoodieStreamerUtils {
    * @return the representation of error record (empty {@link HoodieRecord} and the error record
    * String) for writing to error table.
    */
-  private static Either<HoodieRecord, String> generateErrorRecord(GenericRecord genRec) {
+  private static Either<HoodieRecord, String> generateErrorRecordOrThrowException(GenericRecord genRec, Exception e, boolean shouldErrorTable) {
+    if (!shouldErrorTable) {
+      if (e instanceof HoodieKeyException) {
+        throw (HoodieKeyException) e;
+      } else if (e instanceof HoodieKeyGeneratorException) {
+        throw (HoodieKeyGeneratorException) e;
+      } else {
+        throw new HoodieRecordCreationException("Failed to create Hoodie Record", e);
+      }
+    }
     try {
       return Either.right(HoodieAvroUtils.avroToJsonString(genRec, false));
     } catch (Exception ex) {
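
The classification rule above is: key and key-generator failures propagate with their original types, everything else is wrapped. A condensed sketch of that decision in isolation:

    import org.apache.hudi.exception.{HoodieKeyException, HoodieKeyGeneratorException, HoodieRecordCreationException}

    def classify(e: Exception): Nothing = e match {
      case k: HoodieKeyException => throw k          // key problems keep their type
      case k: HoodieKeyGeneratorException => throw k // so do key generator problems
      case other => throw new HoodieRecordCreationException("Failed to create Hoodie Record", other)
    }
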
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SourceFormatAdapter.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SourceFormatAdapter.java
index 1796c96dab8..c379472b26e 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SourceFormatAdapter.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SourceFormatAdapter.java
@@ -23,8 +23,10 @@ import org.apache.hudi.AvroConversionUtils;
 import org.apache.hudi.HoodieSparkUtils;
 import org.apache.hudi.avro.MercifulJsonConverter;
 import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.ConfigUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.SchemaCompatibilityException;
 import org.apache.hudi.utilities.UtilHelpers;
 import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
 import org.apache.hudi.utilities.schema.SchemaProvider;
@@ -53,6 +55,7 @@ import java.util.stream.Collectors;
 
 import scala.util.Either;
 
+import static org.apache.hudi.utilities.config.HoodieStreamerConfig.ROW_THROW_EXPLICIT_EXCEPTIONS;
 import static org.apache.hudi.utilities.config.HoodieStreamerConfig.SANITIZE_SCHEMA_FIELD_NAMES;
 import static org.apache.hudi.utilities.config.HoodieStreamerConfig.SCHEMA_FIELD_NAME_INVALID_CHAR_MASK;
 import static org.apache.hudi.utilities.schema.RowBasedSchemaProvider.HOODIE_RECORD_NAMESPACE;
@@ -66,6 +69,8 @@ public class SourceFormatAdapter implements Closeable {
 
   private final Source source;
   private boolean shouldSanitize = SANITIZE_SCHEMA_FIELD_NAMES.defaultValue();
+
+  private  boolean wrapWithException = ROW_THROW_EXPLICIT_EXCEPTIONS.defaultValue();
   private String invalidCharMask = SCHEMA_FIELD_NAME_INVALID_CHAR_MASK.defaultValue();
 
   private Option<BaseErrorTableWriter> errorTableWriter = Option.empty();
@@ -80,6 +85,7 @@ public class SourceFormatAdapter implements Closeable {
     if (props.isPresent()) {
       this.shouldSanitize = SanitizationUtils.shouldSanitize(props.get());
       this.invalidCharMask = SanitizationUtils.getInvalidCharMask(props.get());
+      this.wrapWithException = ConfigUtils.getBooleanWithAltKeys(props.get(), ROW_THROW_EXPLICIT_EXCEPTIONS);
     }
     if (this.shouldSanitize && source.getSourceType() == Source.SourceType.PROTO) {
       throw new IllegalArgumentException("PROTO cannot be sanitized");
@@ -244,7 +250,8 @@ public class SourceFormatAdapter implements Closeable {
           StructType dataType = AvroConversionUtils.convertAvroSchemaToStructType(sourceSchema);
           return new InputBatch<>(
               Option.ofNullable(
-                  r.getBatch().map(rdd -> source.getSparkSession().read().schema(dataType).json(rdd)).orElse(null)),
+                  r.getBatch().map(rdd -> HoodieSparkUtils.maybeWrapDataFrameWithException(source.getSparkSession().read().schema(dataType).json(rdd),
+                      SchemaCompatibilityException.class.getName(), "Schema does not match json data", wrapWithException)).orElse(null)),
               r.getCheckpointForNextBatch(), r.getSchemaProvider());
         }
       }
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroDFSSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroDFSSource.java
index 5ccf9ad2b29..808a4ca57ce 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroDFSSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroDFSSource.java
@@ -39,8 +39,7 @@ public class TestAvroDFSSource extends AbstractDFSSourceTestBase {
   }
 
   @Override
-  protected Source prepareDFSSource() {
-    TypedProperties props = new TypedProperties();
+  protected Source prepareDFSSource(TypedProperties props) {
     props.setProperty("hoodie.streamer.source.dfs.root", dfsRoot);
     try {
       return new AvroDFSSource(props, jsc, sparkSession, schemaProvider);
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestCsvDFSSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestCsvDFSSource.java
index 6a2bbcd0136..c4bb59ff812 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestCsvDFSSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestCsvDFSSource.java
@@ -46,8 +46,7 @@ public class TestCsvDFSSource extends AbstractDFSSourceTestBase {
   }
 
   @Override
-  public Source prepareDFSSource() {
-    TypedProperties props = new TypedProperties();
+  public Source prepareDFSSource(TypedProperties props) {
     props.setProperty("hoodie.streamer.source.dfs.root", dfsRoot);
     props.setProperty("hoodie.streamer.csv.header", Boolean.toString(true));
     props.setProperty("hoodie.streamer.csv.sep", "\t");
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonDFSSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonDFSSource.java
index 24a341fe9c3..ae134e862be 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonDFSSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonDFSSource.java
@@ -20,15 +20,29 @@ package org.apache.hudi.utilities.sources;
 
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.SchemaCompatibilityException;
+import org.apache.hudi.utilities.config.HoodieStreamerConfig;
+import org.apache.hudi.utilities.streamer.SourceFormatAdapter;
 import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
 import org.apache.hudi.utilities.testutils.sources.AbstractDFSSourceTestBase;
 
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
+import java.io.PrintStream;
 import java.util.List;
 
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
 /**
  * Basic tests for {@link JsonDFSSource}.
  */
@@ -42,8 +56,7 @@ public class TestJsonDFSSource extends AbstractDFSSourceTestBase {
   }
 
   @Override
-  public Source prepareDFSSource() {
-    TypedProperties props = new TypedProperties();
+  public Source prepareDFSSource(TypedProperties props) {
     props.setProperty("hoodie.streamer.source.dfs.root", dfsRoot);
     return new JsonDFSSource(props, jsc, sparkSession, schemaProvider);
   }
@@ -53,4 +66,36 @@ public class TestJsonDFSSource extends AbstractDFSSourceTestBase {
     UtilitiesTestBase.Helpers.saveStringsToDFS(
         Helpers.jsonifyRecords(records), fs, path.toString());
   }
+
+  @Test
+  public void testCorruptedSourceFile() throws IOException {
+    fs.mkdirs(new Path(dfsRoot));
+    TypedProperties props = new TypedProperties();
+    props.setProperty(HoodieStreamerConfig.ROW_THROW_EXPLICIT_EXCEPTIONS.key(), "true");
+    SourceFormatAdapter sourceFormatAdapter = new SourceFormatAdapter(prepareDFSSource(props), Option.empty(), Option.of(props));
+    generateOneFile("1", "000", 10);
+    generateOneFile("2", "000", 10);
+    RemoteIterator<LocatedFileStatus> files = fs.listFiles(generateOneFile("3", "000", 10), true);
+
+    FileStatus file1Status = files.next();
+    InputBatch<Dataset<Row>> batch = sourceFormatAdapter.fetchNewDataInRowFormat(Option.empty(), Long.MAX_VALUE);
+    corruptFile(file1Status.getPath());
+    assertTrue(batch.getBatch().isPresent());
+    Throwable t = assertThrows(Exception.class,
+        () -> batch.getBatch().get().show(30));
+    while (t != null) {
+      if (t instanceof SchemaCompatibilityException) {
+        return;
+      }
+      t = t.getCause();
+    }
+    throw new AssertionError("Exception does not have SchemaCompatibility in its trace", t);
+  }
+
+  protected void corruptFile(Path path) throws IOException {
+    PrintStream os = new PrintStream(fs.appendFile(path).build());
+    os.println("🤷‍");
+    os.flush();
+    os.close();
+  }
 }
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestParquetDFSSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestParquetDFSSource.java
index 159ababcf47..a9c448748c9 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestParquetDFSSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestParquetDFSSource.java
@@ -41,8 +41,7 @@ public class TestParquetDFSSource extends AbstractDFSSourceTestBase {
   }
 
   @Override
-  public Source prepareDFSSource() {
-    TypedProperties props = new TypedProperties();
+  public Source prepareDFSSource(TypedProperties props) {
     props.setProperty("hoodie.streamer.source.dfs.root", dfsRoot);
     return new ParquetDFSSource(props, jsc, sparkSession, schemaProvider);
   }
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractDFSSourceTestBase.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractDFSSourceTestBase.java
index 0de087ece73..76a1a645367 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractDFSSourceTestBase.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractDFSSourceTestBase.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.utilities.testutils.sources;
 
 import org.apache.hudi.AvroConversionUtils;
+import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.util.Option;
@@ -74,7 +75,11 @@ public abstract class AbstractDFSSourceTestBase extends UtilitiesTestBase {
    *
    * @return A {@link Source} using DFS as the file system.
    */
-  protected abstract Source prepareDFSSource();
+  protected final Source prepareDFSSource() {
+    return prepareDFSSource(new TypedProperties());
+  }
+
+  protected abstract Source prepareDFSSource(TypedProperties props);
 
   /**
    * Writes test data, i.e., a {@link List} of {@link HoodieRecord}, to a file on DFS.

