> 1. The frame can not know which `GlobalCommT` to retry if we use the
> List<GlobalCommT> as parameter when the `commit` returns `RETRY`.
> 2. Of course we can let the `commit` return more detailed info but it might
> be too complicated.

If commit(List<GlobalCommT>) returns RETRY, it means the whole list needs to
be retried. For example, suppose an outage of the metadata service makes the
commits for checkpoints 1-100 fail, so we accumulate 100 GlobalCommT items.
We don't want to commit them one by one; it is faster to commit the whole
list as one batch.

> 3. On the other hand, I think only when restoring IcebergSink needs a
> collection of `GlobalCommT` and giving back another collection of
> `GlobalCommT` that are not committed

That is when the job is restarted due to a failure or a deployment.

On Fri, Sep 25, 2020 at 5:24 AM Guowei Ma <guowei....@gmail.com> wrote:

> Hi, all
>
> From the above discussion we could find that FLIP focuses on providing an
> unified transactional sink API. So I updated the FLIP's title to "Unified
> Transactional Sink API". But I found that the old link could not be opened
> again.
>
> I would update the link[1] here. Sorry for the inconvenience.
>
> [1]https://cwiki.apache.org/confluence/x/KEJ4CQ
>
> Best,
> Guowei
>
>
> On Fri, Sep 25, 2020 at 3:26 PM Guowei Ma <guowei....@gmail.com> wrote:
>
> > Hi, Steven
> >
> > >>I also have a clarifying question regarding the WriterStateT. Since
> > >>IcebergWriter won't need to checkpoint any state, should we set it to
> > >>*Void* type? Since getWriterStateSerializer() returns Optional, that is
> > >>clear and we can return Optional.empty().
> >
> > Yes I think you could do it. If you return Optional.empty() we would
> > ignore all the state you return.
> >
> > Best,
> > Guowei
> >
> >
> > On Fri, Sep 25, 2020 at 3:14 PM Guowei Ma <guowei....@gmail.com> wrote:
> >
> >> Hi,Steven
> >>
> >> Thank you for reading the FLIP so carefully.
> >> 1. The frame can not know which `GlobalCommT` to retry if we use the
> >> List<GlobalCommT> as parameter when the `commit` returns `RETRY`.
> >> 2. Of course we can let the `commit` return more detailed info but it
> >> might be too complicated.
> >> 3.
> >> On the other hand, I think only when restoring IcebergSink needs a
> >> collection of `GlobalCommT` and giving back another collection of
> >> `GlobalCommT` that are not committed.
> >>
> >> Best,
> >> Guowei
> >>
> >>
> >> On Fri, Sep 25, 2020 at 1:45 AM Steven Wu <stevenz...@gmail.com> wrote:
> >>
> >>> Guowei,
> >>>
> >>> Thanks a lot for updating the wiki page. It looks great.
> >>>
> >>> I noticed one inconsistency in the wiki with your last summary email for
> >>> GlobalCommitter interface. I think the version in the summary email is
> >>> the intended one, because rollover from previous failed commits can
> >>> accumulate a list.
> >>> CommitResult commit(GlobalCommT globalCommittable); // in the wiki
> >>> =>
> >>> CommitResult commit(List<GlobalCommT> globalCommittable); // in the summary email
> >>>
> >>> I also have a clarifying question regarding the WriterStateT. Since
> >>> IcebergWriter won't need to checkpoint any state, should we set it to
> >>> *Void* type? Since getWriterStateSerializer() returns Optional, that is
> >>> clear and we can return Optional.empty().
> >>>
> >>> Thanks,
> >>> Steven
> >>>
> >>> On Wed, Sep 23, 2020 at 6:59 PM Guowei Ma <guowei....@gmail.com> wrote:
> >>>
> >>> > Thanks Aljoscha for your suggestion. I have updated FLIP. Any
> >>> > comments are welcome.
> >>> >
> >>> > Best,
> >>> > Guowei
> >>> >
> >>> >
> >>> > On Wed, Sep 23, 2020 at 4:25 PM Aljoscha Krettek <aljos...@apache.org>
> >>> > wrote:
> >>> >
> >>> > > Yes, that sounds good! I'll probably have some comments on the FLIP
> >>> > > about the names of generic parameters and the Javadoc but we can
> >>> > > address them later or during implementation.
> >>> > >
> >>> > > I also think that we probably need the FAIL,RETRY,SUCCESS result for
> >>> > > globalCommit() but we can also do that as a later addition.
> >>> > >
> >>> > > So I think we're good to go to update the FLIP, do any last minute
> >>> > > changes and then vote.
> >>> > >
> >>> > > Best,
> >>> > > Aljoscha
> >>> > >
> >>> > > On 23.09.20 06:13, Guowei Ma wrote:
> >>> > > > Hi, all
> >>> > > >
> >>> > > > Thank everyone very much for your ideas and suggestions. I would
> >>> > > > try to summarize again the consensus :). Correct me if I am wrong
> >>> > > > or misunderstand you.
> >>> > > >
> >>> > > > ## Consensus-1
> >>> > > >
> >>> > > > 1. The motivation of the unified sink API is to decouple the sink
> >>> > > > implementation from the different runtime execution mode.
> >>> > > > 2. The initial scope of the unified sink API only covers the file
> >>> > > > system type, which supports the real transactions. The FLIP focuses
> >>> > > > more on the semantics the new sink api should support.
> >>> > > > 3. We prefer the first alternative API, which could give the
> >>> > > > framework a greater opportunity to optimize.
> >>> > > > 4. The `Writer` needs to add a method `prepareCommit`, which would
> >>> > > > be called from `prepareSnapshotPreBarrier`. And remove the `Flush`
> >>> > > > method.
> >>> > > > 5. The FLIP could move the `Snapshot & Drain` section in order to
> >>> > > > be more focused.
> >>> > > >
> >>> > > > ## Consensus-2
> >>> > > >
> >>> > > > 1. What should the “Unified Sink API” support/cover? It includes
> >>> > > > two aspects. 1. The same sink implementation would work for both
> >>> > > > the batch and stream execution mode. 2. In the long run we should
> >>> > > > give the sink developer the ability of building “arbitrary”
> >>> > > > topologies. But for Flink-1.12 we should be more focused on only
> >>> > > > satisfying the S3/HDFS/Iceberg sink.
> >>> > > > 2.
> >>> > > > Because the batch execution mode does not have the normal
> >>> > > > checkpoint the sink developer should not depend on it any more if
> >>> > > > we want a unified sink.
> >>> > > > 3. We can benefit by providing an asynchronous `Writer` version.
> >>> > > > But because the unified sink is already very complicated, we don’t
> >>> > > > add this in the first version.
> >>> > > >
> >>> > > >
> >>> > > > According to these consensus I would propose the first version of
> >>> > > > the new sink api as follows. What do you think? Any comments are
> >>> > > > welcome.
> >>> > > >
> >>> > > > /**
> >>> > > >  * This interface lets the sink developer build a simple
> >>> > > >  * transactional sink topology pattern, which satisfies the
> >>> > > >  * HDFS/S3/Iceberg sink.
> >>> > > >  * This sink topology includes one {@link Writer} + one
> >>> > > >  * {@link Committer} + one {@link GlobalCommitter}.
> >>> > > >  * The {@link Writer} is responsible for producing the committable.
> >>> > > >  * The {@link Committer} is responsible for committing a single
> >>> > > >  * committables.
> >>> > > >  * The {@link GlobalCommitter} is responsible for committing an
> >>> > > >  * aggregated committable, which we called global committables.
> >>> > > >  *
> >>> > > >  * But both the {@link Committer} and the {@link GlobalCommitter}
> >>> > > >  * are optional.
> >>> > > >  */
> >>> > > > interface TSink<IN, CommT, GCommT, WriterS> {
> >>> > > >
> >>> > > >     Writer<IN, CommT, WriterS> createWriter(InitContext initContext);
> >>> > > >
> >>> > > >     Writer<IN, CommT, WriterS> restoreWriter(InitContext initContext,
> >>> > > >             List<WriterS> states);
> >>> > > >
> >>> > > >     Optional<Committer<CommT>> createCommitter();
> >>> > > >
> >>> > > >     Optional<GlobalCommitter<CommT, GCommT>> createGlobalCommitter();
> >>> > > >
> >>> > > >     SimpleVersionedSerializer<CommT> getCommittableSerializer();
> >>> > > >
> >>> > > >     Optional<SimpleVersionedSerializer<GCommT>>
> >>> > > >             getGlobalCommittableSerializer();
> >>> > > > }
> >>> > > >
> >>> > > > /**
> >>> > > >  * The {@link GlobalCommitter} is responsible for committing an
> >>> > > >  * aggregated committable, which we called global committables.
> >>> > > >  */
> >>> > > > interface GlobalCommitter<CommT, GCommT> {
> >>> > > >
> >>> > > >     /**
> >>> > > >      * This method is called when restoring from a failover.
> >>> > > >      * @param globalCommittables the global committables that are
> >>> > > >      * not committed in the previous session.
> >>> > > >      * @return the global committables that should be committed
> >>> > > >      * again in the current session.
> >>> > > >      */
> >>> > > >     List<GCommT> filterRecoveredCommittables(
> >>> > > >             List<GCommT> globalCommittables);
> >>> > > >
> >>> > > >     /**
> >>> > > >      * Compute an aggregated committable from a collection of
> >>> > > >      * committables.
> >>> > > >      * @param committables a collection of committables that are
> >>> > > >      * needed to combine
> >>> > > >      * @return an aggregated committable
> >>> > > >      */
> >>> > > >     GCommT combine(List<CommT> committables);
> >>> > > >
> >>> > > >     void commit(List<GCommT> globalCommittables);
> >>> > > >
> >>> > > >     /**
> >>> > > >      * There are no committables any more.
> >>> > > >      */
> >>> > > >     void endOfInput();
> >>> > > > }
> >>> > > >
> >>> > > > Best,
> >>> > > > Guowei
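
To make the batch-retry argument at the top of this message concrete, here is a minimal sketch of how a framework could treat RETRY from commit(List<GlobalCommT>) as "keep the whole list and re-send it as one batch". It is not the FLIP's implementation. The CommitResult values are the ones named in the thread; BatchRetryDemo, PendingCommits, FlakyCommitter and the handling of FAIL are hypothetical and only serve the illustration.

```java
import java.util.ArrayList;
import java.util.List;

public class BatchRetryDemo {

    /** Result values named in the thread (FAIL, RETRY, SUCCESS). */
    enum CommitResult { SUCCESS, RETRY, FAIL }

    /** Hypothetical, trimmed-down committer: only the batch commit method. */
    interface GlobalCommitter<GCommT> {
        CommitResult commit(List<GCommT> globalCommittables);
    }

    /** Hypothetical framework-side buffer of not-yet-committed global committables. */
    static final class PendingCommits<GCommT> {
        private final GlobalCommitter<GCommT> committer;
        private final List<GCommT> pending = new ArrayList<>();

        PendingCommits(GlobalCommitter<GCommT> committer) {
            this.committer = committer;
        }

        /** Adds the newest committable, then tries to commit everything pending as one batch. */
        CommitResult onCheckpoint(GCommT committable) {
            pending.add(committable);
            CommitResult result = committer.commit(new ArrayList<>(pending));
            if (result == CommitResult.SUCCESS) {
                pending.clear();
            } else if (result == CommitResult.FAIL) {
                // Assumption: FAIL is unrecoverable. The thread does not define its semantics.
                throw new IllegalStateException("global commit failed permanently");
            }
            // RETRY: keep the whole list; it is re-sent together with the next checkpoint.
            return result;
        }

        int pendingCount() {
            return pending.size();
        }
    }

    /** Stand-in for a sink whose metadata service is down for the first few calls. */
    static final class FlakyCommitter implements GlobalCommitter<String> {
        private int outagesLeft;
        final List<List<String>> committedBatches = new ArrayList<>();

        FlakyCommitter(int outages) {
            this.outagesLeft = outages;
        }

        @Override
        public CommitResult commit(List<String> globalCommittables) {
            if (outagesLeft > 0) {
                outagesLeft--;
                return CommitResult.RETRY;
            }
            committedBatches.add(new ArrayList<>(globalCommittables));
            return CommitResult.SUCCESS;
        }
    }

    public static void main(String[] args) {
        FlakyCommitter committer = new FlakyCommitter(2);
        PendingCommits<String> pending = new PendingCommits<>(committer);
        for (int checkpoint = 1; checkpoint <= 3; checkpoint++) {
            CommitResult result = pending.onCheckpoint("ckpt-" + checkpoint);
            System.out.println("checkpoint " + checkpoint + ": " + result
                    + ", pending=" + pending.pendingCount());
        }
        System.out.println("committed batches: " + committer.committedBatches);
    }
}
```

With a per-item commit(GlobalCommT) signature the framework would need one call per accumulated checkpoint after the outage. With the list signature the sink sees all of them at once and can fold them into a single transaction if it supports that.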