Ah, sorry, I think I now see what you mean. It's OK to add a `List<GlobalCommittableT> recoverCommittables(List<GlobalCommittableT>)` method.

On 22.09.20 09:42, Aljoscha Krettek wrote:
On 22.09.20 06:06, Steven Wu wrote:
In addition, it is undesirable to do the committed-or-not check in the commit method, which is called in every checkpoint cycle. CommitResult already indicates SUCCESS or not. When the framework calls commit with a list of GlobalCommittableT, it should be certain they are uncommitted. The only time we aren't sure is when a list of GlobalCommittableT is restored from a checkpoint. `recoverGlobalCommittables` is the ideal place to do such a check and filter out the ones that were already committed. The retained ones will be committed in the next checkpoint cycle. Since the framework takes care of checkpointing and restoring, we need some hook for the sink to apply custom logic to the restored list.
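A minimal Java sketch of the hook described above, assuming a simplified committer interface rather than Flink's actual API. `recoverCommittables` follows the signature proposed in the reply above; `GlobalCommitter`, `FakeTable`, and `TableCommitter` are hypothetical names, and `FakeTable` merely stands in for an external system such as Iceberg. The restored list is filtered once, so `commit()` can assume everything it receives is uncommitted.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch; not the real Flink sink interfaces.
public class RecoverHookSketch {

    /** Hypothetical simplified committer with a recovery hook. */
    interface GlobalCommitter<GlobalCommittableT> {
        // Called once with committables restored from a checkpoint;
        // returns only those that still need to be committed.
        List<GlobalCommittableT> recoverCommittables(List<GlobalCommittableT> restored);

        // Called every checkpoint cycle; may assume its input is uncommitted.
        void commit(List<GlobalCommittableT> committables);
    }

    /** Hypothetical stand-in for an external table that records committed ids. */
    static final class FakeTable {
        final Set<String> committedIds = new HashSet<>();

        boolean isCommitted(String id) { return committedIds.contains(id); }

        void commit(String id) { committedIds.add(id); }
    }

    static final class TableCommitter implements GlobalCommitter<String> {
        private final FakeTable table;

        TableCommitter(FakeTable table) { this.table = table; }

        @Override
        public List<String> recoverCommittables(List<String> restored) {
            // The only place that asks the external system what was already committed.
            List<String> pending = new ArrayList<>();
            for (String id : restored) {
                if (!table.isCommitted(id)) {
                    pending.add(id);
                }
            }
            return pending;
        }

        @Override
        public void commit(List<String> committables) {
            // No committed-or-not check here: input is assumed to be uncommitted.
            for (String id : committables) {
                table.commit(id);
            }
        }
    }

    public static void main(String[] args) {
        FakeTable table = new FakeTable();
        table.commit("ckpt-1"); // committed just before a failure
        TableCommitter committer = new TableCommitter(table);
        List<String> pending = committer.recoverCommittables(List.of("ckpt-1", "ckpt-2"));
        System.out.println(pending);                  // [ckpt-2]
        committer.commit(pending);
        System.out.println(table.committedIds.size()); // 2
    }
}
```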

I don't think we need the `recoverGlobalCommittables()` hook. The sink implementation only has to do the filtering once, so it can do it either in the recover hook or in the next `commit()` call. Either way, we only have to make one pass through the list and connect to Iceberg once. Doing the check in `commit()` keeps the interface of GlobalCommittable simpler, and to me it seems natural to do the check in the `commit()` method to ensure that commits are idempotent.
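For comparison, a minimal sketch of the alternative: no recovery hook, with `commit()` itself skipping anything that is already committed, so replaying restored committables is harmless. `FakeTable` and `IdempotentCommitter` are hypothetical stand-ins, not Flink or Iceberg APIs.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch; not the real Flink sink interfaces.
public class IdempotentCommitSketch {

    /** Hypothetical stand-in for the external system (e.g. an Iceberg table). */
    static final class FakeTable {
        private final Set<String> committedIds = new HashSet<>();

        boolean isCommitted(String id) { return committedIds.contains(id); }

        void append(String id) { committedIds.add(id); }

        int size() { return committedIds.size(); }
    }

    /** Hypothetical committer without a recovery hook: commit() skips duplicates. */
    static final class IdempotentCommitter {
        private final FakeTable table;

        IdempotentCommitter(FakeTable table) { this.table = table; }

        // Returns how many committables were actually applied in this call.
        int commit(List<String> committables) {
            int applied = 0;
            for (String id : committables) {
                if (table.isCommitted(id)) {
                    continue; // committed before a failure/restore: skip it
                }
                table.append(id);
                applied++;
            }
            return applied;
        }
    }

    public static void main(String[] args) {
        FakeTable table = new FakeTable();
        IdempotentCommitter committer = new IdempotentCommitter(table);
        System.out.println(committer.commit(List.of("ckpt-1")));           // 1
        // After a restore, ckpt-1 may be handed to commit() again.
        System.out.println(committer.commit(List.of("ckpt-1", "ckpt-2"))); // 1
        System.out.println(table.size());                                  // 2
    }
}
```

Written this way, the lookup runs on every `commit()` call, which is the per-cycle cost raised in the quoted message; a real implementation might limit it to the first call after a restore.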

What do you think?

Aljoscha
