This is an automated email from the ASF dual-hosted git repository.
jonvex pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new bf723f56cd0 [HUDI-7486] Classify schema exceptions when converting from avro to spark row representation (#10778)
bf723f56cd0 is described below
commit bf723f56cd0d379f951a5a2d535502f326d1bc78
Author: Jon Vexler <[email protected]>
AuthorDate: Wed Apr 3 08:50:12 2024 -0400
[HUDI-7486] Classify schema exceptions when converting from avro to spark row representation (#10778)
* make exceptions more specific
* use hudi avro exception
* Address review comments
* fix unnecessary changes
* add exception wrapping
* style
* address review comments
* remove . from config
* address review comments
* fix merge
* fix checkstyle
* Update hudi-common/src/main/java/org/apache/hudi/exception/HoodieRecordCreationException.java
Co-authored-by: Y Ethan Guo <[email protected]>
* Update hudi-common/src/main/java/org/apache/hudi/exception/HoodieAvroSchemaException.java
Co-authored-by: Y Ethan Guo <[email protected]>
* add javadoc to exception wrapper
---------
Co-authored-by: Jonathan Vexler <=>
Co-authored-by: Y Ethan Guo <[email protected]>
---
.../org/apache/hudi/AvroConversionUtils.scala | 14 +++++--
.../scala/org/apache/hudi/HoodieSparkUtils.scala | 20 ++++++---
.../hudi/util/ExceptionWrappingIterator.scala | 44 +++++++++++++++++++
.../java/org/apache/hudi/avro/AvroSchemaUtils.java | 10 ++---
.../java/org/apache/hudi/avro/HoodieAvroUtils.java | 25 +++++++----
.../hudi/exception/HoodieAvroSchemaException.java | 31 ++++++++++++++
.../exception/HoodieRecordCreationException.java | 32 ++++++++++++++
.../org/apache/hudi/HoodieSparkSqlWriter.scala | 14 ++++---
.../utilities/config/HoodieStreamerConfig.java | 7 ++++
.../apache/hudi/utilities/sources/RowSource.java | 9 +++-
.../utilities/streamer/HoodieStreamerUtils.java | 24 +++++++----
.../utilities/streamer/SourceFormatAdapter.java | 9 +++-
.../hudi/utilities/sources/TestAvroDFSSource.java | 3 +-
.../hudi/utilities/sources/TestCsvDFSSource.java | 3 +-
.../hudi/utilities/sources/TestJsonDFSSource.java | 49 +++++++++++++++++++++-
.../utilities/sources/TestParquetDFSSource.java | 3 +-
.../sources/AbstractDFSSourceTestBase.java | 7 +++-
17 files changed, 257 insertions(+), 47 deletions(-)
diff --git
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
index 55877938f8c..95962d1ca44 100644
---
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
+++
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
@@ -23,6 +23,7 @@ import org.apache.avro.generic.GenericRecord
import org.apache.avro.{JsonProperties, Schema}
import org.apache.hudi.HoodieSparkUtils.sparkAdapter
import org.apache.hudi.avro.AvroSchemaUtils
+import org.apache.hudi.exception.SchemaCompatibilityException
import org.apache.hudi.internal.schema.HoodieSchemaException
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
@@ -58,9 +59,16 @@ object AvroConversionUtils {
*/
def createInternalRowToAvroConverter(rootCatalystType: StructType,
rootAvroType: Schema, nullable: Boolean): InternalRow => GenericRecord = {
val serializer = sparkAdapter.createAvroSerializer(rootCatalystType,
rootAvroType, nullable)
- row => serializer
- .serialize(row)
- .asInstanceOf[GenericRecord]
+ row => {
+ try {
+ serializer
+ .serialize(row)
+ .asInstanceOf[GenericRecord]
+ } catch {
+ case e: HoodieSchemaException => throw e
+ case e => throw new SchemaCompatibilityException("Failed to convert
spark record into avro record", e)
+ }
+ }
}
/**
diff --git
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
index 03d977f6fc9..6de5de8842e 100644
---
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
+++
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
@@ -18,25 +18,25 @@
package org.apache.hudi
+import org.apache.avro.Schema
+import org.apache.avro.generic.GenericRecord
+import org.apache.hadoop.fs.Path
import org.apache.hudi.HoodieConversionUtils.toScalaOption
import org.apache.hudi.avro.{AvroSchemaUtils, HoodieAvroUtils}
import org.apache.hudi.client.utils.SparkRowSerDe
import org.apache.hudi.common.model.HoodieRecord
import org.apache.hudi.hadoop.fs.CachingPath
-
-import org.apache.avro.Schema
-import org.apache.avro.generic.GenericRecord
-import org.apache.hadoop.fs.Path
+import org.apache.hudi.util.ExceptionWrappingIterator
import org.apache.spark.SPARK_VERSION
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.util.DateTimeUtils.getTimeZone
import org.apache.spark.sql.execution.SQLConfInjectingRDD
import org.apache.spark.sql.execution.datasources.SparkParsePartitionUtil
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{StringType, StructField, StructType}
+import org.apache.spark.sql.{DataFrame, HoodieUnsafeUtils}
import org.apache.spark.unsafe.types.UTF8String
import scala.collection.JavaConverters._
@@ -131,6 +131,16 @@ object HoodieSparkUtils extends SparkAdapterSupport with
SparkVersionsSupport wi
def injectSQLConf[T: ClassTag](rdd: RDD[T], conf: SQLConf): RDD[T] =
new SQLConfInjectingRDD(rdd, conf)
+ def maybeWrapDataFrameWithException(df: DataFrame, exceptionClass: String,
msg: String, shouldWrap: Boolean): DataFrame = {
+ if (shouldWrap) {
+ HoodieUnsafeUtils.createDataFrameFromRDD(df.sparkSession,
injectSQLConf(df.queryExecution.toRdd.mapPartitions {
+ rows => new ExceptionWrappingIterator[InternalRow](rows,
exceptionClass, msg)
+ }, SQLConf.get), df.schema)
+ } else {
+ df
+ }
+ }
+
def safeCreateRDD(df: DataFrame, structName: String, recordNamespace:
String, reconcileToLatestSchema: Boolean,
latestTableSchema:
org.apache.hudi.common.util.Option[Schema] =
org.apache.hudi.common.util.Option.empty()):
Tuple2[RDD[GenericRecord], RDD[String]] = {
diff --git
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/ExceptionWrappingIterator.scala
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/ExceptionWrappingIterator.scala
new file mode 100644
index 00000000000..994e6f0eea2
--- /dev/null
+++
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/ExceptionWrappingIterator.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.util
+
+import org.apache.hudi.common.util.ReflectionUtils
+
+/**
+ * Used to catch exceptions from an iterator
+ * @param in iterator to catch exceptions from
+ * @param exceptionClass name of exception class to throw when an exception is
thrown during iteration
+ * @param msg message the thrown exception should have
+ */
+class ExceptionWrappingIterator[T](val in: Iterator[T], val exceptionClass:
String, val msg: String) extends Iterator[T] {
+ override def hasNext: Boolean = try in.hasNext
+ catch {
+ case e: Throwable => throw createException(e)
+ }
+
+ override def next: T = try in.next
+ catch {
+ case e: Throwable => throw createException(e)
+ }
+
+ private def createException(e: Throwable): Throwable = {
+ ReflectionUtils.loadClass(exceptionClass, Array(classOf[String],
classOf[Throwable]).asInstanceOf[Array[Class[_]]], msg,
e).asInstanceOf[Throwable]
+ }
+}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java
b/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java
index 027f6ccb37d..ba747a63cbc 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java
@@ -19,12 +19,12 @@
package org.apache.hudi.avro;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieAvroSchemaException;
+import org.apache.hudi.exception.InvalidUnionTypeException;
import org.apache.hudi.exception.MissingSchemaFieldException;
import org.apache.hudi.exception.SchemaBackwardsCompatibilityException;
import org.apache.hudi.exception.SchemaCompatibilityException;
-import org.apache.hudi.exception.InvalidUnionTypeException;
-import org.apache.avro.AvroRuntimeException;
import org.apache.avro.Schema;
import org.apache.avro.SchemaCompatibility;
@@ -317,7 +317,7 @@ public class AvroSchemaUtils {
.orElse(null);
if (nonNullType == null) {
- throw new AvroRuntimeException(
+ throw new HoodieAvroSchemaException(
String.format("Unsupported Avro UNION type %s: Only UNION of a null
type and a non-null type is supported", schema));
}
@@ -349,14 +349,14 @@ public class AvroSchemaUtils {
List<Schema> innerTypes = schema.getTypes();
if (innerTypes.size() != 2) {
- throw new AvroRuntimeException(
+ throw new HoodieAvroSchemaException(
String.format("Unsupported Avro UNION type %s: Only UNION of a null
type and a non-null type is supported", schema));
}
Schema firstInnerType = innerTypes.get(0);
Schema secondInnerType = innerTypes.get(1);
if ((firstInnerType.getType() != Schema.Type.NULL &&
secondInnerType.getType() != Schema.Type.NULL)
|| (firstInnerType.getType() == Schema.Type.NULL &&
secondInnerType.getType() == Schema.Type.NULL)) {
- throw new AvroRuntimeException(
+ throw new HoodieAvroSchemaException(
String.format("Unsupported Avro UNION type %s: Only UNION of a null
type and a non-null type is supported", schema));
}
return firstInnerType.getType() == Schema.Type.NULL ? secondInnerType :
firstInnerType;
diff --git
a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
index b352099cb1e..a7b3f5ae197 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
@@ -36,6 +36,7 @@ import org.apache.hudi.common.util.SpillableMapUtils;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieAvroSchemaException;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.SchemaCompatibilityException;
@@ -931,7 +932,9 @@ public class HoodieAvroUtils {
private static Object rewriteRecordWithNewSchemaInternal(Object oldRecord,
Schema oldSchema, Schema newSchema, Map<String, String> renameCols,
Deque<String> fieldNames) {
switch (newSchema.getType()) {
case RECORD:
- ValidationUtils.checkArgument(oldRecord instanceof IndexedRecord,
"cannot rewrite record with different type");
+ if (!(oldRecord instanceof IndexedRecord)) {
+ throw new SchemaCompatibilityException("cannot rewrite record with
different type");
+ }
IndexedRecord indexedRecord = (IndexedRecord) oldRecord;
List<Schema.Field> fields = newSchema.getFields();
GenericData.Record newRecord = new GenericData.Record(newSchema);
@@ -963,15 +966,17 @@ public class HoodieAvroUtils {
}
return newRecord;
case ENUM:
- ValidationUtils.checkArgument(
- oldSchema.getType() == Schema.Type.STRING || oldSchema.getType()
== Schema.Type.ENUM,
- "Only ENUM or STRING type can be converted ENUM type");
+ if (oldSchema.getType() != Schema.Type.STRING && oldSchema.getType()
!= Schema.Type.ENUM) {
+ throw new SchemaCompatibilityException(String.format("Only ENUM or
STRING type can be converted ENUM type. Schema type was %s",
oldSchema.getType().getName()));
+ }
if (oldSchema.getType() == Schema.Type.STRING) {
return new GenericData.EnumSymbol(newSchema, oldRecord);
}
return oldRecord;
case ARRAY:
- ValidationUtils.checkArgument(oldRecord instanceof Collection, "cannot
rewrite record with different type");
+ if (!(oldRecord instanceof Collection)) {
+ throw new SchemaCompatibilityException(String.format("Cannot rewrite
%s as an array", oldRecord.getClass().getName()));
+ }
Collection array = (Collection) oldRecord;
List<Object> newArray = new ArrayList<>(array.size());
fieldNames.push("element");
@@ -981,7 +986,9 @@ public class HoodieAvroUtils {
fieldNames.pop();
return newArray;
case MAP:
- ValidationUtils.checkArgument(oldRecord instanceof Map, "cannot
rewrite record with different type");
+ if (!(oldRecord instanceof Map)) {
+ throw new SchemaCompatibilityException(String.format("Cannot rewrite
%s as a map", oldRecord.getClass().getName()));
+ }
Map<Object, Object> map = (Map<Object, Object>) oldRecord;
Map<Object, Object> newMap = new HashMap<>(map.size(), 1.0f);
fieldNames.push("value");
@@ -1029,7 +1036,7 @@ public class HoodieAvroUtils {
BigDecimal bd = new BigDecimal(new BigInteger(bytes),
decimal.getScale()).setScale(((Decimal) newSchema.getLogicalType()).getScale());
return DECIMAL_CONVERSION.toFixed(bd, newSchema,
newSchema.getLogicalType());
} else {
- throw new UnsupportedOperationException("Fixed type size change
is not currently supported");
+ throw new HoodieAvroSchemaException("Fixed type size change is
not currently supported");
}
}
@@ -1045,7 +1052,7 @@ public class HoodieAvroUtils {
}
default:
- throw new AvroRuntimeException("Unknown schema type: " +
newSchema.getType());
+ throw new HoodieAvroSchemaException("Unknown schema type: " +
newSchema.getType());
}
} else {
return rewritePrimaryTypeWithDiffSchemaType(oldValue, oldSchema,
newSchema);
@@ -1130,7 +1137,7 @@ public class HoodieAvroUtils {
break;
default:
}
- throw new AvroRuntimeException(String.format("cannot support rewrite value
for schema type: %s since the old schema type is: %s", newSchema, oldSchema));
+ throw new HoodieAvroSchemaException(String.format("cannot support rewrite
value for schema type: %s since the old schema type is: %s", newSchema,
oldSchema));
}
/**
diff --git
a/hudi-common/src/main/java/org/apache/hudi/exception/HoodieAvroSchemaException.java
b/hudi-common/src/main/java/org/apache/hudi/exception/HoodieAvroSchemaException.java
new file mode 100644
index 00000000000..c19c88c15c8
--- /dev/null
+++
b/hudi-common/src/main/java/org/apache/hudi/exception/HoodieAvroSchemaException.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.exception;
+
+/**
+ * Thrown when we detect in Hudi code that a record schema
+ * violates Avro rules. This can happen even when using Spark
+ * because we use Avro schema internally
+ */
+public class HoodieAvroSchemaException extends SchemaCompatibilityException {
+ public HoodieAvroSchemaException(String message) {
+ super(message);
+ }
+}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/exception/HoodieRecordCreationException.java
b/hudi-common/src/main/java/org/apache/hudi/exception/HoodieRecordCreationException.java
new file mode 100644
index 00000000000..dec70b369da
--- /dev/null
+++
b/hudi-common/src/main/java/org/apache/hudi/exception/HoodieRecordCreationException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.exception;
+
+/**
+ * Exception thrown during HoodieRecord construction for any failure
+ * that is not a KeyGeneration failure. An example of a failure would be if the
+ * record is malformed.
+ */
+public class HoodieRecordCreationException extends HoodieException {
+
+ public HoodieRecordCreationException(String message, Throwable t) {
+ super(message, t);
+ }
+}
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index d9ac8ae798f..98c7c5d29f5 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -51,7 +51,7 @@ import org.apache.hudi.common.util.{CommitUtils, StringUtils,
Option => HOption}
import org.apache.hudi.config.HoodieBootstrapConfig.{BASE_PATH,
INDEX_CLASS_NAME}
import
org.apache.hudi.config.HoodieWriteConfig.SPARK_SQL_MERGE_INTO_PREPPED_KEY
import org.apache.hudi.config.{HoodieCompactionConfig, HoodieInternalConfig,
HoodieWriteConfig}
-import org.apache.hudi.exception.{HoodieException,
HoodieWriteConflictException}
+import org.apache.hudi.exception.{HoodieException,
HoodieRecordCreationException, HoodieWriteConflictException}
import org.apache.hudi.hive.{HiveSyncConfigHolder, HiveSyncTool}
import org.apache.hudi.internal.schema.InternalSchema
import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter
@@ -78,6 +78,7 @@ import java.util.function.BiConsumer
import scala.collection.JavaConversions._
import scala.collection.JavaConverters.setAsJavaSetConverter
import scala.collection.mutable
+import scala.util.{Failure, Success, Try}
object HoodieSparkSqlWriter {
@@ -478,10 +479,13 @@ class HoodieSparkSqlWriterInternal {
}
instantTime = client.createNewInstantTime()
// Convert to RDD[HoodieRecord]
- val hoodieRecords =
-
HoodieCreateRecordUtils.createHoodieRecordRdd(HoodieCreateRecordUtils.createHoodieRecordRddArgs(df,
- writeConfig, parameters, avroRecordName, avroRecordNamespace,
writerSchema,
- processedDataSchema, operation, instantTime,
preppedSparkSqlWrites, preppedSparkSqlMergeInto, preppedWriteOperation))
+ val hoodieRecords =
Try(HoodieCreateRecordUtils.createHoodieRecordRdd(
+ HoodieCreateRecordUtils.createHoodieRecordRddArgs(df,
writeConfig, parameters, avroRecordName,
+ avroRecordNamespace, writerSchema, processedDataSchema,
operation, instantTime, preppedSparkSqlWrites,
+ preppedSparkSqlMergeInto, preppedWriteOperation))) match {
+ case Success(recs) => recs
+ case Failure(e) => throw new
HoodieRecordCreationException("Failed to create Hoodie Spark Record", e)
+ }
val dedupedHoodieRecords =
if (hoodieConfig.getBoolean(INSERT_DROP_DUPS) && operation !=
WriteOperationType.INSERT_OVERWRITE_TABLE && operation !=
WriteOperationType.INSERT_OVERWRITE) {
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/HoodieStreamerConfig.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/HoodieStreamerConfig.java
index b3b64cff905..e50e7fa0612 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/HoodieStreamerConfig.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/HoodieStreamerConfig.java
@@ -132,4 +132,11 @@ public class HoodieStreamerConfig extends HoodieConfig {
.sinceVersion("0.14.0")
.withDocumentation("Number of records to sample from the first write. To
improve the estimation's accuracy, "
+ "for smaller or more compressable record size, set the sample size
bigger. For bigger or less compressable record size, set smaller.");
+
+ public static final ConfigProperty<Boolean> ROW_THROW_EXPLICIT_EXCEPTIONS =
ConfigProperty
+ .key(STREAMER_CONFIG_PREFIX + "row.throw.explicit.exceptions")
+ .defaultValue(false)
+ .markAdvanced()
+ .sinceVersion("0.15.0")
+ .withDocumentation("When enabled, the dataframe generated from reading
source data is wrapped with an exception handler to explicitly surface
exceptions.");
}
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/RowSource.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/RowSource.java
index f2cc48f280c..1c7e9d99098 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/RowSource.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/RowSource.java
@@ -18,10 +18,13 @@
package org.apache.hudi.utilities.sources;
+import org.apache.hudi.HoodieSparkUtils;
import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.utilities.UtilHelpers;
+import org.apache.hudi.utilities.exception.HoodieReadFromSourceException;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.helpers.SanitizationUtils;
@@ -30,6 +33,8 @@ import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
+import static
org.apache.hudi.utilities.config.HoodieStreamerConfig.ROW_THROW_EXPLICIT_EXCEPTIONS;
+
public abstract class RowSource extends Source<Dataset<Row>> {
public RowSource(TypedProperties props, JavaSparkContext sparkContext,
SparkSession sparkSession,
@@ -46,7 +51,9 @@ public abstract class RowSource extends Source<Dataset<Row>> {
Dataset<Row> sanitizedRows =
SanitizationUtils.sanitizeColumnNamesForAvro(dsr, props);
SchemaProvider rowSchemaProvider =
UtilHelpers.createRowBasedSchemaProvider(sanitizedRows.schema(),
props, sparkContext);
- return new InputBatch<>(Option.of(sanitizedRows), res.getValue(),
rowSchemaProvider);
+ Dataset<Row> wrappedDf =
HoodieSparkUtils.maybeWrapDataFrameWithException(sanitizedRows,
HoodieReadFromSourceException.class.getName(),
+ "Failed to read from row source",
ConfigUtils.getBooleanWithAltKeys(props, ROW_THROW_EXPLICIT_EXCEPTIONS));
+ return new InputBatch<>(Option.of(wrappedDf), res.getValue(),
rowSchemaProvider);
}).orElseGet(() -> new InputBatch<>(res.getKey(), res.getValue()));
}
}
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamerUtils.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamerUtils.java
index 90315bc9764..61d7793e6ad 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamerUtils.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamerUtils.java
@@ -36,6 +36,9 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.common.util.collection.CloseableMappingIterator;
import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieKeyException;
+import org.apache.hudi.exception.HoodieKeyGeneratorException;
+import org.apache.hudi.exception.HoodieRecordCreationException;
import org.apache.hudi.keygen.BuiltinKeyGenerator;
import org.apache.hudi.keygen.KeyGenUtils;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
@@ -104,10 +107,7 @@ public class HoodieStreamerUtils {
: DataSourceUtils.createPayload(cfg.payloadClassName,
gr);
avroRecords.add(Either.left(new
HoodieAvroRecord<>(hoodieKey, payload)));
} catch (Exception e) {
- if (!shouldErrorTable) {
- throw e;
- }
- avroRecords.add(generateErrorRecord(genRec));
+ avroRecords.add(generateErrorRecordOrThrowException(genRec,
e, shouldErrorTable));
}
}
return avroRecords.iterator();
@@ -135,10 +135,7 @@ public class HoodieStreamerUtils {
return Either.left(new HoodieSparkRecord(new
HoodieKey(recordKey, partitionPath),
HoodieInternalRowUtils.getCachedUnsafeProjection(baseStructType,
targetStructType).apply(row), targetStructType, false));
} catch (Exception e) {
- if (!shouldErrorTable) {
- throw e;
- }
- return generateErrorRecord(rec);
+ return generateErrorRecordOrThrowException(rec, e,
shouldErrorTable);
}
});
@@ -159,7 +156,16 @@ public class HoodieStreamerUtils {
* @return the representation of error record (empty {@link HoodieRecord}
and the error record
* String) for writing to error table.
*/
- private static Either<HoodieRecord, String>
generateErrorRecord(GenericRecord genRec) {
+ private static Either<HoodieRecord, String>
generateErrorRecordOrThrowException(GenericRecord genRec, Exception e, boolean
shouldErrorTable) {
+ if (!shouldErrorTable) {
+ if (e instanceof HoodieKeyException) {
+ throw (HoodieKeyException) e;
+ } else if (e instanceof HoodieKeyGeneratorException) {
+ throw (HoodieKeyGeneratorException) e;
+ } else {
+ throw new HoodieRecordCreationException("Failed to create Hoodie
Record", e);
+ }
+ }
try {
return Either.right(HoodieAvroUtils.avroToJsonString(genRec, false));
} catch (Exception ex) {
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SourceFormatAdapter.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SourceFormatAdapter.java
index 1796c96dab8..c379472b26e 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SourceFormatAdapter.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SourceFormatAdapter.java
@@ -23,8 +23,10 @@ import org.apache.hudi.AvroConversionUtils;
import org.apache.hudi.HoodieSparkUtils;
import org.apache.hudi.avro.MercifulJsonConverter;
import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.SchemaCompatibilityException;
import org.apache.hudi.utilities.UtilHelpers;
import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
import org.apache.hudi.utilities.schema.SchemaProvider;
@@ -53,6 +55,7 @@ import java.util.stream.Collectors;
import scala.util.Either;
+import static
org.apache.hudi.utilities.config.HoodieStreamerConfig.ROW_THROW_EXPLICIT_EXCEPTIONS;
import static
org.apache.hudi.utilities.config.HoodieStreamerConfig.SANITIZE_SCHEMA_FIELD_NAMES;
import static
org.apache.hudi.utilities.config.HoodieStreamerConfig.SCHEMA_FIELD_NAME_INVALID_CHAR_MASK;
import static
org.apache.hudi.utilities.schema.RowBasedSchemaProvider.HOODIE_RECORD_NAMESPACE;
@@ -66,6 +69,8 @@ public class SourceFormatAdapter implements Closeable {
private final Source source;
private boolean shouldSanitize = SANITIZE_SCHEMA_FIELD_NAMES.defaultValue();
+
+ private boolean wrapWithException =
ROW_THROW_EXPLICIT_EXCEPTIONS.defaultValue();
private String invalidCharMask =
SCHEMA_FIELD_NAME_INVALID_CHAR_MASK.defaultValue();
private Option<BaseErrorTableWriter> errorTableWriter = Option.empty();
@@ -80,6 +85,7 @@ public class SourceFormatAdapter implements Closeable {
if (props.isPresent()) {
this.shouldSanitize = SanitizationUtils.shouldSanitize(props.get());
this.invalidCharMask = SanitizationUtils.getInvalidCharMask(props.get());
+ this.wrapWithException = ConfigUtils.getBooleanWithAltKeys(props.get(),
ROW_THROW_EXPLICIT_EXCEPTIONS);
}
if (this.shouldSanitize && source.getSourceType() ==
Source.SourceType.PROTO) {
throw new IllegalArgumentException("PROTO cannot be sanitized");
@@ -244,7 +250,8 @@ public class SourceFormatAdapter implements Closeable {
StructType dataType =
AvroConversionUtils.convertAvroSchemaToStructType(sourceSchema);
return new InputBatch<>(
Option.ofNullable(
- r.getBatch().map(rdd ->
source.getSparkSession().read().schema(dataType).json(rdd)).orElse(null)),
+ r.getBatch().map(rdd ->
HoodieSparkUtils.maybeWrapDataFrameWithException(source.getSparkSession().read().schema(dataType).json(rdd),
+ SchemaCompatibilityException.class.getName(), "Schema
does not match json data", wrapWithException)).orElse(null)),
r.getCheckpointForNextBatch(), r.getSchemaProvider());
}
}
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroDFSSource.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroDFSSource.java
index 5ccf9ad2b29..808a4ca57ce 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroDFSSource.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroDFSSource.java
@@ -39,8 +39,7 @@ public class TestAvroDFSSource extends
AbstractDFSSourceTestBase {
}
@Override
- protected Source prepareDFSSource() {
- TypedProperties props = new TypedProperties();
+ protected Source prepareDFSSource(TypedProperties props) {
props.setProperty("hoodie.streamer.source.dfs.root", dfsRoot);
try {
return new AvroDFSSource(props, jsc, sparkSession, schemaProvider);
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestCsvDFSSource.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestCsvDFSSource.java
index 6a2bbcd0136..c4bb59ff812 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestCsvDFSSource.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestCsvDFSSource.java
@@ -46,8 +46,7 @@ public class TestCsvDFSSource extends
AbstractDFSSourceTestBase {
}
@Override
- public Source prepareDFSSource() {
- TypedProperties props = new TypedProperties();
+ public Source prepareDFSSource(TypedProperties props) {
props.setProperty("hoodie.streamer.source.dfs.root", dfsRoot);
props.setProperty("hoodie.streamer.csv.header", Boolean.toString(true));
props.setProperty("hoodie.streamer.csv.sep", "\t");
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonDFSSource.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonDFSSource.java
index 24a341fe9c3..ae134e862be 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonDFSSource.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonDFSSource.java
@@ -20,15 +20,29 @@ package org.apache.hudi.utilities.sources;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.SchemaCompatibilityException;
+import org.apache.hudi.utilities.config.HoodieStreamerConfig;
+import org.apache.hudi.utilities.streamer.SourceFormatAdapter;
import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
import org.apache.hudi.utilities.testutils.sources.AbstractDFSSourceTestBase;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
import java.io.IOException;
+import java.io.PrintStream;
import java.util.List;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
/**
* Basic tests for {@link JsonDFSSource}.
*/
@@ -42,8 +56,7 @@ public class TestJsonDFSSource extends AbstractDFSSourceTestBase {
}
+  /**
+   * Creates a {@link JsonDFSSource} that reads from {@code dfsRoot}.
+   *
+   * <p>Note: {@code hoodie.streamer.source.dfs.root} is set directly on the supplied {@code props},
+   * so the caller's instance is mutated and can be reused afterwards with that key present.
+   */
@Override
- public Source prepareDFSSource() {
- TypedProperties props = new TypedProperties();
+ public Source prepareDFSSource(TypedProperties props) {
props.setProperty("hoodie.streamer.source.dfs.root", dfsRoot);
return new JsonDFSSource(props, jsc, sparkSession, schemaProvider);
}
@@ -53,4 +66,36 @@ public class TestJsonDFSSource extends AbstractDFSSourceTestBase {
UtilitiesTestBase.Helpers.saveStringsToDFS(
Helpers.jsonifyRecords(records), fs, path.toString());
}
+
+  /**
+   * Verifies that, with {@code ROW_THROW_EXPLICIT_EXCEPTIONS} enabled, reading a batch whose source
+   * file was corrupted after the batch was fetched fails with a {@link SchemaCompatibilityException}
+   * somewhere in the thrown exception's cause chain.
+   */
+  @Test
+  public void testCorruptedSourceFile() throws IOException {
+    fs.mkdirs(new Path(dfsRoot));
+    TypedProperties props = new TypedProperties();
+    props.setProperty(HoodieStreamerConfig.ROW_THROW_EXPLICIT_EXCEPTIONS.key(), "true");
+    SourceFormatAdapter sourceFormatAdapter =
+        new SourceFormatAdapter(prepareDFSSource(props), Option.empty(), Option.of(props));
+    generateOneFile("1", "000", 10);
+    generateOneFile("2", "000", 10);
+    RemoteIterator<LocatedFileStatus> files = fs.listFiles(generateOneFile("3", "000", 10), true);
+
+    FileStatus fileToCorrupt = files.next();
+    InputBatch<Dataset<Row>> batch = sourceFormatAdapter.fetchNewDataInRowFormat(Option.empty(), Long.MAX_VALUE);
+    // The file is corrupted only after the batch was fetched; this presumably relies on the Dataset
+    // being evaluated lazily, so the corrupted line is read when show() runs below.
+    corruptFile(fileToCorrupt.getPath());
+    assertTrue(batch.getBatch().isPresent());
+    Throwable thrown = assertThrows(Exception.class,
+        () -> batch.getBatch().get().show(30));
+    for (Throwable cause = thrown; cause != null; cause = cause.getCause()) {
+      if (cause instanceof SchemaCompatibilityException) {
+        return;
+      }
+    }
+    // Attach the originally thrown exception. The loop variable is always null at this point, so
+    // passing it would drop the evidence of what actually failed.
+    throw new AssertionError("Exception does not have SchemaCompatibility in its trace", thrown);
+  }
+
+  /**
+   * Appends a line containing a single non-JSON character to the file at {@code path}, so the file
+   * is no longer valid JSON-lines input.
+   *
+   * @param path file to corrupt
+   * @throws IOException if the file cannot be opened for append or writing the line fails
+   */
+  protected void corruptFile(Path path) throws IOException {
+    // Explicit UTF-8 so the bytes written do not depend on the platform default charset;
+    // try-with-resources guarantees the append stream is closed even if writing fails.
+    try (PrintStream os = new PrintStream(fs.appendFile(path).build(), false, "UTF-8")) {
+      os.println("🤷");
+      // PrintStream never throws IOException itself; checkError() flushes and reports whether any
+      // write failed, so a failed corruption cannot go unnoticed.
+      if (os.checkError()) {
+        throw new IOException("Failed to append corrupting data to " + path);
+      }
+    }
+  }
}
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestParquetDFSSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestParquetDFSSource.java
index 159ababcf47..a9c448748c9 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestParquetDFSSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestParquetDFSSource.java
@@ -41,8 +41,7 @@ public class TestParquetDFSSource extends AbstractDFSSourceTestBase {
}
+  /**
+   * Creates a {@link ParquetDFSSource} that reads from {@code dfsRoot}.
+   *
+   * <p>Note: {@code hoodie.streamer.source.dfs.root} is set directly on the supplied {@code props},
+   * so the caller's instance is mutated.
+   */
@Override
- public Source prepareDFSSource() {
- TypedProperties props = new TypedProperties();
+ public Source prepareDFSSource(TypedProperties props) {
props.setProperty("hoodie.streamer.source.dfs.root", dfsRoot);
return new ParquetDFSSource(props, jsc, sparkSession, schemaProvider);
}
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractDFSSourceTestBase.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractDFSSourceTestBase.java
index 0de087ece73..76a1a645367 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractDFSSourceTestBase.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractDFSSourceTestBase.java
@@ -19,6 +19,7 @@
package org.apache.hudi.utilities.testutils.sources;
import org.apache.hudi.AvroConversionUtils;
+import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.util.Option;
@@ -74,7 +75,11 @@ public abstract class AbstractDFSSourceTestBase extends UtilitiesTestBase {
*
* @return A {@link Source} using DFS as the file system.
*/
- protected abstract Source prepareDFSSource();
+ protected final Source prepareDFSSource() {
+    // Convenience overload for tests that need no extra configuration: delegates to the
+    // props-based variant with a fresh, empty TypedProperties.
return prepareDFSSource(new TypedProperties());
}
+
+  /**
+   * Prepares the DFS-backed {@link Source} under test, configured with the given properties.
+   *
+   * @param props properties used to construct the source; implementations may add their own
+   *              source-specific keys to this instance, so callers should not rely on it being
+   *              left unchanged.
+   * @return A {@link Source} using DFS as the file system.
+   */
+ protected abstract Source prepareDFSSource(TypedProperties props);
/**
* Writes test data, i.e., a {@link List} of {@link HoodieRecord}, to a file
on DFS.