Hi, all

From the above discussion we can see that the FLIP focuses on providing a unified transactional sink API, so I updated the FLIP's title to "Unified Transactional Sink API". However, I found that the old link can no longer be opened.
I am posting the updated link [1] here. Sorry for the inconvenience.

[1] https://cwiki.apache.org/confluence/x/KEJ4CQ

Best,
Guowei


On Fri, Sep 25, 2020 at 3:26 PM Guowei Ma <guowei....@gmail.com> wrote:

> Hi, Steven
>
> >>I also have a clarifying question regarding the WriterStateT. Since
> >>IcebergWriter won't need to checkpoint any state, should we set it to *Void*
> >>type? Since getWriterStateSerializer() returns Optional, that is clear and
> >>we can return Optional.empty().
>
> Yes, I think you could do that. If you return Optional.empty(), we would
> ignore all the state you return.
>
> Best,
> Guowei
>
>
> On Fri, Sep 25, 2020 at 3:14 PM Guowei Ma <guowei....@gmail.com> wrote:
>
>> Hi, Steven
>>
>> Thank you for reading the FLIP so carefully.
>> 1. The framework cannot know which `GlobalCommT` to retry if we use
>> List<GlobalCommT> as the parameter and `commit` returns `RETRY`.
>> 2. Of course we could let `commit` return more detailed info, but that
>> might be too complicated.
>> 3. On the other hand, I think IcebergSink needs to take a collection of
>> `GlobalCommT` and give back another collection of `GlobalCommT` that
>> are not yet committed only when restoring.
>>
>> Best,
>> Guowei
>>
>>
>> On Fri, Sep 25, 2020 at 1:45 AM Steven Wu <stevenz...@gmail.com> wrote:
>>
>>> Guowei,
>>>
>>> Thanks a lot for updating the wiki page. It looks great.
>>>
>>> I noticed one inconsistency between the wiki and your last summary email
>>> for the GlobalCommitter interface. I think the version in the summary
>>> email is the intended one, because rollover from previous failed commits
>>> can accumulate a list.
>>> CommitResult commit(GlobalCommT globalCommittable); // in the wiki
>>> =>
>>> CommitResult commit(List<GlobalCommT> globalCommittable); // in the summary email
>>>
>>> I also have a clarifying question regarding the WriterStateT. Since
>>> IcebergWriter won't need to checkpoint any state, should we set it to
>>> *Void* type?
>>> Since getWriterStateSerializer() returns Optional, that is clear and
>>> we can return Optional.empty().
>>>
>>> Thanks,
>>> Steven
>>>
>>> On Wed, Sep 23, 2020 at 6:59 PM Guowei Ma <guowei....@gmail.com> wrote:
>>>
>>> > Thanks Aljoscha for your suggestion. I have updated the FLIP. Any
>>> > comments are welcome.
>>> >
>>> > Best,
>>> > Guowei
>>> >
>>> >
>>> > On Wed, Sep 23, 2020 at 4:25 PM Aljoscha Krettek <aljos...@apache.org>
>>> > wrote:
>>> >
>>> > > Yes, that sounds good! I'll probably have some comments on the FLIP
>>> > > about the names of generic parameters and the Javadoc but we can
>>> > > address them later or during implementation.
>>> > >
>>> > > I also think that we probably need the FAIL,RETRY,SUCCESS result for
>>> > > globalCommit() but we can also do that as a later addition.
>>> > >
>>> > > So I think we're good to go to update the FLIP, do any last minute
>>> > > changes and then vote.
>>> > >
>>> > > Best,
>>> > > Aljoscha
>>> > >
>>> > > On 23.09.20 06:13, Guowei Ma wrote:
>>> > > > Hi, all
>>> > > >
>>> > > > Thank you all very much for your ideas and suggestions. I will try
>>> > > > to summarize the consensus again :). Correct me if I am wrong or
>>> > > > have misunderstood you.
>>> > > >
>>> > > > ## Consensus-1
>>> > > >
>>> > > > 1. The motivation of the unified sink API is to decouple the sink
>>> > > > implementation from the different runtime execution modes.
>>> > > > 2. The initial scope of the unified sink API only covers
>>> > > > file-system-type sinks, which support real transactions. The FLIP
>>> > > > focuses more on the semantics the new sink API should support.
>>> > > > 3. We prefer the first alternative API, which gives the framework a
>>> > > > greater opportunity to optimize.
>>> > > > 4. The `Writer` needs a new method `prepareCommit`, which would be
>>> > > > called from `prepareSnapshotPreBarrier`. The `Flush` method is
>>> > > > removed.
>>> > > > 5. The FLIP could move the `Snapshot & Drain` section in order to be
>>> > > > more focused.
>>> > > >
>>> > > > ## Consensus-2
>>> > > >
>>> > > > 1. What should the “Unified Sink API” support/cover? It includes two
>>> > > > aspects. 1. The same sink implementation would work for both the
>>> > > > batch and stream execution modes. 2. In the long run we should give
>>> > > > the sink developer the ability to build “arbitrary” topologies. But
>>> > > > for Flink-1.12 we should focus only on satisfying the
>>> > > > S3/HDFS/Iceberg sink.
>>> > > > 2. Because the batch execution mode does not have the normal
>>> > > > checkpoint, the sink developer should no longer depend on it if we
>>> > > > want a unified sink.
>>> > > > 3. We could benefit from providing an asynchronous `Writer` version.
>>> > > > But because the unified sink is already very complicated, we don’t
>>> > > > add this in the first version.
>>> > > >
>>> > > >
>>> > > > Based on this consensus, I would propose the first version of the
>>> > > > new sink API as follows. What do you think? Any comments are welcome.
>>> > > >
>>> > > > /**
>>> > > >  * This interface lets the sink developer build a simple transactional sink
>>> > > >  * topology pattern, which satisfies the HDFS/S3/Iceberg sink.
>>> > > >  * This sink topology includes one {@link Writer} + one {@link Committer} +
>>> > > >  * one {@link GlobalCommitter}.
>>> > > >  * The {@link Writer} is responsible for producing the committable.
>>> > > >  * The {@link Committer} is responsible for committing a single committable.
>>> > > >  * The {@link GlobalCommitter} is responsible for committing an aggregated
>>> > > >  * committable, which we call a global committable.
>>> > > >  *
>>> > > >  * But both the {@link Committer} and the {@link GlobalCommitter} are
>>> > > >  * optional.
>>> > > >  */
>>> > > > interface TSink<IN, CommT, GCommT, WriterS> {
>>> > > >
>>> > > >     Writer<IN, CommT, WriterS> createWriter(InitContext initContext);
>>> > > >
>>> > > >     Writer<IN, CommT, WriterS> restoreWriter(InitContext initContext,
>>> > > >             List<WriterS> states);
>>> > > >
>>> > > >     Optional<Committer<CommT>> createCommitter();
>>> > > >
>>> > > >     Optional<GlobalCommitter<CommT, GCommT>> createGlobalCommitter();
>>> > > >
>>> > > >     SimpleVersionedSerializer<CommT> getCommittableSerializer();
>>> > > >
>>> > > >     Optional<SimpleVersionedSerializer<GCommT>> getGlobalCommittableSerializer();
>>> > > > }
>>> > > >
>>> > > > /**
>>> > > >  * The {@link GlobalCommitter} is responsible for committing an aggregated
>>> > > >  * committable, which we call a global committable.
>>> > > >  */
>>> > > > interface GlobalCommitter<CommT, GCommT> {
>>> > > >
>>> > > >     /**
>>> > > >      * This method is called when restoring from a failover.
>>> > > >      * @param globalCommittables the global committables that were not
>>> > > >      * committed in the previous session.
>>> > > >      * @return the global committables that should be committed again
>>> > > >      * in the current session.
>>> > > >      */
>>> > > >     List<GCommT> filterRecoveredCommittables(List<GCommT> globalCommittables);
>>> > > >
>>> > > >     /**
>>> > > >      * Compute an aggregated committable from a collection of committables.
>>> > > >      * @param committables a collection of committables that need to be
>>> > > >      * combined
>>> > > >      * @return an aggregated committable
>>> > > >      */
>>> > > >     GCommT combine(List<CommT> committables);
>>> > > >
>>> > > >     void commit(List<GCommT> globalCommittables);
>>> > > >
>>> > > >     /**
>>> > > >      * There are no committables any more.
>>> > > >      */
>>> > > >     void endOfInput();
>>> > > > }
>>> > > >
>>> > > > Best,
>>> > > > Guowei
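
As a rough illustration of point 1 in the Sep 25 reply to Steven (with a List parameter the framework cannot tell which `GlobalCommT` to retry), the sketch below commits global committables one at a time and re-queues only those that return RETRY. `CommitResult`, `SingleGlobalCommitter`, `RetryingCommitDriver` and `RetryDemo` are hypothetical names made up for this example. They are not the FLIP's or Flink's API, and the attempt limit is an arbitrary assumption.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical result type; the thread only mentions FAIL, RETRY and SUCCESS. */
enum CommitResult { SUCCESS, RETRY, FAILURE }

/** Hypothetical single-committable commit, like the wiki variant quoted by Steven. */
interface SingleGlobalCommitter<GCommT> {
    CommitResult commit(GCommT globalCommittable);
}

/** Hypothetical stand-in for the framework side; not Flink code. */
final class RetryingCommitDriver {

    /**
     * Commits each pending global committable individually. Because every
     * call returns its own result, the driver knows exactly which ones to
     * retry. Returns whatever is still uncommitted after maxAttempts rounds.
     */
    static <G> List<G> commitAll(
            SingleGlobalCommitter<G> committer, List<G> pending, int maxAttempts) {
        List<G> toCommit = new ArrayList<>(pending);
        for (int attempt = 0; attempt < maxAttempts && !toCommit.isEmpty(); attempt++) {
            List<G> retry = new ArrayList<>();
            for (G committable : toCommit) {
                CommitResult result = committer.commit(committable);
                if (result == CommitResult.RETRY) {
                    retry.add(committable); // only this one is re-queued
                } else if (result == CommitResult.FAILURE) {
                    throw new IllegalStateException("commit failed: " + committable);
                }
            }
            toCommit = retry;
        }
        return toCommit;
    }
}

final class RetryDemo {
    public static void main(String[] args) {
        // Fake committer: "snapshot-2" asks for a retry once, then succeeds.
        Set<String> alreadyRetried = new HashSet<>();
        SingleGlobalCommitter<String> committer = g ->
                (g.equals("snapshot-2") && alreadyRetried.add(g))
                        ? CommitResult.RETRY
                        : CommitResult.SUCCESS;

        List<String> uncommitted = RetryingCommitDriver.commitAll(
                committer, Arrays.asList("snapshot-1", "snapshot-2", "snapshot-3"), 3);
        System.out.println("still uncommitted: " + uncommitted);
    }
}
```

With a `commit(List<...>)` signature returning a single result, the same driver would have to retry the whole list or rely on a richer return value, which is the trade-off raised in points 1 and 2 of that reply.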