Thank you for the detailed explanation! I'll look into the conflict detection and validation APIs to come up with a solution.
Tamas

On Wed, Mar 8, 2023 at 5:26 PM Ryan Blue <b...@tabular.io> wrote:

> Hi Tamas,
>
> You're right about the problem that needs to be solved. This also exists
> for the same task in Spark. Currently, we ensure correctness by running
> validations at commit time that detect these cases and then retry or
> recover.
>
> When committing using `RowDelta`, both jobs would set a conflict detection
> filter (the portion of the table being modified) and a starting snapshot,
> and call `validateNoConflictingDataFiles`. That instructs Iceberg to check
> whether there are data files that can conflict with the operation being
> committed. The second process to commit would refresh the table and then
> need to validate that no conflicting data files were added since the
> starting point in time (which is the same for both jobs). It would find
> the data file written by the first committer and detect that it may
> contain records that were modified in the second commit. That causes
> Iceberg to throw an exception saying the commit can't proceed and needs
> to be fixed.
>
> After that, you'd need to take some action to recover or retry. You could
> restart from the last checkpoint, but that discards all of the work you've
> already done. Another option is to keep track of the IDs that were
> upserted and write deletes against those IDs for the new data files. That
> would probably be better than retrying the entire checkpoint. From
> Iceberg's perspective, recovery choices are left up to the engine. Flink
> might recover as I described, but engines like Spark just retry the
> entire write since that's simpler.
>
> Ryan
>
> On Wed, Mar 8, 2023 at 7:39 AM Tamas Sule <tamas.s...@cloudera.com.invalid>
> wrote:
>
>> Hello,
>>
>> We are experimenting with implementing position deletes in the Flink
>> connector to provide better support between components in our stack,
>> since equality deletes are not widely supported yet. I identified an
>> issue that (potentially) makes this impossible to do correctly in
>> upsert mode, and I'd like feedback on whether my understanding is
>> correct.
>>
>> Imagine we create an Iceberg table using Flink with a primary key and
>> want to write to this table in upsert mode. If we replace equality
>> deletes with position deletes, I think there is a possibility that we
>> end up with multiple records with the same primary key. Say we have a
>> Flink job 'A' that writes to this table in upsert mode, and a job 'B'
>> that does something similar, or someone who manually upserts some data
>> at the same time. If both jobs receive data with the same primary key
>> very close together in time, then when each scans the table for the
>> rows it wants to position delete, neither job sees the change from the
>> other job because the commits haven't happened yet. If there is an
>> older row, both jobs will try to delete it, which is fine; however,
>> both will also write the new data, and now we have two rows with the
>> same primary key. As far as I understand, this would not happen with
>> equality deletes, because there we just apply the equality fields as a
>> filter to previous data files, so we would end up with only one record
>> with that primary key, namely the one from the job that committed
>> later.
>>
>> Please let me know what you think.
>>
>> Thanks,
>> Tamas
>>
>
>
> --
> Ryan Blue
> Tabular
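For reference, here is a rough sketch of the `RowDelta` commit path as I currently
understand it from your description. This is only an illustration, not connector
code: `commitWithValidation`, the `dataFile`/`positionDeleteFile` arguments, and
the `"id"` conflict-detection filter are placeholders I made up for the example.

```java
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.RowDelta;
import org.apache.iceberg.Table;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.expressions.Expressions;

public class RowDeltaCommitSketch {

  /**
   * Commits one checkpoint's data file and position-delete file, validating that
   * no other writer added conflicting data files since the starting snapshot.
   * The starting snapshot is captured before the write begins, so it is the same
   * reference point for both concurrent jobs.
   */
  static void commitWithValidation(
      Table table, long startingSnapshotId, DataFile dataFile, DeleteFile positionDeleteFile) {

    RowDelta rowDelta = table.newRowDelta()
        .addRows(dataFile)                         // new rows written by this checkpoint
        .addDeletes(positionDeleteFile)            // position deletes against older rows
        .validateFromSnapshot(startingSnapshotId)  // validate changes made since our starting point
        .conflictDetectionFilter(
            Expressions.equal("id", 42))           // portion of the table being modified (placeholder)
        .validateNoConflictingDataFiles();         // fail if another commit added data that may conflict

    try {
      rowDelta.commit();
    } catch (ValidationException e) {
      // Another writer committed a data file that may contain rows we modified.
      // Recovery is left to the engine: restart from the last checkpoint, or
      // write deletes for the upserted IDs against the newly added data files.
      throw e;
    }
  }
}
```

If `validateNoConflictingDataFiles` fails at commit time, we would then decide
whether to retry the whole checkpoint or write deletes for the upserted IDs, as
you suggested.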