This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new b6490c160bf [HUDI-5813] Error table support for error events (#7982)
b6490c160bf is described below

commit b6490c160bf7644607ead00da60c4eda3ec07773
Author: Lokesh Jain <[email protected]>
AuthorDate: Tue Feb 28 18:58:28 2023 +0530

    [HUDI-5813] Error table support for error events (#7982)
    
    Add support for capturing source conversion error events to a configured 
error table.
    
    -  Track error events in `SourceFormatAdapter`
    -  Track error events from `WriteStatus`
    -  Added new WriteClient for error table `BaseErrorTableWriter`
    -  If the commit to the error table fails, then throw an error so that the delta sync fails.
---
 .../apache/hudi/config/HoodieErrorTableConfig.java | 100 +++++++++++++++++
 .../org/apache/hudi/AvroConversionUtils.scala      |   1 +
 .../scala/org/apache/hudi/HoodieSparkUtils.scala   |  79 +++++++++++++-
 .../java/org/apache/hudi/avro/HoodieAvroUtils.java |  63 +++++++++--
 .../deltastreamer/BaseErrorTableWriter.java        |  70 ++++++++++++
 .../hudi/utilities/deltastreamer/DeltaSync.java    |  95 +++++++++++++---
 .../hudi/utilities/deltastreamer/ErrorEvent.java   |  58 ++++++++++
 .../utilities/deltastreamer/ErrorTableUtils.java   |  66 ++++++++++++
 .../deltastreamer/SourceFormatAdapter.java         | 111 +++++++++++++++----
 .../utilities/sources/helpers/AvroConvertor.java   |  14 +++
 .../utilities/sources/TestGenericRddTransform.java |  57 ++++++++++
 .../utilities/sources/TestJsonKafkaSource.java     | 120 +++++++++++++++++++++
 12 files changed, 791 insertions(+), 43 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieErrorTableConfig.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieErrorTableConfig.java
new file mode 100644
index 00000000000..68e2097c33b
--- /dev/null
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieErrorTableConfig.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.config;
+
+import org.apache.hudi.common.config.ConfigClassProperty;
+import org.apache.hudi.common.config.ConfigGroups;
+import org.apache.hudi.common.config.ConfigProperty;
+
+import javax.annotation.concurrent.Immutable;
+
+import java.util.Arrays;
+
+@Immutable
+@ConfigClassProperty(name = "Error table Configs",
+    groupName = ConfigGroups.Names.WRITE_CLIENT,
+    description = "Configurations that are required for Error table configs")
+public class HoodieErrorTableConfig {
+  public static final ConfigProperty<Boolean> ERROR_TABLE_ENABLED = 
ConfigProperty
+      .key("hoodie.errortable.enable")
+      .defaultValue(false)
+      .withDocumentation("Config to enable error table. If the config is 
enabled, "
+          + "all the records with processing error in DeltaStreamer are 
transferred to error table.");
+
+  public static final ConfigProperty<String> ERROR_TABLE_BASE_PATH = 
ConfigProperty
+      .key("hoodie.errortable.base.path")
+      .noDefaultValue()
+      .withDocumentation("Base path for error table under which all error 
records "
+          + "would be stored.");
+
+  public static final ConfigProperty<String> ERROR_TARGET_TABLE = 
ConfigProperty
+      .key("hoodie.errortable.target.table.name")
+      .noDefaultValue()
+      .withDocumentation("Table name to be used for the error table");
+
+  public static final ConfigProperty<Integer> 
ERROR_TABLE_UPSERT_PARALLELISM_VALUE = ConfigProperty
+      .key("hoodie.errortable.upsert.shuffle.parallelism")
+      .defaultValue(200)
+      .withDocumentation("Config to set upsert shuffle parallelism. The config 
is similar to "
+          + "hoodie.upsert.shuffle.parallelism config but applies to the error 
table.");
+
+  public static final ConfigProperty<Integer> 
ERROR_TABLE_INSERT_PARALLELISM_VALUE = ConfigProperty
+      .key("hoodie.errortable.insert.shuffle.parallelism")
+      .defaultValue(200)
+      .withDocumentation("Config to set insert shuffle parallelism. The config 
is similar to "
+          + "hoodie.insert.shuffle.parallelism config but applies to the error 
table.");
+
+  public static final ConfigProperty<String> ERROR_TABLE_WRITE_CLASS = 
ConfigProperty
+      .key("hoodie.errortable.write.class")
+      .noDefaultValue()
+      .withDocumentation("Class which handles the error table writes. This 
config is used to configure "
+          + "a custom implementation for Error Table Writer. Specify the full 
class name of the custom "
+          + "error table writer as a value for this config");
+
+  public static final ConfigProperty<Boolean> 
ERROR_ENABLE_VALIDATE_TARGET_SCHEMA = ConfigProperty
+      .key("hoodie.errortable.validate.targetschema.enable")
+      .defaultValue(false)
+      .withDocumentation("Records with schema mismatch with Target Schema are 
sent to Error Table.");
+
+  public static final ConfigProperty<String> 
ERROR_TABLE_WRITE_FAILURE_STRATEGY = ConfigProperty
+      .key("hoodie.errortable.write.failure.strategy")
+      .defaultValue(ErrorWriteFailureStrategy.ROLLBACK_COMMIT.name())
+      .withDocumentation("The config specifies the failure strategy if error 
table write fails. "
+          + "Use one of - " + 
Arrays.toString(ErrorWriteFailureStrategy.values()));
+
+  public enum ErrorWriteFailureStrategy {
+    ROLLBACK_COMMIT("Rollback the corresponding base table write commit for 
which the error events were triggered"),
+    LOG_ERROR("Error is logged but the base table write succeeds");
+
+    private final String description;
+
+    ErrorWriteFailureStrategy(String description) {
+      this.description = description;
+    }
+
+    public String getDescription() {
+      return description;
+    }
+
+    @Override
+    public String toString() {
+      return super.toString() + " (" + description + ")\n";
+    }
+  }
+}
diff --git 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
index 9f5d6fd7afe..8e44dc4b1fe 100644
--- 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
+++ 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
@@ -24,6 +24,7 @@ import org.apache.hudi.HoodieSparkUtils.sparkAdapter
 import org.apache.hudi.avro.AvroSchemaUtils
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.RowEncoder
 import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructType}
 import org.apache.spark.sql.{Dataset, Row, SparkSession}
 
diff --git 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
index dec0eb5805c..ba9dcaccce3 100644
--- 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
+++ 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
@@ -26,7 +26,7 @@ import org.apache.hudi.client.utils.SparkRowSerDe
 import org.apache.hudi.common.model.HoodieRecord
 import org.apache.spark.SPARK_VERSION
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.{DataFrame}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.SQLConfInjectingRDD
 import org.apache.spark.sql.internal.SQLConf
@@ -116,10 +116,79 @@ object HoodieSparkUtils extends SparkAdapterSupport with 
SparkVersionsSupport {
     }, SQLConf.get)
   }
 
-  def getCatalystRowSerDe(structType: StructType) : SparkRowSerDe = {
-    sparkAdapter.createSparkRowSerDe(structType)
-  }
-
   private def injectSQLConf[T: ClassTag](rdd: RDD[T], conf: SQLConf): RDD[T] =
     new SQLConfInjectingRDD(rdd, conf)
+
+  def safeCreateRDD(df: DataFrame, structName: String, recordNamespace: 
String, reconcileToLatestSchema: Boolean,
+                    latestTableSchema: 
org.apache.hudi.common.util.Option[Schema] = 
org.apache.hudi.common.util.Option.empty()):
+  Tuple2[RDD[GenericRecord], RDD[String]] = {
+    var latestTableSchemaConverted: Option[Schema] = None
+
+    if (latestTableSchema.isPresent && reconcileToLatestSchema) {
+      latestTableSchemaConverted = Some(latestTableSchema.get())
+    } else {
+      // cases when users want to use latestTableSchema but have not turned on 
reconcileToLatestSchema explicitly
+      // for example, when using a Transformer implementation to transform 
source RDD to target RDD
+      latestTableSchemaConverted = if (latestTableSchema.isPresent) 
Some(latestTableSchema.get()) else None
+    }
+    safeCreateRDD(df, structName, recordNamespace, latestTableSchemaConverted);
+  }
+
+  def safeCreateRDD(df: DataFrame, structName: String, recordNamespace: 
String, readerAvroSchemaOpt: Option[Schema]):
+  Tuple2[RDD[GenericRecord], RDD[String]] = {
+    val writerSchema = df.schema
+    val writerAvroSchema = 
AvroConversionUtils.convertStructTypeToAvroSchema(writerSchema, structName, 
recordNamespace)
+    val readerAvroSchema = readerAvroSchemaOpt.getOrElse(writerAvroSchema)
+    // We check whether passed in reader schema is identical to writer schema 
to avoid costly serde loop of
+    // making Spark deserialize its internal representation [[InternalRow]] 
into [[Row]] for subsequent conversion
+    // (and back)
+    val sameSchema = writerAvroSchema.equals(readerAvroSchema)
+    val nullable = AvroSchemaUtils.resolveNullableSchema(writerAvroSchema) != 
writerAvroSchema
+
+    // NOTE: We have to serialize Avro schema, and then subsequently parse it 
on the executor node, since Spark
+    //       serializer is not able to digest it
+    val writerAvroSchemaStr = writerAvroSchema.toString
+    val readerAvroSchemaStr = readerAvroSchema.toString
+    // NOTE: We're accessing toRdd here directly to avoid [[InternalRow]] to 
[[Row]] conversion
+
+    if (!sameSchema) {
+      val rdds: RDD[Either[GenericRecord, InternalRow]] = 
df.queryExecution.toRdd.mapPartitions { rows =>
+        if (rows.isEmpty) {
+          Iterator.empty
+        } else {
+          val writerAvroSchema = new Schema.Parser().parse(writerAvroSchemaStr)
+          val readerAvroSchema = new Schema.Parser().parse(readerAvroSchemaStr)
+          val convert = 
AvroConversionUtils.createInternalRowToAvroConverter(writerSchema, 
writerAvroSchema, nullable = nullable)
+          val transform: InternalRow => Either[GenericRecord, InternalRow] = 
internalRow => try {
+            Left(HoodieAvroUtils.rewriteRecordDeep(convert(internalRow), 
readerAvroSchema, true))
+          } catch {
+            case _: Throwable =>
+              Right(internalRow)
+          }
+          rows.map(transform)
+        }
+      }
+
+      val rowDeserializer = getCatalystRowSerDe(writerSchema)
+      val errorRDD = df.sparkSession.createDataFrame(
+        rdds.filter(_.isRight).map(_.right.get).map(ir => 
rowDeserializer.deserializeRow(ir)), writerSchema)
+
+      // going to follow up on improving performance of separating out events
+      (rdds.filter(_.isLeft).map(_.left.get), errorRDD.toJSON.rdd)
+    } else {
+      val rdd = df.queryExecution.toRdd.mapPartitions { rows =>
+        if (rows.isEmpty) {
+          Iterator.empty
+        } else {
+          val convert = 
AvroConversionUtils.createInternalRowToAvroConverter(writerSchema, 
writerAvroSchema, nullable = nullable)
+          rows.map(convert)
+        }
+      }
+      (rdd, df.sparkSession.sparkContext.emptyRDD[String])
+    }
+  }
+
+  def getCatalystRowSerDe(structType: StructType): SparkRowSerDe = {
+    sparkAdapter.createSparkRowSerDe(structType)
+  }
 }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java 
b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
index a4157debdbf..aa19fac10b2 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
@@ -74,6 +74,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.Deque;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -787,7 +788,12 @@ public class HoodieAvroUtils {
    * @return newRecord for new Schema
    */
   public static GenericRecord rewriteRecordWithNewSchema(IndexedRecord 
oldRecord, Schema newSchema, Map<String, String> renameCols) {
-    Object newRecord = rewriteRecordWithNewSchema(oldRecord, 
oldRecord.getSchema(), newSchema, renameCols, new LinkedList<>());
+    Object newRecord = rewriteRecordWithNewSchema(oldRecord, 
oldRecord.getSchema(), newSchema, renameCols, new LinkedList<>(),false);
+    return (GenericData.Record) newRecord;
+  }
+
+  public static GenericRecord rewriteRecordWithNewSchema(IndexedRecord 
oldRecord, Schema newSchema, Map<String, String> renameCols, boolean validate) {
+    Object newRecord = rewriteRecordWithNewSchema(oldRecord, 
oldRecord.getSchema(), newSchema, renameCols, new LinkedList<>(), validate);
     return (GenericData.Record) newRecord;
   }
 
@@ -805,12 +811,22 @@ public class HoodieAvroUtils {
    * @param fieldNames track the full name of visited field when we travel new 
schema.
    * @return newRecord for new Schema
    */
-  private static Object rewriteRecordWithNewSchema(Object oldRecord, Schema 
oldAvroSchema, Schema newSchema, Map<String, String> renameCols, Deque<String> 
fieldNames) {
+
+  private static Object rewriteRecordWithNewSchema(Object oldRecord, Schema 
oldAvroSchema, Schema newSchema, Map<String, String> renameCols, Deque<String> 
fieldNames, boolean validate) {
     if (oldRecord == null) {
       return null;
     }
     // try to get real schema for union type
     Schema oldSchema = getActualSchemaFromUnion(oldAvroSchema, oldRecord);
+    Object newRecord = rewriteRecordWithNewSchemaInternal(oldRecord, 
oldSchema, newSchema, renameCols, fieldNames, validate);
+    if (validate && !ConvertingGenericData.INSTANCE.validate(newSchema, 
newRecord)) {
+      throw new SchemaCompatibilityException(
+          "Unable to validate the rewritten record " + oldRecord + " against 
schema " + newSchema);
+    }
+    return newRecord;
+  }
+
+  private static Object rewriteRecordWithNewSchemaInternal(Object oldRecord, 
Schema oldSchema, Schema newSchema, Map<String, String> renameCols, 
Deque<String> fieldNames, boolean validate) {
     switch (newSchema.getType()) {
       case RECORD:
         ValidationUtils.checkArgument(oldRecord instanceof IndexedRecord, 
"cannot rewrite record with different type");
@@ -823,7 +839,7 @@ public class HoodieAvroUtils {
           fieldNames.push(fieldName);
           if (oldSchema.getField(field.name()) != null && 
!renameCols.containsKey(field.name())) {
             Schema.Field oldField = oldSchema.getField(field.name());
-            newRecord.put(i, 
rewriteRecordWithNewSchema(indexedRecord.get(oldField.pos()), 
oldField.schema(), fields.get(i).schema(), renameCols, fieldNames));
+            newRecord.put(i, 
rewriteRecordWithNewSchema(indexedRecord.get(oldField.pos()), 
oldField.schema(), fields.get(i).schema(), renameCols, fieldNames, validate));
           } else {
             String fieldFullName = createFullName(fieldNames);
             String fieldNameFromOldSchema = 
renameCols.getOrDefault(fieldFullName, "");
@@ -831,7 +847,7 @@ public class HoodieAvroUtils {
             if (oldSchema.getField(fieldNameFromOldSchema) != null) {
               // find rename
               Schema.Field oldField = 
oldSchema.getField(fieldNameFromOldSchema);
-              newRecord.put(i, 
rewriteRecordWithNewSchema(indexedRecord.get(oldField.pos()), 
oldField.schema(), fields.get(i).schema(), renameCols, fieldNames));
+              newRecord.put(i, 
rewriteRecordWithNewSchema(indexedRecord.get(oldField.pos()), 
oldField.schema(), fields.get(i).schema(), renameCols, fieldNames, validate));
             } else {
               // deal with default value
               if (fields.get(i).defaultVal() instanceof JsonProperties.Null) {
@@ -850,7 +866,7 @@ public class HoodieAvroUtils {
         List<Object> newArray = new ArrayList();
         fieldNames.push("element");
         for (Object element : array) {
-          newArray.add(rewriteRecordWithNewSchema(element, 
oldSchema.getElementType(), newSchema.getElementType(), renameCols, 
fieldNames));
+          newArray.add(rewriteRecordWithNewSchema(element, 
oldSchema.getElementType(), newSchema.getElementType(), renameCols, fieldNames, 
validate));
         }
         fieldNames.pop();
         return newArray;
@@ -860,12 +876,12 @@ public class HoodieAvroUtils {
         Map<Object, Object> newMap = new HashMap<>();
         fieldNames.push("value");
         for (Map.Entry<Object, Object> entry : map.entrySet()) {
-          newMap.put(entry.getKey(), 
rewriteRecordWithNewSchema(entry.getValue(), oldSchema.getValueType(), 
newSchema.getValueType(), renameCols, fieldNames));
+          newMap.put(entry.getKey(), 
rewriteRecordWithNewSchema(entry.getValue(), oldSchema.getValueType(), 
newSchema.getValueType(), renameCols, fieldNames, validate));
         }
         fieldNames.pop();
         return newMap;
       case UNION:
-        return rewriteRecordWithNewSchema(oldRecord, 
getActualSchemaFromUnion(oldSchema, oldRecord), 
getActualSchemaFromUnion(newSchema, oldRecord), renameCols, fieldNames);
+        return rewriteRecordWithNewSchema(oldRecord, 
getActualSchemaFromUnion(oldSchema, oldRecord), 
getActualSchemaFromUnion(newSchema, oldRecord), renameCols, fieldNames, 
validate);
       default:
         return rewritePrimaryType(oldRecord, oldSchema, newSchema);
     }
@@ -1083,10 +1099,43 @@ public class HoodieAvroUtils {
     }
   }
 
+  /**
+   * Given Avro records, rewrites them with the new schema.
+   *
+   * @param oldRecords old records to be rewritten
+   * @param newSchema new schema used to rewrite the old records
+   * @param renameCols a map storing all renamed columns, (k, v) -> (colNameFromNewSchema, colNameFromOldSchema)
+   * @return an iterator of rewritten GenericRecords
+   */
+  public static Iterator<GenericRecord> 
rewriteRecordWithNewSchema(Iterator<GenericRecord> oldRecords, Schema 
newSchema, Map<String, String> renameCols, boolean validate) {
+    if (oldRecords == null || newSchema == null) {
+      return Collections.emptyIterator();
+    }
+    return new Iterator<GenericRecord>() {
+      @Override
+      public boolean hasNext() {
+        return oldRecords.hasNext();
+      }
+
+      @Override
+      public GenericRecord next() {
+        return rewriteRecordWithNewSchema(oldRecords.next(), newSchema, 
renameCols, validate);
+      }
+    };
+  }
+
+  public static Iterator<GenericRecord> 
rewriteRecordWithNewSchema(Iterator<GenericRecord> oldRecords, Schema 
newSchema, Map<String, String> renameCols) {
+    return rewriteRecordWithNewSchema(oldRecords, newSchema, 
Collections.EMPTY_MAP, false);
+  }
+
   public static GenericRecord rewriteRecordDeep(GenericRecord oldRecord, 
Schema newSchema) {
     return rewriteRecordWithNewSchema(oldRecord, newSchema, 
Collections.EMPTY_MAP);
   }
 
+  public static GenericRecord rewriteRecordDeep(GenericRecord oldRecord, 
Schema newSchema, boolean validate) {
+    return rewriteRecordWithNewSchema(oldRecord, newSchema, 
Collections.EMPTY_MAP, validate);
+  }
+
   public static boolean gteqAvro1_9() {
     return VersionUtil.compareVersions(AVRO_VERSION, "1.9") >= 0;
   }
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/BaseErrorTableWriter.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/BaseErrorTableWriter.java
new file mode 100644
index 00000000000..fea6bdb3cf3
--- /dev/null
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/BaseErrorTableWriter.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.deltastreamer;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.VisibleForTesting;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.SparkSession;
+
+/**
+ * The class which handles error events while processing write records. All the
+ * records which have a processing/write failure are triggered as error events 
to
+ * BaseErrorTableWriter. The implementation of BaseErrorTableWriter processes
+ * these error events through addErrorEvents API and commits them to the error 
table when
+ * upsertAndCommit API is called.
+ *
+ * The writer can use the configs defined in HoodieErrorTableConfig to manage 
the error table.
+ */
+public abstract class BaseErrorTableWriter<T extends ErrorEvent> {
+
+  // The column name passed to Spark for option `columnNameOfCorruptRecord`. 
The record
+  // is set to this column in case of an error
+  public static String ERROR_TABLE_CURRUPT_RECORD_COL_NAME = "_corrupt_record";
+
+  public BaseErrorTableWriter(HoodieDeltaStreamer.Config cfg, SparkSession 
sparkSession,
+                                   TypedProperties props, JavaSparkContext 
jssc, FileSystem fs) {
+  }
+
+  /**
+   * Processes input error events. These error events will be committed later through the
+   * upsertAndCommit API call.
+   *
+   * @param errorEvent Input error event RDD
+   */
+  public abstract void addErrorEvents(JavaRDD<T> errorEvent);
+
+  /**
+   * Fetches the error events RDD processed by the writer so far. This is a 
test API.
+   */
+  @VisibleForTesting
+  public abstract Option<JavaRDD<HoodieAvroRecord>> getErrorEvents(String 
baseTableInstantTime, Option<String> commitedInstantTime);
+
+  /**
+   * This API is called to commit the error events (failed Hoodie Records) processed by the writer so far.
+   * These records are committed to an error table.
+   */
+  public abstract boolean upsertAndCommit(String baseTableInstantTime, 
Option<String> commitedInstantTime);
+
+}
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index b25631bb179..07ba73b528a 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -70,6 +70,7 @@ import org.apache.hudi.config.HoodieClusteringConfig;
 import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieIndexConfig;
 import org.apache.hudi.config.HoodiePayloadConfig;
+import org.apache.hudi.config.HoodieErrorTableConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
@@ -103,6 +104,7 @@ import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.rdd.RDD;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.HoodieInternalRowUtils;
 import org.apache.spark.sql.Row;
@@ -129,12 +131,14 @@ import java.util.function.BiConsumer;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
+import scala.Tuple2;
 import static org.apache.hudi.avro.AvroSchemaUtils.getAvroRecordQualifiedName;
 import static org.apache.hudi.common.table.HoodieTableConfig.ARCHIVELOG_FOLDER;
 import static 
org.apache.hudi.common.table.HoodieTableConfig.DROP_PARTITION_COLUMNS;
 import static 
org.apache.hudi.config.HoodieClusteringConfig.ASYNC_CLUSTERING_ENABLE;
 import static org.apache.hudi.config.HoodieClusteringConfig.INLINE_CLUSTERING;
 import static org.apache.hudi.config.HoodieCompactionConfig.INLINE_COMPACT;
+import static 
org.apache.hudi.config.HoodieErrorTableConfig.ERROR_TABLE_ENABLED;
 import static org.apache.hudi.config.HoodieWriteConfig.AUTO_COMMIT_ENABLE;
 import static org.apache.hudi.config.HoodieWriteConfig.COMBINE_BEFORE_INSERT;
 import static org.apache.hudi.config.HoodieWriteConfig.COMBINE_BEFORE_UPSERT;
@@ -242,6 +246,9 @@ public class DeltaSync implements Serializable, Closeable {
    */
   private transient SparkRDDWriteClient writeClient;
 
+  private Option<BaseErrorTableWriter> errorTableWriter = Option.empty();
+  private HoodieErrorTableConfig.ErrorWriteFailureStrategy 
errorWriteFailureStrategy;
+
   private transient HoodieDeltaStreamerMetrics metrics;
 
   private transient HoodieMetrics hoodieMetrics;
@@ -279,15 +286,19 @@ public class DeltaSync implements Serializable, Closeable 
{
 
     this.metrics = new 
HoodieDeltaStreamerMetrics(getHoodieClientConfig(this.schemaProvider));
     this.hoodieMetrics = new 
HoodieMetrics(getHoodieClientConfig(this.schemaProvider));
-
-    this.formatAdapter = new SourceFormatAdapter(
-        UtilHelpers.createSource(cfg.sourceClassName, props, jssc, 
sparkSession, schemaProvider, metrics));
     this.conf = conf;
     String id = conf.get(MUTLI_WRITER_SOURCE_CHECKPOINT_ID.key());
     if (StringUtils.isNullOrEmpty(id)) {
       id = props.getProperty(MUTLI_WRITER_SOURCE_CHECKPOINT_ID.key());
     }
     this.multiwriterIdentifier = StringUtils.isNullOrEmpty(id) ? 
Option.empty() : Option.of(id);
+    if 
(props.getBoolean(ERROR_TABLE_ENABLED.key(),ERROR_TABLE_ENABLED.defaultValue()))
 {
+      this.errorTableWriter = ErrorTableUtils.getErrorTableWriter(cfg, 
sparkSession, props, jssc, fs);
+      this.errorWriteFailureStrategy = 
ErrorTableUtils.getErrorWriteFailureStrategy(props);
+    }
+    this.formatAdapter = new SourceFormatAdapter(
+        UtilHelpers.createSource(cfg.sourceClassName, props, jssc, 
sparkSession, schemaProvider, metrics),
+        this.errorTableWriter);
   }
 
   /**
@@ -513,9 +524,34 @@ public class DeltaSync implements Serializable, Closeable {
       Option<Dataset<Row>> transformed =
           dataAndCheckpoint.getBatch().map(data -> 
transformer.get().apply(jssc, sparkSession, data, props));
 
+      transformed = formatAdapter.processErrorEvents(transformed,
+          ErrorEvent.ErrorReason.CUSTOM_TRANSFORMER_FAILURE);
+
       checkpointStr = dataAndCheckpoint.getCheckpointForNextBatch();
       boolean reconcileSchema = 
props.getBoolean(DataSourceWriteOptions.RECONCILE_SCHEMA().key());
       if (this.userProvidedSchemaProvider != null && 
this.userProvidedSchemaProvider.getTargetSchema() != null) {
+        // If the target schema is specified through Avro schema,
+        // pass in the schema for the Row-to-Avro conversion
+        // to avoid nullability mismatch between Avro schema and Row schema
+        if (errorTableWriter.isPresent()
+            && 
props.getBoolean(HoodieErrorTableConfig.ERROR_ENABLE_VALIDATE_TARGET_SCHEMA.key(),
+            
HoodieErrorTableConfig.ERROR_ENABLE_VALIDATE_TARGET_SCHEMA.defaultValue())) {
+          // If the above conditions are met, trigger error events for the 
rows whose conversion to
+          // avro records fails.
+          avroRDDOptional = transformed.map(
+              rowDataset -> {
+                Tuple2<RDD<GenericRecord>, RDD<String>> safeCreateRDDs = 
HoodieSparkUtils.safeCreateRDD(rowDataset,
+                    HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE, 
reconcileSchema,
+                    
Option.of(this.userProvidedSchemaProvider.getTargetSchema()));
+                
errorTableWriter.get().addErrorEvents(safeCreateRDDs._2().toJavaRDD()
+                    .map(evStr -> new ErrorEvent<>(evStr,
+                        ErrorEvent.ErrorReason.AVRO_DESERIALIZATION_FAILURE)));
+                return safeCreateRDDs._1.toJavaRDD();
+              });
+        } else {
+          avroRDDOptional = transformed.map(
+              rowDataset -> getTransformedRDD(rowDataset, reconcileSchema, 
this.userProvidedSchemaProvider.getTargetSchema()));
+        }
         schemaProvider = this.userProvidedSchemaProvider;
       } else {
         Option<Schema> latestTableSchemaOpt = 
UtilHelpers.getLatestTableSchema(jssc, fs, cfg.targetBasePath);
@@ -537,12 +573,9 @@ public class DeltaSync implements Serializable, Closeable {
           (SchemaProvider) new DelegatingSchemaProvider(props, jssc, 
dataAndCheckpoint.getSchemaProvider(),
                 new SimpleSchemaProvider(jssc, targetSchema, props)))
           .orElse(dataAndCheckpoint.getSchemaProvider());
+        // Rewrite transformed records into the expected target schema
+        avroRDDOptional = transformed.map(t -> getTransformedRDD(t, 
reconcileSchema, schemaProvider.getTargetSchema()));
       }
-
-      // Rewrite transformed records into the expected target schema
-      avroRDDOptional =
-          transformed.map(t -> HoodieSparkUtils.createRdd(t, 
HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE,
-              reconcileSchema, 
Option.of(schemaProvider.getTargetSchema())).toJavaRDD());
     } else {
       // Pull the data from the source & prepare the write
       InputBatch<JavaRDD<GenericRecord>> dataAndCheckpoint =
@@ -607,6 +640,11 @@ public class DeltaSync implements Serializable, Closeable {
     return Pair.of(schemaProvider, Pair.of(checkpointStr, records));
   }
 
+  private JavaRDD<GenericRecord> getTransformedRDD(Dataset<Row> rowDataset, 
boolean reconcileSchema, Schema readerSchema) {
+    return HoodieSparkUtils.createRdd(rowDataset, HOODIE_RECORD_STRUCT_NAME, 
HOODIE_RECORD_NAMESPACE, reconcileSchema,
+        Option.ofNullable(readerSchema)).toJavaRDD();
+  }
+
   /**
    * Process previous commit metadata and checkpoint configs set by user to 
determine the checkpoint to resume from.
    * @param commitTimelineOpt commit timeline of interest.
@@ -661,13 +699,13 @@ public class DeltaSync implements Serializable, Closeable 
{
     }
   }
 
-  public Option<HoodieCommitMetadata> 
getLatestCommitMetadataWithValidCheckpointInfo(HoodieTimeline timeline) throws 
IOException {
-    return (Option<HoodieCommitMetadata>) 
timeline.getReverseOrderedInstants().map(instant -> {
+  protected Option<Pair<String, HoodieCommitMetadata>> 
getLatestInstantAndCommitMetadataWithValidCheckpointInfo(HoodieTimeline 
timeline) throws IOException {
+    return (Option<Pair<String, HoodieCommitMetadata>>) 
timeline.getReverseOrderedInstants().map(instant -> {
       try {
         HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
             .fromBytes(timeline.getInstantDetails(instant).get(), 
HoodieCommitMetadata.class);
         if 
(!StringUtils.isNullOrEmpty(commitMetadata.getMetadata(CHECKPOINT_KEY)) || 
!StringUtils.isNullOrEmpty(commitMetadata.getMetadata(CHECKPOINT_RESET_KEY))) {
-          return Option.of(commitMetadata);
+          return Option.of(Pair.of(instant.toString(), commitMetadata));
         } else {
           return Option.empty();
         }
@@ -677,6 +715,20 @@ public class DeltaSync implements Serializable, Closeable {
     }).filter(Option::isPresent).findFirst().orElse(Option.empty());
   }
 
+  protected Option<HoodieCommitMetadata> 
getLatestCommitMetadataWithValidCheckpointInfo(HoodieTimeline timeline) throws 
IOException {
+    return 
getLatestInstantAndCommitMetadataWithValidCheckpointInfo(timeline).map(pair -> 
pair.getRight());
+  }
+
+  protected Option<String> 
getLatestInstantWithValidCheckpointInfo(Option<HoodieTimeline> timelineOpt) {
+    return timelineOpt.map(timeline -> {
+      try {
+        return 
getLatestInstantAndCommitMetadataWithValidCheckpointInfo(timeline).map(pair -> 
pair.getLeft());
+      } catch (IOException e) {
+        throw new HoodieIOException("failed to get latest instant with 
ValidCheckpointInfo", e);
+      }
+    }).orElse(Option.empty());
+  }
+
   /**
    * Perform Hoodie Write. Run Cleaner, schedule compaction and syncs to hive 
if needed.
    *
@@ -748,8 +800,25 @@ public class DeltaSync implements Serializable, Closeable {
             + totalErrorRecords + "/" + totalRecords);
       }
       String commitActionType = CommitUtils.getCommitActionType(cfg.operation, 
HoodieTableType.valueOf(cfg.tableType));
-      boolean success = writeClient.commit(instantTime, writeStatusRDD, 
Option.of(checkpointCommitMetadata), commitActionType,
-          Collections.emptyMap(), extraPreCommitFunc);
+      if (errorTableWriter.isPresent()) {
+        // Commit the error events triggered so far to the error table
+        Option<String> commitedInstantTime = 
getLatestInstantWithValidCheckpointInfo(commitTimelineOpt);
+        boolean errorTableSuccess = 
errorTableWriter.get().upsertAndCommit(instantTime, commitedInstantTime);
+        if (!errorTableSuccess) {
+          switch (errorWriteFailureStrategy) {
+            case ROLLBACK_COMMIT:
+              LOG.info("Commit " + instantTime + " failed!");
+              writeClient.rollback(instantTime);
+              throw new HoodieException("Error Table Commit failed!");
+            case LOG_ERROR:
+              LOG.error("Error Table write failed for instant " + instantTime);
+              break;
+            default:
+              throw new HoodieException("Write failure strategy not 
implemented for " + errorWriteFailureStrategy);
+          }
+        }
+      }
+      boolean success = writeClient.commit(instantTime, writeStatusRDD, 
Option.of(checkpointCommitMetadata), commitActionType, Collections.emptyMap(), 
extraPreCommitFunc);
       if (success) {
         LOG.info("Commit " + instantTime + " successful!");
         latestCheckpointWritten = checkpointStr;
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/ErrorEvent.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/ErrorEvent.java
new file mode 100644
index 00000000000..b300d40d397
--- /dev/null
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/ErrorEvent.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.deltastreamer;
+
+/**
+ * An error event is triggered when a record fails during write or processing.
+ */
+public class ErrorEvent<T> {
+
+  private final ErrorReason reason;
+  private final T payload;
+
+  public ErrorEvent(T payload, ErrorReason reason) {
+    this.payload = payload;
+    this.reason = reason;
+  }
+
+  public T getPayload() {
+    return payload;
+  }
+
+  public ErrorReason getReason() {
+    return reason;
+  }
+
+  /**
+   * The reason behind the write or processing failure of a record.
+   */
+  public enum ErrorReason {
+    // Failure during json to avro record conversion
+    JSON_AVRO_DESERIALIZATION_FAILURE,
+    // Failure during json to row conversion
+    JSON_ROW_DESERIALIZATION_FAILURE,
+    // Failure during row to avro record conversion
+    AVRO_DESERIALIZATION_FAILURE,
+    // Failure during hudi writes
+    HUDI_WRITE_FAILURES,
+    // Failure during transformation of source to target RDD
+    CUSTOM_TRANSFORMER_FAILURE
+  }
+}
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/ErrorTableUtils.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/ErrorTableUtils.java
new file mode 100644
index 00000000000..a5533449a6a
--- /dev/null
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/ErrorTableUtils.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.deltastreamer;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.config.HoodieErrorTableConfig;
+import org.apache.hudi.exception.HoodieException;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.SparkSession;
+
+import java.lang.reflect.InvocationTargetException;
+import java.util.Arrays;
+
+import static 
org.apache.hudi.config.HoodieErrorTableConfig.ERROR_TABLE_WRITE_CLASS;
+import static 
org.apache.hudi.config.HoodieErrorTableConfig.ERROR_TABLE_WRITE_FAILURE_STRATEGY;
+
+public final class ErrorTableUtils {
+
+  public static Option<BaseErrorTableWriter> 
getErrorTableWriter(HoodieDeltaStreamer.Config cfg, SparkSession sparkSession,
+      TypedProperties props, JavaSparkContext jssc, FileSystem fs) {
+    String errorTableWriterClass = 
props.getString(ERROR_TABLE_WRITE_CLASS.key());
+    
ValidationUtils.checkState(!StringUtils.isNullOrEmpty(errorTableWriterClass),
+        "Missing error table config " + ERROR_TABLE_WRITE_CLASS);
+
+    Class<?>[] argClassArr = new Class[] {HoodieDeltaStreamer.Config.class,
+        SparkSession.class, TypedProperties.class, JavaSparkContext.class, 
FileSystem.class};
+    String errMsg = "Unable to instantiate ErrorTableWriter with arguments 
type " + Arrays.toString(argClassArr);
+    
ValidationUtils.checkArgument(ReflectionUtils.hasConstructor(BaseErrorTableWriter.class.getName(),
 argClassArr), errMsg);
+
+    try {
+      return Option.of((BaseErrorTableWriter) 
ReflectionUtils.getClass(errorTableWriterClass).getConstructor(argClassArr)
+          .newInstance(cfg, sparkSession, props, jssc, fs));
+    } catch (NoSuchMethodException | InvocationTargetException | 
InstantiationException | IllegalAccessException e) {
+      throw new HoodieException(errMsg, e);
+    }
+  }
+
+  public static HoodieErrorTableConfig.ErrorWriteFailureStrategy 
getErrorWriteFailureStrategy(
+      TypedProperties props) {
+    String writeFailureStrategy = 
props.getString(ERROR_TABLE_WRITE_FAILURE_STRATEGY.key());
+    return 
HoodieErrorTableConfig.ErrorWriteFailureStrategy.valueOf(writeFailureStrategy);
+  }
+}
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java
index c8aa2f951c9..fdc2af66d9f 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java
@@ -34,13 +34,24 @@ import com.google.protobuf.Message;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.sql.Column;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;
 
 import java.io.Closeable;
 import java.io.IOException;
 
+import java.util.Arrays;
+import java.util.stream.Collectors;
+
+import scala.util.Either;
+
+
+import static 
org.apache.hudi.utilities.deltastreamer.BaseErrorTableWriter.ERROR_TABLE_CURRUPT_RECORD_COL_NAME;
 import static 
org.apache.hudi.utilities.schema.RowBasedSchemaProvider.HOODIE_RECORD_NAMESPACE;
 import static 
org.apache.hudi.utilities.schema.RowBasedSchemaProvider.HOODIE_RECORD_STRUCT_NAME;
 
@@ -51,10 +62,56 @@ public final class SourceFormatAdapter implements Closeable 
{
 
   private final Source source;
 
+  private Option<BaseErrorTableWriter> errorTableWriter = Option.empty();
+
   public SourceFormatAdapter(Source source) {
+    this(source, Option.empty());
+  }
+
+  public SourceFormatAdapter(Source source, Option<BaseErrorTableWriter> 
errorTableWriter) {
+    this.errorTableWriter = errorTableWriter;
     this.source = source;
   }
 
+  /**
+   * Transforms an input RDD of JSON strings into generic records, with support for adding error events to the error table.
+   * @param inputBatch
+   * @return
+   */
+  private JavaRDD<GenericRecord> 
transformJsonToGenericRdd(InputBatch<JavaRDD<String>> inputBatch) {
+    AvroConvertor convertor = new 
AvroConvertor(inputBatch.getSchemaProvider().getSourceSchema());
+    return inputBatch.getBatch().map(rdd -> {
+      if (errorTableWriter.isPresent()) {
+        JavaRDD<Either<GenericRecord,String>> javaRDD = 
rdd.map(convertor::fromJsonWithError);
+        errorTableWriter.get().addErrorEvents(javaRDD.filter(x -> 
x.isRight()).map(x ->
+            new ErrorEvent<>(x.right().get(), 
ErrorEvent.ErrorReason.JSON_AVRO_DESERIALIZATION_FAILURE)));
+        return javaRDD.filter(x -> x.isLeft()).map(x -> x.left().get());
+      } else {
+        return rdd.map(convertor::fromJson);
+      }
+    }).orElse(null);
+  }
+
+  /**
+   * Transforms the dataset by extracting corrupt-record rows as error events when the error table is enabled.
+   * @param eventsRow
+   * @return
+   */
+  public Option<Dataset<Row>> processErrorEvents(Option<Dataset<Row>> 
eventsRow,
+                                                      ErrorEvent.ErrorReason 
errorReason) {
+    return eventsRow.map(dataset -> {
+          if (errorTableWriter.isPresent() && 
Arrays.stream(dataset.columns()).collect(Collectors.toList())
+              .contains(ERROR_TABLE_CURRUPT_RECORD_COL_NAME)) {
+            errorTableWriter.get().addErrorEvents(dataset.filter(new 
Column(ERROR_TABLE_CURRUPT_RECORD_COL_NAME).isNotNull())
+                .select(new 
Column(ERROR_TABLE_CURRUPT_RECORD_COL_NAME)).toJavaRDD().map(ev ->
+                    new ErrorEvent<>(ev.getString(0), errorReason)));
+            return dataset.filter(new 
Column(ERROR_TABLE_CURRUPT_RECORD_COL_NAME).isNull()).drop(ERROR_TABLE_CURRUPT_RECORD_COL_NAME);
+          }
+          return dataset;
+        }
+    );
+  }
+
   /**
    * Fetch new data in avro format. If the source provides data in different 
format, they are translated to Avro format
    */
@@ -64,23 +121,22 @@ public final class SourceFormatAdapter implements 
Closeable {
         return ((Source<JavaRDD<GenericRecord>>) 
source).fetchNext(lastCkptStr, sourceLimit);
       case JSON: {
         InputBatch<JavaRDD<String>> r = ((Source<JavaRDD<String>>) 
source).fetchNext(lastCkptStr, sourceLimit);
-        AvroConvertor convertor = new 
AvroConvertor(r.getSchemaProvider().getSourceSchema());
-        return new InputBatch<>(Option.ofNullable(r.getBatch().map(rdd -> 
rdd.map(convertor::fromJson)).orElse(null)),
-            r.getCheckpointForNextBatch(), r.getSchemaProvider());
+        JavaRDD<GenericRecord> eventsRdd = transformJsonToGenericRdd(r);
+        return new 
InputBatch<>(Option.ofNullable(eventsRdd),r.getCheckpointForNextBatch(), 
r.getSchemaProvider());
       }
       case ROW: {
         InputBatch<Dataset<Row>> r = ((Source<Dataset<Row>>) 
source).fetchNext(lastCkptStr, sourceLimit);
         return new InputBatch<>(Option.ofNullable(r.getBatch().map(
             rdd -> {
-              SchemaProvider originalProvider = 
UtilHelpers.getOriginalSchemaProvider(r.getSchemaProvider());
-              return ((originalProvider instanceof FilebasedSchemaProvider) || 
(originalProvider instanceof SchemaRegistryProvider))
-                  // If the source schema is specified through Avro schema,
-                  // pass in the schema for the Row-to-Avro conversion
-                  // to avoid nullability mismatch between Avro schema and Row 
schema
-                  ? HoodieSparkUtils.createRdd(rdd, HOODIE_RECORD_STRUCT_NAME, 
HOODIE_RECORD_NAMESPACE, true,
-                  
org.apache.hudi.common.util.Option.ofNullable(r.getSchemaProvider().getSourceSchema())
-              ).toJavaRDD() : HoodieSparkUtils.createRdd(rdd,
-                  HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE, false, 
Option.empty()).toJavaRDD();
+                SchemaProvider originalProvider = 
UtilHelpers.getOriginalSchemaProvider(r.getSchemaProvider());
+                return (originalProvider instanceof FilebasedSchemaProvider || 
(originalProvider instanceof SchemaRegistryProvider))
+                    // If the source schema is specified through Avro schema,
+                    // pass in the schema for the Row-to-Avro conversion
+                    // to avoid nullability mismatch between Avro schema and 
Row schema
+                    ? HoodieSparkUtils.createRdd(rdd, 
HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE, true,
+                    
org.apache.hudi.common.util.Option.ofNullable(r.getSchemaProvider().getSourceSchema())
+                ).toJavaRDD() : HoodieSparkUtils.createRdd(rdd,
+                    HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE, false, 
Option.empty()).toJavaRDD();
             })
             .orElse(null)), r.getCheckpointForNextBatch(), 
r.getSchemaProvider());
       }
@@ -101,7 +157,10 @@ public final class SourceFormatAdapter implements 
Closeable {
   public InputBatch<Dataset<Row>> fetchNewDataInRowFormat(Option<String> 
lastCkptStr, long sourceLimit) {
     switch (source.getSourceType()) {
       case ROW:
-        return ((Source<Dataset<Row>>) source).fetchNext(lastCkptStr, 
sourceLimit);
+        InputBatch<Dataset<Row>> datasetInputBatch = ((Source<Dataset<Row>>) 
source).fetchNext(lastCkptStr, sourceLimit);
+        return new 
InputBatch<>(processErrorEvents(datasetInputBatch.getBatch(),
+            ErrorEvent.ErrorReason.JSON_ROW_DESERIALIZATION_FAILURE),
+            datasetInputBatch.getCheckpointForNextBatch(), 
datasetInputBatch.getSchemaProvider());
       case AVRO: {
         InputBatch<JavaRDD<GenericRecord>> r = 
((Source<JavaRDD<GenericRecord>>) source).fetchNext(lastCkptStr, sourceLimit);
         Schema sourceSchema = r.getSchemaProvider().getSourceSchema();
@@ -118,11 +177,27 @@ public final class SourceFormatAdapter implements 
Closeable {
       case JSON: {
         InputBatch<JavaRDD<String>> r = ((Source<JavaRDD<String>>) 
source).fetchNext(lastCkptStr, sourceLimit);
         Schema sourceSchema = r.getSchemaProvider().getSourceSchema();
-        StructType dataType = 
AvroConversionUtils.convertAvroSchemaToStructType(sourceSchema);
-        return new InputBatch<>(
-            Option.ofNullable(
-                r.getBatch().map(rdd -> 
source.getSparkSession().read().schema(dataType).json(rdd)).orElse(null)),
-            r.getCheckpointForNextBatch(), r.getSchemaProvider());
+        if (errorTableWriter.isPresent()) {
+          // If the error table writer is enabled, the `columnNameOfCorruptRecord` option is configured during the Spark read.
+          // Any records which spark is unable to read successfully are 
transferred to the column
+          // configured via this option. The column is then used to trigger 
error events.
+          StructType dataType = 
AvroConversionUtils.convertAvroSchemaToStructType(sourceSchema)
+              .add(new StructField(ERROR_TABLE_CURRUPT_RECORD_COL_NAME, 
DataTypes.StringType, true, Metadata.empty()));
+          Option<Dataset<Row>> dataset = r.getBatch().map(rdd -> 
source.getSparkSession().read()
+              .option("columnNameOfCorruptRecord", 
ERROR_TABLE_CURRUPT_RECORD_COL_NAME).schema(dataType.asNullable())
+              .json(rdd));
+          Option<Dataset<Row>> eventsDataset = processErrorEvents(dataset,
+              ErrorEvent.ErrorReason.JSON_ROW_DESERIALIZATION_FAILURE);
+          return new InputBatch<>(
+              eventsDataset,
+              r.getCheckpointForNextBatch(), r.getSchemaProvider());
+        } else {
+          StructType dataType = 
AvroConversionUtils.convertAvroSchemaToStructType(sourceSchema);
+          return new InputBatch<>(
+              Option.ofNullable(
+                  r.getBatch().map(rdd -> 
source.getSparkSession().read().schema(dataType).json(rdd)).orElse(null)),
+              r.getCheckpointForNextBatch(), r.getSchemaProvider());
+        }
       }
       case PROTO: {
         InputBatch<JavaRDD<Message>> r = ((Source<JavaRDD<Message>>) 
source).fetchNext(lastCkptStr, sourceLimit);
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/AvroConvertor.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/AvroConvertor.java
index 860c67f4e2a..ee160e19d52 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/AvroConvertor.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/AvroConvertor.java
@@ -28,6 +28,10 @@ import org.apache.avro.generic.GenericRecord;
 
 import java.io.Serializable;
 
+import scala.util.Either;
+import scala.util.Left;
+import scala.util.Right;
+
 /**
  * Convert a variety of datum into Avro GenericRecords. Has a bunch of lazy 
fields to circumvent issues around
  * serializing these objects from driver to executors
@@ -87,6 +91,16 @@ public class AvroConvertor implements Serializable {
     return jsonConverter.convert(json, schema);
   }
 
+  public Either<GenericRecord,String> fromJsonWithError(String json) {
+    GenericRecord genericRecord;
+    try {
+      genericRecord = fromJson(json);
+    } catch (Exception e) {
+      return new Right(json);
+    }
+    return new Left(genericRecord);
+  }
+
   public Schema getSchema() {
     return new Schema.Parser().parse(schemaStr);
   }
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGenericRddTransform.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGenericRddTransform.java
new file mode 100644
index 00000000000..78bc21ecf92
--- /dev/null
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGenericRddTransform.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources;
+
+import org.apache.hudi.AvroConversionUtils;
+import org.apache.hudi.HoodieSparkUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.spark.rdd.RDD;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.junit.jupiter.api.Test;
+
+import scala.Tuple2;
+
+import static org.apache.spark.sql.functions.expr;
+import static org.apache.spark.sql.functions.lit;
+import static org.apache.spark.sql.functions.when;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class TestGenericRddTransform extends SparkClientFunctionalTestHarness {
+  @Test
+  public void testGenericRddTransform() {
+    Dataset ds = spark().range(10).withColumn("null_check_col", when(expr("id 
% 2 == 0"),
+        lit("test")).otherwise(lit(null)));
+    StructType structType = new StructType(new StructField[] {
+        new StructField("id", DataTypes.StringType, false, Metadata.empty()),
+        new StructField("null_check_col", DataTypes.StringType, false, 
Metadata.empty())});
+    Schema nonNullSchema = 
AvroConversionUtils.convertStructTypeToAvroSchema(structType,"record","record");
+    Tuple2<RDD<GenericRecord>, RDD<String>> failSafeRdds = 
HoodieSparkUtils.safeCreateRDD(ds, "record",
+        "record",false, Option.of(nonNullSchema));
+    assertEquals(5, failSafeRdds._1.count());
+    assertEquals(5, failSafeRdds._2.count());
+  }
+}
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java
index d77789140f3..9e6e5aad661 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java
@@ -19,24 +19,42 @@
 package org.apache.hudi.utilities.sources;
 
 import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
+import org.apache.hudi.utilities.deltastreamer.BaseErrorTableWriter;
+import org.apache.hudi.utilities.deltastreamer.ErrorEvent;
 import org.apache.hudi.utilities.deltastreamer.SourceFormatAdapter;
 import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
 import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.Config;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
+import java.io.IOException;
 import java.net.URL;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.UUID;
 
+import scala.Tuple2;
+
+import static 
org.apache.hudi.config.HoodieErrorTableConfig.ERROR_TABLE_BASE_PATH;
+import static org.apache.hudi.config.HoodieErrorTableConfig.ERROR_TARGET_TABLE;
+import static 
org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.Config.ENABLE_KAFKA_COMMIT_OFFSET;
 import static 
org.apache.hudi.utilities.testutils.UtilitiesTestBase.Helpers.jsonifyRecords;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
@@ -44,6 +62,8 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
  * Tests against {@link JsonKafkaSource}.
  */
 public class TestJsonKafkaSource extends BaseTestKafkaSource {
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+  private static final HoodieTestDataGenerator DATA_GENERATOR = new 
HoodieTestDataGenerator(1L);
   static final URL SCHEMA_FILE_URL = 
TestJsonKafkaSource.class.getClassLoader().getResource("delta-streamer-config/source.avsc");
 
   @BeforeEach
@@ -175,4 +195,104 @@ public class TestJsonKafkaSource extends 
BaseTestKafkaSource {
     HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
     testUtils.sendMessages(topic, 
jsonifyRecords(dataGenerator.generateInserts("000", count)));
   }
+
+  void sendJsonSafeMessagesToKafka(String topic, int count, int numPartitions) 
{
+    try {
+      Tuple2<String, String>[] keyValues = new Tuple2[count];
+      String[] records = jsonifyRecords(DATA_GENERATOR.generateInserts("000", 
count));
+      for (int i = 0; i < count; i++) {
+        // Drop fields that don't translate to json properly
+        Map node = OBJECT_MAPPER.readValue(records[i], Map.class);
+        node.remove("height");
+        node.remove("current_date");
+        node.remove("nation");
+        keyValues[i] = new Tuple2<>(Integer.toString(i % numPartitions), 
OBJECT_MAPPER.writeValueAsString(node));
+      }
+      testUtils.sendMessages(topic, keyValues);
+    } catch (Exception ex) {
+      throw new RuntimeException(ex);
+    }
+  }
+
+  @Test
+  public void testErrorEventsForDataInRowForamt() throws IOException {
+    // topic setup.
+    final String topic = TEST_TOPIC_PREFIX + 
"testErrorEventsForDataInRowForamt";
+
+    testUtils.createTopic(topic, 2);
+    List<TopicPartition> topicPartitions = new ArrayList<>();
+    TopicPartition topicPartition0 = new TopicPartition(topic, 0);
+    topicPartitions.add(topicPartition0);
+    TopicPartition topicPartition1 = new TopicPartition(topic, 1);
+    topicPartitions.add(topicPartition1);
+    HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
+    sendJsonSafeMessagesToKafka(topic, 1000, 2);
+    testUtils.sendMessages(topic, new String[]{"error_event1", 
"error_event2"});
+
+    TypedProperties props = createPropsForKafkaSource(topic, null, "earliest");
+    props.put(ENABLE_KAFKA_COMMIT_OFFSET.key(), "true");
+    
props.put(ERROR_TABLE_BASE_PATH.key(),"/tmp/qurantine_table_test/json_kafka_row_events");
+    props.put(ERROR_TARGET_TABLE.key(),"json_kafka_row_events");
+    props.put("hoodie.errortable.validate.targetschema.enable", "true");
+    props.put("hoodie.base.path","/tmp/json_kafka_row_events");
+    Source jsonSource = new JsonKafkaSource(props, jsc(), spark(), 
schemaProvider, metrics);
+    Option<BaseErrorTableWriter> errorTableWriter = 
Option.of(getAnonymousErrorTableWriter(props));
+    SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource, 
errorTableWriter);
+    assertEquals(1000, 
kafkaSource.fetchNewDataInRowFormat(Option.empty(),Long.MAX_VALUE).getBatch().get().count());
+    assertEquals(2,((JavaRDD)errorTableWriter.get().getErrorEvents(
+        HoodieActiveTimeline.createNewInstantTime(), 
Option.empty()).get()).count());
+  }
+
+  @Test
+  public void testErrorEventsForDataInAvroFormat() throws IOException {
+
+    // topic setup.
+    final String topic = TEST_TOPIC_PREFIX + 
"testErrorEventsForDataInAvroFormat";
+
+    testUtils.createTopic(topic, 2);
+    List<TopicPartition> topicPartitions = new ArrayList<>();
+    TopicPartition topicPartition0 = new TopicPartition(topic, 0);
+    topicPartitions.add(topicPartition0);
+    TopicPartition topicPartition1 = new TopicPartition(topic, 1);
+    topicPartitions.add(topicPartition1);
+    HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
+    testUtils.sendMessages(topic, 
jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
+    testUtils.sendMessages(topic, new String[]{"error_event1", 
"error_event2"});
+
+    TypedProperties props = createPropsForKafkaSource(topic, null, "earliest");
+    props.put(ENABLE_KAFKA_COMMIT_OFFSET.key(), "true");
+    
props.put(ERROR_TABLE_BASE_PATH.key(),"/tmp/qurantine_table_test/json_kafka_events");
+    props.put(ERROR_TARGET_TABLE.key(),"json_kafka_events");
+    props.put("hoodie.base.path","/tmp/json_kafka_events");
+
+    Source jsonSource = new JsonKafkaSource(props, jsc(), spark(), 
schemaProvider, metrics);
+    Option<BaseErrorTableWriter> errorTableWriter = 
Option.of(getAnonymousErrorTableWriter(props));
+    SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource, 
errorTableWriter);
+    InputBatch<JavaRDD<GenericRecord>> fetch1 = 
kafkaSource.fetchNewDataInAvroFormat(Option.empty(),Long.MAX_VALUE);
+    assertEquals(1000,fetch1.getBatch().get().count());
+    assertEquals(2, ((JavaRDD)errorTableWriter.get().getErrorEvents(
+        HoodieActiveTimeline.createNewInstantTime(), 
Option.empty()).get()).count());
+  }
+
+  private BaseErrorTableWriter getAnonymousErrorTableWriter(TypedProperties 
props) {
+    return new BaseErrorTableWriter<ErrorEvent<String>>(new 
HoodieDeltaStreamer.Config(),
+        spark(), props, jsc(), fs()) {
+      List<JavaRDD<HoodieAvroRecord>> errorEvents = new LinkedList();
+
+      @Override
+      public void addErrorEvents(JavaRDD errorEvent) {
+        errorEvents.add(errorEvent.map(r -> new HoodieAvroRecord<>(new 
HoodieKey(), null)));
+      }
+
+      @Override
+      public Option<JavaRDD<HoodieAvroRecord>> getErrorEvents(String 
baseTableInstantTime, Option commitedInstantTime) {
+        return Option.of(errorEvents.stream().reduce((rdd1, rdd2) -> 
rdd1.union(rdd2)).get());
+      }
+
+      @Override
+      public boolean upsertAndCommit(String baseTableInstantTime, Option 
commitedInstantTime) {
+        return false;
+      }
+    };
+  }
 }

Reply via email to