Thank you for the detailed explanation! I'll look into the conflict
detection and validation APIs to come up with a solution.

Tamas

On Wed, Mar 8, 2023 at 5:26 PM Ryan Blue <b...@tabular.io> wrote:

> Hi Tamas,
>
> You're right about the problem that needs to be solved. The same issue
> exists for this task in Spark. Currently, we ensure correctness by running
> validations at commit time that detect these cases and then retry or
> recover.
>
> When committing using `RowDelta`, both jobs would set a conflict detection
> filter (the portion of the table being modified) and a starting snapshot,
> and call `validateNoConflictingDataFiles`. That instructs Iceberg to check
> whether there are data files that could conflict with the operation being
> committed. The second process to commit would refresh the table and then
> need to validate that no conflicting data files were added since the
> starting point in time (which is the same for both jobs). It would then
> find the data file written by the first committer and detect that it may
> contain records that were modified in the second commit. That causes
> Iceberg to throw an exception because the commit can't proceed and needs
> to be fixed.
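>
> Roughly, the write side would look like this (a sketch only, not the Flink
> connector's actual code; `startingSnapshotId`, `dataFiles`, and
> `positionDeleteFiles` stand in for whatever the writer produced, and the
> filter expression is just an example):
>
>   // uses org.apache.iceberg.RowDelta and org.apache.iceberg.expressions.Expressions
>   Expression conflictFilter = Expressions.equal("id_bucket", bucketId);  // example filter
>   RowDelta rowDelta = table.newRowDelta()
>       .validateFromSnapshot(startingSnapshotId)   // snapshot the job started from
>       .conflictDetectionFilter(conflictFilter)    // portion of the table being modified
>       .validateNoConflictingDataFiles();          // check for concurrently added data files
>   for (DataFile dataFile : dataFiles) {
>     rowDelta.addRows(dataFile);
>   }
>   for (DeleteFile deleteFile : positionDeleteFiles) {
>     rowDelta.addDeletes(deleteFile);
>   }
>   rowDelta.commit();  // throws ValidationException if a conflicting data file was added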
>
> After that, you'd need to take some action to recover or retry. You could
> restart from the last checkpoint, but that discards all of the work you've
> already done. Another option is to keep track of the IDs that were upserted
> and write deletes against those IDs for the new data files. That would
> probably be better than retrying the entire checkpoint. From Iceberg's
> perspective, recovery choices are left up to the engine. Flink might
> recover as I described, but engines like Spark just retry the entire write
> since that's simpler.
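>
> A minimal sketch of handling that failure (ValidationException is
> org.apache.iceberg.exceptions.ValidationException; recoverAndRetry and
> upsertedIds are placeholders for engine-specific recovery logic):
>
>   try {
>     rowDelta.commit();
>   } catch (ValidationException e) {
>     // the other job committed a data file that may contain rows this
>     // commit should have deleted
>     table.refresh();
>     // either restart from the last checkpoint, or write deletes against
>     // the tracked upserted IDs for the newly committed data files and retry
>     recoverAndRetry(table, upsertedIds);  // placeholder, engine-specific
>   }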
>
> Ryan
>
> On Wed, Mar 8, 2023 at 7:39 AM Tamas Sule <tamas.s...@cloudera.com.invalid>
> wrote:
>
>> Hello,
>>
>> We are experimenting with implementing position deletes in the Flink
>> connector to provide better interoperability between components in our
>> stack, since equality deletes are not widely supported yet. I identified
>> an issue which (potentially) makes this impossible to do correctly in
>> upsert mode, and I'd like feedback on whether my understanding is correct.
>>
>> So, let's imagine we create an Iceberg table using Flink with a primary
>> key and we want to write to this table in upsert mode. If we replace
>> equality deletes with position deletes, I think there is a possibility
>> that we end up with multiple records with the same primary key. Let's say
>> we have a Flink job 'A' which writes to this table in upsert mode, and a
>> job 'B' which does something similar, or someone who wants to manually
>> upsert some data at the same time. If both jobs receive data with the same
>> primary key very close together in time, then when we scan the table for
>> the rows we want to position delete, neither job sees the change from the
>> other job, because the commits haven't happened yet. If there is an older
>> row, both jobs will try to delete that row, which is fine; however, they
>> will also both write the new data, and now we have 2 rows with the same
>> primary key. As far as I understand, this would not happen with equality
>> deletes, because there we just apply the equality fields as a filter to
>> previous data files, so we would end up with only 1 record for that
>> primary key: the one from the job that committed later.
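>>
>> To make the difference concrete, here is roughly how the two delete file
>> types are described to Iceberg's metadata API (just a sketch; the paths,
>> sizes and counts are made up, partition metadata is omitted, and the
>> actual delete rows would be written by the connector's delete writers):
>>
>>   // uses org.apache.iceberg.FileMetadata, DeleteFile, FileFormat
>>   int idFieldId = table.schema().findField("id").fieldId();
>>
>>   // equality delete: "remove every earlier row whose id matches a row in
>>   // this delete file", no matter which data file contains it
>>   DeleteFile eqDeletes = FileMetadata.deleteFileBuilder(table.spec())
>>       .ofEqualityDeletes(idFieldId)
>>       .withPath("/tmp/eq-deletes.parquet")   // made up
>>       .withFormat(FileFormat.PARQUET)
>>       .withFileSizeInBytes(1024L)            // made up
>>       .withRecordCount(1L)
>>       .build();
>>
>>   // position delete: "remove row N of data file F", so it can only point
>>   // at files that were visible when this job scanned the table
>>   DeleteFile posDeletes = FileMetadata.deleteFileBuilder(table.spec())
>>       .ofPositionDeletes()
>>       .withPath("/tmp/pos-deletes.parquet")  // made up
>>       .withFormat(FileFormat.PARQUET)
>>       .withFileSizeInBytes(1024L)            // made up
>>       .withRecordCount(1L)
>>       .build();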
>>
>> Please let me know what you think.
>>
>> Thanks,
>> Tamas
>>
>
>
> --
> Ryan Blue
> Tabular
>
