alexeykudinkin commented on code in PR #6476:
URL: https://github.com/apache/hudi/pull/6476#discussion_r970277676


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java:
##########
@@ -273,6 +283,33 @@ protected HoodieFileWriter createNewFileWriter(String 
instantTime, Path path, Ho
     return HoodieFileWriterFactory.getFileWriter(instantTime, path, 
hoodieTable, config, schema, taskContextSupplier);
   }
 
+  protected HoodieLogFormat.Writer createLogWriter(
+      Option<FileSlice> fileSlice, String baseCommitTime) throws IOException {
+    int logVersion = HoodieLogFile.LOGFILE_BASE_VERSION;
+    long logFileSize = 0L;
+    String logWriteToken = writeToken;
+    if (fileSlice.isPresent()) {
+      Option<HoodieLogFile> latestLogFileOpt = 
fileSlice.get().getLatestLogFile();
+      if (latestLogFileOpt.isPresent()) {
+        HoodieLogFile latestLogFile = latestLogFileOpt.get();
+        logVersion = latestLogFile.getLogVersion();
+        logFileSize = latestLogFile.getFileSize();
+        logWriteToken = 
FSUtils.getWriteTokenFromLogPath(latestLogFile.getPath());
+      }
+    }
+    return HoodieLogFormat.newWriterBuilder()
+        
.onParentPath(FSUtils.getPartitionPath(hoodieTable.getMetaClient().getBasePath(),
 partitionPath))
+        .withFileId(fileId)
+        .overBaseCommit(baseCommitTime)
+        .withLogVersion(logVersion)
+        .withFileSize(logFileSize)
+        .withSizeThreshold(config.getLogFileMaxSize())
+        .withFs(fs)
+        .withRolloverLogWriteToken(writeToken)
+        .withLogWriteToken(logWriteToken)
+        .withFileExtension(HoodieLogFile.DELTA_EXTENSION).build();

Review Comment:
   Are we mixing `CDCBlocks` with normal Delta Data Blocks? 
   
   I don't think we can do that, as it will severely affect performance for 
pure non-CDC queries on MOR tables.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java:
##########
@@ -399,9 +451,65 @@ protected void writeIncomingRecords() throws IOException {
     }
   }
 
+  protected SerializableRecord cdcRecord(HoodieCDCOperation operation, String 
recordKey, String partitionPath,
+                                         GenericRecord oldRecord, 
GenericRecord newRecord) {
+    GenericData.Record record;
+    if 
(cdcSupplementalLoggingMode.equals(HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE_WITH_BEFORE_AFTER))
 {
+      record = CDCUtils.cdcRecord(operation.getValue(), instantTime,
+          oldRecord, addCommitMetadata(newRecord, recordKey, partitionPath));
+    } else if 
(cdcSupplementalLoggingMode.equals(HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE_WITH_BEFORE))
 {
+      record = CDCUtils.cdcRecord(operation.getValue(), recordKey, oldRecord);
+    } else {
+      record = CDCUtils.cdcRecord(operation.getValue(), recordKey);
+    }
+    return new SerializableRecord(record);
+  }
+
+  protected GenericRecord addCommitMetadata(GenericRecord record, String 
recordKey, String partitionPath) {
+    if (record != null && config.populateMetaFields()) {
+      GenericRecord rewriteRecord = rewriteRecord(record);
+      String seqId = HoodieRecord.generateSequenceId(instantTime, 
getPartitionId(), writtenRecordCount.get());
+      HoodieAvroUtils.addCommitMetadataToRecord(rewriteRecord, instantTime, 
seqId);
+      HoodieAvroUtils.addHoodieKeyToRecord(rewriteRecord, recordKey, 
partitionPath, newFilePath.getName());
+      return rewriteRecord;
+    }
+    return record;
+  }
+
+  protected Option<AppendResult> writeCDCData() {
+    if (!cdcEnabled || cdcData.isEmpty() || recordsWritten == 0L || 
(recordsWritten == insertRecordsWritten)) {
+      // the following cases where we do not need to write out the cdc file:
+      // case 1: all the data from the previous file slice are deleted. and no 
new data is inserted;
+      // case 2: all the data are new-coming,
+      return Option.empty();
+    }

Review Comment:
   Second that: CDC is non-trivial logic that will proliferate across the 
write handles, so we should encapsulate its state, along with a well-defined 
API, within a single component.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java:
##########
@@ -102,6 +118,15 @@
   protected Map<String, HoodieRecord<T>> keyToNewRecords;
   protected Set<String> writtenRecordKeys;
   protected HoodieFileWriter<IndexedRecord> fileWriter;
+  // a flag that indicate whether allow the change data to write out a cdc log 
file.
+  protected boolean cdcEnabled = false;
+  protected String cdcSupplementalLoggingMode;

Review Comment:
   Why is this a String?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java:
##########
@@ -281,7 +318,19 @@ private boolean writeUpdateRecord(HoodieRecord<T> 
hoodieRecord, GenericRecord ol
         return false;
       }
     }
-    return writeRecord(hoodieRecord, indexedRecord, isDelete);
+    boolean result = writeRecord(hoodieRecord, indexedRecord, isDelete);
+    if (cdcEnabled) {
+      String recordKey = StringUtils.join(

Review Comment:
   This might not match the actual record key. We should use the proper 
record key here.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java:
##########
@@ -399,9 +453,65 @@ protected void writeIncomingRecords() throws IOException {
     }
   }
 
+  protected SerializableRecord createCDCRecord(HoodieCDCOperation operation, 
String recordKey, String partitionPath,
+                                               GenericRecord oldRecord, 
GenericRecord newRecord) {
+    GenericData.Record record;
+    if 
(cdcSupplementalLoggingMode.equals(HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE_WITH_BEFORE_AFTER))
 {
+      record = HoodieCDCUtils.cdcRecord(operation.getValue(), instantTime,
+          oldRecord, addCommitMetadata(newRecord, recordKey, partitionPath));
+    } else if 
(cdcSupplementalLoggingMode.equals(HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE_WITH_BEFORE))
 {
+      record = HoodieCDCUtils.cdcRecord(operation.getValue(), recordKey, 
oldRecord);
+    } else {
+      record = HoodieCDCUtils.cdcRecord(operation.getValue(), recordKey);
+    }
+    return new SerializableRecord(record);
+  }
+
+  protected GenericRecord addCommitMetadata(GenericRecord record, String 
recordKey, String partitionPath) {

Review Comment:
   Why are we adding the metadata to the CDC payload? 



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java:
##########
@@ -399,9 +453,65 @@ protected void writeIncomingRecords() throws IOException {
     }
   }
 
+  protected SerializableRecord createCDCRecord(HoodieCDCOperation operation, 
String recordKey, String partitionPath,
+                                               GenericRecord oldRecord, 
GenericRecord newRecord) {
+    GenericData.Record record;
+    if 
(cdcSupplementalLoggingMode.equals(HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE_WITH_BEFORE_AFTER))
 {
+      record = HoodieCDCUtils.cdcRecord(operation.getValue(), instantTime,
+          oldRecord, addCommitMetadata(newRecord, recordKey, partitionPath));
+    } else if 
(cdcSupplementalLoggingMode.equals(HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE_WITH_BEFORE))
 {
+      record = HoodieCDCUtils.cdcRecord(operation.getValue(), recordKey, 
oldRecord);
+    } else {
+      record = HoodieCDCUtils.cdcRecord(operation.getValue(), recordKey);
+    }
+    return new SerializableRecord(record);
+  }
+
+  protected GenericRecord addCommitMetadata(GenericRecord record, String 
recordKey, String partitionPath) {
+    if (record != null && config.populateMetaFields()) {
+      GenericRecord rewriteRecord = rewriteRecord(record);
+      String seqId = HoodieRecord.generateSequenceId(instantTime, 
getPartitionId(), writtenRecordCount.get());
+      HoodieAvroUtils.addCommitMetadataToRecord(rewriteRecord, instantTime, 
seqId);
+      HoodieAvroUtils.addHoodieKeyToRecord(rewriteRecord, recordKey, 
partitionPath, newFilePath.getName());
+      return rewriteRecord;
+    }
+    return record;
+  }
+
+  protected Option<AppendResult> writeCDCData() {
+    if (!cdcEnabled || cdcData.isEmpty() || recordsWritten == 0L || 
(recordsWritten == insertRecordsWritten)) {
+      // the following cases where we do not need to write out the cdc file:
+      // case 1: all the data from the previous file slice are deleted. and no 
new data is inserted;
+      // case 2: all the data are new-coming,
+      return Option.empty();
+    }
+    try {
+      String keyField = config.populateMetaFields()
+          ? HoodieRecord.RECORD_KEY_METADATA_FIELD
+          : 
hoodieTable.getMetaClient().getTableConfig().getRecordKeyFieldProp();
+      Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
+      header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, instantTime);
+      if 
(cdcSupplementalLoggingMode.equals(HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE_WITH_BEFORE_AFTER))
 {

Review Comment:
   Let's abstract this conditional into a method (I believe you already have one 
below).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to