danny0405 commented on code in PR #9593:
URL: https://github.com/apache/hudi/pull/9593#discussion_r1340784698


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java:
##########
@@ -139,23 +140,34 @@ protected void doWrite(HoodieRecord record, Schema schema, TypedProperties props
           return;
         }
 
-        MetadataValues metadataValues = new MetadataValues().setFileName(path.getName());
-        HoodieRecord populatedRecord =
-            record.prependMetaFields(schema, writeSchemaWithMetaFields, metadataValues, config.getProps());
-
-        if (preserveMetadata) {
-          fileWriter.write(record.getRecordKey(), populatedRecord, writeSchemaWithMetaFields);
-        } else {
-          fileWriter.writeWithMetadata(record.getKey(), populatedRecord, writeSchemaWithMetaFields);
+        // Inject custom logic for the record.
+        Option<Pair<HoodieRecord, Schema>> processedRecord = config.getRecordMerger().insert(record, schema, config.getProps());
+        if (!processedRecord.isPresent()
+            || HoodieOperation.isDelete(processedRecord.get().getLeft().getOperation())

Review Comment:
   We can revert all the changes to the create handle; delete messages are 
already handled at line 137.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java:
##########
@@ -270,14 +272,46 @@ protected boolean writeUpdateRecord(HoodieRecord<T> newRecord, HoodieRecord<T> o
     if (combineRecordOpt.isPresent()) {
       if (oldRecord.getData() != combineRecordOpt.get().getData()) {
         // the incoming record is chosen
-        isDelete = HoodieOperation.isDelete(newRecord.getOperation());
+        isDelete = is_delete_record(newRecord, writerSchema, config.getProps());
       } else {
         // the incoming record is dropped
         return false;
       }
       updatedRecordsWritten++;
     }
-    return writeRecord(newRecord, combineRecordOpt, writerSchema, config.getPayloadConfig().getProps(), isDelete);
+
+    // Do delete since the newRecord is a delete record.
+    if (isDelete) {
+      recordsDeleted++;
+      newRecord.unseal();
+      newRecord.clearNewLocation();
+      newRecord.seal();
+      newRecord.deflate();
+      return true;
+    }
+
+    // Inject custom insert/abort logic.
+    Option<Pair<HoodieRecord, Schema>> processedRecord = recordMerger.merge(
+        Option.empty(), writerSchema, combineRecordOpt, writerSchema, config.getProps());
+    if (!processedRecord.isPresent()
+        || !is_valid_record(processedRecord.get().getLeft(), writerSchema, config.getProps())) {
+      return false;
+    }
+
+    // Write the record finally.
+    // TODO: remove delete logic from writeRecord function.
+    return writeRecord(newRecord, Option.of(processedRecord.get().getLeft()), writerSchema, config.getPayloadConfig().getProps(), isDelete);
+  }
+
+  protected boolean is_delete_record(HoodieRecord record, Schema schema, TypedProperties props) throws IOException {
+    return record.isDelete(schema, props)
+        || record instanceof HoodieEmptyRecord
+        || (record.getData() != null && record.getData() instanceof EmptyHoodieRecordPayload)
+        || HoodieOperation.isDelete(record.getOperation());

Review Comment:
   Revert all the changes to `HoodieMergeHandle`, then add a new interface to 
the Merger API:
   
   ```java
   /**
    * Checks the validity of the merged record before it is flushed to disk.
    * If this returns false, the given record is ignored.
    *
    * <p>In some scenarios, the business logic needs to check the validity of
    * the merged record, so this interface gives the user a chance to do a
    * sanity check.
    *
    * <p>This interface is experimental and might evolve in the future.
    */
   @Experimental
   default boolean isValid(HoodieRecord record, Schema schema) {
     return true;
   }
   ```
   
   This interface would be invoked before each **merged** record is flushed. 
Currently, only **merged** records need this check.
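   
   To illustrate the intended call order, here is a minimal, self-contained sketch. It does not use Hudi's real classes: `Merger`, `mergeAndFlush`, `KEEP_NEWER` and `NON_NEGATIVE_ONLY` are hypothetical stand-ins, and plain integers play the role of records. The only point is that the merge handle would call `isValid` on the merged result and skip the write when it returns false, while mergers that do not override it keep today's behavior.
   
   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.Optional;

   class IsValidSketch {
     /** Hypothetical stand-in for the Merger API; not Hudi's actual interface. */
     interface Merger<R> {
       Optional<R> merge(R older, R newer);

       /** Default accepts every merged record, so existing mergers are unaffected. */
       default boolean isValid(R merged) {
         return true;
       }
     }

     /** Merger that relies on the default isValid and always keeps the newer value. */
     static final Merger<Integer> KEEP_NEWER = (older, newer) -> Optional.of(newer);

     /** Merger that overrides isValid to reject negative merged values. */
     static final Merger<Integer> NON_NEGATIVE_ONLY = new Merger<Integer>() {
       @Override
       public Optional<Integer> merge(Integer older, Integer newer) {
         return Optional.of(newer);
       }

       @Override
       public boolean isValid(Integer merged) {
         return merged >= 0;
       }
     };

     /** Stand-in for the merge handle: merge each pair, then check validity before "flushing". */
     static List<Integer> mergeAndFlush(Merger<Integer> merger, int[][] pairs) {
       List<Integer> written = new ArrayList<>();
       for (int[] pair : pairs) {
         Optional<Integer> merged = merger.merge(pair[0], pair[1]);
         // Only merged records go through the validity check.
         if (merged.isPresent() && merger.isValid(merged.get())) {
           written.add(merged.get());
         }
       }
       return written;
     }

     public static void main(String[] args) {
       int[][] pairs = {{1, 2}, {3, -4}};
       System.out.println(mergeAndFlush(KEEP_NEWER, pairs));
       System.out.println(mergeAndFlush(NON_NEGATIVE_ONLY, pairs));
     }
   }
   ```
   
   With this shape, the custom check lives entirely in the merger implementation, and the handle only needs one extra condition before writing.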


