jonvex commented on code in PR #10500:
URL: https://github.com/apache/hudi/pull/10500#discussion_r1463716137
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamerUtils.java:
##########
@@ -116,16 +128,32 @@ static Option<JavaRDD<HoodieRecord>>
createHoodieRecords(HoodieStreamer.Config c
return new CloseableMappingIterator<>(ClosableIterator.wrap(itr),
rec -> {
InternalRow row = (InternalRow)
deserializer.deserialize(rec).get();
- String recordKey = builtinKeyGenerator.getRecordKey(row,
baseStructType).toString();
- String partitionPath = builtinKeyGenerator.getPartitionPath(row,
baseStructType).toString();
- return new HoodieSparkRecord(new HoodieKey(recordKey,
partitionPath),
-
HoodieInternalRowUtils.getCachedUnsafeProjection(baseStructType,
targetStructType).apply(row), targetStructType, false);
+ try {
+ String recordKey = builtinKeyGenerator.getRecordKey(row,
baseStructType).toString();
+ String partitionPath = builtinKeyGenerator.getPartitionPath(row,
baseStructType).toString();
+ return Either.left(new HoodieSparkRecord(new
HoodieKey(recordKey, partitionPath),
+
HoodieInternalRowUtils.getCachedUnsafeProjection(baseStructType,
targetStructType).apply(row), targetStructType, false));
+ } catch (Exception e) {
+ if (!shouldErrorTable) {
+ throw e;
+ }
+ try {
+ return Either.right(HoodieAvroUtils.avroToJsonString(rec,
false));
+ } catch (IOException ex) {
+ throw new RuntimeException(ex);
+ }
+ }
});
+
});
} else {
throw new UnsupportedOperationException(recordType.name());
}
- return records;
+ if (shouldErrorTable) {
+
errorTableWriter.get().addErrorEvents(records.filter(Either::isRight).map(Either::asRight).map(evStr
-> new ErrorEvent<>(evStr,
Review Comment:
Decided to add a config as well, so `shouldErrorTable` now becomes:
```
boolean shouldErrorTable = errorTableWriter.isPresent() &&
props.getBoolean(ERROR_ENABLE_VALIDATE_RECORD_CREATION.key(),
ERROR_ENABLE_VALIDATE_RECORD_CREATION.defaultValue());
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]