openinx opened a new issue #1639:
URL: https://github.com/apache/iceberg/issues/1639


   Currently, Apache Iceberg has implemented equality deletes and position 
deletes internally, but we still lack the ability to work with the different 
engines. With PR https://github.com/apache/iceberg/pull/1444 , Spark can read 
records by merging delete files and data files, and PR 
https://github.com/apache/iceberg/pull/1517 gives Flink a similar ability to 
read delete records.  So the core remaining work is to enable the engine 
writers, which means: 
   
   1.  Spark, Flink, Hive, etc. could write CDC delete events into an Apache 
Iceberg table; 
   2.  we could execute SQL statements to delete the records that match the 
query conditions in a batch job (see the sketch after this list). 
   
   For the Spark engine, I think @rdblue and @chenjunjiedada already have a 
good plan to get those finished.  I will try to open a PoC PR for this issue, 
mainly focused on the first case; batch delete should be similar to the Spark 
implementation. 
   
   Flink already provides a library to consume the change log events of 
various databases and turn them into a RowData DataStream.  Each RowData 
carries a `RowKind` that indicates whether the event is an INSERT, DELETE, 
UPDATE_BEFORE, or UPDATE_AFTER.  This makes it very convenient to implement 
the CDC writers for Flink.
   
   ```java
   /**
    * Lists all kinds of changes that a row can describe in a changelog.
    */
   @PublicEvolving
   public enum RowKind {
   
        // Note: Enums have no stable hash code across different JVMs, use
        // toByteValue() for this purpose.
   
        /**
         * Insertion operation.
         */
        INSERT("+I", (byte) 0),
   
        /**
         * Update operation with the previous content of the updated row.
         *
         * <p>This kind SHOULD occur together with {@link #UPDATE_AFTER} for modelling an update
         * that needs to retract the previous row first. It is useful in cases of a non-idempotent
         * update, i.e., an update of a row that is not uniquely identifiable by a key.
         */
        UPDATE_BEFORE("-U", (byte) 1),
   
        /**
         * Update operation with new content of the updated row.
         *
         * <p>This kind CAN occur together with {@link #UPDATE_BEFORE} for modelling an update
         * that needs to retract the previous row first. OR it describes an idempotent update,
         * i.e., an update of a row that is uniquely identifiable by a key.
         */
        UPDATE_AFTER("+U", (byte) 2),
   
        /**
         * Deletion operation.
         */
        DELETE("-D", (byte) 3);
          // ... 
   }
   ```
   
   Some work is still needed to integrate the current Flink CDC API with the 
Apache Iceberg implementation. Let's track that work in this issue.
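
   As a starting point for that integration, here is a minimal sketch of how a 
changelog RowData stream could be routed into data writes vs. delete writes 
based on `RowKind`. The `RowDataDeltaWriter` interface below is hypothetical 
and only exists to keep the sketch self-contained; designing the real writer 
API is part of the work tracked here.

   ```java
   import org.apache.flink.table.data.RowData;
   import org.apache.flink.types.RowKind;

   public class ChangeLogRouter {

        private final RowDataDeltaWriter writer; // hypothetical CDC-aware writer

        public ChangeLogRouter(RowDataDeltaWriter writer) {
          this.writer = writer;
        }

        /** Routes one changelog record to either a data write or a delete. */
        public void apply(RowData row) {
          switch (row.getRowKind()) {
            case INSERT:
            case UPDATE_AFTER:
              // New or updated content goes into ordinary data files.
              writer.write(row);
              break;
            case UPDATE_BEFORE:
            case DELETE:
              // Retractions become equality-delete (or position-delete) records.
              writer.delete(row);
              break;
          }
        }

        /** Hypothetical writer interface, only here to make the sketch compile. */
        public interface RowDataDeltaWriter {
          void write(RowData row);
          void delete(RowData row);
        }
   }
   ```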
   
   
    

