This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 11861c8a50e [HUDI-7298] Write bad records to error table in more cases 
instead of failing stream (#10500)
11861c8a50e is described below

commit 11861c8a50e7dd23186d44bdc7aef871e5fc1280
Author: Jon Vexler <[email protected]>
AuthorDate: Wed Jan 24 22:59:29 2024 -0500

    [HUDI-7298] Write bad records to error table in more cases instead of 
failing stream (#10500)
    
    Cases:
    - No transformers, with schema provider. Records will go to the error table 
if they cannot be rewritten in the deduced schema.
    - recordkey is null, even if the column is nullable in the schema
---
 .../apache/hudi/config/HoodieErrorTableConfig.java |   6 ++
 .../scala/org/apache/hudi/HoodieSparkUtils.scala   |  21 +++++
 .../java/org/apache/hudi/avro/HoodieAvroUtils.java |  33 ++++++-
 .../org/apache/hudi/TestHoodieSparkUtils.scala     |   4 +
 .../apache/hudi/utilities/streamer/ErrorEvent.java |   6 +-
 .../utilities/streamer/HoodieStreamerUtils.java    |  68 ++++++++++----
 .../apache/hudi/utilities/streamer/StreamSync.java |  19 +++-
 ...TestHoodieDeltaStreamerSchemaEvolutionBase.java |  63 +++++++++++++
 ...oodieDeltaStreamerSchemaEvolutionExtensive.java | 100 +++++++++++++++++++--
 ...estHoodieDeltaStreamerSchemaEvolutionQuick.java |  18 ++--
 .../utilities/sources/TestGenericRddTransform.java |  29 ++++++
 .../schema-evolution/testMissingRecordKey.json     |   2 +
 12 files changed, 334 insertions(+), 35 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieErrorTableConfig.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieErrorTableConfig.java
index 68e2097c33b..8ba013b00ee 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieErrorTableConfig.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieErrorTableConfig.java
@@ -72,6 +72,12 @@ public class HoodieErrorTableConfig {
       .defaultValue(false)
       .withDocumentation("Records with schema mismatch with Target Schema are 
sent to Error Table.");
 
+  public static final ConfigProperty<Boolean> 
ERROR_ENABLE_VALIDATE_RECORD_CREATION = ConfigProperty
+      .key("hoodie.errortable.validate.recordcreation.enable")
+      .defaultValue(true)
+      .sinceVersion("0.14.2")
+      .withDocumentation("Records that fail to be created due to key generation 
failure or other issues will be sent to the Error Table");
+
   public static final ConfigProperty<String> 
ERROR_TABLE_WRITE_FAILURE_STRATEGY = ConfigProperty
       .key("hoodie.errortable.write.failure.strategy")
       .defaultValue(ErrorWriteFailureStrategy.ROLLBACK_COMMIT.name())
diff --git 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
index 527864fcf24..535af8db193 100644
--- 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
+++ 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
@@ -199,6 +199,27 @@ object HoodieSparkUtils extends SparkAdapterSupport with 
SparkVersionsSupport wi
     }
   }
 
+  /**
+   * Rewrite the record into the target schema.
+   * Return tuple of rewritten records and records that could not be converted
+   */
+  def safeRewriteRDD(df: RDD[GenericRecord], serializedTargetSchema: String): 
Tuple2[RDD[GenericRecord], RDD[String]] = {
+    val rdds: RDD[Either[GenericRecord, String]] = df.mapPartitions { recs =>
+      if (recs.isEmpty) {
+        Iterator.empty
+      } else {
+        val schema = new Schema.Parser().parse(serializedTargetSchema)
+        val transform: GenericRecord => Either[GenericRecord, String] = record 
=> try {
+          Left(HoodieAvroUtils.rewriteRecordDeep(record, schema, true))
+        } catch {
+          case _: Throwable => Right(HoodieAvroUtils.avroToJsonString(record, 
false))
+        }
+        recs.map(transform)
+      }
+    }
+    (rdds.filter(_.isLeft).map(_.left.get), 
rdds.filter(_.isRight).map(_.right.get))
+  }
+
   def getCatalystRowSerDe(structType: StructType): SparkRowSerDe = {
     sparkAdapter.createSparkRowSerDe(structType)
   }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java 
b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
index ac7dcd42979..9b925eb59be 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
@@ -187,6 +187,16 @@ public class HoodieAvroUtils {
     }
   }
 
+  /**
+   * Convert a given avro record to json and return the string
+   *
+   * @param record The GenericRecord to convert
+   * @param pretty Whether to pretty-print the json output
+   */
+  public static String avroToJsonString(GenericRecord record, boolean pretty) 
throws IOException {
+    return avroToJsonHelper(record, pretty).toString();
+  }
+
   /**
    * Convert a given avro record to json and return the encoded bytes.
    *
@@ -194,12 +204,16 @@ public class HoodieAvroUtils {
    * @param pretty Whether to pretty-print the json output
    */
   public static byte[] avroToJson(GenericRecord record, boolean pretty) throws 
IOException {
+    return avroToJsonHelper(record, pretty).toByteArray();
+  }
+
+  private static ByteArrayOutputStream avroToJsonHelper(GenericRecord record, 
boolean pretty) throws IOException {
     DatumWriter<Object> writer = new GenericDatumWriter<>(record.getSchema());
     ByteArrayOutputStream out = new ByteArrayOutputStream();
     JsonEncoder jsonEncoder = 
EncoderFactory.get().jsonEncoder(record.getSchema(), out, pretty);
     writer.write(record, jsonEncoder);
     jsonEncoder.flush();
-    return out.toByteArray();
+    return out;
   }
 
   /**
@@ -328,6 +342,23 @@ public class HoodieAvroUtils {
     return "string,string,string,string,string," + hiveColumnTypes;
   }
 
+  public static Schema makeFieldNonNull(Schema schema, String fieldName, 
Object fieldDefaultValue) {
+    ValidationUtils.checkArgument(fieldDefaultValue != null);
+    List<Schema.Field> filteredFields = schema.getFields()
+        .stream()
+        .map(field -> {
+          if (Objects.equals(field.name(), fieldName)) {
+            return new Schema.Field(field.name(), 
AvroSchemaUtils.resolveNullableSchema(field.schema()), field.doc(), 
fieldDefaultValue);
+          } else {
+            return new Schema.Field(field.name(), field.schema(), field.doc(), 
field.defaultVal());
+          }
+        })
+        .collect(Collectors.toList());
+    Schema withNonNullField = Schema.createRecord(schema.getName(), 
schema.getDoc(), schema.getNamespace(), false);
+    withNonNullField.setFields(filteredFields);
+    return withNonNullField;
+  }
+
   private static Schema initRecordKeySchema() {
     Schema.Field recordKeyField =
         new Schema.Field(HoodieRecord.RECORD_KEY_METADATA_FIELD, 
METADATA_FIELD_SCHEMA, "", JsonProperties.NULL_VALUE);
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkUtils.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkUtils.scala
index 36ac37cfd6d..15b6b2b35da 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkUtils.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkUtils.scala
@@ -228,6 +228,10 @@ object TestHoodieSparkUtils {
     })
   }
 
+  def getSchemaColumnNotNullable(structType: StructType, columnName: String): 
StructType = {
+    setNullableRec(structType, columnName.split('.'), 0)
+  }
+
   def setColumnNotNullable(df: DataFrame, columnName: String): DataFrame = {
     // get schema
     val schema = df.schema
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/ErrorEvent.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/ErrorEvent.java
index 714225f23ab..f268464d6f1 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/ErrorEvent.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/ErrorEvent.java
@@ -53,6 +53,10 @@ public class ErrorEvent<T> {
     // Failure during hudi writes
     HUDI_WRITE_FAILURES,
     // Failure during transformation of source to target RDD
-    CUSTOM_TRANSFORMER_FAILURE
+    CUSTOM_TRANSFORMER_FAILURE,
+    // record schema is not valid for the table
+    INVALID_RECORD_SCHEMA,
+    // exception when attempting to create HoodieRecord
+    RECORD_CREATION
   }
 }
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamerUtils.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamerUtils.java
index a6f9513a14e..44c367ba384 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamerUtils.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamerUtils.java
@@ -31,9 +31,11 @@ import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieSparkRecord;
 import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.util.Either;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.common.util.collection.CloseableMappingIterator;
+import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.keygen.BuiltinKeyGenerator;
 import org.apache.hudi.keygen.KeyGenUtils;
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
@@ -50,6 +52,7 @@ import org.apache.spark.sql.avro.HoodieAvroDeserializer;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.types.StructType;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Iterator;
@@ -58,6 +61,7 @@ import java.util.Set;
 import java.util.stream.Collectors;
 
 import static 
org.apache.hudi.common.table.HoodieTableConfig.DROP_PARTITION_COLUMNS;
+import static 
org.apache.hudi.config.HoodieErrorTableConfig.ERROR_ENABLE_VALIDATE_RECORD_CREATION;
 
 
 /**
@@ -70,39 +74,49 @@ public class HoodieStreamerUtils {
    * Takes care of dropping columns, precombine, auto key generation.
    * Both AVRO and SPARK record types are supported.
    */
-  static Option<JavaRDD<HoodieRecord>> 
createHoodieRecords(HoodieStreamer.Config cfg, TypedProperties props, 
Option<JavaRDD<GenericRecord>> avroRDDOptional,
-                                  SchemaProvider schemaProvider, 
HoodieRecord.HoodieRecordType recordType, boolean autoGenerateRecordKeys,
-                                  String instantTime) {
+  public static Option<JavaRDD<HoodieRecord>> 
createHoodieRecords(HoodieStreamer.Config cfg, TypedProperties props, 
Option<JavaRDD<GenericRecord>> avroRDDOptional,
+                                                                  
SchemaProvider schemaProvider, HoodieRecord.HoodieRecordType recordType, 
boolean autoGenerateRecordKeys,
+                                                                  String 
instantTime, Option<BaseErrorTableWriter> errorTableWriter) {
     boolean shouldCombine = cfg.filterDupes || 
cfg.operation.equals(WriteOperationType.UPSERT);
+    boolean shouldErrorTable = errorTableWriter.isPresent() && 
props.getBoolean(ERROR_ENABLE_VALIDATE_RECORD_CREATION.key(), 
ERROR_ENABLE_VALIDATE_RECORD_CREATION.defaultValue());
     Set<String> partitionColumns = getPartitionColumns(props);
     return avroRDDOptional.map(avroRDD -> {
-      JavaRDD<HoodieRecord> records;
       SerializableSchema avroSchema = new 
SerializableSchema(schemaProvider.getTargetSchema());
       SerializableSchema processedAvroSchema = new 
SerializableSchema(isDropPartitionColumns(props) ? 
HoodieAvroUtils.removeMetadataFields(avroSchema.get()) : avroSchema.get());
+      JavaRDD<Either<HoodieRecord,String>> records;
       if (recordType == HoodieRecord.HoodieRecordType.AVRO) {
         records = avroRDD.mapPartitions(
-            (FlatMapFunction<Iterator<GenericRecord>, HoodieRecord>) 
genericRecordIterator -> {
+            (FlatMapFunction<Iterator<GenericRecord>, 
Either<HoodieRecord,String>>) genericRecordIterator -> {
               if (autoGenerateRecordKeys) {
                 
props.setProperty(KeyGenUtils.RECORD_KEY_GEN_PARTITION_ID_CONFIG, 
String.valueOf(TaskContext.getPartitionId()));
                 
props.setProperty(KeyGenUtils.RECORD_KEY_GEN_INSTANT_TIME_CONFIG, instantTime);
               }
               BuiltinKeyGenerator builtinKeyGenerator = (BuiltinKeyGenerator) 
HoodieSparkKeyGeneratorFactory.createKeyGenerator(props);
-              List<HoodieRecord> avroRecords = new ArrayList<>();
+              List<Either<HoodieRecord,String>> avroRecords = new 
ArrayList<>();
               while (genericRecordIterator.hasNext()) {
                 GenericRecord genRec = genericRecordIterator.next();
-                HoodieKey hoodieKey = new 
HoodieKey(builtinKeyGenerator.getRecordKey(genRec), 
builtinKeyGenerator.getPartitionPath(genRec));
-                GenericRecord gr = isDropPartitionColumns(props) ? 
HoodieAvroUtils.removeFields(genRec, partitionColumns) : genRec;
-                HoodieRecordPayload payload = shouldCombine ? 
DataSourceUtils.createPayload(cfg.payloadClassName, gr,
-                    (Comparable) HoodieAvroUtils.getNestedFieldVal(gr, 
cfg.sourceOrderingField, false, props.getBoolean(
-                        
KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(),
-                        
Boolean.parseBoolean(KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue()))))
-                    : DataSourceUtils.createPayload(cfg.payloadClassName, gr);
-                avroRecords.add(new HoodieAvroRecord<>(hoodieKey, payload));
+                try {
+                  HoodieKey hoodieKey = new 
HoodieKey(builtinKeyGenerator.getRecordKey(genRec), 
builtinKeyGenerator.getPartitionPath(genRec));
+                  GenericRecord gr = isDropPartitionColumns(props) ? 
HoodieAvroUtils.removeFields(genRec, partitionColumns) : genRec;
+                  HoodieRecordPayload payload = shouldCombine ? 
DataSourceUtils.createPayload(cfg.payloadClassName, gr,
+                      (Comparable) HoodieAvroUtils.getNestedFieldVal(gr, 
cfg.sourceOrderingField, false, props.getBoolean(
+                          
KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(),
+                          
Boolean.parseBoolean(KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue()))))
+                      : DataSourceUtils.createPayload(cfg.payloadClassName, 
gr);
+                  avroRecords.add(Either.left(new 
HoodieAvroRecord<>(hoodieKey, payload)));
+                } catch (Exception e) {
+                  if (!shouldErrorTable) {
+                    throw e;
+                  }
+                  
avroRecords.add(Either.right(HoodieAvroUtils.avroToJsonString(genRec, false)));
+                }
               }
               return avroRecords.iterator();
             });
+
       } else if (recordType == HoodieRecord.HoodieRecordType.SPARK) {
         // TODO we should remove it if we can read InternalRow from source.
+
         records = avroRDD.mapPartitions(itr -> {
           if (autoGenerateRecordKeys) {
             props.setProperty(KeyGenUtils.RECORD_KEY_GEN_PARTITION_ID_CONFIG, 
String.valueOf(TaskContext.getPartitionId()));
@@ -116,16 +130,32 @@ public class HoodieStreamerUtils {
 
           return new CloseableMappingIterator<>(ClosableIterator.wrap(itr), 
rec -> {
             InternalRow row = (InternalRow) 
deserializer.deserialize(rec).get();
-            String recordKey = builtinKeyGenerator.getRecordKey(row, 
baseStructType).toString();
-            String partitionPath = builtinKeyGenerator.getPartitionPath(row, 
baseStructType).toString();
-            return new HoodieSparkRecord(new HoodieKey(recordKey, 
partitionPath),
-                
HoodieInternalRowUtils.getCachedUnsafeProjection(baseStructType, 
targetStructType).apply(row), targetStructType, false);
+            try {
+              String recordKey = builtinKeyGenerator.getRecordKey(row, 
baseStructType).toString();
+              String partitionPath = builtinKeyGenerator.getPartitionPath(row, 
baseStructType).toString();
+              return Either.left(new HoodieSparkRecord(new 
HoodieKey(recordKey, partitionPath),
+                  
HoodieInternalRowUtils.getCachedUnsafeProjection(baseStructType, 
targetStructType).apply(row), targetStructType, false));
+            } catch (Exception e) {
+              if (!shouldErrorTable) {
+                throw e;
+              }
+              try {
+                return Either.right(HoodieAvroUtils.avroToJsonString(rec, 
false));
+              } catch (IOException ex) {
+                throw new HoodieIOException("Failed to convert illegal record 
to json", ex);
+              }
+            }
           });
+
         });
       } else {
         throw new UnsupportedOperationException(recordType.name());
       }
-      return records;
+      if (shouldErrorTable) {
+        
errorTableWriter.get().addErrorEvents(records.filter(Either::isRight).map(Either::asRight).map(evStr
 -> new ErrorEvent<>(evStr,
+            ErrorEvent.ErrorReason.RECORD_CREATION)));
+      }
+      return records.filter(Either::isLeft).map(Either::asLeft);
     });
   }
 
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
index 8d629299684..659f6429f01 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
@@ -545,7 +545,7 @@ public class StreamSync implements Serializable, Closeable {
       return inputBatch;
     } else {
       Option<JavaRDD<HoodieRecord>> recordsOpt = 
HoodieStreamerUtils.createHoodieRecords(cfg, props, inputBatch.getBatch(), 
schemaProvider,
-          recordType, autoGenerateRecordKeys, instantTime);
+          recordType, autoGenerateRecordKeys, instantTime, errorTableWriter);
       return new InputBatch(recordsOpt, checkpointStr, schemaProvider);
     }
   }
@@ -633,8 +633,21 @@ public class StreamSync implements Serializable, Closeable 
{
         // Rewrite transformed records into the expected target schema
         schemaProvider = 
getDeducedSchemaProvider(dataAndCheckpoint.getSchemaProvider().getTargetSchema(),
 dataAndCheckpoint.getSchemaProvider(), metaClient);
         String serializedTargetSchema = 
schemaProvider.getTargetSchema().toString();
-        avroRDDOptional = dataAndCheckpoint.getBatch().map(t -> 
t.mapPartitions(iterator ->
-            new LazyCastingIterator(iterator, serializedTargetSchema)));
+        if (errorTableWriter.isPresent()
+            && 
props.getBoolean(HoodieErrorTableConfig.ERROR_ENABLE_VALIDATE_TARGET_SCHEMA.key(),
+            
HoodieErrorTableConfig.ERROR_ENABLE_VALIDATE_TARGET_SCHEMA.defaultValue())) {
+          avroRDDOptional = dataAndCheckpoint.getBatch().map(
+              records -> {
+                Tuple2<RDD<GenericRecord>, RDD<String>> safeCreateRDDs = 
HoodieSparkUtils.safeRewriteRDD(records.rdd(), serializedTargetSchema);
+                
errorTableWriter.get().addErrorEvents(safeCreateRDDs._2().toJavaRDD()
+                    .map(evStr -> new ErrorEvent<>(evStr,
+                        ErrorEvent.ErrorReason.INVALID_RECORD_SCHEMA)));
+                return safeCreateRDDs._1.toJavaRDD();
+              });
+        } else {
+          avroRDDOptional = dataAndCheckpoint.getBatch().map(t -> 
t.mapPartitions(iterator ->
+              new LazyCastingIterator(iterator, serializedTargetSchema)));
+        }
       }
     }
     if (useRowWriter) {
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionBase.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionBase.java
index 6b364667054..1b983b65e97 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionBase.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionBase.java
@@ -22,30 +22,38 @@ package org.apache.hudi.utilities.deltastreamer;
 import org.apache.hudi.AvroConversionUtils;
 import org.apache.hudi.DataSourceWriteOptions;
 import org.apache.hudi.HoodieSparkUtils;
+import org.apache.hudi.TestHoodieSparkUtils;
 import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.common.config.HoodieReaderConfig;
 import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieAvroRecord;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieClusteringConfig;
 import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieErrorTableConfig;
 import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
 import org.apache.hudi.utilities.schema.SchemaProvider;
 import org.apache.hudi.utilities.sources.AvroKafkaSource;
 import org.apache.hudi.utilities.sources.ParquetDFSSource;
+import org.apache.hudi.utilities.streamer.BaseErrorTableWriter;
 import org.apache.hudi.utilities.streamer.HoodieStreamer;
 
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.types.DataType;
 import org.apache.spark.sql.types.DataTypes;
 import org.apache.spark.sql.types.Metadata;
@@ -80,6 +88,7 @@ public class TestHoodieDeltaStreamerSchemaEvolutionBase 
extends HoodieDeltaStrea
 
   protected String tableType;
   protected String tableBasePath;
+  protected String tableName;
   protected Boolean shouldCluster;
   protected Boolean shouldCompact;
   protected Boolean rowWriterEnable;
@@ -90,6 +99,7 @@ public class TestHoodieDeltaStreamerSchemaEvolutionBase 
extends HoodieDeltaStrea
   protected String sourceSchemaFile;
   protected String targetSchemaFile;
   protected boolean useKafkaSource;
+  protected boolean withErrorTable;
   protected boolean useTransformer;
   protected boolean userProvidedSchema;
 
@@ -107,8 +117,11 @@ public class TestHoodieDeltaStreamerSchemaEvolutionBase 
extends HoodieDeltaStrea
   @BeforeEach
   public void setupTest() {
     super.setupTest();
+    TestErrorTable.commited = new HashMap<>();
+    TestErrorTable.errorEvents = new ArrayList<>();
     useSchemaProvider = false;
     hasTransformer = false;
+    withErrorTable = false;
     sourceSchemaFile = "";
     targetSchemaFile = "";
     topicName = "topic" + testNum;
@@ -174,6 +187,16 @@ public class TestHoodieDeltaStreamerSchemaEvolutionBase 
extends HoodieDeltaStrea
       
extraProps.setProperty(HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS.key(), 
"_row_key");
     }
 
+    if (withErrorTable) {
+      extraProps.setProperty(HoodieErrorTableConfig.ERROR_TABLE_ENABLED.key(), 
"true");
+      
extraProps.setProperty(HoodieErrorTableConfig.ERROR_ENABLE_VALIDATE_TARGET_SCHEMA.key(),
 "true");
+      
extraProps.setProperty(HoodieErrorTableConfig.ERROR_ENABLE_VALIDATE_RECORD_CREATION.key(),
 "true");
+      extraProps.setProperty(HoodieErrorTableConfig.ERROR_TARGET_TABLE.key(), 
tableName + "ERROR");
+      
extraProps.setProperty(HoodieErrorTableConfig.ERROR_TABLE_BASE_PATH.key(), 
basePath + tableName + "ERROR");
+      
extraProps.setProperty(HoodieErrorTableConfig.ERROR_TABLE_WRITE_CLASS.key(), 
TestErrorTable.class.getName());
+      extraProps.setProperty("hoodie.base.path", tableBasePath);
+    }
+
     List<String> transformerClassNames = new ArrayList<>();
     Collections.addAll(transformerClassNames, transformerClasses);
 
@@ -196,6 +219,9 @@ public class TestHoodieDeltaStreamerSchemaEvolutionBase 
extends HoodieDeltaStrea
   protected void addData(Dataset<Row> df, Boolean isFirst) {
     if (useSchemaProvider) {
       TestSchemaProvider.sourceSchema = 
AvroConversionUtils.convertStructTypeToAvroSchema(df.schema(), 
HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE);
+      if (withErrorTable && isFirst) {
+        
TestSchemaProvider.setTargetSchema(AvroConversionUtils.convertStructTypeToAvroSchema(TestHoodieSparkUtils.getSchemaColumnNotNullable(df.schema(),
 "_row_key"),"idk", "idk"));
+      }
     }
     if (useKafkaSource) {
       addKafkaData(df, isFirst);
@@ -303,4 +329,41 @@ public class TestHoodieDeltaStreamerSchemaEvolutionBase 
extends HoodieDeltaStrea
       TestSchemaProvider.targetSchema = null;
     }
   }
+
+  public static class TestErrorTable extends BaseErrorTableWriter {
+
+    public static List<JavaRDD> errorEvents = new ArrayList<>();
+    public static Map<String,Option<JavaRDD>> commited = new HashMap<>();
+    public TestErrorTable(HoodieStreamer.Config cfg, SparkSession 
sparkSession, TypedProperties props, HoodieSparkEngineContext 
hoodieSparkContext,
+                          FileSystem fs) {
+      super(cfg, sparkSession, props, hoodieSparkContext, fs);
+    }
+
+    @Override
+    public void addErrorEvents(JavaRDD errorEvent) {
+      errorEvents.add(errorEvent);
+    }
+
+    @Override
+    public boolean upsertAndCommit(String baseTableInstantTime, Option 
commitedInstantTime) {
+      if (errorEvents.size() > 0) {
+        JavaRDD errorsCombined = errorEvents.get(0);
+        for (int i = 1; i < errorEvents.size(); i++) {
+          errorsCombined = errorsCombined.union(errorEvents.get(i));
+        }
+        commited.put(baseTableInstantTime, Option.of(errorsCombined));
+        errorEvents = new ArrayList<>();
+
+      } else {
+        commited.put(baseTableInstantTime, Option.empty());
+      }
+      return true;
+    }
+
+    @Override
+    public Option<JavaRDD<HoodieAvroRecord>> getErrorEvents(String 
baseTableInstantTime, Option commitedInstantTime) {
+      return Option.empty();
+    }
+  }
+
 }
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionExtensive.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionExtensive.java
index f330bb62732..7adc3f66684 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionExtensive.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionExtensive.java
@@ -20,7 +20,10 @@
 package org.apache.hudi.utilities.deltastreamer;
 
 import org.apache.hudi.TestHoodieSparkUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.utilities.streamer.ErrorEvent;
 
+import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.sql.Column;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
@@ -31,7 +34,9 @@ import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
 
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.stream.Stream;
 
@@ -45,16 +50,24 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 public class TestHoodieDeltaStreamerSchemaEvolutionExtensive extends 
TestHoodieDeltaStreamerSchemaEvolutionBase {
 
   protected void testBase(String updateFile, String updateColumn, String 
condition, int count) throws Exception {
+    testBase(updateFile, updateColumn, condition, count, null);
+  }
+
+  protected void testBase(String updateFile, String updateColumn, String 
condition, int count, ErrorEvent.ErrorReason reason) throws Exception {
     Map<String,Integer> conditions = new HashMap<>();
     conditions.put(condition, count);
-    testBase(updateFile, updateColumn, conditions, true);
+    testBase(updateFile, updateColumn, conditions, true, reason);
 
     //adding non-nullable cols should fail, but instead it is adding nullable 
cols
     //assertThrows(Exception.class, () -> testBase(tableType, shouldCluster, 
shouldCompact, reconcileSchema, rowWriterEnable, updateFile, updateColumn, 
condition, count, false));
   }
 
   protected void testBase(String updateFile, String updateColumn, 
Map<String,Integer> conditions) throws Exception {
-    testBase(updateFile, updateColumn, conditions, true);
+    testBase(updateFile, updateColumn, conditions, null);
+  }
+
+  protected void testBase(String updateFile, String updateColumn, 
Map<String,Integer> conditions, ErrorEvent.ErrorReason reason) throws Exception 
{
+    testBase(updateFile, updateColumn, conditions, true, reason);
   }
 
   protected void doFirstDeltaWrite() throws Exception {
@@ -100,10 +113,11 @@ public class 
TestHoodieDeltaStreamerSchemaEvolutionExtensive extends TestHoodieD
   /**
    * Main testing logic for non-type promotion tests
    */
-  protected void testBase(String updateFile, String updateColumn, 
Map<String,Integer> conditions, Boolean nullable) throws Exception {
+  protected void testBase(String updateFile, String updateColumn, 
Map<String,Integer> conditions, Boolean nullable, ErrorEvent.ErrorReason 
reason) throws Exception {
     boolean isCow = tableType.equals("COPY_ON_WRITE");
     PARQUET_SOURCE_ROOT = basePath + "parquetFilesDfs" + testNum++;
-    tableBasePath = basePath + "test_parquet_table" + testNum;
+    tableName = "test_parquet_table" + testNum;
+    tableBasePath = basePath + tableName;
     this.deltaStreamer = new HoodieDeltaStreamer(getDeltaStreamerConfig(), 
jsc);
 
     //first write
@@ -149,6 +163,8 @@ public class 
TestHoodieDeltaStreamerSchemaEvolutionExtensive extends TestHoodieD
       if (updateFile.equals("testAddColChangeOrderAllFiles.json")) {
         //this test updates all 3 partitions instead of 2 like the rest of the 
tests
         numFiles++;
+      } else if (withErrorTable) {
+        numFiles--;
       }
       assertFileNumber(numFiles, false);
     }
@@ -161,6 +177,19 @@ public class 
TestHoodieDeltaStreamerSchemaEvolutionExtensive extends TestHoodieD
       assertEquals(conditions.get(condition).intValue(), 
df.filter(condition).count());
     }
 
+    if (withErrorTable) {
+      List<ErrorEvent> recs = new ArrayList<>();
+      for (String key : TestErrorTable.commited.keySet()) {
+        Option<JavaRDD> errors = TestErrorTable.commited.get(key);
+        if (errors.isPresent()) {
+          if (!errors.get().isEmpty()) {
+            recs.addAll(errors.get().collect());
+          }
+        }
+      }
+      assertEquals(1, recs.size());
+      assertEquals(recs.get(0).getReason(), reason);
+    }
   }
 
   protected static Stream<Arguments> testArgs() {
@@ -183,6 +212,66 @@ public class 
TestHoodieDeltaStreamerSchemaEvolutionExtensive extends TestHoodieD
     return b.build();
   }
 
+  @ParameterizedTest
+  @MethodSource("testArgs")
+  public void testErrorTable(String tableType,
+                             Boolean shouldCluster,
+                             Boolean shouldCompact,
+                             Boolean rowWriterEnable,
+                             Boolean addFilegroups,
+                             Boolean multiLogFiles) throws Exception {
+    this.withErrorTable = true;
+    this.useSchemaProvider = false;
+    this.useTransformer = false;
+    this.tableType = tableType;
+    this.shouldCluster = shouldCluster;
+    this.shouldCompact = shouldCompact;
+    this.rowWriterEnable = rowWriterEnable;
+    this.addFilegroups = addFilegroups;
+    this.multiLogFiles = multiLogFiles;
+    testBase("testMissingRecordKey.json", "driver", "driver = 'driver-003'", 
1, ErrorEvent.ErrorReason.RECORD_CREATION);
+  }
+
+  @ParameterizedTest
+  @MethodSource("testArgs")
+  public void testErrorTableWithSchemaProvider(String tableType,
+                                               Boolean shouldCluster,
+                                               Boolean shouldCompact,
+                                               Boolean rowWriterEnable,
+                                               Boolean addFilegroups,
+                                               Boolean multiLogFiles) throws 
Exception {
+    this.withErrorTable = true;
+    this.useSchemaProvider = true;
+    this.useTransformer = false;
+    this.tableType = tableType;
+    this.shouldCluster = shouldCluster;
+    this.shouldCompact = shouldCompact;
+    this.rowWriterEnable = rowWriterEnable;
+    this.addFilegroups = addFilegroups;
+    this.multiLogFiles = multiLogFiles;
+    testBase("testMissingRecordKey.json", "driver", "driver = 'driver-003'", 
1, ErrorEvent.ErrorReason.INVALID_RECORD_SCHEMA);
+  }
+
+  @ParameterizedTest
+  @MethodSource("testArgs")
+  public void testErrorTableWithTransformer(String tableType,
+                             Boolean shouldCluster,
+                             Boolean shouldCompact,
+                             Boolean rowWriterEnable,
+                             Boolean addFilegroups,
+                             Boolean multiLogFiles) throws Exception {
+    this.withErrorTable = true;
+    this.useSchemaProvider = true;
+    this.useTransformer = true;
+    this.tableType = tableType;
+    this.shouldCluster = shouldCluster;
+    this.shouldCompact = shouldCompact;
+    this.rowWriterEnable = rowWriterEnable;
+    this.addFilegroups = addFilegroups;
+    this.multiLogFiles = multiLogFiles;
+    testBase("testMissingRecordKey.json", "driver", "driver = 'driver-003'", 
1, ErrorEvent.ErrorReason.AVRO_DESERIALIZATION_FAILURE);
+  }
+
   /**
    * Add a new column at root level at the end
    */
@@ -367,7 +456,8 @@ public class 
TestHoodieDeltaStreamerSchemaEvolutionExtensive extends TestHoodieD
   protected void testTypePromotionBase(String colName, DataType startType, 
DataType updateType, DataType endType) throws Exception {
     boolean isCow = tableType.equals("COPY_ON_WRITE");
     PARQUET_SOURCE_ROOT = basePath + "parquetFilesDfs" + testNum++;
-    tableBasePath = basePath + "test_parquet_table" + testNum;
+    tableName = "test_parquet_table" + testNum;
+    tableBasePath = basePath + tableName;
     this.deltaStreamer = new HoodieDeltaStreamer(getDeltaStreamerConfig(), 
jsc);
 
     //first write
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionQuick.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionQuick.java
index 959446a63dc..096ddf14cc7 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionQuick.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionQuick.java
@@ -157,7 +157,8 @@ public class TestHoodieDeltaStreamerSchemaEvolutionQuick 
extends TestHoodieDelta
     this.useTransformer = true;
     boolean isCow = tableType.equals("COPY_ON_WRITE");
     PARQUET_SOURCE_ROOT = basePath + "parquetFilesDfs" + ++testNum;
-    tableBasePath = basePath + "test_parquet_table" + testNum;
+    tableName = "test_parquet_table" + testNum;
+    tableBasePath = basePath + tableName;
     this.deltaStreamer = new 
HoodieDeltaStreamer(getDeltaStreamerConfig(allowNullForDeletedCols), jsc);
 
     //first write
@@ -283,7 +284,8 @@ public class TestHoodieDeltaStreamerSchemaEvolutionQuick 
extends TestHoodieDelta
 
     boolean isCow = tableType.equals("COPY_ON_WRITE");
     PARQUET_SOURCE_ROOT = basePath + "parquetFilesDfs" + ++testNum;
-    tableBasePath = basePath + "test_parquet_table" + testNum;
+    tableName =  "test_parquet_table" + testNum;
+    tableBasePath = basePath + tableName;
 
     //first write
     String datapath = 
String.class.getResource("/data/schema-evolution/startTestEverything.json").getPath();
@@ -353,7 +355,8 @@ public class TestHoodieDeltaStreamerSchemaEvolutionQuick 
extends TestHoodieDelta
 
     boolean isCow = tableType.equals("COPY_ON_WRITE");
     PARQUET_SOURCE_ROOT = basePath + "parquetFilesDfs" + ++testNum;
-    tableBasePath = basePath + "test_parquet_table" + testNum;
+    tableName = "test_parquet_table" + testNum;
+    tableBasePath = basePath + tableName;
 
     //first write
     String datapath = 
String.class.getResource("/data/schema-evolution/startTestEverything.json").getPath();
@@ -431,7 +434,8 @@ public class TestHoodieDeltaStreamerSchemaEvolutionQuick 
extends TestHoodieDelta
 
     boolean isCow = tableType.equals("COPY_ON_WRITE");
     PARQUET_SOURCE_ROOT = basePath + "parquetFilesDfs" + ++testNum;
-    tableBasePath = basePath + "test_parquet_table" + testNum;
+    tableName = "test_parquet_table" + testNum;
+    tableBasePath = basePath + tableName;
 
     //first write
     String datapath = 
String.class.getResource("/data/schema-evolution/startTestEverything.json").getPath();
@@ -510,7 +514,8 @@ public class TestHoodieDeltaStreamerSchemaEvolutionQuick 
extends TestHoodieDelta
 
     boolean isCow = tableType.equals("COPY_ON_WRITE");
     PARQUET_SOURCE_ROOT = basePath + "parquetFilesDfs" + ++testNum;
-    tableBasePath = basePath + "test_parquet_table" + testNum;
+    tableName = "test_parquet_table" + testNum;
+    tableBasePath = basePath + tableName;
 
     //first write
     String datapath = 
String.class.getResource("/data/schema-evolution/startTestEverything.json").getPath();
@@ -597,7 +602,8 @@ public class TestHoodieDeltaStreamerSchemaEvolutionQuick 
extends TestHoodieDelta
 
     boolean isCow = tableType.equals("COPY_ON_WRITE");
     PARQUET_SOURCE_ROOT = basePath + "parquetFilesDfs" + ++testNum;
-    tableBasePath = basePath + "test_parquet_table" + testNum;
+    tableName = "test_parquet_table" + testNum;
+    tableBasePath = basePath + tableName;
 
     //first write
     String datapath = 
String.class.getResource("/data/schema-evolution/startTestEverything.json").getPath();
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGenericRddTransform.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGenericRddTransform.java
index 78bc21ecf92..8adfdb4dc37 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGenericRddTransform.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGenericRddTransform.java
@@ -20,11 +20,13 @@ package org.apache.hudi.utilities.sources;
 
 import org.apache.hudi.AvroConversionUtils;
 import org.apache.hudi.HoodieSparkUtils;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
 
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
+import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.rdd.RDD;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.types.DataTypes;
@@ -33,8 +35,11 @@ import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;
 import org.junit.jupiter.api.Test;
 
+import java.util.List;
+
 import scala.Tuple2;
 
+import static org.apache.hudi.avro.HoodieAvroUtils.makeFieldNonNull;
 import static org.apache.spark.sql.functions.expr;
 import static org.apache.spark.sql.functions.lit;
 import static org.apache.spark.sql.functions.when;
@@ -54,4 +59,28 @@ public class TestGenericRddTransform extends 
SparkClientFunctionalTestHarness {
     assertEquals(5, failSafeRdds._1.count());
     assertEquals(5, failSafeRdds._2.count());
   }
+
+  @Test
+  public void testGenericRddConvert() {
+    String fieldToNull = "partition_path";
+    String schemaStr = makeFieldNonNull(HoodieTestDataGenerator.AVRO_SCHEMA, 
fieldToNull, "").toString();
+    HoodieTestDataGenerator datagen = new HoodieTestDataGenerator();
+    List<GenericRecord> recs = datagen.generateGenericRecords(10);
+    for (int i = 0; i < recs.size(); i++) {
+      if (i % 2 == 0) {
+        recs.get(i).put(fieldToNull, null);
+      }
+    }
+    JavaSparkContext jsc = jsc();
+    RDD<GenericRecord> rdd = jsc.parallelize(recs).rdd();
+    Tuple2<RDD<GenericRecord>, RDD<String>> failSafeRdds = 
HoodieSparkUtils.safeRewriteRDD(rdd, schemaStr);
+    assertEquals(5, failSafeRdds._1.count());
+    assertEquals(5, failSafeRdds._2.count());
+
+    //if field is nullable, no records should fail validation
+    failSafeRdds = HoodieSparkUtils.safeRewriteRDD(rdd, 
HoodieTestDataGenerator.AVRO_SCHEMA.toString());
+    assertEquals(10, failSafeRdds._1.count());
+    assertEquals(0, failSafeRdds._2.count());
+  }
+
 }
diff --git 
a/hudi-utilities/src/test/resources/data/schema-evolution/testMissingRecordKey.json
 
b/hudi-utilities/src/test/resources/data/schema-evolution/testMissingRecordKey.json
new file mode 100644
index 00000000000..c3b65587e2d
--- /dev/null
+++ 
b/hudi-utilities/src/test/resources/data/schema-evolution/testMissingRecordKey.json
@@ -0,0 +1,2 @@
+{"timestamp":3,"_row_key":"154fee81-6e2a-4c32-94f5-be5c456fdd0a","partition_path":"2016/03/15","trip_type":"BLACK","rider":"rider-003","driver":"driver-003","begin_lat":0.21927838567558522,"begin_lon":0.5594020723099724,"end_lat":0.7161653985926594,"end_lon":0.49716798979953447,"distance_in_meters":936143957,"seconds_since_epoch":3794105168659998336,"weight":0.18520206,"nation":"three","current_date":"1970-01-15","current_ts":1244853103,"height":0.272661,"city_to_state":{"LA":"CA"},"fare
 [...]
+{"timestamp":3,"_row_key":null,"partition_path":"2015/03/16","trip_type":"BLACK","rider":"rider-003","driver":"driver-003","begin_lat":0.7471407629318884,"begin_lon":0.8776437421395643,"end_lat":0.9648524370990681,"end_lon":0.3911456751705831,"distance_in_meters":1137109733,"seconds_since_epoch":5028439681953251637,"weight":0.023411155,"nation":"three","current_date":"1970-01-12","current_ts":986645693,"height":0.898042,"city_to_state":{"LA":"CA"},"fare":{"amount":85.97606478430822,"curr
 [...]


Reply via email to