kingeasternsun commented on pull request #3095:
URL: https://github.com/apache/iceberg/pull/3095#issuecomment-1023020791


   > I merged this PR and did some testing. I used Flink CDC to consume the binlog and write to an Iceberg table, then ran some streaming Flink SQL queries and compared the results with the original MySQL queries; the results did not match.
   > 
   > After some debugging I found 2 issues.
   > 
   > 1. When Flink writes CDC data to Iceberg, it ignores UPDATE_BEFORE and treats UPDATE_AFTER as a retract message, which is incorrect. The relevant code is as follows:
   > 
   > ```java
   > // org.apache.iceberg.flink.sink.BaseDeltaTaskWriter
   > switch (row.getRowKind()) {
   >   case INSERT:
   >   case UPDATE_AFTER:
   >     if (upsert) {
   >       writer.delete(row);
   >     }
   >     writer.write(row);
   >     break;
   >
   >   case UPDATE_BEFORE:
   >     if (upsert) {
   >       break;  // UPDATE_BEFORE is not necessary for UPDATE, we do nothing to prevent delete one row twice
   >     }
   >     writer.delete(row);
   >     break;
   >   case DELETE:
   >     writer.delete(row);
   >     break;
   >
   >   default:
   >     throw new UnsupportedOperationException("Unknown row kind: " + row.getRowKind());
   > }
   > ```
   > 
   > 2. When a snapshot period contains only delete operations, the Iceberg snapshot contains only equality delete files and no data file (the data file is deleted), so Flink ignores the equality delete files and misses all -D records. The function org.apache.iceberg.ManifestGroup.planStreamingFiles() creates a FileScanTask only when a data file exists, so Flink cannot process the equality delete files.
   > 
   > After fixing the above 2 issues, the test passed.
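
To make the second issue concrete, here is a minimal toy model, not Iceberg code. The class, method, and file names (`DeleteOnlySnapshotSketch`, `planPerDataFile`, `planWithDeleteOnlyTask`, `eq-delete-1.parquet`) are hypothetical. It only shows why a planner that creates one task per data file produces nothing for a delete-only snapshot. The second method is one possible way around that under my own assumptions, not necessarily the fix that was applied in the quoted test.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Toy model for illustration only; none of these types are Iceberg classes.
class DeleteOnlySnapshotSketch {
  static final class Task {
    final String dataFile; // null means the task carries delete files only
    final List<String> deleteFiles;

    Task(String dataFile, List<String> deleteFiles) {
      this.dataFile = dataFile;
      this.deleteFiles = deleteFiles;
    }
  }

  // One task per data file: with no data files, the delete files are never scheduled.
  static List<Task> planPerDataFile(List<String> dataFiles, List<String> deleteFiles) {
    List<Task> tasks = new ArrayList<>();
    for (String dataFile : dataFiles) {
      tasks.add(new Task(dataFile, deleteFiles));
    }
    return tasks;
  }

  // Assumed alternative: emit a delete-only task when the snapshot has no data files.
  static List<Task> planWithDeleteOnlyTask(List<String> dataFiles, List<String> deleteFiles) {
    List<Task> tasks = planPerDataFile(dataFiles, deleteFiles);
    if (dataFiles.isEmpty() && !deleteFiles.isEmpty()) {
      tasks.add(new Task(null, deleteFiles));
    }
    return tasks;
  }

  public static void main(String[] args) {
    List<String> noDataFiles = Collections.emptyList();
    List<String> deletes = Arrays.asList("eq-delete-1.parquet"); // hypothetical file name
    System.out.println(planPerDataFile(noDataFiles, deletes).size());        // prints 0
    System.out.println(planWithDeleteOnlyTask(noDataFiles, deletes).size()); // prints 1
  }
}
```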
   
   
   
   > BaseDeltaTaskWriter
   
   I think this code would be clearer if written like this:
   ```java
     public void write(RowData row) throws IOException {
       RowDataDeltaWriter writer = route(row);
   
       switch (row.getRowKind()) {
         case INSERT:
           if (upsert) {
             writer.delete(row);
           }
           writer.write(row);
           break;
         case UPDATE_AFTER:
           writer.write(row);
           break;
         case UPDATE_BEFORE:
         case DELETE:
           writer.delete(row);
           break;
   
         default:
           throw new UnsupportedOperationException("Unknown row kind: " + row.getRowKind());
       }
     }
   ```
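
To sanity-check the switch above, here is a small self-contained simulation. It is a sketch under stated assumptions: `ChangelogSketch`, `Kind`, `Row`, and `deleteByKey` are toy stand-ins I made up, not Flink or Iceberg classes. The "table" is a plain list where delete removes every row with the same id and write appends. In this toy model a full changelog ends in the expected state, while a changelog that carries UPDATE_AFTER without UPDATE_BEFORE leaves both versions of the row behind.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy stand-ins for illustration only; these are NOT Flink or Iceberg classes.
class ChangelogSketch {
  enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

  static final class Row {
    final Kind kind;
    final int id;
    final String value;

    Row(Kind kind, int id, String value) {
      this.kind = kind;
      this.id = id;
      this.value = value;
    }
  }

  // Removes every row with the given key, loosely modelling a delete by key.
  static void deleteByKey(List<Row> table, int id) {
    table.removeIf(r -> r.id == id);
  }

  // Applies the changelog with the same branch structure as the switch above.
  static List<String> apply(List<Row> changelog, boolean upsert) {
    List<Row> table = new ArrayList<>();
    for (Row row : changelog) {
      switch (row.kind) {
        case INSERT:
          if (upsert) {
            deleteByKey(table, row.id);
          }
          table.add(row);
          break;
        case UPDATE_AFTER:
          table.add(row);
          break;
        case UPDATE_BEFORE:
        case DELETE:
          deleteByKey(table, row.id);
          break;
        default:
          throw new UnsupportedOperationException("Unknown row kind: " + row.kind);
      }
    }
    List<String> out = new ArrayList<>();
    for (Row r : table) {
      out.add(r.id + "=" + r.value);
    }
    return out;
  }

  public static void main(String[] args) {
    List<Row> full = Arrays.asList(
        new Row(Kind.INSERT, 1, "a"),
        new Row(Kind.UPDATE_BEFORE, 1, "a"),
        new Row(Kind.UPDATE_AFTER, 1, "b"),
        new Row(Kind.INSERT, 2, "c"),
        new Row(Kind.DELETE, 2, "c"));
    System.out.println(apply(full, true)); // prints [1=b]

    List<Row> withoutUpdateBefore = Arrays.asList(
        new Row(Kind.INSERT, 1, "a"),
        new Row(Kind.UPDATE_AFTER, 1, "b"));
    System.out.println(apply(withoutUpdateBefore, true)); // prints [1=a, 1=b]
  }
}
```

So this variant relies on the source emitting UPDATE_BEFORE for every update. Whether that holds depends on the changelog mode of the upstream source.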


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


