This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 11861c8a50e [HUDI-7298] Write bad records to error table in more cases
instead of failing stream (#10500)
11861c8a50e is described below
commit 11861c8a50e7dd23186d44bdc7aef871e5fc1280
Author: Jon Vexler <[email protected]>
AuthorDate: Wed Jan 24 22:59:29 2024 -0500
[HUDI-7298] Write bad records to error table in more cases instead of
failing stream (#10500)
Cases:
- No transformers, with schema provider. Records will go to the error table
if they cannot be rewritten in the deduced schema.
- recordkey is null, even if the column is nullable in the schema
---
.../apache/hudi/config/HoodieErrorTableConfig.java | 6 ++
.../scala/org/apache/hudi/HoodieSparkUtils.scala | 21 +++++
.../java/org/apache/hudi/avro/HoodieAvroUtils.java | 33 ++++++-
.../org/apache/hudi/TestHoodieSparkUtils.scala | 4 +
.../apache/hudi/utilities/streamer/ErrorEvent.java | 6 +-
.../utilities/streamer/HoodieStreamerUtils.java | 68 ++++++++++----
.../apache/hudi/utilities/streamer/StreamSync.java | 19 +++-
...TestHoodieDeltaStreamerSchemaEvolutionBase.java | 63 +++++++++++++
...oodieDeltaStreamerSchemaEvolutionExtensive.java | 100 +++++++++++++++++++--
...estHoodieDeltaStreamerSchemaEvolutionQuick.java | 18 ++--
.../utilities/sources/TestGenericRddTransform.java | 29 ++++++
.../schema-evolution/testMissingRecordKey.json | 2 +
12 files changed, 334 insertions(+), 35 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieErrorTableConfig.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieErrorTableConfig.java
index 68e2097c33b..8ba013b00ee 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieErrorTableConfig.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieErrorTableConfig.java
@@ -72,6 +72,12 @@ public class HoodieErrorTableConfig {
.defaultValue(false)
.withDocumentation("Records with schema mismatch with Target Schema are
sent to Error Table.");
+ public static final ConfigProperty<Boolean>
ERROR_ENABLE_VALIDATE_RECORD_CREATION = ConfigProperty
+ .key("hoodie.errortable.validate.recordcreation.enable")
+ .defaultValue(true)
+ .sinceVersion("0.14.2")
+ .withDocumentation("Records that fail to be created due to keygeneration
failure or other issues will be sent to the Error Table");
+
public static final ConfigProperty<String>
ERROR_TABLE_WRITE_FAILURE_STRATEGY = ConfigProperty
.key("hoodie.errortable.write.failure.strategy")
.defaultValue(ErrorWriteFailureStrategy.ROLLBACK_COMMIT.name())
diff --git
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
index 527864fcf24..535af8db193 100644
---
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
+++
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
@@ -199,6 +199,27 @@ object HoodieSparkUtils extends SparkAdapterSupport with
SparkVersionsSupport wi
}
}
+ /**
+ * Rewrite the record into the target schema.
+ * Return tuple of rewritten records and records that could not be converted
+ */
+ def safeRewriteRDD(df: RDD[GenericRecord], serializedTargetSchema: String):
Tuple2[RDD[GenericRecord], RDD[String]] = {
+ val rdds: RDD[Either[GenericRecord, String]] = df.mapPartitions { recs =>
+ if (recs.isEmpty) {
+ Iterator.empty
+ } else {
+ val schema = new Schema.Parser().parse(serializedTargetSchema)
+ val transform: GenericRecord => Either[GenericRecord, String] = record
=> try {
+ Left(HoodieAvroUtils.rewriteRecordDeep(record, schema, true))
+ } catch {
+ case _: Throwable => Right(HoodieAvroUtils.avroToJsonString(record,
false))
+ }
+ recs.map(transform)
+ }
+ }
+ (rdds.filter(_.isLeft).map(_.left.get),
rdds.filter(_.isRight).map(_.right.get))
+ }
+
def getCatalystRowSerDe(structType: StructType): SparkRowSerDe = {
sparkAdapter.createSparkRowSerDe(structType)
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
index ac7dcd42979..9b925eb59be 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
@@ -187,6 +187,16 @@ public class HoodieAvroUtils {
}
}
+ /**
+ * Convert a given avro record to json and return the string
+ *
+ * @param record The GenericRecord to convert
+ * @param pretty Whether to pretty-print the json output
+ */
+ public static String avroToJsonString(GenericRecord record, boolean pretty)
throws IOException {
+ return avroToJsonHelper(record, pretty).toString();
+ }
+
/**
* Convert a given avro record to json and return the encoded bytes.
*
@@ -194,12 +204,16 @@ public class HoodieAvroUtils {
* @param pretty Whether to pretty-print the json output
*/
public static byte[] avroToJson(GenericRecord record, boolean pretty) throws
IOException {
+ return avroToJsonHelper(record, pretty).toByteArray();
+ }
+
+ private static ByteArrayOutputStream avroToJsonHelper(GenericRecord record,
boolean pretty) throws IOException {
DatumWriter<Object> writer = new GenericDatumWriter<>(record.getSchema());
ByteArrayOutputStream out = new ByteArrayOutputStream();
JsonEncoder jsonEncoder =
EncoderFactory.get().jsonEncoder(record.getSchema(), out, pretty);
writer.write(record, jsonEncoder);
jsonEncoder.flush();
- return out.toByteArray();
+ return out;
}
/**
@@ -328,6 +342,23 @@ public class HoodieAvroUtils {
return "string,string,string,string,string," + hiveColumnTypes;
}
+ public static Schema makeFieldNonNull(Schema schema, String fieldName,
Object fieldDefaultValue) {
+ ValidationUtils.checkArgument(fieldDefaultValue != null);
+ List<Schema.Field> filteredFields = schema.getFields()
+ .stream()
+ .map(field -> {
+ if (Objects.equals(field.name(), fieldName)) {
+ return new Schema.Field(field.name(),
AvroSchemaUtils.resolveNullableSchema(field.schema()), field.doc(),
fieldDefaultValue);
+ } else {
+ return new Schema.Field(field.name(), field.schema(), field.doc(),
field.defaultVal());
+ }
+ })
+ .collect(Collectors.toList());
+ Schema withNonNullField = Schema.createRecord(schema.getName(),
schema.getDoc(), schema.getNamespace(), false);
+ withNonNullField.setFields(filteredFields);
+ return withNonNullField;
+ }
+
private static Schema initRecordKeySchema() {
Schema.Field recordKeyField =
new Schema.Field(HoodieRecord.RECORD_KEY_METADATA_FIELD,
METADATA_FIELD_SCHEMA, "", JsonProperties.NULL_VALUE);
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkUtils.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkUtils.scala
index 36ac37cfd6d..15b6b2b35da 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkUtils.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkUtils.scala
@@ -228,6 +228,10 @@ object TestHoodieSparkUtils {
})
}
+ def getSchemaColumnNotNullable(structType: StructType, columnName: String):
StructType = {
+ setNullableRec(structType, columnName.split('.'), 0)
+ }
+
def setColumnNotNullable(df: DataFrame, columnName: String): DataFrame = {
// get schema
val schema = df.schema
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/ErrorEvent.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/ErrorEvent.java
index 714225f23ab..f268464d6f1 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/ErrorEvent.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/ErrorEvent.java
@@ -53,6 +53,10 @@ public class ErrorEvent<T> {
// Failure during hudi writes
HUDI_WRITE_FAILURES,
// Failure during transformation of source to target RDD
- CUSTOM_TRANSFORMER_FAILURE
+ CUSTOM_TRANSFORMER_FAILURE,
+ // record schema is not valid for the table
+ INVALID_RECORD_SCHEMA,
+ // exception when attempting to create HoodieRecord
+ RECORD_CREATION
}
}
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamerUtils.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamerUtils.java
index a6f9513a14e..44c367ba384 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamerUtils.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamerUtils.java
@@ -31,9 +31,11 @@ import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieSparkRecord;
import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.util.Either;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.common.util.collection.CloseableMappingIterator;
+import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.keygen.BuiltinKeyGenerator;
import org.apache.hudi.keygen.KeyGenUtils;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
@@ -50,6 +52,7 @@ import org.apache.spark.sql.avro.HoodieAvroDeserializer;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.types.StructType;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
@@ -58,6 +61,7 @@ import java.util.Set;
import java.util.stream.Collectors;
import static
org.apache.hudi.common.table.HoodieTableConfig.DROP_PARTITION_COLUMNS;
+import static
org.apache.hudi.config.HoodieErrorTableConfig.ERROR_ENABLE_VALIDATE_RECORD_CREATION;
/**
@@ -70,39 +74,49 @@ public class HoodieStreamerUtils {
* Takes care of dropping columns, precombine, auto key generation.
* Both AVRO and SPARK record types are supported.
*/
- static Option<JavaRDD<HoodieRecord>>
createHoodieRecords(HoodieStreamer.Config cfg, TypedProperties props,
Option<JavaRDD<GenericRecord>> avroRDDOptional,
- SchemaProvider schemaProvider,
HoodieRecord.HoodieRecordType recordType, boolean autoGenerateRecordKeys,
- String instantTime) {
+ public static Option<JavaRDD<HoodieRecord>>
createHoodieRecords(HoodieStreamer.Config cfg, TypedProperties props,
Option<JavaRDD<GenericRecord>> avroRDDOptional,
+
SchemaProvider schemaProvider, HoodieRecord.HoodieRecordType recordType,
boolean autoGenerateRecordKeys,
+ String
instantTime, Option<BaseErrorTableWriter> errorTableWriter) {
boolean shouldCombine = cfg.filterDupes ||
cfg.operation.equals(WriteOperationType.UPSERT);
+ boolean shouldErrorTable = errorTableWriter.isPresent() &&
props.getBoolean(ERROR_ENABLE_VALIDATE_RECORD_CREATION.key(),
ERROR_ENABLE_VALIDATE_RECORD_CREATION.defaultValue());
Set<String> partitionColumns = getPartitionColumns(props);
return avroRDDOptional.map(avroRDD -> {
- JavaRDD<HoodieRecord> records;
SerializableSchema avroSchema = new
SerializableSchema(schemaProvider.getTargetSchema());
SerializableSchema processedAvroSchema = new
SerializableSchema(isDropPartitionColumns(props) ?
HoodieAvroUtils.removeMetadataFields(avroSchema.get()) : avroSchema.get());
+ JavaRDD<Either<HoodieRecord,String>> records;
if (recordType == HoodieRecord.HoodieRecordType.AVRO) {
records = avroRDD.mapPartitions(
- (FlatMapFunction<Iterator<GenericRecord>, HoodieRecord>)
genericRecordIterator -> {
+ (FlatMapFunction<Iterator<GenericRecord>,
Either<HoodieRecord,String>>) genericRecordIterator -> {
if (autoGenerateRecordKeys) {
props.setProperty(KeyGenUtils.RECORD_KEY_GEN_PARTITION_ID_CONFIG,
String.valueOf(TaskContext.getPartitionId()));
props.setProperty(KeyGenUtils.RECORD_KEY_GEN_INSTANT_TIME_CONFIG, instantTime);
}
BuiltinKeyGenerator builtinKeyGenerator = (BuiltinKeyGenerator)
HoodieSparkKeyGeneratorFactory.createKeyGenerator(props);
- List<HoodieRecord> avroRecords = new ArrayList<>();
+ List<Either<HoodieRecord,String>> avroRecords = new
ArrayList<>();
while (genericRecordIterator.hasNext()) {
GenericRecord genRec = genericRecordIterator.next();
- HoodieKey hoodieKey = new
HoodieKey(builtinKeyGenerator.getRecordKey(genRec),
builtinKeyGenerator.getPartitionPath(genRec));
- GenericRecord gr = isDropPartitionColumns(props) ?
HoodieAvroUtils.removeFields(genRec, partitionColumns) : genRec;
- HoodieRecordPayload payload = shouldCombine ?
DataSourceUtils.createPayload(cfg.payloadClassName, gr,
- (Comparable) HoodieAvroUtils.getNestedFieldVal(gr,
cfg.sourceOrderingField, false, props.getBoolean(
-
KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(),
-
Boolean.parseBoolean(KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue()))))
- : DataSourceUtils.createPayload(cfg.payloadClassName, gr);
- avroRecords.add(new HoodieAvroRecord<>(hoodieKey, payload));
+ try {
+ HoodieKey hoodieKey = new
HoodieKey(builtinKeyGenerator.getRecordKey(genRec),
builtinKeyGenerator.getPartitionPath(genRec));
+ GenericRecord gr = isDropPartitionColumns(props) ?
HoodieAvroUtils.removeFields(genRec, partitionColumns) : genRec;
+ HoodieRecordPayload payload = shouldCombine ?
DataSourceUtils.createPayload(cfg.payloadClassName, gr,
+ (Comparable) HoodieAvroUtils.getNestedFieldVal(gr,
cfg.sourceOrderingField, false, props.getBoolean(
+
KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(),
+
Boolean.parseBoolean(KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue()))))
+ : DataSourceUtils.createPayload(cfg.payloadClassName,
gr);
+ avroRecords.add(Either.left(new
HoodieAvroRecord<>(hoodieKey, payload)));
+ } catch (Exception e) {
+ if (!shouldErrorTable) {
+ throw e;
+ }
+
avroRecords.add(Either.right(HoodieAvroUtils.avroToJsonString(genRec, false)));
+ }
}
return avroRecords.iterator();
});
+
} else if (recordType == HoodieRecord.HoodieRecordType.SPARK) {
// TODO we should remove it if we can read InternalRow from source.
+
records = avroRDD.mapPartitions(itr -> {
if (autoGenerateRecordKeys) {
props.setProperty(KeyGenUtils.RECORD_KEY_GEN_PARTITION_ID_CONFIG,
String.valueOf(TaskContext.getPartitionId()));
@@ -116,16 +130,32 @@ public class HoodieStreamerUtils {
return new CloseableMappingIterator<>(ClosableIterator.wrap(itr),
rec -> {
InternalRow row = (InternalRow)
deserializer.deserialize(rec).get();
- String recordKey = builtinKeyGenerator.getRecordKey(row,
baseStructType).toString();
- String partitionPath = builtinKeyGenerator.getPartitionPath(row,
baseStructType).toString();
- return new HoodieSparkRecord(new HoodieKey(recordKey,
partitionPath),
-
HoodieInternalRowUtils.getCachedUnsafeProjection(baseStructType,
targetStructType).apply(row), targetStructType, false);
+ try {
+ String recordKey = builtinKeyGenerator.getRecordKey(row,
baseStructType).toString();
+ String partitionPath = builtinKeyGenerator.getPartitionPath(row,
baseStructType).toString();
+ return Either.left(new HoodieSparkRecord(new
HoodieKey(recordKey, partitionPath),
+
HoodieInternalRowUtils.getCachedUnsafeProjection(baseStructType,
targetStructType).apply(row), targetStructType, false));
+ } catch (Exception e) {
+ if (!shouldErrorTable) {
+ throw e;
+ }
+ try {
+ return Either.right(HoodieAvroUtils.avroToJsonString(rec,
false));
+ } catch (IOException ex) {
+ throw new HoodieIOException("Failed to convert illegal record
to json", ex);
+ }
+ }
});
+
});
} else {
throw new UnsupportedOperationException(recordType.name());
}
- return records;
+ if (shouldErrorTable) {
+
errorTableWriter.get().addErrorEvents(records.filter(Either::isRight).map(Either::asRight).map(evStr
-> new ErrorEvent<>(evStr,
+ ErrorEvent.ErrorReason.RECORD_CREATION)));
+ }
+ return records.filter(Either::isLeft).map(Either::asLeft);
});
}
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
index 8d629299684..659f6429f01 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
@@ -545,7 +545,7 @@ public class StreamSync implements Serializable, Closeable {
return inputBatch;
} else {
Option<JavaRDD<HoodieRecord>> recordsOpt =
HoodieStreamerUtils.createHoodieRecords(cfg, props, inputBatch.getBatch(),
schemaProvider,
- recordType, autoGenerateRecordKeys, instantTime);
+ recordType, autoGenerateRecordKeys, instantTime, errorTableWriter);
return new InputBatch(recordsOpt, checkpointStr, schemaProvider);
}
}
@@ -633,8 +633,21 @@ public class StreamSync implements Serializable, Closeable
{
// Rewrite transformed records into the expected target schema
schemaProvider =
getDeducedSchemaProvider(dataAndCheckpoint.getSchemaProvider().getTargetSchema(),
dataAndCheckpoint.getSchemaProvider(), metaClient);
String serializedTargetSchema =
schemaProvider.getTargetSchema().toString();
- avroRDDOptional = dataAndCheckpoint.getBatch().map(t ->
t.mapPartitions(iterator ->
- new LazyCastingIterator(iterator, serializedTargetSchema)));
+ if (errorTableWriter.isPresent()
+ &&
props.getBoolean(HoodieErrorTableConfig.ERROR_ENABLE_VALIDATE_TARGET_SCHEMA.key(),
+
HoodieErrorTableConfig.ERROR_ENABLE_VALIDATE_TARGET_SCHEMA.defaultValue())) {
+ avroRDDOptional = dataAndCheckpoint.getBatch().map(
+ records -> {
+ Tuple2<RDD<GenericRecord>, RDD<String>> safeCreateRDDs =
HoodieSparkUtils.safeRewriteRDD(records.rdd(), serializedTargetSchema);
+
errorTableWriter.get().addErrorEvents(safeCreateRDDs._2().toJavaRDD()
+ .map(evStr -> new ErrorEvent<>(evStr,
+ ErrorEvent.ErrorReason.INVALID_RECORD_SCHEMA)));
+ return safeCreateRDDs._1.toJavaRDD();
+ });
+ } else {
+ avroRDDOptional = dataAndCheckpoint.getBatch().map(t ->
t.mapPartitions(iterator ->
+ new LazyCastingIterator(iterator, serializedTargetSchema)));
+ }
}
}
if (useRowWriter) {
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionBase.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionBase.java
index 6b364667054..1b983b65e97 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionBase.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionBase.java
@@ -22,30 +22,38 @@ package org.apache.hudi.utilities.deltastreamer;
import org.apache.hudi.AvroConversionUtils;
import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.HoodieSparkUtils;
+import org.apache.hudi.TestHoodieSparkUtils;
import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.config.HoodieReaderConfig;
import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieErrorTableConfig;
import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.AvroKafkaSource;
import org.apache.hudi.utilities.sources.ParquetDFSSource;
+import org.apache.hudi.utilities.streamer.BaseErrorTableWriter;
import org.apache.hudi.utilities.streamer.HoodieStreamer;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
@@ -80,6 +88,7 @@ public class TestHoodieDeltaStreamerSchemaEvolutionBase
extends HoodieDeltaStrea
protected String tableType;
protected String tableBasePath;
+ protected String tableName;
protected Boolean shouldCluster;
protected Boolean shouldCompact;
protected Boolean rowWriterEnable;
@@ -90,6 +99,7 @@ public class TestHoodieDeltaStreamerSchemaEvolutionBase
extends HoodieDeltaStrea
protected String sourceSchemaFile;
protected String targetSchemaFile;
protected boolean useKafkaSource;
+ protected boolean withErrorTable;
protected boolean useTransformer;
protected boolean userProvidedSchema;
@@ -107,8 +117,11 @@ public class TestHoodieDeltaStreamerSchemaEvolutionBase
extends HoodieDeltaStrea
@BeforeEach
public void setupTest() {
super.setupTest();
+ TestErrorTable.commited = new HashMap<>();
+ TestErrorTable.errorEvents = new ArrayList<>();
useSchemaProvider = false;
hasTransformer = false;
+ withErrorTable = false;
sourceSchemaFile = "";
targetSchemaFile = "";
topicName = "topic" + testNum;
@@ -174,6 +187,16 @@ public class TestHoodieDeltaStreamerSchemaEvolutionBase
extends HoodieDeltaStrea
extraProps.setProperty(HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS.key(),
"_row_key");
}
+ if (withErrorTable) {
+ extraProps.setProperty(HoodieErrorTableConfig.ERROR_TABLE_ENABLED.key(),
"true");
+
extraProps.setProperty(HoodieErrorTableConfig.ERROR_ENABLE_VALIDATE_TARGET_SCHEMA.key(),
"true");
+
extraProps.setProperty(HoodieErrorTableConfig.ERROR_ENABLE_VALIDATE_RECORD_CREATION.key(),
"true");
+ extraProps.setProperty(HoodieErrorTableConfig.ERROR_TARGET_TABLE.key(),
tableName + "ERROR");
+
extraProps.setProperty(HoodieErrorTableConfig.ERROR_TABLE_BASE_PATH.key(),
basePath + tableName + "ERROR");
+
extraProps.setProperty(HoodieErrorTableConfig.ERROR_TABLE_WRITE_CLASS.key(),
TestErrorTable.class.getName());
+ extraProps.setProperty("hoodie.base.path", tableBasePath);
+ }
+
List<String> transformerClassNames = new ArrayList<>();
Collections.addAll(transformerClassNames, transformerClasses);
@@ -196,6 +219,9 @@ public class TestHoodieDeltaStreamerSchemaEvolutionBase
extends HoodieDeltaStrea
protected void addData(Dataset<Row> df, Boolean isFirst) {
if (useSchemaProvider) {
TestSchemaProvider.sourceSchema =
AvroConversionUtils.convertStructTypeToAvroSchema(df.schema(),
HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE);
+ if (withErrorTable && isFirst) {
+
TestSchemaProvider.setTargetSchema(AvroConversionUtils.convertStructTypeToAvroSchema(TestHoodieSparkUtils.getSchemaColumnNotNullable(df.schema(),
"_row_key"),"idk", "idk"));
+ }
}
if (useKafkaSource) {
addKafkaData(df, isFirst);
@@ -303,4 +329,41 @@ public class TestHoodieDeltaStreamerSchemaEvolutionBase
extends HoodieDeltaStrea
TestSchemaProvider.targetSchema = null;
}
}
+
+ public static class TestErrorTable extends BaseErrorTableWriter {
+
+ public static List<JavaRDD> errorEvents = new ArrayList<>();
+ public static Map<String,Option<JavaRDD>> commited = new HashMap<>();
+ public TestErrorTable(HoodieStreamer.Config cfg, SparkSession
sparkSession, TypedProperties props, HoodieSparkEngineContext
hoodieSparkContext,
+ FileSystem fs) {
+ super(cfg, sparkSession, props, hoodieSparkContext, fs);
+ }
+
+ @Override
+ public void addErrorEvents(JavaRDD errorEvent) {
+ errorEvents.add(errorEvent);
+ }
+
+ @Override
+ public boolean upsertAndCommit(String baseTableInstantTime, Option
commitedInstantTime) {
+ if (errorEvents.size() > 0) {
+ JavaRDD errorsCombined = errorEvents.get(0);
+ for (int i = 1; i < errorEvents.size(); i++) {
+ errorsCombined = errorsCombined.union(errorEvents.get(i));
+ }
+ commited.put(baseTableInstantTime, Option.of(errorsCombined));
+ errorEvents = new ArrayList<>();
+
+ } else {
+ commited.put(baseTableInstantTime, Option.empty());
+ }
+ return true;
+ }
+
+ @Override
+ public Option<JavaRDD<HoodieAvroRecord>> getErrorEvents(String
baseTableInstantTime, Option commitedInstantTime) {
+ return Option.empty();
+ }
+ }
+
}
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionExtensive.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionExtensive.java
index f330bb62732..7adc3f66684 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionExtensive.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionExtensive.java
@@ -20,7 +20,10 @@
package org.apache.hudi.utilities.deltastreamer;
import org.apache.hudi.TestHoodieSparkUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.utilities.streamer.ErrorEvent;
+import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
@@ -31,7 +34,9 @@ import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.stream.Stream;
@@ -45,16 +50,24 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
public class TestHoodieDeltaStreamerSchemaEvolutionExtensive extends
TestHoodieDeltaStreamerSchemaEvolutionBase {
protected void testBase(String updateFile, String updateColumn, String
condition, int count) throws Exception {
+ testBase(updateFile, updateColumn, condition, count, null);
+ }
+
+ protected void testBase(String updateFile, String updateColumn, String
condition, int count, ErrorEvent.ErrorReason reason) throws Exception {
Map<String,Integer> conditions = new HashMap<>();
conditions.put(condition, count);
- testBase(updateFile, updateColumn, conditions, true);
+ testBase(updateFile, updateColumn, conditions, true, reason);
//adding non-nullable cols should fail, but instead it is adding nullable
cols
//assertThrows(Exception.class, () -> testBase(tableType, shouldCluster,
shouldCompact, reconcileSchema, rowWriterEnable, updateFile, updateColumn,
condition, count, false));
}
protected void testBase(String updateFile, String updateColumn,
Map<String,Integer> conditions) throws Exception {
- testBase(updateFile, updateColumn, conditions, true);
+ testBase(updateFile, updateColumn, conditions, null);
+ }
+
+ protected void testBase(String updateFile, String updateColumn,
Map<String,Integer> conditions, ErrorEvent.ErrorReason reason) throws Exception
{
+ testBase(updateFile, updateColumn, conditions, true, reason);
}
protected void doFirstDeltaWrite() throws Exception {
@@ -100,10 +113,11 @@ public class
TestHoodieDeltaStreamerSchemaEvolutionExtensive extends TestHoodieD
/**
* Main testing logic for non-type promotion tests
*/
- protected void testBase(String updateFile, String updateColumn,
Map<String,Integer> conditions, Boolean nullable) throws Exception {
+ protected void testBase(String updateFile, String updateColumn,
Map<String,Integer> conditions, Boolean nullable, ErrorEvent.ErrorReason
reason) throws Exception {
boolean isCow = tableType.equals("COPY_ON_WRITE");
PARQUET_SOURCE_ROOT = basePath + "parquetFilesDfs" + testNum++;
- tableBasePath = basePath + "test_parquet_table" + testNum;
+ tableName = "test_parquet_table" + testNum;
+ tableBasePath = basePath + tableName;
this.deltaStreamer = new HoodieDeltaStreamer(getDeltaStreamerConfig(),
jsc);
//first write
@@ -149,6 +163,8 @@ public class
TestHoodieDeltaStreamerSchemaEvolutionExtensive extends TestHoodieD
if (updateFile.equals("testAddColChangeOrderAllFiles.json")) {
//this test updates all 3 partitions instead of 2 like the rest of the
tests
numFiles++;
+ } else if (withErrorTable) {
+ numFiles--;
}
assertFileNumber(numFiles, false);
}
@@ -161,6 +177,19 @@ public class
TestHoodieDeltaStreamerSchemaEvolutionExtensive extends TestHoodieD
assertEquals(conditions.get(condition).intValue(),
df.filter(condition).count());
}
+ if (withErrorTable) {
+ List<ErrorEvent> recs = new ArrayList<>();
+ for (String key : TestErrorTable.commited.keySet()) {
+ Option<JavaRDD> errors = TestErrorTable.commited.get(key);
+ if (errors.isPresent()) {
+ if (!errors.get().isEmpty()) {
+ recs.addAll(errors.get().collect());
+ }
+ }
+ }
+ assertEquals(1, recs.size());
+ assertEquals(recs.get(0).getReason(), reason);
+ }
}
protected static Stream<Arguments> testArgs() {
@@ -183,6 +212,66 @@ public class
TestHoodieDeltaStreamerSchemaEvolutionExtensive extends TestHoodieD
return b.build();
}
+ @ParameterizedTest
+ @MethodSource("testArgs")
+ public void testErrorTable(String tableType,
+ Boolean shouldCluster,
+ Boolean shouldCompact,
+ Boolean rowWriterEnable,
+ Boolean addFilegroups,
+ Boolean multiLogFiles) throws Exception {
+ this.withErrorTable = true;
+ this.useSchemaProvider = false;
+ this.useTransformer = false;
+ this.tableType = tableType;
+ this.shouldCluster = shouldCluster;
+ this.shouldCompact = shouldCompact;
+ this.rowWriterEnable = rowWriterEnable;
+ this.addFilegroups = addFilegroups;
+ this.multiLogFiles = multiLogFiles;
+ testBase("testMissingRecordKey.json", "driver", "driver = 'driver-003'",
1, ErrorEvent.ErrorReason.RECORD_CREATION);
+ }
+
+ @ParameterizedTest
+ @MethodSource("testArgs")
+ public void testErrorTableWithSchemaProvider(String tableType,
+ Boolean shouldCluster,
+ Boolean shouldCompact,
+ Boolean rowWriterEnable,
+ Boolean addFilegroups,
+ Boolean multiLogFiles) throws
Exception {
+ this.withErrorTable = true;
+ this.useSchemaProvider = true;
+ this.useTransformer = false;
+ this.tableType = tableType;
+ this.shouldCluster = shouldCluster;
+ this.shouldCompact = shouldCompact;
+ this.rowWriterEnable = rowWriterEnable;
+ this.addFilegroups = addFilegroups;
+ this.multiLogFiles = multiLogFiles;
+ testBase("testMissingRecordKey.json", "driver", "driver = 'driver-003'",
1, ErrorEvent.ErrorReason.INVALID_RECORD_SCHEMA);
+ }
+
+ @ParameterizedTest
+ @MethodSource("testArgs")
+ public void testErrorTableWithTransformer(String tableType,
+ Boolean shouldCluster,
+ Boolean shouldCompact,
+ Boolean rowWriterEnable,
+ Boolean addFilegroups,
+ Boolean multiLogFiles) throws Exception {
+ this.withErrorTable = true;
+ this.useSchemaProvider = true;
+ this.useTransformer = true;
+ this.tableType = tableType;
+ this.shouldCluster = shouldCluster;
+ this.shouldCompact = shouldCompact;
+ this.rowWriterEnable = rowWriterEnable;
+ this.addFilegroups = addFilegroups;
+ this.multiLogFiles = multiLogFiles;
+ testBase("testMissingRecordKey.json", "driver", "driver = 'driver-003'",
1, ErrorEvent.ErrorReason.AVRO_DESERIALIZATION_FAILURE);
+ }
+
/**
* Add a new column at root level at the end
*/
@@ -367,7 +456,8 @@ public class
TestHoodieDeltaStreamerSchemaEvolutionExtensive extends TestHoodieD
protected void testTypePromotionBase(String colName, DataType startType,
DataType updateType, DataType endType) throws Exception {
boolean isCow = tableType.equals("COPY_ON_WRITE");
PARQUET_SOURCE_ROOT = basePath + "parquetFilesDfs" + testNum++;
- tableBasePath = basePath + "test_parquet_table" + testNum;
+ tableName = "test_parquet_table" + testNum;
+ tableBasePath = basePath + tableName;
this.deltaStreamer = new HoodieDeltaStreamer(getDeltaStreamerConfig(),
jsc);
//first write
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionQuick.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionQuick.java
index 959446a63dc..096ddf14cc7 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionQuick.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionQuick.java
@@ -157,7 +158,8 @@ public class TestHoodieDeltaStreamerSchemaEvolutionQuick extends TestHoodieDelta
this.useTransformer = true;
boolean isCow = tableType.equals("COPY_ON_WRITE");
PARQUET_SOURCE_ROOT = basePath + "parquetFilesDfs" + ++testNum;
- tableBasePath = basePath + "test_parquet_table" + testNum;
+ tableName = "test_parquet_table" + testNum;
+ tableBasePath = basePath + tableName;
+    this.deltaStreamer = new HoodieDeltaStreamer(getDeltaStreamerConfig(allowNullForDeletedCols), jsc);
//first write
@@ -283,7 +284,8 @@ public class TestHoodieDeltaStreamerSchemaEvolutionQuick extends TestHoodieDelta
boolean isCow = tableType.equals("COPY_ON_WRITE");
PARQUET_SOURCE_ROOT = basePath + "parquetFilesDfs" + ++testNum;
- tableBasePath = basePath + "test_parquet_table" + testNum;
+ tableName = "test_parquet_table" + testNum;
+ tableBasePath = basePath + tableName;
//first write
String datapath =
String.class.getResource("/data/schema-evolution/startTestEverything.json").getPath();
@@ -353,7 +355,8 @@ public class TestHoodieDeltaStreamerSchemaEvolutionQuick extends TestHoodieDelta
boolean isCow = tableType.equals("COPY_ON_WRITE");
PARQUET_SOURCE_ROOT = basePath + "parquetFilesDfs" + ++testNum;
- tableBasePath = basePath + "test_parquet_table" + testNum;
+ tableName = "test_parquet_table" + testNum;
+ tableBasePath = basePath + tableName;
//first write
String datapath =
String.class.getResource("/data/schema-evolution/startTestEverything.json").getPath();
@@ -431,7 +434,8 @@ public class TestHoodieDeltaStreamerSchemaEvolutionQuick extends TestHoodieDelta
boolean isCow = tableType.equals("COPY_ON_WRITE");
PARQUET_SOURCE_ROOT = basePath + "parquetFilesDfs" + ++testNum;
- tableBasePath = basePath + "test_parquet_table" + testNum;
+ tableName = "test_parquet_table" + testNum;
+ tableBasePath = basePath + tableName;
//first write
String datapath =
String.class.getResource("/data/schema-evolution/startTestEverything.json").getPath();
@@ -510,7 +514,8 @@ public class TestHoodieDeltaStreamerSchemaEvolutionQuick extends TestHoodieDelta
boolean isCow = tableType.equals("COPY_ON_WRITE");
PARQUET_SOURCE_ROOT = basePath + "parquetFilesDfs" + ++testNum;
- tableBasePath = basePath + "test_parquet_table" + testNum;
+ tableName = "test_parquet_table" + testNum;
+ tableBasePath = basePath + tableName;
//first write
String datapath =
String.class.getResource("/data/schema-evolution/startTestEverything.json").getPath();
@@ -597,7 +602,8 @@ public class TestHoodieDeltaStreamerSchemaEvolutionQuick extends TestHoodieDelta
boolean isCow = tableType.equals("COPY_ON_WRITE");
PARQUET_SOURCE_ROOT = basePath + "parquetFilesDfs" + ++testNum;
- tableBasePath = basePath + "test_parquet_table" + testNum;
+ tableName = "test_parquet_table" + testNum;
+ tableBasePath = basePath + tableName;
//first write
String datapath =
String.class.getResource("/data/schema-evolution/startTestEverything.json").getPath();
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGenericRddTransform.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGenericRddTransform.java
index 78bc21ecf92..8adfdb4dc37 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGenericRddTransform.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGenericRddTransform.java
@@ -20,11 +20,13 @@ package org.apache.hudi.utilities.sources;
import org.apache.hudi.AvroConversionUtils;
import org.apache.hudi.HoodieSparkUtils;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
+import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.types.DataTypes;
@@ -33,8 +35,11 @@ import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.junit.jupiter.api.Test;
+import java.util.List;
+
import scala.Tuple2;
+import static org.apache.hudi.avro.HoodieAvroUtils.makeFieldNonNull;
import static org.apache.spark.sql.functions.expr;
import static org.apache.spark.sql.functions.lit;
import static org.apache.spark.sql.functions.when;
@@ -54,4 +59,28 @@ public class TestGenericRddTransform extends SparkClientFunctionalTestHarness {
assertEquals(5, failSafeRdds._1.count());
assertEquals(5, failSafeRdds._2.count());
}
+
+ @Test
+ public void testGenericRddConvert() {
+ String fieldToNull = "partition_path";
+    String schemaStr = makeFieldNonNull(HoodieTestDataGenerator.AVRO_SCHEMA, fieldToNull, "").toString();
+ HoodieTestDataGenerator datagen = new HoodieTestDataGenerator();
+ List<GenericRecord> recs = datagen.generateGenericRecords(10);
+ for (int i = 0; i < recs.size(); i++) {
+ if (i % 2 == 0) {
+ recs.get(i).put(fieldToNull, null);
+ }
+ }
+ JavaSparkContext jsc = jsc();
+ RDD<GenericRecord> rdd = jsc.parallelize(recs).rdd();
+    Tuple2<RDD<GenericRecord>, RDD<String>> failSafeRdds = HoodieSparkUtils.safeRewriteRDD(rdd, schemaStr);
+ assertEquals(5, failSafeRdds._1.count());
+ assertEquals(5, failSafeRdds._2.count());
+
+ //if field is nullable, no records should fail validation
+    failSafeRdds = HoodieSparkUtils.safeRewriteRDD(rdd, HoodieTestDataGenerator.AVRO_SCHEMA.toString());
+ assertEquals(10, failSafeRdds._1.count());
+ assertEquals(0, failSafeRdds._2.count());
+ }
+
}
diff --git a/hudi-utilities/src/test/resources/data/schema-evolution/testMissingRecordKey.json b/hudi-utilities/src/test/resources/data/schema-evolution/testMissingRecordKey.json
new file mode 100644
index 00000000000..c3b65587e2d
--- /dev/null
+++ b/hudi-utilities/src/test/resources/data/schema-evolution/testMissingRecordKey.json
@@ -0,0 +1,2 @@
+{"timestamp":3,"_row_key":"154fee81-6e2a-4c32-94f5-be5c456fdd0a","partition_path":"2016/03/15","trip_type":"BLACK","rider":"rider-003","driver":"driver-003","begin_lat":0.21927838567558522,"begin_lon":0.5594020723099724,"end_lat":0.7161653985926594,"end_lon":0.49716798979953447,"distance_in_meters":936143957,"seconds_since_epoch":3794105168659998336,"weight":0.18520206,"nation":"three","current_date":"1970-01-15","current_ts":1244853103,"height":0.272661,"city_to_state":{"LA":"CA"},"fare
[...]
+{"timestamp":3,"_row_key":null,"partition_path":"2015/03/16","trip_type":"BLACK","rider":"rider-003","driver":"driver-003","begin_lat":0.7471407629318884,"begin_lon":0.8776437421395643,"end_lat":0.9648524370990681,"end_lon":0.3911456751705831,"distance_in_meters":1137109733,"seconds_since_epoch":5028439681953251637,"weight":0.023411155,"nation":"three","current_date":"1970-01-12","current_ts":986645693,"height":0.898042,"city_to_state":{"LA":"CA"},"fare":{"amount":85.97606478430822,"curr
[...]