[ https://issues.apache.org/jira/browse/HUDI-1771?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17383054#comment-17383054 ]

ASF GitHub Bot commented on HUDI-1771:
--------------------------------------

danny0405 commented on a change in pull request #3285:
URL: https://github.com/apache/hudi/pull/3285#discussion_r672027021



##########
File path: hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
##########
@@ -44,6 +46,28 @@
   private FormatUtils() {
   }
 
+  public static RowKind getRowKind(IndexedRecord record, int index) {
+    if (index == -1) {
+      return RowKind.INSERT;

Review comment:
       Add a comment for the method: Returns the `RowKind` of the given record, never null.
   Returns `RowKind.INSERT` when the given field index is not found.
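
For illustration, the requested Javadoc might read roughly as follows; the wording is only a suggestion, attached to the method signature shown in the diff:

```java
  /**
   * Returns the {@link RowKind} of the given record, never null.
   *
   * <p>Returns {@link RowKind#INSERT} when the given field index is not found,
   * i.e. the table schema does not contain the operation metadata field.
   *
   * @param record the avro record to inspect
   * @param index  position of the operation field in the schema, or -1 if absent
   */
  public static RowKind getRowKind(IndexedRecord record, int index) {
```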

##########
File path: hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
##########
@@ -312,14 +318,18 @@ private DataType getProducedDataType() {
     TableSchemaResolver schemaUtil = new TableSchemaResolver(metaClient);
     final Schema tableAvroSchema;
     try {
-      tableAvroSchema = schemaUtil.getTableAvroSchema();
+      // always includes the change flag
+      tableAvroSchema = schemaUtil.getTableAvroSchema(true);

Review comment:
       This change seems unnecessary.

##########
File path: hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
##########
@@ -132,14 +133,20 @@
    */
   private boolean emitDelete;
 
+  /**
+   * Position of the hoodie cdc operation.
+   */
+  private int operationPos;
+

Review comment:
       Move to `MergeOnReadTableState`.
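
A rough sketch of what carrying the position inside the table state could look like; the existing fields and constructor arguments of `MergeOnReadTableState` are omitted here, and the field and getter names are assumptions:

```java
import java.io.Serializable;

// Sketch only: everything except the new operationPos field is elided.
public class MergeOnReadTableState implements Serializable {

  private static final long serialVersionUID = 1L;

  /** Position of the hoodie cdc operation field, or -1 if it does not exist. */
  private final int operationPos;

  public MergeOnReadTableState(int operationPos /* plus the existing state arguments */) {
    this.operationPos = operationPos;
  }

  public int getOperationPos() {
    return operationPos;
  }
}
```

The input format could then read the position through the state it already receives instead of keeping a separate `operationPos` field.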

##########
File path: hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
##########
@@ -639,8 +657,12 @@ public void close() throws IOException {
     private Option<IndexedRecord> mergeRowWithLog(
         RowData curRow,
         String curKey) throws IOException {
+      final HoodieRecord<?> record = logRecords.get(curKey);
+      if (HoodieOperation.isDelete(record.getOperation())) {
+        return Option.empty();
+      }

Review comment:
       We need to emit the deletes when the `emitDelete` flag is true.
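
A minimal sketch of that guard, assuming `emitDelete` stays a field of `MergeOnReadInputFormat` as shown earlier in the diff:

```java
      final HoodieRecord<?> record = logRecords.get(curKey);
      // swallow the delete only when the reader is not asked to emit deletes
      if (HoodieOperation.isDelete(record.getOperation()) && !this.emitDelete) {
        return Option.empty();
      }
```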

##########
File path: hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
##########
@@ -312,14 +318,18 @@ private DataType getProducedDataType() {
     TableSchemaResolver schemaUtil = new TableSchemaResolver(metaClient);
     final Schema tableAvroSchema;
     try {
-      tableAvroSchema = schemaUtil.getTableAvroSchema();
+      // always includes the change flag
+      tableAvroSchema = schemaUtil.getTableAvroSchema(true);
     } catch (Exception e) {
       throw new HoodieException("Get table avro schema error", e);
     }
     final DataType rowDataType = AvroSchemaConverter.convertToDataType(tableAvroSchema);
     final RowType rowType = (RowType) rowDataType.getLogicalType();
     final RowType requiredRowType = (RowType) getProducedDataType().notNull().getLogicalType();
 
+    final Schema.Field operationField = tableAvroSchema.getField(HoodieRecord.OPERATION_METADATA_FIELD);
+    final int operationPos = operationField == null ? -1 : operationField.pos();

Review comment:
       We can move `operationPos` into `MergeOnReadTableState` to simplify the logic.

##########
File path: hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
##########
@@ -624,6 +634,14 @@ public boolean reachedEnd() throws IOException {
       return true;
     }
 
+    private Option<IndexedRecord> getInsetValue(String curKey) throws IOException {
+      final HoodieRecord<?> record = logRecords.get(curKey);
+      if (HoodieOperation.isDelete(record.getOperation())) {
+        return Option.empty();

Review comment:
       We need to emit the deletes when the `emitDelete` flag is true.

##########
File path: hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
##########
@@ -196,7 +198,11 @@ public boolean isBounded() {
 
   @Override
   public ChangelogMode getChangelogMode() {
-    return ChangelogMode.insertOnly();
+    if (conf.getBoolean(FlinkOptions.READ_AS_STREAMING)) {
+      return ChangelogModes.FULL;
+    } else {
+      return ChangelogMode.insertOnly();
+    }

Review comment:
       ```java
   return conf.getBoolean(FlinkOptions.READ_AS_STREAMING)
       ? ChangelogModes.FULL
       : ChangelogMode.insertOnly();
   ```
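
For context, a full changelog mode contains all four row kinds. A sketch of what the `ChangelogModes.FULL` constant might be built from, assuming it wraps Flink's `ChangelogMode` builder (the actual class is part of the PR, so this is not confirmed):

```java
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.types.RowKind;

/** Sketch of a small utility holding commonly used changelog modes. */
public final class ChangelogModes {

  /** Assumed definition: a mode that contains every row kind, i.e. full CDC. */
  public static final ChangelogMode FULL = ChangelogMode.newBuilder()
      .addContainedKind(RowKind.INSERT)
      .addContainedKind(RowKind.UPDATE_BEFORE)
      .addContainedKind(RowKind.UPDATE_AFTER)
      .addContainedKind(RowKind.DELETE)
      .build();

  private ChangelogModes() {
  }
}
```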




> Propagate CDC format for hoodie
> -------------------------------
>
>                 Key: HUDI-1771
>                 URL: https://issues.apache.org/jira/browse/HUDI-1771
>             Project: Apache Hudi
>          Issue Type: New Feature
>          Components: Flink Integration
>            Reporter: Danny Chen
>            Assignee: Zheng yunhong
>            Priority: Major
>              Labels: pull-request-available, sev:normal
>             Fix For: 0.9.0
>
>
> As discussed on the dev mailing list: 
> https://lists.apache.org/thread.html/r31b2d1404e4e043a5f875b78105ba6f9a801e78f265ad91242ad5eb2%40%3Cdev.hudi.apache.org%3E
> Keeping the change flags makes new use cases possible: using HUDI as the unified 
> storage format for the DWD and DWS layers.


