vinothchandar commented on code in PR #5885:
URL: https://github.com/apache/hudi/pull/5885#discussion_r925085937


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java:
##########
@@ -399,9 +447,57 @@ protected void writeIncomingRecords() throws IOException {
     }
   }
 
+  protected GenericData.Record cdcRecord(CDCOperationEnum operation, String recordKey, String partitionPath,

Review Comment:
   RFC-46 is moving away from GenericRecord as the canonical data record, so we may want to move in that direction here as well. We need to sequence the two efforts correctly.



##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java:
##########
@@ -229,6 +230,10 @@ protected synchronized void scanInternal(Option<KeySpec> keySpecOpt) {
        HoodieLogBlock logBlock = logFormatReaderWrapper.next();
        final String instantTime = logBlock.getLogBlockHeader().get(INSTANT_TIME);
         totalLogBlocks.incrementAndGet();
+        if (logBlock.getBlockType() == CDC_DATA_BLOCK) {

Review Comment:
   If the data block or its commit is rolled back, is the CDC block skipped correctly? Can we write some tests to cover these scenarios?



##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieWriteStat.java:
##########
@@ -44,6 +44,12 @@ public class HoodieWriteStat implements Serializable {
    */
   private String path;
 
+  /**
+   * Relative cdc file path that stores the CDC data.
+   */
+  @Nullable
+  private String cdcPath;

Review Comment:
   Do these new fields evolve well? i.e., are they backwards compatible with existing write stats that lack these new fields?



##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java:
##########
@@ -236,6 +240,40 @@ public static <T> T fromJsonString(String jsonStr, Class<T> clazz) throws Except
     return JsonUtils.getObjectMapper().readValue(jsonStr, clazz);
   }
 
+  /**
+   * Parse the bytes of a deltacommit, and get the base file and the log files belonging to the provided file group.
+   */
+  public static Pair<String, List<String>> getFileSliceForFileGroupFromDeltaCommit(
+      byte[] bytes, HoodieFileGroupId fileGroupId)
+      throws Exception {
+    String jsonStr = new String(bytes, StandardCharsets.UTF_8);
+    if (jsonStr.isEmpty()) {
+      return null;

Review Comment:
   Please avoid using `null` as a return value.
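   A quick sketch of the alternative the comment asks for: return an empty container instead of `null` so callers must handle the absent case. Hudi has its own `org.apache.hudi.common.util.Option` type (visible elsewhere in this PR); to keep the sketch self-contained it uses the JDK's `java.util.Optional`, and `findLogFiles` with its comma-separated payload is a hypothetical stand-in for the deltacommit parsing above, not the real method.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

public class OptionReturnSketch {
  // Instead of returning null for "no match", return Optional.empty()
  // so the compiler nudges callers into handling the empty case.
  static Optional<List<String>> findLogFiles(String payload) {
    if (payload == null || payload.isEmpty()) {
      return Optional.empty();           // was: return null;
    }
    // Hypothetical parse: treat the payload as a comma-separated file list.
    return Optional.of(Arrays.asList(payload.split(",")));
  }

  public static void main(String[] args) {
    System.out.println(findLogFiles("").isPresent());          // false
    System.out.println(findLogFiles("f1.log,f2.log").get());   // [f1.log, f2.log]
  }
}
```

   Swapping `Optional` for Hudi's `Option` is mechanical (`Option.empty()` / `Option.of(...)` exist there too).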



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java:
##########
@@ -114,6 +118,8 @@ protected HoodieWriteHandle(HoodieWriteConfig config, String instantTime, String
                               HoodieTable<T, I, K, O> hoodieTable, Option<Schema> overriddenSchema,
                               TaskContextSupplier taskContextSupplier) {
     super(config, Option.of(instantTime), hoodieTable);
+    this.keyFiled = config.populateMetaFields() ? HoodieRecord.RECORD_KEY_METADATA_FIELD

Review Comment:
   typo: keyField



##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java:
##########
@@ -236,6 +240,40 @@ public static <T> T fromJsonString(String jsonStr, Class<T> clazz) throws Except
     return JsonUtils.getObjectMapper().readValue(jsonStr, clazz);
   }
 
+  /**
+   * Parse the bytes of a deltacommit, and get the base file and the log files belonging to the provided file group.
+   */
+  public static Pair<String, List<String>> getFileSliceForFileGroupFromDeltaCommit(

Review Comment:
   Doesn't any of the existing code already do this?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java:
##########
@@ -281,7 +313,18 @@ private boolean writeUpdateRecord(HoodieRecord<T> hoodieRecord, GenericRecord ol
         return false;
       }
     }
-    return writeRecord(hoodieRecord, indexedRecord, isDelete);
+    boolean result = writeRecord(hoodieRecord, indexedRecord, isDelete);
+    if (cdcEnabled) {
+      if (indexedRecord.isPresent()) {
+        GenericRecord record = (GenericRecord) indexedRecord.get();
+        cdcData.add(cdcRecord(CDCOperationEnum.UPDATE, hoodieRecord.getRecordKey(), hoodieRecord.getPartitionPath(),

Review Comment:
   In general, with all the Java/JVM overhead, I think it'll comfortably be more than 300M. Can we use the spillable map here instead, in this PR?
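   For context on the suggestion: Hudi ships a disk-backed map in hudi-common (`ExternalSpillableMap`), which is likely what the comment refers to. Rather than quote its exact API, the sketch below reimplements the core idea with only the JDK so the memory argument is concrete: entries stay on-heap until an estimated byte budget is hit, after which values overflow to an append-only temp file. Every name here is illustrative, not Hudi's API.

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.HashMap;
import java.util.Map;

// Minimal sketch of the spillable-map idea: heap usage stays bounded no
// matter how many buffered records (e.g. CDC rows) are added.
public class SpillableMapSketch implements AutoCloseable {
  private final long maxInMemoryBytes;
  private long inMemoryBytes = 0;
  private final Map<String, String> inMemory = new HashMap<>();
  private final Map<String, long[]> spilled = new HashMap<>(); // key -> {offset, length}
  private final RandomAccessFile spillFile;

  public SpillableMapSketch(long maxInMemoryBytes) {
    this.maxInMemoryBytes = maxInMemoryBytes;
    try {
      this.spillFile = new RandomAccessFile(
          Files.createTempFile("cdc-spill", ".bin").toFile(), "rw");
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public void put(String key, String value) {
    long size = 2L * (key.length() + value.length()); // crude size estimate
    if (inMemoryBytes + size <= maxInMemoryBytes) {
      inMemory.put(key, value);
      inMemoryBytes += size;
      return;
    }
    try { // over budget: append the value to the spill file instead
      byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
      long offset = spillFile.length();
      spillFile.seek(offset);
      spillFile.write(bytes);
      spilled.put(key, new long[] {offset, bytes.length});
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public String get(String key) {
    if (inMemory.containsKey(key)) {
      return inMemory.get(key);
    }
    long[] loc = spilled.get(key);
    if (loc == null) {
      return null;
    }
    try { // fetch the spilled value back from disk on demand
      byte[] buf = new byte[(int) loc[1]];
      spillFile.seek(loc[0]);
      spillFile.readFully(buf);
      return new String(buf, StandardCharsets.UTF_8);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public int inMemoryCount() { return inMemory.size(); }
  public int spilledCount()  { return spilled.size(); }

  @Override
  public void close() {
    try {
      spillFile.close();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

   The real `ExternalSpillableMap` adds pluggable size estimators and serializers, but the trade-off is the same: spilled lookups pay a disk seek so that the in-memory footprint never exceeds the configured budget.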



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
