[ https://issues.apache.org/jira/browse/HUDI-1771?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17394387#comment-17394387 ]

ASF GitHub Bot commented on HUDI-1771:
--------------------------------------

vinothchandar commented on a change in pull request #3285:
URL: https://github.com/apache/hudi/pull/3285#discussion_r683844723



##########
File path: hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
##########
@@ -620,23 +633,31 @@ public boolean reachedEnd() throws IOException {
       while (logKeysIterator.hasNext()) {
         final String curKey = logKeysIterator.next();
         if (!keyToSkip.contains(curKey)) {
-          Option<IndexedRecord> insertAvroRecord =
-              scanner.getRecords().get(curKey).getData().getInsertValue(tableSchema);
+          Option<IndexedRecord> insertAvroRecord = getInsetValue(curKey);
           if (insertAvroRecord.isPresent()) {
             // the record is a DELETE if insertAvroRecord not present, skipping
-            GenericRecord requiredAvroRecord = buildAvroRecordBySchema(
+            GenericRecord avroRecord = buildAvroRecordBySchema(
                 insertAvroRecord.get(),
                 requiredSchema,
                 requiredPos,
                 recordBuilder);
-            this.currentRecord = (RowData) avroToRowDataConverter.convert(requiredAvroRecord);
+            this.currentRecord = (RowData) avroToRowDataConverter.convert(avroRecord);
+            this.currentRecord.setRowKind(FormatUtils.getRowKind(insertAvroRecord.get(), this.operationPos));
             return false;
           }
         }
       }
       return true;
     }
 
+    private Option<IndexedRecord> getInsetValue(String curKey) throws IOException {

Review comment:
       typo: `getInsetValue` should be `getInsertValue`
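The loop in this hunk drains the remaining log keys, skips keys already merged with the base file, treats a missing insert value as a DELETE, and attaches the change flag to each emitted row. A minimal, self-contained sketch of that control flow follows; `LogRecordSketch`, `LogKeyDrainer`, and the string-based row are hypothetical stand-ins for the Avro/Flink types, not Hudi's API.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

// Hypothetical stand-in for a merged log record; an empty value models a DELETE.
final class LogRecordSketch {
  final Optional<String> insertValue;
  final String operation; // change flag carried with the record, e.g. "+I"

  LogRecordSketch(Optional<String> insertValue, String operation) {
    this.insertValue = insertValue;
    this.operation = operation;
  }
}

// Sketch of emitting log-only keys once the base file is exhausted.
final class LogKeyDrainer {
  private final Map<String, LogRecordSketch> records; // stand-in for scanner.getRecords()
  private final Set<String> keyToSkip;                // keys already merged with base-file rows
  private final Iterator<String> logKeysIterator;
  String currentRecord;

  LogKeyDrainer(Map<String, LogRecordSketch> records, Set<String> keyToSkip) {
    this.records = records;
    this.keyToSkip = keyToSkip;
    this.logKeysIterator = records.keySet().iterator();
  }

  // Same contract as reachedEnd(): false while another row is available.
  boolean reachedEnd() {
    while (logKeysIterator.hasNext()) {
      String curKey = logKeysIterator.next();
      if (keyToSkip.contains(curKey)) {
        continue; // already emitted during the base-file merge
      }
      Optional<String> insertValue = getInsertValue(curKey);
      if (insertValue.isPresent()) {
        // Attach the change flag to the row, as setRowKind(...) does in the hunk.
        currentRecord = records.get(curKey).operation + ":" + insertValue.get();
        return false;
      }
      // No insert value: the record is a DELETE, so skip it.
    }
    return true;
  }

  // The extracted helper, spelled getInsertValue rather than getInsetValue.
  private Optional<String> getInsertValue(String curKey) {
    return records.get(curKey).insertValue;
  }
}
```

Extracting the lookup into a helper keeps the loop readable; the sketch only models the control flow, not Avro schema projection.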

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/util/SpillableMapUtils.java
##########
@@ -120,8 +136,10 @@ public static long generateChecksum(byte[] data) {
   public static <R> R convertToHoodieRecordPayload(GenericRecord rec, String payloadClazz, Pair<String, String> recordKeyPartitionPathPair) {
     String recKey = rec.get(recordKeyPartitionPathPair.getLeft()).toString();
     String partitionPath = rec.get(recordKeyPartitionPathPair.getRight()).toString();
+    String operation = getNullableValAsString(rec, HoodieRecord.OPERATION_METADATA_FIELD);

Review comment:
       use an `Option`?
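One way to read the `Option` suggestion: have the lookup return an optional value instead of a nullable `String`, so callers must handle a missing field explicitly. The sketch below assumes `java.util.Optional` as a stand-in for Hudi's own `Option` type and a `Map` in place of Avro's `GenericRecord`; the helper name `getOptionalValAsString` and the class `OperationFieldSketch` are hypothetical.

```java
import java.util.Map;
import java.util.Optional;

final class OperationFieldSketch {
  // Assumed value of HoodieRecord.OPERATION_METADATA_FIELD, based on the
  // '_hoodie_operation' field named in the HoodieWriteHandle javadoc.
  static final String OPERATION_METADATA_FIELD = "_hoodie_operation";

  // Hypothetical helper: a Map stands in for Avro's GenericRecord.
  static Optional<String> getOptionalValAsString(Map<String, Object> rec, String field) {
    // Absent or null field gives Optional.empty(); otherwise its string form.
    return Optional.ofNullable(rec.get(field)).map(Object::toString);
  }
}
```

Records written without the operation field then simply yield `Optional.empty()`, and the "no flag" case is visible in the type rather than hidden behind a `null`.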

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
##########
@@ -141,6 +141,13 @@ private static Schema getWriteSchema(HoodieWriteConfig config) {
     return new Schema.Parser().parse(config.getWriteSchema());
   }
 
+  /**
+   * Whether to include the '_hoodie_operation' field in the metadata fields.
+   */
+  protected boolean withOperationField() {

Review comment:
       why can't this just be a `WriteConfig` that we can set to different values for Spark and Flink?
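A sketch of the reviewer's alternative: read the flag from write configuration so that Spark and Flink differ only in the value they set, rather than in an overridden method. `WriteConfigSketch`, `WriteHandleSketch`, and the property key `example.write.with.operation.field` are hypothetical; the metadata field list is abbreviated.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical config wrapper; the key is illustrative, not a real Hudi option.
final class WriteConfigSketch {
  static final String WITH_OPERATION_FIELD = "example.write.with.operation.field";
  private final Properties props;

  WriteConfigSketch(Properties props) {
    this.props = props;
  }

  // Defaults to false, so an engine that never sets the key keeps the old schema.
  boolean withOperationField() {
    return Boolean.parseBoolean(props.getProperty(WITH_OPERATION_FIELD, "false"));
  }
}

// The handle consults the config instead of relying on a subclass override.
final class WriteHandleSketch {
  private final WriteConfigSketch config;

  WriteHandleSketch(WriteConfigSketch config) {
    this.config = config;
  }

  List<String> metadataFields() {
    // Abbreviated list of Hudi metadata columns.
    List<String> fields = new ArrayList<>(List.of("_hoodie_commit_time", "_hoodie_record_key"));
    if (config.withOperationField()) {
      fields.add("_hoodie_operation");
    }
    return fields;
  }
}
```

Under this design a Flink writer would set the key to `true` and Spark would leave it unset, with no engine-specific handle subclass needed for the choice.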

##########
File path: hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
##########
@@ -350,18 +352,18 @@ public HoodieRestoreMetadata restore(HoodieEngineContext context, String restore
   protected HoodieMergeHandle getUpdateHandle(String instantTime, String partitionPath, String fileId,
                                               Map<String, HoodieRecord<T>> keyToNewRecords, HoodieBaseFile dataFileToBeMerged) {
     if (requireSortedRecords()) {
-      return new HoodieSortedMergeHandle<>(config, instantTime, this, keyToNewRecords, partitionPath, fileId,
+      return new FlinkCompactorSortedMergeHandle<>(config, instantTime, this, keyToNewRecords, partitionPath, fileId,

Review comment:
       the advantage of doing this as a `WriteConfig` would be that these new classes need not exist?

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
##########
@@ -191,6 +192,17 @@ protected boolean isUpdateRecord(HoodieRecord<T> hoodieRecord) {
     return hoodieRecord.getCurrentLocation() != null;
   }
 
+  /**
+   * Add cdc operation to the record, default do nothing.
+   *
+   * @param record The record.
+   * @param flag   The change flag name.
+   * @see HoodieOperation
+   */
+  protected void addOperationToRecord(GenericRecord record, String flag) {

Review comment:
       let's have an enum for the op flag?
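A possible shape for the suggested enum, so that `addOperationToRecord` takes a typed flag instead of a free-form `String`. The name `ChangeFlag`, its short codes (borrowed from the `+I`/`-U`/`+U`/`-D` convention of Flink's `RowKind`), and `AppendHandleSketch` are hypothetical; the `HoodieOperation` class referenced in the javadoc may use different names. A `Map` stands in for Avro's `GenericRecord`.

```java
import java.util.Arrays;
import java.util.Map;

// Hypothetical change-flag enum; short codes follow Flink's RowKind convention.
enum ChangeFlag {
  INSERT("+I"),
  UPDATE_BEFORE("-U"),
  UPDATE_AFTER("+U"),
  DELETE("-D");

  private final String shortName;

  ChangeFlag(String shortName) {
    this.shortName = shortName;
  }

  String shortName() {
    return shortName;
  }

  // Parses the string stored in a record's operation field; rejects unknown codes.
  static ChangeFlag fromShortName(String name) {
    return Arrays.stream(values())
        .filter(f -> f.shortName.equals(name))
        .findFirst()
        .orElseThrow(() -> new IllegalArgumentException("Unknown change flag: " + name));
  }
}

final class AppendHandleSketch {
  // Typed counterpart of addOperationToRecord(GenericRecord record, String flag).
  static void addOperationToRecord(Map<String, Object> record, ChangeFlag flag) {
    record.put("_hoodie_operation", flag.shortName());
  }
}
```

With the enum, an invalid flag fails at compile time on the write path and with a clear exception on the read path, instead of propagating a typo into stored data.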




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.


> Propagate CDC format for hoodie
> -------------------------------
>
>                 Key: HUDI-1771
>                 URL: https://issues.apache.org/jira/browse/HUDI-1771
>             Project: Apache Hudi
>          Issue Type: New Feature
>          Components: Flink Integration
>            Reporter: Danny Chen
>            Assignee: Zheng yunhong
>            Priority: Major
>              Labels: pull-request-available, sev:normal
>             Fix For: 0.9.0
>
>
> As discussed on the dev mailing list: 
> https://lists.apache.org/thread.html/r31b2d1404e4e043a5f875b78105ba6f9a801e78f265ad91242ad5eb2%40%3Cdev.hudi.apache.org%3E
> Keeping the change flags makes new use cases possible, e.g. using HUDI as the unified 
> storage format for the DWD and DWS layers.
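To illustrate why keeping change flags matters for a DWD-to-DWS pipeline: a downstream aggregate stays correct under updates and deletes only if it sees the retraction (`-U`, `-D`) as well as the new value. A minimal sketch, with hypothetical `ChangeRow` and `SumAggregator` types and Flink-style flag strings:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical change-log entry: change flag, grouping key, and measure.
final class ChangeRow {
  final String flag;
  final String group;
  final long amount;

  ChangeRow(String flag, String group, long amount) {
    this.flag = flag;
    this.group = group;
    this.amount = amount;
  }
}

// Maintains a per-group sum from a change log, as a DWS-style summary table might.
final class SumAggregator {
  private final Map<String, Long> sums = new HashMap<>();

  void apply(ChangeRow row) {
    long delta;
    switch (row.flag) {
      case "+I":
      case "+U":
        delta = row.amount;  // accumulate the new value
        break;
      case "-U":
      case "-D":
        delta = -row.amount; // retract the old value
        break;
      default:
        throw new IllegalArgumentException("Unknown flag: " + row.flag);
    }
    sums.merge(row.group, delta, Long::sum);
  }

  long sum(String group) {
    return sums.getOrDefault(group, 0L);
  }
}
```

Feeding `+I(a, 10)`, `-U(a, 10)`, `+U(a, 15)` leaves the sum for `a` at 15; without the `-U` row the aggregator would have no way to retract the stale 10 and would report 25.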



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
