xushiyan commented on a change in pull request #3602:
URL: https://github.com/apache/hudi/pull/3602#discussion_r703072819



##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandle.java
##########
@@ -112,9 +112,9 @@ public void write(GenericRecord oldRecord) {
         HoodieRecord<T> hoodieRecord = keyToNewRecords.get(key);
         if (!writtenRecordKeys.contains(hoodieRecord.getRecordKey())) {
           if (useWriterSchema) {
-            writeRecord(hoodieRecord, 
hoodieRecord.getData().getInsertValue(tableSchemaWithMetaFields));
+            writeRecord(hoodieRecord, 
hoodieRecord.getData().getInsertValue(tableSchemaWithMetaFields, 
config.getProps()));
           } else {
-            writeRecord(hoodieRecord, 
hoodieRecord.getData().getInsertValue(tableSchema));
+            writeRecord(hoodieRecord, 
hoodieRecord.getData().getInsertValue(tableSchema, config.getProps()));

Review comment:
       pls double confirm this and similar changes above are compatible to all 
other payload class implementations.

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java
##########
@@ -68,17 +67,31 @@ public DefaultHoodieRecordPayload(Option<GenericRecord> 
record) {
     /*
      * We reached a point where the value is disk is older than the incoming 
record.
      */
-    eventTime = Option.ofNullable(getNestedFieldVal(incomingRecord, 
hoodieConfig
-        .getString(HoodiePayloadProps.PAYLOAD_EVENT_TIME_FIELD_PROP_KEY), 
true));
+    updateEventTime(incomingRecord, properties);
 
     /*
      * Now check if the incoming record is a delete record.
      */
-    if (isDeleteRecord(incomingRecord)) {
+    return isDeleteRecord(incomingRecord) ? Option.empty() : 
Option.of(incomingRecord);
+  }
+
+  @Override
+  public Option<IndexedRecord> getInsertValue(Schema schema, Properties 
properties) throws IOException {
+    if (recordBytes.length == 0) {
       return Option.empty();
-    } else {
-      return Option.of(incomingRecord);
     }
+    GenericRecord incomingRecord = incomingRecord(schema);
+    updateEventTime(incomingRecord, properties);
+
+    return isDeleteRecord(incomingRecord) ? Option.empty() : 
Option.of(incomingRecord);
+  }
+
+  private GenericRecord incomingRecord(Schema schema) throws IOException {
+    return bytesToAvro(recordBytes, schema);
+  }

Review comment:
       feel like this method is redundant. it only invokes `bytesToAvro`, which 
is more obvious to reader what happened. the variable name `incomingRecord` 
already tells what is returned.

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java
##########
@@ -68,17 +67,31 @@ public DefaultHoodieRecordPayload(Option<GenericRecord> 
record) {
     /*
      * We reached a point where the value is disk is older than the incoming 
record.
      */
-    eventTime = Option.ofNullable(getNestedFieldVal(incomingRecord, 
hoodieConfig
-        .getString(HoodiePayloadProps.PAYLOAD_EVENT_TIME_FIELD_PROP_KEY), 
true));
+    updateEventTime(incomingRecord, properties);
 
     /*
      * Now check if the incoming record is a delete record.
      */
-    if (isDeleteRecord(incomingRecord)) {
+    return isDeleteRecord(incomingRecord) ? Option.empty() : 
Option.of(incomingRecord);
+  }
+
+  @Override
+  public Option<IndexedRecord> getInsertValue(Schema schema, Properties 
properties) throws IOException {
+    if (recordBytes.length == 0) {
       return Option.empty();
-    } else {
-      return Option.of(incomingRecord);
     }
+    GenericRecord incomingRecord = incomingRecord(schema);
+    updateEventTime(incomingRecord, properties);
+
+    return isDeleteRecord(incomingRecord) ? Option.empty() : 
Option.of(incomingRecord);
+  }
+
+  private GenericRecord incomingRecord(Schema schema) throws IOException {
+    return bytesToAvro(recordBytes, schema);
+  }
+
+  private void updateEventTime(GenericRecord record, Properties properties) {
+    eventTime = Option.ofNullable(getNestedFieldVal(record, 
properties.getProperty(HoodiePayloadProps.PAYLOAD_EVENT_TIME_FIELD_PROP_KEY), 
true));

Review comment:
       this method better be static, easier to test




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to