yihua commented on code in PR #9593:
URL: https://github.com/apache/hudi/pull/9593#discussion_r1325211919
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java:
##########
@@ -277,7 +278,15 @@ protected boolean writeUpdateRecord(HoodieRecord<T>
newRecord, HoodieRecord<T> o
}
updatedRecordsWritten++;
}
- return writeRecord(newRecord, combineRecordOpt, writerSchema,
config.getPayloadConfig().getProps(), isDelete);
+
+ // Insert possible insert logic.
+ Option<Pair<HoodieRecord, Schema>> processedRecord = recordMerger.merge(
+ Option.empty(), writerSchema, Option.of(newRecord), writerSchema,
config.getProps());
+ if (!processedRecord.isPresent() || !(processedRecord.get().getLeft()
instanceof HoodieEmptyRecord)) {
Review Comment:
```suggestion
if (!processedRecord.isPresent() || processedRecord.get().getLeft() instanceof HoodieEmptyRecord) {
```
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java:
##########
@@ -286,7 +295,15 @@ protected void writeInsertRecord(HoodieRecord<T>
newRecord) throws IOException {
if (newRecord.shouldIgnore(schema, config.getProps())) {
return;
}
Review Comment:
Should we remove `HoodieRecord#shouldIgnore` once the merge API can cover such
cases, and consolidate the `shouldIgnore` implementation in `HoodieAvroRecord`
into the new merger API?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java:
##########
@@ -338,6 +355,7 @@ private boolean writeRecord(HoodieRecord<T> newRecord,
Option<HoodieRecord> comb
public void write(HoodieRecord<T> oldRecord) {
Schema oldSchema = config.populateMetaFields() ? writeSchemaWithMetaFields
: writeSchema;
Schema newSchema = useWriterSchemaForCompaction ?
writeSchemaWithMetaFields : writeSchema;
+
Review Comment:
nit: let's avoid unnecessary cosmetic changes
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java:
##########
@@ -286,7 +295,15 @@ protected void writeInsertRecord(HoodieRecord<T>
newRecord) throws IOException {
if (newRecord.shouldIgnore(schema, config.getProps())) {
return;
}
- writeInsertRecord(newRecord, schema, config.getProps());
+
+ // Insert possible insert logic.
+ Option<Pair<HoodieRecord, Schema>> processedRecord = recordMerger.merge(
+ Option.empty(), schema, Option.of(newRecord), schema,
config.getProps());
+ if (!processedRecord.isPresent() || !(processedRecord.get().getLeft()
instanceof HoodieEmptyRecord)) {
Review Comment:
```suggestion
if (!processedRecord.isPresent() || processedRecord.get().getLeft() instanceof HoodieEmptyRecord) {
```
##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecordMerger.java:
##########
@@ -43,9 +43,24 @@ public String getMergingStrategy() {
}
@Override
- public Option<Pair<HoodieRecord, Schema>> merge(HoodieRecord older, Schema
oldSchema, HoodieRecord newer, Schema newSchema, TypedProperties props) throws
IOException {
- return combineAndGetUpdateValue(older, newer, newSchema, props)
- .map(r -> Pair.of(new HoodieAvroIndexedRecord(r), r.getSchema()));
+ public Option<Pair<HoodieRecord, Schema>> merge(Option<HoodieRecord> older,
+ Schema oldSchema,
+ Option<HoodieRecord> newer,
+ Schema newSchema,
+ TypedProperties props)
throws IOException {
+ boolean isValidNew = isValid(newer, newSchema, props);
+ boolean isValidOld = isValid(older, oldSchema, props);
+
+ if (!isValidOld && !isValidNew) {
+ return Option.empty();
+ } else if (isValidOld && !isValidNew) {
+ return Option.of(Pair.of(older.get(), oldSchema));
+ } else if (!isValidOld && isValidNew) {
Review Comment:
I'm thinking the logic of `HoodieAvroRecord#shouldIgnore` (below) can be
implemented in this branch, and the `HoodieRecord#shouldIgnore` API can then be
removed to avoid confusion. Is this doable?
```java
@Override
public boolean shouldIgnore(Schema recordSchema, Properties props) throws IOException {
  HoodieRecordPayload<?> recordPayload = getData();
  // NOTE: Currently only records borne by [[ExpressionPayload]] can be ignored,
  //       as such, we limit exposure of this method only to such payloads
  if (recordPayload instanceof BaseAvroPayload && ((BaseAvroPayload) recordPayload).canProduceSentinel()) {
    Option<IndexedRecord> insertRecord = recordPayload.getInsertValue(recordSchema, props);
    return insertRecord.isPresent() && insertRecord.get().equals(SENTINEL);
  }
  return false;
}
```
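To make the question concrete, here is a minimal, self-contained sketch of the idea. It deliberately does not use Hudi classes: `MergeSketch`, `Rec`, `SENTINEL`, `isValid`, and this `merge` signature are all hypothetical stand-ins, and the both-valid branch is only a placeholder. The point is just that the insert-only branch could return an empty result when the new record carries the sentinel, so callers would not need a separate `shouldIgnore` call.

```java
import java.util.Optional;

public class MergeSketch {

  /** Hypothetical stand-in for a record; not a Hudi class. */
  public static final class Rec {
    final String key;
    final Object payload;

    public Rec(String key, Object payload) {
      this.key = key;
      this.payload = payload;
    }
  }

  /** Hypothetical marker playing the role of the SENTINEL insert value. */
  public static final Object SENTINEL = new Object();

  /** Assumption for this sketch: a record is valid if present with a non-null payload. */
  static boolean isValid(Optional<Rec> rec) {
    return rec.isPresent() && rec.get().payload != null;
  }

  public static Optional<Rec> merge(Optional<Rec> older, Optional<Rec> newer) {
    boolean isValidOld = isValid(older);
    boolean isValidNew = isValid(newer);

    if (!isValidOld && !isValidNew) {
      return Optional.empty();
    } else if (isValidOld && !isValidNew) {
      return older;
    } else if (!isValidOld) {
      // Insert-only branch: the sentinel check that shouldIgnore performs today
      // would live here, so an ignorable record yields an empty result.
      if (newer.get().payload == SENTINEL) {
        return Optional.empty();
      }
      return newer;
    }
    // Both valid: placeholder only; real merging is out of scope for this sketch.
    return newer;
  }

  public static void main(String[] args) {
    Optional<Rec> ignorable = Optional.of(new Rec("k1", SENTINEL));
    Optional<Rec> regular = Optional.of(new Rec("k2", "value"));

    // An ignorable insert is dropped; a regular insert passes through.
    System.out.println(merge(Optional.empty(), ignorable).isPresent());
    System.out.println(merge(Optional.empty(), regular).isPresent());
  }
}
```

With something along these lines, the caller in `writeInsertRecord` would only need to check whether the merge result is present, which is roughly what the suggested condition above does.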