>> I think we should go with something like
>>
>> List<GlobalCommitT> filterRecoveredCommittables(List<GlobalCommitT> committables)
>>
>> to keep things simple. This should also be easy to do from the framework
>> side, and then the sink doesn't need to do any custom state handling.

I second Aljoscha's proposal. There is already a lot to do for the first version, and for now I think this would satisfy the needs of the Iceberg sink.

Best,
Guowei

On Tue, Sep 22, 2020 at 10:54 PM Aljoscha Krettek <aljos...@apache.org> wrote:
> I think we should go with something like
>
> List<GlobalCommitT> filterRecoveredCommittables(List<GlobalCommitT> committables)
>
> to keep things simple. This should also be easy to do from the framework
> side, and then the sink doesn't need to do any custom state handling.
>
> Best,
> Aljoscha
>
> On 22.09.20 16:03, Steven Wu wrote:
> > The APIs discussed so far have tried to do more in the framework. If we
> > instead take the approach of a lighter framework, this minimal set of
> > APIs is probably good enough. The sink can handle the bookkeeping,
> > merge, and retry logic.
> >
> > /**
> >  * CommT is the DataFile in Iceberg.
> >  * GlobalCommT is the checkpoint data type, like ManifestFile in Iceberg.
> >  */
> > interface GlobalCommitter<CommT, GlobalCommT> {
> >
> >     void collect(CommT committable);
> >
> >     void commit();
> >
> >     List<GlobalCommT> snapshotState();
> >
> >     // Just a callback to the sink so that it can filter and retain the
> >     // uncommitted GlobalCommT in its internal bookkeeping.
> >     void recoveredCommittables(List<GlobalCommT> committables);
> > }
> >
> > The most important thing needed from the framework is to run the
> > GlobalCommitter in the JobManager. That involves topology creation,
> > checkpoint handling, serializing the execution of commit() calls, etc.
> >
> > Thanks,
> > Steven
> >
> > On Tue, Sep 22, 2020 at 6:39 AM Steven Wu <stevenz...@gmail.com> wrote:
> >
> >> It is fine to leave CommitResult/RETRY outside the scope of the framework.
> >> Then the framework might need to provide some hooks in the
> >> checkpoint/restore logic. Because the commit happens in the
> >> post-checkpoint-completion step, the sink needs to update its internal
> >> state when the commit succeeds, so that the next checkpoint won't include
> >> the already committed GlobalCommT.
> >>
> >> Maybe GlobalCommitter can have an API like this?
> >>> List<GlobalCommT> snapshotState();
> >>
> >> But then we still need the recover API if we don't let the sink manage
> >> the state directly.
> >>> List<GlobalCommT> recoverCommittables(List<GlobalCommT> committables)
> >>
> >> Thanks,
> >> Steven
> >>
> >> On Tue, Sep 22, 2020 at 6:33 AM Aljoscha Krettek <aljos...@apache.org>
> >> wrote:
> >>
> >>> On 22.09.20 13:26, Guowei Ma wrote:
> >>>> Actually, I am not sure adding `isAvailable` is enough; maybe it is not.
> >>>> But for the initial version I hope we can make the sink API synchronous,
> >>>> because there is already a lot of work that has to be finished. :--)
> >>>
> >>> I agree: for the first version we should stick to a simpler, synchronous
> >>> interface.
> >>>
> >>> Aljoscha
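A minimal sketch of how the sink-side bookkeeping in Steven's proposal could behave, written in Java to match the snippets in the thread. Everything here is hypothetical: `GlobalCommitter` follows the shape Steven sketches rather than any released Flink API, and `ToyTable`, `ToyManifestCommitter`, and the string-valued "manifests" are illustrative stand-ins for an Iceberg table and its ManifestFile objects. The `main` method plays the framework's role: it snapshots state, calls `commit()` after a checkpoint completes, and then shows two alternative recoveries from the same failure.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical stand-in for the framework driving the committer; not Flink code. */
class GlobalCommitterSketch {
    public static void main(String[] args) {
        ToyTable table = new ToyTable();
        ToyManifestCommitter committer = new ToyManifestCommitter(table);

        // Checkpoint 1: snapshot, the checkpoint completes, the manifest is committed.
        committer.collect("data-1.parquet");
        List<String> cp1 = committer.snapshotState(); // [manifest(data-1.parquet)]
        committer.commit();

        // Checkpoint 2: snapshot succeeds, then the job fails before commit().
        committer.collect("data-2.parquet");
        List<String> cp2 = committer.snapshotState(); // only [manifest(data-2.parquet)]

        // Alternative A: checkpoint 2 never completed, so we restore from cp1,
        // whose manifest was already committed and must not be committed twice.
        ToyManifestCommitter fromCp1 = new ToyManifestCommitter(table);
        fromCp1.recoveredCommittables(cp1);
        System.out.println("pending after cp1 restore: " + fromCp1.snapshotState());
        // prints "pending after cp1 restore: []"

        // Alternative B: checkpoint 2 completed, so we restore from cp2 and retry.
        ToyManifestCommitter fromCp2 = new ToyManifestCommitter(table);
        fromCp2.recoveredCommittables(cp2);
        fromCp2.commit();
        System.out.println("table: " + table.committedManifests());
        // prints "table: [manifest(data-1.parquet), manifest(data-2.parquet)]"
    }
}

/** Hypothetical interface following the shape proposed in the thread. */
interface GlobalCommitter<CommT, GlobalCommT> {
    void collect(CommT committable);

    void commit();

    List<GlobalCommT> snapshotState();

    void recoveredCommittables(List<GlobalCommT> committables);
}

/** Hypothetical external table that remembers which manifests were committed. */
class ToyTable {
    private final Set<String> committed = new LinkedHashSet<>();

    boolean isCommitted(String manifest) { return committed.contains(manifest); }

    void commit(String manifest) { committed.add(manifest); }

    List<String> committedManifests() { return new ArrayList<>(committed); }
}

/** Toy committer: CommT is a data file name, GlobalCommT is a manifest name. */
class ToyManifestCommitter implements GlobalCommitter<String, String> {
    private final ToyTable table;
    private final List<String> collectedFiles = new ArrayList<>();   // not yet in a manifest
    private final List<String> pendingManifests = new ArrayList<>(); // snapshotted, uncommitted

    ToyManifestCommitter(ToyTable table) { this.table = table; }

    @Override
    public void collect(String dataFile) { collectedFiles.add(dataFile); }

    @Override
    public List<String> snapshotState() {
        // Roll the files collected since the last checkpoint into one manifest.
        if (!collectedFiles.isEmpty()) {
            pendingManifests.add("manifest(" + String.join("+", collectedFiles) + ")");
            collectedFiles.clear();
        }
        return new ArrayList<>(pendingManifests);
    }

    @Override
    public void commit() {
        // Called after checkpoint completion. Clearing the bookkeeping keeps committed
        // manifests out of the next checkpoint. Assumes one checkpoint in flight at a time.
        for (String manifest : pendingManifests) {
            table.commit(manifest);
        }
        pendingManifests.clear();
    }

    @Override
    public void recoveredCommittables(List<String> recovered) {
        // Filter: retain only manifests the table has not committed yet.
        for (String manifest : recovered) {
            if (!table.isCommitted(manifest)) {
                pendingManifests.add(manifest);
            }
        }
    }
}
```

Under Aljoscha's alternative, the filtering done inside `recoveredCommittables` would instead be a method returning the retained list (`filterRecoveredCommittables`), with the framework rather than the sink owning that state.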