This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new b6490c160bf [HUDI-5813] Error table support for error events (#7982)
b6490c160bf is described below
commit b6490c160bf7644607ead00da60c4eda3ec07773
Author: Lokesh Jain <[email protected]>
AuthorDate: Tue Feb 28 18:58:28 2023 +0530
[HUDI-5813] Error table support for error events (#7982)
Add support for capturing source conversion error events to a configured
error table.
- Track error events in `SourceFormatAdapter`
- Track error events from `WriteStatus`
- Added new WriteClient for error table `BaseErrorTableWriter`
- If the commit to the error table fails, then throw an error so that delta sync fails.
---
.../apache/hudi/config/HoodieErrorTableConfig.java | 100 +++++++++++++++++
.../org/apache/hudi/AvroConversionUtils.scala | 1 +
.../scala/org/apache/hudi/HoodieSparkUtils.scala | 79 +++++++++++++-
.../java/org/apache/hudi/avro/HoodieAvroUtils.java | 63 +++++++++--
.../deltastreamer/BaseErrorTableWriter.java | 70 ++++++++++++
.../hudi/utilities/deltastreamer/DeltaSync.java | 95 +++++++++++++---
.../hudi/utilities/deltastreamer/ErrorEvent.java | 58 ++++++++++
.../utilities/deltastreamer/ErrorTableUtils.java | 66 ++++++++++++
.../deltastreamer/SourceFormatAdapter.java | 111 +++++++++++++++----
.../utilities/sources/helpers/AvroConvertor.java | 14 +++
.../utilities/sources/TestGenericRddTransform.java | 57 ++++++++++
.../utilities/sources/TestJsonKafkaSource.java | 120 +++++++++++++++++++++
12 files changed, 791 insertions(+), 43 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieErrorTableConfig.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieErrorTableConfig.java
new file mode 100644
index 00000000000..68e2097c33b
--- /dev/null
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieErrorTableConfig.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.config;
+
+import org.apache.hudi.common.config.ConfigClassProperty;
+import org.apache.hudi.common.config.ConfigGroups;
+import org.apache.hudi.common.config.ConfigProperty;
+
+import javax.annotation.concurrent.Immutable;
+
+import java.util.Arrays;
+
+@Immutable
+@ConfigClassProperty(name = "Error table Configs",
+ groupName = ConfigGroups.Names.WRITE_CLIENT,
+ description = "Configurations that are required for Error table configs")
+public class HoodieErrorTableConfig {
+ public static final ConfigProperty<Boolean> ERROR_TABLE_ENABLED =
ConfigProperty
+ .key("hoodie.errortable.enable")
+ .defaultValue(false)
+ .withDocumentation("Config to enable error table. If the config is
enabled, "
+ + "all the records with processing error in DeltaStreamer are
transferred to error table.");
+
+ public static final ConfigProperty<String> ERROR_TABLE_BASE_PATH =
ConfigProperty
+ .key("hoodie.errortable.base.path")
+ .noDefaultValue()
+ .withDocumentation("Base path for error table under which all error
records "
+ + "would be stored.");
+
+ public static final ConfigProperty<String> ERROR_TARGET_TABLE =
ConfigProperty
+ .key("hoodie.errortable.target.table.name")
+ .noDefaultValue()
+ .withDocumentation("Table name to be used for the error table");
+
+ public static final ConfigProperty<Integer>
ERROR_TABLE_UPSERT_PARALLELISM_VALUE = ConfigProperty
+ .key("hoodie.errortable.upsert.shuffle.parallelism")
+ .defaultValue(200)
+ .withDocumentation("Config to set upsert shuffle parallelism. The config
is similar to "
+ + "hoodie.upsert.shuffle.parallelism config but applies to the error
table.");
+
+ public static final ConfigProperty<Integer>
ERROR_TABLE_INSERT_PARALLELISM_VALUE = ConfigProperty
+ .key("hoodie.errortable.insert.shuffle.parallelism")
+ .defaultValue(200)
+ .withDocumentation("Config to set insert shuffle parallelism. The config
is similar to "
+ + "hoodie.insert.shuffle.parallelism config but applies to the error
table.");
+
+ public static final ConfigProperty<String> ERROR_TABLE_WRITE_CLASS =
ConfigProperty
+ .key("hoodie.errortable.write.class")
+ .noDefaultValue()
+ .withDocumentation("Class which handles the error table writes. This
config is used to configure "
+ + "a custom implementation for Error Table Writer. Specify the full
class name of the custom "
+ + "error table writer as a value for this config");
+
+ public static final ConfigProperty<Boolean>
ERROR_ENABLE_VALIDATE_TARGET_SCHEMA = ConfigProperty
+ .key("hoodie.errortable.validate.targetschema.enable")
+ .defaultValue(false)
+ .withDocumentation("Records with schema mismatch with Target Schema are
sent to Error Table.");
+
+ public static final ConfigProperty<String>
ERROR_TABLE_WRITE_FAILURE_STRATEGY = ConfigProperty
+ .key("hoodie.errortable.write.failure.strategy")
+ .defaultValue(ErrorWriteFailureStrategy.ROLLBACK_COMMIT.name())
+ .withDocumentation("The config specifies the failure strategy if error
table write fails. "
+ + "Use one of - " +
Arrays.toString(ErrorWriteFailureStrategy.values()));
+
+ public enum ErrorWriteFailureStrategy {
+ ROLLBACK_COMMIT("Rollback the corresponding base table write commit for
which the error events were triggered"),
+ LOG_ERROR("Error is logged but the base table write succeeds");
+
+ private final String description;
+
+ ErrorWriteFailureStrategy(String description) {
+ this.description = description;
+ }
+
+ public String getDescription() {
+ return description;
+ }
+
+ @Override
+ public String toString() {
+ return super.toString() + " (" + description + ")\n";
+ }
+ }
+}
diff --git
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
index 9f5d6fd7afe..8e44dc4b1fe 100644
---
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
+++
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
@@ -24,6 +24,7 @@ import org.apache.hudi.HoodieSparkUtils.sparkAdapter
import org.apache.hudi.avro.AvroSchemaUtils
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructType}
import org.apache.spark.sql.{Dataset, Row, SparkSession}
diff --git
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
index dec0eb5805c..ba9dcaccce3 100644
---
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
+++
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
@@ -26,7 +26,7 @@ import org.apache.hudi.client.utils.SparkRowSerDe
import org.apache.hudi.common.model.HoodieRecord
import org.apache.spark.SPARK_VERSION
import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.{DataFrame}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.SQLConfInjectingRDD
import org.apache.spark.sql.internal.SQLConf
@@ -116,10 +116,79 @@ object HoodieSparkUtils extends SparkAdapterSupport with
SparkVersionsSupport {
}, SQLConf.get)
}
- def getCatalystRowSerDe(structType: StructType) : SparkRowSerDe = {
- sparkAdapter.createSparkRowSerDe(structType)
- }
-
private def injectSQLConf[T: ClassTag](rdd: RDD[T], conf: SQLConf): RDD[T] =
new SQLConfInjectingRDD(rdd, conf)
+
+ def safeCreateRDD(df: DataFrame, structName: String, recordNamespace:
String, reconcileToLatestSchema: Boolean,
+ latestTableSchema:
org.apache.hudi.common.util.Option[Schema] =
org.apache.hudi.common.util.Option.empty()):
+ Tuple2[RDD[GenericRecord], RDD[String]] = {
+ var latestTableSchemaConverted: Option[Schema] = None
+
+ if (latestTableSchema.isPresent && reconcileToLatestSchema) {
+ latestTableSchemaConverted = Some(latestTableSchema.get())
+ } else {
+ // cases when users want to use latestTableSchema but have not turned on
reconcileToLatestSchema explicitly
+ // for example, when using a Transformer implementation to transform
source RDD to target RDD
+ latestTableSchemaConverted = if (latestTableSchema.isPresent)
Some(latestTableSchema.get()) else None
+ }
+ safeCreateRDD(df, structName, recordNamespace, latestTableSchemaConverted);
+ }
+
+ def safeCreateRDD(df: DataFrame, structName: String, recordNamespace:
String, readerAvroSchemaOpt: Option[Schema]):
+ Tuple2[RDD[GenericRecord], RDD[String]] = {
+ val writerSchema = df.schema
+ val writerAvroSchema =
AvroConversionUtils.convertStructTypeToAvroSchema(writerSchema, structName,
recordNamespace)
+ val readerAvroSchema = readerAvroSchemaOpt.getOrElse(writerAvroSchema)
+ // We check whether passed in reader schema is identical to writer schema
to avoid costly serde loop of
+ // making Spark deserialize its internal representation [[InternalRow]]
into [[Row]] for subsequent conversion
+ // (and back)
+ val sameSchema = writerAvroSchema.equals(readerAvroSchema)
+ val nullable = AvroSchemaUtils.resolveNullableSchema(writerAvroSchema) !=
writerAvroSchema
+
+ // NOTE: We have to serialize Avro schema, and then subsequently parse it
on the executor node, since Spark
+ // serializer is not able to digest it
+ val writerAvroSchemaStr = writerAvroSchema.toString
+ val readerAvroSchemaStr = readerAvroSchema.toString
+ // NOTE: We're accessing toRdd here directly to avoid [[InternalRow]] to
[[Row]] conversion
+
+ if (!sameSchema) {
+ val rdds: RDD[Either[GenericRecord, InternalRow]] =
df.queryExecution.toRdd.mapPartitions { rows =>
+ if (rows.isEmpty) {
+ Iterator.empty
+ } else {
+ val writerAvroSchema = new Schema.Parser().parse(writerAvroSchemaStr)
+ val readerAvroSchema = new Schema.Parser().parse(readerAvroSchemaStr)
+ val convert =
AvroConversionUtils.createInternalRowToAvroConverter(writerSchema,
writerAvroSchema, nullable = nullable)
+ val transform: InternalRow => Either[GenericRecord, InternalRow] =
internalRow => try {
+ Left(HoodieAvroUtils.rewriteRecordDeep(convert(internalRow),
readerAvroSchema, true))
+ } catch {
+ case _: Throwable =>
+ Right(internalRow)
+ }
+ rows.map(transform)
+ }
+ }
+
+ val rowDeserializer = getCatalystRowSerDe(writerSchema)
+ val errorRDD = df.sparkSession.createDataFrame(
+ rdds.filter(_.isRight).map(_.right.get).map(ir =>
rowDeserializer.deserializeRow(ir)), writerSchema)
+
+ // going to follow up on improving performance of separating out events
+ (rdds.filter(_.isLeft).map(_.left.get), errorRDD.toJSON.rdd)
+ } else {
+ val rdd = df.queryExecution.toRdd.mapPartitions { rows =>
+ if (rows.isEmpty) {
+ Iterator.empty
+ } else {
+ val convert =
AvroConversionUtils.createInternalRowToAvroConverter(writerSchema,
writerAvroSchema, nullable = nullable)
+ rows.map(convert)
+ }
+ }
+ (rdd, df.sparkSession.sparkContext.emptyRDD[String])
+ }
+ }
+
+ def getCatalystRowSerDe(structType: StructType): SparkRowSerDe = {
+ sparkAdapter.createSparkRowSerDe(structType)
+ }
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
index a4157debdbf..aa19fac10b2 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
@@ -74,6 +74,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -787,7 +788,12 @@ public class HoodieAvroUtils {
* @return newRecord for new Schema
*/
public static GenericRecord rewriteRecordWithNewSchema(IndexedRecord
oldRecord, Schema newSchema, Map<String, String> renameCols) {
- Object newRecord = rewriteRecordWithNewSchema(oldRecord,
oldRecord.getSchema(), newSchema, renameCols, new LinkedList<>());
+ Object newRecord = rewriteRecordWithNewSchema(oldRecord,
oldRecord.getSchema(), newSchema, renameCols, new LinkedList<>(),false);
+ return (GenericData.Record) newRecord;
+ }
+
+ public static GenericRecord rewriteRecordWithNewSchema(IndexedRecord
oldRecord, Schema newSchema, Map<String, String> renameCols, boolean validate) {
+ Object newRecord = rewriteRecordWithNewSchema(oldRecord,
oldRecord.getSchema(), newSchema, renameCols, new LinkedList<>(), validate);
return (GenericData.Record) newRecord;
}
@@ -805,12 +811,22 @@ public class HoodieAvroUtils {
* @param fieldNames track the full name of visited field when we travel new
schema.
* @return newRecord for new Schema
*/
- private static Object rewriteRecordWithNewSchema(Object oldRecord, Schema
oldAvroSchema, Schema newSchema, Map<String, String> renameCols, Deque<String>
fieldNames) {
+
+ private static Object rewriteRecordWithNewSchema(Object oldRecord, Schema
oldAvroSchema, Schema newSchema, Map<String, String> renameCols, Deque<String>
fieldNames, boolean validate) {
if (oldRecord == null) {
return null;
}
// try to get real schema for union type
Schema oldSchema = getActualSchemaFromUnion(oldAvroSchema, oldRecord);
+ Object newRecord = rewriteRecordWithNewSchemaInternal(oldRecord,
oldSchema, newSchema, renameCols, fieldNames, validate);
+ if (validate && !ConvertingGenericData.INSTANCE.validate(newSchema,
newRecord)) {
+ throw new SchemaCompatibilityException(
+ "Unable to validate the rewritten record " + oldRecord + " against
schema " + newSchema);
+ }
+ return newRecord;
+ }
+
+ private static Object rewriteRecordWithNewSchemaInternal(Object oldRecord,
Schema oldSchema, Schema newSchema, Map<String, String> renameCols,
Deque<String> fieldNames, boolean validate) {
switch (newSchema.getType()) {
case RECORD:
ValidationUtils.checkArgument(oldRecord instanceof IndexedRecord,
"cannot rewrite record with different type");
@@ -823,7 +839,7 @@ public class HoodieAvroUtils {
fieldNames.push(fieldName);
if (oldSchema.getField(field.name()) != null &&
!renameCols.containsKey(field.name())) {
Schema.Field oldField = oldSchema.getField(field.name());
- newRecord.put(i,
rewriteRecordWithNewSchema(indexedRecord.get(oldField.pos()),
oldField.schema(), fields.get(i).schema(), renameCols, fieldNames));
+ newRecord.put(i,
rewriteRecordWithNewSchema(indexedRecord.get(oldField.pos()),
oldField.schema(), fields.get(i).schema(), renameCols, fieldNames, validate));
} else {
String fieldFullName = createFullName(fieldNames);
String fieldNameFromOldSchema =
renameCols.getOrDefault(fieldFullName, "");
@@ -831,7 +847,7 @@ public class HoodieAvroUtils {
if (oldSchema.getField(fieldNameFromOldSchema) != null) {
// find rename
Schema.Field oldField =
oldSchema.getField(fieldNameFromOldSchema);
- newRecord.put(i,
rewriteRecordWithNewSchema(indexedRecord.get(oldField.pos()),
oldField.schema(), fields.get(i).schema(), renameCols, fieldNames));
+ newRecord.put(i,
rewriteRecordWithNewSchema(indexedRecord.get(oldField.pos()),
oldField.schema(), fields.get(i).schema(), renameCols, fieldNames, validate));
} else {
// deal with default value
if (fields.get(i).defaultVal() instanceof JsonProperties.Null) {
@@ -850,7 +866,7 @@ public class HoodieAvroUtils {
List<Object> newArray = new ArrayList();
fieldNames.push("element");
for (Object element : array) {
- newArray.add(rewriteRecordWithNewSchema(element,
oldSchema.getElementType(), newSchema.getElementType(), renameCols,
fieldNames));
+ newArray.add(rewriteRecordWithNewSchema(element,
oldSchema.getElementType(), newSchema.getElementType(), renameCols, fieldNames,
validate));
}
fieldNames.pop();
return newArray;
@@ -860,12 +876,12 @@ public class HoodieAvroUtils {
Map<Object, Object> newMap = new HashMap<>();
fieldNames.push("value");
for (Map.Entry<Object, Object> entry : map.entrySet()) {
- newMap.put(entry.getKey(),
rewriteRecordWithNewSchema(entry.getValue(), oldSchema.getValueType(),
newSchema.getValueType(), renameCols, fieldNames));
+ newMap.put(entry.getKey(),
rewriteRecordWithNewSchema(entry.getValue(), oldSchema.getValueType(),
newSchema.getValueType(), renameCols, fieldNames, validate));
}
fieldNames.pop();
return newMap;
case UNION:
- return rewriteRecordWithNewSchema(oldRecord,
getActualSchemaFromUnion(oldSchema, oldRecord),
getActualSchemaFromUnion(newSchema, oldRecord), renameCols, fieldNames);
+ return rewriteRecordWithNewSchema(oldRecord,
getActualSchemaFromUnion(oldSchema, oldRecord),
getActualSchemaFromUnion(newSchema, oldRecord), renameCols, fieldNames,
validate);
default:
return rewritePrimaryType(oldRecord, oldSchema, newSchema);
}
@@ -1083,10 +1099,43 @@ public class HoodieAvroUtils {
}
}
+ /**
+ * Given avro records, rewrites them with new schema.
+ *
+ * @param oldRecords oldRecords to be rewrite
+ * @param newSchema newSchema used to rewrite oldRecord
+ * @param renameCols a map store all rename cols, (k, v)->
(colNameFromNewSchema, colNameFromOldSchema)
+ * @return a iterator of rewrote GeneriRcords
+ */
+ public static Iterator<GenericRecord>
rewriteRecordWithNewSchema(Iterator<GenericRecord> oldRecords, Schema
newSchema, Map<String, String> renameCols, boolean validate) {
+ if (oldRecords == null || newSchema == null) {
+ return Collections.emptyIterator();
+ }
+ return new Iterator<GenericRecord>() {
+ @Override
+ public boolean hasNext() {
+ return oldRecords.hasNext();
+ }
+
+ @Override
+ public GenericRecord next() {
+ return rewriteRecordWithNewSchema(oldRecords.next(), newSchema,
renameCols, validate);
+ }
+ };
+ }
+
+ public static Iterator<GenericRecord>
rewriteRecordWithNewSchema(Iterator<GenericRecord> oldRecords, Schema
newSchema, Map<String, String> renameCols) {
+ return rewriteRecordWithNewSchema(oldRecords, newSchema,
Collections.EMPTY_MAP, false);
+ }
+
public static GenericRecord rewriteRecordDeep(GenericRecord oldRecord,
Schema newSchema) {
return rewriteRecordWithNewSchema(oldRecord, newSchema,
Collections.EMPTY_MAP);
}
+ public static GenericRecord rewriteRecordDeep(GenericRecord oldRecord,
Schema newSchema, boolean validate) {
+ return rewriteRecordWithNewSchema(oldRecord, newSchema,
Collections.EMPTY_MAP, validate);
+ }
+
public static boolean gteqAvro1_9() {
return VersionUtil.compareVersions(AVRO_VERSION, "1.9") >= 0;
}
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/BaseErrorTableWriter.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/BaseErrorTableWriter.java
new file mode 100644
index 00000000000..fea6bdb3cf3
--- /dev/null
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/BaseErrorTableWriter.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.deltastreamer;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.VisibleForTesting;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.SparkSession;
+
+/**
+ * The class which handles error events while processing write records. All the
+ * records which have a processing/write failure are triggered as error events
to
+ * BaseErrorTableWriter. The implementation of BaseErrorTableWriter processes
+ * these error events through addErrorEvents API and commits them to the error
table when
+ * upsertAndCommit API is called.
+ *
+ * The writer can use the configs defined in HoodieErrorTableConfig to manage
the error table.
+ */
+public abstract class BaseErrorTableWriter<T extends ErrorEvent> {
+
+ // The column name passed to Spark for option `columnNameOfCorruptRecord`.
The record
+ // is set to this column in case of an error
+ public static String ERROR_TABLE_CURRUPT_RECORD_COL_NAME = "_corrupt_record";
+
+ public BaseErrorTableWriter(HoodieDeltaStreamer.Config cfg, SparkSession
sparkSession,
+ TypedProperties props, JavaSparkContext
jssc, FileSystem fs) {
+ }
+
+ /**
+ * Processes input error events. These error events would be committed later
through upsertAndCommit
+ * API call.
+ *
+ * @param errorEvent Input error event RDD
+ */
+ public abstract void addErrorEvents(JavaRDD<T> errorEvent);
+
+ /**
+ * Fetches the error events RDD processed by the writer so far. This is a
test API.
+ */
+ @VisibleForTesting
+ public abstract Option<JavaRDD<HoodieAvroRecord>> getErrorEvents(String
baseTableInstantTime, Option<String> commitedInstantTime);
+
+ /**
+ * This API is called to commit the error events (failed Hoodie Records)
processed by the writer so far.
+ * These records are committed to a error table.
+ */
+ public abstract boolean upsertAndCommit(String baseTableInstantTime,
Option<String> commitedInstantTime);
+
+}
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index b25631bb179..07ba73b528a 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -70,6 +70,7 @@ import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodiePayloadConfig;
+import org.apache.hudi.config.HoodieErrorTableConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
@@ -103,6 +104,7 @@ import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.HoodieInternalRowUtils;
import org.apache.spark.sql.Row;
@@ -129,12 +131,14 @@ import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.stream.Collectors;
+import scala.Tuple2;
import static org.apache.hudi.avro.AvroSchemaUtils.getAvroRecordQualifiedName;
import static org.apache.hudi.common.table.HoodieTableConfig.ARCHIVELOG_FOLDER;
import static
org.apache.hudi.common.table.HoodieTableConfig.DROP_PARTITION_COLUMNS;
import static
org.apache.hudi.config.HoodieClusteringConfig.ASYNC_CLUSTERING_ENABLE;
import static org.apache.hudi.config.HoodieClusteringConfig.INLINE_CLUSTERING;
import static org.apache.hudi.config.HoodieCompactionConfig.INLINE_COMPACT;
+import static
org.apache.hudi.config.HoodieErrorTableConfig.ERROR_TABLE_ENABLED;
import static org.apache.hudi.config.HoodieWriteConfig.AUTO_COMMIT_ENABLE;
import static org.apache.hudi.config.HoodieWriteConfig.COMBINE_BEFORE_INSERT;
import static org.apache.hudi.config.HoodieWriteConfig.COMBINE_BEFORE_UPSERT;
@@ -242,6 +246,9 @@ public class DeltaSync implements Serializable, Closeable {
*/
private transient SparkRDDWriteClient writeClient;
+ private Option<BaseErrorTableWriter> errorTableWriter = Option.empty();
+ private HoodieErrorTableConfig.ErrorWriteFailureStrategy
errorWriteFailureStrategy;
+
private transient HoodieDeltaStreamerMetrics metrics;
private transient HoodieMetrics hoodieMetrics;
@@ -279,15 +286,19 @@ public class DeltaSync implements Serializable, Closeable
{
this.metrics = new
HoodieDeltaStreamerMetrics(getHoodieClientConfig(this.schemaProvider));
this.hoodieMetrics = new
HoodieMetrics(getHoodieClientConfig(this.schemaProvider));
-
- this.formatAdapter = new SourceFormatAdapter(
- UtilHelpers.createSource(cfg.sourceClassName, props, jssc,
sparkSession, schemaProvider, metrics));
this.conf = conf;
String id = conf.get(MUTLI_WRITER_SOURCE_CHECKPOINT_ID.key());
if (StringUtils.isNullOrEmpty(id)) {
id = props.getProperty(MUTLI_WRITER_SOURCE_CHECKPOINT_ID.key());
}
this.multiwriterIdentifier = StringUtils.isNullOrEmpty(id) ?
Option.empty() : Option.of(id);
+ if
(props.getBoolean(ERROR_TABLE_ENABLED.key(),ERROR_TABLE_ENABLED.defaultValue()))
{
+ this.errorTableWriter = ErrorTableUtils.getErrorTableWriter(cfg,
sparkSession, props, jssc, fs);
+ this.errorWriteFailureStrategy =
ErrorTableUtils.getErrorWriteFailureStrategy(props);
+ }
+ this.formatAdapter = new SourceFormatAdapter(
+ UtilHelpers.createSource(cfg.sourceClassName, props, jssc,
sparkSession, schemaProvider, metrics),
+ this.errorTableWriter);
}
/**
@@ -513,9 +524,34 @@ public class DeltaSync implements Serializable, Closeable {
Option<Dataset<Row>> transformed =
dataAndCheckpoint.getBatch().map(data ->
transformer.get().apply(jssc, sparkSession, data, props));
+ transformed = formatAdapter.processErrorEvents(transformed,
+ ErrorEvent.ErrorReason.CUSTOM_TRANSFORMER_FAILURE);
+
checkpointStr = dataAndCheckpoint.getCheckpointForNextBatch();
boolean reconcileSchema =
props.getBoolean(DataSourceWriteOptions.RECONCILE_SCHEMA().key());
if (this.userProvidedSchemaProvider != null &&
this.userProvidedSchemaProvider.getTargetSchema() != null) {
+ // If the target schema is specified through Avro schema,
+ // pass in the schema for the Row-to-Avro conversion
+ // to avoid nullability mismatch between Avro schema and Row schema
+ if (errorTableWriter.isPresent()
+ &&
props.getBoolean(HoodieErrorTableConfig.ERROR_ENABLE_VALIDATE_TARGET_SCHEMA.key(),
+
HoodieErrorTableConfig.ERROR_ENABLE_VALIDATE_TARGET_SCHEMA.defaultValue())) {
+ // If the above conditions are met, trigger error events for the
rows whose conversion to
+ // avro records fails.
+ avroRDDOptional = transformed.map(
+ rowDataset -> {
+ Tuple2<RDD<GenericRecord>, RDD<String>> safeCreateRDDs =
HoodieSparkUtils.safeCreateRDD(rowDataset,
+ HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE,
reconcileSchema,
+
Option.of(this.userProvidedSchemaProvider.getTargetSchema()));
+
errorTableWriter.get().addErrorEvents(safeCreateRDDs._2().toJavaRDD()
+ .map(evStr -> new ErrorEvent<>(evStr,
+ ErrorEvent.ErrorReason.AVRO_DESERIALIZATION_FAILURE)));
+ return safeCreateRDDs._1.toJavaRDD();
+ });
+ } else {
+ avroRDDOptional = transformed.map(
+ rowDataset -> getTransformedRDD(rowDataset, reconcileSchema,
this.userProvidedSchemaProvider.getTargetSchema()));
+ }
schemaProvider = this.userProvidedSchemaProvider;
} else {
Option<Schema> latestTableSchemaOpt =
UtilHelpers.getLatestTableSchema(jssc, fs, cfg.targetBasePath);
@@ -537,12 +573,9 @@ public class DeltaSync implements Serializable, Closeable {
(SchemaProvider) new DelegatingSchemaProvider(props, jssc,
dataAndCheckpoint.getSchemaProvider(),
new SimpleSchemaProvider(jssc, targetSchema, props)))
.orElse(dataAndCheckpoint.getSchemaProvider());
+ // Rewrite transformed records into the expected target schema
+ avroRDDOptional = transformed.map(t -> getTransformedRDD(t,
reconcileSchema, schemaProvider.getTargetSchema()));
}
-
- // Rewrite transformed records into the expected target schema
- avroRDDOptional =
- transformed.map(t -> HoodieSparkUtils.createRdd(t,
HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE,
- reconcileSchema,
Option.of(schemaProvider.getTargetSchema())).toJavaRDD());
} else {
// Pull the data from the source & prepare the write
InputBatch<JavaRDD<GenericRecord>> dataAndCheckpoint =
@@ -607,6 +640,11 @@ public class DeltaSync implements Serializable, Closeable {
return Pair.of(schemaProvider, Pair.of(checkpointStr, records));
}
+ private JavaRDD<GenericRecord> getTransformedRDD(Dataset<Row> rowDataset,
boolean reconcileSchema, Schema readerSchema) {
+ return HoodieSparkUtils.createRdd(rowDataset, HOODIE_RECORD_STRUCT_NAME,
HOODIE_RECORD_NAMESPACE, reconcileSchema,
+ Option.ofNullable(readerSchema)).toJavaRDD();
+ }
+
/**
* Process previous commit metadata and checkpoint configs set by user to
determine the checkpoint to resume from.
* @param commitTimelineOpt commit timeline of interest.
@@ -661,13 +699,13 @@ public class DeltaSync implements Serializable, Closeable
{
}
}
- public Option<HoodieCommitMetadata>
getLatestCommitMetadataWithValidCheckpointInfo(HoodieTimeline timeline) throws
IOException {
- return (Option<HoodieCommitMetadata>)
timeline.getReverseOrderedInstants().map(instant -> {
+ protected Option<Pair<String, HoodieCommitMetadata>>
getLatestInstantAndCommitMetadataWithValidCheckpointInfo(HoodieTimeline
timeline) throws IOException {
+ return (Option<Pair<String, HoodieCommitMetadata>>)
timeline.getReverseOrderedInstants().map(instant -> {
try {
HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
.fromBytes(timeline.getInstantDetails(instant).get(),
HoodieCommitMetadata.class);
if
(!StringUtils.isNullOrEmpty(commitMetadata.getMetadata(CHECKPOINT_KEY)) ||
!StringUtils.isNullOrEmpty(commitMetadata.getMetadata(CHECKPOINT_RESET_KEY))) {
- return Option.of(commitMetadata);
+ return Option.of(Pair.of(instant.toString(), commitMetadata));
} else {
return Option.empty();
}
@@ -677,6 +715,20 @@ public class DeltaSync implements Serializable, Closeable {
}).filter(Option::isPresent).findFirst().orElse(Option.empty());
}
+ protected Option<HoodieCommitMetadata>
getLatestCommitMetadataWithValidCheckpointInfo(HoodieTimeline timeline) throws
IOException {
+ return
getLatestInstantAndCommitMetadataWithValidCheckpointInfo(timeline).map(pair ->
pair.getRight());
+ }
+
+ protected Option<String>
getLatestInstantWithValidCheckpointInfo(Option<HoodieTimeline> timelineOpt) {
+ return timelineOpt.map(timeline -> {
+ try {
+ return
getLatestInstantAndCommitMetadataWithValidCheckpointInfo(timeline).map(pair ->
pair.getLeft());
+ } catch (IOException e) {
+ throw new HoodieIOException("failed to get latest instant with
ValidCheckpointInfo", e);
+ }
+ }).orElse(Option.empty());
+ }
+
/**
* Perform Hoodie Write. Run Cleaner, schedule compaction and syncs to hive
if needed.
*
@@ -748,8 +800,25 @@ public class DeltaSync implements Serializable, Closeable {
+ totalErrorRecords + "/" + totalRecords);
}
String commitActionType = CommitUtils.getCommitActionType(cfg.operation,
HoodieTableType.valueOf(cfg.tableType));
- boolean success = writeClient.commit(instantTime, writeStatusRDD,
Option.of(checkpointCommitMetadata), commitActionType,
- Collections.emptyMap(), extraPreCommitFunc);
+ if (errorTableWriter.isPresent()) {
+ // Commit the error events triggered so far to the error table
+ Option<String> commitedInstantTime =
getLatestInstantWithValidCheckpointInfo(commitTimelineOpt);
+ boolean errorTableSuccess =
errorTableWriter.get().upsertAndCommit(instantTime, commitedInstantTime);
+ if (!errorTableSuccess) {
+ switch (errorWriteFailureStrategy) {
+ case ROLLBACK_COMMIT:
+ LOG.info("Commit " + instantTime + " failed!");
+ writeClient.rollback(instantTime);
+ throw new HoodieException("Error Table Commit failed!");
+ case LOG_ERROR:
+ LOG.error("Error Table write failed for instant " + instantTime);
+ break;
+ default:
+ throw new HoodieException("Write failure strategy not
implemented for " + errorWriteFailureStrategy);
+ }
+ }
+ }
+ boolean success = writeClient.commit(instantTime, writeStatusRDD,
Option.of(checkpointCommitMetadata), commitActionType, Collections.emptyMap(),
extraPreCommitFunc);
if (success) {
LOG.info("Commit " + instantTime + " successful!");
latestCheckpointWritten = checkpointStr;
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/ErrorEvent.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/ErrorEvent.java
new file mode 100644
index 00000000000..b300d40d397
--- /dev/null
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/ErrorEvent.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.deltastreamer;
+
+/**
+ * An error event is triggered when a record fails during write or during
+ * processing (e.g. source format conversion).
+ */
+public class ErrorEvent<T> {
+
+ private final ErrorReason reason;
+ private final T payload;
+
+ public ErrorEvent(T payload, ErrorReason reason) {
+ this.payload = payload;
+ this.reason = reason;
+ }
+
+ public T getPayload() {
+ return payload;
+ }
+
+ public ErrorReason getReason() {
+ return reason;
+ }
+
+ /**
+ * The reason behind the write or processing failure of a record.
+ */
+ public enum ErrorReason {
+ // Failure during json to avro record conversion
+ JSON_AVRO_DESERIALIZATION_FAILURE,
+ // Failure during json to row conversion
+ JSON_ROW_DESERIALIZATION_FAILURE,
+ // Failure during row to avro record conversion
+ AVRO_DESERIALIZATION_FAILURE,
+ // Failure during hudi writes
+ HUDI_WRITE_FAILURES,
+ // Failure during transformation of source to target RDD
+ CUSTOM_TRANSFORMER_FAILURE
+ }
+}
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/ErrorTableUtils.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/ErrorTableUtils.java
new file mode 100644
index 00000000000..a5533449a6a
--- /dev/null
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/ErrorTableUtils.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.deltastreamer;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.config.HoodieErrorTableConfig;
+import org.apache.hudi.exception.HoodieException;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.SparkSession;
+
+import java.lang.reflect.InvocationTargetException;
+import java.util.Arrays;
+
+import static
org.apache.hudi.config.HoodieErrorTableConfig.ERROR_TABLE_WRITE_CLASS;
+import static
org.apache.hudi.config.HoodieErrorTableConfig.ERROR_TABLE_WRITE_FAILURE_STRATEGY;
+
+public final class ErrorTableUtils {
+
+ public static Option<BaseErrorTableWriter>
getErrorTableWriter(HoodieDeltaStreamer.Config cfg, SparkSession sparkSession,
+ TypedProperties props, JavaSparkContext jssc, FileSystem fs) {
+ String errorTableWriterClass =
props.getString(ERROR_TABLE_WRITE_CLASS.key());
+
ValidationUtils.checkState(!StringUtils.isNullOrEmpty(errorTableWriterClass),
+ "Missing error table config " + ERROR_TABLE_WRITE_CLASS);
+
+ Class<?>[] argClassArr = new Class[] {HoodieDeltaStreamer.Config.class,
+ SparkSession.class, TypedProperties.class, JavaSparkContext.class,
FileSystem.class};
+ String errMsg = "Unable to instantiate ErrorTableWriter with arguments
type " + Arrays.toString(argClassArr);
+
ValidationUtils.checkArgument(ReflectionUtils.hasConstructor(BaseErrorTableWriter.class.getName(),
argClassArr), errMsg);
+
+ try {
+ return Option.of((BaseErrorTableWriter)
ReflectionUtils.getClass(errorTableWriterClass).getConstructor(argClassArr)
+ .newInstance(cfg, sparkSession, props, jssc, fs));
+ } catch (NoSuchMethodException | InvocationTargetException |
InstantiationException | IllegalAccessException e) {
+ throw new HoodieException(errMsg, e);
+ }
+ }
+
+ public static HoodieErrorTableConfig.ErrorWriteFailureStrategy
getErrorWriteFailureStrategy(
+ TypedProperties props) {
+ String writeFailureStrategy =
props.getString(ERROR_TABLE_WRITE_FAILURE_STRATEGY.key());
+ return
HoodieErrorTableConfig.ErrorWriteFailureStrategy.valueOf(writeFailureStrategy);
+ }
+}
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java
index c8aa2f951c9..fdc2af66d9f 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java
@@ -34,13 +34,24 @@ import com.google.protobuf.Message;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import java.io.Closeable;
import java.io.IOException;
+import java.util.Arrays;
+import java.util.stream.Collectors;
+
+import scala.util.Either;
+
+
+import static
org.apache.hudi.utilities.deltastreamer.BaseErrorTableWriter.ERROR_TABLE_CURRUPT_RECORD_COL_NAME;
import static
org.apache.hudi.utilities.schema.RowBasedSchemaProvider.HOODIE_RECORD_NAMESPACE;
import static
org.apache.hudi.utilities.schema.RowBasedSchemaProvider.HOODIE_RECORD_STRUCT_NAME;
@@ -51,10 +62,56 @@ public final class SourceFormatAdapter implements Closeable
{
private final Source source;
+ private Option<BaseErrorTableWriter> errorTableWriter = Option.empty();
+
public SourceFormatAdapter(Source source) {
+ this(source, Option.empty());
+ }
+
+ public SourceFormatAdapter(Source source, Option<BaseErrorTableWriter>
errorTableWriter) {
+ this.errorTableWriter = errorTableWriter;
this.source = source;
}
+ /**
+ * Transforms an input RDD of JSON strings to generic records, routing any
+ * records that fail conversion to the error table as error events.
+ * @param inputBatch input batch containing an RDD of JSON strings
+ * @return RDD of successfully converted generic records, or null when the batch is empty
+ */
+ private JavaRDD<GenericRecord>
transformJsonToGenericRdd(InputBatch<JavaRDD<String>> inputBatch) {
+ AvroConvertor convertor = new
AvroConvertor(inputBatch.getSchemaProvider().getSourceSchema());
+ return inputBatch.getBatch().map(rdd -> {
+ if (errorTableWriter.isPresent()) {
+ JavaRDD<Either<GenericRecord,String>> javaRDD =
rdd.map(convertor::fromJsonWithError);
+ errorTableWriter.get().addErrorEvents(javaRDD.filter(x ->
x.isRight()).map(x ->
+ new ErrorEvent<>(x.right().get(),
ErrorEvent.ErrorReason.JSON_AVRO_DESERIALIZATION_FAILURE)));
+ return javaRDD.filter(x -> x.isLeft()).map(x -> x.left().get());
+ } else {
+ return rdd.map(convertor::fromJson);
+ }
+ }).orElse(null);
+ }
+
+ /**
+ * Filters corrupt-record rows out of the dataset and routes them to the error
+ * table as error events, when the error table writer is enabled.
+ * @param eventsRow optional dataset that may contain a corrupt-record column
+ * @param errorReason reason to attach to any generated error events
+ * @return dataset with corrupt rows and the corrupt-record column removed
+ */
+ public Option<Dataset<Row>> processErrorEvents(Option<Dataset<Row>>
eventsRow,
+ ErrorEvent.ErrorReason
errorReason) {
+ return eventsRow.map(dataset -> {
+ if (errorTableWriter.isPresent() &&
Arrays.stream(dataset.columns()).collect(Collectors.toList())
+ .contains(ERROR_TABLE_CURRUPT_RECORD_COL_NAME)) {
+ errorTableWriter.get().addErrorEvents(dataset.filter(new
Column(ERROR_TABLE_CURRUPT_RECORD_COL_NAME).isNotNull())
+ .select(new
Column(ERROR_TABLE_CURRUPT_RECORD_COL_NAME)).toJavaRDD().map(ev ->
+ new ErrorEvent<>(ev.getString(0), errorReason)));
+ return dataset.filter(new
Column(ERROR_TABLE_CURRUPT_RECORD_COL_NAME).isNull()).drop(ERROR_TABLE_CURRUPT_RECORD_COL_NAME);
+ }
+ return dataset;
+ }
+ );
+ }
+
/**
* Fetch new data in avro format. If the source provides data in different
format, they are translated to Avro format
*/
@@ -64,23 +121,22 @@ public final class SourceFormatAdapter implements
Closeable {
return ((Source<JavaRDD<GenericRecord>>)
source).fetchNext(lastCkptStr, sourceLimit);
case JSON: {
InputBatch<JavaRDD<String>> r = ((Source<JavaRDD<String>>)
source).fetchNext(lastCkptStr, sourceLimit);
- AvroConvertor convertor = new
AvroConvertor(r.getSchemaProvider().getSourceSchema());
- return new InputBatch<>(Option.ofNullable(r.getBatch().map(rdd ->
rdd.map(convertor::fromJson)).orElse(null)),
- r.getCheckpointForNextBatch(), r.getSchemaProvider());
+ JavaRDD<GenericRecord> eventsRdd = transformJsonToGenericRdd(r);
+ return new
InputBatch<>(Option.ofNullable(eventsRdd),r.getCheckpointForNextBatch(),
r.getSchemaProvider());
}
case ROW: {
InputBatch<Dataset<Row>> r = ((Source<Dataset<Row>>)
source).fetchNext(lastCkptStr, sourceLimit);
return new InputBatch<>(Option.ofNullable(r.getBatch().map(
rdd -> {
- SchemaProvider originalProvider =
UtilHelpers.getOriginalSchemaProvider(r.getSchemaProvider());
- return ((originalProvider instanceof FilebasedSchemaProvider) ||
(originalProvider instanceof SchemaRegistryProvider))
- // If the source schema is specified through Avro schema,
- // pass in the schema for the Row-to-Avro conversion
- // to avoid nullability mismatch between Avro schema and Row
schema
- ? HoodieSparkUtils.createRdd(rdd, HOODIE_RECORD_STRUCT_NAME,
HOODIE_RECORD_NAMESPACE, true,
-
org.apache.hudi.common.util.Option.ofNullable(r.getSchemaProvider().getSourceSchema())
- ).toJavaRDD() : HoodieSparkUtils.createRdd(rdd,
- HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE, false,
Option.empty()).toJavaRDD();
+ SchemaProvider originalProvider =
UtilHelpers.getOriginalSchemaProvider(r.getSchemaProvider());
+ return (originalProvider instanceof FilebasedSchemaProvider ||
(originalProvider instanceof SchemaRegistryProvider))
+ // If the source schema is specified through Avro schema,
+ // pass in the schema for the Row-to-Avro conversion
+ // to avoid nullability mismatch between Avro schema and
Row schema
+ ? HoodieSparkUtils.createRdd(rdd,
HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE, true,
+
org.apache.hudi.common.util.Option.ofNullable(r.getSchemaProvider().getSourceSchema())
+ ).toJavaRDD() : HoodieSparkUtils.createRdd(rdd,
+ HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE, false,
Option.empty()).toJavaRDD();
})
.orElse(null)), r.getCheckpointForNextBatch(),
r.getSchemaProvider());
}
@@ -101,7 +157,10 @@ public final class SourceFormatAdapter implements
Closeable {
public InputBatch<Dataset<Row>> fetchNewDataInRowFormat(Option<String>
lastCkptStr, long sourceLimit) {
switch (source.getSourceType()) {
case ROW:
- return ((Source<Dataset<Row>>) source).fetchNext(lastCkptStr,
sourceLimit);
+ InputBatch<Dataset<Row>> datasetInputBatch = ((Source<Dataset<Row>>)
source).fetchNext(lastCkptStr, sourceLimit);
+ return new
InputBatch<>(processErrorEvents(datasetInputBatch.getBatch(),
+ ErrorEvent.ErrorReason.JSON_ROW_DESERIALIZATION_FAILURE),
+ datasetInputBatch.getCheckpointForNextBatch(),
datasetInputBatch.getSchemaProvider());
case AVRO: {
InputBatch<JavaRDD<GenericRecord>> r =
((Source<JavaRDD<GenericRecord>>) source).fetchNext(lastCkptStr, sourceLimit);
Schema sourceSchema = r.getSchemaProvider().getSourceSchema();
@@ -118,11 +177,27 @@ public final class SourceFormatAdapter implements
Closeable {
case JSON: {
InputBatch<JavaRDD<String>> r = ((Source<JavaRDD<String>>)
source).fetchNext(lastCkptStr, sourceLimit);
Schema sourceSchema = r.getSchemaProvider().getSourceSchema();
- StructType dataType =
AvroConversionUtils.convertAvroSchemaToStructType(sourceSchema);
- return new InputBatch<>(
- Option.ofNullable(
- r.getBatch().map(rdd ->
source.getSparkSession().read().schema(dataType).json(rdd)).orElse(null)),
- r.getCheckpointForNextBatch(), r.getSchemaProvider());
+ if (errorTableWriter.isPresent()) {
+ // if error table writer is enabled, during spark read
`columnNameOfCorruptRecord` option is configured.
+ // Any records which spark is unable to read successfully are
transferred to the column
+ // configured via this option. The column is then used to trigger
error events.
+ StructType dataType =
AvroConversionUtils.convertAvroSchemaToStructType(sourceSchema)
+ .add(new StructField(ERROR_TABLE_CURRUPT_RECORD_COL_NAME,
DataTypes.StringType, true, Metadata.empty()));
+ Option<Dataset<Row>> dataset = r.getBatch().map(rdd ->
source.getSparkSession().read()
+ .option("columnNameOfCorruptRecord",
ERROR_TABLE_CURRUPT_RECORD_COL_NAME).schema(dataType.asNullable())
+ .json(rdd));
+ Option<Dataset<Row>> eventsDataset = processErrorEvents(dataset,
+ ErrorEvent.ErrorReason.JSON_ROW_DESERIALIZATION_FAILURE);
+ return new InputBatch<>(
+ eventsDataset,
+ r.getCheckpointForNextBatch(), r.getSchemaProvider());
+ } else {
+ StructType dataType =
AvroConversionUtils.convertAvroSchemaToStructType(sourceSchema);
+ return new InputBatch<>(
+ Option.ofNullable(
+ r.getBatch().map(rdd ->
source.getSparkSession().read().schema(dataType).json(rdd)).orElse(null)),
+ r.getCheckpointForNextBatch(), r.getSchemaProvider());
+ }
}
case PROTO: {
InputBatch<JavaRDD<Message>> r = ((Source<JavaRDD<Message>>)
source).fetchNext(lastCkptStr, sourceLimit);
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/AvroConvertor.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/AvroConvertor.java
index 860c67f4e2a..ee160e19d52 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/AvroConvertor.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/AvroConvertor.java
@@ -28,6 +28,10 @@ import org.apache.avro.generic.GenericRecord;
import java.io.Serializable;
+import scala.util.Either;
+import scala.util.Left;
+import scala.util.Right;
+
/**
* Convert a variety of datum into Avro GenericRecords. Has a bunch of lazy
fields to circumvent issues around
* serializing these objects from driver to executors
@@ -87,6 +91,16 @@ public class AvroConvertor implements Serializable {
return jsonConverter.convert(json, schema);
}
+ public Either<GenericRecord,String> fromJsonWithError(String json) {
+ GenericRecord genericRecord;
+ try {
+ genericRecord = fromJson(json);
+ } catch (Exception e) {
+ return new Right(json);
+ }
+ return new Left(genericRecord);
+ }
+
public Schema getSchema() {
return new Schema.Parser().parse(schemaStr);
}
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGenericRddTransform.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGenericRddTransform.java
new file mode 100644
index 00000000000..78bc21ecf92
--- /dev/null
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGenericRddTransform.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources;
+
+import org.apache.hudi.AvroConversionUtils;
+import org.apache.hudi.HoodieSparkUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.spark.rdd.RDD;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.junit.jupiter.api.Test;
+
+import scala.Tuple2;
+
+import static org.apache.spark.sql.functions.expr;
+import static org.apache.spark.sql.functions.lit;
+import static org.apache.spark.sql.functions.when;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class TestGenericRddTransform extends SparkClientFunctionalTestHarness {
+ @Test
+ public void testGenericRddTransform() {
+ Dataset ds = spark().range(10).withColumn("null_check_col", when(expr("id
% 2 == 0"),
+ lit("test")).otherwise(lit(null)));
+ StructType structType = new StructType(new StructField[] {
+ new StructField("id", DataTypes.StringType, false, Metadata.empty()),
+ new StructField("null_check_col", DataTypes.StringType, false,
Metadata.empty())});
+ Schema nonNullSchema =
AvroConversionUtils.convertStructTypeToAvroSchema(structType,"record","record");
+ Tuple2<RDD<GenericRecord>, RDD<String>> failSafeRdds =
HoodieSparkUtils.safeCreateRDD(ds, "record",
+ "record",false, Option.of(nonNullSchema));
+ assertEquals(5, failSafeRdds._1.count());
+ assertEquals(5, failSafeRdds._2.count());
+ }
+}
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java
index d77789140f3..9e6e5aad661 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java
@@ -19,24 +19,42 @@
package org.apache.hudi.utilities.sources;
import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
+import org.apache.hudi.utilities.deltastreamer.BaseErrorTableWriter;
+import org.apache.hudi.utilities.deltastreamer.ErrorEvent;
import org.apache.hudi.utilities.deltastreamer.SourceFormatAdapter;
import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.Config;
+import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.TopicPartition;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import java.io.IOException;
import java.net.URL;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
import java.util.Objects;
import java.util.UUID;
+import scala.Tuple2;
+
+import static
org.apache.hudi.config.HoodieErrorTableConfig.ERROR_TABLE_BASE_PATH;
+import static org.apache.hudi.config.HoodieErrorTableConfig.ERROR_TARGET_TABLE;
+import static
org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.Config.ENABLE_KAFKA_COMMIT_OFFSET;
import static
org.apache.hudi.utilities.testutils.UtilitiesTestBase.Helpers.jsonifyRecords;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -44,6 +62,8 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
* Tests against {@link JsonKafkaSource}.
*/
public class TestJsonKafkaSource extends BaseTestKafkaSource {
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+ private static final HoodieTestDataGenerator DATA_GENERATOR = new
HoodieTestDataGenerator(1L);
static final URL SCHEMA_FILE_URL =
TestJsonKafkaSource.class.getClassLoader().getResource("delta-streamer-config/source.avsc");
@BeforeEach
@@ -175,4 +195,104 @@ public class TestJsonKafkaSource extends
BaseTestKafkaSource {
HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
testUtils.sendMessages(topic,
jsonifyRecords(dataGenerator.generateInserts("000", count)));
}
+
+ void sendJsonSafeMessagesToKafka(String topic, int count, int numPartitions)
{
+ try {
+ Tuple2<String, String>[] keyValues = new Tuple2[count];
+ String[] records = jsonifyRecords(DATA_GENERATOR.generateInserts("000",
count));
+ for (int i = 0; i < count; i++) {
+ // Drop fields that don't translate to json properly
+ Map node = OBJECT_MAPPER.readValue(records[i], Map.class);
+ node.remove("height");
+ node.remove("current_date");
+ node.remove("nation");
+ keyValues[i] = new Tuple2<>(Integer.toString(i % numPartitions),
OBJECT_MAPPER.writeValueAsString(node));
+ }
+ testUtils.sendMessages(topic, keyValues);
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+
+ @Test
+ public void testErrorEventsForDataInRowForamt() throws IOException {
+ // topic setup.
+ final String topic = TEST_TOPIC_PREFIX +
"testErrorEventsForDataInRowForamt";
+
+ testUtils.createTopic(topic, 2);
+ List<TopicPartition> topicPartitions = new ArrayList<>();
+ TopicPartition topicPartition0 = new TopicPartition(topic, 0);
+ topicPartitions.add(topicPartition0);
+ TopicPartition topicPartition1 = new TopicPartition(topic, 1);
+ topicPartitions.add(topicPartition1);
+ HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
+ sendJsonSafeMessagesToKafka(topic, 1000, 2);
+ testUtils.sendMessages(topic, new String[]{"error_event1",
"error_event2"});
+
+ TypedProperties props = createPropsForKafkaSource(topic, null, "earliest");
+ props.put(ENABLE_KAFKA_COMMIT_OFFSET.key(), "true");
+
props.put(ERROR_TABLE_BASE_PATH.key(),"/tmp/qurantine_table_test/json_kafka_row_events");
+ props.put(ERROR_TARGET_TABLE.key(),"json_kafka_row_events");
+ props.put("hoodie.errortable.validate.targetschema.enable", "true");
+ props.put("hoodie.base.path","/tmp/json_kafka_row_events");
+ Source jsonSource = new JsonKafkaSource(props, jsc(), spark(),
schemaProvider, metrics);
+ Option<BaseErrorTableWriter> errorTableWriter =
Option.of(getAnonymousErrorTableWriter(props));
+ SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource,
errorTableWriter);
+ assertEquals(1000,
kafkaSource.fetchNewDataInRowFormat(Option.empty(),Long.MAX_VALUE).getBatch().get().count());
+ assertEquals(2,((JavaRDD)errorTableWriter.get().getErrorEvents(
+ HoodieActiveTimeline.createNewInstantTime(),
Option.empty()).get()).count());
+ }
+
+ @Test
+ public void testErrorEventsForDataInAvroFormat() throws IOException {
+
+ // topic setup.
+ final String topic = TEST_TOPIC_PREFIX +
"testErrorEventsForDataInAvroFormat";
+
+ testUtils.createTopic(topic, 2);
+ List<TopicPartition> topicPartitions = new ArrayList<>();
+ TopicPartition topicPartition0 = new TopicPartition(topic, 0);
+ topicPartitions.add(topicPartition0);
+ TopicPartition topicPartition1 = new TopicPartition(topic, 1);
+ topicPartitions.add(topicPartition1);
+ HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
+ testUtils.sendMessages(topic,
jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
+ testUtils.sendMessages(topic, new String[]{"error_event1",
"error_event2"});
+
+ TypedProperties props = createPropsForKafkaSource(topic, null, "earliest");
+ props.put(ENABLE_KAFKA_COMMIT_OFFSET.key(), "true");
+
props.put(ERROR_TABLE_BASE_PATH.key(),"/tmp/qurantine_table_test/json_kafka_events");
+ props.put(ERROR_TARGET_TABLE.key(),"json_kafka_events");
+ props.put("hoodie.base.path","/tmp/json_kafka_events");
+
+ Source jsonSource = new JsonKafkaSource(props, jsc(), spark(),
schemaProvider, metrics);
+ Option<BaseErrorTableWriter> errorTableWriter =
Option.of(getAnonymousErrorTableWriter(props));
+ SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource,
errorTableWriter);
+ InputBatch<JavaRDD<GenericRecord>> fetch1 =
kafkaSource.fetchNewDataInAvroFormat(Option.empty(),Long.MAX_VALUE);
+ assertEquals(1000,fetch1.getBatch().get().count());
+ assertEquals(2, ((JavaRDD)errorTableWriter.get().getErrorEvents(
+ HoodieActiveTimeline.createNewInstantTime(),
Option.empty()).get()).count());
+ }
+
+ private BaseErrorTableWriter getAnonymousErrorTableWriter(TypedProperties
props) {
+ return new BaseErrorTableWriter<ErrorEvent<String>>(new
HoodieDeltaStreamer.Config(),
+ spark(), props, jsc(), fs()) {
+ List<JavaRDD<HoodieAvroRecord>> errorEvents = new LinkedList();
+
+ @Override
+ public void addErrorEvents(JavaRDD errorEvent) {
+ errorEvents.add(errorEvent.map(r -> new HoodieAvroRecord<>(new
HoodieKey(), null)));
+ }
+
+ @Override
+ public Option<JavaRDD<HoodieAvroRecord>> getErrorEvents(String
baseTableInstantTime, Option commitedInstantTime) {
+ return Option.of(errorEvents.stream().reduce((rdd1, rdd2) ->
rdd1.union(rdd2)).get());
+ }
+
+ @Override
+ public boolean upsertAndCommit(String baseTableInstantTime, Option
commitedInstantTime) {
+ return false;
+ }
+ };
+ }
}