yihua commented on code in PR #10778:
URL: https://github.com/apache/hudi/pull/10778#discussion_r1542096043


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/RowSource.java:
##########
@@ -46,7 +50,9 @@ protected final InputBatch<Dataset<Row>> fetchNewData(Option<String> lastCkptStr
      Dataset<Row> sanitizedRows = SanitizationUtils.sanitizeColumnNamesForAvro(dsr, props);
      SchemaProvider rowSchemaProvider =
          UtilHelpers.createRowBasedSchemaProvider(sanitizedRows.schema(), props, sparkContext);
-      return new InputBatch<>(Option.of(sanitizedRows), res.getValue(), rowSchemaProvider);
+      Dataset<Row> wrappedDf = HoodieSparkUtils.maybeWrapDataFrameWithException(sanitizedRows, HoodieReadFromSourceException.class.getName(),
+          "Failed to read from row source", props.getBoolean(EXTRA_ROW_SOURCE_EXCEPTIONS.key(), EXTRA_ROW_SOURCE_EXCEPTIONS.defaultValue()));

Review Comment:
   Use `ConfigUtils#getBooleanWithAltKeys` instead of directly referencing the 
config key.
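
   As a sketch of what the suggested helper buys (the method below is a hypothetical stand-in, not the actual hudi-common implementation): `getBooleanWithAltKeys` resolves the primary key first, then any alternate (e.g., deprecated) keys, and only then falls back to the default — a fallback chain that a direct `props.getBoolean(KEY.key(), KEY.defaultValue())` call silently misses.

```java
import java.util.List;
import java.util.Properties;

class ConfigFetchSketch {
  // Hypothetical stand-in for ConfigUtils#getBooleanWithAltKeys: try the
  // primary key, then each alternate (deprecated) key, then the default.
  static boolean getBooleanWithAltKeys(Properties props, String key,
                                       List<String> altKeys, boolean defaultValue) {
    if (props.containsKey(key)) {
      return Boolean.parseBoolean(props.getProperty(key));
    }
    for (String altKey : altKeys) {
      if (props.containsKey(altKey)) {
        return Boolean.parseBoolean(props.getProperty(altKey));
      }
    }
    return defaultValue;
  }

  public static void main(String[] args) {
    Properties props = new Properties();
    // Only the (illustrative) deprecated alternate key is set; the lookup still resolves it.
    props.setProperty("hoodie.deltastreamer.some.flag", "true");
    boolean value = getBooleanWithAltKeys(props, "hoodie.streamer.some.flag",
        List.of("hoodie.deltastreamer.some.flag"), false);
    System.out.println(value);
  }
}
```

   Referencing only the primary key would have returned `false` here, which is why the review asks for the alt-key-aware accessor.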



##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/ExceptionWrappingIterator.scala:
##########
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.util
+
+import org.apache.hudi.common.util.ReflectionUtils
+
+class ExceptionWrappingIterator[T](var in: Iterator[T], var exceptionClass: String, var msg: String) extends Iterator[T] {

Review Comment:
   All the constructor arguments can be `val` instead of `var`?
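
   For context, a rough Java analogue of what `ExceptionWrappingIterator` does — catch failures during iteration and rethrow them under a chosen exception type — with the arguments held as `final` fields, matching the `val` suggestion (class, field, and message names here are illustrative, not the PR's Scala code):

```java
import java.util.Iterator;
import java.util.function.Function;

class ExceptionWrappingIteratorSketch<T> implements Iterator<T> {
  // Effectively immutable: neither reference is reassigned after construction.
  private final Iterator<T> in;
  private final Function<Throwable, RuntimeException> wrapper;

  ExceptionWrappingIteratorSketch(Iterator<T> in, Function<Throwable, RuntimeException> wrapper) {
    this.in = in;
    this.wrapper = wrapper;
  }

  @Override
  public boolean hasNext() {
    try {
      return in.hasNext();
    } catch (Throwable t) {
      throw wrapper.apply(t);  // surface the failure under the caller's chosen type
    }
  }

  @Override
  public T next() {
    try {
      return in.next();
    } catch (Throwable t) {
      throw wrapper.apply(t);
    }
  }

  public static void main(String[] args) {
    // A source iterator that always fails on read.
    Iterator<Integer> failing = new Iterator<>() {
      @Override public boolean hasNext() { return true; }
      @Override public Integer next() { throw new IllegalStateException("source read failed"); }
    };
    Iterator<Integer> wrapped = new ExceptionWrappingIteratorSketch<>(
        failing, t -> new RuntimeException("Failed to read from row source", t));
    try {
      wrapped.next();
    } catch (RuntimeException e) {
      System.out.println(e.getMessage());
    }
  }
}
```

   Since nothing is reassigned after construction, the Scala version can indeed declare every parameter as `val`.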



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/config/HoodieStreamerConfig.java:
##########
@@ -132,4 +132,11 @@ public class HoodieStreamerConfig extends HoodieConfig {
       .sinceVersion("0.14.0")
      .withDocumentation("Number of records to sample from the first write. To improve the estimation's accuracy, "
          + "for smaller or more compressable record size, set the sample size bigger. For bigger or less compressable record size, set smaller.");
+
+  public static final ConfigProperty<Boolean> EXTRA_ROW_SOURCE_EXCEPTIONS = ConfigProperty
+      .key(STREAMER_CONFIG_PREFIX + "extra.row.source.exceptions")

Review Comment:
   ```suggestion
         .key(STREAMER_CONFIG_PREFIX + ".row.throw.explicit.exception")
   ```



##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala:
##########
@@ -131,6 +132,16 @@ object HoodieSparkUtils extends SparkAdapterSupport with SparkVersionsSupport wi
   def injectSQLConf[T: ClassTag](rdd: RDD[T], conf: SQLConf): RDD[T] =
     new SQLConfInjectingRDD(rdd, conf)
 
+  def maybeWrapDataFrameWithException(df: DataFrame, exceptionClass: String, msg: String, shouldWrap: Boolean): DataFrame = {
+    if (shouldWrap) {
+      HoodieUnsafeUtils.createDataFrameFromRDD(df.sparkSession, injectSQLConf(df.queryExecution.toRdd.mapPartitions {

Review Comment:
   Is SQLConf injection necessary here?



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SourceFormatAdapter.java:
##########
@@ -80,6 +84,7 @@ public SourceFormatAdapter(Source source, Option<BaseErrorTableWriter> errorTabl
     if (props.isPresent()) {
       this.shouldSanitize = SanitizationUtils.shouldSanitize(props.get());
       this.invalidCharMask = SanitizationUtils.getInvalidCharMask(props.get());
+      this.wrapWithException = props.get().getBoolean(EXTRA_ROW_SOURCE_EXCEPTIONS.key(), EXTRA_ROW_SOURCE_EXCEPTIONS.defaultValue());

Review Comment:
   Similar here: use `ConfigUtils#getBooleanWithAltKeys` for fetching the config.



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/config/HoodieStreamerConfig.java:
##########
@@ -132,4 +132,11 @@ public class HoodieStreamerConfig extends HoodieConfig {
       .sinceVersion("0.14.0")
      .withDocumentation("Number of records to sample from the first write. To improve the estimation's accuracy, "
          + "for smaller or more compressable record size, set the sample size bigger. For bigger or less compressable record size, set smaller.");
+
+  public static final ConfigProperty<Boolean> EXTRA_ROW_SOURCE_EXCEPTIONS = ConfigProperty
+      .key(STREAMER_CONFIG_PREFIX + "extra.row.source.exceptions")
+      .defaultValue(false)
+      .markAdvanced()
+      .sinceVersion("0.15.0")
+      .withDocumentation("Reads from source to row format will have the dataframe wrapped with an exception handler");

Review Comment:
   ```suggestion
      .withDocumentation("When enabled, the dataframe generated from reading source data is wrapped with an exception handler to explicitly surface exceptions.");
   ```
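
   Combining this wording with the key rename suggested earlier in this review, the property definition would read roughly as follows (the constant name `ROW_THROW_EXPLICIT_EXCEPTION` is a hypothetical rename, not part of the PR):

```java
  public static final ConfigProperty<Boolean> ROW_THROW_EXPLICIT_EXCEPTION = ConfigProperty
      .key(STREAMER_CONFIG_PREFIX + ".row.throw.explicit.exception")
      .defaultValue(false)
      .markAdvanced()
      .sinceVersion("0.15.0")
      .withDocumentation("When enabled, the dataframe generated from reading source data is wrapped with "
          + "an exception handler to explicitly surface exceptions.");
```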



##########
hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java:
##########
@@ -963,15 +966,17 @@ private static Object rewriteRecordWithNewSchemaInternal(Object oldRecord, Schem
        }
        return newRecord;
      case ENUM:
-        ValidationUtils.checkArgument(
-            oldSchema.getType() == Schema.Type.STRING || oldSchema.getType() == Schema.Type.ENUM,
-            "Only ENUM or STRING type can be converted ENUM type");
+        if (oldSchema.getType() != Schema.Type.STRING && oldSchema.getType() != Schema.Type.ENUM) {
+          throw new SchemaCompatibilityException(String.format("Only ENUM or STRING type can be converted ENUM type. Schema type was %s", oldSchema.getType().getName()));
+        }
        if (oldSchema.getType() == Schema.Type.STRING) {
          return new GenericData.EnumSymbol(newSchema, oldRecord);
        }
        return oldRecord;
      case ARRAY:
-        ValidationUtils.checkArgument(oldRecord instanceof Collection, "cannot rewrite record with different type");
+        if (!(oldRecord instanceof Collection)) {

Review Comment:
   nit: might be good to add another util method, e.g., `ValidationUtils.checkArgument(final boolean expression, final Throwable throwable)`, to throw a specific exception if the condition is not met.
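
   A minimal sketch of the suggested overload (narrowed to `RuntimeException` here so the method need not declare a checked `throws` clause; all names are illustrative, not the actual `ValidationUtils` API):

```java
import java.util.Collection;
import java.util.List;

class ValidationSketch {
  // Hypothetical overload per the review suggestion: throw the supplied
  // exception when the check fails instead of a generic IllegalArgumentException.
  static void checkArgument(boolean expression, RuntimeException toThrow) {
    if (!expression) {
      throw toThrow;
    }
  }

  public static void main(String[] args) {
    // Passing case: a List is a Collection, so nothing is thrown.
    Object oldRecord = List.of(1, 2, 3);
    checkArgument(oldRecord instanceof Collection,
        new IllegalStateException("cannot rewrite record with different type"));
    System.out.println("ok");

    // Failing case: the supplied exception is thrown as-is.
    Object notACollection = "a string";
    try {
      checkArgument(notACollection instanceof Collection,
          new IllegalStateException("cannot rewrite record with different type"));
    } catch (IllegalStateException e) {
      System.out.println(e.getMessage());
    }
  }
}
```

   One caveat of this exact signature is that the exception object is constructed even when the check passes; a `Supplier<RuntimeException>` parameter would defer that cost to the failure path.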



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
