aiborodin opened a new issue, #14425:
URL: https://github.com/apache/iceberg/issues/14425

   ### Apache Iceberg version
   
   1.10.0 (latest release)
   
   ### Query engine
   
   Flink
   
   ### Please describe the bug 🐞
   
   The `DynamicIcebergSink`, when used with the Iceberg REST catalog, 
duplicates data after recovering from a failed POST commit request.
   There is a race condition in the commit recovery logic of the 
`DynamicCommitter`, which uses `getMaxCommittedCheckpointId()` to check whether 
the data has already been committed. This check leads to data duplication if 
the previous commit is still being processed by the REST catalog during Flink 
recovery, while the client has already aborted its POST request and sent a new 
one for the same data as part of recovering the Flink job.
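
The check-then-act nature of this recovery check can be illustrated with a minimal sketch. Everything in it is hypothetical and for illustration only: `ToyTable`, `shouldCommit`, and the file name are stand-ins, not Iceberg or Flink APIs, and the in-flight request is modelled as a deferred `Runnable` rather than a real HTTP call.

```java
import java.util.ArrayList;
import java.util.List;

class StaleCheckSketch {
    /** Toy snapshot: the Flink checkpoint id it records and the file it added. */
    static final class Snapshot {
        final long checkpointId;
        final String dataFile;

        Snapshot(long checkpointId, String dataFile) {
            this.checkpointId = checkpointId;
            this.dataFile = dataFile;
        }
    }

    /** Toy table standing in for catalog state; this is not the Iceberg Table API. */
    static final class ToyTable {
        final List<Snapshot> snapshots = new ArrayList<>();

        long maxCommittedCheckpointId() {
            long max = -1L; // -1 means no Flink commit was found
            for (Snapshot s : snapshots) {
                max = Math.max(max, s.checkpointId);
            }
            return max;
        }

        void append(long checkpointId, String dataFile) {
            snapshots.add(new Snapshot(checkpointId, dataFile));
        }
    }

    /** Recovery check: commit only if the checkpoint is newer than the last committed one. */
    static boolean shouldCommit(ToyTable table, long checkpointId) {
        return checkpointId > table.maxCommittedCheckpointId();
    }

    /** Replays the interleaving and returns how many snapshots added the same file. */
    static long run() {
        ToyTable table = new ToyTable();
        // Req-1 was sent before the failure but the catalog has not applied it yet.
        Runnable req1InFlight = () -> table.append(1L, "data-file-a.parquet");

        // After the restart the committer checks the table; Req-1 is not visible yet.
        boolean recommit = shouldCommit(table, 1L);

        req1InFlight.run(); // Req-1 lands late
        if (recommit) {
            table.append(1L, "data-file-a.parquet"); // Req-2 repeats the same file
        }
        return table.snapshots.stream()
                .filter(s -> s.dataFile.equals("data-file-a.parquet"))
                .count();
    }

    public static void main(String[] args) {
        System.out.println("snapshots adding data-file-a.parquet: " + run());
    }
}
```

In this sketch the check reads the table before the late `Req-1` becomes visible, so both the late request and the recovery commit land, and two snapshots add the same file.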
   
   This happens in the following sequence of events:
   1. `DynamicIcebergSink` starts checkpoint 1 (`chk-1`)
   2. `DynamicIcebergSink` completes `chk-1` and saves the commit request 1 
(`CR-1`) with data/delete files to the checkpoint state
   3. `DynamicCommitter` finds no commits for `chk-1` (`maxCheckpointId` is 
`-1`) and sends the POST commit request (`Req-1`) to the REST catalog
   4. The REST catalog receives the request and triggers the commit to the 
underlying catalog (e.g., JDBC)
   5. `DynamicCommitter` restarts due to a Flink task manager process failure:
     - This can be any failure, even a shutdown of the Flink client JVM; one 
example is a `SocketTimeoutException` in the REST client, which happens when 
the REST catalog was too slow to commit to the underlying catalog, for example, 
due to retrying for too long.
   6. `DynamicCommitter` restarts and re-attempts to commit `CR-1` from `chk-1` 
with a new POST request (`Req-2`):
     - This happens because `Req-1` has not been completed yet, so the 
`maxCheckpointId` is still `-1`
   7. `Req-1` completes and updates the latest snapshot to `maxCheckpointId = 1`
   8. `Req-2` completes and creates a snapshot with duplicate content 
(identical data/delete files)
     - `Req-2` can either commit immediately, passing the update requirements 
because it arrived before `Req-1` was committed, or be retried by the REST 
client (`DynamicCommitter`) after a `CommitFailedException` triggered by the 
new base snapshot that `Req-1` created.
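
The retry branch of step 8 can be sketched as follows. This is a toy model under stated assumptions, not the actual committer or REST client code: `ToyCatalog`, `ToyCommitFailed`, and `commitWithRetry` are hypothetical names, the base is modelled as a simple version counter, and the sketch assumes the retry only refreshes the base without repeating the checkpoint-id check.

```java
import java.util.ArrayList;
import java.util.List;

class RetryDuplicateSketch {
    /** Hypothetical stand-in for a commit conflict; not Iceberg's CommitFailedException. */
    static final class ToyCommitFailed extends RuntimeException {}

    /** Toy catalog with optimistic concurrency: a commit must name the current version as its base. */
    static final class ToyCatalog {
        final List<String> snapshotFiles = new ArrayList<>();

        int currentVersion() {
            return snapshotFiles.size();
        }

        void commit(int expectedVersion, String dataFile) {
            if (expectedVersion != currentVersion()) {
                throw new ToyCommitFailed(); // base moved since the request was built
            }
            snapshotFiles.add(dataFile);
        }
    }

    /** Retries on conflict by refreshing the base only; returns the attempt that succeeded. */
    static int commitWithRetry(ToyCatalog catalog, int staleBase, String dataFile, int maxAttempts) {
        int base = staleBase;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                catalog.commit(base, dataFile);
                return attempt;
            } catch (ToyCommitFailed e) {
                base = catalog.currentVersion(); // refresh and try again with the same files
            }
        }
        throw new IllegalStateException("gave up after " + maxAttempts + " attempts");
    }

    /** Replays steps 6 to 8 and returns the files added by each snapshot, in order. */
    static List<String> run() {
        ToyCatalog catalog = new ToyCatalog();
        int baseSeenByReq2 = catalog.currentVersion(); // Req-2 is built before Req-1 lands
        catalog.commit(catalog.currentVersion(), "data-file-a.parquet"); // Req-1 completes
        commitWithRetry(catalog, baseSeenByReq2, "data-file-a.parquet", 3); // Req-2 conflicts, then retries
        return catalog.snapshotFiles;
    }

    public static void main(String[] args) {
        System.out.println("files per snapshot: " + run());
    }
}
```

The conflict check in this model only protects against a stale base. It says nothing about whether the files being appended are already in the table, so the retried request still succeeds and adds them a second time.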
   
   This data duplication scenario is depicted in the following sequence diagram:
   
   <img width="2664" height="2832" alt="Image" 
src="https://github.com/user-attachments/assets/0ed4924e-73f0-47a3-a577-4a51f7b23454"
 />
   
   The `IcebergSink` will have the same issue due to identical code paths in 
`IcebergCommitter` and `DynamicCommitter`.
   
   ### Willingness to contribute
   
   - [ ] I can contribute a fix for this bug independently
   - [x] I would be willing to contribute a fix for this bug with guidance from 
the Iceberg community
   - [ ] I cannot contribute a fix for this bug at this time


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]