openinx opened a new issue #1639: URL: https://github.com/apache/iceberg/issues/1639
Currently, Apache Iceberg implements equality deletes and position deletes internally, but the engines still cannot make full use of them. PR https://github.com/apache/iceberg/pull/1444 lets Spark read records by merging delete files with data files, and PR https://github.com/apache/iceberg/pull/1517 gives Flink a similar ability to read records with deletes applied.

So the core thing we still need is to enable the engine writers. That means:

1. Spark, Flink, Hive, etc. can write CDC delete events into an Apache Iceberg table;
2. we can execute SQL statements in a batch job to delete the records that match the query conditions.

For the Spark engine, I think @rdblue and @chenjunjiedada already have a good plan to get this finished. I will try to submit a PoC in this issue, focused mainly on the first case; batch delete should be similar to the Spark implementation.

Flink provides a library to consume change log events from various databases and turn them into a `RowData` DataStream. Each `RowData` has a `RowKind` that indicates whether the event is an INSERT, DELETE, UPDATE_BEFORE, or UPDATE_AFTER. This makes it very convenient to implement CDC writers for Flink.

```java
/**
 * Lists all kinds of changes that a row can describe in a changelog.
 */
@PublicEvolving
public enum RowKind {

    // Note: Enums have no stable hash code across different JVMs, use toByteValue() for
    // this purpose.

    /**
     * Insertion operation.
     */
    INSERT("+I", (byte) 0),

    /**
     * Update operation with the previous content of the updated row.
     *
     * <p>This kind SHOULD occur together with {@link #UPDATE_AFTER} for modelling an update that needs
     * to retract the previous row first. It is useful in cases of a non-idempotent update, i.e., an
     * update of a row that is not uniquely identifiable by a key.
     */
    UPDATE_BEFORE("-U", (byte) 1),

    /**
     * Update operation with new content of the updated row.
     *
     * <p>This kind CAN occur together with {@link #UPDATE_BEFORE} for modelling an update that
     * needs to retract the previous row first. OR it describes an idempotent update, i.e., an update
     * of a row that is uniquely identifiable by a key.
     */
    UPDATE_AFTER("+U", (byte) 2),

    /**
     * Deletion operation.
     */
    DELETE("-D", (byte) 3);

    // ...
}
```

It may take some work to integrate the current Flink CDC API with the Apache Iceberg implementation. Let's track that work in this issue.
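To make the first case a bit more concrete, here is a rough sketch of how a CDC writer might dispatch on the row kind. It is only an illustration under assumptions, not the planned design: `CdcRoutingSketch` and `DeltaBuffer` are hypothetical names, the nested `RowKind` is a local stand-in for `org.apache.flink.types.RowKind` so the snippet compiles without Flink, rows are plain strings instead of `RowData`, and the mapping (INSERT and UPDATE_AFTER go to data files, UPDATE_BEFORE and DELETE become equality deletes) is one plausible choice.

```java
import java.util.ArrayList;
import java.util.List;

public class CdcRoutingSketch {

    /** Local stand-in for Flink's RowKind (assumption: real code would import the Flink enum). */
    enum RowKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    /**
     * Hypothetical in-memory sink. A real writer would append to Iceberg data files
     * and equality-delete files instead of to these lists.
     */
    static final class DeltaBuffer {
        final List<String> dataRows = new ArrayList<>();
        final List<String> equalityDeletes = new ArrayList<>();

        /** Routes one changelog event to the data side or the delete side. */
        void write(RowKind kind, String row) {
            switch (kind) {
                case INSERT:
                case UPDATE_AFTER:
                    // New content of a row: goes into a data file.
                    dataRows.add(row);
                    break;
                case UPDATE_BEFORE:
                case DELETE:
                    // Old content of a row: recorded as an equality delete.
                    equalityDeletes.add(row);
                    break;
                default:
                    throw new IllegalArgumentException("Unknown row kind: " + kind);
            }
        }
    }

    public static void main(String[] args) {
        DeltaBuffer buffer = new DeltaBuffer();

        // A tiny changelog: insert a row, update it, then delete it.
        buffer.write(RowKind.INSERT, "id=1 value=a");
        buffer.write(RowKind.UPDATE_BEFORE, "id=1 value=a");
        buffer.write(RowKind.UPDATE_AFTER, "id=1 value=b");
        buffer.write(RowKind.DELETE, "id=1 value=b");

        System.out.println("data rows:        " + buffer.dataRows);
        System.out.println("equality deletes: " + buffer.equalityDeletes);
    }
}
```

Treating UPDATE_BEFORE like a DELETE keeps the writer simple, because an update becomes a delete of the old row followed by an insert of the new one. Whether that is the right trade-off for the Flink integration is something to settle in the PoC.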
